Concurrency & Parallelism Deep Dive — Khi Goroutines Không Đủ
Biết
go func()là level Junior. Biết khi nào KHÔNG nên dùng goroutine là level Senior. Biết thiết kế hệ thống concurrency cho cả team sử dụng — đó là Staff.
Go khiến concurrency trông dễ dàng. Quá dễ. Dễ đến mức bạn nghĩ mình đã master — cho đến khi race detector la hét lúc 2 giờ sáng. Bài này sẽ đi sâu vào những gì "beyond goroutines".
1. Concurrency Patterns Production-Ready
1.1 Worker Pool — Pattern phổ biến nhất
// Khi nào dùng: Process N items với M workers (M << N)
// Tại sao không spawn N goroutines: resource exhaustion
// (DB connections, file descriptors, memory)
func WorkerPool(ctx context.Context, jobs <-chan Job, numWorkers int) <-chan Result {
results := make(chan Result, numWorkers)
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for job := range jobs {
select {
case <-ctx.Done():
return // Graceful shutdown
default:
result := process(job)
results <- result
}
}
}(i)
}
go func() {
wg.Wait()
close(results) // Close khi TẤT CẢ workers done
}()
return results
}
// Sử dụng:
jobs := make(chan Job, 100)
results := WorkerPool(ctx, jobs, 10) // 10 workers
for _, item := range items {
jobs <- Job{Item: item}
}
close(jobs) // Signal: không còn job nào nữa
for result := range results {
// Process results
}
1.2 Fan-out / Fan-in
// Fan-out: 1 input → N goroutines xử lý song song
// Fan-in: N outputs → merge vào 1 channel
// Fan-out: distribute work
func fanOut(input <-chan Data, n int) []<-chan Result {
channels := make([]<-chan Result, n)
for i := 0; i < n; i++ {
channels[i] = process(input) // Mỗi goroutine đọc từ cùng input
}
return channels
}
// Fan-in: merge results
func fanIn(channels ...<-chan Result) <-chan Result {
merged := make(chan Result)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan Result) {
defer wg.Done()
for result := range c {
merged <- result
}
}(ch)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}
1.3 Pipeline Pattern
// Stage 1 → Stage 2 → Stage 3
// Mỗi stage chạy trong goroutine riêng
// Data flow qua channels
func stage1(ctx context.Context, input <-chan RawData) <-chan ParsedData {
out := make(chan ParsedData)
go func() {
defer close(out)
for raw := range input {
select {
case <-ctx.Done(): return
case out <- parse(raw):
}
}
}()
return out
}
func stage2(ctx context.Context, input <-chan ParsedData) <-chan EnrichedData {
out := make(chan EnrichedData)
go func() {
defer close(out)
for parsed := range input {
select {
case <-ctx.Done(): return
case out <- enrich(parsed):
}
}
}()
return out
}
// Compose pipeline:
raw := generateData(ctx)
parsed := stage1(ctx, raw)
enriched := stage2(ctx, parsed)
for result := range enriched {
save(result)
}
1.4 Semaphore — Giới hạn concurrency
// Khi cần giới hạn concurrent operations (DB connections, API calls)
type Semaphore struct {
ch chan struct{}
}
func NewSemaphore(max int) *Semaphore {
return &Semaphore{ch: make(chan struct{}, max)}
}
func (s *Semaphore) Acquire(ctx context.Context) error {
select {
case s.ch <- struct{}{}:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (s *Semaphore) Release() {
<-s.ch
}
// Sử dụng: max 5 concurrent DB queries
sem := NewSemaphore(5)
for _, item := range items {
go func(item Item) {
if err := sem.Acquire(ctx); err != nil {
return
}
defer sem.Release()
queryDB(item) // Chỉ 5 queries chạy cùng lúc
}(item)
}
// Hoặc dùng golang.org/x/sync/semaphore (official)
2. Race Conditions — Những Bug Khó Tìm Nhất
2.1 Data Race vs Race Condition
Data Race (Go race detector bắt được):
→ 2 goroutines access cùng biến, ít nhất 1 là write
→ Không có synchronization
→ go run -race sẽ phát hiện
Race Condition (khó hơn, logic bug):
→ Đúng thứ tự events nhưng kết quả sai
→ Ví dụ: check-then-act (TOCTOU)
→ Race detector KHÔNG bắt được
Data race là technical bug.
Race condition là logic bug.
Cả 2 đều nguy hiểm.
2.2 Check-Then-Act (TOCTOU)
// ❌ Classic race condition: check-then-act
func TransferMoney(from, to *Account, amount int) error {
if from.Balance >= amount { // CHECK
// Goroutine khác có thể đã rút tiền ở đây!
from.Balance -= amount // ACT
to.Balance += amount
}
return nil
}
// ✅ Fix: atomic check-and-act
func TransferMoney(from, to *Account, amount int) error {
from.mu.Lock()
defer from.mu.Unlock()
to.mu.Lock()
defer to.mu.Unlock()
if from.Balance < amount {
return ErrInsufficientFunds
}
from.Balance -= amount
to.Balance += amount
return nil
}
// ⚠️ Cẩn thận: lock ordering!
// Nếu goroutine 1: lock(A) → lock(B)
// Và goroutine 2: lock(B) → lock(A)
// → DEADLOCK! 💀
// Fix: luôn lock theo thứ tự cố định (ví dụ: theo ID)
func TransferMoney(a1, a2 *Account, amount int) error {
first, second := a1, a2
if a1.ID > a2.ID {
first, second = a2, a1 // Consistent ordering
}
first.mu.Lock()
defer first.mu.Unlock()
second.mu.Lock()
defer second.mu.Unlock()
// ... transfer logic
}
2.3 Go Race Detector
# Bật race detector (development/testing)
go test -race ./...
go run -race main.go
# Race detector sẽ output:
# WARNING: DATA RACE
# Write at 0x00c000014088 by goroutine 7:
# main.increment()
# main.go:15 +0x38
# Previous read at 0x00c000014088 by goroutine 6:
# main.readCounter()
# main.go:20 +0x2e
# QUAN TRỌNG:
# → Chạy race detector trong CI (go test -race)
# → Overhead ~2-10x slowdown, 5-10x memory
# → KHÔNG dùng trong production
# → Chỉ phát hiện race đã EXECUTE, không phải potential race
3. Lock-Free & Wait-Free — Khi Mutex Quá Chậm
3.1 atomic package
// Khi chỉ cần increment/read 1 biến đơn giản
// atomic nhanh hơn mutex vì không cần OS scheduler
// ❌ Dùng mutex cho counter đơn giản
type Counter struct {
mu sync.Mutex
count int64
}
func (c *Counter) Inc() {
c.mu.Lock()
c.count++
c.mu.Unlock()
}
// ✅ Dùng atomic — lock-free, nhanh hơn 3-5x
type Counter struct {
count atomic.Int64
}
func (c *Counter) Inc() {
c.count.Add(1)
}
func (c *Counter) Get() int64 {
return c.count.Load()
}
3.2 Compare-And-Swap (CAS)
// CAS: nền tảng của lock-free algorithms
// "Nếu giá trị hiện tại = expected → set thành new"
// Nếu không (ai đó đã thay đổi) → retry
func lockFreeIncrement(addr *int64) {
for {
old := atomic.LoadInt64(addr)
new := old + 1
if atomic.CompareAndSwapInt64(addr, old, new) {
return // Success!
}
// Fail = ai đó đã thay đổi → retry
// Đây là "optimistic locking" ở CPU level
}
}
3.3 Khi nào KHÔNG dùng lock-free
Lock-free code:
✅ Nhanh hơn cho contention thấp (few writers)
✅ Không bị deadlock (không có lock)
❌ Rất khó viết đúng (subtle bugs)
❌ Khó debug, khó test
❌ Không nhanh hơn với contention cao
❌ Code khó đọc, khó maintain
Rule of thumb:
→ Dùng channels cho communication (Go philosophy)
→ Dùng sync.Mutex cho shared state đơn giản
→ Dùng atomic cho counters, flags
→ Lock-free data structures: dùng library, ĐỪNG tự viết
→ sync.Map cho read-heavy concurrent maps
4. Context Propagation & Graceful Shutdown
4.1 Context Hierarchy
// Context trong Go = cancellation tree
// Parent cancel → tất cả children cancel
func main() {
// Root context với OS signal handling
ctx, cancel := signal.NotifyContext(context.Background(),
syscall.SIGINT, syscall.SIGTERM)
defer cancel()
// HTTP server
srv := &http.Server{Addr: ":8080"}
go func() {
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
log.Fatal(err)
}
}()
<-ctx.Done() // Chờ signal
log.Println("Shutting down...")
// Graceful shutdown: chờ in-flight requests
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
srv.Shutdown(shutdownCtx)
}
4.2 Graceful Shutdown Pattern
// Pattern đầy đủ cho production service
type Service struct {
httpServer *http.Server
grpcServer *grpc.Server
dbPool *pgxpool.Pool
kafkaReader *kafka.Reader
wg sync.WaitGroup
}
func (s *Service) Shutdown(ctx context.Context) error {
// 1. Stop accepting new requests
s.httpServer.Shutdown(ctx)
s.grpcServer.GracefulStop()
// 2. Stop consuming messages
s.kafkaReader.Close()
// 3. Wait for in-flight work
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
log.Println("All in-flight work completed")
case <-ctx.Done():
log.Println("Shutdown timeout, forcing close")
}
// 4. Close connections (CUỐI CÙNG)
s.dbPool.Close()
return nil
}
// Thứ tự QUAN TRỌNG:
// Stop ingress → drain work → close connections
// Ngược lại = requests fail vì DB đã đóng
5. Distributed Locking
5.1 Redis SETNX
// Simple distributed lock với Redis
// Caveat: KHÔNG safe nếu Redis cluster split brain
func AcquireLock(ctx context.Context, rdb *redis.Client,
key string, ttl time.Duration) (bool, error) {
lockValue := uuid.New().String() // Unique owner ID
ok, err := rdb.SetNX(ctx, "lock:"+key, lockValue, ttl).Result()
if err != nil { return false, err }
return ok, nil // true = acquired, false = already locked
}
func ReleaseLock(ctx context.Context, rdb *redis.Client,
key string, lockValue string) error {
// Lua script: chỉ release nếu owner đúng
// Tránh release lock của người khác!
script := `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
end
return 0
`
_, err := rdb.Eval(ctx, script, []string{"lock:" + key}, lockValue).Result()
return err
}
5.2 Redlock — Multi-node Redis Locking
Redlock algorithm (Martin Kleppmann có critique):
1. Lấy current time
2. Try acquire lock trên N Redis nodes (N=5)
3. Lock acquired nếu majority (≥3/5) OK và total time < TTL
4. Effective TTL = TTL - time_to_acquire
Debate (Kleppmann vs Antirez):
Kleppmann: Redlock không safe vì clock issues
Antirez (Redis creator): OK cho hầu hết use cases
Thực tế:
→ Nếu cần 100% correctness: dùng etcd hoặc ZooKeeper
→ Nếu chấp nhận rare edge cases: Redis SETNX + TTL đủ
→ Hầu hết real-world: Redis single-node lock đủ tốt
5.3 etcd cho Strong Distributed Locking
// etcd: linearizable distributed lock (strongest guarantee)
import clientv3 "go.etcd.io/etcd/client/v3"
func withLock(ctx context.Context, client *clientv3.Client,
lockKey string, fn func() error) error {
session, _ := concurrency.NewSession(client, concurrency.WithTTL(30))
defer session.Close()
mutex := concurrency.NewMutex(session, "/locks/"+lockKey)
if err := mutex.Lock(ctx); err != nil {
return fmt.Errorf("acquire lock: %w", err)
}
defer mutex.Unlock(ctx)
return fn()
}
6. Rate Limiting Algorithms
6.1 Token Bucket
Bucket chứa N tokens. Mỗi request tiêu 1 token.
Tokens được refill theo rate cố định.
Ví dụ: Bucket size=10, refill rate=5/sec
→ Burst tối đa 10 requests
→ Sustained rate: 5 req/s
Ưu điểm: Cho phép burst, smooth rate limiting
Dùng bởi: AWS API Gateway, Stripe
// Go implementation dùng golang.org/x/time/rate
limiter := rate.NewLimiter(rate.Limit(5), 10) // 5/s, burst 10
func handleRequest(w http.ResponseWriter, r *http.Request) {
if !limiter.Allow() {
http.Error(w, "Rate limited", http.StatusTooManyRequests)
return
}
// Process request
}
6.2 Sliding Window
Fixed Window:
[0s-60s]: count requests, reject > limit
Problem: 100 requests ở giây 59, 100 ở giây 61
= 200 req trong 2 giây, vượt quá intent
Sliding Window Log:
→ Keep timestamp của mỗi request
→ Đếm requests trong window [now-60s, now]
→ Chính xác nhưng memory-intensive
Sliding Window Counter (hybrid):
→ Combine 2 fixed windows
→ current_count = prev_window × overlap% + current_window
→ Memory-efficient, approximately accurate
→ Redis-friendly implementation
7. Tóm tắt
Concurrency Principles cho Go:
1. Patterns: Worker Pool, Fan-out/Fan-in, Pipeline, Semaphore
2. Race conditions: Race detector trong CI, lock ordering
3. Atomic: counters/flags, CAS cho lock-free ops
4. Context: propagation, timeouts, graceful shutdown
5. Distributed locking: Redis OK cho hầu hết, etcd cho strict
6. Rate limiting: Token Bucket phổ biến nhất
Go Proverb: "Don't communicate by sharing memory,
share memory by communicating." — Rob Pike
Dịch: Dùng channels, đừng dùng shared variables + mutex.
(Trừ khi channels quá overhead — thì dùng atomic/mutex)
Tài liệu tham khảo
- Go Concurrency Patterns — Go Blog
- Go Memory Model — Official spec
- Concurrency in Go — Katherine Cox-Buday (O'Reilly)
- How to do distributed locking — Martin Kleppmann
- Is Redlock Safe? — Antirez's response
💡 Remember: "Concurrency is not parallelism." Concurrency = dealing with lots of things at once (design). Parallelism = doing lots of things at once (execution). Bạn có thể có concurrency trên 1 CPU core. 🧠