Documentation ¶
Overview ¶
Package mredis implements connecting to a redis instance.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Redis ¶
type Redis struct { radix.Client // contains filtered or unexported fields }
Redis is a wrapper around a redis client which provides more functionality.
func InstRedis ¶
func InstRedis(parent *mcmp.Component, options ...RedisOption) *Redis
InstRedis instantiates a Redis instance which will be initialized when the Init event is triggered on the given Component. The redis client will have Close called on it when the Shutdown event is triggered on the given Component.
type RedisOption ¶
type RedisOption func(*redisOpts)
RedisOption is a value which adjusts the behavior of InstRedis.
func RedisDialOpts ¶
func RedisDialOpts(dialOpts ...radix.DialOpt) RedisOption
RedisDialOpts specifies that the given set of DialOpts should be used when creating any new connections.
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream wraps a Redis instance in order to provide an abstraction over consuming messages from a single redis stream. Stream is intended to be used in a single-threaded manner, and doesn't spawn any go-routines.
See https://redis.io/topics/streams-intro
func NewStream ¶
func NewStream(r *Redis, opts StreamOpts) *Stream
NewStream initializes and returns a Stream instance using the given options.
func (*Stream) Next ¶
func (s *Stream) Next() (StreamEntry, bool, error)
Next returns the next StreamEntry which needs processing, or false. This method is expected to block for up to the value of the Block field in StreamOpts.
If an error is returned it's up to the caller whether or not they want to keep retrying.
type StreamEntry ¶
type StreamEntry struct { radix.StreamEntry // Ack is used in order to acknowledge that a stream message has been // successfully consumed and should not be consumed again. Ack func() error // Nack is used to declare that a stream message was not successfully // consumed and it needs to be consumed again. Nack func() }
StreamEntry wraps radix's StreamEntry type in order to provde some extra functionality.
type StreamOpts ¶
type StreamOpts struct { // Key is the redis key at which the redis stream resides. Key string // Group is the name of the consumer group which will consume from Key. Group string // Consumer is the name of this particular consumer. This value should // remain the same across restarts of the process. Consumer string // (Optional) InitialCursor is only used when the consumer group is first // being created, and indicates where in the stream the consumer group // should start consuming from. // // "0" indicates the group should consume from the start of the stream. "$" // indicates the group should not consume any old messages, only those added // after the group is initialized. A specific message id can be given to // consume only those messages with greater ids. // // Defaults to "$". InitialCursor string // (Optional) ReadCount indicates the max number of messages which should be // read on every XREADGROUP call. 0 indicates no limit. ReadCount int // (Optional) Block indicates what BLOCK value is sent to XREADGROUP calls. // This value _must_ be less than the ReadtTimeout the redis client is // using. // // Defaults to 5 * time.Second Block time.Duration }
StreamOpts are options used to initialize a Stream instance. Fields are required unless otherwise noted.