Documentation ¶
Index ¶
- func ExecuteAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)
- func ExecuteCatchAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)
- func ExecuteTryAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)
- func MarkErr(part *message.Part, span *tracing.Span, err error)
- type AutoObserved
- type AutoObservedBatched
- type BatchProcContext
- type BoundsCheckConfig
- type BranchConfig
- type CacheConfig
- type CompressConfig
- type Config
- type DecompressConfig
- type DedupeConfig
- type GrokConfig
- type GroupByConfig
- type GroupByElement
- type GroupByValueConfig
- type InsertPartConfig
- type JMESPathConfig
- type JQConfig
- type JSONSchemaConfig
- type LogConfig
- type MetricConfig
- type ParallelConfig
- type ParseLogConfig
- type Pipeline
- type PipelineConstructorFunc
- type ProtobufConfig
- type RateLimitConfig
- type SelectPartsConfig
- type SleepConfig
- type SplitConfig
- type SubprocessConfig
- type SwitchCaseConfig
- type SwitchConfig
- type V1
- type WhileConfig
- type WorkflowConfig
- type XMLConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ExecuteAll ¶ added in v4.1.0
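func ExecuteAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)
ExecuteAll attempts to execute a slice of processors against one or more message batches. Returns N resulting message batches or an error. The error may indicate either a NoAck in the event of the message being buffered or an unrecoverable failure.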
func ExecuteCatchAll ¶ added in v4.1.0
func ExecuteCatchAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)
ExecuteCatchAll attempts to execute a slice of processors against only those messages that have failed a processing step. Returns N resulting message batches or an error.
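The catch behaviour described above can be illustrated with a minimal sketch. It does not use the package's real types: msg, step and catchAll are hypothetical stand-ins for a message part, a processor and ExecuteCatchAll, and the sketch does not model what happens to the error flag afterwards.

```go
package main

import (
	"errors"
	"fmt"
)

// msg is a hypothetical stand-in for a message part and its error flag.
type msg struct {
	Data string
	Err  error
}

// step is a hypothetical stand-in for a processor acting on one message.
type step func(m *msg)

// catchAll runs every step, in order, against failed messages only.
// Messages without an error are left untouched.
func catchAll(steps []step, msgs []*msg) {
	for _, s := range steps {
		for _, m := range msgs {
			if m.Err != nil {
				s(m)
			}
		}
	}
}

// recoverData is an example step that records the failure in the payload.
func recoverData(m *msg) {
	m.Data = "recovered(" + m.Err.Error() + ")"
}

// run feeds one healthy and one failed message through catchAll.
func run() []string {
	msgs := []*msg{
		{Data: "ok"},
		{Data: "broken", Err: errors.New("parse failed")},
	}
	catchAll([]step{recoverData}, msgs)
	out := make([]string, len(msgs))
	for i, m := range msgs {
		out[i] = m.Data
	}
	return out
}

func main() {
	fmt.Println(run()) // prints [ok recovered(parse failed)]
}
```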
func ExecuteTryAll ¶ added in v4.1.0
func ExecuteTryAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)
ExecuteTryAll attempts to execute a slice of processors against messages. If a message has failed a processing step, it is prevented from being sent to subsequent processors. Returns N resulting message batches or an error. The error may indicate either a NoAck in the event of the message being buffered or an unrecoverable failure.
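As a rough illustration of the try behaviour, the sketch below skips any message that already carries an error. The names msg, step and tryAll are hypothetical stand-ins, not part of this package, and batching is reduced to a plain slice.

```go
package main

import (
	"errors"
	"fmt"
)

// msg is a hypothetical stand-in for a message part and its error flag.
type msg struct {
	Data string
	Err  error
}

// step is a hypothetical stand-in for a processor acting on one message.
type step func(m *msg)

// tryAll runs every step in order, but a message that has failed an earlier
// step is not passed to any later step.
func tryAll(steps []step, msgs []*msg) {
	for _, s := range steps {
		for _, m := range msgs {
			if m.Err == nil {
				s(m)
			}
		}
	}
}

// validate flags empty payloads as failed.
func validate(m *msg) {
	if m.Data == "" {
		m.Err = errors.New("empty payload")
	}
}

// exclaim appends a marker so we can see which messages reached it.
func exclaim(m *msg) {
	m.Data += "!"
}

// run sends a valid and an invalid message through validate then exclaim.
func run() []string {
	msgs := []*msg{{Data: "hi"}, {Data: ""}}
	tryAll([]step{validate, exclaim}, msgs)
	out := make([]string, len(msgs))
	for i, m := range msgs {
		if m.Err != nil {
			out[i] = "failed: " + m.Err.Error()
			continue
		}
		out[i] = m.Data
	}
	return out
}

func main() {
	fmt.Println(run()) // prints [hi! failed: empty payload]
}
```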
Types ¶
type AutoObserved ¶ added in v4.14.0
type AutoObserved interface {
	// Process a message into one or more resulting messages, or return an error
	// if one occurred during processing, in which case the message will
	// continue unchanged except for having that error now affiliated with it.
	//
	// If zero messages are returned and the error is nil then the message is
	// filtered.
	Process(ctx context.Context, p *message.Part) ([]*message.Part, error)

	// Close the component, blocks until either the underlying resources are
	// cleaned up or the context is cancelled. Returns an error if the context
	// is cancelled.
	Close(ctx context.Context) error
}
AutoObserved is a simpler processor interface to implement than V1 as it is not required to emit observability information within the implementation itself.
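The three outcomes of Process (results, filter, error) can be sketched as follows. This is a sketch under stated assumptions rather than a real Benthos processor: Part and autoObserved are hypothetical local stand-ins for message.Part and AutoObserved so that the example compiles with the standard library alone.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// Part is a hypothetical stand-in for message.Part, holding only a payload.
type Part struct {
	Data []byte
}

// autoObserved mirrors the method set of AutoObserved, with Part swapped in
// for message.Part.
type autoObserved interface {
	Process(ctx context.Context, p *Part) ([]*Part, error)
	Close(ctx context.Context) error
}

// upperProc upper-cases payloads, filters empty ones and rejects "bad".
type upperProc struct{}

func (upperProc) Process(ctx context.Context, p *Part) ([]*Part, error) {
	switch {
	case len(p.Data) == 0:
		// Zero parts with a nil error: the message is filtered.
		return nil, nil
	case string(p.Data) == "bad":
		// An error: the message continues unchanged, flagged by the caller.
		return nil, errors.New("rejected payload")
	}
	return []*Part{{Data: []byte(strings.ToUpper(string(p.Data)))}}, nil
}

// Close has no resources to release in this sketch.
func (upperProc) Close(ctx context.Context) error { return nil }

// describe runs one payload through the processor and summarises the outcome.
func describe(p autoObserved, payload string) string {
	out, err := p.Process(context.Background(), &Part{Data: []byte(payload)})
	switch {
	case err != nil:
		return "error: " + err.Error()
	case len(out) == 0:
		return "filtered"
	}
	return string(out[0].Data)
}

func main() {
	var p autoObserved = upperProc{}
	for _, in := range []string{"hello", "", "bad"} {
		fmt.Printf("%q -> %s\n", in, describe(p, in))
	}
}
```

Note that the implementation emits no metrics, logs or traces itself; per the description above, that responsibility sits outside the AutoObserved implementation.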
type AutoObservedBatched ¶ added in v4.14.0
type AutoObservedBatched interface {
	// Process a batch of messages into one or more resulting batches, or return
	// an error if one occurred during processing, in which case all messages
	// will continue unchanged except for having that error now affiliated with
	// them.
	//
	// In order to associate individual messages with an error please use
	// ctx.OnError instead of msg.ErrorSet. They are similar, but using
	// ctx.OnError ensures observability data is updated as well as the message
	// being affiliated with the error.
	//
	// If zero message batches are returned and the error is nil then all
	// messages are filtered.
	ProcessBatch(ctx *BatchProcContext, b message.Batch) ([]message.Batch, error)

	// Close the component, blocks until either the underlying resources are
	// cleaned up or the context is cancelled. Returns an error if the context
	// is cancelled.
	Close(ctx context.Context) error
}
AutoObservedBatched is a simpler processor interface to implement than V1 as it is not required to emit observability information within the implementation itself.
type BatchProcContext ¶ added in v4.14.0
type BatchProcContext struct {
// contains filtered or unexported fields
}
BatchProcContext provides methods for triggering observability updates and accessing processor specific spans.
func TestBatchProcContext ¶ added in v4.14.0
func TestBatchProcContext(ctx context.Context, spans []*tracing.Span, parts []*message.Part) *BatchProcContext
TestBatchProcContext creates a context for batch processors. It is safe to provide nil spans and nil parts for testing purposes.
func (*BatchProcContext) Context ¶ added in v4.14.0
func (b *BatchProcContext) Context() context.Context
Context returns the underlying processor context.Context.
func (*BatchProcContext) OnError ¶ added in v4.14.0
func (b *BatchProcContext) OnError(err error, index int, p *message.Part)
OnError should be called when an individual message has encountered an error. Use it instead of .ErrorSet(), because it also performs the observability updates.
This method can be called with index -1 in order to set generalised observability information without marking specific message errors.
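The difference between a per-message call and a call with index -1 can be sketched like this. The types below are hypothetical stand-ins: batchCtx replaces BatchProcContext and a single counter replaces real metrics and spans, so the internals are an assumption and only the calling pattern follows the description above.

```go
package main

import (
	"errors"
	"fmt"
)

// part is a hypothetical stand-in for message.Part.
type part struct {
	Data string
	Err  error
}

// batchCtx is a hypothetical stand-in for BatchProcContext. It tracks one
// counter in place of real metrics and tracing spans.
type batchCtx struct {
	parts      []*part
	errorCount int
}

// OnError always updates the counter. It marks a message only when given a
// part, or an index that falls inside the batch (assumed behaviour).
func (b *batchCtx) OnError(err error, index int, p *part) {
	b.errorCount++
	if p == nil && index >= 0 && index < len(b.parts) {
		p = b.parts[index]
	}
	if p != nil {
		p.Err = err
	}
}

// run reports one message-specific error and one general error.
func run() (int, []bool) {
	parts := []*part{{Data: "a"}, {Data: "b"}}
	ctx := &batchCtx{parts: parts}

	// Message-specific: the second part is flagged and the counter moves.
	ctx.OnError(errors.New("bad part"), 1, parts[1])

	// General: index -1 and a nil part update the counter only.
	ctx.OnError(errors.New("upstream timeout"), -1, nil)

	flagged := make([]bool, len(parts))
	for i, p := range parts {
		flagged[i] = p.Err != nil
	}
	return ctx.errorCount, flagged
}

func main() {
	count, flagged := run()
	fmt.Println(count, flagged) // prints 2 [false true]
}
```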
type BoundsCheckConfig ¶ added in v4.1.0
type BoundsCheckConfig struct {
	MaxParts int `json:"max_parts" yaml:"max_parts"`
	MinParts int `json:"min_parts" yaml:"min_parts"`
	MaxPartSize int `json:"max_part_size" yaml:"max_part_size"`
	MinPartSize int `json:"min_part_size" yaml:"min_part_size"`
}
BoundsCheckConfig contains configuration fields for the BoundsCheck processor.
func NewBoundsCheckConfig ¶ added in v4.1.0
func NewBoundsCheckConfig() BoundsCheckConfig
NewBoundsCheckConfig returns a BoundsCheckConfig with default values.
type BranchConfig ¶ added in v4.1.0
type BranchConfig struct {
	RequestMap string `json:"request_map" yaml:"request_map"`
	Processors []Config `json:"processors" yaml:"processors"`
	ResultMap string `json:"result_map" yaml:"result_map"`
}
BranchConfig contains configuration fields for the Branch processor.
func NewBranchConfig ¶ added in v4.1.0
func NewBranchConfig() BranchConfig
NewBranchConfig returns a BranchConfig with default values.
type CacheConfig ¶ added in v4.1.0
type CacheConfig struct {
	Resource string `json:"resource" yaml:"resource"`
	Operator string `json:"operator" yaml:"operator"`
	Key string `json:"key" yaml:"key"`
	Value string `json:"value" yaml:"value"`
	TTL string `json:"ttl" yaml:"ttl"`
}
CacheConfig contains configuration fields for the Cache processor.
func NewCacheConfig ¶ added in v4.1.0
func NewCacheConfig() CacheConfig
NewCacheConfig returns a CacheConfig with default values.
type CompressConfig ¶ added in v4.1.0
type CompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Level int `json:"level" yaml:"level"`
}
CompressConfig contains configuration fields for the Compress processor.
func NewCompressConfig ¶ added in v4.1.0
func NewCompressConfig() CompressConfig
NewCompressConfig returns a CompressConfig with default values.
type Config ¶ added in v4.1.0
type Config struct {
	Label string `json:"label" yaml:"label"`
	Type string `json:"type" yaml:"type"`
	Bloblang string `json:"bloblang" yaml:"bloblang"`
	BoundsCheck BoundsCheckConfig `json:"bounds_check" yaml:"bounds_check"`
	Branch BranchConfig `json:"branch" yaml:"branch"`
	Cache CacheConfig `json:"cache" yaml:"cache"`
	Catch []Config `json:"catch" yaml:"catch"`
	Compress CompressConfig `json:"compress" yaml:"compress"`
	Decompress DecompressConfig `json:"decompress" yaml:"decompress"`
	Dedupe DedupeConfig `json:"dedupe" yaml:"dedupe"`
	ForEach []Config `json:"for_each" yaml:"for_each"`
	Grok GrokConfig `json:"grok" yaml:"grok"`
	GroupBy GroupByConfig `json:"group_by" yaml:"group_by"`
	GroupByValue GroupByValueConfig `json:"group_by_value" yaml:"group_by_value"`
	InsertPart InsertPartConfig `json:"insert_part" yaml:"insert_part"`
	JMESPath JMESPathConfig `json:"jmespath" yaml:"jmespath"`
	JQ JQConfig `json:"jq" yaml:"jq"`
	JSONSchema JSONSchemaConfig `json:"json_schema" yaml:"json_schema"`
	Log LogConfig `json:"log" yaml:"log"`
	Metric MetricConfig `json:"metric" yaml:"metric"`
	Noop struct{} `json:"noop" yaml:"noop"`
	Plugin any `json:"plugin,omitempty" yaml:"plugin,omitempty"`
	Parallel ParallelConfig `json:"parallel" yaml:"parallel"`
	ParseLog ParseLogConfig `json:"parse_log" yaml:"parse_log"`
	Protobuf ProtobufConfig `json:"protobuf" yaml:"protobuf"`
	RateLimit RateLimitConfig `json:"rate_limit" yaml:"rate_limit"`
	Resource string `json:"resource" yaml:"resource"`
	SelectParts SelectPartsConfig `json:"select_parts" yaml:"select_parts"`
	Sleep SleepConfig `json:"sleep" yaml:"sleep"`
	Split SplitConfig `json:"split" yaml:"split"`
	Subprocess SubprocessConfig `json:"subprocess" yaml:"subprocess"`
	Switch SwitchConfig `json:"switch" yaml:"switch"`
	SyncResponse struct{} `json:"sync_response" yaml:"sync_response"`
	Try []Config `json:"try" yaml:"try"`
	While WhileConfig `json:"while" yaml:"while"`
	Workflow WorkflowConfig `json:"workflow" yaml:"workflow"`
	XML XMLConfig `json:"xml" yaml:"xml"`
}
Config is the all-encompassing configuration struct for all processor types.
Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.
type DecompressConfig ¶ added in v4.1.0
type DecompressConfig struct {
Algorithm string `json:"algorithm" yaml:"algorithm"`
}
DecompressConfig contains configuration fields for the Decompress processor.
func NewDecompressConfig ¶ added in v4.1.0
func NewDecompressConfig() DecompressConfig
NewDecompressConfig returns a DecompressConfig with default values.
type DedupeConfig ¶ added in v4.1.0
type DedupeConfig struct {
	Cache string `json:"cache" yaml:"cache"`
	Key string `json:"key" yaml:"key"`
	DropOnCacheErr bool `json:"drop_on_err" yaml:"drop_on_err"`
}
DedupeConfig contains configuration fields for the Dedupe processor.
func NewDedupeConfig ¶ added in v4.1.0
func NewDedupeConfig() DedupeConfig
NewDedupeConfig returns a DedupeConfig with default values.
type GrokConfig ¶ added in v4.1.0
type GrokConfig struct {
	Expressions []string `json:"expressions" yaml:"expressions"`
	RemoveEmpty bool `json:"remove_empty_values" yaml:"remove_empty_values"`
	NamedOnly bool `json:"named_captures_only" yaml:"named_captures_only"`
	UseDefaults bool `json:"use_default_patterns" yaml:"use_default_patterns"`
	PatternPaths []string `json:"pattern_paths" yaml:"pattern_paths"`
	PatternDefinitions map[string]string `json:"pattern_definitions" yaml:"pattern_definitions"`
}
GrokConfig contains configuration fields for the Grok processor.
func NewGrokConfig ¶ added in v4.1.0
func NewGrokConfig() GrokConfig
NewGrokConfig returns a GrokConfig with default values.
type GroupByConfig ¶ added in v4.1.0
type GroupByConfig []GroupByElement
GroupByConfig is a configuration struct containing fields for the GroupBy processor, which breaks message batches down into N batches of a smaller size according to conditions.
func NewGroupByConfig ¶ added in v4.1.0
func NewGroupByConfig() GroupByConfig
NewGroupByConfig returns a GroupByConfig with default values.
type GroupByElement ¶ added in v4.1.0
type GroupByElement struct {
	Check string `json:"check" yaml:"check"`
	Processors []Config `json:"processors" yaml:"processors"`
}
GroupByElement represents a group determined by a condition and a list of group specific processors.
type GroupByValueConfig ¶ added in v4.1.0
type GroupByValueConfig struct {
Value string `json:"value" yaml:"value"`
}
GroupByValueConfig is a configuration struct containing fields for the GroupByValue processor, which breaks message batches down into N batches of a smaller size according to a function interpolated string evaluated per message part.
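The grouping described above can be sketched in a few lines. This is not the processor's implementation: groupByValue is a hypothetical helper, the key function stands in for the interpolated Value string, and keeping groups in first-seen order is an assumption of the sketch.

```go
package main

import "fmt"

// groupByValue splits a batch into one batch per distinct key, keeping
// batches in first-seen order and messages in their original order.
func groupByValue(batch []string, key func(string) string) [][]string {
	index := map[string]int{} // key -> position of its batch in out
	var out [][]string
	for _, m := range batch {
		k := key(m)
		i, ok := index[k]
		if !ok {
			i = len(out)
			index[k] = i
			out = append(out, nil)
		}
		out[i] = append(out[i], m)
	}
	return out
}

// firstLetter is an example key: the first byte of the payload.
func firstLetter(s string) string {
	if s == "" {
		return ""
	}
	return s[:1]
}

func main() {
	batch := []string{"apple", "banana", "avocado", "blueberry", "cherry"}
	fmt.Println(groupByValue(batch, firstLetter))
	// prints [[apple avocado] [banana blueberry] [cherry]]
}
```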
func NewGroupByValueConfig ¶ added in v4.1.0
func NewGroupByValueConfig() GroupByValueConfig
NewGroupByValueConfig returns a GroupByValueConfig with default values.
type InsertPartConfig ¶ added in v4.1.0
type InsertPartConfig struct {
	Index int `json:"index" yaml:"index"`
	Content string `json:"content" yaml:"content"`
}
InsertPartConfig contains configuration fields for the InsertPart processor.
func NewInsertPartConfig ¶ added in v4.1.0
func NewInsertPartConfig() InsertPartConfig
NewInsertPartConfig returns an InsertPartConfig with default values.
type JMESPathConfig ¶ added in v4.1.0
type JMESPathConfig struct {
Query string `json:"query" yaml:"query"`
}
JMESPathConfig contains configuration fields for the JMESPath processor.
func NewJMESPathConfig ¶ added in v4.1.0
func NewJMESPathConfig() JMESPathConfig
NewJMESPathConfig returns a JMESPathConfig with default values.
type JQConfig ¶ added in v4.1.0
type JQConfig struct {
	Query string `json:"query" yaml:"query"`
	Raw bool `json:"raw" yaml:"raw"`
	OutputRaw bool `json:"output_raw" yaml:"output_raw"`
}
JQConfig contains configuration fields for the JQ processor.
func NewJQConfig ¶ added in v4.1.0
func NewJQConfig() JQConfig
NewJQConfig returns a JQConfig with default values.
type JSONSchemaConfig ¶ added in v4.1.0
type JSONSchemaConfig struct {
	SchemaPath string `json:"schema_path" yaml:"schema_path"`
	Schema string `json:"schema" yaml:"schema"`
}
JSONSchemaConfig is a configuration struct containing fields for the jsonschema processor.
func NewJSONSchemaConfig ¶ added in v4.1.0
func NewJSONSchemaConfig() JSONSchemaConfig
NewJSONSchemaConfig returns a JSONSchemaConfig with default values.
type LogConfig ¶ added in v4.1.0
type LogConfig struct {
	Level string `json:"level" yaml:"level"`
	Fields map[string]string `json:"fields" yaml:"fields"`
	FieldsMapping string `json:"fields_mapping" yaml:"fields_mapping"`
	Message string `json:"message" yaml:"message"`
}
LogConfig contains configuration fields for the Log processor.
func NewLogConfig ¶ added in v4.1.0
func NewLogConfig() LogConfig
NewLogConfig returns a LogConfig with default values.
type MetricConfig ¶ added in v4.1.0
type MetricConfig struct {
	Type string `json:"type" yaml:"type"`
	Name string `json:"name" yaml:"name"`
	Labels map[string]string `json:"labels" yaml:"labels"`
	Value string `json:"value" yaml:"value"`
}
MetricConfig contains configuration fields for the Metric processor.
func NewMetricConfig ¶ added in v4.1.0
func NewMetricConfig() MetricConfig
NewMetricConfig returns a MetricConfig with default values.
type ParallelConfig ¶ added in v4.1.0
type ParallelConfig struct {
	Cap int `json:"cap" yaml:"cap"`
	Processors []Config `json:"processors" yaml:"processors"`
}
ParallelConfig is a config struct containing fields for the Parallel processor.
func NewParallelConfig ¶ added in v4.1.0
func NewParallelConfig() ParallelConfig
NewParallelConfig returns a default ParallelConfig.
type ParseLogConfig ¶ added in v4.1.0
type ParseLogConfig struct {
	Format string `json:"format" yaml:"format"`
	Codec string `json:"codec" yaml:"codec"`
	BestEffort bool `json:"best_effort" yaml:"best_effort"`
	WithRFC3339 bool `json:"allow_rfc3339" yaml:"allow_rfc3339"`
	WithYear string `json:"default_year" yaml:"default_year"`
	WithTimezone string `json:"default_timezone" yaml:"default_timezone"`
}
ParseLogConfig contains configuration fields for the ParseLog processor.
func NewParseLogConfig ¶ added in v4.1.0
func NewParseLogConfig() ParseLogConfig
NewParseLogConfig returns a ParseLogConfig with default values.
type Pipeline ¶
type Pipeline interface {
	// TransactionChan returns a channel used for consuming transactions from
	// this type. Every transaction received must be resolved before another
	// transaction will be sent.
	TransactionChan() <-chan message.Transaction

	// Consume starts the type receiving transactions from a Transactor.
	Consume(<-chan message.Transaction) error

	// TriggerCloseNow signals that the component should close immediately,
	// messages in flight will be dropped.
	TriggerCloseNow()

	// WaitForClose blocks until the component has closed down or the context is
	// cancelled. Closing occurs either when the input transaction channel is
	// closed and messages are flushed (and acked), or when TriggerCloseNow is
	// called.
	WaitForClose(ctx context.Context) error
}
Pipeline is an interface that implements channel based consumer and producer methods for streaming data through a processing pipeline.
type PipelineConstructorFunc ¶
PipelineConstructorFunc is a constructor to be called for each parallel stream pipeline thread in order to construct a custom pipeline implementation.
type ProtobufConfig ¶ added in v4.1.0
type ProtobufConfig struct {
	Operator string `json:"operator" yaml:"operator"`
	Message string `json:"message" yaml:"message"`
	ImportPaths []string `json:"import_paths" yaml:"import_paths"`
}
ProtobufConfig contains configuration fields for the Protobuf processor.
func NewProtobufConfig ¶ added in v4.1.0
func NewProtobufConfig() ProtobufConfig
NewProtobufConfig returns a ProtobufConfig with default values.
type RateLimitConfig ¶ added in v4.1.0
type RateLimitConfig struct {
Resource string `json:"resource" yaml:"resource"`
}
RateLimitConfig contains configuration fields for the RateLimit processor.
func NewRateLimitConfig ¶ added in v4.1.0
func NewRateLimitConfig() RateLimitConfig
NewRateLimitConfig returns a RateLimitConfig with default values.
type SelectPartsConfig ¶ added in v4.1.0
type SelectPartsConfig struct {
Parts []int `json:"parts" yaml:"parts"`
}
SelectPartsConfig contains configuration fields for the SelectParts processor.
func NewSelectPartsConfig ¶ added in v4.1.0
func NewSelectPartsConfig() SelectPartsConfig
NewSelectPartsConfig returns a SelectPartsConfig with default values.
type SleepConfig ¶ added in v4.1.0
type SleepConfig struct {
Duration string `json:"duration" yaml:"duration"`
}
SleepConfig contains configuration fields for the Sleep processor.
func NewSleepConfig ¶ added in v4.1.0
func NewSleepConfig() SleepConfig
NewSleepConfig returns a SleepConfig with default values.
type SplitConfig ¶ added in v4.1.0
type SplitConfig struct {
	Size int `json:"size" yaml:"size"`
	ByteSize int `json:"byte_size" yaml:"byte_size"`
}
SplitConfig is a configuration struct containing fields for the Split processor, which breaks message batches down into batches of a smaller size.
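A minimal sketch of splitting a batch by message count follows. splitBySize is a hypothetical helper that models only the Size field; ByteSize is not modelled, and treating a non-positive size as "do not split" is an assumption of the sketch.

```go
package main

import "fmt"

// splitBySize breaks a batch into consecutive batches of at most size
// messages. A non-positive size returns the batch whole (assumed behaviour).
func splitBySize(batch []string, size int) [][]string {
	if size <= 0 || len(batch) <= size {
		return [][]string{batch}
	}
	var out [][]string
	for start := 0; start < len(batch); start += size {
		end := start + size
		if end > len(batch) {
			end = len(batch) // the final batch may be shorter
		}
		out = append(out, batch[start:end])
	}
	return out
}

func main() {
	batch := []string{"a", "b", "c", "d", "e"}
	fmt.Println(splitBySize(batch, 2)) // prints [[a b] [c d] [e]]
}
```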
func NewSplitConfig ¶ added in v4.1.0
func NewSplitConfig() SplitConfig
NewSplitConfig returns a SplitConfig with default values.
type SubprocessConfig ¶ added in v4.1.0
type SubprocessConfig struct {
	Name string `json:"name" yaml:"name"`
	Args []string `json:"args" yaml:"args"`
	MaxBuffer int `json:"max_buffer" yaml:"max_buffer"`
	CodecSend string `json:"codec_send" yaml:"codec_send"`
	CodecRecv string `json:"codec_recv" yaml:"codec_recv"`
}
SubprocessConfig contains configuration fields for the Subprocess processor.
func NewSubprocessConfig ¶ added in v4.1.0
func NewSubprocessConfig() SubprocessConfig
NewSubprocessConfig returns a SubprocessConfig with default values.
type SwitchCaseConfig ¶ added in v4.1.0
type SwitchCaseConfig struct {
	Check string `json:"check" yaml:"check"`
	Processors []Config `json:"processors" yaml:"processors"`
	Fallthrough bool `json:"fallthrough" yaml:"fallthrough"`
}
SwitchCaseConfig contains a condition, processors and other fields for an individual case in the Switch processor.
func NewSwitchCaseConfig ¶ added in v4.1.0
func NewSwitchCaseConfig() SwitchCaseConfig
NewSwitchCaseConfig returns a new SwitchCaseConfig with default values.
func (*SwitchCaseConfig) UnmarshalJSON ¶ added in v4.1.0
func (s *SwitchCaseConfig) UnmarshalJSON(bytes []byte) error
UnmarshalJSON ensures that when parsing configs that are in a map or slice the default values are still applied.
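A common way to keep defaults when a config is decoded as a slice or map element is to seed the value with its defaults and then decode over it through an alias type. The sketch below shows that pattern with encoding/json. caseConfig, its max_retries field and newCaseConfig are hypothetical, chosen so that a non-zero default is visible; this is not the SwitchCaseConfig implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// caseConfig is a hypothetical config type with one non-zero default.
type caseConfig struct {
	Check      string `json:"check"`
	MaxRetries int    `json:"max_retries"`
}

// newCaseConfig returns a caseConfig with default values.
func newCaseConfig() caseConfig {
	return caseConfig{Check: "", MaxRetries: 3}
}

// UnmarshalJSON seeds the value with defaults before decoding, so fields
// missing from the input keep their default values.
func (c *caseConfig) UnmarshalJSON(b []byte) error {
	// alias has the same fields but no UnmarshalJSON method, which avoids
	// infinite recursion when decoding into it.
	type alias caseConfig
	a := alias(newCaseConfig())
	if err := json.Unmarshal(b, &a); err != nil {
		return err
	}
	*c = caseConfig(a)
	return nil
}

// parseCases decodes a JSON array of cases.
func parseCases(raw string) ([]caseConfig, error) {
	var cases []caseConfig
	err := json.Unmarshal([]byte(raw), &cases)
	return cases, err
}

func main() {
	cases, err := parseCases(`[{"check":"a"},{"check":"b","max_retries":7}]`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// The first element omitted max_retries and keeps the default of 3.
	fmt.Printf("%+v\n", cases)
}
```

Without the custom method, the first element would decode with MaxRetries set to 0, because slice elements start from the zero value rather than from the constructor's defaults.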
func (*SwitchCaseConfig) UnmarshalYAML ¶ added in v4.1.0
func (s *SwitchCaseConfig) UnmarshalYAML(unmarshal func(any) error) error
UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.
type SwitchConfig ¶ added in v4.1.0
type SwitchConfig []SwitchCaseConfig
SwitchConfig is a config struct containing fields for the Switch processor.
func NewSwitchConfig ¶ added in v4.1.0
func NewSwitchConfig() SwitchConfig
NewSwitchConfig returns a default SwitchConfig.
type V1 ¶
type V1 interface {
	// Process a batch of messages into one or more resulting batches, or return
	// an error if the entire batch could not be processed, currently the only
	// valid reason for returning an error is if the context was cancelled.
	//
	// If zero messages are returned and the error is nil then all messages are
	// filtered.
	ProcessBatch(ctx context.Context, b message.Batch) ([]message.Batch, error)

	// Close the component, blocks until either the underlying resources are
	// cleaned up or the context is cancelled. Returns an error if the context
	// is cancelled.
	Close(ctx context.Context) error
}
V1 is a common interface implemented by processors. The implementation of a V1 processor is responsible for all expected observability and error handling behaviour described within Benthos documentation.
func NewAutoObservedBatchedProcessor ¶ added in v4.14.0
func NewAutoObservedBatchedProcessor(typeStr string, p AutoObservedBatched, mgr component.Observability) V1
NewAutoObservedBatchedProcessor wraps an AutoObservedBatched processor with an implementation of V1 which handles observability information.
func NewAutoObservedProcessor ¶ added in v4.14.0
func NewAutoObservedProcessor(typeStr string, p AutoObserved, mgr component.Observability) V1
NewAutoObservedProcessor wraps an AutoObserved processor with an implementation of V1 which handles observability information.
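The wrapping idea can be sketched as an adapter that turns a per-message function into a batch processor and does the bookkeeping itself. Everything below is a hypothetical stand-in: part replaces message.Part, perMessage replaces AutoObserved.Process, and two counters replace real metrics, logging and tracing. It is an illustration of the pattern, not the wrapper this package returns.

```go
package main

import (
	"errors"
	"fmt"
)

// part is a hypothetical stand-in for message.Part.
type part struct {
	Data string
	Err  error
}

// perMessage mirrors the shape of AutoObserved.Process, minus the context.
type perMessage func(p *part) ([]*part, error)

// observed is a hypothetical wrapper with counters in place of metrics.
type observed struct {
	proc              perMessage
	received, errored int
}

// ProcessBatch applies proc to each message. A failing message passes
// through unchanged except for having the error attached to it.
func (o *observed) ProcessBatch(batch []*part) []*part {
	var out []*part
	for _, p := range batch {
		o.received++
		res, err := o.proc(p)
		if err != nil {
			o.errored++
			p.Err = err
			out = append(out, p)
			continue
		}
		out = append(out, res...)
	}
	return out
}

// rejectEmpty is an example per-message processor.
func rejectEmpty(p *part) ([]*part, error) {
	if p.Data == "" {
		return nil, errors.New("empty payload")
	}
	return []*part{{Data: p.Data + "!"}}, nil
}

// run wraps rejectEmpty and pushes a two-message batch through it.
func run() (int, int, []string) {
	o := &observed{proc: rejectEmpty}
	res := o.ProcessBatch([]*part{{Data: "a"}, {Data: ""}})
	out := make([]string, len(res))
	for i, p := range res {
		if p.Err != nil {
			out[i] = "failed: " + p.Err.Error()
			continue
		}
		out[i] = p.Data
	}
	return o.received, o.errored, out
}

func main() {
	fmt.Println(run()) // prints 2 1 [a! failed: empty payload]
}
```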
type WhileConfig ¶ added in v4.1.0
type WhileConfig struct {
	AtLeastOnce bool `json:"at_least_once" yaml:"at_least_once"`
	MaxLoops int `json:"max_loops" yaml:"max_loops"`
	Check string `json:"check" yaml:"check"`
	Processors []Config `json:"processors" yaml:"processors"`
}
WhileConfig is a config struct containing fields for the While processor.
func NewWhileConfig ¶ added in v4.1.0
func NewWhileConfig() WhileConfig
NewWhileConfig returns a default WhileConfig.
type WorkflowConfig ¶ added in v4.1.0
type WorkflowConfig struct {
	MetaPath string `json:"meta_path" yaml:"meta_path"`
	Order [][]string `json:"order" yaml:"order"`
	BranchResources []string `json:"branch_resources" yaml:"branch_resources"`
	Branches map[string]BranchConfig `json:"branches" yaml:"branches"`
}
WorkflowConfig is a config struct containing fields for the Workflow processor.
func NewWorkflowConfig ¶ added in v4.1.0
func NewWorkflowConfig() WorkflowConfig
NewWorkflowConfig returns a default WorkflowConfig.
type XMLConfig ¶ added in v4.1.0
type XMLConfig struct {
	Operator string `json:"operator" yaml:"operator"`
	Cast bool `json:"cast" yaml:"cast"`
}
XMLConfig contains configuration fields for the XML processor.
func NewXMLConfig ¶ added in v4.1.0
func NewXMLConfig() XMLConfig
NewXMLConfig returns an XMLConfig with default values.
Source Files ¶
- auto_observed.go
- config.go
- config_bounds_check.go
- config_branch.go
- config_cache.go
- config_compress.go
- config_decompress.go
- config_dedupe.go
- config_grok.go
- config_group_by.go
- config_group_by_value.go
- config_insert_part.go
- config_jmespath.go
- config_jq.go
- config_jsonschema.go
- config_log.go
- config_metric.go
- config_parallel.go
- config_parse_log.go
- config_protobuf.go
- config_rate_limit.go
- config_select_parts.go
- config_sleep.go
- config_split.go
- config_subprocess.go
- config_switch.go
- config_while.go
- config_workflow.go
- config_xml.go
- error.go
- execute.go
- interface.go