processor

package
v4.18.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 2, 2023 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExecuteAll added in v4.1.0

func ExecuteAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)

ExecuteAll attempts to execute a slice of processors to a message. Returns N resulting messages or a response. The response may indicate either a NoAck in the event of the message being buffered or an unrecoverable error.

func ExecuteCatchAll added in v4.1.0

func ExecuteCatchAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)

ExecuteCatchAll attempts to execute a slice of processors to only messages that have failed a processing step. Returns N resulting messages or a response.

func ExecuteTryAll added in v4.1.0

func ExecuteTryAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)

ExecuteTryAll attempts to execute a slice of processors to messages, if a message has failed a processing step it is prevented from being sent to subsequent processors. Returns N resulting messages or a response. The response may indicate either a NoAck in the event of the message being buffered or an unrecoverable error.

func MarkErr

func MarkErr(part *message.Part, span *tracing.Span, err error)

MarkErr marks a message part as having failed. This includes modifying metadata to contain this error as well as adding the error to a tracing span if the message has one.

Types

type AutoObserved added in v4.14.0

type AutoObserved interface {
	// Process a message into one or more resulting messages, or return an error
	// if one occurred during processing, in which case the message will
	// continue unchanged except for having that error now affiliated with it.
	//
	// If zero messages are returned and the error is nil then the message is
	// filtered.
	Process(ctx context.Context, p *message.Part) ([]*message.Part, error)

	// Close the component, blocks until either the underlying resources are
	// cleaned up or the context is cancelled. Returns an error if the context
	// is cancelled.
	Close(ctx context.Context) error
}

AutoObserved is a simpler processor interface to implement than V1 as it is not required to emit observability information within the implementation itself.

type AutoObservedBatched added in v4.14.0

type AutoObservedBatched interface {
	// Process a batch of messages into one or more resulting batches, or return
	// an error if one occurred during processing, in which case all messages
	// will continue unchanged except for having that error now affiliated with
	// them.
	//
	// In order to associate individual messages with an error please use
	// ctx.OnError instead of msg.ErrorSet. They are similar, but using
	// ctx.OnError ensures observability data is updated as well as the message
	// being affiliated with the error.
	//
	// If zero message batches are returned and the error is nil then all
	// messages are filtered.
	ProcessBatch(ctx *BatchProcContext, b message.Batch) ([]message.Batch, error)

	// Close the component, blocks until either the underlying resources are
	// cleaned up or the context is cancelled. Returns an error if the context
	// is cancelled.
	Close(ctx context.Context) error
}

AutoObservedBatched is a simpler processor interface to implement than V1 as it is not required to emit observability information within the implementation itself.

type BatchProcContext added in v4.14.0

type BatchProcContext struct {
	// contains filtered or unexported fields
}

BatchProcContext provides methods for triggering observability updates and accessing processor specific spans.

func TestBatchProcContext added in v4.14.0

func TestBatchProcContext(ctx context.Context, spans []*tracing.Span, parts []*message.Part) *BatchProcContext

TestBatchProcContext creates a context for batch processors. It's safe to provide nil spans and parts functions for testing purposes.

func (*BatchProcContext) Context added in v4.14.0

func (b *BatchProcContext) Context() context.Context

Context returns the underlying processor context.Context.

func (*BatchProcContext) OnError added in v4.14.0

func (b *BatchProcContext) OnError(err error, index int, p *message.Part)

OnError should be called when an individual message has encountered an error, this should be used instead of .ErrorSet() as it includes observability updates.

This method can be called with index -1 in order to set generalised observability information without marking specific message errors.

func (*BatchProcContext) Span added in v4.14.0

func (b *BatchProcContext) Span(index int) *tracing.Span

Span returns a span created specifically for the invocation of the processor. This can be used in order to add context to what the processor did.

type BoundsCheckConfig added in v4.1.0

type BoundsCheckConfig struct {
	MaxParts    int `json:"max_parts" yaml:"max_parts"`
	MinParts    int `json:"min_parts" yaml:"min_parts"`
	MaxPartSize int `json:"max_part_size" yaml:"max_part_size"`
	MinPartSize int `json:"min_part_size" yaml:"min_part_size"`
}

BoundsCheckConfig contains configuration fields for the BoundsCheck processor.

func NewBoundsCheckConfig added in v4.1.0

func NewBoundsCheckConfig() BoundsCheckConfig

NewBoundsCheckConfig returns a BoundsCheckConfig with default values.

type BranchConfig added in v4.1.0

type BranchConfig struct {
	RequestMap string   `json:"request_map" yaml:"request_map"`
	Processors []Config `json:"processors" yaml:"processors"`
	ResultMap  string   `json:"result_map" yaml:"result_map"`
}

BranchConfig contains configuration fields for the Branch processor.

func NewBranchConfig added in v4.1.0

func NewBranchConfig() BranchConfig

NewBranchConfig returns a BranchConfig with default values.

type CacheConfig added in v4.1.0

type CacheConfig struct {
	Resource string `json:"resource" yaml:"resource"`
	Operator string `json:"operator" yaml:"operator"`
	Key      string `json:"key" yaml:"key"`
	Value    string `json:"value" yaml:"value"`
	TTL      string `json:"ttl" yaml:"ttl"`
}

CacheConfig contains configuration fields for the Cache processor.

func NewCacheConfig added in v4.1.0

func NewCacheConfig() CacheConfig

NewCacheConfig returns a CacheConfig with default values.

type CompressConfig added in v4.1.0

type CompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Level     int    `json:"level" yaml:"level"`
}

CompressConfig contains configuration fields for the Compress processor.

func NewCompressConfig added in v4.1.0

func NewCompressConfig() CompressConfig

NewCompressConfig returns a CompressConfig with default values.

type Config added in v4.1.0

type Config struct {
	Label        string             `json:"label" yaml:"label"`
	Type         string             `json:"type" yaml:"type"`
	Bloblang     string             `json:"bloblang" yaml:"bloblang"`
	BoundsCheck  BoundsCheckConfig  `json:"bounds_check" yaml:"bounds_check"`
	Branch       BranchConfig       `json:"branch" yaml:"branch"`
	Cache        CacheConfig        `json:"cache" yaml:"cache"`
	Catch        []Config           `json:"catch" yaml:"catch"`
	Compress     CompressConfig     `json:"compress" yaml:"compress"`
	Decompress   DecompressConfig   `json:"decompress" yaml:"decompress"`
	Dedupe       DedupeConfig       `json:"dedupe" yaml:"dedupe"`
	ForEach      []Config           `json:"for_each" yaml:"for_each"`
	Grok         GrokConfig         `json:"grok" yaml:"grok"`
	GroupBy      GroupByConfig      `json:"group_by" yaml:"group_by"`
	GroupByValue GroupByValueConfig `json:"group_by_value" yaml:"group_by_value"`
	InsertPart   InsertPartConfig   `json:"insert_part" yaml:"insert_part"`
	JMESPath     JMESPathConfig     `json:"jmespath" yaml:"jmespath"`
	JQ           JQConfig           `json:"jq" yaml:"jq"`
	JSONSchema   JSONSchemaConfig   `json:"json_schema" yaml:"json_schema"`
	Log          LogConfig          `json:"log" yaml:"log"`
	Metric       MetricConfig       `json:"metric" yaml:"metric"`
	Noop         struct{}           `json:"noop" yaml:"noop"`
	Plugin       any                `json:"plugin,omitempty" yaml:"plugin,omitempty"`
	Parallel     ParallelConfig     `json:"parallel" yaml:"parallel"`
	ParseLog     ParseLogConfig     `json:"parse_log" yaml:"parse_log"`
	Protobuf     ProtobufConfig     `json:"protobuf" yaml:"protobuf"`
	RateLimit    RateLimitConfig    `json:"rate_limit" yaml:"rate_limit"`
	Resource     string             `json:"resource" yaml:"resource"`
	SelectParts  SelectPartsConfig  `json:"select_parts" yaml:"select_parts"`
	Sleep        SleepConfig        `json:"sleep" yaml:"sleep"`
	Split        SplitConfig        `json:"split" yaml:"split"`
	Subprocess   SubprocessConfig   `json:"subprocess" yaml:"subprocess"`
	Switch       SwitchConfig       `json:"switch" yaml:"switch"`
	SyncResponse struct{}           `json:"sync_response" yaml:"sync_response"`
	Try          []Config           `json:"try" yaml:"try"`
	While        WhileConfig        `json:"while" yaml:"while"`
	Workflow     WorkflowConfig     `json:"workflow" yaml:"workflow"`
	XML          XMLConfig          `json:"xml" yaml:"xml"`
}

Config is the all encompassing configuration struct for all processor types. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.

func NewConfig added in v4.1.0

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.

func (*Config) UnmarshalYAML added in v4.1.0

func (conf *Config) UnmarshalYAML(value *yaml.Node) error

UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.

type DecompressConfig added in v4.1.0

type DecompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
}

DecompressConfig contains configuration fields for the Decompress processor.

func NewDecompressConfig added in v4.1.0

func NewDecompressConfig() DecompressConfig

NewDecompressConfig returns a DecompressConfig with default values.

type DedupeConfig added in v4.1.0

type DedupeConfig struct {
	Cache          string `json:"cache" yaml:"cache"`
	Key            string `json:"key" yaml:"key"`
	DropOnCacheErr bool   `json:"drop_on_err" yaml:"drop_on_err"`
}

DedupeConfig contains configuration fields for the Dedupe processor.

func NewDedupeConfig added in v4.1.0

func NewDedupeConfig() DedupeConfig

NewDedupeConfig returns a DedupeConfig with default values.

type GrokConfig added in v4.1.0

type GrokConfig struct {
	Expressions        []string          `json:"expressions" yaml:"expressions"`
	RemoveEmpty        bool              `json:"remove_empty_values" yaml:"remove_empty_values"`
	NamedOnly          bool              `json:"named_captures_only" yaml:"named_captures_only"`
	UseDefaults        bool              `json:"use_default_patterns" yaml:"use_default_patterns"`
	PatternPaths       []string          `json:"pattern_paths" yaml:"pattern_paths"`
	PatternDefinitions map[string]string `json:"pattern_definitions" yaml:"pattern_definitions"`
}

GrokConfig contains configuration fields for the Grok processor.

func NewGrokConfig added in v4.1.0

func NewGrokConfig() GrokConfig

NewGrokConfig returns a GrokConfig with default values.

type GroupByConfig added in v4.1.0

type GroupByConfig []GroupByElement

GroupByConfig is a configuration struct containing fields for the GroupBy processor, which breaks message batches down into N batches of a smaller size according to conditions.

func NewGroupByConfig added in v4.1.0

func NewGroupByConfig() GroupByConfig

NewGroupByConfig returns a GroupByConfig with default values.

type GroupByElement added in v4.1.0

type GroupByElement struct {
	Check      string   `json:"check" yaml:"check"`
	Processors []Config `json:"processors" yaml:"processors"`
}

GroupByElement represents a group determined by a condition and a list of group specific processors.

type GroupByValueConfig added in v4.1.0

type GroupByValueConfig struct {
	Value string `json:"value" yaml:"value"`
}

GroupByValueConfig is a configuration struct containing fields for the GroupByValue processor, which breaks message batches down into N batches of a smaller size according to a function interpolated string evaluated per message part.

func NewGroupByValueConfig added in v4.1.0

func NewGroupByValueConfig() GroupByValueConfig

NewGroupByValueConfig returns a GroupByValueConfig with default values.

type InsertPartConfig added in v4.1.0

type InsertPartConfig struct {
	Index   int    `json:"index" yaml:"index"`
	Content string `json:"content" yaml:"content"`
}

InsertPartConfig contains configuration fields for the InsertPart processor.

func NewInsertPartConfig added in v4.1.0

func NewInsertPartConfig() InsertPartConfig

NewInsertPartConfig returns a InsertPartConfig with default values.

type JMESPathConfig added in v4.1.0

type JMESPathConfig struct {
	Query string `json:"query" yaml:"query"`
}

JMESPathConfig contains configuration fields for the JMESPath processor.

func NewJMESPathConfig added in v4.1.0

func NewJMESPathConfig() JMESPathConfig

NewJMESPathConfig returns a JMESPathConfig with default values.

type JQConfig added in v4.1.0

type JQConfig struct {
	Query     string `json:"query" yaml:"query"`
	Raw       bool   `json:"raw" yaml:"raw"`
	OutputRaw bool   `json:"output_raw" yaml:"output_raw"`
}

JQConfig contains configuration fields for the JQ processor.

func NewJQConfig added in v4.1.0

func NewJQConfig() JQConfig

NewJQConfig returns a JQConfig with default values.

type JSONSchemaConfig added in v4.1.0

type JSONSchemaConfig struct {
	SchemaPath string `json:"schema_path" yaml:"schema_path"`
	Schema     string `json:"schema" yaml:"schema"`
}

JSONSchemaConfig is a configuration struct containing fields for the jsonschema processor.

func NewJSONSchemaConfig added in v4.1.0

func NewJSONSchemaConfig() JSONSchemaConfig

NewJSONSchemaConfig returns a JSONSchemaConfig with default values.

type LogConfig added in v4.1.0

type LogConfig struct {
	Level         string            `json:"level" yaml:"level"`
	Fields        map[string]string `json:"fields" yaml:"fields"`
	FieldsMapping string            `json:"fields_mapping" yaml:"fields_mapping"`
	Message       string            `json:"message" yaml:"message"`
}

LogConfig contains configuration fields for the Log processor.

func NewLogConfig added in v4.1.0

func NewLogConfig() LogConfig

NewLogConfig returns a LogConfig with default values.

type MetricConfig added in v4.1.0

type MetricConfig struct {
	Type   string            `json:"type" yaml:"type"`
	Name   string            `json:"name" yaml:"name"`
	Labels map[string]string `json:"labels" yaml:"labels"`
	Value  string            `json:"value" yaml:"value"`
}

MetricConfig contains configuration fields for the Metric processor.

func NewMetricConfig added in v4.1.0

func NewMetricConfig() MetricConfig

NewMetricConfig returns a MetricConfig with default values.

type ParallelConfig added in v4.1.0

type ParallelConfig struct {
	Cap        int      `json:"cap" yaml:"cap"`
	Processors []Config `json:"processors" yaml:"processors"`
}

ParallelConfig is a config struct containing fields for the Parallel processor.

func NewParallelConfig added in v4.1.0

func NewParallelConfig() ParallelConfig

NewParallelConfig returns a default ParallelConfig.

type ParseLogConfig added in v4.1.0

type ParseLogConfig struct {
	Format       string `json:"format" yaml:"format"`
	Codec        string `json:"codec" yaml:"codec"`
	BestEffort   bool   `json:"best_effort" yaml:"best_effort"`
	WithRFC3339  bool   `json:"allow_rfc3339" yaml:"allow_rfc3339"`
	WithYear     string `json:"default_year" yaml:"default_year"`
	WithTimezone string `json:"default_timezone" yaml:"default_timezone"`
}

ParseLogConfig contains configuration fields for the ParseLog processor.

func NewParseLogConfig added in v4.1.0

func NewParseLogConfig() ParseLogConfig

NewParseLogConfig returns a ParseLogConfig with default values.

type Pipeline

type Pipeline interface {
	// TransactionChan returns a channel used for consuming transactions from
	// this type. Every transaction received must be resolved before another
	// transaction will be sent.
	TransactionChan() <-chan message.Transaction

	// Consume starts the type receiving transactions from a Transactor.
	Consume(<-chan message.Transaction) error

	// TriggerCloseNow signals that the component should close immediately,
	// messages in flight will be dropped.
	TriggerCloseNow()

	// WaitForClose blocks until the component has closed down or the context is
	// cancelled. Closing occurs either when the input transaction channel is
	// closed and messages are flushed (and acked), or when CloseNowAsync is
	// called.
	WaitForClose(ctx context.Context) error
}

Pipeline is an interface that implements channel based consumer and producer methods for streaming data through a processing pipeline.

type PipelineConstructorFunc

type PipelineConstructorFunc func() (Pipeline, error)

PipelineConstructorFunc is a constructor to be called for each parallel stream pipeline thread in order to construct a custom pipeline implementation.

type ProtobufConfig added in v4.1.0

type ProtobufConfig struct {
	Operator    string   `json:"operator" yaml:"operator"`
	Message     string   `json:"message" yaml:"message"`
	ImportPaths []string `json:"import_paths" yaml:"import_paths"`
}

ProtobufConfig contains configuration fields for the Protobuf processor.

func NewProtobufConfig added in v4.1.0

func NewProtobufConfig() ProtobufConfig

NewProtobufConfig returns a ProtobufConfig with default values.

type RateLimitConfig added in v4.1.0

type RateLimitConfig struct {
	Resource string `json:"resource" yaml:"resource"`
}

RateLimitConfig contains configuration fields for the RateLimit processor.

func NewRateLimitConfig added in v4.1.0

func NewRateLimitConfig() RateLimitConfig

NewRateLimitConfig returns a RateLimitConfig with default values.

type SelectPartsConfig added in v4.1.0

type SelectPartsConfig struct {
	Parts []int `json:"parts" yaml:"parts"`
}

SelectPartsConfig contains configuration fields for the SelectParts processor.

func NewSelectPartsConfig added in v4.1.0

func NewSelectPartsConfig() SelectPartsConfig

NewSelectPartsConfig returns a SelectPartsConfig with default values.

type SleepConfig added in v4.1.0

type SleepConfig struct {
	Duration string `json:"duration" yaml:"duration"`
}

SleepConfig contains configuration fields for the Sleep processor.

func NewSleepConfig added in v4.1.0

func NewSleepConfig() SleepConfig

NewSleepConfig returns a SleepConfig with default values.

type SplitConfig added in v4.1.0

type SplitConfig struct {
	Size     int `json:"size" yaml:"size"`
	ByteSize int `json:"byte_size" yaml:"byte_size"`
}

SplitConfig is a configuration struct containing fields for the Split processor, which breaks message batches down into batches of a smaller size.

func NewSplitConfig added in v4.1.0

func NewSplitConfig() SplitConfig

NewSplitConfig returns a SplitConfig with default values.

type SubprocessConfig added in v4.1.0

type SubprocessConfig struct {
	Name      string   `json:"name" yaml:"name"`
	Args      []string `json:"args" yaml:"args"`
	MaxBuffer int      `json:"max_buffer" yaml:"max_buffer"`
	CodecSend string   `json:"codec_send" yaml:"codec_send"`
	CodecRecv string   `json:"codec_recv" yaml:"codec_recv"`
}

SubprocessConfig contains configuration fields for the Subprocess processor.

func NewSubprocessConfig added in v4.1.0

func NewSubprocessConfig() SubprocessConfig

NewSubprocessConfig returns a SubprocessConfig with default values.

type SwitchCaseConfig added in v4.1.0

type SwitchCaseConfig struct {
	Check       string   `json:"check" yaml:"check"`
	Processors  []Config `json:"processors" yaml:"processors"`
	Fallthrough bool     `json:"fallthrough" yaml:"fallthrough"`
}

SwitchCaseConfig contains a condition, processors and other fields for an individual case in the Switch processor.

func NewSwitchCaseConfig added in v4.1.0

func NewSwitchCaseConfig() SwitchCaseConfig

NewSwitchCaseConfig returns a new SwitchCaseConfig with default values.

func (*SwitchCaseConfig) UnmarshalJSON added in v4.1.0

func (s *SwitchCaseConfig) UnmarshalJSON(bytes []byte) error

UnmarshalJSON ensures that when parsing configs that are in a map or slice the default values are still applied.

func (*SwitchCaseConfig) UnmarshalYAML added in v4.1.0

func (s *SwitchCaseConfig) UnmarshalYAML(unmarshal func(any) error) error

UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.

type SwitchConfig added in v4.1.0

type SwitchConfig []SwitchCaseConfig

SwitchConfig is a config struct containing fields for the Switch processor.

func NewSwitchConfig added in v4.1.0

func NewSwitchConfig() SwitchConfig

NewSwitchConfig returns a default SwitchConfig.

type V1

type V1 interface {
	// Process a batch of messages into one or more resulting batches, or return
	// an error if the entire batch could not be processed, currently the only
	// valid reason for returning an error is if the context was cancelled.
	//
	// If zero messages are returned and the error is nil then all messages are
	// filtered.
	ProcessBatch(ctx context.Context, b message.Batch) ([]message.Batch, error)

	// Close the component, blocks until either the underlying resources are
	// cleaned up or the context is cancelled. Returns an error if the context
	// is cancelled.
	Close(ctx context.Context) error
}

V1 is a common interface implemented by processors. The implementation of a V1 processor is responsible for all expected observability and error handling behaviour described within Benthos documentation.

func NewAutoObservedBatchedProcessor added in v4.14.0

func NewAutoObservedBatchedProcessor(typeStr string, p AutoObservedBatched, mgr component.Observability) V1

NewAutoObservedBatchProcessor wraps an AutoObservedBatched processor with an implementation of V1 which handles observability information.

func NewAutoObservedProcessor added in v4.14.0

func NewAutoObservedProcessor(typeStr string, p AutoObserved, mgr component.Observability) V1

NewAutoObservedProcessor wraps an AutoObserved processor with an implementation of V1 which handles observability information.

func Unwrap added in v4.14.0

func Unwrap(p V1) V1

Unwrap attempts to access a wrapped processor from the provided implementation where applicable, otherwise the provided processor is returned. This is necessary when access raw implementations that could have been wrapped in a tracing mechanism (or other).

type WhileConfig added in v4.1.0

type WhileConfig struct {
	AtLeastOnce bool     `json:"at_least_once" yaml:"at_least_once"`
	MaxLoops    int      `json:"max_loops" yaml:"max_loops"`
	Check       string   `json:"check" yaml:"check"`
	Processors  []Config `json:"processors" yaml:"processors"`
}

WhileConfig is a config struct containing fields for the While processor.

func NewWhileConfig added in v4.1.0

func NewWhileConfig() WhileConfig

NewWhileConfig returns a default WhileConfig.

type WorkflowConfig added in v4.1.0

type WorkflowConfig struct {
	MetaPath        string                  `json:"meta_path" yaml:"meta_path"`
	Order           [][]string              `json:"order" yaml:"order"`
	BranchResources []string                `json:"branch_resources" yaml:"branch_resources"`
	Branches        map[string]BranchConfig `json:"branches" yaml:"branches"`
}

WorkflowConfig is a config struct containing fields for the Workflow processor.

func NewWorkflowConfig added in v4.1.0

func NewWorkflowConfig() WorkflowConfig

NewWorkflowConfig returns a default WorkflowConfig.

type XMLConfig added in v4.1.0

type XMLConfig struct {
	Operator string `json:"operator" yaml:"operator"`
	Cast     bool   `json:"cast" yaml:"cast"`
}

XMLConfig contains configuration fields for the XML processor.

func NewXMLConfig added in v4.1.0

func NewXMLConfig() XMLConfig

NewXMLConfig returns a XMLConfig with default values.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL