Implementation Decisions¶
๊ตฌํ ์์ ์ ์ ๊ฒฐ์ ์ด ํ์ํ ์ฌํญ๋ค์ ์ ๋ฆฌํฉ๋๋ค.
๐ด Critical Decisions (๋ฐ๋์ ๊ฒฐ์ ํ์)¶
1. Connection Pool ๊ด๋ฆฌ ๋ฐฉ์¶
ํ์ฌ ์ค๊ณ:
๊ฒฐ์ ํ์:
- ์ต์ A: Store๊ฐ ์์ฒด pool ์์ฑ/๊ด๋ฆฌ
- ์ฅ์ : ์ฌ์ฉ ๊ฐ๋จ, ๋ ๋ฆฝ์
-
๋จ์ : ์ฑ๊ณผ pool ๋ถ๋ฆฌ โ connection ๋ญ๋น ๊ฐ๋ฅ
-
์ต์ B: ์ฑ์ด pool ์ ๋ฌ
- ์ฅ์ : Connection ํจ์จ์ ๊ณต์
-
๋จ์ : ์ฌ์ฉ์๊ฐ pool ๊ด๋ฆฌ ํ์
-
์ต์ C: ๋ ๋ค ์ง์ (์ถ์ฒ)
- dsn ์์ผ๋ฉด ์์ฒด pool ์์ฑ
- pool ์์ผ๋ฉด ์ฌ์ฌ์ฉ
์ถ์ฒ: ์ต์ C - ์ ์ฐ์ฑ ์ต๋ํ
2. Handler ๊ฒฐ๊ณผ๊ฐ ์ฒ๋ฆฌ¶
ํ์ฌ ์ค๊ณ:
์ ํ์ง:
์ต์ A: Handler๊ฐ return ๊ฐ์ผ๋ก result ์ ๋ฌ¶
async def handler(ctx: JobContext) -> dict | None:
return {"summary": "...", "tokens": 123}
# Worker๊ฐ ์๋์ผ๋ก jobs.result์ ์ ์ฅ
-> None โ -> dict | None)
์ต์ B: Context๋ฅผ ํตํด ๋ช ์์ ์ ์ฅ¶
async def handler(ctx: JobContext) -> None:
result = {"summary": "..."}
# ์ต์
B-1: ctx.set_result(result)
# ์ต์
B-2: ์ง์ DB ์ ์ฅ (store.mark_done์์)
์ต์ C: result ์ ์ฅ ์ ํจ (์ ํ๋ฆฌ์ผ์ด์ DB์ ์ง์ )¶
async def handler(ctx: JobContext) -> None:
async with ctx.store.transaction() as conn:
await conn.execute("INSERT INTO summaries ...")
# jobs.result๋ NULL
ํ์ฌ ๋ธ๋ ์ธ์คํ ๋ฐ: ์ต์
A์ ์ ์ฌํ์ง๋ง -> None ์ ์ง
๊ฒฐ์ ํ์: ์ด๋ ๋ฐฉ์์ ์ฑํํ ๊ฒ์ธ๊ฐ?
์ถ์ฒ: ์ต์
A - return ๊ฐ ํ์ฉ, ์๊ทธ๋์ฒ๋ -> dict | None ํ์ฉ
3. Transaction ์ ๊ณต ๋ฐฉ์¶
Handler๊ฐ DB ์์ ์ ํ ๋:
์ต์ A: Context์ connection ์ ๊ณต¶
@dataclass(frozen=True)
class JobContext:
job: Job
store: PgJobStore
worker_id: str
conn: asyncpg.Connection # โ ์ถ๊ฐ
async def handler(ctx: JobContext):
await ctx.conn.execute("UPDATE ...")
์ต์ B: Store๋ฅผ ํตํด on-demand ์ ๊ณต¶
async def handler(ctx: JobContext):
async with ctx.store.transaction() as conn:
await conn.execute("UPDATE ...")
์ถ์ฒ: ์ต์ B - on-demand๊ฐ ๋ ์ ์ฐ
4. Logging ์ ๋ต¶
์ ํ์ง:
์ต์ A: ๋ผ์ด๋ธ๋ฌ๋ฆฌ๊ฐ ์์ฒด logger ์ ๊ณต¶
import logging
logger = logging.getLogger("pqrun")
# Worker์์:
logger.info(f"Picked job {job.id} type={job.job_type}")
์ต์ B: Context์ logger ์ฃผ์ ¶
@dataclass(frozen=True)
class JobContext:
job: Job
store: PgJobStore
worker_id: str
logger: logging.Logger # โ ์ถ๊ฐ
async def handler(ctx: JobContext):
ctx.logger.info("Processing...")
์ต์ C: Logging ๋ฏธ์ ๊ณต (์ต์์ฃผ์)¶
์ฅ์ : ๊ฐ์ฅ ๋จ์ ๋จ์ : ํ์คํ ์ ๋จ์ถ์ฒ: ์ต์ A - ๊ธฐ๋ณธ logger ์ ๊ณต, ์ถํ ์ต์ B๋ก ํ์ฅ ๊ฐ๋ฅ
๐ก Important Decisions (๊ถ์ฅ)¶
5. Worker ์ข ๋ฃ ์ Job ์ฒ๋ฆฌ¶
Worker๊ฐ shutdown๋ ๋ ์คํ ์ค์ธ job์?
์ต์ A: Graceful shutdown (ํ์ฌ job ์๋ฃ ๋๊ธฐ)¶
# lifespan finally:
stop_event.set() # ์ job pickup ์ค์ง
await asyncio.gather(*tasks) # ํ์ฌ job ์๋ฃ๊น์ง ๋๊ธฐ
์ต์ B: Immediate shutdown (์ฆ์ ์ทจ์)¶
์ฅ์ : ๋น ๋ฅธ ์ข ๋ฃ ๋จ์ : Reaper๊ฐ ๋ณต๊ตฌํด์ผ ํจ (RUNNING โ READY)์ต์ C: Hybrid shutdown (์ทจ์ + ์ ํ ์๊ฐ ๋๊ธฐ)¶
stop_event.set()
# 1) ๋จผ์ ํ์ฌ ์คํ ์ค์ธ job์ด ๋๋ ์๊ฐ์ ์กฐ๊ธ ์ค ๋ค
await asyncio.wait_for(asyncio.gather(*worker_tasks, return_exceptions=True), timeout=shutdown_grace_s)
# 2) ๊ทธ๋๋ ์ ๋๋๋ฉด cancel๋ก ๊ฐ์ ์ข
๋ฃํ๊ณ
for t in remaining_tasks:
t.cancel()
# 3) ์ ์ฒด shutdown์ bounded timeout ๋ด์์ ๋ง๋ฌด๋ฆฌ
await asyncio.wait_for(asyncio.gather(*remaining_tasks, return_exceptions=True), timeout=shutdown_timeout_s)
ํ์ฌ ๊ตฌํ: ์ต์ C (๊ธฐ๋ณธ๊ฐ: grace 10s, hard cap 30s)
6. Dedupe ์ถฉ๋ ์ ๋์¶
await store.enqueue(job_type="x", dedupe_key="k1")
await store.enqueue(job_type="x", dedupe_key="k1") # โ ์ค๋ณต
ํ์ฌ ์ค๊ณ: ON CONFLICT ... DO UPDATE SET updated_at=now() RETURNING id
์ค๋ณต์ด๋ฉด ๊ธฐ์กด active job์ id๋ฅผ ๋ฐํ
์ ํ์ง:
์ต์ A: 0 ๋ฐํ¶
์ฅ์ : ๊ฐ๋จ ๋จ์ : ๊ธฐ์กด job_id๋ฅผ ๋ชจ๋ฆ์ต์ B: ๊ธฐ์กด job_id ๋ฐํ (ํ์ฌ)¶
INSERT INTO jobs (...)
VALUES (...)
ON CONFLICT (dedupe_key) WHERE ... DO UPDATE SET updated_at=now()
RETURNING id;
์ต์ C: Exception ๋ฐ์¶
์ฅ์ : ๋ช ์์ ๋จ์ : ์ ์ ๋์์์ exception์ ๊ณผํจ์ถ์ฒ: ์ต์ B - ๊ธฐ์กด job_id ๋ฐํ
7. Job Cleanup ์ ๋ต¶
์๋ฃ๋ job์ ์ธ์ ์ญ์ ?
์ต์ A: ๋ผ์ด๋ธ๋ฌ๋ฆฌ๊ฐ cleanup job ์ ๊ณต¶
# Built-in handler
"__cleanup": cleanup_handler
# Auto-enqueued by Worker or pg_cron
DELETE FROM jobs
WHERE status IN ('DONE','FAILED','CANCELLED')
AND finished_at < now() - interval '7 days';
์ต์ B: ์์๋ง ์ ๊ณต, ๊ตฌํ์ ์ฑ ์ฑ ์¶
์ฅ์ : ์ ์ฐ์ฑ ๋จ์ : ์ฌ์ฉ์๊ฐ ์ง์ ๊ตฌํ์ต์ C: ์ ๊ณต ์ ํจ¶
์ฅ์ : ์ต์์ฃผ์ ๋จ์ : Production์์ ํ ์ด๋ธ ๋ฌดํ ์ฆ๊ฐ
์ถ์ฒ: ์ต์ B - ์์ SQL ์ ๊ณต
8. Error Serialization¶
Handler exception์ last_error์ ์ ์ฅํ ๋:
ํ์ฌ: repr(e) โ "ValueError('invalid input')"
์ ํ์ง:
์ต์ A: repr (ํ์ฌ)¶
์ฅ์ : ๊ฐ๋จ ๋จ์ : Traceback ์์
์ต์ B: Traceback ํฌํจ¶
์ฅ์ : ๋๋ฒ๊น ํธํจ ๋จ์ : ๊ธธ์ด ์ ํ ํ์ (TEXT ํ์ ์ด๋ผ ๊ด์ฐฎ์)์ต์ C: Structured error (JSON)¶
last_error = json.dumps({
"type": type(e).__name__,
"message": str(e),
"traceback": traceback.format_exc()
})
์ถ์ฒ: ์ต์ B - traceback ํฌํจ, ๊ธธ์ด ์ ํ (์: 10KB)
๐ข Nice-to-Have Decisions (์ ํ์ )¶
9. Type Hints ์๊ฒฉ๋¶
์ต์ A: Strict (mypy --strict)¶
์ฅ์ : ํ์ ์์ ์ฑ ์ต๋ ๋จ์ : ๊ฐ๋ฐ ์๋ ๋๋ฆผ
์ต์ B: Moderate (mypy ๊ธฐ๋ณธ)¶
์ฅ์ : ๊ท ํ ๋จ์ : Runtime error ๊ฐ๋ฅ์ฑ
์ถ์ฒ: ์ต์ B - ์ ์ง์ ๊ฐํ
10. Python ๋ฒ์ ์ง์¶
์ ํ์ง: - Python 3.10+ (ํ์ฌ ์ค๊ณ) - Python 3.11+ (match/case, better asyncio) - Python 3.12+ (์ต์ )
์ถ์ฒ: Python 3.10+ - ๋๋ฆฌ ์ฌ์ฉ๋๋ ์ต์ ๋ฒ์
11. Testing Framework¶
์ต์ A: pytest + pytest-asyncio¶
์ฅ์ : ํ์ค, ํ๋ฌ๊ทธ์ธ ๋ง์
์ต์ B: unittest (stdlib)¶
์ฅ์ : ์์กด์ฑ ์์
์ถ์ฒ: ์ต์ A - pytest
12. Documentation¶
์ต์ A: README.md๋ง¶
์ฅ์ : ๋จ์
์ต์ B: Sphinx / MkDocs¶
์ฅ์ : ๊ตฌ์กฐํ๋ ๋ฌธ์
์ถ์ฒ: ์ต์ A - ์ด๊ธฐ์๋ README๋ง, ์ถํ ํ์ฅ
๐ Decision Summary¶
| # | ํญ๋ชฉ | ์ถ์ฒ | ์ฐ์ ์์ |
|---|---|---|---|
| 1 | Connection Pool | ์ต์ C (๋ ๋ค ์ง์) | ๐ด Critical |
| 2 | Handler ๊ฒฐ๊ณผ๊ฐ | ์ต์ A (return dict) | ๐ด Critical |
| 3 | Transaction ์ ๊ณต | ์ต์ B (on-demand) | ๐ด Critical |
| 4 | Logging | ์ต์ A (์์ฒด logger) | ๐ด Critical |
| 5 | Worker ์ข ๋ฃ | ์ต์ A (graceful) | ๐ก Important |
| 6 | Dedupe ์ถฉ๋ | ์ต์ B (๊ธฐ์กด id ๋ฐํ) | ๐ก Important |
| 7 | Job Cleanup | ์ต์ B (์์ ์ ๊ณต) | ๐ก Important |
| 8 | Error Serialization | ์ต์ B (traceback ํฌํจ) | ๐ก Important |
| 9 | Type Hints | ์ต์ B (moderate) | ๐ข Nice-to-have |
| 10 | Python ๋ฒ์ | 3.10+ | ๐ข Nice-to-have |
| 11 | Testing | pytest | ๐ข Nice-to-have |
| 12 | Documentation | README only | ๐ข Nice-to-have |
โ Action Items¶
๊ตฌํ ์์ ์ ์:
- Critical decisions (1~4) ํ์
- Important decisions (5~8) ๊ฒํ
- Nice-to-have decisions (9~12) ๊ฐ๋จํ ๊ฒฐ์
- DESIGN.md์ ์ต์ข ๊ฒฐ์ ์ฌํญ ๋ฐ์
Next Step: ์ ๊ฒฐ์ ์ฌํญ์ ํ์ ํ ํ ํจํค์ง ๊ตฌ์กฐ ์์ฑ ์์