Documentation
¶
Overview ¶
Example (ProduceAndConsume) ¶
// Connect to a Redis server on the local machine.
cc := redis.NewClient(&redis.Options{
	Addr: "localhost:6379",
})

// Create a queue bound to the "example" topic. The consumer group is created
// automatically if it does not exist, and at most one message is processed
// at a time.
queue, err := rsmq.New(rsmq.Options{
	Client: cc,
	Topic:  "example",
	ConsumeOpts: rsmq.ConsumeOpts{
		ConsumerGroup:   "task_group",
		AutoCreateGroup: true,
		MaxConcurrency:  1,
	},
})
if err != nil {
	log.Fatalf("Failed to create queue: %v", err)
}
defer queue.Close()

// Produce tasks: enqueue ten messages, each carrying a small JSON payload.
for i := 0; i < 10; i++ {
	task := &rsmq.Message{
		Payload: json.RawMessage(fmt.Sprintf(`{"message": "Hello %d"}`, i)),
	}
	err := queue.Add(context.Background(), task)
	if err != nil {
		// A failed enqueue is logged and the loop moves on to the next task.
		log.Printf("Failed to enqueue task: %v", err)
	}
}

// Consume tasks in a separate goroutine so the example can continue.
go func() {
	err := queue.Consume(
		context.Background(),
		func(ctx context.Context, task *rsmq.Message) error {
			var payload map[string]interface{}
			// The decode error is deliberately discarded in this example.
			_ = json.Unmarshal(task.Payload, &payload)
			fmt.Printf("Processing task, payload: %v\n", payload)
			return nil
		},
	)
	if err != nil {
		log.Fatalf("Error consuming tasks: %v", err)
	}
}()

// Wait one second so the consumer goroutine has time to run before the
// example returns.
time.Sleep(time.Second)
Output: Processing task, payload: map[message:Hello 0] Processing task, payload: map[message:Hello 1] Processing task, payload: map[message:Hello 2] Processing task, payload: map[message:Hello 3] Processing task, payload: map[message:Hello 4] Processing task, payload: map[message:Hello 5] Processing task, payload: map[message:Hello 6] Processing task, payload: map[message:Hello 7] Processing task, payload: map[message:Hello 8] Processing task, payload: map[message:Hello 9]
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var (
	// MessagingRsmqSystem is the "messaging.system" attribute set to "rsmq".
	MessagingRsmqSystem = attribute.Key("messaging.system").String("rsmq")

	// MessagingRsmqMessageTopic is the attribute key for the rsmq message topic.
	MessagingRsmqMessageTopic = attribute.Key("messaging.rsmq.message.topic")

	// MessagingRsmqMessageGroup is the attribute key for the rsmq message group.
	MessagingRsmqMessageGroup = attribute.Key("messaging.rsmq.message.group")

	// MessagingRsmqMessageTag is the attribute key for the rsmq message tag.
	MessagingRsmqMessageTag = attribute.Key("messaging.rsmq.message.tag")

	// MessagingRsmqMessageDeliveryTimestamp is the attribute key for the rsmq
	// message delivery timestamp.
	MessagingRsmqMessageDeliveryTimestamp = attribute.Key("messaging.rsmq.message.delivery_timestamp")
)
Functions ¶
Types ¶
type BatchMessageHandler ¶
BatchMessageHandler is a function that processes a batch of messages and returns a list of errors
type ConsumeOpts ¶
// ConsumeOpts represents options for consuming messages.
type ConsumeOpts struct {
	// ConsumerGroup is the name of the consumer group.
	// Must be set if consuming messages.
	ConsumerGroup string

	// ConsumerID is the unique identifier for the consumer.
	// Default is generated based on hostname and process ID.
	ConsumerID string

	// BatchSize is the number of messages to consume in a single batch.
	// If set, the consumer will consume messages in batches.
	BatchSize int64

	// MaxBlockDuration is the maximum time to block while waiting for messages.
	// If set, the consumer will block for the specified duration.
	MaxBlockDuration time.Duration

	// AutoCreateGroup determines whether the consumer group should be created automatically.
	// If set, the consumer group will be created if it does not exist.
	AutoCreateGroup bool

	// MaxConcurrency is the maximum number of messages to process concurrently.
	// If set, the messages will be processed concurrently up to the limit.
	MaxConcurrency uint32

	// ConsumerIdleTimeout is the maximum time a consumer can be idle before being removed.
	// If set, the idle consumers will be removed periodically.
	ConsumerIdleTimeout time.Duration

	// MaxRetryLimit is the maximum number of times a message can be retried.
	// If set, the message will be re-queued with an exponential backoff.
	MaxRetryLimit uint32

	// RetryTimeWait is the time to wait before retrying a message.
	// The time to wait is calculated as 2^retryCount * RetryTimeWait.
	RetryTimeWait time.Duration

	// PendingTimeout is the time to wait before a pending message is re-queued.
	// If set, the pending messages will be re-queued after the timeout.
	PendingTimeout time.Duration

	// IdleConsumerCleanInterval is the interval to clean idle consumers.
	// If set, the idle consumers will be removed periodically.
	IdleConsumerCleanInterval time.Duration

	// RateLimit is the maximum number of messages to consume per second.
	// If set, the rate limiter will be used to limit the number of messages consumed.
	RateLimit int

	// SubExpression is the sub expression to filter messages, default is "*".
	// e.g. "tag1||tag2||tag3"
	SubExpression string

	// CloseTimeout is the timeout to wait for the consumer to close.
	// Default is 5 seconds.
	CloseTimeout time.Duration
}
ConsumeOpts represents options for consuming messages
type MessageHandler ¶
MessageHandler is a function that processes a single message and returns an error if processing fails
type MessageQueue ¶
type MessageQueue struct {
// contains filtered or unexported fields
}
MessageQueue manages message production and consumption
func (*MessageQueue) Add ¶
func (mq *MessageQueue) Add(ctx context.Context, message *Message) error
Add adds a new message to the queue
func (*MessageQueue) BatchConsume ¶
func (mq *MessageQueue) BatchConsume(ctx context.Context, handler BatchMessageHandler) error
BatchConsume starts consuming messages from the queue in batches
func (*MessageQueue) Consume ¶
func (mq *MessageQueue) Consume(ctx context.Context, handler MessageHandler) error
Consume starts consuming messages from the queue
type Options ¶
// Options holds the settings passed to New when creating a message queue.
type Options struct {
	// Client is the Redis client.
	// Must be set.
	Client redis.Cmdable

	// Topic is the topic name of the message.
	// Must be set.
	Topic string

	// RetentionOpts represents options for retention policy.
	RetentionOpts RetentionOpts

	// TracerProvider is the OpenTelemetry tracer provider.
	TracerProvider trace.TracerProvider

	// ConsumeOpts represents options for consuming messages.
	ConsumeOpts ConsumeOpts
}
type RetentionOpts ¶
// RetentionOpts represents options for retention policy.
type RetentionOpts struct {
	// MaxLen is the maximum length of the stream.
	// Default is 20,000,000.
	MaxLen int64

	// MaxRetentionTime is the maximum retention time of the stream.
	// Default is 168 hours.
	MaxRetentionTime time.Duration

	// CheckRetentionInterval is the interval to check retention time.
	// Default is 5 minutes.
	CheckRetentionInterval time.Duration
}
RetentionOpts represents options for retention policy
Click to show internal directories.
Click to hide internal directories.