U
    Z+d+                     @   s   d Z ddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
mZmZmZmZmZ ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZ ddlmZ dZeeddZG dd deZ G dd dZ!G dd de!Z"dS )zIntegration testing utilities.    N)defaultdict)partial)count)AnyCallableDictSequenceTextIOTupleretry_over_timestates)TimeoutError)AsyncResult	ResultSet)truncate)humanize_secondsz4Still waiting for {0}.  Trying again {when}: {exc!r}T)microsecondsc                   @   s   e Zd ZdZdS )SentinelzSignifies the end of something.N)__name__
__module____qualname____doc__ r   r   B/tmp/pip-unpacked-wheel-ucduq0nd/celery/contrib/testing/manager.pyr      s   r   c                	   @   s   e Zd ZdZd;ddZd<dd	Zd
d Zd=ddZd>ddZdd Z	d?ddZ
d@ddZdAd d!ZdBd"d#ZdCd%d&ZdDd(d)ZdEd+d,ZdFd-d.Zed/d0 ZdGd1d2Zd3d4 Zd5d6 ZdHd7d8Zd9d: ZdS )IManagerMixinz.Mixin that adds :class:`Manager` capabilities.      @FNc                 C   sF   |d krt jn|| _|d kr"t jn|| _| j j| _|| _|| _d S N)	sysstdoutstderrapp
connectionZrecoverable_connection_errors
connerrorsblock_timeoutno_join)selfr%   r&   r    r!   r   r   r   _init_manager   s
    zManagerMixin._init_manager-c                 C   s   t | | | jd d S )N)file)printr    )r'   ssepr   r   r   remark'   s    zManagerMixin.remarkc                 C   s   dd |D S )Nc                 S   s    g | ]}|j |jjkr|j qS r   )idbackend_cache).0resr   r   r   
<listcomp>-   s      z0ManagerMixin.missing_results.<locals>.<listcomp>r   )r'   rr   r   r   missing_results+   s    zManagerMixin.missing_resultsthingr   
   皙?      ?      @c              	      s@   |si n|} fdd}j ||f||||||	d|S )zWait for event to happen.

        The `catch` argument specifies the exception that means the event
        has not happened yet.
        c                    s>   t |}r*tj t|dd| d r:| || |S )Nin )whenexc)nextwarnE_STILL_WAITINGformatr   )r?   Z	intervalsretriesintervaldescemit_warningerrbackr'   r   r   on_errorF   s     
 z'ManagerMixin.wait_for.<locals>.on_error)argskwargsrI   max_retriesinterval_startinterval_stepr   )r'   funcatchrG   rK   rL   rI   rM   rN   rO   interval_maxrH   optionsrJ   r   rF   r   wait_for/   s    
    zManagerMixin.wait_for   {Gz?      ?c	           
   
   K   sD   z| j ||||||||dW S  |k
r0   Y nX td| dS )z;Make sure something does not happen (at least for a while).)rG   rM   rN   rO   rR   rH   zShould not have happened: N)rT   AssertionError)
r'   rP   rQ   rG   rM   rN   rO   rR   rH   rS   r   r   r   ensure_not_for_a_whileX   s         z#ManagerMixin.ensure_not_for_a_whilec                 O   s
   t ||S r   r   )r'   rK   rL   r   r   r   r   i   s    zManagerMixin.retry_over_timec           	         s  | j r
d S t|ts"| j|g}g   fdd}|r>t|ntdD ]}g  d d < z|jf ||d|W   S  tjt	fk
r } z@| 
|}| dt|t  t|td||d W 5 d }~X Y qF | jk
r } z| d|d W 5 d }~X Y qFX qFtd	d S )
Nc                    s     |  d S r   )append)task_idvalueZreceivedr   r   	on_results   s    z$ManagerMixin.join.<locals>.on_resultr   )callback	propagatez#Still waiting for {}/{}: [{}]: {!r}z, !zjoin: connection lost: z!Test failed: Missing task results)r&   
isinstancer   r"   ranger   getsockettimeoutr   r6   r.   rC   lenr   joinr$   rX   )	r'   r5   r`   rM   rL   r^   ir?   Zwaiting_forr   r]   r   rh   l   s0    

  &zManagerMixin.join      @c                 C   s   | j jj|dS Nrf   )r"   controlinspect)r'   rf   r   r   r   rn      s    zManagerMixin.inspectc                 c   s&   |  |j| pi }| E d H  d S r   )rn   Z
query_taskitems)r'   idsrf   Ztasksr   r   r   query_tasks   s    zManagerMixin.query_tasksc           	      C   sH   t t}| j||dD ],\}}| D ]\}\}}|| | q&q|S rk   )r   setrq   ro   add)	r'   rp   rf   r   hostnameZreplyr[   state_r   r   r   query_task_states   s
    zManagerMixin.query_task_states waiting for tasks to be acceptedc                 K   s   | j | j|f||d|S N)rE   rG   assert_task_worker_stateis_acceptedr'   rp   rE   rG   policyr   r   r   assert_accepted   s      zManagerMixin.assert_accepted waiting for tasks to be receivedc                 K   s   | j | j|f||d|S ry   rz   r}   r   r   r   assert_received   s      zManagerMixin.assert_received,waiting for tasks to be started or completedc                 K   s   | j | j|f||d|S ry   )assert_task_state_from_resultis_result_task_in_progress)r'   Zasync_resultsrE   rG   r~   r   r   r   ,assert_result_tasks_in_progress_or_completed   s     z9ManagerMixin.assert_result_tasks_in_progress_or_completedc                 K   s    | j t| j|||dtff|S rk   rT   r   true_or_raiser   )r'   rP   resultsrE   r~   r   r   r   r      s    z*ManagerMixin.assert_task_state_from_resultc                    s"   t jt jf t fdd| D S )Nc                 3   s   | ]}|j  kV  qd S r   )ru   )r2   resultZpossible_statesr   r   	<genexpr>   s     z:ManagerMixin.is_result_task_in_progress.<locals>.<genexpr>)r   ZSTARTEDSUCCESSall)r   rL   r   r   r   r      s    z'ManagerMixin.is_result_task_in_progressc                 K   s    | j t| j|||dtff|S rk   r   )r'   rP   rp   rE   r~   r   r   r   r{      s    z%ManagerMixin.assert_task_worker_statec                 K   s   | j dddg|f|S )Nreservedactiveready_ids_matches_stater'   rp   rL   r   r   r   is_received   s     zManagerMixin.is_receivedc                 K   s   | j ddg|f|S )Nr   r   r   r   r   r   r   r|      s    zManagerMixin.is_acceptedc                    s&   | j ||dt fdd|D S )Nrl   c                 3   s2   | ]* t  fd dfddD D V  qdS )c                 3   s   | ]} |kV  qd S r   r   )r2   r,   tr   r   r      s     z<ManagerMixin._ids_matches_state.<locals>.<genexpr>.<genexpr>c                    s   g | ]} | qS r   r   )r2   kr   r   r   r4      s     z=ManagerMixin._ids_matches_state.<locals>.<genexpr>.<listcomp>N)any)r2   expected_statesr   r   r   r      s   z2ManagerMixin._ids_matches_state.<locals>.<genexpr>)rw   r   )r'   r   rp   rf   r   r   r   r      s    zManagerMixin._ids_matches_statec                 O   s   |||}|st  |S r   )r   )r'   rP   rK   rL   r3   r   r   r   r      s    
zManagerMixin.true_or_raise)r   FNN)r)   )	r7   r   NNr8   r9   r:   r;   F)r7   rU   r9   rV   rW   F)Fr8   )rj   )r:   )r:   )r:   rx   )r:   r   )r:   r   )r:   )r:   )r:   )r   r   r   r   r(   r.   r6   rT   rY   r   rh   rn   rq   rw   r   r   r   r   staticmethodr   r{   r   r|   r   r   r   r   r   r   r      s`         


         
*         




  
  
	  
 



r   c                   @   s   e Zd ZdZdd ZdS )Managerz(Test helpers for task integration tests.c                 K   s   || _ | jf | d S r   )r"   r(   )r'   r"   rL   r   r   r   __init__   s    zManager.__init__N)r   r   r   r   r   r   r   r   r   r      s   r   )#r   re   r   collectionsr   	functoolsr   	itertoolsr   typingr   r   r   r   r	   r
   Zkombu.utils.functionalr   Zceleryr   Zcelery.exceptionsr   Zcelery.resultr   r   Zcelery.utils.textr   Zcelery.utils.timer   Z_humanize_secondsrB   	Exceptionr   r   r   r   r   r   r   <module>   s$     :