redis

Version: v0.10.0
Published: Sep 6, 2023 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Constants

const ReadFromEarliest = "0-0"
const ReadFromLatest = "$"

Variables

var RedisContext = context.Background()

RedisContext is the context used for all Redis operations. If a context cancelled on SIGTERM or Ctrl-C were propagated down, Redis calls would fail with a context cancelled error, because the Redis client uses the context to obtain a connection from the connection pool. All Redis operations therefore use this no-op context.Background(), so that in-flight messages received before the cancellation can still be processed.
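
The effect described above can be illustrated with a minimal, standalone sketch. It does not use this package or a real Redis client: acquireConn is a hypothetical stand-in for a connection pool that checks the context before handing out a connection, and opContext is a hypothetical variable that plays the role of RedisContext.

```go
package main

import (
	"context"
	"fmt"
)

// opContext plays the role of RedisContext in this sketch (hypothetical name):
// a context that is never cancelled.
var opContext = context.Background()

// acquireConn is a hypothetical stand-in for a connection pool. Like many
// pools, it refuses to hand out a connection once the context is done.
func acquireConn(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	return nil
}

// cancelledContext returns a context that has already been cancelled, as the
// caller's context would be after SIGTERM or Ctrl-C.
func cancelledContext() context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return ctx
}

func main() {
	// Propagating the caller's cancelled context makes the operation fail.
	fmt.Println("caller context:", acquireConn(cancelledContext()))

	// Using the never-cancelled context lets in-flight work finish.
	fmt.Println("background context:", acquireConn(opContext))
}
```

The trade-off is that operations issued with a background context cannot be interrupted by the caller, so any bound on their duration has to come from elsewhere, for example a read timeout.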

Functions

func GetRedisStreamName added in v0.6.0

func GetRedisStreamName(s string) string

func IsAlreadyExistError

func IsAlreadyExistError(err error) bool

func NotFoundError

func NotFoundError(err error) bool

Types

type Metrics added in v0.7.3

type Metrics struct {
	ReadErrorsInc metricsIncrementFunc
	ReadsAdd      metricsAddFunc // number of actual messages read in
	AcksAdd       metricsAddFunc
	AckErrorsAdd  metricsAddFunc
}

type Option added in v0.7.3

type Option interface {
	Apply(*Options)
}

Option applies a single setting to Options.

func WithBufferFullWritingStrategy added in v0.7.3

func WithBufferFullWritingStrategy(s dfv1.BufferFullWritingStrategy) Option

WithBufferFullWritingStrategy sets the BufferFullWritingStrategy

func WithBufferUsageLimit added in v0.7.3

func WithBufferUsageLimit(u float64) Option

WithBufferUsageLimit sets the bufferUsageLimit

func WithCheckBacklog added in v0.7.3

func WithCheckBacklog(b bool) Option

WithCheckBacklog sets the checkBackLog option

func WithInfoRefreshInterval added in v0.7.3

func WithInfoRefreshInterval(t time.Duration) Option

WithInfoRefreshInterval sets the refresh interval

func WithLagDuration added in v0.7.3

func WithLagDuration(t time.Duration) Option

WithLagDuration sets the consumerLag duration

func WithMaxLength added in v0.7.3

func WithMaxLength(m int64) Option

WithMaxLength sets the maxLength
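
The maxLength and bufferUsageLimit settings can be read together as a fullness check. The sketch below is one plausible reading, not the package's actual computation, which this page does not show. The function isFull and its handling of a non-positive maxLength are assumptions made for illustration.

```go
package main

import "fmt"

// isFull reports whether a stream holding length entries has reached
// usageLimit (a fraction such as 0.8) of maxLength. Hypothetical helper:
// the real package may compute usage differently.
func isFull(length, maxLength int64, usageLimit float64) bool {
	if maxLength <= 0 {
		// No capacity configured; this sketch treats the buffer as never full.
		return false
	}
	usage := float64(length) / float64(maxLength)
	return usage >= usageLimit
}

func main() {
	fmt.Println(isFull(700, 1000, 0.8)) // 70% used
	fmt.Println(isFull(850, 1000, 0.8)) // 85% used
}
```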

func WithReadTimeOut added in v0.7.3

func WithReadTimeOut(t time.Duration) Option

WithReadTimeOut sets the readTimeOut

func WithRefreshBufferWriteInfo added in v0.7.3

func WithRefreshBufferWriteInfo(r bool) Option

WithRefreshBufferWriteInfo sets the refreshBufferWriteInfo

func WithoutPipelining added in v0.7.3

func WithoutPipelining() Option

WithoutPipelining turns off redis pipelining

type Options added in v0.7.3

type Options struct {
	// Pipelining enables redis pipeline
	Pipelining bool
	// InfoRefreshInterval refreshes the info at this interval
	InfoRefreshInterval time.Duration
	// LagDuration is the minimum permissible consumerLag that we consider as a valid consumerLag.
	LagDuration time.Duration
	// ReadTimeOut is the timeout for read operations
	ReadTimeOut time.Duration
	// CheckBackLog is used to read all the PENDING entries from the stream
	CheckBackLog bool
	// MaxLength is the maximum length of the stream before it is considered full
	MaxLength int64
	// BufferUsageLimit is the limit of buffer usage before we declare it as full
	BufferUsageLimit float64
	// RefreshBufferWriteInfo is used to determine if we refresh buffer write info
	RefreshBufferWriteInfo bool
	// BufferFullWritingStrategy is the writing strategy when buffer is full
	BufferFullWritingStrategy dfv1.BufferFullWritingStrategy
}

Options for writing to redis
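
The Option interface and the With... constructors follow the functional options pattern. The standalone sketch below mirrors a subset of the documented names with local types so that it compiles without this package. The optionFunc adapter, the buildOptions helper, and the default values are assumptions for illustration and may not match how the package implements or applies its options.

```go
package main

import (
	"fmt"
	"time"
)

// Options mirrors a subset of the documented Options fields.
type Options struct {
	Pipelining  bool
	ReadTimeOut time.Duration
	MaxLength   int64
}

// Option has the same shape as the documented interface.
type Option interface {
	Apply(*Options)
}

// optionFunc is a hypothetical adapter that lets a plain function satisfy Option.
type optionFunc func(*Options)

func (f optionFunc) Apply(o *Options) { f(o) }

// WithoutPipelining disables pipelining.
func WithoutPipelining() Option {
	return optionFunc(func(o *Options) { o.Pipelining = false })
}

// WithReadTimeOut sets the read timeout.
func WithReadTimeOut(t time.Duration) Option {
	return optionFunc(func(o *Options) { o.ReadTimeOut = t })
}

// WithMaxLength sets the maximum stream length.
func WithMaxLength(m int64) Option {
	return optionFunc(func(o *Options) { o.MaxLength = m })
}

// buildOptions starts from assumed defaults (not the package's real ones)
// and applies each option in order, so later options win.
func buildOptions(opts ...Option) Options {
	o := Options{Pipelining: true, ReadTimeOut: time.Second, MaxLength: 10000}
	for _, opt := range opts {
		opt.Apply(&o)
	}
	return o
}

func main() {
	o := buildOptions(WithoutPipelining(), WithReadTimeOut(5*time.Second))
	fmt.Printf("%+v\n", o)
}
```

Because every setting is an Option value, callers pass only the settings they want to change and new settings can be added without changing constructor signatures.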

type RedisClient

type RedisClient struct {
	Client redis.UniversalClient
}

RedisClient datatype to hold redis client attributes.

func NewInClusterRedisClient

func NewInClusterRedisClient() *RedisClient

NewInClusterRedisClient returns a new Redis client. It assumes it is running in a vertex pod, where the required environment variables are available.

func NewRedisClient

func NewRedisClient(options *redis.UniversalOptions) *RedisClient

NewRedisClient returns a new Redis Client.

func (*RedisClient) CreateStreamGroup

func (cl *RedisClient) CreateStreamGroup(ctx context.Context, stream string, group string, start string) error

CreateStreamGroup creates a redis stream group and creates an empty stream if it does not exist.
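
Redis rejects an attempt to create a consumer group that already exists with an error whose message starts with BUSYGROUP, so callers that want idempotent setup usually treat that error as success. The sketch below shows the idea against a hypothetical in-memory stand-in (fakeRedis) rather than a real server. isBusyGroup and ensureGroup are illustrative helpers; whether IsAlreadyExistError performs the same check is an assumption, since its implementation is not shown here.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// fakeRedis is a hypothetical in-memory stand-in that tracks which
// consumer groups exist, keyed by stream and group name.
type fakeRedis struct {
	groups map[string]bool
}

func newFakeRedis() *fakeRedis {
	return &fakeRedis{groups: map[string]bool{}}
}

// createGroup mimics group creation: it fails with a BUSYGROUP error
// when the group already exists, as a Redis server does.
func (f *fakeRedis) createGroup(stream, group string) error {
	key := stream + "/" + group
	if f.groups[key] {
		return errors.New("BUSYGROUP Consumer Group name already exists")
	}
	f.groups[key] = true
	return nil
}

// isBusyGroup reports whether err is the "group already exists" reply.
func isBusyGroup(err error) bool {
	return err != nil && strings.HasPrefix(err.Error(), "BUSYGROUP")
}

// ensureGroup creates the group and treats "already exists" as success,
// which makes it safe to call on every startup.
func ensureGroup(f *fakeRedis, stream, group string) error {
	err := f.createGroup(stream, group)
	if isBusyGroup(err) {
		return nil
	}
	return err
}

func main() {
	f := newFakeRedis()
	fmt.Println(ensureGroup(f, "mystream", "mygroup")) // first call creates the group
	fmt.Println(ensureGroup(f, "mystream", "mygroup")) // second call is a no-op
}
```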

func (*RedisClient) DeleteKeys

func (cl *RedisClient) DeleteKeys(ctx context.Context, keys ...string) error

DeleteKeys deletes the given redis keys.

func (*RedisClient) DeleteStreamGroup

func (cl *RedisClient) DeleteStreamGroup(ctx context.Context, stream string, group string) error

DeleteStreamGroup deletes the redis stream group.

func (*RedisClient) IsStreamExists

func (cl *RedisClient) IsStreamExists(ctx context.Context, streamKey string) bool

IsStreamExists checks whether the redis stream key exists.

func (*RedisClient) IsStreamGroupExists

func (cl *RedisClient) IsStreamGroupExists(ctx context.Context, streamKey string, groupName string) bool

IsStreamGroupExists checks whether the stream group exists.

func (*RedisClient) PendingMsgCount

func (cl *RedisClient) PendingMsgCount(ctx context.Context, streamKey, consumerGroup string) (int64, error)

PendingMsgCount returns how many messages are pending.

func (*RedisClient) StreamGroupInfo

func (cl *RedisClient) StreamGroupInfo(ctx context.Context, streamKey string) ([]redis.XInfoGroup, error)

StreamGroupInfo returns redis stream group info

func (*RedisClient) StreamInfo

func (cl *RedisClient) StreamInfo(ctx context.Context, streamKey string) (*redis.XInfoStream, error)

StreamInfo returns redis stream info

type RedisStreamsRead added in v0.7.3

type RedisStreamsRead struct {
	Name         string
	Stream       string
	Group        string
	Consumer     string
	PartitionIdx int32

	*RedisClient
	Options
	Log     *zap.SugaredLogger
	Metrics Metrics

	XStreamToMessages func(xstreams []redis.XStream, messages []*isb.ReadMessage, labels map[string]string) ([]*isb.ReadMessage, error)
}

RedisStreamsRead is the read queue implementation powered by RedisClient.

func (*RedisStreamsRead) Ack added in v0.7.3

func (br *RedisStreamsRead) Ack(_ context.Context, offsets []isb.Offset) []error

Ack acknowledges the given offsets to the read queue. Ack is always pipelined; to avoid pipelining, pass a slice with a single element.

func (*RedisStreamsRead) GetGroupName added in v0.7.3

func (br *RedisStreamsRead) GetGroupName() string

GetGroupName gets the name of the consumer group.

func (*RedisStreamsRead) GetName added in v0.7.3

func (br *RedisStreamsRead) GetName() string

GetName returns the name of the partitioned buffer.

func (*RedisStreamsRead) GetPartitionIdx added in v0.9.0

func (br *RedisStreamsRead) GetPartitionIdx() int32

GetPartitionIdx returns the partition number.

func (*RedisStreamsRead) GetStreamName added in v0.7.3

func (br *RedisStreamsRead) GetStreamName() string

GetStreamName returns the stream name.

func (*RedisStreamsRead) NoAck added in v0.7.3

func (br *RedisStreamsRead) NoAck(_ context.Context, _ []isb.Offset)

func (*RedisStreamsRead) Pending added in v0.8.1

func (br *RedisStreamsRead) Pending(_ context.Context) (int64, error)

func (*RedisStreamsRead) Read added in v0.7.3

func (br *RedisStreamsRead) Read(_ context.Context, count int64) ([]*isb.ReadMessage, error)

Read reads messages from the stream. During a restart, all unacknowledged messages must be reprocessed, so the read ID `>` is replaced with `0-0`; otherwise we might run into data loss.
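
In Redis Streams, a consumer-group read with the ID `>` returns only entries never delivered to the group, while a concrete ID such as `0-0` returns the consumer's own pending (delivered but unacknowledged) entries. The sketch below models a reader that drains its backlog first and then switches to `>`. fakeGroup and reader are hypothetical in-memory stand-ins, and the exact point at which the real Read stops checking the backlog is an assumption.

```go
package main

import "fmt"

// fakeGroup is a hypothetical in-memory stand-in for one consumer's view
// of a stream inside a consumer group.
type fakeGroup struct {
	pending []string // delivered before the restart, never acknowledged
	fresh   []string // never delivered to the group
}

// read mimics a consumer-group read. With ">" it delivers new entries and
// marks them pending; with any other ID it re-returns pending entries.
func (g *fakeGroup) read(startID string, count int) []string {
	src := g.pending
	if startID == ">" {
		src = g.fresh
	}
	n := count
	if n > len(src) {
		n = len(src)
	}
	out := append([]string(nil), src[:n]...)
	if startID == ">" {
		g.fresh = g.fresh[n:]
		g.pending = append(g.pending, out...)
	}
	return out
}

// ack removes the given IDs from the pending list.
func (g *fakeGroup) ack(ids ...string) {
	acked := make(map[string]bool, len(ids))
	for _, id := range ids {
		acked[id] = true
	}
	kept := g.pending[:0]
	for _, id := range g.pending {
		if !acked[id] {
			kept = append(kept, id)
		}
	}
	g.pending = kept
}

// reader reads the backlog with "0-0" until it is empty, then uses ">".
type reader struct {
	group        *fakeGroup
	checkBacklog bool
}

func (r *reader) read(count int) []string {
	if r.checkBacklog {
		if msgs := r.group.read("0-0", count); len(msgs) > 0 {
			return msgs
		}
		r.checkBacklog = false // backlog drained; only new entries from now on
	}
	return r.group.read(">", count)
}

func main() {
	g := &fakeGroup{pending: []string{"1-0", "2-0"}, fresh: []string{"3-0"}}
	r := &reader{group: g, checkBacklog: true}
	for i := 0; i < 3; i++ {
		msgs := r.read(10)
		fmt.Println(msgs)
		g.ack(msgs...)
	}
}
```

Had the reader started with `>` after the restart, entries 1-0 and 2-0 would have stayed pending and never been reprocessed, which is the data loss the description warns about.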
