Documentation ¶
Overview ¶
Package redis provides a general Redis client and utilities.
Index ¶
- func AddTask(r redis.Cmdable, k string, maxLen int64, payload string, startAt time.Time, ...) error
- func ConvertError(err error) error
- func DeduplicateProtos(ctx context.Context, r Scripter, k string, window time.Duration, ...) (bool, error)
- func DispatchTasks(r WatchCmdable, group, id string, maxLen int64, deadline time.Time, ...) (time.Time, error)
- func InitMutex(r Scripter) error
- func InitTaskGroup(r redis.Cmdable, group, k string) error
- func InputTaskKey(k string) string
- func IsConsumerGroupExistsErr(err error) bool
- func Key(ks ...string) string
- func ListKey(k string) string
- func LockKey(k string) string
- func LockMutex(ctx context.Context, r redis.Cmdable, k, id string, expiration time.Duration) error
- func LockedWatch(ctx context.Context, r WatchCmdable, k, id string, expiration time.Duration, ...) error
- func MarshalProto(pb proto.Message) (string, error)
- func NewContextWithPagination(ctx context.Context, limit, page int64, total *int64) context.Context
- func PaginationLimitAndOffsetFromContext(ctx context.Context) (limit, offset int64)
- func PopTask(r redis.Cmdable, group, id string, timeout time.Duration, ...) error
- func ReadyTaskKey(k string) string
- func SetPaginationTotal(ctx context.Context, total int64)
- func SetProto(r redis.Cmdable, k string, pb proto.Message, expiration time.Duration) (*redis.StatusCmd, error)
- func UnlockMutex(r Scripter, k, id string, expiration time.Duration) error
- func UnmarshalProto(s string, pb proto.Message) error
- func WaitingTaskKey(k string) string
- type Client
- type Config
- type FailoverConfig
- type FindProtosOption
- type ProtoCmd
- type ProtosCmd
- type ProtosWithKeysCmd
- type Scripter
- type TaskQueue
- type WatchCmdable
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AddTask ¶
func AddTask(r redis.Cmdable, k string, maxLen int64, payload string, startAt time.Time, replace bool) error
AddTask adds a task identified by payload with timestamp startAt to the stream at InputTaskKey(k). maxLen is the approximate maximum length to which the stream may be trimmed.
func ConvertError ¶
ConvertError converts a Redis error into an errors.Error.
func DeduplicateProtos ¶
func DeduplicateProtos(ctx context.Context, r Scripter, k string, window time.Duration, msgs ...proto.Message) (bool, error)
DeduplicateProtos deduplicates protos using key k. It stores a lock at LockKey(k) and the list of collected protos at ListKey(k).
func DispatchTasks ¶
func DispatchTasks(r WatchCmdable, group, id string, maxLen int64, deadline time.Time, ks ...string) (time.Time, error)
DispatchTasks dispatches ready-to-execute tasks from input task streams and waiting task sets to ready task streams. It first attempts to read at most maxLen tasks from the streams at the input task keys corresponding to ks, as consumer id from group group. If deadline is not zero, it blocks until deadline; otherwise it blocks forever. It then adds all tasks read from the streams to the sorted set at the corresponding waiting task key and acks them. Note that the task payload is used as the key in the sorted set. Finally, it adds all tasks from the sorted set whose execution time is at or before time.Now() to the corresponding ready task stream.
func InitMutex ¶ added in v3.8.4
InitMutex initializes the mutex scripts at r. InitMutex must be called before mutex functionality is used in a transaction or pipeline.
func InitTaskGroup ¶
InitTaskGroup initializes the task group for streams at InputTaskKey(k) and ReadyTaskKey(k). It must be called before all other task-related functions at subkeys of k.
func InputTaskKey ¶
InputTaskKey returns the subkey of k, where input tasks are stored.
func IsConsumerGroupExistsErr ¶
IsConsumerGroupExistsErr returns true if err represents the Redis BUSYGROUP error.
func Key ¶
Key constructs the full key for the entity identified by ks by joining ks using the default separator.
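As a rough illustration of key construction by joining parts, consider the sketch below. The helper joinKey and the separator ":" are assumptions made for this example; the package's actual default separator is not stated in this documentation.

```go
package main

import (
	"fmt"
	"strings"
)

// keySeparator is a hypothetical separator chosen for illustration only.
const keySeparator = ":"

// joinKey is a hypothetical stand-in for Key: it joins ks with keySeparator.
func joinKey(ks ...string) string {
	return strings.Join(ks, keySeparator)
}

func main() {
	fmt.Println(joinKey("ns", "devices", "uid"))
}
```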
func LockMutex ¶ added in v3.8.4
LockMutex locks the value stored at k with a mutex with identifier id. It stores the lock at LockKey(k) and the list at ListKey(k).
func LockedWatch ¶ added in v3.8.4
func LockedWatch(ctx context.Context, r WatchCmdable, k, id string, expiration time.Duration, f func(*redis.Tx) error) error
LockedWatch locks the key k with a mutex, watches key k and executes f in a transaction. k is unlocked after f returns.
func MarshalProto ¶
MarshalProto marshals pb into a printable string.
func NewContextWithPagination ¶
NewContextWithPagination instructs the store to paginate the results.
func PaginationLimitAndOffsetFromContext ¶
PaginationLimitAndOffsetFromContext returns the pagination limit and the offset if they are present.
func PopTask ¶
func PopTask(r redis.Cmdable, group, id string, timeout time.Duration, f func(k string, payload string, startAt time.Time) error, ks ...string) error
PopTask calls f on the most recent task in the queue whose timestamp is in the range [0, time.Now()]. If timeout is 0, PopTask blocks forever. If timeout is negative, PopTask does not block. If timeout is positive, PopTask blocks until either a task is popped or the timeout has passed. group is the consumer group name. id is the consumer ID within the group. ks are the keys to pop from. Tasks are acked if f returns without error.
func ReadyTaskKey ¶
ReadyTaskKey returns the subkey of k, where ready tasks are stored.
func SetPaginationTotal ¶
SetPaginationTotal sets the total number of results inside the paginated context, if it was not set already.
func SetProto ¶
func SetProto(r redis.Cmdable, k string, pb proto.Message, expiration time.Duration) (*redis.StatusCmd, error)
SetProto marshals the protocol buffer message represented by pb and stores it under key k in r. Note that SetProto passes k verbatim to the underlying store, so k must be the full key (including namespace etc.).
func UnlockMutex ¶ added in v3.8.4
UnlockMutex unlocks the key k with identifier id.
func UnmarshalProto ¶
UnmarshalProto unmarshals string returned from MarshalProto into pb.
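The round trip between MarshalProto and UnmarshalProto can be pictured as "binary bytes to printable string and back". The sketch below assumes base64 as the printable encoding and operates on raw bytes instead of a proto.Message; both are assumptions for illustration, since the encoding actually used by the package is not stated here. encodePrintable and decodePrintable are hypothetical names.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// encodePrintable turns arbitrary bytes (for example, a serialized message)
// into a printable string. Assumption: base64 is used only for illustration.
func encodePrintable(b []byte) string {
	return base64.StdEncoding.EncodeToString(b)
}

// decodePrintable reverses encodePrintable.
func decodePrintable(s string) ([]byte, error) {
	return base64.StdEncoding.DecodeString(s)
}

func main() {
	s := encodePrintable([]byte{0x00, 0xff, 0x10})
	b, err := decodePrintable(s)
	fmt.Println(s, b, err)
}
```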
func WaitingTaskKey ¶
WaitingTaskKey returns the subkey of k, where waiting tasks are stored.
Types ¶
type Client ¶
type Client struct {
	*redis.Client
	// contains filtered or unexported fields
}
Client represents a Redis store client.
type Config ¶
type Config struct {
	Address       string         `name:"address" description:"Address of the Redis server"`
	Password      string         `name:"password" description:"Password of the Redis server"`
	Database      int            `name:"database" description:"Redis database to use"`
	RootNamespace []string       `name:"namespace" description:"Namespace for Redis keys"`
	PoolSize      int            `name:"pool-size" description:"The maximum number of database connections"`
	Failover      FailoverConfig `name:"failover" description:"Redis failover configuration"`
	TLS           struct {
		Require          bool `name:"require" description:"Require TLS"`
		tlsconfig.Client `name:",squash"`
	} `name:"tls"`
	// contains filtered or unexported fields
}
Config represents Redis configuration.
func (Config) WithNamespace ¶
type FailoverConfig ¶
type FailoverConfig struct {
	Enable     bool     `name:"enable" description:"Enable failover using Redis Sentinel"`
	Addresses  []string `name:"addresses" description:"Redis Sentinel server addresses"`
	MasterName string   `name:"master-name" description:"Redis Sentinel master name"`
}
FailoverConfig represents Redis failover configuration.
type FindProtosOption ¶
type FindProtosOption func(redisSort)
FindProtosOption is an option for the FindProtos query.
func FindProtosSorted ¶
func FindProtosSorted(alpha bool) FindProtosOption
FindProtosSorted ensures that entries are sorted. If alpha is true, lexicographical sorting is used; otherwise, numerical sorting is used.
func FindProtosWithOffsetAndCount ¶
func FindProtosWithOffsetAndCount(offset, count int64) FindProtosOption
FindProtosWithOffsetAndCount changes the offset and the limit of the query.
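FindProtosOption follows the functional options pattern: each option is a function that adjusts a query before it runs. A minimal sketch of that pattern follows. The sortQuery struct, its fields and the helper names are hypothetical, since redisSort is unexported and its contents are not shown in this documentation; the sketch also passes a pointer so that options can mutate the query.

```go
package main

import "fmt"

// sortQuery is a hypothetical stand-in for the unexported redisSort type.
type sortQuery struct {
	sorted        bool
	alpha         bool
	offset, count int64
}

// option mirrors the shape of FindProtosOption.
type option func(*sortQuery)

// sortedOpt is a hypothetical analogue of FindProtosSorted.
func sortedOpt(alpha bool) option {
	return func(q *sortQuery) { q.sorted, q.alpha = true, alpha }
}

// offsetAndCount is a hypothetical analogue of FindProtosWithOffsetAndCount.
func offsetAndCount(offset, count int64) option {
	return func(q *sortQuery) { q.offset, q.count = offset, count }
}

// buildQuery applies the options in order to a zero-valued query.
func buildQuery(opts ...option) sortQuery {
	var q sortQuery
	for _, opt := range opts {
		opt(&q)
	}
	return q
}

func main() {
	fmt.Printf("%+v\n", buildQuery(sortedOpt(true), offsetAndCount(20, 10)))
}
```

Options are applied in the order given, so in this sketch a later option overrides an earlier one that sets the same field.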
type ProtoCmd ¶
type ProtoCmd struct {
// contains filtered or unexported fields
}
ProtoCmd is a command, which can unmarshal its result into a protocol buffer.
func FindProto ¶
FindProto finds the protocol buffer stored under the key stored under k. The external key is constructed using keyCmd.
type ProtosCmd ¶
type ProtosCmd stringSliceCmd
ProtosCmd is a command, which can unmarshal its result into multiple protocol buffers.
func FindProtos ¶
func FindProtos(r redis.Cmdable, k string, keyCmd func(string) string, opts ...FindProtosOption) ProtosCmd
FindProtos gets protos stored under keys in k.
func ListProtos ¶
ListProtos gets the list of protos stored under key k.
func (ProtosCmd) Range ¶
Range ranges over the command result and unmarshals each entry into a protocol buffer. f must return a new empty proto.Message of the type expected to be present in the command. The function returned by f is called after the command's result is unmarshaled into the message returned by f. If both the function returned by f and the message are nil, the entry is skipped.
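The factory-plus-callback contract described above can be sketched without Redis or protocol buffers. In this example JSON stands in for the proto encoding, rangeOver is a hypothetical helper, and the callback type func() error is an assumption, because the exact signature of Range is not shown in this documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// device is a hypothetical message type used only for this example.
type device struct {
	ID string `json:"id"`
}

// rangeOver is a hypothetical analogue of Range. For each result it asks f
// for a fresh message and a callback, decodes the result into the message,
// then invokes the callback. If both are nil, the entry is skipped.
func rangeOver(results []string, f func() (interface{}, func() error)) error {
	for _, s := range results {
		msg, cb := f()
		if msg == nil && cb == nil {
			continue
		}
		if msg != nil {
			if err := json.Unmarshal([]byte(s), msg); err != nil {
				return err
			}
		}
		if cb != nil {
			if err := cb(); err != nil {
				return err
			}
		}
	}
	return nil
}

func main() {
	var ids []string
	err := rangeOver([]string{`{"id":"a"}`, `{"id":"b"}`}, func() (interface{}, func() error) {
		d := &device{}
		return d, func() error {
			ids = append(ids, d.ID)
			return nil
		}
	})
	fmt.Println(ids, err)
}
```

Returning a fresh message from f on every call matters: reusing one message across entries would let a later entry overwrite data the callback has already captured by pointer.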
type ProtosWithKeysCmd ¶
type ProtosWithKeysCmd stringSliceCmd
ProtosWithKeysCmd is a command, which can unmarshal its result into multiple protocol buffers given a key.
func FindProtosWithKeys ¶
func FindProtosWithKeys(r redis.Cmdable, k string, keyCmd func(string) string, opts ...FindProtosOption) ProtosWithKeysCmd
FindProtosWithKeys gets protos stored under keys in k including the keys.
func (ProtosWithKeysCmd) Range ¶
Range ranges over the command result and unmarshals each entry into a protocol buffer. f must return a new empty proto.Message of the type expected to be present in the command given the key. The function returned by f is called after the command's result is unmarshaled into the message returned by f. If both the function returned by f and the message are nil, the entry is skipped.
type Scripter ¶
type Scripter interface {
	Eval(script string, keys []string, args ...interface{}) *redis.Cmd
	EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
	ScriptExists(hashes ...string) *redis.BoolSliceCmd
	ScriptLoad(script string) *redis.StringCmd
}
Scripter is redis.scripter.
type TaskQueue ¶
type TaskQueue struct {
	Redis     WatchCmdable
	MaxLen    int64
	Group, ID string
	Key       string
}
TaskQueue is a task queue.
func (*TaskQueue) Init ¶
Init initializes the task queue. It must be called at least once before using the queue.
type WatchCmdable ¶
WatchCmdable is a transactional redis.Cmdable.