processor

package
v1.4.1
Published: Jan 4, 2025 License: MIT Imports: 10 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExecuteAll

func ExecuteAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)

ExecuteAll attempts to apply a slice of processors to one or more message batches. It returns the N resulting batches or an error. The error may indicate either a NoAck, in the event of the message being buffered, or an unrecoverable failure.

func ExecuteCatchAll

func ExecuteCatchAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)

ExecuteCatchAll attempts to apply a slice of processors only to messages that have failed a processing step. It returns the N resulting batches or an error.

func ExecuteTryAll

func ExecuteTryAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)

ExecuteTryAll attempts to apply a slice of processors to messages. If a message has failed a processing step, it is prevented from being sent to subsequent processors. It returns the N resulting batches or an error. The error may indicate either a NoAck, in the event of the message being buffered, or an unrecoverable failure.
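The difference between the three execution modes can be sketched without importing this internal package. The example below is a simplified model, not the library's implementation: `part`, `proc`, `runAll`, `runTryAll`, `runCatchAll` and the three sample processors are hypothetical stand-ins that work on single parts rather than `message.Batch` values, and they ignore contexts and returned errors.

```go
package main

import (
	"errors"
	"fmt"
)

// part is a hypothetical stand-in for *message.Part: a payload plus the
// error (if any) left behind by a failed processing step.
type part struct {
	data string
	err  error
}

// proc is a hypothetical single-part processor, far simpler than V1.
type proc func(p *part)

// runAll applies every processor to every part, failed or not.
func runAll(procs []proc, parts []*part) {
	for _, pr := range procs {
		for _, p := range parts {
			pr(p)
		}
	}
}

// runTryAll stops sending a part to later processors once it has failed.
func runTryAll(procs []proc, parts []*part) {
	for _, pr := range procs {
		for _, p := range parts {
			if p.err == nil {
				pr(p)
			}
		}
	}
}

// runCatchAll applies the processors only to parts that arrive already failed.
func runCatchAll(procs []proc, parts []*part) {
	var failed []*part
	for _, p := range parts {
		if p.err != nil {
			failed = append(failed, p)
		}
	}
	runAll(procs, failed)
}

// failEmpty marks parts with an empty payload as failed.
func failEmpty(p *part) {
	if p.data == "" {
		p.err = errors.New("empty payload")
	}
}

// exclaim appends "!" so it is visible which parts a processor touched.
func exclaim(p *part) { p.data += "!" }

// useDefault is a recovery step: substitute a payload and clear the error.
func useDefault(p *part) { p.data, p.err = "default", nil }

func main() {
	parts := []*part{{data: "a"}, {data: ""}}

	runTryAll([]proc{failEmpty, exclaim}, parts)
	fmt.Printf("after try:   %q (err: %v), %q (err: %v)\n",
		parts[0].data, parts[0].err, parts[1].data, parts[1].err)

	runCatchAll([]proc{useDefault}, parts)
	fmt.Printf("after catch: %q (err: %v), %q (err: %v)\n",
		parts[0].data, parts[0].err, parts[1].data, parts[1].err)
}
```

In this model the empty part fails the first step and is skipped by `exclaim`, and only that part is later visited by the recovery step.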

func MarkErr

func MarkErr(part *message.Part, span *tracing.Span, err error)

MarkErr marks a message part as having failed. This includes modifying metadata to contain this error as well as adding the error to a tracing span if the message has one.

Types

type AutoObserved

type AutoObserved interface {
	// Process a message into one or more resulting messages, or return an error
	// if one occurred during processing, in which case the message will
	// continue unchanged except for having that error now affiliated with it.
	//
	// If zero messages are returned and the error is nil then the message is
	// filtered.
	Process(ctx context.Context, p *message.Part) ([]*message.Part, error)

	// Close the component, blocks until either the underlying resources are
	// cleaned up or the context is cancelled. Returns an error if the context
	// is cancelled.
	Close(ctx context.Context) error
}

AutoObserved is a simpler processor interface to implement than V1 as it is not required to emit observability information within the implementation itself.

type AutoObservedBatched

type AutoObservedBatched interface {
	// Process a batch of messages into one or more resulting batches, or return
	// an error if one occurred during processing, in which case all messages
	// will continue unchanged except for having that error now affiliated with
	// them.
	//
	// In order to associate individual messages with an error please use
	// ctx.OnError instead of msg.ErrorSet. They are similar, but using
	// ctx.OnError ensures observability data is updated as well as the message
	// being affiliated with the error.
	//
	// If zero message batches are returned and the error is nil then all
	// messages are filtered.
	ProcessBatch(ctx *BatchProcContext, b message.Batch) ([]message.Batch, error)

	// Close the component, blocks until either the underlying resources are
	// cleaned up or the context is cancelled. Returns an error if the context
	// is cancelled.
	Close(ctx context.Context) error
}

AutoObservedBatched is a simpler processor interface to implement than V1 as it is not required to emit observability information within the implementation itself.

type BatchProcContext

type BatchProcContext struct {
	// contains filtered or unexported fields
}

BatchProcContext provides methods for triggering observability updates and accessing processor specific spans.

func TestBatchProcContext

func TestBatchProcContext(ctx context.Context, spans []*tracing.Span, parts []*message.Part) *BatchProcContext

TestBatchProcContext creates a context for batch processors. It is safe to provide nil spans and parts for testing purposes.

func (*BatchProcContext) Context

func (b *BatchProcContext) Context() context.Context

Context returns the underlying processor context.Context.

func (*BatchProcContext) OnError

func (b *BatchProcContext) OnError(err error, index int, p *message.Part)

OnError should be called when an individual message has encountered an error. It should be used instead of .ErrorSet() because it also performs observability updates.

This method can be called with index -1 in order to set generalised observability information without marking specific message errors.
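The per-message error pattern can be sketched as follows. This is an assumption-laden model rather than the real type: `batchCtx` is a hypothetical stand-in for BatchProcContext whose OnError only counts failures and marks the part, and `part` and `parseAll` are invented for the example.

```go
package main

import (
	"fmt"
	"strconv"
)

// part is a hypothetical stand-in for *message.Part.
type part struct {
	data string
	err  error
}

// batchCtx is a hypothetical stand-in for BatchProcContext. The failures
// counter represents the observability data the real context would update.
type batchCtx struct {
	failures int
}

// OnError records the failure and, when a specific part is identified,
// associates the error with it. Index -1 only updates the counter.
func (c *batchCtx) OnError(err error, index int, p *part) {
	c.failures++
	if index >= 0 && p != nil {
		p.err = err
	}
}

// parseAll checks that every payload is an integer. A bad payload fails only
// its own part; the rest of the batch is unaffected.
func parseAll(ctx *batchCtx, batch []*part) []*part {
	for i, p := range batch {
		if _, err := strconv.Atoi(p.data); err != nil {
			ctx.OnError(err, i, p)
		}
	}
	return batch
}

func main() {
	ctx := &batchCtx{}
	batch := parseAll(ctx, []*part{{data: "1"}, {data: "x"}, {data: "3"}})
	for i, p := range batch {
		fmt.Printf("part %d: data=%q failed=%t\n", i, p.data, p.err != nil)
	}
	fmt.Println("failures recorded:", ctx.failures)
}
```

Routing the error through the context keeps the bookkeeping in one place, which is the reason the documentation prefers OnError over setting the error on the part directly.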

func (*BatchProcContext) Span

func (b *BatchProcContext) Span(index int) *tracing.Span

Span returns a span created specifically for the invocation of the processor. This can be used in order to add context to what the processor did.

type Config

type Config struct {
	Label  string `json:"label" yaml:"label"`
	Type   string `json:"type" yaml:"type"`
	Plugin any    `json:"plugin,omitempty" yaml:"plugin,omitempty"`
}

Config is the all-encompassing configuration struct for all processor types.

Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in ./internal/impl.

func FromAny

func FromAny(prov docs.Provider, value any) (conf Config, err error)

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values.

Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in ./internal/impl.

type Pipeline

type Pipeline interface {
	// TransactionChan returns a channel used for consuming transactions from
	// this type. Every transaction received must be resolved before another
	// transaction will be sent.
	TransactionChan() <-chan message.Transaction

	// Consume starts the type receiving transactions from a Transactor.
	Consume(<-chan message.Transaction) error

	// TriggerCloseNow signals that the component should close immediately,
	// messages in flight will be dropped.
	TriggerCloseNow()

	// WaitForClose blocks until the component has closed down or the context is
	// cancelled. Closing occurs either when the input transaction channel is
	// closed and messages are flushed (and acked), or when TriggerCloseNow is
	// called.
	WaitForClose(ctx context.Context) error
}

Pipeline is an interface that defines channel-based consumer and producer methods for streaming data through a processing pipeline.

type PipelineConstructorFunc

type PipelineConstructorFunc func() (Pipeline, error)

PipelineConstructorFunc is a constructor to be called for each parallel stream pipeline thread in order to construct a custom pipeline implementation.

type V1

type V1 interface {
	// Process a batch of messages into one or more resulting batches, or return
	// an error if the entire batch could not be processed. Currently the only
	// valid reason for returning an error is if the context was cancelled.
	//
	// If zero messages are returned and the error is nil then all messages are
	// filtered.
	ProcessBatch(ctx context.Context, b message.Batch) ([]message.Batch, error)

	// Close the component, blocks until either the underlying resources are
	// cleaned up or the context is cancelled. Returns an error if the context
	// is cancelled.
	Close(ctx context.Context) error
}

V1 is a common interface implemented by processors. The implementation of a V1 processor is responsible for all expected observability and error handling behaviour described within Bento documentation.

func NewAutoObservedBatchedProcessor

func NewAutoObservedBatchedProcessor(typeStr string, p AutoObservedBatched, mgr component.Observability) V1

NewAutoObservedBatchedProcessor wraps an AutoObservedBatched processor with an implementation of V1 which handles observability information.

func NewAutoObservedProcessor

func NewAutoObservedProcessor(typeStr string, p AutoObserved, mgr component.Observability) V1

NewAutoObservedProcessor wraps an AutoObserved processor with an implementation of V1 which handles observability information.

func Unwrap

func Unwrap(p V1) V1

Unwrap attempts to access a wrapped processor from the provided implementation where applicable; otherwise the provided processor is returned. This is necessary when accessing raw implementations that could have been wrapped in a tracing mechanism (or other).
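One common way to support this kind of unwrapping in Go is an optional interface checked with a type assertion. The sketch below shows that general pattern under stated assumptions: `processor`, `unwrapper`, `base`, `traced` and `unwrap` are hypothetical, and the source does not say which method name the real Unwrap looks for or whether it unwraps more than one layer.

```go
package main

import "fmt"

// processor is a hypothetical stand-in for V1, reduced to a single method.
type processor interface {
	Name() string
}

// unwrapper is a hypothetical optional interface that wrappers may implement
// to expose the processor they decorate.
type unwrapper interface {
	Unwrap() processor
}

// base is a raw implementation with nothing wrapped inside it.
type base struct{}

func (base) Name() string { return "base" }

// traced decorates another processor, as a tracing wrapper might.
type traced struct{ inner processor }

func (t traced) Name() string      { return "traced(" + t.inner.Name() + ")" }
func (t traced) Unwrap() processor { return t.inner }

// unwrap returns the wrapped processor when p exposes one, and p itself
// otherwise. This sketch removes a single layer only.
func unwrap(p processor) processor {
	if w, ok := p.(unwrapper); ok {
		return w.Unwrap()
	}
	return p
}

func main() {
	wrapped := traced{inner: base{}}
	fmt.Println(wrapped.Name())
	fmt.Println(unwrap(wrapped).Name())
	fmt.Println(unwrap(base{}).Name())
}
```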
