U
    Z+d	0                     @   s(  d Z ddlmZ ddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZmZ dd	lmZmZ dd
lmZ zddlmZ W n ek
r   dZY nX efZdZdZdZdZeeZdd Zdd ZG dd deZG dd dZ G dd de!Z"G dd de"dZ#G dd de#Z$G dd  d e$Z%d!S )"z0A directed acyclic graph of reusable components.    )deque)Event)ignore_errors)bytes_to_strsymbol_by_name   )DependencyGraphGraphFormatter)instantiatequalname)
get_logger)GreenletExit )	BlueprintStepStartStopStepConsumerStep      c                 C   s   d| j  d| S )Nz| z: alias)nsfmtr   r   4/tmp/pip-unpacked-wheel-ucduq0nd/celery/bootsteps.py_pre   s    r   c                 C   s   | j ddd S )N.r   )namersplit)sr   r   r   _label#   s    r!   c                   @   sD   e Zd ZdZdZdZddddZdd	 Zd
d Zdd Z	dd Z
dS )StepFormatterz'Graph formatter for :class:`Blueprint`.u   ⧉u   ∘ZparallelogramZ
slategray4Z
slategray3)shapecolorZ	fillcolorc                 C   s,   |o*d | |t|jpt|ddS )Nz{}{}zutf-8ignore)format_get_prefixr   labelr!   encodeselfstepr   r   r   r(   2   s    zStepFormatter.labelc                 C   s   |j r| jS |jr| jS dS )N )lastblueprint_prefixconditionalconditional_prefixr*   r   r   r   r'   9   s
    zStepFormatter._get_prefixc                 K   s    |j r| jn| j}| |||S N)r.   blueprint_schemeZnode_schemeZ	draw_node)r+   objattrsschemer   r   r   node@   s    zStepFormatter.nodec                 K   s&   |j r|jddd | ||| j|S )NnoneZdarkseagreen3)Z	arrowheadr$   )r.   updateZ	draw_edgeZedge_scheme)r+   abr5   r   r   r   edgeD   s    zStepFormatter.edgeN)__name__
__module____qualname____doc__r/   r1   r3   r(   r'   r7   r<   r   r   r   r   r"   '   s   r"   c                   @   s   e Zd ZdZeZdZdZdZe	 Z
ddededediZd3dd	Zd
d Zdd Zdd Zdd Zd4ddZd5ddZd6ddZd7ddZdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* Zd+d, Zd-d. Zd/d0 Z e!d1d2 Z"dS )8r   a  Blueprint containing bootsteps that can be applied to objects.

    Arguments:
        steps Sequence[Union[str, Step]]: List of steps.
        name (str): Set explicit name for this blueprint.
        on_start (Callable): Optional callback applied after blueprint start.
        on_close (Callable): Optional callback applied before blueprint close.
        on_stopped (Callable): Optional callback applied after
            blueprint stopped.
    Nr   ZinitializingZrunningclosingterminatingc                 C   sT   |p| j ptt| | _ t|p g t| jB | _|| _|| _|| _t	 | _
i | _d S r2   )r   r   typesetdefault_stepstypeson_starton_close
on_stoppedr   shutdown_completesteps)r+   rK   r   rG   rH   rI   r   r   r   __init__c   s    zBlueprint.__init__c                 C   sb   t | _| jr|   tdd |jD D ]4\}}| d|j |d | _|| t	
d q(d S )Nc                 s   s   | ]}|d k	r|V  qd S r2   r   .0r    r   r   r   	<genexpr>q   s      z"Blueprint.start.<locals>.<genexpr>zStarting %sr   z^-- substep ok)RUNstaterG   	enumeraterK   _debugr   startedstartloggerdebug)r+   parentir,   r   r   r   rU   m   s    

zBlueprint.startc                 C   s   | j | jpd S Nr   )state_to_namerQ   r+   r   r   r   human_statew   s    zBlueprint.human_statec                 C   s(   i }|j D ]}|||pi  q
|S r2   )rK   r9   info)r+   rX   r^   r,   r   r   r   r^   z   s    
zBlueprint.infoc                 C   s$   | j r|    | j|dddd d S )NcloserA   F)reverse)rH   send_allr+   rX   r   r   r   r_      s    zBlueprint.closestop
restartingFc                 C   s   | j ||||d d S )N)	propagate)ra   )r+   rX   methoddescriptionre   r   r   r   restart   s    zBlueprint.restartTr   c                 C   s   |p| dd}|rt|jn|j}|D ]~}|r(t||d }	|	d k	r(| d| |j z|	|f|  W q( tk
r }
 z|r t	d||j|
 W 5 d }
~
X Y q(X q(d S )N_ z%s %s...zError on %s %s: %r)
replacereversedrK   getattrrS   
capitalizer   	ExceptionrV   	exception)r+   rX   rf   rg   r`   re   argsrK   r,   Zfunexcr   r   r   ra      s*        zBlueprint.send_allc                 C   s   |rdnd}| j ttfkrd S | j tks8| jt|jkrLt| _ | j  d S | 	| t| _ | j
||rjdnd|dd | jr|   t| _ | j  d S )NrB   Zstopping	terminaterc   F)rg   re   )rQ   CLOSE	TERMINATErP   rT   lenrK   rJ   rD   r_   rh   rI   )r+   rX   r_   rs   whatr   r   r   rc      s&    

 
 zBlueprint.stopc                 C   s,   z| j j|d W n tk
r&   Y nX d S )N)timeout)rJ   waitIGNORE_ERRORS)r+   rx   r   r   r   join   s    zBlueprint.joinc                 K   s   |  d g  }| _|   }| _|  d | |D ]$}||f|}|||j< || q6|  dddd | jD  |D ]}|| q~| S )ar  Apply the steps in this blueprint to an object.

        This will apply the ``__init__`` and ``include`` methods
        of each step, with the object as argument::

            step = Step(obj)
            ...
            step.include(obj)

        For :class:`StartStopStep` the services created
        will also be added to the objects ``steps`` attribute.
        zPreparing bootsteps.zBuilding graph...zNew boot order: {%s}z, c                 s   s   | ]}|j V  qd S r2   r   rM   r   r   r   rO      s     z"Blueprint.apply.<locals>.<genexpr>)	rS   orderclaim_stepsrK   _finalize_stepsr   appendr{   include)r+   rX   kwargsr|   rK   Sr,   r   r   r   apply   s    



zBlueprint.applyc                 C   s8   | j j|j j | j t|jd t| jd  d S )Nr   r   )graphadjacentr9   add_edgerC   r|   )r+   otherr   r   r   connect_with   s    zBlueprint.connect_withc                 C   s
   | j | S r2   )rK   )r+   r   r   r   r   __getitem__   s    zBlueprint.__getitem__c                 C   s   t dd | j D d S )Nc                 s   s   | ]}|j r|V  qd S r2   )r.   rN   Cr   r   r   rO      s      z'Blueprint._find_last.<locals>.<genexpr>)nextrK   valuesr\   r   r   r   
_find_last   s    zBlueprint._find_lastc                 C   sx   |  D ]}dd |jD |_qtdd |  D }|rt| D ].}t|}|j| jkrd|||j< ||j qBq6d S )Nc                 S   s   g | ]}t |qS r   r   )rN   depr   r   r   
<listcomp>   s     z(Blueprint._firstpass.<locals>.<listcomp>c                 s   s   | ]}|j V  qd S r2   requiresrN   r,   r   r   r   rO      s     z'Blueprint._firstpass.<locals>.<genexpr>)r   r   r   popleftr   r   rK   r   )r+   rK   r,   streamr7   r   r   r   
_firstpass   s    
zBlueprint._firstpassc              
   C   s   |   }| | dd | D }t|| j|dd }| _|r`|D ]}||krF||| qFz
| W S  tk
r } ztd| W 5 d }~X Y nX d S )Nc                 s   s   | ]}||j fV  qd S r2   r   r   r   r   r   rO      s     z,Blueprint._finalize_steps.<locals>.<genexpr>)root)	formatterzunknown bootstep: %s)	r   r   r   r	   r
   r   r   ZtopsortKeyError)r+   rK   r.   itGr4   rr   r   r   r   r~      s    
 

zBlueprint._finalize_stepsc                    s   t  fdd jD S )Nc                 3   s   | ]}  |V  qd S r2   )	load_stepr   r\   r   r   rO      s     z(Blueprint.claim_steps.<locals>.<genexpr>)dictrF   r\   r   r\   r   r}      s    zBlueprint.claim_stepsc                 C   s   t |}|j|fS r2   )r   r   r*   r   r   r   r      s    zBlueprint.load_stepc                 G   s   t jt| |f| S r2   )rV   rW   r   )r+   msgrq   r   r   r   rS     s    zBlueprint._debugc                 C   s   t | S r2   )r!   r\   r   r   r   r     s    zBlueprint.alias)NNNNN)rc   rd   F)NTTr   )TF)N)#r=   r>   r?   r@   r"   r
   r   rQ   rT   rD   rE   rP   rt   ru   r[   rL   rU   r]   r^   r_   rh   ra   rc   r{   r   r   r   r   r   r~   r}   r   rS   propertyr   r   r   r   r   r   J   s\             


    
       


r   c                       s8   e Zd ZdZdZdZ fddZdd Zdd Z  Z	S )	StepTypezMeta-class for steps.Nc                    sJ   | d}|r| d| n|}|j|| dp2|d t | |||S )Nr>   r   r   )r?   r   )getr9   super__new__)clsr   basesr5   moduleqname	__class__r   r   r     s    
zStepType.__new__c                 C   s   | j S r2   )r   r   r   r   r   __str__  s    zStepType.__str__c                 C   s
   d | S )Nzstep:{0.name}{{{0.requires!r}}})r&   r   r   r   r   __repr__  s    zStepType.__repr__)
r=   r>   r?   r@   r   r   r   r   r   __classcell__r   r   r   r   r   
  s   	r   c                   @   st   e Zd ZdZdZdZdZdZdZdZ	dd Z
dd	 Zd
d Zdd Zdd Zdd Zdd Zedd Zdd ZdS )r   zA Bootstep.

    The :meth:`__init__` method is called when the step
    is bound to a parent object, and can as such be used
    to initialize attributes in the parent object at
    parent instantiation-time.
    NFr   Tc                 K   s   d S r2   r   )r+   rX   r   r   r   r   rL   ?  s    zStep.__init__c                 C   s   | j S )zReturn true if bootstep should be included.

        You can define this as an optional predicate that decides whether
        this step should be created.
        )enabledrb   r   r   r   
include_ifB  s    zStep.include_ifc                 O   s   t |f||S r2   )r   )r+   r   rq   r   r   r   r   r   J  s    zStep.instantiatec                 C   s   |  |rd| |fS dS )NT)FN)r   createrb   r   r   r   _should_includeM  s    
zStep._should_includec                 C   s   |  |d S rZ   )r   rb   r   r   r   r   R  s    zStep.includec                 C   s   dS )zCreate the step.Nr   rb   r   r   r   r   U  s    zStep.createc                 C   s   d| j  dS )Nz<step: >r   r\   r   r   r   r   X  s    zStep.__repr__c                 C   s   | j pt| S r2   )r(   r!   r\   r   r   r   r   [  s    z
Step.aliasc                 C   s   d S r2   r   )r+   r4   r   r   r   r^   _  s    z	Step.info)r=   r>   r?   r@   r   r(   r0   r   r.   r   rL   r   r   r   r   r   r   r   r   r^   r   r   r   r   r      s"   	
r   )	metaclassc                   @   s<   e Zd ZdZdZdd Zdd Zdd Zd	d
 Zdd Z	dS )r   z3Bootstep that must be started and stopped in order.Nc                 C   s   | j r| j  S d S r2   )r4   rU   rb   r   r   r   rU   k  s    zStartStopStep.startc                 C   s   | j r| j  S d S r2   )r4   rc   rb   r   r   r   rc   o  s    zStartStopStep.stopc                 C   s   d S r2   r   rb   r   r   r   r_   s  s    zStartStopStep.closec                 C   s   | j rt| j d| j j S d S )Nrs   )r4   rm   rc   rb   r   r   r   rs   v  s    zStartStopStep.terminatec                 C   s(   |  |\}}|r$|| _|j|  |S r2   )r   r4   rK   r   )r+   rX   incretr   r   r   r   z  s
    zStartStopStep.include)
r=   r>   r?   r@   r4   rU   rc   r_   rs   r   r   r   r   r   r   c  s   r   c                   @   sB   e Zd ZdZdZdZdd Zdd Zdd	 Zd
d Z	dddZ
dS )r   z(Bootstep that starts a message consumer.)z!celery.worker.consumer:ConnectionNc                 C   s   t dd S )Nzmissing get_consumers)NotImplementedError)r+   channelr   r   r   get_consumers  s    zConsumerStep.get_consumersc                 C   s2   |j  }| || _| jpg D ]}|  q d S r2   )
connectionr   r   	consumersconsume)r+   cr   consumerr   r   r   rU     s    
zConsumerStep.startc                 C   s   |  |d d S )NT_closer+   r   r   r   r   rc     s    zConsumerStep.stopc                 C   s   |  |d d S )NFr   r   r   r   r   shutdown  s    zConsumerStep.shutdownTc                 C   sV   t  }| jpg D ](}|r&t|j|j |jr||j q|D ]}t|j|j q>d S r2   )rD   r   r   r   cancelr   addr_   )r+   r   Zcancel_consumersZchannelsr   r   r   r   r   r     s    zConsumerStep._close)T)r=   r>   r?   r@   r   r   r   rU   rc   r   r   r   r   r   r   r     s   r   N)&r@   collectionsr   	threadingr   Zkombu.commonr   Zkombu.utils.encodingr   Zkombu.utils.importsr   Zutils.graphr	   r
   Zutils.importsr   r   Z	utils.logr   Zgreenletr   ImportErrorrz   __all__rP   rt   ru   r=   rV   r   r!   r"   r   rC   r   r   r   r   r   r   r   r   <module>   s6   
# AC