Documentation ¶
Index ¶
- func ExecuteAll(procs []V1, msgs ...*message.Batch) ([]*message.Batch, error)
- func ExecuteCatchAll(procs []V1, msgs ...*message.Batch) ([]*message.Batch, error)
- func ExecuteTryAll(procs []V1, msgs ...*message.Batch) ([]*message.Batch, error)
- func IteratePartsWithSpanV2(tracer trace.TracerProvider, operationName string, parts []int, ...)
- func MarkErr(part *message.Part, span *tracing.Span, err error)
- type AWKConfig
- type AvroConfig
- type BoundsCheckConfig
- type BranchConfig
- type CacheConfig
- type CompressConfig
- type Config
- type DecompressConfig
- type DedupeConfig
- type GrokConfig
- type GroupByConfig
- type GroupByElement
- type GroupByValueConfig
- type HTTPConfig
- type InsertPartConfig
- type JMESPathConfig
- type JQConfig
- type JSONSchemaConfig
- type LogConfig
- type MetricConfig
- type MongoDBConfig
- type ParallelConfig
- type ParseLogConfig
- type Pipeline
- type PipelineConstructorFunc
- type ProtobufConfig
- type RateLimitConfig
- type SelectPartsConfig
- type SleepConfig
- type SplitConfig
- type SubprocessConfig
- type SwitchCaseConfig
- type SwitchConfig
- type V1
- type V2
- type V2Batched
- type WhileConfig
- type WorkflowConfig
- type XMLConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ExecuteAll ¶
ExecuteAll attempts to execute a slice of processors on a message. Returns N resulting messages or a response. The response may indicate either a NoAck in the event of the message being buffered or an unrecoverable error.
func ExecuteCatchAll ¶
ExecuteCatchAll attempts to execute a slice of processors only on messages that have failed a processing step. Returns N resulting messages or a response.
func ExecuteTryAll ¶
ExecuteTryAll attempts to execute a slice of processors on messages. If a message has failed a processing step, it is prevented from being sent to subsequent processors. Returns N resulting messages or a response. The response may indicate either a NoAck in the event of the message being buffered or an unrecoverable error.
func IteratePartsWithSpanV2 ¶
func IteratePartsWithSpanV2( tracer trace.TracerProvider, operationName string, parts []int, msg *message.Batch, iter func(int, *tracing.Span, *message.Part) error, )
IteratePartsWithSpanV2 iterates the parts of a message according to a slice of indexes (if empty all parts are iterated) and calls a func for each part along with a tracing span for that part. If an error is returned the part is flagged as failed and the span has the error logged.
Types ¶
type AWKConfig ¶
type AWKConfig struct { Codec string `json:"codec" yaml:"codec"` Program string `json:"program" yaml:"program"` }
AWKConfig contains configuration fields for the AWK processor.
func NewAWKConfig ¶
func NewAWKConfig() AWKConfig
NewAWKConfig returns an AWKConfig with default values.
type AvroConfig ¶
type AvroConfig struct { Operator string `json:"operator" yaml:"operator"` Encoding string `json:"encoding" yaml:"encoding"` Schema string `json:"schema" yaml:"schema"` SchemaPath string `json:"schema_path" yaml:"schema_path"` }
AvroConfig contains configuration fields for the Avro processor.
func NewAvroConfig ¶
func NewAvroConfig() AvroConfig
NewAvroConfig returns an AvroConfig with default values.
type BoundsCheckConfig ¶
type BoundsCheckConfig struct { MaxParts int `json:"max_parts" yaml:"max_parts"` MinParts int `json:"min_parts" yaml:"min_parts"` MaxPartSize int `json:"max_part_size" yaml:"max_part_size"` MinPartSize int `json:"min_part_size" yaml:"min_part_size"` }
BoundsCheckConfig contains configuration fields for the BoundsCheck processor.
func NewBoundsCheckConfig ¶
func NewBoundsCheckConfig() BoundsCheckConfig
NewBoundsCheckConfig returns a BoundsCheckConfig with default values.
type BranchConfig ¶
type BranchConfig struct { RequestMap string `json:"request_map" yaml:"request_map"` Processors []Config `json:"processors" yaml:"processors"` ResultMap string `json:"result_map" yaml:"result_map"` }
BranchConfig contains configuration fields for the Branch processor.
func NewBranchConfig ¶
func NewBranchConfig() BranchConfig
NewBranchConfig returns a BranchConfig with default values.
type CacheConfig ¶
type CacheConfig struct { Resource string `json:"resource" yaml:"resource"` Operator string `json:"operator" yaml:"operator"` Key string `json:"key" yaml:"key"` Value string `json:"value" yaml:"value"` TTL string `json:"ttl" yaml:"ttl"` }
CacheConfig contains configuration fields for the Cache processor.
func NewCacheConfig ¶
func NewCacheConfig() CacheConfig
NewCacheConfig returns a CacheConfig with default values.
type CompressConfig ¶
type CompressConfig struct { Algorithm string `json:"algorithm" yaml:"algorithm"` Level int `json:"level" yaml:"level"` }
CompressConfig contains configuration fields for the Compress processor.
func NewCompressConfig ¶
func NewCompressConfig() CompressConfig
NewCompressConfig returns a CompressConfig with default values.
type Config ¶
type Config struct { Label string `json:"label" yaml:"label"` Type string `json:"type" yaml:"type"` Avro AvroConfig `json:"avro" yaml:"avro"` AWK AWKConfig `json:"awk" yaml:"awk"` Bloblang string `json:"bloblang" yaml:"bloblang"` BoundsCheck BoundsCheckConfig `json:"bounds_check" yaml:"bounds_check"` Branch BranchConfig `json:"branch" yaml:"branch"` Cache CacheConfig `json:"cache" yaml:"cache"` Catch []Config `json:"catch" yaml:"catch"` Compress CompressConfig `json:"compress" yaml:"compress"` Decompress DecompressConfig `json:"decompress" yaml:"decompress"` Dedupe DedupeConfig `json:"dedupe" yaml:"dedupe"` ForEach []Config `json:"for_each" yaml:"for_each"` Grok GrokConfig `json:"grok" yaml:"grok"` GroupBy GroupByConfig `json:"group_by" yaml:"group_by"` GroupByValue GroupByValueConfig `json:"group_by_value" yaml:"group_by_value"` HTTP HTTPConfig `json:"http" yaml:"http"` InsertPart InsertPartConfig `json:"insert_part" yaml:"insert_part"` JMESPath JMESPathConfig `json:"jmespath" yaml:"jmespath"` JQ JQConfig `json:"jq" yaml:"jq"` JSONSchema JSONSchemaConfig `json:"json_schema" yaml:"json_schema"` Log LogConfig `json:"log" yaml:"log"` Metric MetricConfig `json:"metric" yaml:"metric"` MongoDB MongoDBConfig `json:"mongodb" yaml:"mongodb"` Noop struct{} `json:"noop" yaml:"noop"` Plugin interface{} `json:"plugin,omitempty" yaml:"plugin,omitempty"` Parallel ParallelConfig `json:"parallel" yaml:"parallel"` ParseLog ParseLogConfig `json:"parse_log" yaml:"parse_log"` Protobuf ProtobufConfig `json:"protobuf" yaml:"protobuf"` RateLimit RateLimitConfig `json:"rate_limit" yaml:"rate_limit"` Resource string `json:"resource" yaml:"resource"` SelectParts SelectPartsConfig `json:"select_parts" yaml:"select_parts"` Sleep SleepConfig `json:"sleep" yaml:"sleep"` Split SplitConfig `json:"split" yaml:"split"` Subprocess SubprocessConfig `json:"subprocess" yaml:"subprocess"` Switch SwitchConfig `json:"switch" yaml:"switch"` SyncResponse struct{} `json:"sync_response" yaml:"sync_response"` 
Try []Config `json:"try" yaml:"try"` While WhileConfig `json:"while" yaml:"while"` Workflow WorkflowConfig `json:"workflow" yaml:"workflow"` XML XMLConfig `json:"xml" yaml:"xml"` }
Config is the all-encompassing configuration struct for all processor types. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl
type DecompressConfig ¶
type DecompressConfig struct {
Algorithm string `json:"algorithm" yaml:"algorithm"`
}
DecompressConfig contains configuration fields for the Decompress processor.
func NewDecompressConfig ¶
func NewDecompressConfig() DecompressConfig
NewDecompressConfig returns a DecompressConfig with default values.
type DedupeConfig ¶
type DedupeConfig struct { Cache string `json:"cache" yaml:"cache"` Key string `json:"key" yaml:"key"` DropOnCacheErr bool `json:"drop_on_err" yaml:"drop_on_err"` }
DedupeConfig contains configuration fields for the Dedupe processor.
func NewDedupeConfig ¶
func NewDedupeConfig() DedupeConfig
NewDedupeConfig returns a DedupeConfig with default values.
type GrokConfig ¶
type GrokConfig struct { Expressions []string `json:"expressions" yaml:"expressions"` RemoveEmpty bool `json:"remove_empty_values" yaml:"remove_empty_values"` NamedOnly bool `json:"named_captures_only" yaml:"named_captures_only"` UseDefaults bool `json:"use_default_patterns" yaml:"use_default_patterns"` PatternPaths []string `json:"pattern_paths" yaml:"pattern_paths"` PatternDefinitions map[string]string `json:"pattern_definitions" yaml:"pattern_definitions"` }
GrokConfig contains configuration fields for the Grok processor.
func NewGrokConfig ¶
func NewGrokConfig() GrokConfig
NewGrokConfig returns a GrokConfig with default values.
type GroupByConfig ¶
type GroupByConfig []GroupByElement
GroupByConfig is a configuration struct containing fields for the GroupBy processor, which breaks message batches down into N batches of a smaller size according to conditions.
func NewGroupByConfig ¶
func NewGroupByConfig() GroupByConfig
NewGroupByConfig returns a GroupByConfig with default values.
type GroupByElement ¶
type GroupByElement struct { Check string `json:"check" yaml:"check"` Processors []Config `json:"processors" yaml:"processors"` }
GroupByElement represents a group determined by a condition and a list of group specific processors.
type GroupByValueConfig ¶
type GroupByValueConfig struct {
Value string `json:"value" yaml:"value"`
}
GroupByValueConfig is a configuration struct containing fields for the GroupByValue processor, which breaks message batches down into N batches of a smaller size according to a function interpolated string evaluated per message part.
func NewGroupByValueConfig ¶
func NewGroupByValueConfig() GroupByValueConfig
NewGroupByValueConfig returns a GroupByValueConfig with default values.
type HTTPConfig ¶
type HTTPConfig struct { BatchAsMultipart bool `json:"batch_as_multipart" yaml:"batch_as_multipart"` Parallel bool `json:"parallel" yaml:"parallel"` ihttpdocs.Config `json:",inline" yaml:",inline"` }
HTTPConfig contains configuration fields for the HTTP processor.
func NewHTTPConfig ¶
func NewHTTPConfig() HTTPConfig
NewHTTPConfig returns an HTTPConfig with default values.
type InsertPartConfig ¶
type InsertPartConfig struct { Index int `json:"index" yaml:"index"` Content string `json:"content" yaml:"content"` }
InsertPartConfig contains configuration fields for the InsertPart processor.
func NewInsertPartConfig ¶
func NewInsertPartConfig() InsertPartConfig
NewInsertPartConfig returns an InsertPartConfig with default values.
type JMESPathConfig ¶
type JMESPathConfig struct {
Query string `json:"query" yaml:"query"`
}
JMESPathConfig contains configuration fields for the JMESPath processor.
func NewJMESPathConfig ¶
func NewJMESPathConfig() JMESPathConfig
NewJMESPathConfig returns a JMESPathConfig with default values.
type JQConfig ¶
type JQConfig struct { Query string `json:"query" yaml:"query"` Raw bool `json:"raw" yaml:"raw"` OutputRaw bool `json:"output_raw" yaml:"output_raw"` }
JQConfig contains configuration fields for the JQ processor.
type JSONSchemaConfig ¶
type JSONSchemaConfig struct { SchemaPath string `json:"schema_path" yaml:"schema_path"` Schema string `json:"schema" yaml:"schema"` }
JSONSchemaConfig is a configuration struct containing fields for the jsonschema processor.
func NewJSONSchemaConfig ¶
func NewJSONSchemaConfig() JSONSchemaConfig
NewJSONSchemaConfig returns a JSONSchemaConfig with default values.
type LogConfig ¶
type LogConfig struct { Level string `json:"level" yaml:"level"` Fields map[string]string `json:"fields" yaml:"fields"` FieldsMapping string `json:"fields_mapping" yaml:"fields_mapping"` Message string `json:"message" yaml:"message"` }
LogConfig contains configuration fields for the Log processor.
func NewLogConfig ¶
func NewLogConfig() LogConfig
NewLogConfig returns a LogConfig with default values.
type MetricConfig ¶
type MetricConfig struct { Type string `json:"type" yaml:"type"` Name string `json:"name" yaml:"name"` Labels map[string]string `json:"labels" yaml:"labels"` Value string `json:"value" yaml:"value"` }
MetricConfig contains configuration fields for the Metric processor.
func NewMetricConfig ¶
func NewMetricConfig() MetricConfig
NewMetricConfig returns a MetricConfig with default values.
type MongoDBConfig ¶
type MongoDBConfig struct { MongoDB client.Config `json:",inline" yaml:",inline"` WriteConcern client.WriteConcern `json:"write_concern" yaml:"write_concern"` Operation string `json:"operation" yaml:"operation"` FilterMap string `json:"filter_map" yaml:"filter_map"` DocumentMap string `json:"document_map" yaml:"document_map"` Upsert bool `json:"upsert" yaml:"upsert"` HintMap string `json:"hint_map" yaml:"hint_map"` RetryConfig retries.Config `json:",inline" yaml:",inline"` JSONMarshalMode client.JSONMarshalMode `json:"json_marshal_mode" yaml:"json_marshal_mode"` }
MongoDBConfig contains configuration fields for the MongoDB processor.
func NewMongoDBConfig ¶
func NewMongoDBConfig() MongoDBConfig
NewMongoDBConfig returns a MongoDBConfig with default values.
type ParallelConfig ¶
type ParallelConfig struct { Cap int `json:"cap" yaml:"cap"` Processors []Config `json:"processors" yaml:"processors"` }
ParallelConfig is a config struct containing fields for the Parallel processor.
func NewParallelConfig ¶
func NewParallelConfig() ParallelConfig
NewParallelConfig returns a default ParallelConfig.
type ParseLogConfig ¶
type ParseLogConfig struct { Format string `json:"format" yaml:"format"` Codec string `json:"codec" yaml:"codec"` BestEffort bool `json:"best_effort" yaml:"best_effort"` WithRFC3339 bool `json:"allow_rfc3339" yaml:"allow_rfc3339"` WithYear string `json:"default_year" yaml:"default_year"` WithTimezone string `json:"default_timezone" yaml:"default_timezone"` }
ParseLogConfig contains configuration fields for the ParseLog processor.
func NewParseLogConfig ¶
func NewParseLogConfig() ParseLogConfig
NewParseLogConfig returns a ParseLogConfig with default values.
type Pipeline ¶
type Pipeline interface { // TransactionChan returns a channel used for consuming transactions from // this type. Every transaction received must be resolved before another // transaction will be sent. TransactionChan() <-chan message.Transaction // Consume starts the type receiving transactions from a Transactor. Consume(<-chan message.Transaction) error // CloseAsync triggers the shut down of this component but should not block // the calling goroutine. CloseAsync() // WaitForClose is a blocking call to wait until the component has finished // shutting down and cleaning up resources. WaitForClose(timeout time.Duration) error }
Pipeline is an interface that implements channel-based consumer and producer methods for streaming data through a processing pipeline.
type PipelineConstructorFunc ¶
PipelineConstructorFunc is a constructor to be called for each parallel stream pipeline thread in order to construct a custom pipeline implementation.
type ProtobufConfig ¶
type ProtobufConfig struct { Operator string `json:"operator" yaml:"operator"` Message string `json:"message" yaml:"message"` ImportPaths []string `json:"import_paths" yaml:"import_paths"` }
ProtobufConfig contains configuration fields for the Protobuf processor.
func NewProtobufConfig ¶
func NewProtobufConfig() ProtobufConfig
NewProtobufConfig returns a ProtobufConfig with default values.
type RateLimitConfig ¶
type RateLimitConfig struct {
Resource string `json:"resource" yaml:"resource"`
}
RateLimitConfig contains configuration fields for the RateLimit processor.
func NewRateLimitConfig ¶
func NewRateLimitConfig() RateLimitConfig
NewRateLimitConfig returns a RateLimitConfig with default values.
type SelectPartsConfig ¶
type SelectPartsConfig struct {
Parts []int `json:"parts" yaml:"parts"`
}
SelectPartsConfig contains configuration fields for the SelectParts processor.
func NewSelectPartsConfig ¶
func NewSelectPartsConfig() SelectPartsConfig
NewSelectPartsConfig returns a SelectPartsConfig with default values.
type SleepConfig ¶
type SleepConfig struct {
Duration string `json:"duration" yaml:"duration"`
}
SleepConfig contains configuration fields for the Sleep processor.
func NewSleepConfig ¶
func NewSleepConfig() SleepConfig
NewSleepConfig returns a SleepConfig with default values.
type SplitConfig ¶
type SplitConfig struct { Size int `json:"size" yaml:"size"` ByteSize int `json:"byte_size" yaml:"byte_size"` }
SplitConfig is a configuration struct containing fields for the Split processor, which breaks message batches down into batches of a smaller size.
func NewSplitConfig ¶
func NewSplitConfig() SplitConfig
NewSplitConfig returns a SplitConfig with default values.
type SubprocessConfig ¶
type SubprocessConfig struct { Name string `json:"name" yaml:"name"` Args []string `json:"args" yaml:"args"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` CodecSend string `json:"codec_send" yaml:"codec_send"` CodecRecv string `json:"codec_recv" yaml:"codec_recv"` }
SubprocessConfig contains configuration fields for the Subprocess processor.
func NewSubprocessConfig ¶
func NewSubprocessConfig() SubprocessConfig
NewSubprocessConfig returns a SubprocessConfig with default values.
type SwitchCaseConfig ¶
type SwitchCaseConfig struct { Check string `json:"check" yaml:"check"` Processors []Config `json:"processors" yaml:"processors"` Fallthrough bool `json:"fallthrough" yaml:"fallthrough"` }
SwitchCaseConfig contains a condition, processors and other fields for an individual case in the Switch processor.
func NewSwitchCaseConfig ¶
func NewSwitchCaseConfig() SwitchCaseConfig
NewSwitchCaseConfig returns a new SwitchCaseConfig with default values.
func (*SwitchCaseConfig) UnmarshalJSON ¶
func (s *SwitchCaseConfig) UnmarshalJSON(bytes []byte) error
UnmarshalJSON ensures that when parsing configs that are in a map or slice the default values are still applied.
func (*SwitchCaseConfig) UnmarshalYAML ¶
func (s *SwitchCaseConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.
type SwitchConfig ¶
type SwitchConfig []SwitchCaseConfig
SwitchConfig is a config struct containing fields for the Switch processor.
func NewSwitchConfig ¶
func NewSwitchConfig() SwitchConfig
NewSwitchConfig returns a default SwitchConfig.
type V1 ¶
type V1 interface { // ProcessMessage attempts to process a message. This method returns either a // slice of messages or a response indicating whether messages were dropped // due to an intermittent error or were intentionally filtered. // // If an error occurs due to the contents of a message being invalid and you // wish to expose this as a recoverable fault you can use FlagErr to flag a // message as having failed without dropping it. // // More information about this form of error handling can be found at: // https://www.benthos.dev/docs/configuration/error_handling ProcessMessage(*message.Batch) ([]*message.Batch, error) // CloseAsync triggers the shut down of this component but should not block // the calling goroutine. CloseAsync() // WaitForClose is a blocking call to wait until the component has finished // shutting down and cleaning up resources. WaitForClose(timeout time.Duration) error }
V1 is a common interface implemented by processors.
func NewV2BatchedToV1Processor ¶
func NewV2BatchedToV1Processor(typeStr string, p V2Batched, mgr component.Observability) V1
NewV2BatchedToV1Processor wraps a processor.V2Batched with a struct that implements V1.
func NewV2ToV1Processor ¶
func NewV2ToV1Processor(typeStr string, p V2, mgr component.Observability) V1
NewV2ToV1Processor wraps a processor.V2 with a struct that implements V1.
type V2 ¶
type V2 interface { // Process a message into one or more resulting messages, or return an error // if the message could not be processed. If zero messages are returned and // the error is nil then the message is filtered. Process(ctx context.Context, p *message.Part) ([]*message.Part, error) // Close the component, blocks until either the underlying resources are // cleaned up or the context is cancelled. Returns an error if the context // is cancelled. Close(ctx context.Context) error }
V2 is a simpler interface to implement than V1.
type V2Batched ¶
type V2Batched interface { // Process a batch of messages into one or more resulting batches, or return // an error if the entire batch could not be processed. If zero messages are // returned and the error is nil then all messages are filtered. ProcessBatch(ctx context.Context, spans []*tracing.Span, b *message.Batch) ([]*message.Batch, error) // Close the component, blocks until either the underlying resources are // cleaned up or the context is cancelled. Returns an error if the context // is cancelled. Close(ctx context.Context) error }
V2Batched is a simpler interface to implement than V1 and allows batch-wide processing.
type WhileConfig ¶
type WhileConfig struct { AtLeastOnce bool `json:"at_least_once" yaml:"at_least_once"` MaxLoops int `json:"max_loops" yaml:"max_loops"` Check string `json:"check" yaml:"check"` Processors []Config `json:"processors" yaml:"processors"` }
WhileConfig is a config struct containing fields for the While processor.
func NewWhileConfig ¶
func NewWhileConfig() WhileConfig
NewWhileConfig returns a default WhileConfig.
type WorkflowConfig ¶
type WorkflowConfig struct { MetaPath string `json:"meta_path" yaml:"meta_path"` Order [][]string `json:"order" yaml:"order"` BranchResources []string `json:"branch_resources" yaml:"branch_resources"` Branches map[string]BranchConfig `json:"branches" yaml:"branches"` }
WorkflowConfig is a config struct containing fields for the Workflow processor.
func NewWorkflowConfig ¶
func NewWorkflowConfig() WorkflowConfig
NewWorkflowConfig returns a default WorkflowConfig.
Source Files ¶
- config.go
- config_avro.go
- config_awk.go
- config_bounds_check.go
- config_branch.go
- config_cache.go
- config_compress.go
- config_decompress.go
- config_dedupe.go
- config_grok.go
- config_group_by.go
- config_group_by_value.go
- config_http.go
- config_insert_part.go
- config_jmespath.go
- config_jq.go
- config_jsonschema.go
- config_log.go
- config_metric.go
- config_mongodb.go
- config_parallel.go
- config_parse_log.go
- config_protobuf.go
- config_rate_limit.go
- config_select_parts.go
- config_sleep.go
- config_split.go
- config_subprocess.go
- config_switch.go
- config_while.go
- config_workflow.go
- config_xml.go
- error.go
- execute.go
- interface.go
- processor_v2.go