cache

package
v0.0.27 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 5, 2024 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
// Values of Type, the cacheType argument accepted by CreateCache.
const (
	REDIS  Type = "REDIS"
	MEMORY Type = "MEMORY"
)

// Values of LockType, the lockType argument accepted by LockMessage and
// UnlockMessages.
const (
	LOCK_ACK  LockType = "lock_ack"
	LOCK_NACK LockType = "lock_nack"
)

// Untyped string constants related to recovery.
// NOTE(review): presumably the *_KEY constants are keys used with Cache.Get/Set
// and RECOVERY_RUNNING / RECOVERY_FINISHED are the stored state values —
// confirm against callers.
const (
	RECOVERY_RUNNING                = "recovery"
	RECOVERY_FINISHED               = "recovery_finished"
	RECOVERY_STORAGE_BREAKPOINT_KEY = "recovery_storage_breakpoint"
	RECOVERY_BREAKPOINT_KEY         = "recovery_breakpoint"
)
View Source
// Key fragments. Each lock suffix is ":" followed by the corresponding
// LockType value, and each score suffix is the matching lock suffix followed
// by ":score".
const (
	POOL_PREFIX            = "deckard:queue:"
	PROCESSING_POOL_SUFFIX = ":tmp"

	LOCK_ACK_SUFFIX       = ":" + string(LOCK_ACK)
	LOCK_ACK_SCORE_SUFFIX = LOCK_ACK_SUFFIX + ":score"

	LOCK_NACK_SUFFIX       = ":" + string(LOCK_NACK)
	LOCK_NACK_SCORE_SUFFIX = LOCK_NACK_SUFFIX + ":score"
)

Variables

View Source
// DefaultTimeoutMs is five minutes expressed in milliseconds (300000).
var DefaultTimeoutMs = (5 * time.Minute).Milliseconds()
View Source
// LOCK_ACK_POOL_REGEX matches text that ends with LOCK_ACK_SUFFIX and has at
// least one character before it; group 1 captures the characters before the
// suffix.
var LOCK_ACK_POOL_REGEX = regexp.MustCompile(`(.+)` + LOCK_ACK_SUFFIX + `$`)
View Source
// LOCK_NACK_POOL_REGEX matches text that ends with LOCK_NACK_SUFFIX and has at
// least one character before it; group 1 captures the characters before the
// suffix.
var LOCK_NACK_POOL_REGEX = regexp.MustCompile(`(.+)` + LOCK_NACK_SUFFIX + `$`)
View Source
// PROCESSING_POOL_REGEX matches text that ends with PROCESSING_POOL_SUFFIX and
// has at least one character before it; group 1 captures the characters before
// the suffix.
var PROCESSING_POOL_REGEX = regexp.MustCompile(`(.+)` + PROCESSING_POOL_SUFFIX + `$`)

Functions

This section is empty.

Types

type Cache

// Cache is the set of queue-cache operations; MemoryCache and RedisCache both
// declare every method listed here.
type Cache interface {
	// MakeAvailable takes a message and reports a boolean result and an error.
	// NOTE(review): presumably moves the message back to the pool it can be
	// pulled from, with the bool signalling whether anything changed — confirm.
	MakeAvailable(ctx context.Context, message *message.Message) (bool, error)
	// IsProcessing reports a boolean for the message id in the given queue.
	// NOTE(review): presumably true while the message sits in the processing
	// pool — confirm.
	IsProcessing(ctx context.Context, queue string, id string) (bool, error)
	// PullMessages returns up to n message ids from queue. minScore and maxScore
	// are optional (pointer) bounds.
	// NOTE(review): presumably a nil bound means "unbounded" and ackDeadlineMs
	// is the processing deadline in milliseconds — confirm.
	PullMessages(ctx context.Context, queue string, n int64, minScore *float64, maxScore *float64, ackDeadlineMs int64) (ids []string, err error)
	// TimeoutMessages returns message ids for the given queue.
	// NOTE(review): presumably the ids whose processing deadline expired — confirm.
	TimeoutMessages(ctx context.Context, queue string) (ids []string, err error)

	// LockMessage locks a message for message.LockMs milliseconds.
	LockMessage(ctx context.Context, message *message.Message, lockType LockType) (bool, error)

	// UnlockMessages unlocks all messages from a queue for the given lock type.
	UnlockMessages(ctx context.Context, queue string, lockType LockType) (messages []string, err error)

	// ListQueues lists all queues from a pool using a pattern search.
	// Only glob-style patterns are supported.
	ListQueues(ctx context.Context, pattern string, poolType pool.PoolType) (queues []string, err error)

	// Insert adds 1..n elements to the cache. Elements already in the cache
	// should have their score updated.
	// NOTE(review): the earlier comment said this returns "the number of new
	// elements", but the signature returns insertions []string — presumably the
	// ids of the newly inserted elements; confirm.
	Insert(ctx context.Context, queue string, messages ...*message.Message) (insertions []string, err error)
	// Remove takes a queue and ids and returns a removed count.
	// NOTE(review): presumably the number of elements actually deleted — confirm.
	Remove(ctx context.Context, queue string, ids ...string) (removed int64, err error)
	// Flush returns nothing, so callers get no error signal.
	// NOTE(review): presumably clears the whole cache — confirm.
	Flush(ctx context.Context)

	// Get returns the value of the specified key. Returns an empty string if the key is not set.
	Get(ctx context.Context, key string) (value string, err error)
	// Set sets the given key to its respective value. Use an empty string to unset the cache element.
	Set(ctx context.Context, key string, value string) error

	// Close closes the connection to the cache.
	Close(ctx context.Context) error
}

func CreateCache

func CreateCache(ctx context.Context, cacheType Type) (Cache, error)

type LockType

// LockType names the kind of lock passed to LockMessage and UnlockMessages;
// LOCK_ACK and LOCK_NACK are the declared values.
type LockType string

type MemoryCache

type MemoryCache struct {
	// contains filtered or unexported fields
}

MemoryCache is an implementation of the Cache interface using memory. It was not made to be performant, but to be used in integration tests and in development. Currently only insert and pull functions are implemented.

func NewMemoryCache

func NewMemoryCache() *MemoryCache

func (*MemoryCache) Close

func (cache *MemoryCache) Close(ctx context.Context) error

func (*MemoryCache) Flush

func (cache *MemoryCache) Flush(_ context.Context)

func (*MemoryCache) Get

func (cache *MemoryCache) Get(_ context.Context, key string) (value string, err error)

func (*MemoryCache) Insert

func (cache *MemoryCache) Insert(ctx context.Context, queue string, messages ...*message.Message) ([]string, error)

func (*MemoryCache) IsProcessing

func (cache *MemoryCache) IsProcessing(ctx context.Context, queue string, id string) (bool, error)

func (*MemoryCache) ListQueues

func (cache *MemoryCache) ListQueues(ctx context.Context, pattern string, poolType pool.PoolType) (queues []string, err error)

func (*MemoryCache) LockMessage

func (cache *MemoryCache) LockMessage(_ context.Context, message *message.Message, lockType LockType) (bool, error)

func (*MemoryCache) MakeAvailable

func (cache *MemoryCache) MakeAvailable(_ context.Context, message *message.Message) (bool, error)

func (*MemoryCache) PullMessages

func (cache *MemoryCache) PullMessages(ctx context.Context, queue string, n int64, minScore *float64, maxScore *float64, ackDeadlineMs int64) (ids []string, err error)

func (*MemoryCache) Remove

func (cache *MemoryCache) Remove(ctx context.Context, queue string, ids ...string) (removed int64, err error)

func (*MemoryCache) Set

func (cache *MemoryCache) Set(_ context.Context, key string, value string) error

func (*MemoryCache) TimeoutMessages

func (cache *MemoryCache) TimeoutMessages(_ context.Context, queue string) (ids []string, err error)

func (*MemoryCache) UnlockMessages

func (cache *MemoryCache) UnlockMessages(ctx context.Context, queue string, lockType LockType) (messages []string, err error)

type MemoryMessageEntry

type MemoryMessageEntry struct {
	// contains filtered or unexported fields
}

type RedisCache

type RedisCache struct {
	Client *redis.Client
	// contains filtered or unexported fields
}

func NewRedisCache

func NewRedisCache(ctx context.Context) (*RedisCache, error)

func (*RedisCache) Close

func (cache *RedisCache) Close(ctx context.Context) error

func (*RedisCache) Flush

func (cache *RedisCache) Flush(ctx context.Context)

func (*RedisCache) Get

func (cache *RedisCache) Get(ctx context.Context, key string) (string, error)

func (*RedisCache) Insert

func (cache *RedisCache) Insert(ctx context.Context, queue string, messages ...*message.Message) ([]string, error)

func (*RedisCache) IsProcessing

func (cache *RedisCache) IsProcessing(ctx context.Context, queue string, id string) (bool, error)

func (*RedisCache) ListQueues

func (cache *RedisCache) ListQueues(ctx context.Context, pattern string, poolType pool.PoolType) (queues []string, err error)

TODO: This should be optimized. TODO: We should list queues using the storage with an iterator, not Redis. Rethink this usage.

func (*RedisCache) LockMessage

func (cache *RedisCache) LockMessage(ctx context.Context, message *message.Message, lockType LockType) (bool, error)

func (*RedisCache) MakeAvailable

func (cache *RedisCache) MakeAvailable(ctx context.Context, message *message.Message) (bool, error)

func (*RedisCache) PullMessages

func (cache *RedisCache) PullMessages(ctx context.Context, queue string, n int64, minScore *float64, maxScore *float64, ackDeadlineMs int64) (ids []string, err error)

func (*RedisCache) Remove

func (cache *RedisCache) Remove(ctx context.Context, queue string, ids ...string) (removed int64, err error)

func (*RedisCache) Set

func (cache *RedisCache) Set(ctx context.Context, key string, value string) error

func (*RedisCache) TimeoutMessages

func (cache *RedisCache) TimeoutMessages(ctx context.Context, queue string) ([]string, error)

func (*RedisCache) UnlockMessages

func (cache *RedisCache) UnlockMessages(ctx context.Context, queue string, lockType LockType) ([]string, error)

type Type

// Type names a cache implementation for CreateCache; REDIS and MEMORY are the
// declared values.
type Type string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL