U
    Z+d                     @   s   d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z
 ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ dZeeZdd Zdd Zejejeej e!efddZ"dS )z'Task execution strategy (optimization).    N)to_timestamp)	safe_repr)signals)trace)InvalidTaskError)symbol_by_name)
get_logger)saferepr)timezone   )create_request_cls)task_reserved)defaultc                 C   s(  z$| dd| di  }}|j W n6 tk
r@   tdY n tk
rZ   tdY nX | d| d| d| d	| d
| d| d| d| d| d| dd| dd| d| d| dd}|| jpi  | d| d| ddd}|||f|d| ddfS )zECreate a fresh protocol 2 message from a hybrid protocol 1/2 message.args kwargs!Message does not have args/kwargs(Task keyword arguments must be a mappinglangtaskidroot_id	parent_idgroupmethshadowetaexpiresretriesr   	timelimit)NNargsrepr
kwargsreprorigin)r   r   r   r   r   r   r   r   r   r   r   r   r    r!   r"   	callbackserrbackschordNr#   r$   r%   chainTutc)getitemsKeyErrorr   AttributeErrorupdateheaders)messagebodyr   r   r.   embedr   r   :/tmp/pip-unpacked-wheel-ucduq0nd/celery/worker/strategy.pyhybrid_to_proto2   s@    



r3   c                 C   s   z$| dd| di  }}|j W n6 tk
r@   tdY n tk
rZ   tdY nX |jt|t|| jd z|d |d< W n tk
r   Y nX | d	| d
| ddd}|||f|d| ddfS )zConvert Task message protocol 1 arguments to protocol 2.

    Returns:
        Tuple: of ``(body, headers, already_decoded_status, utc)``
    r   r   r   r   r   )r    r!   r.   Ztasksetr   r#   r$   r%   Nr&   Tr(   )r)   r*   r+   r   r,   r-   r	   r.   )r/   r0   r   r   r1   r   r   r2   proto1_to_proto2C   s0    

r4   c	                    s   j jttjjo&j}	o0j|	o:j	j
jjj jj	j
jjtj}
t|
jd jjjtf 	
fdd	S )zDefault task execution strategy.

    Note:
        Strategies are here as an optimization, so sadly
        it's not very easy to override.
    )appc                    sd  |d kr0d| j kr0| j| jd f\}}}}n2d| j krPt| | j \}}}}n| |\}}}}| ||	||||d r j jt jt j	d}	t
j|	d|	id  jsȈ jkrԈ  rd S tjj d r8d j j j j j j jd	d
 jo" j  jo2 j d
 d }
d } jrz* jrb| j}n| jj}W nP ttfk
r } z,d j| jdddd  jdd W 5 d }~X Y nX rԈ
j}
|r |
r j   | |
dfddS |r&j   | fdd S |
r8 |
dS   |rX fdd|D    d S )Nr   F)Zon_ackZ	on_rejectr5   hostnameeventerr   connection_errorsr0   r.   decodedr(   )r   namer   r   data)extra)Zsenderrequestztask-receivedr   r   )	uuidr:   r   r   r   r   r   r   r   z2Couldn't convert ETA %r to timestamp: %r. Task: %rT)safe)exc_info)Zrequeuer      )priorityc                    s   g | ]}| qS r   r   ).0callbackreqr   r2   
<listcomp>   s     z9default.<locals>.task_message_handler.<locals>.<listcomp>)!payloadr0   r.   Zuses_utc_timezoner3   r   r:   r   r   r   
_app_traceZLOG_RECEIVEDr   revokedr   Ztask_receivedsendr    r!   r   r   Zrequest_dictr)   r   	isoformatr(   r
   OverflowError
ValueErrorinforejectZqosZincrement_eventually)r/   r0   ZackrP   r#   r   r.   r9   r(   contextZbucketr   excZReqZ
_does_infor5   apply_eta_taskcall_atr8   consumererrorr7   Z
get_buckethandler6   rO   Zlimit_post_etaZ
limit_taskr4   Zrate_limits_enabledZrevoked_tasksZ
send_eventr   task_message_handlerr   Ztask_sends_eventsto_system_tzrE   r2   rY      s       
           
  
 


z%default.<locals>.task_message_handler)r6   r8   loggerisEnabledForloggingINFOZevent_dispatcherZenabledrK   Zsend_eventsZtimerrU   rT   Zdisable_rate_limitsZtask_buckets__getitem__Zon_task_requestZ_limit_taskZ_limit_post_etar   Requestr   pool
controllerstaterJ   r   )r   r5   rV   rO   rW   r   rZ   bytesr4   eventsr`   r   rS   r2   r   d   s(    





<Kr   )#__doc__r]   Zkombu.asynchronous.timerr   Zkombu.utils.encodingr   Zceleryr   Z
celery.appr   rI   Zcelery.exceptionsr   Zcelery.utils.importsr   Zcelery.utils.logr   Zcelery.utils.safereprr	   Zcelery.utils.timer
   r=   r   rc   r   __all____name__r[   r3   r4   rO   rW   Z	to_systemrd   r   r   r   r   r2   <module>   s.   )"   