Queues: Async Task Processing

Cloudflare Queues provides a simple, powerful message queue with automatic retries, dead letter queues, and batching — no message broker to manage. Think Celery, but without Redis.

Producer / Consumer Pattern

The basic pattern: a producer Worker sends messages to a queue, and a consumer Worker processes them in batches.

# Producer
async def on_fetch(request):
    """Enqueue an upload-processing task and reply immediately with 202.

    The caller's ``X-User-ID`` header is forwarded in the queued message.
    """
    # NOTE(review): `env` and `Response` are not defined in this snippet;
    # they are assumed to be provided by the surrounding Worker module.
    task_message = {
        "task": "process_upload",
        "file_id": "abc123",
        "user_id": request.headers.get("X-User-ID"),
    }
    await env.TASK_QUEUE.send(task_message)
    return Response.new("Processing started", status=202)

# Consumer (another Worker)
async def queue_handler(batch):
    """Process the file named by each message, then acknowledge the message."""
    for queued in batch.messages:
        file_id = queued.body["file_id"]
        await process_file(file_id)
        # Acknowledge only after processing finished without raising.
        queued.ack()

The consumer acknowledges each message with message.ack() once it has been processed. Automatic retries, dead letter queues, and batching are included — there is no message broker to manage.

Migrating from Celery

If you're coming from Celery + Redis, the migration is straightforward. Here's a before-and-after comparison:

Before: Celery + Redis

# tasks.py
from celery import Celery
import redis

# Celery application named 'tasks' that uses a Redis instance on
# localhost:6379 as its message broker.
app = Celery('tasks', broker='redis://localhost:6379')

@app.task
def process_image(image_url):
    """Celery task that handles the image at ``image_url``.

    The body is a placeholder; it only reports completion.
    """
    # Download, process, save to S3
    status = "processed"
    return status

# main.py
from tasks import process_image
# Enqueue the task through Celery's .delay() rather than calling it directly.
process_image.delay("https://example.com/image.jpg")

After: Cloudflare Queues

# No separate task file needed!
async def on_fetch(request):
    """Queue an image URL for background processing and respond right away."""
    # NOTE(review): `env` and `Response` are not defined in this snippet;
    # they are assumed to be provided by the surrounding Worker module.
    image_job = {"url": "https://example.com/image.jpg"}
    await env.IMAGE_QUEUE.send(image_job)
    return Response.new("Queued for processing")

async def queue_handler(batch):
    """Consume a batch from the image queue, acknowledging every message."""
    for message in batch.messages:
        # Download, process, save to R2
        message.ack()

Advanced: Task Queue with Decorators

Build a Celery-like task queue interface using the Python SDK:

from cloudflare import Cloudflare
import json
from typing import Callable, Dict, Any
from datetime import datetime

# SDK client. NOTE(review): no credentials are passed here, so they are
# presumably read from the environment — confirm against the SDK docs.
client = Cloudflare()

# Create a queue
# The returned object is used below to obtain the queue's identifier.
queue = client.queues.create(
    account_id="your-account-id",
    queue_name="background-tasks",
)

class TaskQueue:
    """Celery-like task queue using Cloudflare Queues.

    Functions are registered with the :meth:`task` decorator, enqueued via
    the ``delay`` attribute attached to each registered function, and run by
    :meth:`process_messages`, which pulls messages and dispatches them to
    the registered handler with the matching task name.
    """

    def __init__(self, client: "Cloudflare", account_id: str, queue_id: str):
        """Store the SDK client and the identifiers used for every queue call.

        Args:
            client: Cloudflare SDK client used to send and pull messages.
            account_id: Account identifier passed to every queue call.
            queue_id: Identifier of the queue to send to and pull from.
        """
        self.client = client
        self.account_id = account_id
        self.queue_id = queue_id
        # Maps task name -> callable; consulted when a message is processed.
        self.handlers: Dict[str, Callable] = {}

    def task(self, name: str = None):
        """Decorator to register a task.

        Works with parentheses (``@queue.task()`` or
        ``@queue.task("custom-name")``) and, like Celery's ``@app.task``,
        without them (``@queue.task``).

        Args:
            name: Task name stored in each message. Defaults to the
                function's ``__name__``.

        Returns:
            The original function, unchanged except for an added ``delay``
            attribute that enqueues a call instead of running it.
        """
        if callable(name):
            # Bare usage: Python passed the decorated function as `name`.
            # Without this branch the function would be replaced by the
            # inner decorator and never registered.
            return self.task()(name)

        def decorator(func: Callable):
            task_name = name or func.__name__
            self.handlers[task_name] = func

            # Attach an enqueuer; calling func directly still runs it inline.
            def delay(*args, **kwargs):
                self.send_message({
                    "task": task_name,
                    "args": args,
                    "kwargs": kwargs,
                    # Timezone-aware so consumers on other hosts can
                    # interpret the timestamp unambiguously.
                    "enqueued_at": datetime.now().astimezone().isoformat(),
                })
            func.delay = delay
            return func
        return decorator

    def send_message(self, message: Dict):
        """Send a message to the queue.

        Args:
            message: JSON-serializable dict; it is encoded with
                ``json.dumps`` before being sent.
        """
        self.client.queues.messages.create(
            account_id=self.account_id,
            queue_id=self.queue_id,
            body=json.dumps(message),
        )

    def process_messages(self, batch_size: int = 10):
        """Pull one batch of messages and run the matching handlers.

        Each message body is decoded as JSON and dispatched on its ``task``
        field. Messages whose handler completes are acknowledged. Messages
        naming an unregistered task are reported and acknowledged, since no
        handler here could ever process them. Messages whose handling raises
        are reported and left unacknowledged.

        Args:
            batch_size: Value forwarded to the pull call as ``batch_size``.
        """
        messages = self.client.queues.messages.pull(
            account_id=self.account_id,
            queue_id=self.queue_id,
            batch_size=batch_size,
        )
        for msg in messages:
            try:
                body = json.loads(msg.body)
                task_name = body["task"]
                handler = self.handlers.get(task_name)
                if handler is None:
                    # Make the drop visible instead of discarding silently.
                    print(f"Unknown task {task_name!r}; message discarded")
                else:
                    handler(*body.get("args", []), **body.get("kwargs", {}))
                # NOTE(review): assumes pulled messages expose ack(); confirm
                # against the SDK's pull response type.
                msg.ack()
            except Exception as e:
                # Best-effort loop: one bad message must not stop the batch.
                print(f"Task failed: {e}")
                # Message will be retried

# Usage
# NOTE(review): confirm that the object returned by queues.create exposes the
# queue identifier as `.id`.
task_queue = TaskQueue(client, "your-account-id", queue.id)

@task_queue.task()
def send_email(to: str, subject: str, body: str):
    """Example task: announce the outgoing email and return a confirmation."""
    print(f"Sending email to {to}: {subject}")
    confirmation = f"Email sent to {to}"
    return confirmation

@task_queue.task()
def process_image(image_url: str, operations: list):
    """Example task: announce the image and report the operations applied."""
    print(f"Processing image: {image_url}")
    outcome = f"Processed with operations: {operations}"
    return outcome

# Enqueue tasks
# .delay() sends a message to the queue rather than running the function;
# the work happens when a consumer calls TaskQueue.process_messages().
send_email.delay("user@example.com", "Welcome!", "Thanks for signing up")
process_image.delay("https://example.com/image.jpg", ["resize", "watermark"])

Async Data Pipeline with Queues

Combine Queues with R2, AI, and D1 for a complete async processing pipeline:

# API endpoint triggers processing
async def on_fetch(request):
    """Copy the file at the requested URL into R2 and queue it for processing.

    Reads ``file_url`` from the JSON request body, stores the fetched body
    in the bucket under a fresh ``uploads/<uuid>`` key, and sends that key
    plus the caller's ``X-User-ID`` header to the processing queue.

    Returns:
        A JSON response ``{"status": "processing"}``.
    """
    # Local import keeps the snippet self-contained.
    from uuid import uuid4

    # NOTE(review): file_url is caller-supplied; consider validating it
    # before fetching.
    file_url = (await request.json())["file_url"]

    # Build the key once so the key stored in R2 and the key sent to the
    # consumer are guaranteed to match.
    key = f"uploads/{uuid4()}"

    # Store in R2
    file_data = await fetch(file_url)
    await env.BUCKET.put(key, file_data.body)

    # Queue for processing
    await env.PROCESS_QUEUE.send({
        "key": key,
        "user_id": request.headers.get("X-User-ID")
    })
    return Response.json({"status": "processing"})

# Queue consumer processes files
async def queue_handler(batch):
    """Summarize each queued upload and store the summary in D1.

    For every message: load the object stored under ``key`` from the
    bucket, summarize its text with the AI binding, insert the summary
    together with ``user_id`` into the ``summaries`` table, then
    acknowledge the message.
    """
    for msg in batch.messages:
        # Get from R2
        obj = await env.BUCKET.get(msg.body["key"])
        data = await obj.text()

        # Process with AI
        # The summarization model takes its text under `input_text` and
        # returns the result under `summary`.
        summary = await env.AI.run(
            "@cf/facebook/bart-large-cnn", input_text=data
        )

        # Store results in D1
        # Values are bound as parameters rather than interpolated into SQL.
        await env.DB.prepare("""
            INSERT INTO summaries (user_id, summary, created_at)
            VALUES (?, ?, datetime('now'))
        """).bind(msg.body["user_id"], summary.summary).run()

        # Acknowledge only after the result has been stored.
        msg.ack()

Wrangler Configuration

# wrangler.toml
# Producer side: exposes the queue to the Worker under the binding name
# TASK_QUEUE (used in the examples as env.TASK_QUEUE).
[[queues.producers]]
binding = "TASK_QUEUE"
queue = "my-task-queue"

# Consumer side: the same queue name, so this Worker receives what the
# producer sends.
[[queues.consumers]]
queue = "my-task-queue"
# Batching limits. NOTE(review): presumably the maximum messages per batch
# and the maximum wait in seconds before a partial batch is delivered —
# confirm against the Wrangler configuration docs.
max_batch_size = 10
max_batch_timeout = 30