U
    Z+dS7                     @   sd  d Z ddlZddlmZ ddlmZmZ ddlmZm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZ dd
lmZ dZdZG dd deZG dd dZd4ddZd5ddZdd ZeddfddZdd Zd6ddZdd Zdd  Z d!d" Z!d#d$ Z"G d%d& d&Z#d7d)d*Z$d+d, Z%d-d. Z&d/d0 Z'd1d2 Z(eeed3Z)ee%ed3Z*ee&ed3Z+ee'ed3Z,dS )8z,Message migration tools (Broker <-> Broker).    N)partial)cycleislice)Queue	eventloop)maybe_declare)ensure_bytes)app_or_default)worker_direct)str_to_list)StopFilteringState	republishmigrate_taskmigrate_tasksmove
task_id_eq
task_id_instart_filtermove_task_by_idmove_by_idmapmove_by_taskmapmove_directmove_direct_by_idzGMoving task {state.filtered}/{state.strtotal}: {body[task]}[{body[id]}]c                   @   s   e Zd ZdZdS )r   z*Semi-predicate used to signal filter stop.N)__name__
__module____qualname____doc__ r   r   :/tmp/pip-unpacked-wheel-ucduq0nd/celery/contrib/migrate.pyr      s   r   c                   @   s0   e Zd ZdZdZdZdZedd Zdd Z	dS )r   zMigration progress state.r   c                 C   s   | j s
dS t| j S )N?)	total_apxstrselfr   r   r   strtotal&   s    zState.strtotalc                 C   s$   | j rd| j  S | j d| j S )N^/)filteredcountr%   r#   r   r   r   __repr__,   s    zState.__repr__N)
r   r   r   r   r)   r(   r!   propertyr%   r*   r   r   r   r   r      s   
r   c              	   C   s   |sddddg}t |j}|j|j|j  }}}|dkr@|d n|}|dkrT|d n|}|j|j }	}
|dd}|D ]}||d qv| jt |f|||||	|
d	| dS )
zRepublish message.Zapplication_headerscontent_typecontent_encodingheadersNexchangerouting_keycompression)r/   r0   r1   r.   r,   r-   )	r   bodydelivery_infor.   
propertiesr,   r-   poppublish)producermessager/   r0   Zremove_propsr2   infor.   propsctypeencr1   keyr   r   r   r   2   s2     
 
  r   c                 C   s>   |j }|dkri n|}t| |||d ||d d dS )zMigrate single task message.Nr/   r0   r/   r0   )r3   r   get)r7   Zbody_r8   queuesr9   r   r   r   r   K   s    r   c                    s    fdd}|S )Nc                    s   r| d krd S  | |S NZtaskr   r2   r8   callbacktasksr   r   r(   V   s    z!filter_callback.<locals>.filteredr   )rD   rE   r(   r   rC   r   filter_callbackT   s    rF   c                    sV   t |}t|jj|dd t| d} fdd}t|| |f|d|S )z)Migrate tasks from one broker to another.F)Zauto_declarer@   c                    sh   |  j }| j| j|_|j| jkr:| j|j|_|jj| jkr\| j| j|j_|  d S N)channelr?   namer0   r/   Zdeclare)queueZ	new_queuer7   r@   r   r   on_declare_queuef   s    
z'migrate_tasks.<locals>.on_declare_queue)r@   rM   )r	   prepare_queuesamqpProducerr   r   )sourcedestZmigrateappr@   kwargsrM   r   rL   r   r   ^   s    
r   c                 C   s   t |tr| jj| S |S rH   )
isinstancer"   rO   r@   )rS   qr   r   r   _maybe_queuet   s    
rW   c	              
      s   t    fdd|pg D p d}
 j|ddT jt 	f	dd}t |fd|
i|	W  5 Q R  S Q R X dS )	aG	  Find tasks by filtering them and move the tasks to a new queue.

    Arguments:
        predicate (Callable): Filter function used to decide the messages
            to move.  Must accept the standard signature of ``(body, message)``
            used by Kombu consumer callbacks.  If the predicate wants the
            message to be moved it must return either:

                1) a tuple of ``(exchange, routing_key)``, or

                2) a :class:`~kombu.entity.Queue` instance, or

                3) any other true value means the specified
                    ``exchange`` and ``routing_key`` arguments will be used.
        connection (kombu.Connection): Custom connection to use.
        source: List[Union[str, kombu.Queue]]: Optional list of source
            queues to use instead of the default (queues
            in :setting:`task_queues`).  This list can also contain
            :class:`~kombu.entity.Queue` instances.
        exchange (str, kombu.Exchange): Default destination exchange.
        routing_key (str): Default destination routing key.
        limit (int): Limit number of messages to filter.
        callback (Callable): Callback called after message moved,
            with signature ``(state, body, message)``.
        transform (Callable): Optional function to transform the return
            value (destination) of the filter function.

    Also supports the same keyword arguments as :func:`start_filter`.

    To demonstrate, the :func:`move_task_by_id` operation can be implemented
    like this:

    .. code-block:: python

        def is_wanted_task(body, message):
            if body['id'] == wanted_id:
                return Queue('foo', exchange=Exchange('foo'),
                             routing_key='foo')

        move(is_wanted_task)

    or with a transform:

    .. code-block:: python

        def transform(value):
            if isinstance(value, str):
                return Queue(value, Exchange(value), value)
            return value

        move(is_wanted_task, transform=transform)

    Note:
        The predicate may also return a tuple of ``(exchange, routing_key)``
        to specify the destination to where the task should be moved,
        or a :class:`~kombu.entity.Queue` instance.
        Any other true value means that the task will be moved to the
        default exchange/routing_key.
    c                    s   g | ]}t  |qS r   )rW   ).0rK   )rS   r   r   
<listcomp>   s     zmove.<locals>.<listcomp>NF)poolc                    s   | |}|rr|}t |trBt|j |jj|j }}nt|\}}t|||d |	   j
d7  _
 r | | rj
krt d S )Nr>      )rU   r   r   Zdefault_channelr/   rJ   r0   expand_destr   ackr(   r   )r2   r8   retexrk)	rD   connr/   limit	predicater7   r0   state	transformr   r   on_task   s$    

 zmove.<locals>.on_taskconsume_from)r	   Zconnection_or_acquirerO   rP   r   r   )rc   
connectionr/   r0   rQ   rS   rD   rb   re   rT   r@   rf   r   )
rS   rD   ra   r/   rb   rc   r7   r0   rd   re   r   r   z   s    >r   c              	   C   s8   z| \}}W n" t tfk
r.   || }}Y nX ||fS rH   )	TypeError
ValueError)r^   r/   r0   r_   r`   r   r   r   r\      s
    r\   c                 C   s   |d | kS )z'Return true if task id equals task_id'.idr   )task_idr2   r8   r   r   r   r      s    r   c                 C   s   |d | kS )z-Return true if task id is member of set ids'.rk   r   )idsr2   r8   r   r   r   r      s    r   c                 C   s@   t | tr| d} t | tr0tdd | D } | d kr<i } | S )N,c                 s   s(   | ] }t tt|d ddV  qdS ):N   )tupler   r   splitrX   rV   r   r   r   	<genexpr>   s   z!prepare_queues.<locals>.<genexpr>)rU   r"   rr   listdictrG   r   r   r   rN      s    


rN   c                   @   sF   e Zd ZdddZdd Zdd	 Zd
d Zdd Zdd Zdd Z	dS )FiltererN      ?Fc                    s   | _ | _| _| _| _| _tt|p0g  _t	| _
|	 _|
 _| _ fdd|pht j
D  _|pxt  _| _d S )Nc                    s   g | ]}t  j|qS r   )rW   rS   rs   r#   r   r   rY     s   z%Filterer.__init__.<locals>.<listcomp>)rS   ra   filterrb   timeoutack_messagessetr   rE   rN   r@   rD   foreverrM   ru   rg   r   rd   accept)r$   rS   ra   ry   rb   rz   r{   rE   r@   rD   r}   rM   rg   rd   r~   rT   r   r#   r   __init__   s     

zFilterer.__init__c              	   C   sh   |  |  N zt| j| j| jdD ]}q&W n( tjk
rD   Y n tk
rV   Y nX W 5 Q R X | jS )N)rz   Zignore_timeouts)	prepare_consumercreate_consumerr   ra   rz   r}   socketr   rd   )r$   _r   r   r   start  s    
zFilterer.startc                 C   s.   | j  jd7  _| jr*| j j| jkr*t d S )Nr[   )rd   r)   rb   r   r$   r2   r8   r   r   r   update_state  s    zFilterer.update_statec                 C   s   |   d S rH   )r]   r   r   r   r   ack_message  s    zFilterer.ack_messagec                 C   s   | j jj| j| j| jdS )N)r@   r~   )rS   rO   ZTaskConsumerra   rg   r~   r#   r   r   r   r   !  s
    zFilterer.create_consumerc                 C   s   | j }| j}| j}| jr<t|| j}t|| j}t|| j}|| || | jrb|| j | jd k	rt| j| j	}| jrt|| j}|| | 
| |S rH   )ry   r   r   rE   rF   Zregister_callbackr{   rD   r   rd   declare_queues)r$   consumerry   r   r   rD   r   r   r   r   (  s$    




zFilterer.prepare_consumerc              	   C   s   |j D ]v}| j r|j| j krq| jd k	r2| | z0||jjdd\}}}|r`| j j|7  _W q | jjk
rz   Y qX qd S )NT)Zpassive)	r@   rJ   rM   rI   Zqueue_declarerd   r!   ra   Zchannel_errors)r$   r   rK   r   Zmcountr   r   r   r   <  s    


zFilterer.declare_queues)Nrx   FNNNFNNNN)
r   r   r   r   r   r   r   r   r   r   r   r   r   r   rw      s$                     
rw   rx   Fc                 K   s0   t | ||f|||||||	|
|||d| S )zFilter tasks.)rb   rz   r{   rE   r@   rD   r}   rM   rg   rd   r~   )rw   r   )rS   ra   ry   rb   rz   r{   rE   r@   rD   r}   rM   rg   rd   r~   rT   r   r   r   r   L  s&      r   c                 K   s   t | |if|S )a  Find a task by id and move it to another queue.

    Arguments:
        task_id (str): Id of task to find and move.
        dest: (str, kombu.Queue): Destination queue.
        transform (Callable): Optional function to transform the return
            value (destination) of the filter function.
        **kwargs (Any): Also supports the same keyword
            arguments as :func:`move`.
    )r   )rl   rR   rT   r   r   r   r   a  s    r   c                    s$    fdd}t |fdt i|S )a  Move tasks by matching from a ``task_id: queue`` mapping.

    Where ``queue`` is a queue to move the task to.

    Example:
        >>> move_by_idmap({
        ...     '5bee6e82-f4ac-468e-bd3d-13e8600250bc': Queue('name'),
        ...     'ada8652d-aef3-466b-abd2-becdaf1b82b3': Queue('name'),
        ...     '3a2b140d-7db1-41ba-ac90-c36a0ef4ab1f': Queue('name')},
        ...   queues=['hipri'])
    c                    s     |jd S )NZcorrelation_id)r?   r4   rB   mapr   r   task_id_in_map{  s    z%move_by_idmap.<locals>.task_id_in_maprb   )r   len)r   rT   r   r   r   r   r   o  s    r   c                    s    fdd}t |f|S )a  Move tasks by matching from a ``task_name: queue`` mapping.

    ``queue`` is the queue to move the task to.

    Example:
        >>> move_by_taskmap({
        ...     'tasks.add': Queue('name'),
        ...     'tasks.mul': Queue('name'),
        ... })
    c                    s     | d S rA   )r?   rB   r   r   r   task_name_in_map  s    z)move_by_taskmap.<locals>.task_name_in_map)r   )r   rT   r   r   r   r   r     s    r   c                 K   s   t tjf | |d| d S )N)rd   r2   )printMOVING_PROGRESS_FMTformat)rd   r2   r8   rT   r   r   r   filter_status  s    r   )re   )NNN)N)NNNNNNNN)Nrx   FNNNFNNNN)-r   r   	functoolsr   	itertoolsr   r   Zkombur   r   Zkombu.commonr   Zkombu.utils.encodingr   Z
celery.appr	   Zcelery.utils.nodenamesr
   Zcelery.utils.textr   __all__r   	Exceptionr   r   r   r   rF   r   rW   r   r\   r   r   rN   rw   r   r   r   r   r   r   r   Zmove_direct_by_idmapZmove_direct_by_taskmapr   r   r   r   <module>   sj     

	

          
[Z                
