U
    Z+d	                     @   s`   d Z ddlmZ ddlmZ ddlmZ dZeeZ	e	j
e	je	j  Z
ZZG dd dejZd	S )
z-Worker <-> Worker Sync at startup (Bootstep).    )	bootsteps)
get_logger   )Events)Minglec                       sv   e Zd ZdZd ZefZddhZd fdd	Zdd Z	d	d
 Z
dd Zdd Zdd ZdddZdd Zdd Z  ZS )r   zBootstep syncing state with neighbor workers.

    At startup, or upon consumer restart, this will:

    - Sync logical clocks.
    - Sync revoked tasks.

    ZamqpZredisFc                    s0   | o|  |j| _t j|fd|i| d S )Nwithout_mingle)compatible_transportappZenabledsuper__init__)selfcr   kwargs	__class__ A/tmp/pip-unpacked-wheel-ucduq0nd/celery/worker/consumer/mingle.pyr      s    zMingle.__init__c              
   C   s.   |  }|jj| jkW  5 Q R  S Q R X d S N)Zconnection_for_read	transportZdriver_typecompatible_transports)r   r	   connr   r   r   r       s    
zMingle.compatible_transportc                 C   s   |  | d S r   )sync)r   r   r   r   r   start$   s    zMingle.startc                    s`   t d  }|rTt dtdd | D   fdd| D  t d nt d d S )Nzmingle: searching for neighborszmingle: sync with %s nodesc                 S   s   g | ]\}}|r|qS r   r   ).0replyvaluer   r   r   
<listcomp>,   s      zMingle.sync.<locals>.<listcomp>c                    s"   g | ]\}}|r  ||qS r   )on_node_reply)r   nodenamer   r   r   r   r   r   -   s    zmingle: sync completezmingle: all alone)info
send_hellolenitems)r   r   repliesr   r   r   r   '   s    

zMingle.syncc                 C   sD   |j jjd|jd}|jjj}||j|j	p0i }|
|jd  |S )Ng      ?)timeout
connection)r	   controlinspectr&   
controllerstaterevokedZhellohostname_datapop)r   r   r(   Zour_revokedr$   r   r   r   r!   3   s
    
zMingle.send_helloc              
   C   sd   t d| z| j|f| W nB tk
r2    Y n. tk
r^ } ztd|| W 5 d }~X Y nX d S )Nz mingle: processing reply from %szmingle: sync with %s failed: %r)debugsync_with_nodeMemoryError	Exception	exception)r   r   r   r   excr   r   r   r   :   s    
zMingle.on_node_replyNc                 K   s   |  || | || d S r   )on_clock_eventon_revoked_received)r   r   clockr+   r   r   r   r   r0   C   s    zMingle.sync_with_nodec                 C   s"   |r|j j|n
|j j  d S r   )r	   r7   adjustZforward)r   r   r7   r   r   r   r5   G   s    zMingle.on_clock_eventc                 C   s   |r|j jj| d S r   )r)   r*   r+   update)r   r   r+   r   r   r   r6   J   s    zMingle.on_revoked_received)F)NN)__name__
__module____qualname____doc__labelr   requiresr   r   r   r   r   r!   r   r0   r5   r6   __classcell__r   r   r   r   r      s   		
r   N)r=   Zceleryr   Zcelery.utils.logr   eventsr   __all__r:   loggerr/   r    r3   ZStartStopStepr   r   r   r   r   <module>   s   