Using consumer groups in Kafka with Go allows multiple consumers to share the processing load. Kafka assigns each partition of a subscribed topic to exactly one consumer in the group, so the members divide the partitions between them; this enables horizontal scalability, and if a member fails, its partitions are rebalanced to the remaining consumers. In this guide, we will demonstrate how to implement Kafka consumer groups in Go.
package main

import (
	"fmt"
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Create a new consumer that joins the group "my-consumer-group".
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "my-consumer-group",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		log.Fatalf("Failed to create consumer: %s\n", err)
	}
	defer c.Close()

	// Subscribe to the topic; the group coordinator assigns partitions
	// to this consumer during the rebalance.
	if err := c.SubscribeTopics([]string{"my-topic"}, nil); err != nil {
		log.Fatalf("Failed to subscribe: %s\n", err)
	}

	// Start consuming messages. ReadMessage takes a timeout;
	// -1 blocks until a message or an error is available.
	for {
		msg, err := c.ReadMessage(-1)
		if err == nil {
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		} else {
			// Errors here are often transient (e.g. during rebalances),
			// so log and keep consuming.
			fmt.Printf("Consumer error: %v\n", err)
		}
	}
}