internal

package
v0.85.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 11, 2023 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewMockStorageExtension added in v0.84.0

func NewMockStorageExtension(getClientError error) storage.Extension

Types

type ProducerConsumerQueue

type ProducerConsumerQueue interface {
	// Start starts the queue with a given number of goroutines consuming items from the queue
	// and passing them into the consumer callback.
	Start(ctx context.Context, host component.Host, set QueueSettings) error
	// Produce is used by the producer to submit new item to the queue. Returns false if the item wasn't added
	// to the queue due to queue overflow.
	Produce(item Request) bool
	// Size returns the current Size of the queue
	Size() int
	// Stop stops all consumers, as well as the length reporter if started,
	// and releases the items channel. It blocks until all consumers have stopped.
	Stop()
	// Capacity returns the capacity of the queue.
	Capacity() int
	// IsPersistent returns true if the queue is persistent.
	// TODO: Do not expose this method if the interface moves to a public package.
	IsPersistent() bool
}

ProducerConsumerQueue defines a producer-consumer exchange which can be backed by e.g. the memory-based ring buffer queue (boundedMemoryQueue) or via a disk-based queue (persistentQueue)

func NewBoundedMemoryQueue

func NewBoundedMemoryQueue(capacity int, numConsumers int) ProducerConsumerQueue

NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional callback for dropped items (e.g. useful to emit metrics).

func NewPersistentQueue

func NewPersistentQueue(capacity int, numConsumers int, storageID component.ID, marshaler RequestMarshaler,
	unmarshaler RequestUnmarshaler) ProducerConsumerQueue

NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage

type QueueSettings added in v0.85.0

type QueueSettings struct {
	exporter.CreateSettings
	DataType component.DataType
	Callback func(item Request)
}

type Request

type Request interface {
	// Context returns the context.Context of the requests.
	Context() context.Context

	// SetContext updates the context.Context of the requests.
	SetContext(context.Context)

	Export(ctx context.Context) error

	// OnError returns a new Request may contain the items left to be sent if some items failed to process and can be retried.
	// Otherwise, it should return the original Request.
	OnError(error) Request

	// Count returns the count of spans/metric points or log records.
	Count() int

	// OnProcessingFinished calls the optional callback function to handle cleanup after all processing is finished
	OnProcessingFinished()

	// SetOnProcessingFinished allows to set an optional callback function to do the cleanup (e.g. remove the item from persistent queue)
	SetOnProcessingFinished(callback func())
}

Request defines capabilities required for persistent storage of a request

type RequestMarshaler added in v0.84.0

type RequestMarshaler func(Request) ([]byte, error)

RequestMarshaler defines a function which takes a request and marshals it into a byte slice

type RequestUnmarshaler

type RequestUnmarshaler func([]byte) (Request, error)

RequestUnmarshaler defines a function which takes a byte slice and unmarshals it into a relevant request

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL