U
    d7                     @   s:  d Z ddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z ddl	mZ ddlmZ dd	lmZ dd
lmZmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZmZ ddlmZ ddl m!Z! ddl"m#Z# ddl"m$Z% ddl"m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+ ddl,m-Z- ddl.m/Z/m0Z0 dZ1dd Z2dd Z3ej4j5G dd de6Z7dd Z8e7j9d d!G d"d# d#e7Z:G d$d  d e:Z;G d%d& d&e7Z<e79 G d'd( d(e<Z=e79 G d)d* d*e<Z>e79 G d+d, d,e7Z$d-d. Z?e79 G d/d0 d0e7Z@e7j9d1d!G d2d3 d3e7ZAeAZBd4d5 ZCeCZDd9d7d8ZEeEZFdS ):zmComposing task work-flows.

.. seealso:

    You should import these from :mod:`celery` and not this module.
    N)deque)MutableSequence)deepcopy)partial)reduce)
itemgetter)GeneratorType)fxrangereprcall)cached_property)uuid)barrier)current_app)GroupResultallow_join_result)abstractChainMap)_regen)chunks)is_list	lookahead
maybe_listregenseq_concat_itemseq_concat_seq)getitem_property)remove_repeating_from_tasktruncate)		Signaturechainxmapxstarmapr   groupchord	signaturemaybe_signaturec                 C   s   zt | j}W nb tk
rp   z| j }W n  ttfk
rL   |  Y  Y S X |dkrdt| jd n|  Y S Y nX |dkr| jd S | S dS )z"Unroll group with only one member.   r   N)lentasks	TypeError__length_hint__AttributeErrorlist)r#   size r/   1/tmp/pip-unpacked-wheel-9cz4377o/celery/canvas.pymaybe_unroll_group(   s    $r1   c                 C   s   t | d| S )Nname)getattrtaskr/   r/   r0   task_name_from8   s    r6   c                       s  e Zd ZdZi Zd ZZdhZed]ddZ	ed^ddZ
d_ fd	d
	Zdd Zdd Zd`ddZdaddZdbddZdcddZeZddddZeZdeddZdfddZdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* Zd+d, Zd-d. Zd/d0 Z d1d2 Z!d3d4 Z"d5d6 Z#d7d8 Z$d9d: Z%d;d< Z&d=d> Z' fd?d@Z(e)dAdB Z*e+dCdD Z,e+dEdF Z-e+dGdH Z.e+dIdJ Z/e0dKdLZ1e0dMdNZ2e0dOdPZ3e0dQdRZ4e0dSdTZ5e0dUdVZ6e0dWdXZ7e0dYdZZ8e0d[d\Z9  Z:S )gr   a.  Task Signature.

    Class that wraps the arguments and execution options
    for a single task invocation.

    Used as the parts in a :class:`group` and other constructs,
    or to pass tasks around as callbacks while being compatible
    with serializers with a strict type subset.

    Signatures can also be created from tasks:

    - Using the ``.signature()`` method that has the same signature
      as ``Task.apply_async``:

        .. code-block:: pycon

            >>> add.signature(args=(1,), kwargs={'kw': 2}, options={})

    - or the ``.s()`` shortcut that works for star arguments:

        .. code-block:: pycon

            >>> add.s(1, kw=2)

    - the ``.s()`` shortcut does not allow you to specify execution options
      but there's a chaning `.set` method that returns the signature:

        .. code-block:: pycon

            >>> add.s(2, 2).set(countdown=10).set(expires=30).delay()

    Note:
        You should use :func:`~celery.signature` to create new signatures.
        The ``Signature`` class is the type returned by that function and
        should be used for ``isinstance`` checks for signatures.

    See Also:
        :ref:`guide-canvas` for the complete guide.

    Arguments:
        task (Union[Type[celery.app.task.Task], str]): Either a task
            class/instance, or the name of a task.
        args (Tuple): Positional arguments to apply.
        kwargs (Dict): Keyword arguments to apply.
        options (Dict): Additional options to :meth:`Task.apply_async`.

    Note:
        If the first argument is a :class:`dict`, the other
        arguments will be ignored and the values in the dict will be used
        instead::

            >>> s = signature('tasks.add', args=(2, 2))
            >>> signature(s)
            {'task': 'tasks.add', args=(2, 2), kwargs={}, options={}}
    Ngroup_idc                    s    fdd}|S )Nc                    s   |  j p| j< | S N)TYPES__name__)subclassclsr2   r/   r0   _inner~   s    z'Signature.register_type.<locals>._innerr/   )r=   r2   r>   r/   r<   r0   register_type|   s    zSignature.register_typec                 C   s:   | d}|r.| j| }|| k	r.|j||dS t||dS )Nsubtask_typeapp)getr9   	from_dictr   )r=   drB   typZ
target_clsr/   r/   r0   rD      s    

zSignature.from_dictFc	                    s|   || _ t|trt | nZz
|j}
W n tk
r@   |}
Y nX || _t j|
t|pXd|p`i t|phi f|	||d d S )Nr/   r5   argskwargsoptionsr@   	immutable)	_app
isinstancedictsuper__init__r2   r,   _typetuple)selfr5   rH   rI   rJ   typer@   rK   rB   exZ	task_name	__class__r/   r0   rP      s     


 
zSignature.__init__c                 O   s    |  ||d\}}}| j||S )z0Call the task directly (in the current process).N)_mergerT   )rS   partial_argspartial_kwargsrH   rI   _r/   r/   r0   __call__   s    zSignature.__call__c                 O   s   |  ||S )z5Shortcut to :meth:`apply_async` using star arguments.apply_async)rS   rY   rZ   r/   r/   r0   delay   s    zSignature.delayc                 K   sP   |r|nd}|r|ni }dd |  D }| |||\}}}| jj||f|S )zCall task locally.

        Same as :meth:`apply_async` but executed the task inline instead
        of sending a task message.
        r/   c                 S   s   i | ]\}}|d k	r||qS r8   r/   .0kvr/   r/   r0   
<dictcomp>   s       z#Signature.apply.<locals>.<dictcomp>)itemsrX   rT   apply)rS   rH   rI   rJ   r/   r/   r0   rf      s
    zSignature.applyc                 K   s   |r|nd}|r|ni }dd |  D }z
| j}W n tk
rJ   Y dS X |sX|sX|rn| |||\}}}n| j| j| j  }}}|||f|S )a  Apply this task asynchronously.

        Arguments:
            args (Tuple): Partial args to be prepended to the existing args.
            kwargs (Dict): Partial kwargs to be merged with existing kwargs.
            options (Dict): Partial options to be merged
                with existing options.

        Returns:
            ~@AsyncResult: promise of future evaluation.

        See also:
            :meth:`~@Task.apply_async` and the :ref:`guide-calling` guide.
        r/   c                 S   s   i | ]\}}|d k	r||qS r8   r/   r`   r/   r/   r0   rd      s       z)Signature.apply_async.<locals>.<dictcomp>N)re   _apply_async
IndexErrorrX   rH   rI   rJ   )rS   rH   rI   
route_namerJ   _applyr/   r/   r0   r^      s    
zSignature.apply_asyncc                    s   |r|nd}|r|ni }|d k	rBt  jf fdd| D }n j} jr`|s` j j|fS |rvt|t j n j|rt  jf|n j|fS )Nr/   c                    s*   i | ]"\}}| j ks | jkr||qS r/   )_IMMUTABLE_OPTIONSrJ   r`   rS   r/   r0   rd      s
   
 
 z$Signature._merge.<locals>.<dictcomp>)rN   rJ   re   rK   rH   rI   rR   )rS   rH   rI   rJ   forcenew_optionsr/   rl   r0   rX      s    

zSignature._mergec                 K   s   |r|nd}|r|ni }|s$|s$|r:|  |||\}}}n| j| j| j  }}}tj| jt||t|| j	| j
d| jd}| j|_|S )a7  Create a copy of this signature.

        Arguments:
            args (Tuple): Partial args to be prepended to the existing args.
            kwargs (Dict): Partial kwargs to be merged with existing kwargs.
            options (Dict): Partial options to be merged with
                existing options.
        r/   rG   rA   )rX   rH   rI   rJ   r   rD   r5   rR   r   r@   rK   rL   rQ   )rS   rH   rI   optsr%   r/   r/   r0   clone   s     	zSignature.clonec           	      C   s   | j }z|d }W n& tk
r8   |p*t  }|d< Y nX |rF||d< |rR||d< d|krf| jj|d< |rzd|krz||d< |r||d< |dk	r||d< | |S )	a6  Finalize the signature by adding a concrete task id.

        The task won't be called and you shouldn't call the signature
        twice after freezing it as that'll result in two task messages
        using the same task id.

        Returns:
            ~@AsyncResult: promise of future evaluation.
        task_idroot_id	parent_idZreply_tor7   r$   Ngroup_index)rJ   KeyErrorr   rB   Z
thread_oidAsyncResult)	rS   _idr7   r$   rr   rs   rt   ro   tidr/   r/   r0   freeze  s$    zSignature.freezec                 C   s6   |   }|dk	r||_|dk	r$||_|dk	r2||_|S )zReplace the args, kwargs or options set for this signature.

        These are only replaced if the argument for the section is
        not :const:`None`.
        N)rp   rH   rI   rJ   )rS   rH   rI   rJ   r%   r/   r/   r0   replace4  s    zSignature.replacec                 K   s"   |dk	r|  | | j| | S )u   Set arbitrary execution options (same as ``.options.update(…)``).

        Returns:
            Signature: This is a chaining method call
                (i.e., it will return ``self``).
        N)set_immutablerJ   update)rS   rK   rJ   r/   r/   r0   setC  s    
zSignature.setc                 C   s
   || _ d S r8   rK   )rS   rK   r/   r/   r0   r{   O  s    zSignature.set_immutablec                 C   s,   | j |g }t|ts(|g }| j |< |S r8   )rJ   
setdefaultrM   r   )rS   keyre   r/   r/   r0   _with_list_optionR  s    
zSignature._with_list_optionc                 C   s    |  |}||kr|| |S r8   )r   appendrS   r   valuere   r/   r/   r0   append_to_list_optionX  s    

zSignature.append_to_list_optionc                 C   s   |  |}|t| d S r8   )r   extendr   r   r/   r/   r0   extend_list_option^  s    
zSignature.extend_list_optionc                 C   s   |  d|S )zAdd callback task to be applied if this task succeeds.

        Returns:
            Signature: the argument passed, for chaining
                or use with :func:`~functools.reduce`.
        linkr   rS   callbackr/   r/   r0   r   b  s    zSignature.linkc                 C   s   |  d|S )zAdd callback task to be applied on error in task execution.

        Returns:
            Signature: the argument passed, for chaining
                or use with :func:`~functools.reduce`.
        
link_errorr   rS   errbackr/   r/   r0   r   k  s    zSignature.link_errorc                 C   s   |  | | S )a2  Version of :meth:`link_error` that supports chaining.

        on_error chains the original signature, not the errback so::

            >>> add.s(2, 2).on_error(errback.s()).delay()

        calls the ``add`` task, not the ``errback`` task, but the
        reverse is true for :meth:`link_error`.
        r   r   r/   r/   r0   on_errort  s    

zSignature.on_errorc                 C   s6   t tjt| ggdd t| jdp*g D S )zhReturn a recursive list of dependencies.

        "unchain" if you will, but with links intact.
        c                 s   s   | ]}|  V  qd S r8   )flatten_links)ra   r   r/   r/   r0   	<genexpr>  s   z*Signature.flatten_links.<locals>.<genexpr>r   )r-   	itertoolsr    from_iterabler   rJ   rC   rl   r/   r/   r0   r     s    zSignature.flatten_linksc                 C   sd   t |tr$tt| f| | jdS t |trFt|}t| || jdS t |tr`t| || jdS t	S NrA   )
rM   _chainr   unchain_tasksrL   r#   r1   rB   r   NotImplementedrS   otherr/   r/   r0   __or__  s    
 

zSignature.__or__c                 C   s
   |  |S r8   )r   r   r/   r/   r0   __ior__  s    zSignature.__ior__c              
   C   s~   | j }|j}| jdpt }|d L}|j||}|jj	|d| j
f d|i||jd ||W  5 Q R  S Q R X d S )Nrq   r5   )
connection)rT   rB   rJ   rC   r   producer_or_acquirebackendZon_task_callcontrolelectionrp   r   rv   )rS   rT   rB   rx   producerpropsr/   r/   r0   r     s    
zSignature.electionc                 O   s(   | j ||i dd\}}}t| d ||S )NT)rm   r5   )rX   r
   )rS   rH   rI   r[   r/   r/   r0   r
     s    zSignature.reprcallc                 C   s   | |t | < t| S r8   )idrN   )rS   memor/   r/   r0   __deepcopy__  s    zSignature.__deepcopy__c                 C   s   |    S r8   )r^   rC   rl   r/   r/   r0   
__invert__  s    zSignature.__invert__c                 C   s   t t| ffS r8   )r%   rN   rl   r/   r/   r0   
__reduce__  s    zSignature.__reduce__c                 C   s   t | S r8   )rN   rl   r/   r/   r0   __json__  s    zSignature.__json__c                 C   s   |   S r8   )r
   rl   r/   r/   r0   __repr__  s    zSignature.__repr__c                 #   s4   t   D ]$\}}t|tr$| n||fV  q
d S r8   )rO   re   rM   bytesdecode)rS   rb   rc   rV   r/   r0   re     s    zSignature.itemsc                 C   s   | j S r8   r4   rl   r/   r/   r0   r2     s    zSignature.namec                 C   s   | j p| jj| d  S Nr5   )rQ   rB   r)   rl   r/   r/   r0   rT     s    zSignature.typec                 C   s
   | j ptS r8   )rL   r   rl   r/   r/   r0   rB     s    zSignature.appc                 C   s,   z
| j jW S  tk
r&   | jj Y S X d S r8   )rT   rv   ru   rB   rl   r/   r/   r0   rv     s    
zSignature.AsyncResultc                 C   s6   z
| j jW S  tk
r0   t| jj| d  Y S X d S r   )rT   r^   ru   _partialrB   Z	send_taskrl   r/   r/   r0   rg     s    
zSignature._apply_asynczoptions.task_idz	Task UUIDzoptions.parent_idzTask parent UUID.zoptions.root_idzTask root UUID.r5   zName of task.rH   zPositional arguments to task.rI   zKeyword arguments to task.rJ   zTask execution options.r@   zType of signaturerK   z+Flag set if no longer accepts new arguments)N)N)NNNNNNFN)NN)NNN)NNNF)NN)NNNNNN)NNN)N);r:   
__module____qualname____doc__r9   rL   rQ   rk   classmethodr?   rD   rP   r\   r_   rf   r^   rX   rp   r   ry   _freezerz   r}   r{   r   r   r   r   r   r   r   r   r   r   r
   r   r   r   r   r   re   propertyr2   r   rT   rB   rv   rg   r   r   rs   rr   r5   rH   rI   rJ   r@   rK   __classcell__r/   r/   rV   r0   r   <   s   8       

!

      
%

		












 r   c                 C   sJ   |rt dd i| S d| kr(t d|i| S |d k	rFt d| d | i| S d S )Nr    r   )rJ   r)   use_linkr/   r/   r0   _prepare_chain_from_options  s    )r   r    )r2   c                       s   e Zd ZeddZedddZ fddZdd	 Zd
d Z	 fddZ
dd Zd ddZd!ddZd"ddZddddddddejdf
ddZd#ddZedd Zdd Z  ZS )$r   kwargs.taskszTasks in chain.Nc                    sX   |d d }|r@t |tr.t| }|d d<  fdd|D }t|fd i|d S )NrI   r)   c                    s   g | ]}t | d qS rA   r&   ra   r5   rA   r/   r0   
<listcomp>1  s     z$_chain.from_dict.<locals>.<listcomp>rB   rJ   )rM   rR   r-   r   )r=   rE   rB   r)   r/   rA   r0   rD   +  s    
z_chain.from_dictc                    s^   t |dkr$t|d r$t|d n|}t jddd|if| |dd | _d| _d | _d S )Nr'   r   zcelery.chainr/   r)   r   r    )	r(   r   r   rO   rP   pop	_use_linkr@   _frozenrS   r)   rJ   rV   r/   r0   rP   4  s    $z_chain.__init__c                 O   s   | j r| ||S d S r8   )r)   r^   )rS   rH   rI   r/   r/   r0   r\   =  s    z_chain.__call__c                 C   s  t |tr6t|}|  }|s"|S tt||| jdS t |tr\tt|  | | jdS t |t r| j	rt | j	d tr| 
 }t|j	d || jd|j	d< |S | j	rt | j	d tr| 
 }|j	d j|B |j	d _|S tt|  || jdS ntS d S )NrA   )rM   r#   r1   r   r   r   rL   r   r   r)   rp   r$   bodyr   )rS   r   r)   sigr/   r/   r0   r   A  sL    
 
    z_chain.__or__c                    s6   t t j||} fdd|jd D |jd< |S )Nc                    s   g | ]}| j d dqS )T)rB   rp   )rL   )ra   r   rS   Zto_signaturer/   r0   r   f  s   z _chain.clone.<locals>.<listcomp>r)   )r&   rO   rp   rI   rS   rH   rI   r%   rV   r   r0   rp   c  s    z_chain.clonec                 C   s<   dd | j D }| jdg D ]}|D ]}|| q&q|S )Nc                 S   s   g | ]}|  qS r/   rp   ra   tr/   r/   r0   r   o  s     z(_chain.unchain_tasks.<locals>.<listcomp>r   )r)   rJ   rC   r   )rS   r)   r   r5   r/   r/   r0   r   l  s
    z_chain.unchain_tasksc              
   K   s|   |r|nd}|r|ng }| j }|jjrPt  | j||f|W  5 Q R  S Q R X | j||fd|i|rrt| jf|n| jS )Nr/   rB   )rB   conftask_always_eagerr   rf   runrN   rJ   )rS   rH   rI   rJ   rB   r/   r/   r0   r^   u  s    "z_chain.apply_asyncc                 K   s   |r|nd}|r|ng }|p | j }| j}|d kr@|jjdkr@d}|r\| js\t|t| j n| j}| j||| j|
|||||||d\}}|r|r|d 	d| |
 }t|||}|jf |}|s|S |d S d S )Nr/   r'   T)rt   r   r   )rB   r   r   task_protocolrK   rR   rH   prepare_stepsr)   r   r   r   r^   )rS   rH   rI   r7   r$   rq   r   r   	publisherr   rr   rs   rB   rt   rJ   r   r)   Zresults_from_prepareZ
first_taskZresult_from_applyr/   r/   r0   r     sD    
         
z
_chain.runc           	      C   s<   | j | j| j| j||d | j|||d|d \}}| _|d S )NF)rp   rt   r   )r   rH   rI   r)   rB   r   )	rS   rw   r7   r$   rr   rs   rt   r[   resultsr/   r/   r0   ry     s             z_chain.freezeTc              	   C   s  |p| j }| j}|d kr(|jjdkr(d}t|}|j}|j}d }d }g g  }}d}|r| }| |  }}t|tj	s|||d}t|t
rt|}|r|r|||}q| }n|rt|t|j |_t|tr||j qRt|t
rF|rF|  |  zt|||j||d}W n& tk
rD   t||||d}Y nX |rb|j|||	|
|d}n|j|d}|d7 }|r|r|| |r|js||_|rt|D ]}|| q|| || || }}t|trR|j  |}|jr|j}q|}qR||fS )	Nr'   Tr   rA   )r   rq   rr   rB   )r   rr   rB   )rr   r7   r$   rt   rr   )rB   r   r   r   r   r   r   rM   r   CallableSignaturer#   r1   rp   rR   rH   r   r)   r$   rq   r,   ry   r   parentr   r   r   r   Zensure_chords_allowed)rS   rH   rI   r)   rr   rs   r   rB   Zlast_task_idr7   Z
chord_bodyrp   rD   rt   r   ZstepsZ	steps_popZsteps_extendZ	prev_taskZprev_resr   ir5   Zis_first_taskZis_last_taskresr   noder/   r/   r0   r     s    





   
    






z_chain.prepare_stepsc           	      K   sz   |r|nd}|r|ni }d ||f }\}}| j D ]D}|||j|oL| fft| jf|}||d  |_}\}}q0|S )Nr/   )NN)r)   rp   rf   rC   rN   rJ   r   )	rS   rH   rI   rJ   lastZfargsZfkwargsr5   r   r/   r/   r0   rf   .  s    
z_chain.applyc                 C   s<   | j }|d kr4z| jd j }W n tk
r2   Y nX |p:tS Nr   )rL   r)   LookupErrorr   rS   rB   r/   r/   r0   rB   8  s    z
_chain.appc                 C   sJ   | j s$dt| j dt| ddS t| j d d ddd	 | j D S )
N<@z#xz: empty>r   r5   z | c                 s   s   | ]}t |V  qd S r8   )reprr   r/   r/   r0   r   G  s     z"_chain.__repr__.<locals>.<genexpr>)r)   rT   r:   r   r   joinrl   r/   r/   r0   r   B  s    z_chain.__repr__)N)NN)NNNNNNNNNNNNN)NNNNNN)NN)r:   r   r   r   r)   r   rD   rP   r\   r   rp   r   r^   r   ry   r   r   rf   r   rB   r   r   r/   r/   rV   r0   r   '  sP   
	"		
                
'      
      
|


	r   c                       s    e Zd ZdZ fddZ  ZS )r    a  Chain tasks together.

    Each tasks follows one another,
    by being applied as a callback of the previous task.

    Note:
        If called with only one argument, then that argument must
        be an iterable of tasks to chain: this allows us
        to use generator expressions.

    Example:
        This is effectively :math:`((2 + 2) + 4)`:

        .. code-block:: pycon

            >>> res = chain(add.s(2, 2), add.s(4))()
            >>> res.get()
            8

        Calling a chain will return the result of the last task in the chain.
        You can get to the other tasks by following the ``result.parent``'s:

        .. code-block:: pycon

            >>> res.parent.get()
            4

        Using a generator expression:

        .. code-block:: pycon

            >>> lazy_chain = chain(add.s(i) for i in range(10))
            >>> res = lazy_chain(3)

    Arguments:
        *tasks (Signature): List of task signatures to chain.
            If only one argument is passed and that argument is
            an iterable, then that'll be used as the list of signatures
            to chain instead.  This means that you can use a generator
            expression.

    Returns:
        ~celery.chain: A lazy signature that can be called to apply the first
            task in the chain.  When that task succeeds the next task in the
            chain is applied, and so on.
    c                    s\   |sH|rHt |dks t|d rHt |dkr4|d n|}ttj|t S t j| f||S )Nr'   r   )r(   r   r   operatoror_r    rO   __new__)r=   r)   rI   rV   r/   r0   r   {  s
    zchain.__new__)r:   r   r   r   r   r   r/   r/   rV   r0   r    J  s   0c                       sB   e Zd ZdZeddZed
ddZ fddZddd	Z	  Z
S )_basemapNr5   itc                 C   s    | |  |d d|i|d S NrI   rB   rJ   )_unpack_argsr=   rE   rB   r/   r/   r0   rD     s    z_basemap.from_dictc                    s,   t  j| jd|t|dfddi| d S )Nr/   r5   r   rK   T)rO   rP   
_task_namer   )rS   r5   r   rJ   rV   r/   r0   rP     s    z_basemap.__init__c                 K   sX   |r|nd}|r|ni }|  | j\}}| jjd|t|dfdt| jdi|S )Nr/   r   ri   r5   )r   rI   rT   r^   r-   r6   rC   )rS   rH   rI   ro   r5   r   r/   r/   r0   r^     s     z_basemap.apply_async)N)NN)r:   r   r   r   r   r   r   rD   rP   r^   r   r/   r/   rV   r0   r     s   
r   c                   @   s   e Zd ZdZdZdd ZdS )r!   zMap operation for tasks.

    Note:
        Tasks executed sequentially in process, this is not a
        parallel operation like :class:`group`.
    z
celery.mapc                 C   s.   |  | j\}}d|j dtt|d dS )N[z(x) for x in d   ]r   rI   r5   r   r   rS   r5   r   r/   r/   r0   r     s    zxmap.__repr__Nr:   r   r   r   r   r   r/   r/   r/   r0   r!     s   r!   c                   @   s   e Zd ZdZdZdd ZdS )r"   z.Map operation for tasks, using star arguments.zcelery.starmapc                 C   s.   |  | j\}}d|j dtt|d dS )Nr   z(*x) for x in r   r   r   r   r/   r/   r0   r     s    zxstarmap.__repr__Nr   r/   r/   r/   r0   r"     s   r"   c                       sb   e Zd ZdZedddZedddZ fdd	Zd
d Z	dddZ
dd ZedddZ  ZS )r   z)Partition of tasks into chunks of size n.r5   r   nNc                 C   s    t | |d d|i|d S r   )r   r   r   r/   r/   r0   rD     s    zchunks.from_dictc                    s,   t  jdd|t||dfddi| d S )Nzcelery.chunksr/   )r5   r   r   rK   T)rO   rP   r   )rS   r5   r   r   rJ   rV   r/   r0   rP     s    
zchunks.__init__c                 K   s   | j f |S r8   r]   )rS   rJ   r/   r/   r0   r\     s    zchunks.__call__c                 K   s@   |r|nd}|r|ni }|   j||fdt| jdi|S )Nr/   ri   r5   )r#   r^   r6   rI   rC   )rS   rH   rI   ro   r/   r/   r0   r^     s     zchunks.apply_asyncc                    s:      j\}}t fddtt||D  jdS )Nc                 3   s   | ]}t | jd V  qdS rA   N)r"   rL   )ra   partrS   r5   r/   r0   r     s   zchunks.group.<locals>.<genexpr>rA   )r   rI   r#   _chunksiterrL   )rS   r   r   r/   r   r0   r#     s    zchunks.groupc                 C   s   | ||||d S r   r/   )r=   r5   r   r   rB   r/   r/   r0   apply_chunks  s    zchunks.apply_chunks)N)NN)N)r:   r   r   r   r   r   r   rD   rP   r\   r^   r#   r   r   r/   r/   rV   r0   r     s   
r   c                    sx   t | trt|  d} t | ttfr,| j} nHt | tjr@| g} n4t | trbt	 fdd| D } n fdd| D } | S )NrA   c                 3   s   | ]}t | d V  qdS r   r%   r   rA   r/   r0   r     s     z_maybe_group.<locals>.<genexpr>c                    s   g | ]}t | d qS r   r   r   rA   r/   r0   r     s     z _maybe_group.<locals>.<listcomp>)
rM   rN   r%   r#   r   r)   r   r   r   r   )r)   rB   r/   rA   r0   _maybe_group  s    

r   c                       s   e Zd ZdZeddZed1ddZ fddZd	d
 Z	dd Z
d2ddZd3ddZd4ddZdd Zdd Zdd ZejejeefddZd5ddZdd  Zd6d!d"Zd7d#d$ZeZd%d& Zd'd( Zd)d* Zd+d, Zd-d. Z e!d/d0 Z"  Z#S )8r#   aq  Creates a group of tasks to be executed in parallel.

    A group is lazy so you must call it to take action and evaluate
    the group.

    Note:
        If only one argument is passed, and that argument is an iterable
        then that'll be used as the list of tasks instead: this
        allows us to use ``group`` with generator expressions.

    Example:
        >>> lazy_group = group([add.s(2, 2), add.s(4, 4)])
        >>> promise = lazy_group()  # <-- evaluate: returns lazy result.
        >>> promise.get()  # <-- will wait for the task to return
        [4, 8]

    Arguments:
        *tasks (List[Signature]): A list of signatures that this group will
            call. If there's only one argument, and that argument is an
            iterable, then that'll define the list of signatures instead.
        **options (Any): Execution options applied to all tasks
            in the group.

    Returns:
        ~celery.group: signature that when called will then call all of the
            tasks in the group (and return a :class:`GroupResult` instance
            that can be used to inspect the state of the group).
    r   zTasks in group.Nc                    sJ   |d d }t | fdd|D  |d d< }t|fd i|d S )NrI   r)   c                 3   s   | ]}t | d V  qdS r   r   r   rA   r/   r0   r     s    z"group.from_dict.<locals>.<genexpr>rB   rJ   )rT   r#   )r=   rE   rB   Z
orig_tasksZrebuilt_tasksr/   rA   r0   rD     s
    zgroup.from_dictc                    sn   t |dkrL|d }t|tr$|j}t|tjr:| g}t|tsLt|}t	 j
ddd|if| d| _d S )Nr'   r   zcelery.groupr/   r)   r#   )r(   rM   r#   r)   r   r   rp   r   r   rO   rP   r@   r   rV   r/   r0   rP     s    


zgroup.__init__c                 O   s   | j |f|S r8   r]   )rS   rY   rJ   r/   r/   r0   r\   *  s    zgroup.__call__c                 C   s   t | || jdS )N)r   rB   )r$   rL   r   r/   r/   r0   r   -  s    zgroup.__or__      ?c                 C   s0   t |||dd}| jD ]}|jt|d q| S )NT)Z
repeatlast)	countdown)r	   r)   r}   next)rS   startstopstepr   r5   r/   r/   r0   skew1  s    
z
group.skewTc                 K   s   |r|nd}|d k	rt d|d k	r,t d| j}|jjrJ| j||f|S | jsX|  S | |\}}	}
| | jg |	|
|}t	 }t
| j||||f||d|}| jj|	||d}|  t|dkrt|d tr|d }|j}|r|r|| |S )Nr/   z%Cannot add link to group: use a chordz5Cannot add link to group: do that on individual tasksrH   rI   )Zready_barrierr'   r   )r*   rB   r   r   rf   r)   ry   _freeze_gid	_preparedr   r-   _apply_tasksr   finalizer(   rM   Zcurrent_worker_taskZ	add_trail)rS   rH   rI   add_to_parentr   r   r   rJ   rB   r7   rr   r)   pr   resultZparent_taskr/   r/   r0   r^   7  s:     
zgroup.apply_asyncc                    sn    r nd rni | j }| js,|  S | \}}| | jg |||}|| fdd|D S )Nr/   c                    s(   g | ] \}}}|j f  d qS )r   )rf   )ra   r   r[   rH   rI   rJ   r/   r0   r   c  s    zgroup.apply.<locals>.<listcomp>)rB   r)   ry   r   r   r   )rS   rH   rI   rJ   rB   r7   rr   r)   r/   r  r0   rf   [  s    zgroup.applyc                 C   s   | j D ]}|| qd S r8   r)   r{   rS   rK   r5   r/   r/   r0   r{   g  s    
zgroup.set_immutablec                 C   s    |  jdd}| jd |S )NTr~   r   )rp   r}   r)   r   rS   r   r/   r/   r0   r   k  s    z
group.linkc                    s   t  fdd| jD S )Nc                 3   s   | ]}|  V  qd S r8   r   )ra   Z
child_taskr   r/   r0   r   ~  s     z#group.link_error.<locals>.<genexpr>)rR   r)   r	  r/   r
  r0   r   v  s    zgroup.link_errorc
                 c   s   |D ]}
||
|r|
  }
n||
|d}
||
trR|
|
j||||}|E d H  q|rp|
jsp|	||	|
j |
_|
|
j||d|fV  qd S )NrA   )r7   rr   )rp   r#   r   r)   rK   rH   ry   )rS   r)   rY   r7   rr   rB   r   rD   rM   rR   r5   Zunrollr/   r/   r0   r     s     


    
zgroup._preparedc	              
   k   s   |p| j }||}d}
tt|D ]\}\}}|\}}}|d k	rH|n
|jd}|
t|7 }
|d k	r|d kr|j	||
 |j
f |d|||d|	 |r|js|js| jd7  _|j|dd |V  q&W 5 Q R X d S )Nr   r$   F)r   r  r$   rH   rI   r'   T)Zweak)rB   r   	enumerater   rJ   rC   _chord_descendr   Zset_chord_sizer^   Z	cancelledreadyr.   Zthen)rS   r)   r   rB   r  r  r$   rH   rI   rJ   Z
chord_sizeZ
task_indexZcurrent_taskZ	next_taskr   r   r7   Z	chord_objr/   r/   r0   r    s,    


  	zgroup._apply_tasksc                 C   s4   t | jf|}|dt  |d< }|||dfS )Nrq   r7   rr   )rN   rJ   r   r   rC   )rS   rJ   r7   r/   r/   r0   r     s    
zgroup._freeze_gidc                 C   s  | j }z|d }W n& tk
r8   |p*t  }|d< Y nX |rF||d< |rR||d< |d k	rb||d< |d|}|d|}t| jtrt| 	| j\}	}
t
| |	||||}t
dd t|
|D | _n>g }t| |||||}t| jtr|| jd d < n|| _||fS )	Nrq   r7   r$   rt   rr   rs   c                 s   s   | ]}|d  V  qdS )r   Nr/   )ra   xr/   r/   r0   r     s     z,group._freeze_group_tasks.<locals>.<genexpr>)rJ   ru   r   r   rM   r)   r   r   tee_unroll_tasksr   _freeze_taskszipr-   _freeze_unrollr   )rS   rw   r7   r$   rr   rs   rt   ro   gidZtasks1Ztasks2r   	new_tasksr/   r/   r0   _freeze_group_tasks  s:        zgroup._freeze_group_tasksc              	   C   s   | j j| j||||||d S )N)rw   r7   r$   rr   rs   rt   )rB   r   r  )rS   rw   r7   r$   rr   rs   rt   r/   r/   r0   ry     s    
    zgroup.freezec                 #   s&    fddt |D E d H  d S )Nc                 3   s&   | ]\}}|j  |d V  qdS )r7   r$   rr   rs   rt   N)ry   )ra   rt   r5   r$   r7   rs   rr   r/   r0   r     s   z&group._freeze_tasks.<locals>.<genexpr>)r  )rS   r)   r7   r$   rr   rs   r/   r  r0   r    s    zgroup._freeze_tasksc                 #   s    fdd|D E d H  d S )Nc                 3   s    | ]}t | jd  V  qdS r   )r&   rL   rp   r   rl   r/   r0   r     s     z&group._unroll_tasks.<locals>.<genexpr>r/   rS   r)   r/   rl   r0   r    s    zgroup._unroll_tasksc           	      c   sn   t | j}d}|rjt| | jd }t|tr@||j q|	| |j
|||||dV  |d7 }qd S )Nr   rA   r  r'   )r   r)   r&   popleftrL   rp   rM   r#   
extendleftr   ry   )	rS   r  r7   r$   rr   rs   stackrt   r5   r/   r/   r0   r    s    


 zgroup._freeze_unrollc                 C   s(   | j r$t| j d d d| j dS dS )Nr   r5   zgroup()zgroup(<empty>))r)   r   rl   r/   r/   r0   r     s    zgroup.__repr__c                 C   s
   t | jS r8   )r(   r)   rl   r/   r/   r0   __len__   s    zgroup.__len__c                 C   sD   | j }|d kr4z| jd j}W n tk
r2   Y nX |d k	r@|S tS r   )rL   r)   rB   r   r   r   r/   r/   r0   rB   #  s    z	group.app)N)r   Nr   )NNTNNN)NN)NNNNNNN)NNNNNN)NNNNNN)$r:   r   r   r   r   r)   r   rD   rP   r\   r   r   r^   rf   r{   r   r   r   r   r   rM   rR   r   r  r   r  ry   r   r  r  r  r   r  r   rB   r   r/   r/   rV   r0   r#     s^   

      
$
 
       
*      
)      
r#   r$   c                       s   e Zd ZdZed/ddZed0ddZd1 fdd		Zd2d
dZ	 fddZ
d3ddZd4ddZd5ddZedd Zdd Zd6ddZ fddZdd  Zd!d" Zd#d$ Zd%d& Zed'd( Zd7d)d*Zed+d,Zed-d.Z  ZS )8r  aL  Barrier synchronization primitive.

    A chord consists of a header and a body.

    The header is a group of tasks that must complete before the callback is
    called.  A chord is essentially a callback for a group of tasks.

    The body is applied with the return values of all the header
    tasks as a list.

    Example:

        The chord:

        .. code-block:: pycon

            >>> res = chord([add.s(2, 2), add.s(4, 4)])(sum_task.s())

        is effectively :math:`\Sigma ((2 + 2) + (4 + 4))`:

        .. code-block:: pycon

            >>> res.get()
            12
    Nc                 C   s2   |  }| jf |d \}|d< | |d|i|S )NrI   rB   )copyr   )r=   rE   rB   rJ   rH   r/   r/   r0   rD   J  s    z_chord.from_dictc                 K   s   | |f|fS r8   r/   )headerr   rI   r/   r/   r0   r   P  s    z_chord._unpack_argscelery.chordc              	      sZ   |r|nd}|r|ndi i}t  j|||t||t||ddfd|i| d| _d S )Nr/   rI   rA   )r!  r   rB   r$   )rO   rP   r   r&   r@   )rS   r!  r   r5   rH   rI   rB   rJ   rV   r/   r0   rP   V  s    


z_chord.__init__c                 K   s   | j d|rd|ini f|S )Nr/   r   r]   )rS   r   rJ   r/   r/   r0   r\   `  s    z_chord.__call__c                    s@   t |ttfs0t |tr0|  }|j|B |_|S t |S d S r8   )rM   r#   r   r   rp   r   rO   r   )rS   r   r   rV   r/   r0   r   c  s    z_chord.__or__c                 C   s   t | jtst| j| jd| _| jj||| jd}| jj|||||d}|}	t }
|	r|	j|
krhtd|
	|	j |	j
d kr||	_
q|	j
}	qR| jj| _|S )NrA   )rs   rr   r$   )rr   r$   r7   rt   zRecursive result parents)rM   r)   r#   rB   ry   r   r}   r   RuntimeErroraddr   )rS   rw   r7   r$   rr   rs   rt   header_resultZbody_resultr   seenr/   r/   r0   ry   m  s4         


z_chord.freezec	              
   K   s$  |r|nd}|r|ni }|r4| j s4t|t| j n| j}|dd pN| jd }
t| jd f|}|
jf |	}
| |
}t| j	t
r| j	 nt
| j	|d}|jjrt ( | j||f|
|d|	W  5 Q R  S Q R X |	rt| jf|	n| j}|dd }|d kr
|}| j||
|fd|i|S )Nr/   r   rI   rA   )r   rq   rq   )rK   rR   rH   r   rI   rN   rp   _get_apprM   r)   r#   r   r   r   rf   rJ   r   )rS   rH   rI   rq   r   r   r   ZrouterZ
result_clsrJ   r   rB   r)   Zmerged_optionsZoption_task_idr/   r/   r0   r^     s8    

 
z_chord.apply_asyncTc                 K   sn   |r|nd}|r|ni }|d kr&| j n|}t| jtr@| j nt| j| jd}|j|||j|dfdS )Nr/   rA   )	propagate)rH   )r   rM   r)   r#   rp   rB   rf   rC   )rS   rH   rI   r(  r   rJ   r)   r/   r/   r0   rf     s    z_chord.applyc                    s   t |tst |trt|}t |trNt|jd|j}t fdd|D S t |tr|jdd d D ]} 	|}|dkrh|  S qhdS t |t
r 	|jS t |trdS t|S )Nr)   c                 3   s   | ]}  |V  qd S r8   r  r   r=   r/   r0   r     s     z"_chord._descend.<locals>.<genexpr>r   r   r'   )rM   r   rN   rD   r#   r3   r)   sumr   r  r$   r   r(   )r=   Zsig_objZsubtasksZ	child_sigZ
child_sizer/   r*  r0   r    s     






z_chord._descendc                    s&   t  jd j}t fdd|D S )Nr)   c                 3   s   | ]}  |V  qd S r8   r)  r   rl   r/   r0   r     s     z)_chord.__length_hint__.<locals>.<genexpr>)r3   r)   r+  r  r/   rl   r0   r+     s    z_chord.__length_hint__r'   Fc
                 K   s   |p|  |}|jdpt }|jd}|
r>t| jf|
n| j}
|
r`|
dd  |j|
 |j|	|d}|
dd  |
dd  |
dd  |j|||d}|j	r|j
j|||||d ||d|i|
}n|g  | jj| }||_|S )Nrq   rr   r   r    r$   )r7   r$   rr   )intervalr   max_retries)r'  rJ   rC   r   rN   r   r|   ry   r  r)   r   Zapply_chordr_   rB   r   r   )rS   r!  r   rY   rB   r,  r   r-  eagerrq   rJ   r7   rr   ZbodyresZheader_result_argsr%  r/   r/   r0   r     s2    
z
_chord.runc              	      sH   t  j||}zt|jd dd|jd< W n ttfk
rB   Y nX |S )Nr   Tr   )rO   rp   r&   rI   r,   ru   r   rV   r/   r0   rp     s     z_chord.clonec                 C   s   | j | |S r8   )r   r   r   r/   r/   r0   r     s    z_chord.linkc                 C   s   | j | |S r8   )r   r   r   r/   r/   r0   r     s    z_chord.link_errorc                 C   s   | j D ]}|| qd S r8   r  r  r/   r/   r0   r{   	  s    
z_chord.set_immutablec              	   C   s   | j rtt| j trVt| j jd d d| j jd | jt| j jdd  | jdS dt| j d | j | j S d| jdS )	Nr   r5   z%({} | {!r})r'   rA   %z<chord without body: >)	r   rM   r   r   r)   formatr
   r    rL   rl   r/   r/   r0   r     s     z_chord.__repr__c                 C   s   |  | jS r8   )r'  r   rl   r/   r/   r0   rB     s    z
_chord.appc                 C   sj   | j }|d krZz| jj}W n tk
r4   | j}Y nX |rD|d j }|d krZ|d k	rZ|j }|d k	rf|S tS r   )rL   r)   r,   r   )rS   r   rB   r)   r/   r/   r0   r'     s    
z_chord._get_appzkwargs.headerzTasks in chord header.zkwargs.bodyzBody task of chord.)N)NN)Nr"  NNN)N)NNNNNN)NNNNNNNN)NNTN)NNr'   NFN)N)r:   r   r   r   r   rD   staticmethodr   rP   r\   r   ry   r^   rf   r  r+   r   rp   r   r   r{   r   r   rB   r'  r   r)   r   r   r/   r/   rV   r0   r  .  s^         


      
         
    

       
(



r  c                 O   sF   | d}t| tr6t| tjr(|  S tj| |dS t| f||S )zCreate new signature.

    - if the first argument is a signature already then it's cloned.
    - if the first argument is a dict, then a Signature version is returned.

    Returns:
        Signature: The resulting signature.
    rB   rA   )rC   rM   rN   r   r   rp   r   rD   )ZvariesrH   rI   rB   r/   r/   r0   r%   6  s    	

r%   Fc                 C   sF   | dk	rBt | tjr"|r4|  } nt | tr4t| } |dk	rB|| _| S )a  Ensure obj is a signature, or None.

    Arguments:
        d (Optional[Union[abstract.CallableSignature, Mapping]]):
            Signature or dict-serialized signature.
        app (celery.Celery):
            App to bind signature to.
        clone (bool):
            If d' is already a signature, the signature
           will be cloned when this flag is enabled.

    Returns:
        Optional[abstract.CallableSignature]
    N)rM   r   r   rp   rN   r%   rL   )rE   rB   rp   r/   r/   r0   r&   J  s    

r&   )NF)Gr   r   r   collectionsr   collections.abcr   r   r   	functoolsr   r   r   r   typesr   Zkombu.utils.functionalr	   r
   Zkombu.utils.objectsr   Zkombu.utils.uuidr   Zviner   Zcelery._stater   Zcelery.resultr   r   Zcelery.utilsr   Zcelery.utils.collectionsr   Zcelery.utils.functionalr   r   r   r   r   r   r   r   r   Zcelery.utils.objectsr   Zcelery.utils.textr   r   __all__r1   r6   r   registerrN   r   r   r?   r   r    r   r!   r"   r   r#   r  r$   r%   Zsubtaskr&   Zmaybe_subtaskr/   r/   r/   r0   <module>   sr       49
  $<
&  ?
  
