U
    Y+d                  
   @   s  d Z ddlZddlmZmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ z$ddlZdd	lmZ dd
lmZ W n  ek
r   d Z ZZY nX dZdZdZeeZG dd dejZG dd dejZedk	reddd  ejejddG dd dZedkred e  bZ!ed"ej#j$ej#j% e& .Z'ed"ej#j$ e!(eZ)e'(de) W 5 Q R X e!*  W 5 Q R X dS )a  Pyro transport module for kombu.

Pyro transport, and Kombu Broker daemon.

Requires the :mod:`Pyro4` library to be installed.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: No
* Supports Priority: No
* Supports TTL: No

Connection String
=================

To use the Pyro transport with Kombu, use an url of the form:

.. code-block::

    pyro://localhost/kombu.broker

The hostname is where the transport will be looking for a Pyro name server,
which is used in turn to locate the kombu.broker Pyro service.
This broker can be launched by simply executing this transport module directly,
with the command: ``python -m kombu.transport.pyro``

Transport Options
=================
    N)EmptyQueue)reraise)
get_logger)cached_property   )virtual)NamingError)SerializerBasei#  z5Unable to locate pyro nameserver on host {0.hostname}zKUnable to lookup '{0.virtual_host}' in pyro nameserver on host {0.hostname}c                       s~   e Zd ZdZ fddZdd Zdd Zdd	 ZdddZdd Z	dd Z
dd Zdd Zdd Zdd Zedd Z  ZS )ChannelzPyro Channel.c                    s   t    | jr| j  d S N)supercloseshared_queuesZ_pyroReleaseself	__class__ 8/tmp/pip-unpacked-wheel-rdvd7wfc/kombu/transport/pyro.pyr   A   s    
zChannel.closec                 C   s
   | j  S r   )r   get_queue_namesr   r   r   r   queuesF   s    zChannel.queuesc                 K   s   ||   kr| j| d S r   r   r   	new_queuer   queuekwargsr   r   r   
_new_queueI   s    zChannel._new_queuec                 K   s   | j |S r   )r   	has_queuer   r   r   r   
_has_queueM   s    zChannel._has_queueNc                 C   s   |  |}| j|S r   )
_queue_forr   get)r   r   timeoutr   r   r   _getP   s    
zChannel._getc                 C   s   ||   kr| j| |S r   r   r   r   r   r   r   r    T   s    zChannel._queue_forc                 K   s   |  |}| j|| d S r   )r    r   put)r   r   messager   r   r   r   _putY   s    
zChannel._putc                 C   s   | j |S r   )r   sizer$   r   r   r   _size]   s    zChannel._sizec                 O   s   | j | d S r   )r   delete)r   r   argsr   r   r   r   _delete`   s    zChannel._deletec                 C   s   | j |S r   )r   purger$   r   r   r   _purgec   s    zChannel._purgec                 C   s   d S r   r   r$   r   r   r   after_reply_message_receivedf   s    z$Channel.after_reply_message_receivedc                 C   s   | j jS r   )
connectionr   r   r   r   r   r   i   s    zChannel.shared_queues)N)__name__
__module____qualname____doc__r   r   r   r   r#   r    r'   r)   r,   r.   r/   r   r   __classcell__r   r   r   r   r   >   s   
r   c                       sT   e Zd ZdZeZe ZeZ	d Z
Z fddZdd Zdd Zed	d
 Z  ZS )	TransportzPyro Transport.pyroc                    s   t  j|f| | j| _d S r   )r   __init__global_statestate)r   clientr   r   r   r   r8   {   s    zTransport.__init__c              	   C   s   t d | j}ztj|j| jd}W n2 tk
rX   tttt	
|t d  Y nX z||j}t|W S  tk
r   tttt
|t d  Y nX d S )Nz0trying Pyro nameserver to find the broker daemon)hostport   )loggerdebugr;   r7   locateNShostnamedefault_portr	   r   E_NAMESERVERformatsysexc_infolookupZvirtual_hostZProxyE_LOOKUP)r   ZconninfoZ
nameserverurir   r   r   _open   s"    




zTransport._openc                 C   s   t jS r   )r7   __version__r   r   r   r   driver_version   s    zTransport.driver_versionc                 C   s   |   S r   )rK   r   r   r   r   r      s    zTransport.shared_queues)r1   r2   r3   r4   r   r   ZBrokerStater9   DEFAULT_PORTrC   Zdriver_typeZdriver_namer8   rK   rM   r   r   r5   r   r   r   r   r6   n   s   r6   zqueue.Emptyc                 C   s   t  S r   )r   )clsdatar   r   r   <lambda>       rQ   Zsingle)Zinstance_modec                   @   sX   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d Zdd Z	dd Z
dd Zdd ZdS )KombuBrokerzmKombu Broker used by the Pyro transport.

        You have to run this as a separate (Pyro) service.
        c                 C   s
   i | _ d S r   r   r   r   r   r   r8      s    zKombuBroker.__init__c                 C   s
   t | jS r   )listr   r   r   r   r   r      s    zKombuBroker.get_queue_namesc                 C   s   || j krd S t | j |< d S r   )r   r   r$   r   r   r   r      s    
zKombuBroker.new_queuec                 C   s
   || j kS r   rT   r$   r   r   r   r      s    zKombuBroker.has_queuec                 C   s   | j | jddS )NF)block)r   r!   r$   r   r   r   r!      s    zKombuBroker.getc                 C   s   | j | | d S r   )r   r%   )r   r   r&   r   r   r   r%      s    zKombuBroker.putc                 C   s   | j |  S r   )r   qsizer$   r   r   r   r(      s    zKombuBroker.sizec                 C   s   | j |= d S r   rT   r$   r   r   r   r*      s    zKombuBroker.deletec                 C   s6   z| j | jdd W q  tk
r.   Y q2Y q X q d S )NF)blocking)r   r!   r   r$   r   r   r   r-      s    zKombuBroker.purgeN)r1   r2   r3   r4   r8   r   r   r   r!   r%   r(   r*   r-   r   r   r   r   rS      s   rS   __main__z,Launching Broker for Kombu's Pyro transport.z'(Expecting a Pyro name server at {}:{})zAYou can connect with Kombu using the url 'pyro://{}/kombu.broker'zkombu.broker)+r4   rF   r   r   r   Zkombu.exceptionsr   Z	kombu.logr   Zkombu.utils.objectsr    r   ZPyro4r7   ZPyro4.errorsr	   Z
Pyro4.utilr
   ImportErrorrN   rD   rI   r1   r?   r   r6   Zregister_dict_to_classZexposeZbehaviorrS   printZDaemondaemonrE   configZNS_HOSTZNS_PORTrA   nsregisterrJ   ZrequestLoopr   r   r   r   <module>   sN   "0*

*

 

