cache

package
v1.0.0-beta.82 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 6, 2023 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultGroup    = "_tigris_group"
	DefaultConsumer = "_tigris_consumer"
)
View Source
const (
	// ConsumerGroupDefaultCurrentPos is for creating a consumer group that sets the position as current.
	ConsumerGroupDefaultCurrentPos = "$"
)

Variables

View Source
var (
	// ErrStreamAlreadyExists is returned when a stream already exists.
	ErrStreamAlreadyExists = NewCacheError(ErrCodeStreamExists, "stream already exists")
	// ErrStreamNotFound is returned when a stream does not exist.
	ErrStreamNotFound   = NewCacheError(ErrCodeStreamNotFound, "stream not found")
	ErrKeyNotFound      = NewCacheError(ErrCodeKeyNotFound, "key not found")
	ErrKeyAlreadyExists = NewCacheError(ErrCodeKeyAlreadyExists, "key already exists")
	ErrEmptyKey         = NewCacheError(ErrCodeEmptyKey, "key is empty")
)
View Source
var BlockReadGroupDuration = 180 * time.Second

Functions

func IsStreamAlreadyExists

func IsStreamAlreadyExists(err error) bool

func NewCacheError

func NewCacheError(code ErrCode, msg string, args ...any) error

Types

type Cache

type Cache interface {
	Set(ctx context.Context, tableName string, key string, value *internal.CacheData, options *SetOptions) error
	// GetSet is to get the previous value and set the new value
	GetSet(ctx context.Context, tableName string, key string, value *internal.CacheData) (*internal.CacheData, error)
	// Get the value of key
	Get(ctx context.Context, tableName string, key string, options *GetOptions) (*internal.CacheData, error)
	// Delete deletes one or more keys
	Delete(ctx context.Context, tableName string, keys ...string) (int64, error)
	// Exists returns if the key exists, for multiple keys it returns the count of the number of keys that exists
	Exists(ctx context.Context, tableName string, key ...string) (int64, error)
	Keys(ctx context.Context, tableName string, pattern string) ([]string, error)
	Scan(ctx context.Context, tableName string, cursor uint64, count int64, pattern string) ([]string, uint64)

	// CreateStream creates and returns a stream object, throws an error if stream already exists
	CreateStream(ctx context.Context, streamName string) (Stream, error)
	// CreateOrGetStream creates or returns an existing stream
	CreateOrGetStream(ctx context.Context, streamName string) (Stream, error)
	// GetStream only returns a stream if the stream exists in the Cache
	GetStream(ctx context.Context, streamName string) (Stream, error)
	// ListStreams returns the all the streams with the prefix
	ListStreams(ctx context.Context, streamNamePrefix string) ([]string, error)
	// DeleteStream to delete a stream if exists
	DeleteStream(ctx context.Context, streamName string) error
}

func NewCache

func NewCache(cfg *config.CacheConfig) Cache

type ErrCode

type ErrCode byte
const (
	ErrCodeStreamExists     ErrCode = 0x01
	ErrCodeStreamNotFound   ErrCode = 0x02
	ErrCodeKeyNotFound      ErrCode = 0x03
	ErrCodeKeyAlreadyExists ErrCode = 0x04
	ErrCodeEmptyKey         ErrCode = 0x05
)

type Error

type Error struct {
	// contains filtered or unexported fields
}

func (Error) Error

func (se Error) Error() string

type GetOptions

type GetOptions struct {
	// Expiry set the expiration time of the key
	Expiry    time.Duration
	GetDelete bool
}

type ReadGroupPos

type ReadGroupPos string
const (
	// ReadGroupPosStart is to let steam know that reads needs to happen since beginning.
	ReadGroupPosStart ReadGroupPos = "0-0"
	// ReadGroupPosCurrent is to let stream know that reads needs to happen from current.
	ReadGroupPosCurrent ReadGroupPos = ">"
)

type SetOptions

type SetOptions struct {
	// NX is SetIfNotExists i.e. only set the key if it does not already exist.
	NX bool
	// XX is SetIfExists i.e. only set the key if it already exists.
	XX bool
	// EX sets the Expiry time of the key in second
	EX uint64
	// PX sets the Expiry time of the key in millisecond
	PX uint64
}

type Stream

type Stream interface {
	Name() string
	// Add is to add streamData to a stream
	Add(ctx context.Context, value *internal.StreamData) (string, error)
	// Read data from the stream, returns data ID greater than position. To read from current use "$"
	Read(ctx context.Context, pos string) (*StreamMessages, bool, error)
	// ReadGroup is similar to Read but with support for reading from a group. We don't have multiple consumers in a
	// single group. Currently, it creates an internal _tigris_consumer.
	ReadGroup(ctx context.Context, group string, pos ReadGroupPos) (*StreamMessages, bool, error)
	// CreateConsumerGroup creates a consumer group and attach it to the stream. The pos is used to specify the position
	// for this consumer group.
	CreateConsumerGroup(ctx context.Context, group string, pos string) error
	// RemoveConsumerGroup removes consumer group from this stream.
	RemoveConsumerGroup(ctx context.Context, group string) error
	// GetConsumerGroups returns all the consumer groups attached to this stream
	GetConsumerGroups(ctx context.Context) ([]xredis.XInfoGroup, error)
	// GetConsumerGroup returns only information about the consumer group passed in the API.
	GetConsumerGroup(ctx context.Context, group string) (*xredis.XInfoGroup, bool, error)
	// SetID is used to set the position of the group again.
	SetID(ctx context.Context, group string, pos string) error
	// Ack is to acknowledge messages once they are read by the consumer group. This is required to be called in case
	// ReadGroup is used so that messages doesn't end up in pending entries list.
	Ack(ctx context.Context, group string, ids ...string) error
	// Delete is to delete this stream. it removes all the associated consumer group as well.
	Delete(ctx context.Context) error
}

func NewStream

func NewStream(cache *cache, streamName string) Stream

NewStream creates a stream object 'streamName' is the stream name that is already prefixed with tenant and project id already prepended.

type StreamMessages

type StreamMessages struct {
	xredis.XStream
}

func (*StreamMessages) Decode

func (*StreamMessages) Decode(message xredis.XMessage) (*internal.StreamData, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL