summaryrefslogtreecommitdiffhomepage
path: root/libs/apscheduler/job.py
blob: 445d9a868e51820730a8047b8143be39b0591fbf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
from inspect import ismethod, isclass
from uuid import uuid4

import six

from apscheduler.triggers.base import BaseTrigger
from apscheduler.util import (
    ref_to_obj, obj_to_ref, datetime_repr, repr_escape, get_callable_name, check_callable_args,
    convert_to_datetime)

try:
    from collections.abc import Iterable, Mapping
except ImportError:
    from collections import Iterable, Mapping


class Job(object):
    """
    Contains the options given when scheduling callables and its current schedule and other state.
    This class should never be instantiated by the user.

    :var str id: the unique identifier of this job
    :var str name: the description of this job
    :var func: the callable to execute
    :var tuple|list args: positional arguments to the callable
    :var dict kwargs: keyword arguments to the callable
    :var bool coalesce: whether to only run the job once when several run times are due
    :var trigger: the trigger object that controls the schedule of this job
    :var str executor: the name of the executor that will run this job
    :var int misfire_grace_time: the time (in seconds) how much this job's execution is allowed to
        be late (``None`` means "allow the job to run no matter how late it is")
    :var int max_instances: the maximum number of concurrently executing instances allowed for this
        job
    :var datetime.datetime next_run_time: the next scheduled run time of this job

    .. note::
        The ``misfire_grace_time`` has some non-obvious effects on job execution. See the
        :ref:`missed-job-executions` section in the documentation for an in-depth explanation.

    Two jobs compare equal when their IDs are equal, and they hash on the ID as well, so jobs can
    be used as dictionary keys and set members.
    """

    __slots__ = ('_scheduler', '_jobstore_alias', 'id', 'trigger', 'executor', 'func', 'func_ref',
                 'args', 'kwargs', 'name', 'misfire_grace_time', 'coalesce', 'max_instances',
                 'next_run_time', '__weakref__')

    def __init__(self, scheduler, id=None, **kwargs):
        """
        Binds the job to ``scheduler`` and applies the initial attribute values.

        :param scheduler: the scheduler that the public methods of this job delegate to
        :param str id: explicit job ID; when falsy, a random hexadecimal UUID is generated
        :param kwargs: initial attribute values, validated and applied by :meth:`_modify`

        """
        super(Job, self).__init__()
        self._scheduler = scheduler
        # No job store alias yet: ``pending`` reports True for as long as this stays None
        self._jobstore_alias = None
        self._modify(id=id or uuid4().hex, **kwargs)

    def modify(self, **changes):
        """
        Makes the given changes to this job and saves it in the associated job store.

        Accepted keyword arguments are the same as the variables on this class.

        .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.modify_job`

        :return Job: this job instance

        """
        self._scheduler.modify_job(self.id, self._jobstore_alias, **changes)
        return self

    def reschedule(self, trigger, **trigger_args):
        """
        Shortcut for switching the trigger on this job.

        .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.reschedule_job`

        :return Job: this job instance

        """
        self._scheduler.reschedule_job(self.id, self._jobstore_alias, trigger, **trigger_args)
        return self

    def pause(self):
        """
        Temporarily suspend the execution of this job.

        .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.pause_job`

        :return Job: this job instance

        """
        self._scheduler.pause_job(self.id, self._jobstore_alias)
        return self

    def resume(self):
        """
        Resume the schedule of this job if previously paused.

        .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.resume_job`

        :return Job: this job instance

        """
        self._scheduler.resume_job(self.id, self._jobstore_alias)
        return self

    def remove(self):
        """
        Unschedules this job and removes it from its associated job store.

        .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.remove_job`

        """
        self._scheduler.remove_job(self.id, self._jobstore_alias)

    @property
    def pending(self):
        """
        Returns ``True`` if the referenced job is still waiting to be added to its designated job
        store.

        """
        return self._jobstore_alias is None

    #
    # Private API
    #

    def _get_run_times(self, now):
        """
        Computes the scheduled run times between ``next_run_time`` and ``now`` (inclusive).

        :type now: datetime.datetime
        :rtype: list[datetime.datetime]

        """
        run_times = []
        next_run_time = self.next_run_time
        # Keep asking the trigger for the following fire time until it yields a falsy value or a
        # time that lies after ``now``
        while next_run_time and next_run_time <= now:
            run_times.append(next_run_time)
            next_run_time = self.trigger.get_next_fire_time(next_run_time, now)

        return run_times

    def _modify(self, **changes):
        """
        Validates the changes to the Job and makes the modifications if and only if all of them
        validate.

        Validated values are collected first and only assigned to the instance at the very end, so
        a failing validation leaves the job untouched.

        :raises TypeError: if a value has an unacceptable type (or, for ``name``,
            ``misfire_grace_time`` and ``max_instances``, an unacceptable value)
        :raises ValueError: if an attempt is made to change the ID of a job that already has one
        :raises AttributeError: if ``changes`` contains keys that are not modifiable attributes

        """
        approved = {}

        if 'id' in changes:
            value = changes.pop('id')
            if not isinstance(value, six.string_types):
                raise TypeError("id must be a nonempty string")
            # The ID can only be assigned once, i.e. while the attribute is still unset
            if hasattr(self, 'id'):
                raise ValueError('The job ID may not be changed')
            approved['id'] = value

        if 'func' in changes or 'args' in changes or 'kwargs' in changes:
            # The callable and its arguments are validated together, so any of the three that was
            # not supplied is taken from the current state of the job
            func = changes.pop('func') if 'func' in changes else self.func
            args = changes.pop('args') if 'args' in changes else self.args
            kwargs = changes.pop('kwargs') if 'kwargs' in changes else self.kwargs

            if isinstance(func, six.string_types):
                func_ref = func
                func = ref_to_obj(func)
            elif callable(func):
                try:
                    func_ref = obj_to_ref(func)
                except ValueError:
                    # If this happens, this Job won't be serializable
                    func_ref = None
            else:
                raise TypeError('func must be a callable or a textual reference to one')

            # Default the name from the callable when the job has no name yet and none was given;
            # the injected value is then validated by the 'name' branch below
            if not hasattr(self, 'name') and changes.get('name', None) is None:
                changes['name'] = get_callable_name(func)

            if isinstance(args, six.string_types) or not isinstance(args, Iterable):
                raise TypeError('args must be a non-string iterable')
            if isinstance(kwargs, six.string_types) or not isinstance(kwargs, Mapping):
                raise TypeError('kwargs must be a dict-like object')

            # NOTE(review): presumably raises if the callable cannot accept these arguments;
            # confirm against apscheduler.util.check_callable_args
            check_callable_args(func, args, kwargs)

            approved['func'] = func
            approved['func_ref'] = func_ref
            approved['args'] = args
            approved['kwargs'] = kwargs

        if 'name' in changes:
            value = changes.pop('name')
            if not value or not isinstance(value, six.string_types):
                raise TypeError("name must be a nonempty string")
            approved['name'] = value

        if 'misfire_grace_time' in changes:
            value = changes.pop('misfire_grace_time')
            if value is not None and (not isinstance(value, six.integer_types) or value <= 0):
                raise TypeError('misfire_grace_time must be either None or a positive integer')
            approved['misfire_grace_time'] = value

        if 'coalesce' in changes:
            value = bool(changes.pop('coalesce'))
            approved['coalesce'] = value

        if 'max_instances' in changes:
            value = changes.pop('max_instances')
            if not isinstance(value, six.integer_types) or value <= 0:
                raise TypeError('max_instances must be a positive integer')
            approved['max_instances'] = value

        if 'trigger' in changes:
            trigger = changes.pop('trigger')
            if not isinstance(trigger, BaseTrigger):
                raise TypeError('Expected a trigger instance, got %s instead' %
                                trigger.__class__.__name__)

            approved['trigger'] = trigger

        if 'executor' in changes:
            value = changes.pop('executor')
            if not isinstance(value, six.string_types):
                raise TypeError('executor must be a string')
            approved['executor'] = value

        if 'next_run_time' in changes:
            value = changes.pop('next_run_time')
            approved['next_run_time'] = convert_to_datetime(value, self._scheduler.timezone,
                                                            'next_run_time')

        # Every recognized key has been popped by now, so anything left over is unknown
        if changes:
            raise AttributeError('The following are not modifiable attributes of Job: %s' %
                                 ', '.join(changes))

        for key, value in six.iteritems(approved):
            setattr(self, key, value)

    def __getstate__(self):
        """
        Returns the state of this job as a plain dictionary for serialization.

        The callable is stored as its textual reference (``func_ref``). The scheduler and the job
        store alias are not included in the state.

        :raises ValueError: if no textual reference to the callable is available
        :rtype: dict

        """
        # Don't allow this Job to be serialized if the function reference could not be determined
        if not self.func_ref:
            raise ValueError(
                'This Job cannot be serialized since the reference to its callable (%r) could not '
                'be determined. Consider giving a textual reference (module:function name) '
                'instead.' % (self.func,))

        # Instance methods cannot survive serialization as-is, so store the "self" argument
        # explicitly
        func = self.func
        if ismethod(func) and not isclass(func.__self__) and obj_to_ref(func) == self.func_ref:
            args = (func.__self__,) + tuple(self.args)
        else:
            args = self.args

        return {
            'version': 1,
            'id': self.id,
            'func': self.func_ref,
            'trigger': self.trigger,
            'executor': self.executor,
            'args': args,
            'kwargs': self.kwargs,
            'name': self.name,
            'misfire_grace_time': self.misfire_grace_time,
            'coalesce': self.coalesce,
            'max_instances': self.max_instances,
            'next_run_time': self.next_run_time
        }

    def __setstate__(self, state):
        """
        Restores the attributes of this job from a dictionary produced by :meth:`__getstate__`.

        The callable is looked up again from its textual reference. ``_scheduler`` and
        ``_jobstore_alias`` are not part of the state and are not assigned here.

        :param dict state: the serialized job state
        :raises ValueError: if the state declares a version higher than 1

        """
        # A missing version key is treated as version 1
        if state.get('version', 1) > 1:
            raise ValueError('Job has version %s, but only version 1 can be handled' %
                             state['version'])

        self.id = state['id']
        self.func_ref = state['func']
        self.func = ref_to_obj(self.func_ref)
        self.trigger = state['trigger']
        self.executor = state['executor']
        self.args = state['args']
        self.kwargs = state['kwargs']
        self.name = state['name']
        self.misfire_grace_time = state['misfire_grace_time']
        self.coalesce = state['coalesce']
        self.max_instances = state['max_instances']
        self.next_run_time = state['next_run_time']

    def __eq__(self, other):
        """Compares jobs by ID; defers to the other operand if it is not a :class:`Job`."""
        if isinstance(other, Job):
            return self.id == other.id
        return NotImplemented

    def __hash__(self):
        """
        Hashes the job by its ID so that jobs which compare equal also hash equal.

        Defining ``__eq__`` without ``__hash__`` would make instances unhashable on Python 3.

        """
        return hash(self.id)

    def __repr__(self):
        """Returns a short debugging representation containing the job's ID and name."""
        return '<Job (id=%s name=%s)>' % (repr_escape(self.id), repr_escape(self.name))

    def __str__(self):
        """Returns the :meth:`__unicode__` description passed through ``repr_escape``."""
        return repr_escape(self.__unicode__())

    def __unicode__(self):
        """
        Returns a human readable description of the job: its name, trigger and status.

        The status is ``pending`` while ``next_run_time`` has never been assigned, ``paused``
        when it is set but falsy, and the formatted next run time otherwise.

        """
        if hasattr(self, 'next_run_time'):
            status = ('next run at: ' + datetime_repr(self.next_run_time) if
                      self.next_run_time else 'paused')
        else:
            status = 'pending'

        return u'%s (trigger: %s, %s)' % (self.name, self.trigger, status)