Documentation ¶
Index ¶
- Constants
- Variables
- func GetRedisStreamName(s string) string
- func IsAlreadyExistError(err error) bool
- func NotFoundError(err error) bool
- type Metrics
- type Option
- func WithBufferFullWritingStrategy(s dfv1.BufferFullWritingStrategy) Option
- func WithBufferUsageLimit(u float64) Option
- func WithCheckBacklog(b bool) Option
- func WithInfoRefreshInterval(t time.Duration) Option
- func WithLagDuration(t time.Duration) Option
- func WithMaxLength(m int64) Option
- func WithReadTimeOut(t time.Duration) Option
- func WithRefreshBufferWriteInfo(r bool) Option
- func WithoutPipelining() Option
- type Options
- type RedisClient
- func (cl *RedisClient) Close()
- func (cl *RedisClient) CreateStreamGroup(ctx context.Context, stream string, group string, start string) error
- func (cl *RedisClient) DeleteKeys(ctx context.Context, keys ...string) error
- func (cl *RedisClient) DeleteStreamGroup(ctx context.Context, stream string, group string) error
- func (cl *RedisClient) IsStreamExists(ctx context.Context, streamKey string) bool
- func (cl *RedisClient) IsStreamGroupExists(ctx context.Context, streamKey string, groupName string) bool
- func (cl *RedisClient) PendingMsgCount(ctx context.Context, streamKey, consumerGroup string) (int64, error)
- func (cl *RedisClient) StreamGroupInfo(ctx context.Context, streamKey string) ([]redis.XInfoGroup, error)
- func (cl *RedisClient) StreamInfo(ctx context.Context, streamKey string) (*redis.XInfoStream, error)
- type RedisStreamsRead
- func (br *RedisStreamsRead) Ack(_ context.Context, offsets []isb.Offset) []error
- func (br *RedisStreamsRead) GetGroupName() string
- func (br *RedisStreamsRead) GetName() string
- func (br *RedisStreamsRead) GetPartitionIdx() int32
- func (br *RedisStreamsRead) GetStreamName() string
- func (br *RedisStreamsRead) NoAck(_ context.Context, _ []isb.Offset)
- func (br *RedisStreamsRead) Pending(_ context.Context) (int64, error)
- func (br *RedisStreamsRead) Read(_ context.Context, count int64) ([]*isb.ReadMessage, error)
Constants ¶
const ReadFromEarliest = "0-0"
const ReadFromLatest = "$"
Variables ¶
var RedisContext = context.Background()
RedisContext is used to pass the context specifically for REDIS operations. A cancelled context during SIGTERM or Ctrl-C that is propagated down will throw a context cancelled error because redis uses context to obtain connection from the connection pool. All redis operations will use the below no-op context.Background() to try to process in-flight messages that we have received prior to the cancellation of the context.
Functions ¶
func GetRedisStreamName ¶ added in v0.6.0
func IsAlreadyExistError ¶
func NotFoundError ¶
Types ¶
type Metrics ¶ added in v0.7.3
type Metrics struct { ReadErrorsInc metricsIncrementFunc ReadsAdd metricsAddFunc // number of actual messages read in AcksAdd metricsAddFunc AckErrorsAdd metricsAddFunc }
type Option ¶ added in v0.7.3
type Option interface {
Apply(*Options)
}
Option to apply different options
func WithBufferFullWritingStrategy ¶ added in v0.7.3
func WithBufferFullWritingStrategy(s dfv1.BufferFullWritingStrategy) Option
WithBufferFullWritingStrategy sets the BufferFullWritingStrategy
func WithBufferUsageLimit ¶ added in v0.7.3
WithBufferUsageLimit sets the bufferUsageLimit
func WithCheckBacklog ¶ added in v0.7.3
WithCheckBacklog sets the checkBackLog option
func WithInfoRefreshInterval ¶ added in v0.7.3
WithInfoRefreshInterval sets the refresh interval
func WithLagDuration ¶ added in v0.7.3
WithLagDuration sets the consumerLag duration
func WithMaxLength ¶ added in v0.7.3
WithMaxLength sets the maxLength
func WithReadTimeOut ¶ added in v0.7.3
WithReadTimeOut sets the readTimeOut
func WithRefreshBufferWriteInfo ¶ added in v0.7.3
WithRefreshBufferWriteInfo sets the refreshBufferWriteInfo
func WithoutPipelining ¶ added in v0.7.3
func WithoutPipelining() Option
WithoutPipelining turns on redis pipelining
type Options ¶ added in v0.7.3
type Options struct { // Pipelining enables redis pipeline Pipelining bool // InfoRefreshInterval refreshes the info at this interval InfoRefreshInterval time.Duration // LagDuration is the minimum permissable consumerLag that we consider as a valid consumerLag. LagDuration time.Duration // ReadTimeOut is the timeout needed for read timeout ReadTimeOut time.Duration // CheckBackLog is used to read all the PENDING entries from the stream CheckBackLog bool // MaxLength is the maximum length of the stream before it reaches full MaxLength int64 // BufferUsageLimit is the limit of buffer usage before we declare it as full BufferUsageLimit float64 // RefreshBufferWriteInfo is used to determine if we refresh buffer write info RefreshBufferWriteInfo bool // BufferFullWritingStrategy is the writing strategy when buffer is full BufferFullWritingStrategy dfv1.BufferFullWritingStrategy }
Options for writing to redis
type RedisClient ¶
type RedisClient struct {
Client redis.UniversalClient
}
RedisClient datatype to hold redis client attributes.
func NewInClusterRedisClient ¶
func NewInClusterRedisClient() *RedisClient
NewInClusterRedisClient returns a new Redis Client, it assumes it's in a vertex pod, where those required environment variables are available.
func NewRedisClient ¶
func NewRedisClient(options *redis.UniversalOptions) *RedisClient
NewRedisClient returns a new Redis Client.
func (*RedisClient) Close ¶ added in v1.3.0
func (cl *RedisClient) Close()
func (*RedisClient) CreateStreamGroup ¶
func (cl *RedisClient) CreateStreamGroup(ctx context.Context, stream string, group string, start string) error
CreateStreamGroup creates a redis stream group and creates an empty stream if it does not exist.
func (*RedisClient) DeleteKeys ¶
func (cl *RedisClient) DeleteKeys(ctx context.Context, keys ...string) error
DeleteKeys deletes a redis keys
func (*RedisClient) DeleteStreamGroup ¶
DeleteStreamGroup deletes the redis stream group.
func (*RedisClient) IsStreamExists ¶
func (cl *RedisClient) IsStreamExists(ctx context.Context, streamKey string) bool
IsStreamExists check the redis keys exists
func (*RedisClient) IsStreamGroupExists ¶
func (cl *RedisClient) IsStreamGroupExists(ctx context.Context, streamKey string, groupName string) bool
IsStreamGroupExists check the stream group exists
func (*RedisClient) PendingMsgCount ¶
func (cl *RedisClient) PendingMsgCount(ctx context.Context, streamKey, consumerGroup string) (int64, error)
PendingMsgCount returns how many messages are pending.
func (*RedisClient) StreamGroupInfo ¶
func (cl *RedisClient) StreamGroupInfo(ctx context.Context, streamKey string) ([]redis.XInfoGroup, error)
StreamGroupInfo returns redis stream group info
func (*RedisClient) StreamInfo ¶
func (cl *RedisClient) StreamInfo(ctx context.Context, streamKey string) (*redis.XInfoStream, error)
StreamInfo returns redis stream info
type RedisStreamsRead ¶ added in v0.7.3
type RedisStreamsRead struct { Name string Stream string Group string Consumer string PartitionIdx int32 *RedisClient Options Log *zap.SugaredLogger Metrics Metrics XStreamToMessages func(xstreams []redis.XStream, messages []*isb.ReadMessage, labels map[string]string) ([]*isb.ReadMessage, error) }
RedisStreamsRead is the read queue implementation powered by RedisClient.
func (*RedisStreamsRead) Ack ¶ added in v0.7.3
Ack acknowledges the offset to the read queue. Ack is always pipelined, if you want to avoid it then send array of 1 element.
func (*RedisStreamsRead) GetGroupName ¶ added in v0.7.3
func (br *RedisStreamsRead) GetGroupName() string
GetGroupName gets the name of the consumer group.
func (*RedisStreamsRead) GetName ¶ added in v0.7.3
func (br *RedisStreamsRead) GetName() string
GetName returns the name of the partitioned buffer.
func (*RedisStreamsRead) GetPartitionIdx ¶ added in v0.9.0
func (br *RedisStreamsRead) GetPartitionIdx() int32
GetPartitionIdx returns the partition number.
func (*RedisStreamsRead) GetStreamName ¶ added in v0.7.3
func (br *RedisStreamsRead) GetStreamName() string
GetStreamName returns the stream name.
func (*RedisStreamsRead) NoAck ¶ added in v0.7.3
func (br *RedisStreamsRead) NoAck(_ context.Context, _ []isb.Offset)
func (*RedisStreamsRead) Pending ¶ added in v0.8.1
func (br *RedisStreamsRead) Pending(_ context.Context) (int64, error)
func (*RedisStreamsRead) Read ¶ added in v0.7.3
func (br *RedisStreamsRead) Read(_ context.Context, count int64) ([]*isb.ReadMessage, error)
Read reads the messages from the stream. During a restart, we need to make sure all the un-acknowledged messages are reprocessed. we need to replace `>` with `0-0` during restarts. We might run into data loss otherwise.