processor

package
v0.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 22, 2018 License: MIT Imports: 22 Imported by: 0

Documentation

Overview

Package processor contains logical message processors that can be pipelined within benthos using the pipeline.Processor type.

Index

Constants

This section is empty.

Variables

View Source
var Constructors = map[string]TypeSpec{}

Constructors is a map of all processor types with their specs.

View Source
var (
	ErrEmptyTargetPath = errors.New("target path is empty")
)

Errors for the SelectJSON type.

Functions

func Descriptions

func Descriptions() string

Descriptions returns a formatted string of collated descriptions of each type.

func SanitiseConfig added in v0.8.4

func SanitiseConfig(conf Config) (interface{}, error)

SanitiseConfig returns a sanitised version of the Config, meaning sections that aren't relevant to behaviour are removed.

Types

type Archive added in v0.7.7

type Archive struct {
	// contains filtered or unexported fields
}

Archive is a processor that can selectively archive parts of a message as a chosen archive type.

func (*Archive) ProcessMessage added in v0.7.7

func (d *Archive) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a message, attempts to archive the parts of the message, and returns the result as a single part message.

type ArchiveConfig added in v0.7.7

type ArchiveConfig struct {
	Format string `json:"format" yaml:"format"`
	Path   string `json:"path" yaml:"path"`
}

ArchiveConfig contains any configuration for the Archive processor.

func NewArchiveConfig added in v0.7.7

func NewArchiveConfig() ArchiveConfig

NewArchiveConfig returns a ArchiveConfig with default values.

type BoundsCheck

type BoundsCheck struct {
	// contains filtered or unexported fields
}

BoundsCheck is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*BoundsCheck) ProcessMessage

func (m *BoundsCheck) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage checks each message against a set of bounds.

type BoundsCheckConfig

type BoundsCheckConfig struct {
	MaxParts    int `json:"max_parts" yaml:"max_parts"`
	MinParts    int `json:"min_parts" yaml:"min_parts"`
	MaxPartSize int `json:"max_part_size" yaml:"max_part_size"`
	MinPartSize int `json:"min_part_size" yaml:"min_part_size"`
}

BoundsCheckConfig contains any bounds configuration for the BoundsCheck processor.

func NewBoundsCheckConfig

func NewBoundsCheckConfig() BoundsCheckConfig

NewBoundsCheckConfig returns a BoundsCheckConfig with default values.

type Combine added in v0.3.1

type Combine struct {
	// contains filtered or unexported fields
}

Combine is a processor that takes messages with a single part in a benthos multiple part blob format and decodes them into multiple part messages.

func (*Combine) ProcessMessage added in v0.3.1

func (c *Combine) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a single message and buffers it, drops it, returning a NoAck response, until eventually it has N buffered messages, at which point it combines those messages into one multiple part message which is sent on.

type CombineConfig added in v0.3.1

type CombineConfig struct {
	Parts int `json:"parts" yaml:"parts"`
}

CombineConfig contains configuration for the Combine processor.

func NewCombineConfig added in v0.3.1

func NewCombineConfig() CombineConfig

NewCombineConfig returns a CombineConfig with default values.

type Compress added in v0.7.7

type Compress struct {
	// contains filtered or unexported fields
}

Compress is a processor that can selectively compress parts of a message as a chosen compression algorithm.

func (*Compress) ProcessMessage added in v0.7.7

func (c *Compress) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a message, attempts to compress parts of the message and returns the result.

type CompressConfig added in v0.7.7

type CompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Level     int    `json:"level" yaml:"level"`
	Parts     []int  `json:"parts" yaml:"parts"`
}

CompressConfig contains any configuration for the Compress processor.

func NewCompressConfig added in v0.7.7

func NewCompressConfig() CompressConfig

NewCompressConfig returns a CompressConfig with default values.

type Condition added in v0.9.4

type Condition struct {
	// contains filtered or unexported fields
}

Condition is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*Condition) ProcessMessage added in v0.9.4

func (c *Condition) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage checks each message against a set of bounds.

type ConditionConfig added in v0.9.4

type ConditionConfig struct {
	condition.Config `json:",inline" yaml:",inline"`
}

ConditionConfig contains configuration fields for the Condition processor.

func NewConditionConfig added in v0.9.4

func NewConditionConfig() ConditionConfig

NewConditionConfig returns a ConditionConfig with default values.

type Config

type Config struct {
	Type        string            `json:"type" yaml:"type"`
	Archive     ArchiveConfig     `json:"archive" yaml:"archive"`
	BoundsCheck BoundsCheckConfig `json:"bounds_check" yaml:"bounds_check"`
	Combine     CombineConfig     `json:"combine" yaml:"combine"`
	Compress    CompressConfig    `json:"compress" yaml:"compress"`
	Condition   ConditionConfig   `json:"condition" yaml:"condition"`
	Decompress  DecompressConfig  `json:"decompress" yaml:"decompress"`
	Dedupe      DedupeConfig      `json:"dedupe" yaml:"dedupe"`
	HashSample  HashSampleConfig  `json:"hash_sample" yaml:"hash_sample"`
	InsertPart  InsertPartConfig  `json:"insert_part" yaml:"insert_part"`
	JMESPath    JMESPathConfig    `json:"jmespath" yaml:"jmespath"`
	MergeJSON   MergeJSONConfig   `json:"merge_json" yaml:"merge_json"`
	Sample      SampleConfig      `json:"sample" yaml:"sample"`
	SelectJSON  SelectJSONConfig  `json:"select_json" yaml:"select_json"`
	SelectParts SelectPartsConfig `json:"select_parts" yaml:"select_parts"`
	SetJSON     SetJSONConfig     `json:"set_json" yaml:"set_json"`
	Split       struct{}          `json:"split" yaml:"split"`
	Unarchive   UnarchiveConfig   `json:"unarchive" yaml:"unarchive"`
}

Config is the all encompassing configuration struct for all processor types.

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values.

func (*Config) UnmarshalJSON

func (m *Config) UnmarshalJSON(bytes []byte) error

UnmarshalJSON ensures that when parsing configs that are in a slice the default values are still applied.

func (*Config) UnmarshalYAML

func (m *Config) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.

type Decompress added in v0.7.4

type Decompress struct {
	// contains filtered or unexported fields
}

Decompress is a processor that can selectively decompress parts of a message as a chosen compression algorithm.

func (*Decompress) ProcessMessage added in v0.7.4

func (d *Decompress) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a message, attempts to decompress parts of the message, and returns the result.

type DecompressConfig added in v0.7.4

type DecompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Parts     []int  `json:"parts" yaml:"parts"`
}

DecompressConfig contains any configuration for the Decompress processor.

func NewDecompressConfig added in v0.7.4

func NewDecompressConfig() DecompressConfig

NewDecompressConfig returns a DecompressConfig with default values.

type Dedupe added in v0.9.7

type Dedupe struct {
	// contains filtered or unexported fields
}

Dedupe is a processor that hashes each message and checks if the has is already present in the cache

func (*Dedupe) ProcessMessage added in v0.9.7

func (d *Dedupe) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage checks each message against a set of bounds.

type DedupeConfig added in v0.9.7

type DedupeConfig struct {
	Cache          string   `json:"cache" yaml:"cache"`
	HashType       string   `json:"hash" yaml:"hash"`
	Parts          []int    `json:"parts" yaml:"parts"` // message parts to hash
	JSONPaths      []string `json:"json_paths" yaml:"json_paths"`
	DropOnCacheErr bool     `json:"drop_on_err" yaml:"drop_on_err"`
}

DedupeConfig contains any configuration for the Dedupe processor.

func NewDedupeConfig added in v0.9.7

func NewDedupeConfig() DedupeConfig

NewDedupeConfig returns a DedupeConfig with default values.

type HashSample added in v0.6.11

type HashSample struct {
	// contains filtered or unexported fields
}

HashSample is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*HashSample) ProcessMessage added in v0.6.11

func (s *HashSample) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage checks each message against a set of bounds.

type HashSampleConfig added in v0.6.11

type HashSampleConfig struct {
	RetainMin float64 `json:"retain_min" yaml:"retain_min"`
	RetainMax float64 `json:"retain_max" yaml:"retain_max"`
	Parts     []int   `json:"parts" yaml:"parts"` // message parts to hash
}

HashSampleConfig contains any configuration for the HashSample processor.

func NewHashSampleConfig added in v0.6.11

func NewHashSampleConfig() HashSampleConfig

NewHashSampleConfig returns a HashSampleConfig with default values.

type InsertPart added in v0.6.19

type InsertPart struct {
	// contains filtered or unexported fields
}

InsertPart is a processor that inserts a new message part at a specific index.

func (*InsertPart) ProcessMessage added in v0.6.19

func (p *InsertPart) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage prepends a new message part to the message.

type InsertPartConfig added in v0.6.19

type InsertPartConfig struct {
	Index   int    `json:"index" yaml:"index"`
	Content string `json:"content" yaml:"content"`
}

InsertPartConfig contains any configuration for the InsertPart processor.

func NewInsertPartConfig added in v0.6.19

func NewInsertPartConfig() InsertPartConfig

NewInsertPartConfig returns a InsertPartConfig with default values.

type JMESPath added in v0.9.8

type JMESPath struct {
	// contains filtered or unexported fields
}

JMESPath is a processor that executes JMESPath queries on a message part and replaces the contents with the result.

func (*JMESPath) ProcessMessage added in v0.9.8

func (p *JMESPath) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage prepends a new message part to the message.

type JMESPathConfig added in v0.9.8

type JMESPathConfig struct {
	Parts []int  `json:"parts" yaml:"parts"`
	Query string `json:"query" yaml:"query"`
}

JMESPathConfig contains any configuration for the JMESPath processor.

func NewJMESPathConfig added in v0.9.8

func NewJMESPathConfig() JMESPathConfig

NewJMESPathConfig returns a JMESPathConfig with default values.

type MergeJSON added in v0.11.4

type MergeJSON struct {
	// contains filtered or unexported fields
}

MergeJSON is a processor that merges JSON parsed message parts into a single value.

func (*MergeJSON) ProcessMessage added in v0.11.4

func (p *MergeJSON) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, returning one or more resulting messages or a response.

type MergeJSONConfig added in v0.11.4

type MergeJSONConfig struct {
	Parts       []int `json:"parts" yaml:"parts"`
	RetainParts bool  `json:"retain_parts" yaml:"retain_parts"`
}

MergeJSONConfig contains any configuration for the MergeJSON processor.

func NewMergeJSONConfig added in v0.11.4

func NewMergeJSONConfig() MergeJSONConfig

NewMergeJSONConfig returns a MergeJSONConfig with default values.

type Noop added in v0.6.5

type Noop struct {
}

Noop is a no-op processor that does nothing.

func (*Noop) ProcessMessage added in v0.6.5

func (c *Noop) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage does nothing and returns the message unchanged.

type Sample added in v0.3.2

type Sample struct {
	// contains filtered or unexported fields
}

Sample is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*Sample) ProcessMessage added in v0.3.2

func (s *Sample) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage checks each message against a set of bounds.

type SampleConfig added in v0.3.2

type SampleConfig struct {
	Retain     float64 `json:"retain" yaml:"retain"`
	RandomSeed int64   `json:"seed" yaml:"seed"`
}

SampleConfig contains any configuration for the Sample processor.

func NewSampleConfig added in v0.3.2

func NewSampleConfig() SampleConfig

NewSampleConfig returns a SampleConfig with default values.

type SelectJSON added in v0.9.7

type SelectJSON struct {
	// contains filtered or unexported fields
}

SelectJSON is a processor that extracts a JSON field from a message part and replaces the contents with the field value.

func (*SelectJSON) ProcessMessage added in v0.9.7

func (p *SelectJSON) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage prepends a new message part to the message.

type SelectJSONConfig added in v0.9.7

type SelectJSONConfig struct {
	Parts []int  `json:"parts" yaml:"parts"`
	Path  string `json:"path" yaml:"path"`
}

SelectJSONConfig contains any configuration for the SelectJSON processor.

func NewSelectJSONConfig added in v0.9.7

func NewSelectJSONConfig() SelectJSONConfig

NewSelectJSONConfig returns a SelectJSONConfig with default values.

type SelectParts added in v0.4.1

type SelectParts struct {
	// contains filtered or unexported fields
}

SelectParts is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*SelectParts) ProcessMessage added in v0.4.1

func (m *SelectParts) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage extracts a set of parts from each message.

type SelectPartsConfig added in v0.4.1

type SelectPartsConfig struct {
	Parts []int `json:"parts" yaml:"parts"`
}

SelectPartsConfig contains any configuration for the SelectParts processor.

func NewSelectPartsConfig added in v0.4.1

func NewSelectPartsConfig() SelectPartsConfig

NewSelectPartsConfig returns a SelectPartsConfig with default values.

type SetJSON added in v0.7.2

type SetJSON struct {
	// contains filtered or unexported fields
}

SetJSON is a processor that inserts a new message part at a specific index.

func (*SetJSON) ProcessMessage added in v0.7.2

func (p *SetJSON) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage prepends a new message part to the message.

type SetJSONConfig added in v0.7.2

type SetJSONConfig struct {
	Parts []int        `json:"parts" yaml:"parts"`
	Path  string       `json:"path" yaml:"path"`
	Value rawJSONValue `json:"value" yaml:"value"`
}

SetJSONConfig contains any configuration for the SetJSON processor.

func NewSetJSONConfig added in v0.7.2

func NewSetJSONConfig() SetJSONConfig

NewSetJSONConfig returns a SetJSONConfig with default values.

type Split added in v0.7.6

type Split struct {
	// contains filtered or unexported fields
}

Split is a processor that splits messages into a message per part.

func (*Split) ProcessMessage added in v0.7.6

func (s *Split) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a single message and returns a slice of messages, containing a message per part.

type Type

type Type interface {
	// ProcessMessage attempts to process a message. Since processing can fail
	// this call returns both a slice of messages in case of success or a
	// response in case of failure. If the slice of messages is empty the
	// response should be returned to the source.
	ProcessMessage(msg types.Message) ([]types.Message, types.Response)
}

Type reads a message, performs a processing operation, and returns either a slice of messages resulting from the process to be propagated through the, pipeline, or a response that should be sent back to the source instead.

func New

func New(
	conf Config,
	mgr types.Manager,
	log log.Modular,
	stats metrics.Type,
) (Type, error)

New creates a processor type based on a processor configuration.

func NewArchive added in v0.7.7

func NewArchive(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewArchive returns a Archive processor.

func NewBoundsCheck

func NewBoundsCheck(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewBoundsCheck returns a BoundsCheck processor.

func NewCombine added in v0.3.1

func NewCombine(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewCombine returns a Combine processor.

func NewCompress added in v0.7.7

func NewCompress(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewCompress returns a Compress processor.

func NewCondition added in v0.9.4

func NewCondition(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewCondition returns a Condition processor.

func NewDecompress added in v0.7.4

func NewDecompress(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewDecompress returns a Decompress processor.

func NewDedupe added in v0.9.7

func NewDedupe(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewDedupe returns a Dedupe processor.

func NewHashSample added in v0.6.11

func NewHashSample(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewHashSample returns a HashSample processor.

func NewInsertPart added in v0.6.19

func NewInsertPart(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewInsertPart returns a InsertPart processor.

func NewJMESPath added in v0.9.8

func NewJMESPath(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewJMESPath returns a JMESPath processor.

func NewMergeJSON added in v0.11.4

func NewMergeJSON(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewMergeJSON returns a MergeJSON processor.

func NewNoop added in v0.6.5

func NewNoop(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewNoop returns a Noop processor.

func NewSample added in v0.3.2

func NewSample(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSample returns a Sample processor.

func NewSelectJSON added in v0.9.7

func NewSelectJSON(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSelectJSON returns a SelectJSON processor.

func NewSelectParts added in v0.4.1

func NewSelectParts(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSelectParts returns a SelectParts processor.

func NewSetJSON added in v0.7.2

func NewSetJSON(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSetJSON returns a SetJSON processor.

func NewSplit added in v0.7.6

func NewSplit(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSplit returns a Split processor.

func NewUnarchive added in v0.7.4

func NewUnarchive(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewUnarchive returns a Unarchive processor.

type TypeSpec added in v0.9.1

type TypeSpec struct {
	// contains filtered or unexported fields
}

TypeSpec Constructor and a usage description for each processor type.

type Unarchive added in v0.7.4

type Unarchive struct {
	// contains filtered or unexported fields
}

Unarchive is a processor that can selectively unarchive parts of a message as a chosen archive type.

func (*Unarchive) ProcessMessage added in v0.7.4

func (d *Unarchive) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a message, attempts to unarchive parts of the message, and returns the result.

type UnarchiveConfig added in v0.7.4

type UnarchiveConfig struct {
	Format string `json:"format" yaml:"format"`
	Parts  []int  `json:"parts" yaml:"parts"`
}

UnarchiveConfig contains any configuration for the Unarchive processor.

func NewUnarchiveConfig added in v0.7.4

func NewUnarchiveConfig() UnarchiveConfig

NewUnarchiveConfig returns a UnarchiveConfig with default values.

Directories

Path Synopsis
Package condition contains logical operators that, based on their configuration, return boolean values from messages under certain circumstances.
Package condition contains logical operators that, based on their configuration, return boolean values from messages under certain circumstances.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL