Documentation
Constants
This section is empty.
Variables
This section is empty.
Functions
This section is empty.
Types
type ProducerConsumerQueue
type ProducerConsumerQueue interface {
	// StartConsumers starts a given number of goroutines consuming items from the queue
	// and passing them into the consumer callback.
	StartConsumers(num int, callback func(item Request))
	// Produce is used by the producer to submit a new item to the queue. Returns false if the item wasn't added
	// to the queue due to queue overflow.
	Produce(item Request) bool
	// Size returns the current size of the queue.
	Size() int
	// Stop stops all consumers, as well as the length reporter if started,
	// and releases the items channel. It blocks until all consumers have stopped.
	Stop()
}
ProducerConsumerQueue defines a producer-consumer exchange that can be backed by, for example, the memory-based ring buffer queue (boundedMemoryQueue) or a disk-based queue (persistentQueue).
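To make the contract concrete, here is a minimal sketch of a queue with the same four methods, backed by a buffered channel. It is not the package's boundedMemoryQueue: `chanQueue`, `spans`, `runDemo`, and the one-method `Request` stand-in are all hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Request is a hypothetical stand-in for the package's Request interface,
// reduced to a single method to keep the sketch short.
type Request interface {
	Count() int
}

// spans is a hypothetical Request that carries only an item count.
type spans int

func (s spans) Count() int { return int(s) }

// chanQueue is an illustrative channel-backed bounded queue.
type chanQueue struct {
	items chan Request
	wg    sync.WaitGroup
}

func newChanQueue(capacity int) *chanQueue {
	return &chanQueue{items: make(chan Request, capacity)}
}

// StartConsumers starts num goroutines that pass each queued item to callback.
func (q *chanQueue) StartConsumers(num int, callback func(item Request)) {
	for i := 0; i < num; i++ {
		q.wg.Add(1)
		go func() {
			defer q.wg.Done()
			for item := range q.items {
				callback(item)
			}
		}()
	}
}

// Produce enqueues item without blocking and returns false on overflow.
func (q *chanQueue) Produce(item Request) bool {
	select {
	case q.items <- item:
		return true
	default:
		return false
	}
}

// Size returns the number of items currently buffered.
func (q *chanQueue) Size() int { return len(q.items) }

// Stop closes the channel and blocks until every consumer has drained the
// remaining items and returned. Produce must not be called after Stop.
func (q *chanQueue) Stop() {
	close(q.items)
	q.wg.Wait()
}

// runDemo offers three items to a queue of capacity 2 before any consumer
// runs, then starts two consumers and stops the queue.
func runDemo() (accepted, dropped int, consumed int64) {
	q := newChanQueue(2)
	for i := 1; i <= 3; i++ {
		if q.Produce(spans(i)) {
			accepted++
		} else {
			dropped++
		}
	}
	q.StartConsumers(2, func(item Request) {
		atomic.AddInt64(&consumed, int64(item.Count()))
	})
	q.Stop()
	return accepted, dropped, consumed
}

func main() {
	accepted, dropped, consumed := runDemo()
	fmt.Println(accepted, dropped, consumed) // prints "2 1 3"
}
```

The non-blocking `select` in `Produce` is what turns overflow into a `false` return instead of back-pressure on the producer; a real implementation may make different trade-offs.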
func NewBoundedMemoryQueue
func NewBoundedMemoryQueue(capacity int) ProducerConsumerQueue
NewBoundedMemoryQueue constructs a new queue of the specified capacity. The signature shown takes no callback for dropped items; a caller can detect a dropped item from the false return value of Produce (useful, for example, for emitting metrics).
func NewPersistentQueue
func NewPersistentQueue(ctx context.Context, name string, signal component.DataType, capacity int, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) ProducerConsumerQueue
NewPersistentQueue creates a new queue backed by file storage. The combination of name and signal must be unique, because it identifies the queue storage.
type Request
type Request interface {
	// Context returns the context.Context of the request.
	Context() context.Context
	// SetContext updates the context.Context of the request.
	SetContext(context.Context)
	Export(ctx context.Context) error
	// OnError returns a new Request that may contain the items left to be sent if some items failed to process and can be retried.
	// Otherwise, it should return the original Request.
	OnError(error) Request
	// Count returns the count of spans/metric points or log records.
	Count() int
	// Marshal serializes the current request into a byte stream.
	Marshal() ([]byte, error)
	// OnProcessingFinished calls the optional callback function to handle cleanup after all processing is finished.
	OnProcessingFinished()
	// SetOnProcessingFinished sets an optional callback function that does the cleanup (e.g. removes the item from the persistent queue).
	SetOnProcessingFinished(callback func())
}
Request defines capabilities required for persistent storage of a request.
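As an illustration of how the OnError and OnProcessingFinished methods might fit together, the sketch below implements the listed method set on a hypothetical `logsRequest`. The `Request` interface is re-declared locally so the example compiles on its own, and `partialError`, the JSON encoding, the no-op `Export`, and `demo` are assumptions made for this example rather than anything the package prescribes.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// Request is a local copy of the interface listed above.
type Request interface {
	Context() context.Context
	SetContext(context.Context)
	Export(ctx context.Context) error
	OnError(error) Request
	Count() int
	Marshal() ([]byte, error)
	OnProcessingFinished()
	SetOnProcessingFinished(callback func())
}

// partialError is a hypothetical error naming the records that failed.
type partialError struct{ failed []string }

func (e *partialError) Error() string {
	return fmt.Sprintf("%d records failed", len(e.failed))
}

// logsRequest is a hypothetical request holding log records.
type logsRequest struct {
	ctx      context.Context
	Records  []string `json:"records"`
	onFinish func()
}

var _ Request = (*logsRequest)(nil) // compile-time interface check

func (r *logsRequest) Context() context.Context       { return r.ctx }
func (r *logsRequest) SetContext(ctx context.Context) { r.ctx = ctx }

// Export is a no-op here; a real request would send the records somewhere.
func (r *logsRequest) Export(ctx context.Context) error { return nil }

// OnError returns a request holding only the failed records when the error
// says which ones failed; otherwise it returns the original request.
func (r *logsRequest) OnError(err error) Request {
	var pe *partialError
	if errors.As(err, &pe) {
		return &logsRequest{ctx: r.ctx, Records: pe.failed, onFinish: r.onFinish}
	}
	return r
}

func (r *logsRequest) Count() int { return len(r.Records) }

// Marshal encodes only the exported Records field; ctx and onFinish are skipped.
func (r *logsRequest) Marshal() ([]byte, error) { return json.Marshal(r) }

func (r *logsRequest) OnProcessingFinished() {
	if r.onFinish != nil {
		r.onFinish()
	}
}

func (r *logsRequest) SetOnProcessingFinished(callback func()) { r.onFinish = callback }

// demo exercises both OnError paths and the cleanup callback.
func demo() (retryCount int, sameOnOther bool, cleaned bool) {
	req := &logsRequest{ctx: context.Background(), Records: []string{"a", "b", "c"}}
	req.SetOnProcessingFinished(func() { cleaned = true })
	retry := req.OnError(&partialError{failed: []string{"b"}})
	same := req.OnError(errors.New("timeout"))
	retry.OnProcessingFinished()
	return retry.Count(), same == Request(req), cleaned
}

func main() {
	fmt.Println(demo()) // prints "1 true true"
}
```

Carrying the cleanup callback over to the retry request is one possible design: it means the cleanup still runs once the reduced request finishes processing.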
type RequestUnmarshaler
RequestUnmarshaler defines a function that takes a byte slice and unmarshals it into the relevant request.
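The round trip between Marshal and an unmarshaler can be sketched as follows. The type declaration is not shown above, so the signature `func([]byte) (Request, error)` used here is an assumption inferred from the description; `metricsRequest`, `unmarshalMetrics`, `roundTrip`, and the JSON encoding are hypothetical, and `Request` is reduced to the two methods the example needs.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Request is a reduced, local stand-in for the package's Request interface.
type Request interface {
	Count() int
	Marshal() ([]byte, error)
}

// RequestUnmarshaler uses an assumed signature: bytes in, request or error out.
type RequestUnmarshaler func(data []byte) (Request, error)

// metricsRequest is a hypothetical request holding metric points.
type metricsRequest struct {
	Points []float64 `json:"points"`
}

func (r *metricsRequest) Count() int { return len(r.Points) }

func (r *metricsRequest) Marshal() ([]byte, error) { return json.Marshal(r) }

// unmarshalMetrics is the counterpart of metricsRequest.Marshal.
func unmarshalMetrics(data []byte) (Request, error) {
	req := &metricsRequest{}
	if err := json.Unmarshal(data, req); err != nil {
		return nil, fmt.Errorf("unmarshal metrics request: %w", err)
	}
	return req, nil
}

// roundTrip serializes req and restores it, roughly what a disk-backed queue
// would do between Produce and the consumer callback.
func roundTrip(req Request, unmarshal RequestUnmarshaler) (Request, error) {
	data, err := req.Marshal()
	if err != nil {
		return nil, err
	}
	return unmarshal(data)
}

func main() {
	restored, err := roundTrip(&metricsRequest{Points: []float64{1.5, 2.5}}, unmarshalMetrics)
	if err != nil {
		panic(err)
	}
	fmt.Println(restored.Count()) // prints "2"
}
```

Whatever encoding is chosen, the unmarshaler passed to the queue has to match the request's own Marshal method, since the queue itself only sees bytes.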