processor

package
v0.40.1
Published: Dec 11, 2018 License: MIT Imports: 46 Imported by: 11

Documentation

Overview

Package processor contains implementations of types.Processor, which perform an arbitrary operation on a message and either return one or more messages to be propagated towards a sink, or a response to be sent back to the message source.

Constants

const (
	TypeArchive      = "archive"
	TypeBatch        = "batch"
	TypeBoundsCheck  = "bounds_check"
	TypeCatch        = "catch"
	TypeCompress     = "compress"
	TypeConditional  = "conditional"
	TypeDecode       = "decode"
	TypeDecompress   = "decompress"
	TypeDedupe       = "dedupe"
	TypeEncode       = "encode"
	TypeFilter       = "filter"
	TypeFilterParts  = "filter_parts"
	TypeGrok         = "grok"
	TypeGroupBy      = "group_by"
	TypeGroupByValue = "group_by_value"
	TypeHash         = "hash"
	TypeHashSample   = "hash_sample"
	TypeHTTP         = "http"
	TypeInsertPart   = "insert_part"
	TypeJMESPath     = "jmespath"
	TypeJSON         = "json"
	TypeLambda       = "lambda"
	TypeLog          = "log"
	TypeMergeJSON    = "merge_json"
	TypeMetadata     = "metadata"
	TypeMetric       = "metric"
	TypeNoop         = "noop"
	TypeProcessBatch = "process_batch"
	TypeProcessDAG   = "process_dag"
	TypeProcessField = "process_field"
	TypeProcessMap   = "process_map"
	TypeSample       = "sample"
	TypeSelectParts  = "select_parts"
	TypeSplit        = "split"
	TypeSubprocess   = "subprocess"
	TypeText         = "text"
	TypeTry          = "try"
	TypeThrottle     = "throttle"
	TypeUnarchive    = "unarchive"
)

String constants representing each processor type.

Variables

var Constructors = map[string]TypeSpec{}

Constructors is a map of all processor types with their specs.

var FailFlagKey = "benthos_processing_failed"

FailFlagKey is a metadata key used for flagging processor errors in Benthos. If a message part has any non-empty value for this metadata key then it will be interpreted as having failed a processor step somewhere in the pipeline.
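The flagging convention can be illustrated with a minimal sketch. The metadata map type and the lowercase helpers below are hypothetical stand-ins for a message part's metadata and for the package's FlagFail, HasFailed and ClearFail functions; only the key name comes from this documentation, and the stored value "true" is an arbitrary non-empty placeholder.

```go
package main

import "fmt"

// failFlagKey mirrors the documented value of FailFlagKey.
const failFlagKey = "benthos_processing_failed"

// metadata is a hypothetical stand-in for the metadata of a message part.
type metadata map[string]string

// flagFail marks the part as failed by storing a non-empty value.
func flagFail(m metadata) { m[failFlagKey] = "true" }

// hasFailed reports whether a non-empty value is stored under the key.
func hasFailed(m metadata) bool { return m[failFlagKey] != "" }

// clearFail removes the failure flag.
func clearFail(m metadata) { delete(m, failFlagKey) }

func main() {
	m := metadata{}
	fmt.Println(hasFailed(m))
	flagFail(m)
	fmt.Println(hasFailed(m))
	clearFail(m)
	fmt.Println(hasFailed(m))
}
```

Because the check is "any non-empty value", a step could in principle store an error description rather than a fixed marker; the sketch does not depend on which is used.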

Functions

func ClearFail added in v0.38.10

func ClearFail(part types.Part)

ClearFail removes any existing failure flags from a message part.

func Descriptions

func Descriptions() string

Descriptions returns a formatted string of collated descriptions of each type.

func DocumentPlugin added in v0.27.2

func DocumentPlugin(
	typeString, description string,
	configSanitiser PluginConfigSanitiser,
)

DocumentPlugin adds a description and an optional configuration sanitiser function to the definition of a registered plugin. This improves the documentation generated by PluginDescriptions.

func ExecuteAll added in v0.38.10

func ExecuteAll(procs []types.Processor, msgs ...types.Message) ([]types.Message, types.Response)

ExecuteAll attempts to execute a slice of processors on one or more messages. It returns N resulting messages or a response. The response may indicate either a NoAck, in the event of the message being buffered, or an unrecoverable error.

func ExecuteCatchAll added in v0.39.2

func ExecuteCatchAll(procs []types.Processor, msgs ...types.Message) ([]types.Message, types.Response)

ExecuteCatchAll attempts to execute a slice of processors on only those messages that have failed a processing step. It returns N resulting messages or a response. The response may indicate either a NoAck, in the event of the message being buffered, or an unrecoverable error.

func ExecuteTryAll added in v0.39.2

func ExecuteTryAll(procs []types.Processor, msgs ...types.Message) ([]types.Message, types.Response)

ExecuteTryAll attempts to execute a slice of processors on messages; if a message has failed a processing step it is prevented from being sent to subsequent processors. It returns N resulting messages or a response. The response may indicate either a NoAck, in the event of the message being buffered, or an unrecoverable error.
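The difference between the try and catch execution modes can be sketched with stand-in types. Everything below (part, proc, tryAll, catchAll and the three example steps) is hypothetical and only models the behaviour described above; it does not use the package's types.Message or types.Response, and it leaves out responses and any clearing of the failure flag.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// part is a hypothetical stand-in for a message part with a failure flag.
type part struct {
	data   string
	failed bool
}

// proc is a hypothetical stand-in for a processor acting on a single part.
type proc func(p *part) error

// Example steps used by main.
func rejectEmpty(p *part) error {
	if p.data == "" {
		return errors.New("empty payload")
	}
	return nil
}
func upper(p *part) error    { p.data = strings.ToUpper(p.data); return nil }
func fallback(p *part) error { p.data = "fallback"; return nil }

// tryAll stops sending a part to further steps once it has failed.
func tryAll(procs []proc, parts []*part) {
	for _, p := range parts {
		for _, fn := range procs {
			if p.failed {
				break
			}
			if err := fn(p); err != nil {
				p.failed = true
			}
		}
	}
}

// catchAll runs the steps only on parts that have already failed.
func catchAll(procs []proc, parts []*part) {
	for _, p := range parts {
		if !p.failed {
			continue
		}
		for _, fn := range procs {
			_ = fn(p) // errors inside the catch steps are ignored in this sketch
		}
	}
}

func main() {
	parts := []*part{{data: "a"}, {data: ""}}
	tryAll([]proc{rejectEmpty, upper}, parts)
	catchAll([]proc{fallback}, parts)
	for _, p := range parts {
		fmt.Println(p.data, p.failed)
	}
}
```

In this sketch the empty part skips the upper step after failing rejectEmpty, and is the only part touched by the fallback step.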

func FlagFail added in v0.38.10

func FlagFail(part types.Part)

FlagFail marks a message part as having failed at a processing step.

func HasFailed added in v0.38.10

func HasFailed(part types.Part) bool

HasFailed checks whether a message part has failed a processing step.

func PluginDescriptions added in v0.27.1

func PluginDescriptions() string

PluginDescriptions generates and returns a markdown formatted document listing each registered plugin and an example configuration for it.

func RegisterPlugin added in v0.27.1

func RegisterPlugin(
	typeString string,
	configConstructor PluginConfigConstructor,
	constructor PluginConstructor,
)

RegisterPlugin registers a plugin by a unique name so that it can be constructed similarly to regular processors. A constructor for both the plugin itself as well as its configuration struct must be provided.

WARNING: This API is experimental and is likely to change.

func SanitiseConfig added in v0.8.4

func SanitiseConfig(conf Config) (interface{}, error)

SanitiseConfig returns a sanitised version of the Config, meaning sections that aren't relevant to behaviour are removed.

Types

type Archive added in v0.7.7

type Archive struct {
	// contains filtered or unexported fields
}

Archive is a processor that can selectively archive parts of a message into a single part using a chosen archive type.

func (*Archive) CloseAsync added in v0.40.0

func (d *Archive) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Archive) ProcessMessage added in v0.7.7

func (d *Archive) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Archive) WaitForClose added in v0.40.0

func (d *Archive) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type ArchiveConfig added in v0.7.7

type ArchiveConfig struct {
	Format string `json:"format" yaml:"format"`
	Path   string `json:"path" yaml:"path"`
}

ArchiveConfig contains configuration fields for the Archive processor.

func NewArchiveConfig added in v0.7.7

func NewArchiveConfig() ArchiveConfig

NewArchiveConfig returns an ArchiveConfig with default values.

type Batch added in v0.13.0

type Batch struct {
	// contains filtered or unexported fields
}

Batch is a processor that combines messages into a batch until a size limit or other condition is reached, at which point the batch is sent out. When a message is combined without yet producing a batch a NoAck response is returned, which is interpreted by source types as an instruction to send another message through but hold off on acknowledging this one.

Eventually, when the batch reaches its target size, the batch is sent through the pipeline as a single message and an acknowledgement for that message determines whether the whole batch of messages is acknowledged.
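The buffering behaviour can be sketched for the count-based case only. The batcher type and its add method are hypothetical; the boolean result plays the role of the NoAck response, and the byte size, condition and period triggers of BatchConfig are not modelled.

```go
package main

import "fmt"

// batcher is a hypothetical count-based stand-in for the Batch processor.
type batcher struct {
	count   int      // target number of parts per batch
	pending []string // parts buffered so far
}

// add buffers a part. It returns the completed batch and true once the
// target count is reached; otherwise it returns nil and false, which here
// stands in for a NoAck response.
func (b *batcher) add(part string) ([]string, bool) {
	b.pending = append(b.pending, part)
	if len(b.pending) < b.count {
		return nil, false
	}
	out := b.pending
	b.pending = nil
	return out, true
}

func main() {
	b := &batcher{count: 3}
	for _, p := range []string{"a", "b", "c", "d"} {
		if batch, ok := b.add(p); ok {
			fmt.Println("flush:", batch)
		} else {
			fmt.Println("buffered:", p)
		}
	}
}
```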

func (*Batch) CloseAsync added in v0.40.0

func (c *Batch) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Batch) ProcessMessage added in v0.13.0

func (c *Batch) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Batch) WaitForClose added in v0.40.0

func (c *Batch) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type BatchConfig added in v0.13.0

type BatchConfig struct {
	ByteSize  int              `json:"byte_size" yaml:"byte_size"`
	Count     int              `json:"count" yaml:"count"`
	Condition condition.Config `json:"condition" yaml:"condition"`
	PeriodMS  int              `json:"period_ms" yaml:"period_ms"`
}

BatchConfig contains configuration fields for the Batch processor.

func NewBatchConfig added in v0.13.0

func NewBatchConfig() BatchConfig

NewBatchConfig returns a BatchConfig with default values.

type BoundsCheck

type BoundsCheck struct {
	// contains filtered or unexported fields
}

BoundsCheck is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*BoundsCheck) CloseAsync added in v0.40.0

func (m *BoundsCheck) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*BoundsCheck) ProcessMessage

func (m *BoundsCheck) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*BoundsCheck) WaitForClose added in v0.40.0

func (m *BoundsCheck) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type BoundsCheckConfig

type BoundsCheckConfig struct {
	MaxParts    int `json:"max_parts" yaml:"max_parts"`
	MinParts    int `json:"min_parts" yaml:"min_parts"`
	MaxPartSize int `json:"max_part_size" yaml:"max_part_size"`
	MinPartSize int `json:"min_part_size" yaml:"min_part_size"`
}

BoundsCheckConfig contains configuration fields for the BoundsCheck processor.

func NewBoundsCheckConfig

func NewBoundsCheckConfig() BoundsCheckConfig

NewBoundsCheckConfig returns a BoundsCheckConfig with default values.
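A bounds check over the four configured limits might look like the sketch below. The bounds type, its check method and the limit values in main are hypothetical, and treating every limit as inclusive is an assumption rather than documented behaviour.

```go
package main

import "fmt"

// bounds mirrors the four fields of BoundsCheckConfig.
type bounds struct {
	maxParts, minParts, maxPartSize, minPartSize int
}

// check returns an error when the message (a slice of parts) falls outside
// the configured limits. All limits are treated as inclusive here.
func (b bounds) check(parts [][]byte) error {
	if n := len(parts); n < b.minParts || n > b.maxParts {
		return fmt.Errorf("part count %d outside [%d, %d]", n, b.minParts, b.maxParts)
	}
	for i, p := range parts {
		if s := len(p); s < b.minPartSize || s > b.maxPartSize {
			return fmt.Errorf("part %d size %d outside [%d, %d]", i, s, b.minPartSize, b.maxPartSize)
		}
	}
	return nil
}

func main() {
	b := bounds{maxParts: 2, minParts: 1, maxPartSize: 8, minPartSize: 1}
	fmt.Println(b.check([][]byte{[]byte("ok")}))
	fmt.Println(b.check([][]byte{[]byte("far too long")}))
}
```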

type Catch added in v0.39.2

type Catch struct {
	// contains filtered or unexported fields
}

Catch is a processor that applies a list of child processors to each message of a batch individually, where processors are only applied to messages that failed a previous processor step.

func (*Catch) CloseAsync added in v0.40.0

func (p *Catch) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Catch) ProcessMessage added in v0.39.2

func (p *Catch) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Catch) WaitForClose added in v0.40.0

func (p *Catch) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type CatchConfig added in v0.39.2

type CatchConfig []Config

CatchConfig is the configuration type for the Catch processor: a list of child processor configs.

func NewCatchConfig added in v0.39.2

func NewCatchConfig() CatchConfig

NewCatchConfig returns a default CatchConfig.

type Compress added in v0.7.7

type Compress struct {
	// contains filtered or unexported fields
}

Compress is a processor that can selectively compress parts of a message using a chosen compression algorithm.

func (*Compress) CloseAsync added in v0.40.0

func (c *Compress) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Compress) ProcessMessage added in v0.7.7

func (c *Compress) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Compress) WaitForClose added in v0.40.0

func (c *Compress) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type CompressConfig added in v0.7.7

type CompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Level     int    `json:"level" yaml:"level"`
	Parts     []int  `json:"parts" yaml:"parts"`
}

CompressConfig contains configuration fields for the Compress processor.

func NewCompressConfig added in v0.7.7

func NewCompressConfig() CompressConfig

NewCompressConfig returns a CompressConfig with default values.

type Conditional added in v0.13.0

type Conditional struct {
	// contains filtered or unexported fields
}

Conditional is a processor that only applies child processors under a certain condition.

func (*Conditional) CloseAsync added in v0.40.0

func (c *Conditional) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Conditional) ProcessMessage added in v0.13.0

func (c *Conditional) ProcessMessage(msg types.Message) (msgs []types.Message, res types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Conditional) WaitForClose added in v0.40.0

func (c *Conditional) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type ConditionalConfig added in v0.13.0

type ConditionalConfig struct {
	Condition      condition.Config `json:"condition" yaml:"condition"`
	Processors     []Config         `json:"processors" yaml:"processors"`
	ElseProcessors []Config         `json:"else_processors" yaml:"else_processors"`
}

ConditionalConfig is a config struct containing fields for the Conditional processor.

func NewConditionalConfig added in v0.13.0

func NewConditionalConfig() ConditionalConfig

NewConditionalConfig returns a default ConditionalConfig.
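The branching implied by the three ConditionalConfig fields can be sketched as follows. The step and conditional types are hypothetical stand-ins operating on plain strings, and the condition and steps in main are made up for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// step is a hypothetical stand-in for a child processor.
type step func(string) string

// conditional mirrors the three fields of ConditionalConfig.
type conditional struct {
	condition      func(string) bool
	processors     []step
	elseProcessors []step
}

// process runs processors when the condition passes and elseProcessors
// otherwise.
func (c conditional) process(msg string) string {
	steps := c.elseProcessors
	if c.condition(msg) {
		steps = c.processors
	}
	for _, s := range steps {
		msg = s(msg)
	}
	return msg
}

func main() {
	c := conditional{
		condition:      func(s string) bool { return strings.HasPrefix(s, "err") },
		processors:     []step{strings.ToUpper},
		elseProcessors: []step{strings.TrimSpace},
	}
	fmt.Println(c.process("error: disk full"))
	fmt.Println(c.process("  all good  "))
}
```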

type Config

type Config struct {
	Type         string             `json:"type" yaml:"type"`
	Archive      ArchiveConfig      `json:"archive" yaml:"archive"`
	Batch        BatchConfig        `json:"batch" yaml:"batch"`
	BoundsCheck  BoundsCheckConfig  `json:"bounds_check" yaml:"bounds_check"`
	Catch        CatchConfig        `json:"catch" yaml:"catch"`
	Compress     CompressConfig     `json:"compress" yaml:"compress"`
	Conditional  ConditionalConfig  `json:"conditional" yaml:"conditional"`
	Decode       DecodeConfig       `json:"decode" yaml:"decode"`
	Decompress   DecompressConfig   `json:"decompress" yaml:"decompress"`
	Dedupe       DedupeConfig       `json:"dedupe" yaml:"dedupe"`
	Encode       EncodeConfig       `json:"encode" yaml:"encode"`
	Filter       FilterConfig       `json:"filter" yaml:"filter"`
	FilterParts  FilterPartsConfig  `json:"filter_parts" yaml:"filter_parts"`
	Grok         GrokConfig         `json:"grok" yaml:"grok"`
	GroupBy      GroupByConfig      `json:"group_by" yaml:"group_by"`
	GroupByValue GroupByValueConfig `json:"group_by_value" yaml:"group_by_value"`
	Hash         HashConfig         `json:"hash" yaml:"hash"`
	HashSample   HashSampleConfig   `json:"hash_sample" yaml:"hash_sample"`
	HTTP         HTTPConfig         `json:"http" yaml:"http"`
	InsertPart   InsertPartConfig   `json:"insert_part" yaml:"insert_part"`
	JMESPath     JMESPathConfig     `json:"jmespath" yaml:"jmespath"`
	JSON         JSONConfig         `json:"json" yaml:"json"`
	Lambda       LambdaConfig       `json:"lambda" yaml:"lambda"`
	Log          LogConfig          `json:"log" yaml:"log"`
	MergeJSON    MergeJSONConfig    `json:"merge_json" yaml:"merge_json"`
	Metadata     MetadataConfig     `json:"metadata" yaml:"metadata"`
	Metric       MetricConfig       `json:"metric" yaml:"metric"`
	Plugin       interface{}        `json:"plugin,omitempty" yaml:"plugin,omitempty"`
	ProcessBatch ProcessBatchConfig `json:"process_batch" yaml:"process_batch"`
	ProcessDAG   ProcessDAGConfig   `json:"process_dag" yaml:"process_dag"`
	ProcessField ProcessFieldConfig `json:"process_field" yaml:"process_field"`
	ProcessMap   ProcessMapConfig   `json:"process_map" yaml:"process_map"`
	Sample       SampleConfig       `json:"sample" yaml:"sample"`
	SelectParts  SelectPartsConfig  `json:"select_parts" yaml:"select_parts"`
	Split        SplitConfig        `json:"split" yaml:"split"`
	Subprocess   SubprocessConfig   `json:"subprocess" yaml:"subprocess"`
	Text         TextConfig         `json:"text" yaml:"text"`
	Try          TryConfig          `json:"try" yaml:"try"`
	Throttle     ThrottleConfig     `json:"throttle" yaml:"throttle"`
	Unarchive    UnarchiveConfig    `json:"unarchive" yaml:"unarchive"`
}

Config is the all encompassing configuration struct for all processor types.

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values.

func (*Config) UnmarshalJSON

func (m *Config) UnmarshalJSON(bytes []byte) error

UnmarshalJSON ensures that when parsing configs that are in a slice the default values are still applied.

func (*Config) UnmarshalYAML

func (m *Config) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.
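One common way to keep default values when configs are parsed inside a slice is to decode into a pre-populated alias type. The sketch below shows that pattern for JSON with a hypothetical two-field config and made-up defaults; it is not taken from the package source, so the real methods may be implemented differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// config is a hypothetical two-field stand-in for the package's Config.
type config struct {
	Type  string `json:"type"`
	Count int    `json:"count"`
}

// newConfig returns a config populated with made-up default values.
func newConfig() config { return config{Type: "noop", Count: 10} }

// UnmarshalJSON starts from the defaults and overlays the fields that are
// present in the input.
func (c *config) UnmarshalJSON(data []byte) error {
	// The alias has no UnmarshalJSON method, which avoids infinite recursion.
	type confAlias config
	aliased := confAlias(newConfig())
	if err := json.Unmarshal(data, &aliased); err != nil {
		return err
	}
	*c = config(aliased)
	return nil
}

// parseConfigs decodes a JSON array of configs.
func parseConfigs(raw string) ([]config, error) {
	var confs []config
	err := json.Unmarshal([]byte(raw), &confs)
	return confs, err
}

func main() {
	confs, err := parseConfigs(`[{"type":"batch"},{"count":3}]`)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", confs) // [{Type:batch Count:10} {Type:noop Count:3}]
}
```

Without the custom method each slice element would start from Go's zero values, so the omitted fields would come back as an empty string and 0 instead of the defaults.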

type Decode added in v0.15.5

type Decode struct {
	// contains filtered or unexported fields
}

Decode is a processor that can selectively decode parts of a message following a chosen scheme.

func (*Decode) CloseAsync added in v0.40.0

func (c *Decode) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Decode) ProcessMessage added in v0.15.5

func (c *Decode) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Decode) WaitForClose added in v0.40.0

func (c *Decode) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type DecodeConfig added in v0.15.5

type DecodeConfig struct {
	Scheme string `json:"scheme" yaml:"scheme"`
	Parts  []int  `json:"parts" yaml:"parts"`
}

DecodeConfig contains configuration fields for the Decode processor.

func NewDecodeConfig added in v0.15.5

func NewDecodeConfig() DecodeConfig

NewDecodeConfig returns a DecodeConfig with default values.

type Decompress added in v0.7.4

type Decompress struct {
	// contains filtered or unexported fields
}

Decompress is a processor that can decompress parts of a message following a chosen compression algorithm.

func (*Decompress) CloseAsync added in v0.40.0

func (d *Decompress) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Decompress) ProcessMessage added in v0.7.4

func (d *Decompress) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Decompress) WaitForClose added in v0.40.0

func (d *Decompress) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type DecompressConfig added in v0.7.4

type DecompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Parts     []int  `json:"parts" yaml:"parts"`
}

DecompressConfig contains configuration fields for the Decompress processor.

func NewDecompressConfig added in v0.7.4

func NewDecompressConfig() DecompressConfig

NewDecompressConfig returns a DecompressConfig with default values.

type Dedupe added in v0.9.7

type Dedupe struct {
	// contains filtered or unexported fields
}

Dedupe is a processor that deduplicates messages either by hashing the full contents of message parts or by hashing the value of an interpolated string.

func (*Dedupe) CloseAsync added in v0.40.0

func (d *Dedupe) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Dedupe) ProcessMessage added in v0.9.7

func (d *Dedupe) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Dedupe) WaitForClose added in v0.40.0

func (d *Dedupe) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type DedupeConfig added in v0.9.7

type DedupeConfig struct {
	Cache          string `json:"cache" yaml:"cache"`
	HashType       string `json:"hash" yaml:"hash"`
	Parts          []int  `json:"parts" yaml:"parts"` // message parts to hash
	Key            string `json:"key" yaml:"key"`
	DropOnCacheErr bool   `json:"drop_on_err" yaml:"drop_on_err"`
}

DedupeConfig contains configuration fields for the Dedupe processor.

func NewDedupeConfig added in v0.9.7

func NewDedupeConfig() DedupeConfig

NewDedupeConfig returns a DedupeConfig with default values.
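Content-hash deduplication can be sketched with an in-memory set. The deduper type is hypothetical, SHA-256 is an arbitrary choice for illustration (the hash field of DedupeConfig selects the real one), and the named cache referred to by the cache field is replaced here by a plain map.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// deduper is a hypothetical in-memory stand-in for the Dedupe processor.
type deduper struct {
	seen map[[sha256.Size]byte]struct{}
}

func newDeduper() *deduper {
	return &deduper{seen: map[[sha256.Size]byte]struct{}{}}
}

// isDuplicate hashes the content and reports whether that hash has been
// seen before, recording it if not.
func (d *deduper) isDuplicate(content []byte) bool {
	sum := sha256.Sum256(content)
	if _, ok := d.seen[sum]; ok {
		return true
	}
	d.seen[sum] = struct{}{}
	return false
}

func main() {
	d := newDeduper()
	for _, msg := range []string{"foo", "bar", "foo"} {
		fmt.Println(msg, d.isDuplicate([]byte(msg)))
	}
}
```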

type DepProcessMapConfig added in v0.34.0

type DepProcessMapConfig struct {
	Dependencies     []string `json:"dependencies" yaml:"dependencies"`
	ProcessMapConfig `json:",inline" yaml:",inline"`
}

DepProcessMapConfig contains a superset of a ProcessMap config and some DAG specific fields.

type Encode added in v0.15.5

type Encode struct {
	// contains filtered or unexported fields
}

Encode is a processor that can selectively encode parts of a message following a chosen scheme.

func (*Encode) CloseAsync added in v0.40.0

func (c *Encode) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Encode) ProcessMessage added in v0.15.5

func (c *Encode) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Encode) WaitForClose added in v0.40.0

func (c *Encode) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type EncodeConfig added in v0.15.5

type EncodeConfig struct {
	Scheme string `json:"scheme" yaml:"scheme"`
	Parts  []int  `json:"parts" yaml:"parts"`
}

EncodeConfig contains configuration fields for the Encode processor.

func NewEncodeConfig added in v0.15.5

func NewEncodeConfig() EncodeConfig

NewEncodeConfig returns an EncodeConfig with default values.

type Filter added in v0.13.0

type Filter struct {
	// contains filtered or unexported fields
}

Filter is a processor that checks each message against a condition and rejects the message if a condition returns false.

func (*Filter) CloseAsync added in v0.40.0

func (c *Filter) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Filter) ProcessMessage added in v0.13.0

func (c *Filter) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Filter) WaitForClose added in v0.40.0

func (c *Filter) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type FilterConfig added in v0.13.0

type FilterConfig struct {
	condition.Config `json:",inline" yaml:",inline"`
}

FilterConfig contains configuration fields for the Filter processor.

func NewFilterConfig added in v0.13.0

func NewFilterConfig() FilterConfig

NewFilterConfig returns a FilterConfig with default values.

type FilterParts added in v0.14.7

type FilterParts struct {
	// contains filtered or unexported fields
}

FilterParts is a processor that checks each part from a message against a condition and removes the part if the condition returns false.

func (*FilterParts) CloseAsync added in v0.40.0

func (c *FilterParts) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*FilterParts) ProcessMessage added in v0.14.7

func (c *FilterParts) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*FilterParts) WaitForClose added in v0.40.0

func (c *FilterParts) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type FilterPartsConfig added in v0.14.7

type FilterPartsConfig struct {
	condition.Config `json:",inline" yaml:",inline"`
}

FilterPartsConfig contains configuration fields for the FilterParts processor.

func NewFilterPartsConfig added in v0.14.7

func NewFilterPartsConfig() FilterPartsConfig

NewFilterPartsConfig returns a FilterPartsConfig with default values.

type Grok added in v0.13.5

type Grok struct {
	// contains filtered or unexported fields
}

Grok is a processor that executes Grok queries on a message part and replaces the contents with the result.

func (*Grok) CloseAsync added in v0.40.0

func (g *Grok) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Grok) ProcessMessage added in v0.13.5

func (g *Grok) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Grok) WaitForClose added in v0.40.0

func (g *Grok) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type GrokConfig added in v0.13.5

type GrokConfig struct {
	Parts       []int    `json:"parts" yaml:"parts"`
	Patterns    []string `json:"patterns" yaml:"patterns"`
	RemoveEmpty bool     `json:"remove_empty_values" yaml:"remove_empty_values"`
	NamedOnly   bool     `json:"named_captures_only" yaml:"named_captures_only"`
	UseDefaults bool     `json:"use_default_patterns" yaml:"use_default_patterns"`
	To          string   `json:"output_format" yaml:"output_format"`
}

GrokConfig contains configuration fields for the Grok processor.

func NewGrokConfig added in v0.13.5

func NewGrokConfig() GrokConfig

NewGrokConfig returns a GrokConfig with default values.

type GroupBy added in v0.34.10

type GroupBy struct {
	// contains filtered or unexported fields
}

GroupBy is a processor that breaks message batches down into N batches of a smaller size according to conditions.

func (*GroupBy) CloseAsync added in v0.40.0

func (g *GroupBy) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*GroupBy) ProcessMessage added in v0.34.10

func (g *GroupBy) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*GroupBy) WaitForClose added in v0.40.0

func (g *GroupBy) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type GroupByConfig added in v0.34.10

type GroupByConfig []GroupByElement

GroupByConfig is a configuration struct containing fields for the GroupBy processor, which breaks message batches down into N batches of a smaller size according to conditions.

func NewGroupByConfig added in v0.34.10

func NewGroupByConfig() GroupByConfig

NewGroupByConfig returns a GroupByConfig with default values.

type GroupByElement added in v0.34.10

type GroupByElement struct {
	Condition  condition.Config `json:"condition" yaml:"condition"`
	Processors []Config         `json:"processors" yaml:"processors"`
}

GroupByElement represents a group determined by a condition and a list of group specific processors.

type GroupByValue added in v0.38.6

type GroupByValue struct {
	// contains filtered or unexported fields
}

GroupByValue is a processor that breaks message batches down into N batches of a smaller size according to a function interpolated string evaluated per message part.

func (*GroupByValue) CloseAsync added in v0.40.0

func (g *GroupByValue) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*GroupByValue) ProcessMessage added in v0.38.6

func (g *GroupByValue) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*GroupByValue) WaitForClose added in v0.40.0

func (g *GroupByValue) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type GroupByValueConfig added in v0.38.6

type GroupByValueConfig struct {
	Value string `json:"value" yaml:"value"`
}

GroupByValueConfig is a configuration struct containing fields for the GroupByValue processor, which breaks message batches down into N batches of a smaller size according to a function interpolated string evaluated per message part.

func NewGroupByValueConfig added in v0.38.6

func NewGroupByValueConfig() GroupByValueConfig

NewGroupByValueConfig returns a GroupByValueConfig with default values.
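Splitting a batch by a per-part value can be sketched as below. The groupByValue function and its key callback are hypothetical; the callback stands in for the interpolated string evaluated per message part, and keeping groups in order of first appearance is an assumption of this sketch.

```go
package main

import "fmt"

// groupByValue splits a batch into smaller batches, one per distinct key,
// keeping groups in the order in which their keys first appear.
func groupByValue(parts []string, key func(string) string) [][]string {
	index := map[string]int{} // key -> position of its group
	var groups [][]string
	for _, p := range parts {
		k := key(p)
		i, ok := index[k]
		if !ok {
			i = len(groups)
			index[k] = i
			groups = append(groups, nil)
		}
		groups[i] = append(groups[i], p)
	}
	return groups
}

func main() {
	batch := []string{"apple", "avocado", "banana", "apricot"}
	firstLetter := func(s string) string { return s[:1] }
	fmt.Println(groupByValue(batch, firstLetter))
}
```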

type HTTP added in v0.15.4

type HTTP struct {
	// contains filtered or unexported fields
}

HTTP is a processor that performs an HTTP request using the message as the request body, and returns the response.

func (*HTTP) CloseAsync added in v0.40.0

func (h *HTTP) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*HTTP) ProcessMessage added in v0.15.4

func (h *HTTP) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*HTTP) WaitForClose added in v0.40.0

func (h *HTTP) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type HTTPConfig added in v0.15.4

type HTTPConfig struct {
	Client      client.Config `json:"request" yaml:"request"`
	Parallel    bool          `json:"parallel" yaml:"parallel"`
	MaxParallel int           `json:"max_parallel" yaml:"max_parallel"`
}

HTTPConfig contains configuration fields for the HTTP processor.

func NewHTTPConfig added in v0.15.4

func NewHTTPConfig() HTTPConfig

NewHTTPConfig returns an HTTPConfig with default values.

type Hash added in v0.25.0

type Hash struct {
	// contains filtered or unexported fields
}

Hash is a processor that can selectively hash parts of a message following a chosen algorithm.

func (*Hash) CloseAsync added in v0.40.0

func (c *Hash) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Hash) ProcessMessage added in v0.25.0

func (c *Hash) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Hash) WaitForClose added in v0.40.0

func (c *Hash) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type HashConfig added in v0.25.0

type HashConfig struct {
	Parts     []int  `json:"parts" yaml:"parts"`
	Algorithm string `json:"algorithm" yaml:"algorithm"`
}

HashConfig contains configuration fields for the Hash processor.

func NewHashConfig added in v0.25.0

func NewHashConfig() HashConfig

NewHashConfig returns a HashConfig with default values.

type HashSample added in v0.6.11

type HashSample struct {
	// contains filtered or unexported fields
}

HashSample is a processor that removes messages based on a sample factor by hashing its contents.

func (*HashSample) CloseAsync added in v0.40.0

func (s *HashSample) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*HashSample) ProcessMessage added in v0.6.11

func (s *HashSample) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*HashSample) WaitForClose added in v0.40.0

func (s *HashSample) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type HashSampleConfig added in v0.6.11

type HashSampleConfig struct {
	RetainMin float64 `json:"retain_min" yaml:"retain_min"`
	RetainMax float64 `json:"retain_max" yaml:"retain_max"`
	Parts     []int   `json:"parts" yaml:"parts"` // message parts to hash
}

HashSampleConfig contains configuration fields for the HashSample processor.

func NewHashSampleConfig added in v0.6.11

func NewHashSampleConfig() HashSampleConfig

NewHashSampleConfig returns a HashSampleConfig with default values.
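Hash-based sampling is deterministic: the same content always lands in the same place on a fixed scale, so it is always either kept or dropped. The sketch below assumes a 0 to 100 scale, a half-open range from retain_min up to but excluding retain_max, and FNV-1a as the hash; all three are assumptions for illustration, and the retain function is hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// retain hashes the content onto [0, 100) and reports whether the result
// falls within [retainMin, retainMax).
func retain(content []byte, retainMin, retainMax float64) bool {
	h := fnv.New64a()
	h.Write(content) // Write on an FNV hash never returns an error
	rate := float64(h.Sum64()%10000) / 100.0
	return rate >= retainMin && rate < retainMax
}

func main() {
	for _, msg := range []string{"foo", "bar", "baz"} {
		fmt.Println(msg, retain([]byte(msg), 0, 50))
	}
}
```

Two samplers configured with adjacent ranges, for example 0 to 50 and 50 to 100, would split a stream between them without overlap under these assumptions.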

type InsertPart added in v0.6.19

type InsertPart struct {
	// contains filtered or unexported fields
}

InsertPart is a processor that inserts a new message part at a specific index.

func (*InsertPart) CloseAsync added in v0.40.0

func (p *InsertPart) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*InsertPart) ProcessMessage added in v0.6.19

func (p *InsertPart) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*InsertPart) WaitForClose added in v0.40.0

func (p *InsertPart) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type InsertPartConfig added in v0.6.19

type InsertPartConfig struct {
	Index   int    `json:"index" yaml:"index"`
	Content string `json:"content" yaml:"content"`
}

InsertPartConfig contains configuration fields for the InsertPart processor.
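A minimal sketch of inserting a part at an index is shown below. The helper `insertPart` is hypothetical, and its treatment of negative and out-of-range indexes (counting from the end, then clamping) is an assumption rather than documented behaviour of the InsertPart processor.

```go
package main

import "fmt"

// insertPart returns a new slice with content inserted at index.
// Assumptions for this sketch: a negative index counts from the end
// (so -1 appends), and out-of-range indexes are clamped.
func insertPart(parts [][]byte, index int, content []byte) [][]byte {
	if index < 0 {
		index = len(parts) + index + 1
	}
	if index < 0 {
		index = 0
	}
	if index > len(parts) {
		index = len(parts)
	}
	out := make([][]byte, 0, len(parts)+1)
	out = append(out, parts[:index]...)
	out = append(out, content)
	out = append(out, parts[index:]...)
	return out
}

func main() {
	parts := [][]byte{[]byte("a"), []byte("b")}
	for _, p := range insertPart(parts, 1, []byte("x")) {
		fmt.Println(string(p))
	}
}
```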

func NewInsertPartConfig added in v0.6.19

func NewInsertPartConfig() InsertPartConfig

NewInsertPartConfig returns an InsertPartConfig with default values.

type JMESPath added in v0.9.8

type JMESPath struct {
	// contains filtered or unexported fields
}

JMESPath is a processor that executes JMESPath queries on a message part and replaces the contents with the result.

func (*JMESPath) CloseAsync added in v0.40.0

func (p *JMESPath) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*JMESPath) ProcessMessage added in v0.9.8

func (p *JMESPath) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*JMESPath) WaitForClose added in v0.40.0

func (p *JMESPath) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type JMESPathConfig added in v0.9.8

type JMESPathConfig struct {
	Parts []int  `json:"parts" yaml:"parts"`
	Query string `json:"query" yaml:"query"`
}

JMESPathConfig contains configuration fields for the JMESPath processor.

func NewJMESPathConfig added in v0.9.8

func NewJMESPathConfig() JMESPathConfig

NewJMESPathConfig returns a JMESPathConfig with default values.

type JSON added in v0.14.0

type JSON struct {
	// contains filtered or unexported fields
}

JSON is a processor that performs an operation on a JSON payload.

func (*JSON) CloseAsync added in v0.40.0

func (p *JSON) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*JSON) ProcessMessage added in v0.14.0

func (p *JSON) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*JSON) WaitForClose added in v0.40.0

func (p *JSON) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type JSONConfig added in v0.14.0

type JSONConfig struct {
	Parts    []int        `json:"parts" yaml:"parts"`
	Operator string       `json:"operator" yaml:"operator"`
	Path     string       `json:"path" yaml:"path"`
	Value    rawJSONValue `json:"value" yaml:"value"`
}

JSONConfig contains configuration fields for the JSON processor.

func NewJSONConfig added in v0.14.0

func NewJSONConfig() JSONConfig

NewJSONConfig returns a JSONConfig with default values.

type Lambda added in v0.33.0

type Lambda struct {
	// contains filtered or unexported fields
}

Lambda is a processor that invokes an AWS Lambda using the message as the request body, and returns the response.

func (*Lambda) CloseAsync added in v0.40.0

func (l *Lambda) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Lambda) ProcessMessage added in v0.33.0

func (l *Lambda) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Lambda) WaitForClose added in v0.40.0

func (l *Lambda) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type LambdaConfig added in v0.33.0

type LambdaConfig struct {
	client.Config `json:",inline" yaml:",inline"`
	Parallel      bool `json:"parallel" yaml:"parallel"`
}

LambdaConfig contains configuration fields for the Lambda processor.

func NewLambdaConfig added in v0.33.0

func NewLambdaConfig() LambdaConfig

NewLambdaConfig returns a LambdaConfig with default values.

type Log added in v0.33.0

type Log struct {
	// contains filtered or unexported fields
}

Log is a processor that prints a log event each time it processes a message.

func (*Log) CloseAsync added in v0.40.0

func (l *Log) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Log) ProcessMessage added in v0.33.0

func (l *Log) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage logs an event and returns the message unchanged.

func (*Log) WaitForClose added in v0.40.0

func (l *Log) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type LogConfig added in v0.33.0

type LogConfig struct {
	Level   string `json:"level" yaml:"level"`
	Message string `json:"message" yaml:"message"`
}

LogConfig contains configuration fields for the Log processor.

func NewLogConfig added in v0.33.0

func NewLogConfig() LogConfig

NewLogConfig returns a LogConfig with default values.

type MergeJSON added in v0.11.4

type MergeJSON struct {
	// contains filtered or unexported fields
}

MergeJSON is a processor that merges JSON parsed message parts into a single value.

func (*MergeJSON) CloseAsync added in v0.40.0

func (p *MergeJSON) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*MergeJSON) ProcessMessage added in v0.11.4

func (p *MergeJSON) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*MergeJSON) WaitForClose added in v0.40.0

func (p *MergeJSON) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type MergeJSONConfig added in v0.11.4

type MergeJSONConfig struct {
	Parts       []int `json:"parts" yaml:"parts"`
	RetainParts bool  `json:"retain_parts" yaml:"retain_parts"`
}

MergeJSONConfig contains configuration fields for the MergeJSON processor.

func NewMergeJSONConfig added in v0.11.4

func NewMergeJSONConfig() MergeJSONConfig

NewMergeJSONConfig returns a MergeJSONConfig with default values.

type Metadata added in v0.20.1

type Metadata struct {
	// contains filtered or unexported fields
}

Metadata is a processor that performs an operation on the Metadata of a message.

func (*Metadata) CloseAsync added in v0.40.0

func (p *Metadata) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Metadata) ProcessMessage added in v0.20.1

func (p *Metadata) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Metadata) WaitForClose added in v0.40.0

func (p *Metadata) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type MetadataConfig added in v0.20.1

type MetadataConfig struct {
	Parts    []int  `json:"parts" yaml:"parts"`
	Operator string `json:"operator" yaml:"operator"`
	Key      string `json:"key" yaml:"key"`
	Value    string `json:"value" yaml:"value"`
}

MetadataConfig contains configuration fields for the Metadata processor.

func NewMetadataConfig added in v0.20.1

func NewMetadataConfig() MetadataConfig

NewMetadataConfig returns a MetadataConfig with default values.

type Metric added in v0.26.2

type Metric struct {
	// contains filtered or unexported fields
}

Metric is a processor that creates a metric from extracted values from a message part.

func (*Metric) CloseAsync added in v0.40.0

func (m *Metric) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Metric) ProcessMessage added in v0.26.2

func (m *Metric) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Metric) WaitForClose added in v0.40.0

func (m *Metric) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type MetricConfig added in v0.26.2

type MetricConfig struct {
	Type   string            `json:"type" yaml:"type"`
	Path   string            `json:"path" yaml:"path"`
	Labels map[string]string `json:"labels" yaml:"labels"`
	Value  string            `json:"value" yaml:"value"`
}

MetricConfig contains configuration fields for the Metric processor.

func NewMetricConfig added in v0.26.2

func NewMetricConfig() MetricConfig

NewMetricConfig returns a MetricConfig with default values.

type Noop added in v0.6.5

type Noop struct {
}

Noop is a no-op processor that does nothing.

func (*Noop) CloseAsync added in v0.40.0

func (c *Noop) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Noop) ProcessMessage added in v0.6.5

func (c *Noop) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage does nothing and returns the message unchanged.

func (*Noop) WaitForClose added in v0.40.0

func (c *Noop) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type PluginConfigConstructor added in v0.27.1

type PluginConfigConstructor func() interface{}

PluginConfigConstructor is a func that returns a pointer to a new and fully populated configuration struct for a plugin type. It is valid to return a pointer to an empty struct (&struct{}{}) if no configuration fields are needed.

type PluginConfigSanitiser added in v0.27.2

type PluginConfigSanitiser func(conf interface{}) interface{}

PluginConfigSanitiser is a function that takes a configuration object for a plugin and returns a sanitised (minimal) version of it for printing in examples and plugin documentation.

This function is useful when a plugin's configuration struct is very large and complex, but can sometimes be expressed in a more concise way without losing the original intent.
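A sanitiser might look like the sketch below, which has the same signature as PluginConfigSanitiser. The `pluginConf` type, its fields, and the assumed default of 3 retries are hypothetical and exist only to show the idea of omitting fields that still hold their defaults.

```go
package main

import "fmt"

// pluginConf is a hypothetical plugin configuration.
type pluginConf struct {
	URL     string
	Retries int
	Verbose bool
}

// sanitise has the PluginConfigSanitiser signature. It returns a smaller
// structure that omits fields still holding their assumed default values.
func sanitise(conf interface{}) interface{} {
	c, ok := conf.(*pluginConf)
	if !ok {
		return conf // unknown type: return it unchanged
	}
	out := map[string]interface{}{"url": c.URL}
	if c.Retries != 3 { // 3 is the assumed default in this sketch
		out["retries"] = c.Retries
	}
	if c.Verbose {
		out["verbose"] = true
	}
	return out
}

func main() {
	fmt.Println(sanitise(&pluginConf{URL: "http://localhost", Retries: 3}))
}
```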

type PluginConstructor added in v0.27.1

type PluginConstructor func(
	config interface{},
	manager types.Manager,
	logger log.Modular,
	metrics metrics.Type,
) (types.Processor, error)

PluginConstructor is a func that constructs a Benthos processor plugin. These are plugins that are specific to certain use cases, experimental, private or otherwise unfit for widespread general use. Any number of plugins can be specified when using Benthos as a framework.

The configuration object will be the result of the PluginConfigConstructor after overlaying the user configuration.
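The relationship between the two function types can be sketched with the standard library alone: the config constructor returns a pointer to a struct populated with defaults, and the user configuration is then decoded on top of it. The `exampleConf` type and the `overlay` helper are hypothetical, and JSON is used here only as a convenient encoding; how Benthos performs the overlay internally is not shown on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// exampleConf is a hypothetical plugin configuration.
type exampleConf struct {
	Prefix string `json:"prefix"`
	Count  int    `json:"count"`
}

// newExampleConf has the PluginConfigConstructor signature and returns a
// pointer to a fully populated struct.
func newExampleConf() interface{} {
	return &exampleConf{Prefix: "default", Count: 1}
}

// overlay decodes user config on top of the defaults, so fields the user
// omits keep their default values.
func overlay(constructor func() interface{}, userConf []byte) (interface{}, error) {
	conf := constructor()
	if err := json.Unmarshal(userConf, conf); err != nil {
		return nil, err
	}
	return conf, nil
}

func main() {
	conf, err := overlay(newExampleConf, []byte(`{"count": 5}`))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", conf)
}
```

Returning a pointer matters here: the decoder needs an addressable value to write into, which is why the constructor is documented as returning a pointer to the struct.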

type ProcessBatch added in v0.32.0

type ProcessBatch struct {
	// contains filtered or unexported fields
}

ProcessBatch is a processor that applies a list of child processors to each message of a batch individually.

func (*ProcessBatch) CloseAsync added in v0.40.0

func (p *ProcessBatch) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*ProcessBatch) ProcessMessage added in v0.32.0

func (p *ProcessBatch) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*ProcessBatch) WaitForClose added in v0.40.0

func (p *ProcessBatch) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type ProcessBatchConfig added in v0.32.0

type ProcessBatchConfig []Config

ProcessBatchConfig is a config struct containing fields for the ProcessBatch processor.

func NewProcessBatchConfig added in v0.32.0

func NewProcessBatchConfig() ProcessBatchConfig

NewProcessBatchConfig returns a default ProcessBatchConfig.

type ProcessDAG added in v0.34.0

type ProcessDAG struct {
	// contains filtered or unexported fields
}

ProcessDAG is a processor that applies a list of child processors to a new payload mapped from the original, and after processing attempts to overlay the results back onto the original payloads according to more mappings.

func (*ProcessDAG) CloseAsync added in v0.40.0

func (p *ProcessDAG) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*ProcessDAG) ProcessMessage added in v0.34.0

func (p *ProcessDAG) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*ProcessDAG) WaitForClose added in v0.40.0

func (p *ProcessDAG) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type ProcessDAGConfig added in v0.34.0

type ProcessDAGConfig map[string]DepProcessMapConfig

ProcessDAGConfig is a config struct containing fields for the ProcessDAG processor.

func NewProcessDAGConfig added in v0.34.0

func NewProcessDAGConfig() ProcessDAGConfig

NewProcessDAGConfig returns a default ProcessDAGConfig.

type ProcessField added in v0.18.0

type ProcessField struct {
	// contains filtered or unexported fields
}

ProcessField is a processor that applies a list of child processors to a field extracted from the original payload.

func (*ProcessField) CloseAsync added in v0.40.0

func (p *ProcessField) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*ProcessField) ProcessMessage added in v0.18.0

func (p *ProcessField) ProcessMessage(msg types.Message) (msgs []types.Message, res types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*ProcessField) WaitForClose added in v0.40.0

func (p *ProcessField) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type ProcessFieldConfig added in v0.18.0

type ProcessFieldConfig struct {
	Parts      []int    `json:"parts" yaml:"parts"`
	Path       string   `json:"path" yaml:"path"`
	Processors []Config `json:"processors" yaml:"processors"`
}

ProcessFieldConfig is a config struct containing fields for the ProcessField processor.

func NewProcessFieldConfig added in v0.18.0

func NewProcessFieldConfig() ProcessFieldConfig

NewProcessFieldConfig returns a default ProcessFieldConfig.

type ProcessMap added in v0.18.0

type ProcessMap struct {
	// contains filtered or unexported fields
}

ProcessMap is a processor that applies a list of child processors to a new payload mapped from the original, and after processing attempts to overlay the results back onto the original payloads according to more mappings.

func NewProcessMap added in v0.18.0

func NewProcessMap(
	conf ProcessMapConfig, mgr types.Manager, log log.Modular, stats metrics.Type,
) (*ProcessMap, error)

NewProcessMap returns a ProcessMap processor.

func (*ProcessMap) CloseAsync added in v0.40.0

func (p *ProcessMap) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*ProcessMap) CreateResult added in v0.34.0

func (p *ProcessMap) CreateResult(msg types.Message) (types.Message, error)

CreateResult applies the reduction and the child processors to a payload and returns the result. The resulting message should have the same dimension as the payload, where reduced indexes are nil. This result can be overlaid onto the original message in order to complete the map.

func (*ProcessMap) OverlayResult added in v0.34.0

func (p *ProcessMap) OverlayResult(payload, response types.Message) ([]int, error)

OverlayResult attempts to merge the result of a process_map with the original payload as per the map specified in the postmap and postmap_optional fields.

func (*ProcessMap) ProcessMessage added in v0.18.0

func (p *ProcessMap) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*ProcessMap) TargetsProvided added in v0.34.0

func (p *ProcessMap) TargetsProvided() []string

TargetsProvided returns a list of targets provided by this processor derived from its postmap and postmap_optional fields.

func (*ProcessMap) TargetsUsed added in v0.34.0

func (p *ProcessMap) TargetsUsed() []string

TargetsUsed returns a list of target dependencies of this processor derived from its premap and premap_optional fields.

func (*ProcessMap) WaitForClose added in v0.40.0

func (p *ProcessMap) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type ProcessMapConfig added in v0.18.0

type ProcessMapConfig struct {
	Parts           []int              `json:"parts" yaml:"parts"`
	Conditions      []condition.Config `json:"conditions" yaml:"conditions"`
	Premap          map[string]string  `json:"premap" yaml:"premap"`
	PremapOptional  map[string]string  `json:"premap_optional" yaml:"premap_optional"`
	Postmap         map[string]string  `json:"postmap" yaml:"postmap"`
	PostmapOptional map[string]string  `json:"postmap_optional" yaml:"postmap_optional"`
	Processors      []Config           `json:"processors" yaml:"processors"`
}

ProcessMapConfig is a config struct containing fields for the ProcessMap processor.

func NewProcessMapConfig added in v0.18.0

func NewProcessMapConfig() ProcessMapConfig

NewProcessMapConfig returns a default ProcessMapConfig.

func (ProcessMapConfig) Sanitise added in v0.34.0

func (p ProcessMapConfig) Sanitise() (map[string]interface{}, error)

Sanitise the configuration into a minimal structure that can be printed without changing the intent.

func (*ProcessMapConfig) UnmarshalJSON added in v0.34.0

func (p *ProcessMapConfig) UnmarshalJSON(bytes []byte) error

UnmarshalJSON ensures that when parsing configs that are in a slice the default values are still applied.
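A common way to get this behaviour in Go is to decode into a locally declared type that shares the fields but not the methods of the config type, after first filling it with defaults. The sketch below shows that technique with a hypothetical `stepConfig` type; it is not the source of ProcessMapConfig.UnmarshalJSON.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stepConfig is a hypothetical config type with a non-zero default.
type stepConfig struct {
	Path  string `json:"path"`
	Limit int    `json:"limit"`
}

func newStepConfig() stepConfig {
	return stepConfig{Path: "", Limit: 10}
}

// UnmarshalJSON starts from the defaults before decoding, so elements of a
// slice get default values for any fields they omit.
func (s *stepConfig) UnmarshalJSON(b []byte) error {
	type confAlias stepConfig // no methods, so no recursive UnmarshalJSON call
	aliased := confAlias(newStepConfig())
	if err := json.Unmarshal(b, &aliased); err != nil {
		return err
	}
	*s = stepConfig(aliased)
	return nil
}

// parseSteps decodes a JSON array of step configs.
func parseSteps(raw string) ([]stepConfig, error) {
	var steps []stepConfig
	err := json.Unmarshal([]byte(raw), &steps)
	return steps, err
}

func main() {
	steps, err := parseSteps(`[{"path":"a"},{"path":"b","limit":2}]`)
	if err != nil {
		panic(err)
	}
	fmt.Println(steps[0].Limit, steps[1].Limit) // prints "10 2"
}
```

Without the custom method, the decoder would allocate zero-valued slice elements and the first element's Limit would be 0 rather than 10.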

func (*ProcessMapConfig) UnmarshalYAML added in v0.34.0

func (p *ProcessMapConfig) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.

type Sample added in v0.3.2

type Sample struct {
	// contains filtered or unexported fields
}

Sample is a processor that drops messages based on a random sample.

func (*Sample) CloseAsync added in v0.40.0

func (s *Sample) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Sample) ProcessMessage added in v0.3.2

func (s *Sample) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Sample) WaitForClose added in v0.40.0

func (s *Sample) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type SampleConfig added in v0.3.2

type SampleConfig struct {
	Retain     float64 `json:"retain" yaml:"retain"`
	RandomSeed int64   `json:"seed" yaml:"seed"`
}

SampleConfig contains configuration fields for the Sample processor.
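A minimal sketch of random sampling with a seeded generator follows. The `sampler` type is hypothetical, and treating Retain as a percentage between 0 and 100 is an assumption; the field's actual scale is not stated on this page.

```go
package main

import (
	"fmt"
	"math/rand"
)

// sampler is a hypothetical stand-in for the Sample processor's state.
type sampler struct {
	retain float64 // share of messages to keep, assumed to be 0-100
	gen    *rand.Rand
}

func newSampler(retain float64, seed int64) *sampler {
	return &sampler{retain: retain, gen: rand.New(rand.NewSource(seed))}
}

// keep reports whether the next message survives the sample.
func (s *sampler) keep() bool {
	return s.gen.Float64()*100 < s.retain
}

func main() {
	s := newSampler(10, 0)
	kept := 0
	for i := 0; i < 1000; i++ {
		if s.keep() {
			kept++
		}
	}
	fmt.Printf("kept %d of 1000\n", kept)
}
```

A fixed seed makes the sequence of keep and drop decisions repeatable, although unlike content hashing the decision for a given message depends on its position in the stream.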

func NewSampleConfig added in v0.3.2

func NewSampleConfig() SampleConfig

NewSampleConfig returns a SampleConfig with default values.

type SelectParts added in v0.4.1

type SelectParts struct {
	// contains filtered or unexported fields
}

SelectParts is a processor that selects parts from a message to append to a new message.

func (*SelectParts) CloseAsync added in v0.40.0

func (m *SelectParts) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*SelectParts) ProcessMessage added in v0.4.1

func (m *SelectParts) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*SelectParts) WaitForClose added in v0.40.0

func (m *SelectParts) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type SelectPartsConfig added in v0.4.1

type SelectPartsConfig struct {
	Parts []int `json:"parts" yaml:"parts"`
}

SelectPartsConfig contains configuration fields for the SelectParts processor.

func NewSelectPartsConfig added in v0.4.1

func NewSelectPartsConfig() SelectPartsConfig

NewSelectPartsConfig returns a SelectPartsConfig with default values.

type Split added in v0.7.6

type Split struct {
	// contains filtered or unexported fields
}

Split is a processor that splits messages into a message per part.

func (*Split) CloseAsync added in v0.40.0

func (s *Split) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Split) ProcessMessage added in v0.7.6

func (s *Split) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Split) WaitForClose added in v0.40.0

func (s *Split) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type SplitConfig added in v0.23.1

type SplitConfig struct {
	Size int `json:"size" yaml:"size"`
}

SplitConfig is a configuration struct containing fields for the Split processor, which breaks message batches down into batches of a smaller size.
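Breaking a batch into smaller batches amounts to chunking a slice. The sketch below uses a hypothetical `splitBatch` helper over plain byte slices and treats any size below 1 as 1, which is an assumption rather than documented behaviour.

```go
package main

import "fmt"

// splitBatch breaks parts into consecutive batches of at most size parts.
// Treating size < 1 as 1 is an assumption made for this sketch.
func splitBatch(parts [][]byte, size int) [][][]byte {
	if size < 1 {
		size = 1
	}
	var batches [][][]byte
	for start := 0; start < len(parts); start += size {
		end := start + size
		if end > len(parts) {
			end = len(parts)
		}
		batches = append(batches, parts[start:end])
	}
	return batches
}

func main() {
	parts := [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d"), []byte("e")}
	for _, batch := range splitBatch(parts, 2) {
		fmt.Println(len(batch))
	}
}
```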

func NewSplitConfig added in v0.23.1

func NewSplitConfig() SplitConfig

NewSplitConfig returns a SplitConfig with default values.

type Subprocess added in v0.40.0

type Subprocess struct {
	// contains filtered or unexported fields
}

Subprocess is a processor that executes a command.

func (*Subprocess) CloseAsync added in v0.40.0

func (e *Subprocess) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Subprocess) ProcessMessage added in v0.40.0

func (e *Subprocess) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.


func (*Subprocess) WaitForClose added in v0.40.0

func (e *Subprocess) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type SubprocessConfig added in v0.40.0

type SubprocessConfig struct {
	Parts []int    `json:"parts" yaml:"parts"`
	Name  string   `json:"name" yaml:"name"`
	Args  []string `json:"args" yaml:"args"`
}

SubprocessConfig contains configuration fields for the Subprocess processor.

func NewSubprocessConfig added in v0.40.0

func NewSubprocessConfig() SubprocessConfig

NewSubprocessConfig returns a SubprocessConfig with default values.

type Text added in v0.19.0

type Text struct {
	// contains filtered or unexported fields
}

Text is a processor that performs a text based operation on a payload.

func (*Text) CloseAsync added in v0.40.0

func (t *Text) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Text) ProcessMessage added in v0.19.0

func (t *Text) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Text) WaitForClose added in v0.40.0

func (t *Text) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type TextConfig added in v0.19.0

type TextConfig struct {
	Parts    []int  `json:"parts" yaml:"parts"`
	Operator string `json:"operator" yaml:"operator"`
	Arg      string `json:"arg" yaml:"arg"`
	Value    string `json:"value" yaml:"value"`
}

TextConfig contains configuration fields for the Text processor.

func NewTextConfig added in v0.19.0

func NewTextConfig() TextConfig

NewTextConfig returns a TextConfig with default values.

type Throttle added in v0.23.9

type Throttle struct {
	// contains filtered or unexported fields
}

Throttle is a processor that limits the stream of a pipeline to one message batch per period specified.

func (*Throttle) CloseAsync added in v0.40.0

func (m *Throttle) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Throttle) ProcessMessage added in v0.23.9

func (m *Throttle) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Throttle) WaitForClose added in v0.40.0

func (m *Throttle) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type ThrottleConfig added in v0.23.9

type ThrottleConfig struct {
	Period string `json:"period" yaml:"period"`
}

ThrottleConfig contains configuration fields for the Throttle processor.
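Since Period is a string, it has to be parsed into a duration before it can be enforced. The sketch below assumes the string uses Go duration syntax (for example "100us" or "1s") and uses a hypothetical `throttle` type that is not safe for concurrent use; neither assumption is confirmed by this page.

```go
package main

import (
	"fmt"
	"time"
)

// throttle is a hypothetical stand-in allowing one batch per period.
type throttle struct {
	period time.Duration
	last   time.Time
}

// newThrottle parses the period, assuming Go duration syntax.
func newThrottle(period string) (*throttle, error) {
	d, err := time.ParseDuration(period)
	if err != nil {
		return nil, fmt.Errorf("failed to parse period: %v", err)
	}
	return &throttle{period: d}, nil
}

// wait blocks until at least one period has passed since the last call
// returned. The first call returns immediately.
func (t *throttle) wait() {
	if elapsed := time.Since(t.last); elapsed < t.period {
		time.Sleep(t.period - elapsed)
	}
	t.last = time.Now()
}

func main() {
	th, err := newThrottle("10ms")
	if err != nil {
		panic(err)
	}
	start := time.Now()
	for i := 0; i < 3; i++ {
		th.wait()
	}
	fmt.Println(time.Since(start))
}
```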

func NewThrottleConfig added in v0.23.9

func NewThrottleConfig() ThrottleConfig

NewThrottleConfig returns a ThrottleConfig with default values.

type Try added in v0.39.2

type Try struct {
	// contains filtered or unexported fields
}

Try is a processor that applies a list of child processors to each message of a batch individually, where processors are skipped for messages that failed a previous processor step.

func (*Try) CloseAsync added in v0.40.0

func (p *Try) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Try) ProcessMessage added in v0.39.2

func (p *Try) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Try) WaitForClose added in v0.40.0

func (p *Try) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type TryConfig added in v0.39.2

type TryConfig []Config

TryConfig is a config struct containing fields for the Try processor.
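The skipping behaviour can be sketched with stand-in types: each part carries metadata, a failed step sets a flag under the package's FailFlagKey value ("benthos_processing_failed"), and later steps are skipped for any part where that flag is non-empty. The `part` and `step` types and the example steps are hypothetical, and storing the error text as the flag value is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// failFlagKey mirrors the value of FailFlagKey in this package.
const failFlagKey = "benthos_processing_failed"

// part and step are hypothetical stand-ins for message parts and processors.
type part struct {
	data     string
	metadata map[string]string
}

type step func(p *part) error

func newPart(data string) *part {
	return &part{data: data, metadata: map[string]string{}}
}

// upperStep fails on empty input, otherwise upper-cases the data.
func upperStep(p *part) error {
	if p.data == "" {
		return errors.New("empty part")
	}
	p.data = strings.ToUpper(p.data)
	return nil
}

// exclaimStep appends an exclamation mark.
func exclaimStep(p *part) error {
	p.data += "!"
	return nil
}

// runTry applies the steps to each part, skipping the remaining steps for a
// part once it has been flagged as failed.
func runTry(parts []*part, steps []step) {
	for _, p := range parts {
		for _, s := range steps {
			if p.metadata[failFlagKey] != "" {
				break
			}
			if err := s(p); err != nil {
				p.metadata[failFlagKey] = err.Error()
			}
		}
	}
}

func main() {
	parts := []*part{newPart("a"), newPart("")}
	runTry(parts, []step{upperStep, exclaimStep})
	for _, p := range parts {
		fmt.Printf("%q failed=%q\n", p.data, p.metadata[failFlagKey])
	}
}
```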

func NewTryConfig added in v0.39.2

func NewTryConfig() TryConfig

NewTryConfig returns a default TryConfig.

type Type

type Type interface {
	// ProcessMessage attempts to process a message. Since processing can fail
	// this call returns either a slice of messages in case of success or a
	// response in case of failure. If the slice of messages is empty the
	// response should be returned to the source.
	ProcessMessage(msg types.Message) ([]types.Message, types.Response)

	types.Closable
}

Type reads a message, performs a processing operation, and returns either a slice of messages resulting from the process to be propagated through the pipeline, or a response that should be sent back to the source instead.
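The contract described above can be illustrated with stand-in types. In the sketch below `message`, `response`, `errResponse`, `dropEmpty` and `handle` are all hypothetical; the real `types.Message` and `types.Response` interfaces come from another package and may carry more methods than shown here.

```go
package main

import (
	"errors"
	"fmt"
)

// message and response are simplified stand-ins for types.Message and
// types.Response, defined here only so the sketch is self-contained.
type message [][]byte

type response interface {
	Error() error
}

type errResponse struct{ err error }

func (r errResponse) Error() error { return r.err }

// processor mirrors the shape of the ProcessMessage method on Type.
type processor interface {
	ProcessMessage(msg message) ([]message, response)
}

// dropEmpty forwards non-empty messages and returns a response otherwise.
type dropEmpty struct{}

func (dropEmpty) ProcessMessage(msg message) ([]message, response) {
	if len(msg) == 0 {
		return nil, errResponse{err: errors.New("empty message")}
	}
	return []message{msg}, nil
}

// handle shows the caller's side: an empty slice means the response goes
// back to the source instead of anything being propagated.
func handle(p processor, msg message) string {
	msgs, res := p.ProcessMessage(msg)
	if len(msgs) == 0 {
		if res == nil || res.Error() == nil {
			return "acknowledge to source"
		}
		return "respond to source: " + res.Error().Error()
	}
	return fmt.Sprintf("propagate %d message(s)", len(msgs))
}

func main() {
	fmt.Println(handle(dropEmpty{}, message{[]byte("hello")}))
	fmt.Println(handle(dropEmpty{}, nil))
}
```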

func New

func New(
	conf Config,
	mgr types.Manager,
	log log.Modular,
	stats metrics.Type,
) (Type, error)

New creates a processor type based on a processor configuration.
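One plausible way for a function like New to work is to look up a constructor by the configured type name in a registry such as the Constructors map. The sketch below shows that registry pattern with hypothetical `config`, `processor` and `constructor` types; it is an assumption about the mechanism and does not reproduce this package's Config or TypeSpec.

```go
package main

import (
	"fmt"
	"strings"
)

// config, processor and constructor are hypothetical stand-ins.
type config struct {
	Type string
}

type processor interface {
	Process(s string) string
}

type constructor func(conf config) (processor, error)

// constructors maps a type name to the function that builds it.
var constructors = map[string]constructor{}

type noop struct{}

func (noop) Process(s string) string { return s }

type upper struct{}

func (upper) Process(s string) string { return strings.ToUpper(s) }

func init() {
	constructors["noop"] = func(config) (processor, error) { return noop{}, nil }
	constructors["upper"] = func(config) (processor, error) { return upper{}, nil }
}

// newProcessor dispatches on the configured type name.
func newProcessor(conf config) (processor, error) {
	c, ok := constructors[conf.Type]
	if !ok {
		return nil, fmt.Errorf("processor type not recognised: %v", conf.Type)
	}
	return c(conf)
}

func main() {
	p, err := newProcessor(config{Type: "upper"})
	if err != nil {
		panic(err)
	}
	fmt.Println(p.Process("hello"))
}
```

Keeping the registry in a package-level map lets each processor register itself from its own file, so adding a type does not require editing a central switch statement.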

func NewArchive added in v0.7.7

func NewArchive(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewArchive returns an Archive processor.

func NewBatch added in v0.13.0

func NewBatch(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewBatch returns a Batch processor.

func NewBoundsCheck

func NewBoundsCheck(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewBoundsCheck returns a BoundsCheck processor.

func NewCatch added in v0.39.2

func NewCatch(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewCatch returns a Catch processor.

func NewCompress added in v0.7.7

func NewCompress(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewCompress returns a Compress processor.

func NewConditional added in v0.13.0

func NewConditional(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewConditional returns a Conditional processor.

func NewDecode added in v0.15.5

func NewDecode(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewDecode returns a Decode processor.

func NewDecompress added in v0.7.4

func NewDecompress(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewDecompress returns a Decompress processor.

func NewDedupe added in v0.9.7

func NewDedupe(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewDedupe returns a Dedupe processor.

func NewEncode added in v0.15.5

func NewEncode(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewEncode returns an Encode processor.

func NewFilter added in v0.13.0

func NewFilter(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewFilter returns a Filter processor.

func NewFilterParts added in v0.14.7

func NewFilterParts(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewFilterParts returns a FilterParts processor.

func NewGrok added in v0.13.5

func NewGrok(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewGrok returns a Grok processor.

func NewGroupBy added in v0.34.10

func NewGroupBy(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewGroupBy returns a GroupBy processor.

func NewGroupByValue added in v0.38.6

func NewGroupByValue(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewGroupByValue returns a GroupByValue processor.

func NewHTTP added in v0.15.4

func NewHTTP(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewHTTP returns an HTTP processor.

func NewHash added in v0.25.0

func NewHash(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewHash returns a Hash processor.

func NewHashSample added in v0.6.11

func NewHashSample(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewHashSample returns a HashSample processor.

func NewInsertPart added in v0.6.19

func NewInsertPart(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewInsertPart returns an InsertPart processor.

func NewJMESPath added in v0.9.8

func NewJMESPath(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewJMESPath returns a JMESPath processor.

func NewJSON added in v0.14.0

func NewJSON(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewJSON returns a JSON processor.

func NewLambda added in v0.33.0

func NewLambda(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewLambda returns a Lambda processor.

func NewLog added in v0.33.0

func NewLog(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewLog returns a Log processor.

func NewMergeJSON added in v0.11.4

func NewMergeJSON(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewMergeJSON returns a MergeJSON processor.

func NewMetadata added in v0.20.1

func NewMetadata(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewMetadata returns a Metadata processor.

func NewMetric added in v0.26.2

func NewMetric(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewMetric returns a Metric processor.

func NewNoop added in v0.6.5

func NewNoop(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewNoop returns a Noop processor.

func NewProcessBatch added in v0.32.0

func NewProcessBatch(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewProcessBatch returns a ProcessBatch processor.

func NewProcessDAG added in v0.34.0

func NewProcessDAG(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewProcessDAG returns a ProcessDAG processor.

func NewProcessField added in v0.18.0

func NewProcessField(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewProcessField returns a ProcessField processor.

func NewSample added in v0.3.2

func NewSample(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSample returns a Sample processor.

func NewSelectParts added in v0.4.1

func NewSelectParts(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSelectParts returns a SelectParts processor.

func NewSplit added in v0.7.6

func NewSplit(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSplit returns a Split processor.

func NewSubprocess added in v0.40.0

func NewSubprocess(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSubprocess returns a Subprocess processor.

func NewText added in v0.19.0

func NewText(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewText returns a Text processor.

func NewThrottle added in v0.23.9

func NewThrottle(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewThrottle returns a Throttle processor.

func NewTry added in v0.39.2

func NewTry(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewTry returns a Try processor.

func NewUnarchive added in v0.7.4

func NewUnarchive(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewUnarchive returns an Unarchive processor.

type TypeSpec added in v0.9.1

type TypeSpec struct {
	// contains filtered or unexported fields
}

TypeSpec holds the constructor and a usage description for a processor type.
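
Because every constructor listed above shares one signature, a map keyed by type name can dispatch to the right one. The following is a minimal sketch of that idea, not the package's implementation: Config (including its Type field), Type, constructorFunc, registry and build are all hypothetical stand-ins defined locally, and the manager, logger and metrics arguments are omitted for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

// Config is a hypothetical stand-in; the Type field is an assumption.
type Config struct {
	Type string
}

// Type is a hypothetical stand-in for the processor interface.
type Type interface {
	Name() string
}

// constructorFunc is the shared constructor shape, reduced to a single
// argument for this sketch.
type constructorFunc func(conf Config) (Type, error)

// noop is a trivial processor used to populate the registry.
type noop struct{}

func (noop) Name() string { return "noop" }

// registry maps a type name to its constructor.
var registry = map[string]constructorFunc{
	"noop": func(Config) (Type, error) { return noop{}, nil },
}

var errUnknownType = errors.New("unknown processor type")

// build looks up the constructor for conf.Type and calls it.
func build(conf Config) (Type, error) {
	ctor, ok := registry[conf.Type]
	if !ok {
		return nil, fmt.Errorf("%w: %q", errUnknownType, conf.Type)
	}
	return ctor(conf)
}

func main() {
	p, err := build(Config{Type: "noop"})
	if err != nil {
		panic(err)
	}
	fmt.Println(p.Name())

	_, err = build(Config{Type: "missing"})
	fmt.Println(err)
}
```

Returning an error for an unknown name, rather than panicking, lets a caller report a bad configuration value cleanly.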

type Unarchive added in v0.7.4

type Unarchive struct {
	// contains filtered or unexported fields
}

Unarchive is a processor that can selectively unarchive parts of a message following a chosen archive type.

func (*Unarchive) CloseAsync added in v0.40.0

func (d *Unarchive) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Unarchive) ProcessMessage added in v0.7.4

func (d *Unarchive) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, producing either one or more resulting messages or a response to be sent back to the message source.

func (*Unarchive) WaitForClose added in v0.40.0

func (d *Unarchive) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.
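
The CloseAsync and WaitForClose pair describes a two-step shutdown: signal first, then wait with a deadline. The sketch below shows one common way to build that pattern in Go with channels. The worker type, its fields and errTimeout are hypothetical and are not taken from the Unarchive source, which may shut down differently.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// worker is a hypothetical component with a background goroutine.
type worker struct {
	closeOnce  sync.Once
	closeChan  chan struct{} // closed to request shutdown
	closedChan chan struct{} // closed once the goroutine has exited
}

func newWorker() *worker {
	w := &worker{
		closeChan:  make(chan struct{}),
		closedChan: make(chan struct{}),
	}
	go w.loop()
	return w
}

// loop blocks until shutdown is requested, then marks the worker closed.
func (w *worker) loop() {
	defer close(w.closedChan)
	<-w.closeChan
}

// CloseAsync requests shutdown without blocking; repeated calls are safe.
func (w *worker) CloseAsync() {
	w.closeOnce.Do(func() { close(w.closeChan) })
}

var errTimeout = errors.New("timed out waiting for close")

// WaitForClose blocks until the worker has exited or the timeout elapses.
func (w *worker) WaitForClose(timeout time.Duration) error {
	select {
	case <-w.closedChan:
		return nil
	case <-time.After(timeout):
		return errTimeout
	}
}

func main() {
	w := newWorker()
	// No shutdown has been requested yet, so this wait times out.
	fmt.Println(w.WaitForClose(10 * time.Millisecond))

	w.CloseAsync()
	fmt.Println(w.WaitForClose(time.Second))
}
```

Splitting the signal from the wait lets a caller ask several components to stop at once and only then wait on each of them.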

type UnarchiveConfig added in v0.7.4

type UnarchiveConfig struct {
	Format string `json:"format" yaml:"format"`
	Parts  []int  `json:"parts" yaml:"parts"`
}

UnarchiveConfig contains configuration fields for the Unarchive processor.

func NewUnarchiveConfig added in v0.7.4

func NewUnarchiveConfig() UnarchiveConfig

NewUnarchiveConfig returns an UnarchiveConfig with default values.
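
The struct tags on UnarchiveConfig determine how the fields appear in JSON and YAML configuration. As a rough illustration, the sketch below decodes a JSON fragment into a local mirror of the struct so that it runs without importing Benthos. The unarchiveConfig type and parseUnarchiveConfig helper are hypothetical, and the "tar" format value and the part indexes are assumed sample values, not documented defaults.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// unarchiveConfig is a local mirror of the documented UnarchiveConfig
// fields and tags; it is not the Benthos type itself.
type unarchiveConfig struct {
	Format string `json:"format" yaml:"format"`
	Parts  []int  `json:"parts" yaml:"parts"`
}

// parseUnarchiveConfig decodes a JSON document into the mirror struct.
func parseUnarchiveConfig(raw []byte) (unarchiveConfig, error) {
	var conf unarchiveConfig
	err := json.Unmarshal(raw, &conf)
	return conf, err
}

func main() {
	// "tar" and the part indexes are assumed sample values.
	conf, err := parseUnarchiveConfig([]byte(`{"format":"tar","parts":[0,2]}`))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", conf) // prints {Format:tar Parts:[0 2]}
}
```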

Directories

Path Synopsis
Package condition contains logical operators that, based on their configuration, return boolean values from messages under certain circumstances.
