Python: Automating Everything — APIs and Infrastructure

Tags: lesson, python, requests, boto3, kubernetes-client, prometheus-api, json/yaml, argparse, logging, retries, slack-webhooks, cli-tools

Topics: Python, requests, boto3, kubernetes-client, Prometheus API, JSON/YAML, argparse, logging, retries, Slack webhooks, CLI tools
Strategy: Build-up + incident-driven
Level: L1–L2 (Operations → Applied)
Time: 90–120 minutes
Prerequisites: None required (but you'll move faster if you've read Python for Ops — The Bash Expert's Bridge)
The Mission¶
It's Monday morning. You're a platform engineer responsible for three Kubernetes clusters, a Prometheus stack, and a fleet of AWS resources. Every morning, you open six browser tabs: Grafana, the K8s dashboard, the AWS console, Slack, your email, PagerDuty. You click around for 15 minutes assembling a mental picture: Are the clusters healthy? Any pods crashlooping? Any alerts firing? Any EC2 instances in weird states? Any S3 buckets growing suspiciously?
Today you're going to replace those six tabs with one Python script.
Your mission: build a morning check tool that queries Kubernetes, Prometheus, and AWS, then posts a formatted summary to Slack. Along the way, you'll learn every library and pattern you need to automate real infrastructure with Python.
Part 1: Talking to APIs — requests Deep Dive¶
Every infrastructure API speaks HTTP. Kubernetes, Prometheus, AWS, Slack, PagerDuty,
Grafana, GitHub — all of them. The requests library is how Python talks HTTP.
The Basics¶
import requests
# GET — retrieve data
response = requests.get("https://httpbin.org/get", timeout=10)
print(response.status_code) # 200
print(response.json()) # parsed JSON as a Python dict
# POST — send data
response = requests.post(
"https://httpbin.org/post",
json={"hostname": "web-prod-01", "status": "healthy"},
timeout=10,
)
# PUT — replace a resource
# DELETE — remove a resource
# PATCH — partial update
That timeout=10 is not optional. Without it, your script blocks forever if the server
doesn't respond.
Gotcha: `timeout=10` sets a single timeout for both connection and read. For production scripts, split them: `timeout=(5, 30)` means "5 seconds to establish the connection, 30 seconds to read the response." An API that takes 20 seconds to compute a heavy query is fine — an API that takes 20 seconds to accept a TCP connection is down.
Headers, Auth, and Tokens¶
# Bearer token auth (most modern APIs)
headers = {
"Authorization": "Bearer eyJhbGciOiJSUzI1NiIs...",
"Accept": "application/json",
}
response = requests.get(
"https://api.example.com/v1/clusters",
headers=headers,
timeout=10,
)
# Basic auth (legacy APIs, some internal tools)
response = requests.get(
"https://prometheus.internal:9090/api/v1/query",
auth=("admin", "prom-readonly-token"),
timeout=10,
)
Sessions: Stop Repeating Yourself¶
If you're making multiple calls to the same API, use a session. It reuses TCP connections (faster), persists headers (cleaner), and enables retry logic (safer).
session = requests.Session()
session.headers.update({
"Authorization": f"Bearer {token}",
"Accept": "application/json",
})
# Now every call through this session includes those headers
clusters = session.get("https://api.example.com/v1/clusters", timeout=10)
nodes = session.get("https://api.example.com/v1/nodes", timeout=10)
Under the Hood: HTTP/1.1 supports persistent connections (keep-alive). A `requests.Session` holds a `urllib3` connection pool that reuses TCP sockets across calls to the same host. Without a session, each `requests.get()` opens a new TCP connection, does the TLS handshake (for HTTPS), sends the request, and tears down the connection. With a session hitting the same host 20 times, you do 1 TLS handshake instead of 20. For internal APIs over a VPN, this cuts latency dramatically.
Retry Logic: The Non-Negotiable¶
APIs fail. Networks have blips. Load balancers return 502 during deploys. Your script needs to handle this gracefully, not crash at 6:01 AM.
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def get_session(retries=3, backoff_factor=0.5, timeout=10):
"""Create a requests session with automatic retries."""
session = requests.Session()
retry = Retry(
total=retries,
backoff_factor=backoff_factor, # 0.5s, 1s, 2s between retries
status_forcelist=[500, 502, 503, 504],
allowed_methods=["GET", "HEAD"], # Only retry safe methods
)
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
| Parameter | What it does | Example |
|---|---|---|
| `total=3` | Maximum number of retries | 3 retries = 4 total attempts |
| `backoff_factor=0.5` | Wait time multiplier: `{factor} * 2^(attempt-1)` | 0.5s, 1s, 2s |
| `status_forcelist` | HTTP codes that trigger a retry | 500, 502, 503, 504 |
| `allowed_methods` | Only retry these HTTP methods | GET and HEAD (safe to retry) |
War Story: A monitoring script at a mid-size SaaS company ran every 60 seconds and checked 40 internal endpoints. It had no retry logic. During a routine deploy that caused 3 seconds of 502s from the load balancer, the script fired 40 "service down" alerts to Slack and paged the on-call engineer at 3 AM. The engineer woke up, checked the dashboards, and everything was green — the deploy had finished 57 seconds before they opened their laptop. After that, the team added retries with a 2-second backoff. The false alert rate dropped by 90%. The remaining 10% were real.
Flashcard Check #1¶
| Question | Answer |
|---|---|
| What happens if you call `requests.get(url)` with no timeout? | The call blocks indefinitely if the server never responds. Your script hangs, and if it's a cron job, the next invocation starts while the first is still stuck. |
| What's the difference between `timeout=10` and `timeout=(5, 30)`? | A single value sets both connect and read timeouts to 10s. The tuple sets the connect timeout to 5s and the read timeout to 30s separately. |
| Why should you only auto-retry GET and HEAD, not POST? | GET and HEAD are idempotent — repeating them has the same effect. A retried POST might create a duplicate resource. |
Part 2: JSON — The Language Infrastructure Speaks¶
Every API returns JSON. You need to be fast at parsing it, querying nested structures, and extracting the three numbers you actually care about from a 500-line response.
Parsing Responses¶
import json
# From an API response
response = session.get("https://prometheus.internal:9090/api/v1/targets", timeout=10)
data = response.json() # dict
# From a file
with open("response.json") as f:
data = json.load(f)
# From a string
data = json.loads('{"status": "success", "data": {"resultType": "vector"}}')
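Going the other way — emitting JSON from your script — is `json.dumps()`. One habit worth forming: pass `default=str` so datetimes and other non-JSON types don't crash serialization. A small sketch (the report fields are made up):

```python
import json
from datetime import datetime, timezone

report = {
    "checked_at": datetime(2026, 3, 23, 6, 0, tzinfo=timezone.utc),
    "unhealthy_pods": 3,
}

# default=str stringifies anything json can't serialize natively (here, the datetime)
text = json.dumps(report, indent=2, default=str)
print(text)
```

Without `default=str`, the `datetime` raises `TypeError: Object of type datetime is not JSON serializable`.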
Navigating Nested JSON¶
Kubernetes and AWS responses are deeply nested. You need a strategy.
# A real Kubernetes pod response (simplified)
pod = {
"metadata": {
"name": "web-api-7d8f9c6b4-xk2p9",
"namespace": "production",
"labels": {"app": "web-api", "version": "v2.3.1"},
},
"status": {
"phase": "Running",
"containerStatuses": [
{
"name": "web-api",
"ready": True,
"restartCount": 0,
"state": {"running": {"startedAt": "2026-03-23T04:12:00Z"}},
},
{
"name": "envoy-sidecar",
"ready": True,
"restartCount": 3,
"state": {"running": {"startedAt": "2026-03-23T06:45:00Z"}},
},
],
},
}
# Extracting what you need
name = pod["metadata"]["name"]
namespace = pod["metadata"]["namespace"]
phase = pod["status"]["phase"]
# Safe access for optional fields (use .get())
node = pod["status"].get("hostIP", "unknown")
# Iterate container statuses
for container in pod["status"]["containerStatuses"]:
if container["restartCount"] > 2:
print(f" WARNING: {container['name']} has restarted {container['restartCount']} times")
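When a path might be missing at any level, chained `.get()` calls get verbose. A tiny helper like this (hypothetical — not from any library used in this lesson) keeps deep lookups readable:

```python
def dig(data, *keys, default=None):
    """Walk nested dicts/lists safely; return default if any step is missing."""
    current = data
    for key in keys:
        try:
            current = current[key]
        except (KeyError, IndexError, TypeError):
            return default
    return current

pod = {"status": {"containerStatuses": [{"name": "web-api", "restartCount": 0}]}}
print(dig(pod, "status", "containerStatuses", 0, "name"))   # web-api
print(dig(pod, "status", "hostIP", default="unknown"))      # unknown
```

Integer keys index into lists, string keys into dicts, and any miss falls back to the default instead of raising.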
jmespath: jq for Python¶
When you're tired of writing three levels of dict access and list comprehensions,
jmespath gives you jq-style queries in Python.
import jmespath
# Instead of:
names = [c["name"] for c in pod["status"]["containerStatuses"] if c["restartCount"] > 0]
# You can write:
names = jmespath.search("status.containerStatuses[?restartCount > `0`].name", pod)
# Query Prometheus results
prom_response = {
"data": {
"result": [
{"metric": {"instance": "web-01:9090"}, "value": [1711152000, "0.85"]},
{"metric": {"instance": "web-02:9090"}, "value": [1711152000, "0.42"]},
]
}
}
instances = jmespath.search("data.result[].metric.instance", prom_response)
# ['web-01:9090', 'web-02:9090']
Trivia: jmespath (pronounced "James path") was created by James Saryerwinnie while working on the AWS CLI. It's the query language that powers `--query` in the `aws` CLI: `aws ec2 describe-instances --query 'Reservations[].Instances[].InstanceId'`. The Python library is a direct port of the same specification. If you already know `--query` syntax from the AWS CLI, you already know jmespath.
Part 3: Working with YAML — Reading the Configs That Run Everything¶
Kubernetes manifests. Ansible inventories. Helm values. Docker Compose files. YAML is everywhere in infrastructure, and Python reads it natively with PyYAML.
import yaml
# Read a Kubernetes deployment manifest
with open("deployment.yaml") as f:
manifest = yaml.safe_load(f)
# Access fields just like JSON
replicas = manifest["spec"]["replicas"]
image = manifest["spec"]["template"]["spec"]["containers"][0]["image"]
print(f"Deployment: {manifest['metadata']['name']}, replicas: {replicas}, image: {image}")
# Read a multi-document YAML file (multiple --- separated docs)
with open("all-resources.yaml") as f:
for doc in yaml.safe_load_all(f):
if doc is None:
continue
kind = doc.get("kind", "unknown")
name = doc.get("metadata", {}).get("name", "unnamed")
print(f" {kind}: {name}")
Gotcha: Always use `yaml.safe_load()`, never `yaml.load()`. The unsafe version can execute arbitrary Python code embedded in the YAML. This is not a theoretical risk — it's a known attack vector. If you see `yaml.load(f)` without a `Loader` argument in a codebase, that's a security bug. PyYAML has even warned about it since version 5.1.
YAML's Type Surprises¶
# YAML has automatic type inference that bites people
import yaml
# These are all valid YAML — and none of them are strings
gotchas = yaml.safe_load("""
norway_code: NO # boolean False (YAML 1.1)
version: 1.10 # float 1.1 (trailing zero dropped)
timestamp: 2026-03-23 # datetime object
port: 8080 # integer
octal_trap: 0777 # integer 511 (octal in YAML 1.1)
""")
print(type(gotchas["port"])) # <class 'int'> — fine
print(type(gotchas["version"])) # <class 'float'> — 1.1, not "1.10"!
print(gotchas["norway_code"]) # False — not the string "NO"
Trivia: The "Norway problem" is famous in YAML circles. The country code NO (Norway) parsing as the boolean False, and version strings like 1.10 being truncated to the float 1.1, have caused real deployment failures. This is one reason Helm and Kubernetes recommend quoting string values that could be misinterpreted. The YAML 1.2 spec (2009) dropped the yes/no/on/off booleans, but PyYAML still implements YAML 1.1.
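The standard defense is quoting. A config fragment like this (values are hypothetical) parses every field as the string you intended:

```yaml
# Quoted values survive YAML 1.1 type inference
country_code: "NO"     # the string "NO", not boolean False
app_version: "1.10"    # the string "1.10", not float 1.1
file_mode: "0777"      # a string, not the octal integer 511
```

Unquoted, all three would come back as the surprising types shown in the previous example.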
Part 4: Building CLI Tools — argparse Patterns¶
Your morning check script needs options: which clusters to check, verbosity, output format,
whether to actually post to Slack or just dry-run. argparse is in the standard library
and handles all of this.
The Pattern That Works¶
import argparse
import os
def parse_args():
parser = argparse.ArgumentParser(
description="Morning infrastructure health check",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s --cluster prod-us-east-1 --cluster prod-eu-west-1
%(prog)s --dry-run --verbose
%(prog)s --output json > morning-report.json
""",
)
parser.add_argument(
"--cluster", "-c",
action="append",
default=None,
help="Kubernetes cluster to check (repeatable, default: all)",
)
parser.add_argument(
"--output", "-o",
choices=["text", "json", "slack"],
default="text",
help="Output format (default: text)",
)
parser.add_argument(
"--dry-run", "-n",
action="store_true",
help="Show what would be sent without posting to Slack",
)
parser.add_argument(
"--verbose", "-v",
action="count",
default=0,
help="Increase verbosity (-v info, -vv debug)",
)
parser.add_argument(
"--timeout",
type=int,
default=int(os.environ.get("MORNING_CHECK_TIMEOUT", "30")),
help="API timeout in seconds (default: 30, env: MORNING_CHECK_TIMEOUT)",
)
return parser.parse_args()
Configuration Precedence¶
Real tools need configuration from multiple sources. Here's the pattern that matches how every serious CLI tool (kubectl, aws, terraform) works:
CLI flags             (highest priority)
   ↓ overrides
Environment variables
   ↓ overrides
Config file
   ↓ overrides
Defaults              (lowest priority)
import os
import yaml
from pathlib import Path
def load_config(args):
"""Load config with CLI > env > file > defaults precedence."""
# Defaults
config = {
"prometheus_url": "http://prometheus.internal:9090",
"slack_webhook": None,
"timeout": 30,
"clusters": ["prod-us-east-1"],
}
# Config file (override defaults)
config_path = Path.home() / ".config" / "morning-check" / "config.yaml"
if config_path.exists():
with open(config_path) as f:
file_config = yaml.safe_load(f) or {}
config.update({k: v for k, v in file_config.items() if v is not None})
# Environment variables (override file)
env_map = {
"PROMETHEUS_URL": "prometheus_url",
"SLACK_WEBHOOK_URL": "slack_webhook",
"MORNING_CHECK_TIMEOUT": "timeout",
}
for env_key, config_key in env_map.items():
val = os.environ.get(env_key)
if val is not None:
config[config_key] = val
# CLI args (override everything)
if args.timeout:
config["timeout"] = args.timeout
if args.cluster:
config["clusters"] = args.cluster
return config
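The same precedence can also be expressed with the standard library's `collections.ChainMap`, which searches its maps left to right — first hit wins. A sketch with made-up values:

```python
from collections import ChainMap

defaults = {"timeout": 30, "prometheus_url": "http://prometheus.internal:9090"}
file_config = {"timeout": 60}                                     # from config.yaml
env_config = {"prometheus_url": "http://prom.example.com:9090"}   # from os.environ
cli_config = {"timeout": 10}                                      # from parsed args

# Leftmost map wins: CLI > env > file > defaults
config = ChainMap(cli_config, env_config, file_config, defaults)
print(config["timeout"])         # 10 — the CLI layer wins
print(config["prometheus_url"])  # the env layer wins over the default
```

The caveat: each layer dict must only contain keys that were actually set, otherwise a layer's filler values shadow the ones below it.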
Mental Model: Think of configuration as layers in a stack. Each layer can override the one below it. Defaults are the foundation. The config file is the team's baseline. Environment variables let the deploy environment customize behavior. CLI flags let the operator override everything in the moment. This is the same model `kubectl` uses: cluster defaults < kubeconfig < `KUBECONFIG` env var < `--kubeconfig` flag.
Flashcard Check #2¶
| Question | Answer |
|---|---|
| What does `yaml.safe_load()` protect you from that `yaml.load()` doesn't? | `safe_load()` only deserializes basic Python types. `yaml.load()` can instantiate arbitrary Python objects, enabling code execution from untrusted YAML files. |
| In what order should a CLI tool check configuration sources? | CLI flags > environment variables > config file > defaults. Most specific wins. This matches kubectl, aws CLI, terraform, and most serious tools. |
| Why does ``jmespath.search("status.containerStatuses[?restartCount > `0`].name", pod)`` use backticks around 0? | In jmespath, backticks denote literal values. Without them, 0 would be interpreted as a field name. The backticks tell jmespath "this is the number zero, not a key called 0." |
Part 5: The Kubernetes Python Client¶
You could shell out to `kubectl get pods -o json` with subprocess. But the Kubernetes Python client
gives you typed objects, watch streams, and proper error handling.
When to Use the Client vs. kubectl¶
| Use the Python client | Use kubectl (via subprocess) |
|---|---|
| Parsing pod status, events, metrics | Quick one-off: kubectl apply -f manifest.yaml |
| Watching for changes (event stream) | Interactive debugging: kubectl exec, kubectl logs |
| Building tools that query multiple resources | When the Python client's API mapping is unclear |
| Automated scaling, patching, rollouts | When you need kubectl plugins |
Setup and Authentication¶
from kubernetes import client, config
# Inside a pod (uses the mounted service account)
config.load_incluster_config()
# From your laptop (uses ~/.kube/config)
config.load_kube_config()
# Specific context from kubeconfig
config.load_kube_config(context="prod-us-east-1")
# Create API clients
v1 = client.CoreV1Api() # pods, services, nodes, events
apps_v1 = client.AppsV1Api() # deployments, statefulsets, daemonsets
Listing Pods and Checking Health¶
def get_unhealthy_pods(namespace="default"):
"""Find pods that aren't running happily."""
v1 = client.CoreV1Api()
pods = v1.list_namespaced_pod(namespace)
unhealthy = []
for pod in pods.items:
issues = []
# Check pod phase
if pod.status.phase not in ("Running", "Succeeded"):
issues.append(f"phase={pod.status.phase}")
# Check container statuses
if pod.status.container_statuses:
for cs in pod.status.container_statuses:
if cs.restart_count > 5:
issues.append(f"{cs.name}: {cs.restart_count} restarts")
if cs.state.waiting:
issues.append(f"{cs.name}: waiting ({cs.state.waiting.reason})")
if issues:
unhealthy.append({
"name": pod.metadata.name,
"namespace": pod.metadata.namespace,
"issues": issues,
})
return unhealthy
Reading Events — The Cluster's Diary¶
from datetime import datetime, timedelta, timezone
def get_recent_warnings(minutes=30):
"""Get warning events from the last N minutes."""
v1 = client.CoreV1Api()
events = v1.list_event_for_all_namespaces()
cutoff = datetime.now(timezone.utc) - timedelta(minutes=minutes)
warnings = []
for event in events.items:
if event.type != "Warning":
continue
event_time = event.last_timestamp or event.event_time
if event_time and event_time.replace(tzinfo=timezone.utc) > cutoff:
warnings.append({
"namespace": event.metadata.namespace,
"object": event.involved_object.name,
"reason": event.reason,
"message": event.message,
"count": event.count or 1,
})
return warnings
Scaling a Deployment¶
def scale_deployment(name, namespace, replicas):
"""Scale a deployment to the desired replica count."""
apps_v1 = client.AppsV1Api()
body = {"spec": {"replicas": replicas}}
apps_v1.patch_namespaced_deployment_scale(
name=name,
namespace=namespace,
body=body,
)
print(f"Scaled {namespace}/{name} to {replicas} replicas")
Under the Hood: The Kubernetes Python client is auto-generated from the Kubernetes OpenAPI specification. Every resource and every API version has a corresponding class. This is why the method names are long — `list_namespaced_pod`, `patch_namespaced_deployment_scale` — they mirror the REST API paths exactly. The upside: if you know the API path (`GET /api/v1/namespaces/{ns}/pods`), you can predict the method name.
Part 6: Querying Prometheus¶
Prometheus has an HTTP API. You don't need a special client library — just requests and
the PromQL query language.
Instant Queries¶
def query_prometheus(prom_url, query, timeout=10):
"""Run a PromQL query and return results."""
session = get_session()
response = session.get(
f"{prom_url}/api/v1/query",
params={"query": query},
timeout=timeout,
)
response.raise_for_status()
data = response.json()
if data["status"] != "success":
raise RuntimeError(f"Prometheus query failed: {data.get('error', 'unknown')}")
return data["data"]["result"]
Practical Queries for Your Morning Check¶
prom_url = "http://prometheus.internal:9090"
# Error rate across all services (last 5 minutes)
error_rate = query_prometheus(
prom_url,
'sum(rate(http_requests_total{status=~"5.."}[5m])) / sum(rate(http_requests_total[5m]))',
)
# Nodes with high CPU (over 80%)
hot_nodes = query_prometheus(
prom_url,
'100 - (avg by(instance) (rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) > 80',
)
# Disk space predictions: disks that will fill in 24 hours
filling_disks = query_prometheus(
prom_url,
'predict_linear(node_filesystem_free_bytes{mountpoint="/"}[6h], 24*3600) < 0',
)
# Parse the results
for result in hot_nodes:
instance = result["metric"]["instance"]
cpu_pct = float(result["value"][1])
print(f" {instance}: {cpu_pct:.1f}% CPU")
Checking Alert Status¶
def get_firing_alerts(prom_url):
"""Get all currently firing alerts."""
session = get_session()
response = session.get(f"{prom_url}/api/v1/alerts", timeout=10)
response.raise_for_status()
data = response.json()
firing = []
for alert in data["data"]["alerts"]:
if alert["state"] == "firing":
firing.append({
"name": alert["labels"]["alertname"],
"severity": alert["labels"].get("severity", "unknown"),
"summary": alert["annotations"].get("summary", ""),
"since": alert["activeAt"],
})
return firing
Remember: Prometheus responses always have the structure `{"status": "success", "data": {"resultType": "...", "result": [...]}}`. The actual metric values live inside `result[N]["value"][1]` — as strings, so always convert with `float()`. The `value[0]` is the Unix timestamp of the sample.
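That shape is regular enough to flatten with a small helper — hypothetical, but it follows the response structure described above — turning an instant-vector result into a plain `{label: float}` dict:

```python
def vector_to_dict(result, label="instance"):
    """Flatten a Prometheus instant-vector result into {label_value: float}."""
    return {
        item["metric"].get(label, "unknown"): float(item["value"][1])
        for item in result
    }

# The sample mimics the "result" list of a real /api/v1/query response
sample = [
    {"metric": {"instance": "web-01:9100"}, "value": [1711152000, "87.5"]},
    {"metric": {"instance": "web-02:9100"}, "value": [1711152000, "42.0"]},
]
print(vector_to_dict(sample))  # {'web-01:9100': 87.5, 'web-02:9100': 42.0}
```

Pass a different `label` (e.g. `"job"` or `"service"`) to key the dict on whatever label your query groups by.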
Part 7: AWS with boto3¶
boto3 is the AWS SDK for Python. It wraps every AWS API.
Session Management and the Credential Chain¶
import boto3
from botocore.exceptions import ClientError
# Default session — uses the standard credential chain
ec2 = boto3.client("ec2", region_name="us-east-1")
# Explicit profile (from ~/.aws/credentials)
session = boto3.Session(profile_name="prod-readonly")
ec2 = session.client("ec2")
s3 = session.client("s3")
The credential chain (checked in order):

1. Explicit parameters (`aws_access_key_id=...`) — never do this
2. Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
3. Shared credentials file (`~/.aws/credentials`)
4. AWS config file (`~/.aws/config`)
5. Container credentials (ECS task role)
6. Instance metadata (EC2 instance profile / IAM role)
Gotcha: boto3 reads credentials lazily — it doesn't check them until you make your first API call. This means `boto3.client("ec2")` always succeeds, even with bad credentials. The error hits when you call `ec2.describe_instances()`. Always test your credentials early in the script: `boto3.client("sts").get_caller_identity()`.
EC2: Instance Health for Your Morning Check¶
def get_instance_summary(region="us-east-1"):
    """Get a summary of EC2 instance states."""
    ec2 = boto3.client("ec2", region_name=region)
    paginator = ec2.get_paginator("describe_instances")
    summary = {"running": 0, "stopped": 0, "terminated": 0, "other": 0}
    problems = []
    for page in paginator.paginate():
        for reservation in page["Reservations"]:
            for instance in reservation["Instances"]:
                state = instance["State"]["Name"]
                summary[state] = summary.get(state, 0) + 1
    # One call for the whole region (not one per instance):
    # flag instances with failed (impaired) status checks
    statuses = ec2.describe_instance_status(
        Filters=[{"Name": "instance-status.status", "Values": ["impaired"]}]
    )
    for status in statuses.get("InstanceStatuses", []):
        problems.append({
            "instance_id": status["InstanceId"],
            "status": status["InstanceStatus"]["Status"],
            "system": status["SystemStatus"]["Status"],
        })
    return summary, problems
Trivia: boto3 (released 2015) is named after the Amazon river dolphin, the boto. The original library was just "boto" (by Mitch Garnaat, 2006), then "boto2." The name was chosen because boto dolphins are native to the Amazon — a playful reference to Amazon Web Services. boto3 is consistently among the most-downloaded packages on PyPI, at hundreds of millions of downloads per month, making it the most-used AWS SDK in any programming language.
Flashcard Check #3¶
| Question | Answer |
|---|---|
| What's the first thing boto3 checks for credentials? | Explicit parameters passed to the client constructor. Then environment variables, then `~/.aws/credentials`, then the config file, then container credentials, then EC2 instance metadata. |
| Why must you use paginators for AWS `list_*` and `describe_*` calls? | Most AWS APIs return at most 100–1000 items per page. Without pagination, you silently miss items beyond the first page. A script that works in dev (12 instances) breaks in prod (2000 instances). |
| What Prometheus API endpoint do you query for current alert status? | `GET /api/v1/alerts` returns all alerts with their current state (firing, pending, inactive). |
Part 8: Sending Slack Notifications¶
Your morning check needs to tell you the results. Slack incoming webhooks are the simplest integration — one POST request with a JSON body.
def send_slack_message(webhook_url, text):
"""Send a message to Slack via incoming webhook."""
response = requests.post(
webhook_url,
json={"text": text},
timeout=10,
)
response.raise_for_status()
# Usage
send_slack_message(
os.environ["SLACK_WEBHOOK_URL"],
"*Morning Check*\nK8s: 3 unhealthy pods\nAlerts: 1 firing\nAWS: 47 running, 12 stopped",
)
For richer formatting, Slack's Block Kit lets you build structured messages with sections,
headers, and markdown. The json payload takes a blocks key alongside text (which
serves as the plain-text fallback for notifications).
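A Block Kit payload is just a nested dict. Here's a minimal sketch (the helper name and message content are made up; see Slack's Block Kit reference for the full schema):

```python
def build_slack_payload(title, lines):
    """Build a minimal Block Kit payload: a header block plus one markdown section."""
    return {
        # Plain-text fallback shown in notifications and clients without Block Kit
        "text": f"{title}: " + "; ".join(lines),
        "blocks": [
            {"type": "header", "text": {"type": "plain_text", "text": title}},
            {"type": "section", "text": {"type": "mrkdwn", "text": "\n".join(lines)}},
        ],
    }

payload = build_slack_payload("Morning Check", ["K8s: 3 unhealthy pods", "Alerts: 1 firing"])
# Then post it exactly like the plain message: requests.post(webhook_url, json=payload, timeout=10)
```

Header blocks only accept `plain_text`; sections accept `mrkdwn`, which is where your `*bold*` and bullet formatting goes.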
Gotcha: Slack webhook URLs are secrets. They allow anyone with the URL to post to your channel. Never hardcode them in scripts or commit them to git. Use environment variables: `os.environ["SLACK_WEBHOOK_URL"]`. If the URL leaks, you can regenerate it in Slack's app settings — but you have to update every script that uses it.
Part 9: Retry Patterns — The tenacity Library¶
The urllib3.Retry on requests sessions handles HTTP retries. But what about retrying
any Python function — a boto3 call, a Kubernetes API call, a database connection?
tenacity: Retries for Everything¶
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from botocore.exceptions import ClientError
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
retry=retry_if_exception_type((ClientError, ConnectionError)),
)
def get_instances():
ec2 = boto3.client("ec2")
return ec2.describe_instances()
| tenacity parameter | What it does | Example |
|---|---|---|
| `stop_after_attempt(3)` | Give up after 3 tries | 3 attempts total |
| `wait_exponential(multiplier=1, min=1, max=10)` | Wait 1s, 2s, 4s, 8s... capped at 10s | Exponential backoff with ceiling |
| `retry_if_exception_type(...)` | Only retry these exception types | Don't retry `ValueError` |
| `before_sleep=log_retry` | Call a function before each retry sleep | Log retry attempts |
Why Exponential Backoff Matters¶
Linear retry (1s, 1s, 1s):
Request → fail → 1s → Request → fail → 1s → Request → fail
If 100 scripts all retry at the same interval, the target gets
100 requests every second. This is a thundering herd.
Exponential backoff (1s, 2s, 4s):
Request → fail → 1s → Request → fail → 2s → Request → fail → 4s →
After a few retries, the load on the target drops. Clients naturally
spread out. Add jitter (randomness) and they spread further.
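Jitter is easy to see in miniature. This standard-library sketch (hypothetical helpers, not tenacity's internals) simulates the delay each client picks after a shared failure:

```python
import random

def exponential_delay(attempt, base=1.0, cap=30.0):
    """Deterministic exponential backoff: 1s, 2s, 4s... capped at `cap`."""
    return min(cap, base * (2 ** (attempt - 1)))

def jittered_delay(attempt, base=1.0, cap=30.0):
    """Full jitter: sleep a uniform random amount between 0 and the exponential delay."""
    return random.uniform(0, exponential_delay(attempt, base, cap))

# Without jitter, 100 clients on retry attempt 3 all wake at exactly t=4.0s
print({exponential_delay(3) for _ in range(100)})  # {4.0} — a synchronized herd

# With jitter, they spread uniformly across the 0–4s window
delays = [jittered_delay(3) for _ in range(100)]
print(min(delays) >= 0.0 and max(delays) <= 4.0)   # True
```

In tenacity, you get the same effect by adding wait strategies together, e.g. `wait_exponential(...) + wait_random(0, 2)`.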
War Story: A team had a monitoring script that checked 40 endpoints. It had retries — but with a fixed 1-second delay. During a brief network partition, all 40 checks failed simultaneously and retried in lockstep. The upstream load balancer, already struggling, received 40 retry requests at the exact same second, three times in a row. The retries made the outage worse. Switching to exponential backoff with jitter (`wait_exponential` + `wait_random`) eliminated the thundering herd entirely.
Part 10: Structured Logging¶
print() is for debugging. Logging is for production scripts.
import logging
import sys
def setup_logging(verbosity=0):
"""Configure logging based on verbosity level."""
levels = {0: logging.WARNING, 1: logging.INFO, 2: logging.DEBUG}
level = levels.get(verbosity, logging.DEBUG)
handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(logging.Formatter(
"%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
))
root = logging.getLogger()
root.setLevel(level)
root.addHandler(handler)
# Quiet noisy libraries
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("kubernetes").setLevel(logging.WARNING)
logging.getLogger("botocore").setLevel(logging.WARNING)
return logging.getLogger("morning-check")
Why logging beats print:
| Feature | `print()` | `logging` |
|---|---|---|
| Goes to | stdout | stderr (by default) — doesn't pollute piped output |
| Levels | None | DEBUG, INFO, WARNING, ERROR, CRITICAL |
| Timestamps | Manual | Automatic with formatter |
| Per-library control | No | Yes — silence urllib3 while keeping your logs |
| Output destination | stdout only | stderr, file, syslog, remote — all at once |
log = setup_logging(verbosity=1)
log.info("Starting morning check for clusters=%s", config["clusters"])
log.debug("Prometheus URL: %s", config["prometheus_url"])
log.warning("Cluster %s has %d unhealthy pods", cluster, len(unhealthy))
log.error("Failed to query Prometheus: %s", err)
Gotcha: Use `log.info("Processing %s", item)`, not `log.info(f"Processing {item}")`. The %-style formatting is lazy — if the log level is disabled, the string is never formatted. With f-strings, Python formats the string even if the message is never logged. For hot loops logging thousands of debug messages, this matters.
Part 11: Error Handling for APIs¶
APIs fail in specific, predictable ways. Handle them specifically.
| Tier | What happened | HTTP codes | Example |
|---|---|---|---|
| Connection | Can't reach the API | N/A (no response) | Network down, DNS failure, timeout |
| Auth | Reached API, bad credentials | 401, 403 | Expired token, wrong IAM policy |
| Application | Authenticated, bad request | 404, 422, 429, 500 | Resource not found, rate limited |
from kubernetes.client.exceptions import ApiException
from botocore.exceptions import ClientError, BotoCoreError
# Kubernetes: ApiException carries .status (HTTP code) and .reason
try:
pods = v1.list_pod_for_all_namespaces(timeout_seconds=10)
except ApiException as e:
if e.status == 403:
log.error("RBAC denies this operation — check your ClusterRole")
else:
log.error("K8s API error: %d %s", e.status, e.reason)
# AWS: ClientError wraps the error code and message
try:
ec2.describe_instances()
except ClientError as e:
code = e.response["Error"]["Code"]
msg = e.response["Error"]["Message"]
log.error("AWS %s: %s", code, msg) # e.g. "UnauthorizedOperation: ..."
except BotoCoreError as e:
log.error("AWS connection error: %s", e) # DNS, timeout, SSL
Mental Model: Error handling for APIs has three tiers. Tier 1: connection errors (can't reach the API — network, DNS, timeout). Tier 2: auth errors (reached the API but credentials are bad — 401/403). Tier 3: application errors (authenticated but the request failed — 404/429/500). Handle each tier differently because the remediation is different: fix the network vs. fix the credentials vs. fix the request.
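One way to encode the three tiers is a small classifier — hypothetical, but matching the table above — that your top-level error handler can dispatch on:

```python
def classify_api_error(exc=None, status=None):
    """Map a failure to a remediation tier (hypothetical helper).

    exc:    an exception raised before any response arrived (network/DNS/timeout)
    status: the HTTP status code, if a response did arrive
    """
    if status is None:
        return "connection"    # tier 1: never reached the API — fix the network
    if status in (401, 403):
        return "auth"          # tier 2: reached it, credentials rejected — fix creds
    return "application"       # tier 3: authenticated, request itself failed — fix request

print(classify_api_error(exc=TimeoutError()))  # connection
print(classify_api_error(status=403))          # auth
print(classify_api_error(status=429))          # application
```

The payoff is uniform remediation messages: one tier means "check the VPN," one means "rotate the token," one means "read the error body."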
Part 12: Putting It Together — The Morning Check¶
Here's the skeleton that connects every piece from this lesson. Each function below is defined in the parts above — this is how they wire together.
#!/usr/bin/env python3
"""Morning infrastructure health check.
Queries Kubernetes, Prometheus, and AWS, then posts a summary to Slack.
"""
import argparse
import json
import os
import sys
from datetime import datetime, timezone
from kubernetes import config as k8s_config
# Functions from earlier parts:
# get_session() — Part 1 (requests with retries)
# setup_logging() — Part 10 (structured logging)
# get_unhealthy_pods() — Part 5 (K8s client)
# get_firing_alerts() — Part 6 (Prometheus API)
# query_prometheus() — Part 6
# get_instance_summary() — Part 7 (boto3)
# send_slack_message() — Part 8 (Slack webhook)
# load_config() — Part 4 (config precedence)
# format_text_report() — plain-text summary helper (you write this one yourself)
def main():
parser = argparse.ArgumentParser(description="Morning infrastructure health check")
parser.add_argument("--cluster", "-c", action="append", help="K8s context (repeatable)")
parser.add_argument("--region", "-r", action="append", help="AWS region (repeatable)")
parser.add_argument("--prometheus-url",
default=os.environ.get("PROMETHEUS_URL", "http://prometheus.internal:9090"))
parser.add_argument("--slack-webhook", default=os.environ.get("SLACK_WEBHOOK_URL"))
parser.add_argument("--output", "-o", choices=["text", "json"], default="text")
parser.add_argument("--dry-run", "-n", action="store_true")
parser.add_argument("--verbose", "-v", action="count", default=0)
args = parser.parse_args()
log = setup_logging(args.verbose)
session = get_session()
clusters = args.cluster or ["prod-us-east-1"]
regions = args.region or ["us-east-1"]
# Run all checks
results = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"kubernetes": [],
"prometheus": {},
"aws": [],
}
for cluster in clusters:
log.info("Checking K8s cluster: %s", cluster)
try:
k8s_config.load_kube_config(context=cluster)
unhealthy = get_unhealthy_pods()
results["kubernetes"].append({"cluster": cluster, "unhealthy": unhealthy})
except Exception as e:
results["kubernetes"].append({"cluster": cluster, "error": str(e)})
log.info("Checking Prometheus")
results["prometheus"] = {
"firing_alerts": get_firing_alerts(args.prometheus_url),
"hot_nodes": query_prometheus(args.prometheus_url,
'100 - (avg by(instance) (rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) > 80'),
}
for region in regions:
log.info("Checking AWS %s", region)
summary, problems = get_instance_summary(region)
results["aws"].append({"region": region, "summary": summary, "problems": problems})
# Output
if args.output == "json":
print(json.dumps(results, indent=2, default=str))
else:
print(format_text_report(results))
# Slack
if args.slack_webhook and not args.dry_run:
send_slack_message(args.slack_webhook, format_text_report(results))
log.info("Posted summary to Slack")
if __name__ == "__main__":
main()
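The skeleton calls `format_text_report()`, which isn't shown above. A minimal sketch, matching the shape of the `results` dict built in `main()` (the exact wording and layout here are illustrative):

```python
def format_text_report(results):
    """Render the results dict from main() as a plain-text summary (illustrative)."""
    lines = [f"Morning check @ {results['timestamp']}"]
    for c in results["kubernetes"]:
        if "error" in c:
            lines.append(f"  K8s {c['cluster']}: ERROR {c['error']}")
        else:
            lines.append(f"  K8s {c['cluster']}: {len(c['unhealthy'])} unhealthy pod(s)")
    alerts = results["prometheus"].get("firing_alerts", [])
    lines.append(f"  Prometheus: {len(alerts)} firing alert(s)")
    for r in results["aws"]:
        lines.append(f"  AWS {r['region']}: {len(r.get('problems', []))} problem(s)")
    return "\n".join(lines)
```

Because the same string goes to stdout and to Slack, keeping it plain text (no ANSI colors) means one renderer serves both.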
Run it:
# Basic check with text output
python3 morning-check.py -v
# Check multiple clusters, JSON output
python3 morning-check.py -c prod-us-east-1 -c prod-eu-west-1 -o json
# Full run with Slack posting
export SLACK_WEBHOOK_URL="https://hooks.slack.com/services/T00/B00/xxxx"
python3 morning-check.py -c prod-us-east-1 -r us-east-1 -r eu-west-1 -v
# Dry run to test without posting
python3 morning-check.py --dry-run -vv
Exercises¶
Exercise 1: Add a Prometheus Query (5 minutes)¶
Add a check for disk space predictions to the check_prometheus function, using a PromQL predict_linear() query over node_filesystem_avail_bytes (the standard disk-forecast pattern).
Parse the results and include them in the output.
Hint
Follow the same pattern as the `hot_nodes` query. Query the `/api/v1/query` endpoint, parse `data.result`, extract `metric.instance` and the predicted value.

Exercise 2: Add Error Counting (10 minutes)¶
Add a function that queries Prometheus for the 5xx error rate across all services (a rate() over the 5xx-status request counter, summed by service).
Flag any service where the 5xx rate exceeds 0.1 requests/second.
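One possible shape for that check — the PromQL below is an assumption (a common 5xx-rate pattern), and `parse_error_rates` is split out from the HTTP call so the parsing can be tested without a live Prometheus:

```python
import requests

# Assumed query: per-service rate of 5xx-status requests over 5 minutes.
FIVE_XX_QUERY = 'sum by (service) (rate(http_requests_total{status=~"5.."}[5m]))'

def parse_error_rates(payload, threshold=0.1):
    """Return [(service, rate)] for services above the threshold (req/s)."""
    noisy = []
    for result in payload["data"]["result"]:
        service = result["metric"].get("service", "unknown")
        rate = float(result["value"][1])  # instant vector: [timestamp, "value"]
        if rate > threshold:
            noisy.append((service, rate))
    return noisy

def check_error_rates(prom_url, timeout=10):
    resp = requests.get(f"{prom_url}/api/v1/query",
                        params={"query": FIVE_XX_QUERY}, timeout=timeout)
    resp.raise_for_status()
    return parse_error_rates(resp.json())
```

Note that Prometheus returns sample values as strings, so the `float()` conversion is required before comparing against the threshold.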
Exercise 3: Build a Config File (15 minutes)¶
Create a YAML config file format for the morning check:
clusters:
- context: prod-us-east-1
critical: true
- context: staging-us-east-1
critical: false
prometheus:
url: http://prometheus.internal:9090
alert_severity_threshold: warning
aws:
regions: [us-east-1, eu-west-1]
profile: prod-readonly
slack:
webhook_env: SLACK_WEBHOOK_URL
channel: "#ops-morning"
Modify main() to load this config and merge it with CLI args (CLI wins).
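A starting sketch for the merge, assuming `args` is the argparse Namespace from main(). `merged_clusters` is an illustrative helper name, and this `load_config` is a minimal stand-in (your Part 4 version may differ):

```python
import os
import yaml

def load_config(path="morning-check.yaml"):
    """Load the YAML config if present; missing file means empty config."""
    if not os.path.exists(path):
        return {}
    with open(path) as f:
        return yaml.safe_load(f) or {}

def merged_clusters(args, config):
    """CLI flags win; fall back to the config file, then a hard default."""
    if args.cluster:
        return args.cluster
    from_file = [c["context"] for c in config.get("clusters", [])]
    return from_file or ["prod-us-east-1"]
```

The same pattern repeats for regions and the Prometheus URL: check the CLI value first, then the config dict, then the default.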
Exercise 4: Add Concurrent Checks (20 minutes)¶
The script currently checks clusters sequentially. Use concurrent.futures.ThreadPoolExecutor
to check all clusters, Prometheus, and AWS regions in parallel.
Hint
from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=10) as executor:
    k8s_futures = {executor.submit(check_kubernetes, c, log): c for c in clusters}
    aws_futures = {executor.submit(check_aws, r, log): r for r in regions}
    prom_future = executor.submit(check_prometheus, args.prometheus_url, session, log)
    # Collect results as they complete
    k8s_results = [f.result() for f in as_completed(k8s_futures)]
Cheat Sheet¶
| Task | Code |
|---|---|
| HTTP GET with retries | session = get_session(); session.get(url, timeout=10) |
| Parse JSON response | data = response.json() |
| Query nested JSON | jmespath.search("data.result[].metric.instance", data) |
| Read YAML safely | yaml.safe_load(open("file.yaml")) |
| Prometheus instant query | GET /api/v1/query?query=up |
| Prometheus firing alerts | GET /api/v1/alerts |
| K8s list pods | v1.list_namespaced_pod(namespace) |
| K8s all-namespace events | v1.list_event_for_all_namespaces() |
| boto3 describe instances | ec2.describe_instances() — use paginator! |
| boto3 credential test | boto3.client("sts").get_caller_identity() |
| Slack webhook | requests.post(webhook_url, json={"text": msg}) |
| Retry any function | @retry(stop=stop_after_attempt(3), wait=wait_exponential()) |
| Structured logging | logging.basicConfig(level=logging.INFO, format="%(asctime)s ...") |
| Quiet noisy libraries | logging.getLogger("urllib3").setLevel(logging.WARNING) |
| Config precedence | CLI flags > env vars > config file > defaults |
| argparse subcommands | subparsers = parser.add_subparsers(dest="command") |
| Safe YAML string | Always quote values that look like booleans or numbers |
Takeaways¶
- Every infrastructure API is HTTP. Learn `requests` with sessions and retries, and you can talk to anything — Kubernetes, Prometheus, AWS, Slack, GitHub, PagerDuty.
- Retry with exponential backoff is not optional. Scripts without retries generate false alerts. Scripts with fixed-interval retries cause thundering herds. Use `urllib3.Retry` for HTTP or `tenacity` for everything else.
- The kubernetes Python client beats kubectl for automation. Typed objects, watch streams, and proper error handling. Use kubectl for interactive work and the client for scripts.
- boto3 pagination is the most common AWS scripting bug. If your script works in dev but misses resources in prod, check for missing paginators. Every `list_*` and `describe_*` call needs one.
- Configuration has a precedence stack. CLI flags > env vars > config file > defaults. This matches every serious infrastructure tool, and your users will expect it.
- `yaml.safe_load()`, always. The unsafe `yaml.load()` executes arbitrary code. This is not theoretical — it's a known attack vector with real CVEs.
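The pagination takeaway, spelled out: a boto3 paginator is essentially a NextToken loop. The sketch below implements that loop by hand against any client-like object, which is also why the bug is so sneaky — without the loop you silently get only the first page. In real scripts, prefer `client.get_paginator("describe_instances")` and let boto3 do this for you:

```python
def list_all_instances(client):
    """Collect every instance across pages. A bare describe_instances call
    returns at most one page — the classic 'works in dev, misses resources
    in prod' bug this loop (and boto3 paginators) exists to prevent."""
    instances, token = [], None
    while True:
        kwargs = {"NextToken": token} if token else {}
        page = client.describe_instances(**kwargs)
        for reservation in page.get("Reservations", []):
            instances.extend(reservation.get("Instances", []))
        token = page.get("NextToken")
        if not token:
            return instances
```

The `client` argument can be a real `boto3.client("ec2")` or any test double exposing the same `describe_instances(**kwargs)` shape.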
Related Lessons¶
- Python for Ops — The Bash Expert's Bridge — start here if you're coming from pure Bash
- Prometheus and the Art of Not Alerting — the alerting philosophy behind the metrics you're querying
- Kubernetes Debugging: When Pods Won't Behave — deeper dive into K8s troubleshooting
- AWS IAM — The Permissions Puzzle — understanding the credential chain and IAM policies
- Why YAML Keeps Breaking Your Deploys — more on YAML's type surprises