import sys
import warnings
from typing import Tuple, Union, List, cast, TYPE_CHECKING

import torch
import torch.distributed as dist
import torch.distributed.distributed_c10d as c10d
from torch.utils._pytree import tree_map_only
from torch.fx.experimental.proxy_tensor import get_innermost_proxy_mode

from . import _functional_collectives_impl as fun_col_impl
from ._functional_collectives_impl import _register_tensor_wrapper

if torch._running_with_deploy():

    def is_torchdynamo_compiling():
        """Can't import torchdynamo in torchdeploy builds currently."""
        return False

else:
    try:
        from torch._dynamo.external_utils import is_compiling as is_torchdynamo_compiling
    except Exception:
        warnings.warn(
            "Unable to import torchdynamo util `is_torchdynamo_compiling`, "
            "so won't support torchdynamo correctly"
        )

        def is_torchdynamo_compiling():
            return False

RANK_TYPES = Union[
    List[int],
    List[List[int]],
    dist.ProcessGroup,
    "dist._tensor.DeviceMesh",
    Tuple["dist._tensor.DeviceMesh", int],
]


def wait_tensor(tensor):
    """
    Wait on a tensor returned by the collectives ops.

    Waiting follows device semantics, which means blocking on CPU and synchronizing streams on CUDA.
    """
    return torch.ops.c10d_functional.wait_tensor(tensor)
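# Usage sketch (not part of the original source; assumes torch.distributed is
# initialized with a default process group of world size 2 and that every rank
# runs the same code). The collectives below return without blocking; the wait
# happens on first real use of the result, or explicitly via wait_tensor:
#
#   out = all_reduce(torch.ones(4), "sum", group=[0, 1])
#   out = wait_tensor(out)   # explicit wait; any first use would also trigger it
#   # out is now tensor([2., 2., 2., 2.]) on both ranks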
def all_reduce(self: torch.Tensor, reduceOp: str, group: RANK_TYPES, tag: str = ""):
    """
    Reduces the tensor data across all machines in such a way that all get
    the final result.

    The input tensor is left unmodified.

    Group can be one of:
        List[int]: ranks participating in the collective.
        List[List[int]]: 2D mesh of ranks taking part in this collective in MPMD.
        ProcessGroup: Will perform a collective using the ranks and tag of the PG.
        DeviceMesh: Do an SPMD collective over all ranks of the mesh
        (DeviceMesh, int): Do an MPMD collective over one dimension of the DeviceMesh

    :: N.B. If you pass a PG or a 1D list to perform an MPMD collective, the compiler won't be able to recover
    that information and perform collective algebraic optimization. Use other forms of input for that.
    """
    tag, rankset, group_size = _expand_group(group, tag)
    tensor = torch.ops.c10d_functional.all_reduce(self, reduceOp, tag, rankset, group_size)
    return _maybe_wrap_tensor(tensor)
def all_gather_tensor(
    self: torch.Tensor,
    gather_dim: int,
    group: RANK_TYPES,
    tag: str = "",
):
    """
    Gather tensor data from all machines and concatenate over ``gather_dim``.

    Note that the underlying collective only supports gather_dim = 0; other
    values are handled by rearranging the gathered result.

    The input tensor is left unmodified.
    Group can be one of:
        List[int]: ranks participating in the collective.
        List[List[int]]: 2D mesh of ranks taking part in this collective in MPMD.
        ProcessGroup: Will perform a collective using the ranks and tag of the PG.
        DeviceMesh: Do an SPMD collective over all ranks of the mesh
        (DeviceMesh, int): Do an MPMD collective over one dimension of the DeviceMesh

    :: N.B. If you pass a PG or a 1D list to perform an MPMD collective, the compiler won't be able to recover
    that information and perform collective algebraic optimization. Use other forms of input for that.
    """
    assert self.is_contiguous()
    tag, rankset, group_size = _expand_group(group, tag)
    tensor = torch.ops.c10d_functional.all_gather_into_tensor(self, tag, rankset, group_size)
    res = _maybe_wrap_tensor(tensor)
    # TODO: this should be done inside AsyncCollectiveTensor to delay the wait() call
    if gather_dim != 0:
        res = torch.cat(torch.chunk(res, group_size, dim=0), dim=gather_dim)
    return res
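# Shape sketch for all_gather_tensor (assumption: group_size 4, per-rank input
# of shape [8, 16]): the gathered result has shape [32, 16] for gather_dim=0;
# for gather_dim=1 the dim-0 result is chunked and re-concatenated, giving [8, 64].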
def reduce_scatter_tensor(
    self: torch.Tensor,
    reduceOp: str,
    scatter_dim: int,
    group: RANK_TYPES,
    tag: str = "",
):
    """
    Reduces the tensor data across all machines in such a way that all get
    the final result, then scatter the results to corresponding ranks.

    The input tensor is left unmodified.
    Group can be one of:
        List[int]: ranks participating in the collective.
        List[List[int]]: 2D mesh of ranks taking part in this collective in MPMD.
        ProcessGroup: Will perform a collective using the ranks and tag of the PG.
        DeviceMesh: Do an SPMD collective over all ranks of the mesh
        (DeviceMesh, int): Do an MPMD collective over one dimension of the DeviceMesh

    :: N.B. If you pass a PG or a 1D list to perform an MPMD collective, the compiler won't be able to recover
    that information and perform collective algebraic optimization. Use other forms of input for that.
    """
    tag, rankset, group_size = _expand_group(group, tag)
    assert (
        self.size(scatter_dim) % group_size == 0
    ), f"input dimension {scatter_dim} ({self.size(scatter_dim)}) must be a multiple of group_size {group_size}"
    if scatter_dim != 0:
        tensor_list = torch.chunk(self, group_size, dim=scatter_dim)
        self = torch.cat(tensor_list)
    tensor = torch.ops.c10d_functional.reduce_scatter_tensor(self, reduceOp, tag, rankset, group_size)
    res = _maybe_wrap_tensor(tensor)
    return res
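# Shape sketch for reduce_scatter_tensor (assumption: group_size 4, input of
# shape [32, 16], scatter_dim=0): every rank receives its reduced shard of
# shape [8, 16], i.e. dim 0 is divided by the group size.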
def all_reduce_coalesced(
    self: List[torch.Tensor], reduceOp: str, group: RANK_TYPES, tag: str = ""
) -> List[torch.Tensor]:
    """
    Reduces a list of tensors across all machines in such a way that all get
    the final result.

    All tensors in the input list are left unmodified.

    Group can be one of:
        List[int]: ranks participating in the collective.
        List[List[int]]: 2D mesh of ranks taking part in this collective in MPMD.
        ProcessGroup: Will perform a collective using the ranks and tag of the PG.
        DeviceMesh: Do an SPMD collective over all ranks of the mesh
        (DeviceMesh, int): Do an MPMD collective over one dimension of the DeviceMesh

    :: N.B. If you pass a PG or a 1D list to perform an MPMD collective, the compiler won't be able to recover
    that information and perform collective algebraic optimization. Use other forms of input for that.
    """
    tag, rankset, group_size = _expand_group(group, tag)
    tensor_list = torch.ops.c10d_functional.all_reduce_coalesced(self, reduceOp, tag, rankset, group_size)
    return list(map(_maybe_wrap_tensor, tensor_list))
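# Usage sketch (hypothetical tensors; assumes an initialized 2-rank job): the
# coalesced variant issues one fused collective for the whole list instead of
# one collective per tensor.
#
#   grads = [torch.ones(4), torch.ones(2, 2)]
#   reduced = all_reduce_coalesced(grads, "sum", group=[0, 1])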
def all_gather_into_tensor_coalesced(
    self: List[torch.Tensor], group: RANK_TYPES, tag: str = ""
) -> List[torch.Tensor]:
    """
    Gather a list of tensors from all machines.

    Note that it currently only supports gather_dim = 0.

    The input tensors are left unmodified.
    Group can be one of:
        List[int]: ranks participating in the collective.
        List[List[int]]: 2D mesh of ranks taking part in this collective in MPMD.
        ProcessGroup: Will perform a collective using the ranks and tag of the PG.
        DeviceMesh: Do an SPMD collective over all ranks of the mesh
        (DeviceMesh, int): Do an MPMD collective over one dimension of the DeviceMesh

    :: N.B. If you pass a PG or a 1D list to perform an MPMD collective, the compiler won't be able to recover
    that information and perform collective algebraic optimization. Use other forms of input for that.
    """
    tag, rankset, group_size = _expand_group(group, tag)
    tensor_list = torch.ops.c10d_functional.all_gather_into_tensor_coalesced(self, tag, rankset, group_size)
    return list(map(_maybe_wrap_tensor, tensor_list))
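# Shape sketch (assumption: group_size 4): gathering inputs of shapes [2, 8]
# and [3] yields outputs of shapes [8, 8] and [12]; dim 0 of every input is
# multiplied by the group size, matching the meta kernels registered below.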
def reduce_scatter_tensor_coalesced(
    inputs: List[torch.Tensor],
    reduceOp: str,
    scatter_dim: List[int],
    group: RANK_TYPES,
    tag: str = "",
) -> List[torch.Tensor]:
    """
    Reduces a list of tensors across all machines in such a way that all get
    the final result, then scatter the results to corresponding ranks.

    The input tensors are left unmodified.
    Group can be one of:
        List[int]: ranks participating in the collective.
        List[List[int]]: 2D mesh of ranks taking part in this collective in MPMD.
        ProcessGroup: Will perform a collective using the ranks and tag of the PG.
        DeviceMesh: Do an SPMD collective over all ranks of the mesh
        (DeviceMesh, int): Do an MPMD collective over one dimension of the DeviceMesh

    :: N.B. If you pass a PG or a 1D list to perform an MPMD collective, the compiler won't be able to recover
    that information and perform collective algebraic optimization. Use other forms of input for that.
    """
    tag, rankset, group_size = _expand_group(group, tag)
    assert len(scatter_dim) == len(inputs)
    for idx, (dim, tensor) in enumerate(zip(scatter_dim, inputs)):
        assert (
            tensor.size(dim) % group_size == 0
        ), f"input dimension {dim} ({tensor.size(dim)}) must be a multiple of group_size {group_size} for tensor at index {idx}"
        if dim != 0:
            tensor_list = torch.chunk(tensor, group_size, dim=dim)
            inputs[idx] = torch.cat(tensor_list)

    tensor_list = torch.ops.c10d_functional.reduce_scatter_tensor_coalesced(
        inputs, reduceOp, tag, rankset, group_size
    )

    return list(map(_maybe_wrap_tensor, tensor_list))


def _is_view_op(tgt):
    assert isinstance(tgt, torch._ops.OpOverload)
    schema = tgt._schema
    if len(schema.arguments) > 0:
        first_arg = schema.arguments[0]
        # An op is a view if its first argument aliases the output without writing to it.
        return first_arg.alias_info is not None and not first_arg.alias_info.is_write
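# Sketch of the check above (op names are standard ATen overloads): for a view
# op such as aten.view.default, the first argument aliases the output without
# being written, so alias_info is set, is_write is False, and _is_view_op
# returns True; for an in-place op such as aten.add_.Tensor the alias is a
# write, so it returns False.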
class AsyncCollectiveTensor(torch.Tensor):
    r"""
    A Tensor wrapper subclass that is used to trigger a call to wait
    prior to first use of the underlying tensor.
    Use it inside functional collective pytorch wrappers like the following:

        def functional_collective(self, group, tag):
            tag, rankset, group_size = _expand_group(group, tag)
            tensor = torch.ops.c10d_functional.{collective}(self, tag, rankset, group_size)
            return _maybe_wrap_tensor(tensor)
    """
    elem: torch.Tensor

    __slots__ = ["elem"]

    __torch_function__ = torch._C._disabled_torch_function_impl

    @staticmethod
    def __new__(cls, elem: torch.Tensor):
        r = torch.Tensor._make_wrapper_subclass(
            cls,
            elem.size(),
            strides=elem.stride(),
            storage_offset=elem.storage_offset(),
            dtype=elem.dtype,
            layout=elem.layout,
            device=elem.device,
            requires_grad=False,
        )
        r.elem = elem
        return r

    def __repr__(self):
        wait_tensor(self.elem)
        return f"AsyncCollectiveTensor({self.elem})"

    def trigger_wait(self):
        wait_tensor(self.elem)
        return self

    def _get_acs_underlying_tensor(self):
        """This method enables _functional_collectives_impl to test if a tensor is an ACS"""
        return self.elem

    @classmethod
    def __torch_dispatch__(cls, func, types, args=(), kwargs=None):
        is_view_op = _is_view_op(func)

        def unwrap(e: "AsyncCollectiveTensor"):
            # wait_tensor is idempotent and will do the stream sync only once
            if not is_view_op:
                wait_tensor(e.elem)
            return e.elem

        def wrap(e: torch.Tensor):
            assert not isinstance(e, AsyncCollectiveTensor)
            res = AsyncCollectiveTensor(e)
            _register_tensor_wrapper(res)
            return res

        unwrapped_args = tree_map_only(AsyncCollectiveTensor, unwrap, args)
        unwrapped_kwargs = tree_map_only(AsyncCollectiveTensor, unwrap, kwargs)

        out = func(*unwrapped_args, **unwrapped_kwargs)

        # View ops don't require a sync, so their outputs are re-wrapped: the
        # wait still happens on first real (non-view) use.
        if is_view_op:
            out = tree_map_only(torch.Tensor, wrap, out)

        return out
def _expand_group(group: RANK_TYPES, tag: str = "") -> Tuple[str, List[int], int]:
    """
    _expand_group desugars the different RANK_TYPES types into a canonical format that is traceable.

    By having this be part of the explicit eager codepath, we avoid having to specialize behavior inside
    torchdynamo and can still interoperate with processgroup objects or other untraceable forms.
    """
    # Local import to avoid a dependency cycle at module load time.
    import torch.distributed._tensor as dt

    if TYPE_CHECKING:
        def cast_listlistint(x):
            return cast(List[List[int]], x)

        def cast_listint(x):
            return cast(List[int], x)
    else:
        # No-op "casts" for runtime use, since dynamo can't trace through typing casts.
        def cast_listlistint(x):
            return x

        def cast_listint(x):
            return x

    rankset: List[int]
    if isinstance(group, list):
        if isinstance(group[0], list):
            nested_list = cast_listlistint(group)
            rankset = []
            group_size = -1
            for rs in nested_list:
                rankset.extend(rs)
                if group_size != -1 and group_size != len(rs):
                    raise ValueError(
                        f"group sizes must be identical, found {group_size} and {len(rs)}"
                    )
                group_size = len(rs)
        else:
            rankset = cast_listint(group)
            group_size = len(rankset)
    elif isinstance(group, dist.ProcessGroup):
        rankset = dist.get_process_group_ranks(group)
        group_size = len(rankset)
        tag = tag or c10d._get_group_tag(group)
    elif isinstance(group, dt.DeviceMesh):
        assert group.ndim == 1, "Only 1D mesh is supported, pass in (DeviceMesh, int) together if mesh > 1D"
        tag, rankset = group._dim_group_infos[0]
        group_size = len(rankset)
    elif isinstance(group, tuple):
        if len(group) == 2 and isinstance(group[0], dt.DeviceMesh) and isinstance(group[1], int):
            dmesh = group[0]
            dim = group[1]
            tag, rankset = dmesh._dim_group_infos[dim]
            group_size = len(rankset)
        else:
            raise ValueError("Invalid tuple for group, must be (DeviceMesh, int)")
    else:
        raise ValueError(
            "Invalid type for group, must be one of List, ProcessGroup, DeviceMesh or (DeviceMesh, int)."
        )

    return (tag, rankset, group_size)
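# Desugaring sketch (assumption: a 4-rank job, `pg` an initialized ProcessGroup):
#
#   _expand_group([0, 1, 2, 3])      -> ("", [0, 1, 2, 3], 4)
#   _expand_group([[0, 1], [2, 3]])  -> ("", [0, 1, 2, 3], 2)   # two MPMD groups of size 2
#   _expand_group(pg)                -> (c10d._get_group_tag(pg), ranks, len(ranks))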
dS t } | d krdS | jd k	S )NTF)r   r   Ztracer)moder   r   r   _are_we_tracing  s    ru   c                 C   s*   t  rt| S t| }t| ttj|S rL   )ru   r   r<   r
   r   r   rD   )r   r&   r   r   r   r     s
    r   c                    s    fddfdd| D S )Nc                    s*   t |  }|d   9  < | |}|S r8   r/   r*   Z	new_empty)shardout_size
out_tensorr   r   r   mk_out_tensor  s    
z=_all_gather_into_tensor_coalesced_meta.<locals>.mk_out_tensorc                    s   g | ]} |qS r   r   .0tr{   r   r   
<listcomp>  s     z:_all_gather_into_tensor_coalesced_meta.<locals>.<listcomp>r   )r   r   r   r   r   r   r{   r   &_all_gather_into_tensor_coalesced_meta  s    r   c                 G   s
   t | S rL   r   Z
empty_liker   rV   r   r   r   _all_reduce_meta  s    r   c                 G   s
   t | S rL   r   r   r   r   r   _wait_tensor_meta  s    r   c                 C   s&   t |  }|d  |9  < | |S r8   rv   )rw   r   r   r   rx   r   r   r   _all_gather_into_tensor_meta  s    r   c                 C   s&   t |  }|d  |  < | |S r8   rv   )inputZ	reduce_opr   r   r   rx   r   r   r   _reduce_scatter_tensor_meta  s    r   c                 C   s   dd | D S )Nc                 S   s   g | ]}t |qS r   r   r|   r   r   r   r     s     z._all_reduce_coalesced_meta.<locals>.<listcomp>r   )r   r   r   r   r   r   r   r   _all_reduce_coalesced_meta  s    r   c                    s    fddfdd| D S )Nc                    s*   t |  }|d     < | |}|S r8   rv   )r   rx   ry   rz   r   r   r{     s    
z<_reduce_scatter_tensor_coalesced_meta.<locals>.mk_out_tensorc                    s   g | ]} |qS r   r   r|   r   r   r   r     s     z9_reduce_scatter_tensor_coalesced_meta.<locals>.<listcomp>r   )r2   r   r   r   r   r   r   r   %_reduce_scatter_tensor_coalesced_meta  s    r   c                  C   s   dddddddg} t jt }| D ]^}|d|d	 }ttd
| }t|d
| d}t| t	||d t	||d q d S )NzUall_reduce(Tensor self, str reduceOp, str tag, int[] ranks, int group_size) -> Tensorzcall_reduce_coalesced(Tensor[] self, str reduceOp, str tag, int[] ranks, int group_size) -> Tensor[]z"wait_tensor(Tensor self) -> TensorzTall_gather_into_tensor(Tensor shard, str tag, int[] ranks, int group_size) -> Tensorzball_gather_into_tensor_coalesced(Tensor[] input, str tag, int[] ranks, int group_size) -> Tensor[]zareduce_scatter_tensor(Tensor input, str reduceOp, str tag, int[] ranks, int group_size) -> Tensorzpreduce_scatter_tensor_coalesced(Tensor[] inputs, str reduceOp, str tag, int[] ranks, int group_size) -> Tensor[]r   (__metaZCompositeExplicitAutogradZMeta)
sysmodulesrZ   indexgetattrfun_col_implc10_libZdefinec10_lib_implimpl)Zops_defsZ	my_moduleZop_defZop_nameZbackend_implZ	meta_implr   r   r   _register_ops  s     


r   r   ZDEFZIMPLzJPyTorch Distributed functional collectives do not work with torch::deploy.F)outputr   async_opr   r   c                 C   s    |rt d| t||||S Nz@Can't remap async version of inplace op to functional collective)r"   copy_r'   )r   r   r   r   r   r   r   r   r   all_gather_tensor_inplace  s    r   sum)r   r   opr   r(   r   c                 C   s"   |rt d| t|||||S r   )r"   r   r+   )r   r   r   r   r   r(   r   r   r   r   reduce_scatter_tensor_inplace(  s    	r   )r#   r+   )r   )r   )r   )r   )r   )r   )r   )Fr   r   )r   NFr   r   )Awarningsr   r   Ztorch.distributedrj   rm   Z"torch.distributed.distributed_c10dZdistributed_c10dro   typingr   r   r   r   r   Ztorch.utils._pytreer   r   r	   r   r
   Z"torch.fx.experimental.proxy_tensorr   Z_running_with_deployr   Ztorch._dynamo.external_utilsr   	Exceptionwarnrc   rn   Z
RANK_TYPESr   rD   strr   r'   r+   r.   r1   r6   r;   r<   r   boolru   r   r   r   r   r   r   r   r   r   ZlibraryLibraryr   r   r   r   r#   Zlegacy_allgatherZlegacy_reducescatterZtraceable_collective_remapsr   r   r   r   <module>   s   
;(	 $ #$" 'I"D



          
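# Remap sketch (hypothetical call site): under a tracing frontend such as
# torchdynamo, an in-place call like
#
#   dist.all_gather_into_tensor(out, inp, group=pg)
#
# can be looked up in traceable_collective_remaps and rewritten to
# all_gather_tensor_inplace(out, inp, group=pg), which lowers to the traceable
# c10d_functional ops registered above.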