U
    b+d~5                     @   s   d dl Z d dlZd dlZd dlmZmZmZmZmZm	Z	m
Z
 d dlmZ d dlmZmZmZmZ d dlmZ d dlmZmZmZmZ d dlmZ G dd	 d	eZG d
d deZG dd deZG dd deeZG dd deZG dd deZ dS )    N)AsyncIteratorIterableMappingOptionalSequenceTupleType)Redis)
ConnectionConnectionPool
EncodableTSSLConnection)AsyncSentinelCommands)ConnectionErrorReadOnlyErrorResponseErrorTimeoutError)str_if_bytesc                   @   s   e Zd ZdS )MasterNotFoundErrorN__name__
__module____qualname__ r   r   :/tmp/pip-unpacked-wheel-cdsyf3nb/redis/asyncio/sentinel.pyr      s   r   c                   @   s   e Zd ZdS )SlaveNotFoundErrorNr   r   r   r   r   r      s   r   c                       sZ   e Zd Z fddZdd Z fddZdd Zd	d
 Zdee	e
 d fddZ  ZS )SentinelManagedConnectionc                    s   | d| _t jf | d S )Nconnection_pool)popr   super__init__)selfkwargs	__class__r   r   r       s    z"SentinelManagedConnection.__init__c                 C   sD   | j }| jj d|j }| jr<d| j d| j }||7 }|d S )N	<service=z,host=z,port=>)r   r$   r   service_namehostport)r!   poolsZ	host_infor   r   r   __repr__   s    z"SentinelManagedConnection.__repr__c                    sV   |\| _ | _t  I d H  | jjrR| dI d H  t|  I d H dkrRt	dd S )NZPINGZPONGzPING failed)
r(   r)   r   connectr   check_connectionZsend_commandr   read_responser   )r!   addressr#   r   r   
connect_to'   s    z$SentinelManagedConnection.connect_toc              	      s   | j r
d S | jjr0| | j I d H I d H  nN| j 2 z<3 d H W }z| |I d H W   S  tk
rt   Y q:Y q:X q:6 td S N)Z_readerr   	is_masterr1   get_master_addressrotate_slavesr   r   )r!   slaver   r   r   _connect_retry/   s    z(SentinelManagedConnection._connect_retryc                    s   | j | jdd I d H S )Nc                 S   s
   t dS )Nr   )asynciosleep)errorr   r   r   <lambda>?       z3SentinelManagedConnection.connect.<locals>.<lambda>)retryZcall_with_retryr7   r!   r   r   r   r-   <   s    z!SentinelManagedConnection.connectFNdisable_decodingtimeoutc                    sR   zt  j||dI d H W S  tk
rL   | jjrF|  I d H  td Y nX d S )Nr?   z"The previous master is now a slave)r   r/   r   r   r3   
disconnectr   )r!   r@   rA   r#   r   r   r/   B   s    z'SentinelManagedConnection.read_response)FN)r   r   r   r    r,   r1   r7   r-   boolr   floatr/   __classcell__r   r   r#   r   r      s     r   c                   @   s   e Zd ZdS )SentinelManagedSSLConnectionNr   r   r   r   r   rF   X   s   rF   c                       s\   e Zd ZdZ fddZdd Z fddZed fd	d
Zdd Z	e
dddZ  ZS )SentinelConnectionPoolz
    Sentinel backed connection pool.

    If ``check_connection`` flag is set to True, SentinelManagedConnection
    sends a PING command right after establishing the connection.
    c                    sv   | d|ddrtnt|d< |dd| _|dd| _t jf | t	| | j
d< || _|| _d | _d | _d S )NZconnection_classsslFr3   Tr.   r   )getr   rF   r   r3   r.   r   r    weakrefproxyconnection_kwargsr'   sentinel_managermaster_addressslave_rr_counter)r!   r'   rM   r"   r#   r   r   r    d   s    
zSentinelConnectionPool.__init__c                 C   s&   | j j d| j d| jrdpd dS )Nr%   (Zmasterr6   z)>)r$   r   r'   r3   r>   r   r   r   r,   t   s    $zSentinelConnectionPool.__repr__c                    s   t    d | _d | _d S r2   )r   resetrN   rO   r>   r#   r   r   rQ   z   s    
zSentinelConnectionPool.reset)
connectionc                    s0   | j  p| j o| j|j|jfk}|o.t |S r2   )r3   rN   r(   r)   r   owns_connection)r!   rR   checkr#   r   r   rS      s    z&SentinelConnectionPool.owns_connectionc                    s@   | j | jI d H }| jr<| j|kr<|| _| jddI d H  |S )NF)Zinuse_connections)rM   discover_masterr'   r3   rN   rB   )r!   rN   r   r   r   r4      s    
z)SentinelConnectionPool.get_master_address)returnc                 C  s   | j | jI dH }|rn| jdkr8tdt|d | _tt|D ](}| jd t| | _|| j }|V  qDz|  I dH V  W n t	k
r   Y nX t
d| jdS )zRound-robin slave balancerNr      zNo slave found for )rM   discover_slavesr'   rO   randomrandintlenranger4   r   r   )r!   slaves_r6   r   r   r   r5      s    

z$SentinelConnectionPool.rotate_slaves)r   r   r   __doc__r    r,   rQ   r
   rS   r4   r   r5   rE   r   r   r#   r   rG   \   s   
rG   c                   @   s   e Zd ZdZdddZdd Zdd	 Zeee	d
ddZ
edddZee eeeef  dddZeeeeef  dddZeefeee ee dddZeefeee ee dddZdS )Sentinela~  
    Redis Sentinel cluster client

    >>> from redis.sentinel import Sentinel
    >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
    >>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
    >>> await master.set('foo', 'bar')
    >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
    >>> await slave.get('foo')
    b'bar'

    ``sentinels`` is a list of sentinel nodes. Each node is represented by
    a pair (hostname, port).

    ``min_other_sentinels`` defined a minimum number of peers for a sentinel.
    When querying a sentinel, if it doesn't meet this threshold, responses
    from that sentinel won't be considered valid.

    ``sentinel_kwargs`` is a dictionary of connection arguments used when
    connecting to sentinel instances. Any argument that can be passed to
    a normal Redis connection can be specified here. If ``sentinel_kwargs`` is
    not specified, any socket_timeout and socket_keepalive options specified
    in ``connection_kwargs`` will be used.

    ``connection_kwargs`` are keyword arguments that will be used when
    establishing a connection to a Redis server.
    r   Nc                    sD   |d krdd |  D }| _ fdd|D  _| _| _d S )Nc                 S   s    i | ]\}}| d r||qS )Zsocket_)
startswith).0kvr   r   r   
<dictcomp>   s    
  z%Sentinel.__init__.<locals>.<dictcomp>c                    s&   g | ]\}}t f ||d  jqS ))r(   r)   )r	   sentinel_kwargs)rb   hostnamer)   r>   r   r   
<listcomp>   s   z%Sentinel.__init__.<locals>.<listcomp>)itemsrf   	sentinelsmin_other_sentinelsrL   )r!   rj   rk   rf   rL   r   r>   r   r       s    	
zSentinel.__init__c                    sp   t dd}d kr&d |rR fdd| jD }tj| I dH  nt| jj	 I dH  dS )z
        Execute Sentinel command in sentinel nodes.
        once - If set to True, then execute the resulting command on a single
               node at random, rather than across the entire sentinel cluster.
        onceFc                    s   g | ]}t |j qS r   )r8   ZTaskexecute_command)rb   sentinelargsr"   r   r   rh      s   z,Sentinel.execute_command.<locals>.<listcomp>NT)
rC   rI   keysr   rj   r8   ZgatherrY   choicerm   )r!   rp   r"   rl   Ztasksr   ro   r   rm      s    
zSentinel.execute_commandc                 C   sN   g }| j D ](}||jjd  d|jjd   q
| jj dd| dS )Nr(   :r)   z<sentinels=[,z]>)rj   appendr   rL   r$   r   join)r!   Zsentinel_addressesrn   r   r   r   r,      s    
zSentinel.__repr__)stater'   rV   c                 C   s2   |d r|d s|d rdS |d | j k r.dS dS )Nr3   is_sdownis_odownFznum-other-sentinelsT)rk   )r!   rw   r'   r   r   r   check_master_state   s
    zSentinel.check_master_state)r'   c              
      s   t | jD ]\}}z| I dH }W n ttfk
r@   Y q
Y nX ||}|r
| ||r
|| jd  | jd< | j|< |d |d f  S q
td|dS )z
        Asks sentinel servers for the Redis master's address corresponding
        to the service labeled ``service_name``.

        Returns a pair (address, port) or raises MasterNotFoundError if no
        master is found.
        Nr   ipr)   zNo master found for )	enumeraterj   Zsentinel_mastersr   r   rI   rz   r   )r!   r'   Zsentinel_norn   Zmastersrw   r   r   r   rU      s    

zSentinel.discover_master)r]   rV   c                 C   s:   g }|D ],}|d s|d rq| |d |d f q|S )z1Remove slaves that are in an ODOWN or SDOWN statery   rx   r{   r)   )ru   )r!   r]   Zslaves_aliver6   r   r   r   filter_slaves  s    zSentinel.filter_slaves)r'   rV   c                    sZ   | j D ]N}z||I dH }W n tttfk
r<   Y qY nX | |}|r|  S qg S )z;Returns a list of alive slaves for service ``service_name``N)rj   Zsentinel_slavesr   r   r   r}   )r!   r'   rn   r]   r   r   r   rX     s    



zSentinel.discover_slaves)r'   redis_classconnection_pool_classc                 K   s0   d|d< t | j}|| |||| f|dS )a  
        Returns a redis client instance for the ``service_name`` master.

        A :py:class:`~redis.sentinel.SentinelConnectionPool` class is
        used to retrieve the master's address before establishing a new
        connection.

        NOTE: If the master's address has changed, any cached connections to
        the old master are closed.

        By default clients will be a :py:class:`~redis.Redis` instance.
        Specify a different class to the ``redis_class`` argument if you
        desire something different.

        The ``connection_pool_class`` specifies the connection pool to
        use.  The :py:class:`~redis.sentinel.SentinelConnectionPool`
        will be used by default.

        All other keyword arguments are merged with any connection_kwargs
        passed to this class and passed to the connection pool as keyword
        arguments to be used to initialize Redis connections.
        Tr3   r   dictrL   updater!   r'   r~   r   r"   rL   r   r   r   
master_for)  s    

 zSentinel.master_forc                 K   s0   d|d< t | j}|| |||| f|dS )a  
        Returns redis client instance for the ``service_name`` slave(s).

        A SentinelConnectionPool class is used to retrieve the slave's
        address before establishing a new connection.

        By default clients will be a :py:class:`~redis.Redis` instance.
        Specify a different class to the ``redis_class`` argument if you
        desire something different.

        The ``connection_pool_class`` specifies the connection pool to use.
        The SentinelConnectionPool will be used by default.

        All other keyword arguments are merged with any connection_kwargs
        passed to this class and passed to the connection pool as keyword
        arguments to be used to initialize Redis connections.
        Fr3   r   r   r   r   r   r   	slave_forO  s    

 zSentinel.slave_for)r   N)r   r   r   r_   r    rm   r,   r   strrC   rz   rU   r   r   r   r   r   r}   rX   r	   rG   r   r   r   r   r   r   r   r`      s8     
	)r`   )!r8   rY   rJ   typingr   r   r   r   r   r   r   Zredis.asyncio.clientr	   Zredis.asyncio.connectionr
   r   r   r   Zredis.commandsr   Zredis.exceptionsr   r   r   r   Zredis.utilsr   r   r   r   rF   rG   r`   r   r   r   r   <module>   s   $>E