redis

package
v0.6.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2022 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewBufferRead

func NewBufferRead(ctx context.Context, client *redisclient.RedisClient, name string, group string, consumer string, opts ...Option) isb.BufferReader

NewBufferRead returns a new redis buffer reader.

func NewBufferWrite

func NewBufferWrite(ctx context.Context, client *redisclient.RedisClient, name string, group string, opts ...Option) isb.BufferWriter

NewBufferWrite returns a new redis queue writer.

Types

type BufferRead

type BufferRead struct {
	Name     string
	Stream   string
	Group    string
	Consumer string
	*BufferReadInfo
	*redisclient.RedisClient
	// contains filtered or unexported fields
}

BufferRead is the read queue implementation powered by RedisClient.

func (*BufferRead) Ack

func (br *BufferRead) Ack(_ context.Context, offsets []isb.Offset) []error

Ack acknowledges the offset to the read queue. Ack is always pipelined, if you want to avoid it then send array of 1 element.

func (*BufferRead) Close

func (br *BufferRead) Close() error

func (*BufferRead) GetGroupName

func (br *BufferRead) GetGroupName() string

GetGroupName gets the name of the consumer group.

func (*BufferRead) GetLag

func (br *BufferRead) GetLag() time.Duration

GetLag returns the lag of the buffer

func (*BufferRead) GetName

func (br *BufferRead) GetName() string

GetName returns name for the buffer.

func (*BufferRead) GetRefreshEmptyError

func (br *BufferRead) GetRefreshEmptyError() uint32

GetRefreshEmptyError returns whether the buffer has a lag

func (*BufferRead) GetStreamName

func (br *BufferRead) GetStreamName() string

GetStreamName returns the stream name.

func (*BufferRead) IsEmpty

func (br *BufferRead) IsEmpty() bool

IsEmpty returns whether the buffer is empty.

func (*BufferRead) Pending

func (br *BufferRead) Pending(_ context.Context) (int64, error)

func (*BufferRead) Rate

func (br *BufferRead) Rate(_ context.Context) (float64, error)

func (*BufferRead) Read

func (br *BufferRead) Read(_ context.Context, count int64) ([]*isb.ReadMessage, error)

Read reads the messages from the stream. During a restart, we need to make sure all the un-acknowledged messages are reprocessed. we need to replace `>` with `0-0` during restarts. We might run into data loss otherwise.

type BufferReadInfo

type BufferReadInfo struct {
	// contains filtered or unexported fields
}

BufferReadInfo will contain the buffer information from the reader point of view.

type BufferWrite

type BufferWrite struct {
	Name   string
	Stream string
	Group  string
	*BufferWriteInfo
	*redisclient.RedisClient
	// contains filtered or unexported fields
}

BufferWrite is the write queue implementation powered by RedisClient.

func (*BufferWrite) Close

func (br *BufferWrite) Close() error

func (*BufferWrite) GetBufferLength

func (bw *BufferWrite) GetBufferLength() int64

GetBufferLength is used to get the bufferLength value

func (*BufferWrite) GetConsumerLag

func (bw *BufferWrite) GetConsumerLag() time.Duration

GetConsumerLag returns the consumerLag of the buffer

func (*BufferWrite) GetGroupName

func (bw *BufferWrite) GetGroupName() string

GetGroupName gets the name of the consumer group.

func (*BufferWrite) GetHashKeyName

func (bw *BufferWrite) GetHashKeyName(startTime time.Time) string

GetHashKeyName gets the hash key name.

func (*BufferWrite) GetMinId

func (bw *BufferWrite) GetMinId() string

GetMinId returns the MINID of the buffer

func (*BufferWrite) GetName

func (bw *BufferWrite) GetName() string

GetName gets the name of the buffer.

func (*BufferWrite) GetPendingCount

func (bw *BufferWrite) GetPendingCount() int64

GetPendingCount is used to get the pendingCount value

func (*BufferWrite) GetRefreshFullError

func (bw *BufferWrite) GetRefreshFullError() uint32

GetRefreshFullError returns the refreshFullError count of the buffer

func (*BufferWrite) GetStreamName

func (bw *BufferWrite) GetStreamName() string

GetStreamName gets the stream name. Stream name is derived from the name.

func (*BufferWrite) HasUnprocessedData

func (bw *BufferWrite) HasUnprocessedData() bool

HasUnprocessedData tells us if we have any unprocessed data left in the buffer

func (*BufferWrite) IsFull

func (bw *BufferWrite) IsFull() bool

IsFull returns whether the buffer is full. It could be approximate.

func (*BufferWrite) Write

func (bw *BufferWrite) Write(_ context.Context, messages []isb.Message) ([]isb.Offset, []error)

Write is used to write data to the redis interstep buffer

type BufferWriteInfo

type BufferWriteInfo struct {
	// contains filtered or unexported fields
}

BufferWriteInfo will contain the buffer infoRefreshInterval from the writer point of view.

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option to apply different options

func WithBufferUsageLimit

func WithBufferUsageLimit(u float64) Option

WithBufferUsageLimit sets the bufferUsageLimit

func WithCheckBacklog

func WithCheckBacklog(b bool) Option

WithCheckBacklog sets the checkBackLog option

func WithInfoRefreshInterval

func WithInfoRefreshInterval(t time.Duration) Option

WithInfoRefreshInterval sets the refresh interval

func WithLagDuration

func WithLagDuration(t time.Duration) Option

WithLagDuration sets the consumerLag duration

func WithMaxLength

func WithMaxLength(m int64) Option

WithMaxLength sets the maxLength

func WithReadTimeOut

func WithReadTimeOut(t time.Duration) Option

WithReadTimeOut sets the readTimeOut

func WithRefreshBufferWriteInfo

func WithRefreshBufferWriteInfo(r bool) Option

WithRefreshBufferWriteInfo sets the refreshBufferWriteInfo

func WithoutPipelining

func WithoutPipelining() Option

WithoutPipelining turns on redis pipelining

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL