U
    dD                    @   s  d dl mZ d dlmZmZmZ d dlZd dlZd dlm	Z	 d dl
mZmZmZmZmZmZmZ d dlmZ d dlZd dlZed e d kred ed	 ed
Zeej dZdZdd Zdd Z dd Z!dddddddddddddddddddefddZ"dd Z#dd Z$dddddddddddddddefdd Z%dd!d"Z&d#d$ Z'd%d& Z(d'd( Z)d)d* Z*d+d, Z+efd-d.Z,dd/d0Z-dd1d2Z.d3d4 Z/d5d6 Z0d7d8 Z1dd9d:Z2d;d< Z3dd=d>Z4dd?d@Z5ddAdBZ6ddCdDZ7dEdF Z8dGdH Z9dIdJ Z:dKdL Z;dMdN Z<dOdP Z=G dQdR dRe>Z?dSdT Z@dUdV ZAddWdXZBddYdZZCdd[d\ZDd]d^ ZEd_d` ZFdadb ZGdcdd ZHdedf ZIdgdh ZJdidj ZKdkdl ZLdmdn ZMdodp ZNdqdr ZOdsdt ZPddudvZQddwdxZRdydz ZSd{d| ZTd}d~ ZUdS )    )OrderedDict)	viewitemsviewkeys
viewvaluesN)	cpu_count)model_helperdyndepscope	workspacecorememongerutils)
caffe2_pb2z%@/caffe2/caffe2/contrib/gloo:gloo_opsz%@/caffe2/caffe2/contrib/nccl:nccl_opsz)@/caffe2/caffe2/contrib/gloo:gloo_ops_gpuZdata_parallel_model   i,  c                  O   s   d|d< t | | d S NF
cpu_deviceParallelizeargskwargs r   E/tmp/pip-unpacked-wheel-ua33x9lu/caffe2/python/data_parallel_model.pyParallelize_GPU%   s    r   c                  O   s   d|d< t | | d S NTr   r   r   r   r   r   Parallelize_CPU*   s    r   c                  O   s   d|d< t | | d S )NTideepr   r   r   r   r   Parallelize_iDeep.   s    r   ZdagTF      c           /         s  t  dks.t  jtjks.tdt  |dkrb|sR|sRttdt	
 }nttdt }|s|s|D ]*}|t	 krntdt	 |  qqnt	j| _d| _d| _d}|dkstdnd|rtj| _d	| _d
}|| _|r|	dk	rdstn2tj| _d| _d}|| _|r(|	dk	r(ds(ttd| |	dk	rFdnd}t|| | }t||d }|| j _|
| j _|| _|	| _d| _d| _g | _t | t!j"stt##| j$}td i }|	dkrdn|	d }dt||  }|dk	p |dk	}|dk	r|dk	rtd|s| j%rtd td td| j&|  td td td td td |D ]} t'(| j| }!t')|! t'*d| j| n td||  ||  || |}"|rt |"tstd|"D ]}#t |#t'j+s tdq |"|| < W 5 Q R X W 5 Q R X qt,| j$ t-| || j$|| _.t-| || /dg }$| j.0|$ tt1| j.| _2tt1|$| _3|r||  |rtd  t4|| | |r|| t|| j| j |std! t5|  dS |r.|std"t6|  |r&t7|  nt8|  t,| j$ | j9  fd#d$| j$D }% fd%d$|D }&t-| ||%|&}'| j.0|' tt1|'| _|| _:t5|  td& |rt;|| |	| t| jdkrt<| }(t|(dkstt=|(|| | j|	|| n
td' td( |	dkrdn|	d }t>| ?d})|r@t@|  |dk	r|D ]R} t'(| j| }!t')|!. t'*d| j|  ||  W 5 Q R X W 5 Q R X qNntd) || }*|*| _AtB| \}+},t-| ||+g }-| j.0|- t5|  tC|  |  jDE }.d*|._&d|._Ftd+ tG|| | jH| jH|	|,dd, |dk	r|D ]R} t'(| j| }!t')|!. t'*d| j|  ||  W 5 Q R X W 5 Q R X q>|r|rtd-|rtI| || |rtJ| || | jHg| _K| jg| _LtM| | |rtN| |) dS ).a  
    Function to create a model that can run on many GPUs or CPUs.
      model_helper_obj: an object of ModelHelper
      input_builder_fun:
                         Function that adds the input operators
                         Note: Remember to instantiate reader outside of this
                         function so all devices share same reader object.
                         Signature:  input_builder_fun(model)
      forward_pass_builder_fun:
                        Function to add the operators to the model.
                        Must return list of loss-blob references that
                        are used to build the gradient. Loss scale parameter
                        is passed, as you should scale the loss of your model
                        by 1.0 / the total number of devices.
                        Signature: forward_pass_builder_fun(model, loss_scale)
      param_update_builder_fun:
                        Function that adds operators that are run after
                        gradient update, such as updating the weights and
                        weight decaying. This is called for each GPU separately.
                        Signature: param_update_builder_fun(model)
      optimizer_builder_fun:
                        Alternative to param_update_builder_fun, allows one
                        to add an optimizer for the whole model. Called only
                        once, without name or devicescope.
      net_transformer_fun:
                        Optional function to transform the network after the
                        network is built. It will be called once (NOT once per
                        GPU.)
                        Signature:
                        net_transformer_fun(
                            model, num_devices, device_prefix, device_type)
      pre_grad_net_transformer_fun:
                        Optional function to transform the network similar to
                        net_transformer_fun, but happens before gradient ops
                        been add.
                        Signature: pre_grad_net_transformer_fun(model)
      post_sync_builder_fun:
                        Function applied after initial parameter sync has been
                        completed, such as keeping multi-precision parameters
                        in sync.
                        Signature: post_sync_builder_fun(model)
      devices:          List of GPU ids, such as [0, 1, 2, 3],
      rendezvous:       used for rendezvous in distributed computation, if None
                        then only one node is used. To create rendezvous,
                        use <TBD>.
      net_type:         Network type
      optimize_gradient_memory: whether to apply 'memonger' to share blobs
      shared_model      (only for CPU) use same parameters on each device
                        in gradient computation to reduce memory footprint.
      dynamic_memory_management: Whether to apply dynamic memory optimization
                        by freeing unused blobs. The underlying (de)allocation
                        uses cached allocator. For GPU training PLEASE MAKE SURE
                        caffe2_cuda_memory_pool is set.
      blobs_to_keep :   A list of blob names to keep and don't free during
                        dynamic memory optimization (for example loss blob).
      cpu_device        Use CPU instead of GPU.
      ideep             Use ideep.
      combine_spatial_bn:
                        When set to True, applies batch normalization across
                        all devices within the node. If False, batch
                        normalization will be done separately for each device.
                        This option is currently only supported on the CPU.
      barrier_net_timeout_sec:
                        The timeout in seconds of the barrier net, which is run
                        to synchronize shards before a training epoch starts.
                        Defaults to 300 seconds.
    NMParallelize must be called without device-scope,         device scope was: {}r   ,** Only {} GPUs available, GPUs {} requestedgpuFZGPUz"Shared model only supported on CPUr   IDEEPz4Shared model only supported on single-node currentlycpuCPUz#Parallelizing model for devices: {}      z)Create input and model training operators
num_shards      ?zGCan only specify one of param_update_builder_fun, optimizer_builder_fun z############## WARNING #############z.Model {}/{} is used for testing/validation butzhas init_params=True!z&This can conflict with model training.z4Please ensure model = ModelHelper(init_params=False)z$####################################{}_{}zModel for {} : {}z5Model builder function must return list of loss blobsz1Model builder func must return list of loss blobszAdding gradient operatorsz6Parameter update function not defined --> only forwardz6combine_spatial_bn should only be used for train modelc                    s   g | ]}| kr | qS r   r   .0pparam_to_gradr   r   
<listcomp>)  s    zParallelize.<locals>.<listcomp>c                    s   g | ]} | qS r   r   r,   r/   r   r   r1   +  s     z$Add gradient all-reduces for SyncSGDz;NOTE: Param builder function did not create any parameters.z,Post-iteration operators for updating paramsz"Calling optimizer builder functionZfirst_iter_only_one_workerzAdd initial parameter syncmax_concurrent_distributed_opszcIt is not advised to use gradient optimization ('memonger')
        with dynamic memory management.)Or	   CurrentDeviceScopedevice_typer   r%   AssertionErrorformatlistranger
   ZNumCudaDevicesr   NumGpuDeviceslogwarningGpuDeviceType_device_type_device_prefix_shared_modelr#   infolenminnetProtonum_workerstype_devices_rendezvous_sync_barrier_net_broadcast_context_grad_names
isinstancer   ModelHelpercopyparamsZinit_paramsnamer   DeviceOptionDeviceScope	NameScopeBlobReference_ValidateParams_GroupByDevice_device_grouped_blobsGetComputedParamsupdater   _param_names_computed_param_names_AddGradientOperators_InferBlobDevice_InterleaveOps!_CPUInterDeviceBatchNormalization!_GPUInterDeviceBatchNormalizationr0   _losses_by_gpu_BroadcastComputedParams_GetReverseOrderedGrads_AllReduceBlobsset	GetParams_PruneParametersForSharing
_optimizer_ComputeBlobsToSync_AnalyzeOperatorsargaddi_SyncAllParamsparam_init_net_OptimizeGradientMemorySimple_AddDynamicMemoryOptimization_data_parallel_model_init_nets_data_parallel_model_nets_AddBarrierToModelNets"_RemapParameterBlobsForSharedModel)/model_helper_objinput_builder_funforward_pass_builder_funparam_update_builder_funZoptimizer_builder_funZpost_sync_builder_funZpre_grad_net_transformer_funZnet_transformer_fundevices
rendezvousnet_typeZbroadcast_computed_paramsoptimize_gradient_memoryZdynamic_memory_managementblobs_to_keepuse_ncclr3   r   r   num_threads_per_deviceZshared_modelZcombine_spatial_bnbarrier_net_timeout_secr"   Zdevice_nameZextra_workersrF   non_datapar_paramslosses_by_gpur(   
loss_scaleZhas_parameter_updatesdevice
device_optZlosseslossZcomputed_params_groupedZgrads_orderedZnon_datapar_gradsZgradients_groupedZreverse_ordered_grads
all_paramsZ	optimizerZ
sync_blobs
sync_namesZsync_blobs_groupedrl   r   r/   r   r   2   s   \ 





 





 
  










"


 
r   c                  O   s   d|d< t | | d S r   Parallelize_BMUFr   r   r   r   Parallelize_GPU_BMUF  s    r   c                  O   s   d|d< t | | d S r   r   r   r   r   r   Parallelize_CPU_BMUF  s    r   r)   c           %   
      s  t  dks.t  jtjks.tdt  ttj	s>t|dkrXt
tdt }|	dkrh|d }	|s|D ]*}|t krptdt |  qqptj_d_ntj_d_|_|_d_d_d_tj|	}|r|d nd	}t|| }|t| }|r|d
7 }d| |dkr:dd|  }t||d	 }|j _|j _t d_!|j! _|j! _t d_"|j" _|j" _dd }dd }dd }t##j$}i _% fdd}t&||jjdd t'j$ t(|j$|_)t
t*j)_+t,|j% t'j$ t- fdd}t&||jjdd t
t*j)}|dk	r|_.t d_/|j/ _|j/ _t0|j1j/||| t*j)D ]<} j)|  |	 }!t2| j/3|!||! W 5 Q R X qt*j)D ]n} j)|  |	 }!t2|J j!j4|!||!dd j!3|!||! |rxj!j4|!||!dd W 5 Q R X qt5||j"||
| |D ]@} j)|  |	 }!t2| j"j6|!|!d| d j"7|!||!g|! j"j6|!|!|d j"j6||!||!|d j"8||!|!g||! j"8||!||!g||! |rĈj"7||!||!g||! j"j6||!||!|d j"7||!||!g||! j"3||!||! j"3||!|! W 5 Q R X qt0|j1j"||| |dk	rt9|j"d |rtdd  j j:D }"|"D ]<}#|#j;d	 }$t2|#j< j"j4|$g|$dd W 5 Q R X q6|rt=j%| j1j!g_>jj"d	fg_?t@| dS )!a  
    Function to create model that run on many GPUs and creates a net for
    parameter_updates that can be run independently for number of iterations
    then followed by another net that runs once to compute the final parameter
    updates according to block wise model update filtering rule described
    in : Scalable Training of Deep Learning Machines by Incremental Block
    Training with Intra-block Parallel Optimization and Blockwise Model-Update
    Filtering (ICASSP 2016).
    Nr    r   r!   r"   r$   Fr(   r'   r&   r)   Zglobal_model_initZglobal_modelc                 S   s
   d | S )Nz{}_vr7   paramr   r   r   _v  s    zParallelize_BMUF.<locals>._vc                 S   s
   d | S )Nz{}_gr   r   r   r   r   _g  s    zParallelize_BMUF.<locals>._gc                 S   s
   d | S )Nz{}_prevr   r   r   r   r   _v_prev  s    z!Parallelize_BMUF.<locals>._v_prevc                    s      }|j | < d S N)rb   )gpu_idr   )ry   rx   r   rw   r   r   _InitializeModels  s    
z+Parallelize_BMUF.<locals>._InitializeModelsT)r5   device_prefixscopedc                    s     d S r   r   )r   )rw   rz   r   r   _InitializeParamUpdate6  s    z0Parallelize_BMUF.<locals>._InitializeParamUpdatezwarmup-broadcast        value)Zscale)rD   c                 S   s   g | ]}|j d kr|qS )ZMomentumSGDUpdate)rG   )r-   opr   r   r   r1     s    
z$Parallelize_BMUF.<locals>.<listcomp>)Ar	   r4   r5   r   r%   r6   r7   rM   r   rN   r8   r9   r
   r:   r;   r<   r=   r>   r?   rH   rI   rJ   rK   r@   r   rR   rB   rC   rD   rE   rF   rG   NetZ_global_model_init_netZ_global_model_param_updates_netrO   rP   rb   _ForEachDevicerV   rW   rX   r   r[   r]   r^   _warmup_iterations_warmup_broadcastro   rp   rS   CopyConstantFillre   ZScaleZSubZAddAddBlobSyncr   inputdevice_optionrq   rs   rt   ru   )%rw   rx   ry   rz   Zblock_learning_rateZblock_momentumr{   r|   r}   master_devicer   Znesterovr~   Zreset_momentum_sgdZwarmup_iterationsr3   Zadd_blobs_to_syncr   r   r   r"   Zmaster_dev_optr(   num_devicesrF   r   r   r   r   r   r   Zmodel_parameter_names
param_namer   Zmomentum_opsr   Zmomentum_blobr   )ry   rx   r   rw   rz   r   r     s    



   


	     
  
     
        

    r   c                 C   s<   | j D ]0}t|tr(tj|d |d qtj||d qd S )Nr   )	overwrite)rt   rM   tupler
   	CreateNet)modelr   Z	net_itersr   r   r   r     s    

r   c                 C   s"   | j D ]}t| qt|  d S r   )rs   r
   
RunNetOncer   )r   init_netr   r   r   
RunInitNet  s    
r   c                 C   s    t | j| j t | j d S r   )r
   RunNetrD   r   r   r   r   r   r   r   	RunWarmup  s    r   c                 C   sB   | j D ]6}t|tr0t|d  j|d  qt|| qd S )Nr   r'   )rt   rM   r   r
   r   rE   rQ   )r   Znum_iterationsZnet_iterr   r   r   r     s    

r   c                 C   sZ   | j d k	rV| j d dkrVtd| _t| | jd|| _| jd| j | jd| j d S )NengineGLOObarrier_init_netZpre_trainingr   )	rI   r   r   Z_barrier_init_net_CreateBarrierNet_barrier_netrs   insertrt   )r   r   r   r   r   ru     s    	 ru   c                 C   s`   t d | jd dks tdt||d | j|d}t|d }|j|gg | jd d |S )	NzCreating barrier netr   r   zEngine does not support barrierZ_barrier_cw)r|   timeout_secr   inputsoutputsr   )r;   rA   rI   r6   _CreateOrCloneCommonWorldr   r   Barrier)r   r   Zname_prefixr   
comm_worldZbarrier_netr   r   r   r     s    
r   c                 C   s   t jdtdd | jd ks(| jd dkr,d S | jd krltd}t| |d|| _t	| t
| j || _| j|kstd| j|td	 t| j d S )
Na  The Synchronize API has been deprecated.  We now have a barrier net which runs before training to ensure all hosts wait before training starts.  The default timeout for the barrier is 300s and it can be overridden using the barrier_net_timeout_sec parameter when calling Parallelize.   )category
stacklevelr(   r'   Zsync_barrier_init_netsyncz Must use fixed timeout, {} != {}zSynchronize run barrier net.)warningswarnDeprecationWarningrI   rJ   r   r   r   r
   r   r   Z_sync_barrier_net_timeoutr6   r7   r;   rA   r   )r   r   r   r   r   r   Synchronize  s2     

   
 
r   c                 C   s>  t | }|dkrt }t|jr,d}n|jtjkr>d}nd}d	||j
}| jD ]}d|jkrptdt|jD ]\}}|| |j|< qzt|jD ]\}}|| |j|< qt|jD ]\}}	||	 |j|< q|j| qZt| jD ]\}}
||
 | j|< qt| jD ]\}}|| | j|< q|S )a  
    Converts all blobs in the net to have namescope gpu_X, and correct
    device scope. You can use this to enable AppendNet with a
    forward_pass_builder_fun:

       def builder_fun(model):
          ...
          model.net.AppendNet(
             data_parallel_model.ConvertNetForDevice(othermodel.net))
          model.param_init_net.AppendNet(
             data_parallel_model.ConvertNetForDevice(othermodel.param_init_net))
    Nr"   r   r$   {}_{}/RecurrentNetworkz-RecurrentNetwork conversion not yet supported)rO   deepcopyr	   r4   r   IsGPUDeviceTyper5   r   r#   r7   	device_idrE   r   rG   NotImplementedError	enumerater   outputcontrol_inputr   ZCopyFromZexternal_inputZexternal_output)rD   r   Zmnetr   	namescoper   rn   ZinputbZoutputbblobZeinpZeoutpr   r   r   ConvertNetForDevice.  s0    

r   c           	      O   st   | D ]j}t ||}t |J |rTt d|| ||f|| W 5 Q R X n||f|| W 5 Q R X qd S )Nr+   )r   rR   rS   rT   r7   )	r{   fr5   r   r   r   r   r   r   r   r   r   r   X  s    r   c           	   
      sr    fdd}i }| D ]N}t  j|}t |, || D ]}||}t||t|< q:W 5 Q R X q | d S )Nc                    s    j | t| d ddS )NZ_gradr)   r   )r   str)Zlosspr   r   r   create_grade  s    z*_AddGradientOperators.<locals>.create_grad)r   rR   r>   rS   r   ZAddGradientOperators)	r{   r   r   r   Z	loss_gradr   r   lZlgr   r   r   r]   d  s    r]   c           	   
      sz   | j d }d| j|  fdd|D } fdd|D }tj| j |||dd t|| || D d\}}||fS )	zX
    Returns (net, params) that can be exported to be used as a prediction
    net.
    r   r   c                    s   g | ]} t | qS r   r   r-   bprefixr   r   r1   {  s     z'ExtractPredictorNet.<locals>.<listcomp>c                    s   g | ]} t | qS r   r   r   r   r   r   r1   |  s     c                 S   s   i | ]\}}||qS r   r   )r-   ar   r   r   r   
<dictcomp>  s    z'ExtractPredictorNet.<locals>.<dictcomp>)Z	net_protoinput_blobsZoutput_blobsr   renames)rH   r7   r?   r   ExtractPredictorNetrD   rE   zip)	r   r   r   r   r   Zprefix_inputsZprefix_outputsZpredictor_netZexport_blobsr   r   r   r   t  s    

r   c                    s|   t  \}} fdd|D }t } j jD ]@}|jdksH|jdkr0|jd d j	s0|
|jd  q0||S )z
    Returns a set of blobs that are needed for a complete check point.
    They are blobs for the first gpu and iteration blobs.
    c              	      s.   h | ]&}t |d  j jd r|qS )r   r   )r   
startswithr7   r?   rH   r   r   r   r   	<setcomp>  s
   z&GetCheckpointParams.<locals>.<setcomp>IterZ
AtomicIterr   {}_)rj   rf   rD   rE   r   rG   r   r   r7   r?   rm   union)r   Z	all_blobs_Zfirst_gpu_blobsZiteration_blobsr   r   r   r   GetCheckpointParams  s    
	r   c              	      s  t  ds|dkr t \}}ndd |D }td   }|D ], jkrD fdd|D }| j< qDtd _|s j	  d} j
dk	r j
d	 d
krtd}|s|	  t| | j j
|d
d |rt| t j td t j j dS )zr
    This function should be called after loading parameters from a
    checkpoint / initial parameters file.
    _checkpoint_netNc                 S   s   g | ]}t |qS r   stripBlobNamer,   r   r   r   r1     s     z+FinalizeAfterCheckpoint.<locals>.<listcomp>z'Creating checkpoint synchronization netc                    s(   i | ] }|t d  j|tjqS )z	{}_{}{}{})r   rU   r7   r?   r	   _NAMESCOPE_SEPARATORr-   dr   rQ   r   r   r     s   z+FinalizeAfterCheckpoint.<locals>.<dictcomp>Zcheckpoint_sync_netr(   r'   checkpoint_init_netr2   zRun checkpoint net)hasattrrj   r;   rA   Z
GetDevicesrX   r   r   r   ZRunAllOnGPUrI   ro   r
   r   r   r   rE   rQ   )r   blobsZcpu_moder   Zuniq_blob_namesr{   groupedr   r   r   r   FinalizeAfterCheckpoint  sD    




	

r   c                    s    j dk	rb jtjks" jtjkr0 j dgS t jrP fdd jD S t	d
 jn4g } j jD ]}|jdkrr||d qr|S dS )zL
    Returns a list of learning rates blob names used in the optimizer.
    Nlrc                    s   g | ]} j d |dqS )r   r*   )ri   Zget_gpu_blob_name)r-   r"   r   r   r   r1     s   z,GetLearningRateBlobNames.<locals>.<listcomp>zUnsupported device type : {}ZLearningRater   )ri   r>   r   r%   r#   Zget_cpu_blob_namer   r   rH   	Exceptionr7   rD   rE   r   rG   appendr   )r   Zlr_blob_namesr   r   r   r   GetLearningRateBlobNames  s    



r   c           	   
   C   s   | d }|rpt ||rpt|j|}t|< |jtt|j| tt|j| dd W 5 Q R  d S Q R X | dd  D ]z}t ||rtt	j
|}n&t||rttjdnttjd}t|& ||j| | |j| |  W 5 Q R X q|d S )Nr   )rootr'   )
_IsGPUBlobr   rR   r>   rS   ZNCCLBroadcastr8   r   rX   r
   r=   _IsIDEEPBlobr   r#   r%   r   )	r{   r   rD   r   r   Z
master_devmaster_device_optZdev_idxr   r   r   r   
_Broadcast  s,    

r   c                    s  t tj  jtjkr6|r6j  |d d S jtjkrLt	 nd  fdd}t
| dkrtdD ]}||d |d d  qxtdD ]}||d |d d  qtdD ]}||d |d d  q|d	d nt
| dkrLtdD ]}||d |d d  qtdD ]}||d |d d  q"|d	d n>t
| dkrz|d	d |dd
 |d	d n|tt
|   t|  d S )Nr   c               
      s   fdd| D } fdd| D }t j|d }t |x t|D ]R\}}|dkr\qJdk	rJjrJ|d |f sJ|| d|d |||< qJj||d gdd W 5 Q R X dS )	a	  Create a Sum op for 2 or more blobs on different devices.
        Saves the result on the first device.

        Args:
        dev_indices -- a list of device indices, which can be translated into
                       CUDA identifiers with model._devices
        c                    s   g | ]} j | qS r   )rH   r-   idxr   r   r   r1   '  s     z,_AllReduce.<locals>.sumN.<locals>.<listcomp>c                    s   g | ]} | qS r   r   r  )blobs_groupr   r   r1   (  s     r   Nzgpu_{}/{}_gpu{}_copydpm)rQ   )	r   rR   r>   rS   r   sizer   r7   Sum)Zdev_indicesr{   r   r   rn   Zpeerr  r   rD   Zp2p_access_patternr   r   r   sumN  s    

z_AllReduce.<locals>.sumNr   r&   r   r'   r   r      )r8   r   rX   r>   r   ZCUDANCCLAllreducer
   r=   ZGetGpuPeerAccessPatternrB   r9   r   )r{   r   rD   r   r   r   r	  jr   r  r   
_AllReduce  s@      


r  c                 C   s<   |d ks|d dkr$t | ||| nt| |||||| d S Nr(   r'   )_SyncAllParamsSingleHost_SyncAllParamsDistributed)r{   r   r   rD   r|   unique_param_namesr3   r   r   r   ro   Q  s    	ro   c                    s   t |dkrdS |dkrjn|}|D ]:  jrDtd  fddjD j < q&tjj	|j
t| dS )z.
    Sync a blob across devices and hosts
    r   Nz Provide unprefixed blob name: {}c              
      s$   i | ]}|t d j| qS ){}_{}/{})r   rU   r7   r?   r   r   r   r   r   r   r  s    zAddBlobSync.<locals>.<dictcomp>)rB   rD   r   r?   r6   r7   rH   rX   ro   rp   rI   rf   )r   r   rD   r   r  r   r   h  s$    r   c                 C   sZ   | j dkrdS ddd |D }t| jd| | j d}| jj|g| || j d d dS )	z=
    Sync blobs across machines (but not across devices)
    Nr   c                 S   s   g | ]}t |qS r   r   r   r   r   r   r1     s     z*AddDistributedBlobSync.<locals>.<listcomp>Zblob_sync_cw_r|   r   r   )rI   joinr   rp   rD   	Allreduce)r   r   Z
synth_namer   r   r   r   AddDistributedBlobSync  s    
r  c              
      sj  d dkst t|j| d }ttj}ttj}	|jd krTtd|||_|j t	|D ] |j
 | d  }
tt|j
 } fdd}t|r|nt|r|	n|}d dkrt| || W 5 Q R X qbt| |
t|
d	 }W 5 Q R X t| ||g W 5 Q R X t| ||
 W 5 Q R X t| | qbd S )
Nr(   r'   r   	broadcastc                    s0     | \}}j|g|  | d |d d S )Nr   r   r   rQ   r   r   )get_control_and_contextZ	Broadcast)rP   r   r   contextrD   r   r|   r   r   r    s    z,_SyncAllParamsDistributed.<locals>.broadcastr   r   r$   )r6   r   rR   r>   r   r%   r#   rK   CollectivesConcurrencyControlsortedrX   r8   r   r   r   rS   CopyGPUToCPUr   CopyCPUToGPUr   )r{   r   r   rD   r|   r  r3   Zgpu_device_optZcpu_device_optZideep_device_optZmaster_paramZparams_groupr  r   Z	param_cpur   r  r   r    sF    	

 
r  c                 C   s   |D ]}t | ||| qd S r   )r   )r{   r   rD   r  r   r   r   r   r    s    r  c                 C   s<   |d ks|d dkr&t | |||| nt| ||||| d S r  )_AllReduceBlobsSingleHost_AllReduceBlobsDistributed)
blob_namesr{   r   rD   r|   r   r3   r   r   r   re     s     re   c                    s|    j s
td j jd } | _t j fdd jD  _fdd j	D  _	fdd j
D  _
d S )Nr   r   c                    s    i | ]}|kr| j | qS r   r/   r,   r   paramsetr   r   r     s     z._PruneParametersForSharing.<locals>.<dictcomp>c                    s   g | ]}| kr|qS r   r   r-   wr%  r   r   r1     s      z._PruneParametersForSharing.<locals>.<listcomp>c                    s   g | ]}| kr|qS r   r   r&  r(  r   r   r1     s      )r@   r6   r7   r?   rH   rg   rP   rf   r0   weightsZbiases)r   master_prefixr   r$  r   rh     s    

rh   c                    sb   | j s
td| j| jd td t|   fdd}|| j	 || j
 d S )Nr   r   z%Remapping param blobs to master -> {}c              	      s   g }|   jD ]}d}|jD ]2}| kr|krd}td|t|  qPq|rVqt|jD ]*\}}| kr`|kr`t	| |j|< q`|
| q|   jd d = |   j| d S )NFTzDelete b/c {}:  {})rE   r   r   r;   debugr7   r   r   r   r   r   extend)rD   opsr   Z	delete_opZoutpr  inpr   Zmaster_paramsr*  r   r   
modify_ops  s     
z6_RemapParameterBlobsForSharedModel.<locals>.modify_ops)r@   r6   r7   r?   rH   r;   rA   rf   rg   rp   rD   )r   r   r0  r   r/  r   rv     s    
 
rv   c                   @   s    e Zd ZdZdd Zdd ZdS )r  z
    Creates common worlds (up to max_concurrent_context) and manage the
    sequential execution of collectives that shares the same context with
    cyclic control inputs.
    c                 C   s.   || _ || _|| _d| _g | _g | _|| _d S )Nr   )rQ   rp   max_concurrent_contextcountercommon_worldscontrol_inputsr|   )selfrQ   r1  rp   r|   r   r   r   __init__"  s    z&CollectivesConcurrencyControl.__init__c                 C   s   d d g\}}| j | j }t| j| jk r^t| jd| j|| jd}| j	| | j
	| n| j| }| j
| }|| j
|< |  j d7  _ ||fS )Nz{}_{}_cwr  r'   )r2  r1  rB   r3  r   rp   r7   rQ   r|   r   r4  )r5  Zcontrol_output_blobZcommon_worldr   Zcurrent_slotr   r   r   r  1  s    


z5CollectivesConcurrencyControl.get_control_and_contextN)__name__
__module____qualname____doc__r6  r  r   r   r   r   r    s   r  c              
      sV  |j  j}|dkstd|d  t|j|d }|td||j|d }| D ]|j	 |d  }	t
t|j	 }
|	|
kstt|	d } fdd}|d d	kr||
|d
d dkd qRt|: |j|	|dd j|
|
|d |
d }|	| W 5 Q R X ||g t| ||	 W 5 Q R X t|| qRd S )Nr'   z!Please specify more than 1 workerr   r   	allreduceZ_redc              	      sN   t : | d \}}jf |g|  |  |d| W 5 Q R X d S )Nr   r  )r   rS   r  r  )r   r   r   r   Zall_reduce_engine	blob_namer  rD   Zreducing_device_optr   r   r;  g  s    z-_AllReduceBlobsDistributed.<locals>.allreducer   	transportZibverbs)Z
gpu_directr   r   r  )rD   rE   rF   r6   r   rR   r>   r  rp   rX   r8   r   r   getrS   r   r  r   r   )r#  r{   r   rD   r|   r3   rF   r   Znccl_control_blobZmaster_blobr  Zreduced_blobr;  r   r<  r   r"  D  sH    
r"  c                 C   s  t |dkrdS t|j|d }d}t }| D ]}tt|j| }	t |	dkrVq0t |	t |ksvtd	||t
||r
t|t t|	d tjst|||||| |	d }n@d	|j|d }
d}|	D ]}|j|krd}q|s~|jdd	 |	D d
	|
|d	|
|gddd\}}t|j| D ]H\}}t|j|}t|  |||j ||j W 5 Q R X q4|jdd	 |	D d	|
|d	|
|gddd\}}t|j| D ]<\}}t|j|}t| |||j W 5 Q R X qW 5 Q R X q0t||rxt|	d tjr0tdtttj. ||	|	d g |jslt|||| W 5 Q R X q0t|	d tjrtdtttj. ||	|	d g |jst|||| W 5 Q R X q0dS )z<Performs NCCL AllReduce to distribute blobs to all the GPUs.r'   Nr   z+Each GPU from {}, should have a copy of {}.r+   FTc                 S   s   g | ]
}|j qS r   )indicesr-   gr   r   r   r1     s     z-_AllReduceBlobsSingleHost.<locals>.<listcomp>z{}/{}_index_concatz{}/{}_index_splitinfoznote:data_parallel_model)ZaxisrQ   c                 S   s   g | ]
}|j qS r   )valuesrA  r   r   r   r1     s     z{}/{}_val_concatz{}/{}_val_splitinfoz+Synchronizing gradient slices not supported)rB   r   rR   r>   rf   r8   r   rX   r6   r7   r   rS   rM   GradientSlicer  r?   r@  Concatr   r   rm   rC  r   r   r#   r  r@   r   r%   )r#  r{   r   rD   r   r   Zlast_outZconcatenated_idxr=  r  Z	master_nsZskip_idx_concatrB  Zgrad_idx_concatr   r"   r   Zgrad_val_concatr   r   r   r!    s    
      





 
(r!  c                 C   s(   |d krt | || nt| ||| d S r   )"_BroadcastComputedParamsSingleHost#_BroadcastComputedParamsDistributedr{   r   r|   r   r   r   r   rc     s    rc   c                 C   s   t | || td d S )Nz?Distributed broadcast of computed params is not implemented yet)rF  r;   r   rH  r   r   r   rG    s    rG  c                 C   s2   t | dkrdS |jD ]}t| ||j|| qdS )z2
    Average computed params over all devices
    r'   N)rB   r\   r   rD   )r{   r   r   r   r   r   r   rF    s    
rF  c                 C   s   t t| jS )zq
    Returns the gradients in reverse order (namespace stripped),
    for the optimal synchronization order.
    )r8   reversedrL   r   r   r   r   rd   
  s    rd   c                 C   sD   t | tjr$t| jd t| j S t| }||tj	d d  S )N:r'   )
rM   r   rD  r   r@  rC  r   indexr	   r   )r   rQ   r   r   r   r     s    r   c                 C   s   |   jD ]}d|jks
d|jks
d|jkr.q
d|jkrD|jdkrDq
d|jkrZd|jkrZq
|j}|j}t|j	stq
d
| j|}t|jt|j D ]D}|d	
| jr||std

||jd
| j|t|qq
dS )zR
    Look at all the operators and check that they do not cross device scopes
    ZNCCLr   rE  r  r  r  r   r   r   z2Blob {} of op {}, should have namescope {}. Op: {}N)rE   r   rG   rQ   r   r   r   r   r   r5   r7   r?   r8   r   r   r   r   r   )r   r   Zop_devZop_gpur   r.  r   r   r   rk     s0    rk   c                    s8   i  fdd  | j    | j  | _dS )zI
    Assign blob to device option based on the operator outputing it
    c                    s   | j D ]~}|j}|jdkr*t }tj|_t|jt|j	 D ]}|kr>||< q>|j
drdd |jD }|D ]} |j qtqd S )Nr   r   c                 S   s   g | ]}|j d r|qS )Zstep_net)rQ   endswith)r-   r   r   r   r   r1   N  s      z5_InferBlobDevice.<locals>.map_ops.<locals>.<listcomp>)r   r   rG   r   rR   r%   r5   r8   r   r   r   rl   n)protor   r   r   Z	step_argsZstep_argmap_opsmappingr   r   rP  C  s    


z!_InferBlobDevice.<locals>.map_opsN)rp   rE   rD   _blob_to_devicer   r   rO  r   r^   =  s
    r^   c                 C   s^   || j kr| j | jtjkS d| j| jd |}|| j krH| jtjkS | j | jtjkS d S Nr  r   )rR  r5   r   r#   r7   r?   rH   r>   r   r=  r   r   r   r   U  s    
  
r   c                 C   s^   || j krt| j | jS d| j| jd |}|| j krHt| jS t| j | jS d S rS  )rR  r   r   r5   r7   r?   rH   r>   rT  r   r   r   r   `  s    
  
r   c           	      C   s`  t  }|t|d }t|D ]:\}}t|tjsNt|tjsNtd|t	|}d}t|tjrt
| dd dd }d| j|| kstdt|| j|nt
|j dd dd }d| j||j ks
td	t|| j|d| j||j ks<td
t|| j|||krNi ||< ||| |< q|S )z
    Groups blobs by device, returning a map of [blobname] = {0: BlobRef, 1: ..}.
    Returns ordered dictionary, ensuring the original order.
    Nz.Param {} is not BlobReference or GradientSlicer   r'   /r   r   z+Param {} expected to have namescope '{}_{}'z-Indices {} expected to have namescope '{}_{}'z,Values {} expected to have namescope '{}_{}')r   rB   r   rM   r   rU   rD  r6   r7   r   intZGetNameScopesplitr?   r   r@  rC  )	r   r{   rP   Znon_data_paramsr   Z_ir.   rQ   Zgpuidr   r   r   rW   l  s4    
 "
rW   c                 C   sz   t | }t| t|krvg }t| }t|D ]*\}}|dkr,||d  |kr,|| q,t| t|ksvtd|d S )Nr   r'   zDuplicate entries in params: {})rf   rB   r  r   r   r6   r7   )rP   Z
set_paramsZdupesspr  r.   r   r   r   rV     s    rV   c                    s   t  } jr0dd  dD }dd |D }nng } j jD ]6} fdd|jD }|dd |D  || q@t  j	| }|t  kst
d|t jd fd	d
}ttt ||d}dd |D }||fS )zv
    We sync all blobs that are generated by param init net and
    are 'data parallel', i.e assigned to a device
    c                 S   s   g | ]}t |qS r   r   r,   r   r   r   r1     s     z'_ComputeBlobsToSync.<locals>.<listcomp>r*   c                 S   s   g | ]}t |qS r   r   r,   r   r   r   r1     s     c                    s"   g | ]}| d  jr|qS )r   )r   r7   r?   r-   or   r   r   r1     s   c                 S   s   g | ]}t |qS r   r   rY  r   r   r   r1     s     z2Some params not instantiated in param init net: {}r'   c                    s    t |  | tj }|| fS r   )rV  rK  r	   r   )r   Zdeviceid)	prefixlenr   r   extract_sort_key  s    z-_ComputeBlobsToSync.<locals>.extract_sort_key)keyc                 S   s   g | ]}t |qS r   )r   rU   r   r   r   r   r1     s     )rf   r@   rY   rp   rE   r   r   rZ   r,  r[   r6   r7   rB   r?   r  r8   )r   r   Zblobs_to_syncr   Z
dp_outputsZdiffr\  r   )r   r[  r   rj     s.    

rj   c                 C   sN   t d |D ]:}d| j|}tj| j|| tt| j	|dd| j_
qd S )NzV------- DEPRECATED API, please use data_parallel_model.OptimizeGradientMemory() ----- r   F)share_activations)r;   r<   r7   r?   r   share_grad_blobsrD   rf   r   r0   _net)r   r   r{   r   r   r   r   r   rq     s    
rq   c              	   C   sv   t  }|d k	r8|D ]$}|D ]}|d| j|| qq| jd k	r\|dd t| jD  t	| j
 || j
_d S )Nr  c                 S   s   g | ]}t |qS r   r   r   r   r   r   r1     s     z1_AddDynamicMemoryOptimization.<locals>.<listcomp>)rf   rm   r7   r?   rI   rZ   r   r0   r   Zrelease_blobs_when_usedrD   rE   r`  )r   r   r{   Zblobs_to_keep_all_devicesr   r=  r   r   r   rr     s    
rr   c              
      s   |dk	rXi }t |D ](\}}| jD ]}||d| j||< q"qt| j| jg|\}}	nd}| jD ]V}
d| j|
 t fdd|D }t	j
| j| j|
 tt| j |||d| j_qbdS )a  
    Optimize memory usage of the backward pass by recycling blobs for gradient
    inputs that have been 'used'.
    input_shapes:  dict of blob name to shape for the inputs of the model.
                   Pass empty dictionary if not known.
    excluded_blobs: list of blobs that cannot be recycled. These are blobs
                   that you will access externally.
    recycle_activations: whether to also recycle forward pass activations
    Nr  r   c                 3   s   | ]} | V  qd S r   r   r   r   r   r   	<genexpr>	  s     z)OptimizeGradientMemory.<locals>.<genexpr>)Zdont_share_blobsr^  Zblob_shapes)r   rH   r7   r?   r
   ZInferShapesAndTypesrp   rD   rf   r   r_  rb   r   r0   r`  )r   Zinput_shapesZexcluded_blobsZrecycle_activationsZinput_shapes_all_devicesr   Zshpr   Zshapestypesr   Zexcluded_blobs_by_devicer   ra  r   OptimizeGradientMemory  s6    
  


rd  c                 C   s  |d krt }|d }d }|  jD ]L}|jdkr2q"d}|jD ]}	|	jdkr<|	j} qVq<||kr`q"|jd } qpq"|d krd|}|d k	r| j	|g|||d d}
nvt
 }d	|kr|d	 |d	< d
|kr|d
 |d
< d|kr|d |d< | j|d pg |f||d |d |d |d|}
|
S )Ni  CreateCommonWorld
timeout_msr   z{}_opr   )rQ   r   r>  Z	interfacempi_rendezvous
kv_handlerr(   shard_id)rQ   r  rankr   rg  )_DEFAULT_TIMEOUT_SECrE   r   rG   rl   rQ   rn   r   r7   ZCloneCommonWorlddictre  )rD   Zcommon_world_blobr|   rQ   r   rg  existingr   Zop_timeout_msrl   r   r   r   r   r   r     sZ    





r   c              	   C   s  |d kr| j | }t|j | j}|d ks:|d dkrHW 5 Q R  dS t|d tj}d||d < t	d| t
d}t }d|kr|d |d< |j|d pg d	f| j jd
 |d |d |d d|}|d }|j|g|gdd |d }	|jd|g|	dd |j||	g|	g|d d t| t|	}
|
d }t|d D ]&}|
| |ksRtd|d qRW 5 Q R  dS Q R X d S )Nr(   r'   Trj  Zcompare_arrZallcompare_netrh  ri  Zinitial_syncz.cw_master_selectr   )rQ   r  rk  r   Z	_checksumF)ZaverageZ_gather)r   r   r  r   r   zallcompare failed on shard {}.)rR  r   rS   rI   npzerosZastypeZfloat32r
   ZFeedBlobr   rm  re  rD   rE   rQ   ZSumSqrElementsZMulr  r   Z	FetchBlobr9   r6   r7   )r   r=  r   r|   Ztest_data_arrZcomparison_netr   r   Zblob_name_checksumZblob_name_gatherZ
gather_arrZbaselinern   r   r   r   _RunComparisonW  sb    



  

rq  c           
      C   s
  t | j j}t| j}t|| }|| t|ks>tdg }dd t|D }|D ]}||jj	 
| qXt|D ]h}d}| jD ]X}	|dkr||	 | j}|
||	 |  ||	 | j|kstd|||	 | jqqx| j jdd= | j j| dS )a  
    Data Parallel Model creates a net with ops in one device grouped together.
    This will interleave the ops so that each op for each device is next
    to each other in the net. Kind of like combining decks of cards. This
    ensures that progress is made along the critical path roughly concurrently
    for each device, which is important due to the extra intra-node
    synchronization required for multi-device batch normalization.
    z7Number of ops per device in original net is not uniformc                 S   s   i | ]
}|g qS r   r   r   r   r   r   r     s      z"_InterleaveOps.<locals>.<dictcomp>NzType mismatch {} / {})r8   rD   rE   r   rB   rH   r6   r9   r   r   r   rG   r7   r,  )
r   orig_opsr   Znum_ops_per_devnew_opsr-  r   r  tpr   r   r   r   r_     s*    	

r_   c                 C   s  t | j j}g }t| j}g }g }d}g }g }g }	d }
d}g }g }dd }|D ]F}|jdkr@|jdkr@|r|| |t	
d||
d  |t	
d||
d  || g }g }g }g }d}d }
nb|r4|| ||t|d	 || ||t|d	 || || g }g }g }g }d}|| qR|jdkrd
}|
d krd|jd	 }
|jd	 }	|t	
d|	|	d |	d g ||	d  ||	d  |j|
d  |j|
d  |jtd|g || qR|jdkrRd
}|t	
d|jd	 |jd |jd |jd g|jd |jd g ||jd  ||jd  |jtd|g |j|jd |jd g || qR|rtd| j jd d = | j j| d S )NFc                 S   sF   g }d|  d }| td|| |D ]}| td|| q(|S )a  
        Reduce results from multiple cpus and distributes the results back
        to each device. This is done by copying values to cpu_0 and summing
        them. The cpu_0 result is then copied back to each of the devices.

        param: the name of the data (blobs) to reduce
        input_blobs: the list of blobs to reduce
        destination_blobs: list of blobs to copy the result to
        zcpu_0/Z	_combinedr  r   )r   r   CreateOperator)r   r   destination_blobs	added_opsZresult_blobr   r   r   r   
_cpuReduce  s    
z5_CPUInterDeviceBatchNormalization.<locals>._cpuReduce	SpatialBNSpatialBNGradientr  _sums_combined_sumsq_combinedr   TChannelStats_sums_sumsqnum_batchesChannelBackpropStatsr
  r   r   r'   z@Net modification for cpu inter-device batch normalization failed)r8   rD   rE   r   rB   rH   rG   r,  r   r   ru  r   r   rl   r   MakeArgumentr   r6   )r   rr  rs  r   batch_norm_opsinjected_opsspatial_bn_phase
sums_blobssumsq_blobsrQ   input_blob_namespatial_bn_gradient_phasescale_grad_blobsbias_grad_blobsrx  r   r   r   r   r`     s    











r`   c              
      s  t j j}g }tj}g }g }d}g }g }g }	d }
d}g }g }d}ttj	 d fdd	}|D ]z}|j
dkr`|j
dkr`|r|| ||t|
d || ||t|
d || || g }g }g }g }d}d }
nf|rT|| ||t|d	 ||| ||t|d	 ||| || g }g }g }g }d}|| qj|j
dkr"d
}|
d kr|jd	 }
|jd	 }	tj|jj}|tjd|	|	d |	d g|d ||	d  ||	d  |j|	d  |j|	d  |jtd|g || qj|j
dkrjd
}tj|jj}|tjd|jd	 |jd |jd |jd g|jd |jd g|d ||jd  ||jd  |jtd|g |j|jd |jd g || qj|rtdj jd d = j j| d S )NFZcpu_0c           	   	      s   g }g }g }|dkr* fddt |D }t |D ]X}tj|}|d|  |d| | |tjd|| || |d q2|tjd|d	| d t |D ]6}tj|}|tjd
d	| || |d q|S )a  
        Reduces results from multiple gpus and distributes the results back
        to each device. This is done by copying values to the master device
        and summing them. The master device result is then copied back to
        each of the devices.

        param: the name of the data (blobs) to reduce
        num_devices: the number of devices
        master_device: the device to copy/compute values on
        result_blobs: optional list of result blobs to copy to
        Nc                    s   g | ]}d  | qS )zgpu_{}/{}_combinedr   )r-   rn   r   r   r   r1   <  s    zI_GPUInterDeviceBatchNormalization.<locals>._gpuReduce.<locals>.<listcomp>z	gpu_{}/{}z{}/{}_gpu_{}_copyr  r   r  z{}/{}_combinedr   )r9   r   rR   r>   r   r7   ru  )	r   r   r   Zresult_blobsrw  Zsource_blobsrv  rn   r   Zmaster_device_optionr   r   r   
_gpuReduce,  sP    


z5_GPUInterDeviceBatchNormalization.<locals>._gpuReducery  rz  r~  r  r   Tr}  r  r{  r|  r  r  r
  r   r   r'   z@Net modification for gpu inter-device batch normalization failed)N)r8   rD   rE   r   rB   rH   r   rR   r   r%   rG   r,  r   r   r   r>   r   r   ru  rl   r   r  r   r6   )r   rr  rs  r   r  r  r  r  r  rQ   r  r  r  r  r   r  r   r   r   r  r   ra     s    
.












ra   )F)N)F)NF)F)FN)r   )N)F)F)F)NN)N)Vcollectionsr   Zfuture.utilsr   r   r   loggingrO   multiprocessingr   Zcaffe2.pythonr   r   r	   r
   r   r   r   Zcaffe2.protor   Znumpyro  r   ZInitOpsLibraryr:   	getLoggerr;   setLevelINFOrl  Z _DEFAULT_BARRIER_NET_TIMEOUT_SECr   r   r   r   r   r   r   r   r   r   r   ru   r   r   r   r   r]   r   r   r   r   r   r  ro   r   r  r  r  re   rh   rv   objectr  r"  r!  rc   rG  rF  rd   r   rk   r^   r   r   rW   rV   rj   rq   rr   rd  r   rq  r_   r`   ra   r   r   r   r   <module>   s   $




  r

  (

*

9

G 

= (PX
 


		!#+-  
B
8!i