API reference: Utils
This section documents utility helpers provided by schedium.
cancel_job_on_failure
- schedium.utils.cancel_job_on_failure(*, cancel=True, catch=Exception, logger=None, log_message='job failed', cancel_reason=None)
Decorator that logs specified exceptions and optionally cancels the job.
If the wrapped callable raises an exception that matches catch:
- the exception is logged (with traceback),
- the exception is suppressed,
- if cancel=True, the wrapper returns CancelJob so schedium.scheduler.Scheduler.run_pending() removes the job.
If an exception does not match catch, it propagates.
- Parameters:
- cancel : bool, default True
If True, a caught exception results in returning CancelJob, and the scheduler removes the job. If False, the exception is logged and suppressed, but the wrapper returns None (the job remains scheduled).
- catch : type[BaseException] | Iterable[type[BaseException]], default Exception
Exception type (or tuple/iterable of types) to catch and handle. Exceptions not matching catch are not intercepted.
- logger : logging.Logger | None, default None
Logger to use. If None, uses logging.getLogger(func.__module__).
- log_message : str, default "job failed"
Message passed to logger.exception when a caught exception occurs.
- cancel_reason : str | None, default None
Optional cancellation reason stored in the returned CancelJob(reason=...). If omitted, the reason is derived from the exception as "<ExceptionType>: <message>".
- Returns:
- Callable[[Callable[P, object]], Callable[P, object]]
A decorator that wraps func.
On success: returns func(*args, **kwargs).
On caught exception: logs, suppresses, and returns either None or CancelJob(...) depending on cancel.
- Parameters:
cancel (bool)
catch (type[BaseException] | Iterable[type[BaseException]])
logger (Logger | None)
log_message (str)
cancel_reason (str | None)
- Return type:
Callable[[Callable[[~P], object]], Callable[[~P], object]]
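The documented behaviour can be illustrated with a minimal, self-contained sketch. This is not schedium's actual source: the names cancel_on_failure_sketch and the stand-in CancelJob dataclass are hypothetical, written only to mirror the parameters and return values described above.

```python
import functools
import logging
from dataclasses import dataclass
from typing import Callable, Iterable, Optional, Type, Union


@dataclass
class CancelJob:
    """Hypothetical stand-in for schedium's CancelJob; the real class may differ."""
    reason: Optional[str] = None


def cancel_on_failure_sketch(
    *,
    cancel: bool = True,
    catch: Union[Type[BaseException], Iterable[Type[BaseException]]] = Exception,
    logger: Optional[logging.Logger] = None,
    log_message: str = "job failed",
    cancel_reason: Optional[str] = None,
) -> Callable:
    # Normalise `catch` to a tuple so it can be used in an except clause.
    catch_types = (catch,) if isinstance(catch, type) else tuple(catch)

    def decorator(func):
        # Fall back to a logger named after the wrapped function's module.
        log = logger if logger is not None else logging.getLogger(func.__module__)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except catch_types as exc:
                # Log with traceback, then suppress the exception.
                log.exception(log_message)
                if not cancel:
                    return None
                reason = cancel_reason
                if reason is None:
                    # Derive "<ExceptionType>: <message>" from the exception.
                    reason = f"{type(exc).__name__}: {exc}"
                return CancelJob(reason=reason)

        return wrapper

    return decorator


@cancel_on_failure_sketch(catch=(ValueError,))
def cancelling_task():
    raise ValueError("boom")


@cancel_on_failure_sketch(catch=ValueError, cancel=False)
def suppressing_task():
    raise ValueError("boom")


@cancel_on_failure_sketch(catch=(ValueError,))
def unrelated_failure():
    raise KeyError("missing")
```

In this sketch, cancelling_task() returns a CancelJob whose reason is derived from the exception, suppressing_task() returns None, and unrelated_failure() lets the KeyError propagate because it does not match catch.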
Notes
This decorator is most useful for jobs executed via schedium.scheduler.Scheduler.run_pending().
If you suppress exceptions (either by setting cancel=False or by returning CancelJob), the job is treated as having completed for that trigger token, so it will not be retried within the same bucket.
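To make the difference between the two outcomes concrete, here is a toy run loop. It is an assumption-laden illustration, not schedium's scheduler: run_once and the stand-in CancelJob class are hypothetical, and the loop does not model trigger tokens or buckets. It only shows that a job returning a CancelJob is dropped, while a job returning None stays scheduled.

```python
class CancelJob:
    """Hypothetical stand-in sentinel; schedium's real CancelJob may differ."""

    def __init__(self, reason=None):
        self.reason = reason


def run_once(jobs):
    """Run every job once and return the jobs that remain scheduled."""
    remaining = []
    for job in jobs:
        result = job()
        if isinstance(result, CancelJob):
            # The job asked to be cancelled, so it is not kept.
            continue
        remaining.append(job)
    return remaining


def healthy():
    return "ok"


def suppressed_failure():
    # What a wrapper with cancel=False would return after a caught exception.
    return None


def cancelled_failure():
    # What a wrapper with cancel=True would return after a caught exception.
    return CancelJob(reason="ValueError: boom")


remaining = run_once([healthy, suppressed_failure, cancelled_failure])
```

After the call, remaining holds healthy and suppressed_failure; cancelled_failure has been removed.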
Examples
Cancel the job after a known fatal error
>>> import logging
>>> from schedium import Every, Job, Scheduler
>>> from schedium.utils import cancel_job_on_failure
>>> logger = logging.getLogger(__name__)
>>> @cancel_job_on_failure(catch=(ValueError,), cancel=True, logger=logger)
... def task():
...     raise ValueError("boom")
>>> sched = Scheduler()
>>> sched.append(Job(task, Every(unit="minute", interval=1)))