U
    d#                 
   @  sh  U d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlZd dlmZmZmZmZmZmZmZmZmZmZmZmZmZ d dlZd dlmZ d dlmZ d dlm Z m!Z!m"Z"m#Z# d dl$m%Z% d dl&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z.m/Z/m0Z0 d d	l1m2Z2m3Z3m4Z4m5Z5m6Z6m7Z7m8Z8m9Z9m:Z:m;Z;m<Z<m=Z= d d
l>m?Z?m@Z@ d dlAmBZB d dlCmDZD d dlEmFZFmGZGmHZH d dlImJZJmKZK d dlLmMZM d dlNmOZO d dlPmQZQ d dlRmSZS d dlTmUZUmVZV erd dlmWZW d dlXmYZY d dlZm[Z[m\Z\ d dl$m]Z] d dl^m_Z_m`Z`maZambZb d dlcmdZd d dlemfZfmgZg d dlhmiZimjZj d dlkmlZlmmZm d dlnmoZo d dlLmpZp d dlNmqZq d d lrmsZsmtZtmuZu d d!lvmwZw z,d d"lxmyZymzZzm{Z{mxZx d#d$d%d&d'Z|W n& e}k
r   d#d$d%d(d'Z|Y nX d)Z~d*Zd+Ze	jd,krzd dlZW n e}k
rB   d dlZY nX d-d. Zz6eejd/Zeed0d1Zeed2d3ZW 5 Q R X W n ek
r   d1Zd3ZY nX d4d5 Zn$d6d7d#d$d8d9d:Zd6d$d;d<d5Zed=ed>d?e fgfgZd@edA< e	jdBr8e ZedCefdDefdEe fd?e fgedF< nNe	jdGkredCe fdDe fdEe fd?e d  fgedF< ne	jd,kredCe fdDdHe e ffdEe fd?dIe dJdK fgedF< ne	jdLr$e dM \ZZZedCefdDefdEefd?efgedF< nbee e e ZedCe fdDdHdNdO eddP D fdEe fd?edP fgedF< e dQrdHe dReee	jdSdReee	j fedT< npe	jdLrdHe dReee	jdUdHe e f fedT< n$dHe dReee	jfedT< dVdWdXdYZdVdWdZd[ZdVdWd\d]ZdVdWd^d_Zd7d`dadbdcZdddWdedfZdgZdhd$didjdkZdldm ddndodpdqdrdsdtZdudvdVdwdxdyZG dzd{ d{ZG d|d} d}ZG d~d dZdd{d6dddZdd{ddddZG dd de;ZG dd dZG dd dZG dd dZdS )    )annotationsN)TYPE_CHECKINGAnyDictIteratorListMappingMutableMappingNoReturnOptionalSequenceSetTupleUnion)DEFAULT_CODEC_OPTIONS)SON)__version___csotauthhelpers)_validate_session_write_concern)
MAX_BSON_SIZEMAX_CONNECTINGMAX_IDLE_TIME_SECMAX_MESSAGE_SIZEMAX_POOL_SIZEMAX_WIRE_VERSIONMAX_WRITE_BATCH_SIZEMIN_POOL_SIZEORDERED_TYPESWAIT_QUEUE_TIMEOUT)AutoReconnectConfigurationErrorConnectionFailureDocumentTooLargeExecutionTimeoutInvalidOperationNetworkTimeoutNotPrimaryErrorOperationFailurePyMongoErrorWaitQueueTimeoutError_CertificateError)HelloHelloCompat)_handle_reauth)_create_lock)ConnectionCheckOutFailedReasonConnectionClosedReason_EventListeners)commandreceive_message)ReadPreference)_add_to_command)SERVER_TYPE)SocketChecker)HAS_SNISSLError)CodecOptions)ObjectId)MongoCredential_AuthContext)ClientSession)CompressionSettingsSnappyContextZlibContextZstdContext)
DriverInfo)_OpMsg_OpReply)MongoClient_MongoClientErrorHandler)
SSLContext_sslConn)ReadConcern)_ServerMode)	ServerApi)ClusterTime_Address_CollationIn)WriteConcern)F_GETFDF_SETFD
FD_CLOEXECfcntlintNone)fdreturnc                 C  s   t | t}t | t|tB  dS )z8Set the close-on-exec flag on the given file descriptor.N)rV   rS   rT   rU   )rY   flags r\   0/tmp/pip-unpacked-wheel-oblwsawz/pymongo/pool.py_set_non_inheritable_non_atomicr   s    
r^   c                 C  s   dS )z6Dummy function for platforms that don't provide fcntl.Nr\   )rY   r\   r\   r]   r^   {   s    x   
   	   win32c              	   C  s<   zt | |\}}t|W S  ttfk
r6   | Y S X d S N)winregQueryValueExrW   OSError
ValueError)keynamedefaultvalue_r\   r\   r]   _query   s
    
rm   z2SYSTEM\CurrentControlSet\Services\Tcpip\ParametersZKeepAliveTimei m ZKeepAliveInterval  c                 C  sD   t ttd }t ttd }|tk s,|tk r@| tjd||f d S )Nrn      )min_WINDOWS_TCP_IDLE_MS_MAX_TCP_KEEPIDLE_WINDOWS_TCP_INTERVAL_MS_MAX_TCP_KEEPINTVLZioctlsocketZSIO_KEEPALIVE_VALS)sockZidle_msZinterval_msr\   r\   r]   _set_keepalive_times   s    rw   zsocket.socketstr)rv   
tcp_option	max_valuerZ   c                 C  sX   t t|rTtt|}z*| tj|}||kr<| tj|| W n tk
rR   Y nX d S rc   )hasattrru   getattr
getsockoptIPPROTO_TCP
setsockoptrf   )rv   ry   rz   Zsockoptrj   r\   r\   r]   _set_tcp_option   s    

r   )rv   rZ   c                 C  s(   t | dt t | dt t | dt d S )NTCP_KEEPIDLETCP_KEEPINTVLTCP_KEEPCNT)r   rr   rt   _MAX_TCP_KEEPCNT)rv   r\   r\   r]   rw      s    driver)ri   ZPyMongoversionSON[str, Any]	_METADATAlinuxtyperi   architectureosdarwin -ro      javac                 C  s   g | ]}|r|qS r\   r\   ).0partr\   r\   r]   
<listcomp>   s      r      PyPy.z(Python %s)platformz(%s)boolrZ   c                  C  s*   t drdS t d} | r&| dS dS )NZAWS_LAMBDA_RUNTIME_APITZAWS_EXECUTION_ENVZAWS_Lambda_F)r   getenv
startswith)envr\   r\   r]   
_is_lambda  s    


r   c                   C  s   t tdS )NZFUNCTIONS_WORKER_RUNTIMEr   r   r   r\   r\   r\   r]   _is_azure_func  s    r   c                   C  s   t tdptdS )NZ	K_SERVICEZFUNCTION_NAMEr   r\   r\   r\   r]   _is_gcp_func!  s    r   c                   C  s   t tdS )NZVERCELr   r\   r\   r\   r]   
_is_vercel%  s    r   Optional[int])rh   rZ   c                 C  s8   t | }|sdS z
t|W S  tk
r2   Y dS X dS )zMLike os.getenv but returns an int, or None if the value is missing/malformed.N)r   r   rW   rg   )rh   valr\   r\   r]   _getenv_int)  s    

r   Dict[str, Any]c                  C  s   i } t  t t t fddkr&| S t  rdd| d< td}|rJ|| d< td}|d k	r|| d< nt rtd	| d< nzt rd
| d< td}|r|| d< td}|d k	r|| d< td}|d k	r|| d< n$t rd| d< td}|r|| d< | S )NTro   z
aws.lambdari   Z
AWS_REGIONregionZAWS_LAMBDA_FUNCTION_MEMORY_SIZE	memory_mbz
azure.funczgcp.funcZFUNCTION_REGIONZFUNCTION_MEMORY_MBZFUNCTION_TIMEOUT_SECtimeout_secZvercelZVERCEL_REGION)r   r   r   r   countr   r   r   )r   r   r   r   r\   r\   r]   _metadata_env4  s<    





r   i   MutableMapping[str, Any])metadatarZ   c                 C  s   t t| tkrdS | di d}|r8d|i| d< t t| tkrNdS | di d}|rpd|i| d< t t| tkrdS | dd t t| }|tkrdS |t }| dd}|r|d|  }|r|| d< n| dd dS )zPerform metadata truncation.Nr   ri   r   r   r    )lenbsonencode_MAX_METADATA_SIZEgetpop)r   env_nameZos_typeZencoded_sizeZoverflowplatr\   r\   r]   _truncate_metadataZ  s.    
r   Zfooidnar   	ExceptionOptional[str]r
   )addresserror
msg_prefixrZ   c                 C  s   | \}}|dk	r d|||f }n| d| }|r:|| }t |tjrRt||n,t |trtdt|krtt||n
t||dS )z9Convert a socket.error to ConnectionFailure and raise it.Nz	%s:%d: %sz: 	timed out)
isinstanceru   timeoutr'   r;   rx   r!   )r   r   r   hostportmsgr\   r\   r]   _raise_connection_failure  s    r   zthreading.ConditionOptional[float])	conditiondeadlinerZ   c                 C  s   |r|t   nd }| |S rc   )time	monotonicwait)r   r   r   r\   r\   r]   
_cond_wait  s    r   c                   @  s  e Zd ZdZdZeeeddedddddde	ddddfddddddd	d
dddddd
ddddddZ
eddddZeddddZeddddZeddddZeddd d!Zed
dd"d#Zeddd$d%Zeddd&d'Zeddd(d)Zeddd*d+Zed	dd,d-Zed
dd.d/Zeddd0d1Zeddd2d3Zeddd4d5Zeddd6d7Zed8dd9d:Zeddd;d<Zeddd=d>ZdS )?PoolOptionsaY  Read only connection pool options for a MongoClient.

    Should not be instantiated directly by application developers. Access
    a client's pool options via
    :attr:`~pymongo.client_options.ClientOptions.pool_options` instead::

      pool_opts = client.options.pool_options
      pool_opts.max_pool_size
      pool_opts.min_pool_size

    )Z__max_pool_sizeZ__min_pool_sizeZ__max_idle_time_secondsZ__connect_timeoutZ__socket_timeoutZ__wait_queue_timeoutZ__ssl_contextZ__tls_allow_invalid_hostnamesZ__event_listenersZ	__appnameZ__driverZ
__metadataZ__compression_settingsZ__max_connectingZ__pause_enabledZ__server_apiZ__load_balancedZ__credentialsNFTrW   r   r   zOptional[SSLContext]r   zOptional[_EventListeners]r   zOptional[DriverInfo]zOptional[CompressionSettings]zOptional[ServerApi]Optional[bool]zOptional[MongoCredential])max_pool_sizemin_pool_sizemax_idle_time_secondsconnect_timeoutsocket_timeoutwait_queue_timeoutssl_contexttls_allow_invalid_hostnamesevent_listenersappnamer   compression_settingsmax_connectingpause_enabled
server_apiload_balancedcredentialsc                 C  s  || _ || _|| _|| _|| _|| _|| _|| _|	| _|
| _	|| _
|| _|| _|| _|| _|| _|| _tt| _|
rd|
i| jd< |r|jrdtd d |j| jd d< |jrdtd d |j| jd d< |jrdtd |j| jd< t }|r|| jd< t| j d S )Nri   Zapplicationz{}|{}r   r   r   r   )_PoolOptions__max_pool_size_PoolOptions__min_pool_size#_PoolOptions__max_idle_time_seconds_PoolOptions__connect_timeout_PoolOptions__socket_timeout _PoolOptions__wait_queue_timeout_PoolOptions__ssl_context)_PoolOptions__tls_allow_invalid_hostnames_PoolOptions__event_listeners_PoolOptions__appname_PoolOptions__driver"_PoolOptions__compression_settings_PoolOptions__max_connecting_PoolOptions__pause_enabled_PoolOptions__server_api_PoolOptions__load_balanced_PoolOptions__credentialscopydeepcopyr   _PoolOptions__metadatari   formatr   r   r   r   )selfr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r\   r\   r]   __init__  sJ    



zPoolOptions.__init__r   c                 C  s   | j S )z;A :class:`~pymongo.auth.MongoCredentials` instance or None.)r   r   r\   r\   r]   _credentials  s    zPoolOptions._credentialsr   c                 C  s   i }| j tkr| j |d< | jtkr,| j|d< | jtkrR| jdk	sDt| jd |d< | jtkrx| jdk	sjt| jd |d< | j	t
kr| j	|d< |S )zqThe non-default options this pool was created with.

        Added for CMAP's :class:`PoolCreatedEvent`.
        ZmaxPoolSizeZminPoolSizeNrn   ZmaxIdleTimeMSZwaitQueueTimeoutMSZmaxConnecting)r   r   r   r   r   r   AssertionErrorr   r    r   r   )r   optsr\   r\   r]   non_default_options
  s    







zPoolOptions.non_default_optionsfloatc                 C  s   | j S )aQ  The maximum allowable number of concurrent connections to each
        connected server. Requests to a server will block if there are
        `maxPoolSize` outstanding connections to the requested server.
        Defaults to 100. Cannot be 0.

        When a server's pool has reached `max_pool_size`, operations for that
        server block waiting for a socket to be returned to the pool. If
        ``waitQueueTimeoutMS`` is set, a blocked operation will raise
        :exc:`~pymongo.errors.ConnectionFailure` after a timeout.
        By default ``waitQueueTimeoutMS`` is not set.
        )r   r   r\   r\   r]   r     s    zPoolOptions.max_pool_sizec                 C  s   | j S )zThe minimum required number of concurrent connections that the pool
        will maintain to each connected server. Default is 0.
        )r   r   r\   r\   r]   r   .  s    zPoolOptions.min_pool_sizec                 C  s   | j S )zgThe maximum number of concurrent connection creation attempts per
        pool. Defaults to 2.
        )r   r   r\   r\   r]   r   5  s    zPoolOptions.max_connectingc                 C  s   | j S rc   )r   r   r\   r\   r]   r   <  s    zPoolOptions.pause_enabledc                 C  s   | j S )zThe maximum number of seconds that a connection can remain
        idle in the pool before being removed and replaced. Defaults to
        `None` (no limit).
        )r   r   r\   r\   r]   r   @  s    z!PoolOptions.max_idle_time_secondsc                 C  s   | j S )z>How long a connection can take to be opened before timing out.)r   r   r\   r\   r]   r   H  s    zPoolOptions.connect_timeoutc                 C  s   | j S )zBHow long a send or receive on a socket can take before timing out.)r   r   r\   r\   r]   r   M  s    zPoolOptions.socket_timeoutc                 C  s   | j S )zhHow long a thread will wait for a socket from the pool if the pool
        has no free sockets.
        )r   r   r\   r\   r]   r   R  s    zPoolOptions.wait_queue_timeoutc                 C  s   | j S )zAn SSLContext instance or None.)r   r   r\   r\   r]   _ssl_contextY  s    zPoolOptions._ssl_contextc                 C  s   | j S )z If True skip ssl.match_hostname.)r   r   r\   r\   r]   r   ^  s    z'PoolOptions.tls_allow_invalid_hostnamesc                 C  s   | j S )z2An instance of pymongo.monitoring._EventListeners.)r   r   r\   r\   r]   _event_listenersc  s    zPoolOptions._event_listenersc                 C  s   | j S )zAThe application name, for sending with hello in server handshake.)r   r   r\   r\   r]   r   h  s    zPoolOptions.appnamec                 C  s   | j S )z=Driver name and version, for sending with hello in handshake.)r   r   r\   r\   r]   r   m  s    zPoolOptions.driverc                 C  s   | j S rc   )r   r   r\   r\   r]   _compression_settingsr  s    z!PoolOptions._compression_settingsr   c                 C  s
   | j  S )zCA dict of metadata about the application, driver, os, and platform.)r   r   r   r\   r\   r]   r   v  s    zPoolOptions.metadatac                 C  s   | j S )z'A pymongo.server_api.ServerApi or None.)r   r   r\   r\   r]   r   {  s    zPoolOptions.server_apic                 C  s   | j S )z6True if this Pool is configured in load balanced mode.)r   r   r\   r\   r]   r     s    zPoolOptions.load_balanced)__name__
__module____qualname____doc__	__slots__r   r   r   r    r   r   propertyr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r\   r\   r\   r]   r     st   0Er   c                   @  s:   e Zd ZddddZddddZedddd	Zd
S )_CancellationContextrX   r   c                 C  s
   d| _ d S )NFZ
_cancelledr   r\   r\   r]   r     s    z_CancellationContext.__init__c                 C  s
   d| _ dS )zCancel this context.TNr  r   r\   r\   r]   cancel  s    z_CancellationContext.cancelr   c                 C  s   | j S )zWas cancel called?r  r   r\   r\   r]   	cancelled  s    z_CancellationContext.cancelledN)r   r   r  r   r  r  r  r\   r\   r\   r]   r    s   r  c                   @  sD  e Zd ZdZdddddddZd	d
dddZddd	dddZd
dddZd
dddZd
dddZ	ddddZ
ddddZd d!d"dd#d$d%Zd&dd'd(Zeejed)d*d*d*d+d*d*d*d+d)d*d+fd,d-d.d/d0d1d2d3d0d4d5d6d0d0d7d0d&d8d9d:Zd;dd
d<d=d>Zd"d?d@dAdBZd0d
dCdDdEZd;dd
dFdGdHZdd;d/d&dIdJdKZdwd0d
dLdMdNZd6d5d
dOdPdQZdRd
dSdTdUZd
ddVdWZd0ddXdYZd-d5d6d
dZd[d\Zd-d
d]d^d_Zd
dd`daZd0d
dbdcddZ deddfdgZ!dhdidjdkdlZ"dmd0dndodpZ#dmd0dndqdrZ$dddsdtZ%d,ddudvZ&d*S )x
ConnectionzStore a connection with some metadata.

    :Parameters:
      - `conn`: a raw connection object
      - `pool`: a Pool instance
      - `address`: the server's (host, port)
      - `id`: the id of this socket in it's pool
    Union[socket.socket, _sslConn]PoolzTuple[str, int]rW   )connpoolr   idc                 C  s  t || _|| _|| _|| _d| _t | _	d| _
d| _t| _t| _t| _t| _d| _d| _d| _d| _|jj| _|j| _|jj| _d | _t | _ d | _!d | _"d | _#|j$| _%| j%& | _'d| _(d | _)|j*st+ | _)|j| _d| _,d | _-d| _.d| _/d| _0| jj1| _2d| _3d S )NFg        )4weakrefrefpool_refr  r   r  closedr   r   last_checkin_timeperformed_handshakeis_writabler   max_wire_versionr   max_bson_sizer   max_message_sizer   max_write_batch_sizesupports_sessionshello_ok	is_mongosop_msg_enabledr   r   	listenersenabled_for_cmapr   r   compression_contextr9   socket_checkerZoidc_token_gen_idnegotiated_mechsauth_ctxgenpool_genget_overall
generationreadycancel_context	handshaker  more_to_come
service_id
pinned_txnpinned_cursoractiver   last_timeoutconnect_rtt)r   r  r  r   r  r\   r\   r]   r     sL    



zConnection.__init__r   rX   )r   rZ   c                 C  s$   || j krdS || _ | j| dS )z?Cache last timeout to avoid duplicate calls to conn.settimeout.N)r0  r  
settimeout)r   r   r\   r\   r]   set_conn_timeout  s    
zConnection.set_conn_timeoutrH   z"Optional[MutableMapping[str, Any]])clientcmdrZ   c                 C  s   t  }|d kr(| js$| | jj d S t  }|d kr>| j}|| }|dk rzd|dd|d}t|dd|dd| j	|d k	rt
|d |d< | | |S )	Nr   z5operation would exceed time limit, remaining timeout:z.5fz <= network round trip time:2   okerrmsgcodern   Z	maxTimeMS)r   	remainingr+  r3  r   r   Zget_rttr1  r%   r  rW   )r   r4  r5  r   ZrttZmax_time_msr9  r\   r\   r]   apply_timeout  s*      
 
zConnection.apply_timeoutr   c                 C  s   d| _ | jrtd S NT)r-  r.  r   r   r\   r\   r]   pin_txn  s    zConnection.pin_txnc                 C  s   d| _ | jrtd S r=  )r.  r-  r   r   r\   r\   r]   
pin_cursor  s    zConnection.pin_cursorc                 C  s(   |   }|r||  n| tj d S rc   )r  checkin
close_connr2   STALE)r   r  r\   r\   r]   unpin  s    zConnection.unpinr   c                 C  sB   | j js| js| j jr,d| _ttjdfgS ttjdfdgS d S )NTro   )ZhelloOkT)	r   r   r  r   r  r   r.   ZCMDZ
LEGACY_CMDr   r\   r\   r]   	hello_cmd   s    zConnection.hello_cmdzHello[Dict[str, Any]]c                 C  s   |  d d d S rc   )_hellor   r\   r\   r]   hello	  s    zConnection.hellozOptional[ClusterTime]zOptional[Any]r   )cluster_timetopology_versionheartbeat_frequencyrZ   c                 C  sX  |   }| j }d}|rNd| _| jj|d< | jr<| jj|d< | jjrd|d< nJ|d k	r||d< |d k	sjtt|d |d< d}| jj	r| 
| jj	|  |s|d k	r||d	< | jj}|r|jd
kr|jr|jd |j |d< tj|| j}|r| }	|	d k	r|	|d< nd }|r"t }
| jd|d|d}|rHt |
 | _t||d}|j| _|j| _|j| _|j| _|j| _|jd k	| _|j | _ |j!t"j#t"j$t"j%t"j&t"j'fk| _(|j!t"j)k| _*|j!t"j+k| _,|r| jr| j-|j}|| _.d| _/|r|j0| _1|r"|2| |3 r"|| _4| jjrT|j5s<t6d|j5| _5| j78| j5| _9|S )NFTr4  compressionZloadBalancedZtopologyVersionrn   ZmaxAwaitTimeMSz$clusterTimeDEFAULTr   ZsaslSupportedMechsZspeculativeAuthenticateZadmin)publish_eventsexhaust_allowed)	awaitablez`Driver attempted to initialize in load balancing mode, but the server does not support this mode):rD  r  r   r   r   Zcompressorsr   r   rW   r   r3  r   Z	mechanismusernamesourcer   r?   Zfrom_credentialsr   Zspeculate_commandr   r   r4   r1  r-   r  r  r  r  r  Zlogical_session_timeout_minutesr  r  Zserver_typer8   Z	RSPrimaryZRSSecondaryZ	RSArbiterZRSOtherZRSGhostZis_replZ
StandaloneZis_standaloneZMongosr  Zget_compression_contextr   r  Zsasl_supported_mechsr"  parse_responseZspeculate_succeededr#  r,  r"   r%  r   r'  )r   rG  rH  rI  r5  Zperforming_handshakerN  credsr#  Zspeculative_authenticatestartdocrF  ctxr\   r\   r]   rE    s    





zConnection._hellor   c                 C  s4   |  d }|j| _| }|d }t|| j |S Nr   )r5   r+  Zunpack_responser   _check_command_responser  )r   replyZunpacked_docsZresponse_docr\   r\   r]   _next_reply_  s    
zConnection._next_replyTNFrx   r   rM   r<   r   z#Optional[Sequence[Union[str, int]]]zOptional[ReadConcern]zOptional[WriteConcern]zOptional[_CollationIn]zOptional[ClientSession]zOptional[MongoClient]zOptional[Mapping[str, Any]])dbnamespecread_preferencecodec_optionscheckallowable_errorsread_concernwrite_concernparse_write_concern_error	collationsessionr4  retryable_writerL  user_fieldsrM  rZ   c                 C  s,  |  || t||}t|ts(t|}|dksF|jsF|
dksFtd| | |rd|||||  | 	||| |r|| j
nd}t|o|j }| jr| | z@t| ||| j||||||| j|| j||	|
| j| j||||dW S  ttfk
r    Y n. tk
r& } z| | W 5 d}~X Y nX dS )a  Execute a command or raise an error.

        :Parameters:
          - `dbname`: name of the database on which to run the command
          - `spec`: a command document as a dict, SON, or mapping object
          - `read_preference`: a read preference
          - `codec_options`: a CodecOptions instance
          - `check`: raise OperationFailure if there are errors
          - `allowable_errors`: errors to ignore if `check` is True
          - `read_concern`: The read concern for this command.
          - `write_concern`: The write concern for this command.
          - `parse_write_concern_error`: Whether to parse the
            ``writeConcernError`` field in the command response.
          - `collation`: The collation for this command.
          - `session`: optional ClientSession instance.
          - `client`: optional MongoClient for gossipping $clusterTime.
          - `retryable_write`: True if this command is a retryable write.
          - `publish_events`: Should we publish events for this command?
          - `user_fields` (optional): Response fields that should be decoded
            using the TypeDecoders from codec_options, passed to
            bson._decode_all_selective.
        Nz3Collation is unsupported for unacknowledged writes.)rb  rc  Zcompression_ctxZ
use_op_msgunacknowledgedrf  rM  ra  )validate_sessionr   r   r   r   Zacknowledgedr"   add_server_apiZ	_apply_tosend_cluster_timer  r   r  _raise_if_not_writabler4   r  r   r  r   r)   r(   BaseExceptionr   )r   rZ  r[  r\  r]  r^  r_  r`  ra  rb  rc  rd  r4  re  rL  rf  rM  r  rg  r   r\   r\   r]   r4   g  sV    *



zConnection.commandbytes)messagemax_doc_sizerZ   c              
   C  sh   | j dk	r&|| j kr&td|| j f z| j| W n, tk
rb } z| | W 5 d}~X Y nX dS )z}Send a raw BSON message or raise ConnectionFailure.

        If a network exception is raised, the socket is closed.
        NzfBSON document too large (%d bytes) - the connected server supports BSON document sizes up to %d bytes.)r  r$   r  sendallrl  r   )r   rn  ro  r   r\   r\   r]   send_message  s    zConnection.send_messagezUnion[_OpReply, _OpMsg])
request_idrZ   c              
   C  sB   zt | || jW S  tk
r< } z| | W 5 d}~X Y nX dS )zzReceive a raw BSON message or raise ConnectionFailure.

        If any exception is raised, the socket is closed.
        N)r5   r  rl  r   )r   rr  r   r\   r\   r]   r5     s    zConnection.receive_message)rg  rZ   c                 C  s    |r| j stddddddS )z^Raise NotPrimaryError on unacknowledged write if this socket is not
        writable.
        znot primaryr   i{'  r7  N)r  r(   )r   rg  r\   r\   r]   rk    s    
z!Connection._raise_if_not_writable)r   ro  rZ   c                 C  s   |  d | || dS )zSend unack OP_MSG.

        Can raise ConnectionFailure or InvalidDocument.

        :Parameters:
          - `msg`: bytes, an OP_MSG message.
          - `max_doc_size`: size in bytes of the largest document in `msg`.
        TN)rk  rq  )r   r   ro  r\   r\   r]   unack_write  s    	
zConnection.unack_write)rr  r   r]  rZ   c                 C  s2   |  |d | |}||}t|| j |S )zSend "insert" etc. command, returning response as a dict.

        Can raise ConnectionFailure or OperationFailure.

        :Parameters:
          - `request_id`: an int.
          - `msg`: bytes, the command message.
        r   )rq  r5   Zcommand_responser   rW  r  )r   rr  r   r]  rX  resultr\   r\   r]   write_command  s
    

zConnection.write_command)reauthenticaterZ   c                 C  sh   |r| j rd| _d| _| jsd| jj}|r8tj|| |d d| _| jrd| jdk	sRt	| j
| j| j dS )zhAuthenticate to the server if needed.

        Can raise ConnectionFailure or OperationFailure.
        NF)rv  T)r  r#  r(  r   r   r   authenticater  r  r   Zpublish_connection_readyr   r  )r   rv  rR  r\   r\   r]   rw    s    zConnection.authenticate)r4  rd  rZ   c                 C  s   |r|j |k	rtddS )zValidate this session before use with client.

        Raises error if the client is not the one that created the session.
        z9Can only use session with the MongoClient that started itN)Z_clientr&   )r   r4  rd  r\   r\   r]   rh    s    
zConnection.validate_sessionr   )reasonrZ   c                 C  sB   | j r
dS |   |r>| jr>| jdk	s*t| j| j| j| dS )z$Close this connection with a reason.N)r  _close_connr  r  r   publish_connection_closedr   r  )r   rx  r\   r\   r]   rA  #  s    
zConnection.close_connc                 C  sH   | j r
dS d| _ | jr | j  z| j  W n tk
rB   Y nX dS )zClose this connection.NT)r  r)  r  r  closer   r   r\   r\   r]   ry  ,  s    
zConnection._close_connc                 C  s   | j | jS )z?Return True if we know socket has been closed, False otherwise.)r!  Zsocket_closedr  r   r\   r\   r]   conn_closed:  s    zConnection.conn_closed)r4   rd  r4  rZ   c                 C  s   |r| || dS )zAdd $clusterTime.N)Z_send_cluster_time)r   r4   rd  r4  r\   r\   r]   rj  >  s    zConnection.send_cluster_time)r4   rZ   c                 C  s   | j jrt|| j j dS )zAdd server_api parameters.N)r   r   r7   )r   r4   r\   r\   r]   ri  H  s    zConnection.add_server_apic                 C  s   t  | _d S rc   r   r   r  r   r\   r\   r]   update_last_checkin_timeM  s    z#Connection.update_last_checkin_timer  rZ   c                 C  s
   || _ d S rc   )r  )r   r  r\   r\   r]   update_is_writableP  s    zConnection.update_is_writabler   c                 C  s   t  | j S )z9Seconds since this socket was last checked into its pool.r}  r   r\   r\   r]   idle_time_secondsS  s    zConnection.idle_time_secondsrl  r
   )r   rZ   c                 C  s@   | j rd }ntj}| | t|tttfr:t| j	| n d S rc   )
r(  r2   ERRORrA  r   IOErrorrf   r;   r   r   )r   r   rx  r\   r\   r]   r   W  s    
z$Connection._raise_connection_failurer   )otherrZ   c                 C  s   | j |j kS rc   )r  r   r  r\   r\   r]   __eq__r  s    zConnection.__eq__c                 C  s
   | |k S rc   r\   r  r\   r\   r]   __ne__u  s    zConnection.__ne__c                 C  s
   t | jS rc   )hashr  r   r\   r\   r]   __hash__x  s    zConnection.__hash__c                 C  s"   d t| j| jrdpdt| S )NzConnection({}){} at {}z CLOSEDr   )r   reprr  r  r  r   r\   r\   r]   __repr__{  s
    zConnection.__repr__)F)'r   r   r  r  r   r3  r<  r>  r?  rC  rD  rF  rE  rY  r/   r6   ZPRIMARYr   r4   rq  r5   rk  rs  ru  rw  rh  rA  ry  r|  rj  ri  r~  r  r  r   r  r  r  r  r\   r\   r\   r]   r	    s^   	2	S2Z
	
r	  rP   )r   optionsrZ   c                 C  s  | \}}| drlttds$tdttj}t|  z|| |W S  tk
rj   |	   Y nX tj
}tjr|dkrtj}d}t|||tjD ]
}|\}}	}
}}zt||	ttddB |
}W n" tk
r   t||	|
}Y nX t|  zx|tjtjd t }|dkr,|j}n|dkr@td	|| |tjtjd
 t| || |W   S  tk
r } z|}|	  W 5 d}~X Y qX q|dk	r|ntddS )zGiven (host, port) and PoolOptions, connect and return a socket object.

    Can raise socket.error.

    This is a modified version of create_connection from CPython >= 2.7.
    z.sockAF_UNIXz-UNIX-sockets are not supported on this system	localhostNSOCK_CLOEXECr   ro   r   Tzgetaddrinfo failed)endswithr{   ru   r#   r  r^   filenoconnectrf   r{  AF_INEThas_ipv6	AF_UNSPECgetaddrinfoSOCK_STREAMr|   r   r~   TCP_NODELAYr   r;  r   r   r2  
SOL_SOCKETSO_KEEPALIVErw   )r   r  r   r   rv   familyerrresafsocktypeprotodummysar   er\   r\   r]   _create_connection  sR    









r  r
  c              
   C  s  t | |}|j}|dkr(||j |S | d }z"trF|j||d}n
||}W nV tk
rp   |   Y n: tt	fk
r } z|  t
| |d W 5 d}~X Y nX |jr|js|jsztj| |d W n tk
r   |   Y nX ||j |S )zGiven (host, port) and PoolOptions, return a configured socket.

    Can raise socket.error, ConnectionFailure, or _CertificateError.

    Sets socket's SSL and timeout options.
    Nr   )server_hostnamezSSL handshake failed: )hostname)r  r   r2  r   r:   wrap_socketr,   r{  rf   r;   r   verify_modecheck_hostnamer   sslmatch_hostnamegetpeercert)r   r  rv   r   r   Zssl_sockexcr\   r\   r]   _configured_socket  s:    
r  c                   @  s   e Zd ZdZdS )_PoolClosedErrorzZInternal error raised when a thread tries to get a connection from a
    closed pool.
    N)r   r   r  r  r\   r\   r\   r]   r    s   r  c                   @  sZ   e Zd ZddddZddddd	Zddd
dZdddddZddddddZdS )_PoolGenerationrX   r   c                 C  s   t t| _d| _d S rV  )collectionsdefaultdictrW   _generations_generationr   r\   r\   r]   r     s    z_PoolGeneration.__init__Optional[ObjectId]rW   r,  rZ   c                 C  s   |dkr| j S | j| S )z,Get the generation for the given service_id.Nr  r  r   r,  r\   r\   r]   r     s    z_PoolGeneration.getc                 C  s   | j S )z"Get the Pool's overall generation.)r  r   r\   r\   r]   r&    s    z_PoolGeneration.get_overallc                 C  sL   |  j d7  _ |dkr6| jD ]}| j|  d7  < qn| j|  d7  < dS )z2Increment the generation for the given service_id.ro   Nr  r  r\   r\   r]   inc  s
    
z_PoolGeneration.incr   r$  r,  rZ   c                 C  s   ||  |kS )z?Return if the given generation for a given service_id is stale.)r   r   r$  r,  r\   r\   r]   stale  s    z_PoolGeneration.staleN)r   r   r  r   r   r&  r  r  r\   r\   r\   r]   r     s
   	r  c                   @  s   e Zd ZdZdZdZdS )	PoolStatero   r   r   N)r   r   r  PAUSEDREADYCLOSEDr\   r\   r\   r]   r    s   r  c                   @  s>  e Zd Zd<ddddddZdd	d
dZedd	ddZd=dddddddZdddddZd>dddddZ	dd	ddZ
dd	ddZddddd d!Zddd"d#d$Zd?d%d&d'd(d)Zejd@d%d*d'd+d,Zddd-d.d/ZdAd%d&d'd0d1Zd&dd2d3d4Zd&dd2d5d6Zd7d	d8d9Zdd	d:d;ZdS )Br  TrP   r   r   )r   r  r*  c                 C  s  |j rtj| _ntj| _d| _t | _t	 | _
d| _d| _d| _t | _t | _|| _|| _|| _| jo| jjdk	o| jjj| _t| j
| _d| _| jj| _| jstd| _t| j
| _| jj| _ d| _!| jr| jjdk	st"| jj#| j| jj$ d| _%t& | _'d| _(d| _)dS )z
        :Parameters:
          - `address`: a (hostname, port) tuple
          - `options`: a PoolOptions instance
          - `handshake`: whether to call hello for each new Connection
        ro   r   Ninf)*r   r  r  stater  _check_interval_secondsr  dequeconnsr0   lockactive_socketsnext_connection_idr  r  r$  r   getpidpidr   r   r*  r   r  	threading	Condition	size_condrequestsr   r   _max_connecting_condr   _max_connecting_pendingr   Zpublish_pool_createdr   operation_countset_Pool__pinned_socketsncursorsntxns)r   r   r  r*  r\   r\   r]   r   (  sJ    



	


 zPool.__init__rX   r   c              	   C  sP   | j @ | jtjkrBtj| _| jrB| jjd k	s2t| jj| j	 W 5 Q R X d S rc   )
r  r  r  r  r  r   r   r   Zpublish_pool_readyr   r   r\   r\   r]   r(  l  s    z
Pool.readyc                 C  s   | j tjkS rc   )r  r  r  r   r\   r\   r]   r  u  s    zPool.closedNr  )r{  pauser,  rZ   c              	   C  s  | j }| j | jr"W 5 Q R  d S | jjrF|rF| jjsF| j tj }| _ | j	| t
 }| j|krv|| _d| _d| _|d kr| jt  }| _nFt }t }| jD ]$}	|	j|kr||	 q||	 q|}|| _|rtj| _ | j  | j  W 5 Q R X | jj}
|rL|D ]}	|	tj q| jr|
d k	s>t|
| j nJ|tjkr~| jr~|
d k	snt|
j| j|d |D ]}	|	tj  qd S )Nr   )r,  )!r  r  r  r   r   r   r  r  r$  r  r   r  r  r  r  r  r  r  r,  appendr  r  
notify_allr   rA  r2   POOL_CLOSEDr  r   Zpublish_pool_closedr   Zpublish_pool_clearedrB  )r   r{  r  r,  	old_stateZnewpidZsocketsdiscardZkeepr  r  r\   r\   r]   _resety  sL    



zPool._resetr   r  c              	   C  s4   || _ | j | jD ]}|| j  qW 5 Q R X dS )zXUpdates the is_writable attribute on all sockets currently in the
        Pool.
        N)r  r  r  r  )r   r  _socketr\   r\   r]   r    s    
zPool.update_is_writabler  c                 C  s   | j d|d d S )NF)r{  r,  r  r  r\   r\   r]   reset  s    z
Pool.resetc                 C  s   | j ddd d S )NF)r{  r  r  r   r\   r\   r]   reset_without_pause  s    zPool.reset_without_pausec                 C  s   | j dd d S )NT)r{  r  r   r\   r\   r]   r{    s    z
Pool.closerW   r  c                 C  s   | j ||S rc   )r$  r  r  r\   r\   r]   stale_generation  s    zPool.stale_generation)reference_generationrZ   c                 C  s  | j   | jtjkr"W 5 Q R  dS W 5 Q R X | jjdk	r~| j : | jrt| jd  | jjkrt| j }|	t
j q@W 5 Q R X | jV t| j| j | jjkrW 5 Q R  dS | j| jjkrW 5 Q R  dS |  jd7  _W 5 Q R X d}z| j8 | j| jkrW 5 Q R  W xdS |  jd7  _d}W 5 Q R X |  }| j @ | j |krh|	t
j W 5 Q R  W dS | j| W 5 Q R X W 5 |r| j |  jd8  _| j  W 5 Q R X | j |  jd8  _| j  W 5 Q R X X q~dS )zRemoves stale sockets then adds new ones if pool is too small and
        has not been reset. The `reference_generation` argument specifies the
        `generation` at the point in time this operation was requested on the
        pool.
        Nr   ro   FT)r  r  r  r  r   r   r  r  r   rA  r2   IDLEr  r   r  r   r  r  r  notifyr  r  r$  r&  rB  
appendleft)r   r  r  incrementedr\   r\   r]   remove_stale_sockets  sJ    
zPool.remove_stale_socketsz"Optional[_MongoClientErrorHandler]r	  )handlerrZ   c              
   C  s6  | j  | j}|  jd7  _W 5 Q R X | jj}| jrN|dk	s@t|| j| zt| j| j}W nd t	k
r } zF| jr|dk	st|
| j|tj t|tttfrt| j|  W 5 d}~X Y nX t|| | j|}z6| jr|  |j| _|r|j|dd |  W n$ t	k
r0   |tj  Y nX |S )zConnect to Mongo and return a new Connection.

        Can raise ConnectionFailure.

        Note that the pool does not keep a reference to the socket -- you
        must call checkin() when you're done with it.
        ro   NF)Zcompleted_handshake)r  r  r   r   r  r   Zpublish_connection_createdr   r  rl  rz  r2   r  r   r  rf   r;   r   r	  r*  rF  r  Zcontribute_socketrw  rA  )r   r  Zconn_idr  rv   r   r  r\   r\   r]   r    s@      zPool.connectzIterator[Connection]c              	   c  s4  | j j}| jr&|dk	st|| j | j|d}| jrT|dk	sDt|| j|j z
|V  W nT t	k
r   |j
px|j}|rt \}}}||| |s|jr| |  Y nX |j
r| j  | j| |  jd7  _W 5 Q R X nH|jr| j  | j| |  jd7  _W 5 Q R X n|jr0| | dS )a  Get a connection from the pool. Use with a "with" statement.

        Returns a :class:`Connection` object wrapping a connected
        :class:`socket.socket`.

        This method should always be used in a with-statement::

            with pool.get_conn() as connection:
                connection.send_message(msg)
                data = connection.receive_message(op_code, request_id)

        Can raise ConnectionFailure or OperationFailure.

        :Parameters:
          - `handler` (optional): A _MongoClientErrorHandler.
        Nr  ro   )r   r   r  r   Z$publish_connection_check_out_startedr   	_get_connZpublish_connection_checked_outr  rl  r-  r.  sysexc_infohandler/  r@  r  r  addr  r  )r   r  r  r  Zpinnedexc_typeexc_valrl   r\   r\   r]   checkout'  s8    


zPool.checkout)
emit_eventrZ   c                 C  sN   | j tjkrJ| jr:|r:| jjd k	s&t| jj| jt	j
 t| jtd d S )Nzconnection pool paused)r  r  r  r  r   r   r   #publish_connection_check_out_failedr   r1   
CONN_ERRORr   r!   )r   r  r\   r\   r]   _raise_if_not_ready^  s    
 zPool._raise_if_not_readyc                 C  s  | j t kr|   | jrN| jrF| jjdk	s2t| jj	| j
tj td| j |  jd7  _W 5 Q R X t rt }n| jjrt | jj }nd}| jd | jdd | j| jk st| j|s| j| jk r| j  |   | jdd q|  jd7  _W 5 Q R X d}d}d}zB| j |  jd7  _d}W 5 Q R X |dkr\| j | jdd | js| j | j!k st| j|s| js| j | j!k r| j  d}|   | jdd q`z| j" }W n$ t#k
r   |  j d7  _ Y nX W 5 Q R X |r| $|rXd}qBn>z| j%|d}W 5 | j |  j d8  _ | j  W 5 Q R X X qBW n t&k
r   |r|'t(j) | j2 |  jd8  _|r|  jd8  _| j  W 5 Q R X | jr|s| jjdk	st| jj	| j
tj*  Y nX d|_+|S )z8Get or create a Connection. Can raise ConnectionFailure.Nz?Attempted to check out a connection from closed connection poolro   T)r  Fr  ),r  r   r  r  r  r  r   r   r   r  r   r1   r  r  r  r  r   get_timeoutZget_deadliner   r   r   r  r  r  r   r   r  _raise_wait_queue_timeoutr  r  r  r  r  popleft
IndexError	_perishedr  rl  rA  r2   r  r  r/  )r   r  r   r  r  Zemitted_eventr\   r\   r]   r  g  s     



 zPool._get_conn)r  rZ   c              	   C  s  |j }|j}d|_d|_ d|_| j| | jj}| jrT|dk	sDt|	| j
|j | jt krl|   n| jr|tj n|jr| jr|dk	st|| j
|jtj n^| jR | |j|jr|tj n.|  |t| j | j | | j!"  W 5 Q R X | j#d |r*|  j$d8  _$n|r>|  j%d8  _%|  j&d8  _&|  j'd8  _'|  j(d8  _(| j#"  W 5 Q R X dS )zReturn the connection to the pool, or if it's closed discard it.

        :Parameters:
          - `conn`: The connection to check into the pool.
        FNro   ))r-  r.  r/  r  r  r   r   r  r   Zpublish_connection_checked_inr   r  r  r   r  r  r  rA  r2   r  rz  r  r  r  r'  r,  rB  r~  r  r   r  r  r  r  r  r  r  r  r  r  r  )r   r  Ztxncursorr  r\   r\   r]   r@    sL    
  zPool.checkinc                 C  s   |  }| jjdk	r0|| jjkr0|tj dS | jdk	rfd| jksN|| jkrf| rf|tj dS | 	|j
|jr|tj dS dS )a  Return True and close the connection if it is "perished".

        This side-effecty function checks if this socket has been idle for
        for longer than the max idle time, or if the socket has been closed by
        some external network error, or if the socket's generation is outdated.

        Checking sockets lets us avoid seeing *some*
        :class:`~pymongo.errors.AutoReconnect` exceptions on server
        hiccups, etc. We only check if the socket was closed by an external
        error if it has been > 1 second since the socket was checked into the
        pool, to keep performance reasonable - we can't avoid AutoReconnects
        completely anyway.
        NTr   F)r  r   r   rA  r2   r  r  r|  r  r  r'  r,  rB  )r   r  r  r\   r\   r]   r    s&    


zPool._perishedr
   c                 C  s   | j j}| jr*|d k	st|| jtj t	 p8| j j
}| j jrr| j| j | j }td| j j| j| j||td| j j|d S )NzTimeout waiting for connection from the connection pool. maxPoolSize: {}, connections in use by cursors: {}, connections in use by transactions: {}, connections in use by other operations: {}, timeout: {}z\Timed out while checking out a connection from connection pool. maxPoolSize: {}, timeout: {})r   r   r  r   r  r   r1   TIMEOUTr   r  r   r   r  r  r  r+   r   r   )r   r  r   Z	other_opsr\   r\   r]   r  "  s2      zPool._raise_wait_queue_timeoutc                 C  s   | j D ]}|d  qd S rc   )r  rA  )r   r  r\   r\   r]   __del__=  s    
zPool.__del__)T)TN)N)N)N)N)r   r   r  r   r(  r  r  r  r  r  r  r{  r  r  r  
contextlibcontextmanagerr  r  r  r@  r  r  r  r\   r\   r\   r]   r  '  s,   D	   3	8.6	d3$r  )N)
__future__r   r  r  r   r   r   ru   r  r  r  r   r  typingr   r   r   r   r   r   r	   r
   r   r   r   r   r   r   r   Zbson.sonr   Zpymongor   r   r   r   Zpymongo.client_sessionr   Zpymongo.commonr   r   r   r   r   r   r   r   r   r    Zpymongo.errorsr!   r"   r#   r$   r%   r&   r'   r(   r)   r*   r+   r,   Zpymongo.hellor-   r.   Zpymongo.helpersr/   Zpymongo.lockr0   Zpymongo.monitoringr1   r2   r3   Zpymongo.networkr4   r5   Zpymongo.read_preferencesr6   Zpymongo.server_apir7   Zpymongo.server_typer8   Zpymongo.socket_checkerr9   Zpymongo.ssl_supportr:   r;   r<   Zbson.objectidr=   Zpymongo.authr>   r?   r@   Zpymongo.compression_supportrA   rB   rC   rD   Zpymongo.driver_inforE   Zpymongo.messagerF   rG   Zpymongo.mongo_clientrH   rI   Zpymongo.pyopenssl_contextrJ   rK   Zpymongo.read_concernrL   rM   rN   Zpymongo.typingsrO   rP   rQ   Zpymongo.write_concernrR   rV   rS   rT   rU   r^   ImportErrorrr   rt   r   _winregrd   rm   OpenKeyHKEY_LOCAL_MACHINErh   rq   rs   rf   rw   r   r   __annotations__r   system_namemachinereleasemac_verjoin	win32_verjava_ver_verZ_archsystem_aliasr   Z_aliasedpython_implementationmaprx   pypy_version_infoversion_infor   r   r   r   r   r   r   r   r   r   r   r   r  r	  r  r  r  r  r  r  r\   r\   r\   r]   <module>   sB   <08
 



	











	

	"$
  i   rG0