Implementing dead-letter queues (DLQs) in Kafka using Go is an important pattern for handling failures gracefully. A dead-letter queue is a dedicated topic to which messages that cannot be processed are routed, allowing the system to handle errors without losing data.
package main

import (
	"fmt"
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Configure the Kafka producer
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
	})
	if err != nil {
		log.Fatalf("Failed to create producer: %s", err)
	}
	defer p.Close()

	// Define topic names
	primaryTopic := "my-topic"
	dlqTopic := "my-dlq"

	// Produce a message; on failure, route it to the DLQ
	if err := produceMessage(p, primaryTopic); err != nil {
		produceToDLQ(p, dlqTopic, err.Error())
	}

	// Wait (up to 5s) for outstanding deliveries before exiting
	p.Flush(5000)
}

func produceMessage(p *kafka.Producer, topic string) error {
	// Simulate message processing that may fail
	return fmt.Errorf("processing error")
}

func produceToDLQ(p *kafka.Producer, topic, message string) {
	// Produce the error payload to the dead-letter topic
	err := p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte(message),
	}, nil)
	if err != nil {
		log.Printf("Failed to produce to DLQ: %s", err)
	}
}