Why This Matters
Read scaling is well-understood: add caches, add replicas, add CDN. Write scaling is harder because every write must eventually land on a single authoritative copy of the data. You cannot cache a mutation. You cannot serve a stale write. The moment your system exceeds what a single database node can absorb, you are forced into architectural decisions — sharding, buffering, event sourcing — that constrain every query pattern, migration, and feature for years.
This is why write-scaling questions separate Staff from senior in system design interviews. Seniors propose "shard the database" as a performance optimization. Staff engineers understand that sharding is a schema commitment: the shard key determines which queries are cheap (shard-local), which are expensive (scatter-gather), and which become impossible. Getting the shard key wrong costs a full quarter of migration work. Getting it right requires understanding the access patterns deeply enough to predict what the system will need two years from now.
If you can articulate why you chose a specific write-scaling strategy, name the queries it makes expensive, and explain what vertical scaling would have given you for free — you demonstrate the systems reasoning that Staff interviews demand.
The 60-Second Version
- Write scaling is a schema commitment. Sharding, partitioning, and event sourcing constrain every query you write for years. This is not a performance tuning exercise — it is an architectural decision you will live with long after the throughput crisis passes.
- Exhaust vertical scaling first. A bigger machine preserves all existing query patterns and costs nothing in engineering complexity. PostgreSQL handles ~10K TPS on a single node. If you are at 2K TPS, you have 5x headroom — do not shard prematurely.
- The shard key is a multi-year decision. Choose based on: high cardinality (even distribution), present in all queries (no scatter-gather), natural business boundary (data locality), and immutable (no row migration).
- Not all writes need the same latency. Payment confirmations require synchronous acknowledgment. Analytics events can tolerate 500ms of buffering. Design two write paths — sync for critical, async for everything else.
- Cross-shard queries are the ongoing tax. Accept this for rare operations (analytics, batch reports), but never let user-facing features depend on scatter-gather. If a feature needs all-shard data, build a pre-computed view.
- Hot shards are inevitable. The question is not how to prevent them but how fast you can detect and isolate them. Per-shard dashboards from day one — aggregate metrics hide shard-level problems.
The Problem
Write throughput hits a ceiling when a single node owns all mutations. Scaling writes means distributing mutation responsibility across nodes, partitions, or time windows. Every distribution strategy is a schema commitment that affects query patterns, consistency model, and operational complexity for years. You are not optimizing throughput — you are choosing a data model you will live with long after the performance crisis passes.
Playbooks That Use This Pattern
- Database Sharding — Shard key selection, rebalancing, cross-shard query cost
- Stream Processing — Partitioned event ingestion, ordering guarantees per partition
- Payment Processing — Ledger write distribution, double-entry consistency
- Order Matching — Matching engine write throughput, contention on order books
- Leaderboard & Counting — High-frequency counter updates, approximate vs. exact counts
The Core Tradeoff
| Strategy | What Works | What Breaks | Who Pays |
|---|---|---|---|
| Vertical scaling | Zero code changes, preserves all query patterns | Hard ceiling, single point of failure | Ops budget (bigger machines) |
| Horizontal sharding | Near-linear write throughput | Cross-shard queries, rebalancing, schema rigidity | Every engineer writing queries, forever |
| Write-behind / buffering | Absorbs bursts, smooths latency spikes | Durability gap between accept and persist | On-call when the buffer crashes |
| Event sourcing | Full audit trail, temporal queries, replay | Projection lag, eventual consistency, operational complexity | Team maintaining projections and replay infrastructure |
| Command batching | Throughput gains via amortized I/O | Latency increase per individual write, retry complexity | Users waiting for acknowledgment |
Staff Default Position
Exhaust vertical scaling first — it preserves every existing query pattern and costs nothing in engineering complexity. When vertical scaling is insufficient, shard by natural business key (user ID, tenant ID, account ID). Event sourcing only when the audit trail is a stated business requirement, not a performance optimization. Batching is a latency-for-throughput trade that should be measured, not assumed.
When to Deviate
- Multi-tenant SaaS with regulatory isolation — shard by tenant even before hitting write limits; compliance forces your hand.
- Financial systems requiring reconstruction — event sourcing earns its complexity when regulators demand point-in-time replay.
- Extreme burst patterns (flash sales, market opens) — write-behind buffering is justified when the alternative is dropping writes entirely.
- Sub-millisecond latency requirements — skip batching; the latency cost is the system's core constraint, not throughput.
Common Interview Mistakes
| What Candidates Say | What Interviewers Hear | What Staff Engineers Say |
|---|---|---|
| "We'll just shard it" | No understanding of cross-shard query cost | "Sharding commits us to this access pattern — here's what we lose" |
| "Event sourcing solves this" | Resume-driven architecture | "Event sourcing is justified here because the business needs replay, not because we need write throughput" |
| "We can batch writes for performance" | Hasn't considered latency impact | "Batching trades P99 latency for throughput — let me quantify both sides" |
| "Use a write-ahead log" | Conflating durability mechanism with scaling strategy | "WAL gives us durability, not distribution — we still need to decide where writes land" |
| "NoSQL scales writes automatically" | Doesn't understand partition key design | "The partition key is the shard key — we're making the same commitment with different syntax" |
Implementation Deep Dive
Sharded Writes with PostgreSQL — The Staff Default
When vertical scaling is exhausted, the next step is horizontal sharding by a natural business key. Here is the full implementation path.
Shard Key Selection Process
The shard key determines write distribution, query routing, and rebalancing cost for the lifetime of the system. Choose wrong and every query pays the tax.
Selection criteria (in order of priority):
- High cardinality — The key must distribute writes evenly. user_id (millions of values) is good; country_code (200 values) creates hot shards.
- Present in all queries — Every query must include the shard key, or the system must scatter-gather across all shards.
- Natural business boundary — The key should align with data locality. User data naturally groups by user_id; cross-user queries are rare.
- Immutable — Changing a shard key requires moving rows between shards. Use values that never change (user_id, account_id), not values that might (region, plan_type).
Worked example — E-commerce order system:
Shard key: merchant_id (not order_id)
Why merchant_id:
- All order queries include merchant context (merchant dashboard, reports)
- Merchants are the tenancy boundary — data isolation is natural
- Cardinality: 500K merchants → even distribution across 16 shards
- Immutable: orders never change merchants
Why NOT order_id:
- Queries by merchant (dashboard, reports) would scatter-gather across all shards
- Cross-merchant queries are rare but intra-merchant queries are every page load
🎯 Staff Move: State the shard key and immediately state what queries become expensive. "Sharding by merchant_id means any cross-merchant analytics query must scatter-gather. That is acceptable because cross-merchant queries are batch jobs, not user-facing."
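The routing logic that this shard key implies is a few lines. A minimal sketch, assuming a fixed 16-shard layout matching the example above; the md5-based hash is illustrative (any stable hash works, but Python's built-in hash() is salted per process and must not be used for routing):

```python
import hashlib

NUM_SHARDS = 16  # illustrative: matches the 16-shard example above

def shard_for(merchant_id: str) -> int:
    """Deterministically map the immutable shard key to a shard index."""
    # A stable digest guarantees every app instance routes the same
    # key to the same shard across restarts and deployments.
    digest = hashlib.md5(merchant_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % NUM_SHARDS

# The same merchant always lands on the same shard:
assert shard_for("merchant-42") == shard_for("merchant-42")
```

Every query path must be able to compute this function's input, which is exactly the "present in all queries" criterion above.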
Write-Behind Buffer Implementation
When write bursts exceed what shards can absorb synchronously, a write-behind buffer smooths the spikes:
# Write path with Redis-backed buffer
function createOrder(order):
    orderId = generateId()

    # Step 1 — Validate and accept
    validate(order)

    # Step 2 — Write to durable buffer (Redis Stream or Kafka)
    redis.XADD("orders:buffer", "*",
               "order_id", orderId,
               "merchant_id", order.merchantId,
               "payload", serialize(order))

    # Step 3 — Return accepted (not committed)
    return { id: orderId, status: "accepted" }

# Async consumer drains buffer to sharded DB
function bufferConsumer():
    while true:
        entries = redis.XREADGROUP("consumers", "worker-1",
                                   COUNT=100, BLOCK=1000, "orders:buffer", ">")
        for entry in entries:
            shard = shardRouter.getShard(entry.merchantId)
            shard.insert(deserialize(entry.payload))
            redis.XACK("orders:buffer", "consumers", entry.id)
Durability tradeoff: Between XADD and the consumer persisting to PostgreSQL, the order exists only in Redis. If Redis crashes, buffered orders are lost. Mitigation: use Redis with AOF persistence (appendfsync everysec) or replace with Kafka for stronger durability guarantees.
🎯 Staff Move: Always name the durability gap explicitly. "There is a window between accept and persist where data lives only in the buffer. For orders, that window is typically under 500ms. If that is unacceptable, we write to Kafka with acks=all instead of Redis."
Kafka-Based Write Distribution
For sustained high write throughput, Kafka provides durable, partitioned write distribution:
# Producer: partition by shard key
producer.send(
    topic = "orders",
    key   = order.merchantId,   # Kafka partitions by this key
    value = serialize(order),
    acks  = "all"               # Wait for all replicas
)

# Consumer: one consumer per shard
# Kafka partition N → PostgreSQL shard N
function shardConsumer(partitionId):
    consumer = KafkaConsumer(topic="orders", partition=partitionId)
    shard = postgresShards[partitionId]

    for batch in consumer.poll(batchSize=500, timeout=1000):
        shard.beginTransaction()
        for record in batch:
            shard.insert(deserialize(record.value))
        shard.commit()
        consumer.commitOffsets()
Why Kafka partitions map to DB shards: Each Kafka partition is consumed by exactly one consumer in a consumer group. This gives you ordered, single-writer access to each shard — no write contention between consumers.
Connection and Transaction Management
| Configuration | Recommended Value | Rationale |
|---|---|---|
| Connections per shard | 5–10 per app instance | Limits contention; shards are the bottleneck, not connections |
| Transaction scope | Single shard only | Cross-shard transactions require 2PC; avoid at all costs |
| Batch size for bulk inserts | 500–1,000 rows per commit | Amortizes commit overhead without holding locks too long |
| Statement timeout per shard | 5s | Prevents slow queries from blocking the shard's connection pool |
| Idle connection timeout | 60s | Reclaims connections during low-traffic periods |
The numbers that matter. PostgreSQL single node handles approximately 10K TPS for write-heavy workloads (INSERT/UPDATE with indexes). With 10 shards, expect approximately 80K TPS aggregate — not 100K, because shard routing overhead, connection management, and occasional cross-shard coordination consume 15–20% of theoretical throughput. With 100 shards, the overhead grows to 25–30% due to increased routing complexity and metadata management.
🎯 Staff Move: Quote the overhead honestly. Candidates who say "10 shards = 10x throughput" reveal they have not operated a sharded system. The real multiplier is 7–8x, and it decreases as shard count increases.
Architecture Diagram
Sync path: For writes requiring immediate acknowledgment (payment confirmations, inventory decrements), the app server routes through the shard router directly to the correct PostgreSQL shard.
Async path: For writes that tolerate buffering (analytics events, activity logs, non-critical updates), the app server publishes to Kafka. Each Kafka partition maps to one DB shard. Consumers drain partitions to their corresponding shards in batches.
🎯 Staff Move: In an interview, present both paths and let the interviewer choose which to explore. This demonstrates that you understand not all writes have the same latency requirement.
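The sync/async split above reduces to a single dispatch decision at the app tier. A minimal sketch under stated assumptions: the CRITICAL_KINDS set, the event shape, and the get_shard/send interfaces are illustrative stand-ins, not a prescribed API:

```python
# Assumption: which write kinds demand synchronous acknowledgment
CRITICAL_KINDS = {"payment_confirmation", "inventory_decrement"}

def handle_write(event, shard_router, kafka_producer):
    """Route a write down the sync or async path by its latency need."""
    if event["kind"] in CRITICAL_KINDS:
        # Sync path: acknowledge only after the shard commit succeeds.
        shard = shard_router.get_shard(event["merchant_id"])
        shard.insert(event)
        return {"status": "committed"}
    # Async path: durable append to Kafka, keyed by the shard key so
    # the event eventually drains to the same shard as the sync writes.
    kafka_producer.send("orders", key=event["merchant_id"], value=event)
    return {"status": "accepted"}
```

Keeping both paths behind one entry point makes the classification explicit and reviewable, rather than scattered across call sites.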
Failure Scenarios
1. Hot Shard from Skewed Partition Key
Timeline: A viral merchant generates 100x normal order volume. All orders hash to shard 7. Shard 7 hits 100% CPU and I/O saturation. Write latency on shard 7 goes from 5ms to 2,000ms. Other merchants on shard 7 experience collateral damage.
Blast radius: All tenants on the hot shard. Other shards are unaffected, but if the application retries failed writes, retry storms can cascade to the app tier.
Detection: Per-shard write latency monitoring. CPU and I/O utilization per shard instance. Query queue depth exceeding threshold.
Recovery:
- Immediate: rate-limit the hot tenant at the application tier to cap their write throughput
- Short-term: split the hot shard — re-hash the tenant to a dedicated shard
- Long-term: use consistent hashing with virtual nodes so that a single tenant's traffic spreads across multiple physical shards
🎯 Staff Insight: Hot shards are inevitable. The question is not "how do we prevent them" but "how fast can we detect and isolate them." Build per-shard dashboards from day one — aggregate metrics hide shard-level problems.
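The long-term fix above, consistent hashing with virtual nodes, fits in a short class. A sketch under stated assumptions (64 virtual nodes per shard, md5 as the ring hash); real systems add replication and weighted nodes on top of this core:

```python
import bisect
import hashlib

class ConsistentHashRouter:
    """Hash ring with virtual nodes: each physical shard owns many
    points on the ring, so adding a shard moves only ~1/N of keys,
    and a hot tenant's range can be split without a full rehash."""

    def __init__(self, shards, vnodes=64):
        self.ring = []  # sorted list of (ring_point, shard_name)
        for shard in shards:
            for v in range(vnodes):
                self.ring.append((self._hash(f"{shard}#{v}"), shard))
        self.ring.sort()

    @staticmethod
    def _hash(key: str) -> int:
        return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")

    def shard_for(self, key: str) -> str:
        # Walk clockwise to the first ring point at or after the key.
        idx = bisect.bisect_left(self.ring, (self._hash(key), ""))
        return self.ring[idx % len(self.ring)][1]
```

The payoff shows up during rebalancing: growing the ring from N to N+1 shards relocates roughly 1/(N+1) of keys, versus nearly all of them under naive modulo hashing.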
2. Write Buffer Overflow During Traffic Spike
Timeline: Flash sale drives 20x normal write volume. Kafka consumers cannot keep up — consumer lag grows from 0 to 500K messages. Buffer retention period (24 hours) approaches. If consumers do not catch up, oldest messages will be dropped.
Blast radius: Delayed writes for all tenants, not just the spike source. If messages are dropped, writes are permanently lost. Downstream systems reading from the database see incomplete data.
Detection: Kafka consumer group lag (messages behind). Consumer throughput (messages/sec processed). Buffer disk utilization approaching retention limit.
Recovery:
- Scale consumers horizontally — add more consumer instances (up to the number of partitions)
- Increase consumer batch size to improve throughput (1,000 → 5,000 per commit)
- If critical, temporarily bypass the buffer and write directly to shards (sync path)
- Increase retention period to buy time for consumers to catch up
🎯 Staff Insight: Size the buffer for 10x normal load, but have a runbook for 100x. The buffer exists to absorb spikes, but if the spike exceeds buffer capacity, you need a fallback — and that fallback should be tested, not theoretical.
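The fallback discipline above implies the buffer must have explicit backpressure rather than unbounded growth. A minimal in-process sketch (the 202/503 status codes and max depth are illustrative; in production the bound would be Kafka retention or a Redis Stream MAXLEN):

```python
import queue

class BoundedWriteBuffer:
    """Write-behind buffer that sheds load explicitly when full,
    instead of growing until OOM or silent message loss."""

    def __init__(self, max_depth=10_000):
        self._q = queue.Queue(maxsize=max_depth)

    def accept(self, write):
        try:
            self._q.put_nowait(write)
            return {"status": 202}  # accepted, will persist asynchronously
        except queue.Full:
            # Reject with a retryable signal: clients back off with
            # jitter rather than the buffer absorbing unbounded load.
            return {"status": 503, "retry_after_s": 1}

    def drain(self, batch_size=100):
        """Consumer side: pull up to one batch for a bulk shard insert."""
        batch = []
        while len(batch) < batch_size and not self._q.empty():
            batch.append(self._q.get_nowait())
        return batch
```

The 503 + Retry-After response is the tested fallback the runbook needs: it converts an outage (dropped writes) into degraded service (delayed writes).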
3. Cross-Shard Query Timeout During Scatter-Gather
Timeline: An analytics dashboard queries "total orders across all merchants for the last 24 hours." The query fans out to all 16 shards. 15 shards respond in 50ms. Shard 7 (the hot shard) responds in 8 seconds. The aggregate query times out at the 5-second threshold.
Blast radius: Analytics dashboards and reporting features that require cross-shard aggregation. Transactional user-facing queries (which are shard-local) are unaffected.
Detection: Scatter-gather query p99 latency. Per-shard response time within scatter-gather operations. Timeout rate for aggregate queries.
Recovery:
- Set per-shard timeouts within the scatter-gather — return partial results from responsive shards with a warning
- Pre-compute cross-shard aggregations asynchronously (materialized views rebuilt on a schedule)
- Use a dedicated analytics replica per shard that handles aggregate queries without competing with transactional writes
- Move cross-shard analytics to a separate OLAP system (ClickHouse, BigQuery) fed by CDC
🎯 Staff Insight: Cross-shard queries are the ongoing tax of sharding. Accept this tax for rare operations, but never let user-facing features depend on scatter-gather. If a feature requires data from all shards, build a pre-computed view.
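The first recovery step, per-shard timeouts with partial results, can be sketched with a thread pool. Assumptions: each shard object exposes a run(query) method, and a partial aggregate with a warning beats a failed one:

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as ShardTimeout

def scatter_gather(shards, query, per_shard_timeout=0.5):
    """Fan a query out to every shard; collect partial results and
    name the shards that missed the deadline, instead of failing the
    whole aggregate because one hot shard is slow."""
    results, timed_out = {}, []
    with ThreadPoolExecutor(max_workers=max(len(shards), 1)) as pool:
        futures = {pool.submit(shard.run, query): name
                   for name, shard in shards.items()}
        for future, name in futures.items():
            try:
                results[name] = future.result(timeout=per_shard_timeout)
            except ShardTimeout:
                timed_out.append(name)
    return {"results": results, "timed_out": timed_out,
            "partial": bool(timed_out)}
```

Callers surface "partial" to the dashboard as a warning banner; the hot shard degrades one widget rather than the whole report.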
In the Wild
Abstract write-scaling patterns are easier to internalize when you see how they were applied under real production constraints. These are public, documented examples — not speculation.
Instagram: Sharding PostgreSQL by User ID
Instagram ran on a single PostgreSQL instance until their growth forced a decision: re-architect from scratch or shard the existing system. They chose to shard PostgreSQL by user_id using a logical sharding layer (PL/Proxy routing to multiple PostgreSQL backends). Every user's photos, likes, and comments live on the same shard — making the most common queries (a user's feed, their profile, their notifications) shard-local and fast.
The Staff-level insight: Instagram chose user_id over photo_id despite photos being the dominant write entity. Why? Because 90% of queries start from a user context (a user's feed, a user's profile). Sharding by photo_id would have made user-centric queries scatter-gather across all shards — the most common operation would have been the most expensive. The shard key decision was driven by query patterns, not by write distribution.
Uber: Schemaless — Write-Optimized Sharding
Uber built Schemaless, a sharded MySQL-backed storage layer, to handle tens of millions of trip writes per day. Each row is identified by a (row_key, column_name, ref_key) triple, and the row_key determines the shard. Writes are append-only — updates create new versions rather than mutating in place. This eliminates write contention (no row-level locks) and makes the system naturally audit-friendly.
The Staff-level insight: By making writes append-only, Uber turned a write-scaling problem into a storage problem. Append-only writes never contend with each other — two writes to the same logical entity land in different physical rows. The tradeoff is that reads must merge multiple versions, but that merge is simple and cacheable. This is a deliberate choice: they paid read complexity to buy write throughput.
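The append-only idea is easy to demonstrate in miniature. This is an illustrative sketch of the pattern, not Uber's actual Schemaless API: the in-memory list stands in for sharded MySQL, and ref_key is a simple monotonic version counter:

```python
import itertools

class AppendOnlyStore:
    """Append-only versioned writes: every write inserts a new
    (row_key, ref_key) version, so concurrent writers never contend
    on a row; reads pay a merge cost to find the latest version."""

    def __init__(self):
        self._rows = []  # each entry: (row_key, ref_key, body)
        self._seq = itertools.count(1)

    def write(self, row_key, body):
        # No in-place UPDATE, hence no row-level lock to fight over.
        ref_key = next(self._seq)
        self._rows.append((row_key, ref_key, body))
        return ref_key

    def read_latest(self, row_key):
        versions = [r for r in self._rows if r[0] == row_key]
        return max(versions, key=lambda r: r[1])[2] if versions else None
```

The read-side merge is simple and cacheable, which is exactly the trade named above: read complexity bought in exchange for contention-free write throughput.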
LinkedIn: Kafka as the Write-Distribution Layer
LinkedIn built Apache Kafka specifically to solve their write-scaling problem: hundreds of microservices each generating thousands of events per second (profile views, connection requests, message sends), with downstream systems unable to keep up with the write volume. Kafka acts as a durable, partitioned write buffer — producers write at full speed, and consumers drain at their own pace.
The Staff-level insight: LinkedIn's key insight was separating write acceptance from write processing. The producer's write latency is just the Kafka append (5–50ms). The consumer's processing latency is decoupled — it can batch, retry, and backfill without affecting the producer. This separation is the fundamental write-scaling pattern: accept fast, process later. The partition key determines write distribution, and LinkedIn uses entity-specific keys (member_id for profile events, connection_id for connection events) to maintain per-entity ordering while distributing load across partitions.
Practice Drill
Staff-Caliber Answer Shape
- Size the problem. Normal load: 5K TPS — a single PostgreSQL node handles this comfortably (~10K TPS ceiling). Flash sale: 80K TPS for 30 minutes — 8x the single-node ceiling. Total flash sale volume: 80K × 1,800 seconds = 144M orders.
- Distinguish write types. Not all 80K writes/sec require the same guarantees. (a) Order creation with payment authorization — must be synchronous, consistent. Maybe 20K/sec. (b) Inventory reservation — must be synchronous to prevent overselling. Maybe 20K/sec. (c) Analytics events, recommendation signals, activity logs — can be buffered. Maybe 40K/sec.
- Design the sync path. Shard PostgreSQL by merchant_id (the natural business boundary). 10 shards × 8K effective TPS = 80K TPS capacity. This handles the 20K/sec of synchronous order writes with 4x headroom per shard. Inventory lives on the same shard as the merchant — no cross-shard contention.
- Design the async path. The 40K/sec of non-critical writes go to Kafka. 10 partitions (one per shard). Consumers drain to PostgreSQL at their own pace. During the flash sale, consumer lag grows — that is acceptable for analytics events. Lag clears within 15 minutes post-sale.
- Handle the burst. Flash sale is 30 minutes. Pre-warm: disable non-essential background jobs, pre-create inventory reservations. During: rate-limit per-user to 5 orders/min to prevent bot abuse. After: drain Kafka backlog, reconcile inventory counts.
- Justify not event sourcing. Event sourcing would provide a full audit trail of every order state transition, but the operational complexity (projection maintenance, replay infrastructure) is not justified for a flash sale that happens once a year. Sharding + buffering is simpler and sufficient.
The Staff move: Distinguish the 30-minute spike from the 364-day baseline. Do not build permanent infrastructure for a temporary load. The sharding handles the baseline; the buffer handles the spike.
Staff Interview Application
How to Introduce This Pattern
State the concrete throughput requirement, the single-node ceiling, and the chosen shard key. Then immediately name the tradeoff. This tells the interviewer you understand that sharding is a commitment, not a free lunch.
When NOT to Use This Pattern
- Below the single-node ceiling: If you are at 2K TPS and the ceiling is 10K, you have 5x headroom. Do not shard prematurely — optimize indexes, connection pooling, and query patterns first.
- No natural shard key exists: If every query touches every partition, sharding adds overhead without reducing per-node load. Consider a different data model or an OLAP system instead.
- Write contention on a single entity: If the bottleneck is concurrent updates to one row (counter, balance, inventory), sharding does not help — you need optimistic concurrency, CAS operations, or partitioned counters.
- Temporary burst, not sustained load: If writes spike for minutes during flash sales but are low otherwise, use a write-behind buffer to absorb the burst. Do not architect permanent sharding for temporary load.
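The single-entity contention case deserves a concrete shape, since "shard it" is the wrong reflex there. A minimal sketch of a partitioned counter (the slot count and random slot choice are illustrative; a database version spreads one logical counter across N rows):

```python
import random

class PartitionedCounter:
    """Split one hot counter into N sub-counters: each increment
    touches one random slot, so write contention drops by roughly
    a factor of N; reads pay the cost of summing the slots."""

    def __init__(self, slots=16):
        self._slots = [0] * slots

    def increment(self, amount=1):
        # DB equivalent: UPDATE counters SET value = value + ?
        #                WHERE name = ? AND slot = ?
        self._slots[random.randrange(len(self._slots))] += amount

    def value(self):
        # Exact reads sum all slots; approximate reads can cache this.
        return sum(self._slots)
```

This is the same write-scaling trade in miniature: distribute the mutation, pay a merge cost on read.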
Follow-Up Questions to Anticipate
| Interviewer Asks | What They Are Testing | How to Respond |
|---|---|---|
| "How do you rebalance shards?" | Operational maturity | "We use consistent hashing with virtual nodes. Adding a shard requires migrating only 1/N of the data. We dual-write during migration: old shard + new shard, then cut over reads." |
| "What about cross-shard transactions?" | Distributed systems fundamentals | "We avoid them. Operations are designed to be shard-local. When absolutely required, we use the saga pattern with compensating actions rather than 2PC." |
| "How do you handle ID generation?" | Practical systems experience | "Snowflake IDs: timestamp + shard_id + sequence. Globally unique without coordination. The shard_id is embedded in the ID, so we can route queries without a lookup." |
| "What happens when a shard goes down?" | Fault tolerance reasoning | "Each shard is a PostgreSQL primary with a synchronous standby. Failover is automatic via Patroni. The shard router detects the new primary within 10 seconds." |
| "How do you know when to add more shards?" | Capacity planning | "We monitor per-shard write latency p99 and storage utilization. When any shard sustains >70% of its throughput ceiling for a week, we add capacity. We aim for 40-50% utilization to absorb spikes." |
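The ID-generation answer in the table above can be sketched concretely. Assumptions: a 41/10/12-bit layout (timestamp, shard id, per-millisecond sequence) and a custom epoch; the exact bit widths vary by implementation, but the embedded shard id is the point:

```python
import threading
import time

class SnowflakeGenerator:
    """Snowflake-style IDs: ms timestamp | 10-bit shard id | 12-bit
    sequence. Globally unique without coordination, and the shard id
    is recoverable from the ID itself, so routing needs no lookup."""

    EPOCH_MS = 1_600_000_000_000  # assumption: custom epoch

    def __init__(self, shard_id: int):
        assert 0 <= shard_id < 1024  # fits in 10 bits
        self.shard_id = shard_id
        self.last_ms, self.seq = -1, 0
        self._lock = threading.Lock()

    def next_id(self) -> int:
        with self._lock:
            now = int(time.time() * 1000) - self.EPOCH_MS
            if now < self.last_ms:       # tolerate small clock slew
                now = self.last_ms
            if now == self.last_ms:
                self.seq = (self.seq + 1) & 0xFFF  # 4096 IDs/ms ceiling
                if self.seq == 0:        # exhausted: spin to next ms
                    while int(time.time() * 1000) - self.EPOCH_MS <= now:
                        pass
                    now += 1
            else:
                self.seq = 0
            self.last_ms = now
            return (now << 22) | (self.shard_id << 12) | self.seq

def shard_of(snowflake_id: int) -> int:
    """Route a query without a lookup table: extract the embedded shard id."""
    return (snowflake_id >> 12) & 0x3FF
```

The 4,096-per-millisecond ceiling matches the ~4K IDs/ms figure in the key-numbers table below.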
Capacity Planning Quick Reference
Sizing a Sharded Write System
| Parameter | Formula | Example (50K TPS target) |
|---|---|---|
| Single-node ceiling | Benchmark with production schema + indexes | ~10K TPS (PostgreSQL, write-heavy) |
| Shard count (minimum) | target_TPS / (node_ceiling × 0.8) | 50K / 8K = 7 → round to 8 shards |
| Overhead factor | 15-20% for routing, metadata, cross-shard ops | 8 shards × 8K effective = 64K TPS capacity |
| Headroom target | 40-50% utilization at steady state | 50K / 64K = 78% → add 2 shards for headroom = 10 |
| Connections per shard | app_servers × pool_per_shard | 20 × 5 = 100 connections per shard |
| Total connections | shard_count × connections_per_shard | 10 × 100 = 1,000 total DB connections |
| Kafka partitions (if buffered) | ≥ shard_count (1:1 mapping ideal) | 10 partitions |
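The sizing formulas above can be checked in a few lines. A sketch under the table's assumptions (10K TPS node ceiling, 0.8 derating); note the table's worked example lands on 10 shards by judgment, between the strict minimum (7) and a strict 50%-utilization answer (13):

```python
import math

def shard_count(target_tps, node_ceiling_tps=10_000,
                derate=0.8, target_utilization=0.5):
    """Minimum shard count so steady-state load sits at or below the
    target utilization of each node's derated capacity."""
    effective = node_ceiling_tps * derate  # e.g. 10K * 0.8 = 8K TPS
    return math.ceil(target_tps / (effective * target_utilization))

# Strict minimum (no headroom): 50K / 8K effective
minimum = shard_count(50_000, target_utilization=1.0)   # 7 shards
# With a 50% steady-state utilization target:
with_headroom = shard_count(50_000)                     # 13 shards
```

In practice the final number is a negotiation between these two bounds, weighted by growth forecasts and rebalancing cost.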
Write Throughput by Storage Engine
| Storage Engine | Approximate Write TPS (Single Node) | Sweet Spot | Watch Out For |
|---|---|---|---|
| PostgreSQL | 5-15K TPS | ACID transactions, complex schemas | Vacuum overhead at high write rates |
| MySQL (InnoDB) | 5-12K TPS | ACID, mature replication | Buffer pool sizing for write-heavy workloads |
| Cassandra | 20-50K TPS per node | Time-series, append-heavy | Read-before-write patterns destroy performance |
| DynamoDB | Provisioned: unlimited (pay per WCU) | Predictable key-value writes | Hot partition throttling at >1K WCU/partition |
| MongoDB | 10-30K TPS | Flexible schema, document writes | Global lock on MMAPv1 (use WiredTiger) |
| Redis (AOF) | 50-100K TPS | Ephemeral/buffered writes | Data > memory = eviction or crash |
Key Numbers Worth Memorizing
| Metric | Value | Why It Matters |
|---|---|---|
| PostgreSQL max connections (practical) | 200-500 | Beyond this, connection overhead degrades performance; use PgBouncer |
| Kafka producer throughput (single instance) | 100K-500K msg/sec | Producer is rarely the bottleneck; partitioning and consumer lag are |
| Cross-shard query overhead | 2-10x single-shard latency | Scatter-gather across N shards adds N round-trips + merge time |
| Shard rebalancing data transfer rate | 50-200 MB/sec | Moving 1TB of data to a new shard takes 1.5-5.5 hours |
| Snowflake ID generation rate | ~4K IDs/ms per generator | More than enough for any single application instance |
| WAL generation rate (PostgreSQL, heavy writes) | 50-200 MB/min | Replication lag correlates with WAL volume; monitor during write spikes |
| Batch insert throughput (PostgreSQL) | 50-100K rows/sec | Use COPY or multi-row INSERT to amortize transaction overhead |
| Write amplification (with 5 indexes) | ~6x raw data written | Each index is a separate B-tree update; more indexes = slower writes |
| Kafka end-to-end latency (producer → consumer) | 5-50ms | The buffering delay; size linger.ms and batch.size for your SLA |
Common Pitfalls Checklist
| Pitfall | Symptom | Fix |
|---|---|---|
| Shard key includes mutable field | Row moves between shards on update; requires distributed transaction | Choose immutable keys only (user_id, account_id, never region or plan_type) |
| No cross-shard query plan | Product ships scatter-gather on every page load; p99 blows up | Pre-compute cross-shard aggregations in a materialized view or OLAP store |
| Write buffer with no backpressure | Buffer grows unbounded during sustained spike; OOM or message loss | Set max buffer size; reject with 503 + Retry-After when buffer is full |
| Testing with uniform shard key distribution | Production has Zipf distribution; hot shard appears within days | Load-test with production key histograms, not synthetic uniform data |
| Auto-increment ID as shard key | All recent writes land on the last shard (highest ID range) | Use hash-based sharding (consistent hashing) or Snowflake IDs |