U
    Y+d                     @   s   d Z ddlZddlZddlmZ ddlmZmZ ddlm	Z	 ddl
mZ ddlmZ ddlmZ d	Zd
efdefdefdefdefdZdd Zdd Zdd ZG dd dZG dd dZG dd deZededddd gdd!ZG d"d# d#ZdS )$zBase transport interface.    N)RecoverableConnectionError)ChannelErrorConnectionError)Message)
dictfilter)cached_property)maybe_s_to_ms)r   
StdChannel
Management	Transportz	x-expireszx-message-ttlzx-max-lengthzx-max-length-byteszx-max-priority)expiresZmessage_ttl
max_lengthZmax_length_bytesZmax_priorityc                 K   s.   t tdd | D }|r*t| f|S | S )a  Convert queue arguments to RabbitMQ queue arguments.

    This is the implementation for Channel.prepare_queue_arguments
    for AMQP-based transports.  It's used by both the pyamqp and librabbitmq
    transports.

    Arguments:
        arguments (Mapping):
            User-supplied arguments (``Queue.queue_arguments``).

    Keyword Arguments:
        expires (float): Queue expiry time in seconds.
            This will be converted to ``x-expires`` in int milliseconds.
        message_ttl (float): Message TTL in seconds.
            This will be converted to ``x-message-ttl`` in int milliseconds.
        max_length (int): Max queue length (in number of messages).
            This will be converted to ``x-max-length`` int.
        max_length_bytes (int): Max queue size in bytes.
            This will be converted to ``x-max-length-bytes`` int.
        max_priority (int): Max priority steps for queue.
            This will be converted to ``x-max-priority`` int.

    Returns:
        Dict: RabbitMQ compatible queue arguments.
    c                 s   s   | ]\}}t ||V  qd S N)_to_rabbitmq_queue_argument).0keyvalue r   8/tmp/pip-unpacked-wheel-rdvd7wfc/kombu/transport/base.py	<genexpr>6   s   z.to_rabbitmq_queue_arguments.<locals>.<genexpr>)r   dictitems)	argumentsoptionspreparedr   r   r   to_rabbitmq_queue_arguments   s    

r   c                 C   s$   t |  \}}||d k	r||n|fS r   )RABBITMQ_QUEUE_ARGUMENTS)r   r   opttypr   r   r   r   =   s    r   c                 C   s   t d| j|S )Nz<Transport {0.__module__}.{0.__name__} does not implement {1})NotImplementedErrorformat	__class__)objmethodr   r   r   
_LeftBlankC   s     r$   c                   @   sL   e Zd ZdZdZdd Zdd Zdd Zd	d
 Zdd Z	dd Z
dd ZdS )r	   zStandard channel base class.Nc                 O   s   ddl m} || f||S )Nr   )Consumer)kombu.messagingr%   )selfargskwargsr%   r   r   r   r%   N   s    zStdChannel.Consumerc                 O   s   ddl m} || f||S )Nr   )Producer)r&   r*   )r'   r(   r)   r*   r   r   r   r*   R   s    zStdChannel.Producerc                 C   s   t | dd S Nget_bindingsr$   r'   r   r   r   r,   V   s    zStdChannel.get_bindingsc                 C   s   dS )zCallback called after RPC reply received.

        Notes:
           Reply queue semantics: can be used to delete the queue
           after transient reply message received.
        Nr   )r'   queuer   r   r   after_reply_message_receivedY   s    z'StdChannel.after_reply_message_receivedc                 K   s   |S r   r   )r'   r   r)   r   r   r   prepare_queue_argumentsa   s    z"StdChannel.prepare_queue_argumentsc                 C   s   | S r   r   r.   r   r   r   	__enter__d   s    zStdChannel.__enter__c                 G   s   |    d S r   )close)r'   exc_infor   r   r   __exit__g   s    zStdChannel.__exit__)__name__
__module____qualname____doc__Zno_ack_consumersr%   r*   r,   r0   r1   r2   r5   r   r   r   r   r	   I   s   r	   c                   @   s    e Zd ZdZdd Zdd ZdS )r
   z!AMQP Management API (incomplete).c                 C   s
   || _ d S r   )	transport)r'   r:   r   r   r   __init__n   s    zManagement.__init__c                 C   s   t | dd S r+   r-   r.   r   r   r   r,   q   s    zManagement.get_bindingsN)r6   r7   r8   r9   r;   r,   r   r   r   r   r
   k   s   r
   c                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	
Implementsz/Helper class used to define transport features.c                 C   s,   z
| | W S  t k
r&   t|Y nX d S r   )KeyErrorAttributeError)r'   r   r   r   r   __getattr__x   s    
zImplements.__getattr__c                 C   s   || |< d S r   r   )r'   r   r   r   r   r   __setattr__~   s    zImplements.__setattr__c                 K   s   | j | f|S r   )r!   )r'   r)   r   r   r   extend   s    zImplements.extendN)r6   r7   r8   r9   r?   r@   rA   r   r   r   r   r<   u   s   r<   FdirectZtopicZfanoutheaders)asynchronousZexchange_type
heartbeatsc                   @   s  e Zd ZdZeZdZdZdZefZ	e
fZdZdZdZe Zdd Zdd Zd	d
 Zdd Zdd Zdd Zd2ddZdd Zdd Zdd Zdd Zdd Zejej e!j"e!j#ffddZ$d d! Z%d"d# Z&d3e'e'd%d&d'Z(e)d(d) Z*d*d+ Z+e,d,d- Z-e)d.d/ Z.e)d0d1 Z/dS )4r   zBase class for transports.NFN/Ac                 K   s
   || _ d S r   )client)r'   rG   r)   r   r   r   r;      s    zTransport.__init__c                 C   s   t | dd S )Nestablish_connectionr-   r.   r   r   r   rH      s    zTransport.establish_connectionc                 C   s   t | dd S )Nclose_connectionr-   r'   
connectionr   r   r   rI      s    zTransport.close_connectionc                 C   s   t | dd S )Ncreate_channelr-   rJ   r   r   r   rL      s    zTransport.create_channelc                 C   s   t | dd S )Nclose_channelr-   rJ   r   r   r   rM      s    zTransport.close_channelc                 K   s   t | dd S )Ndrain_eventsr-   )r'   rK   r)   r   r   r   rN      s    zTransport.drain_events   c                 C   s   d S r   r   )r'   rK   Zrater   r   r   heartbeat_check   s    zTransport.heartbeat_checkc                 C   s   dS )NrF   r   r.   r   r   r   driver_version   s    zTransport.driver_versionc                 C   s   dS )Nr   r   rJ   r   r   r   get_heartbeat_interval   s    z Transport.get_heartbeat_intervalc                 C   s   d S r   r   r'   rK   loopr   r   r   register_with_event_loop   s    z"Transport.register_with_event_loopc                 C   s   d S r   r   rS   r   r   r   unregister_from_event_loop   s    z$Transport.unregister_from_event_loopc                 C   s   dS NTr   rJ   r   r   r   verify_connection   s    zTransport.verify_connectionc                    s    j  fdd  S )Nc              
      sz   j stdzdd W nL k
r2   Y d S  k
rh } z|jkrVW Y 
d S  W 5 d }~X Y nX |  |  d S )NzSocket was disconnectedr   )timeout)	connectedr   errnoZ	call_soon)rT   exc_read_unavailrK   rN   errorrY   r   r   r^      s    

z%Transport._make_reader.<locals>._read)rN   )r'   rK   rY   r`   r_   r   r]   r   _make_reader   s    zTransport._make_readerc                 C   s   dS rW   r   rJ   r   r   r   qos_semantics_matches_spec   s    z$Transport.qos_semantics_matches_specc                 C   s*   | j }|d kr| | }| _ || d S r   )_Transport__readerra   )r'   rK   rT   readerr   r   r   on_readable   s    zTransport.on_readable**)urireturnc                 C   s
   t  dS )z(Customise the display format of the URI.N)r   )r'   rg   Zinclude_passwordmaskr   r   r   as_uri   s    zTransport.as_uric                 C   s   i S r   r   r.   r   r   r   default_connection_params   s    z#Transport.default_connection_paramsc                 O   s
   |  | S r   )r
   )r'   r(   r)   r   r   r   get_manager   s    zTransport.get_managerc                 C   s   |   S r   )rl   r.   r   r   r   manager   s    zTransport.managerc                 C   s   | j jS r   )
implementsrE   r.   r   r   r   supports_heartbeats   s    zTransport.supports_heartbeatsc                 C   s   | j jS r   )rn   rD   r.   r   r   r   supports_ev   s    zTransport.supports_ev)rO   )Frf   )0r6   r7   r8   r9   r
   rG   Zcan_parse_urldefault_portr   Zconnection_errorsr   Zchannel_errorsZdriver_typeZdriver_namerc   default_transport_capabilitiesrA   rn   r;   rH   rI   rL   rM   rN   rP   rQ   rR   rU   rV   rX   socketrY   r`   r[   EAGAINZEINTRra   rb   re   strrj   propertyrk   rl   r   rm   ro   rp   r   r   r   r   r      sN   
 




r   )r9   r[   rs   Zamqp.exceptionsr   Zkombu.exceptionsr   r   Zkombu.messager   Zkombu.utils.functionalr   Zkombu.utils.objectsr   Zkombu.utils.timer   __all__intr   r   r   r$   r	   r
   r   r<   	frozensetrr   r   r   r   r   r   <module>   s6   	""
