U
    Y+db                     @   s   d Z ddlZddlZddlZddlZddlmZmZ ddlm	Z	 ddl
mZ ddlmZ ddlmZ d	Zd
ZdZdZG dd dejZG dd dejejZG dd dejZG dd dejZdS )zT`librabbitmq`_ transport.

.. _`librabbitmq`: https://pypi.org/project/librabbitmq/
    N)ChannelErrorConnectionError)get_manager)version_string_as_tuple   )base)to_rabbitmq_queue_argumentsz
    librabbitmq version too old to detect RabbitMQ version information
    so make sure you are using librabbitmq 1.5 when using rabbitmq > 3.3
i(  i'  zAssl not supported by librabbitmq, please use pyamqp:// or stunnelc                       s    e Zd ZdZ fddZ  ZS )MessagezAMQP Message (librabbitmq).c                    s8   t  j|||||d|d|d|dd d S )Ndelivery_tagcontent_typecontent_encodingheaders)channelbodyZdelivery_info
propertiesr
   r   r   r   )super__init__get)selfr   propsinfor   	__class__ ?/tmp/pip-unpacked-wheel-rdvd7wfc/kombu/transport/librabbitmq.pyr   "   s    zMessage.__init__)__name__
__module____qualname____doc__r   __classcell__r   r   r   r   r	      s   r	   c                   @   s&   e Zd ZdZeZdddZdd ZdS )ChannelzAMQP Channel (librabbitmq).Nc                 C   s:   |dk	r|ni }| |||d |dk	r2||d< ||fS )z%Encapsulate data into a AMQP message.N)r   r   r   priority)update)r   r   r!   r   r   r   r   r   r   r   prepare_message3   s    zChannel.prepare_messagec                 K   s   t |f|}dd | D S )Nc                 S   s   i | ]\}}| d |qS )utf8)encode).0kvr   r   r   
<dictcomp>D   s      z3Channel.prepare_queue_arguments.<locals>.<dictcomp>)r   items)r   	argumentskwargsr   r   r   prepare_queue_argumentsB   s    zChannel.prepare_queue_arguments)NNNNN)r   r   r   r   r	   r#   r-   r   r   r   r   r    .   s          
r    c                   @   s   e Zd ZdZeZeZdS )
ConnectionzAMQP Connection (librabbitmq).N)r   r   r   r   r    r	   r   r   r   r   r.   G   s   r.   c                   @   s   e Zd ZdZeZeZeZe	j
jeejeef Ze	j
jef ZdZdZe	j
jjdddZdd Zd	d
 Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Z dd Z!e"dd Z#dS ) 	TransportzAMQP Transport (librabbitmq).amqplibrabbitmqTF)ZasynchronousZ
heartbeatsc                 K   s4   || _ |dp| j| _|dp&| j| _d | _d S )Ndefault_portdefault_ssl_port)clientr   r2   r3   Z_Transport__reader)r   r4   r,   r   r   r   r   e   s    
zTransport.__init__c                 C   s   t jS N)r0   __version__r   r   r   r   driver_versionl   s    zTransport.driver_versionc                 C   s   |  S r5   )r   r   
connectionr   r   r   create_channelo   s    zTransport.create_channelc                 K   s   |j f |S r5   )drain_events)r   r:   r,   r   r   r   r<   r   s    zTransport.drain_eventsc              
   C   s   | j }| j D ] \}}t||dst||| q|jr@ttt|j	|j
|j|j|j|j|j|jdf|jppi }| jf |}| j |_ |j| j _|S )z(Establish connection to the AMQP broker.N)hostuseridpasswordvirtual_hostlogin_methodinsistsslconnect_timeout)r4   default_connection_paramsr*   getattrsetattrrC   NotImplementedErrorNO_SSL_ERRORdictr=   r>   r?   r@   rA   rB   rD   Ztransport_optionsr.   r<   )r   Zconninfonamedefault_valueoptsconnr   r   r   establish_connectionu   s,    	

zTransport.establish_connectionc                 C   s   d| j _|  dS )z!Close the AMQP broker connection.N)r4   r<   closer9   r   r   r   close_connection   s    zTransport.close_connectionc              	   C   sp   |d k	r^|j  D ]
}d |_qzt|  W n ttfk
rH   Y nX |j   |j	  d | j
_d | _
d S r5   )Zchannelsvaluesr:   osrP   filenoOSError
ValueErrorclear	callbacksr4   r<   )r   r:   r   r   r   r   _collect   s    

zTransport._collectc                 C   s   |j S r5   )	connectedr9   r   r   r   verify_connection   s    zTransport.verify_connectionc                 C   s   | | | j|| d S r5   )Z
add_readerrT   Zon_readable)r   r:   Zloopr   r   r   register_with_event_loop   s       z"Transport.register_with_event_loopc                 O   s   t | jf||S r5   )r   r4   )r   argsr,   r   r   r   r      s    zTransport.get_managerc                 C   sP   z
|j }W n" tk
r,   ttt Y n X |ddkrLt|d dk S dS )NproductZRabbitMQversion)   r`   T)Zserver_propertiesAttributeErrorwarningswarnUserWarning	W_VERSIONr   r   )r   r:   r   r   r   r   qos_semantics_matches_spec   s    
z$Transport.qos_semantics_matches_specc                 C   s    dd| j jr| jn| jdddS )NZguest	localhostZPLAIN)r>   r?   porthostnamerA   )r4   rC   r3   r2   r7   r   r   r   rE      s    z#Transport.default_connection_paramsN)$r   r   r   r   r.   DEFAULT_PORTr2   DEFAULT_SSL_PORTr3   r   r/   Zconnection_errorsr   socketerrorIOErrorrU   Zchannel_errorsr   Zdriver_typeZdriver_nameZ
implementsextendr   r8   r;   r<   rO   rQ   rY   r[   r\   r   rf   propertyrE   r   r   r   r   r/   N   s@      
r/   )r   rS   rl   rb   r1   r0   r   r   Zkombu.utils.amq_managerr   Zkombu.utils.textr    r   r   re   rj   rk   rI   r	   r    Z
StdChannelr.   r/   r   r   r   r   <module>   s"   