API reference: Threading

This section documents the threading helpers in schedium.threading.

ThreadedJobsScheduler

class schedium.threading.ThreadedJobsScheduler(scheduler=None, *, max_workers=None, thread_name_prefix='schedium-job', revert_last_event_on_failure=False)

Bases: object

Run due jobs concurrently using a thread pool.

In other words, each due job is executed on a separate worker thread.

This is a helper that wraps a regular Scheduler.

Key difference vs. schedium.scheduler.Scheduler.run_pending():

  • When a job is due, this wrapper claims the trigger token immediately (by updating job.last_event) before dispatching job.func to a worker thread. This prevents duplicate submissions when the scheduler loop runs again while a job is still executing.
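The claim-then-dispatch order described above can be illustrated with a standard-library sketch. ToyJob, claim_and_submit and the integer token are hypothetical stand-ins rather than schedium's implementation; the only point is that the token is recorded under a lock before the callable reaches the pool.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ToyJob:
    """Hypothetical stand-in for a job; not schedium's Job class."""
    func: Callable[[], object]
    last_event: Optional[int] = None  # token of the last claimed trigger


_lock = threading.Lock()


def claim_and_submit(job: ToyJob, token: int,
                     pool: ThreadPoolExecutor) -> Optional[Future]:
    """Submit job.func at most once per token (assumed behaviour)."""
    with _lock:
        if job.last_event == token:
            return None  # already claimed for this token: do not resubmit
        job.last_event = token  # claim first ...
    return pool.submit(job.func)  # ... then dispatch


release = threading.Event()


def slow_job() -> str:
    release.wait(timeout=5)  # keep the job "running" until released
    return "done"


job = ToyJob(func=slow_job)

with ThreadPoolExecutor(max_workers=2) as pool:
    first = claim_and_submit(job, token=1, pool=pool)   # dispatched
    second = claim_and_submit(job, token=1, pool=pool)  # still running: skipped
    release.set()
    first_result = first.result(timeout=5)

print(second is None, first_result)
```

Because the claim happens before submission, a second pass of the loop sees the token as taken even though the first run has not finished yet.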

Parameters:
scheduler : schedium.scheduler.Scheduler, optional

An optional scheduler to wrap. If omitted, a new empty scheduler is used.

max_workers : int | None, default None

Passed to concurrent.futures.ThreadPoolExecutor.

thread_name_prefix : str, default 'schedium-job'

Prefix used for worker thread names.

revert_last_event_on_failure : bool, default False

If True, and a job raises an exception, job.last_event is reverted to its previous value so the job may be retried within the same token on a subsequent call.
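The effect of revert_last_event_on_failure can be sketched as follows. This is a minimal illustration under assumptions: ToyJob and run_claimed are hypothetical names, the token is a plain integer, and the real library may store and restore last_event differently.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ToyJob:
    """Hypothetical stand-in for a job; not schedium's Job class."""
    func: Callable[[], object]
    last_event: Optional[int] = None


def run_claimed(job: ToyJob, token: int, revert_on_failure: bool) -> object:
    """Claim `token`, run the job, and optionally undo the claim on error."""
    previous = job.last_event
    job.last_event = token  # claim before running
    try:
        return job.func()
    except Exception:
        if revert_on_failure:
            job.last_event = previous  # token becomes claimable again
        raise


def failing() -> None:
    raise RuntimeError("boom")


kept = ToyJob(func=failing, last_event=6)
reverted = ToyJob(func=failing, last_event=6)

for job, flag in ((kept, False), (reverted, True)):
    try:
        run_claimed(job, token=7, revert_on_failure=flag)
    except RuntimeError:
        pass  # the exception still propagates in both modes

print(kept.last_event, reverted.last_event)  # 7 6
```

With the flag off, the failed run still consumes token 7; with it on, the job is back at token 6 and can be picked up again.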


Notes

Thread safety

This wrapper guards its own access to scheduler.jobs and job.last_event. Register or remove jobs either before starting any threads or only through this wrapper instance, and avoid mutating the job list concurrently from your own threads.

CancelJob handling

If a job returns CancelJob, the job is removed from the underlying scheduler.
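The removal-on-sentinel pattern can be sketched without the library. ToyCancelJob and run_all are hypothetical; whether schedium compares against the CancelJob class or an instance is not stated here, so the identity check below is an assumption.

```python
from typing import Callable, List


class ToyCancelJob:
    """Hypothetical sentinel; schedium's own CancelJob is not imported here."""


def run_all(jobs: List[Callable[[], object]]) -> List[object]:
    """Run every job once and drop those that return the sentinel."""
    results = []
    for func in list(jobs):  # iterate over a snapshot so removal is safe
        result = func()
        results.append(result)
        if result is ToyCancelJob:
            jobs.remove(func)
    return results


def one_shot() -> object:
    return ToyCancelJob  # ask to be unscheduled after this run


def recurring() -> str:
    return "ok"


jobs = [one_shot, recurring]
first_pass = run_all(jobs)
remaining = [f.__name__ for f in jobs]
print(remaining)  # ['recurring']
```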

Examples

Run due jobs in a worker pool while you keep control of the main loop

>>> import time
>>> from datetime import datetime
>>> from schedium import Every, Job, JobDidNotRun, Scheduler
>>> from schedium.threading import ThreadedJobsScheduler
>>> sched = Scheduler()
>>> sched.append(Job(lambda: "ok", Every(unit="second", interval=1)))
>>> threaded = ThreadedJobsScheduler(sched, max_workers=4)
>>> futures = threaded.run_pending(now=datetime(2026, 2, 12, 12, 0, 0), wait=False)
>>> [f.result(timeout=1) for f in futures if f is not JobDidNotRun]
['ok']
>>> threaded.shutdown()

append(job)
Parameters:

job (Job)

Return type:

None

property jobs: list[Job]

Access the underlying job list.

shutdown(*, wait=True, cancel_futures=False)

Shut down the worker pool.

Parameters:
wait : bool, default True

If True, block until all running jobs complete. If False, return immediately and let running jobs complete in the background.

cancel_futures : bool, default False

If True, attempt to cancel pending jobs that have not yet started. This does not affect already running jobs.


Return type:

None
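The wait and cancel_futures parameters mirror those of concurrent.futures.Executor.shutdown (Python 3.9+). The sketch below uses a bare ThreadPoolExecutor to show the documented semantics; that the wrapper forwards both arguments unchanged is an assumption.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def slow() -> str:
    started.set()
    release.wait(timeout=5)
    return "finished"


pool = ThreadPoolExecutor(max_workers=1)
running = pool.submit(slow)                  # occupies the only worker
pending = pool.submit(lambda: "never runs")  # waits in the pool's queue

started.wait(timeout=5)  # make sure `slow` is actually running
pool.shutdown(wait=False, cancel_futures=True)  # returns immediately

release.set()
print(pending.cancelled())        # True: it had not started yet
print(running.result(timeout=5))  # finished: running jobs are not interrupted
```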

run_pending(now: datetime | None = None, *, wait: Literal[False], timeout: float | None = None) -> list[JobDidNotRunType | Future[object]]
run_pending(now: datetime | None = None, *, wait: Literal[True], timeout: float | None = None) -> list[object]

Run due jobs in worker threads.

Parameters:
now : datetime | None, optional

Timestamp used to evaluate triggers. Defaults to datetime.now().

wait : bool, default False

If True, block until the due jobs finish and return their concrete results (like Scheduler.run_pending). If False, return a Future for each due job.

timeout : float | None, default None

Timeout (seconds) used when waiting for each job future.

Returns:
list[JobDidNotRunType | Future[object]]

A list aligned with the scheduler’s job snapshot.

  • Not due -> schedium.scheduler.JobDidNotRun

  • Due + wait=False -> concurrent.futures.Future

  • Due + wait=True -> the job’s return value
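The alignment rule above (one slot per job, in job order) can be sketched with the standard library. NOT_RUN and dispatch are hypothetical names standing in for JobDidNotRun and the wrapper's internals, and the due flag is supplied by hand instead of being computed from a trigger.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, List, Tuple, Union

NOT_RUN = object()  # hypothetical sentinel standing in for JobDidNotRun


def dispatch(jobs: List[Tuple[bool, Callable[[], object]]],
             pool: ThreadPoolExecutor) -> List[Union[object, Future]]:
    """Return one slot per job: a Future if due, the sentinel otherwise."""
    return [pool.submit(func) if due else NOT_RUN for due, func in jobs]


jobs = [(True, lambda: "a"), (False, lambda: "b"), (True, lambda: "c")]

with ThreadPoolExecutor(max_workers=2) as pool:
    slots = dispatch(jobs, pool)  # comparable to wait=False
    # Resolving each Future in place is comparable to wait=True.
    results = [s if s is NOT_RUN else s.result(timeout=5) for s in slots]

print([r for r in results if r is not NOT_RUN])  # ['a', 'c']
print(results[1] is NOT_RUN)                     # True
```

Keeping the sentinel in the list means index i of the result always refers to job i of the snapshot, which is why callers filter with an identity check.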

QueuedJobsScheduler

class schedium.threading.QueuedJobsScheduler(scheduler=None, *, worker_count=1, queue_=None, thread_name_prefix='schedium-worker', revert_last_event_on_failure=False)

Bases: object

Run the scheduler in the main thread and dispatch due jobs to worker threads via a queue.

This helper is useful when you want the scheduler loop to remain in the main thread (or any single thread you control), while job execution is handled by one or more worker threads consuming a queue.

Compared to ThreadedJobsScheduler, this does not use concurrent.futures.ThreadPoolExecutor. Instead, it maintains:

  • a producer step (your call to run_pending()) that enqueues due jobs, and

  • worker threads that consume and execute the queued jobs.
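The producer and worker roles listed above can be sketched with queue.Queue and plain threads. Everything here (worker, enqueue, the None stop marker, the (func, future) item shape) is a hypothetical illustration of the pattern, not schedium's internals.

```python
import queue
import threading
from concurrent.futures import Future
from typing import Callable, Optional, Tuple

Item = Optional[Tuple[Callable[[], object], Future]]
jobs_queue: "queue.Queue[Item]" = queue.Queue()


def worker() -> None:
    """Consume (func, future) pairs until a None stop marker arrives."""
    while True:
        item = jobs_queue.get()
        if item is None:
            break
        func, future = item
        try:
            future.set_result(func())
        except Exception as exc:
            future.set_exception(exc)


def enqueue(func: Callable[[], object]) -> Future:
    """Producer step: hand the callable to the workers, return its Future."""
    future: Future = Future()  # created by hand for illustration only
    jobs_queue.put((func, future))
    return future


threads = [threading.Thread(target=worker, name=f"toy-worker-{i}") for i in range(2)]
for t in threads:
    t.start()

futures = [enqueue(lambda n=n: n * n) for n in range(4)]
values = [f.result(timeout=5) for f in futures]

for _ in threads:
    jobs_queue.put(None)  # one stop marker per worker
for t in threads:
    t.join(timeout=5)

print(values)  # [0, 1, 4, 9]
```

The producer never runs job code itself, so a slow job delays only the worker that picked it up, not the scheduling loop.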

Parameters:
scheduler : schedium.scheduler.Scheduler, optional

Scheduler holding the jobs.

worker_count : int, default 1

Number of worker threads consuming the job queue.

queue_ : queue.Queue | None, default None

Queue used to hold dispatched jobs. If None, a new queue.Queue is created.

thread_name_prefix : str, default 'schedium-worker'

Prefix for worker thread names.

revert_last_event_on_failure : bool, default False

If True and a job raises, job.last_event is reverted to its previous value so the job may be retried for the same token.


Examples

Keep scheduling decisions in your thread, but execute jobs on worker threads

>>> import queue
>>> from datetime import datetime
>>> from schedium import Every, Job, JobDidNotRun, Scheduler
>>> from schedium.threading import QueuedJobsScheduler
>>> sched = Scheduler()
>>> sched.append(Job(lambda: "ok", Every(unit="second", interval=1)))
>>> q = queue.Queue(maxsize=100)
>>> queued = QueuedJobsScheduler(sched, worker_count=2, queue_=q)
>>> futures = queued.run_pending(now=datetime(2026, 2, 12, 12, 0, 0))
>>> [f.result(timeout=1) for f in futures if f is not JobDidNotRun]
['ok']
>>> queued.stop_workers(join=True)

append(job)
Parameters:

job (Job)

Return type:

None

property jobs: list[Job]

start_workers()
Return type:

None

stop_workers(*, join=True, timeout=None)
Parameters:
  • join (bool)

  • timeout (float | None)

Return type:

None

run_pending(now=None, *, block=True, queue_timeout=None)

Enqueue due jobs and return Futures aligned with the job snapshot.

Parameters:
now : datetime | None, optional

Timestamp used to evaluate triggers. Defaults to datetime.now().

block : bool, default True

If True and the queue is full, block until a free slot is available. If False, raise queue.Full when the queue is full.

queue_timeout : float | None, default None

Timeout (seconds) used when blocking to enqueue. If None, block indefinitely.


Return type:

list[JobDidNotRunType | Future[object]]
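The block and queue_timeout parameters read like the block and timeout arguments of queue.Queue.put. The sketch below shows that standard-library behaviour on a bounded queue; that run_pending passes the values straight through, and that an expired queue_timeout surfaces as queue.Full, are assumptions.

```python
import queue

q: "queue.Queue[str]" = queue.Queue(maxsize=1)
q.put("first")  # the queue is now full

outcomes = []
try:
    q.put("second", block=False)  # full queue, no blocking: raises at once
except queue.Full:
    outcomes.append("non-blocking put raised queue.Full")

try:
    q.put("second", block=True, timeout=0.05)  # waits up to 50 ms, then raises
except queue.Full:
    outcomes.append("timed put raised queue.Full")

print(outcomes)
```

With block=True and no timeout, the same call would wait until a consumer frees a slot, so a bounded queue acts as back-pressure on the scheduling loop.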

SchedulerThread

class schedium.threading.SchedulerThread(scheduler, *, interval=1.0, name='schedium-scheduler', daemon=True, now_func=None)

Bases: object

Run an already-initialized scheduler loop in a dedicated thread.

Parameters:
scheduler : schedium.scheduler.Scheduler | schedium.threading.ThreadedJobsScheduler

Scheduler-like object providing run_pending.

interval : float, default 1.0

Sleep interval between calls to run_pending.

name : str, default 'schedium-scheduler'

Background thread name.

daemon : bool, default True

Whether the scheduler thread is a daemon.

now_func : Callable[[], datetime] | None, default None

If provided, its return value is passed as the now=... argument to run_pending. Useful for deterministic testing.

Notes

If scheduler is a ThreadedJobsScheduler, the loop calls run_pending(wait=False) so jobs run on the worker pool while the scheduler thread keeps ticking.
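A polling loop of this kind is commonly built on threading.Event, so that stop() can interrupt the sleep. The sketch below is a hypothetical ToyLoop under that assumption, not the SchedulerThread source; tick stands in for run_pending, and now_func is injected to make the timestamps deterministic.

```python
import threading
from datetime import datetime
from typing import Callable, List, Optional


class ToyLoop:
    """Hypothetical polling loop; not schedium's SchedulerThread."""

    def __init__(self, tick: Callable[[datetime], None], interval: float,
                 now_func: Optional[Callable[[], datetime]] = None) -> None:
        self._tick = tick
        self._interval = interval
        self._now_func = now_func or datetime.now
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, name="toy-loop", daemon=True)

    def _run(self) -> None:
        while not self._stop.is_set():
            self._tick(self._now_func())
            self._stop.wait(self._interval)  # sleeps, but wakes early on stop()

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()

    def join(self, timeout: Optional[float] = None) -> None:
        self._thread.join(timeout)


seen: List[datetime] = []
ticked = threading.Event()
fixed_now = datetime(2026, 2, 12, 12, 0, 0)


def tick(now: datetime) -> None:
    seen.append(now)
    ticked.set()


loop = ToyLoop(tick, interval=0.01, now_func=lambda: fixed_now)
loop.start()
ticked.wait(timeout=5)  # wait for at least one tick
loop.stop()
loop.join(timeout=5)

print(set(seen) == {fixed_now})  # True
```

Injecting the clock means a test can assert on exact timestamps instead of sleeping and hoping a real second boundary was crossed.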

Examples

Run the scheduler loop in a background thread

>>> import time
>>> from schedium import Every, Job, Scheduler
>>> from schedium.threading import SchedulerThread, ThreadedJobsScheduler
>>> sched = Scheduler()
>>> sched.append(Job(lambda: None, Every(unit="second", interval=1)))
>>> threaded = ThreadedJobsScheduler(sched, max_workers=2)
>>> runner = SchedulerThread(threaded, interval=0.1)
>>> runner.start()
>>> time.sleep(0.2)
>>> runner.stop(); runner.join()
>>> threaded.shutdown()

start()
Return type:

None

stop()
Return type:

None

join(timeout=None)
Parameters:

timeout (float | None)

Return type:

None

property is_alive: bool