Design Document¶
Version: 0.1.0
Date: 2026-02-24
Status: Final
1. Overview¶
1.1 Purpose¶
pqrun는 PostgreSQL을 백엔드로 사용하는 경량 비동기 작업 큐 라이브러리입니다. FastAPI와 같은 async Python 애플리케이션에서 백그라운드 작업을 안전하고 효율적으로 처리하기 위해 설계되었습니다.
1.2 Design Goals¶
- Simple: ORM 비의존, 순수 SQL 기반으로 최소 의존성
- Safe: SKIP LOCKED를 통한 안전한 동시성 제어
- Flexible: 3가지 enqueue 방식 모두 지원
- Observable: 작업 상태, 실행 시간, 결과 추적 가능
- Production-ready: 재시도, timeout, stale recovery 등 운영 필수 기능 내장
1.3 Non-Goals¶
- 분산 트랜잭션 지원 (2PC, saga 등)
- Exactly-once 보장 (at-least-once 전제)
- 복잡한 워크플로우 엔진 (DAG, branching 등)
- 다른 DB 백엔드 지원 (PostgreSQL 전용)
2. Architecture¶
2.1 High-Level Architecture¶
┌─────────────────────────────────────────────────────────────┐
│ Application Layer │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ FastAPI │ │ Handler │ │ Handler │ │
│ │ App │ │ (summarize) │ │ (embed) │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
└─────────┼─────────────────┼─────────────────┼───────────────┘
│ │ │
│ ┌───────▼─────────────────▼───────┐
│ │ Worker (pqrun) │
│ │ - Poll loop │
│ │ - Concurrency control │
│ │ - Handler dispatch │
│ │ - Retry/backoff │
│ └───────┬─────────────────────────┘
│ │
└─────────────────▼─────────────────────────┐
│ PgJobStore (pqrun) │
│ - enqueue() │
│ - pickup() [SKIP LOCKED] │
│ - mark_done/error() │
│ - reap_stale() │
└───────┬─────────────────────────┘
│
┌───────▼─────────┐
│ PostgreSQL │
│ jobs table │
└─────────────────┘
2.2 Component Responsibilities¶
PgJobStore¶
- 책임: PostgreSQL과의 모든 상호작용
- 제공 메서드:
enqueue(): 작업 등록pickup(): 작업 획득 (SKIP LOCKED)mark_done(): 완료 처리mark_error(): 실패 처리 및 재시도 스케줄링reap_stale(): 장애 복구 (stale RUNNING → READY)connection()/transaction(): 핸들러용 DB 접근
Worker¶
- 책임: 작업 소비 및 실행 제어
- 주요 기능:
- Poll loop (idle backoff 포함)
- Concurrency 제어 (worker loop 개수 기반)
- Handler routing 및 실행
- 실행 시간 측정
- 재시도 정책 적용
- Stale job reaper (백그라운드 루프)
Handler¶
- 책임: 실제 비즈니스 로직 구현
- 인터페이스:
async def handler(ctx: JobContext) -> dict | None - 반환값: 작업 결과 (jobs.result에 저장), None이면 저장 안 함
- 제약: Idempotent 구현 권장 (at-least-once 보장)
3. Data Model¶
3.1 Database Schema¶
-- Job lifecycle states
CREATE TYPE job_status AS ENUM (
'READY', -- 실행 대기 중
'RUNNING', -- 현재 실행 중
'DONE', -- 성공 완료
'FAILED', -- 최종 실패 (재시도 소진)
'CANCELLED' -- 사용자 취소
);
-- Jobs table
CREATE TABLE jobs (
id bigserial PRIMARY KEY,
-- Job identification
job_type text NOT NULL, -- Handler routing key
payload jsonb NOT NULL DEFAULT '{}'::jsonb,
-- Execution control
status job_status NOT NULL DEFAULT 'READY',
priority int NOT NULL DEFAULT 0, -- Higher = earlier
attempts int NOT NULL DEFAULT 0,
max_attempts int NOT NULL DEFAULT 5,
run_after timestamptz NOT NULL DEFAULT now(),
timeout_seconds int, -- Per-job stale threshold
-- Lock tracking
locked_at timestamptz,
locked_by text, -- worker_id
-- Deduplication
dedupe_key text, -- Unique within active jobs
-- Error tracking
last_error text,
-- Observability
finished_at timestamptz,
duration_ms int,
result jsonb, -- Optional handler output
-- Metadata
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
);
-- Indexes
CREATE INDEX idx_jobs_pick
ON jobs (status, run_after, priority DESC, id);
CREATE INDEX idx_jobs_type
ON jobs (job_type);
CREATE UNIQUE INDEX uq_jobs_active_dedupe
ON jobs (dedupe_key)
WHERE dedupe_key IS NOT NULL AND status IN ('READY','RUNNING');
3.2 Job Lifecycle¶
[CREATED]
│
▼
READY ◄──────────────┐
│ │
│ pickup() │ retry (attempts < max)
▼ │
RUNNING ──────────────┤
│ │
├─ success ───────▼── DONE
│
├─ error (terminal) ──► FAILED
│
├─ error (retryable) ──► READY (delayed)
│
└─ stale (timeout) ──► READY (reaped)
3.3 Python Models¶
@dataclass(frozen=True)
class Job:
id: int
job_type: str
payload: dict[str, Any]
status: JobStatus
priority: int
attempts: int
max_attempts: int
run_after: datetime
timeout_seconds: Optional[int]
locked_at: Optional[datetime]
locked_by: Optional[str]
dedupe_key: Optional[str]
last_error: Optional[str]
finished_at: Optional[datetime]
duration_ms: Optional[int]
result: Optional[dict[str, Any]]
created_at: datetime
updated_at: datetime
@dataclass(frozen=True)
class JobContext:
job: Job
store: PgJobStore
worker_id: str
4. Core Mechanisms¶
4.1 Enqueue Patterns¶
Pattern 1: Application Enqueue¶
애플리케이션 코드에서 직접 호출:
await store.enqueue(
job_type="summarize",
payload={"conversation_id": 123},
dedupe_key="summarize:conv:123",
priority=10
)
Pattern 2: Scheduled Enqueue (pg_cron)¶
주기적 배치 작업:
-- Function called by pg_cron
INSERT INTO jobs (job_type, payload, dedupe_key)
SELECT
'cleanup',
jsonb_build_object('target_date', current_date - 30),
'cleanup:' || (current_date - 30)::text
ON CONFLICT DO NOTHING;
Pattern 3: Handler Chain¶
핸들러 내부에서 후속 작업 생성:
async def summarize_handler(ctx: JobContext) -> dict:
# ... do summarization ...
summary_id = ...
# Chain next job
await ctx.store.enqueue(
job_type="embed",
payload={"summary_id": summary_id}
)
# Return result for observability
return {"summary_id": summary_id, "tokens": 1234}
4.2 Safe Pickup (SKIP LOCKED)¶
WITH picked AS (
SELECT id
FROM jobs
WHERE status='READY' AND run_after <= now()
ORDER BY priority DESC, id
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE jobs SET
status='RUNNING',
locked_at=now(),
locked_by=$1,
attempts = attempts + 1
FROM picked
WHERE jobs.id = picked.id
RETURNING *;
특징: - 다중 워커/레플리카 간 경합 없음 - Row-level lock으로 안전한 소비 보장 - Deadlock 발생 없음
4.3 Retry & Backoff¶
Retry Policy (BackoffPolicy)¶
Loop Infra Error Policy (LoopErrorPolicy)¶
pickup/mark_* 같은 인프라 레벨 예외 시 worker loop 재시도 지연을 제어:
- 기본: 즉시 재시도 (0s)
- 커스텀 정책 주입 가능 (Worker(..., loop_error_policy=...))
Idle Poll Backoff (IdlePollPolicy)¶
작업이 없을 때 점진적 sleep:
4.4 Stale Job Recovery¶
문제: Worker가 crash하면 RUNNING 상태로 고착 해결: Reaper가 주기적으로 stale job을 READY로 복구
# Default: 20분 이상 RUNNING인 작업은 stale로 간주
await store.reap_stale(default_stale_after=timedelta(minutes=20))
# Per-job timeout 지원:
# timeout_seconds가 설정된 작업은 개별 기준 적용
5. Concurrency & Scalability¶
5.1 Deployment Patterns¶
Pattern A: API + Worker 겸업 (초기)¶
┌──────────────────────┐
│ FastAPI Container │
│ - API endpoints │
│ - Worker (conc=1) │
└──────────────────────┘
Pattern B: 분리 배포 (확장)¶
┌──────────────────┐ ┌──────────────────┐
│ API Container │ │ Worker Container │
│ WORKER_ENABLED= │ │ WORKER_ENABLED= │
│ false │ │ true │
│ │ │ CONCURRENCY=4 │
└──────────────────┘ └──────────────────┘
│ │
└───────┬───────────────┘
▼
[PostgreSQL]
5.2 Concurrency Control¶
Worker(
store=store,
handlers=handlers,
concurrency=1, # Per-replica concurrency
enabled=True # WORKER_ENABLED env override
)
설계 원칙: - 레플리카당 concurrency=1로 시작 (API 보호) - Worker 분리 후 concurrency 증가 - Worker loop 개수로 동시 실행 제한
6. Observability¶
6.1 Job Metrics¶
모든 작업에 자동 기록:
- finished_at: 완료 시각
- duration_ms: 실행 시간 (밀리초)
- attempts: 시도 횟수
- last_error: 마지막 에러 메시지
6.2 Monitoring Queries¶
-- 실행 중인 작업
SELECT * FROM jobs WHERE status='RUNNING';
-- 실패 작업 (최근 1시간)
SELECT * FROM jobs
WHERE status='FAILED'
AND finished_at > now() - interval '1 hour';
-- 평균 실행 시간 (job_type별)
SELECT job_type, avg(duration_ms), count(*)
FROM jobs
WHERE status='DONE'
GROUP BY job_type;
-- Stale 후보 (20분 이상 RUNNING)
SELECT * FROM jobs
WHERE status='RUNNING'
AND locked_at < now() - interval '20 minutes';
7. Error Handling¶
7.1 Error Categories¶
Retryable Errors¶
- Transient failures (network, DB connection)
- External service timeout
- Rate limit
처리: mark_error(terminal=False) → READY로 백오프
Terminal Errors¶
- Invalid payload
- Business logic violation
- No handler registered
처리: mark_error(terminal=True) → FAILED로 즉시 종료
7.2 Handler Error Contract¶
async def my_handler(ctx: JobContext):
try:
# Business logic
...
except TemporaryError:
raise # → Worker가 retryable로 처리
except PermanentError:
raise # → Worker가 max_attempts 체크
# 정상 완료 → mark_done()
Worker가 보장하는 것: - 모든 exception 캐치 - duration_ms 기록 - 재시도 정책 적용
8. Configuration¶
8.1 Environment Variables¶
# Worker 활성화 여부
WORKER_ENABLED=true|false
# 동시 실행 작업 수 (레플리카당)
WORKER_CONCURRENCY=1
# Reaper 실행 주기(초)
WORKER_REAP_INTERVAL=60
# 기본 stale timeout(초)
WORKER_STALE_TIMEOUT=1200
8.2 Code Configuration¶
store = PgJobStore(
dsn="postgresql://user:pass@host/db",
# 또는 pool=existing_pool
)
worker = Worker(
store=store,
handlers={
"summarize": summarize_handler,
"embed": embed_handler,
},
concurrency=1,
enabled=True,
idle_policy=IdlePollPolicy(base_seconds=1.0, max_seconds=10.0),
backoff=BackoffPolicy(), # 또는 custom
reap_stale_every_seconds=60,
default_stale_after=timedelta(minutes=20),
worker_id="custom-id" # 기본: hostname-pid
)
9. Implementation Decisions¶
구현에 적용된 핵심 결정사항입니다. (상세 근거는 Decisions 참조)
9.1 Connection Pool Management¶
- 결정: DSN 및 기존 Pool 모두 지원
- 이유: 유연성 최대화 - 간단한 사용과 효율적 공유 모두 가능
9.2 Handler Result Handling¶
- 결정: Handler가 dict 반환 가능
- 이유: 관측성 향상, 결과 추적 용이
9.3 Transaction Provision¶
- 결정: On-demand 방식 (store.transaction())
- 이유: 필요할 때만 사용, Context 오염 방지
9.4 Logging Strategy¶
- 결정: 라이브러리 자체 logger 제공
- 이유: 즉시 사용 가능, 표준 logging 모듈 활용
9.5 Shutdown Strategy¶
- 결정: bounded cancellation + reaper recovery
- 이유: 종료 지연을 제한하면서 장애 복구 가능
- 구현: lifespan finally에서 stop_event 설정 후 task cancel, 최대 30초 대기
9.6 Dedupe Conflict Behavior¶
- 결정: 기존 job_id 반환
- 이유: 중복 감지 및 추적 가능
9.7 Job Cleanup Strategy¶
- 결정: 예시 SQL 제공, 구현은 사용자 책임
- 이유: 정책이 앱마다 다름 (보관 기간, 조건 등)
- 제공:
examples/cleanup.sql
9.8 Error Serialization¶
- 결정: Traceback 포함 (길이 제한 10KB)
- 이유: 디버깅 편의성
9.9 Additional Decisions¶
- Type Hints: Moderate (mypy 기본 모드)
- Python Version: 3.10+
- Testing: pytest + pytest-asyncio
- Documentation: README.md (초기), 필요시 확장
10. Security Considerations¶
10.1 Payload Security¶
payload는 jsonb로 저장 → SQL injection 안전- 민감정보 저장 금지 (암호화 미지원)
- 필요시 애플리케이션 레벨에서 암호화 후 저장
10.2 Handler Isolation¶
- Handler는 trust boundary 내부
- Payload validation은 handler 책임
- Untrusted input은 enqueue 전에 검증
10.3 Database Permissions¶
-- Worker/API 계정에 필요한 최소 권한
GRANT SELECT, INSERT, UPDATE ON jobs TO app_user;
GRANT USAGE ON SEQUENCE jobs_id_seq TO app_user;
11. Testing Strategy¶
11.1 Unit Tests¶
PgJobStore각 메서드 (enqueue, pickup, mark_*)BackoffPolicy/IdlePollPolicy로직Workerlifecycle (start, stop, cancel)
11.2 Integration Tests¶
- 다중 워커 동시성 (SKIP LOCKED 검증)
- Retry/backoff 플로우
- Stale recovery
- Handler chain
11.3 Load Tests¶
- N개 레플리카 × M concurrency
- Pickup throughput
- Dedupe 효과 확인
12. Migration Path¶
12.1 Initial Setup¶
12.2 Future Schema Changes¶
-- migrations/002_add_new_field.sql (example)
ALTER TABLE jobs ADD COLUMN IF NOT EXISTS new_field text;
원칙:
- SQL 파일 기반 (ORM migration 비의존)
- Backward-compatible 변경 우선
- 프로덕션 적용 전 staging 검증
13. Performance Characteristics¶
13.1 Throughput¶
- Pickup: O(1) with index (idx_jobs_pick)
- Enqueue: O(1) insert
- Dedupe check: O(1) unique index lookup
13.2 Scaling Limits¶
- 단일 DB 처리량: ~1000 jobs/sec (pickup + mark)
- Bottleneck: PostgreSQL connection pool
- 권장: Worker 분리 후 connection pool 조정
13.3 Storage Growth¶
- 완료된 작업 정리 필요 (별도 cleanup job)
- Partition by created_at (선택)
14. Comparison with Alternatives¶
| Feature | pqrun | Celery | TaskIQ | RQ |
|---|---|---|---|---|
| Backend | PostgreSQL | Redis/RabbitMQ | Redis/etc | Redis |
| Async/Await | ✅ | ⚠️ (제한적) | ✅ | ❌ |
| FastAPI 통합 | ✅ Native | ⚠️ 별도 프로세스 | ✅ | ⚠️ |
| At-least-once | ✅ | ✅ | ✅ | ✅ |
| Exactly-once | ❌ | ❌ | ❌ | ❌ |
| Complexity | Low | High | Medium | Low |
| 추가 인프라 | ❌ | ✅ | ✅ | ✅ |
pqrun 선택 이유: - 이미 PostgreSQL 사용 중 - 단순성 > 복잡한 기능 - FastAPI async 네이티브 통합
15. Future Enhancements (Out of Scope)¶
Phase 2 (고려 중)¶
- Job priority queue per type
- Dead letter queue (DLQ)
- Job cancellation (soft/hard)
- Periodic jobs (cron-like, without pg_cron)
Phase 3 (연구 필요)¶
- Workflow engine (DAG)
- Job dependencies
- Distributed tracing integration
- Prometheus metrics exporter