Skip to content

Python Async & Concurrency - Street-Level Ops

Real-world patterns for concurrent Python in DevOps: bulk API calls, parallel processing, async services, and debugging stuck code.

Async HTTP Client for Bulk API Calls

The most common DevOps concurrency pattern: hit an API hundreds or thousands of times.

import asyncio
import aiohttp
import time

async def fetch_page(
    sem: asyncio.Semaphore,
    session: aiohttp.ClientSession,
    url: str,
    retries: int = 3,
) -> dict:
    """Fetch a URL with semaphore rate limiting and retry."""
    for attempt in range(retries):
        async with sem:
            try:
                async with session.get(url) as resp:
                    if resp.status == 429:  # Rate limited
                        retry_after = int(resp.headers.get("Retry-After", 2))
                        await asyncio.sleep(retry_after)
                        continue
                    resp.raise_for_status()
                    return {"url": url, "status": resp.status, "data": await resp.json()}
            except aiohttp.ClientError as e:
                if attempt == retries - 1:
                    return {"url": url, "status": "error", "error": str(e)}
                await asyncio.sleep(2 ** attempt)  # Exponential backoff

async def bulk_api_scan(base_url: str, endpoints: list[str], max_concurrent: int = 20):
    """Hit many endpoints concurrently with rate limiting."""
    sem = asyncio.Semaphore(max_concurrent)
    timeout = aiohttp.ClientTimeout(total=30, connect=5)
    connector = aiohttp.TCPConnector(limit=max_concurrent, limit_per_host=10)

    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
        tasks = [
            asyncio.create_task(fetch_page(sem, session, f"{base_url}{ep}"))
            for ep in endpoints
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    successes = [r for r in results if isinstance(r, dict) and r.get("status") == 200]
    failures = [r for r in results if not isinstance(r, dict) or r.get("status") != 200]
    print(f"Done: {len(successes)} OK, {len(failures)} failed")
    return results

# Usage
asyncio.run(bulk_api_scan("https://api.example.com", [f"/v1/items/{i}" for i in range(500)]))

Parallel File Processing with ProcessPoolExecutor

For CPU-bound work on many files (log parsing, checksum verification, compression).

import hashlib
import os
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path

def compute_checksum(filepath: str) -> tuple[str, str, int]:
    """Compute SHA-256 of a file. Runs in a worker process."""
    h = hashlib.sha256()
    size = 0
    with open(filepath, "rb") as f:
        while chunk := f.read(8192):
            h.update(chunk)
            size += len(chunk)
    return filepath, h.hexdigest(), size

def parallel_checksum(directory: str, max_workers: int = None) -> dict:
    """Checksum all files in a directory tree using multiple processes."""
    files = [str(p) for p in Path(directory).rglob("*") if p.is_file()]
    results = {}

    # Default workers = min(32, os.cpu_count() + 4) in Python 3.13+
    with ProcessPoolExecutor(max_workers=max_workers or os.cpu_count()) as executor:
        future_to_path = {executor.submit(compute_checksum, f): f for f in files}
        done_count = 0
        for future in as_completed(future_to_path):
            done_count += 1
            if done_count % 100 == 0:
                print(f"  Progress: {done_count}/{len(files)}")
            try:
                path, checksum, size = future.result(timeout=60)
                results[path] = {"sha256": checksum, "size": size}
            except Exception as e:
                path = future_to_path[future]
                results[path] = {"error": str(e)}

    print(f"Processed {len(results)} files")
    return results

Async FastAPI / ASGI Understanding

FastAPI runs on an ASGI server (uvicorn). Each request handler is a coroutine. Blocking inside a handler blocks the entire event loop.

from fastapi import FastAPI
import asyncio
import httpx

app = FastAPI()

# GOOD: async endpoint with async HTTP client
@app.get("/external-data")
async def get_external_data():
    async with httpx.AsyncClient() as client:
        resp = await client.get("https://api.example.com/data")
        return resp.json()

# BAD: blocking call inside async handler - blocks the event loop
@app.get("/bad-endpoint")
async def bad_endpoint():
    import requests
    resp = requests.get("https://api.example.com/data")  # BLOCKS EVENT LOOP
    return resp.json()

# OK: use run_in_executor for unavoidable sync code
@app.get("/sync-wrapped")
async def sync_wrapped():
    import requests
    loop = asyncio.get_event_loop()
    resp = await loop.run_in_executor(None, requests.get, "https://api.example.com/data")
    return resp.json()

# OK: define as sync function - FastAPI auto-runs it in a thread pool
@app.get("/sync-endpoint")
def sync_endpoint():
    import requests
    resp = requests.get("https://api.example.com/data")
    return resp.json()

Key rule: if your endpoint is async def, every I/O call inside it must be await-ed. If you cannot use an async library, either define the endpoint as plain def (FastAPI runs it in a threadpool) or use run_in_executor.

Gotcha: FastAPI runs plain def endpoints in a threadpool (default 40 threads). If you have 50 concurrent requests to a sync endpoint that each takes 2 seconds, 10 requests queue behind the thread pool. The symptom is intermittent latency spikes that only appear under load. Check uvicorn access logs for requests with suspiciously variable response times.

Debugging Stuck Async Code

Symptom: program hangs, no output, no error

import asyncio
import traceback

# Enable asyncio debug mode
asyncio.run(main(), debug=True)

# Or via environment variable
# PYTHONASYNCIODEBUG=1 python my_script.py

# Debug mode warnings:
# - Coroutines that were never awaited
# - Callbacks taking longer than 100ms (blocking the loop)
# - Unclosed resources (sessions, connections)

Dump all running tasks

import asyncio
import sys

async def dump_tasks():
    """Print all running tasks and their stack traces."""
    for task in asyncio.all_tasks():
        print(f"Task: {task.get_name()}, State: {task._state}")
        task.print_stack(file=sys.stderr)
        print("---")

Common causes of async hangs

# 1. Forgot to await a coroutine
async def main():
    fetch_data("http://example.com")  # Missing await! Returns coroutine object, does nothing.
    # RuntimeWarning in debug mode: "coroutine 'fetch_data' was never awaited"

# 2. Deadlock: awaiting something that will never complete
async def main():
    queue = asyncio.Queue()
    item = await queue.get()  # Hangs forever - nothing ever puts to queue

# 3. Blocking call in async code
async def main():
    time.sleep(30)  # Blocks entire event loop for 30 seconds
    # Use: await asyncio.sleep(30)

# 4. Exception swallowed in gather
async def main():
    results = await asyncio.gather(task1(), task2(), task3())
    # If task2 raises, the whole gather fails and task1/task3 results are lost
    # Fix: use return_exceptions=True

Thread Safety in Shared State

import threading
from collections import defaultdict

# WRONG: shared dict without lock
metrics = defaultdict(int)

def record_metric(name):
    metrics[name] += 1  # Race condition: read-modify-write is not atomic

# RIGHT: lock around shared state
metrics_lock = threading.Lock()
metrics = defaultdict(int)

def record_metric(name):
    with metrics_lock:
        metrics[name] += 1

# BETTER: use thread-safe structures
from queue import Queue

metric_queue = Queue()

def record_metric(name):
    metric_queue.put(name)  # Thread-safe by design

def flush_metrics():
    counts = defaultdict(int)
    while not metric_queue.empty():
        try:
            name = metric_queue.get_nowait()
            counts[name] += 1
        except queue.Empty:
            break
    return counts

Multiprocessing in Docker

Docker containers typically have PID 1 issues with multiprocessing. Orphaned child processes become zombies because PID 1 does not reap them by default.

# BAD: Python as PID 1 - zombie processes accumulate
CMD ["python", "worker.py"]

# GOOD: Use tini as init process
RUN apt-get update && apt-get install -y tini
ENTRYPOINT ["tini", "--"]
CMD ["python", "worker.py"]

# Or use Docker's built-in init
# docker run --init myimage
# Fork safety: never fork after creating threads
import multiprocessing

# Set start method BEFORE creating any threads
multiprocessing.set_start_method("spawn")  # Safe but slower (re-imports module)
# "fork" (default on Linux): copies everything including broken locks from threads
# "spawn" (default on macOS/Windows): starts fresh Python process
# Debug clue: if multiprocessing hangs silently, check if you're forking after
# creating threads — the child inherits locked mutexes that no thread exists to unlock.
# "forkserver": compromise - forks from a clean server process

# In Docker, always prefer "spawn" or "forkserver"

Graceful Shutdown of Async Services

import asyncio
import signal

class AsyncService:
    def __init__(self):
        self.running = True
        self.tasks: set[asyncio.Task] = set()

    async def start(self):
        loop = asyncio.get_event_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, lambda: asyncio.create_task(self.stop()))

        # Start background workers
        for i in range(5):
            task = asyncio.create_task(self.worker(f"worker-{i}"))
            self.tasks.add(task)
            task.add_done_callback(self.tasks.discard)

        # Wait until shutdown
        await asyncio.gather(*self.tasks, return_exceptions=True)

    async def stop(self):
        print("Shutting down gracefully...")
        self.running = False

        # Give tasks 10 seconds to finish current work
        for task in self.tasks:
            task.cancel()
        await asyncio.gather(*self.tasks, return_exceptions=True)

    async def worker(self, name: str):
        while self.running:
            try:
                await self.process_next_item(name)
            except asyncio.CancelledError:
                print(f"{name}: finishing current item before exit")
                break
            except Exception as e:
                print(f"{name}: error - {e}, continuing...")
                await asyncio.sleep(1)

asyncio.run(AsyncService().start())

Rate-Limiting Patterns

Semaphore for concurrent request cap

import asyncio

async def concurrent_batch(coros, max_concurrent: int = 10):
    """Run coroutines with a maximum concurrency window."""
    sem = asyncio.Semaphore(max_concurrent)

    async def bounded(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*[bounded(c) for c in coros], return_exceptions=True)

# Process 1000 items, max 50 at a time
results = await concurrent_batch(
    [process_item(item) for item in items],
    max_concurrent=50,
)