Python for Infrastructure - Street-Level Ops¶
What experienced infra engineers know about writing Python that runs in production, not just in a notebook.
Quick Diagnosis Commands¶
# Check Python version and location
python3 --version
which python3
# Check if a module is installed
python3 -c "import boto3; print(boto3.__version__)"
python3 -c "import paramiko; print(paramiko.__version__)"
# Quick pip freeze (what's installed)
pip3 list --format=columns 2>/dev/null | head -20
# Test AWS credentials
python3 -c "import boto3; print(boto3.client('sts').get_caller_identity()['Arn'])"
# Debug import errors
python3 -v -c "import mymodule" 2>&1 | tail -20
# Check for syntax errors without running
python3 -m py_compile myscript.py
# Profile a slow script
python3 -m cProfile -s cumtime myscript.py 2>&1 | head -30
# Run a quick one-liner from the shell
python3 -c "
import json, sys
data = json.load(sys.stdin)
print(json.dumps(data, indent=2))
" < response.json
Gotcha: boto3 Paginators Are Not Optional¶
You call ec2.describe_instances() and get 100 instances back. You actually have 847. The API returned one page and you threw away the rest.
Fix:
# BAD: Only gets first page
response = ec2.describe_instances()
instances = response['Reservations']
# GOOD: Gets all pages
paginator = ec2.get_paginator('describe_instances')
instances = []
for page in paginator.paginate():
    for reservation in page['Reservations']:
        instances.extend(reservation['Instances'])

# ALSO GOOD: Use the page iterator directly
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket='my-bucket'):
    for obj in page.get('Contents', []):
        process(obj)
Every AWS API that returns a list should be paginated. If you see NextToken or IsTruncated in the response, you missed items.
Debug clue: If your script works in dev (12 instances) but returns incomplete results in prod (800+ instances), pagination is the first thing to check. The default page size for most AWS APIs is 50-100 items. Search your codebase for describe_, list_, and get_ calls that do not use a paginator.
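While auditing, a cheap guard is to check raw responses for a continuation token. A minimal helper (a sketch; the token names below cover the common AWS variants, but individual APIs differ) that flags a truncated response:

```python
def response_truncated(response):
    """Return True if an AWS API response indicates more pages exist.

    Different APIs signal truncation differently: most use NextToken,
    S3 list APIs use IsTruncated/NextContinuationToken, and some older
    APIs use NextMarker.
    """
    return bool(
        response.get('NextToken')
        or response.get('NextContinuationToken')
        or response.get('IsTruncated')
        or response.get('NextMarker')
    )

# A response carrying NextToken means you only received one page
page = {'Reservations': [], 'NextToken': 'abc123'}
assert response_truncated(page)
assert not response_truncated({'Reservations': []})
```

Logging a warning whenever this returns True on a non-paginated call makes the dev-vs-prod discrepancy visible long before the fleet grows past one page.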
Gotcha: requests Without Timeout Hangs Forever¶
Your script calls an internal API. The target server is overloaded and does not respond. requests.get() with no timeout blocks indefinitely. Your cron job is still "running" 6 hours later. The next invocation starts, and now you have two stuck processes.
Fix:
# BAD: no timeout
response = requests.get('http://api.internal:8080/status')
# GOOD: always set timeout (connect_timeout, read_timeout)
response = requests.get('http://api.internal:8080/status', timeout=(5, 30))
# BETTER: session with retries and timeout
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retry = Retry(total=3, backoff_factor=1, status_forcelist=[500, 502, 503])
session.mount('http://', HTTPAdapter(max_retries=retry))
response = session.get('http://api.internal:8080/status', timeout=(5, 30))
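The overlapping-cron-job failure mode above is worth guarding against independently of timeouts: if each invocation takes an exclusive lock, a second copy exits instead of piling up. A minimal sketch using an advisory flock (the lock path is illustrative; fcntl is Unix-only):

```python
import fcntl
import sys

def acquire_lock(lock_path='/tmp/myjob.lock'):
    """Try to take an exclusive, non-blocking advisory lock.

    Returns the open file handle (keep it alive for the life of the
    process; the lock is released when it is closed) or None if another
    instance already holds the lock.
    """
    f = open(lock_path, 'w')
    try:
        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
        return f
    except BlockingIOError:
        f.close()
        return None

lock = acquire_lock()
if lock is None:
    sys.exit("another instance is still running")
```

Because the kernel releases the lock when the process dies, a crashed run never leaves a stale lock behind, unlike PID-file schemes.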
Gotcha: subprocess.run with shell=True and User Input¶
You build a command string with user-provided input and pass it to shell=True. An attacker passes ; rm -rf / as the hostname. Your script executes it.
Fix:
# BAD: shell injection vulnerability
hostname = user_input
subprocess.run(f"ping -c 1 {hostname}", shell=True)
# GOOD: pass arguments as a list
subprocess.run(["ping", "-c", "1", hostname])
# If you need shell features (pipes, redirects), capture the output in
# Python instead of building a shell pipeline:
result = subprocess.run(
    ["grep", "ERROR", "/var/log/app.log"],
    stdout=subprocess.PIPE,
    text=True,
)
# Then filter/pipe result.stdout in Python, not in the shell
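When you genuinely need a multi-stage pipeline, chain Popen objects instead of reaching for shell=True. A sketch of the equivalent of `grep PATTERN logfile | wc -l` (the log path is whatever you pass in; no shell ever sees it, so injection is impossible):

```python
import subprocess

def grep_count(pattern, logfile):
    """Equivalent of `grep PATTERN logfile | wc -l`, with no shell involved."""
    grep = subprocess.Popen(
        ["grep", pattern, logfile],
        stdout=subprocess.PIPE,
    )
    wc = subprocess.Popen(
        ["wc", "-l"],
        stdin=grep.stdout,
        stdout=subprocess.PIPE,
        text=True,
    )
    grep.stdout.close()  # so grep receives SIGPIPE if wc exits early
    out, _ = wc.communicate()
    grep.wait()
    return int(out.strip())
```

Closing `grep.stdout` in the parent is the standard pipeline idiom: it leaves `wc` as the only reader, so `grep` terminates cleanly if the downstream process exits first.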
Gotcha: paramiko Connections Not Closed¶
You open SSH connections in a loop but do not close them on exceptions. After 200 hosts, you hit the file descriptor limit and everything fails.
Fix:
# BAD: connection leak on exception
client = paramiko.SSHClient()
client.connect(host, username=user, key_filename=key)
stdin, stdout, stderr = client.exec_command(cmd)
# If exec_command raises, client is never closed
# GOOD: use try/finally or context manager pattern
def run_remote(host, user, key, cmd):
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(host, username=user, key_filename=key, timeout=10)
        stdin, stdout, stderr = client.exec_command(cmd, timeout=30)
        return stdout.read().decode().strip()
    finally:
        client.close()
Gotcha: JSON Dumps of datetime Objects¶
Your script fetches data from AWS that includes datetime fields. You try json.dumps(data) and get TypeError: Object of type datetime is not JSON serializable.
Fix:
import json
from datetime import datetime
# Custom JSON encoder for infra data
class InfraEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        return super().default(obj)
output = json.dumps(data, indent=2, cls=InfraEncoder)
# Or use the quick one-liner approach:
json.dumps(data, indent=2, default=str)
# Gotcha: default=str silently converts ANY non-serializable object to its
# str() representation. This hides bugs where unexpected types slip through.
# Use InfraEncoder for production code, default=str for quick debugging only.
Pattern: Structured Logging for Infra Scripts¶
import logging
import json
import sys
from datetime import datetime, timezone

def setup_logging(verbose=False):
    """Set up structured logging for infra scripts."""
    level = logging.DEBUG if verbose else logging.INFO
    handler = logging.StreamHandler(sys.stderr)
    handler.setFormatter(logging.Formatter(
        '%(asctime)s %(levelname)s %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    ))
    logging.basicConfig(level=level, handlers=[handler])
    return logging.getLogger(__name__)

log = setup_logging()

# Use structured messages (note: 'extra' fields only reach the output if the
# formatter references them, or if you use a JSON formatter)
log.info("Starting backup", extra={'hosts': 5, 'type': 'borg'})
log.error("Backup failed for host=%s error=%s", host, str(e))

# For JSON output (to pipe into monitoring)
def log_json(event, **kwargs):
    """Emit a JSON log line."""
    entry = {'event': event, 'timestamp': datetime.now(timezone.utc).isoformat()}
    entry.update(kwargs)
    print(json.dumps(entry), file=sys.stderr)
Pattern: Retry with Exponential Backoff¶
import time
import functools
def retry(max_attempts=3, backoff=2, exceptions=(Exception,)):
    """Retry decorator with exponential backoff."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        raise
                    wait = backoff ** attempt
                    log.warning(
                        "Attempt %d/%d failed: %s. Retrying in %ds",
                        attempt, max_attempts, e, wait,
                    )
                    time.sleep(wait)
        return wrapper
    return decorator

@retry(max_attempts=3, backoff=2, exceptions=(requests.RequestException,))
def call_api(url):
    resp = requests.get(url, timeout=10)
    resp.raise_for_status()
    return resp.json()
Pattern: Safe Config File Updates¶
from pathlib import Path
import os
import shutil
import tempfile
import time

def safe_update_config(config_path, new_content, backup=True):
    """Update a config file atomically with backup."""
    config_path = Path(config_path)
    # Backup existing (with_name keeps the original extension in the backup)
    if backup and config_path.exists():
        backup_path = config_path.with_name(
            f'{config_path.name}.bak.{int(time.time())}'
        )
        shutil.copy2(config_path, backup_path)
    # Write to temp file in same directory (same filesystem for atomic rename)
    fd, tmp_path = tempfile.mkstemp(
        dir=config_path.parent,
        prefix=config_path.name,
        suffix='.tmp',
    )
    os.close(fd)  # close the raw descriptor; write_text opens its own handle
    try:
        Path(tmp_path).write_text(new_content)
        # Preserve original permissions
        if config_path.exists():
            shutil.copystat(config_path, tmp_path)
        os.replace(tmp_path, config_path)  # atomic on POSIX, even over an existing file
    except Exception:
        Path(tmp_path).unlink(missing_ok=True)
        raise
Pattern: Parallel Fleet Operations¶
from concurrent.futures import ThreadPoolExecutor, as_completed
import sys
def fleet_operation(hosts, operation, max_workers=20, fail_fast=False):
    """Run an operation across a fleet of hosts in parallel."""
    results = {'success': [], 'failed': []}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(operation, host): host
            for host in hosts
        }
        for future in as_completed(futures):
            host = futures[future]
            try:
                result = future.result()
                results['success'].append({'host': host, 'result': result})
                print(f"  OK: {host}", file=sys.stderr)
            except Exception as e:
                results['failed'].append({'host': host, 'error': str(e)})
                print(f"FAIL: {host} - {e}", file=sys.stderr)
                if fail_fast:
                    # cancel_futures requires Python 3.9+
                    executor.shutdown(wait=False, cancel_futures=True)
                    break
    return results

# Usage
def check_disk(host):
    return run_remote(host, 'deploy', '/home/deploy/.ssh/id_rsa', 'df -h /')

results = fleet_operation(all_hosts, check_disk, max_workers=30)
print(f"\n{len(results['success'])} OK, {len(results['failed'])} FAILED")
Emergency: Script Consuming Too Much Memory¶
# BAD: Loading everything into memory
all_logs = open('/var/log/huge.log').readlines() # 10 GB in memory
matching = [line for line in all_logs if 'ERROR' in line]
# GOOD: Stream line by line
def grep_file(filepath, pattern):
    """Memory-efficient file search."""
    with open(filepath) as f:
        for line_num, line in enumerate(f, 1):
            if pattern in line:
                yield line_num, line.rstrip()

# GOOD: Use generators for large AWS results
def stream_s3_objects(bucket):
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket):
        for obj in page.get('Contents', []):
            yield obj  # One at a time, not all at once
Emergency: Script Fails with SSL Error¶
# Common in corporate environments with internal CAs
# Quick fix (INSECURE - only for debugging):
# response = requests.get(url, verify=False)
# Correct fix: point to the CA bundle
response = requests.get(url, verify='/etc/ssl/certs/internal-ca.pem')
# Or set it globally via environment variable:
# export REQUESTS_CA_BUNDLE=/etc/ssl/certs/internal-ca.pem
# Or in boto3:
# export AWS_CA_BUNDLE=/etc/ssl/certs/internal-ca.pem
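Before pointing REQUESTS_CA_BUNDLE anywhere, it helps to know where this Python build looks for CA certificates by default; the standard library will tell you:

```python
import ssl

# Where does this Python look for CA certificates by default?
paths = ssl.get_default_verify_paths()
print("cafile:      ", paths.cafile)              # bundle file, if one exists
print("capath:      ", paths.capath)              # directory of hashed certs
print("env override:", paths.openssl_cafile_env)  # usually SSL_CERT_FILE
```

If `cafile` is None and requests still works on public sites, requests is likely using the certifi package's bundle instead, which is a common reason an internal CA added to the system store is not picked up.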
Quick Reference¶
- Cheatsheet: Python for DevOps