Documentation ¶
Overview ¶
Package processor contains logical message processors that can be pipelined within benthos using the pipeline.Processor type.
Index ¶
- Variables
- func Descriptions() string
- func SanitiseConfig(conf Config) (interface{}, error)
- type Archive
- type ArchiveConfig
- type Batch
- type BatchConfig
- type BoundsCheck
- type BoundsCheckConfig
- type Combine
- type CombineConfig
- type Compress
- type CompressConfig
- type Conditional
- type ConditionalConfig
- type Config
- type Decompress
- type DecompressConfig
- type Dedupe
- type DedupeConfig
- type Filter
- type FilterConfig
- type FilterParts
- type FilterPartsConfig
- type Grok
- type GrokConfig
- type HTTP
- type HTTPConfig
- type HashSample
- type HashSampleConfig
- type InsertPart
- type InsertPartConfig
- type JMESPath
- type JMESPathConfig
- type JSON
- type JSONConfig
- type MergeJSON
- type MergeJSONConfig
- type Noop
- type Sample
- type SampleConfig
- type SelectParts
- type SelectPartsConfig
- type Split
- type Type
- func New(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewArchive(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewBatch(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewBoundsCheck(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewCombine(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewCompress(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewConditional(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewDecompress(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewDedupe(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewFilter(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewFilterParts(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewGrok(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewHTTP(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewHashSample(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewInsertPart(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewJMESPath(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewJSON(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewMergeJSON(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewNoop(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewSample(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewSelectParts(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewSplit(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewUnarchive(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- type TypeSpec
- type Unarchive
- type UnarchiveConfig
Constants ¶
This section is empty.
Variables ¶
var Constructors = map[string]TypeSpec{}
Constructors is a map of all processor types with their specs.
Functions ¶
func Descriptions ¶
func Descriptions() string
Descriptions returns a formatted string of collated descriptions of each type.
func SanitiseConfig ¶ added in v0.8.4
SanitiseConfig returns a sanitised version of the Config, meaning sections that aren't relevant to behaviour are removed.
Types ¶
type Archive ¶ added in v0.7.7
type Archive struct {
// contains filtered or unexported fields
}
Archive is a processor that can selectively archive parts of a message into a chosen archive format.
type ArchiveConfig ¶ added in v0.7.7
type ArchiveConfig struct {
Format string `json:"format" yaml:"format"`
Path string `json:"path" yaml:"path"`
}
ArchiveConfig contains any configuration for the Archive processor.
func NewArchiveConfig ¶ added in v0.7.7
func NewArchiveConfig() ArchiveConfig
NewArchiveConfig returns an ArchiveConfig with default values.
type Batch ¶ added in v0.13.0
type Batch struct {
// contains filtered or unexported fields
}
Batch is a processor that combines messages into a batch until a size limit is reached, at which point the batch is sent out.
func (*Batch) ProcessMessage ¶ added in v0.13.0
ProcessMessage buffers each incoming message and drops it from the pipeline, returning a NoAck response, until the buffered messages reach the size limit, at which point it batches them into one multiple part message which is sent on.
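The buffering behaviour described above can be sketched as follows. This is an illustration under stated assumptions rather than the package's code: `message` and `batcher` are hypothetical stand-ins, and the flush condition (total buffered bytes greater than or equal to the limit) is assumed.

```go
package main

import "fmt"

// message is a hypothetical stand-in for types.Message: a list of parts.
type message [][]byte

// batcher buffers parts until their combined size reaches byteSize.
type batcher struct {
	byteSize  int
	sizeTally int
	parts     [][]byte
}

// add buffers the parts of msg. It returns the combined batch and true once
// the size limit is reached; otherwise it returns nil and false, which is
// the point where a real processor would answer with a no-ack response.
func (b *batcher) add(msg message) (message, bool) {
	for _, p := range msg {
		b.sizeTally += len(p)
		b.parts = append(b.parts, p)
	}
	if b.sizeTally < b.byteSize {
		return nil, false
	}
	out := message(b.parts)
	b.parts = nil
	b.sizeTally = 0
	return out, true
}

func main() {
	b := &batcher{byteSize: 10}
	for _, s := range []string{"abcd", "efgh", "ijkl"} {
		if batch, ok := b.add(message{[]byte(s)}); ok {
			fmt.Printf("flushed %d parts\n", len(batch))
		} else {
			fmt.Println("buffered")
		}
	}
}
```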
type BatchConfig ¶ added in v0.13.0
type BatchConfig struct {
ByteSize int `json:"byte_size" yaml:"byte_size"`
}
BatchConfig contains configuration for the Batch processor.
func NewBatchConfig ¶ added in v0.13.0
func NewBatchConfig() BatchConfig
NewBatchConfig returns a BatchConfig with default values.
type BoundsCheck ¶
type BoundsCheck struct {
// contains filtered or unexported fields
}
BoundsCheck is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.
func (*BoundsCheck) ProcessMessage ¶
ProcessMessage checks each message against a set of bounds.
type BoundsCheckConfig ¶
type BoundsCheckConfig struct {
MaxParts int `json:"max_parts" yaml:"max_parts"`
MinParts int `json:"min_parts" yaml:"min_parts"`
MaxPartSize int `json:"max_part_size" yaml:"max_part_size"`
MinPartSize int `json:"min_part_size" yaml:"min_part_size"`
}
BoundsCheckConfig contains any bounds configuration for the BoundsCheck processor.
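As a rough illustration of how the four bounds might be applied, the sketch below validates the number of parts and the size of each part. The `bounds` type and `check` method are hypothetical, and treating every limit as inclusive is an assumption.

```go
package main

import "fmt"

// bounds mirrors the four BoundsCheckConfig fields (hypothetical type).
type bounds struct {
	minParts, maxParts       int
	minPartSize, maxPartSize int
}

// check returns an error when the part count or any part size falls outside
// the configured limits. All limits are treated as inclusive (an assumption).
func (b bounds) check(parts [][]byte) error {
	if n := len(parts); n < b.minParts || n > b.maxParts {
		return fmt.Errorf("part count %d outside [%d, %d]", n, b.minParts, b.maxParts)
	}
	for i, p := range parts {
		if s := len(p); s < b.minPartSize || s > b.maxPartSize {
			return fmt.Errorf("part %d size %d outside [%d, %d]", i, s, b.minPartSize, b.maxPartSize)
		}
	}
	return nil
}

func main() {
	b := bounds{minParts: 1, maxParts: 2, minPartSize: 1, maxPartSize: 8}
	fmt.Println(b.check([][]byte{[]byte("ok")}))
	fmt.Println(b.check([][]byte{[]byte("far too long")}))
	fmt.Println(b.check(nil))
}
```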
func NewBoundsCheckConfig ¶
func NewBoundsCheckConfig() BoundsCheckConfig
NewBoundsCheckConfig returns a BoundsCheckConfig with default values.
type Combine ¶ added in v0.3.1
type Combine struct {
// contains filtered or unexported fields
}
Combine is a processor that buffers messages until a configured number have been received and then combines them into a single multiple part message.
func (*Combine) ProcessMessage ¶ added in v0.3.1
ProcessMessage buffers each incoming message and drops it from the pipeline, returning a NoAck response, until it has N buffered messages, at which point it combines those messages into one multiple part message which is sent on.
type CombineConfig ¶ added in v0.3.1
type CombineConfig struct {
Parts int `json:"parts" yaml:"parts"`
}
CombineConfig contains configuration for the Combine processor.
func NewCombineConfig ¶ added in v0.3.1
func NewCombineConfig() CombineConfig
NewCombineConfig returns a CombineConfig with default values.
type Compress ¶ added in v0.7.7
type Compress struct {
// contains filtered or unexported fields
}
Compress is a processor that can selectively compress parts of a message using a chosen compression algorithm.
type CompressConfig ¶ added in v0.7.7
type CompressConfig struct {
Algorithm string `json:"algorithm" yaml:"algorithm"`
Level int `json:"level" yaml:"level"`
Parts []int `json:"parts" yaml:"parts"`
}
CompressConfig contains any configuration for the Compress processor.
func NewCompressConfig ¶ added in v0.7.7
func NewCompressConfig() CompressConfig
NewCompressConfig returns a CompressConfig with default values.
type Conditional ¶ added in v0.13.0
type Conditional struct {
// contains filtered or unexported fields
}
Conditional is a processor that only applies child processors under a certain condition.
func (*Conditional) ProcessMessage ¶ added in v0.13.0
ProcessMessage applies the child processors to the message when the condition passes, and the else processors otherwise.
type ConditionalConfig ¶ added in v0.13.0
type ConditionalConfig struct {
Condition condition.Config `json:"condition" yaml:"condition"`
Processors []Config `json:"processors" yaml:"processors"`
ElseProcessors []Config `json:"else_processors" yaml:"else_processors"`
}
ConditionalConfig is a config struct containing fields for the Conditional processor.
func NewConditionalConfig ¶ added in v0.13.0
func NewConditionalConfig() ConditionalConfig
NewConditionalConfig returns a default ConditionalConfig.
type Config ¶
type Config struct {
Type string `json:"type" yaml:"type"`
Archive ArchiveConfig `json:"archive" yaml:"archive"`
Batch BatchConfig `json:"batch" yaml:"batch"`
BoundsCheck BoundsCheckConfig `json:"bounds_check" yaml:"bounds_check"`
Combine CombineConfig `json:"combine" yaml:"combine"`
Compress CompressConfig `json:"compress" yaml:"compress"`
Conditional ConditionalConfig `json:"conditional" yaml:"conditional"`
Decompress DecompressConfig `json:"decompress" yaml:"decompress"`
Dedupe DedupeConfig `json:"dedupe" yaml:"dedupe"`
Filter FilterConfig `json:"filter" yaml:"filter"`
FilterParts FilterPartsConfig `json:"filter_parts" yaml:"filter_parts"`
Grok GrokConfig `json:"grok" yaml:"grok"`
HashSample HashSampleConfig `json:"hash_sample" yaml:"hash_sample"`
HTTP HTTPConfig `json:"http" yaml:"http"`
InsertPart InsertPartConfig `json:"insert_part" yaml:"insert_part"`
JMESPath JMESPathConfig `json:"jmespath" yaml:"jmespath"`
JSON JSONConfig `json:"json" yaml:"json"`
MergeJSON MergeJSONConfig `json:"merge_json" yaml:"merge_json"`
Sample SampleConfig `json:"sample" yaml:"sample"`
SelectParts SelectPartsConfig `json:"select_parts" yaml:"select_parts"`
Split struct{} `json:"split" yaml:"split"`
Unarchive UnarchiveConfig `json:"unarchive" yaml:"unarchive"`
}
Config is the all encompassing configuration struct for all processor types.
func NewConfig ¶
func NewConfig() Config
NewConfig returns a configuration struct fully populated with default values.
func (*Config) UnmarshalJSON ¶
UnmarshalJSON ensures that when parsing configs that are in a slice the default values are still applied.
func (*Config) UnmarshalYAML ¶
UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.
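The usual Go idiom for keeping defaults when decoding elements of a slice is to unmarshal into a method-free alias that has been pre-populated with default values. The sketch below shows that idiom with encoding/json. It is not copied from the package: the trimmed-down `config` and `sampleConfig` types and the default values ("noop", 10.0) are assumptions chosen for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sampleConfig and config are trimmed-down, hypothetical versions of the
// package's SampleConfig and Config structs.
type sampleConfig struct {
	Retain float64 `json:"retain"`
}

type config struct {
	Type   string       `json:"type"`
	Sample sampleConfig `json:"sample"`
}

// newConfig returns a config with default values (the values are assumed).
func newConfig() config {
	return config{Type: "noop", Sample: sampleConfig{Retain: 10.0}}
}

// UnmarshalJSON starts from the defaults and overlays whatever the JSON
// provides. The alias type has no methods, which avoids infinite recursion.
func (c *config) UnmarshalJSON(data []byte) error {
	type confAlias config
	aliased := confAlias(newConfig())
	if err := json.Unmarshal(data, &aliased); err != nil {
		return err
	}
	*c = config(aliased)
	return nil
}

// parse decodes a JSON array of configs.
func parse(raw string) ([]config, error) {
	var confs []config
	err := json.Unmarshal([]byte(raw), &confs)
	return confs, err
}

func main() {
	confs, err := parse(`[{"type":"sample"},{"sample":{"retain":50}}]`)
	if err != nil {
		panic(err)
	}
	for _, c := range confs {
		fmt.Printf("type=%s retain=%v\n", c.Type, c.Sample.Retain)
	}
}
```

Without the custom method, each slice element would start from the zero value, so any field missing from the input would be left empty instead of receiving its default.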
type Decompress ¶ added in v0.7.4
type Decompress struct {
// contains filtered or unexported fields
}
Decompress is a processor that can selectively decompress parts of a message using a chosen compression algorithm.
func (*Decompress) ProcessMessage ¶ added in v0.7.4
ProcessMessage takes a message, attempts to decompress parts of the message, and returns the result.
type DecompressConfig ¶ added in v0.7.4
type DecompressConfig struct {
Algorithm string `json:"algorithm" yaml:"algorithm"`
Parts []int `json:"parts" yaml:"parts"`
}
DecompressConfig contains any configuration for the Decompress processor.
func NewDecompressConfig ¶ added in v0.7.4
func NewDecompressConfig() DecompressConfig
NewDecompressConfig returns a DecompressConfig with default values.
type Dedupe ¶ added in v0.9.7
type Dedupe struct {
// contains filtered or unexported fields
}
Dedupe is a processor that hashes each message and checks if the hash is already present in the cache.
type DedupeConfig ¶ added in v0.9.7
type DedupeConfig struct {
Cache string `json:"cache" yaml:"cache"`
HashType string `json:"hash" yaml:"hash"`
Parts []int `json:"parts" yaml:"parts"` // message parts to hash
JSONPaths []string `json:"json_paths" yaml:"json_paths"`
DropOnCacheErr bool `json:"drop_on_err" yaml:"drop_on_err"`
}
DedupeConfig contains any configuration for the Dedupe processor.
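The hash-and-check idea behind deduplication can be sketched with an in-memory set. This is only an illustration: the `deduper` type is hypothetical, SHA-256 is chosen here because it is in the standard library (the hash types the processor actually supports are not listed in this document), and a plain map stands in for the named cache resource.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// deduper remembers the hashes of messages it has already seen. A map stands
// in for the cache resource named by DedupeConfig.Cache (an assumption).
type deduper struct {
	seen map[[32]byte]struct{}
}

func newDeduper() *deduper {
	return &deduper{seen: make(map[[32]byte]struct{})}
}

// isDuplicate hashes the given parts and reports whether the same hash was
// seen before, recording it if not. Parts are concatenated without a
// separator, which is a simplification.
func (d *deduper) isDuplicate(parts [][]byte) bool {
	h := sha256.New()
	for _, p := range parts {
		h.Write(p)
	}
	var key [32]byte
	copy(key[:], h.Sum(nil))
	if _, ok := d.seen[key]; ok {
		return true
	}
	d.seen[key] = struct{}{}
	return false
}

func main() {
	d := newDeduper()
	fmt.Println(d.isDuplicate([][]byte{[]byte("hello")}))
	fmt.Println(d.isDuplicate([][]byte{[]byte("hello")}))
	fmt.Println(d.isDuplicate([][]byte{[]byte("world")}))
}
```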
func NewDedupeConfig ¶ added in v0.9.7
func NewDedupeConfig() DedupeConfig
NewDedupeConfig returns a DedupeConfig with default values.
type Filter ¶ added in v0.13.0
type Filter struct {
// contains filtered or unexported fields
}
Filter is a processor that checks each message against a condition and rejects when the condition returns false.
type FilterConfig ¶ added in v0.13.0
FilterConfig contains configuration fields for the Filter processor.
func NewFilterConfig ¶ added in v0.13.0
func NewFilterConfig() FilterConfig
NewFilterConfig returns a FilterConfig with default values.
type FilterParts ¶ added in v0.14.7
type FilterParts struct {
// contains filtered or unexported fields
}
FilterParts is a processor that checks each part of a message against a condition and removes the part when the condition returns false.
func (*FilterParts) ProcessMessage ¶ added in v0.14.7
ProcessMessage checks each part of a message against the condition.
type FilterPartsConfig ¶ added in v0.14.7
FilterPartsConfig contains configuration fields for the FilterParts processor.
func NewFilterPartsConfig ¶ added in v0.14.7
func NewFilterPartsConfig() FilterPartsConfig
NewFilterPartsConfig returns a FilterPartsConfig with default values.
type Grok ¶ added in v0.13.5
type Grok struct {
// contains filtered or unexported fields
}
Grok is a processor that executes Grok queries on a message part and replaces the contents with the result.
type GrokConfig ¶ added in v0.13.5
type GrokConfig struct {
Parts []int `json:"parts" yaml:"parts"`
Patterns []string `json:"patterns" yaml:"patterns"`
RemoveEmpty bool `json:"remove_empty_values" yaml:"remove_empty_values"`
NamedOnly bool `json:"named_captures_only" yaml:"named_captures_only"`
UseDefaults bool `json:"use_default_patterns" yaml:"use_default_patterns"`
To string `json:"output_format" yaml:"output_format"`
}
GrokConfig contains any configuration for the Grok processor.
func NewGrokConfig ¶ added in v0.13.5
func NewGrokConfig() GrokConfig
NewGrokConfig returns a GrokConfig with default values.
type HTTP ¶ added in v0.15.4
type HTTP struct {
// contains filtered or unexported fields
}
HTTP is a processor that executes HTTP queries on a message part and replaces the contents with the result.
type HTTPConfig ¶ added in v0.15.4
type HTTPConfig struct {
Parts []int `json:"parts" yaml:"parts"`
Client client.Config `json:"request" yaml:"request"`
RequestMap map[string]string `json:"request_map" yaml:"request_map"`
ResponseMap map[string]string `json:"response_map" yaml:"response_map"`
StrictReqMapping bool `json:"strict_request_map" yaml:"strict_request_map"`
}
HTTPConfig contains any configuration for the HTTP processor.
func NewHTTPConfig ¶ added in v0.15.4
func NewHTTPConfig() HTTPConfig
NewHTTPConfig returns an HTTPConfig with default values.
type HashSample ¶ added in v0.6.11
type HashSample struct {
// contains filtered or unexported fields
}
HashSample is a processor that hashes selected parts of each message and retains the message only if the hash falls within a configured range (see RetainMin and RetainMax), dropping it otherwise.
func (*HashSample) ProcessMessage ¶ added in v0.6.11
ProcessMessage hashes the configured parts of each message and drops the message if the result falls outside the retain range.
type HashSampleConfig ¶ added in v0.6.11
type HashSampleConfig struct {
RetainMin float64 `json:"retain_min" yaml:"retain_min"`
RetainMax float64 `json:"retain_max" yaml:"retain_max"`
Parts []int `json:"parts" yaml:"parts"` // message parts to hash
}
HashSampleConfig contains any configuration for the HashSample processor.
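One way to read the RetainMin and RetainMax fields is as a percentage window over a hash of the selected parts, so that the same content always receives the same sampling decision. The sketch below illustrates that reading under several assumptions: the helper names are hypothetical, FNV-1a from the standard library is a stand-in for whichever hash the processor really uses, and the window is treated as the half-open interval [min, max).

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashPercent maps the given parts to a value in [0, 100) using FNV-1a.
// The choice of hash is a stand-in, not the processor's actual algorithm.
func hashPercent(parts [][]byte) float64 {
	h := fnv.New64a()
	for _, p := range parts {
		h.Write(p)
	}
	return float64(h.Sum64()%10000) / 100.0
}

// retain reports whether the hashed value falls inside [min, max).
func retain(parts [][]byte, min, max float64) bool {
	v := hashPercent(parts)
	return v >= min && v < max
}

func main() {
	msg := [][]byte{[]byte("user-1234")}
	fmt.Println("hash percent:", hashPercent(msg))
	fmt.Println("kept by [0, 50): ", retain(msg, 0, 50))
	fmt.Println("kept by [50, 100):", retain(msg, 50, 100))
}
```

Because the decision depends only on content, two deployments configured with adjacent windows such as [0, 50) and [50, 100) would split the same stream between them without overlap.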
func NewHashSampleConfig ¶ added in v0.6.11
func NewHashSampleConfig() HashSampleConfig
NewHashSampleConfig returns a HashSampleConfig with default values.
type InsertPart ¶ added in v0.6.19
type InsertPart struct {
// contains filtered or unexported fields
}
InsertPart is a processor that inserts a new message part at a specific index.
func (*InsertPart) ProcessMessage ¶ added in v0.6.19
ProcessMessage inserts a new message part at the configured index.
type InsertPartConfig ¶ added in v0.6.19
type InsertPartConfig struct {
Index int `json:"index" yaml:"index"`
Content string `json:"content" yaml:"content"`
}
InsertPartConfig contains any configuration for the InsertPart processor.
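Inserting a part at an index amounts to a slice insertion. The sketch below shows one way to do it without mutating the input. The `insertPart` helper is hypothetical, and clamping out-of-range indexes to the nearest end is an assumption; this document does not say how the processor treats negative or oversized indexes.

```go
package main

import "fmt"

// insertPart returns a copy of parts with content inserted at index.
// Indexes outside [0, len(parts)] are clamped (an assumption made for this
// sketch, not documented behaviour).
func insertPart(parts [][]byte, index int, content []byte) [][]byte {
	if index < 0 {
		index = 0
	}
	if index > len(parts) {
		index = len(parts)
	}
	out := make([][]byte, 0, len(parts)+1)
	out = append(out, parts[:index]...)
	out = append(out, content)
	out = append(out, parts[index:]...)
	return out
}

func main() {
	parts := [][]byte{[]byte("a"), []byte("c")}
	for _, p := range insertPart(parts, 1, []byte("b")) {
		fmt.Println(string(p))
	}
}
```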
func NewInsertPartConfig ¶ added in v0.6.19
func NewInsertPartConfig() InsertPartConfig
NewInsertPartConfig returns an InsertPartConfig with default values.
type JMESPath ¶ added in v0.9.8
type JMESPath struct {
// contains filtered or unexported fields
}
JMESPath is a processor that executes JMESPath queries on a message part and replaces the contents with the result.
type JMESPathConfig ¶ added in v0.9.8
type JMESPathConfig struct {
Parts []int `json:"parts" yaml:"parts"`
Query string `json:"query" yaml:"query"`
}
JMESPathConfig contains any configuration for the JMESPath processor.
func NewJMESPathConfig ¶ added in v0.9.8
func NewJMESPathConfig() JMESPathConfig
NewJMESPathConfig returns a JMESPathConfig with default values.
type JSON ¶ added in v0.14.0
type JSON struct {
// contains filtered or unexported fields
}
JSON is a processor that performs an operation on a JSON payload.
type JSONConfig ¶ added in v0.14.0
type JSONConfig struct {
Parts []int `json:"parts" yaml:"parts"`
Operator string `json:"operator" yaml:"operator"`
Path string `json:"path" yaml:"path"`
Value rawJSONValue `json:"value" yaml:"value"`
}
JSONConfig contains any configuration for the JSON processor.
func NewJSONConfig ¶ added in v0.14.0
func NewJSONConfig() JSONConfig
NewJSONConfig returns a JSONConfig with default values.
type MergeJSON ¶ added in v0.11.4
type MergeJSON struct {
// contains filtered or unexported fields
}
MergeJSON is a processor that merges JSON parsed message parts into a single value.
type MergeJSONConfig ¶ added in v0.11.4
type MergeJSONConfig struct {
Parts []int `json:"parts" yaml:"parts"`
RetainParts bool `json:"retain_parts" yaml:"retain_parts"`
}
MergeJSONConfig contains any configuration for the MergeJSON processor.
func NewMergeJSONConfig ¶ added in v0.11.4
func NewMergeJSONConfig() MergeJSONConfig
NewMergeJSONConfig returns a MergeJSONConfig with default values.
type Sample ¶ added in v0.3.2
type Sample struct {
// contains filtered or unexported fields
}
Sample is a processor that retains a random sample of messages, as configured by Retain, and drops the rest.
type SampleConfig ¶ added in v0.3.2
type SampleConfig struct {
Retain float64 `json:"retain" yaml:"retain"`
RandomSeed int64 `json:"seed" yaml:"seed"`
}
SampleConfig contains any configuration for the Sample processor.
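A seeded random sampler along the lines suggested by the Retain and RandomSeed fields might look like the sketch below. The `sampler` type is hypothetical, and interpreting Retain as a percentage between 0 and 100 is an assumption that this document does not confirm.

```go
package main

import (
	"fmt"
	"math/rand"
)

// sampler keeps roughly retainPercent percent of the messages it is asked
// about. Treating the retain value as a percentage is an assumption.
type sampler struct {
	retain float64
	gen    *rand.Rand
}

// newSampler seeds its own generator so that results are reproducible for a
// given seed.
func newSampler(retainPercent float64, seed int64) *sampler {
	return &sampler{
		retain: retainPercent / 100.0,
		gen:    rand.New(rand.NewSource(seed)),
	}
}

// keep reports whether the next message should be retained.
func (s *sampler) keep() bool {
	return s.gen.Float64() < s.retain
}

func main() {
	s := newSampler(10, 0)
	kept := 0
	for i := 0; i < 1000; i++ {
		if s.keep() {
			kept++
		}
	}
	fmt.Printf("kept %d of 1000 messages\n", kept)
}
```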
func NewSampleConfig ¶ added in v0.3.2
func NewSampleConfig() SampleConfig
NewSampleConfig returns a SampleConfig with default values.
type SelectParts ¶ added in v0.4.1
type SelectParts struct {
// contains filtered or unexported fields
}
SelectParts is a processor that extracts a selected set of parts from each message.
func (*SelectParts) ProcessMessage ¶ added in v0.4.1
ProcessMessage extracts a set of parts from each message.
type SelectPartsConfig ¶ added in v0.4.1
type SelectPartsConfig struct {
Parts []int `json:"parts" yaml:"parts"`
}
SelectPartsConfig contains any configuration for the SelectParts processor.
func NewSelectPartsConfig ¶ added in v0.4.1
func NewSelectPartsConfig() SelectPartsConfig
NewSelectPartsConfig returns a SelectPartsConfig with default values.
type Split ¶ added in v0.7.6
type Split struct {
// contains filtered or unexported fields
}
Split is a processor that splits messages into a message per part.
type Type ¶
type Type interface {
// ProcessMessage attempts to process a message. Since processing can fail
// this call returns both a slice of messages in case of success or a
// response in case of failure. If the slice of messages is empty the
// response should be returned to the source.
ProcessMessage(msg types.Message) ([]types.Message, types.Response)
}
Type reads a message, performs a processing operation, and returns either a slice of messages resulting from the process to be propagated through the pipeline, or a response that should be sent back to the source instead.
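The contract of this interface (a non-empty slice means "carry on", an empty slice means "send the response back to the source") can be sketched with local stand-in types. Everything here is hypothetical: `message`, `response`, the two example processors, and the `run` helper are not part of the package and only mirror the shape of ProcessMessage.

```go
package main

import (
	"bytes"
	"fmt"
)

// message and response are hypothetical stand-ins for types.Message and
// types.Response.
type message [][]byte

type response interface{ Error() error }

type simpleResponse struct{ err error }

func (r simpleResponse) Error() error { return r.err }

// processor mirrors the shape of the Type interface.
type processor interface {
	ProcessMessage(msg message) ([]message, response)
}

// upper returns a copy of the message with every part upper-cased.
type upper struct{}

func (upper) ProcessMessage(msg message) ([]message, response) {
	out := make(message, len(msg))
	for i, p := range msg {
		out[i] = bytes.ToUpper(p)
	}
	return []message{out}, nil
}

// dropEmpty rejects messages that have no parts by returning a response.
type dropEmpty struct{}

func (dropEmpty) ProcessMessage(msg message) ([]message, response) {
	if len(msg) == 0 {
		return nil, simpleResponse{}
	}
	return []message{msg}, nil
}

// run feeds msg through each processor in turn. If a stage yields no
// messages, the last response it produced is returned instead.
func run(procs []processor, msg message) ([]message, response) {
	msgs := []message{msg}
	for _, p := range procs {
		var next []message
		var lastRes response
		for _, m := range msgs {
			out, res := p.ProcessMessage(m)
			if len(out) == 0 {
				lastRes = res
				continue
			}
			next = append(next, out...)
		}
		if len(next) == 0 {
			return nil, lastRes
		}
		msgs = next
	}
	return msgs, nil
}

func main() {
	procs := []processor{upper{}, dropEmpty{}}
	msgs, res := run(procs, message{[]byte("hello")})
	fmt.Println(string(msgs[0][0]), res == nil)

	msgs, res = run(procs, message{})
	fmt.Println(len(msgs), res != nil)
}
```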
func NewArchive ¶ added in v0.7.7
func NewArchive( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewArchive returns an Archive processor.
func NewBoundsCheck ¶
func NewBoundsCheck( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewBoundsCheck returns a BoundsCheck processor.
func NewCombine ¶ added in v0.3.1
func NewCombine( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewCombine returns a Combine processor.
func NewCompress ¶ added in v0.7.7
func NewCompress( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewCompress returns a Compress processor.
func NewConditional ¶ added in v0.13.0
func NewConditional( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewConditional returns a Conditional processor.
func NewDecompress ¶ added in v0.7.4
func NewDecompress( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewDecompress returns a Decompress processor.
func NewDedupe ¶ added in v0.9.7
func NewDedupe( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewDedupe returns a Dedupe processor.
func NewFilter ¶ added in v0.13.0
func NewFilter( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewFilter returns a Filter processor.
func NewFilterParts ¶ added in v0.14.7
func NewFilterParts( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewFilterParts returns a FilterParts processor.
func NewHashSample ¶ added in v0.6.11
func NewHashSample( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewHashSample returns a HashSample processor.
func NewInsertPart ¶ added in v0.6.19
func NewInsertPart( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewInsertPart returns an InsertPart processor.
func NewJMESPath ¶ added in v0.9.8
func NewJMESPath( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewJMESPath returns a JMESPath processor.
func NewMergeJSON ¶ added in v0.11.4
func NewMergeJSON( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewMergeJSON returns a MergeJSON processor.
func NewSample ¶ added in v0.3.2
func NewSample( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewSample returns a Sample processor.
func NewSelectParts ¶ added in v0.4.1
func NewSelectParts( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewSelectParts returns a SelectParts processor.
type TypeSpec ¶ added in v0.9.1
type TypeSpec struct {
// contains filtered or unexported fields
}
TypeSpec is a constructor and a usage description for each processor type.
type Unarchive ¶ added in v0.7.4
type Unarchive struct {
// contains filtered or unexported fields
}
Unarchive is a processor that can selectively unarchive parts of a message using a chosen archive format.
type UnarchiveConfig ¶ added in v0.7.4
type UnarchiveConfig struct {
Format string `json:"format" yaml:"format"`
Parts []int `json:"parts" yaml:"parts"`
}
UnarchiveConfig contains any configuration for the Unarchive processor.
func NewUnarchiveConfig ¶ added in v0.7.4
func NewUnarchiveConfig() UnarchiveConfig
NewUnarchiveConfig returns an UnarchiveConfig with default values.
Directories ¶
Path | Synopsis |
---|---|
condition | Package condition contains logical operators that, based on their configuration, return boolean values from messages under certain circumstances. |