API reference: Utils

This section documents utility helpers provided by schedium.

cancel_job_on_failure

schedium.utils.cancel_job_on_failure(*, cancel=True, catch=<class 'Exception'>, logger=None, log_message='job failed', cancel_reason=None)[source][source]

Decorator that logs specified exceptions and optionally cancels the job.

If the wrapped callable raises an exception that matches catch:

If an exception does not match catch, it propagates.

Parameters:
cancelbool, default True

If True, a caught exception results in returning CancelJob. The scheduler will remove the job. If False, the exception is logged and suppressed but the wrapper returns None (job remains scheduled).

catchtype[BaseException] | Iterable[type[BaseException]], default Exception

Exception type (or tuple/iterable of types) to catch and handle. Exceptions not matching catch are not intercepted.

loggerlogging.Logger | None, default None

Logger to use. If None, uses logging.getLogger(func.__module__).

log_messagestr, default “job failed”

Message passed to logger.exception when a caught exception occurs.

cancel_reasonstr | None, default None

Optional cancellation reason stored in the returned CancelJob(reason=...). If omitted, the reason is derived from the exception as "<ExceptionType>: <message>".

Returns:
Callable[[Callable[P, object]], Callable[P, object]]

A decorator that wraps func.

  • On success: returns func(*args, **kwargs).

  • On caught exception: logs, suppresses, and returns either None or CancelJob(...) depending on cancel.

Parameters:
  • cancel (bool)

  • catch (type[BaseException] | Iterable[type[BaseException]])

  • logger (Logger | None)

  • log_message (str)

  • cancel_reason (str | None)

Return type:

Callable[[Callable[[~P], object]], Callable[[~P], object]]

Notes

  • This decorator is most useful for jobs executed via schedium.scheduler.Scheduler.run_pending().

  • If you suppress exceptions (either by setting cancel=False or by returning CancelJob), the job is treated as having completed for that trigger token, so it will not be retried within the same bucket.

Examples

Cancel the job after a known fatal error

>>> import logging
>>> from schedium import Every, Job, Scheduler
>>> from schedium.utils import cancel_job_on_failure
>>> logger = logging.getLogger(__name__)
>>> @cancel_job_on_failure(catch=(ValueError,), cancel=True, logger=logger)
... def task():
...     raise ValueError("boom")
>>> sched = Scheduler()
>>> sched.append(Job(task, Every(unit="minute", interval=1)))