U
    Z+dd                  	   @   s  d Z ddlZddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZmZ ddlmZ ddlmZ ddlmZ ddlmZmZmZ ddlm Z  dZ!e"edZ#dZ$dZ%dZ&e e'Z(e(j)Z*dZ+dZ,dZ-ej.ej/ej0ej1ej2ej3ej4ej5dZ6G dd deZ7e8e7 eddd dd d! Z9d"e$e
e:e;fd#d$Z<d%d& Z=d'd( Z>e>d)G d*d+ d+Z?e>d,G d-d. d.Z@G d/d0 d0ZAd1d2 ZBd3d4 ZCdS )5a  In-memory representation of cluster state.

This module implements a data-structure used to keep
track of the state of a cluster of workers and the tasks
it is working on (by consuming events).

For every event consumed the state is updated,
so the state represents the state of the cluster
at the time of the last event.

Snapshots (:mod:`celery.events.snapshot`) can be used to
take "pictures" of this state at regular intervals
to for example, store that in a database.
    N)defaultdict)Callable)datetime)Decimal)islice)
itemgetter)time)Mapping)WeakSetref	timetuple)cached_property)states)LRUCachememoizepass1)
get_logger)WorkerTaskStateheartbeat_expirespypy_version_info      zmSubstantial drift from %s may mean clocks are out of sync.  Current drift is %s seconds.  [orig: %s recv: %s]z4<State: events={0.event_count} tasks={0.task_count}>z9<Worker: {0.hostname} ({0.status_string} clock:{0.clock})z4<Task: {0.name}({0.uuid}) {0.state} clock:{0.clock}>)sentreceivedstartedfailedretried	succeededrevokedrejectedc                       s(   e Zd ZdZ fddZdd Z  ZS )CallableDefaultdicta  :class:`~collections.defaultdict` with configurable __call__.

    We use this for backwards compatibility in State.tasks_by_type
    etc, which used to be a method but is now an index instead.

    So you can do::

        >>> add_tasks = state.tasks_by_type['proj.tasks.add']

    while still supporting the method call::

        >>> add_tasks = list(state.tasks_by_type(
        ...     'proj.tasks.add', reverse=True))
    c                    s   || _ t j|| d S N)funsuper__init__)selfr%   argskwargs	__class__ 7/tmp/pip-unpacked-wheel-ucduq0nd/celery/events/state.pyr'   _   s    zCallableDefaultdict.__init__c                 O   s   | j ||S r$   )r%   )r(   r)   r*   r-   r-   r.   __call__c   s    zCallableDefaultdict.__call__)__name__
__module____qualname____doc__r'   r/   __classcell__r-   r-   r+   r.   r#   O   s   r#   i  c                 C   s   | d S Nr   r-   )a_r-   r-   r.   <lambda>j       r8   )maxsizeZkeyfunc                 C   s    t t| |t|t| d S r$   )warnDRIFT_WARNINGr   fromtimestamp)hostnamedriftlocal_received	timestampr-   r-   r.   _warn_driftj   s    rB   <   c                 C   s8   |||r||n|}|| |r(|| } | ||d   S )z#Return time when heartbeat expires.g      Y@r-   )rA   freqexpire_windowr   float
isinstancer-   r-   r.   r   r   s    
r   c                 C   s
   | f |S r$   r-   )clsfieldsr-   r-   r.   _depickle_task~   s    rJ   c                    s    fdd}|S )Nc                    s6    fdd}|| _ dd }|| _ fdd}|| _| S )Nc                    s$   t || jr t|  t| kS tS r$   )rG   r,   getattrNotImplemented)thisotherattrr-   r.   __eq__   s    z8with_unique_field.<locals>._decorate_cls.<locals>.__eq__c                 S   s   |  |}|tkrdS | S )NT)rQ   rL   )rM   rN   resr-   r-   r.   __ne__   s    
z8with_unique_field.<locals>._decorate_cls.<locals>.__ne__c                    s   t t|  S r$   )hashrK   )rM   rO   r-   r.   __hash__   s    z:with_unique_field.<locals>._decorate_cls.<locals>.__hash__)rQ   rS   rU   )rH   rQ   rS   rU   rO   r-   r.   _decorate_cls   s    z(with_unique_field.<locals>._decorate_clsr-   )rP   rV   r-   rO   r.   with_unique_field   s    rW   r>   c                   @   s   e Zd ZdZdZeZdZes$ed Z	ddd	Z
d
d Zdd Zdd Zdd Zedd Zedd ZeefddZedd ZdS )r   zWorker State.   )r>   pidrD   
heartbeatsclockactive	processedloadavgsw_identsw_versw_sys)event__dict____weakref__NrC   r   c                 C   s`   || _ || _|| _|d krg n|| _|p*d| _|| _|| _|| _|	| _|
| _	|| _
|  | _d S r5   )r>   rY   rD   rZ   r[   r\   r]   r^   r_   r`   ra   _create_event_handlerrb   )r(   r>   rY   rD   rZ   r[   r\   r]   r^   r_   r`   ra   r-   r-   r.   r'      s    
zWorker.__init__c                 C   s6   | j | j| j| j| j| j| j| j| j| j	| j
| jffS r$   )r,   r>   rY   rD   rZ   r[   r\   r]   r^   r_   r`   ra   r(   r-   r-   r.   
__reduce__   s         zWorker.__reduce__c                    sP   t j jjjjjjd d d tttt	j
tf fdd	}|S )Nc	                    s   |pi }|  D ]\}	}
 |	|
 q| dkr<g d d < n||rD|sHd S ||||| }||krttj||| |r|}|d krd |r|d kr| n
|| d S )Noffline   r   )itemsrB   r>   )type_rA   r@   rI   Z	max_driftabsintinsortlenkvr?   heartsZ_setZ	hb_appendZhb_popZhbmaxrZ   r(   r-   r.   rb      s(     
z+Worker._create_event_handler.<locals>.event)object__setattr__heartbeat_maxrZ   popappendHEARTBEAT_DRIFT_MAXrm   rn   bisectro   rp   r(   rb   r-   rt   r.   re      s        zWorker._create_event_handlerc                 K   s6   |rt |f|n|}| D ]\}}t| || qd S r$   )dictrk   setattr)r(   fkwdrq   rr   r-   r-   r.   update   s    zWorker.updatec                 C   s
   t | S r$   )R_WORKERformatrf   r-   r-   r.   __repr__   s    zWorker.__repr__c                 C   s   | j r
dS dS )NZONLINEZOFFLINEaliverf   r-   r-   r.   status_string   s    zWorker.status_stringc                 C   s   t | jd | j| jS )Nrj   )r   rZ   rD   rE   rf   r-   r-   r.   r      s    
 zWorker.heartbeat_expiresc                 C   s   t | jo| | jk S r$   )boolrZ   r   )r(   Znowfunr-   r-   r.   r      s    zWorker.alivec                 C   s
   d | S )Nz{0.hostname}.{0.pid})r   rf   r-   r-   r.   id   s    z	Worker.id)NNrC   Nr   NNNNNN)r0   r1   r2   r3   rw   HEARTBEAT_EXPIRE_WINDOWrE   _fieldsPYPY	__slots__r'   rg   re   r   r   propertyr   r   r   r   r   r-   r-   r-   r.   r      s8                  
!

r   uuidc                   @   s6  e Zd ZdZd Z Z Z Z Z Z	 Z
 Z Z Z Z Z Z Z Z Z Z Z Z Z Z Z Z ZZejZdZ dZ!e"sdZ#ej$diZ%dZ&d$dd	Z'dddej(e)e*j+ej,fd
dZ-d%ddZ.dd Z/dd Z0dd Z1dd Z2dd Z3dd Z4e5dd Z6e5dd Z7e5dd Z8e9d d! Z:e9d"d# Z;dS )&r   zTask State.Nr   )r   namestater   r   r   r"   r    r   r   r!   r)   r*   etaexpiresretriesworkerresult	exceptionrA   runtime	tracebackexchangerouting_keyr[   clientrootroot_idparent	parent_idchildren)rc   rd   )r   r)   r*   r   r   r   r   r   )r)   r*   r   r   r   r   r   r   r   r   r   r   c                    sd   | _ | _ jd k	r4t fdd|p(dD  _nt  _ j j jd _|r` j	| d S )Nc                 3   s(   | ] }| j jkr j j|V  qd S r$   )cluster_statetasksget).0Ztask_idrf   r-   r.   	<genexpr>'  s   z Task.__init__.<locals>.<genexpr>r-   )r   r   r   )
r   r   r
   r   _serializable_children_serializable_root_serializable_parent_serializer_handlersrc   r   )r(   r   r   r   r*   r-   rf   r.   r'   #  s    
zTask.__init__c	           
         s   |pi }||}	|	d k	r&|| || n|  }	|	|kr~| j|kr~||	|| jkr~| j|	  d k	r fdd| D }n|j|	|d | j| d S )Nc                    s   i | ]\}}| kr||qS r-   r-   )r   rq   rr   Zkeepr-   r.   
<dictcomp>J  s      zTask.event.<locals>.<dictcomp>)r   rA   )upperr   merge_rulesr   rk   r   rc   )
r(   rl   rA   r@   rI   
precedencer~   Ztask_event_to_stateRETRYr   r-   r   r.   rb   6  s    
z
Task.eventc                    s8    sg n  dkrj n fdd}t| S )z;Information about this task suitable for on-screen display.Nc                  3   s8   t t   D ]"} t| d }|d k	r| |fV  qd S r$   )listrK   )keyvalueextrarI   r(   r-   r.   _keysX  s    zTask.info.<locals>._keys)_info_fieldsr}   )r(   rI   r   r   r-   r   r.   infoS  s    z	Task.infoc                 C   s
   t | S r$   )R_TASKr   rf   r-   r-   r.   r   `  s    zTask.__repr__c                    s&   t j jj fddjD S )Nc                    s"   i | ]}||t  |qS r-   )r   )r   rq   r   handlerr(   r-   r.   r   f  s     z Task.as_dict.<locals>.<dictcomp>)ru   __getattribute__r   r   r   rf   r-   r   r.   as_dictc  s
    zTask.as_dictc                 C   s   dd | j D S )Nc                 S   s   g | ]
}|j qS r-   r   )r   taskr-   r-   r.   
<listcomp>k  s     z/Task._serializable_children.<locals>.<listcomp>)r   r(   r   r-   r-   r.   r   j  s    zTask._serializable_childrenc                 C   s   | j S r$   )r   r   r-   r-   r.   r   m  s    zTask._serializable_rootc                 C   s   | j S r$   )r   r   r-   r-   r.   r   p  s    zTask._serializable_parentc                 C   s   t | j|  ffS r$   )rJ   r,   r   rf   r-   r-   r.   rg   s  s    zTask.__reduce__c                 C   s   | j S r$   )r   rf   r-   r-   r.   r   v  s    zTask.idc                 C   s   | j d kr| jS | j jS r$   )r   r   r   rf   r-   r-   r.   originz  s    zTask.originc                 C   s   | j tjkS r$   r   r   ZREADY_STATESrf   r-   r-   r.   ready~  s    z
Task.readyc                 C   s4   z| j o| jjj| j  W S  tk
r.   Y d S X d S r$   )r   r   r   dataKeyErrorrf   r-   r-   r.   r     s    zTask.parentc                 C   s4   z| j o| jjj| j  W S  tk
r.   Y d S X d S r$   )r   r   r   r   r   rf   r-   r-   r.   r     s    z	Task.root)NNN)NN)<r0   r1   r2   r3   r   r   r   r   r    r   r   r!   r"   r)   r*   r   r   r   r   r   r   rA   r   r   r   r   r   r   r   r   PENDINGr   r[   r   r   r   RECEIVEDr   r   r'   r   r~   TASK_EVENT_TO_STATEr   r   rb   r   r   r   r   r   r   rg   r   r   r   r   r   r   r   r-   r-   r-   r.   r      s     
  





r   c                
   @   s   e Zd ZdZeZeZdZdZdZd6ddZ	e
d	d
 Zdd Zd7ddZd8ddZd9ddZd:ddZdd Zdd Zdd Zdd Zdd Zd d! Zd"d# Zefd$d%Zd;d&d'Zd<d(d)ZeZd=d*d+Zd>d,d-Zd.d/ Zd0d1 Z d2d3 Z!d4d5 Z"dS )?r   zRecords clusters state.r   rX   N  '  c                 C   s   || _ |d krt|n|| _|d kr,t|n|| _|d kr>g n|| _|| _|| _|| _|| _t	
 | _i | _t | _i | _|   t| jt| _| jt|	| j t| jt| _| jt|
| j d S r$   )event_callbackr   workersr   	_taskheapmax_workers_in_memorymax_tasks_in_memoryon_node_joinon_node_leave	threadingLock_mutexhandlersset_seen_types_tasks_to_resolverebuild_taskheapr#   _tasks_by_typer
   tasks_by_typer   !_deserialize_Task_WeakSet_Mapping_tasks_by_workertasks_by_worker)r(   callbackr   r   taskheapr   r   r   r   r   r   r-   r-   r.   r'     sB    


 
 
zState.__init__c                 C   s   |   S r$   )_create_dispatcherrf   r-   r-   r.   _event  s    zState._eventc              
   O   sL   | dd}| j0 z|||W W  5 Q R  S |r<|   X W 5 Q R X d S )Nclear_afterF)rx   r   _clear)r(   r%   r)   r*   r   r-   r-   r.   freeze_while  s    zState.freeze_whileTc              
   C   s(   | j  | |W  5 Q R  S Q R X d S r$   )r   _clear_tasksr(   r   r-   r-   r.   clear_tasks  s    zState.clear_tasksc                 C   sJ   |r.dd |   D }| j  | j| n
| j  g | jd d < d S )Nc                 S   s"   i | ]\}}|j tjkr||qS r-   r   r   r   r   r-   r-   r.   r     s    z&State._clear_tasks.<locals>.<dictcomp>)	itertasksr   clearr   r   )r(   r   Zin_progressr-   r-   r.   r     s    

zState._clear_tasksc                 C   s$   | j   | | d| _d| _d S r5   )r   r   r   event_count
task_countr   r-   r-   r.   r     s    

zState._clearc              
   C   s(   | j  | |W  5 Q R  S Q R X d S r$   )r   r   r   r-   r-   r.   r     s    zState.clearc                 K   s\   z"| j | }|r|| |dfW S  tk
rV   | j|f| }| j |< |df Y S X dS )zsGet or create worker by hostname.

        Returns:
            Tuple: of ``(worker, was_created)`` pairs.
        FTN)r   r   r   r   )r(   r>   r*   r   r-   r-   r.   get_or_create_worker  s    


zState.get_or_create_workerc                 C   sJ   z| j | dfW S  tk
rD   | j|| d }| j |< |df Y S X dS )zGet or create task by uuid.Fr   TN)r   r   r   )r(   r   r   r-   r-   r.   get_or_create_task  s
    zState.get_or_create_taskc              
   C   s(   | j  | |W  5 Q R  S Q R X d S r$   )r   r   r|   r-   r-   r.   rb     s    zState.eventc                 C   s    |  t|dd|gdd S )Deprecated, use :meth:`event`.-r   typer   r   r}   joinr(   rl   rI   r-   r-   r.   
task_event  s    zState.task_eventc                 C   s    |  t|dd|gdd S )r   r   r   r   r   r   r   r-   r-   r.   worker_event  s    zState.worker_eventc                    s   j jjtdddtdddddjjjjj 	j	j
jj 
jj  jj jjjj jjjjtttjdf 	
fdd	}|S )	Nr>   rA   r@   r   r[   Tc                    sd   j d7  _ r|  | d d\}}}z|}W n |k
rP   Y nX ||| |fS |dkrDz| \}	}
}W n |k
r   Y nX |dk}z|	d }}W n8 |k
r   |rЈ|	d }}n|	 }|	< Y nX |||
||  
r|s|dkr
| r4|r4| |	d  ||f|fS n|dkr`| \}}	}
}}|d	k}z|d }}W n. |k
r    |d
 }|< d}Y nX |r|	|_nXz|	}W n& |k
r   |	 }|	< Y nX ||_|d k	r|r|d ||
 |r|	n|j}t}|d 	kr>d |||
|t|}|rn|d krn| n
|| |dkr j	d7  _	|||
||  |j
}|d k	rڈ| |rڈ|| |	| |jr zj|j }W n  |k
r   | Y nX |j| zj|}W n |k
rF   Y nX |j| ||f|fS d S )Nri   r   r   r   rh   FZonliner   r   r   Tr   rj   r   )r   	partitionrb   rx   r   r   r   rp   r   r   r   addr   r   _add_pending_task_childr   r   r   )rb   r   r   ro   createdgroupr7   subjectr   r>   rA   r@   Z
is_offliner   r   r[   Zis_client_eventr   Ztask_createdr   ZheapsZtimetupZ	task_nameZparent_task	_childrenr   r   add_typer   Zget_handlerZget_taskZget_task_by_type_setZget_task_by_worker_setZ
get_workerZmax_events_in_heapr   r   r(   r   r   ZtfieldsZ	th_appendZth_popZwfieldsr   r-   r.   r   !  s    


 




z(State._create_dispatcher.<locals>._event)r   __getitem__r   r   r   ry   rx   r   heap_multiplierr   r   r   r   r   r   r   r   r   r   r   r   r   r{   ro   )r(   r   r-   r  r.   r     s0       4^zState._create_dispatcherc                 C   sF   z| j |j }W n& tk
r6   t  }| j |j< Y nX || d S r$   )r   r   r   r
   r   )r(   r   chr-   r-   r.   r     s
    zState._add_pending_task_childc                    s2    fdd| j  D  }| jd d < |  d S )Nc                    s$   g | ]} |j |j|jt|qS r-   )r[   rA   r   r   r   tr   r-   r.   r     s   z*State.rebuild_taskheap.<locals>.<listcomp>)r   valuesr   sort)r(   r   heapr-   r   r.   r     s    
zState.rebuild_taskheapc                 c   s6   t | j D ]"\}}|V  |r|d |kr q2qd S )Nri   )	enumerater   rk   )r(   limitindexrowr-   r-   r.   r     s    zState.itertasksc                 c   sb   | j }|rt|}t }t|d|D ]8}|d  }|dk	r$|j}||kr$||fV  || q$dS )zkGenerator yielding tasks ordered by time.

        Yields:
            Tuples of ``(uuid, Task)``.
        r      N)r   reversedr   r   r   r   )r(   r  reverseZ_heapseenZevtupr   r   r-   r-   r.   tasks_by_time  s    

zState.tasks_by_timec                    s"   t  fdd| j|dD d|S )zGet all tasks by type.

        This is slower than accessing :attr:`tasks_by_type`,
        but will be ordered by time.

        Returns:
            Generator: giving ``(uuid, Task)`` pairs.
        c                 3   s$   | ]\}}|j  kr||fV  qd S r$   r   r   r  r-   r.   r     s    
z'State._tasks_by_type.<locals>.<genexpr>r  r   r   r  )r(   r   r  r  r-   r  r.   r     s
    	 zState._tasks_by_typec                    s"   t  fdd| j|dD d|S )znGet all tasks by worker.

        Slower than accessing :attr:`tasks_by_worker`, but ordered by time.
        c                 3   s&   | ]\}}|j j kr||fV  qd S r$   )r   r>   r   r>   r-   r.   r     s    z)State._tasks_by_worker.<locals>.<genexpr>r  r   r  )r(   r>   r  r  r-   r  r.   r     s
     zState._tasks_by_workerc                 C   s
   t | jS )z%Return a list of all seen task types.)sortedr   rf   r-   r-   r.   
task_types  s    zState.task_typesc                 C   s   dd | j  D S )z+Return a list of (seemingly) alive workers.c                 s   s   | ]}|j r|V  qd S r$   r   )r   wr-   r-   r.   r     s      z&State.alive_workers.<locals>.<genexpr>)r   r
  rf   r-   r-   r.   alive_workers  s    zState.alive_workersc                 C   s
   t | S r$   )R_STATEr   rf   r-   r-   r.   r     s    zState.__repr__c                 C   s8   | j | j| j| jd | j| j| j| jt| j	t| j
f
fS r$   )r,   r   r   r   r   r   r   r   _serialize_Task_WeakSet_Mappingr   r   rf   r-   r-   r.   rg     s         zState.__reduce__)
NNNNr   r   NNNN)T)T)T)T)N)NT)NT)NT)#r0   r1   r2   r3   r   r   r   r   r  r'   r   r   r   r   r   r   r   r   r   rb   r   r   r   r   r   r   r   r  Ztasks_by_timestampr   r   r  r  r   rg   r-   r-   r-   r.   r     sR                  

	



{



r   c                 C   s   dd |   D S )Nc                 S   s    i | ]\}}|d d |D qS )c                 S   s   g | ]
}|j qS r-   r   r  r-   r-   r.   r     s     z>_serialize_Task_WeakSet_Mapping.<locals>.<dictcomp>.<listcomp>r-   )r   r   r   r-   r-   r.   r     s      z3_serialize_Task_WeakSet_Mapping.<locals>.<dictcomp>rk   )mappingr-   r-   r.   r    s    r  c                    s   | pi }  fdd|   D S )Nc                    s(   i | ] \}}|t  fd d|D qS )c                 3   s   | ]}| kr | V  qd S r$   r-   )r   ir   r-   r.   r     s      z?_deserialize_Task_WeakSet_Mapping.<locals>.<dictcomp>.<genexpr>)r
   )r   r   idsr#  r-   r.   r     s    z5_deserialize_Task_WeakSet_Mapping.<locals>.<dictcomp>r   )r!  r   r-   r#  r.   r     s    
r   )Dr3   r{   sysr   collectionsr   collections.abcr   r   decimalr   	itertoolsr   operatorr   r   typingr	   weakrefr
   r   Zkombu.clocksr   Zkombu.utils.objectsr   Zceleryr   Zcelery.utils.functionalr   r   r   Zcelery.utils.logr   __all__hasattrr   r   rz   r<   r0   loggerwarningr;   r  r   r   r   r   ZSTARTEDFAILUREr   SUCCESSZREVOKEDZREJECTEDr   r#   registerrB   rF   rG   r   rJ   rW   r   r   r   r  r   r-   r-   r-   r.   <module>   sv   


  
]   G