redis

package
v0.0.0-...-f1b319b
Published: Feb 12, 2023 | License: GPL-3.0 | Imports: 14 | Imported by: 0

Documentation

Overview

Package redis provides a way to interact with Redis streams. It uses segmentio/redis-go underneath and requires Redis version >= 6.2.

Constants

This section is empty.

Variables

This section is empty.

Functions

func Publisher

func Publisher(address string, streams []Stream) pubsub.Publisher

Publisher creates a publisher that uses Redis streams under the hood.

func Subscriber

func Subscriber(groupID, address string, streams []Stream, opts ...SubscriberOption) pubsub.Subscriber

Subscriber creates a subscriber that uses Redis streams under the hood. Events that have been handled, either successfully or through the error handler, won't be consumed again. Only events that the client could not handle are re-consumed automatically. This makes the error handler responsible for dealing with failed handlings. The use of DLQs (dead-letter queues) is encouraged to ensure all events are processed.
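The acknowledgement behavior described above can be pictured with a small in-memory model. This is only a sketch under assumptions: `pending`, `consume`, and `errBoom` are hypothetical names that are not part of this package, a map stands in for Redis, and "could not handle" is read here as "the handler never returned" (for example, the client crashed mid-handling).

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is an illustrative handler failure.
var errBoom = errors.New("boom")

// pending is a hypothetical stand-in for the events a consumer group has
// delivered but not yet acknowledged.
type pending map[string]bool

// consume models one delivery. The event is marked pending first and is
// acknowledged (removed) once the handler returns, whether it succeeded or
// its error was passed to onError. If the client crashes before the handler
// returns, nothing acknowledges the event, so it can be delivered again.
func consume(p pending, id string, crashed bool, handler func() error, onError func(error)) {
	p[id] = true
	if crashed {
		return // the event stays pending
	}
	if err := handler(); err != nil {
		onError(err) // the error handler now owns the failure
	}
	delete(p, id) // acknowledged either way
}

func main() {
	p := pending{}
	failures := 0
	onError := func(error) { failures++ }

	consume(p, "1-0", false, func() error { return nil }, onError)
	consume(p, "2-0", false, func() error { return errBoom }, onError)
	consume(p, "3-0", true, func() error { return nil }, onError)

	fmt.Println("still pending:", len(p), "error handler calls:", failures)
}
```

In this model the failed event "2-0" is gone for good once the error handler has seen it, which is why forwarding such events to a DLQ from the error handler is a sensible default.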

Types

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream lets both producers and consumers know which Redis streams to interact with.

func StreamForPublisher

func StreamForPublisher(name string, events ...string) Stream

StreamForPublisher creates a Stream that associates the given event types with the Redis stream of the given name, so that messages for those events are published into that stream.
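One way to picture the event-to-stream association is as a lookup table built from the declared streams. The sketch below is an illustration only: the `stream` type and the `routes` helper are hypothetical and do not reflect how the package stores this information internally.

```go
package main

import "fmt"

// stream is a hypothetical stand-in for this package's Stream: a Redis
// stream name plus the event types published into it.
type stream struct {
	name   string
	events []string
}

// routes builds an event-type -> stream-name index, which is one way a
// publisher could decide where each message goes.
func routes(streams []stream) map[string]string {
	idx := make(map[string]string)
	for _, s := range streams {
		for _, e := range s.events {
			idx[e] = s.name
		}
	}
	return idx
}

func main() {
	idx := routes([]stream{
		{name: "orders", events: []string{"order.created", "order.paid"}},
		{name: "users", events: []string{"user.registered"}},
	})
	fmt.Println(idx["order.paid"], idx["user.registered"])
}
```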

func StreamsForSubscriber

func StreamsForSubscriber(names ...string) []Stream

StreamsForSubscriber creates a list of Streams indicating all the Redis streams a subscriber has to read events from.

func (Stream) CappedAt

func (s Stream) CappedAt(capacity int) Stream

CappedAt lets clients indicate the maximum capacity for this specific Redis stream. Note that if the same stream is capped twice with different values, this client does no special handling: the later value overwrites the earlier one.
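The overwrite behavior can be illustrated with a stand-in type. This is a sketch, not the package's implementation: `stream`, `cappedAt`, and `capacities` are hypothetical names chosen to mirror the value-receiver signature shown above.

```go
package main

import "fmt"

// stream is a hypothetical stand-in for this package's Stream.
type stream struct {
	name     string
	capacity int
}

// cappedAt mirrors the value-receiver shape of CappedAt: it returns a
// modified copy and leaves the receiver untouched.
func (s stream) cappedAt(capacity int) stream {
	s.capacity = capacity
	return s
}

// capacities records one capacity per stream name. A later entry for the
// same name silently replaces the earlier one.
func capacities(streams []stream) map[string]int {
	caps := make(map[string]int)
	for _, s := range streams {
		caps[s.name] = s.capacity
	}
	return caps
}

func main() {
	orders := stream{name: "orders"}
	caps := capacities([]stream{
		orders.cappedAt(1000),
		orders.cappedAt(500), // same stream, different cap: the last one wins
	})
	fmt.Println(caps["orders"], orders.capacity)
}
```

Because the method has a value receiver, the returned Stream has to be used; calling it and discarding the result changes nothing.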

type SubscriberOption

type SubscriberOption func(*subscriber)

SubscriberOption allows tweaking the subscriber's behavior.
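SubscriberOption follows the functional options pattern. The sketch below shows the pattern with stand-in types: `subscriber`, its fields, and the `with...` helpers are hypothetical, and only the default values (1s, 1 attempt, batch of 10) come from the option descriptions in this documentation. Raising timeouts below 1s to the minimum is an assumption about how the minimum is enforced.

```go
package main

import (
	"fmt"
	"time"
)

// subscriber is a hypothetical stand-in for the package's unexported
// subscriber type; the field names are illustrative.
type subscriber struct {
	consumeTimeout time.Duration
	attempts       int
	batchCapacity  int
}

// option has the same shape as SubscriberOption: a function that mutates
// the subscriber being built.
type option func(*subscriber)

func withConsumeTimeout(d time.Duration) option {
	return func(s *subscriber) {
		// Assumption: values under the documented 1s minimum are raised to it.
		if d < time.Second {
			d = time.Second
		}
		s.consumeTimeout = d
	}
}

func withAttempts(n int) option {
	return func(s *subscriber) { s.attempts = n }
}

func withBatchCapacity(n int) option {
	return func(s *subscriber) { s.batchCapacity = n }
}

// newSubscriber starts from the documented defaults and applies opts in order.
func newSubscriber(opts ...option) *subscriber {
	s := &subscriber{consumeTimeout: time.Second, attempts: 1, batchCapacity: 10}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	s := newSubscriber(withConsumeTimeout(5*time.Second), withAttempts(3))
	fmt.Println(s.consumeTimeout, s.attempts, s.batchCapacity)
}
```

Options left out keep their defaults, so callers only state what they want to change.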

func ConsumeTimeout

func ConsumeTimeout(timeout time.Duration) SubscriberOption

ConsumeTimeout indicates the maximum amount of time for an event to be in a handling state. Defaults to 1s, which is the minimum value.

func HandlingNumberOfAttempts

func HandlingNumberOfAttempts(attempts int) SubscriberOption

HandlingNumberOfAttempts indicates how many times an event will be processed if the handler errors. Defaults to 1, that is, no automatic retries.
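Read this way, the attempts value is the total number of tries rather than the number of extra retries. A minimal sketch of such a loop follows; `handleWithAttempts` and `errFail` are hypothetical names, and the package's actual retry loop may differ (for example, in whether it waits between attempts).

```go
package main

import (
	"errors"
	"fmt"
)

// errFail is an illustrative handler failure.
var errFail = errors.New("handler failed")

// handleWithAttempts calls handler until it succeeds or the attempts are
// used up. It returns how many calls were made and the last error, which
// is nil on success. Values below 1 are treated as 1.
func handleWithAttempts(attempts int, handler func() error) (int, error) {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 1; i <= attempts; i++ {
		if err = handler(); err == nil {
			return i, nil
		}
	}
	return attempts, err
}

func main() {
	calls := 0
	flaky := func() error {
		calls++
		if calls < 3 {
			return errFail // fails twice, then succeeds
		}
		return nil
	}
	used, err := handleWithAttempts(3, flaky)
	fmt.Println(used, err)
}
```

When every attempt fails, the last error is what a caller would hand to the error handler.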

func ReadingBatchCapacity

func ReadingBatchCapacity(capacity int) SubscriberOption

ReadingBatchCapacity indicates how many events can be taken out of the stream at once. Defaults to 10.

func RunFailureRecovery

func RunFailureRecovery(enabled bool, cadence time.Duration) SubscriberOption

RunFailureRecovery enables execution of the Redis XAUTOCLAIM command, running it at the indicated cadence. By default, no recovery is run.
