processor

package
v0.14.6
Published: Jun 21, 2018 License: MIT Imports: 24 Imported by: 11

Documentation

Overview

Package processor contains logical message processors that can be pipelined within benthos using the pipeline.Processor type.

Constants

This section is empty.

Variables

var Constructors = map[string]TypeSpec{}

Constructors is a map of all processor types with their specs.

Functions

func Descriptions

func Descriptions() string

Descriptions returns a formatted string of collated descriptions of each type.

func SanitiseConfig added in v0.8.4

func SanitiseConfig(conf Config) (interface{}, error)

SanitiseConfig returns a sanitised version of the Config, meaning sections that aren't relevant to behaviour are removed.

Types

type Archive added in v0.7.7

type Archive struct {
	// contains filtered or unexported fields
}

Archive is a processor that can selectively archive parts of a message as a chosen archive type.

func (*Archive) ProcessMessage added in v0.7.7

func (d *Archive) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a message, attempts to archive the parts of the message, and returns the result as a single part message.

type ArchiveConfig added in v0.7.7

type ArchiveConfig struct {
	Format string `json:"format" yaml:"format"`
	Path   string `json:"path" yaml:"path"`
}

ArchiveConfig contains any configuration for the Archive processor.

func NewArchiveConfig added in v0.7.7

func NewArchiveConfig() ArchiveConfig

NewArchiveConfig returns an ArchiveConfig with default values.

type Batch added in v0.13.0

type Batch struct {
	// contains filtered or unexported fields
}

Batch is a processor that combines messages into a batch until a size limit is reached, at which point the batch is sent out.

func (*Batch) ProcessMessage added in v0.13.0

func (c *Batch) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage buffers each incoming message and drops it from the pipeline, returning a NoAck response, until the buffer reaches the size limit, at which point it combines the buffered messages into one multiple part message which is sent on.
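
The buffering behaviour can be illustrated with a minimal, self-contained sketch. It does not use the benthos types: message, batcher and add are hypothetical stand-ins, with a slice of byte slices playing the role of a multiple part message and a boolean standing in for the NoAck response. How the real processor counts bytes is not documented here, so the flush rule below is an assumption.

```go
package main

import "fmt"

// message is a hypothetical stand-in for types.Message: a slice of parts.
type message [][]byte

// batcher buffers parts until their combined size reaches byteSize.
type batcher struct {
	byteSize int
	buffered int
	parts    [][]byte
}

// add buffers every part of msg. It returns (batch, true) once the buffered
// bytes reach the limit, and (nil, false) while still buffering, which is
// where the real processor would return a NoAck response.
func (b *batcher) add(msg message) (message, bool) {
	for _, p := range msg {
		b.parts = append(b.parts, p)
		b.buffered += len(p)
	}
	if b.buffered < b.byteSize {
		return nil, false
	}
	out := message(b.parts)
	b.parts, b.buffered = nil, 0
	return out, true
}

func main() {
	b := &batcher{byteSize: 10}
	for _, s := range []string{"hello", "foo", "world"} {
		if batch, ok := b.add(message{[]byte(s)}); ok {
			fmt.Printf("flushed %d parts\n", len(batch))
		} else {
			fmt.Println("buffered")
		}
	}
}
```

With a limit of 10 bytes the first two messages (5 and 3 bytes) are buffered, and the third pushes the total to 13 bytes, which flushes all three parts as one batch.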

type BatchConfig added in v0.13.0

type BatchConfig struct {
	ByteSize int `json:"byte_size" yaml:"byte_size"`
}

BatchConfig contains configuration for the Batch processor.

func NewBatchConfig added in v0.13.0

func NewBatchConfig() BatchConfig

NewBatchConfig returns a BatchConfig with default values.

type BoundsCheck

type BoundsCheck struct {
	// contains filtered or unexported fields
}

BoundsCheck is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*BoundsCheck) ProcessMessage

func (m *BoundsCheck) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage checks each message against a set of bounds.

type BoundsCheckConfig

type BoundsCheckConfig struct {
	MaxParts    int `json:"max_parts" yaml:"max_parts"`
	MinParts    int `json:"min_parts" yaml:"min_parts"`
	MaxPartSize int `json:"max_part_size" yaml:"max_part_size"`
	MinPartSize int `json:"min_part_size" yaml:"min_part_size"`
}

BoundsCheckConfig contains any bounds configuration for the BoundsCheck processor.

func NewBoundsCheckConfig

func NewBoundsCheckConfig() BoundsCheckConfig

NewBoundsCheckConfig returns a BoundsCheckConfig with default values.
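
As a rough illustration of what the four bounds mean, the sketch below checks a message against a local copy of the BoundsCheckConfig fields. The names boundsConfig and withinBounds are hypothetical, the values in main are illustrative rather than the package defaults, and treating every bound as inclusive is an assumption.

```go
package main

import "fmt"

// boundsConfig mirrors the fields of BoundsCheckConfig.
type boundsConfig struct {
	MaxParts    int
	MinParts    int
	MaxPartSize int
	MinPartSize int
}

// withinBounds reports whether a message, modelled here as a slice of parts,
// satisfies every bound in conf. All bounds are treated as inclusive, which
// is an assumption of this sketch.
func withinBounds(conf boundsConfig, parts [][]byte) bool {
	if n := len(parts); n < conf.MinParts || n > conf.MaxParts {
		return false
	}
	for _, p := range parts {
		if len(p) < conf.MinPartSize || len(p) > conf.MaxPartSize {
			return false
		}
	}
	return true
}

func main() {
	conf := boundsConfig{MaxParts: 2, MinParts: 1, MaxPartSize: 8, MinPartSize: 1}
	fmt.Println(withinBounds(conf, [][]byte{[]byte("ok")}))           // true
	fmt.Println(withinBounds(conf, [][]byte{[]byte("far too long")})) // false
	fmt.Println(withinBounds(conf, nil))                              // false
}
```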

type Combine added in v0.3.1

type Combine struct {
	// contains filtered or unexported fields
}

Combine is a processor that buffers messages until it has N of them, at which point it combines them into one multiple part message.

func (*Combine) ProcessMessage added in v0.3.1

func (c *Combine) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage buffers each incoming message and drops it from the pipeline, returning a NoAck response, until it has N buffered messages, at which point it combines those messages into one multiple part message which is sent on.

type CombineConfig added in v0.3.1

type CombineConfig struct {
	Parts int `json:"parts" yaml:"parts"`
}

CombineConfig contains configuration for the Combine processor.

func NewCombineConfig added in v0.3.1

func NewCombineConfig() CombineConfig

NewCombineConfig returns a CombineConfig with default values.

type Compress added in v0.7.7

type Compress struct {
	// contains filtered or unexported fields
}

Compress is a processor that can selectively compress parts of a message using a chosen compression algorithm.

func (*Compress) ProcessMessage added in v0.7.7

func (c *Compress) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a message, attempts to compress parts of the message and returns the result.

type CompressConfig added in v0.7.7

type CompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Level     int    `json:"level" yaml:"level"`
	Parts     []int  `json:"parts" yaml:"parts"`
}

CompressConfig contains any configuration for the Compress processor.

func NewCompressConfig added in v0.7.7

func NewCompressConfig() CompressConfig

NewCompressConfig returns a CompressConfig with default values.

type Conditional added in v0.13.0

type Conditional struct {
	// contains filtered or unexported fields
}

Conditional is a processor that only applies child processors under a certain condition.

func (*Conditional) ProcessMessage added in v0.13.0

func (c *Conditional) ProcessMessage(msg types.Message) (msgs []types.Message, res types.Response)

ProcessMessage applies the child processors to the message when the condition passes, and the else processors otherwise.

type ConditionalConfig added in v0.13.0

type ConditionalConfig struct {
	Condition      condition.Config `json:"condition" yaml:"condition"`
	Processors     []Config         `json:"processors" yaml:"processors"`
	ElseProcessors []Config         `json:"else_processors" yaml:"else_processors"`
}

ConditionalConfig is a config struct containing fields for the Conditional processor.

func NewConditionalConfig added in v0.13.0

func NewConditionalConfig() ConditionalConfig

NewConditionalConfig returns a default ConditionalConfig.

type Config

type Config struct {
	Type        string            `json:"type" yaml:"type"`
	Archive     ArchiveConfig     `json:"archive" yaml:"archive"`
	Batch       BatchConfig       `json:"batch" yaml:"batch"`
	BoundsCheck BoundsCheckConfig `json:"bounds_check" yaml:"bounds_check"`
	Combine     CombineConfig     `json:"combine" yaml:"combine"`
	Compress    CompressConfig    `json:"compress" yaml:"compress"`
	Conditional ConditionalConfig `json:"conditional" yaml:"conditional"`
	Decompress  DecompressConfig  `json:"decompress" yaml:"decompress"`
	Dedupe      DedupeConfig      `json:"dedupe" yaml:"dedupe"`
	Filter      FilterConfig      `json:"filter" yaml:"filter"`
	Grok        GrokConfig        `json:"grok" yaml:"grok"`
	HashSample  HashSampleConfig  `json:"hash_sample" yaml:"hash_sample"`
	InsertPart  InsertPartConfig  `json:"insert_part" yaml:"insert_part"`
	JMESPath    JMESPathConfig    `json:"jmespath" yaml:"jmespath"`
	JSON        JSONConfig        `json:"json" yaml:"json"`
	MergeJSON   MergeJSONConfig   `json:"merge_json" yaml:"merge_json"`
	Sample      SampleConfig      `json:"sample" yaml:"sample"`
	SelectParts SelectPartsConfig `json:"select_parts" yaml:"select_parts"`
	Split       struct{}          `json:"split" yaml:"split"`
	Unarchive   UnarchiveConfig   `json:"unarchive" yaml:"unarchive"`
}

Config is the all-encompassing configuration struct for all processor types.

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values.

func (*Config) UnmarshalJSON

func (m *Config) UnmarshalJSON(bytes []byte) error

UnmarshalJSON ensures that when parsing configs that are in a slice the default values are still applied.

func (*Config) UnmarshalYAML

func (m *Config) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.

type Decompress added in v0.7.4

type Decompress struct {
	// contains filtered or unexported fields
}

Decompress is a processor that can selectively decompress parts of a message using a chosen compression algorithm.

func (*Decompress) ProcessMessage added in v0.7.4

func (d *Decompress) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a message, attempts to decompress parts of the message, and returns the result.

type DecompressConfig added in v0.7.4

type DecompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Parts     []int  `json:"parts" yaml:"parts"`
}

DecompressConfig contains any configuration for the Decompress processor.

func NewDecompressConfig added in v0.7.4

func NewDecompressConfig() DecompressConfig

NewDecompressConfig returns a DecompressConfig with default values.

type Dedupe added in v0.9.7

type Dedupe struct {
	// contains filtered or unexported fields
}

Dedupe is a processor that hashes each message and checks if the hash is already present in the cache.

func (*Dedupe) ProcessMessage added in v0.9.7

func (d *Dedupe) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage hashes each message and checks whether that hash is already present in the cache.

type DedupeConfig added in v0.9.7

type DedupeConfig struct {
	Cache          string   `json:"cache" yaml:"cache"`
	HashType       string   `json:"hash" yaml:"hash"`
	Parts          []int    `json:"parts" yaml:"parts"` // message parts to hash
	JSONPaths      []string `json:"json_paths" yaml:"json_paths"`
	DropOnCacheErr bool     `json:"drop_on_err" yaml:"drop_on_err"`
}

DedupeConfig contains any configuration for the Dedupe processor.

func NewDedupeConfig added in v0.9.7

func NewDedupeConfig() DedupeConfig

NewDedupeConfig returns a DedupeConfig with default values.

type Filter added in v0.13.0

type Filter struct {
	// contains filtered or unexported fields
}

Filter is a processor that checks each message against a condition and rejects when the condition returns false.

func (*Filter) ProcessMessage added in v0.13.0

func (c *Filter) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage checks each message against the condition and rejects it when the condition returns false.

type FilterConfig added in v0.13.0

type FilterConfig struct {
	condition.Config `json:",inline" yaml:",inline"`
}

FilterConfig contains configuration fields for the Filter processor.

func NewFilterConfig added in v0.13.0

func NewFilterConfig() FilterConfig

NewFilterConfig returns a FilterConfig with default values.

type Grok added in v0.13.5

type Grok struct {
	// contains filtered or unexported fields
}

Grok is a processor that executes Grok queries on a message part and replaces the contents with the result.

func (*Grok) ProcessMessage added in v0.13.5

func (g *Grok) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage parses message parts using grok patterns.

type GrokConfig added in v0.13.5

type GrokConfig struct {
	Parts       []int    `json:"parts" yaml:"parts"`
	Patterns    []string `json:"patterns" yaml:"patterns"`
	RemoveEmpty bool     `json:"remove_empty_values" yaml:"remove_empty_values"`
	NamedOnly   bool     `json:"named_captures_only" yaml:"named_captures_only"`
	UseDefaults bool     `json:"use_default_patterns" yaml:"use_default_patterns"`
	To          string   `json:"output_format" yaml:"output_format"`
}

GrokConfig contains any configuration for the Grok processor.

func NewGrokConfig added in v0.13.5

func NewGrokConfig() GrokConfig

NewGrokConfig returns a GrokConfig with default values.

type HashSample added in v0.6.11

type HashSample struct {
	// contains filtered or unexported fields
}

HashSample is a processor that hashes selected parts of each message and retains the message only when the hash falls within the configured retain range.

func (*HashSample) ProcessMessage added in v0.6.11

func (s *HashSample) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage hashes the selected parts of each message and checks the result against the retain range.

type HashSampleConfig added in v0.6.11

type HashSampleConfig struct {
	RetainMin float64 `json:"retain_min" yaml:"retain_min"`
	RetainMax float64 `json:"retain_max" yaml:"retain_max"`
	Parts     []int   `json:"parts" yaml:"parts"` // message parts to hash
}

HashSampleConfig contains any configuration for the HashSample processor.

func NewHashSampleConfig added in v0.6.11

func NewHashSampleConfig() HashSampleConfig

NewHashSampleConfig returns a HashSampleConfig with default values.
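
The idea of sampling by hash can be sketched with the standard library alone. The function name retain is hypothetical, FNV-1a is used only because it ships with Go (the hash used by the package is not stated here), and interpreting RetainMin and RetainMax as a half-open percentage range is an assumption.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math"
)

// retain hashes the given parts and reports whether the hash, scaled into
// the range [0, 100], falls within [retainMin, retainMax).
func retain(parts [][]byte, retainMin, retainMax float64) bool {
	h := fnv.New64a()
	for _, p := range parts {
		h.Write(p) // writes to a hash.Hash never return an error
	}
	pct := float64(h.Sum64()) / float64(math.MaxUint64) * 100
	return pct >= retainMin && pct < retainMax
}

func main() {
	parts := [][]byte{[]byte("user-1234")}
	// Equal input always produces the same decision.
	fmt.Println(retain(parts, 0, 50) == retain(parts, 0, 50)) // true
	// An empty range retains nothing.
	fmt.Println(retain(parts, 0, 0)) // false
}
```

Because the decision depends only on the message content, separate instances sampling the same stream would keep the same messages, which random sampling cannot guarantee.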

type InsertPart added in v0.6.19

type InsertPart struct {
	// contains filtered or unexported fields
}

InsertPart is a processor that inserts a new message part at a specific index.

func (*InsertPart) ProcessMessage added in v0.6.19

func (p *InsertPart) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage inserts a new message part at the configured index.

type InsertPartConfig added in v0.6.19

type InsertPartConfig struct {
	Index   int    `json:"index" yaml:"index"`
	Content string `json:"content" yaml:"content"`
}

InsertPartConfig contains any configuration for the InsertPart processor.

func NewInsertPartConfig added in v0.6.19

func NewInsertPartConfig() InsertPartConfig

NewInsertPartConfig returns an InsertPartConfig with default values.

type JMESPath added in v0.9.8

type JMESPath struct {
	// contains filtered or unexported fields
}

JMESPath is a processor that executes JMESPath queries on a message part and replaces the contents with the result.

func (*JMESPath) ProcessMessage added in v0.9.8

func (p *JMESPath) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage executes the JMESPath query on the selected message parts and replaces their contents with the result.

type JMESPathConfig added in v0.9.8

type JMESPathConfig struct {
	Parts []int  `json:"parts" yaml:"parts"`
	Query string `json:"query" yaml:"query"`
}

JMESPathConfig contains any configuration for the JMESPath processor.

func NewJMESPathConfig added in v0.9.8

func NewJMESPathConfig() JMESPathConfig

NewJMESPathConfig returns a JMESPathConfig with default values.

type JSON added in v0.14.0

type JSON struct {
	// contains filtered or unexported fields
}

JSON is a processor that performs an operation on a JSON payload.

func (*JSON) ProcessMessage added in v0.14.0

func (p *JSON) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the configured JSON operation to the selected message parts.

type JSONConfig added in v0.14.0

type JSONConfig struct {
	Parts    []int        `json:"parts" yaml:"parts"`
	Operator string       `json:"operator" yaml:"operator"`
	Path     string       `json:"path" yaml:"path"`
	Value    rawJSONValue `json:"value" yaml:"value"`
}

JSONConfig contains any configuration for the JSON processor.

func NewJSONConfig added in v0.14.0

func NewJSONConfig() JSONConfig

NewJSONConfig returns a JSONConfig with default values.

type MergeJSON added in v0.11.4

type MergeJSON struct {
	// contains filtered or unexported fields
}

MergeJSON is a processor that merges JSON parsed message parts into a single value.

func (*MergeJSON) ProcessMessage added in v0.11.4

func (p *MergeJSON) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, returning one or more resulting messages or a response.

type MergeJSONConfig added in v0.11.4

type MergeJSONConfig struct {
	Parts       []int `json:"parts" yaml:"parts"`
	RetainParts bool  `json:"retain_parts" yaml:"retain_parts"`
}

MergeJSONConfig contains any configuration for the MergeJSON processor.

func NewMergeJSONConfig added in v0.11.4

func NewMergeJSONConfig() MergeJSONConfig

NewMergeJSONConfig returns a MergeJSONConfig with default values.

type Noop added in v0.6.5

type Noop struct {
}

Noop is a no-op processor that does nothing.

func (*Noop) ProcessMessage added in v0.6.5

func (c *Noop) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage does nothing and returns the message unchanged.

type Sample added in v0.3.2

type Sample struct {
	// contains filtered or unexported fields
}

Sample is a processor that retains a random sample of messages and rejects the rest.

func (*Sample) ProcessMessage added in v0.3.2

func (s *Sample) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage decides at random whether each message is retained or rejected.

type SampleConfig added in v0.3.2

type SampleConfig struct {
	Retain     float64 `json:"retain" yaml:"retain"`
	RandomSeed int64   `json:"seed" yaml:"seed"`
}

SampleConfig contains any configuration for the Sample processor.

func NewSampleConfig added in v0.3.2

func NewSampleConfig() SampleConfig

NewSampleConfig returns a SampleConfig with default values.
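
A seeded random sampler can be sketched as follows. The sampler type and its methods are hypothetical, and reading Retain as a percentage between 0 and 100 is an assumption made for this example.

```go
package main

import (
	"fmt"
	"math/rand"
)

// sampler retains roughly retain percent of the messages it is asked about.
type sampler struct {
	retain float64
	gen    *rand.Rand
}

// newSampler seeds its own generator so that runs are repeatable.
func newSampler(retain float64, seed int64) *sampler {
	return &sampler{retain: retain, gen: rand.New(rand.NewSource(seed))}
}

// keep draws a number in [0, 100) and retains the message when the draw
// falls below the retain threshold.
func (s *sampler) keep() bool {
	return s.gen.Float64()*100 < s.retain
}

func main() {
	s := newSampler(10, 0)
	kept := 0
	for i := 0; i < 10000; i++ {
		if s.keep() {
			kept++
		}
	}
	fmt.Printf("kept %d of 10000\n", kept)
}
```

Fixing the seed makes the sequence of decisions repeatable, which is useful in tests.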

type SelectParts added in v0.4.1

type SelectParts struct {
	// contains filtered or unexported fields
}

SelectParts is a processor that extracts a chosen set of parts from each message.

func (*SelectParts) ProcessMessage added in v0.4.1

func (m *SelectParts) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage extracts a set of parts from each message.

type SelectPartsConfig added in v0.4.1

type SelectPartsConfig struct {
	Parts []int `json:"parts" yaml:"parts"`
}

SelectPartsConfig contains any configuration for the SelectParts processor.

func NewSelectPartsConfig added in v0.4.1

func NewSelectPartsConfig() SelectPartsConfig

NewSelectPartsConfig returns a SelectPartsConfig with default values.

type Split added in v0.7.6

type Split struct {
	// contains filtered or unexported fields
}

Split is a processor that splits messages into a message per part.

func (*Split) ProcessMessage added in v0.7.6

func (s *Split) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a single message and returns a slice of messages, containing a message per part.
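
In terms of plain slices, the operation looks like the sketch below. The message type and split function are hypothetical stand-ins for types.Message and the processor.

```go
package main

import "fmt"

// message is a hypothetical stand-in for types.Message: a slice of parts.
type message [][]byte

// split returns one single part message per part of msg.
func split(msg message) []message {
	out := make([]message, 0, len(msg))
	for _, p := range msg {
		out = append(out, message{p})
	}
	return out
}

func main() {
	msgs := split(message{[]byte("a"), []byte("b"), []byte("c")})
	fmt.Println(len(msgs)) // 3
	for _, m := range msgs {
		fmt.Printf("%q\n", m)
	}
}
```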

type Type

type Type interface {
	// ProcessMessage attempts to process a message. Since processing can fail
	// this call returns both a slice of messages in case of success or a
	// response in case of failure. If the slice of messages is empty the
	// response should be returned to the source.
	ProcessMessage(msg types.Message) ([]types.Message, types.Response)
}

Type reads a message, performs a processing operation, and returns either a slice of messages resulting from the process to be propagated through the pipeline, or a response that should be sent back to the source instead.
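
The contract can be illustrated with a self-contained sketch that chains two toy processors. Nothing here uses the real benthos packages: message, response, processor, upper, dropEmpty and run are all hypothetical, and response is reduced to a plain error for brevity.

```go
package main

import (
	"bytes"
	"fmt"
)

// message and response are hypothetical stand-ins for types.Message and
// types.Response.
type message [][]byte
type response error

// processor mirrors the shape of the Type interface.
type processor interface {
	ProcessMessage(msg message) ([]message, response)
}

// upper is an illustrative processor that upper-cases every part.
type upper struct{}

func (upper) ProcessMessage(msg message) ([]message, response) {
	out := make(message, len(msg))
	for i, p := range msg {
		out[i] = bytes.ToUpper(p)
	}
	return []message{out}, nil
}

// dropEmpty is an illustrative processor that rejects messages with no parts.
type dropEmpty struct{}

func (dropEmpty) ProcessMessage(msg message) ([]message, response) {
	if len(msg) == 0 {
		return nil, fmt.Errorf("message has no parts")
	}
	return []message{msg}, nil
}

// run feeds msg through each processor in turn. If a stage yields no
// messages, its response is returned so it can go back to the source.
func run(procs []processor, msg message) ([]message, response) {
	msgs := []message{msg}
	for _, p := range procs {
		var next []message
		var res response
		for _, m := range msgs {
			out, r := p.ProcessMessage(m)
			if len(out) == 0 {
				res = r
				continue
			}
			next = append(next, out...)
		}
		if len(next) == 0 {
			return nil, res
		}
		msgs = next
	}
	return msgs, nil
}

func main() {
	procs := []processor{dropEmpty{}, upper{}}
	msgs, res := run(procs, message{[]byte("hello")})
	fmt.Printf("%q %v\n", msgs, res)
	msgs, res = run(procs, message{})
	fmt.Printf("%q %v\n", msgs, res)
}
```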

func New

func New(
	conf Config,
	mgr types.Manager,
	log log.Modular,
	stats metrics.Type,
) (Type, error)

New creates a processor type based on a processor configuration.
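
One plausible way to build such a factory is to look up the configured type name in a registry of constructors, similar in spirit to the Constructors map. The sketch below is an assumption about the general pattern rather than a description of this package's internals: config, processor, typeSpec, constructors and newProcessor are hypothetical names, and the real New also takes a manager, logger and metrics type.

```go
package main

import "fmt"

// config is a hypothetical stand-in for Config, reduced to the type name.
type config struct {
	Type string
}

// processor is a hypothetical stand-in for the Type interface.
type processor interface {
	Name() string
}

// noop is an illustrative processor.
type noop struct{}

func (noop) Name() string { return "noop" }

// typeSpec pairs a constructor with a usage description.
type typeSpec struct {
	constructor func(conf config) (processor, error)
	description string
}

// constructors maps each known type name to its spec.
var constructors = map[string]typeSpec{
	"noop": {
		constructor: func(config) (processor, error) { return noop{}, nil },
		description: "Does nothing.",
	},
}

// newProcessor dispatches on conf.Type and fails for unknown names.
func newProcessor(conf config) (processor, error) {
	spec, ok := constructors[conf.Type]
	if !ok {
		return nil, fmt.Errorf("processor type %q not recognised", conf.Type)
	}
	return spec.constructor(conf)
}

func main() {
	p, err := newProcessor(config{Type: "noop"})
	fmt.Println(p.Name(), err)
	_, err = newProcessor(config{Type: "missing"})
	fmt.Println(err)
}
```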

func NewArchive added in v0.7.7

func NewArchive(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewArchive returns an Archive processor.

func NewBatch added in v0.13.0

func NewBatch(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewBatch returns a Batch processor.

func NewBoundsCheck

func NewBoundsCheck(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewBoundsCheck returns a BoundsCheck processor.

func NewCombine added in v0.3.1

func NewCombine(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewCombine returns a Combine processor.

func NewCompress added in v0.7.7

func NewCompress(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewCompress returns a Compress processor.

func NewConditional added in v0.13.0

func NewConditional(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewConditional returns a Conditional processor.

func NewDecompress added in v0.7.4

func NewDecompress(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewDecompress returns a Decompress processor.

func NewDedupe added in v0.9.7

func NewDedupe(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewDedupe returns a Dedupe processor.

func NewFilter added in v0.13.0

func NewFilter(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewFilter returns a Filter processor.

func NewGrok added in v0.13.5

func NewGrok(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewGrok returns a Grok processor.

func NewHashSample added in v0.6.11

func NewHashSample(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewHashSample returns a HashSample processor.

func NewInsertPart added in v0.6.19

func NewInsertPart(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewInsertPart returns an InsertPart processor.

func NewJMESPath added in v0.9.8

func NewJMESPath(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewJMESPath returns a JMESPath processor.

func NewJSON added in v0.14.0

func NewJSON(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewJSON returns a JSON processor.

func NewMergeJSON added in v0.11.4

func NewMergeJSON(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewMergeJSON returns a MergeJSON processor.

func NewNoop added in v0.6.5

func NewNoop(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewNoop returns a Noop processor.

func NewSample added in v0.3.2

func NewSample(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSample returns a Sample processor.

func NewSelectParts added in v0.4.1

func NewSelectParts(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSelectParts returns a SelectParts processor.

func NewSplit added in v0.7.6

func NewSplit(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSplit returns a Split processor.

func NewUnarchive added in v0.7.4

func NewUnarchive(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewUnarchive returns an Unarchive processor.

type TypeSpec added in v0.9.1

type TypeSpec struct {
	// contains filtered or unexported fields
}

TypeSpec contains a constructor and a usage description for each processor type.

type Unarchive added in v0.7.4

type Unarchive struct {
	// contains filtered or unexported fields
}

Unarchive is a processor that can selectively unarchive parts of a message as a chosen archive type.

func (*Unarchive) ProcessMessage added in v0.7.4

func (d *Unarchive) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a message, attempts to unarchive parts of the message, and returns the result.

type UnarchiveConfig added in v0.7.4

type UnarchiveConfig struct {
	Format string `json:"format" yaml:"format"`
	Parts  []int  `json:"parts" yaml:"parts"`
}

UnarchiveConfig contains any configuration for the Unarchive processor.

func NewUnarchiveConfig added in v0.7.4

func NewUnarchiveConfig() UnarchiveConfig

NewUnarchiveConfig returns an UnarchiveConfig with default values.

Directories

Path Synopsis
Package condition contains logical operators that, based on their configuration, return boolean values from messages under certain circumstances.
