Skip to content

Architecture & Data Flow

PostgreSQL 하나만으로 동작하는 async Python 잡 큐 라이브러리. FastAPI lifespan에 붙여 쓰는 것을 주요 패턴으로 설계되었으며, 별도의 Redis/RabbitMQ 등 추가 인프라가 필요 없다.


1. 패키지 구조

src/pqrun/
├── __init__.py        # 공개 API 노출 (re-export)
├── models.py          # 데이터 모델 (Job, JobContext, Handler 타입)
├── store_asyncpg.py   # DB 연산 레이어 (PgJobStore)
├── worker.py          # 잡 소비 루프 (Worker)
├── backoff.py         # 재시도/폴링 정책
└── ddl.sql            # DB 스키마 (jobs 테이블 + 인덱스)

2. 컴포넌트 관계

각 컴포넌트의 역할과 의존 방향을 나타낸다.

graph LR
    APP["FastAPI App"]
    W["Worker"]
    S["PgJobStore"]
    BP["Backoff<br/>Policies"]
    M["Job / JobContext"]
    DB[("PostgreSQL")]

    APP -->|lifespan| W
    W -->|"pickup / mark_*<br/>reap_stale"| S
    W -->|delay 계산| BP
    S -->|asyncpg| DB
    S -->|Job| M
    W -->|JobContext| M

3. job lifecycle

flowchart LR
    NEW([start])
    READY[READY]
    RUNNING[RUNNING]
    DONE([DONE ✓])
    FAILED([FAILED ✗])
    CANCELLED([CANCELLED])
    END([end])

    NEW     -->|enqueue| READY
    READY   -->|pickup| RUNNING

    RUNNING -->|success| DONE
    RUNNING -->|"retry exhausted<br/>(attempts >= max)"| FAILED
    RUNNING -->|cancel| CANCELLED
    READY   -->|cancel| CANCELLED

    RUNNING -->|"retry possible<br/>(attempts < max)"| READY
    RUNNING -. "timeout<br/>(reap_stale)" .-> READY

    DONE      --> END
    FAILED    --> END
    CANCELLED --> END

4. job enqueue

dedupe_key가 있으면 READY/RUNNING 중 중복 삽입을 DB 레벨에서 방지한다.

sequenceDiagram
    participant P as Producer
    participant S as PgJobStore
    participant DB as PostgreSQL

    P->>S: enqueue(job_type, payload, dedupe_key?)
    S->>DB: INSERT ... ON CONFLICT DO UPDATE
    DB-->>S: id
    S-->>P: job_id

5. job execution (pickup → handler → complete)

sequenceDiagram
    participant W as Worker
    participant S as PgJobStore
    participant DB as PostgreSQL
    participant H as Handler

    W->>S: pickup(worker_id)
    S->>DB: SELECT FOR UPDATE SKIP LOCKED<br/>→ UPDATE status=RUNNING
    DB-->>S: job row
    S-->>W: Job

    W->>H: handler(JobContext)
    H-->>W: result / exception

    alt 성공
        W->>S: mark_done(result)
        S->>DB: status=DONE
    else 재시도 가능
        W->>S: mark_error(retry_after)
        S->>DB: status=READY, run_after+=delay
    else 재시도 소진
        W->>S: mark_error(terminal=True)
        S->>DB: status=FAILED
    end

6. worker loop structure

6-1. lifespan

flowchart LR
    A([시작]) --> B[store.start]
    B --> C["_run_loop × N"]
    B --> D["_reaper_loop × 1"]
    C --> Y([앱 실행 중])
    D --> Y
    Y --> E([stop])
    E --> F["태스크 cancel<br/>30s 대기"]
    F --> G[store.close]

6-2. _run_loop

flowchart TD
    ST([시작]) --> P[pickup]
    P --> Q{job?}
    Q -- No  --> I["idle sleep<br/>1→2→5→10s"]
    I --> P
    Q -- Yes --> R[_dispatch]
    R --> V{성공?}
    V -- Yes --> D[mark_done]
    V -- No  --> E["mark_error<br/>+ backoff delay"]
    D --> P
    E --> P

6-3. _reaper_loop

flowchart TD
    ST([시작]) --> W["sleep 60s"]
    W --> R["reap_stale()<br/>RUNNING + 타임아웃 초과"]
    R --> U[→ READY 복귀]
    U --> W

7. multi-worker concurrency (SKIP LOCKED)

sequenceDiagram
    participant W1 as Worker-1
    participant W2 as Worker-2
    participant DB as PostgreSQL

    Note over DB: id=1,2,3 READY

    par
        W1->>DB: pickup (SKIP LOCKED)
    and
        W2->>DB: pickup (SKIP LOCKED)
    end

    DB-->>W1: id=1 → RUNNING
    DB-->>W2: id=2 → RUNNING (id=1 skip)

    W1->>DB: mark_done(id=1)
    W2->>DB: mark_done(id=2)

8. jobs 테이블 주요 컬럼

컬럼 타입 역할
job_type text 핸들러 라우팅 키
payload jsonb 입력 데이터
status job_status 상태 머신
priority int 높을수록 먼저 실행
run_after timestamptz 실행 가능 최소 시각 (지연/재시도)
dedupe_key text READY/RUNNING 중 유일 (중복 방지)
locked_by / locked_at text / timestamptz 현재 점유 중인 워커
attempts / max_attempts int 재시도 카운터
last_error text 마지막 실패 traceback
result jsonb 핸들러 반환값
duration_ms int 실행 소요 시간

9. 주요 설계 결정

결정 이유
FOR UPDATE SKIP LOCKED 워커 간 경쟁 없이 원자적 픽업 보장
ON CONFLICT DO UPDATE DB 레벨 dedupe — 애플리케이션 코드 불필요
attempts >= max_attempts 체크를 DB에서 워커 재시작 등 경쟁 상황에서도 정확한 카운트
asyncpg (ORM 없음) 바이너리 프로토콜, asyncio 네이티브, 의존성 최소화
Pool 외부 주입 지원 FastAPI 앱과 커넥션 풀 공유 가능
reaper loop 분리 워커 크래시로 인한 좀비 잡을 별도 루프에서 복구