queue

package
v3.3.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2025 License: AGPL-3.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrStopped = queue.ErrStopped
View Source
var StartIndex = queue.StartIndex

Functions

This section is empty.

Types

type Config

type Config struct {
	MaxQueuedTasksPerTenant int    `yaml:"max_queued_tasks_per_tenant"`
	StoreTasksOnDisk        bool   `yaml:"store_tasks_on_disk"`
	TasksDiskDirectory      string `yaml:"tasks_disk_directory"`
	CleanTasksDirectory     bool   `yaml:"clean_tasks_directory"`
}

func (*Config) RegisterFlagsWithPrefix

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

RegisterFlagsWithPrefix registers flags for the bloom-planner configuration.

func (*Config) Validate

func (cfg *Config) Validate() error

type Index

type Index = queue.QueueIndex

type Limits

type Limits = queue.Limits

type Metrics

type Metrics = queue.Metrics

func NewMetrics

func NewMetrics(registerer prometheus.Registerer, metricsNamespace string, subsystem string) *Metrics

type Queue

type Queue struct {
	services.Service
	// contains filtered or unexported fields
}

Queue is a wrapper of queue.RequestQueue that uses the file system to store the pending tasks. The queue also allows to store metadata with the task. This metadata can be anything. Metadata is stored in memory. When a task is enqueued (Enqueue), it's stored in the file system and recorded as pending. When it's dequeued (Dequeue), it's removed from the queue but kept in FS until released (Release).

TODO(salvacorts): In the future we may reuse this queue to store any proto message. We would need to use generics for that.

func NewQueue

func NewQueue(
	logger log.Logger,
	cfg Config,
	limits Limits,
	metrics *Metrics,
	storeMetrics storage.ClientMetrics,
) (*Queue, error)

func (*Queue) Dequeue

func (q *Queue) Dequeue(ctx context.Context, last Index, consumerID string) (*protos.ProtoTask, any, Index, error)

Dequeue takes a task from the queue. The task is not removed from the filesystem until Release is called.

func (*Queue) Enqueue

func (q *Queue) Enqueue(task *protos.ProtoTask, metadata any, successFn func()) error

Enqueue adds a task to the queue. The task is enqueued only if it doesn't already exist in the queue.

func (*Queue) GetConnectedConsumersMetric

func (q *Queue) GetConnectedConsumersMetric() float64

func (*Queue) NotifyConsumerShutdown

func (q *Queue) NotifyConsumerShutdown(consumer string)

func (*Queue) RegisterConsumerConnection

func (q *Queue) RegisterConsumerConnection(consumer string)

func (*Queue) Release

func (q *Queue) Release(task *protos.ProtoTask)

Release removes a task from the filesystem. Dequeue should be called before Remove.

func (*Queue) TotalPending

func (q *Queue) TotalPending() (total int)

func (*Queue) UnregisterConsumerConnection

func (q *Queue) UnregisterConsumerConnection(consumer string)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL