import dataclasses
from typing import Dict, List, Optional, Sequence, Tuple, Union, cast

import torch
import torch.distributed as dist
import torch.distributed.checkpoint as dist_cp
from torch.distributed._shard.sharded_tensor.api import ShardedTensor
from torch.distributed._shard.sharded_tensor.metadata import TensorProperties
from torch.distributed._shard.sharded_tensor.shard import Shard
from torch.distributed._shard.sharding_spec.chunk_sharding_spec import (
    ChunkShardingSpec,
)
from torch.distributed.checkpoint.planner import LoadPlan
from torch.distributed.checkpoint.metadata import (
    BytesStorageMetadata,
    ChunkStorageMetadata,
    Metadata,
    MetadataIndex,
    STATE_DICT_TYPE,
    TensorStorageMetadata,
)
from torch.distributed.checkpoint.planner_helpers import (
    create_read_items_for_chunk_list,
    _create_read_items,
)
from torch.distributed.remote_device import _remote_device
from torch.distributed._tensor import DTensor
from torch.distributed.checkpoint.default_planner import DefaultLoadPlanner
from torch.distributed._shard.api import _shard_tensor
from torch.distributed.checkpoint._nested_dict import unflatten_state_dict
from torch.distributed.checkpoint.utils import (
    _element_wise_add,
    _element_wise_sub,
    _normalize_device_info,
)
from torch._utils import _get_device_module

# Maps a state_dict key to (offset, size) of the TP slice owned by this rank;
# offset is None when the value is not TP-sliced.
STATE_DICT_2D_LAYOUT = Dict[str, Tuple[Optional[Sequence[int]], Sequence[int]]]

__all__ = ["load_sharded_optimizer_state_dict"]


def _gen_rank_device(global_rank: int, device_type: str = "cuda") -> str:
    if device_type == "cpu":
        return "cpu"
    device_module = _get_device_module(device_type)
    if device_module.is_available():
        return _normalize_device_info(
            device_type, global_rank % device_module.device_count()
        )
    return "cpu"


def _create_colwise_spec(
    pg: Optional[dist.ProcessGroup] = None,
) -> ChunkShardingSpec:
    pg_device_type = dist.distributed_c10d._get_pg_default_device(pg).type
    if pg is None:
        placements = [
            f"rank:{idx}/{_gen_rank_device(idx, pg_device_type)}"
            for idx in range(dist.get_world_size())
        ]
    else:
        placements = [
            f"rank:{idx}/{_gen_rank_device(dist.get_global_rank(pg, idx), pg_device_type)}"
            for idx in range(pg.size())
        ]
    return ChunkShardingSpec(
        dim=0,
        placements=cast(List[Union[_remote_device, str]], placements),
    )


def _is_nested_tensor(val: torch.Tensor) -> bool:
    if type(val) is ShardedTensor:
        if len(val.local_shards()) == 0:
            return False
        if type(val.local_shards()[0].tensor) is ShardedTensor:
            return True
        if type(val.local_shards()[0].tensor) is DTensor:
            raise ValueError("Cannot handle DTensor nested inside ShardedTensor")
    elif type(val) is DTensor and (
        type(val._local_tensor) is DTensor or type(val._local_tensor) is ShardedTensor
    ):
        raise ValueError("Cannot handle nested DTensor")
    return False


def _alloc_tensor(
    props: TensorProperties, size: Sequence[int], device_type: str = "cuda"
) -> torch.Tensor:
    return torch.empty(
        size=size,
        dtype=props.dtype,
        layout=props.layout,
        requires_grad=props.requires_grad,
        pin_memory=props.pin_memory,
        device=cast(torch.device, _get_device_module(device_type).current_device()),
    )


def _get_state_dict_2d_layout(
    state_dict: STATE_DICT_TYPE,
) -> Tuple[STATE_DICT_2D_LAYOUT, Optional[dist.ProcessGroup]]:
    """
    We have to load the right TP slice of the optimizer state.
    This is not easy since the per-tensor slicing can't be inferred from checkpoint metadata.
    We take advantage of the model state_dict producing a sliced ST to figure out what we need to load.
    This is pretty fragile and it might be easier for FSDP to compute this info for us.
    Returns a dictionary where keys are the same as the state_dict's and the value is a tuple of
    (offset, size) for the current rank TP slice.
    N.B. The state_dict *MUST* come from FSDP.sharded_state_dict.
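    For illustration only (parameter names, shapes, and offsets below are made up
    and depend on the actual TP sharding), the returned mapping gives, per key,
    the (offset, size) of this rank's slice, or (None, size) when the value is
    not a nested ShardedTensor:
    >>> # xdoctest: +SKIP
    >>> specs, dp_pg = _get_state_dict_2d_layout(model.state_dict())
    >>> specs["layer1.weight"]   # TP-sliced: rows 512..1023 of a [2048, 1024] param
    ([512, 0], [512, 1024])
    >>> specs["layer1.bias"]     # not TP-sliced
    (None, torch.Size([2048]))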
    """
    specs: STATE_DICT_2D_LAYOUT = {}
    dp_pg: Optional[dist.ProcessGroup] = None
    for key, value in state_dict.items():
        specs[key] = (None, value.size())
        if _is_nested_tensor(value):
            assert (
                len(value.local_shards()) == 1
            ), "Cannot handle ST with multiple shards"
            assert isinstance(
                value, ShardedTensor
            ), "Can only handle nested ShardedTensor"
            shard = value.local_shards()[0]
            specs[key] = (
                shard.metadata.shard_offsets,
                shard.metadata.shard_sizes,
            )
            dp_pg = shard.tensor._process_group

    return (
        specs,
        dp_pg,
    )


# Load planner used when the optimizer state is TP-sliced: read requests are
# shifted by the known per-fqn offset so they address the right region of the
# full checkpointed tensor, and the indices are translated back when the
# destination tensor is looked up.
class _ReaderWithOffset(DefaultLoadPlanner):
    translation: Dict[MetadataIndex, MetadataIndex]
    state_dict: STATE_DICT_TYPE
    metadata: Metadata

    def __init__(self, fqn_to_offset: Dict[str, Sequence[int]]) -> None:
        super().__init__()
        self.fqn_to_offset = fqn_to_offset
        self.metadata = Metadata({})
        self.state_dict = {}
        self.translation = {}

    def create_local_plan(self) -> LoadPlan:
        requests = []
        self.translation = {}
        for fqn, obj in self.state_dict.items():
            md = self.metadata.state_dict_metadata[fqn]
            if not isinstance(obj, ShardedTensor):
                requests += _create_read_items(fqn, md, obj)
                continue

            if fqn not in self.fqn_to_offset:
                requests += _create_read_items(fqn, md, obj)
                continue

            offset = self.fqn_to_offset[fqn]

            assert len(obj.local_shards()) == 1
            original_shard = obj.local_shards()[0]
            # Describe the local shard in checkpoint coordinates by shifting it
            # by the TP offset of this fqn.
            local_chunks = [
                ChunkStorageMetadata(
                    offsets=torch.Size(
                        _element_wise_add(original_shard.metadata.shard_offsets, offset)
                    ),
                    sizes=torch.Size(original_shard.metadata.shard_sizes),
                )
            ]

            reqs = create_read_items_for_chunk_list(
                fqn, cast(TensorStorageMetadata, md), local_chunks
            )
            # The read items above carry the shifted (checkpoint-relative)
            # offsets; remember how to map them back to the local shard so
            # lookup_tensor can find the right destination.
            for ri in reqs:
                assert ri.dest_index.offset is not None
                original_offset = _element_wise_sub(ri.dest_index.offset, offset)
                original_index = dataclasses.replace(
                    ri.dest_index, offset=torch.Size(original_offset)
                )
                self.translation[ri.dest_index] = original_index

            requests += reqs
        return LoadPlan(requests)

    def lookup_tensor(self, index: MetadataIndex) -> torch.Tensor:
        return super().lookup_tensor(self.translation.get(index, index))


def load_sharded_optimizer_state_dict(
    model_state_dict: STATE_DICT_TYPE,
    optimizer_key: str,
    storage_reader: dist_cp.StorageReader,
) -> STATE_DICT_TYPE:
    """
    Loads a state_dict in conjunction with FSDP sharded optimizer state.
    This is the current recommended way to checkpoint FSDP.
    >>> # xdoctest: +SKIP
    >>> import torch.distributed.checkpoint as dist_cp
    >>> model: torch.nn.Module
    >>> optim_params = model.parameters()
    >>> optim = torch.optim.SGD(optim_params, lr=0.01)
    >>> # Save
    >>> with FSDP.state_dict_type(model, StateDictType.SHARDED_STATE_DICT):
    >>>     state_dict = {
    >>>         "optimizer": FSDP.optim_state_dict(model, optim),
    >>>         "model": model.state_dict()
    >>>     }
    >>>     dist_cp.save_state_dict(
    >>>         state_dict=state_dict,
    >>>         storage_writer=dist_cp.FileSystemWriter("checkpoint"),
    >>>         planner=dist_cp.DefaultSavePlanner(),
    >>>     )
    >>>
    >>> # Load
    >>> with FSDP.state_dict_type(model_tp, StateDictType.SHARDED_STATE_DICT):
    >>>     model_state_dict = model_tp.state_dict()
    >>>     checkpoint = {
    >>>         "model": model_state_dict
    >>>     }
    >>>     dist_cp.load_state_dict(
    >>>         state_dict=checkpoint,
    >>>         storage_reader=dist_cp.FileSystemReader("checkpoint"),
    >>>         planner=dist_cp.DefaultLoadPlanner(),
    >>>     )
    >>>     model.load_state_dict(checkpoint["model"])
    >>>
    >>>     optim_state = dist_cp.load_sharded_optimizer_state_dict(
    >>>         model_state_dict,
    >>>         optimizer_key="optimizer",
    >>>         storage_reader=dist_cp.FileSystemReader("checkpoint"),
    >>>     )
    >>>
    >>>     flattened_osd = FSDP.optim_state_dict_to_load(
    >>>         model, optim, optim_state["optimizer"]
    >>>     )
    >>>
    >>>     optim.load_state_dict(flattened_osd)
    """
    metadata = storage_reader.read_metadata()

    layout_specs, dp_pg = _get_state_dict_2d_layout(model_state_dict)
    dp_pg_device_type = dist.distributed_c10d._get_pg_default_device(dp_pg).type
    device_module = _get_device_module(dp_pg_device_type)

    if dp_pg is None:
        # No 2D layout: chunk every tensor over dim 0 across the whole world.
        placements = []
        for i in range(dist.get_world_size()):
            device_info = _normalize_device_info(
                dp_pg_device_type, i % device_module.device_count()
            )
            placements.append(f"rank:{i}/{device_info}")
        sharding_spec = ChunkShardingSpec(dim=0, placements=placements)
    else:
        sharding_spec = _create_colwise_spec(dp_pg)

    # Rebuild an (empty) optimizer state_dict covering only the keys that live
    # under ``optimizer_key`` in the checkpoint; tensors are allocated with the
    # same sharding the model parameters use.
    state_dict: STATE_DICT_TYPE = {}

    fqn_to_offset: Dict[str, Sequence[int]] = {}
    for key, value in metadata.state_dict_metadata.items():
        key_path = metadata.planner_data[key]
        if key_path[0] != optimizer_key:
            continue

        if isinstance(value, BytesStorageMetadata):
            state_dict[key] = "<bytes_io>"
            continue

        # From here on ``value`` is TensorStorageMetadata.
        if value.size.numel() == 1:
            state_dict[key] = _alloc_tensor(
                value.properties, value.size, dp_pg_device_type
            )
        elif dp_pg is None:
            state_dict[key] = _shard_tensor(
                _alloc_tensor(value.properties, value.size, dp_pg_device_type),
                sharding_spec,
            )
        else:
            spec_key = key_path[2]
            alloc_size = layout_specs.get(spec_key, (None, value.size))[1]

            st_md = sharding_spec.build_metadata(torch.Size(alloc_size), value.properties)
            local_shards = []
            current_rank = dist.get_rank(dp_pg)
            for shard_md in st_md.shards_metadata:
                if cast(_remote_device, shard_md.placement).rank() != current_rank:
                    continue
                local_shards.append(
                    Shard(
                        tensor=_alloc_tensor(
                            value.properties, shard_md.shard_sizes, dp_pg_device_type
                        ),
                        metadata=shard_md,
                    )
                )

            st = ShardedTensor._init_from_local_shards_and_global_metadata(
                local_shards, st_md, process_group=dp_pg
            )

            # Remember the TP offset of this key so the reader can shift its
            # reads into the full checkpointed tensor.
            if spec_key in layout_specs and layout_specs[spec_key][0] is not None:
                fqn_to_offset[key] = cast(Sequence[int], layout_specs[spec_key][0])

            state_dict[key] = st

    dist_cp.load_state_dict(
        state_dict=state_dict,
        storage_reader=storage_reader,
        planner=_ReaderWithOffset(fqn_to_offset) if dp_pg is not None else None,
    )

    state_dict = unflatten_state_dict(state_dict, metadata.planner_data)

    return state_dict