This site and guide are freshly launched. If you’ve come across this page, it’s because Google sent you here before I finished final edits and verification of this information.
Fellow Pythonistas, let’s talk about something that might surprise you: Cloudflare isn’t just a CDN anymore. It’s become a full-stack platform that speaks our language—literally. With the recent launch of Python Workers (beta), you can now deploy Python code to 300+ data centers worldwide without wrestling with JavaScript or infrastructure. Here’s why the Python community should pay attention.
```python
from js import Response

async def on_fetch(request):
    return Response.new("Hello from Python on the edge!")
```
That’s it. That’s a globally deployed Python application. No containers, no Kubernetes, no cold starts worth worrying about. Python Workers brings first-class Python support to Cloudflare’s edge network, complete with:
`asyncio`, `json`, `datetime`, `re`, `urllib`, and friends.
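Those batteries are enough for real request handling on their own. A small illustrative sketch using only the standard library plus the `Response` wrapper from the earlier example (the query-parameter handling here is just for demonstration):

```python
import json
import re
from datetime import datetime, timezone

from js import Response

async def on_fetch(request):
    # Pull an optional ?name= query parameter with plain stdlib tools
    match = re.search(r"[?&]name=([^&]+)", str(request.url))
    name = match.group(1) if match else "world"

    payload = {
        "greeting": f"Hello, {name}!",
        "generated_at": datetime.now(timezone.utc).isoformat(),
    }
    return Response.new(
        json.dumps(payload),
        headers={"Content-Type": "application/json"},
    )
```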
As Python developers, we’re often at the forefront of AI/ML adoption. Cloudflare’s edge-native AI stack eliminates the typical deployment headaches:
```python
from js import Response, env

async def on_fetch(request):
    ai = env.AI
    response = await ai.run(
        "@cf/meta/llama-3-8b-instruct",
        messages=[{"role": "user", "content": "Explain quantum computing"}],
    )
    return Response.json(response)
```
50+ open-source models available instantly. No GPU provisioning. No model-serving frameworks. Just `ai.run()` and you’re in production.
Building RAG applications? Semantic search? Vectorize gives you a globally distributed vector database that works seamlessly with Workers AI:
```python
# Generate embeddings and store them
embedding = await ai.run("@cf/baai/bge-base-en-v1.5", text="Python is awesome")
await env.VECTORIZE_INDEX.insert([{
    "id": "1",
    "values": embedding.data[0],
    "metadata": {"language": "python", "sentiment": "positive"},
}])
```
Skip the boilerplate. AutoRAG handles document ingestion, chunking, embedding, and retrieval automatically:
```python
# That's it. Your documents are now queryable.
results = await env.AUTORAG.query("How do I implement async generators?")
```
```python
from js import Response, env

async def on_fetch(request):
    db = env.DB

    # It's just SQLite, but globally distributed
    results = await db.prepare("""
        SELECT user_id, COUNT(*) as request_count
        FROM api_logs
        WHERE timestamp > datetime('now', '-1 hour')
        GROUP BY user_id
    """).all()

    return Response.json(results)
```
Time Travel included. Point-in-time recovery for 30 days. No backup scripts needed.
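Recovery is a CLI operation rather than a backup script. A hedged sketch of what that looks like, assuming the `wrangler d1 time-travel` subcommands (database name and timestamp are illustrative; verify flags with `wrangler d1 time-travel --help`):

```bash
# Check the current restore bookmark for a database (assumed subcommand names)
wrangler d1 time-travel info my-database

# Restore to a point in time within the 30-day window
wrangler d1 time-travel restore my-database --timestamp=2024-01-20T12:00:00Z
```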
```python
# Your boto3 code works as-is
import boto3

s3 = boto3.client(
    's3',
    endpoint_url='https://YOUR_ACCOUNT_ID.r2.cloudflarestorage.com',
    aws_access_key_id='YOUR_ACCESS_KEY',
    aws_secret_access_key='YOUR_SECRET_KEY',
)

# Upload that 10GB model file. Download it 1000 times. Pay $0 in bandwidth.
s3.upload_file('model.pkl', 'ml-artifacts', 'models/latest.pkl')
```
```python
# Producer
async def on_fetch(request):
    await env.TASK_QUEUE.send({
        "task": "process_upload",
        "file_id": "abc123",
        "user_id": request.headers.get("X-User-ID"),
    })
    return Response.new("Processing started", status=202)

# Consumer (another Worker)
async def queue_handler(batch):
    for message in batch.messages:
        await process_file(message.body["file_id"])
        message.ack()
```
Automatic retries, dead letter queues, and batching included. No message broker to manage.
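That wiring is declarative. A sketch of the `wrangler.toml` entries, assuming the standard producer/consumer options (binding and queue names are illustrative; check the Queues docs for exact option names):

```toml
# Producer side: exposes env.TASK_QUEUE in the Worker above (names illustrative)
[[queues.producers]]
binding = "TASK_QUEUE"
queue = "background-tasks"

# Consumer side: batching, retries, and a dead letter queue
[[queues.consumers]]
queue = "background-tasks"
max_batch_size = 10
max_retries = 3
dead_letter_queue = "background-tasks-dlq"
```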
```python
async def on_fetch(request):
    browser = env.BROWSER

    # Screenshot any website, extract data, generate PDFs
    page = await browser.new_page()
    await page.goto("https://news.ycombinator.com")
    screenshot = await page.screenshot()

    return Response.new(screenshot, headers={"Content-Type": "image/png"})
```
Your existing Django/SQLAlchemy app connecting to RDS? Add Hyperdrive, change one connection string:
```python
# Before: postgresql://user:pass@db.region.rds.amazonaws.com/myapp
# After:  postgresql://user:pass@hyperdrive-id.hyperdrive.workers.dev/myapp

# That's it. Global connection pooling and caching. No code changes.
```
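Under the hood that string points at a Hyperdrive configuration bound to your Worker. A minimal `wrangler.toml` sketch, assuming the standard binding syntax (the binding name is illustrative and the `id` would come from `wrangler hyperdrive create`):

```toml
# Hyperdrive binding sketch; id comes from `wrangler hyperdrive create`
[[hyperdrive]]
binding = "HYPERDRIVE"
id = "your-hyperdrive-config-id"
```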
```python
async def on_fetch(request):
    # Write millions of events
    await env.ANALYTICS.writeDataPoint({
        "index": ["user_id", "endpoint"],
        "dimension": [request.headers.get("X-User-ID"), request.url],
        "metric": {"response_time": 42, "status_code": 200},
    })

    # Query with SQL
    results = await env.ANALYTICS.query("""
        SELECT endpoint, AVG(response_time) as avg_latency
        FROM analytics
        WHERE timestamp > NOW() - INTERVAL '1 hour'
        GROUP BY endpoint
    """)
```
```bash
# It just works™
npx wrangler dev main.py
# Hot reload? ✓
# Local bindings? ✓
# Debugging? ✓

npx wrangler deploy main.py
# ⛅️ Deployed to 300+ locations in ~30 seconds
```
Edge-First Architecture: Your code runs <50ms from users globally. No region selection. No multi-region complexity.
Billing That Makes Sense: Pay for what you use. No idle servers. First 100k requests/day free.
Security by Default: Automatic DDoS protection, WAF rules, and rate limiting. Your Flask app never could.
Ecosystem Integration: Your existing Python knowledge transfers. SQLAlchemy patterns work with D1. Boto3 works with R2. FastAPI works in Workers.
No Ops Required: No Kubernetes. No Docker. No load balancers. No auto-scaling groups. Just Python.
```python
from js import Response, env
import json

async def on_fetch(request):
    if request.method == "POST":
        data = await request.json()

        # Generate embedding
        embedding = await env.AI.run(
            "@cf/baai/bge-base-en-v1.5",
            text=data["query"],
        )

        # Semantic search
        results = await env.VECTORIZE_INDEX.query(
            embedding.data[0],
            topK=5,
        )

        # Generate response
        context = "\n".join([r.metadata.content for r in results.matches])
        response = await env.AI.run(
            "@cf/meta/llama-3-8b-instruct",
            messages=[
                {"role": "system", "content": f"Context: {context}"},
                {"role": "user", "content": data["query"]},
            ],
        )

        return Response.json({"answer": response.response})
```
```python
# API endpoint triggers processing
from js import Response, env, fetch
from uuid import uuid4

async def on_fetch(request):
    file_url = (await request.json())["file_url"]

    # Store in R2
    file_data = await fetch(file_url)
    key = f"uploads/{uuid4()}"
    await env.BUCKET.put(key, file_data.body)

    # Queue for processing
    await env.PROCESS_QUEUE.send({
        "key": key,
        "user_id": request.headers.get("X-User-ID"),
    })

    return Response.json({"status": "processing"})

# Queue consumer processes files
async def queue_handler(batch):
    for msg in batch.messages:
        # Get from R2
        obj = await env.BUCKET.get(msg.body["key"])
        data = await obj.text()

        # Process with AI
        summary = await env.AI.run(
            "@cf/facebook/bart-large-cnn",
            text=data,
        )

        # Store results in D1
        await env.DB.prepare("""
            INSERT INTO summaries (user_id, summary, created_at)
            VALUES (?, ?, datetime('now'))
        """).bind(msg.body["user_id"], summary.summary).run()

        msg.ack()
```
Here’s what you can stop installing and start using on Cloudflare’s platform:
You’re Using | Replace With | Why Switch |
---|---|---|
Gunicorn/uWSGI/Uvicorn | Python Workers | No server management, auto-scaling, global deployment |
Flask/FastAPI | Python Workers + FastAPI | FastAPI is pre-installed, runs at the edge |
Nginx reverse proxy | Cloudflare Load Balancer | Built-in, no configuration needed |
You’re Using | Replace With | Why Switch |
---|---|---|
Redis/Memcached | Workers KV | Globally distributed, no server management |
PostgreSQL/MySQL | D1 (for new apps) | Serverless, automatic backups, Time Travel |
SQLAlchemy + Postgres | Hyperdrive + SQLAlchemy | Keep your ORM, get 10x performance |
boto3 + S3 | boto3 + R2 | Same API, zero egress fees |
Pinecone/Weaviate | Vectorize | Integrated with Workers, no separate billing |
You’re Using | Replace With | Why Switch |
---|---|---|
Celery + RabbitMQ/Redis | Queues | No broker needed, automatic retries |
APScheduler | Cron Triggers | Simpler configuration, guaranteed execution |
Python RQ | Queues | Built-in dead letter queues, batching |
You’re Using | Replace With | Why Switch |
---|---|---|
Transformers + CUDA | Workers AI | No GPU management, 50+ models ready |
LangChain + OpenAI | LangChain + Workers AI | LangChain pre-installed, multiple models |
ChromaDB/FAISS | Vectorize | Managed vector search, global distribution |
Hugging Face Inference API | Workers AI | Lower latency, integrated platform |
You’re Using | Replace With | Why Switch |
---|---|---|
Selenium/Playwright | Browser Rendering | No browser management, API-based |
BeautifulSoup + requests | Browser Rendering | JavaScript rendering included |
Puppeteer | Browser Rendering | Managed browsers, no memory leaks |
You’re Using | Replace With | Why Switch |
---|---|---|
InfluxDB/TimescaleDB | Analytics Engine | Unlimited cardinality, SQL queries |
Prometheus + Grafana | Analytics Engine | No infrastructure, built-in visualization |
Custom analytics with Pandas | Analytics Engine | Real-time processing, no data pipeline |
You’re Using | Replace With | Why Switch |
---|---|---|
Varnish | Cloudflare Cache | Global edge caching, no configuration |
django-cache | Cache API | Edge caching, programmatic control |
Flask-Caching | Workers KV | Persistent, globally distributed |
You’re Using | Replace With | Why Switch |
---|---|---|
Kong/Traefik | API Shield | Built-in rate limiting, schema validation |
OAuth2/JWT libraries | Zero Trust | Managed authentication, no token management |
Flask-Limiter | Rate Limiting | Edge-based, DDoS protection included |
Before (Celery + Redis):
```python
from celery import Celery
import redis

app = Celery('tasks', broker='redis://localhost:6379')

@app.task
def process_image(image_url):
    # Download, process, save to S3
    return "processed"

# main.py
from tasks import process_image
process_image.delay("https://example.com/image.jpg")
```
After (Cloudflare Queues):
```python
# No separate task file needed!
async def on_fetch(request):
    await env.IMAGE_QUEUE.send({
        "url": "https://example.com/image.jpg"
    })
    return Response.new("Queued for processing")

async def queue_handler(batch):
    for msg in batch.messages:
        # Download, process, save to R2
        msg.ack()
```
Before:
```python
from fastapi import FastAPI
import uvicorn

app = FastAPI()

@app.get("/api/users/{user_id}")
async def get_user(user_id: int):
    # Database query
    return {"user_id": user_id}

# Dockerfile, nginx.conf, gunicorn config, k8s manifests...
```
After:
```python
from fastapi import FastAPI
from js import Response, env

app = FastAPI()

@app.get("/api/users/{user_id}")
async def get_user(user_id: int):
    result = await env.DB.prepare(
        "SELECT * FROM users WHERE id = ?"
    ).bind(user_id).first()
    return {"user": result}

# That's it. Deploy with: wrangler deploy
```
Before:
```python
import pandas as pd
import psycopg2
from sqlalchemy import create_engine

# Complex ETL pipeline
engine = create_engine('postgresql://...')
df = pd.read_sql("SELECT * FROM events WHERE...", engine)
df_grouped = df.groupby(['user_id', 'action']).agg({
    'duration': 'mean',
    'timestamp': 'count',
})
df_grouped.to_sql('analytics_summary', engine)
```
After:
```python
# Write events directly
await env.ANALYTICS.writeDataPoint({
    "dimension": [user_id, action],
    "metric": {"duration": duration},
})

# Query with SQL
results = await env.ANALYTICS.query("""
    SELECT user_id, action,
           AVG(duration) as avg_duration,
           COUNT(*) as count
    FROM analytics
    GROUP BY user_id, action
""")
```
If you’re used to containerizing Python apps, here’s the paradigm shift: Cloudflare’s platform replaces your entire Docker stack. Let’s look at a typical `docker-compose.yml` and what disappears:
```yaml
version: '3.8'
services:
  nginx:
    image: nginx:alpine                      # ❌ Not needed - Workers serve static assets
    volumes:
      - ./static:/usr/share/nginx/html
      - ./nginx.conf:/etc/nginx/nginx.conf

  app:
    build: .
    image: myapp:latest                      # ❌ Not needed - Deploy Python directly
    command: gunicorn app:app --workers 4    # ❌ No WSGI server needed
    environment:
      - DATABASE_URL=postgresql://...

  redis:
    image: redis:alpine                      # ❌ Use Workers KV instead

  postgres:
    image: postgres:14                       # ❌ Use D1 or Hyperdrive
    volumes:
      - pgdata:/var/lib/postgresql/data

  celery:
    build: .
    command: celery -A tasks worker          # ❌ Use Queues instead

  celery-beat:
    build: .
    command: celery -A tasks beat            # ❌ Use Cron Triggers

  prometheus:
    image: prom/prometheus                   # ❌ Use Analytics Engine

  grafana:
    image: grafana/grafana                   # ❌ Built-in analytics
```
```python
# main.py - Your entire "stack" in one file
from js import Response, env
from fastapi import FastAPI
import asyncio

app = FastAPI()

# Serve static assets directly
@app.get("/static/{path:path}")
async def static(path: str):
    # Workers can serve from KV or R2
    asset = await env.ASSETS.get(path)
    return Response.new(asset.body, headers={
        "Content-Type": asset.httpMetadata.contentType,
    })

# Your app logic
@app.post("/api/process")
async def process(data: dict):
    # Cache in KV (replaces Redis)
    await env.KV.put(f"cache:{data['id']}", data)

    # Queue background work (replaces Celery)
    await env.QUEUE.send(data)

    # Store in D1 (replaces Postgres)
    await env.DB.prepare(
        "INSERT INTO items (data) VALUES (?)"
    ).bind(data).run()

    # Track metrics (replaces Prometheus)
    await env.ANALYTICS.writeDataPoint({
        "dimension": ["process"],
        "metric": {"count": 1},
    })

    return {"status": "processed"}

# Scheduled tasks (replaces Celery Beat)
# Configure in wrangler.toml:
# [triggers]
# crons = ["0 * * * *"]
```
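The handler those crons invoke is just another entry point. A minimal sketch, assuming the Python Workers runtime exposes a scheduled handler named `on_scheduled` with an `(event, env, ctx)` signature (name and signature from memory, worth confirming against the runtime docs; the table and query are illustrative):

```python
# Hypothetical scheduled entry point; handler name/signature assumed, not confirmed here
async def on_scheduled(event, env, ctx):
    # For example, record that the hourly job ran
    await env.DB.prepare(
        "INSERT INTO heartbeats (ran_at) VALUES (datetime('now'))"
    ).run()
```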
Docker Service | Purpose | Cloudflare Replacement | What Changes |
---|---|---|---|
nginx | Reverse proxy, static files, SSL | Workers built-in | No config files, automatic SSL, global CDN |
gunicorn/uwsgi | WSGI server | Workers runtime | No process management, auto-scaling |
redis | Caching, sessions, queues | Workers KV | Globally distributed, no memory limits |
postgres/mysql | Primary database | D1 or Hyperdrive | Serverless or accelerated existing DB |
celery workers | Background tasks | Queues | No broker needed, automatic retries |
celery beat | Scheduled tasks | Cron Triggers | Simple cron syntax, guaranteed execution |
nginx (static) | Serve static assets | Workers + R2/KV | Global CDN included, no nginx.conf |
prometheus | Metrics collection | Analytics Engine | No scraping, unlimited cardinality |
grafana | Visualization | Cloudflare Dashboard | Built-in analytics, API access |
elasticsearch | Full-text search | R2 + Workers AI | Use embeddings for semantic search |
rabbitmq | Message broker | Queues | Direct producer-consumer, no broker |
memcached | In-memory cache | Workers KV | Persistent, globally distributed |
haproxy | Load balancer | Cloudflare LB | Automatic, no configuration |
Before (Multi-stage Dockerfile):
```dockerfile
# Dockerfile
FROM python:3.11-slim as builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

FROM python:3.11-slim
WORKDIR /app
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY . .
RUN apt-get update && apt-get install -y curl  # Health checks
EXPOSE 8000
CMD ["gunicorn", "app:app", "--bind", "0.0.0.0:8000", "--workers", "4"]
```
After (No Dockerfile):
```python
from js import Response

async def on_fetch(request):
    return Response.new("Hello from Python!")

# Deploy with: wrangler deploy main.py
# That's it. No containers. No images. No registry.
```
Docker approach:
```yaml
environment:
  - DATABASE_URL=${DATABASE_URL}
  - REDIS_URL=${REDIS_URL}
  - SECRET_KEY=${SECRET_KEY}
  - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
```
Cloudflare approach:
```bash
# Set secrets (encrypted at rest)
wrangler secret put DATABASE_URL
wrangler secret put API_KEY
```

```python
# Access in code
async def on_fetch(request):
    db_url = env.DATABASE_URL  # Securely injected
```
Docker Workflow:
```bash
# Local development
docker-compose up -d
docker-compose logs -f app
docker-compose exec app pytest
docker-compose down

# Build and push
docker build -t myapp:latest .
docker push registry.com/myapp:latest

# Deploy (still need k8s, ECS, etc.)
kubectl apply -f k8s/
```
Cloudflare Workflow:
```bash
# Install wrangler (once)
bun install -g wrangler

# Local development
wrangler dev main.py   # Hot reload included

# Run tests
python -m pytest       # Just regular Python

# Deploy to production
wrangler deploy        # 30 seconds to global deployment
```
Docker volumes:
```yaml
volumes:
  - ./static:/app/static              # Static files
  - ./uploads:/app/uploads            # User uploads
  - pgdata:/var/lib/postgresql/data   # Database
  - ./logs:/app/logs                  # Log files
```
Cloudflare bindings:
```toml
[[kv_namespaces]]
binding = "STATIC"    # Replaces static file volume

[[r2_buckets]]
binding = "UPLOADS"   # Replaces uploads volume

[[d1_databases]]
binding = "DB"        # Replaces database volume

# Logs automatically available via wrangler tail
```
From Containers to Functions: Stop thinking about long-running processes. Your code runs on-demand.
From Orchestration to Platform: No need for Kubernetes, Docker Swarm, or ECS. The platform handles it.
From Regional to Global: Your app doesn’t run in us-east-1. It runs everywhere, automatically.
From Configuration to Convention: No more nginx.conf, gunicorn.conf, redis.conf. Sensible defaults that just work.
From Monitoring to Observability: Logs, metrics, and traces are built-in, not bolted on.
Traditional Docker Stack (5 files, 3 containers):
```
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
├── app.py
└── nginx.conf
```
Cloudflare Stack (1 file):
```python
from js import Response, env
import hashlib

async def on_fetch(request):
    url = request.url
    path = url.pathname

    if request.method == "POST":
        # Create short URL
        data = await request.json()
        long_url = data["url"]
        short_code = hashlib.md5(long_url.encode()).hexdigest()[:6]

        # Store in KV (replaces Redis/Postgres)
        await env.URLS.put(short_code, long_url)

        return Response.json({
            "short_url": f"https://{url.hostname}/{short_code}"
        })

    elif path != "/":
        # Redirect
        short_code = path[1:]  # Remove leading /
        long_url = await env.URLS.get(short_code)

        if long_url:
            return Response.redirect(long_url, 301)
        else:
            return Response.new("Not found", status=404)

    # Serve homepage (no nginx needed)
    return Response.new("""
        <form method="post">
            <input name="url" placeholder="Enter URL">
            <button>Shorten</button>
        </form>
    """, headers={"Content-Type": "text/html"})
```
Deploy with `wrangler deploy`. No Docker, no nginx, no Redis. Just Python.
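The only configuration behind that deploy is the entry point and the KV binding. A minimal `wrangler.toml` sketch, assuming the Python Workers compatibility flag and an existing KV namespace (all names and the id are illustrative):

```toml
# Minimal config sketch; values are illustrative
name = "url-shortener"
main = "main.py"
compatibility_date = "2024-04-01"
compatibility_flags = ["python_workers"]   # assumed flag for Python Workers

[[kv_namespaces]]
binding = "URLS"                           # matches env.URLS in the code above
id = "your-kv-namespace-id"
```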
The inability to SSH into containers or tail stdout directly is a fundamental shift. Here’s how to debug effectively in the Workers/Durable Objects world:
```bash
# Stream logs from production in real-time
wrangler tail

# Filter logs by status, method, or IP
wrangler tail --status 500
wrangler tail --method POST
wrangler tail --ip 192.168.1.1

# Pretty print with formatting
wrangler tail --format pretty

# Save logs to file for analysis
wrangler tail > debug.log
```
```python
# Unlike containers, console.log goes to wrangler tail
from js import Response, console
import traceback

async def on_fetch(request):
    console.log("Request received:", request.url)
    console.log("Headers:", dict(request.headers))

    try:
        result = await risky_operation()
        console.log("Success:", result)
    except Exception as e:
        console.error("Failed:", str(e))
        console.error("Stack trace:", traceback.format_exc())

    return Response.new("Done")
```
When your Worker won’t start (syntax errors, import issues, binding problems):
```python
# debug_wrapper.py - Wrap your entire module
import sys
import traceback

try:
    # Your imports and code here
    from js import Response, env
    import fastapi

    async def on_fetch(request):
        return Response.new("Worker started successfully!")

except Exception as e:
    # This WILL show in deployment logs
    print(f"STARTUP ERROR: {e}", file=sys.stderr)
    print(f"TRACEBACK: {traceback.format_exc()}", file=sys.stderr)

    # Capture the failure now; `e` is cleared once the except block exits
    startup_error = str(e)
    startup_traceback = traceback.format_exc()

    # Create a minimal handler that reports the error
    async def on_fetch(request):
        return Response.new(
            f"Worker failed to start: {startup_error}\n\n{startup_traceback}",
            status=500,
            headers={"Content-Type": "text/plain"},
        )
```
```python
# Start with the absolute minimum
async def on_fetch(request):
    return Response.new("Stage 1: Basic handler works")

# Then add imports one by one
from js import Response

async def on_fetch(request):
    return Response.new("Stage 2: Imports work")

# Then add bindings
from js import Response, env

async def on_fetch(request):
    try:
        # Test each binding
        kv_test = await env.KV.get("test")
        return Response.new(f"Stage 3: KV binding works: {kv_test}")
    except Exception as e:
        return Response.new(f"KV binding failed: {e}", status=500)

# Finally add your logic
```
```bash
# Run locally with all debugging tools available
wrangler dev main.py --local

# Now you can:
# - Use Python debugger (pdb)
# - Set breakpoints
# - Inspect variables
# - See full stack traces
```
```python
# Use pdb in local development
import pdb

async def on_fetch(request):
    pdb.set_trace()  # Works in local dev!
    data = await request.json()
    return Response.json({"received": data})
```
```python
from js import Response, env, console
import traceback
import json

async def on_fetch(request):
    try:
        # Your actual logic
        return await handle_request(request)

    except Exception as e:
        # In development, return detailed errors
        if env.ENVIRONMENT == "development":
            return Response.new(
                json.dumps({
                    "error": str(e),
                    "type": type(e).__name__,
                    "traceback": traceback.format_exc(),
                    "request": {
                        "url": str(request.url),
                        "method": request.method,
                        "headers": dict(request.headers),
                    },
                }, indent=2),
                status=500,
                headers={"Content-Type": "application/json"},
            )
        else:
            # In production, log details but return generic error
            console.error(f"Error: {e}", traceback.format_exc())
            return Response.new("Internal Server Error", status=500)
```
Durable Objects are even trickier since they’re stateful:
```python
class Counter:
    def __init__(self, state, env):
        self.state = state
        self.env = env
        # Debug: Log initialization
        console.log(f"DO initialized with id: {state.id}")

    async def fetch(self, request):
        # Debug: Log every request
        console.log(f"DO fetch: {request.method} {request.url}")

        # Add debug endpoint
        if request.url.pathname == "/debug":
            # Return internal state for debugging
            storage_keys = await self.state.storage.list()
            return Response.json({
                "id": self.state.id.toString(),
                "storage_keys": list(storage_keys.keys()),
                "env_bindings": list(self.env.keys()),
            })

        # Regular logic with error handling
        try:
            # Your logic here
            pass
        except Exception as e:
            console.error(f"DO error: {e}")
            return Response.new(f"DO Error: {e}", status=500)
```
```python
# Capture and replay problematic requests
async def on_fetch(request):
    # Clone request for logging (body can only be read once)
    request_data = {
        "url": str(request.url),
        "method": request.method,
        "headers": dict(request.headers),
        "body": await request.text() if request.body else None,
    }

    try:
        # Your logic
        response = await handle_request(request_data)
        return response
    except Exception as e:
        # Log the full request for replay
        console.error("Failed request:", json.dumps(request_data))
        console.error("Error:", str(e))

        # Store in KV for later debugging
        await env.DEBUG_REQUESTS.put(
            f"error_{datetime.now().isoformat()}",
            json.dumps({
                "request": request_data,
                "error": str(e),
                "traceback": traceback.format_exc(),
            }),
        )

        return Response.new("Error logged", status=500)
```
```python
async def on_fetch(request):
    url = request.url

    # Add health/debug endpoints
    if url.pathname == "/health":
        # Test all bindings and dependencies
        checks = {}

        try:
            await env.KV.get("health_check")
            checks["kv"] = "ok"
        except:
            checks["kv"] = "failed"

        try:
            await env.DB.prepare("SELECT 1").first()
            checks["d1"] = "ok"
        except:
            checks["d1"] = "failed"

        try:
            await env.QUEUE.send({"test": True})
            checks["queue"] = "ok"
        except:
            checks["queue"] = "failed"

        status = 200 if all(v == "ok" for v in checks.values()) else 503

        return Response.json({
            "status": "healthy" if status == 200 else "unhealthy",
            "checks": checks,
            "timestamp": datetime.now().isoformat(),
        }, status=status)

    # Regular routing
    return await handle_request(request)
```
```bash
# Configure Logpush to send logs to R2 for analysis
wrangler logpush create \
  --dataset workers_trace_events \
  --destination r2://logs-bucket/workers/{DATE}

# Then analyze with Python!
```
```python
import json
import boto3

s3 = boto3.client('s3', endpoint_url='https://YOUR_ACCOUNT.r2.cloudflarestorage.com')

# Download logs
s3.download_file('logs-bucket', 'workers/2024-01-20/batch_1.json', 'logs.json')

# Analyze errors
with open('logs.json') as f:
    for line in f:
        log = json.loads(line)
        if log.get('outcome') == 'exception':
            print(f"Error at {log['timestamp']}: {log['exceptions']}")
            print(f"Request: {log['request']['url']}")
```
When in doubt, reproduce the issue locally with `wrangler dev --local`.
The lack of SSH is actually a feature - it forces you to build observable, debuggable systems from the start. No more “SSH in and check what’s wrong” - your code must tell you what’s wrong.
While Python Workers are revolutionary for edge computing, the Cloudflare Python SDK (`pip install cloudflare`) gives you programmatic control over Cloudflare’s entire platform from your existing Python applications. Think of it as boto3 for Cloudflare—but cleaner, type-safe, and covering everything from DNS to AI.
```bash
pip install cloudflare
# or with your favorite package manager
uv pip install cloudflare
poetry add cloudflare
```
```python
import os
from cloudflare import Cloudflare

# Async support built-in
from cloudflare import AsyncCloudflare

client = Cloudflare(
    api_token=os.environ.get("CLOUDFLARE_API_TOKEN"),  # Recommended
    # or use api_email + api_key (legacy)
)
```
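A quick smoke test once the client is configured; this just lists the zones the token can read:

```python
# List zones visible to this API token
for zone in client.zones.list():
    print(zone.id, zone.name)
```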
Every API call is fully typed with TypedDicts for requests and Pydantic models for responses:
```python
# Your IDE knows exactly what fields are available
zone = client.zones.create(
    account={"id": "your-account-id"},
    name="example.com",
    type="full",  # IDE autocompletes: "full" | "partial" | "secondary"
)

# Response objects are Pydantic models
print(zone.id)                 # Fully typed!
print(zone.to_json(indent=2))  # Easy serialization
zone_dict = zone.to_dict()     # Convert to dict
```
```python
from cloudflare import Cloudflare
import boto3
from datetime import timedelta

client = Cloudflare()

# Create an R2 bucket
bucket = client.r2.buckets.create(
    account_id="your-account-id",
    name="ml-models",
    storage_class="Standard",
)

# Get temporary S3-compatible credentials
creds = client.r2.temporary_credentials.create(
    account_id="your-account-id",
    bucket=bucket.name,
    permission="read_write",
    ttl=timedelta(hours=1),
)

# Use with boto3 - zero code changes!
s3 = boto3.client(
    "s3",
    endpoint_url=f"https://{creds.account_id}.r2.cloudflarestorage.com",
    aws_access_key_id=creds.access_key_id,
    aws_secret_access_key=creds.secret_access_key,
    aws_session_token=creds.session_token,
)

# Upload your ML model
s3.upload_file("model.pkl", bucket.name, "models/latest.pkl")

# List objects - no egress fees!
for obj in s3.list_objects_v2(Bucket=bucket.name)["Contents"]:
    print(f"{obj['Key']}: {obj['Size']} bytes")
```
```python
from cloudflare import Cloudflare

client = Cloudflare()

# Translate text using Meta's M2M model
result = client.ai.run(
    "@cf/meta/m2m100-1.2b",
    account_id="your-account-id",
    text="Hello, how are you?",
    source_lang="english",
    target_lang="spanish",
)

print(result)  # "Hola, ¿cómo estás?"

# Batch translation for efficiency
texts = ["Hello", "Goodbye", "Thank you"]
translations = []

for text in texts:
    result = client.ai.run(
        "@cf/meta/m2m100-1.2b",
        account_id="your-account-id",
        text=text,
        source_lang="english",
        target_lang="japanese",
    )
    translations.append(result)
```
```python
from cloudflare import Cloudflare
from typing import List, Dict

client = Cloudflare()

def sync_dns_records(zone_id: str, desired_records: List[Dict]):
    """Declarative DNS management - define desired state"""

    # Get current records
    current_records = list(client.dns.records.list(zone_id=zone_id))

    # Create a map for easy lookup
    current_map = {(r.type, r.name): r for r in current_records}

    for desired in desired_records:
        key = (desired["type"], desired["name"])

        if key in current_map:
            # Update existing record
            record = current_map[key]
            client.dns.records.update(
                zone_id=zone_id,
                dns_record_id=record.id,
                **desired
            )
            print(f"Updated: {desired['name']} ({desired['type']})")
        else:
            # Create new record
            client.dns.records.create(
                zone_id=zone_id,
                **desired
            )
            print(f"Created: {desired['name']} ({desired['type']})")

# Define your DNS configuration
my_dns_config = [
    {
        "type": "A",
        "name": "api.example.com",
        "content": "192.0.2.1",
        "proxied": True,  # Enable Cloudflare proxy
    },
    {
        "type": "CNAME",
        "name": "www.example.com",
        "content": "example.com",
        "proxied": True,
    },
    {
        "type": "MX",
        "name": "example.com",
        "content": "mail.example.com",
        "priority": 10,
        "proxied": False,  # MX records can't be proxied
    },
]

sync_dns_records("your-zone-id", my_dns_config)
```
```python
from cloudflare import Cloudflare
import json
import time

client = Cloudflare()

# Create a KV namespace
namespace = client.kv.namespaces.create(
    account_id="your-account-id",
    title="user-sessions",
)

class CloudflareKVStore:
    """Redis-like interface for Workers KV"""

    def __init__(self, client: Cloudflare, account_id: str, namespace_id: str):
        self.client = client
        self.account_id = account_id
        self.namespace_id = namespace_id

    def set(self, key: str, value: any, ttl: int = None):
        """Set a value with optional TTL"""
        metadata = {"timestamp": time.time()}

        self.client.kv.namespaces.values.update(
            account_id=self.account_id,
            namespace_id=self.namespace_id,
            key_name=key,
            value=json.dumps(value),
            metadata=json.dumps(metadata),
            expiration_ttl=ttl,
        )

    def get(self, key: str):
        """Get a value"""
        try:
            result = self.client.kv.namespaces.values.get(
                account_id=self.account_id,
                namespace_id=self.namespace_id,
                key_name=key,
            )
            return json.loads(result) if result else None
        except:
            return None

    def delete(self, key: str):
        """Delete a key"""
        self.client.kv.namespaces.values.delete(
            account_id=self.account_id,
            namespace_id=self.namespace_id,
            key_name=key,
        )

    def list_keys(self, prefix: str = None):
        """List all keys with optional prefix"""
        return list(self.client.kv.namespaces.keys.list(
            account_id=self.account_id,
            namespace_id=self.namespace_id,
            prefix=prefix,
        ))

# Use it like Redis/Memcached
kv = CloudflareKVStore(client, "your-account-id", namespace.id)

# Cache user session
kv.set("session:user123", {
    "user_id": "user123",
    "email": "user@example.com",
    "login_time": time.time(),
}, ttl=3600)  # 1 hour TTL

# Retrieve session
session = kv.get("session:user123")
print(session)

# List all sessions
for key in kv.list_keys(prefix="session:"):
    print(key.name)
```
```python
from cloudflare import Cloudflare
import numpy as np
from typing import List, Dict

client = Cloudflare()

# Create a vector index
index = client.vectorize.indexes.create(
    account_id="your-account-id",
    name="product-search",
    dimensions=384,  # Using all-MiniLM-L6-v2 embeddings
    metric="cosine",
)

class SemanticSearch:
    """Semantic search engine using Vectorize"""

    def __init__(self, client: Cloudflare, account_id: str, index_name: str):
        self.client = client
        self.account_id = account_id
        self.index_name = index_name

    async def index_documents(self, documents: List[Dict[str, any]]):
        """Index documents with their embeddings"""
        vectors = []

        for doc in documents:
            # Generate embedding using Workers AI
            embedding_response = await self.client.ai.run(
                "@cf/baai/bge-base-en-v1.5",
                account_id=self.account_id,
                text=doc["content"],
            )

            vectors.append({
                "id": doc["id"],
                "values": embedding_response.data[0],
                "metadata": {
                    "title": doc["title"],
                    "content": doc["content"],
                    "category": doc.get("category", "general"),
                },
            })

        # Batch insert
        self.client.vectorize.indexes.insert(
            account_id=self.account_id,
            index_name=self.index_name,
            vectors=vectors,
        )

    async def search(self, query: str, top_k: int = 5):
        """Search for similar documents"""
        # Generate query embedding
        embedding_response = await self.client.ai.run(
            "@cf/baai/bge-base-en-v1.5",
            account_id=self.account_id,
            text=query,
        )

        # Search
        results = self.client.vectorize.indexes.query(
            account_id=self.account_id,
            index_name=self.index_name,
            vector=embedding_response.data[0],
            top_k=top_k,
            return_metadata=True,
        )

        return results.matches

# Usage
search_engine = SemanticSearch(client, "your-account-id", "product-search")

# Index your products
await search_engine.index_documents([
    {
        "id": "1",
        "title": "Python Cookbook",
        "content": "Advanced Python programming techniques and recipes",
        "category": "books",
    },
    {
        "id": "2",
        "title": "Machine Learning with Python",
        "content": "Comprehensive guide to ML algorithms using Python",
        "category": "books",
    },
])

# Search semantically
results = await search_engine.search("Python programming books")
for result in results:
    print(f"Score: {result.score:.3f} - {result.metadata['title']}")
```
```python
from cloudflare import Cloudflare
import pandas as pd
from typing import List, Dict, Any

client = Cloudflare()

# Create a D1 database
database = client.d1.database.create(
    account_id="your-account-id",
    name="analytics",
)

class D1Analytics:
    """Analytics database using D1"""

    def __init__(self, client: Cloudflare, account_id: str, database_id: str):
        self.client = client
        self.account_id = account_id
        self.database_id = database_id
        self._init_schema()

    def _init_schema(self):
        """Initialize database schema"""
        self.execute("""
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id TEXT NOT NULL,
                event_type TEXT NOT NULL,
                properties TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)

        self.execute("""
            CREATE INDEX IF NOT EXISTS idx_user_events
            ON events(user_id, timestamp)
        """)

    def execute(self, query: str, params: List[Any] = None):
        """Execute a query"""
        return self.client.d1.database.query(
            account_id=self.account_id,
            database_id=self.database_id,
            sql=query,
            params=params or [],
        )

    def track_event(self, user_id: str, event_type: str, properties: Dict = None):
        """Track an analytics event"""
        import json

        self.execute(
            "INSERT INTO events (user_id, event_type, properties) VALUES (?, ?, ?)",
            [user_id, event_type, json.dumps(properties or {})]
        )

    def get_user_events(self, user_id: str, limit: int = 100):
        """Get events for a user"""
        result = self.execute(
            """
            SELECT * FROM events
            WHERE user_id = ?
            ORDER BY timestamp DESC
            LIMIT ?
            """,
            [user_id, limit]
        )
        return result.results

    def get_event_counts(self, start_date: str, end_date: str):
        """Get event counts by type"""
        result = self.execute(
            """
            SELECT
                event_type,
                COUNT(*) as count,
                COUNT(DISTINCT user_id) as unique_users
            FROM events
            WHERE timestamp BETWEEN ? AND ?
            GROUP BY event_type
            ORDER BY count DESC
            """,
            [start_date, end_date]
        )
        return result.results

    def to_dataframe(self, query_result) -> pd.DataFrame:
        """Convert query results to pandas DataFrame"""
        if not query_result.results:
            return pd.DataFrame()

        return pd.DataFrame(query_result.results)

# Usage
analytics = D1Analytics(client, "your-account-id", database.id)

# Track events
analytics.track_event("user123", "page_view", {"page": "/products"})
analytics.track_event("user123", "add_to_cart", {"product_id": "SKU-001"})

# Analyze with pandas
events_df = analytics.to_dataframe(
    analytics.get_event_counts("2024-01-01", "2024-12-31")
)
print(events_df.head())
```
```python
from cloudflare import Cloudflare
import json
import time
from typing import Callable, Dict, Any, List
from datetime import datetime

client = Cloudflare()

# Create a queue
queue = client.queues.create(
    account_id="your-account-id",
    queue_name="background-tasks",
)

class TaskQueue:
    """Celery-like task queue using Cloudflare Queues"""

    def __init__(self, client: Cloudflare, account_id: str, queue_id: str):
        self.client = client
        self.account_id = account_id
        self.queue_id = queue_id
        self.handlers: Dict[str, Callable] = {}

    def task(self, name: str = None):
        """Decorator to register a task"""
        def decorator(func: Callable):
            task_name = name or func.__name__
            self.handlers[task_name] = func

            # Return a wrapper that enqueues the task
            def delay(*args, **kwargs):
                self.send_task(task_name, args, kwargs)

            func.delay = delay
            return func

        return decorator

    def send_task(self, task_name: str, args: tuple = (), kwargs: dict = None):
        """Send a task to the queue"""
        message = {
            "task": task_name,
            "args": args,
            "kwargs": kwargs or {},
            "timestamp": datetime.utcnow().isoformat(),
        }

        self.client.queues.messages.ack(
            account_id=self.account_id,
            queue_id=self.queue_id,
            body=message,
        )

    def process_messages(self, batch_size: int = 10):
        """Process messages from the queue"""
        # Pull messages
        messages = self.client.queues.messages.ack(
            account_id=self.account_id,
            queue_id=self.queue_id,
            batch_size=batch_size,
        )

        for msg in messages.messages:
            try:
                data = msg.body
                task_name = data["task"]

                if task_name in self.handlers:
                    # Execute the task
                    result = self.handlers[task_name](
                        *data["args"], **data["kwargs"]
                    )
                    print(f"Task {task_name} completed: {result}")
                else:
                    print(f"Unknown task: {task_name}")

                # Acknowledge the message
                msg.ack()
            except Exception as e:
                print(f"Task failed: {e}")
                # Message will be retried

# Usage
task_queue = TaskQueue(client, "your-account-id", queue.id)

@task_queue.task()
def send_email(to: str, subject: str, body: str):
    """Send an email (example task)"""
    print(f"Sending email to {to}: {subject}")
    # Your email sending logic here
    return f"Email sent to {to}"

@task_queue.task()
def process_image(image_url: str, operations: List[str]):
    """Process an image (example task)"""
    print(f"Processing image: {image_url}")
    # Your image processing logic here
    return f"Processed with operations: {operations}"

# Enqueue tasks
send_email.delay("user@example.com", "Welcome!", "Thanks for signing up")
process_image.delay("https://example.com/image.jpg", ["resize", "watermark"])

# In your worker script
while True:
    task_queue.process_messages(batch_size=10)
    time.sleep(1)
```
```python
from cloudflare import Cloudflare
from typing import Dict, List
import yaml

client = Cloudflare()

class APIProtection:
    """API protection using Cloudflare API Shield"""

    def __init__(self, client: Cloudflare, zone_id: str):
        self.client = client
        self.zone_id = zone_id

    def upload_openapi_schema(self, schema_path: str):
        """Upload OpenAPI schema for validation"""
        with open(schema_path, 'r') as f:
            schema = yaml.safe_load(f)

        result = self.client.api_gateway.schemas.create(
            zone_id=self.zone_id,
            file=schema_path,
            kind="openapi_v3",
        )

        return result.schema_id

    def create_rate_limit(self, endpoint: str, requests_per_minute: int):
        """Create rate limiting rule"""
        rule = self.client.rate_limits.create(
            zone_id=self.zone_id,
            action="block",
            match={
                "request": {
                    "url_pattern": f"*{endpoint}*",
                    "methods": ["GET", "POST", "PUT", "DELETE"],
                }
            },
            threshold=requests_per_minute,
            period=60,  # 1 minute
            description=f"Rate limit for {endpoint}",
        )

        return rule.id

    def enable_bot_protection(self, sensitivity: str = "high"):
        """Enable bot protection"""
        self.client.bot_management.update(
            zone_id=self.zone_id,
            enable_js_detection=True,
            fight_mode=True,
            sensitivity=sensitivity,
        )

    def create_waf_rules(self, rules: List[Dict]):
        """Create WAF rules for common attacks"""
        for rule in rules:
            self.client.firewall.rules.create(
                zone_id=self.zone_id,
                filter={
                    "expression": rule["expression"],
                },
                action=rule.get("action", "block"),
                description=rule.get("description", ""),
            )

    def get_security_analytics(self, start_time: str, end_time: str):
        """Get security analytics"""
        analytics = self.client.zones.analytics.colos.get(
            zone_id=self.zone_id,
            since=start_time,
            until=end_time,
        )

        return {
            "total_requests": analytics.totals.requests,
            "threats_blocked": analytics.totals.threats,
            "bot_requests": analytics.totals.pageviews.bot,
            "human_requests": analytics.totals.pageviews.human,
        }

# Usage
api_shield = APIProtection(client, "your-zone-id")

# Upload your API schema for automatic validation
schema_id = api_shield.upload_openapi_schema("openapi.yaml")

# Set up rate limiting
api_shield.create_rate_limit("/api/v1/users", requests_per_minute=100)
api_shield.create_rate_limit("/api/v1/auth/login", requests_per_minute=10)

# Enable bot protection
api_shield.enable_bot_protection(sensitivity="high")

# Create WAF rules
waf_rules = [
    {
        "expression": '(http.request.uri.path contains "../") or (http.request.uri.path contains "..\\")',
        "action": "block",
        "description": "Block path traversal attempts",
    },
    {
        "expression": '(http.request.uri.query contains "<script") or (http.request.body.raw contains "<script")',
        "action": "challenge",
        "description": "Challenge potential XSS",
    },
]
api_shield.create_waf_rules(waf_rules)

# Monitor security
stats = api_shield.get_security_analytics("2024-01-01T00:00:00Z", "2024-01-31T23:59:59Z")
print(f"Threats blocked this month: {stats['threats_blocked']}")
```
```python
from cloudflare import Cloudflare
from typing import List, Dict

client = Cloudflare()

class LoadBalancer:
    """Global load balancing with health checks"""

    def __init__(self, client: Cloudflare, account_id: str):
        self.client = client
        self.account_id = account_id

    def create_health_check_monitor(self, name: str, endpoint: str):
        """Create a health check monitor"""
        monitor = self.client.load_balancers.monitors.create(
            account_id=self.account_id,
            type="https",
            description=name,
            method="GET",
            path=endpoint,
            interval=60,  # Check every 60 seconds
            timeout=10,
            retries=2,
            expected_codes="200",
            follow_redirects=True,
            probe_zone="all",  # Check from all regions
        )

        return monitor.id

    def create_origin_pool(self, name: str, origins: List[Dict[str, str]], monitor_id: str):
        """Create an origin pool"""
        pool_origins = [
            {
                "name": origin["name"],
                "address": origin["address"],
                "enabled": True,
                "weight": origin.get("weight", 1),
            }
            for origin in origins
        ]

        pool = self.client.load_balancers.pools.create(
            account_id=self.account_id,
            name=name,
            origins=pool_origins,
            monitor=monitor_id,
            notification_email="ops@example.com",
            minimum_origins=1,
        )

        return pool.id

    def create_load_balancer(self, hostname: str, pool_ids: List[str], steering_policy: str = "dynamic_steering"):
        """Create a load balancer"""
        lb = self.client.zones.load_balancers.create(
            zone_id="your-zone-id",
            name=hostname,
            fallback_pool=pool_ids[0],
            default_pools=pool_ids,
            proxied=True,
            steering_policy=steering_policy,  # dynamic_steering, geo, random, etc.
            session_affinity="cookie",
            session_affinity_ttl=1800,  # 30 minutes
        )

        return lb.id

    def get_pool_health(self, pool_id: str):
        """Get health status of a pool"""
        health = self.client.load_balancers.pools.health.get(
            account_id=self.account_id,
            pool_id=pool_id,
        )

        return {
            "healthy_origins": len([o for o in health.origins if o.healthy]),
            "total_origins": len(health.origins),
            "pool_healthy": health.healthy,
            "origins": [
                {
                    "name": o.name,
                    "address": o.address,
                    "healthy": o.healthy,
                    "failure_reason": o.failure_reason,
                }
                for o in health.origins
            ],
        }

# Usage
lb_manager = LoadBalancer(client, "your-account-id")

# Create health check
monitor_id = lb_manager.create_health_check_monitor(
    "API Health Check",
    "/health"
)

# Create origin pools for different regions
us_pool_id = lb_manager.create_origin_pool(
    "US API Servers",
    [
        {"name": "us-east-1", "address": "api-us-east.example.com", "weight": 1},
        {"name": "us-west-1", "address": "api-us-west.example.com", "weight": 1},
    ],
    monitor_id
)

eu_pool_id = lb_manager.create_origin_pool(
    "EU API Servers",
    [
        {"name": "eu-west-1", "address": "api-eu-west.example.com", "weight": 1},
        {"name": "eu-central-1", "address": "api-eu-central.example.com", "weight": 1},
    ],
    monitor_id
)

# Create global load balancer
lb_id = lb_manager.create_load_balancer(
    "api.example.com",
    [us_pool_id, eu_pool_id],
    steering_policy="geo"  # Route by geography
)

# Monitor health
for pool_id, pool_name in [(us_pool_id, "US"), (eu_pool_id, "EU")]:
    health = lb_manager.get_pool_health(pool_id)
    print(f"{pool_name} Pool: {health['healthy_origins']}/{health['total_origins']} healthy")
```
```python
from cloudflare import Cloudflare
import os
import hashlib
from pathlib import Path
from typing import Dict, List

class CloudflareDeployment:
    """CI/CD deployment automation"""

    def __init__(self, api_token: str):
        self.client = Cloudflare(api_token=api_token)

    def deploy_worker(self, script_name: str, script_path: str, env_vars: Dict[str, str]):
        """Deploy a Worker script"""
        with open(script_path, 'r') as f:
            script_content = f.read()

        # Calculate hash for versioning
        script_hash = hashlib.sha256(script_content.encode()).hexdigest()[:8]

        # Upload with environment variables as secrets
        bindings = [
            {
                "type": "secret_text",
                "name": key,
                "text": value,
            }
            for key, value in env_vars.items()
        ]

        result = self.client.workers.scripts.update(
            script_name,
            account_id=os.environ["CLOUDFLARE_ACCOUNT_ID"],
            metadata={
                "main_module": "index.js",
                "bindings": bindings,
                "compatibility_date": "2024-01-01",
                "tags": [f"version:{script_hash}", "env:production"],
            },
            files={
                "index.js": (
                    "index.js",
                    script_content.encode(),
                    "application/javascript",
                )
            },
        )

        print(f"Deployed {script_name} version {script_hash}")
        return result

    def deploy_pages_project(self, project_name: str, build_directory: str):
        """Deploy a Pages project"""
        # Create deployment
        deployment = self.client.pages.projects.deployments.create(
            account_id=os.environ["CLOUDFLARE_ACCOUNT_ID"],
            project_name=project_name,
        )

        # Upload files
        for file_path in Path(build_directory).rglob("*"):
            if file_path.is_file():
                relative_path = file_path.relative_to(build_directory)

                with open(file_path, 'rb') as f:
                    self.client.pages.projects.deployments.retry(
                        account_id=os.environ["CLOUDFLARE_ACCOUNT_ID"],
                        project_name=project_name,
                        deployment_id=deployment.id,
                        path=str(relative_path),
                        body=f.read(),
                    )

        print(f"Deployed Pages project: {deployment.url}")
        return deployment

    def purge_cache(self, zone_id: str, urls: List[str] = None):
        """Purge cache after deployment"""
        if urls:
            # Purge specific URLs
            self.client.cache.purge(
                zone_id=zone_id,
                files=urls,
            )
            print(f"Purged cache for {len(urls)} URLs")
        else:
            # Purge everything
            self.client.cache.purge(
                zone_id=zone_id,
                purge_everything=True,
            )
            print("Purged entire cache")

    def rollback_worker(self, script_name: str, version_tag: str):
        """Rollback to a previous Worker version"""
        # List all versions
        versions = self.client.workers.scripts.versions.list(
            account_id=os.environ["CLOUDFLARE_ACCOUNT_ID"],
            script_name=script_name,
        )

        # Find the version to rollback to
        for version in versions:
            if version_tag in version.tags:
                # Deploy this version
                self.client.workers.scripts.versions.deployment.create(
                    account_id=os.environ["CLOUDFLARE_ACCOUNT_ID"],
                    script_name=script_name,
                    version_id=version.id,
                )
                print(f"Rolled back to version {version_tag}")
                return

        raise ValueError(f"Version {version_tag} not found")

# GitHub Actions example
if __name__ == "__main__":
    deployer = CloudflareDeployment(os.environ["CLOUDFLARE_API_TOKEN"])

    # Deploy Worker
    deployer.deploy_worker(
        "api-worker",
        "dist/worker.js",
        {
            "DATABASE_URL": os.environ["DATABASE_URL"],
            "API_KEY": os.environ["API_KEY"],
        }
    )

    # Deploy static site
    deployer.deploy_pages_project("my-site", "build/")

    # Clear cache
    deployer.purge_cache(os.environ["CLOUDFLARE_ZONE_ID"])
```
```python
import cloudflare
from cloudflare import Cloudflare
import time

client = Cloudflare()

try:
    zone = client.zones.get(zone_id="invalid-id")
except cloudflare.NotFoundError:
    print("Zone not found")
except cloudflare.RateLimitError as e:
    print(f"Rate limited. Retry after {e.response.headers.get('Retry-After')} seconds")
    time.sleep(int(e.response.headers.get('Retry-After', 60)))
except cloudflare.APIStatusError as e:
    print(f"API error {e.status_code}: {e.response}")
except cloudflare.APIConnectionError:
    print("Network connection failed")
```
```python
# Automatic pagination
all_zones = []
for zone in client.zones.list():
    all_zones.append(zone)
    print(f"Processing zone: {zone.name}")

# Manual pagination for more control
page = client.zones.list(per_page=50)
while True:
    for zone in page.result:
        print(zone.name)

    if not page.has_next_page():
        break

    page = page.get_next_page()
```
```python
import asyncio
from cloudflare import AsyncCloudflare

async def process_zones():
    async with AsyncCloudflare() as client:
        # Concurrent API calls
        tasks = []
        async for zone in client.zones.list():
            task = process_zone(client, zone)
            tasks.append(task)

        # Process all zones concurrently
        results = await asyncio.gather(*tasks)
        return results

async def process_zone(client: AsyncCloudflare, zone):
    # Get DNS records concurrently
    records = await client.dns.records.list(zone_id=zone.id)
    return {"zone": zone.name, "record_count": len(list(records))}

# Run
results = asyncio.run(process_zones())
```
```python
import os
from cloudflare import Cloudflare

class CloudflareManager:
    def __init__(self):
        # Different tokens for different environments
        env = os.environ.get("ENVIRONMENT", "development")

        if env == "production":
            self.client = Cloudflare(
                api_token=os.environ["CLOUDFLARE_PROD_TOKEN"],
                max_retries=5,
                timeout=30.0,
            )
        else:
            self.client = Cloudflare(
                api_token=os.environ["CLOUDFLARE_DEV_TOKEN"],
                max_retries=2,
                timeout=10.0,
            )

    def __enter__(self):
        return self.client

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.client.close()

# Usage
with CloudflareManager() as cf:
    zones = list(cf.zones.list())
```
1. `bun install -g wrangler` (or `npm install -g wrangler`)
2. `wrangler init my-python-app --type python`
3. `wrangler deploy`
That’s it. You’re running Python at the edge.
Cloudflare has quietly built the platform we’ve been asking for: Python-first, globally distributed, with built-in AI/ML capabilities, and zero operational overhead. It’s not just about running Python in Workers—it’s about having an entire ecosystem designed for modern Python applications.
Whether you’re building AI agents, processing data pipelines, or serving APIs, Cloudflare’s edge platform offers a compelling alternative to traditional cloud providers. The best part? You can start for free and scale to millions of requests without changing your architecture.
Welcome to edge computing, Python style. No JavaScript required.
Ready to dive in? Check out the Python Workers documentation and join the Cloudflare Developers Discord to connect with other Python developers building on the edge.
- The `wrangler dev` debugging experience lacks parity with traditional Python development
- `pdb` debugger support in the Workers runtime
- A `cloudflare-sqlalchemy` package with a D1 dialect:

```python
# Dream API
from cloudflare_sqlalchemy import create_d1_engine
engine = create_d1_engine("d1://account_id/database_id")
```

- A `langchain-cloudflare` package:

```python
# Dream API
from langchain_cloudflare import CloudflareVectorStore
vectorstore = CloudflareVectorStore(
    index_name="my-index",
    embedding_service="@cf/baai/bge-base-en-v1.5",
)
```

- A `cloudflare dev` command that:
- A `d1-migrate` tool supporting:

```bash
d1-migrate init
d1-migrate generate "add users table"
d1-migrate up
d1-migrate rollback
```

- Python-native Durable Objects (stateful Python at the edge):

```python
class RoomState(DurableObject):
    async def fetch(self, request: Request) -> Response:
        # Stateful Python at the edge!
        self.state.users = self.state.users or []
```

- Scheduled notebook runs:

```python
@schedule("0 9 * * *")  # Daily at 9 AM
async def daily_report():
    await run_notebook("reports/daily_metrics.ipynb", params={"date": "today"})
```
- Python Workers Adoption
- Developer Satisfaction
- Platform Growth
- An `awesome-cloudflare-python` GitHub repo
GitHub repoPosition Cloudflare as “The Python Cloud” - where Python developers go to build and deploy modern applications without infrastructure complexity. Make Cloudflare the obvious choice for every Python developer, from data scientists to web developers to ML engineers.
Tag line: “Python at the speed of light. No servers required.”
These recommendations come from analyzing the gaps between what Python developers need and what Cloudflare currently offers. Implementing even half of these would make Cloudflare the most Python-friendly edge platform in the market.