How do I use Redis streams with consumers in Go?

This guide provides an overview of how to use Redis streams with consumers in Go. It covers the basics of setting up a Redis stream, creating consumers, and processing messages efficiently.

Redis streams, Go, consumers, message processing, distributed systems


package main

import (
    "context"
    "log"
    "github.com/go-redis/redis/v8"
)

func main() {
    ctx := context.Background()
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379", // Redis server address
    })

    // Create a stream
    streamName := "mystream"
    _, err := rdb.XAdd(ctx, &redis.XAddArgs{
        Stream: streamName,
        Values: map[string]interface{}{"key": "value"},
    }).Result()
    if err != nil {
        log.Fatalf("Could not add to stream: %v", err)
    }

    // Create a consumer group
    groupName := "mygroup"
    _, err = rdb.XGroupCreateMkStream(ctx, streamName, groupName, "$").Result()
    if err != nil {
        log.Fatalf("Could not create consumer group: %v", err)
    }

    // Define consumer name
    consumerName := "myconsumer"

    for {
        // Read messages from the stream
        messages, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
            Group:    groupName,
            Consumer: consumerName,
            Streams:  []string{streamName, "$"},
            Count:    1,
            Block:    0,
        }).Result()
        if err != nil {
            log.Printf("Error reading from stream: %v", err)
            continue
        }

        // Process each message
        for _, msg := range messages {
            for _, message := range msg.Messages {
                log.Printf("Received message: %s", message.String())

                // Acknowledge message processing
                _, err := rdb.XAck(ctx, streamName, groupName, message.ID).Result()
                if err != nil {
                    log.Printf("Error acknowledging message: %v", err)
                }
            }
        }
    }
}
    

Redis streams Go consumers message processing distributed systems