processor

package
v3.27.0
Published: Sep 7, 2020 License: MIT Imports: 83 Imported by: 15

Documentation

Overview

Package processor contains implementations of types.Processor, which perform an arbitrary operation on a message and return either one or more messages to be propagated towards a sink, or a response to be sent back to the message source.

Constants

const (
	TypeArchive      = "archive"
	TypeAvro         = "avro"
	TypeAWK          = "awk"
	TypeBatch        = "batch"
	TypeBloblang     = "bloblang"
	TypeBoundsCheck  = "bounds_check"
	TypeBranch       = "branch"
	TypeCache        = "cache"
	TypeCatch        = "catch"
	TypeCompress     = "compress"
	TypeConditional  = "conditional"
	TypeDecode       = "decode"
	TypeDecompress   = "decompress"
	TypeDedupe       = "dedupe"
	TypeEncode       = "encode"
	TypeFilter       = "filter"
	TypeFilterParts  = "filter_parts"
	TypeForEach      = "for_each"
	TypeGrok         = "grok"
	TypeGroupBy      = "group_by"
	TypeGroupByValue = "group_by_value"
	TypeHash         = "hash"
	TypeHashSample   = "hash_sample"
	TypeHTTP         = "http"
	TypeInsertPart   = "insert_part"
	TypeJMESPath     = "jmespath"
	TypeJQ           = "jq"
	TypeJSON         = "json"
	TypeJSONSchema   = "json_schema"
	TypeLambda       = "lambda"
	TypeLog          = "log"
	TypeMergeJSON    = "merge_json"
	TypeMetadata     = "metadata"
	TypeMetric       = "metric"
	TypeNoop         = "noop"
	TypeNumber       = "number"
	TypeParallel     = "parallel"
	TypeParseLog     = "parse_log"
	TypeProcessBatch = "process_batch"
	TypeProcessDAG   = "process_dag"
	TypeProcessField = "process_field"
	TypeProcessMap   = "process_map"
	TypeProtobuf     = "protobuf"
	TypeRateLimit    = "rate_limit"
	TypeRedis        = "redis"
	TypeResource     = "resource"
	TypeSample       = "sample"
	TypeSelectParts  = "select_parts"
	TypeSleep        = "sleep"
	TypeSplit        = "split"
	TypeSQL          = "sql"
	TypeSubprocess   = "subprocess"
	TypeSwitch       = "switch"
	TypeSyncResponse = "sync_response"
	TypeText         = "text"
	TypeTry          = "try"
	TypeThrottle     = "throttle"
	TypeUnarchive    = "unarchive"
	TypeWhile        = "while"
	TypeWorkflow     = "workflow"
	TypeXML          = "xml"
)

String constants representing each processor type.

Variables

var Constructors = map[string]TypeSpec{}

Constructors is a map of all processor types with their specs.

var DocsUsesBatches = `` /* 177-byte string literal not displayed */

DocsUsesBatches is a documentation paragraph regarding processors that benefit from input level batching.

var FailFlagKey = types.FailFlagKey

FailFlagKey is a metadata key used for flagging processor errors in Benthos. If a message part has any non-empty value for this metadata key then it will be interpreted as having failed a processor step somewhere in the pipeline.

Functions

func Block

func Block(typeStr, reason string)

Block replaces the constructor of a Benthos processor such that its construction will always return an error. This is useful for building strict pipelines where certain processors should not be available. NOTE: This does not remove the processor from the configuration spec, and normalisation will still work the same for blocked processors.

EXPERIMENTAL: This function is experimental and therefore subject to change outside of major version releases.
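
The effect of Block can be pictured with a small self-contained model. This is a sketch only: the `registry` map, the `constructor` type and the error text are hypothetical stand-ins for the package's Constructors map and its real constructor signature, and silently ignoring unknown type names is an assumption of the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// constructor is a simplified stand-in for a processor constructor.
// The real constructor signature is not modelled here.
type constructor func() (string, error)

// registry is a hypothetical stand-in for the Constructors map.
var registry = map[string]constructor{
	"noop":       func() (string, error) { return "noop processor", nil },
	"subprocess": func() (string, error) { return "subprocess processor", nil },
}

// block swaps the constructor of typeStr for one that always fails.
// The entry itself stays in the registry, so listing types still works.
// Unknown types are ignored, which is an assumption of this sketch.
func block(typeStr, reason string) {
	if _, ok := registry[typeStr]; !ok {
		return
	}
	registry[typeStr] = func() (string, error) {
		return "", errors.New("processor '" + typeStr + "' is blocked: " + reason)
	}
}

func main() {
	block("subprocess", "shelling out is not permitted")
	for _, name := range []string{"noop", "subprocess"} {
		proc, err := registry[name]()
		fmt.Println(name, proc, err)
	}
}
```

Because only the constructor is replaced, a blocked type remains visible in the registry and fails at construction time rather than at configuration parsing time.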

func ClearFail

func ClearFail(part types.Part)

ClearFail removes any existing failure flags from a message part.

func Descriptions

func Descriptions() string

Descriptions returns a formatted string of collated descriptions of each type.

func DocumentPlugin

func DocumentPlugin(
	typeString, description string,
	configSanitiser PluginConfigSanitiser,
)

DocumentPlugin adds a description and an optional configuration sanitiser function to the definition of a registered plugin. This improves the documentation generated by PluginDescriptions.

func ExecuteAll

func ExecuteAll(procs []types.Processor, msgs ...types.Message) ([]types.Message, types.Response)

ExecuteAll attempts to execute a slice of processors on a message. Returns N resulting messages or a response. The response may indicate either a NoAck in the event of the message being buffered or an unrecoverable error.

func ExecuteCatchAll

func ExecuteCatchAll(procs []types.Processor, msgs ...types.Message) ([]types.Message, types.Response)

ExecuteCatchAll attempts to execute a slice of processors on only those messages that have failed a processing step. Returns N resulting messages or a response. The response may indicate either a NoAck in the event of the message being buffered or an unrecoverable error.

func ExecuteTryAll

func ExecuteTryAll(procs []types.Processor, msgs ...types.Message) ([]types.Message, types.Response)

ExecuteTryAll attempts to execute a slice of processors on messages. If a message has failed a processing step it is prevented from being sent to subsequent processors. Returns N resulting messages or a response. The response may indicate either a NoAck in the event of the message being buffered or an unrecoverable error.
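
The difference between the three Execute functions is easiest to see side by side. The following is a simplified model rather than the package's code: `item` and `step` are hypothetical stand-ins for a message part and a processor, and the returned messages and responses of the real functions are left out entirely.

```go
package main

import "fmt"

// item is a simplified stand-in for a message part with a failure flag.
type item struct {
	body   string
	failed bool
}

// step is a stand-in for a processor acting on a single item.
type step func(*item)

// failOnEmpty flags items with an empty body as failed.
func failOnEmpty(it *item) {
	if it.body == "" {
		it.failed = true
	}
}

// exclaim appends "!" to the body.
func exclaim(it *item) { it.body += "!" }

// executeAll runs every step on every item, ignoring failure flags.
func executeAll(steps []step, items []*item) {
	for _, s := range steps {
		for _, it := range items {
			s(it)
		}
	}
}

// executeTryAll stops sending an item to later steps once it has failed.
func executeTryAll(steps []step, items []*item) {
	for _, s := range steps {
		for _, it := range items {
			if !it.failed {
				s(it)
			}
		}
	}
}

// executeCatchAll runs the steps only on items that had already failed.
func executeCatchAll(steps []step, items []*item) {
	var failed []*item
	for _, it := range items {
		if it.failed {
			failed = append(failed, it)
		}
	}
	executeAll(steps, failed)
}

func main() {
	items := []*item{{body: "x"}, {body: ""}}
	executeTryAll([]step{failOnEmpty, exclaim}, items)
	executeCatchAll([]step{exclaim}, items)
	for _, it := range items {
		fmt.Printf("%q failed=%v\n", it.body, it.failed)
	}
}
```

In this model the try variant protects a failed item from further steps, while the catch variant is the complement: it touches failed items only.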

func FlagErr

func FlagErr(part types.Part, err error)

FlagErr marks a message part as having failed at a processing step with an error message. If the error is nil the message part remains unchanged.

func FlagFail

func FlagFail(part types.Part)

FlagFail marks a message part as having failed at a processing step.

func GetFail added in v3.26.0

func GetFail(part types.Part) string

GetFail returns an error string for a message part if it has failed, or an empty string if not.

func HasFailed

func HasFailed(part types.Part) bool

HasFailed checks whether a message part has failed a processing step.
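
The flag helpers (FlagErr, FlagFail, GetFail, HasFailed and ClearFail) all revolve around one metadata key. The sketch below models that contract in isolation. The `part` type, the key value and the placeholder text written by `flagFail` are hypothetical and chosen for illustration; they are not taken from the Benthos implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// failFlagKey is a hypothetical stand-in for FailFlagKey.
const failFlagKey = "example_fail_flag"

// part is a simplified stand-in for types.Part: only metadata is modelled.
type part struct {
	meta map[string]string
}

func newPart() *part { return &part{meta: map[string]string{}} }

// flagErr stores the error text under the flag key. A nil error leaves the
// part unchanged.
func flagErr(p *part, err error) {
	if err != nil {
		p.meta[failFlagKey] = err.Error()
	}
}

// flagFail marks the part as failed without a specific error. The stored
// placeholder text is an assumption of this sketch.
func flagFail(p *part) { p.meta[failFlagKey] = "true" }

// getFail returns the stored error text, or an empty string if none is set.
func getFail(p *part) string { return p.meta[failFlagKey] }

// hasFailed treats any non-empty value under the flag key as a failure.
func hasFailed(p *part) bool { return p.meta[failFlagKey] != "" }

// clearFail removes the flag so later steps treat the part as healthy.
func clearFail(p *part) { delete(p.meta, failFlagKey) }

func main() {
	p := newPart()
	flagErr(p, nil)
	fmt.Println(hasFailed(p))
	flagErr(p, errors.New("bad payload"))
	fmt.Println(hasFailed(p), getFail(p))
	clearFail(p)
	fmt.Println(hasFailed(p))
}
```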

func IteratePartsWithSpan

func IteratePartsWithSpan(
	operationName string, parts []int, msg types.Message,
	iter func(int, opentracing.Span, types.Part) error,
)

IteratePartsWithSpan iterates the parts of a message according to a slice of indexes (if empty all parts are iterated) and calls a func for each part along with a tracing span for that part. If an error is returned the part is flagged as failed and the span has the error logged.
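
The index selection rule (an empty slice means every part) can be sketched without any tracing dependency. In this model `iterateParts`, `rejectEmpty` and `errEmpty` are hypothetical names, parts are plain strings, the returned error slice stands in for flagging parts as failed, and skipping out-of-range indexes is an assumption of the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

var errEmpty = errors.New("empty part")

// rejectEmpty is an example callback that fails on empty parts.
func rejectEmpty(_ int, p string) error {
	if p == "" {
		return errEmpty
	}
	return nil
}

// iterateParts calls iter for each selected part and returns one error slot
// per part, which stands in for flagging the part as failed.
func iterateParts(indexes []int, parts []string, iter func(int, string) error) []error {
	if len(indexes) == 0 {
		// An empty selection means all parts are visited.
		indexes = make([]int, len(parts))
		for i := range parts {
			indexes[i] = i
		}
	}
	failures := make([]error, len(parts))
	for _, i := range indexes {
		if i < 0 || i >= len(parts) {
			continue // out-of-range handling is an assumption of this sketch
		}
		failures[i] = iter(i, parts[i])
	}
	return failures
}

func main() {
	parts := []string{"a", "", "c"}
	fmt.Println(iterateParts(nil, parts, rejectEmpty))
	fmt.Println(iterateParts([]int{0, 2}, parts, rejectEmpty))
}
```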

func PluginCount

func PluginCount() int

PluginCount returns the number of registered plugins. This does NOT count the standard set of components.

func PluginDescriptions

func PluginDescriptions() string

PluginDescriptions generates and returns a markdown formatted document listing each registered plugin and an example configuration for it.

func RegisterPlugin

func RegisterPlugin(
	typeString string,
	configConstructor PluginConfigConstructor,
	constructor PluginConstructor,
)

RegisterPlugin registers a plugin by a unique name so that it can be constructed similar to regular processors. If configuration is not needed for this plugin then configConstructor can be nil. A constructor for the plugin itself must be provided.

func SanitiseConfig

func SanitiseConfig(conf Config) (interface{}, error)

SanitiseConfig returns a sanitised version of the Config, meaning sections that aren't relevant to behaviour are removed.

Types

type AWK

type AWK struct {
	// contains filtered or unexported fields
}

AWK is a processor that executes AWK programs on a message part and replaces the contents with the result.

func (*AWK) CloseAsync

func (a *AWK) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*AWK) ProcessMessage

func (a *AWK) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*AWK) WaitForClose

func (a *AWK) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type AWKConfig

type AWKConfig struct {
	Parts   []int  `json:"parts" yaml:"parts"`
	Codec   string `json:"codec" yaml:"codec"`
	Program string `json:"program" yaml:"program"`
}

AWKConfig contains configuration fields for the AWK processor.

func NewAWKConfig

func NewAWKConfig() AWKConfig

NewAWKConfig returns an AWKConfig with default values.

type Archive

type Archive struct {
	// contains filtered or unexported fields
}

Archive is a processor that can selectively archive parts of a message into a single part using a chosen archive type.

func (*Archive) CloseAsync

func (d *Archive) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Archive) ProcessMessage

func (d *Archive) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Archive) WaitForClose

func (d *Archive) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type ArchiveConfig

type ArchiveConfig struct {
	Format string `json:"format" yaml:"format"`
	Path   string `json:"path" yaml:"path"`
}

ArchiveConfig contains configuration fields for the Archive processor.

func NewArchiveConfig

func NewArchiveConfig() ArchiveConfig

NewArchiveConfig returns an ArchiveConfig with default values.

type Avro

type Avro struct {
	// contains filtered or unexported fields
}

Avro is a processor that performs an operation on an Avro payload.

func (*Avro) CloseAsync

func (p *Avro) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Avro) ProcessMessage

func (p *Avro) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Avro) WaitForClose

func (p *Avro) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type AvroConfig

type AvroConfig struct {
	Parts      []int  `json:"parts" yaml:"parts"`
	Operator   string `json:"operator" yaml:"operator"`
	Encoding   string `json:"encoding" yaml:"encoding"`
	Schema     string `json:"schema" yaml:"schema"`
	SchemaPath string `json:"schema_path" yaml:"schema_path"`
}

AvroConfig contains configuration fields for the Avro processor.

func NewAvroConfig

func NewAvroConfig() AvroConfig

NewAvroConfig returns an AvroConfig with default values.

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch is a processor that combines messages into a batch until a size limit or other condition is reached, at which point the batch is sent out. When a message is combined without yet producing a batch a NoAck response is returned, which is interpreted by source types as an instruction to send another message through but hold off on acknowledging this one.

Eventually, when the batch reaches its target size, the batch is sent through the pipeline as a single message and an acknowledgement for that message determines whether the whole batch of messages is acknowledged.

TODO: V4 Remove me.
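
The count-based part of this behaviour can be modelled in a few lines. This is a minimal sketch under stated assumptions: `batcher` and its `add` method are hypothetical, messages are plain strings, and the boolean return value stands in for the distinction between emitting a batch and returning a NoAck response. Byte size, condition and period triggers are not modelled.

```go
package main

import "fmt"

// batcher is a simplified stand-in for a count-based batching processor.
type batcher struct {
	count   int
	pending []string
}

// add buffers msg. It returns the full batch and true once count messages
// have accumulated. Otherwise it returns false, which corresponds to the
// case where the source is told to send more without acknowledging yet.
func (b *batcher) add(msg string) ([]string, bool) {
	b.pending = append(b.pending, msg)
	if len(b.pending) < b.count {
		return nil, false
	}
	out := b.pending
	b.pending = nil
	return out, true
}

func main() {
	b := &batcher{count: 3}
	for _, m := range []string{"a", "b", "c", "d"} {
		batch, ready := b.add(m)
		fmt.Println(m, batch, ready)
	}
}
```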

func (*Batch) CloseAsync

func (b *Batch) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Batch) ProcessMessage

func (b *Batch) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Batch) WaitForClose

func (b *Batch) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type BatchConfig

type BatchConfig struct {
	ByteSize  int              `json:"byte_size" yaml:"byte_size"`
	Count     int              `json:"count" yaml:"count"`
	Condition condition.Config `json:"condition" yaml:"condition"`
	Period    string           `json:"period" yaml:"period"`
}

BatchConfig contains configuration fields for the Batch processor.

func NewBatchConfig

func NewBatchConfig() BatchConfig

NewBatchConfig returns a BatchConfig with default values.

type Bloblang added in v3.13.0

type Bloblang struct {
	// contains filtered or unexported fields
}

Bloblang is a processor that performs a Bloblang mapping.

func (*Bloblang) CloseAsync added in v3.13.0

func (b *Bloblang) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Bloblang) ProcessMessage added in v3.13.0

func (b *Bloblang) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Bloblang) WaitForClose added in v3.13.0

func (b *Bloblang) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type BloblangConfig added in v3.13.0

type BloblangConfig string

BloblangConfig contains configuration fields for the Bloblang processor.

func NewBloblangConfig added in v3.13.0

func NewBloblangConfig() BloblangConfig

NewBloblangConfig returns a BloblangConfig with default values.

type BoundsCheck

type BoundsCheck struct {
	// contains filtered or unexported fields
}

BoundsCheck is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*BoundsCheck) CloseAsync

func (m *BoundsCheck) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*BoundsCheck) ProcessMessage

func (m *BoundsCheck) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*BoundsCheck) WaitForClose

func (m *BoundsCheck) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type BoundsCheckConfig

type BoundsCheckConfig struct {
	MaxParts    int `json:"max_parts" yaml:"max_parts"`
	MinParts    int `json:"min_parts" yaml:"min_parts"`
	MaxPartSize int `json:"max_part_size" yaml:"max_part_size"`
	MinPartSize int `json:"min_part_size" yaml:"min_part_size"`
}

BoundsCheckConfig contains configuration fields for the BoundsCheck processor.

func NewBoundsCheckConfig

func NewBoundsCheckConfig() BoundsCheckConfig

NewBoundsCheckConfig returns a BoundsCheckConfig with default values.

type Branch added in v3.25.0

type Branch struct {
	// contains filtered or unexported fields
}

Branch contains conditions and maps for transforming a batch of messages into a subset of request messages, and mapping results from those requests back into the original message batch.

func (*Branch) CloseAsync added in v3.25.0

func (b *Branch) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Branch) ProcessMessage added in v3.25.0

func (b *Branch) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Branch) WaitForClose added in v3.25.0

func (b *Branch) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type BranchConfig added in v3.25.0

type BranchConfig struct {
	RequestMap string   `json:"request_map" yaml:"request_map"`
	Processors []Config `json:"processors" yaml:"processors"`
	ResultMap  string   `json:"result_map" yaml:"result_map"`
}

BranchConfig contains configuration fields for the Branch processor.

func NewBranchConfig added in v3.25.0

func NewBranchConfig() BranchConfig

NewBranchConfig returns a BranchConfig with default values.

func (BranchConfig) Sanitise added in v3.25.0

func (b BranchConfig) Sanitise() (map[string]interface{}, error)

Sanitise the configuration into a minimal structure that can be printed without changing the intent.

type Cache

type Cache struct {
	// contains filtered or unexported fields
}

Cache is a processor that stores or retrieves data from a cache for each message of a batch via an interpolated key.

func (*Cache) CloseAsync

func (c *Cache) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Cache) ProcessMessage

func (c *Cache) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Cache) WaitForClose

func (c *Cache) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type CacheConfig

type CacheConfig struct {
	Cache    string `json:"cache" yaml:"cache"`
	Resource string `json:"resource" yaml:"resource"`
	Parts    []int  `json:"parts" yaml:"parts"`
	Operator string `json:"operator" yaml:"operator"`
	Key      string `json:"key" yaml:"key"`
	Value    string `json:"value" yaml:"value"`
}

CacheConfig contains configuration fields for the Cache processor.

func NewCacheConfig

func NewCacheConfig() CacheConfig

NewCacheConfig returns a CacheConfig with default values.

type Catch

type Catch struct {
	// contains filtered or unexported fields
}

Catch is a processor that applies a list of child processors to each message of a batch individually, where the processors are applied only to messages that failed a previous processor step.

func (*Catch) CloseAsync

func (p *Catch) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Catch) ProcessMessage

func (p *Catch) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Catch) WaitForClose

func (p *Catch) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type CatchConfig

type CatchConfig []Config

CatchConfig is a config struct containing fields for the Catch processor.

func NewCatchConfig

func NewCatchConfig() CatchConfig

NewCatchConfig returns a default CatchConfig.

type Category added in v3.26.0

type Category string

Category describes the general purpose of a processor.

var (
	CategoryMapping     Category = "Mapping"
	CategoryParsing     Category = "Parsing"
	CategoryIntegration Category = "Integration"
	CategoryComposition Category = "Composition"
	CategoryUtility     Category = "Utility"
)

Processor categories

type Compress

type Compress struct {
	// contains filtered or unexported fields
}

Compress is a processor that can selectively compress parts of a message using a chosen compression algorithm.

func (*Compress) CloseAsync

func (c *Compress) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Compress) ProcessMessage

func (c *Compress) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Compress) WaitForClose

func (c *Compress) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type CompressConfig

type CompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Level     int    `json:"level" yaml:"level"`
	Parts     []int  `json:"parts" yaml:"parts"`
}

CompressConfig contains configuration fields for the Compress processor.

func NewCompressConfig

func NewCompressConfig() CompressConfig

NewCompressConfig returns a CompressConfig with default values.

type Conditional

type Conditional struct {
	// contains filtered or unexported fields
}

Conditional is a processor that only applies child processors under a certain condition.

func (*Conditional) CloseAsync

func (c *Conditional) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Conditional) ProcessMessage

func (c *Conditional) ProcessMessage(msg types.Message) (msgs []types.Message, res types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Conditional) WaitForClose

func (c *Conditional) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type ConditionalConfig

type ConditionalConfig struct {
	Condition      condition.Config `json:"condition" yaml:"condition"`
	Processors     []Config         `json:"processors" yaml:"processors"`
	ElseProcessors []Config         `json:"else_processors" yaml:"else_processors"`
}

ConditionalConfig is a config struct containing fields for the Conditional processor.

func NewConditionalConfig

func NewConditionalConfig() ConditionalConfig

NewConditionalConfig returns a default ConditionalConfig.

type Config

type Config struct {
	Type         string             `json:"type" yaml:"type"`
	Archive      ArchiveConfig      `json:"archive" yaml:"archive"`
	Avro         AvroConfig         `json:"avro" yaml:"avro"`
	AWK          AWKConfig          `json:"awk" yaml:"awk"`
	Batch        BatchConfig        `json:"batch" yaml:"batch"`
	Bloblang     BloblangConfig     `json:"bloblang" yaml:"bloblang"`
	BoundsCheck  BoundsCheckConfig  `json:"bounds_check" yaml:"bounds_check"`
	Branch       BranchConfig       `json:"branch" yaml:"branch"`
	Cache        CacheConfig        `json:"cache" yaml:"cache"`
	Catch        CatchConfig        `json:"catch" yaml:"catch"`
	Compress     CompressConfig     `json:"compress" yaml:"compress"`
	Conditional  ConditionalConfig  `json:"conditional" yaml:"conditional"`
	Decode       DecodeConfig       `json:"decode" yaml:"decode"`
	Decompress   DecompressConfig   `json:"decompress" yaml:"decompress"`
	Dedupe       DedupeConfig       `json:"dedupe" yaml:"dedupe"`
	Encode       EncodeConfig       `json:"encode" yaml:"encode"`
	Filter       FilterConfig       `json:"filter" yaml:"filter"`
	FilterParts  FilterPartsConfig  `json:"filter_parts" yaml:"filter_parts"`
	ForEach      ForEachConfig      `json:"for_each" yaml:"for_each"`
	Grok         GrokConfig         `json:"grok" yaml:"grok"`
	GroupBy      GroupByConfig      `json:"group_by" yaml:"group_by"`
	GroupByValue GroupByValueConfig `json:"group_by_value" yaml:"group_by_value"`
	Hash         HashConfig         `json:"hash" yaml:"hash"`
	HashSample   HashSampleConfig   `json:"hash_sample" yaml:"hash_sample"`
	HTTP         HTTPConfig         `json:"http" yaml:"http"`
	InsertPart   InsertPartConfig   `json:"insert_part" yaml:"insert_part"`
	JMESPath     JMESPathConfig     `json:"jmespath" yaml:"jmespath"`
	JQ           JQConfig           `json:"jq" yaml:"jq"`
	JSON         JSONConfig         `json:"json" yaml:"json"`
	JSONSchema   JSONSchemaConfig   `json:"json_schema" yaml:"json_schema"`
	Lambda       LambdaConfig       `json:"lambda" yaml:"lambda"`
	Log          LogConfig          `json:"log" yaml:"log"`
	MergeJSON    MergeJSONConfig    `json:"merge_json" yaml:"merge_json"`
	Metadata     MetadataConfig     `json:"metadata" yaml:"metadata"`
	Metric       MetricConfig       `json:"metric" yaml:"metric"`
	Number       NumberConfig       `json:"number" yaml:"number"`
	Plugin       interface{}        `json:"plugin,omitempty" yaml:"plugin,omitempty"`
	Parallel     ParallelConfig     `json:"parallel" yaml:"parallel"`
	ParseLog     ParseLogConfig     `json:"parse_log" yaml:"parse_log"`
	ProcessBatch ForEachConfig      `json:"process_batch" yaml:"process_batch"`
	ProcessDAG   ProcessDAGConfig   `json:"process_dag" yaml:"process_dag"`
	ProcessField ProcessFieldConfig `json:"process_field" yaml:"process_field"`
	ProcessMap   ProcessMapConfig   `json:"process_map" yaml:"process_map"`
	Protobuf     ProtobufConfig     `json:"protobuf" yaml:"protobuf"`
	RateLimit    RateLimitConfig    `json:"rate_limit" yaml:"rate_limit"`
	Redis        RedisConfig        `json:"redis" yaml:"redis"`
	Resource     string             `json:"resource" yaml:"resource"`
	Sample       SampleConfig       `json:"sample" yaml:"sample"`
	SelectParts  SelectPartsConfig  `json:"select_parts" yaml:"select_parts"`
	Sleep        SleepConfig        `json:"sleep" yaml:"sleep"`
	Split        SplitConfig        `json:"split" yaml:"split"`
	SQL          SQLConfig          `json:"sql" yaml:"sql"`
	Subprocess   SubprocessConfig   `json:"subprocess" yaml:"subprocess"`
	Switch       SwitchConfig       `json:"switch" yaml:"switch"`
	SyncResponse SyncResponseConfig `json:"sync_response" yaml:"sync_response"`
	Text         TextConfig         `json:"text" yaml:"text"`
	Try          TryConfig          `json:"try" yaml:"try"`
	Throttle     ThrottleConfig     `json:"throttle" yaml:"throttle"`
	Unarchive    UnarchiveConfig    `json:"unarchive" yaml:"unarchive"`
	While        WhileConfig        `json:"while" yaml:"while"`
	Workflow     WorkflowConfig     `json:"workflow" yaml:"workflow"`
	XML          XMLConfig          `json:"xml" yaml:"xml"`
}

Config is the all encompassing configuration struct for all processor types.

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values.

func (Config) Sanitised added in v3.24.0

func (conf Config) Sanitised(removeDeprecated bool) (interface{}, error)

Sanitised returns a sanitised version of the config, meaning sections that aren't relevant to behaviour are removed. Also optionally removes deprecated fields.

func (*Config) UnmarshalYAML

func (conf *Config) UnmarshalYAML(value *yaml.Node) error

UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.

type DAGDepsConfig

type DAGDepsConfig struct {
	Dependencies []string `json:"dependencies" yaml:"dependencies"`
}

DAGDepsConfig is a config containing dependency based configuration values for a ProcessDAG child.

func NewDAGDepsConfig

func NewDAGDepsConfig() DAGDepsConfig

NewDAGDepsConfig returns a default DAGDepsConfig.

func (*DAGDepsConfig) UnmarshalJSON

func (p *DAGDepsConfig) UnmarshalJSON(bytes []byte) error

UnmarshalJSON ensures that when parsing configs that are in a slice the default values are still applied.
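
One common way to keep defaults when decoding elements of a slice is to seed a method-free copy of the type with the defaults before decoding into it. The sketch below shows that pattern with the standard library only. It is not the package's implementation, and `exampleConfig`, its fields and its default values are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// exampleConfig is a hypothetical config type used only for illustration.
type exampleConfig struct {
	Name    string `json:"name"`
	Retries int    `json:"retries"`
}

// newExampleConfig returns the (made-up) default values.
func newExampleConfig() exampleConfig {
	return exampleConfig{Name: "default", Retries: 3}
}

// UnmarshalJSON seeds the value with defaults before decoding, so fields
// missing from the input keep their default instead of the zero value.
func (c *exampleConfig) UnmarshalJSON(data []byte) error {
	// plain has the same fields but no methods, which avoids calling this
	// UnmarshalJSON recursively.
	type plain exampleConfig
	seeded := plain(newExampleConfig())
	if err := json.Unmarshal(data, &seeded); err != nil {
		return err
	}
	*c = exampleConfig(seeded)
	return nil
}

// parseConfigs decodes a JSON array of configs.
func parseConfigs(raw string) ([]exampleConfig, error) {
	var confs []exampleConfig
	err := json.Unmarshal([]byte(raw), &confs)
	return confs, err
}

func main() {
	confs, err := parseConfigs(`[{"name":"a"},{"retries":7}]`)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", confs)
}
```

Without the custom method, each slice element would start from the zero value, so an omitted field would decode as an empty string or 0 rather than its default.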

func (*DAGDepsConfig) UnmarshalYAML

func (p *DAGDepsConfig) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.

type Decode

type Decode struct {
	// contains filtered or unexported fields
}

Decode is a processor that can selectively decode parts of a message following a chosen scheme.

func (*Decode) CloseAsync

func (c *Decode) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Decode) ProcessMessage

func (c *Decode) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Decode) WaitForClose

func (c *Decode) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type DecodeConfig

type DecodeConfig struct {
	Scheme string `json:"scheme" yaml:"scheme"`
	Parts  []int  `json:"parts" yaml:"parts"`
}

DecodeConfig contains configuration fields for the Decode processor.

func NewDecodeConfig

func NewDecodeConfig() DecodeConfig

NewDecodeConfig returns a DecodeConfig with default values.

type Decompress

type Decompress struct {
	// contains filtered or unexported fields
}

Decompress is a processor that can decompress parts of a message following a chosen compression algorithm.

func (*Decompress) CloseAsync

func (d *Decompress) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Decompress) ProcessMessage

func (d *Decompress) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Decompress) WaitForClose

func (d *Decompress) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type DecompressConfig

type DecompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Parts     []int  `json:"parts" yaml:"parts"`
}

DecompressConfig contains configuration fields for the Decompress processor.

func NewDecompressConfig

func NewDecompressConfig() DecompressConfig

NewDecompressConfig returns a DecompressConfig with default values.

type Dedupe

type Dedupe struct {
	// contains filtered or unexported fields
}

Dedupe is a processor that deduplicates messages either by hashing the full contents of message parts or by hashing the value of an interpolated string.
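
Content-hash deduplication can be sketched as follows. This is an illustration, not the processor's code: SHA-256 from the standard library is swapped in for whichever hash the processor is configured with, an in-memory map stands in for the configured cache, and `deduper` and `isDuplicate` are hypothetical names.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// deduper remembers the hashes of content it has already seen. The map is
// a stand-in for an external cache.
type deduper struct {
	seen map[[sha256.Size]byte]struct{}
}

func newDeduper() *deduper {
	return &deduper{seen: map[[sha256.Size]byte]struct{}{}}
}

// isDuplicate reports whether content was seen before, and records its
// hash if it was not.
func (d *deduper) isDuplicate(content string) bool {
	sum := sha256.Sum256([]byte(content))
	if _, ok := d.seen[sum]; ok {
		return true
	}
	d.seen[sum] = struct{}{}
	return false
}

func main() {
	d := newDeduper()
	for _, m := range []string{"a", "b", "a"} {
		fmt.Println(m, d.isDuplicate(m))
	}
}
```

Hashing an interpolated key instead of the full contents would follow the same shape, with the resolved key string passed to `isDuplicate` in place of the message body.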

func (*Dedupe) CloseAsync

func (d *Dedupe) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Dedupe) ProcessMessage

func (d *Dedupe) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Dedupe) WaitForClose

func (d *Dedupe) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type DedupeConfig

type DedupeConfig struct {
	Cache          string `json:"cache" yaml:"cache"`
	HashType       string `json:"hash" yaml:"hash"`
	Parts          []int  `json:"parts" yaml:"parts"` // message parts to hash
	Key            string `json:"key" yaml:"key"`
	DropOnCacheErr bool   `json:"drop_on_err" yaml:"drop_on_err"`
}

DedupeConfig contains configuration fields for the Dedupe processor.

func NewDedupeConfig

func NewDedupeConfig() DedupeConfig

NewDedupeConfig returns a DedupeConfig with default values.

type DepProcessMapConfig

type DepProcessMapConfig struct {
	DAGDepsConfig    `json:",inline" yaml:",inline"`
	ProcessMapConfig `json:",inline" yaml:",inline"`
}

DepProcessMapConfig contains a superset of a ProcessMap config and some DAG specific fields.

func NewDepProcessMapConfig

func NewDepProcessMapConfig() DepProcessMapConfig

NewDepProcessMapConfig returns a default DepProcessMapConfig.

type Encode

type Encode struct {
	// contains filtered or unexported fields
}

Encode is a processor that can selectively encode parts of a message following a chosen scheme.

func (*Encode) CloseAsync

func (c *Encode) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Encode) ProcessMessage

func (c *Encode) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Encode) WaitForClose

func (c *Encode) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type EncodeConfig

type EncodeConfig struct {
	Scheme string `json:"scheme" yaml:"scheme"`
	Parts  []int  `json:"parts" yaml:"parts"`
}

EncodeConfig contains configuration fields for the Encode processor.

func NewEncodeConfig

func NewEncodeConfig() EncodeConfig

NewEncodeConfig returns an EncodeConfig with default values.

type Filter

type Filter struct {
	// contains filtered or unexported fields
}

Filter is a processor that checks each message against a condition and rejects the message if the condition returns false.

func (*Filter) CloseAsync

func (c *Filter) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Filter) ProcessMessage

func (c *Filter) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Filter) WaitForClose

func (c *Filter) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type FilterConfig

type FilterConfig struct {
	condition.Config `json:",inline" yaml:",inline"`
}

FilterConfig contains configuration fields for the Filter processor.

func NewFilterConfig

func NewFilterConfig() FilterConfig

NewFilterConfig returns a FilterConfig with default values.

func (FilterConfig) MarshalYAML

func (f FilterConfig) MarshalYAML() (interface{}, error)

MarshalYAML prints the child condition instead of {}.

type FilterParts

type FilterParts struct {
	// contains filtered or unexported fields
}

FilterParts is a processor that checks each part from a message against a condition and removes the part if the condition returns false.

func (*FilterParts) CloseAsync

func (c *FilterParts) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*FilterParts) ProcessMessage

func (c *FilterParts) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*FilterParts) WaitForClose

func (c *FilterParts) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type FilterPartsConfig

type FilterPartsConfig struct {
	condition.Config `json:",inline" yaml:",inline"`
}

FilterPartsConfig contains configuration fields for the FilterParts processor.

func NewFilterPartsConfig

func NewFilterPartsConfig() FilterPartsConfig

NewFilterPartsConfig returns a FilterPartsConfig with default values.

func (FilterPartsConfig) MarshalYAML

func (f FilterPartsConfig) MarshalYAML() (interface{}, error)

MarshalYAML prints the child condition instead of {}.

type ForEach

type ForEach struct {
	// contains filtered or unexported fields
}

ForEach is a processor that applies a list of child processors to each message of a batch individually.

func (*ForEach) CloseAsync

func (p *ForEach) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*ForEach) ProcessMessage

func (p *ForEach) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*ForEach) WaitForClose

func (p *ForEach) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type ForEachConfig

type ForEachConfig []Config

ForEachConfig is a config struct containing fields for the ForEach processor.

func NewForEachConfig

func NewForEachConfig() ForEachConfig

NewForEachConfig returns a default ForEachConfig.

type Grok

type Grok struct {
	// contains filtered or unexported fields
}

Grok is a processor that executes Grok queries on a message part and replaces the contents with the result.

func (*Grok) CloseAsync

func (g *Grok) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Grok) ProcessMessage

func (g *Grok) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Grok) WaitForClose

func (g *Grok) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type GrokConfig

type GrokConfig struct {
	Parts              []int             `json:"parts" yaml:"parts"`
	Patterns           []string          `json:"patterns" yaml:"patterns"`
	RemoveEmpty        bool              `json:"remove_empty_values" yaml:"remove_empty_values"`
	NamedOnly          bool              `json:"named_captures_only" yaml:"named_captures_only"`
	UseDefaults        bool              `json:"use_default_patterns" yaml:"use_default_patterns"`
	To                 string            `json:"output_format" yaml:"output_format"`
	PatternDefinitions map[string]string `json:"pattern_definitions" yaml:"pattern_definitions"`
}

GrokConfig contains configuration fields for the Grok processor.

func NewGrokConfig

func NewGrokConfig() GrokConfig

NewGrokConfig returns a GrokConfig with default values.

type GroupBy

type GroupBy struct {
	// contains filtered or unexported fields
}

GroupBy is a processor that breaks message batches down into N batches of a smaller size according to conditions.

func (*GroupBy) CloseAsync

func (g *GroupBy) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*GroupBy) ProcessMessage

func (g *GroupBy) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*GroupBy) WaitForClose

func (g *GroupBy) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type GroupByConfig

type GroupByConfig []GroupByElement

GroupByConfig is a configuration struct containing fields for the GroupBy processor, which breaks message batches down into N batches of a smaller size according to conditions.

func NewGroupByConfig

func NewGroupByConfig() GroupByConfig

NewGroupByConfig returns a GroupByConfig with default values.

type GroupByElement

type GroupByElement struct {
	Condition  condition.Config `json:"condition" yaml:"condition"`
	Processors []Config         `json:"processors" yaml:"processors"`
}

GroupByElement represents a group determined by a condition and a list of group specific processors.

type GroupByValue

type GroupByValue struct {
	// contains filtered or unexported fields
}

GroupByValue is a processor that breaks message batches down into N batches of a smaller size according to a function interpolated string evaluated per message part.

func (*GroupByValue) CloseAsync

func (g *GroupByValue) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*GroupByValue) ProcessMessage

func (g *GroupByValue) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*GroupByValue) WaitForClose

func (g *GroupByValue) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type GroupByValueConfig

type GroupByValueConfig struct {
	Value string `json:"value" yaml:"value"`
}

GroupByValueConfig is a configuration struct containing fields for the GroupByValue processor, which breaks message batches down into N batches of a smaller size according to a function interpolated string evaluated per message part.

func NewGroupByValueConfig

func NewGroupByValueConfig() GroupByValueConfig

NewGroupByValueConfig returns a GroupByValueConfig with default values.

type HTTP

type HTTP struct {
	// contains filtered or unexported fields
}

HTTP is a processor that performs an HTTP request using the message as the request body, and returns the response.

func (*HTTP) CloseAsync

func (h *HTTP) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*HTTP) ProcessMessage

func (h *HTTP) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*HTTP) WaitForClose

func (h *HTTP) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type HTTPConfig

type HTTPConfig struct {
	Parallel      bool          `json:"parallel" yaml:"parallel"`
	MaxParallel   int           `json:"max_parallel" yaml:"max_parallel"`
	Client        client.Config `json:"request" yaml:"request"`
	client.Config `json:",inline" yaml:",inline"`
}

HTTPConfig contains configuration fields for the HTTP processor.

func NewHTTPConfig

func NewHTTPConfig() HTTPConfig

NewHTTPConfig returns an HTTPConfig with default values.

type Hash

type Hash struct {
	// contains filtered or unexported fields
}

Hash is a processor that can selectively hash parts of a message following a chosen algorithm.

func (*Hash) CloseAsync

func (c *Hash) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Hash) ProcessMessage

func (c *Hash) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Hash) WaitForClose

func (c *Hash) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type HashConfig

type HashConfig struct {
	Parts     []int  `json:"parts" yaml:"parts"`
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Key       string `json:"key" yaml:"key"`
}

HashConfig contains configuration fields for the Hash processor.
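
The Algorithm and Key fields suggest both plain and keyed hashing. The sketch below uses only the Go standard library and assumes, purely for illustration, that an empty key selects SHA-256 and a non-empty key selects HMAC-SHA256, with hex encoded output. The algorithms and output format accepted by the real processor are not listed here, and hashPart is a hypothetical helper.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// hashPart returns a hex encoded digest of content. With an empty key it
// uses SHA-256, otherwise HMAC-SHA256 keyed with key. This selection rule
// is an assumption made for the example.
func hashPart(content []byte, key string) string {
	if key == "" {
		sum := sha256.Sum256(content)
		return hex.EncodeToString(sum[:])
	}
	mac := hmac.New(sha256.New, []byte(key))
	mac.Write(content)
	return hex.EncodeToString(mac.Sum(nil))
}

func main() {
	fmt.Println(hashPart([]byte("hello"), ""))
	fmt.Println(hashPart([]byte("hello"), "secret"))
}
```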

func NewHashConfig

func NewHashConfig() HashConfig

NewHashConfig returns a HashConfig with default values.

type HashSample

type HashSample struct {
	// contains filtered or unexported fields
}

HashSample is a processor that removes messages based on a sample factor by hashing their contents.

func (*HashSample) CloseAsync

func (s *HashSample) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*HashSample) ProcessMessage

func (s *HashSample) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*HashSample) WaitForClose

func (s *HashSample) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type HashSampleConfig

type HashSampleConfig struct {
	RetainMin float64 `json:"retain_min" yaml:"retain_min"`
	RetainMax float64 `json:"retain_max" yaml:"retain_max"`
	Parts     []int   `json:"parts" yaml:"parts"` // message parts to hash
}

HashSampleConfig contains configuration fields for the HashSample processor.
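
Hash based sampling is deterministic: the same content always receives the same decision. The sketch below assumes that the hash is mapped onto a percentage in [0, 100) and that a message is kept when that percentage lies in [RetainMin, RetainMax). Both the range semantics and the hash function (FNV-1a from the standard library) are assumptions made for illustration, and samplePercent and retain are hypothetical helpers.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// samplePercent maps content deterministically onto the range [0, 100).
func samplePercent(content []byte) float64 {
	h := fnv.New32a()
	h.Write(content)
	return float64(h.Sum32()) / float64(1<<32) * 100
}

// retain reports whether content falls inside [retainMin, retainMax).
func retain(content []byte, retainMin, retainMax float64) bool {
	p := samplePercent(content)
	return p >= retainMin && p < retainMax
}

func main() {
	for _, s := range []string{"alpha", "beta", "gamma"} {
		fmt.Println(s, retain([]byte(s), 0, 50))
	}
}
```

Because the decision depends only on the content, independent instances given the same range agree on which messages to keep.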

func NewHashSampleConfig

func NewHashSampleConfig() HashSampleConfig

NewHashSampleConfig returns a HashSampleConfig with default values.

type InsertPart

type InsertPart struct {
	// contains filtered or unexported fields
}

InsertPart is a processor that inserts a new message part at a specific index.

func (*InsertPart) CloseAsync

func (p *InsertPart) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*InsertPart) ProcessMessage

func (p *InsertPart) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*InsertPart) WaitForClose

func (p *InsertPart) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type InsertPartConfig

type InsertPartConfig struct {
	Index   int    `json:"index" yaml:"index"`
	Content string `json:"content" yaml:"content"`
}

InsertPartConfig contains configuration fields for the InsertPart processor.
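
Inserting a part at an index can be sketched over a plain slice of byte slices. The insertPart helper is hypothetical, and clamping out of range indexes to the nearest valid position is an assumption made for this example. How the real processor treats negative or oversized indexes is not stated here.

```go
package main

import "fmt"

// insertPart returns a new slice with content placed at index. Indexes
// below zero or beyond the end are clamped (an assumption of this sketch).
func insertPart(parts [][]byte, index int, content []byte) [][]byte {
	if index < 0 {
		index = 0
	}
	if index > len(parts) {
		index = len(parts)
	}
	out := make([][]byte, 0, len(parts)+1)
	out = append(out, parts[:index]...)
	out = append(out, content)
	out = append(out, parts[index:]...)
	return out
}

func main() {
	parts := [][]byte{[]byte("a"), []byte("b")}
	fmt.Printf("%q\n", insertPart(parts, 1, []byte("x")))
}
```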

func NewInsertPartConfig

func NewInsertPartConfig() InsertPartConfig

NewInsertPartConfig returns an InsertPartConfig with default values.

type JMESPath

type JMESPath struct {
	// contains filtered or unexported fields
}

JMESPath is a processor that executes JMESPath queries on a message part and replaces the contents with the result.

func (*JMESPath) CloseAsync

func (p *JMESPath) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*JMESPath) ProcessMessage

func (p *JMESPath) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*JMESPath) WaitForClose

func (p *JMESPath) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type JMESPathConfig

type JMESPathConfig struct {
	Parts []int  `json:"parts" yaml:"parts"`
	Query string `json:"query" yaml:"query"`
}

JMESPathConfig contains configuration fields for the JMESPath processor.

func NewJMESPathConfig

func NewJMESPathConfig() JMESPathConfig

NewJMESPathConfig returns a JMESPathConfig with default values.

type JQ added in v3.27.0

type JQ struct {
	// contains filtered or unexported fields
}

JQ is a processor that passes messages through gojq.

func (*JQ) CloseAsync added in v3.27.0

func (*JQ) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*JQ) ProcessMessage added in v3.27.0

func (j *JQ) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*JQ) WaitForClose added in v3.27.0

func (*JQ) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type JQConfig added in v3.27.0

type JQConfig struct {
	Query string `json:"query" yaml:"query"`
	Raw   bool   `json:"raw" yaml:"raw"`
}

JQConfig contains configuration fields for the JQ processor.

func NewJQConfig added in v3.27.0

func NewJQConfig() JQConfig

NewJQConfig returns a JQConfig with default values.

type JSON

type JSON struct {
	// contains filtered or unexported fields
}

JSON is a processor that performs an operation on a JSON payload.

func (*JSON) CloseAsync

func (p *JSON) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*JSON) ProcessMessage

func (p *JSON) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*JSON) WaitForClose

func (p *JSON) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type JSONConfig

type JSONConfig struct {
	Parts    []int        `json:"parts" yaml:"parts"`
	Operator string       `json:"operator" yaml:"operator"`
	Path     string       `json:"path" yaml:"path"`
	Value    rawJSONValue `json:"value" yaml:"value"`
}

JSONConfig contains configuration fields for the JSON processor.

func NewJSONConfig

func NewJSONConfig() JSONConfig

NewJSONConfig returns a JSONConfig with default values.

type JSONSchema added in v3.5.0

type JSONSchema struct {
	// contains filtered or unexported fields
}

JSONSchema is a processor that validates messages against a specified JSON schema.

func (*JSONSchema) CloseAsync added in v3.5.0

func (s *JSONSchema) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*JSONSchema) ProcessMessage added in v3.5.0

func (s *JSONSchema) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*JSONSchema) WaitForClose added in v3.5.0

func (s *JSONSchema) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type JSONSchemaConfig added in v3.5.0

type JSONSchemaConfig struct {
	Parts      []int  `json:"parts" yaml:"parts"`
	SchemaPath string `json:"schema_path" yaml:"schema_path"`
	Schema     string `json:"schema" yaml:"schema"`
}

JSONSchemaConfig is a configuration struct containing fields for the JSONSchema processor.

func NewJSONSchemaConfig added in v3.5.0

func NewJSONSchemaConfig() JSONSchemaConfig

NewJSONSchemaConfig returns a JSONSchemaConfig with default values.

type Lambda

type Lambda struct {
	// contains filtered or unexported fields
}

Lambda is a processor that invokes an AWS Lambda using the message as the request body, and returns the response.

func (*Lambda) CloseAsync

func (l *Lambda) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Lambda) ProcessMessage

func (l *Lambda) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Lambda) WaitForClose

func (l *Lambda) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type LambdaConfig

type LambdaConfig struct {
	client.Config `json:",inline" yaml:",inline"`
	Parallel      bool `json:"parallel" yaml:"parallel"`
}

LambdaConfig contains configuration fields for the Lambda processor.

func NewLambdaConfig

func NewLambdaConfig() LambdaConfig

NewLambdaConfig returns a LambdaConfig with default values.

type Log

type Log struct {
	// contains filtered or unexported fields
}

Log is a processor that prints a log event each time it processes a message.

func (*Log) CloseAsync

func (l *Log) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Log) ProcessMessage

func (l *Log) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage logs an event and returns the message unchanged.

func (*Log) WaitForClose

func (l *Log) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type LogConfig

type LogConfig struct {
	Level   string            `json:"level" yaml:"level"`
	Fields  map[string]string `json:"fields" yaml:"fields"`
	Message string            `json:"message" yaml:"message"`
}

LogConfig contains configuration fields for the Log processor.

func NewLogConfig

func NewLogConfig() LogConfig

NewLogConfig returns a LogConfig with default values.

type MergeJSON

type MergeJSON struct {
	// contains filtered or unexported fields
}

MergeJSON is a processor that merges JSON parsed message parts into a single value.

func (*MergeJSON) CloseAsync

func (p *MergeJSON) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*MergeJSON) ProcessMessage

func (p *MergeJSON) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*MergeJSON) WaitForClose

func (p *MergeJSON) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type MergeJSONConfig

type MergeJSONConfig struct {
	Parts       []int `json:"parts" yaml:"parts"`
	RetainParts bool  `json:"retain_parts" yaml:"retain_parts"`
}

MergeJSONConfig contains configuration fields for the MergeJSON processor.

func NewMergeJSONConfig

func NewMergeJSONConfig() MergeJSONConfig

NewMergeJSONConfig returns a MergeJSONConfig with default values.

type Metadata

type Metadata struct {
	// contains filtered or unexported fields
}

Metadata is a processor that performs an operation on the Metadata of a message.

func (*Metadata) CloseAsync

func (p *Metadata) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Metadata) ProcessMessage

func (p *Metadata) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Metadata) WaitForClose

func (p *Metadata) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type MetadataConfig

type MetadataConfig struct {
	Parts    []int  `json:"parts" yaml:"parts"`
	Operator string `json:"operator" yaml:"operator"`
	Key      string `json:"key" yaml:"key"`
	Value    string `json:"value" yaml:"value"`
}

MetadataConfig contains configuration fields for the Metadata processor.

func NewMetadataConfig

func NewMetadataConfig() MetadataConfig

NewMetadataConfig returns a MetadataConfig with default values.

type Metric

type Metric struct {
	// contains filtered or unexported fields
}

Metric is a processor that creates a metric from extracted values from a message part.

func (*Metric) CloseAsync

func (m *Metric) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Metric) ProcessMessage

func (m *Metric) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message.

func (*Metric) WaitForClose

func (m *Metric) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type MetricConfig

type MetricConfig struct {
	Type   string            `json:"type" yaml:"type"`
	Path   string            `json:"path" yaml:"path"`
	Labels map[string]string `json:"labels" yaml:"labels"`
	Value  string            `json:"value" yaml:"value"`
}

MetricConfig contains configuration fields for the Metric processor.

func NewMetricConfig

func NewMetricConfig() MetricConfig

NewMetricConfig returns a MetricConfig with default values.

type Noop

type Noop struct {
}

Noop is a no-op processor that does nothing.

func (*Noop) CloseAsync

func (c *Noop) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Noop) ProcessMessage

func (c *Noop) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage does nothing and returns the message unchanged.

func (*Noop) WaitForClose

func (c *Noop) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type Number

type Number struct {
	// contains filtered or unexported fields
}

Number is a processor that performs number based operations on payloads.

func (*Number) CloseAsync

func (n *Number) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Number) ProcessMessage

func (n *Number) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Number) WaitForClose

func (n *Number) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type NumberConfig

type NumberConfig struct {
	Parts    []int       `json:"parts" yaml:"parts"`
	Operator string      `json:"operator" yaml:"operator"`
	Value    interface{} `json:"value" yaml:"value"`
}

NumberConfig contains configuration fields for the Number processor.

func NewNumberConfig

func NewNumberConfig() NumberConfig

NewNumberConfig returns a NumberConfig with default values.

type Parallel

type Parallel struct {
	// contains filtered or unexported fields
}

Parallel is a processor that applies a list of child processors to each message of a batch individually and in parallel.

func (*Parallel) CloseAsync

func (p *Parallel) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Parallel) ProcessMessage

func (p *Parallel) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Parallel) WaitForClose

func (p *Parallel) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type ParallelConfig

type ParallelConfig struct {
	Cap        int      `json:"cap" yaml:"cap"`
	Processors []Config `json:"processors" yaml:"processors"`
}

ParallelConfig is a config struct containing fields for the Parallel processor.

func NewParallelConfig

func NewParallelConfig() ParallelConfig

NewParallelConfig returns a default ParallelConfig.

type ParseLog added in v3.10.0

type ParseLog struct {
	// contains filtered or unexported fields
}

ParseLog is a processor that parses properly formatted messages.

func (*ParseLog) CloseAsync added in v3.10.0

func (s *ParseLog) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*ParseLog) ProcessMessage added in v3.10.0

func (s *ParseLog) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*ParseLog) WaitForClose added in v3.10.0

func (s *ParseLog) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type ParseLogConfig added in v3.10.0

type ParseLogConfig struct {
	Parts        []int  `json:"parts" yaml:"parts"`
	Format       string `json:"format" yaml:"format"`
	Codec        string `json:"codec" yaml:"codec"`
	BestEffort   bool   `json:"best_effort" yaml:"best_effort"`
	WithRFC3339  bool   `json:"allow_rfc3339" yaml:"allow_rfc3339"`
	WithYear     string `json:"default_year" yaml:"default_year"`
	WithTimezone string `json:"default_timezone" yaml:"default_timezone"`
}

ParseLogConfig contains configuration fields for the ParseLog processor.

func NewParseLogConfig added in v3.10.0

func NewParseLogConfig() ParseLogConfig

NewParseLogConfig returns a ParseLogConfig with default values.

type PluginConfigConstructor

type PluginConfigConstructor func() interface{}

PluginConfigConstructor is a func that returns a pointer to a new and fully populated configuration struct for a plugin type.

type PluginConfigSanitiser

type PluginConfigSanitiser func(conf interface{}) interface{}

PluginConfigSanitiser is a function that takes a configuration object for a plugin and returns a sanitised (minimal) version of it for printing in examples and plugin documentation.

This function is useful when a plugin's configuration struct is very large and complex, but can sometimes be expressed in a more concise way without losing the original intent.

type PluginConstructor

type PluginConstructor func(
	config interface{},
	manager types.Manager,
	logger log.Modular,
	metrics metrics.Type,
) (types.Processor, error)

PluginConstructor is a func that constructs a Benthos processor plugin. These are plugins that are specific to certain use cases, experimental, private or otherwise unfit for widespread general use. Any number of plugins can be specified when using Benthos as a framework.

The configuration object will be the result of the PluginConfigConstructor after overlaying the user configuration.
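
The overlay step can be pictured as decoding the user configuration into the pointer returned by the config constructor. The sketch below uses encoding/json and a hypothetical exampleConfig type. The overlay helper is illustrative only and does not show how Benthos itself performs this step.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// exampleConfig is a hypothetical plugin configuration.
type exampleConfig struct {
	Prefix string `json:"prefix"`
	Repeat int    `json:"repeat"`
}

// newExampleConfig has the shape of a PluginConfigConstructor: it returns
// a pointer to a fully populated configuration struct.
func newExampleConfig() interface{} {
	return &exampleConfig{Prefix: "default", Repeat: 1}
}

// overlay decodes user supplied JSON on top of the defaults. Fields that
// are absent from raw keep their default values.
func overlay(ctor func() interface{}, raw []byte) (interface{}, error) {
	conf := ctor()
	if err := json.Unmarshal(raw, conf); err != nil {
		return nil, err
	}
	return conf, nil
}

func main() {
	conf, err := overlay(newExampleConfig, []byte(`{"repeat": 3}`))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", *conf.(*exampleConfig)) // prints {Prefix:default Repeat:3}
}
```

Returning a pointer matters here: decoding into a non-pointer value held in an interface{} would fail, since the decoder needs somewhere to write.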

type ProcessDAG

type ProcessDAG struct {
	// contains filtered or unexported fields
}

ProcessDAG is a processor that applies a list of child processors to a new payload mapped from the original, and after processing attempts to overlay the results back onto the original payloads according to more mappings.

func (*ProcessDAG) CloseAsync

func (p *ProcessDAG) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*ProcessDAG) ProcessMessage

func (p *ProcessDAG) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*ProcessDAG) WaitForClose

func (p *ProcessDAG) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type ProcessDAGConfig

type ProcessDAGConfig map[string]DepProcessMapConfig

ProcessDAGConfig is a config struct containing fields for the ProcessDAG processor.

func NewProcessDAGConfig

func NewProcessDAGConfig() ProcessDAGConfig

NewProcessDAGConfig returns a default ProcessDAGConfig.

type ProcessField

type ProcessField struct {
	// contains filtered or unexported fields
}

ProcessField is a processor that applies a list of child processors to a field extracted from the original payload.

func (*ProcessField) CloseAsync

func (p *ProcessField) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*ProcessField) ProcessMessage

func (p *ProcessField) ProcessMessage(msg types.Message) (msgs []types.Message, res types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*ProcessField) WaitForClose

func (p *ProcessField) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type ProcessFieldConfig

type ProcessFieldConfig struct {
	Parts      []int    `json:"parts" yaml:"parts"`
	Codec      string   `json:"codec" yaml:"codec"`
	Path       string   `json:"path" yaml:"path"`
	ResultType string   `json:"result_type" yaml:"result_type"`
	Processors []Config `json:"processors" yaml:"processors"`
}

ProcessFieldConfig is a config struct containing fields for the ProcessField processor.

func NewProcessFieldConfig

func NewProcessFieldConfig() ProcessFieldConfig

NewProcessFieldConfig returns a default ProcessFieldConfig.

type ProcessMap

type ProcessMap struct {
	// contains filtered or unexported fields
}

ProcessMap is a processor that applies a list of child processors to a new payload mapped from the original, and after processing attempts to overlay the results back onto the original payloads according to more mappings.

func NewProcessMap

func NewProcessMap(
	conf ProcessMapConfig, mgr types.Manager, log log.Modular, stats metrics.Type,
) (*ProcessMap, error)

NewProcessMap returns a ProcessMap processor.

func (*ProcessMap) CloseAsync

func (p *ProcessMap) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*ProcessMap) CreateResult

func (p *ProcessMap) CreateResult(msg types.Message) error

CreateResult applies reduction and the child processors to a payload. The size of the payload remains unchanged, with reduced indexes set to nil. The result can be overlaid onto the original message in order to complete the map.

func (*ProcessMap) OverlayResult

func (p *ProcessMap) OverlayResult(payload, response types.Message) ([]int, error)

OverlayResult attempts to merge the result of a process_map with the original payload as per the map specified in the postmap and postmap_optional fields.

func (*ProcessMap) ProcessMessage

func (p *ProcessMap) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*ProcessMap) TargetsProvided

func (p *ProcessMap) TargetsProvided() []string

TargetsProvided returns a list of targets provided by this processor derived from its postmap and postmap_optional fields.

func (*ProcessMap) TargetsUsed

func (p *ProcessMap) TargetsUsed() []string

TargetsUsed returns a list of target dependencies of this processor derived from its premap and premap_optional fields.
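
Lists of provided and used targets are enough to order stages by dependency. The sketch below is a simplified illustration: the stage type and resolveLayers are hypothetical, targets are compared as exact strings, each target is assumed to have a single provider, and targets that no stage provides are assumed to come from the original payload. It is not the resolution logic of ProcessDAG.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// stage is a hypothetical description of one node in the graph.
type stage struct {
	provides []string
	uses     []string
}

// resolveLayers groups stage names into layers so that every stage only
// uses targets provided by stages in earlier layers.
func resolveLayers(stages map[string]stage) ([][]string, error) {
	provider := map[string]string{} // target -> name of the providing stage
	for name, s := range stages {
		for _, t := range s.provides {
			provider[t] = name
		}
	}
	done := map[string]bool{}
	var layers [][]string
	for len(done) < len(stages) {
		var layer []string
		for name, s := range stages {
			if done[name] {
				continue
			}
			ready := true
			for _, t := range s.uses {
				// Wait only for targets owned by another, unfinished stage.
				if p, ok := provider[t]; ok && p != name && !done[p] {
					ready = false
					break
				}
			}
			if ready {
				layer = append(layer, name)
			}
		}
		if len(layer) == 0 {
			return nil, errors.New("circular dependency between stages")
		}
		sort.Strings(layer) // stable output regardless of map order
		for _, name := range layer {
			done[name] = true
		}
		layers = append(layers, layer)
	}
	return layers, nil
}

func main() {
	layers, err := resolveLayers(map[string]stage{
		"A": {provides: []string{"a"}},
		"B": {provides: []string{"b"}, uses: []string{"a"}},
		"C": {uses: []string{"a", "b"}},
	})
	fmt.Println(layers, err)
}
```

Stages in the same layer have no dependencies on one another, so they are candidates for concurrent execution.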

func (*ProcessMap) WaitForClose

func (p *ProcessMap) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type ProcessMapConfig

type ProcessMapConfig struct {
	Parts           []int              `json:"parts" yaml:"parts"`
	Conditions      []condition.Config `json:"conditions" yaml:"conditions"`
	Premap          map[string]string  `json:"premap" yaml:"premap"`
	PremapOptional  map[string]string  `json:"premap_optional" yaml:"premap_optional"`
	Postmap         map[string]string  `json:"postmap" yaml:"postmap"`
	PostmapOptional map[string]string  `json:"postmap_optional" yaml:"postmap_optional"`
	Processors      []Config           `json:"processors" yaml:"processors"`
}

ProcessMapConfig is a config struct containing fields for the ProcessMap processor.

func NewProcessMapConfig

func NewProcessMapConfig() ProcessMapConfig

NewProcessMapConfig returns a default ProcessMapConfig.

func (ProcessMapConfig) Sanitise

func (p ProcessMapConfig) Sanitise() (map[string]interface{}, error)

Sanitise the configuration into a minimal structure that can be printed without changing the intent.

func (*ProcessMapConfig) UnmarshalJSON

func (p *ProcessMapConfig) UnmarshalJSON(bytes []byte) error

UnmarshalJSON ensures that when parsing configs that are in a slice the default values are still applied.
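One common way to get this behaviour is to decode into a value that has already been populated with defaults, using a local alias type so that the custom unmarshaller does not call itself. The sketch below shows that pattern on a hypothetical caseConfig type; the field names and default values are illustrative assumptions, not this package's actual defaults.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// caseConfig is a hypothetical config type used only for this illustration.
type caseConfig struct {
	Operator string `json:"operator"`
	Retries  int    `json:"retries"`
}

// newCaseConfig returns a caseConfig with assumed default values.
func newCaseConfig() caseConfig {
	return caseConfig{Operator: "noop", Retries: 3}
}

// UnmarshalJSON starts from the defaults so that fields absent from the
// input keep their default values, even for elements of a slice.
func (c *caseConfig) UnmarshalJSON(data []byte) error {
	type confAlias caseConfig // the alias has no UnmarshalJSON, so no recursion
	aliased := confAlias(newCaseConfig())
	if err := json.Unmarshal(data, &aliased); err != nil {
		return err
	}
	*c = caseConfig(aliased)
	return nil
}

// parseCases decodes a JSON array of case configs.
func parseCases(raw string) ([]caseConfig, error) {
	var cases []caseConfig
	err := json.Unmarshal([]byte(raw), &cases)
	return cases, err
}

func main() {
	cases, err := parseCases(`[{"operator":"set"},{"retries":5},{}]`)
	if err != nil {
		panic(err)
	}
	for _, c := range cases {
		fmt.Printf("%+v\n", c)
	}
}
```

Without the custom method, the empty third element would decode to zero values rather than to the defaults.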

func (*ProcessMapConfig) UnmarshalYAML

func (p *ProcessMapConfig) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.

type Protobuf added in v3.25.0

type Protobuf struct {
	// contains filtered or unexported fields
}

Protobuf is a processor that performs an operation on a Protobuf payload.

func (*Protobuf) CloseAsync added in v3.25.0

func (p *Protobuf) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Protobuf) ProcessMessage added in v3.25.0

func (p *Protobuf) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Protobuf) WaitForClose added in v3.25.0

func (p *Protobuf) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type ProtobufConfig added in v3.25.0

type ProtobufConfig struct {
	Parts      []int  `json:"parts" yaml:"parts"`
	Operator   string `json:"operator" yaml:"operator"`
	Message    string `json:"message" yaml:"message"`
	ImportPath string `json:"import_path" yaml:"import_path"`
}

ProtobufConfig contains configuration fields for the Protobuf processor.

func NewProtobufConfig added in v3.25.0

func NewProtobufConfig() ProtobufConfig

NewProtobufConfig returns a ProtobufConfig with default values.

type RateLimit

type RateLimit struct {
	// contains filtered or unexported fields
}

RateLimit is a processor that throttles the throughput of messages according to a rate limit resource.

func (*RateLimit) CloseAsync

func (r *RateLimit) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*RateLimit) ProcessMessage

func (r *RateLimit) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*RateLimit) WaitForClose

func (r *RateLimit) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type RateLimitConfig

type RateLimitConfig struct {
	Resource string `json:"resource" yaml:"resource"`
}

RateLimitConfig contains configuration fields for the RateLimit processor.

func NewRateLimitConfig

func NewRateLimitConfig() RateLimitConfig

NewRateLimitConfig returns a RateLimitConfig with default values.

type Redis added in v3.1.0

type Redis struct {
	// contains filtered or unexported fields
}

Redis is a processor that performs Redis operations.

func (*Redis) CloseAsync added in v3.1.0

func (r *Redis) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Redis) ProcessMessage added in v3.1.0

func (r *Redis) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Redis) WaitForClose added in v3.1.0

func (r *Redis) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type RedisConfig added in v3.1.0

type RedisConfig struct {
	URL         string `json:"url" yaml:"url"`
	Parts       []int  `json:"parts" yaml:"parts"`
	Operator    string `json:"operator" yaml:"operator"`
	Key         string `json:"key" yaml:"key"`
	Retries     int    `json:"retries" yaml:"retries"`
	RetryPeriod string `json:"retry_period" yaml:"retry_period"`
}

RedisConfig contains configuration fields for the Redis processor.
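The Retries and RetryPeriod fields suggest a bounded retry loop with a pause between attempts. The sketch below is one way such a loop might look. It assumes that the period is a Go duration string and that retries counts attempts made after the first one; both points are assumptions, and withRetries and flaky are hypothetical helpers rather than functions of this package.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// withRetries runs op, retrying up to retries more times after a failure and
// sleeping for period between attempts. It returns the number of attempts made.
func withRetries(retries int, period string, op func() error) (int, error) {
	wait, err := time.ParseDuration(period)
	if err != nil {
		return 0, fmt.Errorf("failed to parse retry period: %w", err)
	}
	attempts := 0
	for {
		attempts++
		if err = op(); err == nil || attempts > retries {
			return attempts, err
		}
		time.Sleep(wait)
	}
}

// flaky returns an operation that fails the first n times it is called.
func flaky(n int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return errors.New("temporary failure")
		}
		return nil
	}
}

func main() {
	attempts, err := withRetries(3, "10ms", flaky(2))
	fmt.Println("attempts:", attempts, "error:", err)
}
```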

func NewRedisConfig added in v3.1.0

func NewRedisConfig() RedisConfig

NewRedisConfig returns a RedisConfig with default values.

type Resource added in v3.6.0

type Resource struct {
	// contains filtered or unexported fields
}

Resource is a processor that returns the result of a processor resource.

func (*Resource) CloseAsync added in v3.6.0

func (r *Resource) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Resource) ProcessMessage added in v3.6.0

func (r *Resource) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Resource) WaitForClose added in v3.6.0

func (r *Resource) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type SQL

type SQL struct {
	// contains filtered or unexported fields
}

SQL is a processor that executes an SQL query for each message.

func (*SQL) CloseAsync

func (s *SQL) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*SQL) ProcessMessage

func (s *SQL) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*SQL) WaitForClose

func (s *SQL) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type SQLConfig

type SQLConfig struct {
	Driver         string   `json:"driver" yaml:"driver"`
	DataSourceName string   `json:"data_source_name" yaml:"data_source_name"`
	DSN            string   `json:"dsn" yaml:"dsn"`
	Query          string   `json:"query" yaml:"query"`
	Args           []string `json:"args" yaml:"args"`
	ResultCodec    string   `json:"result_codec" yaml:"result_codec"`
}

SQLConfig contains configuration fields for the SQL processor.

func NewSQLConfig

func NewSQLConfig() SQLConfig

NewSQLConfig returns a SQLConfig with default values.

type Sample

type Sample struct {
	// contains filtered or unexported fields
}

Sample is a processor that drops messages based on a random sample.

func (*Sample) CloseAsync

func (s *Sample) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Sample) ProcessMessage

func (s *Sample) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Sample) WaitForClose

func (s *Sample) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type SampleConfig

type SampleConfig struct {
	Retain     float64 `json:"retain" yaml:"retain"`
	RandomSeed int64   `json:"seed" yaml:"seed"`
}

SampleConfig contains configuration fields for the Sample processor.
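To make the two fields concrete, the sketch below shows seeded random sampling. It assumes that the retain value is a percentage between 0 and 100, which this page does not state, and the sampler type is a hypothetical stand-in rather than the package's Sample processor.

```go
package main

import (
	"fmt"
	"math/rand"
)

// sampler is a hypothetical stand-in, not the package's Sample type.
type sampler struct {
	retain float64    // fraction of messages to keep, in the range [0, 1]
	gen    *rand.Rand // seeded generator so that runs are reproducible
}

// newSampler takes retain as a percentage (an assumption of this sketch).
func newSampler(retain float64, seed int64) *sampler {
	return &sampler{retain: retain / 100, gen: rand.New(rand.NewSource(seed))}
}

// keep reports whether the next message should be retained.
func (s *sampler) keep() bool {
	return s.gen.Float64() < s.retain
}

// countKept draws n samples and returns how many were retained.
func countKept(s *sampler, n int) int {
	kept := 0
	for i := 0; i < n; i++ {
		if s.keep() {
			kept++
		}
	}
	return kept
}

func main() {
	fmt.Println(countKept(newSampler(10, 0), 1000), "of 1000 retained")
}
```

Using a fixed seed makes the retained subset repeatable across runs, which can help when testing a pipeline.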

func NewSampleConfig

func NewSampleConfig() SampleConfig

NewSampleConfig returns a SampleConfig with default values.

type SelectParts

type SelectParts struct {
	// contains filtered or unexported fields
}

SelectParts is a processor that selects parts from a message to append to a new message.

func (*SelectParts) CloseAsync

func (m *SelectParts) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*SelectParts) ProcessMessage

func (m *SelectParts) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*SelectParts) WaitForClose

func (m *SelectParts) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type SelectPartsConfig

type SelectPartsConfig struct {
	Parts []int `json:"parts" yaml:"parts"`
}

SelectPartsConfig contains configuration fields for the SelectParts processor.
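As an illustration of selecting parts by index, the sketch below builds a new list of parts from the listed indexes. Treating negative indexes as counting from the end, and skipping indexes that are out of range, are assumptions of this sketch; selectParts is a hypothetical helper.

```go
package main

import "fmt"

// selectParts returns the parts at the given indexes, in the order listed.
// Negative indexes count from the end (an assumption of this sketch), and
// indexes that fall outside the message are skipped.
func selectParts(parts []string, indexes []int) []string {
	selected := make([]string, 0, len(indexes))
	for _, idx := range indexes {
		if idx < 0 {
			idx += len(parts)
		}
		if idx < 0 || idx >= len(parts) {
			continue
		}
		selected = append(selected, parts[idx])
	}
	return selected
}

func main() {
	fmt.Println(selectParts([]string{"a", "b", "c"}, []int{0, -1, 7}))
}
```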

func NewSelectPartsConfig

func NewSelectPartsConfig() SelectPartsConfig

NewSelectPartsConfig returns a SelectPartsConfig with default values.

type Sleep

type Sleep struct {
	// contains filtered or unexported fields
}

Sleep is a processor that pauses for the period specified by its duration field each time it processes a message batch.

func (*Sleep) CloseAsync

func (s *Sleep) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Sleep) ProcessMessage

func (s *Sleep) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Sleep) WaitForClose

func (s *Sleep) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type SleepConfig

type SleepConfig struct {
	Duration string `json:"duration" yaml:"duration"`
}

SleepConfig contains configuration fields for the Sleep processor.

func NewSleepConfig

func NewSleepConfig() SleepConfig

NewSleepConfig returns a SleepConfig with default values.

type Split

type Split struct {
	// contains filtered or unexported fields
}

Split is a processor that splits messages into a message per part.

func (*Split) CloseAsync

func (s *Split) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Split) ProcessMessage

func (s *Split) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Split) WaitForClose

func (s *Split) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type SplitConfig

type SplitConfig struct {
	Size     int `json:"size" yaml:"size"`
	ByteSize int `json:"byte_size" yaml:"byte_size"`
}

SplitConfig is a configuration struct containing fields for the Split processor, which breaks message batches down into batches of a smaller size.
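The size-based case can be sketched as follows. The splitBatch helper is hypothetical, it treats a size below 1 as 1, and it ignores the byte_size field entirely.

```go
package main

import "fmt"

// splitBatch breaks a batch of message parts into consecutive batches of at
// most size parts. A size below 1 is treated as 1 (an assumption of this sketch).
func splitBatch(parts [][]byte, size int) [][][]byte {
	if size < 1 {
		size = 1
	}
	var batches [][][]byte
	for start := 0; start < len(parts); start += size {
		end := start + size
		if end > len(parts) {
			end = len(parts)
		}
		batches = append(batches, parts[start:end])
	}
	return batches
}

func main() {
	parts := [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d"), []byte("e")}
	for i, batch := range splitBatch(parts, 2) {
		fmt.Printf("batch %d has %d part(s)\n", i, len(batch))
	}
}
```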

func NewSplitConfig

func NewSplitConfig() SplitConfig

NewSplitConfig returns a SplitConfig with default values.

type Subprocess

type Subprocess struct {
	// contains filtered or unexported fields
}

Subprocess is a processor that executes a command.

func (*Subprocess) CloseAsync

func (e *Subprocess) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Subprocess) ProcessMessage

func (e *Subprocess) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Subprocess) WaitForClose

func (e *Subprocess) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type SubprocessConfig

type SubprocessConfig struct {
	Parts     []int    `json:"parts" yaml:"parts"`
	Name      string   `json:"name" yaml:"name"`
	Args      []string `json:"args" yaml:"args"`
	MaxBuffer int      `json:"max_buffer" yaml:"max_buffer"`
}

SubprocessConfig contains configuration fields for the Subprocess processor.

func NewSubprocessConfig

func NewSubprocessConfig() SubprocessConfig

NewSubprocessConfig returns a SubprocessConfig with default values.

type Switch

type Switch struct {
	// contains filtered or unexported fields
}

Switch is a processor that only applies child processors under a certain condition.

func (*Switch) CloseAsync

func (s *Switch) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Switch) ProcessMessage

func (s *Switch) ProcessMessage(msg types.Message) (msgs []types.Message, res types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Switch) WaitForClose

func (s *Switch) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type SwitchCaseConfig

type SwitchCaseConfig struct {
	Condition   condition.Config `json:"condition" yaml:"condition"`
	Processors  []Config         `json:"processors" yaml:"processors"`
	Fallthrough bool             `json:"fallthrough" yaml:"fallthrough"`
}

SwitchCaseConfig contains a condition, processors and other fields for an individual case in the Switch processor.
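The interaction between the condition and fallthrough fields can be sketched roughly as below. The sketch assumes that the first case whose condition passes is executed, and that a true fallthrough causes the following case to run as well without checking its condition. The switchCase type and runSwitch function are hypothetical and operate on plain strings.

```go
package main

import (
	"fmt"
	"strings"
)

// switchCase is a hypothetical, simplified stand-in for SwitchCaseConfig.
type switchCase struct {
	condition   func(msg string) bool
	process     func(msg string) string
	fallThrough bool
}

// runSwitch executes the first matching case, then keeps executing the
// following cases for as long as the executed case has fallThrough set.
func runSwitch(msg string, cases []switchCase) string {
	for i := 0; i < len(cases); i++ {
		if !cases[i].condition(msg) {
			continue
		}
		for ; i < len(cases); i++ {
			msg = cases[i].process(msg)
			if !cases[i].fallThrough {
				break
			}
		}
		return msg
	}
	return msg // no case matched, so the message is unchanged
}

// appendTo returns a processing step that appends suffix to a message.
func appendTo(suffix string) func(string) string {
	return func(msg string) string { return msg + suffix }
}

// exampleCases builds three cases used by main.
func exampleCases() []switchCase {
	return []switchCase{
		{condition: func(m string) bool { return strings.HasPrefix(m, "a") }, process: appendTo("1"), fallThrough: true},
		{condition: func(string) bool { return false }, process: appendTo("2")},
		{condition: func(string) bool { return true }, process: appendTo("3")},
	}
}

func main() {
	fmt.Println(runSwitch("a", exampleCases()))
	fmt.Println(runSwitch("b", exampleCases()))
}
```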

func NewSwitchCaseConfig

func NewSwitchCaseConfig() SwitchCaseConfig

NewSwitchCaseConfig returns a new SwitchCaseConfig with default values.

func (*SwitchCaseConfig) UnmarshalJSON

func (s *SwitchCaseConfig) UnmarshalJSON(bytes []byte) error

UnmarshalJSON ensures that when parsing configs that are in a map or slice the default values are still applied.

func (*SwitchCaseConfig) UnmarshalYAML

func (s *SwitchCaseConfig) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.

type SwitchConfig

type SwitchConfig []SwitchCaseConfig

SwitchConfig is a config struct containing fields for the Switch processor.

func NewSwitchConfig

func NewSwitchConfig() SwitchConfig

NewSwitchConfig returns a default SwitchConfig.

type SyncResponse added in v3.7.0

type SyncResponse struct {
	// contains filtered or unexported fields
}

SyncResponse is a processor that stores the message in its current state as a synchronous response for the input source.

func (*SyncResponse) CloseAsync added in v3.7.0

func (s *SyncResponse) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*SyncResponse) ProcessMessage added in v3.7.0

func (s *SyncResponse) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage stores the message as a synchronous response and returns the message unchanged.

func (*SyncResponse) WaitForClose added in v3.7.0

func (s *SyncResponse) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type SyncResponseConfig added in v3.7.0

type SyncResponseConfig struct{}

SyncResponseConfig contains configuration fields for the SyncResponse processor.

func NewSyncResponseConfig added in v3.7.0

func NewSyncResponseConfig() SyncResponseConfig

NewSyncResponseConfig returns a SyncResponseConfig with default values.

type Text

type Text struct {
	// contains filtered or unexported fields
}

Text is a processor that performs a text based operation on a payload.

func (*Text) CloseAsync

func (t *Text) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Text) ProcessMessage

func (t *Text) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Text) WaitForClose

func (t *Text) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type TextConfig

type TextConfig struct {
	Parts    []int  `json:"parts" yaml:"parts"`
	Operator string `json:"operator" yaml:"operator"`
	Arg      string `json:"arg" yaml:"arg"`
	Value    string `json:"value" yaml:"value"`
}

TextConfig contains configuration fields for the Text processor.

func NewTextConfig

func NewTextConfig() TextConfig

NewTextConfig returns a TextConfig with default values.

type Throttle

type Throttle struct {
	// contains filtered or unexported fields
}

Throttle is a processor that limits the stream of a pipeline to one message batch per period specified.

func (*Throttle) CloseAsync

func (m *Throttle) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Throttle) ProcessMessage

func (m *Throttle) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Throttle) WaitForClose

func (m *Throttle) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type ThrottleConfig

type ThrottleConfig struct {
	Period string `json:"period" yaml:"period"`
}

ThrottleConfig contains configuration fields for the Throttle processor.

func NewThrottleConfig

func NewThrottleConfig() ThrottleConfig

NewThrottleConfig returns a ThrottleConfig with default values.

type Try

type Try struct {
	// contains filtered or unexported fields
}

Try is a processor that applies a list of child processors to each message of a batch individually, where processors are skipped for messages that failed a previous processor step.
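The skipping behaviour can be illustrated with a small sketch. The message and step types below are hypothetical stand-ins, and the failed flag is an assumption about how a failure might be tracked per message.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for a single message of a batch.
type message struct {
	body   string
	failed bool // set once any step has failed for this message
}

// step is a hypothetical child processor operating on one message body.
type step func(body string) (string, error)

// try applies steps in order to each message individually. Once a step fails
// for a message, the remaining steps are skipped for that message only.
func try(batch []message, steps ...step) []message {
	out := make([]message, len(batch))
	for i, msg := range batch {
		for _, s := range steps {
			if msg.failed {
				break
			}
			body, err := s(msg.body)
			if err != nil {
				msg.failed = true
				continue
			}
			msg.body = body
		}
		out[i] = msg
	}
	return out
}

// rejectEmpty fails for messages with an empty body.
func rejectEmpty(body string) (string, error) {
	if body == "" {
		return "", errors.New("empty body")
	}
	return body, nil
}

// exclaim appends an exclamation mark to the body.
func exclaim(body string) (string, error) { return body + "!", nil }

func main() {
	for _, m := range try([]message{{body: "a"}, {body: ""}}, rejectEmpty, exclaim) {
		fmt.Printf("body=%q failed=%v\n", m.body, m.failed)
	}
}
```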

func (*Try) CloseAsync

func (p *Try) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Try) ProcessMessage

func (p *Try) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Try) WaitForClose

func (p *Try) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type TryConfig

type TryConfig []Config

TryConfig is a config struct containing fields for the Try processor.

func NewTryConfig

func NewTryConfig() TryConfig

NewTryConfig returns a default TryConfig.

type Type

type Type interface {
	// ProcessMessage attempts to process a message. Since processing can fail
	// this call returns both a slice of messages in case of success or a
	// response in case of failure. If the slice of messages is empty the
	// response should be returned to the source.
	ProcessMessage(msg types.Message) ([]types.Message, types.Response)

	types.Closable
}

Type reads a message, performs a processing operation, and returns either a slice of messages resulting from the process to be propagated through the pipeline, or a response that should be sent back to the source instead.
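To show the shape of that contract without importing the real types package, the sketch below defines local stand-ins (message, response, processor) and a hypothetical dropEmpty processor. It either propagates a message or returns a response for the source; none of these names belong to this package.

```go
package main

import "fmt"

// message and response are hypothetical stand-ins for types.Message and
// types.Response; the real interfaces have more methods.
type message [][]byte

type response interface {
	Error() error
}

// ack is a response that reports success to the source.
type ack struct{}

func (ack) Error() error { return nil }

// processor mirrors the two-result shape of ProcessMessage.
type processor interface {
	ProcessMessage(msg message) ([]message, response)
}

// dropEmpty propagates non-empty messages and acknowledges empty ones.
type dropEmpty struct{}

func (dropEmpty) ProcessMessage(msg message) ([]message, response) {
	if len(msg) == 0 {
		return nil, ack{} // nothing to propagate, so respond to the source
	}
	return []message{msg}, nil
}

// run describes what a caller would do with the two results.
func run(p processor, msg message) string {
	msgs, res := p.ProcessMessage(msg)
	if len(msgs) == 0 {
		return fmt.Sprintf("response returned to source (error: %v)", res.Error())
	}
	return fmt.Sprintf("%d message(s) propagated", len(msgs))
}

func main() {
	fmt.Println(run(dropEmpty{}, message{[]byte("hello")}))
	fmt.Println(run(dropEmpty{}, message{}))
}
```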

func New

func New(
	conf Config,
	mgr types.Manager,
	log log.Modular,
	stats metrics.Type,
) (Type, error)

New creates a processor type based on a processor configuration.
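Dispatching on a configured type name is commonly done with a map of constructors. The sketch below illustrates that idea with hypothetical config, processor and constructor definitions; it is not the package's implementation, and the error message is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// processor is a hypothetical stand-in for the Type interface.
type processor interface {
	Process(in string) string
}

type upperProc struct{}

func (upperProc) Process(in string) string { return strings.ToUpper(in) }

type noopProc struct{}

func (noopProc) Process(in string) string { return in }

// config is a hypothetical config whose Type field selects a constructor.
type config struct {
	Type string
}

// constructors maps type names to functions that build a processor.
var constructors = map[string]func(config) (processor, error){
	"noop":  func(config) (processor, error) { return noopProc{}, nil },
	"upper": func(config) (processor, error) { return upperProc{}, nil },
}

// newProcessor looks up the constructor for conf.Type and calls it.
func newProcessor(conf config) (processor, error) {
	ctor, ok := constructors[conf.Type]
	if !ok {
		return nil, fmt.Errorf("processor type %q not recognised", conf.Type)
	}
	return ctor(conf)
}

func main() {
	p, err := newProcessor(config{Type: "upper"})
	if err != nil {
		panic(err)
	}
	fmt.Println(p.Process("hello"))
}
```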

func NewAWK

func NewAWK(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewAWK returns an AWK processor.

func NewArchive

func NewArchive(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewArchive returns an Archive processor.

func NewAvro

func NewAvro(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewAvro returns an Avro processor.

func NewBatch

func NewBatch(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewBatch returns a Batch processor.

func NewBloblang added in v3.13.0

func NewBloblang(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewBloblang returns a Bloblang processor.

func NewBoundsCheck

func NewBoundsCheck(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewBoundsCheck returns a BoundsCheck processor.

func NewBranch added in v3.25.0

func NewBranch(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewBranch creates a new branch processor.

func NewCache

func NewCache(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewCache returns a Cache processor.

func NewCatch

func NewCatch(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewCatch returns a Catch processor.

func NewCompress

func NewCompress(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewCompress returns a Compress processor.

func NewConditional

func NewConditional(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewConditional returns a Conditional processor.

func NewDecode

func NewDecode(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewDecode returns a Decode processor.

func NewDecompress

func NewDecompress(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewDecompress returns a Decompress processor.

func NewDedupe

func NewDedupe(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewDedupe returns a Dedupe processor.

func NewEncode

func NewEncode(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewEncode returns an Encode processor.

func NewFilter

func NewFilter(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewFilter returns a Filter processor.

func NewFilterParts

func NewFilterParts(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewFilterParts returns a FilterParts processor.

func NewForEach

func NewForEach(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewForEach returns a ForEach processor.

func NewGrok

func NewGrok(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewGrok returns a Grok processor.

func NewGroupBy

func NewGroupBy(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewGroupBy returns a GroupBy processor.

func NewGroupByValue

func NewGroupByValue(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewGroupByValue returns a GroupByValue processor.

func NewHTTP

func NewHTTP(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewHTTP returns an HTTP processor.

func NewHash

func NewHash(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewHash returns a Hash processor.

func NewHashSample

func NewHashSample(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewHashSample returns a HashSample processor.

func NewInsertPart

func NewInsertPart(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewInsertPart returns an InsertPart processor.

func NewJMESPath

func NewJMESPath(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewJMESPath returns a JMESPath processor.

func NewJQ added in v3.27.0

func NewJQ(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewJQ returns a JQ processor.

func NewJSON

func NewJSON(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewJSON returns a JSON processor.

func NewJSONSchema added in v3.5.0

func NewJSONSchema(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewJSONSchema returns a JSONSchema processor.

func NewLambda

func NewLambda(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewLambda returns a Lambda processor.

func NewLog

func NewLog(
	conf Config, mgr types.Manager, logger log.Modular, stats metrics.Type,
) (Type, error)

NewLog returns a Log processor.

func NewMergeJSON

func NewMergeJSON(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewMergeJSON returns a MergeJSON processor.

func NewMetadata

func NewMetadata(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewMetadata returns a Metadata processor.

func NewMetric

func NewMetric(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewMetric returns a Metric processor.

func NewNoop

func NewNoop(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewNoop returns a Noop processor.

func NewNumber

func NewNumber(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewNumber returns a Number processor.

func NewParallel

func NewParallel(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewParallel returns a Parallel processor.

func NewParseLog added in v3.10.0

func NewParseLog(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewParseLog returns a ParseLog processor.

func NewProcessBatch

func NewProcessBatch(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewProcessBatch returns a ForEach processor.

func NewProcessDAG

func NewProcessDAG(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewProcessDAG returns a ProcessDAG processor.

func NewProcessField

func NewProcessField(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewProcessField returns a ProcessField processor.

func NewProtobuf added in v3.25.0

func NewProtobuf(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewProtobuf returns a Protobuf processor.

func NewRateLimit

func NewRateLimit(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewRateLimit returns a RateLimit processor.

func NewRedis added in v3.1.0

func NewRedis(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewRedis returns a Redis processor.

func NewResource added in v3.6.0

func NewResource(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewResource returns a resource processor.

func NewSQL

func NewSQL(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSQL returns a SQL processor.

func NewSample

func NewSample(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSample returns a Sample processor.

func NewSelectParts

func NewSelectParts(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSelectParts returns a SelectParts processor.

func NewSleep

func NewSleep(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSleep returns a Sleep processor.

func NewSplit

func NewSplit(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSplit returns a Split processor.

func NewSubprocess

func NewSubprocess(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSubprocess returns a Subprocess processor.

func NewSwitch

func NewSwitch(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSwitch returns a Switch processor.

func NewSyncResponse added in v3.7.0

func NewSyncResponse(
	conf Config, mgr types.Manager, logger log.Modular, stats metrics.Type,
) (Type, error)

NewSyncResponse returns a SyncResponse processor.

func NewText

func NewText(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewText returns a Text processor.

func NewThrottle

func NewThrottle(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewThrottle returns a Throttle processor.

func NewTry

func NewTry(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewTry returns a Try processor.

func NewUnarchive

func NewUnarchive(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewUnarchive returns an Unarchive processor.

func NewWhile

func NewWhile(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewWhile returns a While processor.

func NewWorkflow added in v3.6.0

func NewWorkflow(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewWorkflow returns a new workflow processor.

func NewXML

func NewXML(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewXML returns an XML processor.

type TypeSpec

type TypeSpec struct {

	// UsesBatches indicates whether this processor's functionality is best
	// applied on messages that are already batched.
	UsesBatches bool

	Summary     string
	Description string
	Categories  []Category
	Footnotes   string
	FieldSpecs  docs.FieldSpecs
	Examples    []docs.AnnotatedExample
	Beta        bool
	Deprecated  bool
	// contains filtered or unexported fields
}

TypeSpec is a constructor and a usage description for each processor type.

type Unarchive

type Unarchive struct {
	// contains filtered or unexported fields
}

Unarchive is a processor that can selectively unarchive parts of a message following a chosen archive type.

func (*Unarchive) CloseAsync

func (d *Unarchive) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Unarchive) ProcessMessage

func (d *Unarchive) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Unarchive) WaitForClose

func (d *Unarchive) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type UnarchiveConfig

type UnarchiveConfig struct {
	Format string `json:"format" yaml:"format"`
	Parts  []int  `json:"parts" yaml:"parts"`
}

UnarchiveConfig contains configuration fields for the Unarchive processor.

func NewUnarchiveConfig

func NewUnarchiveConfig() UnarchiveConfig

NewUnarchiveConfig returns an UnarchiveConfig with default values.

type While

type While struct {
	// contains filtered or unexported fields
}

While is a processor that applies child processors for as long as a child condition resolves to true.

func (*While) CloseAsync

func (w *While) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*While) ProcessMessage

func (w *While) ProcessMessage(msg types.Message) (msgs []types.Message, res types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*While) WaitForClose

func (w *While) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type WhileConfig

type WhileConfig struct {
	AtLeastOnce bool             `json:"at_least_once" yaml:"at_least_once"`
	MaxLoops    int              `json:"max_loops" yaml:"max_loops"`
	Condition   condition.Config `json:"condition" yaml:"condition"`
	Processors  []Config         `json:"processors" yaml:"processors"`
}

WhileConfig is a config struct containing fields for the While processor.
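The way at_least_once and max_loops might combine with the condition can be sketched as below. The sketch assumes that a max_loops of zero means no limit and that at_least_once forces one pass before the condition is first checked. The whileConfig type and runWhile function are hypothetical and operate on an integer rather than a message.

```go
package main

import "fmt"

// whileConfig is a hypothetical, simplified stand-in for WhileConfig.
type whileConfig struct {
	atLeastOnce bool
	maxLoops    int // zero means no limit (an assumption of this sketch)
}

// runWhile applies body to value for as long as cond holds, and returns the
// final value together with the number of loops executed.
func runWhile(cfg whileConfig, value int, cond func(int) bool, body func(int) int) (int, int) {
	loops := 0
	for (cfg.atLeastOnce && loops == 0) || cond(value) {
		if cfg.maxLoops > 0 && loops >= cfg.maxLoops {
			break
		}
		value = body(value)
		loops++
	}
	return value, loops
}

// below10 is the example condition.
func below10(v int) bool { return v < 10 }

// double is the example child processor.
func double(v int) int { return v * 2 }

func main() {
	value, loops := runWhile(whileConfig{}, 1, below10, double)
	fmt.Println("value:", value, "loops:", loops)
}
```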

func NewWhileConfig

func NewWhileConfig() WhileConfig

NewWhileConfig returns a default WhileConfig.

type Workflow added in v3.6.0

type Workflow struct {
	// contains filtered or unexported fields
}

Workflow is a processor that applies a list of child processors to a new payload mapped from the original, and after processing attempts to overlay the results back onto the original payloads according to more mappings.

func (*Workflow) CloseAsync added in v3.6.0

func (w *Workflow) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Workflow) ProcessMessage added in v3.6.0

func (w *Workflow) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies workflow stages to each part of a message type.

func (*Workflow) WaitForClose added in v3.6.0

func (w *Workflow) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type WorkflowConfig added in v3.6.0

type WorkflowConfig struct {
	MetaPath string                         `json:"meta_path" yaml:"meta_path"`
	Order    [][]string                     `json:"order" yaml:"order"`
	Branches map[string]BranchConfig        `json:"branches" yaml:"branches"`
	Stages   map[string]DepProcessMapConfig `json:"stages" yaml:"stages"`
}

WorkflowConfig is a config struct containing fields for the Workflow processor.
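The Order field is a list of lists, which suggests tiers of stages. The sketch below assumes that tiers run one after another while the stages inside a tier run concurrently; that reading is an assumption, and runOrder and recordOrder are hypothetical helpers with made-up stage names.

```go
package main

import (
	"fmt"
	"sync"
)

// runOrder executes each tier in sequence. Stages within a tier run
// concurrently, and a tier only starts once the previous one has finished.
func runOrder(order [][]string, stage func(name string)) {
	for _, tier := range order {
		var wg sync.WaitGroup
		for _, name := range tier {
			wg.Add(1)
			go func(n string) {
				defer wg.Done()
				stage(n)
			}(name)
		}
		wg.Wait()
	}
}

// recordOrder runs the tiers and returns stage names in completion order.
func recordOrder(order [][]string) []string {
	var mut sync.Mutex
	var done []string
	runOrder(order, func(name string) {
		mut.Lock()
		defer mut.Unlock()
		done = append(done, name)
	})
	return done
}

func main() {
	fmt.Println(recordOrder([][]string{{"fetch_a", "fetch_b"}, {"merge"}}))
}
```

The relative order of stages within one tier is not deterministic in this sketch; only the ordering between tiers is guaranteed.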

func NewWorkflowConfig added in v3.6.0

func NewWorkflowConfig() WorkflowConfig

NewWorkflowConfig returns a default WorkflowConfig.

type XML

type XML struct {
	// contains filtered or unexported fields
}

XML is a processor that performs an operation on an XML payload.

func (*XML) CloseAsync

func (p *XML) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*XML) ProcessMessage

func (p *XML) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*XML) WaitForClose

func (p *XML) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type XMLConfig

type XMLConfig struct {
	Parts    []int  `json:"parts" yaml:"parts"`
	Operator string `json:"operator" yaml:"operator"`
}

XMLConfig contains configuration fields for the XML processor.

func NewXMLConfig

func NewXMLConfig() XMLConfig

NewXMLConfig returns an XMLConfig with default values.
