Quick Start Guide¶
Get up and running with pqrun in 5 minutes.
Prerequisites¶
- Python 3.10+
- PostgreSQL 12+
- A PostgreSQL database (can be local or remote)
Step 1: Install¶
Step 2: Setup Database¶
Create the jobs table and indexes:
# Using psql
psql $DATABASE_URL < venv/lib/python*/site-packages/pqrun/ddl.sql
# Or manually
psql $DATABASE_URL -c "$(cat venv/lib/python*/site-packages/pqrun/ddl.sql)"
Or in Python:
import asyncpg
import asyncio
async def setup_db():
conn = await asyncpg.connect("postgresql://user:pass@localhost/mydb")
# Read DDL from package
import pqrun
from pathlib import Path
ddl_path = Path(pqrun.__file__).parent / "ddl.sql"
ddl = ddl_path.read_text()
await conn.execute(ddl)
await conn.close()
print("✓ Database schema created")
asyncio.run(setup_db())
Step 3: Create Your App¶
Create main.py:
import os
from fastapi import FastAPI
from pqrun import PgJobStore, Worker, JobContext
# 1. Define your job handler
async def send_email(ctx: JobContext) -> dict:
user_id = ctx.job.payload["user_id"]
template = ctx.job.payload["template"]
# TODO: Implement your email sending logic
print(f"Sending {template} email to user {user_id}")
return {"status": "sent", "user_id": user_id}
# 2. Setup store and worker
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://user:pass@localhost/mydb")
store = PgJobStore(dsn=DATABASE_URL)
worker = Worker(
store=store,
handlers={
"send_email": send_email,
}
)
# 3. Create FastAPI app
app = FastAPI(lifespan=worker.lifespan)
# 4. Add enqueue endpoint
@app.post("/send-email")
async def enqueue_email(user_id: int, template: str = "welcome"):
job_id = await store.enqueue(
job_type="send_email",
payload={"user_id": user_id, "template": template}
)
return {"job_id": job_id, "status": "enqueued"}
@app.get("/")
async def root():
return {"message": "pqrun is running!"}
Step 4: Run¶
# Set your database URL
export DATABASE_URL="postgresql://user:pass@localhost/mydb"
# Start the server
uvicorn main:app --reload
Step 5: Test¶
# Enqueue a job
curl -X POST "http://localhost:8000/send-email?user_id=123&template=welcome"
# Response:
# {"job_id": 1, "status": "enqueued"}
# Check the logs - you should see:
# INFO:pqrun.worker:Executing job 1 type=send_email (attempt 1)
# Sending welcome email to user 123
# INFO:pqrun.store:Job 1 completed (duration=...ms)
Next Steps¶
Add More Handlers¶
async def process_payment(ctx: JobContext) -> dict:
payment_id = ctx.job.payload["payment_id"]
# ... payment processing logic ...
return {"payment_id": payment_id, "status": "completed"}
worker = Worker(
store=store,
handlers={
"send_email": send_email,
"process_payment": process_payment, # Add more handlers
}
)
Use Deduplication¶
Prevent duplicate jobs:
@app.post("/send-welcome-email/{user_id}")
async def send_welcome(user_id: int):
job_id = await store.enqueue(
job_type="send_email",
payload={"user_id": user_id, "template": "welcome"},
dedupe_key=f"welcome:user:{user_id}" # Only one welcome email per user
)
return {"job_id": job_id}
Current behavior:
- With dedupe_key, enqueue returns the existing active job id when duplicated.
Schedule Delayed Jobs¶
from datetime import datetime, timedelta
@app.post("/schedule-reminder/{user_id}")
async def schedule_reminder(user_id: int, hours: int = 24):
run_at = datetime.now() + timedelta(hours=hours)
job_id = await store.enqueue(
job_type="send_email",
payload={"user_id": user_id, "template": "reminder"},
run_after=run_at # Job will run after specified time
)
return {"job_id": job_id, "scheduled_at": run_at.isoformat()}
Chain Jobs¶
Create follow-up jobs from handlers:
async def process_order(ctx: JobContext) -> dict:
order_id = ctx.job.payload["order_id"]
# Process the order
# ...
# Chain: Send confirmation email
await ctx.store.enqueue(
job_type="send_email",
payload={"user_id": user_id, "template": "order_confirmation"}
)
return {"order_id": order_id, "status": "processed"}
Monitor Jobs¶
@app.get("/jobs/stats")
async def job_stats():
async with store.connection() as conn:
rows = await conn.fetch("""
SELECT status, count(*) as count
FROM jobs
GROUP BY status
ORDER BY status
""")
return {row["status"]: row["count"] for row in rows}
Separate Worker and API¶
For production, run workers separately:
# Terminal 1: API only
WORKER_ENABLED=false uvicorn main:app
# Terminal 2: Worker only (multiple instances)
WORKER_ENABLED=true WORKER_CONCURRENCY=4 python -c "
from main import worker, store
import asyncio
async def run_worker():
# Simulate FastAPI lifespan
async with worker.lifespan(None):
# Keep running
await asyncio.Event().wait()
asyncio.run(run_worker())
"
Or reuse the provided example module:
Current behavior: - Shutdown stops new pickup and waits briefly for in-flight jobs, then cancels remaining tasks (bounded total wait: 30s by default) and uses reaper-based recovery for interrupted RUNNING jobs.
Troubleshooting¶
"No handler registered for job_type=X"¶
Make sure the handler is registered in the handlers dict:
Jobs not being picked up¶
Check:
1. Worker is enabled: WORKER_ENABLED=true
2. Database connection is working
3. Jobs table exists: \dt jobs in psql
4. Jobs are in READY status: SELECT status, count(*) FROM jobs GROUP BY status;
Jobs stuck in RUNNING¶
The reaper will automatically recover stale jobs after 20 minutes (default). Or manually reset:
Learn More¶
- Full Documentation: Complete feature guide
- Design Document: Architecture and internals
- Examples: More complex patterns
- SQL Patterns: Batch jobs, triggers, pg_cron
You're all set! 🎉