Python: Data Wrangling for Ops
- lesson
- python
- log-parsing
- csv
- regex
- datetime
- generators
- collections
- data-pipelines
- awk-replacement

Topics: Python, log parsing, csv, regex, datetime, generators, collections, data pipelines, awk replacement
Strategy: Build-up + parallel (Bash equivalent shown alongside Python replacement)
Level: L1–L2 (Foundations → Operations)
Time: 90–120 minutes
Prerequisites: None (but you'll get more from this if you've parsed logs in Bash and felt the pain)
The Mission¶
Your team runs 12 Nginx reverse proxies behind an AWS NLB. The observability stack (Datadog) is down for maintenance until Monday. Your manager needs a report by end of day:
- Top 10 client IPs by request count
- Slowest 20 endpoints by average response time
- Error rate by hour (4xx and 5xx) for the last 7 days
You have the access logs on a jump box: seven gzipped files, one per day, about 50 million lines total. Each file is 800 MB uncompressed.
You could do this in awk. You've done it in awk. But the last time you did it, the
one-liner became a 90-line awk script with three associative arrays, a datetime parser built
from substr() calls, and a bug where requests at midnight got counted in the wrong hour.
This time you're going to do it in Python. By the end of this lesson, you'll have a reusable log analysis pipeline that handles any format, any size, and produces clean output you can paste into Slack or pipe into a CSV.
Part 1: Reading Files Without Killing the Server¶
The logs are 800 MB each. Your jump box has 4 GB of RAM. Let's start with what NOT to do.
The Memory Bomb¶
# DO NOT DO THIS
with open('access.log') as f:
    lines = f.readlines()  # loads 800 MB into RAM
    # plus Python object overhead: each line is a str object
    # actual memory: well over 1 GB for an 800 MB file
Under the Hood: Python strings are objects. Each one carries a reference count, a type pointer, a length, a hash cache, and the actual character data. A 200-byte log line costs about 250 bytes as a Python object. For 7 million lines, that overhead alone is ~350 MB on top of the raw data. `f.readlines()` creates all those objects at once.
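You can see that overhead directly with `sys.getsizeof` (exact byte counts vary across CPython versions and builds, so treat the numbers as ballpark):

```python
import sys

line = "x" * 200                      # stand-in for a 200-byte log line
size = sys.getsizeof(line)            # object header plus character data
overhead = size - 200
print(f"raw data: 200 bytes; as a str object: {size} bytes")
print(f"overhead for 7 million lines: ~{overhead * 7_000_000 / 1e6:.0f} MB")
```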
The Right Way: Line by Line¶
count = 0
with open('access.log') as f:
    for line in f:  # reads one line at a time
        if '500' in line:
            count += 1

print(f"500 errors: {count}")
Memory usage: roughly one line in RAM at a time. You could process a 100 GB file on a Raspberry Pi.
The Bash Comparison¶
# Bash — fast but limited
grep -c '500' access.log
# The Python version above is ~5x slower than grep for this exact task.
# But grep can't do: "count 500s per hour, grouped by endpoint, excluding health checks"
# That's where Python pays for itself.
Handling Gzipped Files¶
Your logs are gzipped. Python handles this natively:
import gzip
with gzip.open('access.log.gz', 'rt') as f:  # 'rt' = read as text
    for line in f:
        process(line)
The 'rt' mode matters. Without it you get bytes, not strings, and every comparison
fails silently. Ask me how I know.
Gotcha: `gzip.open()` with `'rt'` decodes the bytes to strings on the fly. If your log has non-UTF-8 bytes (binary data in POST bodies, corrupted lines), add `errors='replace'` to avoid `UnicodeDecodeError` crashing your script at line 4,287,331 of a 7-million-line file.
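A self-contained way to see this in action. The sketch writes its own tiny gzipped "log" containing a deliberately bad byte (the filename and line content are made up for the demo):

```python
import gzip
import os
import tempfile

# Build a tiny gzipped log whose one line contains a non-UTF-8 byte (0xFF)
path = os.path.join(tempfile.mkdtemp(), 'access.log.gz')
with gzip.open(path, 'wb') as f:
    f.write(b'10.0.1.5 - - "POST /upload HTTP/1.1" 200 \xff\n')

# 'rt' + errors='replace' maps the bad byte to U+FFFD instead of raising
with gzip.open(path, 'rt', errors='replace') as f:
    line = f.read()

print('\ufffd' in line)   # True: replaced, not crashed
```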
Processing Multiple Files¶
import gzip
from pathlib import Path
log_dir = Path('/var/log/nginx')
for log_file in sorted(log_dir.glob('access.log.*.gz')):
    with gzip.open(log_file, 'rt', errors='replace') as f:
        for line in f:
            process(line)
Flashcard Check #1¶
| Question | Answer |
|---|---|
| Why is `f.readlines()` dangerous on large files? | It loads the entire file into memory at once. Python object overhead can push RAM use well past the file size, depending on line length. |
| What mode string do you pass to `gzip.open()` for text? | `'rt'` — read as text. Without the `t`, you get raw bytes. |
| What does `errors='replace'` do when opening a file? | Replaces undecodable bytes with the Unicode replacement character instead of raising `UnicodeDecodeError`. |
Part 2: collections — The Ops Power Tools¶
The collections module is the single biggest productivity jump when you move from Bash
to Python for data processing. Three tools do 90% of the work.
Counter: The Awk Killer¶
Here's the awk way to count IPs:

awk '{print $1}' access.log | sort | uniq -c | sort -rn

Four commands piped together. Works fine. Now here's Python:
from collections import Counter
ip_counts = Counter()
with open('access.log') as f:
    for line in f:
        ip = line.split()[0]
        ip_counts[ip] += 1

for ip, count in ip_counts.most_common(10):
    print(f"{count:>8} {ip}")
Mental Model: A `Counter` is a dict where every key starts at zero. You never need to check "does this key exist?" before incrementing. `counter[key] += 1` just works, even if `key` has never been seen. This eliminates the most common bug in awk associative arrays and Python dicts alike.
The awk version pipes through sort twice (once alphabetically for uniq, once numerically
for ranking). The Python version sorts once, internally, and gives you the top N directly.
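In a REPL, the "every key starts at zero" behavior looks like this:

```python
from collections import Counter

c = Counter()
c['10.0.1.47'] += 1                     # brand-new key: no KeyError, starts at 0
c.update(['10.0.1.47', '172.16.5.12'])  # count a whole iterable in one call
print(c.most_common())                  # [('10.0.1.47', 2), ('172.16.5.12', 1)]
```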
But the real win comes when you need to count two things at once:
ip_counts = Counter()
status_counts = Counter()
endpoint_counts = Counter()
with open('access.log') as f:
    for line in f:
        parts = line.split()
        ip_counts[parts[0]] += 1
        status_counts[parts[8]] += 1
        endpoint_counts[parts[6]] += 1
One pass through the file. Three separate aggregations. In awk, you'd need three
associative arrays and a longer END block. In Bash, you'd need three separate pipelines
(three passes through an 800 MB file).
Trivia: `Counter` was added in Python 2.7 (2010) by Raymond Hettinger, who also created `OrderedDict`, `namedtuple`, and the `@functools.lru_cache` decorator. Hettinger is responsible for more of the "Python feels nice to use" experience than perhaps anyone other than Guido van Rossum himself.
defaultdict: The "I Don't Want to Check if the Key Exists" Dict¶
from collections import defaultdict
# Group log lines by status code
lines_by_status = defaultdict(list)
with open('access.log') as f:
    for line in f:
        status = line.split()[8]
        lines_by_status[status].append(line.rstrip())

# Now you have: {'200': [...], '404': [...], '500': [...]}
print(f"500 errors: {len(lines_by_status['500'])}")
Without defaultdict, you'd write:

if status not in lines_by_status:
    lines_by_status[status] = []
lines_by_status[status].append(line.rstrip())

That if check on every single line of a 50-million-line file adds up — in code noise, in bugs, and in CPU cycles.
| defaultdict argument | Creates | Use case |
|---|---|---|
| `defaultdict(list)` | Empty list `[]` | Grouping items |
| `defaultdict(int)` | Zero `0` | Counting (but use `Counter` instead) |
| `defaultdict(set)` | Empty set `set()` | Unique grouping |
| `defaultdict(dict)` | Empty dict `{}` | Nested grouping |
namedtuple: Give Your Data a Skeleton¶
When you parse a log line, you get a tuple of strings. Was parts[6] the URL or the
status code? Nobody remembers. namedtuple fixes that:
from collections import namedtuple
LogEntry = namedtuple('LogEntry', [
    'ip', 'ident', 'user', 'timestamp', 'request',
    'status', 'size', 'referer', 'user_agent'
])
Now every parsed line has named fields. entry.ip instead of parts[0]. entry.status
instead of parts[8]. Your code reads like English, and you'll never mix up field indices
again.
Name Origin: `namedtuple` was inspired by the concept of record types in languages like Pascal and C structs. It gives you the memory efficiency of a tuple (no per-instance `__dict__`) with the readability of a class. For millions of log entries, this matters — a namedtuple uses roughly 72 bytes per instance vs. ~200+ bytes for a regular class.
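A quick sketch of the ergonomics, using a trimmed-down `LogEntry` (three fields instead of nine, purely for the demo):

```python
from collections import namedtuple

# Trimmed field list just for this demo
LogEntry = namedtuple('LogEntry', ['ip', 'status', 'path'])

# _make() builds an instance from any iterable of fields (e.g. regex groups)
entry = LogEntry._make(['10.0.1.47', '200', '/api/v2/products'])
print(entry.ip)          # 10.0.1.47 (named access, not parts[0])
print(entry._asdict())   # {'ip': '10.0.1.47', 'status': '200', 'path': '/api/v2/products'}
```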
Part 3: Regular Expressions — Parsing Real Log Formats¶
Splitting on whitespace breaks the moment a log line has quoted strings. Nginx Combined Log Format:
10.0.1.47 - admin [22/Mar/2026:15:32:01 +0000] "GET /api/v2/products?page=3 HTTP/1.1" 200 1432 "https://app.example.com/dashboard" "Mozilla/5.0 (X11; Linux x86_64)"
line.split() turns this into 16 fields because the quoted strings get split. You need
regex.
The Nginx Log Parser¶
import re
# Compile once, use millions of times
NGINX_PATTERN = re.compile(
    r'(?P<ip>\S+) \S+ (?P<user>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) \S+" '
    r'(?P<status>\d{3}) (?P<size>\S+) '
    r'"(?P<referer>[^"]*)" '
    r'"(?P<user_agent>[^"]*)"'
)
line = '10.0.1.47 - admin [22/Mar/2026:15:32:01 +0000] "GET /api/v2/products?page=3 HTTP/1.1" 200 1432 "https://app.example.com/dashboard" "Mozilla/5.0 (X11; Linux x86_64)"'
match = NGINX_PATTERN.match(line)
if match:
    print(match.group('ip'))      # 10.0.1.47
    print(match.group('status'))  # 200
    print(match.group('path'))    # /api/v2/products?page=3
    print(match.group('method'))  # GET
Break down the regex pattern piece by piece:
| Pattern fragment | Matches | Named group |
|---|---|---|
| `(?P<ip>\S+)` | Non-whitespace (the IP) | `ip` |
| `\[(?P<timestamp>[^\]]+)\]` | Everything inside `[...]` | `timestamp` |
| `"(?P<method>\S+) (?P<path>\S+) \S+"` | HTTP method, path, protocol | `method`, `path` |
| `(?P<status>\d{3})` | Three digits | `status` |
| `"(?P<user_agent>[^"]*)"` | Everything inside quotes | `user_agent` |
Remember: `(?P<name>...)` is Python's named group syntax. The `P` stands for "Python" — this extension was added by Python's regex engine and later adopted by other languages. Named groups let you access matches by name instead of index number, which means your code won't break when the log format changes.
Compiled vs. Uncompiled¶
# SLOW: re-resolves the pattern on every call
for line in million_lines:
    match = re.match(r'(?P<ip>\S+)', line)

# FAST: compile once, match millions of times
pattern = re.compile(r'(?P<ip>\S+)')
for line in million_lines:
    match = pattern.match(line)
Under the Hood: `re.compile()` converts the regex string into a bytecode program that the regex engine can execute directly. The module-level functions like `re.match()` do cache the last few hundred compiled patterns internally, so the pattern isn't rebuilt every call — but in a loop over millions of lines, even the cache lookup overhead is measurable. Compiling explicitly is free performance.
search vs. match vs. findall¶
| Function | Behavior | Use when |
|---|---|---|
| `re.match(pattern, string)` | Anchored to start of string | Parsing structured lines |
| `re.search(pattern, string)` | Finds first match anywhere | Searching within text |
| `re.findall(pattern, string)` | Returns all matches as a list | Extracting multiple values |
| `re.finditer(pattern, string)` | Returns match objects lazily | Memory-efficient extraction |
# Extract all IPs from a firewall log (multiple IPs per line)
line = "src=10.0.1.5 dst=172.16.0.1 sport=443 dport=52341"
ips = re.findall(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', line)
# ['10.0.1.5', '172.16.0.1']
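When you don't need all the matches at once, `finditer` gives you the same results lazily:

```python
import re

line = "src=10.0.1.5 dst=172.16.0.1 sport=443 dport=52341"
ip_re = re.compile(r'\d{1,3}(?:\.\d{1,3}){3}')

# finditer yields one match object at a time; nothing accumulates in memory
for m in ip_re.finditer(line):
    print(m.group())   # 10.0.1.5, then 172.16.0.1
```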
Flashcard Check #2¶
| Question | Answer |
|---|---|
| What does `Counter.most_common(10)` return? | A list of `(element, count)` tuples, sorted by count descending, limited to the top 10. |
| What's the advantage of `re.compile()` in a loop? | It converts the pattern to bytecode once. Without it, the pattern cache must be consulted on every call, adding overhead across millions of iterations. |
| What does `(?P<name>...)` do in a regex? | Creates a named capture group. Access with `match.group('name')` instead of `match.group(1)`. |
| `defaultdict(list)` — what happens when you access a missing key? | It automatically creates an empty list `[]` for that key. No `KeyError`, no `if key not in dict` check needed. |
Part 4: datetime — The Timezone Nightmare¶
Nginx timestamps look like this: 22/Mar/2026:15:32:01 +0000. You need to parse them,
do time math (which hour does this request belong to?), and handle the fact that your
servers might log in UTC but your manager thinks in US/Eastern.
Parsing Timestamps¶
from datetime import datetime
raw = "22/Mar/2026:15:32:01 +0000"
dt = datetime.strptime(raw, "%d/%b/%Y:%H:%M:%S %z")
# datetime(2026, 3, 22, 15, 32, 1, tzinfo=timezone.utc)
The format codes you'll use constantly:
| Code | Meaning | Example |
|---|---|---|
| `%d` | Day of month (zero-padded) | `22` |
| `%b` | Abbreviated month name | `Mar` |
| `%Y` | Four-digit year | `2026` |
| `%H:%M:%S` | 24-hour time | `15:32:01` |
| `%z` | UTC offset | `+0000` |
| `%Z` | Timezone name | `UTC` |
Gotcha: `%b` depends on the system locale. On a server with `LANG=de_DE.UTF-8`, March is `Mär`, not `Mar`. Your log parser works on your laptop and breaks on the German team's server. Fix: set `locale.setlocale(locale.LC_TIME, 'C')` at the top of your script, or use a locale-independent parser.
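If you'd rather not touch global locale state, here is a minimal locale-independent parser for this exact timestamp layout. It uses fixed-position slicing plus a month lookup table, so the system locale never enters the picture (a sketch for this one format, not a general parser):

```python
from datetime import datetime, timedelta, timezone

MONTHS = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
          'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12}

def parse_nginx_time(raw):
    """Parse '22/Mar/2026:15:32:01 +0000' without touching the locale."""
    day, mon, rest = raw[:2], raw[3:6], raw[7:]    # fixed-width layout
    year = int(rest[:4])
    hh, mm, ss = int(rest[5:7]), int(rest[8:10]), int(rest[11:13])
    sign = 1 if rest[14] == '+' else -1
    offset = sign * timedelta(hours=int(rest[15:17]), minutes=int(rest[17:19]))
    return datetime(year, MONTHS[mon], int(day), hh, mm, ss,
                    tzinfo=timezone(offset))

print(parse_nginx_time("22/Mar/2026:15:32:01 +0000"))
# 2026-03-22 15:32:01+00:00
```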
Grouping by Hour¶
This is the core of the "error rate by hour" requirement:
from collections import Counter
from datetime import datetime
errors_by_hour = Counter()
with open('access.log') as f:
    for line in f:
        match = NGINX_PATTERN.match(line)
        if not match:
            continue
        status = int(match.group('status'))
        if status >= 400:
            dt = datetime.strptime(match.group('timestamp'),
                                   "%d/%b/%Y:%H:%M:%S %z")
            hour_key = dt.strftime("%Y-%m-%d %H:00")
            errors_by_hour[hour_key] += 1

for hour, count in sorted(errors_by_hour.items()):
    print(f"{hour} {count:>6} errors")
Output:
2026-03-16 00:00 142 errors
2026-03-16 01:00 87 errors
2026-03-16 02:00 63 errors
...
2026-03-22 15:00 1847 errors ← the spike
Timezone Conversion¶
from datetime import datetime, timezone
from zoneinfo import ZoneInfo # Python 3.9+
# Parse as UTC
dt_utc = datetime.strptime("22/Mar/2026:15:32:01 +0000",
                           "%d/%b/%Y:%H:%M:%S %z")
# Convert to US/Eastern
eastern = ZoneInfo("America/New_York")
dt_eastern = dt_utc.astimezone(eastern)
print(dt_eastern) # 2026-03-22 11:32:01-04:00 (EDT)
Under the Hood: `zoneinfo` (Python 3.9+) reads the IANA timezone database directly from the operating system's `/usr/share/zoneinfo/` directory — the same database that powers the `TZ` environment variable and the `date` command. On minimal Docker images that strip timezone data, install the `tzdata` PyPI package as a fallback.
Time Math¶
How long between two requests? How many seconds since the last error?
from datetime import timedelta
dt1 = datetime.strptime("22/Mar/2026:15:32:01 +0000", "%d/%b/%Y:%H:%M:%S %z")
dt2 = datetime.strptime("22/Mar/2026:15:32:47 +0000", "%d/%b/%Y:%H:%M:%S %z")
delta = dt2 - dt1
print(delta) # 0:00:46
print(delta.total_seconds()) # 46.0
# Is this request older than 7 days?
age = datetime.now(timezone.utc) - dt1
if age > timedelta(days=7):
    print("Outside our reporting window")
Gotcha: `datetime.now()` returns a naive datetime (no timezone). You cannot subtract a naive datetime from an aware datetime — Python raises `TypeError`. Always use `datetime.now(timezone.utc)` when comparing with timezone-aware timestamps. This catches more people than any other datetime bug.
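The failure mode, reproduced in a few lines:

```python
from datetime import datetime, timezone

aware = datetime.now(timezone.utc)   # carries tzinfo
naive = datetime.now()               # no tzinfo

try:
    _ = aware - naive
except TypeError as exc:
    print(f"refused: {exc}")         # mixing naive and aware raises TypeError
```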
Part 5: Generators — Processing Files Bigger Than RAM¶
You have 7 days of logs. Total: ~50 million lines, ~5.6 GB uncompressed. You cannot load this into memory. Generators let you build a processing pipeline where data flows through, one line at a time, like a Unix pipe.
The Pipeline Mental Model¶
Unix: cat *.log | grep -v healthcheck | awk '{print $1}' | sort | uniq -c
Python: read_lines() → filter_lines() → extract_field() → count()
Each stage is a generator. Each yields one item at a time. Nothing accumulates in memory unless you explicitly collect it.
Building a Generator Pipeline¶
import gzip
from pathlib import Path
def read_logs(log_dir):
    """Stage 1: Read all gzipped log files, yielding one line at a time."""
    for log_file in sorted(Path(log_dir).glob('access.log.*.gz')):
        with gzip.open(log_file, 'rt', errors='replace') as f:
            yield from f  # yields each line without loading the file

def parse_lines(lines):
    """Stage 2: Parse each line into a dict of named fields. Skip unparseable lines."""
    for line in lines:
        match = NGINX_PATTERN.match(line)
        if match:
            yield match.groupdict()

def filter_errors(entries):
    """Stage 3: Keep only 4xx and 5xx responses."""
    for entry in entries:
        status = int(entry['status'])
        if status >= 400:
            yield entry

def add_parsed_time(entries):
    """Stage 4: Parse timestamp string into a datetime object."""
    for entry in entries:
        entry['dt'] = datetime.strptime(
            entry['timestamp'], "%d/%b/%Y:%H:%M:%S %z"
        )
        yield entry
Now connect them:
# The pipeline — reads 5.6 GB of data using ~10 MB of RAM
lines = read_logs('/var/log/nginx')
entries = parse_lines(lines)
errors = filter_errors(entries)
timed = add_parsed_time(errors)
errors_by_hour = Counter()
for entry in timed:
    hour_key = entry['dt'].strftime("%Y-%m-%d %H:00")
    errors_by_hour[hour_key] += 1
Mental Model: Think of generators like a conveyor belt in a factory. Each stage does one thing to each item and passes it to the next stage. No warehouse between stages. The item enters as raw material (a text line), gets stamped (parsed), gets inspected (filtered), and exits as a finished product (an aggregated count). At any given moment, only one item is on the belt.
yield from¶
The yield from in read_logs is worth understanding:
# These two are equivalent:
def read_file(path):
    with open(path) as f:
        for line in f:
            yield line

def read_file(path):
    with open(path) as f:
        yield from f  # delegates to f's iterator
yield from is cleaner and slightly faster — it avoids the Python-level for loop
overhead by delegating directly to the underlying iterator.
Flashcard Check #3¶
| Question | Answer |
|---|---|
| What's the memory usage of a generator pipeline processing a 5 GB file? | Roughly the size of one line plus the state of each generator. Typically under 10 MB regardless of file size. |
| What does `yield from f` do? | Delegates to `f`'s iterator, yielding each item. Equivalent to `for item in f: yield item` but faster. |
| Why can't you subtract a naive datetime from an aware datetime? | Python raises `TypeError`. A naive datetime has no timezone info, so the subtraction is ambiguous. Use `datetime.now(timezone.utc)` for aware comparisons. |
Part 6: Comprehensions — The One-Liner Power Move¶
List and dict comprehensions replace simple for loops with a single expression. For ops
work, they're how you filter, transform, and reshape data in one shot.
List Comprehensions¶
# The loop way
error_ips = []
for entry in parsed_entries:
    if int(entry['status']) >= 500:
        error_ips.append(entry['ip'])
# The comprehension way
error_ips = [e['ip'] for e in parsed_entries if int(e['status']) >= 500]
Same result, one line instead of four. But don't abuse it:
# Too much in one comprehension — unreadable
result = [f"{e['ip']}:{e['status']}" for e in [NGINX_PATTERN.match(l).groupdict() for l in open('access.log') if NGINX_PATTERN.match(l)] if int(e['status']) >= 500]
# If it doesn't fit your brain in 3 seconds, use a loop.
Dict Comprehensions¶
# Flip a Counter to find IPs with more than 1000 requests
heavy_hitters = {ip: count for ip, count in ip_counts.items() if count > 1000}
# Build a lookup table: status code → description
STATUS_NAMES = {
    200: 'OK', 301: 'Moved', 302: 'Found',
    400: 'Bad Request', 401: 'Unauthorized', 403: 'Forbidden',
    404: 'Not Found', 500: 'Internal Server Error', 502: 'Bad Gateway',
    503: 'Service Unavailable', 504: 'Gateway Timeout',
}
Generator Expressions (Lazy Comprehensions)¶
# List comprehension: builds the whole list in memory
total_bytes = sum([int(e['size']) for e in parsed_entries])
# Generator expression: processes one at a time
total_bytes = sum(int(e['size']) for e in parsed_entries)
Drop the square brackets and you get a generator expression — lazy, memory-efficient,
and identical in output. Use generator expressions inside sum(), max(), min(),
any(), and all().
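For example, `any()` with a generator expression short-circuits: it stops consuming items the moment it finds a hit, and no intermediate list is ever built.

```python
# "Did any request return a 5xx?" without materializing a list
entries = [{'status': '200'}, {'status': '502'}, {'status': '404'}]
has_5xx = any(int(e['status']) >= 500 for e in entries)
print(has_5xx)   # True
```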
Part 7: Sorting and Grouping¶
sorted() with Key Functions¶
# Sort entries by response time (slowest first)
# Assume response_time is in the last field of your custom log format
sorted_entries = sorted(parsed_entries,
                        key=lambda e: float(e['response_time']),
                        reverse=True)

# Top 20 slowest endpoints
for entry in sorted_entries[:20]:
    print(f"{float(entry['response_time']):>8.3f}s "
          f"{entry['method']} {entry['path']}")
operator.itemgetter — Faster Than Lambda¶
from operator import itemgetter
# These are equivalent, but itemgetter is ~20% faster in tight loops:
sorted(entries, key=lambda e: e['status'])
sorted(entries, key=itemgetter('status'))
# Multi-key sorting: sort by status, then by response time
sorted(entries, key=itemgetter('status', 'response_time'))
Under the Hood: `itemgetter` is implemented in C. A `lambda` is a Python function that goes through the full Python function call protocol on every invocation. For sorting 50 million entries, that overhead difference is measured in seconds.
itertools.groupby — The awk END Block Replacement¶
from itertools import groupby
from operator import itemgetter
# Entries must be sorted by the grouping key first
sorted_by_status = sorted(parsed_entries, key=itemgetter('status'))
for status, group in groupby(sorted_by_status, key=itemgetter('status')):
    entries_list = list(group)
    print(f"Status {status}: {len(entries_list)} requests")
Gotcha: `groupby` only groups consecutive items with the same key. If your data isn't sorted by the grouping key first, you'll get multiple groups for the same value. This is the #1 `groupby` mistake. It works like the Unix `uniq` command — it needs sorted input.
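The gotcha is easy to reproduce:

```python
from itertools import groupby

statuses = ['200', '200', '500', '200']

# Unsorted input: '200' appears as TWO separate groups
print([(k, len(list(g))) for k, g in groupby(statuses)])
# [('200', 2), ('500', 1), ('200', 1)]

# Sorted input: each key appears exactly once
print([(k, len(list(g))) for k, g in groupby(sorted(statuses))])
# [('200', 3), ('500', 1)]
```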
Part 8: The csv Module — Replacing awk -F','¶
Reading CSVs¶
import csv
# Python: read as named fields
with open('servers.csv') as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row['hostname'])  # named fields, not $3
DictReader uses the first row as field names. You get row['hostname'] instead of
parts[2]. When someone adds a column to the CSV, your awk script breaks (every $N
shifts by one). Your Python script keeps working because it uses names, not positions.
Writing CSVs¶
import csv
fieldnames = ['ip', 'request_count', 'error_count', 'error_rate']
with open('report.csv', 'w', newline='') as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    for ip, data in sorted(report.items()):
        writer.writerow({
            'ip': ip,
            'request_count': data['total'],
            'error_count': data['errors'],
            'error_rate': f"{data['errors']/data['total']*100:.1f}%",
        })
Gotcha: On Windows, `csv.writer` adds blank lines between rows unless you pass `newline=''` to `open()`. This is one of Python's most frequently asked-about bugs. On Linux it's harmless but good practice anyway — your coworker will open the CSV on Windows eventually.
Handling TSV and Other Delimiters¶
# TSV (tab-separated)
reader = csv.DictReader(f, delimiter='\t')
# Pipe-separated (sometimes seen in telecom and legacy systems)
reader = csv.DictReader(f, delimiter='|')
# Handling quoted fields with commas inside them
# csv.reader handles this automatically — awk does not
That last point matters. A CSV field like "Atlanta, GA" is one field. awk -F',' splits
it into two. The csv module handles quoting correctly because it implements RFC 4180.
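Side by side on a single line (the hostname values here are made up):

```python
import csv

line = 'dc-07,"Atlanta, GA",active'
print(line.split(','))           # naive split: the quoted field breaks in two
print(next(csv.reader([line])))  # ['dc-07', 'Atlanta, GA', 'active']
```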
Part 9: String Formatting for Reports¶
Your manager doesn't want to see raw data. They want a table they can paste into Slack.
f-string Alignment¶
# Right-align numbers, left-align strings
print(f"{'IP Address':<20} {'Requests':>10} {'Errors':>10} {'Error %':>10}")
print(f"{'-'*20} {'-'*10} {'-'*10} {'-'*10}")
for ip, count in ip_counts.most_common(10):
    errors = error_counts.get(ip, 0)
    rate = errors / count * 100 if count > 0 else 0
    print(f"{ip:<20} {count:>10,} {errors:>10,} {rate:>9.1f}%")
Output:
IP Address Requests Errors Error %
-------------------- ---------- ---------- ----------
10.0.1.47 847,231 12,847 1.5%
172.16.5.12 623,118 8,441 1.4%
10.0.3.88 541,002 31,204 5.8% ← suspicious
192.168.1.100 412,876 287 0.1%
The format mini-language:
| Specifier | Meaning | Example |
|---|---|---|
| `<20` | Left-align, width 20 | `"hello "` |
| `>10` | Right-align, width 10 | `" hello"` |
| `>10,` | Right-align with comma separator | `" 847,231"` |
| `>9.1f` | Right-align float, 1 decimal | `" 1.5"` |
| `^20` | Center-align, width 20 | `" hello "` |
JSON Output¶
When the report needs to be machine-readable:
import json
report = {
    'generated_at': datetime.now(timezone.utc).isoformat(),
    'period': '2026-03-16 to 2026-03-22',
    'top_ips': [
        {'ip': ip, 'requests': count}
        for ip, count in ip_counts.most_common(10)
    ],
    'errors_by_hour': dict(sorted(errors_by_hour.items())),
}

# Pretty-print for humans
print(json.dumps(report, indent=2))

# Compact for machines / APIs
with open('report.json', 'w') as f:
    json.dump(report, f)
Gotcha: `json.dump()` chokes on `datetime` objects. You must convert them to strings first (`.isoformat()` is standard), or supply a `default=` hook for automatic serialization.
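A minimal sketch of that hook via the `default=` parameter — `encode_extra` is a name invented for this example:

```python
import json
from datetime import datetime, timezone

def encode_extra(obj):
    """default= hook: json calls this only for objects it can't serialize."""
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"not JSON serializable: {type(obj).__name__}")

payload = {'generated_at': datetime(2026, 3, 22, 15, 32, 1, tzinfo=timezone.utc)}
print(json.dumps(payload, default=encode_extra))
# {"generated_at": "2026-03-22T15:32:01+00:00"}
```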
Part 10: The Full Pipeline — Putting It All Together¶
Time to solve the mission. This script processes all seven days of logs in a single pass and produces the three required outputs.
#!/usr/bin/env python3
"""Nginx log analyzer — weekly ops report."""
import gzip
import re
import csv
import json
import sys
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path

# --- Configuration ---
LOG_DIR = Path('/var/log/nginx')
REPORT_PATH = Path('weekly_report.json')

NGINX_PATTERN = re.compile(
    r'(?P<ip>\S+) \S+ (?P<user>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) \S+" '
    r'(?P<status>\d{3}) (?P<size>\S+) '
    r'"(?P<referer>[^"]*)" '
    r'"(?P<user_agent>[^"]*)"'
)
TIME_FMT = "%d/%b/%Y:%H:%M:%S %z"

# --- Generator Pipeline ---
def read_logs(log_dir):
    """Read all gzipped log files chronologically."""
    for log_file in sorted(log_dir.glob('access.log.*.gz')):
        print(f"  Processing {log_file.name}...", file=sys.stderr)
        with gzip.open(log_file, 'rt', errors='replace') as f:
            yield from f

def parse_entries(lines):
    """Parse raw lines into dicts. Skip malformed lines."""
    unparsed = 0
    for line in lines:
        match = NGINX_PATTERN.match(line)
        if match:
            yield match.groupdict()
        else:
            unparsed += 1
    if unparsed:
        print(f"  Skipped {unparsed:,} unparseable lines", file=sys.stderr)

# --- Aggregation ---
def analyze(log_dir):
    """Single-pass analysis: top IPs, error rate by hour, status summary."""
    ip_counts = Counter()
    errors_by_hour = Counter()
    total_by_hour = Counter()
    status_counts = Counter()

    for entry in parse_entries(read_logs(log_dir)):
        ip_counts[entry['ip']] += 1
        status_counts[entry['status']] += 1
        # Parse timestamp for hourly grouping
        try:
            dt = datetime.strptime(entry['timestamp'], TIME_FMT)
        except ValueError:
            continue
        hour_key = dt.strftime("%Y-%m-%d %H:00")
        total_by_hour[hour_key] += 1
        status = int(entry['status'])
        if status >= 400:
            errors_by_hour[hour_key] += 1

    return {
        'ip_counts': ip_counts,
        'errors_by_hour': errors_by_hour,
        'total_by_hour': total_by_hour,
        'status_counts': status_counts,
    }

# --- Reporting ---
def print_report(data):
    """Print a human-readable report to stdout."""
    ip_counts = data['ip_counts']
    errors_by_hour = data['errors_by_hour']
    total_by_hour = data['total_by_hour']

    print("\n=== TOP 10 CLIENT IPs ===\n")
    print(f"{'IP Address':<20} {'Requests':>12}")
    print(f"{'-'*20} {'-'*12}")
    for ip, count in ip_counts.most_common(10):
        print(f"{ip:<20} {count:>12,}")

    print("\n=== ERROR RATE BY HOUR ===\n")
    print(f"{'Hour':<20} {'Total':>10} {'Errors':>10} {'Rate':>8}")
    print(f"{'-'*20} {'-'*10} {'-'*10} {'-'*8}")
    for hour in sorted(total_by_hour.keys()):
        total = total_by_hour[hour]
        errors = errors_by_hour.get(hour, 0)
        rate = errors / total * 100 if total else 0
        flag = " ←" if rate > 5.0 else ""
        print(f"{hour:<20} {total:>10,} {errors:>10,} {rate:>7.1f}%{flag}")

    print("\n=== STATUS CODE SUMMARY ===\n")
    for code, count in sorted(data['status_counts'].items()):
        print(f"  {code}: {count:>12,}")

# --- Main ---
if __name__ == '__main__':
    log_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else LOG_DIR
    print(f"Analyzing logs in {log_dir}...", file=sys.stderr)
    results = analyze(log_dir)
    print_report(results)
    print(f"\nTotal requests: {sum(results['ip_counts'].values()):,}")
Run it (assuming you saved it as, say, weekly_report.py):

python3 weekly_report.py /var/log/nginx
That's ~90 lines, handles arbitrary file sizes, runs in a single pass, and produces a clean report. The awk equivalent would be roughly the same length but harder to extend and debug.
Part 11: Performance — When Python Is and Isn't Enough¶
This is the honest conversation. Python is not always the fastest tool.
The Benchmark¶
For the task "count lines matching a pattern in a 1 GB file":
| Tool | Time | Memory | Notes |
|---|---|---|---|
| `grep -c 'ERROR'` | ~2s | ~1 MB | C, memory-mapped, SIMD optimized |
| `awk '/ERROR/{c++} END{print c}'` | ~5s | ~1 MB | C, compiled pattern |
| Python `for line in f: if 'ERROR' in line` | ~12s | ~1 MB | CPython interpreter overhead |
| Python `f.read().count('ERROR')` | ~4s | ~1 GB | Fast but memory-hungry |
| `ripgrep -c 'ERROR'` | ~0.8s | ~1 MB | Rust, parallelized, SIMD |
The Decision Framework¶
"I need to count one thing" → grep or awk (seconds)
"I need to count + group + filter" → Python (minutes, but one pass)
"I need to process 100 GB" → Python generator pipeline (steady memory)
"I need to process 100 GB FAST" → awk for simple, pandas for complex, or Rust
"I need a report I'll run every day" → Python (maintainable, extensible, testable)
Mental Model: grep/awk are race cars — blindingly fast on a straight track. Python is a pickup truck — slower, but it can carry a full data pipeline, error handling, output formatting, and a test suite in the back. Don't race a pickup truck. Don't haul lumber in a race car.
When to Reach for pandas¶
You don't need pandas for most ops log analysis. But when you do:
- Aggregating across multiple dimensions (group by hour AND endpoint AND status)
- Time-series resampling (convert irregular timestamps to regular intervals)
- Joining data from two different sources (correlate access logs with application logs)
- Producing charts or statistical summaries
pandas loads data into memory. For a 1 GB CSV, expect 2–4 GB of RAM. If your data fits in memory and you need complex aggregation, pandas is worth the dependency. If it doesn't fit, stick with the generator pipeline pattern from Part 5.
Part 12: Parsing Other Log Formats¶
The generator pipeline works with any format. You just swap the parser.
JSON Structured Logs¶
Modern applications log JSON. Each line is a complete JSON object:
{"timestamp":"2026-03-22T15:32:01.847Z","level":"ERROR","service":"auth-api","message":"Token validation failed","request_id":"a8f3b2c1","client_ip":"10.0.1.47","path":"/api/v2/auth/verify","status":500,"duration_ms":234}
import json
def parse_json_logs(lines):
    """Parse JSON-structured log lines."""
    for line in lines:
        try:
            yield json.loads(line)
        except json.JSONDecodeError:
            continue  # skip malformed lines
No regex needed. Every field is already named and typed. This is why structured logging
exists — it turns a parsing problem into a json.loads() call.
Syslog Format¶
SYSLOG_PATTERN = re.compile(
    r'(?P<month>\w{3})\s+(?P<day>\d{1,2}) '
    r'(?P<time>\d{2}:\d{2}:\d{2}) '
    r'(?P<hostname>\S+) '
    r'(?P<program>\S+?)(?:\[(?P<pid>\d+)\])?: '
    r'(?P<message>.*)'
)
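A quick sanity check of the pattern against a made-up sshd line (hostname and PID invented for the demo):

```python
import re

SYSLOG_PATTERN = re.compile(
    r'(?P<month>\w{3})\s+(?P<day>\d{1,2}) '
    r'(?P<time>\d{2}:\d{2}:\d{2}) '
    r'(?P<hostname>\S+) '
    r'(?P<program>\S+?)(?:\[(?P<pid>\d+)\])?: '
    r'(?P<message>.*)'
)

m = SYSLOG_PATTERN.match("Mar 22 15:32:01 web-01 sshd[8412]: Accepted publickey for deploy")
print(m.group('program'), m.group('pid'))   # sshd 8412
print(m.group('message'))                   # Accepted publickey for deploy
```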
The Pattern: Same Pipeline, Different Parser¶
# Swap the parser based on the log format — the pipeline stays the same
if log_format == 'nginx':
    entries = parse_nginx(lines)
elif log_format == 'json':
    entries = parse_json_logs(lines)
elif log_format == 'syslog':
    entries = parse_syslog(lines)

# Everything downstream works identically
errors = filter_errors(entries)
timed = add_parsed_time(errors)
This is the payoff of the generator pipeline pattern. Your processing logic doesn't care what format the data came from. It only cares about the dict that each parser yields.
Exercises¶
Exercise 1: Quick Win (2 minutes)¶
Open a Python REPL and try this:
from collections import Counter
words = "the quick brown fox jumps over the lazy brown dog the fox".split()
Counter(words).most_common(3)
Predict the output before you run it.
Answer

`Counter` counted every word; `most_common(3)` returns `[('the', 3), ('brown', 2), ('fox', 2)]` — the top 3 sorted by frequency. Ties (`brown` and `fox` both appear twice) are returned in first-insertion order.

Exercise 2: Parse and Count (10 minutes)¶
Given this sample log data (save it as sample.log):
10.0.1.47 - - [22/Mar/2026:15:32:01 +0000] "GET /api/v2/products HTTP/1.1" 200 1432 "-" "curl/7.68"
10.0.1.47 - - [22/Mar/2026:15:32:02 +0000] "GET /api/v2/products HTTP/1.1" 200 1432 "-" "curl/7.68"
172.16.5.12 - - [22/Mar/2026:15:32:03 +0000] "POST /api/v2/orders HTTP/1.1" 500 234 "-" "python-requests/2.28"
10.0.3.88 - - [22/Mar/2026:15:33:01 +0000] "GET /health HTTP/1.1" 200 2 "-" "kube-probe/1.24"
172.16.5.12 - - [22/Mar/2026:15:33:04 +0000] "POST /api/v2/orders HTTP/1.1" 502 0 "-" "python-requests/2.28"
10.0.1.47 - - [22/Mar/2026:16:01:00 +0000] "GET /api/v2/products HTTP/1.1" 404 89 "-" "curl/7.68"
Write a script that prints:
1. Top IP by request count
2. Number of 5xx errors
3. The hour with the most requests
Hint
Use `Counter` for the IPs. For 5xx errors, check if `int(status) >= 500`. For the hour, use `datetime.strptime()` and `.strftime("%Y-%m-%d %H:00")` as the Counter key.
Solution
import re
from collections import Counter
from datetime import datetime

NGINX_PATTERN = re.compile(
    r'(?P<ip>\S+) \S+ (?P<user>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) \S+" '
    r'(?P<status>\d{3}) (?P<size>\S+)'
)

ip_counts = Counter()
hour_counts = Counter()
error_5xx = 0

with open('sample.log') as f:
    for line in f:
        m = NGINX_PATTERN.match(line)
        if not m:
            continue
        ip_counts[m.group('ip')] += 1
        if int(m.group('status')) >= 500:
            error_5xx += 1
        dt = datetime.strptime(m.group('timestamp'), "%d/%b/%Y:%H:%M:%S %z")
        hour_counts[dt.strftime("%Y-%m-%d %H:00")] += 1

top_ip, top_count = ip_counts.most_common(1)[0]
top_hour, hour_count = hour_counts.most_common(1)[0]
print(f"Top IP: {top_ip} ({top_count} requests)")
print(f"5xx errors: {error_5xx}")
print(f"Busiest hour: {top_hour} ({hour_count} requests)")
Exercise 3: Build a Generator Pipeline (15 minutes)¶
Modify the Exercise 2 solution to use the generator pipeline pattern:
- `read_lines(path)` — yields lines from the file
- `parse_entries(lines)` — yields parsed dicts
- `exclude_health_checks(entries)` — filters out `/health` requests
- A main function that connects the pipeline and counts errors per unique endpoint
This should produce output showing that /api/v2/orders has a 100% error rate while
/api/v2/products has a 33% error rate.
Solution
from collections import defaultdict

def read_lines(path):
    with open(path) as f:
        yield from f

def parse_entries(lines):
    for line in lines:
        m = NGINX_PATTERN.match(line)
        if m:
            yield m.groupdict()

def exclude_health_checks(entries):
    for entry in entries:
        if entry['path'] != '/health':
            yield entry

# Connect the pipeline
lines = read_lines('sample.log')
entries = parse_entries(lines)
cleaned = exclude_health_checks(entries)

# Aggregate
endpoint_stats = defaultdict(lambda: {'total': 0, 'errors': 0})
for entry in cleaned:
    path = entry['path']
    endpoint_stats[path]['total'] += 1
    if int(entry['status']) >= 400:
        endpoint_stats[path]['errors'] += 1

for path, stats in sorted(endpoint_stats.items()):
    rate = stats['errors'] / stats['total'] * 100
    print(f"{path:<30} {stats['total']:>5} requests, "
          f"{stats['errors']:>5} errors ({rate:.0f}%)")
Cheat Sheet¶
| Task | Bash/awk | Python |
|---|---|---|
| Count occurrences | `sort \| uniq -c \| sort -rn` | `Counter(items).most_common()` |
| Group by field | `awk '{a[$1]++} END{...}'` | `defaultdict(list)` or `Counter` |
| Parse CSV | `awk -F','` | `csv.DictReader(f)` |
| Parse log line | `awk '{print $1, $9}'` | `re.compile(pattern).match(line).group('name')` |
| Parse timestamp | `date -d "string" +%s` | `datetime.strptime(s, fmt)` |
| Convert timezone | `TZ=US/Eastern date` | `dt.astimezone(ZoneInfo("America/New_York"))` |
| Read gzipped file | `zcat file.gz` | `gzip.open(f, 'rt')` |
| Process huge file | `awk '{...}' file` | `for line in open(f):` (generator pipeline) |
| Format number | `printf "%'d" 1000000` | `f"{1000000:,}"` → `1,000,000` |
| Write CSV | `echo "$a,$b"` (breaks on commas in data) | `csv.DictWriter` (RFC 4180 compliant) |
| JSON output | `jq -n '{...}'` (painful for dynamic data) | `json.dumps(obj, indent=2)` |
| Top N items | `sort -rn \| head -N` | `Counter.most_common(N)` |
| Sort by field | `sort -t, -k3 -rn` | `sorted(data, key=itemgetter('field'))` |
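Two of those rows in working form: sorting dicts by a field with `itemgetter`, then writing the result as RFC 4180-safe CSV. The data and the `report.csv` file name are illustrative:

```python
import csv
from operator import itemgetter

rows = [
    {'path': '/api/v2/orders', 'avg_ms': 840.2},
    {'path': '/health', 'avg_ms': 1.3},
    {'path': '/api/v2/products', 'avg_ms': 96.7},
]

# sort -t, -k2 -rn  ->  sorted(..., key=itemgetter(...), reverse=True)
slowest = sorted(rows, key=itemgetter('avg_ms'), reverse=True)

# echo "$a,$b"  ->  csv.DictWriter quotes commas and newlines in values for you
with open('report.csv', 'w', newline='') as f:
    writer = csv.DictWriter(f, fieldnames=['path', 'avg_ms'])
    writer.writeheader()
    writer.writerows(slowest)
```

The `newline=''` argument matters: it stops the `csv` module's own `\r\n` line endings from being doubled on Windows.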
Key imports:
from collections import Counter, defaultdict, namedtuple
from datetime import datetime, timezone, timedelta
from zoneinfo import ZoneInfo # Python 3.9+
from itertools import groupby
from operator import itemgetter
from pathlib import Path
import csv, gzip, json, re, sys
Takeaways¶
- Read files line by line, never with `.readlines()` or `.read()`. Generator pipelines process files of any size using constant memory.
- `Counter` replaces `sort | uniq -c | sort -rn` and does it in one pass. It's the single most useful class for ops data wrangling.
- Named regex groups (`(?P<name>...)`) make log parsing maintainable. When the format changes, you update one pattern, not every `$N` reference in your script.
- Always use `datetime.now(timezone.utc)` for aware datetimes. Mixing naive and aware datetimes is a `TypeError` waiting to happen.
- Python is 5–10x slower than grep/awk for simple searches — and that's fine. The moment you need to count, group, filter, and format in the same script, Python's maintainability wins.
- The generator pipeline pattern is Python's answer to Unix pipes. Each stage yields one item at a time. Swap the parser to handle any log format. The downstream logic stays the same.
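The naive-vs-aware trap from the datetime takeaway, demonstrated in a few lines:

```python
from datetime import datetime, timezone

aware = datetime.now(timezone.utc)   # tzinfo attached: safe to compare, subtract, log
naive = datetime.now()               # local wall-clock time, no tzinfo

print(aware.isoformat())             # e.g. 2026-03-22T15:32:01.123456+00:00

try:
    naive - aware                    # mixing the two raises immediately
except TypeError as exc:
    print(f"TypeError: {exc}")
```

The failure is loud and immediate, which is the good case; the bad case is naive timestamps that silently shift by your UTC offset when logs from different hosts are compared.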
Related Lessons¶
- Python for Ops — The Bash Expert's Bridge — Where to start if you haven't written Python before. Covers subprocess, pathlib, argparse, and the Bash-to-Python mental model shift.
- Text Processing — jq, awk, and sed in the Trenches — The pure-Bash side of log analysis. When grep/awk/sed are still the right tool.
- Log Pipelines — From printf to Dashboard — The infrastructure side: how logs get from your application to a searchable index.
- Nginx — The Swiss Army Server — Understanding the server that generates the logs you're parsing.