Your pipeline is running. Records are flowing. And then, at 3 AM on a Tuesday, the on-call alert fires: your Kafka consumer lag is at 40 million messages and climbing, your downstream database is throwing connection pool exhaustion errors, and the business stakeholder who owns that real-time dashboard is already composing an angry Slack message. You fix it by morning — but you don't actually know why it happened or what you changed that made it stop.
That scenario plays out constantly in data engineering. Teams build pipelines that work fine in testing and under modest load, then encounter mysterious slowdowns, cascading failures, and data loss under real production conditions. The root cause is almost always the same: they never developed a systematic model for thinking about flow, pressure, and contention in a pipeline system. They're tuning knobs without understanding the machine.
This lesson fixes that. By the end, you'll have a rigorous mental model for how data moves through a multi-stage pipeline, why bottlenecks form where they do, how backpressure propagates (and when it fails to), and how to diagnose and resolve throughput problems systematically. We're going to go deep — into queue mechanics, JVM internals, operator fusion, network I/O contention, and distributed coordination overhead — because that's the level at which these problems actually live.
What you'll learn:
This lesson assumes you are comfortable with:
You do not need to be an expert in all of these, but if Kafka consumer groups are entirely new to you, spend an hour with the Kafka documentation first. The concepts here build on that foundation.
Before we touch a configuration file or a monitoring dashboard, we need to build the right mental model. Backpressure is not just "the queue is full." It's a specific communication mechanism — a signal that travels upstream through a system to tell producers to slow down.
Think about water flowing through pipes of different diameters. If you have a wide pipe feeding into a narrow pipe, water will back up at the junction. The narrow pipe is the bottleneck. In a naive system, water just overflows — you lose data. In a system with backpressure, the narrow pipe somehow signals the wide pipe to slow down its flow. The question is: how does that signal travel, and what happens when it can't?
In software systems, data flows through a series of stages. Each stage has a processing rate — how many records per second it can handle. When a downstream stage's processing rate is lower than the upstream stage's output rate, pressure builds. The mechanism by which that pressure is communicated back upstream — and the failure modes when it isn't — is what we call backpressure.
There's a fundamental theorem from queueing theory that every pipeline engineer should have memorized: Little's Law.
L = λW
Where:
- L = average number of items in the system (queue depth)
- λ = average arrival rate (throughput)
- W = average time an item spends in the system (latency)

This is not an approximation or a rule of thumb. It's a mathematical identity that holds for any stable queueing system. What it tells us immediately is that latency, throughput, and queue depth are not independent variables. If you want to increase throughput (λ) without increasing queue depth (L), you must decrease the time each item spends in the system (W). In other words, you must actually make the processing faster.
This seems obvious when stated plainly, but it has non-obvious implications. When engineers see high queue depth, they often respond by increasing buffer sizes. But increasing buffer sizes increases L — and if λ stays the same, W must increase proportionally. You haven't solved the problem; you've just hidden it behind larger queues while making your latency worse.
The right response to growing queue depth is to either increase the processing rate of the bottleneck stage or reduce the input rate upstream. Larger buffers just change when the system falls over, not whether it does.
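To make the arithmetic concrete, here is a minimal sketch of Little's Law solved for W. The function name and the numbers are illustrative assumptions, not measurements from a real pipeline.

```python
def time_in_system(queue_depth: float, arrival_rate: float) -> float:
    """Little's Law solved for W: W = L / lambda, in seconds."""
    if arrival_rate <= 0:
        raise ValueError("arrival_rate must be positive")
    return queue_depth / arrival_rate


# Hypothetical stage: 5,000 records/sec arriving, 10,000 records queued on average.
baseline = time_in_system(queue_depth=10_000, arrival_rate=5_000)

# Quadruple the buffer and let it fill at the same arrival rate:
# every record now spends four times as long in the system.
bigger_buffer = time_in_system(queue_depth=40_000, arrival_rate=5_000)

print(f"baseline W = {baseline:.1f}s, with 4x queue depth W = {bigger_buffer:.1f}s")
```

The larger buffer did nothing for throughput; it only converted a capacity problem into a latency problem.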
There are two fundamental architectures for handling flow control in pipelines, and they have different failure characteristics.
Push-based systems have producers that push data to consumers. If the consumer can't keep up, the producer either drops data, blocks, or overflows a buffer. Kafka's producer API is push-based at the application level — your code calls producer.send() and it goes. HTTP webhooks are push-based. The failure mode is that you need an explicit mechanism to communicate "slow down" back to the producer, because nothing in the protocol itself prevents overflow.
Pull-based systems have consumers that request data from producers. Kafka's consumer API is actually pull-based at the protocol level — the consumer calls poll() to fetch records. This means the consumer inherently controls its own rate. HTTP long-polling is pull-based. Apache Flink's source operators are pull-based internally. The failure mode is different: if a consumer never pulls (or pulls slowly), the upstream buffer fills up, but at least the consumer isn't being overwhelmed.
Key insight: Most real pipelines are hybrid. Kafka producers push data into Kafka (which acts as a buffer), and Kafka consumers pull from Kafka. The backpressure from a slow downstream consumer manifests as growing consumer lag in Kafka, not as pressure on the original producer. This is both the strength and the danger of using Kafka as a pipeline backbone — it decouples producers from consumers, but it also means slow consumers don't automatically slow down fast producers. You have to implement that feedback loop yourself if you want it.
Flink implements one of the most sophisticated backpressure mechanisms in the stream processing world, and understanding it will sharpen your thinking about what good backpressure looks like in any system.
Flink uses a credit-based flow control system between operators. When an upstream operator wants to send data to a downstream operator's input buffer, it first checks how many credits the downstream operator has advertised. Credits represent available buffer space. If the downstream operator has no credits available (its buffers are full because it's processing slowly), the upstream operator cannot send, and it blocks. That blocking propagates back through the operator chain — eventually reaching the source, which stops pulling from Kafka.
Here's what this looks like in practice. You have a Flink job with three operators:
Kafka Source → Enrichment Operator → Sink (Postgres)
If the Postgres sink is slow (maybe the database is under load, or you're doing single-row inserts), its input buffers fill. The enrichment operator can't send data downstream, so its output buffers fill. The Kafka source can't send to the enrichment operator, so it stops calling poll() on the Kafka consumer client. Consumer lag starts growing. Flink's backpressure monitoring UI will show the enrichment operator and source as "HIGH" backpressure.
Importantly, Flink's backpressure monitoring doesn't just tell you that there's backpressure — it tells you where in the operator graph the pressure is originating. When you see backpressure in Flink, the bottleneck is downstream of where you're seeing the pressure signal. This trips up engineers constantly: they see the source operator is under backpressure and try to speed up the source, when the problem is actually in the sink.
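The credit idea can be illustrated with a toy simulation. This is a simplified model written for this lesson, not Flink's actual network stack: the class and function names, the tick-based loop, and the rates are all assumptions chosen to show how a sender stalls once the receiver runs out of credits.

```python
from collections import deque


class Receiver:
    """Downstream side: owns a bounded buffer and advertises free slots as credits."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.buffer = deque()

    def credits(self) -> int:
        return self.capacity - len(self.buffer)

    def accept(self, record) -> None:
        if self.credits() <= 0:
            raise RuntimeError("sender must not send without credit")
        self.buffer.append(record)

    def process(self, n: int) -> None:
        # Drain up to n records, which frees up credits for the sender.
        for _ in range(min(n, len(self.buffer))):
            self.buffer.popleft()


def simulate(ticks: int, send_rate: int, drain_rate: int, capacity: int):
    """Return (records sent, blocked send attempts, records still queued)."""
    receiver = Receiver(capacity)
    sent = blocked = 0
    for _ in range(ticks):
        for _ in range(send_rate):
            if receiver.credits() > 0:
                receiver.accept(sent)
                sent += 1
            else:
                blocked += 1  # no credit: the sender waits instead of overflowing
        receiver.process(drain_rate)
    return sent, blocked, len(receiver.buffer)


sent, blocked, queued = simulate(ticks=100, send_rate=10, drain_rate=4, capacity=20)
print(f"sent={sent} blocked={blocked} queued={queued}")
```

After a short warm-up the sender's effective rate converges on the receiver's drain rate, and the queue never exceeds its capacity. That is the property you want from any backpressure mechanism.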
Random tuning is how you spend three days changing configuration values and end up with a system that's marginally better but you don't know why. Systematic tuning starts with a model.
For each stage in your pipeline, you want to know its maximum throughput — the number of records per second it can process when it is the only bottleneck. You measure this in isolation, not end-to-end.
For a Kafka consumer doing some transformation and writing to a downstream store, you'd measure:
Deserialization throughput: How fast can you deserialize records from Kafka? If you're using Avro with Schema Registry, this includes a schema lookup per unique schema ID. With caching, this is fast. Without caching, you'll see it.
Processing throughput: How fast can the transformation logic run? Profile this with realistic data — size distributions matter. A JSON record with a 50-byte payload transforms very differently from one with a 50KB payload.
Sink throughput: How fast can you write to the downstream store? For databases, this depends enormously on batch size, connection pool size, index configuration, and whether you're doing upserts vs. inserts.
Here's a simple Python script to measure deserialization throughput in isolation:
import time
import json
from confluent_kafka import Consumer, KafkaError
def measure_deserialization_throughput(topic: str, bootstrap_servers: str,
                                       sample_size: int = 100_000) -> float:
    """
    Measure deserialization throughput without doing any downstream work.
    Returns records per second.
    """
    consumer = Consumer({
        'bootstrap.servers': bootstrap_servers,
        'group.id': 'throughput-test-' + str(int(time.time())),
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,
        # Maximize fetch size to isolate deserialization, not network
        'fetch.max.bytes': 52428800,  # 50MB
        'max.partition.fetch.bytes': 10485760,  # 10MB
    })
    consumer.subscribe([topic])
    records_processed = 0
    bytes_processed = 0
    start_time = None
    try:
        while records_processed < sample_size:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                raise Exception(f"Consumer error: {msg.error()}")
            if start_time is None:
                start_time = time.monotonic()
            # This is the operation we're measuring
            payload = json.loads(msg.value().decode('utf-8'))
            records_processed += 1
            bytes_processed += len(msg.value())
    finally:
        consumer.close()
    elapsed = time.monotonic() - start_time
    throughput_rps = records_processed / elapsed
    throughput_mbps = (bytes_processed / elapsed) / (1024 * 1024)
    print(f"Deserialization throughput: {throughput_rps:,.0f} records/sec")
    print(f"Data throughput: {throughput_mbps:.2f} MB/sec")
    print(f"Average record size: {bytes_processed/records_processed:.0f} bytes")
    return throughput_rps
Run this for each stage in isolation. Then you can build a throughput table:
| Stage | Max Throughput (RPS) | Limiting Factor |
|---|---|---|
| Kafka Consumer Poll | 450,000 | Network bandwidth |
| JSON Deserialization | 280,000 | CPU |
| Enrichment (DB lookup) | 12,000 | DB round-trip latency |
| Postgres Sink (single-row) | 3,500 | DB write latency |
| Postgres Sink (batch=500) | 95,000 | Commit overhead |
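The bottleneck is obvious immediately: single-row Postgres inserts at 3,500 RPS, while the deserialization stage upstream can do 280,000 RPS. This is the stage you need to fix first; tuning anything else has no effect on end-to-end throughput until you do. Note what the table predicts next: once the sink is batched (95,000 RPS), the enrichment lookup at 12,000 RPS becomes the new bottleneck.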
Warning: This kind of isolated measurement can be misleading if your stages share resources. If your enrichment stage and sink stage both use a connection pool to the same Postgres instance, measuring them in isolation won't reveal the contention. Always validate your model with end-to-end load tests after you've identified and fixed the initial bottleneck.
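A throughput table like the one above is easy to reason about in code. The following is a small sketch using the table's figures; the function name is hypothetical, and the model deliberately ignores the shared-resource contention described in the warning.

```python
def bottleneck(stage_rps: dict) -> tuple:
    """Return (stage name, throughput) for the slowest stage."""
    name = min(stage_rps, key=stage_rps.get)
    return name, stage_rps[name]


# Figures copied from the throughput table above.
stages = {
    "Kafka Consumer Poll": 450_000,
    "JSON Deserialization": 280_000,
    "Enrichment (DB lookup)": 12_000,
    "Postgres Sink (single-row)": 3_500,
}
name, rps = bottleneck(stages)
print(f"Current bottleneck: {name} at {rps:,} RPS")

# Model the fix: replace single-row inserts with batched inserts.
after_fix = {k: v for k, v in stages.items() if k != "Postgres Sink (single-row)"}
after_fix["Postgres Sink (batch=500)"] = 95_000
next_name, next_rps = bottleneck(after_fix)
print(f"Next bottleneck after batching: {next_name} at {next_rps:,} RPS")
```

Fixing one bottleneck always exposes the next one, so it pays to re-run this kind of check after every change.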
There's another result from queueing theory that engineers routinely violate: the utilization law. For a single-server queue (M/M/1 in queueing theory notation), as utilization ρ approaches 1 (100%), queue length and latency approach infinity.
Average number of items in the system (waiting plus in service) = ρ / (1 - ρ)

- At 50% utilization: 1
- At 80% utilization: 4
- At 90% utilization: 9
- At 95% utilization: 19
- At 99% utilization: 99
This is why you should never design a pipeline that runs at 95%+ of its theoretical maximum throughput under normal load. When you do, any transient spike will cause queues to grow explosively. You need headroom. A practical rule: design for 70% utilization of the bottleneck stage. That gives you enough headroom to absorb spikes while keeping queues under control.
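The ρ / (1 - ρ) relationship and the 70% rule can be checked with a few lines of code. This is a sketch under the M/M/1 assumptions stated above; the function names and the 95,000 RPS example are illustrative.

```python
def mm1_items_in_system(utilization: float) -> float:
    """Average number of items in an M/M/1 system: rho / (1 - rho)."""
    if not 0 <= utilization < 1:
        raise ValueError("utilization must be in [0, 1)")
    return utilization / (1 - utilization)


def target_input_rate(bottleneck_rps: float, target_utilization: float = 0.70) -> float:
    """Highest sustained input rate that keeps the bottleneck at the target utilization."""
    return bottleneck_rps * target_utilization


for rho in (0.50, 0.80, 0.90, 0.95, 0.99):
    print(f"{rho:.0%} utilization -> {mm1_items_in_system(rho):.0f} items in system")

# Hypothetical bottleneck: a batched sink measured at 95,000 RPS.
print(f"Plan for at most {target_input_rate(95_000):,.0f} RPS of sustained input")
```

The curve is what matters: going from 90% to 99% utilization multiplies the backlog elevenfold, which is why a little headroom buys a lot of stability.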
Now we get to the actual work: you have a pipeline that's misbehaving, and you need to find out why. Here's a systematic diagnostic approach.
Every bottleneck in a data pipeline comes down to contention for one of four resources: CPU, memory, network bandwidth, or disk/database I/O. Before looking at application-level metrics, establish which resource is saturated.
On the host running your consumer or processing application:
# CPU: Is any process pegged?
top -H -p $(pgrep -f your-app)
# Or use htop for a nicer view with per-thread CPU
htop -p $(pgrep -f your-app)
# Memory: Are you GC-thrashing?
# For JVM applications, watch for frequent GC pauses
jstat -gcutil $(pgrep -f your-app) 1000
# Network: Are you saturating the NIC?
sar -n DEV 1 10
# Disk I/O: Are writes blocked?
iostat -x 1 10
A common mistake is to look only at CPU. Many pipeline bottlenecks are I/O bound — especially anything involving database writes — and CPU usage will be low even when the system is completely stuck waiting for I/O.
When working with Kafka-based pipelines, there are specific metrics you should always check first.
Consumer Lag by Partition: Aggregate consumer lag hides important information. If 95% of your lag is concentrated in two partitions, you have a partitioning skew problem, not a general throughput problem. Check per-partition lag:
# Using the Kafka CLI
kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--describe \
--group your-consumer-group
# Look for the LAG column — sort by it mentally
# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# orders-consumer orders 0 10234567 10234901 334
# orders-consumer orders 1 10198234 10237456 39222
# orders-consumer orders 2 10241234 10241298 64
If you see the pattern above — most lag in partition 1 — that partition is receiving disproportionate data. Check your producer's partitioning key. If you're partitioning by customer_id and one customer is generating 80% of your events, you'll never solve this with consumer scaling.
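Rather than sorting the LAG column by eye, you can compute each partition's share of the total lag. The sketch below uses the three lag values from the sample output above; the function names and the "more than 2x the mean" threshold are assumptions you should tune for your own topics.

```python
def lag_share(lag_by_partition: dict) -> dict:
    """Fraction of total consumer lag held by each partition."""
    total = sum(lag_by_partition.values())
    if total == 0:
        return {p: 0.0 for p in lag_by_partition}
    return {p: lag / total for p, lag in lag_by_partition.items()}


def skewed_partitions(lag_by_partition: dict, factor: float = 2.0) -> list:
    """Partitions whose lag exceeds `factor` times the mean lag."""
    if not lag_by_partition:
        return []
    mean = sum(lag_by_partition.values()) / len(lag_by_partition)
    return sorted(p for p, lag in lag_by_partition.items() if lag > factor * mean)


# Lag values taken from the kafka-consumer-groups.sh output above.
lags = {0: 334, 1: 39222, 2: 64}
shares = lag_share(lags)
hot = skewed_partitions(lags)

for partition, share in sorted(shares.items()):
    print(f"partition {partition}: {share:.1%} of total lag")
print(f"skewed partitions: {hot}")
```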
Fetch Latency vs. Processing Time: Understanding whether your consumer is slow because it can't get data fast enough from Kafka, or because it's processing records slowly, changes the solution entirely.
import time
from confluent_kafka import Consumer
consumer = Consumer({
'bootstrap.servers': 'kafka:9092',
'group.id': 'diagnostic-consumer',
'enable.auto.commit': False,
})
consumer.subscribe(['orders'])
fetch_times = []
process_times = []
for _ in range(10000):
    fetch_start = time.monotonic()
    msg = consumer.poll(timeout=5.0)
    fetch_end = time.monotonic()
    # Compare against None explicitly: a message with an empty value is falsy
    if msg is not None and not msg.error():
        fetch_times.append(fetch_end - fetch_start)
        process_start = time.monotonic()
        # Your actual processing logic here; process_record() and
        # write_to_sink() stand in for your own functions
        result = process_record(msg)
        write_to_sink(result)
        process_end = time.monotonic()
        process_times.append(process_end - process_start)
consumer.close()
if not fetch_times:
    raise SystemExit("No records consumed; nothing to measure")
avg_fetch_ms = sum(fetch_times) / len(fetch_times) * 1000
avg_process_ms = sum(process_times) / len(process_times) * 1000
print(f"Avg fetch time: {avg_fetch_ms:.2f}ms")
print(f"Avg process time: {avg_process_ms:.2f}ms")
print(f"Fetch/Process ratio: {avg_fetch_ms/avg_process_ms:.2f}")
If fetch time dominates (ratio > 1), your bottleneck is in getting data from Kafka. Check network bandwidth, fetch.max.bytes, and whether your partitions are too few to support parallel consumption.
If process time dominates (ratio << 1), your bottleneck is in your processing logic or sink. Focus there.
Flink's Web UI is genuinely one of the best built-in diagnostic tools in the streaming ecosystem. The backpressure tab in the job graph view shows you, for each operator, what percentage of time it's blocked waiting for downstream capacity. Here's how to interpret it:
The key insight: the bottleneck operator itself will often show low backpressure, because it's always busy processing. The operators upstream of the bottleneck will show high backpressure. So trace from the source downstream — the first operator you encounter that transitions from high backpressure to low backpressure is the bottleneck.
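The tracing rule can be written down as a short function. This is a sketch for a linear operator chain only; the function name, the operator names, and the 0.5 cutoff for "high" backpressure are assumptions for illustration, not values read from Flink.

```python
def find_bottleneck(operators, high_threshold: float = 0.5):
    """
    operators: list of (name, backpressure_ratio) ordered from source to sink.
    Returns the first operator with low backpressure that follows at least one
    operator with high backpressure, or None if nothing is backpressured.
    """
    seen_high = False
    for name, ratio in operators:
        if ratio > high_threshold:
            seen_high = True
        elif seen_high:
            return name
    return None


# Hypothetical readings for the three-operator job described earlier.
job = [("kafka-source", 0.92), ("enrichment", 0.88), ("postgres-sink", 0.02)]
healthy = [("kafka-source", 0.01), ("enrichment", 0.03), ("postgres-sink", 0.00)]

print(find_bottleneck(job))
print(find_bottleneck(healthy))
```

For job graphs with branches you would apply the same rule along each path from source to sink.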
You can also query Flink's REST API for this data programmatically:
# Get job overview
JOB_ID="your-flink-job-id"
FLINK_HOST="localhost:8081"
curl -s "${FLINK_HOST}/jobs/${JOB_ID}" | python3 -m json.tool
# Get backpressure for a specific vertex
VERTEX_ID="your-vertex-id"
curl -s "${FLINK_HOST}/jobs/${JOB_ID}/vertices/${VERTEX_ID}/backpressure" \
| python3 -c "
import json, sys
data = json.load(sys.stdin)
for subtask in data.get('subtasks', []):
print(f\"Subtask {subtask['subtask']}: {subtask['backpressure-level']} ({subtask['ratio']:.1%})\")
"
One of the trickiest pipeline problems is processing skew — where one subtask or partition is handling disproportionately more work than others. The aggregate throughput looks reasonable, but your end-to-end latency is terrible because you're always waiting for the slow subtask.
In Flink, look at the task metrics in the Web UI. Specifically:
For Kafka consumers in a consumer group, the equivalent is per-partition lag skew as described earlier.
Now that you know where your bottleneck is, here's how to actually fix it.
If your bottleneck is on the producer side (you need to get data into Kafka faster), the most impactful settings are batching-related:
from confluent_kafka import Producer
producer = Producer({
'bootstrap.servers': 'kafka1:9092,kafka2:9092,kafka3:9092',
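# Batch size in bytes. The Java client's 16KB default is often too small;
# librdkafka (which confluent_kafka wraps) uses a much larger default.
# For high-throughput scenarios, 256KB-1MB makes sense
'batch.size': 262144,
# Time to wait for batch to fill before sending.
# The Java client has historically defaulted to 0ms (send immediately);
# current librdkafka releases default to 5ms. 5-20ms significantly increases batching.
'linger.ms': 10,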
# Compression. lz4 gives excellent throughput/CPU trade-off.
# zstd gives better compression ratios for archival use cases.
'compression.type': 'lz4',
# In-flight requests per connection
# Keep at 1 if you need strict ordering; 5 for max throughput
'max.in.flight.requests.per.connection': 5,
# Enable idempotent producer to avoid duplicates with retries
# (Requires max.in.flight.requests.per.connection <= 5)
'enable.idempotence': True,
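# Memory limit for all queued, unsent records.
# This is the librdkafka name; buffer.memory is the Java client's equivalent
# and is rejected by confluent_kafka as an unknown property.
'queue.buffering.max.kbytes': 65536,  # 64MB
# The Java client's max.block.ms has no librdkafka equivalent:
# produce() raises BufferError immediately when the local queue is full.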
})
The linger.ms setting is the most underappreciated tuning knob in Kafka. At 0ms (historically the Java client's default), the producer sends each batch as soon as a record is ready, even if the batch is nearly empty. At 10ms, it waits up to 10ms to accumulate more records into a batch before sending. The throughput improvement can be 5-10x for small records, because you're amortizing the network round-trip overhead across many records.
The trade-off is latency: each record can now wait up to 10ms before it is sent (less if the batch fills first). For real-time alerting pipelines, this might be unacceptable. For analytics pipelines where the downstream query runs every 5 minutes anyway, it's irrelevant.
On the consumer side, the most impactful settings relate to fetch behavior:
from confluent_kafka import Consumer
consumer = Consumer({
'bootstrap.servers': 'kafka1:9092,kafka2:9092,kafka3:9092',
'group.id': 'order-processor-v2',
# Maximum bytes to fetch from all partitions in one request
# Default 52428800 (50MB) is usually fine; increase if records are large
'fetch.max.bytes': 104857600, # 100MB
# Maximum bytes per partition per fetch
# Must be >= max record size
'max.partition.fetch.bytes': 10485760, # 10MB
# Minimum bytes that must accumulate at broker before it responds
# 0 = respond immediately (low latency)
# >0 = wait for data to accumulate (higher throughput, higher latency)
'fetch.min.bytes': 65536, # 64KB - good for throughput-oriented consumers
# Maximum time broker will wait for fetch.min.bytes
'fetch.wait.max.ms': 500,
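# Note: max.poll.records is a Java-client setting. It is not a valid
# librdkafka property, and confluent_kafka rejects it at construction time.
# To bound the records returned per call in Python, use
# consumer.consume(num_messages=2000, timeout=1.0) instead of poll().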
# Critical: how long between polls before considered dead
# Must be > your processing time per batch
'max.poll.interval.ms': 300000,
# Session timeout - how long before consumer is removed from group
# after a heartbeat failure
'session.timeout.ms': 45000,
# Disable auto-commit; always commit manually for reliability
'enable.auto.commit': False,
})
The most dangerous misconfiguration here is max.poll.interval.ms. If your consumer takes longer to process a batch than this value, Kafka will remove it from the consumer group, trigger a rebalance, and another consumer will start processing the same records again. You'll see this as mysterious repeated processing of the same offsets, often accompanied by rebalance log messages.
The fix is not to increase max.poll.interval.ms indefinitely, because that just makes failure detection slow. The fix is to make your processing faster, to make each batch smaller (max.poll.records in the Java client, or the num_messages argument to consume() in confluent_kafka), or to process records asynchronously and commit offsets separately.
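A quick budget calculation shows whether a given batch size is safe. This is a back-of-the-envelope sketch: the function names, the 200ms per-record cost, and the 50% safety factor are assumptions, and it treats processing as strictly sequential.

```python
def batch_time_ms(records: int, per_record_ms: float) -> float:
    """Worst-case time between polls if records are processed one at a time."""
    return records * per_record_ms


def max_records_per_batch(max_poll_interval_ms: int, per_record_ms: float,
                          safety_factor: float = 0.5) -> int:
    """Largest batch that finishes within a fraction of max.poll.interval.ms."""
    if per_record_ms <= 0:
        raise ValueError("per_record_ms must be positive")
    budget_ms = max_poll_interval_ms * safety_factor
    return max(1, int(budget_ms // per_record_ms))


# Hypothetical slow sink: 200ms per record, 300,000ms poll interval.
risky = batch_time_ms(2000, 200.0)            # time needed for 2,000 records
safe = max_records_per_batch(300_000, 200.0)  # batch size that fits the budget

print(f"2,000 records take {risky:,.0f}ms; the limit is 300,000ms")
print(f"Safe batch size at 50% of the interval: {safe} records")
```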
The number of Kafka partitions is your fundamental parallelism ceiling. You cannot have more consumer instances in a group consuming concurrently than you have partitions; any extra instances sit idle. Treat partition count as a capacity-planning decision rather than a knob you adjust at runtime.
A common mistake is to create topics with 8 partitions for a use case that eventually needs 200 consumer threads to keep up. You can increase partition count later, but it will cause partition reassignment and temporary disruption, and it will break the ordering guarantees you may have been relying on.
For high-volume production topics, 100-200 partitions is not unusual. The memory overhead per partition at the broker is small; the main cost is coordination overhead during consumer group rebalances, which scales with partition count.
For Flink, the equivalent concept is parallelism. Each Flink subtask processes one or more Kafka partitions. If you have 100 Kafka partitions and Flink source parallelism of 20, each source subtask will be assigned approximately 5 partitions. If your bottleneck is in the source, increasing source parallelism (up to the partition count) will help.
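To see how partition count and parallelism interact, here is a small sketch that spreads partitions over subtasks round-robin. The round-robin rule is an illustrative assumption, not Flink's actual assignment algorithm, and the function names are hypothetical.

```python
from collections import Counter


def assign_round_robin(num_partitions: int, parallelism: int) -> dict:
    """Map each partition to a subtask index (illustrative round-robin only)."""
    return {p: p % parallelism for p in range(num_partitions)}


def partitions_per_subtask(num_partitions: int, parallelism: int) -> list:
    """Number of partitions each subtask ends up reading."""
    counts = Counter(assign_round_robin(num_partitions, parallelism).values())
    return [counts.get(i, 0) for i in range(parallelism)]


even = partitions_per_subtask(100, 20)    # divides evenly
uneven = partitions_per_subtask(100, 30)  # some subtasks carry an extra partition
over = partitions_per_subtask(8, 12)      # more subtasks than partitions

print(f"100 partitions / 20 subtasks: {sorted(set(even))} partitions each")
print(f"100 partitions / 30 subtasks: min {min(uneven)}, max {max(uneven)}")
print(f"8 partitions / 12 subtasks: {over.count(0)} idle subtasks")
```

Parallelism that does not divide the partition count evenly builds skew into the source, and parallelism above the partition count only adds idle subtasks.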
// Flink - setting operator-level parallelism
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set global default
env.setParallelism(20);
FlinkKafkaConsumer<OrderEvent> source = new FlinkKafkaConsumer<>(
"orders",
new OrderEventDeserializationSchema(),
kafkaProperties
);
DataStream<OrderEvent> orders = env
.addSource(source)
.setParallelism(100) // Match Kafka partition count
.name("kafka-source");
DataStream<EnrichedOrder> enriched = orders
.map(new EnrichmentFunction())
.setParallelism(100) // Same parallelism - no network shuffle
.name("enrichment");
// Keyed aggregation will redistribute by key
DataStream<OrderSummary> summaries = enriched
.keyBy(EnrichedOrder::getCustomerId)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new OrderAggregator())
.setParallelism(50) // Can be different from source
.name("order-aggregator");
summaries.addSink(new PostgresSink())
.setParallelism(20) // Sink parallelism bounded by DB connection pool
.name("postgres-sink");
Warning: Increasing parallelism everywhere is not free. Each additional Flink subtask consumes memory, and network shuffles between operators with different parallelism consume significant bandwidth and CPU. Measure before and after any parallelism change.
Serialization and deserialization overhead is responsible for more pipeline throughput problems than most engineers realize, because it's often invisible in application-level profiling — the time is spent inside library code that doesn't show up obviously in your stack traces.
Let's look at the real throughput numbers for common serialization formats, measured on realistic data:
import json
import time
import avro.schema
from avro.io import DatumWriter, DatumReader, BinaryEncoder, BinaryDecoder
import io
import msgpack
# Test payload representing a realistic e-commerce order event
sample_record = {
"order_id": "ord_8f3a9c2e1d4b5678",
"customer_id": "cust_12345",
"timestamp": 1698523445000,
"items": [
{"sku": "SKU-001", "quantity": 2, "unit_price": 29.99},
{"sku": "SKU-047", "quantity": 1, "unit_price": 149.99},
],
"shipping_address": {
"street": "123 Main St",
"city": "San Francisco",
"state": "CA",
"zip": "94102"
},
"total_amount": 209.97,
"currency": "USD",
"status": "confirmed"
}
NUM_ITERATIONS = 500_000
# JSON benchmark
start = time.monotonic()
for _ in range(NUM_ITERATIONS):
serialized = json.dumps(sample_record).encode('utf-8')
deserialized = json.loads(serialized)
elapsed = time.monotonic() - start
print(f"JSON: {NUM_ITERATIONS/elapsed:,.0f} round-trips/sec, "
f"{len(json.dumps(sample_record).encode()):.0f} bytes/record")
# MessagePack benchmark
start = time.monotonic()
for _ in range(NUM_ITERATIONS):
serialized = msgpack.packb(sample_record)
deserialized = msgpack.unpackb(serialized, raw=False)
elapsed = time.monotonic() - start
print(f"MessagePack: {NUM_ITERATIONS/elapsed:,.0f} round-trips/sec, "
f"{len(msgpack.packb(sample_record)):.0f} bytes/record")
On modern hardware, you'll see something like:
| Format | Throughput (serialization only) | Record Size |
|---|---|---|
| JSON | ~280,000 records/sec | 312 bytes |
| MessagePack | ~620,000 records/sec | 198 bytes |
| Avro (no schema cache) | ~45,000 records/sec | 134 bytes |
| Avro (with schema cache) | ~890,000 records/sec | 134 bytes |
| Protocol Buffers | ~1,100,000 records/sec | 128 bytes |
The Avro numbers are the most instructive. Without schema caching, every deserialization involves parsing the schema, which is catastrophically slow. With caching, Avro is among the fastest options. This is why the Confluent Schema Registry client's default behavior — caching schemas after the first lookup — is critical. If something invalidates that cache (a rolling restart of your application, a memory pressure event causing cache eviction), you'll see a sudden, dramatic drop in throughput until the cache warms back up.
In most pipeline architectures, the sink is the bottleneck. Your Kafka consumers and processing logic can handle hundreds of thousands of records per second. Your PostgreSQL database cannot. Here's how to address this gap systematically.
The difference between single-row inserts and batch inserts is typically one to two orders of magnitude. This is not an exaggeration. A PostgreSQL instance on modest hardware doing single-row inserts with network overhead might do 3,000-5,000 inserts per second. The same instance doing batch inserts of 500 rows per statement will do 80,000-120,000 inserts per second.
The reason is round-trip overhead. Each single-row insert requires:
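Steps 1-5 take roughly 1-5ms over a local network. At 2ms per round-trip, a single connection tops out at 500 records/second, regardless of how fast the database can actually write data. Reaching the 3,000-5,000 inserts per second quoted above requires either a faster round-trip or several connections inserting in parallel.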
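The round-trip ceiling is simple arithmetic, sketched below. The function name is hypothetical, and the 6ms cost for a 500-row statement is an assumed figure for illustration; measure your own.

```python
def max_rows_per_second(round_trip_ms: float, rows_per_statement: int = 1,
                        connections: int = 1) -> float:
    """Upper bound on insert rate when every statement pays one full round-trip."""
    if round_trip_ms <= 0:
        raise ValueError("round_trip_ms must be positive")
    return connections * rows_per_statement * (1000.0 / round_trip_ms)


# Single-row inserts at 2ms per round-trip, one connection.
single = max_rows_per_second(2.0)

# The same workload spread over 8 connections.
parallel = max_rows_per_second(2.0, connections=8)

# Batches of 500 rows, assuming the larger statement takes 6ms end to end.
batched = max_rows_per_second(6.0, rows_per_statement=500)

print(f"single-row, 1 connection:  {single:,.0f} rows/sec")
print(f"single-row, 8 connections: {parallel:,.0f} rows/sec")
print(f"batch of 500, 1 connection: {batched:,.0f} rows/sec")
```

Adding connections scales the ceiling linearly at best, while batching raises it by orders of magnitude on a single connection.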
With batching, you amortize steps 1-5 across many rows:
import psycopg2
from psycopg2.extras import execute_values
import time

class BatchingSink:
    def __init__(self, dsn: str, batch_size: int = 500,
                 flush_interval_seconds: float = 2.0):
        self.conn = psycopg2.connect(dsn)
        self.batch_size = batch_size
        self.flush_interval = flush_interval_seconds
        self.buffer = []
        self.last_flush = time.monotonic()

    def write(self, record: dict) -> None:
        self.buffer.append((
            record['order_id'],
            record['customer_id'],
            record['total_amount'],
            record['status'],
            record['timestamp']
        ))
        # Flush if batch is full OR if we've waited too long.
        # The interval is only checked here, so call flush() yourself
        # when the input goes idle.
        if (len(self.buffer) >= self.batch_size or
                time.monotonic() - self.last_flush > self.flush_interval):
            self.flush()

    def flush(self) -> None:
        if not self.buffer:
            return
        try:
            with self.conn.cursor() as cursor:
                # Note: Postgres rejects an ON CONFLICT DO UPDATE statement
                # that contains the same order_id twice; deduplicate first
                # if your batches can contain repeated keys.
                execute_values(
                    cursor,
                    """
                    INSERT INTO orders (order_id, customer_id, total_amount,
                                        status, event_timestamp)
                    VALUES %s
                    ON CONFLICT (order_id) DO UPDATE SET
                        status = EXCLUDED.status,
                        total_amount = EXCLUDED.total_amount
                    """,
                    self.buffer,
                    page_size=len(self.buffer)  # Send as one batch
                )
            self.conn.commit()
        except Exception:
            # Roll back and keep the buffer, so a failed batch is not
            # silently discarded before the caller sees the error
            self.conn.rollback()
            raise
        # Only clear the buffer once the batch is durably committed
        self.buffer.clear()
        self.last_flush = time.monotonic()
Warning: Batching introduces a trade-off with reliability. If your application crashes after accumulating 499 records but before flushing, those records are lost (or will be reprocessed, potentially creating duplicates, depending on your offset commit strategy). The flush interval is your maximum potential data delay. Design this based on your business requirements, not purely on throughput.
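One common way to get at-least-once behavior is to commit Kafka offsets only after the sink has flushed successfully. The loop below is a sketch of that ordering, not a complete consumer: the function name and the max_polls parameter are hypothetical, the consumer is assumed to behave like a confluent_kafka Consumer (poll() and commit(asynchronous=False)), and the sink is any object with write() and flush().

```python
import json


def consume_at_least_once(consumer, sink, batch_size: int = 500, max_polls=None) -> None:
    """Write records to the sink and commit offsets only after a successful flush."""
    pending = 0
    polls = 0
    while max_polls is None or polls < max_polls:
        polls += 1
        msg = consumer.poll(1.0)
        if msg is not None:
            if msg.error():
                raise RuntimeError(f"Consumer error: {msg.error()}")
            sink.write(json.loads(msg.value()))
            pending += 1
        idle = msg is None
        # Flush on a full batch, or when the input goes quiet with data pending.
        if pending and (pending >= batch_size or idle):
            sink.flush()                         # 1. make the batch durable
            consumer.commit(asynchronous=False)  # 2. only then advance offsets
            pending = 0
    if pending:
        sink.flush()
        consumer.commit(asynchronous=False)
```

If the process crashes between the flush and the commit, the batch is re-read and written again after restart. That is why the sink needs idempotent writes, such as the ON CONFLICT upsert shown above.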
A common misconfiguration is setting the connection pool too large. More connections to PostgreSQL doesn't always mean more throughput — Postgres has significant per-connection overhead (each connection runs in its own OS process under the default architecture), and above a certain point, connection overhead starts to dominate.
The practical maximum is usually 2-4 connections per CPU core on the database server. With a 16-core database server, you rarely benefit from more than 64 connections. Using PgBouncer in transaction-mode pooling is the standard solution for handling many application-side connections efficiently.
For your Flink sink operators, remember that each subtask opens its own pool, so the total number of database connections is sink parallelism times pool size. Keep the per-subtask pool small:
// In a Flink RichSinkFunction
public class PostgresSinkFunction extends RichSinkFunction<EnrichedOrder> {
private transient HikariDataSource dataSource;
private transient List<EnrichedOrder> buffer;
private static final int BATCH_SIZE = 500;
@Override
public void open(Configuration parameters) {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:postgresql://db:5432/orders");
config.setUsername("pipeline_user");
config.setPassword(System.getenv("DB_PASSWORD"));
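// One connection per subtask is usually sufficient
// The batching handles throughput, not connection count
config.setMaximumPoolSize(2);
// flushBuffer() calls commit() explicitly, so auto-commit must be off
// (the PostgreSQL JDBC driver rejects commit() in auto-commit mode)
config.setAutoCommit(false);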
config.setMinimumIdle(1);
config.setConnectionTimeout(10000);
config.setIdleTimeout(600000);
dataSource = new HikariDataSource(config);
buffer = new ArrayList<>(BATCH_SIZE + 10);
}
@Override
public void invoke(EnrichedOrder order, Context context) throws Exception {
buffer.add(order);
if (buffer.size() >= BATCH_SIZE) {
flushBuffer();
}
}
private void flushBuffer() throws SQLException {
if (buffer.isEmpty()) return;
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(
"INSERT INTO enriched_orders (order_id, customer_id, ...) " +
"VALUES (?, ?, ...) ON CONFLICT (order_id) DO UPDATE SET ..."
)) {
for (EnrichedOrder order : buffer) {
stmt.setString(1, order.getOrderId());
stmt.setString(2, order.getCustomerId());
// ... set other fields
stmt.addBatch();
}
stmt.executeBatch();
conn.commit();
}
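buffer.clear();
}
// Flush any remaining records and release the pool on shutdown
@Override
public void close() throws Exception {
if (buffer != null) {
flushBuffer();
}
if (dataSource != null) {
dataSource.close();
}
}
}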
Understanding the happy path of backpressure is necessary but not sufficient. You also need to understand the failure modes — situations where backpressure doesn't protect you.
Any system that uses an unbounded buffer between a producer and a slow consumer is a time bomb. The buffer will grow until the machine runs out of memory, at which point you get an OOM kill and lose everything in the buffer. This is exactly what happens with naive use of Python's asyncio.Queue() with no maxsize, or with Spark Structured Streaming when maxOffsetsPerTrigger is left at its default (unset, meaning no limit) and micro-batches take longer than the trigger interval.
Always bound your buffers. The question is: what should the bound be? The answer comes from your SLA. If you can tolerate up to 30 seconds of accumulated data in memory (given your average record size), set your buffer bound accordingly. When the buffer is full, you want the system to apply real backpressure — either blocking the upstream stage or dropping data (with an explicit, monitored drop counter).
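Here is a minimal sketch of a bounded buffer that supports both policies, built on the standard library's queue.Queue. The class name, the sizing helper, and the example rates are assumptions for illustration.

```python
import queue


def buffer_bound(max_buffered_seconds: float, records_per_second: float) -> int:
    """Translate a 'seconds of data in memory' budget into a record count."""
    return int(max_buffered_seconds * records_per_second)


class BoundedBuffer:
    """Bounded hand-off between stages: block the producer, or drop and count."""

    def __init__(self, maxsize: int, drop_when_full: bool = False):
        self._q = queue.Queue(maxsize=maxsize)
        self.drop_when_full = drop_when_full
        self.dropped = 0  # export this as a metric and alert on it

    def put(self, record, timeout=None) -> bool:
        if self.drop_when_full:
            try:
                self._q.put_nowait(record)
                return True
            except queue.Full:
                self.dropped += 1
                return False
        # Blocking put: the producer stalls until the consumer frees a slot,
        # which is real backpressure.
        self._q.put(record, timeout=timeout)
        return True

    def get(self, timeout=None):
        return self._q.get(timeout=timeout)


# Hypothetical SLA: at most 30 seconds of data at 1,000 records/sec.
bound = buffer_bound(30, 1_000)

lossy = BoundedBuffer(maxsize=2, drop_when_full=True)
results = [lossy.put(i) for i in range(3)]
print(f"bound={bound}, accepted={results}, dropped={lossy.dropped}")
```

Blocking is the right default when the upstream stage can tolerate stalls. Dropping is only acceptable when losing data is cheaper than delaying it, and only if the drop counter is monitored.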
For JVM-based systems (Flink, Kafka Streams, Spark), garbage collection pauses are a source of throughput variability that doesn't show up in your application metrics. During a GC pause, all application threads stop. No records are processed. No heartbeats are sent. If your GC pauses are long enough, Kafka will consider your consumer dead and trigger a rebalance — even if your application is healthy.
Signs you have a GC problem:
jstat -gcutil showing long GC times (more than 200ms for stop-the-world collections)
Common mitigations:
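# JVM flags for a production Flink TaskManager
# These go in flink-conf.yaml under env.java.opts.taskmanager
# The GC logging flags below use JDK 8 syntax; JDK 9+ rejects several of them.
# On JDK 9+ replace them with: -Xlog:gc*:file=/opt/flink/log/gc.log:time,uptime:filecount=10,filesize=100M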
-XX:+UseG1GC
-XX:MaxGCPauseMillis=100
-XX:+PrintGCDetails
-XX:+PrintGCDateStamps
-Xloggc:/opt/flink/log/gc.log
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=10
-XX:GCLogFileSize=100M
One of the most insidious failure modes in pipeline systems is what I call the slow consumer death spiral. It works like this:
The way out of this spiral is never to tune your way through it in real-time. You need to:
This exercise will take you through diagnosing and fixing a deliberately misconfigured pipeline. You'll need Docker, Python 3.8+, and about 45 minutes.
# docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      # Single broker: the internal __consumer_offsets topic cannot use the default replication factor of 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      KAFKA_NUM_PARTITIONS: 12
  postgres:
    image: postgres:15
    ports:
      - "5432:5432"
    environment:
      POSTGRES_DB: pipeline_lab
      POSTGRES_USER: pipeline
      POSTGRES_PASSWORD: pipeline123
docker-compose up -d
# Create the topic with 12 partitions
docker-compose exec kafka kafka-topics --create \
--bootstrap-server localhost:9092 \
--topic transactions \
--partitions 12 \
--replication-factor 1
# broken_producer.py
# This producer is misconfigured for throughput. Your job: find what's wrong.
from confluent_kafka import Producer
import json
import time
import random
import uuid

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'linger.ms': 0,              # Problem 1
    'batch.size': 1024,          # Problem 2
    'compression.type': 'none',  # Problem 3
})

def generate_transaction():
    return {
        'transaction_id': str(uuid.uuid4()),
        'account_id': f'acct_{random.randint(1, 10000):05d}',
        'merchant_id': f'merch_{random.randint(1, 500):04d}',
        'amount': round(random.uniform(1.0, 5000.0), 2),
        'currency': random.choice(['USD', 'EUR', 'GBP']),
        'timestamp': int(time.time() * 1000),
        'transaction_type': random.choice(['purchase', 'refund', 'transfer']),
        'metadata': {
            'ip_address': f'{random.randint(1,255)}.{random.randint(0,255)}.{random.randint(0,255)}.{random.randint(0,255)}',
            'user_agent': 'Mozilla/5.0 (compatible payment client)',
            'session_id': str(uuid.uuid4())
        }
    }

start = time.monotonic()
records_sent = 0
try:
    while True:
        record = generate_transaction()
        try:
            producer.produce(
                'transactions',
                key=record['account_id'].encode(),
                value=json.dumps(record).encode()
            )
        except BufferError:
            # Local producer queue is full: let in-flight deliveries drain; this record is skipped.
            producer.poll(1)
            continue
        producer.poll(0)  # serve delivery callbacks without blocking
        records_sent += 1
        if records_sent % 10000 == 0:
            elapsed = time.monotonic() - start
            print(f"Sent {records_sent:,} records at {records_sent/elapsed:,.0f} rec/sec")
finally:
    producer.flush()
Your task: Run the producer for 60 seconds and note the throughput. Then identify the three misconfigurations and fix them. After fixing, run again and compare throughput. You should see at least a 5x improvement.
# broken_consumer.py
# This consumer has a throughput problem. Diagnose and fix it.
from confluent_kafka import Consumer
import json
import time
import psycopg2

conn = psycopg2.connect(
    "host=localhost dbname=pipeline_lab user=pipeline password=pipeline123"
)
# Create table if needed
with conn.cursor() as cur:
    cur.execute("""
        CREATE TABLE IF NOT EXISTS transactions (
            transaction_id VARCHAR PRIMARY KEY,
            account_id VARCHAR NOT NULL,
            amount DECIMAL(10,2),
            processed_at TIMESTAMP DEFAULT NOW()
        )
    """)
conn.commit()

# Note: max.poll.records is a Java-client setting; confluent_kafka rejects it as an unknown property.
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'transaction-processor',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True,
})
consumer.subscribe(['transactions'])

records_processed = 0
start_time = None
try:
    while True:
        # Problem: poll() returns one message at a time
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        if start_time is None:
            start_time = time.monotonic()
        record = json.loads(msg.value().decode('utf-8'))
        # Problem: single-row insert, one commit per record
        with conn.cursor() as cur:
            cur.execute("""
                INSERT INTO transactions (transaction_id, account_id, amount)
                VALUES (%s, %s, %s)
                ON CONFLICT (transaction_id) DO NOTHING
            """, (record['transaction_id'], record['account_id'], record['amount']))
        conn.commit()
        records_processed += 1
        if records_processed % 1000 == 0:
            elapsed = time.monotonic() - start_time
            print(f"Processed {records_processed:,} at {records_processed/elapsed:,.0f} rec/sec")
finally:
    consumer.close()
    conn.close()
Your task:
docker-compose exec kafka kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group transaction-processor
Expected results: The broken consumer will process roughly 800-1,500 records/second. A properly batched consumer that fetches up to 500 messages per call (consumer.consume(num_messages=500) in confluent_kafka, the counterpart of the Java client's max.poll.records=500) and uses batch inserts should process 40,000-80,000 records/second on the same hardware.
The most common mistake in pipeline optimization is spending time optimizing a stage that isn't the bottleneck. Remember: at any given time, one stage limits end-to-end throughput. Improving any other stage leaves that limit unchanged.
How to avoid it: Always build a throughput model (as described earlier) before tuning. Measure each stage in isolation. Only work on the bottleneck.
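A throughput model can be as small as a dictionary of measured per-stage rates. The sketch below is one way to build it, assuming each stage can be called on a single record in isolation; measure_stage and find_bottleneck are hypothetical helper names, and the stage names and rates in the usage note are illustrative.

```python
import time
from typing import Callable, Dict, Iterable, Tuple


def measure_stage(stage: Callable[[dict], object], records: Iterable[dict]) -> float:
    """Run one stage in isolation and return its throughput in records/sec."""
    records = list(records)
    start = time.perf_counter()
    for record in records:
        stage(record)
    elapsed = time.perf_counter() - start
    return len(records) / elapsed if elapsed > 0 else float("inf")


def find_bottleneck(throughputs: Dict[str, float]) -> Tuple[str, float]:
    """The bottleneck is the stage with the lowest maximum throughput."""
    name = min(throughputs, key=throughputs.get)
    return name, throughputs[name]
```

With a model such as {"kafka_consume": 500_000, "enrich": 45_000, "db_sink": 5_000}, find_bottleneck returns ("db_sink", 5000): tuning the consumer or the enrichment step would not move end-to-end throughput until the sink is fixed.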
High consumer lag can mean your consumer is slow, but it can also mean your producer just had a burst that the consumer hasn't caught up to yet. If lag is growing monotonically over time, you have a sustained throughput deficit. If lag spikes and then recovers, you have burst handling to think about — but your steady-state throughput may be fine.
How to check: Plot consumer lag over time (Kafka Exporter + Grafana is the standard setup). A growing-and-never-recovering trend requires throughput fixes. A sawtooth pattern (spike-recover-spike-recover) may just need better alerting thresholds.
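The same distinction can be made programmatically on lag samples taken at a fixed interval. This is a deliberately simple sketch: classify_lag is a hypothetical helper, the recovery_ratio threshold is an assumption, and real lag series are noisy enough that you would likely smooth them before applying a strict non-decreasing check.

```python
from typing import Sequence


def classify_lag(samples: Sequence[int], recovery_ratio: float = 0.2) -> str:
    """Classify consumer-lag samples collected at a fixed interval."""
    if len(samples) < 2:
        return "insufficient_data"
    peak = max(samples)
    if peak == 0:
        return "healthy"
    last = samples[-1]
    # Lag that never decreases and ends higher than it began: throughput deficit.
    never_decreases = all(b >= a for a, b in zip(samples, samples[1:]))
    if never_decreases and last > samples[0]:
        return "sustained_deficit"
    # Lag that has fallen back to a small fraction of its peak: burst, then recovery.
    if last <= peak * recovery_ratio:
        return "bursty"
    return "inconclusive"
```

A series like [100, 500, 2000, 9000, 40000] is classified as a sustained deficit, while [100, 8000, 300, 9000, 200] is classified as bursty.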
When consumers are getting removed from groups due to long processing, the instinct is to increase max.poll.interval.ms. This is masking the problem, not fixing it. A consumer that takes 10 minutes to process a batch is a consumer that will lose all its uncommitted work if it crashes, and will delay failure detection by 10 minutes.
The right fix: Reduce batch size, optimize processing speed, or process asynchronously with careful offset management.
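To pick a batch size rather than guess one, you can work backward from max.poll.interval.ms and your measured worst-case per-record processing time. The helper below is a hypothetical sketch; the headroom fraction is an assumption that reserves part of the interval for retries and slow outliers.

```python
import math


def max_safe_batch(
    max_poll_interval_ms: int,
    worst_case_ms_per_record: float,
    headroom: float = 0.5,
) -> int:
    """Largest batch that finishes within a fraction of max.poll.interval.ms."""
    if worst_case_ms_per_record <= 0:
        raise ValueError("worst_case_ms_per_record must be positive")
    budget_ms = max_poll_interval_ms * headroom
    # Always allow at least one record, even if a single record exceeds the budget.
    return max(1, math.floor(budget_ms / worst_case_ms_per_record))
```

With the Kafka default of 300,000 ms and a worst case of 20 ms per record, max_safe_batch(300_000, 20) gives 7,500 records per batch. If the result is 1, batch size is not the problem and processing speed is.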
Consumer group rebalances in Kafka cause all consumers in the group to pause while partitions are being reassigned. If your consumer group rebalances frequently (every few minutes), the cumulative pause time can significantly impact throughput — even if each individual rebalance only takes 5-10 seconds.
Common causes: consumers crashing due to processing errors, GC pauses exceeding session.timeout.ms, and rolling deployments without cooperative rebalancing configured.
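Fix: Use the cooperative sticky assignor to minimize rebalance scope (partition.assignment.strategy=cooperative-sticky in librdkafka-based clients such as confluent_kafka, or org.apache.kafka.clients.consumer.CooperativeStickyAssignor in the Java client), and ensure session.timeout.ms is comfortably larger than your worst-case GC pause time.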
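As a sketch of what this looks like for a confluent_kafka consumer, the configuration below uses the lab's group name and an illustrative 45-second session timeout. The session_timeout_is_safe helper and its safety factor of 3 are assumptions that give "comfortably larger" a concrete meaning; choose a factor that suits your own risk tolerance.

```python
# Illustrative consumer settings; the values are examples, not recommendations.
consumer_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "transaction-processor",
    "partition.assignment.strategy": "cooperative-sticky",
    "session.timeout.ms": 45000,
}


def session_timeout_is_safe(
    config: dict, worst_gc_pause_ms: float, safety_factor: float = 3.0
) -> bool:
    """True if session.timeout.ms exceeds the worst GC pause by the safety factor."""
    return config["session.timeout.ms"] >= worst_gc_pause_ms * safety_factor
```

A check like this can run at startup or in CI against the worst pause observed in your GC logs, so that a configuration change cannot silently shrink the margin.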
In Avro-based pipelines with Confluent Schema Registry, the first deserialization of each schema ID requires an HTTP call to the Schema Registry. In production, with many different schemas or schema versions, this can create a thundering herd problem during consumer startup — all consumers simultaneously making hundreds of schema lookup requests.
Fix: Pre-warm schema caches during application startup by reading a few records from each topic before starting your main processing loop. Use schema caching (enabled by default in the Confluent client). Monitor Schema Registry response times as a first-class metric.
You've covered a lot of ground in this lesson. Let's crystallize what you now know:
Backpressure is a signal, not just a symptom. Credit-based backpressure (as in Flink) is a first-class mechanism for flow control. Understanding whether your system has true backpressure or just a buffer that will eventually overflow is the first architectural question to ask.
Bottleneck identification requires measurement, not intuition. Build a throughput model by measuring each stage in isolation. The bottleneck is the stage with the lowest maximum throughput. Everything else is irrelevant until that stage is fixed.
Kafka tuning is mostly about batching. linger.ms, batch.size, and max.poll.records are your primary levers. Compression with lz4 is almost always worth the minimal CPU cost. Partition count is your parallelism ceiling and must be planned upfront.
Sink throughput is where most pipelines die. Batch writes are almost always necessary for database sinks. Single-row inserts at 5,000 records/second simply cannot compete with a Kafka cluster delivering 500,000 records/second.
Failure modes matter as much as the happy path. Unbounded buffers, GC pauses, the slow consumer death spiral, and the max.poll.interval.ms trap are all real production failures that happen to real teams.
Add metrics to your current pipeline if you haven't already. Prometheus + Kafka Exporter + Grafana is the standard stack. You cannot diagnose what you cannot see.
Implement end-to-end latency tracking by embedding event timestamps in your records and measuring the delta at your sink. This gives you ground truth on whether your pipeline is meeting its SLA, independent of throughput numbers.
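A minimal sketch of the idea, assuming records carry an epoch-millisecond timestamp field as the lab producer's records do. The function names are hypothetical, and the measurement is only as accurate as the clock synchronization between the producing and sinking hosts.

```python
import math
import time
from typing import List, Optional


def stamp(record: dict, now_ms: Optional[int] = None) -> dict:
    """Producer side: embed the event time (epoch millis) in a copy of the record."""
    stamped = dict(record)
    stamped["timestamp"] = int(time.time() * 1000) if now_ms is None else now_ms
    return stamped


def latency_ms(record: dict, now_ms: Optional[int] = None) -> int:
    """Sink side: end-to-end latency is the time at the sink minus the event time."""
    now = int(time.time() * 1000) if now_ms is None else now_ms
    return now - record["timestamp"]


def percentile(values: List[int], pct: float) -> int:
    """Nearest-rank percentile, adequate for a dashboard-style summary."""
    if not values:
        raise ValueError("no latency samples")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]
```

Report a high percentile (p95 or p99) rather than the mean: an SLA is broken by the slow tail, which an average hides.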
Explore Flink's built-in backpressure monitoring in a local cluster. Create a deliberately slow sink and watch the backpressure indicators propagate upstream through the operator graph. Seeing it happen in a controlled environment builds intuition that's hard to develop any other way.
Work through the implications of Little's Law for your specific architecture. If you know your target throughput and your acceptable maximum latency, you can calculate the queue depth you need to support, and design your buffer sizes accordingly.
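Little's Law states that the average number of items in a system equals the arrival rate multiplied by the average time each item spends in it (L = λW). The sketch below applies it to buffer sizing; the function names and the figures in the usage note are illustrative assumptions.

```python
import math


def required_queue_depth(throughput_rps: float, max_latency_s: float) -> int:
    """Little's Law: items in flight = arrival rate x time spent in the system."""
    return math.ceil(throughput_rps * max_latency_s)


def buffer_bytes(depth: int, avg_record_bytes: int) -> int:
    """Approximate memory needed to hold that many records."""
    return depth * avg_record_bytes
```

At a target of 50,000 records/second and a maximum acceptable latency of 2 seconds, required_queue_depth(50_000, 2.0) is 100,000 records in flight. At roughly 1 KB per record, that is about 100 MB of buffer, which tells you whether the design fits in memory before you build it.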
Consider the next level: once you've mastered single-pipeline bottleneck diagnosis, the next challenge is cross-pipeline resource contention — multiple pipelines competing for the same database, the same Kafka cluster, or the same Kubernetes node resources. The same principles apply, but the coordination is harder.
The engineers who are consistently good at pipeline performance aren't the ones who know all the Kafka configuration parameters by heart. They're the ones who have a systematic mental model for how data flows and pressure propagates — and who measure before they tune.