U
    Z+d4                     @   s2  d Z ddlZddlZddlZddlZddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ ddlmZmZmZ dd	lmZ dd
lmZmZ ddlmZ ddlmZmZmZmZ ddlm Z m!Z! ddl"m#Z# ddl$m%Z% ddl&m'Z'm(Z(m)Z) ddl*m+Z+ ddl,m-Z- dZ.e'e/Z0ej1dZ2e3edZ4ddddddddddddgZ5dZ6dZ7dd Z8dd  Z9G d!d" d"e-Z:d#d$edefd%d&Z;e
d'kree;d(d)eed*Z<nee;d(d$ed+Z<e2see;d'd)eed*Z=nd,d-  Z<Z=d.d/ Z>e2see;d0e>ed1Z?nd2d3 Z?d4d5 Z@dBd7d8ZAdCd:d;ZBdDd>d?ZCdEd@dAZDdS )FzWorker command-line program.

This module is the 'program-version' of :mod:`celery.worker`.

It does everything necessary to run that module
as an actual application, like installing signal handlers,
platform tweaks, and so on.
    N)datetime)partial)REMAP_SIGTERM)current_process)safe_str)VERSION_BANNER	platformssignals)trace)WorkerShutdownWorkerTerminate)	AppLoader)
EX_FAILUREEX_OKcheck_privilegesisatty)staticterm)cry)qualname)
get_loggerin_sighandlerset_in_sighandler)	pluralize)WorkController)Workerjavapypy_version_infoz --------------z--- ***** -----z-- ******* ----z- *** --- * ---z- ** ----------z{hostname} v{version}

{platform} {timestamp}

[config]
.> app:         {app}
.> transport:   {conninfo}
.> results:     {results}
.> concurrency: {concurrency}
.> task events: {events}

[queues]
{queues}
z
[tasks]
{tasks}
c                  C   s    ddl m}  tdd |  D S )Nr   	enumeratec                 s   s   | ]}|j d sdV  qdS )zDummy-   N)name
startswith).0t r%   6/tmp/pip-unpacked-wheel-ucduq0nd/celery/apps/worker.py	<genexpr>M   s    z&active_thread_count.<locals>.<genexpr>)	threadingr   sumr   r%   r%   r&   active_thread_countK   s    r*   c                 C   s   t d|  tjdd d S )N
Tfileflush)printsys
__stderr__)msgr%   r%   r&   safe_sayQ   s    r3   c                       s   e Zd ZdZd#ddZd$ fdd	Zdd	 Z fd
dZdd Zdd Z	d%ddZ
dd Zd&ddZdd Zd'ddZdd Zdd  Zd!d" Z  ZS )(r   zWorker as a program.Fc                 K   sB   || _ t| j| j tjj| j| | jj|d t	| jjj
 d S )N)senderinstanceconfoptions)quietr
   setup_worker_optimizationsapphostnamer	   Zceleryd_initsendr6   r   Zaccept_content)selfr8   kwargsr%   r%   r&   on_before_initX   s      zWorker.on_before_initNc                    sn   | j d|| _| j d|| _t jf | || _|| _tt	j
| _| j jj| j|d k	r`| n|d| _d S )NZworker_redirect_stdoutsZworker_redirect_stdouts_level)Zenabled)r:   Zeitherredirect_stdoutsredirect_stdouts_levelsuperZsetup_defaultspurgeno_colorr   r0   stdout_isattylogcoloredlogfile)r=   rC   rD   r@   rA   r>   	__class__r%   r&   on_after_initd   s       zWorker.on_after_initc                 C   s   |   | _t| j| j d S N)setup_logging_custom_loggingr
   r9   r:   r;   r=   r%   r%   r&   on_init_blueprintt   s    
zWorker.on_init_blueprintc                    s   | j }t   tjj| j| |jd | jr4| 	  | j
sB|   | d | |  | jsp| jrp|j| j d}|j}t|tr| dk}|r|j rtd d S )N)r4   r5   r6   z-active-T)zdjango.conf:settingszPlease run `celery upgrade settings path/to/settings.py` to avoid these warnings and to allow a smoother upgrade to Celery 6.0.)r:   rB   on_startr	   Zceleryd_after_setupr<   r;   r6   rC   purge_messagesr8   emit_bannerset_process_statusinstall_platform_tweaksrO   r@   rG   rA   Z_config_source
isinstancestrlowerZmaybe_warn_deprecated_settingsloggerwarning)r=   r:   Zwarn_deprecatedZconfig_sourcerJ   r%   r&   rR   z   s0    
  



zWorker.on_startc                 C   sl   t  }|rtt t  ttdt| j	
d| j| dt| j	|  pRdgtjdd d S )N z 
)artlinesTr,   )r   Zsupports_imagesr/   Zimgcatr   Zlogor   joinrX   rH   Zcyanstartup_inforeset
extra_infor0   
__stdout__)r=   Z	use_imager%   r%   r&   rT      s      zWorker.emit_bannerc                 C   s$   t jj|d tdt| j d S )N)r4   z	%s ready.)r	   Zworker_readyr<   rZ   infor   r;   )r=   Zconsumerr%   r%   r&   on_consumer_ready   s    zWorker.on_consumer_readyc                 C   s8   |d kr| j d k	r| j  }| jjj| j| jd|| jdS )NF)r@   colorizer;   )rD   r:   rG   setuploglevelrI   r;   )r=   re   r%   r%   r&   rN      s       zWorker.setup_loggingc              	   C   sN   | j  :}| j jj|d}|r@td| dt|d ddd W 5 Q R X d S )N)
connectionzpurge: Erased  messagez from the queue.
T)r.   )r:   Zconnection_for_writecontrolrC   r/   r   )r=   rh   countr%   r%   r&   rS      s    zWorker.purge_messagesTr+   celery.c                    s"   |  fddt| jjD S )Nc                 3   s,   | ]$} s| sn|rd | V  qdS )z  . N)r"   )r#   Ztaskinclude_builtinsint_r%   r&   r'      s
     z"Worker.tasklist.<locals>.<genexpr>)r^   sortedr:   tasks)r=   ro   seprp   r%   rn   r&   tasklist   s    
zWorker.tasklistc                 C   sB   | j d krd S | j tjkr>| j tjk}| j|d}tj|dS d S )N)ro   )rr   )rg   loggingINFODEBUGrt   EXTRA_INFO_FMTformat)r=   ro   rt   r%   r%   r&   ra      s    
zWorker.extra_infoc                 C   s  | j }t| j}d|jpdt|}t|jtsbt	|j}|
drR|dd  }|d| d7 }| jr| j\}}d| d| d	}| j}t|ts|j}|d|d
d  d7 }d}	| jsd}	tj|t| jt jddt| j   | j j |tt |	|jjjdddd
 }
|r~t|
D ]P\}}zd t!| |
| g|
|< W n& t"k
rx   d|
|  |
|< Y nX q,d |
d S )Nz{}:{:#x}__main__zcelery.loaders    ()z{min=z, max=}.ONz/OFF (enable -E to monitor tasks in this worker)r   )microsecondF)indentindent_first)
r:   r;   	timestampversionZconninforesultsconcurrencyplatformeventsqueuesri   z                r+   )#r:   rX   r   ry   mainidrW   loaderr   r   r"   Z	autoscalepool_cls
__module__splitZtask_eventsBANNERr   r;   r   nowreplacer   rh   as_uribackend	_platformr   Zamqpr   
splitlinesr   r^   ARTLINES
IndexError)r=   r]   r:   r   Zapprr   maxminpoolr   Zbanneri_r%   r%   r&   r_      sJ    







zWorker.startup_infoc                 C   sX   | j jr|   | js0| j jr(t| nt| t| t| t| t	  t
  dS )z1Install platform specific tweaks and workarounds.N)r:   ZIS_macOS macOS_proxy_detection_workaroundrF   !install_HUP_not_supported_handlerinstall_worker_restart_handlerinstall_worker_term_handler install_worker_term_hard_handlerinstall_worker_int_handlerinstall_cry_handlerinstall_rdb_handler)r=   workerr%   r%   r&   rV      s    
zWorker.install_platform_tweaksc                 C   s   t jdd dS )z6See https://github.com/celery/celery/issues#issue/161.Zcelery_dummy_proxyZset_by_celerydN)osenviron
setdefaultrP   r%   r%   r&   r     s    z'Worker.macOS_proxy_detection_workaroundc                 C   s&   t jd| dt tj d| jdS )NZcelerydr|   r}   )rc   r;   )r   Zset_mp_process_titleZstrargvr0   argvr;   )r=   rc   r%   r%   r&   rU     s
    zWorker.set_process_status)F)FNNN)N)Tr+   rm   )T)__name__r   __qualname____doc__r?   rL   rQ   rR   rT   rd   rN   rS   rt   ra   r_   rV   r   rU   __classcell__r%   r%   rJ   r&   r   U   s$   
    (


*r   TERMWarmc                    s4    fdd}t d |_|tj< d S )Nc               	      s   t  z ddlm} t jdkrR r,  td d tjjj	d t
 dkrtt|dd	d
  nW 5 Q R X d S )Nr   stateMainProcesszworker: z shutdown (MainProcess))r4   sighowexitcoder    should_stopZshould_terminate)r   Cold)r   celery.workerr   r   _namer3   r	   Zworker_shutting_downr<   r;   r*   setattr)argsr   callbackexcr   r   r   r   r%   r&   _handle_request  s*      
z*_shutdown_handler.<locals>._handle_requestZworker_)rX   r   r   r	   )r   r   r   r   r   r   r   r%   r   r&   _shutdown_handler  s    r   SIGQUITSIGTERMr   )r   r   r   r   )r   r   r   c                  O   s   d S rM   r%   )akwr%   r%   r&   <lambda>@      r   c                 C   s   t d t| dd d S )Nz>worker: Hitting Ctrl+C again will terminate all running tasks!SIGINTr   )r3   r   )r   r%   r%   r&   	on_SIGINTC  s    r   r   )r   r   r   c                  O   s   d S rM   r%   )r   r>   r%   r%   r&   r   N  s    r   c                   C   s2   t tjtjtjg ttjtjgtj	  d S rM   )
r   Zclose_open_fdsr0   	__stdin__rb   r1   r   execv
executabler   r%   r%   r%   r&   _reload_current_workerR  s      r   SIGHUPc                 C   s   dd }|t j|< d S )Nc                  W   sH   t d tddtj d ddl}|t ddlm	} t
|_dS )z5Signal handler restarting the current python program.TzRestarting celery worker (ri   r}   r   Nr   )r   r3   r^   r0   r   atexitregisterr   r   r   r   r   )r   r   r   r%   r%   r&   restart_worker_sig_handler[  s    
zBinstall_worker_restart_handler.<locals>.restart_worker_sig_handlerr   r	   )r   r   r   r%   r%   r&   r   Y  s    r   SIGUSR1c                 C   s   t rd S dd }|tj| < d S )Nc               	   W   s    t   tt  W 5 Q R X dS )z=Signal handler logging the stack-trace of all active threads.N)r   r3   r   )r   r%   r%   r&   cry_handlerk  s    z(install_cry_handler.<locals>.cry_handler)is_pypyr   r	   )r   r   r%   r%   r&   r   f  s    r   CELERY_RDBSIGSIGUSR2c                 C   s"   dd }t j| r|tj|< d S )Nc               	   W   sB   t  2 ddlm}m} | r$| d n| j}|| W 5 Q R X dS )z=Signal handler setting a rdb breakpoint at the current frame.r   )_frame	set_tracer    N)r   Zcelery.contrib.rdbr   r   f_back)r   r   r   framer%   r%   r&   rdb_handleru  s    z(install_rdb_handler.<locals>.rdb_handler)r   r   getr   r	   )Zenvvarr   r   r%   r%   r&   r   r  s    r   c                    s    fdd}|t j < d S )Nc              	      s&   t   tdj d W 5 Q R X d S )NzH{sig} not supported: Restarting with {sig} is unstable on this platform!r   )r   r3   ry   )signumr   r   r%   r&   warn_on_HUP_handler  s    z>install_HUP_not_supported_handler.<locals>.warn_on_HUP_handlerr   )r   r   r   r%   r   r&   r     s    r   )r   )r   )r   r   )r   )Er   ru   r   r   r   r0   r   	functoolsr   Zbilliard.commonr   Zbilliard.processr   Zkombu.utils.encodingr   Zceleryr   r   r	   Z
celery.appr
   Zcelery.exceptionsr   r   Zcelery.loaders.appr   Zcelery.platformsr   r   r   r   Zcelery.utilsr   r   Zcelery.utils.debugr   Zcelery.utils.importsr   Zcelery.utils.logr   r   r   Zcelery.utils.textr   r   r   __all__r   rZ   r"   Z	is_jythonhasattrr   r   r   rx   r*   r3   r   r   r   r   r   r   r   r   r   r   r   r%   r%   r%   r&   <module>   s   
 F  

            

  
