Python: Automating Everything — APIs and Infrastructure

  • lesson
  • python
  • requests
  • boto3
  • kubernetes-client
  • prometheus-api
  • json/yaml
  • argparse
  • logging
  • retries
  • slack-webhooks
  • cli-tools

Topics: Python, requests, boto3, kubernetes-client, Prometheus API, JSON/YAML, argparse, logging, retries, Slack webhooks, CLI tools
Strategy: Build-up + incident-driven
Level: L1–L2 (Operations → Applied)
Time: 90–120 minutes
Prerequisites: None required (but you'll move faster if you've read Python for Ops — The Bash Expert's Bridge)


The Mission

It's Monday morning. You're a platform engineer responsible for three Kubernetes clusters, a Prometheus stack, and a fleet of AWS resources. Every morning, you open six browser tabs: Grafana, the K8s dashboard, the AWS console, Slack, your email, PagerDuty. You click around for 15 minutes assembling a mental picture: Are the clusters healthy? Any pods crashlooping? Any alerts firing? Any EC2 instances in weird states? Any S3 buckets growing suspiciously?

Today you're going to replace those six tabs with one Python script.

Your mission: build a morning check tool that queries Kubernetes, Prometheus, and AWS, then posts a formatted summary to Slack. Along the way, you'll learn every library and pattern you need to automate real infrastructure with Python.


Part 1: Talking to APIs — requests Deep Dive

Every infrastructure API speaks HTTP. Kubernetes, Prometheus, AWS, Slack, PagerDuty, Grafana, GitHub — all of them. The requests library is how Python talks HTTP.

The Basics

import requests

# GET — retrieve data
response = requests.get("https://httpbin.org/get", timeout=10)
print(response.status_code)   # 200
print(response.json())        # parsed JSON as a Python dict

# POST — send data
response = requests.post(
    "https://httpbin.org/post",
    json={"hostname": "web-prod-01", "status": "healthy"},
    timeout=10,
)

# PUT — replace a resource
# DELETE — remove a resource
# PATCH — partial update

That timeout=10 is not optional. Without it, your script blocks forever if the server doesn't respond.

Gotcha: timeout=10 sets a single timeout for both connection and read. For production scripts, split them: timeout=(5, 30) means "5 seconds to establish the connection, 30 seconds to read the response." An API that takes 20 seconds to compute a heavy query is fine — an API that takes 20 seconds to accept a TCP connection is down.

Headers, Auth, and Tokens

# Bearer token auth (most modern APIs)
headers = {
    "Authorization": "Bearer eyJhbGciOiJSUzI1NiIs...",
    "Accept": "application/json",
}
response = requests.get(
    "https://api.example.com/v1/clusters",
    headers=headers,
    timeout=10,
)

# Basic auth (legacy APIs, some internal tools)
response = requests.get(
    "https://prometheus.internal:9090/api/v1/query",
    auth=("admin", "prom-readonly-token"),
    timeout=10,
)

Sessions: Stop Repeating Yourself

If you're making multiple calls to the same API, use a session. It reuses TCP connections (faster), persists headers (cleaner), and enables retry logic (safer).

session = requests.Session()
session.headers.update({
    "Authorization": f"Bearer {token}",
    "Accept": "application/json",
})

# Now every call through this session includes those headers
clusters = session.get("https://api.example.com/v1/clusters", timeout=10)
nodes = session.get("https://api.example.com/v1/nodes", timeout=10)

Under the Hood: HTTP/1.1 supports persistent connections (keep-alive). A requests.Session holds a urllib3 connection pool that reuses TCP sockets across calls to the same host. Without a session, each requests.get() opens a new TCP connection, does the TLS handshake (for HTTPS), sends the request, and tears down the connection. With a session hitting the same host 20 times, you do 1 TLS handshake instead of 20. For internal APIs over a VPN, this cuts latency dramatically.

Retry Logic: The Non-Negotiable

APIs fail. Networks have blips. Load balancers return 502 during deploys. Your script needs to handle this gracefully, not crash at 6:01 AM.

from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def get_session(retries=3, backoff_factor=0.5):
    """Create a requests session with automatic retries.

    A Session has no default timeout, so keep passing timeout= on every call.
    """
    session = requests.Session()
    retry = Retry(
        total=retries,
        backoff_factor=backoff_factor,     # about 0s, 1s, 2s: the first retry is immediate
        status_forcelist=[500, 502, 503, 504],
        allowed_methods=["GET", "HEAD"],   # Only retry safe methods
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session

Parameter | What it does | Example
total=3 | Maximum number of retries | 3 retries = 4 total attempts
backoff_factor=0.5 | Wait time multiplier: {factor} * 2^(previous retries), with no wait before the first retry | 0s, 1s, 2s
status_forcelist | HTTP codes that trigger a retry | 500, 502, 503, 504
allowed_methods | Only retry these HTTP methods | GET and HEAD (safe to retry)

War Story: A monitoring script at a mid-size SaaS company ran every 60 seconds and checked 40 internal endpoints. It had no retry logic. During a routine deploy that caused 3 seconds of 502s from the load balancer, the script fired 40 "service down" alerts to Slack and paged the on-call engineer at 3 AM. The engineer woke up, checked the dashboards, and everything was green — the deploy had finished 57 seconds before they opened their laptop. After that, the team added retries with a 2-second backoff. The false alert rate dropped by 90%. The remaining 10% were real.


Flashcard Check #1

Question | Answer
What happens if you call requests.get(url) with no timeout? | The call blocks indefinitely if the server never responds. Your script hangs, and if it's a cron job, the next invocation starts while the first is still stuck.
What's the difference between timeout=10 and timeout=(5, 30)? | Single value sets both connect and read timeout to 10s. Tuple sets connect timeout to 5s and read timeout to 30s separately.
Why should you only auto-retry GET and HEAD, not POST? | GET and HEAD are idempotent: repeating them has the same effect. A retried POST might create a duplicate resource.

Part 2: JSON — The Language Infrastructure Speaks

Every API returns JSON. You need to be fast at parsing it, querying nested structures, and extracting the three numbers you actually care about from a 500-line response.

Parsing Responses

import json

# From an API response
response = session.get("https://prometheus.internal:9090/api/v1/targets", timeout=10)
data = response.json()  # dict

# From a file
with open("response.json") as f:
    data = json.load(f)

# From a string
data = json.loads('{"status": "success", "data": {"resultType": "vector"}}')

Kubernetes and AWS responses are deeply nested. You need a strategy.

# A real Kubernetes pod response (simplified)
pod = {
    "metadata": {
        "name": "web-api-7d8f9c6b4-xk2p9",
        "namespace": "production",
        "labels": {"app": "web-api", "version": "v2.3.1"},
    },
    "status": {
        "phase": "Running",
        "containerStatuses": [
            {
                "name": "web-api",
                "ready": True,
                "restartCount": 0,
                "state": {"running": {"startedAt": "2026-03-23T04:12:00Z"}},
            },
            {
                "name": "envoy-sidecar",
                "ready": True,
                "restartCount": 3,
                "state": {"running": {"startedAt": "2026-03-23T06:45:00Z"}},
            },
        ],
    },
}

# Extracting what you need
name = pod["metadata"]["name"]
namespace = pod["metadata"]["namespace"]
phase = pod["status"]["phase"]

# Safe access for optional fields (use .get())
node = pod["status"].get("hostIP", "unknown")

# Iterate container statuses
for container in pod["status"]["containerStatuses"]:
    if container["restartCount"] > 2:
        print(f"  WARNING: {container['name']} has restarted {container['restartCount']} times")
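
Chained .get() calls get noisy once a path is three or four levels deep. One possible approach is a small helper that walks a path of keys and list indexes and falls back to a default when any step is missing. The name dig and its signature are assumptions made for this sketch; they are not part of any library.

```python
def dig(obj, *path, default=None):
    """Walk nested dicts/lists along path; return default if any step is missing."""
    current = obj
    for key in path:
        try:
            current = current[key]
        except (KeyError, IndexError, TypeError):
            return default
    return current


# Trimmed-down pod, shaped like the example above
pod = {
    "metadata": {"name": "web-api-7d8f9c6b4-xk2p9"},
    "status": {
        "phase": "Running",
        "containerStatuses": [{"name": "web-api", "restartCount": 0}],
    },
}

first_container = dig(pod, "status", "containerStatuses", 0, "name")
host_ip = dig(pod, "status", "hostIP", default="unknown")
missing = dig(pod, "spec", "nodeName", default="unscheduled")
```

The trade-off is that a typo in a key silently yields the default, so this fits optional fields better than fields you expect to always exist.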

jmespath: jq for Python

When you're tired of writing three levels of dict access and list comprehensions, jmespath gives you jq-style queries in Python.

import jmespath

# Instead of:
names = [c["name"] for c in pod["status"]["containerStatuses"] if c["restartCount"] > 0]

# You can write:
names = jmespath.search("status.containerStatuses[?restartCount > `0`].name", pod)

# Query Prometheus results
prom_response = {
    "data": {
        "result": [
            {"metric": {"instance": "web-01:9090"}, "value": [1711152000, "0.85"]},
            {"metric": {"instance": "web-02:9090"}, "value": [1711152000, "0.42"]},
        ]
    }
}

instances = jmespath.search("data.result[].metric.instance", prom_response)
# ['web-01:9090', 'web-02:9090']

Trivia: jmespath (pronounced "James path") was created by James Saryerwinnie while working on the AWS CLI. It's the query language that powers --query in the aws CLI: aws ec2 describe-instances --query 'Reservations[].Instances[].InstanceId'. The AWS CLI is itself written in Python and uses this same library, so if you already know --query syntax from the AWS CLI, you already know jmespath.


Part 3: Working with YAML — Reading the Configs That Run Everything

Kubernetes manifests. Ansible inventories. Helm values. Docker Compose files. YAML is everywhere in infrastructure. Python has no YAML parser in the standard library, so the usual choice is the third-party PyYAML package (pip install pyyaml).

import yaml

# Read a Kubernetes deployment manifest
with open("deployment.yaml") as f:
    manifest = yaml.safe_load(f)

# Access fields just like JSON
replicas = manifest["spec"]["replicas"]
image = manifest["spec"]["template"]["spec"]["containers"][0]["image"]
print(f"Deployment: {manifest['metadata']['name']}, replicas: {replicas}, image: {image}")

# Read a multi-document YAML file (multiple --- separated docs)
with open("all-resources.yaml") as f:
    for doc in yaml.safe_load_all(f):
        if doc is None:
            continue
        kind = doc.get("kind", "unknown")
        name = doc.get("metadata", {}).get("name", "unnamed")
        print(f"  {kind}: {name}")

Gotcha: Always use yaml.safe_load(), never yaml.load() with an unsafe loader. The unsafe loaders can construct arbitrary Python objects, which lets a crafted YAML file execute code. This is not a theoretical risk; it's a known attack vector. If you see yaml.load(f) without a Loader argument in a codebase, treat it as a security bug. PyYAML has warned about that call since version 5.1, and since version 6.0 the Loader argument is required.

YAML's Type Surprises

# YAML has automatic type inference that bites people
import yaml

# These are all valid YAML — and none of them are strings
gotchas = yaml.safe_load("""
norway_code: NO       # boolean False (YAML 1.1)
version: 1.10         # float 1.1 (trailing zero dropped)
timestamp: 2026-03-23 # datetime.date object
port: 8080            # integer
octal_trap: 0777      # integer 511 (octal in YAML 1.1)
""")

print(type(gotchas["port"]))        # <class 'int'> — fine
print(type(gotchas["version"]))     # <class 'float'> — 1.1, not "1.10"!
print(gotchas["norway_code"])       # False — not the string "NO"

Trivia: The "Norway problem" is famous in YAML circles. Under YAML 1.1, unquoted values such as NO (the country code for Norway), yes, on, and off are parsed as booleans, and version strings like 1.10 are read as the float 1.1. Both have caused real deployment failures. This is one reason Helm and Kubernetes recommend quoting string values that could be misinterpreted. The YAML 1.2 spec (2009) fixed the boolean parsing, but PyYAML still implements YAML 1.1.


Part 4: Building CLI Tools — argparse Patterns

Your morning check script needs options: which clusters to check, verbosity, output format, whether to actually post to Slack or just dry-run. argparse is in the standard library and handles all of this.

The Pattern That Works

import argparse
import os

def parse_args():
    parser = argparse.ArgumentParser(
        description="Morning infrastructure health check",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --cluster prod-us-east-1 --cluster prod-eu-west-1
  %(prog)s --dry-run --verbose
  %(prog)s --output json > morning-report.json
        """,
    )

    parser.add_argument(
        "--cluster", "-c",
        action="append",
        default=None,
        help="Kubernetes cluster to check (repeatable, default: all)",
    )
    parser.add_argument(
        "--output", "-o",
        choices=["text", "json", "slack"],
        default="text",
        help="Output format (default: text)",
    )
    parser.add_argument(
        "--dry-run", "-n",
        action="store_true",
        help="Show what would be sent without posting to Slack",
    )
    parser.add_argument(
        "--verbose", "-v",
        action="count",
        default=0,
        help="Increase verbosity (-v info, -vv debug)",
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=None,  # resolved later, so env and config file values can still apply
        help="API timeout in seconds (default: 30, env: MORNING_CHECK_TIMEOUT)",
    )

    return parser.parse_args()
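
parse_args() also accepts an explicit argument list, which makes a parser easy to exercise without touching sys.argv. The sketch below uses a reduced parser with only two of the flags shown above; build_parser is a hypothetical helper name chosen for this example.

```python
import argparse


def build_parser():
    """Reduced version of the parser above: just --cluster and --verbose."""
    parser = argparse.ArgumentParser(prog="morning-check")
    parser.add_argument("--cluster", "-c", action="append", default=None)
    parser.add_argument("--verbose", "-v", action="count", default=0)
    return parser


parser = build_parser()

# action="append" collects repeated flags into a list; -vv counts as 2
args = parser.parse_args(["-c", "prod-us-east-1", "-c", "prod-eu-west-1", "-vv"])
clusters = args.cluster
verbosity = args.verbose

# With no flags, the defaults come through untouched
defaults = parser.parse_args([])
```

Keeping the default for --cluster as None (rather than a list) lets later code tell "flag not given" apart from "flag given".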

Configuration Precedence

Real tools need configuration from multiple sources. Here's the pattern that matches how every serious CLI tool (kubectl, aws, terraform) works:

CLI flags  →  override  →  Environment variables  →  override  →  Config file  →  override  →  Defaults
(highest)                                                                                     (lowest)

import os
import yaml
from pathlib import Path

def load_config(args):
    """Load config with CLI > env > file > defaults precedence."""
    # Defaults
    config = {
        "prometheus_url": "http://prometheus.internal:9090",
        "slack_webhook": None,
        "timeout": 30,
        "clusters": ["prod-us-east-1"],
    }

    # Config file (override defaults)
    config_path = Path.home() / ".config" / "morning-check" / "config.yaml"
    if config_path.exists():
        with open(config_path) as f:
            file_config = yaml.safe_load(f) or {}
        config.update({k: v for k, v in file_config.items() if v is not None})

    # Environment variables (override file)
    env_map = {
        "PROMETHEUS_URL": "prometheus_url",
        "SLACK_WEBHOOK_URL": "slack_webhook",
        "MORNING_CHECK_TIMEOUT": "timeout",
    }
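    for env_key, config_key in env_map.items():
        val = os.environ.get(env_key)
        if val is not None:
            # Environment values are always strings; keep timeout numeric
            config[config_key] = int(val) if config_key == "timeout" else val

    # CLI args (override everything). Lower layers only get a say if the
    # parser's default for --timeout is None, meaning "flag not given".
    if args.timeout is not None:
        config["timeout"] = args.timeout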
    if args.cluster:
        config["clusters"] = args.cluster

    return config

Mental Model: Think of configuration as layers in a stack. Each layer can override the one below it. Defaults are the foundation. The config file is the team's baseline. Environment variables let the deploy environment customize behavior. CLI flags let the operator override everything in the moment. This is the same model kubectl uses: cluster defaults < kubeconfig < KUBECONFIG env var < --kubeconfig flag.
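
The layered model maps closely onto collections.ChainMap from the standard library. This is an alternative sketch, not the approach load_config() takes above, and the values in each layer are made up for illustration.

```python
from collections import ChainMap

defaults = {"prometheus_url": "http://prometheus.internal:9090", "timeout": 30}
file_config = {"timeout": 20}   # e.g. loaded from config.yaml
env_config = {"timeout": 15}    # e.g. parsed from os.environ
cli_config = {}                 # no flags given on this run

# Lookups search left to right, so the leftmost mapping has the highest priority
config = ChainMap(cli_config, env_config, file_config, defaults)

timeout = config["timeout"]
prometheus_url = config["prometheus_url"]

# A CLI flag, when present, wins over every other layer
cli_config["timeout"] = 5
timeout_with_flag = config["timeout"]
```

Because ChainMap keeps the layers separate, you can also report which layer supplied a value when debugging a surprising setting.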


Flashcard Check #2

Question | Answer
What does yaml.safe_load() protect you from that yaml.load() doesn't? | safe_load() only deserializes basic Python types. yaml.load() with an unsafe loader can instantiate arbitrary Python objects, enabling code execution from untrusted YAML files.
In what order should a CLI tool check configuration sources? | CLI flags > environment variables > config file > defaults. Most specific wins. This matches kubectl, aws CLI, terraform, and most serious tools.
Why does jmespath.search("status.containerStatuses[?restartCount > `0`].name", pod) use backticks around 0? | In jmespath, backticks denote JSON literal values. A bare 0 is not a valid literal in a filter expression, so the query fails to parse without them. The backticks tell jmespath "this is the number zero."

Part 5: The Kubernetes Python Client

You could run kubectl get pods -o json via subprocess. But the Kubernetes Python client gives you typed objects, watch streams, and proper error handling.

When to Use the Client vs. kubectl

Use the Python client | Use kubectl (via subprocess)
Parsing pod status, events, metrics | Quick one-off: kubectl apply -f manifest.yaml
Watching for changes (event stream) | Interactive debugging: kubectl exec, kubectl logs
Building tools that query multiple resources | When the Python client's API mapping is unclear
Automated scaling, patching, rollouts | When you need kubectl plugins

Setup and Authentication

from kubernetes import client, config

# Pick ONE of the loaders below, depending on where the script runs.

# Inside a pod (uses the mounted service account)
config.load_incluster_config()

# From your laptop (uses ~/.kube/config)
config.load_kube_config()

# Specific context from kubeconfig
config.load_kube_config(context="prod-us-east-1")

# Create API clients
v1 = client.CoreV1Api()       # pods, services, nodes, events
apps_v1 = client.AppsV1Api()  # deployments, statefulsets, daemonsets

Listing Pods and Checking Health

def get_unhealthy_pods(namespace="default"):
    """Find pods that aren't running happily."""
    v1 = client.CoreV1Api()
    pods = v1.list_namespaced_pod(namespace)
    unhealthy = []

    for pod in pods.items:
        issues = []

        # Check pod phase
        if pod.status.phase not in ("Running", "Succeeded"):
            issues.append(f"phase={pod.status.phase}")

        # Check container statuses
        if pod.status.container_statuses:
            for cs in pod.status.container_statuses:
                if cs.restart_count > 5:
                    issues.append(f"{cs.name}: {cs.restart_count} restarts")
                if cs.state.waiting:
                    issues.append(f"{cs.name}: waiting ({cs.state.waiting.reason})")

        if issues:
            unhealthy.append({
                "name": pod.metadata.name,
                "namespace": pod.metadata.namespace,
                "issues": issues,
            })

    return unhealthy

Reading Events — The Cluster's Diary

from datetime import datetime, timedelta, timezone

def get_recent_warnings(minutes=30):
    """Get warning events from the last N minutes."""
    v1 = client.CoreV1Api()
    events = v1.list_event_for_all_namespaces()

    cutoff = datetime.now(timezone.utc) - timedelta(minutes=minutes)
    warnings = []

    for event in events.items:
        if event.type != "Warning":
            continue
        event_time = event.last_timestamp or event.event_time
        if event_time and event_time.replace(tzinfo=timezone.utc) > cutoff:
            warnings.append({
                "namespace": event.metadata.namespace,
                "object": event.involved_object.name,
                "reason": event.reason,
                "message": event.message,
                "count": event.count or 1,
            })

    return warnings

Scaling a Deployment

def scale_deployment(name, namespace, replicas):
    """Scale a deployment to the desired replica count."""
    apps_v1 = client.AppsV1Api()
    body = {"spec": {"replicas": replicas}}
    apps_v1.patch_namespaced_deployment_scale(
        name=name,
        namespace=namespace,
        body=body,
    )
    print(f"Scaled {namespace}/{name} to {replicas} replicas")

Under the Hood: The Kubernetes Python client is auto-generated from the Kubernetes OpenAPI specification. Every resource and every API version has a corresponding class. This is why the method names are long — list_namespaced_pod, patch_namespaced_deployment_scale — they mirror the REST API paths exactly. The upside: if you know the API path (GET /api/v1/namespaces/{ns}/pods), you can predict the method name.


Part 6: Querying Prometheus

Prometheus has an HTTP API. You don't need a special client library — just requests and the PromQL query language.

Instant Queries

def query_prometheus(prom_url, query, timeout=10):
    """Run a PromQL query and return results."""
    session = get_session()
    response = session.get(
        f"{prom_url}/api/v1/query",
        params={"query": query},
        timeout=timeout,
    )
    response.raise_for_status()
    data = response.json()

    if data["status"] != "success":
        raise RuntimeError(f"Prometheus query failed: {data.get('error', 'unknown')}")

    return data["data"]["result"]

Practical Queries for Your Morning Check

prom_url = "http://prometheus.internal:9090"

# Error rate across all services (last 5 minutes)
error_rate = query_prometheus(
    prom_url,
    'sum(rate(http_requests_total{status=~"5.."}[5m])) / sum(rate(http_requests_total[5m]))',
)

# Nodes with high CPU (over 80%)
hot_nodes = query_prometheus(
    prom_url,
    '100 - (avg by(instance) (rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) > 80',
)

# Disk space predictions: disks that will fill in 24 hours
filling_disks = query_prometheus(
    prom_url,
    'predict_linear(node_filesystem_free_bytes{mountpoint="/"}[6h], 24*3600) < 0',
)

# Parse the results
for result in hot_nodes:
    instance = result["metric"]["instance"]
    cpu_pct = float(result["value"][1])
    print(f"  {instance}: {cpu_pct:.1f}% CPU")

Checking Alert Status

def get_firing_alerts(prom_url):
    """Get all currently firing alerts."""
    session = get_session()
    response = session.get(f"{prom_url}/api/v1/alerts", timeout=10)
    response.raise_for_status()
    data = response.json()

    firing = []
    for alert in data["data"]["alerts"]:
        if alert["state"] == "firing":
            firing.append({
                "name": alert["labels"]["alertname"],
                "severity": alert["labels"].get("severity", "unknown"),
                "summary": alert["annotations"].get("summary", ""),
                "since": alert["activeAt"],
            })

    return firing

Remember: Successful query responses have the structure {"status": "success", "data": {"resultType": "...", "result": [...]}}. For an instant vector, the metric value lives in result[N]["value"][1] as a string, so always convert it with float(). value[0] is the Unix timestamp of the sample.
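
To make that shape concrete, here is a minimal sketch that flattens an instant-vector result into a plain dict. vector_to_dict is a hypothetical helper name, and the sample data simply mirrors the structure used earlier in this lesson.

```python
def vector_to_dict(result, label="instance"):
    """Map each sample's label value to its float value (instant-vector results)."""
    values = {}
    for sample in result:
        key = sample["metric"].get(label, "<none>")
        _timestamp, raw_value = sample["value"]
        values[key] = float(raw_value)   # Prometheus sends sample values as strings
    return values


# Same shape as the "result" list returned by /api/v1/query
sample_result = [
    {"metric": {"instance": "web-01:9090"}, "value": [1711152000, "0.85"]},
    {"metric": {"instance": "web-02:9090"}, "value": [1711152000, "0.42"]},
]

cpu_by_instance = vector_to_dict(sample_result)
busiest = max(cpu_by_instance, key=cpu_by_instance.get)
```

Once the values are floats in a dict, sorting, thresholding, and formatting for a report are ordinary Python.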


Part 7: AWS with boto3

boto3 is the AWS SDK for Python. It wraps every AWS API.

Session Management and the Credential Chain

import boto3
from botocore.exceptions import ClientError

# Default session — uses the standard credential chain
ec2 = boto3.client("ec2", region_name="us-east-1")

# Explicit profile (from ~/.aws/credentials)
session = boto3.Session(profile_name="prod-readonly")
ec2 = session.client("ec2")
s3 = session.client("s3")

The credential chain (simplified, checked in order):

  1. Explicit parameters (aws_access_key_id=...) — never do this
  2. Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
  3. Shared credentials file (~/.aws/credentials)
  4. AWS config file (~/.aws/config)
  5. Container credentials (ECS task role)
  6. Instance metadata (EC2 instance profile / IAM role)

Gotcha: boto3 reads credentials lazily — it doesn't check them until you make your first API call. This means boto3.client("ec2") always succeeds, even with bad credentials. The error hits when you call ec2.describe_instances(). Always test your credentials early in the script: boto3.client("sts").get_caller_identity().

EC2: Instance Health for Your Morning Check

def get_instance_summary(region="us-east-1"):
    """Get a summary of EC2 instance states."""
    ec2 = boto3.client("ec2", region_name=region)
    paginator = ec2.get_paginator("describe_instances")

    summary = {"running": 0, "stopped": 0, "terminated": 0, "other": 0}
    problems = []

    for page in paginator.paginate():
        for reservation in page["Reservations"]:
            for instance in reservation["Instances"]:
                state = instance["State"]["Name"]
                # Count the states we track; everything else lands in "other"
                key = state if state in summary else "other"
                summary[key] += 1

    # Flag instances whose instance status check is impaired.
    # describe_instance_status is paginated too, so use its paginator.
    status_paginator = ec2.get_paginator("describe_instance_status")
    impaired = [{"Name": "instance-status.status", "Values": ["impaired"]}]
    for page in status_paginator.paginate(Filters=impaired):
        for status in page.get("InstanceStatuses", []):
            problems.append({
                "instance_id": status["InstanceId"],
                "status": status["InstanceStatus"]["Status"],
                "system": status["SystemStatus"]["Status"],
            })

    return summary, problems
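
The paginator used in get_instance_summary() hides a token-following loop. The sketch below shows roughly what that loop looks like, run against a stand-in function rather than a real AWS client. fake_describe, its page size, and the InstanceIds key are invented for this example, and real AWS operations use a variety of token names.

```python
def fake_describe(NextToken=None, page_size=2):
    """Stand-in for a paginated AWS call; serves five fake instance IDs."""
    all_ids = [f"i-{n:04d}" for n in range(5)]
    start = int(NextToken or 0)
    page = {"InstanceIds": all_ids[start:start + page_size]}
    if start + page_size < len(all_ids):
        page["NextToken"] = str(start + page_size)   # more pages remain
    return page


def collect_all(describe):
    """Follow NextToken until the service stops returning one."""
    items, token = [], None
    while True:
        page = describe(NextToken=token) if token else describe()
        items.extend(page["InstanceIds"])
        token = page.get("NextToken")
        if not token:
            return items


first_page_only = fake_describe()["InstanceIds"]
everything = collect_all(fake_describe)
```

A script that reads only the first page sees two of the five IDs here and raises no error, which is exactly the silent failure paginators protect against.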

Trivia: boto3 (released 2015) is named after the Amazon river dolphin, the boto. The original library was just "boto" (by Mitch Garnaat at AWS, 2006), then "boto2." The name was chosen because boto dolphins are native to the Amazon — a playful reference to Amazon Web Services. boto3 averages over 1 billion downloads per month from PyPI, making it the most-used AWS SDK in any programming language.


Flashcard Check #3

Question | Answer
What's the first thing boto3 checks for credentials? | Explicit parameters passed to the client constructor. Then environment variables, then ~/.aws/credentials, then config file, then container credentials, then EC2 instance metadata.
Why must you use paginators for AWS list_* and describe_* calls? | Most AWS APIs return at most 100–1000 items per page. Without pagination, you silently miss items beyond the first page. A script that works in dev (12 instances) breaks in prod (2000 instances).
What Prometheus API endpoint do you query for current alert status? | GET /api/v1/alerts returns all active alerts with their current state (pending or firing).

Part 8: Sending Slack Notifications

Your morning check needs to tell you the results. Slack incoming webhooks are the simplest integration — one POST request with a JSON body.

def send_slack_message(webhook_url, text):
    """Send a message to Slack via incoming webhook."""
    response = requests.post(
        webhook_url,
        json={"text": text},
        timeout=10,
    )
    response.raise_for_status()

# Usage
send_slack_message(
    os.environ["SLACK_WEBHOOK_URL"],
    "*Morning Check*\nK8s: 3 unhealthy pods\nAlerts: 1 firing\nAWS: 47 running, 12 stopped",
)

For richer formatting, Slack's Block Kit lets you build structured messages with sections, headers, and markdown. The json payload takes a blocks key alongside text (which serves as the plain-text fallback for notifications).
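
As a rough illustration of that payload shape, the sketch below builds a header block and one mrkdwn section, with text as the fallback. build_summary_payload is a hypothetical helper, and the layout is one reasonable choice rather than a required structure.

```python
import json


def build_summary_payload(title, lines):
    """Build a webhook payload with a header block and one mrkdwn section."""
    body = "\n".join(f"• {line}" for line in lines)
    return {
        "text": f"{title}: " + "; ".join(lines),   # plain-text fallback
        "blocks": [
            {"type": "header", "text": {"type": "plain_text", "text": title}},
            {"type": "section", "text": {"type": "mrkdwn", "text": body}},
        ],
    }


payload = build_summary_payload(
    "Morning Check",
    ["K8s: 3 unhealthy pods", "Alerts: 1 firing", "AWS: 47 running, 12 stopped"],
)
print(json.dumps(payload, indent=2))
```

To send it, you would pass the dict as json=payload in the requests.post() call instead of {"text": text}.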

Gotcha: Slack webhook URLs are secrets. They allow anyone with the URL to post to your channel. Never hardcode them in scripts or commit them to git. Use environment variables: os.environ["SLACK_WEBHOOK_URL"]. If the URL leaks, you can regenerate it in Slack's app settings — but you have to update every script that uses it.


Part 9: Retry Patterns — The tenacity Library

The urllib3.Retry on requests sessions handles HTTP retries. But what about retrying any Python function — a boto3 call, a Kubernetes API call, a database connection?

tenacity: Retries for Everything

import boto3
from botocore.exceptions import BotoCoreError, ClientError
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    # BotoCoreError covers botocore's own connection and timeout failures,
    # which do not inherit from the built-in ConnectionError
    retry=retry_if_exception_type((ClientError, BotoCoreError)),
)
def get_instances():
    ec2 = boto3.client("ec2")
    return ec2.describe_instances()

tenacity parameter | What it does | Example
stop_after_attempt(3) | Give up after 3 tries | 3 attempts total
wait_exponential(multiplier=1, min=1, max=10) | Wait 1s, 2s, 4s, 8s... capped at 10s | Exponential backoff with ceiling
retry_if_exception_type(...) | Only retry these exception types | Don't retry ValueError
before_sleep=log_retry | Call a function before each retry sleep | Log retry attempts

Why Exponential Backoff Matters

Fixed-interval retry (1s, 1s, 1s):
  Request → fail → 1s → Request → fail → 1s → Request → fail

  If 100 scripts all retry at the same interval, the target gets
  100 requests every second. This is a thundering herd.

Exponential backoff (1s, 2s, 4s):
  Request → fail → 1s → Request → fail → 2s → Request → fail → 4s →

  After a few retries, the load on the target drops. Clients naturally
  spread out. Add jitter (randomness) and they spread further.
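
To see how jitter spreads clients out, here is a standard-library sketch of one common variant, often called "full jitter": each delay is drawn uniformly between zero and the capped exponential ceiling. The function name and defaults are assumptions for this example; tenacity ships its own wait strategies, so this is only meant to show the arithmetic.

```python
import random


def backoff_delays(attempts, base=1.0, cap=10.0, rng=random):
    """Return one 'full jitter' delay per retry: uniform in [0, min(cap, base * 2**n)]."""
    delays = []
    for n in range(attempts):
        ceiling = min(cap, base * (2 ** n))
        delays.append(rng.uniform(0, ceiling))
    return delays


# Two clients with different random streams end up with different schedules
client_a = backoff_delays(5, rng=random.Random(1))
client_b = backoff_delays(5, rng=random.Random(2))
ceilings = [min(10.0, 1.0 * 2 ** n) for n in range(5)]   # 1, 2, 4, 8, 10
```

The ceilings still double on each attempt, but because every client picks its own point below the ceiling, retries no longer land in the same instant.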

War Story: A team had a monitoring script that checked 40 endpoints. It had retries, but with a fixed 1-second delay. During a brief network partition, all 40 checks failed simultaneously and retried in lockstep. The upstream load balancer, already struggling, received 40 retry requests at the exact same second, three times in a row. The retries made the outage worse. Switching to exponential backoff with jitter (wait_exponential + wait_random) eliminated the thundering herd entirely.


Part 10: Structured Logging

print() is for debugging. Logging is for production scripts.

import logging
import sys

def setup_logging(verbosity=0):
    """Configure logging based on verbosity level."""
    levels = {0: logging.WARNING, 1: logging.INFO, 2: logging.DEBUG}
    level = levels.get(verbosity, logging.DEBUG)

    handler = logging.StreamHandler(sys.stderr)
    handler.setFormatter(logging.Formatter(
        "%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    ))

    root = logging.getLogger()
    root.setLevel(level)
    root.addHandler(handler)

    # Quiet noisy libraries
    logging.getLogger("urllib3").setLevel(logging.WARNING)
    logging.getLogger("kubernetes").setLevel(logging.WARNING)
    logging.getLogger("botocore").setLevel(logging.WARNING)

    return logging.getLogger("morning-check")

Why logging beats print:

Feature | print() | logging
Goes to | stdout | stderr (by default), so it doesn't pollute piped output
Levels | None | DEBUG, INFO, WARNING, ERROR, CRITICAL
Timestamps | Manual | Automatic with formatter
Per-library control | No | Yes: silence urllib3 while keeping your logs
Output destination | stdout only | stderr, file, syslog, remote, all at once

log = setup_logging(verbosity=1)

log.info("Starting morning check for clusters=%s", config["clusters"])
log.debug("Prometheus URL: %s", config["prometheus_url"])
log.warning("Cluster %s has %d unhealthy pods", cluster, len(unhealthy))
log.error("Failed to query Prometheus: %s", err)

Gotcha: Use log.info("Processing %s", item), not log.info(f"Processing {item}"). The %-style formatting is lazy — if the log level is disabled, the string is never formatted. With f-strings, Python formats the string even if the message is never logged. For hot loops logging thousands of debug messages, this matters.
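
The difference is easy to observe with an object that counts how often it is rendered to a string. This is a small demonstration sketch; the Expensive class and the logger name lazy-demo exist only for this example.

```python
import logging


class Expensive:
    """Counts how many times it gets rendered to a string."""
    def __init__(self):
        self.renders = 0

    def __str__(self):
        self.renders += 1
        return "expensive-value"


log = logging.getLogger("lazy-demo")
log.addHandler(logging.NullHandler())
log.setLevel(logging.WARNING)            # DEBUG messages are disabled

lazy = Expensive()
log.debug("Processing %s", lazy)         # never formatted: level is disabled
lazy_renders = lazy.renders

eager = Expensive()
log.debug(f"Processing {eager}")         # f-string is built before debug() runs
eager_renders = eager.renders
```

Here lazy_renders stays at 0 while eager_renders is 1, even though neither message is ever emitted.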


Part 11: Error Handling for APIs

APIs fail in specific, predictable ways. Handle them specifically.

Tier | What happened | HTTP codes | Example
Connection | Can't reach the API | N/A (no response) | Network down, DNS failure, timeout
Auth | Reached API, bad credentials | 401, 403 | Expired token, wrong IAM policy
Application | Authenticated, bad request | 404, 422, 429, 500 | Resource not found, rate limited

from kubernetes.client.exceptions import ApiException
from botocore.exceptions import ClientError, BotoCoreError

# Kubernetes: ApiException carries .status (HTTP code) and .reason
try:
    pods = v1.list_pod_for_all_namespaces(timeout_seconds=10)
except ApiException as e:
    if e.status == 403:
        log.error("RBAC denies this operation — check your ClusterRole")
    else:
        log.error("K8s API error: %d %s", e.status, e.reason)

# AWS: ClientError wraps the error code and message
try:
    ec2.describe_instances()
except ClientError as e:
    code = e.response["Error"]["Code"]
    msg = e.response["Error"]["Message"]
    log.error("AWS %s: %s", code, msg)  # e.g. "UnauthorizedOperation: ..."
except BotoCoreError as e:
    log.error("AWS connection error: %s", e)  # DNS, timeout, SSL

Mental Model: Error handling for APIs has three tiers. Tier 1: connection errors (can't reach the API — network, DNS, timeout). Tier 2: auth errors (reached the API but credentials are bad — 401/403). Tier 3: application errors (authenticated but the request failed — 404/429/500). Handle each tier differently because the remediation is different: fix the network vs. fix the credentials vs. fix the request.
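
One way to encode the three tiers is a small classifier that turns a status code (or the absence of one) into a tier name, which the caller can then map to a remediation hint. classify_failure and the REMEDIATION table are hypothetical names used only in this sketch.

```python
def classify_failure(status_code=None):
    """Map an HTTP status (or None for 'no response at all') to a tier name."""
    if status_code is None:
        return "connection"      # network down, DNS failure, timeout
    if status_code in (401, 403):
        return "auth"            # reached the API, credentials rejected
    if status_code >= 400:
        return "application"     # authenticated, but the request failed
    return "ok"


REMEDIATION = {
    "connection": "check network, DNS, and timeouts",
    "auth": "check tokens, RBAC, or IAM policy",
    "application": "check the request itself, or back off if rate limited",
}

tiers = [classify_failure(code) for code in (None, 401, 403, 404, 429, 500, 200)]
```

With the Kubernetes client you could feed it e.status from an ApiException, and pass None when the exception indicates that no response arrived at all.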


Part 12: Putting It Together — The Morning Check

Here's the skeleton that connects every piece from this lesson. Each function below is defined in the parts above — this is how they wire together.

#!/usr/bin/env python3
"""Morning infrastructure health check.

Queries Kubernetes, Prometheus, and AWS, then posts a summary to Slack.
"""

import argparse
import json
import os
import sys
from datetime import datetime, timezone

from kubernetes import config as k8s_config  # provides load_kube_config(), used in main()

# Functions from earlier parts:
#   get_session()          — Part 1 (requests with retries)
#   setup_logging()        — Part 10 (structured logging)
#   get_unhealthy_pods()   — Part 5 (K8s client)
#   get_firing_alerts()    — Part 6 (Prometheus API)
#   query_prometheus()     — Part 6
#   get_instance_summary() — Part 7 (boto3)
#   send_slack_message()   — Part 8 (Slack webhook)
#   load_config()          — Part 4 (config precedence)

def main():
    parser = argparse.ArgumentParser(description="Morning infrastructure health check")
    parser.add_argument("--cluster", "-c", action="append", help="K8s context (repeatable)")
    parser.add_argument("--region", "-r", action="append", help="AWS region (repeatable)")
    parser.add_argument("--prometheus-url",
                        default=os.environ.get("PROMETHEUS_URL", "http://prometheus.internal:9090"))
    parser.add_argument("--slack-webhook", default=os.environ.get("SLACK_WEBHOOK_URL"))
    parser.add_argument("--output", "-o", choices=["text", "json"], default="text")
    parser.add_argument("--dry-run", "-n", action="store_true")
    parser.add_argument("--verbose", "-v", action="count", default=0)
    args = parser.parse_args()

    log = setup_logging(args.verbose)
    session = get_session()
    clusters = args.cluster or ["prod-us-east-1"]
    regions = args.region or ["us-east-1"]

    # Run all checks
    results = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "kubernetes": [],
        "prometheus": {},
        "aws": [],
    }

    for cluster in clusters:
        log.info("Checking K8s cluster: %s", cluster)
        try:
            k8s_config.load_kube_config(context=cluster)
            unhealthy = get_unhealthy_pods()
            results["kubernetes"].append({"cluster": cluster, "unhealthy": unhealthy})
        except Exception as e:
            results["kubernetes"].append({"cluster": cluster, "error": str(e)})

    log.info("Checking Prometheus")
    results["prometheus"] = {
        "firing_alerts": get_firing_alerts(args.prometheus_url),
        "hot_nodes": query_prometheus(args.prometheus_url,
            '100 - (avg by(instance) (rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) > 80'),
    }

    for region in regions:
        log.info("Checking AWS %s", region)
        summary, problems = get_instance_summary(region)
        results["aws"].append({"region": region, "summary": summary, "problems": problems})

    # Output
    if args.output == "json":
        print(json.dumps(results, indent=2, default=str))
    else:
        print(format_text_report(results))

    # Slack
    if args.slack_webhook and not args.dry_run:
        send_slack_message(args.slack_webhook, format_text_report(results))
        log.info("Posted summary to Slack")

if __name__ == "__main__":
    main()

Run it:

# Basic check with text output
python3 morning-check.py -v

# Check multiple clusters, JSON output
python3 morning-check.py -c prod-us-east-1 -c prod-eu-west-1 -o json

# Full run with Slack posting
export SLACK_WEBHOOK_URL="https://hooks.slack.com/services/T00/B00/xxxx"
python3 morning-check.py -c prod-us-east-1 -r us-east-1 -r eu-west-1 -v

# Dry run to test without posting
python3 morning-check.py --dry-run -vv
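
The skeleton calls `format_text_report()`, which does not appear in its list of functions from earlier parts. A minimal sketch of what it could look like follows. It assumes that `unhealthy`, `firing_alerts`, `hot_nodes`, and `problems` are lists, so it reports only their lengths; the sample data is hand-written for illustration.

```python
def format_text_report(results: dict) -> str:
    """Render the results dict built in main() as a plain-text summary.

    Assumption: "unhealthy", "firing_alerts", "hot_nodes" and "problems"
    are lists, so only their lengths are reported here.
    """
    lines = [f"Morning check @ {results['timestamp']}"]

    for entry in results["kubernetes"]:
        if "error" in entry:
            lines.append(f"K8s {entry['cluster']}: ERROR {entry['error']}")
        else:
            lines.append(f"K8s {entry['cluster']}: {len(entry['unhealthy'])} unhealthy pods")

    prom = results["prometheus"]
    lines.append(
        f"Prometheus: {len(prom.get('firing_alerts', []))} firing alerts, "
        f"{len(prom.get('hot_nodes', []))} hot nodes"
    )

    for entry in results["aws"]:
        lines.append(f"AWS {entry['region']}: {len(entry['problems'])} problem instances")

    return "\n".join(lines)


# Hand-written sample in the shape main() produces; not real data.
sample = {
    "timestamp": "2024-01-01T08:00:00+00:00",
    "kubernetes": [
        {"cluster": "prod-us-east-1", "unhealthy": ["pod-a", "pod-b"]},
        {"cluster": "prod-eu-west-1", "error": "connection refused"},
    ],
    "prometheus": {"firing_alerts": ["HighCPU"], "hot_nodes": []},
    "aws": [{"region": "us-east-1", "summary": {"running": 12}, "problems": []}],
}
report = format_text_report(sample)
print(report)
```

Keeping the formatter a pure function of `results` means the same text can go to stdout and to Slack without re-running any checks.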

Exercises

Exercise 1: Add a Prometheus Query (5 minutes)

Add a disk-space prediction check to the Prometheus section of main() (the results["prometheus"] block). Use this PromQL:

predict_linear(node_filesystem_free_bytes{mountpoint="/"}[6h], 24*3600) < 0

Parse the results and include them in the output.

Hint: Follow the same pattern as the hot_nodes query: query the /api/v1/query endpoint, parse data.result, and extract metric.instance and the predicted value.
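
The parsing step in the hint can be sketched without a live Prometheus. An instant query returns a JSON body with `status` and `data.result`, where each series carries a `metric` label set and a `value` pair of `[timestamp, "string value"]`. The function name `parse_instant_vector` and the sample payload below are illustrative assumptions, not output from a real cluster.

```python
def parse_instant_vector(payload: dict) -> list:
    """Turn a /api/v1/query response into [(instance, value), ...]."""
    if payload.get("status") != "success":
        raise ValueError(f"Prometheus query failed: {payload.get('error', 'unknown error')}")
    rows = []
    for series in payload["data"]["result"]:
        instance = series["metric"].get("instance", "unknown")
        _timestamp, raw_value = series["value"]   # the sample value arrives as a string
        rows.append((instance, float(raw_value)))
    return rows


# Hand-written sample in the instant-query response shape; not real cluster data.
sample = {
    "status": "success",
    "data": {
        "resultType": "vector",
        "result": [
            {
                "metric": {"instance": "node-1:9100", "mountpoint": "/"},
                "value": [1700000000.0, "-1048576"],
            },
        ],
    },
}
rows = parse_instant_vector(sample)
print(rows)
```

Because the PromQL already filters with `< 0`, every row that comes back is a filesystem predicted to run out of space within the window.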

Exercise 2: Add Error Counting (10 minutes)

Add a function that queries Prometheus for the 5xx error rate across all services:

sum by(service) (rate(http_requests_total{status=~"5.."}[5m]))

Flag any service where the 5xx rate exceeds 0.1 requests/second.

Exercise 3: Build a Config File (15 minutes)

Create a YAML config file format for the morning check:

clusters:
  - context: prod-us-east-1
    critical: true
  - context: staging-us-east-1
    critical: false

prometheus:
  url: http://prometheus.internal:9090
  alert_severity_threshold: warning

aws:
  regions: [us-east-1, eu-west-1]
  profile: prod-readonly

slack:
  webhook_env: SLACK_WEBHOOK_URL
  channel: "#ops-morning"

Modify main() to load this config and merge it with CLI args (CLI wins).
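
The "CLI wins" merge can be sketched as layered dictionaries. This is one possible approach, not the lesson's `load_config()`: it assumes the YAML has already been loaded and flattened to the same key names argparse uses, and `merge_config`, `DEFAULTS`, and the sample URLs are hypothetical.

```python
import argparse

DEFAULTS = {"prometheus_url": "http://prometheus.internal:9090", "output": "text"}


def merge_config(defaults: dict, file_cfg: dict, env_cfg: dict, cli_cfg: dict) -> dict:
    """Later layers win; None means 'not set at this layer'."""
    merged = {}
    for layer in (defaults, file_cfg, env_cfg, cli_cfg):
        merged.update({k: v for k, v in layer.items() if v is not None})
    return merged


parser = argparse.ArgumentParser()
# default=None so that an omitted flag does not mask the lower layers
parser.add_argument("--prometheus-url", default=None)
parser.add_argument("--output", choices=["text", "json"], default=None)

args = parser.parse_args(["--output", "json"])   # simulated command line
# As if loaded with yaml.safe_load() and flattened (assumption)
file_cfg = {"prometheus_url": "http://prom.from-file:9090", "output": "text"}
# As if read with os.environ.get("PROMETHEUS_URL") when the variable is unset
env_cfg = {"prometheus_url": None}

config = merge_config(DEFAULTS, file_cfg, env_cfg, vars(args))
print(config)
```

The key design choice is giving every argparse option `default=None`; if the parser supplied real defaults, they would be indistinguishable from explicit flags and would always override the config file.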

Exercise 4: Add Concurrent Checks (20 minutes)

The script currently checks clusters sequentially. Use concurrent.futures.ThreadPoolExecutor to check all clusters, Prometheus, and AWS regions in parallel.

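Hint:

from concurrent.futures import ThreadPoolExecutor, as_completed

# check_kubernetes, check_aws, and check_prometheus are wrappers you write
# around the per-cluster, per-region, and Prometheus blocks in main().
with ThreadPoolExecutor(max_workers=10) as executor:
    k8s_futures = {executor.submit(check_kubernetes, c, log): c for c in clusters}
    aws_futures = {executor.submit(check_aws, r, log): r for r in regions}
    prom_future = executor.submit(check_prometheus, args.prometheus_url, session, log)

    # Collect results as they complete
    k8s_results = [f.result() for f in as_completed(k8s_futures)]
    aws_results = [f.result() for f in as_completed(aws_futures)]
    prom_results = prom_future.result()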
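
Calling `f.result()` re-raises any exception from the worker, so one unreachable cluster would abort the whole collection. The sketch below shows one way to keep going, using a hypothetical stand-in `check_kubernetes` so it runs without a cluster; `run_parallel` is an illustrative helper, not part of the lesson's code.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def check_kubernetes(cluster: str) -> dict:
    """Hypothetical stand-in for a per-cluster check."""
    if cluster == "broken":
        raise RuntimeError("connection refused")
    return {"cluster": cluster, "unhealthy": []}


def run_parallel(check, targets, key):
    """Run check(target) for every target; failures become error entries."""
    results = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(check, t): t for t in targets}
        for future in as_completed(futures):
            target = futures[future]          # map the future back to its input
            try:
                results.append(future.result())
            except Exception as exc:          # one failed target must not sink the run
                results.append({key: target, "error": str(exc)})
    # as_completed yields in finish order, so sort for stable output
    return sorted(results, key=lambda r: r[key])


k8s_results = run_parallel(check_kubernetes, ["prod-us-east-1", "broken"], "cluster")
print(k8s_results)
```

One caveat when adapting this to real clusters: `load_kube_config(context=...)` sets a process-wide default configuration, so concurrent threads switching contexts can interfere with each other. Building a separate client per context, for example with `kubernetes.config.new_client_from_config(context=...)`, is likely the safer route.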

Cheat Sheet

| Task | Code |
|---|---|
| HTTP GET with retries | session = get_session(); session.get(url, timeout=10) |
| Parse JSON response | data = response.json() |
| Query nested JSON | jmespath.search("data.result[].metric.instance", data) |
| Read YAML safely | yaml.safe_load(open("file.yaml")) |
| Prometheus instant query | GET /api/v1/query?query=up |
| Prometheus firing alerts | GET /api/v1/alerts |
| K8s list pods | v1.list_namespaced_pod(namespace) |
| K8s all-namespace events | v1.list_event_for_all_namespaces() |
| boto3 describe instances | ec2.describe_instances() (use a paginator!) |
| boto3 credential test | boto3.client("sts").get_caller_identity() |
| Slack webhook | requests.post(webhook_url, json={"text": msg}) |
| Retry any function | @retry(stop=stop_after_attempt(3), wait=wait_exponential()) |
| Structured logging | logging.basicConfig(level=logging.INFO, format="%(asctime)s ...") |
| Quiet noisy libraries | logging.getLogger("urllib3").setLevel(logging.WARNING) |
| Config precedence | CLI flags > env vars > config file > defaults |
| argparse subcommands | subparsers = parser.add_subparsers(dest="command") |
| Safe YAML string | Always quote values that look like booleans or numbers |

Takeaways

  • Every infrastructure API is HTTP. Learn requests with sessions and retries, and you can talk to anything — Kubernetes, Prometheus, AWS, Slack, GitHub, PagerDuty.

  • Retry with exponential backoff is not optional. Scripts without retries generate false alerts. Scripts with fixed-interval retries cause thundering herds. Use urllib3.Retry for HTTP or tenacity for everything else.

  • The kubernetes Python client beats kubectl for automation. Typed objects, watch streams, and proper error handling. Use kubectl for interactive work, use the client for scripts.

  • boto3 pagination is the most common AWS scripting bug. If your script works in dev but misses resources in prod, check for missing paginators. Use a paginator for every list_* and describe_* call that supports one; client.can_paginate("operation_name") tells you whether it does.

  • Configuration has a precedence stack. CLI flags > env vars > config file > defaults. This matches every serious infrastructure tool and your users will expect it.

  • yaml.safe_load(), always. yaml.load() with an unsafe loader can construct arbitrary Python objects, which lets a malicious document execute code. This is not theoretical: it's a known attack vector with real CVEs.
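
For the retry takeaway in the list above, the difference between fixed-interval and exponential retries is easiest to see in the delay schedule itself. This is a hand-rolled illustration of exponential backoff with full jitter, not the behaviour of urllib3.Retry or tenacity; `backoff_delays` and its parameters are hypothetical.

```python
import random


def backoff_delays(attempts: int, base: float = 1.0, cap: float = 30.0, rng=random):
    """Exponential backoff with full jitter.

    Each delay is drawn uniformly from [0, min(cap, base * 2**n)].
    """
    delays = []
    for n in range(attempts):
        ceiling = min(cap, base * (2 ** n))
        delays.append(rng.uniform(0, ceiling))
    return delays


# Upper bounds per attempt: 1, 2, 4, 8, 16, then capped at 30
ceilings = [min(30.0, 1.0 * 2 ** n) for n in range(6)]
print(ceilings)

# A seeded generator keeps the demo repeatable; real code would use the default.
delays = backoff_delays(6, rng=random.Random(42))
print([round(d, 2) for d in delays])
```

The growing ceiling gives a struggling API room to recover, and the random draw spreads clients out so they do not all retry at the same instant.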