Documentation ¶
Index ¶
- func NewBufferRead(ctx context.Context, client *redisclient.RedisClient, name string, ...) isb.BufferReader
- func NewBufferWrite(ctx context.Context, client *redisclient.RedisClient, name string, ...) isb.BufferWriter
- func NewRedisOffset(id string, partitionIdx int32) isb.Offset
- type BufferRead
- type BufferReadInfo
- type BufferWrite
- func (br *BufferWrite) Close() error
- func (bw *BufferWrite) GetBufferLength() int64
- func (bw *BufferWrite) GetConsumerLag() time.Duration
- func (bw *BufferWrite) GetGroupName() string
- func (bw *BufferWrite) GetHashKeyName(startTime time.Time) string
- func (bw *BufferWrite) GetMinId() string
- func (bw *BufferWrite) GetName() string
- func (bw *BufferWrite) GetPartitionIdx() int32
- func (bw *BufferWrite) GetPendingCount() int64
- func (bw *BufferWrite) GetRefreshFullError() uint32
- func (bw *BufferWrite) GetStreamName() string
- func (bw *BufferWrite) HasUnprocessedData() bool
- func (bw *BufferWrite) IsFull() bool
- func (bw *BufferWrite) Write(_ context.Context, messages []isb.Message) ([]isb.Offset, []error)
- type BufferWriteInfo
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewBufferRead ¶
func NewBufferRead(ctx context.Context, client *redisclient.RedisClient, name string, group string, consumer string, fromPartitionIdx int32, opts ...redisclient.Option) isb.BufferReader
NewBufferRead returns a new redis buffer reader.
func NewBufferWrite ¶
func NewBufferWrite(ctx context.Context, client *redisclient.RedisClient, name string, group string, partitionIdx int32, opts ...redisclient.Option) isb.BufferWriter
NewBufferWrite returns a new redis queue writer.
Types ¶
type BufferRead ¶
type BufferRead struct {
	*redisclient.RedisStreamsRead
	*BufferReadInfo
}
BufferRead is the read queue implementation powered by RedisClient.
func (*BufferRead) Close ¶
func (br *BufferRead) Close() error
func (*BufferRead) GetLag ¶
func (br *BufferRead) GetLag() time.Duration
GetLag returns the lag of the buffer
func (*BufferRead) GetRefreshEmptyError ¶
func (br *BufferRead) GetRefreshEmptyError() uint32
GetRefreshEmptyError returns the refreshEmptyError count of the buffer.
func (*BufferRead) IsEmpty ¶
func (br *BufferRead) IsEmpty() bool
IsEmpty returns whether the buffer is empty.
type BufferReadInfo ¶
type BufferReadInfo struct {
// contains filtered or unexported fields
}
BufferReadInfo will contain the buffer information from the reader point of view.
type BufferWrite ¶
type BufferWrite struct {
	Name         string
	PartitionIdx int32
	Stream       string
	Group        string
	*BufferWriteInfo
	*redisclient.RedisClient
	redisclient.Options
	// contains filtered or unexported fields
}
BufferWrite is the write queue implementation powered by RedisClient.
func (*BufferWrite) Close ¶
func (br *BufferWrite) Close() error
func (*BufferWrite) GetBufferLength ¶
func (bw *BufferWrite) GetBufferLength() int64
GetBufferLength is used to get the bufferLength value
func (*BufferWrite) GetConsumerLag ¶
func (bw *BufferWrite) GetConsumerLag() time.Duration
GetConsumerLag returns the consumerLag of the buffer
func (*BufferWrite) GetGroupName ¶
func (bw *BufferWrite) GetGroupName() string
GetGroupName gets the name of the consumer group.
func (*BufferWrite) GetHashKeyName ¶
func (bw *BufferWrite) GetHashKeyName(startTime time.Time) string
GetHashKeyName gets the hash key name.
func (*BufferWrite) GetMinId ¶
func (bw *BufferWrite) GetMinId() string
GetMinId returns the MINID of the buffer
func (*BufferWrite) GetName ¶
func (bw *BufferWrite) GetName() string
GetName gets the name of the buffer.
func (*BufferWrite) GetPartitionIdx ¶ added in v0.9.0
func (bw *BufferWrite) GetPartitionIdx() int32
func (*BufferWrite) GetPendingCount ¶
func (bw *BufferWrite) GetPendingCount() int64
GetPendingCount is used to get the pendingCount value
func (*BufferWrite) GetRefreshFullError ¶
func (bw *BufferWrite) GetRefreshFullError() uint32
GetRefreshFullError returns the refreshFullError count of the buffer
func (*BufferWrite) GetStreamName ¶
func (bw *BufferWrite) GetStreamName() string
GetStreamName gets the stream name. Stream name is derived from the name.
func (*BufferWrite) HasUnprocessedData ¶
func (bw *BufferWrite) HasUnprocessedData() bool
HasUnprocessedData tells us if we have any unprocessed data left in the buffer
func (*BufferWrite) IsFull ¶
func (bw *BufferWrite) IsFull() bool
IsFull returns whether the buffer is full. It could be approximate.
type BufferWriteInfo ¶
type BufferWriteInfo struct {
// contains filtered or unexported fields
}
BufferWriteInfo will contain the buffer information from the writer point of view.