U
    Z+d]                     @   s  d Z ddlZddlZddlmZ ddlmZmZ ddlm	Z	m
Z
 ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ dZddhZdZdZG dd dejZG dd dejZG dd dejZG dd dejZ G dd dejZ!G dd dejZ"dS )zWorker-level Bootsteps.    N)Hub)get_event_loopset_event_loop)	DummyLockLaxBoundedSemaphore)Timer)	bootsteps)_set_task_join_will_block)ImproperlyConfigured)
IS_WINDOWS)worker_logger)r   r   PoolBeatStateDBConsumereventletgeventzO-B option doesn't work with eventlet/gevent pools: use standalone beat instead.z
The worker_pool setting shouldn't be used to select the eventlet/gevent
pools, instead you *must use the -P* argument so that patches are applied
as early as possible.
c                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	r   zTimer bootstep.c                 C   sD   |j rtdd|_n,|js$|jj|_| j|j|j| j| j	d|_d S )Ng      $@)max_interval)r   Zon_errorZon_tick)
use_eventloop_TimertimerZ	timer_clspool_clsr   instantiateZtimer_precisionon_timer_erroron_timer_tickselfw r   </tmp/pip-unpacked-wheel-ucduq0nd/celery/worker/components.pycreate#   s    
zTimer.createc                 C   s   t jd|dd d S )NzTimer error: %rT)exc_info)loggererror)r   excr   r   r   r   1   s    zTimer.on_timer_errorc                 C   s   t d| d S )Nz Timer wake-up! Next ETA %s secs.)r"   debug)r   delayr   r   r   r   4   s    zTimer.on_timer_tickN)__name__
__module____qualname____doc__r    r   r   r   r   r   r   r       s   r   c                       sV   e Zd ZdZefZ fddZdd Zdd Zdd	 Z	d
d Z
dd Zdd Z  ZS )r   zWorker starts the event loop.c                    s   d |_ t j|f| d S N)hubsuper__init__r   r   kwargs	__class__r   r   r.   =   s    zHub.__init__c                 C   s   |j S r+   )r   r   r   r   r   
include_ifA   s    zHub.include_ifc                 C   sF   t  |_|jd kr8t|jdd }t|r*|nt|j|_| | | S )NZrequires_hub)r   r,   getattrZ	_conninfor   _Hubr   _patch_thread_primitives)r   r   Zrequired_hubr   r   r   r    D   s    

 
z
Hub.createc                 C   s   d S r+   r   r   r   r   r   startM   s    z	Hub.startc                 C   s   |j   d S r+   r,   closer   r   r   r   stopP   s    zHub.stopc                 C   s   |j   d S r+   r8   r   r   r   r   	terminateS   s    zHub.terminatec                 C   s<   t  |jj_zddlm} W n tk
r0   Y nX t |_d S )Nr   )pool)r   appclockmutexZbilliardr<   ImportErrorLock)r   r   r<   r   r   r   r6   V   s    zHub._patch_thread_primitives)r'   r(   r)   r*   r   requiresr.   r3   r    r7   r:   r;   r6   __classcell__r   r   r1   r   r   8   s   	r   c                       sP   e Zd ZdZefZd fdd	Zdd Zdd Zd	d
 Z	dd Z
dd Z  ZS )r   a
  Bootstep managing the worker pool.

    Describes how to initialize the worker pool, and starts and stops
    the pool during worker start-up/shutdown.

    Adds attributes:

        * autoscale
        * pool
        * max_concurrency
        * min_concurrency
    Nc                    s|   d |_ d |_|j|_|j| _t|trN|d\}}}t||rHt|pJdg}||_	|j	rh|j	\|_|_t
 j|f| d S )N,r   )r<   max_concurrencyconcurrencymin_concurrencyoptimization
isinstancestr	partitionint	autoscaler-   r.   )r   r   rM   r0   Zmax_c_Zmin_cr1   r   r   r.   r   s    
zPool.__init__c                 C   s   |j r|j   d S r+   )r<   r9   r   r   r   r   r9      s    z
Pool.closec                 C   s   |j r|j   d S r+   )r<   r;   r   r   r   r   r;      s    zPool.terminatec                 C   s   d }d }|j jjtkr$ttt |j p.t	}|j
}|j|_|s~t| }|_|jj|_|jj|_d}|jr~|jjr~|j|_|j}| j|j|j
|j |jf|j|j|j|j|jo||j|||d|| j|j d }|_ t!|j" |S )Nd   T)ZinitargsZmaxtasksperchildmax_memory_per_childtimeoutZsoft_timeoutZputlocksZlost_worker_timeoutthreadsmax_restartsallow_restartZforking_enable	semaphoreZsched_strategyr=   )#r=   confZworker_poolGREEN_POOLSwarningswarnUserWarningW_POOL_SETTINGr   r   rG   Z_process_taskprocess_taskr   rU   acquireZ_quick_acquirereleaseZ_quick_releaseZpool_putlocksr   Zuses_semaphoreZ_process_task_semZpool_restartsr   hostnameZmax_tasks_per_childrP   Z
time_limitZsoft_time_limitZworker_lost_waitrH   r<   r	   Ztask_join_will_block)r   r   rU   rS   ZthreadedZprocsrT   r<   r   r   r   r       sF    

 

zPool.createc                 C   s   d|j r|j jndiS )Nr<   zN/A)r<   infor   r   r   r   r`      s    z	Pool.infoc                 C   s   |j | d S r+   )r<   register_with_event_loop)r   r   r,   r   r   r   ra      s    zPool.register_with_event_loop)N)r'   r(   r)   r*   r   rB   r.   r9   r;   r    r`   ra   rC   r   r   r1   r   r   b   s   $r   c                       s2   e Zd ZdZd ZdZd fdd	Zdd Z  ZS )	r   zWStep used to embed a beat process.

    Enabled when the ``beat`` argument is set.
    TFc                    s.   | | _ |_d |_t j|fd|i| d S )Nbeat)enabledrb   r-   r.   )r   r   rb   r0   r1   r   r   r.      s    zBeat.__init__c                 C   s@   ddl m} |jjdr"tt||j|j|j	d }|_
|S )Nr   )EmbeddedService)r   r   )schedule_filenameZscheduler_cls)Zcelery.beatrd   r   r(   endswithr
   ERR_B_GREENr=   re   Z	schedulerrb   )r   r   rd   br   r   r   r       s    zBeat.create)F)	r'   r(   r)   r*   labelZconditionalr.   r    rC   r   r   r1   r   r      s
   r   c                       s(   e Zd ZdZ fddZdd Z  ZS )r   z:Bootstep that sets up between-restart state database file.c                    s"   |j | _d |_t j|f| d S r+   )statedbrc   _persistencer-   r.   r/   r1   r   r   r.      s    zStateDB.__init__c                 C   s,   |j |j |j|jj|_t|jj d S r+   )	stateZ
Persistentrj   r=   r>   rk   atexitregistersaver   r   r   r   r       s    zStateDB.create)r'   r(   r)   r*   r.   r    rC   r   r   r1   r   r      s   r   c                   @   s   e Zd ZdZdZdd ZdS )r   z)Bootstep starting the Consumer blueprint.Tc                 C   sn   |j rt|j d|j }n|j|j }| j|j|j|j|j|j	||j
|j|j||j|j|j|jd }|_|S )N   )r_   task_eventsZinit_callbackZinitial_prefetch_countr<   r   r=   
controllerr,   Zworker_optionsdisable_rate_limitsprefetch_multiplier)rE   maxrt   rF   r   Zconsumer_clsr\   r_   rq   Zready_callbackr<   r   r=   r,   optionsrs   Zconsumer)r   r   Zprefetch_countcr   r   r   r       s(     zConsumer.createN)r'   r(   r)   r*   lastr    r   r   r   r   r      s   r   )#r*   rm   rX   Zkombu.asynchronousr   r5   r   r   Zkombu.asynchronous.semaphorer   r   Zkombu.asynchronous.timerr   r   Zceleryr   Zcelery._stater	   Zcelery.exceptionsr
   Zcelery.platformsr   Zcelery.utils.logr   r"   __all__rW   rg   r[   ZStepZStartStopStepr   r   r   r   r   r   r   r   <module>   s*   *P