U
    Y+d                     @   sZ  d Z ddlZddlZddlmZ ddlmZmZ ddlm	Z	m
Z
 ddlmZ zddlZddlmZ dd	lmZ ejjejjejjejjejjejjejjejjejjf	Zejjejjejjejjejjejjejj ejj!ejjejj"ejj#ejj$ejjejj%ej&fZ'W n" e(k
r(   dZd
 ZZ'Y nX dZ)dZ*G dd dej+Z+G dd dej,Z,dS )a  Zookeeper transport module for kombu.

Zookeeper based transport. This transport uses the built-in kazoo Zookeeper
based queue implementation.

**References**

- https://zookeeper.apache.org/doc/current/recipes.html#sc_recipes_Queues
- https://kazoo.readthedocs.io/en/latest/api/recipe/queue.html

**Limitations**
This queue does not offer reliable consumption.  An entry is removed from
the queue prior to being processed.  So if an error occurs, the consumer
has to re-queue the item or it will be lost.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: No
* Supports Priority: Yes
* Supports TTL: No

Connection String
=================
Connects to a zookeeper node as:

.. code-block::

    zookeeper://SERVER:PORT/VHOST

The <vhost> becomes the base for all the other znodes.  So we can use
it like a vhost.


Transport Options
=================

    N)Empty)bytes_to_strensure_bytes)dumpsloads   )virtual)KazooClient)Queue i  z!Mahendra M <mahendra.m@gmail.com>c                       s   e Zd ZdZdZi Z fddZdd Zdd Zd	d
 Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zedd Z  ZS )ChannelzZookeeper Channel.Nc                    s0   t  j|f| | jjj}d|d| _d S )Nz/{}/)super__init__
connectionclientZvirtual_hostformatstrip_vhost)selfr   kwargsZvhost	__class__r   =/tmp/pip-unpacked-wheel-rdvd7wfc/kombu/transport/zookeeper.pyr   g   s    
zChannel.__init__c                 C   s   t j| j|S N)ospathjoinr   )r   
queue_namer   r   r   	_get_pathl   s    zChannel._get_pathc                 C   s>   | j |d }|d kr:t| j| |}|| j |< t| |S r   )_queuesgetr
   r   r   len)r   r   queuer   r   r   
_get_queueo   s    
zChannel._get_queuec                 K   s&   |  |jtt|| j|dddS )NT)reverse)priority)r$   putr   r   Z_get_message_priority)r   r#   messager   r   r   r   _put{   s    

zChannel._putc                 C   s,   |  |}| }|d kr t tt|S r   )r$   r!   r   r   r   )r   r#   msgr   r   r   _get   s
    
zChannel._getc                 C   s.   d}|  |}| }|d kr q*|d7 }q|S )Nr   r   )r$   r!   )r   r#   countr*   r   r   r   _purge   s    

zChannel._purgec                 O   s*   |  |r&| | | j| | d S r   )
_has_queuer-   r   deleter   )r   r#   argsr   r   r   r   _delete   s    

zChannel._deletec                 C   s   |  |}t|S r   )r$   r"   r   r#   r   r   r   _size   s    
zChannel._sizec                 K   s   |  |s| |}d S r   )r.   r$   )r   r#   r   r   r   r   
_new_queue   s    
zChannel._new_queuec                 C   s   | j | |d k	S r   )r   existsr   r2   r   r   r   r.      s    zChannel._has_queuec              	   C   s   | j j}g }|jr|jD ]}|dr6|tdd  }|s<qz |dd\}}|t|f}W n6 tk
r   ||jkr||j	pt
f}n|t
f}Y nX || q|j|j	pt
f}||kr|d| ddd |D }t|}|  |S )Nzzookeeper://:r   r   ,c                 S   s   g | ]\}}| d | qS )r6   r   ).0hpr   r   r   
<listcomp>   s     z!Channel._open.<locals>.<listcomp>)r   r   Zalt
startswithr"   splitint
ValueErrorhostnameportDEFAULT_PORTappendinsertr   r	   start)r   Zconninfohosts	host_porthostrA   Zconn_strconnr   r   r   _open   s.    


zChannel._openc                 C   s   | j d kr|  | _ | j S r   )_clientrJ   r   r   r   r   r      s    

zChannel.client)__name__
__module____qualname____doc__rK   r    r   r   r$   r)   r+   r-   r1   r3   r4   r.   rJ   propertyr   __classcell__r   r   r   r   r   a   s    	r   c                       sT   e Zd ZdZeZdZeZej	j
e Z
ej	je ZdZdZ fddZdd Z  ZS )		TransportzZookeeper Transport.r   Z	zookeeperkazooc                    s"   t d krtdt j|| d S )Nz"The kazoo library is not installed)rT   ImportErrorr   r   )r   r0   r   r   r   r   r      s    zTransport.__init__c                 C   s   t jS r   )rT   __version__rL   r   r   r   driver_version   s    zTransport.driver_version)rM   rN   rO   rP   r   Zpolling_intervalrB   default_portr   rS   Zconnection_errorsKZ_CONNECTION_ERRORSZchannel_errorsKZ_CHANNEL_ERRORSZdriver_typeZdriver_namer   rW   rR   r   r   r   r   rS      s   

rS   )-rP   r   socketr#   r   Zkombu.utils.encodingr   r   Zkombu.utils.jsonr   r    r   rT   Zkazoo.clientr	   Zkazoo.recipe.queuer
   
exceptionsZSystemErrorExceptionZConnectionLossExceptionZMarshallingErrorExceptionZUnimplementedExceptionZOperationTimeoutExceptionZNoAuthExceptionZInvalidACLExceptionZAuthFailedExceptionZSessionExpiredExceptionrY   ZRuntimeInconsistencyExceptionZDataInconsistencyExceptionZBadArgumentsExceptionZApiErrorExceptionZNoNodeExceptionZNodeExistsExceptionZ NoChildrenForEphemeralsExceptionZNotEmptyExceptionZInvalidCallbackExceptionerrorrZ   rU   rB   
__author__r   rS   r   r   r   r   <module>   sV   )f