This guide shows how to use Redis streams with consumer groups in Go. It covers adding entries to a stream, creating a consumer group, and reading, processing, and acknowledging messages from a consumer.
Redis streams, Go, consumers, message processing, distributed systems
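package main

import (
	"context"
	"log"
	"strings"
	"time"

	"github.com/go-redis/redis/v8"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{
		Addr: "localhost:6379", // Redis server address
	})

	// Add an entry to the stream; XADD creates the stream if it does not exist.
	streamName := "mystream"
	_, err := rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: streamName,
		Values: map[string]interface{}{"key": "value"},
	}).Result()
	if err != nil {
		log.Fatalf("Could not add to stream: %v", err)
	}

	// Create a consumer group. Start ID "0" delivers entries already in the
	// stream (including the one added above); "$" would deliver only entries
	// added after the group is created.
	groupName := "mygroup"
	_, err = rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Result()
	// Redis replies with a BUSYGROUP error when the group already exists,
	// for example when this program is run a second time.
	if err != nil && !strings.HasPrefix(err.Error(), "BUSYGROUP") {
		log.Fatalf("Could not create consumer group: %v", err)
	}

	// Define consumer name
	consumerName := "myconsumer"

	for {
		// Read entries not yet delivered to any consumer in the group.
		// The special ID ">" requests them; "$" is not valid for XREADGROUP.
		streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    groupName,
			Consumer: consumerName,
			Streams:  []string{streamName, ">"},
			Count:    1,
			Block:    0, // block until an entry arrives
		}).Result()
		if err != nil {
			log.Printf("Error reading from stream: %v", err)
			time.Sleep(time.Second) // back off instead of spinning on repeated errors
			continue
		}

		// Process each message
		for _, stream := range streams {
			for _, message := range stream.Messages {
				// XMessage carries an ID and a map of field values.
				log.Printf("Received message %s: %v", message.ID, message.Values)

				// Acknowledge message processing
				if err := rdb.XAck(ctx, streamName, groupName, message.ID).Err(); err != nil {
					log.Printf("Error acknowledging message: %v", err)
				}
			}
		}
	}
}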