Documentation ¶
Index ¶
- Constants
- Variables
- func GetRedisStreamName(s string) string
- func IsAlreadyExistError(err error) bool
- func NotFoundError(err error) bool
- type RedisClient
- func (cl *RedisClient) CreateStreamGroup(ctx context.Context, stream string, group string, start string) error
- func (cl *RedisClient) DeleteKeys(ctx context.Context, keys ...string) error
- func (cl *RedisClient) DeleteStreamGroup(ctx context.Context, stream string, group string) error
- func (cl *RedisClient) IsStreamExists(ctx context.Context, streamKey string) bool
- func (cl *RedisClient) IsStreamGroupExists(ctx context.Context, streamKey string, groupName string) bool
- func (cl *RedisClient) PendingMsgCount(ctx context.Context, streamKey, consumerGroup string) (int64, error)
- func (cl *RedisClient) StreamGroupInfo(ctx context.Context, streamKey string) ([]redis.XInfoGroup, error)
- func (cl *RedisClient) StreamInfo(ctx context.Context, streamKey string) (*redis.XInfoStream, error)
Constants ¶
const ReadFromEarliest = "0-0"
Variables ¶
var RedisContext = context.Background()
RedisContext is used to pass the context specifically for REDIS operations. A cancelled context during SIGTERM or Ctrl-C that is propagated down will throw a context cancelled error because redis uses context to obtain connection from the connection pool. All redis operations will use the below no-op context.Background() to try to process in-flight messages that we have received prior to the cancellation of the context.
Functions ¶
func GetRedisStreamName ¶ added in v0.6.0
func IsAlreadyExistError ¶
func NotFoundError ¶
Types ¶
type RedisClient ¶
type RedisClient struct {
Client redis.UniversalClient
}
RedisClient datatype to hold redis client attributes.
func NewInClusterRedisClient ¶
func NewInClusterRedisClient() *RedisClient
NewInClusterRedisClient returns a new Redis Client, it assumes it's in a vertex pod, where those required environment variables are available.
func NewRedisClient ¶
func NewRedisClient(options *redis.UniversalOptions) *RedisClient
NewRedisClient returns a new Redis Client.
func (*RedisClient) CreateStreamGroup ¶
func (cl *RedisClient) CreateStreamGroup(ctx context.Context, stream string, group string, start string) error
CreateStreamGroup creates a redis stream group and creates an empty stream if it does not exist.
func (*RedisClient) DeleteKeys ¶
func (cl *RedisClient) DeleteKeys(ctx context.Context, keys ...string) error
DeleteKeys deletes a redis keys
func (*RedisClient) DeleteStreamGroup ¶
DeleteStreamGroup deletes the redis stream group.
func (*RedisClient) IsStreamExists ¶
func (cl *RedisClient) IsStreamExists(ctx context.Context, streamKey string) bool
IsStreamExists check the redis keys exists
func (*RedisClient) IsStreamGroupExists ¶
func (cl *RedisClient) IsStreamGroupExists(ctx context.Context, streamKey string, groupName string) bool
IsStreamGroupExists check the stream group exists
func (*RedisClient) PendingMsgCount ¶
func (cl *RedisClient) PendingMsgCount(ctx context.Context, streamKey, consumerGroup string) (int64, error)
PendingMsgCount returns how many messages are pending.
func (*RedisClient) StreamGroupInfo ¶
func (cl *RedisClient) StreamGroupInfo(ctx context.Context, streamKey string) ([]redis.XInfoGroup, error)
StreamGroupInfo returns redis stream group info
func (*RedisClient) StreamInfo ¶
func (cl *RedisClient) StreamInfo(ctx context.Context, streamKey string) (*redis.XInfoStream, error)
StreamInfo returns redis stream info