cache

package
v0.0.16 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 27, 2023 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	REDIS  Type = "REDIS"
	MEMORY Type = "MEMORY"

	LOCK_ACK  LockType = "lock_ack"
	LOCK_NACK LockType = "lock_nack"

	RECOVERY_RUNNING                = "recovery"
	RECOVERY_FINISHED               = "recovery_finished"
	RECOVERY_STORAGE_BREAKPOINT_KEY = "recovery_storage_breakpoint"
	RECOVERY_BREAKPOINT_KEY         = "recovery_breakpoint"
)
View Source
const (
	POOL_PREFIX            = "deckard:queue:"
	PROCESSING_POOL_SUFFIX = ":tmp"
	LOCK_ACK_SUFFIX        = ":" + string(LOCK_ACK)
	LOCK_NACK_SUFFIX       = ":" + string(LOCK_NACK)
)
View Source
const DefaultCacheTimeout = 5 * time.Minute

Variables

View Source
var LOCK_ACK_POOL_REGEX = regexp.MustCompile("(.+)" + LOCK_ACK_SUFFIX + "$")
View Source
var LOCK_NACK_POOL_REGEX = regexp.MustCompile("(.+)" + LOCK_NACK_SUFFIX + "$")
View Source
var PROCESSING_POOL_REGEX = regexp.MustCompile("(.+)" + PROCESSING_POOL_SUFFIX + "$")

Functions

This section is empty.

Types

type Cache

type Cache interface {
	MakeAvailable(ctx context.Context, message *entities.Message) (bool, error)
	IsProcessing(ctx context.Context, queue string, id string) (bool, error)
	PullMessages(ctx context.Context, queue string, n int64, scoreFilter int64) (ids []string, err error)
	TimeoutMessages(ctx context.Context, queue string, timeout time.Duration) (ids []string, err error)

	// Locks a message for message.LockMs milliseconds.
	LockMessage(ctx context.Context, message *entities.Message, lockType LockType) (bool, error)

	// Unlocks all messages from a queue
	UnlockMessages(ctx context.Context, queue string, lockType LockType) (messages []string, err error)

	// Lists all queues from a pool using a pattern search. Only glob-style pattern is supported.
	ListQueues(ctx context.Context, pattern string, poolType entities.PoolType) (queues []string, err error)

	// Inserts 1..n elements to cache and return the number of new elements.
	// Elements already in cache should have its score updated.
	Insert(ctx context.Context, queue string, messages ...*entities.Message) (insertions []string, err error)
	Remove(ctx context.Context, queue string, ids ...string) (removed int64, err error)
	Flush(ctx context.Context)

	// Returns the values of a specified key. Returns an empty string if the key is not set.
	Get(ctx context.Context, key string) (value string, err error)
	// Sets the given key to its respective value. Use empty string to unset cache element.
	Set(ctx context.Context, key string, value string) error

	// Close connection to the cache
	Close(ctx context.Context) error
}

func CreateCache

func CreateCache(ctx context.Context, cacheType Type) (Cache, error)

type LockType

type LockType string

type MemoryCache

type MemoryCache struct {
	// contains filtered or unexported fields
}

MemoryStorage is an implementation of the Storage Interface using memory. Currently only insert and pull functions are implemented.

func NewMemoryCache

func NewMemoryCache() *MemoryCache

func (*MemoryCache) Close

func (cache *MemoryCache) Close(ctx context.Context) error

func (*MemoryCache) Flush

func (cache *MemoryCache) Flush(_ context.Context)

func (*MemoryCache) Get

func (cache *MemoryCache) Get(_ context.Context, key string) (value string, err error)

func (*MemoryCache) Insert

func (cache *MemoryCache) Insert(ctx context.Context, queue string, messages ...*entities.Message) ([]string, error)

func (*MemoryCache) IsProcessing

func (cache *MemoryCache) IsProcessing(ctx context.Context, queue string, id string) (bool, error)

func (*MemoryCache) ListQueues

func (cache *MemoryCache) ListQueues(ctx context.Context, pattern string, poolType entities.PoolType) (queues []string, err error)

func (*MemoryCache) LockMessage

func (cache *MemoryCache) LockMessage(_ context.Context, message *entities.Message, lockType LockType) (bool, error)

func (*MemoryCache) MakeAvailable

func (cache *MemoryCache) MakeAvailable(_ context.Context, message *entities.Message) (bool, error)

func (*MemoryCache) PullMessages

func (cache *MemoryCache) PullMessages(ctx context.Context, queue string, n int64, scoreFilter int64) (ids []string, err error)

func (*MemoryCache) Remove

func (cache *MemoryCache) Remove(ctx context.Context, queue string, ids ...string) (removed int64, err error)

func (*MemoryCache) Set

func (cache *MemoryCache) Set(_ context.Context, key string, value string) error

func (*MemoryCache) TimeoutMessages

func (cache *MemoryCache) TimeoutMessages(_ context.Context, queue string, timeout time.Duration) (ids []string, err error)

func (*MemoryCache) UnlockMessages

func (cache *MemoryCache) UnlockMessages(ctx context.Context, queue string, lockType LockType) (messages []string, err error)

type MemoryMessageEntry

type MemoryMessageEntry struct {
	// contains filtered or unexported fields
}

type RedisCache

type RedisCache struct {
	Client *redis.Client
	// contains filtered or unexported fields
}

func NewRedisCache

func NewRedisCache(ctx context.Context) (*RedisCache, error)

func (*RedisCache) Close

func (cache *RedisCache) Close(ctx context.Context) error

func (*RedisCache) Flush

func (cache *RedisCache) Flush(ctx context.Context)

func (*RedisCache) Get

func (cache *RedisCache) Get(ctx context.Context, key string) (string, error)

func (*RedisCache) Insert

func (cache *RedisCache) Insert(ctx context.Context, queue string, messages ...*entities.Message) ([]string, error)

func (*RedisCache) IsProcessing

func (cache *RedisCache) IsProcessing(ctx context.Context, queue string, id string) (bool, error)

func (*RedisCache) ListQueues

func (cache *RedisCache) ListQueues(ctx context.Context, pattern string, poolType entities.PoolType) (queues []string, err error)

func (*RedisCache) LockMessage

func (cache *RedisCache) LockMessage(ctx context.Context, message *entities.Message, lockType LockType) (bool, error)

func (*RedisCache) MakeAvailable

func (cache *RedisCache) MakeAvailable(ctx context.Context, message *entities.Message) (bool, error)

func (*RedisCache) PullMessages

func (cache *RedisCache) PullMessages(ctx context.Context, queue string, n int64, scoreFilter int64) (ids []string, err error)

func (*RedisCache) Remove

func (cache *RedisCache) Remove(ctx context.Context, queue string, ids ...string) (removed int64, err error)

func (*RedisCache) Set

func (cache *RedisCache) Set(ctx context.Context, key string, value string) error

func (*RedisCache) TimeoutMessages

func (cache *RedisCache) TimeoutMessages(ctx context.Context, queue string, timeout time.Duration) ([]string, error)

func (*RedisCache) UnlockMessages

func (cache *RedisCache) UnlockMessages(ctx context.Context, queue string, lockType LockType) ([]string, error)

type Type

type Type string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL