System Design: Ad Click Aggregator (10B Clicks/day, Lambda Architecture, Fraud Detection)
Goal: Build an ad click aggregation pipeline processing 10 billion clicks/day. Deduplicate, detect fraud, aggregate across 1-minute/1-hour/1-day windows, reconcile stream and batch, and produce billing-accurate reports. Sub-minute freshness for dashboards, penny-accurate batch reconciliation for billing.
Mental model -- four ideas that anchor the design:
- Counting is harder than it looks. At $0.25 avg CPC and 10B clicks/day, a 0.1% counting error is $2.5M/day. Stream gives speed, batch gives correctness. Run both; batch wins for money.
- Fraud is adversarial. Estimated to cost the ad industry tens of billions of dollars a year (figures vary widely between Juniper Research, IAS, and advertiser self-reports). Static rules catch the obvious 40%. The rest needs ML on behavioral features. Fraud runs on a parallel Flink job -- it does not block HTTP ingestion (the 202 returns in < 10 ms regardless of fraud state), but the speed-layer aggregation waits up to 5 minutes for the verdict before defaulting to LEGITIMATE. On fraud-pipeline outage, unscored clicks are corrected by the hourly batch reconciliation.
- Dedup is defense in depth. One layer is a single point of failure. Three layers -- Valkey at ingestion, Flink keyed state in stream, Spark full-data in batch -- ensure billing accuracy does not depend on any one layer.
- Money flows through a separate path. Dashboards tolerate approximation for speed. Invoices tolerate latency for precision. When they disagree, batch wins.
TL;DR: HTTP collectors dedup clicks against Valkey and produce to Kafka (256 partitions, RF=3). Flink speed layer reads raw clicks + fraud verdicts, aggregates 1-min/1-hour/1-day tumbling windows, and writes to ClickHouse for sub-minute dashboards. A parallel Flink fraud pipeline runs rule-based filters + XGBoost on behavioral features from a Valkey feature store, emitting verdicts back to Kafka. Raw events also sink to Apache Iceberg. Hourly, a Spark batch job reads Iceberg, reapplies the latest fraud model, writes penny-accurate billing aggregates to PostgreSQL, and overrides the speed layer when they disagree. Three dedup layers (Valkey best-effort, Flink keyed-state stream, Spark full-data batch) ensure no click is billed twice. Financial math uses integer micros to avoid float drift.
1. Problem
A large advertising platform must answer a deceptively simple question: how many legitimate, non-fraudulent clicks did each ad receive, and how much does the advertiser owe?
Two competing requirements make this hard:
- Real-time dashboards need sub-minute freshness. Advertisers adjust bids and budgets on live counts. A 10-minute lag means wasted spend on underperforming campaigns.
- Billing needs penny-level accuracy. An advertiser who spent $1.2M last month will dispute a $50 discrepancy. Stream-processing edge cases (late events, checkpoint replays, watermark drift) are invisible at small scale but compound into real money at 10B clicks/day.
No single processing path satisfies both. Stream gives freshness but not financial accuracy. Batch gives accuracy but not real-time visibility. Lambda architecture runs both paths in parallel -- speed layer for dashboards, batch layer for billing, batch wins when they disagree.
Simple example. A mobile user clicks an ad for running shoes at 14:32:07 UTC. Within 30 seconds, the dashboard shows the click. Within 5 minutes, the fraud pipeline scores it LEGITIMATE. At 15:15 (15 minutes past the next hour boundary, when the hourly Spark batch job runs), the batch reconciliation job writes it to the billing ledger. At month-end, an invoice includes the click at $0.25, computed in integer micros.
Scale. 10B clicks/day = ~115K/sec sustained, ~580K/sec peak US business hours, ~1M/sec Black Friday spikes. Every layer must be horizontally partitioned.
Assumptions:
- Internal platform for a single ad company, not multi-tenant SaaS
- "Click" = user clicking an ad unit. Impressions tracked separately.
- Billing = legitimate, deduplicated clicks only
- Real-time dashboards use the speed layer. Monthly invoices use the batch layer. Batch wins disputes.
- Late-arriving clicks (slow mobile connections) up to 10 minutes are accepted.
2. Requirements
Functional Requirements
| ID | Requirement | Priority |
|---|---|---|
| FR-01 | Ingest click events via HTTP and produce to durable event log | P0 |
| FR-02 | Deduplicate clicks: same user clicking same ad within 30 seconds counts as one click | P0 |
| FR-03 | Real-time aggregation across 1-minute tumbling windows for live dashboards | P0 |
| FR-04 | Hourly aggregation for campaign performance reports | P0 |
| FR-05 | Daily aggregation for billing reconciliation | P0 |
| FR-06 | Fraud detection: flag bot clicks, click farms, click injection, click flooding | P0 |
| FR-07 | Advertiser dashboard API: click counts, spend, CTR by campaign/ad group/ad/geo/device | P0 |
| FR-08 | Batch reconciliation: hourly Spark job produces billing-accurate aggregates | P0 |
| FR-09 | Billing pipeline: monthly invoice generation from batch-reconciled data | P0 |
| FR-10 | Late event handling: accept clicks arriving up to 10 minutes late | P1 |
| FR-11 | Click-level audit trail for billing disputes | P1 |
| FR-12 | Anomaly alerting: notify operations when click patterns are abnormal | P1 |
| FR-13 | Advertiser-level fraud reports: show detected fraud breakdown | P1 |
| FR-14 | Backfill support: reprocess historical data after fraud model updates | P1 |
| FR-15 | Multi-currency billing support | P2 |
Non-Functional Requirements
| ID | Requirement | Target |
|---|---|---|
| NFR-01 | Throughput | 10B clicks/day, 115K clicks/sec sustained, 1M clicks/sec peak |
| NFR-02 | Dashboard freshness | < 1 minute (speed layer aggregation latency) |
| NFR-03 | Batch accuracy | Penny-accurate: batch reconciliation must match to $0.01 per advertiser per day |
| NFR-04 | Availability | 99.99% for ingestion (52 min downtime/year), 99.9% for dashboards |
| NFR-05 | Durability | Zero click loss once accepted (at-least-once ingestion, exactly-once aggregation) |
| NFR-06 | Fraud detection latency | < 5 minutes from click to fraud verdict |
| NFR-07 | Late event tolerance | Accept events up to 10 minutes past their event timestamp |
| NFR-08 | Horizontal scalability | Linear scale-out, no single bottleneck |
| NFR-09 | Data retention | Hot: 7 days, Warm: 90 days, Cold: 3 years (legal/audit) |
| NFR-10 | Recovery Time Objective (RTO) | < 60 seconds for speed layer, < 1 hour for batch layer |
| NFR-11 | Recovery Point Objective (RPO) | 0 for raw events (Kafka RF=3), < 1 minute for aggregates |
| NFR-12 | Geographic distribution | Multi-region ingestion, single-region processing (with failover) |
3. Architecture Overview
Six responsibilities emerge, each with different scaling characteristics:
- Durable event log -- Kafka. Decouples ingestion from processing, buffers during outages, enables replay. Sole source of raw truth.
- Speed layer -- Flink. Real-time dedup, fraud-verdict join, windowed aggregation. Feeds ClickHouse and Valkey for sub-minute freshness.
- Batch layer -- Spark on Iceberg. Hourly reprocessing with exact dedup and retroactive fraud model application. Writes to PostgreSQL.
- Fraud pipeline -- parallel Flink consumer scoring each click through rules + ML. Emits verdicts back to Kafka for both speed and batch layers.
- OLAP store -- ClickHouse. Sub-second dashboard queries across high-cardinality dimensions.
- Data lake -- Iceberg on object storage. Columnar raw events for batch reprocessing, audit, and ML training.
Follow One Click
Before the detailed diagram, here is the full lifecycle in one line and one simple picture.
Three parallel paths share the same Kafka input:
- Speed path (dashboards): Kafka → Flink → ClickHouse. Sub-minute freshness, approximate.
- Fraud path (verdicts): Kafka → Fraud Detector → back into Flink and Spark. Parallel, never blocking.
- Batch path (billing): Kafka → Iceberg → Spark → PostgreSQL. Hourly, penny-accurate.
The detailed diagram below adds feedback loops, attestation, late-arrivals, and the edge blocklist. The simple picture is for orientation; the detailed one is for tracing specific failure modes.
High-Level Architecture
The diagram below traces every click end-to-end: forward data flow (solid arrows) and the feedback loops that let the pipeline learn from its own verdicts (dashed arrows). (Reading on mobile? The detailed diagram is dense -- use the Follow One Click diagram above for orientation and the 8-step walkthrough below for the full story; both are fully readable without this picture.)
How to read it. Every arrow label names the data on the wire. The four numbered subgraphs are the four stages a click moves through: ① ingestion → ② fraud scoring → ③ processing → ④ serving. Any click can be traced end-to-end.
Solid arrows = forward flow. A click enters at ① and exits at ④. 22 solid arrows covering: raw click ingestion, 3-way Kafka fan-out, fraud scoring (Phases 1-4), speed-layer aggregation, batch reconciliation (Phase 5), late-event dead letter, and dashboard/billing reads.
Dashed arrows = feedback loops. Five dashed arrows showing how verdicts reshape future behavior:
- ML feature feedback (KFV ⇢ FS ⇢ FD) -- a FRAUDULENT verdict bumps the offending IP, device, or fingerprint in the feature store. The next click from that entity hits a higher Phase 2 ML fraud score.
- Edge blocklist feedback (KFV ⇢ BL ⇢ AG) -- the worst repeat offenders are added to the edge blocklist. The next click from them is dropped at the API Gateway before it ever enters Kafka.
- Batch override (SPK ⇢ KFV) -- the hourly Spark job re-scores every click with the latest model and writes Phase 5 verdicts back to the topic. All downstream consumers see the final Phase 5 truth, overriding anything written by Phases 1-4.
Without the feedback loops the system is static -- it catches whatever the rules and the current model catch, forever. With them, every verdict becomes training signal for the next click. That is how the pipeline learns.
The five fraud phases (referenced throughout the post as P1-P5):
| Phase | What it does | Latency | Runs on |
|---|---|---|---|
| P1 | Rule engine -- bot UA, composite dedup, viewability, CTI < 2s, velocity, blocklist | < 100 ms | Flink fraud job |
| P2 | ML scoring -- XGBoost on features from the Valkey feature store | < 500 ms total | Flink fraud job |
| P3 | Aggregate analysis -- geo clustering, publisher CTR anomaly, device velocity | Seconds to minutes | Windowed Flink job |
| P4 | Post-conversion attestation -- install-referrer match, app-retention check | Hours after the click | Separate consumer |
| P5 | Batch reapply -- re-score every click with the latest model, write billing refunds | Hourly, at H+15 | Spark batch job |
P1 and P2 are per-event and fast enough to influence the speed-layer aggregation. P3, P4, and P5 are asynchronous -- their verdicts arrive after the minute window has already been sealed, and they reach the dashboard via Spark's hourly overwrite. Full details and code live in the Click Fraud Detection Pipeline section below.
Click lifecycle in 8 steps:
1. Capture. A browser, mobile app, or ad SDK posts a click to the API Gateway. The gateway runs two cheap edge checks: a dedup lookup in the Valkey dedup cache (drops replays) and a blocklist lookup in the edge blocklist (drops IPs, devices, and fingerprints flagged by the feedback loop in step 8). A click that fails either check never enters the system.
2. Log. Clicks that pass the edge land in the Kafka raw-clicks topic (ad.clicks.raw) as the single source of truth. From here the same stream fans out three ways: to the Fraud Detector for scoring, to the Flink speed layer for live aggregation, and to the Iceberg data lake for durable storage.
3. Score (phases 1-3). The Fraud Detector runs the rule engine -- bot UA, composite dedup, viewability, click-to-install timing, velocity, blocklist -- then the ML model, and emits a verdict to the fraud-verdicts topic (ad.clicks.fraud-verdicts) within ~500 ms.
4. Aggregate live. The Flink speed layer reads the raw-clicks topic and the fraud-verdicts topic together, excludes fraudulent clicks from its tumbling windows, and writes minute/hour aggregates to ClickHouse and hot campaign counters to the Valkey real-time cache. Events arriving more than 10 minutes late are side-outputted to the late-arrivals topic.
5. Persist raw. In parallel, raw clicks sink from Kafka into the Iceberg data lake. Every raw event is preserved for replay, audit, and ML training, independent of what the speed layer did with it.
6. Attest (phase 4). Install and conversion events from mobile SDKs (Play Integrity, App Attest, Install Referrer API) flow into the Post-Conversion Attestation job. It checks install-referrer match and app retention. Mismatches emit Phase 4 verdicts back to the fraud-verdicts topic, catching click injection and flooding that survived phases 1-3.
7. Reconcile (phase 5). Hourly, the Spark batch job reads raw events from the Iceberg lake, late events from the late-arrivals topic, and the latest fraud verdicts. It re-scores every click from the last 24 hours with the current model, writes corrected aggregates back to ClickHouse, and writes billing adjustments and refunds to PostgreSQL. When speed and batch disagree, batch wins.
8. Learn. Every verdict, from every phase, loops back:
   - → Valkey feature store so the next click from that device hits a higher ML fraud score
   - → edge blocklist so the next click from that IP is blocked at the gateway before it enters Kafka
   - → fraud-verdicts topic (from Spark) so all downstream readers see the final Phase 5 truth
Who reads what. The advertiser dashboard reads OLAP queries from ClickHouse, live counters from the Valkey real-time cache, and the fraud breakdown from the fraud-verdicts topic (satisfies FR-13). The billing service reads only PostgreSQL -- it never sees the speed layer, so advertisers are never billed on unreconciled numbers.
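Step 4's event-time windowing and late-event side output can be sketched in a few lines. This is a toy in-memory model, not Flink code: `SpeedLayerSketch`, its watermark handling, and the 1-minute/10-minute constants are illustrative stand-ins for the real job's keyed windows and `ad.clicks.late-arrivals` side output.

```python
from collections import defaultdict

WINDOW_MS = 60_000           # 1-minute tumbling windows
MAX_LATENESS_MS = 600_000    # NFR-07: accept events up to 10 minutes late

class SpeedLayerSketch:
    """Toy stand-in for the Flink speed layer: event-time tumbling
    windows keyed by ad_id, with a late-arrivals side output."""

    def __init__(self):
        self.windows = defaultdict(int)   # (ad_id, window_start_ms) -> count
        self.late_arrivals = []           # stand-in for ad.clicks.late-arrivals
        self.watermark_ms = 0             # highest server_timestamp seen so far

    def on_click(self, ad_id, event_ts_ms, server_ts_ms):
        self.watermark_ms = max(self.watermark_ms, server_ts_ms)
        if event_ts_ms < self.watermark_ms - MAX_LATENESS_MS:
            # Too late for the speed layer; Spark picks it up at H+15.
            self.late_arrivals.append((ad_id, event_ts_ms))
            return
        window_start = event_ts_ms - (event_ts_ms % WINDOW_MS)
        self.windows[(ad_id, window_start)] += 1

layer = SpeedLayerSketch()
layer.on_click("ad_1", 1_000_000, 1_000_000)   # on time
layer.on_click("ad_1", 1_010_000, 1_010_000)   # same 1-minute window
layer.on_click("ad_1", 300_000, 1_010_000)     # > 10 min behind watermark -> side output
```

The real job also joins fraud verdicts before counting; the sketch only shows why a late click lands in the side output instead of silently mutating a sealed window.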
4. Why Lambda (not Kappa)
Stream-only (Kappa) architectures are simpler: one pipeline, one codebase, no reconciliation. They work well when approximate counts are acceptable. They fail when money is on the line because stream processing has edge cases -- late events, checkpoint replay duplicates, watermark stalls during low traffic -- that are each individually solvable but compound into counting drift under real load. Even a tenth of a percent of $2.5B in daily ad spend is $2.5M a day -- a margin too thin to trust a single pipeline with, which is why billing needs a second opinion.
The fix is to run a batch layer in parallel that reads complete data, does exact dedup, and produces the authoritative numbers. The speed layer serves dashboards. The batch layer serves billing. When they disagree, batch wins -- not because batch is infallible (it has its own failure modes: backfill bugs, model drift, and late-late events past the 24-hour tolerance) but because its complete-data replay is the best approximation of truth the system can produce before the invoice cycle closes.
| Scenario | Lambda or Kappa? | Reason |
|---|---|---|
| Ad click billing | Lambda | Money on the line. Batch reconciliation for auditable accuracy. |
| Financial transaction processing | Lambda | Same reasoning. Regulatory audit trail required. |
| Trending content / feeds | Kappa | Approximate counts are fine. No billing. |
| Real-time monitoring / alerting | Kappa | Speed matters most. Small errors acceptable. |
| Recommendation engine features | Kappa | Features tolerate minor inaccuracies. |
| IoT sensor dashboards | Kappa | No financial implications. Watermarks handle late data. |
5. Scale Estimation
Daily clicks: 10 billion
Average rate: ~115,740 clicks/sec
Peak rate (5x): ~580,000 clicks/sec
Spike (Black Friday): ~1,000,000 clicks/sec
Click event size: ~200 bytes (Avro), ~400 bytes (JSON)
Kafka bandwidth and storage:
Ingest: 115K x 200B = ~23 MB/sec avg, ~200 MB/sec spike
Per broker cap: ~100 MB/sec writes
Brokers needed: 15 (spike capacity + RF=3 + rolling restarts across 3 AZs)
Daily raw: ~800 GB/day (LZ4 compressed), ~2.4 TB with RF=3
7-day retention: ~16.8 TB total
Flink state (speed layer):
Active ads: ~100M
Per-ad state: ~300 bytes (3 window counters + fraud accumulators + bloom)
Total keyed state: ~30 GB worst case
RocksDB on disk: 60-100 GB with compaction
Checkpoint size: 5-10 GB incremental
The 30 GB is a capacity-planning ceiling, not the steady state. The long tail of ad distribution means most ads get zero clicks in the current window + TTL, so the actual resident RocksDB state is closer to 1-10% of that (3-10 GB) on a typical day. Sizing targets the ceiling because bursts on viral ads can push state up quickly, and headroom prevents backpressure into the Kafka source.
ClickHouse + Iceberg:
ClickHouse aggregate storage: ~5 GB/day compressed (10x on aggregated rows)
Iceberg raw data lake: ~400 GB/day Parquet, ~146 TB/year, ~438 TB for 3-year retention
PostgreSQL billing: ~50 GB/year
Valkey:
Dedup: ~300 MB (up to 3.5M active keys, 30s TTL)
Fraud features: ~140 GB (see Fraud Pipeline section for breakdown)
Cluster: 12 shards x 16 GB = 192 GB capacity
The 3.5M ceiling assumes every incoming click creates a fresh dedup key (115K/sec × 30s). In practice a 3-5% duplicate hit rate trims the active keyset to ~3.3M. Either way it fits comfortably in the ~2-4 GB allocated to dedup across the Valkey cluster; sizing targets the ceiling because dedup memory is cheap and predictable.
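The estimates above can be reproduced with a few lines of arithmetic. All inputs come from the figures in this section; the 2.5x LZ4 ratio is the implied compression behind the ~800 GB/day figure.

```python
DAY_SECONDS = 86_400
CLICKS_PER_DAY = 10_000_000_000
AVRO_BYTES = 200
LZ4_RATIO = 2.5               # implied by 2 TB raw -> ~800 GB/day

sustained = CLICKS_PER_DAY / DAY_SECONDS          # ~115,740 clicks/sec
peak = sustained * 5                              # ~580K clicks/sec
ingest_mb_s = sustained * AVRO_BYTES / 1e6        # ~23 MB/sec average

raw_tb_day = CLICKS_PER_DAY * AVRO_BYTES / 1e12   # 2.0 TB/day uncompressed
compressed_gb_day = raw_tb_day * 1e3 / LZ4_RATIO  # ~800 GB/day after LZ4
replicated_tb_day = compressed_gb_day * 3 / 1e3   # ~2.4 TB/day with RF=3

# Dedup keyset ceiling: every click creates a fresh 30s-TTL key
dedup_keys = sustained * 30                       # ~3.5M active keys

# Partition headroom at the 1M/sec spike across 256 partitions
spike_per_partition = 1_000_000 / 256             # ~3,900 clicks/sec each
```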
6. Why Naive Approaches Fail
Each failure clarifies why a later design choice exists.
- OLTP row-per-click: PostgreSQL at 115K inserts/sec generates 23 MB/sec of WAL. By day three, vacuum can't keep up with dead tuples. Queries scanning billions of rows become unusable.
- Redis INCR as billing truth: Atomic and fast, but volatile. Restart loses counts. No audit trail. Producer retries double-count. Defensible for dashboards, indefensible for invoices.
- Stream-only (Kappa): Works for trending feeds, fails for money. Late events, checkpoint replays, and watermark stalls are each individually solvable, but running without a batch safety net means betting every invoice on getting every edge case right forever -- and a tenth of a percent of $2.5B in daily spend is $2.5M a day.
- Batch-only: Exact, but 1-2 hours stale. An advertiser running a flash sale can't see conversion in time to adjust bids.
- Skip fraud detection: At 10-15% industry fraud rates on $2.5B of daily spend, billing bot clicks + click farms = $250-375M/day in fraudulent charges. Advertisers detect the discrepancy, lose trust, and leave.
What's needed: speed layer for dashboards, batch layer for billing, fraud detection in parallel, three dedup layers as defense in depth. That's Lambda -- operationally complex, but the only design that satisfies both freshness and penny accuracy.
7. Technology Selection
| Store | Technology | Why |
|---|---|---|
| Event log | Apache Kafka | 10B events/day, RF=3, replayable, partitioned by ad_id, KRaft mode (no ZooKeeper) |
| Speed layer | Apache Flink + RocksDB | Exactly-once, event-time windows, incremental checkpoints |
| Data lake | Apache Iceberg on object storage | ACID transactions, time-travel for reprocessing, hidden partitioning, engine-neutral |
| Batch processing | Apache Spark | Reads Iceberg, produces reconciled aggregates |
| OLAP / dashboards | ClickHouse | Columnar, sub-second aggregations on billions of rows, native Kafka engine, materialized views |
| Dedup + fraud state | Valkey | Sub-ms lookups, TTLs, HLL for cardinality, Lua scripts for atomic ops |
| Billing ledger | PostgreSQL | ACID, audit trail, immutable after FINALIZED |
| ML fraud serving | Feature store (Valkey) + XGBoost | Real-time feature lookups, < 500ms inference |
ClickHouse vs. Druid vs. Pinot. ClickHouse wins because its native Kafka engine ingests directly from topics (at-least-once delivery with idempotent writes via ReplacingMergeTree dedup on inserted_at), ReplicatedMergeTree handles HA without external coordination, and materialized views pre-aggregate at insert time. Compression is 10-15x on click data because ad_id and campaign_id have high locality within partitions.
Iceberg vs. Delta Lake vs. Hudi. Iceberg's hidden partitioning allows partitioning by event_hour without exposing the column in the schema, and partition evolution allows adding advertiser_id as a partition dimension later without rewriting data. Flink has a native exactly-once Iceberg sink. Vendor-neutral (Apache Foundation) beats Databricks-driven Delta for multi-engine flexibility.
8. Data Model
Click Event (Kafka Avro)
{
"type": "record",
"name": "AdClickEvent",
"namespace": "com.adplatform.events",
"fields": [
{"name": "click_id", "type": "string"},
{"name": "ad_id", "type": "string"},
{"name": "campaign_id", "type": "string"},
{"name": "advertiser_id", "type": "string"},
{"name": "user_id", "type": "string"},
{"name": "ip_address", "type": "string"},
{"name": "user_agent", "type": "string"},
{"name": "device_type", "type": "string"},
{"name": "geo_country", "type": "string"},
{"name": "geo_city", "type": "string"},
{"name": "publisher_id", "type": "string"},
{"name": "placement_id", "type": "string"},
{"name": "bid_amount_micros", "type": "long"},
{"name": "event_timestamp", "type": "long", "logicalType": "timestamp-millis"},
{"name": "server_timestamp", "type": "long", "logicalType": "timestamp-millis"},
{"name": "referer_url", "type": ["null", "string"], "default": null},
{"name": "session_id", "type": "string"}
]
}
Two timestamps, two trust levels. event_timestamp comes from the client clock and can be skewed or spoofed. server_timestamp comes from the collector and is used for Flink watermarks. The gap is itself a fraud signal: legitimate clicks gap 10-200ms, fabricated events gap 0ms, replays gap > 10 min.
Why bid_amount_micros. Currency as integer micros (millionths of a dollar) avoids float drift. 250000 = $0.25. All math is integer; division to dollars happens only at display.
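The drift that micros avoid is easy to demonstrate. One caveat: the document's $0.25 example would actually hide it, because 0.25 is a power of two and sums exactly in binary floating point -- so this sketch uses an illustrative $0.10 CPC, which is not exactly representable.

```python
# Money as integer micros (1,000,000 per dollar) vs. float accumulation.
# $0.10 is used here because 0.10 has no exact binary representation;
# summing it as a float accumulates rounding error, while integer micros
# stay exact and convert to dollars only at display time.
CPC_MICROS = 100_000            # $0.10, illustrative
N_CLICKS = 1_000_000

total_micros = CPC_MICROS * N_CLICKS        # exact integer math
display_dollars = total_micros / 1_000_000  # divide only at display time

float_total = 0.0
for _ in range(N_CLICKS):
    float_total += 0.10                     # accumulates representation error
```

After a million additions `float_total` is close to, but not exactly, $100,000 -- exactly the kind of sub-cent drift that compounds into billing disputes at 10B clicks/day.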
Kafka Topics
Topic: ad.clicks.raw 256 partitions, RF=3, LZ4, 7-day retention, key=ad_id
Topic: ad.clicks.fraud-verdicts 128 partitions, RF=3, compacted, 30-day retention, key=click_id
Topic: ad.clicks.late-arrivals 64 partitions, RF=3, 7-day retention, key=ad_id
Topic: ad.clicks.aggregated 64 partitions, RF=3, 7-day retention, key=ad_id
Why partition raw by ad_id. All clicks for the same ad land on the same partition → per-ad ordering → Flink keyBy(ad_id) without repartitioning. Advertiser dashboards query by ad, so co-partitioning aligns with the query pattern.
Why 256 partitions. At 1M clicks/sec spike, each partition handles ~3,900/sec -- a single Kafka partition supports 50K+/sec, leaving 12x headroom. 64 initial Flink task slots can scale 4x without repartitioning.
Producer config: acks=all, enable.idempotence=true, retries=MAX, max.in.flight=5, compression=lz4, batch.size=64KB, linger.ms=5. Idempotence + acks=all = exactly-once at the Kafka layer.
Consumer Groups
Each processing path has its own Kafka consumer group so they consume independently, commit offsets independently, and fail independently:
| Consumer group | Topic(s) | Read by | Purpose |
|---|---|---|---|
| flink-aggregation | ad.clicks.raw + ad.clicks.fraud-verdicts | Flink speed layer | Windowed aggregation + verdict join for dashboards |
| flink-fraud-detection | ad.clicks.raw | Flink fraud job | Score every click through P1 + P2 rules and ML |
| spark-reconciliation | ad.clicks.fraud-verdicts + ad.clicks.late-arrivals (+ reads Iceberg directly) | Spark batch | Hourly reprocess, reapply latest verdicts, pick up late events |
| iceberg-sink | ad.clicks.raw | Flink Iceberg writer | Raw events → data lake for batch replay |
| post-conversion-attestation | ad.clicks.raw + external SDK events | Phase 4 consumer | Install-referrer match, app-retention check |
Different consumer groups mean that the fraud Flink job restarting does not rewind the speed Flink job, and Spark's offset commits do not interfere with either of them. Each path has its own consumer lag metric, its own alerts, and its own scaling policy -- a fraud-pipeline outage never impacts dashboard freshness or billing.
Fraud Verdict
{
"type": "record",
"name": "FraudVerdict",
"namespace": "com.adplatform.fraud",
"fields": [
{"name": "click_id", "type": "string"},
{"name": "ad_id", "type": "string"},
{"name": "verdict", "type": {"type": "enum", "name": "Verdict",
"symbols": ["LEGITIMATE", "SUSPICIOUS", "FRAUDULENT"]}},
{"name": "fraud_score", "type": "float"},
{"name": "fraud_reasons", "type": {"type": "array", "items": "string"}},
{"name": "model_version", "type": "string"},
{"name": "evaluated_at", "type": "long", "logicalType": "timestamp-millis"}
]
}
ClickHouse (Speed + Batch Aggregates)
CREATE TABLE ad_click_aggregates ON CLUSTER '{cluster}'
(
ad_id String,
campaign_id String,
advertiser_id String,
geo_country LowCardinality(String),
device_type LowCardinality(String),
window_start DateTime64(3),
window_end DateTime64(3),
window_type LowCardinality(String), -- '1min' | '1hour' | '1day'
click_count UInt64,
legitimate_clicks UInt64,
fraudulent_clicks UInt64,
suspicious_clicks UInt64,
total_spend_micros Int64,
unique_users UInt64,
inserted_at DateTime64(3) DEFAULT now64(3)
)
ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/{cluster}/tables/{shard}/ad_click_aggregates',
'{replica}', inserted_at)
PARTITION BY toYYYYMMDD(window_start)
ORDER BY (advertiser_id, campaign_id, ad_id, window_type,
window_start, geo_country, device_type)
TTL window_start + INTERVAL 90 DAY DELETE;
ReplacingMergeTree dedupes Flink's replay-after-recovery emissions using inserted_at as the version column, making writes idempotent across the Flink → ClickHouse boundary. LowCardinality dictionary-encodes geo_country (~250 distinct values) and device_type (~5), compressing 10-20x and speeding up GROUP BY.
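The replace-on-version behavior is worth making concrete. Below is a simplified in-memory model of what ClickHouse's background merges eventually do for this table: one row per ORDER BY key, highest inserted_at wins. (In real ClickHouse the collapse is lazy -- queries that must see the merged view use FINAL -- and the ORDER BY key here is truncated to two columns for brevity.)

```python
def replacing_merge(rows):
    """Simulate ReplacingMergeTree merge-time dedup: keep one row per
    (simplified) ORDER BY key, preferring the highest inserted_at."""
    latest = {}
    for row in rows:
        key = (row["ad_id"], row["window_start"])
        if key not in latest or row["inserted_at"] >= latest[key]["inserted_at"]:
            latest[key] = row
    return list(latest.values())

# A Flink checkpoint replay re-inserts the same window with a newer version:
rows = [
    {"ad_id": "ad_1", "window_start": "14:00", "click_count": 45_230, "inserted_at": 1},
    {"ad_id": "ad_1", "window_start": "14:00", "click_count": 45_230, "inserted_at": 2},
]
merged = replacing_merge(rows)   # one row survives, the replay's version
```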
Iceberg (Raw Data Lake)
CREATE TABLE ad_clicks_raw (
click_id STRING, ad_id STRING, campaign_id STRING,
advertiser_id STRING, user_id STRING, ip_address STRING,
user_agent STRING, device_type STRING, geo_country STRING,
geo_city STRING, publisher_id STRING, placement_id STRING,
bid_amount_micros BIGINT, event_timestamp TIMESTAMP,
server_timestamp TIMESTAMP, referer_url STRING, session_id STRING,
fraud_verdict STRING, fraud_score FLOAT, ingestion_hour INT
) USING iceberg
PARTITIONED BY (days(event_timestamp), advertiser_id)
TBLPROPERTIES (
'write.format.default' = 'parquet',
'write.parquet.compression-codec' = 'zstd'
);
PostgreSQL (Billing)
CREATE TABLE billing_aggregates (
advertiser_id TEXT NOT NULL,
campaign_id TEXT NOT NULL,
billing_date DATE NOT NULL,
legitimate_clicks BIGINT NOT NULL,
total_spend_micros BIGINT NOT NULL,
fraudulent_clicks BIGINT NOT NULL,
refunded_spend_micros BIGINT NOT NULL,
batch_job_id TEXT NOT NULL,
reconciled_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY (advertiser_id, campaign_id, billing_date)
);
CREATE TABLE invoices (
invoice_id TEXT PRIMARY KEY,
advertiser_id TEXT NOT NULL,
billing_month DATE NOT NULL,
total_clicks BIGINT NOT NULL,
total_spend_micros BIGINT NOT NULL,
fraud_adjustments_micros BIGINT NOT NULL,
net_amount_micros BIGINT NOT NULL,
currency TEXT NOT NULL DEFAULT 'USD',
status TEXT NOT NULL DEFAULT 'DRAFT',
generated_at TIMESTAMPTZ NOT NULL,
finalized_at TIMESTAMPTZ
);
CREATE TABLE click_audit_log (
click_id TEXT NOT NULL,
ad_id TEXT NOT NULL,
advertiser_id TEXT NOT NULL,
event_timestamp TIMESTAMPTZ NOT NULL,
fraud_verdict TEXT NOT NULL,
fraud_score REAL NOT NULL,
bid_amount_micros BIGINT NOT NULL,
batch_job_id TEXT NOT NULL,
PRIMARY KEY (click_id)
) PARTITION BY RANGE (event_timestamp);
Valkey Keys
-- Dedup (TTL = 30 seconds)
dedup:{sha256(user_id:ad_id)} SET <value> NX EX 30
-- Fraud counters (TTL = 60-300 seconds)
fraud:user_clicks:{user_id} INCR, EXPIRE 60
fraud:ip_clicks:{ip_address} INCR, EXPIRE 300
-- Real-time cache (TTL = 120 seconds)
rt:clicks:{advertiser_id}:{window_type} SET <json>
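With a real client the dedup pattern is a single call (redis-py exposes it as `r.set(key, 1, nx=True, ex=30)`). To show the semantics without a running server, here is an in-memory stand-in for the two patterns above; `MiniValkey` and its explicit `now` parameter are test scaffolding, not part of the design.

```python
import time

class MiniValkey:
    """In-memory stand-in for the two Valkey patterns above:
    SET NX EX (dedup) and INCR + EXPIRE (velocity counters)."""

    def __init__(self):
        self.store = {}    # key -> (value, expires_at)

    def _alive(self, key, now):
        entry = self.store.get(key)
        return entry is not None and entry[1] > now

    def set_nx_ex(self, key, value, ttl_s, now=None):
        now = time.time() if now is None else now
        if self._alive(key, now):
            return False                   # duplicate inside the TTL window
        self.store[key] = (value, now + ttl_s)
        return True                        # first click wins

    def incr_expire(self, key, ttl_s, now=None):
        now = time.time() if now is None else now
        count = self.store[key][0] + 1 if self._alive(key, now) else 1
        self.store[key] = (count, now + ttl_s)
        return count
```

The atomicity that matters in production -- no race between the existence check and the write -- comes for free here because the model is single-threaded; in Valkey it comes from SET NX being one command.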
9. API Design
Click Ingestion
POST /v1/click
Content-Type: application/json
{
"ad_id": "ad_12345678",
"placement_id": "plc_001",
"publisher_id": "pub_xyz",
"user_id": "usr_abc123",
"timestamp": 1707000000000
}
→ 202 Accepted { "status": "ACCEPTED", "click_id": "clk_550e8400-e29b-41d4" }
Why 202, not 201. The click is accepted into Kafka but not yet aggregated or fraud-checked. Response returns in < 10ms. Processing is async.
Server enrichment before Kafka: click_id (UUID v7, time-ordered), server_timestamp, geo_country/geo_city (MaxMind), device_type (UA parse), campaign_id/advertiser_id/bid_amount_micros (from ad metadata cache).
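Why UUID v7 specifically: the high 48 bits are a Unix-millisecond timestamp, so click_ids sort by creation time -- useful for time-range scans in the audit log. Recent Python versions ship `uuid.uuid7`; the hand-rolled sketch below (a hypothetical `uuid7_hex` helper) just makes the RFC 9562 bit layout visible.

```python
import os, time

def uuid7_hex(now_ms=None):
    """Sketch of a UUIDv7 (RFC 9562 layout): 48-bit Unix-ms timestamp,
    4-bit version, 12 random bits, 2-bit variant, 62 random bits.
    Because the timestamp occupies the high bits, IDs sort by time."""
    now_ms = int(time.time() * 1000) if now_ms is None else now_ms
    rand_a = int.from_bytes(os.urandom(2), "big") & 0x0FFF
    rand_b = int.from_bytes(os.urandom(8), "big") & 0x3FFFFFFFFFFFFFFF
    value = (now_ms & 0xFFFFFFFFFFFF) << 80   # 48-bit timestamp, bits 127..80
    value |= 0x7 << 76                        # version 7
    value |= rand_a << 64                     # 12 random bits
    value |= 0x2 << 62                        # RFC variant bits '10'
    value |= rand_b                           # 62 random bits
    return f"{value:032x}"
```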
Dashboard Query
GET /v1/analytics/clicks?advertiser_id=adv_555&window=1hour&from=...&to=...
&group_by=campaign_id,geo_country
→ 200 OK
{
"advertiser_id": "adv_555",
"window": "1hour",
"freshness_ms": 28000,
"source": "clickhouse", // or "batch_reconciled"
"data": [
{
"campaign_id": "cmp_87654321",
"geo_country": "US",
"window_start": "2026-04-01T14:00:00Z",
"legitimate_clicks": 45230,
"fraudulent_clicks": 312,
"spend_usd": 11307.50,
"unique_users": 38102
}
]
}
freshness_ms tells the dashboard how old the data is. source distinguishes speed (clickhouse) from batch-reconciled. Once batch reconciliation completes for an hour, the dashboard flips the badge to "Reconciled."
Billing Summary
GET /v1/billing/summary?advertiser_id=adv_555&month=2026-03
→ 200 OK
{
"advertiser_id": "adv_555",
"billing_month": "2026-03",
"source": "batch_reconciled",
"total_clicks": 12453210,
"legitimate_clicks": 11892340,
"fraudulent_clicks": 560870,
"total_spend_usd": 2973085.00,
"fraud_adjustments_usd": -140217.50,
"net_amount_usd": 2832867.50,
"invoice_status": "FINALIZED",
"reconciled_at": "2026-04-01T06:00:00Z"
}
Internal Sink Writes
No REST between Flink and storage; direct connections:
Flink speed → ClickHouse JDBC batch INSERT (ReplacingMergeTree idempotent)
Flink speed → Valkey Pipelined SET (overwrite, idempotent)
Spark batch → ClickHouse JDBC batch INSERT (reconciled aggregates)
Spark batch → PostgreSQL JDBC batch UPSERT (billing records)
Flink sink → Iceberg Native Flink-Iceberg connector (exactly-once via 2PC)
10. Click Ingestion and Deduplication
Mental model. Ingestion is a three-stage funnel. The HTTP collector is the wide opening (accepts everything, responds < 10ms). Valkey is the first filter (catches obvious duplicates). Kafka is the narrow end (durable, RF=3, zero loss).
What Counts as a "User" for Dedup
Most ad clicks are not from logged-in users. They come from anonymous visitors on publisher sites, mobile app users without accounts, or clicks from ad exchanges where the ad platform has no direct user relationship. The dedup logic still needs a stable identifier per visitor, so user_id in every dedup key in this post is shorthand for a composed click-identity hash built from the first available signal in this priority order:
| Priority | Identifier | Available when |
|---|---|---|
| 1 | Authenticated user_id | User is logged into the publisher site or advertiser app |
| 2 | IDFA (iOS) / GAID (Android) | Mobile app with ad-tracking permission granted |
| 3 | First-party publisher cookie | Web with cookies, not incognito |
| 4 | Device fingerprint (canvas + WebGL + audio + fonts, hashed) | Always -- computed client-side by the ad SDK |
| 5 | IP + User-Agent hash | Last-resort fallback when all else fails |
click_identity_hash = sha256(first_non_null(
user_id, idfa_or_gaid, cookie_id, device_fp, ip_ua_hash
))
The collector walks the list in order, uses the first non-null identifier, and records which tier was used as a metadata field on the click event. The Valkey dedup key becomes dedup:{click_identity_hash}:{ad_id}, and every user_id reference in the Flink keyed-state dedup and the Spark batch dedup is this composed hash.
Tier 4 (fingerprint) is the most common value in production because most ad clicks are anonymous. Two legitimate users behind the same NAT with similar devices can collide on the tier-5 fallback and have one click over-dropped, but the 30-second window keeps the collision rate well under 0.01% of total traffic. The three-layer dedup architecture tolerates these edge cases because Spark's full-data batch dedup at H+15 runs on the best identifier available at the time, not whatever Valkey had within the 30-second window.
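The tier walk and key construction above are simple enough to state directly in code. A minimal sketch, assuming the five identifiers arrive as nullable fields; `click_identity_hash` and `dedup_key` are illustrative helper names, not the collector's actual API.

```python
import hashlib

def click_identity_hash(user_id=None, idfa_or_gaid=None,
                        cookie_id=None, device_fp=None, ip_ua_hash=None):
    """Walk the identifier tiers in priority order, hash the first
    non-null one, and report which tier was used (kept as click metadata)."""
    tiers = [
        ("user_id", user_id),            # tier 1: authenticated
        ("idfa_or_gaid", idfa_or_gaid),  # tier 2: mobile ad ID
        ("cookie_id", cookie_id),        # tier 3: first-party cookie
        ("device_fp", device_fp),        # tier 4: SDK fingerprint
        ("ip_ua_hash", ip_ua_hash),      # tier 5: last-resort fallback
    ]
    for tier, value in tiers:
        if value is not None:
            return hashlib.sha256(value.encode()).hexdigest(), tier
    raise ValueError("no identifier available")

def dedup_key(identity_hash, ad_id):
    return f"dedup:{identity_hash}:{ad_id}"
```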
Ingestion Flow
Why 202 even for duplicates. Returning different status codes for new vs. duplicate clicks would let fraudsters probe the dedup window. A uniform 202 reveals nothing about internal state.
Three Layers of Deduplication
| Layer | Mechanism | Catches | Misses | Recovery |
|---|---|---|---|---|
| Valkey (ingestion) | SET NX 30s TTL | Same user + ad within 30s | Valkey down → all clicks pass through | Flink catches most; Spark catches rest |
| Flink (stream) | Keyed state with 30s TTL per (user_id, ad_id) | Duplicates escaping Valkey | Checkpoint boundary edge cases | Spark catches rest |
| Spark (batch) | dropDuplicates on full hourly data | Everything | Nothing | Final word |
If Valkey dies, clicks flow through to Kafka and Flink catches most via keyed state. Dashboards over-count by 1-3% during the outage; billing is unaffected because batch sees everything.
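The Valkey layer is a single command per click: `SET dedup:{hash}:{ad_id} 1 NX EX 30`. A toy in-memory stand-in for those semantics (illustrative only, not the production client; production fails open on Valkey errors so the click passes through and Flink/Spark catch the duplicate):

```python
import time

class TtlDedup:
    """Mimics Valkey `SET key 1 NX EX ttl`: True only for the first writer in the window."""

    def __init__(self, ttl_s: float = 30.0, clock=time.monotonic):
        self.ttl_s = ttl_s
        self.clock = clock          # injectable clock for testing
        self.store = {}             # key -> expiry time

    def set_nx(self, key: str) -> bool:
        now = self.clock()
        expires = self.store.get(key)
        if expires is not None and expires > now:
            return False            # duplicate inside the 30s window
        self.store[key] = now + self.ttl_s
        return True                 # first sight, or previous entry expired
```

Either way the collector returns 202; the boolean only decides whether the click is produced to Kafka.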
11. Click Fraud Detection Pipeline
Click fraud is estimated to cost the ad industry tens of billions of dollars annually -- Juniper Research, IAS, and advertiser self-reports disagree on the exact figure but all put it in the $40-100B range. The fraud pipeline must identify bot clicks, click farms, click injection, and click flooding -- without adding latency to the click path and without blocking legitimate traffic during outages. Unscored clicks default to LEGITIMATE and get corrected by batch reconciliation.
Fraud Taxonomy
| Fraud Type | Pattern | Detection Method | Volume |
|---|---|---|---|
| Bot clicks | Non-human UAs, headless browsers, impossible click speeds | UA + behavioral heuristics + fingerprinting | 30-40% |
| Click farms | Many clicks from same region, real humans on cheap devices | Geo clustering + conversion-rate collapse | 20-25% |
| Click injection | Malware fires fake click moments before real install | CTI < 2s hard rule + SDK attestation | 15-20% |
| Click flooding | Competitor drains budget with thousands of fake clicks | Per-advertiser velocity + CTR spike detection | 10-15% |
| Click recycling | Same click replayed with modified timestamps | Click ID dedup + timestamp patterns | 5-10% |
| Pixel stuffing | Invisible ad placements generate fake impressions/clicks | Viewability + placement validation | 5-10% |
Fraud Type Deep Dive
Each fraud class has its own signature, weak points, and detection path. The sections below are the operational details.
1. Bot Clicks (30-40%)
Pattern: Automated scripts, headless browsers, and crawlers generating clicks without a human in the loop. The bots are either rented from botnets (residential proxies + Puppeteer) or run on cheap cloud instances (datacenter IPs + Selenium).
What the ad SDK captures on the client:
page_load_to_click_ms = 0
mouse_move_events = 0
scroll_distance_px = 0
viewport_interaction = false
click_coords = (exact center)
session_length_ms = 180
user_agent = "HeadlessChrome/120.0.6099.71"
webgl_renderer = "SwiftShader" // software renderer — no GPU
navigator.webdriver = true
device_fingerprint = "f0f0f0f0..." // seen on 10,000 other "users"
Detection signals:
| Signal | Bot indicator |
|---|---|
| Time from page load to click | < 200ms (no human reads that fast) |
| Mouse movement before click | Zero pixels of travel |
| Click coordinates | Dead-center of ad, same pixel every time |
| navigator.webdriver | Set to true (Selenium/Puppeteer tell) |
| WebGL renderer | SwiftShader or llvmpipe (software, not GPU) |
| IP ASN | Known datacenter range (AWS, GCP, Azure, DigitalOcean) |
| Fingerprint reuse | Same canvas hash across thousands of "users" |
Why it's the easiest to detect: every signal above is free, client-side, and deterministic. A single rule (navigator.webdriver == true) catches a large share of script-kiddie bots; fingerprint reuse across "users" catches most of the rest. The rule engine blocks obvious bots in < 10ms without touching the ML model. Bots are cheap to generate but also cheap to filter -- they are the low-hanging fruit of fraud.
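Those deterministic checks collapse into a single predicate. A hedged sketch (thresholds and field names are illustrative; the production path is the Phase 1 Java rule engine):

```python
# Software renderers that indicate no real GPU — a headless/VM tell.
SOFTWARE_RENDERERS = {"SwiftShader", "llvmpipe"}

def is_obvious_bot(sig: dict) -> bool:
    """Per-event deterministic bot checks; any single match is enough to block."""
    return (
        sig.get("navigator_webdriver") is True              # Selenium/Puppeteer tell
        or "HeadlessChrome" in sig.get("user_agent", "")    # headless UA
        or sig.get("webgl_renderer") in SOFTWARE_RENDERERS  # software renderer
        or sig.get("page_load_to_click_ms", 10_000) < 200   # no human reads that fast
    )
```

Every input is free and client-side, which is why this tier runs in under 10 ms without touching the ML model.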
2. Click Farms (20-25%)
Pattern: Real humans on real phones clicking real ads in buildings full of cheap labor. Each worker has a legitimate device, a real SIM, and a real residential IP. Behavior looks indistinguishable from organic traffic on any single click.
What the system sees in aggregate:
clicks = 1,000
unique_devices = 1,000 // each worker has a phone
unique_ips = 400 // residential + mobile carrier IPs
geo = concentrated (1 small town, 2-hour radius)
device_class = 85% low-end Android, 2-3 year old models
active_hours = 02:00 - 06:00 local (graveyard shift)
session_length_ms = realistic (humans are actually clicking)
conversion_rate = 0.00% // nobody ever installs or buys
post_install_open = 0 // apps installed get uninstalled in < 60s
Detection signals (all weak individually, strong in aggregate):
- Geographic clustering -- thousands of clicks from a single postal code when the ad targets a whole country.
- Conversion collapse -- CTR is normal but the downstream funnel (installs, purchases, signups) is zero.
- Device homogeneity -- 90% of clicks come from the same 3 phone models at the same OS patch level.
- Abnormal hours -- clicks concentrated in local night hours, when electricity and labor are cheapest.
- Low-quality device tells -- no Bluetooth accessories paired, default wallpapers, factory-fresh system fonts.
Why it's the hardest to detect: everything a normal fraud check looks for -- UA, timing, mouse events, fingerprints -- is genuine. Click farms cannot be blocked with client-side rules. Detection has to live at the aggregate layer: cluster the clicks, look at population statistics, and flag the whole cluster rather than individual events. Short-term click farms that rotate device pools and keep daily volume under thresholds can evade detection for weeks. This is why click farms sit at the top of the difficulty ranking.
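The aggregate-layer check can be sketched as a per-cluster predicate, assuming counters like the ones shown above (all thresholds are illustrative; production tunes them per region):

```python
def flag_farm_cluster(cluster: dict) -> bool:
    """Population-level click-farm heuristic over one geo cluster.

    Individually weak signals (volume, conversion collapse, device
    homogeneity) only become decisive in combination.
    """
    conversion_rate = cluster["conversions"] / max(cluster["clicks"], 1)
    return (
        cluster["clicks"] >= 1_000                       # enough volume to judge
        and conversion_rate < 0.001                      # conversion collapse
        and cluster["top3_device_models_share"] > 0.80   # device homogeneity
    )
```

The verdict applies to the whole cluster, then expands back to every click_id inside it, since no single event carries a reliable signal.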
3. Click Injection (15-20%)
Pattern: Mobile-specific fraud. Malware on the user's phone listens for app installs (via Android broadcast receivers or iOS URL schemes). The moment a legitimate install begins, the malware fires a fake click event claiming credit for that install, poisoning the attribution last-click window.
Timeline of a real install vs. an injected click:
Organic install (real):
10:00:00 user taps "Install" in Play Store
10:00:15 APK download completes
10:00:28 first app open
→ no prior ad click exists → attributed as organic (free to advertiser)
Injected fraud:
10:00:04.800 malware detects APK install broadcast, fires fake click
10:00:05.000 install completes, SDK reports attribution
→ 200ms gap between "click" and "install" → advertiser charged
The smoking gun is the click-to-install (CTI) distribution.
Legitimate install CTI histogram:
0-2s: 0.1% (impossible for a real store redirect)
2-10s: 5% (fast installers on good WiFi)
10-60s: 55% (typical)
1-5min: 30%
5min+: 10% (decided later, came back)
Injected install CTI:
0-2s: 97% (the malware fires right before the install completes)
2s+: 3%
Detection logic:
- Hard CTI rule: `click → install < 2s` → always fraud. There is no physical way to redirect through the store and finish an install that fast.
- CTI distribution check per publisher: if a publisher's CTI median is under 10 seconds, their entire traffic is suspect.
- SDK attestation: Google Play Integrity and Apple App Attest verify the install came from a genuine store, not a sideloaded APK that triggered its own fake broadcast.
- Install referrer matching: Google's Install Referrer API returns the actual referring click; if it doesn't match the attribution claim, the attribution is fake.
Click injection is moderately easy to catch given the physics of the CTI window. The hard part is having the mobile-specific telemetry (install broadcasts, Play Integrity, referrer API) wired into the fraud pipeline at all.
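The two CTI checks reduce to a few lines, using the thresholds from the text (2s hard block, 10s suspect median); a sketch, not the production rule engine:

```python
from statistics import median

CTI_HARD_BLOCK_MS = 2_000           # physically impossible store redirect + install
PUB_MEDIAN_SUSPECT_MS = 10_000      # publisher-level distribution check

def injected_click(cti_ms: int) -> bool:
    """Hard rule: click-to-install under 2 seconds is always fraud."""
    return cti_ms < CTI_HARD_BLOCK_MS

def publisher_suspect(cti_samples_ms: list) -> bool:
    """A sub-10s median CTI taints the publisher's entire traffic."""
    return median(cti_samples_ms) < PUB_MEDIAN_SUSPECT_MS
```

The per-click rule catches the 97% spike in the 0-2s bucket; the median check catches injection networks that pad their fake clicks to sit just above the hard threshold.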
4. Click Flooding (10-15%)
Pattern: Fraudster sends thousands of fake clicks for a device ID, blanketing the attribution window. When that user later installs the app organically (or any other user on a shared device ID), the last click wins attribution and the fraudster collects the payout.
The flooding arithmetic:
Attribution window: 7 days
Target: users about to install a popular game
Fraudster strategy:
- Harvest 10M device IDs from a leaked list
- For each device, send 1 click/day to every popular app on the network
- Per device cost: ~$0 (clicks are free to generate)
- Expected conversions: 0.5% of those users will organically install one of the apps
- 10M devices × 0.5% × $2 CPI = $100,000 stolen per day
Why it's uniquely expensive: unlike bots or pixel stuffing (which inflate worthless metrics), click flooding redirects real conversions. The user was going to install the app anyway. The advertiser was going to get that install for free (organic) or through a legitimate ad network. Instead, the flooder hijacks the last-click credit and bills the advertiser full CPI for an install the fraudster had nothing to do with. It is the only fraud type where the detected revenue leak is always 100% incremental loss.
Detection signals:
- Clicks-per-device velocity -- one device receiving 200 click events from 50 different publishers in 5 minutes is impossible human behavior.
- CTR anomaly per publisher -- CTR suddenly 10x baseline with no change in traffic source.
- Low click-to-install ratio -- a legitimate publisher with 1M clicks/day and 10K installs (1% CR) flips to 10M clicks/day and still 10K installs (0.1% CR).
- Time distribution flattening -- clicks spread uniformly across 24 hours instead of following the human diurnal curve.
- Install referrer mismatch -- the store-level install referrer names a different publisher than the one claiming credit.
The defense is mostly per-device click velocity caps + install-referrer validation. Publishers with a flat click distribution and no conversion lift get their payouts withheld and investigated.
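A sketch of the per-device velocity cap (the 5-minute window and the cap of 20 match the R4 thresholds used later in the rule engine; in practice they are tuned per network):

```python
def flooded_device(click_times_s: list, window_s: float = 300.0, cap: int = 20) -> bool:
    """True if any sliding window of `window_s` seconds holds more than
    `cap` click events for one device ID — impossible human behavior."""
    times = sorted(click_times_s)
    lo = 0
    for hi, t in enumerate(times):
        while t - times[lo] > window_s:
            lo += 1                       # slide window start forward
        if hi - lo + 1 > cap:
            return True
    return False
```

A flooded device shows hundreds of clicks in minutes from unrelated publishers; a real user at one click a minute never trips the cap.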
5. Click Recycling (5-10%)
Pattern: Replay attack. The fraudster captures a single legitimate click event (by proxying traffic or scraping server logs) and replays it with a new timestamp to generate duplicate billable events.
What the fraud looks like:
Original click:
click_id = "abc123"
timestamp = 2026-04-10T10:00:00Z
user_id = "user-42"
ad_id = "ad-99"
device_fp = "fp-xyz"
Replay 1 (5 min later):
click_id = "abc123" // same
timestamp = 2026-04-10T10:05:00Z // bumped
user_id = "user-42"
ad_id = "ad-99"
device_fp = "fp-xyz"
Replay 2 (10 min later):
click_id = "abc123" // same
timestamp = 2026-04-10T10:10:00Z // bumped
Detection: click recycling is the easiest fraud to stop because the dedup infrastructure the aggregator already has catches it for free. The three-layer dedup (Valkey primary, Flink RocksDB, batch reapply) drops any click_id it has seen before, regardless of timestamp. Sophisticated recyclers rotate click_id but keep the other parameters -- detection then keys on a composite fingerprint (user_id + ad_id + device_fp + ip + event_timestamp bucketed to 1 second). Clicks with identical composites and different click_ids are flagged as replayed.
Recycling exists because some fraud rings get caught generating fake clicks and pivot to reusing real ones. But the attack has a low ceiling: a captured click is only billable once per dedup window, so reuse requires constant fresh capture. Low ROI for the fraudster, low cost to the advertiser.
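The composite fingerprint described above can be sketched as a key function (field names are illustrative; 1-second timestamp bucketing as stated):

```python
import hashlib

def composite_replay_key(click: dict, bucket_s: int = 1) -> str:
    """Replay-detection key: user_id + ad_id + device_fp + ip + timestamp
    bucketed to `bucket_s` seconds. Identical composites under different
    click_ids are flagged as recycled."""
    bucket = int(click["event_ts_s"]) // bucket_s
    raw = "|".join([click["user_id"], click["ad_id"],
                    click["device_fp"], click["ip"], str(bucket)])
    return hashlib.sha256(raw.encode()).hexdigest()
```

Note the key deliberately excludes click_id, so rotating click_id (the sophisticated recycler's move) does not change the composite.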
6. Pixel Stuffing (5-10%)
Pattern: Publisher places ads where the user cannot see them:
- 1x1 iframes -- ad renders in a literal 1-pixel container
- Off-screen placement -- `position: absolute; left: -9999px`
- Hidden behind content -- `z-index: -1` or covered by another element
- Ad stacking -- multiple ads in one container, all fire impressions
- Below-the-fold on infinite scroll -- ad loads at 50,000px, user scrolls to 800
Detection via IntersectionObserver client-side:
const observer = new IntersectionObserver((entries) => {
entries.forEach(entry => {
report({
ad_id: entry.target.dataset.adId,
visible_pct: entry.intersectionRatio,
ad_w: entry.boundingClientRect.width,
ad_h: entry.boundingClientRect.height,
scroll_y: window.scrollY,
      time_in_view_ms: Date.now() - Number(entry.target.dataset.firstSeen),
});
});
}, { threshold: [0, 0.25, 0.5, 0.75, 1.0] });
MRC viewability standard: an ad impression is only countable if at least 50% of the pixels are in view for at least 1 continuous second (2 seconds for video). Anything below that threshold is a discarded impression, not a billable one.
Fraud rules:
- `ad_w < 10 || ad_h < 10` → pixel stuffing
- `visible_pct == 0` for the entire session → hidden ad
- `time_in_view_ms < 1000` → unviewable (MRC failure)
- Multiple ads reporting identical scroll position and viewport → ad stacking
- `ad_y > scroll_max_y` → below the fold, never reached
Pixel stuffing mostly inflates impression counts rather than clicks, and when it does generate clicks the viewability signal has already caught the impression. Low cost to the advertiser because most ad contracts now require MRC-viewable impressions to bill.
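The viewability rules combine into one verdict function. A sketch, assuming per-session maxima aggregated from the IntersectionObserver reports (field names like `max_visible_pct` are illustrative):

```python
from typing import Optional

def pixel_stuffing_verdict(m: dict) -> Optional[str]:
    """Apply the viewability rules in order; returns a reason or None if clean."""
    if m["ad_w"] < 10 or m["ad_h"] < 10:
        return "PIXEL_STUFFED"          # 1x1-style container
    if m["max_visible_pct"] == 0:
        return "HIDDEN_AD"              # never intersected the viewport
    if m["time_in_view_ms"] < 1000 or m["max_visible_pct"] < 0.5:
        return "MRC_UNVIEWABLE"         # < 50% of pixels for < 1 continuous second
    return None
```

Anything returning a reason is a discarded impression under the MRC gate, never a billable one.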
Difficulty vs. Cost Ranking
Not all fraud is equally hard to catch, and difficulty doesn't track cost.
| Rank | Difficulty (hard → easy) | Cost (expensive → cheap) |
|---|---|---|
| 1 | Click farms (real humans, aggregate-only) | Click flooding (hijacks real conversions, 100% incremental loss) |
| 2 | Click flooding (cross-publisher correlation needed) | Click farms (wastes ad spend at scale) |
| 3 | Click injection (needs mobile install telemetry) | Click injection (last-click hijack on mobile installs) |
| 4 | Bot clicks (client-side signals catch them in < 10ms) | Bot clicks (mostly filtered before billing) |
| 5 | Click recycling (dedup catches for free) | Pixel stuffing (MRC gating caps damage) |
| 6 | Pixel stuffing (IntersectionObserver deterministic) | Click recycling (dedup bounds the loss) |
The rankings diverge on purpose. Click flooding is mid-difficulty but top-cost -- the dollars are in the attribution hijack, not the click volume. Click farms are top-difficulty but second-cost. An efficient fraud team allocates detection effort to the cost ranking, not the volume ranking: blocking 10M bot clicks is less valuable than blocking 10,000 flooded installs.
Fraud Detection Architecture
The pipeline is layered so each fraud class is caught at the cheapest stage that can reliably detect it. Every stage emits verdicts to the same Kafka topic so downstream consumers see a unified stream.
Fraud type → stage mapping:
| Fraud Type | Caught at | Why |
|---|---|---|
| Bot clicks | Phase 1 (R1, R4) | webdriver flag, headless UA, datacenter ASN — deterministic |
| Click recycling | Phase 1 (R2) | Composite dedup already runs for billing accuracy |
| Pixel stuffing | Client SDK + Phase 1 (R3) | IntersectionObserver verdict applied via MRC gate |
| Click injection | Phase 1 (R5) + Phase 4 | CTI < 2s is an absolute block; referrer mismatch catches the rest |
| Click flooding | Phase 1 (R4) + Phase 3 (AG2/AG3) + Phase 4 | Velocity filter → publisher CTR anomaly → referrer mismatch |
| Click farms | Phase 2 + Phase 3 (AG1/AG4) + Phase 5 | No per-event signal reliable; needs population-level clustering + batch re-score |
Feedback loop. Verdicts from every phase write back to the Valkey feature store. A device flagged at Phase 4 becomes a feature the Phase 2 model uses on the next click from that device. Slow offline detection becomes fast online features within one feature-store write cycle.
Fraud Scoring Flow
Per-event phases 1-2 run synchronously and emit verdicts in ~500ms. Phase 3 runs on windowed streams (seconds to minutes later). Phase 4 runs when downstream SDKs report install/conversion events (hours later). Phase 5 runs at the next hourly Spark batch. Every phase writes to ad.clicks.fraud-verdicts; the most recent verdict per click_id wins, with Phase 5 (batch) as the final authority.
Phase 1: Rule Engine (Java)
Rules are fast, deterministic, and auditable. R1-R6 implement the nodes in the architecture diagram, each targeting a specific fraud type. The engine short-circuits on the first blocking rule but accumulates reasons for non-blocking rules (so a click can be flagged for both IP flood and velocity at once).
Shortened skeleton showing the shape of the engine. The full implementation adds UA pattern lists, the datacenter ASN table, and per-device blocklist lookups; the R2/R3 checks are collapsed to one-liners here for brevity.
public class FraudRuleEngine {
public FraudRuleResult evaluate(ClickEvent click, FraudCounters counters) {
// R1 — Bot: webdriver flag, headless UA, datacenter ASN → BLOCK
if (click.getSdkSignals().navigatorWebdriver()) return blocked("WEBDRIVER");
if (isBotUserAgent(click.getUserAgent())) return blocked("BOT_UA");
if (isDatacenterAsn(click.getIpAsn())) return blocked("DATACENTER");
// R2 — Recycling: composite-fingerprint dedup → BLOCK
if (dedup.seen(compositeKey(click))) return blocked("CLICK_RECYCLED");
// R3 — Pixel stuffing: MRC viewability gate → BLOCK
if (!meetsMrcViewability(click.viewability())) return blocked("UNVIEWABLE");
// R5 — Click injection: CTI < 2s hard rule → BLOCK
if (click.isMobileAppInstall() && click.getClickToInstallMs() < 2_000)
return blocked("CTI_INJECTION");
// R6 — Blocklist: populated by phases 3-5 feedback → BLOCK
if (blocklist.contains(click.getIp(), click.getDeviceFp()))
return blocked("BLOCKLIST");
// R4 — Velocity: non-blocking, accumulates reasons for ML → SUSPICIOUS
List<String> reasons = new ArrayList<>();
if (click.getTimeSinceImpression() < 200) reasons.add("TIMING");
if (counters.userClicks(click, Duration.ofMinutes(1)) > 5) reasons.add("USER_VELOCITY");
if (counters.ipClicks(click, Duration.ofMinutes(5)) > 50) reasons.add("IP_FLOOD");
if (counters.deviceClicks(click, Duration.ofMinutes(5)) > 20) reasons.add("DEVICE_FLOOD");
return reasons.isEmpty() ? passed() : suspicious(reasons); // ML decides
}
}
Velocity (R4) is deliberately non-blocking -- a flash sale can legitimately produce a click spike, so the engine passes the signal to the ML model and lets it decide.
Phase 2: ML Features (Python)
The ML model catches sophisticated fraud that rules miss: farms using real browsers, slow distributed attacks, coordinated fraud rings, and low-velocity flooding under the R4 thresholds. Features map to the five Valkey namespaces (FS1-FS5) + a geo cluster namespace.
The full production model uses ~40 features. Ten representative ones -- one or two per namespace -- show the shape of the input vector:
def extract_features(click, fs):
user = fs.get(f"user:{click.user_id}")
ip = fs.get(f"ip:{click.ip_address}")
ad = fs.get(f"ad:{click.ad_id}")
pub = fs.get(f"pub:{click.publisher_id}")
fp = fs.get(f"fp:{click.device_fingerprint}")
geo = fs.get(f"geo:{click.geo_cell}") # S2 cell level 12
return {
# FS1 — user behavior
"user_click_count_1h": user.clicks_last_hour,
"user_unique_ads_1h": user.unique_ads_last_hour,
# FS2 — IP reputation
"ip_unique_users_1h": ip.unique_users_last_hour,
"ip_is_datacenter": ip.is_datacenter,
# FS3 — ad-level anomaly
"ad_ctr_deviation_sigma": (ad.ctr_1h - ad.ctr_7d_avg) / max(ad.ctr_7d_std, 1e-3),
# FS4 — publisher CTI histogram (catches injection)
"pub_cti_median_ms": pub.cti_median_ms,
"pub_referrer_match_pct": pub.install_referrer_match_pct,
# FS5 — device fingerprint history
"fp_distinct_user_ids": fp.distinct_user_ids, # > 1 is suspicious
"fp_age_hours": fp.first_seen_hours_ago,
# Geo cluster (catches click farms)
"geo_device_homogeneity": geo.top3_device_models_share,
    }
Feature → fraud attribution:
- FS1 (user) flags accounts clicking more ads/hour than humanly possible
- FS2 (IP) flags datacenter/VPN/residential proxies and low-diversity IPs
- FS3 (ad) flags CTR spikes without matching conversion lift
- FS4 (publisher CTI) flags impossible click-to-install medians, the injection fingerprint
- FS5 (device fingerprint) flags browsers appearing under many user IDs -- coordinated fraud rings
- Geo cluster flags S2 cells with high volume + low conversion + homogeneous devices -- click farms
The full feature set adds timing signals (time_since_impression_ms, viewable_pct, mouse_moved_before_click), more user counters (24h, unique devices, mouse-move session ratio), more IP signals (VPN, residential proxy, ASN reputation, country mismatch), more ad signals (CR collapse ratio, 7d baselines), more publisher signals (under-10s CTI %, 7d fraud rate), more fingerprint signals (distinct IPs, total conversions), more geo signals (click count, unique users, CR), and calendar features (hour of day, weekend) -- ~40 features total in production.
Feature Store Updates (Valkey)
Every click triggers a pipelined MULTI/EXEC updating five namespaces in a single round trip. Writes happen after the fraud job emits its per-click verdict, so updates never block scoring.
Sliding-window TTLs. Feature-store entries use TTL-based sliding windows: velocity features expire in 30s-5m, aggregate features in 10-60m, and reputation / blocklist features decay over 1-30 days to prevent stale behavior from biasing fraud scores. All features are updated online and automatically expire, so the model reflects recent behavior rather than lifetime history.
MULTI
-- FS1: per-user
INCR fraud:user:{user_id}:clicks_1h
EXPIRE fraud:user:{user_id}:clicks_1h 3600
PFADD fraud:user:{user_id}:unique_ads_1h {ad_id}
EXPIRE fraud:user:{user_id}:unique_ads_1h 3600
-- FS2: per-IP
INCR fraud:ip:{ip}:clicks_5m
EXPIRE fraud:ip:{ip}:clicks_5m 300
PFADD fraud:ip:{ip}:unique_users_1h {user_id}
PFADD fraud:ip:{ip}:unique_devices_1h {device_fp}
EXPIRE fraud:ip:{ip}:unique_users_1h 3600
EXPIRE fraud:ip:{ip}:unique_devices_1h 3600
-- FS5: device fingerprint history
INCR fraud:fp:{device_fp}:clicks_total
PFADD fraud:fp:{device_fp}:user_ids {user_id}
PFADD fraud:fp:{device_fp}:ips {ip}
SET fraud:fp:{device_fp}:first_seen {now} NX
-- Geo cluster (S2 cell level 12, ~1 km²)
INCR fraud:geo:{s2_cell}:clicks_1h
EXPIRE fraud:geo:{s2_cell}:clicks_1h 3600
PFADD fraud:geo:{s2_cell}:users_1h {user_id}
EXPIRE fraud:geo:{s2_cell}:users_1h 3600
HINCRBY fraud:geo:{s2_cell}:device_models {device_model} 1
EXPIRE fraud:geo:{s2_cell}:device_models 3600
EXEC
CTI histogram (Phase 4 callback, off the click path): when an install-referrer event arrives, update the publisher's 7-day t-digest sketch (TDIGEST.ADD fraud:pub:{publisher_id}:cti_7d {click_to_install_ms}) and increment match/mismatch counters. Injection networks staying just above the 2-second hard rule still stand out in the 7-day percentile distribution.
Memory sizing (140 GB total):
FS1 — Per-user counters + HLL (20M users): ~26 GB
FS2 — Per-IP counters + HLL (5M IPs): ~25 GB
FS3 — Per-ad CTR baselines (2M ads): ~0.4 GB
FS4 — Per-publisher CTI sketch (200K pubs): ~0.4 GB
FS5 — Device fingerprint history (50M fps, 7d): ~75 GB ← dominant
Geo cluster (500K S2 cells): ~15 GB
Total: ~140 GB
Valkey cluster: 12 shards × 16 GB = 192 GB
FS5 dominates because it retains 7 days (vs. 1-hour TTLs on user/IP). Fraud rings rotate IPs faster than browser fingerprints, so long retention on fingerprints is the highest-value tradeoff.
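The sizing table reproduces as back-of-envelope arithmetic. The per-entry byte costs below are assumptions chosen to be consistent with the stated subtotals (counters plus HLL/sketch overhead), not measured values:

```python
GB = 1024**3

# entity count × assumed bytes per entry (counters + HLLs/sketches)
namespaces = {
    "FS1_user": 20_000_000 * 1_400,   # per-user counters + 2 HLLs  (~26 GB)
    "FS2_ip":    5_000_000 * 5_400,   # per-IP counters + 3 HLLs    (~25 GB)
    "FS3_ad":    2_000_000 *   200,   # CTR baselines               (~0.4 GB)
    "FS4_pub":     200_000 * 2_000,   # t-digest CTI sketch         (~0.4 GB)
    "FS5_fp":   50_000_000 * 1_600,   # 7-day fingerprint history   (~75 GB)
    "geo":         500_000 * 32_000,  # HLL + device-model hash     (~15 GB)
}
total_gb = sum(namespaces.values()) / GB   # ≈ 140 GB, matching the table
```

The 12 × 16 GB cluster leaves ~50 GB of headroom over the working set for replication buffers and growth.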
Device Fingerprinting
Sophisticated fraudsters rotate user agents and IPs but reuse browsers. A fingerprint is a hash of browser characteristics, collected client-side by the ad SDK:
| Signal | Stability | Forgery Difficulty |
|---|---|---|
| Canvas fingerprint | High | Medium |
| WebGL renderer | High | Medium |
| Audio context fingerprint | High | Hard |
| Screen resolution + color depth | Medium | Easy |
| Installed fonts | Medium | Medium |
| Navigator properties (language, platform, cores) | Medium | Easy |
| Timezone + clock skew | Medium | Medium |
Not perfect (fingerprints can be spoofed, browser updates break them), but a valuable signal layer on top of IP and UA analysis.
Walkthrough: 10,000 Fraudulent Clicks in One Hour
A concrete trace tying the whole pipeline together. Assume an advertiser gets 10,000 fraudulent clicks mixed into a normal hour of traffic. The fraud mix is realistic: bots, farms, flooding, injection, recycling, and pixel stuffing. CPC is $0.25, so the total at risk is $2,500.
The fraud breakdown:
| Fraud type | Count | What makes it hard |
|---|---|---|
| Bot clicks | 4,000 | Most obvious (webdriver flag, datacenter IP) |
| Click farms | 2,500 | Per-event signals all look legitimate |
| Click flooding | 1,500 | Distributed across many publishers |
| Click injection | 1,000 | Mobile-only; needs CTI telemetry |
| Click recycling | 500 | Replay attack on captured click_ids |
| Pixel stuffing | 500 | Invisible ad placements |
| Total | 10,000 | Theoretical charge: $2,500 |
Step 0 — Edge blocklist (before Kafka). 1,000 of the flooded clicks come from IPs already flagged by yesterday's Phase 5 batch, so they hit BL at the API Gateway and are dropped before entering Kafka. Never counted anywhere. 9,000 clicks remain.
Step 1 — Phase 1 rule engine (within 100 ms of hitting Kafka). The deterministic rules catch the obvious offenders and emit FRAUDULENT verdicts to ad.clicks.fraud-verdicts:
- 3,000 bots (R1: webdriver flag + datacenter ASN + headless UA)
- 500 recycled clicks (R2: composite dedup)
- 500 pixel stuffing (R3: MRC viewability gate)
- 1,000 click injection (R5: CTI < 2s hard rule)
Phase 1 verdict: 5,000 FRAUDULENT.
Step 2 — Phase 2 ML model (within 500 ms total). Passes through the feature store + XGBoost:
- 800 more bot clicks (residential-proxy botnet caught by fingerprint reuse + low mouse-move pct)
- 300 click farm clicks that happened to cluster on low `ip_unique_users_1h`
- 200 more low-velocity flooding
Phase 2 verdict: 1,300 FRAUDULENT.
After per-event phases: 6,300 caught, 2,700 still looking LEGITIMATE. Those 2,700 are mostly click farms (2,200), low-velocity flooding (300), and stealthy bots (200) that all pass client-side checks.
Step 3 — Flink speed layer (same second). Flink joins each click with its verdict from ad.clicks.fraud-verdicts:
- The 6,300 FRAUDULENT clicks land in `fraudulent_clicks` on the minute aggregate. They are not added to `legitimate_clicks` or `total_spend_micros`.
- The 2,700 still-LEGITIMATE clicks are counted normally: +2,700 `legitimate_clicks`, +$675 spend.
- Aggregates flush to ClickHouse + Valkey real-time cache. Dashboard is now over-counted by $675.
Step 4 — Phase 3 aggregate analysis (seconds to minutes later). Windowed Flink jobs running in parallel:
- Geo cluster detector — 1,350 of the "legitimate" clicks are concentrated in one S2 cell with zero conversions and 85% device homogeneity. All 1,350 flagged as click farms.
- Publisher CTR anomaly — `pub_xyz` is running at 10x its 7-day baseline CTR with flat install numbers. 200 more flagged.
- Device-ID velocity — ~50 devices with 200+ clicks each across unrelated publishers. 150 more flagged.
Phase 3 verdict: 1,700 FRAUDULENT. These are cluster-level verdicts; each one expands back to every click_id in the offending cluster. Verdicts written to ad.clicks.fraud-verdicts.
Phase 3 does not retroactively rewrite the Flink minute window -- that aggregate has already been sealed and pushed to ClickHouse. The dashboard is still over-counted. But the verdicts are now queued for the batch layer.
Step 5 — Phase 4 post-conversion attestation (hours later). As install events trickle back from the mobile SDKs:
- Install-referrer mismatch — 300 of the flooded clicks have install-referrer data naming a different publisher than the one claiming credit. Flagged.
- App-open retention failure — 500 installs attributed to the click farm never opened the app. Flagged.
Phase 4 verdict: 800 FRAUDULENT.
Step 6 — Phase 5 batch reapply (at H+15, the hourly Spark job). Spark reads every raw click for the hour from Iceberg, joins with every verdict from ad.clicks.fraud-verdicts (Phase 1 + 2 + 3 + 4 = 8,800 verdicts), and re-scores everything with the latest ML model. The newer model catches another 150 subtle stealth-bot clicks.
Phase 5 final: 8,950 FRAUDULENT, 50 LEGITIMATE. Spark still misses 50 -- those will only surface in a future backfill (FR-14) once the model improves.
Spark then writes two things:
- ClickHouse — corrected aggregates. Overwrites the speed-layer numbers for that hour via ReplacingMergeTree (which dedupes on `inserted_at`). Legitimate clicks drop from 2,700 → 50. Fraudulent clicks rise from 6,300 → 8,950. Dashboard badge flips from "Estimated" to "Reconciled".
- PostgreSQL — billing_aggregates. UPSERT the row: `legitimate_clicks=50`, `total_spend_micros=12_500_000` ($12.50), `fraudulent_clicks=8,950`, `refunded_spend_micros=2_237_500_000` ($2,237.50).
Step 7 — Feedback loop. Every verdict (Phase 1-5) writes back into the system:
- Feature store — the IPs, device fingerprints, and publisher IDs on the offending clicks now have worse ML features. The next click from these hits a higher `fp_distinct_user_ids`, higher `ip_asn_reputation`, and higher `pub_fraud_rate_7d`.
- Edge blocklist — the worst 3,000 IPs and 500 device fingerprints are added to BL. The next click from them is blocked at the API Gateway before it even reaches Kafka.
Step 8 — Who sees what:
| Consumer | Sees | When | Accurate? |
|---|---|---|---|
| Advertiser dashboard | 6,300 fraud, 2,700 legitimate, +$675 spend | Real time | Off by $675 (optimistic) |
| Advertiser dashboard | 8,950 fraud, 50 legitimate, $12.50 spend, "Reconciled" badge | H+15 | Accurate for 99.5% of clicks |
| Billing service | 50 legitimate clicks, $12.50, refund of $2,237.50 | H+15 (reads PostgreSQL only) | Accurate — never saw the inflated $675 |
| Fraud report API (FR-13) | 8,950 fraud breakdown by type | H+15 | Accurate |
| Dispute investigation | click_audit_log row per click_id with final verdict + batch_job_id | Any time | Traceable per-click |
The key property: the billing service only reads PostgreSQL and only sees data written by the Spark batch job. The advertiser is never billed for the 2,700 clicks the speed layer over-counted, because billing never reads the speed layer. The dashboard is wrong for 15 minutes, but the invoice is right from the start.
The 50 clicks that slip through all five phases cost $12.50 and are logged in click_audit_log for future backfill. At industry fraud catch rates of 99.5% (common for top-tier platforms), this is within acceptable tolerance. If a better model arrives via FR-14, the historical data can be re-scored and the advertiser credited retroactively.
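Every dollar figure in the walkthrough follows from integer-micros arithmetic. A sketch (`CPC_MICROS` assumes the flat $0.25 CPC used throughout; production stores per-auction prices, all in integer micros to avoid float drift):

```python
MICROS_PER_DOLLAR = 1_000_000
CPC_MICROS = 250_000                      # $0.25, exact in integer micros

def spend_micros(clicks: int) -> int:
    """Spend for a click count at the flat illustrative CPC."""
    return clicks * CPC_MICROS

dashboard_overcount = spend_micros(2_700)  # speed-layer optimism: $675
billed = spend_micros(50)                  # final legitimate spend: $12.50
refunded = spend_micros(8_950)             # fraud refund: $2,237.50
```

Because every intermediate value is an integer, the batch and speed layers can be compared for exact equality at reconciliation time; no epsilon tolerance is needed.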
12. Exactly-Once Aggregation with Flink
Mental model. Flink's exactly-once bundles three things per checkpoint: Kafka offsets (where to resume), RocksDB state (accumulated windows), and pending sink commits (pre-staged ClickHouse writes). On recovery, all three restore to a consistent point. Events since the checkpoint replay from Kafka, state rebuilds identically, and pending pre-commits from the failed attempt abort before new results are emitted. Strictly speaking this is effectively exactly-once rather than atomic across systems: the Flink→ClickHouse 2PC relies on the sink's ReplacingMergeTree to dedupe idempotent retries on inserted_at, not on a true cross-system atomic commit.
Checkpoint Mechanism
Flink Job Config
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
env.getCheckpointConfig().setCheckpointTimeout(180_000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // incremental
KafkaSource<ClickEvent> source = KafkaSource.<ClickEvent>builder()
.setBootstrapServers("kafka:9092")
.setTopics("ad.clicks.raw")
.setGroupId("flink-click-aggregation")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setDeserializer(new ClickEventAvroDeserializer())
.build();
DataStream<ClickEvent> clicks = env
.fromSource(source, WatermarkStrategy
.<ClickEvent>forBoundedOutOfOrderness(Duration.ofMinutes(10))
.withTimestampAssigner((event, ts) -> event.getServerTimestamp())
.withIdleness(Duration.ofMinutes(2)),
"kafka-source")
.uid("kafka-source").name("Ad Click Events");
10-minute bounded out-of-orderness. Mobile clicks can arrive up to 10 minutes late. The watermark lags 10 minutes behind the most recent event timestamp, so late arrivals within that bound still land in the correct window. Events that arrive after the watermark has passed their window's allowed lateness go to a side output.
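The watermark mechanics can be modeled outside Flink. A minimal Python sketch of what `forBoundedOutOfOrderness(Duration.ofMinutes(10))` does (class and method names are illustrative, not Flink API):

```python
from datetime import datetime, timedelta

class BoundedOutOfOrdernessWatermark:
    """Sketch: the watermark trails the max event timestamp seen so far by the
    out-of-orderness bound; events behind the watermark are considered late."""

    def __init__(self, bound=timedelta(minutes=10)):
        self.bound = bound
        self.max_event_ts = None

    def on_event(self, event_ts):
        # Watermark only advances; a late event never moves it backward
        if self.max_event_ts is None or event_ts > self.max_event_ts:
            self.max_event_ts = event_ts

    def current_watermark(self):
        return None if self.max_event_ts is None else self.max_event_ts - self.bound

    def is_late(self, event_ts):
        wm = self.current_watermark()
        return wm is not None and event_ts < wm
```

A click arriving 9 minutes behind the newest event is still on time; one arriving 11 minutes behind is late and takes the side-output path.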
Pipeline
// Dedup + enrich
DataStream<ClickEvent> deduped = clicks
.keyBy(ClickEvent::getDeduplicationKey) // SHA-256(user_id + ad_id)
.process(new ClickDeduplicationFunction()) // 30s TTL
.uid("dedup").name("Click Deduplication");
// Join with Phase 1+2 fraud verdicts (per-event, ~500ms SLA from the fraud job)
// Phase 3/4/5 verdicts arrive too late for the speed layer and are applied by batch (Spark).
DataStream<EnrichedClick> enriched = deduped
.connect(fraudVerdicts.keyBy(FraudVerdict::getClickId))
.process(new FraudJoinFunction()) // waits up to 5 min for P1/P2 verdict, defaults to LEGITIMATE
.uid("fraud-join").name("Fraud Verdict Join");
KeyedStream<EnrichedClick, String> keyedByAd = enriched.keyBy(EnrichedClick::getAdId);
// 1-minute tumbling
DataStream<ClickAggregate> minuteAggs = keyedByAd
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.minutes(10))
.sideOutputLateData(lateEvents)
.aggregate(new ClickAggregateFunction(), new ClickWindowFunction("1min"))
.uid("window-1min").name("1-Minute Aggregation");
// 1-hour tumbling (continuous trigger every 1 min)
DataStream<ClickAggregate> hourlyAggs = keyedByAd
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
.aggregate(new ClickAggregateFunction(), new ClickWindowFunction("1hour"))
.uid("window-1hour").name("1-Hour Aggregation");
// 1-day tumbling (continuous trigger every 5 min)
DataStream<ClickAggregate> dailyAggs = keyedByAd
.window(TumblingEventTimeWindows.of(Time.days(1)))
.trigger(ContinuousEventTimeTrigger.of(Time.minutes(5)))
.aggregate(new ClickAggregateFunction(), new ClickWindowFunction("1day"))
.uid("window-1day").name("1-Day Aggregation");
Aggregate Function
public class ClickAggregateFunction
implements AggregateFunction<EnrichedClick, ClickAccumulator, ClickAggregate> {
@Override public ClickAccumulator createAccumulator() { return new ClickAccumulator(); }
@Override public ClickAccumulator add(EnrichedClick click, ClickAccumulator acc) {
acc.totalClicks++;
switch (click.getFraudVerdict()) {
case LEGITIMATE:
acc.legitimateClicks++;
acc.totalSpendMicros += click.getBidAmountMicros();
acc.uniqueUsers.add(click.getUserId()); // HyperLogLog
break;
case SUSPICIOUS:
acc.suspiciousClicks++;
acc.totalSpendMicros += click.getBidAmountMicros() / 2; // half rate, refund if confirmed
break;
case FRAUDULENT:
acc.fraudulentClicks++;
break;
}
acc.countryBreakdown.merge(click.getGeoCountry(), 1L, Long::sum);
acc.deviceBreakdown.merge(click.getDeviceType(), 1L, Long::sum);
return acc;
}
@Override public ClickAggregate getResult(ClickAccumulator acc) {
return new ClickAggregate(acc.totalClicks, acc.legitimateClicks,
acc.fraudulentClicks, acc.suspiciousClicks, acc.totalSpendMicros,
acc.uniqueUsers.cardinality(), acc.countryBreakdown, acc.deviceBreakdown);
}
@Override public ClickAccumulator merge(ClickAccumulator a, ClickAccumulator b) {
a.totalClicks += b.totalClicks;
a.legitimateClicks += b.legitimateClicks;
a.fraudulentClicks += b.fraudulentClicks;
a.suspiciousClicks += b.suspiciousClicks;
a.totalSpendMicros += b.totalSpendMicros;
a.uniqueUsers.merge(b.uniqueUsers);
b.countryBreakdown.forEach((k, v) -> a.countryBreakdown.merge(k, v, Long::sum));
b.deviceBreakdown.forEach((k, v) -> a.deviceBreakdown.merge(k, v, Long::sum));
return a;
}
}
Which fraud verdicts reach the speed layer. Only Phase 1 (rule engine, ~100 ms) and Phase 2 (ML, ~500 ms) arrive in time for the FraudJoinFunction's 5-minute wait window. Phase 3 (windowed aggregate detection), Phase 4 (post-conversion attestation, arrives hours later), and Phase 5 (Spark batch reapply, arrives at H+15) all arrive after the 1-minute tumbling window has already been sealed and pushed to ClickHouse. The speed layer does not retroactively rewrite closed windows -- instead, Spark overwrites the hourly aggregate row in ClickHouse via ReplacingMergeTree at H+15, and the dashboard flips its badge from "Estimated" to "Reconciled". This is exactly the drift the 10k-fraud walkthrough above describes: 2,700 clicks look legitimate to the speed layer at T=0, and 2,650 of them flip to fraudulent at T=H+15 when batch applies the Phase 3/4/5 verdicts.
Why SUSPICIOUS is charged at 50%. If Phase 2 returns a score between 0.5 and 0.85, the click is flagged SUSPICIOUS and the speed layer charges half CPC. This keeps the dashboard spend estimate in a reasonable range without committing to full billing. At H+15, Spark re-scores every SUSPICIOUS click with the final model and either confirms the charge (upgrades to LEGITIMATE) or zeros it (downgrades to FRAUDULENT) -- any difference becomes a refunded_spend_micros line on the billing row.
Late Event Handling
Events within 10 minutes of the watermark re-fire the window; ClickHouse ReplacingMergeTree dedupes the updated aggregate. Events beyond 10 minutes go to ad.clicks.late-arrivals for the hourly batch job. Events after 24 hours are logged for audit but excluded from billing per advertiser agreement.
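The three-bucket lateness policy above as a small sketch (thresholds are from this section; the function name is illustrative):

```python
from datetime import timedelta

# Policy thresholds from the late-event handling rules above
ALLOWED_LATENESS = timedelta(minutes=10)
BILLING_CUTOFF = timedelta(hours=24)

def classify_late_event(lateness):
    """lateness = current watermark minus the event's timestamp at arrival."""
    if lateness <= ALLOWED_LATENESS:
        return "refire_window"        # window re-fires; ReplacingMergeTree dedupes
    if lateness <= BILLING_CUTOFF:
        return "late_arrivals_topic"  # routed to ad.clicks.late-arrivals for batch
    return "audit_only"               # logged for audit, excluded from billing
```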
OutputTag<EnrichedClick> lateEvents = new OutputTag<>("late-clicks") {};
minuteAggs.getSideOutput(lateEvents)
.addSink(new KafkaSink<>("ad.clicks.late-arrivals"))
.uid("late-events-sink");
13. Stream/Batch Reconciliation
Even with exactly-once Flink, speed and batch produce slightly different numbers. The reconciliation job finds and resolves discrepancies before they reach billing.
Sources of Discrepancy
| Source | Speed Layer | Batch Layer | Impact |
|---|---|---|---|
| Late events (> 10 min) | Missed or side-output | Included (reads complete data) | Under-count in speed |
| Fraud model update | Old model at processing time | New model applied retroactively | Different fraud counts |
| Checkpoint recovery replay | May emit duplicate aggregates | Reads raw events, no duplicates | Potential over-count in speed |
| Watermark stall | Can delay window closure | No watermarks, reads complete partitions | Timing differences |
| Dedup window edge cases | 30s TTL in Valkey (approximate) | Full-data dedup with Spark (exact) | Speed may miss cross-window dupes |
Reconciliation Flow
Spark Job
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("click-reconciliation") \
.config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
.getOrCreate()
target_hour = "2026-04-01T14:00:00"
target_hour_end = "2026-04-01T15:00:00"  # upper bound of the target hour
raw_clicks = spark.read.format("iceberg").load("iceberg.ad_clicks_raw") \
.filter(F.col("event_timestamp").between(target_hour, target_hour_end))
clicks_with_verdict = raw_clicks.fillna("LEGITIMATE", subset=["fraud_verdict"])
deduped = clicks_with_verdict.dropDuplicates(["user_id", "ad_id", "event_minute"])
batch_aggregates = deduped.groupBy(
"ad_id", "campaign_id", "advertiser_id", "geo_country", "device_type",
F.window("event_timestamp", "1 hour").alias("window")
).agg(
F.count("*").alias("total_clicks"),
F.count(F.when(F.col("fraud_verdict") == "LEGITIMATE", 1)).alias("legitimate_clicks"),
F.count(F.when(F.col("fraud_verdict") == "FRAUDULENT", 1)).alias("fraudulent_clicks"),
F.sum(F.when(F.col("fraud_verdict") == "LEGITIMATE", F.col("bid_amount_micros")).otherwise(0))
.alias("total_spend_micros"),
F.approx_count_distinct("user_id").alias("unique_users")
)
# ClickHouse — append is safe because ReplacingMergeTree dedupes on inserted_at,
# so a replay of the same batch job produces the same final row.
batch_aggregates.write.format("jdbc") \
.option("url", "jdbc:clickhouse://clickhouse:8123/default") \
.option("dbtable", "ad_click_aggregates_reconciled") \
.mode("append").save()
billing_summary = batch_aggregates.groupBy("advertiser_id", "campaign_id").agg(
F.sum("legitimate_clicks").alias("legitimate_clicks"),
F.sum("total_spend_micros").alias("total_spend_micros"),
F.sum("fraudulent_clicks").alias("fraudulent_clicks")
)
# PostgreSQL — 2-step staging + UPSERT so the authoritative billing_aggregates
# table is only ever updated in place, never truncated.
# Step 1: write to a per-job staging table (safe to overwrite; single writer).
billing_summary.write.format("jdbc") \
.option("url", "jdbc:postgresql://postgres:5432/billing") \
.option("dbtable", f"billing_aggregates_staging_{batch_job_id}") \
.mode("overwrite").save()
# Step 2: MERGE staging into the authoritative billing_aggregates table in a
# single transaction. Idempotent per (advertiser_id, campaign_id, billing_date).
# (psycopg2 shown illustratively; in practice use a connection-pooled client.)
import psycopg2  # illustrative; production code would use a pooled client
with psycopg2.connect(PG_URL) as conn:
with conn.cursor() as cur:
cur.execute(f"""
INSERT INTO billing_aggregates
(advertiser_id, campaign_id, billing_date, legitimate_clicks,
total_spend_micros, fraudulent_clicks, refunded_spend_micros,
batch_job_id, reconciled_at)
SELECT advertiser_id, campaign_id, '{target_date}',
legitimate_clicks, total_spend_micros, fraudulent_clicks,
0, '{batch_job_id}', now()
FROM billing_aggregates_staging_{batch_job_id}
ON CONFLICT (advertiser_id, campaign_id, billing_date) DO UPDATE SET
legitimate_clicks = EXCLUDED.legitimate_clicks,
total_spend_micros = EXCLUDED.total_spend_micros,
fraudulent_clicks = EXCLUDED.fraudulent_clicks,
refunded_spend_micros = EXCLUDED.refunded_spend_micros,
batch_job_id = EXCLUDED.batch_job_id,
reconciled_at = EXCLUDED.reconciled_at
""")
cur.execute(f"DROP TABLE billing_aggregates_staging_{batch_job_id}")
Reconciliation SLA
| Metric | Target | Action if Missed |
|---|---|---|
| Speed vs. batch click diff | < 0.1% per advertiser/hour | Investigate Flink job health |
| Speed vs. batch spend diff | < $10 per advertiser/hour | Re-run batch with broader time range |
| Job completion | H+15 | Alert, use speed layer data with disclaimer |
| Job failure | Retry 3x exponential backoff | Manual investigation, extend billing deadline |
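The first two SLA rows can be enforced with a straightforward comparison pass. A hedged sketch, assuming per-(advertiser, hour) aggregate dicts rather than the production tables:

```python
CLICK_DIFF_PCT = 0.1            # SLA: < 0.1% click diff per advertiser/hour
SPEND_DIFF_MICROS = 10_000_000  # SLA: < $10 per advertiser/hour, in micros

def reconciliation_alerts(speed_rows, batch_rows):
    """Each dict maps (advertiser_id, hour) -> {"clicks": int, "spend_micros": int}.
    Batch is the reference; the speed layer is checked against it."""
    alerts = []
    for key, batch in batch_rows.items():
        speed = speed_rows.get(key, {"clicks": 0, "spend_micros": 0})
        if batch["clicks"] > 0:
            pct = abs(speed["clicks"] - batch["clicks"]) / batch["clicks"] * 100
            if pct > CLICK_DIFF_PCT:
                alerts.append((key, "click_diff_pct", round(pct, 3)))
        spend_diff = abs(speed["spend_micros"] - batch["spend_micros"])
        if spend_diff > SPEND_DIFF_MICROS:
            alerts.append((key, "spend_diff_micros", spend_diff))
    return alerts
```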
Batch job performance. 417M raw clicks (83 GB Parquet) must complete within the 15-minute SLA. Iceberg partition pruning → 20 Spark executors → hash shuffle on ad_id → write. Typical runtime: 8-12 minutes.
14. Billing Pipeline
Integer micros for money. Accumulating per-click amounts as floating point drifts: summing 10M CPC values in doubles can yield $2,499,999.9999999702 instead of $2,500,000.00, and these sub-cent errors compound across millions of line items. Integer micros (250,000 micros × 10M clicks = 2,500,000,000,000 micros = $2,500,000.00) is exact. All math is integer; division to dollars happens only at display, using banker's rounding.
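A quick demonstration of the drift, using a $0.10 CPC (which, unlike $0.25, is not exactly representable in binary floating point), so the error is visible after 10M additions:

```python
from decimal import Decimal, ROUND_HALF_EVEN

CPC_DOLLARS = 0.10      # not exactly representable as a double
CPC_MICROS = 100_000    # exact integer
N = 10_000_000

float_total = 0.0
micros_total = 0
for _ in range(N):
    float_total += CPC_DOLLARS
    micros_total += CPC_MICROS

assert micros_total == 1_000_000_000_000  # exactly $1,000,000.00
assert float_total != 1_000_000.0         # the double accumulator has drifted

# Division to dollars only at display time, with banker's rounding
display = (Decimal(micros_total) / Decimal(1_000_000)).quantize(
    Decimal("0.01"), rounding=ROUND_HALF_EVEN)
```

The integer path gives the same answer on every replay; the float path's error depends on summation order, which is exactly what a distributed aggregation does not control.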
Invoice states: DRAFT → PENDING_REVIEW → APPROVED → FINALIZED → PAID, with side paths DRAFT → DISPUTED and APPROVED → ADJUSTED → FINALIZED. Each transition is logged. Once FINALIZED, the amount cannot change without a separate adjustment credit/debit.
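The lifecycle can be enforced as a transition table. A minimal sketch; note the FINALIZED → DISPUTED edge is an assumption inferred from the M+30 timeline row ("PAID or DISPUTED"), not stated explicitly:

```python
# Legal invoice state transitions from the lifecycle above
ALLOWED_TRANSITIONS = {
    "DRAFT": {"PENDING_REVIEW", "DISPUTED"},
    "PENDING_REVIEW": {"APPROVED"},
    "APPROVED": {"FINALIZED", "ADJUSTED"},
    "ADJUSTED": {"FINALIZED"},
    "FINALIZED": {"PAID", "DISPUTED"},  # DISPUTED edge assumed from the timeline
}

def transition(current, target):
    """Reject any transition not in the table; FINALIZED amounts never mutate."""
    if target not in ALLOWED_TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal invoice transition {current} -> {target}")
    return target
```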
Timeline:
| When | Action |
|---|---|
| Real-time | Speed layer counts on dashboard ("Estimated") |
| H+15 | Batch reconciliation completes → dashboard flips to "Reconciled" |
| D+1 00:30 UTC | Daily rollup of 24 hourly records |
| M+1 06:00 UTC | Invoice generated (DRAFT) |
| M+1 10:00 UTC | Finance review (PENDING_REVIEW) |
| M+3 days | FINALIZED, sent to advertiser |
| M+30 days | Payment due (PAID or DISPUTED) |
Dispute handling: pull click_audit_log for the disputed period, cross-reference with advertiser server logs, re-run batch with enhanced fraud models, issue a credit memo if confirmed.
15. Bottlenecks and Mitigations
Hot Ad Skew
Problem: A Super Bowl ad gets 500K clicks/sec while most ads get < 10. Since Kafka is partitioned by ad_id, one hot ad sends everything to one partition and one Flink subtask. That subtask falls behind, backpressure propagates, checkpoint barriers can't align.
Fix -- two-phase aggregation: salt the key to spread hot ads across 16 shards, then merge.
// Phase 1: spread across 16 shards
DataStream<ClickAggregate> sharded = enriched
.keyBy(click -> click.getAdId() + "_" + (click.getUserId().hashCode() & 15)) // deterministic salt from a stable field
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new ClickAggregateFunction())
.uid("sharded-count");
// Phase 2: merge back to ad_id
DataStream<ClickAggregate> merged = sharded
.keyBy(ClickAggregate::getAdId)
.process(new MergeAggregatesFunction())
.uid("merge-counts");
The salt is a deterministic hash of a stable event field, not random. On recovery, every click lands on the same shard, the merge sums the same partial counts, and exactly-once semantics survive.
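What "deterministic salt" means concretely, as a sketch (the SHA-256-of-user_id choice is illustrative; any stable, well-distributed function of event fields works):

```python
import hashlib

NUM_SHARDS = 16

def salted_key(ad_id, user_id):
    # Derived from a stable event field, never random or identity-based:
    # the same click hashes to the same shard on every replay, so checkpoint
    # recovery reproduces identical partial aggregates before the merge.
    digest = hashlib.sha256(user_id.encode()).digest()
    shard = digest[0] % NUM_SHARDS
    return f"{ad_id}_{shard}"
```

Using `random.randint` here would spread load just as well but would break exactly-once: a replayed click could land on a different shard and double-count after the merge.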
Valkey Dedup Throughput
Problem: At 1M clicks/sec spike × 30s TTL = 30M active keys (~1.2 GB, easy). The real bottleneck is 1M SET NX/sec -- more than one Valkey instance handles.
Fix: shard dedup keys across 8 Valkey instances by consistent hashing. Each handles ~125K ops/sec, comfortably below the 200K+ ops/sec a single instance can sustain.
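A minimal consistent-hash ring for the 8-instance dedup tier. Vnode count and hash choice are illustrative; the point is that losing one instance remaps only ~1/8 of keys instead of reshuffling everything:

```python
import bisect
import hashlib

class ConsistentHashRing:
    """Route dedup keys to Valkey instances. Virtual nodes smooth the load."""

    def __init__(self, nodes, vnodes=64):
        self.ring = sorted(
            (self._hash(f"{node}#{i}"), node)
            for node in nodes for i in range(vnodes)
        )
        self._points = [h for h, _ in self.ring]

    @staticmethod
    def _hash(key):
        return int.from_bytes(hashlib.sha256(key.encode()).digest()[:8], "big")

    def node_for(self, key):
        # First vnode clockwise from the key's hash owns the key
        idx = bisect.bisect(self._points, self._hash(key)) % len(self.ring)
        return self.ring[idx][1]
```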
Fraud Feature Store Latency
Problem: 115K clicks/sec × ~6 feature lookups = 690K Valkey reads/sec.
Fix: Flink async I/O with pipelined MGET. Batch 100 clicks' features into a single pipeline of 600 commands completing in ~2ms. Effective: 1,150 pipelines/sec, easy for the Valkey cluster.
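The batching arithmetic, sketched without a live Valkey. Feature names and the `feat:{name}:{user_id}` key layout are assumptions for illustration:

```python
# Six hypothetical behavioral features per click (names are assumptions)
FEATURES = ["click_rate_1m", "click_rate_1h", "distinct_ads_1h",
            "ip_click_count", "ua_entropy", "account_age_days"]

def batch_feature_commands(user_ids, batch_size=100):
    """Group clicks into batches of 100 and emit one pipeline (command list)
    per batch: 100 clicks x 6 features = 600 GETs per round trip."""
    for start in range(0, len(user_ids), batch_size):
        batch = user_ids[start:start + batch_size]
        yield [("GET", f"feat:{feat}:{uid}") for uid in batch for feat in FEATURES]
```

At 115K clicks/sec, that is ~1,150 pipelines/sec of 600 commands each, matching the figures above.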
Backpressure When ClickHouse Slows Down
Problem: if ClickHouse slows (compaction backed up, replication lag, query overload), the Flink speed layer's JDBC sink starts taking longer. Flink's backpressure signal propagates upstream: windowing slows, fraud-join slows, dedup slows, and eventually the Kafka source stops pulling. Kafka lag grows.
Fix — three independent safety valves:
- Kafka absorbs the lag. With 15 brokers at ~16.8 TB of 7-day retention, Kafka can buffer hours of ingest at peak without losing data. Lag becomes visible, not lost.
- Flink does not block ingestion. The click collector produces to Kafka directly and responds `202 Accepted` in < 10 ms regardless of Flink's state. Clicks keep flowing in even if every downstream consumer is paused.
- Dashboard falls back to batch-reconciled data. If speed-layer freshness exceeds 2 minutes, the dashboard API flips the `source` field from `clickhouse` to `batch_reconciled` and displays the last batch-reconciled row with an "Estimated -- speed layer degraded" badge. Reconciliation still happens at H+15 regardless of speed-layer state.
Monitor: kafka.consumer.lag.seconds > 120 pages on-call. flink.task.backpressure > 80% warns. ClickHouse replication queue > 1000 alerts.
Billing impact during a ClickHouse outage: zero. The batch pipeline reads from Iceberg, not from ClickHouse or Flink, so the hourly Spark job keeps producing correct billing numbers even if the speed layer and ClickHouse are both down.
16. Failure Scenarios
Kafka Broker Failure
One of 15 brokers dies. KRaft detects via heartbeat (5s), elects new leaders (10s), producers retry with idempotence. Data loss: zero (RF=3, min.insync.replicas=2). Click loss: zero. Dashboard stale ~10s for affected partitions.
Flink Speed Layer Crash
TaskManager OOMs on RocksDB. Last checkpoint was 60s ago. JobManager allocates a new TM (K8s), downloads checkpoint from S3, rewinds Kafka offsets, replays ~95s of events. First new emit at T+65s. Data loss: zero. Dashboard stale ~65s. Billing impact: none -- batch layer reads from Iceberg independently. This is the point of Lambda.
Fraud Pipeline Failure
Fraud Flink job crashes, no verdicts produced. After 5 minutes, the FraudJoinFunction in speed layer times out waiting and defaults unscored clicks to LEGITIMATE. At H+15 batch applies correct labels retroactively. Why default to LEGITIMATE, not FRAUDULENT: under-reporting revenue with "$0 clicks" panics advertisers. Temporary over-counting is corrected by batch. No permanent overbilling.
Valkey Dedup Failure
Dedup Valkey cluster goes down. Collector switches to pass-through mode -- all clicks go to Kafka, including duplicates. Flink keyed state catches most. Spark batch catches the rest. Dashboards over-count by 1-3% during the outage. Billing: unaffected.
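The collector's pass-through fallback, sketched against a redis-style client interface (the `set(key, value, nx=, ex=)` method shape follows redis-py conventions; treat it as an assumption):

```python
class DedupChecker:
    """Valkey SET NX dedup with pass-through fallback on outage."""

    def __init__(self, client, ttl_seconds=30):
        self.client = client
        self.ttl = ttl_seconds

    def is_first_sighting(self, dedup_key):
        try:
            # SET NX is truthy only for the first writer of the key in the TTL window
            return bool(self.client.set(dedup_key, "1", nx=True, ex=self.ttl))
        except Exception:
            # Pass-through mode: on Valkey outage, accept the click and let
            # Flink keyed state and Spark batch dedupe downstream.
            return True
```

Failing open here is deliberate: a duplicate costs a temporary 1-3% dashboard over-count that batch corrects, while failing closed would drop real clicks.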
Complete AZ Failure
Entire AZ lost (~1/3 of all infrastructure). Kafka leaders re-elect from surviving AZs (RF=3 across 3 AZs), Flink restores from S3 checkpoints, ClickHouse replicas serve queries, processing resumes at ~67% capacity in 30s, full capacity in ~10min after K8s reschedules pods. Data loss: zero. Billing impact: none.
17. Deployment
Kubernetes Layout
Flink Savepoint-Based Deployments
1. flink savepoint <job-id> s3://checkpoints/savepoints/
2. flink stop --savepointPath s3://... <job-id>
3. flink run -s s3://checkpoints/savepoints/<savepoint-id> -d new-job.jar
4. flink list -r # verify consumer lag decreasing
Constraint: all operators must have stable uid() strings. Change a uid and Flink can't map old state to the new operator -- state is lost.
Fraud Model Canary
Model updates are high-risk (false positives block legitimate traffic, false negatives let fraud through).
1. Deploy new model to 5% of fraud TaskManagers in shadow mode
2. Both models score every click; only old model's verdict is used
3. Compare verdicts for 2 hours:
- New flags > 20% more fraud: investigate (possible false positives)
- New flags > 20% less fraud: investigate (possible missed fraud)
- Within 5%: proceed to 25% rollout
4. Gradual: 5% → 25% → 50% → 100% over 24 hours
5. Monitor advertiser complaint rate for 48 hours
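The threshold logic from steps 3-4 as a gate function. The behavior in the unspecified 5-20% band is an assumption (hold and review); everything else follows the policy above:

```python
def canary_gate(baseline_fraud_rate, canary_fraud_rate):
    """Shadow-mode comparison after the 2-hour window.
    Rates are fractions of clicks each model flags as fraudulent."""
    if baseline_fraud_rate <= 0:
        return "investigate"  # no baseline signal to compare against
    rel_pct = (canary_fraud_rate - baseline_fraud_rate) / baseline_fraud_rate * 100
    if rel_pct > 20:
        return "investigate_false_positives"   # new model flags > 20% more
    if rel_pct < -20:
        return "investigate_missed_fraud"      # new model flags > 20% less
    if abs(rel_pct) <= 5:
        return "proceed_to_25pct"              # within 5%: continue rollout
    return "hold_and_review"                   # 5-20% band: assumption, hold
```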
Multi-Region
Each region ingests locally (low latency). Kafka MirrorMaker 2 replicates events to the primary region (US-East) where batch reconciliation and billing run. Each region has its own speed layer for local dashboards. Billing is centralized to avoid multi-region consistency issues with financial data.
18. Observability
Key Metrics
INGESTION:
clicks.ingested.rate clicks/sec per collector pod
kafka.produce.latency.p99 < 10ms
valkey.dedup.hit.rate duplicates caught
KAFKA:
kafka.consumer.lag per partition (messages)
kafka.consumer.lag.seconds estimated time behind
kafka.under.replicated.partitions should be 0
FLINK SPEED:
flink.checkpoint.duration < 60s
flink.checkpoint.failures should be 0
flink.task.backpressure < 50%
flink.watermark.lag event vs. wall clock
flink.late.events.dropped side output rate
FRAUD:
fraud.verdict.distribution legit / suspicious / fraudulent %
fraud.model.latency.p99 < 500ms
fraud.feature.store.latency.p99 < 5ms
BATCH:
batch.job.duration.minutes < 15
batch.reconciliation.discrepancy < 0.1%
SERVING:
clickhouse.query.latency.p99 < 500ms
dashboard.freshness.seconds age of displayed data
billing.reconciliation.lag.hours since last reconciliation
Critical Alerts
- alert: KafkaConsumerLagHigh
expr: kafka_consumer_lag_seconds{job="flink-aggregation"} > 120
for: 2m
severity: critical
message: "Flink aggregation > 2 minutes behind. Dashboard data stale."
- alert: FlinkCheckpointFailing
expr: flink_checkpoint_failures_total > 0
for: 5m
severity: critical
message: "Flink checkpoints failing. Exactly-once guarantees at risk."
- alert: FraudPipelineDown
expr: fraud_clicks_scored_rate == 0
for: 3m
severity: critical
message: "Fraud pipeline not scoring. All clicks defaulting to LEGITIMATE."
- alert: ReconciliationDiscrepancyHigh
expr: batch_reconciliation_discrepancy_pct > 1.0
severity: critical
message: "Speed vs. batch discrepancy > 1%. Billing accuracy compromised."
End-to-End Click Trace
Every click carries a trace context from ingestion to billing:
click_id: clk_550e8400-e29b-41d4
|-- ingestion (click_collector, 2ms)
| |-- valkey_dedup_check (0.3ms)
| `-- kafka_produce (1.5ms)
|-- speed_layer (flink_aggregation, 45s wall clock)
| |-- dedup_function (0.1ms)
| |-- fraud_join (5s, waiting for verdict)
| |-- window_aggregate (0ms, accumulated)
| `-- clickhouse_sink (2ms)
|-- fraud_pipeline (flink_fraud, 300ms)
| |-- rule_engine (5ms)
| |-- feature_extraction (50ms)
| |-- model_inference (200ms)
| `-- verdict_emit (2ms)
|-- batch_reconciliation (spark, H+15)
| |-- iceberg_read (partition pruned)
| |-- dedup + aggregate
| `-- billing_write
`-- billing (invoice_generator, M+1)
19. Security
| Layer | Controls |
|---|---|
| API ingestion | HMAC-SHA256 signed SDK tokens with publisher_id; rate limit 10K clicks/sec per publisher |
| Kafka | SASL/SCRAM + topic-level ACLs (click collectors write-only to raw, Flink read-only, fraud job separate), TLS 1.3, at-rest encryption (dm-crypt) |
| Flink | Isolated K8s namespace + network policies, no external ingress, checkpoints SSE-KMS encrypted, TLS between TaskManagers |
| ClickHouse | X.509 cert auth for writers, read-only user for dashboard API, row-level security isolating advertisers |
| Valkey | AUTH + TLS + ACLs separating dedup (read-write) from fraud features (read-only model-serving) |
| PostgreSQL | TLS cert auth, Flink has no access, Spark has INSERT/UPDATE, billing API read-only, PII fields encrypted with app-level KMS |
| Financial data | Billing records immutable after FINALIZED, all mutations logged, 7-year backup retention, access logged and approval-gated |
| Data privacy | Raw user_id/ip retained 7d in Kafka, 90d in Iceberg with hashed user_id, 3y in audit log; deletion requests via Kafka tombstones |
| Fraud data | IP blocklists and model features confidential; advertisers see only aggregate counts, never raw signals (prevents reverse-engineering) |
20. Consistency Model
| Operation | Guarantee | Mechanism |
|---|---|---|
| Click → Kafka | At-least-once (deduplicated downstream) | Idempotent producer, acks=all |
| Speed aggregation | Exactly-once within Flink | Checkpoint + 2PC sink |
| Dashboard read | Eventually consistent | ClickHouse 30-60s behind |
| Batch reconciliation | Exactly-once | Deterministic Spark, UPSERT to PostgreSQL |
| Billing read | Strongly consistent | PostgreSQL primary with synchronous replication |
| Cross-layer (speed vs. batch) | Eventually consistent | Batch overwrites speed hourly |
AP for dashboards (serves stale data during partitions), CP for billing (PostgreSQL primary rejects writes during split-brain). Dashboard staleness is acceptable; billing inconsistency is not.
21. Key Design Principles
The Hierarchy
| Principle | Role |
|---|---|
| KAFKA REMEMBERS EVERYTHING | Immutable event log. Only source of raw truth. Every downstream system is derived. |
| FLINK COMPUTES FAST | Speed layer: real-time aggregation, exactly-once. Feeds dashboards. |
| ICEBERG REMEMBERS HISTORY | Data lake: raw events columnar. Source for batch reconciliation. |
| SPARK COMPUTES TRUTH | Batch layer: penny-accurate. Feeds billing. Overrides speed. |
| CLICKHOUSE SERVES QUERIES | OLAP store: sub-second dashboard queries. Receives from speed and batch. |
| POSTGRESQL STORES MONEY | Billing ledger. ACID. Immutable after finalization. |
| VALKEY IS EPHEMERAL | Dedup cache, fraud counters, real-time cache. Disposable, TTL-bound. |
Rules
- Never bill from the speed layer. Speed is "Estimated" on dashboards. Billing comes from batch-reconciled PostgreSQL records.
- Never trust Valkey as truth. It is a cache. If it dies, the system continues without it.
- Kafka is the black box recorder. Everything else is derived. Lost ClickHouse data? Replay from Kafka + Iceberg.
- Writes are always idempotent. ReplacingMergeTree in ClickHouse, UPSERT in PostgreSQL, SET NX in Valkey. Every sink handles replays.
- Financial amounts use integer micros. Never floating point. Division to dollars happens only at display.
- Fraud defaults to LEGITIMATE. On failure, err toward counting clicks. Batch corrects. Under-reporting revenue is worse than temporary over-counting.
- Batch wins every dispute. Speed and batch disagree? Batch is correct. It has complete data, exact dedup, and the latest fraud model.
- Monitor discrepancy obsessively. Speed vs. batch > 0.1%/hour is an alert. > 1% is a page.
- Three layers of dedup. Valkey at ingestion, Flink keyed state in stream, Spark full-data in batch. Defense in depth.
22. Explore the Technologies
Dive deeper into the technologies and infrastructure patterns used in this design:
Core Technologies
| Technology | Role | Learn More |
|---|---|---|
| Kafka | Immutable event log, 10B clicks/day | Kafka |
| Flink | Speed layer, exactly-once streaming | Flink |
| RocksDB | Flink state backend, incremental checkpoints | RocksDB |
| ClickHouse | OLAP dashboard queries over billions of rows | ClickHouse |
| Apache Iceberg | Data lake, ACID, time-travel | Apache Iceberg |
| Spark | Batch reconciliation, billing aggregation | Spark |
| Valkey | Dedup cache, fraud features, real-time cache | Valkey/Redis |
| PostgreSQL | Billing ledger, audit trail | PostgreSQL |
| Prometheus | Pipeline lag, throughput, accuracy metrics | Prometheus |
| Grafana | Dashboards and alerting | Grafana |
Infrastructure Patterns
| Pattern | Role | Learn More |
|---|---|---|
| Message Queues & Event Streaming | Kafka topic design, partitioning by ad_id | Event Streaming |
| Caching Strategies | Valkey dedup, fraud feature store, dashboard cache | Caching Strategies |
| Kubernetes | Flink job deployment, Spark on K8s | Kubernetes |
| Auto-Scaling Patterns | Kafka lag-based scaling, Flink parallelism | Auto-Scaling |
| Metrics & Monitoring | End-to-end observability, billing accuracy | Metrics & Monitoring |
| Object Storage & Data Lake | Iceberg on S3, Flink checkpoints | Object Storage |
| CI/CD Pipelines | Flink savepoints, fraud model canaries | CI/CD |
| Circuit Breaker & Resilience | Valkey dedup fallback, ClickHouse query breakers | Resilience |
| Rate Limiting & Throttling | Per-publisher limits, fraud IP blocking | Rate Limiting |
Practice this design: Design an Ad Click Aggregator -- interview question with progressive hints.
One immutable event log, one speed layer for dashboards, one batch layer for billing, and a fraud detection pipeline running alongside both. Lambda architecture gets criticized for complexity, but when money is on the line, having a batch layer that corrects the stream layer is not a luxury -- it is a requirement. Get the reconciliation right, and the system can tell an advertiser exactly how many legitimate clicks they received, to the penny, backed by an audit trail that holds up under dispute.