U
    Z+d#                     @   s   d Z ddlZddlZddlZddlmZmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZmZmZ d
ZG dd dZdS )zEvent dispatcher sends events.    N)defaultdictdeque)Producer)app_or_default)anon_nodename)	utcoffset   )Eventget_exchange
group_from)EventDispatcherc                   @   s   e Zd ZdZdhZdZdZdZd"ddZd	d
 Z	dd Z
dd Zdd ZdefddZddefddZdeddefddZd#ddZdd Zdd Zdd Zd d! ZeeeZdS )$r   a0  Dispatches event messages.

    Arguments:
        connection (kombu.Connection): Connection to the broker.

        hostname (str): Hostname to identify ourselves as,
            by default uses the hostname returned by
            :func:`~celery.utils.anon_nodename`.

        groups (Sequence[str]): List of groups to send events for.
            :meth:`send` will ignore send requests to groups not in this list.
            If this is :const:`None`, all events will be sent.
            Example groups include ``"task"`` and ``"worker"``.

        enabled (bool): Set to :const:`False` to not actually publish any
            events, making :meth:`send` a no-op.

        channel (kombu.Channel): Can be used instead of `connection` to specify
            an exact channel to use when sending events.

        buffer_while_offline (bool): If enabled events will be buffered
            while the connection is down. :meth:`flush` must be called
            as soon as the connection is re-established.

    Note:
        You need to :meth:`close` this after use.
    ZsqlNTr      c                 C   s4  t |p
| j| _|| _|| _|p$t | _|| _|
p6t | _|| _	|| _
tt| _t | _d | _t | _|pt| jjj| _t | _t | _t|pg | _tj tj g| _| jj| _|	| _ |s|r|jj!| _|| _"| jp| j# }t$|| jjj%d| _&|j'j(| j)kr
d| _"| j"r| *  d| ji| _+t,- | _.d S )N)nameFhostname)/r   app
connectionchannelr   r   buffer_while_offline	frozensetbuffer_groupbuffer_limiton_send_bufferedr   list_group_buffer	threadingLockmutexproducerr   _outbound_bufferconfZevent_serializer
serializerset
on_enabledon_disabledgroupstimetimezonealtzonetzoffsetclockdelivery_modeclientenabledZconnection_for_writer
   Zevent_exchangeexchange	transportZdriver_typeDISABLED_TRANSPORTSenableheadersosgetpidpid)selfr   r   r,   r   r   r   r    r$   r*   r   r   r   Zconninfo r6   </tmp/pip-unpacked-wheel-ucduq0nd/celery/events/dispatcher.py__init__:   s@    



zEventDispatcher.__init__c                 C   s   | S Nr6   r5   r6   r6   r7   	__enter__^   s    zEventDispatcher.__enter__c                 G   s   |    d S r9   )close)r5   exc_infor6   r6   r7   __exit__a   s    zEventDispatcher.__exit__c                 C   s:   t | jp| j| j| jdd| _d| _| jD ]
}|  q*d S )NF)r-   r    Zauto_declareT)r   r   r   r-   r    r   r,   r"   r5   callbackr6   r6   r7   r0   d   s    
zEventDispatcher.enablec                 C   s*   | j r&d| _ |   | jD ]
}|  qd S )NF)r,   r<   r#   r?   r6   r6   r7   disablem   s
    
zEventDispatcher.disableFc           	   
   K   sp   |rdn| j  }||f| jt | j|d|}| j. | j||fd|ddi|W  5 Q R  S Q R X dS )au  Publish event using custom :class:`~kombu.Producer`.

        Arguments:
            type (str): Event type name, with group separated by dash (`-`).
                fields: Dictionary of event fields, must be json serializable.
            producer (kombu.Producer): Producer instance to use:
                only the ``publish`` method will be called.
            retry (bool): Retry in the event of connection failure.
            retry_policy (Mapping): Map of custom retry policy options.
                See :meth:`~kombu.Connection.ensure`.
            blind (bool): Don't set logical clock value (also don't forward
                the internal logical clock).
            Event (Callable): Event type used to create event.
                Defaults to :func:`Event`.
            utcoffset (Callable): Function returning the current
                utc offset in hours.
        Nr   r   r4   r)   routing_key-.)r)   forwardr   r   r4   r   _publishreplace)	r5   typefieldsr   blindr	   kwargsr)   eventr6   r6   r7   publisht   s     
zEventDispatcher.publishc           	      C   sr   | j }z*|j|||j|||g| j| j| jd	 W n< tk
rl } z| jsJ | j	|||f W 5 d }~X Y nX d S )N)rC   r-   retryretry_policyZdeclarer    r1   r*   )
r-   rN   r   r    r1   r*   	Exceptionr   r   append)	r5   rM   r   rC   rO   rP   r   r-   excr6   r6   r7   rG      s"    
zEventDispatcher._publishc              	   K   s   | j r| jt| }}	|r&|	|kr&dS |	| jkr| j }
||f| j| | j|
d|}| j|	 }|	| t
|| jkr|   q| jr|   n| j||| j||||dS dS )a  Send event.

        Arguments:
            type (str): Event type name, with group separated by dash (`-`).
            retry (bool): Retry in the event of connection failure.
            retry_policy (Mapping): Map of custom retry policy options.
                See :meth:`~kombu.Connection.ensure`.
            blind (bool): Don't set logical clock value (also don't forward
                the internal logical clock).
            Event (Callable): Event type used to create event,
                defaults to :func:`Event`.
            utcoffset (Callable): unction returning the current utc offset
                in hours.
            **fields (Any): Event fields -- must be json serializable.
        NrB   )rK   r	   rO   rP   )r,   r$   r   r   r)   rF   r   r4   r   rR   lenr   flushr   rN   r   )r5   rI   rK   r   rO   rP   r	   rJ   r$   groupr)   rM   bufr6   r6   r7   send   s0    


 



 zEventDispatcher.sendc           	   	   C   s   |rRt | j}z6| j& |D ]\}}}| || j| qW 5 Q R X W 5 | j  X |r| j: | j D ](\}}| || jd|  g |dd< qhW 5 Q R X dS )zFlush the outbound buffer.z%s.multiN)r   r   clearr   rG   r   r   items)	r5   errorsr$   rW   rM   rC   _rV   eventsr6   r6   r7   rU      s    
 zEventDispatcher.flushc                 C   s   | j |j  dS )z-Copy the outbound buffer of another instance.N)r   extend)r5   otherr6   r6   r7   extend_buffer   s    zEventDispatcher.extend_bufferc                 C   s   | j  o| j   d| _dS )zClose the event dispatcher.N)r   lockedreleaser   r:   r6   r6   r7   r<      s    zEventDispatcher.closec                 C   s   | j S r9   r   r:   r6   r6   r7   _get_publisher   s    zEventDispatcher._get_publisherc                 C   s
   || _ d S r9   rc   )r5   r   r6   r6   r7   _set_publisher   s    zEventDispatcher._set_publisher)NNTNTNNNr   Nr   N)TT)__name__
__module____qualname____doc__r/   r   r"   r#   r8   r;   r>   r0   rA   r	   rN   r   rG   rX   rU   r`   r<   rd   re   propertyZ	publisherr6   r6   r6   r7   r      sJ                   
$	 
 
 
%
r   )ri   r2   r   r%   collectionsr   r   Zkombur   Z
celery.appr   Zcelery.utils.nodenamesr   Zcelery.utils.timer   rM   r	   r
   r   __all__r   r6   r6   r6   r7   <module>   s   