Documentation ¶
Overview ¶
Package redis provides a way to interact with redis streams. segmentio/redis-go is used underneath. Redis version >= 6.2 is required.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Subscriber ¶
func Subscriber(groupID, address string, streams []Stream, opts ...SubscriberOption) pubsub.Subscriber
Subscriber creates a subscriber that uses redis streams under the hood. All the events that are handled (either successfully or by using the error handler), won't be consumed again. On the other hand, only events that can't be handled by the client will be re-consumed automatically. This makes the error handler responsible for dealing with unsuccessful handlings. The use of DLQs is encouraged to ensure all events are processed.
Types ¶
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream let's both –producers and consumers– know what redis streams to interact with.
func StreamForPublisher ¶
StreamForPublisher creates a Stream that will know the event types that will use a redis stream with such name in order to publish messages into.
func StreamsForSubscriber ¶
StreamsForSubscriber creates a list of Stream to indicate all the redis streams a subscriber has to read events from.
type SubscriberOption ¶
type SubscriberOption func(*subscriber)
SubscriberOption allows to tweak subscriber behavior.
func ConsumeTimeout ¶
func ConsumeTimeout(timeout time.Duration) SubscriberOption
ConsumeTimeout indicates the maximum amount of time for an event to be in a handling state. Defaults to 1s, which is the minimum value.
func HandlingNumberOfAttempts ¶
func HandlingNumberOfAttempts(attempts int) SubscriberOption
HandlingNumberOfAttempts indicates how many times an event will be processed if the handler errors. Defaults to 1, that is, no automatic retries.
func ReadingBatchCapacity ¶
func ReadingBatchCapacity(capacity int) SubscriberOption
ReadingBatchCapacity indicates how many events can be taken out of the stream at once. Defaults to 10.
func RunFailureRecovery ¶
func RunFailureRecovery(enabled bool, cadence time.Duration) SubscriberOption
RunFailureRecovery enables the execution of the redis xautoclaim command, running it on the indicated cadence. By default, no recovery is run.