U
    Z+dY                     @   s
  d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	 ddl
mZmZmZmZmZmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddl m!Z! ddl"m#Z$ dZ%dZ&dZ'eddZ(dddZ)G dd de*Z+G dd dZ,dS )z/Sending/Receiving Messages (Kombu integration).    N)
namedtuple)Mapping)	timedelta)WeakValueDictionary)
ConnectionConsumerExchangeProducerQueuepools)	Broadcast)
maybe_list)cached_property)signals)anon_nodename)saferepr)indent)maybe_make_aware   )routes)AMQPQueuestask_messagei   zS
.> {0.name:<16} exchange={0.exchange.name}({0.exchange.type}) key={0.routing_key}
r   headers
propertiesbody
sent_eventutf-8c                    s    fdd|   D S )Nc                    s*   i | ]"\}}t |tr | n||qS  )
isinstancebytesdecode).0kvencodingr   3/tmp/pip-unpacked-wheel-ucduq0nd/celery/app/amqp.py
<dictcomp>%   s    zutf8dict.<locals>.<dictcomp>)items)dr'   r   r&   r(   utf8dict$   s    
r,   c                       s   e Zd ZdZdZd! fdd	Z fddZ fdd	Zd
d Zdd Z	dd Z
dd Zdd Zd"ddZdd Zdd Zdd Zdd Zedd  Z  ZS )#r   u  Queue name⇒ declaration mapping.

    Arguments:
        queues (Iterable): Initial list/tuple or dict of queues.
        create_missing (bool): By default any unknown queues will be
            added automatically, but if this flag is disabled the occurrence
            of unknown queues in `wanted` will raise :exc:`KeyError`.
        max_priority (int): Default x-max-priority for queues with none set.
    NTc           	         s   t    t | _|| _|| _|| _|d kr0tn|| _|| _	|d k	r\t
|ts\dd |D }|pbi }| D ]*\}}t
|tr| |n| j|f| qld S )Nc                 S   s   i | ]}|j |qS r   )name)r#   qr   r   r(   r)   C   s      z#Queues.__init__.<locals>.<dictcomp>)super__init__r   aliasesdefault_exchangedefault_routing_keycreate_missingr   autoexchangemax_priorityr    r   r*   r
   add
add_compat)	selfqueuesr2   r4   r5   r6   r3   r-   r.   	__class__r   r(   r0   8   s    
zQueues.__init__c                    s2   z| j | W S  tk
r,   t | Y S X d S N)r1   KeyErrorr/   __getitem__r9   r-   r;   r   r(   r?   H   s    zQueues.__getitem__c                    s8   | j r|js| j |_t || |jr4|| j|j< d S r=   )r2   exchanger/   __setitem__aliasr1   )r9   r-   queuer;   r   r(   rB   N   s
    zQueues.__setitem__c                 C   s"   | j r| | |S t|d S r=   )r4   r7   new_missingr>   r@   r   r   r(   __missing__U   s    zQueues.__missing__c                 K   s"   t |ts| j|f|S | |S )a  Add new queue.

        The first argument can either be a :class:`kombu.Queue` instance,
        or the name of a queue.  If the former the rest of the keyword
        arguments are ignored, and options are simply taken from the queue
        instance.

        Arguments:
            queue (kombu.Queue, str): Queue to add.
            exchange (kombu.Exchange, str):
                if queue is str, specifies exchange name.
            routing_key (str): if queue is str, specifies binding key.
            exchange_type (str): if queue is str, specifies type of exchange.
            **options (Any): Additional declaration options used when
                queue is a str.
        )r    r
   r8   _add)r9   rD   kwargsr   r   r(   r7   Z   s    
z
Queues.addc                 K   s:   | d|d |d d kr&||d< | tj|f|S )Nrouting_keyZbinding_key)
setdefaultgetrG   r
   	from_dict)r9   r-   optionsr   r   r(   r8   o   s    zQueues.add_compatc                 C   s`   |j d ks|j jdkr| j|_ |js,| j|_| jd k	rR|jd krFi |_| |j || |j< |S )N )rA   r-   r2   rI   r3   r6   Zqueue_arguments_set_max_priority)r9   rD   r   r   r(   rG   v   s    


zQueues._addc                 C   s&   d|kr"| j d k	r"|d| j iS d S )Nzx-max-priority)r6   update)r9   argsr   r   r(   rO      s    zQueues._set_max_priorityr   c                 C   s\   | j }|sdS dd t| D }|r8td||S |d d td|dd | S )z/Format routing table into string for log dumps.rN   c                 S   s   g | ]\}}t  |qS r   )QUEUE_FORMATstripformat)r#   _r.   r   r   r(   
<listcomp>   s   z!Queues.format.<locals>.<listcomp>
r   r   N)consume_fromsortedr*   
textindentjoin)r9   r   indent_firstactiveinfor   r   r(   rT      s    
zQueues.formatc                 K   s(   | j |f|}| jdk	r$|| j|j< |S )zAdd new task queue that'll be consumed from.

        The queue will be active even when a subset has been selected
        using the :option:`celery worker -Q` option.
        N)r7   _consume_fromr-   )r9   rD   rH   r.   r   r   r(   
select_add   s    
zQueues.select_addc                    s    |r fddt |D  _dS )zSelect a subset of currently defined queues to consume from.

        Arguments:
            include (Sequence[str], str): Names of queues to consume from.
        c                    s   i | ]}| | qS r   r   )r#   r-   r9   r   r(   r)      s     z!Queues.select.<locals>.<dictcomp>N)r   r_   )r9   includer   ra   r(   select   s    
zQueues.selectc                    sJ    rFt   | jdkr.|  fdd| D S  D ]}| j|d q2dS )zDeselect queues so that they won't be consumed from.

        Arguments:
            exclude (Sequence[str], str): Names of queues to avoid
                consuming from.
        Nc                 3   s   | ]}| kr|V  qd S r=   r   )r#   r$   excluder   r(   	<genexpr>   s      z"Queues.deselect.<locals>.<genexpr>)r   r_   rc   pop)r9   re   rD   r   rd   r(   deselect   s    
zQueues.deselectc                 C   s   t || ||S r=   )r
   r5   r@   r   r   r(   rE      s    zQueues.new_missingc                 C   s   | j d k	r| j S | S r=   )r_   ra   r   r   r(   rX      s    
zQueues.consume_from)NNTNNN)r   T)__name__
__module____qualname____doc__r_   r0   r?   rB   rF   r7   r8   rG   rO   rT   r`   rc   rh   rE   propertyrX   __classcell__r   r   r;   r(   r   )   s,          
r   c                   @   s*  e Zd ZdZeZeZeZeZeZ	dZ
dZdZdZdZdd Zedd Zedd	 Zd0d
dZd1ddZdd Zd2ddZd3ddZd4ddZdd Zdd Zedd Zedd Zejd d Zed!d" Zed#d$ Zejd%d$ Zed&d' Z e Z!ed(d) Z"ed*d+ Z#ed,d- Z$d.d/ Z%dS )5r   zApp AMQP API: app.amqp.Ni   c                 C   s*   || _ | j| jd| _| j j| j d S )N)r      )app
as_task_v1
as_task_v2task_protocolsZ_confZbind_to_handle_conf_update)r9   rp   r   r   r(   r0      s
    zAMQP.__init__c                 C   s   | j | jjj S r=   )rs   rp   confZtask_protocolra   r   r   r(   create_task_message   s    zAMQP.create_task_messagec                 C   s   |   S r=   )_create_task_senderra   r   r   r(   send_task_message   s    zAMQP.send_task_messagec                 C   sp   | j j}|j}|d kr|j}|d kr*|j}|sH|jrHt|j| j|df}|d krV| jn|}| 	|| j||||S )N)rA   rI   )
rp   ru   task_default_routing_keytask_create_missing_queuesZtask_queue_max_prioritytask_default_queuer
   r2   r5   
queues_cls)r9   r:   r4   r5   r6   ru   r3   r   r   r(   r      s,    
    zAMQP.Queuesc                 C   s&   t j| j|p| j| jd|| jdS )zReturn the current task router.rz   )rp   )_routesRouterr   r:   rp   Zeither)r9   r:   r4   r   r   r(   r~     s    zAMQP.Routerc                 C   s   t | jjj| _d S r=   )r}   preparerp   ru   task_routes_rtablera   r   r   r(   flush_routes  s    zAMQP.flush_routesc                 K   s:   |d kr| j jj}| j|f||p.t| jj d|S )N)acceptr:   )rp   ru   Zaccept_contentr   listr:   rX   values)r9   Zchannelr:   r   kwr   r   r(   TaskConsumer  s    
zAMQP.TaskConsumerr   Fc                 C   s  |pd}|pi }t |ttfs&tdt |ts8td|rx| |d |pT| j }|p`| jj}t	|t
|d |d}t |	tjr| |	d |p| j }|p| jj}t	|t
|	d |d}	t |ts|o| }t |	ts|	o|	 }	|d krt|| j}|d krt|| j}|s"|}td|||||	|||
||g|||||pNt |d	||p^d
d||||||df|r|||||||
||	d	nd dS )Nr   !task args must be a list or tuple(task keyword arguments must be a mapping	countdownseconds)tzexpirespy)langtaskidshadowetar   groupgroup_indexretries	timelimitroot_id	parent_idargsrepr
kwargsreproriginignore_resultrN   Zcorrelation_idreply_to)	callbackserrbackschainchord)	uuidr   r   r-   rQ   rH   r   r   r   r   )r    r   tuple	TypeErrorr   _verify_secondsrp   nowtimezoner   r   numbersRealstr	isoformatr   argsrepr_maxsizekwargsrepr_maxsizer   r   )r9   task_idr-   rQ   rH   r   r   group_idr   r   r   r   r   r   r   
time_limitsoft_time_limitcreate_sent_eventr   r   r   r   r   r   r   r   r   r   r   r   r(   rr     s    
  




 
zAMQP.as_task_v2c                 K   s  |pd}|pi }| j }t|ttfs,tdt|ts>td|rj| |d |pZ| j }|t	|d }t|	t
jr| |	d |p| j }|t	|	d }	|o| }|	o|	 }	ti ||pdd|||||||
||	|||||f||d	|r||t|t||
||	d
nd dS )Nr   r   r   r   r   r   rN   r   )r   r   rQ   rH   r   r   r   r   r   utcr   r   r   tasksetr   )r   r-   rQ   rH   r   r   r   r   )r   r    r   r   r   r   r   rp   r   r   r   r   r   r   r   )r9   r   r-   rQ   rH   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   Zcompat_kwargsr   r   r   r(   rq   n  sd    
zAMQP.as_task_v1c                 C   s   |t k rt| d||S )Nz is out of range: )INT_MIN
ValueError)r9   swhatr   r   r(   r     s    zAMQP._verify_secondsc                    s   | j jj| j jj| j jj| j| jtjj	tjj
tjj	tjj
 tjj	tjj
| j| j| j jj	| j jj
| j jjd 	
fdd	}|S )Nc                    s  |d krn|}|\}}}}|r*| | |r8| | |}|d krP|d krP}|d k	rxt|trr||  }}n|j}|
d krz|jj}
W n tk
r   Y nX |
p}
|d krz|jj}W n tk
r   d}Y nX |r|s|dkrd| }}n*|d kr|jjp
}|p|jp	}|d kr@|r@t|t	s@|g}|d krNn|}|rdt
f|n}r||||||||d | j|f|||	p
|p|||
||d	|} rЈ|||||d r8t|tr||d ||d |d |d	 |d
 d n*||d ||d |d |d	 |d d |r|pF}|}t|tr^|j}| |||d |jd|| ||d |S )NdirectrN   )senderr   rA   rI   declarer   r   retry_policy)	rA   rI   
serializercompressionretryr   delivery_moder   r   )r   r   r   rA   rI   r   r   r   r   r   )r   r   r   rQ   rH   r   r   rQ   rH   r   )rD   rA   rI   z	task-sent)r   r   )rP   r    r   r-   rA   r   AttributeErrortyperI   r   dictpublishr   r   )Zproducerr-   messagerA   rI   rD   Zevent_dispatcherr   r   r   r   r   r   r   Zexchange_typerH   Zheaders2r   r   r   qnameZ_rpretZevdZexnameZafter_receiversZbefore_receiversZdefault_compressorZdefault_delivery_modeZdefault_evdr2   Zdefault_policydefault_queueZdefault_retryZdefault_rkeyZdefault_serializerr:   Zsend_after_publishZsend_before_publishZsend_task_sentZsent_receiversr   r(   rx     s    




      	         
  z3AMQP._create_task_sender.<locals>.send_task_message)NNNNNNNNNNNN)rp   ru   Ztask_publish_retryZtask_publish_retry_policyZtask_default_delivery_moder   r:   r   Zbefore_task_publishsendZ	receiversZafter_task_publishZ	task_sent_event_dispatcherr2   ry   Ztask_serializerZresult_compression)r9   rx   r   r   r(   rw     s<    





                  ,bzAMQP._create_task_senderc                 C   s   | j | jjj S r=   )r:   rp   ru   r{   ra   r   r   r(   r   (  s    zAMQP.default_queuec                 C   s   |  | jjjS )u"   Queue name⇒ declaration mapping.)r   rp   ru   Ztask_queuesra   r   r   r(   r:   ,  s    zAMQP.queuesc                 C   s
   |  |S r=   )r   )r9   r:   r   r   r(   r:   1  s    c                 C   s   | j d kr|   | j S r=   )r   r   ra   r   r   r(   r   5  s    
zAMQP.routesc                 C   s   |   S r=   )r~   ra   r   r   r(   router;  s    zAMQP.routerc                 C   s   |S r=   r   )r9   valuer   r   r(   r   ?  s    c                 C   s0   | j d kr*tj| j  | _ | jjj| j _| j S r=   )_producer_poolr   Z	producersrp   Zconnection_for_writepoollimitra   r   r   r(   producer_poolC  s    
zAMQP.producer_poolc                 C   s   t | jjj| jjjS r=   )r   rp   ru   Ztask_default_exchangeZtask_default_exchange_typera   r   r   r(   r2   L  s    
zAMQP.default_exchangec                 C   s
   | j jjS r=   )rp   ru   Z
enable_utcra   r   r   r(   r   Q  s    zAMQP.utcc                 C   s   | j jjddS )NF)Zenabled)rp   eventsZ
Dispatcherra   r   r   r(   r   U  s    zAMQP._event_dispatcherc                 O   s&   d|ksd|kr"|    |  | _d S )Nr   )r   r~   r   )r9   rQ   rH   r   r   r(   rt   [  s    
zAMQP._handle_conf_update)NNN)NN)NN)NNNNNNNr   NNNNNNFNNNNNNNFNN)NNNNNNNr   NNNNNNFNNNNN)&ri   rj   rk   rl   r   r   r	   ZBrokerConnectionr   r|   r   r   r5   r   r   r0   r   rv   rx   r~   r   r   rr   rq   r   rw   r   r:   setterrm   r   r   r   Zpublisher_poolr2   r   r   rt   r   r   r   r(   r      s   

    


	                                        
V                               
<y









r   )r   )-rl   r   collectionsr   collections.abcr   datetimer   weakrefr   Zkombur   r   r   r	   r
   r   Zkombu.commonr   Zkombu.utils.functionalr   Zkombu.utils.objectsr   Zceleryr   Zcelery.utils.nodenamesr   Zcelery.utils.safereprr   Zcelery.utils.textr   rZ   Zcelery.utils.timer   rN   r   r}   __all__r   rR   r   r,   r   r   r   r   r   r   r(   <module>   s2    
 