processor

package
v0.7.2
Published: Feb 1, 2018 | License: MIT | Imports: 13 | Imported by: 11

Documentation

Overview

Package processor contains logical message processors that can be pipelined within benthos using the pipeline.Processor type.

Variables

var (
	ErrEmptyTargetPath = errors.New("target path is empty")
)

Errors for the SetJSON type.

Functions

func Descriptions

func Descriptions() string

Descriptions returns a formatted string of collated descriptions of each type.

Types

type BlobToMulti

type BlobToMulti struct {
	// contains filtered or unexported fields
}

BlobToMulti is a processor that takes messages with a single part in a benthos multiple part blob format and decodes them into multiple part messages.

func (*BlobToMulti) ProcessMessage

func (m *BlobToMulti) ProcessMessage(msg *types.Message) (*types.Message, types.Response, bool)

ProcessMessage takes a message with 1 part in multiple part blob format and returns a multiple part message by decoding it.

type BoundsCheck

type BoundsCheck struct {
	// contains filtered or unexported fields
}

BoundsCheck is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*BoundsCheck) ProcessMessage

func (m *BoundsCheck) ProcessMessage(msg *types.Message) (*types.Message, types.Response, bool)

ProcessMessage checks each message against a set of bounds.

type BoundsCheckConfig

type BoundsCheckConfig struct {
	MaxParts    int `json:"max_parts" yaml:"max_parts"`
	MinParts    int `json:"min_parts" yaml:"min_parts"`
	MaxPartSize int `json:"max_part_size" yaml:"max_part_size"`
}

BoundsCheckConfig contains any bounds configuration for the BoundsCheck processor.

func NewBoundsCheckConfig

func NewBoundsCheckConfig() BoundsCheckConfig

NewBoundsCheckConfig returns a BoundsCheckConfig with default values.
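
The three fields suggest a check on the number of parts and on the size of each part. The following is a minimal sketch of such a check, not the package's implementation: the `bounds` type, the `check` method, and the choice of inclusive limits are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// bounds mirrors the fields of BoundsCheckConfig (hypothetical stand-in).
type bounds struct {
	MaxParts    int
	MinParts    int
	MaxPartSize int
}

// check returns an error describing the first violated bound, or nil when the
// parts are within all bounds. Limits are treated as inclusive (assumption).
func (b bounds) check(parts [][]byte) error {
	if len(parts) < b.MinParts {
		return errors.New("too few parts")
	}
	if len(parts) > b.MaxParts {
		return errors.New("too many parts")
	}
	for i, p := range parts {
		if len(p) > b.MaxPartSize {
			return fmt.Errorf("part %d exceeds max size", i)
		}
	}
	return nil
}

func main() {
	b := bounds{MaxParts: 2, MinParts: 1, MaxPartSize: 4}
	fmt.Println(b.check([][]byte{[]byte("ok")}))
	fmt.Println(b.check([][]byte{[]byte("too long")}))
	fmt.Println(b.check(nil))
}
```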

type Combine added in v0.3.1

type Combine struct {
	// contains filtered or unexported fields
}

Combine is a processor that buffers single part messages until it has collected N of them, then combines them into one multiple part message.

func (*Combine) ProcessMessage added in v0.3.1

func (c *Combine) ProcessMessage(msg *types.Message) (*types.Message, types.Response, bool)

ProcessMessage takes a single message and buffers it, dropping it from the pipeline and returning a NoAck response. Once N messages have been buffered, it combines them into one multiple part message, which is sent on.
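
The buffering behaviour can be sketched without the benthos types. In this illustration the `combiner` type and its `add` method are hypothetical; the real processor works on `*types.Message` values and returns a response for buffered messages.

```go
package main

import "fmt"

// combiner is a hypothetical buffer that groups single parts into batches of n.
type combiner struct {
	n      int
	buffer [][]byte
}

// add buffers one part. It returns the combined parts and true once n parts
// have been collected, otherwise nil and false, meaning the input was consumed
// but nothing should be propagated yet.
func (c *combiner) add(part []byte) ([][]byte, bool) {
	c.buffer = append(c.buffer, part)
	if len(c.buffer) < c.n {
		return nil, false
	}
	combined := c.buffer
	c.buffer = nil // start a fresh batch
	return combined, true
}

func main() {
	c := &combiner{n: 3}
	for _, s := range []string{"a", "b", "c", "d"} {
		if parts, ok := c.add([]byte(s)); ok {
			fmt.Printf("combined %d parts: %q\n", len(parts), parts)
		} else {
			fmt.Printf("buffered %q\n", s)
		}
	}
}
```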

type CombineConfig added in v0.3.1

type CombineConfig struct {
	Parts int `json:"parts" yaml:"parts"`
}

CombineConfig contains configuration for the Combine processor.

func NewCombineConfig added in v0.3.1

func NewCombineConfig() CombineConfig

NewCombineConfig returns a CombineConfig with default values.

type Config

type Config struct {
	Type        string            `json:"type" yaml:"type"`
	BoundsCheck BoundsCheckConfig `json:"bounds_check" yaml:"bounds_check"`
	SelectParts SelectPartsConfig `json:"select_parts" yaml:"select_parts"`
	InsertPart  InsertPartConfig  `json:"insert_part" yaml:"insert_part"`
	SetJSON     SetJSONConfig     `json:"set_json" yaml:"set_json"`
	BlobToMulti struct{}          `json:"blob_to_multi" yaml:"blob_to_multi"`
	MultiToBlob struct{}          `json:"multi_to_blob" yaml:"multi_to_blob"`
	Sample      SampleConfig      `json:"sample" yaml:"sample"`
	HashSample  HashSampleConfig  `json:"hash_sample" yaml:"hash_sample"`
	Combine     CombineConfig     `json:"combine" yaml:"combine"`
}

Config is the all-encompassing configuration struct for all processor types.

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values.

func (*Config) UnmarshalJSON

func (m *Config) UnmarshalJSON(bytes []byte) error

UnmarshalJSON ensures that when parsing configs that are in a slice the default values are still applied.

func (*Config) UnmarshalYAML

func (m *Config) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.

type HashSample added in v0.6.11

type HashSample struct {
	// contains filtered or unexported fields
}

HashSample is a processor that samples messages deterministically: it hashes the configured message parts and retains a message only if the resulting value falls within the configured retain bounds.

func (*HashSample) ProcessMessage added in v0.6.11

func (s *HashSample) ProcessMessage(msg *types.Message) (*types.Message, types.Response, bool)

ProcessMessage hashes the configured parts of each message and checks the result against the retain bounds.

type HashSampleConfig added in v0.6.11

type HashSampleConfig struct {
	RetainMin float64 `json:"retain_min" yaml:"retain_min"`
	RetainMax float64 `json:"retain_max" yaml:"retain_max"`
	Parts     []int   `json:"parts" yaml:"parts"` // message parts to hash
}

HashSampleConfig contains any configuration for the HashSample processor.

func NewHashSampleConfig added in v0.6.11

func NewHashSampleConfig() HashSampleConfig

NewHashSampleConfig returns a HashSampleConfig with default values.
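
Hash-based sampling makes the retain decision a pure function of the message content, so identical messages are always treated the same way. The sketch below illustrates the idea only. The `hashRetain` function, the use of FNV-1a, the 0 to 100 scale, and the half-open range are all assumptions; this section does not state which hash or scale the package uses.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashRetain reports whether the hash of the selected parts falls within
// [retainMin, retainMax). FNV-1a and the 0-100 scale are assumptions.
func hashRetain(parts [][]byte, indexes []int, retainMin, retainMax float64) bool {
	h := fnv.New64a()
	for _, i := range indexes {
		if i >= 0 && i < len(parts) {
			h.Write(parts[i]) // Write on a hash.Hash never returns an error
		}
	}
	// Map the 64-bit hash onto a value in [0, 100) with two decimal places.
	rate := float64(h.Sum64()%10000) / 100.0
	return rate >= retainMin && rate < retainMax
}

func main() {
	parts := [][]byte{[]byte(`{"user":"a"}`)}
	fmt.Println(hashRetain(parts, []int{0}, 0, 100)) // full range retains everything
	fmt.Println(hashRetain(parts, []int{0}, 0, 0))   // empty range retains nothing
	first := hashRetain(parts, []int{0}, 0, 50)
	fmt.Println(first == hashRetain(parts, []int{0}, 0, 50)) // same input, same decision
}
```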

type InsertPart added in v0.6.19

type InsertPart struct {
	// contains filtered or unexported fields
}

InsertPart is a processor that inserts a new message part at a specific index.

func (*InsertPart) ProcessMessage added in v0.6.19

func (p *InsertPart) ProcessMessage(msg *types.Message) (*types.Message, types.Response, bool)

ProcessMessage inserts a new message part into the message at the configured index.

type InsertPartConfig added in v0.6.19

type InsertPartConfig struct {
	Index   int    `json:"index" yaml:"index"`
	Content string `json:"content" yaml:"content"`
}

InsertPartConfig contains any configuration for the InsertPart processor.

func NewInsertPartConfig added in v0.6.19

func NewInsertPartConfig() InsertPartConfig

NewInsertPartConfig returns an InsertPartConfig with default values.
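
Inserting a part at an index amounts to a slice insertion. The following sketch uses a hypothetical `insertPart` helper on plain byte slices; clamping out-of-range indexes is an assumption of the sketch, and the package may treat such indexes differently.

```go
package main

import "fmt"

// insertPart returns a new slice with content inserted at index. Indexes
// outside [0, len(parts)] are clamped, which is an assumption of this sketch.
func insertPart(parts [][]byte, index int, content []byte) [][]byte {
	if index < 0 {
		index = 0
	}
	if index > len(parts) {
		index = len(parts)
	}
	out := make([][]byte, 0, len(parts)+1)
	out = append(out, parts[:index]...)
	out = append(out, content)
	out = append(out, parts[index:]...)
	return out
}

func main() {
	parts := [][]byte{[]byte("a"), []byte("c")}
	// prints ["a" "b" "c"]
	fmt.Printf("%q\n", insertPart(parts, 1, []byte("b")))
}
```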

type MultiToBlob

type MultiToBlob struct {
	// contains filtered or unexported fields
}

MultiToBlob is a processor that takes messages with potentially multiple parts and converts them into a single part message using the benthos binary format. This message can be converted back to multiple parts using the BlobToMulti processor.

func (MultiToBlob) ProcessMessage

func (m MultiToBlob) ProcessMessage(msg *types.Message) (*types.Message, types.Response, bool)

ProcessMessage takes a message of one or more parts and returns a single part message that can later be converted back to the original parts.
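
The exact layout of the benthos binary format is not given in this section. To show how a multiple part message can round-trip through a single blob, the sketch below uses a length-prefixed layout chosen as an assumption: a big-endian uint32 part count, then each part as a big-endian uint32 length followed by its bytes. The `toBlob` and `fromBlob` names are hypothetical.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

var errBadBlob = errors.New("blob is truncated or malformed")

// toBlob encodes parts as: part count (uint32, big endian), then for each
// part its length (uint32, big endian) followed by its bytes. This layout is
// an assumption of the sketch.
func toBlob(parts [][]byte) []byte {
	blob := make([]byte, 4)
	binary.BigEndian.PutUint32(blob, uint32(len(parts)))
	for _, p := range parts {
		size := make([]byte, 4)
		binary.BigEndian.PutUint32(size, uint32(len(p)))
		blob = append(blob, size...)
		blob = append(blob, p...)
	}
	return blob
}

// fromBlob reverses toBlob, validating every length before slicing.
func fromBlob(blob []byte) ([][]byte, error) {
	if len(blob) < 4 {
		return nil, errBadBlob
	}
	count := binary.BigEndian.Uint32(blob)
	blob = blob[4:]
	var parts [][]byte
	for i := uint32(0); i < count; i++ {
		if len(blob) < 4 {
			return nil, errBadBlob
		}
		size := binary.BigEndian.Uint32(blob)
		blob = blob[4:]
		if uint64(size) > uint64(len(blob)) {
			return nil, errBadBlob
		}
		parts = append(parts, blob[:size])
		blob = blob[size:]
	}
	return parts, nil
}

func main() {
	blob := toBlob([][]byte{[]byte("hello"), []byte("world")})
	parts, err := fromBlob(blob)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d bytes -> %q\n", len(blob), parts)
}
```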

type Noop added in v0.6.5

type Noop struct {
}

Noop is a no-op processor that does nothing.

func (*Noop) ProcessMessage added in v0.6.5

func (c *Noop) ProcessMessage(msg *types.Message) (*types.Message, types.Response, bool)

ProcessMessage does nothing and returns the message unchanged.

type Sample added in v0.3.2

type Sample struct {
	// contains filtered or unexported fields
}

Sample is a processor that retains a randomly sampled portion of messages and drops the rest.

func (*Sample) ProcessMessage added in v0.3.2

func (s *Sample) ProcessMessage(msg *types.Message) (*types.Message, types.Response, bool)

ProcessMessage randomly decides whether each message is retained or dropped, according to the configured retain rate.

type SampleConfig added in v0.3.2

type SampleConfig struct {
	Retain     float64 `json:"retain"`
	RandomSeed int64   `json:"seed"`
}

SampleConfig contains any configuration for the Sample processor.

func NewSampleConfig added in v0.3.2

func NewSampleConfig() SampleConfig

NewSampleConfig returns a SampleConfig with default values.
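
A seeded random source makes sampling reproducible across runs. The sketch below illustrates that idea with a hypothetical `sampler` type; treating the retain value as a percentage between 0 and 100 is an assumption, since this section does not state the scale.

```go
package main

import (
	"fmt"
	"math/rand"
)

// sampler is a hypothetical random sampler with a fixed seed.
type sampler struct {
	retain float64 // percentage of messages to keep, 0-100 (assumed scale)
	gen    *rand.Rand
}

// newSampler builds a sampler whose decisions are reproducible for a seed.
func newSampler(retain float64, seed int64) *sampler {
	return &sampler{retain: retain, gen: rand.New(rand.NewSource(seed))}
}

// keep reports whether the next message should be retained.
func (s *sampler) keep() bool {
	return s.gen.Float64()*100 < s.retain
}

func main() {
	s := newSampler(10, 0)
	kept := 0
	for i := 0; i < 10000; i++ {
		if s.keep() {
			kept++
		}
	}
	fmt.Printf("kept %d of 10000\n", kept)
}
```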

type SelectParts added in v0.4.1

type SelectParts struct {
	// contains filtered or unexported fields
}

SelectParts is a processor that extracts a set of parts from each message by index and propagates a message containing only those parts.

func (*SelectParts) ProcessMessage added in v0.4.1

func (m *SelectParts) ProcessMessage(msg *types.Message) (*types.Message, types.Response, bool)

ProcessMessage extracts a set of parts from each message.

type SelectPartsConfig added in v0.4.1

type SelectPartsConfig struct {
	Parts []int `json:"parts" yaml:"parts"`
}

SelectPartsConfig contains any configuration for the SelectParts processor.

func NewSelectPartsConfig added in v0.4.1

func NewSelectPartsConfig() SelectPartsConfig

NewSelectPartsConfig returns a SelectPartsConfig with default values.
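
Selecting parts by index can be sketched on plain byte slices. The `selectParts` helper below is hypothetical, and silently skipping out-of-range indexes is an assumption; the package may handle such indexes differently.

```go
package main

import "fmt"

// selectParts returns the parts at the given indexes, in the order the
// indexes are listed. Out-of-range indexes are skipped (assumption).
func selectParts(parts [][]byte, indexes []int) [][]byte {
	out := make([][]byte, 0, len(indexes))
	for _, i := range indexes {
		if i < 0 || i >= len(parts) {
			continue
		}
		out = append(out, parts[i])
	}
	return out
}

func main() {
	parts := [][]byte{[]byte("a"), []byte("b"), []byte("c")}
	// prints ["c" "a"]
	fmt.Printf("%q\n", selectParts(parts, []int{2, 0, 7}))
}
```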

type SetJSON added in v0.7.2

type SetJSON struct {
	// contains filtered or unexported fields
}

SetJSON is a processor that parses a message part as JSON and sets the value at a target path within it.

func (*SetJSON) ProcessMessage added in v0.7.2

func (p *SetJSON) ProcessMessage(msg *types.Message) (*types.Message, types.Response, bool)

ProcessMessage sets the configured value at the target path of the configured message part.

type SetJSONConfig added in v0.7.2

type SetJSONConfig struct {
	Part  int          `json:"part" yaml:"part"`
	Path  string       `json:"path" yaml:"path"`
	Value rawJSONValue `json:"value" yaml:"value"`
}

SetJSONConfig contains any configuration for the SetJSON processor.

func NewSetJSONConfig added in v0.7.2

func NewSetJSONConfig() SetJSONConfig

NewSetJSONConfig returns a SetJSONConfig with default values.
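
Setting a value at a path inside a JSON document can be sketched with the standard library alone. In this illustration the `setJSON` helper, the dot-separated path syntax, and the creation of missing intermediate objects are assumptions; `errEmptyPath` only plays the role that ErrEmptyTargetPath has in the package.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// errEmptyPath plays the role of ErrEmptyTargetPath in this sketch.
var errEmptyPath = errors.New("target path is empty")

// setJSON parses doc as a JSON object, sets value at the dot-separated path
// (creating intermediate objects as needed), and returns the re-encoded JSON.
func setJSON(doc []byte, path string, value interface{}) ([]byte, error) {
	if path == "" {
		return nil, errEmptyPath
	}
	var root map[string]interface{}
	if err := json.Unmarshal(doc, &root); err != nil {
		return nil, err
	}
	if root == nil {
		root = map[string]interface{}{} // input was the JSON literal null
	}
	keys := strings.Split(path, ".")
	node := root
	for _, k := range keys[:len(keys)-1] {
		child, ok := node[k].(map[string]interface{})
		if !ok {
			// Missing or non-object values are replaced by a new object.
			child = map[string]interface{}{}
			node[k] = child
		}
		node = child
	}
	node[keys[len(keys)-1]] = value
	return json.Marshal(root)
}

func main() {
	out, err := setJSON([]byte(`{"a":{"b":1}}`), "a.c", "new")
	if err != nil {
		panic(err)
	}
	// prints {"a":{"b":1,"c":"new"}}
	fmt.Println(string(out))
}
```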

type Type

type Type interface {
	// ProcessMessage attempts to process a message. Since processing can fail,
	// this call returns a message in case of success, a response in case of
	// failure, and a bool flag (true == success) indicating which of the two
	// should be used.
	ProcessMessage(msg *types.Message) (*types.Message, types.Response, bool)
}

Type reads a message, performs a processing operation, and returns either a resulting message or a response, together with a flag indicating whether the message should be propagated.
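
To illustrate the three-value contract, the sketch below chains processors and stops at the first one that does not propagate its message. It is self-contained, so `message`, `response`, `processor`, `dropEmpty`, and `runAll` are simplified, hypothetical stand-ins rather than the benthos types.

```go
package main

import (
	"errors"
	"fmt"
)

// message and response are simplified stand-ins for types.Message and
// types.Response; the real types are defined elsewhere in benthos.
type message struct {
	Parts [][]byte
}

type response interface {
	Error() error
}

// processor mirrors the shape of the Type interface.
type processor interface {
	ProcessMessage(msg *message) (*message, response, bool)
}

// dropResponse is a hypothetical response returned when a message is dropped.
type dropResponse struct{ err error }

func (r dropResponse) Error() error { return r.err }

// dropEmpty is a hypothetical processor that rejects messages without parts.
type dropEmpty struct{}

func (dropEmpty) ProcessMessage(msg *message) (*message, response, bool) {
	if len(msg.Parts) == 0 {
		return nil, dropResponse{err: errors.New("message has no parts")}, false
	}
	return msg, nil, true
}

// runAll applies processors in order and stops at the first one that does not
// propagate the message, returning that processor's response instead.
func runAll(procs []processor, msg *message) (*message, response, bool) {
	for _, p := range procs {
		next, res, ok := p.ProcessMessage(msg)
		if !ok {
			return nil, res, false
		}
		msg = next
	}
	return msg, nil, true
}

func main() {
	procs := []processor{dropEmpty{}}
	_, res, ok := runAll(procs, &message{})
	fmt.Println(ok, res.Error())
	out, _, ok := runAll(procs, &message{Parts: [][]byte{[]byte("hello")}})
	fmt.Println(ok, len(out.Parts))
}
```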

func New

func New(conf Config, log log.Modular, stats metrics.Type) (Type, error)

New creates a processor type based on a processor configuration.

func NewBlobToMulti

func NewBlobToMulti(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewBlobToMulti returns a BlobToMulti processor.

func NewBoundsCheck

func NewBoundsCheck(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewBoundsCheck returns a BoundsCheck processor.

func NewCombine added in v0.3.1

func NewCombine(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewCombine returns a Combine processor.

func NewHashSample added in v0.6.11

func NewHashSample(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewHashSample returns a HashSample processor.

func NewInsertPart added in v0.6.19

func NewInsertPart(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewInsertPart returns an InsertPart processor.

func NewMultiToBlob

func NewMultiToBlob(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewMultiToBlob returns a MultiToBlob processor.

func NewNoop added in v0.6.5

func NewNoop(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewNoop returns a Noop processor.

func NewSample added in v0.3.2

func NewSample(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewSample returns a Sample processor.

func NewSelectParts added in v0.4.1

func NewSelectParts(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewSelectParts returns a SelectParts processor.

func NewSetJSON added in v0.7.2

func NewSetJSON(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewSetJSON returns a SetJSON processor.
