Source code for schedium.scheduler

from __future__ import annotations

import logging
from datetime import datetime

from schedium.job import Job
from schedium.types.cancel_job import CancelJob

logger = logging.getLogger(__name__)


class JobDidNotRunType: ...


JobDidNotRun = JobDidNotRunType()
"""Sentinel value used by Scheduler.run_pending to indicate a job was not due"""


[docs] class Scheduler: """ Simple in-process scheduler. This scheduler is intentionally minimal: - Jobs run inline (no threads/processes by default). - User is responsible for calling :meth:`run_pending` periodically. - Deduplication is handled per-job: if you call :meth:`run_pending` multiple times within the same trigger "token" (e.g., the same minute bucket), the job runs only once. Notes ----- - :meth:`run_pending` returns a list aligned with :attr:`jobs`. For jobs that are not due, the entry is the sentinel :obj:`JobDidNotRun`. - Many triggers match only at specific boundaries (minute/hour/day). In production, call :meth:`run_pending` on a short interval (e.g., once per second) so you don't skip over a matching boundary. Examples -------- Run something every 5 minutes >>> from datetime import datetime >>> from schedium import JobDidNotRun, Every, Job, Scheduler >>> sched = Scheduler() >>> def tick(): ... print("tick") >>> sched.append(Job(tick, Every(unit="minute", interval=5))) >>> results = sched.run_pending(now=datetime(2026, 2, 4, 10, 1, 0)) >>> results[0] is JobDidNotRun True >>> results = sched.run_pending(now=datetime(2026, 2, 4, 10, 5, 0)) tick >>> results [None] Deduplication when called repeatedly at the same timestamp >>> results = sched.run_pending(now=datetime(2026, 2, 4, 10, 5, 0)) # same minute bucket >>> results[0] is JobDidNotRun True Combine triggers (weekday at 08:00) >>> from schedium import On >>> weekday_8am = ( ... Every(unit="day", interval=1) ... & On(unit="weekdays") ... & On(unit="hour_of_day", value=8) ... ) >>> sched = Scheduler() >>> def weekday_job(): ... print("weekday job") >>> sched.append(Job(weekday_job, weekday_8am)) >>> results = sched.run_pending(now=datetime(2026, 2, 2, 8, 0, 0)) # Monday weekday job >>> results [None] Inspect the next run time across all jobs >>> sched.time_of_next_run(after=datetime(2026, 2, 2, 8, 0, 1)) datetime.datetime(2026, 2, 3, 8, 0) """ def __init__(self): self.jobs: list[Job] = []
[docs] def append(self, job: Job) -> None: """ Append an already-constructed job. Parameters ---------- job : Job The job to append. The job's trigger is used to determine when it runs. """ self.jobs.append(job)
def __getitem__(self, item): return self.jobs[item]
[docs] def run_pending(self, now: datetime | None = None) -> list[object]: """ Run all jobs that are due at ``now``. Parameters ---------- now : datetime, optional If provided, uses this timestamp to evaluate triggers. If omitted, uses the current system time. Returns ------- list[object] The list of return values from each job. If a job is not due, its return value is :obj:`JobDidNotRun`. If a job runs and returns :class:`~schedium.types.cancel_job.CancelJob`, the job is removed from the scheduler. """ now_dt = now if now is not None else datetime.now() results: list[object] = [] # Iterate over a snapshot so results align with the jobs that were # present at the start of this call. for job in list(self.jobs): if not job.is_due(now_dt): results.append(JobDidNotRun) continue result = job.run(now_dt) results.append(result) if isinstance(result, CancelJob): try: self.jobs.remove(job) except ValueError: # Already removed by user code. pass logger.info( "Job %r cancelled itself (reason=%r)", job, result.reason, ) return results
[docs] def time_of_next_run( self, after: datetime | None = None, *, max_iterations: int = 100_000, ) -> datetime | None: """ Return the earliest next run time across all jobs. This asks each job's trigger for its next run time and returns the minimum. Triggers that cannot compute a next time are ignored. Parameters ---------- after : datetime, optional Lower bound (inclusive) for the computed next run time. If omitted, uses the current system time. max_iterations : int, default 100_000 Safety cap used by some triggers/combinators that scan forward. """ if after is None: after = datetime.now() next_runs: list[datetime] = [] for job in self.jobs: next_run = job.datetime_of_next_run(after, max_iterations=max_iterations) if next_run is not None: next_runs.append(next_run) return min(next_runs) if next_runs else None
def __repr__(self) -> str: return f"Scheduler(jobs={self.jobs!r})"