In Kafka, committing offsets safely is crucial to ensure that messages are processed exactly once, particularly in Go applications. Below are some guidelines and an example of how to commit offsets safely using the Sarama library, which is a popular Kafka client for Go.
Kafka, Go, offsets, Sarama, Go Kafka client, commit offsets, message processing, exactly-once delivery
This guide provides insights into how to commit offsets safely in Kafka using Go, ensuring reliable message processing in distributed systems.
package main
import (
"context"
"log"
"github.com/Shopify/sarama"
)
func main() {
// Create a new Sarama Config
config := sarama.NewConfig()
config.Version = sarama.V2_4_0_0 // Set the Kafka version
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
// Create a new consumer group
group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", config)
if err != nil {
log.Fatalf("Error creating consumer group: %v", err)
}
defer group.Close()
// Start consuming messages
ctx := context.Background()
for {
err := group.Consume(ctx, []string{"example-topic"}, &consumer{})
if err != nil {
log.Fatalf("Error consuming messages: %v", err)
}
}
}
type consumer struct{}
func (c *consumer) Setup(sarama.ConsumerGroupSession) error {
return nil
}
func (c *consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
log.Printf("Received message: %s", string(message.Value))
session.MarkMessage(message, "") // Committing the offset safely
}
return nil
}
How do I avoid rehashing overhead with std::set in multithreaded code?
How do I find elements with custom comparators with std::set for embedded targets?
How do I erase elements while iterating with std::set for embedded targets?
How do I provide stable iteration order with std::unordered_map for large datasets?
How do I reserve capacity ahead of time with std::unordered_map for large datasets?
How do I erase elements while iterating with std::unordered_map in multithreaded code?
How do I provide stable iteration order with std::map for embedded targets?
How do I provide stable iteration order with std::map in multithreaded code?
How do I avoid rehashing overhead with std::map in performance-sensitive code?
How do I merge two containers efficiently with std::map for embedded targets?