Source code for schedium.triggers.base

from __future__ import annotations

from collections.abc import Sequence
from dataclasses import dataclass
from datetime import datetime, timedelta

from schedium.exceptions import NextRunMaxIterationsReached
from schedium.types.granularity import Granularity
from schedium.types.time_window import TimeWindow
from schedium.utils.truncate_to_granularity import truncate
from schedium.utils.window import (
    ONE_MICROSECOND,
    window_intersection,
    window_union_if_overlapping,
)


def _add_months(dt: datetime, months: int) -> datetime:
    if months == 0:
        return dt
    month0 = (dt.year * 12 + (dt.month - 1)) + months
    year = month0 // 12
    month = (month0 % 12) + 1
    return dt.replace(year=year, month=month)


def _increment(dt: datetime, granularity: Granularity) -> datetime:
    if granularity == Granularity.EXACT:
        raise ValueError("Cannot increment EXACT granularity")
    if granularity == Granularity.MILLISECOND:
        return dt + timedelta(milliseconds=1)
    if granularity == Granularity.SECOND:
        return dt + timedelta(seconds=1)
    if granularity == Granularity.MINUTE:
        return dt + timedelta(minutes=1)
    if granularity == Granularity.HOUR:
        return dt + timedelta(hours=1)
    if granularity == Granularity.DAY:
        return dt + timedelta(days=1)
    if granularity == Granularity.WEEK:
        return dt + timedelta(days=7)
    if granularity == Granularity.MONTH:
        return _add_months(dt, 1)
    if granularity == Granularity.YEAR:
        return dt.replace(year=dt.year + 1)
    raise ValueError(f"Unsupported granularity: {granularity}")


def _effective_granularity(trigger: BaseTrigger) -> Granularity:
    g = trigger.required_granularity()
    if g is not None:
        return g
    g = trigger.fallback_granularity()
    if g is not None:
        return g
    return Granularity.SECOND


[docs] @dataclass class TriggerEvent: token: object
def _bucket_end_inclusive(bucket_start: datetime, granularity: Granularity) -> datetime: next_boundary = _increment(truncate(bucket_start, granularity), granularity) return next_boundary - ONE_MICROSECOND def _scan_next_match_start( trigger: BaseTrigger, after: datetime, *, granularity: Granularity, max_iterations: int, ) -> datetime | None: if max_iterations <= 0: raise ValueError("max_iterations must be > 0") if granularity == Granularity.EXACT: return None candidate = truncate(after, granularity) if candidate < after: candidate = _increment(candidate, granularity) iterations = 0 while True: if trigger.matches(candidate): return candidate if iterations >= max_iterations: break candidate = _increment(candidate, granularity) iterations += 1 raise NextRunMaxIterationsReached( max_iterations=max_iterations, trigger_repr=repr(trigger), )
[docs] class BaseTrigger: """ Base trigger node. Triggers are pure (no mutation). Deduplication is handled at job level via the returned `TriggerEvent.token`. """ def __and__(self, other: BaseTrigger) -> BaseCombinatorTrigger: left = self.triggers if isinstance(self, AndTrigger) else (self,) right = other.triggers if isinstance(other, AndTrigger) else (other,) return AndTrigger(triggers=[*left, *right]) def __or__(self, other: BaseTrigger) -> BaseCombinatorTrigger: left = self.triggers if isinstance(self, OrTrigger) else (self,) right = other.triggers if isinstance(other, OrTrigger) else (other,) return OrTrigger(triggers=[*left, *right])
[docs] def matches(self, now: datetime) -> bool: raise NotImplementedError
[docs] def required_granularity(self) -> Granularity | None: return None
[docs] def fallback_granularity(self) -> Granularity | None: return None
[docs] def next_window( self, after: datetime, *, max_iterations: int = 100_000, ) -> TimeWindow | None: """ Return the next validity window whose start is >= ``after``. For most constraint-style triggers, the default implementation: 1) finds the next matching time using a forward scan at an inferred granularity, then 2) returns a single-bucket window at that granularity. Parameters ---------- after : datetime Lower bound (inclusive) for the returned window start. max_iterations : int, default 100_000 Safety cap used by some triggers/combinators that scan forward. Returns ------- schedium.types.time_window.TimeWindow | None Next validity window, or :obj:`None` if no future window exists. Raises ------ ValueError If ``max_iterations <= 0``. schedium.exceptions.NextRunMaxIterationsReached If a forward scan exceeds ``max_iterations``. """ if max_iterations <= 0: raise ValueError("max_iterations must be > 0") granularity = _effective_granularity(self) if self.matches(after): if granularity == Granularity.EXACT: return TimeWindow(start=after, end=after) return TimeWindow( start=after, end=_bucket_end_inclusive(after, granularity) ) start = _scan_next_match_start( self, after, granularity=granularity, max_iterations=max_iterations, ) if start is None: return None return TimeWindow(start=start, end=_bucket_end_inclusive(start, granularity))
[docs] class BaseCombinatorTrigger(BaseTrigger): def __init__(self, triggers: Sequence[BaseTrigger]) -> None: self.triggers = triggers def __repr__(self) -> str: inner = ", ".join(repr(t) for t in self.triggers) return f"{type(self).__name__}({inner})"
[docs] def required_granularity(self) -> Granularity | None: gran: list[Granularity] = [] for t in self.triggers: g = t.required_granularity() if g is not None: gran.append(g) return min(gran) if gran else None
[docs] def fallback_granularity(self) -> Granularity | None: gran: list[Granularity] = [] for t in self.triggers: g = t.fallback_granularity() if g is not None: gran.append(g) return min(gran) if gran else None
[docs] class AndTrigger(BaseCombinatorTrigger): # numpydoc ignore=PR02 """ Logical AND (intersection) of multiple triggers. An :class:`~schedium.triggers.AndTrigger` matches a datetime only when **all** its child triggers match. In window-time terms, ``A & B`` behaves like the intersection of the next validity windows returned by ``A`` and ``B``. Parameters ---------- triggers : Sequence[BaseTrigger] Child triggers to combine with AND logic. Notes ----- - Prefer composing with ``&`` over instantiating :class:`~schedium.triggers.AndTrigger` directly. - ``&`` is associative in meaning. schedium also flattens nested AND nodes, so ``(A & B) & C`` becomes a single :class:`~schedium.triggers.AndTrigger`. Examples -------- Use ``&`` to combine existing triggers: >>> from schedium import Every, On >>> trigger1 = Every(unit="day", interval=1) >>> trigger2 = On(unit="hour_of_day", value=8) >>> trigger3 = On(unit="minute_of_hour", value=0) ``trigger1 & trigger2`` produces an explicit ``AndTrigger(trigger1, trigger2)``: >>> trigger1 & trigger2 AndTrigger(Every(unit='day', interval=1, offset=0), On(unit='hour_of_day', value=8)) Chaining ``&`` is flattened into a single AndTrigger node: >>> trigger1 & trigger2 & trigger3 AndTrigger(Every(unit='day', interval=1, offset=0), On(unit='hour_of_day', value=8), On(unit='minute_of_hour', value=0)) """
[docs] def matches(self, now: datetime) -> bool: return all(t.matches(now) for t in self.triggers)
[docs] def next_window( self, after: datetime, *, max_iterations: int = 100_000 ) -> TimeWindow | None: if max_iterations <= 0: raise ValueError("max_iterations must be > 0") candidate = after iterations = 0 while iterations < max_iterations: remaining = max_iterations - iterations windows: list[TimeWindow] = [] for t in self.triggers: w = t.next_window(candidate, max_iterations=remaining) if w is None: return None windows.append(w) # Intersect all child windows. intersection = windows[0] for w in windows[1:]: nxt = window_intersection(intersection, w) if nxt is None: intersection = None # type: ignore[assignment] break intersection = nxt if intersection is not None: return intersection # No overlap: advance just past the earliest window end. earliest_end: datetime | None = None for w in windows: if w.end is None: continue if earliest_end is None or w.end < earliest_end: earliest_end = w.end if earliest_end is None: # All windows are unbounded but still didn't overlap (shouldn't happen) return None candidate = earliest_end + ONE_MICROSECOND iterations += 1 raise NextRunMaxIterationsReached( max_iterations=max_iterations, trigger_repr=repr(self) )
[docs] class OrTrigger(BaseCombinatorTrigger): # numpydoc ignore=PR02 """ Logical OR (alternatives) of multiple triggers. An :class:`~schedium.triggers.OrTrigger` matches a datetime when **any** child trigger matches. For ``A | B``, :meth:`~schedium.triggers.base.BaseTrigger.next_window` returns the earliest next window among children. If multiple children yield overlapping windows at the earliest start, those windows are merged. Parameters ---------- triggers : Sequence[BaseTrigger] Child triggers to combine with OR logic. Notes ----- - Prefer composing with ``|`` over instantiating :class:`~schedium.triggers.OrTrigger` directly. - schedium flattens nested OR nodes, so ``(A | B) | C`` becomes a single :class:`~schedium.triggers.OrTrigger`. """
[docs] def matches(self, now: datetime) -> bool: return any(t.matches(now) for t in self.triggers)
[docs] def next_window( self, after: datetime, *, max_iterations: int = 100_000 ) -> TimeWindow | None: if max_iterations <= 0: raise ValueError("max_iterations must be > 0") best: TimeWindow | None = None windows: list[TimeWindow] = [] for t in self.triggers: w = t.next_window(after, max_iterations=max_iterations) if w is None: continue windows.append(w) if best is None or w.start < best.start: best = w if best is None: return None # If any other child window overlaps the earliest, merge them. merged = best for w in windows: if w is merged: continue u = window_union_if_overlapping(merged, w) if u is not None: merged = u if merged.end is None: break return merged