Documentation ¶
Index ¶
- func NewBufferRead(ctx context.Context, client *redisclient.RedisClient, name string, ...) isb.BufferReader
- func NewBufferWrite(ctx context.Context, client *redisclient.RedisClient, name string, ...) isb.BufferWriter
- type BufferRead
- func (br *BufferRead) Ack(_ context.Context, offsets []isb.Offset) []error
- func (br *BufferRead) Close() error
- func (br *BufferRead) GetGroupName() string
- func (br *BufferRead) GetLag() time.Duration
- func (br *BufferRead) GetName() string
- func (br *BufferRead) GetRefreshEmptyError() uint32
- func (br *BufferRead) GetStreamName() string
- func (br *BufferRead) IsEmpty() bool
- func (br *BufferRead) Pending(_ context.Context) (int64, error)
- func (br *BufferRead) Rate(_ context.Context) (float64, error)
- func (br *BufferRead) Read(_ context.Context, count int64) ([]*isb.ReadMessage, error)
- type BufferReadInfo
- type BufferWrite
- func (br *BufferWrite) Close() error
- func (bw *BufferWrite) GetBufferLength() int64
- func (bw *BufferWrite) GetConsumerLag() time.Duration
- func (bw *BufferWrite) GetGroupName() string
- func (bw *BufferWrite) GetHashKeyName(startTime time.Time) string
- func (bw *BufferWrite) GetMinId() string
- func (bw *BufferWrite) GetName() string
- func (bw *BufferWrite) GetPendingCount() int64
- func (bw *BufferWrite) GetRefreshFullError() uint32
- func (bw *BufferWrite) GetStreamName() string
- func (bw *BufferWrite) HasUnprocessedData() bool
- func (bw *BufferWrite) IsFull() bool
- func (bw *BufferWrite) Write(_ context.Context, messages []isb.Message) ([]isb.Offset, []error)
- type BufferWriteInfo
- type Option
- func WithBufferUsageLimit(u float64) Option
- func WithCheckBacklog(b bool) Option
- func WithInfoRefreshInterval(t time.Duration) Option
- func WithLagDuration(t time.Duration) Option
- func WithMaxLength(m int64) Option
- func WithReadTimeOut(t time.Duration) Option
- func WithRefreshBufferWriteInfo(r bool) Option
- func WithoutPipelining() Option
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewBufferRead ¶
func NewBufferRead(ctx context.Context, client *redisclient.RedisClient, name string, group string, consumer string, opts ...Option) isb.BufferReader
NewBufferRead returns a new redis buffer reader.
func NewBufferWrite ¶
func NewBufferWrite(ctx context.Context, client *redisclient.RedisClient, name string, group string, opts ...Option) isb.BufferWriter
NewBufferWrite returns a new redis queue writer.
Types ¶
type BufferRead ¶
type BufferRead struct { Name string Stream string Group string Consumer string *BufferReadInfo *redisclient.RedisClient // contains filtered or unexported fields }
BufferRead is the read queue implementation powered by RedisClient.
func (*BufferRead) Ack ¶
Ack acknowledges the offset to the read queue. Ack is always pipelined, if you want to avoid it then send array of 1 element.
func (*BufferRead) Close ¶
func (br *BufferRead) Close() error
func (*BufferRead) GetGroupName ¶
func (br *BufferRead) GetGroupName() string
GetGroupName gets the name of the consumer group.
func (*BufferRead) GetLag ¶
func (br *BufferRead) GetLag() time.Duration
GetLag returns the lag of the buffer
func (*BufferRead) GetName ¶
func (br *BufferRead) GetName() string
GetName returns name for the buffer.
func (*BufferRead) GetRefreshEmptyError ¶
func (br *BufferRead) GetRefreshEmptyError() uint32
GetRefreshEmptyError returns whether the buffer has a lag
func (*BufferRead) GetStreamName ¶
func (br *BufferRead) GetStreamName() string
GetStreamName returns the stream name.
func (*BufferRead) IsEmpty ¶
func (br *BufferRead) IsEmpty() bool
IsEmpty returns whether the buffer is empty.
func (*BufferRead) Read ¶
func (br *BufferRead) Read(_ context.Context, count int64) ([]*isb.ReadMessage, error)
Read reads the messages from the stream. During a restart, we need to make sure all the un-acknowledged messages are reprocessed. we need to replace `>` with `0-0` during restarts. We might run into data loss otherwise.
type BufferReadInfo ¶
type BufferReadInfo struct {
// contains filtered or unexported fields
}
BufferReadInfo will contain the buffer information from the reader point of view.
type BufferWrite ¶
type BufferWrite struct { Name string Stream string Group string *BufferWriteInfo *redisclient.RedisClient // contains filtered or unexported fields }
BufferWrite is the write queue implementation powered by RedisClient.
func (*BufferWrite) Close ¶
func (br *BufferWrite) Close() error
func (*BufferWrite) GetBufferLength ¶
func (bw *BufferWrite) GetBufferLength() int64
GetBufferLength is used to get the bufferLength value
func (*BufferWrite) GetConsumerLag ¶
func (bw *BufferWrite) GetConsumerLag() time.Duration
GetConsumerLag returns the consumerLag of the buffer
func (*BufferWrite) GetGroupName ¶
func (bw *BufferWrite) GetGroupName() string
GetGroupName gets the name of the consumer group.
func (*BufferWrite) GetHashKeyName ¶
func (bw *BufferWrite) GetHashKeyName(startTime time.Time) string
GetHashKeyName gets the hash key name.
func (*BufferWrite) GetMinId ¶
func (bw *BufferWrite) GetMinId() string
GetMinId returns the MINID of the buffer
func (*BufferWrite) GetName ¶
func (bw *BufferWrite) GetName() string
GetName gets the name of the buffer.
func (*BufferWrite) GetPendingCount ¶
func (bw *BufferWrite) GetPendingCount() int64
GetPendingCount is used to get the pendingCount value
func (*BufferWrite) GetRefreshFullError ¶
func (bw *BufferWrite) GetRefreshFullError() uint32
GetRefreshFullError returns the refreshFullError count of the buffer
func (*BufferWrite) GetStreamName ¶
func (bw *BufferWrite) GetStreamName() string
GetStreamName gets the stream name. Stream name is derived from the name.
func (*BufferWrite) HasUnprocessedData ¶
func (bw *BufferWrite) HasUnprocessedData() bool
HasUnprocessedData tells us if we have any unprocessed data left in the buffer
func (*BufferWrite) IsFull ¶
func (bw *BufferWrite) IsFull() bool
IsFull returns whether the buffer is full. It could be approximate.
type BufferWriteInfo ¶
type BufferWriteInfo struct {
// contains filtered or unexported fields
}
BufferWriteInfo will contain the buffer infoRefreshInterval from the writer point of view.
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option to apply different options
func WithBufferUsageLimit ¶
WithBufferUsageLimit sets the bufferUsageLimit
func WithCheckBacklog ¶
WithCheckBacklog sets the checkBackLog option
func WithInfoRefreshInterval ¶
WithInfoRefreshInterval sets the refresh interval
func WithLagDuration ¶
WithLagDuration sets the consumerLag duration
func WithReadTimeOut ¶
WithReadTimeOut sets the readTimeOut
func WithRefreshBufferWriteInfo ¶
WithRefreshBufferWriteInfo sets the refreshBufferWriteInfo
func WithoutPipelining ¶
func WithoutPipelining() Option
WithoutPipelining turns on redis pipelining