Docket in Production
This page covers configuring workers, connecting to Redis, managing state and results, monitoring, and operational tools for running Docket in production.
Worker Configuration
Core Settings
Workers have several configuration knobs for different environments:
async with Worker(
docket,
name="worker-1", # Unique worker identifier
concurrency=20, # Parallel task limit
redelivery_timeout=timedelta(minutes=5), # When to redeliver tasks
reconnection_delay=timedelta(seconds=5), # Redis reconnection backoff
minimum_check_interval=timedelta(milliseconds=100), # Polling frequency
scheduling_resolution=timedelta(milliseconds=250), # Future task check frequency
schedule_automatic_tasks=True # Enable perpetual task startup
) as worker:
await worker.run_forever()
Environment Variable Configuration
All settings can be configured via environment variables:
# Core docket settings
export DOCKET_NAME=orders
export DOCKET_URL=redis://redis.production.com:6379/0
# Worker settings
export DOCKET_WORKER_NAME=orders-worker-1
export DOCKET_WORKER_CONCURRENCY=50
export DOCKET_WORKER_REDELIVERY_TIMEOUT=10m
export DOCKET_WORKER_RECONNECTION_DELAY=5s
export DOCKET_WORKER_MINIMUM_CHECK_INTERVAL=100ms
export DOCKET_WORKER_SCHEDULING_RESOLUTION=250ms
# Monitoring
export DOCKET_WORKER_HEALTHCHECK_PORT=8080
export DOCKET_WORKER_METRICS_PORT=9090
# Logging
export DOCKET_LOGGING_LEVEL=INFO
export DOCKET_LOGGING_FORMAT=json
# Task modules
export DOCKET_TASKS=myapp.tasks:production_tasks
CLI Usage
Run workers in production using the CLI:
# Basic worker
docket worker --tasks myapp.tasks:all_tasks
# Production worker with full configuration
docket worker \
--docket orders \
--url redis://redis.prod.com:6379/0 \
--name orders-worker-1 \
--concurrency 50 \
--redelivery-timeout 10m \
--healthcheck-port 8080 \
--metrics-port 9090 \
--logging-format json \
--tasks myapp.tasks:production_tasks
Signal Handling
Workers catch SIGTERM and SIGINT and shut down gracefully — they stop accepting new tasks and wait for in-flight tasks to finish before exiting. On container orchestrators like Kubernetes, set terminationGracePeriodSeconds to be longer than your slowest expected task so the worker has time to drain. Tasks that don't finish before the grace period expires will be redelivered to other workers based on redelivery_timeout.
Connection Management
Redis Connection Pools
Docket automatically manages Redis connection pools. To use a custom pool:
from redis.asyncio import ConnectionPool
pool = ConnectionPool.from_url(
"redis://redis.prod.com:6379/0",
max_connections=50, # Match or exceed worker concurrency
retry_on_timeout=True
)
async with Docket(name="orders", connection_pool=pool) as docket:
pass
Redis Cluster Support
Docket supports Redis Cluster using the redis+cluster:// URL scheme:
# Connect to Redis Cluster
async with Docket(
name="orders",
url="redis+cluster://cluster-node-1:6379/0"
) as docket:
pass
# With authentication
async with Docket(
name="orders",
url="redis+cluster://user:password@cluster-node-1:6379/0"
) as docket:
pass
# TLS connection to cluster
async with Docket(
name="orders",
url="rediss+cluster://cluster-node-1:6379/0"
) as docket:
pass
When using cluster mode, Docket automatically:
- Uses hash-tagged keys (
{docket_name}:*) to ensure all keys hash to the same slot - Manages cluster client lifecycle and connection distribution
- Handles pub/sub through a dedicated node connection (cluster pub/sub limitation)
Note: All Docket data for a single docket name will be stored on the same cluster shard. This ensures atomicity for Lua scripts and simplifies data management, but means individual dockets don't benefit from cluster data distribution.
Authentication
Docket supports Redis authentication via URL credentials:
# Password-only authentication (Redis default user)
docket_url = "redis://:password@redis.prod.com:6379/0"
# Username and password authentication (Redis 6.0+ ACL)
docket_url = "redis://myuser:mypassword@redis.prod.com:6379/0"
ACL Configuration
When using Redis ACLs with a restricted user, grant access to the key pattern matching your docket name.
Standalone Redis: Keys follow the pattern {docket_name}:*
# Create a user with restricted permissions for a docket named "orders"
ACL SETUSER docket-user on >secure-password ~orders:* &orders:* +@all
Redis Cluster: Keys are hash-tagged with curly braces {docket_name}:*
# For cluster mode, the pattern includes the hash tag braces
ACL SETUSER docket-user on >secure-password ~{orders}:* &{orders}:* +@all
The required permissions are:
- Key pattern:
~{docket_name}:*(standalone) or~\{docket_name\}:*(cluster) - matches all Redis keys used by Docket - Channel pattern:
&{docket_name}:*(standalone) or&\{docket_name\}:*(cluster) - required for task cancellation pub/sub - Commands:
+@allor the specific command categories Docket uses
For production deployments, you may restrict to only the required command categories:
# More restrictive command permissions (standalone)
ACL SETUSER docket-user on >secure-password \
~orders:* &orders:* \
+@read +@write +@set +@sortedset +@hash +@stream +@pubsub +@scripting +@connection
# More restrictive command permissions (cluster)
ACL SETUSER docket-user on >secure-password \
~{orders}:* &{orders}:* \
+@read +@write +@set +@sortedset +@hash +@stream +@pubsub +@scripting +@connection
Valkey Support
Docket also works with Valkey (Redis fork):
State and Result Storage
Docket tracks execution state and stores task results by default. You can configure or disable this behavior based on your observability and throughput requirements.
Execution State Tracking
By default, Docket stores execution state (SCHEDULED, QUEUED, RUNNING, COMPLETED, FAILED) in Redis with a 15-minute TTL:
async with Docket(
name="orders",
url="redis://localhost:6379/0",
execution_ttl=timedelta(minutes=15) # Default
) as docket:
# State records expire 15 minutes after task completion
pass
The execution_ttl controls:
- How long state records persist in Redis after task completion
- How long result data is retained (see Result Storage below)
- How long progress information remains available
Fire-and-Forget Mode
For maximum throughput in high-volume scenarios, disable state tracking entirely:
async with Docket(
name="high-throughput",
url="redis://localhost:6379/0",
execution_ttl=timedelta(0) # Disable state persistence
) as docket:
# No state records, no result storage, no progress tracking
for event in events:
await docket.add(process_event)(event)
With execution_ttl=0:
- No state records: State transitions are not written to Redis
- No result storage: Task return values are not persisted
- No progress tracking: Progress updates are not recorded
- Maximum throughput: Minimizes Redis operations per task
- get_result() unavailable: Cannot retrieve task results
Result Storage Configuration
Task results are stored using the py-key-value-aio library. By default, Docket uses RedisStore but you can provide a custom storage backend:
from key_value.stores.redis import RedisStore
from key_value.stores.memory import MemoryStore
# Default: Redis-backed result storage
async with Docket(
name="production",
url="redis://localhost:6379/0",
result_storage=RedisStore(url="redis://localhost:6379/1") # Separate DB
) as docket:
pass
# Alternative: In-memory storage for testing
async with Docket(
name="test",
url="redis://localhost:6379/0",
result_storage=MemoryStore()
) as docket:
pass
Custom storage backends must implement the KeyValueStore protocol from py-key-value-aio.
Result Storage Tips
Separate Redis Database: Store results in a different Redis database than task queues:
result_storage = RedisStore(url="redis://localhost:6379/1") # DB 1 for results
docket = Docket(
name="orders",
url="redis://localhost:6379/0", # DB 0 for queues
result_storage=result_storage
)
TTL Management: Results inherit the execution_ttl setting. Adjust based on how long you need results:
# Keep results for 1 hour
async with Docket(execution_ttl=timedelta(hours=1)) as docket:
execution = await docket.add(generate_report)()
# Result available for 1 hour after completion
None Returns: Tasks that return None don't write to result storage, saving space:
async def send_notification(user_id: str) -> None:
await send_email(user_id)
# No result stored - task returns None
Large Results: For large result data, consider storing references instead:
# Instead of returning large data
async def process_large_dataset(dataset_id: str) -> dict:
data = await expensive_computation()
return data # Large object stored in Redis
# Store a reference
async def process_large_dataset(dataset_id: str) -> str:
data = await expensive_computation()
s3_key = await store_in_s3(data)
return s3_key # Small string stored in Redis
Monitoring State and Result Storage
Track Redis memory usage for state and result data:
# Check memory usage
redis-cli info memory
# Count state records
redis-cli --scan --pattern "docket:runs:*" | wc -l
# Count result keys
redis-cli --scan --pattern "docket:results:*" | wc -l
If memory usage is high:
- Reduce
execution_ttlto expire data sooner - Use
execution_ttl=0for fire-and-forget tasks - Store large results externally (S3, database) and return references
- Separate result storage to a different Redis instance
Monitoring and Observability
Prometheus Metrics
Enable Prometheus metrics with the --metrics-port option:
Available metrics include:
Task Counters
docket_tasks_added- Tasks scheduleddocket_tasks_started- Tasks begun executiondocket_tasks_succeeded- Successfully completed tasksdocket_tasks_failed- Failed tasksdocket_tasks_retried- Retry attemptsdocket_tasks_stricken- Tasks blocked by strikes
Task Timing
docket_task_duration- Histogram of task execution timesdocket_task_punctuality- How close tasks run to their scheduled time
System Health
docket_queue_depth- Tasks ready for immediate executiondocket_schedule_depth- Tasks scheduled for future executiondocket_tasks_running- Currently executing tasksdocket_redis_disruptions- Redis connection failuresdocket_strikes_in_effect- Active strike rules
All metrics include labels for docket name, worker name, and task function name.
Health Checks
Enable health check endpoints:
The health check endpoint (/) returns 200 OK when the worker is healthy and able to process tasks.
OpenTelemetry Traces
Docket automatically creates OpenTelemetry spans for task execution:
- Span name:
docket.task.{function_name} - Attributes: docket name, worker name, task key, attempt number
- Status: Success/failure with error details
- Duration: Complete task execution time
Configure your OpenTelemetry exporter to send traces to your observability platform. See the OpenTelemetry Python documentation for configuration examples with various backends like Jaeger, Zipkin, or cloud providers.
Structured Logging
Configure structured logging for production:
# JSON logs for log aggregation
docket worker --logging-format json --logging-level info
# Plain logs for simple deployments
docket worker --logging-format plain --logging-level warning
Log entries include:
- Task execution start/completion
- Error details with stack traces
- Worker lifecycle events
- Redis connection status
- Strike/restore operations
Striking and Restoring Tasks
Striking temporarily disables tasks without redeploying code.
Striking Entire Task Types
Disable all instances of a specific task:
# Disable all order processing during maintenance
await docket.strike(process_order)
# Orders added during this time won't be processed
await docket.add(process_order)(order_id=12345) # Won't run
await docket.add(process_order)(order_id=67890) # Won't run
# Re-enable when ready
await docket.restore(process_order)
Striking by Parameter Values
Disable tasks based on their arguments using comparison operators:
# Block all tasks for a problematic customer
await docket.strike(None, "customer_id", "==", "12345")
# Block low-priority work during high load
await docket.strike(process_order, "priority", "<=", "low")
# Block all orders above a certain value during fraud investigation
await docket.strike(process_payment, "amount", ">", 10000)
# Later, restore them
await docket.restore(None, "customer_id", "==", "12345")
await docket.restore(process_order, "priority", "<=", "low")
Supported operators include ==, !=, <, <=, >, >=.
Striking Specific Task-Parameter Combinations
Target very specific scenarios:
# Block only high-value orders for a specific customer
await docket.strike(process_order, "customer_id", "==", "12345")
await docket.strike(process_order, "amount", ">", 1000)
# This order won't run (blocked customer)
await docket.add(process_order)(customer_id="12345", amount=500)
# This order won't run (blocked customer AND high amount)
await docket.add(process_order)(customer_id="12345", amount=2000)
# This order WILL run (different customer)
await docket.add(process_order)(customer_id="67890", amount=2000)
Striking from the CLI
You can also strike and restore tasks from the command line:
# Block problematic tasks immediately
docket strike problematic_function
# Block tasks for specific customers
docket strike process_order customer_id == "problematic-customer"
# Restore when issues are resolved
docket restore problematic_function
Built-in Utility Tasks
Docket provides utility tasks for smoke-testing and debugging: