Source code for schedium.triggers.every

from __future__ import annotations

import logging
from datetime import datetime, timedelta, timezone, tzinfo

from schedium.triggers.base import BaseTrigger
from schedium.types.granularity import (
    UNIT_TO_GRANULARITY_MAP,
    Granularity,
    GranularityUnit,
)
from schedium.types.time_window import TimeWindow
from schedium.utils.since_epoch import since_epoch
from schedium.utils.truncate_to_granularity import truncate

# Module-level logger; Every.__init__ uses it to warn when interval == 1.
logger = logging.getLogger(__name__)


def datetime_from_since_epoch(
    value: int, granularity: Granularity, tzinfo: tzinfo | None
) -> datetime:
    """
    Convert a bucket count since 1970-01-01 back into a datetime.

    Parameters
    ----------
    value : int
        Number of `granularity` buckets counted from ``1970-01-01T00:00:00``.
    granularity : Granularity
        Unit in which `value` is expressed.
    tzinfo : datetime.tzinfo or None
        Timezone attached to the epoch and to the returned datetime.
        ``None`` produces a naive datetime.

    Returns
    -------
    datetime
        The epoch advanced by `value` buckets. Day and week results are
        rebuilt from their calendar date (midnight); month and year results
        are the first day of the computed month / year.

    Raises
    ------
    ValueError
        If `granularity` is none of the units handled below.
    """
    origin = datetime(1970, 1, 1, tzinfo=tzinfo)

    # Sub-day units: a plain timedelta added to the epoch.
    shift = None
    if granularity == Granularity.MILLISECOND:
        shift = timedelta(milliseconds=value)
    elif granularity == Granularity.SECOND:
        shift = timedelta(seconds=value)
    elif granularity == Granularity.MINUTE:
        shift = timedelta(minutes=value)
    elif granularity == Granularity.HOUR:
        shift = timedelta(hours=value)
    if shift is not None:
        return origin + shift

    # Day-based units: advance by whole days, then rebuild from the date
    # components so the result carries only year/month/day.
    day_count = None
    if granularity == Granularity.DAY:
        day_count = value
    elif granularity == Granularity.WEEK:
        day_count = value * 7
    if day_count is not None:
        landed = origin + timedelta(days=day_count)
        return datetime(landed.year, landed.month, landed.day, tzinfo=tzinfo)

    # Calendar units have variable length, so they are computed arithmetically
    # on the year/month fields instead of via timedelta.
    if granularity == Granularity.MONTH:
        years_past, month_index = divmod(value, 12)
        return datetime(1970 + years_past, month_index + 1, 1, tzinfo=tzinfo)
    if granularity == Granularity.YEAR:
        return datetime(1970 + value, 1, 1, tzinfo=tzinfo)

    raise ValueError(f"Unsupported granularity for datetime conversion: {granularity}")


class Every(BaseTrigger):
    """
    Trigger that matches on a fixed cadence.

    `Every` defines *when time advances* for a schedule. When it matches, the
    scheduler may run the job (subject to deduplication and any additional
    constraints).

    Conceptually, `Every` divides time into buckets of a given granularity
    (minute/hour/day/etc.) starting at the Unix epoch, and matches when the
    bucket number satisfies::

        unit_since_epoch % interval == offset

    ``unit_since_epoch`` is the number of complete `unit` buckets since the
    Unix epoch. For example, for ``unit="minute"``, it counts the number of
    complete minutes since 1970-01-01T00:00:00Z.

    Parameters
    ----------
    unit : schedium.types.granularity.GranularityUnit
        The cadence unit. One of ``"year"``, ``"month"``, ``"week"``,
        ``"day"``, ``"hour"``, ``"minute"``, ``"second"``, ``"millisecond"``.
    interval : int
        The period in number of `unit` buckets. Must be ``> 0``.

        Example: ``interval=5`` with ``unit="minute"`` means every 5 minutes
        on epoch-aligned boundaries.
    offset : int, default 0
        Phase offset within the cycle. Must satisfy
        ``0 <= offset < interval`` (not checked when `auto_offset` is True,
        because the value is ignored in that case).

        Example: with ``Every(unit="minute", interval=5, offset=2)``, the
        trigger matches for minute buckets numbered 2, 7, 12, ... since epoch.
    auto_offset : bool, default False
        If True, ignores the provided `offset` and chooses an offset based on
        the current time (UTC) such that the trigger matches immediately and
        then repeats every `interval` buckets.

        This is useful when you want to start a job “now” but still maintain
        a stable cadence afterward.

    Raises
    ------
    ValueError
        If ``interval <= 0``, or if `auto_offset` is False and `offset` is
        outside ``[0, interval)``.

    Notes
    -----
    Alignment (important)
        `Every` is **epoch-aligned**, not “relative to job creation time”
        when `auto_offset` is False. For example,
        ``Every(unit="hour", interval=1)`` matches exactly on the hour.

        If you need a “run **once** sometime after X” semantics, consider
        :class:`~schedium.triggers.datetime.AtDateTime`.

    Deduplication
        schedium deduplicates at the job level using a trigger event token.
        For `Every`, the effective token bucket is the trigger's granularity
        (minute/hour/day...). Calling ``Scheduler.run_pending`` repeatedly
        within the same bucket will run the job only once.

    `interval == 1`
        When ``interval=1``, `Every` matches every bucket. The project logs a
        warning suggesting :class:`~schedium.triggers.sugar.tick.Tick`
        instead, which is often clearer for “always match, but dedup by
        granularity”.

    Every vs Tick
        Even though both ``Every(unit=..., interval=1)`` and ``Tick(...)``
        match for all ``now``, they differ in their notion of “next time”.
        For example, for ``after=10:00:30``:

        - ``Tick("minute").next_window(after)`` starts at ``10:00:30``.
        - ``Every("minute", interval=1).next_window(after)`` starts at the
          next epoch-aligned boundary (typically ``10:01:00``).

    Timezones
        `since_epoch(...)` uses the provided ``datetime``. If you use
        timezone-aware datetimes, keep them consistent across calls.

    Examples
    --------
    Every 5 minutes

    >>> from schedium import Every
    >>> Every(unit="minute", interval=5)
    Every(unit='minute', interval=5, offset=0)

    Every 5 minutes, but shifted by 2 minutes

    >>> from schedium import Every
    >>> Every(unit="minute", interval=5, offset=2)
    Every(unit='minute', interval=5, offset=2)

    Every 2 hours, at minute 30 (cadence + constraints)

    >>> from schedium import Every, On
    >>> trigger = Every(unit="hour", interval=2) & On(unit="minute_of_hour", value=30)

    Auto-offset: run immediately, then every 10 minutes

    >>> from schedium import Every
    >>> trigger = Every(unit="minute", interval=10, auto_offset=True)
    """

    def __init__(
        self,
        unit: GranularityUnit,
        interval: int,
        offset: int = 0,
        auto_offset: bool = False,
    ):
        if interval <= 0:
            raise ValueError("interval must be > 0")
        # With auto_offset=True the caller's offset is replaced below, so an
        # out-of-range value must not be able to reject the construction.
        if not auto_offset and not (0 <= offset < interval):
            raise ValueError("offset must satisfy 0 <= offset < interval")
        if interval == 1:
            logger.warning(
                "Prefer using Tick(unit=%r) instead of Every(unit=%r, interval=1)",
                unit,
                unit,
            )

        self.unit = unit
        self.granularity = UNIT_TO_GRANULARITY_MAP[unit]
        self.interval = interval
        self.offset = offset
        if auto_offset:
            # Pick the phase of the current UTC bucket so the trigger matches
            # immediately and then every `interval` buckets.
            self.offset = auto_offset_from_now(
                self.required_granularity(), self.interval
            )

    def required_granularity(self) -> Granularity:
        """Return the granularity derived from this trigger's `unit`."""
        return self.granularity

    def fallback_granularity(self) -> Granularity:
        """Return the same granularity as :meth:`required_granularity`."""
        return self.required_granularity()

    def matches(self, now: datetime) -> bool:
        """
        Tell whether the bucket containing `now` is on this cadence.

        Parameters
        ----------
        now : datetime
            Instant to test.

        Returns
        -------
        bool
            True when ``since_epoch(now, granularity) % interval == offset``.
        """
        granularity = self.required_granularity()
        return since_epoch(now, granularity) % self.interval == self.offset

    def next_window(
        self,
        after: datetime,
        *,
        max_iterations: int = 100_000,
    ) -> TimeWindow:
        """
        Compute the next matching bucket that starts at or after `after`.

        Parameters
        ----------
        after : datetime
            Lower bound for the start of the returned window. Its ``tzinfo``
            is reused for the returned datetimes.
        max_iterations : int, default 100_000
            Accepted for interface compatibility; this implementation computes
            the result arithmetically and does not use it.

        Returns
        -------
        TimeWindow
            Window starting at the matching bucket's start and ending one
            microsecond before the following bucket's start.
        """
        granularity = self.required_granularity()
        start = since_epoch(after, granularity)
        # If `after` lies strictly inside its bucket, that bucket has already
        # begun, so the search starts from the following one.
        if truncate(after, granularity) < after:
            start += 1

        # Distance (in buckets) from `start` to the next bucket whose number
        # is congruent to `offset` modulo `interval`; 0 if `start` qualifies.
        remainder = start % self.interval
        delta = (self.offset - remainder) % self.interval
        value = start + delta

        bucket_start = datetime_from_since_epoch(value, granularity, after.tzinfo)
        bucket_end = datetime_from_since_epoch(value + 1, granularity, after.tzinfo)
        # The window end is inclusive, hence one microsecond before the next
        # bucket begins.
        return TimeWindow(
            start=bucket_start, end=bucket_end - timedelta(microseconds=1)
        )

    def __repr__(self) -> str:
        """Return a constructor-like representation with the effective offset."""
        return (
            f"Every(unit={self.unit!r}, interval={self.interval}, offset={self.offset})"
        )
def auto_offset_from_now(granularity: Granularity, interval: int) -> int:
    """
    Compute the epoch-aligned offset that makes a cadence match right now.

    The current UTC time is converted to a bucket count at `granularity`, and
    its remainder modulo `interval` is returned. Using that remainder as the
    offset lets the first run fire immediately while later runs stay spaced
    by `interval` buckets.

    Parameters
    ----------
    granularity : schedium.types.granularity.Granularity
        Granularity of the time bucket.
    interval : int
        Interval in number of buckets.

    Returns
    -------
    int
        Offset in ``[0, interval)``.
    """
    current_bucket = since_epoch(datetime.now(timezone.utc), granularity)
    return current_bucket % interval