1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
from datetime import timedelta, datetime
from math import ceil
from tzlocal import get_localzone
from apscheduler.triggers.base import BaseTrigger
from apscheduler.util import convert_to_datetime, timedelta_seconds, datetime_repr, astimezone
class IntervalTrigger(BaseTrigger):
    """
    Triggers on specified intervals, starting on ``start_date`` if specified, ``datetime.now()`` +
    interval otherwise.

    If the given interval components add up to zero, an interval of one second is used instead.

    :param int weeks: number of weeks to wait
    :param int days: number of days to wait
    :param int hours: number of hours to wait
    :param int minutes: number of minutes to wait
    :param int seconds: number of seconds to wait
    :param datetime|str start_date: starting point for the interval calculation
    :param datetime|str end_date: latest possible date/time to trigger on
    :param datetime.tzinfo|str timezone: time zone to use for the date/time calculations
    :param int|None jitter: delay the job execution by ``jitter`` seconds at most
    """

    __slots__ = 'timezone', 'start_date', 'end_date', 'interval', 'interval_length', 'jitter'

    def __init__(self, weeks=0, days=0, hours=0, minutes=0, seconds=0, start_date=None,
                 end_date=None, timezone=None, jitter=None):
        self.interval = timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes,
                                  seconds=seconds)
        self.interval_length = timedelta_seconds(self.interval)
        if self.interval_length == 0:
            # A zero-length interval is replaced with a one second interval.
            self.interval = timedelta(seconds=1)
            self.interval_length = 1

        # Time zone precedence: explicit argument, then the tzinfo of an aware start_date,
        # then the tzinfo of an aware end_date, and finally the local time zone.
        if timezone:
            self.timezone = astimezone(timezone)
        elif isinstance(start_date, datetime) and start_date.tzinfo:
            self.timezone = start_date.tzinfo
        elif isinstance(end_date, datetime) and end_date.tzinfo:
            self.timezone = end_date.tzinfo
        else:
            self.timezone = get_localzone()

        # Without an explicit start date, the first run is one interval from now.
        start_date = start_date or (datetime.now(self.timezone) + self.interval)
        self.start_date = convert_to_datetime(start_date, self.timezone, 'start_date')
        self.end_date = convert_to_datetime(end_date, self.timezone, 'end_date')

        self.jitter = jitter

    def get_next_fire_time(self, previous_fire_time, now):
        """
        Compute the next date/time this trigger should fire on.

        :param datetime|None previous_fire_time: the previous fire time; when truthy, the next
            fire time is simply this value plus the interval
        :param datetime now: the current date/time, used when there is no previous fire time
        :return: the next fire time, or ``None`` if it would fall after ``end_date``
        """
        if previous_fire_time:
            next_fire_time = previous_fire_time + self.interval
        elif self.start_date > now:
            next_fire_time = self.start_date
        else:
            # Round up to the first whole interval boundary (counted from start_date) that is
            # not earlier than ``now``.
            timediff_seconds = timedelta_seconds(now - self.start_date)
            next_interval_num = int(ceil(timediff_seconds / self.interval_length))
            next_fire_time = self.start_date + self.interval * next_interval_num

        if self.jitter is not None:
            # ``_apply_jitter`` is not defined in this class; it is expected to come from the
            # base class.
            next_fire_time = self._apply_jitter(next_fire_time, self.jitter, now)

        if self.end_date and next_fire_time > self.end_date:
            return None

        # ``normalize()`` is specific to pytz time zones. The time zone may also have been taken
        # as-is from ``start_date.tzinfo``/``end_date.tzinfo`` or from ``get_localzone()``, in
        # which case it may be a tzinfo without that method; the computed value is then
        # returned unchanged instead of failing with AttributeError.
        normalize = getattr(self.timezone, 'normalize', None)
        if normalize is None:
            return next_fire_time
        return normalize(next_fire_time)

    def __getstate__(self):
        """Return the picklable state of this trigger as a dict tagged with a format version."""
        return {
            'version': 2,
            'timezone': self.timezone,
            'start_date': self.start_date,
            'end_date': self.end_date,
            'interval': self.interval,
            'jitter': self.jitter,
        }

    def __setstate__(self, state):
        """
        Restore the trigger from the state produced by :meth:`__getstate__`.

        :raises ValueError: if the state was serialized with a version newer than 2
        """
        # This is for compatibility with APScheduler 3.0.x
        if isinstance(state, tuple):
            state = state[1]

        if state.get('version', 1) > 2:
            raise ValueError(
                'Got serialized data for version %s of %s, but only versions up to 2 can be '
                'handled' % (state['version'], self.__class__.__name__))

        self.timezone = state['timezone']
        self.start_date = state['start_date']
        self.end_date = state['end_date']
        self.interval = state['interval']
        # The interval length is not serialized; it is derived from the interval again.
        self.interval_length = timedelta_seconds(self.interval)
        # States without a 'jitter' key restore with no jitter.
        self.jitter = state.get('jitter')

    def __str__(self):
        return 'interval[%s]' % str(self.interval)

    def __repr__(self):
        options = ['interval=%r' % self.interval, 'start_date=%r' % datetime_repr(self.start_date)]
        if self.end_date:
            options.append("end_date=%r" % datetime_repr(self.end_date))
        if self.jitter:
            options.append('jitter=%s' % self.jitter)

        return "<%s (%s, timezone='%s')>" % (
            self.__class__.__name__, ', '.join(options), self.timezone)
|