Documentation ¶
Overview ¶
Package redis provides a general Redis client and utilities.
Index ¶
- func ConvertError(err error) error
- func DeduplicateProtos(ctx context.Context, r Scripter, k string, window time.Duration, ...) (bool, error)
- func InitMutex(ctx context.Context, r Scripter) error
- func InputTaskKey(k string) string
- func IsConsumerGroupExistsErr(err error) bool
- func Key(ks ...string) string
- func ListKey(k string) string
- func LockKey(k string) string
- func LockMutex(ctx context.Context, r redis.Cmdable, k, id string, expiration time.Duration) error
- func LockedWatch(ctx context.Context, r WatchCmdable, k, id string, expiration time.Duration, ...) error
- func MarshalProto(pb proto.Message) (string, error)
- func NewContextWithPagination(ctx context.Context, limit, page int64, total *int64) context.Context
- func PaginationLimitAndOffsetFromContext(ctx context.Context) (limit, offset int64)
- func RangeStreams(ctx context.Context, r redis.Cmdable, group, id string, count int64, ...) error
- func ReadyTaskKey(k string) string
- func SetPaginationTotal(ctx context.Context, total int64)
- func SetProto(ctx context.Context, r redis.Cmdable, k string, pb proto.Message, ...) (*redis.StatusCmd, error)
- func UnlockMutex(ctx context.Context, r Scripter, k, id string, expiration time.Duration) error
- func UnmarshalProto(s string, pb proto.Message) error
- func WaitingTaskKey(k string) string
- func XAutoClaim(ctx context.Context, r Scripter, stream, group, id string, ...) ([]redis.XMessage, string, error)
- type Client
- type Config
- type FailoverConfig
- type FindProtosOption
- type InterfaceSliceCmd
- type ProtoCmd
- type ProtosCmd
- type ProtosWithKeysCmd
- type Scripter
- type TaskQueue
- type WatchCmdable
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConvertError ¶
ConvertError converts a Redis error into an errors.Error.
func DeduplicateProtos ¶
func DeduplicateProtos(ctx context.Context, r Scripter, k string, window time.Duration, msgs ...proto.Message) (bool, error)
DeduplicateProtos deduplicates protos using key k. It stores a lock at LockKey(k) and the list of collected protos at ListKey(k).
func InitMutex ¶
InitMutex initializes the mutex scripts at r. InitMutex must be called before mutex functionality is used in a transaction or pipeline.
func InputTaskKey ¶
InputTaskKey returns the subkey of k, where input tasks are stored.
func IsConsumerGroupExistsErr ¶
IsConsumerGroupExistsErr returns true if err represents the Redis BUSYGROUP error.
func Key ¶
Key constructs the full key for the entity identified by ks by joining ks using the default separator.
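The joining behavior described for Key can be sketched as follows. This is a minimal illustration, not the package's implementation: the function name key and the ":" separator are assumptions, since the documentation does not state which separator is the default.

```go
package main

import (
	"fmt"
	"strings"
)

// separator is an assumed default separator; the actual value used by the
// package is not stated in this documentation.
const separator = ":"

// key is a hypothetical stand-in for Key: it joins the components in ks
// using the separator.
func key(ks ...string) string {
	return strings.Join(ks, separator)
}

func main() {
	// Under the assumed separator this prints "ns:devices:dev1".
	fmt.Println(key("ns", "devices", "dev1"))
}
```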
func LockMutex ¶
LockMutex locks the value stored at k with a mutex with identifier id. It stores the lock at LockKey(k) and the list at ListKey(k).
func LockedWatch ¶
func LockedWatch(ctx context.Context, r WatchCmdable, k, id string, expiration time.Duration, f func(*redis.Tx) error) error
LockedWatch locks the key k with a mutex, watches key k and executes f in a transaction. k is unlocked after f returns.
func MarshalProto ¶
MarshalProto marshals pb into a printable string.
func NewContextWithPagination ¶
NewContextWithPagination instructs the store to paginate the results.
func PaginationLimitAndOffsetFromContext ¶
PaginationLimitAndOffsetFromContext returns the pagination limit and the offset if they are present.
func RangeStreams ¶
func RangeStreams(ctx context.Context, r redis.Cmdable, group, id string, count int64, minIdle time.Duration, f func(string, ...redis.XMessage) error, streams ...string) error
RangeStreams sequentially iterates over all non-acknowledged messages in streams, calling f with at most count messages at a time. RangeStreams assumes that, within its lifetime, it is the only consumer within group group using ID id. It first iterates over all pending messages that have been idle for at least minIdle.
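The "at most count messages" contract of the callback can be illustrated without a Redis server. The sketch below is an in-memory model only: rangeBatches is a hypothetical helper, messages are plain strings rather than redis.XMessage values, and no consumer group, pending-entry, or acknowledgement handling is modeled.

```go
package main

import (
	"errors"
	"fmt"
)

// rangeBatches is a hypothetical, in-memory model of the batching contract:
// it calls f repeatedly with at most count messages of the given stream and
// stops at the first error returned by f.
func rangeBatches(stream string, msgs []string, count int, f func(string, ...string) error) error {
	if count <= 0 {
		return errors.New("count must be positive")
	}
	for len(msgs) > 0 {
		n := count
		if len(msgs) < n {
			n = len(msgs)
		}
		if err := f(stream, msgs[:n]...); err != nil {
			return err
		}
		msgs = msgs[n:]
	}
	return nil
}

func main() {
	msgs := []string{"1-0", "2-0", "3-0", "4-0", "5-0"}
	err := rangeBatches("uplinks", msgs, 2, func(stream string, batch ...string) error {
		fmt.Println(stream, len(batch), batch)
		return nil
	})
	if err != nil {
		fmt.Println("error:", err)
	}
}
```

A callback written against this shape should not assume a fixed batch size; the last batch may be shorter than count.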
func ReadyTaskKey ¶
ReadyTaskKey returns the subkey of k, where ready tasks are stored.
func SetPaginationTotal ¶
SetPaginationTotal sets the total number of results inside the paginated context, if it was not set already.
func SetProto ¶
func SetProto(ctx context.Context, r redis.Cmdable, k string, pb proto.Message, expiration time.Duration) (*redis.StatusCmd, error)
SetProto marshals the protocol buffer message pb and stores it under key k in r. Note that SetProto passes k verbatim to the underlying store, so k must be the full key (including the namespace, etc.).
func UnlockMutex ¶
UnlockMutex unlocks the key k with identifier id.
func UnmarshalProto ¶
UnmarshalProto unmarshals string returned from MarshalProto into pb.
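MarshalProto and UnmarshalProto form a round trip between a message and a printable string. The sketch below shows one way such a round trip can be built, assuming the binary wire encoding is wrapped in unpadded standard base64. That encoding choice is an assumption, not something this documentation states, and the helpers operate on raw bytes so the example needs no protobuf dependency.

```go
package main

import (
	"bytes"
	"encoding/base64"
	"fmt"
)

// encoding is an assumed text encoding for the marshaled bytes.
var encoding = base64.RawStdEncoding

// marshalPrintable is a hypothetical stand-in for the text step of
// MarshalProto: it turns already-marshaled bytes into a printable string.
func marshalPrintable(b []byte) string {
	return encoding.EncodeToString(b)
}

// unmarshalPrintable reverses marshalPrintable and reports malformed input.
func unmarshalPrintable(s string) ([]byte, error) {
	return encoding.DecodeString(s)
}

func main() {
	raw := []byte{0x0a, 0x03, 'f', 'o', 'o'} // example binary payload
	s := marshalPrintable(raw)
	back, err := unmarshalPrintable(s)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s, bytes.Equal(raw, back))
}
```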
func WaitingTaskKey ¶
WaitingTaskKey returns the subkey of k, where waiting tasks are stored.
Types ¶
type Client ¶
type Client struct {
	*redis.Client
	// contains filtered or unexported fields
}
Client represents a Redis store client.
type Config ¶
type Config struct {
	Address       string         `name:"address" description:"Address of the Redis server"`
	Password      string         `name:"password" description:"Password of the Redis server"`
	Database      int            `name:"database" description:"Redis database to use"`
	RootNamespace []string       `name:"namespace" description:"Namespace for Redis keys"`
	PoolSize      int            `name:"pool-size" description:"The maximum number of database connections"`
	Failover      FailoverConfig `name:"failover" description:"Redis failover configuration"`
	TLS           struct {
		Require          bool `name:"require" description:"Require TLS"`
		tlsconfig.Client `name:",squash"`
	} `name:"tls"`
	// contains filtered or unexported fields
}
Config represents Redis configuration.
func (Config) WithNamespace ¶
type FailoverConfig ¶
type FailoverConfig struct {
	Enable     bool     `name:"enable" description:"Enable failover using Redis Sentinel"`
	Addresses  []string `name:"addresses" description:"Redis Sentinel server addresses"`
	MasterName string   `name:"master-name" description:"Redis Sentinel master name"`
}
FailoverConfig represents Redis failover configuration.
type FindProtosOption ¶
type FindProtosOption func(redisSort)
FindProtosOption is an option for the FindProtos query.
func FindProtosSorted ¶
func FindProtosSorted(alpha bool) FindProtosOption
FindProtosSorted ensures that entries are sorted. If alpha is true, lexicographical sorting is used; otherwise, numerical sorting is used.
func FindProtosWithOffsetAndCount ¶
func FindProtosWithOffsetAndCount(offset, count int64) FindProtosOption
FindProtosWithOffsetAndCount changes the offset and the limit of the query.
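FindProtosOption follows the functional options pattern: each option is a function that adjusts the query before it runs. The sketch below shows the pattern in isolation. The sortQuery struct is a hypothetical stand-in for the unexported redisSort type, whose fields are not documented here, and the lowercase option names are illustrative.

```go
package main

import "fmt"

// sortQuery is a hypothetical stand-in for the unexported redisSort type.
type sortQuery struct {
	Sorted        bool
	Alpha         bool
	Offset, Count int64
}

// findOption mirrors the shape of FindProtosOption for this sketch.
type findOption func(*sortQuery)

// sorted enables sorting; alpha selects lexicographical over numerical order.
func sorted(alpha bool) findOption {
	return func(q *sortQuery) {
		q.Sorted = true
		q.Alpha = alpha
	}
}

// withOffsetAndCount sets the offset and the limit of the query.
func withOffsetAndCount(offset, count int64) findOption {
	return func(q *sortQuery) {
		q.Offset = offset
		q.Count = count
	}
}

// buildQuery applies the options in order to a zero-valued query.
func buildQuery(opts ...findOption) sortQuery {
	var q sortQuery
	for _, opt := range opts {
		opt(&q)
	}
	return q
}

func main() {
	q := buildQuery(sorted(true), withOffsetAndCount(20, 10))
	fmt.Printf("%+v\n", q)
}
```

Because options are applied in order, a later option overrides an earlier one that touches the same field.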
type InterfaceSliceCmd ¶
type InterfaceSliceCmd struct {
*redis.Cmd
}
func RunInterfaceSliceScript ¶
func RunInterfaceSliceScript(ctx context.Context, r Scripter, s *redis.Script, keys []string, args ...interface{}) *InterfaceSliceCmd
func (InterfaceSliceCmd) Result ¶
func (cmd InterfaceSliceCmd) Result() ([]interface{}, error)
type ProtoCmd ¶
type ProtoCmd struct {
// contains filtered or unexported fields
}
ProtoCmd is a command, which can unmarshal its result into a protocol buffer.
func FindProto ¶
func FindProto(ctx context.Context, r WatchCmdable, k string, keyCmd func(string) (string, error)) *ProtoCmd
FindProto finds the protocol buffer stored under the key stored under k. The external key is constructed using keyCmd.
type ProtosCmd ¶
type ProtosCmd stringSliceCmd
ProtosCmd is a command, which can unmarshal its result into multiple protocol buffers.
func FindProtos ¶
func FindProtos(ctx context.Context, r redis.Cmdable, k string, keyCmd func(string) string, opts ...FindProtosOption) ProtosCmd
FindProtos gets protos stored under keys in k.
func ListProtos ¶
ListProtos gets the list of protos stored under key k.
func (ProtosCmd) Range ¶
Range ranges over the command result and unmarshals each entry into a protocol buffer. f must return a new empty proto.Message of the type expected to be present in the command. The function returned by f is called after the command's result has been unmarshaled into the message returned by f. If both the function returned by f and the message are nil, the entry is skipped.
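The contract described for Range (f supplies an empty message and a callback, and a nil pair skips the entry) can be modeled with plain strings. This is a sketch under assumptions: the exact signature of Range is not shown in this documentation, so rangeEntries, the message type, and the func() error callback shape are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for a proto.Message.
type message struct{ Value string }

// unmarshal is a stand-in for unmarshaling an entry into a message.
func unmarshal(s string, m *message) error {
	m.Value = s
	return nil
}

// rangeEntries models the described contract: for every entry it asks f for
// an empty message and a callback, fills the message, then runs the callback.
// If f returns a nil message and a nil callback, the entry is skipped.
func rangeEntries(entries []string, f func() (*message, func() error)) error {
	for _, e := range entries {
		m, done := f()
		if m == nil && done == nil {
			continue // entry skipped
		}
		if m == nil || done == nil {
			return errors.New("message and callback must both be set or both be nil")
		}
		if err := unmarshal(e, m); err != nil {
			return err
		}
		if err := done(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var got []string
	i := 0
	err := rangeEntries([]string{"a", "b", "c"}, func() (*message, func() error) {
		i++
		if i == 2 {
			return nil, nil // skip the second entry
		}
		m := &message{}
		return m, func() error {
			got = append(got, m.Value)
			return nil
		}
	})
	fmt.Println(got, err)
}
```

Returning the callback as a closure lets it read the freshly filled message without the iterator needing to know the concrete message type.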
type ProtosWithKeysCmd ¶
type ProtosWithKeysCmd stringSliceCmd
ProtosWithKeysCmd is a command, which can unmarshal its result into multiple protocol buffers given a key.
func FindProtosWithKeys ¶
func FindProtosWithKeys(ctx context.Context, r redis.Cmdable, k string, keyCmd func(string) string, opts ...FindProtosOption) ProtosWithKeysCmd
FindProtosWithKeys gets protos stored under keys in k including the keys.
func (ProtosWithKeysCmd) Range ¶
Range ranges over the command result and unmarshals each entry into a protocol buffer. f must return a new empty proto.Message of the type expected to be present in the command, given the key. The function returned by f is called after the command's result has been unmarshaled into the message returned by f. If both the function returned by f and the message are nil, the entry is skipped.
type Scripter ¶
type Scripter interface {
	Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd
	EvalSha(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd
	ScriptExists(ctx context.Context, hashes ...string) *redis.BoolSliceCmd
	ScriptLoad(ctx context.Context, script string) *redis.StringCmd
}
Scripter is redis.scripter.
type TaskQueue ¶
type TaskQueue struct {
	Redis     WatchCmdable
	MaxLen    int64
	Group, ID string
	Key       string
}
TaskQueue is a task queue.
func (*TaskQueue) Add ¶
func (q *TaskQueue) Add(ctx context.Context, r redis.Cmdable, s string, startAt time.Time, replace bool) error
Add adds a task s to the queue with a timestamp startAt.
func (*TaskQueue) Init ¶
Init initializes the task queue. It must be called at least once before using the queue.
func (*TaskQueue) Pop ¶
func (q *TaskQueue) Pop(ctx context.Context, r redis.Cmdable, f func(redis.Pipeliner, string, time.Time) error) error
Pop calls f on the most recent task in the queue whose timestamp is in the range [0, time.Now()], if such a task is available; otherwise it blocks until one becomes available or the context is done. The pipeline is executed even if f returns an error.