Documentation ¶
Index ¶
- func ComputeMetrics(ctx context.Context, pool *Queue)
- func ProcessLockPool(ctx context.Context, queue *Queue)
- func ProcessTimeoutMessages(ctx context.Context, queue *Queue) error
- func RecoveryMessagesPool(ctx context.Context, pool *Queue) (metrify bool)
- func RemoveExceedingMessages(ctx context.Context, pool *Queue) (bool, error)
- func RemoveTTLMessages(ctx context.Context, pool *Queue, filterDate *time.Time) (bool, error)
- type DeckardQueue
- type DefaultQueueConfigurationService
- type Queue
- func (pool *Queue) Ack(ctx context.Context, msg *message.Message, reason string) (bool, error)
- func (pool *Queue) AddMessagesToCache(ctx context.Context, messages ...*message.Message) (int64, error)
- func (pool *Queue) AddMessagesToCacheWithAuditReason(ctx context.Context, reason string, messages ...*message.Message) (int64, error)
- func (pool *Queue) AddMessagesToStorage(ctx context.Context, messages ...*message.Message) (inserted int64, updated int64, err error)
- func (pool *Queue) Count(ctx context.Context, opts *storage.FindOptions) (int64, error)
- func (pool *Queue) Flush(ctx context.Context) (bool, error)
- func (pool *Queue) GetStorageMessages(ctx context.Context, opt *storage.FindOptions) ([]message.Message, error)
- func (pool *Queue) Nack(ctx context.Context, msg *message.Message, timestamp time.Time, reason string) (bool, error)
- func (pool *Queue) Pull(ctx context.Context, queue string, n int64, minScore *float64, ...) (*[]message.Message, error)
- func (pool *Queue) Remove(ctx context.Context, queue string, reason string, ids ...string) (cacheRemoved int64, storageRemoved int64, err error)
- func (pool *Queue) TimeoutMessages(ctx context.Context, queue string) ([]string, error)
- type QueueConfigurationService
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ComputeMetrics ¶
func ProcessLockPool ¶
ProcessLockPool moves messages from the lock message pool to the message pool.
func ProcessTimeoutMessages ¶
ProcessTimeoutMessages processes timed-out messages, i.e. messages that have not been acked (or nacked) for more than 5 minutes, returning them to the active queue with the maximum score. TODO: allow each queue to have its own timeout deadline. TODO: change this behavior so it doesn't need to load all queue names into memory; we could use the storage to list queues with a cursor. TODO: we could even change the timeout mechanism so that it is not based on the queue name.
func RecoveryMessagesPool ¶
RecoveryMessagesPool recovers the message pool by sending all storage data to the cache.
func RemoveExceedingMessages ¶
RemoveExceedingMessages checks whether any queue has a max_elements configuration and, if so, removes every exceeding message, using expiry_date to sort which elements will be removed. TODO: manage message pool updates individually by queue to avoid future bottlenecks.
Types ¶
type DeckardQueue ¶
type DeckardQueue interface { AddMessagesToCache(ctx context.Context, messages ...*message.Message) (int64, error) AddMessagesToStorage(ctx context.Context, messages ...*message.Message) (inserted int64, updated int64, err error) Nack(ctx context.Context, message *message.Message, timestamp time.Time, reason string) (bool, error) Ack(ctx context.Context, message *message.Message, reason string) (bool, error) TimeoutMessages(ctx context.Context, queue string) ([]string, error) Pull(ctx context.Context, queue string, n int64, minScore *float64, maxScore *float64, ackDeadlineMs int64) (*[]message.Message, error) Remove(ctx context.Context, queue string, reason string, ids ...string) (cacheRemoved int64, storageRemoved int64, err error) Count(ctx context.Context, opts *storage.FindOptions) (int64, error) GetStorageMessages(ctx context.Context, opt *storage.FindOptions) ([]message.Message, error) // Flushes all deckard content from cache and storage. // Used only for memory instance. Flush(ctx context.Context) (bool, error) }
type DefaultQueueConfigurationService ¶
type DefaultQueueConfigurationService struct {
// contains filtered or unexported fields
}
func NewQueueConfigurationService ¶
func NewQueueConfigurationService(_ context.Context, storage storage.Storage) *DefaultQueueConfigurationService
func (*DefaultQueueConfigurationService) EditQueueConfiguration ¶
func (queueService *DefaultQueueConfigurationService) EditQueueConfiguration(ctx context.Context, cfg *configuration.QueueConfiguration) error
func (*DefaultQueueConfigurationService) GetQueueConfiguration ¶
func (queueService *DefaultQueueConfigurationService) GetQueueConfiguration(ctx context.Context, queue string) (*configuration.QueueConfiguration, error)
type Queue ¶
type Queue struct { QueueConfigurationService QueueConfigurationService // contains filtered or unexported fields }
func (*Queue) AddMessagesToCache ¶
func (*Queue) AddMessagesToCacheWithAuditReason ¶
func (*Queue) AddMessagesToStorage ¶
func (*Queue) GetStorageMessages ¶
type QueueConfigurationService ¶
type QueueConfigurationService interface { EditQueueConfiguration(ctx context.Context, configuration *configuration.QueueConfiguration) error GetQueueConfiguration(ctx context.Context, queue string) (*configuration.QueueConfiguration, error) }