U
    d-/                     @   s   d Z ddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZmZ dd	lmZ dd
lmZmZ dZdZG dd deZdd ZG dd deZG dd dejeZdS )zqThe ``RPC`` result backend for AMQP brokers.

RPC-style result backend, using reply-to and one queue per client.
    N)maybe_declare)register_after_fork)cached_property)states)current_tasktask_join_will_block   )base)AsyncBackendMixinBaseResultConsumer)BacklogLimitExceeded
RPCBackendz
The "rpc" result backend does not support chords!

Note that a group chained with a task is also upgraded to be a chord,
as this pattern requires synchronization.

Result backends that supports chords: Redis, Database, Memcached, and more.
c                   @   s   e Zd ZdZdS )r   z'Too much state history to fast-forward.N)__name__
__module____qualname____doc__ r   r   7/tmp/pip-unpacked-wheel-9cz4377o/celery/backends/rpc.pyr      s   r   c                 C   s   |    d S N)_after_fork)backendr   r   r   _on_after_fork_cleanup_backend"   s    r   c                       s^   e Zd ZejZdZdZ fddZdddZdddZ	d	d
 Z
dd Zdd Zdd Z  ZS )ResultConsumerNc                    s   t  j|| | jj| _d S r   )super__init__r   _create_bindingselfargskwargs	__class__r   r   r   ,   s    zResultConsumer.__init__Tc                 K   sF   | j  | _| |}| j| jj|g| jg|| jd| _| j	  d S )N)	callbacksno_ackaccept)
app
connection_connectionr   ConsumerZdefault_channelZon_state_changer$   	_consumerconsume)r   Zinitial_task_idr#   r   Zinitial_queuer   r   r   start0   s    
  zResultConsumer.startc                 C   s&   | j r| j j|dS |r"t| d S )N)timeout)r'   drain_eventstimesleep)r   r,   r   r   r   r-   9   s    zResultConsumer.drain_eventsc                 C   s    z| j  W 5 | j   X d S r   )r'   closer)   cancelr   r   r   r   stop?   s    zResultConsumer.stopc                 C   s$   d | _ | jd k	r | j  d | _d S r   )r)   r'   Zcollectr2   r   r   r   on_after_forkE   s    

zResultConsumer.on_after_forkc                 C   sD   | j d kr| |S | |}| j |s@| j | | j   d S r   )r)   r+   r   Zconsuming_fromZ	add_queuer*   )r   task_idqueuer   r   r   consume_fromK   s    


zResultConsumer.consume_fromc                 C   s   | j r| j | |j d S r   )r)   Zcancel_by_queuer   namer   r5   r   r   r   
cancel_forS   s    zResultConsumer.cancel_for)T)N)r   r   r   kombur(   r'   r)   r   r+   r-   r3   r4   r7   r:   __classcell__r   r   r    r   r   &   s   
	
r   c                       sZ  e Zd ZdZejZejZeZeZdZ	dZ
dZdddddZG dd	 d	ejZG d
d dejZdE fdd	Zdd ZdFddZdd Zdd Zdd Zdd Zdd Zdd  ZdGd!d"ZdHd#d$Zd%d& Zd'd( ZdId*d+ZeZd,d- ZdJd.d/Zd0d1 Z d2d3 Z!d4d5 Z"d6d7 Z#d8d9 Z$dKd:d;Z%d<d= Z&dL fd?d@	Z'e(dAdB Z)e*dCdD Z+  Z,S )Mr   z&Base class for the RPC result backend.FT   r   r   )max_retriesZinterval_startZinterval_stepZinterval_maxc                   @   s   e Zd ZdZdZdS )zRPCBackend.Consumerz4Consumer that requires manual declaration of queues.FN)r   r   r   r   Zauto_declarer   r   r   r   r(   m   s   r(   c                   @   s   e Zd ZdZdZdS )zRPCBackend.Queuez$Queue that never caches declaration.FN)r   r   r   r   Zcan_cache_declarationr   r   r   r   Queuer   s   r?   Nc           
         s   t  j|f| | jj}	|| _i | _| || _| jr:dnd| _|pH|	j	}|pR|	j
}| ||| j| _|pn|	j| _|| _| | | j| j| j| j| _td k	rt| t d S )N   r   )r   r   r%   confr'   _out_of_bandZprepare_persistent
persistentdelivery_modeZresult_exchangeZresult_exchange_type_create_exchangeexchangeZresult_serializer
serializerauto_deleter   r$   _pending_resultsZ_pending_messagesresult_consumerr   r   )
r   r%   r&   rF   exchange_typerC   rG   rH   r   rA   r    r   r   r   w   s0    

     zRPCBackend.__init__c                 C   s   | j   | j  d S r   )rI   clearrJ   r   r2   r   r   r   r      s    
zRPCBackend._after_forkdirectr@   c                 C   s
   |  d S r   )Exchange)r   r8   typerD   r   r   r   rE      s    zRPCBackend._create_exchangec                 C   s   | j S )z$Create new binding for task with id.)bindingr9   r   r   r   r      s    zRPCBackend._create_bindingc                 C   s   t t d S r   )NotImplementedErrorE_NO_CHORD_SUPPORTstripr2   r   r   r   ensure_chords_allowed   s    z RPCBackend.ensure_chords_allowedc                 C   s   t  st| |jdd d S )NT)retry)r   r   rP   channel)r   producerr5   r   r   r   on_task_call   s    zRPCBackend.on_task_callc                 C   sB   z|p
t j}W n" tk
r0   td|Y nX |j|jp>|fS )zGet the destination for result by task id.

        Returns:
            Tuple[str, str]: tuple of ``(reply_to, correlation_id)``.
        z%RPC backend missing task request for )r   requestAttributeErrorRuntimeErrorZreply_tocorrelation_id)r   r5   rY   r   r   r   destination_for   s    
zRPCBackend.destination_forc                 C   s   d S r   r   r9   r   r   r   on_reply_declare   s    zRPCBackend.on_reply_declarec                 C   s   d S r   r   )r   resultr   r   r   on_result_fulfilled   s    zRPCBackend.on_result_fulfilledc                 C   s   dS )Nzrpc://r   )r   Zinclude_passwordr   r   r   as_uri   s    zRPCBackend.as_uric           
      K   sr   |  ||\}}|sdS | jjjjdd>}	|	j| |||||| j||| jd| j	| 
|| jd	 W 5 Q R X |S )z!Send task return value and state.NTblock)rF   routing_keyr\   rG   rU   retry_policydeclarerD   )r]   r%   ZamqpZproducer_poolacquirepublish
_to_resultrF   rG   re   r^   rD   )
r   r5   r_   state	tracebackrY   r   rd   r\   rW   r   r   r   store_result   s      
zRPCBackend.store_resultc                 C   s   |||  |||| |dS )N)r5   statusr_   rk   children)Zencode_resultZcurrent_task_children)r   r5   rj   r_   rk   rY   r   r   r   ri      s    
zRPCBackend._to_resultc                 C   s    | j r| j | || j|< d S r   )rJ   on_out_of_band_resultrB   )r   r5   messager   r   r   ro      s    z RPCBackend.on_out_of_band_result  c           
      C   s   | j |d }|r| ||S i }d }| || j|D ]2}| |}||| }||< |r6|  d }q6||d }| D ]\}}	| 	||	 q~|r|
  | ||S z| j| W S  tk
r   tjd d Y S X d S )N)rm   r_   )rB   pop_set_cache_by_message_slurp_from_queuer$   _get_message_task_idgetZackitemsro   Zrequeue_cacheKeyErrorr   ZPENDING)
r   r5   Zbacklog_limitZbufferedZlatest_by_idprevacctidZlatestmsgr   r   r   get_task_meta   s*    
zRPCBackend.get_task_metac                 C   s   |  |j }| j|< |S r   )Zmeta_from_decodedpayloadrx   )r   r5   rp   r   r   r   r   rs   	  s    z RPCBackend._set_cache_by_messagec           	   	   c   sn   | j jjddT\}}| ||}|  t|D ] }|j||d}|sN q`|V  q4| |W 5 Q R X d S )NTrb   )r$   r#   )r%   poolZacquire_channelr   rf   rangerv   r   )	r   r5   r$   limitr#   _rV   rP   r}   r   r   r   rt     s    zRPCBackend._slurp_from_queuec              	   C   s4   z|j d W S  ttfk
r.   |jd  Y S X d S )Nr\   r5   )Z
propertiesrZ   ry   r   )r   rp   r   r   r   ru     s    zRPCBackend._get_message_task_idc                 C   s   d S r   r   )r   rV   r   r   r   revive%  s    zRPCBackend.revivec                 C   s   t dd S )Nz4reload_task_result is not supported by this backend.rQ   r9   r   r   r   reload_task_result(  s    zRPCBackend.reload_task_resultc                 C   s   t ddS )z<Reload group result, even if it has been previously fetched.z5reload_group_result is not supported by this backend.Nr   r9   r   r   r   reload_group_result,  s    zRPCBackend.reload_group_resultc                 C   s   t dd S )Nz,save_group is not supported by this backend.r   )r   group_idr_   r   r   r   
save_group1  s    zRPCBackend.save_groupc                 C   s   t dd S )Nz/restore_group is not supported by this backend.r   )r   r   cacher   r   r   restore_group5  s    zRPCBackend.restore_groupc                 C   s   t dd S )Nz.delete_group is not supported by this backend.r   )r   r   r   r   r   delete_group9  s    zRPCBackend.delete_groupr   c                    s@   |si n|}t  |t|| j| jj| jj| j| j| j	| j
dS )N)r&   rF   rK   rC   rG   rH   expires)r   
__reduce__dictr'   rF   r8   rO   rC   rG   rH   r   r   r    r   r   r   =  s    
zRPCBackend.__reduce__c                 C   s   | j | j| j| jdd| jdS )NFT)ZdurablerH   r   )r?   oidrF   r   r2   r   r   r   rP   J  s      zRPCBackend.bindingc                 C   s   | j jS r   )r%   Z
thread_oidr2   r   r   r   r   S  s    zRPCBackend.oid)NNNNNT)rM   r@   )T)NN)rq   )rq   F)T)r   N)-r   r   r   r   r;   rN   ZProducerr   r   rC   Zsupports_autoexpireZsupports_native_joinre   r(   r?   r   r   rE   r   rT   rX   r]   r^   r`   ra   rl   ri   ro   r~   pollrs   rt   ru   r   r   r   r   r   r   r   propertyrP   r   r   r<   r   r   r    r   r   X   sh         

   
		
   
	

r   )r   r.   r;   Zkombu.commonr   Zkombu.utils.compatr   Zkombu.utils.objectsr   Zceleryr   Zcelery._stater   r    r	   Zasynchronousr
   r   __all__rR   	Exceptionr   r   r   ZBackendr   r   r   r   r   <module>   s   
2