Documentation ¶
Types ¶
type Consumer ¶
type Consumer struct {
	// Errors is a channel over which non-fatal errors are
	// sent. This channel must have a listener otherwise
	// deadlock will arise.
	Errors chan error

	// contains filtered or unexported fields
}
Consumer uses Redis Streams to publish and subscribe
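The requirement that Errors always has a listener can be met with a small goroutine that receives until the channel is closed. The following is a minimal sketch using only the standard library: the `errs` channel is a stand-in for `Consumer.Errors`, and `drain` and `run` are hypothetical helper names, not part of this package. The example closes the channel itself so that it can terminate; whether the package ever closes Errors is not stated here.

```go
package main

import (
	"errors"
	"fmt"
)

// drain starts a listener goroutine that receives from errs until the
// channel is closed, passing each error to handle. The returned channel
// is closed once the goroutine has exited.
func drain(errs <-chan error, handle func(error)) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for err := range errs {
			handle(err)
		}
	}()
	return done
}

// run sends two errors over an unbuffered stand-in channel and returns
// how many of them the listener handled.
func run() int {
	errs := make(chan error) // stand-in for Consumer.Errors (assumption)
	handled := 0
	done := drain(errs, func(err error) {
		handled++
		fmt.Println("non-fatal error:", err)
	})

	// Without the listener started above, each send would block forever.
	errs <- errors.New("first")
	errs <- errors.New("second")

	close(errs) // closed here only so that the example can finish
	<-done
	return handled
}

func main() {
	fmt.Println("handled:", run())
}
```

Starting the listener before any message processing begins avoids the window in which a send on Errors could block.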
func NewConsumer ¶
func NewConsumer(opts *ConsumerOptions) *Consumer
NewConsumer returns an initialised Consumer
type ConsumerOptions ¶
type ConsumerOptions struct {
	// Group is the name of the consumer group used when
	// listening for new messages from Redis. This must
	// be set before calling Listen().
	Group string

	// Consumer is the name of this particular consumer
	// within the consumer group. This must be set before
	// calling Listen().
	Consumer string

	// Redis is an instance of *redis.Client for use by
	// the client. This must be set before using the client.
	Redis *redis.Client

	// ReadTimeout is the duration for which the XREADGROUP
	// call blocks for. A duration of zero means the client
	// will block indefinitely. It is recommended to set
	// this to a non-zero duration so that the client is
	// able to gracefully shutdown.
	ReadTimeout time.Duration

	// HandlerTimeout is the duration after which the
	// context passed to handlers is cancelled. Note that
	// handlers are not forcefully stopped after this time.
	// It is up to them to handle context cancellation.
	// A duration of zero means handlers never timeout.
	HandlerTimeout time.Duration

	// PendingTimeout is the duration for which a message
	// can be pending before the consumer tries to claim it.
	//
	// This value should not be shorter than HandlerTimeout
	// otherwise you risk claiming messages that are still
	// being processed.
	PendingTimeout time.Duration

	// ClaimInterval is the time between attempts to claim
	// any messages that have been pending for longer than
	// the PendingTimeout. If this value is zero, then the
	// consumer will not try to claim pending messages.
	ClaimInterval time.Duration

	// MaxRetry is the number of times a message will be
	// retried before it is passed to the dead-letter
	// consumer(s) for the stream. If < 0, then the
	// message will never be dead-lettered.
	MaxRetry int

	// Concurrency is the number of goroutines that are
	// spawned to concurrently handle incoming messages.
	// A value of zero is equal to a value of one.
	Concurrency int

	// BufferSize is the size of the channel that holds
	// incoming messages and therefore determines how many
	// messages the consumer can read from Redis in a
	// single call. A value of zero will create an
	// unbuffered channel.
	BufferSize int

	// Backoff is used to retry requests to Redis in the
	// case of network failures. If this value is nil, a
	// Backoff with sensible defaults will be used.
	Backoff *backoff.Backoff

	// NetworkRetry is the number of times to retry failed
	// network requests to Redis before returning a fatal
	// error. A value of zero means requests will not be
	// retried. A value of < 0 means requests will be
	// retried indefinitely until the context is cancelled.
	NetworkRetry int
}
ConsumerOptions contains options to configure the Consumer
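Because handlers are not forcefully stopped when HandlerTimeout elapses, each handler has to watch its context. The sketch below shows one way a handler body might do that, using only the standard library. The names `work` and `timedOut` are hypothetical, and the deadline is created with `context.WithTimeout` as an assumption about how such a timeout could be applied; it is not taken from this package's source.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// work simulates a handler body that needs d to finish but gives up as
// soon as ctx is cancelled.
func work(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-timer.C:
		return nil // finished in time
	case <-ctx.Done():
		return ctx.Err() // stopped cooperatively
	}
}

// timedOut runs work under a deadline of timeoutMs milliseconds and
// reports whether the deadline cut the work short.
func timedOut(timeoutMs, workMs int) bool {
	ctx, cancel := context.WithTimeout(
		context.Background(),
		time.Duration(timeoutMs)*time.Millisecond,
	)
	defer cancel()
	err := work(ctx, time.Duration(workMs)*time.Millisecond)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("slow work timed out:", timedOut(10, 1000))
	fmt.Println("fast work timed out:", timedOut(1000, 1))
}
```

A handler that ignores `ctx.Done()` keeps running past the timeout, which is why PendingTimeout should not be shorter than HandlerTimeout.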
type Handler ¶
Handler processes messages. HandleEvent should return a result that tells the client whether the handling was successful or not.
type HandlerError ¶
HandlerError wraps any errors returned from message handlers
func (*HandlerError) Error ¶
func (e *HandlerError) Error() string
Error returns a formatted error string
func (*HandlerError) Unwrap ¶
func (e *HandlerError) Unwrap() error
Unwrap returns the underlying error
type HandlerFunc ¶
HandlerFunc is an adapter that allows ordinary functions to be used as event handlers. If f is a function with the appropriate signature, HandlerFunc(f) is a Handler that calls f.
func (HandlerFunc) HandleEvent ¶
func (f HandlerFunc) HandleEvent(ctx context.Context, m *Message) Result
HandleEvent calls f(ctx, m).
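The adapter pattern described for HandlerFunc can be sketched in isolation as follows. `Message` and `Result` here are simplified stand-ins rather than this package's real types (the definition of Result is not shown above), the `Handler` interface is reconstructed from the documented HandleEvent signature, and `dispatch` and `record` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a simplified stand-in for the package's Message type.
type Message struct {
	ID     string
	Values map[string]interface{}
}

// Result is a hypothetical result type used only for this sketch.
type Result struct{ Err error }

// Handler mirrors the documented HandleEvent signature.
type Handler interface {
	HandleEvent(ctx context.Context, m *Message) Result
}

// HandlerFunc lets an ordinary function satisfy Handler.
type HandlerFunc func(ctx context.Context, m *Message) Result

// HandleEvent calls f(ctx, m).
func (f HandlerFunc) HandleEvent(ctx context.Context, m *Message) Result {
	return f(ctx, m)
}

// dispatch accepts any Handler and passes it a message with the given ID.
func dispatch(h Handler, id string) Result {
	return h.HandleEvent(context.Background(), &Message{ID: id})
}

// seen records the IDs of handled messages.
var seen []string

// record is a plain function with the handler signature.
func record(ctx context.Context, m *Message) Result {
	seen = append(seen, m.ID)
	return Result{}
}

func main() {
	// The conversion HandlerFunc(record) turns the function into a Handler.
	dispatch(HandlerFunc(record), "1-0")
	fmt.Println(seen)
}
```

The conversion costs nothing at run time and saves declaring a named type for every handler.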
type HandlerPanic ¶
HandlerPanic is returned if a message handler panics. The panic is recovered and converted to an error.
func (*HandlerPanic) Error ¶
func (e *HandlerPanic) Error() string
Error returns a formatted error string
func (*HandlerPanic) Unwrap ¶
func (e *HandlerPanic) Unwrap() error
Unwrap returns the underlying error
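Converting a recovered panic into an error is a common Go pattern, and a minimal version looks like the sketch below. `panicError` and `safeCall` are hypothetical names for illustration; how this package builds a HandlerPanic internally is not shown here.

```go
package main

import "fmt"

// panicError is a hypothetical stand-in for an error that carries a
// recovered panic value.
type panicError struct{ value interface{} }

func (e *panicError) Error() string {
	return fmt.Sprintf("handler panicked: %v", e.value)
}

// safeCall runs fn and converts a panic raised inside it into an error,
// so that one failing handler does not take down the whole process.
func safeCall(fn func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = &panicError{value: r}
		}
	}()
	return fn()
}

func main() {
	err := safeCall(func() error { panic("boom") })
	fmt.Println(err)
}
```

The named return value `err` is what allows the deferred function to replace the result after the panic has been recovered.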
type Message ¶
type Message struct {
	// ID is the Redis ID of the message. When publishing,
	// this can be set to "*" to instruct Redis to generate
	// an ID. This is usually what you want. If left blank,
	// it will automatically be set to "*".
	ID string

	// Stream is the name of the stream this message should
	// be published to or was received from.
	Stream string

	// Values represents the message's payload
	Values map[string]interface{}

	// contains filtered or unexported fields
}
Message represents a message published to or received from a Redis stream.
type NetworkError ¶
NetworkError is returned if a network error occurs when communicating with Redis. The Retrying field tells you whether the library is planning to retry the request. If this is true, Backoff will be set.
func (*NetworkError) Error ¶
func (e *NetworkError) Error() string
Error returns a formatted error string
func (*NetworkError) Unwrap ¶
func (e *NetworkError) Unwrap() error
Unwrap returns the underlying error
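Because the error types above implement Unwrap, callers can inspect them with `errors.As` and `errors.Is` from the standard library. The sketch below uses a hypothetical `networkError` stand-in that has only an `Err` and a `Retrying` field. The real NetworkError also carries Backoff, and its exact field layout is an assumption here.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// networkError is a hypothetical stand-in for the package's NetworkError.
type networkError struct {
	Err      error
	Retrying bool
}

func (e *networkError) Error() string { return "network error: " + e.Err.Error() }

// Unwrap exposes the underlying error to errors.Is and errors.As.
func (e *networkError) Unwrap() error { return e.Err }

// classify describes err based on the first networkError in its chain.
func classify(err error) string {
	var ne *networkError
	if errors.As(err, &ne) {
		if ne.Retrying {
			return "network error, retrying"
		}
		return "network error, giving up"
	}
	return "other error"
}

// causedByEOF reports whether io.ErrUnexpectedEOF is anywhere in the chain.
func causedByEOF(err error) bool {
	return errors.Is(err, io.ErrUnexpectedEOF)
}

// sample builds a wrapped error similar to what a listener on an Errors
// channel might receive.
func sample() error {
	return fmt.Errorf("reading stream: %w",
		&networkError{Err: io.ErrUnexpectedEOF, Retrying: true})
}

func main() {
	err := sample()
	fmt.Println(classify(err))
	fmt.Println(causedByEOF(err))
}
```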
type Publisher ¶
type Publisher struct {
	// Errors is a channel over which non-fatal errors are
	// sent. This channel must have a listener otherwise
	// deadlock will arise.
	//
	// If NetworkRetry is set to 0, nothing will be sent
	// over this channel. It doesn't need a listener in
	// that case.
	Errors chan error

	// contains filtered or unexported fields
}
Publisher is capable of publishing messages to Redis Streams. It should be created via NewPublisher().
func NewPublisher ¶
func NewPublisher(opts *PublisherOptions) *Publisher
NewPublisher returns an initialised Publisher
type PublisherOptions ¶
type PublisherOptions struct {
	// StreamMaxLength sets the MAXLEN option when calling
	// XADD. This limits the size of the stream. Old entries
	// are automatically evicted when the specified length
	// is reached.
	StreamMaxLength int64

	// ApproximateMaxLength is an optimisation that allows
	// the stream to be capped more efficiently, as long as
	// an exact length is not required.
	ApproximateMaxLength bool

	// Redis is an instance of *redis.Client for use by
	// the client. This must be set before using the client.
	Redis *redis.Client

	// Backoff is used to retry requests to Redis in the
	// case of network failures. If this value is nil, a
	// Backoff with sensible defaults will be used.
	Backoff *backoff.Backoff

	// NetworkRetry is the number of times to retry failed
	// network requests to Redis before returning a fatal
	// error. A value of zero means requests will not be
	// retried. A value of < 0 means requests will be
	// retried indefinitely until the context is cancelled.
	NetworkRetry int
}
PublisherOptions contains options to configure the Publisher
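To make the effect of StreamMaxLength and ApproximateMaxLength concrete, the sketch below builds the argument list of a Redis `XADD` command by hand. `xaddArgs` and `command` are hypothetical helpers, not part of this package. The sketch assumes that a length of zero or less means "no cap" and uses string values for simplicity, whereas Message.Values is a `map[string]interface{}`. The blank-ID rule follows the Message documentation.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// xaddArgs builds the arguments of an XADD command. With approx set,
// MAXLEN is followed by "~", which lets Redis trim lazily instead of
// enforcing the exact length.
func xaddArgs(stream string, maxLen int64, approx bool, id string, values map[string]string) []string {
	args := []string{"XADD", stream}
	if maxLen > 0 { // assumption: a non-positive length means no cap
		args = append(args, "MAXLEN")
		if approx {
			args = append(args, "~")
		}
		args = append(args, strconv.FormatInt(maxLen, 10))
	}
	if id == "" {
		id = "*" // a blank ID asks Redis to generate one
	}
	args = append(args, id)

	// Sort the field names so that the output is deterministic.
	keys := make([]string, 0, len(values))
	for k := range values {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		args = append(args, k, values[k])
	}
	return args
}

// command joins the arguments into a single printable line.
func command(stream string, maxLen int64, approx bool, id string, values map[string]string) string {
	return strings.Join(xaddArgs(stream, maxLen, approx, id, values), " ")
}

func main() {
	fmt.Println(command("events", 1000, true, "", map[string]string{"type": "signup"}))
	fmt.Println(command("events", 0, false, "", map[string]string{"type": "signup"}))
}
```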
type RedisError ¶
type RedisError struct {
	Err error
}
RedisError is returned if the Redis client returns an error that is not a network error.
func (*RedisError) Error ¶
func (e *RedisError) Error() string
Error returns a formatted error string
type Result ¶
Result defines the result of a handler
func Discard ¶
Discard should be returned when the message was not successfully processed but should not be retried. It will be acknowledged and therefore not re-processed by any consumers. The error will be enqueued on the error channel.