redis

package
v3.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 6, 2020 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package redis provides a general Redis client and utilities.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddTask

func AddTask(r redis.Cmdable, k string, maxLen int64, payload string, startAt time.Time, replace bool) error

AddTask adds a task identified by payload with timestamp startAt to the stream at InputTaskKey(k). maxLen is the approximate length of the stream, to which it may be trimmed.

func ConvertError

func ConvertError(err error) error

ConvertError converts Redis error into errors.Error.

func DeduplicateProtos

func DeduplicateProtos(ctx context.Context, r Scripter, k string, window time.Duration, msgs ...proto.Message) (bool, error)

DeduplicateProtos deduplicates protos using key k. It stores a lock at DeduplicationLockKey(k) and the list of collected protos at DeduplicationListKey(k).

func DeduplicationListKey

func DeduplicationListKey(k string) string

DeduplicationLockKey returns the key deduplication list for k is stored under.

func DeduplicationLockKey

func DeduplicationLockKey(k string) string

DeduplicationLockKey returns the key deduplication lock for k is stored under.

func DispatchTasks

func DispatchTasks(r WatchCmdable, group, id string, maxLen int64, deadline time.Time, ks ...string) (time.Time, error)

DispatchTasks dispatches ready-to-execute tasks from input task streams and waiting task sets to ready task streams. It first attempts to read at most maxLen tasks from streams at input task keys corresponding to ks as a consumer id from group group. It blocks until deadline, if it is not zero, otherwise it blocks forever. It then adds all the tasks read from the stream to the sorted set at corresponding waiting task key and acks them. Note that task payload is used as the key in the sorted set. It then proceeds to add all the tasks from the sorted set, for which execution time is at or before time.Now() to corresponding ready task stream.

func InitTaskGroup

func InitTaskGroup(r redis.Cmdable, group, k string) error

InitTaskGroup initializes the task group for streams at InputTaskKey(k) and ReadyTaskKey(k). It must be called before all other task-related functions at subkeys of k.

func InputTaskKey

func InputTaskKey(k string) string

InputTaskKey returns the subkey of k, where input tasks are stored.

func IsConsumerGroupExistsErr

func IsConsumerGroupExistsErr(err error) bool

IsConsumerGroupExistsErr returns true if error represents the redis BUSYGROUP error.

func Key

func Key(ks ...string) string

Key constructs the full key for entity identified by ks by joining ks using the default separator.

func MarshalProto

func MarshalProto(pb proto.Message) (string, error)

MarshalProto marshals pb into printable string.

func NewContextWithPagination

func NewContextWithPagination(ctx context.Context, limit, page int64, total *int64) context.Context

NewContextWithPagination instructs the store to paginate the results.

func PaginationLimitAndOffsetFromContext

func PaginationLimitAndOffsetFromContext(ctx context.Context) (limit, offset int64)

PaginationLimitAndOffsetFromContext returns the pagination limit and the offset if they are present.

func PopTask

func PopTask(r redis.Cmdable, group, id string, timeout time.Duration, f func(k string, payload string, startAt time.Time) error, ks ...string) error

PopTask calls f on the most recent task in the queue, for which timestamp is in range [0, time.Now()] If timeout value is 0 - PopTask blocks forever If timeout value is negative - PopTask does not block If timeout value is positive - PopTask blocks until either a task is popped or timeout has passed. group is the consumer group name. id is the consumer group ID. ks are the keys to pop from. Tasks are acked if f returns without error.

func ReadyTaskKey

func ReadyTaskKey(k string) string

ReadyTaskKey returns the subkey of k, where ready tasks are stored.

func SetPaginationTotal

func SetPaginationTotal(ctx context.Context, total int64)

SetPaginationTotal sets the total number of results inside the paginated context, if it was not set already.

func SetProto

func SetProto(r redis.Cmdable, k string, pb proto.Message, expiration time.Duration) (*redis.StatusCmd, error)

SetProto marshals protocol buffer message represented by pb and stores it under key k in r. Note, that SetProto passes k verbatim to the underlying store and hence, k must represent the full key(including namespace etc.).

func UnmarshalProto

func UnmarshalProto(s string, pb proto.Message) error

UnmarshalProto unmarshals string returned from MarshalProto into pb.

func WaitingTaskKey

func WaitingTaskKey(k string) string

WaitingTaskKey returns the subkey of k, where waiting tasks are stored.

Types

type Client

type Client struct {
	*redis.Client
	// contains filtered or unexported fields
}

Client represents a Redis store client.

func New

func New(conf *Config) *Client

New returns a new initialized Redis store.

func (*Client) Key

func (cl *Client) Key(ks ...string) string

Key constructs the full key for entity identified by ks by prepending the configured namespace and joining ks using the default separator.

type Config

type Config struct {
	Address       string         `name:"address" description:"Address of the Redis server"`
	Password      string         `name:"password" description:"Password of the Redis server"`
	Database      int            `name:"database" description:"Redis database to use"`
	RootNamespace []string       `name:"namespace" description:"Namespace for Redis keys"`
	PoolSize      int            `name:"pool-size" description:"The maximum number of database connections"`
	Failover      FailoverConfig `name:"failover" description:"Redis failover configuration"`
	// contains filtered or unexported fields
}

Config represents Redis configuration.

func (Config) IsZero

func (c Config) IsZero() bool

IsZero returns whether the Redis configuration is empty.

func (Config) WithNamespace

func (c Config) WithNamespace(namespace ...string) *Config

type FailoverConfig

type FailoverConfig struct {
	Enable     bool     `name:"enable" description:"Enable failover using Redis Sentinel"`
	Addresses  []string `name:"addresses" description:"Redis Sentinel server addresses"`
	MasterName string   `name:"master-name" description:"Redis Sentinel master name"`
}

FailoverConfig represents Redis failover configuration.

type FindProtosOption

type FindProtosOption func(redisSort)

FindProtosOption is an option for the FindProtos query.

func FindProtosSorted

func FindProtosSorted(alpha bool) FindProtosOption

FindProtosSorted ensures that entries are sorted. If alpha is true, lexicographical sorting is used, otherwise - numerical.

func FindProtosWithOffsetAndCount

func FindProtosWithOffsetAndCount(offset, count int64) FindProtosOption

FindProtosWithOffsetAndCount changes the offset and the limit of the query.

type ProtoCmd

type ProtoCmd struct {
	// contains filtered or unexported fields
}

ProtoCmd is a command, which can unmarshal its result into a protocol buffer.

func FindProto

func FindProto(r WatchCmdable, k string, keyCmd func(string) (string, error)) *ProtoCmd

FindProto finds the protocol buffer stored under the key stored under k. The external key is constructed using keyCmd.

func GetProto

func GetProto(r redis.Cmdable, k string) *ProtoCmd

GetProto unmarshals protocol buffer message stored under key k in r into pb. Note, that GetProto passes k verbatim to the underlying store and hence, k must represent the full key(including namespace etc.).

func (ProtoCmd) ScanProto

func (cmd ProtoCmd) ScanProto(pb proto.Message) error

ScanProto scans command result into proto.Message pb.

type ProtosCmd

type ProtosCmd stringSliceCmd

ProtosCmd is a command, which can unmarshal its result into multiple protocol buffers.

func FindProtos

func FindProtos(r redis.Cmdable, k string, keyCmd func(string) string, opts ...FindProtosOption) ProtosCmd

FindProtos gets protos stored under keys in k.

func ListProtos

func ListProtos(ctx context.Context, r redis.Cmdable, k string) ProtosCmd

ListProtos gets list of protos stored under key k.

func (ProtosCmd) Range

func (cmd ProtosCmd) Range(f func() (proto.Message, func() (bool, error))) error

Range ranges over command result and unmarshals it into a protocol buffer. f must return a new empty proto.Message of the type expected to be present in the command. The function returned by f will be called after the commands result is unmarshaled into the message returned by f. If both the function returned by f and the message are nil, the entry is skipped.

type ProtosWithKeysCmd

type ProtosWithKeysCmd stringSliceCmd

ProtosWithKeysCmd is a command, which can unmarshal its result into multiple protocol buffers given a key.

func FindProtosWithKeys

func FindProtosWithKeys(r redis.Cmdable, k string, keyCmd func(string) string, opts ...FindProtosOption) ProtosWithKeysCmd

FindProtosWithKeys gets protos stored under keys in k including the keys.

func (ProtosWithKeysCmd) Range

func (cmd ProtosWithKeysCmd) Range(f func(string) (proto.Message, func() (bool, error))) error

Range ranges over command result and unmarshals it into a protocol buffer. f must return a new empty proto.Message of the type expected to be present in the command given the key. The function returned by f will be called after the commands result is unmarshaled into the message returned by f. If both the function returned by f and the message are nil, the entry is skipped.

type Scripter

type Scripter interface {
	Eval(script string, keys []string, args ...interface{}) *redis.Cmd
	EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
	ScriptExists(hashes ...string) *redis.BoolSliceCmd
	ScriptLoad(script string) *redis.StringCmd
}

Scripter is redis.scripter.

type TaskQueue

type TaskQueue struct {
	Redis     WatchCmdable
	MaxLen    int64
	Group, ID string
	Key       string
}

TaskQueue is a task queue.

func (*TaskQueue) Add

func (q *TaskQueue) Add(s string, startAt time.Time, replace bool) error

Add adds a task s to the queue with a timestamp startAt.

func (*TaskQueue) Init

func (q *TaskQueue) Init() error

Init initializes the task queue. It must be called at least once before using the queue.

func (*TaskQueue) Pop

func (q *TaskQueue) Pop(ctx context.Context, f func(string, time.Time) error) error

Pop calls f on the most recent task in the queue, for which timestamp is in range [0, time.Now()], if such is available, otherwise it blocks until it is. If ctx.Deadline() is present, Pop will return at or shortly after it.

func (*TaskQueue) Run

func (q *TaskQueue) Run(ctx context.Context) error

Run dispatches tasks until ctx.Deadline() is reached(if present) or read on ctx.Done() succeeds.

type WatchCmdable

type WatchCmdable interface {
	redis.Cmdable
	Watch(fn func(*redis.Tx) error, keys ...string) error
}

WatchCmdable is transactional redis.Cmdable.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL