🧠 Programming · ✍️ Khoa · 📅 19/04/2026 · ☕ 10 phút đọc

Go Concurrency Patterns

Concurrency patterns giúp bạn xây dựng concurrent systems đúng cách — tránh race conditions, deadlocks, và resource leaks. Đây là các patterns đã được battle-tested trong production.

💡 "Don't communicate by sharing memory; share memory by communicating." — Go proverb


Pattern 1: Worker Pool

Use case: Xử lý nhiều jobs với số lượng workers giới hạn.

Basic worker pool

package main

import (
    "context"
    "fmt"
    "sync"
)

// workerPool starts numWorkers goroutines that drain the jobs channel.
// It blocks until every worker has exited, which happens when jobs is
// closed by the sender or when ctx is cancelled.
func workerPool(ctx context.Context, numWorkers int, jobs <-chan int) {
    var wg sync.WaitGroup
    wg.Add(numWorkers)

    for id := 0; id < numWorkers; id++ {
        go func(id int) {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case job, open := <-jobs:
                    if !open {
                        // Sender closed the channel: no more work.
                        return
                    }
                    fmt.Printf("Worker %d processing job %d\n", id, job)
                }
            }
        }(id)
    }

    wg.Wait()
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    jobs := make(chan int, 100)

    // Produce in the background; close from the sender side so the
    // workers know when there is no more work.
    go func() {
        defer close(jobs)
        for i := 0; i < 100; i++ {
            jobs <- i
        }
    }()

    // Run the pool synchronously. workerPool only returns after every
    // worker has finished, so main cannot exit mid-processing. (The
    // original ran workerPool in a goroutine and returned right after
    // close(jobs), killing workers before they drained the queue.)
    workerPool(ctx, 5, jobs)
}

Worker pool với error handling

// Result pairs a job's identifier with its processed data and the
// error (if any) produced while handling it.
type Result struct {
    JobID int
    Data  interface{}
    Err   error
}

// workerPoolWithResults runs numWorkers goroutines over jobs and emits
// one Result per job on results. results is closed after all workers
// exit, so a consumer can simply range over it.
func workerPoolWithResults(ctx context.Context, numWorkers int, jobs <-chan int, results chan<- Result) {
    var wg sync.WaitGroup
    wg.Add(numWorkers)

    for w := 0; w < numWorkers; w++ {
        go func() {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case job, open := <-jobs:
                    if !open {
                        return
                    }

                    data, err := processJob(job)

                    // Don't block forever on a slow consumer: abandon
                    // the send if the context ends first.
                    select {
                    case results <- Result{JobID: job, Data: data, Err: err}:
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }()
    }

    wg.Wait()
    close(results) // safe: this function is the only sender
}

Worker pool với backpressure

// WorkerPool is a fixed-size pool with a bounded queue. Submit applies
// backpressure (returns an error) instead of blocking when the queue
// is full, and rejects jobs after Close.
type WorkerPool struct {
    mu        sync.Mutex // guards closed
    closed    bool
    workers   int
    jobs      chan func()
    semaphore chan struct{} // counts queued-or-running jobs
}

// NewWorkerPool starts `workers` goroutines serving a queue of
// capacity queueSize and returns the ready-to-use pool.
func NewWorkerPool(workers, queueSize int) *WorkerPool {
    wp := &WorkerPool{
        workers:   workers,
        jobs:      make(chan func(), queueSize),
        semaphore: make(chan struct{}, queueSize),
    }

    for i := 0; i < workers; i++ {
        go wp.worker()
    }

    return wp
}

// worker drains the jobs channel until Close closes it.
func (wp *WorkerPool) worker() {
    for job := range wp.jobs {
        job()
        <-wp.semaphore // Release
    }
}

// Submit enqueues job. It returns an error when the queue is full
// (backpressure) or the pool has been closed — the original panicked
// with "send on closed channel" if Submit raced with Close.
func (wp *WorkerPool) Submit(job func()) error {
    wp.mu.Lock()
    defer wp.mu.Unlock()

    if wp.closed {
        return errors.New("pool closed")
    }

    select {
    case wp.semaphore <- struct{}{}:  // Acquire
        wp.jobs <- job
        return nil
    default:
        return errors.New("queue full")  // Backpressure
    }
}

// Close stops accepting jobs and lets the workers drain what is
// already queued. Close is idempotent.
func (wp *WorkerPool) Close() {
    wp.mu.Lock()
    defer wp.mu.Unlock()

    if !wp.closed {
        wp.closed = true
        close(wp.jobs)
    }
}

Pattern 2: Pipeline

Use case: Xử lý data qua nhiều stages.

Basic pipeline

// Stage 1: generate emits the given numbers on a fresh channel and
// closes it when the numbers run out or ctx is cancelled.
func generate(ctx context.Context, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, v := range nums {
            select {
            case <-ctx.Done():
                return
            case out <- v:
            }
        }
    }()
    return out
}

// Stage 2: square reads ints from in, squares each value, and forwards
// it on the returned channel, which is closed when in closes or ctx
// is cancelled.
func square(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for v := range in {
            select {
            case <-ctx.Done():
                return
            case out <- v * v:
            }
        }
    }()
    return out
}

// Stage 3: filterEven forwards only the even values from in, closing
// the returned channel when in closes or ctx is cancelled.
func filterEven(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for v := range in {
            if v%2 != 0 {
                continue // odd values are dropped
            }
            select {
            case <-ctx.Done():
                return
            case out <- v:
            }
        }
    }()
    return out
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    // Build pipeline
    nums := generate(ctx, 1, 2, 3, 4, 5)
    squared := square(ctx, nums)
    filtered := filterEven(ctx, squared)
    
    // Consume
    for n := range filtered {
        fmt.Println(n)
    }
}

Fan-out/Fan-in pipeline

// fanOut starts `workers` square stages that all read from the same
// input channel, so the incoming values are distributed among them.
func fanOut(ctx context.Context, in <-chan int, workers int) []<-chan int {
    outs := make([]<-chan int, 0, workers)
    for i := 0; i < workers; i++ {
        outs = append(outs, square(ctx, in))
    }
    return outs
}

// fanIn merges every input channel onto a single output channel. The
// output is closed once all inputs are closed (or ctx is cancelled).
func fanIn(ctx context.Context, channels ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    wg.Add(len(channels))

    // One forwarding goroutine per input channel.
    for _, ch := range channels {
        go func(ch <-chan int) {
            defer wg.Done()
            for v := range ch {
                select {
                case <-ctx.Done():
                    return
                case out <- v:
                }
            }
        }(ch)
    }

    // Sender-side close: only after every forwarder has finished.
    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    nums := generate(ctx, 1, 2, 3, 4, 5, 6, 7, 8)

    // Fan-out to 3 workers, then fan their outputs back into one stream.
    merged := fanIn(ctx, fanOut(ctx, nums, 3)...)

    for n := range merged {
        fmt.Println(n)
    }
}

Pattern 3: Timeout & Cancellation

Context-based timeout

// fetchWithTimeout GETs url and returns the response body. The context
// deadline aborts both the request and the body read once timeout
// elapses.
func fetchWithTimeout(url string, timeout time.Duration) ([]byte, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    // The original discarded this error (`req, _`); a malformed URL
    // would then nil-pointer panic inside Do instead of failing cleanly.
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        return nil, err
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    // io.ReadAll replaces the deprecated ioutil.ReadAll (Go 1.16+).
    return io.ReadAll(resp.Body)
}

Select với timeout

// processWithTimeout waits up to timeout for a value on data and
// processes it; if nothing arrives in time it returns a timeout error.
func processWithTimeout(data chan Data, timeout time.Duration) error {
    deadline := time.After(timeout)
    select {
    case <-deadline:
        return errors.New("timeout")
    case d := <-data:
        return process(d)
    }
}

Cancel propagation

// parent derives a cancellable context, starts two children with it,
// and tears everything down as soon as the context ends or any child
// reports an error.
func parent(ctx context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()  // Cancel all children when parent returns

    go child1(ctx)
    go child2(ctx)

    // If any child fails, cancel the others. (The original bound the
    // received error to an unused variable `err`, which is a compile
    // error in Go — "declared and not used".)
    select {
    case <-ctx.Done():
        return
    case <-errorChan:
        cancel()  // Cancel all
        return
    }
}

Pattern 4: Rate Limiting

Token bucket

// RateLimiter is a token bucket: Allow/Wait consume tokens and a
// background goroutine refills one token every 1/rate seconds, up to
// the burst capacity.
type RateLimiter struct {
    tokens chan struct{}
    done   chan struct{} // closed by Stop to end the refill goroutine
}

// NewRateLimiter returns a limiter allowing `rate` operations per
// second with bursts of up to `burst`. Call Stop when the limiter is
// no longer needed — the original version leaked the refill goroutine
// and its ticker forever.
func NewRateLimiter(rate int, burst int) *RateLimiter {
    rl := &RateLimiter{
        tokens: make(chan struct{}, burst),
        done:   make(chan struct{}),
    }

    // Start full so an initial burst is allowed immediately.
    for i := 0; i < burst; i++ {
        rl.tokens <- struct{}{}
    }

    // Refill periodically until Stop is called.
    go func() {
        ticker := time.NewTicker(time.Second / time.Duration(rate))
        defer ticker.Stop()
        for {
            select {
            case <-rl.done:
                return
            case <-ticker.C:
                select {
                case rl.tokens <- struct{}{}:
                default:
                    // Bucket full, discard token
                }
            }
        }
    }()

    return rl
}

// Stop shuts down the refill goroutine. Allow/Wait may still drain any
// remaining tokens afterwards. Stop must be called at most once.
func (rl *RateLimiter) Stop() {
    close(rl.done)
}

// Allow reports whether a token was available, without blocking.
func (rl *RateLimiter) Allow() bool {
    select {
    case <-rl.tokens:
        return true
    default:
        return false
    }
}

// Wait blocks until a token is available or ctx is cancelled, in which
// case it returns ctx.Err().
func (rl *RateLimiter) Wait(ctx context.Context) error {
    select {
    case <-rl.tokens:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

Usage:

limiter := NewRateLimiter(10, 5)  // 10 req/s, burst 5

for i := 0; i < 100; i++ {
    if limiter.Allow() {
        go processRequest(i)
    } else {
        fmt.Println("Rate limited")
    }
}

Sliding window

// SlidingWindow admits at most `limit` requests inside any rolling
// `window` of time. Safe for concurrent use.
type SlidingWindow struct {
    mu       sync.Mutex
    requests []time.Time // timestamps of admitted requests, oldest first
    limit    int
    window   time.Duration
}

// NewSlidingWindow returns a window limiter admitting `limit` requests
// per `window`.
func NewSlidingWindow(limit int, window time.Duration) *SlidingWindow {
    return &SlidingWindow{
        requests: make([]time.Time, 0),
        limit:    limit,
        window:   window,
    }
}

// Allow records the request and returns true when fewer than limit
// requests were admitted within the last window; otherwise false.
func (sw *SlidingWindow) Allow() bool {
    sw.mu.Lock()
    defer sw.mu.Unlock()

    now := time.Now()
    cutoff := now.Add(-sw.window)

    // Timestamps are appended in order, so all expired entries sit at
    // the front: trim them in place instead of reallocating a fresh
    // slice on every call (the original rebuilt `valid` each time).
    expired := 0
    for expired < len(sw.requests) && !sw.requests[expired].After(cutoff) {
        expired++
    }
    sw.requests = sw.requests[expired:]

    // Check limit
    if len(sw.requests) < sw.limit {
        sw.requests = append(sw.requests, now)
        return true
    }

    return false
}

Pattern 5: Circuit Breaker

Use case: Tránh gọi service đang fail, fail fast.

// State is the circuit breaker's current mode.
type State int

const (
    StateClosed State = iota  // Normal: calls pass through
    StateOpen                 // Failing: calls rejected immediately
    StateHalfOpen             // Probing: calls allowed to test recovery
)

// CircuitBreaker fails fast once a dependency keeps erroring: after
// maxFailures consecutive failures it opens and rejects calls for
// `timeout`, then half-opens and fully closes again after
// halfOpenSuccesses consecutive successful probe calls.
type CircuitBreaker struct {
    mu          sync.Mutex
    state       State
    failures    int       // consecutive failures; reset on any success
    successes   int       // consecutive successes while half-open
    lastFailure time.Time // when the most recent failure happened
    
    maxFailures  int           // consecutive failures needed to trip open
    timeout      time.Duration // how long to stay open before probing
    halfOpenSuccesses int      // probe successes needed to close again
}

// NewCircuitBreaker returns a closed breaker that opens after
// maxFailures consecutive failures, cools down for `timeout`, and
// needs 2 successful half-open probes to close again.
func NewCircuitBreaker(maxFailures int, timeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        state:             StateClosed,
        maxFailures:       maxFailures,
        timeout:           timeout,
        halfOpenSuccesses: 2,
    }
}

// Call runs fn through the breaker. While open (and within the
// cool-down) it fails fast without invoking fn; otherwise it executes
// fn and updates the failure/success bookkeeping under the lock.
func (cb *CircuitBreaker) Call(fn func() error) error {
    cb.mu.Lock()
    
    // Check state
    switch cb.state {
    case StateOpen:
        // Cool-down elapsed? Transition to half-open and start probing.
        if time.Since(cb.lastFailure) > cb.timeout {
            cb.state = StateHalfOpen
            cb.successes = 0
        } else {
            cb.mu.Unlock()
            return errors.New("circuit breaker open")
        }
    }
    
    // The lock is deliberately NOT held while fn runs, so slow calls
    // don't serialize. NOTE(review): this also means several goroutines
    // can probe concurrently while half-open — confirm that is intended.
    cb.mu.Unlock()
    
    // Execute function
    err := fn()
    
    cb.mu.Lock()
    defer cb.mu.Unlock()
    
    if err != nil {
        cb.failures++
        cb.lastFailure = time.Now()
        
        // Trip open after too many consecutive failures. A failure while
        // half-open also re-opens immediately, since `failures` was not
        // reset when entering half-open.
        if cb.failures >= cb.maxFailures {
            cb.state = StateOpen
        }
        
        return err
    }
    
    // Success: any failure streak is broken.
    cb.failures = 0
    
    if cb.state == StateHalfOpen {
        cb.successes++
        if cb.successes >= cb.halfOpenSuccesses {
            cb.state = StateClosed
        }
    }
    
    return nil
}

Pattern 6: Broadcast

Use case: Notify nhiều goroutines cùng lúc.

// Broadcaster fans each message out to every subscriber. Listeners
// that have not drained their buffer are skipped rather than blocking
// the broadcast.
type Broadcaster struct {
    mu        sync.Mutex
    closed    bool // set by Close; guards against late use
    listeners []chan interface{}
}

// NewBroadcaster returns an empty, open Broadcaster.
func NewBroadcaster() *Broadcaster {
    return &Broadcaster{
        listeners: make([]chan interface{}, 0),
    }
}

// Subscribe registers and returns a new listener channel (buffer 1).
// After Close it returns an already-closed channel so a late receiver
// sees end-of-stream instead of blocking forever — in the original, a
// post-Close subscriber got a channel that was never closed and never
// received anything.
func (b *Broadcaster) Subscribe() <-chan interface{} {
    b.mu.Lock()
    defer b.mu.Unlock()

    ch := make(chan interface{}, 1)
    if b.closed {
        close(ch)
        return ch
    }
    b.listeners = append(b.listeners, ch)

    return ch
}

// Broadcast delivers msg to every listener whose buffer has room;
// slow listeners are skipped. It is a no-op after Close.
func (b *Broadcaster) Broadcast(msg interface{}) {
    b.mu.Lock()
    defer b.mu.Unlock()

    if b.closed {
        return
    }
    for _, ch := range b.listeners {
        select {
        case ch <- msg:
        default:
            // Listener slow, skip
        }
    }
}

// Close closes every listener channel and marks the broadcaster shut.
// Close is idempotent.
func (b *Broadcaster) Close() {
    b.mu.Lock()
    defer b.mu.Unlock()

    if b.closed {
        return
    }
    b.closed = true
    for _, ch := range b.listeners {
        close(ch)
    }
    b.listeners = nil
}

Pattern 7: Semaphore

Use case: Giới hạn số concurrent operations.

// Semaphore bounds concurrency using a buffered channel: each Acquire
// occupies one of n slots until the matching Release.
type Semaphore chan struct{}

// NewSemaphore returns a semaphore with n slots.
func NewSemaphore(n int) Semaphore {
    return make(Semaphore, n)
}

// Acquire blocks until a slot is free, then takes it.
func (s Semaphore) Acquire() {
    s <- struct{}{}
}

// Release frees a slot previously taken by Acquire (or TryAcquire).
func (s Semaphore) Release() {
    <-s
}

// TryAcquire takes a slot without blocking and reports whether it
// succeeded.
func (s Semaphore) TryAcquire() bool {
    select {
    case s <- struct{}{}:
        return true
    default:
        return false
    }
}

Usage:

sem := NewSemaphore(10)  // Max 10 concurrent

for i := 0; i < 100; i++ {
    sem.Acquire()
    go func(id int) {
        defer sem.Release()
        process(id)
    }(i)
}

Hoặc dùng golang.org/x/sync/semaphore:

import "golang.org/x/sync/semaphore"

sem := semaphore.NewWeighted(10)

for i := 0; i < 100; i++ {
    sem.Acquire(ctx, 1)
    go func(id int) {
        defer sem.Release(1)
        process(id)
    }(i)
}

Pattern 8: errgroup

Use case: Chạy nhiều goroutines, thu thập errors.

import "golang.org/x/sync/errgroup"

// fetchAll fetches every URL concurrently and returns the first
// non-nil error (nil if all succeed). It waits for all goroutines to
// finish before returning.
func fetchAll(urls []string) error {
    var g errgroup.Group
    
    for _, url := range urls {
        url := url  // Capture: required before Go 1.22 per-iteration loop vars
        g.Go(func() error {
            return fetch(url)
        })
    }
    
    // Wait for all, return first error
    return g.Wait()
}

Với context:

// fetchAllWithContext fetches every URL concurrently; the derived
// group context is cancelled as soon as any fetch fails, so the other
// in-flight fetches can abort early.
func fetchAllWithContext(ctx context.Context, urls []string) error {
    g, ctx := errgroup.WithContext(ctx)
    
    for _, url := range urls {
        url := url // capture: required before Go 1.22 loop-var semantics
        g.Go(func() error {
            return fetchWithContext(ctx, url)
        })
    }
    
    return g.Wait()  // Cancel all if one fails
}

Với limit:

// fetchAllLimited fetches every URL with at most `limit` fetches in
// flight at once, returning the first error (nil if all succeed).
func fetchAllLimited(urls []string, limit int) error {
    g := new(errgroup.Group)
    g.SetLimit(limit)  // Max concurrent goroutines; Go blocks when full
    
    for _, url := range urls {
        url := url // capture: required before Go 1.22 loop-var semantics
        g.Go(func() error {
            return fetch(url)
        })
    }
    
    return g.Wait()
}

Anti-patterns

❌ Goroutine leak

// BAD: Goroutine never exits.
// leak starts a goroutine that blocks receiving from ch forever:
// nothing ever sends, and there is no cancellation path, so the
// goroutine (and ch) are retained for the life of the process.
func leak() {
    ch := make(chan int)
    go func() {
        <-ch  // Block forever
    }()
}  // ch out of scope, goroutine leaked

Fix: Always provide exit path.

// noLeak gives the goroutine a guaranteed exit path: it also selects
// on ctx.Done(), so cancelling the context lets it return even if
// nothing is ever sent on ch.
func noLeak(ctx context.Context) {
    ch := make(chan int)
    go func() {
        select {
        case <-ch:
        case <-ctx.Done():
            return  // Exit on cancel
        }
    }()
}

❌ Race condition

// BAD: Race on shared variable
counter := 0
for i := 0; i < 1000; i++ {
    go func() {
        counter++  // Race!
    }()
}

Fix: Use sync primitives.

var counter int64
for i := 0; i < 1000; i++ {
    go func() {
        atomic.AddInt64(&counter, 1)
    }()
}

❌ Channel deadlock

// BAD: Send without receiver
ch := make(chan int)
ch <- 42  // Deadlock!

Fix: Use buffered channel hoặc goroutine.

ch := make(chan int, 1)
ch <- 42  // OK

// Or
ch := make(chan int)
go func() { ch <- 42 }()
<-ch  // OK

Tóm tắt Best Practices

✅ Luôn dùng context cho cancellation
✅ Close channels ở sender, không phải receiver
✅ Limit số goroutines (worker pool, semaphore)
✅ Avoid shared mutable state (dùng channels hoặc sync primitives)
✅ Test concurrent code với -race
✅ Monitor goroutine count trong production


Tài liệu tham khảo