U
    Y¨+dæ  ã                   @   s®   d Z ddlZddlmZ ddlmZ ddlmZmZ ddl	m
Z
 ddlmZ zdd	lmZ W n ek
rt   dZY nX d
d„ ejD ƒZG dd„ dejƒZG dd„ dejƒZdS )aÚ  Azure Storage Queues transport module for kombu.

More information about Azure Storage Queues:
https://azure.microsoft.com/en-us/services/storage/queues/

Features
========
* Type: Virtual
* Supports Direct: *Unreviewed*
* Supports Topic: *Unreviewed*
* Supports Fanout: *Unreviewed*
* Supports Priority: *Unreviewed*
* Supports TTL: *Unreviewed*

Connection String
=================

Connection string has the following format:

.. code-block::

    azurestoragequeues://:STORAGE_ACCOUNT_ACCESS kEY@STORAGE_ACCOUNT_NAME

Note that if the access key for the storage account contains a slash, it will
have to be regenerated before it can be used in the connection URL.

Transport Options
=================

* ``queue_name_prefix``
é    N)ÚEmpty)Úsafe_str)ÚdumpsÚloads)Úcached_propertyé   )Úvirtual)ÚQueueServicec                 C   s   i | ]}t |ƒd “qS )é-   )Úord)Ú.0Úc© r   úF/tmp/pip-unpacked-wheel-rdvd7wfc/kombu/transport/azurestoragequeues.pyÚ
<dictcomp>1   s     r   c                       s´   e Zd ZdZdZdZi ZdZeƒ Z	‡ fdd„Z
‡ fdd„Zefd	d
„Zdd„ Z‡ fdd„Zdd„ Zddd„Zdd„ Zdd„ Zedd„ ƒZedd„ ƒZedd„ ƒZedd„ ƒZ‡  ZS ) ÚChannelzAzure Storage Queues channel.zkombu%(vhost)sNTc                    s<   t d krtdƒ‚tƒ j||Ž | j ¡ D ]}|| j|< q(d S )NzGAzure Storage Queues transport requires the azure-storage-queue library)r	   ÚImportErrorÚsuperÚ__init__Úqueue_serviceZlist_queuesÚ_queue_name_cache)ÚselfÚargsÚkwargsÚ
queue_name©Ú	__class__r   r   r   ?   s
    zChannel.__init__c                    s&   |r| j  |¡ tƒ j||f|ž|ŽS ©N)Ú_noack_queuesÚaddr   Úbasic_consume)r   ÚqueueÚno_ackr   r   r   r   r   r    I   s    ÿÿzChannel.basic_consumec                 C   s   t t|ƒƒ |¡S )z=Format AMQP queue name into a valid Azure Storage Queue name.)Ústrr   Ú	translate)r   ÚnameÚtabler   r   r   Úentity_nameP   s    zChannel.entity_namec                 C   sX   |   | j| ¡}z| j| W S  tk
rR   | jj|dd | }| j|< | Y S X dS )zEnsure a queue exists.F)Zfail_on_existN)r'   Úqueue_name_prefixr   ÚKeyErrorr   Zcreate_queue)r   r!   Úqr   r   r   Ú_ensure_queueT   s    zChannel._ensure_queuec                    s4   |   |¡}| j |d¡ | j |¡ tƒ  |¡ dS )zDelete queue by name.N)r'   r   Úpopr   Zdelete_queuer   Ú_delete)r   r!   r   r   r   r   r   r   r-   ^   s    
zChannel._deletec                 K   s$   |   |¡}t|ƒ}| j ||¡ dS )zPut message onto queue.N)r+   r   r   Zput_message)r   r!   Úmessager   r*   Zencoded_messager   r   r   Ú_pute   s    
zChannel._putc                 C   s\   |   |¡}| jj|d|d}|s&tƒ ‚|d }| j |j¡}t|ƒ}| j ||j|j	¡ |S )z/Try to retrieve a single message off ``queue``.r   )Znum_messagesÚtimeoutr   )
r+   r   Zget_messagesr   Zdecode_functionÚcontentr   Zdelete_messageÚidZpop_receipt)r   r!   r0   r*   Úmessagesr.   Zraw_contentr1   r   r   r   Ú_getk   s    

ÿzChannel._getc                 C   s   |   |¡}| j |¡}|jS )z)Return the number of messages in a queue.)r+   r   Zget_queue_metadataZapproximate_message_count)r   r!   r*   Úmetadatar   r   r   Ú_size|   s    
zChannel._sizec                 C   s$   |   |¡}|  |¡}| j |¡ |S )z'Delete all current messages in a queue.)r+   r6   r   Zclear_messages)r   r!   r*   Únr   r   r   Ú_purge‚   s    

zChannel._purgec                 C   s&   | j d kr t| jj| jjd| _ | j S )N)Zaccount_nameZaccount_key)Ú_queue_servicer	   ÚconninfoÚhostnameÚpassword©r   r   r   r   r   ‰   s    
þzChannel.queue_servicec                 C   s   | j jS r   )Ú
connectionÚclientr=   r   r   r   r:   ’   s    zChannel.conninfoc                 C   s
   | j jjS r   )r>   r?   Útransport_optionsr=   r   r   r   r@   –   s    zChannel.transport_optionsc                 C   s   | j  dd¡S )Nr(   Ú )r@   Úgetr=   r   r   r   r(   š   s    zChannel.queue_name_prefix)N)Ú__name__Ú
__module__Ú__qualname__Ú__doc__Zdomain_formatr9   r   r"   Úsetr   r   r    ÚCHARS_REPLACE_TABLEr'   r+   r-   r/   r4   r6   r8   Úpropertyr   r:   r@   r   r(   Ú__classcell__r   r   r   r   r   6   s.   





r   c                   @   s   e Zd ZdZeZdZdZdS )Ú	TransportzAzure Storage Queues transport.r   N)rC   rD   rE   rF   r   Zpolling_intervalÚdefault_portr   r   r   r   rK   Ÿ   s   rK   )rF   Ústringr!   r   Zkombu.utils.encodingr   Zkombu.utils.jsonr   r   Zkombu.utils.objectsr   rA   r   Zazure.storage.queuer	   r   ÚpunctuationrH   r   rK   r   r   r   r   Ú<module>   s    
ÿi