U
    de                     @   s  d Z ddlZddlmZ ddlmZ ddlmZmZm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZmZ dd
lmZ ddlmZ ddlmZ ddlmZmZmZ ddlmZmZ ddl m!Z!m"Z" ddl#m$Z$ ddl%m&Z& ddl'm(Z(m)Z) ddl*m+Z+ zddl,Z-ddl.m/Z/ W n e0k
r.   dZ-dZ/Y nX zddl1Z-W n e0k
rR   Y nX dZ2dZ3dZ4dZ5dZ6dZ7dZ8dZ9dZ:e$e;Z<G dd  d e)Z=G d!d" d"e+e(Z>e?e-d#drG d$d% d%e-j@jAe-jBZCG d&d' d'e>ZDdS )(zRedis result store backend.    N)contextmanager)partial)	CERT_NONECERT_OPTIONALCERT_REQUIRED)unquote)retry_over_time)cached_property)
_parse_urlmaybe_sanitize_url)states)task_join_will_block)maybe_signature)BackendStoreError
ChordErrorImproperlyConfigured)GroupResultallow_join_result)_regen
dictfilter)
get_logger)humanize_seconds   )AsyncBackendMixinBaseResultConsumer)BaseKeyValueStoreBackend)get_redis_error_classes)RedisBackendSentinelBackendzW
You need to install the redis library in order to use the Redis result store backend.
zp
You need to install the redis library with support of sentinel in order to use the Redis result store backend.
z
Setting ssl_cert_reqs=CERT_OPTIONAL when connecting to redis means that celery might not validate the identity of the redis broker when connecting. This leaves you vulnerable to man in the middle attacks.
z
Setting ssl_cert_reqs=CERT_NONE when connecting to redis means that celery will not validate the identity of the redis broker when connecting. This leaves you vulnerable to man in the middle attacks.
z
SSL connection parameters have been provided but the specified URL scheme is redis://. A Redis SSL connection URL should use the scheme rediss://.
zv
A rediss:// URL must have parameter ssl_cert_reqs and this must be set to CERT_REQUIRED, CERT_OPTIONAL, or CERT_NONE
z+Connection to Redis lost: Retry (%s/%s) %s.z
Retry limit exceeded while trying to reconnect to the Celery redis result store backend. The Celery application must be restarted.
c                       s   e Zd ZdZ fddZ fddZdd Zedd	 Zd
d Z	 fddZ
dd Zdd Zdd ZdddZdd Zdd Zdd Z  ZS )ResultConsumerNc                    sB   t  j|| | jj| _| jj| _| jj| _| jj	| _
t | _d S N)super__init__backendZget_key_for_task_get_key_for_taskZdecode_result_decode_resultensure_ensureconnection_errors_connection_errorssetsubscribed_toselfargskwargs	__class__ 9/tmp/pip-unpacked-wheel-9cz4377o/celery/backends/redis.pyr"   T   s    



zResultConsumer.__init__c              
      sf   z&| j jj  | jd k	r$| j  W n0 tk
rV } ztt	| W 5 d }~X Y nX t
   d S r    )r#   clientconnection_poolreset_pubsubcloseKeyErrorloggerwarningstrr!   on_after_fork)r-   er0   r2   r3   r=   \   s    
 zResultConsumer.on_after_forkc                 C   sx   d | _ | jjj  | jj| j}dd |D }|D ]}| | |d  q6| jjj	dd| _ | jrt| j j
| j  d S )Nc                 S   s   g | ]}|r|qS r2   r2   ).0metar2   r2   r3   
<listcomp>k   s      z4ResultConsumer._reconnect_pubsub.<locals>.<listcomp>TZignore_subscribe_messages)r7   r#   r4   r5   r6   mgetr+   on_state_changer%   pubsub	subscribe)r-   Zmetasr@   r2   r2   r3   _reconnect_pubsube   s    z ResultConsumer._reconnect_pubsubc                 c   s\   z
d V  W nL | j k
rV   z| | jd W n" | j k
rP   tt  Y nX Y nX d S )Nr2   )r)   r'   rG   r:   criticalE_RETRY_LIMIT_EXCEEDEDr-   r2   r2   r3   reconnect_on_errort   s    

z!ResultConsumer.reconnect_on_errorc                 C   s    |d t jkr| |d  d S )Nstatustask_id)r   ZREADY_STATES
cancel_for)r-   r@   r2   r2   r3   _maybe_cancel_ready_task   s    z'ResultConsumer._maybe_cancel_ready_taskc                    s   t  || | | d S r    )r!   rD   rO   )r-   r@   messager0   r2   r3   rD      s    zResultConsumer.on_state_changec                 K   s    | j jjdd| _| | d S )NTrB   )r#   r4   rE   r7   _consume_from)r-   Zinitial_task_idr/   r2   r2   r3   start   s    zResultConsumer.startc                 K   s*   |j f |D ]}|d k	r| |d  qd S r    )Z
_iter_metarD   )r-   resultr/   r@   r2   r2   r3   on_wait_for_pending   s    z"ResultConsumer.on_wait_for_pendingc                 C   s   | j d k	r| j   d S r    )r7   r8   rJ   r2   r2   r3   stop   s    
zResultConsumer.stopc              	   C   sb   | j rP|  : | j j|d}|rD|d dkrD| | |d | W 5 Q R X n|r^t| d S )N)timeouttyperP   data)r7   rK   Zget_messagerD   r%   timesleep)r-   rV   rP   r2   r2   r3   drain_events   s    
"zResultConsumer.drain_eventsc                 C   s"   | j d kr| |S | | d S r    )r7   rR   rQ   r-   rM   r2   r2   r3   consume_from   s    

zResultConsumer.consume_fromc              	   C   sD   |  |}|| jkr@| j| |   | j| W 5 Q R X d S r    )r$   r+   addrK   r7   rF   r-   rM   keyr2   r2   r3   rQ      s
    


zResultConsumer._consume_fromc              	   C   s@   |  |}| j| | jr<|   | j| W 5 Q R X d S r    )r$   r+   discardr7   rK   Zunsubscriber_   r2   r2   r3   rN      s
    

zResultConsumer.cancel_for)N)__name__
__module____qualname__r7   r"   r=   rG   r   rK   rO   rD   rR   rT   rU   r[   r]   rQ   rN   __classcell__r2   r2   r0   r3   r   Q   s   	


	r   c                       sF  e Zd ZdZeZeZerejndZdZdZ	dZ
dZd< fdd	Zdd Ze fd	d
Zdd Zdd Zdd Zdd Zdd Zdd Zdd Z fddZdd Zdd Zdd  Zd!d" Zejejfd#d$Zd%d& Z d'd( Z!ed)d* Z"ed+d, Z#d=d-d.Z$d/d0 Z%d1d2 Z&d3d4 Z'e(d5d6 Z)ed7d8 Z*d> fd:d;	Z+  Z,S )?r   zyRedis task result store.

    It makes use of the following commands:
    GET, MGET, DEL, INCRBY, EXPIRE, SET, SETEX
    NTi    c              	      s   t  jf dti| | jjj}	| jd kr6tt	 |rLd|krL|d  }}|p\|	dp\| j
| _
|| _|	d}
|	d}|	d}|	d}|	d}|	d	pd
|	dpd|	dpd|	d| j
|
ot|
|pd|ot|d| _|	d}|r|| jd< |r|| jd< |r|| jd< |	d}|r2| j| | j| jd< |rH| || j| _d| jkrt| jd tjrd}ttttttd}| jd|}|||}|| krtt|tkrtt n|tkrtt || jd< || _trt nd\| _| _ | !| | j| j"| j#| j$| _%d S )NZexpires_typez://Zredis_max_connectionsZredis_socket_timeoutZredis_socket_connect_timeoutZredis_retry_on_timeoutZredis_socket_keepaliveZ#redis_backend_health_check_intervalZ
redis_host	localhostZ
redis_porti  Zredis_dbr   Zredis_passwordF)hostportdbpasswordmax_connectionssocket_timeoutretry_on_timeoutsocket_connect_timeoutZredis_usernameusernamehealth_check_intervalsocket_keepaliveZredis_backend_use_sslconnection_classMISSING)r   r   r   requiredoptionalnonessl_cert_reqs)r2   r2   )&r!   r"   intappconfgetredisr   E_REDIS_MISSINGstriprk   _ConnectionPoolfloat
connparamsupdateconnection_class_ssl_params_from_url
issubclassSSLConnectionr   r   r   values
ValueError%E_REDIS_SSL_CERT_REQS_MISSING_INVALIDr:   r;   W_REDIS_SSL_CERT_OPTIONALW_REDIS_SSL_CERT_NONEurlr   r(   Zchannel_errorsr   acceptZ_pending_resultsZ_pending_messagesresult_consumer)r-   rg   rh   ri   rj   rk   r   r5   r/   _getrl   rn   rm   rq   rp   ro   sslZssl_cert_reqs_missingZssl_string_to_constantrw   r0   r2   r3   r"      s    















   zRedisBackend.__init__c                    s  t |\}}}}}}t|ft||||dd d |dkr~ | jjd| d  dd   dd   d n| d	< d
dddg}	|dkrt fdd|	D stfdd|	D rtt	|dkrtj
 d< |	D ] }
|
d }|rt| |
< q d	pd}t|tr.|dn|}t| d	<  D ],\}}|tjjkrFtjj| ||< qF   S )NZvirtual_host)rg   rh   ro   rj   ri   socket/)rr   pathrg   rh   rn   ri   Zssl_ca_certsZssl_certfileZssl_keyfilerw   r|   c                 3   s   | ]}| kV  qd S r    r2   r?   r`   )r   r2   r3   	<genexpr>F  s     z0RedisBackend._params_from_url.<locals>.<genexpr>c                 3   s   | ]}| kV  qd S r    r2   r   )queryr2   r3   r   G  s     Zredissrr   r   )r
   dictr   popr   r|   ZUnixDomainSocketConnectionanyr   &E_REDIS_SSL_PARAMS_AND_SCHEME_MISMATCHr   r   r{   
isinstancer<   r~   rx   items
connectionZURL_QUERY_ARGUMENT_PARSERS)r-   r   defaultsschemerg   rh   ro   rj   r   Zssl_param_keysZssl_settingZssl_valri   r`   valuer2   )r   r   r3   r   +  sZ       




zRedisBackend._params_from_urlc                    s.   t  j}d| jkr*| }|| jd  |S )Nretry_policy)r!   r   _transport_optionscopyr   )r-   r   r0   r2   r3   r   b  s
    
zRedisBackend.retry_policyc                 C   s   t  s| j| d S r    )r   r   r]   )r-   ZproducerrM   r2   r2   r3   on_task_callk  s    zRedisBackend.on_task_callc                 C   s   | j |S r    )r4   r{   r-   r`   r2   r2   r3   r{   o  s    zRedisBackend.getc                 C   s   | j |S r    )r4   rC   )r-   keysr2   r2   r3   rC   r  s    zRedisBackend.mgetc                 K   s6   t | jf|}|d}t|| j|i t| j|f|S )Nmax_retries)r   r   r{   r   r(   r   on_connection_error)r-   Zfunr.   policyr   r   r2   r2   r3   r&   u  s    
   
zRedisBackend.ensurec                 C   s*   t |}tt ||pdt|d |S )NZInfzin )nextr:   errorE_LOSTr~   r   )r-   r   excZ	intervalsretriesZttsr2   r2   r3   r   }  s      z RedisBackend.on_connection_errorc                 K   s6   t |tr t|| jkr td| j| j||ff|S )Nz!value too large for Redis backend)r   r<   len_MAX_STR_VALUE_SIZEr   r&   _set)r-   r`   r   r   r2   r2   r3   r*     s    zRedisBackend.setc              	   C   sR   | j  >}| jr$||| j| n||| ||| |  W 5 Q R X d S r    )r4   pipelineexpiressetexr*   publishexecute)r-   r`   r   piper2   r2   r3   r     s    zRedisBackend._setc                    s   t  | | j| d S r    )r!   forgetr   rN   r\   r0   r2   r3   r     s    zRedisBackend.forgetc                 C   s   | j | d S r    )r4   deleter   r2   r2   r3   r     s    zRedisBackend.deletec                 C   s   | j |S r    )r4   incrr   r2   r2   r3   r     s    zRedisBackend.incrc                 C   s   | j ||S r    )r4   expire)r-   r`   r   r2   r2   r3   r     s    zRedisBackend.expirec                 C   s   | j | |dd d S )N.tr   )r4   r   get_key_for_group)r-   group_idrS   r2   r2   r3   add_to_chord  s    zRedisBackend.add_to_chordc           	      C   sB   ||\}}}}||kr"|  |}||kr>td| d||S )NzDependency z raised )Zexception_to_pythonr   )	r-   tupdecodeEXCEPTION_STATESPROPAGATE_STATES_tidstateretvalr2   r2   r3   _unpack_chord_result  s    
z!RedisBackend._unpack_chord_resultc                 C   s   |  | |d| d S )N.s)r*   r   )r-   r   Z
chord_sizer2   r2   r3   set_chord_size  s    zRedisBackend.set_chord_sizec                 K   s>   t |d ts:| jj| }tdd |jD r:|j| d d S )Nr   c                 s   s   | ]}t |tV  qd S r    )r   r   )r?   nrr2   r2   r3   r     s     z+RedisBackend.apply_chord.<locals>.<genexpr>)r#   )r   r   ry   r   r   resultssave)r-   Zheader_result_argsbodyr/   header_resultr2   r2   r3   apply_chord  s    zRedisBackend.apply_chordc                 C   s   | j ddS )NZresult_chord_orderedT)r   r{   rJ   r2   r2   r3   _chord_zset  s    zRedisBackend._chord_zsetc                 C   s   | j jdi S )NZ result_backend_transport_options)ry   rz   r{   rJ   r2   r2   r3   r     s    zRedisBackend._transport_optionsc                    s6  | j }|j|j|j  }}}	|r$|s(d S |	d kr4d}	| j}
| |d}| |d}| |d}| ||}| d|||g}|
 }| j	r|
|||	i|ddn||||||}| jr||| j|| j|| j}| d d \}}}}W 5 Q R X t|pd}|r2z|t|j|d	}t|| }||krt|}|d k	r|  |jrt|jn|j}t  ||jjd
d}W 5 Q R X nf| j| j  |
 6}| j	r| |dd}n|!|d|}| \}W 5 Q R X  fdd|D }zhz|#| W nT t$k
rh } z4t%&d|j| | '|t(d| W Y W W S d }~X Y nX W 5 |
  }|"|"|"|  W 5 Q R X X W n t(k
r } z$t%&d|j| | '|| W Y S d }~X Y nN t$k
r0 } z.t%&d|j| | '|t(d| W Y S d }~X Y nX d S )Nz+infz.jr   r   r   z-inf   r   )ry   T)rV   	propagatec                    s   g | ]}| qS r2   r2   )r?   r   r   unpackr2   r3   rA     s     z5RedisBackend.on_chord_part_return.<locals>.<listcomp>z Chord callback for %r raised: %rzCallback error: zChord %r raised: %rzJoin error: ))ry   idgroupgroup_indexr4   r   Zencode_resultencoder   r   ZzaddZzcountZrpushZllenr{   r   r   r   rx   r   Zchordr   restoreZon_readysupports_native_joinZjoin_nativejoinr   rz   Zresult_chord_join_timeoutr   r   ZzrangeZlranger   delay	Exceptionr:   	exceptionZchord_error_from_stackr   )r-   requestr   rS   r   r/   ry   r   gidr   r4   ZjkeyZtkeyZskeyencodedr   r   r   Z
readycountZ	totaldiffZchord_size_bytescallbacktotalr   Z	join_funcZreslr   r2   r   r3   on_chord_part_return  s    
   "




  $
 z!RedisBackend.on_chord_part_returnc                 K   s   |   | jf |dS )N)r5   )_get_client	_get_poolr-   paramsr2   r2   r3   _create_client#  s    
zRedisBackend._create_clientc                 C   s   | j jS r    )r|   ZStrictRedisrJ   r2   r2   r3   r   (  s    zRedisBackend._get_clientc                 K   s   | j f |S r    )ConnectionPoolr   r2   r2   r3   r   +  s    zRedisBackend._get_poolc                 C   s   | j d kr| jj| _ | j S r    )r   r|   r   rJ   r2   r2   r3   r   .  s    

zRedisBackend.ConnectionPoolc                 C   s   | j f | jS r    )r   r   rJ   r2   r2   r3   r4   4  s    zRedisBackend.clientr2   c                    s$   |si n|}t  | jfd| jiS )Nr   )r!   
__reduce__r   r   r,   r0   r2   r3   r   8  s
     zRedisBackend.__reduce__)NNNNNNN)N)r2   N)-rb   rc   rd   __doc__r   r|   r   r   rk   Zsupports_autoexpirer   r   r"   r   r	   r   r   r{   rC   r&   r   r*   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   propertyr   r4   r   re   r2   r2   r0   r3   r      s^        a7	



 
_

r   sentinelc                   @   s   e Zd ZdZdS )SentinelManagedSSLConnectionzConnect to a Redis server using Sentinel + TLS.

        Use Sentinel to identify which Redis server is the current master
        to connect to and when connecting to the Master server, use an
        SSL Connection.
        N)rb   rc   rd   r   r2   r2   r2   r3   r   @  s   r   c                       sf   e Zd ZdZdZeeddZer$endZ	 fddZ
d fdd		Z fd
dZdd Zdd Z  ZS )r   z!Redis sentinel task result store.;r   Nc                    s(   | j d krtt t j|| d S r    )r   r   E_REDIS_SENTINEL_MISSINGr~   r!   r"   r,   r0   r2   r3   r"   T  s    
zSentinelBackend.__init__Fc                    sD   |rt  j|dS dd | jp d| jD }| jdd |D S )zDReturn the server addresses as URIs, sanitizing the password or not.)include_passwordc                 s   s   | ]}t |V  qd S r    )r   )r?   chunkr2   r2   r3   r   c  s   z)SentinelBackend.as_uri.<locals>.<genexpr> c                 s   s(   | ] }| d r|dd n|V  qdS )z:///Nr   )endswith)r?   urir2   r2   r3   r   i  s   )r!   as_urir   split_SERVER_URI_SEPARATORr   )r-   r   Z
uri_chunksr0   r2   r3   r   Z  s    zSentinelBackend.as_uric                    s   | | j}t|g d}|D ]"}t j||d}|d | qdD ]}|| qDdD ]2}|d rX||d d krX|d d |||< qX|S )N)hosts)r   r   r   )rg   rh   ri   rj   )ri   rj   r   )r   r   r   r!   r   appendr   r{   )r-   r   r   chunksr   r   rX   paramr0   r2   r3   r   n  s     z SentinelBackend._params_from_urlc                 K   sV   |  }|d}| jdd}| jdi }| jjdd |D f||d|}|S )Nr   min_other_sentinelsr   sentinel_kwargsc                 S   s   g | ]}|d  |d fqS )rg   rh   r2   )r?   cpr2   r2   r3   rA     s     z:SentinelBackend._get_sentinel_instance.<locals>.<listcomp>)r   r  )r   r   r   r{   r   ZSentinel)r-   r   r   r   r   r  sentinel_instancer2   r2   r3   _get_sentinel_instance~  s    
z&SentinelBackend._get_sentinel_instancec                 K   s.   | j f |}| jdd }|j||  djS )Nmaster_name)Zservice_nameZredis_class)r  r   r{   Z
master_forr   r5   )r-   r   r  r  r2   r2   r3   r     s    zSentinelBackend._get_pool)F)rb   rc   rd   r   r   getattrr|   r   r   r   r"   r   r   r  r   re   r2   r2   r0   r3   r   K  s   r   )Er   rY   
contextlibr   	functoolsr   r   r   r   r   urllib.parser   Zkombu.utils.functionalr   Zkombu.utils.objectsr	   Zkombu.utils.urlr
   r   Zceleryr   Zcelery._stater   Zcelery.canvasr   Zcelery.exceptionsr   r   r   Zcelery.resultr   r   Zcelery.utils.functionalr   r   Zcelery.utils.logr   Zcelery.utils.timer   Zasynchronousr   r   baser   Zredis.connectionr|   Zkombu.transport.redisr   ImportErrorZredis.sentinel__all__r}   r   r   r   r   r   r   rI   rb   r:   r   r   r  r   ZSentinelManagedConnectionr   r   r   r2   r2   r2   r3   <module>   sb   
b   
