U
    Y+d߃                     @   s  d Z ddlZddlZddlZddlZddlmZ ddlmZmZm	Z	 ddl
mZ ddlmZ ddlmZ ddlmZmZ dd	lmZ dd
lmZmZ ddlmZ ddlmZ ddlmZ ddlmZm Z  ddl!m"Z" ddl#m$Z$ ddl%m&Z& dZ'dZ(dZ)dZ*dZ+dZ,ee-Z.e	ddZ/e	ddZ0G dd dZ1G dd  d e2Z3G d!d" d"e4Z5G d#d$ d$Z6G d%d& d&Z7G d'd( d(ej8Z8G d)d* d*Z9G d+d, d,e9ej:Z;G d-d. d.ej<Z<G d/d0 d0ej=Z=dS )1zPVirtual transport implementation.

Emulates the AMQ API for non-AMQ transports.
    N)array)OrderedDictdefaultdict
namedtuple)count)Finalize)Empty)	monotonicsleep)queue_declare_ok_t)ChannelErrorResourceError)
get_logger)base)emergency_dump_state)bytes_to_strstr_to_bytes)	FairCycleuuid   )STANDARD_EXCHANGE_TYPESHzlMessage could not be delivered: No queues bound to exchange {exchange!r} using binding key {routing_key!r}.
zkCannot redeclare exchange {0!r} in vhost {1!r} with different type, durable, autodelete or arguments value.z;Requeuing undeliverable message for queue %r: No consumers.z)Restoring {0!r} unacknowledged message(s)z#UNABLE TO RESTORE {0} MESSAGES: {1}binding_key_t)queueexchangerouting_keyqueue_binding_t)r   r   	argumentsc                   @   s    e Zd ZdZdd Zdd ZdS )Base64zBase64 codec.c                 C   s   t tt|S N)r   base64	b64encoder   selfs r&   @/tmp/pip-unpacked-wheel-rdvd7wfc/kombu/transport/virtual/base.pyencode@   s    zBase64.encodec                 C   s   t t|S r    )r!   	b64decoder   r#   r&   r&   r'   decodeC   s    zBase64.decodeN)__name__
__module____qualname____doc__r(   r*   r&   r&   r&   r'   r   =   s   r   c                   @   s   e Zd ZdZdS )NotEquivalentErrorzAEntity declaration is not equivalent to the previous declaration.Nr+   r,   r-   r.   r&   r&   r&   r'   r/   G   s   r/   c                   @   s   e Zd ZdZdS )UndeliverableWarningz.The message could not be delivered to a queue.Nr0   r&   r&   r&   r'   r1   K   s   r1   c                   @   sV   e Zd ZdZdZdZdZdddZdd Zdd Z	d	d
 Z
dd Zdd Zdd ZdS )BrokerStatez2Broker state holds exchanges, queues and bindings.Nc                 C   s&   |d kri n|| _ i | _tt| _d S r    )	exchangesbindingsr   setqueue_index)r$   r3   r&   r&   r'   __init__l   s    zBrokerState.__init__c                 C   s"   | j   | j  | j  d S r    )r3   clearr4   r6   r$   r&   r&   r'   r8   q   s    

zBrokerState.clearc                 C   s   |||f| j kS r    )r4   )r$   r   r   r   r&   r&   r'   has_bindingv   s    zBrokerState.has_bindingc                 C   s.   t |||}| j|| | j| | d S r    )r   r4   
setdefaultr6   add)r$   r   r   r   r   keyr&   r&   r'   binding_declarey   s    zBrokerState.binding_declarec                 C   sB   t |||}z| j|= W n tk
r,   Y nX | j| | d S r    )r   r4   KeyErrorr6   remove)r$   r   r   r   r=   r&   r&   r'   binding_delete~   s    zBrokerState.binding_deletec                    s<   z j |}W n tk
r$   Y nX  fdd|D  d S )Nc                    s   g | ]} j |d qS r    )r4   pop).0Zbindingr9   r&   r'   
<listcomp>   s     z5BrokerState.queue_bindings_delete.<locals>.<listcomp>)r6   rB   r?   )r$   r   r4   r&   r9   r'   queue_bindings_delete   s
    z!BrokerState.queue_bindings_deletec                    s    fdd j | D S )Nc                 3   s$   | ]}t |j|j j| V  qd S r    )r   r   r   r4   )rC   r=   r9   r&   r'   	<genexpr>   s   z-BrokerState.queue_bindings.<locals>.<genexpr>)r6   r$   r   r&   r9   r'   queue_bindings   s    
zBrokerState.queue_bindings)N)r+   r,   r-   r.   r3   r4   r6   r7   r8   r:   r>   rA   rE   rH   r&   r&   r&   r'   r2   O   s   

	r2   c                   @   s~   e Zd ZdZdZdZdZdZdddZdd Z	d	d
 Z
dd Zdd Zdd Zdd ZdddZdd ZdddZdd ZdS )QoSzQuality of Service guarantees.

    Only supports `prefetch_count` at this point.

    Arguments:
        channel (ChannelT): Connection channel.
        prefetch_count (int): Initial prefetch count (defaults to 0).
    r   NTc                 C   sR   || _ |pd| _t | _d| j_t | _| jj| _| jj	| _
t| | jdd| _d S )Nr   Fr   )Zexitpriority)channelprefetch_countr   
_deliveredrestoredr5   _dirtyr<   
_quick_ack__setitem___quick_appendr   restore_unacked_once_on_collect)r$   rJ   rK   r&   r&   r'   r7      s    


  zQoS.__init__c                 C   s$   | j }| p"t| jt| j |k S )zReturn true if the channel can be consumed from.

        Used to ensure the client adhers to currently active
        prefetch limits.
        )rK   lenrL   rN   r$   Zpcountr&   r&   r'   can_consume   s    zQoS.can_consumec                 C   s,   | j }|r(t|t| jt| j  dS dS )a  Return the maximum number of messages allowed to be returned.

        Returns an estimated number of messages that a consumer may be allowed
        to consume at once from the broker.  This is used for services where
        bulk 'get message' calls are preferred to many individual 'get message'
        calls - like SQS.

        Returns:
            int: greater than zero.
        r   N)rK   maxrT   rL   rN   rU   r&   r&   r'   can_consume_max_estimate   s    zQoS.can_consume_max_estimatec                 C   s   | j r|   | || dS )z&Append message to transactional state.N)rN   _flushrQ   )r$   messagedelivery_tagr&   r&   r'   append   s    z
QoS.appendc                 C   s
   | j | S r    )rL   r$   r[   r&   r&   r'   get   s    zQoS.getc                 C   sD   | j }| j}z| }W n tk
r0   Y q@Y nX ||d qdS )z'Flush dirty (acked/rejected) tags from.N)rN   rL   rB   r?   )r$   Zdirty	deliveredZ	dirty_tagr&   r&   r'   rY      s    
z
QoS._flushc                 C   s   |  | dS )z8Acknowledge message and remove from transactional state.N)rO   r]   r&   r&   r'   ack   s    zQoS.ackFc                 C   s$   |r| j | j|  | | dS )z4Remove from transactional state and requeue message.N)rJ   _restore_at_beginningrL   rO   r$   r[   requeuer&   r&   r'   reject   s    z
QoS.rejectc              
   C   s   |    | j}g }| jj}|j}|rz| \}}W n tk
rJ   Y qY nX z|| W q  tk
r } z|||f W 5 d}~X Y q X q |  |S )z$Restore all unacknowledged messages.N)	rY   rL   rJ   _restorepopitemr?   BaseExceptionr\   r8   )r$   r_   errorsrestoreZpop_message_rZ   excr&   r&   r'   restore_unacked   s     
"zQoS.restore_unackedc                 C   s   | j   |   |dkr tjn|}| j}| jr8| jjs<dS t	|ddrT|rPt
dS z`|rttt| j|d |  }|rtt| \}}ttt|||d t||d W 5 d|_X dS )zRestore all unacknowledged messages at shutdown/gc collect.

        Note:
            Can only be called once for each instance, subsequent
            calls will be ignored.
        NrM   T)file)stderr)rS   cancelrY   sysrn   rL   restore_at_shutdownrJ   
do_restoregetattrAssertionErrorrM   printRESTORING_FMTformatrT   rl   listzipRESTORE_PANIC_FMTr   )r$   rn   stateZ
unrestoredrh   messagesr&   r&   r'   rR   
  s,    
zQoS.restore_unacked_oncec                 O   s   dS )zRestore any pending unackwnowledged messages.

        To be filled in for visibility_timeout style implementations.

        Note:
            This is implementation optional, and currently only
            used by the Redis transport.
        Nr&   )r$   argskwargsr&   r&   r'   restore_visible)  s    zQoS.restore_visible)r   )F)N)r+   r,   r-   r.   rK   rL   rN   rq   r7   rV   rX   r\   r^   rY   r`   rd   rl   rR   r   r&   r&   r&   r'   rI      s    

	

rI   c                       s*   e Zd ZdZd fdd	Zdd Z  ZS )MessagezMessage object.Nc                    st   || _ |d }|d}|r.|||d}t jf |||d |d|d|d||dd	d
	| d S )N
propertiesbodybody_encodingr[   content-typecontent-encodingheadersdelivery_infozutf-8)	r   rJ   r[   content_typecontent_encodingr   r   r   Z
postencode)_rawr^   decode_bodysuperr7   )r$   payloadrJ   r~   r   r   	__class__r&   r'   r7   7  s$    

zMessage.__init__c                 C   sJ   | j }| j| j|d\}}t| j}|dd  ||| j| j	|dS )Nr   compression)r   r   r   r   r   )
r   rJ   encode_bodyr   r^   dictr   rB   r   r   )r$   propsr   rj   r   r&   r&   r'   serializableI  s    

zMessage.serializable)N)r+   r,   r-   r.   r7   r   __classcell__r&   r&   r   r'   r   4  s   r   c                   @   s\   e Zd ZdZdddZdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dddZdd ZdS )AbstractChannelzAbstract channel interface.

    This is an abstract class defining the channel methods
    you'd usually want to implement in a virtual channel.

    Note:
        Do not subclass directly, but rather inherit
        from :class:`Channel`.
    Nc                 C   s   t ddS )zGet next message from `queue`.z$Virtual channels must implement _getNNotImplementedError)r$   r   timeoutr&   r&   r'   _getd  s    zAbstractChannel._getc                 C   s   t ddS )zPut `message` onto `queue`.z$Virtual channels must implement _putNr   )r$   r   rZ   r&   r&   r'   _puth  s    zAbstractChannel._putc                 C   s   t ddS )z!Remove all messages from `queue`.z&Virtual channels must implement _purgeNr   rG   r&   r&   r'   _purgel  s    zAbstractChannel._purgec                 C   s   dS )z<Return the number of messages in `queue` as an :class:`int`.r   r&   rG   r&   r&   r'   _sizep  s    zAbstractChannel._sizec                 O   s   |  | dS )zDelete `queue`.

        Note:
            This just purges the queue, if you need to do more you can
            override this method.
        Nr   )r$   r   r}   r~   r&   r&   r'   _deletet  s    zAbstractChannel._deletec                 K   s   dS )zCreate new queue.

        Note:
            Your transport can override this method if it needs
            to do something whenever a new queue is declared.
        Nr&   r$   r   r~   r&   r&   r'   
_new_queue}  s    zAbstractChannel._new_queuec                 K   s   dS )zVerify that queue exists.

        Returns:
            bool: Should return :const:`True` if the queue exists
                or :const:`False` otherwise.
        Tr&   r   r&   r&   r'   
_has_queue  s    zAbstractChannel._has_queuec                 C   s
   | |S )z-Poll a list of queues for available messages.)r^   )r$   cyclecallbackr   r&   r&   r'   _poll  s    zAbstractChannel._pollc                 C   s   |  |}||| d S r    )r   )r$   r   r   rZ   r&   r&   r'   _get_and_deliver  s    
z AbstractChannel._get_and_deliver)N)N)r+   r,   r-   r.   r   r   r   r   r   r   r   r   r   r&   r&   r&   r'   r   Y  s   

		
r   c                   @   s  e Zd ZdZeZeZdZeeZ	dZ
de iZdZedZdZdZdZdZd	Zd
d ZdbddZdcddZddddZdeddZdd ZdfddZdgddZdhddZdiddZd d! Zd"d# Z d$d% Z!d&d' Z"d(d) Z#d*d+ Z$d,d- Z%djd.d/Z&dkd0d1Z'dld2d3Z(dmd4d5Z)dnd6d7Z*d8d9 Z+d:d; Z,dod<d=Z-dpd>d?Z.d@dA Z/dBdC Z0dqdDdEZ1dFdG Z2drdHdIZ3dsdJdKZ4dLdM Z5dtdNdOZ6dudPdQZ7dRdS Z8dTdU Z9dVdW Z:e;dXdY Z<e;dZd[ Z=e;d\d] Z>dvd^d_Z?d`da Z@dS )wChannelzVirtual channel.

    Arguments:
        connection (ConnectionT): The transport instance this
            channel is part of.
    TFr!   r   N)r   deadletter_queuer   	   c              	      s   | _ t  _d  _i  _g  _d  _d _ fdd j	 D  _ 
  _ j jj} jD ].}zt |||  W q` tk
r   Y q`X q`d S )NFc                    s   i | ]\}}|| qS r&   r&   )rC   typclsr9   r&   r'   
<dictcomp>  s     z$Channel.__init__.<locals>.<dictcomp>)
connectionr5   
_consumers_cycle_tag_to_queue_active_queues_qosclosedexchange_typesitems_get_free_channel_id
channel_idclienttransport_optionsfrom_transport_optionssetattrr?   )r$   r   r~   Ztoptsopt_namer&   r9   r'   r7     s"    



zChannel.__init__directc           	   	   C   s   |pd}|pd| }|rH|| j jkrDtd|| jjjp8dddddS zD| j j| }| |||||||st	t
|| jjjpdW n0 tk
r   ||||pi g d	| j j|< Y nX dS )
zDeclare exchange.r   zamq.%sz*NOT_FOUND - no exchange {!r} in vhost {!r}/2   
   Channel.exchange_declare404N)typedurableauto_deleter   table)r{   r3   r   rw   r   r   virtual_hosttypeofZ
equivalentr/   NOT_EQUIVALENT_FMTr?   )	r$   r   r   r   r   r   nowaitpassiveprevr&   r&   r'   exchange_declare  s@         
r   c                 C   s:   |  |D ]\}}}| j|ddd q
| jj|d dS )z'Delete `exchange` and all its bindings.T)	if_unusedif_emptyN)	get_tablequeue_deleter{   r3   rB   )r$   r   r   r   rkeyrj   r   r&   r&   r'   exchange_delete  s    zChannel.exchange_deletec                 K   sb   |pdt   }|rB| j|f|sBtd|| jjjp4ddddn| j|f| t|| 	|dS )zDeclare queue.z
amq.gen-%sz'NOT_FOUND - no queue {!r} in vhost {!r}r   r   Channel.queue_declarer   r   )
r   r   r   rw   r   r   r   r   r   r   )r$   r   r   r~   r&   r&   r'   queue_declare   s       r   c           	      K   sd   |r|  |rdS | j|D ]4\}}}| |||||}| j||f|| q| j| dS )zDelete queue.N)r   r{   rH   r   prepare_bindr   rE   )	r$   r   r   r   r~   r   r   r}   metar&   r&   r'   r     s    
   zChannel.queue_deletec                 C   s   |  | d S r    )r   rG   r&   r&   r'   after_reply_message_received  s    z$Channel.after_reply_message_received c                 C   s   t dd S )Nz(transport does not support exchange_bindr   r$   destinationsourcer   r   r   r&   r&   r'   exchange_bind  s    zChannel.exchange_bindc                 C   s   t dd S )Nz*transport does not support exchange_unbindr   r   r&   r&   r'   exchange_unbind  s    zChannel.exchange_unbindc                 K   s|   |pd}| j |||rdS | j |||| | j j| dg }| |||||}|| | jrx| j	|f|  dS )z.Bind `queue` to `exchange` with `routing key`.z
amq.directNr   )
r{   r:   r>   r3   r;   r   r   r\   supports_fanoutZ_queue_bind)r$   r   r   r   r   r~   r   r   r&   r&   r'   
queue_bind#  s    
   
zChannel.queue_bindc                    sj   | j ||| z| |}W n tk
r4   Y d S X | |||||  fdd|D |d d < d S )Nc                    s   g | ]}| kr|qS r&   r&   )rC   r   Zbinding_metar&   r'   rD   A  s      z(Channel.queue_unbind.<locals>.<listcomp>)r{   rA   r   r?   r   r   )r$   r   r   r   r   r~   r   r&   r   r'   queue_unbind4  s    
   zChannel.queue_unbindc                    s    fdd j jD S )Nc                 3   s.   | ]&}  |D ]\}}}|||fV  qqd S r    )r   )rC   r   r   patternr   r9   r&   r'   rF   D  s    z(Channel.list_bindings.<locals>.<genexpr>r{   r3   r9   r&   r9   r'   list_bindingsC  s    
zChannel.list_bindingsc                 K   s
   |  |S )z%Remove all ready messages from queue.r   r   r&   r&   r'   queue_purgeH  s    zChannel.queue_purgec                 C   s   t  S r    r   r9   r&   r&   r'   _next_delivery_tagL  s    zChannel._next_delivery_tagc                 K   s:   |  ||| |r*| |j|||f|S | j||f|S )zPublish message.)_inplace_augment_messager   Zdeliverr   )r$   rZ   r   r   r~   r&   r&   r'   basic_publishO  s    
  zChannel.basic_publishc                 C   sJ   |  |d | j\|d< }|d }|j||  d |d j||d d S )Nr   r   )r   r[   r   r   r   )r   r   updater   )r$   rZ   r   r   r   r   r&   r&   r'   r   Y  s     z Channel._inplace_augment_messagec                    sJ   |j |< j|  fdd}|jj|< j|   dS )zConsume from `queue`.c                    s*   j | d}s"j||j  |S )NrJ   )r   qosr\   r[   )raw_messagerZ   r   no_ackr$   r&   r'   	_callbackl  s    z(Channel.basic_consume.<locals>._callbackN)r   r   r\   r   
_callbacksr   r<   _reset_cycle)r$   r   r   r   consumer_tagr~   r   r&   r   r'   basic_consumeg  s    
zChannel.basic_consumec                 C   sf   || j krb| j | |   | j|d}z| j| W n tk
rP   Y nX | jj|d dS )z Cancel consumer by consumer tag.N)	r   r@   r   r   rB   r   
ValueErrorr   r   )r$   r   r   r&   r&   r'   basic_cancelw  s    
zChannel.basic_cancelc                 K   sH   z.| j | || d}|s*| j||j |W S  tk
rB   Y nX dS )z+Get message by direct access (synchronous).r   N)r   r   r   r\   r[   r   )r$   r   r   r~   rZ   r&   r&   r'   	basic_get  s    zChannel.basic_getc                 C   s   | j | dS )zAcknowledge message.N)r   r`   )r$   r[   multipler&   r&   r'   	basic_ack  s    zChannel.basic_ackc                 C   s   |r| j  S tddS )zRecover unacked messages.z'Does not support recover(requeue=False)N)r   rl   r   )r$   rc   r&   r&   r'   basic_recover  s    
zChannel.basic_recoverc                 C   s   | j j||d dS )zReject message.rc   N)r   rd   rb   r&   r&   r'   basic_reject  s    zChannel.basic_rejectc                 C   s   || j _dS )zmChange QoS settings for this channel.

        Note:
            Only `prefetch_count` is supported.
        N)r   rK   )r$   Zprefetch_sizerK   Zapply_globalr&   r&   r'   	basic_qos  s    zChannel.basic_qosc                 C   s   t | jjS r    )rx   r{   r3   r9   r&   r&   r'   get_exchanges  s    zChannel.get_exchangesc                 C   s   | j j| d S )z%Get table of bindings for `exchange`.r   r   )r$   r   r&   r&   r'   r     s    zChannel.get_tablec                 C   s8   z| j j| d }W n tk
r,   |}Y nX | j| S )z.Get the exchange type instance for `exchange`.r   )r{   r3   r?   r   )r$   r   defaultr   r&   r&   r'   r     s
    
zChannel.typeofc                 C   s   |dkr| j }|s|p|gS z | || ||||}W n tk
rT   g }Y nX |s|dk	rtttj	||d | 
| |g}|S )zFind all queues matching `routing_key` for the given `exchange`.

        Returns:
            str: queue name -- must return the string `default`
                if no queues matched.
        Nr   )r   r   lookupr   r?   warningswarnr1   UNDELIVERABLE_FMTrw   r   )r$   r   r   r  Rr&   r&   r'   _lookup  s*    

  

 

zChannel._lookupc                 C   s@   |j }| }d|d< | |d |d D ]}| || q*dS )z.Redeliver message to its original destination.TZredeliveredr   r   N)r   r   r  r   )r$   rZ   r   r   r&   r&   r'   re     s     zChannel._restorec                 C   s
   |  |S r    )re   )r$   rZ   r&   r&   r'   ra     s    zChannel._restore_at_beginningc                 C   sR   |p
| j j}| jrH| j rHt| dr6| j| j|dS | j| j	||dS t
 d S )N	_get_manyr   )r   _deliverr   r   rV   hasattrr  r   r   r   r   )r$   r   r   r&   r&   r'   drain_events  s    
zChannel.drain_eventsc                 C   s   t || js| j|| dS |S )z1Convert raw message to :class:`Message` instance.)r   rJ   )
isinstancer   )r$   r   r&   r&   r'   message_to_python  s    zChannel.message_to_pythonc                 C   s>   |pi }| di  | d|p"| j ||||p2i |p8i dS )zPrepare message data.r   priority)r   r   r   r   r   )r;   default_priority)r$   r   r  r   r   r   r   r&   r&   r'   prepare_message  s    zChannel.prepare_messagec                 C   s   t ddS )zEnable/disable message flow.

        Raises:
            NotImplementedError: as flow
                is not implemented by the base virtual implementation.
        z%virtual channels do not support flow.Nr   )r$   activer&   r&   r'   flow  s    zChannel.flowc                 C   sp   | j sfd| _ t| jD ]}| | q| jr6| j  | jdk	rP| j  d| _| jdk	rf| j	|  d| _
dS )zTClose channel.

        Cancel all consumers, and requeue unacked messages.
        TN)r   rx   r   r   r   rR   r   closer   close_channelr   )r$   Zconsumerr&   r&   r'   r    s    



zChannel.closec                 C   s"   |r| j |||fS ||fS r    )codecsr^   r(   r$   r   encodingr&   r&   r'   r     s    zChannel.encode_bodyc                 C   s   |r| j ||S |S r    )r  r^   r*   r  r&   r&   r'   r     s    zChannel.decode_bodyc                 C   s   t | j| jt| _d S r    )r   r   r   r   r   r9   r&   r&   r'   r     s
      zChannel._reset_cyclec                 C   s   | S r    r&   r9   r&   r&   r'   	__enter__  s    zChannel.__enter__c                 G   s   |    d S r    )r  )r$   exc_infor&   r&   r'   __exit__"  s    zChannel.__exit__c                 C   s   | j jS )z/Broker state containing exchanges and bindings.)r   r{   r9   r&   r&   r'   r{   %  s    zChannel.statec                 C   s   | j dkr| | | _ | j S )z&:class:`QoS` manager for this channel.N)r   rI   r9   r&   r&   r'   r   *  s    
zChannel.qosc                 C   s   | j d kr|   | j S r    )r   r   r9   r&   r&   r'   r   1  s    
zChannel.cyclec              
   C   sX   z$t tt|d d | j| j}W n  tttfk
rD   | j}Y nX |rT| j| S |S )zGet priority from message.

        The value is limited to within a boundary of 0 to 9.

        Note:
            Higher value has more priority.
        r   r  )	rW   minintmax_prioritymin_priority	TypeErrorr   r?   r  )r$   rZ   reverser  r&   r&   r'   _get_message_priority7  s    zChannel._get_message_priorityc                 C   sd   t | jj}td| jjd D ]"}||kr| jj| |  S qtdt| jj	| jjdd S )Nr   z/No free channel ids, current={}, channel_max={})   r   )
r5   r   _used_channel_idsrangechannel_maxr\   r   rw   rT   channels)r$   Zused_channel_idsr   r&   r&   r'   r   J  s    

zChannel._get_free_channel_id)Nr   FFNFF)FF)NF)FF)r   r   FN)r   r   FN)Nr   N)Nr   N)F)F)F)F)r   r   F)r   )N)NN)NNNNN)T)N)N)F)Ar+   r,   r-   r.   r   rI   rr   r   r   r   r   r   r  r   r   Z_delivery_tagsr   r   r  r  r  r7   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r  re   ra   r  r  r  r  r  r   r   r   r  r  propertyr{   r   r   r"  r   r&   r&   r&   r'   r     s   
       



    
    
  
  






  
	

	
      

	





r   c                       s0   e Zd ZdZ fddZdd Zdd Z  ZS )
Managementz'Base class for the AMQP management API.c                    s   t  | |j | _d S r    )r   r7   r   rJ   )r$   	transportr   r&   r'   r7   ^  s    zManagement.__init__c                 C   s   dd | j  D S )Nc                 S   s   g | ]\}}}|||d qS ))r   r   r   r&   )rC   qerr&   r&   r'   rD   c  s   z+Management.get_bindings.<locals>.<listcomp>)rJ   r   r9   r&   r&   r'   get_bindingsb  s    zManagement.get_bindingsc                 C   s   | j   d S r    )rJ   r  r9   r&   r&   r'   r  f  s    zManagement.close)r+   r,   r-   r.   r7   r.  r  r   r&   r&   r   r'   r)  [  s   r)  c                   @   s   e Zd ZdZeZeZeZdZdZ	dZ
dZdZdZejjjdeddgddZd	d
 Zdd Zdd Zdd Zdd ZdddZdd Zdd Zdd Zd ddZedd ZdS )!	TransportznVirtual transport.

    Arguments:
        client (kombu.Connection): The client this is a transport for.
    Ng      ?i  Fr   Ztopic)ZasynchronousZexchange_typeZ
heartbeatsc                 K   s\   || _ t | _g | _g | _i | _| | j| jt| _	|j
d}|d k	rN|| _tt| _d S )Npolling_interval)r   r2   r{   r'  _avail_channelsr   Cycle_drain_channelr   r   r   r^   r0  r   ARRAY_TYPE_Hr$  )r$   r   r~   r0  r&   r&   r'   r7     s    zTransport.__init__c                 C   s@   z| j  W S  tk
r:   | |}| j| | Y S X d S r    )r1  rB   
IndexErrorr   r'  r\   )r$   r   rJ   r&   r&   r'   create_channel  s    
zTransport.create_channelc                 C   s`   zRz| j|j W n tk
r(   Y nX z| j| W n tk
rN   Y nX W 5 d |_ X d S r    )r   r$  r@   r   r   r'  )r$   rJ   r&   r&   r'   r    s    
zTransport.close_channelc                 C   s   | j | |  | S r    )r1  r\   r6  r9   r&   r&   r'   establish_connection  s    zTransport.establish_connectionc              	   C   sP   | j   | j| jfD ]4}|rz| }W n tk
r>   Y qX |  qqd S r    )r   r  r1  r'  rB   LookupError)r$   r   Z	chan_listrJ   r&   r&   r'   close_connection  s    
zTransport.close_connectionc                 C   s   t  }| jj}| j}|r(|r(||kr(|}z|| j|d W q tk
r|   |d k	rht  | |krht |d k	rxt| Y q(X qq(d S )Nr	  )	r	   r   r^   r0  r
  r   socketr   r
   )r$   r   r   Z
time_startr^   r0  r&   r&   r'   r    s    zTransport.drain_eventsc                 C   sX   |st d|z| j| }W n* t k
rJ   tt| | | Y n
X || d S )Nz.Received message without destination queue: {})r?   rw   r   loggerwarningW_NO_CONSUMERS_reject_inbound_message)r$   rZ   r   r   r&   r&   r'   r
    s    zTransport._deliverc                 C   sF   | j D ]:}|r|j||d}|j||j |j|jdd  qBqd S )Nr   Tr   )r'  r   r   r\   r[   r   )r$   r   rJ   rZ   r&   r&   r'   r>    s    
z!Transport._reject_inbound_messagec                 C   s0   |r|| j krtd||| j | | d S )Nz,Message for queue {!r} without consumers: {})r   r?   rw   )r$   rJ   rZ   r   r&   r&   r'   on_message_ready  s     zTransport.on_message_readyc                 C   s   |j ||dS )N)r   r   )r  )r$   rJ   r   r   r&   r&   r'   r3    s    zTransport._drain_channelc                 C   s   | j ddS )N	localhost)porthostname)default_portr9   r&   r&   r'   default_connection_params  s    z#Transport.default_connection_params)N)N)r+   r,   r-   r.   r   r   r2  r)  r   rC  r'  r   r0  r&  r   r/  Z
implementsextend	frozensetr7   r6  r  r7  r9  r  r
  r>  r?  r3  r(  rD  r&   r&   r&   r'   r/  j  s6   


r/  )>r.   r!   r:  rp   r  r   collectionsr   r   r   	itertoolsr   Zmultiprocessing.utilr   r   r   timer	   r
   Zamqp.protocolr   Zkombu.exceptionsr   r   Z	kombu.logr   Zkombu.transportr   Zkombu.utils.divr   Zkombu.utils.encodingr   r   Zkombu.utils.schedulingr   Zkombu.utils.uuidr   r   r   r4  r  r   r=  rv   rz   r+   r;  r   r   r   	Exceptionr/   UserWarningr1   r2   rI   r   r   Z
StdChannelr   r)  r/  r&   r&   r&   r'   <module>   sT   


G %>   G