U
    d@                     @  s  d Z ddlmZ ddlZddlZddlZddlmZmZm	Z	m
Z
mZmZmZ ddlmZmZ ddlmZ ddlmZmZmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlm Z  ddl!m"Z" erddl#m$Z$m%Z%m&Z& ddl'm(Z( ddl)m*Z* dddddZ+G dd dZ,G dd de,Z-G dd de,Z.G dd de,Z/e0 Z1ddddd Z2d!dd"d#d$Z3dd%d&d'Z4dd%d(d)Z5e6e5 dS )*z9Class to monitor a MongoDB server on a background thread.    )annotationsN)TYPE_CHECKINGAnyListMappingOptionalTuplecast)commonperiodic_executor)MovingMinimum)NotPrimaryErrorOperationFailure_OperationCancelled)Hello)_create_lock)_shutdown_executors)MovingAverage)ServerDescription)_SrvResolver)
ConnectionPool_CancellationContext)TopologySettings)Topology	ExceptionNone)errorreturnc                 C  s   d| _ d| _d| _dS )z'PYTHON-2433 Clear error traceback info.N)__traceback____context__	__cause__r    r#   3/tmp/pip-unpacked-wheel-oblwsawz/pymongo/monitor.py	_sanitize(   s    r%   c                   @  sj   e Zd ZdddddddZdd	d
dZdd	ddZdd	ddZddddddZdd	ddZdS )MonitorBaser   strintfloat)topologynameintervalmin_intervalc                   sh   dd fdd}t j||||d}|| _dddd	 fd
d}t| |j t||| _t|  dS )zBase class to do periodic work on a background thread.

        The background thread is signaled to stop when the Topology or
        this instance is freed.
        boolr   c                    s     } | d krdS |    dS )NFT)_run)monitorZself_refr#   r$   target8   s
    z$MonitorBase.__init__.<locals>.target)r,   r-   r3   r+   NzOptional[Topology]r   )dummyr   c                   s     }|r|   d S Ngc_safe_close)r4   r1   r2   r#   r$   _on_topology_gcE   s    z-MonitorBase.__init__.<locals>._on_topology_gc)N)	r   ZPeriodicExecutor	_executorweakrefrefcloseproxy	_topology	_register)selfr*   r+   r,   r-   r3   executorr8   r#   r2   r$   __init__0   s       zMonitorBase.__init__r   r/   c                 C  s   | j   dS )z[Start monitoring, or restart after a fork.

        Multiple calls have no effect.
        N)r9   openr@   r#   r#   r$   rC   Q   s    zMonitorBase.openc                 C  s   | j   dS )zGC safe close.N)r9   r<   rD   r#   r#   r$   r7   X   s    zMonitorBase.gc_safe_closec                 C  s   |    dS )zWClose and stop monitoring.

        open() restarts the monitor after closing.
        Nr6   rD   r#   r#   r$   r<   \   s    zMonitorBase.closeNzOptional[int])timeoutr   c                 C  s   | j | dS )zWait for the monitor to stop.N)r9   join)r@   rE   r#   r#   r$   rF   c   s    zMonitorBase.joinc                 C  s   | j   dS )z)If the monitor is sleeping, wake it soon.N)r9   ZwakerD   r#   r#   r$   request_checkg   s    zMonitorBase.request_check)N)	__name__
__module____qualname__rB   rC   r7   r<   rF   rG   r#   r#   r#   r$   r&   /   s   !r&   c                      s   e Zd Zddddd fddZdd	d
dZdd	ddZdd	ddZdd	ddZdd	ddZdd	ddZ	dd	ddZ
dd	ddZdddddZ  ZS )Monitorr   r   r   r   )server_descriptionr*   pooltopology_settingsc                   sn   t  |d|jtj || _|| _|| _| jjj	| _
| j
dk	oD| j
j| _d| _t||||j| _d| _dS )a   Class to monitor a MongoDB server on a background thread.

        Pass an initial ServerDescription, a Topology, a Pool, and
        TopologySettings.

        The Topology is weakly referenced. The Pool must be exclusive to this
        Monitor.
        Zpymongo_server_monitor_threadN)superrB   heartbeat_frequencyr
   MIN_HEARTBEAT_INTERVAL_server_description_pool	_settingsZ_pool_optionsZ_event_listeners
_listenersZenabled_for_server_heartbeat_publish_cancel_context_RttMonitorZ_create_pool_for_monitoraddress_rtt_monitorZheartbeater)r@   rL   r*   rM   rN   	__class__r#   r$   rB   m   s$    
zMonitor.__init__r   r/   c                 C  s   | j }|r|  dS )zCancel any concurrent hello check.

        Note: this is called from a weakref.proxy callback and MUST NOT take
        any locks.
        N)rW   cancel)r@   contextr#   r#   r$   cancel_check   s    zMonitor.cancel_checkc                 C  s    | j   | jjr| j   dS )z1Start an _RttMonitor that periodically runs ping.N)rZ   rC   r9   _stoppedr<   rD   r#   r#   r$   _start_rtt_monitor   s    
zMonitor._start_rtt_monitorc                 C  s    | j   | j  |   d S r5   )r9   r<   rZ   r7   r_   rD   r#   r#   r$   r7      s    

zMonitor.gc_safe_closec                 C  s   |    | j  |   d S r5   )r7   rZ   r<   _reset_connectionrD   r#   r#   r$   r<      s    
zMonitor.closec                 C  s   | j   d S r5   )rS   resetrD   r#   r#   r$   rb      s    zMonitor._reset_connectionc              
   C  s   z| j }z|  | _ W nT tk
rj } z6t| t| j j|d| _ |jrR| j  W Y W d S d }~X Y nX | j	j
| j | j jd | j jr| j jr|   | j  | j jr|jr| j  W n tk
r   |   Y nX d S )Nr"   )Z
reset_pool)rR   _check_serverr   r%   r   rY   is_server_type_knownr9   Z
skip_sleepr>   Z	on_changer   topology_versionra   ReferenceErrorr<   )r@   Zprev_sdexcr#   r#   r$   r0      s6     
 
zMonitor._runc           	   
   C  s*  t  }z`z|  W W S  ttfk
rd } z,ttttf |j	}| j
|d  W 5 d}~X Y nX W n tk
r~    Y n tk
r$ } zt| | j}|j}t  | }| jrt|jo|j}| jdk	st| j|||| |   t|tr | j  t||d W Y S d}~X Y nX dS )z^Call hello or read the next streaming response.

        Returns a ServerDescription.
        z$clusterTimeNr"   )time	monotonic_check_oncer   r   r	   r   r'   r   detailsr>   Zreceive_cluster_timegetrg   r   r%   rR   rY   rV   r.   re   rf   rU   AssertionErrorZpublish_server_heartbeat_failedrb   
isinstancer   rZ   rc   r   )	r@   startrh   rl   r   sdrY   durationZawaitedr#   r#   r$   rd      s0    

zMonitor._check_serverc              
   C  s   | j j}| jr(| jdk	st| j| | jr>| jjr>|   | j	
 }|j| _| |\}}|jsr| j| | j \}}t||||d}| jr| jdk	st| j||||j |W  5 Q R  S Q R X dS )zfA single attempt to call hello.

        Returns a ServerDescription, or raises an exception.
        N)Zmin_round_trip_time)rR   rY   rV   rU   rn   Z publish_server_heartbeat_startedrW   Z	cancelledrb   rS   checkoutZcancel_context_check_with_socket	awaitablerZ   
add_samplerm   r   Z"publish_server_heartbeat_succeeded)r@   rY   connresponseZround_trip_timeZavg_rttZmin_rttrq   r#   r#   r$   rk      s,       zMonitor._check_oncer   zTuple[Hello, float])rw   r   c                 C  sn   | j  }t }|jr*t| dd}n4|jrP| jj	rP|
|| jj	| jj}n|
|dd}|t | fS )zcReturn (Hello, round_trip_time).

        Can raise ConnectionFailure or OperationFailure.
        T)ru   N)r>   Zmax_cluster_timeri   rj   Zmore_to_comer   Z_next_replyZperformed_handshakerR   rf   Z_hellorT   rP   )r@   rw   Zcluster_timerp   rx   r#   r#   r$   rt     s    
zMonitor._check_with_socket)rH   rI   rJ   rB   r_   ra   r7   r<   rb   r0   rd   rk   rt   __classcell__r#   r#   r[   r$   rK   l   s   "	% rK   c                      s@   e Zd Zddd fddZdddd	Zd
dddZ  ZS )
SrvMonitorr   r   )r*   rN   c                   sF   t  |dtj|j || _| jj| _t| jj	t
s8t| jj	| _dS )zClass to poll SRV records on a background thread.

        Pass a Topology and a TopologySettings.

        The Topology is weakly referenced.
        Zpymongo_srv_polling_threadN)rO   rB   r
   MIN_SRV_RESCAN_INTERVALrP   rT   Z_seeds	_seedlistro   Zfqdnr'   rn   _fqdn)r@   r*   rN   r[   r#   r$   rB   -  s    
zSrvMonitor.__init__r   r/   c                 C  sF   |   }|rB|| _z| j| j W n tk
r@   |   Y nX d S r5   )_get_seedlistr|   r>   Zon_srv_updaterg   r<   )r@   seedlistr#   r#   r$   r0   ?  s    zSrvMonitor._runzOptional[List[Tuple[str, Any]]]c                 C  st   z8t | j| jjj| jj}| \}}t|dkr6tW n tk
rV   | 	  Y dS X | j
t|tj |S dS )zXPoll SRV records for a seedlist.

        Returns a list of ServerDescriptions.
        r   N)r   r}   rT   Zpool_optionsconnect_timeoutZsrv_service_nameZget_hosts_and_min_ttllenr   rG   r9   Zupdate_intervalmaxr
   r{   )r@   resolverr   Zttlr#   r#   r$   r~   I  s    zSrvMonitor._get_seedlist)rH   rI   rJ   rB   r0   r~   ry   r#   r#   r[   r$   rz   ,  s   
rz   c                      s|   e Zd Zdddd fddZddd	d
ZdddddZddddZddddZddddZddddZ	  Z
S )rX   r   r   r   )r*   rN   rM   c                   s8   t  |d|jtj || _t | _t | _	t
 | _dS )z\Maintain round trip times for a server.

        The Topology is weakly referenced.
        Zpymongo_server_rtt_threadN)rO   rB   rP   r
   rQ   rS   r   _moving_averager   _moving_minr   _lock)r@   r*   rN   rM   r[   r#   r$   rB   e  s    z_RttMonitor.__init__r   r/   c                 C  s   |    | j  d S r5   )r7   rS   rc   rD   r#   r#   r$   r<   v  s    z_RttMonitor.closer)   )sampler   c              	   C  s.   | j  | j| | j| W 5 Q R X dS )zAdd a RTT sample.N)r   r   rv   r   )r@   r   r#   r#   r$   rv   |  s    z_RttMonitor.add_samplezTuple[Optional[float], float]c              
   C  s2   | j " | j | j fW  5 Q R  S Q R X dS )zBGet the calculated average, or None if no samples yet and the min.N)r   r   rm   r   rD   r#   r#   r$   rm     s    z_RttMonitor.getc              	   C  s*   | j  | j  | j  W 5 Q R X dS )zReset the average RTT.N)r   r   rc   r   rD   r#   r#   r$   rc     s    
z_RttMonitor.resetc                 C  sT   z|   }| | W n8 tk
r2   |   Y n tk
rN   | j  Y nX d S r5   )_pingrv   rg   r<   r   rS   rc   )r@   Zrttr#   r#   r$   r0     s    z_RttMonitor._runc              
   C  sN   | j  :}| jjrtdt }|  t | W  5 Q R  S Q R X dS )z)Run a "hello" command and return the RTT.z_RttMonitor closedN)rS   rs   r9   r`   r   ri   rj   Zhello)r@   rw   rp   r#   r#   r$   r     s    z_RttMonitor._ping)rH   rI   rJ   rB   r<   rv   rm   rc   r0   r   ry   r#   r#   r[   r$   rX   d  s   rX   )r1   r   c                 C  s   t | t}t| d S r5   )r:   r;   _unregister	_MONITORSadd)r1   r;   r#   r#   r$   r?     s    r?   z"weakref.ReferenceType[MonitorBase])monitor_refr   c                 C  s   t |  d S r5   )r   remove)r   r#   r#   r$   r     s    r   r/   c                  C  s8   t d krd S tt } | D ]}| }|r|  qd }d S r5   )r   listr7   )Zmonitorsr;   r1   r#   r#   r$   _shutdown_monitors  s    
r   c                  C  s    t } | r|   t} | r|   d S r5   )r   r   )shutdownr#   r#   r$   _shutdown_resources  s    r   )7__doc__
__future__r   atexitri   r:   typingr   r   r   r   r   r   r	   Zpymongor
   r   Zpymongo._csotr   Zpymongo.errorsr   r   r   Zpymongo.hellor   Zpymongo.lockr   Zpymongo.periodic_executorr   Zpymongo.read_preferencesr   Zpymongo.server_descriptionr   Zpymongo.srv_resolverr   Zpymongo.poolr   r   r   Zpymongo.settingsr   Zpymongo.topologyr   r%   r&   rK   rz   rX   setr   r?   r   r   r   registerr#   r#   r#   r$   <module>   s<   $= A8C
