Documentation ¶
Index ¶
- func ExecuteAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)
- func ExecuteCatchAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)
- func ExecuteTryAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)
- func MarkErr(part *message.Part, span *tracing.Span, err error)
- type AutoObserved
- type AutoObservedBatched
- type BatchProcContext
- type BoundsCheckConfig
- type BranchConfig
- type CacheConfig
- type CompressConfig
- type Config
- type DecompressConfig
- type DedupeConfig
- type GrokConfig
- type GroupByConfig
- type GroupByElement
- type GroupByValueConfig
- type InsertPartConfig
- type JMESPathConfig
- type JQConfig
- type JSONSchemaConfig
- type LogConfig
- type MetricConfig
- type MongoDBConfig
- type ParallelConfig
- type ParseLogConfig
- type Pipeline
- type PipelineConstructorFunc
- type ProtobufConfig
- type RateLimitConfig
- type SelectPartsConfig
- type SleepConfig
- type SplitConfig
- type SubprocessConfig
- type SwitchCaseConfig
- type SwitchConfig
- type V1
- type WhileConfig
- type WorkflowConfig
- type XMLConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ExecuteAll ¶
ExecuteAll attempts to execute a slice of processors to a message. Returns N resulting messages or a response. The response may indicate either a NoAck in the event of the message being buffered or an unrecoverable error.
func ExecuteCatchAll ¶
func ExecuteCatchAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)
ExecuteCatchAll attempts to execute a slice of processors to only messages that have failed a processing step. Returns N resulting messages or a response.
func ExecuteTryAll ¶
ExecuteTryAll attempts to execute a slice of processors to messages, if a message has failed a processing step it is prevented from being sent to subsequent processors. Returns N resulting messages or a response. The response may indicate either a NoAck in the event of the message being buffered or an unrecoverable error.
Types ¶
type AutoObserved ¶
type AutoObserved interface { // Process a message into one or more resulting messages, or return an error // if one occurred during processing, in which case the message will // continue unchanged except for having that error now affiliated with it. // // If zero messages are returned and the error is nil then the message is // filtered. Process(ctx context.Context, p *message.Part) ([]*message.Part, error) // Close the component, blocks until either the underlying resources are // cleaned up or the context is cancelled. Returns an error if the context // is cancelled. Close(ctx context.Context) error }
AutoObserved is a simpler processor interface to implement than V1 as it is not required to emit observability information within the implementation itself.
type AutoObservedBatched ¶
type AutoObservedBatched interface { // Process a batch of messages into one or more resulting batches, or return // an error if one occurred during processing, in which case all messages // will continue unchanged except for having that error now affiliated with // them. // // In order to associate individual messages with an error please use // ctx.OnError instead of msg.ErrorSet. They are similar, but using // ctx.OnError ensures observability data is updated as well as the message // being affiliated with the error. // // If zero message batches are returned and the error is nil then all // messages are filtered. ProcessBatch(ctx *BatchProcContext, b message.Batch) ([]message.Batch, error) // Close the component, blocks until either the underlying resources are // cleaned up or the context is cancelled. Returns an error if the context // is cancelled. Close(ctx context.Context) error }
AutoObservedBatched is a simpler processor interface to implement than V1 as it is not required to emit observability information within the implementation itself.
type BatchProcContext ¶
type BatchProcContext struct {
// contains filtered or unexported fields
}
BatchProcContext provides methods for triggering observability updates and accessing processor specific spans.
func TestBatchProcContext ¶
func TestBatchProcContext(ctx context.Context, spans []*tracing.Span, parts []*message.Part) *BatchProcContext
TestBatchProcContext creates a context for batch processors. It's safe to provide nil spans and parts functions for testing purposes.
func (*BatchProcContext) Context ¶
func (b *BatchProcContext) Context() context.Context
Context returns the underlying processor context.Context.
func (*BatchProcContext) OnError ¶
func (b *BatchProcContext) OnError(err error, index int, p *message.Part)
OnError should be called when an individual message has encountered an error, this should be used instead of .ErrorSet() as it includes observability updates.
This method can be called with index -1 in order to set generalised observability information without marking specific message errors.
type BoundsCheckConfig ¶
type BoundsCheckConfig struct { MaxParts int `json:"max_parts" yaml:"max_parts"` MinParts int `json:"min_parts" yaml:"min_parts"` MaxPartSize int `json:"max_part_size" yaml:"max_part_size"` MinPartSize int `json:"min_part_size" yaml:"min_part_size"` }
BoundsCheckConfig contains configuration fields for the BoundsCheck processor.
func NewBoundsCheckConfig ¶
func NewBoundsCheckConfig() BoundsCheckConfig
NewBoundsCheckConfig returns a BoundsCheckConfig with default values.
type BranchConfig ¶
type BranchConfig struct { RequestMap string `json:"request_map" yaml:"request_map"` Processors []Config `json:"processors" yaml:"processors"` ResultMap string `json:"result_map" yaml:"result_map"` }
BranchConfig contains configuration fields for the Branch processor.
func NewBranchConfig ¶
func NewBranchConfig() BranchConfig
NewBranchConfig returns a BranchConfig with default values.
type CacheConfig ¶
type CacheConfig struct { Resource string `json:"resource" yaml:"resource"` Operator string `json:"operator" yaml:"operator"` Key string `json:"key" yaml:"key"` Value string `json:"value" yaml:"value"` TTL string `json:"ttl" yaml:"ttl"` }
CacheConfig contains configuration fields for the Cache processor.
func NewCacheConfig ¶
func NewCacheConfig() CacheConfig
NewCacheConfig returns a CacheConfig with default values.
type CompressConfig ¶
type CompressConfig struct { Algorithm string `json:"algorithm" yaml:"algorithm"` Level int `json:"level" yaml:"level"` }
CompressConfig contains configuration fields for the Compress processor.
func NewCompressConfig ¶
func NewCompressConfig() CompressConfig
NewCompressConfig returns a CompressConfig with default values.
type Config ¶
type Config struct { Label string `json:"label" yaml:"label"` Type string `json:"type" yaml:"type"` Bloblang string `json:"bloblang" yaml:"bloblang"` BoundsCheck BoundsCheckConfig `json:"bounds_check" yaml:"bounds_check"` Branch BranchConfig `json:"branch" yaml:"branch"` Cache CacheConfig `json:"cache" yaml:"cache"` Catch []Config `json:"catch" yaml:"catch"` Compress CompressConfig `json:"compress" yaml:"compress"` Decompress DecompressConfig `json:"decompress" yaml:"decompress"` Dedupe DedupeConfig `json:"dedupe" yaml:"dedupe"` ForEach []Config `json:"for_each" yaml:"for_each"` Grok GrokConfig `json:"grok" yaml:"grok"` GroupBy GroupByConfig `json:"group_by" yaml:"group_by"` GroupByValue GroupByValueConfig `json:"group_by_value" yaml:"group_by_value"` InsertPart InsertPartConfig `json:"insert_part" yaml:"insert_part"` JMESPath JMESPathConfig `json:"jmespath" yaml:"jmespath"` JQ JQConfig `json:"jq" yaml:"jq"` JSONSchema JSONSchemaConfig `json:"json_schema" yaml:"json_schema"` Log LogConfig `json:"log" yaml:"log"` Metric MetricConfig `json:"metric" yaml:"metric"` MongoDB MongoDBConfig `json:"mongodb" yaml:"mongodb"` Noop struct{} `json:"noop" yaml:"noop"` Plugin any `json:"plugin,omitempty" yaml:"plugin,omitempty"` Parallel ParallelConfig `json:"parallel" yaml:"parallel"` ParseLog ParseLogConfig `json:"parse_log" yaml:"parse_log"` Protobuf ProtobufConfig `json:"protobuf" yaml:"protobuf"` RateLimit RateLimitConfig `json:"rate_limit" yaml:"rate_limit"` Resource string `json:"resource" yaml:"resource"` SelectParts SelectPartsConfig `json:"select_parts" yaml:"select_parts"` Sleep SleepConfig `json:"sleep" yaml:"sleep"` Split SplitConfig `json:"split" yaml:"split"` Subprocess SubprocessConfig `json:"subprocess" yaml:"subprocess"` Switch SwitchConfig `json:"switch" yaml:"switch"` SyncResponse struct{} `json:"sync_response" yaml:"sync_response"` Try []Config `json:"try" yaml:"try"` While WhileConfig `json:"while" yaml:"while"` Workflow WorkflowConfig `json:"workflow" yaml:"workflow"` XML XMLConfig `json:"xml" yaml:"xml"` }
Config is the all encompassing configuration struct for all processor types. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.
type DecompressConfig ¶
type DecompressConfig struct {
Algorithm string `json:"algorithm" yaml:"algorithm"`
}
DecompressConfig contains configuration fields for the Decompress processor.
func NewDecompressConfig ¶
func NewDecompressConfig() DecompressConfig
NewDecompressConfig returns a DecompressConfig with default values.
type DedupeConfig ¶
type DedupeConfig struct { Cache string `json:"cache" yaml:"cache"` Key string `json:"key" yaml:"key"` DropOnCacheErr bool `json:"drop_on_err" yaml:"drop_on_err"` }
DedupeConfig contains configuration fields for the Dedupe processor.
func NewDedupeConfig ¶
func NewDedupeConfig() DedupeConfig
NewDedupeConfig returns a DedupeConfig with default values.
type GrokConfig ¶
type GrokConfig struct { Expressions []string `json:"expressions" yaml:"expressions"` RemoveEmpty bool `json:"remove_empty_values" yaml:"remove_empty_values"` NamedOnly bool `json:"named_captures_only" yaml:"named_captures_only"` UseDefaults bool `json:"use_default_patterns" yaml:"use_default_patterns"` PatternPaths []string `json:"pattern_paths" yaml:"pattern_paths"` PatternDefinitions map[string]string `json:"pattern_definitions" yaml:"pattern_definitions"` }
GrokConfig contains configuration fields for the Grok processor.
func NewGrokConfig ¶
func NewGrokConfig() GrokConfig
NewGrokConfig returns a GrokConfig with default values.
type GroupByConfig ¶
type GroupByConfig []GroupByElement
GroupByConfig is a configuration struct containing fields for the GroupBy processor, which breaks message batches down into N batches of a smaller size according to conditions.
func NewGroupByConfig ¶
func NewGroupByConfig() GroupByConfig
NewGroupByConfig returns a GroupByConfig with default values.
type GroupByElement ¶
type GroupByElement struct { Check string `json:"check" yaml:"check"` Processors []Config `json:"processors" yaml:"processors"` }
GroupByElement represents a group determined by a condition and a list of group specific processors.
type GroupByValueConfig ¶
type GroupByValueConfig struct {
Value string `json:"value" yaml:"value"`
}
GroupByValueConfig is a configuration struct containing fields for the GroupByValue processor, which breaks message batches down into N batches of a smaller size according to a function interpolated string evaluated per message part.
func NewGroupByValueConfig ¶
func NewGroupByValueConfig() GroupByValueConfig
NewGroupByValueConfig returns a GroupByValueConfig with default values.
type InsertPartConfig ¶
type InsertPartConfig struct { Index int `json:"index" yaml:"index"` Content string `json:"content" yaml:"content"` }
InsertPartConfig contains configuration fields for the InsertPart processor.
func NewInsertPartConfig ¶
func NewInsertPartConfig() InsertPartConfig
NewInsertPartConfig returns a InsertPartConfig with default values.
type JMESPathConfig ¶
type JMESPathConfig struct {
Query string `json:"query" yaml:"query"`
}
JMESPathConfig contains configuration fields for the JMESPath processor.
func NewJMESPathConfig ¶
func NewJMESPathConfig() JMESPathConfig
NewJMESPathConfig returns a JMESPathConfig with default values.
type JQConfig ¶
type JQConfig struct { Query string `json:"query" yaml:"query"` Raw bool `json:"raw" yaml:"raw"` OutputRaw bool `json:"output_raw" yaml:"output_raw"` }
JQConfig contains configuration fields for the JQ processor.
type JSONSchemaConfig ¶
type JSONSchemaConfig struct { SchemaPath string `json:"schema_path" yaml:"schema_path"` Schema string `json:"schema" yaml:"schema"` }
JSONSchemaConfig is a configuration struct containing fields for the jsonschema processor.
func NewJSONSchemaConfig ¶
func NewJSONSchemaConfig() JSONSchemaConfig
NewJSONSchemaConfig returns a JSONSchemaConfig with default values.
type LogConfig ¶
type LogConfig struct { Level string `json:"level" yaml:"level"` Fields map[string]string `json:"fields" yaml:"fields"` FieldsMapping string `json:"fields_mapping" yaml:"fields_mapping"` Message string `json:"message" yaml:"message"` }
LogConfig contains configuration fields for the Log processor.
func NewLogConfig ¶
func NewLogConfig() LogConfig
NewLogConfig returns a LogConfig with default values.
type MetricConfig ¶
type MetricConfig struct { Type string `json:"type" yaml:"type"` Name string `json:"name" yaml:"name"` Labels map[string]string `json:"labels" yaml:"labels"` Value string `json:"value" yaml:"value"` }
MetricConfig contains configuration fields for the Metric processor.
func NewMetricConfig ¶
func NewMetricConfig() MetricConfig
NewMetricConfig returns a MetricConfig with default values.
type MongoDBConfig ¶
type MongoDBConfig struct { MongoDB client.Config `json:",inline" yaml:",inline"` WriteConcern client.WriteConcern `json:"write_concern" yaml:"write_concern"` Operation string `json:"operation" yaml:"operation"` FilterMap string `json:"filter_map" yaml:"filter_map"` DocumentMap string `json:"document_map" yaml:"document_map"` Upsert bool `json:"upsert" yaml:"upsert"` HintMap string `json:"hint_map" yaml:"hint_map"` RetryConfig retries.Config `json:",inline" yaml:",inline"` JSONMarshalMode client.JSONMarshalMode `json:"json_marshal_mode" yaml:"json_marshal_mode"` }
MongoDBConfig contains configuration fields for the MongoDB processor.
func NewMongoDBConfig ¶
func NewMongoDBConfig() MongoDBConfig
NewMongoDBConfig returns a MongoDBConfig with default values.
type ParallelConfig ¶
type ParallelConfig struct { Cap int `json:"cap" yaml:"cap"` Processors []Config `json:"processors" yaml:"processors"` }
ParallelConfig is a config struct containing fields for the Parallel processor.
func NewParallelConfig ¶
func NewParallelConfig() ParallelConfig
NewParallelConfig returns a default ParallelConfig.
type ParseLogConfig ¶
type ParseLogConfig struct { Format string `json:"format" yaml:"format"` Codec string `json:"codec" yaml:"codec"` BestEffort bool `json:"best_effort" yaml:"best_effort"` WithRFC3339 bool `json:"allow_rfc3339" yaml:"allow_rfc3339"` WithYear string `json:"default_year" yaml:"default_year"` WithTimezone string `json:"default_timezone" yaml:"default_timezone"` }
ParseLogConfig contains configuration fields for the ParseLog processor.
func NewParseLogConfig ¶
func NewParseLogConfig() ParseLogConfig
NewParseLogConfig returns a ParseLogConfig with default values.
type Pipeline ¶
type Pipeline interface { // TransactionChan returns a channel used for consuming transactions from // this type. Every transaction received must be resolved before another // transaction will be sent. TransactionChan() <-chan message.Transaction // Consume starts the type receiving transactions from a Transactor. Consume(<-chan message.Transaction) error // TriggerCloseNow signals that the component should close immediately, // messages in flight will be dropped. TriggerCloseNow() // WaitForClose blocks until the component has closed down or the context is // cancelled. Closing occurs either when the input transaction channel is // closed and messages are flushed (and acked), or when CloseNowAsync is // called. WaitForClose(ctx context.Context) error }
Pipeline is an interface that implements channel based consumer and producer methods for streaming data through a processing pipeline.
type PipelineConstructorFunc ¶
PipelineConstructorFunc is a constructor to be called for each parallel stream pipeline thread in order to construct a custom pipeline implementation.
type ProtobufConfig ¶
type ProtobufConfig struct { Operator string `json:"operator" yaml:"operator"` Message string `json:"message" yaml:"message"` ImportPaths []string `json:"import_paths" yaml:"import_paths"` }
ProtobufConfig contains configuration fields for the Protobuf processor.
func NewProtobufConfig ¶
func NewProtobufConfig() ProtobufConfig
NewProtobufConfig returns a ProtobufConfig with default values.
type RateLimitConfig ¶
type RateLimitConfig struct {
Resource string `json:"resource" yaml:"resource"`
}
RateLimitConfig contains configuration fields for the RateLimit processor.
func NewRateLimitConfig ¶
func NewRateLimitConfig() RateLimitConfig
NewRateLimitConfig returns a RateLimitConfig with default values.
type SelectPartsConfig ¶
type SelectPartsConfig struct {
Parts []int `json:"parts" yaml:"parts"`
}
SelectPartsConfig contains configuration fields for the SelectParts processor.
func NewSelectPartsConfig ¶
func NewSelectPartsConfig() SelectPartsConfig
NewSelectPartsConfig returns a SelectPartsConfig with default values.
type SleepConfig ¶
type SleepConfig struct {
Duration string `json:"duration" yaml:"duration"`
}
SleepConfig contains configuration fields for the Sleep processor.
func NewSleepConfig ¶
func NewSleepConfig() SleepConfig
NewSleepConfig returns a SleepConfig with default values.
type SplitConfig ¶
type SplitConfig struct { Size int `json:"size" yaml:"size"` ByteSize int `json:"byte_size" yaml:"byte_size"` }
SplitConfig is a configuration struct containing fields for the Split processor, which breaks message batches down into batches of a smaller size.
func NewSplitConfig ¶
func NewSplitConfig() SplitConfig
NewSplitConfig returns a SplitConfig with default values.
type SubprocessConfig ¶
type SubprocessConfig struct { Name string `json:"name" yaml:"name"` Args []string `json:"args" yaml:"args"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` CodecSend string `json:"codec_send" yaml:"codec_send"` CodecRecv string `json:"codec_recv" yaml:"codec_recv"` }
SubprocessConfig contains configuration fields for the Subprocess processor.
func NewSubprocessConfig ¶
func NewSubprocessConfig() SubprocessConfig
NewSubprocessConfig returns a SubprocessConfig with default values.
type SwitchCaseConfig ¶
type SwitchCaseConfig struct { Check string `json:"check" yaml:"check"` Processors []Config `json:"processors" yaml:"processors"` Fallthrough bool `json:"fallthrough" yaml:"fallthrough"` }
SwitchCaseConfig contains a condition, processors and other fields for an individual case in the Switch processor.
func NewSwitchCaseConfig ¶
func NewSwitchCaseConfig() SwitchCaseConfig
NewSwitchCaseConfig returns a new SwitchCaseConfig with default values.
func (*SwitchCaseConfig) UnmarshalJSON ¶
func (s *SwitchCaseConfig) UnmarshalJSON(bytes []byte) error
UnmarshalJSON ensures that when parsing configs that are in a map or slice the default values are still applied.
func (*SwitchCaseConfig) UnmarshalYAML ¶
func (s *SwitchCaseConfig) UnmarshalYAML(unmarshal func(any) error) error
UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.
type SwitchConfig ¶
type SwitchConfig []SwitchCaseConfig
SwitchConfig is a config struct containing fields for the Switch processor.
func NewSwitchConfig ¶
func NewSwitchConfig() SwitchConfig
NewSwitchConfig returns a default SwitchConfig.
type V1 ¶
type V1 interface { // Process a batch of messages into one or more resulting batches, or return // an error if the entire batch could not be processed, currently the only // valid reason for returning an error is if the context was cancelled. // // If zero messages are returned and the error is nil then all messages are // filtered. ProcessBatch(ctx context.Context, b message.Batch) ([]message.Batch, error) // Close the component, blocks until either the underlying resources are // cleaned up or the context is cancelled. Returns an error if the context // is cancelled. Close(ctx context.Context) error }
V1 is a common interface implemented by processors. The implementation of a V1 processor is responsible for all expected observability and error handling behaviour described within Benthos documentation.
func NewAutoObservedBatchedProcessor ¶
func NewAutoObservedBatchedProcessor(typeStr string, p AutoObservedBatched, mgr component.Observability) V1
NewAutoObservedBatchProcessor wraps an AutoObservedBatched processor with an implementation of V1 which handles observability information.
func NewAutoObservedProcessor ¶
func NewAutoObservedProcessor(typeStr string, p AutoObserved, mgr component.Observability) V1
NewAutoObservedProcessor wraps an AutoObserved processor with an implementation of V1 which handles observability information.
type WhileConfig ¶
type WhileConfig struct { AtLeastOnce bool `json:"at_least_once" yaml:"at_least_once"` MaxLoops int `json:"max_loops" yaml:"max_loops"` Check string `json:"check" yaml:"check"` Processors []Config `json:"processors" yaml:"processors"` }
WhileConfig is a config struct containing fields for the While processor.
func NewWhileConfig ¶
func NewWhileConfig() WhileConfig
NewWhileConfig returns a default WhileConfig.
type WorkflowConfig ¶
type WorkflowConfig struct { MetaPath string `json:"meta_path" yaml:"meta_path"` Order [][]string `json:"order" yaml:"order"` BranchResources []string `json:"branch_resources" yaml:"branch_resources"` Branches map[string]BranchConfig `json:"branches" yaml:"branches"` }
WorkflowConfig is a config struct containing fields for the Workflow processor.
func NewWorkflowConfig ¶
func NewWorkflowConfig() WorkflowConfig
NewWorkflowConfig returns a default WorkflowConfig.
Source Files ¶
- auto_observed.go
- config.go
- config_bounds_check.go
- config_branch.go
- config_cache.go
- config_compress.go
- config_decompress.go
- config_dedupe.go
- config_grok.go
- config_group_by.go
- config_group_by_value.go
- config_insert_part.go
- config_jmespath.go
- config_jq.go
- config_jsonschema.go
- config_log.go
- config_metric.go
- config_mongodb.go
- config_parallel.go
- config_parse_log.go
- config_protobuf.go
- config_rate_limit.go
- config_select_parts.go
- config_sleep.go
- config_split.go
- config_subprocess.go
- config_switch.go
- config_while.go
- config_workflow.go
- config_xml.go
- error.go
- execute.go
- interface.go