messagepool

package
v0.0.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 26, 2023 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ComputeMetrics

func ComputeMetrics(ctx context.Context, pool *MessagePool)

func ProcessLockPool

func ProcessLockPool(ctx context.Context, pool *MessagePool)

processLockPool moves messages from the lock message pool to the message pool.

func ProcessTimeoutMessages

func ProcessTimeoutMessages(ctx context.Context, pool *MessagePool) error

ProcessTimeoutMessages process messages timeout that have not been acked (or nacked) for more than 5 minutes returning them back to the active queue with the maximum score. TODO: allow each queue to have its own deadline for timeout. TODO: change the behavior of this so it doesn't need to load all queue names in memory, we could use the storage to list queues with a cursor TODO: we could even change the timeout mechanism to be not based on the queue name

func RecoveryMessagesPool

func RecoveryMessagesPool(ctx context.Context, pool *MessagePool) (metrify bool)

RecoveryMessagesPool recover messages pool sending all storage data to cache

func RemoveExceedingMessages

func RemoveExceedingMessages(ctx context.Context, pool *MessagePool) (bool, error)

Checks if there is any queue with max_elements configuration and then remove every exceeding messages using expiry_date to sort which elements will be removed TODO manage message pool update individually by queue to avoid future bottlenecks

func RemoveTTLMessages

func RemoveTTLMessages(ctx context.Context, pool *MessagePool, filterDate *time.Time) (bool, error)

Remove 10000 expired elements ordered by expiration date asc for each queue.

Types

type DeckardMessagePool

type DeckardMessagePool interface {
	AddMessagesToCache(ctx context.Context, messages ...*entities.Message) (int64, error)
	AddMessagesToStorage(ctx context.Context, messages ...*entities.Message) (inserted int64, updated int64, err error)
	Nack(ctx context.Context, message *entities.Message, timestamp time.Time, reason string) (bool, error)
	Ack(ctx context.Context, message *entities.Message, timestamp time.Time, reason string) (bool, error)
	TimeoutMessages(ctx context.Context, queue string) ([]string, error)
	Pull(ctx context.Context, queue string, n int64, scoreFilter int64) (*[]entities.Message, error)
	Remove(ctx context.Context, queue string, reason string, ids ...string) (cacheRemoved int64, storageRemoved int64, err error)
	Count(ctx context.Context, opts *storage.FindOptions) (int64, error)
	GetStorageMessages(ctx context.Context, opt *storage.FindOptions) ([]entities.Message, error)

	// Flushes all deckard content from cache and storage.
	// Used only for memory instance.
	Flush(ctx context.Context) (bool, error)
}

type MessagePool

type MessagePool struct {
	QueueConfigurationService queue.ConfigurationService
	// contains filtered or unexported fields
}

func NewMessagePool

func NewMessagePool(auditor audit.Auditor, storageImpl storage.Storage, queueService queue.ConfigurationService, cache cache.Cache) *MessagePool

func (*MessagePool) Ack

func (pool *MessagePool) Ack(ctx context.Context, message *entities.Message, timestamp time.Time, reason string) (bool, error)

func (*MessagePool) AddMessagesToCache

func (pool *MessagePool) AddMessagesToCache(ctx context.Context, messages ...*entities.Message) (int64, error)

func (*MessagePool) AddMessagesToCacheWithAuditReason

func (pool *MessagePool) AddMessagesToCacheWithAuditReason(ctx context.Context, reason string, messages ...*entities.Message) (int64, error)

func (*MessagePool) AddMessagesToStorage

func (pool *MessagePool) AddMessagesToStorage(ctx context.Context, messages ...*entities.Message) (inserted int64, updated int64, err error)

func (*MessagePool) Count

func (pool *MessagePool) Count(ctx context.Context, opts *storage.FindOptions) (int64, error)

func (*MessagePool) Flush

func (pool *MessagePool) Flush(ctx context.Context) (bool, error)

func (*MessagePool) GetStorageMessages

func (pool *MessagePool) GetStorageMessages(ctx context.Context, opt *storage.FindOptions) ([]entities.Message, error)

func (*MessagePool) Nack

func (pool *MessagePool) Nack(ctx context.Context, message *entities.Message, timestamp time.Time, reason string) (bool, error)

func (*MessagePool) Pull

func (pool *MessagePool) Pull(ctx context.Context, queue string, n int64, scoreFilter int64) (*[]entities.Message, error)

func (*MessagePool) Remove

func (pool *MessagePool) Remove(ctx context.Context, queue string, reason string, ids ...string) (cacheRemoved int64, storageRemoved int64, err error)

func (*MessagePool) TimeoutMessages

func (pool *MessagePool) TimeoutMessages(ctx context.Context, queue string) ([]string, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL