processor

package
v0.9.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 9, 2018 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

Package processor contains logical message processors that can be pipelined within benthos using the pipeline.Processor type.

Index

Constants

This section is empty.

Variables

View Source
var Constructors = map[string]TypeSpec{}

Constructors is a map of all processor types with their specs.

View Source
var (
	ErrEmptyTargetPath = errors.New("target path is empty")
)

Errors for the SetJSON type.

Functions

func Descriptions

func Descriptions() string

Descriptions returns a formatted string of collated descriptions of each type.

func SanitiseConfig

func SanitiseConfig(conf Config) (interface{}, error)

SanitiseConfig returns a sanitised version of the Config, meaning sections that aren't relevant to behaviour are removed.

Types

type Archive

type Archive struct {
	// contains filtered or unexported fields
}

Archive is a processor that can selectively archive parts of a message as a chosen archive type.

func (*Archive) ProcessMessage

func (d *Archive) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a message, attempts to archive the parts of the message, and returns the result as a single part message.

type ArchiveConfig

type ArchiveConfig struct {
	Format string `json:"format" yaml:"format"`
	Path   string `json:"path" yaml:"path"`
}

ArchiveConfig contains any configuration for the Archive processor.

func NewArchiveConfig

func NewArchiveConfig() ArchiveConfig

NewArchiveConfig returns a ArchiveConfig with default values.

type BoundsCheck

type BoundsCheck struct {
	// contains filtered or unexported fields
}

BoundsCheck is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*BoundsCheck) ProcessMessage

func (m *BoundsCheck) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage checks each message against a set of bounds.

type BoundsCheckConfig

type BoundsCheckConfig struct {
	MaxParts    int `json:"max_parts" yaml:"max_parts"`
	MinParts    int `json:"min_parts" yaml:"min_parts"`
	MaxPartSize int `json:"max_part_size" yaml:"max_part_size"`
	MinPartSize int `json:"min_part_size" yaml:"min_part_size"`
}

BoundsCheckConfig contains any bounds configuration for the BoundsCheck processor.

func NewBoundsCheckConfig

func NewBoundsCheckConfig() BoundsCheckConfig

NewBoundsCheckConfig returns a BoundsCheckConfig with default values.

type Combine

type Combine struct {
	// contains filtered or unexported fields
}

Combine is a processor that takes messages with a single part in a benthos multiple part blob format and decodes them into multiple part messages.

func (*Combine) ProcessMessage

func (c *Combine) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a single message and buffers it, drops it, returning a NoAck response, until eventually it has N buffered messages, at which point it combines those messages into one multiple part message which is sent on.

type CombineConfig

type CombineConfig struct {
	Parts int `json:"parts" yaml:"parts"`
}

CombineConfig contains configuration for the Combine processor.

func NewCombineConfig

func NewCombineConfig() CombineConfig

NewCombineConfig returns a CombineConfig with default values.

type Compress

type Compress struct {
	// contains filtered or unexported fields
}

Compress is a processor that can selectively compress parts of a message as a chosen compression algorithm.

func (*Compress) ProcessMessage

func (c *Compress) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a message, attempts to compress parts of the message and returns the result.

type CompressConfig

type CompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Level     int    `json:"level" yaml:"level"`
	Parts     []int  `json:"parts" yaml:"parts"`
}

CompressConfig contains any configuration for the Compress processor.

func NewCompressConfig

func NewCompressConfig() CompressConfig

NewCompressConfig returns a CompressConfig with default values.

type Condition

type Condition struct {
	// contains filtered or unexported fields
}

Condition is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*Condition) ProcessMessage

func (c *Condition) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage checks each message against a set of bounds.

type ConditionConfig

type ConditionConfig struct {
	condition.Config `json:",inline" yaml:",inline"`
}

ConditionConfig contains configuration fields for the Condition processor.

func NewConditionConfig

func NewConditionConfig() ConditionConfig

NewConditionConfig returns a ConditionConfig with default values.

type Config

type Config struct {
	Type        string            `json:"type" yaml:"type"`
	Archive     ArchiveConfig     `json:"archive" yaml:"archive"`
	BoundsCheck BoundsCheckConfig `json:"bounds_check" yaml:"bounds_check"`
	Combine     CombineConfig     `json:"combine" yaml:"combine"`
	Compress    CompressConfig    `json:"compress" yaml:"compress"`
	Condition   ConditionConfig   `json:"condition" yaml:"condition"`
	Decompress  DecompressConfig  `json:"decompress" yaml:"decompress"`
	HashSample  HashSampleConfig  `json:"hash_sample" yaml:"hash_sample"`
	InsertPart  InsertPartConfig  `json:"insert_part" yaml:"insert_part"`
	Sample      SampleConfig      `json:"sample" yaml:"sample"`
	SelectParts SelectPartsConfig `json:"select_parts" yaml:"select_parts"`
	SetJSON     SetJSONConfig     `json:"set_json" yaml:"set_json"`
	Split       struct{}          `json:"split" yaml:"split"`
	Unarchive   UnarchiveConfig   `json:"unarchive" yaml:"unarchive"`
}

Config is the all encompassing configuration struct for all processor types.

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values.

func (*Config) UnmarshalJSON

func (m *Config) UnmarshalJSON(bytes []byte) error

UnmarshalJSON ensures that when parsing configs that are in a slice the default values are still applied.

func (*Config) UnmarshalYAML

func (m *Config) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.

type Decompress

type Decompress struct {
	// contains filtered or unexported fields
}

Decompress is a processor that can selectively decompress parts of a message as a chosen compression algorithm.

func (*Decompress) ProcessMessage

func (d *Decompress) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a message, attempts to decompress parts of the message, and returns the result.

type DecompressConfig

type DecompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Parts     []int  `json:"parts" yaml:"parts"`
}

DecompressConfig contains any configuration for the Decompress processor.

func NewDecompressConfig

func NewDecompressConfig() DecompressConfig

NewDecompressConfig returns a DecompressConfig with default values.

type HashSample

type HashSample struct {
	// contains filtered or unexported fields
}

HashSample is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*HashSample) ProcessMessage

func (s *HashSample) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage checks each message against a set of bounds.

type HashSampleConfig

type HashSampleConfig struct {
	RetainMin float64 `json:"retain_min" yaml:"retain_min"`
	RetainMax float64 `json:"retain_max" yaml:"retain_max"`
	Parts     []int   `json:"parts" yaml:"parts"` // message parts to hash
}

HashSampleConfig contains any configuration for the HashSample processor.

func NewHashSampleConfig

func NewHashSampleConfig() HashSampleConfig

NewHashSampleConfig returns a HashSampleConfig with default values.

type InsertPart

type InsertPart struct {
	// contains filtered or unexported fields
}

InsertPart is a processor that inserts a new message part at a specific index.

func (*InsertPart) ProcessMessage

func (p *InsertPart) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage prepends a new message part to the message.

type InsertPartConfig

type InsertPartConfig struct {
	Index   int    `json:"index" yaml:"index"`
	Content string `json:"content" yaml:"content"`
}

InsertPartConfig contains any configuration for the InsertPart processor.

func NewInsertPartConfig

func NewInsertPartConfig() InsertPartConfig

NewInsertPartConfig returns a InsertPartConfig with default values.

type Noop

type Noop struct {
}

Noop is a no-op processor that does nothing.

func (*Noop) ProcessMessage

func (c *Noop) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage does nothing and returns the message unchanged.

type Sample

type Sample struct {
	// contains filtered or unexported fields
}

Sample is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*Sample) ProcessMessage

func (s *Sample) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage checks each message against a set of bounds.

type SampleConfig

type SampleConfig struct {
	Retain     float64 `json:"retain" yaml:"retain"`
	RandomSeed int64   `json:"seed" yaml:"seed"`
}

SampleConfig contains any configuration for the Sample processor.

func NewSampleConfig

func NewSampleConfig() SampleConfig

NewSampleConfig returns a SampleConfig with default values.

type SelectParts

type SelectParts struct {
	// contains filtered or unexported fields
}

SelectParts is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*SelectParts) ProcessMessage

func (m *SelectParts) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage extracts a set of parts from each message.

type SelectPartsConfig

type SelectPartsConfig struct {
	Parts []int `json:"parts" yaml:"parts"`
}

SelectPartsConfig contains any configuration for the SelectParts processor.

func NewSelectPartsConfig

func NewSelectPartsConfig() SelectPartsConfig

NewSelectPartsConfig returns a SelectPartsConfig with default values.

type SetJSON

type SetJSON struct {
	// contains filtered or unexported fields
}

SetJSON is a processor that inserts a new message part at a specific index.

func (*SetJSON) ProcessMessage

func (p *SetJSON) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage prepends a new message part to the message.

type SetJSONConfig

type SetJSONConfig struct {
	Part  int          `json:"part" yaml:"part"`
	Path  string       `json:"path" yaml:"path"`
	Value rawJSONValue `json:"value" yaml:"value"`
}

SetJSONConfig contains any configuration for the SetJSON processor.

func NewSetJSONConfig

func NewSetJSONConfig() SetJSONConfig

NewSetJSONConfig returns a SetJSONConfig with default values.

type Split

type Split struct {
	// contains filtered or unexported fields
}

Split is a processor that splits messages into a message per part.

func (*Split) ProcessMessage

func (s *Split) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a single message and returns a slice of messages, containing a message per part.

type Type

type Type interface {
	// ProcessMessage attempts to process a message. Since processing can fail
	// this call returns both a slice of messages in case of success or a
	// response in case of failure. If the slice of messages is empty the
	// response should be returned to the source.
	ProcessMessage(msg types.Message) ([]types.Message, types.Response)
}

Type reads a message, performs a processing operation, and returns either a slice of messages resulting from the process to be propagated through the, pipeline, or a response that should be sent back to the source instead.

func New

func New(conf Config, log log.Modular, stats metrics.Type) (Type, error)

New creates a processor type based on a processor configuration.

func NewArchive

func NewArchive(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewArchive returns a Archive processor.

func NewBoundsCheck

func NewBoundsCheck(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewBoundsCheck returns a BoundsCheck processor.

func NewCombine

func NewCombine(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewCombine returns a Combine processor.

func NewCompress

func NewCompress(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewCompress returns a Compress processor.

func NewCondition

func NewCondition(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewCondition returns a Condition processor.

func NewDecompress

func NewDecompress(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewDecompress returns a Decompress processor.

func NewHashSample

func NewHashSample(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewHashSample returns a HashSample processor.

func NewInsertPart

func NewInsertPart(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewInsertPart returns a InsertPart processor.

func NewNoop

func NewNoop(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewNoop returns a Noop processor.

func NewSample

func NewSample(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewSample returns a Sample processor.

func NewSelectParts

func NewSelectParts(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewSelectParts returns a SelectParts processor.

func NewSetJSON

func NewSetJSON(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewSetJSON returns a SetJSON processor.

func NewSplit

func NewSplit(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewSplit returns a Split processor.

func NewUnarchive

func NewUnarchive(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewUnarchive returns a Unarchive processor.

type TypeSpec

type TypeSpec struct {
	// contains filtered or unexported fields
}

TypeSpec Constructor and a usage description for each processor type.

type Unarchive

type Unarchive struct {
	// contains filtered or unexported fields
}

Unarchive is a processor that can selectively unarchive parts of a message as a chosen archive type.

func (*Unarchive) ProcessMessage

func (d *Unarchive) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage takes a message, attempts to unarchive parts of the message, and returns the result.

type UnarchiveConfig

type UnarchiveConfig struct {
	Format string `json:"format" yaml:"format"`
	Parts  []int  `json:"parts" yaml:"parts"`
}

UnarchiveConfig contains any configuration for the Unarchive processor.

func NewUnarchiveConfig

func NewUnarchiveConfig() UnarchiveConfig

NewUnarchiveConfig returns a UnarchiveConfig with default values.

Directories

Path Synopsis
Package condition contains logical operators that, based on their configuration, return boolean values from messages under certain circumstances.
Package condition contains logical operators that, based on their configuration, return boolean values from messages under certain circumstances.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL