- devops
- l2
- topic-pack
- python-async-concurrency
- python-automation

---

Portal | Level: L2: Operations | Topics: Python Async & Concurrency, Python Automation | Domain: DevOps & Tooling
Python Async & Concurrency - Primer¶
Why This Matters¶
Every DevOps tool eventually hits a concurrency wall: a deployment script that provisions 200 VMs sequentially, a monitoring agent that polls 500 endpoints one at a time, a log shipper that blocks on network I/O. Python gives you three concurrency models (threading, multiprocessing, asyncio) and choosing the wrong one means your "parallel" code runs slower than serial. Understanding the GIL, the event loop, and when to reach for each tool is the difference between a 3-minute deploy and a 45-minute one.
The GIL (Global Interpreter Lock)¶
Under the hood: The GIL dates to CPython's earliest days (early 1990s), when Guido van Rossum made a pragmatic choice: it kept the C extension API safe and simple at the cost of true thread parallelism. PEP 703 (accepted 2023) introduces a "free-threaded" CPython build without the GIL; it first shipped as an experimental option in Python 3.13, with production readiness expected in later releases.
The GIL is a mutex in CPython that allows only one thread to execute Python bytecode at a time. This is the single most important fact about Python concurrency.
What the GIL does¶
- Protects CPython's reference-counting garbage collector from race conditions
- Ensures only one thread runs Python bytecode at any given moment
- Is released during I/O operations (file reads, network calls, time.sleep)
- Is released by C extensions that explicitly drop it (NumPy, hashlib, etc.)
What the GIL does NOT do¶
- It does not prevent data races on Python objects (you still need locks for shared mutable state)
- It does not affect multiprocessing (each process has its own GIL)
- It does not block I/O-bound threads (the GIL is released during I/O)
- It does not exist in all Python implementations (Jython, IronPython have no GIL)
When the GIL matters¶
| Workload | GIL Impact | Right Tool |
|---|---|---|
| CPU-bound (math, parsing, compression) | Severe - threads give zero speedup | multiprocessing |
| I/O-bound (HTTP calls, DB queries, file I/O) | Minimal - GIL released during I/O | threading or asyncio |
| Mixed (some CPU, some I/O) | Moderate | Combine approaches |
```python
import threading
import time

counter = 0

def cpu_work():
    """This will NOT run faster with threads due to the GIL."""
    global counter
    for _ in range(10_000_000):
        counter += 1

# Two threads doing CPU work: ~2x SLOWER than single-threaded (GIL contention)
t1 = threading.Thread(target=cpu_work)
t2 = threading.Thread(target=cpu_work)
start = time.time()
t1.start(); t2.start()
t1.join(); t2.join()
print(f"Threaded CPU work: {time.time() - start:.2f}s")
# Also: counter will NOT be 20_000_000 due to race condition
```
Threading¶
The threading module provides OS-level threads. Good for I/O-bound work where the GIL is released.
Core primitives¶
```python
import threading

# Basic thread
t = threading.Thread(target=my_func, args=(arg1, arg2))
t.start()
t.join(timeout=10)  # Wait up to 10s for completion

# Daemon threads: killed when main thread exits
t = threading.Thread(target=background_work, daemon=True)
t.start()
# No join needed - dies with main thread
```
Synchronization primitives¶
```python
import threading

# Lock - mutual exclusion
lock = threading.Lock()
with lock:
    shared_resource.update(data)

# RLock - reentrant lock (same thread can acquire multiple times)
rlock = threading.RLock()
with rlock:
    with rlock:  # Same thread - OK with RLock, deadlock with Lock
        pass

# Semaphore - limit concurrent access
sem = threading.Semaphore(5)  # Max 5 concurrent
with sem:
    access_limited_resource()

# Event - signal between threads
ready = threading.Event()
# Thread 1:
ready.wait()  # Blocks until set
# Thread 2:
ready.set()   # Unblocks all waiters

# Condition - complex coordination
cond = threading.Condition()
with cond:
    while not data_available():
        cond.wait()
    process(data)
# Other thread:
with cond:
    produce_data()
    cond.notify_all()
```
Thread-safe queue¶
```python
import queue
import threading

q = queue.Queue(maxsize=100)
NUM_CONSUMERS = 4

def producer():
    for item in generate_items():
        q.put(item)        # Blocks if queue full
    for _ in range(NUM_CONSUMERS):
        q.put(None)        # One sentinel per consumer to signal done

def consumer():
    while True:
        item = q.get()     # Blocks if queue empty
        if item is None:
            q.task_done()  # Sentinels count toward q.join() too
            break
        process(item)
        q.task_done()

# Start workers
threading.Thread(target=producer).start()
for _ in range(NUM_CONSUMERS):
    threading.Thread(target=consumer, daemon=True).start()
q.join()  # Wait until every put() has a matching task_done()
```
ThreadPoolExecutor¶
The high-level interface for thread pools. Preferred over manual thread management.
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

urls = ["https://api.example.com/1", "https://api.example.com/2", ...]

def fetch(url):
    resp = requests.get(url, timeout=10)
    resp.raise_for_status()
    return resp.json()

# Submit and collect results
with ThreadPoolExecutor(max_workers=20) as executor:
    future_to_url = {executor.submit(fetch, url): url for url in urls}
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"{url}: {len(data)} items")
        except Exception as e:
            print(f"{url}: failed - {e}")

# Simpler: map (preserves order, but you lose per-item error handling)
with ThreadPoolExecutor(max_workers=20) as executor:
    results = list(executor.map(fetch, urls))
```
Multiprocessing¶
Separate OS processes, each with its own Python interpreter and GIL. The only way to achieve true CPU parallelism in CPython.
Basic process¶
```python
from multiprocessing import Process, Queue

def worker(q, data_chunk):
    result = expensive_computation(data_chunk)
    q.put(result)

# On spawn-based platforms (Windows, macOS) this setup code must run
# under an `if __name__ == "__main__":` guard
q = Queue()
processes = []
for chunk in split_data(data, num_chunks=4):
    p = Process(target=worker, args=(q, chunk))
    p.start()
    processes.append(p)
results = [q.get() for _ in processes]  # Drain before join to avoid deadlock
for p in processes:
    p.join()
```
Pool (map/starmap)¶
```python
from multiprocessing import Pool

def process_file(filepath):
    with open(filepath) as f:
        return analyze(f.read())

# Pool with context manager (all pool calls must stay inside the with block)
with Pool(processes=4) as pool:
    results = pool.map(process_file, file_list)

    # Async variant (non-blocking)
    async_result = pool.map_async(process_file, file_list)
    # Do other work...
    results = async_result.get(timeout=60)

    # starmap for multiple arguments
    args = [(f, config) for f in file_list]
    results = pool.starmap(process_with_config, args)
```
ProcessPoolExecutor¶
The concurrent.futures interface for multiprocessing. Consistent API with ThreadPoolExecutor.
```python
from concurrent.futures import ProcessPoolExecutor, as_completed

def cpu_intensive(n):
    return sum(i * i for i in range(n))

with ProcessPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(cpu_intensive, n): n for n in range(10)}
    for future in as_completed(futures):
        n = futures[future]
        print(f"Sum of squares up to {n}: {future.result()}")
```
asyncio¶
The single-threaded cooperative concurrency model. Coroutines voluntarily yield control at await points, allowing other coroutines to run. No threads, no locks (usually), no GIL issues.
Core concepts¶
- Event loop: The scheduler that runs coroutines. One loop per thread.
- Coroutine: A function defined with async def. Does nothing until awaited or scheduled.
- Task: A coroutine wrapped and scheduled on the event loop. Runs concurrently.
- await: Suspends the current coroutine and yields control to the event loop.
Basic patterns¶
```python
import asyncio
import aiohttp

async def fetch_data(url: str) -> dict:
    """A coroutine - does nothing until awaited."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.json()

async def main():
    # Sequential (no concurrency)
    result1 = await fetch_data("https://api.example.com/1")
    result2 = await fetch_data("https://api.example.com/2")

    # Concurrent (both requests in flight at once)
    result1, result2 = await asyncio.gather(
        fetch_data("https://api.example.com/1"),
        fetch_data("https://api.example.com/2"),
    )

asyncio.run(main())  # Entry point - creates event loop and runs
```
Gotcha: A common mistake is calling blocking functions (`requests.get`, `time.sleep`) inside a coroutine. This blocks the entire event loop: no other coroutines run until the call returns. Use `aiohttp` instead of `requests`, and `asyncio.sleep` instead of `time.sleep`. If you must call blocking code, wrap it with `loop.run_in_executor()`.
Tasks and create_task¶
```python
async def main():
    # create_task schedules the coroutine immediately
    task1 = asyncio.create_task(fetch_data(url1))
    task2 = asyncio.create_task(fetch_data(url2))

    # Both are running now. Do other work...
    await do_something_else()

    # Collect results
    result1 = await task1
    result2 = await task2
```
gather vs wait¶
```python
import asyncio

# gather: run all, get results in order. One failure = all fail (by default)
results = await asyncio.gather(
    coro1(), coro2(), coro3(),
    return_exceptions=True,  # Don't fail fast - return exceptions as results
)
for r in results:
    if isinstance(r, Exception):
        handle_error(r)
    else:
        handle_success(r)

# wait: more control over completion
done, pending = await asyncio.wait(
    [asyncio.create_task(c) for c in coros],
    timeout=10,
    return_when=asyncio.FIRST_COMPLETED,  # or ALL_COMPLETED, FIRST_EXCEPTION
)
for task in done:
    if task.exception():
        handle_error(task.exception())
    else:
        handle_success(task.result())
for task in pending:
    task.cancel()
```
Semaphore for rate limiting¶
```python
import asyncio
import aiohttp

async def fetch_with_limit(sem, session, url):
    async with sem:  # Limits concurrent requests
        async with session.get(url) as resp:
            return await resp.json()

async def main():
    sem = asyncio.Semaphore(10)  # Max 10 concurrent requests
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(sem, session, url) for url in urls]
        results = await asyncio.gather(*tasks)
```
concurrent.futures¶
The unified interface for both thread and process pools. Import one executor, swap implementations.
```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from concurrent.futures import as_completed, TimeoutError

# Same API, different backend
def run_parallel(func, items, executor_class, max_workers=None):
    with executor_class(max_workers=max_workers) as executor:
        futures = {executor.submit(func, item): item for item in items}
        results = {}
        for future in as_completed(futures):
            item = futures[future]
            try:
                results[item] = future.result(timeout=30)
            except TimeoutError:
                results[item] = None
            except Exception as e:
                results[item] = e
        return results

# I/O bound: use threads
results = run_parallel(fetch_url, urls, ThreadPoolExecutor, max_workers=50)
# CPU bound: use processes
results = run_parallel(crunch_data, chunks, ProcessPoolExecutor, max_workers=8)
```
When to Use What¶
| Scenario | Tool | Why |
|---|---|---|
| Bulk HTTP requests | asyncio + aiohttp | Thousands of concurrent connections, single thread |
| Parallel file I/O | ThreadPoolExecutor | Simple, GIL released during I/O |
| CPU-bound processing | ProcessPoolExecutor | Bypasses GIL entirely |
| Real-time websockets | asyncio | Event-driven, long-lived connections |
| subprocess calls | asyncio.create_subprocess_exec | Non-blocking process management |
| Quick parallelism in scripts | ThreadPoolExecutor | Minimal boilerplate |
| Data pipeline (CPU) | multiprocessing.Pool | map/starmap for batch processing |
| Mixed I/O + CPU | Combine: async I/O, process pool for CPU | loop.run_in_executor(process_pool, func) |
Remember: Decision mnemonic: "I/O waits? Threads. CPU burns? Processes. Thousands of sockets? Async." If you cannot decide, start with ThreadPoolExecutor: it is the simplest, requires no code restructuring, and works well for the 90% case of I/O-bound ops scripts.
Combining asyncio with ProcessPoolExecutor¶
```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_work(data):
    """Runs in a separate process."""
    return heavy_computation(data)

async def main():
    loop = asyncio.get_running_loop()  # preferred inside coroutines
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Offload CPU work to the process pool from async code
        results = await asyncio.gather(*[
            loop.run_in_executor(executor, cpu_work, chunk)
            for chunk in data_chunks
        ])
```
subprocess Module for External Commands¶
```python
import asyncio
import json
import subprocess

# Synchronous - blocks until complete
result = subprocess.run(
    ["kubectl", "get", "pods", "-o", "json"],
    capture_output=True, text=True, timeout=30,
    check=True,  # Raises CalledProcessError on non-zero exit
)
pods = json.loads(result.stdout)

# Async subprocess
async def run_command(cmd: list[str]) -> str:
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(f"{cmd[0]} failed: {stderr.decode()}")
    return stdout.decode()

# Run multiple commands concurrently
async def main():
    pods, nodes, svcs = await asyncio.gather(
        run_command(["kubectl", "get", "pods", "-o", "json"]),
        run_command(["kubectl", "get", "nodes", "-o", "json"]),
        run_command(["kubectl", "get", "svc", "-o", "json"]),
    )
```
Signal Handling in Concurrent Code¶
```python
import asyncio
import signal

async def shutdown(sig, loop):
    print(f"Received {sig.name}, shutting down...")
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    loop.stop()

async def main():
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(
            sig, lambda s=sig: asyncio.create_task(shutdown(s, loop))
        )
    await run_service()
```

For threading, signal handlers only work in the main thread:

```python
import threading
import signal

shutdown_event = threading.Event()

def handler(signum, frame):
    print("Signal received, setting shutdown flag")
    shutdown_event.set()

signal.signal(signal.SIGTERM, handler)  # Must be called from main thread
```
Wiki Navigation¶
Prerequisites¶
- Python for Infrastructure (Topic Pack, L1)
Related Content¶
- Perl Flashcards (CLI) (flashcard_deck, L1) — Python Automation
- Python Debugging (Topic Pack, L1) — Python Automation
- Python Drills (Drill, L0) — Python Automation
- Python Exercises (Quest Ladder) (CLI) (Exercise Set, L0) — Python Automation
- Python Flashcards (CLI) (flashcard_deck, L1) — Python Automation
- Python Packaging (Topic Pack, L2) — Python Automation
- Python for Infrastructure (Topic Pack, L1) — Python Automation
- Skillcheck: Python Automation (Assessment, L0) — Python Automation
- Software Development Flashcards (CLI) (flashcard_deck, L1) — Python Automation