Documentation ¶
Overview ¶
Package redis provides a general Redis client and utilities.
Index ¶
- Constants
- func ConvertError(err error) error
- func DeduplicateProtos(ctx context.Context, r redis.Scripter, k string, window time.Duration, ...) (bool, error)
- func EntityRegex(key string) (*regexp.Regexp, error)
- func GenerateLockerID() (string, error)
- func InitMutex(ctx context.Context, r redis.Scripter) error
- func InputTaskKey(k string) string
- func IsConsumerGroupExistsErr(err error) bool
- func Key(ks ...string) string
- func ListKey(k string) string
- func LockKey(k string) string
- func LockMutex(ctx context.Context, r redis.Cmdable, k, id string, expiration time.Duration) error
- func LockedWatch(ctx context.Context, r WatchCmdable, k, id string, expiration time.Duration, ...) error
- func MarshalProto(pb proto.Message) (string, error)
- func NewContextWithPagination(ctx context.Context, limit, page int64, total *int64) context.Context
- func PaginationLimitAndOffsetFromContext(ctx context.Context) (limit, offset int64)
- func RangeRedisHMap(ctx context.Context, r redis.Cmdable, scanKey, match string, count int64, ...) error
- func RangeRedisKeys(ctx context.Context, r redis.Cmdable, match string, count int64, ...) error
- func RangeRedisSet(ctx context.Context, r redis.Cmdable, scanKey, match string, count int64, ...) error
- func RangeRedisZSet(ctx context.Context, r redis.Cmdable, scanKey, match string, count int64, ...) error
- func RangeStreams(ctx context.Context, r redis.Cmdable, group, id string, count int64, ...) error
- func ReadyTaskKey(k string) string
- func SetPaginationTotal(ctx context.Context, total int64)
- func SetProto(ctx context.Context, r redis.Cmdable, k string, pb proto.Message, ...) (*redis.StatusCmd, error)
- func UnlockMutex(ctx context.Context, r redis.Scripter, k, id string, expiration time.Duration) error
- func UnmarshalProto(s string, pb proto.Message) error
- func WaitingTaskKey(k string) string
- type Client
- type Config
- type FailoverConfig
- type FindProtosOption
- type ProtoCmd
- type ProtosCmd
- type ProtosWithKeysCmd
- type TaskQueue
- func (q *TaskQueue) Add(ctx context.Context, r redis.Cmdable, s string, startAt time.Time, ...) error
- func (q *TaskQueue) Close(ctx context.Context) error
- func (q *TaskQueue) Dispatch(ctx context.Context, consumerID string, r redis.Cmdable) error
- func (q *TaskQueue) Init(ctx context.Context) error
- func (q *TaskQueue) Pop(ctx context.Context, consumerID string, r redis.Cmdable, ...) error
- type WatchCmdable
Constants ¶
const DefaultRangeCount = 1024
DefaultRangeCount is the default number of elements to be returned by a SCAN-family operation.
const ( // DefaultStreamBlockLimit is the duration for which stream blocking operations // such as XRead and XReadGroup should block. Note that Redis operations cannot be // asynchronously cancelled using context.WithCancel, so long-polling is required. DefaultStreamBlockLimit time.Duration = 0 )
Variables ¶
This section is empty.
Functions ¶
func ConvertError ¶
ConvertError converts Redis error into errors.Error.
func DeduplicateProtos ¶
func DeduplicateProtos( ctx context.Context, r redis.Scripter, k string, window time.Duration, msgs ...proto.Message, ) (bool, error)
DeduplicateProtos deduplicates protos using key k. It stores a lock at LockKey(k) and the list of collected protos at ListKey(k).
func EntityRegex ¶ added in v3.16.0
EntityRegex returns a regex that must match a given redis key format.
func GenerateLockerID ¶ added in v3.15.2
GenerateLockerID generates a unique locker ID to be used with a Redis mutex.
func InitMutex ¶ added in v3.8.4
InitMutex initializes the mutex scripts at r. InitMutex must be called before mutex functionality is used in a transaction or pipeline.
func InputTaskKey ¶
InputTaskKey returns the subkey of k, where input tasks are stored.
func IsConsumerGroupExistsErr ¶
IsConsumerGroupExistsErr returns true if error represents the redis BUSYGROUP error.
func Key ¶
Key constructs the full key for entity identified by ks by joining ks using the default separator.
func LockMutex ¶ added in v3.8.4
LockMutex locks the value stored at k with a mutex with identifier id. It stores the lock at LockKey(k) and list at ListKey(k).
func LockedWatch ¶ added in v3.8.4
func LockedWatch(ctx context.Context, r WatchCmdable, k, id string, expiration time.Duration, f func(*redis.Tx) error) error
LockedWatch locks the key k with a mutex, watches key k and executes f in a transaction. k is unlocked after f returns.
func MarshalProto ¶
MarshalProto marshals pb into printable string.
func NewContextWithPagination ¶
NewContextWithPagination instructs the store to paginate the results.
func PaginationLimitAndOffsetFromContext ¶
PaginationLimitAndOffsetFromContext returns the pagination limit and the offset if they are present.
func RangeRedisHMap ¶ added in v3.15.2
func RangeRedisKeys ¶ added in v3.15.2
func RangeRedisSet ¶ added in v3.15.2
func RangeRedisZSet ¶ added in v3.15.2
func RangeStreams ¶ added in v3.11.2
func RangeStreams(ctx context.Context, r redis.Cmdable, group, id string, count int64, minIdle time.Duration, f func(string, ...redis.XMessage) error, streams ...string) error
RangeStreams sequentially iterates over all non-acknowledged messages in streams calling f with at most count messages. RangeStreams assumes that within its lifetime it is the only consumer within group group using ID id. RangeStreams iterates over all pending messages, which have been idle for at least minIdle milliseconds first.
func ReadyTaskKey ¶
ReadyTaskKey returns the subkey of k, where ready tasks are stored.
func SetPaginationTotal ¶
SetPaginationTotal sets the total number of results inside the paginated context, if it was not set already.
func SetProto ¶
func SetProto(ctx context.Context, r redis.Cmdable, k string, pb proto.Message, expiration time.Duration) (*redis.StatusCmd, error)
SetProto marshals protocol buffer message represented by pb and stores it under key k in r. Note, that SetProto passes k verbatim to the underlying store and hence, k must represent the full key(including namespace etc.).
func UnlockMutex ¶ added in v3.8.4
func UnlockMutex(ctx context.Context, r redis.Scripter, k, id string, expiration time.Duration) error
UnlockMutex unlocks the key k with identifier id.
func UnmarshalProto ¶
UnmarshalProto unmarshals string returned from MarshalProto into pb.
func WaitingTaskKey ¶
WaitingTaskKey returns the subkey of k, where waiting tasks are stored.
Types ¶
type Client ¶
type Client struct { *redis.Client // contains filtered or unexported fields }
Client represents a Redis store client.
type Config ¶
type Config struct { Address string `name:"address" description:"Address of the Redis server"` Password string `name:"password" description:"Password of the Redis server"` Database int `name:"database" description:"Redis database to use"` RootNamespace []string `name:"namespace" description:"Namespace for Redis keys"` PoolSize int `name:"pool-size" description:"The maximum number of database connections"` IdleTimeout time.Duration `name:"idle-timeout" description:"Idle connection timeout"` Failover FailoverConfig `name:"failover" description:"Redis failover configuration"` TLS struct { Require bool `name:"require" description:"Require TLS"` tlsconfig.Client `name:",squash"` } `name:"tls"` // contains filtered or unexported fields }
Config represents Redis configuration.
func (Config) Equals ¶ added in v3.17.0
Equals checks if the other configuration is equivalent to this.
func (Config) WithNamespace ¶
type FailoverConfig ¶
type FailoverConfig struct { Enable bool `name:"enable" description:"Enable failover using Redis Sentinel"` Addresses []string `name:"addresses" description:"Redis Sentinel server addresses"` MasterName string `name:"master-name" description:"Redis Sentinel master name"` }
FailoverConfig represents Redis failover configuration.
func (FailoverConfig) Equals ¶ added in v3.17.0
func (c FailoverConfig) Equals(other FailoverConfig) bool
Equals checks if the other configuration is equivalent to this.
type FindProtosOption ¶
type FindProtosOption func(redisSort)
FindProtosOption is an option for the FindProtos query.
func FindProtosSorted ¶
func FindProtosSorted(alpha bool) FindProtosOption
FindProtosSorted ensures that entries are sorted. If alpha is true, lexicographical sorting is used, otherwise - numerical.
func FindProtosWithOffsetAndCount ¶
func FindProtosWithOffsetAndCount(offset, count int64) FindProtosOption
FindProtosWithOffsetAndCount changes the offset and the limit of the query.
type ProtoCmd ¶
type ProtoCmd struct {
// contains filtered or unexported fields
}
ProtoCmd is a command, which can unmarshal its result into a protocol buffer.
func FindProto ¶
func FindProto(ctx context.Context, r WatchCmdable, k string, keyCmd func(string) (string, error)) *ProtoCmd
FindProto finds the protocol buffer stored under the key stored under k. The external key is constructed using keyCmd.
type ProtosCmd ¶
type ProtosCmd stringSliceCmd
ProtosCmd is a command, which can unmarshal its result into multiple protocol buffers.
func FindProtos ¶
func FindProtos(ctx context.Context, r redis.Cmdable, k string, keyCmd func(string) string, opts ...FindProtosOption) ProtosCmd
FindProtos gets protos stored under keys in k.
func ListProtos ¶
ListProtos gets list of protos stored under key k.
func (ProtosCmd) Range ¶
Range ranges over command result and unmarshals it into a protocol buffer. f must return a new empty proto.Message of the type expected to be present in the command. The function returned by f will be called after the commands result is unmarshaled into the message returned by f. If both the function returned by f and the message are nil, the entry is skipped.
type ProtosWithKeysCmd ¶
type ProtosWithKeysCmd stringSliceCmd
ProtosWithKeysCmd is a command, which can unmarshal its result into multiple protocol buffers given a key.
func FindProtosWithKeys ¶
func FindProtosWithKeys(ctx context.Context, r redis.Cmdable, k string, keyCmd func(string) string, opts ...FindProtosOption) ProtosWithKeysCmd
FindProtosWithKeys gets protos stored under keys in k including the keys.
func (ProtosWithKeysCmd) Range ¶
Range ranges over command result and unmarshals it into a protocol buffer. f must return a new empty proto.Message of the type expected to be present in the command given the key. The function returned by f will be called after the commands result is unmarshaled into the message returned by f. If both the function returned by f and the message are nil, the entry is skipped.
type TaskQueue ¶
type TaskQueue struct { Redis WatchCmdable MaxLen int64 Group string Key string StreamBlockLimit time.Duration // contains filtered or unexported fields }
TaskQueue is a task queue.
func (*TaskQueue) Add ¶
func (q *TaskQueue) Add(ctx context.Context, r redis.Cmdable, s string, startAt time.Time, replace bool) error
Add adds a task s to the queue with a timestamp startAt.
func (*TaskQueue) Dispatch ¶ added in v3.18.2
Dispatch dispatches the tasks of the queue. It will continue to run until the context is done. consumerID is used to identify the consumer and should be unique for all concurrent calls to Dispatch.
func (*TaskQueue) Init ¶
Init initializes the task queue. It must be called at least once before using the queue.
func (*TaskQueue) Pop ¶
func (q *TaskQueue) Pop(ctx context.Context, consumerID string, r redis.Cmdable, f func(redis.Pipeliner, string, time.Time) error) error
Pop calls f on the most recent task in the queue, for which timestamp is in range [0, time.Now()], if such is available, otherwise it blocks until it is or context is done. Pipeline is executed even if f returns an error. consumerID is used to identify the consumer and should be unique for all concurrent calls to Pop.