U
    Y+d                     @   s  d Z ddlZddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZmZ dd
lmZ ddlmZ ddlmZ ddlmZmZmZ ddlmZ ddlmZmZ ddl m!Z! ddl"m#Z# ddl$m%Z% ddl&m'Z' zddl(Z(W n e)k
r   dZ(Y nX zddl(m*Z* W n e)k
rJ   dZ*Y nX edZ+e+j,e+j- Z.Z-dZ/dZ0dZ1ddddgZ2eddZ3dd Z4d d! Z5G d"d# d#e6Z7ed$d% Z8d&d' Z9G d(d) d)Z:G d*d+ d+e:e(j;Z<G d,d- d-e:e(j=j>Z?G d.d/ d/e(j=j@ZAG d0d1 d1e'jBZBG d2d3 d3ZCG d4d5 d5e'jDZDG d6d7 d7e'jEZEe*rdG d8d9 d9e*jFe(jGZHG d:d; d;eDZIG d<d= d=eEZJdS )>a  Redis transport module for Kombu.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: No

Connection String
=================
Connection string has the following format:

.. code-block::

    redis://[USER:PASSWORD@]REDIS_ADDRESS[:PORT][/VIRTUALHOST]
    rediss://[USER:PASSWORD@]REDIS_ADDRESS[:PORT][/VIRTUALHOST]

To use sentinel for dynamic Redis discovery,
the connection string has following format:

.. code-block::

    sentinel://[USER:PASSWORD@]SENTINEL_ADDRESS[:PORT]

Transport Options
=================
* ``sep``
* ``ack_emulation``: (bool) If set to True transport will
  simulate Acknowledge of AMQP protocol.
* ``unacked_key``
* ``unacked_index_key``
* ``unacked_mutex_key``
* ``unacked_mutex_expire``
* ``visibility_timeout``
* ``unacked_restore_limit``
* ``fanout_prefix``
* ``fanout_patterns``
* ``global_keyprefix``: (str) The global key prefix to be prepended to all keys
  used by Kombu
* ``socket_timeout``
* ``socket_connect_timeout``
* ``socket_keepalive``
* ``socket_keepalive_options``
* ``queue_order_strategy``
* ``max_connections``
* ``health_check_interval``
* ``retry_on_timeout``
* ``priority_steps``
    N)bisect)
namedtuple)contextmanager)Empty)time)promise)InconsistencyErrorVersionMismatch)
get_logger)register_after_fork)bytes_to_str)ERRREADpoll)accepts_argument)dumpsloads)cached_property)cycle_by_name)
_parse_url   )virtual)sentinelzkombu.transport.redisi           	   error_classes_t)connection_errorschannel_errorsc               	   C   s^   ddl m}  t| dr| j}n| j}ttjjt	t
jtt| j| j| jf tjj|| j| jf S )z$Return tuple of redis error classes.r   
exceptionsInvalidData)redisr!   hasattrr"   	DataErrorr   r   	Transportr   r   socketerrorIOErrorOSErrorConnectionErrorAuthenticationErrorTimeoutErrorr   ZInvalidResponseResponseError)r!   r%    r/   9/tmp/pip-unpacked-wheel-rdvd7wfc/kombu/transport/redis.pyget_redis_error_classesw   s(    
r1   c                  C   s   ddl m}  | jS )z1Return the redis ConnectionError exception class.r   r    )r#   r!   r+   r    r/   r/   r0   get_redis_ConnectionError   s    r2   c                   @   s   e Zd ZdZdS )	MutexHeldz)Raised when another party holds the lock.N__name__
__module____qualname____doc__r/   r/   r/   r0   r3      s   r3   c                 c   sf   | j ||d}d}z"|jdd}|r,dV  nt W 5 |r`z|  W n tjjk
r^   Y nX X dS )zTAcquire redis lock in non blocking way.

    Raise MutexHeld if not successful.
    timeoutF)blockingN)lockreleaser#   r!   ZLockNotOwnedErroracquirer3   )clientnameZexpirer<   Zlock_acquiredr/   r/   r0   Mutex   s    
rA   c                 C   s   |    d S N)_after_forkchannelr/   r/   r0   _after_fork_cleanup_channel   s    rF   c                       s~   e Zd ZdZdddddddd	d
ddddddgZddddddddddZdd Z fddZ fddZd!dd Z	  Z
S )"GlobalKeyPrefixMixina  Mixin to provide common logic for global key prefixing.

    Overriding all the methods used by Kombu with the same key prefixing logic
    would be cumbersome and inefficient. Hence, we override the command
    execution logic that is called by all commands.
    ZHDELZHGETZHSETZLLENZLPUSHZPUBLISHZRPUSHZRPOPZSADDZSREMZSETZSMEMBERSZZADDZZREMZZREVRANGEBYSCOREr   N)
args_startargs_end   r   )ZDELBRPOPZEVALSHAc                    s   t |}|d}| jkr4 jt|d  |d< nx| jkr j| d } j| d }|dkrn|d | ng }g }|d k	r||d  }| fdd||| D  | }|f|S )Nr   rH   rI   c                    s   g | ]} j t| qS r/   global_keyprefixstr.0argselfr/   r0   
<listcomp>   s   z5GlobalKeyPrefixMixin._prefix_args.<locals>.<listcomp>)listpopPREFIXED_SIMPLE_COMMANDSrN   rO   PREFIXED_COMPLEX_COMMANDS)rT   argscommandrH   rI   Zpre_argsZ	post_argsr/   rS   r0   _prefix_args   s"    



z!GlobalKeyPrefixMixin._prefix_argsc                    sD   t  j||f|}|dkr@|r@|\}}|t| jd }||fS |S )zParse a response from the Redis server.

        Method wraps ``redis.parse_response()`` to remove prefixes of keys
        returned by redis command.
        rL   N)superparse_responselenrN   )rT   
connectioncommand_nameoptionsretkeyvalue	__class__r/   r0   r^      s    z#GlobalKeyPrefixMixin.parse_responsec                    s   t  j| ||S rB   r]   execute_commandr\   rT   rZ   kwargsrf   r/   r0   ri      s    z$GlobalKeyPrefixMixin.execute_commandTc                 C   s   t | j| j||| jdS )NrN   )PrefixedRedisPipelineconnection_poolZresponse_callbacksrN   )rT   transactionZ
shard_hintr/   r/   r0   pipeline   s    zGlobalKeyPrefixMixin.pipeline)TN)r5   r6   r7   r8   rX   rY   r\   r^   ri   rp   __classcell__r/   r/   rf   r0   rG      s2   rG   c                   @   s    e Zd ZdZdd Zdd ZdS )PrefixedStrictRedisz@Returns a ``StrictRedis`` client that prefixes the keys it uses.c                 O   s&   | dd| _tjj| f|| d S NrN    )rW   rN   r#   Redis__init__rj   r/   r/   r0   rv   	  s    zPrefixedStrictRedis.__init__c                 K   s   t | jfd| ji|S )NrN   )PrefixedRedisPubSubrn   rN   )rT   rk   r/   r/   r0   pubsub  s    zPrefixedStrictRedis.pubsubN)r5   r6   r7   r8   rv   rx   r/   r/   r/   r0   rr     s   rr   c                   @   s   e Zd ZdZdd ZdS )rm   a   Custom Redis pipeline that takes global_keyprefix into consideration.

    As the ``PrefixedStrictRedis`` client uses the `global_keyprefix` to prefix
    the keys it uses, the pipeline called by the client must be able to prefix
    the keys as well.
    c                 O   s(   | dd| _tjjj| f|| d S rs   )rW   rN   r#   r?   Pipelinerv   rj   r/   r/   r0   rv     s    zPrefixedRedisPipeline.__init__N)r5   r6   r7   r8   rv   r/   r/   r/   r0   rm     s   rm   c                       sD   e Zd ZdZdZ fddZdd Z fddZ fd	d
Z  Z	S )rw   zCRedis pubsub client that takes global_keyprefix into consideration.)Z	SUBSCRIBEZUNSUBSCRIBEZ
PSUBSCRIBEZPUNSUBSCRIBEc                    s    | dd| _t j|| d S rs   )rW   rN   r]   rv   rj   rf   r/   r0   rv   ,  s    zPrefixedRedisPubSub.__init__c                    s8   t |}|d}| jkr. fdd|D }|f|S )Nr   c                    s   g | ]} j t| qS r/   rM   rP   rS   r/   r0   rU   5  s   z4PrefixedRedisPubSub._prefix_args.<locals>.<listcomp>)rV   rW   PUBSUB_COMMANDS)rT   rZ   r[   r/   rS   r0   r\   0  s    


z PrefixedRedisPubSub._prefix_argsc                    sB   t  j||}|dkr|S |^}}}|f fdd|D |fS )zParse a response from the Redis server.

        Method wraps ``PubSub.parse_response()`` to remove prefixes of keys
        returned by redis command.
        Nc                    s   g | ]}|t  jd  qS rB   )r_   rN   )rQ   rE   rS   r/   r0   rU   N  s     z6PrefixedRedisPubSub.parse_response.<locals>.<listcomp>)r]   r^   )rT   rZ   rk   rc   Zmessage_typeZchannelsmessagerf   rS   r0   r^   <  s    z"PrefixedRedisPubSub.parse_responsec                    s   t  j| ||S rB   rh   rj   rf   r/   r0   ri   R  s    z#PrefixedRedisPubSub.execute_command)
r5   r6   r7   r8   rz   rv   r\   r^   ri   rq   r/   r/   rf   r0   rw   "  s   rw   c                       s   e Zd ZdZdZ fddZ fddZd#dd	Z fd
dZd$ddZ	e
d%ddZd&ddZd'ddZd(ddZedd Zedd Zedd Zedd  Zed!d" Z  ZS ))QoSzRedis Ack Emulation.Tc                    s   t  j|| d| _d S )Nr   )r]   rv   _vrestore_countrj   rf   r/   r0   rv   [  s    zQoS.__init__c              	      s   |j }|d |d  }}tjd dkr4|t ig}n
t |g}|  B}|j| jf| | j|t	|j
||g  t || W 5 Q R X d S )Nexchangerouting_keyr   r   )delivery_infor#   VERSIONr   pipe_or_acquireZzaddunacked_index_keyZhsetunacked_keyr   _rawexecuter]   append)rT   r{   delivery_tagZdeliveryEXRKZ	zadd_argspiperf   r/   r0   r   _  s    

 z
QoS.appendNc              	   C   s@   | j | }| jD ]}| j||d qW 5 Q R X | j  d S )Nr?   )rE   conn_or_acquireZ
_deliveredrestore_by_tagclear)rT   r?   tagr/   r/   r0   restore_unackedp  s    
zQoS.restore_unackedc                    s   |  |  t | d S rB   )_remove_from_indicesr   r]   ack)rT   r   rf   r/   r0   r   v  s    zQoS.ackFc                 C   s    |r| j |dd | | d S NT)leftmost)r   r   )rT   r   Zrequeuer/   r/   r0   rejectz  s    z
QoS.rejectc              	   c   s2   |r|V  n"| j |}| V  W 5 Q R X d S rB   )rE   r   rp   )rT   r   r?   r/   r/   r0   r     s    zQoS.pipe_or_acquirec              
   C   s:   |  |&}|| j|| j|W  5 Q R  S Q R X d S rB   )r   Zzremr   hdelr   )rT   r   r   r/   r/   r0   r     s
     zQoS._remove_from_indicesr   
   c           	   
   C   s   |  j d7  _ | j d | r d S | j }t | j }zZt|| j| j@ |j| j	|d|o^||dd}|png D ]\}}| 
|| qpW 5 Q R X W n tk
r   Y nX W 5 Q R X d S )Nr   r   T)startnumZ
withscores)r}   rE   r   r   visibility_timeoutrA   unacked_mutex_keyunacked_mutex_expireZzrevrangebyscorer   r   r3   )	rT   r   r   intervalr?   ceilZvisibler   Zscorer/   r/   r0   restore_visible  s*        zQoS.restore_visiblec              	      s:    fdd}j |}||j W 5 Q R X d S )Nc                    sP   |  j}|   |  |rLtt|\}}}j||||   d S rB   )hgetr   multir   r   r   rE   _do_restore_message)r   pMr   r   r   rT   r   r/   r0   restore_transaction  s    z/QoS.restore_by_tag.<locals>.restore_transaction)rE   r   ro   r   )rT   r   r?   r   r   r/   r   r0   r     s    zQoS.restore_by_tagc                 C   s   | j jS rB   )rE   r   rS   r/   r/   r0   r     s    zQoS.unacked_keyc                 C   s   | j jS rB   )rE   r   rS   r/   r/   r0   r     s    zQoS.unacked_index_keyc                 C   s   | j jS rB   )rE   r   rS   r/   r/   r0   r     s    zQoS.unacked_mutex_keyc                 C   s   | j jS rB   )rE   r   rS   r/   r/   r0   r     s    zQoS.unacked_mutex_expirec                 C   s   | j jS rB   )rE   r   rS   r/   r/   r0   r     s    zQoS.visibility_timeout)N)F)NN)N)r   r   r   )NF)r5   r6   r7   r8   Zrestore_at_shutdownrv   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rq   r/   r/   rf   r0   r|   V  s,   








r|   c                   @   s   e Zd ZdZeeB ZdZdZdd Z	dd Z
dd	 Zd
d Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd d! Zd"d# Zd(d$d%Zed&d' ZdS ))MultiChannelPollerz%Async I/O poller for Redis transport.FNc                 C   s(   t  | _i | _i | _t | _t  | _d S rB   )set	_channels_fd_to_chan_chan_to_sockr   poller
after_readrS   r/   r/   r0   rv     s
    zMultiChannelPoller.__init__c              
   C   s\   | j  D ].}z| j| W q
 ttfk
r6   Y q
X q
| j  | j  | j   d S rB   )	r   valuesr   
unregisterKeyError
ValueErrorr   r   r   )rT   fdr/   r/   r0   close  s    

zMultiChannelPoller.closec                 C   s   | j | d S rB   )r   addrT   rE   r/   r/   r0   r     s    zMultiChannelPoller.addc                 C   s   | j | d S rB   )r   discardr   r/   r/   r0   r     s    zMultiChannelPoller.discardc              	   C   s0   z| j |j W n ttfk
r*   Y nX d S rB   )r   r   _sockAttributeError	TypeErrorrT   r`   r/   r/   r0   _on_connection_disconnect  s    z,MultiChannelPoller._on_connection_disconnectc                 C   sr   |||f| j kr| ||| |jjd kr4|j  |jj}||f| j| < || j |||f< | j|| j	 d S rB   )
r   _unregisterr`   r   connectr   filenor   register
eventflags)rT   rE   r?   typesockr/   r/   r0   	_register  s    
zMultiChannelPoller._registerc                 C   s   | j | j|||f  d S rB   )r   r   r   )rT   rE   r?   r   r/   r/   r0   r     s    zMultiChannelPoller._unregisterc                 C   s:   t |dd d kr|jd|_|jjd k	o8|||f| jkS )Nr`   _)getattrrn   get_connectionr`   r   r   )rT   rE   r?   cmdr/   r/   r0   _client_registered  s
    z%MultiChannelPoller._client_registeredc                 C   s>   ||j df}| ||j ds,d|_| j|  |js:|  dS )zEnable BRPOP mode for channel.rL   FN)r?   r   _in_pollr   _brpop_start)rT   rE   identr/   r/   r0   _register_BRPOP  s    
z"MultiChannelPoller._register_BRPOPc                 C   s8   |  ||jds&d|_| ||jd |js4|  dS )zEnable LISTEN mode for channel.LISTENFN)r   	subclient
_in_listenr   
_subscriber   r/   r/   r0   _register_LISTEN  s
    z#MultiChannelPoller._register_LISTENc                 C   s:   | j D ].}|jr$|j r$| | |jr| | qd S rB   )r   active_queuesqoscan_consumer   active_fanout_queuesr   r   r/   r/   r0   on_poll_start  s    


z MultiChannelPoller.on_poll_startc                 C   s(   || _ | jD ]}|jj|jd  S d S N)r   )r   r   r   r   unacked_restore_limit)rT   r   rE   r/   r/   r0   on_poll_init  s
    
zMultiChannelPoller.on_poll_initc                 C   s*   | j D ]}|jr|jj|jd  S qd S r   )r   r   r   r   r   r   r/   r/   r0   maybe_restore_messages  s
    
z)MultiChannelPoller.maybe_restore_messagesc                 C   s<   | j D ]0}|jd}|d k	rtt|dd r|  qd S )Nr   check_health)r   __dict__getcallabler   r   )rT   rE   r?   r/   r/   r0   maybe_check_subclient_health'  s    
z/MultiChannelPoller.maybe_check_subclient_healthc                 C   s(   | j | \}}|j r$|j|   d S rB   )r   r   r   handlers)rT   r   chanr   r/   r/   r0   on_readable/  s    
zMultiChannelPoller.on_readablec                 C   s:   |t @ r| || fS |t@ r6| j| \}}|| d S rB   )r   r   r   r   _poll_error)rT   r   eventr   r   r/   r/   r0   handle_event4  s
    zMultiChannelPoller.handle_eventc           	      C   s   d| _ z| jD ].}|jr,|j r,| | |j	r| 
| q| j|}|rv|D ]"\}}| ||}|rR W d S qR|   t W 5 d| _ | jrz| j }W n tk
r   Y qY qX |  qX d S )NTF)_in_protected_readr   rW   r   r   r   r   r   r   r   r   r   r   r   r   r   )	rT   callbackr:   ZfunrE   eventsr   r   rc   r/   r/   r0   r   ;  s.    




zMultiChannelPoller.getc                 C   s   | j S rB   )r   rS   r/   r/   r0   fdsY  s    zMultiChannelPoller.fds)N)r5   r6   r7   r8   r   r   r   r   r   rv   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   propertyr   r/   r/   r/   r0   r     s.   

	
r   c                       sz  e Zd ZdZeZdZdZdZdZdZ	dZ
dZdZdZi ZdZdZd	Zd
ZdZdZdZeZdZdZdZdZdZdZeZdZ dZ!dZ"dZ#dZ$dZ%e&j'j(d Z(e)re)j*ndZ+e)re)j,ndZ- fddZ.dd Z/dd Z0dd Z1drddZ2ds fdd	Z3dd Z4 fdd Z5d!d" Z6 fd#d$Z7d%d& Z8d'd( Z9d)d* Z:d+d, Z;d-d. Z<d/d0 Z=d1d2 Z>dtd4d5Z?d6d7 Z@d8d9 ZAd:d; ZBd<d= ZCd>d? ZDd@dA ZEdBdC ZFdDdE ZGdudFdGZHdHdI ZIdJdK ZJdLdM ZKdNdO ZLdPdQ ZM fdRdSZNdTdU ZOdVdW ZPdvdXdYZQdwdZd[ZRdxd\d]ZSdyd^d_ZTd`da ZUeVdzdbdcZWeXddde ZYeXdfdg ZZe[dhdi Z\e[djdk Z]dldm Z^dndo Z_eXdpdq Z`  ZaS ){ChannelzRedis Channel.NFTz_kombu.binding.%sz/{db}.zZunackedZunacked_indexZunacked_mutexi,  i  r   rt   Zround_robin)sepack_emulationr   r   r   r   r   r   fanout_prefixfanout_patternsrN   socket_timeoutsocket_connect_timeoutsocket_keepalivesocket_keepalive_optionsqueue_order_strategymax_connectionshealth_check_intervalretry_on_timeoutpriority_stepsc                    s   t  j|| | jstj| _t| j | _|  | _	| 
 | _t | _t | _i | _| j| jd| _| jrt| jtr| j| _nd| _z| j  W n tk
r   |    Y nX | jj|  | jj| _td k	rt| t  d S )N)rL   r   rt   )!r]   rv   r   r   r|   r   r   _queue_cycle_get_clientClient_get_response_errorr.   r   r   auto_delete_queues_fanout_to_queue_brpop_read_receiver   r   
isinstancerO   keyprefix_fanoutr?   Zping	Exception_disconnect_poolsr`   cycler   r   r   rF   rj   rf   r/   r0   rv     s.    



zChannel.__init__c                 C   s   |    d S rB   )r  rS   r/   r/   r0   rC     s    zChannel._after_forkc                 C   s<   | j }| j}d  | _| _ |d k	r(|  |d k	r8|  d S rB   )_pool_async_pool
disconnect)rT   pool
async_poolr/   r/   r0   r    s    zChannel._disconnect_poolsc                 C   s@   | j |krd | _ | j|kr d | _| jr<| jjr<| jj| d S rB   )r   r   r`   r  r   r   r/   r/   r0   r     s    

z!Channel._on_connection_disconnectc                 C   s   zfz d|d d< d|d d d< W n t k
r6   Y nX | ||D ]}|rR|jn|j|t| qDW n" tk
r   td|dd Y nX d S )NTheadersZredeliveredZ
propertiesr   zCould not restore message: %rexc_info)r   Z_lookuplpushZrpushr   r  crit)rT   payloadr~   r   r   r   queuer/   r/   r0   r     s     
zChannel._do_restore_messagec              	      sN   j st |S |j fdd} }||j W 5 Q R X d S )Nc                    sP   |  j}|   | j |rLtt|\}}}||||   d S rB   )r   r   r   r   r   r   r   )r   Pr   r   r   r   r/   r0   r     s    z-Channel._restore.<locals>.restore_transaction)r   r]   _restorer   r   ro   r   )rT   r{   r   r   r?   rf   r   r0   r    s    
zChannel._restorec                 C   s   | j |ddS r   )r  )rT   r{   r/   r/   r0   _restore_at_beginning$  s    zChannel._restore_at_beginningc                    sN   || j kr.| j | \}}| j| || j|< t j|f||}|   |S rB   )_fanout_queuesr   r   r   r]   basic_consume_update_queue_cycle)rT   r  rZ   rk   r~   r   rc   rf   r/   r0   r  '  s    

zChannel.basic_consumec                 C   s8   | j }|r4|jjr*|jjt| j|fS | |S d S rB   )r`   r  r   r   r   r   _basic_cancel)rT   consumer_tagr`   r/   r/   r0   basic_cancel;  s    zChannel.basic_cancelc                    s   z| j | }W n tk
r$   Y d S X z| j| W n tk
rJ   Y nX | | z| j| \}}| j| W n tk
r   Y nX t 	|}| 
  |S rB   )Z_tag_to_queuer   r   remove_unsubscribe_fromr  r   rW   r]   r  r  )rT   r  r  r~   r   rc   rf   r/   r0   r  H  s"    
zChannel._basic_cancelc                 C   s.   |r| j rd| j|d|gS d| j|gS )Nrt   /)r   joinr  )rT   r~   r   r/   r/   r0   _get_publish_topic\  s    
zChannel._get_publish_topicc                 C   s   | j | \}}| ||S rB   )r  r   )rT   r  r~   r   r/   r/   r0   _get_subscribe_topica  s    zChannel._get_subscribe_topicc                    sN    fdd j D }|sd S  j}|jjd kr8|j  |j _|| d S )Nc                    s   g | ]}  |qS r/   )r!  rQ   r  rS   r/   r0   rU   f  s   z&Channel._subscribe.<locals>.<listcomp>)r   r   r`   r   r   r   Z
psubscribe)rT   keyscr/   rS   r0   r   e  s    

zChannel._subscribec                 C   s.   |  |}| j}|jr*|jjr*||g d S rB   )r!  r   r`   r   unsubscribe)rT   r  topicr$  r/   r/   r0   r  p  s    
zChannel._unsubscribe_fromc                 C   s   t |d dkr&|d dkr&d|_d S t |d dkr\|d |d |d |d f\}}}}n |d d |d |d f\}}}}||||dS )	Nr   r%  rK   FZpmessager   r   )r   patternrE   data)r   Z
subscribed)rT   r?   rr   r'  rE   r(  r/   r/   r0   _handle_messagev  s    & zChannel._handle_messagec                 C   sf   | j }g }z|| | W n tk
r2   Y nX |jd k	r^|jjddr^|| | q4t|S )Nr   r9   )r   r   _receive_oner   r`   Zcan_readany)rT   r$  rc   r/   r/   r0   r    s    zChannel._receivec              	   C   s  d }z|  }W n | jk
r.   d | _ Y nX t|ttfr
| ||}t|d dr
t|d }|d r
|d dkr|	d\}}}zt
t|d }W n: ttfk
r   td|t|d d	 d
d t Y nX |dd
d }| j|| j|  dS d S )Nr   r{   rE   r(  r   r  .z&Cannot process event on channel %r: %si   r   r  T)r^   r   r   r  rV   tupler*  r   endswith	partitionr   r   r   warnreprr   splitr`   _deliverr   )rT   r$  responser  rE   r   r{   r~   r/   r/   r0   r+    s8    
   zChannel._receive_oner   c                    sp   j tj  sd S  fddjD |p4dg }jj_d|}jr^j	|}jjj
|  d S )Nc                    s"   g | ]} D ]} ||qqS r/   )
_q_for_pri)rQ   prir  ZqueuesrT   r/   r0   rU     s     z(Channel._brpop_start.<locals>.<listcomp>r   rL   )rL   )r   consumer_   r   r   r?   r`   r   rN   r\   Zsend_command)rT   r:   r#  command_argsr/   r8  r0   r     s    
zChannel._brpop_startc                 K   s   zz| jj| jjdf|}W n$ | jk
r@   | jj   Y nX |r|\}}t|| jdd }| j	
| | jtt|| W dS t W 5 d | _ X d S )NrL   r   r   T)r   r?   r^   r`   r   r	  r   rsplitr   r   rotater4  r   r   )rT   rb   Z
dest__itemdestitemr/   r/   r0   r     s$    
zChannel._brpop_readc                 K   s*   |dkr| j   n| j| jj| d S )Nr   )r   r^   r?   r`   )rT   r   rb   r/   r/   r0   r     s    zChannel._poll_errorc              
   C   s\   |   J}| jD ]6}|| ||}|rtt|  W  5 Q R  S qt W 5 Q R X d S rB   )r   r   Zrpopr6  r   r   r   )rT   r  r?   r7  r>  r/   r/   r0   _get  s    

zChannel._getc                 C   sx   |   f}| R}| jD ]}|| ||}q| }tdd |D W  5 Q R  W  5 Q R  S Q R X W 5 Q R X d S )Nc                 s   s   | ]}t |tjr|V  qd S rB   )r  numbersIntegral)rQ   sizer/   r/   r0   	<genexpr>  s    z Channel._size.<locals>.<genexpr>)r   rp   r   llenr6  r   sum)rT   r  r?   r   r7  sizesr/   r/   r0   _size  s    


zChannel._sizec                 C   s$   |  |}|r | | j | S |S rB   )priorityr   )rT   r  r7  r/   r/   r0   r6    s    
zChannel._q_for_pric                 C   s   | j }|t||d  S )Nr   )r   r   )rT   nZstepsr/   r/   r0   rH    s    zChannel.priorityc              	   K   s>   | j |dd}|  }|| ||t| W 5 Q R X dS )zDeliver message.F)reverseN)Z_get_message_priorityr   r  r6  r   )rT   r  r{   rk   r7  r?   r/   r/   r0   _put  s    
zChannel._putc              	   K   s0   |   }|| ||t| W 5 Q R X dS )zDeliver fanout message.N)r   publishr   r   )rT   r~   r{   r   rk   r?   r/   r/   r0   _put_fanout  s
    

zChannel._put_fanoutc                 K   s   |r| j | d S rB   )r   r   )rT   r  Zauto_deleterk   r/   r/   r0   
_new_queue  s    zChannel._new_queuec              	   C   sl   |  |jdkr&||ddf| j|< |  4}|| j|f | j|pJd|pPd|pVdg W 5 Q R X d S )Nfanout#*rt   )	Ztypeofr   replacer  r   Zsaddkeyprefix_queuer   r  )rT   r~   r   r'  r  r?   r/   r/   r0   _queue_bind  s     


zChannel._queue_bindc           
   
   O   s   | j | | j|ddn}|| j|f | j|p:d|p@d|pFdg | ,}| j	D ]}	|
| ||	}q^|  W 5 Q R X W 5 Q R X d S )Nr?   r   rt   )r   r   r   r   ZsremrS  r   r  rp   r   deleter6  r   )
rT   r  r~   r   r'  rZ   rk   r?   r   r7  r/   r/   r0   _delete  s    

zChannel._deletec                 K   sj   |   X}| D}| jD ]}|| ||}qt| W  5 Q R  W  5 Q R  S Q R X W 5 Q R X d S rB   )r   rp   r   existsr6  r,  r   )rT   r  rk   r?   r   r7  r/   r/   r0   
_has_queue  s
    


zChannel._has_queuec              
      sZ    j | }  >}||}|s2g W  5 Q R  S  fdd|D W  5 Q R  S Q R X d S )Nc                    s    g | ]}t t| jqS r/   )r.  r   r3  r   )rQ   valrS   r/   r0   rU   )  s     z%Channel.get_table.<locals>.<listcomp>)rS  r   Zsmembers)rT   r~   rd   r?   r   r/   rS   r0   	get_table!  s    


zChannel.get_tablec                 C   s   |   p}| \}| jD ] }| ||}|||}q| }t|d d d W  5 Q R  W  5 Q R  S Q R X W 5 Q R X d S )NrK   )r   rp   r   r6  rD  rU  r   rE  )rT   r  r?   r   r7  ZpriqrF  r/   r/   r0   _purge+  s    


zChannel._purgec                    sp   d| _ | jsb| jj|  | jd}|d k	rR| jD ]}|| jkr4| j	||d q4| 
  |   t   d S )NTr?   r   )_closingclosedr`   r  r   r   r   r  r   Zqueue_deleter  _close_clientsr]   r   )rT   r?   r  rf   r/   r0   r   4  s    

zChannel.closec                 C   sP   dD ]F}z$| j | }|jd  }|_|  W q tt| jfk
rH   Y qX qd S )N)r?   r   )r   r`   r	  r   r   r.   )rT   attrr?   r`   r/   r/   r0   r^  D  s    
zChannel._close_clientsc                 C   sh   t |tjsd|r|dkrt}n|dr4|dd  }zt|}W n" tk
rb   td|Y nX |S )Nr  r   z/Database is int between 0 and limit - 1, not {})r  r@  rA  
DEFAULT_DB
startswithintr   format)rT   Zvhostr/   r/   r0   _prepare_virtual_hostN  s    

zChannel._prepare_virtual_hostc                 K   s   |S rB   r/   )rT   r   r   paramsr/   r/   r0   _filter_tcp_connparams]  s    zChannel._filter_tcp_connparamsc                    s  | j j}|jpd|jp| j j|j|j|j| j| j	| j
| j| j| j| jd}| j}t|drpt|jdsp|d |jrz||j | j|d< W n tk
r   Y nX |d }d|krJt|\}}}}}	}
}|dkr"| jf |}|jtjd	|
 d
f| |dd  |dd  |dd  ||d< |	|d< |dd  |dd  | |dd |d< |  |dpt| j}|rG  fddd|}|}||d< |S )Nz	127.0.0.1)hostportvirtual_hostusernamepasswordr   r   r   r   r   r   r   rv   r   connection_classrg  z://r'   r  )rl  pathr   r   r   rj  rk  rh  ri  dbc                       s   e Zd Z fddZ  ZS )z'Channel._connparams.<locals>.Connectionc                    s   t    |  d S rB   )r]   r	  r   rS   )rg   rE   r/   r0   r	    s    
z2Channel._connparams.<locals>.Connection.disconnect)r5   r6   r7   r	  rq   r/   rD   rf   r0   
Connection  s   ro  )r`   r?   hostnamerh  default_portri  Zuseridrk  r   r   r   r   r   r   r   rl  r$   r   rv   rW   sslupdateconnection_class_sslr   r   rf  r#   ZUnixDomainSocketConnectionrd  r   )rT   asynchronousZconninfo
connparamsZ
conn_classrg  schemer   rj  rk  rm  queryZconnection_clsro  r/   rD   r0   _connparamsa  sp    




zChannel._connparamsc                 C   s    |r| j | jdS | j | jdS )N)rn   )r   r  r
  rT   ru  r/   r/   r0   _create_client  s    zChannel._create_clientc                 C   s,   | j |d}| jj|d d| _tjf |S )Nru  rn  )rn  )ry  r  rc  r#   ConnectionPool)rT   ru  re  r/   r/   r0   	_get_pool  s    zChannel._get_poolc                 C   s4   t jdk rtdt | jr.tjt| jdS t jS )N)r   rK   r   zSRedis transport requires redis-py versions 3.2.0 or later. You have {0.__version__}rl   )	r#   r   r	   rc  rN   	functoolspartialrr   ZStrictRedisrS   r/   r/   r0   r     s    
zChannel._get_clientc                 c   s   |r|V  n
|   V  d S rB   r{  rT   r?   r/   r/   r0   r     s    zChannel.conn_or_acquirec                 C   s   | j d kr|  | _ | j S rB   )r  r~  rS   r/   r/   r0   r
    s    

zChannel.poolc                 C   s   | j d kr| jdd| _ | j S )NTr|  )r  r~  rS   r/   r/   r0   r    s    
zChannel.async_poolc                 C   s   | j ddS )z+Client used to publish messages, BRPOP etc.Tr|  r  rS   r/   r/   r0   r?     s    zChannel.clientc                 C   s   | j dd}| S )z1Pub/Sub connection used to consume fanout queues.Tr|  )r{  rx   r  r/   r/   r0   r     s    zChannel.subclientc                 C   s   | j | j d S rB   )r   rs  r   rS   r/   r/   r0   r    s    zChannel._update_queue_cyclec                 C   s   ddl m} |jS )Nr   r    )r#   r!   r.   )rT   r!   r/   r/   r0   r     s    zChannel._get_response_errorc                    s    fdd j D S )z<Set of queues being consumed from (excluding fanout queues).c                    s   h | ]}| j kr|qS r/   )r   r"  rS   r/   r0   	<setcomp>  s    
z(Channel.active_queues.<locals>.<setcomp>)Z_active_queuesrS   r/   rS   r0   r     s    zChannel.active_queues)F)F)r   )F)NN)F)F)F)N)br5   r6   r7   r8   r|   Z_clientZ
_subclientr\  Zsupports_fanoutrS  r  r   r   r   r  r   r   r   r   r   r   r   PRIORITY_STEPSr   r   r   r   r   r   r   DEFAULT_HEALTH_CHECK_INTERVALr   r   r   rN   r   r  r  r   r   from_transport_optionsr#   ro  rl  SSLConnectionrt  rv   rC   r  r   r   r  r  r  r  r  r   r!  r   r  r*  r  r+  r   r   r   r?  rG  r6  rH  rK  rM  rN  rT  rV  rX  rZ  r[  r   r^  rd  rf  ry  r{  r~  r   r   r   r   r
  r  r   r?   r   r  r   r   rq   r/   r/   rf   r0   r   ^  s   %	 

	

	
  

H





r   c                       sv   e Zd ZdZeZdZeZdZdZ	e
jjjdedddgdZerJe \ZZ fd	d
Zdd Zdd Zdd Z  ZS )r&   zRedis Transport.Nr#   Tdirectr&  rO  )ru  Zexchange_typec                    s*   t d krtdt j|| t | _d S )Nz)Missing redis library (pip install redis))r#   ImportErrorr]   rv   r   r  rj   rf   r/   r0   rv     s    zTransport.__init__c                 C   s   t jS rB   )r#   __version__rS   r/   r/   r0   driver_version  s    zTransport.driver_versionc                    s   | j j jj | jfdd}|_ fddj 	dj
 |jjdt}	|j d S )Nc                    sB   | j r| j   jr>zj W n tk
r<   Y nX d S rB   )r   r  r   on_tickr   )r`   )r  loopr   r/   r0   _on_disconnect  s    z:Transport.register_with_event_loop.<locals>._on_disconnectc                      s       fddj D  d S )Nc                    s   g | ]} ||qS r/   r/   )rQ   r   )
add_readerr   r/   r0   rU      s     zMTransport.register_with_event_loop.<locals>.on_poll_start.<locals>.<listcomp>)r   r/   )r  r  cycle_poll_startr   r/   r0   r     s    z9Transport.register_with_event_loop.<locals>.on_poll_startr   r   )r  r   r   r   r  r   r   r  r   Zcall_repeatedlyr   r?   Ztransport_optionsr   r  r   )rT   r`   r  r  r   r/   )r  r  r  r  r   r   r0   register_with_event_loop
  s$    z"Transport.register_with_event_loopc                 C   s   | j | dS )z1Handle AIO event for one of our file descriptors.N)r  r   )rT   r   r/   r/   r0   r   ,  s    zTransport.on_readable)r5   r6   r7   r8   r   Zpolling_intervalDEFAULT_PORTrq  Zdriver_typeZdriver_namer   r&   Z
implementsextend	frozensetr#   r1   r   r   rv   r  r  r   rq   r/   r/   rf   r0   r&     s    
"r&   c                   @   s   e Zd ZdZdS )SentinelManagedSSLConnectionzConnect to a Redis server using Sentinel + TLS.

        Use Sentinel to identify which Redis server is the current master
        to connect to and when connecting to the Master server, use an
        SSL Connection.
        Nr4   r/   r/   r/   r0   r  2  s   r  c                   @   sH   e Zd ZdZejd Zer ejndZer,e	ndZ
d	ddZd
ddZdS )SentinelChannela  Channel with explicit Redis Sentinel knowledge.

    Broker url is supposed to look like:

    .. code-block::

        sentinel://0.0.0.0:26379;sentinel://0.0.0.0:26380/...

    where each sentinel is separated by a `;`.

    Other arguments for the sentinel should come from the transport options
    (see `transport_options` of :class:`~kombu.connection.Connection`).

    You must provide at least one option in Transport options:
     * `master_name` - name of the redis group to poll

    Example:

    .. code-block:: python

        >>> import kombu
        >>> c = kombu.Connection(
             'sentinel://sentinel1:26379;sentinel://sentinel2:26379',
             transport_options={'master_name': 'mymaster'}
        )
        >>> c.connect()
    )master_namemin_other_sentinelssentinel_kwargsNFc           	      C   s   |  |}| }|dd  |dd  g }| jjjD ]4}t|}|jdkr8|jpZ| jj	}|
|j|f q8|s|
|d |d f tj|ft| ddt| dd d|}t| dd }|d krtd	||| jjS )
Nrg  rh  r   r  r   r  )r  r  r  z1'master_name' transport option must be specified.)ry  copyrW   r`   r?   Zaltr   rw  rh  rq  r   rp  r   ZSentinelr   r   Z
master_forr   rn   )	rT   ru  rv  Zadditional_paramsZ	sentinelsurlrh  Zsentinel_instr  r/   r/   r0   _sentinel_managed_poold  s:    



z&SentinelChannel._sentinel_managed_poolc                 C   s
   |  |S rB   )r  rz  r/   r/   r0   r~    s    zSentinelChannel._get_pool)F)F)r5   r6   r7   r8   r   r  r   SentinelManagedConnectionrl  r  rt  r  r~  r/   r/   r/   r0   r  ?  s   

%r  c                   @   s   e Zd ZdZdZeZdS )SentinelTransportzRedis Sentinel Transport.ig  N)r5   r6   r7   r8   rq  r  r   r/   r/   r/   r0   r    s   r  )Kr8   r  r@  r'   r   collectionsr   
contextlibr   r  r   r   Zviner   Zkombu.exceptionsr   r	   Z	kombu.logr
   Zkombu.utils.compatr   Zkombu.utils.encodingr   Zkombu.utils.eventior   r   r   Zkombu.utils.functionalr   Zkombu.utils.jsonr   r   Zkombu.utils.objectsr   Zkombu.utils.schedulingr   Zkombu.utils.urlr   rt   r   r#   r  r   loggercriticalr1  r  r  r`  r  r  r   r1   r2   r  r3   rA   rF   rG   ru   rr   r?   ry   rm   ZPubSubrw   r|   r   r   r&   r  r  r  r  r  r/   r/   r/   r0   <module>   s|   5



Q4i       D
N