Skip to content

API Reference

docket

docket - A distributed background task system for Python functions.

docket focuses on scheduling future work as seamlessly and efficiently as immediate work.

Agenda

A collection of tasks to be scheduled together on a Docket.

The Agenda allows you to build up a collection of tasks with their arguments, then schedule them all at once using various timing strategies like scattering.

Example

agenda = Agenda()
agenda.add(process_item)(item1)
agenda.add(process_item)(item2)
agenda.add(send_email)(email)
await agenda.scatter(docket, over=timedelta(minutes=50))

Source code in src/docket/agenda.py
class Agenda:
    """A collection of tasks to be scheduled together on a Docket.

    The Agenda allows you to build up a collection of tasks with their arguments,
    then schedule them all at once using various timing strategies like scattering.

    Example:
        >>> agenda = Agenda()
        >>> agenda.add(process_item)(item1)
        >>> agenda.add(process_item)(item2)
        >>> agenda.add(send_email)(email)
        >>> await agenda.scatter(docket, over=timedelta(minutes=50))
    """

    def __init__(self) -> None:
        """Initialize an empty Agenda."""
        # Each entry is (task function or registered task name, positional args,
        # keyword args). Names are only resolved against a Docket in scatter().
        self._tasks: list[
            tuple[TaskFunction | str, tuple[Any, ...], dict[str, Any]]
        ] = []

    def __len__(self) -> int:
        """Return the number of tasks in the agenda."""
        return len(self._tasks)

    def __iter__(
        self,
    ) -> Iterator[tuple[TaskFunction | str, tuple[Any, ...], dict[str, Any]]]:
        """Iterate over tasks in the agenda."""
        return iter(self._tasks)

    @overload
    def add(
        self,
        function: Callable[P, Awaitable[R]],
    ) -> Callable[P, None]:
        """Add a task function to the agenda.

        Args:
            function: The task function to add.

        Returns:
            A callable that accepts the task arguments.
        """

    @overload
    def add(
        self,
        function: str,
    ) -> Callable[..., None]:
        """Add a task by name to the agenda.

        Args:
            function: The name of a registered task.

        Returns:
            A callable that accepts the task arguments.
        """

    def add(
        self,
        function: Callable[P, Awaitable[R]] | str,
    ) -> Callable[..., None]:
        """Add a task to the agenda.

        Args:
            function: The task function or name to add.

        Returns:
            A callable that accepts the task arguments and adds them to the agenda.
        """

        # Returning a closure defers argument capture until it is invoked,
        # enabling the `agenda.add(fn)(arg1, arg2)` convention from the
        # class docstring.
        def scheduler(*args: Any, **kwargs: Any) -> None:
            self._tasks.append((function, args, kwargs))

        return scheduler

    def clear(self) -> None:
        """Clear all tasks from the agenda."""
        self._tasks.clear()

    async def scatter(
        self,
        docket: Docket,
        over: timedelta,
        start: datetime | None = None,
        jitter: timedelta | None = None,
    ) -> list[Execution]:
        """Scatter the tasks in this agenda over a time period.

        Tasks are distributed evenly across the specified time window,
        optionally with random jitter to prevent thundering herd effects.

        If an error occurs during scheduling, some tasks may have already been
        scheduled successfully before the failure occurred.

        Args:
            docket: The Docket to schedule tasks on.
            over: Time period to scatter tasks over (required).
            start: When to start scattering from. Defaults to now.
            jitter: Maximum random offset to add/subtract from each scheduled time.

        Returns:
            List of Execution objects for the scheduled tasks.

        Raises:
            KeyError: If any task name is not registered with the docket.
            ValueError: If any task is stricken or 'over' is not positive.
        """
        if over.total_seconds() <= 0:
            raise ValueError("'over' parameter must be a positive duration")

        if not self._tasks:
            return []

        if start is None:
            start = datetime.now(timezone.utc)

        # Calculate even distribution over the time period
        task_count = len(self._tasks)

        if task_count == 1:
            # Single task goes in the middle of the window
            schedule_times = [start + over / 2]
        else:
            # Distribute tasks evenly across the window
            # For n tasks, we want n points from start to start+over inclusive
            interval = over / (task_count - 1)
            schedule_times = [start + interval * i for i in range(task_count)]

        # Apply jitter if specified
        # NOTE(review): jittered times are clamped at `start` but not at
        # `start + over`, so a task can land past the end of the window —
        # confirm this is intended.
        if jitter:
            jittered_times: list[datetime] = []
            for schedule_time in schedule_times:
                # Random offset between -jitter and +jitter
                offset = timedelta(
                    seconds=random.uniform(
                        -jitter.total_seconds(), jitter.total_seconds()
                    )
                )
                # Ensure the jittered time doesn't go before start
                jittered_time = max(schedule_time + offset, start)
                jittered_times.append(jittered_time)
            schedule_times = jittered_times

        # Build all Execution objects first, validating as we go
        executions: list[Execution] = []
        for (task_func, args, kwargs), schedule_time in zip(
            self._tasks, schedule_times
        ):
            # Resolve task function if given by name
            if isinstance(task_func, str):
                if task_func not in docket.tasks:
                    raise KeyError(f"Task '{task_func}' is not registered")
                resolved_func = docket.tasks[task_func]
            else:
                # Ensure task is registered
                # (unknown functions are auto-registered rather than rejected)
                if task_func not in docket.tasks.values():
                    docket.register(task_func)
                resolved_func = task_func

            # Create execution with unique key
            # (a fresh uuid7 per task means repeated scatters never collide on keys)
            key = str(uuid7())
            execution = Execution(
                docket=docket,
                function=resolved_func,
                args=args,
                kwargs=kwargs,
                key=key,
                when=schedule_time,
                attempt=1,
            )
            executions.append(execution)

        # Schedule all tasks - if any fail, some tasks may have been scheduled
        # (scheduling is sequential and non-atomic, as noted in the docstring)
        for execution in executions:
            scheduler = docket.add(
                execution.function, when=execution.when, key=execution.key
            )
            # Actually schedule the task - if this fails, earlier tasks remain scheduled
            await scheduler(*execution.args, **execution.kwargs)

        return executions

__init__()

Initialize an empty Agenda.

Source code in src/docket/agenda.py
def __init__(self) -> None:
    """Create an Agenda with no tasks queued."""
    # Entries are (task function or registered name, positional args,
    # keyword args) triples.
    fresh: list[tuple[TaskFunction | str, tuple[Any, ...], dict[str, Any]]] = []
    self._tasks = fresh

__iter__()

Iterate over tasks in the agenda.

Source code in src/docket/agenda.py
def __iter__(
    self,
) -> Iterator[tuple[TaskFunction | str, tuple[Any, ...], dict[str, Any]]]:
    """Yield the queued (task, args, kwargs) triples in insertion order."""
    return self._tasks.__iter__()

__len__()

Return the number of tasks in the agenda.

Source code in src/docket/agenda.py
def __len__(self) -> int:
    """Return how many tasks are queued on the agenda."""
    count = len(self._tasks)
    return count

add(function)

add(
    function: Callable[P, Awaitable[R]],
) -> Callable[P, None]
add(function: str) -> Callable[..., None]

Add a task to the agenda.

Parameters:

Name Type Description Default
function Callable[P, Awaitable[R]] | str

The task function or name to add.

required

Returns:

Type Description
Callable[..., None]

A callable that accepts the task arguments and adds them to the agenda.

Source code in src/docket/agenda.py
def add(
    self,
    function: Callable[P, Awaitable[R]] | str,
) -> Callable[..., None]:
    """Queue a task (function or registered task name) on this agenda.

    Args:
        function: The task function, or the name of a registered task.

    Returns:
        A callable; invoking it with the task's arguments records the task
        and its arguments on the agenda.
    """

    def record(*args: Any, **kwargs: Any) -> None:
        # Capture the invocation's arguments alongside the task itself.
        entry = (function, args, kwargs)
        self._tasks.append(entry)

    return record

clear()

Clear all tasks from the agenda.

Source code in src/docket/agenda.py
def clear(self) -> None:
    """Remove every queued task, leaving the agenda empty."""
    # Delete in place so any external references see the emptied list.
    del self._tasks[:]

scatter(docket, over, start=None, jitter=None) async

Scatter the tasks in this agenda over a time period.

Tasks are distributed evenly across the specified time window, optionally with random jitter to prevent thundering herd effects.

If an error occurs during scheduling, some tasks may have already been scheduled successfully before the failure occurred.

Parameters:

Name Type Description Default
docket Docket

The Docket to schedule tasks on.

required
over timedelta

Time period to scatter tasks over (required).

required
start datetime | None

When to start scattering from. Defaults to now.

None
jitter timedelta | None

Maximum random offset to add/subtract from each scheduled time.

None

Returns:

Type Description
list[Execution]

List of Execution objects for the scheduled tasks.

Raises:

Type Description
KeyError

If any task name is not registered with the docket.

ValueError

If any task is stricken or 'over' is not positive.

Source code in src/docket/agenda.py
async def scatter(
    self,
    docket: Docket,
    over: timedelta,
    start: datetime | None = None,
    jitter: timedelta | None = None,
) -> list[Execution]:
    """Scatter the tasks in this agenda over a time period.

    Tasks are distributed evenly across the specified time window,
    optionally with random jitter to prevent thundering herd effects.

    If an error occurs during scheduling, some tasks may have already been
    scheduled successfully before the failure occurred.

    Args:
        docket: The Docket to schedule tasks on.
        over: Time period to scatter tasks over (required).
        start: When to start scattering from. Defaults to now.
        jitter: Maximum random offset to add/subtract from each scheduled time.

    Returns:
        List of Execution objects for the scheduled tasks.

    Raises:
        KeyError: If any task name is not registered with the docket.
        ValueError: If any task is stricken or 'over' is not positive.
    """
    if over.total_seconds() <= 0:
        raise ValueError("'over' parameter must be a positive duration")

    if not self._tasks:
        return []

    if start is None:
        start = datetime.now(timezone.utc)

    # Calculate even distribution over the time period
    task_count = len(self._tasks)

    if task_count == 1:
        # Single task goes in the middle of the window
        schedule_times = [start + over / 2]
    else:
        # Distribute tasks evenly across the window
        # For n tasks, we want n points from start to start+over inclusive
        interval = over / (task_count - 1)
        schedule_times = [start + interval * i for i in range(task_count)]

    # Apply jitter if specified
    # NOTE(review): jittered times are clamped at `start` but not at
    # `start + over`, so a task can land past the end of the window —
    # confirm this is intended.
    if jitter:
        jittered_times: list[datetime] = []
        for schedule_time in schedule_times:
            # Random offset between -jitter and +jitter
            offset = timedelta(
                seconds=random.uniform(
                    -jitter.total_seconds(), jitter.total_seconds()
                )
            )
            # Ensure the jittered time doesn't go before start
            jittered_time = max(schedule_time + offset, start)
            jittered_times.append(jittered_time)
        schedule_times = jittered_times

    # Build all Execution objects first, validating as we go
    executions: list[Execution] = []
    for (task_func, args, kwargs), schedule_time in zip(
        self._tasks, schedule_times
    ):
        # Resolve task function if given by name
        if isinstance(task_func, str):
            if task_func not in docket.tasks:
                raise KeyError(f"Task '{task_func}' is not registered")
            resolved_func = docket.tasks[task_func]
        else:
            # Ensure task is registered
            # (unknown functions are auto-registered rather than rejected)
            if task_func not in docket.tasks.values():
                docket.register(task_func)
            resolved_func = task_func

        # Create execution with unique key
        # (a fresh uuid7 per task means repeated scatters never collide on keys)
        key = str(uuid7())
        execution = Execution(
            docket=docket,
            function=resolved_func,
            args=args,
            kwargs=kwargs,
            key=key,
            when=schedule_time,
            attempt=1,
        )
        executions.append(execution)

    # Schedule all tasks - if any fail, some tasks may have been scheduled
    # (scheduling is sequential and non-atomic, as noted in the docstring)
    for execution in executions:
        scheduler = docket.add(
            execution.function, when=execution.when, key=execution.key
        )
        # Actually schedule the task - if this fails, earlier tasks remain scheduled
        await scheduler(*execution.args, **execution.kwargs)

    return executions

ConcurrencyLimit

Bases: Dependency

Configures concurrency limits for task execution.

Can limit concurrency globally for a task, or per specific argument value.

Example:

async def expensive_operation(
    concurrency: ConcurrencyLimit = ConcurrencyLimit(max_concurrent=3)
) -> None:
    # Only 3 instances of this task will run at a time
    ...

async def process_customer(
    customer_id: int,
    concurrency: ConcurrencyLimit = ConcurrencyLimit("customer_id", max_concurrent=1)
) -> None:
    # Only one task per customer_id will run at a time
    ...

async def backup_db(
    db_name: str,
    concurrency: ConcurrencyLimit = ConcurrencyLimit("db_name", max_concurrent=3)
) -> None:
    # Only 3 backup tasks per database name will run at a time
    ...
Source code in src/docket/dependencies/_concurrency.py
class ConcurrencyLimit(Dependency):
    """Configures concurrency limits for task execution.

    Can limit concurrency globally for a task, or per specific argument value.

    Example:

    ```python
    async def expensive_operation(
        concurrency: ConcurrencyLimit = ConcurrencyLimit(max_concurrent=3)
    ) -> None:
        # Only 3 instances of this task will run at a time
        ...

    async def process_customer(
        customer_id: int,
        concurrency: ConcurrencyLimit = ConcurrencyLimit("customer_id", max_concurrent=1)
    ) -> None:
        # Only one task per customer_id will run at a time
        ...

    async def backup_db(
        db_name: str,
        concurrency: ConcurrencyLimit = ConcurrencyLimit("db_name", max_concurrent=3)
    ) -> None:
        # Only 3 backup tasks per database name will run at a time
        ...
    ```
    """

    # NOTE(review): presumably consumed by the Dependency machinery (one
    # instance resolved per execution) — confirm against Dependency.single.
    single: bool = True

    def __init__(
        self,
        argument_name: str | None = None,
        max_concurrent: int = 1,
        scope: str | None = None,
    ) -> None:
        """
        Args:
            argument_name: The name of the task argument to use for concurrency grouping.
                If None, limits concurrency for the task function itself.
            max_concurrent: Maximum number of concurrent tasks
            scope: Optional scope prefix for Redis keys (defaults to docket name)
        """
        self.argument_name = argument_name
        self.max_concurrent = max_concurrent
        self.scope = scope
        # Per-execution runtime state: populated by __aenter__ on the cloned,
        # per-task instance rather than on this (shared) default instance.
        self._concurrency_key: str | None = None
        self._initialized: bool = False
        self._task_key: str | None = None
        self._renewal_task: asyncio.Task[None] | None = None

    async def __aenter__(self) -> ConcurrencyLimit:
        from ._functional import _Depends

        execution = self.execution.get()
        docket = self.docket.get()
        worker = self.worker.get()

        # Build concurrency key based on argument_name (if provided) or function name
        scope = self.scope or docket.name
        if self.argument_name is not None:
            # Per-argument concurrency: limit based on specific argument value
            try:
                argument_value = execution.get_argument(self.argument_name)
            except KeyError as e:
                raise ValueError(
                    f"ConcurrencyLimit argument '{self.argument_name}' not found in "
                    f"task arguments. Available: {list(execution.kwargs.keys())}"
                ) from e
            concurrency_key = (
                f"{scope}:concurrency:{self.argument_name}:{argument_value}"
            )
        else:
            # Per-task concurrency: limit based on task function name
            concurrency_key = f"{scope}:concurrency:{execution.function_name}"

        # Create a NEW instance for this specific task execution
        # This is critical because the original instance is shared across all tasks
        # (Python default arguments are evaluated once at function definition time)
        limit = ConcurrencyLimit(self.argument_name, self.max_concurrent, self.scope)
        limit._concurrency_key = concurrency_key
        limit._initialized = True
        limit._task_key = execution.key

        # Acquire slot
        async with docket.redis() as redis:
            acquired = await limit._acquire_slot(
                redis, execution.redelivered, worker.redelivery_timeout
            )
            if not acquired:  # pragma: no branch
                raise ConcurrencyBlocked(
                    execution, concurrency_key, self.max_concurrent
                )

        # Spawn background task for lease renewal
        limit._renewal_task = asyncio.create_task(
            limit._renew_lease_loop(worker.redelivery_timeout),
            name=f"{docket.name} - concurrency lease:{execution.key}",
        )

        # Register cleanup for this new instance with the AsyncExitStack
        # (The original instance's __aexit__ will also be called but does nothing)
        # Callbacks run LIFO: pushing release before cancel means the renewal
        # task is cancelled first, and only then is the slot released.
        stack = _Depends.stack.get()
        stack.push_async_callback(limit._release_slot)
        stack.push_async_callback(cancel_task, limit._renewal_task, CANCEL_MSG_CLEANUP)

        return limit

    async def __aexit__(
        self,
        _exc_type: type[BaseException] | None,
        _exc_value: BaseException | None,
        _traceback: type[BaseException] | None,
    ) -> None:
        # No-op: The original instance (used as default argument) has no state.
        # Actual cleanup is handled by _cleanup() on the per-task instance,
        # which is registered with the AsyncExitStack via push_async_callback.
        pass

    async def _acquire_slot(
        self, redis: Redis, is_redelivery: bool, redelivery_timeout: timedelta
    ) -> bool:
        """Atomically acquire a concurrency slot.

        Uses a Redis sorted set to track concurrency slots per task. Each entry
        is keyed by task_key with the timestamp as the score.

        When XAUTOCLAIM reclaims a message (because the original worker stopped
        renewing its lease), is_redelivery=True signals that slot takeover is safe.
        If the message is NOT a redelivery and a slot already exists, we block to
        prevent duplicate execution.

        Slots are refreshed during lease renewal every redelivery_timeout/4.
        If all slots are full, we scavenge any slot older than redelivery_timeout
        (meaning it hasn't been refreshed and the worker must be dead).
        """
        # Lua script for atomic concurrency slot management.
        # KEYS[1]: concurrency_key (sorted set tracking slots)
        # ARGV[1]: max_concurrent, ARGV[2]: task_key, ARGV[3]: current_time,
        # ARGV[4]: is_redelivery (0/1), ARGV[5]: stale_threshold, ARGV[6]: key_ttl
        acquire_script = redis.register_script(
            """
            local key = KEYS[1]
            local max_concurrent = tonumber(ARGV[1])
            local task_key = ARGV[2]
            local current_time = tonumber(ARGV[3])
            local is_redelivery = tonumber(ARGV[4])
            local stale_threshold = tonumber(ARGV[5])
            local key_ttl = tonumber(ARGV[6])

            -- Check if this task already has a slot (from a previous delivery attempt)
            local slot_time = redis.call('ZSCORE', key, task_key)
            if slot_time then
                slot_time = tonumber(slot_time)
                if is_redelivery == 1 and slot_time <= stale_threshold then
                    -- Redelivery AND slot is stale: original worker stopped renewing,
                    -- safe to take over the slot.
                    redis.call('ZADD', key, current_time, task_key)
                    redis.call('EXPIRE', key, key_ttl)
                    return 1
                else
                    -- Either not a redelivery, or slot is still fresh (original worker
                    -- is just slow, not dead). Don't take over.
                    return 0
                end
            end

            -- No existing slot for this task - check if we can acquire a new one
            if redis.call('ZCARD', key) < max_concurrent then
                redis.call('ZADD', key, current_time, task_key)
                redis.call('EXPIRE', key, key_ttl)
                return 1
            end

            -- All slots are full. Scavenge any stale slot (not refreshed recently).
            -- Slots are refreshed every redelivery_timeout/4, so anything older than
            -- redelivery_timeout hasn't been refreshed and the worker must be dead.
            local stale_slots = redis.call('ZRANGEBYSCORE', key, 0, stale_threshold, 'LIMIT', 0, 1)
            if #stale_slots > 0 then
                redis.call('ZREM', key, stale_slots[1])
                redis.call('ZADD', key, current_time, task_key)
                redis.call('EXPIRE', key, key_ttl)
                return 1
            end

            return 0
            """
        )

        current_time = datetime.now(timezone.utc).timestamp()
        stale_threshold = current_time - redelivery_timeout.total_seconds()
        # NOTE(review): redelivery_timeout is *multiplied* by LEASE_RENEWAL_FACTOR
        # here (key TTL) but *divided* by it in _renew_lease_loop (renewal
        # interval) — confirm both uses of the constant are intended.
        key_ttl = max(
            MINIMUM_TTL_SECONDS,
            int(redelivery_timeout.total_seconds() * LEASE_RENEWAL_FACTOR),
        )

        result = await acquire_script(
            keys=[self._concurrency_key],
            args=[
                self.max_concurrent,
                self._task_key,
                current_time,
                1 if is_redelivery else 0,
                stale_threshold,
                key_ttl,
            ],
        )

        return bool(result)

    async def _release_slot(self) -> None:
        """Release a concurrency slot when task completes."""
        # Note: only registered as callback for instances with valid keys
        assert self._concurrency_key and self._task_key

        docket = self.docket.get()
        async with docket.redis() as redis:
            # Remove this task from the sorted set and delete the key if empty
            # KEYS[1]: concurrency_key, ARGV[1]: task_key
            release_script = redis.register_script(
                """
                redis.call('ZREM', KEYS[1], ARGV[1])
                if redis.call('ZCARD', KEYS[1]) == 0 then
                    redis.call('DEL', KEYS[1])
                end
                """
            )
            await release_script(keys=[self._concurrency_key], args=[self._task_key])

    async def _renew_lease_loop(self, redelivery_timeout: timedelta) -> None:
        """Periodically refresh slot timestamp to prevent expiration."""
        docket = self.docket.get()
        renewal_interval = redelivery_timeout.total_seconds() / LEASE_RENEWAL_FACTOR
        key_ttl = max(
            MINIMUM_TTL_SECONDS,
            int(redelivery_timeout.total_seconds() * LEASE_RENEWAL_FACTOR),
        )

        # Runs until externally cancelled: __aenter__ registers cancel_task for
        # this renewal task on the dependency AsyncExitStack.
        while True:
            await asyncio.sleep(renewal_interval)
            try:
                async with docket.redis() as redis:
                    current_time = datetime.now(timezone.utc).timestamp()
                    await redis.zadd(
                        self._concurrency_key,
                        {self._task_key: current_time},  # type: ignore
                    )
                    await redis.expire(self._concurrency_key, key_ttl)  # type: ignore
            except Exception:  # pragma: no cover
                # Lease renewal is best-effort; if it fails, the slot will eventually
                # be scavenged as stale and the task can be redelivered
                logger.warning(
                    "Concurrency lease renewal failed for %s",
                    self._concurrency_key,
                    exc_info=True,
                )

    @property
    def concurrency_key(self) -> str:
        """Redis key used for tracking concurrency for this specific argument value.
        Raises RuntimeError if accessed before initialization."""
        if not self._initialized:
            raise RuntimeError(
                "ConcurrencyLimit not initialized - use within task context"
            )
        assert self._concurrency_key is not None
        return self._concurrency_key

concurrency_key property

Redis key used for tracking concurrency for this specific argument value. Raises RuntimeError if accessed before initialization.

__init__(argument_name=None, max_concurrent=1, scope=None)

Parameters:

Name Type Description Default
argument_name str | None

The name of the task argument to use for concurrency grouping. If None, limits concurrency for the task function itself.

None
max_concurrent int

Maximum number of concurrent tasks

1
scope str | None

Optional scope prefix for Redis keys (defaults to docket name)

None
Source code in src/docket/dependencies/_concurrency.py
def __init__(
    self,
    argument_name: str | None = None,
    max_concurrent: int = 1,
    scope: str | None = None,
) -> None:
    """
    Args:
        argument_name: Name of the task argument whose value partitions the
            concurrency group. When None, the limit applies to the task
            function as a whole.
        max_concurrent: Maximum number of concurrent tasks.
        scope: Optional scope prefix for Redis keys (defaults to docket name).
    """
    # Caller-supplied configuration.
    self.argument_name, self.max_concurrent, self.scope = (
        argument_name,
        max_concurrent,
        scope,
    )
    # Per-execution runtime state, filled in later.
    self._concurrency_key: str | None = None
    self._initialized: bool = False
    self._task_key: str | None = None
    self._renewal_task: asyncio.Task[None] | None = None

Cron

Bases: Perpetual

Declare a task that should run on a cron schedule. Cron tasks are automatically rescheduled for the next matching time after they finish (whether they succeed or fail). By default, a cron task is scheduled at worker startup with automatic=True.

Unlike Perpetual which schedules based on intervals from the current time, Cron schedules based on wall-clock time, ensuring tasks run at consistent times regardless of execution duration or delays.

Supports standard cron expressions and Vixie cron-style keywords (@daily, @hourly, etc.) via the croniter library.

Example:

from zoneinfo import ZoneInfo

@task
async def weekly_report(cron: Cron = Cron("0 9 * * 1")) -> None:
    # Runs every Monday at 9:00 AM UTC
    ...

@task
async def daily_cleanup(cron: Cron = Cron("@daily")) -> None:
    # Runs every day at midnight UTC
    ...

@task
async def morning_standup(
    cron: Cron = Cron("0 9 * * 1-5", tz=ZoneInfo("America/Los_Angeles"))
) -> None:
    # Runs weekdays at 9:00 AM Pacific (handles DST automatically)
    ...
Source code in src/docket/dependencies/_cron.py
class Cron(Perpetual):
    """Run a task on a wall-clock cron schedule.

    After each run finishes — success or failure — the task is rescheduled for
    the next matching cron time. With `automatic=True` (the default), the task
    is also scheduled at worker startup.

    Unlike `Perpetual`, which spaces runs by an interval from the current time,
    `Cron` targets exact wall-clock times, so runs stay aligned regardless of
    how long each execution takes or how late it starts.

    Standard cron expressions and Vixie cron-style keywords (@daily, @hourly,
    etc.) are supported via the croniter library.

    Example:

    ```python
    from zoneinfo import ZoneInfo

    @task
    async def weekly_report(cron: Cron = Cron("0 9 * * 1")) -> None:
        # Runs every Monday at 9:00 AM UTC
        ...

    @task
    async def daily_cleanup(cron: Cron = Cron("@daily")) -> None:
        # Runs every day at midnight UTC
        ...

    @task
    async def morning_standup(
        cron: Cron = Cron("0 9 * * 1-5", tz=ZoneInfo("America/Los_Angeles"))
    ) -> None:
        # Runs weekdays at 9:00 AM Pacific (handles DST automatically)
        ...
    ```
    """

    expression: str
    tz: tzinfo

    _croniter: croniter[datetime]

    def __init__(
        self,
        expression: str,
        automatic: bool = True,
        tz: tzinfo = timezone.utc,
    ) -> None:
        """
        Args:
            expression: A cron expression string: standard five-field syntax
                ("minute hour day month weekday", e.g. "0 9 * * 1" for Mondays
                at 9 AM) or a Vixie cron keyword (@yearly, @annually, @monthly,
                @weekly, @daily, @midnight, @hourly).
            automatic: If set, the worker schedules this task at startup and
                keeps it scheduled for its whole lifespan, so it stays
                scheduled despite crashes and other adverse conditions.
                Automatic tasks must not require any arguments.
            tz: Timezone used to interpret the expression; defaults to UTC.
                Pass e.g. `ZoneInfo("America/Los_Angeles")` for Pacific time.
                Daylight saving transitions are handled correctly.
        """
        super().__init__(automatic=automatic)
        self.expression, self.tz = expression, tz
        # Seed the iterator at "now" in the configured timezone so the first
        # get_next() yields the next upcoming match.
        self._croniter = croniter(expression, datetime.now(tz), datetime)

    async def __aenter__(self) -> Cron:
        # Build a fresh Cron bound to this execution's arguments.
        current = self.execution.get()
        clone = Cron(expression=self.expression, automatic=self.automatic, tz=self.tz)
        clone.args = current.args
        clone.kwargs = current.kwargs
        return clone

    @property
    def initial_when(self) -> datetime:
        """Next matching cron time, used for the initial schedule."""
        upcoming: datetime = self._croniter.get_next()
        return upcoming

    async def on_complete(self, execution: Execution, outcome: TaskOutcome) -> bool:
        """Schedule the next run at the exact upcoming cron time.

        Overrides Perpetual's on_complete so the next execution lands on the
        wall-clock boundary rather than drifting by the task's duration.
        """
        upcoming = self._croniter.get_next()
        self.at(upcoming)
        return await super().on_complete(execution, outcome)

initial_when property

Return the next cron time for initial scheduling.

__init__(expression, automatic=True, tz=timezone.utc)

Parameters:

Name Type Description Default
expression str

A cron expression string. Supports:

- Standard 5-field syntax: "minute hour day month weekday" (e.g., "0 9 * * 1" for Mondays at 9 AM)
- Vixie cron keywords: @yearly, @annually, @monthly, @weekly, @daily, @midnight, @hourly

required
automatic bool

If set, this task will be automatically scheduled during worker startup and continually through the worker's lifespan. This ensures that the task will always be scheduled despite crashes and other adverse conditions. Automatic tasks must not require any arguments.

True
tz tzinfo

Timezone for interpreting the cron expression. Defaults to UTC. Use ZoneInfo("America/Los_Angeles") for Pacific time, etc. This correctly handles daylight saving time transitions.

utc
Source code in src/docket/dependencies/_cron.py
def __init__(
    self,
    expression: str,
    automatic: bool = True,
    tz: tzinfo = timezone.utc,
) -> None:
    """
    Args:
        expression: A cron expression string. Supports:
            - Standard 5-field syntax: "minute hour day month weekday"
              (e.g., "0 9 * * 1" for Mondays at 9 AM)
            - Vixie cron keywords: @yearly, @annually, @monthly, @weekly,
              @daily, @midnight, @hourly
        automatic: If set, this task will be automatically scheduled during worker
            startup and continually through the worker's lifespan. This ensures
            that the task will always be scheduled despite crashes and other
            adverse conditions. Automatic tasks must not require any arguments.
        tz: Timezone for interpreting the cron expression. Defaults to UTC.
            Use `ZoneInfo("America/Los_Angeles")` for Pacific time, etc.
            This correctly handles daylight saving time transitions.
    """
    super().__init__(automatic=automatic)
    self.expression = expression
    self.tz = tz
    # Seed the schedule iterator at the current moment in the configured
    # timezone; successive get_next() calls yield the upcoming fire times.
    self._croniter = croniter(self.expression, datetime.now(self.tz), datetime)

on_complete(execution, outcome) async

Handle completion by scheduling the next execution at the exact cron time.

This overrides Perpetual's on_complete to ensure we hit the exact wall-clock time rather than adjusting for task duration.

Source code in src/docket/dependencies/_cron.py
async def on_complete(self, execution: Execution, outcome: TaskOutcome) -> bool:
    """Handle completion by scheduling the next execution at the exact cron time.

    This overrides Perpetual's on_complete to ensure we hit the exact wall-clock
    time rather than adjusting for task duration.
    """
    # Pin the next run to the upcoming cron boundary rather than "now + period",
    # so long-running executions don't drift the schedule.
    self.at(self._croniter.get_next())
    return await super().on_complete(execution, outcome)

Docket

Bases: DocketSnapshotMixin

A Docket represents a collection of tasks that may be scheduled for later execution. With a Docket, you can add, replace, and cancel tasks. Example:

@task
async def my_task(greeting: str, recipient: str) -> None:
    print(f"{greeting}, {recipient}!")

async with Docket() as docket:
    docket.add(my_task)("Hello", recipient="world")
Source code in src/docket/docket.py
class Docket(DocketSnapshotMixin):
    """A Docket represents a collection of tasks that may be scheduled for later
    execution.  With a Docket, you can add, replace, and cancel tasks.
    Example:

    ```python
    @task
    async def my_task(greeting: str, recipient: str) -> None:
        print(f"{greeting}, {recipient}!")

    async with Docket() as docket:
        docket.add(my_task)("Hello", recipient="world")
    ```
    """

    # Registry of task functions by name; seeded with the standard tasks in
    # __init__ and extended by register()/register_collection().
    tasks: dict[str, TaskFunction]
    # Strike/restore state; created and connected in __aenter__.
    strike_list: StrikeList

    _redis: RedisConnection
    _result_storage: ResultStorage | None
    # Lazily-registered Lua script used by _cancel(); see _cancel for details.
    _cancel_task_script: _cancel_task | None
    _stack: AsyncExitStack

    def __init__(
        self,
        name: str = "docket",
        url: str = "redis://localhost:6379/0",
        heartbeat_interval: timedelta = timedelta(seconds=2),
        missed_heartbeats: int = 5,
        execution_ttl: timedelta = timedelta(minutes=15),
        result_storage: AsyncKeyValue | None = None,
        enable_internal_instrumentation: bool = False,
    ) -> None:
        """
        Args:
            name: The name of the docket.
            url: The URL of the Redis server or in-memory backend.  For example:
                - "redis://localhost:6379/0"
                - "redis://user:password@localhost:6379/0"
                - "redis://user:password@localhost:6379/0?ssl=true"
                - "rediss://localhost:6379/0"
                - "unix:///path/to/redis.sock"
                - "memory://" (in-memory backend for testing)
            heartbeat_interval: How often workers send heartbeat messages to the docket.
            missed_heartbeats: How many heartbeats a worker can miss before it is
                considered dead.
            execution_ttl: How long to keep completed or failed execution state records
                in Redis before they expire. Defaults to 15 minutes.
            result_storage: An optional user-provided key/value store for task
                results.  When omitted, a Redis-backed store is created when the
                docket is entered as an async context manager.
            enable_internal_instrumentation: Whether to enable OpenTelemetry spans
                for internal Redis polling operations like strike stream monitoring.
                Defaults to False.
        """
        self.name = name
        self.url = url
        self.heartbeat_interval = heartbeat_interval
        self.missed_heartbeats = missed_heartbeats
        self.execution_ttl = execution_ttl
        self.enable_internal_instrumentation = enable_internal_instrumentation
        self._cancel_task_script = None
        self._user_result_storage = result_storage
        self._redis = RedisConnection(url)

        # Imported locally rather than at module level — presumably to avoid a
        # circular import with .tasks; TODO confirm.
        from .tasks import standard_tasks

        self.tasks: dict[str, TaskFunction] = {fn.__name__: fn for fn in standard_tasks}

    @property
    def worker_group_name(self) -> str:
        """Return the stream consumer-group name shared by all workers."""
        return "docket-workers"

    @property
    def prefix(self) -> str:
        """Return the key prefix for this docket.

        All Redis keys for this docket are prefixed with this value.

        For Redis Cluster mode, returns a hash-tagged prefix like "{myapp}"
        to ensure all keys hash to the same slot.
        """
        return self._redis.prefix(self.name)

    def key(self, suffix: str) -> str:
        """Return a Redis key with the docket prefix.

        Args:
            suffix: The key suffix (e.g., "queue", "stream", "runs:task-123")

        Returns:
            Full Redis key like "docket:queue" or "docket:stream"
        """
        return f"{self.prefix}:{suffix}"

    async def __aenter__(self) -> Self:
        """Connect to Redis, start strike monitoring, and set up result storage.

        All entered resources are tracked on an AsyncExitStack so __aexit__
        can tear them down in reverse order.
        """
        self._stack = AsyncExitStack()
        await self._stack.__aenter__()

        self.strike_list = StrikeList(
            url=self.url,
            name=self.name,
            enable_internal_instrumentation=self.enable_internal_instrumentation,
        )

        # Connect to Redis (handles cluster vs standalone)
        await self._stack.enter_async_context(self._redis)

        # Connect the strike list to Redis and start monitoring
        await self._stack.enter_async_context(self.strike_list)

        # Initialize result storage
        if self._user_result_storage is not None:
            self.result_storage: AsyncKeyValue = self._user_result_storage
            self._result_storage = None
            # User-provided storage should handle its own initialization
            if hasattr(self.result_storage, "setup"):
                await self.result_storage.setup()  # type: ignore[union-attr]
        else:
            self._result_storage = ResultStorage(self._redis, self.results_collection)
            await self._stack.enter_async_context(self._result_storage)
            self._stack.callback(lambda: setattr(self, "_result_storage", None))
            self.result_storage = self._result_storage
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Tear down everything entered in __aenter__, in reverse order."""
        try:
            await self._stack.__aexit__(exc_type, exc_value, traceback)
        finally:
            # Drop the stack so re-entering builds a fresh one.
            del self._stack

    @asynccontextmanager
    async def redis(self) -> AsyncGenerator[Redis | RedisCluster, None]:
        """Yield a Redis client (standalone or cluster) from the shared connection."""
        async with self._redis.client() as r:
            yield r

    @asynccontextmanager
    async def _pubsub(self) -> AsyncGenerator[PubSub, None]:
        """Yield a pub/sub handle from the shared connection."""
        async with self._redis.pubsub() as pubsub:
            yield pubsub

    async def _publish(self, channel: str, message: str) -> int:
        """Publish a message to a pub/sub channel.

        This handles both standalone and cluster modes transparently.

        Args:
            channel: The pub/sub channel to publish to
            message: The message to publish

        Returns:
            Number of subscribers that received the message
        """
        return await self._redis.publish(channel, message)

    def register(self, function: TaskFunction, names: list[str] | None = None) -> None:
        """Register a task with the Docket.

        Args:
            function: The task to register.
            names: Names to register the task under. Defaults to [function.__name__].
        """
        from .dependencies import validate_dependencies

        validate_dependencies(function)

        if not names:
            names = [function.__name__]

        for name in names:
            self.tasks[name] = function

    def register_collection(self, collection_path: str) -> None:
        """
        Register a collection of tasks.

        Args:
            collection_path: A path in the format "module:collection".
        """
        module_name, _, member_name = collection_path.rpartition(":")
        module = importlib.import_module(module_name)
        collection = getattr(module, member_name)
        for function in collection:
            self.register(function)

    def labels(self) -> Mapping[str, str]:
        """Return telemetry labels identifying this docket (used on spans and metrics)."""
        return {
            "docket.name": self.name,
        }

    @overload
    def add(
        self,
        function: Callable[P, Awaitable[R]],
        when: datetime | None = None,
        key: str | None = None,
    ) -> Callable[P, Awaitable[Execution]]:
        """Add a task to the Docket.

        Args:
            function: The task function to add.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """

    @overload
    def add(
        self,
        function: str,
        when: datetime | None = None,
        key: str | None = None,
    ) -> Callable[..., Awaitable[Execution]]:
        """Add a task to the Docket.

        Args:
            function: The name of a task to add.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """

    def add(
        self,
        function: Callable[P, Awaitable[R]] | str,
        when: datetime | None = None,
        key: str | None = None,
    ) -> Callable[..., Awaitable[Execution]]:
        """Add a task to the Docket.

        Args:
            function: The task to add.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """
        function_name: str | None = None
        if isinstance(function, str):
            function_name = function
            function = self.tasks[function]
        else:
            self.register(function)

        # NOTE: `when` and `key` are resolved here, at add() time, so every
        # invocation of the returned scheduler reuses the same values.
        if when is None:
            when = datetime.now(timezone.utc)

        if key is None:
            key = str(uuid7())

        async def scheduler(*args: P.args, **kwargs: P.kwargs) -> Execution:
            execution = Execution(
                self,
                function,
                args,
                kwargs,
                key,
                when,
                attempt=1,
                function_name=function_name,
            )

            with tracer.start_as_current_span(
                "docket.add",
                attributes={
                    **self.labels(),
                    **execution.specific_labels(),
                    "code.function.name": execution.function_name,
                },
            ):
                # Check if task is stricken before scheduling
                if self.strike_list.is_stricken(execution):
                    logger.warning(
                        "%r is stricken, skipping schedule of %r",
                        execution.function_name,
                        execution.key,
                    )
                    TASKS_STRICKEN.add(
                        1,
                        {
                            **self.labels(),
                            **execution.general_labels(),
                            "docket.where": "docket",
                        },
                    )
                    return execution

                # Schedule atomically (includes state record write)
                await execution.schedule(replace=False)

            TASKS_ADDED.add(1, {**self.labels(), **execution.general_labels()})
            TASKS_SCHEDULED.add(1, {**self.labels(), **execution.general_labels()})

            return execution

        return scheduler

    @overload
    def replace(
        self,
        function: Callable[P, Awaitable[R]],
        when: datetime,
        key: str,
    ) -> Callable[P, Awaitable[Execution]]:
        """Replace a previously scheduled task on the Docket.

        Args:
            function: The task function to replace.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """

    @overload
    def replace(
        self,
        function: str,
        when: datetime,
        key: str,
    ) -> Callable[..., Awaitable[Execution]]:
        """Replace a previously scheduled task on the Docket.

        Args:
            function: The name of a task to replace.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """

    def replace(
        self,
        function: Callable[P, Awaitable[R]] | str,
        when: datetime,
        key: str,
    ) -> Callable[..., Awaitable[Execution]]:
        """Replace a previously scheduled task on the Docket.

        Args:
            function: The task to replace.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """
        function_name: str | None = None
        if isinstance(function, str):
            function_name = function
            function = self.tasks[function]
        else:
            self.register(function)

        async def scheduler(*args: P.args, **kwargs: P.kwargs) -> Execution:
            execution = Execution(
                self,
                function,
                args,
                kwargs,
                key,
                when,
                attempt=1,
                function_name=function_name,
            )

            with tracer.start_as_current_span(
                "docket.replace",
                attributes={
                    **self.labels(),
                    **execution.specific_labels(),
                    "code.function.name": execution.function_name,
                },
            ):
                # Check if task is stricken before scheduling
                if self.strike_list.is_stricken(execution):
                    logger.warning(
                        "%r is stricken, skipping schedule of %r",
                        execution.function_name,
                        execution.key,
                    )
                    TASKS_STRICKEN.add(
                        1,
                        {
                            **self.labels(),
                            **execution.general_labels(),
                            "docket.where": "docket",
                        },
                    )
                    return execution

                # Schedule atomically (includes state record write)
                await execution.schedule(replace=True)

            TASKS_REPLACED.add(1, {**self.labels(), **execution.general_labels()})
            TASKS_CANCELLED.add(1, {**self.labels(), **execution.general_labels()})
            TASKS_SCHEDULED.add(1, {**self.labels(), **execution.general_labels()})

            return execution

        return scheduler

    async def schedule(self, execution: Execution) -> None:
        """Schedule a pre-built Execution, unless it has been stricken.

        Args:
            execution: The execution to schedule.
        """
        with tracer.start_as_current_span(
            "docket.schedule",
            attributes={
                **self.labels(),
                **execution.specific_labels(),
                "code.function.name": execution.function_name,
            },
        ):
            # Check if task is stricken before scheduling
            if self.strike_list.is_stricken(execution):
                logger.warning(
                    "%r is stricken, skipping schedule of %r",
                    execution.function_name,
                    execution.key,
                )
                TASKS_STRICKEN.add(
                    1,
                    {
                        **self.labels(),
                        **execution.general_labels(),
                        "docket.where": "docket",
                    },
                )
                return

            # Schedule atomically (includes state record write)
            await execution.schedule(replace=False)

        TASKS_SCHEDULED.add(1, {**self.labels(), **execution.general_labels()})

    async def cancel(self, key: str) -> None:
        """Cancel a previously scheduled task on the Docket.

        If the task is scheduled (in the queue or stream), it will be removed.
        If the task is currently running, a cancellation signal will be sent
        to the worker, which will attempt to cancel the asyncio task. This is
        best-effort: if the task completes before the signal is processed,
        the cancellation will have no effect.

        Args:
            key: The key of the task to cancel.
        """
        with tracer.start_as_current_span(
            "docket.cancel",
            attributes={**self.labels(), "docket.key": key},
        ):
            async with self.redis() as redis:
                await self._cancel(redis, key)

            # Publish cancellation signal for running tasks (best-effort)
            await self._publish(self.cancel_channel(key), key)

        TASKS_CANCELLED.add(1, self.labels())

    async def get_execution(self, key: str) -> Execution | None:
        """Get a task Execution from the Docket by its key.

        Args:
            key: The task key.

        Returns:
            The Execution if found, None if the key doesn't exist.

        Example:
            # Claim check pattern: schedule a task, save the key,
            # then retrieve the execution later to check status or get results
            execution = await docket.add(my_task, key="important-task")(args)
            task_key = execution.key

            # Later, retrieve the execution by key
            execution = await docket.get_execution(task_key)
            if execution:
                await execution.get_result()
        """
        import cloudpickle

        async with self.redis() as redis:
            data = await redis.hgetall(self.runs_key(key))

            if not data:
                return None

            # Extract task definition from runs hash
            function_name = data.get(b"function")
            args_data = data.get(b"args")
            kwargs_data = data.get(b"kwargs")

            if not function_name or not args_data or not kwargs_data:
                return None

            # Look up function in registry, or create a placeholder if not found
            function_name_str = function_name.decode()
            function = self.tasks.get(function_name_str)
            if not function:
                # Create a placeholder function for display purposes (e.g., CLI watch)
                # This allows viewing task state even if function isn't registered
                async def placeholder() -> None:
                    pass  # pragma: no cover

                placeholder.__name__ = function_name_str
                function = placeholder

            # Deserialize args and kwargs
            args = cloudpickle.loads(args_data)
            kwargs = cloudpickle.loads(kwargs_data)

            # Extract scheduling metadata
            when_str = data.get(b"when")
            if not when_str:  # pragma: no cover
                return None
            when = datetime.fromtimestamp(float(when_str.decode()), tz=timezone.utc)

            # Build execution (attempt defaults to 1 for initial scheduling)
            from docket.execution import Execution

            execution = Execution(
                docket=self,
                function=function,
                args=args,
                kwargs=kwargs,
                key=key,
                when=when,
                attempt=1,
            )

            # Sync with current state from Redis
            await execution.sync()

            return execution

    @property
    def queue_key(self) -> str:
        """Return the key of the sorted set holding future-scheduled tasks."""
        return self.key("queue")

    @property
    def stream_key(self) -> str:
        """Return the key of the stream carrying tasks that are due now."""
        return self.key("stream")

    def known_task_key(self, task_key: str) -> str:
        """Return the legacy marker key recording that a task is scheduled."""
        return self.key(f"known:{task_key}")

    def parked_task_key(self, task_key: str) -> str:
        """Return the key holding parked data for a scheduled task."""
        return self.key(task_key)

    def stream_id_key(self, task_key: str) -> str:
        """Return the legacy key storing a task's stream message ID."""
        return self.key(f"stream-id:{task_key}")

    def runs_key(self, task_key: str) -> str:
        """Return the Redis key for storing execution state for a task."""
        return self.key(f"runs:{task_key}")

    def cancel_channel(self, task_key: str) -> str:
        """Return the Redis pub/sub channel for cancellation signals for a task."""
        return self.key(f"cancel:{task_key}")

    @property
    def results_collection(self) -> str:
        """Return the collection name for result storage."""
        return self.key("results")

    async def _ensure_stream_and_group(self) -> None:
        """Create stream and consumer group if they don't exist (idempotent).

        This is safe to call from multiple workers racing to initialize - the
        BUSYGROUP error is silently ignored since it just means another worker
        created the group first.
        """
        try:
            async with self.redis() as r:
                await r.xgroup_create(
                    groupname=self.worker_group_name,
                    name=self.stream_key,
                    id="0-0",
                    mkstream=True,
                )
        except redis.exceptions.ResponseError as e:
            if "BUSYGROUP" not in str(e):
                raise  # pragma: no cover

    async def _cancel(self, redis: Redis | RedisCluster, key: str) -> None:
        """Cancel a task atomically.

        Handles cancellation regardless of task location:
        - From the stream (using stored message ID)
        - From the queue (scheduled tasks)
        - Cleans up all associated metadata keys
        """
        # Register the Lua script once per Docket and cache it for reuse.
        if self._cancel_task_script is None:
            self._cancel_task_script = cast(
                _cancel_task,
                redis.register_script(
                    # KEYS: stream_key, known_key, parked_key, queue_key, stream_id_key, runs_key
                    # ARGV: task_key, completed_at
                    """
                    local stream_key = KEYS[1]
                    -- TODO: Remove in next breaking release (v0.14.0) - legacy key locations
                    local known_key = KEYS[2]
                    local parked_key = KEYS[3]
                    local queue_key = KEYS[4]
                    local stream_id_key = KEYS[5]
                    local runs_key = KEYS[6]
                    local task_key = ARGV[1]
                    local completed_at = ARGV[2]

                    -- Get stream ID (check new location first, then legacy)
                    local message_id = redis.call('HGET', runs_key, 'stream_id')

                    -- TODO: Remove in next breaking release (v0.14.0) - check legacy location
                    if not message_id then
                        message_id = redis.call('GET', stream_id_key)
                    end

                    -- Delete from stream if message ID exists
                    if message_id then
                        redis.call('XDEL', stream_key, message_id)
                    end

                    -- Clean up legacy keys and parked data
                    redis.call('DEL', known_key, parked_key, stream_id_key)
                    redis.call('ZREM', queue_key, task_key)

                    -- Clear scheduling markers so add() can reschedule this key
                    redis.call('HDEL', runs_key, 'known', 'stream_id')

                    -- Only set CANCELLED if not already in a terminal state
                    local current_state = redis.call('HGET', runs_key, 'state')
                    if current_state ~= 'completed' and current_state ~= 'failed' and current_state ~= 'cancelled' then
                        redis.call('HSET', runs_key, 'state', 'cancelled', 'completed_at', completed_at)
                    end

                    return 'OK'
                    """
                ),
            )
        cancel_task = self._cancel_task_script

        # Create tombstone with CANCELLED state
        completed_at = datetime.now(timezone.utc).isoformat()
        task_runs_key = self.runs_key(key)

        # Execute the cancellation script
        await cancel_task(
            keys=[
                self.stream_key,
                self.known_task_key(key),
                self.parked_task_key(key),
                self.queue_key,
                self.stream_id_key(key),
                task_runs_key,
            ],
            args=[key, completed_at],
        )

        # Apply TTL or delete tombstone based on execution_ttl
        if self.execution_ttl:
            ttl_seconds = int(self.execution_ttl.total_seconds())
            await redis.expire(task_runs_key, ttl_seconds)
        else:
            # execution_ttl=0 means no observability - delete tombstone immediately
            await redis.delete(task_runs_key)

    async def strike(
        self,
        function: Callable[P, Awaitable[R]] | str | None = None,
        parameter: str | None = None,
        operator: Operator | LiteralOperator = "==",
        value: Hashable | None = None,
    ) -> None:
        """Strike a task from the Docket.

        Args:
            function: The task to strike (function or name), or None for all tasks.
            parameter: The parameter to strike on, or None for entire task.
            operator: The comparison operator to use.
            value: The value to strike on.
        """
        function_name = function.__name__ if callable(function) else function

        instruction = Strike(function_name, parameter, Operator(operator), value)
        with tracer.start_as_current_span(
            "docket.strike",
            attributes={**self.labels(), **instruction.labels()},
        ):
            await self.strike_list.send_instruction(instruction)

    async def restore(
        self,
        function: Callable[P, Awaitable[R]] | str | None = None,
        parameter: str | None = None,
        operator: Operator | LiteralOperator = "==",
        value: Hashable | None = None,
    ) -> None:
        """Restore a previously stricken task to the Docket.

        Args:
            function: The task to restore (function or name), or None for all tasks.
            parameter: The parameter to restore on, or None for entire task.
            operator: The comparison operator to use.
            value: The value to restore on.
        """
        function_name = function.__name__ if callable(function) else function

        instruction = Restore(function_name, parameter, Operator(operator), value)
        with tracer.start_as_current_span(
            "docket.restore",
            attributes={**self.labels(), **instruction.labels()},
        ):
            await self.strike_list.send_instruction(instruction)

    async def wait_for_strikes_loaded(self) -> None:
        """Wait for all existing strikes to be loaded from the stream.

        This method blocks until the strike monitor has completed its initial
        non-blocking read of all existing strike messages. Call this before
        making decisions that depend on the current strike state, such as
        scheduling automatic perpetual tasks.
        """
        await self.strike_list.wait_for_strikes_loaded()

    async def clear(self) -> int:
        """Clear all queued and scheduled tasks from the docket.

        This removes all tasks from the stream (immediate tasks) and queue
        (scheduled tasks), along with their associated parked data. Running
        tasks are not affected.

        Returns:
            The total number of tasks that were cleared.
        """
        with tracer.start_as_current_span(
            "docket.clear",
            attributes=self.labels(),
        ):
            async with self.redis() as redis:
                async with redis.pipeline() as pipeline:
                    # Get counts before clearing
                    pipeline.xlen(self.stream_key)
                    pipeline.zcard(self.queue_key)
                    pipeline.zrange(self.queue_key, 0, -1)

                    stream_count: int
                    queue_count: int
                    scheduled_keys: list[bytes]
                    stream_count, queue_count, scheduled_keys = await pipeline.execute()

                # Get keys from stream messages before trimming
                stream_keys: list[str] = []
                if stream_count > 0:
                    # Read all messages from the stream
                    messages = await redis.xrange(self.stream_key, "-", "+")
                    for message_id, fields in messages:
                        # Extract the key field from the message
                        if b"key" in fields:  # pragma: no branch
                            stream_keys.append(fields[b"key"].decode())

                async with redis.pipeline() as pipeline:
                    # Clear all data
                    # Trim stream to 0 messages instead of deleting it to preserve consumer group
                    if stream_count > 0:
                        pipeline.xtrim(self.stream_key, maxlen=0, approximate=False)
                    pipeline.delete(self.queue_key)

                    # Clear parked task data and known task keys for scheduled tasks
                    for key_bytes in scheduled_keys:
                        task_key = key_bytes.decode()
                        pipeline.delete(self.parked_task_key(task_key))
                        pipeline.delete(self.known_task_key(task_key))
                        pipeline.delete(self.stream_id_key(task_key))

                        # Handle runs hash: set TTL or delete based on execution_ttl
                        task_runs_key = self.runs_key(task_key)
                        if self.execution_ttl:
                            ttl_seconds = int(self.execution_ttl.total_seconds())
                            pipeline.expire(task_runs_key, ttl_seconds)
                        else:
                            pipeline.delete(task_runs_key)

                    # Handle runs hash for immediate tasks from stream
                    for task_key in stream_keys:
                        task_runs_key = self.runs_key(task_key)
                        if self.execution_ttl:
                            ttl_seconds = int(self.execution_ttl.total_seconds())
                            pipeline.expire(task_runs_key, ttl_seconds)
                        else:
                            pipeline.delete(task_runs_key)

                    await pipeline.execute()

                    total_cleared = stream_count + queue_count
                    return total_cleared

prefix property

Return the key prefix for this docket.

All Redis keys for this docket are prefixed with this value.

For Redis Cluster mode, returns a hash-tagged prefix like "{myapp}" to ensure all keys hash to the same slot.

results_collection property

Return the collection name for result storage.

__init__(name='docket', url='redis://localhost:6379/0', heartbeat_interval=timedelta(seconds=2), missed_heartbeats=5, execution_ttl=timedelta(minutes=15), result_storage=None, enable_internal_instrumentation=False)

Parameters:

Name Type Description Default
name str

The name of the docket.

'docket'
url str

The URL of the Redis server or in-memory backend. For example:

- "redis://localhost:6379/0"
- "redis://user:password@localhost:6379/0"
- "redis://user:password@localhost:6379/0?ssl=true"
- "rediss://localhost:6379/0"
- "unix:///path/to/redis.sock"
- "memory://" (in-memory backend for testing)

'redis://localhost:6379/0'
heartbeat_interval timedelta

How often workers send heartbeat messages to the docket.

timedelta(seconds=2)
missed_heartbeats int

How many heartbeats a worker can miss before it is considered dead.

5
execution_ttl timedelta

How long to keep completed or failed execution state records in Redis before they expire. Defaults to 15 minutes.

timedelta(minutes=15)
enable_internal_instrumentation bool

Whether to enable OpenTelemetry spans for internal Redis polling operations like strike stream monitoring. Defaults to False.

False
Source code in src/docket/docket.py
def __init__(
    self,
    name: str = "docket",
    url: str = "redis://localhost:6379/0",
    heartbeat_interval: timedelta = timedelta(seconds=2),
    missed_heartbeats: int = 5,
    execution_ttl: timedelta = timedelta(minutes=15),
    result_storage: AsyncKeyValue | None = None,
    enable_internal_instrumentation: bool = False,
) -> None:
    """
    Args:
        name: The name of the docket.
        url: The URL of the Redis server or in-memory backend.  For example:
            - "redis://localhost:6379/0"
            - "redis://user:password@localhost:6379/0"
            - "redis://user:password@localhost:6379/0?ssl=true"
            - "rediss://localhost:6379/0"
            - "unix:///path/to/redis.sock"
            - "memory://" (in-memory backend for testing)
        heartbeat_interval: How often workers send heartbeat messages to the docket.
        missed_heartbeats: How many heartbeats a worker can miss before it is
            considered dead.
        execution_ttl: How long to keep completed or failed execution state records
            in Redis before they expire. Defaults to 15 minutes.
        result_storage: Optional AsyncKeyValue backend for storing task results.
            Defaults to None, in which case the docket presumably falls back to
            its own default result storage (see ``results_collection``) --
            confirm against the result-storage implementation.
        enable_internal_instrumentation: Whether to enable OpenTelemetry spans
            for internal Redis polling operations like strike stream monitoring.
            Defaults to False.
    """
    self.name = name
    self.url = url
    self.heartbeat_interval = heartbeat_interval
    self.missed_heartbeats = missed_heartbeats
    self.execution_ttl = execution_ttl
    self.enable_internal_instrumentation = enable_internal_instrumentation
    self._cancel_task_script = None
    self._user_result_storage = result_storage
    self._redis = RedisConnection(url)

    # NOTE(review): imported here rather than at module level -- presumably to
    # avoid a circular import between docket and tasks; confirm.
    from .tasks import standard_tasks

    self.tasks: dict[str, TaskFunction] = {fn.__name__: fn for fn in standard_tasks}

add(function, when=None, key=None)

add(
    function: Callable[P, Awaitable[R]],
    when: datetime | None = None,
    key: str | None = None,
) -> Callable[P, Awaitable[Execution]]
add(
    function: str,
    when: datetime | None = None,
    key: str | None = None,
) -> Callable[..., Awaitable[Execution]]

Add a task to the Docket.

Parameters:

Name Type Description Default
function Callable[P, Awaitable[R]] | str

The task to add.

required
when datetime | None

The time to schedule the task.

None
key str | None

The key to schedule the task under.

None
Source code in src/docket/docket.py
def add(
    self,
    function: Callable[P, Awaitable[R]] | str,
    when: datetime | None = None,
    key: str | None = None,
) -> Callable[..., Awaitable[Execution]]:
    """Add a task to the Docket.

    Args:
        function: The task to add, either a registered task function or the
            name it was registered under.
        when: The time to schedule the task; defaults to right now (UTC).
        key: The key to schedule the task under; defaults to a fresh UUIDv7.

    Returns:
        An async callable that accepts the task's arguments and schedules it,
        returning the resulting Execution.
    """
    resolved_name: str | None = None
    if isinstance(function, str):
        resolved_name = function
        function = self.tasks[function]
    else:
        self.register(function)

    scheduled_for = when if when is not None else datetime.now(timezone.utc)
    task_key = key if key is not None else str(uuid7())

    async def scheduler(*args: P.args, **kwargs: P.kwargs) -> Execution:
        execution = Execution(
            self,
            function,
            args,
            kwargs,
            task_key,
            scheduled_for,
            attempt=1,
            function_name=resolved_name,
        )

        span_attributes = {
            **self.labels(),
            **execution.specific_labels(),
            "code.function.name": execution.function_name,
        }
        with tracer.start_as_current_span("docket.add", attributes=span_attributes):
            # A stricken task is never scheduled: log it, count it, and return
            # the (unscheduled) execution to the caller.
            if self.strike_list.is_stricken(execution):
                logger.warning(
                    "%r is stricken, skipping schedule of %r",
                    execution.function_name,
                    execution.key,
                )
                TASKS_STRICKEN.add(
                    1,
                    {
                        **self.labels(),
                        **execution.general_labels(),
                        "docket.where": "docket",
                    },
                )
                return execution

            # Atomic schedule, including the state record write.
            await execution.schedule(replace=False)

        metric_labels = {**self.labels(), **execution.general_labels()}
        TASKS_ADDED.add(1, metric_labels)
        TASKS_SCHEDULED.add(1, metric_labels)

        return execution

    return scheduler

cancel(key) async

Cancel a previously scheduled task on the Docket.

If the task is scheduled (in the queue or stream), it will be removed. If the task is currently running, a cancellation signal will be sent to the worker, which will attempt to cancel the asyncio task. This is best-effort: if the task completes before the signal is processed, the cancellation will have no effect.

Parameters:

Name Type Description Default
key str

The key of the task to cancel.

required
Source code in src/docket/docket.py
async def cancel(self, key: str) -> None:
    """Cancel a previously scheduled task on the Docket.

    If the task is scheduled (in the queue or stream), it will be removed.
    If the task is currently running, a cancellation signal will be sent
    to the worker, which will attempt to cancel the asyncio task. This is
    best-effort: if the task completes before the signal is processed,
    the cancellation will have no effect.

    Args:
        key: The key of the task to cancel.
    """
    span_attributes = {**self.labels(), "docket.key": key}
    with tracer.start_as_current_span("docket.cancel", attributes=span_attributes):
        async with self.redis() as redis:
            await self._cancel(redis, key)

        # Best-effort notification for a worker that may be running this task.
        await self._publish(self.cancel_channel(key), key)

    TASKS_CANCELLED.add(1, self.labels())

cancel_channel(task_key)

Return the Redis pub/sub channel for cancellation signals for a task.

Source code in src/docket/docket.py
def cancel_channel(self, task_key: str) -> str:
    """Build the docket-prefixed pub/sub channel name used to signal
    cancellation of the given task."""
    suffix = "cancel:" + task_key
    return self.key(suffix)

clear() async

Clear all queued and scheduled tasks from the docket.

This removes all tasks from the stream (immediate tasks) and queue (scheduled tasks), along with their associated parked data. Running tasks are not affected.

Returns:

Type Description
int

The total number of tasks that were cleared.

Source code in src/docket/docket.py
async def clear(self) -> int:
    """Clear all queued and scheduled tasks from the docket.

    This removes all tasks from the stream (immediate tasks) and queue
    (scheduled tasks), along with their associated parked data. Running
    tasks are not affected.

    Returns:
        The total number of tasks that were cleared.
    """
    with tracer.start_as_current_span(
        "docket.clear",
        attributes=self.labels(),
    ):
        async with self.redis() as redis:
            async with redis.pipeline() as pipeline:
                # Get counts before clearing
                # (stream = immediate tasks, sorted-set queue = scheduled tasks)
                pipeline.xlen(self.stream_key)
                pipeline.zcard(self.queue_key)
                pipeline.zrange(self.queue_key, 0, -1)

                stream_count: int
                queue_count: int
                scheduled_keys: list[bytes]
                stream_count, queue_count, scheduled_keys = await pipeline.execute()

            # Get keys from stream messages before trimming
            stream_keys: list[str] = []
            if stream_count > 0:
                # Read all messages from the stream
                messages = await redis.xrange(self.stream_key, "-", "+")
                for message_id, fields in messages:
                    # Extract the key field from the message
                    if b"key" in fields:  # pragma: no branch
                        stream_keys.append(fields[b"key"].decode())

            async with redis.pipeline() as pipeline:
                # Clear all data
                # Trim stream to 0 messages instead of deleting it to preserve consumer group
                if stream_count > 0:
                    pipeline.xtrim(self.stream_key, maxlen=0, approximate=False)
                pipeline.delete(self.queue_key)

                # Clear parked task data and known task keys for scheduled tasks
                for key_bytes in scheduled_keys:
                    task_key = key_bytes.decode()
                    pipeline.delete(self.parked_task_key(task_key))
                    pipeline.delete(self.known_task_key(task_key))
                    pipeline.delete(self.stream_id_key(task_key))

                    # Handle runs hash: set TTL or delete based on execution_ttl
                    # (a TTL keeps the execution record visible briefly for
                    # observability; no TTL means delete immediately)
                    task_runs_key = self.runs_key(task_key)
                    if self.execution_ttl:
                        ttl_seconds = int(self.execution_ttl.total_seconds())
                        pipeline.expire(task_runs_key, ttl_seconds)
                    else:
                        pipeline.delete(task_runs_key)

                # Handle runs hash for immediate tasks from stream
                for task_key in stream_keys:
                    task_runs_key = self.runs_key(task_key)
                    if self.execution_ttl:
                        ttl_seconds = int(self.execution_ttl.total_seconds())
                        pipeline.expire(task_runs_key, ttl_seconds)
                    else:
                        pipeline.delete(task_runs_key)

                await pipeline.execute()

                # NOTE: counts were taken before clearing; tasks added between
                # the two pipelines are cleared but not counted here.
                total_cleared = stream_count + queue_count
                return total_cleared

get_execution(key) async

Get a task Execution from the Docket by its key.

Parameters:

Name Type Description Default
key str

The task key.

required

Returns:

Type Description
Execution | None

The Execution if found, None if the key doesn't exist.

Example
Claim check pattern: schedule a task, save the key,
then retrieve the execution later to check status or get results

execution = await docket.add(my_task, key="important-task")(args)
task_key = execution.key

Later, retrieve the execution by key

execution = await docket.get_execution(task_key)
if execution:
    await execution.get_result()

Source code in src/docket/docket.py
async def get_execution(self, key: str) -> Execution | None:
    """Get a task Execution from the Docket by its key.

    Args:
        key: The task key.

    Returns:
        The Execution if found, None if the key doesn't exist.

    Example:
        # Claim check pattern: schedule a task, save the key,
        # then retrieve the execution later to check status or get results
        execution = await docket.add(my_task, key="important-task")(args)
        task_key = execution.key

        # Later, retrieve the execution by key
        execution = await docket.get_execution(task_key)
        if execution:
            await execution.get_result()
    """
    import cloudpickle

    async with self.redis() as redis:
        data = await redis.hgetall(self.runs_key(key))

        if not data:
            return None

        # Extract task definition from runs hash
        function_name = data.get(b"function")
        args_data = data.get(b"args")
        kwargs_data = data.get(b"kwargs")

        # An incomplete hash (missing any of the three fields) is treated
        # the same as a missing task.
        if not function_name or not args_data or not kwargs_data:
            return None

        # Look up function in registry, or create a placeholder if not found
        function_name_str = function_name.decode()
        function = self.tasks.get(function_name_str)
        if not function:
            # Create a placeholder function for display purposes (e.g., CLI watch)
            # This allows viewing task state even if function isn't registered
            async def placeholder() -> None:
                pass  # pragma: no cover

            placeholder.__name__ = function_name_str
            function = placeholder

        # Deserialize args and kwargs
        # NOTE(review): cloudpickle.loads executes arbitrary pickled payloads;
        # this is safe only because the data was written by our own scheduler.
        # Never point this at a Redis instance holding untrusted data.
        args = cloudpickle.loads(args_data)
        kwargs = cloudpickle.loads(kwargs_data)

        # Extract scheduling metadata
        when_str = data.get(b"when")
        if not when_str:  # pragma: no cover
            return None
        # "when" is stored as a Unix timestamp; interpret it as UTC.
        when = datetime.fromtimestamp(float(when_str.decode()), tz=timezone.utc)

        # Build execution (attempt defaults to 1 for initial scheduling)
        from docket.execution import Execution

        execution = Execution(
            docket=self,
            function=function,
            args=args,
            kwargs=kwargs,
            key=key,
            when=when,
            attempt=1,
        )

        # Sync with current state from Redis
        await execution.sync()

        return execution

key(suffix)

Return a Redis key with the docket prefix.

Parameters:

Name Type Description Default
suffix str

The key suffix (e.g., "queue", "stream", "runs:task-123")

required

Returns:

Type Description
str

Full Redis key like "docket:queue" or "docket:stream"

Source code in src/docket/docket.py
def key(self, suffix: str) -> str:
    """Return a Redis key with the docket prefix.

    Args:
        suffix: The key suffix (e.g., "queue", "stream", "runs:task-123")

    Returns:
        Full Redis key like "docket:queue" or "docket:stream"
    """
    return ":".join((self.prefix, suffix))

register(function, names=None)

Register a task with the Docket.

Parameters:

Name Type Description Default
function TaskFunction

The task to register.

required
names list[str] | None

Names to register the task under. Defaults to [function.__name__].

None
Source code in src/docket/docket.py
def register(self, function: TaskFunction, names: list[str] | None = None) -> None:
    """Register a task with the Docket.

    Args:
        function: The task to register.
        names: Names to register the task under. Defaults to [function.__name__].
    """
    from .dependencies import validate_dependencies

    validate_dependencies(function)

    # An empty or missing names list falls back to the function's own name.
    registered_names = names or [function.__name__]
    for registered_name in registered_names:
        self.tasks[registered_name] = function

register_collection(collection_path)

Register a collection of tasks.

Parameters:

Name Type Description Default
collection_path str

A path in the format "module:collection".

required
Source code in src/docket/docket.py
def register_collection(self, collection_path: str) -> None:
    """
    Register a collection of tasks.

    Args:
        collection_path: A path in the format "module:collection".

    Raises:
        ValueError: If collection_path does not contain a ":" separator.
        ModuleNotFoundError: If the module cannot be imported.
        AttributeError: If the collection member is not found in the module.
    """
    module_name, separator, member_name = collection_path.rpartition(":")
    # Without a separator, rpartition leaves module_name empty and
    # importlib.import_module("") would raise an unhelpful "Empty module
    # name" ValueError; raise a clear one instead.
    if not separator:
        raise ValueError(
            f"collection_path must be in the format 'module:collection', "
            f"got {collection_path!r}"
        )
    module = importlib.import_module(module_name)
    collection = getattr(module, member_name)
    for function in collection:
        self.register(function)

replace(function, when, key)

replace(
    function: Callable[P, Awaitable[R]],
    when: datetime,
    key: str,
) -> Callable[P, Awaitable[Execution]]
replace(
    function: str, when: datetime, key: str
) -> Callable[..., Awaitable[Execution]]

Replace a previously scheduled task on the Docket.

Parameters:

Name Type Description Default
function Callable[P, Awaitable[R]] | str

The task to replace.

required
when datetime

The time to schedule the task.

required
key str

The key to schedule the task under.

required
Source code in src/docket/docket.py
def replace(
    self,
    function: Callable[P, Awaitable[R]] | str,
    when: datetime,
    key: str,
) -> Callable[..., Awaitable[Execution]]:
    """Replace a previously scheduled task on the Docket.

    Args:
        function: The task to replace, either a registered task function or
            the name it was registered under.
        when: The time to schedule the task.
        key: The key to schedule the task under.

    Returns:
        An async callable that accepts the task's arguments and replaces the
        prior schedule, returning the resulting Execution.
    """
    resolved_name: str | None = None
    if isinstance(function, str):
        resolved_name = function
        function = self.tasks[function]
    else:
        self.register(function)

    async def scheduler(*args: P.args, **kwargs: P.kwargs) -> Execution:
        execution = Execution(
            self,
            function,
            args,
            kwargs,
            key,
            when,
            attempt=1,
            function_name=resolved_name,
        )

        span_attributes = {
            **self.labels(),
            **execution.specific_labels(),
            "code.function.name": execution.function_name,
        }
        with tracer.start_as_current_span(
            "docket.replace", attributes=span_attributes
        ):
            # A stricken task is never scheduled: log it, count it, and return
            # the (unscheduled) execution to the caller.
            if self.strike_list.is_stricken(execution):
                logger.warning(
                    "%r is stricken, skipping schedule of %r",
                    execution.function_name,
                    execution.key,
                )
                TASKS_STRICKEN.add(
                    1,
                    {
                        **self.labels(),
                        **execution.general_labels(),
                        "docket.where": "docket",
                    },
                )
                return execution

            # Atomic schedule with replacement (includes state record write).
            await execution.schedule(replace=True)

        metric_labels = {**self.labels(), **execution.general_labels()}
        TASKS_REPLACED.add(1, metric_labels)
        TASKS_CANCELLED.add(1, metric_labels)
        TASKS_SCHEDULED.add(1, metric_labels)

        return execution

    return scheduler

restore(function=None, parameter=None, operator='==', value=None) async

Restore a previously stricken task to the Docket.

Parameters:

Name Type Description Default
function Callable[P, Awaitable[R]] | str | None

The task to restore (function or name), or None for all tasks.

None
parameter str | None

The parameter to restore on, or None for entire task.

None
operator Operator | LiteralOperator

The comparison operator to use.

'=='
value Hashable | None

The value to restore on.

None
Source code in src/docket/docket.py
async def restore(
    self,
    function: Callable[P, Awaitable[R]] | str | None = None,
    parameter: str | None = None,
    operator: Operator | LiteralOperator = "==",
    value: Hashable | None = None,
) -> None:
    """Restore a previously stricken task to the Docket.

    Args:
        function: The task to restore (function or name), or None for all tasks.
        parameter: The parameter to restore on, or None for entire task.
        operator: The comparison operator to use.
        value: The value to restore on.
    """
    if callable(function):
        function_name = function.__name__
    else:
        function_name = function

    instruction = Restore(function_name, parameter, Operator(operator), value)
    span_attributes = {**self.labels(), **instruction.labels()}
    with tracer.start_as_current_span("docket.restore", attributes=span_attributes):
        await self.strike_list.send_instruction(instruction)

runs_key(task_key)

Return the Redis key for storing execution state for a task.

Source code in src/docket/docket.py
def runs_key(self, task_key: str) -> str:
    """Build the docket-prefixed Redis key under which this task's
    execution state hash lives."""
    return self.key("runs:" + task_key)

strike(function=None, parameter=None, operator='==', value=None) async

Strike a task from the Docket.

Parameters:

Name Type Description Default
function Callable[P, Awaitable[R]] | str | None

The task to strike (function or name), or None for all tasks.

None
parameter str | None

The parameter to strike on, or None for entire task.

None
operator Operator | LiteralOperator

The comparison operator to use.

'=='
value Hashable | None

The value to strike on.

None
Source code in src/docket/docket.py
async def strike(
    self,
    function: Callable[P, Awaitable[R]] | str | None = None,
    parameter: str | None = None,
    operator: Operator | LiteralOperator = "==",
    value: Hashable | None = None,
) -> None:
    """Strike a task from the Docket.

    Args:
        function: The task to strike (function or name), or None for all tasks.
        parameter: The parameter to strike on, or None for entire task.
        operator: The comparison operator to use.
        value: The value to strike on.
    """
    if callable(function):
        function_name = function.__name__
    else:
        function_name = function

    instruction = Strike(function_name, parameter, Operator(operator), value)
    span_attributes = {**self.labels(), **instruction.labels()}
    with tracer.start_as_current_span("docket.strike", attributes=span_attributes):
        await self.strike_list.send_instruction(instruction)

wait_for_strikes_loaded() async

Wait for all existing strikes to be loaded from the stream.

This method blocks until the strike monitor has completed its initial non-blocking read of all existing strike messages. Call this before making decisions that depend on the current strike state, such as scheduling automatic perpetual tasks.

Source code in src/docket/docket.py
async def wait_for_strikes_loaded(self) -> None:
    """Block until the strike monitor has finished its initial read.

    The strike monitor performs one non-blocking pass over every existing
    strike message at startup; this coroutine returns once that pass is
    complete. Call it before making decisions that depend on current strike
    state, such as scheduling automatic perpetual tasks.
    """
    strikes = self.strike_list
    await strikes.wait_for_strikes_loaded()

Execution

Represents a task execution with state management and progress tracking.

Combines task invocation metadata (function, args, when, etc.) with Redis-backed lifecycle state tracking and user-reported progress.

Source code in src/docket/execution.py
(rendered line-number gutter removed; the excerpt below corresponds to lines 95-989 of src/docket/execution.py)
class Execution:
    """Represents a task execution with state management and progress tracking.

    Combines task invocation metadata (function, args, when, etc.) with
    Redis-backed lifecycle state tracking and user-reported progress.
    """

    def __init__(
        self,
        docket: "Docket",
        function: TaskFunction,
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
        key: str,
        when: datetime,
        attempt: int,
        trace_context: opentelemetry.context.Context | None = None,
        redelivered: bool = False,
        function_name: str | None = None,
        generation: int = 0,
    ) -> None:
        """Initialize an execution record.

        Args:
            docket: Parent docket the task belongs to.
            function: The task function to execute.
            args: Positional arguments for the call.
            kwargs: Keyword arguments for the call.
            key: Unique task identifier.
            when: When the task should run.
            attempt: Attempt counter for retries.
            trace_context: OpenTelemetry context propagated from the scheduler,
                if any.
            redelivered: Whether the underlying stream message was redelivered.
            function_name: Name taken from the message; may differ from
                function.__name__ when a fallback task was substituted.
            generation: Scheduling generation counter used for supersession
                detection (0 means pre-tracking / not tracked).
        """
        # Task definition (immutable)
        self._docket = docket
        self._function = function
        # Prefer the name from the message so fallback tasks keep the original name
        self._function_name = function_name or function.__name__
        self._args = args
        self._kwargs = kwargs
        self._key = key

        # Scheduling metadata
        self.when = when
        self.attempt = attempt
        self._trace_context = trace_context
        self._redelivered = redelivered
        self._generation = generation

        # Lifecycle state (mutable) - refreshed from Redis via sync()
        self.state: ExecutionState = ExecutionState.SCHEDULED
        self.worker: str | None = None
        self.started_at: datetime | None = None
        self.completed_at: datetime | None = None
        self.error: str | None = None
        self.result_key: str | None = None

        # Progress tracking
        self.progress: ExecutionProgress = ExecutionProgress(docket, key)

        # Redis key for the per-task "runs" hash holding state/metadata
        self._redis_key = docket.key(f"runs:{key}")

    # Task definition properties (immutable)
    @property
    def docket(self) -> "Docket":
        """The parent docket this execution was scheduled on."""
        return self._docket

    @property
    def function(self) -> TaskFunction:
        """The task function that will be (or was) executed."""
        return self._function

    @property
    def args(self) -> tuple[Any, ...]:
        """Positional arguments the task will be called with."""
        return self._args

    @property
    def kwargs(self) -> dict[str, Any]:
        """Keyword arguments the task will be called with."""
        return self._kwargs

    @property
    def key(self) -> str:
        """Unique task identifier within the docket."""
        return self._key

    @property
    def function_name(self) -> str:
        """Name of the task function as recorded in the message.

        May differ from ``function.__name__`` when a fallback task was
        substituted for an unregistered function name.
        """
        return self._function_name

    # Scheduling metadata properties
    @property
    def trace_context(self) -> opentelemetry.context.Context | None:
        """OpenTelemetry trace context propagated from the scheduling side, if any."""
        return self._trace_context

    @property
    def redelivered(self) -> bool:
        """Whether the underlying stream message was redelivered by Redis."""
        return self._redelivered

    @property
    def generation(self) -> int:
        """Scheduling generation counter for supersession detection.

        0 means generation tracking was not active when this message was created.
        """
        return self._generation

    @contextmanager
    def _maybe_suppress_instrumentation(self) -> Generator[None, None, None]:
        """Suppress OTel auto-instrumentation for internal Redis operations."""
        if self._docket.enable_internal_instrumentation:  # pragma: no cover
            yield
        else:
            with suppress_instrumentation():
                yield

    def as_message(self) -> Message:
        """Serialize this execution as the field map for a Redis stream message.

        Field insertion order is preserved deliberately: the scheduling Lua
        script locates the 'generation' field by position.
        """
        body: Message = {}
        body[b"key"] = self.key.encode()
        body[b"when"] = self.when.isoformat().encode()
        body[b"function"] = self.function_name.encode()
        body[b"args"] = cloudpickle.dumps(self.args)
        body[b"kwargs"] = cloudpickle.dumps(self.kwargs)
        body[b"attempt"] = str(self.attempt).encode()
        body[b"generation"] = str(self.generation).encode()
        return body

    @classmethod
    async def from_message(
        cls,
        docket: "Docket",
        message: Message,
        redelivered: bool = False,
        fallback_task: TaskFunction | None = None,
    ) -> Self:
        """Reconstruct an Execution from a Redis stream message.

        Args:
            docket: Docket whose task registry resolves the function name.
            message: Raw field map from the stream entry.
            redelivered: Whether Redis redelivered this message.
            fallback_task: Substitute function when the named task is not
                registered; if None, an unregistered name raises ValueError.

        Returns:
            A new Execution, synced with the current state in Redis.

        Raises:
            ValueError: If the function is not registered and no fallback_task
                was provided.
        """
        function_name = message[b"function"].decode()
        if not (function := docket.tasks.get(function_name)):
            if fallback_task is None:
                raise ValueError(
                    f"Task function {function_name!r} is not registered with the current docket"
                )
            function = fallback_task

        instance = cls(
            docket=docket,
            function=function,
            args=cloudpickle.loads(message[b"args"]),
            kwargs=cloudpickle.loads(message[b"kwargs"]),
            key=message[b"key"].decode(),
            when=datetime.fromisoformat(message[b"when"].decode()),
            attempt=int(message[b"attempt"].decode()),
            trace_context=propagate.extract(message, getter=message_getter),
            redelivered=redelivered,
            function_name=function_name,
            # Messages from older versions carry no generation field; treat as 0
            generation=int(message.get(b"generation", b"0")),
        )
        # Refresh lifecycle state/metadata from the runs hash in Redis
        await instance.sync()
        return instance

    def general_labels(self) -> Mapping[str, str]:
        """Low-cardinality telemetry labels shared by all runs of this task."""
        return {"docket.task": self.function_name}

    def specific_labels(self) -> Mapping[str, str | int]:
        """Telemetry labels identifying this specific execution attempt."""
        return {
            "docket.task": self.function_name,
            "docket.key": self.key,
            "docket.when": self.when.isoformat(),
            "docket.attempt": self.attempt,
        }

    def get_argument(self, parameter: str) -> Any:
        """Return the value bound to *parameter* for this task call.

        Raises KeyError if the parameter was not explicitly supplied in
        args/kwargs (defaults are not applied).
        """
        bound = get_signature(self.function).bind(*self.args, **self.kwargs)
        return bound.arguments[parameter]

    def call_repr(self) -> str:
        """Render a redacted call string like ``name(a, k=...){key}``.

        Only parameters annotated with Logged are shown; everything else is
        elided as ``...`` to keep sensitive values out of logs.
        """
        signature = get_signature(self.function)
        logged = Logged.annotated_parameters(signature)
        names = list(signature.parameters.keys())

        parts: list[str] = []
        # zip truncates extra positionals beyond the declared parameters
        for name, value in zip(names, self.args):
            spec = logged.get(name)
            parts.append(spec.format(value) if spec else "...")

        for name, value in self.kwargs.items():
            spec = logged.get(name)
            parts.append(f"{name}={spec.format(value)}" if spec else f"{name}=...")

        return f"{self.function_name}({', '.join(parts)}){{{self.key}}}"

    def incoming_span_links(self) -> list[trace.Link]:
        """Build span links back to the initiating trace, when one is valid."""
        span_context = trace.get_current_span(self.trace_context).get_span_context()
        if not span_context.is_valid:
            return []
        return [trace.Link(span_context)]

    async def schedule(
        self, replace: bool = False, reschedule_message: "RedisMessageID | None" = None
    ) -> None:
        """Schedule this task atomically in Redis.

        This performs an atomic operation that:
        - Adds the task to the stream (immediate) or queue (future)
        - Writes the execution state record
        - Tracks metadata for later cancellation

        Usage patterns:
        - Normal add: schedule(replace=False)
        - Replace existing: schedule(replace=True)
        - Reschedule from stream: schedule(reschedule_message=message_id)
          This atomically acknowledges and deletes the stream message, then
          reschedules the task to the queue. Prevents both task loss and
          duplicate execution when rescheduling tasks (e.g., due to concurrency limits).

        Args:
            replace: If True, replaces any existing task with the same key.
                    If False, raises an error if the task already exists.
            reschedule_message: If provided, atomically acknowledges and deletes
                    this stream message ID before rescheduling the task to the queue.
                    Used when a task needs to be rescheduled from an active stream message.
        """
        # Serialize the task and inject the current trace context into the
        # message so the executing worker can link its span back to us.
        message: dict[bytes, bytes] = self.as_message()
        propagate.inject(message, setter=message_setter)

        key = self.key
        when = self.when
        known_task_key = self.docket.known_task_key(key)
        # Past-due tasks go straight to the stream; future tasks are parked.
        is_immediate = when <= datetime.now(timezone.utc)

        async with self.docket.redis() as redis:
            # Lock per task key to prevent race conditions between concurrent operations
            async with redis.lock(f"{known_task_key}:lock", timeout=10):
                # Register script for this connection (not cached to avoid event loop issues)
                schedule_script = cast(
                    _schedule_task,
                    redis.register_script(
                        # KEYS: stream_key, known_key, parked_key, queue_key, stream_id_key, runs_key
                        # ARGV: task_key, when_timestamp, is_immediate, replace, reschedule_message_id, worker_group_name, ...message_fields
                        """
                            local stream_key = KEYS[1]
                            -- TODO: Remove in next breaking release (v0.14.0) - legacy key locations
                            local known_key = KEYS[2]
                            local parked_key = KEYS[3]
                            local queue_key = KEYS[4]
                            local stream_id_key = KEYS[5]
                            local runs_key = KEYS[6]

                            local task_key = ARGV[1]
                            local when_timestamp = ARGV[2]
                            local is_immediate = ARGV[3] == '1'
                            local replace = ARGV[4] == '1'
                            local reschedule_message_id = ARGV[5]
                            local worker_group_name = ARGV[6]

                            -- Extract message fields from ARGV[7] onwards
                            local message = {}
                            local function_name = nil
                            local args_data = nil
                            local kwargs_data = nil
                            local generation_index = nil

                            for i = 7, #ARGV, 2 do
                                local field_name = ARGV[i]
                                local field_value = ARGV[i + 1]
                                message[#message + 1] = field_name
                                message[#message + 1] = field_value

                                -- Extract task data fields for runs hash
                                if field_name == 'function' then
                                    function_name = field_value
                                elseif field_name == 'args' then
                                    args_data = field_value
                                elseif field_name == 'kwargs' then
                                    kwargs_data = field_value
                                elseif field_name == 'generation' then
                                    generation_index = #message
                                end
                            end

                            -- Handle rescheduling from stream: atomically ACK message and reschedule to queue
                            -- This prevents both task loss (ACK before reschedule) and duplicate execution
                            -- (reschedule before ACK with slow reschedule causing redelivery)
                            if reschedule_message_id ~= '' then
                                -- Acknowledge and delete the message from the stream
                                redis.call('XACK', stream_key, worker_group_name, reschedule_message_id)
                                redis.call('XDEL', stream_key, reschedule_message_id)

                                -- Increment generation counter
                                local new_gen = redis.call('HINCRBY', runs_key, 'generation', 1)
                                if generation_index then
                                    message[generation_index] = tostring(new_gen)
                                end

                                -- Park task data for future execution
                                redis.call('HSET', parked_key, unpack(message))

                                -- Add to sorted set queue
                                redis.call('ZADD', queue_key, when_timestamp, task_key)

                                -- Update state in runs hash (clear stream_id since task is no longer in stream)
                                redis.call('HSET', runs_key,
                                    'state', 'scheduled',
                                    'when', when_timestamp,
                                    'function', function_name,
                                    'args', args_data,
                                    'kwargs', kwargs_data
                                )
                                redis.call('HDEL', runs_key, 'stream_id')

                                return 'OK'
                            end

                            -- Handle replacement: cancel existing task if needed
                            if replace then
                                -- Get stream ID from runs hash (check new location first)
                                local existing_message_id = redis.call('HGET', runs_key, 'stream_id')

                                -- TODO: Remove in next breaking release (v0.14.0) - check legacy location
                                if not existing_message_id then
                                    existing_message_id = redis.call('GET', stream_id_key)
                                end

                                if existing_message_id then
                                    redis.call('XDEL', stream_key, existing_message_id)
                                end

                                redis.call('ZREM', queue_key, task_key)
                                redis.call('DEL', parked_key)

                                -- TODO: Remove in next breaking release (v0.14.0) - clean up legacy keys
                                redis.call('DEL', known_key, stream_id_key)

                                -- Note: runs_key is updated below, not deleted
                            else
                                -- Check if task already exists (check new location first, then legacy)
                                local known_exists = redis.call('HEXISTS', runs_key, 'known') == 1
                                if not known_exists then
                                    -- Check if task is currently running (known field deleted at claim time)
                                    local state = redis.call('HGET', runs_key, 'state')
                                    if state == 'running' then
                                        return 'EXISTS'
                                    end
                                    -- TODO: Remove in next breaking release (v0.14.0) - check legacy location
                                    known_exists = redis.call('EXISTS', known_key) == 1
                                end
                                if known_exists then
                                    return 'EXISTS'
                                end
                            end

                            -- Increment generation counter
                            local new_gen = redis.call('HINCRBY', runs_key, 'generation', 1)
                            if generation_index then
                                message[generation_index] = tostring(new_gen)
                            end

                            if is_immediate then
                                -- Add to stream for immediate execution
                                local message_id = redis.call('XADD', stream_key, '*', unpack(message))

                                -- Store state and metadata in runs hash
                                redis.call('HSET', runs_key,
                                    'state', 'queued',
                                    'when', when_timestamp,
                                    'known', when_timestamp,
                                    'stream_id', message_id,
                                    'function', function_name,
                                    'args', args_data,
                                    'kwargs', kwargs_data
                                )
                            else
                                -- Park task data for future execution
                                redis.call('HSET', parked_key, unpack(message))

                                -- Add to sorted set queue
                                redis.call('ZADD', queue_key, when_timestamp, task_key)

                                -- Store state and metadata in runs hash
                                redis.call('HSET', runs_key,
                                    'state', 'scheduled',
                                    'when', when_timestamp,
                                    'known', when_timestamp,
                                    'function', function_name,
                                    'args', args_data,
                                    'kwargs', kwargs_data
                                )
                            end

                            return 'OK'
                            """
                    ),
                )

                # Message fields are flattened into alternating name/value pairs
                # so the Lua script can rebuild them from ARGV[7] onward.
                await schedule_script(
                    keys=[
                        self.docket.stream_key,
                        known_task_key,
                        self.docket.parked_task_key(key),
                        self.docket.queue_key,
                        self.docket.stream_id_key(key),
                        self._redis_key,
                    ],
                    args=[
                        key,
                        str(when.timestamp()),
                        "1" if is_immediate else "0",
                        "1" if replace else "0",
                        reschedule_message or b"",
                        self.docket.worker_group_name,
                        *[
                            item
                            for field, value in message.items()
                            for item in (field, value)
                        ],
                    ],
                )

        # Update local state based on whether task is immediate, scheduled, or being rescheduled
        if reschedule_message:
            # When rescheduling from stream, task is always parked and queued (never immediate)
            self.state = ExecutionState.SCHEDULED
            await self._publish_state(
                {"state": ExecutionState.SCHEDULED.value, "when": when.isoformat()}
            )
        elif is_immediate:
            self.state = ExecutionState.QUEUED
            await self._publish_state(
                {"state": ExecutionState.QUEUED.value, "when": when.isoformat()}
            )
        else:
            self.state = ExecutionState.SCHEDULED
            await self._publish_state(
                {"state": ExecutionState.SCHEDULED.value, "when": when.isoformat()}
            )

    async def claim(self, worker: str) -> bool:
        """Atomically check supersession and claim task in a single round-trip.

        This consolidates worker operations when claiming a task into a single
        atomic Lua script that:
        - Checks if the task has been superseded by a newer generation
        - Sets state to RUNNING with worker name and timestamp
        - Initializes progress tracking (current=0, total=100)
        - Deletes known/stream_id fields to allow task rescheduling
        - Cleans up legacy keys for backwards compatibility

        Args:
            worker: Name of the worker claiming the task

        Returns:
            True if the task was claimed, False if it was superseded.
        """
        started_at = datetime.now(timezone.utc)
        started_at_iso = started_at.isoformat()

        with self._maybe_suppress_instrumentation():
            async with self.docket.redis() as redis:
                claim_script = redis.register_script(
                    # KEYS: runs_key, progress_key, known_key, stream_id_key
                    # ARGV: worker, started_at_iso, generation
                    """
                    local runs_key = KEYS[1]
                    local progress_key = KEYS[2]
                    -- TODO: Remove in next breaking release (v0.14.0) - legacy key locations
                    local known_key = KEYS[3]
                    local stream_id_key = KEYS[4]

                    local worker = ARGV[1]
                    local started_at = ARGV[2]
                    local generation = tonumber(ARGV[3])

                    -- Check supersession: generation > 0 means tracking is active
                    if generation > 0 then
                        local current = redis.call('HGET', runs_key, 'generation')
                        if not current then
                            -- Runs hash was cleaned up (execution_ttl=0 after
                            -- a newer generation completed).  This message is stale.
                            return 'SUPERSEDED'
                        end
                        if tonumber(current) > generation then
                            return 'SUPERSEDED'
                        end
                    end

                    -- Update execution state to running
                    redis.call('HSET', runs_key,
                        'state', 'running',
                        'worker', worker,
                        'started_at', started_at
                    )

                    -- Initialize progress tracking
                    redis.call('HSET', progress_key,
                        'current', '0',
                        'total', '100'
                    )

                    -- Delete known/stream_id fields to allow task rescheduling
                    redis.call('HDEL', runs_key, 'known', 'stream_id')

                    -- TODO: Remove in next breaking release (v0.14.0) - legacy key cleanup
                    redis.call('DEL', known_key, stream_id_key)

                    return 'OK'
                    """
                )

                result = await claim_script(
                    keys=[
                        self._redis_key,  # runs_key
                        self.progress._redis_key,  # progress_key
                        self.docket.known_task_key(self.key),  # legacy known_key
                        self.docket.stream_id_key(self.key),  # legacy stream_id_key
                    ],
                    args=[worker, started_at_iso, str(self._generation)],
                )

        # A newer generation owns this key; drop the stale message unexecuted.
        if result == b"SUPERSEDED":
            return False

        # Update local state to mirror what the script wrote to Redis
        self.state = ExecutionState.RUNNING
        self.worker = worker
        self.started_at = started_at
        self.progress.current = 0
        self.progress.total = 100

        # Publish state change event
        await self._publish_state(
            {
                "state": ExecutionState.RUNNING.value,
                "worker": worker,
                "started_at": started_at_iso,
            }
        )

        return True

    async def _mark_as_terminal(
        self,
        state: ExecutionState,
        *,
        error: str | None = None,
        result_key: str | None = None,
    ) -> None:
        """Mark task as having reached a terminal state.

        Args:
            state: The terminal state (COMPLETED, FAILED, or CANCELLED)
            error: Optional error message (for FAILED state)
            result_key: Optional key where the result/exception is stored

        Uses a Lua script to atomically check supersession and write the
        terminal state in a single round-trip.  If the runs hash has been
        claimed by a successor (e.g. a Perpetual on_complete already called
        docket.replace()), the hash is left untouched.

        Progress data and the pub/sub completion event are always handled
        regardless of supersession.
        """
        completed_at = datetime.now(timezone.utc).isoformat()

        # Build the optional HSET fields as alternating name/value pairs
        extra_fields: list[str] = []
        if error:
            extra_fields.extend(["error", error])
        if result_key is not None:
            extra_fields.extend(["result_key", result_key])

        # TTL of 0 means "no retention": the runs hash is deleted immediately
        ttl_seconds = (
            int(self.docket.execution_ttl.total_seconds())
            if self.docket.execution_ttl
            else 0
        )

        with self._maybe_suppress_instrumentation():
            async with self.docket.redis() as redis:
                terminal_script = redis.register_script(
                    # KEYS[1]: runs_key
                    # ARGV[1]: generation, ARGV[2]: state, ARGV[3]: completed_at
                    # ARGV[4]: ttl_seconds, ARGV[5..]: extra field pairs
                    """
                    local runs_key = KEYS[1]
                    local generation = tonumber(ARGV[1])
                    local state = ARGV[2]
                    local completed_at = ARGV[3]
                    local ttl_seconds = tonumber(ARGV[4])

                    -- Check supersession (generation 0 = pre-tracking, always write)
                    if generation > 0 then
                        local current = redis.call('HGET', runs_key, 'generation')
                        if current and tonumber(current) > generation then
                            return 'SUPERSEDED'
                        end
                    end

                    -- Build HSET args: state + completed_at + any extras
                    local hset_args = {'state', state, 'completed_at', completed_at}
                    for i = 5, #ARGV, 2 do
                        hset_args[#hset_args + 1] = ARGV[i]
                        hset_args[#hset_args + 1] = ARGV[i + 1]
                    end
                    redis.call('HSET', runs_key, unpack(hset_args))

                    if ttl_seconds > 0 then
                        redis.call('EXPIRE', runs_key, ttl_seconds)
                    else
                        redis.call('DEL', runs_key)
                    end

                    return 'OK'
                    """
                )

                await terminal_script(
                    keys=[self._redis_key],
                    args=[
                        str(self._generation),
                        state.value,
                        completed_at,
                        str(ttl_seconds),
                        *extra_fields,
                    ],
                )

        # Local state always reflects the terminal outcome, even if the
        # Redis write was skipped due to supersession (per docstring).
        self.state = state
        if result_key is not None:
            self.result_key = result_key

        await self.progress.delete()

        state_data: dict[str, str] = {
            "state": state.value,
            "completed_at": completed_at,
        }
        if error:
            state_data["error"] = error
        await self._publish_state(state_data)

    async def mark_as_completed(self, result_key: str | None = None) -> None:
        """Mark task as completed successfully.

        Args:
            result_key: Optional key where the task result is stored
        """
        await self._mark_as_terminal(ExecutionState.COMPLETED, result_key=result_key)

    async def mark_as_failed(
        self, error: str | None = None, result_key: str | None = None
    ) -> None:
        """Mark task as failed.

        Args:
            error: Optional error message describing the failure
            result_key: Optional key where the serialized exception is stored
        """
        await self._mark_as_terminal(
            ExecutionState.FAILED, error=error, result_key=result_key
        )

    async def mark_as_cancelled(self) -> None:
        """Mark task as cancelled (terminal state; no error or result recorded)."""
        await self._mark_as_terminal(ExecutionState.CANCELLED)

    async def get_result(
        self,
        *,
        timeout: timedelta | None = None,
        deadline: datetime | None = None,
    ) -> Any:
        """Retrieve the result of this task execution.

        If the execution is not yet complete, this method will wait using
        pub/sub for state updates until completion.

        Args:
            timeout: Optional duration to wait before giving up.
                    If None and deadline is None, waits indefinitely.
            deadline: Optional absolute datetime when to stop waiting.
                     If None and timeout is None, waits indefinitely.

        Returns:
            The result of the task execution, or None if the task returned None.

        Raises:
            ValueError: If both timeout and deadline are provided
            ExecutionCancelled: If the task was cancelled
            Exception: If the task failed, raises the stored exception
            TimeoutError: If timeout/deadline is reached before execution completes
        """
        # Validate that only one time limit is provided
        if timeout is not None and deadline is not None:
            raise ValueError("Cannot specify both timeout and deadline")

        # Convert timeout to deadline if provided
        if timeout is not None:
            deadline = datetime.now(timezone.utc) + timeout

        terminal_states = (
            ExecutionState.COMPLETED,
            ExecutionState.FAILED,
            ExecutionState.CANCELLED,
        )

        # Wait for execution to complete if not already done
        if self.state not in terminal_states:
            # Calculate timeout duration if absolute deadline provided
            timeout_seconds = None
            if deadline is not None:
                timeout_seconds = (
                    deadline - datetime.now(timezone.utc)
                ).total_seconds()
                # Deadline already passed - fail fast without subscribing
                if timeout_seconds <= 0:
                    raise TimeoutError(
                        f"Timeout waiting for execution {self.key} to complete"
                    )

            try:

                async def wait_for_completion() -> None:
                    # Listen for pub/sub state events until a terminal one arrives
                    async for event in self.subscribe():  # pragma: no branch
                        if event["type"] == "state":
                            state = ExecutionState(event["state"])
                            if state in terminal_states:
                                # Sync to get latest data including result key
                                await self.sync()
                                break

                # Use asyncio.wait_for to enforce timeout (None = wait forever)
                await asyncio.wait_for(wait_for_completion(), timeout=timeout_seconds)
            except asyncio.TimeoutError:
                raise TimeoutError(
                    f"Timeout waiting for execution {self.key} to complete"
                )

        # If cancelled, raise ExecutionCancelled
        if self.state == ExecutionState.CANCELLED:
            raise ExecutionCancelled(f"Execution {self.key} was cancelled")

        # If failed, retrieve and raise the exception
        if self.state == ExecutionState.FAILED:
            if self.result_key:
                # Retrieve serialized exception from result_storage
                result_data = await self.docket.result_storage.get(self.result_key)
                if result_data and "data" in result_data:
                    # Base64-decode and unpickle the original exception object
                    pickled_exception = base64.b64decode(result_data["data"])
                    exception = cloudpickle.loads(pickled_exception)  # type: ignore[arg-type]
                    raise exception
            # If no stored exception, raise a generic error with the error message
            error_msg = self.error or "Task execution failed"
            raise Exception(error_msg)

        # If completed successfully, retrieve result if available
        if self.result_key:
            result_data = await self.docket.result_storage.get(self.result_key)
            if result_data is not None and "data" in result_data:
                # Base64-decode and unpickle
                pickled_result = base64.b64decode(result_data["data"])
                return cloudpickle.loads(pickled_result)  # type: ignore[arg-type]

        # No result stored - task returned None
        return None

    async def sync(self) -> None:
        """Refresh this execution's attributes from the runs hash in Redis.

        Pulls the latest state, worker, timestamps, error message, and result
        key, then refreshes the progress data. When the runs hash no longer
        exists, all attributes are reset to their defaults.
        """
        with self._maybe_suppress_instrumentation():
            async with self.docket.redis() as redis:
                fields = await redis.hgetall(self._redis_key)

        if not fields:
            # The runs hash is gone (e.g. expired or cleaned up) - reset
            # everything back to the pre-run defaults.
            self.state = ExecutionState.SCHEDULED
            self.worker = None
            self.started_at = None
            self.completed_at = None
            self.error = None
            self.result_key = None
        else:
            def decoded(field: bytes) -> str | None:
                # Hash values come back as bytes; absent fields map to None.
                value = fields.get(field)
                return value.decode() if value is not None else None

            def decoded_timestamp(field: bytes) -> datetime | None:
                value = decoded(field)
                return datetime.fromisoformat(value) if value is not None else None

            # State may arrive as bytes or str depending on the client config.
            raw_state = fields.get(b"state")
            if raw_state:
                if isinstance(raw_state, bytes):
                    raw_state = raw_state.decode()
                self.state = ExecutionState(raw_state)

            self.worker = decoded(b"worker")
            self.started_at = decoded_timestamp(b"started_at")
            self.completed_at = decoded_timestamp(b"completed_at")
            self.error = decoded(b"error")
            self.result_key = decoded(b"result_key")

        # Progress lives in its own hash; refresh it as well.
        await self.progress.sync()

    async def is_superseded(self) -> bool:
        """Report whether a newer schedule() call has replaced this execution.

        The execution counts as superseded when the generation recorded in the
        runs hash is strictly greater than the generation this message carries.

        Generation 0 marks a message that predates generation tracking (for
        example, one moved from queue to stream by an older worker's scheduler
        that does not forward the generation field); such messages are never
        treated as superseded because there is nothing to compare against.
        """
        generation = self._generation
        if generation == 0:
            return False

        with self._maybe_suppress_instrumentation():
            async with self.docket.redis() as redis:
                stored = await redis.hget(self._redis_key, "generation")

        latest = int(stored) if stored is not None else 0
        return latest > generation

    async def _publish_state(self, data: dict) -> None:
        """Broadcast a state-change event on this task's pub/sub channel.

        Args:
            data: Extra fields merged into the published event payload.
        """
        # Every event carries its type and the task key; callers supply the rest.
        event = {"type": "state", "key": self.key}
        event.update(data)
        channel = self.docket.key(f"state:{self.key}")
        await self.docket._publish(channel, json.dumps(event))

    async def subscribe(self) -> AsyncGenerator[StateEvent | ProgressEvent, None]:
        """Stream state and progress events for this task.

        A snapshot of the current state and current progress is emitted first
        (after a sync), followed by live updates received over Redis pub/sub.

        Yields:
            Dict events distinguished by their 'type' field:
            - type="state": state, worker, timestamps, error
            - type="progress": current, total, message, updated_at
        """
        # Refresh attributes so the snapshot events below reflect Redis.
        await self.sync()

        snapshot_state: StateEvent = {
            "type": "state",
            "key": self.key,
            "state": self.state,
            "when": self.when.isoformat(),
            "worker": self.worker,
            "started_at": self.started_at.isoformat() if self.started_at else None,
            "completed_at": (
                self.completed_at.isoformat() if self.completed_at else None
            ),
            "error": self.error,
        }
        yield snapshot_state

        snapshot_progress: ProgressEvent = {
            "type": "progress",
            "key": self.key,
            "current": self.progress.current,
            "total": self.progress.total,
            "message": self.progress.message,
            "updated_at": (
                self.progress.updated_at.isoformat()
                if self.progress.updated_at
                else None
            ),
        }
        yield snapshot_progress

        # Live updates: listen on both channels and decode each message.
        channels = (
            self.docket.key(f"state:{self.key}"),
            self.docket.key(f"progress:{self.key}"),
        )
        async with self.docket._pubsub() as pubsub:
            await pubsub.subscribe(*channels)
            async for raw in pubsub.listen():  # pragma: no cover
                if raw["type"] != "message":
                    continue
                event = json.loads(raw["data"])
                if event["type"] == "state":
                    # Rehydrate the enum so consumers get ExecutionState values.
                    event["state"] = ExecutionState(event["state"])
                yield event

args property

Positional arguments for the task.

docket property

Parent docket instance.

function property

Task function to execute.

function_name property

Name of the task function (from the message; may differ from `function.__name__` for fallback tasks).

generation property

Scheduling generation counter for supersession detection.

key property

Unique task identifier.

kwargs property

Keyword arguments for the task.

redelivered property

Whether this message was redelivered.

trace_context property

OpenTelemetry trace context.

claim(worker) async

Atomically check supersession and claim task in a single round-trip.

This consolidates worker operations when claiming a task into a single atomic Lua script that:

- Checks if the task has been superseded by a newer generation
- Sets state to RUNNING with worker name and timestamp
- Initializes progress tracking (current=0, total=100)
- Deletes known/stream_id fields to allow task rescheduling
- Cleans up legacy keys for backwards compatibility

Parameters:

Name Type Description Default
worker str

Name of the worker claiming the task

required

Returns:

Type Description
bool

True if the task was claimed, False if it was superseded.

Source code in src/docket/execution.py
async def claim(self, worker: str) -> bool:
    """Atomically check supersession and claim task in a single round-trip.

    This consolidates worker operations when claiming a task into a single
    atomic Lua script that:
    - Checks if the task has been superseded by a newer generation
    - Sets state to RUNNING with worker name and timestamp
    - Initializes progress tracking (current=0, total=100)
    - Deletes known/stream_id fields to allow task rescheduling
    - Cleans up legacy keys for backwards compatibility

    Args:
        worker: Name of the worker claiming the task

    Returns:
        True if the task was claimed, False if it was superseded.
    """
    # Capture the claim time once so Redis, the local attributes, and the
    # published event all carry the same timestamp.
    started_at = datetime.now(timezone.utc)
    started_at_iso = started_at.isoformat()

    with self._maybe_suppress_instrumentation():
        async with self.docket.redis() as redis:
            # Registered per call so the script is bound to this connection
            # (matching the pattern used elsewhere to avoid event loop issues).
            claim_script = redis.register_script(
                # KEYS: runs_key, progress_key, known_key, stream_id_key
                # ARGV: worker, started_at_iso, generation
                """
                local runs_key = KEYS[1]
                local progress_key = KEYS[2]
                -- TODO: Remove in next breaking release (v0.14.0) - legacy key locations
                local known_key = KEYS[3]
                local stream_id_key = KEYS[4]

                local worker = ARGV[1]
                local started_at = ARGV[2]
                local generation = tonumber(ARGV[3])

                -- Check supersession: generation > 0 means tracking is active
                if generation > 0 then
                    local current = redis.call('HGET', runs_key, 'generation')
                    if not current then
                        -- Runs hash was cleaned up (execution_ttl=0 after
                        -- a newer generation completed).  This message is stale.
                        return 'SUPERSEDED'
                    end
                    if tonumber(current) > generation then
                        return 'SUPERSEDED'
                    end
                end

                -- Update execution state to running
                redis.call('HSET', runs_key,
                    'state', 'running',
                    'worker', worker,
                    'started_at', started_at
                )

                -- Initialize progress tracking
                redis.call('HSET', progress_key,
                    'current', '0',
                    'total', '100'
                )

                -- Delete known/stream_id fields to allow task rescheduling
                redis.call('HDEL', runs_key, 'known', 'stream_id')

                -- TODO: Remove in next breaking release (v0.14.0) - legacy key cleanup
                redis.call('DEL', known_key, stream_id_key)

                return 'OK'
                """
            )

            result = await claim_script(
                keys=[
                    self._redis_key,  # runs_key
                    self.progress._redis_key,  # progress_key
                    self.docket.known_task_key(self.key),  # legacy known_key
                    self.docket.stream_id_key(self.key),  # legacy stream_id_key
                ],
                args=[worker, started_at_iso, str(self._generation)],
            )

    # A superseded claim means a newer schedule() owns this key; the caller
    # must not run the task.
    if result == b"SUPERSEDED":
        return False

    # Update local state to mirror what the script just wrote to Redis.
    self.state = ExecutionState.RUNNING
    self.worker = worker
    self.started_at = started_at
    self.progress.current = 0
    self.progress.total = 100

    # Publish state change event so subscribers see the transition promptly.
    await self._publish_state(
        {
            "state": ExecutionState.RUNNING.value,
            "worker": worker,
            "started_at": started_at_iso,
        }
    )

    return True

get_result(*, timeout=None, deadline=None) async

Retrieve the result of this task execution.

If the execution is not yet complete, this method will wait using pub/sub for state updates until completion.

Parameters:

Name Type Description Default
timeout timedelta | None

Optional duration to wait before giving up. If None and deadline is None, waits indefinitely.

None
deadline datetime | None

Optional absolute datetime when to stop waiting. If None and timeout is None, waits indefinitely.

None

Returns:

Type Description
Any

The result of the task execution, or None if the task returned None.

Raises:

Type Description
ValueError

If both timeout and deadline are provided

Exception

If the task failed, raises the stored exception

TimeoutError

If timeout/deadline is reached before execution completes

Source code in src/docket/execution.py
async def get_result(
    self,
    *,
    timeout: timedelta | None = None,
    deadline: datetime | None = None,
) -> Any:
    """Retrieve the result of this task execution.

    If the execution is not yet complete, this method will wait using
    pub/sub for state updates until completion.

    Args:
        timeout: Optional duration to wait before giving up.
                If None and deadline is None, waits indefinitely.
        deadline: Optional absolute datetime when to stop waiting.
                 If None and timeout is None, waits indefinitely.

    Returns:
        The result of the task execution, or None if the task returned None.

    Raises:
        ValueError: If both timeout and deadline are provided
        Exception: If the task failed, raises the stored exception
        TimeoutError: If timeout/deadline is reached before execution completes
    """
    # Validate that only one time limit is provided
    if timeout is not None and deadline is not None:
        raise ValueError("Cannot specify both timeout and deadline")

    # Convert timeout to deadline if provided; from here on only `deadline`
    # matters.
    if timeout is not None:
        deadline = datetime.now(timezone.utc) + timeout

    # States from which the execution can no longer change.
    terminal_states = (
        ExecutionState.COMPLETED,
        ExecutionState.FAILED,
        ExecutionState.CANCELLED,
    )

    # Wait for execution to complete if not already done
    if self.state not in terminal_states:
        # Calculate timeout duration if absolute deadline provided
        timeout_seconds = None
        if deadline is not None:
            timeout_seconds = (
                deadline - datetime.now(timezone.utc)
            ).total_seconds()
            # Deadline already passed: fail fast without subscribing.
            if timeout_seconds <= 0:
                raise TimeoutError(
                    f"Timeout waiting for execution {self.key} to complete"
                )

        try:

            async def wait_for_completion() -> None:
                # subscribe() emits the current state first, so a task that is
                # already terminal completes this loop immediately.
                async for event in self.subscribe():  # pragma: no branch
                    if event["type"] == "state":
                        state = ExecutionState(event["state"])
                        if state in terminal_states:
                            # Sync to get latest data including result key
                            await self.sync()
                            break

            # Use asyncio.wait_for to enforce timeout
            await asyncio.wait_for(wait_for_completion(), timeout=timeout_seconds)
        except asyncio.TimeoutError:
            raise TimeoutError(
                f"Timeout waiting for execution {self.key} to complete"
            )

    # If cancelled, raise ExecutionCancelled
    if self.state == ExecutionState.CANCELLED:
        raise ExecutionCancelled(f"Execution {self.key} was cancelled")

    # If failed, retrieve and raise the exception
    if self.state == ExecutionState.FAILED:
        if self.result_key:
            # Retrieve serialized exception from result_storage
            result_data = await self.docket.result_storage.get(self.result_key)
            if result_data and "data" in result_data:
                # Base64-decode and unpickle.
                # NOTE(review): unpickling executes arbitrary code; payloads are
                # presumably written only by this docket's own workers — confirm
                # result storage is never pointed at untrusted data.
                pickled_exception = base64.b64decode(result_data["data"])
                exception = cloudpickle.loads(pickled_exception)  # type: ignore[arg-type]
                raise exception
        # If no stored exception, raise a generic error with the error message
        error_msg = self.error or "Task execution failed"
        raise Exception(error_msg)

    # If completed successfully, retrieve result if available
    if self.result_key:
        result_data = await self.docket.result_storage.get(self.result_key)
        if result_data is not None and "data" in result_data:
            # Base64-decode and unpickle
            pickled_result = base64.b64decode(result_data["data"])
            return cloudpickle.loads(pickled_result)  # type: ignore[arg-type]

    # No result stored - task returned None
    return None

is_superseded() async

Check whether a newer schedule has superseded this execution.

Compares this execution's generation against the current generation stored in the runs hash. If the stored generation is strictly greater, this execution has been superseded by a newer schedule() call.

Generation 0 means the message predates generation tracking (e.g. it was moved from queue to stream by an older worker's scheduler that doesn't pass through the generation field). These are never considered superseded since we can't tell.

Source code in src/docket/execution.py
async def is_superseded(self) -> bool:
    """Report whether a newer schedule() call has replaced this execution.

    The execution counts as superseded when the generation recorded in the
    runs hash is strictly greater than the generation this message carries.

    Generation 0 marks a message that predates generation tracking (for
    example, one moved from queue to stream by an older worker's scheduler
    that does not forward the generation field); such messages are never
    treated as superseded because there is nothing to compare against.
    """
    generation = self._generation
    if generation == 0:
        return False

    with self._maybe_suppress_instrumentation():
        async with self.docket.redis() as redis:
            stored = await redis.hget(self._redis_key, "generation")

    latest = int(stored) if stored is not None else 0
    return latest > generation

mark_as_cancelled() async

Mark task as cancelled.

Source code in src/docket/execution.py
async def mark_as_cancelled(self) -> None:
    """Transition this execution into the CANCELLED terminal state."""
    await self._mark_as_terminal(ExecutionState.CANCELLED)

mark_as_completed(result_key=None) async

Mark task as completed successfully.

Parameters:

Name Type Description Default
result_key str | None

Optional key where the task result is stored

None
Source code in src/docket/execution.py
async def mark_as_completed(self, result_key: str | None = None) -> None:
    """Transition this execution into the COMPLETED terminal state.

    Args:
        result_key: Optional key under which the task's result is stored.
    """
    await self._mark_as_terminal(ExecutionState.COMPLETED, result_key=result_key)

mark_as_failed(error=None, result_key=None) async

Mark task as failed.

Parameters:

Name Type Description Default
error str | None

Optional error message describing the failure

None
result_key str | None

Optional key where the exception is stored

None
Source code in src/docket/execution.py
async def mark_as_failed(
    self, error: str | None = None, result_key: str | None = None
) -> None:
    """Transition this execution into the FAILED terminal state.

    Args:
        error: Optional human-readable description of the failure.
        result_key: Optional key under which the serialized exception is stored.
    """
    await self._mark_as_terminal(
        ExecutionState.FAILED, error=error, result_key=result_key
    )

schedule(replace=False, reschedule_message=None) async

Schedule this task atomically in Redis.

This performs an atomic operation that:

- Adds the task to the stream (immediate) or queue (future)
- Writes the execution state record
- Tracks metadata for later cancellation

Usage patterns:

- Normal add: `schedule(replace=False)`
- Replace existing: `schedule(replace=True)`
- Reschedule from stream: `schedule(reschedule_message=message_id)` — atomically acknowledges and deletes the stream message, then reschedules the task to the queue. This prevents both task loss and duplicate execution when rescheduling tasks (e.g., due to concurrency limits).

Parameters:

Name Type Description Default
replace bool

If True, replaces any existing task with the same key. If False, raises an error if the task already exists.

False
reschedule_message RedisMessageID | None

If provided, atomically acknowledges and deletes this stream message ID before rescheduling the task to the queue. Used when a task needs to be rescheduled from an active stream message.

None
Source code in src/docket/execution.py
async def schedule(
    self, replace: bool = False, reschedule_message: "RedisMessageID | None" = None
) -> None:
    """Schedule this task atomically in Redis.

    This performs an atomic operation that:
    - Adds the task to the stream (immediate) or queue (future)
    - Writes the execution state record
    - Tracks metadata for later cancellation

    Usage patterns:
    - Normal add: schedule(replace=False)
    - Replace existing: schedule(replace=True)
    - Reschedule from stream: schedule(reschedule_message=message_id)
      This atomically acknowledges and deletes the stream message, then
      reschedules the task to the queue. Prevents both task loss and
      duplicate execution when rescheduling tasks (e.g., due to concurrency limits).

    Args:
        replace: If True, replaces any existing task with the same key.
                If False, raises an error if the task already exists.
        reschedule_message: If provided, atomically acknowledges and deletes
                this stream message ID before rescheduling the task to the queue.
                Used when a task needs to be rescheduled from an active stream message.
    """
    message: dict[bytes, bytes] = self.as_message()
    # Carry the current OpenTelemetry trace context in the message headers.
    propagate.inject(message, setter=message_setter)

    key = self.key
    when = self.when
    known_task_key = self.docket.known_task_key(key)
    # Anything due now or in the past goes straight onto the stream;
    # future work is parked and queued by the script below.
    is_immediate = when <= datetime.now(timezone.utc)

    async with self.docket.redis() as redis:
        # Lock per task key to prevent race conditions between concurrent operations
        async with redis.lock(f"{known_task_key}:lock", timeout=10):
            # Register script for this connection (not cached to avoid event loop issues)
            schedule_script = cast(
                _schedule_task,
                redis.register_script(
                    # KEYS: stream_key, known_key, parked_key, queue_key, stream_id_key, runs_key
                    # ARGV: task_key, when_timestamp, is_immediate, replace, reschedule_message_id, worker_group_name, ...message_fields
                    """
                        local stream_key = KEYS[1]
                        -- TODO: Remove in next breaking release (v0.14.0) - legacy key locations
                        local known_key = KEYS[2]
                        local parked_key = KEYS[3]
                        local queue_key = KEYS[4]
                        local stream_id_key = KEYS[5]
                        local runs_key = KEYS[6]

                        local task_key = ARGV[1]
                        local when_timestamp = ARGV[2]
                        local is_immediate = ARGV[3] == '1'
                        local replace = ARGV[4] == '1'
                        local reschedule_message_id = ARGV[5]
                        local worker_group_name = ARGV[6]

                        -- Extract message fields from ARGV[7] onwards
                        local message = {}
                        local function_name = nil
                        local args_data = nil
                        local kwargs_data = nil
                        local generation_index = nil

                        for i = 7, #ARGV, 2 do
                            local field_name = ARGV[i]
                            local field_value = ARGV[i + 1]
                            message[#message + 1] = field_name
                            message[#message + 1] = field_value

                            -- Extract task data fields for runs hash
                            if field_name == 'function' then
                                function_name = field_value
                            elseif field_name == 'args' then
                                args_data = field_value
                            elseif field_name == 'kwargs' then
                                kwargs_data = field_value
                            elseif field_name == 'generation' then
                                generation_index = #message
                            end
                        end

                        -- Handle rescheduling from stream: atomically ACK message and reschedule to queue
                        -- This prevents both task loss (ACK before reschedule) and duplicate execution
                        -- (reschedule before ACK with slow reschedule causing redelivery)
                        if reschedule_message_id ~= '' then
                            -- Acknowledge and delete the message from the stream
                            redis.call('XACK', stream_key, worker_group_name, reschedule_message_id)
                            redis.call('XDEL', stream_key, reschedule_message_id)

                            -- Increment generation counter
                            local new_gen = redis.call('HINCRBY', runs_key, 'generation', 1)
                            if generation_index then
                                message[generation_index] = tostring(new_gen)
                            end

                            -- Park task data for future execution
                            redis.call('HSET', parked_key, unpack(message))

                            -- Add to sorted set queue
                            redis.call('ZADD', queue_key, when_timestamp, task_key)

                            -- Update state in runs hash (clear stream_id since task is no longer in stream)
                            redis.call('HSET', runs_key,
                                'state', 'scheduled',
                                'when', when_timestamp,
                                'function', function_name,
                                'args', args_data,
                                'kwargs', kwargs_data
                            )
                            redis.call('HDEL', runs_key, 'stream_id')

                            return 'OK'
                        end

                        -- Handle replacement: cancel existing task if needed
                        if replace then
                            -- Get stream ID from runs hash (check new location first)
                            local existing_message_id = redis.call('HGET', runs_key, 'stream_id')

                            -- TODO: Remove in next breaking release (v0.14.0) - check legacy location
                            if not existing_message_id then
                                existing_message_id = redis.call('GET', stream_id_key)
                            end

                            if existing_message_id then
                                redis.call('XDEL', stream_key, existing_message_id)
                            end

                            redis.call('ZREM', queue_key, task_key)
                            redis.call('DEL', parked_key)

                            -- TODO: Remove in next breaking release (v0.14.0) - clean up legacy keys
                            redis.call('DEL', known_key, stream_id_key)

                            -- Note: runs_key is updated below, not deleted
                        else
                            -- Check if task already exists (check new location first, then legacy)
                            local known_exists = redis.call('HEXISTS', runs_key, 'known') == 1
                            if not known_exists then
                                -- Check if task is currently running (known field deleted at claim time)
                                local state = redis.call('HGET', runs_key, 'state')
                                if state == 'running' then
                                    return 'EXISTS'
                                end
                                -- TODO: Remove in next breaking release (v0.14.0) - check legacy location
                                known_exists = redis.call('EXISTS', known_key) == 1
                            end
                            if known_exists then
                                return 'EXISTS'
                            end
                        end

                        -- Increment generation counter
                        local new_gen = redis.call('HINCRBY', runs_key, 'generation', 1)
                        if generation_index then
                            message[generation_index] = tostring(new_gen)
                        end

                        if is_immediate then
                            -- Add to stream for immediate execution
                            local message_id = redis.call('XADD', stream_key, '*', unpack(message))

                            -- Store state and metadata in runs hash
                            redis.call('HSET', runs_key,
                                'state', 'queued',
                                'when', when_timestamp,
                                'known', when_timestamp,
                                'stream_id', message_id,
                                'function', function_name,
                                'args', args_data,
                                'kwargs', kwargs_data
                            )
                        else
                            -- Park task data for future execution
                            redis.call('HSET', parked_key, unpack(message))

                            -- Add to sorted set queue
                            redis.call('ZADD', queue_key, when_timestamp, task_key)

                            -- Store state and metadata in runs hash
                            redis.call('HSET', runs_key,
                                'state', 'scheduled',
                                'when', when_timestamp,
                                'known', when_timestamp,
                                'function', function_name,
                                'args', args_data,
                                'kwargs', kwargs_data
                            )
                        end

                        return 'OK'
                        """
                ),
            )

            await schedule_script(
                keys=[
                    self.docket.stream_key,
                    known_task_key,
                    self.docket.parked_task_key(key),
                    self.docket.queue_key,
                    self.docket.stream_id_key(key),
                    self._redis_key,
                ],
                args=[
                    key,
                    str(when.timestamp()),
                    "1" if is_immediate else "0",
                    "1" if replace else "0",
                    reschedule_message or b"",  # empty string tells the script "not a reschedule"
                    self.docket.worker_group_name,
                    # Flatten the message dict into alternating field/value ARGV entries.
                    *[
                        item
                        for field, value in message.items()
                        for item in (field, value)
                    ],
                ],
            )

    # Update local state based on whether task is immediate, scheduled, or being rescheduled
    if reschedule_message:
        # When rescheduling from stream, task is always parked and queued (never immediate)
        self.state = ExecutionState.SCHEDULED
        await self._publish_state(
            {"state": ExecutionState.SCHEDULED.value, "when": when.isoformat()}
        )
    elif is_immediate:
        self.state = ExecutionState.QUEUED
        await self._publish_state(
            {"state": ExecutionState.QUEUED.value, "when": when.isoformat()}
        )
    else:
        self.state = ExecutionState.SCHEDULED
        await self._publish_state(
            {"state": ExecutionState.SCHEDULED.value, "when": when.isoformat()}
        )

subscribe() async

Subscribe to both state and progress updates for this task.

Emits the current state as the first event, then subscribes to real-time state and progress updates via Redis pub/sub.

Yields:

Type Description
AsyncGenerator[StateEvent | ProgressEvent, None]

Dicts containing state or progress update events with a 'type' field:

- For state events: type="state", state, worker, timestamps, error
- For progress events: type="progress", current, total, message, updated_at
Source code in src/docket/execution.py
async def subscribe(self) -> AsyncGenerator[StateEvent | ProgressEvent, None]:
    """Subscribe to both state and progress updates for this task.

    Emits the current state as the first event, then subscribes to real-time
    state and progress updates via Redis pub/sub.

    Yields:
        Dict containing state or progress update events with a 'type' field:
        - For state events: type="state", state, worker, timestamps, error
        - For progress events: type="progress", current, total, message, updated_at
    """
    # First, emit the current state.  sync() refreshes self.state, worker,
    # timestamps, and progress from Redis so the snapshot events below
    # reflect the latest persisted data.
    await self.sync()

    # Build initial state event from current attributes
    initial_state: StateEvent = {
        "type": "state",
        "key": self.key,
        "state": self.state,
        "when": self.when.isoformat(),
        "worker": self.worker,
        "started_at": self.started_at.isoformat() if self.started_at else None,
        "completed_at": (
            self.completed_at.isoformat() if self.completed_at else None
        ),
        "error": self.error,
    }

    yield initial_state

    # Follow with a snapshot of the current progress so subscribers start
    # with a complete picture before any live updates arrive.
    progress_event: ProgressEvent = {
        "type": "progress",
        "key": self.key,
        "current": self.progress.current,
        "total": self.progress.total,
        "message": self.progress.message,
        "updated_at": self.progress.updated_at.isoformat()
        if self.progress.updated_at
        else None,
    }

    yield progress_event

    # Then subscribe to real-time updates
    state_channel = self.docket.key(f"state:{self.key}")
    progress_channel = self.docket.key(f"progress:{self.key}")
    async with self.docket._pubsub() as pubsub:
        await pubsub.subscribe(state_channel, progress_channel)
        async for message in pubsub.listen():  # pragma: no cover
            # pub/sub also delivers subscribe confirmations; only forward
            # actual published messages.
            if message["type"] == "message":
                message_data = json.loads(message["data"])
                if message_data["type"] == "state":
                    # Rehydrate the serialized state string into the enum.
                    message_data["state"] = ExecutionState(message_data["state"])
                yield message_data

sync() async

Synchronize instance attributes with current execution data from Redis.

Updates self.state, execution metadata, and progress data from Redis. Sets attributes to None if no data exists.

Source code in src/docket/execution.py
async def sync(self) -> None:
    """Synchronize instance attributes with current execution data from Redis.

    Updates self.state, execution metadata, and progress data from Redis.
    Sets attributes to None if no data exists.
    """
    # NOTE(review): instrumentation is presumably suppressed here so that
    # frequent polling via sync() doesn't flood tracing -- confirm against
    # _maybe_suppress_instrumentation's definition.
    with self._maybe_suppress_instrumentation():
        async with self.docket.redis() as redis:
            data = await redis.hgetall(self._redis_key)
            if data:
                # Update state
                state_value = data.get(b"state")
                if state_value:
                    if isinstance(state_value, bytes):
                        state_value = state_value.decode()
                    self.state = ExecutionState(state_value)

                # Update metadata: hash fields come back as bytes, so each
                # present field is decoded; missing fields reset to None.
                self.worker = (
                    data[b"worker"].decode() if b"worker" in data else None
                )
                self.started_at = (
                    datetime.fromisoformat(data[b"started_at"].decode())
                    if b"started_at" in data
                    else None
                )
                self.completed_at = (
                    datetime.fromisoformat(data[b"completed_at"].decode())
                    if b"completed_at" in data
                    else None
                )
                self.error = data[b"error"].decode() if b"error" in data else None
                self.result_key = (
                    data[b"result_key"].decode() if b"result_key" in data else None
                )
            else:
                # No data exists - reset to defaults
                self.state = ExecutionState.SCHEDULED
                self.worker = None
                self.started_at = None
                self.completed_at = None
                self.error = None
                self.result_key = None

    # Sync progress data
    await self.progress.sync()

ExecutionCancelled

Bases: Exception

Raised when get_result() is called on a cancelled execution.

Source code in src/docket/execution.py
class ExecutionCancelled(Exception):
    """Raised when get_result() is called on a cancelled execution."""

ExecutionState

Bases: Enum

Lifecycle states for task execution.

Source code in src/docket/execution.py
class ExecutionState(enum.Enum):
    """Lifecycle states for task execution.

    The lowercase string values are what gets persisted to Redis and sent
    over pub/sub; ``ExecutionState(value)`` rehydrates them.
    """

    SCHEDULED = "scheduled"
    """Task is scheduled and waiting in the queue for its execution time."""

    QUEUED = "queued"
    """Task has been moved to the stream and is ready to be claimed by a worker."""

    RUNNING = "running"
    """Task is currently being executed by a worker."""

    COMPLETED = "completed"
    """Task execution finished successfully."""

    FAILED = "failed"
    """Task execution failed."""

    CANCELLED = "cancelled"
    """Task was explicitly cancelled before completion."""

CANCELLED = 'cancelled' class-attribute instance-attribute

Task was explicitly cancelled before completion.

COMPLETED = 'completed' class-attribute instance-attribute

Task execution finished successfully.

FAILED = 'failed' class-attribute instance-attribute

Task execution failed.

QUEUED = 'queued' class-attribute instance-attribute

Task has been moved to the stream and is ready to be claimed by a worker.

RUNNING = 'running' class-attribute instance-attribute

Task is currently being executed by a worker.

SCHEDULED = 'scheduled' class-attribute instance-attribute

Task is scheduled and waiting in the queue for its execution time.

ExponentialRetry

Bases: Retry

Configures exponential retries for a task. You can specify the total number of attempts (or None to retry indefinitely), and the minimum and maximum delays between attempts.

Example:

@task
async def my_task(retry: ExponentialRetry = ExponentialRetry(attempts=3)) -> None:
    ...
Source code in src/docket/dependencies/_retry.py
class ExponentialRetry(Retry):
    """Configures exponential retries for a task.  You can specify the total number
    of attempts (or `None` to retry indefinitely), and the minimum and maximum delays
    between attempts.

    Example:

    ```python
    @task
    async def my_task(retry: ExponentialRetry = ExponentialRetry(attempts=3)) -> None:
        ...
    ```
    """

    def __init__(
        self,
        attempts: int | None = 1,
        minimum_delay: timedelta = timedelta(seconds=1),
        maximum_delay: timedelta = timedelta(seconds=64),
    ) -> None:
        """
        Args:
            attempts: The total number of attempts to make.  If `None`, the task will
                be retried indefinitely.
            minimum_delay: The minimum delay between attempts.
            maximum_delay: The maximum delay between attempts.
        """
        # The linear Retry base stores `minimum_delay` as its `delay`; only
        # the cap is kept here.
        super().__init__(attempts=attempts, delay=minimum_delay)
        self.maximum_delay = maximum_delay

    async def __aenter__(self) -> ExponentialRetry:
        """Bind a fresh ExponentialRetry carrying the current attempt's backoff."""
        execution = self.execution.get()

        retry = ExponentialRetry(
            attempts=self.attempts,
            minimum_delay=self.delay,
            maximum_delay=self.maximum_delay,
        )
        retry.attempt = execution.attempt

        if execution.attempt > 1:
            # Double the delay each attempt, capped at maximum_delay.  Compare
            # in float seconds first: `timedelta * int` raises OverflowError
            # for very large factors, while `2.0 ** n` safely saturates to
            # infinity instead of crashing unbounded retry chains.
            exponent = execution.attempt - 1
            projected_seconds = self.delay.total_seconds() * (2.0**exponent)
            if projected_seconds >= self.maximum_delay.total_seconds():
                retry.delay = self.maximum_delay
            else:
                retry.delay = self.delay * (2**exponent)

        return retry

__init__(attempts=1, minimum_delay=timedelta(seconds=1), maximum_delay=timedelta(seconds=64))

Parameters:

Name Type Description Default
attempts int | None

The total number of attempts to make. If None, the task will be retried indefinitely.

1
minimum_delay timedelta

The minimum delay between attempts.

timedelta(seconds=1)
maximum_delay timedelta

The maximum delay between attempts.

timedelta(seconds=64)
Source code in src/docket/dependencies/_retry.py
def __init__(
    self,
    attempts: int | None = 1,
    minimum_delay: timedelta = timedelta(seconds=1),
    maximum_delay: timedelta = timedelta(seconds=64),
) -> None:
    """
    Args:
        attempts: The total number of attempts to make.  If `None`, the task will
            be retried indefinitely.
        minimum_delay: The minimum delay between attempts.
        maximum_delay: The maximum delay between attempts.
    """
    # The linear Retry base stores `minimum_delay` as its `delay`; only the
    # maximum cap is kept on this subclass.
    super().__init__(attempts=attempts, delay=minimum_delay)
    self.maximum_delay = maximum_delay

Logged

Bases: Annotation

Instructs docket to include arguments to this parameter in the log.

If length_only is True, only the length of the argument will be included in the log.

Example:

@task
def setup_new_customer(
    customer_id: Annotated[int, Logged],
    addresses: Annotated[list[Address], Logged(length_only=True)],
    password: str,
) -> None:
    ...

In the logs, you'd see the task referenced as:

setup_new_customer(customer_id=123, addresses[len 2], password=...)
Source code in src/docket/annotations.py
class Logged(Annotation):
    """Instructs docket to include arguments to this parameter in the log.

    If `length_only` is `True`, only the length of the argument will be
    included in the log rather than its full repr.

    Example:

    ```python
    @task
    def setup_new_customer(
        customer_id: Annotated[int, Logged],
        addresses: Annotated[list[Address], Logged(length_only=True)],
        password: str,
    ) -> None:
        ...
    ```

    In the logs, you'd see the task referenced as:

    ```
    setup_new_customer(customer_id=123, addresses[len 2], password=...)
    ```
    """

    length_only: bool = False

    def __init__(self, length_only: bool = False) -> None:
        self.length_only = length_only

    def format(self, argument: Any) -> str:
        """Render ``argument`` for logging, honoring ``length_only``."""
        if self.length_only and hasattr(argument, "__len__"):
            # Pick brackets that hint at the argument's container type.
            if isinstance(argument, (dict, set)):
                opener, closer = "{", "}"
            elif isinstance(argument, tuple):
                opener, closer = "(", ")"
            else:
                opener, closer = "[", "]"
            return f"{opener}len {len(argument)}{closer}"

        return repr(argument)

Perpetual

Bases: CompletionHandler

Declare a task that should be run perpetually. Perpetual tasks are automatically rescheduled for the future after they finish (whether they succeed or fail). A perpetual task can be scheduled at worker startup with the automatic=True.

Example:

@task
async def my_task(perpetual: Perpetual = Perpetual()) -> None:
    ...
Source code in src/docket/dependencies/_perpetual.py
class Perpetual(CompletionHandler):
    """Declare a task that should be run perpetually.  Perpetual tasks are automatically
    rescheduled for the future after they finish (whether they succeed or fail).  A
    perpetual task can be scheduled at worker startup by passing `automatic=True`.

    Example:

    ```python
    @task
    async def my_task(perpetual: Perpetual = Perpetual()) -> None:
        ...
    ```
    """

    # NOTE(review): presumably marks that only one Perpetual dependency may be
    # attached per task -- confirm against the CompletionHandler base.
    single = True

    # Configuration supplied at declaration time:
    every: timedelta
    automatic: bool

    # Arguments used when rescheduling; captured from the execution in
    # __aenter__ and overridable via perpetuate().
    args: tuple[Any, ...]
    kwargs: dict[str, Any]

    # Per-run state: cancel() flips `cancelled`; after()/at() set `_next_when`.
    cancelled: bool
    _next_when: datetime | None

    def __init__(
        self,
        every: timedelta = timedelta(0),
        automatic: bool = False,
    ) -> None:
        """
        Args:
            every: The target interval between task executions.
            automatic: If set, this task will be automatically scheduled during worker
                startup and continually through the worker's lifespan.  This ensures
                that the task will always be scheduled despite crashes and other
                adverse conditions.  Automatic tasks must not require any arguments.
        """
        self.every = every
        self.automatic = automatic
        self.cancelled = False
        self._next_when = None

    async def __aenter__(self) -> Perpetual:
        """Bind a per-execution copy capturing the call's args/kwargs."""
        execution = self.execution.get()
        perpetual = Perpetual(every=self.every, automatic=self.automatic)
        perpetual.args = execution.args
        perpetual.kwargs = execution.kwargs
        return perpetual

    @property
    def initial_when(self) -> datetime | None:
        """Return None to schedule for immediate execution at worker startup."""
        return None

    def cancel(self) -> None:
        """Stop the perpetual chain: on_complete will cancel instead of reschedule."""
        self.cancelled = True

    def perpetuate(self, *args: Any, **kwargs: Any) -> None:
        """Override the arguments used for the next scheduled execution."""
        self.args = args
        self.kwargs = kwargs

    def after(self, delay: timedelta) -> None:
        """Schedule the next execution after the given delay."""
        self._next_when = datetime.now(timezone.utc) + delay

    def at(self, when: datetime) -> None:
        """Schedule the next execution at the given time."""
        self._next_when = when

    async def on_complete(self, execution: Execution, outcome: TaskOutcome) -> bool:
        """Handle completion by scheduling the next execution.

        Returns:
            False if the perpetual chain was cancelled, True otherwise.
        """
        if self.cancelled:
            # cancel() was called during the run: remove the docket entry
            # instead of rescheduling.
            docket = self.docket.get()
            async with docket.redis() as redis:
                await docket._cancel(redis, execution.key)
            return False

        if await execution.is_superseded():
            # A newer execution replaced this one; count it and skip
            # rescheduling.
            worker = self.worker.get()
            TASKS_SUPERSEDED.add(
                1,
                {
                    **worker.labels(),
                    **execution.general_labels(),
                    "docket.where": "on_complete",
                },
            )
            logger.info(
                "↬ [%s] %s (superseded)",
                format_duration(outcome.duration.total_seconds()),
                execution.call_repr(),
            )
            return True

        docket = self.docket.get()
        worker = self.worker.get()

        if self._next_when:
            # An explicit next time was requested via after()/at().
            when = self._next_when
        else:
            # Target a steady cadence: subtract the run's duration from the
            # interval, never scheduling into the past.
            now = datetime.now(timezone.utc)
            when = max(now, now + self.every - outcome.duration)

        await docket.replace(execution.function, when, execution.key)(
            *self.args,
            **self.kwargs,
        )

        TASKS_PERPETUATED.add(1, {**worker.labels(), **execution.general_labels()})
        logger.info(
            "↫ [%s] %s",
            format_duration(outcome.duration.total_seconds()),
            execution.call_repr(),
        )

        return True

initial_when property

Return None to schedule for immediate execution at worker startup.

__init__(every=timedelta(0), automatic=False)

Parameters:

Name Type Description Default
every timedelta

The target interval between task executions.

timedelta(0)
automatic bool

If set, this task will be automatically scheduled during worker startup and continually through the worker's lifespan. This ensures that the task will always be scheduled despite crashes and other adverse conditions. Automatic tasks must not require any arguments.

False
Source code in src/docket/dependencies/_perpetual.py
def __init__(
    self,
    every: timedelta = timedelta(0),
    automatic: bool = False,
) -> None:
    """
    Args:
        every: The target interval between task executions.
        automatic: If set, this task will be automatically scheduled during worker
            startup and continually through the worker's lifespan.  This ensures
            that the task will always be scheduled despite crashes and other
            adverse conditions.  Automatic tasks must not require any arguments.
    """
    self.every = every
    self.automatic = automatic
    # Runtime rescheduling state: cancel() flips `cancelled`; after()/at()
    # set `_next_when`.
    self.cancelled = False
    self._next_when = None

after(delay)

Schedule the next execution after the given delay.

Source code in src/docket/dependencies/_perpetual.py
def after(self, delay: timedelta) -> None:
    """Schedule the next execution after the given delay."""
    self._next_when = datetime.now(timezone.utc) + delay

at(when)

Schedule the next execution at the given time.

Source code in src/docket/dependencies/_perpetual.py
def at(self, when: datetime) -> None:
    """Schedule the next execution at the given time."""
    self._next_when = when

on_complete(execution, outcome) async

Handle completion by scheduling the next execution.

Source code in src/docket/dependencies/_perpetual.py
async def on_complete(self, execution: Execution, outcome: TaskOutcome) -> bool:
    """Handle completion by scheduling the next execution.

    Returns:
        False if the perpetual chain was cancelled, True otherwise.
    """
    if self.cancelled:
        # cancel() was called during the run: remove the docket entry
        # instead of rescheduling.
        docket = self.docket.get()
        async with docket.redis() as redis:
            await docket._cancel(redis, execution.key)
        return False

    if await execution.is_superseded():
        # A newer execution replaced this one; count it and skip
        # rescheduling.
        worker = self.worker.get()
        TASKS_SUPERSEDED.add(
            1,
            {
                **worker.labels(),
                **execution.general_labels(),
                "docket.where": "on_complete",
            },
        )
        logger.info(
            "↬ [%s] %s (superseded)",
            format_duration(outcome.duration.total_seconds()),
            execution.call_repr(),
        )
        return True

    docket = self.docket.get()
    worker = self.worker.get()

    if self._next_when:
        # An explicit next time was requested via after()/at().
        when = self._next_when
    else:
        # Target a steady cadence: subtract the run's duration from the
        # interval, never scheduling into the past.
        now = datetime.now(timezone.utc)
        when = max(now, now + self.every - outcome.duration)

    await docket.replace(execution.function, when, execution.key)(
        *self.args,
        **self.kwargs,
    )

    TASKS_PERPETUATED.add(1, {**worker.labels(), **execution.general_labels()})
    logger.info(
        "↫ [%s] %s",
        format_duration(outcome.duration.total_seconds()),
        execution.call_repr(),
    )

    return True

Progress

Bases: Dependency

A dependency to report progress updates for the currently executing task.

Tasks can use this to report their current progress (current/total values) and status messages to external observers.

Example:

@task
async def process_records(records: list, progress: Progress = Progress()) -> None:
    await progress.set_total(len(records))
    for i, record in enumerate(records):
        await process(record)
        await progress.increment()
        await progress.set_message(f"Processed {record.id}")
Source code in src/docket/dependencies/_progress.py
class Progress(Dependency):
    """A dependency to report progress updates for the currently executing task.

    Tasks can use this to report their current progress (current/total values) and
    status messages to external observers.

    Example:

    ```python
    @task
    async def process_records(records: list, progress: Progress = Progress()) -> None:
        await progress.set_total(len(records))
        for i, record in enumerate(records):
            await process(record)
            await progress.increment()
            await progress.set_message(f"Processed {record.id}")
    ```
    """

    def __init__(self) -> None:
        self._progress: ExecutionProgress | None = None

    async def __aenter__(self) -> "Progress":
        self._progress = self.execution.get().progress
        return self

    def _tracker(self) -> "ExecutionProgress":
        """Return the bound progress tracker, asserting dependency usage."""
        assert self._progress is not None, "Progress must be used as a dependency"
        return self._progress

    @property
    def current(self) -> "int | None":
        """Current progress value."""
        return self._tracker().current

    @property
    def total(self) -> int:
        """Total/target value for progress tracking."""
        return self._tracker().total

    @property
    def message(self) -> "str | None":
        """User-provided status message."""
        return self._tracker().message

    async def set_total(self, total: int) -> None:
        """Set the total/target value for progress tracking."""
        await self._tracker().set_total(total)

    async def increment(self, amount: int = 1) -> None:
        """Atomically increment the current progress value."""
        await self._tracker().increment(amount)

    async def set_message(self, message: "str | None") -> None:
        """Update the progress status message."""
        await self._tracker().set_message(message)

current property

Current progress value.

message property

User-provided status message.

total property

Total/target value for progress tracking.

increment(amount=1) async

Atomically increment the current progress value.

Source code in src/docket/dependencies/_progress.py
async def increment(self, amount: int = 1) -> None:
    """Atomically increment the current progress value."""
    assert self._progress is not None, "Progress must be used as a dependency"
    # Delegate to the ExecutionProgress tracker bound in __aenter__.
    await self._progress.increment(amount)

set_message(message) async

Update the progress status message.

Source code in src/docket/dependencies/_progress.py
async def set_message(self, message: str | None) -> None:
    """Update the progress status message."""
    assert self._progress is not None, "Progress must be used as a dependency"
    # Delegate to the ExecutionProgress tracker bound in __aenter__.
    await self._progress.set_message(message)

set_total(total) async

Set the total/target value for progress tracking.

Source code in src/docket/dependencies/_progress.py
async def set_total(self, total: int) -> None:
    """Set the total/target value for progress tracking."""
    assert self._progress is not None, "Progress must be used as a dependency"
    # Delegate to the ExecutionProgress tracker bound in __aenter__.
    await self._progress.set_total(total)

Retry

Bases: FailureHandler

Configures linear retries for a task. You can specify the total number of attempts (or None to retry indefinitely), and the delay between attempts.

Example:

@task
async def my_task(retry: Retry = Retry(attempts=3)) -> None:
    ...
Source code in src/docket/dependencies/_retry.py
class Retry(FailureHandler):
    """Configures linear retries for a task.  You can specify the total number of
    attempts (or `None` to retry indefinitely), and the delay between attempts.

    Example:

    ```python
    @task
    async def my_task(retry: Retry = Retry(attempts=3)) -> None:
        ...
    ```
    """

    single: bool = True

    attempts: int | None
    delay: timedelta
    attempt: int

    def __init__(
        self, attempts: int | None = 1, delay: timedelta = timedelta(0)
    ) -> None:
        """
        Args:
            attempts: The total number of attempts to make.  If `None`, the task will
                be retried indefinitely.
            delay: The delay between attempts.
        """
        self.attempts = attempts
        self.delay = delay
        # 1-based; overwritten with the execution's attempt in __aenter__.
        self.attempt = 1

    async def __aenter__(self) -> Retry:
        """Bind a fresh Retry carrying the current execution's attempt count."""
        execution = self.execution.get()
        retry = Retry(attempts=self.attempts, delay=self.delay)
        retry.attempt = execution.attempt
        return retry

    def after(self, delay: timedelta) -> NoReturn:
        """Request a retry after the given delay."""
        self.delay = delay
        raise ForcedRetry()

    def at(self, when: datetime) -> NoReturn:
        """Request a retry at the given time (clamped to now if in the past)."""
        now = datetime.now(timezone.utc)
        diff = when - now
        diff = diff if diff.total_seconds() >= 0 else timedelta(0)
        self.after(diff)

    def in_(self, delay: timedelta) -> NoReturn:
        """Deprecated: use after() instead."""
        # Surface the deprecation to callers instead of silently delegating.
        import warnings

        warnings.warn(
            "Retry.in_() is deprecated; use Retry.after() instead",
            DeprecationWarning,
            stacklevel=2,
        )
        self.after(delay)

    async def handle_failure(self, execution: Execution, outcome: TaskOutcome) -> bool:
        """Handle failure by scheduling a retry if attempts remain.

        Returns:
            True if a retry was scheduled, False if attempts are exhausted.
        """
        # attempts=None means retry forever.
        if self.attempts is not None and execution.attempt >= self.attempts:
            return False

        # Reschedule at now + delay with a bumped attempt counter.
        execution.when = datetime.now(timezone.utc) + self.delay
        execution.attempt += 1
        await execution.schedule(replace=True)

        worker = self.worker.get()
        TASKS_RETRIED.add(1, {**worker.labels(), **execution.general_labels()})
        logger.info(
            "↫ [%s] %s",
            format_duration(outcome.duration.total_seconds()),
            execution.call_repr(),
        )

        return True

__init__(attempts=1, delay=timedelta(0))

Parameters:

Name Type Description Default
attempts int | None

The total number of attempts to make. If None, the task will be retried indefinitely.

1
delay timedelta

The delay between attempts.

timedelta(0)
Source code in src/docket/dependencies/_retry.py
def __init__(
    self, attempts: int | None = 1, delay: timedelta = timedelta(0)
) -> None:
    """
    Args:
        attempts: The total number of attempts to make.  If `None`, the task will
            be retried indefinitely.
        delay: The delay between attempts.
    """
    self.attempts = attempts
    self.delay = delay
    # 1-based attempt counter; replaced with the execution's attempt count
    # when the dependency is entered.
    self.attempt = 1

after(delay)

Request a retry after the given delay.

Source code in src/docket/dependencies/_retry.py
def after(self, delay: timedelta) -> NoReturn:
    """Request a retry after the given delay."""
    self.delay = delay
    raise ForcedRetry()

at(when)

Request a retry at the given time.

Source code in src/docket/dependencies/_retry.py
def at(self, when: datetime) -> NoReturn:
    """Request a retry at the given time."""
    now = datetime.now(timezone.utc)
    diff = when - now
    diff = diff if diff.total_seconds() >= 0 else timedelta(0)
    self.after(diff)

handle_failure(execution, outcome) async

Handle failure by scheduling a retry if attempts remain.

Source code in src/docket/dependencies/_retry.py
async def handle_failure(self, execution: Execution, outcome: TaskOutcome) -> bool:
    """Handle failure by scheduling a retry if attempts remain.

    Returns:
        True if a retry was scheduled, False if attempts are exhausted.
    """
    # attempts=None means retry forever.
    if self.attempts is not None and execution.attempt >= self.attempts:
        return False

    # Reschedule at now + delay, bumping the attempt counter so the retried
    # run sees the new count.
    execution.when = datetime.now(timezone.utc) + self.delay
    execution.attempt += 1
    await execution.schedule(replace=True)

    worker = self.worker.get()
    TASKS_RETRIED.add(1, {**worker.labels(), **execution.general_labels()})
    logger.info(
        "↫ [%s] %s",
        format_duration(outcome.duration.total_seconds()),
        execution.call_repr(),
    )

    return True

in_(delay)

Deprecated: use after() instead.

Source code in src/docket/dependencies/_retry.py
def in_(self, delay: timedelta) -> NoReturn:
    """Deprecated: use after() instead."""
    self.after(delay)

StrikeList

A strike list that manages conditions for blocking task execution.

When a URL is provided, the strike list will connect to Redis and monitor a stream for strike/restore instructions. External processes (like Docket) can issue strikes, and all StrikeList instances listening to the same stream will receive and apply those updates.

Example using context manager with Redis

async with StrikeList(url="redis://localhost:6379/0", name="my-docket") as strikes: # External process issues: await docket.strike("my_task", "customer_id", "==", "blocked")

if strikes.is_stricken({"customer_id": "blocked"}):
    print("Customer is blocked")

Example with Docket (managed internally): async with Docket(name="my-docket", url="redis://localhost:6379/0") as docket: # Docket manages its own StrikeList internally await docket.strike(None, "customer_id", "==", "blocked")

Example using explicit connect/close: strikes = StrikeList(url="redis://localhost:6379/0", name="my-docket") await strikes.connect() try: if strikes.is_stricken({"customer_id": "blocked"}): print("Customer is blocked") finally: await strikes.close()

Example without Redis (local-only): strikes = StrikeList() # No URL = no Redis connection strikes.update(Strike(None, "customer_id", Operator.EQUAL, "blocked")) if strikes.is_stricken({"customer_id": "blocked"}): print("Customer is blocked")

Source code in src/docket/strikelist.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
class StrikeList:
    """A strike list that manages conditions for blocking task execution.

    When a URL is provided, the strike list will connect to Redis and monitor
    a stream for strike/restore instructions. External processes (like Docket)
    can issue strikes, and all StrikeList instances listening to the same
    stream will receive and apply those updates.

    Example using context manager with Redis:
        async with StrikeList(url="redis://localhost:6379/0", name="my-docket") as strikes:
            # External process issues: await docket.strike("my_task", "customer_id", "==", "blocked")

            if strikes.is_stricken({"customer_id": "blocked"}):
                print("Customer is blocked")

    Example with Docket (managed internally):
        async with Docket(name="my-docket", url="redis://localhost:6379/0") as docket:
            # Docket manages its own StrikeList internally
            await docket.strike(None, "customer_id", "==", "blocked")

    Example using explicit connect/close:
        strikes = StrikeList(url="redis://localhost:6379/0", name="my-docket")
        await strikes.connect()
        try:
            if strikes.is_stricken({"customer_id": "blocked"}):
                print("Customer is blocked")
        finally:
            await strikes.close()

    Example without Redis (local-only):
        strikes = StrikeList()  # No URL = no Redis connection
        strikes.update(Strike(None, "customer_id", Operator.EQUAL, "blocked"))
        if strikes.is_stricken({"customer_id": "blocked"}):
            print("Customer is blocked")
    """

    # Strikes scoped to a particular task function, keyed by function name.
    # An entry mapping to an empty dict means the *entire* task is stricken
    # (see _strike / _matches_task_or_parameter_strike).
    task_strikes: TaskStrikes
    # Strikes that apply to a parameter across all tasks, keyed by parameter name.
    parameter_strikes: ParameterStrikes
    # Predicates evaluated against an Execution by is_stricken(); temporary
    # conditions are inserted ahead of the built-in strike matcher.
    _conditions: list[Callable[["Execution"], bool]]
    # None in local-only mode (no URL given at construction).
    _redis: RedisConnection | None
    # Background task running _monitor_strikes(); None when not connected.
    _monitor_task: asyncio.Task[NoReturn] | None
    # Set once the initial, non-blocking read of existing strikes completes;
    # None when not connected (local-only mode).
    _strikes_loaded: asyncio.Event | None
    # Owns the connection and monitor lifetimes between __aenter__/__aexit__.
    _stack: AsyncExitStack

    def __init__(
        self,
        url: str | None = None,
        name: str = "strikelist",
        enable_internal_instrumentation: bool = False,
    ) -> None:
        """Initialize a StrikeList.

        Args:
            url: Redis connection URL. Use "memory://" for in-memory testing.
                 If None, no Redis connection is made (local-only mode).
            name: Name used as prefix for Redis keys (should match the Docket name
                  if you want to receive strikes from that Docket).
            enable_internal_instrumentation: If True, allows OpenTelemetry spans
                for internal Redis operations. Default False suppresses these spans.
        """
        self.url = url
        self.name = name
        self.enable_internal_instrumentation = enable_internal_instrumentation
        self.task_strikes = {}
        self.parameter_strikes = {}
        # The built-in matcher is always present; add_condition() prepends
        # temporary conditions ahead of it.
        self._conditions = [self._matches_task_or_parameter_strike]
        self._redis = RedisConnection(url) if url else None
        self._monitor_task = None
        self._strikes_loaded = None

    @property
    def prefix(self) -> str:
        """Return the key prefix for this strike list.

        All Redis keys for this strike list are prefixed with this value.

        For Redis Cluster mode, returns a hash-tagged prefix like "{myapp}"
        to ensure all keys hash to the same slot.
        """
        if self._redis is not None:
            return self._redis.prefix(self.name)
        return self.name

    @property
    def strike_key(self) -> str:
        """Redis stream key for strike instructions."""
        return f"{self.prefix}:strikes"

    @contextmanager
    def _maybe_suppress_instrumentation(self) -> Generator[None, None, None]:
        """Suppress OTel auto-instrumentation for internal Redis operations."""
        if not self.enable_internal_instrumentation:
            with suppress_instrumentation():
                yield
        else:  # pragma: no cover
            yield

    async def __aenter__(self) -> Self:
        """Async context manager entry - connects to Redis if URL provided."""
        self._stack = AsyncExitStack()
        await self._stack.__aenter__()

        if self._redis is None:
            return self  # No Redis connection needed (local-only mode)

        assert not self._redis.is_connected, "StrikeList is not reentrant"
        await self._stack.enter_async_context(self._redis)

        self._strikes_loaded = asyncio.Event()
        self._stack.callback(lambda: setattr(self, "_strikes_loaded", None))

        self._monitor_task = asyncio.create_task(
            self._monitor_strikes(), name=f"{self.name} - strike monitor"
        )
        # AsyncExitStack unwinds callbacks LIFO, so on exit the monitor task is
        # cancelled first, then the attributes are reset, and the Redis
        # connection (entered earliest) is closed last.
        self._stack.callback(lambda: setattr(self, "_monitor_task", None))
        self._stack.push_async_callback(
            cancel_task, self._monitor_task, CANCEL_MSG_CLEANUP
        )

        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Async context manager exit - closes Redis connection."""
        try:
            await self._stack.__aexit__(exc_type, exc_value, traceback)
        finally:
            # Delete the attribute entirely so accidental reuse after exit
            # fails loudly (AttributeError) instead of operating on a spent stack.
            del self._stack

    def add_condition(self, condition: Callable[["Execution"], bool]) -> None:
        """Adds a temporary condition that indicates an execution is stricken."""
        # Prepend so temporary conditions are checked before the built-in matcher.
        self._conditions.insert(0, condition)

    def remove_condition(self, condition: Callable[["Execution"], bool]) -> None:
        """Removes a temporary condition that indicates an execution is stricken."""
        # The built-in matcher must never be removed.
        assert condition is not self._matches_task_or_parameter_strike
        self._conditions.remove(condition)

    async def wait_for_strikes_loaded(self) -> None:
        """Wait for all existing strikes to be loaded from the stream.

        This method blocks until the strike monitor has completed its initial
        non-blocking read of all existing strike messages. Call this before
        making decisions that depend on the current strike state.

        If not connected to Redis (local-only mode), returns immediately.
        """
        if self._strikes_loaded is None:
            return
        await self._strikes_loaded.wait()

    async def send_instruction(self, instruction: StrikeInstruction) -> None:
        """Send a strike instruction to Redis and update local state.

        Args:
            instruction: The Strike or Restore instruction to send.

        Raises:
            RuntimeError: If not connected to Redis.
        """
        if self._redis is None or not self._redis.is_connected:
            raise RuntimeError(
                "Cannot send strike instruction: not connected to Redis. "
                "Use connect() or async context manager first."
            )

        async with self._redis.client() as r:
            await r.xadd(self.strike_key, instruction.as_message())  # type: ignore[arg-type]

        # Apply locally as well, so the sender observes its own strike
        # immediately rather than waiting for the monitor to read it back.
        self.update(instruction)

    async def strike(
        self,
        function: str | None = None,
        parameter: str | None = None,
        operator: "Operator | LiteralOperator" = "==",
        value: Hashable | None = None,
    ) -> None:
        """Issue a strike to block matching tasks or parameters.

        Args:
            function: Task function name to strike, or None for all tasks.
            parameter: Parameter name to match, or None for entire task.
            operator: Comparison operator for the value.
            value: Value to compare against.
        """
        instruction = Strike(function, parameter, Operator(operator), value)
        await self.send_instruction(instruction)

    async def restore(
        self,
        function: str | None = None,
        parameter: str | None = None,
        operator: "Operator | LiteralOperator" = "==",
        value: Hashable | None = None,
    ) -> None:
        """Restore a previously issued strike.

        Args:
            function: Task function name to restore, or None for all tasks.
            parameter: Parameter name to match, or None for entire task.
            operator: Comparison operator for the value.
            value: Value to compare against.
        """
        instruction = Restore(function, parameter, Operator(operator), value)
        await self.send_instruction(instruction)

    def is_stricken(self, target: "Execution | Mapping[str, Any]") -> bool:
        """Check if a target matches any strike condition.

        Args:
            target: Either an Execution object (for Docket/Worker use) or
                   a dictionary of parameter names to values (for standalone use).

        Returns:
            True if any parameter matches a strike condition.
        """
        # Check if this is a dict-like object (Mapping)
        if isinstance(target, Mapping):
            return self._is_dict_stricken(target)

        # Otherwise it's an Execution - use the full condition checking
        return any(condition(target) for condition in self._conditions)

    def _is_dict_stricken(self, params: Mapping[str, Any]) -> bool:
        """Check if a parameter dict matches any strike condition.

        Note: only cross-task parameter strikes are consulted here; task-scoped
        strikes require an Execution (there is no function name to match).

        Args:
            params: Dictionary of parameter names to values.

        Returns:
            True if any parameter matches a strike condition.
        """
        for parameter, argument in params.items():
            if parameter not in self.parameter_strikes:
                continue

            for operator, strike_value in self.parameter_strikes[parameter]:
                if self._is_match(argument, operator, strike_value):
                    return True

        return False

    def _matches_task_or_parameter_strike(self, execution: "Execution") -> bool:
        """Built-in condition: does this execution match any task or parameter strike?"""
        from .execution import get_signature

        function_name = execution.function_name

        # Check if the entire task is stricken (without parameter conditions):
        # an entry in task_strikes mapping to an empty dict means a full-task strike.
        task_strikes = self.task_strikes.get(function_name, {})
        if function_name in self.task_strikes and not task_strikes:
            return True

        signature = get_signature(execution.function)

        try:
            bound_args = signature.bind(*execution.args, **execution.kwargs)
            bound_args.apply_defaults()
        except TypeError:
            # If we can't make sense of the arguments, just assume the task is fine
            return False

        # Include extra kwargs that didn't bind to a named parameter, so strikes
        # on such names still match.
        all_arguments = {
            **bound_args.arguments,
            **{
                k: v
                for k, v in execution.kwargs.items()
                if k not in bound_args.arguments
            },
        }

        for parameter, argument in all_arguments.items():
            # Check both the task-scoped strikes and the cross-task parameter strikes.
            for strike_source in [task_strikes, self.parameter_strikes]:
                if parameter not in strike_source:
                    continue

                for operator, strike_value in strike_source[parameter]:
                    if self._is_match(argument, operator, strike_value):
                        return True

        return False

    def _is_match(self, value: Any, operator: Operator, strike_value: Any) -> bool:
        """Determines if a value matches a strike condition."""
        # NOTE(review): the case patterns compare the Operator against its string
        # literal, which assumes Operator is a str-valued enum - confirm.
        try:
            match operator:
                case "==":
                    return value == strike_value
                case "!=":
                    return value != strike_value
                case ">":
                    return value > strike_value
                case ">=":
                    return value >= strike_value
                case "<":
                    return value < strike_value
                case "<=":
                    return value <= strike_value
                case "between":  # pragma: no branch
                    # "between" is inclusive on both ends; strike_value is a
                    # (lower, upper) pair.
                    lower, upper = strike_value
                    return lower <= value <= upper
                case _:  # pragma: no cover
                    raise ValueError(f"Unknown operator: {operator}")
        except (ValueError, TypeError):
            # If we can't make the comparison due to incompatible types, just log the
            # error and assume the task is not stricken
            logger.warning(
                "Incompatible type for strike condition: %r %s %r",
                strike_value,
                operator,
                value,
                exc_info=True,
            )
            return False

    def update(self, instruction: StrikeInstruction) -> None:
        """Apply a Strike or Restore instruction to the local state."""
        # Strike conditions are stored in sets as (operator, value) tuples, so an
        # unhashable value could never be recorded or matched - reject it up front.
        try:
            hash(instruction.value)
        except TypeError:
            logger.warning(
                "Incompatible type for strike condition: %s %r",
                instruction.operator,
                instruction.value,
            )
            return

        if isinstance(instruction, Strike):
            self._strike(instruction)
        elif isinstance(instruction, Restore):  # pragma: no branch
            self._restore(instruction)

    def _strike(self, strike: Strike) -> None:
        """Record a strike, creating the nested containers on demand (EAFP)."""
        if strike.function and strike.parameter:
            # Task-scoped parameter strike.
            try:
                task_strikes = self.task_strikes[strike.function]
            except KeyError:
                task_strikes = self.task_strikes[strike.function] = {}

            try:
                parameter_strikes = task_strikes[strike.parameter]
            except KeyError:
                parameter_strikes = task_strikes[strike.parameter] = set()

            parameter_strikes.add((strike.operator, strike.value))

        elif strike.function:
            # Full-task strike: represented by an (empty) entry in task_strikes.
            try:
                task_strikes = self.task_strikes[strike.function]
            except KeyError:
                task_strikes = self.task_strikes[strike.function] = {}

        elif strike.parameter:  # pragma: no branch
            # Cross-task parameter strike.
            try:
                parameter_strikes = self.parameter_strikes[strike.parameter]
            except KeyError:
                parameter_strikes = self.parameter_strikes[strike.parameter] = set()

            parameter_strikes.add((strike.operator, strike.value))

    def _restore(self, restore: Restore) -> None:
        """Remove a strike, pruning emptied containers so full-task checks stay accurate."""
        if restore.function and restore.parameter:
            try:
                task_strikes = self.task_strikes[restore.function]
            except KeyError:
                return

            try:
                parameter_strikes = task_strikes[restore.parameter]
            except KeyError:
                task_strikes.pop(restore.parameter, None)
                return

            # Restoring a condition that was never struck is a no-op.
            try:
                parameter_strikes.remove((restore.operator, restore.value))
            except KeyError:
                pass

            if not parameter_strikes:
                task_strikes.pop(restore.parameter, None)
                if not task_strikes:
                    self.task_strikes.pop(restore.function, None)

        elif restore.function:
            try:
                task_strikes = self.task_strikes[restore.function]
            except KeyError:
                return

            # If there are no parameter strikes, this was a full task strike
            if not task_strikes:
                self.task_strikes.pop(restore.function, None)

        elif restore.parameter:  # pragma: no branch
            try:
                parameter_strikes = self.parameter_strikes[restore.parameter]
            except KeyError:
                return

            try:
                parameter_strikes.remove((restore.operator, restore.value))
            except KeyError:
                pass

            if not parameter_strikes:
                self.parameter_strikes.pop(restore.parameter, None)

    async def _monitor_strikes(self) -> NoReturn:
        """Background task that monitors Redis for strike updates.

        Runs forever (cancelled via the exit stack); on connection errors it
        backs off one second and reconnects, resuming from the last-seen
        stream id so no messages are replayed or missed.
        """
        from .instrumentation import REDIS_DISRUPTIONS, STRIKES_IN_EFFECT

        assert self._redis is not None

        # "0-0" reads the stream from its very beginning on first connect.
        last_id = "0-0"
        initial_load_complete = False
        while True:
            try:
                async with self._redis.client() as r:
                    while True:
                        last_id, initial_load_complete = await self._read_strikes(
                            r, last_id, initial_load_complete, STRIKES_IN_EFFECT
                        )
            except redis.exceptions.ConnectionError:  # pragma: no cover
                REDIS_DISRUPTIONS.add(1, {"docket": self.name})
                logger.warning("Connection error, sleeping for 1 second...")
                await asyncio.sleep(1)
            except Exception:  # pragma: no cover
                logger.exception("Error monitoring strikes")
                await asyncio.sleep(1)

    async def _read_strikes(
        self,
        r: Redis | RedisCluster,
        last_id: str,
        initial_load_complete: bool,
        strikes_in_effect: Any,
    ) -> tuple[str, bool]:
        """Read and process strike messages from Redis stream.

        Args:
            r: The Redis client to read from.
            last_id: Stream id to resume reading after.
            initial_load_complete: Whether the initial catch-up read has finished.
            strikes_in_effect: Counter instrument adjusted per strike/restore.

        Returns:
            Tuple of (last_id, initial_load_complete) to allow state persistence.
        """
        with self._maybe_suppress_instrumentation():
            # Non-blocking for initial load (block=None), then block
            # for new messages (block=60_000). Note: block=0 means
            # "block forever" in Redis, not "non-blocking".
            streams = await r.xread(
                {self.strike_key: last_id},
                count=100,
                block=60_000 if initial_load_complete else None,
            )

        # If no messages and we haven't signaled yet, initial load is done
        if not streams and not initial_load_complete:
            initial_load_complete = True
            # _strikes_loaded is always set when _monitor_strikes runs
            assert self._strikes_loaded is not None
            self._strikes_loaded.set()
            return last_id, initial_load_complete

        if not streams and initial_load_complete:  # pragma: no cover
            # The in-memory backend returns immediately; sleep briefly to avoid
            # a hot loop.
            if self.url and self.url.startswith("memory://"):
                await asyncio.sleep(0.1)
            return last_id, initial_load_complete

        for _, messages in streams:
            for message_id, message in messages:
                # Track the newest id so the next read (or a reconnect) resumes here.
                last_id = message_id  # type: ignore[assignment]
                instruction = StrikeInstruction.from_message(message)
                self.update(instruction)
                logger.info(
                    "%s %r",
                    ("Striking" if instruction.direction == "strike" else "Restoring"),
                    instruction.call_repr(),
                )

                strikes_in_effect.add(
                    1 if instruction.direction == "strike" else -1,
                    {
                        "docket.name": self.name,
                        **instruction.labels(),
                    },
                )

        return last_id, initial_load_complete

prefix property

Return the key prefix for this strike list.

All Redis keys for this strike list are prefixed with this value.

For Redis Cluster mode, returns a hash-tagged prefix like "{myapp}" to ensure all keys hash to the same slot.

strike_key property

Redis stream key for strike instructions.

__aenter__() async

Async context manager entry - connects to Redis if URL provided.

Source code in src/docket/strikelist.py
async def __aenter__(self) -> Self:
    """Async context manager entry - connects to Redis if URL provided."""
    self._stack = AsyncExitStack()
    await self._stack.__aenter__()

    if self._redis is None:
        return self  # No Redis connection needed (local-only mode)

    assert not self._redis.is_connected, "StrikeList is not reentrant"
    await self._stack.enter_async_context(self._redis)

    self._strikes_loaded = asyncio.Event()
    self._stack.callback(lambda: setattr(self, "_strikes_loaded", None))

    self._monitor_task = asyncio.create_task(
        self._monitor_strikes(), name=f"{self.name} - strike monitor"
    )
    self._stack.callback(lambda: setattr(self, "_monitor_task", None))
    self._stack.push_async_callback(
        cancel_task, self._monitor_task, CANCEL_MSG_CLEANUP
    )

    return self

__aexit__(exc_type, exc_value, traceback) async

Async context manager exit - closes Redis connection.

Source code in src/docket/strikelist.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None:
    """Async context manager exit - closes Redis connection."""
    try:
        await self._stack.__aexit__(exc_type, exc_value, traceback)
    finally:
        del self._stack

__init__(url=None, name='strikelist', enable_internal_instrumentation=False)

Initialize a StrikeList.

Parameters:

Name Type Description Default
url str | None

Redis connection URL. Use "memory://" for in-memory testing. If None, no Redis connection is made (local-only mode).

None
name str

Name used as prefix for Redis keys (should match the Docket name if you want to receive strikes from that Docket).

'strikelist'
enable_internal_instrumentation bool

If True, allows OpenTelemetry spans for internal Redis operations. Default False suppresses these spans.

False
Source code in src/docket/strikelist.py
def __init__(
    self,
    url: str | None = None,
    name: str = "strikelist",
    enable_internal_instrumentation: bool = False,
) -> None:
    """Initialize a StrikeList.

    Args:
        url: Redis connection URL. Use "memory://" for in-memory testing.
             If None, no Redis connection is made (local-only mode).
        name: Name used as prefix for Redis keys (should match the Docket name
              if you want to receive strikes from that Docket).
        enable_internal_instrumentation: If True, allows OpenTelemetry spans
            for internal Redis operations. Default False suppresses these spans.
    """
    self.url = url
    self.name = name
    self.enable_internal_instrumentation = enable_internal_instrumentation
    self.task_strikes = {}
    self.parameter_strikes = {}
    self._conditions = [self._matches_task_or_parameter_strike]
    self._redis = RedisConnection(url) if url else None
    self._monitor_task = None
    self._strikes_loaded = None

add_condition(condition)

Adds a temporary condition that indicates an execution is stricken.

Source code in src/docket/strikelist.py
def add_condition(self, condition: Callable[["Execution"], bool]) -> None:
    """Adds a temporary condition that indicates an execution is stricken."""
    self._conditions.insert(0, condition)

is_stricken(target)

Check if a target matches any strike condition.

Parameters:

Name Type Description Default
target Execution | Mapping[str, Any]

Either an Execution object (for Docket/Worker use) or a dictionary of parameter names to values (for standalone use).

required

Returns:

Type Description
bool

True if any parameter matches a strike condition.

Source code in src/docket/strikelist.py
def is_stricken(self, target: "Execution | Mapping[str, Any]") -> bool:
    """Check if a target matches any strike condition.

    Args:
        target: Either an Execution object (for Docket/Worker use) or
               a dictionary of parameter names to values (for standalone use).

    Returns:
        True if any parameter matches a strike condition.
    """
    # Check if this is a dict-like object (Mapping)
    if isinstance(target, Mapping):
        return self._is_dict_stricken(target)

    # Otherwise it's an Execution - use the full condition checking
    return any(condition(target) for condition in self._conditions)

remove_condition(condition)

Removes a temporary condition that indicates an execution is stricken.

Source code in src/docket/strikelist.py
def remove_condition(self, condition: Callable[["Execution"], bool]) -> None:
    """Removes a temporary condition that indicates an execution is stricken."""
    assert condition is not self._matches_task_or_parameter_strike
    self._conditions.remove(condition)

restore(function=None, parameter=None, operator='==', value=None) async

Restore a previously issued strike.

Parameters:

Name Type Description Default
function str | None

Task function name to restore, or None for all tasks.

None
parameter str | None

Parameter name to match, or None for entire task.

None
operator Operator | LiteralOperator

Comparison operator for the value.

'=='
value Hashable | None

Value to compare against.

None
Source code in src/docket/strikelist.py
async def restore(
    self,
    function: str | None = None,
    parameter: str | None = None,
    operator: "Operator | LiteralOperator" = "==",
    value: Hashable | None = None,
) -> None:
    """Restore a previously issued strike.

    Args:
        function: Task function name to restore, or None for all tasks.
        parameter: Parameter name to match, or None for entire task.
        operator: Comparison operator for the value.
        value: Value to compare against.
    """
    instruction = Restore(function, parameter, Operator(operator), value)
    await self.send_instruction(instruction)

send_instruction(instruction) async

Send a strike instruction to Redis and update local state.

Parameters:

Name Type Description Default
instruction StrikeInstruction

The Strike or Restore instruction to send.

required

Raises:

Type Description
RuntimeError

If not connected to Redis.

Source code in src/docket/strikelist.py
async def send_instruction(self, instruction: StrikeInstruction) -> None:
    """Send a strike instruction to Redis and update local state.

    Args:
        instruction: The Strike or Restore instruction to send.

    Raises:
        RuntimeError: If not connected to Redis.
    """
    if self._redis is None or not self._redis.is_connected:
        raise RuntimeError(
            "Cannot send strike instruction: not connected to Redis. "
            "Use connect() or async context manager first."
        )

    async with self._redis.client() as r:
        await r.xadd(self.strike_key, instruction.as_message())  # type: ignore[arg-type]

    self.update(instruction)

strike(function=None, parameter=None, operator='==', value=None) async

Issue a strike to block matching tasks or parameters.

Parameters:

Name Type Description Default
function str | None

Task function name to strike, or None for all tasks.

None
parameter str | None

Parameter name to match, or None for entire task.

None
operator Operator | LiteralOperator

Comparison operator for the value.

'=='
value Hashable | None

Value to compare against.

None
Source code in src/docket/strikelist.py
async def strike(
    self,
    function: str | None = None,
    parameter: str | None = None,
    operator: "Operator | LiteralOperator" = "==",
    value: Hashable | None = None,
) -> None:
    """Issue a strike to block matching tasks or parameters.

    Args:
        function: Task function name to strike, or None for all tasks.
        parameter: Parameter name to match, or None for entire task.
        operator: Comparison operator for the value.
        value: Value to compare against.
    """
    instruction = Strike(function, parameter, Operator(operator), value)
    await self.send_instruction(instruction)

wait_for_strikes_loaded() async

Wait for all existing strikes to be loaded from the stream.

This method blocks until the strike monitor has completed its initial non-blocking read of all existing strike messages. Call this before making decisions that depend on the current strike state.

If not connected to Redis (local-only mode), returns immediately.

Source code in src/docket/strikelist.py
async def wait_for_strikes_loaded(self) -> None:
    """Wait for all existing strikes to be loaded from the stream.

    This method blocks until the strike monitor has completed its initial
    non-blocking read of all existing strike messages. Call this before
    making decisions that depend on the current strike state.

    If not connected to Redis (local-only mode), returns immediately.
    """
    if self._strikes_loaded is None:
        return
    await self._strikes_loaded.wait()

Timeout

Bases: Runtime

Configures a timeout for a task. You can specify the base timeout, and the task will be cancelled if it exceeds this duration. The timeout may be extended within the context of a single running task.

Example:

@task
async def my_task(timeout: Timeout = Timeout(timedelta(seconds=10))) -> None:
    ...
Source code in src/docket/dependencies/_timeout.py
class Timeout(Runtime):
    """Enforces a wall-clock time limit on a running task.

    The limit starts at the configured base duration, and the task is cancelled
    once the deadline passes.  While a single task is running, the deadline may
    be pushed back with :meth:`extend`.

    Example:

    ```python
    @task
    async def my_task(timeout: Timeout = Timeout(timedelta(seconds=10))) -> None:
        ...
    ```
    """

    single: bool = True

    base: timedelta
    _deadline: float

    def __init__(self, base: timedelta) -> None:
        """
        Args:
            base: The base timeout duration.
        """
        self.base = base

    async def __aenter__(self) -> Timeout:
        # Hand out a fresh instance per execution so extensions made by one run
        # never leak into another.
        return Timeout(self.base)

    def start(self) -> None:
        # Anchor the deadline to the monotonic clock, which is immune to
        # wall-clock adjustments.
        self._deadline = self.base.total_seconds() + time.monotonic()

    def expired(self) -> bool:
        return self._deadline <= time.monotonic()

    def remaining(self) -> timedelta:
        """Get the remaining time until the timeout expires."""
        seconds_left = self._deadline - time.monotonic()
        return timedelta(seconds=seconds_left)

    def extend(self, by: timedelta | None = None) -> None:
        """Extend the timeout by a given duration.  If no duration is provided, the
        base timeout will be used.

        Args:
            by: The duration to extend the timeout by.
        """
        extension = self.base if by is None else by
        self._deadline += extension.total_seconds()

    async def run(
        self,
        execution: Execution,
        function: Callable[..., Awaitable[Any]],
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
    ) -> Any:
        """Execute the function with timeout enforcement."""
        self.start()

        docket = self.docket.get()
        wrapped = asyncio.create_task(
            function(*args, **kwargs),  # type: ignore[arg-type]
            name=f"{docket.name} - task:{execution.key}",
        )

        deadline_hit = False
        try:
            while not wrapped.done():  # pragma: no branch
                if self.expired():
                    deadline_hit = True
                    break

                budget = self.remaining().total_seconds()
                try:
                    # Shield the task so a wait_for timeout doesn't cancel it:
                    # the loop re-checks the (possibly extended) deadline instead.
                    return await asyncio.wait_for(asyncio.shield(wrapped), timeout=budget)
                except asyncio.TimeoutError:
                    pass
        finally:
            if not wrapped.done():
                reason = (
                    f"Docket task {execution.key} exceeded "
                    f"timeout of {self.base.total_seconds()}s"
                )
                await cancel_task(wrapped, reason)
                if deadline_hit:  # pragma: no branch
                    raise asyncio.TimeoutError(reason)

__init__(base)

Parameters:

Name Type Description Default
base timedelta

The base timeout duration.

required
Source code in src/docket/dependencies/_timeout.py
def __init__(self, base: timedelta) -> None:
    """
    Args:
        base: The base timeout duration.
    """
    self.base = base

extend(by=None)

Extend the timeout by a given duration. If no duration is provided, the base timeout will be used.

Parameters:

Name Type Description Default
by timedelta | None

The duration to extend the timeout by.

None
Source code in src/docket/dependencies/_timeout.py
def extend(self, by: timedelta | None = None) -> None:
    """Extend the timeout by a given duration.  If no duration is provided, the
    base timeout will be used.

    Args:
        by: The duration to extend the timeout by.
    """
    if by is None:
        by = self.base
    self._deadline += by.total_seconds()

remaining()

Get the remaining time until the timeout expires.

Source code in src/docket/dependencies/_timeout.py
def remaining(self) -> timedelta:
    """Return how much time is left before the deadline passes."""
    seconds_left = self._deadline - time.monotonic()
    return timedelta(seconds=seconds_left)

run(execution, function, args, kwargs) async

Execute the function with timeout enforcement.

Source code in src/docket/dependencies/_timeout.py
async def run(
    self,
    execution: Execution,
    function: Callable[..., Awaitable[Any]],
    args: tuple[Any, ...],
    kwargs: dict[str, Any],
) -> Any:
    """Execute the function with timeout enforcement.

    Runs ``function(*args, **kwargs)`` as a named asyncio task and waits for
    it, but never past the (possibly extended) deadline.  If the deadline
    passes first, the task is cancelled and ``asyncio.TimeoutError`` is
    raised.

    Args:
        execution: The execution whose key names the task and appears in
            timeout messages.
        function: The coroutine function to run.
        args: Positional arguments for the function.
        kwargs: Keyword arguments for the function.

    Returns:
        Whatever the function returns, if it completes in time.

    Raises:
        asyncio.TimeoutError: If the deadline expires before completion.
    """
    self.start()

    docket = self.docket.get()
    task = asyncio.create_task(
        function(*args, **kwargs),  # type: ignore[arg-type]
        name=f"{docket.name} - task:{execution.key}",
    )

    timed_out = False
    try:
        # Loop rather than using a single wait_for: extend() can move the
        # deadline while we wait, so each pass re-reads the remaining time.
        while not task.done():  # pragma: no branch
            if self.expired():
                timed_out = True
                break

            try:
                # Shield so a wait_for timeout doesn't cancel the task here;
                # cancellation happens below only after expiry is confirmed.
                return await asyncio.wait_for(
                    asyncio.shield(task), timeout=self.remaining().total_seconds()
                )
            except asyncio.TimeoutError:
                continue
    finally:
        if not task.done():
            timeout_reason = (
                f"Docket task {execution.key} exceeded "
                f"timeout of {self.base.total_seconds()}s"
            )
            await cancel_task(task, timeout_reason)
            if timed_out:  # pragma: no branch
                raise asyncio.TimeoutError(timeout_reason)

Worker

A Worker executes tasks on a Docket. You may run as many workers as you like to work a single Docket.

Example:

async with Docket() as docket:
    async with Worker(docket) as worker:
        await worker.run_forever()
Source code in src/docket/worker.py
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
class Worker:
    """A Worker executes tasks on a Docket.  You may run as many workers as you like
    to work a single Docket.

    Example:

    ```python
    async with Docket() as docket:
        async with Worker(docket) as worker:
            await worker.run_forever()
    ```
    """

    docket: Docket
    name: str
    concurrency: int
    redelivery_timeout: timedelta
    reconnection_delay: timedelta
    minimum_check_interval: timedelta
    scheduling_resolution: timedelta
    schedule_automatic_tasks: bool
    enable_internal_instrumentation: bool
    fallback_task: TaskFunction

    def __init__(
        self,
        docket: Docket,
        name: str | None = None,
        concurrency: int = 10,
        redelivery_timeout: timedelta = timedelta(minutes=5),
        reconnection_delay: timedelta = timedelta(seconds=5),
        minimum_check_interval: timedelta = timedelta(milliseconds=250),
        scheduling_resolution: timedelta = timedelta(milliseconds=250),
        schedule_automatic_tasks: bool = True,
        enable_internal_instrumentation: bool = False,
        fallback_task: TaskFunction | None = None,
    ) -> None:
        """Configure a worker bound to the given docket.

        Args:
            docket: The docket this worker pulls tasks from.
            name: Worker name; when falsy, defaults to "<hostname>#<pid>".
            concurrency: Maximum number of tasks executed at once.
            redelivery_timeout: Idle time before a message may be reclaimed
                from another worker.
            reconnection_delay: Pause between Redis reconnection attempts.
            minimum_check_interval: Shortest pause between polls for work.
            scheduling_resolution: How often due tasks are moved to the stream.
            schedule_automatic_tasks: Schedule automatic perpetual tasks on
                startup when True.
            enable_internal_instrumentation: Emit OTel spans for internal
                Redis polling when True.
            fallback_task: Fallback task function for executions; when falsy,
                the module-level default fallback task is used.
        """
        self.docket = docket
        # `if name` (not `is None`) mirrors the truthiness of `name or ...`:
        # an empty string also falls back to the generated name
        self.name = name if name else f"{socket.gethostname()}#{os.getpid()}"
        self.concurrency = concurrency
        self.redelivery_timeout = redelivery_timeout
        self.reconnection_delay = reconnection_delay
        self.minimum_check_interval = minimum_check_interval
        self.scheduling_resolution = scheduling_resolution
        self.schedule_automatic_tasks = schedule_automatic_tasks
        self.enable_internal_instrumentation = enable_internal_instrumentation
        self.fallback_task = fallback_task if fallback_task else default_fallback_task

    @contextmanager
    def _maybe_suppress_instrumentation(self) -> Generator[None, None, None]:
        """Optionally silence OTel auto-instrumentation for internal Redis calls.

        With enable_internal_instrumentation left at its False default, the
        internal Redis polling operations (XREADGROUP, XAUTOCLAIM, Lua script
        evaluations) run under suppress_instrumentation() so they don't flood
        trace storage with thousands of spans per minute.  Task execution
        spans and user-facing operations (schedule, cancel, etc.) are never
        suppressed.
        """
        if self.enable_internal_instrumentation:  # pragma: no cover
            yield
            return
        with suppress_instrumentation():
            yield

    async def __aenter__(self) -> Self:
        """Set up worker state on an AsyncExitStack so teardown runs in LIFO order.

        Creates the shutdown-coordination events, per-run bookkeeping dicts,
        the heartbeat task, and the shared context.  Every attribute is paired
        with a stack callback that deletes it on exit, so a closed worker has
        none of this state.
        """
        self._stack = AsyncExitStack()
        await self._stack.__aenter__()

        # Events for coordinating worker loop shutdown (cleaned up last)
        self._worker_stopping = asyncio.Event()
        self._stack.callback(lambda: delattr(self, "_worker_stopping"))
        self._worker_done = asyncio.Event()
        self._stack.callback(lambda: delattr(self, "_worker_done"))
        self._worker_done.set()  # Initially done (not running)
        self._cancellation_ready = asyncio.Event()
        self._stack.callback(lambda: delattr(self, "_cancellation_ready"))

        self._execution_counts: dict[str, int] = {}
        self._stack.callback(lambda: delattr(self, "_execution_counts"))
        self._tasks_by_key: dict[TaskKey, asyncio.Task[None]] = {}
        self._stack.callback(lambda: delattr(self, "_tasks_by_key"))

        # Heartbeat runs for the worker's whole lifetime; it is cancelled via
        # the stack on exit (after the shared context is torn down)
        self._heartbeat_task = asyncio.create_task(
            self._heartbeat(), name=f"{self.docket.name} - heartbeat"
        )
        self._stack.callback(lambda: delattr(self, "_heartbeat_task"))
        self._stack.push_async_callback(
            cancel_task, self._heartbeat_task, CANCEL_MSG_CLEANUP
        )

        # Shared context is set up last, so it's cleaned up first (LIFO)
        self._shared_context = SharedContext(self.docket, self)
        self._stack.callback(lambda: delattr(self, "_shared_context"))
        await self._stack.enter_async_context(self._shared_context)

        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Drain the worker loop, then unwind all setup in LIFO order."""
        # Signal worker loop to stop and wait for it to drain
        self._worker_stopping.set()
        await self._worker_done.wait()

        # Stack handles LIFO cleanup: shared_context first, then heartbeat
        try:
            await self._stack.__aexit__(exc_type, exc_value, traceback)
        finally:
            del self._stack

    def labels(self) -> Mapping[str, str]:
        """Return the docket's labels augmented with this worker's name."""
        combined = dict(self.docket.labels())
        combined["docket.worker"] = self.name
        return combined

    def _log_context(self) -> Mapping[str, str]:
        """Return the worker labels plus Redis keys, for structured log records."""
        context = dict(self.labels())
        context["docket.queue_key"] = self.docket.queue_key
        context["docket.stream_key"] = self.docket.stream_key
        return context

    @classmethod
    async def run(
        cls,
        docket_name: str = "docket",
        url: str = "redis://localhost:6379/0",
        name: str | None = None,
        concurrency: int = 10,
        redelivery_timeout: timedelta = timedelta(minutes=5),
        reconnection_delay: timedelta = timedelta(seconds=5),
        minimum_check_interval: timedelta = timedelta(milliseconds=100),
        scheduling_resolution: timedelta = timedelta(milliseconds=250),
        schedule_automatic_tasks: bool = True,
        enable_internal_instrumentation: bool = False,
        until_finished: bool = False,
        healthcheck_port: int | None = None,
        metrics_port: int | None = None,
        tasks: list[str] | None = None,
        fallback_task: str | None = None,
    ) -> None:
        """Run a worker as the main entry point (CLI).

        This method installs signal handlers for graceful shutdown since it
        assumes ownership of the event loop. When embedding Docket in another
        framework (e.g., FastAPI with uvicorn), use Worker.run_forever() or
        Worker.run_until_finished() directly - those methods do not install
        signal handlers and rely on the framework to handle shutdown signals.

        Args:
            tasks: Task collection paths to register; defaults to
                ["docket.tasks:standard_tasks"] when None.
            fallback_task: Optional "module:function" path resolved to the
                worker's fallback task.
        """
        # Fix: avoid a mutable default argument for `tasks`; None is the
        # sentinel for the standard task collection.
        if tasks is None:
            tasks = ["docket.tasks:standard_tasks"]

        # Parse fallback_task string if provided (module:function format)
        resolved_fallback_task: TaskFunction | None = None
        if fallback_task:
            module_name, _, member_name = fallback_task.rpartition(":")
            module = importlib.import_module(module_name)
            resolved_fallback_task = getattr(module, member_name)

        with (
            healthcheck_server(port=healthcheck_port),
            metrics_server(port=metrics_port),
        ):
            async with Docket(
                name=docket_name,
                url=url,
                enable_internal_instrumentation=enable_internal_instrumentation,
            ) as docket:
                for task_path in tasks:
                    docket.register_collection(task_path)

                async with (
                    Worker(  # pragma: no branch - context manager exit varies across interpreters
                        docket=docket,
                        name=name,
                        concurrency=concurrency,
                        redelivery_timeout=redelivery_timeout,
                        reconnection_delay=reconnection_delay,
                        minimum_check_interval=minimum_check_interval,
                        scheduling_resolution=scheduling_resolution,
                        schedule_automatic_tasks=schedule_automatic_tasks,
                        enable_internal_instrumentation=enable_internal_instrumentation,
                        fallback_task=resolved_fallback_task,
                    ) as worker
                ):
                    # Install signal handlers for graceful shutdown.
                    # This is only appropriate when we own the event loop (CLI entry point).
                    # Embedded usage should let the framework handle signals.
                    loop = asyncio.get_running_loop()
                    run_task: asyncio.Task[None] | None = None

                    def handle_shutdown(sig_name: str) -> None:  # pragma: no cover
                        logger.info(
                            "Received %s, initiating graceful shutdown...", sig_name
                        )
                        if run_task and not run_task.done():
                            run_task.cancel()

                    try:  # pragma: no cover
                        loop.add_signal_handler(
                            signal.SIGTERM, lambda: handle_shutdown("SIGTERM")
                        )
                        loop.add_signal_handler(
                            signal.SIGINT, lambda: handle_shutdown("SIGINT")
                        )
                    except NotImplementedError:  # pragma: no cover
                        pass  # Windows doesn't support loop signal handlers

                    try:
                        if until_finished:
                            run_task = asyncio.create_task(
                                worker.run_until_finished(),
                                name=f"{docket_name} - worker",
                            )
                        else:
                            run_task = asyncio.create_task(
                                worker.run_forever(),
                                name=f"{docket_name} - worker",
                            )  # pragma: no cover
                        await run_task
                    except asyncio.CancelledError:  # pragma: no cover
                        pass
                    finally:
                        try:  # pragma: no cover
                            loop.remove_signal_handler(signal.SIGTERM)
                            loop.remove_signal_handler(signal.SIGINT)
                        except NotImplementedError:  # pragma: no cover
                            pass

    async def run_until_finished(self) -> None:
        """Process work until no tasks remain, then stop."""
        await self._run(forever=False)

    async def run_forever(self) -> None:
        """Keep processing tasks indefinitely."""
        await self._run(forever=True)  # pragma: no cover

    # Per-key execution tallies, populated only while run_at_most is active
    _execution_counts: dict[str, int]

    async def run_at_most(self, iterations_by_key: Mapping[str, int]) -> None:
        """
        Run the worker until there are no more tasks to process, but limit specified
        task keys to a maximum number of iterations.

        This is particularly useful for testing self-perpetuating tasks that would
        otherwise run indefinitely.

        Args:
            iterations_by_key: Maps task keys to their maximum allowed executions
        """
        self._execution_counts = dict.fromkeys(iterations_by_key, 0)

        def has_reached_max_iterations(execution: Execution) -> bool:
            limit = iterations_by_key.get(execution.key)
            if limit is None:
                return False
            return self._execution_counts[execution.key] >= limit

        self.docket.strike_list.add_condition(has_reached_max_iterations)
        try:
            await self.run_until_finished()
        finally:
            self.docket.strike_list.remove_condition(has_reached_max_iterations)
            self._execution_counts = {}

    async def _run(self, forever: bool = False) -> None:
        """Connect to Redis and run the worker loop, retrying on connection loss."""
        self._startup_log()

        while True:
            try:
                async with self.docket.redis() as redis:
                    await self._worker_loop(redis, forever=forever)
                    return
            except ConnectionError:
                REDIS_DISRUPTIONS.add(1, self.labels())
                logger.warning(
                    "Error connecting to redis, retrying in %s...",
                    self.reconnection_delay,
                    exc_info=True,
                )
                await asyncio.sleep(self.reconnection_delay.total_seconds())

    async def _worker_loop(self, redis: Redis, forever: bool = False) -> None:
        """Main processing loop: pull messages, execute tasks, ack results.

        Runs until stopping is signalled (or, when ``forever`` is False, until
        no work remains and no tasks are active).  Also supervises the
        cancellation listener, scheduler loop, and lease-renewal loop for the
        lifetime of this connection.
        """
        self._worker_stopping.clear()
        self._worker_done.clear()
        self._cancellation_ready.clear()  # Reset for reconnection scenarios

        # Maps from running asyncio task -> originating stream message / execution
        active_tasks: dict[asyncio.Task[None], RedisMessageID] = {}
        task_executions: dict[asyncio.Task[None], Execution] = {}
        available_slots = self.concurrency
        log_context = self._log_context()

        async def check_for_work() -> bool:
            # True when either the stream or the scheduled queue has entries
            logger.debug("Checking for work", extra=log_context)
            async with redis.pipeline() as pipeline:
                pipeline.xlen(self.docket.stream_key)
                pipeline.zcard(self.docket.queue_key)
                results: list[int] = await pipeline.execute()
                stream_len = results[0]
                queue_len = results[1]
                return stream_len > 0 or queue_len > 0

        async def get_redeliveries(redis: Redis) -> RedisReadGroupResponse:
            # Claim messages idle longer than redelivery_timeout, wrapped in a
            # sentinel stream key so start_task can mark them as redeliveries
            logger.debug("Getting redeliveries", extra=log_context)
            try:
                with self._maybe_suppress_instrumentation():
                    _, redeliveries, *_ = await redis.xautoclaim(
                        name=self.docket.stream_key,
                        groupname=self.docket.worker_group_name,
                        consumername=self.name,
                        min_idle_time=int(
                            self.redelivery_timeout.total_seconds() * 1000
                        ),
                        start_id="0-0",
                        count=available_slots,
                    )
            except ResponseError as e:
                if "NOGROUP" in str(e):
                    # Consumer group vanished (e.g. flushed Redis): recreate and retry
                    await self.docket._ensure_stream_and_group()
                    return await get_redeliveries(redis)
                raise  # pragma: no cover
            return [(b"__redelivery__", redeliveries)]

        async def get_new_deliveries(redis: Redis) -> RedisReadGroupResponse:
            logger.debug("Getting new deliveries", extra=log_context)
            # Use non-blocking read with in-memory backend + manual sleep
            # This is necessary because fakeredis's async blocking operations don't
            # properly yield control to the asyncio event loop
            is_memory = self.docket.url.startswith("memory://")
            try:
                with self._maybe_suppress_instrumentation():
                    result = await redis.xreadgroup(
                        groupname=self.docket.worker_group_name,
                        consumername=self.name,
                        streams={self.docket.stream_key: ">"},
                        block=0
                        if is_memory
                        else int(self.minimum_check_interval.total_seconds() * 1000),
                        count=available_slots,
                    )
            except ResponseError as e:
                if "NOGROUP" in str(e):
                    await self.docket._ensure_stream_and_group()
                    return await get_new_deliveries(redis)
                raise  # pragma: no cover
            if is_memory and not result:
                await asyncio.sleep(self.minimum_check_interval.total_seconds())
            return result

        async def start_task(
            message_id: RedisMessageID,
            message: RedisMessage,
            is_redelivery: bool = False,
        ) -> None:
            # Turn a raw stream message into an Execution and launch it
            execution = await Execution.from_message(
                self.docket,
                message,
                redelivered=is_redelivery,
                fallback_task=self.fallback_task,
            )

            task = asyncio.create_task(
                self._execute(execution),
                name=f"{self.docket.name} - task:{execution.key}",
            )
            active_tasks[task] = message_id
            task_executions[task] = execution
            self._tasks_by_key[execution.key] = task

            nonlocal available_slots
            available_slots -= 1

        async def process_completed_tasks() -> None:
            # Reap finished tasks: ack on success, reschedule if admission-blocked
            completed_tasks = {task for task in active_tasks if task.done()}
            for task in completed_tasks:
                message_id = active_tasks.pop(task)
                execution = task_executions.pop(task)
                self._tasks_by_key.pop(execution.key, None)
                try:
                    await task
                    await ack_message(redis, message_id)
                except AdmissionBlocked as e:
                    logger.debug(
                        "🔒 Task %s blocked by admission control, rescheduling",
                        e.execution.key,
                        extra=log_context,
                    )
                    e.execution.when = (
                        datetime.now(timezone.utc) + ADMISSION_BLOCKED_RETRY_DELAY
                    )
                    await e.execution.schedule(reschedule_message=message_id)

        async def ack_message(redis: Redis, message_id: RedisMessageID) -> None:
            # Acknowledge and delete in one pipeline so the stream doesn't grow
            logger.debug("Acknowledging message", extra=log_context)
            async with redis.pipeline() as pipeline:
                pipeline.xack(
                    self.docket.stream_key,
                    self.docket.worker_group_name,
                    message_id,
                )
                pipeline.xdel(
                    self.docket.stream_key,
                    message_id,
                )
                await pipeline.execute()

        try:
            async with TaskGroup() as infra:
                # Start cancellation listener and wait for it to be ready
                infra.create_task(
                    self._cancellation_listener(),
                    name=f"{self.docket.name} - cancellation listener",
                )
                await self._cancellation_ready.wait()

                if self.schedule_automatic_tasks:
                    await self._schedule_all_automatic_perpetual_tasks()

                infra.create_task(
                    self._scheduler_loop(redis),
                    name=f"{self.docket.name} - scheduler",
                )
                infra.create_task(
                    self._renew_leases(redis, active_tasks),
                    name=f"{self.docket.name} - lease renewal",
                )

                has_work: bool = True
                stopping = self._worker_stopping.is_set
                # Main pull loop: redeliveries first, then fresh messages
                while (forever or has_work or active_tasks) and not stopping():
                    await process_completed_tasks()

                    available_slots = self.concurrency - len(active_tasks)

                    if available_slots <= 0:
                        await asyncio.sleep(self.minimum_check_interval.total_seconds())
                        continue

                    for source in [get_redeliveries, get_new_deliveries]:
                        for stream_key, messages in await source(redis):
                            is_redelivery = stream_key == b"__redelivery__"
                            for message_id, message in messages:
                                if not message:  # pragma: no cover
                                    continue

                                await start_task(message_id, message, is_redelivery)

                        if available_slots <= 0:
                            break

                    if not forever and not active_tasks:
                        has_work = await check_for_work()

                # Signal internal tasks to stop before exiting TaskGroup
                self._worker_stopping.set()

        except asyncio.CancelledError:
            if active_tasks:  # pragma: no cover
                logger.info(
                    "Shutdown requested, finishing %d active tasks...",
                    len(active_tasks),
                    extra=log_context,
                )
        finally:
            # Drain any remaining active tasks
            if active_tasks:
                await asyncio.gather(*active_tasks, return_exceptions=True)
                await process_completed_tasks()

            self._worker_done.set()

    async def _scheduler_loop(self, redis: Redis) -> None:
        """Loop that moves due tasks from the queue to the stream.

        Every scheduling_resolution interval, a Lua script atomically streams
        all queue entries whose score (due time) is at or before now, marks
        their run state as queued, and publishes a state-change event.
        """

        # NOTE(review): the Lua script builds the pub/sub JSON payload by string
        # concatenation; a task key or 'when' containing a double quote would
        # produce invalid JSON — confirm keys are constrained upstream.
        stream_due_tasks: _stream_due_tasks = cast(
            _stream_due_tasks,
            redis.register_script(
                # Lua script to atomically move scheduled tasks to the stream
                # KEYS[1]: queue key (sorted set)
                # KEYS[2]: stream key
                # ARGV[1]: current timestamp
                # ARGV[2]: docket name prefix
                """
            local total_work = redis.call('ZCARD', KEYS[1])
            local due_work = 0

            if total_work > 0 then
                local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1])

                for i, key in ipairs(tasks) do
                    local hash_key = ARGV[2] .. ":" .. key
                    local task_data = redis.call('HGETALL', hash_key)

                    if #task_data > 0 then
                        local task = {}
                        for j = 1, #task_data, 2 do
                            task[task_data[j]] = task_data[j+1]
                        end

                        redis.call('XADD', KEYS[2], '*',
                            'key', task['key'],
                            'when', task['when'],
                            'function', task['function'],
                            'args', task['args'],
                            'kwargs', task['kwargs'],
                            'attempt', task['attempt'],
                            'generation', task['generation'] or '0'
                        )
                        redis.call('DEL', hash_key)

                        -- Set run state to queued
                        local run_key = ARGV[2] .. ":runs:" .. task['key']
                        redis.call('HSET', run_key, 'state', 'queued')

                        -- Publish state change event to pub/sub
                        local channel = ARGV[2] .. ":state:" .. task['key']
                        local payload = '{"type":"state","key":"' .. task['key'] .. '","state":"queued","when":"' .. task['when'] .. '"}'
                        redis.call('PUBLISH', channel, payload)

                        due_work = due_work + 1
                    end
                end
            end

            if due_work > 0 then
                redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, ARGV[1])
            end

            return {total_work, due_work}
            """
            ),
        )

        log_context = self._log_context()

        while not self._worker_stopping.is_set():  # pragma: no branch
            try:
                logger.debug("Scheduling due tasks", extra=log_context)
                with self._maybe_suppress_instrumentation():
                    total_work, due_work = await stream_due_tasks(
                        keys=[self.docket.queue_key, self.docket.stream_key],
                        args=[
                            datetime.now(timezone.utc).timestamp(),
                            self.docket.prefix,
                        ],
                    )

                if due_work > 0:
                    logger.debug(
                        "Moved %d/%d due tasks from %s to %s",
                        due_work,
                        total_work,
                        self.docket.queue_key,
                        self.docket.stream_key,
                        extra=log_context,
                    )
            except Exception:  # pragma: no cover
                # Keep scheduling on any error; a transient failure here must
                # not kill the loop
                logger.exception(
                    "Error in scheduler loop",
                    exc_info=True,
                    extra=log_context,
                )

            # Use interruptible wait so we respond to stopping quickly
            try:
                await asyncio.wait_for(
                    self._worker_stopping.wait(),
                    timeout=self.scheduling_resolution.total_seconds(),
                )
                return  # Event was set, exit the loop
            except asyncio.TimeoutError:
                pass  # Normal timeout, continue scheduling

    async def _renew_leases(
        self,
        redis: Redis,
        active_messages: dict[asyncio.Task[None], RedisMessageID],
    ) -> None:
        """Periodically reset the idle time of in-flight stream messages.

        Claims each active message with idle=0 so that XAUTOCLAIM never sees
        it as stale while this worker is still processing it.
        """
        # Renew four times per redelivery window for a comfortable margin
        interval = self.redelivery_timeout.total_seconds() / 4

        while not self._worker_stopping.is_set():  # pragma: no branch
            # Interruptible sleep: wake immediately if stopping is signalled
            try:
                await asyncio.wait_for(self._worker_stopping.wait(), timeout=interval)
            except asyncio.TimeoutError:
                pass  # interval elapsed normally; go renew
            else:
                return  # stop was requested while waiting

            message_ids = list(active_messages.values())
            if not message_ids:
                continue

            try:
                with self._maybe_suppress_instrumentation():
                    await redis.xclaim(
                        name=self.docket.stream_key,
                        groupname=self.docket.worker_group_name,
                        consumername=self.name,
                        min_idle_time=0,
                        message_ids=message_ids,
                        idle=0,
                    )
            except Exception:
                logger.warning("Failed to renew leases", exc_info=True)

    async def _schedule_all_automatic_perpetual_tasks(self) -> None:
        """Seed an initial entry for every automatic Perpetual task.

        Guarded by a non-blocking Redis lock so only one worker per docket
        performs the seeding.
        """
        # Don't schedule until strikes are loaded, or we could enqueue a
        # struck task (or miss one that was restored).
        await self.docket.wait_for_strikes_loaded()

        async with self.docket.redis() as redis:
            try:
                async with redis.lock(
                    self.docket.key("perpetual:lock"),
                    timeout=AUTOMATIC_PERPETUAL_LOCK_TIMEOUT_SECONDS,
                    blocking=False,
                ):
                    for function in self.docket.tasks.values():
                        perpetual = get_single_dependency_parameter_of_type(
                            function, Perpetual
                        )
                        if perpetual is None or not perpetual.automatic:
                            continue
                        await self.docket.add(
                            function,
                            when=perpetual.initial_when,
                            key=function.__name__,
                        )()
            except LockError:  # pragma: no cover
                # Another worker holds the lock and is already doing this.
                return

    async def _delete_known_task(self, redis: Redis, execution: Execution) -> None:
        """Clear the 'known' bookkeeping for a task so its key can be reused."""
        logger.debug("Deleting known task", extra=self._log_context())
        # Dropping known/stream_id from the runs hash re-opens the key
        # for future scheduling.
        await redis.hdel(self.docket.runs_key(execution.key), "known", "stream_id")

        # TODO: Remove in next breaking release (v0.14.0) - legacy key cleanup
        await redis.delete(
            self.docket.known_task_key(execution.key),
            self.docket.stream_id_key(execution.key),
        )

    async def _execute(self, execution: Execution) -> None:
        """Run one claimed task execution from start to finish.

        Covers the lifecycle visible here: strike check, atomic claim,
        dependency resolution, invoking the task (optionally through a
        Runtime wrapper such as a timeout), result/exception storage, and
        metric/span accounting for success, cancellation, and failure.
        """
        log_context = {**self._log_context(), **execution.specific_labels()}
        counter_labels = {**self.labels(), **execution.general_labels()}

        call = execution.call_repr()

        # Struck tasks are dropped entirely: clear bookkeeping, mark cancelled.
        if self.docket.strike_list.is_stricken(execution):
            async with self.docket.redis() as redis:
                await self._delete_known_task(redis, execution)

            await execution.mark_as_cancelled()
            logger.warning("🗙 %s", call, extra=log_context)
            TASKS_STRICKEN.add(1, counter_labels | {"docket.where": "worker"})
            return

        # Atomically check supersession and claim task in a single round-trip
        if not await execution.claim(self.name):
            logger.info("↬ %s (superseded)", call, extra=log_context)
            TASKS_SUPERSEDED.add(1, counter_labels | {"docket.where": "worker"})
            return

        # Only keys registered via run_at_most have entries here.
        if execution.key in self._execution_counts:
            self._execution_counts[execution.key] += 1

        start = time.time()
        # Positive punctuality means the task started late; negative, early.
        punctuality = start - execution.when.timestamp()
        log_context = {**log_context, "punctuality": punctuality}
        duration = 0.0

        TASKS_STARTED.add(1, counter_labels)
        if execution.redelivered:
            TASKS_REDELIVERED.add(1, counter_labels)
        TASKS_RUNNING.add(1, counter_labels)
        TASK_PUNCTUALITY.record(punctuality, counter_labels)

        arrow = "↬" if execution.attempt > 1 else "↪"
        logger.info(
            "%s [%s] %s", arrow, format_duration(punctuality), call, extra=log_context
        )

        dependencies: dict[str, Dependency] = {}

        with tracer.start_as_current_span(
            execution.function_name,
            kind=trace.SpanKind.CONSUMER,
            attributes={
                **self.labels(),
                **execution.specific_labels(),
                "code.function.name": execution.function_name,
            },
            links=execution.incoming_span_links(),
        ) as span:
            try:
                async with resolved_dependencies(self, execution) as dependencies:
                    dependency_failures = {
                        k: v
                        for k, v in dependencies.items()
                        if isinstance(v, FailedDependency)
                    }

                    # Check for AdmissionBlocked - re-raise directly (not wrapped in ExceptionGroup)
                    # This happens when ConcurrencyLimit couldn't acquire a slot
                    for failure in dependency_failures.values():
                        if isinstance(failure.error, AdmissionBlocked):
                            raise failure.error

                    if dependency_failures:
                        raise ExceptionGroup(
                            (
                                "Failed to resolve dependencies for parameter(s): "
                                + ", ".join(dependency_failures.keys())
                            ),
                            [
                                dependency.error
                                for dependency in dependency_failures.values()
                            ],
                        )

                    # Merge resolved dependencies into execution kwargs
                    final_kwargs = {**execution.kwargs, **dependencies}

                    # Check for a Runtime dependency (e.g., Timeout) that controls execution
                    runtime = get_single_dependency_of_type(dependencies, Runtime)
                    if runtime:
                        result = await runtime.run(
                            execution,
                            execution.function,
                            execution.args,
                            final_kwargs,
                        )
                    else:
                        result = await execution.function(
                            *execution.args, **final_kwargs
                        )

                    duration = log_context["duration"] = time.time() - start
                    TASKS_SUCCEEDED.add(1, counter_labels)

                    span.set_status(Status(StatusCode.OK))

                    # Check for completion handler (e.g., Perpetual)
                    completion_handler = get_single_dependency_of_type(
                        dependencies, CompletionHandler
                    )
                    outcome = TaskOutcome(
                        duration=timedelta(seconds=duration),
                        result=result,
                    )
                    if completion_handler and await completion_handler.on_complete(
                        execution, outcome
                    ):
                        # Handler took responsibility (rescheduled, logged, recorded metrics)
                        await execution.mark_as_completed(result_key=None)
                    else:
                        # No handler or handler didn't handle - normal completion
                        result_key = None
                        if result is not None and self.docket.execution_ttl:
                            # Serialize and store result
                            pickled_result = cloudpickle.dumps(result)  # type: ignore[arg-type]
                            # Base64-encode for JSON serialization
                            encoded_result = base64.b64encode(pickled_result).decode(
                                "ascii"
                            )
                            result_key = execution.key
                            ttl_seconds = int(self.docket.execution_ttl.total_seconds())
                            await self.docket.result_storage.put(
                                result_key, {"data": encoded_result}, ttl=ttl_seconds
                            )
                        await execution.mark_as_completed(result_key=result_key)
                        logger.info(
                            "↩ [%s] %s",
                            format_duration(duration),
                            call,
                            extra=log_context,
                        )
            except AdmissionBlocked:
                # Re-raise to be handled by process_completed_tasks
                raise
            except asyncio.CancelledError:
                # Task was cancelled externally via docket.cancel()
                duration = log_context["duration"] = time.time() - start
                span.set_status(Status(StatusCode.OK))
                await execution.mark_as_cancelled()
                logger.info(
                    "✗ [%s] %s (cancelled)",
                    format_duration(duration),
                    call,
                    extra=log_context,
                )
            except Exception as e:
                duration = log_context["duration"] = time.time() - start
                TASKS_FAILED.add(1, counter_labels)

                span.record_exception(e)
                span.set_status(Status(StatusCode.ERROR, str(e)))

                outcome = TaskOutcome(
                    duration=timedelta(seconds=duration),
                    exception=e,
                )

                # Check for failure handler (e.g., Retry)
                failure_handler = get_single_dependency_of_type(
                    dependencies, FailureHandler
                )
                if failure_handler and await failure_handler.handle_failure(
                    execution, outcome
                ):
                    # Handler took responsibility (scheduled retry, logged, recorded metrics)
                    # Don't mark as failed - task is being retried
                    pass
                else:
                    # Not retried - check for completion handler (e.g., Perpetual)
                    completion_handler = get_single_dependency_of_type(
                        dependencies, CompletionHandler
                    )
                    if completion_handler and await completion_handler.on_complete(
                        execution, outcome
                    ):
                        # Handler took responsibility (rescheduled, logged, recorded metrics)
                        pass
                    else:
                        # No handler took responsibility - log normally
                        logger.exception(
                            "↩ [%s] %s",
                            format_duration(duration),
                            call,
                            extra=log_context,
                        )

                    # Store exception in result_storage (only when not retrying)
                    result_key = None
                    if self.docket.execution_ttl:
                        pickled_exception = cloudpickle.dumps(e)  # type: ignore[arg-type]
                        # Base64-encode for JSON serialization
                        encoded_exception = base64.b64encode(pickled_exception).decode(
                            "ascii"
                        )
                        result_key = execution.key
                        ttl_seconds = int(self.docket.execution_ttl.total_seconds())
                        await self.docket.result_storage.put(
                            result_key, {"data": encoded_exception}, ttl=ttl_seconds
                        )

                    # Mark execution as failed with error message
                    error_msg = f"{type(e).__name__}: {str(e)}"
                    await execution.mark_as_failed(error_msg, result_key=result_key)
            finally:
                # Always balance the RUNNING gauge and record the duration,
                # whatever path the execution took.
                TASKS_RUNNING.add(-1, counter_labels)
                TASKS_COMPLETED.add(1, counter_labels)
                TASK_DURATION.record(duration, counter_labels)

    def _startup_log(self) -> None:
        """Announce this worker and the task signatures it can execute."""
        logger.info("Starting worker %r with the following tasks:", self.name)
        for name, function in self.docket.tasks.items():
            logger.info("* %s(%s)", name, compact_signature(get_signature(function)))

    @property
    def workers_set(self) -> str:
        """Redis key of the set tracking live workers, delegated to the docket."""
        return self.docket.workers_set

    def worker_tasks_set(self, worker_name: str) -> str:
        """Redis key of the set naming the tasks registered by *worker_name*."""
        return self.docket.worker_tasks_set(worker_name)

    def task_workers_set(self, task_name: str) -> str:
        """Redis key of the set naming the workers that serve *task_name*."""
        return self.docket.task_workers_set(task_name)

    async def _heartbeat(self) -> None:
        """Periodically publish this worker's liveness and queue-depth gauges.

        Each cycle prunes stale entries from the worker-tracking sorted sets,
        re-registers this worker (and the tasks it serves) with a fresh
        timestamp, then samples stream/queue depths into the QUEUE_DEPTH and
        SCHEDULE_DEPTH metrics. Runs until the task is cancelled.
        """
        while True:
            try:
                now = datetime.now(timezone.utc).timestamp()
                # Entries older than this many missed heartbeats are pruned.
                maximum_age = (
                    self.docket.heartbeat_interval * self.docket.missed_heartbeats
                )
                oldest = now - maximum_age.total_seconds()

                task_names = list(self.docket.tasks)

                async with self.docket.redis() as r:
                    with self._maybe_suppress_instrumentation():
                        async with r.pipeline() as pipeline:
                            # Drop expired workers, then refresh our own score.
                            pipeline.zremrangebyscore(self.workers_set, 0, oldest)
                            pipeline.zadd(self.workers_set, {self.name: now})

                            # Same prune-and-refresh per task we can serve.
                            for task_name in task_names:
                                task_workers_set = self.task_workers_set(task_name)
                                pipeline.zremrangebyscore(task_workers_set, 0, oldest)
                                pipeline.zadd(task_workers_set, {self.name: now})

                            pipeline.sadd(self.worker_tasks_set(self.name), *task_names)
                            pipeline.expire(
                                self.worker_tasks_set(self.name),
                                max(
                                    maximum_age, timedelta(seconds=MINIMUM_TTL_SECONDS)
                                ),
                            )

                            await pipeline.execute()

                        async with r.pipeline() as pipeline:
                            # Sample depths: stream backlog, overdue, future work.
                            pipeline.xlen(self.docket.stream_key)
                            pipeline.zcount(self.docket.queue_key, 0, now)
                            pipeline.zcount(self.docket.queue_key, now, "+inf")

                            results: list[int] = await pipeline.execute()

                    stream_depth = results[0]
                    overdue_depth = results[1]
                    schedule_depth = results[2]

                    QUEUE_DEPTH.set(stream_depth + overdue_depth, self.docket.labels())
                    SCHEDULE_DEPTH.set(schedule_depth, self.docket.labels())

            except asyncio.CancelledError:  # pragma: no cover
                return
            except ConnectionError:
                # Redis hiccup: count the disruption, log, and keep beating.
                REDIS_DISRUPTIONS.add(1, self.labels())
                logger.exception(
                    "Error sending worker heartbeat",
                    exc_info=True,
                    extra=self._log_context(),
                )
            except Exception:  # pragma: no cover
                logger.exception(
                    "Error sending worker heartbeat",
                    exc_info=True,
                    extra=self._log_context(),
                )

            await asyncio.sleep(self.docket.heartbeat_interval.total_seconds())

    async def _cancellation_listener(self) -> None:
        """Listen for cancellation signals and cancel matching tasks.

        Subscribes to the docket's cancel:* pub/sub pattern and reconnects
        with a short backoff on errors until the worker is stopping.
        """
        cancel_pattern = self.docket.key("cancel:*")
        log_context = self._log_context()

        while not self._worker_stopping.is_set():
            try:
                async with self.docket._pubsub() as pubsub:
                    await pubsub.psubscribe(cancel_pattern)
                    # Signal readiness only after the subscription is live.
                    self._cancellation_ready.set()
                    # Poll for messages, checking _worker_stopping periodically
                    is_memory = self.docket.url.startswith("memory://")
                    while not self._worker_stopping.is_set():
                        message = await pubsub.get_message(
                            ignore_subscribe_messages=True, timeout=0.1
                        )
                        if message is not None and message["type"] == "pmessage":
                            await self._handle_cancellation(message)
                        elif is_memory:  # pragma: no cover
                            # memory:// broker returns immediately; avoid a busy loop
                            await asyncio.sleep(0.1)
            except ConnectionError:
                if self._worker_stopping.is_set():
                    return  # pragma: no cover
                REDIS_DISRUPTIONS.add(1, self.labels())
                logger.warning(
                    "Redis connection error in cancellation listener, reconnecting...",
                    extra=log_context,
                )
                await asyncio.sleep(1)
            except Exception:
                if self._worker_stopping.is_set():
                    return  # pragma: no cover
                logger.exception(
                    "Error in cancellation listener",
                    exc_info=True,
                    extra=log_context,
                )
                await asyncio.sleep(1)

    async def _handle_cancellation(self, message: PubSubMessage) -> None:
        """React to a cancellation pub/sub message by cancelling the named task."""
        payload = message["data"]
        if isinstance(payload, bytes):
            payload = payload.decode()
        key: TaskKey = payload

        task = self._tasks_by_key.get(key)
        if task:  # pragma: no branch
            logger.info(
                "Cancelling running task %r",
                key,
                extra=self._log_context(),
            )
            task.cancel()

run(docket_name='docket', url='redis://localhost:6379/0', name=None, concurrency=10, redelivery_timeout=timedelta(minutes=5), reconnection_delay=timedelta(seconds=5), minimum_check_interval=timedelta(milliseconds=100), scheduling_resolution=timedelta(milliseconds=250), schedule_automatic_tasks=True, enable_internal_instrumentation=False, until_finished=False, healthcheck_port=None, metrics_port=None, tasks=['docket.tasks:standard_tasks'], fallback_task=None) async classmethod

Run a worker as the main entry point (CLI).

This method installs signal handlers for graceful shutdown since it assumes ownership of the event loop. When embedding Docket in another framework (e.g., FastAPI with uvicorn), use Worker.run_forever() or Worker.run_until_finished() directly - those methods do not install signal handlers and rely on the framework to handle shutdown signals.

Source code in src/docket/worker.py
@classmethod
async def run(
    cls,
    docket_name: str = "docket",
    url: str = "redis://localhost:6379/0",
    name: str | None = None,
    concurrency: int = 10,
    redelivery_timeout: timedelta = timedelta(minutes=5),
    reconnection_delay: timedelta = timedelta(seconds=5),
    minimum_check_interval: timedelta = timedelta(milliseconds=100),
    scheduling_resolution: timedelta = timedelta(milliseconds=250),
    schedule_automatic_tasks: bool = True,
    enable_internal_instrumentation: bool = False,
    until_finished: bool = False,
    healthcheck_port: int | None = None,
    metrics_port: int | None = None,
    tasks: list[str] | None = None,
    fallback_task: str | None = None,
) -> None:
    """Run a worker as the main entry point (CLI).

    This method installs signal handlers for graceful shutdown since it
    assumes ownership of the event loop. When embedding Docket in another
    framework (e.g., FastAPI with uvicorn), use Worker.run_forever() or
    Worker.run_until_finished() directly - those methods do not install
    signal handlers and rely on the framework to handle shutdown signals.

    Args:
        docket_name: Name of the docket to connect to.
        url: Redis URL of the docket.
        name: Optional worker name, passed through to the Worker.
        concurrency: Worker concurrency, passed through to the Worker.
        redelivery_timeout: How long before an unacknowledged task may be
            redelivered to another worker.
        reconnection_delay: Delay between reconnection attempts.
        minimum_check_interval: Minimum interval between checks for work.
        scheduling_resolution: How often due tasks are moved to the stream.
        schedule_automatic_tasks: Whether to seed automatic perpetual tasks.
        until_finished: If True, exit once the docket is drained instead of
            running forever.
        enable_internal_instrumentation: Emit docket's internal telemetry.
        healthcheck_port: Port for the healthcheck server, if any.
        metrics_port: Port for the metrics server, if any.
        tasks: Import paths of task collections to register; defaults to
            ["docket.tasks:standard_tasks"] when None.
        fallback_task: Optional "module:function" path resolved and used as
            the worker's fallback task.
    """
    # None sentinel avoids a mutable default argument for the task list
    # (a shared list default would be visible across calls).
    if tasks is None:
        tasks = ["docket.tasks:standard_tasks"]

    # Parse fallback_task string if provided (module:function format)
    resolved_fallback_task: TaskFunction | None = None
    if fallback_task:
        module_name, _, member_name = fallback_task.rpartition(":")
        module = importlib.import_module(module_name)
        resolved_fallback_task = getattr(module, member_name)

    with (
        healthcheck_server(port=healthcheck_port),
        metrics_server(port=metrics_port),
    ):
        async with Docket(
            name=docket_name,
            url=url,
            enable_internal_instrumentation=enable_internal_instrumentation,
        ) as docket:
            for task_path in tasks:
                docket.register_collection(task_path)

            async with (
                Worker(  # pragma: no branch - context manager exit varies across interpreters
                    docket=docket,
                    name=name,
                    concurrency=concurrency,
                    redelivery_timeout=redelivery_timeout,
                    reconnection_delay=reconnection_delay,
                    minimum_check_interval=minimum_check_interval,
                    scheduling_resolution=scheduling_resolution,
                    schedule_automatic_tasks=schedule_automatic_tasks,
                    enable_internal_instrumentation=enable_internal_instrumentation,
                    fallback_task=resolved_fallback_task,
                ) as worker
            ):
                # Install signal handlers for graceful shutdown.
                # This is only appropriate when we own the event loop (CLI entry point).
                # Embedded usage should let the framework handle signals.
                loop = asyncio.get_running_loop()
                run_task: asyncio.Task[None] | None = None

                def handle_shutdown(sig_name: str) -> None:  # pragma: no cover
                    logger.info(
                        "Received %s, initiating graceful shutdown...", sig_name
                    )
                    if run_task and not run_task.done():
                        run_task.cancel()

                try:  # pragma: no cover
                    loop.add_signal_handler(
                        signal.SIGTERM, lambda: handle_shutdown("SIGTERM")
                    )
                    loop.add_signal_handler(
                        signal.SIGINT, lambda: handle_shutdown("SIGINT")
                    )
                except NotImplementedError:  # pragma: no cover
                    pass  # Windows doesn't support loop signal handlers

                try:
                    if until_finished:
                        run_task = asyncio.create_task(
                            worker.run_until_finished(),
                            name=f"{docket_name} - worker",
                        )
                    else:
                        run_task = asyncio.create_task(
                            worker.run_forever(),
                            name=f"{docket_name} - worker",
                        )  # pragma: no cover
                    await run_task
                except asyncio.CancelledError:  # pragma: no cover
                    pass
                finally:
                    # Always detach our handlers so the loop is left clean.
                    try:  # pragma: no cover
                        loop.remove_signal_handler(signal.SIGTERM)
                        loop.remove_signal_handler(signal.SIGINT)
                    except NotImplementedError:  # pragma: no cover
                        pass

run_at_most(iterations_by_key) async

Run the worker until there are no more tasks to process, but limit specified task keys to a maximum number of iterations.

This is particularly useful for testing self-perpetuating tasks that would otherwise run indefinitely.

Parameters:

Name Type Description Default
iterations_by_key Mapping[str, int]

Maps task keys to their maximum allowed executions

required
Source code in src/docket/worker.py
async def run_at_most(self, iterations_by_key: Mapping[str, int]) -> None:
    """
    Run the worker until the docket is drained, capping how many times each
    of the given task keys may execute.

    Handy for testing self-perpetuating tasks that would otherwise never stop.

    Args:
        iterations_by_key: Maps task keys to their maximum allowed executions
    """
    self._execution_counts = dict.fromkeys(iterations_by_key, 0)

    def exhausted(execution: Execution) -> bool:
        # Strike any key that has already used up its allowed iterations.
        limit = iterations_by_key.get(execution.key)
        if limit is None:
            return False
        return self._execution_counts[execution.key] >= limit

    self.docket.strike_list.add_condition(exhausted)
    try:
        await self.run_until_finished()
    finally:
        # Remove the condition and reset counters even if the run failed.
        self.docket.strike_list.remove_condition(exhausted)
        self._execution_counts = {}

run_forever() async

Run the worker indefinitely.

Source code in src/docket/worker.py
async def run_forever(self) -> None:
    """Process tasks indefinitely (no stopping condition)."""
    # Delegate to the shared run loop in its never-ending mode.
    return await self._run(forever=True)  # pragma: no cover

run_until_finished() async

Run the worker until there are no more tasks to process.

Source code in src/docket/worker.py
async def run_until_finished(self) -> None:
    """Process tasks until the docket is drained, then return."""
    # Delegate to the shared run loop in its drain-and-exit mode.
    return await self._run(forever=False)

CurrentDocket()

A dependency to access the current Docket.

Example:

@task
async def my_task(docket: Docket = CurrentDocket()) -> None:
    assert isinstance(docket, Docket)
Source code in src/docket/dependencies/_contextual.py
def CurrentDocket() -> Docket:
    """Declare a dependency on the Docket the current task belongs to.

    Example:

    ```python
    @task
    async def my_task(docket: Docket = CurrentDocket()) -> None:
        assert isinstance(docket, Docket)
    ```
    """
    placeholder = _CurrentDocket()
    return cast("Docket", placeholder)

CurrentExecution()

A dependency to access the current Execution.

Example:

@task
async def my_task(execution: Execution = CurrentExecution()) -> None:
    assert isinstance(execution, Execution)
Source code in src/docket/dependencies/_contextual.py
def CurrentExecution() -> Execution:
    """Declare a dependency on the Execution currently being processed.

    Example:

    ```python
    @task
    async def my_task(execution: Execution = CurrentExecution()) -> None:
        assert isinstance(execution, Execution)
    ```
    """
    placeholder = _CurrentExecution()
    return cast("Execution", placeholder)

CurrentWorker()

A dependency to access the current Worker.

Example:

@task
async def my_task(worker: Worker = CurrentWorker()) -> None:
    assert isinstance(worker, Worker)
Source code in src/docket/dependencies/_contextual.py
def CurrentWorker() -> Worker:
    """Declare a dependency on the Worker executing the current task.

    Example:

    ```python
    @task
    async def my_task(worker: Worker = CurrentWorker()) -> None:
        assert isinstance(worker, Worker)
    ```
    """
    placeholder = _CurrentWorker()
    return cast("Worker", placeholder)

Depends(dependency)

Include a user-defined function as a dependency. Dependencies may be: - Synchronous functions returning a value - Asynchronous functions returning a value (awaitable) - Synchronous context managers (using @contextmanager) - Asynchronous context managers (using @asynccontextmanager)

If a dependency returns a context manager, it will be entered and exited around the task, giving an opportunity to control the lifetime of a resource.

Important: Synchronous dependencies should NOT include blocking I/O operations (file access, network calls, database queries, etc.). Use async dependencies for any I/O. Sync dependencies are best for: - Pure computations - In-memory data structure access - Configuration lookups from memory - Non-blocking transformations

Examples:

# Sync dependency - pure computation, no I/O
def get_config() -> dict:
    # Access in-memory config, no I/O
    return {"api_url": "https://api.example.com", "timeout": 30}

# Sync dependency - compute value from arguments
def build_query_params(
    user_id: int = TaskArgument(),
    config: dict = Depends(get_config)
) -> dict:
    # Pure computation, no I/O
    return {"user_id": user_id, "timeout": config["timeout"]}

# Async dependency - I/O operations
async def get_user(user_id: int = TaskArgument()) -> User:
    # Network I/O - must be async
    return await fetch_user_from_api(user_id)

# Async context manager - I/O resource management
from contextlib import asynccontextmanager

@asynccontextmanager
async def get_db_connection():
    # I/O operations - must be async
    conn = await db.connect()
    try:
        yield conn
    finally:
        await conn.close()

@task
async def my_task(
    params: dict = Depends(build_query_params),
    user: User = Depends(get_user),
    db: Connection = Depends(get_db_connection),
) -> None:
    await db.execute("UPDATE users SET ...", params)
Source code in src/docket/dependencies/_functional.py
def Depends(dependency: DependencyFunction[R]) -> R:
    """Declare a user-supplied function as a task dependency.

    A dependency may be any of:
    - a synchronous function returning a value
    - an asynchronous function returning a value (awaitable)
    - a synchronous context manager (using @contextmanager)
    - an asynchronous context manager (using @asynccontextmanager)

    When a dependency yields via a context manager, it is entered before the
    task runs and exited afterward, so it can manage a resource's lifetime.

    **Important**: keep synchronous dependencies free of blocking I/O (file
    access, network calls, database queries, etc.); use an async dependency
    whenever I/O is involved. Sync dependencies suit pure computation,
    in-memory data access, configuration lookups from memory, and other
    non-blocking transformations.

    Examples:

    ```python
    # Sync dependency - pure computation, no I/O
    def get_config() -> dict:
        # Access in-memory config, no I/O
        return {"api_url": "https://api.example.com", "timeout": 30}

    # Sync dependency - compute value from arguments
    def build_query_params(
        user_id: int = TaskArgument(),
        config: dict = Depends(get_config)
    ) -> dict:
        # Pure computation, no I/O
        return {"user_id": user_id, "timeout": config["timeout"]}

    # Async dependency - I/O operations
    async def get_user(user_id: int = TaskArgument()) -> User:
        # Network I/O - must be async
        return await fetch_user_from_api(user_id)

    # Async context manager - I/O resource management
    from contextlib import asynccontextmanager

    @asynccontextmanager
    async def get_db_connection():
        # I/O operations - must be async
        conn = await db.connect()
        try:
            yield conn
        finally:
            await conn.close()

    @task
    async def my_task(
        params: dict = Depends(build_query_params),
        user: User = Depends(get_user),
        db: Connection = Depends(get_db_connection),
    ) -> None:
        await db.execute("UPDATE users SET ...", params)
    ```
    """
    return cast(R, _Depends(dependency))

Shared(factory)

Declare a worker-scoped dependency shared across all tasks.

The factory initializes once when first needed and the returned/yielded value is shared by all tasks for the lifetime of the worker. Factories may be: - Synchronous functions returning a value - Asynchronous functions returning a value (awaitable) - Synchronous context managers (using @contextmanager) - Asynchronous context managers (using @asynccontextmanager)

Context managers are useful when cleanup is needed at worker shutdown.

Identity is the factory function - multiple Shared(same_factory) calls anywhere in the codebase resolve to the same cached value.

Example with async context manager (for resources needing cleanup):

from contextlib import asynccontextmanager

@asynccontextmanager
async def create_db_pool():
    pool = await AsyncConnectionPool.create(conninfo="...")
    try:
        yield pool
    finally:
        await pool.close()

@task
async def my_task(pool: Pool = Shared(create_db_pool)):
    async with pool.connection() as conn:
        await conn.execute("SELECT ...")

Example with async function (for simple shared values):

async def load_config() -> Config:
    return await fetch_config_from_remote()

@task
async def my_task(config: Config = Shared(load_config)):
    # Same config instance across all tasks
    print(config.api_url)

Shared dependencies can depend on other Shared dependencies, Depends, and contextual dependencies like CurrentDocket and CurrentWorker:

@asynccontextmanager
async def create_pool(
    docket: Docket = CurrentDocket(),
    url: str = Depends(get_connection_string),
):
    logger.info(f"Creating pool for {docket.name}")
    pool = await AsyncConnectionPool.create(url)
    yield pool
    await pool.close()
Source code in src/docket/dependencies/_functional.py
def Shared(factory: DependencyFunction[R]) -> R:
    """Declare a worker-scoped dependency shared across all tasks.

    The factory initializes once when first needed and the returned/yielded value is
    shared by all tasks for the lifetime of the worker. Factories may be:
    - Synchronous functions returning a value
    - Asynchronous functions returning a value (awaitable)
    - Synchronous context managers (using @contextmanager)
    - Asynchronous context managers (using @asynccontextmanager)

    Context managers are useful when cleanup is needed at worker shutdown.

    Identity is the factory function - multiple Shared(same_factory) calls anywhere
    in the codebase resolve to the same cached value.

    Args:
        factory: The callable that produces the shared value.  The function
            object itself is the cache identity, so passing the same factory
            always resolves to the same shared instance.

    Returns:
        A dependency marker that the worker resolves to the factory's value at
        task runtime (typed as ``R`` so it can be used as a parameter default).

    Example with async context manager (for resources needing cleanup):

    ```python
    from contextlib import asynccontextmanager

    @asynccontextmanager
    async def create_db_pool():
        pool = await AsyncConnectionPool.create(conninfo="...")
        try:
            yield pool
        finally:
            await pool.close()

    @task
    async def my_task(pool: Pool = Shared(create_db_pool)):
        async with pool.connection() as conn:
            await conn.execute("SELECT ...")
    ```

    Example with async function (for simple shared values):

    ```python
    async def load_config() -> Config:
        return await fetch_config_from_remote()

    @task
    async def my_task(config: Config = Shared(load_config)):
        # Same config instance across all tasks
        print(config.api_url)
    ```

    Shared dependencies can depend on other Shared dependencies, Depends, and
    contextual dependencies like CurrentDocket and CurrentWorker:

    ```python
    @asynccontextmanager
    async def create_pool(
        docket: Docket = CurrentDocket(),
        url: str = Depends(get_connection_string),
    ):
        logger.info(f"Creating pool for {docket.name}")
        pool = await AsyncConnectionPool.create(url)
        yield pool
        await pool.close()
    ```
    """
    # The marker object carries the factory; cast() lets the marker stand in
    # wherever an ``R`` is expected (i.e. as a parameter default).
    return cast(R, _Shared(factory))

TaskArgument(parameter=None, optional=False)

A dependency to access an argument of the currently executing task. This is often useful in dependency functions so they can access the arguments of the task they are injected into.

Example:

async def customer_name(customer_id: int = TaskArgument()) -> str:
    ...look up the customer's name by ID...
    return "John Doe"

@task
async def greet_customer(customer_id: int, name: str = Depends(customer_name)) -> None:
    print(f"Hello, {name}!")
Source code in src/docket/dependencies/_contextual.py
def TaskArgument(parameter: str | None = None, optional: bool = False) -> Any:
    """A dependency to access an argument of the currently executing task.  This is
    often useful in dependency functions so they can access the arguments of the
    task they are injected into.

    Args:
        parameter: The name of the task argument to access.  When None, the
            argument is presumably matched by the injected parameter's own name
            (see _TaskArgument for the resolution rules).
        optional: Presumably suppresses the failure when the task has no such
            argument — confirm the exact behavior against _TaskArgument.

    Example:

    ```python
    async def customer_name(customer_id: int = TaskArgument()) -> str:
        ...look up the customer's name by ID...
        return "John Doe"

    @task
    async def greet_customer(customer_id: int, name: str = Depends(customer_name)) -> None:
        print(f"Hello, {name}!")
    ```
    """
    # cast(Any, ...) lets the marker be used as a default for a parameter of
    # any annotated type.
    return cast(Any, _TaskArgument(parameter, optional))

TaskKey()

A dependency to access the key of the currently executing task.

Example:

@task
async def my_task(key: str = TaskKey()) -> None:
    assert isinstance(key, str)
Source code in src/docket/dependencies/_contextual.py
def TaskKey() -> str:
    """A dependency that resolves to the key of the currently executing task.

    Example:

    ```python
    @task
    async def my_task(key: str = TaskKey()) -> None:
        assert isinstance(key, str)
    ```
    """
    # Build the marker first, then cast so it can stand in for a str when used
    # as a parameter default.
    marker = _TaskKey()
    return cast(str, marker)

TaskLogger()

A dependency to access a logger for the currently executing task. The logger will automatically inject contextual information such as the worker and docket name, the task key, and the current execution attempt number.

Example:

@task
async def my_task(logger: "LoggerAdapter[Logger]" = TaskLogger()) -> None:
    logger.info("Hello, world!")
Source code in src/docket/dependencies/_contextual.py
def TaskLogger() -> logging.LoggerAdapter[logging.Logger]:
    """A dependency that resolves to a logger for the currently executing task.

    The logger will automatically inject contextual information such as the
    worker and docket name, the task key, and the current execution attempt
    number.

    Example:

    ```python
    @task
    async def my_task(logger: "LoggerAdapter[Logger]" = TaskLogger()) -> None:
        logger.info("Hello, world!")
    ```
    """
    # Cast lets the marker be used as a parameter default typed as a
    # LoggerAdapter.
    adapter = _TaskLogger()
    return cast("logging.LoggerAdapter[logging.Logger]", adapter)