RabbitMQ supports dead-letter exchanges (DLX), which can be used to route messages that cannot be processed successfully. In this guide, we will demonstrate how to implement a dead-letter queue using RabbitMQ in Go with the streadway/amqp package.
To begin, you need to create a primary queue along with a dead-letter exchange and queue. Here's how you can do that:
package main

import (
	"log"

	"github.com/streadway/amqp"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %s", err)
	}
	defer conn.Close()

	channel, err := conn.Channel()
	if err != nil {
		log.Fatalf("Failed to open a channel: %s", err)
	}
	defer channel.Close()

	// Declare a dead-letter exchange
	err = channel.ExchangeDeclare(
		"dlx.exchange", // name
		"topic",        // type
		true,           // durable
		false,          // auto-deleted
		false,          // internal
		false,          // no-wait
		nil,            // arguments
	)
	if err != nil {
		log.Fatalf("Failed to declare an exchange: %s", err)
	}

	// Declare the main queue with the dead-letter exchange
	_, err = channel.QueueDeclare(
		"main.queue", // name
		true,         // durable
		false,        // auto-deleted
		false,        // exclusive
		false,        // no-wait
		amqp.Table{
			"x-dead-letter-exchange": "dlx.exchange",
		}, // arguments
	)
	if err != nil {
		log.Fatalf("Failed to declare a queue: %s", err)
	}

	// Declare the dead-letter queue
	_, err = channel.QueueDeclare(
		"dead.queue", // name
		true,         // durable
		false,        // auto-deleted
		false,        // exclusive
		false,        // no-wait
		nil,          // arguments
	)
	if err != nil {
		log.Fatalf("Failed to declare the dead-letter queue: %s", err)
	}

	// Bind the dead-letter queue to the dead-letter exchange.
	// Without a binding, the exchange would drop dead-lettered messages.
	err = channel.QueueBind(
		"dead.queue",   // queue name
		"#",            // binding key: matches every routing key
		"dlx.exchange", // exchange
		false,          // no-wait
		nil,            // arguments
	)
	if err != nil {
		log.Fatalf("Failed to bind the dead-letter queue: %s", err)
	}

	log.Println("Dead-letter queue set up successfully!")
}
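The queue arguments are the part of the setup that is easiest to get wrong, so here is a minimal sketch of a helper that builds them. The helper name `deadLetterArgs` is hypothetical and not part of the library. It returns a plain `map[string]interface{}`, which is the underlying type of `amqp.Table`, so the sketch runs without a broker or the client package. `x-dead-letter-routing-key` is an optional RabbitMQ queue argument that replaces the routing key of a message when it is dead-lettered.

```go
package main

import "fmt"

// deadLetterArgs (hypothetical helper) builds the optional arguments for a
// queue declaration. The result can be converted with amqp.Table(args)
// before it is passed to QueueDeclare.
// If routingKey is empty, the argument is omitted and the broker keeps the
// message's original routing key when it dead-letters the message.
func deadLetterArgs(exchange, routingKey string) map[string]interface{} {
	args := map[string]interface{}{
		"x-dead-letter-exchange": exchange,
	}
	if routingKey != "" {
		args["x-dead-letter-routing-key"] = routingKey
	}
	return args
}

func main() {
	// Arguments equivalent to the ones used for main.queue in this guide.
	fmt.Println(deadLetterArgs("dlx.exchange", ""))
	// Variant that rewrites the routing key on dead-lettering.
	fmt.Println(deadLetterArgs("dlx.exchange", "dead"))
}
```

Note that each value is a plain string. RabbitMQ rejects a queue declaration whose arguments differ from those of an existing queue with the same name, so changing these values later means deleting and redeclaring the queue.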
Now that we have our queues set up, we can publish messages to the main queue. A message that a consumer rejects without requeueing will be routed to the dead-letter queue:
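err = channel.Publish(
	"",           // exchange: the default exchange routes by queue name
	"main.queue", // routing key
	false,        // mandatory
	false,        // immediate
	amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte("Test message"),
	},
)
if err != nil {
	log.Fatalf("Failed to publish a message: %s", err)
}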
To consume messages from the main queue, disable auto-ack and acknowledge each message explicitly once it has been processed. If processing fails, reject the message with requeue set to false so that RabbitMQ routes it to the dead-letter queue:
msgs, err := channel.Consume(
	"main.queue", // queue
	"",           // consumer
	false,        // auto-ack
	false,        // exclusive
	false,        // no-local
	false,        // no-wait
	nil,          // args
)
if err != nil {
	log.Fatalf("Failed to register a consumer: %s", err)
}

for msg := range msgs {
	log.Printf("Received a message: %s", msg.Body)
	if failedProcessing(msg.Body) {
		msg.Nack(false, false) // requeue=false: the broker dead-letters the message
	} else {
		msg.Ack(false) // Acknowledged
	}
}

func failedProcessing(body []byte) bool {
	// Simulate processing failure
	return true
}
With the above setup, any messages that fail processing will be automatically sent to the dead-letter queue, allowing you to handle them later.