U
    Y+d7                     @   s   d Z ddlZddlmZ ddlZddlmZmZmZ ddlm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZmZ dd
lmZ ddlmZ ddlmZ dZdZG dd dZG dd dejZG dd dejZdS )a  MongoDB transport module for kombu.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: Yes

Connection String
=================
 *Unreviewed*

Transport Options
=================

* ``connect_timeout``,
* ``ssl``,
* ``ttl``,
* ``capped_queue_size``,
* ``default_hostname``,
* ``default_port``,
* ``default_database``,
* ``messages_collection``,
* ``routing_collection``,
* ``broadcast_collection``,
* ``queues_collection``,
* ``calc_queue_size``,
    N)Empty)MongoClienterrors
uri_parser)
CursorType)VersionMismatch)_detect_environment)bytes_to_str)dumpsloads)cached_property   )virtualto_rabbitmq_queue_argumentsz3Kombu requires MongoDB version 1.3+ (server is {0})zKKombu requires MongoDB version 2.2+ (server is {0}) for TTL indexes supportc                   @   sF   e Zd ZdZdd Zdd Zdd Zdd	d
Zdd Zdd Z	e	Z
dS )BroadcastCursorzCursor for broadcast queues.c                 C   s   || _ | jdd d S )NF)rewind)_cursorpurge)selfcursor r   ;/tmp/pip-unpacked-wheel-rdvd7wfc/kombu/transport/mongodb.py__init__@   s    zBroadcastCursor.__init__c                 C   s   | j  | j S N)r   count_offsetr   r   r   r   get_sizeE   s    zBroadcastCursor.get_sizec                 C   s   | j   d S r   )r   closer   r   r   r   r   H   s    zBroadcastCursor.closeTc                 C   s.   |r| j   | j  | _| j | j| _ d S r   )r   r   r   r   skip)r   r   r   r   r   r   K   s    
zBroadcastCursor.purgec                 C   s   | S r   r   r   r   r   r   __iter__S   s    zBroadcastCursor.__iter__c              
   C   sj   zt | j}W qX tjjk
rR } z"dt|kr@|   W Y q  W 5 d }~X Y q X qXq |  jd7  _|S )Nznot valid at serverr   )nextr   pymongor   OperationFailurestrr   r   )r   msgexcr   r   r   __next__V   s    zBroadcastCursor.__next__N)T)__name__
__module____qualname____doc__r   r   r   r   r!   r(   r"   r   r   r   r   r   =   s   
r   c                       sT  e Zd ZdZdZi ZdZdZdZdZ	dZ
dZdZdZd	Zd
ZdZdZejjd Z fddZdd Zdd Z fddZdd Zdd Zdd Zdd Zdd Z fd d!ZdEd#d$Zd%d& Z d'd( Z!dFd)d*Z"d+d, Z#d-d. Z$d/d0 Z%e&d1d2 Z'e&d3d4 Z(e&d5d6 Z)e&d7d8 Z*e&d9d: Z+d;d< Z,d=d> Z-d?d@ Z.dAdB Z/dCdD Z0  Z1S )GChannelzMongoDB Channel.TFNi z	127.0.0.1ii  Zkombu_defaultmessageszmessages.routingzmessages.broadcastzmessages.queues)connect_timeoutsslttlcapped_queue_sizedefault_hostnamedefault_portdefault_databasemessages_collectionrouting_collectionbroadcast_collectionqueues_collectioncalc_queue_sizec                    s   t  j|| i | _| j d S r   )superr   _broadcast_cursorsclient)r   Zvargskwargs	__class__r   r   r      s    zChannel.__init__c                 K   s0   | j r,| jjd|i||| |dddd d S )N_id	x-expires)rA   options	expire_atTZupsert)r1   queuesupdate_get_expire)r   queuer>   r   r   r   
_new_queue   s    
zChannel._new_queuec                 C   s   || j kr8zt| |}W qV tk
r4   d }Y qVX n| jjd|idtjfgdd}| jrf| 	| |d krtt
 tt|d S )NrI   priorityT)querysortremovepayload)_fanout_queuesr"   _get_broadcast_cursorStopIterationr.   Zfind_and_modifyr#   Z	ASCENDINGr1   _update_queues_expirer   r   r	   )r   rI   r&   r   r   r   _get   s    


zChannel._getc                    s>   | j st |S || jkr*| | S | jd|i S NrI   )	r:   r;   _sizerP   rQ   r   r.   findr   r   rI   r?   r   r   rV      s
    
zChannel._sizec                 K   s@   t ||| j|ddd}| jr0| |d|d< | j| d S )NT)reverse)rO   rI   rK   zx-message-ttlrD   )r
   Z_get_message_priorityr1   rH   r.   insert)r   rI   messager>   datar   r   r   _put   s    zChannel._putc                 K   s   | j t||d d S )N)rO   rI   )	broadcastrZ   r
   )r   exchanger[   routing_keyr>   r   r   r   _put_fanout   s    zChannel._put_fanoutc                 C   s8   |  |}|| jkr$| |  n| jd|i |S rU   )rV   rP   rQ   r   r.   rN   )r   rI   sizer   r   r   _purge   s
    

zChannel._purgec                 C   s:   t | jj| d }| jd|i}|t dd |D B S )Ntabler_   c                 s   s$   | ]}|d  |d |d fV  qdS )r`   patternrI   Nr   ).0rr   r   r   	<genexpr>   s   z$Channel.get_table.<locals>.<genexpr>)	frozensetstateZ	exchangesroutingrW   )r   r_   ZlocalRoutesZbrokerRoutesr   r   r   	get_table   s    
zChannel.get_tablec                 C   sl   |  |jdkr*| |||| || j|< ||||d}| }| jrV| |d|d< | jj||dd d S )Nfanout)r_   rI   r`   re   rB   rD   TrE   )	Ztypeoftype_create_broadcast_cursorrP   copyr1   rH   rk   rG   )r   r_   r`   re   rI   lookupr\   r   r   r   _queue_bind   s"       
zChannel._queue_bindc                    s~   | j d|i | jr&| jd|i t j|f| || jkrzz| j|}W n t	k
rd   Y nX |
  | j| d S )NrI   rA   )rk   rN   r1   rF   r;   queue_deleterP   r<   popKeyErrorr   )r   rI   r>   r   r?   r   r   rs      s    
zChannel.queue_delete
mongodb://c                 C   s  | j j}|j}||s || }|t|d  s:|| j7 }|jrd|kr|d\}}|j}|jrp|d|j 7 }|d | d | }|j	r|j	n| j
}t||}|d p|j}	|	dkr| j}	d| j| jrt| jd nd d}
|
|d	  | |
}
||	|
fS )
N@z://:database)/NTi  )auto_start_requestr0   ZconnectTimeoutMSrC   )
connectionr=   hostname
startswithlenr3   Zuseridsplitpasswordportr4   r   	parse_uriZvirtual_hostr5   r0   r/   intrG   _prepare_client_options)r   schemer=   r}   headtailcredentialsr   parseddbnamerC   r   r   r   
_parse_uri  s4    


zChannel._parse_uric                 C   sB   t jdkr>|dd  t|dtr>t jj}||d  |d< |S )N   r{   Zreadpreference)r#   version_tuplert   
isinstancegetr   Zread_preferencesZ_MONGOS_MODES)r   rC   modesr   r   r   r   5  s    
zChannel._prepare_client_optionsc                 K   s   t |f|S r   r   )r   	argumentsr>   r   r   r   prepare_queue_arguments=  s    zChannel.prepare_queue_argumentsc                 C   s   | j |d\}}}||d< t }|dkr>ddlm} |  n|dkrXddlm} |  tf |}|| }	| d }
|
	d	d }
t
tt|
	d
}|dk rtt|
n| jr|dk rtt|
|	S )N)r   hostgeventr   )monkeyeventlet)monkey_patchversion-.)r   r   )   r   )r   r   r   r   Z	patch_allr   r   r   Zserver_infor   tuplemapr   r   E_SERVER_VERSIONformatr1   E_NO_TTL_INDEXES)r   r   r}   r   confenvr   r   Z	mongoconnry   version_strr   r   r   r   _open@  s&    

zChannel._openc                 C   s*   | j | krdS |j| j | jdd dS )z0Create capped collection for broadcast messages.NT)rb   Zcapped)r8   Zcollection_namesZcreate_collectionr2   r   ry   r   r   r   _create_broadcast[  s    zChannel._create_broadcastc                 C   s   || j  }|jdddgdd || j dg || j }|ddg | jr|jdgdd	 |jdgdd	 || j jdgdd	 d
S )zEnsure indexes on collections.)rI   r   )rK   r   )rA   r   T)Z
background)r_   r   )rD   r   r   )ZexpireAfterSecondsN)r6   Zensure_indexr8   r7   r1   r9   )r   ry   r.   rk   r   r   r   _ensure_indexesd  s    
 

 zChannel._ensure_indexesc                 C   s    |   }| | | | |S )zActually creates connection.)r   r   r   r   r   r   r   _create_clientw  s    

zChannel._create_clientc                 C   s   |   S r   )r   r   r   r   r   r=     s    zChannel.clientc                 C   s   | j | j S r   )r=   r6   r   r   r   r   r.     s    zChannel.messagesc                 C   s   | j | j S r   )r=   r7   r   r   r   r   rk     s    zChannel.routingc                 C   s   | j | j S r   )r=   r8   r   r   r   r   r^     s    zChannel.broadcastc                 C   s   | j | j S r   )r=   r9   r   r   r   r   rF     s    zChannel.queuesc              	   C   s<   z| j | W S  tk
r6   | | j| d d | Y S X d S r   )r<   ru   ro   rP   rX   r   r   r   rQ     s       zChannel._get_broadcast_cursorc                 C   sN   t jdkrd|itjd}nd|idd}| jjf |}t| }| j|< |S )Nr   rI   )filterZcursor_typeT)rL   Ztailable)r#   r   r   ZTAILABLEr^   rW   r   r<   )r   r_   r`   re   rI   rL   r   retr   r   r   ro     s    
z Channel._create_broadcast_cursorc              	   C   sp   t |tr,| jd|i}|s"dS |d }n|}z|d | }W n ttfk
rZ   Y dS X |  tj|d S )zGet expiration header named `argument` of queue definition.

        Note:
            `queue` must be either queue name or options itself.
        rA   NrC   r   )Zmilliseconds)	r   r%   rF   Zfind_oneru   	TypeErrorget_nowdatetime	timedelta)r   rI   argumentdocr\   valuer   r   r   rH     s    

zChannel._get_expirec                 C   sT   |  |d}|sdS | jjd|idd|iidd | jjd|idd|iidd dS )	z,Update expiration field on queues documents.rB   NrI   z$setrD   T)ZmultirA   )rH   rk   rG   rF   )r   rI   rD   r   r   r   rS     s     
  
 zChannel._update_queues_expirec                 C   s
   t j  S )zReturn current time in UTC.)r   utcnowr   r   r   r   r     s    zChannel.get_now)rv   )rv   )2r)   r*   r+   r,   Zsupports_fanoutrP   r0   r1   r/   r2   r:   r3   r4   r5   r6   r7   r8   r9   r   r-   Zfrom_transport_optionsr   rJ   rT   rV   r]   ra   rc   rl   rr   rs   r   r   r   r   r   r   r   r   r=   r.   rk   r^   rF   rQ   ro   rH   rS   r   __classcell__r   r   r?   r   r-   l   s`   
	

)
	




r-   c                   @   sp   e Zd ZdZeZdZdZejZej	j
ejf Z
ej	jejejf ZdZdZej	jjedddgd	Zd
d ZdS )	TransportzMongoDB Transport.Tr   Zmongodbr#   directZtopicrm   )Zexchange_typec                 C   s   t jS r   )r#   r   r   r   r   r   driver_version  s    zTransport.driver_versionN)r)   r*   r+   r,   r-   Zcan_parse_urlZpolling_intervalr4   r   r   Zconnection_errorsr   ZConnectionFailureZchannel_errorsr$   Zdriver_typeZdriver_nameZ
implementsextendri   r   r   r   r   r   r     s$   r   )r,   r   rI   r   r#   r   r   r   Zpymongo.cursorr   Zkombu.exceptionsr   Zkombu.utils.compatr   Zkombu.utils.encodingr	   Zkombu.utils.jsonr
   r   Zkombu.utils.objectsr    r   baser   r   r   r   r-   r   r   r   r   r   <module>   s&    /  l