Queues: Async Task Processing
Cloudflare Queues provides a simple, powerful message queue with automatic retries, dead letter queues, and batching — no message broker to manage. Think Celery, but without Redis.
Producer / Consumer Pattern
The basic pattern: a producer Worker sends messages to a queue, and a consumer Worker processes them in batches.
# Producer
async def on_fetch(request, env):
    """Enqueue a file-processing task and respond without waiting for it.

    Args:
        request: Incoming HTTP request. Its ``X-User-ID`` header is copied
            into the queued message.
        env: Worker bindings object; must expose the ``TASK_QUEUE`` producer
            binding. The original snippet used ``env`` without ever
            receiving it, which raises ``NameError`` at runtime.

    Returns:
        A 202 response: the work has only been queued, not performed.
    """
    await env.TASK_QUEUE.send({
        "task": "process_upload",
        "file_id": "abc123",
        "user_id": request.headers.get("X-User-ID"),
    })
    return Response.new("Processing started", status=202)
# Consumer (another Worker)
async def queue_handler(batch):
for message in batch.messages:
await process_file(message.body["file_id"])
message.ack() Automatic retries, dead letter queues, and batching are included. No message broker to manage.
Migrating from Celery
If you're coming from Celery + Redis, the migration is straightforward. Here's a side-by-side comparison:
Before: Celery + Redis
# tasks.py
from celery import Celery
import redis
# Celery application; Redis on localhost is the message broker.
app = Celery('tasks', broker='redis://localhost:6379')


@app.task
def process_image(image_url):
    """Placeholder Celery task for the "before" example.

    A real implementation would download, process, and save to S3; this
    stub only returns a fixed marker string.
    """
    # Download, process, save to S3
    return "processed"
# main.py
from tasks import process_image
process_image.delay("https://example.com/image.jpg")
After: Cloudflare Queues
# No separate task file needed!
async def on_fetch(request, env):
    """Queue an image URL for background processing.

    Args:
        request: Incoming HTTP request (not inspected by this example).
        env: Worker bindings object; must expose the ``IMAGE_QUEUE``
            producer binding. The original snippet referenced ``env``
            without receiving it, which raises ``NameError`` at runtime.

    Returns:
        A response confirming that the job was queued.
    """
    await env.IMAGE_QUEUE.send({
        "url": "https://example.com/image.jpg",
    })
    return Response.new("Queued for processing")
async def queue_handler(batch):
for msg in batch.messages:
# Download, process, save to R2
msg.ack() Advanced: Task Queue with Decorators
Build a Celery-like task queue interface using the Python SDK:
import json
from datetime import datetime, timezone
from typing import Any, Callable, Dict, Optional

from cloudflare import Cloudflare
# SDK client, constructed without explicit arguments.
# NOTE(review): presumably credentials come from the environment — confirm.
client = Cloudflare()

# Create a queue
queue = client.queues.create(account_id="your-account-id", queue_name="background-tasks")
class TaskQueue:
    """Celery-like task queue using Cloudflare Queues.

    Functions registered with :meth:`task` gain a ``.delay(...)`` attribute
    that enqueues a JSON message naming the task and its arguments.
    :meth:`process_messages` pulls messages and dispatches each one to the
    function registered under its task name.

    NOTE(review): the SDK calls (``queues.messages.create`` / ``pull`` and
    the per-message ``ack()``) are kept exactly as the original wrote them;
    verify the names against the installed Cloudflare SDK version.
    """

    def __init__(self, client: "Cloudflare", account_id: str, queue_id: str):
        """Store the SDK client and the identifiers of the target queue."""
        self.client = client
        self.account_id = account_id
        self.queue_id = queue_id
        # Maps task name -> function that executes the task.
        self.handlers: Dict[str, Callable] = {}

    def task(self, name: Optional[str] = None):
        """Decorator to register a task.

        Args:
            name: Task name used in queued messages; defaults to the
                decorated function's ``__name__``.

        Returns:
            A decorator that registers the function, attaches ``.delay`` to
            it, and returns the same function (direct calls still run it
            synchronously).
        """
        def decorator(func: Callable) -> Callable:
            task_name = name or func.__name__
            self.handlers[task_name] = func

            def delay(*args: Any, **kwargs: Any) -> None:
                """Enqueue one invocation of the task instead of running it."""
                self.send_message({
                    "task": task_name,
                    "args": args,
                    "kwargs": kwargs,
                    # Timezone-aware UTC: a naive local timestamp is
                    # ambiguous once producer and consumer run on different
                    # machines.
                    "enqueued_at": datetime.now(timezone.utc).isoformat(),
                })

            func.delay = delay
            return func
        return decorator

    def send_message(self, message: Dict[str, Any]) -> None:
        """Send a message to the queue.

        Args:
            message: JSON-serializable payload; it is encoded with
                ``json.dumps`` before being handed to the SDK.

        Raises:
            TypeError: If ``message`` contains values ``json.dumps`` cannot
                encode.
        """
        self.client.queues.messages.create(
            account_id=self.account_id,
            queue_id=self.queue_id,
            body=json.dumps(message),
        )

    def process_messages(self, batch_size: int = 10) -> None:
        """Pull up to ``batch_size`` messages and run their handlers.

        A message is acknowledged only after its handler returned without
        raising. Messages that fail, or that name a task with no registered
        handler, are reported and left unacknowledged.

        Args:
            batch_size: Maximum number of messages requested from the queue.
        """
        messages = self.client.queues.messages.pull(
            account_id=self.account_id,
            queue_id=self.queue_id,
            batch_size=batch_size,
        )
        for msg in messages:
            try:
                body = json.loads(msg.body)
                task_name = body["task"]
                handler = self.handlers.get(task_name)
                if handler is None:
                    # Report instead of skipping silently; the message stays
                    # unacknowledged so it is not lost.
                    print(f"Task failed: no handler registered for {task_name!r}")
                    continue
                handler(*body.get("args", []), **body.get("kwargs", {}))
                msg.ack()
            except Exception as e:
                # Deliberate best-effort: one bad message must not stop the
                # rest of the batch.
                print(f"Task failed: {e}")
                # Message will be retried
# Usage
# Bind the task queue to the queue created earlier in this example.
task_queue = TaskQueue(client, "your-account-id", queue.id)


@task_queue.task()
def send_email(to: str, subject: str, body: str):
    """Example task: print the recipient and subject, return a confirmation."""
    print(f"Sending email to {to}: {subject}")
    confirmation = f"Email sent to {to}"
    return confirmation


@task_queue.task()
def process_image(image_url: str, operations: list):
    """Example task: print the image URL, return a note listing the operations."""
    print(f"Processing image: {image_url}")
    outcome = f"Processed with operations: {operations}"
    return outcome


# Enqueue tasks
send_email.delay("user@example.com", "Welcome!", "Thanks for signing up")
process_image.delay("https://example.com/image.jpg", ["resize", "watermark"])
Async Data Pipeline with Queues
Combine Queues with R2, AI, and D1 for a complete async processing pipeline:
# API endpoint triggers processing
async def on_fetch(request, env):
    """Copy a remote file into R2 and queue it for asynchronous processing.

    Args:
        request: Incoming HTTP request. Its JSON body must contain
            ``"file_url"``; its ``X-User-ID`` header is forwarded in the
            queued message.
        env: Worker bindings object; must expose the ``BUCKET`` and
            ``PROCESS_QUEUE`` bindings. The original snippet used ``env``
            without receiving it.

    Returns:
        A JSON response reporting that processing has been queued.
    """
    # Local import: this snippet has no import block of its own.
    from uuid import uuid4

    payload = await request.json()
    file_url = payload["file_url"]

    # Store in R2. The object key is computed once so the same value is
    # written to the bucket and sent to the consumer (the original queued an
    # undefined ``key``).
    key = f"uploads/{uuid4()}"
    file_data = await fetch(file_url)
    await env.BUCKET.put(key, file_data.body)

    # Queue for processing
    await env.PROCESS_QUEUE.send({
        "key": key,
        "user_id": request.headers.get("X-User-ID"),
    })
    return Response.json({"status": "processing"})
# Queue consumer processes files
async def queue_handler(batch):
for msg in batch.messages:
# Get from R2
obj = await env.BUCKET.get(msg.body["key"])
data = await obj.text()
# Process with AI
summary = await env.AI.run(
"@cf/facebook/bart-large-cnn", text=data
)
# Store results in D1
await env.DB.prepare("""
INSERT INTO summaries (user_id, summary, created_at)
VALUES (?, ?, datetime('now'))
""").bind(msg.body["user_id"], summary.summary).run()
msg.ack() Wrangler Configuration
# wrangler.toml
# Producer side: the binding name is the attribute the Worker code reads
# from `env` (the first example calls env.TASK_QUEUE.send).
[[queues.producers]]
binding = "TASK_QUEUE"
queue = "my-task-queue"
# Consumer side: attaches a consumer to the same queue the producer writes to.
[[queues.consumers]]
queue = "my-task-queue"
# Upper bound on messages per delivered batch.
max_batch_size = 10
# NOTE(review): presumably seconds to wait while a batch fills — confirm in the Wrangler docs.
max_batch_timeout = 30