processor

package
v0.8.4
Warning

This package is not in the latest version of its module.

Published: Feb 22, 2018 License: MIT Imports: 19 Imported by: 11

Documentation

Overview

Package processor contains logical message processors that can be pipelined within benthos using the pipeline.Processor type.

Constants

This section is empty.

Variables

var (
	ErrEmptyTargetPath = errors.New("target path is empty")
)

Errors for the SetJSON type.

Functions

func Descriptions

func Descriptions() string

Descriptions returns a formatted string of collated descriptions of each type.

func SanitiseConfig added in v0.8.4

func SanitiseConfig(conf Config) (interface{}, error)

SanitiseConfig returns a sanitised version of the Config, meaning sections that aren't relevant to behaviour are removed.
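
As a rough illustration of what "sanitised" could mean here, the sketch below keeps only the `type` field and the one section named by it. It works on raw JSON rather than on Config, and the helper name `sanitise` is hypothetical; the package's real implementation may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sanitise is a hypothetical helper, not the package's implementation. It
// keeps the "type" field plus the single section whose key matches that type.
func sanitise(raw []byte) (map[string]interface{}, error) {
	var full map[string]interface{}
	if err := json.Unmarshal(raw, &full); err != nil {
		return nil, err
	}
	typ, ok := full["type"].(string)
	if !ok {
		return nil, fmt.Errorf("config has no string \"type\" field")
	}
	out := map[string]interface{}{"type": typ}
	if section, exists := full[typ]; exists {
		out[typ] = section
	}
	return out, nil
}

func main() {
	raw := []byte(`{"type":"combine","combine":{"parts":2},"sample":{"retain":0.1,"seed":0}}`)
	clean, err := sanitise(raw)
	if err != nil {
		panic(err)
	}
	b, err := json.Marshal(clean)
	if err != nil {
		panic(err)
	}
	// The "sample" section is gone because the type is "combine".
	fmt.Println(string(b))
}
```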

Types

type Archive added in v0.7.7

type Archive struct {
	// contains filtered or unexported fields
}

Archive is a processor that can selectively archive parts of a message as a chosen archive type.

func (*Archive) ProcessMessage added in v0.7.7

func (d *Archive) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage takes a message, attempts to archive the parts of the message, and returns the result as a single part message.

type ArchiveConfig added in v0.7.7

type ArchiveConfig struct {
	Format string `json:"format" yaml:"format"`
	Path   string `json:"path" yaml:"path"`
}

ArchiveConfig contains any configuration for the Archive processor.

func NewArchiveConfig added in v0.7.7

func NewArchiveConfig() ArchiveConfig

NewArchiveConfig returns an ArchiveConfig with default values.

type BoundsCheck

type BoundsCheck struct {
	// contains filtered or unexported fields
}

BoundsCheck is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*BoundsCheck) ProcessMessage

func (m *BoundsCheck) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage checks each message against a set of bounds.

type BoundsCheckConfig

type BoundsCheckConfig struct {
	MaxParts    int `json:"max_parts" yaml:"max_parts"`
	MinParts    int `json:"min_parts" yaml:"min_parts"`
	MaxPartSize int `json:"max_part_size" yaml:"max_part_size"`
}

BoundsCheckConfig contains any bounds configuration for the BoundsCheck processor.
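
The three fields suggest a check along the following lines. This is a minimal sketch that represents a message as a plain `[][]byte` and assumes inclusive limits; `bounds` and `withinBounds` are hypothetical names, not part of the package.

```go
package main

import "fmt"

// bounds mirrors the three BoundsCheckConfig fields (hypothetical stand-in).
type bounds struct {
	MaxParts, MinParts, MaxPartSize int
}

// withinBounds reports whether the part count lies in [MinParts, MaxParts]
// and no single part is larger than MaxPartSize bytes. Treating the limits
// as inclusive is an assumption.
func withinBounds(parts [][]byte, b bounds) bool {
	if len(parts) < b.MinParts || len(parts) > b.MaxParts {
		return false
	}
	for _, p := range parts {
		if len(p) > b.MaxPartSize {
			return false
		}
	}
	return true
}

func main() {
	b := bounds{MaxParts: 3, MinParts: 1, MaxPartSize: 8}
	fmt.Println(withinBounds([][]byte{[]byte("hello")}, b))
	fmt.Println(withinBounds([][]byte{[]byte("far too long")}, b))
}
```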

func NewBoundsCheckConfig

func NewBoundsCheckConfig() BoundsCheckConfig

NewBoundsCheckConfig returns a BoundsCheckConfig with default values.

type Combine added in v0.3.1

type Combine struct {
	// contains filtered or unexported fields
}

Combine is a processor that buffers single part messages until it holds a configured number of them, then combines them into one multiple part message.

func (*Combine) ProcessMessage added in v0.3.1

func (c *Combine) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage takes a single message, buffers it, and drops it by returning a NoAck response. Once it has N buffered messages it combines them into one multiple part message, which is sent on.
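
The buffering behaviour can be sketched without the benthos types. Here each incoming message is reduced to a single `[]byte` part, and `combiner` and `add` are hypothetical names; the NoAck response is represented only by the `false` return value.

```go
package main

import "fmt"

// combiner buffers parts until it holds n of them (hypothetical stand-in
// for the Combine processor's internal state).
type combiner struct {
	n      int
	buffer [][]byte
}

// add buffers one part. It returns (nil, false) while fewer than n parts are
// held, and the combined parts plus true once the buffer is full.
func (c *combiner) add(part []byte) ([][]byte, bool) {
	c.buffer = append(c.buffer, part)
	if len(c.buffer) < c.n {
		return nil, false
	}
	out := c.buffer
	c.buffer = nil // start a fresh batch
	return out, true
}

func main() {
	c := &combiner{n: 2}
	for _, s := range []string{"a", "b", "c"} {
		if parts, ok := c.add([]byte(s)); ok {
			fmt.Printf("combined %d parts\n", len(parts))
		} else {
			fmt.Println("buffered")
		}
	}
}
```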

type CombineConfig added in v0.3.1

type CombineConfig struct {
	Parts int `json:"parts" yaml:"parts"`
}

CombineConfig contains configuration for the Combine processor.

func NewCombineConfig added in v0.3.1

func NewCombineConfig() CombineConfig

NewCombineConfig returns a CombineConfig with default values.

type Compress added in v0.7.7

type Compress struct {
	// contains filtered or unexported fields
}

Compress is a processor that can selectively compress parts of a message using a chosen compression algorithm.

func (*Compress) ProcessMessage added in v0.7.7

func (c *Compress) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage takes a message, attempts to compress parts of the message and returns the result.

type CompressConfig added in v0.7.7

type CompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Level     int    `json:"level" yaml:"level"`
	Parts     []int  `json:"parts" yaml:"parts"`
}

CompressConfig contains any configuration for the Compress processor.
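
To show how Algorithm, Level and Parts might interact, the sketch below gzips only the listed part indexes using the standard library. It assumes gzip as the algorithm and treats out-of-range indexes as errors; `gzipParts` is a hypothetical helper and the processor itself may behave differently.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// gzipParts returns a copy of parts in which every index listed in indexes
// has been gzip-compressed at the given level. Other parts are left as-is.
func gzipParts(parts [][]byte, level int, indexes []int) ([][]byte, error) {
	out := make([][]byte, len(parts))
	copy(out, parts)
	for _, i := range indexes {
		if i < 0 || i >= len(parts) {
			return nil, fmt.Errorf("part index %d out of range", i)
		}
		var buf bytes.Buffer
		w, err := gzip.NewWriterLevel(&buf, level)
		if err != nil {
			return nil, err
		}
		if _, err := w.Write(parts[i]); err != nil {
			return nil, err
		}
		if err := w.Close(); err != nil {
			return nil, err
		}
		out[i] = buf.Bytes()
	}
	return out, nil
}

func main() {
	parts := [][]byte{[]byte("header"), bytes.Repeat([]byte("x"), 1000)}
	out, err := gzipParts(parts, gzip.DefaultCompression, []int{1})
	if err != nil {
		panic(err)
	}
	fmt.Println(len(out[0]), len(out[1]) < len(parts[1]))
}
```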

func NewCompressConfig added in v0.7.7

func NewCompressConfig() CompressConfig

NewCompressConfig returns a CompressConfig with default values.

type Config

type Config struct {
	Type        string            `json:"type" yaml:"type"`
	Archive     ArchiveConfig     `json:"archive" yaml:"archive"`
	BoundsCheck BoundsCheckConfig `json:"bounds_check" yaml:"bounds_check"`
	Combine     CombineConfig     `json:"combine" yaml:"combine"`
	Compress    CompressConfig    `json:"compress" yaml:"compress"`
	Decompress  DecompressConfig  `json:"decompress" yaml:"decompress"`
	HashSample  HashSampleConfig  `json:"hash_sample" yaml:"hash_sample"`
	InsertPart  InsertPartConfig  `json:"insert_part" yaml:"insert_part"`
	Sample      SampleConfig      `json:"sample" yaml:"sample"`
	SelectParts SelectPartsConfig `json:"select_parts" yaml:"select_parts"`
	SetJSON     SetJSONConfig     `json:"set_json" yaml:"set_json"`
	Split       struct{}          `json:"split" yaml:"split"`
	Unarchive   UnarchiveConfig   `json:"unarchive" yaml:"unarchive"`
}

Config is the all encompassing configuration struct for all processor types.

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values.

func (*Config) UnmarshalJSON

func (m *Config) UnmarshalJSON(bytes []byte) error

UnmarshalJSON ensures that when parsing configs that are in a slice the default values are still applied.
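
A common way to get this behaviour in Go is to unmarshal into a locally defined alias type that has been pre-populated with defaults. The sketch below uses a small hypothetical `conf` struct with made-up default values rather than the real Config; it illustrates the pattern and is not a copy of the package's code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// conf is a hypothetical two-field stand-in for Config.
type conf struct {
	Type  string `json:"type"`
	Parts int    `json:"parts"`
}

// newConf returns illustrative defaults (not the package's real defaults).
func newConf() conf { return conf{Type: "noop", Parts: 2} }

// UnmarshalJSON starts from the defaults so that fields missing from the
// input keep their default values, even for elements of a slice.
func (c *conf) UnmarshalJSON(b []byte) error {
	type confAlias conf // the alias has no UnmarshalJSON, so no recursion
	aliased := confAlias(newConf())
	if err := json.Unmarshal(b, &aliased); err != nil {
		return err
	}
	*c = conf(aliased)
	return nil
}

// parseConfs decodes a JSON array of configs.
func parseConfs(raw string) ([]conf, error) {
	var confs []conf
	err := json.Unmarshal([]byte(raw), &confs)
	return confs, err
}

func main() {
	confs, err := parseConfs(`[{"type":"combine"},{}]`)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", confs)
}
```

Without the custom method, the second element of the slice would decode to a zero-valued struct instead of the defaults.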

func (*Config) UnmarshalYAML

func (m *Config) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.

type Decompress added in v0.7.4

type Decompress struct {
	// contains filtered or unexported fields
}

Decompress is a processor that can selectively decompress parts of a message using a chosen compression algorithm.

func (*Decompress) ProcessMessage added in v0.7.4

func (d *Decompress) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage takes a message, attempts to decompress parts of the message, and returns the result.

type DecompressConfig added in v0.7.4

type DecompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Parts     []int  `json:"parts" yaml:"parts"`
}

DecompressConfig contains any configuration for the Decompress processor.

func NewDecompressConfig added in v0.7.4

func NewDecompressConfig() DecompressConfig

NewDecompressConfig returns a DecompressConfig with default values.

type HashSample added in v0.6.11

type HashSample struct {
	// contains filtered or unexported fields
}

HashSample is a processor that hashes selected parts of each message and retains the message only if the hash falls within a configured range, rejecting all others.

func (*HashSample) ProcessMessage added in v0.6.11

func (s *HashSample) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage hashes the configured parts of each message and checks the result against the retain bounds.

type HashSampleConfig added in v0.6.11

type HashSampleConfig struct {
	RetainMin float64 `json:"retain_min" yaml:"retain_min"`
	RetainMax float64 `json:"retain_max" yaml:"retain_max"`
	Parts     []int   `json:"parts" yaml:"parts"` // message parts to hash
}

HashSampleConfig contains any configuration for the HashSample processor.
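
One way to read these fields: hash the selected parts, scale the hash to a percentage, and keep the message when that value lies between RetainMin and RetainMax. The sketch below follows that reading. It uses FNV-1a from the standard library as a stand-in hash, assumes the bounds are percentages with an exclusive upper bound, and `retain` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math"
)

// retain hashes the selected parts and reports whether the hash, scaled to
// the range 0..100, falls within [lo, hi). Out-of-range indexes are skipped.
func retain(parts [][]byte, indexes []int, lo, hi float64) bool {
	h := fnv.New64a()
	for _, i := range indexes {
		if i >= 0 && i < len(parts) {
			h.Write(parts[i]) // Write on a hash.Hash never returns an error
		}
	}
	rate := float64(h.Sum64()) / float64(math.MaxUint64) * 100
	return rate >= lo && rate < hi
}

func main() {
	msg := [][]byte{[]byte("user-1234"), []byte("payload")}
	// The same input always gives the same decision, unlike random sampling.
	fmt.Println(retain(msg, []int{0}, 0, 50) == retain(msg, []int{0}, 0, 50))
}
```

Because the decision depends only on the hashed content, separate instances given the same bounds would retain the same messages.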

func NewHashSampleConfig added in v0.6.11

func NewHashSampleConfig() HashSampleConfig

NewHashSampleConfig returns a HashSampleConfig with default values.

type InsertPart added in v0.6.19

type InsertPart struct {
	// contains filtered or unexported fields
}

InsertPart is a processor that inserts a new message part at a specific index.

func (*InsertPart) ProcessMessage added in v0.6.19

func (p *InsertPart) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage inserts a new message part into the message at the configured index.

type InsertPartConfig added in v0.6.19

type InsertPartConfig struct {
	Index   int    `json:"index" yaml:"index"`
	Content string `json:"content" yaml:"content"`
}

InsertPartConfig contains any configuration for the InsertPart processor.
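
Inserting at an index can be sketched on a plain `[][]byte`. The helper `insertPart` is hypothetical, and clamping out-of-range indexes to the ends of the message is an assumption rather than documented behaviour.

```go
package main

import "fmt"

// insertPart returns a new slice with content placed at index. Indexes below
// zero are clamped to the front and indexes past the end to the back.
func insertPart(parts [][]byte, index int, content []byte) [][]byte {
	if index < 0 {
		index = 0
	}
	if index > len(parts) {
		index = len(parts)
	}
	out := make([][]byte, 0, len(parts)+1)
	out = append(out, parts[:index]...)
	out = append(out, content)
	out = append(out, parts[index:]...)
	return out
}

func main() {
	parts := [][]byte{[]byte("a"), []byte("c")}
	for _, p := range insertPart(parts, 1, []byte("b")) {
		fmt.Print(string(p), " ")
	}
	fmt.Println()
}
```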

func NewInsertPartConfig added in v0.6.19

func NewInsertPartConfig() InsertPartConfig

NewInsertPartConfig returns an InsertPartConfig with default values.

type Noop added in v0.6.5

type Noop struct {
}

Noop is a no-op processor that does nothing.

func (*Noop) ProcessMessage added in v0.6.5

func (c *Noop) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage does nothing and returns the message unchanged.

type Sample added in v0.3.2

type Sample struct {
	// contains filtered or unexported fields
}

Sample is a processor that passes on a randomly sampled proportion of messages and rejects the rest.

func (*Sample) ProcessMessage added in v0.3.2

func (s *Sample) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage decides for each message whether it is retained or dropped by the sampler.

type SampleConfig added in v0.3.2

type SampleConfig struct {
	Retain     float64 `json:"retain"`
	RandomSeed int64   `json:"seed"`
}

SampleConfig contains any configuration for the Sample processor.
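
The pairing of a retain rate with a seed suggests seeded pseudo-random sampling. A minimal sketch, assuming Retain is a percentage between 0 and 100 and using `math/rand`; the `sampler` type is hypothetical:

```go
package main

import (
	"fmt"
	"math/rand"
)

// sampler keeps roughly retain percent of the messages it is asked about.
type sampler struct {
	retain float64
	gen    *rand.Rand
}

// newSampler seeds its own generator so that a fixed seed gives a
// reproducible sequence of decisions.
func newSampler(retain float64, seed int64) *sampler {
	return &sampler{retain: retain, gen: rand.New(rand.NewSource(seed))}
}

// keep reports whether the next message should be retained.
func (s *sampler) keep() bool {
	return s.gen.Float64()*100 < s.retain
}

func main() {
	s := newSampler(10, 0)
	kept := 0
	for i := 0; i < 1000; i++ {
		if s.keep() {
			kept++
		}
	}
	fmt.Println("kept", kept, "of 1000")
}
```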

func NewSampleConfig added in v0.3.2

func NewSampleConfig() SampleConfig

NewSampleConfig returns a SampleConfig with default values.

type SelectParts added in v0.4.1

type SelectParts struct {
	// contains filtered or unexported fields
}

SelectParts is a processor that extracts a chosen set of parts from each message by index and discards the remaining parts.

func (*SelectParts) ProcessMessage added in v0.4.1

func (m *SelectParts) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage extracts a set of parts from each message.

type SelectPartsConfig added in v0.4.1

type SelectPartsConfig struct {
	Parts []int `json:"parts" yaml:"parts"`
}

SelectPartsConfig contains any configuration for the SelectParts processor.
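
Selecting by index might look like the sketch below, where a message is a plain `[][]byte`. The helper `selectParts` is hypothetical, and silently skipping out-of-range indexes is an assumption.

```go
package main

import "fmt"

// selectParts returns the parts named by indexes, in the order listed.
// Indexes outside the message are skipped.
func selectParts(parts [][]byte, indexes []int) [][]byte {
	out := make([][]byte, 0, len(indexes))
	for _, i := range indexes {
		if i >= 0 && i < len(parts) {
			out = append(out, parts[i])
		}
	}
	return out
}

func main() {
	parts := [][]byte{[]byte("a"), []byte("b"), []byte("c")}
	for _, p := range selectParts(parts, []int{2, 0}) {
		fmt.Print(string(p), " ")
	}
	fmt.Println()
}
```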

func NewSelectPartsConfig added in v0.4.1

func NewSelectPartsConfig() SelectPartsConfig

NewSelectPartsConfig returns a SelectPartsConfig with default values.

type SetJSON added in v0.7.2

type SetJSON struct {
	// contains filtered or unexported fields
}

SetJSON is a processor that parses a message part as JSON and sets a value at a target path within it.

func (*SetJSON) ProcessMessage added in v0.7.2

func (p *SetJSON) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage sets the configured value at the target path of the configured message part.

type SetJSONConfig added in v0.7.2

type SetJSONConfig struct {
	Part  int          `json:"part" yaml:"part"`
	Path  string       `json:"path" yaml:"path"`
	Value rawJSONValue `json:"value" yaml:"value"`
}

SetJSONConfig contains any configuration for the SetJSON processor.
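
As an illustration of Path and Value, the sketch below sets a value inside a JSON object using only `encoding/json`. The dot-separated path syntax is an assumption, `setJSON` is a hypothetical helper, and the local error simply mirrors the message of ErrEmptyTargetPath.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// errEmptyTargetPath mirrors the package's ErrEmptyTargetPath message.
var errEmptyTargetPath = errors.New("target path is empty")

// setJSON parses doc as a JSON object, sets value at the dot-separated path
// (creating intermediate objects as needed) and returns the new document.
func setJSON(doc []byte, path string, value interface{}) ([]byte, error) {
	if path == "" {
		return nil, errEmptyTargetPath
	}
	var root map[string]interface{}
	if err := json.Unmarshal(doc, &root); err != nil {
		return nil, err
	}
	if root == nil { // the document was the literal null
		root = map[string]interface{}{}
	}
	keys := strings.Split(path, ".")
	cur := root
	for _, k := range keys[:len(keys)-1] {
		next, ok := cur[k].(map[string]interface{})
		if !ok {
			next = map[string]interface{}{}
			cur[k] = next
		}
		cur = next
	}
	cur[keys[len(keys)-1]] = value
	return json.Marshal(root)
}

func main() {
	out, err := setJSON([]byte(`{"a":{"b":1}}`), "a.c", "x")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```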

func NewSetJSONConfig added in v0.7.2

func NewSetJSONConfig() SetJSONConfig

NewSetJSONConfig returns a SetJSONConfig with default values.

type Split added in v0.7.6

type Split struct {
	// contains filtered or unexported fields
}

Split is a processor that splits messages into a message per part.

func (*Split) ProcessMessage added in v0.7.6

func (s *Split) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage takes a single message and returns a slice of messages, containing a message per part.
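
In terms of plain slices the operation is small. The `message` type and `split` function here are hypothetical stand-ins for the benthos types:

```go
package main

import "fmt"

// message is a hypothetical stand-in for a multiple part message.
type message [][]byte

// split returns one single part message per part of msg, preserving order.
func split(msg message) []message {
	out := make([]message, 0, len(msg))
	for _, part := range msg {
		out = append(out, message{part})
	}
	return out
}

func main() {
	msgs := split(message{[]byte("a"), []byte("b")})
	fmt.Println(len(msgs), len(msgs[0]))
}
```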

type Type

type Type interface {
	// ProcessMessage attempts to process a message. Since processing can fail
	// this call returns either a slice of messages in case of success or a
	// response in case of failure. If the slice of messages is empty the
	// response should be returned to the source.
	ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)
}

Type reads a message, performs a processing operation, and returns either a slice of messages resulting from the process to be propagated through the pipeline, or a response that should be sent back to the source instead.
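
The contract can be sketched with local stand-ins for the benthos types. Everything below (`message`, `response`, `processor`, `upper`, `rejectEmpty`, `run`) is hypothetical and only mirrors the shape of the interface. As a simplification, `run` stops at the first processor call that yields no messages and returns its response.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

type message struct{ Parts [][]byte } // stand-in for types.Message
type response struct{ Err error }     // stand-in for types.Response

// processor mirrors the shape of the Type interface.
type processor interface {
	ProcessMessage(msg *message) ([]*message, *response)
}

// upper upper-cases every part and always succeeds.
type upper struct{}

func (upper) ProcessMessage(msg *message) ([]*message, *response) {
	out := &message{}
	for _, p := range msg.Parts {
		out.Parts = append(out.Parts, bytes.ToUpper(p))
	}
	return []*message{out}, nil
}

// rejectEmpty drops messages without parts and explains why in the response.
type rejectEmpty struct{}

func (rejectEmpty) ProcessMessage(msg *message) ([]*message, *response) {
	if len(msg.Parts) == 0 {
		return nil, &response{Err: errors.New("message has no parts")}
	}
	return []*message{msg}, nil
}

// run feeds msg through procs in order. An empty result from any processor
// ends the run, and that processor's response goes back to the caller.
func run(procs []processor, msg *message) ([]*message, *response) {
	msgs := []*message{msg}
	for _, p := range procs {
		var next []*message
		for _, m := range msgs {
			out, res := p.ProcessMessage(m)
			if len(out) == 0 {
				return nil, res
			}
			next = append(next, out...)
		}
		msgs = next
	}
	return msgs, nil
}

func main() {
	procs := []processor{rejectEmpty{}, upper{}}
	out, _ := run(procs, &message{Parts: [][]byte{[]byte("hello")}})
	fmt.Println(string(out[0].Parts[0]))
	_, res := run(procs, &message{})
	fmt.Println(res.Err)
}
```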

func New

func New(conf Config, log log.Modular, stats metrics.Type) (Type, error)

New creates a processor type based on a processor configuration.

func NewArchive added in v0.7.7

func NewArchive(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewArchive returns an Archive processor.

func NewBoundsCheck

func NewBoundsCheck(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewBoundsCheck returns a BoundsCheck processor.

func NewCombine added in v0.3.1

func NewCombine(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewCombine returns a Combine processor.

func NewCompress added in v0.7.7

func NewCompress(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewCompress returns a Compress processor.

func NewDecompress added in v0.7.4

func NewDecompress(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewDecompress returns a Decompress processor.

func NewHashSample added in v0.6.11

func NewHashSample(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewHashSample returns a HashSample processor.

func NewInsertPart added in v0.6.19

func NewInsertPart(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewInsertPart returns an InsertPart processor.

func NewNoop added in v0.6.5

func NewNoop(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewNoop returns a Noop processor.

func NewSample added in v0.3.2

func NewSample(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewSample returns a Sample processor.

func NewSelectParts added in v0.4.1

func NewSelectParts(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewSelectParts returns a SelectParts processor.

func NewSetJSON added in v0.7.2

func NewSetJSON(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewSetJSON returns a SetJSON processor.

func NewSplit added in v0.7.6

func NewSplit(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewSplit returns a Split processor.

func NewUnarchive added in v0.7.4

func NewUnarchive(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewUnarchive returns an Unarchive processor.

type Unarchive added in v0.7.4

type Unarchive struct {
	// contains filtered or unexported fields
}

Unarchive is a processor that can selectively unarchive parts of a message as a chosen archive type.

func (*Unarchive) ProcessMessage added in v0.7.4

func (d *Unarchive) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage takes a message, attempts to unarchive parts of the message, and returns the result.

type UnarchiveConfig added in v0.7.4

type UnarchiveConfig struct {
	Format string `json:"format" yaml:"format"`
	Parts  []int  `json:"parts" yaml:"parts"`
}

UnarchiveConfig contains any configuration for the Unarchive processor.

func NewUnarchiveConfig added in v0.7.4

func NewUnarchiveConfig() UnarchiveConfig

NewUnarchiveConfig returns an UnarchiveConfig with default values.
