In Go, fan-out and fan-in are complementary concurrency patterns for managing goroutines: fan-out distributes work from one channel across multiple goroutines so tasks are processed concurrently, while fan-in merges values from multiple channels into a single channel. Here is one way to implement both patterns.
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker simulates a task that processes data.
func worker(id int, jobs <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		fmt.Printf("Worker %d processing job %d\n", id, job)
		time.Sleep(time.Second) // Simulate work
	}
}

// fanOut starts numWorkers goroutines that process jobs from the channel
// and blocks until all of them have finished.
func fanOut(numWorkers int, jobs <-chan int) {
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(i, jobs, &wg)
	}
	wg.Wait()
}

// fanIn merges values from multiple channels into a single channel.
func fanIn(channels ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	// Helper function that forwards one input channel into out.
	output := func(c <-chan int) {
		defer wg.Done()
		for job := range c {
			out <- job
		}
	}
	wg.Add(len(channels))
	for _, c := range channels {
		go output(c)
	}
	// Close the output channel once all input channels are drained.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
func main() {
	jobs1 := make(chan int, 5)
	jobs2 := make(chan int, 5)
	// Populate jobs
	for i := 1; i <= 5; i++ {
		jobs1 <- i
		jobs2 <- i + 5
	}
	close(jobs1)
	close(jobs2)
	// Fan in the two job channels, then fan out three workers over the
	// merged stream. fanOut blocks on its WaitGroup until every job has
	// been processed, so no sleep is needed to keep main alive.
	fanOut(3, fanIn(jobs1, jobs2))
}