U
    O8c=                     @   s   d dl Z d dlZG dd deZG dd dZG dd dZG dd	 d	ZG d
d dZG dd dZG dd dZ	G dd dZ
dS )    Nc                       s   e Zd Z fddZ  ZS )RequestExceededExceptionc                    s(   || _ || _d||}t | dS )a  Error when requested amount exceeds what is allowed

        The request that raised this error should be retried after waiting
        the time specified by ``retry_time``.

        :type requested_amt: int
        :param requested_amt: The originally requested byte amount

        :type retry_time: float
        :param retry_time: The length in time to wait to retry for the
            requested amount
        z<Request amount {} exceeded the amount available. Retry in {}N)requested_amt
retry_timeformatsuper__init__)selfr   r   msg	__class__ 8/tmp/pip-unpacked-wheel-6hpttf6a/s3transfer/bandwidth.pyr      s     z!RequestExceededException.__init__)__name__
__module____qualname__r   __classcell__r   r   r
   r   r      s   r   c                   @   s   e Zd ZdZdS )RequestTokenzDA token to pass as an identifier when consuming from the LeakyBucketN)r   r   r   __doc__r   r   r   r   r   '   s   r   c                   @   s   e Zd Zdd Zdd ZdS )	TimeUtilsc                 C   s   t   S )zgGet the current time back

        :rtype: float
        :returns: The current time in seconds
        )timer   r   r   r   r   .   s    zTimeUtils.timec                 C   s
   t |S )zwSleep for a designated time

        :type value: float
        :param value: The time to sleep for in seconds
        )r   sleep)r   valuer   r   r   r   6   s    zTimeUtils.sleepN)r   r   r   r   r   r   r   r   r   r   -   s   r   c                   @   s    e Zd ZdddZdddZdS )	BandwidthLimiterNc                 C   s    || _ || _|dkrt | _dS )a  Limits bandwidth for shared S3 transfers

        :type leaky_bucket: LeakyBucket
        :param leaky_bucket: The leaky bucket to use limit bandwidth

        :type time_utils: TimeUtils
        :param time_utils: Time utility to use for interacting with time.
        N)_leaky_bucket_time_utilsr   )r   leaky_bucket
time_utilsr   r   r   r   @   s    	zBandwidthLimiter.__init__Tc                 C   s"   t || j|| j}|s|  |S )a  Wraps a fileobj in a bandwidth limited stream wrapper

        :type fileobj: file-like obj
        :param fileobj: The file-like obj to wrap

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        param transfer_coordinator: The coordinator for the general transfer
            that the wrapped stream is a part of

        :type enabled: boolean
        :param enabled: Whether bandwidth limiting should be enabled to start
        )BandwidthLimitedStreamr   r   disable_bandwidth_limiting)r   fileobjtransfer_coordinatorZenabledstreamr   r   r   get_bandwith_limited_streamN   s       z,BandwidthLimiter.get_bandwith_limited_stream)N)T)r   r   r   r   r#   r   r   r   r   r   ?   s   
 r   c                   @   sp   e Zd ZdddZdd Zdd Zd	d
 Zdd Zdd Zdd Z	dddZ
dd Zdd Zdd Zdd ZdS )r   N   c                 C   sF   || _ || _|| _|| _|dkr(t | _d| _t | _d| _|| _	dS )a[  Limits bandwidth for reads on a wrapped stream

        :type fileobj: file-like object
        :param fileobj: The file like object to wrap

        :type leaky_bucket: LeakyBucket
        :param leaky_bucket: The leaky bucket to use to throttle reads on
            the stream

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        param transfer_coordinator: The coordinator for the general transfer
            that the wrapped stream is a part of

        :type time_utils: TimeUtils
        :param time_utils: The time utility to use for interacting with time
        NTr   )
_fileobjr   _transfer_coordinatorr   r   _bandwidth_limiting_enabledr   _request_token_bytes_seen_bytes_threshold)r   r    r   r!   r   Zbytes_thresholdr   r   r   r   f   s    zBandwidthLimitedStream.__init__c                 C   s
   d| _ dS )z0Enable bandwidth limiting on reads to the streamTNr'   r   r   r   r   enable_bandwidth_limiting   s    z0BandwidthLimitedStream.enable_bandwidth_limitingc                 C   s
   d| _ dS )z1Disable bandwidth limiting on reads to the streamFNr+   r   r   r   r   r      s    z1BandwidthLimitedStream.disable_bandwidth_limitingc                 C   sL   | j s| j|S |  j|7  _| j| jk r8| j|S |   | j|S )zhRead a specified amount

        Reads will only be throttled if bandwidth limiting is enabled.
        )r'   r%   readr)   r*   _consume_through_leaky_bucket)r   amountr   r   r   r-      s    zBandwidthLimitedStream.readc              
   C   sf   | j jsZz| j| j| j d| _W d S  tk
rV } z| j|j	 W 5 d }~X Y q X q | j jd S )Nr   )
r&   	exceptionr   consumer)   r(   r   r   r   r   )r   er   r   r   r.      s     "z4BandwidthLimitedStream._consume_through_leaky_bucketc                 C   s   |    dS )z6Signal that data being read is being transferred to S3N)r,   r   r   r   r   signal_transferring   s    z*BandwidthLimitedStream.signal_transferringc                 C   s   |    dS )z:Signal that data being read is not being transferred to S3N)r   r   r   r   r   signal_not_transferring   s    z.BandwidthLimitedStream.signal_not_transferringr   c                 C   s   | j || d S N)r%   seek)r   wherewhencer   r   r   r6      s    zBandwidthLimitedStream.seekc                 C   s
   | j  S r5   )r%   tellr   r   r   r   r9      s    zBandwidthLimitedStream.tellc                 C   s"   | j r| jr|   | j  d S r5   )r'   r)   r.   r%   closer   r   r   r   r:      s    zBandwidthLimitedStream.closec                 C   s   | S r5   r   r   r   r   r   	__enter__   s    z BandwidthLimitedStream.__enter__c                 O   s   |    d S r5   )r:   )r   argskwargsr   r   r   __exit__   s    zBandwidthLimitedStream.__exit__)Nr$   )r   )r   r   r   r   r,   r   r-   r.   r3   r4   r6   r9   r:   r;   r>   r   r   r   r   r   e   s     
#

r   c                   @   s>   e Zd ZdddZdd Zdd Zdd	 Zd
d Zdd ZdS )LeakyBucketNc                 C   sZ   t || _|| _|dkr t | _t | _|| _|dkr@t | _|| _	|dkrVt
 | _	dS )a9  A leaky bucket abstraction to limit bandwidth consumption

        :type rate: int
        :type rate: The maximum rate to allow. This rate is in terms of
            bytes per second.

        :type time_utils: TimeUtils
        :param time_utils: The time utility to use for interacting with time

        :type rate_tracker: BandwidthRateTracker
        :param rate_tracker: Tracks bandwidth consumption

        :type consumption_scheduler: ConsumptionScheduler
        :param consumption_scheduler: Schedules consumption retries when
            necessary
        N)float	_max_rater   r   	threadingLock_lock_rate_trackerBandwidthRateTracker_consumption_schedulerConsumptionScheduler)r   Zmax_rater   Zrate_trackerZconsumption_schedulerr   r   r   r      s    

zLeakyBucket.__init__c              
   C   sz   | j j | j }| j|r8| |||W  5 Q R  S | ||rT| ||| n| ||W  5 Q R  S W 5 Q R X dS )ac  Consume an a requested amount

        :type amt: int
        :param amt: The amount of bytes to request to consume

        :type request_token: RequestToken
        :param request_token: The token associated to the consumption
            request that is used to identify the request. So if a
            RequestExceededException is raised the token should be used
            in subsequent retry consume() request.

        :raises RequestExceededException: If the consumption amount would
            exceed the maximum allocated bandwidth

        :rtype: int
        :returns: The amount consumed
        N)	rD   r   r   rG   is_scheduled,_release_requested_amt_for_scheduled_request_projected_to_exceed_max_rate!_raise_request_exceeded_exception_release_requested_amtr   amtrequest_tokentime_nowr   r   r   r1      s    
    zLeakyBucket.consumec                 C   s   | j ||}|| jkS r5   )rE   get_projected_raterA   )r   rO   rQ   Zprojected_rater   r   r   rK     s    z)LeakyBucket._projected_to_exceed_max_ratec                 C   s   | j | | ||S r5   )rG   process_scheduled_consumptionrM   rN   r   r   r   rJ     s    z8LeakyBucket._release_requested_amt_for_scheduled_requestc                 C   s.   |t | j }| j|||}t||dd S )N)r   r   )r@   rA   rG   schedule_consumptionr   )r   rO   rP   rQ   Zallocated_timer   r   r   r   rL   %  s       z-LeakyBucket._raise_request_exceeded_exceptionc                 C   s   | j || |S r5   )rE   record_consumption_rate)r   rO   rQ   r   r   r   rM   .  s    z"LeakyBucket._release_requested_amt)NNN)	r   r   r   r   r1   rK   rJ   rL   rM   r   r   r   r   r?      s      
#	r?   c                   @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
rH   c                 C   s   i | _ d| _dS )z*Schedules when to consume a desired amountr   N) _tokens_to_scheduled_consumption_total_waitr   r   r   r   r   4  s    zConsumptionScheduler.__init__c                 C   s
   || j kS )zIndicates if a consumption request has been scheduled

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.
        )rV   )r   tokenr   r   r   rI   9  s    z!ConsumptionScheduler.is_scheduledc                 C   s&   |  j |7  _ | j |d| j|< | j S )a  Schedules a wait time to be able to consume an amount

        :type amt: int
        :param amt: The amount of bytes scheduled to be consumed

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.

        :type time_to_consume: float
        :param time_to_consume: The desired time it should take for that
            specific request amount to be consumed in regardless of previously
            scheduled consumption requests

        :rtype: float
        :returns: The amount of time to wait for the specific request before
            actually consuming the specified amount.
        )Zwait_durationtime_to_consume)rW   rV   )r   rO   rX   rY   r   r   r   rT   B  s
    z)ConsumptionScheduler.schedule_consumptionc                 C   s&   | j |}t| j|d  d| _dS )zProcesses a scheduled consumption request that has completed

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.
        rY   r   N)rV   popmaxrW   )r   rX   Zscheduled_retryr   r   r   rS   \  s
     z2ConsumptionScheduler.process_scheduled_consumptionN)r   r   r   r   rI   rT   rS   r   r   r   r   rH   3  s   	rH   c                   @   sB   e Zd ZdddZedd Zdd Zdd	 Zd
d Zdd Z	dS )rF   皙?c                 C   s   || _ d| _d| _dS )a  Tracks the rate of bandwidth consumption

        :type a: float
        :param a: The constant to use in calculating the exponentional moving
            average of the bandwidth rate. Specifically it is used in the
            following calculation:

            current_rate = alpha * new_rate + (1 - alpha) * current_rate

            This value of this constant should be between 0 and 1.
        N)_alpha
_last_time_current_rate)r   alphar   r   r   r   j  s    zBandwidthRateTracker.__init__c                 C   s   | j dkrdS | jS )zmThe current transfer rate

        :rtype: float
        :returns: The current tracked transfer rate
        N        )r^   r_   r   r   r   r   current_ratez  s    
z!BandwidthRateTracker.current_ratec                 C   s   | j dkrdS | ||S )aZ  Get the projected rate using a provided amount and time

        :type amt: int
        :param amt: The proposed amount to consume

        :type time_at_consumption: float
        :param time_at_consumption: The proposed time to consume at

        :rtype: float
        :returns: The consumption rate if that amt and time were consumed
        Nra   )r^   *_calculate_exponential_moving_average_rater   rO   time_at_consumptionr   r   r   rR     s    
 z'BandwidthRateTracker.get_projected_ratec                 C   s2   | j dkr|| _ d| _dS | ||| _|| _ dS )a  Record the consumption rate based off amount and time point

        :type amt: int
        :param amt: The amount that got consumed

        :type time_at_consumption: float
        :param time_at_consumption: The time at which the amount was consumed
        Nra   )r^   r_   rc   rd   r   r   r   rU     s    	
 z,BandwidthRateTracker.record_consumption_ratec                 C   s"   || j  }|dkrtdS || S )Nr   inf)r^   r@   )r   rO   re   Z
time_deltar   r   r   _calculate_rate  s    
z$BandwidthRateTracker._calculate_ratec                 C   s&   |  ||}| j| d| j | j  S )N   )rg   r]   r_   )r   rO   re   Znew_rater   r   r   rc     s    z?BandwidthRateTracker._calculate_exponential_moving_average_rateN)r\   )
r   r   r   r   propertyrb   rR   rU   rg   rc   r   r   r   r   rF   i  s   



rF   )rB   r   	Exceptionr   r   r   r   r   r?   rH   rF   r   r   r   r   <module>   s   &q]6