🖥️ OS✍️ Khoa📅 19/04/2026☕ 10 phút đọc

Concurrency & Parallelism Deep Dive — Khi Goroutines Không Đủ

Biết go func() là level Junior. Biết khi nào KHÔNG nên dùng goroutine là level Senior. Biết thiết kế hệ thống concurrency cho cả team sử dụng — đó là Staff.

Go khiến concurrency trông dễ dàng. Quá dễ. Dễ đến mức bạn nghĩ mình đã master — cho đến khi race detector la hét lúc 2 giờ sáng. Bài này sẽ đi sâu vào những gì "beyond goroutines".


1. Concurrency Patterns Production-Ready

1.1 Worker Pool — Pattern phổ biến nhất

// Khi nào dùng: Process N items với M workers (M << N)
// Tại sao không spawn N goroutines: resource exhaustion
// (DB connections, file descriptors, memory)

func WorkerPool(ctx context.Context, jobs <-chan Job, numWorkers int) <-chan Result {
    results := make(chan Result, numWorkers)
    
    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for job := range jobs {
                select {
                case <-ctx.Done():
                    return // Graceful shutdown
                default:
                    result := process(job)
                    results <- result
                }
            }
        }(i)
    }
    
    go func() {
        wg.Wait()
        close(results) // Close khi TẤT CẢ workers done
    }()
    
    return results
}

// Sử dụng:
jobs := make(chan Job, 100)
results := WorkerPool(ctx, jobs, 10) // 10 workers

for _, item := range items {
    jobs <- Job{Item: item}
}
close(jobs) // Signal: không còn job nào nữa

for result := range results {
    // Process results
}

1.2 Fan-out / Fan-in

// Fan-out: 1 input → N goroutines xử lý song song
// Fan-in: N outputs → merge vào 1 channel

// Fan-out: distribute work
func fanOut(input <-chan Data, n int) []<-chan Result {
    channels := make([]<-chan Result, n)
    for i := 0; i < n; i++ {
        channels[i] = process(input) // Mỗi goroutine đọc từ cùng input
    }
    return channels
}

// Fan-in: merge results
func fanIn(channels ...<-chan Result) <-chan Result {
    merged := make(chan Result)
    var wg sync.WaitGroup
    
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan Result) {
            defer wg.Done()
            for result := range c {
                merged <- result
            }
        }(ch)
    }
    
    go func() {
        wg.Wait()
        close(merged)
    }()
    
    return merged
}

1.3 Pipeline Pattern

// Stage 1 → Stage 2 → Stage 3
// Mỗi stage chạy trong goroutine riêng
// Data flow qua channels

func stage1(ctx context.Context, input <-chan RawData) <-chan ParsedData {
    out := make(chan ParsedData)
    go func() {
        defer close(out)
        for raw := range input {
            select {
            case <-ctx.Done(): return
            case out <- parse(raw):
            }
        }
    }()
    return out
}

func stage2(ctx context.Context, input <-chan ParsedData) <-chan EnrichedData {
    out := make(chan EnrichedData)
    go func() {
        defer close(out)
        for parsed := range input {
            select {
            case <-ctx.Done(): return
            case out <- enrich(parsed):
            }
        }
    }()
    return out
}

// Compose pipeline:
raw := generateData(ctx)
parsed := stage1(ctx, raw)
enriched := stage2(ctx, parsed)
for result := range enriched {
    save(result)
}

1.4 Semaphore — Giới hạn concurrency

// Khi cần giới hạn concurrent operations (DB connections, API calls)

type Semaphore struct {
    ch chan struct{}
}

func NewSemaphore(max int) *Semaphore {
    return &Semaphore{ch: make(chan struct{}, max)}
}

func (s *Semaphore) Acquire(ctx context.Context) error {
    select {
    case s.ch <- struct{}{}:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func (s *Semaphore) Release() {
    <-s.ch
}

// Sử dụng: max 5 concurrent DB queries
sem := NewSemaphore(5)

for _, item := range items {
    go func(item Item) {
        if err := sem.Acquire(ctx); err != nil {
            return
        }
        defer sem.Release()
        
        queryDB(item) // Chỉ 5 queries chạy cùng lúc
    }(item)
}

// Hoặc dùng golang.org/x/sync/semaphore (official)

2. Race Conditions — Những Bug Khó Tìm Nhất

2.1 Data Race vs Race Condition

Data Race (Go race detector bắt được):
  → 2 goroutines access cùng biến, ít nhất 1 là write
  → Không có synchronization
  → go run -race sẽ phát hiện

Race Condition (khó hơn, logic bug):
  → Đúng thứ tự events nhưng kết quả sai
  → Ví dụ: check-then-act (TOCTOU)
  → Race detector KHÔNG bắt được

Data race là technical bug.
Race condition là logic bug.
Cả 2 đều nguy hiểm.

2.2 Check-Then-Act (TOCTOU)

// ❌ Classic race condition: check-then-act
func TransferMoney(from, to *Account, amount int) error {
    if from.Balance >= amount {       // CHECK
        // Goroutine khác có thể đã rút tiền ở đây!
        from.Balance -= amount         // ACT
        to.Balance += amount
    }
    return nil
}

// ✅ Fix: atomic check-and-act
func TransferMoney(from, to *Account, amount int) error {
    from.mu.Lock()
    defer from.mu.Unlock()
    to.mu.Lock()
    defer to.mu.Unlock()
    
    if from.Balance < amount {
        return ErrInsufficientFunds
    }
    from.Balance -= amount
    to.Balance += amount
    return nil
}

// ⚠️ Cẩn thận: lock ordering!
// Nếu goroutine 1: lock(A) → lock(B)
// Và goroutine 2: lock(B) → lock(A)
// → DEADLOCK! 💀

// Fix: luôn lock theo thứ tự cố định (ví dụ: theo ID)
func TransferMoney(a1, a2 *Account, amount int) error {
    first, second := a1, a2
    if a1.ID > a2.ID {
        first, second = a2, a1 // Consistent ordering
    }
    first.mu.Lock()
    defer first.mu.Unlock()
    second.mu.Lock()
    defer second.mu.Unlock()
    // ... transfer logic
}

2.3 Go Race Detector

# Bật race detector (development/testing)
go test -race ./...
go run -race main.go

# Race detector sẽ output:
# WARNING: DATA RACE
# Write at 0x00c000014088 by goroutine 7:
#   main.increment()
#       main.go:15 +0x38
# Previous read at 0x00c000014088 by goroutine 6:
#   main.readCounter()
#       main.go:20 +0x2e

# QUAN TRỌNG:
# → Chạy race detector trong CI (go test -race)
# → Overhead ~2-10x slowdown, 5-10x memory
# → KHÔNG dùng trong production
# → Chỉ phát hiện race đã EXECUTE, không phải potential race

3. Lock-Free & Wait-Free — Khi Mutex Quá Chậm

3.1 atomic package

// Khi chỉ cần increment/read 1 biến đơn giản
// atomic nhanh hơn mutex vì không cần OS scheduler

// ❌ Dùng mutex cho counter đơn giản
type Counter struct {
    mu    sync.Mutex
    count int64
}
func (c *Counter) Inc() {
    c.mu.Lock()
    c.count++
    c.mu.Unlock()
}

// ✅ Dùng atomic — lock-free, nhanh hơn 3-5x
type Counter struct {
    count atomic.Int64
}
func (c *Counter) Inc() {
    c.count.Add(1)
}
func (c *Counter) Get() int64 {
    return c.count.Load()
}

3.2 Compare-And-Swap (CAS)

// CAS: nền tảng của lock-free algorithms
// "Nếu giá trị hiện tại = expected → set thành new"
// Nếu không (ai đó đã thay đổi) → retry

func lockFreeIncrement(addr *int64) {
    for {
        old := atomic.LoadInt64(addr)
        new := old + 1
        if atomic.CompareAndSwapInt64(addr, old, new) {
            return // Success!
        }
        // Fail = ai đó đã thay đổi → retry
        // Đây là "optimistic locking" ở CPU level
    }
}

3.3 Khi nào KHÔNG dùng lock-free

Lock-free code:
  ✅ Nhanh hơn cho contention thấp (few writers)
  ✅ Không bị deadlock (không có lock)
  
  ❌ Rất khó viết đúng (subtle bugs)
  ❌ Khó debug, khó test
  ❌ Không nhanh hơn với contention cao
  ❌ Code khó đọc, khó maintain

Rule of thumb:
  → Dùng channels cho communication (Go philosophy)
  → Dùng sync.Mutex cho shared state đơn giản
  → Dùng atomic cho counters, flags
  → Lock-free data structures: dùng library, ĐỪNG tự viết
  → sync.Map cho read-heavy concurrent maps

4. Context Propagation & Graceful Shutdown

4.1 Context Hierarchy

// Context trong Go = cancellation tree
// Parent cancel → tất cả children cancel

func main() {
    // Root context với OS signal handling
    ctx, cancel := signal.NotifyContext(context.Background(),
        syscall.SIGINT, syscall.SIGTERM)
    defer cancel()
    
    // HTTP server
    srv := &http.Server{Addr: ":8080"}
    
    go func() {
        if err := srv.ListenAndServe(); err != http.ErrServerClosed {
            log.Fatal(err)
        }
    }()
    
    <-ctx.Done() // Chờ signal
    log.Println("Shutting down...")
    
    // Graceful shutdown: chờ in-flight requests
    shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    srv.Shutdown(shutdownCtx)
}

4.2 Graceful Shutdown Pattern

// Pattern đầy đủ cho production service
type Service struct {
    httpServer  *http.Server
    grpcServer  *grpc.Server
    dbPool      *pgxpool.Pool
    kafkaReader *kafka.Reader
    wg          sync.WaitGroup
}

func (s *Service) Shutdown(ctx context.Context) error {
    // 1. Stop accepting new requests
    s.httpServer.Shutdown(ctx)
    s.grpcServer.GracefulStop()
    
    // 2. Stop consuming messages
    s.kafkaReader.Close()
    
    // 3. Wait for in-flight work
    done := make(chan struct{})
    go func() {
        s.wg.Wait()
        close(done)
    }()
    
    select {
    case <-done:
        log.Println("All in-flight work completed")
    case <-ctx.Done():
        log.Println("Shutdown timeout, forcing close")
    }
    
    // 4. Close connections (CUỐI CÙNG)
    s.dbPool.Close()
    return nil
}

// Thứ tự QUAN TRỌNG:
// Stop ingress → drain work → close connections
// Ngược lại = requests fail vì DB đã đóng

5. Distributed Locking

5.1 Redis SETNX

// Simple distributed lock với Redis
// Caveat: KHÔNG safe nếu Redis cluster split brain

func AcquireLock(ctx context.Context, rdb *redis.Client, 
    key string, ttl time.Duration) (bool, error) {
    
    lockValue := uuid.New().String() // Unique owner ID
    
    ok, err := rdb.SetNX(ctx, "lock:"+key, lockValue, ttl).Result()
    if err != nil { return false, err }
    
    return ok, nil // true = acquired, false = already locked
}

func ReleaseLock(ctx context.Context, rdb *redis.Client, 
    key string, lockValue string) error {
    
    // Lua script: chỉ release nếu owner đúng
    // Tránh release lock của người khác!
    script := `
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        end
        return 0
    `
    _, err := rdb.Eval(ctx, script, []string{"lock:" + key}, lockValue).Result()
    return err
}

5.2 Redlock — Multi-node Redis Locking

Redlock algorithm (Martin Kleppmann có critique):
  1. Lấy current time
  2. Try acquire lock trên N Redis nodes (N=5)
  3. Lock acquired nếu majority (≥3/5) OK và total time < TTL
  4. Effective TTL = TTL - time_to_acquire

Debate (Kleppmann vs Antirez):
  Kleppmann: Redlock không safe vì clock issues
  Antirez (Redis creator): OK cho hầu hết use cases

Thực tế:
  → Nếu cần 100% correctness: dùng etcd hoặc ZooKeeper
  → Nếu chấp nhận rare edge cases: Redis SETNX + TTL đủ
  → Hầu hết real-world: Redis single-node lock đủ tốt

5.3 etcd cho Strong Distributed Locking

// etcd: linearizable distributed lock (strongest guarantee)
import clientv3 "go.etcd.io/etcd/client/v3"

func withLock(ctx context.Context, client *clientv3.Client, 
    lockKey string, fn func() error) error {
    
    session, _ := concurrency.NewSession(client, concurrency.WithTTL(30))
    defer session.Close()
    
    mutex := concurrency.NewMutex(session, "/locks/"+lockKey)
    
    if err := mutex.Lock(ctx); err != nil {
        return fmt.Errorf("acquire lock: %w", err)
    }
    defer mutex.Unlock(ctx)
    
    return fn()
}

6. Rate Limiting Algorithms

6.1 Token Bucket

Bucket chứa N tokens. Mỗi request tiêu 1 token.
Tokens được refill theo rate cố định.

Ví dụ: Bucket size=10, refill rate=5/sec
  → Burst tối đa 10 requests
  → Sustained rate: 5 req/s

Ưu điểm: Cho phép burst, smooth rate limiting
Dùng bởi: AWS API Gateway, Stripe
// Go implementation dùng golang.org/x/time/rate
limiter := rate.NewLimiter(rate.Limit(5), 10) // 5/s, burst 10

func handleRequest(w http.ResponseWriter, r *http.Request) {
    if !limiter.Allow() {
        http.Error(w, "Rate limited", http.StatusTooManyRequests)
        return
    }
    // Process request
}

6.2 Sliding Window

Fixed Window:
  [0s-60s]: count requests, reject > limit
  Problem: 100 requests ở giây 59, 100 ở giây 61
           = 200 req trong 2 giây, vượt quá intent

Sliding Window Log:
  → Keep timestamp của mỗi request
  → Đếm requests trong window [now-60s, now]
  → Chính xác nhưng memory-intensive

Sliding Window Counter (hybrid):
  → Combine 2 fixed windows
  → current_count = prev_window × overlap% + current_window
  → Memory-efficient, approximately accurate
  → Redis-friendly implementation

7. Tóm tắt

Concurrency Principles cho Go:

  1. Patterns: Worker Pool, Fan-out/Fan-in, Pipeline, Semaphore
  2. Race conditions: Race detector trong CI, lock ordering
  3. Atomic: counters/flags, CAS cho lock-free ops
  4. Context: propagation, timeouts, graceful shutdown
  5. Distributed locking: Redis OK cho hầu hết, etcd cho strict
  6. Rate limiting: Token Bucket phổ biến nhất

  Go Proverb: "Don't communicate by sharing memory,
  share memory by communicating." — Rob Pike
  
  Dịch: Dùng channels, đừng dùng shared variables + mutex.
  (Trừ khi channels quá overhead — thì dùng atomic/mutex)

Tài liệu tham khảo


💡 Remember: "Concurrency is not parallelism." Concurrency = dealing with lots of things at once (design). Parallelism = doing lots of things at once (execution). Bạn có thể có concurrency trên 1 CPU core. 🧠