StaffSignal
Cross-Cutting Framework

Scaling Writes

Sharding, partitioning, write-behind, and event sourcing. Write scaling is a schema commitment.

Scaling Writes — Cross-Cutting Pattern

The Problem

Write throughput hits a ceiling when a single node owns all mutations. Scaling writes means distributing mutation responsibility across nodes, partitions, or time windows. Every distribution strategy is a schema commitment that affects query patterns, consistency model, and operational complexity for years. You are not optimizing throughput — you are choosing a data model you will live with long after the performance crisis passes.

The Core Tradeoff

| Strategy | What Works | What Breaks | Who Pays |
|---|---|---|---|
| Vertical scaling | Zero code changes, preserves all query patterns | Hard ceiling, single point of failure | Ops budget (bigger machines) |
| Horizontal sharding | Near-linear write throughput | Cross-shard queries, rebalancing, schema rigidity | Every engineer writing queries, forever |
| Write-behind / buffering | Absorbs bursts, smooths latency spikes | Durability gap between accept and persist | On-call when the buffer crashes |
| Event sourcing | Full audit trail, temporal queries, replay | Projection lag, eventual consistency, operational complexity | Team maintaining projections and replay infrastructure |
| Command batching | Throughput gains via amortized I/O | Latency increase per individual write, retry complexity | Users waiting for acknowledgment |

Staff Default Position

Exhaust vertical scaling first — it preserves every existing query pattern and costs nothing in engineering complexity. When vertical scaling is insufficient, shard by natural business key (user ID, tenant ID, account ID). Event sourcing only when the audit trail is a stated business requirement, not a performance optimization. Batching is a latency-for-throughput trade that should be measured, not assumed.

When to Deviate

  • Multi-tenant SaaS with regulatory isolation — shard by tenant even before hitting write limits; compliance forces your hand.
  • Financial systems requiring reconstruction — event sourcing earns its complexity when regulators demand point-in-time replay.
  • Extreme burst patterns (flash sales, market opens) — write-behind buffering is justified when the alternative is dropping writes entirely.
  • Sub-millisecond latency requirements — skip batching; the latency cost is the system's core constraint, not throughput.

Common Interview Mistakes

| What Candidates Say | What Interviewers Hear | What Staff Engineers Say |
|---|---|---|
| "We'll just shard it" | No understanding of cross-shard query cost | "Sharding commits us to this access pattern — here's what we lose" |
| "Event sourcing solves this" | Resume-driven architecture | "Event sourcing is justified here because the business needs replay, not because we need write throughput" |
| "We can batch writes for performance" | Hasn't considered latency impact | "Batching trades P99 latency for throughput — let me quantify both sides" |
| "Use a write-ahead log" | Conflating durability mechanism with scaling strategy | "WAL gives us durability, not distribution — we still need to decide where writes land" |
| "NoSQL scales writes automatically" | Doesn't understand partition key design | "The partition key is the shard key — we're making the same commitment with different syntax" |

Implementation Deep Dive

Sharded Writes with PostgreSQL — The Staff Default

When vertical scaling is exhausted, the next step is horizontal sharding by a natural business key. Here is the full implementation path.

Shard Key Selection Process

The shard key determines write distribution, query routing, and rebalancing cost for the lifetime of the system. Choose wrong and every query pays the tax.

Selection criteria (in order of priority):

  1. High cardinality — The key must distribute writes evenly. user_id (millions of values) is good. country_code (200 values) creates hot shards.
  2. Present in all queries — Every query must include the shard key, or the system must scatter-gather across all shards.
  3. Natural business boundary — The key should align with data locality. User data naturally groups by user_id; cross-user queries are rare.
  4. Immutable — Changing a shard key requires moving rows between shards. Use values that never change (user_id, account_id), not values that might (region, plan_type).

Composite key example — E-commerce order system:

Shard key: merchant_id (not order_id)

Why merchant_id:
  - All order queries include merchant context (merchant dashboard, reports)
  - Merchants are the tenancy boundary — data isolation is natural
  - Cardinality: 500K merchants → even distribution across 16 shards
  - Immutable: orders never change merchants

Why NOT order_id:
  - Queries by merchant (dashboard, reports) would scatter-gather across all shards
  - Cross-merchant queries are rare but intra-merchant queries are every page load

🎯 Staff Move: State the shard key and immediately state what queries become expensive. "Sharding by merchant_id means any cross-merchant analytics query must scatter-gather. That is acceptable because cross-merchant queries are batch jobs, not user-facing."
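As a concrete illustration of routing by the chosen key, here is a minimal hash-based router. This is a sketch: `NUM_SHARDS`, the md5 choice, and the function name are illustrative assumptions, not part of any particular system.

```python
import hashlib

NUM_SHARDS = 16  # illustrative; match your actual shard topology

def shard_for(key: str) -> int:
    """Route a business key (e.g. merchant_id) to a shard.

    Uses a stable hash rather than Python's process-randomized hash()
    so that routing is identical across app servers and restarts.
    """
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return int(digest, 16) % NUM_SHARDS
```

Every write and every shard-local read computes `shard_for(merchant_id)` first; a query that lacks the shard key has no single destination, which is exactly the scatter-gather cost described above.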

Write-Behind Buffer Implementation

When write bursts exceed what shards can absorb synchronously, a write-behind buffer smooths the spikes:

```
# Write path with Redis-backed buffer
function createOrder(order):
    orderId = generateId()

    # Step 1 — Validate and accept
    validate(order)

    # Step 2 — Write to durable buffer (Redis Stream or Kafka)
    redis.XADD("orders:buffer", "*",
        "order_id", orderId,
        "merchant_id", order.merchantId,
        "payload", serialize(order))

    # Step 3 — Return accepted (not committed)
    return { id: orderId, status: "accepted" }

# Async consumer drains buffer to sharded DB
function bufferConsumer():
    while true:
        entries = redis.XREADGROUP("consumers", "worker-1",
            COUNT=100, BLOCK=1000, "orders:buffer", ">")

        for entry in entries:
            shard = shardRouter.getShard(entry.merchantId)
            shard.insert(deserialize(entry.payload))
            redis.XACK("orders:buffer", "consumers", entry.id)
```

Durability tradeoff: Between XADD and the consumer persisting to PostgreSQL, the order exists only in Redis. If Redis crashes, buffered orders are lost. Mitigation: use Redis with AOF persistence (appendfsync everysec) or replace with Kafka for stronger durability guarantees.
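The AOF mitigation above corresponds to two standard redis.conf directives:

```conf
appendonly yes         # enable the append-only file
appendfsync everysec   # fsync once per second; bounds the loss window to ~1s
```

With `everysec`, a Redis crash loses at most about one second of acknowledged buffer entries; `appendfsync always` closes the window further at a significant throughput cost.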

🎯 Staff Move: Always name the durability gap explicitly. "There is a window between accept and persist where data lives only in the buffer. For orders, that window is typically under 500ms. If that is unacceptable, we write to Kafka with acks=all instead of Redis."

Kafka-Based Write Distribution

For sustained high write throughput, Kafka provides durable, partitioned write distribution:

```
# Producer: partition by shard key
producer.send(
    topic = "orders",
    key = order.merchantId,     # Kafka partitions by this key
    value = serialize(order),
    acks = "all"                # Wait for all replicas
)

# Consumer: one consumer per shard
# Kafka partition N → PostgreSQL shard N
function shardConsumer(partitionId):
    consumer = KafkaConsumer(topic="orders", partition=partitionId)
    shard = postgresShards[partitionId]

    for batch in consumer.poll(batchSize=500, timeout=1000):
        shard.beginTransaction()
        for record in batch:
            shard.insert(deserialize(record.value))
        shard.commit()
        consumer.commitOffsets()
```

Why Kafka partitions map to DB shards: Each Kafka partition is consumed by exactly one consumer in a consumer group. This gives you ordered, single-writer access to each shard — no write contention between consumers.

Connection and Transaction Management

| Configuration | Recommended Value | Rationale |
|---|---|---|
| Connections per shard | 5–10 per app instance | Limits contention; shards are the bottleneck, not connections |
| Transaction scope | Single shard only | Cross-shard transactions require 2PC; avoid at all costs |
| Batch size for bulk inserts | 500–1,000 rows per commit | Amortizes commit overhead without holding locks too long |
| Statement timeout per shard | 5s | Prevents slow queries from blocking the shard's connection pool |
| Idle connection timeout | 60s | Reclaims connections during low-traffic periods |

The numbers that matter. PostgreSQL single node handles approximately 10K TPS for write-heavy workloads (INSERT/UPDATE with indexes). With 10 shards, expect approximately 80K TPS aggregate — not 100K, because shard routing overhead, connection management, and occasional cross-shard coordination consume 15–20% of theoretical throughput. With 100 shards, the overhead grows to 25–30% due to increased routing complexity and metadata management.

🎯 Staff Move: Quote the overhead honestly. Candidates who say "10 shards = 10x throughput" reveal they have not operated a sharded system. The real multiplier is 7–8x, and it decreases as shard count increases.


Architecture Diagram

[Diagram: sync writes flow app server → shard router → PostgreSQL shard; async writes flow app server → Kafka partition → consumer → shard.]

Sync path: For writes requiring immediate acknowledgment (payment confirmations, inventory decrements), the app server routes through the shard router directly to the correct PostgreSQL shard.

Async path: For writes that tolerate buffering (analytics events, activity logs, non-critical updates), the app server publishes to Kafka. Each Kafka partition maps to one DB shard. Consumers drain partitions to their corresponding shards in batches.

🎯 Staff Move: In an interview, present both paths and let the interviewer choose which to explore. This demonstrates that you understand not all writes have the same latency requirement.


Failure Scenarios

1. Hot Shard from Skewed Partition Key

Timeline: A viral merchant generates 100x normal order volume. All orders hash to shard 7. Shard 7 hits 100% CPU and I/O saturation. Write latency on shard 7 goes from 5ms to 2,000ms. Other merchants on shard 7 experience collateral damage.

Blast radius: All tenants on the hot shard. Other shards are unaffected, but if the application retries failed writes, retry storms can cascade to the app tier.

Detection: Per-shard write latency monitoring. CPU and I/O utilization per shard instance. Query queue depth exceeding threshold.

Recovery:

  1. Immediate: rate-limit the hot tenant at the application tier to cap their write throughput
  2. Short-term: split the hot shard — re-hash the tenant to a dedicated shard
  3. Long-term: use consistent hashing with virtual nodes so that a single tenant's traffic spreads across multiple physical shards

🎯 Staff Insight: Hot shards are inevitable. The question is not "how do we prevent them" but "how fast can we detect and isolate them." Build per-shard dashboards from day one — aggregate metrics hide shard-level problems.
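The long-term fix in step 3 can be sketched as a consistent hash ring with virtual nodes. This is a sketch: class and parameter names are mine, and a production ring would persist and replicate its topology.

```python
import bisect
import hashlib

class ConsistentHashRing:
    """Consistent hash ring with virtual nodes.

    Each physical shard owns many points on the ring, so adding or
    removing a shard moves only ~1/N of keys, and a hot key range is
    less likely to pin a single physical node.
    """

    def __init__(self, shards, vnodes=100):
        self.ring = []  # sorted list of (ring_point, shard_name)
        for shard in shards:
            for v in range(vnodes):
                self.ring.append((self._hash(f"{shard}#{v}"), shard))
        self.ring.sort()

    @staticmethod
    def _hash(key: str) -> int:
        # Stable hash, identical across processes and restarts
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def get_shard(self, key: str) -> str:
        point = self._hash(key)
        idx = bisect.bisect(self.ring, (point,))  # first vnode clockwise
        if idx == len(self.ring):
            idx = 0  # wrap around the ring
        return self.ring[idx][1]
```

Adding a shard to the ring reroutes only the keys whose arcs the new shard's virtual nodes take over; everything else keeps its old routing, which is what makes incremental rebalancing cheap.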

2. Write Buffer Overflow During Traffic Spike

Timeline: A flash sale drives 20x normal write volume. Kafka consumers cannot keep up — consumer lag grows from 0 to 500K messages. The oldest buffered messages are approaching the 24-hour retention limit; if consumers do not catch up before then, those messages will be dropped.

Blast radius: Delayed writes for all tenants, not just the spike source. If messages are dropped, writes are permanently lost. Downstream systems reading from the database see incomplete data.

Detection: Kafka consumer group lag (messages behind). Consumer throughput (messages/sec processed). Buffer disk utilization approaching retention limit.

Recovery:

  1. Scale consumers horizontally — add more consumer instances (up to the number of partitions)
  2. Increase consumer batch size to improve throughput (1,000 → 5,000 per commit)
  3. If critical, temporarily bypass the buffer and write directly to shards (sync path)
  4. Increase retention period to buy time for consumers to catch up

🎯 Staff Insight: Size the buffer for 10x normal load, but have a runbook for 100x. The buffer exists to absorb spikes, but if the spike exceeds buffer capacity, you need a fallback — and that fallback should be tested, not theoretical.
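The "size for 10x, runbook for 100x" advice implies the buffer must fail loudly when it fills. Here is a minimal backpressure sketch; the names and the in-memory queue are illustrative, since a real deployment would bound a Redis Stream or rely on Kafka quotas:

```python
import queue

class BoundedWriteBuffer:
    """Write-behind buffer with explicit backpressure.

    When the buffer is full, reject immediately instead of growing
    unbounded, which ends in OOM or silent message loss.
    """

    def __init__(self, max_size=10_000):
        self.q = queue.Queue(maxsize=max_size)

    def accept(self, write):
        try:
            self.q.put_nowait(write)
            return {"status": 202}  # accepted, will be persisted async
        except queue.Full:
            # Shed load: tell clients to retry instead of dropping data
            return {"status": 503, "retry_after_seconds": 1}
```

A 503 with Retry-After moves the spike back to clients, who can back off; an unbounded buffer converts the same spike into an outage later.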

3. Cross-Shard Query Timeout During Scatter-Gather

Timeline: An analytics dashboard queries "total orders across all merchants for the last 24 hours." The query fans out to all 16 shards. 15 shards respond in 50ms. Shard 7 (the hot shard) responds in 8 seconds. The aggregate query times out at the 5-second threshold.

Blast radius: Analytics dashboards and reporting features that require cross-shard aggregation. Transactional user-facing queries (which are shard-local) are unaffected.

Detection: Scatter-gather query p99 latency. Per-shard response time within scatter-gather operations. Timeout rate for aggregate queries.

Recovery:

  1. Set per-shard timeouts within the scatter-gather — return partial results from responsive shards with a warning
  2. Pre-compute cross-shard aggregations asynchronously (materialized views rebuilt on a schedule)
  3. Use a dedicated analytics replica per shard that handles aggregate queries without competing with transactional writes
  4. Move cross-shard analytics to a separate OLAP system (ClickHouse, BigQuery) fed by CDC

🎯 Staff Insight: Cross-shard queries are the ongoing tax of sharding. Accept this tax for rare operations, but never let user-facing features depend on scatter-gather. If a feature requires data from all shards, build a pre-computed view.
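The partial-results recovery from step 1 can be sketched with a per-shard deadline. This is a sketch; the shard query functions are stand-ins for real per-shard database calls:

```python
from concurrent.futures import ThreadPoolExecutor, wait

def scatter_gather(shards, query, timeout_s=0.5):
    """Fan `query` out to every shard; return whatever answered in time
    plus the names of the shards that missed the deadline."""
    pool = ThreadPoolExecutor(max_workers=len(shards))
    futures = {pool.submit(fn, query): name for name, fn in shards.items()}
    done, pending = wait(futures, timeout=timeout_s)
    results = {futures[f]: f.result() for f in done if f.exception() is None}
    timed_out = sorted(futures[f] for f in pending)
    pool.shutdown(wait=False)  # do not block the caller on the slow shard
    return results, timed_out
```

The caller can render the partial aggregate with a warning banner naming the missing shards, which keeps the dashboard usable while the hot shard recovers.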


Staff Interview Application

How to Introduce This Pattern

State the concrete throughput requirement, the single-node ceiling, and the chosen shard key. Then immediately name the tradeoff. This tells the interviewer you understand that sharding is a commitment, not a free lunch.

When NOT to Use This Pattern

  • Below the single-node ceiling: If you are at 2K TPS and the ceiling is 10K, you have 5x headroom. Do not shard prematurely — optimize indexes, connection pooling, and query patterns first.
  • No natural shard key exists: If every query touches every partition, sharding adds overhead without reducing per-node load. Consider a different data model or an OLAP system instead.
  • Write contention on a single entity: If the bottleneck is concurrent updates to one row (counter, balance, inventory), sharding does not help — you need optimistic concurrency, CAS operations, or partitioned counters.
  • Temporary burst, not sustained load: If writes spike for minutes during flash sales but are low otherwise, use a write-behind buffer to absorb the burst. Do not architect permanent sharding for temporary load.
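The partitioned-counter escape hatch mentioned in the third bullet can be sketched in a few lines. This is an in-memory illustration; a real system would spread the counter across N database rows or Redis keys:

```python
import random

class PartitionedCounter:
    """Split one hot counter into N sub-slots.

    Writers increment a random slot, so concurrent writers do not
    serialize on a single row; readers pay a small fan-in cost to sum
    the slots.
    """

    def __init__(self, slots=16):
        self.slots = [0] * slots

    def increment(self, amount=1):
        # Each write touches one random slot instead of one shared row
        self.slots[random.randrange(len(self.slots))] += amount

    def value(self):
        # Reads aggregate all slots
        return sum(self.slots)
```

The trade is deliberate: writes become contention-free while reads become slightly more expensive, which fits counters that are written far more often than they are read exactly.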

Follow-Up Questions to Anticipate

| Interviewer Asks | What They Are Testing | How to Respond |
|---|---|---|
| "How do you rebalance shards?" | Operational maturity | "We use consistent hashing with virtual nodes. Adding a shard requires migrating only 1/N of the data. We dual-write during migration: old shard + new shard, then cut over reads." |
| "What about cross-shard transactions?" | Distributed systems fundamentals | "We avoid them. Operations are designed to be shard-local. When absolutely required, we use the saga pattern with compensating actions rather than 2PC." |
| "How do you handle ID generation?" | Practical systems experience | "Snowflake IDs: timestamp + shard_id + sequence. Globally unique without coordination. The shard_id is embedded in the ID, so we can route queries without a lookup." |
| "What happens when a shard goes down?" | Fault tolerance reasoning | "Each shard is a PostgreSQL primary with a synchronous standby. Failover is automatic via Patroni. The shard router detects the new primary within 10 seconds." |
| "How do you know when to add more shards?" | Capacity planning | "We monitor per-shard write latency p99 and storage utilization. When any shard sustains >70% of its throughput ceiling for a week, we add capacity. We aim for 40-50% utilization to absorb spikes." |
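The Snowflake layout quoted in the ID-generation answer can be sketched as follows. Bit widths follow the common 41/10/12 split; the custom epoch value is an illustrative assumption:

```python
import time

EPOCH_MS = 1_600_000_000_000  # custom epoch; illustrative assumption

class SnowflakeGenerator:
    """Snowflake-style 64-bit IDs:
    41-bit ms timestamp | 10-bit shard_id | 12-bit sequence."""

    def __init__(self, shard_id: int):
        if not 0 <= shard_id < 1024:
            raise ValueError("shard_id must fit in 10 bits")
        self.shard_id = shard_id
        self.sequence = 0
        self.last_ms = -1

    def next_id(self) -> int:
        now = int(time.time() * 1000) - EPOCH_MS
        if now == self.last_ms:
            self.sequence = (self.sequence + 1) & 0xFFF  # 4096 IDs per ms
            if self.sequence == 0:
                # Sequence exhausted: spin until the next millisecond
                while now <= self.last_ms:
                    now = int(time.time() * 1000) - EPOCH_MS
        else:
            self.sequence = 0
        self.last_ms = now
        return (now << 22) | (self.shard_id << 12) | self.sequence

def shard_of(snowflake_id: int) -> int:
    """Extract the owning shard from the ID, no routing lookup needed."""
    return (snowflake_id >> 12) & 0x3FF
```

Because the shard_id occupies fixed bits, `shard_of(id)` routes any lookup-by-ID without consulting a mapping table, which is the coordination-free property the interview answer claims.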

Capacity Planning Quick Reference

Sizing a Sharded Write System

| Parameter | Formula | Example (50K TPS target) |
|---|---|---|
| Single-node ceiling | Benchmark with production schema + indexes | ~10K TPS (PostgreSQL, write-heavy) |
| Shard count (minimum) | target_TPS / (node_ceiling × 0.8) | 50K / 8K = 7 → round to 8 shards |
| Overhead factor | 15-20% for routing, metadata, cross-shard ops | 8 shards × 8K effective = 64K TPS capacity |
| Headroom target | 40-50% utilization at steady state | 50K / 64K = 78% → add 2 shards for headroom = 10 |
| Connections per shard | app_servers × pool_per_shard | 20 × 5 = 100 connections per shard |
| Total connections | shard_count × connections_per_shard | 10 × 100 = 1,000 total DB connections |
| Kafka partitions (if buffered) | ≥ shard_count (1:1 mapping ideal) | 10 partitions |
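The first four rows of the table reduce to a small calculator. Parameter names are mine; note that the table's worked example rounds 7 up to 8 shards and then adds 2 for headroom, while this formula applies the utilization target directly:

```python
import math

def required_shards(target_tps: int, node_ceiling_tps: int,
                    efficiency: float = 0.8,
                    max_utilization: float = 0.5) -> tuple:
    """Sizing sketch: derate each node to `efficiency` of its benchmarked
    ceiling, then add headroom so steady-state utilization stays at or
    below `max_utilization`. Returns (minimum, with_headroom)."""
    effective = node_ceiling_tps * efficiency
    minimum = math.ceil(target_tps / effective)
    with_headroom = math.ceil(target_tps / (effective * max_utilization))
    return minimum, with_headroom
```

For the 50K TPS example on 10K-TPS nodes, the minimum is 7 shards; a strict 50% utilization cap asks for 13, and relaxing the cap to ~62% reproduces the table's 10.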

Write Throughput by Storage Engine

| Storage Engine | Approximate Write TPS (Single Node) | Sweet Spot | Watch Out For |
|---|---|---|---|
| PostgreSQL | 5-15K TPS | ACID transactions, complex schemas | Vacuum overhead at high write rates |
| MySQL (InnoDB) | 5-12K TPS | ACID, mature replication | Buffer pool sizing for write-heavy workloads |
| Cassandra | 20-50K TPS per node | Time-series, append-heavy | Read-before-write patterns destroy performance |
| DynamoDB | Provisioned: unlimited (pay per WCU) | Predictable key-value writes | Hot partition throttling at >1K WCU/partition |
| MongoDB | 10-30K TPS | Flexible schema, document writes | Global lock on MMAPv1 (use WiredTiger) |
| Redis (AOF) | 50-100K TPS | Ephemeral/buffered writes | Data > memory = eviction or crash |

Key Numbers Worth Memorizing

| Metric | Value | Why It Matters |
|---|---|---|
| PostgreSQL max connections (practical) | 200-500 | Beyond this, connection overhead degrades performance; use PgBouncer |
| Kafka producer throughput (single instance) | 100K-500K msg/sec | Producer is rarely the bottleneck; partitioning and consumer lag are |
| Cross-shard query overhead | 2-10x single-shard latency | Scatter-gather across N shards adds N round-trips + merge time |
| Shard rebalancing data transfer rate | 50-200 MB/sec | Moving 1TB of data to a new shard takes 1.5-5.5 hours |
| Snowflake ID generation rate | ~4K IDs/ms per generator | More than enough for any single application instance |
| WAL generation rate (PostgreSQL, heavy writes) | 50-200 MB/min | Replication lag correlates with WAL volume; monitor during write spikes |
| Batch insert throughput (PostgreSQL) | 50-100K rows/sec | Use COPY or multi-row INSERT to amortize transaction overhead |
| Write amplification (with 5 indexes) | ~6x raw data written | Each index is a separate B-tree update; more indexes = slower writes |
| Kafka end-to-end latency (producer → consumer) | 5-50ms | The buffering delay; size linger.ms and batch.size for your SLA |

Common Pitfalls Checklist

| Pitfall | Symptom | Fix |
|---|---|---|
| Shard key includes mutable field | Row moves between shards on update; requires distributed transaction | Choose immutable keys only (user_id, account_id, never region or plan_type) |
| No cross-shard query plan | Product ships scatter-gather on every page load; p99 blows up | Pre-compute cross-shard aggregations in a materialized view or OLAP store |
| Write buffer with no backpressure | Buffer grows unbounded during sustained spike; OOM or message loss | Set max buffer size; reject with 503 + Retry-After when buffer is full |
| Testing with uniform shard key distribution | Production has Zipf distribution; hot shard appears within days | Load-test with production key histograms, not synthetic uniform data |
| Auto-increment ID as shard key | All recent writes land on the last shard (highest ID range) | Use hash-based sharding (consistent hashing) or Snowflake IDs |