import logging
import traceback
from dataclasses import dataclass, field
from typing import Any, List, Optional

import torch
from torch import fx
from torch._dynamo.output_graph import GraphCompileReason
from torch._dynamo.utils import deepcopy_to_fake_tensor, detect_fake_mode
from torch.fx.node import Node

log = logging.getLogger(__name__)
ddp_graph_log = torch._logging.getArtifactLogger(__name__, "ddp_graphs")


def args_str(args):
    # a debug helper
    if torch.is_tensor(args):
        return f"T[{args.shape}]"
    elif isinstance(args, tuple):
        return f"tuple({', '.join([args_str(x) for x in args])})"
    elif isinstance(args, list):
        return f"list({', '.join([args_str(x) for x in args])})"
    else:
        return str(args)


@dataclass
class Bucket:
    size: int = 0
    params: List[str] = field(default_factory=list)
    nodes: List[fx.Node] = field(default_factory=list)

    # param_ids is just used for unit testing
    param_ids: List = field(default_factory=list)

    # keep track of any buckets that were extended for logging purposes
    opcount_increased_to_capture_external_output: int = 0
    paramsize_before_opcount_increase: int = 0


def bucket_has_external_output(bucket: Bucket) -> bool:
    # bucket.nodes was built in reverse graph order, so a node's users either live
    # earlier in the list (already seen) or outside this bucket entirely
    nodes_in_bucket = set()
    for node in bucket.nodes:
        # assume node.op != output, since those are filtered before bucketing
        nodes_in_bucket.add(node)
        for user in node.users:
            if user not in nodes_in_bucket:
                return True
    return False


def pretty_print_buckets(buckets: List[Bucket], bucket_bytes_cap: int):
    headers = ("Index", "Size (b)", "Param Names")
    rows = []
    extended_buckets = []
    for idx, bucket in enumerate(reversed(buckets)):
        if len(bucket.params) > 0:
            rows.append((idx, bucket.size, bucket.params[0]))
            for param in bucket.params[1:]:
                rows.append((None, None, param))
        if bucket.opcount_increased_to_capture_external_output > 0:
            extended_buckets.append(
                (
                    idx,
                    bucket.opcount_increased_to_capture_external_output,
                    bucket.size - bucket.paramsize_before_opcount_increase,
                )
            )

    if len(rows):
        log.info(
            "\nDDPOptimizer used bucket cap %s and created %d buckets. Enable debug logs for detailed bucket info.",
            bucket_bytes_cap,
            len(buckets),
        )

        if len(extended_buckets):
            log.warning(
                "Some buckets were extended beyond their requested parameter capacities"
                " in order to ensure each subgraph has an output node, required for fx graph partitioning."
                " This can be the case when a subgraph would have only contained nodes performing inplace mutation,"
                " and returning no logical outputs. This should not be a problem, unless it results in too few graph"
                " partitions for optimal DDP performance."
            )

        try:
            from tabulate import tabulate

            log.debug(
                "\nDDPOptimizer produced the following bucket assignments:\n%s",
                tabulate(rows, headers=headers, tablefmt="simple_grid"),
            )

            if len(extended_buckets):
                log.warning(
                    "DDPOptimizer extended these buckets to ensure per-subgraph output nodes:\n%s",
                    tabulate(
                        extended_buckets,
                        headers=("Index", "Extra Ops", "Extra Param Size (b)"),
                        tablefmt="simple_grid",
                    ),
                )
        except ImportError:
            log.debug(
                "Please `pip install tabulate` in order to display ddp bucket sizes and diagnostic information."
            )
    else:
        log.debug("DDPOptimizer captured no parameters and did not split this graph.")


def has_higher_order_op(gm):
    # Check if there is a higher order op in the graph
    for node in gm.graph.nodes:
        if node.op == "get_attr":
            maybe_param = getattr(gm, node.target)
            if isinstance(maybe_param, torch.fx.GraphModule):
                return True
    return False


class DDPOptimizer:
    """Note [DDPOptimizer]
    DDPOptimizer applies when dynamo compiles models wrapped in DistributedDataParallel (DDP),
    breaking the dynamo graph into chunks to compile separately, with the breaks aligning to
    the boundaries of gradient-allreduce buckets chosen by DDP.

    Background/Motivation
     - DDP uses allreduce collectives to synchronize partial gradients computed on different workers
     - DDP groups gradient allreduces into 'buckets' to optimize communication efficiency of all-reduce
     - Parameters grouped into buckets are assumed to be adjacent in time, so they become ready
       at around the same time during backward and thus can share the same allreduce efficiently
     - Allreduces must overlap with backward compute for optimal training performance
     - DDP schedules allreduces using 'hooks' fired from the c++ autograd engine in pytorch, which
       operates when individual grads become 'ready'
     - Dynamo+AOTAutograd produces a single fused graph that runs 'atomically' from the perspective of the
       autograd engine, such that all gradients become 'ready' at the same time.  Hooks fire after the whole
       fused backward function executes, preventing any overlap of compute and communication

    Algorithm
     - DDPOptimizer starts off with an FX graph traced by dynamo which represents forward.  It can traverse
       this graph in reverse order to determine the true order that gradients will become ready during backward.
     - Parameter sizes are counted in reverse order, up to a bucket size limit, at which point a new bucket is started
       and a graph break introduced
     - Each of the subgraphs is compiled by the compiler provided to dynamo by the user, and then fused back together
       into an outer module that is returned to the user

    Notes
     - It would be better to enforce (by adding an API to DDP) that the bucket splits chosen here are used by DDP,
       and that DDP does not need to detect or optimize bucket order by observing execution at runtime, as it does
       in eager.
     - If Dynamo can't capture a whole graph for the portion of the model wrapped by DDP, this algorithm will currently
       produce splits that do not necessarily align with the buckets used by DDP.  This should result in performance
       degradation approaching the baseline case where graph-splits are not used, but not worse.
     - If the backend compiler fails to compile a single subgraph, it will execute eagerly despite the rest of the
       subgraphs being compiled
     - DDP has a 'parameters_and_buffers_to_ignore' field, which DDPOptimizer attempts to honor by reading markers
       left by DDP on individual parameters.  In cases where other transformations, such as reparameterization, are
       also used, the ignore markers could be lost.  If DDPOptimizer fails to ignore a parameter ignored by DDP,
       it is not catastrophic but could impact performance by choosing sub-optimal bucket splits.
     - DDPOptimizer always ignores all buffers, regardless of their ignore flag, since buffers do not require gradients,
       and therefore aren't allreduced by DDP.  (They are broadcast during forward, but this is not covered by
       DDPOptimizer)

    Debugging
     - Generally, it is easiest to debug DDPOptimizer in a single process program, using pdb.
     - In many cases, the log messages are helpful (they show bucket size assignments);
       just configure torch._dynamo.config.log_level to info or debug.
     - See `benchmarks/dynamo/distributed.py` for a simple harness that will run a toy model or a torchbench model
       in a single process (or with torchrun, in multiple processes)

    Args:
        bucket_bytes_cap (int): Controls the size of buckets, in bytes, used to determine graphbreaks.  Should be
            set to match the equivalent parameter on the original DDP module.

        backend_compile_fn (callable): A dynamo compiler function, to be invoked to compile each subgraph.

        first_bucket_cap (int): Controls the size of the first bucket.  Should match DDP's first bucket cap.  DDP
            special-cases the first bucket size since it is sometimes optimal to start a small allreduce early.

    N)r3   first_bucket_capc                 C   sP   |d k	r|| _ ntj r&tjj| _ n|| _ || _| j | jksFtd|| _d S )NzQFirst bucket should be smaller/equal to other buckets to get comms warmed up ASAP)rO   r   distributedZis_availableZ_DEFAULT_FIRST_BUCKET_BYTESr3   AssertionErrorbackend_compile_fn)selfr3   rR   rO   r   r   r   __init__   s    

zDDPOptimizer.__init__c                 C   s   t |do|jS )N_ddp_ignored)hasattrrU   )rS   Z	parameterr   r   r   _ignore_parameter   s    zDDPOptimizer._ignore_parameter)rK   example_inputsc                    sP  t |  dkrtjj  t|r,tdt g}t|j	j
D ]}|jdkrRq@|d j| jks~t|dkr|d j| jkrt|d r|dt  n0|d jdkr|d j|d _|d  jd7  _|jdkrP||j}| D ]b\}}|jr| |s|d  j|  7  _|d j|j d|  |d jt| qnl|jdkrt||j}|jr| |s|d  j|  7  _|d j|j |d jt| |d j
| q@t|dkr$|d jdkr$|d j
 |d j
 t|d jdkst!d	|d= || _"t#|| j t|dkrP| $||S i t%|D ] \}	}
|
j
D ]}|	|< qjq\t&j'j((|dfd
d}d|j	 dd|j	 d }|) D ]6\}}d|krt|r|d| d|j	 d7 }q|d7 }t*+| G  fdddtj&j,j-}||| j$}|j.|  |/  t*+d|j	 |S )a1  
        Implements graph splitting, first determining a set of of buckets by counting
        parameter sizes in reverse graph order, then invoking the user/backend compiler
        to compile each subgraph. Finally, stiches compiled graphs into one graphmodule
        and returns its callable.
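        # Illustrative note (not from the original source): for a toy model with two
        # Linear layers and a tiny bucket cap, the split performed below yields an
        # outer graph roughly of the form
        #
        #     %submod_0 = call_module[target=compiled_submod_0](args = (%x,))
        #     %submod_1 = call_module[target=compiled_submod_1](args = (%submod_0,))
        #
        # where each compiled_submod_i is the user backend's compiled artifact, wrapped
        # so that its output is always a tuple (see SubmodCompiler.compile_submod below).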
        fake_mode = detect_fake_mode(example_inputs)
        if fake_mode is None:
            fake_mode = torch._subclasses.fake_tensor.FakeTensorMode()

        if has_higher_order_op(gm):
            raise NotImplementedError(
                "DDPOptimizer backend: Found a higher order op in the graph. "
                "This is not supported. Please turn off DDP optimizer using "
                "torch._dynamo.config.optimize_ddp=False. Note that this can "
                "cause performance degradation because there will be one bucket "
                "for the entire Dynamo graph. Please refer to this issue - "
                "https://github.com/pytorch/pytorch/issues/104674."
            )

        # 1: compute the partition map according to DDP bucket logic
        buckets = [Bucket()]
        for node in reversed(gm.graph.nodes):
            if node.op in ("output", "placeholder"):
                continue

            if (
                buckets[0].size >= self.bucket_bytes_cap
                or len(buckets) == 1
                and buckets[0].size >= self.first_bucket_cap
            ):
                if bucket_has_external_output(buckets[0]):
                    buckets.insert(0, Bucket())
                else:
                    # continue building this bucket past its parameter capacity, to
                    # increase the chance it contains at least one node that is either
                    # a global output or an input to a subsequent graph
                    if buckets[0].opcount_increased_to_capture_external_output == 0:
                        buckets[0].paramsize_before_opcount_increase = buckets[0].size
                    buckets[0].opcount_increased_to_capture_external_output += 1

            if node.op == "call_module":
                target = gm.get_submodule(node.target)
                for name, param in target.named_parameters():
                    if param.requires_grad and not self._ignore_parameter(param):
                        buckets[0].size += param.untyped_storage().nbytes()
                        buckets[0].params.append(f"{node.target}_{name}")
                        buckets[0].param_ids.append(id(param))
            elif node.op == "get_attr":
                maybe_param = getattr(gm, node.target)
                if maybe_param.requires_grad and not self._ignore_parameter(
                    maybe_param
                ):
                    buckets[0].size += maybe_param.untyped_storage().nbytes()
                    buckets[0].params.append(node.target)
                    buckets[0].param_ids.append(id(maybe_param))

            # All nodes have to be mapped to a bucket, even if they have no params of
            # their own; ignored params still land in buckets, they just don't count
            # towards the capacity
            buckets[0].nodes.append(node)

        if len(buckets) > 1 and buckets[0].size == 0:
            # the head of the graph collected no parameters; fuse it into the next bucket
            buckets[1].nodes.extend(buckets[0].nodes)
            assert len(buckets[0].params) == 0, "Params should be empty if size is 0"
            del buckets[0]

        # stash buckets for debugging/testing and print a summary
        self.buckets = buckets
        pretty_print_buckets(buckets, self.bucket_bytes_cap)

        if len(buckets) == 1:
            # bypass split/fuse logic if there is only one bucket
            return self.backend_compile_fn(gm, example_inputs)

        # 2: partition the graphmodule according to the bucket assignments
        partition_map = {}
        for idx, b in enumerate(buckets):
            for node in b.nodes:
                partition_map[node] = idx

        split_gm = fx.passes.split_module.split_module(
            gm, None, lambda node: partition_map[node]
        )

        debug_str = (
            f"\n---orig graph---\n{gm.graph}\n"
            + f"\n---split graph---\n{split_gm.graph}\n"
        )
        for name, module in split_gm.named_modules():
            if "." not in name and len(name):
                # only print the submod graphs, not their children
                debug_str += f"\n---{name} graph---\n{module.graph}\n"
        debug_str += "\n---------------\n"
        ddp_graph_log.debug(debug_str)

        # 3: compile each of the partitioned submodules using the user-provided compiler
        class SubmodCompiler(torch.fx.interpreter.Interpreter):
            def __init__(self, module, compiler):
                super().__init__(module)
                self.compiler = compiler

            def compile_submod(self, input_mod, args, kwargs):
                """
                Compile the submodule,
                using a wrapper to make sure its output is always a tuple,
                which is required by AotAutograd based compilers
                """
                assert len(kwargs) == 0, "We assume only args for these modules"

                class WrapperModule(torch.nn.Module):
                    def __init__(self, submod, unwrap_singleton_tuple):
                        super().__init__()
                        self.submod = submod
                        self.unwrap_singleton_tuple = unwrap_singleton_tuple

                    def forward(self, *args):
                        x = self.submod(*args)
                        # the isinstance check guards against compiled submodules that
                        # return a bare tensor even though the output was wrapped in a
                        # tuple at graph level
                        if self.unwrap_singleton_tuple and isinstance(x, (tuple, list)):
                            return x[0]
                        return x

                unwrap_singleton_tuple = False
                for sn in input_mod.graph.nodes:
                    if sn.op == "output":
                        if not isinstance(sn.args[0], tuple):
                            unwrap_singleton_tuple = True
                            sn.args = (sn.args,)

                input_mod.recompile()
                input_mod.compile_subgraph_reason = GraphCompileReason(
                    "DDPOptimizer intentional graph-break (See Note [DDPOptimizer])."
                    " Set `torch._dynamo.config.optimize_ddp = False` to disable.",
                    [
                        # it's close to useless to get a real stacktrace here, and quite verbose
                        traceback.FrameSummary(__file__, 0, DDPOptimizer),
                    ],
                )
                wrapper = WrapperModule(
                    self.compiler(input_mod, args),
                    unwrap_singleton_tuple,
                )
                return wrapper

            # Note: fake_mode, read off the example inputs, dictates how run_node
            # operates.  compile_submod is invoked with the real module (as aot_autograd
            # expects), while a fakified copy of the module is executed under fake_mode
            # to produce fake inputs for compiling the next submodule.  Fake tensors are
            # expected at compile time and never at runtime.
            def run_node(self, n: Node) -> Any:
                args, kwargs = self.fetch_args_kwargs_from_env(n)
                new_args = []
                assert fake_mode
                for arg in args:
                    if isinstance(arg, torch.Tensor) and not isinstance(
                        arg, torch._subclasses.FakeTensor
                    ):
                        new_args.append(fake_mode.from_tensor(arg))
                    else:
                        new_args.append(arg)

                log.debug("run_node %s, %s got args %s", n.op, n.target, args_str(args))
                assert isinstance(args, tuple)
                assert isinstance(kwargs, dict)

                if n.op == "call_module":
                    real_mod = self.fetch_attr(n.target)
                    if fake_mode:
                        curr_submod = deepcopy_to_fake_tensor(real_mod, fake_mode)
                    else:
                        curr_submod = real_mod

                    ddp_graph_log.debug(
                        "\n---%s graph---\n%s", n.target, curr_submod.graph
                    )

                    # the compiler is called with the real module but fake inputs,
                    # matching what aot_autograd expects
                    compiled_submod_real = self.compile_submod(
                        real_mod, new_args, kwargs
                    )

                    # replace the original submodule in the outer graph with the
                    # compiled one
                    self.module.delete_submodule(n.target)
                    n.target = "compiled_" + n.target
                    self.module.add_submodule(n.target, compiled_submod_real)

                    # finally, produce (fake) outputs for compiling the next submodule
                    # by running the fakified copy under fake_mode
                    with fake_mode:
                        return curr_submod(*new_args, **kwargs)
                else:
                    # placeholder and output nodes don't need to be compiled, just executed
                    return getattr(self, n.op)(n.target, new_args, kwargs)

        submod_compiler = SubmodCompiler(split_gm, self.backend_compile_fn)
        submod_compiler.run(*example_inputs)
        split_gm.recompile()

        ddp_graph_log.debug(
            "\n---final graph---\n%s\n---------------\n", split_gm.graph
        )
        return split_gm
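# Debugging sketch (illustrative, not part of the original module; see the "Debugging"
# section of the class docstring).  After a compile has run, the chosen buckets are
# stashed on the optimizer instance and the summary can be re-printed; `ddp_optimizer`
# below refers to an instance constructed as in the usage sketch inside the class body:
#
#     import logging
#     # this module logs bucket details at DEBUG level
#     logging.getLogger("torch._dynamo.backends.distributed").setLevel(logging.DEBUG)
#     # ...run the compiled model once to trigger compilation, then:
#     pretty_print_buckets(ddp_optimizer.buckets, ddp_optimizer.bucket_bytes_cap)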