processor

package
v0.14.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 19, 2018 License: MIT Imports: 24 Imported by: 0

Documentation

Overview

Package processor contains logical message processors that can be pipelined within benthos using the pipeline.Processor type.

Index

Constants

This section is empty.

Variables

View Source
var Constructors = map[string]TypeSpec{}

Constructors is a map of all processor types with their specs.

Functions

func Descriptions

func Descriptions() string

Descriptions returns a formatted string of collated descriptions of each type.

func SanitiseConfig

func SanitiseConfig(conf Config) (interface{}, error)

SanitiseConfig returns a sanitised version of the Config, meaning sections that aren't relevant to behaviour are removed.

Types

type Archive

type Archive struct {
	// contains filtered or unexported fields
}

Archive is a processor that can selectively archive parts of a message as a chosen archive type.

func (*Archive) ProcessMessage

func (d *Archive) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a message, attempts to archive the parts of the message, and returns the result as a single part message.

type ArchiveConfig

type ArchiveConfig struct {
	Format string `json:"format" yaml:"format"`
	Path   string `json:"path" yaml:"path"`
}

ArchiveConfig contains any configuration for the Archive processor.

func NewArchiveConfig

func NewArchiveConfig() ArchiveConfig

NewArchiveConfig returns a ArchiveConfig with default values.

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch is a processor that combines messages into a batch until a size limit is reached, at which point the batch is sent out.

func (*Batch) ProcessMessage

func (c *Batch) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a single message and buffers it, drops it, returning a NoAck response, until eventually it reaches a size limit, at which point it batches those messages into one multiple part message which is sent on.

type BatchConfig

type BatchConfig struct {
	ByteSize int `json:"byte_size" yaml:"byte_size"`
}

BatchConfig contains configuration for the Batch processor.

func NewBatchConfig

func NewBatchConfig() BatchConfig

NewBatchConfig returns a BatchConfig with default values.

type BoundsCheck

type BoundsCheck struct {
	// contains filtered or unexported fields
}

BoundsCheck is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*BoundsCheck) ProcessMessage

func (m *BoundsCheck) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage checks each message against a set of bounds.

type BoundsCheckConfig

type BoundsCheckConfig struct {
	MaxParts    int `json:"max_parts" yaml:"max_parts"`
	MinParts    int `json:"min_parts" yaml:"min_parts"`
	MaxPartSize int `json:"max_part_size" yaml:"max_part_size"`
	MinPartSize int `json:"min_part_size" yaml:"min_part_size"`
}

BoundsCheckConfig contains any bounds configuration for the BoundsCheck processor.

func NewBoundsCheckConfig

func NewBoundsCheckConfig() BoundsCheckConfig

NewBoundsCheckConfig returns a BoundsCheckConfig with default values.

type Combine

type Combine struct {
	// contains filtered or unexported fields
}

Combine is a processor that takes messages with a single part in a benthos multiple part blob format and decodes them into multiple part messages.

func (*Combine) ProcessMessage

func (c *Combine) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a single message and buffers it, drops it, returning a NoAck response, until eventually it has N buffered messages, at which point it combines those messages into one multiple part message which is sent on.

type CombineConfig

type CombineConfig struct {
	Parts int `json:"parts" yaml:"parts"`
}

CombineConfig contains configuration for the Combine processor.

func NewCombineConfig

func NewCombineConfig() CombineConfig

NewCombineConfig returns a CombineConfig with default values.

type Compress

type Compress struct {
	// contains filtered or unexported fields
}

Compress is a processor that can selectively compress parts of a message as a chosen compression algorithm.

func (*Compress) ProcessMessage

func (c *Compress) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a message, attempts to compress parts of the message and returns the result.

type CompressConfig

type CompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Level     int    `json:"level" yaml:"level"`
	Parts     []int  `json:"parts" yaml:"parts"`
}

CompressConfig contains any configuration for the Compress processor.

func NewCompressConfig

func NewCompressConfig() CompressConfig

NewCompressConfig returns a CompressConfig with default values.

type Conditional

type Conditional struct {
	// contains filtered or unexported fields
}

Conditional is a processor that only applies child processors under a certain condition.

func (*Conditional) ProcessMessage

func (c *Conditional) ProcessMessage(msg types.Message) (msgs []types.Message, res types.Response)

ProcessMessage does nothing and returns the message unchanged.

type ConditionalConfig

type ConditionalConfig struct {
	Condition      condition.Config `json:"condition" yaml:"condition"`
	Processors     []Config         `json:"processors" yaml:"processors"`
	ElseProcessors []Config         `json:"else_processors" yaml:"else_processors"`
}

ConditionalConfig is a config struct containing fields for the Conditional processor.

func NewConditionalConfig

func NewConditionalConfig() ConditionalConfig

NewConditionalConfig returns a default ConditionalConfig.

type Config

type Config struct {
	Type        string            `json:"type" yaml:"type"`
	Archive     ArchiveConfig     `json:"archive" yaml:"archive"`
	Batch       BatchConfig       `json:"batch" yaml:"batch"`
	BoundsCheck BoundsCheckConfig `json:"bounds_check" yaml:"bounds_check"`
	Combine     CombineConfig     `json:"combine" yaml:"combine"`
	Compress    CompressConfig    `json:"compress" yaml:"compress"`
	Conditional ConditionalConfig `json:"conditional" yaml:"conditional"`
	Decompress  DecompressConfig  `json:"decompress" yaml:"decompress"`
	Dedupe      DedupeConfig      `json:"dedupe" yaml:"dedupe"`
	Filter      FilterConfig      `json:"filter" yaml:"filter"`
	Grok        GrokConfig        `json:"grok" yaml:"grok"`
	HashSample  HashSampleConfig  `json:"hash_sample" yaml:"hash_sample"`
	InsertPart  InsertPartConfig  `json:"insert_part" yaml:"insert_part"`
	JMESPath    JMESPathConfig    `json:"jmespath" yaml:"jmespath"`
	JSON        JSONConfig        `json:"json" yaml:"json"`
	MergeJSON   MergeJSONConfig   `json:"merge_json" yaml:"merge_json"`
	Sample      SampleConfig      `json:"sample" yaml:"sample"`
	SelectParts SelectPartsConfig `json:"select_parts" yaml:"select_parts"`
	Split       struct{}          `json:"split" yaml:"split"`
	Unarchive   UnarchiveConfig   `json:"unarchive" yaml:"unarchive"`
}

Config is the all encompassing configuration struct for all processor types.

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values.

func (*Config) UnmarshalJSON

func (m *Config) UnmarshalJSON(bytes []byte) error

UnmarshalJSON ensures that when parsing configs that are in a slice the default values are still applied.

func (*Config) UnmarshalYAML

func (m *Config) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.

type Decompress

type Decompress struct {
	// contains filtered or unexported fields
}

Decompress is a processor that can selectively decompress parts of a message as a chosen compression algorithm.

func (*Decompress) ProcessMessage

func (d *Decompress) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a message, attempts to decompress parts of the message, and returns the result.

type DecompressConfig

type DecompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Parts     []int  `json:"parts" yaml:"parts"`
}

DecompressConfig contains any configuration for the Decompress processor.

func NewDecompressConfig

func NewDecompressConfig() DecompressConfig

NewDecompressConfig returns a DecompressConfig with default values.

type Dedupe

type Dedupe struct {
	// contains filtered or unexported fields
}

Dedupe is a processor that hashes each message and checks if the has is already present in the cache

func (*Dedupe) ProcessMessage

func (d *Dedupe) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage checks each message against a set of bounds.

type DedupeConfig

type DedupeConfig struct {
	Cache          string   `json:"cache" yaml:"cache"`
	HashType       string   `json:"hash" yaml:"hash"`
	Parts          []int    `json:"parts" yaml:"parts"` // message parts to hash
	JSONPaths      []string `json:"json_paths" yaml:"json_paths"`
	DropOnCacheErr bool     `json:"drop_on_err" yaml:"drop_on_err"`
}

DedupeConfig contains any configuration for the Dedupe processor.

func NewDedupeConfig

func NewDedupeConfig() DedupeConfig

NewDedupeConfig returns a DedupeConfig with default values.

type Filter

type Filter struct {
	// contains filtered or unexported fields
}

Filter is a processor that checks each message against a condition and rejects when the condition returns false.

func (*Filter) ProcessMessage

func (c *Filter) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage checks each message against a set of bounds.

type FilterConfig

type FilterConfig struct {
	condition.Config `json:",inline" yaml:",inline"`
}

FilterConfig contains configuration fields for the Filter processor.

func NewFilterConfig

func NewFilterConfig() FilterConfig

NewFilterConfig returns a FilterConfig with default values.

type Grok

type Grok struct {
	// contains filtered or unexported fields
}

Grok is a processor that executes Grok queries on a message part and replaces the contents with the result.

func (*Grok) ProcessMessage

func (g *Grok) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage parses message parts as grok patterns.

type GrokConfig

type GrokConfig struct {
	Parts       []int    `json:"parts" yaml:"parts"`
	Patterns    []string `json:"patterns" yaml:"patterns"`
	RemoveEmpty bool     `json:"remove_empty_values" yaml:"remove_empty_values"`
	NamedOnly   bool     `json:"named_captures_only" yaml:"named_captures_only"`
	UseDefaults bool     `json:"use_default_patterns" yaml:"use_default_patterns"`
	To          string   `json:"output_format" yaml:"output_format"`
}

GrokConfig contains any configuration for the Grok processor.

func NewGrokConfig

func NewGrokConfig() GrokConfig

NewGrokConfig returns a GrokConfig with default values.

type HashSample

type HashSample struct {
	// contains filtered or unexported fields
}

HashSample is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*HashSample) ProcessMessage

func (s *HashSample) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage checks each message against a set of bounds.

type HashSampleConfig

type HashSampleConfig struct {
	RetainMin float64 `json:"retain_min" yaml:"retain_min"`
	RetainMax float64 `json:"retain_max" yaml:"retain_max"`
	Parts     []int   `json:"parts" yaml:"parts"` // message parts to hash
}

HashSampleConfig contains any configuration for the HashSample processor.

func NewHashSampleConfig

func NewHashSampleConfig() HashSampleConfig

NewHashSampleConfig returns a HashSampleConfig with default values.

type InsertPart

type InsertPart struct {
	// contains filtered or unexported fields
}

InsertPart is a processor that inserts a new message part at a specific index.

func (*InsertPart) ProcessMessage

func (p *InsertPart) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage prepends a new message part to the message.

type InsertPartConfig

type InsertPartConfig struct {
	Index   int    `json:"index" yaml:"index"`
	Content string `json:"content" yaml:"content"`
}

InsertPartConfig contains any configuration for the InsertPart processor.

func NewInsertPartConfig

func NewInsertPartConfig() InsertPartConfig

NewInsertPartConfig returns a InsertPartConfig with default values.

type JMESPath

type JMESPath struct {
	// contains filtered or unexported fields
}

JMESPath is a processor that executes JMESPath queries on a message part and replaces the contents with the result.

func (*JMESPath) ProcessMessage

func (p *JMESPath) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage prepends a new message part to the message.

type JMESPathConfig

type JMESPathConfig struct {
	Parts []int  `json:"parts" yaml:"parts"`
	Query string `json:"query" yaml:"query"`
}

JMESPathConfig contains any configuration for the JMESPath processor.

func NewJMESPathConfig

func NewJMESPathConfig() JMESPathConfig

NewJMESPathConfig returns a JMESPathConfig with default values.

type JSON

type JSON struct {
	// contains filtered or unexported fields
}

JSON is a processor that performs an operation on a JSON payload.

func (*JSON) ProcessMessage

func (p *JSON) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage prepends a new message part to the message.

type JSONConfig

type JSONConfig struct {
	Parts    []int        `json:"parts" yaml:"parts"`
	Operator string       `json:"operator" yaml:"operator"`
	Path     string       `json:"path" yaml:"path"`
	Value    rawJSONValue `json:"value" yaml:"value"`
}

JSONConfig contains any configuration for the JSON processor.

func NewJSONConfig

func NewJSONConfig() JSONConfig

NewJSONConfig returns a JSONConfig with default values.

type MergeJSON

type MergeJSON struct {
	// contains filtered or unexported fields
}

MergeJSON is a processor that merges JSON parsed message parts into a single value.

func (*MergeJSON) ProcessMessage

func (p *MergeJSON) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, returning one or more resulting messages or a response.

type MergeJSONConfig

type MergeJSONConfig struct {
	Parts       []int `json:"parts" yaml:"parts"`
	RetainParts bool  `json:"retain_parts" yaml:"retain_parts"`
}

MergeJSONConfig contains any configuration for the MergeJSON processor.

func NewMergeJSONConfig

func NewMergeJSONConfig() MergeJSONConfig

NewMergeJSONConfig returns a MergeJSONConfig with default values.

type Noop

type Noop struct {
}

Noop is a no-op processor that does nothing.

func (*Noop) ProcessMessage

func (c *Noop) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage does nothing and returns the message unchanged.

type Sample

type Sample struct {
	// contains filtered or unexported fields
}

Sample is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*Sample) ProcessMessage

func (s *Sample) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage checks each message against a set of bounds.

type SampleConfig

type SampleConfig struct {
	Retain     float64 `json:"retain" yaml:"retain"`
	RandomSeed int64   `json:"seed" yaml:"seed"`
}

SampleConfig contains any configuration for the Sample processor.

func NewSampleConfig

func NewSampleConfig() SampleConfig

NewSampleConfig returns a SampleConfig with default values.

type SelectParts

type SelectParts struct {
	// contains filtered or unexported fields
}

SelectParts is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*SelectParts) ProcessMessage

func (m *SelectParts) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage extracts a set of parts from each message.

type SelectPartsConfig

type SelectPartsConfig struct {
	Parts []int `json:"parts" yaml:"parts"`
}

SelectPartsConfig contains any configuration for the SelectParts processor.

func NewSelectPartsConfig

func NewSelectPartsConfig() SelectPartsConfig

NewSelectPartsConfig returns a SelectPartsConfig with default values.

type Split

type Split struct {
	// contains filtered or unexported fields
}

Split is a processor that splits messages into a message per part.

func (*Split) ProcessMessage

func (s *Split) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a single message and returns a slice of messages, containing a message per part.

type Type

type Type interface {
	// ProcessMessage attempts to process a message. Since processing can fail
	// this call returns both a slice of messages in case of success or a
	// response in case of failure. If the slice of messages is empty the
	// response should be returned to the source.
	ProcessMessage(msg types.Message) ([]types.Message, types.Response)
}

Type reads a message, performs a processing operation, and returns either a slice of messages resulting from the process to be propagated through the, pipeline, or a response that should be sent back to the source instead.

func New

func New(
	conf Config,
	mgr types.Manager,
	log log.Modular,
	stats metrics.Type,
) (Type, error)

New creates a processor type based on a processor configuration.

func NewArchive

func NewArchive(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewArchive returns a Archive processor.

func NewBatch

func NewBatch(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewBatch returns a Batch processor.

func NewBoundsCheck

func NewBoundsCheck(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewBoundsCheck returns a BoundsCheck processor.

func NewCombine

func NewCombine(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewCombine returns a Combine processor.

func NewCompress

func NewCompress(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewCompress returns a Compress processor.

func NewConditional

func NewConditional(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewConditional returns a Conditional processor.

func NewDecompress

func NewDecompress(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewDecompress returns a Decompress processor.

func NewDedupe

func NewDedupe(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewDedupe returns a Dedupe processor.

func NewFilter

func NewFilter(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewFilter returns a Filter processor.

func NewGrok

func NewGrok(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewGrok returns a Grok processor.

func NewHashSample

func NewHashSample(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewHashSample returns a HashSample processor.

func NewInsertPart

func NewInsertPart(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewInsertPart returns a InsertPart processor.

func NewJMESPath

func NewJMESPath(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewJMESPath returns a JMESPath processor.

func NewJSON

func NewJSON(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewJSON returns a JSON processor.

func NewMergeJSON

func NewMergeJSON(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewMergeJSON returns a MergeJSON processor.

func NewNoop

func NewNoop(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewNoop returns a Noop processor.

func NewSample

func NewSample(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSample returns a Sample processor.

func NewSelectParts

func NewSelectParts(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSelectParts returns a SelectParts processor.

func NewSplit

func NewSplit(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSplit returns a Split processor.

func NewUnarchive

func NewUnarchive(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewUnarchive returns a Unarchive processor.

type TypeSpec

type TypeSpec struct {
	// contains filtered or unexported fields
}

TypeSpec Constructor and a usage description for each processor type.

type Unarchive

type Unarchive struct {
	// contains filtered or unexported fields
}

Unarchive is a processor that can selectively unarchive parts of a message as a chosen archive type.

func (*Unarchive) ProcessMessage

func (d *Unarchive) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a message, attempts to unarchive parts of the message, and returns the result.

type UnarchiveConfig

type UnarchiveConfig struct {
	Format string `json:"format" yaml:"format"`
	Parts  []int  `json:"parts" yaml:"parts"`
}

UnarchiveConfig contains any configuration for the Unarchive processor.

func NewUnarchiveConfig

func NewUnarchiveConfig() UnarchiveConfig

NewUnarchiveConfig returns a UnarchiveConfig with default values.

Directories

Path Synopsis
Package condition contains logical operators that, based on their configuration, return boolean values from messages under certain circumstances.
Package condition contains logical operators that, based on their configuration, return boolean values from messages under certain circumstances.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL