Skip to content

Implementation Decisions

๊ตฌํ˜„ ์‹œ์ž‘ ์ „์— ๊ฒฐ์ •์ด ํ•„์š”ํ•œ ์‚ฌํ•ญ๋“ค์„ ์ •๋ฆฌํ•ฉ๋‹ˆ๋‹ค.


๐Ÿ”ด Critical Decisions (๋ฐ˜๋“œ์‹œ ๊ฒฐ์ • ํ•„์š”)

1. Connection Pool ๊ด€๋ฆฌ ๋ฐฉ์‹

ํ˜„์žฌ ์„ค๊ณ„:

PgJobStore(
    dsn="postgresql://...",  # ์˜ต์…˜ A
    pool=existing_pool       # ์˜ต์…˜ B
)

๊ฒฐ์ • ํ•„์š”:

  • ์˜ต์…˜ A: Store๊ฐ€ ์ž์ฒด pool ์ƒ์„ฑ/๊ด€๋ฆฌ
  • ์žฅ์ : ์‚ฌ์šฉ ๊ฐ„๋‹จ, ๋…๋ฆฝ์ 
  • ๋‹จ์ : ์•ฑ๊ณผ pool ๋ถ„๋ฆฌ โ†’ connection ๋‚ญ๋น„ ๊ฐ€๋Šฅ

  • ์˜ต์…˜ B: ์•ฑ์ด pool ์ „๋‹ฌ

  • ์žฅ์ : Connection ํšจ์œจ์  ๊ณต์œ 
  • ๋‹จ์ : ์‚ฌ์šฉ์ž๊ฐ€ pool ๊ด€๋ฆฌ ํ•„์š”

  • ์˜ต์…˜ C: ๋‘˜ ๋‹ค ์ง€์› (์ถ”์ฒœ)

  • dsn ์žˆ์œผ๋ฉด ์ž์ฒด pool ์ƒ์„ฑ
  • pool ์žˆ์œผ๋ฉด ์žฌ์‚ฌ์šฉ

์ถ”์ฒœ: ์˜ต์…˜ C - ์œ ์—ฐ์„ฑ ์ตœ๋Œ€ํ™”


2. Handler ๊ฒฐ๊ณผ๊ฐ’ ์ฒ˜๋ฆฌ

ํ˜„์žฌ ์„ค๊ณ„:

async def handler(ctx: JobContext) -> None:
    # result๋ฅผ ์–ด๋–ป๊ฒŒ ์ €์žฅ?

์„ ํƒ์ง€:

์˜ต์…˜ A: Handler๊ฐ€ return ๊ฐ’์œผ๋กœ result ์ „๋‹ฌ

async def handler(ctx: JobContext) -> dict | None:
    return {"summary": "...", "tokens": 123}

# Worker๊ฐ€ ์ž๋™์œผ๋กœ jobs.result์— ์ €์žฅ
์žฅ์ : ๊น”๋”, ๋ช…์‹œ์  ๋‹จ์ : Handler ์‹œ๊ทธ๋‹ˆ์ฒ˜ ๋ณ€๊ฒฝ (-> None โ†’ -> dict | None)

์˜ต์…˜ B: Context๋ฅผ ํ†ตํ•ด ๋ช…์‹œ์  ์ €์žฅ

async def handler(ctx: JobContext) -> None:
    result = {"summary": "..."}
    # ์˜ต์…˜ B-1: ctx.set_result(result)
    # ์˜ต์…˜ B-2: ์ง์ ‘ DB ์ €์žฅ (store.mark_done์—์„œ)
์žฅ์ : ์œ ์—ฐ์„ฑ, side-effect ๋ช…ํ™• ๋‹จ์ : ๋ณด์ผ๋Ÿฌํ”Œ๋ ˆ์ดํŠธ

์˜ต์…˜ C: result ์ €์žฅ ์•ˆ ํ•จ (์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ DB์— ์ง์ ‘)

async def handler(ctx: JobContext) -> None:
    async with ctx.store.transaction() as conn:
        await conn.execute("INSERT INTO summaries ...")
    # jobs.result๋Š” NULL
์žฅ์ : ๊ฐ€์žฅ ๋‹จ์ˆœ ๋‹จ์ : ๊ด€์ธก์„ฑ ๋‚ฎ์Œ

ํ˜„์žฌ ๋ธŒ๋ ˆ์ธ์Šคํ† ๋ฐ: ์˜ต์…˜ A์™€ ์œ ์‚ฌํ•˜์ง€๋งŒ -> None ์œ ์ง€ ๊ฒฐ์ • ํ•„์š”: ์–ด๋А ๋ฐฉ์‹์„ ์ฑ„ํƒํ•  ๊ฒƒ์ธ๊ฐ€?

์ถ”์ฒœ: ์˜ต์…˜ A - return ๊ฐ’ ํ™œ์šฉ, ์‹œ๊ทธ๋‹ˆ์ฒ˜๋Š” -> dict | None ํ—ˆ์šฉ


3. Transaction ์ œ๊ณต ๋ฐฉ์‹

Handler๊ฐ€ DB ์ž‘์—…์„ ํ•  ๋•Œ:

์˜ต์…˜ A: Context์— connection ์ œ๊ณต

@dataclass(frozen=True)
class JobContext:
    job: Job
    store: PgJobStore
    worker_id: str
    conn: asyncpg.Connection  # โ† ์ถ”๊ฐ€

async def handler(ctx: JobContext):
    await ctx.conn.execute("UPDATE ...")
์žฅ์ : ๊ฐ„ํŽธ ๋‹จ์ : ๋ชจ๋“  handler๊ฐ€ conn์„ ๋ฐ›์Œ (๋ถˆํ•„์š”ํ•  ์ˆ˜๋„)

์˜ต์…˜ B: Store๋ฅผ ํ†ตํ•ด on-demand ์ œ๊ณต

async def handler(ctx: JobContext):
    async with ctx.store.transaction() as conn:
        await conn.execute("UPDATE ...")
์žฅ์ : ํ•„์š”ํ•  ๋•Œ๋งŒ ์‚ฌ์šฉ ๋‹จ์ : ์ฝ”๋“œ๊ฐ€ ์•ฝ๊ฐ„ ๊ธธ์–ด์ง

์ถ”์ฒœ: ์˜ต์…˜ B - on-demand๊ฐ€ ๋” ์œ ์—ฐ


4. Logging ์ „๋žต

์„ ํƒ์ง€:

์˜ต์…˜ A: ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๊ฐ€ ์ž์ฒด logger ์ œ๊ณต

import logging
logger = logging.getLogger("pqrun")

# Worker์—์„œ:
logger.info(f"Picked job {job.id} type={job.job_type}")
์žฅ์ : ์ฆ‰์‹œ ์‚ฌ์šฉ ๊ฐ€๋Šฅ ๋‹จ์ : ์•ฑ์˜ logging ์„ค์ •๊ณผ ์ถฉ๋Œ ๊ฐ€๋Šฅ

์˜ต์…˜ B: Context์— logger ์ฃผ์ž…

@dataclass(frozen=True)
class JobContext:
    job: Job
    store: PgJobStore
    worker_id: str
    logger: logging.Logger  # โ† ์ถ”๊ฐ€

async def handler(ctx: JobContext):
    ctx.logger.info("Processing...")
์žฅ์ : ์•ฑ์ด ์ œ์–ด ๊ฐ€๋Šฅ ๋‹จ์ : Worker ์ดˆ๊ธฐํ™” ์‹œ logger ์„ค์ • ํ•„์š”

์˜ต์…˜ C: Logging ๋ฏธ์ œ๊ณต (์ตœ์†Œ์ฃผ์˜)

# Handler์—์„œ ์ง์ ‘:
import logging
logger = logging.getLogger(__name__)
์žฅ์ : ๊ฐ€์žฅ ๋‹จ์ˆœ ๋‹จ์ : ํ‘œ์ค€ํ™” ์•ˆ ๋จ

์ถ”์ฒœ: ์˜ต์…˜ A - ๊ธฐ๋ณธ logger ์ œ๊ณต, ์ถ”ํ›„ ์˜ต์…˜ B๋กœ ํ™•์žฅ ๊ฐ€๋Šฅ


๐ŸŸก Important Decisions (๊ถŒ์žฅ)

5. Worker ์ข…๋ฃŒ ์‹œ Job ์ฒ˜๋ฆฌ

Worker๊ฐ€ shutdown๋  ๋•Œ ์‹คํ–‰ ์ค‘์ธ job์€?

์˜ต์…˜ A: Graceful shutdown (ํ˜„์žฌ job ์™„๋ฃŒ ๋Œ€๊ธฐ)

# lifespan finally:
stop_event.set()  # ์ƒˆ job pickup ์ค‘์ง€
await asyncio.gather(*tasks)  # ํ˜„์žฌ job ์™„๋ฃŒ๊นŒ์ง€ ๋Œ€๊ธฐ
์žฅ์ : ์•ˆ์ „, ์ž‘์—… ์†์‹ค ์—†์Œ ๋‹จ์ : Shutdown ์ง€์—ฐ ๊ฐ€๋Šฅ

์˜ต์…˜ B: Immediate shutdown (์ฆ‰์‹œ ์ทจ์†Œ)

for t in tasks:
    t.cancel()
์žฅ์ : ๋น ๋ฅธ ์ข…๋ฃŒ ๋‹จ์ : Reaper๊ฐ€ ๋ณต๊ตฌํ•ด์•ผ ํ•จ (RUNNING โ†’ READY)

์˜ต์…˜ C: Hybrid shutdown (์ทจ์†Œ + ์ œํ•œ ์‹œ๊ฐ„ ๋Œ€๊ธฐ)

stop_event.set()
# 1) ๋จผ์ € ํ˜„์žฌ ์‹คํ–‰ ์ค‘์ธ job์ด ๋๋‚  ์‹œ๊ฐ„์„ ์กฐ๊ธˆ ์ค€ ๋’ค
await asyncio.wait_for(asyncio.gather(*worker_tasks, return_exceptions=True), timeout=shutdown_grace_s)
# 2) ๊ทธ๋ž˜๋„ ์•ˆ ๋๋‚˜๋ฉด cancel๋กœ ๊ฐ•์ œ ์ข…๋ฃŒํ•˜๊ณ 
for t in remaining_tasks:
    t.cancel()
# 3) ์ „์ฒด shutdown์€ bounded timeout ๋‚ด์—์„œ ๋งˆ๋ฌด๋ฆฌ
await asyncio.wait_for(asyncio.gather(*remaining_tasks, return_exceptions=True), timeout=shutdown_timeout_s)
์žฅ์ : ์ข…๋ฃŒ ์ง€์—ฐ์„ ์ œํ•œํ•˜๋ฉด์„œ๋„ ์ •๋ฆฌ ์‹œ๋„ ๊ฐ€๋Šฅ ๋‹จ์ : ์‹คํ–‰ ์ค‘์ด๋˜ job์€ ์žฌ์‹œ์ž‘ ํ›„ reaper ๋ณต๊ตฌ ํ•„์š”

ํ˜„์žฌ ๊ตฌํ˜„: ์˜ต์…˜ C (๊ธฐ๋ณธ๊ฐ’: grace 10s, hard cap 30s)


6. Dedupe ์ถฉ๋Œ ์‹œ ๋™์ž‘

await store.enqueue(job_type="x", dedupe_key="k1")
await store.enqueue(job_type="x", dedupe_key="k1")  # โ† ์ค‘๋ณต

ํ˜„์žฌ ์„ค๊ณ„: ON CONFLICT ... DO UPDATE SET updated_at=now() RETURNING id ์ค‘๋ณต์ด๋ฉด ๊ธฐ์กด active job์˜ id๋ฅผ ๋ฐ˜ํ™˜

์„ ํƒ์ง€:

์˜ต์…˜ A: 0 ๋ฐ˜ํ™˜

job_id = await store.enqueue(...)
if job_id == 0:
    # ์ด๋ฏธ ์กด์žฌ
์žฅ์ : ๊ฐ„๋‹จ ๋‹จ์ : ๊ธฐ์กด job_id๋ฅผ ๋ชจ๋ฆ„

์˜ต์…˜ B: ๊ธฐ์กด job_id ๋ฐ˜ํ™˜ (ํ˜„์žฌ)

INSERT INTO jobs (...)
VALUES (...)
ON CONFLICT (dedupe_key) WHERE ... DO UPDATE SET updated_at=now()
RETURNING id;
์žฅ์ : ๊ธฐ์กด job ์ถ”์  ๊ฐ€๋Šฅ ๋‹จ์ : UPDATE ๋ฐœ์ƒ (์„ฑ๋Šฅ ์˜ํ–ฅ ๋ฏธ๋ฏธ)

์˜ต์…˜ C: Exception ๋ฐœ์ƒ

raise DuplicateJobError("Job with dedupe_key already exists")
์žฅ์ : ๋ช…์‹œ์  ๋‹จ์ : ์ •์ƒ ๋™์ž‘์—์„œ exception์€ ๊ณผํ•จ

์ถ”์ฒœ: ์˜ต์…˜ B - ๊ธฐ์กด job_id ๋ฐ˜ํ™˜


7. Job Cleanup ์ „๋žต

์™„๋ฃŒ๋œ job์€ ์–ธ์ œ ์‚ญ์ œ?

์˜ต์…˜ A: ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๊ฐ€ cleanup job ์ œ๊ณต

# Built-in handler
"__cleanup": cleanup_handler

# Auto-enqueued by Worker or pg_cron
DELETE FROM jobs
WHERE status IN ('DONE','FAILED','CANCELLED')
  AND finished_at < now() - interval '7 days';
์žฅ์ : ๊ธฐ๋ณธ ์ œ๊ณต ๋‹จ์ : ์•ฑ๋งˆ๋‹ค ์ •์ฑ… ๋‹ค๋ฅผ ์ˆ˜ ์žˆ์Œ

์˜ต์…˜ B: ์˜ˆ์‹œ๋งŒ ์ œ๊ณต, ๊ตฌํ˜„์€ ์•ฑ ์ฑ…์ž„

-- examples/cleanup.sql
DELETE FROM jobs WHERE ...;
์žฅ์ : ์œ ์—ฐ์„ฑ ๋‹จ์ : ์‚ฌ์šฉ์ž๊ฐ€ ์ง์ ‘ ๊ตฌํ˜„

์˜ต์…˜ C: ์ œ๊ณต ์•ˆ ํ•จ

์žฅ์ : ์ตœ์†Œ์ฃผ์˜ ๋‹จ์ : Production์—์„œ ํ…Œ์ด๋ธ” ๋ฌดํ•œ ์ฆ๊ฐ€

์ถ”์ฒœ: ์˜ต์…˜ B - ์˜ˆ์‹œ SQL ์ œ๊ณต


8. Error Serialization

Handler exception์„ last_error์— ์ €์žฅํ•  ๋•Œ:

ํ˜„์žฌ: repr(e) โ†’ "ValueError('invalid input')"

์„ ํƒ์ง€:

์˜ต์…˜ A: repr (ํ˜„์žฌ)

์žฅ์ : ๊ฐ„๋‹จ ๋‹จ์ : Traceback ์—†์Œ

์˜ต์…˜ B: Traceback ํฌํ•จ

import traceback
last_error = traceback.format_exc()
์žฅ์ : ๋””๋ฒ„๊น… ํŽธํ•จ ๋‹จ์ : ๊ธธ์ด ์ œํ•œ ํ•„์š” (TEXT ํƒ€์ž…์ด๋ผ ๊ดœ์ฐฎ์Œ)

์˜ต์…˜ C: Structured error (JSON)

last_error = json.dumps({
    "type": type(e).__name__,
    "message": str(e),
    "traceback": traceback.format_exc()
})
์žฅ์ : ํŒŒ์‹ฑ ๊ฐ€๋Šฅ ๋‹จ์ : ๋ณต์žก

์ถ”์ฒœ: ์˜ต์…˜ B - traceback ํฌํ•จ, ๊ธธ์ด ์ œํ•œ (์˜ˆ: 10KB)


๐ŸŸข Nice-to-Have Decisions (์„ ํƒ์ )

9. Type Hints ์—„๊ฒฉ๋„

์˜ต์…˜ A: Strict (mypy --strict)

์žฅ์ : ํƒ€์ž… ์•ˆ์ „์„ฑ ์ตœ๋Œ€ ๋‹จ์ : ๊ฐœ๋ฐœ ์†๋„ ๋А๋ฆผ

์˜ต์…˜ B: Moderate (mypy ๊ธฐ๋ณธ)

์žฅ์ : ๊ท ํ˜• ๋‹จ์ : Runtime error ๊ฐ€๋Šฅ์„ฑ

์ถ”์ฒœ: ์˜ต์…˜ B - ์ ์ง„์  ๊ฐ•ํ™”


10. Python ๋ฒ„์ „ ์ง€์›

์„ ํƒ์ง€: - Python 3.10+ (ํ˜„์žฌ ์„ค๊ณ„) - Python 3.11+ (match/case, better asyncio) - Python 3.12+ (์ตœ์‹ )

์ถ”์ฒœ: Python 3.10+ - ๋„๋ฆฌ ์‚ฌ์šฉ๋˜๋Š” ์ตœ์†Œ ๋ฒ„์ „


11. Testing Framework

์˜ต์…˜ A: pytest + pytest-asyncio

์žฅ์ : ํ‘œ์ค€, ํ”Œ๋Ÿฌ๊ทธ์ธ ๋งŽ์Œ

์˜ต์…˜ B: unittest (stdlib)

์žฅ์ : ์˜์กด์„ฑ ์—†์Œ

์ถ”์ฒœ: ์˜ต์…˜ A - pytest


12. Documentation

์˜ต์…˜ A: README.md๋งŒ

์žฅ์ : ๋‹จ์ˆœ

์˜ต์…˜ B: Sphinx / MkDocs

์žฅ์ : ๊ตฌ์กฐํ™”๋œ ๋ฌธ์„œ

์ถ”์ฒœ: ์˜ต์…˜ A - ์ดˆ๊ธฐ์—๋Š” README๋งŒ, ์ถ”ํ›„ ํ™•์žฅ


๐Ÿ“‹ Decision Summary

# ํ•ญ๋ชฉ ์ถ”์ฒœ ์šฐ์„ ์ˆœ์œ„
1 Connection Pool ์˜ต์…˜ C (๋‘˜ ๋‹ค ์ง€์›) ๐Ÿ”ด Critical
2 Handler ๊ฒฐ๊ณผ๊ฐ’ ์˜ต์…˜ A (return dict) ๐Ÿ”ด Critical
3 Transaction ์ œ๊ณต ์˜ต์…˜ B (on-demand) ๐Ÿ”ด Critical
4 Logging ์˜ต์…˜ A (์ž์ฒด logger) ๐Ÿ”ด Critical
5 Worker ์ข…๋ฃŒ ์˜ต์…˜ A (graceful) ๐ŸŸก Important
6 Dedupe ์ถฉ๋Œ ์˜ต์…˜ B (๊ธฐ์กด id ๋ฐ˜ํ™˜) ๐ŸŸก Important
7 Job Cleanup ์˜ต์…˜ B (์˜ˆ์‹œ ์ œ๊ณต) ๐ŸŸก Important
8 Error Serialization ์˜ต์…˜ B (traceback ํฌํ•จ) ๐ŸŸก Important
9 Type Hints ์˜ต์…˜ B (moderate) ๐ŸŸข Nice-to-have
10 Python ๋ฒ„์ „ 3.10+ ๐ŸŸข Nice-to-have
11 Testing pytest ๐ŸŸข Nice-to-have
12 Documentation README only ๐ŸŸข Nice-to-have

โœ… Action Items

๊ตฌํ˜„ ์‹œ์ž‘ ์ „์—:

  1. Critical decisions (1~4) ํ™•์ •
  2. Important decisions (5~8) ๊ฒ€ํ† 
  3. Nice-to-have decisions (9~12) ๊ฐ„๋‹จํžˆ ๊ฒฐ์ •
  4. DESIGN.md์— ์ตœ์ข… ๊ฒฐ์ •์‚ฌํ•ญ ๋ฐ˜์˜

Next Step: ์œ„ ๊ฒฐ์ •์‚ฌํ•ญ์„ ํ™•์ •ํ•œ ํ›„ ํŒจํ‚ค์ง€ ๊ตฌ์กฐ ์ƒ์„ฑ ์‹œ์ž‘