U
    d                     @   sv  d Z ddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZmZmZ dd	lmZmZ dd
lmZmZ ddlmZ ddlmZmZmZ ddlmZmZ ddlmZ zddl Z W n e!k
r   dZ Y nX dZ"dZ#dd Z$edd Z%edd Z&G dd dZ'ej(G dd de'Z)ej(G dd de'Z*ej(G dd de*Z+ej(G dd  d e)Z,d#d!d"Z-dS )$z3Task results/state and results for groups of tasks.    N)deque)contextmanager)proxy)cached_property)Thenablebarrierpromise   )current_appstates)_set_task_join_will_blocktask_join_will_block)app_or_default)ImproperlyConfiguredIncompleteStreamTimeoutError)DependencyGraphGraphFormatter)parse_iso8601)
ResultBaseAsyncResult	ResultSetGroupResultEagerResultresult_from_tuplez|Never call result.get() within a task!
See http://docs.celeryq.org/en/latest/userguide/tasks.html#task-synchronous-subtasks
c                   C   s   t  rttd S N)r   RuntimeErrorE_WOULDBLOCK r   r   1/tmp/pip-unpacked-wheel-9cz4377o/celery/result.pyassert_will_not_block$   s    r    c                  c   s(   t  } td z
d V  W 5 t|  X d S NFr   r   Zreset_valuer   r   r   allow_join_result)   s
    
r$   c                  c   s(   t  } td z
d V  W 5 t|  X d S NTr"   r#   r   r   r   denied_join_result3   s
    
r&   c                   @   s   e Zd ZdZdZdS )r   zBase class for results.N)__name__
__module____qualname____doc__parentr   r   r   r   r   =   s   r   c                   @   s2  e Zd ZdZdZeZdZdZdhddZe	dd Z
e
jdd Z
did	d
Zdd Zdd Zdd Zdd ZdjddZdddddddddejejfddZeZdd Zdd ZdkddZdd  Zdld!d"Zd#d$ Zd%d& Zd'd( Zd)d* Zdmd+d,Z e Z!d-d. Z"dnd/d0Z#d1d2 Z$d3d4 Z%d5d6 Z&d7d8 Z'd9d: Z(d;d< Z)d=d> Z*d?d@ Z+dAdB Z,e-dCdD Z.e	dEdF Z/e	dGdH Z0dIdJ Z1dKdL Z2dMdN Z3dOdP Z4e	dQdR Z5e5Z6e	dSdT Z7e	dUdV Z8e8Z9e	dWdX Z:e:jdYdX Z:e	dZd[ Z;e	d\d] Z<e	d^d_ Z=e	d`da Z>e	dbdc Z?e	ddde Z@e	dfdg ZAdS )or   zxQuery task state.

    Arguments:
        id (str): See :attr:`id`.
        backend (Backend): See :attr:`backend`.
    Nc                 C   sd   |d krt dt| t|p$| j| _|| _|p:| jj| _|| _t| jdd| _	d | _
d| _d S )Nz#AsyncResult requires valid id, not TweakF)
ValueErrortyper   appidbackendr+   r   _on_fulfilledon_ready_cache_ignored)selfr1   r2   Z	task_namer0   r+   r   r   r   __init__X   s    zAsyncResult.__init__c                 C   s   t | dr| jS dS )z+If True, task result retrieval is disabled.r6   F)hasattrr6   r7   r   r   r   ignoredf   s    
zAsyncResult.ignoredc                 C   s
   || _ dS )z%Enable/disable task result retrieval.N)r6   )r7   valuer   r   r   r;   m   s    Fc                 C   s   | j j| |d | j||S )Nr,   )r2   add_pending_resultr4   thenr7   callbackZon_errorr-   r   r   r   r>   r   s    zAsyncResult.thenc                 C   s   | j |  |S r   r2   remove_pending_resultr7   resultr   r   r   r3   v   s    zAsyncResult._on_fulfilledc                 C   s   | j }| j|o| fd fS r   )r+   r1   as_tuple)r7   r+   r   r   r   rE   z   s    zAsyncResult.as_tuplec                 C   s0   g }| j }|| j |dk	r,||  |S )zReturn as a list of task IDs.N)r+   appendr1   extendas_list)r7   resultsr+   r   r   r   rH   ~   s    zAsyncResult.as_listc                 C   s(   d| _ | jr| j  | j| j dS )z/Forget the result of this task and its parents.N)r5   r+   forgetr2   r1   r:   r   r   r   rJ      s    
zAsyncResult.forgetc                 C   s    | j jj| j|||||d dS )a  Send revoke signal to all workers.

        Any worker receiving the task, or having reserved the
        task, *must* ignore it.

        Arguments:
            terminate (bool): Also terminate the process currently working
                on the task (if any).
            signal (str): Name of signal to send to process if terminate.
                Default is TERM.
            wait (bool): Wait for replies from workers.
                The ``timeout`` argument specifies the seconds to wait.
                Disabled by default.
            timeout (float): Time in seconds to wait for replies when
                ``wait`` is enabled.
        )
connection	terminatesignalreplytimeoutN)r0   controlrevoker1   r7   rK   rL   rM   waitrO   r   r   r   rQ      s      zAsyncResult.revokeT      ?c              
   C   s   | j r
dS |	rt  t }|r>|r>| jr>t| jdd}|   |rL|| | jrh|rb| j|d | jS | j	
|  | j	j| |||||||dS )a  Wait until task is ready, and return its result.

        Warning:
           Waiting for tasks within a task may lead to deadlocks.
           Please read :ref:`task-synchronous-subtasks`.

        Warning:
           Backends use resources to store and transmit results. To ensure
           that resources are released, you must eventually call
           :meth:`~@AsyncResult.get` or :meth:`~@AsyncResult.forget` on
           EVERY :class:`~@AsyncResult` instance returned after calling
           a task.

        Arguments:
            timeout (float): How long to wait, in seconds, before the
                operation times out.
            propagate (bool): Re-raise exception if the task failed.
            interval (float): Time to wait (in seconds) before retrying to
                retrieve the result.  Note that this does not have any effect
                when using the RPC/redis result store backends, as they don't
                use polling.
            no_ack (bool): Enable amqp no ack (automatically acknowledge
                message).  If this is :const:`False` then the message will
                **not be acked**.
            follow_parents (bool): Re-raise any exception raised by
                parent tasks.
            disable_sync_subtasks (bool): Disable tasks to wait for sub tasks
                this is the default configuration. CAUTION do not enable this
                unless you must.

        Raises:
            celery.exceptions.TimeoutError: if `timeout` isn't
                :const:`None` and the result does not arrive within
                `timeout` seconds.
            Exception: If the remote call raised an exception then that
                exception will be re-raised in the caller process.
        NTr,   )r@   )rO   intervalon_intervalno_ack	propagater@   
on_message)r;   r    r   r+   _maybe_reraise_parent_errorr>   r5   maybe_throwrD   r2   r=   Zwait_for_pending)r7   rO   rX   rU   rW   Zfollow_parentsr@   rY   rV   disable_sync_subtasksEXCEPTION_STATESPROPAGATE_STATESZ_on_intervalr   r   r   get   s2    *
 zAsyncResult.getc                 C   s"   t t|  D ]}|  qd S r   )reversedlist_parentsr[   r7   noder   r   r   rZ      s    z'AsyncResult._maybe_reraise_parent_errorc                 c   s   | j }|r|V  |j }qd S r   r+   rc   r   r   r   rb      s    zAsyncResult._parentsc                 k   s,   | j |dD ]\}}||jf |fV  qdS )a  Collect results as they return.

        Iterator, like :meth:`get` will wait for the task to complete,
        but will also follow :class:`AsyncResult` and :class:`ResultSet`
        returned by the task, yielding ``(result, value)`` tuples for each
        result in the tree.

        An example would be having the following tasks:

        .. code-block:: python

            from celery import group
            from proj.celery import app

            @app.task(trail=True)
            def A(how_many):
                return group(B.s(i) for i in range(how_many))()

            @app.task(trail=True)
            def B(i):
                return pow2.delay(i)

            @app.task(trail=True)
            def pow2(i):
                return i ** 2

        .. code-block:: pycon

            >>> from celery.result import ResultBase
            >>> from proj.tasks import A

            >>> result = A.delay(10)
            >>> [v for v in result.collect()
            ...  if not isinstance(v, (ResultBase, tuple))]
            [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

        Note:
            The ``Task.trail`` option must be enabled
            so that the list of children is stored in ``result.children``.
            This is the default but enabled explicitly for illustration.

        Yields:
            Tuple[AsyncResult, Any]: tuples containing the result instance
            of the child task, and the return value of that task.
        intermediateNiterdepsr_   )r7   rg   kwargs_Rr   r   r   collect   s    .zAsyncResult.collectc                 C   s"   d }|   D ]\}}| }q|S r   rh   )r7   r<   rk   rl   r   r   r   get_leaf&  s    
zAsyncResult.get_leafc                 #   s`   t d | fg}|r\| \} | fV    rP| fdd jpFg D  q|st qd S )Nc                 3   s   | ]} |fV  qd S r   r   .0childrd   r   r   	<genexpr>3  s     z'AsyncResult.iterdeps.<locals>.<genexpr>)r   popleftreadyrG   childrenr   )r7   rg   stackr+   r   rr   r   ri   ,  s    
 zAsyncResult.iterdepsc                 C   s   | j | jjkS )zReturn :const:`True` if the task has executed.

        If the task is still running, pending, or is waiting
        for retry then :const:`False` is returned.
        )stater2   READY_STATESr:   r   r   r   ru   8  s    zAsyncResult.readyc                 C   s   | j tjkS )z7Return :const:`True` if the task executed successfully.)rx   r   SUCCESSr:   r   r   r   
successful@  s    zAsyncResult.successfulc                 C   s   | j tjkS )z(Return :const:`True` if the task failed.)rx   r   FAILUREr:   r   r   r   failedD  s    zAsyncResult.failedc                 O   s   | j j|| d S r   )r4   throwr7   argsrj   r   r   r   r~   H  s    zAsyncResult.throwc                 C   sn   | j d kr|  n| j }|d |d |d  }}}|tjkrV|rV| || | |d k	rj|| j| |S )NstatusrD   	traceback)r5   _get_task_metar_   r   r^   r~   _to_remote_tracebackr1   )r7   rX   r@   cacherx   r<   tbr   r   r   r[   K  s      
zAsyncResult.maybe_throwc                 C   s*   |r&t d k	r&| jjjr&t j| S d S r   )tblibr0   confZtask_remote_tracebacks	TracebackZfrom_stringZas_traceback)r7   r   r   r   r   r   V  s    z AsyncResult._to_remote_tracebackc                 C   sL   t |pt| jddd}| j|dD ]"\}}|| |r$||| q$|S )NZoval)rootshape)	formatterrf   )r   r   r1   ri   Zadd_arcZadd_edge)r7   rg   r   graphr+   rd   r   r   r   build_graphZ  s    
zAsyncResult.build_graphc                 C   s
   t | jS z`str(self) -> self.id`.strr1   r:   r   r   r   __str__d  s    zAsyncResult.__str__c                 C   s
   t | jS z`hash(self) -> hash(self.id)`.hashr1   r:   r   r   r   __hash__h  s    zAsyncResult.__hash__c                 C   s   dt | j d| j dS )N<: >)r/   r'   r1   r:   r   r   r   __repr__l  s    zAsyncResult.__repr__c                 C   s.   t |tr|j| jkS t |tr*|| jkS tS r   )
isinstancer   r1   r   NotImplementedr7   otherr   r   r   __eq__o  s
    


zAsyncResult.__eq__c                 C   s   |  |}|tkrdS | S r%   r   r   r7   r   resr   r   r   __ne__v  s    
zAsyncResult.__ne__c                 C   s   |  | j| jd | j| jS r   )	__class__r1   r2   r0   r+   r:   r   r   r   __copy__z  s        zAsyncResult.__copy__c                 C   s   | j |  fS r   r   __reduce_args__r:   r   r   r   
__reduce__  s    zAsyncResult.__reduce__c                 C   s   | j | jd d | jfS r   )r1   r2   r+   r:   r   r   r   r     s    zAsyncResult.__reduce_args__c                 C   s   | j dk	r| j |  dS )z9Cancel pending operations when the instance is destroyed.NrA   r:   r   r   r   __del__  s    
zAsyncResult.__del__c                 C   s   |   S r   )r   r:   r   r   r   r     s    zAsyncResult.graphc                 C   s   | j jS r   )r2   supports_native_joinr:   r   r   r   r     s    z AsyncResult.supports_native_joinc                 C   s   |   dS )Nrv   r   r_   r:   r   r   r   rv     s    zAsyncResult.childrenc                 C   s:   |r6|d }|t jkr6| | j|}| |  |S |S )Nr   )r   ry   
_set_cacher2   Zmeta_from_decodedr4   )r7   metarx   dr   r   r   _maybe_set_cache  s    

zAsyncResult._maybe_set_cachec                 C   s$   | j d kr| | j| jS | j S r   )r5   r   r2   Zget_task_metar1   r:   r   r   r   r     s    
zAsyncResult._get_task_metac                 K   s   t |  gS r   )iterr   r7   rj   r   r   r   
_iter_meta  s    zAsyncResult._iter_metac                    s.   | d}|r$ fdd|D |d< | _|S )Nrv   c                    s   g | ]}t | jqS r   )r   r0   ro   r:   r   r   
<listcomp>  s    z*AsyncResult._set_cache.<locals>.<listcomp>)r_   r5   )r7   r   rv   r   r:   r   r     s    


zAsyncResult._set_cachec                 C   s   |   d S )zTask return value.

        Note:
            When the task has been executed, this contains the return value.
            If the task raised an exception, this will be the exception
            instance.
        rD   r   r:   r   r   r   rD     s    	zAsyncResult.resultc                 C   s   |   dS )z#Get the traceback of a failed task.r   r   r:   r   r   r   r     s    zAsyncResult.tracebackc                 C   s   |   d S )a  The tasks current state.

        Possible values includes:

            *PENDING*

                The task is waiting for execution.

            *STARTED*

                The task has been started.

            *RETRY*

                The task is to be retried, possibly because of failure.

            *FAILURE*

                The task raised an exception, or has exceeded the retry limit.
                The :attr:`result` attribute then contains the
                exception raised by the task.

            *SUCCESS*

                The task executed successfully.  The :attr:`result` attribute
                then contains the tasks return value.
        r   r   r:   r   r   r   rx     s    zAsyncResult.statec                 C   s   | j S )zCompat. alias to :attr:`id`.r1   r:   r   r   r   task_id  s    zAsyncResult.task_idc                 C   s
   || _ d S r   r   )r7   r1   r   r   r   r     s    c                 C   s   |   dS )Nnamer   r:   r   r   r   r     s    zAsyncResult.namec                 C   s   |   dS )Nr   r   r:   r   r   r   r     s    zAsyncResult.argsc                 C   s   |   dS )Nrj   r   r:   r   r   r   rj     s    zAsyncResult.kwargsc                 C   s   |   dS )Nworkerr   r:   r   r   r   r     s    zAsyncResult.workerc                 C   s*   |   d}|r&t|tjs&t|S |S )zUTC date and time.	date_done)r   r_   r   datetimer   )r7   r   r   r   r   r     s    zAsyncResult.date_donec                 C   s   |   dS )Nretriesr   r:   r   r   r   r     s    zAsyncResult.retriesc                 C   s   |   dS )Nqueuer   r:   r   r   r   r     s    zAsyncResult.queue)NNNN)NF)NFNFN)F)F)TN)FN)Br'   r(   r)   r*   r0   r   r1   r2   r8   propertyr;   setterr>   r3   rE   rH   rJ   rQ   r   r]   r^   r_   rS   rZ   rb   rm   rn   ri   ru   r{   r}   r~   r[   maybe_reraiser   r   r   r   r   r   r   r   r   r   r   r   r   r   rv   r   r   r   r   rD   infor   rx   r   r   r   r   rj   r   r   r   r   r   r   r   r   r   D   s        



	    
    
E
1

	




		
	









r   c                   @   s>  e Zd ZdZdZdZdEddZdd Zdd Zd	d
 Z	dd Z
dd Zdd Zdd Zdd ZdFddZeZdd Zdd Zdd Zdd ZdGd!d"Zd#d$ Zd%d& ZdHd(d)ZdId*d+ZdJd,d-ZdKd.d/ZdLd0d1Zd2d3 Zd4d5 Zd6d7 Zd8d9 Z d:d; Z!d<d= Z"e#d>d? Z$e#d@dA Z%e%j&dBdA Z%e#dCdD Z'dS )Mr   zpA collection of results.

    Arguments:
        results (Sequence[AsyncResult]): List of result instances.
    Nc                 K   sL   || _ || _tt| fd| _|p(t|| _| jrH| jt| jdd d S )N)r   Tr,   )	_apprI   r   r   r4   r   _on_fullr>   	_on_ready)r7   rI   r0   Zready_barrierrj   r   r   r   r8     s    zResultSet.__init__c                 C   s,   || j kr(| j | | jr(| j| dS )zvAdd :class:`AsyncResult` as a new member of the set.

        Does nothing if the result is already a member.
        N)rI   rF   r   addrC   r   r   r   r      s    
zResultSet.addc                 C   s   | j jr|   d S r   )r2   Zis_asyncr4   r:   r   r   r   r   *  s    zResultSet._on_readyc                 C   sH   t |tr| j|}z| j| W n tk
rB   t|Y nX dS )z~Remove result from the set; it must be a member.

        Raises:
            KeyError: if the result isn't a member.
        N)r   r   r0   r   rI   remover.   KeyErrorrC   r   r   r   r   .  s    
zResultSet.removec                 C   s(   z|  | W n tk
r"   Y nX dS )zbRemove result from the set if it is a member.

        Does nothing if it's not a member.
        N)r   r   rC   r   r   r   discard;  s    zResultSet.discardc                    s    j  fdd|D  dS )z Extend from iterable of results.c                 3   s   | ]}| j kr|V  qd S r   rI   rp   rr:   r   r   rs   G  s     
 z#ResultSet.update.<locals>.<genexpr>N)rI   rG   )r7   rI   r   r:   r   updateE  s    zResultSet.updatec                 C   s   g | j dd< dS )z!Remove all results from this set.Nr   r:   r   r   r   clearI  s    zResultSet.clearc                 C   s   t dd | jD S )zReturn true if all tasks successful.

        Returns:
            bool: true if all of the tasks finished
                successfully (i.e. didn't raise an exception).
        c                 s   s   | ]}|  V  qd S r   )r{   rp   rD   r   r   r   rs   T  s     z'ResultSet.successful.<locals>.<genexpr>allrI   r:   r   r   r   r{   M  s    zResultSet.successfulc                 C   s   t dd | jD S )zReturn true if any of the tasks failed.

        Returns:
            bool: true if one of the tasks failed.
                (i.e., raised an exception)
        c                 s   s   | ]}|  V  qd S r   )r}   r   r   r   r   rs   ]  s     z#ResultSet.failed.<locals>.<genexpr>anyrI   r:   r   r   r   r}   V  s    zResultSet.failedTc                 C   s   | j D ]}|j||d qd S )N)r@   rX   )rI   r[   )r7   r@   rX   rD   r   r   r   r[   _  s    
zResultSet.maybe_throwc                 C   s   t dd | jD S )zReturn true if any of the tasks are incomplete.

        Returns:
            bool: true if one of the tasks are still
                waiting for execution.
        c                 s   s   | ]}|   V  qd S r   ru   r   r   r   r   rs   k  s     z$ResultSet.waiting.<locals>.<genexpr>r   r:   r   r   r   waitingd  s    zResultSet.waitingc                 C   s   t dd | jD S )zDid all of the tasks complete? (either by success of failure).

        Returns:
            bool: true if all of the tasks have been executed.
        c                 s   s   | ]}|  V  qd S r   r   r   r   r   r   rs   s  s     z"ResultSet.ready.<locals>.<genexpr>r   r:   r   r   r   ru   m  s    zResultSet.readyc                 C   s   t dd | jD S )zaTask completion count.

        Returns:
            int: the number of tasks completed.
        c                 s   s   | ]}t | V  qd S r   )intr{   r   r   r   r   rs   {  s     z,ResultSet.completed_count.<locals>.<genexpr>)sumrI   r:   r   r   r   completed_countu  s    zResultSet.completed_countc                 C   s   | j D ]}|  qdS )z?Forget about (and possible remove the result of) all the tasks.N)rI   rJ   rC   r   r   r   rJ   }  s    
zResultSet.forgetFc                 C   s*   | j jjdd | jD |||||d dS )a[  Send revoke signal to all workers for all tasks in the set.

        Arguments:
            terminate (bool): Also terminate the process currently working
                on the task (if any).
            signal (str): Name of signal to send to process if terminate.
                Default is TERM.
            wait (bool): Wait for replies from worker.
                The ``timeout`` argument specifies the number of seconds
                to wait.  Disabled by default.
            timeout (float): Time in seconds to wait for replies when
                the ``wait`` argument is enabled.
        c                 S   s   g | ]
}|j qS r   r   r   r   r   r   r     s     z$ResultSet.revoke.<locals>.<listcomp>)rK   rO   rL   rM   rN   N)r0   rP   rQ   rI   rR   r   r   r   rQ     s       zResultSet.revokec                 C   s
   t | jS r   )r   rI   r:   r   r   r   __iter__  s    zResultSet.__iter__c                 C   s
   | j | S )z`res[i] -> res.results[i]`.r   )r7   indexr   r   r   __getitem__  s    zResultSet.__getitem__rT   c	           	   
   C   s&   | j r| jn| j||||||||dS )zSee :meth:`join`.

        This is here for API compatibility with :class:`AsyncResult`,
        in addition it uses :meth:`join_native` if available for the
        current result backend.
        )rO   rX   rU   r@   rW   rY   r\   rV   )r   join_nativejoin)	r7   rO   rX   rU   r@   rW   rY   r\   rV   r   r   r   r_     s    	    zResultSet.getc	              	   C   s   |r
t   t }	d}
|dk	r&tdg }| jD ]^}d}
|r\|t |	  }
|
dkr\td|j|
|||||d}|r||j| q0|| q0|S )a  Gather the results of all tasks as a list in order.

        Note:
            This can be an expensive operation for result store
            backends that must resort to polling (e.g., database).

            You should consider using :meth:`join_native` if your backend
            supports it.

        Warning:
            Waiting for tasks within a task may lead to deadlocks.
            Please see :ref:`task-synchronous-subtasks`.

        Arguments:
            timeout (float): The number of seconds to wait for results
                before the operation times out.
            propagate (bool): If any of the tasks raises an exception,
                the exception will be re-raised when this flag is set.
            interval (float): Time to wait (in seconds) before retrying to
                retrieve a result from the set.  Note that this does not have
                any effect when using the amqp result store backend,
                as it does not use polling.
            callback (Callable): Optional callback to be called for every
                result received.  Must have signature ``(task_id, value)``
                No results will be returned by this function if a callback
                is specified.  The order of results is also arbitrary when a
                callback is used.  To get access to the result object for
                a particular id you'll have to generate an index first:
                ``index = {r.id: r for r in gres.results.values()}``
                Or you can create new result objects on the fly:
                ``result = app.AsyncResult(task_id)`` (both will
                take advantage of the backend cache anyway).
            no_ack (bool): Automatic message acknowledgment (Note that if this
                is set to :const:`False` then the messages
                *will not be acknowledged*).
            disable_sync_subtasks (bool): Disable tasks to wait for sub tasks
                this is the default configuration. CAUTION do not enable this
                unless you must.

        Raises:
            celery.exceptions.TimeoutError: if ``timeout`` isn't
                :const:`None` and the operation takes longer than ``timeout``
                seconds.
        Nz,Backend does not support on_message callbackg        zjoin operation timed out)rO   rX   rU   rW   rV   r\   )	r    time	monotonicr   rI   r   r_   r1   rF   )r7   rO   rX   rU   r@   rW   rY   r\   rV   Z
time_start	remainingrI   rD   r<   r   r   r   r     s6    /
   zResultSet.joinc                 C   s   | j ||S r   r4   r>   r?   r   r   r   r>     s    zResultSet.thenc                 C   s   | j j| |||||dS )a0  Backend optimized version of :meth:`iterate`.

        .. versionadded:: 2.2

        Note that this does not support collecting the results
        for different task types using different backends.

        This is currently only supported by the amqp, Redis and cache
        result backends.
        )rO   rU   rW   rY   rV   )r2   iter_native)r7   rO   rU   rW   rY   rV   r   r   r   r     s       zResultSet.iter_nativec	                 C   s   |r
t   |rdndd t| jD }	|r.dndd tt| D }
| |||||D ]j\}}t|trg }|D ]}||	  qpn|d }|r|d t
jkr||r||| qV||
|	| < qV|
S )a-  Backend optimized version of :meth:`join`.

        .. versionadded:: 2.2

        Note that this does not support collecting the results
        for different task types using different backends.

        This is currently only supported by the amqp, Redis and cache
        result backends.
        Nc                 S   s   i | ]\}}|j |qS r   r   )rp   irD   r   r   r   
<dictcomp>  s     z)ResultSet.join_native.<locals>.<dictcomp>c                 S   s   g | ]}d qS r   r   )rp   rk   r   r   r   r     s     z)ResultSet.join_native.<locals>.<listcomp>rD   r   )r    	enumeraterI   rangelenr   r   ra   rF   r_   r   r^   )r7   rO   rX   rU   r@   rW   rY   rV   r\   Zorder_indexaccr   r   r<   Zchildren_resultr   r   r   r     s*    
 
zResultSet.join_nativec                 K   s.   dd | j jdd | jD fddi|D S )Nc                 s   s   | ]\}}|V  qd S r   r   )rp   rk   r   r   r   r   rs   0  s     z'ResultSet._iter_meta.<locals>.<genexpr>c                 S   s   h | ]
}|j qS r   r   r   r   r   r   	<setcomp>1  s     z'ResultSet._iter_meta.<locals>.<setcomp>Zmax_iterationsr	   )r2   Zget_manyrI   r   r   r   r   r   /  s    zResultSet._iter_metac                 C   s   dd | j D S )Nc                 s   s,   | ]$}|j |jr|jtjkr|V  qd S r   )r2   Z	is_cachedr1   rx   r   r^   )rp   r   r   r   r   rs   5  s    z0ResultSet._failed_join_report.<locals>.<genexpr>r   r:   r   r   r   _failed_join_report4  s    zResultSet._failed_join_reportc                 C   s
   t | jS r   )r   rI   r:   r   r   r   __len__9  s    zResultSet.__len__c                 C   s   t |tr|j| jkS tS r   )r   r   rI   r   r   r   r   r   r   <  s    
zResultSet.__eq__c                 C   s   |  |}|tkrdS | S r%   r   r   r   r   r   r   A  s    
zResultSet.__ne__c                 C   s*   dt | j dddd | jD  dS )Nr   z: [, c                 s   s   | ]}|j V  qd S r   r   r   r   r   r   rs   F  s     z%ResultSet.__repr__.<locals>.<genexpr>]>)r/   r'   r   rI   r:   r   r   r   r   E  s    zResultSet.__repr__c                 C   s(   z| j d jW S  tk
r"   Y nX d S Nr   )rI   r   
IndexErrorr:   r   r   r   r   H  s    zResultSet.supports_native_joinc                 C   s,   | j d kr&| jr| jd jnt | _ | j S r   )r   rI   r0   r
   Z_get_current_objectr:   r   r   r   r0   O  s
    
zResultSet.appc                 C   s
   || _ d S r   )r   )r7   r0   r   r   r   r0   V  s    c                 C   s   | j r| j jS | jd jS r   )r0   r2   rI   r:   r   r   r   r2   Z  s    zResultSet.backend)NN)NT)NFNFN)NTrT   NTNTN)NTrT   NTNTN)NF)NrT   TNN)NTrT   NTNNT)(r'   r(   r)   r*   r   rI   r8   r   r   r   r   r   r   r{   r}   r[   r   r   ru   r   rJ   rQ   r   r   r_   r   r>   r   r   r   r   r   r   r   r   r   r   r0   r   r2   r   r   r   r   r     s   


		
	    
         
         
J
    
          
$


r   c                       s   e Zd ZdZdZdZd! fdd	Z fddZd"ddZd#d	d
Z	dd Z
dd Zdd ZeZdd Zdd Zdd Zdd Zdd Zdd Zedd Zed$dd Z  ZS )%r   az  Like :class:`ResultSet`, but with an associated id.

    This type is returned by :class:`~celery.group`.

    It enables inspection of the tasks state and return values as
    a single entity.

    Arguments:
        id (str): The id of the group.
        results (Sequence[AsyncResult]): List of result instances.
        parent (ResultBase): Parent result of this group.
    Nc                    s    || _ || _t j|f| d S r   )r1   r+   superr8   )r7   r1   rI   r+   rj   r   r   r   r8   t  s    zGroupResult.__init__c                    s   | j |  t   d S r   )r2   rB   r   r   r:   r   r   r   r   y  s    zGroupResult._on_readyc                 C   s   |p
| j j| j| S )zSave group-result for later retrieval using :meth:`restore`.

        Example:
            >>> def save_and_restore(result):
            ...     result.save()
            ...     result = GroupResult.restore(result.id)
        )r0   r2   Z
save_groupr1   r7   r2   r   r   r   save}  s    zGroupResult.savec                 C   s   |p
| j j| j dS )z.Remove this result if it was previously saved.N)r0   r2   Zdelete_groupr1   r   r   r   r   delete  s    zGroupResult.deletec                 C   s   | j |  fS r   r   r:   r   r   r   r     s    zGroupResult.__reduce__c                 C   s   | j | jfS r   )r1   rI   r:   r   r   r   r     s    zGroupResult.__reduce_args__c                 C   s   t | jp| jS r   )boolr1   rI   r:   r   r   r   __bool__  s    zGroupResult.__bool__c                 C   sF   t |tr.|j| jko,|j| jko,|j| jkS t |trB|| jkS tS r   )r   r   r1   rI   r+   r   r   r   r   r   r   r     s    




zGroupResult.__eq__c                 C   s   |  |}|tkrdS | S r%   r   r   r   r   r   r     s    
zGroupResult.__ne__c              	   C   s2   dt | j d| j dddd | jD  dS )Nr   r   z [r   c                 s   s   | ]}|j V  qd S r   r   r   r   r   r   rs     s     z'GroupResult.__repr__.<locals>.<genexpr>r   )r/   r'   r1   r   rI   r:   r   r   r   r     s    zGroupResult.__repr__c                 C   s
   t | jS r   r   r:   r   r   r   r     s    zGroupResult.__str__c                 C   s
   t | jS r   r   r:   r   r   r   r     s    zGroupResult.__hash__c                 C   s&   | j | jo| j fdd | jD fS )Nc                 S   s   g | ]}|  qS r   )rE   r   r   r   r   r     s     z(GroupResult.as_tuple.<locals>.<listcomp>)r1   r+   rE   rI   r:   r   r   r   rE     s    zGroupResult.as_tuplec                 C   s   | j S r   r   r:   r   r   r   rv     s    zGroupResult.childrenc                 C   s.   |pt | jts| jnt}|p"|j}||S )z&Restore previously saved group result.)r   r0   r   r
   r2   Zrestore_group)clsr1   r2   r0   r   r   r   restore  s
    
zGroupResult.restore)NNN)N)N)NN)r'   r(   r)   r*   r1   rI   r8   r   r   r   r   r   r   __nonzero__r   r   r   r   r   rE   r   rv   classmethodr   __classcell__r   r   r   r   r   _  s*   



r   c                   @   s   e Zd ZdZd%ddZd&ddZdd	 Zd
d Zdd Zdd Z	dd Z
d'ddZeZdd Zdd Zdd Zedd Zedd Zedd  ZeZed!d" Zed#d$ ZdS )(r   z.Result that we know has already been executed.Nc                 C   s.   || _ || _|| _|| _t | _| |  d S r   )r1   _result_state
_tracebackr   r4   )r7   r1   Z	ret_valuerx   r   r   r   r   r8     s    zEagerResult.__init__Fc                 C   s   | j ||S r   r   r?   r   r   r   r>     s    zEagerResult.thenc                 C   s   | j S r   )r5   r:   r   r   r   r     s    zEagerResult._get_task_metac                 C   s   | j |  fS r   r   r:   r   r   r   r     s    zEagerResult.__reduce__c                 C   s   | j | j| j| jfS r   r1   r   r   r   r:   r   r   r   r     s    zEagerResult.__reduce_args__c                 C   s   |   \}}|| S r   )r   )r7   r   r   r   r   r   r     s    zEagerResult.__copy__c                 C   s   dS r%   r   r:   r   r   r   ru     s    zEagerResult.readyTc                 K   sN   |r
t   |  r| jS | jtjkrJ|rDt| jtr:| jnt| j| jS d S r   )r    r{   rD   rx   r   r^   r   	Exception)r7   rO   rX   r\   rj   r   r   r   r_     s     
zEagerResult.getc                 C   s   d S r   r   r:   r   r   r   rJ     s    zEagerResult.forgetc                 O   s   t j| _d S r   )r   ZREVOKEDr   r   r   r   r   rQ     s    zEagerResult.revokec                 C   s   d| j  dS )Nz<EagerResult: r   r   r:   r   r   r   r     s    zEagerResult.__repr__c                 C   s   | j | j| j| jdS )N)r   rD   r   r   r   r:   r   r   r   r5     s
    zEagerResult._cachec                 C   s   | j S )zThe tasks return value.)r   r:   r   r   r   rD     s    zEagerResult.resultc                 C   s   | j S )zThe tasks state.)r   r:   r   r   r   rx   	  s    zEagerResult.statec                 C   s   | j S )z!The traceback if the task failed.)r   r:   r   r   r   r     s    zEagerResult.tracebackc                 C   s   dS r!   r   r:   r   r   r   r     s    z EagerResult.supports_native_join)N)NF)NTT)r'   r(   r)   r*   r8   r>   r   r   r   r   ru   r_   rS   rJ   rQ   r   r   r5   rD   rx   r   r   r   r   r   r   r   r     s4   


  




r   c                    s   t    j}t| ts~| \}}t|ttfr2|n|df\}}|rLt| }|dk	rr j| fdd|D |dS |||dS | S )zDeserialize result from tuple.Nc                    s   g | ]}t | qS r   )r   ro   r0   r   r   r   '  s     z%result_from_tuple.<locals>.<listcomp>re   )r   r   r   r   ra   tupler   r   )r   r0   ZResultr   Znodesr1   r+   r   r   r   r     s    

 r   )N).r*   r   r   collectionsr   
contextlibr   weakrefr   Zkombu.utils.objectsr   Zviner   r   r    r
   r   r   r   r   r0   r   
exceptionsr   r   r   Zutils.graphr   r   Zutils.iso8601r   r   ImportError__all__r   r    r$   r&   r   registerr   r   r   r   r   r   r   r   r   <module>   sN   

	
	   I  UcU