🏭 Domains✍️ Khoa📅 19/04/2026☕ 11 phút đọc

Domain: Streaming & Real-time Systems

Batch processing đưa bạn từ dữ liệu ngày hôm qua. Stream processing đưa bạn từ dữ liệu giây trước. Khi phát hiện gian lận phải xảy ra trong 100ms, không phải sáng hôm sau, bạn cần streaming.


1. Batch vs Stream Processing

1.1 So sánh căn bản

Batch Processing:
  Input:   bounded dataset (file, DB dump)
  Output:  kết quả sau khi process toàn bộ input
  Latency: minutes to hours
  Tools:   Hadoop MapReduce, Spark, dbt

Stream Processing:
  Input:   unbounded, continuous stream of events
  Output:  kết quả liên tục cập nhật
  Latency: milliseconds to seconds
  Tools:   Kafka Streams, Flink, Spark Structured Streaming

1.2 Lambda vs Kappa Architecture

Lambda Architecture (dùng cả hai):

  Data Source
      │
      ├── Batch Layer ──────────────────────────► Batch Views
      │   (Spark, Hadoop)                         (accurate, slow)
      │                                                │
      └── Speed Layer (Stream) ──────────────────► Real-time Views
          (Flink, Kafka Streams)                   (approximate, fast)
                                                        │
                                              Serving Layer (merge)
                                                        │
                                                     Query

Vấn đề Lambda: maintain 2 codebase cho cùng logic business → drift, complexity.

─────────────────────────────────────────────────────────────────────

Kappa Architecture (chỉ stream):

  Data Source
      │
      └── Stream Layer ───────────────────────────► Views
          (Kafka + Flink)                           (replayable)

  Reprocessing: replay từ đầu Kafka topic khi cần recompute
  → Chỉ 1 codebase, đơn giản hơn
  → Trade-off: cần Kafka retention đủ lớn để replay

Khi nào dùng Lambda? Khi cần strong accuracy guarantee (fintech reporting) và stream approximation không đủ. Khi nào dùng Kappa? Hầu hết modern use cases — nếu Kafka retention đủ lớn và stream processing đủ mature.


2. Apache Kafka — Deep Dive

2.1 Core Concepts

Kafka Cluster:

  ┌─────────────────────────────────────────────────────┐
  │                     Kafka Cluster                   │
  │                                                     │
  │  Broker 1          Broker 2          Broker 3       │
  │  ┌─────────┐       ┌─────────┐       ┌─────────┐   │
  │  │Topic A  │       │Topic A  │       │Topic A  │   │
  │  │Part 0   │       │Part 1   │       │Part 2   │   │
  │  │(Leader) │       │(Leader) │       │(Leader) │   │
  │  │Part 1   │       │Part 2   │       │Part 0   │   │
  │  │(Replica)│       │(Replica)│       │(Replica)│   │
  │  └─────────┘       └─────────┘       └─────────┘   │
  │                                                     │
  │  ZooKeeper / KRaft (metadata, leader election)      │
  └─────────────────────────────────────────────────────┘

Topic: logical channel cho messages, chia thành partitions. Partition: đơn vị parallelism, ordered, immutable log. Offset: vị trí của message trong partition, monotonically increasing. Replica: bản sao của partition trên broker khác, đảm bảo durability.

2.2 Producer — ghi message

import "github.com/confluentinc/confluent-kafka-go/v2/kafka"

func NewProducer(brokers string) (*kafka.Producer, error) {
    return kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers":            brokers,
        "acks":                         "all",    // wait for all ISR replicas
        "enable.idempotence":           true,      // exactly-once producer
        "max.in.flight.requests.per.connection": 5,
        "retries":                      10,
        "compression.type":             "lz4",
    })
}

func PublishEvent(p *kafka.Producer, topic string, key, value []byte) error {
    deliveryChan := make(chan kafka.Event, 1)
    err := p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic:     &topic,
            Partition: kafka.PartitionAny, // round-robin nếu không có key
        },
        Key:   key,   // messages cùng key → cùng partition → ordered
        Value: value,
    }, deliveryChan)
    if err != nil {
        return err
    }

    e := <-deliveryChan
    msg := e.(*kafka.Message)
    if msg.TopicPartition.Error != nil {
        return msg.TopicPartition.Error
    }
    return nil
}

Partition key quan trọng: Messages cùng key luôn vào cùng partition → đảm bảo ordering per-key. Ví dụ: dùng account_id làm key → mọi event của một account sẽ được process theo thứ tự.

2.3 Consumer Group — đọc và scale

Topic: payments (6 partitions)

Consumer Group A (payment-processor, 3 consumers):
  Consumer 1 → Partition 0, 1
  Consumer 2 → Partition 2, 3
  Consumer 3 → Partition 4, 5
  → Tối đa #consumers = #partitions

Consumer Group B (analytics, 2 consumers):
  Consumer 1 → Partition 0, 1, 2
  Consumer 2 → Partition 3, 4, 5
  → 2 groups = 2 independent reads, không ảnh hưởng nhau

Rebalance xảy ra khi:
  - Consumer join/leave group
  - Topic partition count thay đổi
  → Trong rebalance, consumption tạm dừng (stop-the-world)
  → Dùng "cooperative rebalance" (Kafka 2.4+) để giảm downtime
func StartConsumer(brokers, topic, groupID string, handler func([]byte) error) {
    consumer, _ := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":        brokers,
        "group.id":                 groupID,
        "auto.offset.reset":        "earliest",
        "enable.auto.commit":       false,     // manual commit để control
        "max.poll.interval.ms":     300000,    // 5 phút max để process 1 batch
    })
    defer consumer.Close()

    consumer.Subscribe([]string{topic}, nil)

    for {
        msg, err := consumer.ReadMessage(100 * time.Millisecond)
        if err != nil {
            continue // timeout, retry
        }

        if err := handler(msg.Value); err != nil {
            // Xử lý error: retry, DLQ, hoặc skip
            sendToDLQ(msg)
        } else {
            // Commit offset SAU KHI xử lý thành công
            consumer.CommitMessage(msg)
        }
    }
}

2.4 Offset Management và Delivery Semantics

At-most-once (có thể mất message):
  Commit offset TRƯỚC khi process
  → Nếu crash sau commit nhưng trước process → message lost

At-least-once (có thể duplicate):
  Commit offset SAU KHI process
  → Nếu crash sau process nhưng trước commit → reprocess → duplicate
  → Phổ biến nhất, cần consumer idempotent

Exactly-once:
  Kafka Transactions API:
    - Producer: enable.idempotence = true + transactional.id
    - Consumer: isolation.level = "read_committed"
    → Atomic produce + offset commit trong một transaction
    → Overhead ~10-15% performance, chỉ dùng khi thực sự cần

2.5 Kafka Internal: Log Compaction

Normal retention: giữ N ngày, sau đó xóa
Log compaction: giữ LATEST message per key

Trước compaction:
  offset: 0   key=alice  value={balance: 100}
  offset: 1   key=bob    value={balance: 200}
  offset: 2   key=alice  value={balance: 150}
  offset: 3   key=alice  value={balance: 120}

Sau compaction:
  offset: 1   key=bob    value={balance: 200}
  offset: 3   key=alice  value={balance: 120}

Use case: changelog topics (Kafka Streams state store backup),
          config topics, user profile topics

3. Stream Processing Frameworks

3.1 Kafka Streams

// Kafka Streams (Java-centric, nhưng concept tương tự ở Go với Goka)
// Ví dụ: count transactions per account trong 5-minute windows

// Go với thư viện Goka:
import "github.com/lovoo/goka"

func processPayment(ctx goka.Context, msg interface{}) {
    payment := msg.(*Payment)
    
    // Lấy state hiện tại (per-partition, local RocksDB)
    var count int64
    if val := ctx.Value(); val != nil {
        count = val.(int64)
    }
    
    count++
    ctx.SetValue(count) // lưu state vào local store
    
    if count > 100 { // threshold
        ctx.Emit("alerts-topic", ctx.Key(), &Alert{
            AccountID: ctx.Key(),
            Reason:    "high_velocity",
        })
    }
}

Kafka Streams đặc điểm:

  • Chạy embedded trong application process (không phải separate cluster)
  • State store local (RocksDB), backed up bởi Kafka changelog topics
  • Horizontal scale = tăng số instances → Kafka tự rebalance partitions
  • Phù hợp khi muốn đơn giản, không muốn manage Flink cluster
Flink Job:

Source (Kafka) → Operators → Sink (DB, Kafka)

Operator types:
  - Map: transform 1 event → 1 event
  - Filter: drop events
  - FlatMap: 1 event → 0..N events
  - KeyBy: group by key (routing to same task)
  - Window: aggregate over time window
  - Async: gọi external service (enrichment)

Flink State:
  - Keyed State: per-key state (most common)
  - Operator State: per-task state (e.g., Kafka offset)

Flink Checkpointing:
  - Snapshot state định kỳ vào durable storage (S3, HDFS)
  - Khi failure: restart từ checkpoint
  - Exactly-once với external systems qua Two-Phase Commit sink

4. Windowing và Watermarks

4.1 Window Types

Event stream (thời gian là event time, không phải processing time):

  Events: e1(t=0) e2(t=2) e3(t=5) e4(t=7) e5(t=10) e6(t=12)

Tumbling Window (5s, non-overlapping):
  [0-5):  e1, e2, e3  → aggregate
  [5-10): e4, e5      → aggregate
  [10-15): e6         → aggregate

Sliding Window (size=10s, slide=5s, overlapping):
  [0-10):  e1,e2,e3,e4,e5  → aggregate
  [5-15):  e3,e4,e5,e6     → aggregate

Session Window (gap=3s):
  e1,e2,e3 → session 1 (gap < 3s giữa các event)
  [gap > 3s]
  e4,e5    → session 2
  [gap > 3s]
  e6       → session 3

Use cases:
  Tumbling: hourly/daily reports
  Sliding:  moving averages, "activity in last N minutes"
  Session:  user session analytics, shopping cart abandonment

4.2 Watermarks — Xử lý Late-arriving Events

Vấn đề: event được generate lúc t=5, nhưng arrive ở processor lúc t=12
         do mạng chậm, mobile offline rồi sync lại, v.v.

Nếu window [5-10) đã close khi event t=5 arrive → mất data!

Watermark W(t): "Tôi tin rằng tất cả events có timestamp < t đã arrive"

Processing timeline:
  event t=0 arrive
  event t=3 arrive
  event t=2 arrive (late, nhưng watermark chưa pass t=3 → vẫn ok)
  Watermark advance đến t=3
  event t=7 arrive
  Watermark advance đến t=7
  → Window [0,5) có thể trigger (watermark đã pass t=5)
  event t=4 arrive (LATE! watermark đã ở t=7)
  → Drop hoặc gửi vào side output

Flink watermark strategy:
  WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.ofSeconds(5))
    // Cho phép event đến trễ tối đa 5 giây

5. Event-Driven Architecture Patterns

5.1 Event Sourcing + Kafka

Traditional:
  User clicks "Buy" → UPDATE orders SET status='paid'
  Mất lịch sử, khó debug, khó replay

Event Sourcing:
  User clicks "Buy" → Publish OrderPlaced event
  User pays       → Publish PaymentReceived event
  Order ships     → Publish OrderShipped event

  Current state = replay all events
  Kafka topic = source of truth (với log compaction)

Benefits:
  - Complete audit trail (fintech requirement!)
  - Temporal queries ("trạng thái lúc T là gì?")
  - Replay để rebuild projections
  - Easy to add new consumers

5.2 CDC — Change Data Capture với Debezium

Vấn đề: Làm sao sync dữ liệu từ PostgreSQL vào Kafka mà không thay đổi
         application code và không miss bất kỳ thay đổi nào?

Debezium đọc PostgreSQL WAL (Write-Ahead Log):

PostgreSQL
  │
  │  WAL (wal_level = logical)
  │  INSERT/UPDATE/DELETE operations
  ▼
Debezium Connector (Kafka Connect)
  │  Reads replication slot
  ▼
Kafka Topic: postgres.public.payments
  {
    "op": "c",                    // c=create, u=update, d=delete
    "before": null,
    "after": {
      "id": "uuid-123",
      "amount": 100000,
      "status": "completed"
    },
    "ts_ms": 1713340800000
  }
  │
  ├──► Elasticsearch (search index sync)
  ├──► Data Warehouse (analytics)
  └──► Cache invalidation service
# Debezium PostgreSQL Connector config
{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.hostname": "postgres",
  "database.port": "5432",
  "database.user": "debezium",
  "database.dbname": "payments",
  "database.server.name": "postgres",
  "table.include.list": "public.payments,public.accounts",
  "plugin.name": "pgoutput",
  "slot.name": "debezium_slot",
  "publication.name": "debezium_publication"
}

6. Use Cases Thực tế

6.1 Real-time Fraud Detection Stream

Payment Events (Kafka)
      │
      ▼
Flink Job: Fraud Detection
  │
  ├── KeyBy(account_id)
  │
  ├── Window(5 min tumbling)
  │   → COUNT(transactions) per account
  │   → SUM(amount) per account
  │
  ├── Apply FraudRules:
  │   IF count > 20 OR sum > 50M → fraud signal
  │
  └── Sink:
      ├── Kafka: fraud-alerts topic (cho notification service)
      └── Redis: update account risk score (cho real-time API)

Latency: < 200ms từ payment event đến fraud decision

6.2 Real-time Analytics Dashboard

User Actions (clickstream)
      │
      ├── Kafka producer (browser/app SDK)
      │
      ▼
Kafka topic: user-events (16 partitions)
      │
      ▼
Flink Job: Aggregate
  │
  ├── Filter: remove bots (IP blacklist)
  ├── Enrich: lookup user profile (async API call)
  ├── Window: 1-minute tumbling
  │   → Active users count (HyperLogLog)
  │   → Page view count
  │   → Revenue sum
  │
  └── Sink:
      ├── ClickHouse (OLAP, cho dashboard queries)
      └── Kafka: aggregated-metrics (cho other consumers)

Dashboard query: "Revenue in last 5 minutes" → ClickHouse query < 100ms

6.3 Live Leaderboard

Gaming / Loyalty Points System:

Score events → Kafka → Flink
                            │
                            ▼
                      Sorted Set (Redis ZADD)
                      leaderboard:{game_id}
                            │
                            ▼
                      WebSocket Server
                      (push updates to connected clients)

Redis ZADD: O(log N)
Redis ZRANK: O(log N)  → "rank của user X là bao nhiêu?"
Redis ZRANGE: O(log N + M) → "top 10 users"

Scale: 1M concurrent users → Redis Cluster, partition by game_id

7. Interview: "Design a Real-time Analytics Dashboard"

Interview insight: Câu hỏi này test bạn về stream processing architecture, data pipeline design, và trade-off giữa latency vs accuracy. Nhiều candidate chỉ nói về database mà quên mất Kafka và windowing.

Framework trả lời

1. Clarify:
   - Metrics gì? (pageviews, revenue, DAU, custom events?)
   - Latency requirement? (< 1s, < 10s, < 1 min?)
   - Data volume? (events/sec?)
   - Historical data? (chỉ real-time hay cần look back?)

2. Architecture:
   SDK (browser/mobile)
     → Kafka (buffer, durability)
       → Stream Processor (Flink/Kafka Streams)
         → OLAP DB (ClickHouse/Druid) + Redis (hot aggregates)
           → Query API → Dashboard

3. Key Design Decisions:
   - Windowing strategy: tumbling cho dashboards
   - Late data handling: watermark + allow late arrivals
   - Exactly-once: quan trọng cho financial metrics
   - Hot key problem: nếu 1 event bị spike (viral post)
     → pre-aggregate at SDK, sampling

4. Scale:
   - Kafka partitions = parallelism
   - Flink task managers = horizontal scale
   - ClickHouse: columnar, vectorized, scale-out

5. Failure handling:
   - Kafka retention: 7 days replay
   - Flink checkpointing: recovery từ snapshot
   - Reconciliation job hàng ngày: batch vs stream mismatch

Điểm thường bị trừ

❌ "Dùng MySQL với cron job chạy mỗi phút" → không phải real-time
❌ Không mention late-arriving events → window kết quả sẽ sai
❌ Không mention backpressure handling → consumer chậm hơn producer → OOM
❌ Không phân biệt event time vs processing time

✓  Mention watermarks và late data strategy
✓  Phân biệt hot path (Redis, < 1s) và cold path (ClickHouse, historical)
✓  Đề cập đến reconciliation để đảm bảo accuracy