U
    O8c                  
   @   s  d Z ddlZddlZddlZddlZddlZddlZddlmZ ddl	Z
ddlmZ ddlmZmZ ddlmZmZmZ ddlmZmZ ddlmZmZ dd	lmZmZmZmZmZ e e!Z"d
Z#e$dddddddgZ%e$ddddddddgZ&ej'dd Z(dd Z)G dd dZ*G dd dZ+G dd deZ,G dd  d eZ-G d!d" d"Z.G d#d$ d$Z/G d%d& d&Z0G d'd( d(eZ1e12d$e/ G d)d* d*ej3Z4G d+d, d,e4Z5G d-d. d.e4Z6dS )/aC  Speeds up S3 throughput by using processes

Getting Started
===============

The :class:`ProcessPoolDownloader` can be used to download a single file by
calling :meth:`ProcessPoolDownloader.download_file`:

.. code:: python

     from s3transfer.processpool import ProcessPoolDownloader

     with ProcessPoolDownloader() as downloader:
          downloader.download_file('mybucket', 'mykey', 'myfile')


This snippet downloads the S3 object located in the bucket ``mybucket`` at the
key ``mykey`` to the local file ``myfile``. Any errors encountered during the
transfer are not propagated. To determine if a transfer succeeded or
failed, use the `Futures`_ interface.


The :class:`ProcessPoolDownloader` can be used to download multiple files as
well:

.. code:: python

     from s3transfer.processpool import ProcessPoolDownloader

     with ProcessPoolDownloader() as downloader:
          downloader.download_file('mybucket', 'mykey', 'myfile')
          downloader.download_file('mybucket', 'myotherkey', 'myotherfile')


When running this snippet, the downloading of ``mykey`` and ``myotherkey``
happen in parallel. The first ``download_file`` call does not block the
second ``download_file`` call. The snippet blocks when exiting
the context manager and blocks until both downloads are complete.

Alternatively, the ``ProcessPoolDownloader`` can be instantiated
and explicitly be shutdown using :meth:`ProcessPoolDownloader.shutdown`:

.. code:: python

     from s3transfer.processpool import ProcessPoolDownloader

     downloader = ProcessPoolDownloader()
     downloader.download_file('mybucket', 'mykey', 'myfile')
     downloader.download_file('mybucket', 'myotherkey', 'myotherfile')
     downloader.shutdown()


For this code snippet, the call to ``shutdown`` blocks until both
downloads are complete.


Additional Parameters
=====================

Additional parameters can be provided to the ``download_file`` method:

* ``extra_args``: A dictionary containing any additional client arguments
  to include in the
  `GetObject <https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.get_object>`_
  API request. For example:

  .. code:: python

     from s3transfer.processpool import ProcessPoolDownloader

     with ProcessPoolDownloader() as downloader:
          downloader.download_file(
               'mybucket', 'mykey', 'myfile',
               extra_args={'VersionId': 'myversion'})


* ``expected_size``: By default, the downloader will make a HeadObject
  call to determine the size of the object. To opt-out of this additional
  API call, you can provide the size of the object in bytes:

  .. code:: python

     from s3transfer.processpool import ProcessPoolDownloader

     MB = 1024 * 1024
     with ProcessPoolDownloader() as downloader:
          downloader.download_file(
               'mybucket', 'mykey', 'myfile', expected_size=2 * MB)


Futures
=======

When ``download_file`` is called, it immediately returns a
:class:`ProcessPoolTransferFuture`. The future can be used to poll the state
of a particular transfer. To get the result of the download,
call :meth:`ProcessPoolTransferFuture.result`. The method blocks
until the transfer completes, whether it succeeds or fails. For example:

.. code:: python

     from s3transfer.processpool import ProcessPoolDownloader

     with ProcessPoolDownloader() as downloader:
          future = downloader.download_file('mybucket', 'mykey', 'myfile')
          print(future.result())


If the download succeeds, the future returns ``None``:

.. code:: python

     None


If the download fails, the exception causing the failure is raised. For
example, if ``mykey`` did not exist, the following error would be raised


.. code:: python

     botocore.exceptions.ClientError: An error occurred (404) when calling the HeadObject operation: Not Found


.. note::

    :meth:`ProcessPoolTransferFuture.result` can only be called while the
    ``ProcessPoolDownloader`` is running (e.g. before calling ``shutdown`` or
    inside the context manager).


Process Pool Configuration
==========================

By default, the downloader has the following configuration options:

* ``multipart_threshold``: The threshold size for performing ranged downloads
  in bytes. By default, ranged downloads happen for S3 objects that are
  greater than or equal to 8 MB in size.

* ``multipart_chunksize``: The size of each ranged download in bytes. By
  default, the size of each ranged download is 8 MB.

* ``max_request_processes``: The maximum number of processes used to download
  S3 objects. By default, the maximum is 10 processes.


To change the default configuration, use the :class:`ProcessTransferConfig`:

.. code:: python

     from s3transfer.processpool import ProcessPoolDownloader
     from s3transfer.processpool import ProcessTransferConfig

     config = ProcessTransferConfig(
          multipart_threshold=64 * 1024 * 1024,  # 64 MB
          max_request_processes=50
     )
     downloader = ProcessPoolDownloader(config=config)


Client Configuration
====================

The process pool downloader creates ``botocore`` clients on your behalf. In
order to affect how the client is created, pass the keyword arguments
that would have been used in the :meth:`botocore.Session.create_client` call:

.. code:: python


     from s3transfer.processpool import ProcessPoolDownloader
     from s3transfer.processpool import ProcessTransferConfig

     downloader = ProcessPoolDownloader(
          client_kwargs={'region_name': 'us-west-2'})


This snippet ensures that all clients created by the ``ProcessPoolDownloader``
are using ``us-west-2`` as their region.

    N)deepcopy)Config)MAXINTBaseManager)ALLOWED_DOWNLOAD_ARGSMBPROCESS_USER_AGENT)CancelledErrorRetriesExceededError)BaseTransferFutureBaseTransferMeta)S3_RETRYABLE_DOWNLOAD_ERRORSCallArgsOSUtilscalculate_num_partscalculate_range_parameterZSHUTDOWNDownloadFileRequesttransfer_idbucketkeyfilename
extra_argsexpected_sizeGetObjectJobtemp_filenameoffsetc                  c   s   t  } d V  ttj|  d S N)"_add_ignore_handler_for_interruptssignalSIGINT)original_handler r!   :/tmp/pip-unpacked-wheel-6hpttf6a/s3transfer/processpool.pyignore_ctrl_c   s    r#   c                   C   s   t  t jt jS r   )r   r   SIG_IGNr!   r!   r!   r"   r     s    r   c                   @   s$   e Zd Zde de dfddZdS )ProcessTransferConfig   
   c                 C   s   || _ || _|| _dS )au  Configuration for the ProcessPoolDownloader

        :param multipart_threshold: The threshold for which ranged downloads
            occur.

        :param multipart_chunksize: The chunk size of each ranged download.

        :param max_request_processes: The maximum number of processes that
            will be making S3 API transfer-related requests at a time.
        N)multipart_thresholdmultipart_chunksizemax_request_processes)selfr(   r)   r*   r!   r!   r"   __init__  s    zProcessTransferConfig.__init__N)__name__
__module____qualname__r   r,   r!   r!   r!   r"   r%     s   r%   c                   @   s   e Zd Zd$ddZd%ddZdd Zdd	 Zd
d Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zd d! Zd"d# ZdS )&ProcessPoolDownloaderNc                 C   sx   |dkri }t || _|| _|dkr,t | _td| _td| _t | _	d| _
t | _d| _d| _d| _g | _dS )a  Downloads S3 objects using process pools

        :type client_kwargs: dict
        :param client_kwargs: The keyword arguments to provide when
            instantiating S3 clients. The arguments must match the keyword
            arguments provided to the
            `botocore.session.Session.create_client()` method.

        :type config: ProcessTransferConfig
        :param config: Configuration for the downloader
        Ni  F)ClientFactory_client_factory_transfer_configr%   multiprocessingQueue_download_request_queue_worker_queuer   _osutil_started	threadingLock_start_lock_manager_transfer_monitor
_submitter_workers)r+   client_kwargsconfigr!   r!   r"   r,   $  s    

zProcessPoolDownloader.__init__c           
      C   sv   |    |dkri }| | | j }t||||||d}td| | j| t	|||||d}| 
||}	|	S )as  Downloads the object's contents to a file

        :type bucket: str
        :param bucket: The name of the bucket to download from

        :type key: str
        :param key: The name of the key to download from

        :type filename: str
        :param filename: The name of a file to download to.

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            client operation

        :type expected_size: int
        :param expected_size: The expected size in bytes of the download. If
            provided, the downloader will not call HeadObject to determine the
            object's size and use the provided value instead. The size is
            needed to determine whether to do a multipart download.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the download
        N)r   r   r   r   r   r   z%Submitting download file request: %s.)r   r   r   r   r   )_start_if_needed_validate_all_known_argsr>   notify_new_transferr   loggerdebugr6   putr   _get_transfer_future)
r+   r   r   r   r   r   r   download_file_request	call_argsfuturer!   r!   r"   download_fileE  s6    

 z#ProcessPoolDownloader.download_filec                 C   s   |    dS )zhShutdown the downloader

        It will wait till all downloads are complete before returning.
        N)_shutdown_if_neededr+   r!   r!   r"   shutdown{  s    zProcessPoolDownloader.shutdownc                 C   s   | S r   r!   rO   r!   r!   r"   	__enter__  s    zProcessPoolDownloader.__enter__c                 G   s*   t |tr| jd k	r| j  |   d S r   )
isinstanceKeyboardInterruptr>   notify_cancel_all_in_progressrP   )r+   exc_type	exc_valueargsr!   r!   r"   __exit__  s    


zProcessPoolDownloader.__exit__c              	   C   s$   | j  | js|   W 5 Q R X d S r   )r<   r9   _startrO   r!   r!   r"   rC     s    z&ProcessPoolDownloader._start_if_neededc                 C   s"   |    |   |   d| _d S )NT)_start_transfer_monitor_manager_start_submitter_start_get_object_workersr9   rO   r!   r!   r"   rY     s    zProcessPoolDownloader._startc                 C   s4   |D ]*}|t krdt }td| d| qd S )Nz, zInvalid extra_args key 'z', must be one of: )r   join
ValueError)r+   providedkwargZdownload_argsr!   r!   r"   rD     s    
z.ProcessPoolDownloader._validate_all_known_argsc                 C   s   t ||d}t| j|d}|S )N)rK   r   )monitormeta)ProcessPoolTransferMetaProcessPoolTransferFuturer>   )r+   r   rK   rb   rL   r!   r!   r"   rI     s      z*ProcessPoolDownloader._get_transfer_futurec                 C   s.   t d t | _| jt | j | _d S )Nz$Starting the TransferMonitorManager.)rF   rG   TransferMonitorManagerr=   startr   TransferMonitorr>   rO   r!   r!   r"   rZ     s    
z5ProcessPoolDownloader._start_transfer_monitor_managerc                 C   s:   t d t| j| j| j| j| j| jd| _	| j	
  d S )Nz Starting the GetObjectSubmitter.)transfer_configclient_factorytransfer_monitorosutildownload_request_queueworker_queue)rF   rG   GetObjectSubmitterr3   r2   r>   r8   r6   r7   r?   rf   rO   r!   r!   r"   r[     s    
z&ProcessPoolDownloader._start_submitterc                 C   sR   t d| jj t| jjD ]0}t| j| j| j| j	d}|
  | j| qd S )NzStarting %s GetObjectWorkers.)queueri   rj   rk   )rF   rG   r3   r*   rangeGetObjectWorkerr7   r2   r>   r8   rf   r@   appendr+   _Zworkerr!   r!   r"   r\     s    z/ProcessPoolDownloader._start_get_object_workersc              	   C   s$   | j  | jr|   W 5 Q R X d S r   )r<   r9   	_shutdownrO   r!   r!   r"   rN     s    z)ProcessPoolDownloader._shutdown_if_neededc                 C   s"   |    |   |   d| _d S )NF)_shutdown_submitter_shutdown_get_object_workers"_shutdown_transfer_monitor_managerr9   rO   r!   r!   r"   ru     s    zProcessPoolDownloader._shutdownc                 C   s   t d | j  d S )Nz)Shutting down the TransferMonitorManager.)rF   rG   r=   rP   rO   r!   r!   r"   rx     s    
z8ProcessPoolDownloader._shutdown_transfer_monitor_managerc                 C   s$   t d | jt | j  d S )Nz%Shutting down the GetObjectSubmitter.)rF   rG   r6   rH   SHUTDOWN_SIGNALr?   r]   rO   r!   r!   r"   rv     s    
z)ProcessPoolDownloader._shutdown_submitterc                 C   s:   t d | jD ]}| jt q| jD ]}|  q(d S )Nz#Shutting down the GetObjectWorkers.)rF   rG   r@   r7   rH   ry   r]   rs   r!   r!   r"   rw     s
    


z2ProcessPoolDownloader._shutdown_get_object_workers)NN)NN)r-   r.   r/   r,   rM   rP   rQ   rX   rC   rY   rD   rI   rZ   r[   r\   rN   ru   rx   rv   rw   r!   r!   r!   r"   r0   #  s&   
"   
6		
r0   c                   @   s8   e Zd Zdd Zedd Zdd Zdd Zd	d
 ZdS )rd   c                 C   s   || _ || _dS )a`  The future associated to a submitted process pool transfer request

        :type monitor: TransferMonitor
        :param monitor: The monitor associated to the process pool downloader

        :type meta: ProcessPoolTransferMeta
        :param meta: The metadata associated to the request. This object
            is visible to the requester.
        N)_monitor_meta)r+   ra   rb   r!   r!   r"   r,     s    
z"ProcessPoolTransferFuture.__init__c                 C   s   | j S r   )r{   rO   r!   r!   r"   rb     s    zProcessPoolTransferFuture.metac                 C   s   | j | jjS r   )rz   is_doner{   r   rO   r!   r!   r"   done  s    zProcessPoolTransferFuture.donec                 C   s@   z| j | jjW S  tk
r:   | j   |    Y nX d S r   )rz   poll_for_resultr{   r   rS   _connectcancelrO   r!   r!   r"   result  s    
z ProcessPoolTransferFuture.resultc                 C   s   | j | jjt  d S r   )rz   notify_exceptionr{   r   r	   rO   r!   r!   r"   r     s     z ProcessPoolTransferFuture.cancelN)	r-   r.   r/   r,   propertyrb   r}   r   r   r!   r!   r!   r"   rd     s   
rd   c                   @   s<   e Zd ZdZdd Zedd Zedd Zedd	 Zd
S )rc   z2Holds metadata about the ProcessPoolTransferFuturec                 C   s   || _ || _i | _d S r   )_transfer_id
_call_args_user_context)r+   r   rK   r!   r!   r"   r,     s    z ProcessPoolTransferMeta.__init__c                 C   s   | j S r   )r   rO   r!   r!   r"   rK   !  s    z!ProcessPoolTransferMeta.call_argsc                 C   s   | j S r   )r   rO   r!   r!   r"   r   %  s    z#ProcessPoolTransferMeta.transfer_idc                 C   s   | j S r   )r   rO   r!   r!   r"   user_context)  s    z$ProcessPoolTransferMeta.user_contextN)	r-   r.   r/   __doc__r,   r   rK   r   r   r!   r!   r!   r"   rc     s   

rc   c                   @   s   e Zd ZdddZdd ZdS )r1   Nc                 C   sX   || _ | j dkri | _ t| j dt }|js8t|_n| jdt 7  _|| j d< dS )zCreates S3 clients for processes

        Botocore sessions and clients are not pickleable so they cannot be
        inherited across Process boundaries. Instead, they must be instantiated
        once a process is running.
        NrB    )_client_kwargsr   getr   Zuser_agent_extrar   )r+   rA   Zclient_configr!   r!   r"   r,   /  s    
zClientFactory.__init__c                 C   s   t j jd| jS )zCreate a botocore S3 clients3)r   )botocoresessionSessioncreate_clientr   rO   r!   r!   r"   r   A  s
    
 zClientFactory.create_client)N)r-   r.   r/   r,   r   r!   r!   r!   r"   r1   .  s   
r1   c                   @   s\   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd Zdd ZdS )rg   c                 C   s   i | _ d| _t | _dS )a@  Monitors transfers for cross-process communication

        Notifications can be sent to the monitor and information can be
        retrieved from the monitor for a particular transfer. This abstraction
        is ran in a ``multiprocessing.managers.BaseManager`` in order to be
        shared across processes.
        r   N)_transfer_states	_id_countr:   r;   
_init_lockrO   r!   r!   r"   r,   I  s    zTransferMonitor.__init__c              
   C   sB   | j 2 | j}t | j|< |  jd7  _|W  5 Q R  S Q R X d S N   )r   r   TransferStater   r+   r   r!   r!   r"   rE   Y  s
    z#TransferMonitor.notify_new_transferc                 C   s   | j | jS )zDetermine a particular transfer is complete

        :param transfer_id: Unique identifier for the transfer
        :return: True, if done. False, otherwise.
        )r   r}   r   r!   r!   r"   r|   `  s    zTransferMonitor.is_donec                 C   s   | j |   dS )zqNotify a particular transfer is complete

        :param transfer_id: Unique identifier for the transfer
        N)r   set_doner   r!   r!   r"   notify_doneh  s    zTransferMonitor.notify_donec                 C   s&   | j |   | j | j}|r"|dS )a  Poll for the result of a transfer

        :param transfer_id: Unique identifier for the transfer
        :return: If the transfer succeeded, it will return the result. If the
            transfer failed, it will raise the exception associated to the
            failure.
        N)r   wait_till_done	exceptionr+   r   r   r!   r!   r"   r~   o  s
    zTransferMonitor.poll_for_resultc                 C   s   || j | _dS )zNotify an exception was encountered for a transfer

        :param transfer_id: Unique identifier for the transfer
        :param exception: The exception encountered for that transfer
        Nr   r   r   r!   r!   r"   r   }  s    
z TransferMonitor.notify_exceptionc                 C   s"   | j  D ]}|js
t |_q
d S r   )r   valuesr}   r	   r   )r+   Ztransfer_stater!   r!   r"   rT     s    z-TransferMonitor.notify_cancel_all_in_progressc                 C   s   | j | jS )zRetrieve the exception encountered for the transfer

        :param transfer_id: Unique identifier for the transfer
        :return: The exception encountered for that transfer. Otherwise
            if there were no exceptions, returns None.
        r   r   r!   r!   r"   get_exception  s    zTransferMonitor.get_exceptionc                 C   s   || j | _dS )zNotify the amount of jobs expected for a transfer

        :param transfer_id: Unique identifier for the transfer
        :param num_jobs: The number of jobs to complete the transfer
        N)r   jobs_to_complete)r+   r   Znum_jobsr!   r!   r"    notify_expected_jobs_to_complete  s    z0TransferMonitor.notify_expected_jobs_to_completec                 C   s   | j |  S )zNotify that a single job is completed for a transfer

        :param transfer_id: Unique identifier for the transfer
        :return: The number of jobs remaining to complete the transfer
        )r   decrement_jobs_to_completer   r!   r!   r"   notify_job_complete  s    z#TransferMonitor.notify_job_completeN)r-   r.   r/   r,   rE   r|   r   r~   r   rT   r   r   r   r!   r!   r!   r"   rg   H  s   	rg   c                   @   sp   e Zd ZdZdd Zedd Zdd Zdd	 Zed
d Z	e	j
dd Z	edd Zej
dd Zdd ZdS )r   z6Represents the current state of an individual transferc                 C   s$   d | _ t | _t | _d| _d S )Nr   )
_exceptionr:   Event_done_eventr;   	_job_lock_jobs_to_completerO   r!   r!   r"   r,     s    

zTransferState.__init__c                 C   s
   | j  S r   )r   is_setrO   r!   r!   r"   r}     s    zTransferState.donec                 C   s   | j   d S r   )r   setrO   r!   r!   r"   r     s    zTransferState.set_donec                 C   s   | j t d S r   )r   waitr   rO   r!   r!   r"   r     s    zTransferState.wait_till_donec                 C   s   | j S r   r   rO   r!   r!   r"   r     s    zTransferState.exceptionc                 C   s
   || _ d S r   r   r+   valr!   r!   r"   r     s    c                 C   s   | j S r   r   rO   r!   r!   r"   r     s    zTransferState.jobs_to_completec                 C   s
   || _ d S r   r   r   r!   r!   r"   r     s    c              
   C   s2   | j " |  jd8  _| jW  5 Q R  S Q R X d S r   )r   r   rO   r!   r!   r"   r     s    z(TransferState.decrement_jobs_to_completeN)r-   r.   r/   r   r,   r   r}   r   r   r   setterr   r   r!   r!   r!   r"   r     s   	




r   c                   @   s   e Zd ZdS )re   N)r-   r.   r/   r!   r!   r!   r"   re     s   re   c                       s,   e Zd Z fddZdd Zdd Z  ZS )BaseS3TransferProcessc                    s   t    || _d | _d S r   )superr,   r2   _client)r+   ri   	__class__r!   r"   r,     s    
zBaseS3TransferProcess.__init__c              	   C   s*   | j  | _t  |   W 5 Q R X d S r   )r2   r   r   r#   _do_runrO   r!   r!   r"   run  s    
zBaseS3TransferProcess.runc                 C   s   t dd S )Nz	_do_run())NotImplementedErrorrO   r!   r!   r"   r     s    zBaseS3TransferProcess._do_run)r-   r.   r/   r,   r   r   __classcell__r!   r!   r   r"   r     s   r   c                       s\   e Zd Z fddZdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd Z  ZS )rn   c                    s.   t  | || _|| _|| _|| _|| _dS )aT  Submit GetObjectJobs to fulfill a download file request

        :param transfer_config: Configuration for transfers.
        :param client_factory: ClientFactory for creating S3 clients.
        :param transfer_monitor: Monitor for notifying and retrieving state
            of transfer.
        :param osutil: OSUtils object to use for os-related behavior when
            performing the transfer.
        :param download_request_queue: Queue to retrieve download file
            requests.
        :param worker_queue: Queue to submit GetObjectJobs for workers
            to perform.
        N)r   r,   r3   r>   r8   r6   r7   )r+   rh   ri   rj   rk   rl   rm   r   r!   r"   r,     s    zGetObjectSubmitter.__init__c              
   C   s   | j  }|tkr td d S z| | W q  tk
r } z4tjd||dd | j|j	| | j
|j	 W 5 d }~X Y q X q d S )Nz#Submitter shutdown signal received.zFException caught when submitting jobs for download file request %s: %sTexc_info)r6   r   ry   rF   rG   _submit_get_object_jobs	Exceptionr>   r   r   r   )r+   rJ   er!   r!   r"   r     s(    

 zGetObjectSubmitter._do_runc                 C   sB   |  |}| ||}|| jjk r0| || n| ||| d S r   )	_get_size_allocate_temp_filer3   r(   _submit_single_get_object_job_submit_ranged_get_object_jobsr+   rJ   sizer   r!   r!   r"   r   /  s    
   z*GetObjectSubmitter._submit_get_object_jobsc                 C   s4   |j }|d kr0| jjf |j|jd|jd }|S )NZBucketZKeyZContentLength)r   r   Zhead_objectr   r   r   )r+   rJ   r   r!   r!   r"   r   ;  s    zGetObjectSubmitter._get_sizec                 C   s    | j |j}| j || |S r   )r8   Zget_temp_filenamer   allocater   r!   r!   r"   r   E  s
    z&GetObjectSubmitter._allocate_temp_filec              	   C   s4   |  |jd | j|j|j|j|d|j|jd d S )Nr   r   r   r   r   r   r   r   r   )_notify_jobs_to_completer   _submit_get_object_jobr   r   r   r   )r+   rJ   r   r!   r!   r"   r   L  s    z0GetObjectSubmitter._submit_single_get_object_jobc           
   
   C   sz   | j j}t||}| |j| t|D ]L}|| }t|||}d|i}	|	|j | j	|j|j
|j|||	|jd q(d S )NZRanger   )r3   r)   r   r   r   rp   r   updater   r   r   r   r   )
r+   rJ   r   r   Z	part_sizeZ	num_partsir   Zrange_parameterZget_object_kwargsr!   r!   r"   r   Z  s0    
   z1GetObjectSubmitter._submit_ranged_get_object_jobsc                 K   s   | j tf | d S r   )r7   rH   r   )r+   Zget_object_job_kwargsr!   r!   r"   r   s  s    z)GetObjectSubmitter._submit_get_object_jobc                 C   s    t d|| | j|| d S )Nz3Notifying %s job(s) to complete for transfer_id %s.)rF   rG   r>   r   )r+   r   r   r!   r!   r"   r   v  s     z+GetObjectSubmitter._notify_jobs_to_complete)r-   r.   r/   r,   r   r   r   r   r   r   r   r   r   r!   r!   r   r"   rn     s   
rn   c                       sX   e Zd ZdZde Z fddZdd Zdd Zd	d
 Z	dd Z
dd Zdd Z  ZS )rq         c                    s(   t  | || _|| _|| _|| _dS )a  Fulfills GetObjectJobs

        Downloads the S3 object, writes it to the specified file, and
        renames the file to its final location if it completes the final
        job for a particular transfer.

        :param queue: Queue for retrieving GetObjectJob's
        :param client_factory: ClientFactory for creating S3 clients
        :param transfer_monitor: Monitor for notifying
        :param osutil: OSUtils object to use for os-related behavior when
            performing the transfer.
        N)r   r,   _queuer2   r>   r8   )r+   ro   ri   rj   rk   r   r!   r"   r,     s
    zGetObjectWorker.__init__c                 C   s   | j  }|tkr td d S | j|js:| | ntd| | j	|j}td||j |s | 
|j|j|j q d S )Nz Worker shutdown signal received.zBSkipping get object job %s because there was a previous exception.z%%s jobs remaining for transfer_id %s.)r   r   ry   rF   rG   r>   r   r   _run_get_object_jobr   _finalize_downloadr   r   )r+   job	remainingr!   r!   r"   r     s0    

  zGetObjectWorker._do_runc              
   C   sl   z"| j |j|j|j|j|jd W nD tk
rf } z&tjd||dd | j	
|j| W 5 d }~X Y nX d S )N)r   r   r   r   r   zBException caught when downloading object for get object job %s: %sTr   )_do_get_objectr   r   r   r   r   r   rF   rG   r>   r   r   )r+   r   r   r!   r!   r"   r     s     
z#GetObjectWorker._run_get_object_jobc           
      C   s   d }t | jD ]x}z2| jjf ||d|}| |||d  W  d S  tk
r }	 z"tjd|	|d | jdd |	}W 5 d }	~	X Y qX qt|d S )Nr   ZBodyzCRetrying exception caught (%s), retrying request, (attempt %s / %s)r   Tr   )	rp   _MAX_ATTEMPTSr   Z
get_object_write_to_filer   rF   rG   r
   )
r+   r   r   r   r   r   Zlast_exceptionr   responser   r!   r!   r"   r     s*     zGetObjectWorker._do_get_objectc              	      sL   t |d8}|| t fddd}|D ]}|| q.W 5 Q R X d S )Nzrb+c                      s     jS r   )read_IO_CHUNKSIZEr!   bodyr+   r!   r"   <lambda>      z0GetObjectWorker._write_to_file.<locals>.<lambda>r   )openseekiterwrite)r+   r   r   r   fchunkschunkr!   r   r"   r     s
    
zGetObjectWorker._write_to_filec                 C   s8   | j |r| j| n| ||| | j | d S r   )r>   r   r8   remove_file_do_file_renamer   )r+   r   r   r   r!   r!   r"   r     s    z"GetObjectWorker._finalize_downloadc              
   C   sT   z| j || W n< tk
rN } z| j|| | j | W 5 d }~X Y nX d S r   )r8   Zrename_filer   r>   r   r   )r+   r   r   r   r   r!   r!   r"   r     s
    zGetObjectWorker._do_file_rename)r-   r.   r/   r   r   r   r,   r   r   r   r   r   r   r   r!   r!   r   r"   rq     s   rq   )7r   collections
contextlibloggingr4   r   r:   copyr   Zbotocore.sessionr   Zbotocore.configr   Zs3transfer.compatr   r   Zs3transfer.constantsr   r   r   Zs3transfer.exceptionsr	   r
   Zs3transfer.futuresr   r   Zs3transfer.utilsr   r   r   r   r   	getLoggerr-   rF   ry   
namedtupler   r   contextmanagerr#   r   r%   r0   rd   rc   r1   rg   r   re   registerProcessr   rn   rq   r!   r!   r!   r"   <module>   sn    7

 G0`0 