U
    9%e4                     @   s   d dl Z d dlmZmZ d dlmZ d dlmZmZm	Z	m
Z
mZ d dlZd dlmZ dddgZG dd dZG d	d deZG d
d de	ZG dd dZdS )    N)ABCabstractmethod)TracebackType)AnyList
NamedTupleOptionalTypeJoinHookJoinableJoinc                   @   s.   e Zd ZdZddddZeddddZdS )	r
   a  
    This defines a join hook, which provides two entry points in the join
    context manager: a main hook, which is called repeatedly while there exists
    a non-joined process, and a post-hook, which is called once all processes
    have joined.

    To implement a join hook for the generic join context manager, define a
    class that inherits from :class:`JoinHook` and override ``main_hook()`` and
    ``post_hook()`` as appropriate.
    Nreturnc                 C   s   dS )z
        This hook is called repeatedly while there exists a non-joined process
        to shadow collective communications in one training iteration (i.e. in
        one forward pass, backward pass, and optimizer step).
        N selfr   r   `/var/www/html/Darija-Ai-API/env/lib/python3.8/site-packages/torch/distributed/algorithms/join.py	main_hook   s    zJoinHook.main_hook)is_last_joinerr   c                 C   s   dS )a\  
        This hook is called after all processes have joined. It is passed an
        additional ``bool`` argument ``is_last_joiner``, which indicates if the
        rank is one of the last to join.

        Arguments:
            is_last_joiner (bool): ``True`` if the rank is one of the last to
                join; ``False`` otherwise.
        Nr   )r   r   r   r   r   	post_hook   s    
zJoinHook.post_hook)__name__
__module____qualname____doc__r   boolr   r   r   r   r   r
      s   
c                       sd   e Zd ZdZe fddZeedddZeee	j
dddZeeedd	d
Z  ZS )r   aZ  
    This defines an abstract base class for joinable classes. A joinable class
    (inheriting from :class:`Joinable`) should implement :meth:`join_hook`,
    which returns a :class:`JoinHook` instance, in addition to
    :meth:`join_device` and :meth:`join_process_group` that return device and
    process group information, respectively.
    c                    s   t    t | _d S N)super__init___JoinConfigconstruct_disabled_join_config_join_configr   	__class__r   r   r   3   s    
zJoinable.__init__r   c                 K   s   dS )a  
        Returns a :class:`JoinHook` instance for the given :class:`Joinable`.

        Arguments:
            kwargs (dict): a :class:`dict` containing any keyword arguments
                to modify the behavior of the join hook at run time; all
                :class:`Joinable` instances sharing the same join context
                manager are forwarded the same value for ``kwargs``.
        Nr   )r   kwargsr   r   r   	join_hook8   s    zJoinable.join_hookc                 C   s   dS )z
        Returns the device from which to perform collective communications
        needed by the join context manager implementation itself.
        Nr   r   r   r   r   join_deviceE   s    zJoinable.join_devicec                 C   s   dS )z
        Returns the process group for the collective communications needed by
        the join context manager itself.
        Nr   r   r   r   r   join_process_groupN   s    zJoinable.join_process_group)r   r   r   r   r   r   r
   r$   propertytorchdevicer%   r   r&   __classcell__r   r   r!   r   r   +   s   c                   @   s6   e Zd ZU dZeed< eed< eed< edd ZdS )r   zr
    This includes all fields needed from a :class:`Joinable` instance for the
    join context manager side.
    enablethrow_on_early_terminationis_first_joinablec                   C   s   t ddddS )z
        Returns a :class:`_JoinConfig` instance indicating that join-related
        logic should be disabled, e.g. if the caller is not in a join context
        manager.
        Fr+   r,   r-   )r   r   r   r   r   r   a   s
    z*_JoinConfig.construct_disabled_join_configN)r   r   r   r   r   __annotations__staticmethodr   r   r   r   r   r   X   s   
r   c                   @   s   e Zd ZdZdee eedddZddd	d
ZddddZ	dd Z
eee  ee ee dddZdd Zdd ZeedddZdS )r   a
  
    This class defines the generic join context manager, which allows custom
    hooks to be called after a process joins. These hooks should shadow the
    collective communications of non-joined processes to prevent hanging and
    erroring and to ensure algorithmic correctness. Refer to :class:`JoinHook`
    for details about the hook definition.

    .. warning::
        The context manager requires each participating :class:`Joinable` to
        call the method :meth:`notify_join_context()` before its own per-
        iteration collective communications to ensure correctness.

    .. warning::
        The context manager requires that all ``process_group`` attributes in
        the :class:`JoinHook` objects are the same. If there are multiple
        :class:`JoinHook` objects, then the ``device`` of the first is used.
        The process group and device information is used for checking for non-
        joined processes and for notifying processes to throw an exception if
        ``throw_on_early_termination`` is enabled, both of which using an all-
        reduce.

    Arguments:
        joinables (List[Joinable]): a list of the participating
            :class:`Joinable` s; their hooks are iterated over in the given
            order.

        enable (bool): a flag enabling uneven input detection; setting to
            ``False`` disables the context manager's functionality and should
            only be set when the user knows the inputs will not be uneven
            (default: ``True``).

        throw_on_early_termination (bool): a flag controlling whether to throw an
            exception upon detecting uneven inputs (default: ``False``).

    Example::

        >>> import os
        >>> import torch
        >>> import torch.distributed as dist
        >>> import torch.multiprocessing as mp
        >>> # xdoctest: +SKIP
        >>> import torch.nn.parallel.DistributedDataParallel as DDP
        >>> import torch.distributed.optim.ZeroRedundancyOptimizer as ZeRO
        >>> from torch.distributed.algorithms.join import Join
        >>>
        >>> # On each spawned worker
        >>> def worker(rank):
        >>>     dist.init_process_group("nccl", rank=rank, world_size=2)
        >>>     model = DDP(torch.nn.Linear(1, 1).to(rank), device_ids=[rank])
        >>>     optim = ZeRO(model.parameters(), torch.optim.Adam, lr=0.01)
        >>>     # Rank 1 gets one more input than rank 0
        >>>     inputs = [torch.tensor([1.]).to(rank) for _ in range(10 + rank)]
        >>>     with Join([model, optim]):
        >>>         for input in inputs:
        >>>             loss = model(input).sum()
        >>>             loss.backward()
        >>>             optim.step()
        >>>     # All ranks reach here without hanging/erroring
    TF)	joinablesr+   r,   c                    sP   t |dkrtd|| _ fdd| jD | _|| _|| _|   |   d S )Nr   z7The join context manager requires at least one joinablec                    s   g | ]}|j f  qS r   )r$   ).0joinabler#   r   r   
<listcomp>   s     z!Join.__init__.<locals>.<listcomp>)len
ValueError
_joinables_join_hooks_enable_throw_on_early_termination_set_joinable_configs_extract_dist_info)r   r1   r+   r,   r#   r   r4   r   r      s    zJoin.__init__Nr   c                 C   s>   t | jdkstd}| jD ]}t| j| j|d|_d}qdS )zX
        Sets the :class:`_JoinConfig` of each participating :class:`Joinable`.
        r   Tr.   FN)r6   r8   AssertionErrorr   r:   r;   r    )r   r-   r3   r   r   r   r<      s    
zJoin._set_joinable_configsc                 C   sb   d}d}| j D ]4}|dkr"|j}n||jkr4td|dkr|j}q|| _t| j| _|| _dS )a  
        Extracts the process group and device information from the joinables.
        If there are multiple joinables, then the context manager uses the
        first specified device.

        Preconditions:
            ``self._joinables`` is not ``None`` and is non-empty.

        Raises:
            ValueError
                If there are multiple conflicting ``process_group`` attributes
                among the ``Joinable`` objects.
        Nz7Using join context manager with multiple process groups)	r8   r&   r7   r%   _process_groupdistZget_rank_rank_device)r   process_groupr)   r3   r   r   r   r=      s    

zJoin._extract_dist_infoc                 C   s   d S r   r   r   r   r   r   	__enter__   s    zJoin.__enter__)typevalue	tracebackc           
   	   C   s   | j r
|rdS d}d}d}d}td |s||krTtd| d| j d	| d
 |  }|dkrjd}q(| jrx|   | jD ]}	|		  q~d}|d7 }q(| jD ]}	|	
| qdS )z
        Repeatedly runs the main hooks until all processes join; then, runs
        the post-hooks.

        Raises:
            RuntimeError
                If ``throw_on_early_termination=True``.
        NFTr   i  oncez+Detected uneven input skew of greater than z. This means that rank z has at least zz fewer inputs than other currently-active ranks. This level of skew could lead to performance degradation during training.   )r:   warningssimplefilterwarnrA   _get_num_nonjoined_procsr;   _notify_procs_to_terminater9   r   r   )
r   rE   rF   rG   Zall_procs_joinedr   iZWARN_THRESHOLDnum_nonjoined_procsr$   r   r   r   __exit__   s.    

	



zJoin.__exit__c                 C   s(   t jd| jd}tj|| jd | S )z|
        Returns the number of non-joined processes by shadowing an all-reduce
        in the non-joined processes.
        rI   r)   group)r(   zerosrB   r@   
all_reducer?   item)r   rP   r   r   r   rM     s    zJoin._get_num_nonjoined_procsc                 C   s6   t jd| jd}tj|| jd td| j ddS )z
        Schedules an all-reduce to notify non-joined processes to terminate
        and raises a ``RuntimeError`` indicating that the current process has
        exhausted its inputs.
        rI   rR   rS   zRank z exhausted all inputs.N)r(   onesrB   r@   rV   r?   RuntimeErrorrA   )r   rX   r   r   r   rN   &  s    zJoin._notify_procs_to_terminate)r3   c                 C   s   t | dstdt|  d| j}|jr0|js4dS | j}| j}tj	d|d}t
j||dd}|jrtjd|d}t
j||d	 | }|rtd
|S )aO  
        Notifies the join context manager that the calling process has not yet
        joined; then, if ``throw_on_early_termination=True``, checks if uneven
        inputs have been detected (i.e. if one process has already joined) and
        throws an exception if so.

        This method should be called from a :class:`Joinable` object before
        its per-iteration collective communications. For example, this should
        be called at the beginning of the forward pass in
        :class:`DistributedDataParallel`.

        Only the first :class:`Joinable` object passed into the context
        manager performs the collective communications in this method, and
        for the others, this method is vacuous.

        Arguments:
            joinable (Joinable): the :class:`Joinable` object calling this
                method.

        Returns:
            An async work handle for the all-reduce meant to notify the context
            manager that the process has not yet joined if ``joinable`` is the
            first one passed into the context manager; ``None`` otherwise.
        r    zCheck that the z/ constructor calls the ``Joinable`` constructorNrI   rR   T)rT   Zasync_oprS   zLDetected at least one rank that exhausted inputs. Throwing across all ranks.)hasattrr>   rE   r    r-   r+   r%   r&   r(   rX   r@   rV   r,   rU   rW   rY   )r3   Zjoin_configr)   rC   rX   ZworkrU   Zshould_throwr   r   r   notify_join_context0  s&    zJoin.notify_join_context)TF)r   r   r   r   r   r   r   r   r<   r=   rD   r   r	   BaseExceptionr   rQ   rM   rN   r0   r[   r   r   r   r   r   p   s&   >  
5	
)rJ   abcr   r   typesr   typingr   r   r   r   r	   r(   Ztorch.distributeddistributedr@   __all__r
   r   r   r   r   r   r   r   <module>   s   
 -