Configuration & Customization¶
1. Environment Variables¶
# Worker on/off
WORKER_ENABLED=true
# Reaper loop on/off
WORKER_REAPER_ENABLED=true
# Concurrent worker loops per process
WORKER_CONCURRENCY=1
# Reaper interval (seconds)
WORKER_REAP_INTERVAL=60
# Default stale timeout (seconds)
WORKER_STALE_TIMEOUT=1200
# Shutdown behavior
# - Gracefully wait for in-flight jobs up to this many seconds
WORKER_SHUTDOWN_GRACE=10
# - Hard cap for worker shutdown (seconds)
WORKER_SHUTDOWN_TIMEOUT=30
Notes:
- 잘못된 값(예:
abc)은 경고 후 기본값으로 fallback 됩니다. WORKER_REAPER_ENABLED=false면 stale RUNNING 복구는 수행되지 않습니다.WORKER_STALE_TIMEOUT은 job에timeout_seconds가 없을 때만 적용됩니다.
2. Worker Constructor Options¶
from datetime import timedelta
from pqrun import Worker, BackoffPolicy, IdlePollPolicy, LoopErrorPolicy
worker = Worker(
store=store,
handlers=handlers,
concurrency=1,
enabled=True,
enable_reaper=True,
backoff=BackoffPolicy(),
idle_policy=IdlePollPolicy(base_seconds=1.0, max_seconds=10.0),
loop_error_policy=LoopErrorPolicy(),
reap_stale_every_seconds=60,
default_stale_after=timedelta(minutes=20),
shutdown_grace=timedelta(seconds=10),
shutdown_timeout=timedelta(seconds=30),
worker_id="custom-worker-id",
)
3. Policy Customization¶
정책 3개는 각각 “어떤 상황에서 sleep/delay를 계산하는지”가 다릅니다.
3.0 Which Policy Does What?¶
-
BackoffPolicy- 시점: 핸들러 실행이 실패했을 때
- 결정: 다음 재시도까지의 지연 시간 (
retry_after) - 적용 지점:
mark_error(..., retry_after=...)
-
IdlePollPolicy- 시점: 큐에서 가져올 작업이 없을 때 (
pickup() -> None) - 결정: 다음 polling까지 sleep 시간
- 목적: 빈 큐 상태에서 DB polling 부하 완화
- 시점: 큐에서 가져올 작업이 없을 때 (
-
LoopErrorPolicy- 시점: 워커 루프 인프라 오류가 날 때
- 예:
pickup,mark_done,mark_error호출 중 예외
- 예:
- 결정: 다음 루프 재시도까지 sleep 시간
- 기본값:
0.0(즉시 재시도)
- 시점: 워커 루프 인프라 오류가 날 때
3.1 Handler failure retry (BackoffPolicy)¶
from datetime import timedelta
from pqrun import BackoffPolicy
class CustomBackoff(BackoffPolicy):
def retry_delay(self, attempts: int) -> timedelta:
return timedelta(seconds=min(2 ** attempts, 3600))
3.2 Empty queue polling (IdlePollPolicy)¶
from pqrun import IdlePollPolicy
class FastIdlePolicy(IdlePollPolicy):
def next_sleep(self, empty_streak: int) -> float:
return min(0.2 * (empty_streak + 1), 2.0)
3.3 Infra loop errors (LoopErrorPolicy)¶
from pqrun import LoopErrorPolicy
class InfraLoopPolicy(LoopErrorPolicy):
def next_sleep(self, consecutive_errors: int) -> float:
# Default is 0.0 (immediate retry)
return min(0.1 * consecutive_errors, 3.0)
4. Job-level Controls (enqueue)¶
job_id = await store.enqueue(
job_type="send_email",
payload={"user_id": 123, "template": "welcome"},
dedupe_key="welcome:user:123",
run_after=run_at,
priority=10,
max_attempts=5,
timeout_seconds=300,
)
Notes:
- payload / result는 JSON object(dict)만 지원합니다.
- dedupe_key 중복 시 기존 active job id를 반환합니다.