How do I commit offsets safely in Kafka using Go?

In Kafka, committing offsets safely is crucial to ensure that messages are processed exactly once, particularly in Go applications. Below are some guidelines and an example of how to commit offsets safely using the Sarama library, which is a popular Kafka client for Go.

Kafka, Go, offsets, Sarama, Go Kafka client, commit offsets, message processing, exactly-once delivery

This guide provides insights into how to commit offsets safely in Kafka using Go, ensuring reliable message processing in distributed systems.

package main import ( "context" "log" "github.com/Shopify/sarama" ) func main() { // Create a new Sarama Config config := sarama.NewConfig() config.Version = sarama.V2_4_0_0 // Set the Kafka version config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin // Create a new consumer group group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", config) if err != nil { log.Fatalf("Error creating consumer group: %v", err) } defer group.Close() // Start consuming messages ctx := context.Background() for { err := group.Consume(ctx, []string{"example-topic"}, &consumer{}) if err != nil { log.Fatalf("Error consuming messages: %v", err) } } } type consumer struct{} func (c *consumer) Setup(sarama.ConsumerGroupSession) error { return nil } func (c *consumer) Cleanup(sarama.ConsumerGroupSession) error { return nil } func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for message := range claim.Messages() { log.Printf("Received message: %s", string(message.Value)) session.MarkMessage(message, "") // Committing the offset safely } return nil }

Kafka Go offsets Sarama Go Kafka client commit offsets message processing exactly-once delivery