In this tutorial, we will explore how to produce and consume messages in Apache Kafka using the Go programming language. Apache Kafka is a distributed streaming platform capable of handling trillions of events a day. With the `confluent-kafka-go` library, we can easily connect to Kafka from our Go applications.
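To follow along, add the client to your project with `go get github.com/confluentinc/confluent-kafka-go/kafka` (this assumes a Go-modules setup; newer releases of the library live under the `/v2` module path, in which case the import paths below change accordingly).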
To produce messages, you first need to create a Kafka producer. Below is an example of how to do this using Go.
```go
package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Create a new Kafka producer connected to a local broker.
	producer, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
	})
	if err != nil {
		log.Fatalf("Failed to create producer: %s", err)
	}
	defer producer.Close()

	// Produce one message per word. Produce is asynchronous: it only
	// enqueues the message, so the error checked here is an enqueue error,
	// not a delivery result.
	topic := "test_topic"
	for _, word := range []string{"Hello", "Kafka", "from", "Go"} {
		err := producer.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
		if err != nil {
			log.Printf("Failed to enqueue %q: %s", word, err)
		}
	}

	// Wait up to 15 seconds for outstanding deliveries to complete.
	if remaining := producer.Flush(15 * 1000); remaining > 0 {
		log.Printf("%d message(s) were not delivered", remaining)
	}
}
```
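Because we pass `nil` as `Produce`'s delivery channel, delivery reports are routed to the producer's `Events()` channel. The sketch below (assuming the `producer` variable from the example above, started as a goroutine before producing) logs each delivery outcome:

```go
// Drain delivery reports in the background so the events channel does
// not back up; each *kafka.Message event reports one message's fate.
go func() {
	for e := range producer.Events() {
		if m, ok := e.(*kafka.Message); ok {
			if m.TopicPartition.Error != nil {
				log.Printf("Delivery failed: %v", m.TopicPartition.Error)
			} else {
				log.Printf("Delivered message to %v", m.TopicPartition)
			}
		}
	}
}()
```

`Flush` still bounds how long `main` waits; the goroutine simply makes the per-message outcomes visible.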
To consume messages, you need to create a Kafka consumer. Here’s a basic implementation to read messages from the specified topic.
```go
package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Create a consumer in the "my-group" consumer group, starting from
	// the earliest offset when the group has no committed offset yet.
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "my-group",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		log.Fatalf("Failed to create consumer: %s", err)
	}
	defer consumer.Close()

	// Subscribe to the topic.
	if err := consumer.Subscribe("test_topic", nil); err != nil {
		log.Fatalf("Failed to subscribe: %s", err)
	}

	// Consume messages. A negative timeout makes ReadMessage block until
	// a message arrives; this loop runs until the process is stopped.
	for {
		msg, err := consumer.ReadMessage(-1)
		if err != nil {
			log.Printf("Error while consuming: %s", err)
			continue
		}
		log.Printf("Received message: %s", msg.Value)
	}
}
```
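One caveat: because the consume loop never returns, the deferred `Close` above never actually runs. A common pattern is to poll with a short timeout and watch for OS signals, so the consumer can commit its final offsets and leave the group cleanly. Here is a sketch of the consumer rewritten that way (same broker, group, and topic as above):

```go
package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "my-group",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		log.Fatalf("Failed to create consumer: %s", err)
	}
	defer consumer.Close()

	if err := consumer.Subscribe("test_topic", nil); err != nil {
		log.Fatalf("Failed to subscribe: %s", err)
	}

	// Stop the loop on Ctrl+C or SIGTERM.
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	run := true
	for run {
		select {
		case sig := <-sigchan:
			log.Printf("Caught signal %v: shutting down", sig)
			run = false
		default:
			// Poll with a short timeout so the signal check runs regularly.
			msg, err := consumer.ReadMessage(100 * time.Millisecond)
			if err != nil {
				// A timeout just means no message arrived in this interval.
				if kerr, ok := err.(kafka.Error); ok && kerr.Code() == kafka.ErrTimedOut {
					continue
				}
				log.Printf("Error while consuming: %s", err)
				continue
			}
			log.Printf("Received message: %s", msg.Value)
		}
	}
	// consumer.Close() now runs via defer, leaving the group cleanly.
}
```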