rsmq

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 11, 2024 License: MIT Imports: 27 Imported by: 0

README

rsmq-go

Go codecov

A Go implementation of a message queue based on Redis Streams.

rsmq

Why

  • High performance and low latency
  • Easy to use and maintain with Redis

Features

  • Add messages to the queue
  • Consume messages from the queue
  • Automatic message acknowledgment
  • Delayed message delivery at a specific timestamp
  • Message retries
  • Dead letter queue once the retry limit is reached
  • Automatic cleanup of idle consumers
  • Pending message processing
  • Distributed rate limiting
  • Tag-based message filtering
  • OpenTelemetry instrumentation

Installation

go get github.com/sysulq/rsmq-go

Example

package rsmq_test

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/redis/go-redis/v9"
	"github.com/sysulq/rsmq-go"
)

// Example_produceAndConsume enqueues ten JSON messages on the "example" topic
// and then consumes them through the "task_group" consumer group with a
// concurrency of one, printing each decoded payload.
func Example_produceAndConsume() {
	cc := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})

	// rsmq.New returns (*MessageQueue, error); check the error before the
	// queue is used.
	queue, err := rsmq.New(rsmq.Options{
		Client: cc,
		Topic:  "example",
		ConsumeOpts: rsmq.ConsumeOpts{
			ConsumerGroup:   "task_group",
			AutoCreateGroup: true,
			MaxConcurrency:  1,
		},
	})
	if err != nil {
		log.Fatalf("Failed to create queue: %v", err)
	}
	defer queue.Close()

	// Produce tasks
	for i := 0; i < 10; i++ {
		task := &rsmq.Message{
			Payload: json.RawMessage(fmt.Sprintf(`{"message": "Hello %d"}`, i)),
		}

		// A failed enqueue is logged and the loop moves on to the next task.
		if err := queue.Add(context.Background(), task); err != nil {
			log.Printf("Failed to enqueue task: %v", err)
		}
	}

	// Consume tasks in a separate goroutine so this function can continue
	go func() {
		err := queue.Consume(
			context.Background(),
			func(ctx context.Context, task *rsmq.Message) error {
				var payload map[string]interface{}
				// The decode error is ignored here: the payloads are the
				// well-formed JSON produced by the loop above.
				_ = json.Unmarshal(task.Payload, &payload)
				fmt.Printf("Processing task, payload: %v\n", payload)

				return nil
			},
		)
		if err != nil {
			log.Fatalf("Error consuming tasks: %v", err)
		}
	}()

	// Wait one second before returning; presumably this gives the consumer
	// goroutine time to process the queued messages.
	time.Sleep(time.Second)
	// Output:
	// Processing task, payload: map[message:Hello 0]
	// Processing task, payload: map[message:Hello 1]
	// Processing task, payload: map[message:Hello 2]
	// Processing task, payload: map[message:Hello 3]
	// Processing task, payload: map[message:Hello 4]
	// Processing task, payload: map[message:Hello 5]
	// Processing task, payload: map[message:Hello 6]
	// Processing task, payload: map[message:Hello 7]
	// Processing task, payload: map[message:Hello 8]
	// Processing task, payload: map[message:Hello 9]
}

Documentation

Overview

Example (ProduceAndConsume)
// Connect to the Redis server at localhost:6379
cc := redis.NewClient(&redis.Options{
	Addr: "localhost:6379",
})

// Create a queue on the "example" topic, consuming as group "task_group"
queue, err := rsmq.New(rsmq.Options{
	Client: cc,
	Topic:  "example",
	ConsumeOpts: rsmq.ConsumeOpts{
		ConsumerGroup:   "task_group",
		AutoCreateGroup: true,
		MaxConcurrency:  1,
	},
})
if err != nil {
	log.Fatalf("Failed to create queue: %v", err)
}
defer queue.Close()

// Produce ten tasks, each carrying a small JSON payload
for i := 0; i < 10; i++ {
	task := &rsmq.Message{
		Payload: json.RawMessage(fmt.Sprintf(`{"message": "Hello %d"}`, i)),
	}

	// A failed enqueue is logged and the loop continues
	err := queue.Add(context.Background(), task)
	if err != nil {
		log.Printf("Failed to enqueue task: %v", err)
	}
}

// Consume tasks in a separate goroutine
go func() {
	err := queue.Consume(
		context.Background(),
		func(ctx context.Context, task *rsmq.Message) error {
			var payload map[string]interface{}
			// The decode error is deliberately ignored in this example
			_ = json.Unmarshal(task.Payload, &payload)
			fmt.Printf("Processing task, payload: %v\n", payload)

			return nil
		},
	)
	if err != nil {
		log.Fatalf("Error consuming tasks: %v", err)
	}
}()

// Wait one second; presumably this lets the consumer goroutine process the messages
time.Sleep(time.Second)
Output:

Processing task, payload: map[message:Hello 0]
Processing task, payload: map[message:Hello 1]
Processing task, payload: map[message:Hello 2]
Processing task, payload: map[message:Hello 3]
Processing task, payload: map[message:Hello 4]
Processing task, payload: map[message:Hello 5]
Processing task, payload: map[message:Hello 6]
Processing task, payload: map[message:Hello 7]
Processing task, payload: map[message:Hello 8]
Processing task, payload: map[message:Hello 9]

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// MessagingRsmqSystem is the "messaging.system" attribute with the value "rsmq"
	MessagingRsmqSystem = attribute.Key("messaging.system").String("rsmq")
	// MessagingRsmqMessageTopic is the attribute key for the rsmq message topic
	MessagingRsmqMessageTopic = attribute.Key("messaging.rsmq.message.topic")
	// MessagingRsmqMessageGroup is the attribute key for the rsmq message group
	MessagingRsmqMessageGroup = attribute.Key("messaging.rsmq.message.group")
	// MessagingRsmqMessageTag is the attribute key for the rsmq message tag
	MessagingRsmqMessageTag = attribute.Key("messaging.rsmq.message.tag")
	// MessagingRsmqMessageDeliveryTimestamp is the attribute key for the rsmq message delivery timestamp
	MessagingRsmqMessageDeliveryTimestamp = attribute.Key("messaging.rsmq.message.delivery_timestamp")
)

Functions

func Map2

func Map2[KIn, VIn, KOut, VOut any](f func(KIn, VIn) (KOut, VOut), seq iter.Seq2[KIn, VIn]) iter.Seq2[KOut, VOut]

Map2 returns an iterator over f applied to seq.

Types

type BatchMessageHandler

type BatchMessageHandler func(context.Context, []*Message) []error

BatchMessageHandler is a function that processes a batch of messages and returns a list of errors

type ConsumeOpts

// ConsumeOpts represents options for consuming messages
type ConsumeOpts struct {
	// ConsumerGroup is the name of the consumer group
	// Must be set if consuming messages
	ConsumerGroup string
	// ConsumerID is the unique identifier for the consumer
	// Default is generated based on hostname and process ID
	ConsumerID string
	// BatchSize is the number of messages to consume in a single batch
	// If set, the consumer will consume messages in batches
	BatchSize int64
	// MaxBlockDuration is the maximum time to block while waiting for messages
	// If set, the consumer will block for the specified duration
	MaxBlockDuration time.Duration
	// AutoCreateGroup determines whether the consumer group should be created automatically
	// If set, the consumer group will be created if it does not exist
	AutoCreateGroup bool
	// MaxConcurrency is the maximum number of messages to process concurrently
	// If set, the messages will be processed concurrently up to the limit
	MaxConcurrency uint32
	// ConsumerIdleTimeout is the maximum time a consumer can be idle before being removed
	// If set, consumers idle for longer than this timeout will be removed
	ConsumerIdleTimeout time.Duration
	// MaxRetryLimit is the maximum number of times a message can be retried
	// If set, the message will be re-queued with an exponential backoff
	MaxRetryLimit uint32
	// RetryTimeWait is the time to wait before retrying a message
	// The time to wait is calculated as 2^retryCount * RetryTimeWait
	RetryTimeWait time.Duration
	// PendingTimeout is the time to wait before a pending message is re-queued
	// If set, the pending messages will be re-queued after the timeout
	PendingTimeout time.Duration
	// IdleConsumerCleanInterval is the interval to clean idle consumers
	// If set, the idle consumer cleanup will run at this interval
	IdleConsumerCleanInterval time.Duration
	// RateLimit is the maximum number of messages to consume per second
	// If set, the rate limiter will be used to limit the number of messages consumed
	RateLimit int
	// SubExpression is the sub expression to filter messages, default is "*"
	// e.g. "tag1||tag2||tag3"
	SubExpression string
	// CloseTimeout is the timeout to wait for the consumer to close
	// Default is 5 seconds
	CloseTimeout time.Duration
}

ConsumeOpts represents options for consuming messages

type Message

type Message = rsmqv1.Message

Message represents a message in the queue

type MessageHandler

type MessageHandler func(context.Context, *Message) error

MessageHandler is a function that processes a single message and returns an error if processing fails

type MessageQueue

type MessageQueue struct {
	// contains filtered or unexported fields
}

MessageQueue manages message production and consumption

func New

func New(opts Options) (*MessageQueue, error)

New creates a new MessageQueue instance

func (*MessageQueue) Add

func (mq *MessageQueue) Add(ctx context.Context, message *Message) error

Add adds a new message to the queue

func (*MessageQueue) BatchConsume

func (mq *MessageQueue) BatchConsume(ctx context.Context, handler BatchMessageHandler) error

BatchConsume starts consuming messages from the queue in batches

func (*MessageQueue) Close

func (mq *MessageQueue) Close() error

Close closes the message queue

func (*MessageQueue) Consume

func (mq *MessageQueue) Consume(ctx context.Context, handler MessageHandler) error

Consume starts consuming messages from the queue

type Options

// Options represents the options passed to New to create a MessageQueue
type Options struct {
	// Client is the Redis client
	// Must be set
	Client redis.Cmdable
	// Topic is the topic name of the message
	// Must be set
	Topic string
	// RetentionOpts represents options for retention policy
	RetentionOpts RetentionOpts
	// TracerProvider is the OpenTelemetry tracer provider
	TracerProvider trace.TracerProvider
	// ConsumeOpts represents options for consuming messages
	ConsumeOpts ConsumeOpts
}

type RetentionOpts

// RetentionOpts represents options for retention policy
type RetentionOpts struct {
	// MaxLen is the maximum length of the stream
	// Default is 20,000,000
	MaxLen int64
	// MaxRetentionTime is the maximum retention time of the stream
	// Default is 168 hours
	MaxRetentionTime time.Duration
	// CheckRetentionInterval is the interval to check retention time
	// Default is 5 minutes
	CheckRetentionInterval time.Duration
}

RetentionOpts represents options for retention policy

Directories

Path Synopsis
rsmq
v1

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL