How do I produce and consume messages in Kafka using Go?

In this tutorial, we will explore how to produce and consume messages in Apache Kafka using the Go programming language. Apache Kafka is a distributed streaming platform capable of handling trillions of events a day. The `confluent-kafka-go` library, Confluent's Go client built on librdkafka, lets us connect to Kafka from our Go applications.

Producing Messages to Kafka

To produce messages, first create a Kafka producer. The example below connects to a broker at `localhost:9092`, sends four messages to `test_topic`, and waits for them to be delivered before exiting.

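    package main

    import (
        "log"

        // For v2 of the client, the import path is
        // github.com/confluentinc/confluent-kafka-go/v2/kafka.
        "github.com/confluentinc/confluent-kafka-go/kafka"
    )

    func main() {
        // Create a new Kafka producer.
        producer, err := kafka.NewProducer(&kafka.ConfigMap{
            "bootstrap.servers": "localhost:9092",
        })
        if err != nil {
            log.Fatalf("Failed to create producer: %s", err)
        }
        defer producer.Close()

        // Delivery reports arrive on the Events channel because Produce is
        // called with a nil delivery channel. Read them and log failures.
        go func() {
            for e := range producer.Events() {
                if m, ok := e.(*kafka.Message); ok && m.TopicPartition.Error != nil {
                    log.Printf("Delivery failed: %v", m.TopicPartition.Error)
                }
            }
        }()

        // Produce messages. Produce only enqueues; delivery is asynchronous.
        topic := "test_topic"
        for _, word := range []string{"Hello", "Kafka", "from", "Go"} {
            err := producer.Produce(&kafka.Message{
                TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
                Value:          []byte(word),
            }, nil)
            if err != nil {
                log.Printf("Failed to enqueue %q: %s", word, err)
            }
        }

        // Wait up to 15 seconds for outstanding deliveries.
        if remaining := producer.Flush(15 * 1000); remaining > 0 {
            log.Printf("%d events were still outstanding after the flush timeout", remaining)
        }
    }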
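
The producer example sets `Partition` to `kafka.PartitionAny` and supplies no key, which leaves the choice of partition to the client's partitioner. When a message does carry a key, partitioners normally hash it so that equal keys land on the same partition and keep their relative order. The sketch below illustrates that idea with the standard library only. The function name `partitionFor`, the sample keys, the partition count, and the FNV-1a hash are all assumptions made for illustration; this is not the hash the Kafka client itself uses.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a message key to a partition index in [0, numPartitions).
// FNV-1a is an illustrative choice only; it is not the hash the Kafka client uses.
// It returns -1 when numPartitions is not positive.
func partitionFor(key string, numPartitions int) int {
	if numPartitions <= 0 {
		return -1
	}
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	// Hypothetical keys and partition count, chosen for illustration.
	const numPartitions = 6
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("key %q -> partition %d\n", key, partitionFor(key, numPartitions))
	}
}
```

Both `user-1` lines print the same partition, which is the property keyed messages rely on. To route by key with the real client, set the `Key` field of `kafka.Message` alongside `Value` and keep `Partition` as `kafka.PartitionAny`.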

Consuming Messages from Kafka

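To consume messages, create a Kafka consumer, give it a consumer group ID, and subscribe to a topic. Here’s a basic implementation that reads messages from `test_topic`, starting from the earliest available offset when the group has no committed offset yet.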

    package main

    import (
        "log"

        "github.com/confluentinc/confluent-kafka-go/kafka"
    )

    func main() {
        // Create a new Kafka consumer.
        consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers": "localhost:9092",
            "group.id":          "my-group",
            "auto.offset.reset": "earliest",
        })
        if err != nil {
            log.Fatalf("Failed to create consumer: %s", err)
        }
        defer consumer.Close()

        // Subscribe to the topic.
        if err := consumer.Subscribe("test_topic", nil); err != nil {
            log.Printf("Failed to subscribe: %s", err)
            return
        }

        // Consume messages. A timeout of -1 blocks until a message or an error arrives.
        for {
            msg, err := consumer.ReadMessage(-1)
            if err != nil {
                log.Printf("Error while consuming: %s", err)
                // Stop on fatal errors so the deferred Close can run.
                if kerr, ok := err.(kafka.Error); ok && kerr.IsFatal() {
                    return
                }
                continue
            }
            log.Printf("Received message: %s", msg.Value)
        }
    }
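
A loop that blocks in `ReadMessage(-1)` cannot react to a shutdown request, so the consumer may never be closed and leave its group cleanly. One common pattern is to poll with a short timeout and check a `context.Context` between polls. The sketch below shows the shape of that loop using only the standard library, so it runs without a broker. The `messageReader` interface, `fakeReader`, `errTimeout`, and `runDemo` are hypothetical stand-ins, not part of `confluent-kafka-go`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errTimeout stands in for the timeout error a real client reports when no
// message arrives within the poll interval.
var errTimeout = errors.New("poll timed out")

// messageReader is a hypothetical stand-in for the consumer methods the loop needs.
type messageReader interface {
	ReadMessage(timeout time.Duration) ([]byte, error)
	Close() error
}

// consume polls r until ctx is cancelled or a non-timeout error occurs.
// It always closes r before returning.
func consume(ctx context.Context, r messageReader, handle func([]byte)) error {
	defer r.Close()
	for {
		// Check for a shutdown request between polls.
		if ctx.Err() != nil {
			return nil
		}
		msg, err := r.ReadMessage(100 * time.Millisecond)
		switch {
		case err == nil:
			handle(msg)
		case errors.Is(err, errTimeout):
			// No message yet; loop again and re-check ctx.
		default:
			return err
		}
	}
}

// fakeReader serves queued payloads, then reports timeouts.
type fakeReader struct {
	queue  [][]byte
	closed bool
}

func (f *fakeReader) ReadMessage(time.Duration) ([]byte, error) {
	if len(f.queue) == 0 {
		return nil, errTimeout
	}
	msg := f.queue[0]
	f.queue = f.queue[1:]
	return msg, nil
}

func (f *fakeReader) Close() error {
	f.closed = true
	return nil
}

// runDemo consumes four queued messages, then cancels the context to stop the loop.
func runDemo() (received []string, closed bool) {
	r := &fakeReader{queue: [][]byte{[]byte("Hello"), []byte("Kafka"), []byte("from"), []byte("Go")}}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	err := consume(ctx, r, func(msg []byte) {
		received = append(received, string(msg))
		if len(received) == 4 {
			cancel() // simulate a shutdown request
		}
	})
	if err != nil {
		fmt.Println("consume failed:", err)
	}
	return received, r.closed
}

func main() {
	received, closed := runDemo()
	fmt.Println(received, closed) // prints: [Hello Kafka from Go] true
}
```

In a real program the context could come from `signal.NotifyContext(context.Background(), os.Interrupt)`, and the reader would wrap the `*kafka.Consumer`. Its `ReadMessage` returns a `*kafka.Message` rather than a byte slice and reports a poll timeout as a `kafka.Error` with code `kafka.ErrTimedOut`, so a small adapter would be needed.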
