summaryrefslogtreecommitdiffhomepage
path: root/libs/apscheduler
diff options
context:
space:
mode:
Diffstat (limited to 'libs/apscheduler')
-rw-r--r--libs/apscheduler/events.py2
-rw-r--r--libs/apscheduler/executors/asyncio.py5
-rw-r--r--libs/apscheduler/executors/base_py3.py2
-rw-r--r--libs/apscheduler/executors/pool.py27
-rw-r--r--libs/apscheduler/executors/tornado.py6
-rw-r--r--libs/apscheduler/job.py15
-rw-r--r--libs/apscheduler/jobstores/mongodb.py12
-rw-r--r--libs/apscheduler/jobstores/redis.py12
-rw-r--r--libs/apscheduler/jobstores/rethinkdb.py40
-rw-r--r--libs/apscheduler/jobstores/sqlalchemy.py6
-rw-r--r--libs/apscheduler/jobstores/zookeeper.py11
-rw-r--r--libs/apscheduler/schedulers/asyncio.py8
-rw-r--r--libs/apscheduler/schedulers/background.py4
-rw-r--r--libs/apscheduler/schedulers/base.py26
-rw-r--r--libs/apscheduler/schedulers/blocking.py4
-rw-r--r--libs/apscheduler/schedulers/qt.py11
-rw-r--r--libs/apscheduler/triggers/base.py19
-rw-r--r--libs/apscheduler/triggers/combining.py4
-rw-r--r--libs/apscheduler/triggers/cron/__init__.py4
-rw-r--r--libs/apscheduler/triggers/interval.py2
-rw-r--r--libs/apscheduler/util.py39
21 files changed, 166 insertions, 93 deletions
diff --git a/libs/apscheduler/events.py b/libs/apscheduler/events.py
index 890763eb6..016da03c5 100644
--- a/libs/apscheduler/events.py
+++ b/libs/apscheduler/events.py
@@ -3,7 +3,7 @@ __all__ = ('EVENT_SCHEDULER_STARTED', 'EVENT_SCHEDULER_SHUTDOWN', 'EVENT_SCHEDUL
'EVENT_JOBSTORE_ADDED', 'EVENT_JOBSTORE_REMOVED', 'EVENT_ALL_JOBS_REMOVED',
'EVENT_JOB_ADDED', 'EVENT_JOB_REMOVED', 'EVENT_JOB_MODIFIED', 'EVENT_JOB_EXECUTED',
'EVENT_JOB_ERROR', 'EVENT_JOB_MISSED', 'EVENT_JOB_SUBMITTED', 'EVENT_JOB_MAX_INSTANCES',
- 'SchedulerEvent', 'JobEvent', 'JobExecutionEvent')
+ 'SchedulerEvent', 'JobEvent', 'JobExecutionEvent', 'JobSubmissionEvent')
EVENT_SCHEDULER_STARTED = EVENT_SCHEDULER_START = 2 ** 0
diff --git a/libs/apscheduler/executors/asyncio.py b/libs/apscheduler/executors/asyncio.py
index 5139622d1..06fc7f968 100644
--- a/libs/apscheduler/executors/asyncio.py
+++ b/libs/apscheduler/executors/asyncio.py
@@ -3,12 +3,11 @@ from __future__ import absolute_import
import sys
from apscheduler.executors.base import BaseExecutor, run_job
+from apscheduler.util import iscoroutinefunction_partial
try:
- from asyncio import iscoroutinefunction
from apscheduler.executors.base_py3 import run_coroutine_job
except ImportError:
- from trollius import iscoroutinefunction
run_coroutine_job = None
@@ -46,7 +45,7 @@ class AsyncIOExecutor(BaseExecutor):
else:
self._run_job_success(job.id, events)
- if iscoroutinefunction(job.func):
+ if iscoroutinefunction_partial(job.func):
if run_coroutine_job is not None:
coro = run_coroutine_job(job, job._jobstore_alias, run_times, self._logger.name)
f = self._eventloop.create_task(coro)
diff --git a/libs/apscheduler/executors/base_py3.py b/libs/apscheduler/executors/base_py3.py
index 61abd8424..7111d2aec 100644
--- a/libs/apscheduler/executors/base_py3.py
+++ b/libs/apscheduler/executors/base_py3.py
@@ -1,5 +1,6 @@
import logging
import sys
+import traceback
from datetime import datetime, timedelta
from traceback import format_tb
@@ -33,6 +34,7 @@ async def run_coroutine_job(job, jobstore_alias, run_times, logger_name):
events.append(JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time,
exception=exc, traceback=formatted_tb))
logger.exception('Job "%s" raised an exception', job)
+ traceback.clear_frames(tb)
else:
events.append(JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time,
retval=retval))
diff --git a/libs/apscheduler/executors/pool.py b/libs/apscheduler/executors/pool.py
index 2f4ef455c..c85896ec2 100644
--- a/libs/apscheduler/executors/pool.py
+++ b/libs/apscheduler/executors/pool.py
@@ -3,6 +3,11 @@ import concurrent.futures
from apscheduler.executors.base import BaseExecutor, run_job
+try:
+ from concurrent.futures.process import BrokenProcessPool
+except ImportError:
+ BrokenProcessPool = None
+
class BasePoolExecutor(BaseExecutor):
@abstractmethod
@@ -19,7 +24,13 @@ class BasePoolExecutor(BaseExecutor):
else:
self._run_job_success(job.id, f.result())
- f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
+ try:
+ f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
+ except BrokenProcessPool:
+ self._logger.warning('Process pool is broken; replacing pool with a fresh instance')
+ self._pool = self._pool.__class__(self._pool._max_workers)
+ f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
+
f.add_done_callback(callback)
def shutdown(self, wait=True):
@@ -33,10 +44,13 @@ class ThreadPoolExecutor(BasePoolExecutor):
Plugin alias: ``threadpool``
:param max_workers: the maximum number of spawned threads.
+ :param pool_kwargs: dict of keyword arguments to pass to the underlying
+ ThreadPoolExecutor constructor
"""
- def __init__(self, max_workers=10):
- pool = concurrent.futures.ThreadPoolExecutor(int(max_workers))
+ def __init__(self, max_workers=10, pool_kwargs=None):
+ pool_kwargs = pool_kwargs or {}
+ pool = concurrent.futures.ThreadPoolExecutor(int(max_workers), **pool_kwargs)
super(ThreadPoolExecutor, self).__init__(pool)
@@ -47,8 +61,11 @@ class ProcessPoolExecutor(BasePoolExecutor):
Plugin alias: ``processpool``
:param max_workers: the maximum number of spawned processes.
+ :param pool_kwargs: dict of keyword arguments to pass to the underlying
+ ProcessPoolExecutor constructor
"""
- def __init__(self, max_workers=10):
- pool = concurrent.futures.ProcessPoolExecutor(int(max_workers))
+ def __init__(self, max_workers=10, pool_kwargs=None):
+ pool_kwargs = pool_kwargs or {}
+ pool = concurrent.futures.ProcessPoolExecutor(int(max_workers), **pool_kwargs)
super(ProcessPoolExecutor, self).__init__(pool)
diff --git a/libs/apscheduler/executors/tornado.py b/libs/apscheduler/executors/tornado.py
index a4696ce79..3b97eec92 100644
--- a/libs/apscheduler/executors/tornado.py
+++ b/libs/apscheduler/executors/tornado.py
@@ -8,10 +8,10 @@ from tornado.gen import convert_yielded
from apscheduler.executors.base import BaseExecutor, run_job
try:
- from inspect import iscoroutinefunction
from apscheduler.executors.base_py3 import run_coroutine_job
+ from apscheduler.util import iscoroutinefunction_partial
except ImportError:
- def iscoroutinefunction(func):
+ def iscoroutinefunction_partial(func):
return False
@@ -44,7 +44,7 @@ class TornadoExecutor(BaseExecutor):
else:
self._run_job_success(job.id, events)
- if iscoroutinefunction(job.func):
+ if iscoroutinefunction_partial(job.func):
f = run_coroutine_job(job, job._jobstore_alias, run_times, self._logger.name)
else:
f = self.executor.submit(run_job, job, job._jobstore_alias, run_times,
diff --git a/libs/apscheduler/job.py b/libs/apscheduler/job.py
index 4e24bec7a..445d9a868 100644
--- a/libs/apscheduler/job.py
+++ b/libs/apscheduler/job.py
@@ -1,4 +1,3 @@
-from collections import Iterable, Mapping
from inspect import ismethod, isclass
from uuid import uuid4
@@ -9,6 +8,11 @@ from apscheduler.util import (
ref_to_obj, obj_to_ref, datetime_repr, repr_escape, get_callable_name, check_callable_args,
convert_to_datetime)
+try:
+ from collections.abc import Iterable, Mapping
+except ImportError:
+ from collections import Iterable, Mapping
+
class Job(object):
"""
@@ -24,7 +28,7 @@ class Job(object):
:var trigger: the trigger object that controls the schedule of this job
:var str executor: the name of the executor that will run this job
:var int misfire_grace_time: the time (in seconds) how much this job's execution is allowed to
- be late
+ be late (``None`` means "allow the job to run no matter how late it is")
:var int max_instances: the maximum number of concurrently executing instances allowed for this
job
:var datetime.datetime next_run_time: the next scheduled run time of this job
@@ -36,7 +40,7 @@ class Job(object):
__slots__ = ('_scheduler', '_jobstore_alias', 'id', 'trigger', 'executor', 'func', 'func_ref',
'args', 'kwargs', 'name', 'misfire_grace_time', 'coalesce', 'max_instances',
- 'next_run_time')
+ 'next_run_time', '__weakref__')
def __init__(self, scheduler, id=None, **kwargs):
super(Job, self).__init__()
@@ -238,8 +242,9 @@ class Job(object):
# Instance methods cannot survive serialization as-is, so store the "self" argument
# explicitly
- if ismethod(self.func) and not isclass(self.func.__self__):
- args = (self.func.__self__,) + tuple(self.args)
+ func = self.func
+ if ismethod(func) and not isclass(func.__self__) and obj_to_ref(func) == self.func_ref:
+ args = (func.__self__,) + tuple(self.args)
else:
args = self.args
diff --git a/libs/apscheduler/jobstores/mongodb.py b/libs/apscheduler/jobstores/mongodb.py
index 7dbc3b127..ea3097ddc 100644
--- a/libs/apscheduler/jobstores/mongodb.py
+++ b/libs/apscheduler/jobstores/mongodb.py
@@ -54,7 +54,7 @@ class MongoDBJobStore(BaseJobStore):
def start(self, scheduler, alias):
super(MongoDBJobStore, self).start(scheduler, alias)
- self.collection.ensure_index('next_run_time', sparse=True)
+ self.collection.create_index('next_run_time', sparse=True)
@property
def connection(self):
@@ -83,7 +83,7 @@ class MongoDBJobStore(BaseJobStore):
def add_job(self, job):
try:
- self.collection.insert({
+ self.collection.insert_one({
'_id': job.id,
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
'job_state': Binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
@@ -96,13 +96,13 @@ class MongoDBJobStore(BaseJobStore):
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
'job_state': Binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
}
- result = self.collection.update({'_id': job.id}, {'$set': changes})
- if result and result['n'] == 0:
+ result = self.collection.update_one({'_id': job.id}, {'$set': changes})
+ if result and result.matched_count == 0:
raise JobLookupError(job.id)
def remove_job(self, job_id):
- result = self.collection.remove(job_id)
- if result and result['n'] == 0:
+ result = self.collection.delete_one({'_id': job_id})
+ if result and result.deleted_count == 0:
raise JobLookupError(job_id)
def remove_all_jobs(self):
diff --git a/libs/apscheduler/jobstores/redis.py b/libs/apscheduler/jobstores/redis.py
index 61f913e9e..5bb69d635 100644
--- a/libs/apscheduler/jobstores/redis.py
+++ b/libs/apscheduler/jobstores/redis.py
@@ -14,7 +14,7 @@ except ImportError: # pragma: nocover
import pickle
try:
- from redis import StrictRedis
+ from redis import Redis
except ImportError: # pragma: nocover
raise ImportError('RedisJobStore requires redis installed')
@@ -47,7 +47,7 @@ class RedisJobStore(BaseJobStore):
self.pickle_protocol = pickle_protocol
self.jobs_key = jobs_key
self.run_times_key = run_times_key
- self.redis = StrictRedis(db=int(db), **connect_args)
+ self.redis = Redis(db=int(db), **connect_args)
def lookup_job(self, job_id):
job_state = self.redis.hget(self.jobs_key, job_id)
@@ -81,7 +81,9 @@ class RedisJobStore(BaseJobStore):
pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(),
self.pickle_protocol))
if job.next_run_time:
- pipe.zadd(self.run_times_key, datetime_to_utc_timestamp(job.next_run_time), job.id)
+ pipe.zadd(self.run_times_key,
+ {job.id: datetime_to_utc_timestamp(job.next_run_time)})
+
pipe.execute()
def update_job(self, job):
@@ -92,9 +94,11 @@ class RedisJobStore(BaseJobStore):
pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(),
self.pickle_protocol))
if job.next_run_time:
- pipe.zadd(self.run_times_key, datetime_to_utc_timestamp(job.next_run_time), job.id)
+ pipe.zadd(self.run_times_key,
+ {job.id: datetime_to_utc_timestamp(job.next_run_time)})
else:
pipe.zrem(self.run_times_key, job.id)
+
pipe.execute()
def remove_job(self, job_id):
diff --git a/libs/apscheduler/jobstores/rethinkdb.py b/libs/apscheduler/jobstores/rethinkdb.py
index 2185c6cc1..d8a78cde3 100644
--- a/libs/apscheduler/jobstores/rethinkdb.py
+++ b/libs/apscheduler/jobstores/rethinkdb.py
@@ -10,7 +10,7 @@ except ImportError: # pragma: nocover
import pickle
try:
- import rethinkdb as r
+ from rethinkdb import RethinkDB
except ImportError: # pragma: nocover
raise ImportError('RethinkDBJobStore requires rethinkdb installed')
@@ -40,10 +40,12 @@ class RethinkDBJobStore(BaseJobStore):
raise ValueError('The "table" parameter must not be empty')
self.database = database
- self.table = table
+ self.table_name = table
+ self.table = None
self.client = client
self.pickle_protocol = pickle_protocol
self.connect_args = connect_args
+ self.r = RethinkDB()
self.conn = None
def start(self, scheduler, alias):
@@ -52,31 +54,31 @@ class RethinkDBJobStore(BaseJobStore):
if self.client:
self.conn = maybe_ref(self.client)
else:
- self.conn = r.connect(db=self.database, **self.connect_args)
+ self.conn = self.r.connect(db=self.database, **self.connect_args)
- if self.database not in r.db_list().run(self.conn):
- r.db_create(self.database).run(self.conn)
+ if self.database not in self.r.db_list().run(self.conn):
+ self.r.db_create(self.database).run(self.conn)
- if self.table not in r.table_list().run(self.conn):
- r.table_create(self.table).run(self.conn)
+ if self.table_name not in self.r.table_list().run(self.conn):
+ self.r.table_create(self.table_name).run(self.conn)
- if 'next_run_time' not in r.table(self.table).index_list().run(self.conn):
- r.table(self.table).index_create('next_run_time').run(self.conn)
+ if 'next_run_time' not in self.r.table(self.table_name).index_list().run(self.conn):
+ self.r.table(self.table_name).index_create('next_run_time').run(self.conn)
- self.table = r.db(self.database).table(self.table)
+ self.table = self.r.db(self.database).table(self.table_name)
def lookup_job(self, job_id):
results = list(self.table.get_all(job_id).pluck('job_state').run(self.conn))
return self._reconstitute_job(results[0]['job_state']) if results else None
def get_due_jobs(self, now):
- return self._get_jobs(r.row['next_run_time'] <= datetime_to_utc_timestamp(now))
+ return self._get_jobs(self.r.row['next_run_time'] <= datetime_to_utc_timestamp(now))
def get_next_run_time(self):
results = list(
self.table
- .filter(r.row['next_run_time'] != None) # flake8: noqa
- .order_by(r.asc('next_run_time'))
+ .filter(self.r.row['next_run_time'] != None) # noqa
+ .order_by(self.r.asc('next_run_time'))
.map(lambda x: x['next_run_time'])
.limit(1)
.run(self.conn)
@@ -92,7 +94,7 @@ class RethinkDBJobStore(BaseJobStore):
job_dict = {
'id': job.id,
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
- 'job_state': r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
+ 'job_state': self.r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
}
results = self.table.insert(job_dict).run(self.conn)
if results['errors'] > 0:
@@ -101,7 +103,7 @@ class RethinkDBJobStore(BaseJobStore):
def update_job(self, job):
changes = {
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
- 'job_state': r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
+ 'job_state': self.r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
}
results = self.table.get_all(job.id).update(changes).run(self.conn)
skipped = False in map(lambda x: results[x] == 0, results.keys())
@@ -130,20 +132,20 @@ class RethinkDBJobStore(BaseJobStore):
def _get_jobs(self, predicate=None):
jobs = []
failed_job_ids = []
- query = (self.table.filter(r.row['next_run_time'] != None).filter(predicate) if
- predicate else self.table)
+ query = (self.table.filter(self.r.row['next_run_time'] != None).filter(predicate) # noqa
+ if predicate else self.table)
query = query.order_by('next_run_time', 'id').pluck('id', 'job_state')
for document in query.run(self.conn):
try:
jobs.append(self._reconstitute_job(document['job_state']))
- except:
+ except Exception:
self._logger.exception('Unable to restore job "%s" -- removing it', document['id'])
failed_job_ids.append(document['id'])
# Remove all the jobs we failed to restore
if failed_job_ids:
- r.expr(failed_job_ids).for_each(
+ self.r.expr(failed_job_ids).for_each(
lambda job_id: self.table.get_all(job_id).delete()).run(self.conn)
return jobs
diff --git a/libs/apscheduler/jobstores/sqlalchemy.py b/libs/apscheduler/jobstores/sqlalchemy.py
index beb27fb56..dcfd3e565 100644
--- a/libs/apscheduler/jobstores/sqlalchemy.py
+++ b/libs/apscheduler/jobstores/sqlalchemy.py
@@ -11,7 +11,7 @@ except ImportError: # pragma: nocover
try:
from sqlalchemy import (
- create_engine, Table, Column, MetaData, Unicode, Float, LargeBinary, select)
+ create_engine, Table, Column, MetaData, Unicode, Float, LargeBinary, select, and_)
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql.expression import null
except ImportError: # pragma: nocover
@@ -106,7 +106,7 @@ class SQLAlchemyJobStore(BaseJobStore):
}).where(self.jobs_t.c.id == job.id)
result = self.engine.execute(update)
if result.rowcount == 0:
- raise JobLookupError(id)
+ raise JobLookupError(job.id)
def remove_job(self, job_id):
delete = self.jobs_t.delete().where(self.jobs_t.c.id == job_id)
@@ -134,7 +134,7 @@ class SQLAlchemyJobStore(BaseJobStore):
jobs = []
selectable = select([self.jobs_t.c.id, self.jobs_t.c.job_state]).\
order_by(self.jobs_t.c.next_run_time)
- selectable = selectable.where(*conditions) if conditions else selectable
+ selectable = selectable.where(and_(*conditions)) if conditions else selectable
failed_job_ids = set()
for row in self.engine.execute(selectable):
try:
diff --git a/libs/apscheduler/jobstores/zookeeper.py b/libs/apscheduler/jobstores/zookeeper.py
index 2cca83e8f..525306936 100644
--- a/libs/apscheduler/jobstores/zookeeper.py
+++ b/libs/apscheduler/jobstores/zookeeper.py
@@ -1,6 +1,5 @@
from __future__ import absolute_import
-import os
from datetime import datetime
from pytz import utc
@@ -65,7 +64,7 @@ class ZooKeeperJobStore(BaseJobStore):
def lookup_job(self, job_id):
self._ensure_paths()
- node_path = os.path.join(self.path, job_id)
+ node_path = self.path + "/" + str(job_id)
try:
content, _ = self.client.get(node_path)
doc = pickle.loads(content)
@@ -92,7 +91,7 @@ class ZooKeeperJobStore(BaseJobStore):
def add_job(self, job):
self._ensure_paths()
- node_path = os.path.join(self.path, str(job.id))
+ node_path = self.path + "/" + str(job.id)
value = {
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
'job_state': job.__getstate__()
@@ -105,7 +104,7 @@ class ZooKeeperJobStore(BaseJobStore):
def update_job(self, job):
self._ensure_paths()
- node_path = os.path.join(self.path, str(job.id))
+ node_path = self.path + "/" + str(job.id)
changes = {
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
'job_state': job.__getstate__()
@@ -118,7 +117,7 @@ class ZooKeeperJobStore(BaseJobStore):
def remove_job(self, job_id):
self._ensure_paths()
- node_path = os.path.join(self.path, str(job_id))
+ node_path = self.path + "/" + str(job_id)
try:
self.client.delete(node_path)
except NoNodeError:
@@ -151,7 +150,7 @@ class ZooKeeperJobStore(BaseJobStore):
all_ids = self.client.get_children(self.path)
for node_name in all_ids:
try:
- node_path = os.path.join(self.path, node_name)
+ node_path = self.path + "/" + node_name
content, _ = self.client.get(node_path)
doc = pickle.loads(content)
job_def = {
diff --git a/libs/apscheduler/schedulers/asyncio.py b/libs/apscheduler/schedulers/asyncio.py
index 289ef13fe..70ebedeb6 100644
--- a/libs/apscheduler/schedulers/asyncio.py
+++ b/libs/apscheduler/schedulers/asyncio.py
@@ -38,13 +38,19 @@ class AsyncIOScheduler(BaseScheduler):
_eventloop = None
_timeout = None
+ def start(self, paused=False):
+ if not self._eventloop:
+ self._eventloop = asyncio.get_event_loop()
+
+ super(AsyncIOScheduler, self).start(paused)
+
@run_in_event_loop
def shutdown(self, wait=True):
super(AsyncIOScheduler, self).shutdown(wait)
self._stop_timer()
def _configure(self, config):
- self._eventloop = maybe_ref(config.pop('event_loop', None)) or asyncio.get_event_loop()
+ self._eventloop = maybe_ref(config.pop('event_loop', None))
super(AsyncIOScheduler, self)._configure(config)
def _start_timer(self, wait_seconds):
diff --git a/libs/apscheduler/schedulers/background.py b/libs/apscheduler/schedulers/background.py
index 03f29822a..bb8f77da2 100644
--- a/libs/apscheduler/schedulers/background.py
+++ b/libs/apscheduler/schedulers/background.py
@@ -29,7 +29,9 @@ class BackgroundScheduler(BlockingScheduler):
super(BackgroundScheduler, self)._configure(config)
def start(self, *args, **kwargs):
- self._event = Event()
+ if self._event is None or self._event.is_set():
+ self._event = Event()
+
BaseScheduler.start(self, *args, **kwargs)
self._thread = Thread(target=self._main_loop, name='APScheduler')
self._thread.daemon = self._daemon
diff --git a/libs/apscheduler/schedulers/base.py b/libs/apscheduler/schedulers/base.py
index 8f910a653..3dfb74377 100644
--- a/libs/apscheduler/schedulers/base.py
+++ b/libs/apscheduler/schedulers/base.py
@@ -1,7 +1,6 @@
from __future__ import print_function
from abc import ABCMeta, abstractmethod
-from collections import MutableMapping
from threading import RLock
from datetime import datetime, timedelta
from logging import getLogger
@@ -27,6 +26,11 @@ from apscheduler.events import (
EVENT_JOB_ADDED, EVENT_EXECUTOR_ADDED, EVENT_EXECUTOR_REMOVED, EVENT_ALL_JOBS_REMOVED,
EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES, EVENT_SCHEDULER_RESUMED, EVENT_SCHEDULER_PAUSED)
+try:
+ from collections.abc import MutableMapping
+except ImportError:
+ from collections import MutableMapping
+
#: constant indicating a scheduler's stopped state
STATE_STOPPED = 0
#: constant indicating a scheduler's running state (started and processing jobs)
@@ -82,6 +86,11 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
self.state = STATE_STOPPED
self.configure(gconfig, **options)
+ def __getstate__(self):
+ raise TypeError("Schedulers cannot be serialized. Ensure that you are not passing a "
+ "scheduler instance as an argument to a job, or scheduling an instance "
+ "method where the instance contains a scheduler as an attribute.")
+
def configure(self, gconfig={}, prefix='apscheduler.', **options):
"""
Reconfigures the scheduler with the given options.
@@ -398,7 +407,7 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
:param str|unicode id: explicit identifier for the job (for modifying it later)
:param str|unicode name: textual description of the job
:param int misfire_grace_time: seconds after the designated runtime that the job is still
- allowed to be run
+ allowed to be run (or ``None`` to allow the job to run no matter how late it is)
:param bool coalesce: run once instead of many times if the scheduler determines that the
job should be run more than once in succession
:param int max_instances: maximum number of concurrently running instances allowed for this
@@ -594,14 +603,13 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
"""
jobstore_alias = None
with self._jobstores_lock:
+ # Check if the job is among the pending jobs
if self.state == STATE_STOPPED:
- # Check if the job is among the pending jobs
- if self.state == STATE_STOPPED:
- for i, (job, alias, replace_existing) in enumerate(self._pending_jobs):
- if job.id == job_id and jobstore in (None, alias):
- del self._pending_jobs[i]
- jobstore_alias = alias
- break
+ for i, (job, alias, replace_existing) in enumerate(self._pending_jobs):
+ if job.id == job_id and jobstore in (None, alias):
+ del self._pending_jobs[i]
+ jobstore_alias = alias
+ break
else:
# Otherwise, try to remove it from each store until it succeeds or we run out of
# stores to check
diff --git a/libs/apscheduler/schedulers/blocking.py b/libs/apscheduler/schedulers/blocking.py
index e61715757..4ecc9f6f1 100644
--- a/libs/apscheduler/schedulers/blocking.py
+++ b/libs/apscheduler/schedulers/blocking.py
@@ -14,7 +14,9 @@ class BlockingScheduler(BaseScheduler):
_event = None
def start(self, *args, **kwargs):
- self._event = Event()
+ if self._event is None or self._event.is_set():
+ self._event = Event()
+
super(BlockingScheduler, self).start(*args, **kwargs)
self._main_loop()
diff --git a/libs/apscheduler/schedulers/qt.py b/libs/apscheduler/schedulers/qt.py
index 6ee5d332a..dda77d796 100644
--- a/libs/apscheduler/schedulers/qt.py
+++ b/libs/apscheduler/schedulers/qt.py
@@ -9,9 +9,13 @@ except (ImportError, RuntimeError): # pragma: nocover
from PyQt4.QtCore import QObject, QTimer
except ImportError:
try:
- from PySide.QtCore import QObject, QTimer # flake8: noqa
+ from PySide2.QtCore import QObject, QTimer # noqa
except ImportError:
- raise ImportError('QtScheduler requires either PyQt5, PyQt4 or PySide installed')
+ try:
+ from PySide.QtCore import QObject, QTimer # noqa
+ except ImportError:
+ raise ImportError('QtScheduler requires either PyQt5, PyQt4, PySide2 '
+ 'or PySide installed')
class QtScheduler(BaseScheduler):
@@ -26,7 +30,8 @@ class QtScheduler(BaseScheduler):
def _start_timer(self, wait_seconds):
self._stop_timer()
if wait_seconds is not None:
- self._timer = QTimer.singleShot(wait_seconds * 1000, self._process_jobs)
+ wait_time = min(wait_seconds * 1000, 2147483647)
+ self._timer = QTimer.singleShot(wait_time, self._process_jobs)
def _stop_timer(self):
if self._timer:
diff --git a/libs/apscheduler/triggers/base.py b/libs/apscheduler/triggers/base.py
index ce2526a88..55d010dba 100644
--- a/libs/apscheduler/triggers/base.py
+++ b/libs/apscheduler/triggers/base.py
@@ -22,27 +22,16 @@ class BaseTrigger(six.with_metaclass(ABCMeta)):
def _apply_jitter(self, next_fire_time, jitter, now):
"""
- Randomize ``next_fire_time`` by adding or subtracting a random value (the jitter). If the
- resulting datetime is in the past, returns the initial ``next_fire_time`` without jitter.
-
- ``next_fire_time - jitter <= result <= next_fire_time + jitter``
+ Randomize ``next_fire_time`` by adding a random value (the jitter).
:param datetime.datetime|None next_fire_time: next fire time without jitter applied. If
``None``, returns ``None``.
- :param int|None jitter: maximum number of seconds to add or subtract to
- ``next_fire_time``. If ``None`` or ``0``, returns ``next_fire_time``
+ :param int|None jitter: maximum number of seconds to add to ``next_fire_time``
+ (if ``None`` or ``0``, returns ``next_fire_time``)
:param datetime.datetime now: current datetime
:return datetime.datetime|None: next fire time with a jitter.
"""
if next_fire_time is None or not jitter:
return next_fire_time
- next_fire_time_with_jitter = next_fire_time + timedelta(
- seconds=random.uniform(-jitter, jitter))
-
- if next_fire_time_with_jitter < now:
- # Next fire time with jitter is in the past.
- # Ignore jitter to avoid false misfire.
- return next_fire_time
-
- return next_fire_time_with_jitter
+ return next_fire_time + timedelta(seconds=random.uniform(0, jitter))
diff --git a/libs/apscheduler/triggers/combining.py b/libs/apscheduler/triggers/combining.py
index 64f83011a..bb9000618 100644
--- a/libs/apscheduler/triggers/combining.py
+++ b/libs/apscheduler/triggers/combining.py
@@ -45,7 +45,7 @@ class AndTrigger(BaseCombiningTrigger):
Trigger alias: ``and``
:param list triggers: triggers to combine
- :param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most.
+ :param int|None jitter: delay the job execution by ``jitter`` seconds at most
"""
__slots__ = ()
@@ -73,7 +73,7 @@ class OrTrigger(BaseCombiningTrigger):
Trigger alias: ``or``
:param list triggers: triggers to combine
- :param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most.
+ :param int|None jitter: delay the job execution by ``jitter`` seconds at most
.. note:: Triggers that depends on the previous fire time, such as the interval trigger, may
seem to behave strangely since they are always passed the previous fire time produced by
diff --git a/libs/apscheduler/triggers/cron/__init__.py b/libs/apscheduler/triggers/cron/__init__.py
index ce675dd93..fec6e3b5c 100644
--- a/libs/apscheduler/triggers/cron/__init__.py
+++ b/libs/apscheduler/triggers/cron/__init__.py
@@ -16,7 +16,7 @@ class CronTrigger(BaseTrigger):
:param int|str year: 4-digit year
:param int|str month: month (1-12)
- :param int|str day: day of the (1-31)
+ :param int|str day: day of month (1-31)
:param int|str week: ISO week (1-53)
:param int|str day_of_week: number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
:param int|str hour: hour (0-23)
@@ -26,7 +26,7 @@ class CronTrigger(BaseTrigger):
:param datetime|str end_date: latest possible date/time to trigger on (inclusive)
:param datetime.tzinfo|str timezone: time zone to use for the date/time calculations (defaults
to scheduler timezone)
- :param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most.
+ :param int|None jitter: delay the job execution by ``jitter`` seconds at most
.. note:: The first weekday is always **monday**.
"""
diff --git a/libs/apscheduler/triggers/interval.py b/libs/apscheduler/triggers/interval.py
index 831ba3830..61094aa13 100644
--- a/libs/apscheduler/triggers/interval.py
+++ b/libs/apscheduler/triggers/interval.py
@@ -20,7 +20,7 @@ class IntervalTrigger(BaseTrigger):
:param datetime|str start_date: starting point for the interval calculation
:param datetime|str end_date: latest possible date/time to trigger on
:param datetime.tzinfo|str timezone: time zone to use for the date/time calculations
- :param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most.
+ :param int|None jitter: delay the job execution by ``jitter`` seconds at most
"""
__slots__ = 'timezone', 'start_date', 'end_date', 'interval', 'interval_length', 'jitter'
diff --git a/libs/apscheduler/util.py b/libs/apscheduler/util.py
index 3c48e550b..1e643bffa 100644
--- a/libs/apscheduler/util.py
+++ b/libs/apscheduler/util.py
@@ -5,8 +5,9 @@ from __future__ import division
from datetime import date, datetime, time, timedelta, tzinfo
from calendar import timegm
from functools import partial
-from inspect import isclass
+from inspect import isclass, ismethod
import re
+import sys
from pytz import timezone, utc, FixedOffset
import six
@@ -21,6 +22,15 @@ try:
except ImportError:
TIMEOUT_MAX = 4294967 # Maximum value accepted by Event.wait() on Windows
+try:
+ from asyncio import iscoroutinefunction
+except ImportError:
+ try:
+ from trollius import iscoroutinefunction
+ except ImportError:
+ def iscoroutinefunction(func):
+ return False
+
__all__ = ('asint', 'asbool', 'astimezone', 'convert_to_datetime', 'datetime_to_utc_timestamp',
'utc_timestamp_to_datetime', 'timedelta_seconds', 'datetime_ceil', 'get_callable_name',
'obj_to_ref', 'ref_to_obj', 'maybe_ref', 'repr_escape', 'check_callable_args',
@@ -263,7 +273,18 @@ def obj_to_ref(obj):
if '<locals>' in name:
raise ValueError('Cannot create a reference to a nested function')
- return '%s:%s' % (obj.__module__, name)
+ if ismethod(obj):
+ if hasattr(obj, 'im_self') and obj.im_self:
+ # bound method
+ module = obj.im_self.__module__
+ elif hasattr(obj, 'im_class') and obj.im_class:
+ # unbound method
+ module = obj.im_class.__module__
+ else:
+ module = obj.__module__
+ else:
+ module = obj.__module__
+ return '%s:%s' % (module, name)
def ref_to_obj(ref):
@@ -332,7 +353,10 @@ def check_callable_args(func, args, kwargs):
has_varargs = has_var_kwargs = False
try:
- sig = signature(func)
+ if sys.version_info >= (3, 5):
+ sig = signature(func, follow_wrapped=False)
+ else:
+ sig = signature(func)
except ValueError:
# signature() doesn't work against every kind of callable
return
@@ -398,3 +422,12 @@ def check_callable_args(func, args, kwargs):
raise ValueError(
'The target callable does not accept the following keyword arguments: %s' %
', '.join(unmatched_kwargs))
+
+
+def iscoroutinefunction_partial(f):
+ while isinstance(f, partial):
+ f = f.func
+
+ # The asyncio version of iscoroutinefunction includes testing for @coroutine
+ # decorations vs. the inspect version which does not.
+ return iscoroutinefunction(f)