U
    Z+d.                     @   s   d Z ddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ dd	lmZ d
ZeeZejejej  ZZZG dd dZG dd deZdS )zWorker Pidbox (remote control).    N)ignore_errors)safe_str)AttributeDict)pass1)
get_logger   )control)PidboxgPidboxc                   @   sT   e Zd ZdZdZdd Zdd Zdd Zd	d
 Zdd Z	dd Z
dd Zdd ZdS )r	   zWorker mailbox.Nc              	   C   s^   || _ |j| _|jjjjt|jtjjt	|j|j||j
jr>tntdd| _| j jjj| _d S )N)apphostnameconsumerZtset)handlersstate)cr   r   r   ZmailboxNoder   ZPaneldatar   
controllerZuse_eventloopr   setnodeZclockZforward_forward_clockselfr    r   8/tmp/pip-unpacked-wheel-ucduq0nd/celery/worker/pidbox.py__init__   s    
	zPidbox.__init__c              
   C   s   |    z| j|| W nb tk
rF } ztd| W 5 d }~X Y n8 tk
r| } ztd|dd |   W 5 d }~X Y nX d S )NzNo such control command: %szControl command error: %rT)exc_info)r   r   Zhandle_messageKeyErrorerror	Exceptionreset)r   bodymessageexcr   r   r   
on_message'   s    zPidbox.on_messagec                 C   s.   |j  | j_| jj| jd| _|j| j_d S N)callback)
connectionchannelr   listenr$   r   Zon_decode_errorr   r   r   r   start3   s    zPidbox.startc                 C   s   d S Nr   r   r   r   r   on_stop8   s    zPidbox.on_stopc                 C   s   |    | || _d S r+   )r-   _close_channelr   r   r   r   r   stop;   s    zPidbox.stopc                 C   s   |  | j | | j d S r+   )r/   r   r*   r,   r   r   r   r    ?   s    zPidbox.resetc                 C   s"   | j r| j jrt|| j jj d S r+   )r   r(   r   closer   r   r   r   r.   C   s    zPidbox._close_channelc                 C   s4   |    | jr$td t|| jj | | j d S )NzCanceling broadcast consumer...)r-   r   debugr   cancelr/   r   r   r   r   r   shutdownG   s
    zPidbox.shutdown)__name__
__module____qualname____doc__r   r   r$   r*   r-   r/   r    r.   r3   r   r   r   r   r	      s   r	   c                   @   sD   e Zd ZdZdZdZdZdd Zdd Zdd	 Z	d
d Z
dd ZdS )r
   zWorker pidbox (greenlet).Nr   c                 C   s   |j | j| d S r+   )poolZspawn_nloopr   r   r   r   r*   V   s    zgPidbox.startc                 C   s2   | j r.| j  td | j   d  | _ | _d S )Nz+Waiting for broadcast thread to shutdown...)_node_stopped_node_shutdownr   r1   waitr,   r   r   r   r-   Y   s
    

zgPidbox.on_stopc                 C   s   |  j d7  _ d S )Nr   )_resetsr,   r   r   r   r    `   s    zgPidbox.resetc                 C   s6   |  | | | j_| jj| jd| _| j  d S r%   )r.   r(   r   r)   r$   r   consume)r   r   r'   r   r   r   	_do_resetc   s    
zgPidbox._do_resetc              	   C   s   | j g}t  }| _t  }| _z| }td|  | 	|| |
 s|jr|d | j k r|d  d7  < | 	|| z|jdd W qJ tjk
r   Y qJX qJW 5 Q R X W 5 |  X d S )Nzpidbox: Connected to %s.r   r   g      ?)timeout)r=   	threadingEventr;   r:   r   Zconnection_for_readinfoas_urir?   is_setr'   Zdrain_eventssocketr@   )r   r   Zresetsr3   stoppedr'   r   r   r   r9   i   s     
zgPidbox.loop)r4   r5   r6   r7   r;   r:   r=   r*   r-   r    r?   r9   r   r   r   r   r
   O   s   r
   )r7   rF   rA   Zkombu.commonr   Zkombu.utils.encodingr   Zcelery.utils.collectionsr   Zcelery.utils.functionalr   Zcelery.utils.logr    r   __all__r4   loggerr1   r   rC   r	   r
   r   r   r   r   <module>   s   ;