Documentation ¶
Index ¶
- Constants
- Variables
- type Cache
- type LockType
- type MemoryCache
- func (cache *MemoryCache) Close(ctx context.Context) error
- func (cache *MemoryCache) Flush(_ context.Context)
- func (cache *MemoryCache) Get(_ context.Context, key string) (value string, err error)
- func (cache *MemoryCache) Insert(ctx context.Context, queue string, messages ...*message.Message) ([]string, error)
- func (cache *MemoryCache) IsProcessing(ctx context.Context, queue string, id string) (bool, error)
- func (cache *MemoryCache) ListQueues(ctx context.Context, pattern string, poolType pool.PoolType) (queues []string, err error)
- func (cache *MemoryCache) LockMessage(_ context.Context, message *message.Message, lockType LockType) (bool, error)
- func (cache *MemoryCache) MakeAvailable(_ context.Context, message *message.Message) (bool, error)
- func (cache *MemoryCache) PullMessages(ctx context.Context, queue string, n int64, minScore *float64, ...) (ids []string, err error)
- func (cache *MemoryCache) Remove(ctx context.Context, queue string, ids ...string) (removed int64, err error)
- func (cache *MemoryCache) Set(_ context.Context, key string, value string) error
- func (cache *MemoryCache) TimeoutMessages(_ context.Context, queue string, timeout time.Duration) (ids []string, err error)
- func (cache *MemoryCache) UnlockMessages(ctx context.Context, queue string, lockType LockType) (messages []string, err error)
- type MemoryMessageEntry
- type RedisCache
- func (cache *RedisCache) Close(ctx context.Context) error
- func (cache *RedisCache) Flush(ctx context.Context)
- func (cache *RedisCache) Get(ctx context.Context, key string) (string, error)
- func (cache *RedisCache) Insert(ctx context.Context, queue string, messages ...*message.Message) ([]string, error)
- func (cache *RedisCache) IsProcessing(ctx context.Context, queue string, id string) (bool, error)
- func (cache *RedisCache) ListQueues(ctx context.Context, pattern string, poolType pool.PoolType) (queues []string, err error)
- func (cache *RedisCache) LockMessage(ctx context.Context, message *message.Message, lockType LockType) (bool, error)
- func (cache *RedisCache) MakeAvailable(ctx context.Context, message *message.Message) (bool, error)
- func (cache *RedisCache) PullMessages(ctx context.Context, queue string, n int64, minScore *float64, ...) (ids []string, err error)
- func (cache *RedisCache) Remove(ctx context.Context, queue string, ids ...string) (removed int64, err error)
- func (cache *RedisCache) Set(ctx context.Context, key string, value string) error
- func (cache *RedisCache) TimeoutMessages(ctx context.Context, queue string, timeout time.Duration) ([]string, error)
- func (cache *RedisCache) UnlockMessages(ctx context.Context, queue string, lockType LockType) ([]string, error)
- type Type
Constants ¶
View Source
const ( REDIS Type = "REDIS" MEMORY Type = "MEMORY" LOCK_ACK LockType = "lock_ack" LOCK_NACK LockType = "lock_nack" RECOVERY_RUNNING = "recovery" RECOVERY_FINISHED = "recovery_finished" RECOVERY_STORAGE_BREAKPOINT_KEY = "recovery_storage_breakpoint" RECOVERY_BREAKPOINT_KEY = "recovery_breakpoint" )
View Source
const ( POOL_PREFIX = "deckard:queue:" PROCESSING_POOL_SUFFIX = ":tmp" LOCK_ACK_SUFFIX = ":" + string(LOCK_ACK) LOCK_NACK_SUFFIX = ":" + string(LOCK_NACK) LOCK_ACK_SCORE_SUFFIX = ":" + string(LOCK_ACK) + ":score" LOCK_NACK_SCORE_SUFFIX = ":" + string(LOCK_NACK) + ":score" )
View Source
const DefaultCacheTimeout = 5 * time.Minute
Variables ¶
View Source
var LOCK_ACK_POOL_REGEX = regexp.MustCompile("(.+)" + LOCK_ACK_SUFFIX + "$")
View Source
var LOCK_NACK_POOL_REGEX = regexp.MustCompile("(.+)" + LOCK_NACK_SUFFIX + "$")
View Source
var PROCESSING_POOL_REGEX = regexp.MustCompile("(.+)" + PROCESSING_POOL_SUFFIX + "$")
Functions ¶
This section is empty.
Types ¶
type Cache ¶
type Cache interface { MakeAvailable(ctx context.Context, message *message.Message) (bool, error) IsProcessing(ctx context.Context, queue string, id string) (bool, error) PullMessages(ctx context.Context, queue string, n int64, minScore *float64, maxScore *float64) (ids []string, err error) TimeoutMessages(ctx context.Context, queue string, timeout time.Duration) (ids []string, err error) // Locks a message for message.LockMs milliseconds. LockMessage(ctx context.Context, message *message.Message, lockType LockType) (bool, error) // Unlocks all messages from a queue UnlockMessages(ctx context.Context, queue string, lockType LockType) (messages []string, err error) // Lists all queues from a pool using a pattern search. Only glob-style pattern is supported. ListQueues(ctx context.Context, pattern string, poolType pool.PoolType) (queues []string, err error) // Inserts 1..n elements to cache and return the number of new elements. // Elements already in cache should have its score updated. Insert(ctx context.Context, queue string, messages ...*message.Message) (insertions []string, err error) Remove(ctx context.Context, queue string, ids ...string) (removed int64, err error) Flush(ctx context.Context) // Returns the values of a specified key. Returns an empty string if the key is not set. Get(ctx context.Context, key string) (value string, err error) // Sets the given key to its respective value. Use empty string to unset cache element. Set(ctx context.Context, key string, value string) error // Close connection to the cache Close(ctx context.Context) error }
type MemoryCache ¶
type MemoryCache struct {
// contains filtered or unexported fields
}
MemoryCache is an implementation of the Cache interface using memory. It was not made to be performant, but to be used in integration tests and in development. Currently only insert and pull functions are implemented.
func NewMemoryCache ¶
func NewMemoryCache() *MemoryCache
func (*MemoryCache) Flush ¶
func (cache *MemoryCache) Flush(_ context.Context)
func (*MemoryCache) IsProcessing ¶
func (*MemoryCache) ListQueues ¶
func (*MemoryCache) LockMessage ¶
func (*MemoryCache) MakeAvailable ¶
func (*MemoryCache) PullMessages ¶
func (*MemoryCache) TimeoutMessages ¶
func (*MemoryCache) UnlockMessages ¶
type MemoryMessageEntry ¶
type MemoryMessageEntry struct {
// contains filtered or unexported fields
}
type RedisCache ¶
type RedisCache struct { Client *redis.Client // contains filtered or unexported fields }
func NewRedisCache ¶
func NewRedisCache(ctx context.Context) (*RedisCache, error)
func (*RedisCache) Flush ¶
func (cache *RedisCache) Flush(ctx context.Context)
func (*RedisCache) IsProcessing ¶
func (*RedisCache) ListQueues ¶
func (cache *RedisCache) ListQueues(ctx context.Context, pattern string, poolType pool.PoolType) (queues []string, err error)
TODO: This should be optimized. TODO: We should list queues using the storage with an iterator rather than Redis. Rethink this usage.
func (*RedisCache) LockMessage ¶
func (*RedisCache) MakeAvailable ¶
func (*RedisCache) PullMessages ¶
func (*RedisCache) TimeoutMessages ¶
func (*RedisCache) UnlockMessages ¶
Click to show internal directories.
Click to hide internal directories.