U
    Y+d?                     @   s  d Z ddlZddlmZ ddlmZmZmZmZm	Z	m
Z
 ddlZddlZddlZddlmZmZmZmZmZ ddlmZ ddlmZmZ ddlmZmZ dd	lmZ d
dlm Z  e!ej"dddh Z#e$de$didd e#D Z%G dd dZ&G dd de j'Z'G dd de j(Z(dS )a  Azure Service Bus Message Queue transport module for kombu.

Note that the Shared Access Policy used to connect to Azure Service Bus
requires Manage, Send and Listen claims since the broker will create new
queues and delete old queues as required.


Notes when using with Celery if you are experiencing issues with programs not
terminating properly. The Azure Service Bus SDK uses the Azure uAMQP library
which in turn creates some threads. If the AzureServiceBus Channel is closed,
said threads will be closed properly, but it seems there are times when Celery
does not do this so these threads will be left running. As the uAMQP threads
are not marked as Daemon threads, they will not be killed when the main thread
exits. Setting the ``uamqp_keep_alive_interval`` transport option to 0 will
prevent the keep_alive thread from starting


More information about Azure Service Bus:
https://azure.microsoft.com/en-us/services/service-bus/

Features
========
* Type: Virtual
* Supports Direct: *Unreviewed*
* Supports Topic: *Unreviewed*
* Supports Fanout: *Unreviewed*
* Supports Priority: *Unreviewed*
* Supports TTL: *Unreviewed*

Connection String
=================

Connection string has the following format:

.. code-block::

    azureservicebus://SAS_POLICY_NAME:SAS_KEY@SERVICE_BUSNAMESPACE

Transport Options
=================

* ``queue_name_prefix`` - String prefix to prepend to queue names in a
  service bus namespace.
* ``wait_time_seconds`` - Number of seconds to wait to receive messages.
  Default ``5``
* ``peek_lock_seconds`` - Number of seconds the message is visible for before
  it is requeued and sent to another consumer. Default ``60``
* ``uamqp_keep_alive_interval`` - Interval in seconds the Azure uAMQP library
  should send keepalive messages. Default ``30``
* ``retry_total`` - Azure SDK retry total. Default ``3``
* ``retry_backoff_factor`` - Azure SDK exponential backoff factor.
  Default ``0.8``
* ``retry_backoff_max`` - Azure SDK retry total time. Default ``120``
    N)Empty)AnyDictOptionalSetTupleUnion)ServiceBusClientServiceBusMessageServiceBusReceiveModeServiceBusReceiverServiceBusSender)ServiceBusAdministrationClient)bytes_to_strsafe_str)dumpsloads)cached_property   )virtual_.-c                 C   s   i | ]}t |t d qS )r   )ord).0c r   C/tmp/pip-unpacked-wheel-rdvd7wfc/kombu/transport/azureservicebus.py
<dictcomp>N   s      r   c                   @   s8   e Zd ZdZd	ee ee dddZddddZdS )
SendReceivez"Container for Sender and Receiver.Nreceiversenderc                 C   s   || _ || _d S Nr    )selfr!   r"   r   r   r   __init__U   s    zSendReceive.__init__returnc                 C   s0   | j r| j   d | _ | jr,| j  d | _d S r#   )r!   closer"   r$   r   r   r   r(   [   s    

zSendReceive.close)NN)	__name__
__module____qualname____doc__r   r   r   r%   r(   r   r   r   r   r   R   s     r   c                       sF  e Zd ZdZdZdZdZdZdZdZ	dZ
d	Zd	Zi Ze Z fd
dZd	dddZ fddZ fddZdKeee ee edddZeedddZejd	feeee edddZdLeeeeef  edddZ e!j"j#d	dd d!Z$eedd"d#Z%ed	dd$d%Z&ed	dd&d'Z'dMeee(e)ef  eee*f d(d)d*Z+dNee,d	d, fd-d.Z-eedd/d0Z.d1d2 Z/d	dd3d4Z0e1e2dd5d6Z3e1e4dd7d8Z5e1d9d: Z6e1d;d< Z7e8edd=d>Z9e8edd?d@Z:e8eddAdBZ;e8eddCdDZ<e8eddEdFZ=e8e)ddGdHZ>e8eddIdJZ?  Z@S )OChannelzAzure Service Bus channel.   <         g?x   zkombu%(vhost)sNc                    s:   t  j|| d | _d | _d | _d | _|   d| j_d S )NF)	superr%   
_namespace_policy_sas_key_connection_string_try_parse_connection_stringqosZrestore_at_shutdown)r$   argskwargs	__class__r   r   r%   w   s    zChannel.__init__r&   c                 C   sd   t | jj\| _| _| _d| j }|ds6|d7 }|| j| jd}ddd |	 D | _
d S )Nzsb://z.netz.servicebus.windows.net)ZEndpointZSharedAccessKeyNameZSharedAccessKey;c                 S   s   g | ]\}}|d  | qS )=r   )r   keyvaluer   r   r   
<listcomp>   s     z8Channel._try_parse_connection_string.<locals>.<listcomp>)	Transport	parse_uriconninfohostnamer5   r6   r7   endswithjoinitemsr8   )r$   ZendpointZ	conn_dictr   r   r   r9      s    

z$Channel._try_parse_connection_stringc                    s&   |r| j | t j||f||S r#   )_noack_queuesaddr4   basic_consume)r$   queueZno_ackr;   r<   r=   r   r   rM      s     zChannel.basic_consumec                    s,   || j kr | j| }| j| t |S r#   )Z
_consumersZ_tag_to_queuerK   discardr4   basic_cancel)r$   Zconsumer_tagrN   r=   r   r   rP      s    

zChannel.basic_cancel)namer!   r"   r'   c                 C   sF   || j kr.| j | }|jp||_|jp(||_nt||}|| j |< |S r#   )_queue_cacher"   r!   r   )r$   rQ   r!   r"   objr   r   r   _add_queue_to_cache   s    



zChannel._add_queue_to_cache)rN   r'   c                 C   sD   | j |d }|d ks |jd kr@| jj|| jd}| j||d}|S )N)
keep_alive)r"   )rR   getr"   queue_serviceZget_queue_senderuamqp_keep_alive_intervalrT   )r$   rN   	queue_objr"   r   r   r   _get_asb_sender   s     zChannel._get_asb_sender)rN   	recv_modequeue_cache_keyr'   c                 C   sN   |p|}| j |d }|d ks(|jd krJ| jj||| jd}| j||d}|S )N)
queue_nameZreceive_moderU   )r!   )rR   rV   r!   rW   Zget_queue_receiverrX   rT   )r$   rN   r[   r\   	cache_keyrY   r!   r   r   r   _get_asb_receiver   s     zChannel._get_asb_receiver)rQ   tabler'   c                 C   s   t t||ptS )z:Format AMQP queue name into a valid ServiceBus queue name.)strr   	translateCHARS_REPLACE_TABLE)r$   rQ   r`   r   r   r   entity_name   s    zChannel.entity_name)messager'   c                 C   s   d S r#   r   )r$   re   r   r   r   _restore   s    zChannel._restorec                 K   s   |  | j| }z| j| W S  tk
r~   ttj| jd}z| jj	||d W n t
jjjk
rn   Y nX | | Y S X dS )z$Ensure a queue exists in ServiceBus.)seconds)r]   lock_durationN)rd   queue_name_prefixrR   KeyErrorisodateZduration_isoformatZDurationpeek_lock_secondsqueue_mgmt_serviceZcreate_queueazurecore
exceptionsZResourceExistsErrorrT   )r$   rN   r<   rh   r   r   r   
_new_queue   s     
zChannel._new_queuec                 O   s:   |  | j| }| j| | j|d}|r6|  dS )zDelete queue by name.N)rd   ri   _queue_mgmt_serviceZdelete_queuerR   popr(   )r$   rN   r;   r<   Zsend_receive_objr   r   r   _delete   s
    zChannel._deletec                 K   s6   |  | j| }tt|}| |}|j| dS )zPut message onto queue.N)rd   ri   r
   r   rZ   r"   Zsend_messages)r$   rN   re   r<   msgrY   r   r   r   _put   s    
zChannel._put)rN   timeoutr'   c           	      C   s   || j krtjntj}| | j| }| ||}|jjd|pB| j	d}|sRt
 |d }t|jtstd|j}n|j}tt|}||d d d< ||d d d< |S )	z/Try to retrieve a single message off ``queue``.r   Zmax_message_countZmax_wait_timer       Z
propertiesdelivery_infoazure_messageazure_queue_name)rK   r   RECEIVE_AND_DELETE	PEEK_LOCKrd   ri   r_   r!   receive_messageswait_time_secondsr   
isinstancebodybytesrI   r   r   )	r$   rN   rw   r[   rY   messagesre   r   ru   r   r   r   _get   s(    zChannel._getF)delivery_tagmultipler'   c                    s   | j |j}|d | jkr(t |S |d }| |}z|j|d  W nD t	j
jjk
rt   t | Y n, tk
r   t | Y nX t | d S )NZexchanger|   r{   )r:   rV   rz   rK   r4   	basic_ackr_   r!   Zcomplete_messagern   Z
servicebusrp   ZMessageAlreadySettled	ExceptionZbasic_reject)r$   r   r   rz   rN   rY   r=   r   r   r     s    
zChannel.basic_ackc                 C   s"   |  | j| }| j|}|jS )z)Return the number of messages in a queue.)rd   ri   rm   Zget_queue_runtime_propertiesZtotal_message_count)r$   rN   propsr   r   r   _size&  s    zChannel._sizec                 C   s   d}d}|  | j| }| j|d}|| jksB|dksB|jdkrV| |tjd| }|jj	|dd}|t
|7 }t
||k rVqqV|S )z'Delete all current messages in a queue.r   
   NZpurge_g?rx   )rd   ri   rR   rV   rK   r!   r_   r   r}   r   len)r$   rN   nZmax_purge_countrY   r   r   r   r   _purge-  s,    
 zChannel._purgec                 C   sH   | j sDd| _ | j D ]}|  q| j  | jd k	rD| j|  d S )NT)closedrR   valuesr(   clear
connectionZclose_channel)r$   rY   r   r   r   r(   I  s    


zChannel.closec                 C   s,   | j d kr&tj| j| j| j| jd| _ | j S )N)retry_totalretry_backoff_factorretry_backoff_max)_queue_servicer	   from_connection_stringr8   r   r   r   r)   r   r   r   rW   T  s    
zChannel.queue_servicec                 C   s   | j d krt| j| _ | j S r#   )rr   r   r   r8   r)   r   r   r   rm   _  s    
zChannel.queue_mgmt_servicec                 C   s   | j jS r#   )r   clientr)   r   r   r   rF   g  s    zChannel.conninfoc                 C   s
   | j jjS r#   )r   r   transport_optionsr)   r   r   r   r   k  s    zChannel.transport_optionsc                 C   s   | j ddS )Nri    )r   rV   r)   r   r   r   ri   o  s    zChannel.queue_name_prefixc                 C   s   | j d| jS )Nr   )r   rV   default_wait_time_secondsr)   r   r   r   r   s  s    zChannel.wait_time_secondsc                 C   s   t | jd| jdS )Nrl   i,  )minr   rV   default_peek_lock_secondsr)   r   r   r   rl   x  s
    
zChannel.peek_lock_secondsc                 C   s   | j d| jS )NrX   )r   rV   !default_uamqp_keep_alive_intervalr)   r   r   r   rX   ~  s    z!Channel.uamqp_keep_alive_intervalc                 C   s   | j d| jS )Nr   )r   rV   default_retry_totalr)   r   r   r   r     s     zChannel.retry_totalc                 C   s   | j d| jS )Nr   )r   rV   default_retry_backoff_factorr)   r   r   r   r     s     zChannel.retry_backoff_factorc                 C   s   | j d| jS )Nr   )r   rV   default_retry_backoff_maxr)   r   r   r   r     s     zChannel.retry_backoff_max)NN)N)N)F)Ar*   r+   r,   r-   r   r   r   r   r   r   Zdomain_formatr   rr   rR   setrK   r%   r9   rM   rP   ra   r   r   r   r   rT   rZ   r   r~   r_   r   intrd   r   baseMessagerf   rq   rt   rv   r   floatr   r   boolr   r   r   r(   propertyr	   rW   r   rm   rF   r   r   ri   r   rl   rX   r   r   r   __classcell__r   r   r=   r   r.   d   s     
    	
 
 


r.   c                   @   sT   e Zd ZdZeZdZdZdZee	e
e	e	e	f dddZede	e	dd
dZdS )rD   zAzure Service Bus transport.r   NT)urir'   c                 C   sL   |  dd} | dd\}}|dd\}}t|||gsBtd|||fS )Nzazureservicebus://r   @r   :z|Need a URI like azureservicebus://{SAS policy name}:{SAS key}@{ServiceBus Namespace} or the azure Endpoint connection string)replacersplitsplitall
ValueError)r   Zpolicykeypair	namespacepolicysas_keyr   r   r   rE     s    zTransport.parse_uriF**c                 C   s&   |  |\}}}d||r|n||S )Nzazureservicebus://{}:{}@{})rE   format)clsr   Zinclude_passwordmaskr   r   r   r   r   r   as_uri  s    
zTransport.as_uri)Fr   )r*   r+   r,   r-   r.   Zpolling_intervaldefault_portZcan_parse_urlstaticmethodra   r   rE   classmethodr   r   r   r   r   rD     s   rD   ))r-   stringrN   r   typingr   r   r   r   r   r   Zazure.core.exceptionsrn   Zazure.servicebus.exceptionsrk   Zazure.servicebusr	   r
   r   r   r   Zazure.servicebus.managementr   Zkombu.utils.encodingr   r   Zkombu.utils.jsonr   r   Zkombu.utils.objectsr   r   r   r   punctuationZPUNCTUATIONS_TO_REPLACEr   rc   r   r.   rD   r   r   r   r   <module>   s.   7    3