U
    Z+d                     @   s   d Z ddlZddlZddlmZmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZ dd	lmZ dd
lmZ dZeeZejejej  ZZZeejddZG dd de	jZG dd deZdS )zPool Autoscaling.

This module implements the internal thread responsible
for growing and shrinking the pool according to the
current autoscale settings.

The autoscale thread is only enabled if
the :option:`celery worker --autoscale` option is used.
    N)	monotonicsleep)	DummyLock)	bootsteps)
get_logger)bgThread   )state)Pool)
AutoscalerWorkerComponentAUTOSCALE_KEEPALIVE   c                   @   s>   e Zd ZdZdZdZefZdd Zdd Z	dd	 Z
d
d ZdS )r   z?Bootstep that starts the autoscaler thread/timer in the worker.r   Tc                 K   s   |j | _d |_d S N)Z	autoscaleZenabled
autoscaler)selfwkwargs r   ;/tmp/pip-unpacked-wheel-ucduq0nd/celery/worker/autoscale.py__init__&   s    zWorkerComponent.__init__c                 C   s>   | j |j|j|j|j||jr"t nd d }|_|js:|S d S )N)workermutex)ZinstantiateZautoscaler_clspoolmax_concurrencymin_concurrencyZuse_eventloopr   r   )r   r   Zscalerr   r   r   create*   s       zWorkerComponent.createc                 C   s*   |j j|jj ||jj|jj d S r   )consumerZon_task_messageaddr   maybe_scaleZcall_repeatedly	keepalive)r   r   Zhubr   r   r   register_with_event_loop2   s
     z(WorkerComponent.register_with_event_loopc                 C   s   d|j  iS )zReturn `Autoscaler` info.r   )r   info)r   r   r   r   r   r"   8   s    zWorkerComponent.infoN)__name__
__module____qualname____doc__labelZconditionalr
   requiresr   r   r!   r"   r   r   r   r   r      s   r   c                       s   e Zd ZdZddedf fdd	Zdd Zddd	Zdd
dZd ddZ	dd Z
dd Zdd Zdd Zdd Zdd Zedd Zedd Z  ZS )!r   z,Background thread to autoscale pool workers.r   Nc                    sN   t    || _|pt | _|| _|| _|| _d | _	|| _
| jsJtdd S )Nzcannot scale down too fast.)superr   r   	threadingLockr   r   r   r    _last_scale_upr   AssertionError)r   r   r   r   r   r    r   	__class__r   r   r   @   s    
zAutoscaler.__init__c              	   C   s&   | j  |   W 5 Q R X td d S )Ng      ?)r   r   r   r   r   r   r   bodyN   s    zAutoscaler.bodyc                 C   sZ   | j }t| j| j}||kr.| ||  dS t| j| j}||k rV| ||  dS d S )NT)	processesminqtyr   scale_upmaxr   
scale_down)r   reqZprocscurr   r   r   _maybe_scaleS   s    zAutoscaler._maybe_scalec                 C   s   |  |r| j  d S r   )r:   r   Zmaintain_pool)r   r8   r   r   r   r   ^   s    
zAutoscaler.maybe_scalec              
   C   s   | j t |d k	r:|| jk r*| | j|  | | || _|d k	rb|| jkr\| || j  || _| j| jfW  5 Q R  S Q R X d S r   )r   r2   _shrink_update_consumer_prefetch_countr   _growr   )r   r6   r3   r   r   r   updateb   s    


zAutoscaler.updatec                 C   s   t  | _| |S r   )r   r,   r=   r   nr   r   r   r5   o   s    zAutoscaler.scale_upc                 C   s&   | j r"t | j  | jkr"| |S d S r   )r,   r   r    r;   r?   r   r   r   r7   s   s    zAutoscaler.scale_downc                 C   s   t d| | j| d S )NzScaling up %s processes.)r"   r   Zgrowr?   r   r   r   r=   x   s    
zAutoscaler._growc              
   C   sj   t d| z| j| W nJ tk
r6   td Y n0 tk
rd } ztd|dd W 5 d }~X Y nX d S )NzScaling down %s processes.z0Autoscaler won't scale down: all processes busy.zAutoscaler: scale_down: %rT)exc_info)r"   r   shrink
ValueErrordebug	Exceptionerror)r   r@   excr   r   r   r;   |   s    
zAutoscaler._shrinkc                 C   s    || j  }|r| jj| d S r   )r   r   r   Z_update_prefetch_count)r   Znew_maxZdiffr   r   r   r<      s
    
z*Autoscaler._update_consumer_prefetch_countc                 C   s   | j | j| j| jdS )N)r6   r3   currentr4   )r   r   r2   r4   r0   r   r   r   r"      s
    zAutoscaler.infoc                 C   s
   t tjS r   )lenr	   Zreserved_requestsr0   r   r   r   r4      s    zAutoscaler.qtyc                 C   s   | j jS r   )r   Znum_processesr0   r   r   r   r2      s    zAutoscaler.processes)N)N)NN)r#   r$   r%   r&   r   r   r1   r:   r   r>   r5   r7   r=   r;   r<   r"   propertyr4   r2   __classcell__r   r   r.   r   r   =   s(     


	
r   )r&   osr*   timer   r   Zkombu.asynchronous.semaphorer   Zceleryr   Zcelery.utils.logr   Zcelery.utils.threadsr    r	   
componentsr
   __all__r#   loggerrD   r"   rF   floatenvirongetr   ZStartStopStepr   r   r   r   r   r   <module>   s   	