U
    d-                     @  s  d Z ddlmZ ddlZddlZddlZddlZddlZddlZddl	m
Z
mZmZmZmZmZmZmZmZmZ ddlmZmZmZmZ ddlmZmZ ddlmZmZmZm Z m!Z!m"Z"m#Z#m$Z$m%Z% ddl&m'Z' dd	l(m)Z) dd
l*m+Z+ ddl,m-Z-m.Z. ddl/m0Z0 ddl1m2Z2 ddl3m4Z4m5Z5m6Z6m7Z7m8Z8m9Z9 ddl:m;Z;m<Z<m=Z=m>Z>m?Z? e
rzddl@mAZA ddlBmCZC ddlDmEZEmFZF dddddZGG dd dZHG dd dZIddddddZJd d dd!d"d#ZKdS )$z<Internal class to monitor a topology of one or more servers.    )annotationsN)
TYPE_CHECKINGAnyCallableDictListMappingOptionalSetTuplecast)_csotcommonhelpersperiodic_executor)_ServerSession_ServerSessionPool)	ConfigurationErrorConnectionFailureInvalidOperationNetworkTimeoutNotPrimaryErrorOperationFailurePyMongoErrorServerSelectionTimeoutError
WriteError)Hello)_create_lock)
SrvMonitor)PoolPoolOptions)Server)ServerDescription)	Selectionany_server_selectorarbiter_server_selectorreadable_server_selectorsecondary_server_selectorwritable_server_selector)SRV_POLLING_TOPOLOGIESTOPOLOGY_TYPETopologyDescription)_updated_topology_description_srv_pollingupdated_topology_description)ObjectId)TopologySettings)ClusterTime_Addressz"weakref.ReferenceType[queue.Queue]bool)	queue_refreturnc                 C  sL   |  }|sdS z|  }W n tjk
r4   Y qHY qX |\}}||  qdS )NFT)
get_nowaitqueueEmpty)r3   qeventfnargs r<   4/tmp/pip-unpacked-wheel-oblwsawz/pymongo/topology.pyprocess_events_queueO   s    

r>   c                   @  s  e Zd ZdZddddZdddd	Zd
dddZddddddddZdd
dddddZddddddddZ	ddddddddZ
ddddd d!d"Zdd$d%dd&d'd(Zdd$d%dd&d)d*Zd+dd,d-d.Zd+dd,d/d0Zdd1d2d3d4Zdd%d2d5d6Zddd7d8Zdd9d:d;d<Zd9dd=d>Zd9dd?d@ZdAddBdCZdDddEdFdGZdDddEdHdIZddKddLdMdNZdddOdPZdddQdRZdddSdTZedUddVdWZdXddYdZZddd[d\Z d
dd]d^Z!d_dd`daZ"d_d%ddbdcddZ#deddfdgZ$dddhdiZ%ddjd%dkdldmZ&ddjddkdndoZ'ddjddkdpdqZ(dddrdsZ)dddtduZ*ddvd2dwdxZ+ddvd2dydzZ,dd{d:d|d}Z-d{dd~dZ.ddddZ/dd%dddZ0dKdddZ1dS )Topologyz*Monitor a topology of one or more servers.r/   )topology_settingsc                   s  |j | _ |jj| _| jd k	o"| jj| _| jd k	o6| jj| _d | _d | _	| jsR| jr`t
jdd| _| jr| jd k	stt| j| jj| j ff || _t| | |jd d |}|| _| jr| jd k	stttji d d d | j}| j| jj|| j| j ff |jD ]8}| jr| jd k	st| j| jj|| j ff qt| | _d| _d| _t | _ | j!| j | _"i | _#d | _$d | _%t& | _'| js| jr| jd k	stdd fdd}t(j)t*j+t*j,|dd	}t-.| j|j/ || _	|0  d | _1| jj2d k	r| jj3st4| | j| _1d S )
Nd   )maxsizeFr2   r4   c                     s   t  S N)r>   r<   weakr<   r=   target   s    z!Topology.__init__.<locals>.targetZpymongo_events_thread)intervalZmin_intervalrG   name)5_topology_idZ_pool_options_event_listeners
_listenersZenabled_for_server_publish_serverZenabled_for_topology_publish_tp_events_Topology__events_executorr6   QueueAssertionErrorputZpublish_topology_opened	_settingsr+   Zget_topology_typeZget_server_descriptionsreplica_set_name_descriptionr*   Unknown$publish_topology_description_changedseedsZpublish_server_openedlistserver_descriptions_seed_addresses_opened_closedr   _lockZcondition_class
_condition_servers_pid_max_cluster_timer   _session_poolr   ZPeriodicExecutorr   ZEVENTS_QUEUE_FREQUENCYMIN_HEARTBEAT_INTERVALweakrefrefcloseopen_srv_monitorfqdnload_balancedr   )selfr@   Ztopology_descriptionZ
initial_tdseedrG   executorr<   rE   r=   __init__c   s    
	     

zTopology.__init__NonerC   c              	   C  s   t  }| jdkr|| _nN|| jkrh|| _td | j( | j D ]}|  qF| j	
  W 5 Q R X | j |   W 5 Q R X dS )a  Start monitoring, or restart after a fork.

        No effect if called multiple times.

        .. warning:: Topology is shared among multiple threads and is protected
          by mutual exclusion. Using Topology from a process other than the one
          that initialized it will emit a warning and may result in deadlock. To
          prevent this from happening, MongoClient must be created after any
          forking.

        NzMongoClient opened before fork. May not be entirely fork-safe, proceed with caution. See PyMongo's documentation for details: https://pymongo.readthedocs.io/en/stable/faq.html#is-pymongo-fork-safe)osgetpidrb   warningswarnr_   ra   valuesrh   rd   reset_ensure_opened)rm   pidserverr<   r<   r=   ri      s    


zTopology.openfloatc                 C  s   t  }|d kr| jjS |S rD   )r   	remainingrT   server_selection_timeout)rm   timeoutr<   r<   r=   get_server_selection_timeout   s    z%Topology.get_server_selection_timeoutNz Callable[[Selection], Selection]zOptional[float]zOptional[_Address]zList[Server])selectorr}   addressr4   c              
     sT   |dkr   }n|} j.  |||} fdd|D W  5 Q R  S Q R X dS )aL  Return a list of Servers matching selector, or time out.

        :Parameters:
          - `selector`: function that takes a list of Servers and returns
            a subset of them.
          - `server_selection_timeout` (optional): maximum seconds to wait.
            If not provided, the default value common.SERVER_SELECTION_TIMEOUT
            is used.
          - `address`: optional server address to select.

        Calls self.open() if needed.

        Raises exc:`ServerSelectionTimeoutError` after
        `server_selection_timeout` if no matching servers are found.
        Nc                   s   g | ]}t t |jqS r<   )r   r!   get_server_by_addressr   .0sdrm   r<   r=   
<listcomp>   s    z+Topology.select_servers.<locals>.<listcomp>)r   r_   _select_servers_loop)rm   r   r}   r   Zserver_timeoutr[   r<   r   r=   select_servers   s    

zTopology.select_serverszList[ServerDescription])r   r~   r   r4   c                 C  s   t  }|| }| jj||| jjd}|s|dks:||krZt| | d| d| j| 	  | 
  | jtj | j  t  }| jj||| jjd}q&| j  |S )z7select_servers() guts. Hold the lock when calling this.)Zcustom_selectorr   z, Timeout: zs, Topology Description: )time	monotonicrV   Zapply_selectorrT   Zserver_selectorr   _error_messagedescriptionrx   _request_check_allr`   waitr   re   Zcheck_compatible)rm   r   r~   r   nowZend_timer[   r<   r<   r=   r      s0      
  
zTopology._select_servers_loopr!   c                 C  sN   |  |||}t|dkr"|d S t|d\}}|jj|jjkrF|S |S d S )N   r      )r   lenrandomsamplepoolZoperation_count)rm   r   r}   r   serversZserver1Zserver2r<   r<   r=   _select_server"  s    zTopology._select_serverc                 C  s(   |  |||}t r$t|jj |S )zALike select_servers, but choose a random server if several match.)r   r   Zget_timeoutZset_rttr   Zmin_round_trip_time)rm   r   r}   r   rz   r<   r<   r=   select_server1  s    zTopology.select_serverr1   zOptional[int])r   r}   r4   c                 C  s   |  t||S )a  Return a Server for "address", reconnecting if necessary.

        If the server's type is not known, request an immediate check of all
        servers. Time out after "server_selection_timeout" if the server
        cannot be reached.

        :Parameters:
          - `address`: A (host, port) pair.
          - `server_selection_timeout` (optional): maximum seconds to wait.
            If not provided, the default value
            common.SERVER_SELECTION_TIMEOUT is used.

        Calls self.open() if needed.

        Raises exc:`ServerSelectionTimeoutError` after
        `server_selection_timeout` if no matching servers are found.
        )r   r$   )rm   r   r}   r<   r<   r=   select_server_by_address=  s    z!Topology.select_server_by_addressFr"   r2   )server_description
reset_poolr4   c                 C  s\  | j }|j|j }t||r dS t| j |}|jsD|jr`|jtj	kr`| j
|j}|r`|j  | jsl| jor||k}| jr|s| jdk	st| j| jj|||j| jff || _ |   | |j | jr|s| jdk	st| j| jj|| j | jff | jr*|jtjkr*| j jtkr*| j  |rN| j
|j}|rN|j  | j   dS )ziProcess a new ServerDescription on an opened topology.

        Hold the lock when calling this.
        N)!rV   Z_server_descriptionsr   _is_stale_server_descriptionr-   Zis_readableZis_server_type_knowntopology_typer*   Singlera   getr   readyrM   rN   rO   rR   rS   rL   Z"publish_server_description_changedrJ   _update_servers_receive_cluster_time_no_lockcluster_timerX   rj   rW   r)   rh   rw   r`   
notify_all)rm   r   r   td_oldZsd_oldZnew_tdrz   Zsuppress_eventr<   r<   r=   _process_changeS  sT    









zTopology._process_changec              	   C  s6   | j & | jr(| j|jr(| || W 5 Q R X dS )z>Process a new ServerDescription after an hello call completes.N)r_   r]   rV   
has_serverr   r   )rm   r   r   r<   r<   r=   	on_change  s    	zTopology.on_changezList[Tuple[str, Any]])seedlistr4   c                 C  s`   | j }|jtkrdS t| j || _ |   | jr\| jdk	s>t| j| j	j
|| j | jff dS )z_Process a new seedlist on an opened topology.
        Hold the lock when calling this.
        N)rV   r   r)   r,   r   rN   rO   rR   rS   rL   rX   rJ   )rm   r   r   r<   r<   r=   _process_srv_update  s    
zTopology._process_srv_updatec              	   C  s&   | j  | jr| | W 5 Q R X dS )z?Process a new list of nodes obtained from scanning SRV records.N)r_   r]   r   )rm   r   r<   r<   r=   on_srv_update  s    zTopology.on_srv_updatezOptional[Server])r   r4   c                 C  s   | j |S )aJ  Get a Server or None.

        Returns the current version of the server immediately, even if it's
        Unknown or absent from the topology. Only use this in unittests.
        In driver code, use select_server_by_address, since then you're
        assured a recent view of the server's type and wire protocol version.
        )ra   r   rm   r   r<   r<   r=   r     s    zTopology.get_server_by_addressc                 C  s
   || j kS rD   )ra   r   r<   r<   r=   r     s    zTopology.has_serverc              
   C  sP   | j @ | jj}|tjkr(W 5 Q R  dS t|  d jW  5 Q R  S Q R X dS )z!Return primary's address or None.Nr   )r_   rV   r   r*   ReplicaSetWithPrimaryr(   _new_selectionr   )rm   r   r<   r<   r=   get_primary  s
    
zTopology.get_primaryzSet[_Address])r   r4   c              
   C  sb   | j R | jj}|tjtjfkr2t W  5 Q R  S dd t||  D W  5 Q R  S Q R X dS )z+Return set of replica set member addresses.c                 S  s   h | ]
}|j qS r<   )r   r   r<   r<   r=   	<setcomp>  s     z4Topology._get_replica_set_members.<locals>.<setcomp>N)	r_   rV   r   r*   r   ReplicaSetNoPrimarysetiterr   )rm   r   r   r<   r<   r=   _get_replica_set_members  s    z!Topology._get_replica_set_membersc                 C  s
   |  tS )z"Return set of secondary addresses.)r   r'   r   r<   r<   r=   get_secondaries  s    zTopology.get_secondariesc                 C  s
   |  tS )z Return set of arbiter addresses.)r   r%   r   r<   r<   r=   get_arbiters  s    zTopology.get_arbiterszOptional[ClusterTime]c                 C  s   | j S )z1Return a document, the highest seen $clusterTime.rc   r   r<   r<   r=   max_cluster_time  s    zTopology.max_cluster_timeOptional[Mapping[str, Any]])r   r4   c                 C  s&   |r"| j r|d | j d kr"|| _ d S )NZclusterTimer   rm   r   r<   r<   r=   r     s    z&Topology._receive_cluster_time_no_lockc              	   C  s    | j  | | W 5 Q R X d S rD   )r_   r   r   r<   r<   r=   receive_cluster_time  s    zTopology.receive_cluster_time   int)	wait_timer4   c              	   C  s*   | j  |   | j| W 5 Q R X dS )z=Wake all monitors, wait for at least one to check its server.N)r_   r   r`   r   )rm   r   r<   r<   r=   request_check_all  s    zTopology.request_check_allc                 C  s   | j jtjkr| j jS | j jS )z~Return a list of all data-bearing servers.

        This includes any server that might be selected for an operation.
        )rV   r   r*   r   known_serversreadable_serversr   r<   r<   r=   data_bearing_servers  s    zTopology.data_bearing_serversc                 C  s   g }| j 6 |  D ]&}| j|j }|||jj f qW 5 Q R X |D ]^\}}z|j| W qJ t	k
r } z&t
|d|dd }| |jj|  W 5 d }~X Y qJX qJd S )Nr   F)r_   r   ra   r   appendr   genZget_overallZremove_stale_socketsr   _ErrorContexthandle_errorr   )rm   r   r   rz   Z
generationexcctxr<   r<   r=   update_pool  s    "zTopology.update_poolc              	   C  s   | j t | j D ]}|  q| j | _| j  D ]\}}|| jkr:|| j| _q:| j	rj| j	  d| _
d| _W 5 Q R X | jr| jdk	st| j| jj| jff | js| jr| j  dS )zClear pools and terminate monitors. Topology does not reopen on
        demand. Any further operations will raise
        :exc:`~.errors.InvalidOperation`.
        FTN)r_   ra   rv   rh   rV   rw   r[   itemsr   rj   r]   r^   rN   rO   rR   rS   rL   Zpublish_topology_closedrJ   rM   rP   )rm   rz   r   r   r<   r<   r=   rh     s     


zTopology.closer+   c                 C  s   | j S rD   )rV   r   r<   r<   r=   r   <  s    zTopology.descriptionzList[_ServerSession]c              
   C  s(   | j  | j W  5 Q R  S Q R X dS )z"Pop all session ids from the pool.N)r_   rd   pop_allr   r<   r<   r=   pop_all_sessions@  s    zTopology.pop_all_sessionsc              	   C  s   | j  |   W 5 Q R X d S rD   )r_   _check_session_supportr   r<   r<   r=   _check_implicit_session_supportE  s    z(Topology._check_implicit_session_supportc                 C  s   | j jrtdS | jj}|dkr|| jjtjkrJ| jjsd| 	t
|  d n| jjsd| 	t|  d | jj}|dkr|td|S )z/Internal check for session support on clusters.infNz5Sessions are not supported by this MongoDB deployment)rT   rl   r{   rV   logical_session_timeout_minutesr   r*   r   Zhas_known_serversr   r$   r   r   r&   r   rm   Zsession_timeoutr<   r<   r=   r   I  s*        zTopology._check_session_supportr   c              
   C  s2   | j " |  }| j|W  5 Q R  S Q R X dS )z>Start or resume a server session, or raise ConfigurationError.N)r_   r   rd   get_server_sessionr   r<   r<   r=   r   `  s    zTopology.get_server_session)server_sessionlockr4   c              	   C  s:   |r*| j  | j|| jj W 5 Q R X n| j| d S rD   )r_   rd   return_server_sessionrV   r   Zreturn_server_session_no_lock)rm   r   r   r<   r<   r=   r   f  s     zTopology.return_server_sessionr#   c                 C  s   t | jS )zmA Selection object, initially including all known servers.

        Hold the lock when calling this.
        )r#   Zfrom_topology_descriptionrV   r   r<   r<   r=   r   p  s    zTopology._new_selectionc              	   C  s   | j rtd| jsd| _|   | js.| jr8| j  | jrT| j	j
tkrT| j  | jjr| t| jd td| jdd | j D ]}|  qdS )z[Start monitors, or restart after a fork.

        Hold the lock when calling this.
        z"Cannot use MongoClient after closeTr   r      )okZ	serviceIdZmaxWireVersionN)r^   r   r]   r   rN   rM   rP   ri   rj   r   r   r)   rT   rl   r   r"   r\   r   rJ   ra   rv   rm   rz   r<   r<   r=   rx   w  s$    

zTopology._ensure_openedr   )r   err_ctxr4   c                 C  sp   | j |}|d krdS |j|j|jr.dS |jj}|j}d }|rft	|drft
|jtrf|jd}t||S )NTdetailsZtopologyVersion)ra   r   _poolZstale_generationsock_generation
service_idr   topology_versionerrorhasattr
isinstancer   dict _is_stale_error_topology_version)rm   r   r   rz   Zcur_tvr   error_tvr<   r<   r=   _is_stale_error  s    zTopology._is_stale_errorc           	      C  s`  |  ||rd S | j| }|j}|j}| jjr<|s<|js<d S t|trP|jrPd S t|t	r^d S t|t
tfr t|dr|j}n t|t
rdnd }|jd|}|tjkr|tjk}| jjs| t||d |s|jdkr|| |  n.|js\| jjs| t||d || n<t|tr\| jjsH| t||d || |j  d S )Ncodei{'  r      )r   ra   r   r   rT   rl   completed_handshaker   r   r   r   r   r   r   r   r   r   Z_NOT_PRIMARY_CODESZ_SHUTDOWN_CODESr   r"   max_wire_versionrw   request_checkr   _monitorZcancel_check)	rm   r   r   rz   r   r   Zerr_codedefaultZis_shutting_downr<   r<   r=   _handle_error  s@    

	







zTopology._handle_errorc              	   C  s"   | j  | || W 5 Q R X dS )zHandle an application error.

        May reset the server to Unknown, clear the pool, and request an
        immediate check depending on the error and the context.
        N)r_   r   )rm   r   r   r<   r<   r=   r     s    zTopology.handle_errorc                 C  s   | j  D ]}|  q
dS )z3Wake all monitors. Hold the lock when calling this.N)ra   rv   r   r   r<   r<   r=   r     s    zTopology._request_check_allc              	   C  s  | j   D ]\}}|| jkr| jj|| | || jd}d}| jr\| jdk	r\t	
| j}t|| ||| j| j|d}|| j|< |  q| j| jj}|| j| _||jkr| j| j|j qt| j D ](\}}| j |s|  | j| qdS )zrSync our Servers from TopologyDescription.server_descriptions.

        Hold the lock while calling this.
        )r   Ztopologyr   r@   N)r   r   monitorZtopology_idZ	listenersevents)rV   r[   r   ra   rT   Zmonitor_class_create_pool_for_monitorrM   rO   rf   rg   r!   _create_pool_for_serverrJ   rL   ri   r   is_writabler   Zupdate_is_writablerZ   r   rh   pop)rm   r   r   r   rF   rz   Zwas_writabler<   r<   r=   r     s:    
	


zTopology._update_serversr   c                 C  s   | j || j jS rD   )rT   
pool_classpool_optionsr   r<   r<   r=   r   &  s    z Topology._create_pool_for_serverc                 C  sD   | j j}t|j|j|j|j|j|j|jd|j	d	}| j j
||ddS )NF)	connect_timeoutsocket_timeoutssl_contexttls_allow_invalid_hostnamesZevent_listenersappnamedriverZpause_enabled
server_api)Z	handshake)rT   r   r    r   Z_ssl_contextr   rK   r   r   r   r   )rm   r   optionsZmonitor_pool_optionsr<   r<   r=   r   )  s    z!Topology._create_pool_for_monitorstrc                   s*  | j jtjtjfk}|rd}n| j jtjkr2d}nd}| j jrl|tkrX|rNdS d| S nd| d| dS nt| j 	 }t| j 	 
 }|s|rd	|| jjS d
| S |d j t fdd|dd D }|r dkrd| S |r
t|| js
d| S t S ddd |D S dS )zeFormat an error message if server selection fails.

        Hold the lock when calling this.
        zreplica set membersZmongosesr   zNo primary available for writeszNo %s available for writeszNo z match selector ""z)No {} available for replica set name "{}"zNo %s availabler   c                 3  s   | ]}|j  kV  qd S rD   r   r   rz   r   r<   r=   	<genexpr>f  s     z*Topology._error_message.<locals>.<genexpr>r   NzNo %s found yetz\Could not reach any servers in %s. Replica set is configured with internal hostnames or IPs?,c                 s  s   | ]}|j rt|j V  qd S rD   )r   r   r  r<   r<   r=   r  u  s      )rV   r   r*   r   r   ZShardedr   r(   rZ   r[   rv   formatrT   rU   r   allr   intersectionr\   r   join)rm   r   Zis_replica_setZserver_plural	addressesr   Zsamer<   r   r=   r   =  sH    

zTopology._error_messagec                 C  s*   d}| j sd}d| jj d| | jdS )N zCLOSED < >)r]   	__class____name__rV   )rm   msgr<   r<   r=   __repr__w  s    zTopology.__repr__z>Tuple[Tuple[_Address, ...], Optional[str], Optional[str], str]c                 C  s"   | j }tt|j|j|j|jfS )z?The properties to use for MongoClient/Topology equality checks.)rT   tuplesortedrY   rU   rk   Zsrv_service_name)rm   tsr<   r<   r=   eq_props}  s    zTopology.eq_propsobject)otherr4   c                 C  s    t || jr|  | kS tS rD   )r   r  r  NotImplemented)rm   r  r<   r<   r=   __eq__  s    zTopology.__eq__c                 C  s   t |  S rD   )hashr  r   r<   r<   r=   __hash__  s    zTopology.__hash__)NN)NN)NN)N)F)F)r   )2r  
__module____qualname____doc__rp   ri   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rh   propertyr   r   r   r   r   r   r   rx   r   r   r   r   r   r   r   r   r  r  r  r  r<   r<   r<   r=   r?   `   sn   P"
  !(      ?

	
!B	*:r?   c                   @  s&   e Zd ZdZddddddddZd	S )
r   z.An error with context for SDAM error handling.BaseExceptionr   r2   zOptional[ObjectId]r   r   r   r   r   c                 C  s"   || _ || _|| _|| _|| _d S rD   r!  )rm   r   r   r   r   r   r<   r<   r=   rp     s
    z_ErrorContext.__init__N)r  r  r  r  rp   r<   r<   r<   r=   r     s   r   r   )
current_tvr   r4   c                 C  s8   | dks|dkrdS | d |d kr(dS | d |d kS )z9Return True if the error's topologyVersion is <= current.NF	processIdcounterr<   )r"  r   r<   r<   r=   r     s
    r   r"   )
current_sdnew_sdr4   c                 C  sF   | j |j  }}|dks|dkr"dS |d |d kr6dS |d |d kS )z4Return True if the new topologyVersion is < current.NFr#  r$  )r   )r%  r&  r"  Znew_tvr<   r<   r=   r     s    r   )Lr  
__future__r   rr   r6   r   r   rt   rf   typingr   r   r   r   r   r   r	   r
   r   r   Zpymongor   r   r   r   Zpymongo.client_sessionr   r   Zpymongo.errorsr   r   r   r   r   r   r   r   r   Zpymongo.hellor   Zpymongo.lockr   Zpymongo.monitorr   Zpymongo.poolr   r    Zpymongo.serverr!   Zpymongo.server_descriptionr"   Zpymongo.server_selectorsr#   r$   r%   r&   r'   r(   Zpymongo.topology_descriptionr)   r*   r+   r,   r-   Zbsonr.   Zpymongo.settingsr/   Zpymongo.typingsr0   r1   r>   r?   r   r   r   r<   r<   r<   r=   <module>   sD   0,       1