U
    Z+d~b                     @   s  d Z ddlZddlZddlmZ ddlmZmZ ddlmZ ddlm	Z	 ddl
mZmZ ddlmZ dd	lmZmZ dd
lmZ ddlmZmZmZ ddlmZmZmZmZmZmZmZm Z  ddl!mZ" ddl#m$Z$m%Z% ddl&m'Z' ddl(m)Z) ddl*m+Z+ ddl,m-Z-m.Z.m/Z/ ddl0m1Z1 dZ2e3edZ4e'e5Z6e6j7e6j8e6j9e6j:f\Z7Z8Z;Z:da<da=dd Z>e>  e/j?Z?ej@jAZBejCjAZDe1jEZEe1jFZFe1jGZHG dd dZIeeHeFdefddZJdS )zfTask request.

This module defines the :class:`Request` class, that specifies
how tasks are executed.
    N)datetime)	monotonictime)ref)TERM_SIGNAME)	safe_reprsafe_str)cached_property)current_appsignals)Context)fast_trace_task
trace_tasktrace_task_ret)IgnoreInvalidTaskErrorRejectRetryTaskRevokedError
TerminatedTimeLimitExceededWorkerLostError)r   )maybenoop)
get_logger)gethostname)get_pickled_exception)maybe_iso8601maybe_make_awaretimezone   )state)Requestpypy_version_infoFc                   C   s   t tjat tjad S N)loggerisEnabledForloggingDEBUG_does_debugINFO
_does_info r,   r,   9/tmp/pip-unpacked-wheel-ucduq0nd/celery/worker/request.py__optimize__-   s    r.   c                   @   s  e Zd ZdZdZdZdZdZdZdZ	dZ
dZdZes8dZeddddddeddddeefddZed	d
 Zedd Zedd Zedd Zedd Zedd Zedd Zedd Zedd Zedd Zedd Zedd  Zed!d" Z ed#d$ Z!ed%d& Z"ed'd( Z#ed)d* Z$e$j%d+d* Z$ed,d- Z&ed.d/ Z'ed0d1 Z(e(j%d2d1 Z(ed3d4 Z)ed5d6 Z*ed7d8 Z+ed9d: Z,e,j%d;d: Z,ed<d= Z-ed>d? Z.ed@dA Z/e/j%dBdA Z/edCdD Z0e0j%dEdD Z0edFdG Z1edHdI Z2edJdK Z3dLdM Z4ddNdOZ5dPdQ Z6ddRdSZ7ddTdUZ8dVdW Z9dXdY Z:dZd[ Z;d\d] Z<d^d_ Z=d`da Z>dbdc Z?ddde Z@ddfdgZAdhdi ZBddjdkZCddldmZDdndo ZEdpdq ZFdrds ZGeHdtdu ZIeHdvdw ZJeHdxdy ZKeHdzd{ ZLeHd|d} ZMeHd~d ZNdS )r"   zA request for task execution.FN)NN)_app_typenameid_root_id
_parent_id_on_ack_body	_hostname_eventer_connection_errors_task_eta_expires_request_dict
_on_reject_utc_content_type_content_encoding	_argsrepr_kwargsrepr_args_kwargs_decodedZ	__payload__weakref____dict__Tc              
   K   s  || _ |d kr|j n| | _|
d kr0|jn|
| _|| _|| _|| _|rZd  | _	| _
n|j|j | _	| _
| jrx| jn|j| _| jd | _| jd  | _| _d| jkr| jd p| j| _| jd| _| jd| _| jdd }|r|| _| jdd| _| jd	d| _|| _|	| _|p$t | _|| _|p6d
| _|pL| jj| j | _| jdd| _ | jd}|d k	rz||}W n> t!t"t#fk
r } zt$d|d| W 5 d }~X Y nX ||| j%| _&nd | _&| jd}|d k	rJz||}W n> t!t"t#fk
r8 } zt$d|d| W 5 d }~X Y nX ||| j%| _'nd | _'|j(pZi }|j)pfi }|d|d|d|dd| _*| j+||d|d| j| j*d | j\| jd< | jd< }| jd | _,| jd | _-d S )Nr2   taskZshadowroot_id	parent_id	timelimitargsrepr 
kwargsreprr,   ignore_resultFetazinvalid ETA value z: expireszinvalid expires value exchangerouting_keypriorityredelivered)rS   rT   rU   rV   reply_tocorrelation_id)
propertiesrW   rX   hostnamedelivery_infoargskwargs)._messageheaderscopyr=   bodyr6   r/   r?   rF   r@   rA   content_typecontent_encodingpayload_Request__payloadr2   r0   r1   getr3   r4   time_limitsrB   rC   r5   r>   r   r7   r8   r9   Ztasksr:   _ignore_resultAttributeError
ValueError	TypeErrorr   tzlocalr;   r<   r[   rY   _delivery_infoupdaterD   rE   )selfmessageon_ackrZ   eventerappconnection_errorsrequest_dictrI   	on_rejectra   r_   decodedutcr   r   optsrL   rQ   excrR   r[   rY   _r,   r,   r-   __init__X   s     



zRequest.__init__c                 C   s   | j S r$   )rm   ro   r,   r,   r-   r[      s    zRequest.delivery_infoc                 C   s   | j S r$   )r^   r}   r,   r,   r-   rp      s    zRequest.messagec                 C   s   | j S r$   r=   r}   r,   r,   r-   ru      s    zRequest.request_dictc                 C   s   | j S r$   )r6   r}   r,   r,   r-   ra      s    zRequest.bodyc                 C   s   | j S r$   )r/   r}   r,   r,   r-   rs      s    zRequest.appc                 C   s   | j S r$   )r?   r}   r,   r,   r-   rx      s    zRequest.utcc                 C   s   | j S r$   )r@   r}   r,   r,   r-   rb      s    zRequest.content_typec                 C   s   | j S r$   )rA   r}   r,   r,   r-   rc      s    zRequest.content_encodingc                 C   s   | j S r$   )r0   r}   r,   r,   r-   type   s    zRequest.typec                 C   s   | j S r$   )r3   r}   r,   r,   r-   rJ      s    zRequest.root_idc                 C   s   | j S r$   )r4   r}   r,   r,   r-   rK      s    zRequest.parent_idc                 C   s   | j S r$   )rB   r}   r,   r,   r-   rM      s    zRequest.argsreprc                 C   s   | j S r$   )rD   r}   r,   r,   r-   r\      s    zRequest.argsc                 C   s   | j S r$   )rE   r}   r,   r,   r-   r]      s    zRequest.kwargsc                 C   s   | j S r$   )rC   r}   r,   r,   r-   rO      s    zRequest.kwargsreprc                 C   s   | j S r$   )r5   r}   r,   r,   r-   rq      s    zRequest.on_ackc                 C   s   | j S r$   r>   r}   r,   r,   r-   rv      s    zRequest.on_rejectc                 C   s
   || _ d S r$   r   ro   valuer,   r,   r-   rv      s    c                 C   s   | j S r$   )r7   r}   r,   r,   r-   rZ      s    zRequest.hostnamec                 C   s   | j S r$   )rh   r}   r,   r,   r-   rP      s    zRequest.ignore_resultc                 C   s   | j S r$   r8   r}   r,   r,   r-   rr      s    zRequest.eventerc                 C   s
   || _ d S r$   r   )ro   rr   r,   r,   r-   rr      s    c                 C   s   | j S r$   )r9   r}   r,   r,   r-   rt     s    zRequest.connection_errorsc                 C   s   | j S r$   )r:   r}   r,   r,   r-   rI     s    zRequest.taskc                 C   s   | j S r$   )r;   r}   r,   r,   r-   rQ     s    zRequest.etac                 C   s   | j S r$   r<   r}   r,   r,   r-   rR     s    zRequest.expiresc                 C   s
   || _ d S r$   r   r   r,   r,   r-   rR     s    c                 C   s   | j d kr| jjj| _ | j S r$   )_tzlocalr/   confr   r}   r,   r,   r-   rl     s    
zRequest.tzlocalc                 C   s   | j j p| j jS r$   )rI   rP   Zstore_errors_even_if_ignoredr}   r,   r,   r-   store_errors  s    
zRequest.store_errorsc                 C   s   | j S r$   r2   r}   r,   r,   r-   task_id#  s    zRequest.task_idc                 C   s
   || _ d S r$   r   r   r,   r,   r-   r   (  s    c                 C   s   | j S r$   r1   r}   r,   r,   r-   	task_name,  s    zRequest.task_namec                 C   s
   || _ d S r$   r   r   r,   r,   r-   r   1  s    c                 C   s
   | j d S )NrW   r~   r}   r,   r,   r-   rW   5  s    zRequest.reply_toc                 C   s   | j ddS )Nreplaced_task_nestingr   r=   rf   r}   r,   r,   r-   r   :  s    zRequest.replaced_task_nestingc                 C   s
   | j d S )NrX   r~   r}   r,   r,   r-   rX   >  s    zRequest.correlation_idc           	      K   s   | j }| j}|  rt|| j\}}| jjr2tnt}|j	|| j
|| j| j| j| jf| j| j| j| j|pl|j|pt|j|d	}tt|| _|S )a  Used by the worker to send this task to the pool.

        Arguments:
            pool (~celery.concurrency.base.TaskPool): The execution pool
                used to execute this request.

        Raises:
            celery.exceptions.TaskRevokedError: if the task was revoked.
        r\   Zaccept_callbackZtimeout_callbackcallbackZerror_callbackZsoft_timeouttimeoutrX   )r2   r:   revokedr   rg   r/   use_fast_trace_taskr   r   apply_asyncr0   r=   r6   r@   rA   on_accepted
on_timeout
on_success
on_failuresoft_time_limit
time_limitr   r   _apply_result)	ro   poolr]   r   rI   r   r   traceresultr,   r,   r-   execute_using_poolC  s,    

 zRequest.execute_using_poolc              
   C   s   |   rdS | jjs|   | j\}}}| j}|j||ddf|pDi  t| j| j| j	| j
|| j| jj| jd\}}}}|r| jdd n|   |S )zExecute the task in a :func:`~celery.app.trace.trace_task`.

        Arguments:
            loglevel (int): The loglevel used by the task.
            logfile (str): The logfile used by the task.
        NF)loglevellogfileZis_eager)rZ   loaderrs   requeue)r   rI   	acks_lateacknowledge_payloadr=   rn   r   r2   rD   rE   r7   r/   r   reject)ro   r   r   r{   embedrequestretvalIr,   r,   r-   executed  s,     zRequest.executec                 C   s2   | j r.t| j j}|| j kr.t| j dS dS )z%If expired, mark the task as revoked.TN)r<   r   nowtzinforevoked_tasksaddr2   )ro   r   r,   r,   r-   maybe_expire  s
    
zRequest.maybe_expirec                 C   sf   t |p
t}| jr4|| j| | dd|d n
||f| _| jd k	rb|  }|d k	rb|	| d S )N
terminatedTF)
_signalssignumr   
time_startterminate_job
worker_pid_announce_revoked_terminate_on_ackr   	terminatero   r   signalobjr,   r,   r-   r     s    

zRequest.terminatec                 C   sR   t |p
t}| jr*|| j| |   | jd k	rN|  }|d k	rN|| d S r$   )	r   r   r   r   r   r   _announce_cancelledr   r   r   r,   r,   r-   cancel  s    
zRequest.cancelc                 C   sn   t |  | d d}t|d}| jjj| j|| jd | j|| j| j	| j
d  d| _t| j| jd d d S )Nztask-cancelledzcancelled by Celery)rp   )r   T)r   einfo)
task_ready
send_eventr   rI   backendZmark_as_retryr2   _contexton_retryr\   r]   _already_cancelled
send_retry)ro   reasonrz   r,   r,   r-   r     s    

zRequest._announce_cancelledc                 C   s^   t |  | jd|||d | jjj| j|| j| jd |   d| _	t
| j| j|||d d S )Nztask-revoked)r   r   expiredr   Zstore_resultT)r   r   r   r   )r   r   rI   r   Zmark_as_revokedr2   r   r   r   _already_revokedsend_revoked)ro   r   r   r   r   r,   r,   r-   r     s&        
  zRequest._announce_revokedc                 C   sV   d}| j rdS | jr|  }| jtkrRtd| j| j | |rBdnddd| dS dS )z%If revoked, skip task and mark state.FTzDiscarding revoked task: %s[%s]r   r   N)r   r<   r   r2   r   infor1   r   )ro   r   r,   r,   r-   r     s    

   zRequest.revokedc                 K   s4   | j r0| j jr0| jjr0| j j|fd| ji| d S )Nuuid)r8   enabledrI   Zsend_eventssendr2   )ro   r   fieldsr,   r,   r-   r     s    zRequest.send_eventc                 C   sj   || _ t t |  | _t|  | jjs0|   | d t	rPt
d| j| j| | jdk	rf| j| j  dS )z4Handler called when task is accepted by worker pool.ztask-startedzTask accepted: %s[%s] pid:%rN)r   r   r   r   task_acceptedrI   r   r   r   r)   debugr1   r2   r   r   )ro   pidZtime_acceptedr,   r,   r-   r     s    

zRequest.on_acceptedc                 C   sr   |rt d|| j| j nVt|  td|| j| j t|}| jjj| j|| j	| j
d | jjrn| jjrn|   dS )z%Handler called if the task times out.z)Soft time limit (%ss) exceeded for %s[%s]z)Hard time limit (%ss) exceeded for %s[%s]r   N)warnr1   r2   r   errorr   rI   r   mark_as_failurer   r   r   acks_on_failure_or_timeoutr   )ro   Zsoftr   rz   r,   r,   r-   r     s*          zRequest.on_timeoutc                 K   sb   |\}}}|r2t |jttfr$|j| j|ddS t| dd | jjrN|   | j	d||d dS )z6Handler called if the task was successfully processed.T	return_ok)Z
successfultask-succeededr   runtimeN)

isinstance	exception
SystemExitKeyboardInterruptr   r   rI   r   r   r   ro   Zfailed__retval__runtimer]   failedr   r   r,   r,   r-   r     s    
zRequest.on_successc                 C   s2   | j jr|   | jdt|jjt|jd dS )z-Handler called if the task should be retried.ztask-retriedr   	tracebackN)	rI   r   r   r   r   r   rz   r   r   )ro   exc_infor,   r,   r-   r     s    
zRequest.on_retryc           
   	   C   s  t |  |j}t|t}|r@| js<| js<| ddt|d dS t|trZtd| n>t|t	rr| j
|jdS t|tr|  S t|tr| |S d}t|t}| jjr| jjo|}| jj}	|rd}| j
|d d}n|	r|   n| j
dd |sH|s|sH| jjj| j|| j| jd tjj| j| j|| j| j|j|d |rj| j d	t!t"|j|jd
 |st#d||j$d dS )z/Handler called if the task raised an exception.r   TFNzProcess got: r   r   )Zsenderr   r   r\   r]   r   r   ztask-failedr   zTask handler raised error: %r)r   )%r   r   r   r   r   r   r   strMemoryErrorr   r   r   r   r   r   r   r   rI   r   Zreject_on_worker_lostr   r   r   r2   r   r   r   Ztask_failurer   r\   r]   r   r   r   r   r   r   )
ro   r   Zsend_failed_eventr   rz   Zis_terminatedr   Zis_worker_lostr   Zackr,   r,   r-   r   
  st    

   






   zRequest.on_failurec                 C   s   | j s| t| j d| _ dS )zAcknowledge task.TN)acknowledgedr5   r%   r9   r}   r,   r,   r-   r   W  s    zRequest.acknowledgec                 C   s.   | j s*| t| j| d| _ | jd|d d S )NTztask-rejectedr   )r   r>   r%   r9   r   )ro   r   r,   r,   r-   r   ]  s    zRequest.rejectc                 C   sB   | j | j|s| jn| j|s | jn| j| j| j| j| j	| j
| jd
S )N)
r2   r1   r\   r]   r   rZ   r   r   r[   r   )r2   r1   rD   rB   rE   rC   r0   r7   r   r   r[   r   )ro   safer,   r,   r-   r   c  s    zRequest.infoc                 C   s
   d | S )Nz{0.name}[{0.id}])formatr}   r,   r,   r-   	humaninfoq  s    zRequest.humaninfoc                 C   s@   d |  | jrd| j dnd| jr4d| j dndg S )z``str(self)``. z ETA:[]rN   z
 expires:[)joinr   r;   r<   stripr}   r,   r,   r-   __str__t  s
    zRequest.__str__c                 C   s   d t| j|  | j| jS )z``repr(self)``.z<{}: {} {} {}>)r   r   __name__r   rB   rC   r}   r,   r,   r-   __repr__|  s      zRequest.__repr__c                 C   s   | j S r$   )re   r}   r,   r,   r-   r     s    zRequest._payloadc                 C   s   | j \}}}|dS )Nchordr   rf   ro   r{   r   r,   r,   r-   r     s    zRequest.chordc                 C   s   | j \}}}|dS )Nerrbacksr   r   r,   r,   r-   r     s    zRequest.errbacksc                 C   s   | j dS )Ngroupr   r}   r,   r,   r-   r     s    zRequest.groupc                 C   s*   | j }| j\}}}|jf |pi  t|S )z9Context (:class:`~celery.app.task.Context`) of this task.)r=   r   rn   r   )ro   r   r{   r   r,   r,   r-   r     s    zRequest._contextc                 C   s   | j dS )Ngroup_indexr   r}   r,   r,   r-   r     s    zRequest.group_index)NN)N)N)TF)F)F)Or   
__module____qualname____doc__r   r   r   rg   r   r   r   r   r   IS_PYPY	__slots__r   r   r   r|   propertyr[   rp   ru   ra   rs   rx   rb   rc   r   rJ   rK   rM   r\   r]   rO   rq   rv   setterrZ   rP   rr   rt   rI   rQ   rR   rl   r   r   r   rW   r   rX   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r	   r   r   r   r   r   r   r,   r,   r,   r-   r"   A   s  	       
T



































!
"

	
M






	r"   c
              
      s`   |j |j|j|j |o |jd kr8|	jr4tntG  f	ddd| }
|
S )Nc                       s2   e Zd ZfddZ fddZdS )z#create_request_cls.<locals>.Requestc                    s~   | j }| js|kr$|  r$t|| j\}} | j|| j| j| j| j	f| j
| j| j| j|p`|pf|d	}t|| _|S )Nr   )r   rR   r   r   rg   r   ru   ra   rb   rc   r   r   r   r   r   r   )ro   r   r]   r   r   r   r   )r   default_soft_time_limitdefault_time_limitr   r   r   r,   r-   r     s(    
 z6create_request_cls.<locals>.Request.execute_using_poolc                    s^   |\}}}|r2t |jttfr$|j| j|ddS |   rF|   rZ| jd||d d S )NTr   r   r   )r   r   r   r   r   r   r   r   )r   eventsr   r,   r-   r     s"    
   z.create_request_cls.<locals>.Request.on_successN)r   r   r   r   r   r,   	r   r   r  r  r  r   r   r   r   r,   r-   r"     s   r"   )r   r   r   r   r   r   r   r   )baserI   r   rZ   rr   r   r   r   r   rs   r"   r,   r  r-   create_request_cls  s    
$*r  )Kr   r'   sysr   r   r   weakrefr   Zbilliard.commonr   Zkombu.utils.encodingr   r   Zkombu.utils.objectsr	   Zceleryr
   r   Zcelery.app.taskr   Zcelery.app.tracer   r   r   Zcelery.exceptionsr   r   r   r   r   r   r   r   Zcelery.platformsr   Zcelery.utils.functionalr   r   Zcelery.utils.logr   Zcelery.utils.nodenamesr   Zcelery.utils.serializationr   Zcelery.utils.timer   r   r   rN   r!   __all__hasattrr   r   r%   r   r   warningr   r   r+   r)   r.   Ztz_or_localZtask_revokedr   r   Z
task_retryr   r   r   r   r   r"   r  r,   r,   r,   r-   <module>   s^   (
     s   