redis

Version: v3.32.0
Published: Sep 5, 2024
License: Apache-2.0
Imports: 22
Imported by: 0

Documentation

Overview

Package redis provides a general Redis client and utilities.

Constants

const DefaultRangeCount = 1024

DefaultRangeCount is the default number of elements to be returned by a SCAN-family operation.

const (

	// DefaultStreamBlockLimit is the duration for which stream blocking operations
	// such as XRead and XReadGroup should block. Note that Redis operations cannot be
	// asynchronously cancelled using context.WithCancel, so long-polling is required.
	DefaultStreamBlockLimit time.Duration = 0
)

Variables

This section is empty.

Functions

func ConvertError

func ConvertError(err error) error

ConvertError converts a Redis error into an errors.Error.

func DeduplicateProtos

func DeduplicateProtos(
	ctx context.Context, r redis.Scripter, k string, window time.Duration, limit int, msgs ...proto.Message,
) (bool, error)

DeduplicateProtos deduplicates protos using key k. It stores a lock at LockKey(k) and the list of collected protos at ListKey(k). If the number of protos exceeds limit, the messages are trimmed from the start of the list.

func EntityRegex added in v3.16.0

func EntityRegex(key string) (*regexp.Regexp, error)

EntityRegex returns a regex that must match a given redis key format.

func GenerateLockerID added in v3.15.2

func GenerateLockerID() (string, error)

GenerateLockerID generates a unique locker ID to be used with a Redis mutex.

func InitMutex added in v3.8.4

func InitMutex(ctx context.Context, r redis.Scripter) error

InitMutex initializes the mutex scripts at r. InitMutex must be called before mutex functionality is used in a transaction or pipeline.

func InputTaskKey

func InputTaskKey(k string) string

InputTaskKey returns the subkey of k, where input tasks are stored.

func IsConsumerGroupExistsErr

func IsConsumerGroupExistsErr(err error) bool

IsConsumerGroupExistsErr returns true if err represents the Redis BUSYGROUP error.
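
Redis replies with an error whose text starts with BUSYGROUP when a consumer group that already exists is created again, and callers usually treat that reply as success. A minimal sketch of that idiom on the raw error text follows. The names redisErr, isBusyGroup and ensureGroup are hypothetical, and how this package detects the error internally is not documented here.

```go
package main

import (
	"fmt"
	"strings"
)

// redisErr is a hypothetical error type carrying the raw Redis reply text.
type redisErr string

func (e redisErr) Error() string { return string(e) }

// isBusyGroup reports whether err carries the BUSYGROUP reply prefix.
func isBusyGroup(err error) bool {
	return err != nil && strings.HasPrefix(err.Error(), "BUSYGROUP")
}

// ensureGroup runs create and ignores the error if the group already exists.
func ensureGroup(create func() error) error {
	if err := create(); err != nil && !isBusyGroup(err) {
		return err
	}
	return nil
}

func main() {
	exists := func() error { return redisErr("BUSYGROUP Consumer Group name already exists") }
	broken := func() error { return redisErr("ERR no such key") }
	fmt.Println(ensureGroup(exists), ensureGroup(broken))
}
```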

func Key

func Key(ks ...string) string

Key constructs the full key for the entity identified by ks by joining ks using the default separator.
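
As an illustration of the joining rule, here is a minimal sketch. The separator value ":" is an assumption (the documentation only says "the default separator"), and key and namespacedKey are hypothetical helpers; namespacedKey mirrors the namespace-prepending behaviour documented for (*Client).Key.

```go
package main

import (
	"fmt"
	"strings"
)

// separator is an assumption; the package only documents "the default separator".
const separator = ":"

// key joins the key parts with the separator.
func key(ks ...string) string {
	return strings.Join(ks, separator)
}

// namespacedKey prepends the namespace parts before joining.
func namespacedKey(namespace []string, ks ...string) string {
	all := make([]string, 0, len(namespace)+len(ks))
	all = append(all, namespace...)
	all = append(all, ks...)
	return key(all...)
}

func main() {
	fmt.Println(key("devices", "dev1"))
	fmt.Println(namespacedKey([]string{"ttn", "v3"}, "devices", "dev1"))
}
```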

func ListKey added in v3.8.4

func ListKey(k string) string

ListKey returns the key under which the list for k is stored.

func LockKey added in v3.8.4

func LockKey(k string) string

LockKey returns the key under which the lock for k is stored.

func LockMutex added in v3.8.4

func LockMutex(ctx context.Context, r redis.Cmdable, k, id string, expiration time.Duration) error

LockMutex locks the value stored at k with a mutex identified by id. It stores the lock at LockKey(k) and the list at ListKey(k).

func LockedWatch added in v3.8.4

func LockedWatch(ctx context.Context, r WatchCmdable, k, id string, expiration time.Duration, f func(*redis.Tx) error) error

LockedWatch locks the key k with a mutex, watches key k and executes f in a transaction. k is unlocked after f returns.

func MarshalProto

func MarshalProto(pb proto.Message) (string, error)

MarshalProto marshals pb into a printable string.

func NewContextWithPagination

func NewContextWithPagination(ctx context.Context, limit, page int64, total *int64) context.Context

NewContextWithPagination instructs the store to paginate the results.

func PaginationLimitAndOffsetFromContext

func PaginationLimitAndOffsetFromContext(ctx context.Context) (limit, offset int64)

PaginationLimitAndOffsetFromContext returns the pagination limit and the offset if they are present.

func RangeRedisHMap added in v3.15.2

func RangeRedisHMap(ctx context.Context, r redis.Cmdable, scanKey, match string, count int64, f func(k string, v string) (bool, error)) error

func RangeRedisKeys added in v3.15.2

func RangeRedisKeys(ctx context.Context, r redis.Cmdable, match string, count int64, f func(k string) (bool, error)) error

func RangeRedisSet added in v3.15.2

func RangeRedisSet(ctx context.Context, r redis.Cmdable, scanKey, match string, count int64, f func(v string) (bool, error)) error

func RangeRedisZSet added in v3.15.2

func RangeRedisZSet(ctx context.Context, r redis.Cmdable, scanKey, match string, count int64, f func(k string, v float64) (bool, error)) error
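
The RangeRedis* functions above carry no description. Their signatures suggest a cursor-based iteration over a SCAN-family command, with count as the per-call hint and f as the per-element callback. The sketch below shows that general technique against an in-memory slice. scanPage and rangeKeys are hypothetical, and the assumption that f returning false stops the iteration is not confirmed by the documentation.

```go
package main

import "fmt"

// scanPage is a hypothetical in-memory stand-in for one SCAN call. It returns
// up to count keys starting at cursor and the next cursor, which is 0 once
// the iteration is complete (the convention Redis SCAN uses).
func scanPage(keys []string, cursor uint64, count int64) ([]string, uint64) {
	start := int(cursor)
	end := start + int(count)
	if end >= len(keys) {
		return keys[start:], 0
	}
	return keys[start:end], uint64(end)
}

// rangeKeys calls f for every key, page by page, until the cursor returns to
// 0, f returns false (assumed to mean "stop"), or f returns an error.
func rangeKeys(keys []string, count int64, f func(k string) (bool, error)) error {
	var cursor uint64
	for {
		page, next := scanPage(keys, cursor, count)
		for _, k := range page {
			ok, err := f(k)
			if err != nil {
				return err
			}
			if !ok {
				return nil
			}
		}
		if next == 0 {
			return nil
		}
		cursor = next
	}
}

func main() {
	keys := []string{"a", "b", "c", "d", "e"}
	err := rangeKeys(keys, 2, func(k string) (bool, error) {
		fmt.Println(k)
		return k != "d", nil
	})
	fmt.Println(err)
}
```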

func RangeStreams added in v3.11.2

func RangeStreams(
	ctx context.Context,
	r redis.Cmdable,
	group, id string,
	count int64,
	minIdle time.Duration,
	f func(string, func(...string) error, ...redis.XMessage) error,
	streams ...string,
) error

RangeStreams sequentially iterates over all non-acknowledged messages in streams, calling f with at most count messages at a time. f must acknowledge the messages that have been processed. RangeStreams assumes that, within its lifetime, it is the only consumer in group group using ID id. It first iterates over all pending messages that have been idle for at least minIdle.

func ReadyTaskKey

func ReadyTaskKey(k string) string

ReadyTaskKey returns the subkey of k, where ready tasks are stored.

func SetPaginationTotal

func SetPaginationTotal(ctx context.Context, total int64)

SetPaginationTotal sets the total number of results inside the paginated context, if it was not set already.

func SetProto

func SetProto(ctx context.Context, r redis.Cmdable, k string, pb proto.Message, expiration time.Duration) (*redis.StatusCmd, error)

SetProto marshals the protocol buffer message pb and stores it under key k in r. Note that SetProto passes k verbatim to the underlying store, so k must be the full key (including the namespace etc.).

func UnlockMutex added in v3.8.4

func UnlockMutex(ctx context.Context, r redis.Scripter, k, id string, expiration time.Duration) error

UnlockMutex unlocks the key k with identifier id.

func UnmarshalProto

func UnmarshalProto(s string, pb proto.Message) error

UnmarshalProto unmarshals the string returned by MarshalProto into pb.
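
A "printable string" form of binary data is commonly obtained with a text encoding such as base64. The sketch below shows the round trip on raw bytes standing in for a marshaled proto. Both the choice of unpadded base64 and the helper names marshalPrintable and unmarshalPrintable are assumptions; the documentation does not state which encoding MarshalProto uses.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// marshalPrintable encodes binary data as a printable string.
// Unpadded standard base64 is an assumption made for this sketch.
func marshalPrintable(b []byte) string {
	return base64.RawStdEncoding.EncodeToString(b)
}

// unmarshalPrintable reverses marshalPrintable.
func unmarshalPrintable(s string) ([]byte, error) {
	return base64.RawStdEncoding.DecodeString(s)
}

func main() {
	s := marshalPrintable([]byte{0x00, 0xff, 0x0a})
	b, err := unmarshalPrintable(s)
	fmt.Println(s, b, err)
}
```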

func WaitingTaskKey

func WaitingTaskKey(k string) string

WaitingTaskKey returns the subkey of k, where waiting tasks are stored.

Types

type Client

type Client struct {
	*redis.Client
	// contains filtered or unexported fields
}

Client represents a Redis store client.

func New

func New(conf *Config) *Client

New returns a new initialized Redis store.

func (*Client) Key

func (cl *Client) Key(ks ...string) string

Key constructs the full key for the entity identified by ks by prepending the configured namespace and joining ks using the default separator.

type Config

type Config struct {
	Address         string         `name:"address" description:"Address of the Redis server"`
	Password        string         `name:"password" description:"Password of the Redis server"`
	Database        int            `name:"database" description:"Redis database to use"`
	RootNamespace   []string       `name:"namespace" description:"Namespace for Redis keys"`
	PoolSize        int            `name:"pool-size" description:"The maximum number of database connections"`
	IdleTimeout     time.Duration  `name:"idle-timeout" description:"Idle connection timeout"`
	ConnMaxLifetime time.Duration  `name:"conn-max-lifetime" description:"Maximum lifetime of a connection"`
	Failover        FailoverConfig `name:"failover" description:"Redis failover configuration"`
	TLS             struct {
		Require          bool `name:"require" description:"Require TLS"`
		tlsconfig.Client `name:",squash"`
	} `name:"tls"`
	// contains filtered or unexported fields
}

Config represents Redis configuration.

func (Config) Equals added in v3.17.0

func (c Config) Equals(other Config) bool

Equals checks if the other configuration is equivalent to this.

func (Config) IsZero

func (c Config) IsZero() bool

IsZero returns whether the Redis configuration is empty.

func (Config) WithNamespace

func (c Config) WithNamespace(namespace ...string) *Config

type FailoverConfig

type FailoverConfig struct {
	Enable     bool     `name:"enable" description:"Enable failover using Redis Sentinel"`
	Addresses  []string `name:"addresses" description:"Redis Sentinel server addresses"`
	MasterName string   `name:"master-name" description:"Redis Sentinel master name"`
}

FailoverConfig represents Redis failover configuration.

func (FailoverConfig) Equals added in v3.17.0

func (c FailoverConfig) Equals(other FailoverConfig) bool

Equals checks if the other configuration is equivalent to this.

type FindProtosOption

type FindProtosOption func(redisSort)

FindProtosOption is an option for the FindProtos query.

func FindProtosSorted

func FindProtosSorted(alpha bool) FindProtosOption

FindProtosSorted ensures that entries are sorted. If alpha is true, lexicographical sorting is used; otherwise, numerical sorting is used.

func FindProtosWithOffsetAndCount

func FindProtosWithOffsetAndCount(offset, count int64) FindProtosOption

FindProtosWithOffsetAndCount changes the offset and the limit of the query.
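
FindProtosOption follows the functional options pattern: each constructor returns a function that adjusts the query arguments, and the query applies them in order. A minimal sketch of the pattern is shown below. The sortArgs struct is a hypothetical stand-in for the unexported redisSort type, whose real shape is not documented, and the function names are illustrative only.

```go
package main

import "fmt"

// sortArgs is a hypothetical stand-in for the unexported redisSort type.
type sortArgs struct {
	sorted        bool
	alpha         bool
	offset, count int64
}

// option adjusts the query arguments.
type option func(*sortArgs)

// sorted requests sorting, lexicographical if alpha is true.
func sorted(alpha bool) option {
	return func(s *sortArgs) {
		s.sorted = true
		s.alpha = alpha
	}
}

// withOffsetAndCount sets the offset and the limit of the query.
func withOffsetAndCount(offset, count int64) option {
	return func(s *sortArgs) {
		s.offset = offset
		s.count = count
	}
}

// build applies the options in order; later options override earlier ones.
func build(opts ...option) sortArgs {
	var s sortArgs
	for _, opt := range opts {
		opt(&s)
	}
	return s
}

func main() {
	fmt.Printf("%+v\n", build(sorted(true), withOffsetAndCount(10, 5)))
}
```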

type ProtoCmd

type ProtoCmd struct {
	// contains filtered or unexported fields
}

ProtoCmd is a command, which can unmarshal its result into a protocol buffer.

func FindProto

func FindProto(ctx context.Context, r WatchCmdable, k string, keyCmd func(string) (string, error)) *ProtoCmd

FindProto finds the protocol buffer stored under the key that is itself stored under k. The external key is constructed using keyCmd.

func GetProto

func GetProto(ctx context.Context, r redis.Cmdable, k string) *ProtoCmd

GetProto retrieves the protocol buffer message stored under key k in r; the result can be unmarshaled into a message with ScanProto. Note that GetProto passes k verbatim to the underlying store, so k must be the full key (including the namespace etc.).

func (ProtoCmd) ScanProto

func (cmd ProtoCmd) ScanProto(pb proto.Message) error

ScanProto scans command result into proto.Message pb.

type ProtosCmd

type ProtosCmd stringSliceCmd

ProtosCmd is a command, which can unmarshal its result into multiple protocol buffers.

func FindProtos

func FindProtos(ctx context.Context, r redis.Cmdable, k string, keyCmd func(string) string, opts ...FindProtosOption) ProtosCmd

FindProtos gets protos stored under keys in k.

func ListProtos

func ListProtos(ctx context.Context, r redis.Cmdable, k string) ProtosCmd

ListProtos gets list of protos stored under key k.

func (ProtosCmd) Range

func (cmd ProtosCmd) Range(f func() (proto.Message, func() (bool, error))) error

Range ranges over the command result and unmarshals each entry into a protocol buffer. f must return a new empty proto.Message of the type expected to be present in the command. The function returned by f is called after the command's result is unmarshaled into the message returned by f. If both the function returned by f and the message are nil, the entry is skipped.
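
The callback contract of Range can be illustrated with a small stand-alone model. In this sketch JSON stands in for the protocol buffer encoding, and device and rangeEntries are hypothetical. It also assumes that the bool returned by the inner function means "continue iterating", which the documentation does not state.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// device is a hypothetical message type; JSON stands in for protobuf here.
type device struct {
	ID string `json:"id"`
}

// rangeEntries asks f for a fresh message and a follow-up function for every
// entry, decodes the entry into the message, then calls the follow-up.
func rangeEntries(entries []string, f func() (any, func() (bool, error))) error {
	for _, e := range entries {
		msg, after := f()
		if msg == nil && after == nil {
			continue // both nil: the entry is skipped
		}
		if msg != nil {
			if err := json.Unmarshal([]byte(e), msg); err != nil {
				return err
			}
		}
		if after == nil {
			continue
		}
		ok, err := after()
		if err != nil {
			return err
		}
		if !ok {
			return nil // assumed: false stops the iteration
		}
	}
	return nil
}

func main() {
	var ids []string
	err := rangeEntries([]string{`{"id":"a"}`, `{"id":"b"}`}, func() (any, func() (bool, error)) {
		d := &device{}
		return d, func() (bool, error) {
			ids = append(ids, d.ID)
			return true, nil
		}
	})
	fmt.Println(ids, err)
}
```

Returning a fresh message per entry lets the closure capture it, so the follow-up function sees the decoded value without extra parameters.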

type ProtosWithKeysCmd

type ProtosWithKeysCmd stringSliceCmd

ProtosWithKeysCmd is a command, which can unmarshal its result into multiple protocol buffers given a key.

func FindProtosWithKeys

func FindProtosWithKeys(ctx context.Context, r redis.Cmdable, k string, keyCmd func(string) string, opts ...FindProtosOption) ProtosWithKeysCmd

FindProtosWithKeys gets protos stored under keys in k including the keys.

func (ProtosWithKeysCmd) Range

func (cmd ProtosWithKeysCmd) Range(f func(string) (proto.Message, func() (bool, error))) error

Range ranges over the command result and unmarshals each entry into a protocol buffer. Given the key, f must return a new empty proto.Message of the type expected to be present in the command. The function returned by f is called after the command's result is unmarshaled into the message returned by f. If both the function returned by f and the message are nil, the entry is skipped.

type TaskQueue

type TaskQueue struct {
	Redis            WatchCmdable
	MaxLen           int64
	Group            string
	Key              string
	StreamBlockLimit time.Duration
	// contains filtered or unexported fields
}

TaskQueue is a task queue.

func (*TaskQueue) Add

func (q *TaskQueue) Add(ctx context.Context, r redis.Cmdable, s string, startAt time.Time, replace bool) error

Add adds a task s to the queue with a timestamp startAt.

func (*TaskQueue) Close added in v3.11.0

func (q *TaskQueue) Close(ctx context.Context) error

Close closes the TaskQueue.

func (*TaskQueue) Dispatch added in v3.18.2

func (q *TaskQueue) Dispatch(ctx context.Context, consumerID string, r redis.Cmdable) error

Dispatch dispatches the tasks of the queue. It will continue to run until the context is done. consumerID is used to identify the consumer and should be unique for all concurrent calls to Dispatch.

func (*TaskQueue) Init

func (q *TaskQueue) Init(ctx context.Context) error

Init initializes the task queue. It must be called at least once before using the queue.

func (*TaskQueue) Pop

func (q *TaskQueue) Pop(ctx context.Context, consumerID string, r redis.Cmdable, f func(redis.Pipeliner, string, time.Time) error) error

Pop calls f on the most recent task in the queue whose timestamp is in the range [0, time.Now()], if one is available; otherwise it blocks until one is available or the context is done. The pipeline is executed even if f returns an error. consumerID is used to identify the consumer and should be unique for all concurrent calls to Pop.

type WatchCmdable

type WatchCmdable interface {
	redis.Cmdable
	Watch(ctx context.Context, fn func(*redis.Tx) error, keys ...string) error
}

WatchCmdable is a transactional redis.Cmdable.

Directories

Path Synopsis
Package codec provides a codec, which encodes and decodes protocol buffers stored in Redis to/from JSON.
