U
    d                     @  sV  U d Z ddlmZ ddlZddlZddlZddlmZ ddl	m
Z
mZmZmZmZmZmZmZmZmZmZmZmZ ddlZddlmZmZmZmZmZ ddlmZ ddlm Z m!Z!m"Z"m#Z# dd	l$m%Z% zdd
l&m'Z' dZ(W n e)k
r   dZ(Y nX ddl*m+Z+m,Z,m-Z-m.Z.m/Z/m0Z0m1Z1m2Z2 ddl3m4Z4 ddl5m6Z6 ddl7m8Z8 ddl9m:Z: e
rddlm;Z; ddl<m=Z= ddl>m?Z?m@Z@mAZA ddlBmCZC ddlDmEZE ddlFmGZG ddlHmIZI ddl7mJZJ ddlKmLZLmMZM dZNdZOdZPdZQdZRdZSd ZTd!ZUd"ZVd#ZWd$ZXd%ZYd&ZZeQd'eRd(eSd)iZ[d*d+d,d-Z\ed.d/Z]d0e^d1< d2d3d4d5Z_d6d7d6d8d9d:Z`d;d<d=d>d?Zad@dAdAd<dBdCdDZbe%dEdFdGdHdIgZce%dJdKdLdMdNdOdPdQdRdSdTdUgZddd@dAdVd2d2dWdWdXdYdZd[d\d]d^d_ZedWd@dWdWd`dad\dbdcddZfG dedf dfZgG dgdh dhZhG didj djegZiG dkdl dlehZjG dmdn dnekZlemdojnZodpZpd2dqdrdsdtdudvZqemdwjnZrd2dqdsdxdydzZsemd{jnZtemd|jnZuemd}jnZvd2dAd@d~dddddZwd2dAd@d~ddrddddZxd2dAd@d~dddddZye(re'jzZydd2d6d@dddddddZzd2d@d2d2dAdYdddddZ{d2d@d2d2dAdYddrdd	ddZ|d2d@d2d2dAdYdddddZ}e(rhe'j~Z}dd2d@d2d2dAdYdddd	ddZemdjnZd@d2d2dqdddZd@d2d2drdsdddZd@d2d2dsdddZe(re'jZdd@d2d2ddsdddZG dd dZdZG dd deZd@d2d2ddddZeQdeRdeSdiZd2dAdddddddddZd2dAddddddddZe(r~e'jZd2dAddddddddZd2dAddddddddZe(re'jZd@d2d6dddddddĄZd@d2d6dddddddƄZe(re'jZd@d2d6ddddddǜddɄZG dd˄ d˃ZG dd̈́ d̓ZejejejejiZde^d< dS )zTools for creating `messages
<https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol/>`_ to be sent to
MongoDB.

.. note:: This module is for internal use and is generally not needed by
   application developers.
    )annotationsN)BytesIO)TYPE_CHECKINGAnyCallableDictIterableListMappingMutableMappingNoReturnOptionalTupleUnioncast)CodecOptions_decode_selective_dict_to_bson_make_c_stringencode)Int64)_RAW_ARRAY_BSON_OPTIONSDEFAULT_RAW_BSON_OPTIONSRawBSONDocument_inflate_bson)SON)	_cmessageTF)ConfigurationErrorCursorNotFoundDocumentTooLargeExecutionTimeoutInvalidOperationNotPrimaryErrorOperationFailureProtocolError)HelloCompat)_handle_reauth)ReadPreference)WriteConcern)	timedelta)ClientSession)SnappyContextZlibContextZstdContext)MongoClient)_EventListeners)
Connection)ReadConcern)_ServerMode)_Address_DocumentOutii   i?                   s            s           s       s   documents     s   updates     s   deletes     	documentsupdatesZdeletes)insertupdatedeletereplace)Zunicode_decode_error_handlerz!'CodecOptions[Mapping[str, Any]]'_UNICODE_REPLACE_CODEC_OPTIONSintreturnc                   C  s   t ttS )z(Generate a pseudo random 32 bit integer.)randomrandint	MIN_INT32	MAX_INT32 rI   rI   3/tmp/pip-unpacked-wheel-oblwsawz/pymongo/message.py_randintu   s    rK   MutableMapping[str, Any]r2   )specread_preferencerD   c                 C  sJ   |j }|j}|rF|tjj ks(t|dkrFd| kr>td| fg} || d< | S )z-Add $readPreference to spec when appropriate.r5   $query$readPreference)modedocumentr'   ZSECONDARY_PREFERREDlenr   )rM   rN   rQ   rR   rI   rI   rJ   _maybe_add_read_preferencez   s    rT   	ExceptionDict[str, Any])	exceptionrD   c                 C  s   t | | jjdS )z<Convert an Exception into a failure document for publishing.)errmsgZerrtype)str	__class____name__)rW   rI   rI   rJ   _convert_exception   s    r\   rY   Mapping[str, Any])	operationcommandresultrD   c           	      C  s  | dd}d|d}| d| dd}|r| drN|d	dd
id|d< n6d| dd|d}d|krv|d |d< |g|d< |S | dkrt|d |d< nx| dkrd|krd|d dg|d< nP| ddkr|dkr|d d }|d  d|d  d}d|dg|d< |S )z6Convert a legacy write result to write command format.nr   r5   )okra   rX   err Zwtimeout@   T)rX   codeerrInfoZwriteConcernErrorrf      )indexrf   rX   rg   ZwriteErrorsr=   r;   r>   Zupserted)ri   _idZupdatedExistingFr<   urj   q)getrS   )	r^   r_   r`   ZaffectedresrX   errorr>   rj   rI   rI   rJ   _convert_write_result   s*    



rp   )Ztailabler6   )ZoplogReplayrh   )ZnoCursorTimeout   )Z	awaitData    )ZallowPartialResults   )rO   filter)z$orderbysort)z$hinthint)z$commentcomment)z$maxScanZmaxScan)z
$maxTimeMS	maxTimeMS)z$maxmax)z$minmin)z
$returnKeyZ	returnKey)z$showRecordIdshowRecordId)z$showDiskLocr{   )z	$snapshotZsnapshotz1Optional[Union[Mapping[str, Any], Iterable[str]]]Optional[int]r1   Optional[Mapping[str, Any]]Optional[ClientSession]Optional[bool]zSON[str, Any])collrM   
projectionskiplimit
batch_sizeoptionsread_concern	collationsessionallow_disk_userD   c                   s   t d| fg}d|krT|dd | D  d|kr@|d d|kr\|d n||d< |rh||d< |rt||d	< |rt||d
< |dk rd|d< |r||d< |jr|	r|	js|j|d< |r||d< |
dk	r|
|d<  r| fddt D  |S )z!Generate a find command document.findrO   c                 S  s,   g | ]$\}}|t kr t | |fn||fqS rI   )
_MODIFIERS).0keyvalrI   rI   rJ   
<listcomp>   s   z%_gen_find_command.<locals>.<listcomp>$explainrP   rt   r   r   r   r   TZsingleBatch	batchSizeZreadConcernr   NZallowDiskUsec                   s    g | ]\}} |@ r|d fqS )TrI   )r   optr   r   rI   rJ   r      s      )	r   r>   itemspopabslevelin_transactionrR   _OPTIONS)r   rM   r   r   r   r   r   r   r   r   r   cmdrI   r   rJ   _gen_find_command   s>    

r   zOptional[Any]r0   )	cursor_idr   r   max_await_time_msrw   connrD   c                 C  sN   t d| fd|fg}|r ||d< |dk	r0||d< |dk	rJ|jdkrJ||d< |S )z$Generate a getMore command document.getMoreZ
collectionr   Nrx   	   rw   )r   max_wire_version)r   r   r   r   rw   r   r   rI   rI   rJ   _gen_get_more_command  s    	r   c                   @  s   e Zd ZdZdZdZdZdddddddd	ddd
ddddddddZddddZddddZ	dddddZ
d%dddddd Zd&d	ddd!d"d#d$ZdS )'_QueryzA query operation.)flagsdbr   ntoskiprM   fieldscodec_optionsrN   r   r   namer   r   r   clientr   _as_commandexhaustNrB   rY   r]   r}   r   r2   r1   r~   r.   r   bool)r   r   r   r   rM   r   r   rN   r   r   r   r   r   r   r   r   c                 C  sp   || _ || _|| _|| _|| _|| _|| _|| _|| _|	| _	|
| _
|| _|| _|| _|| _d| _d | _|| _d S )Nr   )r   r   r   r   rM   r   r   rN   r   r   r   r   r   r   r   r   r   r   )selfr   r   r   r   rM   r   r   rN   r   r   r   r   r   r   r   r   rI   rI   rJ   __init__4  s$    z_Query.__init__NonerC   c                 C  s
   d | _ d S Nr   r   rI   rI   rJ   resetZ  s    z_Query.resetc                 C  s   | j  d| j S N.r   r   r   rI   rI   rJ   	namespace]  s    z_Query.namespacer0   r   rD   c                 C  sR   d}| j sd}n.|jdkr d}n| jjs>td| jj|jf || j| j |S )NFTrh   zDread concern level of %s is not valid with a max wire version of %d.)	r   r   r   Zok_for_legacyr   r   validate_sessionr   r   )r   r   Zuse_find_cmdrI   rI   rJ   use_command`  s    
z_Query.use_commandFTuple[SON[str, Any], str]r   apply_timeoutrD   c                 C  s  | j dk	r| j S d| jk}t| j| j| j| j| j| j| j| j	| j
| j| j}|rdd| _td|fg}| j}|| |r||d| j| |s|js||| |||| j | j}|jr|jjsttttf |j| j|| j}|r||| || jf| _ | j S )z.Return a find command document for this query.Nr   explainF)r   rM   r   r   r   r   r   r   r   r   r   r   r   r   r   add_server_api	_apply_torN   r   Z_update_read_concernsend_cluster_timer   
_encrypter_bypass_auto_encryptionr   rY   r   encryptr   r   r   )r   r   r   r   r   r   r   rI   rI   rJ   
as_commandp  sB    



"z_Query.as_commandTuple[int, bytes, int])rN   r   use_cmdrD   c              
   C  s   || _ |jr| jdB }n| j}|  }| j}|rn| j|ddd }td|| j|| j|j	d\}}}	}
|||	fS | j
dkr|dp| j
}| jr|rt| j|}n| j}|jrt|tstt||}t||| j|||rdn| j| j|j	dS )	z:Get a query message, possibly setting the secondaryOk bit.   Tr   r   ctxr5   r6   N)rN   rQ   r   r   rM   r   _op_msgr   r   compression_contextr   r   rz   Z	is_mongos
isinstancer   AssertionErrorrT   _queryr   r   )r   rN   r   r   r   nsrM   
request_idmsgsize_	ntoreturnrI   rI   rJ   get_message  sF    

z_Query.get_message)F)F)r[   
__module____qualname____doc__	__slots__conn_mgrr   r   r   r   r   r   r   rI   rI   rI   rJ   r     s   ,& - r   c                   @  s   e Zd ZdZdZdZdddddddd	d
ddddddZddddZddddZdddddZ	d$ddddddZ
d%ddddd d!d"Zd#S )&_GetMorezA getmore operation.)r   r   r   r   r   r   rN   r   r   r   r   r   rw   r   rY   rB   r   r2   r~   r.   r|   r   r   )r   r   r   r   r   rN   r   r   r   r   r   rw   c                 C  sR   || _ || _|| _|| _|| _|| _|| _|| _|	| _|
| _	d | _
|| _|| _d S r   )r   r   r   r   r   rN   r   r   r   r   r   r   rw   )r   r   r   r   r   r   rN   r   r   r   r   r   rw   rI   rI   rJ   r     s    z_GetMore.__init__r   rC   c                 C  s
   d | _ d S r   r   r   rI   rI   rJ   r     s    z_GetMore.resetc                 C  s   | j  d| j S r   r   r   rI   rI   rJ   r     s    z_GetMore.namespacer0   r   c                 C  s2   d}| j sd}n|jdkrd}|| j| j |S )NFTrh   )r   r   r   r   r   )r   r   r   rI   rI   rJ   r   	  s    
z_GetMore.use_commandFr   r   c                 C  s   | j dk	r| j S t| j| j| j| j| j|}| jrF| j|d| j	| |
| ||| j| j | j}|jr|jjsttttf |j| j|| j}|r|j|dd || jf| _ | j S )z1Return a getMore command document for this query.NF)r   )r   r   r   r   r   r   rw   r   r   rN   r   r   r   r   r   r   r   rY   r   r   r   r   r   )r   r   r   r   r   rI   rI   rJ   r     s*    

"z_GetMore.as_commandz0Union[Tuple[int, bytes, int], Tuple[int, bytes]])dummy0r   r   rD   c                 C  st   |   }|j}|rb| j|ddd }| jr2tj}nd}t||| jd| j|jd\}}	}
}||	|
fS t	|| j
| j|S )zGet a getmore message.Tr   r   Nr   )r   r   r   r   _OpMsgEXHAUST_ALLOWEDr   r   r   	_get_morer   r   )r   r   r   r   r   r   rM   r   r   r   r   r   rI   rI   rJ   r   2  s"         
z_GetMore.get_messageN)F)F)r[   r   r   r   r   r   r   r   r   r   r   r   rI   rI   rI   rJ   r     s   $  r   c                      s$   e Zd Zddd fddZ  ZS )_RawBatchQueryr0   r   r   c                   s(   t  | |jdkrdS | js$dS dS Nrh   TFsuperr   r   r   r   r   rZ   rI   rJ   r   H  s    
z_RawBatchQuery.use_commandr[   r   r   r   __classcell__rI   rI   r   rJ   r   G  s   r   c                      s$   e Zd Zddd fddZ  ZS )_RawBatchGetMorer0   r   r   c                   s(   t  | |jdkrdS | js$dS dS r   r   r   r   rI   rJ   r   T  s    
z_RawBatchGetMore.use_commandr   rI   rI   r   rJ   r   S  s   r   c                   @  sl   e Zd ZU dZded< ddd dddZedd	d
dZdd	ddZdddddZ	dddddZ
dS )_CursorAddresszEThe server address (host, port) of a cursor, with namespace property.r   _CursorAddress__namespacer3   rY   )addressr   rD   c                 C  s   t | |}||_|S r   )tuple__new__r   )clsr   r   r   rI   rI   rJ   r   d  s    z_CursorAddress.__new__rC   c                 C  s   | j S )zThe namespace this cursor.)r   r   rI   rI   rJ   r   i  s    z_CursorAddress.namespacerB   c                 C  s   | | j f S r   )r   __hash__r   rI   rI   rJ   r   n  s    z_CursorAddress.__hash__objectr   )otherrD   c                 C  s*   t |tr&t| t|ko$| j|jkS tS r   )r   r   r   r   NotImplementedr   r   rI   rI   rJ   __eq__s  s    
z_CursorAddress.__eq__c                 C  s
   | |k S r   rI   r   rI   rI   rJ   __ne__x  s    z_CursorAddress.__ne__N)r[   r   r   r   __annotations__r   propertyr   r   r   r   rI   rI   rI   rJ   r   _  s   
r   z<iiiiiiB   bytesz.Union[SnappyContext, ZlibContext, ZstdContext]zTuple[int, bytes])r^   datar   rD   c                 C  s>   | |}t }ttt| |dd| t||j}||| fS )zDTakes message data, compresses it, and adds an OP_COMPRESSED header.r   i  )compressrK   _pack_compression_header_COMPRESSION_HEADER_SIZErS   Zcompressor_id)r^   r   r   
compressedr   headerrI   rI   rJ   	_compress  s    

	r  z<iiii)r^   r   rD   c                 C  s(   t  }tdt| |d| }||| fS )ztTakes message data and adds a message header based on the operation.

    Returns the resultant message string.
    rq   r   )rK   _pack_headerrS   )r^   r   ridmessagerI   rI   rJ   __pack_message  s    r  z<iz<IBz<Bz!Optional[List[Mapping[str, Any]]]r   zTuple[bytes, int, int])r   r_   
identifierdocsoptsrD   c                   s   t |d }t| d}t|}d}|r|dk	rtd}	t|}
 fdd|D }t|
tdd |D  d	 }t|}||7 }td
d |D }|||	||
f|}n||g}d|||fS )zGet a OP_MSG message.

    Note: this method handles multiple documents in a type one payload but
    it does not perform batch splitting and the total message size is
    only checked *after* generating the entire message.
    Fr   Nr5   c                   s   g | ]}t |d  qS )F)r   r   docr
  rI   rJ   r     s     z%_op_msg_no_header.<locals>.<listcomp>c                 s  s   | ]}t |V  qd S r   rS   r  rI   rI   rJ   	<genexpr>  s     z$_op_msg_no_header.<locals>.<genexpr>r   c                 s  s   | ]}t |V  qd S r   r  r  rI   rI   rJ   r    s     r7   )	r   _pack_op_msg_flags_typerS   
_pack_byter   sum	_pack_intry   join)r   r_   r  r	  r
  encodedZ
flags_type
total_sizemax_doc_sizeZtype_oneZcstringZencoded_docsr   Zencoded_sizer   rI   r  rJ   _op_msg_no_header  s    
r  zTuple[int, bytes, int, int])r   r_   r  r	  r
  r   rD   c           
      C  s2   t | ||||\}}}td||\}	}|	|||fS )zInternal OP_MSG message helper.  )r  r  )
r   r_   r  r	  r
  r   r   r  max_bson_sizer  rI   rI   rJ   _op_msg_compressed  s    	r  c           
      C  s0   t | ||||\}}}td|\}}	||	||fS )z*Internal compressed OP_MSG message helper.r  )r  r  )
r   r_   r  r	  r
  r   r  r  r   Z
op_messagerI   rI   rJ   _op_msg_uncompressed  s    r  zOptional[_ServerMode]z4Union[SnappyContext, ZlibContext, ZstdContext, None])r   r_   dbnamerN   r
  r   rD   c           	   	   C  s   ||d< |dk	r(d|kr(|j r(|j|d< tt|}zt| }||}W n tk
rf   d}d}Y nX z.|rt| |||||W S t| ||||W S |r|||< X dS )zGet a OP_MSG message.$dbNrP   rd   )	rQ   rR   nextiter
_FIELD_MAPr   KeyErrorr  r  )	r   r_   r  rN   r
  r   r   r  r	  rI   rI   rJ   r     s"    	

r   zTuple[bytes, int])r   collection_namenum_to_skipnum_to_returnqueryfield_selectorr
  rD   c           
      C  s^   t |d|}|rt |d|}nd}tt|t|}	dt| t|t|t|||g|	fS )zGet an OP_QUERY message.Fr7   )r   ry   rS   r  r  r   )
r   r#  r$  r%  r&  r'  r
  r  Zefsr  rI   rI   rJ   _query_impl  s     

r(  r   )	r   r#  r$  r%  r&  r'  r
  r   rD   c                 C  s2   t | ||||||\}}	td||\}
}|
||	fS )z)Internal compressed query message helper.  )r(  r  )r   r#  r$  r%  r&  r'  r
  r   op_queryr  r  r   rI   rI   rJ   _query_compressed#  s          r+  c                 C  s0   t | ||||||\}}td|\}	}
|	|
|fS )zInternal query message helper.r)  )r(  r  )r   r#  r$  r%  r&  r'  r
  r*  r  r  r   rI   rI   rJ   _query_uncompressed5  s    
      r,  c              	   C  s.   |rt | |||||||S t| ||||||S )zGet a **query** message.)r+  r,  )r   r#  r$  r%  r&  r'  r
  r   rI   rI   rJ   r   J  s(                 r   z<q)r#  r%  r   rD   c                 C  s   d tt| t|t|gS )zGet an OP_GET_MORE message.r7   )r  _ZERO_32r   r  _pack_long_longr#  r%  r   rI   rI   rJ   _get_more_impla  s    r0  )r#  r%  r   r   rD   c                 C  s   t dt| |||S )z+Internal compressed getMore message helper.  )r  r0  r#  r%  r   r   rI   rI   rJ   _get_more_compressedm  s    r3  c                 C  s   t dt| ||S )z Internal getMore message helper.r1  )r  r0  r/  rI   rI   rJ   _get_more_uncompressedw  s    r4  c                 C  s   |rt | |||S t| ||S )zGet a **getMore** message.)r3  r4  r2  rI   rI   rJ   r     s    r   c                	   @  s   e Zd ZdZdZddddddddd	d
dZddddddZdddddddZdddddddZe	ddddZ
e	ddddZe	ddddZe	ddd d!Zddd"ddd#d$d%d&Zeddd"dd'd(d)d*Zddddd+d,d-Zdd.d/d0d1d2d3Zdd.d/d0d4d5d6Zd7S )8_BulkWriteContextzCA wrapper around Connection for use with write splitting functions.)db_namer   op_idr   fieldpublish
start_time	listenersr   r   op_typecodecrY   r0   rB   r/   r*   r   )database_namecmd_namer   operation_idr;  r   r<  r=  c	           	      C  sn   || _ || _|| _|| _|j| _|| _t| j | _| jrBt	j	
 nd | _|| _|jrXdnd| _|| _|| _d S )NTF)r6  r   r7  r;  Zenabled_for_commandsr9  r   r!  r8  datetimenowr:  r   r   r   r<  r=  )	r   r>  r?  r   r@  r;  r   r<  r=  rI   rI   rJ   r     s    z_BulkWriteContext.__init__rL   List[Mapping[str, Any]]*Tuple[int, bytes, List[Mapping[str, Any]]]r   r	  rD   c                 C  s<   | j d }t|| j||| j| \}}}|s2td|||fS )N.$cmdcannot do an empty bulk write)r6  _do_batched_op_msgr<  r=  r!   )r   r   r	  r   r   r   to_sendrI   rI   rJ   __batch_command  s    
     
z!_BulkWriteContext.__batch_commandr.   1Tuple[Mapping[str, Any], List[Mapping[str, Any]]]r   r	  r   rD   c                 C  s8   |  ||\}}}| ||||}||| j ||fS r   ) _BulkWriteContext__batch_commandwrite_commandZ_process_responser   )r   r   r	  r   r   r   rI  r`   rI   rI   rJ   execute  s    z_BulkWriteContext.executec                 C  s(   |  ||\}}}| |||d| |S )Nr   )rM  unack_write)r   r   r	  r   r   r   rI  rI   rI   rJ   execute_unack  s    z_BulkWriteContext.execute_unackrC   c                 C  s   | j jS )z#A proxy for SockInfo.max_bson_size.)r   r  r   rI   rI   rJ   r    s    z_BulkWriteContext.max_bson_sizec                 C  s   | j r| jjd S | jjS )z&A proxy for SockInfo.max_message_size.rq   )r   r   max_message_sizer   rI   rI   rJ   rR    s    z"_BulkWriteContext.max_message_sizec                 C  s   | j jS )z*A proxy for SockInfo.max_write_batch_size.)r   max_write_batch_sizer   rI   rI   rJ   rS    s    z&_BulkWriteContext.max_write_batch_sizec                 C  s   | j S )z:The maximum size of a BSON command before batch splitting.)r  r   rI   rI   rJ   max_split_size  s    z _BulkWriteContext.max_split_sizer   r}   )r   r   r   r  r	  rD   c              
   C  sB  | j r<| jdk	sttj | j }| |||}tj }zzX| j||}| j rtj | | }|dk	r~t| j	||}	nddi}	| 
||	| W n tk
r* }
 zt| j r| jdk	sttj | | }t|
trt| j	||
j}nt|
tr|
j}nt|
}| |||  W 5 d}
~
X Y nX W 5 tj | _X |S )zAA proxy for Connection.unack_write that handles event publishing.Nrb   r5   )r9  r:  r   rA  rB  _startr   rP  rp   r   _succeedrU   r   r#   detailsr"   r\   _fail)r   r   r   r   r  r	  durationstartr`   replyexcfailurerI   rI   rJ   rP    s4    	

z_BulkWriteContext.unack_writerV   )r   r   r   r	  rD   c           
   
   C  s   | j r<| jdk	sttj | j }| ||| tj }zz<| j||| j}| j rxtj | | }| 	||| W nh t
k
r } zJ| j rtj | | }t|ttfr|j}	nt|}	| ||	|  W 5 d}~X Y nX W 5 tj | _X |S )zCA proxy for SocketInfo.write_command that handles event publishing.N)r9  r:  r   rA  rB  rU  r   rN  r=  rV  rU   r   r"   r#   rW  r\   rX  )
r   r   r   r   r	  rY  rZ  r[  r\  r]  rI   rI   rJ   rN    s(    	
z_BulkWriteContext.write_command)r   r   r	  rD   c                 C  s0   ||| j < | j|| j|| jj| j| jj |S )zPublish a CommandStartedEvent.)r8  r;  Zpublish_command_startr6  r   r   r7  
service_id)r   r   r   r	  rI   rI   rJ   rU  6  s    
z_BulkWriteContext._startr4   r)   r   )r   r[  rY  rD   c              	   C  s(   | j ||| j|| jj| j| jj dS )z Publish a CommandSucceededEvent.N)r;  Zpublish_command_successr   r   r   r7  r^  )r   r   r[  rY  rI   rI   rJ   rV  E  s    z_BulkWriteContext._succeed)r   r]  rY  rD   c              	   C  s(   | j ||| j|| jj| j| jj dS )zPublish a CommandFailedEvent.N)r;  Zpublish_command_failurer   r   r   r7  r^  )r   r   r]  rY  rI   rI   rJ   rX  Q  s    z_BulkWriteContext._failN)r[   r   r   r   r   r   rM  rO  rQ  r   r  rR  rS  rT  rP  r&   rN  rU  rV  rX  rI   rI   rI   rJ   r5    s(   (r5  i    c                   @  s\   e Zd ZdZddddddZdddd	d
ddZddddd
ddZeddddZdS )_EncryptedBulkWriteContextrI   rL   rC  z.Tuple[Dict[str, Any], List[Mapping[str, Any]]]rE  c                 C  s^   | j d }t|| j||| j| \}}|s0td|ddd }tt||d  t}||fS )NrF  rG  r9   r   r   )	r6  _encode_batched_write_commandr<  r=  r!   ri   r   
memoryviewr   )r   r   r	  r   r   rI  Z	cmd_startrI   rI   rJ   rJ  h  s    
     z*_EncryptedBulkWriteContext.__batch_commandr.   rK  rL  c                 C  s4   |  ||\}}| jj| j|| j| j|d}||fS )N)r   r   r   ))_EncryptedBulkWriteContext__batch_commandr   r_   r6  r=  r   )r   r   r	  r   batched_cmdrI  r`   rI   rI   rJ   rO  w  s        z"_EncryptedBulkWriteContext.executec                 C  s4   |  ||\}}| jj| j|tdd| j|d |S )Nr   )w)Zwrite_concernr   r   )rb  r   r_   r6  r(   r   )r   r   r	  r   rc  rI  rI   rI   rJ   rQ    s    z(_EncryptedBulkWriteContext.execute_unackrB   rC   c                 C  s   t S )z Reduce the batch splitting size.)_MAX_SPLIT_SIZE_ENCr   rI   rI   rJ   rT    s    z)_EncryptedBulkWriteContext.max_split_sizeN)	r[   r   r   r   rb  rO  rQ  r   rT  rI   rI   rI   rJ   r_  e  s   	r_  r   )r^   doc_sizemax_sizerD   c                 C  s,   | dkrt d||f nt | ddS )z-Internal helper for raising DocumentTooLarge.r=   zfBSON document too large (%d bytes) - the connected server supports BSON document sizes up to %d bytes.z command document too largeN)r   )r^   rf  rg  rI   rI   rJ   _raise_document_too_large  s    	rh  s
   documents s   updates s   deletes rC  r   _BytesIOz#Tuple[List[Mapping[str, Any]], int])r^   r_   r	  ackr
  r   bufrD   c                 C  sp  |j }|j}|j}	|rdnd}
||
 |d |t|d| |d | }|d z|t|   W n tk
r   tdY nX g }d}|D ]}t|d|}t	|}| | }|dko||	k}| o||k}|s|rt
t |  }t|t	|| ||	kr qD|| || |d7 }||kr qDq| }|| |t||  ||fS )	zCreate a batched OP_MSG write.r:   s      r9   F   Unknown commandr   r5   )r  rS  rR  writer   tell_OP_MSG_MAPr"  r!   rS   listr!  keysrh  appendseekr  )r^   r_   r	  rj  r
  r   rk  r  rS  rR  r   Zsize_locationrI  idxr  valueZ
doc_lengthZnew_message_sizedoc_too_largeZunacked_doc_too_largewrite_oplengthrI   rI   rJ   _batched_op_msg_impl  sH    









rz  z%Tuple[bytes, List[Mapping[str, Any]]])r^   r_   r	  rj  r
  r   rD   c           	      C  s*   t  }t| ||||||\}}| |fS )zOEncode the next batched insert, update, or delete operation
    as OP_MSG.
    )ri  rz  getvalue)	r^   r_   r	  rj  r
  r   rk  rI  r   rI   rI   rJ   _encode_batched_op_msg  s    r|  rD  c           
      C  sD   t | |||||\}}|jjdk	s&ttd||jj\}}	||	|fS )z]Create the next batched insert, update, or delete operation
    with OP_MSG, compressed.
    Nr  )r|  r   r   r   r  )
r^   r_   r	  rj  r
  r   r   rI  r   r   rI   rI   rJ   _batched_op_msg_compressed  s    r}  c           
      C  sv   t  }|t |d t| ||||||\}}|d t }	|t|	 |d |t| |	| |fS )z"OP_MSG implementation entry point.s         r   r   )ri  rn  _ZERO_64rz  rt  rK   r  r{  )
r^   r_   r	  rj  r
  r   rk  rI  ry  r   rI   rI   rJ   _batched_op_msg  s    	



r  )r   r^   r_   r	  r
  r   rD   c                 C  sb   |  ddd |d< d|kr2t|d dd}nd}|jjrPt||||||S t||||||S )zRCreate the next batched insert, update, or delete operation
    using OP_MSG.
    r   r5   r   r  ZwriteConcernrd  T)splitr   rm   r   r   r}  r  )r   r^   r_   r	  r
  r   rj  rI   rI   rJ   rH  7  s    rH  c           	      C  s*   t  }t| ||||||\}}| |fS )z:Encode the next batched insert, update, or delete command.)ri  _batched_write_command_implr{  )	r   r^   r_   r	  r
  r   rk  rI  r   rI   rI   rJ   r`  O  s    	r`  )r   r^   r_   r	  r
  r   rk  rD   c                 C  s  |j }|j}|t }	|j}
|t || d |t |t |	 }|t| |
dd |  z|t|  W n tk
r   tdY nX |	 d }g }d}|D ]}t|d}t|d|}t||	k}|r
tt | }t|t|| |dko.|	 t| t| |
k}||k}|sD|rJ q|t || |t || || |d7 }q|t |	 }|
| |t|| d  |
| |t||  ||fS )	z(Create a batched OP_QUERY write command.utf8r6   rm  r   r   Fr5   )r  rS  _COMMAND_OVERHEADrT  rn  r-  r   _ZERO_8_SKIPLIMro  rt  truncate_OP_MAPr"  r!   rY   r   rS   rq  r!  rr  rh  _BSONOBJrs  _ZERO_16r  )r   r^   r_   r	  r
  r   rk  r  rS  Zmax_cmd_sizerT  Zcommand_startZ
list_startrI  ru  r  r   rv  rw  rx  Zenough_dataZenough_documentsry  rI   rI   rJ   r  b  sV    



&








r  c                   @  s   e Zd ZdZdZedjZdZ	dddddddZ
d$d
dddddZd	ed	dfd
dddddddZdddddZddddZedddd Zedd d!d"d#Zd	S )%_OpReplyz$A MongoDB OP_REPLY response message.)r   r   number_returnedr;   z<iqiir5   rB   r   c                 C  s    || _ t|| _|| _|| _d S r   )r   r   r   r  r;   )r   r   r   r  r;   rI   rI   rJ   r     s    
z_OpReply.__init__Nr|   r}   zList[bytes]r   user_fieldsrD   c                 C  s   | j d@ r>|dkrtdd|f }d|dd}t|d|n| j d@ rt| j }|d	d |d
 t	j
rt|d
 |n,|ddkrd}t|d
||d|td|d
 |d|| jr| jgS g S )a  Check the response header from the database, without decoding BSON.

        Check the response for errors and unpack.

        Can raise CursorNotFound, NotPrimaryError, ExecutionTimeout, or
        OperationFailure.

        :Parameters:
          - `cursor_id` (optional): cursor_id we sent to get this response -
            used for raising an informative exception when we get cursor id not
            valid at server response.
        r5   Nz"No cursor id for getMore operationzCursor not found, cursor id: %dr   +   )rb   rX   rf   r6   rb   z$errrf   2   zoperation exceeded time limitzdatabase error: %s)r   r$   r   bsonZBSONr;   decode
setdefault
startswithr%   ZLEGACY_ERRORr"   rm   r    r#   )r   r   r  r   ZerrobjZerror_objectZdefault_msgrI   rI   rJ   raw_response  s4    



  z_OpReply.raw_responseFr   r   List[Dict[str, Any]]r   r   r  legacy_responserD   c                 C  s,   |  | |rt| j|S t| j||S )a  Unpack a response from the database and decode the BSON document(s).

        Check the response for errors and unpack, returning a dictionary
        containing the response data.

        Can raise CursorNotFound, NotPrimaryError, ExecutionTimeout, or
        OperationFailure.

        :Parameters:
          - `cursor_id` (optional): cursor_id we sent to get this response -
            used for raising an informative exception when we get cursor id not
            valid at server response
          - `codec_options` (optional): an instance of
            :class:`~bson.codec_options.CodecOptions`
          - `user_fields` (optional): Response fields that should be decoded
            using the TypeDecoders from codec_options, passed to
            bson._decode_all_selective.
        )r  r  Z
decode_allr;   _decode_all_selectiver   r   r   r  r  rI   rI   rJ   unpack_response  s    
z_OpReply.unpack_responserV   r   rD   c                 C  s"   | j |d}| jdkst|d S )Unpack a command response.r   r5   r   )r  r  r   )r   r   r	  rI   rI   rJ   command_response  s    z_OpReply.command_responser   rC   c                 C  s   t dS ))Return the bytes of the command response.N)NotImplementedErrorr   rI   rI   rJ   raw_command_response  s    z_OpReply.raw_command_responsec                 C  s   dS )+Is the moreToCome bit set on this response?FrI   r   rI   rI   rJ   more_to_come  s    z_OpReply.more_to_comer   rD   c                 C  s,   |  |\}}}}|dd }| ||||S )z%Construct an _OpReply from raw bytes.   N)UNPACK_FROM)r   r   r   r   r   r  r;   rI   rI   rJ   unpack  s    z_OpReply.unpack)NN)r[   r   r   r   r   structStructunpack_fromr  OP_CODEr   r  rA   r  r  r  r   r  classmethodr  rI   rI   rI   rJ   r    s&      /r  c                   @  s   e Zd ZdZdZedjZdZ	dZ
dZdZdd	d
ddZdi fddddddZdeddfddddddddZdddddZd	dd d!Zeddd"d#Zed	d d$d%d&ZdS )'r   z"A MongoDB OP_MSG response message.)r   r   r  payload_documentz<IBir  r5   r6   i   rB   r   r   r  c                 C  s   || _ || _d S r   r  )r   r   r  rI   rI   rJ   r   /  s    z_OpMsg.__init__Nr|   r}   rC  r  c                 C  s   t t| j|t}|gS )zp
        cursor_id is ignored
        user_fields is used to determine which fields must not be decoded
        )r   r   r  r   )r   r   r  Zinflated_responserI   rI   rJ   r  3  s    	  z_OpMsg.raw_responseFr   r   r  r  c                 C  s   |rt t| j||S )a  Unpack a OP_MSG command response.

        :Parameters:
          - `cursor_id` (optional): Ignored, for compatibility with _OpReply.
          - `codec_options` (optional): an instance of
            :class:`~bson.codec_options.CodecOptions`
          - `user_fields` (optional): Response fields that should be decoded
            using the TypeDecoders from codec_options, passed to
            bson._decode_all_selective.
        )r   r  r  r  r  rI   rI   rJ   r  A  s    z_OpMsg.unpack_responserV   r  c                 C  s   | j |dd S )r  r  r   )r  )r   r   rI   rI   rJ   r  V  s    z_OpMsg.command_responserC   c                 C  s   | j S )r  )r  r   rI   rI   rJ   r  Z  s    z_OpMsg.raw_command_responsec                 C  s   t | j| j@ S )r  )r   r   MORE_TO_COMEr   rI   rI   rJ   r  ^  s    z_OpMsg.more_to_comer  c                 C  s   |  |\}}}|dkrL|| j@ r2td|d|| jA rLtd|d|dkrdtd|dt||d kr|td|dd }| ||S )	z#Construct an _OpMsg from raw bytes.r   z+Unsupported OP_MSG flag checksumPresent: 0xxzUnsupported OP_MSG flags: 0xz#Unsupported OP_MSG payload type: 0x   z$Unsupported OP_MSG reply: >1 sectionN)r  CHECKSUM_PRESENTr$   r  rS   )r   r   r   Zfirst_payload_typeZfirst_payload_sizer  rI   rI   rJ   r  c  s    

z_OpMsg.unpack)r[   r   r   r   r   r  r  r  r  r  r  r  r   r   r  rA   r  r  r  r   r  r  r  rI   rI   rI   rJ   r   "  s,   r   z5Dict[int, Callable[[bytes], Union[_OpReply, _OpMsg]]]_UNPACK_REPLY)NNN)N)N)N)r   
__future__r   rA  rE   r  ior   ri  typingr   r   r   r   r   r	   r
   r   r   r   r   r   r   r  r   r   r   r   r   Z
bson.int64r   Zbson.raw_bsonr   r   r   r   Zbson.sonr   Zpymongor   Z_use_cImportErrorZpymongo.errorsr   r   r   r    r!   r"   r#   r$   Zpymongo.hellor%   Zpymongo.helpersr&   Zpymongo.read_preferencesr'   Zpymongo.write_concernr(   r)   Zpymongo.client_sessionr*   Zpymongo.compression_supportr+   r,   r-   Zpymongo.mongo_clientr.   Zpymongo.monitoringr/   Zpymongo.poolr0   Zpymongo.read_concernr1   r2   Zpymongo.typingsr3   r4   rH   rG   r  Z_INSERTZ_UPDATEZ_DELETEZ_EMPTYr  r  r  r-  r~  r  r  r!  rA   r   rK   rT   r\   rp   r   r   r   r   r   r   r   r   r   r   r  packr   r   r  r  r  r  r  r  r  r  r  r   r(  r+  r,  Z_query_messager   r.  r0  r3  r4  Z_get_more_messager   r5  re  r_  rh  rp  rz  r|  r}  r  rH  r`  r  r  r   r  r  r  rI   rI   rI   rJ   <module>   s6  <
(
   $   &3 :v
 	    
  U.   DMsV  