U
    Y+d$                     @   s   d Z ddlZddlmZ ddlmZ ddlmZ ddlmZ dZ	d	Z
G d
d dejZG dd dejejZG dd dejZG dd dejZG dd deZdS )a  pyamqp transport module for Kombu.

Pure-Python amqp transport using py-amqp library.

Features
========
* Type: Native
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: Yes

Connection String
=================
Connection string can have the following formats:

.. code-block::

    amqp://[USER:PASSWORD@]BROKER_ADDRESS[:PORT][/VIRTUALHOST]
    [USER:PASSWORD@]BROKER_ADDRESS[:PORT][/VIRTUALHOST]
    amqp://

For TLS encryption use:

.. code-block::

    amqps://[USER:PASSWORD@]BROKER_ADDRESS[:PORT][/VIRTUALHOST]

Transport Options
=================
Transport Options are passed to constructor of underlying py-amqp
:class:`~kombu.connection.Connection` class.

Using TLS
=========
Transport over TLS can be enabled by ``ssl`` parameter of
:class:`~kombu.Connection` class. By setting ``ssl=True``, TLS transport is
used::

    conn = Connect('amqp://', ssl=True)

This is equivalent to ``amqps://`` transport URI::

    conn = Connect('amqps://')

For adding additional parameters to underlying TLS, ``ssl`` parameter should
be set with dict instead of True::

    conn = Connect('amqp://broker.example.com', ssl={
            'keyfile': '/path/to/keyfile'
            'certfile': '/path/to/certfile',
            'ca_certs': '/path/to/ca_certfile'
        }
    )

All parameters are passed to ``ssl`` parameter of
:class:`amqp.connection.Connection` class.

SSL option ``server_hostname`` can be set to ``None`` which is causing using
hostname from broker URL. This is usefull when failover is used to fill
``server_hostname`` with currently used broker::

    conn = Connect('amqp://broker1.example.com;broker2.example.com', ssl={
            'server_hostname': None
        }
    )
    N)get_manager)version_string_as_tuple   )baseto_rabbitmq_queue_argumentsi(  i'  c                       s"   e Zd ZdZd fdd	Z  ZS )MessagezAMQP Message.Nc                    sL   |j }t jf |j||j|d|d|j|j |dp<i d| d S )Ncontent_typecontent_encodingapplication_headers)bodychanneldelivery_tagr	   r
   delivery_info
propertiesheaders)r   super__init__r   r   getr   )selfmsgr   kwargsprops	__class__ :/tmp/pip-unpacked-wheel-rdvd7wfc/kombu/transport/pyamqp.pyr   V   s    	zMessage.__init__)N__name__
__module____qualname____doc__r   __classcell__r   r   r   r   r   S   s   r   c                   @   s<   e Zd ZdZeZdddddejfddZdd Zdd ZdS )	ChannelzAMQP Channel.Nc                 C   s   ||f||||d|pi S )z<Prepare message so that it can be sent using this transport.)priorityr	   r
   r   r   )r   r   r$   r	   r
   r   r   Z_Messager   r   r   prepare_messagei   s    zChannel.prepare_messagec                 K   s   t |f|S Nr   )r   	argumentsr   r   r   r   prepare_queue_argumentsv   s    zChannel.prepare_queue_argumentsc                 C   s   | j || dS )z4Convert encoded message body back to a Python value.r   )r   )r   Zraw_messager   r   r   message_to_pythony   s    zChannel.message_to_python)	r   r   r    r!   r   amqpr%   r(   r*   r   r   r   r   r#   d   s      
r#   c                   @   s   e Zd ZdZeZdS )
ConnectionzAMQP Connection.N)r   r   r    r!   r#   r   r   r   r   r,   ~   s   r,   c                   @   s   e Zd ZdZeZeZeZe	jj
Z
e	jjZe	jjZe	jjZdZdZejjjdddZd$ddZd	d
 Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd%ddZdd Ze d d! Z!d"d# Z"dS )&	TransportzAMQP Transport.zpy-amqpr+   T)ZasynchronousZ
heartbeatsNc                 K   s"   || _ |p| j| _|p| j| _d S r&   )clientdefault_portdefault_ssl_port)r   r.   r/   r0   r   r   r   r   r      s    zTransport.__init__c                 C   s   t jS r&   )r+   __version__r   r   r   r   driver_version   s    zTransport.driver_versionc                 C   s   |  S r&   r)   r   
connectionr   r   r   create_channel   s    zTransport.create_channelc                 K   s   |j f |S r&   )drain_events)r   r5   r   r   r   r   r7      s    zTransport.drain_eventsc                 C   s   |d k	r|   d S r&   )Zcollectr4   r   r   r   _collect   s    zTransport._collectc                 C   s   | j }| j D ] \}}t||dst||| q|jdkrBd|_t|jtrrd|jkrr|jd dkrr|j|jd< t|j	|j
|j|j|j|j|j|j|jd	f|jpi }| jf |}| j |_ |  |S )z(Establish connection to the AMQP broker.N	localhostz	127.0.0.1server_hostname)	hostuseridpasswordlogin_methodvirtual_hostinsistsslconnect_timeout	heartbeat)r.   default_connection_paramsitemsgetattrsetattrhostname
isinstancerA   dictr;   r<   r=   r>   r?   r@   rB   rC   Ztransport_optionsr,   connect)r   Zconninfonamedefault_valueoptsconnr   r   r   establish_connection   s:    

zTransport.establish_connectionc                 C   s   |j S r&   )	connectedr4   r   r   r   verify_connection   s    zTransport.verify_connectionc                 C   s   d|_ |  dS )z!Close the AMQP broker connection.N)r.   closer4   r   r   r   close_connection   s    zTransport.close_connectionc                 C   s   |j S r&   )rC   r4   r   r   r   get_heartbeat_interval   s    z Transport.get_heartbeat_intervalc                 C   s    d|j _||j| j|| d S NT)	transportZraise_on_initial_eintrZ
add_readersockZon_readable)r   r5   Zloopr   r   r   register_with_event_loop   s    z"Transport.register_with_event_loop   c                 C   s   |j |dS )N)rate)Zheartbeat_tick)r   r5   r[   r   r   r   heartbeat_check   s    zTransport.heartbeat_checkc                 C   s(   |j }|ddkr$t|d dk S dS )NproductZRabbitMQversion)   r_   T)Zserver_propertiesr   r   )r   r5   r   r   r   r   qos_semantics_matches_spec   s    z$Transport.qos_semantics_matches_specc                 C   s    dd| j jr| jn| jdddS )NZguestr9   ZPLAIN)r<   r=   portrH   r>   )r.   rA   r0   r/   r2   r   r   r   rD      s    z#Transport.default_connection_paramsc                 O   s   t | jf||S r&   )r   r.   r   argsr   r   r   r   r      s    zTransport.get_manager)NN)rZ   )#r   r   r    r!   r,   DEFAULT_PORTr/   DEFAULT_SSL_PORTr0   r+   Zconnection_errorsZchannel_errorsZrecoverable_connection_errorsZrecoverable_channel_errorsZdriver_nameZdriver_typer   r-   Z
implementsextendr   r3   r6   r7   r8   rP   rR   rT   rU   rY   r\   r`   propertyrD   r   r   r   r   r   r-      s@      



r-   c                       s    e Zd ZdZ fddZ  ZS )SSLTransportzAMQP SSL Transport.c                    s"   t  j|| | jjsd| j_d S rV   )r   r   r.   rA   rb   r   r   r   r      s    zSSLTransport.__init__r   r   r   r   r   rh      s   rh   )r!   r+   Zkombu.utils.amq_managerr   Zkombu.utils.textr    r   r   rd   re   r   r#   Z
StdChannelr,   r-   rh   r   r   r   r   <module>   s   Fo