DRAFT

This site and guide are freshly launched. If you came across this page, it’s because Google sent you here before I finished final edits and verification of this information.


Cloudflare for Python Developers: Your Edge-Native Development Stack

Fellow Pythonistas, let’s talk about something that might surprise you: Cloudflare isn’t just a CDN anymore. It’s become a full-stack platform that speaks our language—literally. With the recent launch of Python Workers (beta), you can now deploy Python code to 300+ data centers worldwide without wrestling with JavaScript or infrastructure. Here’s why the Python community should pay attention.

The Game Changer: Python Workers

from js import Response

async def on_fetch(request):
    return Response.new("Hello from Python on the edge!")

That’s it. That’s a globally deployed Python application. No containers, no Kubernetes, no cold starts worth worrying about. Python Workers brings first-class Python support to Cloudflare’s edge network.

The AI/ML Stack That Actually Scales

As Python developers, we’re often at the forefront of AI/ML adoption. Cloudflare’s edge-native AI stack eliminates the typical deployment headaches:

Workers AI: Inference Without Infrastructure

from js import Response, env

async def on_fetch(request):
    ai = env.AI
    response = await ai.run(
        "@cf/meta/llama-3-8b-instruct",
        messages=[{"role": "user", "content": "Explain quantum computing"}]
    )
    return Response.json(response)

50+ open-source models available instantly. No GPU provisioning. No model serving frameworks. Just ai.run() and you’re in production.

Vectorize: Your Edge-Native Vector Database

Building RAG applications? Semantic search? Vectorize gives you a globally distributed vector database that works seamlessly with Workers AI:

# Generate embeddings and store them
embedding = await ai.run("@cf/baai/bge-base-en-v1.5", text="Python is awesome")
await env.VECTORIZE_INDEX.insert([{
    "id": "1",
    "values": embedding.data[0],
    "metadata": {"language": "python", "sentiment": "positive"}
}])

AutoRAG: Because Building RAG is Tedious

Skip the boilerplate. AutoRAG handles document ingestion, chunking, embedding, and retrieval automatically:

# That's it. Your documents are now queryable.
results = await env.AUTORAG.query("How do I implement async generators?")

Data Layer Without the DevOps

D1: SQLite at Scale

from js import Response, env

async def on_fetch(request):
    db = env.DB
    # It's just SQLite, but globally distributed
    results = await db.prepare("""
        SELECT user_id, COUNT(*) as request_count
        FROM api_logs
        WHERE timestamp > datetime('now', '-1 hour')
        GROUP BY user_id
    """).all()
    return Response.json(results)

Time Travel included. Point-in-time recovery for 30 days. No backup scripts needed.
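
A sketch of what that looks like in practice with the wrangler CLI, assuming a database named my-database (flag names may vary by wrangler version; see `wrangler d1 time-travel --help`):

# Show the current restore point (bookmark) for the database
npx wrangler d1 time-travel info my-database

# Restore to any point within the 30-day window
npx wrangler d1 time-travel restore my-database --timestamp=2024-01-15T00:00:00Z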

R2: S3-Compatible Storage, Zero Egress Fees

# Your boto3 code works as-is
import boto3

s3 = boto3.client(
    's3',
    endpoint_url='https://YOUR_ACCOUNT_ID.r2.cloudflarestorage.com',
    aws_access_key_id='YOUR_ACCESS_KEY',
    aws_secret_access_key='YOUR_SECRET_KEY'
)

# Upload that 10GB model file. Download it 1000 times. Pay $0 in bandwidth.
s3.upload_file('model.pkl', 'ml-artifacts', 'models/latest.pkl')

Async Processing That Makes Sense

Queues: It’s Like Celery, But Simpler

# Producer
async def on_fetch(request):
    await env.TASK_QUEUE.send({
        "task": "process_upload",
        "file_id": "abc123",
        "user_id": request.headers.get("X-User-ID")
    })
    return Response.new("Processing started", status=202)

# Consumer (another Worker)
async def queue_handler(batch):
    for message in batch.messages:
        await process_file(message.body["file_id"])
        message.ack()

Automatic retries, dead letter queues, and batching included. No message broker to manage.
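
For context, a minimal sketch of the wrangler.toml wiring behind the producer/consumer pair above; queue names are placeholders, and option names should be checked against the Queues docs:

# wrangler.toml
[[queues.producers]]
binding = "TASK_QUEUE"
queue = "task-queue"

[[queues.consumers]]
queue = "task-queue"
max_batch_size = 10              # up to 10 messages per queue_handler batch
max_retries = 3                  # after 3 failed attempts...
dead_letter_queue = "task-dlq"   # ...messages land here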

The “I Can’t Believe This Works” Features

Browser Rendering: Puppeteer as a Service

async def on_fetch(request):
    browser = env.BROWSER
    # Screenshot any website, extract data, generate PDFs
    page = await browser.new_page()
    await page.goto("https://news.ycombinator.com")
    screenshot = await page.screenshot()
    return Response.new(screenshot, headers={"Content-Type": "image/png"})

Hyperdrive: Make Your Postgres 10x Faster

Your existing Django/SQLAlchemy app connecting to RDS? Add Hyperdrive, change one connection string:

# Before: postgresql://user:pass@db.region.rds.amazonaws.com/myapp
# After: postgresql://user:pass@hyperdrive-id.hyperdrive.workers.dev/myapp
# That's it. Global connection pooling and caching. No code changes.
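
The swap assumes a Hyperdrive binding in wrangler.toml; a minimal sketch, where the id comes from `wrangler hyperdrive create`:

# wrangler.toml
[[hyperdrive]]
binding = "HYPERDRIVE"
id = "your-hyperdrive-id"

# In the Worker, the pooled connection string is exposed as:
#   env.HYPERDRIVE.connectionString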

Analytics Engine: Forget ClickHouse, Forget TimescaleDB

async def on_fetch(request):
    # Write millions of events
    await env.ANALYTICS.writeDataPoint({
        "index": ["user_id", "endpoint"],
        "dimension": [request.headers.get("X-User-ID"), request.url],
        "metric": {"response_time": 42, "status_code": 200}
    })
    # Query with SQL
    results = await env.ANALYTICS.query("""
        SELECT endpoint, AVG(response_time) as avg_latency
        FROM analytics
        WHERE timestamp > NOW() - INTERVAL '1 hour'
        GROUP BY endpoint
    """)

The Developer Experience We Deserve

Local Development That Actually Works

# It just works™
npx wrangler dev main.py
# Hot reload? ✓
# Local bindings? ✓
# Debugging? ✓

Deployment in Seconds

npx wrangler deploy main.py
# ⛅️ Deployed to 300+ locations in ~30 seconds

Observability Built-In
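
A quick sketch of the essentials, assuming a recent wrangler: live log streaming plus opt-in persistent Workers Logs.

# Live-stream logs from the deployed Worker
npx wrangler tail

# wrangler.toml - enable persistent Workers Logs
[observability]
enabled = true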

Why This Matters for Python Developers

  1. Edge-First Architecture: Your code runs within 50 ms of users globally. No region selection. No multi-region complexity.

  2. Billing That Makes Sense: Pay for what you use. No idle servers. First 100k requests/day free.

  3. Security by Default: Automatic DDoS protection, WAF rules, and rate limiting. A self-hosted Flask app never gets those for free.

  4. Ecosystem Integration: Your existing Python knowledge transfers. SQLAlchemy patterns work with D1. Boto3 works with R2. FastAPI works in Workers.

  5. No Ops Required: No Kubernetes. No Docker. No load balancers. No auto-scaling groups. Just Python.

Real-World Patterns

Pattern 1: AI-Powered API

from js import Response, env
import json

async def on_fetch(request):
    if request.method == "POST":
        data = await request.json()
        # Generate embedding
        embedding = await env.AI.run(
            "@cf/baai/bge-base-en-v1.5",
            text=data["query"]
        )
        # Semantic search
        results = await env.VECTORIZE_INDEX.query(
            embedding.data[0],
            topK=5
        )
        # Generate response
        context = "\n".join([r.metadata.content for r in results.matches])
        response = await env.AI.run(
            "@cf/meta/llama-3-8b-instruct",
            messages=[
                {"role": "system", "content": f"Context: {context}"},
                {"role": "user", "content": data["query"]}
            ]
        )
        return Response.json({"answer": response.response})

Pattern 2: Async Data Pipeline

from js import Response, env, fetch
from uuid import uuid4

# API endpoint triggers processing
async def on_fetch(request):
    file_url = (await request.json())["file_url"]
    # Store in R2
    file_data = await fetch(file_url)
    key = f"uploads/{uuid4()}"
    await env.BUCKET.put(key, file_data.body)
    # Queue for processing
    await env.PROCESS_QUEUE.send({
        "key": key,
        "user_id": request.headers.get("X-User-ID")
    })
    return Response.json({"status": "processing"})

# Queue consumer processes files
async def queue_handler(batch):
    for msg in batch.messages:
        # Get from R2
        obj = await env.BUCKET.get(msg.body["key"])
        data = await obj.text()
        # Process with AI
        summary = await env.AI.run(
            "@cf/facebook/bart-large-cnn",
            text=data
        )
        # Store results in D1
        await env.DB.prepare("""
            INSERT INTO summaries (user_id, summary, created_at)
            VALUES (?, ?, datetime('now'))
        """).bind(msg.body["user_id"], summary.summary).run()
        msg.ack()

The Great Python Library Replacement Guide

Here’s what you can stop installing and start using on Cloudflare’s platform:

Web Frameworks & Servers

| You’re Using | Replace With | Why Switch |
| --- | --- | --- |
| Gunicorn/uWSGI/Uvicorn | Python Workers | No server management, auto-scaling, global deployment |
| Flask/FastAPI | Python Workers + FastAPI | FastAPI is pre-installed, runs at the edge |
| Nginx reverse proxy | Cloudflare Load Balancer | Built-in, no configuration needed |

Data Storage

| You’re Using | Replace With | Why Switch |
| --- | --- | --- |
| Redis/Memcached | Workers KV | Globally distributed, no server management |
| PostgreSQL/MySQL | D1 (for new apps) | Serverless, automatic backups, Time Travel |
| SQLAlchemy + Postgres | Hyperdrive + SQLAlchemy | Keep your ORM, get 10x performance |
| boto3 + S3 | boto3 + R2 | Same API, zero egress fees |
| Pinecone/Weaviate | Vectorize | Integrated with Workers, no separate billing |

Task Queues & Background Jobs

| You’re Using | Replace With | Why Switch |
| --- | --- | --- |
| Celery + RabbitMQ/Redis | Queues | No broker needed, automatic retries |
| APScheduler | Cron Triggers | Simpler configuration, guaranteed execution (see the sketch after this table) |
| Python RQ | Queues | Built-in dead letter queues, batching |
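
For the APScheduler row, a rough Cron Triggers equivalent; the on_scheduled handler name follows Python Workers examples and is an assumption worth verifying against the current docs:

# wrangler.toml
# [triggers]
# crons = ["0 * * * *"]   # hourly

# main.py - handler name per Python Workers examples (verify in docs)
async def on_scheduled(event, env, ctx):
    # Runs on the cron schedule; replaces an APScheduler job
    await env.TASK_QUEUE.send({"task": "hourly_cleanup"})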

AI/ML Infrastructure

| You’re Using | Replace With | Why Switch |
| --- | --- | --- |
| Transformers + CUDA | Workers AI | No GPU management, 50+ models ready |
| LangChain + OpenAI | LangChain + Workers AI | LangChain pre-installed, multiple models |
| ChromaDB/FAISS | Vectorize | Managed vector search, global distribution |
| Hugging Face Inference API | Workers AI | Lower latency, integrated platform |

Web Scraping & Automation

| You’re Using | Replace With | Why Switch |
| --- | --- | --- |
| Selenium/Playwright | Browser Rendering | No browser management, API-based |
| BeautifulSoup + requests | Browser Rendering | JavaScript rendering included |
| Puppeteer | Browser Rendering | Managed browsers, no memory leaks |

Analytics & Monitoring

| You’re Using | Replace With | Why Switch |
| --- | --- | --- |
| InfluxDB/TimescaleDB | Analytics Engine | Unlimited cardinality, SQL queries |
| Prometheus + Grafana | Analytics Engine | No infrastructure, built-in visualization |
| Custom analytics with Pandas | Analytics Engine | Real-time processing, no data pipeline |

Caching & CDN

| You’re Using | Replace With | Why Switch |
| --- | --- | --- |
| Varnish | Cloudflare Cache | Global edge caching, no configuration |
| django-cache | Cache API | Edge caching, programmatic control |
| Flask-Caching | Workers KV | Persistent, globally distributed |

API Management

| You’re Using | Replace With | Why Switch |
| --- | --- | --- |
| Kong/Traefik | API Shield | Built-in rate limiting, schema validation |
| OAuth2/JWT libraries | Zero Trust | Managed authentication, no token management |
| Flask-Limiter | Rate Limiting | Edge-based, DDoS protection included |

Real Migration Examples

Example 1: From Celery to Queues

Before (Celery + Redis):

# tasks.py
from celery import Celery
import redis

app = Celery('tasks', broker='redis://localhost:6379')

@app.task
def process_image(image_url):
    # Download, process, save to S3
    return "processed"

# main.py
from tasks import process_image
process_image.delay("https://example.com/image.jpg")

After (Cloudflare Queues):

# No separate task file needed!
async def on_fetch(request):
    await env.IMAGE_QUEUE.send({
        "url": "https://example.com/image.jpg"
    })
    return Response.new("Queued for processing")

async def queue_handler(batch):
    for msg in batch.messages:
        # Download, process, save to R2
        msg.ack()

Example 2: From FastAPI + Gunicorn + Nginx to Python Workers

Before:

# app.py
from fastapi import FastAPI
import uvicorn

app = FastAPI()

@app.get("/api/users/{user_id}")
async def get_user(user_id: int):
    # Database query
    return {"user_id": user_id}

# Plus: Dockerfile, nginx.conf, gunicorn config, k8s manifests...

After:

# main.py
from fastapi import FastAPI
from js import env

app = FastAPI()

@app.get("/api/users/{user_id}")
async def get_user(user_id: int):
    result = await env.DB.prepare(
        "SELECT * FROM users WHERE id = ?"
    ).bind(user_id).first()
    return {"user": result}

# That's it. Deploy with: wrangler deploy

Example 3: From Pandas + PostgreSQL to Analytics Engine

Before:

import pandas as pd
import psycopg2
from sqlalchemy import create_engine

# Complex ETL pipeline
engine = create_engine('postgresql://...')
df = pd.read_sql("SELECT * FROM events WHERE...", engine)
df_grouped = df.groupby(['user_id', 'action']).agg({
    'duration': 'mean',
    'timestamp': 'count'
})
df_grouped.to_sql('analytics_summary', engine)

After:

# Write events directly
await env.ANALYTICS.writeDataPoint({
    "dimension": [user_id, action],
    "metric": {"duration": duration}
})

# Query with SQL
results = await env.ANALYTICS.query("""
    SELECT
        user_id,
        action,
        AVG(duration) as avg_duration,
        COUNT(*) as count
    FROM analytics
    GROUP BY user_id, action
""")

From Docker Compose to Cloudflare: What You Don’t Need Anymore

If you’re used to containerizing Python apps, here’s the paradigm shift: Cloudflare’s platform replaces your entire Docker stack. Let’s look at a typical docker-compose.yml and what disappears:

Your Typical Docker Compose Stack

# docker-compose.yml
version: '3.8'
services:
  nginx:
    image: nginx:alpine              # ❌ Not needed - Workers serve static assets
    volumes:
      - ./static:/usr/share/nginx/html
      - ./nginx.conf:/etc/nginx/nginx.conf
  app:
    build: .
    image: myapp:latest              # ❌ Not needed - Deploy Python directly
    command: gunicorn app:app --workers 4   # ❌ No WSGI server needed
    environment:
      - DATABASE_URL=postgresql://...
  redis:
    image: redis:alpine              # ❌ Use Workers KV instead
  postgres:
    image: postgres:14               # ❌ Use D1 or Hyperdrive
    volumes:
      - pgdata:/var/lib/postgresql/data
  celery:
    build: .
    command: celery -A tasks worker  # ❌ Use Queues instead
  celery-beat:
    build: .
    command: celery -A tasks beat    # ❌ Use Cron Triggers
  prometheus:
    image: prom/prometheus           # ❌ Use Analytics Engine
  grafana:
    image: grafana/grafana           # ❌ Built-in analytics

The Cloudflare Equivalent

# main.py - Your entire "stack" in one file
from js import Response, env
from fastapi import FastAPI

app = FastAPI()

# Serve static assets directly
@app.get("/static/{path:path}")
async def static(path: str):
    # Workers can serve from KV or R2
    asset = await env.ASSETS.get(path)
    return Response.new(asset.body, headers={
        "Content-Type": asset.httpMetadata.contentType
    })

# Your app logic
@app.post("/api/process")
async def process(data: dict):
    # Cache in KV (replaces Redis)
    await env.KV.put(f"cache:{data['id']}", data)
    # Queue background work (replaces Celery)
    await env.QUEUE.send(data)
    # Store in D1 (replaces Postgres)
    await env.DB.prepare(
        "INSERT INTO items (data) VALUES (?)"
    ).bind(data).run()
    # Track metrics (replaces Prometheus)
    await env.ANALYTICS.writeDataPoint({
        "dimension": ["process"],
        "metric": {"count": 1}
    })
    return {"status": "processed"}

# Scheduled tasks (replaces Celery Beat)
# Configure in wrangler.toml:
# [triggers]
# crons = ["0 * * * *"]

What Each Docker Service Becomes

| Docker Service | Purpose | Cloudflare Replacement | What Changes |
| --- | --- | --- | --- |
| nginx | Reverse proxy, static files, SSL | Workers built-in | No config files, automatic SSL, global CDN |
| gunicorn/uwsgi | WSGI server | Workers runtime | No process management, auto-scaling |
| redis | Caching, sessions, queues | Workers KV | Globally distributed, no memory limits |
| postgres/mysql | Primary database | D1 or Hyperdrive | Serverless or accelerated existing DB |
| celery workers | Background tasks | Queues | No broker needed, automatic retries |
| celery beat | Scheduled tasks | Cron Triggers | Simple cron syntax, guaranteed execution |
| nginx (static) | Serve static assets | Workers + R2/KV | Global CDN included, no nginx.conf |
| prometheus | Metrics collection | Analytics Engine | No scraping, unlimited cardinality |
| grafana | Visualization | Cloudflare Dashboard | Built-in analytics, API access |
| elasticsearch | Full-text search | R2 + Workers AI | Use embeddings for semantic search |
| rabbitmq | Message broker | Queues | Direct producer-consumer, no broker |
| memcached | In-memory cache | Workers KV | Persistent, globally distributed |
| haproxy | Load balancer | Cloudflare LB | Automatic, no configuration |

The Dockerfile You Don’t Write

Before (Multi-stage Dockerfile):

# Dockerfile
FROM python:3.11-slim as builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
FROM python:3.11-slim
WORKDIR /app
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY . .
RUN apt-get update && apt-get install -y curl # Health checks
EXPOSE 8000
CMD ["gunicorn", "app:app", "--bind", "0.0.0.0:8000", "--workers", "4"]

After (No Dockerfile):

# main.py
from js import Response

async def on_fetch(request):
    return Response.new("Hello from Python!")

# Deploy with: wrangler deploy main.py
# That's it. No containers. No images. No registry.

Environment Variables & Secrets

Docker approach:

# docker-compose.yml
environment:
  - DATABASE_URL=${DATABASE_URL}
  - REDIS_URL=${REDIS_URL}
  - SECRET_KEY=${SECRET_KEY}
  - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}

Cloudflare approach:

# Set secrets (encrypted at rest)
wrangler secret put DATABASE_URL
wrangler secret put API_KEY

# Access in code
async def on_fetch(request):
    db_url = env.DATABASE_URL  # Securely injected
Development Workflow Comparison

Docker Workflow:

# Local development
docker-compose up -d
docker-compose logs -f app
docker-compose exec app pytest
docker-compose down

# Build and push
docker build -t myapp:latest .
docker push registry.com/myapp:latest

# Deploy (still need k8s, ECS, etc.)
kubectl apply -f k8s/

Cloudflare Workflow:

# Install wrangler (once)
bun install -g wrangler

# Local development
wrangler dev main.py   # Hot reload included

# Run tests
python -m pytest       # Just regular Python

# Deploy to production
wrangler deploy        # 30 seconds to global deployment

Volume Mounts Become Bindings

Docker volumes:

volumes:
  - ./static:/app/static              # Static files
  - ./uploads:/app/uploads            # User uploads
  - pgdata:/var/lib/postgresql/data   # Database
  - ./logs:/app/logs                  # Log files

Cloudflare bindings:

# wrangler.toml
[[kv_namespaces]]
binding = "STATIC"    # Replaces static file volume

[[r2_buckets]]
binding = "UPLOADS"   # Replaces uploads volume

[[d1_databases]]
binding = "DB"        # Replaces database volume

# Logs automatically available via wrangler tail

The Mental Model Shift

  1. From Containers to Functions: Stop thinking about long-running processes. Your code runs on-demand.

  2. From Orchestration to Platform: No need for Kubernetes, Docker Swarm, or ECS. The platform handles it.

  3. From Regional to Global: Your app doesn’t run in us-east-1. It runs everywhere, automatically.

  4. From Configuration to Convention: No more nginx.conf, gunicorn.conf, redis.conf. Sensible defaults that just work.

  5. From Monitoring to Observability: Logs, metrics, and traces are built-in, not bolted on.

A Complete Example: URL Shortener

Traditional Docker Stack (5 files, 3 containers):

├── docker-compose.yml
├── Dockerfile
├── requirements.txt
├── app.py
└── nginx.conf

Cloudflare Stack (1 file):

# main.py
from js import Response, env
import hashlib

async def on_fetch(request):
    url = request.url
    path = url.pathname
    if request.method == "POST":
        # Create short URL
        data = await request.json()
        long_url = data["url"]
        short_code = hashlib.md5(long_url.encode()).hexdigest()[:6]
        # Store in KV (replaces Redis/Postgres)
        await env.URLS.put(short_code, long_url)
        return Response.json({
            "short_url": f"https://{url.hostname}/{short_code}"
        })
    elif path != "/":
        # Redirect
        short_code = path[1:]  # Remove leading /
        long_url = await env.URLS.get(short_code)
        if long_url:
            return Response.redirect(long_url, 301)
        else:
            return Response.new("Not found", status=404)
    # Serve homepage (no nginx needed)
    return Response.new("""
        <form method="post">
            <input name="url" placeholder="Enter URL">
            <button>Shorten</button>
        </form>
    """, headers={"Content-Type": "text/html"})

Deploy with wrangler deploy. No Docker, no nginx, no Redis. Just Python.
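
A quick smoke test once it’s live (the hostname is a placeholder for your workers.dev or custom domain):

# Create a short link
curl -X POST https://short.example.com/ \
  -H "Content-Type: application/json" \
  -d '{"url": "https://docs.python.org/3/"}'

# Follow it (expect a 301 to the long URL)
curl -I https://short.example.com/abc123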

Debugging Without SSH: The Edge Developer’s Toolkit

The inability to SSH into containers or tail stdout directly is a fundamental shift. Here’s how to debug effectively in the Workers/Durable Objects world:

1. Real-Time Log Streaming with Wrangler Tail

# Stream logs from production in real-time
wrangler tail

# Filter logs by status, method, or IP
wrangler tail --status 500
wrangler tail --method POST
wrangler tail --ip 192.168.1.1

# Pretty print with formatting
wrangler tail --format pretty

# Save logs to file for analysis
wrangler tail > debug.log

2. Console Logging That Actually Works

# Unlike containers, console output goes straight to wrangler tail
from js import Response, console
import traceback

async def on_fetch(request):
    console.log("Request received:", request.url)
    console.log("Headers:", dict(request.headers))
    try:
        result = await risky_operation()
        console.log("Success:", result)
    except Exception as e:
        console.error("Failed:", str(e))
        console.error("Stack trace:", traceback.format_exc())
    return Response.new("Done")

3. Debugging Startup Issues

When your Worker won’t start (syntax errors, import issues, binding problems):

# debug_wrapper.py - Wrap your entire module
import sys
import traceback

try:
    # Your imports and code here
    from js import Response, env
    import fastapi

    async def on_fetch(request):
        return Response.new("Worker started successfully!")

except Exception as e:
    # This WILL show in deployment logs
    print(f"STARTUP ERROR: {e}", file=sys.stderr)
    print(f"TRACEBACK: {traceback.format_exc()}", file=sys.stderr)

    # Capture the error now: `e` is cleared after the except block,
    # and format_exc() is only valid while the exception is active
    startup_error = str(e)
    startup_tb = traceback.format_exc()

    # Create a minimal handler that reports the error
    async def on_fetch(request):
        return Response.new(
            f"Worker failed to start: {startup_error}\n\n{startup_tb}",
            status=500,
            headers={"Content-Type": "text/plain"}
        )

4. Progressive Debugging Strategy

# Start with the absolute minimum
async def on_fetch(request):
    return Response.new("Stage 1: Basic handler works")

# Then add imports one by one
from js import Response

async def on_fetch(request):
    return Response.new("Stage 2: Imports work")

# Then add bindings
from js import Response, env

async def on_fetch(request):
    try:
        # Test each binding
        kv_test = await env.KV.get("test")
        return Response.new(f"Stage 3: KV binding works: {kv_test}")
    except Exception as e:
        return Response.new(f"KV binding failed: {e}", status=500)

# Finally add your logic

5. Local Development with Full Debugging

# Run locally with all debugging tools available
wrangler dev main.py --local

# Now you can:
# - Use Python debugger (pdb)
# - Set breakpoints
# - Inspect variables
# - See full stack traces

# Use pdb in local development
import pdb

async def on_fetch(request):
    pdb.set_trace()  # Works in local dev!
    data = await request.json()
    return Response.json({"received": data})

6. Error Boundaries and Detailed Responses

from js import Response, env, console
import traceback
import json

async def on_fetch(request):
    try:
        # Your actual logic
        return await handle_request(request)
    except Exception as e:
        # In development, return detailed errors
        if env.ENVIRONMENT == "development":
            return Response.new(
                json.dumps({
                    "error": str(e),
                    "type": type(e).__name__,
                    "traceback": traceback.format_exc(),
                    "request": {
                        "url": str(request.url),
                        "method": request.method,
                        "headers": dict(request.headers)
                    }
                }, indent=2),
                status=500,
                headers={"Content-Type": "application/json"}
            )
        else:
            # In production, log details but return generic error
            console.error(f"Error: {e}", traceback.format_exc())
            return Response.new("Internal Server Error", status=500)

7. Durable Object Debugging

Durable Objects are even trickier since they’re stateful:

from js import Response, console

class Counter:
    def __init__(self, state, env):
        self.state = state
        self.env = env
        # Debug: Log initialization
        console.log(f"DO initialized with id: {state.id}")

    async def fetch(self, request):
        # Debug: Log every request
        console.log(f"DO fetch: {request.method} {request.url}")

        # Add debug endpoint
        if request.url.pathname == "/debug":
            # Return internal state for debugging
            storage_keys = await self.state.storage.list()
            return Response.json({
                "id": self.state.id.toString(),
                "storage_keys": list(storage_keys.keys()),
                "env_bindings": list(self.env.keys())
            })

        # Regular logic with error handling
        try:
            # Your logic here
            pass
        except Exception as e:
            console.error(f"DO error: {e}")
            return Response.new(f"DO Error: {e}", status=500)

8. Request Replay for Debugging

# Capture and replay problematic requests
from js import Response, env, console
import json
import traceback
from datetime import datetime

async def on_fetch(request):
    # Clone request for logging (body can only be read once)
    request_data = {
        "url": str(request.url),
        "method": request.method,
        "headers": dict(request.headers),
        "body": await request.text() if request.body else None
    }
    try:
        # Your logic
        response = await handle_request(request_data)
        return response
    except Exception as e:
        # Log the full request for replay
        console.error("Failed request:", json.dumps(request_data))
        console.error("Error:", str(e))
        # Store in KV for later debugging
        await env.DEBUG_REQUESTS.put(
            f"error_{datetime.now().isoformat()}",
            json.dumps({
                "request": request_data,
                "error": str(e),
                "traceback": traceback.format_exc()
            })
        )
        return Response.new("Error logged", status=500)

9. Health Check and Status Endpoints

from js import Response, env
from datetime import datetime

async def on_fetch(request):
    url = request.url

    # Add health/debug endpoints
    if url.pathname == "/health":
        # Test all bindings and dependencies
        checks = {}
        try:
            await env.KV.get("health_check")
            checks["kv"] = "ok"
        except Exception:
            checks["kv"] = "failed"
        try:
            await env.DB.prepare("SELECT 1").first()
            checks["d1"] = "ok"
        except Exception:
            checks["d1"] = "failed"
        try:
            await env.QUEUE.send({"test": True})
            checks["queue"] = "ok"
        except Exception:
            checks["queue"] = "failed"

        status = 200 if all(v == "ok" for v in checks.values()) else 503
        return Response.json({
            "status": "healthy" if status == 200 else "unhealthy",
            "checks": checks,
            "timestamp": datetime.now().isoformat()
        }, status=status)

    # Regular routing
    return await handle_request(request)

10. Using Logpush for Production Debugging

# Configure Logpush to send logs to R2 for analysis
wrangler logpush create \
  --dataset workers_trace_events \
  --destination r2://logs-bucket/workers/{DATE}

# analyze_logs.py - Then analyze with Python!
import json
import boto3

s3 = boto3.client(
    's3',
    endpoint_url='https://YOUR_ACCOUNT.r2.cloudflarestorage.com'
)

# Download logs
s3.download_file('logs-bucket', 'workers/2024-01-20/batch_1.json', 'logs.json')

# Analyze errors
with open('logs.json') as f:
    for line in f:
        log = json.loads(line)
        if log.get('outcome') == 'exception':
            print(f"Error at {log['timestamp']}: {log['exceptions']}")
            print(f"Request: {log['request']['url']}")

Key Debugging Principles for Edge Computing

  1. Log Early, Log Often: Unlike containers where you can inspect state, you need proactive logging
  2. Fail Gracefully: Always wrap in try/catch and return meaningful errors
  3. Use Local Dev: Most debugging should happen in wrangler dev --local
  4. Progressive Enhancement: Start minimal, add complexity gradually
  5. Health Endpoints: Always include debug/health endpoints
  6. Error Storage: Store failed requests in KV/R2 for post-mortem analysis

The lack of SSH is actually a feature - it forces you to build observable, debuggable systems from the start. No more “SSH in and check what’s wrong” - your code must tell you what’s wrong.

The Cloudflare Python SDK: Your Swiss Army Knife

While Python Workers are revolutionary for edge computing, the Cloudflare Python SDK (pip install cloudflare) gives you programmatic control over Cloudflare’s entire platform from your existing Python applications. Think of it as boto3 for Cloudflare—but cleaner, type-safe, and covering everything from DNS to AI.

Installation & Setup

pip install cloudflare
# or with your favorite package manager
uv pip install cloudflare
poetry add cloudflare

import os
from cloudflare import Cloudflare
# Async support built-in
from cloudflare import AsyncCloudflare

client = Cloudflare(
    api_token=os.environ.get("CLOUDFLARE_API_TOKEN"),  # Recommended
    # or use api_email + api_key (legacy)
)

Type Safety & Auto-completion

Every API call is fully typed with TypedDicts for requests and Pydantic models for responses:

# Your IDE knows exactly what fields are available
zone = client.zones.create(
    account={"id": "your-account-id"},
    name="example.com",
    type="full",  # IDE autocompletes: "full" | "partial" | "secondary"
)

# Response objects are Pydantic models
print(zone.id)                 # Fully typed!
print(zone.to_json(indent=2))  # Easy serialization
zone_dict = zone.to_dict()     # Convert to dict

Practical SDK Recipes for Python Developers

1. R2 Object Storage: Your S3 Replacement

from cloudflare import Cloudflare
import boto3
from datetime import timedelta

client = Cloudflare()

# Create an R2 bucket
bucket = client.r2.buckets.create(
    account_id="your-account-id",
    name="ml-models",
    storage_class="Standard",
)

# Get temporary S3-compatible credentials
creds = client.r2.temporary_credentials.create(
    account_id="your-account-id",
    bucket=bucket.name,
    permission="read_write",
    ttl=timedelta(hours=1),
)

# Use with boto3 - zero code changes!
s3 = boto3.client(
    "s3",
    endpoint_url=f"https://{creds.account_id}.r2.cloudflarestorage.com",
    aws_access_key_id=creds.access_key_id,
    aws_secret_access_key=creds.secret_access_key,
    aws_session_token=creds.session_token,
)

# Upload your ML model
s3.upload_file("model.pkl", bucket.name, "models/latest.pkl")

# List objects - no egress fees!
for obj in s3.list_objects_v2(Bucket=bucket.name)["Contents"]:
    print(f"{obj['Key']}: {obj['Size']} bytes")

2. AI Translation API: Polyglot in 3 Lines

from cloudflare import Cloudflare

client = Cloudflare()

# Translate text using Meta's M2M model
result = client.ai.run(
    "@cf/meta/m2m100-1.2b",
    account_id="your-account-id",
    text="Hello, how are you?",
    source_lang="english",
    target_lang="spanish",
)
print(result)  # "Hola, ¿cómo estás?"

# Batch translation for efficiency
texts = ["Hello", "Goodbye", "Thank you"]
translations = []
for text in texts:
    result = client.ai.run(
        "@cf/meta/m2m100-1.2b",
        account_id="your-account-id",
        text=text,
        source_lang="english",
        target_lang="japanese",
    )
    translations.append(result)

3. DNS Management: Infrastructure as Code

from cloudflare import Cloudflare
from typing import List, Dict

client = Cloudflare()

def sync_dns_records(zone_id: str, desired_records: List[Dict]):
    """Declarative DNS management - define desired state"""
    # Get current records
    current_records = list(client.dns.records.list(zone_id=zone_id))

    # Create a map for easy lookup
    current_map = {(r.type, r.name): r for r in current_records}

    for desired in desired_records:
        key = (desired["type"], desired["name"])
        if key in current_map:
            # Update existing record
            record = current_map[key]
            client.dns.records.update(
                zone_id=zone_id,
                dns_record_id=record.id,
                **desired
            )
            print(f"Updated: {desired['name']} ({desired['type']})")
        else:
            # Create new record
            client.dns.records.create(
                zone_id=zone_id,
                **desired
            )
            print(f"Created: {desired['name']} ({desired['type']})")

# Define your DNS configuration
my_dns_config = [
    {
        "type": "A",
        "name": "api.example.com",
        "content": "192.0.2.1",
        "proxied": True,  # Enable Cloudflare proxy
    },
    {
        "type": "CNAME",
        "name": "www.example.com",
        "content": "example.com",
        "proxied": True,
    },
    {
        "type": "MX",
        "name": "example.com",
        "content": "mail.example.com",
        "priority": 10,
        "proxied": False,  # MX records can't be proxied
    },
]

sync_dns_records("your-zone-id", my_dns_config)

4. Workers KV: Global Key-Value Store

from cloudflare import Cloudflare
import json
import time

client = Cloudflare()

# Create a KV namespace
namespace = client.kv.namespaces.create(
    account_id="your-account-id",
    title="user-sessions",
)

class CloudflareKVStore:
    """Redis-like interface for Workers KV"""

    def __init__(self, client: Cloudflare, account_id: str, namespace_id: str):
        self.client = client
        self.account_id = account_id
        self.namespace_id = namespace_id

    def set(self, key: str, value: any, ttl: int = None):
        """Set a value with optional TTL"""
        metadata = {"timestamp": time.time()}
        self.client.kv.namespaces.values.update(
            account_id=self.account_id,
            namespace_id=self.namespace_id,
            key_name=key,
            value=json.dumps(value),
            metadata=json.dumps(metadata),
            expiration_ttl=ttl,
        )

    def get(self, key: str):
        """Get a value"""
        try:
            result = self.client.kv.namespaces.values.get(
                account_id=self.account_id,
                namespace_id=self.namespace_id,
                key_name=key,
            )
            return json.loads(result) if result else None
        except Exception:
            return None

    def delete(self, key: str):
        """Delete a key"""
        self.client.kv.namespaces.values.delete(
            account_id=self.account_id,
            namespace_id=self.namespace_id,
            key_name=key,
        )

    def list_keys(self, prefix: str = None):
        """List all keys with optional prefix"""
        return list(self.client.kv.namespaces.keys.list(
            account_id=self.account_id,
            namespace_id=self.namespace_id,
            prefix=prefix,
        ))

# Use it like Redis/Memcached
kv = CloudflareKVStore(client, "your-account-id", namespace.id)

# Cache user session
kv.set("session:user123", {
    "user_id": "user123",
    "email": "user@example.com",
    "login_time": time.time(),
}, ttl=3600)  # 1 hour TTL

# Retrieve session
session = kv.get("session:user123")
print(session)

# List all sessions
for key in kv.list_keys(prefix="session:"):
    print(key.name)

5. Vectorize: Semantic Search Made Simple

from cloudflare import Cloudflare
from typing import List, Dict

client = Cloudflare()

# Create a vector index
index = client.vectorize.indexes.create(
    account_id="your-account-id",
    name="product-search",
    dimensions=384,  # Using all-MiniLM-L6-v2 embeddings
    metric="cosine",
)

class SemanticSearch:
    """Semantic search engine using Vectorize"""

    def __init__(self, client: Cloudflare, account_id: str, index_name: str):
        self.client = client
        self.account_id = account_id
        self.index_name = index_name

    def index_documents(self, documents: List[Dict]):
        """Index documents with their embeddings"""
        vectors = []
        for doc in documents:
            # Generate embedding using Workers AI (sync client, so no await)
            embedding_response = self.client.ai.run(
                "@cf/baai/bge-base-en-v1.5",
                account_id=self.account_id,
                text=doc["content"],
            )
            vectors.append({
                "id": doc["id"],
                "values": embedding_response.data[0],
                "metadata": {
                    "title": doc["title"],
                    "content": doc["content"],
                    "category": doc.get("category", "general"),
                }
            })
        # Batch insert
        self.client.vectorize.indexes.insert(
            account_id=self.account_id,
            index_name=self.index_name,
            vectors=vectors,
        )

    def search(self, query: str, top_k: int = 5):
        """Search for similar documents"""
        # Generate query embedding
        embedding_response = self.client.ai.run(
            "@cf/baai/bge-base-en-v1.5",
            account_id=self.account_id,
            text=query,
        )
        # Search
        results = self.client.vectorize.indexes.query(
            account_id=self.account_id,
            index_name=self.index_name,
            vector=embedding_response.data[0],
            top_k=top_k,
            return_metadata=True,
        )
        return results.matches

# Usage
search_engine = SemanticSearch(client, "your-account-id", "product-search")

# Index your products
search_engine.index_documents([
    {
        "id": "1",
        "title": "Python Cookbook",
        "content": "Advanced Python programming techniques and recipes",
        "category": "books",
    },
    {
        "id": "2",
        "title": "Machine Learning with Python",
        "content": "Comprehensive guide to ML algorithms using Python",
        "category": "books",
    },
])

# Search semantically
results = search_engine.search("Python programming books")
for result in results:
    print(f"Score: {result.score:.3f} - {result.metadata['title']}")

6. D1 Database: SQLite at the Edge

from cloudflare import Cloudflare
import json
import pandas as pd
from typing import List, Dict, Any

client = Cloudflare()

# Create a D1 database
database = client.d1.database.create(
    account_id="your-account-id",
    name="analytics",
)

class D1Analytics:
    """Analytics database using D1"""

    def __init__(self, client: Cloudflare, account_id: str, database_id: str):
        self.client = client
        self.account_id = account_id
        self.database_id = database_id
        self._init_schema()

    def _init_schema(self):
        """Initialize database schema"""
        self.execute("""
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id TEXT NOT NULL,
                event_type TEXT NOT NULL,
                properties TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.execute("""
            CREATE INDEX IF NOT EXISTS idx_user_events
            ON events(user_id, timestamp)
        """)

    def execute(self, query: str, params: List[Any] = None):
        """Execute a query"""
        return self.client.d1.database.query(
            account_id=self.account_id,
            database_id=self.database_id,
            sql=query,
            params=params or [],
        )

    def track_event(self, user_id: str, event_type: str, properties: Dict = None):
        """Track an analytics event"""
        self.execute(
            "INSERT INTO events (user_id, event_type, properties) VALUES (?, ?, ?)",
            [user_id, event_type, json.dumps(properties or {})]
        )

    def get_user_events(self, user_id: str, limit: int = 100):
        """Get events for a user"""
        result = self.execute(
            """
            SELECT * FROM events
            WHERE user_id = ?
            ORDER BY timestamp DESC
            LIMIT ?
            """,
            [user_id, limit]
        )
        return result.results

    def get_event_counts(self, start_date: str, end_date: str):
        """Get event counts by type"""
        result = self.execute(
            """
            SELECT
                event_type,
                COUNT(*) as count,
                COUNT(DISTINCT user_id) as unique_users
            FROM events
            WHERE timestamp BETWEEN ? AND ?
            GROUP BY event_type
            ORDER BY count DESC
            """,
            [start_date, end_date]
        )
        return result.results

    def to_dataframe(self, rows) -> pd.DataFrame:
        """Convert query rows (as returned by the methods above) to a DataFrame"""
        if not rows:
            return pd.DataFrame()
        return pd.DataFrame(rows)

# Usage
analytics = D1Analytics(client, "your-account-id", database.id)

# Track events
analytics.track_event("user123", "page_view", {"page": "/products"})
analytics.track_event("user123", "add_to_cart", {"product_id": "SKU-001"})

# Analyze with pandas
events_df = analytics.to_dataframe(
    analytics.get_event_counts("2024-01-01", "2024-12-31")
)
print(events_df.head())

7. Queue Processing: Async Task Management

from cloudflare import Cloudflare
import json
import time
from typing import Callable, Dict, List
from datetime import datetime

client = Cloudflare()

# Create a queue
queue = client.queues.create(
    account_id="your-account-id",
    queue_name="background-tasks",
)

class TaskQueue:
    """Celery-like task queue using Cloudflare Queues"""

    def __init__(self, client: Cloudflare, account_id: str, queue_id: str):
        self.client = client
        self.account_id = account_id
        self.queue_id = queue_id
        self.handlers: Dict[str, Callable] = {}

    def task(self, name: str = None):
        """Decorator to register a task"""
        def decorator(func: Callable):
            task_name = name or func.__name__
            self.handlers[task_name] = func
            # Attach a Celery-style .delay() that enqueues the task
            def delay(*args, **kwargs):
                self.send_task(task_name, args, kwargs)
            func.delay = delay
            return func
        return decorator

    def send_task(self, task_name: str, args: tuple = (), kwargs: dict = None):
        """Send a task to the queue"""
        message = {
            "task": task_name,
            "args": list(args),  # tuples aren't JSON; send a list
            "kwargs": kwargs or {},
            "timestamp": datetime.utcnow().isoformat(),
        }
        # NOTE: the original draft called messages.ack() here, which
        # acknowledges rather than publishes; push() matches the Queues
        # publish endpoint, but verify the method name in your SDK version.
        self.client.queues.messages.push(
            queue_id=self.queue_id,
            account_id=self.account_id,
            body=json.dumps(message),
        )

    def process_messages(self, batch_size: int = 10):
        """Process messages from the queue (pull-based consumer)"""
        # pull() retrieves a batch; each message carries a lease id
        batch = self.client.queues.messages.pull(
            queue_id=self.queue_id,
            account_id=self.account_id,
            batch_size=batch_size,
        )
        for msg in batch.messages:
            try:
                data = json.loads(msg.body)
                task_name = data["task"]
                if task_name in self.handlers:
                    # Execute the task
                    result = self.handlers[task_name](
                        *data["args"],
                        **data["kwargs"]
                    )
                    print(f"Task {task_name} completed: {result}")
                else:
                    print(f"Unknown task: {task_name}")
                # Acknowledge by lease id (pull consumers ack explicitly)
                self.client.queues.messages.ack(
                    queue_id=self.queue_id,
                    account_id=self.account_id,
                    acks=[{"lease_id": msg.lease_id}],
                )
            except Exception as e:
                print(f"Task failed: {e}")
                # Unacked messages are redelivered after the lease expires

# Usage
task_queue = TaskQueue(client, "your-account-id", queue.id)

@task_queue.task()
def send_email(to: str, subject: str, body: str):
    """Send an email (example task)"""
    print(f"Sending email to {to}: {subject}")
    # Your email sending logic here
    return f"Email sent to {to}"

@task_queue.task()
def process_image(image_url: str, operations: List[str]):
    """Process an image (example task)"""
    print(f"Processing image: {image_url}")
    # Your image processing logic here
    return f"Processed with operations: {operations}"

# Enqueue tasks
send_email.delay("user@example.com", "Welcome!", "Thanks for signing up")
process_image.delay("https://example.com/image.jpg", ["resize", "watermark"])

# In your worker script
while True:
    task_queue.process_messages(batch_size=10)
    time.sleep(1)

8. API Shield: Protect Your APIs

from cloudflare import Cloudflare
from typing import Dict, List
import yaml

client = Cloudflare()

class APIProtection:
    """API protection using Cloudflare API Shield"""

    def __init__(self, client: Cloudflare, zone_id: str):
        self.client = client
        self.zone_id = zone_id

    def upload_openapi_schema(self, schema_path: str):
        """Upload OpenAPI schema for validation"""
        with open(schema_path, 'r') as f:
            schema = yaml.safe_load(f)
        result = self.client.api_gateway.schemas.create(
            zone_id=self.zone_id,
            file=schema_path,
            kind="openapi_v3",
        )
        return result.schema_id

    def create_rate_limit(self, endpoint: str, requests_per_minute: int):
        """Create rate limiting rule"""
        rule = self.client.rate_limits.create(
            zone_id=self.zone_id,
            action="block",
            match={
                "request": {
                    "url_pattern": f"*{endpoint}*",
                    "methods": ["GET", "POST", "PUT", "DELETE"],
                }
            },
            threshold=requests_per_minute,
            period=60,  # 1 minute
            description=f"Rate limit for {endpoint}",
        )
        return rule.id

    def enable_bot_protection(self, sensitivity: str = "high"):
        """Enable bot protection"""
        self.client.bot_management.update(
            zone_id=self.zone_id,
            enable_js_detection=True,
            fight_mode=True,
            sensitivity=sensitivity,
        )

    def create_waf_rules(self, rules: List[Dict]):
        """Create WAF rules for common attacks"""
        for rule in rules:
            self.client.firewall.rules.create(
                zone_id=self.zone_id,
                filter={
                    "expression": rule["expression"],
                },
                action=rule.get("action", "block"),
                description=rule.get("description", ""),
            )

    def get_security_analytics(self, start_time: str, end_time: str):
        """Get security analytics"""
        analytics = self.client.zones.analytics.colos.get(
            zone_id=self.zone_id,
            since=start_time,
            until=end_time,
        )
        return {
            "total_requests": analytics.totals.requests,
            "threats_blocked": analytics.totals.threats,
            "bot_requests": analytics.totals.pageviews.bot,
            "human_requests": analytics.totals.pageviews.human,
        }

# Usage
api_shield = APIProtection(client, "your-zone-id")

# Upload your API schema for automatic validation
schema_id = api_shield.upload_openapi_schema("openapi.yaml")

# Set up rate limiting
api_shield.create_rate_limit("/api/v1/users", requests_per_minute=100)
api_shield.create_rate_limit("/api/v1/auth/login", requests_per_minute=10)

# Enable bot protection
api_shield.enable_bot_protection(sensitivity="high")

# Create WAF rules
waf_rules = [
    {
        "expression": '(http.request.uri.path contains "../") or (http.request.uri.path contains "..\\")',
        "action": "block",
        "description": "Block path traversal attempts",
    },
    {
        "expression": '(http.request.uri.query contains "<script") or (http.request.body.raw contains "<script")',
        "action": "challenge",
        "description": "Challenge potential XSS",
    },
]
api_shield.create_waf_rules(waf_rules)

# Monitor security
stats = api_shield.get_security_analytics("2024-01-01T00:00:00Z", "2024-01-31T23:59:59Z")
print(f"Threats blocked this month: {stats['threats_blocked']}")

9. Load Balancing & Health Checks

from cloudflare import Cloudflare
from typing import List, Dict

client = Cloudflare()

class LoadBalancer:
    """Global load balancing with health checks"""

    def __init__(self, client: Cloudflare, account_id: str):
        self.client = client
        self.account_id = account_id

    def create_health_check_monitor(self, name: str, endpoint: str):
        """Create a health check monitor"""
        monitor = self.client.load_balancers.monitors.create(
            account_id=self.account_id,
            type="https",
            description=name,
            method="GET",
            path=endpoint,
            interval=60,  # Check every 60 seconds
            timeout=10,
            retries=2,
            expected_codes="200",
            follow_redirects=True,
            probe_zone="all",  # Check from all regions
        )
        return monitor.id

    def create_origin_pool(self, name: str, origins: List[Dict[str, str]], monitor_id: str):
        """Create an origin pool"""
        pool_origins = [
            {
                "name": origin["name"],
                "address": origin["address"],
                "enabled": True,
                "weight": origin.get("weight", 1),
            }
            for origin in origins
        ]
        pool = self.client.load_balancers.pools.create(
            account_id=self.account_id,
            name=name,
            origins=pool_origins,
            monitor=monitor_id,
            notification_email="ops@example.com",
            minimum_origins=1,
        )
        return pool.id

    def create_load_balancer(self, hostname: str, pool_ids: List[str], steering_policy: str = "dynamic_steering"):
        """Create a load balancer"""
        lb = self.client.zones.load_balancers.create(
            zone_id="your-zone-id",
            name=hostname,
            fallback_pool=pool_ids[0],
            default_pools=pool_ids,
            proxied=True,
            steering_policy=steering_policy,  # dynamic_steering, geo, random, etc.
            session_affinity="cookie",
            session_affinity_ttl=1800,  # 30 minutes
        )
        return lb.id

    def get_pool_health(self, pool_id: str):
        """Get health status of a pool"""
        health = self.client.load_balancers.pools.health.get(
            account_id=self.account_id,
            pool_id=pool_id,
        )
        return {
            "healthy_origins": len([o for o in health.origins if o.healthy]),
            "total_origins": len(health.origins),
            "pool_healthy": health.healthy,
            "origins": [
                {
                    "name": o.name,
                    "address": o.address,
                    "healthy": o.healthy,
                    "failure_reason": o.failure_reason,
                }
                for o in health.origins
            ],
        }

# Usage
lb_manager = LoadBalancer(client, "your-account-id")

# Create health check
monitor_id = lb_manager.create_health_check_monitor(
    "API Health Check",
    "/health"
)

# Create origin pools for different regions
us_pool_id = lb_manager.create_origin_pool(
    "US API Servers",
    [
        {"name": "us-east-1", "address": "api-us-east.example.com", "weight": 1},
        {"name": "us-west-1", "address": "api-us-west.example.com", "weight": 1},
    ],
    monitor_id
)

eu_pool_id = lb_manager.create_origin_pool(
    "EU API Servers",
    [
        {"name": "eu-west-1", "address": "api-eu-west.example.com", "weight": 1},
        {"name": "eu-central-1", "address": "api-eu-central.example.com", "weight": 1},
    ],
    monitor_id
)

# Create global load balancer
lb_id = lb_manager.create_load_balancer(
    "api.example.com",
    [us_pool_id, eu_pool_id],
    steering_policy="geo"  # Route by geography
)

# Monitor health
for pool_id, pool_name in [(us_pool_id, "US"), (eu_pool_id, "EU")]:
    health = lb_manager.get_pool_health(pool_id)
    print(f"{pool_name} Pool: {health['healthy_origins']}/{health['total_origins']} healthy")

10. Automation & CI/CD Integration

from cloudflare import Cloudflare
import os
import hashlib
from pathlib import Path
from typing import Dict, List

class CloudflareDeployment:
    """CI/CD deployment automation"""

    def __init__(self, api_token: str):
        self.client = Cloudflare(api_token=api_token)

    def deploy_worker(self, script_name: str, script_path: str, env_vars: Dict[str, str]):
        """Deploy a Worker script"""
        with open(script_path, 'r') as f:
            script_content = f.read()

        # Calculate hash for versioning
        script_hash = hashlib.sha256(script_content.encode()).hexdigest()[:8]

        # Upload with environment variables as secrets
        bindings = [
            {
                "type": "secret_text",
                "name": key,
                "text": value,
            }
            for key, value in env_vars.items()
        ]
        result = self.client.workers.scripts.update(
            script_name,
            account_id=os.environ["CLOUDFLARE_ACCOUNT_ID"],
            metadata={
                "main_module": "index.js",
                "bindings": bindings,
                "compatibility_date": "2024-01-01",
                "tags": [f"version:{script_hash}", "env:production"],
            },
            files={
                "index.js": (
                    "index.js",
                    script_content.encode(),
                    "application/javascript",
                )
            },
        )
        print(f"Deployed {script_name} version {script_hash}")
        return result

    def deploy_pages_project(self, project_name: str, build_directory: str):
        """Deploy a Pages project"""
        # Create deployment
        deployment = self.client.pages.projects.deployments.create(
            account_id=os.environ["CLOUDFLARE_ACCOUNT_ID"],
            project_name=project_name,
        )
        # Upload files
        for file_path in Path(build_directory).rglob("*"):
            if file_path.is_file():
                relative_path = file_path.relative_to(build_directory)
                with open(file_path, 'rb') as f:
                    self.client.pages.projects.deployments.retry(
                        account_id=os.environ["CLOUDFLARE_ACCOUNT_ID"],
                        project_name=project_name,
                        deployment_id=deployment.id,
                        path=str(relative_path),
                        body=f.read(),
                    )
        print(f"Deployed Pages project: {deployment.url}")
        return deployment

    def purge_cache(self, zone_id: str, urls: List[str] = None):
        """Purge cache after deployment"""
        if urls:
            # Purge specific URLs
            self.client.cache.purge(
                zone_id=zone_id,
                files=urls,
            )
            print(f"Purged cache for {len(urls)} URLs")
        else:
            # Purge everything
            self.client.cache.purge(
                zone_id=zone_id,
                purge_everything=True,
            )
            print("Purged entire cache")

    def rollback_worker(self, script_name: str, version_tag: str):
        """Rollback to a previous Worker version"""
        # List all versions
        versions = self.client.workers.scripts.versions.list(
            account_id=os.environ["CLOUDFLARE_ACCOUNT_ID"],
            script_name=script_name,
        )
        # Find the version to rollback to
        for version in versions:
            if version_tag in version.tags:
                # Deploy this version
                self.client.workers.scripts.versions.deployment.create(
                    account_id=os.environ["CLOUDFLARE_ACCOUNT_ID"],
                    script_name=script_name,
                    version_id=version.id,
                )
                print(f"Rolled back to version {version_tag}")
                return
        raise ValueError(f"Version {version_tag} not found")

# GitHub Actions example
if __name__ == "__main__":
    deployer = CloudflareDeployment(os.environ["CLOUDFLARE_API_TOKEN"])

    # Deploy Worker
    deployer.deploy_worker(
        "api-worker",
        "dist/worker.js",
        {
            "DATABASE_URL": os.environ["DATABASE_URL"],
            "API_KEY": os.environ["API_KEY"],
        }
    )

    # Deploy static site
    deployer.deploy_pages_project("my-site", "build/")

    # Clear cache
    deployer.purge_cache(os.environ["CLOUDFLARE_ZONE_ID"])

SDK Best Practices

1. Error Handling

import cloudflare
from cloudflare import Cloudflare
import time

client = Cloudflare()

try:
    zone = client.zones.get(zone_id="invalid-id")
except cloudflare.NotFoundError:
    print("Zone not found")
except cloudflare.RateLimitError as e:
    print(f"Rate limited. Retry after {e.response.headers.get('Retry-After')} seconds")
    time.sleep(int(e.response.headers.get('Retry-After', 60)))
except cloudflare.APIStatusError as e:
    print(f"API error {e.status_code}: {e.response}")
except cloudflare.APIConnectionError:
    print("Network connection failed")

2. Pagination

# Automatic pagination
all_zones = []
for zone in client.zones.list():
    all_zones.append(zone)
    print(f"Processing zone: {zone.name}")

# Manual pagination for more control
page = client.zones.list(per_page=50)
while True:
    for zone in page.result:
        print(zone.name)
    if not page.has_next_page():
        break
    page = page.get_next_page()

3. Async Operations

import asyncio
from cloudflare import AsyncCloudflare

async def process_zones():
    async with AsyncCloudflare() as client:
        # Concurrent API calls
        tasks = []
        async for zone in client.zones.list():
            tasks.append(process_zone(client, zone))
        # Process all zones concurrently
        results = await asyncio.gather(*tasks)
        return results

async def process_zone(client: AsyncCloudflare, zone):
    # Collect DNS records (async paginator, so iterate with async for)
    records = [r async for r in client.dns.records.list(zone_id=zone.id)]
    return {"zone": zone.name, "record_count": len(records)}

# Run
results = asyncio.run(process_zones())

4. Environment-Specific Configuration

import os
from cloudflare import Cloudflare

class CloudflareManager:
    def __init__(self):
        # Different tokens for different environments
        env = os.environ.get("ENVIRONMENT", "development")
        if env == "production":
            self.client = Cloudflare(
                api_token=os.environ["CLOUDFLARE_PROD_TOKEN"],
                max_retries=5,
                timeout=30.0,
            )
        else:
            self.client = Cloudflare(
                api_token=os.environ["CLOUDFLARE_DEV_TOKEN"],
                max_retries=2,
                timeout=10.0,
            )

    def __enter__(self):
        return self.client

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.client.close()

# Usage
with CloudflareManager() as cf:
    zones = list(cf.zones.list())

The Money Talk: Cost Comparison

Traditional Stack (AWS):

Cloudflare Stack:

Getting Started

  1. Install Wrangler: bun install -g wrangler (or npm install -g wrangler)
  2. Create a project: wrangler init my-python-app --type python
  3. Deploy: wrangler deploy

That’s it. You’re running Python at the edge.
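
For reference, the project’s wrangler.toml ends up looking roughly like this; Python Workers currently require the python_workers compatibility flag:

# wrangler.toml
name = "my-python-app"
main = "main.py"
compatibility_date = "2024-04-01"
compatibility_flags = ["python_workers"]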

The Bottom Line

Cloudflare has quietly built the platform we’ve been asking for: Python-first, globally distributed, with built-in AI/ML capabilities, and zero operational overhead. It’s not just about running Python in Workers—it’s about having an entire ecosystem designed for modern Python applications.

Whether you’re building AI agents, processing data pipelines, or serving APIs, Cloudflare’s edge platform offers a compelling alternative to traditional cloud providers. The best part? You can start for free and scale to millions of requests without changing your architecture.

Welcome to edge computing, Python style. No JavaScript required.


Ready to dive in? Check out the Python Workers documentation and join the Cloudflare Developers Discord to connect with other Python developers building on the edge.

Actionable Feedback for Cloudflare Product & Engineering

🚀 High-Impact Improvements for Python Developers

1. Python Workers: Path to GA

2. Local Development Experience

3. D1 + SQLAlchemy Integration

# Dream API
from cloudflare_sqlalchemy import create_d1_engine

engine = create_d1_engine("d1://account_id/database_id")

4. Python SDK Completeness

5. Vectorize + LangChain Integration

# Dream API
from langchain_cloudflare import CloudflareVectorStore

vectorstore = CloudflareVectorStore(
    index_name="my-index",
    embedding_service="@cf/baai/bge-base-en-v1.5"
)

🔧 Developer Experience Enhancements

6. Unified Local Development Stack

7. Observability for Python

8. Database Migration Tooling

d1-migrate init
d1-migrate generate "add users table"
d1-migrate up
d1-migrate rollback

📚 Documentation & Learning

9. Python-First Documentation

10. Migration Guides

🏗️ Infrastructure & Platform

11. Durable Objects for Python

class RoomState(DurableObject):
    async def fetch(self, request: Request) -> Response:
        # Stateful Python at the edge!
        self.state.users = self.state.users or []

12. Queue Consumer Scaling

13. Multi-Region D1 Replication

💰 Pricing & Business Model

14. Python Developer Pricing

15. Enterprise Python Support

🌟 Innovation Opportunities

16. Jupyter Notebooks on Workers

# notebook_worker.py
@schedule("0 9 * * *")  # Daily at 9 AM
async def daily_report():
    await run_notebook("reports/daily_metrics.ipynb",
                       params={"date": "today"})

17. ML Model Serving Platform

18. Data Pipeline Platform

📊 Success Metrics to Track

  1. Python Workers Adoption

  2. Developer Satisfaction

  3. Platform Growth

🎯 Quick Wins (< 1 Month)

  1. Create awesome-cloudflare-python GitHub repo
  2. Host monthly Python developer office hours
  3. Add Python examples to every product doc
  4. Create Cloudflare Python developer newsletter
  5. Launch Python developer survey

💡 The Big Vision

Position Cloudflare as “The Python Cloud” - where Python developers go to build and deploy modern applications without infrastructure complexity. Make Cloudflare the obvious choice for every Python developer, from data scientists to web developers to ML engineers.

Tag line: “Python at the speed of light. No servers required.”


These recommendations come from analyzing the gaps between what Python developers need and what Cloudflare currently offers. Implementing even half of these would make Cloudflare the most Python-friendly edge platform in the market.