U
    Z+dJ_                     @   s  d Z ddlZddlZddlZddlZddlZddlZddlZddlZddl	m
Z
 ddlmZ ddlmZ ddlmZmZ ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZ ddlmZmZm Z  ddl!m"Z" ddl#m$Z$m%Z% ddl&m'Z'm(Z( ddl)m*Z*m+Z+ ddl,m-Z-m.Z. dZ/eddZ0e*e1Z2e2j3e2j4e2j5e2j6f\Z3Z4Z5Z6dZ7G dd de8Z9G dd dZ:eG dd dZ;dd Z<dd  Z=G d!d" d"Z>G d#d$ d$e>Z?G d%d& d&Z@G d'd( d(eZAz
e  W n eBk
r   dZCY nX G d)d* d*eZCd-d+d,ZDdS ).zThe periodic task scheduler.    N)timegm)
namedtuple)total_ordering)EventThread)ensure_multiprocessing)reset_signals)Process)maybe_evaluatereprcall)cached_property   )__version__	platformssignals)reraise)crontabmaybe_schedule)load_extension_class_namessymbol_by_name)
get_loggeriter_open_logger_fds)humanize_secondsmaybe_make_aware)SchedulingErrorScheduleEntry	SchedulerPersistentSchedulerServiceEmbeddedServiceevent_t)timepriorityentryi,  c                   @   s   e Zd ZdZdS )r   z*An error occurred while scheduling a task.N)__name__
__module____qualname____doc__ r(   r(   //tmp/pip-unpacked-wheel-ucduq0nd/celery/beat.pyr   +   s   r   c                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	BeatLazyFuncap  An lazy function declared in 'beat_schedule' and called before sending to worker.

    Example:

        beat_schedule = {
            'test-every-5-minutes': {
                'task': 'test',
                'schedule': 300,
                'kwargs': {
                    "current": BeatCallBack(datetime.datetime.now)
                }
            }
        }

    c                 O   s   || _ ||d| _d S )N)argskwargsZ_funcZ_func_params)selffuncr+   r,   r(   r(   r)   __init__@   s    zBeatLazyFunc.__init__c                 C   s   |   S N)delayr.   r(   r(   r)   __call__G   s    zBeatLazyFunc.__call__c                 C   s   | j | jd | jd S )Nr+   r,   r-   r3   r(   r(   r)   r2   J   s    zBeatLazyFunc.delayN)r$   r%   r&   r'   r0   r4   r2   r(   r(   r(   r)   r*   /   s   r*   c                
   @   s   e Zd ZdZdZdZdZdZdZdZ	dZ
dddZdd	 ZeZdd
dZe ZZdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd ZdS ) r   a  An entry in the scheduler.

    Arguments:
        name (str): see :attr:`name`.
        schedule (~celery.schedules.schedule): see :attr:`schedule`.
        args (Tuple): see :attr:`args`.
        kwargs (Dict): see :attr:`kwargs`.
        options (Dict): see :attr:`options`.
        last_run_at (~datetime.datetime): see :attr:`last_run_at`.
        total_run_count (int): see :attr:`total_run_count`.
        relative (bool): Is the time relative to when the server starts?
    Nr   r(   Fc                 C   sb   |
| _ || _|| _|| _|r |ni | _|r.|ni | _t||	| j d| _|pP|  | _	|pZd| _
d S )N)appr   )r5   nametaskr+   r,   optionsr   scheduledefault_nowlast_run_attotal_run_count)r.   r6   r7   r;   r<   r9   r+   r,   r8   relativer5   r(   r(   r)   r0   r   s    zScheduleEntry.__init__c                 C   s   | j r| j  S | j S r1   )r9   nowr5   r3   r(   r(   r)   r:      s    zScheduleEntry.default_nowc                 C   s$   | j f t| |p|  | jd dS )z8Return new instance, with date and count fields updated.r   )r;   r<   )	__class__dictr:   r<   )r.   r;   r(   r(   r)   _next_instance   s
    
zScheduleEntry._next_instancec              	   C   s*   | j | j| j| j| j| j| j| j| jffS r1   )	r?   r6   r7   r;   r<   r9   r+   r,   r8   r3   r(   r(   r)   
__reduce__   s          zScheduleEntry.__reduce__c                 C   s&   | j |j|j|j|j|jd dS )zUpdate values from another entry.

        Will only update "editable" fields:
            ``task``, ``schedule``, ``args``, ``kwargs``, ``options``.
        )r7   r9   r+   r,   r8   N)__dict__updater7   r9   r+   r,   r8   r.   otherr(   r(   r)   rD      s      zScheduleEntry.updatec                 C   s   | j | jS )z-See :meth:`~celery.schedule.schedule.is_due`.)r9   is_duer;   r3   r(   r(   r)   rG      s    zScheduleEntry.is_duec                 C   s   t t|  S r1   )itervarsitemsr3   r(   r(   r)   __iter__   s    zScheduleEntry.__iter__c                 C   s,   dj | t| j| jpd| jpi t| jdS )Nz%<{name}: {0.name} {call} {0.schedule}r(   )callr6   )formatr   r7   r+   r,   typer$   r3   r(   r(   r)   __repr__   s
    zScheduleEntry.__repr__c                 C   s   t |trt| t|k S tS r1   )
isinstancer   idNotImplementedrE   r(   r(   r)   __lt__   s    
zScheduleEntry.__lt__c                 C   s(   dD ]}t | |t ||kr dS qdS )N)r7   r+   r,   r8   r9   FT)getattr)r.   rF   attrr(   r(   r)   editable_fields_equal   s    z#ScheduleEntry.editable_fields_equalc                 C   s
   |  |S )zTest schedule entries equality.

        Will only compare "editable" fields:
        ``task``, ``schedule``, ``args``, ``kwargs``, ``options``.
        )rV   rE   r(   r(   r)   __eq__   s    zScheduleEntry.__eq__c                 C   s
   | |k S )zTest schedule entries inequality.

        Will only compare "editable" fields:
        ``task``, ``schedule``, ``args``, ``kwargs``, ``options``.
        r(   rE   r(   r(   r)   __ne__   s    zScheduleEntry.__ne__)
NNNNNr(   NNFN)N)r$   r%   r&   r'   r6   r9   r+   r,   r8   r;   r<   r0   r:   Z_default_nowrA   __next__nextrB   rD   rG   rK   rO   rS   rV   rW   rX   r(   r(   r(   r)   r   N   s<                

r   c                 C   s   | sg S dd | D S )Nc                 S   s    g | ]}t |tr| n|qS r(   rP   r*   ).0vr(   r(   r)   
<listcomp>   s   z(_evaluate_entry_args.<locals>.<listcomp>r(   )
entry_argsr(   r(   r)   _evaluate_entry_args   s
    r`   c                 C   s   | si S dd |   D S )Nc                 S   s&   i | ]\}}|t |tr| n|qS r(   r[   )r\   kr]   r(   r(   r)   
<dictcomp>   s    z*_evaluate_entry_kwargs.<locals>.<dictcomp>)rJ   )entry_kwargsr(   r(   r)   _evaluate_entry_kwargs   s
    rd   c                   @   s@  e Zd ZdZeZdZeZdZ	dZ
dZdZeZd>ddZdd	 Zd?d
dZd@ddZdd ZefddZeejfddZeeejejfddZdd Zdd Zdd ZdAddZ d d! Z!d"d# Z"d$d% Z#d&d' Z$d(d) Z%d*d+ Z&d,d- Z'd.d/ Z(d0d1 Z)d2d3 Z*d4d5 Z+d6d7 Z,e-e+e,Ze.d8d9 Z/e.d:d; Z0e-d<d= Z1dS )Br   a  Scheduler for periodic tasks.

    The :program:`celery beat` program may instantiate this class
    multiple times for introspection purposes, but then with the
    ``lazy`` argument set.  It's important for subclasses to
    be idempotent when this argument is set.

    Arguments:
        schedule (~celery.schedules.schedule): see :attr:`schedule`.
        max_interval (int): see :attr:`max_interval`.
        lazy (bool): Don't set up the schedule.
    N   r   Fc                 K   sp   || _ t|d kri n|| _|p,|jjp,| j| _|p:|jj| _d | _d | _	|d krZ|jj
n|| _|sl|   d S r1   )r5   r
   dataconfbeat_max_loop_intervalmax_intervalZamqpProducer_heapold_schedulersZbeat_sync_everysync_every_taskssetup_schedule)r.   r5   r9   ri   rj   lazyrm   r,   r(   r(   r)   r0     s    zScheduler.__init__c                 C   sJ   i }| j jjr<| j jjs<d|kr<dtdddddid|d< | | d S )Nzcelery.backend_cleanup04*expiresi  )r7   r9   r8   )r5   rg   Zresult_expiresbackendZsupports_autoexpirer   update_from_dict)r.   rf   entriesr(   r(   r)   install_default_entries  s    


z!Scheduler.install_default_entriesc              
   C   sp   t d|j|j z| j||dd}W n6 tk
rZ } ztd|t dd W 5 d }~X Y nX td|j|j	 d S )Nz#Scheduler: Sending due task %s (%s)F)produceradvancezMessage Error: %s
%sTexc_infoz%s sent. id->%s)
infor6   r7   apply_async	Exceptionerror	tracebackformat_stackdebugrQ   )r.   r#   rx   resultexcr(   r(   r)   apply_entry  s      zScheduler.apply_entry{Gzc                 C   s   |r|dkr|| S |S )Nr   r(   )r.   nZdriftr(   r(   r)   adjust&  s    zScheduler.adjustc                 C   s   |  S r1   )rG   )r.   r#   r(   r(   r)   rG   +  s    zScheduler.is_duec                 C   s4   | j }t| }|| |jd  ||p0d S )z9Return a utc timestamp, make sure heapq in currect order.g    .Ar   )r   r   r:   utctimetuplemicrosecond)r.   r#   next_time_to_runmktimer   Zas_nowr(   r(   r)   _when.  s    

zScheduler._whenc                 C   s\   d}g | _ | j D ]8}| \}}| j || ||r:dn|pBd|| q|| j  dS )z:Populate the heap with the data contained in the schedule.   r   N)rk   r9   valuesrG   appendr   )r.   r    heapifyr"   r#   rG   Znext_call_delayr(   r(   r)   populate_heap8  s    
 zScheduler.populate_heapc                 C   s   | j }| j}| jdks&| | j| js<t| j| _|   | j}|sJ|S |d }|d }	| |	\}
}|
r||}||kr| 	|	}| j
|	| jd |||| |||d | dS ||| ||d |S |||p||S )zRun a tick - one iteration of the scheduler.

        Executes one due task per call.

        Returns:
            float: preferred delay in seconds for next call.
        Nr      )rx   r   )r   ri   rk   schedules_equalrl   r9   copyr   rG   reserver   rx   r   )r.   r    minheappopheappushr   ri   Heventr#   rG   r   verifyZ
next_entryr(   r(   r)   tickH  s4    	

 
zScheduler.tickc                 C   s   ||  krd krn ndS |d ks,|d kr0dS t | t | krLdS | D ]*\}}||}|sp dS ||krT dS qTdS )NTF)setkeysrJ   get)r.   Zold_schedulesZnew_schedulesr6   Z	old_entry	new_entryr(   r(   r)   r   n  s    
zScheduler.schedules_equalc                 C   s.   | j  p,t | j  | jkp,| jo,| j| jkS r1   )
_last_syncr!   	monotonic
sync_everyrm   _tasks_since_syncr3   r(   r(   r)   should_sync}  s    
zScheduler.should_syncc                 C   s   t | }| j|j< |S r1   )rZ   r9   r6   )r.   r#   r   r(   r(   r)   r     s    zScheduler.reserveTc           	   
   K   s   |r|  |n|}| jj|j}zz`t|j	}t
|j}|r^|j||fd|i|jW W rS | j|j||fd|i|jW W NS W nD tk
r } z&tttdj||dt d  W 5 d }~X Y nX W 5 |  jd7  _|  r|   X d S )Nr   rx   z-Couldn't apply scheduled task {0.name}: {exc})r   r   )r   r5   Ztasksr   r7   r   r   _do_syncr`   r+   rd   r,   r}   r8   	send_taskr~   r   r   rM   sysr{   )	r.   r#   rx   ry   r,   r7   r_   rc   r   r(   r(   r)   r}     s8    

 
zScheduler.apply_asyncc                 O   s   | j j||S r1   )r5   r   r.   r+   r,   r(   r(   r)   r     s    zScheduler.send_taskc                 C   s    |  | j | | jjj d S r1   )rw   rf   merge_inplacer5   rg   beat_scheduler3   r(   r(   r)   rn     s    zScheduler.setup_schedulec                 C   s,   ztd |   W 5 t  | _d| _X d S )Nr   zbeat: Synchronizing schedule...)r!   r   r   r   r   syncr3   r(   r(   r)   r     s
    
zScheduler._do_syncc                 C   s   d S r1   r(   r3   r(   r(   r)   r     s    zScheduler.syncc                 C   s   |    d S r1   )r   r3   r(   r(   r)   close  s    zScheduler.closec                 K   s&   | j f d| ji|}|| j|j< |S )Nr5   )Entryr5   r9   r6   )r.   r,   r#   r(   r(   r)   add  s    zScheduler.addc                 C   s0   t || jr| j|_|S | jf t||| jdS N)r6   r5   )rP   r   r5   r@   )r.   r6   r#   r(   r(   r)   _maybe_entry  s    zScheduler._maybe_entryc                    s"    j  fdd| D  d S )Nc                    s   i | ]\}}|  ||qS r(   )r   )r\   r6   r#   r3   r(   r)   rb     s    z.Scheduler.update_from_dict.<locals>.<dictcomp>)r9   rD   rJ   )r.   Zdict_r(   r3   r)   ru     s    zScheduler.update_from_dictc                 C   s~   | j }t|t| }}||A D ]}||d  q |D ]B}| jf t|| || jd}||rp|| | q6|||< q6d S r   )r9   r   popr   r@   r5   r   rD   )r.   br9   ABkeyr#   r(   r(   r)   r     s    
zScheduler.merge_inplacec                 C   s   dd }| j || jjjS )Nc                 S   s   t d| | d S )Nz9beat: Connection error: %s. Trying again in %s seconds...)r   )r   intervalr(   r(   r)   _error_handler  s     z3Scheduler._ensure_connected.<locals>._error_handler)
connectionZensure_connectionr5   rg   Zbroker_connection_max_retries)r.   r   r(   r(   r)   _ensure_connected  s
     zScheduler._ensure_connectedc                 C   s   | j S r1   rf   r3   r(   r(   r)   get_schedule  s    zScheduler.get_schedulec                 C   s
   || _ d S r1   r   r.   r9   r(   r(   r)   set_schedule  s    zScheduler.set_schedulec                 C   s
   | j  S r1   )r5   Zconnection_for_writer3   r(   r(   r)   r     s    zScheduler.connectionc                 C   s   | j |  ddS )NF)Zauto_declare)rj   r   r3   r(   r(   r)   rx     s    zScheduler.producerc                 C   s   dS )N r(   r3   r(   r(   r)   r|     s    zScheduler.info)NNNFN)N)r   )NT)2r$   r%   r&   r'   r   r   r9   DEFAULT_MAX_INTERVALri   r   rm   r   r   loggerr0   rw   r   r   rG   r   r   r    heapqr   r   r   r   r   r   r   r   r   r}   r   rn   r   r   r   r   r   ru   r   r   r   r   propertyr   r   rx   r|   r(   r(   r(   r)   r      s\         





&



r   c                       s   e Zd ZdZeZdZdZ fddZdd Z	dd	 Z
d
d Zdd Zdd Zdd Zdd ZeeeZdd Zdd Zedd Z  ZS )r   z+Scheduler backed by :mod:`shelve` database.)r   z.dbz.datz.bakz.dirNc                    s   | d| _t j|| d S )Nschedule_filename)r   r   superr0   r   r?   r(   r)   r0     s    zPersistentScheduler.__init__c              
   C   s8   | j D ],}ttj t| j|  W 5 Q R X qd S r1   )known_suffixesr   Zignore_errnoerrnoENOENTosremover   )r.   suffixr(   r(   r)   
_remove_db  s    
zPersistentScheduler._remove_dbc                 C   s   | j j| jddS )NT)Z	writeback)persistenceopenr   r3   r(   r(   r)   _open_schedule
  s    z"PersistentScheduler._open_schedulec                 C   s"   t d| j|dd |   |  S )Nz'Removing corrupted schedule file %r: %rTrz   )r   r   r   r   )r.   r   r(   r(   r)    _destroy_open_corrupted_schedule  s      z4PersistentScheduler._destroy_open_corrupted_schedulec              
   C   sD  z|   | _| j  W n. tk
rF } z| || _W 5 d }~X Y nX |   | jjj}| j	d}|d k	r||krt
d|| | j  | jjj}| j	d}|d k	r||krddd}t
d|| ||  | j  | jdi }| | jjj | | j | jt||d	 |   td
ddd | D   d S )Ntzz%Reset: Timezone changed from %r to %rutc_enabledZenableddisabled)TFz Reset: UTC changed from %s to %srv   )r   r   r   zCurrent schedule:

c                 s   s   | ]}t |V  qd S r1   )repr)r\   r#   r(   r(   r)   	<genexpr>6  s    z5PersistentScheduler.setup_schedule.<locals>.<genexpr>)r   _storer   r~   r   _create_scheduler5   rg   timezoner   warningclearZ
enable_utc
setdefaultr   r   rw   r9   rD   r   r   r   joinr   )r.   r   r   Z	stored_tzutcZ
stored_utcchoicesrv   r(   r(   r)   rn     s@    




 
z"PersistentScheduler.setup_schedulec                 C   s   dD ]}z| j d  W n\ tk
rr   zi | j d< W n8 tk
rl } z| || _ W Y Y qW 5 d }~X Y nX Y nZX d| j krtd | j   n:d| j krtd | j   nd| j krtd | j    qqd S )	N)r   r   rv   r   z+DB Reset: Account for new __version__ fieldr   z"DB Reset: Account for new tz fieldr   z+DB Reset: Account for new utc_enabled field)r   KeyErrorr   r   r   )r.   _r   r(   r(   r)   r   9  s&    "



z$PersistentScheduler._create_schedulec                 C   s
   | j d S Nrv   r   r3   r(   r(   r)   r   P  s    z PersistentScheduler.get_schedulec                 C   s   || j d< d S r   r   r   r(   r(   r)   r   S  s    z PersistentScheduler.set_schedulec                 C   s   | j d k	r| j   d S r1   )r   r   r3   r(   r(   r)   r   W  s    
zPersistentScheduler.syncc                 C   s   |    | j  d S r1   )r   r   r   r3   r(   r(   r)   r   [  s    zPersistentScheduler.closec                 C   s   d| j  S )Nz    . db -> )r   r3   r(   r(   r)   r|   _  s    zPersistentScheduler.info)r$   r%   r&   r'   shelver   r   r   r0   r   r   r   rn   r   r   r   r   r9   r   r   r|   __classcell__r(   r(   r   r)   r     s"   &
r   c                   @   sX   e Zd ZdZeZdddZdd Zddd	Zd
d Z	dddZ
dddZedd ZdS )r   zCelery periodic task service.Nc                 C   sB   || _ |p|jj| _|p| j| _|p*|jj| _t | _t | _	d S r1   )
r5   rg   rh   ri   scheduler_clsZbeat_schedule_filenamer   r   _is_shutdown_is_stopped)r.   r5   ri   r   r   r(   r(   r)   r0   i  s    
zService.__init__c                 C   s   | j | j| j| j| jffS r1   )r?   ri   r   r   r5   r3   r(   r(   r)   rB   u  s     zService.__reduce__Fc              	   C   s   t d tdt| jj tjj| d |rDtjj| d t	
d z~zV| j s| j }|rH|dkrHtdt|dd t| | j rH| j  qHW n" ttfk
r   | j  Y nX W 5 |   X d S )	Nzbeat: Starting...z#beat: Ticking with max interval->%s)Zsenderzcelery beatg        zbeat: Waking up %s.zin )prefix)r|   r   r   	schedulerri   r   Z	beat_initsendZbeat_embedded_initr   Zset_process_titler   r   is_setr   r!   sleepr   r   KeyboardInterrupt
SystemExitr   )r.   embedded_processr   r(   r(   r)   starty  s*    






zService.startc                 C   s   | j   | j  d S r1   )r   r   r   r   r3   r(   r(   r)   r     s    
zService.syncc                 C   s$   t d | j  |o| j  d S )Nzbeat: Shutting down...)r|   r   r   r   wait)r.   r   r(   r(   r)   stop  s    
zService.stopcelery.beat_schedulersc                 C   s0   | j }tt|}t| j|d| j|| j|dS )N)aliases)r5   r   ri   ro   )r   r@   r   r   r   r5   ri   )r.   ro   Zextension_namespacefilenamer   r(   r(   r)   get_scheduler  s    zService.get_schedulerc                 C   s   |   S r1   )r   r3   r(   r(   r)   r     s    zService.scheduler)NNN)F)F)Fr   )r$   r%   r&   r'   r   r   r0   rB   r   r   r   r   r   r   r(   r(   r(   r)   r   d  s     


  
r   c                       s0   e Zd ZdZ fddZdd Zdd Z  ZS )	_Threadedz(Embedded task scheduler using threading.c                    s.   t    || _t|f|| _d| _d| _d S )NTBeat)r   r0   r5   r   servicedaemonr6   r.   r5   r,   r   r(   r)   r0     s
    
z_Threaded.__init__c                 C   s   | j   | j  d S r1   )r5   set_currentr   r   r3   r(   r(   r)   run  s    
z_Threaded.runc                 C   s   | j jdd d S )NT)r   )r   r   r3   r(   r(   r)   r     s    z_Threaded.stop)r$   r%   r&   r'   r0   r   r   r   r(   r(   r   r)   r     s   r   c                       s,   e Zd Z fddZdd Zdd Z  ZS )_Processc                    s(   t    || _t|f|| _d| _d S )Nr   )r   r0   r5   r   r   r6   r   r   r(   r)   r0     s    
z_Process.__init__c                 C   sP   t dd ttjtjtjgtt   | j	
  | j	  | jjdd d S )NF)fullT)r   )r   r   Zclose_open_fdsr   	__stdin__
__stdout__
__stderr__listr   r5   set_defaultr   r   r   r3   r(   r(   r)   r     s    
  

z_Process.runc                 C   s   | j   |   d S r1   )r   r   	terminater3   r(   r(   r)   r     s    
z_Process.stop)r$   r%   r&   r0   r   r   r   r(   r(   r   r)   r    s   	r  c                 K   s<   | ddstdkr(t| fddi|S t| fd|i|S )zReturn embedded clock service.

    Arguments:
        thread (bool): Run threaded instead of as a separate process.
            Uses :mod:`multiprocessing` by default, if available.
    threadFNri   r   )r   r  r   )r5   ri   r,   r(   r(   r)   r     s    r   )N)Er'   r   r   r   r   r   r   r!   r   calendarr   collectionsr   	functoolsr   	threadingr   r   Zbilliardr   Zbilliard.commonr   Zbilliard.contextr	   Zkombu.utils.functionalr
   r   Zkombu.utils.objectsr   r   r   r   r   
exceptionsr   Z	schedulesr   r   Zutils.importsr   r   Z	utils.logr   r   Z
utils.timer   r   __all__r    r$   r   r   r|   r   r   r   r~   r   r*   r   r`   rd   r   r   r   r   NotImplementedErrorr  r   r(   r(   r(   r)   <module>   sb   
 		  kF

