U
    9%e0                     @   s  d dl Z d dlZd dlmZmZmZmZmZmZm	Z	m
Z
mZ d dlmZ ddlmZmZmZmZ d dlZd dlmZ d dlmZ d dlmZ ddlmZmZ d	d
gZedZedZ eeeef  ee!ef dddZ"G dd dZ#eeedddZ$ej%eej%ddd	Z&eee	ddd
Z'ee! ee! ee! dddZ(ee! ee! ee! dddZ)G dd dej*Z+ej*e!e!ej*dd d!Z,e-e!e-d"d#d$Z.dS )%    N)	ListCallableOptionalUnionTypeVarDictAnycastSequence   )CheckpointException_wrap_exception_is_wrapped_exceptionWRAPPED_EXCEPTION)ShardedTensor)Shard)DTensor)STATE_DICT_TYPEMetadataIndexfind_tensor_shardfind_state_dict_objectTR)resultsreturnc                 C   s    t tttf dd t| D S )Nc                 S   s   i | ]\}}t |r||qS  )r   ).0ierrr   r   a/var/www/html/Darija-Ai-API/env/lib/python3.8/site-packages/torch/distributed/checkpoint/utils.py
<dictcomp>.   s       z%_get_failure_dict.<locals>.<dictcomp>)r	   r   intr   	enumerate)r   r   r   r   _get_failure_dict)   s    
r#   c                   @   s,  e Zd ZdZeej eedddZ	edddZ
eddd	Zee ed
ddZeeee  d
ddZeee d
ddZeee  edddZeeg ef eee gee f edddZeeg ef eee gef edddZeeg ef ee dddZeeg ef edddZdS )_DistWrapperaH  
    This is a wrapper around PG that provides a series of features around object collectives.

    It works without distributed initialized, where most collectives turns into nops.

    All variants that take functions are exception robust, meaning that if one or more
    ranks raise errors, all ranks will observe those.
    )groupuse_distcoordinator_rankc                 C   sB   || _ || _|| _| jr2t|| _| j|k| _nd| _d| _d S )Nr   T)r%   r&   r'   distget_rankrankis_coordinator)selfr%   r&   r'   r   r   r   __init__<   s    z_DistWrapper.__init__r   c                 C   s   | j S N)r*   r,   r   r   r   r)   L   s    z_DistWrapper.get_rankc                 C   s   | j rt| jS dS )Nr   )r&   r(   get_world_sizer%   r0   r   r   r   r1   O   s    z_DistWrapper.get_world_size)objectr   c                 C   s.   |g}| j r tj|| j| jd tt|d S )z\
        Same as c10d::broadcast_object_list but works without distributed enabled.
        )object_listr%   srcr   )r&   r(   Zbroadcast_object_listr%   r'   r	   r   )r,   r2   r3   r   r   r   broadcast_objectT   s    z_DistWrapper.broadcast_objectc                 C   s\   | j rR| jr(ttt dgt| j nd}tj|| jr<|nd| j	| jd |}n|g}|S )zT
        Same as c10d::gather_object but works without distributed enabled.
        N)objZobject_gather_listdstr%   )
r&   r+   r	   r   r   r(   r1   r%   gather_objectr'   )r,   r2   gather_objsresultr   r   r   r8   a   s    z_DistWrapper.gather_objectc                 C   s@   | j r6ttt dgt| j }tj||| jd n|g}|S )zX
        Same as c10d::all_gather_object but works without distributed enabled.
        N)r3   r6   r%   )r&   r	   r   r   r(   r1   r%   all_gather_object)r,   r2   r9   r   r   r   r;   w   s       z_DistWrapper.all_gather_object)r3   r   c                 C   sX   | j r@ttt dg}tj|| jr&|nd| j| jd |d }n|dk	sLt	|d }|S )zU
        Same as c10d::scatter_object but works without distributed enabled.
        N)Zscatter_object_output_listZscatter_object_input_listr4   r%   r   )
r&   r	   r   r   r(   Zscatter_object_listr+   r'   r%   AssertionError)r,   r3   Zgather_resultZlocal_replyr   r   r   scatter_object   s    	
z_DistWrapper.scatter_object)stepmap_fun
reduce_funr   c           
   
   C   s   z
| }W n* t k
r4 } zt|}W 5 d}~X Y nX | |}d}| jr|dk	sVtt|}t|dkrz(ttt	t
tf  |ttt |}W n0 t k
r } zt||| j< W 5 d}~X Y nX t|dkrt||g|   }| |}	t|	tr|	|	S )a^  
        Compute a value on each rank, then do centralized reduce on a single rank, followed by a scatter.

        This method operates in the following way:
            Run ``map_fun`` on all ranks
            Gather results on rank 0
            Call ``reduce_fun`` on all those values
            Scatter to each rank part of the result.
        Nr   )BaseExceptionr   r8   r+   r<   r#   lenr	   r   r   r   r   r   r*   r1   r=   
isinstance)
r,   r>   r?   r@   
local_dataeall_dataall_resultsnode_failuresr:   r   r   r   reduce_scatter   s4    

 

z_DistWrapper.reduce_scatterc           
   
   C   s   z
| }W n* t k
r4 } zt|}W 5 d}~X Y nX | |}d}| jr|dk	sVtt|}t|dkrz|ttt	 |}W n0 t k
r } zt||| j
< W 5 d}~X Y nX t|dkrt||}| |}	t|	tr|	tt|	S )aa  
        Compute a value on each rank, then do centralized reduce on a single rank, followed by a broadcast.

        This method operates in the following way:
            Run ``map_fun`` on all ranks
            Gather results on rank 0
            Call ``reduce_fun`` on all those values
            Broadcast the reduced value to all ranks.
        Nr   )rA   r   r8   r+   r<   r#   rB   r	   r   r   r*   r   r5   rC   r   )
r,   r>   r?   r@   rD   rE   rF   r:   rH   final_resultr   r   r   
all_reduce   s(    

 


z_DistWrapper.all_reduce)r>   r?   r   c              
   C   sl   z
| }W n* t k
r4 } zt|}W 5 d}~X Y nX | |}t|}t|dkr^t||ttt |S )z
        Compute a value on each rank, then all_gather them.

        This method operates in the following way:
            Run ``map_cp`` on all ranks
            all_gather the values to all ranks
        Nr   )	rA   r   r;   r#   rB   r   r	   r   r   )r,   r>   r?   r:   rE   rG   rH   r   r   r   
all_gather   s    


z_DistWrapper.all_gatherc              
   C   sn   d}| j rLz
| }W n6 tk
rJ } zt|| jt|i}W 5 d}~X Y nX | |}t|trd|tt|S )z
        Compute a value on rank 0 and broadcast it.

        This method operates in the following way:
            Run ``map_cp`` on rank 0
            broadcast the value
        N)	r+   rA   r   r*   r   r5   rC   r	   r   )r,   r>   r?   r:   rE   rJ   r   r   r   	broadcast  s    
 

z_DistWrapper.broadcastN)__name__
__module____qualname____doc__r   r(   ZProcessGroupboolr!   r-   r)   r1   r   r5   r   r8   r;   r=   strr   r   rI   rK   rL   rM   r   r   r   r   r$   2   s:   
1
*

r$   )tensorindexr   c                 C   s   |j d krtd|j d|  }|jd k	r`t||jkr`t||j jj	|j kr`||j S |D ] }t|jj	|j krd|  S qdtd|j  d|j dd S )NzCannot lookup z5 since its a ShardedTensor and no offset was providedzCould not find shard at 'z' for FQN: '')
offset
ValueErrorfqnZlocal_shardsrU   rB   torchSizemetadataZshard_offsets)rT   rU   ZshardsZshardr   r   r   _find_shard&  s&    



r]   c                 C   sr   t | tr|  S t | tr(t| |jS |jd k	rn|jtdgt	| 
  krT| S td|j d|j d| S )Nr   FQN: '1' is not a ShardedTensor, can't find by offset: 'rV   )rC   r   Zto_localr   r]   rT   rW   rZ   r[   rB   sizerX   rY   )rT   rU   r   r   r   r   >  s    


)
state_dictrU   r   c                 C   sd   |j | krtd|j  d| |j  }t|tjr<t||S |jd k	r`td|j  d|j d|S )NzCould not find FQN: 'rV   r^   r_   )rY   rX   rC   rZ   Tensorr   rW   )ra   rU   r6   r   r   r   r   O  s    



)abr   c                 C   s   dd t | |D S )Nc                 S   s   g | ]\}}|| qS r   r   r   Zi_aZi_br   r   r   
<listcomp>`  s     z%_element_wise_add.<locals>.<listcomp>ziprc   rd   r   r   r   _element_wise_add_  s    rj   c                 C   s   dd t | |D S )Nc                 S   s   g | ]\}}|| qS r   r   re   r   r   r   rf   d  s     z%_element_wise_sub.<locals>.<listcomp>rg   ri   r   r   r   _element_wise_subc  s    rk   c                       s|   e Zd Zejeed fddZejfeeedddZ	eddd	Z
edd
dZedddZdd ZdddZ  ZS )_ReaderView)base_streamrW   rB   c                    s*   t    || _|| _|| _| d d S )Nr   )superr-   rW   rB   rm   seek)r,   rm   rW   rB   	__class__r   r   r-   h  s
    
z_ReaderView.__init__)_ReaderView__offset_ReaderView__whencer   c                 C   sD   |t jkr| j| }n |t jkr6t j}| j| j | }| j||S r/   )osSEEK_SETrW   SEEK_ENDrB   rm   ro   )r,   rr   rs   r   r   r   ro   o  s    

z_ReaderView.seekr.   c                 C   s   | j  | j S r/   )rm   tellrW   r0   r   r   r   rw   w  s    z_ReaderView.tellc                 C   s
   | j  S r/   )rm   readabler0   r   r   r   rx   z  s    z_ReaderView.readablec                 C   s
   | j  S r/   )rm   seekabler0   r   r   r   ry   }  s    z_ReaderView.seekablec                 C   s   | j |S r/   )rm   readinto)r,   rd   r   r   r   rz     s    z_ReaderView.readintoc                 C   s   | j |S r/   )rm   read)r,   r`   r   r   r   r|     s    z_ReaderView.read)r{   )rN   rO   rP   ioIOBaser!   r-   rt   ru   ro   rw   rR   rx   ry   rz   r|   __classcell__r   r   rp   r   rl   g  s   rl   )filerW   lengthr   c                 C   s   t | ||S r/   )rl   )r   rW   r   r   r   r   _create_file_view  s    r   )device_type	device_idr   c                 C   s   | dkrdS |  d| S )z$
    Device info normalization.
    cpu:r   )r   r   r   r   r   _normalize_device_info  s    r   )/rt   r}   typingr   r   r   r   r   r   r   r	   r
   Ztorch.distributeddistributedr(   apir   r   r   r   rZ   Z'torch.distributed._shard.sharded_tensorr   Z-torch.distributed._shard.sharded_tensor.shardr   Ztorch.distributed._tensorr   r\   r   r   __all__r   r   r!   r#   r$   r]   rb   r   r   rj   rk   r~   rl   r   rS   r   r   r   r   r   <module>   s>   ,
	 u   