
Python Async & Concurrency - Primer

Why This Matters

Every DevOps tool eventually hits a concurrency wall: a deployment script that provisions 200 VMs sequentially, a monitoring agent that polls 500 endpoints one at a time, a log shipper that blocks on network I/O. Python gives you three concurrency models (threading, multiprocessing, asyncio) and choosing the wrong one means your "parallel" code runs slower than serial. Understanding the GIL, the event loop, and when to reach for each tool is the difference between a 3-minute deploy and a 45-minute one.


The GIL (Global Interpreter Lock)

Under the hood: The GIL was introduced in CPython's earliest days (early 1990s) by Guido van Rossum as a pragmatic choice: it made the C extension API safe and simple at the cost of true thread parallelism. PEP 703 (accepted 2023) introduces an optional "free-threaded" CPython build without the GIL, first shipped as an experimental build in Python 3.13.

The GIL is a mutex in CPython that allows only one thread to execute Python bytecode at a time. This is the single most important fact about Python concurrency.

What the GIL does

  • Protects CPython's reference-counting garbage collector from race conditions
  • Ensures only one thread runs Python bytecode at any given moment
  • Is released during I/O operations (file reads, network calls, time.sleep)
  • Is released by C extensions that explicitly drop it (NumPy, hashlib, etc.)

What the GIL does NOT do

  • It does not prevent data races on Python objects (you still need locks for shared mutable state)
  • It does not affect multiprocessing (each process has its own GIL)
  • It does not block I/O-bound threads (the GIL is released during I/O)
  • It does not exist in all Python implementations (Jython, IronPython have no GIL)
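The I/O point is easy to demonstrate: time.sleep releases the GIL, so four sleeping threads finish in roughly one sleep's worth of wall time, not four. A minimal sketch:

```python
import threading
import time

def io_wait():
    # time.sleep releases the GIL, so sleeping threads overlap
    time.sleep(0.2)

threads = [threading.Thread(target=io_wait) for _ in range(4)]
start = time.time()
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.time() - start
print(f"4 x 0.2s sleeps took {elapsed:.2f}s")  # roughly 0.2s, not 0.8s
```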

When the GIL matters

  • CPU-bound (math, parsing, compression): severe - threads give zero speedup. Right tool: multiprocessing
  • I/O-bound (HTTP calls, DB queries, file I/O): minimal - GIL released during I/O. Right tool: threading or asyncio
  • Mixed (some CPU, some I/O): moderate. Right tool: combine approaches

import threading
import time

counter = 0

def cpu_work():
    """This will NOT run faster with threads due to GIL."""
    global counter
    for _ in range(10_000_000):
        counter += 1

# Two threads doing CPU work: typically no faster, often slower, than one thread (GIL contention)
t1 = threading.Thread(target=cpu_work)
t2 = threading.Thread(target=cpu_work)
start = time.time()
t1.start(); t2.start()
t1.join(); t2.join()
print(f"Threaded CPU work: {time.time() - start:.2f}s")
# Also: counter may fall short of 20_000_000 because counter += 1 is not atomic (race condition)
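The race is fixed by serializing the read-modify-write with a Lock; a smaller iteration count keeps the locked version quick:

```python
import threading

counter = 0
lock = threading.Lock()

def safe_increment(n):
    global counter
    for _ in range(n):
        with lock:       # serialize the read-modify-write
            counter += 1

threads = [threading.Thread(target=safe_increment, args=(100_000,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 200000, every time, because the lock prevents lost updates
```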

Threading

The threading module provides OS-level threads. Good for I/O-bound work where the GIL is released.

Core primitives

import threading

# Basic thread
t = threading.Thread(target=my_func, args=(arg1, arg2))
t.start()
t.join(timeout=10)  # Wait up to 10s for completion

# Daemon threads: killed when main thread exits
t = threading.Thread(target=background_work, daemon=True)
t.start()
# No join needed - dies with main thread

Synchronization primitives

import threading

# Lock - mutual exclusion
lock = threading.Lock()
with lock:
    shared_resource.update(data)

# RLock - reentrant lock (same thread can acquire multiple times)
rlock = threading.RLock()
with rlock:
    with rlock:  # Same thread - OK with RLock, deadlock with Lock
        pass

# Semaphore - limit concurrent access
sem = threading.Semaphore(5)  # Max 5 concurrent
with sem:
    access_limited_resource()

# Event - signal between threads
ready = threading.Event()
# Thread 1:
ready.wait()  # Blocks until set
# Thread 2:
ready.set()   # Unblocks all waiters

# Condition - complex coordination
cond = threading.Condition()
with cond:
    while not data_available():
        cond.wait()
    process(data)
# Other thread:
with cond:
    produce_data()
    cond.notify_all()
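The Event handshake above as a runnable example: one thread blocks on wait() until another calls set():

```python
import threading

ready = threading.Event()
log = []

def waiter():
    log.append("waiting")
    ready.wait()          # blocks until ready.set() is called
    log.append("resumed")

t = threading.Thread(target=waiter)
t.start()
ready.set()               # unblock the waiter
t.join()
print(log)  # ['waiting', 'resumed']
```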

Thread-safe queue

import queue
import threading

q = queue.Queue(maxsize=100)

def producer():
    for item in generate_items():
        q.put(item)  # Blocks if queue full
    for _ in range(4):
        q.put(None)  # One sentinel per consumer to signal done

def consumer():
    while True:
        item = q.get()  # Blocks if queue empty
        if item is None:
            q.task_done()  # Count the sentinel too, or q.join() never returns
            break
        process(item)
        q.task_done()

# Start workers
threading.Thread(target=producer).start()
for _ in range(4):
    threading.Thread(target=consumer, daemon=True).start()

q.join()  # Wait until every put item (including sentinels) is processed
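A self-contained version of the pipeline; note one sentinel per consumer, and task_done() called for the sentinel as well:

```python
import queue
import threading

q = queue.Queue(maxsize=10)
results = []
results_lock = threading.Lock()
NUM_CONSUMERS = 3

def producer():
    for i in range(20):
        q.put(i)
    for _ in range(NUM_CONSUMERS):  # one sentinel per consumer
        q.put(None)

def consumer():
    while True:
        item = q.get()
        if item is None:
            q.task_done()
            break
        # list.append is effectively atomic in CPython, but the lock
        # makes the shared-state intent explicit
        with results_lock:
            results.append(item * 2)
        q.task_done()

threads = [threading.Thread(target=producer)]
threads += [threading.Thread(target=consumer) for _ in range(NUM_CONSUMERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(results))
```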

ThreadPoolExecutor

The high-level interface for thread pools. Preferred over manual thread management.

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

urls = ["https://api.example.com/1", "https://api.example.com/2", ...]

def fetch(url):
    resp = requests.get(url, timeout=10)
    resp.raise_for_status()
    return resp.json()

# Submit and collect results
with ThreadPoolExecutor(max_workers=20) as executor:
    future_to_url = {executor.submit(fetch, url): url for url in urls}
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"{url}: {len(data)} items")
        except Exception as e:
            print(f"{url}: failed - {e}")

# Simpler: map (preserves order, but you lose per-item error handling)
with ThreadPoolExecutor(max_workers=20) as executor:
    results = list(executor.map(fetch, urls))
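A self-contained sketch of executor.map with a pure function, showing that results come back in input order regardless of which worker ran each item:

```python
from concurrent.futures import ThreadPoolExecutor

def double(n):
    return n * 2

# map yields results in the order of the inputs, not completion order
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(double, range(5)))
print(results)  # [0, 2, 4, 6, 8]
```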

Multiprocessing

Separate OS processes, each with its own Python interpreter and GIL. The only way to achieve true CPU parallelism in CPython.

Basic process

from multiprocessing import Process, Queue

def worker(q, data_chunk):
    result = expensive_computation(data_chunk)
    q.put(result)

q = Queue()
processes = []
for chunk in split_data(data, num_chunks=4):
    p = Process(target=worker, args=(q, chunk))
    p.start()
    processes.append(p)

results = [q.get() for _ in processes]
for p in processes:
    p.join()
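One portability note: with the spawn or forkserver start methods (the default on Windows and macOS), worker processes import the main module. Process creation must therefore sit behind an if __name__ == "__main__" guard, and worker functions must be importable at module top level. A minimal sketch:

```python
from multiprocessing import Pool

def square(n):
    # defined at module top level so child processes can import it
    return n * n

if __name__ == "__main__":
    # without this guard, spawn/forkserver children would re-execute
    # the Pool creation on import, forking endlessly
    with Pool(processes=2) as pool:
        print(pool.map(square, [1, 2, 3, 4]))  # [1, 4, 9, 16]
```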

Pool (map/starmap)

from multiprocessing import Pool

def process_file(filepath):
    with open(filepath) as f:
        return analyze(f.read())

# Pool with context manager
with Pool(processes=4) as pool:
    results = pool.map(process_file, file_list)

    # Async variant (non-blocking)
    async_result = pool.map_async(process_file, file_list)
    # Do other work...
    results = async_result.get(timeout=60)

    # starmap for multiple arguments
    args = [(f, config) for f in file_list]
    results = pool.starmap(process_with_config, args)

ProcessPoolExecutor

The concurrent.futures interface for multiprocessing. Consistent API with ThreadPoolExecutor.

from concurrent.futures import ProcessPoolExecutor, as_completed

def cpu_intensive(n):
    return sum(i * i for i in range(n))

with ProcessPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(cpu_intensive, n): n for n in range(10)}
    for future in as_completed(futures):
        n = futures[future]
        print(f"Sum of squares up to {n}: {future.result()}")

asyncio

The single-threaded cooperative concurrency model. Coroutines voluntarily yield control at await points, allowing other coroutines to run. No threads, no locks (usually), no GIL issues.

Core concepts

  • Event loop: The scheduler that runs coroutines. One loop per thread.
  • Coroutine: A function defined with async def. Does nothing until awaited or scheduled.
  • Task: A coroutine wrapped and scheduled on the event loop. Runs concurrently.
  • await: Suspends the current coroutine and yields control to the event loop.
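The concepts above in runnable form: two coroutines awaited via gather overlap their sleeps, so total wall time is roughly one sleep, not two.

```python
import asyncio
import time

async def work(label: str, delay: float) -> str:
    await asyncio.sleep(delay)  # yields to the event loop while "waiting"
    return label

async def main():
    start = time.monotonic()
    # gather runs both coroutines concurrently and preserves argument order
    results = await asyncio.gather(work("a", 0.2), work("b", 0.2))
    return results, time.monotonic() - start

results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")  # two 0.2s sleeps overlap: ~0.2s total
```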

Basic patterns

import asyncio
import aiohttp  # third-party: pip install aiohttp

async def fetch_data(url: str) -> dict:
    """A coroutine - does nothing until awaited."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.json()

async def main():
    # Sequential (no concurrency)
    result1 = await fetch_data("https://api.example.com/1")
    result2 = await fetch_data("https://api.example.com/2")

    # Concurrent (both requests in flight at once)
    result1, result2 = await asyncio.gather(
        fetch_data("https://api.example.com/1"),
        fetch_data("https://api.example.com/2"),
    )

asyncio.run(main())  # Entry point - creates event loop and runs

Gotcha: A common mistake is calling a blocking function (requests.get, time.sleep) inside a coroutine. Nothing yields to the event loop, so no other coroutine runs until the call returns. Use aiohttp instead of requests, and asyncio.sleep instead of time.sleep. If you must call blocking code, wrap it with loop.run_in_executor().
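As a lighter-weight alternative to run_in_executor, asyncio.to_thread (Python 3.9+) offloads a blocking call to a worker thread. A sketch, with time.sleep standing in for blocking I/O such as requests.get:

```python
import asyncio
import time

def blocking_io(n: int) -> int:
    time.sleep(0.2)  # stands in for a blocking network or disk call
    return n * 10

async def main():
    start = time.monotonic()
    # to_thread runs the blocking call in a worker thread,
    # so the event loop stays free while it waits
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io, 1),
        asyncio.to_thread(blocking_io, 2),
    )
    return results, time.monotonic() - start

results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")  # both calls overlap: ~0.2s total
```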

Tasks and create_task

async def main():
    # create_task schedules the coroutine immediately
    task1 = asyncio.create_task(fetch_data(url1))
    task2 = asyncio.create_task(fetch_data(url2))

    # Both are running now. Do other work...
    await do_something_else()

    # Collect results
    result1 = await task1
    result2 = await task2

gather vs wait

import asyncio

# gather: run all, get results in order. One failure = all fail (by default)
results = await asyncio.gather(
    coro1(), coro2(), coro3(),
    return_exceptions=True,  # Don't fail fast - return exceptions as results
)
for r in results:
    if isinstance(r, Exception):
        handle_error(r)
    else:
        handle_success(r)

# wait: more control over completion
done, pending = await asyncio.wait(
    [asyncio.create_task(c) for c in coros],
    timeout=10,
    return_when=asyncio.FIRST_COMPLETED,  # or ALL_COMPLETED, FIRST_EXCEPTION
)
for task in done:
    if task.exception():
        handle_error(task.exception())
    else:
        handle_success(task.result())
for task in pending:
    task.cancel()

Semaphore for rate limiting

import asyncio
import aiohttp

async def fetch_with_limit(sem, session, url):
    async with sem:  # Limits concurrent requests
        async with session.get(url) as resp:
            return await resp.json()

async def main():
    sem = asyncio.Semaphore(10)  # Max 10 concurrent requests
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(sem, session, url) for url in urls]
        results = await asyncio.gather(*tasks)
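A self-contained version that records peak concurrency to show the cap is enforced; asyncio.sleep stands in for the HTTP call:

```python
import asyncio

active = 0
max_active = 0

async def limited(sem: asyncio.Semaphore):
    # globals are safe here: asyncio is single-threaded, and there is
    # no await between the reads and writes below
    global active, max_active
    async with sem:
        active += 1
        max_active = max(max_active, active)
        await asyncio.sleep(0.02)  # stands in for the real request
        active -= 1

async def main():
    sem = asyncio.Semaphore(3)  # at most 3 coroutines inside at once
    await asyncio.gather(*(limited(sem) for _ in range(10)))

asyncio.run(main())
print(max_active)  # peak concurrency is capped at 3
```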

concurrent.futures

The unified interface for both thread and process pools. Import one executor, swap implementations.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from concurrent.futures import as_completed, wait, Future

# Same API, different backend
def run_parallel(func, items, executor_class, max_workers=None):
    with executor_class(max_workers=max_workers) as executor:
        futures = {executor.submit(func, item): item for item in items}
        results = {}
        for future in as_completed(futures):
            item = futures[future]
            try:
                results[item] = future.result(timeout=30)
            except TimeoutError:
                results[item] = None
            except Exception as e:
                results[item] = e
        return results

# I/O bound: use threads
results = run_parallel(fetch_url, urls, ThreadPoolExecutor, max_workers=50)

# CPU bound: use processes
results = run_parallel(crunch_data, chunks, ProcessPoolExecutor, max_workers=8)

When to Use What

  • Bulk HTTP requests: asyncio + aiohttp - thousands of concurrent connections on a single thread
  • Parallel file I/O: ThreadPoolExecutor - simple, GIL released during I/O
  • CPU-bound processing: ProcessPoolExecutor - bypasses the GIL entirely
  • Real-time websockets: asyncio - event-driven, long-lived connections
  • subprocess calls: asyncio.create_subprocess_exec - non-blocking process management
  • Quick parallelism in scripts: ThreadPoolExecutor - minimal boilerplate
  • Data pipeline (CPU): multiprocessing.Pool - map/starmap for batch processing
  • Mixed I/O + CPU: async I/O plus a process pool, via loop.run_in_executor(process_pool, func)

Remember: Decision mnemonic: "I/O waits? Threads. CPU burns? Processes. Thousands of sockets? Async." If you cannot decide, start with ThreadPoolExecutor — it is the simplest, requires no code restructuring, and works well for the 90% case of I/O-bound ops scripts.

Combining asyncio with ProcessPoolExecutor

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_work(data):
    """Runs in a separate process."""
    return heavy_computation(data)

async def main():
    loop = asyncio.get_running_loop()  # not get_event_loop(), deprecated inside coroutines
    executor = ProcessPoolExecutor(max_workers=4)

    # Offload CPU work to process pool from async code
    results = await asyncio.gather(*[
        loop.run_in_executor(executor, cpu_work, chunk)
        for chunk in data_chunks
    ])
    executor.shutdown(wait=False)

subprocess Module for External Commands

import json
import subprocess
import asyncio

# Synchronous - blocks until complete
result = subprocess.run(
    ["kubectl", "get", "pods", "-o", "json"],
    capture_output=True, text=True, timeout=30,
    check=True,  # Raises CalledProcessError on non-zero exit
)
pods = json.loads(result.stdout)

# Async subprocess
async def run_command(cmd: list[str]) -> str:
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(f"{cmd[0]} failed: {stderr.decode()}")
    return stdout.decode()

# Run multiple commands concurrently
async def main():
    pods, nodes, svcs = await asyncio.gather(
        run_command(["kubectl", "get", "pods", "-o", "json"]),
        run_command(["kubectl", "get", "nodes", "-o", "json"]),
        run_command(["kubectl", "get", "svc", "-o", "json"]),
    )
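The same pattern in a form that runs without kubectl, exercised with echo (assumed available on POSIX systems):

```python
import asyncio

async def run_command(cmd: list[str]) -> str:
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(f"{cmd[0]} failed: {stderr.decode()}")
    return stdout.decode()

async def main():
    # three echo invocations stand in for the kubectl calls
    a, b, c = await asyncio.gather(
        run_command(["echo", "pods"]),
        run_command(["echo", "nodes"]),
        run_command(["echo", "svc"]),
    )
    return [a.strip(), b.strip(), c.strip()]

print(asyncio.run(main()))  # ['pods', 'nodes', 'svc']
```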

Signal Handling in Concurrent Code

import asyncio
import signal

async def shutdown(sig, loop):
    print(f"Received {sig.name}, shutting down...")
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    loop.stop()

async def main():
    loop = asyncio.get_running_loop()  # we are inside a coroutine, so a loop is running
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(
            sig, lambda s=sig: asyncio.create_task(shutdown(s, loop))
        )
    await run_service()

# For threading: signal handlers only work in the main thread
import threading
import signal

def handler(signum, frame):
    print("Signal received, setting shutdown flag")
    shutdown_event.set()

shutdown_event = threading.Event()
signal.signal(signal.SIGTERM, handler)  # Must be called from main thread
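A runnable sketch of the flag-based shutdown: the worker thread polls the Event instead of blocking, and signal.raise_signal simulates an external kill:

```python
import signal
import threading
import time

shutdown_event = threading.Event()

def handler(signum, frame):
    # runs in the main thread; just flip the flag
    shutdown_event.set()

def worker():
    while not shutdown_event.is_set():  # poll the flag, never block forever
        time.sleep(0.01)

signal.signal(signal.SIGTERM, handler)  # must be installed from the main thread
t = threading.Thread(target=worker)
t.start()
time.sleep(0.05)
signal.raise_signal(signal.SIGTERM)     # simulate an external `kill <pid>`
t.join(timeout=1)
print("worker stopped:", not t.is_alive())
```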
