internal

package
v0.0.0-...-450e989 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 7, 2023 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ProducerConsumerQueue

type ProducerConsumerQueue interface {
	// StartConsumers starts a given number of goroutines consuming items from the queue
	// and passing them into the consumer callback.
	StartConsumers(num int, callback func(item Request))
	// Produce is used by the producer to submit new item to the queue. Returns false if the item wasn't added
	// to the queue due to queue overflow.
	Produce(item Request) bool
	// Size returns the current Size of the queue
	Size() int
	// Stop stops all consumers, as well as the length reporter if started,
	// and releases the items channel. It blocks until all consumers have stopped.
	Stop()
}

ProducerConsumerQueue defines a producer-consumer exchange which can be backed by e.g. the memory-based ring buffer queue (boundedMemoryQueue) or via a disk-based queue (persistentQueue)

func NewBoundedMemoryQueue

func NewBoundedMemoryQueue(capacity int) ProducerConsumerQueue

NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional callback for dropped items (e.g. useful to emit metrics).

func NewPersistentQueue

func NewPersistentQueue(ctx context.Context, name string, signal component.DataType, capacity int, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) ProducerConsumerQueue

NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage

type Request

type Request interface {
	// Context returns the context.Context of the requests.
	Context() context.Context

	// SetContext updates the context.Context of the requests.
	SetContext(context.Context)

	Export(ctx context.Context) error

	// OnError returns a new Request may contain the items left to be sent if some items failed to process and can be retried.
	// Otherwise, it should return the original Request.
	OnError(error) Request

	// Count returns the count of spans/metric points or log records.
	Count() int

	// Marshal serializes the current request into a byte stream
	Marshal() ([]byte, error)

	// OnProcessingFinished calls the optional callback function to handle cleanup after all processing is finished
	OnProcessingFinished()

	// SetOnProcessingFinished allows to set an optional callback function to do the cleanup (e.g. remove the item from persistent queue)
	SetOnProcessingFinished(callback func())
}

Request defines capabilities required for persistent storage of a request

type RequestUnmarshaler

type RequestUnmarshaler func([]byte) (Request, error)

RequestUnmarshaler defines a function which takes a byte slice and unmarshals it into a relevant request

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL