Documentation ¶
Index ¶
- func ComputeMetrics(ctx context.Context, pool *MessagePool)
- func ProcessLockPool(ctx context.Context, pool *MessagePool)
- func ProcessTimeoutMessages(ctx context.Context, pool *MessagePool) error
- func RecoveryMessagesPool(ctx context.Context, pool *MessagePool) (metrify bool)
- func RemoveExceedingMessages(ctx context.Context, pool *MessagePool) (bool, error)
- func RemoveTTLMessages(ctx context.Context, pool *MessagePool, filterDate *time.Time) (bool, error)
- type DeckardMessagePool
- type MessagePool
- func (pool *MessagePool) Ack(ctx context.Context, message *entities.Message, timestamp time.Time, ...) (bool, error)
- func (pool *MessagePool) AddMessagesToCache(ctx context.Context, messages ...*entities.Message) (int64, error)
- func (pool *MessagePool) AddMessagesToCacheWithAuditReason(ctx context.Context, reason string, messages ...*entities.Message) (int64, error)
- func (pool *MessagePool) AddMessagesToStorage(ctx context.Context, messages ...*entities.Message) (inserted int64, updated int64, err error)
- func (pool *MessagePool) Count(ctx context.Context, opts *storage.FindOptions) (int64, error)
- func (pool *MessagePool) Flush(ctx context.Context) (bool, error)
- func (pool *MessagePool) GetStorageMessages(ctx context.Context, opt *storage.FindOptions) ([]entities.Message, error)
- func (pool *MessagePool) Nack(ctx context.Context, message *entities.Message, timestamp time.Time, ...) (bool, error)
- func (pool *MessagePool) Pull(ctx context.Context, queue string, n int64, scoreFilter int64) (*[]entities.Message, error)
- func (pool *MessagePool) Remove(ctx context.Context, queue string, reason string, ids ...string) (cacheRemoved int64, storageRemoved int64, err error)
- func (pool *MessagePool) TimeoutMessages(ctx context.Context, queue string) ([]string, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ComputeMetrics ¶
func ComputeMetrics(ctx context.Context, pool *MessagePool)
func ProcessLockPool ¶
func ProcessLockPool(ctx context.Context, pool *MessagePool)
processLockPool moves messages from the lock message pool to the message pool.
func ProcessTimeoutMessages ¶
func ProcessTimeoutMessages(ctx context.Context, pool *MessagePool) error
ProcessTimeoutMessages process messages timeout that have not been acked (or nacked) for more than 5 minutes returning them back to the active queue with the maximum score. TODO allow each queue to have its own deadline for timeout.
func RecoveryMessagesPool ¶
func RecoveryMessagesPool(ctx context.Context, pool *MessagePool) (metrify bool)
RecoveryMessagesPool recover messages pool sending all storage data to cache
func RemoveExceedingMessages ¶
func RemoveExceedingMessages(ctx context.Context, pool *MessagePool) (bool, error)
Checks if there is any queue with max_elements configuration and then remove every exceeding messages using expiry_date to sort which elements will be removed TODO manage message pool update individually by queue to avoid future bottlenecks
func RemoveTTLMessages ¶
Remove 10000 expired elements ordered by expiration date asc for each queue.
Types ¶
type DeckardMessagePool ¶
type DeckardMessagePool interface { AddMessagesToCache(ctx context.Context, messages ...*entities.Message) (int64, error) AddMessagesToStorage(ctx context.Context, messages ...*entities.Message) (inserted int64, updated int64, err error) Nack(ctx context.Context, message *entities.Message, timestamp time.Time, reason string) (bool, error) Ack(ctx context.Context, message *entities.Message, timestamp time.Time, reason string) (bool, error) TimeoutMessages(ctx context.Context, queue string) ([]string, error) Pull(ctx context.Context, queue string, n int64, scoreFilter int64) (*[]entities.Message, error) Remove(ctx context.Context, queue string, reason string, ids ...string) (cacheRemoved int64, storageRemoved int64, err error) Count(ctx context.Context, opts *storage.FindOptions) (int64, error) GetStorageMessages(ctx context.Context, opt *storage.FindOptions) ([]entities.Message, error) // Flushes all deckard content from cache and storage. // Used only for memory instance. Flush(ctx context.Context) (bool, error) }
type MessagePool ¶
type MessagePool struct { QueueConfigurationService queue.ConfigurationService // contains filtered or unexported fields }
func NewMessagePool ¶
func NewMessagePool(auditor audit.Auditor, storageImpl storage.Storage, queueService queue.ConfigurationService, cache cache.Cache) *MessagePool
func (*MessagePool) AddMessagesToCache ¶
func (*MessagePool) AddMessagesToCacheWithAuditReason ¶
func (*MessagePool) AddMessagesToStorage ¶
func (*MessagePool) Count ¶
func (pool *MessagePool) Count(ctx context.Context, opts *storage.FindOptions) (int64, error)
func (*MessagePool) GetStorageMessages ¶
func (pool *MessagePool) GetStorageMessages(ctx context.Context, opt *storage.FindOptions) ([]entities.Message, error)