U
    d                     @   s   d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZ dd	lmZ d
dlmZ dZdZeddZG dd de	ZdS )zEvent receiver implementation.    N)
itemgetter)Queue)maybe_channel)ConsumerMixin)uuid)app_or_default)adjust_timestamp   )get_exchange)EventReceiver	utcoffset	timestampc                   @   s   e Zd ZdZdZdddZdd Zdd	 ZdddZdddZ	dddZ
dddZd
ejeeefddZeefddZedd ZdS )r   a?  Capture events.

    Arguments:
        connection (kombu.Connection): Connection to the broker.
        handlers (Mapping[Callable]): Event handlers.
            This is  a map of event type names and their handlers.
            The special handler `"*"` captures all events that don't have a
            handler.
    N#c
           
   	   C   s   t |p
| j| _t|| _|d kr&i n|| _|| _|p:t | _|pJ| jjj	| _
t| jp^| j | jjjd| _|d kr| jjj}|	d kr| jjj}	td| j
| jg| j| jdd||	d| _| jj| _| jj| _| jj| _|d kr| jjjdh}|| _d S )N)name.TF)exchangerouting_keyZauto_deleteZdurableZmessage_ttlexpiresjson)r   appr   channelhandlersr   r   node_idconfZevent_queue_prefixqueue_prefixr
   
connectionZconnection_for_writeZevent_exchanger   Zevent_queue_ttlZevent_queue_expiresr   joinqueueclockadjustadjust_clockZforwardforward_clockZevent_serializeraccept)
selfr   r   r   r   r   r   r#   Z	queue_ttlZqueue_expires r%   :/tmp/pip-unpacked-wheel-9cz4377o/celery/events/receiver.py__init__#   s:    


 


zEventReceiver.__init__c                 C   s(   | j |p| j d}|o"|| dS )z3Process event by dispatching to configured handler.*N)r   get)r$   typeeventhandlerr%   r%   r&   processB   s    zEventReceiver.processc                 C   s   || j g| jgd| jdgS )NT)Zqueues	callbacksZno_ackr#   )r   _receiver#   )r$   ZConsumerr   r%   r%   r&   get_consumersG   s
     zEventReceiver.get_consumersTc                 K   s   |r| j |d d S )N)r   )wakeup_workers)r$   r   r   Z	consumerswakeupkwargsr%   r%   r&   on_consume_readyL   s    zEventReceiver.on_consume_readyc                 C   s   | j |||dS )Nlimittimeoutr2   consume)r$   r6   r7   r2   r%   r%   r&   itercaptureQ   s    zEventReceiver.itercapturec                 C   s   | j |||dD ]}qdS )zOpen up a consumer capturing events.

        This has to run in the main process, and it will never stop
        unless :attr:`EventDispatcher.should_stop` is set to True, or
        forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
        r5   Nr8   )r$   r6   r7   r2   _r%   r%   r&   captureT   s    zEventReceiver.capturec                 C   s   | j jjd| j|d d S )NZ	heartbeat)r   r   )r   control	broadcastr   )r$   r   r%   r%   r&   r1   ^   s    
zEventReceiver.wakeup_workersc                 C   s   |d }|dkr4| j jpd|  }|d< | | n8z|d }	W n  tk
r`   |  |d< Y nX | |	 |rz||\}
}W n tk
r   Y nX |||
|d< | |d< ||fS )Nr*   z	task-sentr	   r   r   Zlocal_received)r   valuer!   KeyErrorr"   )r$   bodyZlocalizenowZtzfieldsr   CLIENT_CLOCK_SKEWr*   Z_cr   offsetr   r%   r%   r&   event_from_messagec   s"    

z EventReceiver.event_from_messagec                    sB   |||r.| j | j   fdd|D  n| j | |  d S )Nc                    s   g | ]} | qS r%   r%   ).0r+   Zfrom_messager-   r%   r&   
<listcomp>   s     z*EventReceiver._receive.<locals>.<listcomp>)r-   rE   )r$   rA   messagelist
isinstancer%   rG   r&   r/   ~   s    
zEventReceiver._receivec                 C   s   | j r| j jjS d S )N)r   r   client)r$   r%   r%   r&   r      s    zEventReceiver.connection)Nr   NNNNNN)T)NNT)NNT)N)__name__
__module____qualname____doc__r   r'   r-   r0   r4   r:   r<   r1   time	_TZGETTERr   rC   rE   rJ   rK   r/   propertyr   r%   r%   r%   r&   r      s4   
           
 




 
r   )rP   rQ   operatorr   Zkombur   Zkombu.connectionr   Zkombu.mixinsr   Zceleryr   Z
celery.appr   Zcelery.utils.timer   r+   r
   __all__rC   rR   r   r%   r%   r%   r&   <module>   s   
