Source code for redbeat.schedulers
# Licensed under the Apache License, Version 2.0 (the 'License'); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
# Copyright 2015 Marc Sibson
import json
import ssl
import warnings
from datetime import MINYEAR, datetime
import redis.exceptions
from celery.app import app_or_default
from celery.beat import DEFAULT_MAX_INTERVAL, ScheduleEntry, Scheduler
from celery.signals import beat_init
from celery.utils.log import get_logger
from celery.utils.time import humanize_seconds
from kombu.utils.objects import cached_property
from kombu.utils.url import maybe_sanitize_url
from redis.client import StrictRedis
from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_exponential
from .decoder import RedBeatJSONDecoder, RedBeatJSONEncoder, to_timestamp
logger = get_logger('celery.beat')
# Copied from:
# https://github.com/andymccurdy/redis-py/blob/master/redis/lock.py#L33
# Changes:
# The second line from the bottom: The original Lua script intends
# to extend time to (lock remaining time + additional time); while
# the script here extend time to a expected expiration time.
# KEYS[1] - lock name
# ARGS[1] - token
# ARGS[2] - additional milliseconds
# return 1 if the locks time was extended, otherwise 0
LUA_EXTEND_TO_SCRIPT = """
local token = redis.call('get', KEYS[1])
if not token or token ~= ARGV[1] then
return 0
end
local expiration = redis.call('pttl', KEYS[1])
if not expiration then
expiration = 0
end
if expiration < 0 then
return 0
end
redis.call('pexpire', KEYS[1], ARGV[2])
return 1
"""
class RetryingConnection:
"""A proxy for the Redis connection that delegates all the calls to
underlying Redis connection while retrying on connection or time-out error.
"""
RETRY_MAX_WAIT = 30
def __init__(self, retry_period, wrapped_connection):
self.wrapped_connection = wrapped_connection
self.retry_kwargs = dict(
retry=(
retry_if_exception_type(redis.exceptions.ConnectionError)
| retry_if_exception_type(redis.exceptions.TimeoutError)
),
reraise=True,
wait=wait_exponential(multiplier=1, max=self.RETRY_MAX_WAIT),
before_sleep=self._log_retry_attempt,
)
if retry_period >= 0:
self.retry_kwargs.update(dict(stop=stop_after_delay(retry_period)))
def __getattr__(self, item):
method = getattr(self.wrapped_connection, item)
# we don't want to deal attributes or properties
if not callable(method):
return method
@retry(**self.retry_kwargs)
def retrier(*args, **kwargs):
return method(*args, **kwargs)
return retrier
@staticmethod
def _log_retry_attempt(retry_state):
"""Log when next reconnection attempt is about to be made."""
logger.warning(
'beat: Retrying Redis connection in %s seconds...', retry_state.next_action.sleep
)
[docs]
def ensure_conf(app):
"""
Ensure for the given app the the redbeat_conf
attribute is set to an instance of the RedBeatConfig
class.
"""
name = 'redbeat_conf'
app = app_or_default(app)
try:
config = getattr(app, name)
except AttributeError:
config = RedBeatConfig(app)
setattr(app, name, config)
return config
def get_redis(app=None):
app = app_or_default(app)
conf = ensure_conf(app)
if not hasattr(app, 'redbeat_redis') or app.redbeat_redis is None:
redis_options = conf.redbeat_redis_options
retry_period = redis_options.get('retry_period')
if redis_options.get('cluster', False):
from redis.cluster import RedisCluster
connection = RedisCluster.from_url(conf.redis_url, **redis_options)
elif conf.redis_url.startswith('redis-sentinel') and 'sentinels' in redis_options:
from redis.sentinel import Sentinel
connection_kwargs = {}
if isinstance(conf.redis_use_ssl, dict):
connection_kwargs['ssl'] = True
connection_kwargs.update(conf.redis_use_ssl)
sentinel = Sentinel(
redis_options['sentinels'],
socket_timeout=redis_options.get('socket_timeout'),
password=redis_options.get('password'),
db=redis_options.get('db', 0),
decode_responses=True,
sentinel_kwargs=redis_options.get('sentinel_kwargs'),
**connection_kwargs,
)
connection = sentinel.master_for(
redis_options.get('service_name', 'master'), db=redis_options.get('db', 0)
)
elif conf.redis_url.startswith('rediss'):
ssl_options = {'ssl_cert_reqs': ssl.CERT_REQUIRED}
if isinstance(conf.redis_use_ssl, dict):
ssl_options.update(conf.redis_use_ssl)
connection = StrictRedis.from_url(conf.redis_url, decode_responses=True, **ssl_options)
elif conf.redis_url.startswith('redis-cluster'):
from rediscluster import RedisCluster
if not redis_options.get('startup_nodes'):
redis_options = {'startup_nodes': [{"host": "localhost", "port": "30001"}]}
connection = RedisCluster(decode_responses=True, **redis_options)
else:
connection = StrictRedis.from_url(conf.redis_url, decode_responses=True)
if retry_period is None:
app.redbeat_redis = connection
else:
app.redbeat_redis = RetryingConnection(retry_period, connection)
return app.redbeat_redis
ADD_ENTRY_ERROR = """\
Couldn't add entry %r to redis schedule: %r. Contents: %r
"""
class RedBeatConfig:
def __init__(self, app=None):
self.app = app_or_default(app)
self.key_prefix = self.either_or('redbeat_key_prefix', 'redbeat:')
self.schedule_key = self.key_prefix + ':schedule'
self.statics_key = self.key_prefix + ':statics'
self.lock_key = self.either_or('redbeat_lock_key', self.key_prefix + ':lock')
self.lock_timeout = self.either_or('redbeat_lock_timeout', None)
self.redis_url = self.either_or('redbeat_redis_url', app.conf['BROKER_URL'])
self.redis_use_ssl = self.either_or('redbeat_redis_use_ssl', app.conf['BROKER_USE_SSL'])
self.redbeat_redis_options = self.either_or(
'redbeat_redis_options', app.conf['BROKER_TRANSPORT_OPTIONS']
)
@property
def schedule(self):
return self.app.conf.beat_schedule
@schedule.setter
def schedule(self, value):
self.app.conf.beat_schedule = value
def either_or(self, name, default=None):
if name == name.upper():
warnings.warn(
'Celery v4 installed, but detected Celery v3 '
'configuration %s (use %s instead).' % (name, name.lower()),
UserWarning,
)
return self.app.conf.first(name, name.upper()) or default
[docs]
class RedBeatSchedulerEntry(ScheduleEntry):
_meta = None
[docs]
def __init__(
self,
name=None,
task=None,
schedule=None,
args=None,
kwargs=None,
enabled=True,
options=None,
**clsargs,
):
super().__init__(
name=str(name),
task=task,
schedule=schedule,
args=args,
kwargs=kwargs,
options=options,
**clsargs,
)
self.enabled = enabled
ensure_conf(self.app)
[docs]
@staticmethod
def load_definition(key, app=None, definition=None):
if definition is None:
definition = get_redis(app).hget(key, 'definition')
if not definition:
raise KeyError(key)
definition = RedBeatSchedulerEntry.decode_definition(definition)
return definition
[docs]
@staticmethod
def decode_definition(definition):
return json.loads(definition, cls=RedBeatJSONDecoder)
[docs]
@staticmethod
def load_meta(key, app=None):
return RedBeatSchedulerEntry.decode_meta(get_redis(app).hget(key, 'meta'))
[docs]
@staticmethod
def decode_meta(meta, app=None):
if not meta:
return {'last_run_at': None}
return json.loads(meta, cls=RedBeatJSONDecoder)
[docs]
@classmethod
def from_key(cls, key, app=None):
ensure_conf(app)
with get_redis(app).pipeline() as pipe:
pipe.hget(key, 'definition')
pipe.hget(key, 'meta')
definition, meta = pipe.execute()
if not definition:
raise KeyError(key)
definition = cls.decode_definition(definition)
meta = cls.decode_meta(meta)
definition.update(meta)
entry = cls(app=app, **definition)
# celery.ScheduleEntry sets last_run_at = utcnow(), which is confusing and wrong
entry.last_run_at = meta['last_run_at']
return entry
@property
def due_at(self):
# never run => due now
if self.last_run_at is None:
return self._default_now()
delta = self.schedule.remaining_estimate(self.last_run_at)
# if no delta, means no more events after the last_run_at.
if delta is None:
return None
# overdue => due now
if delta.total_seconds() < 0:
return self._default_now()
return self.last_run_at + delta
@property
def key(self):
return self.app.redbeat_conf.key_prefix + self.name
@property
def score(self):
"""return UTC based UNIX timestamp"""
if self.due_at is None:
# Scores < zero are ignored on each tick.
return -1
return to_timestamp(self.due_at)
@property
def rank(self):
return get_redis(self.app).zrank(self.app.redbeat_conf.schedule_key, self.key)
[docs]
def save(self):
definition = {
'name': self.name,
'task': self.task,
'args': self.args,
'kwargs': self.kwargs,
'options': self.options,
'schedule': self.schedule,
'enabled': self.enabled,
}
meta = {
'last_run_at': self.last_run_at,
}
with get_redis(self.app).pipeline() as pipe:
pipe.hset(self.key, 'definition', json.dumps(definition, cls=RedBeatJSONEncoder))
pipe.hsetnx(self.key, 'meta', json.dumps(meta, cls=RedBeatJSONEncoder))
pipe.zadd(self.app.redbeat_conf.schedule_key, {self.key: self.score})
pipe.execute()
return self
[docs]
def delete(self):
with get_redis(self.app).pipeline() as pipe:
pipe.zrem(self.app.redbeat_conf.schedule_key, self.key)
pipe.delete(self.key)
pipe.execute()
def _next_instance(self, last_run_at=None, only_update_last_run_at=False):
entry = super()._next_instance(last_run_at=last_run_at)
if only_update_last_run_at:
# rollback the update to total_run_count
entry.total_run_count = self.total_run_count
meta = {
'last_run_at': entry.last_run_at,
'total_run_count': entry.total_run_count,
}
with get_redis(self.app).pipeline() as pipe:
pipe.hset(self.key, 'meta', json.dumps(meta, cls=RedBeatJSONEncoder))
pipe.zadd(self.app.redbeat_conf.schedule_key, {entry.key: entry.score})
pipe.execute()
return entry
__next__ = next = _next_instance
[docs]
def reschedule(self, last_run_at=None):
self.last_run_at = last_run_at or self._default_now()
meta = {
'last_run_at': self.last_run_at,
}
with get_redis(self.app).pipeline() as pipe:
pipe.hset(self.key, 'meta', json.dumps(meta, cls=RedBeatJSONEncoder))
pipe.zadd(self.app.redbeat_conf.schedule_key, {self.key: self.score})
pipe.execute()
[docs]
def is_due(self):
if not self.enabled:
return False, 5.0 # 5 second delay for re-enable.
return self.schedule.is_due(
self.last_run_at or datetime(MINYEAR, 1, 2, tzinfo=self.schedule.tz)
)
class RedBeatScheduler(Scheduler):
# how often should we sync in schedule information
# from the backend redis database
Entry = RedBeatSchedulerEntry
lock = None
#: The default lock timeout in seconds.
lock_timeout = DEFAULT_MAX_INTERVAL * 5
def __init__(self, app, lock_key=None, lock_timeout=None, **kwargs):
ensure_conf(app) # set app.redbeat_conf
super(RedBeatScheduler, self).__init__(app, **kwargs)
self.lock_key = lock_key or app.redbeat_conf.lock_key
self.lock_timeout = (
lock_timeout
or app.redbeat_conf.lock_timeout
or self.max_interval * 5
or self.lock_timeout
)
def setup_schedule(self):
# cleanup old static schedule entries
client = get_redis(self.app)
previous = {key for key in client.smembers(self.app.redbeat_conf.statics_key)}
removed = previous.difference(self.app.redbeat_conf.schedule.keys())
for name in removed:
logger.debug("beat: Removing old static schedule entry '%s'.", name)
with client.pipeline() as pipe:
RedBeatSchedulerEntry(name, app=self.app).delete()
pipe.srem(self.app.redbeat_conf.statics_key, name)
pipe.execute()
# setup static schedule entries
self.install_default_entries(self.app.redbeat_conf.schedule)
if self.app.redbeat_conf.schedule:
self.update_from_dict(self.app.redbeat_conf.schedule)
# keep track of static schedule entries,
# so we notice when any are removed at next startup
client.sadd(self.app.redbeat_conf.statics_key, *self.app.redbeat_conf.schedule.keys())
def update_from_dict(self, dict_):
for name, entry in dict_.items():
try:
entry = self._maybe_entry(name, entry)
except Exception as exc:
logger.error(ADD_ENTRY_ERROR, name, exc, entry)
continue
entry.save() # store into redis
logger.debug("beat: Stored entry: %s", entry)
def reserve(self, entry):
new_entry = next(entry)
return new_entry
@property
def schedule(self):
logger.debug('beat: Selecting tasks')
max_due_at = to_timestamp(self.app.now())
client = get_redis(self.app)
with client.pipeline() as pipe:
pipe.zrangebyscore(self.app.redbeat_conf.schedule_key, 0, max_due_at)
# peek into the next tick to accuratly calculate sleep between ticks
pipe.zrangebyscore(
self.app.redbeat_conf.schedule_key,
'({}'.format(max_due_at),
max_due_at + self.max_interval,
start=0,
num=1,
)
due_tasks, maybe_due = pipe.execute()
logger.debug('beat: Loading %d tasks', len(due_tasks) + len(maybe_due))
d = {}
for key in due_tasks + maybe_due:
try:
entry = self.Entry.from_key(key, app=self.app)
except KeyError:
logger.warning('beat: Failed to load %s, removing', key)
client.zrem(self.app.redbeat_conf.schedule_key, key)
continue
d[entry.name] = entry
return d
def maybe_due(self, entry, **kwargs):
is_due, next_time_to_run = entry.is_due()
if is_due:
logger.info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
try:
result = self.apply_async(entry, **kwargs)
except Exception as exc:
logger.exception('Scheduler: Message Error: %s', exc)
else:
logger.debug('Scheduler: %s sent. id->%s', entry.task, result.id)
return next_time_to_run
def tick(self, min=min, **kwargs):
if self.lock_key:
logger.debug('beat: Extending lock...')
self.lock.extend(int(self.lock_timeout))
remaining_times = []
try:
for entry in self.schedule.values():
next_time_to_run = self.maybe_due(entry, **self._maybe_due_kwargs)
if next_time_to_run:
remaining_times.append(next_time_to_run)
except RuntimeError:
logger.debug('beat: RuntimeError', exc_info=True)
return min(remaining_times + [self.max_interval])
def close(self):
if self.lock:
logger.info('beat: Releasing lock')
if self.lock.owned():
self.lock.release()
self.lock = None
super().close()
@property
def info(self):
info = [' . redis -> {}'.format(maybe_sanitize_url(self.app.redbeat_conf.redis_url))]
if self.lock_key:
info.append(
' . lock -> `{}` {} ({}s)'.format(
self.lock_key, humanize_seconds(self.lock_timeout), self.lock_timeout
)
)
return '\n'.join(info)
@cached_property
def _maybe_due_kwargs(self):
"""handle rename of publisher to producer"""
return {'producer': self.producer}
@beat_init.connect
def acquire_distributed_beat_lock(sender=None, **kwargs):
"""
Attempt to acquire lock on startup
Celery will squash any exceptions raised here. If one is raised
scheduler.lock will be None while scheduler.lock_key is set
"""
scheduler = sender.scheduler
if not scheduler.lock_key:
return
logger.debug('beat: Acquiring lock...')
redis_client = get_redis(scheduler.app)
lock = redis_client.lock(
scheduler.lock_key,
timeout=scheduler.lock_timeout,
sleep=scheduler.max_interval,
)
# overwrite redis-py's extend script
# which will add additional timeout instead of extend to a new timeout
lock.lua_extend = redis_client.register_script(LUA_EXTEND_TO_SCRIPT)
lock.acquire()
logger.info('beat: Acquired lock')
scheduler.lock = lock
Last update:
Aug 11, 2024