Portal | Level: L2: Operations | Topics: Distributed Systems Fundamentals | Domain: DevOps & Tooling
Distributed Systems Fundamentals — Primer¶
Why This Matters¶
Every modern infrastructure involves distributed systems: databases with replicas, microservices talking over a network, Kubernetes clusters, message queues. The failures you encounter — split-brain, stale reads, phantom messages, partial failures — have patterns that have been studied and named. Understanding the theory lets you recognize the pattern in a production incident and reason about your options. Without it, you're guessing.
Why Distribution Is Hard¶
Three fundamental problems make distributed systems qualitatively harder than single-machine systems:
1. Partial Failures¶
In a single machine, either everything works or the machine is down. In a distributed system, one node can fail while others continue. The surviving nodes cannot distinguish "node B is dead" from "the network between me and node B is broken." This is the fundamental problem. There is no solution — only strategies to handle it.
2. Network Unreliability¶
The network can: - Drop messages (silently, with no error) - Delay messages by seconds or minutes (then deliver them) - Reorder messages (message 2 arrives before message 1) - Deliver messages multiple times (when you retry a dropped message that wasn't actually dropped) - Partition (split the network so nodes can't see each other)
You cannot distinguish "slow" from "failed." A response that hasn't arrived after 1 second might arrive after 10 seconds.
3. No Global Clock¶
There is no synchronized global time in a distributed system. Two nodes' clocks can be seconds or minutes apart. NTP helps but does not eliminate skew. You cannot use wall-clock time to determine event ordering across nodes. A timestamp on a message from another node doesn't tell you when it actually happened relative to your local events.
The 8 Fallacies of Distributed Computing¶
Who made it: Peter Deutsch drafted the original 7 fallacies at Sun Microsystems in 1994. James Gosling (creator of Java) added the 8th ("the network is homogeneous"). These fallacies are over 30 years old and still describe the exact mistakes teams make when building microservices today.
Peter Deutsch's list (1994) — still true:
- The network is reliable — it isn't. Packets drop.
- Latency is zero — a call to localhost takes microseconds; a call across a datacenter takes milliseconds; across regions, tens of milliseconds.
- Bandwidth is infinite — especially wrong for Kubernetes watching API servers.
- The network is secure — never assume internal traffic is safe.
- Topology doesn't change — nodes come and go, IPs change, load balancers rotate.
- There is one administrator — multiple teams own different parts of the network.
- Transport cost is zero — serialization, deserialization, and connection overhead are real.
- The network is homogeneous — different segments have different MTUs, reliability, and latency.
Practical implication: Every remote call must handle: timeouts, retries, errors, and the case where you don't know if it succeeded.
CAP Theorem¶
A distributed system that stores data can provide at most two of three guarantees:
- Consistency (C): Every read receives the most recent write (or an error). All nodes see the same data at the same time.
- Availability (A): Every request receives a response (not an error). The system is always up.
- Partition tolerance (P): The system continues to work when network partitions occur.
The catch: Network partitions happen. You cannot opt out of P. Real choice is: when a partition occurs, do you prioritize C or A?
| Choice | Behavior during partition | Examples |
|---|---|---|
| CP | Refuse writes (or reject requests) to prevent stale data | etcd, ZooKeeper, HBase, CockroachDB |
| AP | Allow writes/reads but accept stale data | Cassandra, DynamoDB (default), CouchDB |
What "C" Actually Means in Practice¶
CAP's "C" is linearizability: reads always return the latest write. This is very strong. Most databases don't offer it by default.
Different consistency levels: - Linearizable: Read after write always returns the written value. Very strong. Requires coordination. - Sequential consistency: Operations appear in some total order consistent with program order. Weaker than linearizable. - Causal consistency: Writes that are causally related are seen in order. Causally unrelated writes can be seen in any order. - Eventual consistency: Given no new updates, all replicas will eventually converge. No guarantees about when.
Practical example with PostgreSQL:
-- Synchronous replication (CP): primary waits for replica to acknowledge write
-- Data loss: zero | Latency penalty: ~RTT to replica
synchronous_standby_names = 'replica-1'
-- Asynchronous replication (AP): primary acks write before replica confirms
-- Data loss: possible (lag) | Latency: minimal
synchronous_standby_names = ''
PACELC Extension¶
PACELC extends CAP to cover normal (non-partition) operation:
If there is a Partition, choose between Availability and Consistency. Else (normal operation), choose between Latency and Consistency.
| System | Partition | Normal |
|---|---|---|
| DynamoDB (default) | PA | EL |
| DynamoDB (strong read) | PC | EC |
| Cassandra | PA | EL |
| CockroachDB | PC | EC |
| MySQL (async replica) | PA | EL |
| MySQL (sync replica) | PC | EC |
Consensus: Raft¶
Raft is the consensus algorithm used by etcd, CockroachDB, and Consul. Understanding Raft explains why etcd requires a quorum of nodes to be up, and why a 2-node etcd cluster is worse than a 1-node cluster.
Leader Election¶
- Cluster starts — all nodes are followers
- A follower waits a random timeout (150–300ms). If no heartbeat from a leader, it becomes a candidate
- Candidate sends
RequestVoteRPC to all nodes - If a node hasn't voted this term, it votes for the candidate
- Candidate with votes from a majority (quorum) becomes leader
- Leader sends periodic
AppendEntriesheartbeats to prevent new elections
Quorum = (N/2) + 1. For 3 nodes, quorum = 2. For 5 nodes, quorum = 3.
A 2-node cluster requires both nodes to agree — loss of either node means no quorum, no writes. Always use odd numbers: 3, 5, or 7 nodes.
Remember: Mnemonic for quorum math: "Majority means more than half." For N nodes, quorum = floor(N/2) + 1. A 3-node cluster tolerates 1 failure; a 5-node cluster tolerates 2. Adding a 4th node to a 3-node cluster gives you zero additional fault tolerance (quorum goes from 2 to 3) while increasing coordination overhead. Always use odd numbers.
Log Replication¶
Leader receives write:
1. Appends entry to own log (uncommitted)
2. Sends AppendEntries to followers
3. Waits for quorum to acknowledge
4. Commits entry (applies to state machine)
5. Returns success to client
6. Notifies followers to commit on next heartbeat
Follower crashes during step 3:
- Leader retries indefinitely
- Once follower rejoins, leader sends missing entries
- Follower catches up before serving reads
etcd in Practice¶
# Check etcd cluster health
etcdctl endpoint health --cluster \
--endpoints=https://10.0.1.10:2379,https://10.0.1.11:2379,https://10.0.1.12:2379 \
--cacert=/etc/ssl/etcd/ca.crt \
--cert=/etc/ssl/etcd/client.crt \
--key=/etc/ssl/etcd/client.key
# Check leader
etcdctl endpoint status --cluster
# Watch for leader elections (useful during incidents)
etcdctl watch --prefix /registry/leases/kube-system --rev=0 2>&1 | grep -i elect
Eventual Consistency Patterns¶
Read-Your-Own-Writes¶
After a user makes a change, they should see it when they reload — even if the replica they're now reading from hasn't caught up yet.
Implementation options: 1. Route reads to the primary after writes (defeats the purpose of replicas) 2. Track a "read-after-write token" (the write's LSN/timestamp) and wait on the replica 3. Sticky sessions: always route same user to same replica
# Read-your-writes with PostgreSQL logical replication
# After write to primary, get the LSN
result = primary_conn.execute("SELECT pg_current_wal_lsn()")
lsn = result.fetchone()[0]
# On subsequent reads, wait for replica to catch up
replica_conn.execute(
"SELECT pg_wal_lsn_diff(%s, pg_last_wal_replay_lsn()) <= 0",
[lsn]
)
Monotonic Reads¶
If you read version 5 of a record, you should never then read version 3. Don't serve reads from replicas that are behind your last read.
Implementation: Track the highest LSN/vector clock you've read, reject reads from replicas behind that point.
Vector Clocks and Causality¶
Timestamps don't work for ordering events across nodes (clocks skew). Vector clocks track causality without relying on synchronized time.
Each node maintains a vector of counters, one per node.
Node A: [A:1, B:0, C:0] — A made 1 event
Node B receives A's message: [A:1, B:1, C:0] — B increments own, keeps A's
Node C receives B's message: [A:1, B:1, C:1]
Comparing [A:2, B:0, C:0] vs [A:1, B:1, C:0]:
- Neither dominates the other (A>A but B<B)
- These are concurrent events — no causal ordering
- Need conflict resolution (last-write-wins, CRDT, human merge)
DynamoDB uses vector clocks (called "version vectors") to detect conflicts when multiple replicas accept concurrent writes.
Idempotency and At-Least-Once Delivery¶
The Retry Problem¶
You send a request to a service. The network drops the response. From your perspective, the request failed. But the server may have processed it successfully. You retry. Now the operation runs twice.
Analogy: Idempotency is like an elevator button. Pressing it once calls the elevator. Pressing it five more times does not call five elevators — the result is the same. Non-idempotent operations are like a vending machine: every button press dispenses another item.
Solution: Make operations idempotent — running them multiple times has the same effect as running them once.
# Non-idempotent: creates duplicate charges
def charge_card(user_id: str, amount: int):
return stripe.charge.create(amount=amount, customer=user_id)
# Idempotent: uses idempotency key
def charge_card(user_id: str, amount: int, order_id: str):
return stripe.charge.create(
amount=amount,
customer=user_id,
idempotency_key=f"charge-{order_id}", # same key = same charge
)
Idempotency Key Pattern¶
import hashlib
import redis
r = redis.Redis()
def idempotent_operation(idempotency_key: str, payload: dict) -> dict:
# Check if we've seen this key
cached = r.get(f"idem:{idempotency_key}")
if cached:
return json.loads(cached) # return previous result
# Process
result = do_operation(payload)
# Cache result (with TTL matching your retry window)
r.setex(f"idem:{idempotency_key}", 86400, json.dumps(result))
return result
Distributed Transactions¶
Two-Phase Commit (2PC)¶
2PC coordinates a transaction across multiple participants. It is blocking — if the coordinator crashes after Phase 1, participants are stuck until the coordinator recovers.
Phase 1 - Prepare:
Coordinator → all participants: "Can you commit?"
Participants: lock resources, write to log, reply "yes" or "no"
Phase 2 - Commit:
If all said "yes": Coordinator → all: "Commit"
If any said "no": Coordinator → all: "Abort"
Participants: release locks, acknowledge
Problem: Coordinator crashes after receiving all "yes" votes but before sending "Commit"
→ Participants are stuck with locks held, waiting for coordinator to recover
→ This is why 2PC is called "blocking"
Saga Pattern¶
Sagas replace distributed transactions with a sequence of local transactions, each with a compensating transaction (rollback action).
Order saga:
1. Create order (local DB) → compensate: cancel order
2. Reserve inventory → compensate: release inventory
3. Charge payment → compensate: refund payment
4. Ship order → compensate: arrange return
If step 3 fails:
Execute compensating transactions in reverse:
- compensate step 2: release inventory
- compensate step 1: cancel order
Sagas are eventually consistent, not ACID.
Step 2 can see a "reserved" state that may later be cancelled.
Orchestration-based saga (central coordinator):
class OrderSaga:
def execute(self, order_id: str):
try:
self.create_order(order_id)
self.reserve_inventory(order_id)
self.charge_payment(order_id)
self.ship_order(order_id)
except InventoryError:
self.cancel_order(order_id)
except PaymentError:
self.release_inventory(order_id)
self.cancel_order(order_id)
Choreography-based saga (event-driven):
OrderService publishes "OrderCreated"
InventoryService listens → reserves stock → publishes "StockReserved"
PaymentService listens → charges card → publishes "PaymentCharged"
ShippingService listens → ships order → publishes "OrderShipped"
If PaymentService fails → publishes "PaymentFailed"
InventoryService listens to "PaymentFailed" → releases stock
OrderService listens to "PaymentFailed" → cancels order
Thundering Herd and Backoff¶
The Thundering Herd¶
When a shared resource (cache, service) goes down and comes back up, all clients retry simultaneously. This overloads the recovering service, causing it to fail again.
Exponential Backoff with Jitter¶
import random
import time
def retry_with_backoff(func, max_retries=5, base_delay=0.5):
for attempt in range(max_retries):
try:
return func()
except TransientError as e:
if attempt == max_retries - 1:
raise
# Exponential backoff: 0.5s, 1s, 2s, 4s, 8s...
delay = base_delay * (2 ** attempt)
# Add jitter: ±50% randomness to desynchronize retries
jitter = delay * 0.5 * (random.random() * 2 - 1)
sleep_time = delay + jitter
time.sleep(sleep_time)
Cache Stampede Prevention¶
# Probabilistic early expiration (PER) — refresh before expiry
import math
def get_with_per(key: str, delta: float = 0.5):
"""
Refresh cache probabilistically before it expires.
delta: how early to consider refreshing (seconds)
"""
entry = cache.get_with_metadata(key)
if entry is None:
return recompute_and_cache(key)
ttl_remaining = entry.ttl
# Probability of early refresh increases as TTL approaches 0
if random.random() < math.exp(-ttl_remaining / delta):
return recompute_and_cache(key)
return entry.value
Health Checks That Actually Work¶
Shallow vs Deep Health Checks¶
# Shallow: only proves the process is alive
@app.get("/health")
def shallow_health():
return {"status": "ok"}
# Deep: proves the service can actually do its job
@app.get("/ready")
async def deep_health():
checks = {}
# Check database
try:
await db.execute("SELECT 1")
checks["database"] = "ok"
except Exception as e:
checks["database"] = f"error: {e}"
# Check cache
try:
await redis.ping()
checks["cache"] = "ok"
except Exception as e:
checks["cache"] = f"error: {e}"
# Check external dependency (with timeout)
try:
async with httpx.AsyncClient(timeout=2.0) as client:
resp = await client.get("https://payment-service/health")
checks["payment"] = "ok" if resp.status_code == 200 else "degraded"
except Exception as e:
checks["payment"] = "unreachable"
is_healthy = all(v == "ok" for v in checks.values())
status_code = 200 if is_healthy else 503
return JSONResponse(content=checks, status_code=status_code)
Kubernetes uses /health for liveness (restart if failing) and /ready for readiness (stop sending traffic if failing). These should be separate endpoints with different logic.
Split-Brain and Fencing¶
Split-Brain¶
A network partition causes two nodes to both believe they are the primary/leader. Both accept writes. When the partition heals, data is inconsistent — two histories must be merged.
Before partition: During partition:
Primary ─── Replica Primary ─ x ─ Replica
(follower) (becomes new primary)
Both accept writes on different data.
Fencing with STONITH¶
STONITH (Shoot The Other Node In The Head) — when a node suspects split-brain, it kills the other node before taking over.
# Pacemaker/Corosync fencing
crm configure primitive stonith-pdsh stonith:external/pdsh \
params hostlist="node1 node2" pcmk_host_map="node1:10.0.1.1 node2:10.0.1.2"
Etcd Lease-Based Locking¶
import etcd3
client = etcd3.client()
def acquire_leader_lock(key: str, ttl: int = 15) -> tuple:
"""Returns (lock, lease) if acquired, (None, None) if not."""
lease = client.lease(ttl)
# CAS: only set if key doesn't exist
success, _ = client.transaction(
compare=[client.transactions.version(key) == 0],
success=[client.transactions.put(key, "leader-node-id", lease=lease)],
failure=[]
)
if success:
return lease # caller must keep refreshing lease
return None
# Refresh lease to maintain leadership
def maintain_leadership(lease):
while True:
lease.refresh()
time.sleep(ttl / 2)
Quick Reference¶
Consistency Models (weakest to strongest)¶
Raft Quorum Sizes¶
| Nodes | Quorum | Tolerated failures |
|---|---|---|
| 1 | 1 | 0 |
| 3 | 2 | 1 |
| 5 | 3 | 2 |
| 7 | 4 | 3 |
Idempotency Checklist¶
- Operation uses a unique idempotency key
- Key is derived from business domain (order ID, not UUID)
- Results are cached with appropriate TTL
- Retries send the same key (not a new one)
- Cache TTL > maximum expected retry window
The Fallacies Quick List¶
- Network is reliable 2. Latency is zero 3. Bandwidth is infinite
- Network is secure 5. Topology doesn't change 6. One administrator
- Transport is free 8. Network is homogeneous
Wiki Navigation¶
Prerequisites¶
- Kubernetes Ops (Production) (Topic Pack, L2)
- Database Operations on Kubernetes (Topic Pack, L2)