Source code for schedium.job
"""
Job definition.
A :class:`~schedium.job.Job` couples:
- a *callable* (your work), and
- a *trigger* (when to run).
Jobs are intentionally small and stateful: each job tracks the last trigger
event it ran for so it can deduplicate repeated calls to
:meth:`schedium.scheduler.Scheduler.run_pending`.
Most users will construct jobs directly and register them via
:meth:`schedium.scheduler.Scheduler.append`.
"""
from __future__ import annotations
from collections.abc import Callable
from datetime import datetime
from schedium.triggers import BaseTrigger
from schedium.triggers.base import TriggerEvent
from schedium.utils.evaluate import evaluate
[docs]
class Job:
"""
A scheduled unit of work.
Parameters
----------
func : Callable[[], object]
A zero-argument callable. The return value is returned by
:meth:`run` and by :meth:`schedium.scheduler.Scheduler.run_pending`.
If you need to pass arguments, wrap them in a closure or `functools.partial`.
trigger : BaseTrigger
The trigger that decides when this job is due.
name : str, default None
Optional human-readable label used in `repr(job)`.
Notes
-----
Deduplication
A job runs at most once per trigger *event token*.
Internally, :func:`schedium.utils.evaluate.evaluate` turns a trigger
match into a :class:`schedium.triggers.base.TriggerEvent`. The
event's token typically represents a *time bucket* (minute/hour/day, etc.).
If the scheduler is called multiple times within the same bucket, the
job is considered not due after the first run.
State
Jobs are stateful: :attr:`last_event` is updated on successful
:meth:`run`. If you want the same callable to run under multiple
independent schedules, create multiple :class:`Job` instances.
Threading
Jobs do not run in separate threads/processes by default.
The scheduler runs them inline.
"""
def __init__(
self,
func: Callable[[], object],
trigger: BaseTrigger,
name: str | None = None,
):
self.func = func
self.trigger = trigger
self.name = name
self.last_event: TriggerEvent | None = None
[docs]
def is_due(self, now: datetime) -> bool:
"""
Return True if the job should run at ``now``.
A job is due when:
1) its trigger matches at ``now`` and yields an event token, and
2) that token is different from the last token the job already ran for.
This method does not mutate state.
Parameters
----------
now : datetime
Timestamp used to evaluate the trigger.
Returns
-------
bool
True if the job is due at ``now``.
"""
event = evaluate(self.trigger, now)
if event is None:
return False
return event != self.last_event
[docs]
def run(self, now: datetime) -> object:
"""
Run the job if due, update :attr:`last_event`, and return the result.
Parameters
----------
now : datetime
Timestamp used to evaluate the trigger.
Returns
-------
object
The callable's return value.
Raises
------
RuntimeError
If called when the job is not due, or if called again for the same
trigger token (double-run protection).
"""
event = evaluate(self.trigger, now)
if event is None:
raise RuntimeError("Job.run() called when job is not due")
if event == self.last_event:
raise RuntimeError("Job.run() called but job already ran for this token")
result = self.run_func()
self.last_event = event
return result
[docs]
def run_func(self) -> object:
"""
Run the job's callable without checking triggers or updating state.
Particularly useful to override in subclasses that want to customize
the run behavior while keeping the same trigger-based scheduling
logic.
Returns
-------
object
The callable's return value.
"""
return self.func()
[docs]
def datetime_of_next_run(
self,
after: datetime | None = None,
*,
max_iterations: int = 100_000,
) -> datetime | None:
"""
Return the next run datetime for this job.
This is derived from the trigger's next validity window.
Parameters
----------
after : datetime, optional
Lower bound (inclusive) for the computed next run time. If omitted,
uses the current system time.
max_iterations : int, default 100_000
Safety cap used by some triggers/combinators that scan forward.
Returns
-------
datetime | None
The earliest datetime at which this job is due, or :obj:`None` if the
trigger tree cannot produce a next window.
"""
if after is None:
after = datetime.now()
window = self.trigger.next_window(after, max_iterations=max_iterations)
return None if window is None else window.start
def __repr__(self) -> str:
return f"Job(name={self.name!r}, func={self.func!r}, trigger={self.trigger!r})"