
Python for Infrastructure - Primer

Why This Matters

You already know Bash. You have written hundreds of scripts that glue infrastructure together. But Bash breaks down when you need error handling, data structures, API integration, parallel execution, or anything beyond string manipulation. Python is the language that bridges ops scripting and software engineering: Ansible, SaltStack, and the AWS CLI are all written in Python, and most infrastructure APIs ship first-class Python SDKs.

Fun fact: Python was created by Guido van Rossum in 1991 and named after Monty Python's Flying Circus, not the snake. The language's design philosophy is captured in "The Zen of Python" (import this), which includes "Readability counts" and "There should be one -- and preferably only one -- obvious way to do it." These principles make Python ideal for infrastructure scripts that must be maintained by teams.

This is not a Python tutorial. This is Python patterns for ops engineers who already think in terms of servers, APIs, and automation. You know what you want to do. This shows you how to do it in Python instead of a 500-line Bash script that nobody can maintain.

Core Concepts

1. boto3 for AWS

boto3 is the AWS SDK for Python. Every AWS API is accessible through it.

import boto3
from botocore.exceptions import ClientError

# Initialize clients
ec2 = boto3.client('ec2', region_name='us-east-1')
s3 = boto3.client('s3')
ssm = boto3.client('ssm')

# List instances with a specific tag
def get_instances_by_tag(tag_key, tag_value):
    response = ec2.describe_instances(
        Filters=[
            {'Name': f'tag:{tag_key}', 'Values': [tag_value]},
            {'Name': 'instance-state-name', 'Values': ['running']},
        ]
    )
    instances = []
    for reservation in response['Reservations']:
        for instance in reservation['Instances']:
            instances.append({
                'id': instance['InstanceId'],
                'ip': instance.get('PrivateIpAddress'),
                'type': instance['InstanceType'],
                'az': instance['Placement']['AvailabilityZone'],
            })
    return instances

> **Gotcha:** boto3 has two interfaces: `client` (low-level, 1:1 with the API) and `resource` (high-level, object-oriented). The `resource` interface is in maintenance mode (no new features) as of 2023. Prefer `client` for new code. Also, boto3 reads credentials in this order: (1) explicit parameters, (2) environment variables, (3) `~/.aws/credentials` profile, (4) EC2 instance metadata / ECS task role. Hardcoding credentials in code is never necessary.

# Paginate (critical for large results)
def list_all_s3_objects(bucket, prefix=''):
    paginator = s3.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
    for page in pages:
        for obj in page.get('Contents', []):
            yield obj['Key'], obj['Size']

# Error handling
def stop_instance(instance_id):
    try:
        ec2.stop_instances(InstanceIds=[instance_id])
        print(f"Stopping {instance_id}")
    except ClientError as e:
        if e.response['Error']['Code'] == 'InvalidInstanceID.NotFound':
            print(f"Instance {instance_id} not found")
        else:
            raise

2. paramiko for SSH

import paramiko

def run_remote_command(host, user, key_path, command):
    """Run a command on a remote host via SSH."""
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())  # trusts unknown host keys; pin known_hosts in production

    try:
        client.connect(
            hostname=host,
            username=user,
            key_filename=key_path,
            timeout=10,
        )
        stdin, stdout, stderr = client.exec_command(command, timeout=30)
        exit_code = stdout.channel.recv_exit_status()

        return {
            'host': host,
            'stdout': stdout.read().decode().strip(),
            'stderr': stderr.read().decode().strip(),
            'exit_code': exit_code,
        }
    finally:
        client.close()

# Run across multiple hosts
def run_on_fleet(hosts, user, key_path, command):
    results = []
    for host in hosts:
        try:
            result = run_remote_command(host, user, key_path, command)
            results.append(result)
        except Exception as e:
            results.append({'host': host, 'error': str(e)})
    return results

# SFTP file transfer
def upload_file(host, user, key_path, local_path, remote_path):
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(hostname=host, username=user, key_filename=key_path)
    sftp = client.open_sftp()
    sftp.put(local_path, remote_path)
    sftp.close()
    client.close()

3. Click for CLI Tools

One-liner: The fastest way to check whether a host answers ping from Python: subprocess.run(["ping", "-c", "1", "-W", "2", host], capture_output=True).returncode == 0. For anything beyond one-off checks, prefer an application-level check against the service itself, using the requests library with proper timeouts and retries.
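When ICMP is blocked, or you care whether the service port is actually accepting connections rather than whether the kernel is up, a dependency-free sketch using the stdlib socket module (port and timeout values are illustrative):

```python
import socket

def tcp_reachable(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, unresolvable, etc.
        return False
```

Unlike ping, this tells you something is listening on the port, not just that the host is alive.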

Click is the standard for building Python CLI tools. It handles argument parsing, help text, validation, and subcommands.

import click
import json

@click.group()
@click.option('--verbose', '-v', is_flag=True, help='Enable verbose output')
@click.pass_context
def cli(ctx, verbose):
    """Infrastructure management tool."""
    ctx.ensure_object(dict)
    ctx.obj['verbose'] = verbose

@cli.command()
@click.argument('environment', type=click.Choice(['dev', 'staging', 'prod']))
@click.option('--region', '-r', default='us-east-1', help='AWS region')
@click.option('--output', '-o', type=click.Choice(['table', 'json']), default='table')
def list_servers(environment, region, output):
    """List servers in an environment."""
    servers = get_instances_by_tag('Environment', environment)

    if output == 'json':
        click.echo(json.dumps(servers, indent=2))
    else:
        click.echo(f"{'ID':<22} {'IP':<16} {'Type':<12} {'AZ'}")
        click.echo('-' * 60)
        for s in servers:
            click.echo(f"{s['id']:<22} {s['ip']:<16} {s['type']:<12} {s['az']}")

@cli.command()
@click.argument('instance_id')
@click.confirmation_option(prompt='Are you sure you want to stop this instance?')
def stop(instance_id):
    """Stop an EC2 instance."""
    stop_instance(instance_id)
    click.echo(f"Stop request sent for {instance_id}")

if __name__ == '__main__':
    cli()

4. Jinja2 for Templating

Generate config files, reports, and infrastructure code from templates.

from jinja2 import Environment, FileSystemLoader

env = Environment(
    loader=FileSystemLoader('templates'),
    trim_blocks=True,
    lstrip_blocks=True,
)

# templates/nginx.conf.j2
NGINX_TEMPLATE = """
upstream {{ service_name }} {
    {% for server in backends %}
    server {{ server.ip }}:{{ server.port }} weight={{ server.weight }};
    {% endfor %}
}

server {
    listen {{ listen_port }};
    server_name {{ domain }};

    location / {
        proxy_pass http://{{ service_name }};
    }
}
"""

def generate_nginx_config(service_name, backends, domain, listen_port=443):
    template = env.from_string(NGINX_TEMPLATE)
    return template.render(
        service_name=service_name,
        backends=backends,
        domain=domain,
        listen_port=listen_port,
    )

# Usage
config = generate_nginx_config(
    service_name='myapp',
    backends=[
        {'ip': '10.0.1.10', 'port': 8080, 'weight': 100},
        {'ip': '10.0.1.11', 'port': 8080, 'weight': 100},
    ],
    domain='app.example.com',
)

5. requests for REST APIs

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Session with retries (essential for infra scripts)
def get_session(retries=3, backoff=0.5):
    session = requests.Session()
    retry = Retry(
        total=retries,
        backoff_factor=backoff,
        status_forcelist=[500, 502, 503, 504],
        # Note: by default urllib3 retries only idempotent methods (GET, PUT, ...);
        # pass allowed_methods=... if you also want POSTs retried.
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session

# Prometheus query
def query_prometheus(prom_url, query):
    session = get_session()
    response = session.get(
        f"{prom_url}/api/v1/query",
        params={'query': query},
        timeout=10,
    )
    response.raise_for_status()
    data = response.json()
    return data['data']['result']

# PagerDuty incident creation
def create_pagerduty_incident(api_key, service_id, title, body):
    session = get_session()
    response = session.post(
        'https://api.pagerduty.com/incidents',
        headers={
            'Authorization': f'Token token={api_key}',
            'Content-Type': 'application/json',
            'From': 'ops@example.com',  # PagerDuty requires a valid user email here; placeholder
        },
        json={
            'incident': {
                'type': 'incident',
                'title': title,
                'service': {'id': service_id, 'type': 'service_reference'},
                'body': {'type': 'incident_body', 'details': body},
            }
        },
        timeout=10,
    )
    response.raise_for_status()
    return response.json()

6. subprocess for Shell Commands

import subprocess
import shlex

def run_cmd(cmd, timeout=30, check=True):
    """Run a shell command safely."""
    result = subprocess.run(
        shlex.split(cmd),       # Safe argument splitting
        capture_output=True,
        text=True,
        timeout=timeout,
        check=check,            # Raise on non-zero exit
    )
    return result

# Stream output in real time
def run_streaming(cmd):
    """Run a command and stream output line by line."""
    process = subprocess.Popen(
        shlex.split(cmd),
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )
    for line in process.stdout:
        print(line, end='')
    process.wait()
    return process.returncode

# Never use shell=True with user input
# BAD:  subprocess.run(f"ls {user_input}", shell=True)
# GOOD: subprocess.run(["ls", user_input])
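Even shell pipes can usually be expressed without shell=True by connecting processes explicitly. A sketch of `producer | consumer` that uses the Python interpreter itself as both ends of the pipe so it runs anywhere:

```python
import subprocess
import sys

# Equivalent of `producer | consumer` without invoking a shell:
producer = subprocess.Popen(
    [sys.executable, '-c', 'print("hello from producer")'],
    stdout=subprocess.PIPE,
)
consumer = subprocess.run(
    [sys.executable, '-c', 'import sys; print(sys.stdin.read().upper(), end="")'],
    stdin=producer.stdout,   # wire producer's stdout straight into consumer's stdin
    capture_output=True,
    text=True,
)
producer.stdout.close()      # so the producer sees a closed pipe if consumer exits early
producer.wait()
print(consumer.stdout)       # HELLO FROM PRODUCER
```

No shell means no injection surface: arguments are passed as a list, never re-parsed.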

7. pathlib for File Operations

from pathlib import Path
import json

# Path manipulation (no more os.path.join)
config_dir = Path('/etc/myapp')
config_file = config_dir / 'config.json'
backup_dir = Path('/backup') / 'myapp'

# Read/write files
config = json.loads(config_file.read_text())
config_file.write_text(json.dumps(config, indent=2))

# Directory operations
backup_dir.mkdir(parents=True, exist_ok=True)

# Find files
for log_file in Path('/var/log').glob('*.log'):
    size_mb = log_file.stat().st_size / (1024 * 1024)
    if size_mb > 100:
        print(f"Large log: {log_file} ({size_mb:.1f} MB)")

# Recursive glob
for yaml_file in Path('/etc').rglob('*.yaml'):
    print(yaml_file)

# Atomic write (safe for config files)
import os
import tempfile

def atomic_write(path, content):
    """Write content atomically: temp file in the same dir, then rename over."""
    path = Path(path)
    fd, tmp_path = tempfile.mkstemp(dir=path.parent, suffix='.tmp')
    try:
        with os.fdopen(fd, 'w') as f:   # reuse (and close) the fd mkstemp opened
            f.write(content)
        os.replace(tmp_path, path)      # atomic on POSIX, overwrites existing file
    except Exception:
        Path(tmp_path).unlink(missing_ok=True)
        raise

8. concurrent.futures for Parallelism

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

def check_host_health(host):
    """Check if a host is healthy."""
    try:
        resp = requests.get(f'http://{host}:8080/health', timeout=5)
        return {'host': host, 'status': resp.status_code, 'healthy': resp.ok}
    except requests.exceptions.RequestException as e:
        return {'host': host, 'status': 0, 'healthy': False, 'error': str(e)}

def parallel_health_check(hosts, max_workers=20):
    """Check health of many hosts in parallel."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_host = {
            executor.submit(check_host_health, host): host
            for host in hosts
        }
        for future in as_completed(future_to_host):
            results.append(future.result())
    return results

# Process results
hosts = ['10.0.1.10', '10.0.1.11', '10.0.1.12']
results = parallel_health_check(hosts)
unhealthy = [r for r in results if not r['healthy']]
if unhealthy:
    print(f"ALERT: {len(unhealthy)} unhealthy hosts")
    for h in unhealthy:
        print(f"  {h['host']}: {h.get('error', h['status'])}")

Common Pitfalls

  1. Not using sessions with retries — Infra APIs are flaky. A single requests.get() without retries will fail in production. Always use a session with Retry and HTTPAdapter.
  2. subprocess with shell=True — Opens you to shell injection. Use shlex.split() or pass a list of arguments. The only time shell=True is acceptable is when you need shell features like pipes, and even then, prefer Python-native alternatives.
  3. Blocking on SSH to 500 hosts sequentially — Use concurrent.futures.ThreadPoolExecutor for parallel SSH. With 20 worker threads, a 500-host loop finishes roughly 25x faster than running sequentially.
  4. Hardcoding credentials — Never put AWS keys, passwords, or tokens in Python files. Use environment variables, AWS profiles, or a secrets manager.
  5. No timeout on HTTP requests — requests.get(url) with no timeout will block forever if the server does not respond. Always set timeout=.
  6. Ignoring pagination — AWS APIs return at most 100-1000 results per call. If you have 5,000 instances and do not paginate, you only see the first 1,000.
  7. Using os.path when pathlib exists — Path objects are cleaner, more readable, and handle edge cases better. Stop writing os.path.join(os.path.dirname(...)).

Wiki Navigation

Prerequisites

Next Steps

  • Perl Flashcards (CLI) (flashcard_deck, L1) — Python Automation
  • Python Async & Concurrency (Topic Pack, L2) — Python Automation
  • Python Debugging (Topic Pack, L1) — Python Automation
  • Python Drills (Drill, L0) — Python Automation
  • Python Exercises (Quest Ladder) (CLI) (Exercise Set, L0) — Python Automation
  • Python Flashcards (CLI) (flashcard_deck, L1) — Python Automation
  • Python Packaging (Topic Pack, L2) — Python Automation
  • Skillcheck: Python Automation (Assessment, L0) — Python Automation
  • Software Development Flashcards (CLI) (flashcard_deck, L1) — Python Automation