processor

package
v0.8.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2018 License: MIT Imports: 19 Imported by: 0

Documentation

Overview

Package processor contains logical message processors that can be pipelined within benthos using the pipeline.Processor type.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrEmptyTargetPath = errors.New("target path is empty")
)

Errors for the SetJSON type.

Functions

func Descriptions

func Descriptions() string

Descriptions returns a formatted string of collated descriptions of each type.

func SanitiseConfig

func SanitiseConfig(conf Config) (interface{}, error)

SanitiseConfig returns a sanitised version of the Config, meaning sections that aren't relevant to behaviour are removed.

Types

type Archive

type Archive struct {
	// contains filtered or unexported fields
}

Archive is a processor that can selectively archive parts of a message as a chosen archive type.

func (*Archive) ProcessMessage

func (d *Archive) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage takes a message, attempts to archive the parts of the message, and returns the result as a single part message.

type ArchiveConfig

type ArchiveConfig struct {
	Format string `json:"format" yaml:"format"`
	Path   string `json:"path" yaml:"path"`
}

ArchiveConfig contains any configuration for the Archive processor.

func NewArchiveConfig

func NewArchiveConfig() ArchiveConfig

NewArchiveConfig returns a ArchiveConfig with default values.

type BoundsCheck

type BoundsCheck struct {
	// contains filtered or unexported fields
}

BoundsCheck is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*BoundsCheck) ProcessMessage

func (m *BoundsCheck) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage checks each message against a set of bounds.

type BoundsCheckConfig

type BoundsCheckConfig struct {
	MaxParts    int `json:"max_parts" yaml:"max_parts"`
	MinParts    int `json:"min_parts" yaml:"min_parts"`
	MaxPartSize int `json:"max_part_size" yaml:"max_part_size"`
}

BoundsCheckConfig contains any bounds configuration for the BoundsCheck processor.

func NewBoundsCheckConfig

func NewBoundsCheckConfig() BoundsCheckConfig

NewBoundsCheckConfig returns a BoundsCheckConfig with default values.

type Combine

type Combine struct {
	// contains filtered or unexported fields
}

Combine is a processor that takes messages with a single part in a benthos multiple part blob format and decodes them into multiple part messages.

func (*Combine) ProcessMessage

func (c *Combine) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage takes a single message and buffers it, drops it, returning a NoAck response, until eventually it has N buffered messages, at which point it combines those messages into one multiple part message which is sent on.

type CombineConfig

type CombineConfig struct {
	Parts int `json:"parts" yaml:"parts"`
}

CombineConfig contains configuration for the Combine processor.

func NewCombineConfig

func NewCombineConfig() CombineConfig

NewCombineConfig returns a CombineConfig with default values.

type Compress

type Compress struct {
	// contains filtered or unexported fields
}

Compress is a processor that can selectively compress parts of a message as a chosen compression algorithm.

func (*Compress) ProcessMessage

func (c *Compress) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage takes a message, attempts to compress parts of the message and returns the result.

type CompressConfig

type CompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Level     int    `json:"level" yaml:"level"`
	Parts     []int  `json:"parts" yaml:"parts"`
}

CompressConfig contains any configuration for the Compress processor.

func NewCompressConfig

func NewCompressConfig() CompressConfig

NewCompressConfig returns a CompressConfig with default values.

type Config

type Config struct {
	Type        string            `json:"type" yaml:"type"`
	Archive     ArchiveConfig     `json:"archive" yaml:"archive"`
	BoundsCheck BoundsCheckConfig `json:"bounds_check" yaml:"bounds_check"`
	Combine     CombineConfig     `json:"combine" yaml:"combine"`
	Compress    CompressConfig    `json:"compress" yaml:"compress"`
	Decompress  DecompressConfig  `json:"decompress" yaml:"decompress"`
	HashSample  HashSampleConfig  `json:"hash_sample" yaml:"hash_sample"`
	InsertPart  InsertPartConfig  `json:"insert_part" yaml:"insert_part"`
	Sample      SampleConfig      `json:"sample" yaml:"sample"`
	SelectParts SelectPartsConfig `json:"select_parts" yaml:"select_parts"`
	SetJSON     SetJSONConfig     `json:"set_json" yaml:"set_json"`
	Split       struct{}          `json:"split" yaml:"split"`
	Unarchive   UnarchiveConfig   `json:"unarchive" yaml:"unarchive"`
}

Config is the all encompassing configuration struct for all processor types.

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values.

func (*Config) UnmarshalJSON

func (m *Config) UnmarshalJSON(bytes []byte) error

UnmarshalJSON ensures that when parsing configs that are in a slice the default values are still applied.

func (*Config) UnmarshalYAML

func (m *Config) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.

type Decompress

type Decompress struct {
	// contains filtered or unexported fields
}

Decompress is a processor that can selectively decompress parts of a message as a chosen compression algorithm.

func (*Decompress) ProcessMessage

func (d *Decompress) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage takes a message, attempts to decompress parts of the message, and returns the result.

type DecompressConfig

type DecompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Parts     []int  `json:"parts" yaml:"parts"`
}

DecompressConfig contains any configuration for the Decompress processor.

func NewDecompressConfig

func NewDecompressConfig() DecompressConfig

NewDecompressConfig returns a DecompressConfig with default values.

type HashSample

type HashSample struct {
	// contains filtered or unexported fields
}

HashSample is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*HashSample) ProcessMessage

func (s *HashSample) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage checks each message against a set of bounds.

type HashSampleConfig

type HashSampleConfig struct {
	RetainMin float64 `json:"retain_min" yaml:"retain_min"`
	RetainMax float64 `json:"retain_max" yaml:"retain_max"`
	Parts     []int   `json:"parts" yaml:"parts"` // message parts to hash
}

HashSampleConfig contains any configuration for the HashSample processor.

func NewHashSampleConfig

func NewHashSampleConfig() HashSampleConfig

NewHashSampleConfig returns a HashSampleConfig with default values.

type InsertPart

type InsertPart struct {
	// contains filtered or unexported fields
}

InsertPart is a processor that inserts a new message part at a specific index.

func (*InsertPart) ProcessMessage

func (p *InsertPart) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage prepends a new message part to the message.

type InsertPartConfig

type InsertPartConfig struct {
	Index   int    `json:"index" yaml:"index"`
	Content string `json:"content" yaml:"content"`
}

InsertPartConfig contains any configuration for the InsertPart processor.

func NewInsertPartConfig

func NewInsertPartConfig() InsertPartConfig

NewInsertPartConfig returns a InsertPartConfig with default values.

type Noop

type Noop struct {
}

Noop is a no-op processor that does nothing.

func (*Noop) ProcessMessage

func (c *Noop) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage does nothing and returns the message unchanged.

type Sample

type Sample struct {
	// contains filtered or unexported fields
}

Sample is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*Sample) ProcessMessage

func (s *Sample) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage checks each message against a set of bounds.

type SampleConfig

type SampleConfig struct {
	Retain     float64 `json:"retain" yaml:"retain"`
	RandomSeed int64   `json:"seed" yaml:"seed"`
}

SampleConfig contains any configuration for the Sample processor.

func NewSampleConfig

func NewSampleConfig() SampleConfig

NewSampleConfig returns a SampleConfig with default values.

type SelectParts

type SelectParts struct {
	// contains filtered or unexported fields
}

SelectParts is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*SelectParts) ProcessMessage

func (m *SelectParts) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage extracts a set of parts from each message.

type SelectPartsConfig

type SelectPartsConfig struct {
	Parts []int `json:"parts" yaml:"parts"`
}

SelectPartsConfig contains any configuration for the SelectParts processor.

func NewSelectPartsConfig

func NewSelectPartsConfig() SelectPartsConfig

NewSelectPartsConfig returns a SelectPartsConfig with default values.

type SetJSON

type SetJSON struct {
	// contains filtered or unexported fields
}

SetJSON is a processor that inserts a new message part at a specific index.

func (*SetJSON) ProcessMessage

func (p *SetJSON) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage prepends a new message part to the message.

type SetJSONConfig

type SetJSONConfig struct {
	Part  int          `json:"part" yaml:"part"`
	Path  string       `json:"path" yaml:"path"`
	Value rawJSONValue `json:"value" yaml:"value"`
}

SetJSONConfig contains any configuration for the SetJSON processor.

func NewSetJSONConfig

func NewSetJSONConfig() SetJSONConfig

NewSetJSONConfig returns a SetJSONConfig with default values.

type Split

type Split struct {
	// contains filtered or unexported fields
}

Split is a processor that splits messages into a message per part.

func (*Split) ProcessMessage

func (s *Split) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage takes a single message and returns a slice of messages, containing a message per part.

type Type

type Type interface {
	// ProcessMessage attempts to process a message. Since processing can fail
	// this call returns both a slice of messages in case of success or a
	// response in case of failure. If the slice of messages is empty the
	// response should be returned to the source.
	ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)
}

Type reads a message, performs a processing operation, and returns either a slice of messages resulting from the process to be propagated through the, pipeline, or a response that should be sent back to the source instead.

func New

func New(conf Config, log log.Modular, stats metrics.Type) (Type, error)

New creates a processor type based on a processor configuration.

func NewArchive

func NewArchive(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewArchive returns a Archive processor.

func NewBoundsCheck

func NewBoundsCheck(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewBoundsCheck returns a BoundsCheck processor.

func NewCombine

func NewCombine(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewCombine returns a Combine processor.

func NewCompress

func NewCompress(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewCompress returns a Compress processor.

func NewDecompress

func NewDecompress(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewDecompress returns a Decompress processor.

func NewHashSample

func NewHashSample(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewHashSample returns a HashSample processor.

func NewInsertPart

func NewInsertPart(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewInsertPart returns a InsertPart processor.

func NewNoop

func NewNoop(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewNoop returns a Noop processor.

func NewSample

func NewSample(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewSample returns a Sample processor.

func NewSelectParts

func NewSelectParts(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewSelectParts returns a SelectParts processor.

func NewSetJSON

func NewSetJSON(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewSetJSON returns a SetJSON processor.

func NewSplit

func NewSplit(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewSplit returns a Split processor.

func NewUnarchive

func NewUnarchive(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewUnarchive returns a Unarchive processor.

type Unarchive

type Unarchive struct {
	// contains filtered or unexported fields
}

Unarchive is a processor that can selectively unarchive parts of a message as a chosen archive type.

func (*Unarchive) ProcessMessage

func (d *Unarchive) ProcessMessage(msg *types.Message) ([]*types.Message, types.Response)

ProcessMessage takes a message, attempts to unarchive parts of the message, and returns the result.

type UnarchiveConfig

type UnarchiveConfig struct {
	Format string `json:"format" yaml:"format"`
	Parts  []int  `json:"parts" yaml:"parts"`
}

UnarchiveConfig contains any configuration for the Unarchive processor.

func NewUnarchiveConfig

func NewUnarchiveConfig() UnarchiveConfig

NewUnarchiveConfig returns a UnarchiveConfig with default values.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL