Portal | Level: L2: Operations | Topics: Distributed Systems Fundamentals | Domain: DevOps & Tooling

Distributed Systems Fundamentals — Primer

Why This Matters

Every modern infrastructure involves distributed systems: databases with replicas, microservices talking over a network, Kubernetes clusters, message queues. The failures you encounter — split-brain, stale reads, phantom messages, partial failures — have patterns that have been studied and named. Understanding the theory lets you recognize the pattern in a production incident and reason about your options. Without it, you're guessing.


Why Distribution Is Hard

Three fundamental problems make distributed systems qualitatively harder than single-machine systems:

1. Partial Failures

In a single machine, either everything works or the machine is down. In a distributed system, one node can fail while others continue. The surviving nodes cannot distinguish "node B is dead" from "the network between me and node B is broken." This is the fundamental problem. There is no solution — only strategies to handle it.

2. Network Unreliability

The network can:

  • Drop messages (silently, with no error)
  • Delay messages by seconds or minutes (then deliver them)
  • Reorder messages (message 2 arrives before message 1)
  • Deliver messages multiple times (when you retry a dropped message that wasn't actually dropped)
  • Partition (split the network so nodes can't see each other)

You cannot distinguish "slow" from "failed." A response that hasn't arrived after 1 second might arrive after 10 seconds.

3. No Global Clock

There is no synchronized global time in a distributed system. Two nodes' clocks can be seconds or minutes apart. NTP helps but does not eliminate skew. You cannot use wall-clock time to determine event ordering across nodes. A timestamp on a message from another node doesn't tell you when it actually happened relative to your local events.
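Because wall-clock comparisons across nodes are meaningless, distributed systems order events with logical clocks instead. A minimal Lamport-clock sketch (illustrative code, not from any particular system): the counter only moves forward, and every received message pushes the local counter past the sender's.

```python
class LamportClock:
    """Logical clock: event ordering comes from counters, not wall time."""

    def __init__(self):
        self.time = 0

    def tick(self) -> int:
        # Local event: increment the counter.
        self.time += 1
        return self.time

    def send(self) -> int:
        # Attach the current logical time to an outgoing message.
        return self.tick()

    def receive(self, msg_time: int) -> int:
        # On receive: jump past the sender's clock, then count the receive event.
        self.time = max(self.time, msg_time) + 1
        return self.time

a, b = LamportClock(), LamportClock()
t_send = a.send()           # a's clock: 1
t_recv = b.receive(t_send)  # b's clock: max(0, 1) + 1 = 2
```

Lamport clocks give a consistent ordering for causally related events; the vector clocks discussed later extend this to also detect when two events are concurrent.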


The 8 Fallacies of Distributed Computing

Who made it: Peter Deutsch drafted the original 7 fallacies at Sun Microsystems in 1994. James Gosling (creator of Java) added the 8th ("the network is homogeneous"). These fallacies are over 30 years old and still describe the exact mistakes teams make when building microservices today.

Peter Deutsch's list (1994) — still true:

  1. The network is reliable — it isn't. Packets drop.
  2. Latency is zero — a call to localhost takes microseconds; a call across a datacenter takes milliseconds; across regions, tens of milliseconds.
  3. Bandwidth is infinite — especially wrong for Kubernetes watching API servers.
  4. The network is secure — never assume internal traffic is safe.
  5. Topology doesn't change — nodes come and go, IPs change, load balancers rotate.
  6. There is one administrator — multiple teams own different parts of the network.
  7. Transport cost is zero — serialization, deserialization, and connection overhead are real.
  8. The network is homogeneous — different segments have different MTUs, reliability, and latency.

Practical implication: Every remote call must handle: timeouts, retries, errors, and the case where you don't know if it succeeded.
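A sketch of what that implication looks like in code (the helper and exception names here are hypothetical, for illustration only): a definite connection failure is safe to retry, but a timeout must be treated as "outcome unknown" rather than as failure.

```python
class UnknownOutcome(Exception):
    """Raised when we cannot tell whether the remote side applied the call."""

def call_with_retries(op, max_attempts=3):
    """Retry definite failures; surface timeouts as 'unknown outcome',
    because the request may already have been applied remotely."""
    last_exc = None
    for _ in range(max_attempts):
        try:
            return op()
        except ConnectionError as exc:
            # Definite failure before the server processed anything: retry.
            last_exc = exc
        except TimeoutError as exc:
            # No response at all: success or failure is unknowable here.
            # Retrying blindly is only safe if op is idempotent.
            raise UnknownOutcome("no response; outcome unknown") from exc
    raise last_exc

attempts = []
def flaky():
    attempts.append(1)
    if len(attempts) < 2:
        raise ConnectionError("connection refused")
    return "ok"

result = call_with_retries(flaky)  # fails once with a definite error, then succeeds
```

The idempotency patterns later in this primer are what make it safe to retry after `UnknownOutcome` too.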


CAP Theorem

A distributed system that stores data can provide at most two of three guarantees:

  • Consistency (C): Every read receives the most recent write (or an error). All nodes see the same data at the same time.
  • Availability (A): Every request receives a response (not an error). The system is always up.
  • Partition tolerance (P): The system continues to work when network partitions occur.

The catch: Network partitions happen. You cannot opt out of P. The real choice is: when a partition occurs, do you prioritize C or A?

Choice  Behavior during partition                                  Examples
CP      Refuse writes (or reject requests) to prevent stale data   etcd, ZooKeeper, HBase, CockroachDB
AP      Allow writes/reads but accept stale data                   Cassandra, DynamoDB (default), CouchDB

What "C" Actually Means in Practice

CAP's "C" is linearizability: reads always return the latest write. This is very strong. Most databases don't offer it by default.

Different consistency levels:

  • Linearizable: Read after write always returns the written value. Very strong. Requires coordination.
  • Sequential consistency: Operations appear in some total order consistent with program order. Weaker than linearizable.
  • Causal consistency: Writes that are causally related are seen in order. Causally unrelated writes can be seen in any order.
  • Eventual consistency: Given no new updates, all replicas will eventually converge. No guarantees about when.

Practical example with PostgreSQL:

-- Synchronous replication (CP): primary waits for replica to acknowledge write
-- Data loss: zero | Latency penalty: ~RTT to replica
synchronous_standby_names = 'replica-1'

-- Asynchronous replication (AP): primary acks write before replica confirms
-- Data loss: possible (lag) | Latency: minimal
synchronous_standby_names = ''

PACELC Extension

PACELC extends CAP to cover normal (non-partition) operation:

If there is a Partition, choose between Availability and Consistency. Else (normal operation), choose between Latency and Consistency.

System Partition Normal
DynamoDB (default) PA EL
DynamoDB (strong read) PC EC
Cassandra PA EL
CockroachDB PC EC
MySQL (async replica) PA EL
MySQL (sync replica) PC EC

Consensus: Raft

Raft is the consensus algorithm used by etcd, CockroachDB, and Consul. Understanding Raft explains why etcd requires a quorum of nodes to be up, and why a 2-node etcd cluster is worse than a 1-node cluster.

Leader Election

  1. Cluster starts — all nodes are followers
  2. A follower waits a random timeout (150–300ms). If no heartbeat from a leader, it becomes a candidate
  3. Candidate sends RequestVote RPC to all nodes
  4. If a node hasn't voted this term, it votes for the candidate
  5. Candidate with votes from a majority (quorum) becomes leader
  6. Leader sends periodic AppendEntries heartbeats to prevent new elections

Quorum = floor(N/2) + 1. For 3 nodes, quorum = 2. For 5 nodes, quorum = 3.

A 2-node cluster requires both nodes to agree — loss of either node means no quorum, no writes. Always use odd numbers: 3, 5, or 7 nodes.

Remember: Mnemonic for quorum math: "Majority means more than half." For N nodes, quorum = floor(N/2) + 1. A 3-node cluster tolerates 1 failure; a 5-node cluster tolerates 2. Adding a 4th node to a 3-node cluster gives you zero additional fault tolerance (quorum goes from 2 to 3) while increasing coordination overhead.
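The quorum arithmetic in the note above can be checked mechanically; a small sketch (function names are just for illustration):

```python
def quorum(n: int) -> int:
    """Majority quorum: strictly more than half of n voters."""
    return n // 2 + 1

def tolerated_failures(n: int) -> int:
    """How many nodes can fail while a quorum remains reachable."""
    return n - quorum(n)

# Even cluster sizes add coordination cost without adding fault tolerance:
table = {n: (quorum(n), tolerated_failures(n)) for n in (1, 2, 3, 4, 5, 7)}
# e.g. 3 nodes -> quorum 2, tolerates 1; 4 nodes -> quorum 3, still tolerates 1
```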

Log Replication

Leader receives write:
  1. Appends entry to own log (uncommitted)
  2. Sends AppendEntries to followers
  3. Waits for quorum to acknowledge
  4. Commits entry (applies to state machine)
  5. Returns success to client
  6. Notifies followers to commit on next heartbeat

Follower crashes during step 3:
  - Leader retries indefinitely
  - Once follower rejoins, leader sends missing entries
  - Follower catches up before serving reads
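The replication steps above can be sketched as a toy model. This is illustrative only: real Raft also tracks terms, log-index matching, and persistent state, all omitted here.

```python
class Follower:
    """Toy follower: a crashed follower simply never acknowledges."""

    def __init__(self, up=True):
        self.up, self.log = up, []

    def append_entries(self, entry) -> bool:
        if not self.up:
            return False            # crashed: no ack (leader would retry)
        self.log.append(entry)
        return True

class Leader:
    """Toy leader implementing steps 1-5: append, replicate, quorum, commit."""

    def __init__(self, followers):
        self.log = []
        self.commit_index = -1      # index of last committed entry
        self.followers = followers

    def write(self, entry) -> bool:
        self.log.append(entry)              # 1. append locally (uncommitted)
        acks = 1                            # the leader counts toward quorum
        for f in self.followers:
            if f.append_entries(entry):     # 2. replicate to followers
                acks += 1
        quorum = (1 + len(self.followers)) // 2 + 1
        if acks >= quorum:                  # 3-4. quorum acked: commit
            self.commit_index = len(self.log) - 1
            return True                     # 5. report success to client
        return False

# 3-node cluster, one follower down: leader + one live follower = quorum of 2
leader = Leader([Follower(), Follower(up=False)])
ok = leader.write("x=1")

# Both followers down: only the leader has the entry, so nothing commits
stuck = Leader([Follower(up=False), Follower(up=False)])
blocked = stuck.write("x=2")
```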

etcd in Practice

# Check etcd cluster health
etcdctl endpoint health --cluster \
  --endpoints=https://10.0.1.10:2379,https://10.0.1.11:2379,https://10.0.1.12:2379 \
  --cacert=/etc/ssl/etcd/ca.crt \
  --cert=/etc/ssl/etcd/client.crt \
  --key=/etc/ssl/etcd/client.key

# Check leader
etcdctl endpoint status --cluster

# Watch for leader elections (useful during incidents)
etcdctl watch --prefix /registry/leases/kube-system --rev=0 2>&1 | grep -i elect

Eventual Consistency Patterns

Read-Your-Own-Writes

After a user makes a change, they should see it when they reload — even if the replica they're now reading from hasn't caught up yet.

Implementation options:

  1. Route reads to the primary after writes (defeats the purpose of replicas)
  2. Track a "read-after-write token" (the write's LSN/timestamp) and wait on the replica
  3. Sticky sessions: always route the same user to the same replica

# Read-your-writes with PostgreSQL streaming replication
import time

# After a write on the primary, capture its WAL position (LSN)
result = primary_conn.execute("SELECT pg_current_wal_lsn()")
lsn = result.fetchone()[0]

# Before reading from the replica, wait until it has replayed that LSN
while True:
    caught_up = replica_conn.execute(
        "SELECT pg_wal_lsn_diff(%s, pg_last_wal_replay_lsn()) <= 0",
        [lsn],
    ).fetchone()[0]
    if caught_up:
        break
    time.sleep(0.01)  # replica still behind; poll briefly

Monotonic Reads

If you read version 5 of a record, you should never then read version 3. Don't serve reads from replicas that are behind your last read.

Implementation: Track the highest LSN/vector clock you've read, reject reads from replicas behind that point.
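A minimal client-side sketch of that idea. The `Replica` stand-in and its `applied_lsn` field are illustrative; a real setup would compare each replica's `pg_last_wal_replay_lsn()` (or equivalent) instead.

```python
class Replica:
    """Stand-in for a read replica that knows how far it has replayed."""

    def __init__(self, applied_lsn, data):
        self.applied_lsn, self.data = applied_lsn, data

    def get(self, key):
        return self.data[key], self.applied_lsn

class MonotonicReader:
    """Remember the highest LSN seen; never read from a replica behind it."""

    def __init__(self, replicas):
        self.replicas = replicas
        self.high_water = 0  # highest LSN this client has observed

    def read(self, key):
        for r in self.replicas:
            if r.applied_lsn >= self.high_water:   # not behind our last read
                value, lsn = r.get(key)
                self.high_water = max(self.high_water, lsn)
                return value
        raise RuntimeError("no replica has caught up to our last read")

fresh = Replica(applied_lsn=10, data={"k": "v5"})
stale = Replica(applied_lsn=3, data={"k": "v3"})

reader = MonotonicReader([fresh])
first = reader.read("k")     # reads v5, remembers LSN 10
reader.replicas = [stale]    # later routed to a lagging replica:
                             # reader.read("k") now raises rather than
                             # serving the older v3
```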


Vector Clocks and Causality

Timestamps don't work for ordering events across nodes (clocks skew). Vector clocks track causality without relying on synchronized time.

Each node maintains a vector of counters, one per node.

Node A: [A:1, B:0, C:0] — A made 1 event
Node B receives A's message: [A:1, B:1, C:0] — B increments own, keeps A's
Node C receives B's message: [A:1, B:1, C:1]

Comparing [A:2, B:0, C:0] vs [A:1, B:1, C:0]:
  - Neither dominates the other (the first is ahead on A's counter but behind on B's)
  - These are concurrent events — no causal ordering
  - Need conflict resolution (last-write-wins, CRDT, human merge)

Amazon's original Dynamo system (described in the 2007 paper that inspired DynamoDB) used vector clocks (called "version vectors") to detect conflicts when multiple replicas accept concurrent writes.
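The comparison rule above fits in a few lines. A sketch using per-node counter dicts (the function name is illustrative): one clock "happened before" another only if it is less than or equal on every node's counter; if each clock is ahead somewhere, the events are concurrent.

```python
def compare(vc_a: dict, vc_b: dict) -> str:
    """Compare two vector clocks: 'before', 'after', 'equal', or 'concurrent'."""
    nodes = set(vc_a) | set(vc_b)
    a_le_b = all(vc_a.get(n, 0) <= vc_b.get(n, 0) for n in nodes)
    b_le_a = all(vc_b.get(n, 0) <= vc_a.get(n, 0) for n in nodes)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"      # a causally precedes b
    if b_le_a:
        return "after"
    return "concurrent"      # neither dominates: conflict to resolve
```

For example, `compare({"A": 2, "B": 0}, {"A": 1, "B": 1})` reports the clocks from the walkthrough above as concurrent.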


Idempotency and At-Least-Once Delivery

The Retry Problem

You send a request to a service. The network drops the response. From your perspective, the request failed. But the server may have processed it successfully. You retry. Now the operation runs twice.

Analogy: Idempotency is like an elevator button. Pressing it once calls the elevator. Pressing it five more times does not call five elevators — the result is the same. Non-idempotent operations are like a vending machine: every button press dispenses another item.

Solution: Make operations idempotent — running them multiple times has the same effect as running them once.

# Non-idempotent: creates duplicate charges
def charge_card(user_id: str, amount: int):
    return stripe.Charge.create(amount=amount, customer=user_id)

# Idempotent: uses idempotency key
def charge_card(user_id: str, amount: int, order_id: str):
    return stripe.Charge.create(
        amount=amount,
        customer=user_id,
        idempotency_key=f"charge-{order_id}",  # same key = same charge
    )

Idempotency Key Pattern

import json
import redis

r = redis.Redis()

def idempotent_operation(idempotency_key: str, payload: dict) -> dict:
    # Check if we've seen this key
    cached = r.get(f"idem:{idempotency_key}")
    if cached:
        return json.loads(cached)  # return previous result

    # Process (note: a concurrent duplicate could slip through between the
    # GET above and the SETEX below; use SET NX or a lock if that matters)
    result = do_operation(payload)

    # Cache result (with TTL matching your retry window)
    r.setex(f"idem:{idempotency_key}", 86400, json.dumps(result))

    return result

Distributed Transactions

Two-Phase Commit (2PC)

2PC coordinates a transaction across multiple participants. It is blocking — if the coordinator crashes after Phase 1, participants are stuck until the coordinator recovers.

Phase 1 - Prepare:
  Coordinator → all participants: "Can you commit?"
  Participants: lock resources, write to log, reply "yes" or "no"

Phase 2 - Commit:
  If all said "yes": Coordinator → all: "Commit"
  If any said "no": Coordinator → all: "Abort"
  Participants: release locks, acknowledge

Problem: Coordinator crashes after receiving all "yes" votes but before sending "Commit"
  → Participants are stuck with locks held, waiting for coordinator to recover
  → This is why 2PC is called "blocking"
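A toy model of the two phases (class and function names are illustrative; "prepared" stands in for locks held plus a logged vote):

```python
class Participant:
    """Toy 2PC participant."""

    def __init__(self, can_commit=True):
        self.can_commit, self.state = can_commit, "idle"

    def prepare(self) -> bool:                 # Phase 1: vote
        self.state = "prepared" if self.can_commit else "aborted"
        return self.can_commit

    def finish(self, commit: bool):            # Phase 2: apply the decision
        self.state = "committed" if commit else "aborted"

def two_phase_commit(participants) -> bool:
    votes = [p.prepare() for p in participants]   # Phase 1: collect votes
    decision = all(votes)                         # unanimous yes => commit
    # If the coordinator crashed HERE, every "prepared" participant would
    # block with locks held until it recovered -- the blocking problem above.
    for p in participants:
        p.finish(decision)                        # Phase 2: commit or abort
    return decision

good = [Participant(), Participant()]
ok = two_phase_commit(good)                       # all vote yes: commit

mixed = [Participant(), Participant(can_commit=False)]
bad = two_phase_commit(mixed)                     # one no vote: abort all
```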

Saga Pattern

Sagas replace distributed transactions with a sequence of local transactions, each with a compensating transaction (rollback action).

Order saga:
  1. Create order (local DB) → compensate: cancel order
  2. Reserve inventory    → compensate: release inventory
  3. Charge payment       → compensate: refund payment
  4. Ship order           → compensate: arrange return

If step 3 fails:
  Execute compensating transactions in reverse:
    - compensate step 2: release inventory
    - compensate step 1: cancel order

Sagas are eventually consistent, not ACID.
Step 2 can see a "reserved" state that may later be cancelled.

Orchestration-based saga (central coordinator):

class OrderSaga:
    def execute(self, order_id: str):
        try:
            self.create_order(order_id)
            self.reserve_inventory(order_id)
            self.charge_payment(order_id)
            self.ship_order(order_id)
        except InventoryError:
            # step 2 failed: compensate step 1
            self.cancel_order(order_id)
        except PaymentError:
            # step 3 failed: compensate steps 2 and 1, in reverse order
            self.release_inventory(order_id)
            self.cancel_order(order_id)
        except ShippingError:
            # step 4 failed: compensate steps 3, 2, and 1, in reverse order
            self.refund_payment(order_id)
            self.release_inventory(order_id)
            self.cancel_order(order_id)

Choreography-based saga (event-driven):

OrderService publishes "OrderCreated"
InventoryService listens → reserves stock → publishes "StockReserved"
PaymentService listens → charges card → publishes "PaymentCharged"
ShippingService listens → ships order → publishes "OrderShipped"

If PaymentService fails → publishes "PaymentFailed"
InventoryService listens to "PaymentFailed" → releases stock
OrderService listens to "PaymentFailed" → cancels order
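The event flow above can be simulated with an in-process bus standing in for a real message broker (all names here are illustrative). Each "service" only knows which events it reacts to; no component sees the whole saga.

```python
from collections import defaultdict

class Bus:
    """Minimal in-process pub/sub, standing in for a message broker."""

    def __init__(self):
        self.handlers = defaultdict(list)

    def subscribe(self, event, handler):
        self.handlers[event].append(handler)

    def publish(self, event, data):
        for h in list(self.handlers[event]):
            h(data)

bus = Bus()
log = []

# InventoryService: reserve stock, then announce it
bus.subscribe("OrderCreated", lambda o: (log.append("stock reserved"),
                                         bus.publish("StockReserved", o)))
# PaymentService: here the charge fails, so it announces the failure
bus.subscribe("StockReserved", lambda o: (log.append("payment failed"),
                                          bus.publish("PaymentFailed", o)))
# Compensations: each service independently reacts to the failure event
bus.subscribe("PaymentFailed", lambda o: log.append("stock released"))
bus.subscribe("PaymentFailed", lambda o: log.append("order cancelled"))

bus.publish("OrderCreated", {"id": 1})
# log now traces the saga: reserve -> fail -> release -> cancel
```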

Thundering Herd and Backoff

The Thundering Herd

When a shared resource (cache, service) goes down and comes back up, all clients retry simultaneously. This overloads the recovering service, causing it to fail again.

Cache misses → 10,000 requests hit DB simultaneously → DB falls over

Exponential Backoff with Jitter

import random
import time

def retry_with_backoff(func, max_retries=5, base_delay=0.5):
    for attempt in range(max_retries):
        try:
            return func()
        except TransientError:
            if attempt == max_retries - 1:
                raise

            # Exponential backoff: 0.5s, 1s, 2s, 4s, 8s...
            delay = base_delay * (2 ** attempt)

            # Add jitter: ±50% randomness to desynchronize retries
            jitter = delay * 0.5 * (random.random() * 2 - 1)
            sleep_time = delay + jitter

            time.sleep(sleep_time)

Cache Stampede Prevention

# Probabilistic early expiration (PER) — refresh before expiry
import math
import random

def get_with_per(key: str, delta: float = 0.5):
    """
    Refresh cache probabilistically before it expires.
    delta: how early to consider refreshing (seconds)
    """
    entry = cache.get_with_metadata(key)
    if entry is None:
        return recompute_and_cache(key)

    ttl_remaining = entry.ttl
    # Probability of early refresh increases as TTL approaches 0
    if random.random() < math.exp(-ttl_remaining / delta):
        return recompute_and_cache(key)

    return entry.value

Health Checks That Actually Work

Shallow vs Deep Health Checks

import httpx
from fastapi.responses import JSONResponse

# Shallow: only proves the process is alive
@app.get("/health")
def shallow_health():
    return {"status": "ok"}

# Deep: proves the service can actually do its job
@app.get("/ready")
async def deep_health():
    checks = {}

    # Check database
    try:
        await db.execute("SELECT 1")
        checks["database"] = "ok"
    except Exception as e:
        checks["database"] = f"error: {e}"

    # Check cache
    try:
        await redis.ping()
        checks["cache"] = "ok"
    except Exception as e:
        checks["cache"] = f"error: {e}"

    # Check external dependency (with timeout)
    try:
        async with httpx.AsyncClient(timeout=2.0) as client:
            resp = await client.get("https://payment-service/health")
            checks["payment"] = "ok" if resp.status_code == 200 else "degraded"
    except Exception:
        checks["payment"] = "unreachable"

    is_healthy = all(v == "ok" for v in checks.values())
    status_code = 200 if is_healthy else 503

    return JSONResponse(content=checks, status_code=status_code)

Kubernetes uses /health for liveness (restart if failing) and /ready for readiness (stop sending traffic if failing). These should be separate endpoints with different logic.


Split-Brain and Fencing

Split-Brain

A network partition causes two nodes to both believe they are the primary/leader. Both accept writes. When the partition heals, data is inconsistent — two histories must be merged.

Before partition:    During partition:
  Primary ─── Replica    Primary ─ x ─ Replica
              (follower)            (becomes new primary)

Both accept writes on different data.

Fencing with STONITH

STONITH (Shoot The Other Node In The Head) — when a node suspects split-brain, it kills the other node before taking over.

# Pacemaker/Corosync fencing
crm configure primitive stonith-pdsh stonith:external/pdsh \
    params hostlist="node1 node2" pcmk_host_map="node1:10.0.1.1 node2:10.0.1.2"

etcd Lease-Based Locking

import time

import etcd3

client = etcd3.client()

def acquire_leader_lock(key: str, ttl: int = 15):
    """Returns the lease if acquired, None if another node holds the key."""
    lease = client.lease(ttl)

    # CAS: only set if key doesn't exist (version 0 means the key is absent)
    success, _ = client.transaction(
        compare=[client.transactions.version(key) == 0],
        success=[client.transactions.put(key, "leader-node-id", lease=lease)],
        failure=[]
    )

    if success:
        return lease  # caller must keep refreshing lease
    lease.revoke()    # lost the race: don't leave a dangling lease
    return None

# Refresh lease to maintain leadership
def maintain_leadership(lease, ttl: int = 15):
    while True:
        lease.refresh()
        time.sleep(ttl / 2)

Quick Reference

Consistency Models (weakest to strongest)

Eventual → Monotonic Read → Read-Your-Writes → Causal → Sequential → Linearizable

Raft Quorum Sizes

Nodes Quorum Tolerated failures
1 1 0
3 2 1
5 3 2
7 4 3

Idempotency Checklist

  • Operation uses a unique idempotency key
  • Key is derived from business domain (order ID, not UUID)
  • Results are cached with appropriate TTL
  • Retries send the same key (not a new one)
  • Cache TTL > maximum expected retry window

The Fallacies Quick List

  1. Network is reliable
  2. Latency is zero
  3. Bandwidth is infinite
  4. Network is secure
  5. Topology doesn't change
  6. One administrator
  7. Transport is free
  8. Network is homogeneous
