U
    dm                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dlm	Z	 d dlm
Z
 d dlmZ d dlmZ d dlmZ dd	lmZ d d
lmZ d dlmZmZmZ eeZdadd ZG dd deZG dd deZG dd dej Z!dS )    N)partial)IOLoop)PeriodicCallback)EventReceiver)State)options   )api)Counter)r
   	HistogramGaugec                   C   s   t d krt a t S N)prometheus_metricsPrometheusMetrics r   r   1/tmp/pip-unpacked-wheel-3pokl8eb/flower/events.pyget_prometheus_metrics   s    r   c                   @   s   e Zd Zdd ZdS )r   c                 C   st   t dddddg| _tddddgtjd| _td	d
ddg| _tddddg| _tdddg| _	tdddg| _
d S )NZflower_events_totalzNumber of eventsZworkertypetaskZflower_task_runtime_secondszTask runtime)ZbucketsZ!flower_task_prefetch_time_secondszDThe time the task spent waiting at the celery worker to be executed.Zflower_worker_prefetched_tasksz4Number of tasks of given type prefetched at a workerZflower_worker_onlinezWorker online statusZ1flower_worker_number_of_currently_executing_tasksz/Number of tasks currently executing at a worker)PrometheusCountereventsr   r   Ztask_runtime_metric_bucketsruntimer   prefetch_timenumber_of_prefetched_tasksworker_online*worker_number_of_currently_executing_tasksselfr   r   r   __init__&   s.    zPrometheusMetrics.__init__N)__name__
__module____qualname__r   r   r   r   r   r   $   s   r   c                       s(   e Zd Z fddZ fddZ  ZS )EventsStatec                    s*   t t| j|| tt| _t | _d S r   )	superr"   r   collectionsdefaultdictr
   counterr   metrics)r   argskwargs	__class__r   r   r   D   s    zEventsState.__init__c                    s  t t| | |d }|d }| j| |  d7  < |drh|d }| j|}|dd}|sz|| jkrz|jpxd}| jj	
|||  |dd	}|r| jj
||| |j}|j}	|d
kr|js|	r| jj
||  |dkr4|js4|r4|	r4| jj
||||	  | jj
||  |dkrh|jsh|rh|	rh| jj
||d	 |dkr| jj
|d |dkr| jj
|d |d}
|
d k	r| jj
||
 |dkr| jj
|d	 d S )Nhostnamer   r   ztask-uuidname r   r   ztask-receivedztask-started)ztask-succeededztask-failedzworker-onlinezworker-heartbeatactivezworker-offline)r#   r"   eventr&   
startswithZtasksgetr.   r'   r   labelsincr   ZobservestartedZreceivedetar   r   setdecr   r   )r   r1   Zworker_nameZ
event_typeZtask_idr   Z	task_namer   Ztask_startedZtask_receivedZnum_executing_tasksr*   r   r   r1   I   s@    





zEventsState.event)r   r    r!   r   r1   __classcell__r   r   r*   r   r"   A   s   r"   c                   @   sJ   e Zd ZdZdddZdd	 Zd
d Zdd Zdd Zdd Z	dd Z
dS )Eventsi  NFTr   c           	      K   s   t j|  d| _|pt | _|| _|| _|| _	|| _
d | _d | _| j	rtd| j t| j}|rr|d | _|  |rt| j|| _| jstf || _t| j| j| _d S )NTzLoading state from '%s'...r   )	threadingThreadr   daemonr   instanceio_loopcappdb
persistentenable_eventsstatestate_save_timerloggerdebugshelveopencloser   
save_stater"   on_enable_eventsevents_enable_intervaltimer)	r   rA   rB   rC   rD   r@   Zstate_save_intervalr)   rE   r   r   r   r   }   s0    
zEvents.__init__c                 C   sD   t j|  | jr&td | j  | jr@td | j  d S )NzStarting enable events timer...zStarting state save timer...)r<   r=   startrD   rG   rH   rO   rF   r   r   r   r   rP      s    


zEvents.startc                 C   sF   | j rtd | j  | jr4td | j  | jrB|   d S )NzStopping enable events timer...zStopping state save timer...)rD   rG   rH   rO   stoprF   rC   rL   r   r   r   r   rQ      s    



zEvents.stopc                 C   s   d}zV|d9 }| j  :}t|d| ji| j d}d}td |jd d dd W 5 Q R X W q ttfk
r   zdd l	}W n t
k
r   dd l}Y nX |  Y q tk
r } z*td	|| tj|dd
 t| W 5 d }~X Y qX qd S )Nr      *)handlersZappzCapturing events...T)limittimeoutwakeupr   z;Failed to capture events: '%s', trying again in %s seconds.)exc_info)rA   
connectionr   on_eventrG   rH   captureKeyboardInterrupt
SystemExit_threadImportErrorthreadinterrupt_main	Exceptionerrortimesleep)r   Ztry_intervalconnrecvr`   er   r   r   run   s0    
 z
Events.runc                 C   s4   t d| j tj| jdd}| j|d< |  d S )NzSaving state to '%s'...n)flagr   )rG   rH   rB   rI   rJ   rE   rK   )r   rE   r   r   r   rL      s    
zEvents.save_statec                 C   s   | j d | jjj d S r   )r@   Zrun_in_executorrA   controlrD   r   r   r   r   rM      s    zEvents.on_enable_eventsc                 C   s   | j t| jj| d S r   )r@   Zadd_callbackr   rE   r1   )r   r1   r   r   r   rZ      s    zEvents.on_event)NFTNr   )r   r    r!   rN   r   rP   rQ   ri   rL   rM   rZ   r   r   r   r   r;   z   s         
 
r;   )"osrd   rI   loggingr<   r$   	functoolsr   Ztornado.ioloopr   r   Zcelery.eventsr   Zcelery.events.stater   Ztornado.optionsr   r/   r	   r
   Zprometheus_clientr   r   r   	getLoggerr   rG   r   r   objectr   r"   r=   r;   r   r   r   r   <module>   s(   
9