U
    dQ                     @  s  U d Z ddlmZ ddlZddlmZ ddlmZ ddlm	Z	m
Z
mZmZmZmZmZmZmZmZmZ ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZmZ ddl m!Z!m"Z"m#Z# ddl$m%Z%m&Z&m'Z'm(Z( ddl)m*Z*m+Z+ ddl,m-Z-m.Z.m/Z/m0Z0m1Z1m2Z2 ddl3m4Z4 ddl5m6Z6 e	rPddl7m8Z8 ddl9m:Z: ddl;m<Z<m=Z=m>Z> dZ?de@d< dZAde@d< dZBde@d< dZCde@d< dZDde@d< dZEd e@d!< G d"d# d#ZFd#d$dd%d&d'd(d)ZGd*d+d,d-d.ZHG d/d0 d0ZIdS )1z<The bulk write operations interface.

.. versionadded:: 2.7
    )annotationsN)MutableMapping)islice)TYPE_CHECKINGAnyDictIteratorListMappingNoReturnOptionalTupleTypeUnion)ObjectId)RawBSONDocument)SON)_csotcommon)ClientSession_validate_session_write_concern)validate_is_document_typevalidate_ok_for_replacevalidate_ok_for_update)BulkWriteErrorConfigurationErrorInvalidOperationOperationFailure)_RETRYABLE_ERROR_CODES_get_wce_doc)_DELETE_INSERT_UPDATE_BulkWriteContext_EncryptedBulkWriteContext_randint)ReadPreference)WriteConcern)
Collection)
Connection)_DocumentOut_DocumentType	_Pipelineint_DELETE_ALL   _DELETE_ONE   
_BAD_VALUE   _UNKNOWN_ERROR@   _WRITE_CONCERN_ERROR)insertupdatedeletezTuple[str, str, str]	_COMMANDSc                   @  sB   e Zd ZdZdddddZddddd	Zdd
ddddZdS )_Runz'Represents a batch of write operations.r-   None)op_typereturnc                 C  s   || _ g | _g | _d| _dS )zInitialize a new Run object.r   N)r=   	index_mapops
idx_offset)selfr=    rC   0/tmp/pip-unpacked-wheel-oblwsawz/pymongo/bulk.py__init__U   s    z_Run.__init__)idxr>   c                 C  s
   | j | S )zGet the original index of an operation in this run.

        :Parameters:
          - `idx`: The Run index that maps to the original index.
        )r?   )rB   rF   rC   rC   rD   index\   s    z
_Run.indexr   )original_index	operationr>   c                 C  s   | j | | j| dS )zAdd an operation to this Run instance.

        :Parameters:
          - `original_index`: The original index of this operation
            within a larger bulk operation.
          - `operation`: The operation document.
        N)r?   appendr@   )rB   rH   rI   rC   rC   rD   addd   s    z_Run.addN)__name__
__module____qualname____doc__rE   rG   rK   rC   rC   rC   rD   r;   R   s   r;   MutableMapping[str, Any]Mapping[str, Any]r<   )runfull_resultoffsetresultr>   c                 C  sT  | dd}| jtkr(|d  |7  < n| jtkrD|d  |7  < n| jtkr| d}|rt|}|D ]}| |d | |d< qh|d | |d  |7  < |d  || 7  < n|d  |7  < |d	  |d	 7  < | d
}|r4|D ]B}| }	|d | }
| |
|	d< | j	|
 |	d< |d
 
|	 qt|}|rP|d 
| dS )z7Merge a write command result into the full bulk result.nr   	nInsertednRemovedupsertedrG   	nUpsertednMatched	nModifiedwriteErrorsopwriteConcernErrorsN)getr=   r!   r    r"   lenrG   extendcopyr@   rJ   r   )rR   rS   rT   rU   ZaffectedrY   Z
n_upserteddocZwrite_errorsreplacementrF   wcerC   rC   rD   _merge_commandp   s6    




rg   r*   r   )rS   r>   c                 C  s(   | d r| d j dd d t| dS )z5Raise a BulkWriteError from the full bulk api result.r]   c                 S  s   | d S )NrG   rC   )errorrC   rC   rD   <lambda>       z)_raise_bulk_write_error.<locals>.<lambda>)keyN)sortr   )rS   rC   rC   rD   _raise_bulk_write_error   s    rm   c                   @  s8  e Zd ZdZdBdddddddd	d
ZeddddZdddddZdCdddddddddddZdDdddddddddZ	dEdddddd d!d"Z
d#dd$d%Zd&dd'd(ZdFd)d*d+d,ddd-d.dd/	d0d1Zd)d*d+d2d3d4d5Zd,d)dd6d7d8Zd,d)d*dd9d:d;Zd,d)d*dd9d<d=Zd*d+d>d?d@dAZdS )G_Bulkz'The private guts of the bulk write API.NzCollection[_DocumentType]boolzOptional[str]zOptional[Any]r<   )
collectionorderedbypass_document_validationcommentletr>   c                 C  s   |j |jjdtdd| _|| _| jdk	r8td| j || _|| _	g | _
d| _|| _d| _d| _d| _d| _d| _d| _d| _d| _d| _dS )zInitialize a _Bulk instance.replace)Zunicode_decode_error_handlerZdocument_class)codec_optionsNrt   FT)Zwith_optionsrv   _replacedictrp   rt   r   r   rs   rq   r@   executedbypass_doc_valuses_collationuses_array_filtersuses_hint_updateuses_hint_deleteis_retryableretryingstarted_retryable_writecurrent_runnext_run)rB   rp   rq   rr   rs   rt   rC   rC   rD   rE      s.    	 
z_Bulk.__init__zType[_BulkWriteContext])r>   c                 C  s"   | j jjj}|r|jstS tS d S N)rp   databaseclientZ
_encrypterZ_bypass_auto_encryptionr$   r#   )rB   Z	encrypterrC   rC   rD   bulk_ctx_class   s    
z_Bulk.bulk_ctx_classr*   )documentr>   c                 C  s:   t d| t|ts&d|ks&t |d< | jt|f dS )z*Add an insert document to the list of ops.r   Z_idN)r   
isinstancer   r   r@   rJ   r!   )rB   r   rC   rC   rD   
add_insert   s    

z_Bulk.add_insertFrQ   z#Union[Mapping[str, Any], _Pipeline]zOptional[Mapping[str, Any]]z!Optional[List[Mapping[str, Any]]]zUnion[str, SON[str, Any], None])selectorr8   multiupsert	collationarray_filtershintr>   c           	      C  s   t | td|fd|fd|fd|fg}|dk	r>d| _||d< |dk	rTd| _||d< |dk	rjd| _||d	< |rtd
| _| jt|f dS )z8Create an update document and add it to the list of ops.qur   r   NTr   ZarrayFiltersr   F)	r   rx   r{   r|   r}   r   r@   rJ   r"   )	rB   r   r8   r   r   r   r   r   cmdrC   rC   rD   
add_update   s     z_Bulk.add_update)r   re   r   r   r   r>   c                 C  sd   t | td|fd|fdd|fg}|dk	r:d| _||d< |dk	rPd| _||d< | jt|f dS )	z8Create a replace document and add it to the list of ops.r   r   )r   Fr   NTr   r   )r   r   r{   r}   r@   rJ   r"   )rB   r   re   r   r   r   r   rC   rC   rD   add_replace   s    	z_Bulk.add_replacer-   )r   limitr   r   r>   c                 C  sb   t d|fd|fg}|dk	r*d| _||d< |dk	r@d| _||d< |tkrNd| _| jt|f dS )z7Create a delete document and add it to the list of ops.r   r   NTr   r   F)r   r{   r~   r.   r   r@   rJ   r    )rB   r   r   r   r   r   rC   rC   rD   
add_delete  s    z_Bulk.add_deletezIterator[Optional[_Run]]c                 c  s\   d}t | jD ]B\}\}}|dkr,t|}n|j|krD|V  t|}||| q|V  dS )ziGenerate batches of operations, batched by type of
        operation, in the order **provided**.
        N)	enumerater@   r;   r=   rK   )rB   rR   rF   r=   rI   rC   rC   rD   gen_ordered  s    

z_Bulk.gen_orderedzIterator[_Run]c                 c  sX   t tt tt tg}t| jD ]\}\}}|| || q |D ]}|jrB|V  qBdS )zbGenerate batches of operations, batched by type of
        operation, in arbitrary order.
        N)r;   r!   r"   r    r   r@   rK   )rB   
operationsrF   r=   rI   rR   rC   rC   rD   gen_unordered)  s    z_Bulk.gen_unorderedzIterator[Any]r'   Optional[ClientSession]r)   rP   zOptional[WriteConcern])		generatorwrite_concernsessionconnop_id	retryablerS   final_write_concernr>   c	              
   C  sr  | j jj}	| j jj}
|
j}| js0t|| _d | _| j}||
| d}|rn| j	slt|d | _| jd krld}t
|j }| |	||||||j| j j}|jt|jk rJ|rt|j|j dkr|p|}t|| j jfd| jfg}| jr| j|d< t|| | jrd|d< | jd k	r2|jttfkr2| j|d< |rf|rT| jsT|  d| _|||tj| ||||
 | | |!|
| t"|j|jd }|j#r(|$|||
\}}|%di }|%d	d
t&krt'(|}t)|||j| t*| t)|||j| d| _	d| _| jr6d|kr6qJn|+|||
}| jt|7  _q| jr`|d r`qn| j | _}qFd S )NFTr/   rq   rs   ZbypassDocumentValidationrt   ZwriteConcernErrorcoder   r]   ),rp   r   namer   _event_listenersr   nextr   Zvalidate_sessionr   r:   r=   r   rv   rA   ra   r@   r   rq   rs   r   Zapply_write_concernrz   rt   r    r"   r   Z_start_retryable_writeZ	_apply_tor&   ZPRIMARYZsend_cluster_timeadd_server_apiZapply_timeoutr   acknowledgedexecuter`   r   rc   deepcopyrg   rm   execute_unack)rB   r   r   r   r   r   r   rS   r   db_namer   	listenersrR   Zlast_runcmd_namebwcr   r@   rU   to_sendrf   fullrC   rC   rD   _execute_command5  s|    








z_Bulk._execute_commandzDict[str, Any])r   r   r   r>   c              	     s   g g dddddg d t  ddddd fdd	}jjj}||}|j|| W 5 Q R X  d
 s~ d rt   S )zExecute using write commands.r   r]   r_   rW   rZ   r[   r\   rX   rY   r   r)   ro   r<   )r   r   r   r>   c              	     s    | ||  d S r   )r   )r   r   r   rS   r   r   rB   r   rC   rD   retryable_bulk  s    z-_Bulk.execute_command.<locals>.retryable_bulkr]   r_   )r%   rp   r   r   Z_tmp_sessionZ_retry_with_sessionr   rm   )rB   r   r   r   r   r   srC   r   rD   execute_command  s"    	
 
z_Bulk.execute_command)r   r   r>   c              
   C  s   | j jj}| j jj}|j}t }| js0t|| _| j}|rt|j	 }| 
|||||d|j	| j j}	|jt|jk rt|| j jfddddifg}
||
 t|j|jd}|	|
||}| jt|7  _qbt|d | _}q6dS )zCExecute write commands with OP_MSG and w=0 writeConcern, unordered.N)rq   FZwriteConcernwr   )rp   r   r   r   r   r%   r   r   r:   r=   r   rv   rA   ra   r@   r   r   r   r   )rB   r   r   r   r   r   r   rR   r   r   r   r@   r   rC   rC   rD   execute_op_msg_no_results  s>    






z_Bulk.execute_op_msg_no_results)r   r   r   r>   c              
   C  sX   g g dddddg d}t  }t }z| ||d||d|| W n tk
rR   Y nX dS )zAExecute write commands with OP_MSG and w=0 WriteConcern, ordered.r   r   NF)r'   r%   r   r   )rB   r   r   r   rS   Zinitial_write_concernr   rC   rC   rD   execute_command_no_results  s0    
z _Bulk.execute_command_no_resultsc                 C  s   | j rtd| jrtd|o&|j }|rD| jrD|jdk rDtd|r`| jr`|jdk r`td| jrntd| j	r| 
|||S | ||S )z3Execute all operations, returning no results (w=0).z3Collation is unsupported for unacknowledged writes.z6arrayFilters is unsupported for unacknowledged writes.	   zPMust be connected to MongoDB 4.4+ to use hint on unacknowledged delete commands.r3   zPMust be connected to MongoDB 4.2+ to use hint on unacknowledged update commands.zGCannot set bypass_document_validation with unacknowledged write concern)r{   r   r|   r   r~   Zmax_wire_versionr}   rz   r   rq   r   r   )rB   r   r   r   ZunackrC   rC   rD   execute_no_results  s(    z_Bulk.execute_no_resultsr   )r   r   r>   c              	   C  s   | j std| jrtdd| _|p,| jj}t||}| jrH|  }n|  }| jj	j
}|js||}| ||| W 5 Q R  dS Q R X n| |||S dS )zExecute operations.zNo operations to executez*Bulk operations can only be executed once.TN)r@   r   ry   rp   r   r   rq   r   r   r   r   r   Z_conn_for_writesr   r   )rB   r   r   r   r   
connectionrC   rC   rD   r   /  s     


z_Bulk.execute)NN)FFNNN)FNN)NN)N)rL   rM   rN   rO   rE   propertyr   r   r   r   r   r   r   r   r   r   r   r   r   rC   rC   rC   rD   rn      s:     !     !       g)'$rn   )JrO   
__future__r   rc   collections.abcr   	itertoolsr   typingr   r   r   r   r	   r
   r   r   r   r   r   Zbson.objectidr   Zbson.raw_bsonr   Zbson.sonr   Zpymongor   r   Zpymongo.client_sessionr   r   Zpymongo.commonr   r   r   Zpymongo.errorsr   r   r   r   Zpymongo.helpersr   r   Zpymongo.messager    r!   r"   r#   r$   r%   Zpymongo.read_preferencesr&   Zpymongo.write_concernr'   Zpymongo.collectionr(   Zpymongo.poolr)   Zpymongo.typingsr*   r+   r,   r.   __annotations__r0   r2   r4   r6   r:   r;   rg   rm   rn   rC   rC   rC   rD   <module>   s<   4 ,