API reference: Threading
This section documents the threading helpers in schedium.threading.
ThreadedJobsScheduler
- class schedium.threading.ThreadedJobsScheduler(scheduler=None, *, max_workers=None, thread_name_prefix='schedium-job', revert_last_event_on_failure=False)
Bases: object
Run due jobs concurrently using a thread pool.
In other words, each due job is executed on a worker thread.
This is a helper that wraps a regular Scheduler.
Key difference vs. schedium.scheduler.Scheduler.run_pending():
When a job is due, this wrapper claims the trigger token immediately (by updating job.last_event) before dispatching job.func to a worker thread. This prevents duplicate submissions when the scheduler loop runs again while a job is still executing.
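The claim-before-dispatch idea above can be illustrated with the standard library alone. This is a minimal sketch, not schedium's implementation: `ToyJob`, `dispatch_if_due`, and the integer `token` are hypothetical names invented for illustration, and the real wrapper may represent tokens and locking differently.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ToyJob:
    """Hypothetical stand-in for a job; not schedium's Job class."""
    func: Callable[[], object]
    last_event: Optional[int] = None  # token of the last claimed trigger


def dispatch_if_due(job: ToyJob, token: int, pool: ThreadPoolExecutor,
                    lock: threading.Lock) -> Optional[Future]:
    """Claim `token` for `job` first, then hand the work to the pool."""
    with lock:
        if job.last_event == token:
            return None  # already claimed for this token: do not resubmit
        job.last_event = token  # claim before the job starts running
    return pool.submit(job.func)


release = threading.Event()
job = ToyJob(func=lambda: release.wait(timeout=5) and "ok")
lock = threading.Lock()

with ThreadPoolExecutor(max_workers=2) as pool:
    first = dispatch_if_due(job, token=1, pool=pool, lock=lock)
    # The loop ticks again while the job is still blocked in the worker.
    second = dispatch_if_due(job, token=1, pool=pool, lock=lock)
    release.set()
    result = first.result(timeout=5)
```

Because the claim happens under the lock before `submit`, the second tick sees the token as taken and submits nothing, even though the first run has not finished.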
- Parameters:
- scheduler : schedium.scheduler.Scheduler, optional
An optional scheduler to wrap. If omitted, a new empty scheduler is used.
- max_workers : int | None, default None
Passed to concurrent.futures.ThreadPoolExecutor.
- thread_name_prefix : str, default “schedium-job”
Prefix used for worker thread names.
- revert_last_event_on_failure : bool, default False
If True, and a job raises an exception, job.last_event is reverted to its previous value so the job may be retried within the same token on a subsequent call.
Notes
- Thread safety
This wrapper guards access to scheduler.jobs and job.last_event. You should register/remove jobs either before starting threads, or by using the same instance and avoiding concurrent mutation from user threads.
- CancelJob handling
If a job returns CancelJob, the job is removed from the underlying scheduler.
Examples
Run due jobs in a worker pool while you keep control of the main loop
>>> import time
>>> from datetime import datetime
>>> from schedium import Every, Job, JobDidNotRun, Scheduler
>>> from schedium.threading import ThreadedJobsScheduler
>>> sched = Scheduler()
>>> sched.append(Job(lambda: "ok", Every(unit="second", interval=1)))
>>> threaded = ThreadedJobsScheduler(sched, max_workers=4)
>>> futures = threaded.run_pending(now=datetime(2026, 2, 12, 12, 0, 0), wait=False)
>>> [f.result(timeout=1) for f in futures if f is not JobDidNotRun]
['ok']
>>> threaded.shutdown()
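The revert_last_event_on_failure option described under Parameters can be pictured with a small standard-library sketch. It is an illustration under assumptions, not the library's code: `ToyJob`, `submit_with_revert`, and the integer token are hypothetical, and locking is omitted for brevity.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, List, Optional


class ToyJob:
    """Hypothetical stand-in for a job; not schedium's Job class."""

    def __init__(self, func: Callable[[], object]) -> None:
        self.func = func
        self.last_event: Optional[int] = None


def submit_with_revert(job: ToyJob, token: int, pool: ThreadPoolExecutor) -> Future:
    """Claim `token`, run the job, and undo the claim if the job raises."""
    previous = job.last_event
    job.last_event = token

    def run() -> object:
        try:
            return job.func()
        except Exception:
            job.last_event = previous  # give the token back for a retry
            raise

    return pool.submit(run)


attempts: List[int] = []


def flaky() -> str:
    """Fail on the first call, succeed afterwards."""
    attempts.append(1)
    if len(attempts) == 1:
        raise RuntimeError("first attempt fails")
    return "ok"


job = ToyJob(flaky)
with ThreadPoolExecutor(max_workers=1) as pool:
    failed = submit_with_revert(job, token=7, pool=pool)
    error = failed.exception(timeout=5)
    reverted_to = job.last_event  # claim was undone, token 7 is free again
    retried = submit_with_revert(job, token=7, pool=pool)
    value = retried.result(timeout=5)
```

Without the revert step, the failed run would leave the token claimed and the retry for the same token would be skipped.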
- shutdown(*, wait=True, cancel_futures=False)
Shut down the worker pool.
- Parameters:
- wait : bool, default True
If True, block until all running jobs complete. If False, return immediately and let running jobs complete in the background.
- cancel_futures : bool, default False
If True, attempt to cancel pending jobs that have not yet started. This does not affect already running jobs.
- Return type:
None
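The two flags mirror those of `concurrent.futures.Executor.shutdown` (where `cancel_futures` requires Python 3.9 or later). Assuming the wrapper forwards them to its pool, which this page does not state explicitly, the effect can be observed on a plain `ThreadPoolExecutor`:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def slow() -> str:
    """Signal that the worker is busy, then wait to be released."""
    started.set()
    release.wait(timeout=5)
    return "finished"


pool = ThreadPoolExecutor(max_workers=1)
running = pool.submit(slow)
started.wait(timeout=5)                      # the single worker is now busy
pending = pool.submit(lambda: "never runs")  # queued behind `slow`

# Return immediately and cancel work that has not started yet.
pool.shutdown(wait=False, cancel_futures=True)
pending_cancelled = pending.cancelled()

release.set()                                # let the running job finish
running_result = running.result(timeout=5)
```

The queued task is cancelled, while the task that was already running completes normally and its result stays available through its future.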
- run_pending(now: datetime | None = None, *, wait: Literal[False], timeout: float | None = None) -> list[JobDidNotRunType | Future[object]]
- run_pending(now: datetime | None = None, *, wait: Literal[True], timeout: float | None = None) -> list[object]
Run due jobs in worker threads.
- Parameters:
- now : datetime | None, optional
Timestamp used to evaluate triggers. Defaults to datetime.now().
- wait : bool, default False
If True, block and return concrete results (like Scheduler.run_pending). If False, return a Future for each due job.
- timeout : float | None, default None
Timeout (seconds) used when waiting for each job future.
- Returns:
- list[JobDidNotRunType | Future[object]]
A list aligned with the scheduler’s job snapshot:
Not due -> schedium.scheduler.JobDidNotRun
Due + wait=False -> concurrent.futures.Future
Due + wait=True -> the job’s return value
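Since the returned list mixes a sentinel with futures (or plain values), callers usually normalize it before use. The helper below is a hedged sketch: `NOT_RUN` is a hypothetical stand-in for schedium's JobDidNotRun sentinel, and `collect` is not part of the library.

```python
from concurrent.futures import Future, ThreadPoolExecutor

# Hypothetical sentinel standing in for schedium's JobDidNotRun.
NOT_RUN = object()


def collect(results: list, timeout: float = 1.0) -> list:
    """Resolve futures, keep positions, and map the sentinel to None."""
    out = []
    for item in results:
        if item is NOT_RUN:
            out.append(None)  # job was not due on this tick
        elif isinstance(item, Future):
            out.append(item.result(timeout=timeout))
        else:
            out.append(item)  # wait=True style: already a plain value
    return out


with ThreadPoolExecutor(max_workers=2) as pool:
    mixed = [NOT_RUN, pool.submit(lambda: "a"), NOT_RUN, pool.submit(lambda: "b")]
    resolved = collect(mixed)
```

Keeping one output slot per input slot preserves the alignment with the job snapshot that the return value promises.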
QueuedJobsScheduler
- class schedium.threading.QueuedJobsScheduler(scheduler=None, *, worker_count=1, queue_=None, thread_name_prefix='schedium-worker', revert_last_event_on_failure=False)
Bases: object
Run the scheduler in the main thread and dispatch due jobs via a queue to workers.
This helper is useful when you want the scheduler loop to remain in the main thread (or any single thread you control), while job execution is handled by one or more worker threads consuming a queue.
Compared to ThreadedJobsScheduler, this does not use concurrent.futures.ThreadPoolExecutor. Instead, it maintains:
- a producer step (your call to run_pending()) that enqueues due jobs, and
- worker threads that consume and execute the queued jobs.
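The producer/worker split described above can be sketched with `queue.Queue` and manually completed futures. This is an illustration of the general pattern only; `worker`, `enqueue`, and the `None` stop marker are hypothetical choices and say nothing about how schedium implements its workers.

```python
import queue
import threading
from concurrent.futures import Future
from typing import Callable, Optional, Tuple

WorkItem = Optional[Tuple[Callable[[], object], Future]]


def worker(q: "queue.Queue[WorkItem]") -> None:
    """Consume (func, future) pairs until a None stop marker arrives."""
    while True:
        item = q.get()
        if item is None:
            break
        func, future = item
        try:
            future.set_result(func())
        except Exception as exc:
            future.set_exception(exc)


def enqueue(q: "queue.Queue[WorkItem]", func: Callable[[], object]) -> Future:
    """Producer step: queue the callable and return a Future for its result."""
    future: Future = Future()
    q.put((func, future))
    return future


q: "queue.Queue[WorkItem]" = queue.Queue(maxsize=100)
workers = [threading.Thread(target=worker, args=(q,), name=f"toy-worker-{i}")
           for i in range(2)]
for t in workers:
    t.start()

futures = [enqueue(q, lambda: "ok"), enqueue(q, lambda: 1 + 1)]
values = [f.result(timeout=5) for f in futures]

for _ in workers:
    q.put(None)  # one stop marker per worker
for t in workers:
    t.join(timeout=5)
```

The producer never runs job code itself; it only enqueues and hands back futures, so the thread that makes scheduling decisions stays responsive.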
- Parameters:
- scheduler : schedium.scheduler.Scheduler, optional
Scheduler holding the jobs.
- worker_count : int, default 1
Number of worker threads consuming the job queue.
- queue_ : queue.Queue | None, default None
Queue to use as the job queue. If None, a new queue.Queue is created.
- thread_name_prefix : str, default “schedium-worker”
Prefix for worker thread names.
- revert_last_event_on_failure : bool, default False
If True and a job raises, job.last_event is reverted to its previous value so the job may be retried for the same token.
Examples
Keep scheduling decisions in your thread, but execute jobs on worker threads
>>> import queue
>>> from datetime import datetime
>>> from schedium import Every, Job, JobDidNotRun, Scheduler
>>> from schedium.threading import QueuedJobsScheduler
>>> sched = Scheduler()
>>> sched.append(Job(lambda: "ok", Every(unit="second", interval=1)))
>>> q = queue.Queue(maxsize=100)
>>> queued = QueuedJobsScheduler(sched, worker_count=2, queue_=q)
>>> futures = queued.run_pending(now=datetime(2026, 2, 12, 12, 0, 0))
>>> [f.result(timeout=1) for f in futures if f is not JobDidNotRun]
['ok']
>>> queued.stop_workers(join=True)
- stop_workers(*, join=True, timeout=None)
- Parameters:
join (bool)
timeout (float | None)
- Return type:
None
- run_pending(now=None, *, block=True, queue_timeout=None)
Enqueue due jobs and return Futures aligned with the job snapshot.
- Parameters:
- now : datetime | None, optional
Timestamp used to evaluate triggers. Defaults to datetime.now().
- block : bool, default True
If True, block if the queue is full until a free slot is available. If False, raise queue.Full if the queue is full.
- queue_timeout : float | None, default None
Timeout (seconds) used when blocking to enqueue. If None, block indefinitely.
- Return type:
list[JobDidNotRunType | Future[object]]
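The block and queue_timeout parameters read like the `block` and `timeout` arguments of `queue.Queue.put`. Assuming they are forwarded that way (an assumption, not something this page confirms), the standard-library behavior on a full queue looks like this:

```python
import queue

q: "queue.Queue[str]" = queue.Queue(maxsize=1)
q.put("job-1")  # fills the only slot

try:
    q.put("job-2", block=False)  # non-blocking: fails at once when full
    outcome = "enqueued"
except queue.Full:
    outcome = "full"

try:
    q.put("job-2", block=True, timeout=0.05)  # waits up to 50 ms, then gives up
    timed_outcome = "enqueued"
except queue.Full:
    timed_outcome = "full"
```

A bounded queue combined with a timeout gives the scheduling thread back-pressure without letting it stall indefinitely when workers fall behind.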
SchedulerThread
- class schedium.threading.SchedulerThread(scheduler, *, interval=1.0, name='schedium-scheduler', daemon=True, now_func=None)
Bases: object
Run the loop of an already-initialized scheduler in a dedicated thread.
- Parameters:
- scheduler : schedium.scheduler.Scheduler | schedium.utils.threading.ThreadedJobsScheduler
Scheduler-like object providing run_pending.
- interval : float, default 1.0
Sleep interval between calls to run_pending.
- name : str, default “schedium-scheduler”
Background thread name.
- daemon : bool, default True
Whether the scheduler thread is a daemon.
- now_func : Callable[[], datetime] | None, default None
If provided, used as the now=... argument passed to run_pending. Useful for deterministic testing.
Notes
If scheduler is a ThreadedJobsScheduler, the loop calls run_pending(wait=False) so jobs run on the worker pool while the scheduler thread keeps ticking.
Examples
Run the scheduler loop in a background thread
>>> import time
>>> from schedium import Every, Job, Scheduler
>>> from schedium.threading import SchedulerThread, ThreadedJobsScheduler
>>> sched = Scheduler()
>>> sched.append(Job(lambda: None, Every(unit="second", interval=1)))
>>> threaded = ThreadedJobsScheduler(sched, max_workers=2)
>>> runner = SchedulerThread(threaded, interval=0.1)
>>> runner.start()
>>> time.sleep(0.2)
>>> runner.stop(); runner.join()
>>> threaded.shutdown()
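To show how interval and now_func interact in a ticking loop, here is a minimal standard-library sketch. `ToyLoopThread` and its `tick` callback are hypothetical and only mimic the start/stop/join surface used in the example above; the real class may be structured differently.

```python
import threading
from datetime import datetime
from typing import Callable, List, Optional


class ToyLoopThread:
    """Hypothetical sketch of a ticking loop; not schedium's SchedulerThread."""

    def __init__(self, tick: Callable[[datetime], None], *, interval: float = 1.0,
                 now_func: Optional[Callable[[], datetime]] = None) -> None:
        self._tick = tick
        self._interval = interval
        self._now_func = now_func or datetime.now
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        while not self._stop.is_set():
            self._tick(self._now_func())
            self._stop.wait(self._interval)  # sleeps, but wakes early on stop()

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()

    def join(self, timeout: Optional[float] = None) -> None:
        self._thread.join(timeout)


seen: List[datetime] = []
first_tick = threading.Event()
fixed_now = datetime(2026, 2, 12, 12, 0, 0)


def record(now: datetime) -> None:
    """Stand-in for run_pending(now=...): remember the timestamp we were given."""
    seen.append(now)
    first_tick.set()


runner = ToyLoopThread(record, interval=0.01, now_func=lambda: fixed_now)
runner.start()
first_tick.wait(timeout=5)
runner.stop()
runner.join(timeout=5)
```

Injecting a fixed clock makes every tick see the same timestamp, which is what makes the loop deterministic in tests; waiting on an event instead of calling `time.sleep` lets `stop()` take effect without waiting out a full interval.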