Documentation ¶
Index ¶
- func ExecuteAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)
- func ExecuteCatchAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)
- func ExecuteTryAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)
- func MarkErr(part *message.Part, span *tracing.Span, err error)
- type AutoObserved
- type AutoObservedBatched
- type BatchProcContext
- type Config
- type Pipeline
- type PipelineConstructorFunc
- type V1
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ExecuteAll ¶
ExecuteAll attempts to execute a slice of processors to a message. Returns N resulting messages or a response. The response may indicate either a NoAck in the event of the message being buffered or an unrecoverable error.
func ExecuteCatchAll ¶
func ExecuteCatchAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)
ExecuteCatchAll attempts to execute a slice of processors to only messages that have failed a processing step. Returns N resulting messages or a response.
func ExecuteTryAll ¶
ExecuteTryAll attempts to execute a slice of processors to messages, if a message has failed a processing step it is prevented from being sent to subsequent processors. Returns N resulting messages or a response. The response may indicate either a NoAck in the event of the message being buffered or an unrecoverable error.
Types ¶
type AutoObserved ¶
type AutoObserved interface { // Process a message into one or more resulting messages, or return an error // if one occurred during processing, in which case the message will // continue unchanged except for having that error now affiliated with it. // // If zero messages are returned and the error is nil then the message is // filtered. Process(ctx context.Context, p *message.Part) ([]*message.Part, error) // Close the component, blocks until either the underlying resources are // cleaned up or the context is cancelled. Returns an error if the context // is cancelled. Close(ctx context.Context) error }
AutoObserved is a simpler processor interface to implement than V1 as it is not required to emit observability information within the implementation itself.
type AutoObservedBatched ¶
type AutoObservedBatched interface { // Process a batch of messages into one or more resulting batches, or return // an error if one occurred during processing, in which case all messages // will continue unchanged except for having that error now affiliated with // them. // // In order to associate individual messages with an error please use // ctx.OnError instead of msg.ErrorSet. They are similar, but using // ctx.OnError ensures observability data is updated as well as the message // being affiliated with the error. // // If zero message batches are returned and the error is nil then all // messages are filtered. ProcessBatch(ctx *BatchProcContext, b message.Batch) ([]message.Batch, error) // Close the component, blocks until either the underlying resources are // cleaned up or the context is cancelled. Returns an error if the context // is cancelled. Close(ctx context.Context) error }
AutoObservedBatched is a simpler processor interface to implement than V1 as it is not required to emit observability information within the implementation itself.
type BatchProcContext ¶
type BatchProcContext struct {
// contains filtered or unexported fields
}
BatchProcContext provides methods for triggering observability updates and accessing processor specific spans.
func TestBatchProcContext ¶
func TestBatchProcContext(ctx context.Context, spans []*tracing.Span, parts []*message.Part) *BatchProcContext
TestBatchProcContext creates a context for batch processors. It's safe to provide nil spans and parts functions for testing purposes.
func (*BatchProcContext) Context ¶
func (b *BatchProcContext) Context() context.Context
Context returns the underlying processor context.Context.
func (*BatchProcContext) OnError ¶
func (b *BatchProcContext) OnError(err error, index int, p *message.Part)
OnError should be called when an individual message has encountered an error, this should be used instead of .ErrorSet() as it includes observability updates.
This method can be called with index -1 in order to set generalised observability information without marking specific message errors.
type Config ¶
type Config struct { Label string `json:"label" yaml:"label"` Type string `json:"type" yaml:"type"` Plugin any `json:"plugin,omitempty" yaml:"plugin,omitempty"` }
Config is the all encompassing configuration struct for all processor types. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.
type Pipeline ¶
type Pipeline interface { // TransactionChan returns a channel used for consuming transactions from // this type. Every transaction received must be resolved before another // transaction will be sent. TransactionChan() <-chan message.Transaction // Consume starts the type receiving transactions from a Transactor. Consume(<-chan message.Transaction) error // TriggerCloseNow signals that the component should close immediately, // messages in flight will be dropped. TriggerCloseNow() // WaitForClose blocks until the component has closed down or the context is // cancelled. Closing occurs either when the input transaction channel is // closed and messages are flushed (and acked), or when CloseNowAsync is // called. WaitForClose(ctx context.Context) error }
Pipeline is an interface that implements channel based consumer and producer methods for streaming data through a processing pipeline.
type PipelineConstructorFunc ¶
PipelineConstructorFunc is a constructor to be called for each parallel stream pipeline thread in order to construct a custom pipeline implementation.
type V1 ¶
type V1 interface { // Process a batch of messages into one or more resulting batches, or return // an error if the entire batch could not be processed, currently the only // valid reason for returning an error is if the context was cancelled. // // If zero messages are returned and the error is nil then all messages are // filtered. ProcessBatch(ctx context.Context, b message.Batch) ([]message.Batch, error) // Close the component, blocks until either the underlying resources are // cleaned up or the context is cancelled. Returns an error if the context // is cancelled. Close(ctx context.Context) error }
V1 is a common interface implemented by processors. The implementation of a V1 processor is responsible for all expected observability and error handling behaviour described within Benthos documentation.
func NewAutoObservedBatchedProcessor ¶
func NewAutoObservedBatchedProcessor(typeStr string, p AutoObservedBatched, mgr component.Observability) V1
NewAutoObservedBatchedProcessor wraps an AutoObservedBatched processor with an implementation of V1 which handles observability information.
func NewAutoObservedProcessor ¶
func NewAutoObservedProcessor(typeStr string, p AutoObserved, mgr component.Observability) V1
NewAutoObservedProcessor wraps an AutoObserved processor with an implementation of V1 which handles observability information.