diff options
Diffstat (limited to 'libs/apscheduler')
21 files changed, 166 insertions, 93 deletions
diff --git a/libs/apscheduler/events.py b/libs/apscheduler/events.py index 890763eb6..016da03c5 100644 --- a/libs/apscheduler/events.py +++ b/libs/apscheduler/events.py @@ -3,7 +3,7 @@ __all__ = ('EVENT_SCHEDULER_STARTED', 'EVENT_SCHEDULER_SHUTDOWN', 'EVENT_SCHEDUL 'EVENT_JOBSTORE_ADDED', 'EVENT_JOBSTORE_REMOVED', 'EVENT_ALL_JOBS_REMOVED', 'EVENT_JOB_ADDED', 'EVENT_JOB_REMOVED', 'EVENT_JOB_MODIFIED', 'EVENT_JOB_EXECUTED', 'EVENT_JOB_ERROR', 'EVENT_JOB_MISSED', 'EVENT_JOB_SUBMITTED', 'EVENT_JOB_MAX_INSTANCES', - 'SchedulerEvent', 'JobEvent', 'JobExecutionEvent') + 'SchedulerEvent', 'JobEvent', 'JobExecutionEvent', 'JobSubmissionEvent') EVENT_SCHEDULER_STARTED = EVENT_SCHEDULER_START = 2 ** 0 diff --git a/libs/apscheduler/executors/asyncio.py b/libs/apscheduler/executors/asyncio.py index 5139622d1..06fc7f968 100644 --- a/libs/apscheduler/executors/asyncio.py +++ b/libs/apscheduler/executors/asyncio.py @@ -3,12 +3,11 @@ from __future__ import absolute_import import sys from apscheduler.executors.base import BaseExecutor, run_job +from apscheduler.util import iscoroutinefunction_partial try: - from asyncio import iscoroutinefunction from apscheduler.executors.base_py3 import run_coroutine_job except ImportError: - from trollius import iscoroutinefunction run_coroutine_job = None @@ -46,7 +45,7 @@ class AsyncIOExecutor(BaseExecutor): else: self._run_job_success(job.id, events) - if iscoroutinefunction(job.func): + if iscoroutinefunction_partial(job.func): if run_coroutine_job is not None: coro = run_coroutine_job(job, job._jobstore_alias, run_times, self._logger.name) f = self._eventloop.create_task(coro) diff --git a/libs/apscheduler/executors/base_py3.py b/libs/apscheduler/executors/base_py3.py index 61abd8424..7111d2aec 100644 --- a/libs/apscheduler/executors/base_py3.py +++ b/libs/apscheduler/executors/base_py3.py @@ -1,5 +1,6 @@ import logging import sys +import traceback from datetime import datetime, timedelta from traceback import format_tb @@ -33,6 +34,7 @@ async def run_coroutine_job(job, jobstore_alias, run_times, logger_name): events.append(JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time, exception=exc, traceback=formatted_tb)) logger.exception('Job "%s" raised an exception', job) + traceback.clear_frames(tb) else: events.append(JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time, retval=retval)) diff --git a/libs/apscheduler/executors/pool.py b/libs/apscheduler/executors/pool.py index 2f4ef455c..c85896ec2 100644 --- a/libs/apscheduler/executors/pool.py +++ b/libs/apscheduler/executors/pool.py @@ -3,6 +3,11 @@ import concurrent.futures from apscheduler.executors.base import BaseExecutor, run_job +try: + from concurrent.futures.process import BrokenProcessPool +except ImportError: + BrokenProcessPool = None + class BasePoolExecutor(BaseExecutor): @abstractmethod @@ -19,7 +24,13 @@ class BasePoolExecutor(BaseExecutor): else: self._run_job_success(job.id, f.result()) - f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name) + try: + f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name) + except BrokenProcessPool: + self._logger.warning('Process pool is broken; replacing pool with a fresh instance') + self._pool = self._pool.__class__(self._pool._max_workers) + f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name) + f.add_done_callback(callback) def shutdown(self, wait=True): @@ -33,10 +44,13 @@ class ThreadPoolExecutor(BasePoolExecutor): Plugin alias: ``threadpool`` :param max_workers: the maximum number of spawned threads. + :param pool_kwargs: dict of keyword arguments to pass to the underlying + ThreadPoolExecutor constructor """ - def __init__(self, max_workers=10): - pool = concurrent.futures.ThreadPoolExecutor(int(max_workers)) + def __init__(self, max_workers=10, pool_kwargs=None): + pool_kwargs = pool_kwargs or {} + pool = concurrent.futures.ThreadPoolExecutor(int(max_workers), **pool_kwargs) super(ThreadPoolExecutor, self).__init__(pool) @@ -47,8 +61,11 @@ class ProcessPoolExecutor(BasePoolExecutor): Plugin alias: ``processpool`` :param max_workers: the maximum number of spawned processes. + :param pool_kwargs: dict of keyword arguments to pass to the underlying + ProcessPoolExecutor constructor """ - def __init__(self, max_workers=10): - pool = concurrent.futures.ProcessPoolExecutor(int(max_workers)) + def __init__(self, max_workers=10, pool_kwargs=None): + pool_kwargs = pool_kwargs or {} + pool = concurrent.futures.ProcessPoolExecutor(int(max_workers), **pool_kwargs) super(ProcessPoolExecutor, self).__init__(pool) diff --git a/libs/apscheduler/executors/tornado.py b/libs/apscheduler/executors/tornado.py index a4696ce79..3b97eec92 100644 --- a/libs/apscheduler/executors/tornado.py +++ b/libs/apscheduler/executors/tornado.py @@ -8,10 +8,10 @@ from tornado.gen import convert_yielded from apscheduler.executors.base import BaseExecutor, run_job try: - from inspect import iscoroutinefunction from apscheduler.executors.base_py3 import run_coroutine_job + from apscheduler.util import iscoroutinefunction_partial except ImportError: - def iscoroutinefunction(func): + def iscoroutinefunction_partial(func): return False @@ -44,7 +44,7 @@ class TornadoExecutor(BaseExecutor): else: self._run_job_success(job.id, events) - if iscoroutinefunction(job.func): + if iscoroutinefunction_partial(job.func): f = run_coroutine_job(job, job._jobstore_alias, run_times, self._logger.name) else: f = self.executor.submit(run_job, job, job._jobstore_alias, run_times, diff --git a/libs/apscheduler/job.py b/libs/apscheduler/job.py index 4e24bec7a..445d9a868 100644 --- a/libs/apscheduler/job.py +++ b/libs/apscheduler/job.py @@ -1,4 +1,3 @@ -from collections import Iterable, Mapping from inspect import ismethod, isclass from uuid import uuid4 @@ -9,6 +8,11 @@ from apscheduler.util import ( ref_to_obj, obj_to_ref, datetime_repr, repr_escape, get_callable_name, check_callable_args, convert_to_datetime) +try: + from collections.abc import Iterable, Mapping +except ImportError: + from collections import Iterable, Mapping + class Job(object): """ @@ -24,7 +28,7 @@ class Job(object): :var trigger: the trigger object that controls the schedule of this job :var str executor: the name of the executor that will run this job :var int misfire_grace_time: the time (in seconds) how much this job's execution is allowed to - be late + be late (``None`` means "allow the job to run no matter how late it is") :var int max_instances: the maximum number of concurrently executing instances allowed for this job :var datetime.datetime next_run_time: the next scheduled run time of this job @@ -36,7 +40,7 @@ class Job(object): __slots__ = ('_scheduler', '_jobstore_alias', 'id', 'trigger', 'executor', 'func', 'func_ref', 'args', 'kwargs', 'name', 'misfire_grace_time', 'coalesce', 'max_instances', - 'next_run_time') + 'next_run_time', '__weakref__') def __init__(self, scheduler, id=None, **kwargs): super(Job, self).__init__() @@ -238,8 +242,9 @@ class Job(object): # Instance methods cannot survive serialization as-is, so store the "self" argument # explicitly - if ismethod(self.func) and not isclass(self.func.__self__): - args = (self.func.__self__,) + tuple(self.args) + func = self.func + if ismethod(func) and not isclass(func.__self__) and obj_to_ref(func) == self.func_ref: + args = (func.__self__,) + tuple(self.args) else: args = self.args diff --git a/libs/apscheduler/jobstores/mongodb.py b/libs/apscheduler/jobstores/mongodb.py index 7dbc3b127..ea3097ddc 100644 --- a/libs/apscheduler/jobstores/mongodb.py +++ b/libs/apscheduler/jobstores/mongodb.py @@ -54,7 +54,7 @@ class MongoDBJobStore(BaseJobStore): def start(self, scheduler, alias): super(MongoDBJobStore, self).start(scheduler, alias) - self.collection.ensure_index('next_run_time', sparse=True) + self.collection.create_index('next_run_time', sparse=True) @property def connection(self): @@ -83,7 +83,7 @@ class MongoDBJobStore(BaseJobStore): def add_job(self, job): try: - self.collection.insert({ + self.collection.insert_one({ '_id': job.id, 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), 'job_state': Binary(pickle.dumps(job.__getstate__(), self.pickle_protocol)) @@ -96,13 +96,13 @@ class MongoDBJobStore(BaseJobStore): 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), 'job_state': Binary(pickle.dumps(job.__getstate__(), self.pickle_protocol)) } - result = self.collection.update({'_id': job.id}, {'$set': changes}) - if result and result['n'] == 0: + result = self.collection.update_one({'_id': job.id}, {'$set': changes}) + if result and result.matched_count == 0: raise JobLookupError(job.id) def remove_job(self, job_id): - result = self.collection.remove(job_id) - if result and result['n'] == 0: + result = self.collection.delete_one({'_id': job_id}) + if result and result.deleted_count == 0: raise JobLookupError(job_id) def remove_all_jobs(self): diff --git a/libs/apscheduler/jobstores/redis.py b/libs/apscheduler/jobstores/redis.py index 61f913e9e..5bb69d635 100644 --- a/libs/apscheduler/jobstores/redis.py +++ b/libs/apscheduler/jobstores/redis.py @@ -14,7 +14,7 @@ except ImportError: # pragma: nocover import pickle try: - from redis import StrictRedis + from redis import Redis except ImportError: # pragma: nocover raise ImportError('RedisJobStore requires redis installed') @@ -47,7 +47,7 @@ class RedisJobStore(BaseJobStore): self.pickle_protocol = pickle_protocol self.jobs_key = jobs_key self.run_times_key = run_times_key - self.redis = StrictRedis(db=int(db), **connect_args) + self.redis = Redis(db=int(db), **connect_args) def lookup_job(self, job_id): job_state = self.redis.hget(self.jobs_key, job_id) @@ -81,7 +81,9 @@ class RedisJobStore(BaseJobStore): pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(), self.pickle_protocol)) if job.next_run_time: - pipe.zadd(self.run_times_key, datetime_to_utc_timestamp(job.next_run_time), job.id) + pipe.zadd(self.run_times_key, + {job.id: datetime_to_utc_timestamp(job.next_run_time)}) + pipe.execute() def update_job(self, job): @@ -92,9 +94,11 @@ class RedisJobStore(BaseJobStore): pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(), self.pickle_protocol)) if job.next_run_time: - pipe.zadd(self.run_times_key, datetime_to_utc_timestamp(job.next_run_time), job.id) + pipe.zadd(self.run_times_key, + {job.id: datetime_to_utc_timestamp(job.next_run_time)}) else: pipe.zrem(self.run_times_key, job.id) + pipe.execute() def remove_job(self, job_id): diff --git a/libs/apscheduler/jobstores/rethinkdb.py b/libs/apscheduler/jobstores/rethinkdb.py index 2185c6cc1..d8a78cde3 100644 --- a/libs/apscheduler/jobstores/rethinkdb.py +++ b/libs/apscheduler/jobstores/rethinkdb.py @@ -10,7 +10,7 @@ except ImportError: # pragma: nocover import pickle try: - import rethinkdb as r + from rethinkdb import RethinkDB except ImportError: # pragma: nocover raise ImportError('RethinkDBJobStore requires rethinkdb installed') @@ -40,10 +40,12 @@ class RethinkDBJobStore(BaseJobStore): raise ValueError('The "table" parameter must not be empty') self.database = database - self.table = table + self.table_name = table + self.table = None self.client = client self.pickle_protocol = pickle_protocol self.connect_args = connect_args + self.r = RethinkDB() self.conn = None def start(self, scheduler, alias): @@ -52,31 +54,31 @@ class RethinkDBJobStore(BaseJobStore): if self.client: self.conn = maybe_ref(self.client) else: - self.conn = r.connect(db=self.database, **self.connect_args) + self.conn = self.r.connect(db=self.database, **self.connect_args) - if self.database not in r.db_list().run(self.conn): - r.db_create(self.database).run(self.conn) + if self.database not in self.r.db_list().run(self.conn): + self.r.db_create(self.database).run(self.conn) - if self.table not in r.table_list().run(self.conn): - r.table_create(self.table).run(self.conn) + if self.table_name not in self.r.table_list().run(self.conn): + self.r.table_create(self.table_name).run(self.conn) - if 'next_run_time' not in r.table(self.table).index_list().run(self.conn): - r.table(self.table).index_create('next_run_time').run(self.conn) + if 'next_run_time' not in self.r.table(self.table_name).index_list().run(self.conn): + self.r.table(self.table_name).index_create('next_run_time').run(self.conn) - self.table = r.db(self.database).table(self.table) + self.table = self.r.db(self.database).table(self.table_name) def lookup_job(self, job_id): results = list(self.table.get_all(job_id).pluck('job_state').run(self.conn)) return self._reconstitute_job(results[0]['job_state']) if results else None def get_due_jobs(self, now): - return self._get_jobs(r.row['next_run_time'] <= datetime_to_utc_timestamp(now)) + return self._get_jobs(self.r.row['next_run_time'] <= datetime_to_utc_timestamp(now)) def get_next_run_time(self): results = list( self.table - .filter(r.row['next_run_time'] != None) # flake8: noqa - .order_by(r.asc('next_run_time')) + .filter(self.r.row['next_run_time'] != None) # noqa + .order_by(self.r.asc('next_run_time')) .map(lambda x: x['next_run_time']) .limit(1) .run(self.conn) @@ -92,7 +94,7 @@ class RethinkDBJobStore(BaseJobStore): job_dict = { 'id': job.id, 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), - 'job_state': r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol)) + 'job_state': self.r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol)) } results = self.table.insert(job_dict).run(self.conn) if results['errors'] > 0: @@ -101,7 +103,7 @@ class RethinkDBJobStore(BaseJobStore): def update_job(self, job): changes = { 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), - 'job_state': r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol)) + 'job_state': self.r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol)) } results = self.table.get_all(job.id).update(changes).run(self.conn) skipped = False in map(lambda x: results[x] == 0, results.keys()) @@ -130,20 +132,20 @@ class RethinkDBJobStore(BaseJobStore): def _get_jobs(self, predicate=None): jobs = [] failed_job_ids = [] - query = (self.table.filter(r.row['next_run_time'] != None).filter(predicate) if - predicate else self.table) + query = (self.table.filter(self.r.row['next_run_time'] != None).filter(predicate) # noqa + if predicate else self.table) query = query.order_by('next_run_time', 'id').pluck('id', 'job_state') for document in query.run(self.conn): try: jobs.append(self._reconstitute_job(document['job_state'])) - except: + except Exception: self._logger.exception('Unable to restore job "%s" -- removing it', document['id']) failed_job_ids.append(document['id']) # Remove all the jobs we failed to restore if failed_job_ids: - r.expr(failed_job_ids).for_each( + self.r.expr(failed_job_ids).for_each( lambda job_id: self.table.get_all(job_id).delete()).run(self.conn) return jobs diff --git a/libs/apscheduler/jobstores/sqlalchemy.py b/libs/apscheduler/jobstores/sqlalchemy.py index beb27fb56..dcfd3e565 100644 --- a/libs/apscheduler/jobstores/sqlalchemy.py +++ b/libs/apscheduler/jobstores/sqlalchemy.py @@ -11,7 +11,7 @@ except ImportError: # pragma: nocover try: from sqlalchemy import ( - create_engine, Table, Column, MetaData, Unicode, Float, LargeBinary, select) + create_engine, Table, Column, MetaData, Unicode, Float, LargeBinary, select, and_) from sqlalchemy.exc import IntegrityError from sqlalchemy.sql.expression import null except ImportError: # pragma: nocover @@ -106,7 +106,7 @@ class SQLAlchemyJobStore(BaseJobStore): }).where(self.jobs_t.c.id == job.id) result = self.engine.execute(update) if result.rowcount == 0: - raise JobLookupError(id) + raise JobLookupError(job.id) def remove_job(self, job_id): delete = self.jobs_t.delete().where(self.jobs_t.c.id == job_id) @@ -134,7 +134,7 @@ class SQLAlchemyJobStore(BaseJobStore): jobs = [] selectable = select([self.jobs_t.c.id, self.jobs_t.c.job_state]).\ order_by(self.jobs_t.c.next_run_time) - selectable = selectable.where(*conditions) if conditions else selectable + selectable = selectable.where(and_(*conditions)) if conditions else selectable failed_job_ids = set() for row in self.engine.execute(selectable): try: diff --git a/libs/apscheduler/jobstores/zookeeper.py b/libs/apscheduler/jobstores/zookeeper.py index 2cca83e8f..525306936 100644 --- a/libs/apscheduler/jobstores/zookeeper.py +++ b/libs/apscheduler/jobstores/zookeeper.py @@ -1,6 +1,5 @@ from __future__ import absolute_import -import os from datetime import datetime from pytz import utc @@ -65,7 +64,7 @@ class ZooKeeperJobStore(BaseJobStore): def lookup_job(self, job_id): self._ensure_paths() - node_path = os.path.join(self.path, job_id) + node_path = self.path + "/" + str(job_id) try: content, _ = self.client.get(node_path) doc = pickle.loads(content) @@ -92,7 +91,7 @@ class ZooKeeperJobStore(BaseJobStore): def add_job(self, job): self._ensure_paths() - node_path = os.path.join(self.path, str(job.id)) + node_path = self.path + "/" + str(job.id) value = { 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), 'job_state': job.__getstate__() @@ -105,7 +104,7 @@ class ZooKeeperJobStore(BaseJobStore): def update_job(self, job): self._ensure_paths() - node_path = os.path.join(self.path, str(job.id)) + node_path = self.path + "/" + str(job.id) changes = { 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), 'job_state': job.__getstate__() @@ -118,7 +117,7 @@ class ZooKeeperJobStore(BaseJobStore): def remove_job(self, job_id): self._ensure_paths() - node_path = os.path.join(self.path, str(job_id)) + node_path = self.path + "/" + str(job_id) try: self.client.delete(node_path) except NoNodeError: @@ -151,7 +150,7 @@ class ZooKeeperJobStore(BaseJobStore): all_ids = self.client.get_children(self.path) for node_name in all_ids: try: - node_path = os.path.join(self.path, node_name) + node_path = self.path + "/" + node_name content, _ = self.client.get(node_path) doc = pickle.loads(content) job_def = { diff --git a/libs/apscheduler/schedulers/asyncio.py b/libs/apscheduler/schedulers/asyncio.py index 289ef13fe..70ebedeb6 100644 --- a/libs/apscheduler/schedulers/asyncio.py +++ b/libs/apscheduler/schedulers/asyncio.py @@ -38,13 +38,19 @@ class AsyncIOScheduler(BaseScheduler): _eventloop = None _timeout = None + def start(self, paused=False): + if not self._eventloop: + self._eventloop = asyncio.get_event_loop() + + super(AsyncIOScheduler, self).start(paused) + @run_in_event_loop def shutdown(self, wait=True): super(AsyncIOScheduler, self).shutdown(wait) self._stop_timer() def _configure(self, config): - self._eventloop = maybe_ref(config.pop('event_loop', None)) or asyncio.get_event_loop() + self._eventloop = maybe_ref(config.pop('event_loop', None)) super(AsyncIOScheduler, self)._configure(config) def _start_timer(self, wait_seconds): diff --git a/libs/apscheduler/schedulers/background.py b/libs/apscheduler/schedulers/background.py index 03f29822a..bb8f77da2 100644 --- a/libs/apscheduler/schedulers/background.py +++ b/libs/apscheduler/schedulers/background.py @@ -29,7 +29,9 @@ class BackgroundScheduler(BlockingScheduler): super(BackgroundScheduler, self)._configure(config) def start(self, *args, **kwargs): - self._event = Event() + if self._event is None or self._event.is_set(): + self._event = Event() + BaseScheduler.start(self, *args, **kwargs) self._thread = Thread(target=self._main_loop, name='APScheduler') self._thread.daemon = self._daemon diff --git a/libs/apscheduler/schedulers/base.py b/libs/apscheduler/schedulers/base.py index 8f910a653..3dfb74377 100644 --- a/libs/apscheduler/schedulers/base.py +++ b/libs/apscheduler/schedulers/base.py @@ -1,7 +1,6 @@ from __future__ import print_function from abc import ABCMeta, abstractmethod -from collections import MutableMapping from threading import RLock from datetime import datetime, timedelta from logging import getLogger @@ -27,6 +26,11 @@ from apscheduler.events import ( EVENT_JOB_ADDED, EVENT_EXECUTOR_ADDED, EVENT_EXECUTOR_REMOVED, EVENT_ALL_JOBS_REMOVED, EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES, EVENT_SCHEDULER_RESUMED, EVENT_SCHEDULER_PAUSED) +try: + from collections.abc import MutableMapping +except ImportError: + from collections import MutableMapping + #: constant indicating a scheduler's stopped state STATE_STOPPED = 0 #: constant indicating a scheduler's running state (started and processing jobs) @@ -82,6 +86,11 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): self.state = STATE_STOPPED self.configure(gconfig, **options) + def __getstate__(self): + raise TypeError("Schedulers cannot be serialized. Ensure that you are not passing a " + "scheduler instance as an argument to a job, or scheduling an instance " + "method where the instance contains a scheduler as an attribute.") + def configure(self, gconfig={}, prefix='apscheduler.', **options): """ Reconfigures the scheduler with the given options. @@ -398,7 +407,7 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): :param str|unicode id: explicit identifier for the job (for modifying it later) :param str|unicode name: textual description of the job :param int misfire_grace_time: seconds after the designated runtime that the job is still - allowed to be run + allowed to be run (or ``None`` to allow the job to run no matter how late it is) :param bool coalesce: run once instead of many times if the scheduler determines that the job should be run more than once in succession :param int max_instances: maximum number of concurrently running instances allowed for this @@ -594,14 +603,13 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): """ jobstore_alias = None with self._jobstores_lock: + # Check if the job is among the pending jobs if self.state == STATE_STOPPED: - # Check if the job is among the pending jobs - if self.state == STATE_STOPPED: - for i, (job, alias, replace_existing) in enumerate(self._pending_jobs): - if job.id == job_id and jobstore in (None, alias): - del self._pending_jobs[i] - jobstore_alias = alias - break + for i, (job, alias, replace_existing) in enumerate(self._pending_jobs): + if job.id == job_id and jobstore in (None, alias): + del self._pending_jobs[i] + jobstore_alias = alias + break else: # Otherwise, try to remove it from each store until it succeeds or we run out of # stores to check diff --git a/libs/apscheduler/schedulers/blocking.py b/libs/apscheduler/schedulers/blocking.py index e61715757..4ecc9f6f1 100644 --- a/libs/apscheduler/schedulers/blocking.py +++ b/libs/apscheduler/schedulers/blocking.py @@ -14,7 +14,9 @@ class BlockingScheduler(BaseScheduler): _event = None def start(self, *args, **kwargs): - self._event = Event() + if self._event is None or self._event.is_set(): + self._event = Event() + super(BlockingScheduler, self).start(*args, **kwargs) self._main_loop() diff --git a/libs/apscheduler/schedulers/qt.py b/libs/apscheduler/schedulers/qt.py index 6ee5d332a..dda77d796 100644 --- a/libs/apscheduler/schedulers/qt.py +++ b/libs/apscheduler/schedulers/qt.py @@ -9,9 +9,13 @@ except (ImportError, RuntimeError): # pragma: nocover from PyQt4.QtCore import QObject, QTimer except ImportError: try: - from PySide.QtCore import QObject, QTimer # flake8: noqa + from PySide2.QtCore import QObject, QTimer # noqa except ImportError: - raise ImportError('QtScheduler requires either PyQt5, PyQt4 or PySide installed') + try: + from PySide.QtCore import QObject, QTimer # noqa + except ImportError: + raise ImportError('QtScheduler requires either PyQt5, PyQt4, PySide2 ' + 'or PySide installed') class QtScheduler(BaseScheduler): @@ -26,7 +30,8 @@ class QtScheduler(BaseScheduler): def _start_timer(self, wait_seconds): self._stop_timer() if wait_seconds is not None: - self._timer = QTimer.singleShot(wait_seconds * 1000, self._process_jobs) + wait_time = min(wait_seconds * 1000, 2147483647) + self._timer = QTimer.singleShot(wait_time, self._process_jobs) def _stop_timer(self): if self._timer: diff --git a/libs/apscheduler/triggers/base.py b/libs/apscheduler/triggers/base.py index ce2526a88..55d010dba 100644 --- a/libs/apscheduler/triggers/base.py +++ b/libs/apscheduler/triggers/base.py @@ -22,27 +22,16 @@ class BaseTrigger(six.with_metaclass(ABCMeta)): def _apply_jitter(self, next_fire_time, jitter, now): """ - Randomize ``next_fire_time`` by adding or subtracting a random value (the jitter). If the - resulting datetime is in the past, returns the initial ``next_fire_time`` without jitter. - - ``next_fire_time - jitter <= result <= next_fire_time + jitter`` + Randomize ``next_fire_time`` by adding a random value (the jitter). :param datetime.datetime|None next_fire_time: next fire time without jitter applied. If ``None``, returns ``None``. - :param int|None jitter: maximum number of seconds to add or subtract to - ``next_fire_time``. If ``None`` or ``0``, returns ``next_fire_time`` + :param int|None jitter: maximum number of seconds to add to ``next_fire_time`` + (if ``None`` or ``0``, returns ``next_fire_time``) :param datetime.datetime now: current datetime :return datetime.datetime|None: next fire time with a jitter. """ if next_fire_time is None or not jitter: return next_fire_time - next_fire_time_with_jitter = next_fire_time + timedelta( - seconds=random.uniform(-jitter, jitter)) - - if next_fire_time_with_jitter < now: - # Next fire time with jitter is in the past. - # Ignore jitter to avoid false misfire. - return next_fire_time - - return next_fire_time_with_jitter + return next_fire_time + timedelta(seconds=random.uniform(0, jitter)) diff --git a/libs/apscheduler/triggers/combining.py b/libs/apscheduler/triggers/combining.py index 64f83011a..bb9000618 100644 --- a/libs/apscheduler/triggers/combining.py +++ b/libs/apscheduler/triggers/combining.py @@ -45,7 +45,7 @@ class AndTrigger(BaseCombiningTrigger): Trigger alias: ``and`` :param list triggers: triggers to combine - :param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most. + :param int|None jitter: delay the job execution by ``jitter`` seconds at most """ __slots__ = () @@ -73,7 +73,7 @@ class OrTrigger(BaseCombiningTrigger): Trigger alias: ``or`` :param list triggers: triggers to combine - :param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most. + :param int|None jitter: delay the job execution by ``jitter`` seconds at most .. note:: Triggers that depends on the previous fire time, such as the interval trigger, may seem to behave strangely since they are always passed the previous fire time produced by diff --git a/libs/apscheduler/triggers/cron/__init__.py b/libs/apscheduler/triggers/cron/__init__.py index ce675dd93..fec6e3b5c 100644 --- a/libs/apscheduler/triggers/cron/__init__.py +++ b/libs/apscheduler/triggers/cron/__init__.py @@ -16,7 +16,7 @@ class CronTrigger(BaseTrigger): :param int|str year: 4-digit year :param int|str month: month (1-12) - :param int|str day: day of the (1-31) + :param int|str day: day of month (1-31) :param int|str week: ISO week (1-53) :param int|str day_of_week: number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) :param int|str hour: hour (0-23) @@ -26,7 +26,7 @@ class CronTrigger(BaseTrigger): :param datetime|str end_date: latest possible date/time to trigger on (inclusive) :param datetime.tzinfo|str timezone: time zone to use for the date/time calculations (defaults to scheduler timezone) - :param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most. + :param int|None jitter: delay the job execution by ``jitter`` seconds at most .. note:: The first weekday is always **monday**. """ diff --git a/libs/apscheduler/triggers/interval.py b/libs/apscheduler/triggers/interval.py index 831ba3830..61094aa13 100644 --- a/libs/apscheduler/triggers/interval.py +++ b/libs/apscheduler/triggers/interval.py @@ -20,7 +20,7 @@ class IntervalTrigger(BaseTrigger): :param datetime|str start_date: starting point for the interval calculation :param datetime|str end_date: latest possible date/time to trigger on :param datetime.tzinfo|str timezone: time zone to use for the date/time calculations - :param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most. + :param int|None jitter: delay the job execution by ``jitter`` seconds at most """ __slots__ = 'timezone', 'start_date', 'end_date', 'interval', 'interval_length', 'jitter' diff --git a/libs/apscheduler/util.py b/libs/apscheduler/util.py index 3c48e550b..1e643bffa 100644 --- a/libs/apscheduler/util.py +++ b/libs/apscheduler/util.py @@ -5,8 +5,9 @@ from __future__ import division from datetime import date, datetime, time, timedelta, tzinfo from calendar import timegm from functools import partial -from inspect import isclass +from inspect import isclass, ismethod import re +import sys from pytz import timezone, utc, FixedOffset import six @@ -21,6 +22,15 @@ try: except ImportError: TIMEOUT_MAX = 4294967 # Maximum value accepted by Event.wait() on Windows +try: + from asyncio import iscoroutinefunction +except ImportError: + try: + from trollius import iscoroutinefunction + except ImportError: + def iscoroutinefunction(func): + return False + __all__ = ('asint', 'asbool', 'astimezone', 'convert_to_datetime', 'datetime_to_utc_timestamp', 'utc_timestamp_to_datetime', 'timedelta_seconds', 'datetime_ceil', 'get_callable_name', 'obj_to_ref', 'ref_to_obj', 'maybe_ref', 'repr_escape', 'check_callable_args', @@ -263,7 +273,18 @@ def obj_to_ref(obj): if '<locals>' in name: raise ValueError('Cannot create a reference to a nested function') - return '%s:%s' % (obj.__module__, name) + if ismethod(obj): + if hasattr(obj, 'im_self') and obj.im_self: + # bound method + module = obj.im_self.__module__ + elif hasattr(obj, 'im_class') and obj.im_class: + # unbound method + module = obj.im_class.__module__ + else: + module = obj.__module__ + else: + module = obj.__module__ + return '%s:%s' % (module, name) def ref_to_obj(ref): @@ -332,7 +353,10 @@ def check_callable_args(func, args, kwargs): has_varargs = has_var_kwargs = False try: - sig = signature(func) + if sys.version_info >= (3, 5): + sig = signature(func, follow_wrapped=False) + else: + sig = signature(func) except ValueError: # signature() doesn't work against every kind of callable return @@ -398,3 +422,12 @@ def check_callable_args(func, args, kwargs): raise ValueError( 'The target callable does not accept the following keyword arguments: %s' % ', '.join(unmatched_kwargs)) + + +def iscoroutinefunction_partial(f): + while isinstance(f, partial): + f = f.func + + # The asyncio version of iscoroutinefunction includes testing for @coroutine + # decorations vs. the inspect version which does not. + return iscoroutinefunction(f) |