internal

package
v0.90.1
Warning

This package is not in the latest version of its module.

Published: Dec 1, 2023 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// ErrQueueIsFull is the error returned when an item is offered to the Queue and the queue is full.
	ErrQueueIsFull = errors.New("sending queue is full")
	// ErrQueueIsStopped is the error returned when an item is offered to the Queue and the queue is stopped.
	ErrQueueIsStopped = errors.New("sending queue is stopped")
)
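Callers usually distinguish these two sentinel errors with errors.Is, which also matches wrapped errors. The following is a minimal sketch under stated assumptions: because an internal package cannot be imported from outside its module, errQueueIsFull and errQueueIsStopped are local stand-ins with the same messages, and describeOfferError and its return strings are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for ErrQueueIsFull and ErrQueueIsStopped (assumption:
// the real values live in an internal package that cannot be imported here).
var (
	errQueueIsFull    = errors.New("sending queue is full")
	errQueueIsStopped = errors.New("sending queue is stopped")
)

// describeOfferError maps an error returned by an Offer-style call to a
// hypothetical caller-side action.
func describeOfferError(err error) string {
	switch {
	case err == nil:
		return "enqueued"
	case errors.Is(err, errQueueIsFull):
		return "dropped: queue full"
	case errors.Is(err, errQueueIsStopped):
		return "rejected: queue stopped"
	default:
		return "failed: " + err.Error()
	}
}

func main() {
	// errors.Is still matches after the sentinel has been wrapped with %w.
	wrapped := fmt.Errorf("offer failed: %w", errQueueIsFull)
	fmt.Println(describeOfferError(nil))
	fmt.Println(describeOfferError(wrapped))
	fmt.Println(describeOfferError(errQueueIsStopped))
}
```

Comparing with errors.Is rather than == keeps the check valid if an intermediate layer wraps the error before returning it.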

Functions

func NewMockStorageExtension added in v0.84.0

func NewMockStorageExtension(getClientError error) storage.Extension

Types

type Queue added in v0.89.0

type Queue[T any] interface {
	component.Component
	// Offer inserts the specified element into this queue if it is possible to do so immediately
	// without violating capacity restrictions. It returns nil on success,
	// ErrQueueIsFull if no space is currently available, and ErrQueueIsStopped if the queue is stopped.
	Offer(ctx context.Context, item T) error
	// Consume applies the provided function on the head of queue.
	// The call blocks until there is an item available or the queue is stopped.
	// The function returns true when an item is consumed or false if the queue is stopped.
	Consume(func(ctx context.Context, item T)) bool
	// Size returns the current size of the queue.
	Size() int
	// Capacity returns the capacity of the queue.
	Capacity() int
}

Queue defines a producer-consumer exchange that can be backed by, for example, the memory-based ring buffer queue (boundedMemoryQueue) or a disk-based queue (persistentQueue).
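To make the Offer and Consume contract concrete, here is a minimal, self-contained sketch of a channel-backed queue that follows the documented behavior. It is not the package's boundedMemoryQueue: chanQueue, newChanQueue, stop, and fillAndDrain are hypothetical names, the component.Component methods are omitted, and the sentinel errors are local stand-ins. One assumption to note: this sketch lets consumers drain remaining items after a stop before Consume returns false.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Local stand-ins for the package's sentinel errors.
var (
	errQueueIsFull    = errors.New("sending queue is full")
	errQueueIsStopped = errors.New("sending queue is stopped")
)

// chanQueue is a hypothetical channel-backed queue, not the real implementation.
type chanQueue[T any] struct {
	mu      sync.RWMutex
	stopped bool
	items   chan T
}

func newChanQueue[T any](capacity int) *chanQueue[T] {
	return &chanQueue[T]{items: make(chan T, capacity)}
}

// Offer never blocks: it enqueues the item or reports why it could not.
func (q *chanQueue[T]) Offer(_ context.Context, item T) error {
	q.mu.RLock()
	defer q.mu.RUnlock()
	if q.stopped {
		return errQueueIsStopped
	}
	select {
	case q.items <- item:
		return nil
	default:
		return errQueueIsFull
	}
}

// Consume blocks until an item is available and applies fn to it. It
// returns false once the queue has been stopped and drained.
func (q *chanQueue[T]) Consume(fn func(ctx context.Context, item T)) bool {
	item, ok := <-q.items
	if !ok {
		return false
	}
	fn(context.Background(), item)
	return true
}

func (q *chanQueue[T]) Size() int     { return len(q.items) }
func (q *chanQueue[T]) Capacity() int { return cap(q.items) }

// stop closes the channel so blocked consumers wake up; it is idempotent.
func (q *chanQueue[T]) stop() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if !q.stopped {
		q.stopped = true
		close(q.items)
	}
}

// fillAndDrain offers three items to a queue of capacity two, stops the
// queue, offers once more, then drains whatever was accepted.
func fillAndDrain() (errs []error, consumed []int) {
	q := newChanQueue[int](2)
	for i := 1; i <= 3; i++ {
		errs = append(errs, q.Offer(context.Background(), i))
	}
	q.stop()
	errs = append(errs, q.Offer(context.Background(), 4))
	for q.Consume(func(_ context.Context, item int) { consumed = append(consumed, item) }) {
	}
	return errs, consumed
}

func main() {
	errs, consumed := fillAndDrain()
	fmt.Println(errs, consumed)
}
```

The read lock in Offer and the write lock in stop ensure that no send can race with closing the channel, which would otherwise panic.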

func NewBoundedMemoryQueue

func NewBoundedMemoryQueue[T any](capacity int) Queue[T]

NewBoundedMemoryQueue constructs a new queue of the specified capacity.

func NewPersistentQueue

func NewPersistentQueue[T any](capacity int, dataType component.DataType, storageID component.ID, marshaler func(req T) ([]byte, error), unmarshaler func([]byte) (T, error), set exporter.CreateSettings) Queue[T]

NewPersistentQueue creates a new queue backed by file storage. The name and signal must form a unique combination that identifies the queue storage.
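The marshaler and unmarshaler parameters have the shapes func(req T) ([]byte, error) and func([]byte) (T, error). As an illustration only, the sketch below shows a pair of functions with those shapes for a hypothetical request type. logRequest, marshalLogRequest, and unmarshalLogRequest are assumptions made for this example, and JSON is an arbitrary encoding choice, not necessarily what any real exporter uses.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// logRequest is a hypothetical request type used only for illustration.
type logRequest struct {
	Tenant  string   `json:"tenant"`
	Records []string `json:"records"`
}

// marshalLogRequest has the shape func(req T) ([]byte, error).
func marshalLogRequest(req logRequest) ([]byte, error) {
	return json.Marshal(req)
}

// unmarshalLogRequest has the shape func([]byte) (T, error).
func unmarshalLogRequest(data []byte) (logRequest, error) {
	var req logRequest
	if err := json.Unmarshal(data, &req); err != nil {
		return logRequest{}, fmt.Errorf("decoding stored request: %w", err)
	}
	return req, nil
}

func main() {
	data, err := marshalLogRequest(logRequest{Tenant: "a", Records: []string{"x", "y"}})
	if err != nil {
		panic(err)
	}
	req, err := unmarshalLogRequest(data)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s -> %+v\n", data, req)
}
```

Whatever encoding is chosen, the two functions need to round-trip: a request that was marshaled and stored must decode back to an equivalent value when it is read again.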

type QueueConsumers added in v0.90.0

type QueueConsumers[T any] struct {
	// contains filtered or unexported fields
}

func NewQueueConsumers added in v0.90.0

func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T)) *QueueConsumers[T]

func (*QueueConsumers[T]) Shutdown added in v0.90.0

func (qc *QueueConsumers[T]) Shutdown(ctx context.Context) error

Shutdown ensures that the queue and all consumers are stopped.

func (*QueueConsumers[T]) Start added in v0.90.0

func (qc *QueueConsumers[T]) Start(ctx context.Context, host component.Host) error

Start ensures that the queue and all consumers are started.
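The general pattern behind a fixed pool of consumers can be sketched as follows: each worker goroutine loops on Consume until it returns false, and shutdown waits for every worker to exit. This is a hedged illustration, not the QueueConsumers implementation. consumable, chanSource, workerPool, and sumWithWorkers are hypothetical names, and the stand-in queue is a plain channel whose closure plays the role of stopping the queue.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// consumable is the subset of the Queue contract that workers need.
type consumable[T any] interface {
	Consume(func(ctx context.Context, item T)) bool
}

// chanSource is a hypothetical stand-in queue: Consume returns false once
// the channel has been closed and drained.
type chanSource[T any] struct{ items chan T }

func (s *chanSource[T]) Consume(fn func(context.Context, T)) bool {
	item, ok := <-s.items
	if !ok {
		return false
	}
	fn(context.Background(), item)
	return true
}

// workerPool is a hypothetical sketch of the consumer side.
type workerPool[T any] struct {
	queue       consumable[T]
	numWorkers  int
	consumeFunc func(context.Context, T)
	wg          sync.WaitGroup
}

// start launches numWorkers goroutines that consume until the queue stops.
func (p *workerPool[T]) start() {
	for i := 0; i < p.numWorkers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for p.queue.Consume(p.consumeFunc) {
			}
		}()
	}
}

// wait blocks until every worker has observed the stopped queue.
func (p *workerPool[T]) wait() { p.wg.Wait() }

// sumWithWorkers pushes 1..n through the pool and returns the total.
func sumWithWorkers(n, workers int) int64 {
	src := &chanSource[int]{items: make(chan int, n)}
	var total int64
	pool := &workerPool[int]{
		queue:      src,
		numWorkers: workers,
		consumeFunc: func(_ context.Context, item int) {
			atomic.AddInt64(&total, int64(item))
		},
	}
	pool.start()
	for i := 1; i <= n; i++ {
		src.items <- i
	}
	close(src.items) // stands in for stopping the queue
	pool.wait()
	return atomic.LoadInt64(&total)
}

func main() {
	fmt.Println(sumWithWorkers(100, 4))
}
```

Because several workers call consumeFunc concurrently, the callback must be safe for concurrent use; the sketch uses an atomic counter for that reason.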
