Documentation
Index ¶
- Variables
- type Config
- type Index
- type Limits
- type Metrics
- type Queue
- func (q *Queue) Dequeue(ctx context.Context, last Index, consumerID string) (*protos.ProtoTask, any, Index, error)
- func (q *Queue) Enqueue(task *protos.ProtoTask, metadata any, successFn func()) error
- func (q *Queue) GetConnectedConsumersMetric() float64
- func (q *Queue) NotifyConsumerShutdown(consumer string)
- func (q *Queue) RegisterConsumerConnection(consumer string)
- func (q *Queue) Release(task *protos.ProtoTask)
- func (q *Queue) TotalPending() (total int)
- func (q *Queue) UnregisterConsumerConnection(consumer string)
Constants ¶
This section is empty.
Variables ¶
var ErrStopped = queue.ErrStopped
var StartIndex = queue.StartIndex
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	MaxQueuedTasksPerTenant int    `yaml:"max_queued_tasks_per_tenant"`
	StoreTasksOnDisk        bool   `yaml:"store_tasks_on_disk"`
	TasksDiskDirectory      string `yaml:"tasks_disk_directory"`
	CleanTasksDirectory     bool   `yaml:"clean_tasks_directory"`
}
func (*Config) RegisterFlagsWithPrefix ¶
RegisterFlagsWithPrefix registers flags for the bloom-planner configuration.
type Index ¶
type Index = queue.QueueIndex
type Metrics ¶
func NewMetrics ¶
func NewMetrics(registerer prometheus.Registerer, metricsNamespace string, subsystem string) *Metrics
type Queue ¶
Queue is a wrapper around queue.RequestQueue that uses the file system to store pending tasks. The queue can also store arbitrary metadata alongside each task; this metadata is kept in memory. When a task is enqueued (Enqueue), it is written to the file system and recorded as pending. When it is dequeued (Dequeue), it is removed from the queue but kept on the file system until it is released (Release).
TODO(salvacorts): In the future we may reuse this queue to store any proto message. We would need to use generics for that.
func (*Queue) Dequeue ¶
func (q *Queue) Dequeue(ctx context.Context, last Index, consumerID string) (*protos.ProtoTask, any, Index, error)
Dequeue takes a task from the queue. The task is not removed from the filesystem until Release is called.
func (*Queue) Enqueue ¶
Enqueue adds a task to the queue. The task is enqueued only if it doesn't already exist in the queue.
func (*Queue) GetConnectedConsumersMetric ¶
func (*Queue) NotifyConsumerShutdown ¶
func (*Queue) RegisterConsumerConnection ¶
func (*Queue) Release ¶
Release removes a task from the filesystem. Dequeue should be called before Release.