Python: Data Wrangling for Ops

  • lesson
  • python
  • log-parsing
  • csv
  • regex
  • datetime
  • generators
  • collections
  • data-pipelines
  • awk-replacement

Topics: Python, log parsing, csv, regex, datetime, generators, collections, data pipelines, awk replacement
Strategy: Build-up + parallel (Bash equivalent shown alongside Python replacement)
Level: L1–L2 (Foundations → Operations)
Time: 90–120 minutes
Prerequisites: None (but you'll get more from this if you've parsed logs in Bash and felt the pain)


The Mission

Your team runs 12 Nginx reverse proxies behind an AWS NLB. The observability stack (Datadog) is down for maintenance until Monday. Your manager needs a report by end of day:

  • Top 10 client IPs by request count
  • Slowest 20 endpoints by average response time
  • Error rate by hour (4xx and 5xx) for the last 7 days

You have the access logs on a jump box: seven gzipped files, one per day, about 50 million lines total. Each file is 800 MB uncompressed.

You could do this in awk. You've done it in awk. But the last time you did it, the one-liner became a 90-line awk script with three associative arrays, a datetime parser built from substr() calls, and a bug where requests at midnight got counted in the wrong hour.

This time you're going to do it in Python. By the end of this lesson, you'll have a reusable log analysis pipeline that handles any format, any size, and produces clean output you can paste into Slack or pipe into a CSV.


Part 1: Reading Files Without Killing the Server

The logs are 800 MB each. Your jump box has 4 GB of RAM. Let's start with what NOT to do.

The Memory Bomb

# DO NOT DO THIS
with open('access.log') as f:
    lines = f.readlines()  # loads 800 MB into RAM
    # plus Python object overhead: each line is a str object
    # actual memory: ~2.5 GB for an 800 MB file

Under the Hood: Python strings are objects. Each one carries a reference count, a type pointer, a length, a hash cache, and the actual character data. A 200-byte log line costs about 250 bytes as a Python object. For 7 million lines, that overhead alone is ~350 MB on top of the raw data. f.readlines() creates all those objects at once.
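You can check the per-object overhead yourself with sys.getsizeof — a quick sketch, with the caveat that exact numbers vary by Python version and build:

```python
import sys

line = "x" * 200               # stand-in for a 200-byte ASCII log line
print(len(line))               # 200 bytes of character data
print(sys.getsizeof(line))     # ~249 on CPython: ~49 bytes of object header on top
```

Multiply that header by 7 million lines and you see where the "~2.5 GB for an 800 MB file" comes from.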

The Right Way: Line by Line

count = 0
with open('access.log') as f:
    for line in f:           # reads one line at a time
        if '500' in line:
            count += 1
print(f"500 errors: {count}")

Memory usage: roughly one line in RAM at a time. You could process a 100 GB file on a Raspberry Pi.

The Bash Comparison

# Bash — fast but limited
grep -c '500' access.log

# The Python version above is ~5x slower than grep for this exact task.
# But grep can't do: "count 500s per hour, grouped by endpoint, excluding health checks"
# That's where Python pays for itself.

Handling Gzipped Files

Your logs are gzipped. Python handles this natively:

import gzip

with gzip.open('access.log.gz', 'rt') as f:  # 'rt' = read as text
    for line in f:
        process(line)

The 'rt' mode matters. Without it you get bytes, not strings, and every comparison fails silently. Ask me how I know.

Gotcha: gzip.open() with 'rt' decodes the bytes to strings on the fly. If your log has non-UTF-8 bytes (binary data in POST bodies, corrupted lines), add errors='replace' to avoid UnicodeDecodeError crashing your script at line 4,287,331 of a 7-million-line file.
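A minimal round-trip sketch of that failure mode, using a hypothetical scratch file: we deliberately plant an invalid UTF-8 byte, then read with errors='replace' and survive.

```python
import gzip, os, tempfile

# Hypothetical scratch file standing in for a real rotated log
path = os.path.join(tempfile.mkdtemp(), "access.log.gz")
with gzip.open(path, "wb") as f:
    # \xff is not valid UTF-8 — the kind of junk a binary POST body leaves behind
    f.write(b"10.0.1.47 GET /ok 200\n\xff\xfe garbage in this line\n")

with gzip.open(path, "rt", errors="replace") as f:
    lines = f.readlines()

print(lines[1])  # the bad bytes arrive as U+FFFD replacement characters, no crash
```

Without errors='replace', the second line raises UnicodeDecodeError and kills the whole run.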

Processing Multiple Files

import gzip
from pathlib import Path

log_dir = Path('/var/log/nginx')
for log_file in sorted(log_dir.glob('access.log.*.gz')):
    with gzip.open(log_file, 'rt', errors='replace') as f:
        for line in f:
            process(line)

Flashcard Check #1

Question Answer
Why is f.readlines() dangerous on large files? It loads the entire file into memory at once. Python object overhead makes it ~3x the file size in RAM.
What mode string do you pass to gzip.open() for text? 'rt' — read as text. Without the t, you get raw bytes.
What does errors='replace' do when opening a file? Replaces undecodable bytes with the Unicode replacement character instead of raising UnicodeDecodeError.

Part 2: collections — The Ops Power Tools

The collections module is the single biggest productivity jump when you move from Bash to Python for data processing. Three tools do 90% of the work.

Counter: The Awk Killer

Here's the awk way to count IPs:

awk '{print $1}' access.log | sort | uniq -c | sort -rn | head -10

Four commands piped together. Works fine. Now here's Python:

from collections import Counter

ip_counts = Counter()
with open('access.log') as f:
    for line in f:
        ip = line.split()[0]
        ip_counts[ip] += 1

for ip, count in ip_counts.most_common(10):
    print(f"{count:>8}  {ip}")

Mental Model: A Counter is a dict where every key starts at zero. You never need to check "does this key exist?" before incrementing. counter[key] += 1 just works, even if key has never been seen. This eliminates the most common bug in awk associative arrays and Python dicts alike.

The awk version pipes through sort twice (once alphabetically for uniq, once numerically for ranking). The Python version sorts once, internally, and gives you the top N directly. But the real win comes when you need to count two things at once:

ip_counts = Counter()
status_counts = Counter()
endpoint_counts = Counter()

with open('access.log') as f:
    for line in f:
        parts = line.split()
        ip_counts[parts[0]] += 1
        status_counts[parts[8]] += 1
        endpoint_counts[parts[6]] += 1

One pass through the file. Three separate aggregations. In awk, you'd need three associative arrays and a longer END block. In Bash, you'd need three separate pipelines (three passes through an 800 MB file).

Trivia: Counter was added in Python 2.7 (2010) by Raymond Hettinger, who also created OrderedDict, namedtuple, and the @functools.lru_cache decorator. Hettinger is responsible for more of the "Python feels nice to use" experience than perhaps anyone other than Guido van Rossum himself.

defaultdict: The "I Don't Want to Check if the Key Exists" Dict

from collections import defaultdict

# Group log lines by status code
lines_by_status = defaultdict(list)
with open('access.log') as f:
    for line in f:
        status = line.split()[8]
        lines_by_status[status].append(line.rstrip())

# Now you have: {'200': [...], '404': [...], '500': [...]}
print(f"Unique 500 errors: {len(lines_by_status['500'])}")

Without defaultdict, you'd write:

if status not in lines_by_status:
    lines_by_status[status] = []
lines_by_status[status].append(line)

That if check on every single line of a 50-million-line file adds up — in code noise, in bugs, and in CPU cycles.

defaultdict argument Creates Use case
defaultdict(list) Empty list [] Grouping items
defaultdict(int) Zero 0 Counting (but use Counter instead)
defaultdict(set) Empty set set() Unique grouping
defaultdict(dict) Empty dict {} Nested grouping
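The defaultdict(set) row deserves a quick demo — grouping while deduplicating in one pass, with a few made-up sample values:

```python
from collections import defaultdict

ips_by_status = defaultdict(set)
for status, ip in [('500', '10.0.1.47'), ('500', '10.0.1.47'), ('404', '172.16.5.12')]:
    ips_by_status[status].add(ip)   # duplicate IPs collapse automatically

print(sorted(ips_by_status['500']))  # unique IPs that ever hit a 500
```

"Which distinct IPs saw errors?" is a set question, not a count question — reaching for defaultdict(set) instead of Counter keeps that distinction explicit.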

namedtuple: Give Your Data a Skeleton

When you parse a log line, you get a tuple of strings. Was parts[6] the URL or the status code? Nobody remembers. namedtuple fixes that:

from collections import namedtuple

LogEntry = namedtuple('LogEntry', [
    'ip', 'ident', 'user', 'timestamp', 'request',
    'status', 'size', 'referer', 'user_agent'
])

Now every parsed line has named fields. entry.ip instead of parts[0]. entry.status instead of parts[8]. Your code reads like English, and you'll never mix up field indices again.

Name Origin: namedtuple was inspired by the concept of record types in languages like Pascal and C structs. It gives you the memory efficiency of a tuple (no per-instance __dict__) with the readability of a class. For millions of log entries, this matters — a namedtuple uses roughly 72 bytes per instance vs. ~200+ bytes for a regular class.
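A small sketch of the mechanics, using a trimmed three-field version of LogEntry for brevity: _make() builds an instance from any iterable (like the list split() gives you), and _asdict() converts back to a plain dict when you need one.

```python
from collections import namedtuple

# Trimmed field list — the real LogEntry above has nine fields
LogEntry = namedtuple('LogEntry', ['ip', 'status', 'path'])

entry = LogEntry._make("10.0.1.47 200 /api/v2/products".split())
print(entry.ip)         # 10.0.1.47
print(entry.status)     # 200
print(entry._asdict())  # plain dict, e.g. for json.dumps
```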


Part 3: Regular Expressions — Parsing Real Log Formats

Splitting on whitespace breaks the moment a log line has quoted strings. Nginx Combined Log Format:

10.0.1.47 - admin [22/Mar/2026:15:32:01 +0000] "GET /api/v2/products?page=3 HTTP/1.1" 200 1432 "https://app.example.com/dashboard" "Mozilla/5.0 (X11; Linux x86_64)"

line.split() turns this into 15 fields because the quoted strings get split. You need regex.

The Nginx Log Parser

import re

# Compile once, use millions of times
NGINX_PATTERN = re.compile(
    r'(?P<ip>\S+) \S+ (?P<user>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) \S+" '
    r'(?P<status>\d{3}) (?P<size>\S+) '
    r'"(?P<referer>[^"]*)" '
    r'"(?P<user_agent>[^"]*)"'
)

line = '10.0.1.47 - admin [22/Mar/2026:15:32:01 +0000] "GET /api/v2/products?page=3 HTTP/1.1" 200 1432 "https://app.example.com/dashboard" "Mozilla/5.0 (X11; Linux x86_64)"'

match = NGINX_PATTERN.match(line)
if match:
    print(match.group('ip'))        # 10.0.1.47
    print(match.group('status'))    # 200
    print(match.group('path'))      # /api/v2/products?page=3
    print(match.group('method'))    # GET

Break down the regex pattern piece by piece:

Pattern fragment Matches Named group
(?P<ip>\S+) Non-whitespace (the IP) ip
\[(?P<timestamp>[^\]]+)\] Everything inside [...] timestamp
"(?P<method>\S+) (?P<path>\S+) \S+" HTTP method, path, protocol method, path
(?P<status>\d{3}) Three digits status
"(?P<user_agent>[^"]*)" Everything inside quotes user_agent

Remember: (?P<name>...) is Python's named group syntax. The P stands for "Python" — this extension was added by Python's regex engine and later adopted by other languages. Named groups let you access matches by name instead of index number, which means your code won't break when the log format changes.
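When you want all the named groups at once, groupdict() hands them back as a dict — which is exactly what the pipeline stages later in this lesson consume. A self-contained sketch with a trimmed two-field pattern:

```python
import re

# Trimmed pattern, just to show the mechanics
pattern = re.compile(r'(?P<ip>\S+) (?P<status>\d{3})')
fields = pattern.match('10.0.1.47 200 /api/v2/products').groupdict()
print(fields)  # {'ip': '10.0.1.47', 'status': '200'}
```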

Compiled vs. Uncompiled

# SLOW: recompiles the pattern on every call
for line in million_lines:
    match = re.match(r'(?P<ip>\S+)', line)

# FAST: compile once, match millions of times
pattern = re.compile(r'(?P<ip>\S+)')
for line in million_lines:
    match = pattern.match(line)

Under the Hood: re.compile() converts the regex string into a bytecode program that the regex engine can execute directly. Without compiling, re.match() has to parse the pattern string, build the bytecode, and cache it on every call. Python does cache the last few patterns internally, but in a loop over millions of lines, the cache lookup overhead alone is measurable. Compiling explicitly is free performance.

search vs. match vs. findall

Function Behavior Use when
re.match(pattern, string) Anchored to start of string Parsing structured lines
re.search(pattern, string) Finds first match anywhere Searching within text
re.findall(pattern, string) Returns all matches as list Extracting multiple values
re.finditer(pattern, string) Returns match objects lazily Memory-efficient extraction

# Extract all IPs from a firewall log (multiple IPs per line)
line = "src=10.0.1.5 dst=172.16.0.1 sport=443 dport=52341"
ips = re.findall(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', line)
# ['10.0.1.5', '172.16.0.1']

Flashcard Check #2

Question Answer
What does Counter.most_common(10) return? A list of (element, count) tuples, sorted by count descending, limited to the top 10.
What's the advantage of re.compile() in a loop? It converts the pattern to bytecode once. Without it, the pattern is re-parsed on every call, adding overhead across millions of iterations.
What does (?P<name>...) do in a regex? Creates a named capture group. Access with match.group('name') instead of match.group(1).
defaultdict(list) — what happens when you access a missing key? It automatically creates an empty list [] for that key. No KeyError, no if key not in dict check needed.

Part 4: datetime — The Timezone Nightmare

Nginx timestamps look like this: 22/Mar/2026:15:32:01 +0000. You need to parse them, do time math (which hour does this request belong to?), and handle the fact that your servers might log in UTC but your manager thinks in US/Eastern.

Parsing Timestamps

from datetime import datetime

raw = "22/Mar/2026:15:32:01 +0000"
dt = datetime.strptime(raw, "%d/%b/%Y:%H:%M:%S %z")
# datetime(2026, 3, 22, 15, 32, 1, tzinfo=timezone.utc)

The format codes you'll use constantly:

Code Meaning Example
%d Day of month (zero-padded) 22
%b Abbreviated month name Mar
%Y Four-digit year 2026
%H:%M:%S 24-hour time 15:32:01
%z UTC offset +0000
%Z Timezone name UTC

Gotcha: %b depends on the system locale. On a server with LANG=de_DE.UTF-8, March is Mär, not Mar. Your log parser works on your laptop and breaks on the German team's server. Fix: set locale.setlocale(locale.LC_TIME, 'C') at the top of your script, or use a locale-independent parser.
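Here's a sketch of that second option — a hypothetical locale-independent helper that maps month abbreviations by hand instead of trusting %b:

```python
from datetime import datetime, timezone, timedelta

# Hypothetical helper: parse Nginx timestamps without %b's locale dependency
MONTHS = {m: i for i, m in enumerate(
    ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
     'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'], start=1)}

def parse_nginx_time(raw):
    # raw looks like "22/Mar/2026:15:32:01 +0000"
    date_part, offset = raw.split(' ')
    day, mon, rest = date_part.split('/', 2)
    year, hh, mm, ss = rest.split(':')
    tz = timezone(timedelta(hours=int(offset[:3]),
                            minutes=int(offset[0] + offset[3:])))
    return datetime(int(year), MONTHS[mon], int(day),
                    int(hh), int(mm), int(ss), tzinfo=tz)

print(parse_nginx_time("22/Mar/2026:15:32:01 +0000"))
```

As a bonus, hand-rolled parsing like this is often noticeably faster than strptime in a tight loop, since strptime re-interprets the format string on every call.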

Grouping by Hour

This is the core of the "error rate by hour" requirement:

from collections import Counter
from datetime import datetime

errors_by_hour = Counter()

with open('access.log') as f:
    for line in f:
        match = NGINX_PATTERN.match(line)
        if not match:
            continue
        status = int(match.group('status'))
        if status >= 400:
            dt = datetime.strptime(match.group('timestamp'),
                                   "%d/%b/%Y:%H:%M:%S %z")
            hour_key = dt.strftime("%Y-%m-%d %H:00")
            errors_by_hour[hour_key] += 1

for hour, count in sorted(errors_by_hour.items()):
    print(f"{hour}  {count:>6} errors")

Output:

2026-03-16 00:00     142 errors
2026-03-16 01:00      87 errors
2026-03-16 02:00      63 errors
...
2026-03-22 15:00    1847 errors   ← the spike

Timezone Conversion

from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # Python 3.9+

# Parse as UTC
dt_utc = datetime.strptime("22/Mar/2026:15:32:01 +0000",
                            "%d/%b/%Y:%H:%M:%S %z")

# Convert to US/Eastern
eastern = ZoneInfo("America/New_York")
dt_eastern = dt_utc.astimezone(eastern)
print(dt_eastern)  # 2026-03-22 11:32:01-04:00  (EDT)

Under the Hood: zoneinfo (Python 3.9+) reads the IANA timezone database directly from the operating system's /usr/share/zoneinfo/ directory — the same database that powers the TZ environment variable and the date command. On minimal Docker images that strip timezone data, install the tzdata PyPI package as a fallback.

Time Math

How long between two requests? How many seconds since the last error?

from datetime import timedelta

dt1 = datetime.strptime("22/Mar/2026:15:32:01 +0000", "%d/%b/%Y:%H:%M:%S %z")
dt2 = datetime.strptime("22/Mar/2026:15:32:47 +0000", "%d/%b/%Y:%H:%M:%S %z")

delta = dt2 - dt1
print(delta)                # 0:00:46
print(delta.total_seconds())  # 46.0

# Is this request older than 7 days?
age = datetime.now(timezone.utc) - dt1
if age > timedelta(days=7):
    print("Outside our reporting window")

Gotcha: datetime.now() returns a naive datetime (no timezone). You cannot subtract a naive datetime from an aware datetime — Python raises TypeError. Always use datetime.now(timezone.utc) when comparing with timezone-aware timestamps. This catches more people than any other datetime bug.
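A two-line sketch that trips the bug on purpose, then does it right:

```python
from datetime import datetime, timezone

aware = datetime.strptime("22/Mar/2026:15:32:01 +0000", "%d/%b/%Y:%H:%M:%S %z")

try:
    datetime.now() - aware                 # naive minus aware
except TypeError as e:
    print(f"naive - aware raises: {e}")

age = datetime.now(timezone.utc) - aware   # aware minus aware: works
print(age)
```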


Part 5: Generators — Processing Files Bigger Than RAM

You have 7 days of logs. Total: ~50 million lines, ~5.6 GB uncompressed. You cannot load this into memory. Generators let you build a processing pipeline where data flows through, one line at a time, like a Unix pipe.

The Pipeline Mental Model

Unix:    cat *.log | grep -v healthcheck | awk '{print $1}' | sort | uniq -c
Python:  read_lines() → filter_lines() → extract_field() → count()

Each stage is a generator. Each yields one item at a time. Nothing accumulates in memory unless you explicitly collect it.

Building a Generator Pipeline

import gzip
from pathlib import Path

def read_logs(log_dir):
    """Stage 1: Read all gzipped log files, yielding one line at a time."""
    for log_file in sorted(Path(log_dir).glob('access.log.*.gz')):
        with gzip.open(log_file, 'rt', errors='replace') as f:
            yield from f    # yields each line without loading the file

def parse_lines(lines):
    """Stage 2: Parse each line into a named dict. Skip unparseable lines."""
    for line in lines:
        match = NGINX_PATTERN.match(line)
        if match:
            yield match.groupdict()

def filter_errors(entries):
    """Stage 3: Keep only 4xx and 5xx responses."""
    for entry in entries:
        status = int(entry['status'])
        if status >= 400:
            yield entry

def add_parsed_time(entries):
    """Stage 4: Parse timestamp string into a datetime object."""
    for entry in entries:
        entry['dt'] = datetime.strptime(
            entry['timestamp'], "%d/%b/%Y:%H:%M:%S %z"
        )
        yield entry

Now connect them:

# The pipeline — reads 5.6 GB of data using ~10 MB of RAM
lines     = read_logs('/var/log/nginx')
entries   = parse_lines(lines)
errors    = filter_errors(entries)
timed     = add_parsed_time(errors)

errors_by_hour = Counter()
for entry in timed:
    hour_key = entry['dt'].strftime("%Y-%m-%d %H:00")
    errors_by_hour[hour_key] += 1

Mental Model: Think of generators like a conveyor belt in a factory. Each stage does one thing to each item and passes it to the next stage. No warehouse between stages. The item enters as raw material (a text line), gets stamped (parsed), gets inspected (filtered), and exits as a finished product (an aggregated count). At any given moment, only one item is on the belt.

yield from

The yield from in read_logs is worth understanding:

# These two are equivalent:
def read_file(path):
    with open(path) as f:
        for line in f:
            yield line

def read_file(path):
    with open(path) as f:
        yield from f    # delegates to f's iterator

yield from is cleaner and slightly faster — it avoids the Python-level for loop overhead by delegating directly to the underlying iterator.


Flashcard Check #3

Question Answer
What's the memory usage of a generator pipeline processing a 5 GB file? Roughly the size of one line plus the state of each generator. Typically under 10 MB regardless of file size.
What does yield from f do? Delegates to f's iterator, yielding each item. Equivalent to for item in f: yield item but faster.
Why can't you subtract a naive datetime from an aware datetime? Python raises TypeError. A naive datetime has no timezone info, so the subtraction is ambiguous. Use datetime.now(timezone.utc) for aware comparisons.

Part 6: Comprehensions — The One-Liner Power Move

List and dict comprehensions replace simple for loops with a single expression. For ops work, they're how you filter, transform, and reshape data in one shot.

List Comprehensions

# The loop way
error_ips = []
for entry in parsed_entries:
    if int(entry['status']) >= 500:
        error_ips.append(entry['ip'])

# The comprehension way
error_ips = [e['ip'] for e in parsed_entries if int(e['status']) >= 500]

Same result, one line instead of four. But don't abuse it:

# Too much in one comprehension — unreadable
result = [f"{e['ip']}:{e['status']}" for e in [NGINX_PATTERN.match(l).groupdict() for l in open('access.log') if NGINX_PATTERN.match(l)] if int(e['status']) >= 500]

# If it doesn't fit your brain in 3 seconds, use a loop.

Dict Comprehensions

# Flip a Counter to find IPs with more than 1000 requests
heavy_hitters = {ip: count for ip, count in ip_counts.items() if count > 1000}

# Build a lookup table: status code → description
STATUS_NAMES = {
    200: 'OK', 301: 'Moved', 302: 'Found',
    400: 'Bad Request', 401: 'Unauthorized', 403: 'Forbidden',
    404: 'Not Found', 500: 'Internal Server Error', 502: 'Bad Gateway',
    503: 'Service Unavailable', 504: 'Gateway Timeout',
}

Generator Expressions (Lazy Comprehensions)

# List comprehension: builds the whole list in memory
total_bytes = sum([int(e['size']) for e in parsed_entries])

# Generator expression: processes one at a time
total_bytes = sum(int(e['size']) for e in parsed_entries)

Drop the square brackets and you get a generator expression — lazy, memory-efficient, and identical in output. Use generator expressions inside sum(), max(), min(), any(), and all().


Part 7: Sorting and Grouping

sorted() with Key Functions

# Sort entries by response time (slowest first)
# Assume response_time is in the last field of your custom log format
sorted_entries = sorted(parsed_entries,
                        key=lambda e: float(e['response_time']),
                        reverse=True)

# Top 20 slowest endpoints
for entry in sorted_entries[:20]:
    print(f"{float(entry['response_time']):>8.3f}s  "
          f"{entry['method']} {entry['path']}")

operator.itemgetter — Faster Than Lambda

from operator import itemgetter

# These are equivalent, but itemgetter is ~20% faster in tight loops:
sorted(entries, key=lambda e: e['status'])
sorted(entries, key=itemgetter('status'))

# Multi-key sorting: sort by status, then by response time
sorted(entries, key=itemgetter('status', 'response_time'))

Under the Hood: itemgetter is implemented in C. A lambda is a Python function that goes through the full Python function call protocol on every invocation. For sorting 50 million entries, that overhead difference is measured in seconds.
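You can measure the gap yourself with timeit — a rough benchmark sketch with made-up entries; the exact ratio depends on your Python version and data:

```python
from operator import itemgetter
import timeit

entries = [{'status': s} for s in ('500', '200', '404')] * 10_000

t_lambda = timeit.timeit(lambda: sorted(entries, key=lambda e: e['status']), number=20)
t_getter = timeit.timeit(lambda: sorted(entries, key=itemgetter('status')), number=20)

print(f"lambda:     {t_lambda:.3f}s")
print(f"itemgetter: {t_getter:.3f}s")  # usually the smaller number
```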

itertools.groupby — The awk END Block Replacement

from itertools import groupby
from operator import itemgetter

# Entries must be sorted by the grouping key first
sorted_by_status = sorted(parsed_entries, key=itemgetter('status'))

for status, group in groupby(sorted_by_status, key=itemgetter('status')):
    entries_list = list(group)
    print(f"Status {status}: {len(entries_list)} requests")

Gotcha: groupby only groups consecutive items with the same key. If your data isn't sorted by the grouping key first, you'll get multiple groups for the same value. This is the #1 groupby mistake. It works like the Unix uniq command — it needs sorted input.
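A minimal demonstration of the pitfall, with a handful of sample status codes:

```python
from itertools import groupby

statuses = ['200', '500', '200', '500', '200']

# Unsorted input: groupby starts a new group every time the key changes
unsorted_groups = [(k, len(list(g))) for k, g in groupby(statuses)]
print(unsorted_groups)   # five groups of one — not what you wanted

# Sorted input: one group per distinct key
sorted_groups = [(k, len(list(g))) for k, g in groupby(sorted(statuses))]
print(sorted_groups)     # [('200', 3), ('500', 2)]
```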


Part 8: The csv Module — Replacing awk -F','

Reading CSVs

# awk: print the third column of a comma-separated file
awk -F',' '{print $3}' servers.csv

import csv

# Python: read as named fields
with open('servers.csv') as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row['hostname'])  # named fields, not $3

DictReader uses the first row as field names. You get row['hostname'] instead of parts[2]. When someone adds a column to the CSV, your awk script breaks (every $N shifts by one). Your Python script keeps working because it uses names, not positions.

Writing CSVs

import csv

fieldnames = ['ip', 'request_count', 'error_count', 'error_rate']

with open('report.csv', 'w', newline='') as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    for ip, data in sorted(report.items()):
        writer.writerow({
            'ip': ip,
            'request_count': data['total'],
            'error_count': data['errors'],
            'error_rate': f"{data['errors']/data['total']*100:.1f}%",
        })

Gotcha: On Windows, csv.writer adds blank lines between rows unless you pass newline='' to open(). This is one of Python's most FAQ'd bugs. On Linux it's harmless but good practice anyway — your coworker will open the CSV on Windows eventually.

Handling TSV and Other Delimiters

# TSV (tab-separated)
reader = csv.DictReader(f, delimiter='\t')

# Pipe-separated (sometimes seen in telecom and legacy systems)
reader = csv.DictReader(f, delimiter='|')

# Handling quoted fields with commas inside them
# csv.reader handles this automatically — awk does not

That last point matters. A CSV field like "Atlanta, GA" is one field. awk -F',' splits it into two. The csv module handles quoting correctly because it implements RFC 4180.
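A three-line sketch of the difference, using a made-up servers.csv row:

```python
import csv, io

row = 'web-01,"Atlanta, GA",active\n'

naive = row.split(',')                      # the awk -F',' behavior
parsed = next(csv.reader(io.StringIO(row))) # RFC 4180-aware parsing

print(naive)   # ['web-01', '"Atlanta', ' GA"', 'active\n'] — city torn in half
print(parsed)  # ['web-01', 'Atlanta, GA', 'active']
```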


Part 9: String Formatting for Reports

Your manager doesn't want to see raw data. They want a table they can paste into Slack.

f-string Alignment

# Right-align numbers, left-align strings
print(f"{'IP Address':<20} {'Requests':>10} {'Errors':>10} {'Error %':>10}")
print(f"{'-'*20} {'-'*10} {'-'*10} {'-'*10}")

for ip, count in ip_counts.most_common(10):
    errors = error_counts.get(ip, 0)
    rate = errors / count * 100 if count > 0 else 0
    print(f"{ip:<20} {count:>10,} {errors:>10,} {rate:>9.1f}%")

Output:

IP Address             Requests     Errors    Error %
-------------------- ---------- ---------- ----------
10.0.1.47               847,231     12,847       1.5%
172.16.5.12             623,118      8,441       1.4%
10.0.3.88               541,002     31,204       5.8%   ← suspicious
192.168.1.100           412,876        287       0.1%

The format mini-language:

Specifier Meaning Example
<20 Left-align, width 20 "hello "
>10 Right-align, width 10 " hello"
>10, Right-align with comma separator " 847,231"
>9.1f Right-align float, 1 decimal " 1.5"
^20 Center-align, width 20 " hello "

JSON Output

When the report needs to be machine-readable:

import json

report = {
    'generated_at': datetime.now(timezone.utc).isoformat(),
    'period': '2026-03-16 to 2026-03-22',
    'top_ips': [
        {'ip': ip, 'requests': count}
        for ip, count in ip_counts.most_common(10)
    ],
    'errors_by_hour': dict(sorted(errors_by_hour.items())),
}

# Pretty-print for humans
print(json.dumps(report, indent=2))

# Compact for machines / APIs
with open('report.json', 'w') as f:
    json.dump(report, f)

Gotcha: json.dump() chokes on datetime objects. You must convert them to strings first (.isoformat() is standard). If you want automatic serialization, write a custom encoder:

class OpsEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        return super().default(obj)

json.dumps(report, cls=OpsEncoder)
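If you don't care about the exact string format, there's a lighter-weight shortcut: the default= parameter is called on any object json can't serialize, and str is often good enough. A quick sketch with a fixed timestamp:

```python
import json
from datetime import datetime, timezone

report = {'generated_at': datetime(2026, 3, 22, 15, 32, 1, tzinfo=timezone.utc)}

# str() is applied to anything json.dumps can't handle natively
payload = json.dumps(report, default=str)
print(payload)  # {"generated_at": "2026-03-22 15:32:01+00:00"}
```

Note str() gives you "2026-03-22 15:32:01+00:00" (space-separated), not strict ISO 8601 with a T — use .isoformat() or the custom encoder when consumers are picky.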

Part 10: The Full Pipeline — Putting It All Together

Time to solve the mission. This script processes all seven days of logs in a single pass and produces the three required outputs.

#!/usr/bin/env python3
"""Nginx log analyzer — weekly ops report."""

import gzip
import re
import csv
import json
import sys
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path

# --- Configuration ---
LOG_DIR = Path('/var/log/nginx')
REPORT_PATH = Path('weekly_report.json')

NGINX_PATTERN = re.compile(
    r'(?P<ip>\S+) \S+ (?P<user>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) \S+" '
    r'(?P<status>\d{3}) (?P<size>\S+) '
    r'"(?P<referer>[^"]*)" '
    r'"(?P<user_agent>[^"]*)"'
)

TIME_FMT = "%d/%b/%Y:%H:%M:%S %z"

# --- Generator Pipeline ---

def read_logs(log_dir):
    """Read all gzipped log files chronologically."""
    for log_file in sorted(log_dir.glob('access.log.*.gz')):
        print(f"  Processing {log_file.name}...", file=sys.stderr)
        with gzip.open(log_file, 'rt', errors='replace') as f:
            yield from f

def parse_entries(lines):
    """Parse raw lines into dicts. Skip malformed lines."""
    unparsed = 0
    for line in lines:
        match = NGINX_PATTERN.match(line)
        if match:
            yield match.groupdict()
        else:
            unparsed += 1
    if unparsed:
        print(f"  Skipped {unparsed:,} unparseable lines", file=sys.stderr)

# --- Aggregation ---

def analyze(log_dir):
    """Single-pass analysis: top IPs, slowest endpoints, error rate by hour."""
    ip_counts       = Counter()
    endpoint_times  = defaultdict(list)  # path → [response_times] — populate this if your log format records request time (combined format does not)
    errors_by_hour  = Counter()
    total_by_hour   = Counter()
    status_counts   = Counter()

    for entry in parse_entries(read_logs(log_dir)):
        ip_counts[entry['ip']] += 1
        status_counts[entry['status']] += 1

        # Parse timestamp for hourly grouping
        try:
            dt = datetime.strptime(entry['timestamp'], TIME_FMT)
        except ValueError:
            continue
        hour_key = dt.strftime("%Y-%m-%d %H:00")
        total_by_hour[hour_key] += 1

        status = int(entry['status'])
        if status >= 400:
            errors_by_hour[hour_key] += 1

    return {
        'ip_counts': ip_counts,
        'errors_by_hour': errors_by_hour,
        'total_by_hour': total_by_hour,
        'status_counts': status_counts,
    }

# --- Reporting ---

def print_report(data):
    """Print a human-readable report to stdout."""
    ip_counts = data['ip_counts']
    errors_by_hour = data['errors_by_hour']
    total_by_hour = data['total_by_hour']

    print("\n=== TOP 10 CLIENT IPs ===\n")
    print(f"{'IP Address':<20} {'Requests':>12}")
    print(f"{'-'*20} {'-'*12}")
    for ip, count in ip_counts.most_common(10):
        print(f"{ip:<20} {count:>12,}")

    print("\n=== ERROR RATE BY HOUR ===\n")
    print(f"{'Hour':<20} {'Total':>10} {'Errors':>10} {'Rate':>8}")
    print(f"{'-'*20} {'-'*10} {'-'*10} {'-'*8}")
    for hour in sorted(total_by_hour.keys()):
        total = total_by_hour[hour]
        errors = errors_by_hour.get(hour, 0)
        rate = errors / total * 100 if total else 0
        flag = " ←" if rate > 5.0 else ""
        print(f"{hour:<20} {total:>10,} {errors:>10,} {rate:>7.1f}%{flag}")

    print("\n=== STATUS CODE SUMMARY ===\n")
    for code, count in sorted(data['status_counts'].items()):
        print(f"  {code}: {count:>12,}")

# --- Main ---

if __name__ == '__main__':
    log_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else LOG_DIR
    print(f"Analyzing logs in {log_dir}...", file=sys.stderr)
    results = analyze(log_dir)
    print_report(results)
    print(f"\nTotal requests: {sum(results['ip_counts'].values()):,}")

Run it:

python3 analyze_logs.py /var/log/nginx

That's ~120 lines, handles arbitrary file sizes, runs in a single pass, and produces a clean report. The awk equivalent would be roughly the same length but harder to extend and debug.


Part 11: Performance — When Python Is and Isn't Enough

This is the honest conversation. Python is not always the fastest tool.

The Benchmark

For the task "count lines matching a pattern in a 1 GB file":

Tool Time Memory Notes
grep -c 'ERROR' ~2s ~1 MB C, memory-mapped, SIMD optimized
awk '/ERROR/{c++} END{print c}' ~5s ~1 MB C, compiled pattern
Python for line in f: if 'ERROR' in line ~12s ~1 MB CPython interpreter overhead
Python f.read().count('ERROR') ~4s ~1 GB Fast but memory-hungry; counts occurrences, not lines
ripgrep -c 'ERROR' ~0.8s ~1 MB Rust, parallelized, SIMD

The Decision Framework

"I need to count one thing"           → grep or awk (seconds)
"I need to count + group + filter"    → Python (minutes, but one pass)
"I need to process 100 GB"            → Python generator pipeline (steady memory)
"I need to process 100 GB FAST"       → awk for simple, pandas for complex, or Rust
"I need a report I'll run every day"  → Python (maintainable, extensible, testable)

Mental Model: grep/awk are race cars — blindingly fast on a straight track. Python is a pickup truck — slower, but it can carry a full data pipeline, error handling, output formatting, and a test suite in the back. Don't race a pickup truck. Don't haul lumber in a race car.

When to Reach for pandas

You don't need pandas for most ops log analysis. But when you do:

  • Aggregating across multiple dimensions (group by hour AND endpoint AND status)
  • Time-series resampling (convert irregular timestamps to regular intervals)
  • Joining data from two different sources (correlate access logs with application logs)
  • Producing charts or statistical summaries

pandas loads data into memory. For a 1 GB CSV, expect 2–4 GB of RAM. If your data fits in memory and you need complex aggregation, pandas is worth the dependency. If it doesn't fit, stick with the generator pipeline pattern from Part 5.


Part 12: Parsing Other Log Formats

The generator pipeline works with any format. You just swap the parser.

JSON Structured Logs

Modern applications log JSON. Each line is a complete JSON object:

{"timestamp":"2026-03-22T15:32:01.847Z","level":"ERROR","service":"auth-api","message":"Token validation failed","request_id":"a8f3b2c1","client_ip":"10.0.1.47","path":"/api/v2/auth/verify","status":500,"duration_ms":234}

import json

def parse_json_logs(lines):
    """Parse JSON-structured log lines."""
    for line in lines:
        try:
            yield json.loads(line)
        except json.JSONDecodeError:
            continue  # skip malformed lines

No regex needed. Every field is already named and typed. This is why structured logging exists — it turns a parsing problem into a json.loads() call.
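A quick usage sketch: once every line is a dict, aggregations fall out directly. The sample lines below are invented for illustration, and the parser is repeated so the sketch is self-contained:

```python
import json
from collections import Counter

def parse_json_logs(lines):
    """Parse JSON-structured log lines, skipping malformed ones."""
    for line in lines:
        try:
            yield json.loads(line)
        except json.JSONDecodeError:
            continue

# Invented sample data — note the malformed line, which is silently skipped
sample = [
    '{"level":"ERROR","service":"auth-api","duration_ms":234}',
    '{"level":"INFO","service":"auth-api","duration_ms":12}',
    'not json at all',
    '{"level":"ERROR","service":"billing","duration_ms":88}',
]

errors_by_service = Counter(
    e["service"] for e in parse_json_logs(sample) if e["level"] == "ERROR"
)
print(errors_by_service)  # one ERROR each for auth-api and billing
```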

Syslog Format

Mar 22 15:32:01 web-prod-03 nginx[12847]: 10.0.1.47 - - [22/Mar/2026:15:32:01 +0000] ...
SYSLOG_PATTERN = re.compile(
    r'(?P<month>\w{3})\s+(?P<day>\d{1,2}) '
    r'(?P<time>\d{2}:\d{2}:\d{2}) '
    r'(?P<hostname>\S+) '
    r'(?P<program>\S+?)(?:\[(?P<pid>\d+)\])?: '
    r'(?P<message>.*)'
)
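The matching generator, used as `parse_syslog` in the dispatch code later in this section, follows the same shape as the Nginx parser. A minimal sketch (pattern repeated for self-containment):

```python
import re

SYSLOG_PATTERN = re.compile(
    r'(?P<month>\w{3})\s+(?P<day>\d{1,2}) '
    r'(?P<time>\d{2}:\d{2}:\d{2}) '
    r'(?P<hostname>\S+) '
    r'(?P<program>\S+?)(?:\[(?P<pid>\d+)\])?: '
    r'(?P<message>.*)'
)

def parse_syslog(lines):
    """Yield one dict per syslog line that matches the pattern."""
    for line in lines:
        m = SYSLOG_PATTERN.match(line)
        if m:
            yield m.groupdict()

sample = 'Mar 22 15:32:01 web-prod-03 nginx[12847]: upstream timed out'
entry = next(parse_syslog([sample]))
print(entry['hostname'], entry['program'], entry['pid'])
```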

The Pattern: Same Pipeline, Different Parser

# Swap the parser based on the log format — the pipeline stays the same
if log_format == 'nginx':
    entries = parse_nginx(lines)
elif log_format == 'json':
    entries = parse_json_logs(lines)
elif log_format == 'syslog':
    entries = parse_syslog(lines)

# Everything downstream works identically
# Chain the stages — a generator can only be consumed once,
# so each stage feeds the next rather than re-reading `entries`
errors = filter_errors(entries)
timed  = add_parsed_time(errors)

This is the payoff of the generator pipeline pattern. Your processing logic doesn't care what format the data came from. It only cares about the dict that each parser yields.
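One way to tidy the dispatch (a design sketch, not from the lesson): keep the parsers in a dict, so supporting a new format becomes a one-line registration. The toy `parse_kv` parser below is hypothetical, standing in for the real parsers:

```python
# Hypothetical toy parser for space-separated key=value lines (illustration only)
def parse_kv(lines):
    for line in lines:
        yield dict(part.split('=', 1) for part in line.split())

# Registry: each value is a generator function taking lines, yielding dicts.
# The real entries would be parse_nginx, parse_json_logs, parse_syslog, ...
PARSERS = {
    'kv': parse_kv,
}

def get_entries(log_format, lines):
    try:
        parser = PARSERS[log_format]
    except KeyError:
        raise ValueError(f"unsupported log format: {log_format!r}")
    return parser(lines)

entries = list(get_entries('kv', ['status=200 path=/health']))
```

The dict also gives you an unsupported-format error for free, instead of silently falling through an if/elif chain.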


Exercises

Exercise 1: Quick Win (2 minutes)

Open a Python REPL and try this:

from collections import Counter
words = "the quick brown fox jumps over the lazy brown dog the fox".split()
Counter(words).most_common(3)

Predict the output before you run it.

Answer
[('the', 3), ('brown', 2), ('fox', 2)]
`Counter` counted every word, `most_common(3)` returned the top 3 sorted by frequency. Ties (`brown` and `fox` both appear twice) are returned in insertion order.

Exercise 2: Parse and Count (10 minutes)

Given this sample log data (save it as sample.log):

10.0.1.47 - - [22/Mar/2026:15:32:01 +0000] "GET /api/v2/products HTTP/1.1" 200 1432 "-" "curl/7.68"
10.0.1.47 - - [22/Mar/2026:15:32:02 +0000] "GET /api/v2/products HTTP/1.1" 200 1432 "-" "curl/7.68"
172.16.5.12 - - [22/Mar/2026:15:32:03 +0000] "POST /api/v2/orders HTTP/1.1" 500 234 "-" "python-requests/2.28"
10.0.3.88 - - [22/Mar/2026:15:33:01 +0000] "GET /health HTTP/1.1" 200 2 "-" "kube-probe/1.24"
172.16.5.12 - - [22/Mar/2026:15:33:04 +0000] "POST /api/v2/orders HTTP/1.1" 502 0 "-" "python-requests/2.28"
10.0.1.47 - - [22/Mar/2026:16:01:00 +0000] "GET /api/v2/products HTTP/1.1" 404 89 "-" "curl/7.68"

Write a script that prints: 1. Top IP by request count 2. Number of 5xx errors 3. The hour with the most requests

Hint Use `Counter` for the IPs. For 5xx errors, check if `int(status) >= 500`. For the hour, use `datetime.strptime()` and `.strftime("%Y-%m-%d %H:00")` as the Counter key.
Solution
import re
from collections import Counter
from datetime import datetime

NGINX_PATTERN = re.compile(
    r'(?P<ip>\S+) \S+ (?P<user>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) \S+" '
    r'(?P<status>\d{3}) (?P<size>\S+)'
)

ip_counts = Counter()
hour_counts = Counter()
error_5xx = 0

with open('sample.log') as f:
    for line in f:
        m = NGINX_PATTERN.match(line)
        if not m:
            continue
        ip_counts[m.group('ip')] += 1
        if int(m.group('status')) >= 500:
            error_5xx += 1
        dt = datetime.strptime(m.group('timestamp'), "%d/%b/%Y:%H:%M:%S %z")
        hour_counts[dt.strftime("%Y-%m-%d %H:00")] += 1

top_ip, top_count = ip_counts.most_common(1)[0]
top_hour, hour_count = hour_counts.most_common(1)[0]

print(f"Top IP: {top_ip} ({top_count} requests)")
print(f"5xx errors: {error_5xx}")
print(f"Busiest hour: {top_hour} ({hour_count} requests)")

Exercise 3: Build a Generator Pipeline (15 minutes)

Modify the Exercise 2 solution to use the generator pipeline pattern:

  1. read_lines(path) — yields lines from the file
  2. parse_entries(lines) — yields parsed dicts
  3. exclude_health_checks(entries) — filters out /health requests
  4. A main function that connects the pipeline and counts errors per unique endpoint

This should produce output showing that /api/v2/orders has a 100% error rate while /api/v2/products has a 33% error rate.

Solution
from collections import defaultdict

# NGINX_PATTERN is the compiled regex from the Exercise 2 solution

def read_lines(path):
    with open(path) as f:
        yield from f

def parse_entries(lines):
    for line in lines:
        m = NGINX_PATTERN.match(line)
        if m:
            yield m.groupdict()

def exclude_health_checks(entries):
    for entry in entries:
        if entry['path'] != '/health':
            yield entry

# Connect the pipeline
lines   = read_lines('sample.log')
entries = parse_entries(lines)
cleaned = exclude_health_checks(entries)

# Aggregate
endpoint_stats = defaultdict(lambda: {'total': 0, 'errors': 0})
for entry in cleaned:
    path = entry['path']
    endpoint_stats[path]['total'] += 1
    if int(entry['status']) >= 400:
        endpoint_stats[path]['errors'] += 1

for path, stats in sorted(endpoint_stats.items()):
    rate = stats['errors'] / stats['total'] * 100
    print(f"{path:<30} {stats['total']:>5} requests, "
          f"{stats['errors']:>5} errors ({rate:.0f}%)")

Cheat Sheet

| Task | Bash/awk | Python |
|---|---|---|
| Count occurrences | `sort \| uniq -c \| sort -rn` | `Counter(items).most_common()` |
| Group by field | `awk '{a[$1]++} END{...}'` | `defaultdict(list)` or `Counter` |
| Parse CSV | `awk -F','` | `csv.DictReader(f)` |
| Parse log line | `awk '{print $1, $9}'` | `re.compile(pattern).match(line).group('name')` |
| Parse timestamp | `date -d "string" +%s` | `datetime.strptime(s, fmt)` |
| Convert timezone | `TZ=US/Eastern date` | `dt.astimezone(ZoneInfo("America/New_York"))` |
| Read gzipped file | `zcat file.gz` | `gzip.open(f, 'rt')` |
| Process huge file | `awk '{...}' file` | `for line in open(f):` (generator pipeline) |
| Format number | `printf "%'d" 1000000` | `f"{1000000:,}"` → `1,000,000` |
| Write CSV | `echo "$a,$b"` (breaks on commas in data) | `csv.DictWriter` (RFC 4180 compliant) |
| JSON output | `jq -n '{...}'` (painful for dynamic data) | `json.dumps(obj, indent=2)` |
| Top N items | `sort -rn \| head -N` | `Counter.most_common(N)` |
| Sort by field | `sort -t, -k3 -rn` | `sorted(data, key=itemgetter('field'))` |

Key imports:

from collections import Counter, defaultdict, namedtuple
from datetime import datetime, timezone, timedelta
from zoneinfo import ZoneInfo          # Python 3.9+
from itertools import groupby
from operator import itemgetter
from pathlib import Path
import csv, gzip, json, re, sys

Takeaways

  1. Read files line by line, never with .readlines() or .read(). Generator pipelines process files of any size using constant memory.

  2. Counter replaces sort | uniq -c | sort -rn and does it in one pass. It's the single most useful class for ops data wrangling.

  3. Named regex groups ((?P<name>...)) make log parsing maintainable. When the format changes, you update one pattern, not every $N reference in your script.

  4. Always use datetime.now(timezone.utc) for aware datetimes. Mixing naive and aware datetimes is a TypeError waiting to happen.

  5. Python is 5–10x slower than grep/awk for simple searches — and that's fine. The moment you need to count, group, filter, and format in the same script, Python's maintainability wins.

  6. The generator pipeline pattern is Python's answer to Unix pipes. Each stage yields one item at a time. Swap the parser to handle any log format. The downstream logic stays the same.