
Python for Infrastructure - Street-Level Ops

What experienced infra engineers know about writing Python that runs in production, not just in a notebook.

Quick Diagnosis Commands

# Check Python version and location
python3 --version
which python3

# Check if a module is installed
python3 -c "import boto3; print(boto3.__version__)"
python3 -c "import paramiko; print(paramiko.__version__)"

# Quick pip freeze (what's installed)
pip3 list --format=columns 2>/dev/null | head -20

# Test AWS credentials
python3 -c "import boto3; print(boto3.client('sts').get_caller_identity()['Arn'])"

# Debug import errors
python3 -v -c "import mymodule" 2>&1 | tail -20

# Check for syntax errors without running
python3 -m py_compile myscript.py

# Profile a slow script
python3 -m cProfile -s cumtime myscript.py 2>&1 | head -30

# Run a quick one-liner from the shell
python3 -c "
import json, sys
data = json.load(sys.stdin)
print(json.dumps(data, indent=2))
" < response.json

Gotcha: boto3 Paginators Are Not Optional

You call ec2.describe_instances() and get 100 instances back. You actually have 847. The API returned one page and you threw away the rest.

Fix:

# BAD: Only gets first page
response = ec2.describe_instances()
instances = response['Reservations']

# GOOD: Gets all pages
paginator = ec2.get_paginator('describe_instances')
instances = []
for page in paginator.paginate():
    for reservation in page['Reservations']:
        instances.extend(reservation['Instances'])

# ALSO GOOD: Use page_iterator directly
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket='my-bucket'):
    for obj in page.get('Contents', []):
        process(obj)

Every AWS API that returns a list should be paginated. If you see NextToken or IsTruncated in the response, you missed items.

Debug clue: If your script works in dev (12 instances) but silently undercounts in prod (800+ instances), pagination is the first thing to check. Default page sizes vary by service, typically between 50 and 1000 items per page. Search your codebase for describe_, list_, and get_ calls that do not use a paginator.
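
For the occasional API operation that has no paginator (client.can_paginate(op) tells you), the NextToken loop can be written once as a generic helper. A sketch, assuming the operation follows the common NextToken convention; the helper name and the fake API in the usage comment are illustrative, not boto3 APIs:

```python
def paginate_manually(api_call, result_key, **kwargs):
    """Yield items from a NextToken-style AWS API, one page at a time."""
    token = None
    while True:
        if token:
            kwargs['NextToken'] = token
        page = api_call(**kwargs)
        for item in page.get(result_key, []):
            yield item
        token = page.get('NextToken')
        if not token:
            break  # last page: no NextToken in the response

# e.g. for arn in paginate_manually(client.list_foo, 'fooArns'): ...
```

Because it yields items instead of accumulating them, this also keeps memory flat for very large result sets.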

Gotcha: requests Without Timeout Hangs Forever

Your script calls an internal API. The target server is overloaded and does not respond. requests.get() with no timeout blocks indefinitely. Your cron job is still "running" 6 hours later. The next invocation starts, and now you have two stuck processes.

Fix:

# BAD: no timeout
response = requests.get('http://api.internal:8080/status')

# GOOD: always set timeout (connect_timeout, read_timeout)
response = requests.get('http://api.internal:8080/status', timeout=(5, 30))

# BETTER: session with retries and timeout
# (Retry lives in urllib3, HTTPAdapter in requests.adapters)
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retry = Retry(total=3, backoff_factor=1, status_forcelist=[500, 502, 503])
session.mount('http://', HTTPAdapter(max_retries=retry))
response = session.get('http://api.internal:8080/status', timeout=(5, 30))
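
If a codebase is littered with bare requests calls, a Session subclass can enforce a default timeout in one place instead of auditing every call site. A sketch; TimeoutSession is a hypothetical name, not a requests API:

```python
import requests

class TimeoutSession(requests.Session):
    """Session that applies a default (connect, read) timeout to every request."""

    def __init__(self, timeout=(5, 30)):
        super().__init__()
        self.default_timeout = timeout

    def request(self, method, url, **kwargs):
        # Only fill in the timeout if the caller did not pass one explicitly
        kwargs.setdefault('timeout', self.default_timeout)
        return super().request(method, url, **kwargs)
```

Callers can still override per request: session.get(url, timeout=60).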

Gotcha: subprocess.run with shell=True and User Input

You build a command string with user-provided input and pass it to shell=True. An attacker passes ; rm -rf / as the hostname. Your script executes it.

Fix:

# BAD: shell injection vulnerability
hostname = user_input
subprocess.run(f"ping -c 1 {hostname}", shell=True)

# GOOD: pass arguments as a list
subprocess.run(["ping", "-c", "1", hostname])

# If you need pipes or redirects, build them in Python instead:
subprocess.run(
    ["grep", "ERROR", "/var/log/app.log"],
    stdout=subprocess.PIPE,
)
# Capture stdout here, then feed it to the next command in Python, not in shell
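
A concrete sketch of that pipe-in-Python approach: connect two processes with Popen so no shell ever parses the data. The example commands in the trailing comment are illustrative:

```python
import subprocess

def pipe(producer_cmd, consumer_cmd):
    """Run producer | consumer without invoking a shell."""
    producer = subprocess.Popen(producer_cmd, stdout=subprocess.PIPE)
    consumer = subprocess.Popen(
        consumer_cmd,
        stdin=producer.stdout,
        stdout=subprocess.PIPE,
        text=True,
    )
    producer.stdout.close()  # let the producer see SIGPIPE if consumer exits early
    output, _ = consumer.communicate()
    producer.wait()
    return output

# e.g. pipe(["journalctl", "-u", "myapp"], ["grep", "ERROR"])
```

Hostnames, filenames, and other untrusted input stay inert list elements the whole way through.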

Gotcha: paramiko Connections Not Closed

You open SSH connections in a loop but do not close them on exceptions. After 200 hosts, you hit the file descriptor limit and everything fails.

Fix:

# BAD: connection leak on exception
client = paramiko.SSHClient()
client.connect(host, username=user, key_filename=key)
stdin, stdout, stderr = client.exec_command(cmd)
# If exec_command raises, client is never closed

# GOOD: use try/finally or context manager pattern
def run_remote(host, user, key, cmd):
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(host, username=user, key_filename=key, timeout=10)
        stdin, stdout, stderr = client.exec_command(cmd, timeout=30)
        return stdout.read().decode().strip()
    finally:
        client.close()
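
The try/finally can also be expressed with contextlib.closing, which calls close() on any exit path for free. The sketch below uses a stand-in client class so it runs anywhere; substitute paramiko.SSHClient in real code (newer paramiko releases also support "with paramiko.SSHClient() as client:" directly):

```python
from contextlib import closing

class FakeClient:
    """Stand-in for paramiko.SSHClient so this sketch runs anywhere."""
    def __init__(self):
        self.closed = False
    def run(self, cmd):
        raise RuntimeError("boom")  # simulate exec_command failing
    def close(self):
        self.closed = True

client = FakeClient()
try:
    with closing(client) as c:  # closing() guarantees c.close() on ANY exit
        c.run("df -h /")
except RuntimeError:
    pass
print(client.closed)  # True: closed despite the exception
```

With this pattern, the file descriptor limit stops being a function of how many hosts fail.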

Gotcha: JSON Dumps of datetime Objects

Your script fetches data from AWS that includes datetime fields. You try json.dumps(data) and get TypeError: Object of type datetime is not JSON serializable.

Fix:

import json
from datetime import datetime

# Custom JSON encoder for infra data
class InfraEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        return super().default(obj)

output = json.dumps(data, indent=2, cls=InfraEncoder)

# Or use the quick one-liner approach:
json.dumps(data, indent=2, default=str)

# Gotcha: default=str silently converts ANY non-serializable object to its
# str() representation. This hides bugs where unexpected types slip through.
# Use InfraEncoder for production code, default=str for quick debugging only.
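
That difference is easy to demonstrate: feed both encoders a type you did not plan for, such as a set (a common slip when deduplicating instance IDs). The record below is made up for illustration:

```python
import json
from datetime import datetime, timezone

class InfraEncoder(json.JSONEncoder):
    # Same encoder as above: datetimes only, everything else still errors
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        return super().default(obj)

record = {
    "seen": datetime(2024, 5, 1, tzinfo=timezone.utc),
    "ids": {"i-abc", "i-def"},   # a set, not a list -- not valid JSON
}

# default=str silently stringifies the set, e.g. "{'i-abc', 'i-def'}"
print(json.dumps(record, default=str))

try:
    json.dumps(record, cls=InfraEncoder)
except TypeError as e:
    print("caught:", e)  # the bug surfaces instead of shipping bad JSON
```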

Pattern: Structured Logging for Infra Scripts

import logging
import json
import sys

def setup_logging(verbose=False):
    """Set up structured logging for infra scripts."""
    level = logging.DEBUG if verbose else logging.INFO
    handler = logging.StreamHandler(sys.stderr)
    handler.setFormatter(logging.Formatter(
        '%(asctime)s %(levelname)s %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    ))
    logging.basicConfig(level=level, handlers=[handler])
    return logging.getLogger(__name__)

log = setup_logging()

# Use structured key=value messages (extra= fields are attached to the record
# but only render if the format string references them, so put data in the message)
log.info("Starting backup hosts=%d type=%s", 5, 'borg')
log.error("Backup failed for host=%s error=%s", host, str(e))

# For JSON output (to pipe into monitoring)
from datetime import datetime, timezone

def log_json(event, **kwargs):
    """Emit a JSON log line."""
    entry = {'event': event, 'timestamp': datetime.now(timezone.utc).isoformat()}
    entry.update(kwargs)
    print(json.dumps(entry), file=sys.stderr)

Pattern: Retry with Exponential Backoff

import time
import functools

def retry(max_attempts=3, backoff=2, exceptions=(Exception,)):
    """Retry decorator with exponential backoff."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        raise
                    wait = backoff ** attempt
                    log.warning(
                        "Attempt %d/%d failed: %s. Retrying in %ds",
                        attempt, max_attempts, e, wait,
                    )
                    time.sleep(wait)
        return wrapper
    return decorator

@retry(max_attempts=3, backoff=2, exceptions=(requests.RequestException,))
def call_api(url):
    resp = requests.get(url, timeout=10)
    resp.raise_for_status()
    return resp.json()
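
One refinement worth knowing: when hundreds of clients hit the same failing endpoint, fixed exponential backoff makes them all retry in lockstep and re-stampede the server. Random jitter spreads the retries out. A sketch of the sleep calculation only; the cap and "full jitter" style are choices, not requirements:

```python
import random

def backoff_with_jitter(attempt, base=2, cap=60):
    """'Full jitter': sleep a random amount up to the exponential ceiling."""
    ceiling = min(cap, base ** attempt)
    return random.uniform(0, ceiling)

# attempt 1 -> up to 2s, attempt 2 -> up to 4s, attempt 6+ -> capped at 60s
```

Drop this in place of the plain `backoff ** attempt` line in the decorator above if a whole fleet shares one dependency.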

Pattern: Safe Config File Updates

from pathlib import Path
import os
import shutil
import tempfile
import time

def safe_update_config(config_path, new_content, backup=True):
    """Update a config file atomically with backup."""
    config_path = Path(config_path)

    # Backup existing
    if backup and config_path.exists():
        backup_path = config_path.with_suffix(
            f'.bak.{int(time.time())}'
        )
        shutil.copy2(config_path, backup_path)

    # Write to temp file in same directory (same filesystem for atomic rename)
    fd, tmp_path = tempfile.mkstemp(
        dir=config_path.parent,
        prefix=config_path.name,
        suffix='.tmp',
    )
    try:
        # Write through the mkstemp fd; leaving it open leaks a descriptor
        with os.fdopen(fd, 'w') as f:
            f.write(new_content)
        # Preserve original permissions
        if config_path.exists():
            shutil.copystat(config_path, tmp_path)
        os.replace(tmp_path, config_path)  # atomic, overwrites the target
    except Exception:
        Path(tmp_path).unlink(missing_ok=True)
        raise
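
One durability caveat: on a crash, the rename can reach disk before the file's data blocks do, leaving a truncated config behind. When that matters, fsync the temp file before renaming. A minimal sketch of just the atomic-write core (atomic_write is an illustrative name, stdlib only):

```python
import os
import tempfile
from pathlib import Path

def atomic_write(path, text):
    """Write text to path atomically, fsyncing data before the rename."""
    path = Path(path)
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix='.tmp')
    try:
        with os.fdopen(fd, 'w') as f:
            f.write(text)
            f.flush()
            os.fsync(f.fileno())  # data is on disk before the rename lands
        os.replace(tmp, path)     # atomic on POSIX, overwrites the target
    except Exception:
        if os.path.exists(tmp):
            os.unlink(tmp)
        raise
```

Readers see either the old file or the complete new one, never a half-written state.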

Pattern: Parallel Fleet Operations

from concurrent.futures import ThreadPoolExecutor, as_completed
import sys

def fleet_operation(hosts, operation, max_workers=20, fail_fast=False):
    """Run an operation across a fleet of hosts in parallel."""
    results = {'success': [], 'failed': []}

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(operation, host): host
            for host in hosts
        }

        for future in as_completed(futures):
            host = futures[future]
            try:
                result = future.result()
                results['success'].append({'host': host, 'result': result})
                print(f"  OK: {host}", file=sys.stderr)
            except Exception as e:
                results['failed'].append({'host': host, 'error': str(e)})
                print(f"FAIL: {host} - {e}", file=sys.stderr)
                if fail_fast:
                    # cancel_futures requires Python 3.9+
                    executor.shutdown(wait=False, cancel_futures=True)
                    break

    return results

# Usage
def check_disk(host):
    return run_remote(host, 'deploy', '/home/deploy/.ssh/id_rsa', 'df -h /')

results = fleet_operation(all_hosts, check_disk, max_workers=30)
print(f"\n{len(results['success'])} OK, {len(results['failed'])} FAILED")

Emergency: Script Consuming Too Much Memory

# BAD: Loading everything into memory
all_logs = open('/var/log/huge.log').readlines()  # 10 GB in memory
matching = [line for line in all_logs if 'ERROR' in line]

# GOOD: Stream line by line
def grep_file(filepath, pattern):
    """Memory-efficient file search."""
    with open(filepath) as f:
        for line_num, line in enumerate(f, 1):
            if pattern in line:
                yield line_num, line.rstrip()

# GOOD: Use generators for large AWS results
def stream_s3_objects(bucket):
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket):
        for obj in page.get('Contents', []):
            yield obj  # One at a time, not all at once
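
A side benefit of generators: the consumer decides how much work happens. itertools.islice stops the scan after the first few matches, so "show me the first 5 errors in a 10 GB log" reads only as far as it needs to. grep_lines below mirrors the grep_file sketch above:

```python
from itertools import islice

def grep_lines(filepath, pattern):
    """Stream the file; never hold more than one line in memory."""
    with open(filepath) as f:
        for line_num, line in enumerate(f, 1):
            if pattern in line:
                yield line_num, line.rstrip()

# First 5 errors only -- the file is read just far enough to find them:
# for num, line in islice(grep_lines('/var/log/huge.log', 'ERROR'), 5):
#     print(num, line)
```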

Emergency: Script Fails with SSL Error

# Common in corporate environments with internal CAs

# Quick fix (INSECURE - only for debugging):
# response = requests.get(url, verify=False)

# Correct fix: point to the CA bundle
response = requests.get(url, verify='/etc/ssl/certs/internal-ca.pem')

# Or set it globally via environment variable:
# export REQUESTS_CA_BUNDLE=/etc/ssl/certs/internal-ca.pem

# Or in boto3:
# export AWS_CA_BUNDLE=/etc/ssl/certs/internal-ca.pem

Quick Reference