# Chapter 8: Distributed Scaling & Clouds
## 8.0 - The Three Coordination Problems
<img src="assets/chap_8_cloud_scales.png" width="250" style="float: left; margin: 0 20px 20px 0;" />
PostgreSQL was built for a simpler era: a single server running a single operating system kernel, accessing local memory and direct-attached storage. In this monolithic world, consistency is cheap. If process A writes a page in memory, process B can read it immediately because they share the same physical RAM.
When your workload outgrows a single machine, this physical simplicity vanishes.
Scaling a database is not merely a matter of throwing more hardware at the problem. As the database engine spreads across multiple physical servers, it enters the realm of distributed systems, where the ultimate enemy is not CPU latency or disk I/O, but **coordination**.
Coordination is the act of forcing two independent processes to agree on a single state. In a distributed cluster, coordination requires exchanging messages over the network, and those messages can travel no faster than the speed of light. Every millisecond your database spends negotiating agreement is a millisecond it is not executing queries.
To scale Postgres effectively, we must stop asking *"How do we make the server faster?"* and start asking *"What coordination exists, and how do we eliminate it?"*
> [!NOTE] The Click
> **Concept**: Scaling is not about acquiring more computing power. It is about buying physical distance and budgeting your network coordination.
> **Payoff**: In a single-node database, access to shared memory and disk is coordinated in nanoseconds to microseconds by in-memory latches (spinlocks and lightweight locks). Once you distribute the database, you trade local memory speed for physical distance. Every scaling architecture is a trade-off: which consistency guarantees are you willing to relax to avoid crossing the network?
---
### The Three Axes of Coordination
Distributed Postgres scaling is divided into three distinct architectural problems, each representing a different kind of coordination bottleneck:
#### 1. Storage Coordination (Where is Truth?)
In traditional Postgres, the ultimate source of truth is the physical page stored in the local data directory. If we run multiple Postgres compute nodes, how do they coordinate access to this storage? If we decouple compute from storage, does the log remain a recovery mechanism, or does the log *become* the database? We will explore how cloud-native architectures answer this by separating compute from storage.
#### 2. Write Coordination (What Must Change?)
When inserting billions of rows, a single table becomes a global bottleneck. Every insert must update the table's indexes, and background maintenance such as `VACUUM` must sweep the entire relation. We will explore how partitioning reduces this coordination by dividing a single logical table into localized physical neighborhoods.
#### 3. Connection Coordination (How Many Backends?)
Postgres assigns a dedicated operating system process to every client connection. This design isolates sessions from one another, but it adds heavy process-management overhead and contention on shared-memory locks under high concurrency. We will explore how connection pooling multiplexes thousands of lightweight clients onto a small pool of active database workers.
By understanding scaling through the lens of coordination reduction, you will see that replication, partitioning, and pooling are not arbitrary features. They are deliberate architectural maneuvers designed to liberate the database engine from the physical boundaries of a single server.
---
## 8.1 - Scaling Storage (Compute-Storage Separation)
<img src="assets/arch_cloud_decoupled.png" width="250" style="float: left; margin: 0 20px 20px 0;" />
To understand how modern clouds scale database storage, we must start at the bottom of the database architecture: the relationship between RAM, the engine, and the filesystem.
---
### The Traditional Shape
In standard PostgreSQL, storage is structured as a tight hierarchy between RAM and physical disk.
```mermaid
graph TD
Client["Client Backend Process"] -->|Read / Write| Shared["Shared Buffers (RAM)"]
Shared -->|Asynchronous| Heap["Heap Files<br>(8KB Pages on Disk)"]
Shared -->|Synchronous| WAL["Write-Ahead Log (WAL)<br>(Sequential Log on Disk)"]
```
1. **Shared Buffers**: When a client requests data, a dedicated backend process looks first in **Shared Buffers** (shared RAM allocated at server startup).
2. **Heap Files**: If the requested page is not in memory, the backend issues a read request to the operating system kernel to fetch the page from the local storage into RAM.
3. **The Write-Ahead Log (WAL)**: When a transaction modifies a page, the change is written sequentially to the local WAL files and flushed to disk (`fsync`) before the transaction can commit.
In this model, **the physical page is the source of truth**.
A table is stored on disk as heap files: one or more segment files (1GB each by default), each made up of 8KB pages. The WAL is strictly a **recovery mechanism**, used to replay modifications if the server crashes or to update standbys. Under normal operations, a background process (the **Checkpointer**) periodically flushes dirty pages from memory back to the local heap files.
> [!NOTE]
> **The Coordination Limit**
> Because pages are truth, the server must manage both the read cache and the heap files locally. Backends must coordinate using in-memory locks (latches) to edit the same 8KB page, and the checkpointer must compete for disk bandwidth. To protect against torn (partially written) pages after a crash, Postgres writes the entire 8KB page to the WAL the first time it is modified after a checkpoint (known as **Full-Page Writes**). On high-write workloads, this causes heavy write amplification and I/O spikes around checkpoints that can saturate local drives.
---
### The Aurora Insight
The first major step to resolve this bottleneck was **Compute/Storage Separation**, pioneered by Amazon Aurora.
```mermaid
graph TD
Compute["Primary Compute Node"] -->|"Ships WAL Records Only (Network)"| Storage["Distributed Storage Fleet<br>(Consensus Tier)"]
Storage -->|Apply WAL| Cache1["Page Cache 1"]
Storage -->|Apply WAL| Cache2["Page Cache 2"]
Storage -->|Apply WAL| Cache3["Page Cache 3"]
```
Instead of running Postgres on a virtual machine attached to local disk, the architecture splits the database into two independent fleets:
1. **Compute Nodes**: Stateless virtual machines running the Postgres query executor, parser, and planner. They hold Shared Buffers in memory but contain no durable physical disks.
2. **Distributed Storage Fleet**: A specialized, multi-tenant storage system spread across multiple availability zones.
The fundamental innovation is that the storage fleet **speaks the language of the WAL**.
When a transaction commits, the compute node does not write dirty heap pages to disk. Instead, it streams only the WAL record over the network to the storage fleet. Once a quorum of storage nodes acknowledges receipt, the transaction commits. The storage fleet then asynchronously applies the WAL to its local page caches in the background.
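Why does a quorum keep commits fast even when some storage nodes are slow? The toy Python model below assumes the 4-of-6 write quorum described in Aurora's published design and treats each node's acknowledgement time as an input; the function name and latencies are illustrative, not an Aurora API.

```python
def quorum_commit_latency(ack_latencies_ms, quorum=4):
    """Time at which `quorum` storage nodes have acknowledged a WAL write.

    Toy model: the commit is durable once the quorum-th fastest
    acknowledgement arrives, so the slowest nodes never delay it.
    """
    if quorum > len(ack_latencies_ms):
        raise ValueError("not enough storage nodes to reach the quorum")
    return sorted(ack_latencies_ms)[quorum - 1]


# Six copies of the log; two storage nodes are slow (congestion, a failing disk).
acks = [1.2, 0.8, 5.0, 0.9, 1.1, 30.0]
print(quorum_commit_latency(acks))  # 1.2: the 5.0 and 30.0 ms outliers are ignored
```

Waiting for all six copies instead would make this commit as slow as the 30 ms outlier.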
By writing "only the log" to the network, Aurora eliminates the checkpointer bottleneck and Full-Page Writes on the compute node. However, the **physical page remains the source of truth** in this architecture—it is simply managed on separate storage machines.
---
### The Neon Insight
Neon took this decoupling to its logical conclusion by proposing an architectural inversion: **What if the Write-Ahead Log wasn't a recovery helper? What if the log *is* the database?**
```mermaid
graph TD
Compute["Primary Compute Node (Stateless)"] -->|WAL stream| Safekeeper["Safekeeper Node<br>(Durable consensus store)"]
Compute -->|Asks for 8KB Page| Pageserver["Pageserver Node<br>(Reconstructs pages)"]
Safekeeper -->|feeds WAL stream| Pageserver
```
| Model | Source of Truth | Recovery / Cache |
| :--- | :--- | :--- |
| **Traditional Postgres** | **Pages** (8KB heap files) | **WAL** (Recovery log used on crash) |
| **Neon** | **WAL** (Durable log of changes) | **Pages** (A reconstructed cache of the log) |
In Neon's model, the physical 8KB page is no longer stored as a static file on disk. Instead, the page is treated as an on-demand projection—a materialized cache of the log history.
To achieve this, Neon divides the storage tier into two roles:
* **The Safekeepers (Consensus Tier)**: Lightweight processes that accept the incoming WAL stream from the compute node, agree on the durable end of the log using a Paxos-style protocol, and write the WAL bytes directly to local NVMe drives.
* **The Pageservers**: Processes that ingest the WAL stream from the Safekeepers and store the **deltas** (changes) for each page over time. When a compute node needs to read a page (due to a Shared Buffers miss), it asks the Pageserver: *"Give me page 42 at LSN 100."* The Pageserver locates the base page, applies the deltas up to LSN 100, and returns the reconstructed 8KB page.
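The Pageserver's read path can be sketched as "take the newest page image at or before the requested LSN, then replay the later deltas". The Python below is a toy model under simplifying assumptions: a page is a small dict, a delta is a single key/value update, and `ToyPageserver` is a hypothetical class, not Neon's actual storage format.

```python
class ToyPageserver:
    """Toy model: full page images plus per-page deltas, both keyed by LSN."""

    def __init__(self):
        self.images = {}  # block number -> list of (lsn, page), ascending by lsn
        self.deltas = {}  # block number -> list of (lsn, key, value), ascending by lsn

    def put_image(self, block, lsn, page):
        self.images.setdefault(block, []).append((lsn, dict(page)))

    def put_delta(self, block, lsn, key, value):
        # WAL is ingested in LSN order, so appending keeps each list sorted.
        self.deltas.setdefault(block, []).append((lsn, key, value))

    def get_page(self, block, lsn):
        """Reconstruct `block` as of `lsn`: Page State = f(Block Number, LSN)."""
        base_lsn, page = -1, {}
        for image_lsn, image in self.images.get(block, []):
            if image_lsn <= lsn:
                base_lsn, page = image_lsn, dict(image)  # newest image not after lsn
        for delta_lsn, key, value in self.deltas.get(block, []):
            if base_lsn < delta_lsn <= lsn:
                page[key] = value  # replay only deltas between the image and lsn
        return page


ps = ToyPageserver()
ps.put_image(42, 10, {"row1": "latte"})
ps.put_delta(42, 50, "row2", "mocha")
ps.put_delta(42, 100, "row1", "flat white")
ps.put_delta(42, 150, "row3", "chai")

print(ps.get_page(42, 100))  # {'row1': 'flat white', 'row2': 'mocha'}
print(ps.get_page(42, 60))   # {'row1': 'latte', 'row2': 'mocha'}: an older version
```

In a design like this, storing a fresh image from time to time bounds how many deltas a single read must replay.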
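Postgres on the compute node still parses, plans, and executes queries as usual, but its storage layer is swapped out: instead of reading local relation files, a page miss is sent over the network to the Pageserver. To the rest of the engine, storage still behaves like a local disk, yet the compute node is now entirely stateless and can crash without risking data loss, because the durable history lives in the WAL held by the Safekeepers.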
---
### Database Time and LSN Coordinates
By treating the log as the database, we introduce a coordinate system based on time: the **Log Sequence Number (LSN)**.
The LSN is a 64-bit integer representing the byte offset of a record in the Write-Ahead Log. In a log-centric architecture, the state of any block is a function of its address and time:
$\text{Page State} = f(\text{Block Number}, \text{LSN})$
Because Pageservers preserve this delta history (within a configurable retention window), we can perform **Time Travel Queries**. You can boot a read-only compute node pointing to a past LSN and query the exact state of your tables from yesterday morning, without restoring a backup or affecting the performance of the active primary node.
---
### Branching: Zero-Copy Clones
This temporal model makes database cloning a simple metadata operation. Instead of copying terabytes of storage files, Neon supports instantaneous **Timeline Forks**.
```mermaid
graph LR
P0["LSN 0"] --> P100["LSN 100 (Branch Point)"]
P100 --> P200["LSN 200 (Active Production)"]
P100 -->|Fork| D150["LSN 150 (Migration Writes)"]
subgraph T1 [Production Timeline]
P0
P100
P200
end
subgraph T2 [Dev Branch]
D150
end
```
When you branch a database at LSN 100, Neon registers a new Timeline (Timeline 2) that points to Timeline 1 as its parent.
* **Reading Unchanged Pages**: When a compute node on Timeline 2 reads a page that hasn't changed since the branch point, the Pageserver reads the deltas from the parent Timeline 1 up to LSN 100.
* **Writing New Pages**: When writes occur on Timeline 2, the new WAL records are written to Timeline 2's log.
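* **Copy-on-Write**: No pages are copied when the branch is created; new page versions exist only for pages that Timeline 2 modifies. Branch creation therefore takes less than a second, consumes essentially no extra storage at creation, and operates in complete isolation from the production compute node.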
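Branch reads can be modeled as a lookup that falls back to the parent timeline for anything at or before the branch point. This Python sketch is a simplified model (each timeline stores only its own writes; `Timeline` is a hypothetical class), not Neon's implementation.

```python
class Timeline:
    """Toy timeline: its own writes plus an optional parent and branch point."""

    def __init__(self, parent=None, branch_lsn=None):
        self.parent = parent
        self.branch_lsn = branch_lsn
        self.writes = []  # (lsn, block, value), appended in LSN order

    def write(self, lsn, block, value):
        self.writes.append((lsn, block, value))

    def read(self, block, lsn):
        """Latest value of `block` visible at `lsn` on this timeline."""
        for write_lsn, write_block, value in reversed(self.writes):
            if write_block == block and write_lsn <= lsn:
                return value  # changed on this timeline: its own copy-on-write version
        if self.parent is not None:
            # Unchanged here: read the parent, but never past the branch point.
            return self.parent.read(block, min(lsn, self.branch_lsn))
        return None


prod = Timeline()
prod.write(50, "menu", "v1")
prod.write(200, "menu", "v3")  # production keeps moving after the fork

dev = Timeline(parent=prod, branch_lsn=100)
print(dev.read("menu", 300))   # 'v1': inherited from production as of LSN 100
dev.write(150, "menu", "v2-migration")
print(dev.read("menu", 300))   # 'v2-migration': the branch's own version
print(prod.read("menu", 300))  # 'v3': production is unaffected
```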
---
### Why Neon Exists (The Serverless Payoff)
Once the database engine is stateless and storage is log-structured, several serverless features become possible:
1. **Scale-to-Zero**: If no client is connected to a compute node, the virtual machine shuts down. The data remains safe in the Pageservers. When a new connection arrives, the serverless router boots a fresh compute container in under a second.
2. **Ephemeral Environments**: Every pull request can automatically spawn an isolated branch of the production database, run migrations, execute tests, and shut down—costing only the few megabytes of deltas generated during the test.
3. **Independent Resource Auto-Scaling**: Compute CPU and RAM can scale up or down dynamically during active query workloads without restarting the underlying storage layers.
---
### The Monolith Fights Back: The Network Penalty
Decoupling compute from storage solves capacity and elasticity, but it immediately introduces a new physical enemy: **Network Latency**.
In traditional Postgres, when a query misses the Shared Buffers cache, the backend process issues a local read syscall to the OS kernel. Fetching a page from local NVMe storage takes between **10 and 100 microseconds**.
In a decoupled architecture, when a compute node misses Shared Buffers, it must traverse a network link to request the page from the Pageserver node.
* **The Network Hop**: Even inside a well-optimized cloud data center, a network round-trip plus the Pageserver's page reconstruction typically adds between several hundred microseconds and **1 to 2 milliseconds**.
* **The Performance Tax**: A single uncached page read can therefore be roughly **10x to 100x slower** than local SSD access. If a query plan must read thousands of pages that are not cached in the compute node's memory, the query stalls, bound by network round-trip times.
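A back-of-the-envelope model makes the tax concrete. The Python below charges every cache miss a fixed cost and treats buffer hits as free; the latencies are assumptions picked from the ranges above (50 µs for local NVMe, 1,000 µs for a remote page fetch).

```python
def miss_cost_ms(pages_read, hit_ratio, miss_latency_us):
    """Total time spent on cache misses, in milliseconds (buffer hits treated as free)."""
    misses = round(pages_read * (1 - hit_ratio))
    return misses * miss_latency_us / 1000


LOCAL_NVME_US = 50     # assumption: inside the 10-100 microsecond range
REMOTE_PAGE_US = 1000  # assumption: about 1 ms per Pageserver round-trip

for hit_ratio in (0.90, 0.99):
    local = miss_cost_ms(10_000, hit_ratio, LOCAL_NVME_US)
    remote = miss_cost_ms(10_000, hit_ratio, REMOTE_PAGE_US)
    print(f"hit ratio {hit_ratio:.0%}: local {local:.0f} ms, decoupled {remote:.0f} ms")
```

For a 10,000-page scan, raising the hit ratio from 90% to 99% cuts the decoupled cost from about 1 s to about 100 ms, which is why working-set RAM matters more here than on a local disk.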
Decoupled storage makes the database highly elastic and operationally lightweight, but it places a strict tax on memory efficiency. To survive the network penalty, you must ensure that your compute nodes are allocated sufficient RAM to hold your active working set, keeping cache hits high and network trips to the Pageserver as rare as possible.
---
## 8.2 - Scaling Writes (Declarative Partitioning)
<img src="assets/arch_partitioning_diverse_v2_secretary_bird_1776815795478.png" width="250" style="float: left; margin: 0 20px 20px 0;" />
Decoupling compute from storage frees execution from the limits of a single machine's disk, but it does not change the logical structure of your tables. If a single, massive table receives high-frequency write traffic, it eventually runs into the **Hot Table Problem**.
Declarative partitioning splits one logical table into multiple physical child tables, each holding a localized segment of the data. This confines index updates and vacuum sweeps to the active subset of data, reducing write coordination.
---
### The Hot Table Problem
In the Elephant Cafe data model, the primary high-volume table is `supply_deliveries`. It logs every shipment of ingredients arriving at every branch location globally:
```sql
CREATE TABLE IF NOT EXISTS supply_deliveries (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
supplier_id INT NOT NULL REFERENCES suppliers(id),
ingredient_id INT NOT NULL REFERENCES ingredients(id),
delivery_time TIMESTAMPTZ NOT NULL,
quantity_kg NUMERIC(8,2) NOT NULL
);
```
As the Cafe scales, the `supply_deliveries` ledger grows to 10 billion rows.
Many developers assume the main issue with a table of this size is search performance. That is a misconception. Finding a single row using a B-Tree index on `delivery_time` is an $O(\log n)$ lookup. For 10 billion rows, the B-Tree is only 4 to 5 levels deep, so a search takes well under a millisecond once the upper levels of the tree are cached.
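The 4-to-5-level figure follows from the fan-out of a B-Tree page. The sketch below counts the smallest number of levels L with fanout^L ≥ rows; fan-outs of 300 and 400 entries per 8KB page are rough assumptions for a small key such as a timestamp, and real trees vary with fill factor and key width.

```python
def btree_levels(rows, fanout):
    """Smallest number of levels L such that fanout**L >= rows (integer math, no floats)."""
    levels, capacity = 1, fanout
    while capacity < rows:
        levels += 1
        capacity *= fanout
    return levels


for fanout in (300, 400):
    print(fanout, btree_levels(10_000_000_000, fanout))  # 300 -> 5 levels, 400 -> 4 levels
```

Each tenfold increase in rows adds well under one level, which is why lookups stay cheap while maintenance costs keep growing.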
The real bottlenecks are the coordination costs of **maintaining** the table under continuous write pressure:
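1. **Index Maintenance (The Random I/O Cliff)**: Every insert must update every index on the table. Indexes on ever-increasing keys (like `id` or `delivery_time`) only touch their rightmost leaf pages, but indexes on non-sequential columns (for example, an index on `ingredient_id`) receive inserts scattered across the entire tree. Once those indexes grow larger than the memory available to cache them (Shared Buffers plus the OS page cache), Postgres must fetch index pages from disk to perform updates, causing random I/O stalls.
2. **Autovacuum Bottlenecks**: Although the visibility map lets vacuum skip heap pages that are already all-visible, a vacuum pass that removes dead tuples generally must still scan every index on the table in full. Cleaning up a few million dead rows on a 10-billion-row table therefore consumes massive CPU and disk bandwidth.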
3. **Retention Cleanup**: Deleting historical data (e.g., older than 2 years) via `DELETE` is a catastrophe. It scans the heap, locks millions of rows, floods the WAL stream, and leaves behind a bloated heap that autovacuum struggles to clean.
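Autovacuum's trigger condition compounds the problem on a monolithic table. With the long-standing defaults `autovacuum_vacuum_threshold = 50` and `autovacuum_vacuum_scale_factor = 0.2`, a table is vacuumed once its dead tuples exceed 50 + 0.2 × its row count. The Python below simply evaluates that formula; the partition size is an assumption, and newer releases add settings that can cap the threshold, so check your version's documentation.

```python
def autovacuum_trigger(reltuples, threshold=50, scale_factor=0.2):
    """Dead tuples needed before autovacuum vacuums a table (classic formula, defaults shown)."""
    return threshold + scale_factor * reltuples


monolith = autovacuum_trigger(10_000_000_000)  # the whole 10-billion-row ledger
partition = autovacuum_trigger(300_000_000)    # assumption: one month's partition
print(f"monolith waits for {monolith:,.0f} dead tuples")
print(f"partition waits for {partition:,.0f} dead tuples")
```

On the monolith, roughly two billion dead rows pile up before autovacuum even starts, and the cleanup then has to walk the whole table's indexes; a partition crosses its own threshold far sooner and cleans a far smaller structure.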
---
### Partitioning as Coordination Reduction
To solve this, we must reduce the scope of coordination. We divide the monolithic table into multiple physical child tables based on `delivery_time`.
```text
supply_deliveries (Parent Table - Logical router, holds no data)
├── supply_deliveries_2026_01 (Child Partition - Physical table)
├── supply_deliveries_2026_02 (Child Partition - Physical table)
└── supply_deliveries_2026_03 (Child Partition - Physical table)
```
In this layout, we split the table's roles:
* **The Parent Table**: A logical shell that contains the schema definition and routing rules, but holds no data.
* **The Child Tables (Partitions)**: Individual, physical tables that store the rows. Postgres transparently routes inserts to the correct partition.
By dividing the table, we eliminate global bottlenecks:
* **Index Isolation**: Indexes are local to each child partition. When you write to the current month's partition, only that tiny local index is updated. Because it fits easily in RAM, you bypass the random I/O cliff.
* **Localized Autovacuum**: Autovacuum operates on individual partitions. It sweeps the active month's child table without wasting resources scanning cold historical partitions.
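Routing an insert to its partition is a search over sorted, non-overlapping ranges. This Python sketch mirrors the inclusive-lower, exclusive-upper bounds of `FOR VALUES FROM ... TO ...` and, like Postgres without a `DEFAULT` partition, rejects rows that fit no partition; the partition list and the `route` helper are illustrative.

```python
import bisect
from datetime import datetime, timezone


def utc(year, month, day=1):
    return datetime(year, month, day, tzinfo=timezone.utc)


# (name, lower bound inclusive, upper bound exclusive), sorted and non-overlapping
PARTITIONS = [
    ("supply_deliveries_2026_01", utc(2026, 1), utc(2026, 2)),
    ("supply_deliveries_2026_02", utc(2026, 2), utc(2026, 3)),
    ("supply_deliveries_2026_03", utc(2026, 3), utc(2026, 4)),
]
LOWER_BOUNDS = [lower for _, lower, _ in PARTITIONS]


def route(delivery_time):
    """Return the partition whose [lower, upper) range contains delivery_time."""
    i = bisect.bisect_right(LOWER_BOUNDS, delivery_time) - 1
    if i >= 0:
        name, lower, upper = PARTITIONS[i]
        if lower <= delivery_time < upper:
            return name
    raise LookupError(f"no partition of supply_deliveries for {delivery_time}")


print(route(utc(2026, 2, 14)))  # supply_deliveries_2026_02
print(route(utc(2026, 3, 1)))   # supply_deliveries_2026_03 (lower bound is inclusive)
```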
---
### Creating Partitions in Postgres
To implement partitioning, we must define the range constraints explicitly. In Postgres, a partitioned table is created by declaring a partition strategy on the parent table and then attaching physical child partitions.
Here is how we partition the `supply_deliveries` ledger:
```sql
-- 1. Create the parent table (logical router)
CREATE TABLE IF NOT EXISTS supply_deliveries_partitioned (
id BIGINT GENERATED ALWAYS AS IDENTITY,
supplier_id INT NOT NULL,
ingredient_id INT NOT NULL,
delivery_time TIMESTAMPTZ NOT NULL,
quantity_kg NUMERIC(8,2) NOT NULL,
PRIMARY KEY (id, delivery_time)
) PARTITION BY RANGE (delivery_time);
-- 2. Create the child partitions (actual physical tables)
CREATE TABLE IF NOT EXISTS supply_deliveries_2024 PARTITION OF supply_deliveries_partitioned
FOR VALUES FROM ('2024-01-01') TO ('2025-01-01');
CREATE TABLE IF NOT EXISTS supply_deliveries_2025 PARTITION OF supply_deliveries_partitioned
FOR VALUES FROM ('2025-01-01') TO ('2026-01-01');
```
> [!WARNING]
> **Primary Key Restrictions**
> In a partitioned table, any unique constraint—including the primary key—**must** contain the partition key column (`delivery_time`). Postgres enforces this because unique index constraints are local to each partition; the engine cannot verify global uniqueness across multiple physical tables without scanning them all unless the partition key limits the check to a single target partition.
> [!NOTE] The Click
> **Concept**: Postgres unique indexes are strictly local to a single physical table.
> **Payoff**: Because Postgres does not support global indexes spanning multiple physical tables, checking uniqueness on a single `id` column would require scanning every partition of the table on every insert. By forcing the primary key to include the partition key (`delivery_time`), Postgres can route the write to the exact child table first and then rely on that child's local index to enforce uniqueness.
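
A small Python model shows what local enforcement does and does not guarantee. Each hypothetical yearly partition keeps its own set of keys, standing in for a local unique index. Because uniqueness is checked on `(id, delivery_time)` inside one partition, the same `id` can still appear in two partitions; in practice it is the identity sequence that keeps `id` values distinct.

```python
from datetime import date

# Hypothetical yearly partitions, each with its own "local unique index" (a set).
local_pk = {2024: set(), 2025: set()}


def insert(row_id, delivery_day):
    """Route by the partition key, then enforce uniqueness only inside that partition."""
    partition = local_pk[delivery_day.year]  # routing on delivery_time (by year)
    key = (row_id, delivery_day)
    if key in partition:
        raise ValueError(f"duplicate key {key} in partition {delivery_day.year}")
    partition.add(key)


insert(1, date(2024, 6, 15))
insert(1, date(2025, 6, 15))      # accepted: different partition, different (id, time) pair
try:
    insert(1, date(2024, 6, 15))  # rejected: same key in the same partition
except ValueError as err:
    print(err)
```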
---
### Lifecycle Operations
Without partitioning, archiving old data requires a row-by-row transactional sweep:
```sql
DELETE FROM supply_deliveries
WHERE delivery_time >= '2024-01-01' AND delivery_time < '2025-01-01';
```
This must mark every row as dead, write extensive WAL delete records, and bloat the indexes.
With range partitioning, we unhook the physical child table from the parent logical router and drop it entirely:
```sql
-- 1. Detach the partition (safely unhooks it from the parent table).
-- CONCURRENTLY (PostgreSQL 14+) cannot run inside a transaction block.
ALTER TABLE supply_deliveries_partitioned
DETACH PARTITION supply_deliveries_2024 CONCURRENTLY;
-- 2. Drop the physical table (reclaims disk space immediately)
DROP TABLE supply_deliveries_2024;
```
This is a metadata-only shortcut. Postgres updates its system catalogs and, when the transaction commits, deletes the OS files containing the table and its indexes. The space is reclaimed immediately, generating almost no WAL and no table bloat.
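A retention job built on this idea only needs to find partitions that lie entirely before the cutoff; a partition that straddles the cutoff still needs a `DELETE` (or has to wait). The Python helper below is a hypothetical sketch of that decision over `[lower, upper)` bounds, using made-up monthly partitions.

```python
from datetime import date

# Hypothetical monthly partitions as (name, lower inclusive, upper exclusive).
partitions = [
    ("supply_deliveries_2024_01", date(2024, 1, 1), date(2024, 2, 1)),
    ("supply_deliveries_2024_02", date(2024, 2, 1), date(2024, 3, 1)),
    ("supply_deliveries_2024_03", date(2024, 3, 1), date(2024, 4, 1)),
]


def plan_retention(partitions, cutoff):
    """Split partitions into those safe to detach and drop, and those straddling the cutoff."""
    droppable = [name for name, lower, upper in partitions if upper <= cutoff]
    straddling = [name for name, lower, upper in partitions if lower < cutoff < upper]
    return droppable, straddling


# Keep everything from 2024-02-15 onward.
print(plan_retention(partitions, date(2024, 2, 15)))
```

Here January can be detached and dropped outright, while February still holds rows that must be kept.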
---
### The Hot Partition
In range-partitioned tables, write traffic concentrates on the **Current Month** (the **Hot Partition**). Older partitions are **Cold Archives**.
This asymmetry provides two major caching and maintenance advantages:
1. **Caching Focus**: Memory (Shared Buffers) only needs to keep the blocks of the active month's partition warm, allowing cold historical data to drift out of RAM.
2. **Autovacuum Freezing**: Once a partition stops receiving writes, it can be frozen once (for example, with a manual `VACUUM (FREEZE)`), which marks all of its pages as all-frozen in the visibility map. Because the child table receives no new writes, later autovacuum runs skip those pages, so the cold partition is almost never scanned again, preserving transaction ID headroom.
---
### Partitioning and Timescale
Native range partitioning requires you to create child tables ahead of time. If your partition manager fails to create next month's partition, inserts for that range fail with an error (unless a `DEFAULT` partition exists to catch them).
Extensions like **TimescaleDB** resolve this operational overhead through **Hypertables**.
```sql
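-- Convert a standard table into a TimescaleDB hypertable.
-- As with native partitioning, every unique constraint (including the PK) must contain delivery_time.
-- migrate_data => true moves any existing rows into chunks.
SELECT create_hypertable('supply_deliveries', 'delivery_time', migrate_data => true);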
```
The hypertable automatically handles the mechanics:
* **Auto-Chunking**: It automatically creates time-based physical partitions (chunks, 7 days wide by default) on the fly as writes arrive.
* **Columnar Compression**: Once a chunk is older than a configured compression policy (e.g., 14 days), a background job converts its row-oriented heap pages into compressed columnar storage, often reclaiming 90% or more of the disk space (depending on the data) and accelerating analytical queries.
---
### Lab: Inspecting Partition Pruning
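As a read-side bonus, the query planner uses **Partition Pruning** (the successor to the older constraint-exclusion mechanism) to optimize reads. If a query filters on the partition key, Postgres discards irrelevant partitions at planning time, or at executor startup when the filter value is only known then (for example, with prepared-statement parameters).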
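At its core, pruning is an interval-overlap test between the query's range on the partition key and each partition's bounds. The Python sketch below models that test for the two yearly partitions created earlier; it ignores runtime pruning and non-range predicates, and `partitions_to_scan` is a hypothetical helper.

```python
from datetime import datetime, timezone

UTC = timezone.utc
PARTITIONS = {
    "supply_deliveries_2024": (datetime(2024, 1, 1, tzinfo=UTC), datetime(2025, 1, 1, tzinfo=UTC)),
    "supply_deliveries_2025": (datetime(2025, 1, 1, tzinfo=UTC), datetime(2026, 1, 1, tzinfo=UTC)),
}


def partitions_to_scan(query_from=None, query_to=None):
    """Keep partitions whose [lower, upper) range overlaps the query's [query_from, query_to].

    A missing bound means the query does not restrict that side, so nothing is pruned on it.
    """
    survivors = []
    for name, (lower, upper) in PARTITIONS.items():
        if query_to is not None and query_to < lower:
            continue  # the query ends before this partition starts
        if query_from is not None and query_from >= upper:
            continue  # the query starts at or after this partition's exclusive upper bound
        survivors.append(name)
    return survivors


point = datetime(2024, 6, 15, 12, tzinfo=UTC)
print(partitions_to_scan(point, point))  # equality filter: ['supply_deliveries_2024']
print(partitions_to_scan())              # no filter: both partitions (the fan-out case)
```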
If we query for a specific date in 2024:
```sql
EXPLAIN (ANALYZE, BUFFERS, COSTS OFF)
SELECT * FROM supply_deliveries_partitioned
WHERE delivery_time = '2024-06-15 12:00:00+00';
```
The plan shows that only the `2024` partition is evaluated via a bitmap index scan on its local primary key:
```text
Bitmap Heap Scan on supply_deliveries_2024 supply_deliveries_partitioned (actual time=0.014..0.014 rows=0.00 loops=1)
Recheck Cond: (delivery_time = '2024-06-15 12:00:00+00'::timestamp with time zone)
Buffers: shared hit=10
-> Bitmap Index Scan on supply_deliveries_2024_pkey (actual time=0.012..0.012 rows=0.00 loops=1)
Index Cond: (delivery_time = '2024-06-15 12:00:00+00'::timestamp with time zone)
Index Searches: 1
Buffers: shared hit=10
Planning:
Buffers: shared hit=20 read=1
Planning Time: 0.585 ms
Execution Time: 0.029 ms
```
If we query without filtering on the partition key, the planner must include every partition of the table (the **Fan-out Penalty**):
```sql
EXPLAIN (ANALYZE, BUFFERS, COSTS OFF)
SELECT * FROM supply_deliveries_partitioned;
```
```text
Append (actual time=0.003..0.003 rows=0.00 loops=1)
-> Seq Scan on supply_deliveries_2024 supply_deliveries_partitioned_1 (actual time=0.001..0.001 rows=0.00 loops=1)
-> Seq Scan on supply_deliveries_2025 supply_deliveries_partitioned_2 (actual time=0.001..0.001 rows=0.00 loops=1)
Planning:
Buffers: shared hit=192 read=1
Planning Time: 0.257 ms
Execution Time: 0.014 ms
```
By organizing the table around time, partitioning keeps the active index small enough to stay cached, keeps vacuum localized, and lets queries prune irrelevant data.
---
### The Monolith Fights Back: Routing & Key Complexity
Partitioning localizes write contention and indexes, but it trades away simplicity on both the write and read paths.
* **Unique Key Limitations**: Because unique index checks are local to each partition, Postgres cannot guarantee global uniqueness unless the unique constraint (including your primary key) contains the partition key. If you try to enforce a primary key on `id` alone in a table partitioned by `delivery_time`, the migration will fail. You are forced to redefine your keys as composite coordinates: `PRIMARY KEY (id, delivery_time)`.
* **The Fan-out Penalty**: If a client runs a query that does not filter on the partition key (`delivery_time`), the planner cannot prune partitions. The executor must scan *every single physical child table*, and planning itself gets more expensive as the partition count grows. A query that was once a single index lookup becomes an `Append` over dozens of per-partition scans, consuming far more CPU and buffer-pool capacity.
* **Foreign Key Restrictions**: Since PostgreSQL 12, other tables can declare foreign keys that reference a partitioned table, but a foreign key must target a unique constraint, and on a partitioned table every unique constraint includes the partition key. A referencing table must therefore store both `id` and `delivery_time` to point at a single delivery.
Partitioning is a powerful architectural layout, but it is a one-way street. Once a table is partitioned, converting it back to a standard flat table requires writing a custom migration script to copy billions of rows into a new table.
---
## 8.3 - Scaling Reads (Streaming Replication)
<img src="assets/arch_read_replicas_v2.png" width="250" style="float: left; margin: 0 20px 20px 0;" />
Compared to the architectural transformations of separating storage or partitioning tables, scaling reads is relatively straightforward. We do not divide the tables or redefine where truth is stored. Instead, we duplicate the entire database state by streaming the Write-Ahead Log to secondary nodes known as **Read Replicas** (or **Hot Standbys**).
Think of this as the **Many Shouting Elephants**: the primary broadcasts its WAL changes, and each standby replays them so it can serve read queries independently, offloading read traffic from the primary node.
Replication distributes read load while preserving a single primary node for all write operations. However, it introduces the core challenge of distributed systems: **Lag**.
---
### The Mechanism: WAL Streaming
Postgres implements replication at the physical block layer. Instead of replicating SQL statements (which is slow and non-deterministic), the primary streams raw WAL bytes over the network.
1. **Walsender**: A process on the primary node (one per connected standby) that reads WAL and streams it to that standby.
2. **Walreceiver**: A process running on the replica that receives the WAL bytes and writes them to its local WAL files.
3. **Startup Process**: A process on the replica that continuously replays the incoming WAL, updating the replica's local heap and index pages.
Because the replica is executing recovery in the background, it is strictly **Read-Only**. Any attempt to run DML (`INSERT`, `UPDATE`) or DDL (`CREATE TABLE`) on a replica will fail.
---
### Lab: Measuring Replication Lag
Because WAL takes time to travel the network and be applied by the replica's startup process, the replica's state is always slightly behind the primary. This is **Replication Lag**.
We monitor this lag from the primary by measuring the distance between LSN markers:
```sql
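SELECT
application_name,
-- WAL generated on the primary that this standby has not replayed yet
pg_size_pretty(pg_current_wal_lsn() - replay_lsn) AS replay_lag_bytes,
-- WAL already sent over the network but not yet replayed
pg_size_pretty(sent_lsn - replay_lsn) AS unreplayed_sent_bytes,
replay_lag -- time-based estimate (PostgreSQL 10+)
FROM pg_stat_replication;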
```
Postgres tracks the progress of WAL through four distinct markers:
1. `sent_lsn`: The primary sent the WAL record over the network.
2. `write_lsn`: The replica wrote the received WAL to its local WAL files (still in the operating system's cache, not yet flushed to disk).
3. `flush_lsn`: The replica flushed the WAL bytes to disk (`fsync`).
4. `replay_lsn`: The replica applied the WAL to its local memory pages, making the changes visible to read queries.
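Postgres prints an LSN as two hexadecimal numbers separated by a slash: the high and low 32 bits of the 64-bit WAL position. The Python sketch below converts that text form to an integer and computes how far each marker trails the primary's current position (as returned by `pg_current_wal_lsn()`); the sample row is made up for illustration.

```python
STAGES = ("sent_lsn", "write_lsn", "flush_lsn", "replay_lsn")


def parse_lsn(text):
    """Convert a textual LSN such as '16/B374D848' into its 64-bit byte position."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def lag_by_stage(current_lsn, row):
    """Bytes by which each pg_stat_replication marker trails the primary's WAL position."""
    current = parse_lsn(current_lsn)
    return {stage: current - parse_lsn(row[stage]) for stage in STAGES}


# Made-up sample row, as if read from pg_stat_replication on the primary.
row = {
    "sent_lsn": "0/3000148",
    "write_lsn": "0/3000148",
    "flush_lsn": "0/3000060",
    "replay_lsn": "0/2FFF000",
}
print(lag_by_stage("0/3000148", row))
# {'sent_lsn': 0, 'write_lsn': 0, 'flush_lsn': 232, 'replay_lsn': 4424}
```

A gap that grows mainly between `flush_lsn` and `replay_lsn` points at replay on the standby (for example, a replay paused by a recovery conflict) rather than at the network.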
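On the replica itself, we can estimate the delay as elapsed time. Because this measures the time since the last replayed transaction was committed on the primary, it keeps growing while the primary is idle, even when the replica is fully caught up: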
```sql
SELECT now() - pg_last_xact_replay_timestamp() AS replication_delay;
```
> [!WARNING]
> **Read-Your-Writes Consistency**
> If an application inserts a record on the primary and immediately redirects the user to a replica to read it, the query may find no row (and the application may return a 404) because the replica has not yet replayed the LSN containing the insert. For critical operations, applications must either route reads to the primary or use `remote_apply` synchronous replication.
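
A further option, beyond the two above, is LSN-based read routing: after a write, the application records the primary's WAL position (for example with `pg_current_wal_lsn()`) and only sends the follow-up read to a replica whose `pg_last_wal_replay_lsn()` has reached it. The Python below sketches just the routing decision; the replica names and LSN values are hypothetical, and querying the servers is left out.

```python
def parse_lsn(text):
    """Convert a textual LSN such as '0/3000148' into its 64-bit byte position."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def choose_server(write_lsn, replica_replay_lsns):
    """Return a replica that has replayed up to write_lsn, or 'primary' if none has."""
    target = parse_lsn(write_lsn)
    for name, replay_lsn in replica_replay_lsns.items():
        if parse_lsn(replay_lsn) >= target:
            return name
    return "primary"


replicas = {"replica-a": "0/2FFF000", "replica-b": "0/3000200"}
print(choose_server("0/3000148", replicas))  # replica-b has caught up; replica-a has not
print(choose_server("0/3001000", replicas))  # nobody has caught up: read from the primary
```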
---
### The Durability Gradient: synchronous_commit
To manage the lag-consistency trade-off, Postgres lets you configure the `synchronous_commit` parameter. This setting determines how far a commit must progress, locally and on the synchronous standbys named in `synchronous_standby_names`, before the primary confirms it to the client.
1. **`off` (Async)**: The primary returns success as soon as the commit record is in the local WAL buffer, without waiting for the disk `fsync`. Performance is excellent, but a crash can lose the most recently reported commits (though it does not corrupt the database).
2. **`local`**: The primary waits for its own local disk `fsync` before returning success. Standbys are completely ignored.
3. **`remote_write`**: The primary waits until the synchronous standby has written the WAL to its operating system (`write_lsn`), but not yet flushed it to disk. The commit survives a failure of the primary, but could be lost if the standby's operating system also crashes before flushing.
4. **`on`** (the default): The primary waits for the synchronous standby to flush the WAL to its local disk (`flush_lsn`), so no acknowledged commit is lost if the primary dies. With no synchronous standbys configured, `on` behaves like `local`.
5. **`remote_apply`**: The primary waits for the synchronous standby to replay the WAL (`replay_lsn`). This gives immediate read-your-writes consistency on that standby (asynchronous replicas can still lag), but adds a network round-trip plus replay time to every commit.
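The gradient can be summarized as a predicate: given how far the primary and one synchronous standby have progressed, may a commit at a given LSN return to the client? The Python below is a simplified model of the five settings (a single synchronous standby, LSNs as plain integers, a hypothetical function name), not Postgres's internal logic.

```python
def commit_may_return(level, commit_lsn, local_flush, standby_write, standby_flush, standby_replay):
    """Simplified model: has a commit at commit_lsn progressed far enough for `level`?"""
    if level == "off":
        return True  # reply immediately; the WAL writer flushes shortly afterwards
    local_ok = local_flush >= commit_lsn
    if level == "local":
        return local_ok
    standby_needed = {
        "remote_write": standby_write,
        "on": standby_flush,
        "remote_apply": standby_replay,
    }[level]
    return local_ok and standby_needed >= commit_lsn


# Commit record ends at LSN 1000: flushed locally, written and flushed by the standby, not replayed.
state = dict(commit_lsn=1000, local_flush=1000, standby_write=1000, standby_flush=1000, standby_replay=900)
for level in ("off", "local", "remote_write", "on", "remote_apply"):
    print(level, commit_may_return(level, **state))  # only remote_apply is still waiting
```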
> [!IMPORTANT]
> **The SyncRep Wait Event**
> When a write transaction stalls waiting for standby confirmation, its row in `pg_stat_activity` shows the wait event **`SyncRep`** (Synchronous Replication). If `SyncRep` dominates your wait event profiles, your bottleneck is the round-trip to the synchronous standby (plus its flush or replay time), not local disk I/O.
---
### The xmin Horizon and Standby Feedback
Replication creates a conflict of interest between the primary's vacuum process and the replica's queries.
If the primary deletes a row and runs `VACUUM`, it reclaims the dead tuple. When this vacuum operation is written to the WAL and sent to the replica, the replica's startup process must remove the tuple.
However, if a replica is currently executing a long-running query that requires that specific tuple version to maintain snapshot visibility, it faces a **Recovery Conflict**.
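The replica waits up to `max_standby_streaming_delay` (30 seconds by default) for the conflicting query to finish. After that, it applies the WAL change and cancels the client query with:
`ERROR: canceling statement due to conflict with recovery`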
> [!NOTE] The Click
> **Concept**: A replica cannot simultaneously be a byte-for-byte physical clone of the primary and an independent queryable snapshot.
> **Payoff**: Because a replica replays raw WAL bytes physically, it cannot diverge from the primary's page state. If a vacuum cleans a page on the primary, the same cleanup must happen on the replica. This makes recovery conflicts unavoidable: the replica must either cancel long-running user queries to apply the cleanup immediately, or delay WAL application (lagging behind the primary) to let the queries finish.
To balance this conflict, Postgres supports **Hot Standby Feedback**:
By setting `hot_standby_feedback = on` on the replica, the node periodically reports its **`xmin` horizon** (the oldest transaction ID that any snapshot on the replica may still need) back to the primary. The primary's vacuum then defers removing any dead tuple that a snapshot on the replica might still need to see.
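The effect of feedback on the primary can be modeled as taking the minimum of two horizons. The Python sketch below treats transaction IDs as plain integers (ignoring wraparound) and uses hypothetical names; a committed delete is removable only if the deleting transaction precedes every snapshot that might still need the old row.

```python
def vacuum_horizon(primary_oldest_xmin, standby_feedback_xmin=None):
    """Oldest XID any snapshot may still need; standby feedback can only lower it."""
    if standby_feedback_xmin is None:
        return primary_oldest_xmin
    return min(primary_oldest_xmin, standby_feedback_xmin)


def removable(deleting_xid, horizon):
    """A committed delete is invisible to every snapshot once it precedes the horizon."""
    return deleting_xid < horizon


# The primary's own sessions only need XIDs >= 5000; a replica query still needs XIDs >= 1200.
print(removable(3000, vacuum_horizon(5000)))        # True: without feedback, vacuum may remove it
print(removable(3000, vacuum_horizon(5000, 1200)))  # False: the replica's snapshot pins it
```

The second call is the bloat risk in miniature: while the replica's query runs, every row deleted by a transaction from XID 1200 onward stays on the primary.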
> [!CAUTION]
> **Standby Feedback Bloat**
> While standby feedback prevents query cancellations on replicas, it introduces a severe risk of **Table Bloat** on the primary. A single long-running query on a replica pins the primary's `xmin` horizon, preventing the primary's vacuum process from cleaning up dead tuples across *any* tables. This can cause the primary server to run out of disk space.
Replication is a highly efficient way to scale read throughput, but it requires careful tuning of the durability gradient and active monitoring of the replica's `xmin` horizon to prevent memory and disk starvation on the primary node.
---
### The Monolith Fights Back: Lag and Standby Feedback Bloat
Replicating the WAL stream lets you scale read capacity, but it shifts the database into the territory of distributed state inconsistencies.
* **The Consistency Trade-off (Lag)**: Unless you run synchronous replication with `synchronous_commit = remote_apply` (which adds a network round-trip plus standby replay time to every commit, sharply cutting write throughput), reads from a replica are eventually consistent. This breaks the **Read-Your-Writes** guarantee: if a web application inserts an order, redirects the browser, and the next page reads from a lagging replica, the client receives a confusing "Order Not Found" error.
* **The Vacuum Conflict**: When a replica runs a long query, its snapshot may still need row versions that the primary has since deleted. Once `VACUUM` on the primary removes them, replaying that cleanup conflicts with the query. The replica must choose: cancel the client query (`ERROR: canceling statement due to conflict with recovery`) or delay applying the replication stream, causing lag to spike.
* **Standby Feedback Bloat**: If you try to prevent query cancellations by setting `hot_standby_feedback = on`, the replica reports its oldest needed transaction ID back to the primary. This pins the primary's `xmin` horizon, preventing vacuum from reclaiming dead tuples the replica might still see in *every* database on the primary. A single long-running query (or idle-in-transaction session) on an analytics standby can fill the primary's disks with table bloat.
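One common way to recover read-your-writes without synchronous replication is to remember the primary's WAL position after a write and read from a replica only once it has replayed past that position. A sketch, assuming the application carries the LSN between requests (for example, in the user's session) and that `synchronous_commit` is left at its default of `on`; the LSN literal is an example value.

```sql
-- Step 1, on the PRIMARY, right after the write transaction commits:
-- capture the current WAL position. With synchronous_commit = on the commit
-- record has already been flushed, so this LSN is at or past it.
SELECT pg_current_wal_lsn() AS write_lsn;

-- Step 2, on the REPLICA, before serving the follow-up read: has replay
-- reached that position? Substitute the write_lsn from step 1 for the example
-- literal. (On a primary this yields NULL, since nothing is being replayed.)
SELECT pg_last_wal_replay_lsn() >= '0/3000148'::pg_lsn AS caught_up;

-- Step 3, in the application: if caught_up is not true, send the read to
-- the primary (or wait briefly and re-check).
```

The comparison is cheap, so it can run on every routed read; the cost is that the application must thread one extra value through its request flow.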
By scaling reads through standbys, you trade local consistency and vacuum predictability for distributed throughput.
---
## 8.4 - Scaling Connections (Connection Pooling)
<img src="assets/arch_crowded_hallway.png" width="250" style="float: left; margin: 0 20px 20px 0;" />
Scaling storage and partitioning tables allows you to handle terabytes of data. However, as your application tier grows from a few servers to thousands of containerized microservices or serverless functions, you encounter a different scaling limit: **Connection Coordination**.
Postgres is built around a **Process-per-Connection** architecture. To understand why this limits scale, we must look at how the OS and the engine manage clients under the hood.
A connection pooler is a proxy layer that multiplexes client connections onto a smaller pool of active database processes. Think of it as a **maître d'** seating a crowd of guests at a limited number of tables. This decouples client sessions from database backends, reducing CPU context switching and `ProcArray` lock contention under high concurrency.
---
### The Cost of a Connection
Unlike database systems that multiplex clients onto lightweight worker threads, PostgreSQL's postmaster performs a Unix `fork()` to spawn a dedicated operating system process (a backend) for every connecting client.
This design provides fault isolation: each backend's private memory is protected by the operating system, and a session that ends with a `FATAL` error exits without affecting anyone else. The isolation has a limit, however. If a backend crashes outright (for example, a segmentation fault or a `SIGKILL` from the OOM killer), the postmaster can no longer trust shared memory, so it terminates every other backend and runs crash recovery.
However, this design introduces substantial resource overhead:
1. **Memory Overhead**: Every process requires private memory for query execution, metadata tracking, and sorting, consuming roughly 2MB to 10MB of RAM even when completely idle.
2. **Fork Latency**: Spawning a new OS process is a kernel-level operation involving memory mapping and page-table duplication. Together with authentication and backend initialization, opening a new connection costs on the order of milliseconds, whereas handing work to an existing thread takes microseconds.
3. **Context Switching**: If you run 2,000 active processes on a server with 16 CPU cores, the operating system spends a growing share of CPU time swapping processes on and off the cores (context switching) and refilling CPU caches instead of executing queries.
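To see how these costs apply to a live server, a quick inventory of client backends by state, compared with the configured ceiling, is a reasonable first step. A sketch assuming PostgreSQL 10 or later (for `backend_type`); what counts as "too many" depends on your core count and memory.

```sql
-- What are the client backends doing right now?
SELECT coalesce(state, 'unknown') AS state,
       count(*)                   AS backends
FROM pg_stat_activity
WHERE backend_type = 'client backend'
GROUP BY state
ORDER BY backends DESC;

-- How close is the server to its connection ceiling?
SELECT count(*)                                AS client_backends,
       current_setting('max_connections')::int AS max_connections
FROM pg_stat_activity
WHERE backend_type = 'client backend';
```

A large `idle` count next to a small `active` count is the typical signature of a workload that would benefit from pooling.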
---
### The ProcArray Lock Bottleneck
The deeper limit on connection scaling in Postgres is a global shared-memory structure called the **`ProcArray`**.
The `ProcArray` is a shared-memory array that lists every backend process in the cluster. Every time a query takes a snapshot, the backend must read the `ProcArray` to construct its MVCC visibility snapshot (determining which transactions are currently active, as learned in **[[Manuscript/02 - Physical Storage & MVCC/2.5 - MVCC (The Sharpie Ledger)|Chapter 2.5]]**).
Whenever a client connects or disconnects, its backend must briefly acquire the `ProcArray` lock in exclusive mode to add or remove its own entry.
If your application does not reuse connections and instead opens and closes a connection for every query, you create **Connection Churn**. Under high churn, the `ProcArray` lock is frequently held in exclusive mode, and active queries stall waiting to read the `ProcArray` to build their MVCC snapshots.
> [!NOTE] The Click
> **Concept**: Beyond a certain scale, the bottleneck in scaling Postgres connections is not RAM or CPU; it is lock contention on the global transaction register in shared memory.
> **Payoff**: Every backend is tracked in the `ProcArray`, and every MVCC snapshot is built by scanning it, so each additional connection makes snapshots more expensive and adds contention on the `ProcArray` lock. Once busy connections number in the hundreds, backends can spend more time waiting for that lock than doing useful work (PostgreSQL 14 made snapshot computation considerably cheaper at high connection counts, which raises this threshold). Multiplexing connections via PgBouncer is not only about saving RAM; it keeps the `ProcArray` small enough to prevent snapshot lock contention.
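Both symptoms can be sampled from statistics views. A sketch: the wait-event breakdown works on any recent release, while the `sessions` counter assumes PostgreSQL 14 or later. A single sample proves little; take several a few seconds apart, each in its own transaction, because statistics views are cached within a transaction.

```sql
-- What are active backends waiting on right now? Repeated samples in which
-- many backends show wait_event_type = 'LWLock' with wait_event = 'ProcArray'
-- (reported as 'ProcArrayLock' on older releases) indicate snapshot contention.
SELECT wait_event_type, wait_event, count(*) AS backends
FROM pg_stat_activity
WHERE state = 'active'
  AND wait_event IS NOT NULL
GROUP BY wait_event_type, wait_event
ORDER BY backends DESC;

-- Connection churn: total sessions ever opened in this database. The
-- difference between two samples divided by the interval between them is
-- the connection rate.
SELECT sessions, sessions_abandoned
FROM pg_stat_database
WHERE datname = current_database();
```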
---
### Lab: Auditing Idle Connections
The most resource-inefficient state for a Postgres server is having thousands of connections in the `idle` state. An idle connection does no work but continues to hold its memory allocation, its `ProcArray` slot, and one of the server's `max_connections` slots.
Even worse is the **`idle in transaction`** state. This occurs when an application opens a transaction (`BEGIN`), runs a query, and then stalls (due to slow application logic or network delay) before running `COMMIT` or `ROLLBACK`.
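```sql
-- Identify long-lived idle-in-transaction backends
SELECT pid,
       state,
       now() - xact_start AS xact_duration,
       backend_xid,       -- set once the transaction has written data
       backend_xmin,      -- set while it still holds a snapshot
       query              -- the last statement it ran, not a running one
FROM pg_stat_activity
WHERE state IN ('idle in transaction', 'idle in transaction (aborted)')
ORDER BY xact_duration DESC;
```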
An `idle in transaction` backend that holds a snapshot or has written data pins the `xmin` horizon. Postgres cannot vacuum away any dead tuples that became dead after that transaction began, leading to massive **Table Bloat** across the entire database. The open transaction also keeps holding every lock it has acquired.
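Rather than hunting these sessions by hand, the server can end them automatically with `idle_in_transaction_session_timeout`. A sketch: the 60-second and 30-second values are arbitrary examples, and the role name `web_app` is hypothetical.

```sql
-- Terminate any session idle inside an open transaction for more than
-- 60 seconds. Applies to the whole cluster after a reload.
ALTER SYSTEM SET idle_in_transaction_session_timeout = '60s';
SELECT pg_reload_conf();

-- Or scope it to one application role (hypothetical name); it applies to
-- that role's new sessions.
ALTER ROLE web_app SET idle_in_transaction_session_timeout = '30s';

-- One-off cleanup: end sessions that have been idle in a transaction for
-- more than 5 minutes (state_change records when they went idle).
SELECT pid, pg_terminate_backend(pid) AS terminated
FROM pg_stat_activity
WHERE state = 'idle in transaction'
  AND now() - state_change > interval '5 minutes';
```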
---
### Multiplexing: PgBouncer
To eliminate this connection coordination overhead, we introduce a proxy layer: PgBouncer.
PgBouncer sits between the application tier and the database. It accepts many lightweight client connections from the applications and maps them onto a very small pool of persistent server connections, each backed by a full Postgres backend process.
PgBouncer operates in three modes:
| Mode | Behavior | Best For... |
| :--- | :--- | :--- |
| **Session** | A Postgres backend is assigned for the duration of the client connection. | Temporary tables or session-local state. |
| **Transaction** | A Postgres backend is assigned only for the duration of a transaction. | **Standard recommendation**. High reuse efficiency. |
| **Statement** | A Postgres backend is assigned for a single statement. | Autocommit-only workloads; multi-statement transactions are rejected. |
> [!CAUTION]
> **The Prepared Statement Trap**
> In `transaction` pooling mode, PgBouncer can swap your client connection to a different Postgres backend between transactions. If you execute a prepared statement (`PREPARE my_query AS...`) in one transaction and try to run it (`EXECUTE my_query`) in the next, it may fail because the second transaction was routed to a backend that never saw your statement. You must disable driver-side prepared statements or use a pooler that tracks prepared statements: Odyssey does, and PgBouncer 1.21 and later does for protocol-level prepared statements when `max_prepared_statements` is set (SQL-level `PREPARE` is still not tracked).
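The trap can be reproduced with two transactions sent through a transaction-mode pooler. In this sketch the statement name `lookup_setting` is illustrative, and `pg_settings` is queried only so that the example needs no user tables. On a direct connection both transactions land on the same backend and `EXECUTE` succeeds, which is why the bug tends to surface only after a pooler is introduced.

```sql
-- Transaction 1: the pooler hands this client backend A.
BEGIN;
SELECT pg_backend_pid();                     -- note the PID of backend A
PREPARE lookup_setting(text) AS
  SELECT setting FROM pg_settings WHERE name = $1;
COMMIT;

-- Transaction 2: the pooler may now hand the same client backend B.
BEGIN;
SELECT pg_backend_pid();                     -- may print a different PID
SELECT name FROM pg_prepared_statements;     -- lists only THIS backend's statements
EXECUTE lookup_setting('work_mem');
-- On backend B the EXECUTE fails with:
--   ERROR:  prepared statement "lookup_setting" does not exist
COMMIT;
```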
---
### Serverless Connection Concentration
Connection pooling becomes critical in serverless architectures. If you run serverless functions (such as AWS Lambda or Vercel Functions), a traffic spike can launch 5,000 concurrent function invocations. If each function opens a direct connection to Postgres, every connection beyond `max_connections` is refused with `FATAL: sorry, too many clients already`, and the connections that do get through compete for memory and CPU.
This is why serverless Postgres providers (like Neon) put connection pooling in front of the database. Neon's proxy offers a pooled endpoint backed by PgBouncer, and its serverless driver speaks the Postgres protocol over WebSockets or HTTP, so short-lived functions can connect without holding raw TCP sessions. Thousands of ephemeral invocations then share a small set of backend processes instead of each adding its own entry to the `ProcArray`.
---
### The Monolith Fights Back: Session State Loss
Multiplexing thousands of clients onto a handful of active database backends solves process exhaustion, but it breaks the assumption of persistent database sessions.
* **The Prepared Statement Trap**: When you prepare a statement, Postgres compiles and caches the query plan in the backend's local memory. In `transaction` pooling mode, PgBouncer can swap your client to a different backend process between transactions. If your application tries to execute that statement in a later transaction, it will throw an error because the new backend has no record of the prepared query.
* **Loss of Temp Tables**: Temporary tables belong to the backend process that created them. Once PgBouncer routes your next transaction to a different backend, the table appears to vanish. Meanwhile it still exists on the original backend, where any other client routed there can read it, causing severe cross-client data leaks.
* **Advisory Lock Failure**: Session-level advisory locks belong to the backend, not to your client. If you acquire one to coordinate app-level tasks and PgBouncer routes your next transaction to a different backend, you can no longer release it: the lock stays held on the original backend, and whichever client is routed there next silently inherits it, breaking your concurrency controls.
Connection pooling is highly effective for Scaling Connections, but transaction mode requires that your application keep no state in the database session. If your code relies on session state, you must run PgBouncer in the restrictive `session` mode (losing multiplexing efficiency) or rewrite your application logic to use transaction-scoped equivalents.
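Each piece of session state above has a transaction-scoped counterpart that is released before the pooler hands the backend to another client. A sketch: the lock key `42` and the table `staging_rows` are hypothetical.

```sql
BEGIN;
-- Transaction-scoped advisory lock: released automatically at COMMIT or
-- ROLLBACK, so it cannot outlive the transaction the pooler assigned.
SELECT pg_advisory_xact_lock(42);            -- 42 = hypothetical job key

-- SET LOCAL reverts at transaction end instead of leaking to the next client.
SET LOCAL statement_timeout = '5s';

-- ON COMMIT DROP removes the temp table before the backend is handed back.
CREATE TEMP TABLE staging_rows (id bigint, payload text) ON COMMIT DROP;
INSERT INTO staging_rows VALUES (1, 'example');
SELECT count(*) FROM staging_rows;

COMMIT;  -- lock released, setting reverted, temp table dropped
```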
---
## 8.5 - Scaling Failure (Consensus & Failover)
<img src="assets/arch_ha_governor.png" width="250" style="float: left; margin: 0 20px 20px 0;" />
Scaling storage, writes, and connections solves for throughput and capacity. But in a distributed database cluster, we must also scale for reliability. When the physical server running your primary database suffers a hardware failure, how does the system coordinate recovery without human intervention?
This is the challenge of **High Availability (HA)**.
---
### The Election Sequence (Automatic Failover)
In a high-availability cluster, nodes do not operate in isolation. They coordinate their state through an orchestrator (typically **Patroni**) that relies on a distributed consensus store (like `etcd`, `Consul`, or `ZooKeeper`).
The primary node constantly renews a short-lived lease (a heartbeat) in the consensus store. If the primary crashes, the cluster executes a deterministic recovery sequence:
1. **Detection**: The orchestrator notices that the heartbeat lease has not been renewed (typically within 10 to 30 seconds).
2. **Liveness Loss**: The leader key expires in the consensus store once its TTL lapses, leaving the cluster without a leader.
3. **Candidate Selection**: The remaining standbys query their local replication state and report their last replayed LSN. The standby that has replayed the most WAL (the highest LSN) is selected as the promotion candidate to minimize data loss.
4. **Fencing**: The orchestrator asserts that the old primary is isolated and cannot write (more below).
5. **Promotion**: The selected replica is promoted to primary (via `pg_promote()` or `pg_ctl promote`). It exits recovery mode, switches to a new WAL timeline, and begins accepting write connections.
6. **Routing**: Load balancers (such as HAProxy) poll each node's health-check endpoint (for Patroni, its REST API). The node that returns `200 OK` to the primary check (the new primary) receives write traffic.
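The checks behind steps 3 and 5 can be run by hand with built-in functions (`pg_promote()` requires PostgreSQL 12 or later). This is a sketch of what an orchestrator evaluates, not Patroni's actual code path. In a Patroni-managed cluster you would let Patroni (or `patronictl failover`) perform the promotion rather than calling `pg_promote()` yourself. The LSN literals are example values.

```sql
-- Step 3 (candidate selection), run on EACH standby: where is this node?
SELECT pg_is_in_recovery()       AS is_standby,
       pg_last_wal_receive_lsn() AS received_lsn,
       pg_last_wal_replay_lsn()  AS replayed_lsn;

-- Comparing two reported positions: pg_lsn values order by WAL position, and
-- pg_wal_lsn_diff returns the gap in bytes (here: true and 232).
SELECT '0/3000148'::pg_lsn > '0/3000060'::pg_lsn  AS first_is_ahead,
       pg_wal_lsn_diff('0/3000148', '0/3000060')  AS bytes_ahead;

-- Step 5 (promotion), run on the chosen standby only: waits up to 60 seconds
-- for promotion to complete and returns true on success.
SELECT pg_promote(true, 60);
```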
> [!CAUTION]
> **The HA Data-Loss Trap**
> Automated failover is bound to the replication settings discussed in **[[Manuscript/08 - Distributed Scaling & Clouds/8.3 - Scaling Reads (Streaming Replication)|8.3 Streaming Replication]]**. If you run asynchronous replication, even the replica with the highest LSN may be missing the last transactions committed on the dead primary. Promoting that replica restores availability but accepts the permanent loss of those transactions.
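Closing this gap means making commits wait for a standby. A sketch of quorum-style synchronous replication on the primary, assuming the two standbys connect with the hypothetical `application_name` values `node_b` and `node_c`. Patroni manages `synchronous_standby_names` itself when its `synchronous_mode` is enabled, so in a Patroni cluster this belongs in Patroni's configuration rather than in `ALTER SYSTEM`.

```sql
-- On the PRIMARY: acknowledge a commit only after at least one of the two
-- named standbys has flushed its WAL.
ALTER SYSTEM SET synchronous_standby_names = 'ANY 1 (node_b, node_c)';
ALTER SYSTEM SET synchronous_commit = 'on';
SELECT pg_reload_conf();

-- Verify: the listed standbys should now report sync_state = 'quorum'.
SELECT application_name, sync_state
FROM pg_stat_replication;
```

The price is the commit latency discussed in 8.3: every acknowledged write now includes a network round-trip to the faster of the two standbys.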
---
### The Terror of the Split-Brain
The most dangerous state in a distributed database is a **Split-Brain**: a network partition cuts the cluster in half, and both sides believe they are the legitimate leader. Both accept writes, creating two divergent database histories that cannot be merged automatically.
To prevent split-brain, the cluster must enforce **Fencing**—ensuring the old primary is completely blocked from writing before promoting a new standby:
* **STONITH (Shoot The Other Node In The Head)**: Hard power-cycling the old server via hardware controllers (like IPMI or iLO). If the box is powered off, it cannot write.
* **Storage Fencing**: Revoking the old server's access to the shared storage volume (e.g., detaching an AWS EBS volume).
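* **Application Fencing (Patroni Default)**: Patroni ties leadership to the etcd lease. If the primary's Patroni agent cannot renew the leader key before its TTL expires (for example, because it is partitioned from etcd), it demotes its local Postgres to a read-only standby on its own, without waiting to rejoin the cluster. A watchdog device can additionally reset the machine if the Patroni agent itself hangs.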
---
### Quorum: The Rule of Three
A consensus store (like `etcd`) requires a strict majority—**Quorum**—to make decisions. The formula for quorum is:
$\text{Quorum} = \lfloor \frac{N}{2} \rfloor + 1$
Where $N$ is the total number of nodes.
* **With $N=2$**: Quorum is 2, so both nodes must agree. Losing either node, or a network partition between them, leaves neither side with a majority. The cluster halts.
* **With $N=3$**: Quorum is 2. If one node dies, the remaining two can still form a majority and elect a leader. This is why three is the minimum node count for any high-availability consensus tier.
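Working the quorum formula one step further gives the number of node failures $f(N)$ the consensus tier survives, and shows why odd cluster sizes are preferred:

$$
f(N) = N - \text{Quorum}(N) = N - \left( \left\lfloor \frac{N}{2} \right\rfloor + 1 \right)
$$

$$
\begin{array}{c|c|c}
N & \text{Quorum}(N) & f(N) \\ \hline
2 & 2 & 0 \\
3 & 2 & 1 \\
4 & 3 & 1 \\
5 & 3 & 2
\end{array}
$$

A fourth member raises the quorum without raising fault tolerance, so it adds consensus traffic but no resilience; the next useful step up from three is five.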
#### Witness Nodes
If running a third database node is too expensive, you can use a **Witness Node**. A witness runs the etcd consensus agent to break ties during elections, but does not run Postgres or store data.
---
### A Worked Topology
A standard, production-grade high-availability database cluster is structured across three availability zones (AZs) to survive a zone outage:
```mermaid
graph TD
App["App tier (multiple AZs)"] --> VIP["HAProxy / VIP"]
subgraph us-east-1a [Availability Zone 1a]
Patroni1["Patroni / Postgres<br>(Primary)"]
end
subgraph us-east-1b [Availability Zone 1b]
Patroni2["Patroni / Postgres<br>(Standby)"]
end
subgraph us-east-1c [Availability Zone 1c]
Patroni3["Patroni / Postgres<br>(Standby)"]
end
VIP -->|routes write/read traffic| Patroni1
VIP -.->|routes read traffic| Patroni2
VIP -.->|routes read traffic| Patroni3
Patroni1 -->|async WAL| Patroni2
Patroni1 -->|async WAL| Patroni3
Patroni1 <-->|etcd consensus| Patroni2
Patroni2 <-->|etcd consensus| Patroni3
Patroni3 <-->|etcd consensus| Patroni1
```
By pairing Patroni with `etcd` and a dynamic load-balancer tier, you shift database operations from a single server vulnerable to physical failure to a self-healing, distributed cluster governed by consensus and quorum.
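To sanity-check a running cluster against this topology from inside Postgres, a sketch assuming PostgreSQL 11 or later (for `sender_host`). Patroni's `patronictl list` gives the orchestrator's view of the same cluster.

```sql
-- On the PRIMARY (zone 1a): expect two connected standbys, both async.
SELECT count(*)                                     AS connected_standbys,
       count(*) FILTER (WHERE sync_state = 'async') AS async_standbys
FROM pg_stat_replication;

-- On each STANDBY (zones 1b and 1c): confirm it is streaming from the
-- primary. Returns no rows on a node without a running WAL receiver.
SELECT status, sender_host, sender_port, slot_name
FROM pg_stat_wal_receiver;
```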
---
### The Monolith Fights Back: Consensus Latency & Split-Brain Risk
Automating failover coordinates recovery during hardware crashes, but it demands complex distributed synchronization that can stall writes or let the cluster's data diverge.
* **Consensus Network Delay**: High Availability relies on quorum math ($\text{Quorum} = \lfloor N/2 \rfloor + 1$). The nodes must continuously execute network heartbeats to verify liveness. If transient network issues block etcd communication, the cluster can lose consensus, demoting the primary and halting all writes database-wide until the network settles.
* **The Split-Brain Menace**: If a network partition splits your cluster and fencing (STONITH) fails, both halves of the cluster may claim the leader lease and accept writes. This creates divergent, irreconcilable databases. To guarantee safety, you must configure hardware-level node termination, adding operational complexity.
* **The Promotion Latency Tax**: When the primary dies, the time to detect the crash (usually 10-30 seconds), query standby LSNs, fence the primary, promote the standby (`pg_promote`), and reload HAProxy routing can take up to a minute. During this window, all write traffic is rejected.
High Availability shifts your database from a single-point-of-failure model into a consensus-bound system. You accept brief write outages during failover, and occasional consensus-triggered demotions, in exchange for automated, self-healing recovery.
---
## 8.6 - Summary (The Scaling Playbook)
> Scale is the deliberate introduction of distance. To break the limits of a single machine, you must accept the latency of coordination. You can duplicate the data or you can divide it, but you cannot outrun the speed of light.
<div style="page-break-after: always;"></div>
---
### The Scale Decision Tree
When planning your scaling path, use this decision tree to match your primary technical bottleneck to its architectural remedy:
```mermaid
graph TD
Scale["SCALE"] --> Reads["Reads"]
Scale --> Writes["Writes"]
Reads --> Connections["Connections<br>(Thundering Herd)"]
Reads --> ReadVolume["Read Volume<br>(Query Concurrency)"]
Connections --> PgBouncer["PgBouncer<br>(Connection Pooling)"]
ReadVolume --> Replication["Read Replicas<br>(WAL Streaming)"]
Writes --> StorageCapacity["Storage Capacity<br>(Disk Saturation)"]
Writes --> WriteVolume["Write Volume<br>(Index & Vacuum Contention)"]
StorageCapacity --> Neon["Neon<br>(Compute/Storage Separation)"]
WriteVolume --> Partitioning["Table Partitioning<br>(Write Isolation)"]
```
---
### The Scaling Hierarchy: Order of Operations
Before restructuring your database architecture or provisioning secondary nodes, you should exhaust simpler, local optimization tiers. Always execute scaling optimizations in this order, and **never skip a level without a clear operational reason**:
1. **Fix SQL (Query Tuning)**: Rewrite inefficient joins, remove unnecessary columns, and avoid CTE bottlenecks.
2. **Add Indexes**: Ensure your read queries utilize index scans rather than sequential scans.
3. **Tune Memory**: Configure `work_mem` to prevent disk sorting and allocate sufficient `shared_buffers` to cache your working set.
4. **Tune Autovacuum**: Adjust autovacuum scale factors to prevent table bloat and write amplification early.
5. **Add Replicas**: Stream the WAL to scale read capacity.
6. **Add Pooling (PgBouncer)**: Multiplex connections to eliminate process management overhead and `ProcArray` lock contention.
7. **Partition**: Implement declarative range partitioning to divide index maintenance and localize autovacuum sweeps.
8. **Compute/Storage Separation (Neon)**: Decouple compute from disk to achieve serverless scale-to-zero and elastic, log-structured storage.
9. **Shard (Citus/Distributed writes)**: Split your database horizontally across multiple physical primary nodes (last resort; introduces severe application-level coordination overhead).
---
### The Scaling Decision Matrix
Use this matrix to identify the immediate first architectural move based on your primary system symptom:
| Symptom | Primary Cause | First Move | Why? |
| :--- | :--- | :--- | :--- |
| **CPU Saturated** | Inefficient scan execution plans | **Tune query / Add indexes** | Prevents the CPU from executing full-table seq scans. |
| **Read Workload Growing** | Read query concurrency limits local resources | **Add Read Replica** | Offloads read traffic to read-only standby nodes. |
| **Too Many Connections** | High connection churn and process-per-client memory | **Add PgBouncer** | Multiplexes active clients, eliminating `ProcArray` lock overhead. |
| **Index Updates and Vacuum Overwhelming Table** | Monolithic index sizes exceeding Shared Buffers | **Partition Table** | Restricts index updates and autovacuum scans to the active partition. |
| **Elastic / Serverless Workload** | Idle compute containers holding active resources | **Separate Compute/Storage (Neon)** | Allows compute nodes to scale to zero while data stays durable in the separate storage tier. |
| **Single Node physical limits reached** | Write throughput saturates absolute hardware bounds | **Shard (Last Resort)** | Distributes write load across multiple primary nodes. |
---
### Common Scaling Mistakes
Avoid these typical scaling anti-patterns:
| Problem | Common Mistake (Wrong Solution) | Why it Fails | Correct Solution |
| :--- | :--- | :--- | :--- |
| **Slow Query** | Add replicas | A slow query runs just as slowly on a replica; adding replicas multiplies the cost of the bad plan instead of fixing it. | **Tune query or add indexes** |
| **Too Many Connections** | Increase `max_connections` | Exposes the database to severe operating system context-switching and `ProcArray` lock contention. | **Add PgBouncer (Transaction mode)** |
| **Huge Table** | Add more RAM | More RAM is a temporary band-aid; it does not prevent autovacuum bottlenecks or clean up bloat. | **Partition by Range (Time)** |
| **Idle Transactions** | Scale up to a bigger instance | Bigger nodes do not stop idle transactions from pinning the `xmin` horizon and bloating tables. | **Configure timeouts / Fix application code** |
| **Write Hotspot** | Add more replicas | Replicas only scale reads; every write still lands on the primary, which must also stream WAL to each standby (and wait on them if replication is synchronous). | **Partition / Decouple storage** |
---
### The Coordination Budget
Every scaling architecture trades local physical simplicity for distributed coordination. You cannot eliminate coordination; you can only decide where to pay its latency budget:
| Database Model | Coordination Location | Architectural Payoff | The Latency Cost |
| :--- | :--- | :--- | :--- |
| **Traditional Postgres** | Shared memory (LWLocks / spinlocks) | Microsecond local transaction resolution | Saturation under heavy write concurrency |
| **Read Replicas** | The network WAL stream | Scaled read throughput on read-only standbys | Replication lag; eventual consistency |
| **Compute/Storage Separation** | The network consensus tier | Stateless compute nodes; scale-to-zero | Millisecond network latency on cache misses |
| **Distributed Sharding** | The application / Coordinator node | Horizontally scaled write throughput | Complex multi-shard transaction commits |