U
    d4                     @  s  d Z ddlmZ ddlZddlZddlZddlZddlZddlm	Z	m
Z
mZmZmZmZmZ ddlmZ ddlmZmZmZmZ ddlmZ ddlmZmZ dd	lmZmZmZm Z  dd
l!m"Z"m#Z#m$Z$ ddl%m&Z& ddl'm(Z( e	rrddlm)Z) ddl*m+Z+ ddlm,Z,m-Z-m.Z. ddl/m0Z0 ddl%m1Z1 ddl2m3Z3 ddl4m5Z5 ddl6m7Z7 ddl8m9Z9m:Z:m;Z;m<Z< ddl=m>Z> e?dj@ZAd?ddddddd d!dd"d#d$d%d&dd'd(ddd)dd*d+d,d-d.ZBe?d/j@ZCefdd%d0d1d2d3d4ZDd5ZEdd6d7d8d9d:ZFeGfejHZHdd0d6d;d<d=d>ZIdS )@z&Internal network layer helper methods.    )annotationsN)TYPE_CHECKINGAnyMappingMutableMappingOptionalSequenceUnion)_decode_all_selective)_csothelpersmessagessl_support)MAX_MESSAGE_SIZE)_NO_COMPRESSION
decompress)NotPrimaryErrorOperationFailureProtocolError_OperationCancelled)_UNPACK_REPLY_OpMsg_OpReply)_is_speculative_authenticate)_errno_from_exception)CodecOptions)ClientSession)SnappyContextZlibContextZstdContext)MongoClient)_EventListeners)
Connection)ReadConcern)_ServerMode)_Address_CollationIn_DocumentOut_DocumentType)WriteConcernz<iiiiTFr"   strzMutableMapping[str, Any]boolzOptional[_ServerMode]zCodecOptions[_DocumentType]zOptional[ClientSession]zOptional[MongoClient]z#Optional[Sequence[Union[str, int]]]zOptional[_Address]zOptional[_EventListeners]zOptional[int]zOptional[ReadConcern]zOptional[_CollationIn]z4Union[SnappyContext, ZlibContext, ZstdContext, None]zOptional[Mapping[str, Any]]zOptional[WriteConcern]r(   )conndbnamespec	is_mongosread_preferencecodec_optionssessionclientcheckallowable_errorsaddress	listenersmax_bson_sizeread_concernparse_write_concern_error	collationcompression_ctx
use_op_msgunacknowledgeduser_fieldsexhaust_allowedwrite_concernreturnc           )      C  s  t t|}|d }d}|}|r<|s<|dk	s0tt||}|rj|rJ|jsj|jrZ|j|d< |rj|||  |dk	rz||d< |dk	o|j	}|rt
j
 }t||}|r| tkrd}|r|jr|jjs|j||| }}|r| || t|| |rh|rtjnd}||rtjndO }tj||||||d\}}}} |r|dk	r| |krt||| ntd|dd|d||\}}}|dk	r||tj krt|||tj  |rt
j
 | }!|dk	st|
dk	st|j||||
| jd	 t
j
 }z~| j| |r,|r,d}"d
di}#nTt | |}"|"j!| _!|"j"||d}$|$d }#|rf|#|#| |rt$j%|#| j&|	|d W n t'k
r }% zt|rt
j
 | |! }&t(|%t)t*fr|%j+}'n
t,|%}'|dk	st|
dk	st|j-|&|'|||
| jd	  W 5 d}%~%X Y nX |rft
j
 | |! }&|dk	s>t|
dk	sLt|j.|&|#|||
| j|d |r|jr|"r|j/|"0 }(t1|(||d }#|#S )a  Execute a command over the socket, or raise socket.error.

    :Parameters:
      - `conn`: a Connection instance
      - `dbname`: name of the database on which to run the command
      - `spec`: a command document as an ordered dict type, eg SON.
      - `is_mongos`: are we connected to a mongos?
      - `read_preference`: a read preference
      - `codec_options`: a CodecOptions instance
      - `session`: optional ClientSession instance.
      - `client`: optional MongoClient instance for updating $clusterTime.
      - `check`: raise OperationFailure if there are errors
      - `allowable_errors`: errors to ignore if `check` is True
      - `address`: the (host, port) of `conn`
      - `listeners`: An instance of :class:`~pymongo.monitoring.EventListeners`
      - `max_bson_size`: The maximum encoded bson size for this server
      - `read_concern`: The read concern for this command.
      - `parse_write_concern_error`: Whether to parse the ``writeConcernError``
        field in the command response.
      - `collation`: The collation for this command.
      - `compression_ctx`: optional compression Context.
      - `use_op_msg`: True if we should use OP_MSG.
      - `unacknowledged`: True if this is an unacknowledged command.
      - `user_fields` (optional): Response fields that should be decoded
        using the TypeDecoders from codec_options, passed to
        bson._decode_all_selective.
      - `exhaust_allowed`: True if we should enable OP_MSG exhaustAllowed.
    z.$cmdFNZreadConcernr;   r   )ctx)
service_idok   )r1   r?   )r:   )rE   speculative_hello)2nextiterAssertionErrorr   Z_maybe_add_read_preferenceZin_transactionleveldocumentZ_update_read_concernZenabled_for_commandsdatetimenowr   lowerr   Z
_encrypterZ_bypass_auto_encryptionZencryptZapply_timeoutr   Zapply_write_concernr   ZMORE_TO_COMEZEXHAUST_ALLOWEDZ_op_msgZ_raise_document_too_largeZ_queryZ_COMMAND_OVERHEADZpublish_command_startrE   r,   sendallreceive_messageZmore_to_comeZunpack_responseZ_process_responser   Z_check_command_responseZmax_wire_version	Exception
isinstancer   r   detailsZ_convert_exceptionZpublish_command_failureZpublish_command_successZdecryptZraw_command_responser
   ))r,   r-   r.   r/   r0   r1   r2   r3   r4   r5   r6   r7   r8   r9   r:   r;   r<   r=   r>   r?   r@   rA   namensrH   origpublishstartflags
request_idmsgsizeZmax_doc_sizeZencoding_durationZreplyZresponse_docZunpacked_docsexcdurationZfailureZ	decrypted ra   3/tmp/pip-unpacked-wheel-oblwsawz/pymongo/network.pycommand>   s    4


            
    


 

     
rc   z<iiBintzUnion[_OpReply, _OpMsg])r,   r\   max_message_sizerB   c              	   C  s(  t  rt  }n | j }|r.t | }nd}tt| d|\}}}}|dk	rn||krnt	d|d||dkrt	d|d||krt	d
|||dkrtt| d	|\}}}	tt| |d
 ||	}
nt| |d |}
zt| }W n. tk
r   t	d|dt Y nX ||
S )z1Receive a raw BSON message or raise socket.error.N   zGot response id z but expected zMessage length (z3) not longer than standard message header size (16)zCMessage length ({!r}) is larger than server max message size ({!r})i  	      zGot opcode )r   get_timeoutZget_deadliner,   
gettimeouttime	monotonic_UNPACK_HEADER_receive_data_on_socketr   format_UNPACK_COMPRESSION_HEADERr   r   KeyErrorkeys)r,   r\   re   deadlinetimeoutlength_Zresponse_toZop_codeZcompressor_iddataZunpack_replyra   ra   rb   rR      s@    


 

rR   g      ?zOptional[float]None)r,   rs   rB   c                 C  s   | j }|r| j}d}t|dr0| dkr0d}nD|r^|t  }|dkrLd}tt|td}nt}| j	j
|d|d}|jrtd|rdS |rtdqdS )	zABlock until at least one byte is read, or a timeout, or a cancel.Fpendingr   T)readrt   zhello cancelledN	timed out)Zcancel_contextr,   hasattrry   rk   rl   maxmin_POLL_TIMEOUTZsocket_checkerselectZ	cancelledr   socketrt   )r,   rs   contextsockZ	timed_outreadable	remainingrt   ra   ra   rb   wait_for_read  s&    r   
memoryview)r,   ru   rs   rB   c              
   C  s   t |}t|}d}||k rzJt| | t rP|d k	rP| t|t  d | j	
||d  }W nV tk
r   tdY n: tk
r } zt|tjkrW Y q W 5 d }~X Y nX |dkrtd||7 }q|S )Nr   r{   zconnection closed)	bytearrayr   r   r   ri   Zset_conn_timeoutr}   rk   rl   r,   	recv_intoBLOCKING_IO_ERRORSr   rt   OSErrorr   errnoZEINTR)r,   ru   rs   bufmv
bytes_readZchunk_lengthr_   ra   ra   rb   rn   <  s&    

rn   )TNNNNNFNNFFNFN)J__doc__
__future__r   rN   r   r   structrk   typingr   r   r   r   r   r   r	   Zbsonr
   Zpymongor   r   r   r   Zpymongo.commonr   Zpymongo.compression_supportr   r   Zpymongo.errorsr   r   r   r   Zpymongo.messager   r   r   Zpymongo.monitoringr   Zpymongo.socket_checkerr   r   Zpymongo.client_sessionr   r   r   r   Zpymongo.mongo_clientr    r!   Zpymongo.poolr"   Zpymongo.read_concernr#   Zpymongo.read_preferencesr$   Zpymongo.typingsr%   r&   r'   r(   Zpymongo.write_concernr)   Structunpackrm   rc   rp   rR   r   r   BlockingIOErrorr   rn   ra   ra   ra   rb   <module>   sd   $
              < **"