Go Concurrency Patterns
Concurrency patterns giúp bạn xây dựng concurrent systems đúng cách — tránh race conditions, deadlocks, và resource leaks. Đây là các patterns đã được battle-tested trong production.
💡 "Don't communicate by sharing memory; share memory by communicating." — Go proverb
Pattern 1: Worker Pool
Use case: Xử lý nhiều jobs với số lượng workers giới hạn.
Basic worker pool
package main
import (
"context"
"fmt"
"sync"
)
func workerPool(ctx context.Context, numWorkers int, jobs <-chan int) {
var wg sync.WaitGroup
// Start workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case job, ok := <-jobs:
if !ok {
return // Channel closed
}
// Process job
fmt.Printf("Worker %d processing job %d\n", workerID, job)
}
}
}(i)
}
wg.Wait()
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
jobs := make(chan int, 100)
// Start worker pool
go workerPool(ctx, 5, jobs)
// Send jobs
for i := 0; i < 100; i++ {
jobs <- i
}
close(jobs)
}
Worker pool với error handling
type Result struct {
JobID int
Data interface{}
Err error
}
func workerPoolWithResults(ctx context.Context, numWorkers int, jobs <-chan int, results chan<- Result) {
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case job, ok := <-jobs:
if !ok {
return
}
// Process job
data, err := processJob(job)
select {
case results <- Result{JobID: job, Data: data, Err: err}:
case <-ctx.Done():
return
}
}
}
}()
}
wg.Wait()
close(results)
}
Worker pool với backpressure
type WorkerPool struct {
workers int
jobs chan func()
semaphore chan struct{}
}
func NewWorkerPool(workers, queueSize int) *WorkerPool {
wp := &WorkerPool{
workers: workers,
jobs: make(chan func(), queueSize),
semaphore: make(chan struct{}, queueSize),
}
for i := 0; i < workers; i++ {
go wp.worker()
}
return wp
}
func (wp *WorkerPool) worker() {
for job := range wp.jobs {
job()
<-wp.semaphore // Release
}
}
func (wp *WorkerPool) Submit(job func()) error {
select {
case wp.semaphore <- struct{}{}: // Acquire
wp.jobs <- job
return nil
default:
return errors.New("queue full") // Backpressure
}
}
func (wp *WorkerPool) Close() {
close(wp.jobs)
}
Pattern 2: Pipeline
Use case: Xử lý data qua nhiều stages.
Basic pipeline
// Stage 1: Generate numbers
func generate(ctx context.Context, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-ctx.Done():
return
}
}
}()
return out
}
// Stage 2: Square numbers
func square(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-ctx.Done():
return
}
}
}()
return out
}
// Stage 3: Filter even numbers
func filterEven(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if n%2 == 0 {
select {
case out <- n:
case <-ctx.Done():
return
}
}
}
}()
return out
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Build pipeline
nums := generate(ctx, 1, 2, 3, 4, 5)
squared := square(ctx, nums)
filtered := filterEven(ctx, squared)
// Consume
for n := range filtered {
fmt.Println(n)
}
}
Fan-out/Fan-in pipeline
// Fan-out: Nhiều workers xử lý cùng input
func fanOut(ctx context.Context, in <-chan int, workers int) []<-chan int {
channels := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
channels[i] = square(ctx, in)
}
return channels
}
// Fan-in: Merge nhiều channels thành 1
func fanIn(ctx context.Context, channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start goroutine for each input channel
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-ctx.Done():
return
}
}
}
wg.Add(len(channels))
for _, c := range channels {
go output(c)
}
// Close out when all input channels closed
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nums := generate(ctx, 1, 2, 3, 4, 5, 6, 7, 8)
// Fan-out: 3 workers
workers := fanOut(ctx, nums, 3)
// Fan-in: merge results
result := fanIn(ctx, workers...)
for n := range result {
fmt.Println(n)
}
}
Pattern 3: Timeout & Cancellation
Context-based timeout
func fetchWithTimeout(url string, timeout time.Duration) ([]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}
Select với timeout
func processWithTimeout(data chan Data, timeout time.Duration) error {
select {
case d := <-data:
return process(d)
case <-time.After(timeout):
return errors.New("timeout")
}
}
Cancel propagation
func parent(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel() // Cancel all children when parent returns
go child1(ctx)
go child2(ctx)
// If any child fails, cancel others
select {
case <-ctx.Done():
return
case err := <-errorChan:
cancel() // Cancel all
return
}
}
Pattern 4: Rate Limiting
Token bucket
type RateLimiter struct {
tokens chan struct{}
}
func NewRateLimiter(rate int, burst int) *RateLimiter {
rl := &RateLimiter{
tokens: make(chan struct{}, burst),
}
// Fill tokens
for i := 0; i < burst; i++ {
rl.tokens <- struct{}{}
}
// Refill periodically
go func() {
ticker := time.NewTicker(time.Second / time.Duration(rate))
for range ticker.C {
select {
case rl.tokens <- struct{}{}:
default:
// Bucket full, discard token
}
}
}()
return rl
}
func (rl *RateLimiter) Allow() bool {
select {
case <-rl.tokens:
return true
default:
return false
}
}
func (rl *RateLimiter) Wait(ctx context.Context) error {
select {
case <-rl.tokens:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
Usage:
limiter := NewRateLimiter(10, 5) // 10 req/s, burst 5
for i := 0; i < 100; i++ {
if limiter.Allow() {
go processRequest(i)
} else {
fmt.Println("Rate limited")
}
}
Sliding window
type SlidingWindow struct {
mu sync.Mutex
requests []time.Time
limit int
window time.Duration
}
func NewSlidingWindow(limit int, window time.Duration) *SlidingWindow {
return &SlidingWindow{
requests: make([]time.Time, 0),
limit: limit,
window: window,
}
}
func (sw *SlidingWindow) Allow() bool {
sw.mu.Lock()
defer sw.mu.Unlock()
now := time.Now()
cutoff := now.Add(-sw.window)
// Remove old requests
var valid []time.Time
for _, t := range sw.requests {
if t.After(cutoff) {
valid = append(valid, t)
}
}
sw.requests = valid
// Check limit
if len(sw.requests) < sw.limit {
sw.requests = append(sw.requests, now)
return true
}
return false
}
Pattern 5: Circuit Breaker
Use case: Tránh gọi service đang fail, fail fast.
type State int
const (
StateClosed State = iota // Normal
StateOpen // Failing, don't call
StateHalfOpen // Testing if recovered
)
type CircuitBreaker struct {
mu sync.Mutex
state State
failures int
successes int
lastFailure time.Time
maxFailures int
timeout time.Duration
halfOpenSuccesses int
}
func NewCircuitBreaker(maxFailures int, timeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
state: StateClosed,
maxFailures: maxFailures,
timeout: timeout,
halfOpenSuccesses: 2,
}
}
func (cb *CircuitBreaker) Call(fn func() error) error {
cb.mu.Lock()
// Check state
switch cb.state {
case StateOpen:
// Check if should transition to half-open
if time.Since(cb.lastFailure) > cb.timeout {
cb.state = StateHalfOpen
cb.successes = 0
} else {
cb.mu.Unlock()
return errors.New("circuit breaker open")
}
}
cb.mu.Unlock()
// Execute function
err := fn()
cb.mu.Lock()
defer cb.mu.Unlock()
if err != nil {
cb.failures++
cb.lastFailure = time.Now()
if cb.failures >= cb.maxFailures {
cb.state = StateOpen
}
return err
}
// Success
cb.failures = 0
if cb.state == StateHalfOpen {
cb.successes++
if cb.successes >= cb.halfOpenSuccesses {
cb.state = StateClosed
}
}
return nil
}
Pattern 6: Broadcast
Use case: Notify nhiều goroutines cùng lúc.
type Broadcaster struct {
mu sync.Mutex
listeners []chan interface{}
}
func NewBroadcaster() *Broadcaster {
return &Broadcaster{
listeners: make([]chan interface{}, 0),
}
}
func (b *Broadcaster) Subscribe() <-chan interface{} {
b.mu.Lock()
defer b.mu.Unlock()
ch := make(chan interface{}, 1)
b.listeners = append(b.listeners, ch)
return ch
}
func (b *Broadcaster) Broadcast(msg interface{}) {
b.mu.Lock()
defer b.mu.Unlock()
for _, ch := range b.listeners {
select {
case ch <- msg:
default:
// Listener slow, skip
}
}
}
func (b *Broadcaster) Close() {
b.mu.Lock()
defer b.mu.Unlock()
for _, ch := range b.listeners {
close(ch)
}
b.listeners = nil
}
Pattern 7: Semaphore
Use case: Giới hạn số concurrent operations.
type Semaphore chan struct{}
func NewSemaphore(n int) Semaphore {
return make(Semaphore, n)
}
func (s Semaphore) Acquire() {
s <- struct{}{}
}
func (s Semaphore) Release() {
<-s
}
func (s Semaphore) TryAcquire() bool {
select {
case s <- struct{}{}:
return true
default:
return false
}
}
Usage:
sem := NewSemaphore(10) // Max 10 concurrent
for i := 0; i < 100; i++ {
sem.Acquire()
go func(id int) {
defer sem.Release()
process(id)
}(i)
}
Hoặc dùng golang.org/x/sync/semaphore:
import "golang.org/x/sync/semaphore"
sem := semaphore.NewWeighted(10)
for i := 0; i < 100; i++ {
sem.Acquire(ctx, 1)
go func(id int) {
defer sem.Release(1)
process(id)
}(i)
}
Pattern 8: errgroup
Use case: Chạy nhiều goroutines, thu thập errors.
import "golang.org/x/sync/errgroup"
func fetchAll(urls []string) error {
var g errgroup.Group
for _, url := range urls {
url := url // Capture
g.Go(func() error {
return fetch(url)
})
}
// Wait for all, return first error
return g.Wait()
}
Với context:
func fetchAllWithContext(ctx context.Context, urls []string) error {
g, ctx := errgroup.WithContext(ctx)
for _, url := range urls {
url := url
g.Go(func() error {
return fetchWithContext(ctx, url)
})
}
return g.Wait() // Cancel all if one fails
}
Với limit:
func fetchAllLimited(urls []string, limit int) error {
g := new(errgroup.Group)
g.SetLimit(limit) // Max concurrent goroutines
for _, url := range urls {
url := url
g.Go(func() error {
return fetch(url)
})
}
return g.Wait()
}
Anti-patterns
❌ Goroutine leak
// BAD: Goroutine never exits
func leak() {
ch := make(chan int)
go func() {
<-ch // Block forever
}()
} // ch out of scope, goroutine leaked
Fix: Always provide exit path.
func noLeak(ctx context.Context) {
ch := make(chan int)
go func() {
select {
case <-ch:
case <-ctx.Done():
return // Exit on cancel
}
}()
}
❌ Race condition
// BAD: Race on shared variable
counter := 0
for i := 0; i < 1000; i++ {
go func() {
counter++ // Race!
}()
}
Fix: Use sync primitives.
var counter int64
for i := 0; i < 1000; i++ {
go func() {
atomic.AddInt64(&counter, 1)
}()
}
❌ Channel deadlock
// BAD: Send without receiver
ch := make(chan int)
ch <- 42 // Deadlock!
Fix: Use buffered channel hoặc goroutine.
ch := make(chan int, 1)
ch <- 42 // OK
// Or
ch := make(chan int)
go func() { ch <- 42 }()
<-ch // OK
Tóm tắt Best Practices
✅ Luôn dùng context cho cancellation
✅ Close channels ở sender, không phải receiver
✅ Limit số goroutines (worker pool, semaphore)
✅ Avoid shared mutable state (dùng channels hoặc sync primitives)
✅ Test concurrent code với -race
✅ Monitor goroutine count trong production
Tài liệu tham khảo
- Go Concurrency Patterns: https://go.dev/blog/pipelines
- Advanced Go Concurrency Patterns: https://go.dev/blog/io2013-talk-concurrency
- Bryan C. Mills - Rethinking Classical Concurrency Patterns: https://www.youtube.com/watch?v=5zXAHh5tJqQ
- errgroup package: https://pkg.go.dev/golang.org/x/sync/errgroup