U
    Z+d@                     @   s*  d Z ddlZddlZddlZddlZddlZddlmZmZm	Z	 ddl
mZ ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZmZmZ dd
lmZmZmZmZ ddlmZ dZdZ dd Z!dd Z"dd Z#dd Z$dd Z%G dd dZ&G dd dZ'dd Z(G dd dZ)G d d! d!eZ*dS )"zStart/stop/manage workers.    N)OrderedDictUserListdefaultdict)partial)Popen)sleep)	from_utf8)cached_property)
IS_WINDOWSPidfilesignal_name)gethostnamehost_formatnode_format	nodesplit)saferepr)ClusterNodeceleryc                  G   s   d tf|  S )N )join
CELERY_EXE)args r   5/tmp/pip-unpacked-wheel-ucduq0nd/celery/apps/multi.py
celery_exe   s    r   c                 C   sN   |}d| kr&t | }t|\}}|} n| |  }t | d| }| ||fS )N@)r   r   )nameprefixsuffixhostnamenodename	shortnamer   r   r   build_nodename   s    r#   c              	   C   s   t t| ||| dddS )Nz%iz%I)r   NdhiI)r   r   )r!   r"   r    r   r   r   build_expander+   s    r)   c                 C   s.   |s| S |  dr |  d| S |  d| S )N--=r   )
startswith)optvaluer   r   r   
format_opt7   s
    
r/   c                 C   s   dd |   D S )Nc                 S   s<   i | ]4\}}t |d kr*d|ddnd| | qS )   z--{}_-)lenformatreplace).0kvr   r   r   
<dictcomp>@   s
   
 z+_kwargs_to_command_line.<locals>.<dictcomp>)items)kwargsr   r   r   _kwargs_to_command_line?   s    r<   c                   @   sD   e Zd Zdd Zdd ZdddZddd	Zdd
dZdddZdS )NamespacedOptionParserc                 C   s,   || _ t | _g | _d| _tdd | _d S )N c                   S   s   t  S N)r   r   r   r   r   <lambda>N       z1NamespacedOptionParser.__init__.<locals>.<lambda>)r   r   optionsvaluespassthroughr   
namespaces)selfr   r   r   r   __init__I   s
    zNamespacedOptionParser.__init__c                 C   s   dd | j D }d}|t|k r|| }|dkrHd||d  | _qn|d dkr|d dkrt| |dd   qd }t||d kr||d  d dkr||d  }|d7 }| |dd  | n| j| |d7 }qd S )	Nc                 S   s   g | ]}|r|qS r   r   )r6   argr   r   r   
<listcomp>Q   s      z0NamespacedOptionParser.parse.<locals>.<listcomp>r   r*   r   r2   r0      )r   r3   r   rD   process_long_optprocess_short_optrC   append)rF   rargsposrH   r.   r   r   r   parseP   s"    $zNamespacedOptionParser.parseNc                 C   s,   d|kr| dd\}}| j||dd d S )Nr+   r0   Fshort)split
add_optionrF   rH   r.   r   r   r   rK   e   s    z'NamespacedOptionParser.process_long_optc                 C   s   | j ||dd d S )NTrQ   )rT   rU   r   r   r   rL   j   s    z(NamespacedOptionParser.process_short_optc                 C   s    |d kr| j }t|f| j| S r?   )rB   r   rE   )rF   nsdefaultsr   r   r   optmergem   s    zNamespacedOptionParser.optmergeFc                 C   sB   |rdp
d}| j }d|kr2|d\}}| j| }|||| < d S )Nr2   r*   :)rB   rS   rE   )rF   r   r.   rR   rV   r   destr   r   r   rT   r   s    
z!NamespacedOptionParser.add_option)N)N)N)FN)	__name__
__module____qualname__rG   rP   rK   rL   rX   rT   r   r   r   r   r=   G   s   


r=   c                   @   s   e Zd ZdZd*ddZdd Zdd Zd	d
 Zdd Zdd Z	d+ddZ
d,ddZejddddfddZd-ddZdd Zdd Zdd Zedd Zedd  Zed!d" Zejd#d" Zed$d% Zed&d' Zed(d) ZdS ).r   zRepresents a node in a cluster.Nc                 C   s\   || _ |pdtdd | _|| _|p(d| _| |p8t | _|  | _	| 
 | _d | _d S )Nz-m workerz--detachr>   )r   r   cmdrM   
extra_args_annotate_with_default_optsr   rB   _prepare_expanderexpander_prepare_argvargv_pid)rF   r   r_   rM   rB   r`   r   r   r   rG   ~   s    


zNode.__init__c                 C   sD   | j |d< | |ddgd | |ddgd | |dgtj |S )	N-n	--pidfile-pz/var/run/celery/%n.pid	--logfile-fz/var/log/celery/%n%I.log--executable)r   _setdefaultoptsys
executable)rF   rB   r   r   r   ra      s
    
z Node._annotate_with_default_optsc              	   C   sx   |dd  D ](}z|| W   S  t k
r2   Y qX q||d tj|}tj|}|rttj|stt| |S )Nr0   r   )KeyError
setdefaultospathnormpathdirnameexistsmakedirs)rF   r%   altr.   r-   Zdir_pathr   r   r   rm      s    
zNode._setdefaultoptc                 C   s    | j dd\}}t| j ||S )Nr   r0   )r   rS   r)   )rF   r"   r    r   r   r   rb      s      zNode._prepare_expanderc              	      s      jd}|dd } j } j D ]2\}}|dkr4||t|  | |	| q4d
|g}t| fdd| D   jg } jr|   jf7 }|S )Nr   r   r0   )z-Az--appz-bz--brokerz--result-backendz--loaderz--configz	--workdirz-Cz
--no-colorz-qz--quietc                    s    g | ]\}}t | |qS r   )r/   rc   )r6   r-   r.   rF   r   r   rI      s   z&Node._prepare_argv.<locals>.<listcomp>)rc   r_   rS   indexrB   copyr:   insertr/   popr   tupler`   rM   )rF   r_   r'   rB   r-   r.   re   r   ry   r   rd      s(    


zNode._prepare_argvc                 C   s
   |  dS Nr   )sendry   r   r   r   alive   s    z
Node.alivec              
   C   sn   | j }|r`zt|| W n@ tk
rZ } z"|jtjkr: t||  W Y dS d }~X Y nX dS t||  d S )NFT)pidrr   killOSErrorerrnoZESRCH
maybe_call)rF   sigZon_errorr   excr   r   r   r      s    
z	Node.sendc                 K   s   | j | jf| j|d|S )N)rs   env)	_waitexecre   ro   )rF   r   r;   r   r   r   start   s     z
Node.startc           	      C   sB   |  ||}t|| d||d t||d}| j| ||dS )Nr   )argstrr   )r   )on_signalled
on_failure)prepare_argvr   r   r   handle_process_exitwait)	rF   re   rs   r   on_spawnr   r   r   piper   r   r   r      s    zNode._waitexecc                 C   s4   |dk rt || |  | S |dkr0t || | |S r   )r   )rF   retcoder   r   r   r   r   r      s    zNode.handle_process_exitc                 C   s(   d |gt| }tjt|t dS )Nr   )posix)r   listshlexrS   r   r
   )rF   re   rs   r   r   r   r   r      s    zNode.prepare_argvc              	   G   s@   |D ]*}z| j | W   S  tk
r,   Y qX qt|d d S r   )rB   rp   )rF   rx   r-   r   r   r   getopt   s    zNode.getoptc                 C   s   dt | j d| j dS )N<z: >)typer[   r   ry   r   r   r   __repr__   s    zNode.__repr__c                 C   s   |  | ddS )Nrh   ri   rc   r   ry   r   r   r   pidfile   s    zNode.pidfilec                 C   s   |  | ddS )Nrj   rk   r   ry   r   r   r   logfile   s    zNode.logfilec                 C   s:   | j d k	r| j S zt| j W S  tk
r4   Y nX d S r?   )rf   r   r   Zread_pid
ValueErrorry   r   r   r   r      s    
zNode.pidc                 C   s
   || _ d S r?   )rf   )rF   r.   r   r   r   r     s    c                 C   s
   | j d S )Nrl   rB   ry   r   r   r   ro     s    zNode.executablec                 C   s   | j f| j S r?   )ro   re   ry   r   r   r   argv_with_executable  s    zNode.argv_with_executablec                 K   s   | |t |dS )Nr   )r<   )clsr   r;   r   r   r   from_kwargs  s    zNode.from_kwargs)NNNN)N)N)NN)r[   r\   r]   __doc__rG   ra   rm   rb   rd   r   r   r   rn   ro   r   r   r   r   r   r	   r   r   propertyr   setterr   classmethodr   r   r   r   r   r   {   sH          


  







r   c                 O   s   | d k	r| || d S r?   r   )Zfunr   r;   r   r   r   r     s    r   c                   @   sL   e Zd ZeZdddZdd Zdd	 Zd
d Zdd Zdd Z	dddZ
dS )MultiParsercelery workerr>   r   c                 C   s"   || _ || _|| _|| _|| _d S r?   )r_   rM   r   r   range_prefix)rF   r_   rM   r   r   r   r   r   r   rG   !  s
    zMultiParser.__init__c                    s   j }tjt|dk}jdjdj ddt }dpbddj	pt|d	krdnd
dpj
}|rz|| }W n tk
r   Y nX | |  fdd|D S )Nr0   z--cmdz--appendz
--hostnamerg   z--prefixr>   z--suffix)z""z''z--range-prefixc              
   3   s$   | ]} | V  qd S r?   )_node_from_options)r6   r   rM   r_   rB   pr   rF   r   r   r   	<genexpr>>  s         z$MultiParser.parse.<locals>.<genexpr>)rC   dictrB   r3   r   r}   r_   rM   r   r   r   _get_rangesr   _update_ns_opts_update_ns_ranges)rF   r   namesrangesr    r   r   r   r   rP   *  s*    
zMultiParser.parsec                 C   s>   t |||\}}	}
|	|jkr |	n|}t|	||||||jS r?   )r#   rE   r   rX   rD   )rF   r   r   r   r   r_   rM   rB   	namespacer!   r1   r   r   r   r   D  s    
 zMultiParser._node_from_optionsc                 C   s$   t |d }dd td|d D S )Nr   c                 S   s   g | ]}t |qS r   strr6   nr   r   r   rI   M  s     z+MultiParser._get_ranges.<locals>.<listcomp>r0   )intrange)rF   r   Z	noderanger   r   r   r   K  s    zMultiParser._get_rangesc              	   C   s   t |j D ]n\}}| rt|d }|dk r@td|z|j||  | W q tk
rz   td|Y qX qd S )Nr0   r   zIndexes start at 1 got: zNo node at index )r   rE   r:   isdigitr   rp   update
IndexError)rF   r   r   ns_namens_optsZns_indexr   r   r   r   O  s    zMultiParser._update_ns_optsc                 C   s^   t |j D ]J\}}d|ks*|rd|kr| ||D ]}|j| | q6|j| qd S )N,r2   )r   rE   r:   _parse_ns_ranger   r}   )rF   r   r   r   r   Zsubnsr   r   r   r   \  s
    zMultiParser._update_ns_rangesFc                 C   sr   g }d|kr| dp|gD ]P}|rbd|krb| d\}}|dd tt|t|d D  q|| q|S )Nr   r2   c                 s   s   | ]}t |V  qd S r?   r   r   r   r   r   r   h  s    z.MultiParser._parse_ns_range.<locals>.<genexpr>r0   )rS   extendr   r   rM   )rF   rV   r   retspacer   stopr   r   r   r   c  s    

zMultiParser._parse_ns_rangeN)r   r>   r>   r>   r   )F)r[   r\   r]   r   rG   rP   r   r   r   r   r   r   r   r   r   r     s          
	r   c                   @   s   e Zd ZdZd"ddZdd Zdd Zd	d
 Zdd Zdd Z	e
jfddZdde
jfddZdde
jfddZdde
jfddZe
jdfddZdd Zd#ddZdd Zed d! ZdS )$r   zRepresent a cluster of workers.Nc                 C   sx   || _ |ptd| _|| _|| _|| _|| _|| _|| _|	| _	|
| _
|| _|| _|| _|| _|| _|| _|| _|| _d S )Nr^   )nodesr   r_   r   on_stopping_preambleon_send_signalon_still_waiting_foron_still_waiting_progresson_still_waiting_endon_node_starton_node_restarton_node_shutdown_okon_node_statuson_node_signalon_node_signal_deadon_node_downon_child_spawnon_child_signalledon_child_failure)rF   r   r_   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rG   s  s$    zCluster.__init__c                    s    fdd D S )Nc                    s   g | ]}  |qS r   )
start_node)r6   nodery   r   r   rI     s     z!Cluster.start.<locals>.<listcomp>r   ry   r   ry   r   r     s    zCluster.startc                 C   s(   t | j| | |}t | j|| |S r?   )r   r   _start_noder   )rF   r   r   r   r   r   r     s    
zCluster.start_nodec                 C   s   |j | j| j| j| jdS )N)r   r   r   )r   r   r   r   r   )rF   r   r   r   r   r     s    zCluster._start_nodec                 C   s8   | j | jdD ]$}t| j|t| ||| j qd S )Non_down)getpidsr   r   r   r   r   r   )rF   r   r   r   r   r   send_all  s    zCluster.send_allc                 C   s   |  tjS r?   )r   signalSIGKILLry   r   r   r   r     s    zCluster.killc                    s&   g   fdd}j d||d  S )Nc                    s2   t j|  | }t j| |  | d S r?   )r   r   r   r   rM   )r   retvalZretvalsrF   r   r   restart_on_down  s    
z(Cluster.restart.<locals>.restart_on_downrJ   retryr   r   _stop_nodes)rF   r   r   r   r   r   restart  s    zCluster.restartc                 C   s   | j |||dS Nr   r   rF   r   callbackr   r   r   r   r     s    zCluster.stoprJ   c                 C   s   | j |||dS r   r   r   r   r   r   stopwait  s    zCluster.stopwaitc                 C   sJ   |d k	r|n| j }t| j|d}|rF| j|||dD ]}t|| q6d S )Nr   )r   r   )r   r   r   shutdown_nodesr   )rF   r   r   r   r   r   r   r   r   r     s
    zCluster._stop_nodesc                 c   s   t |}t| j| t  }|D ]4}t| j|t| ||| js|| |V  q||8 }|rt| j| d}|rt  }|D ]L}|d7 }t| j	| |
 s~t| j| || |V  t| j|  qq~||8 }|rp|t| sptt| qpt| j d S )Nr   r0   )setr   r   r   r   r   r   addr   r   r   r   r3   r   floatr   )rF   r   r   r   PZ	to_remover   Zitsr   r   r   r     s6    

zCluster.shutdown_nodesc                 C   s(   | D ]}|j |kr|  S qt|d S r?   )r   rp   )rF   r   r   r   r   r   find  s    

zCluster.findc                 c   s&   | D ]}|j r|V  qt|| qd S r?   )r   r   )rF   r   r   r   r   r   r     s    zCluster.getpidsc                 C   s(   dj t| tdd | D t| jdS )Nz<{name}({0}): {1}>c                 S   s   g | ]
}|j qS r   r   r   r   r   r   rI     s     z$Cluster.__repr__.<locals>.<listcomp>r   )r4   r3   r   r   r[   ry   r   r   r   r     s
     zCluster.__repr__c                 C   s   | j S r?   )r   ry   r   r   r   data  s    zCluster.data)NNNNNNNNNNNNNNNNN)N)r[   r\   r]   r   rG   r   r   r   r   r   r   SIGTERMr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   p  sB                   
$
r   )+r   r   rr   r   r   rn   collectionsr   r   r   	functoolsr   
subprocessr   timer   Zkombu.utils.encodingr   Zkombu.utils.objectsr	   Zcelery.platformsr
   r   r   Zcelery.utils.nodenamesr   r   r   r   Zcelery.utils.safereprr   __all__r   r   r#   r)   r/   r<   r=   r   r   r   r   r   r   r   r   <module>   s6   4 R