Domain: Streaming & Real-time Systems
Batch processing đưa bạn từ dữ liệu ngày hôm qua. Stream processing đưa bạn từ dữ liệu giây trước. Khi phát hiện gian lận phải xảy ra trong 100ms, không phải sáng hôm sau, bạn cần streaming.
1. Batch vs Stream Processing
1.1 So sánh căn bản
Batch Processing:
Input: bounded dataset (file, DB dump)
Output: kết quả sau khi process toàn bộ input
Latency: minutes to hours
Tools: Hadoop MapReduce, Spark, dbt
Stream Processing:
Input: unbounded, continuous stream of events
Output: kết quả liên tục cập nhật
Latency: milliseconds to seconds
Tools: Kafka Streams, Flink, Spark Structured Streaming
1.2 Lambda vs Kappa Architecture
Lambda Architecture (dùng cả hai):
Data Source
│
├── Batch Layer ──────────────────────────► Batch Views
│ (Spark, Hadoop) (accurate, slow)
│ │
└── Speed Layer (Stream) ──────────────────► Real-time Views
(Flink, Kafka Streams) (approximate, fast)
│
Serving Layer (merge)
│
Query
Vấn đề Lambda: maintain 2 codebase cho cùng logic business → drift, complexity.
─────────────────────────────────────────────────────────────────────
Kappa Architecture (chỉ stream):
Data Source
│
└── Stream Layer ───────────────────────────► Views
(Kafka + Flink) (replayable)
Reprocessing: replay từ đầu Kafka topic khi cần recompute
→ Chỉ 1 codebase, đơn giản hơn
→ Trade-off: cần Kafka retention đủ lớn để replay
Khi nào dùng Lambda? Khi cần strong accuracy guarantee (fintech reporting) và stream approximation không đủ. Khi nào dùng Kappa? Hầu hết modern use cases — nếu Kafka retention đủ lớn và stream processing đủ mature.
2. Apache Kafka — Deep Dive
2.1 Core Concepts
Kafka Cluster:
┌─────────────────────────────────────────────────────┐
│ Kafka Cluster │
│ │
│ Broker 1 Broker 2 Broker 3 │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │Topic A │ │Topic A │ │Topic A │ │
│ │Part 0 │ │Part 1 │ │Part 2 │ │
│ │(Leader) │ │(Leader) │ │(Leader) │ │
│ │Part 1 │ │Part 2 │ │Part 0 │ │
│ │(Replica)│ │(Replica)│ │(Replica)│ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ ZooKeeper / KRaft (metadata, leader election) │
└─────────────────────────────────────────────────────┘
Topic: logical channel cho messages, chia thành partitions. Partition: đơn vị parallelism, ordered, immutable log. Offset: vị trí của message trong partition, monotonically increasing. Replica: bản sao của partition trên broker khác, đảm bảo durability.
2.2 Producer — ghi message
import "github.com/confluentinc/confluent-kafka-go/v2/kafka"
func NewProducer(brokers string) (*kafka.Producer, error) {
return kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": brokers,
"acks": "all", // wait for all ISR replicas
"enable.idempotence": true, // exactly-once producer
"max.in.flight.requests.per.connection": 5,
"retries": 10,
"compression.type": "lz4",
})
}
func PublishEvent(p *kafka.Producer, topic string, key, value []byte) error {
deliveryChan := make(chan kafka.Event, 1)
err := p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
Partition: kafka.PartitionAny, // round-robin nếu không có key
},
Key: key, // messages cùng key → cùng partition → ordered
Value: value,
}, deliveryChan)
if err != nil {
return err
}
e := <-deliveryChan
msg := e.(*kafka.Message)
if msg.TopicPartition.Error != nil {
return msg.TopicPartition.Error
}
return nil
}
Partition key quan trọng: Messages cùng key luôn vào cùng partition
→ đảm bảo ordering per-key. Ví dụ: dùng account_id làm key → mọi event
của một account sẽ được process theo thứ tự.
2.3 Consumer Group — đọc và scale
Topic: payments (6 partitions)
Consumer Group A (payment-processor, 3 consumers):
Consumer 1 → Partition 0, 1
Consumer 2 → Partition 2, 3
Consumer 3 → Partition 4, 5
→ Tối đa #consumers = #partitions
Consumer Group B (analytics, 2 consumers):
Consumer 1 → Partition 0, 1, 2
Consumer 2 → Partition 3, 4, 5
→ 2 groups = 2 independent reads, không ảnh hưởng nhau
Rebalance xảy ra khi:
- Consumer join/leave group
- Topic partition count thay đổi
→ Trong rebalance, consumption tạm dừng (stop-the-world)
→ Dùng "cooperative rebalance" (Kafka 2.4+) để giảm downtime
func StartConsumer(brokers, topic, groupID string, handler func([]byte) error) {
consumer, _ := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": brokers,
"group.id": groupID,
"auto.offset.reset": "earliest",
"enable.auto.commit": false, // manual commit để control
"max.poll.interval.ms": 300000, // 5 phút max để process 1 batch
})
defer consumer.Close()
consumer.Subscribe([]string{topic}, nil)
for {
msg, err := consumer.ReadMessage(100 * time.Millisecond)
if err != nil {
continue // timeout, retry
}
if err := handler(msg.Value); err != nil {
// Xử lý error: retry, DLQ, hoặc skip
sendToDLQ(msg)
} else {
// Commit offset SAU KHI xử lý thành công
consumer.CommitMessage(msg)
}
}
}
2.4 Offset Management và Delivery Semantics
At-most-once (có thể mất message):
Commit offset TRƯỚC khi process
→ Nếu crash sau commit nhưng trước process → message lost
At-least-once (có thể duplicate):
Commit offset SAU KHI process
→ Nếu crash sau process nhưng trước commit → reprocess → duplicate
→ Phổ biến nhất, cần consumer idempotent
Exactly-once:
Kafka Transactions API:
- Producer: enable.idempotence = true + transactional.id
- Consumer: isolation.level = "read_committed"
→ Atomic produce + offset commit trong một transaction
→ Overhead ~10-15% performance, chỉ dùng khi thực sự cần
2.5 Kafka Internal: Log Compaction
Normal retention: giữ N ngày, sau đó xóa
Log compaction: giữ LATEST message per key
Trước compaction:
offset: 0 key=alice value={balance: 100}
offset: 1 key=bob value={balance: 200}
offset: 2 key=alice value={balance: 150}
offset: 3 key=alice value={balance: 120}
Sau compaction:
offset: 1 key=bob value={balance: 200}
offset: 3 key=alice value={balance: 120}
Use case: changelog topics (Kafka Streams state store backup),
config topics, user profile topics
3. Stream Processing Frameworks
3.1 Kafka Streams
// Kafka Streams (Java-centric, nhưng concept tương tự ở Go với Goka)
// Ví dụ: count transactions per account trong 5-minute windows
// Go với thư viện Goka:
import "github.com/lovoo/goka"
func processPayment(ctx goka.Context, msg interface{}) {
payment := msg.(*Payment)
// Lấy state hiện tại (per-partition, local RocksDB)
var count int64
if val := ctx.Value(); val != nil {
count = val.(int64)
}
count++
ctx.SetValue(count) // lưu state vào local store
if count > 100 { // threshold
ctx.Emit("alerts-topic", ctx.Key(), &Alert{
AccountID: ctx.Key(),
Reason: "high_velocity",
})
}
}
Kafka Streams đặc điểm:
- Chạy embedded trong application process (không phải separate cluster)
- State store local (RocksDB), backed up bởi Kafka changelog topics
- Horizontal scale = tăng số instances → Kafka tự rebalance partitions
- Phù hợp khi muốn đơn giản, không muốn manage Flink cluster
3.2 Apache Flink Concepts
Flink Job:
Source (Kafka) → Operators → Sink (DB, Kafka)
Operator types:
- Map: transform 1 event → 1 event
- Filter: drop events
- FlatMap: 1 event → 0..N events
- KeyBy: group by key (routing to same task)
- Window: aggregate over time window
- Async: gọi external service (enrichment)
Flink State:
- Keyed State: per-key state (most common)
- Operator State: per-task state (e.g., Kafka offset)
Flink Checkpointing:
- Snapshot state định kỳ vào durable storage (S3, HDFS)
- Khi failure: restart từ checkpoint
- Exactly-once với external systems qua Two-Phase Commit sink
4. Windowing và Watermarks
4.1 Window Types
Event stream (thời gian là event time, không phải processing time):
Events: e1(t=0) e2(t=2) e3(t=5) e4(t=7) e5(t=10) e6(t=12)
Tumbling Window (5s, non-overlapping):
[0-5): e1, e2, e3 → aggregate
[5-10): e4, e5 → aggregate
[10-15): e6 → aggregate
Sliding Window (size=10s, slide=5s, overlapping):
[0-10): e1,e2,e3,e4,e5 → aggregate
[5-15): e3,e4,e5,e6 → aggregate
Session Window (gap=3s):
e1,e2,e3 → session 1 (gap < 3s giữa các event)
[gap > 3s]
e4,e5 → session 2
[gap > 3s]
e6 → session 3
Use cases:
Tumbling: hourly/daily reports
Sliding: moving averages, "activity in last N minutes"
Session: user session analytics, shopping cart abandonment
4.2 Watermarks — Xử lý Late-arriving Events
Vấn đề: event được generate lúc t=5, nhưng arrive ở processor lúc t=12
do mạng chậm, mobile offline rồi sync lại, v.v.
Nếu window [5-10) đã close khi event t=5 arrive → mất data!
Watermark W(t): "Tôi tin rằng tất cả events có timestamp < t đã arrive"
Processing timeline:
event t=0 arrive
event t=3 arrive
event t=2 arrive (late, nhưng watermark chưa pass t=3 → vẫn ok)
Watermark advance đến t=3
event t=7 arrive
Watermark advance đến t=7
→ Window [0,5) có thể trigger (watermark đã pass t=5)
event t=4 arrive (LATE! watermark đã ở t=7)
→ Drop hoặc gửi vào side output
Flink watermark strategy:
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(5))
// Cho phép event đến trễ tối đa 5 giây
5. Event-Driven Architecture Patterns
5.1 Event Sourcing + Kafka
Traditional:
User clicks "Buy" → UPDATE orders SET status='paid'
Mất lịch sử, khó debug, khó replay
Event Sourcing:
User clicks "Buy" → Publish OrderPlaced event
User pays → Publish PaymentReceived event
Order ships → Publish OrderShipped event
Current state = replay all events
Kafka topic = source of truth (với log compaction)
Benefits:
- Complete audit trail (fintech requirement!)
- Temporal queries ("trạng thái lúc T là gì?")
- Replay để rebuild projections
- Easy to add new consumers
5.2 CDC — Change Data Capture với Debezium
Vấn đề: Làm sao sync dữ liệu từ PostgreSQL vào Kafka mà không thay đổi
application code và không miss bất kỳ thay đổi nào?
Debezium đọc PostgreSQL WAL (Write-Ahead Log):
PostgreSQL
│
│ WAL (wal_level = logical)
│ INSERT/UPDATE/DELETE operations
▼
Debezium Connector (Kafka Connect)
│ Reads replication slot
▼
Kafka Topic: postgres.public.payments
{
"op": "c", // c=create, u=update, d=delete
"before": null,
"after": {
"id": "uuid-123",
"amount": 100000,
"status": "completed"
},
"ts_ms": 1713340800000
}
│
├──► Elasticsearch (search index sync)
├──► Data Warehouse (analytics)
└──► Cache invalidation service
# Debezium PostgreSQL Connector config
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.dbname": "payments",
"database.server.name": "postgres",
"table.include.list": "public.payments,public.accounts",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.name": "debezium_publication"
}
6. Use Cases Thực tế
6.1 Real-time Fraud Detection Stream
Payment Events (Kafka)
│
▼
Flink Job: Fraud Detection
│
├── KeyBy(account_id)
│
├── Window(5 min tumbling)
│ → COUNT(transactions) per account
│ → SUM(amount) per account
│
├── Apply FraudRules:
│ IF count > 20 OR sum > 50M → fraud signal
│
└── Sink:
├── Kafka: fraud-alerts topic (cho notification service)
└── Redis: update account risk score (cho real-time API)
Latency: < 200ms từ payment event đến fraud decision
6.2 Real-time Analytics Dashboard
User Actions (clickstream)
│
├── Kafka producer (browser/app SDK)
│
▼
Kafka topic: user-events (16 partitions)
│
▼
Flink Job: Aggregate
│
├── Filter: remove bots (IP blacklist)
├── Enrich: lookup user profile (async API call)
├── Window: 1-minute tumbling
│ → Active users count (HyperLogLog)
│ → Page view count
│ → Revenue sum
│
└── Sink:
├── ClickHouse (OLAP, cho dashboard queries)
└── Kafka: aggregated-metrics (cho other consumers)
Dashboard query: "Revenue in last 5 minutes" → ClickHouse query < 100ms
6.3 Live Leaderboard
Gaming / Loyalty Points System:
Score events → Kafka → Flink
│
▼
Sorted Set (Redis ZADD)
leaderboard:{game_id}
│
▼
WebSocket Server
(push updates to connected clients)
Redis ZADD: O(log N)
Redis ZRANK: O(log N) → "rank của user X là bao nhiêu?"
Redis ZRANGE: O(log N + M) → "top 10 users"
Scale: 1M concurrent users → Redis Cluster, partition by game_id
7. Interview: "Design a Real-time Analytics Dashboard"
Interview insight: Câu hỏi này test bạn về stream processing architecture, data pipeline design, và trade-off giữa latency vs accuracy. Nhiều candidate chỉ nói về database mà quên mất Kafka và windowing.
Framework trả lời
1. Clarify:
- Metrics gì? (pageviews, revenue, DAU, custom events?)
- Latency requirement? (< 1s, < 10s, < 1 min?)
- Data volume? (events/sec?)
- Historical data? (chỉ real-time hay cần look back?)
2. Architecture:
SDK (browser/mobile)
→ Kafka (buffer, durability)
→ Stream Processor (Flink/Kafka Streams)
→ OLAP DB (ClickHouse/Druid) + Redis (hot aggregates)
→ Query API → Dashboard
3. Key Design Decisions:
- Windowing strategy: tumbling cho dashboards
- Late data handling: watermark + allow late arrivals
- Exactly-once: quan trọng cho financial metrics
- Hot key problem: nếu 1 event bị spike (viral post)
→ pre-aggregate at SDK, sampling
4. Scale:
- Kafka partitions = parallelism
- Flink task managers = horizontal scale
- ClickHouse: columnar, vectorized, scale-out
5. Failure handling:
- Kafka retention: 7 days replay
- Flink checkpointing: recovery từ snapshot
- Reconciliation job hàng ngày: batch vs stream mismatch
Điểm thường bị trừ
❌ "Dùng MySQL với cron job chạy mỗi phút" → không phải real-time
❌ Không mention late-arriving events → window kết quả sẽ sai
❌ Không mention backpressure handling → consumer chậm hơn producer → OOM
❌ Không phân biệt event time vs processing time
✓ Mention watermarks và late data strategy
✓ Phân biệt hot path (Redis, < 1s) và cold path (ClickHouse, historical)
✓ Đề cập đến reconciliation để đảm bảo accuracy