Documentation ¶
Index ¶
- Constants
- Variables
- type Cache
- type LockType
- type MemoryCache
- func (cache *MemoryCache) Close(ctx context.Context) error
- func (cache *MemoryCache) Flush(_ context.Context)
- func (cache *MemoryCache) Get(_ context.Context, key string) (value string, err error)
- func (cache *MemoryCache) Insert(ctx context.Context, queue string, messages ...*entities.Message) ([]string, error)
- func (cache *MemoryCache) IsProcessing(ctx context.Context, queue string, id string) (bool, error)
- func (cache *MemoryCache) ListQueues(ctx context.Context, pattern string, poolType entities.PoolType) (queues []string, err error)
- func (cache *MemoryCache) LockMessage(_ context.Context, message *entities.Message, lockType LockType) (bool, error)
- func (cache *MemoryCache) MakeAvailable(_ context.Context, message *entities.Message) (bool, error)
- func (cache *MemoryCache) PullMessages(ctx context.Context, queue string, n int64, scoreFilter int64) (ids []string, err error)
- func (cache *MemoryCache) Remove(ctx context.Context, queue string, ids ...string) (removed int64, err error)
- func (cache *MemoryCache) Set(_ context.Context, key string, value string) error
- func (cache *MemoryCache) TimeoutMessages(_ context.Context, queue string, timeout time.Duration) (ids []string, err error)
- func (cache *MemoryCache) UnlockMessages(ctx context.Context, queue string, lockType LockType) (messages []string, err error)
- type MemoryMessageEntry
- type RedisCache
- func (cache *RedisCache) Close(ctx context.Context) error
- func (cache *RedisCache) Flush(ctx context.Context)
- func (cache *RedisCache) Get(ctx context.Context, key string) (string, error)
- func (cache *RedisCache) Insert(ctx context.Context, queue string, messages ...*entities.Message) ([]string, error)
- func (cache *RedisCache) IsProcessing(ctx context.Context, queue string, id string) (bool, error)
- func (cache *RedisCache) ListQueues(ctx context.Context, pattern string, poolType entities.PoolType) (queues []string, err error)
- func (cache *RedisCache) LockMessage(ctx context.Context, message *entities.Message, lockType LockType) (bool, error)
- func (cache *RedisCache) MakeAvailable(ctx context.Context, message *entities.Message) (bool, error)
- func (cache *RedisCache) PullMessages(ctx context.Context, queue string, n int64, scoreFilter int64) (ids []string, err error)
- func (cache *RedisCache) Remove(ctx context.Context, queue string, ids ...string) (removed int64, err error)
- func (cache *RedisCache) Set(ctx context.Context, key string, value string) error
- func (cache *RedisCache) TimeoutMessages(ctx context.Context, queue string, timeout time.Duration) ([]string, error)
- func (cache *RedisCache) UnlockMessages(ctx context.Context, queue string, lockType LockType) ([]string, error)
- type Type
Constants ¶
View Source
const ( REDIS Type = "REDIS" MEMORY Type = "MEMORY" LOCK_ACK LockType = "lock_ack" LOCK_NACK LockType = "lock_nack" RECOVERY_RUNNING = "recovery" RECOVERY_FINISHED = "recovery_finished" RECOVERY_STORAGE_BREAKPOINT_KEY = "recovery_storage_breakpoint" RECOVERY_BREAKPOINT_KEY = "recovery_breakpoint" )
View Source
const ( POOL_PREFIX = "deckard:queue:" PROCESSING_POOL_SUFFIX = ":tmp" LOCK_ACK_SUFFIX = ":" + string(LOCK_ACK) LOCK_NACK_SUFFIX = ":" + string(LOCK_NACK) )
View Source
const DefaultCacheTimeout = 5 * time.Minute
Variables ¶
View Source
var LOCK_ACK_POOL_REGEX = regexp.MustCompile("(.+)" + LOCK_ACK_SUFFIX + "$")
View Source
var LOCK_NACK_POOL_REGEX = regexp.MustCompile("(.+)" + LOCK_NACK_SUFFIX + "$")
View Source
var PROCESSING_POOL_REGEX = regexp.MustCompile("(.+)" + PROCESSING_POOL_SUFFIX + "$")
Functions ¶
This section is empty.
Types ¶
type Cache ¶
type Cache interface { MakeAvailable(ctx context.Context, message *entities.Message) (bool, error) IsProcessing(ctx context.Context, queue string, id string) (bool, error) PullMessages(ctx context.Context, queue string, n int64, scoreFilter int64) (ids []string, err error) TimeoutMessages(ctx context.Context, queue string, timeout time.Duration) (ids []string, err error) // Locks a message for message.LockMs milliseconds. LockMessage(ctx context.Context, message *entities.Message, lockType LockType) (bool, error) // Unlocks all messages from a queue. UnlockMessages(ctx context.Context, queue string, lockType LockType) (messages []string, err error) // Lists all queues from a pool using a pattern search. Only glob-style patterns are supported. ListQueues(ctx context.Context, pattern string, poolType entities.PoolType) (queues []string, err error) // Inserts 1..n elements into the cache and returns the newly inserted elements. // Elements already in the cache should have their score updated. Insert(ctx context.Context, queue string, messages ...*entities.Message) (insertions []string, err error) Remove(ctx context.Context, queue string, ids ...string) (removed int64, err error) Flush(ctx context.Context) // Returns the value of the specified key. Returns an empty string if the key is not set. Get(ctx context.Context, key string) (value string, err error) // Sets the given key to its respective value. Use an empty string to unset the cache element. Set(ctx context.Context, key string, value string) error // Closes the connection to the cache. Close(ctx context.Context) error }
type MemoryCache ¶
type MemoryCache struct {
// contains filtered or unexported fields
}
MemoryCache is an implementation of the Cache interface using memory. Currently only insert and pull functions are implemented.
func NewMemoryCache ¶
func NewMemoryCache() *MemoryCache
func (*MemoryCache) Flush ¶
func (cache *MemoryCache) Flush(_ context.Context)
func (*MemoryCache) IsProcessing ¶
func (*MemoryCache) ListQueues ¶
func (*MemoryCache) LockMessage ¶
func (*MemoryCache) MakeAvailable ¶
func (*MemoryCache) PullMessages ¶
func (*MemoryCache) TimeoutMessages ¶
func (*MemoryCache) UnlockMessages ¶
type MemoryMessageEntry ¶
type MemoryMessageEntry struct {
// contains filtered or unexported fields
}
type RedisCache ¶
type RedisCache struct { Client *redis.Client // contains filtered or unexported fields }
func NewRedisCache ¶
func NewRedisCache(ctx context.Context) (*RedisCache, error)
func (*RedisCache) Flush ¶
func (cache *RedisCache) Flush(ctx context.Context)
func (*RedisCache) IsProcessing ¶
func (*RedisCache) ListQueues ¶
func (*RedisCache) LockMessage ¶
func (*RedisCache) MakeAvailable ¶
func (*RedisCache) PullMessages ¶
func (*RedisCache) TimeoutMessages ¶
func (*RedisCache) UnlockMessages ¶
Click to show internal directories.
Click to hide internal directories.