processor

package
v4.0.0-...-13a3402
Warning

This package is not in the latest version of its module.

Published: Aug 27, 2023 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExecuteAll

func ExecuteAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)

ExecuteAll attempts to apply a slice of processors to one or more message batches. It returns the N resulting batches or an error. The error may indicate either a NoAck, in the event of the message being buffered, or an unrecoverable failure.
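The chaining behaviour can be sketched with simplified stand-in types. `Batch`, `Proc`, `executeAll`, `upper`, `oneEach` and `demo` are hypothetical names used for illustration only; the real function works on message.Batch and V1, and its handling of errors and filtered batches may differ from this sketch.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Batch is a hypothetical stand-in for message.Batch.
type Batch []string

// Proc is a hypothetical stand-in for the V1 interface.
type Proc interface {
	ProcessBatch(ctx context.Context, b Batch) ([]Batch, error)
}

// executeAll passes every batch through each processor in turn; the batches
// produced by one processor become the input of the next.
func executeAll(ctx context.Context, procs []Proc, batches ...Batch) ([]Batch, error) {
	current := batches
	for _, p := range procs {
		var next []Batch
		for _, b := range current {
			out, err := p.ProcessBatch(ctx, b)
			if err != nil {
				return nil, err
			}
			next = append(next, out...)
		}
		current = next
		if len(current) == 0 {
			break // every message was filtered, nothing left to process
		}
	}
	return current, nil
}

// upper uppercases each message in a batch.
type upper struct{}

func (upper) ProcessBatch(_ context.Context, b Batch) ([]Batch, error) {
	out := make(Batch, len(b))
	for i, s := range b {
		out[i] = strings.ToUpper(s)
	}
	return []Batch{out}, nil
}

// oneEach emits every message as its own single-message batch.
type oneEach struct{}

func (oneEach) ProcessBatch(_ context.Context, b Batch) ([]Batch, error) {
	out := make([]Batch, 0, len(b))
	for _, s := range b {
		out = append(out, Batch{s})
	}
	return out, nil
}

// demo runs both processors over a single two-message batch.
func demo() ([]Batch, error) {
	return executeAll(context.Background(), []Proc{upper{}, oneEach{}}, Batch{"a", "b"})
}

func main() {
	res, err := demo()
	fmt.Println(res, err)
}
```

One input batch can therefore become several output batches, which is why the function returns a slice.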

func ExecuteCatchAll

func ExecuteCatchAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)

ExecuteCatchAll attempts to apply a slice of processors only to messages that have failed a processing step. It returns the N resulting batches or an error.
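A minimal sketch of the catch behaviour, assuming a simplified message type that carries its own error flag. `Part`, `Step`, `executeCatchAll` and `demo` are hypothetical stand-ins, not the package's real types, and whether the real function clears the error flag afterwards is not stated here; in this sketch the recovery step clears it itself.

```go
package main

import (
	"errors"
	"fmt"
)

// Part is a hypothetical stand-in for message.Part with an error flag.
type Part struct {
	Data string
	Err  error
}

// Step is a hypothetical per-message processing step.
type Step func(p *Part)

// executeCatchAll applies the steps only to parts that are marked as failed
// when it is called; healthy parts pass through untouched.
func executeCatchAll(steps []Step, parts []*Part) {
	for _, part := range parts {
		if part.Err == nil {
			continue // healthy messages bypass the catch steps
		}
		for _, step := range steps {
			step(part)
		}
	}
}

// demo runs one recovery step over a healthy and a failed message.
func demo() []*Part {
	parts := []*Part{
		{Data: "ok"},
		{Data: "bad", Err: errors.New("parse failure")},
	}
	// The recovery step rewrites the payload and clears the error flag.
	recoverStep := func(p *Part) {
		p.Data = "recovered from: " + p.Err.Error()
		p.Err = nil
	}
	executeCatchAll([]Step{recoverStep}, parts)
	return parts
}

func main() {
	for _, p := range demo() {
		fmt.Printf("%q err=%v\n", p.Data, p.Err)
	}
}
```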

func ExecuteTryAll

func ExecuteTryAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)

ExecuteTryAll attempts to apply a slice of processors to messages. If a message has failed a processing step, it is prevented from being sent to subsequent processors. It returns the N resulting batches or an error. The error may indicate either a NoAck, in the event of the message being buffered, or an unrecoverable failure.
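The skip-on-failure rule can be sketched as follows. `Part`, `Step`, `executeTryAll` and `demo` are hypothetical stand-ins chosen for illustration; the real function operates on message.Batch values and V1 processors.

```go
package main

import (
	"errors"
	"fmt"
)

// Part is a hypothetical stand-in for message.Part with an error flag.
type Part struct {
	Data string
	Err  error
}

// Step is a hypothetical per-message processing step that may fail.
type Step func(p *Part) error

// executeTryAll runs the steps in order. A part that fails a step keeps its
// error and is skipped by every later step.
func executeTryAll(steps []Step, parts []*Part) {
	for _, step := range steps {
		for _, part := range parts {
			if part.Err != nil {
				continue // already failed: do not send to later steps
			}
			if err := step(part); err != nil {
				part.Err = err
			}
		}
	}
}

// demo runs a validating step followed by a mutating step.
func demo() []*Part {
	parts := []*Part{{Data: "a"}, {Data: ""}}
	requireData := func(p *Part) error {
		if p.Data == "" {
			return errors.New("empty payload")
		}
		return nil
	}
	exclaim := func(p *Part) error {
		p.Data += "!"
		return nil
	}
	executeTryAll([]Step{requireData, exclaim}, parts)
	return parts
}

func main() {
	for _, p := range demo() {
		fmt.Printf("%q err=%v\n", p.Data, p.Err)
	}
}
```

The second message fails validation, so the mutating step never sees it.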

func MarkErr

func MarkErr(part *message.Part, span *tracing.Span, err error)

MarkErr marks a message part as having failed. This includes modifying metadata to contain this error as well as adding the error to a tracing span if the message has one.

Types

type AutoObserved

type AutoObserved interface {
	// Process a message into one or more resulting messages, or return an error
	// if one occurred during processing, in which case the message will
	// continue unchanged except for having that error now affiliated with it.
	//
	// If zero messages are returned and the error is nil then the message is
	// filtered.
	Process(ctx context.Context, p *message.Part) ([]*message.Part, error)

	// Close the component, blocks until either the underlying resources are
	// cleaned up or the context is cancelled. Returns an error if the context
	// is cancelled.
	Close(ctx context.Context) error
}

AutoObserved is a simpler processor interface to implement than V1 as it is not required to emit observability information within the implementation itself.
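As an illustration of the contract in the comments above, here is a minimal sketch of an implementation. `Part` is a hypothetical stand-in for message.Part, the local `AutoObserved` interface mirrors the documented one with that stand-in type, and `upperProc` and `describe` are invented for the example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// Part is a hypothetical stand-in for message.Part.
type Part struct{ Data []byte }

// AutoObserved mirrors the documented interface using the stand-in Part.
type AutoObserved interface {
	Process(ctx context.Context, p *Part) ([]*Part, error)
	Close(ctx context.Context) error
}

// upperProc uppercases payloads, filters empty ones and rejects "bad".
type upperProc struct{}

func (upperProc) Process(_ context.Context, p *Part) ([]*Part, error) {
	s := string(p.Data)
	if s == "" {
		return nil, nil // zero results and a nil error: the message is filtered
	}
	if s == "bad" {
		return nil, errors.New("rejected payload") // the message continues, flagged
	}
	return []*Part{{Data: []byte(strings.ToUpper(s))}}, nil
}

// Close has no resources to release in this sketch.
func (upperProc) Close(context.Context) error { return nil }

// describe reports which of the three outcomes a payload produced.
func describe(proc AutoObserved, in string) string {
	out, err := proc.Process(context.Background(), &Part{Data: []byte(in)})
	switch {
	case err != nil:
		return "error: " + err.Error()
	case len(out) == 0:
		return "filtered"
	default:
		return string(out[0].Data)
	}
}

func main() {
	var proc AutoObserved = upperProc{}
	defer proc.Close(context.Background())
	for _, in := range []string{"hi", "", "bad"} {
		fmt.Println(describe(proc, in))
	}
}
```

The implementation contains no metrics, logging or tracing calls; that is the part a wrapper is expected to supply.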

type AutoObservedBatched

type AutoObservedBatched interface {
	// Process a batch of messages into one or more resulting batches, or return
	// an error if one occurred during processing, in which case all messages
	// will continue unchanged except for having that error now affiliated with
	// them.
	//
	// In order to associate individual messages with an error please use
	// ctx.OnError instead of msg.ErrorSet. They are similar, but using
	// ctx.OnError ensures observability data is updated as well as the message
	// being affiliated with the error.
	//
	// If zero message batches are returned and the error is nil then all
	// messages are filtered.
	ProcessBatch(ctx *BatchProcContext, b message.Batch) ([]message.Batch, error)

	// Close the component, blocks until either the underlying resources are
	// cleaned up or the context is cancelled. Returns an error if the context
	// is cancelled.
	Close(ctx context.Context) error
}

AutoObservedBatched is a simpler processor interface to implement than V1 as it is not required to emit observability information within the implementation itself.

type BatchProcContext

type BatchProcContext struct {
	// contains filtered or unexported fields
}

BatchProcContext provides methods for triggering observability updates and accessing processor specific spans.

func TestBatchProcContext

func TestBatchProcContext(ctx context.Context, spans []*tracing.Span, parts []*message.Part) *BatchProcContext

TestBatchProcContext creates a context for batch processors. It is safe to provide nil spans and parts for testing purposes.

func (*BatchProcContext) Context

func (b *BatchProcContext) Context() context.Context

Context returns the underlying processor context.Context.

func (*BatchProcContext) OnError

func (b *BatchProcContext) OnError(err error, index int, p *message.Part)

OnError should be called when an individual message has encountered an error. It should be used instead of .ErrorSet() because it also performs observability updates.

This method can be called with index -1 in order to set generalised observability information without marking specific message errors.
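The two calling modes can be sketched with a simplified context. `batchCtx`, `Part`, `processBatch` and `demo` are hypothetical, and the `errCount` field merely stands in for whatever observability data the real BatchProcContext updates.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// Part is a hypothetical stand-in for message.Part with an error flag.
type Part struct {
	Data string
	Err  error
}

// batchCtx is a hypothetical stand-in for BatchProcContext.
type batchCtx struct {
	parts    []*Part
	errCount int // stands in for an error metric
}

// OnError always updates the counter. A message is marked only when a part
// is supplied or the index refers to a part in the batch.
func (b *batchCtx) OnError(err error, index int, p *Part) {
	b.errCount++
	if p == nil && index >= 0 && index < len(b.parts) {
		p = b.parts[index]
	}
	if p != nil {
		p.Err = err
	}
}

// processBatch flags every part whose payload is not an integer.
func processBatch(ctx *batchCtx) {
	for i, p := range ctx.parts {
		if _, err := strconv.Atoi(p.Data); err != nil {
			ctx.OnError(err, i, p)
		}
	}
}

// demo reports one per-message error and one batch-wide error.
func demo() *batchCtx {
	ctx := &batchCtx{parts: []*Part{{Data: "1"}, {Data: "x"}, {Data: "3"}}}
	processBatch(ctx)
	// Index -1 with a nil part: counted, but no message is marked.
	ctx.OnError(errors.New("upstream unavailable"), -1, nil)
	return ctx
}

func main() {
	ctx := demo()
	fmt.Println("errors observed:", ctx.errCount)
	for i, p := range ctx.parts {
		fmt.Println(i, p.Data, "failed:", p.Err != nil)
	}
}
```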

func (*BatchProcContext) Span

func (b *BatchProcContext) Span(index int) *tracing.Span

Span returns a span created specifically for the invocation of the processor. This can be used in order to add context to what the processor did.

type BoundsCheckConfig

type BoundsCheckConfig struct {
	MaxParts    int `json:"max_parts" yaml:"max_parts"`
	MinParts    int `json:"min_parts" yaml:"min_parts"`
	MaxPartSize int `json:"max_part_size" yaml:"max_part_size"`
	MinPartSize int `json:"min_part_size" yaml:"min_part_size"`
}

BoundsCheckConfig contains configuration fields for the BoundsCheck processor.

func NewBoundsCheckConfig

func NewBoundsCheckConfig() BoundsCheckConfig

NewBoundsCheckConfig returns a BoundsCheckConfig with default values.

type BranchConfig

type BranchConfig struct {
	RequestMap string   `json:"request_map" yaml:"request_map"`
	Processors []Config `json:"processors" yaml:"processors"`
	ResultMap  string   `json:"result_map" yaml:"result_map"`
}

BranchConfig contains configuration fields for the Branch processor.

func NewBranchConfig

func NewBranchConfig() BranchConfig

NewBranchConfig returns a BranchConfig with default values.

type CacheConfig

type CacheConfig struct {
	Resource string `json:"resource" yaml:"resource"`
	Operator string `json:"operator" yaml:"operator"`
	Key      string `json:"key" yaml:"key"`
	Value    string `json:"value" yaml:"value"`
	TTL      string `json:"ttl" yaml:"ttl"`
}

CacheConfig contains configuration fields for the Cache processor.

func NewCacheConfig

func NewCacheConfig() CacheConfig

NewCacheConfig returns a CacheConfig with default values.

type CompressConfig

type CompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Level     int    `json:"level" yaml:"level"`
}

CompressConfig contains configuration fields for the Compress processor.

func NewCompressConfig

func NewCompressConfig() CompressConfig

NewCompressConfig returns a CompressConfig with default values.

type Config

type Config struct {
	Label        string             `json:"label" yaml:"label"`
	Type         string             `json:"type" yaml:"type"`
	Bloblang     string             `json:"bloblang" yaml:"bloblang"`
	BoundsCheck  BoundsCheckConfig  `json:"bounds_check" yaml:"bounds_check"`
	Branch       BranchConfig       `json:"branch" yaml:"branch"`
	Cache        CacheConfig        `json:"cache" yaml:"cache"`
	Catch        []Config           `json:"catch" yaml:"catch"`
	Compress     CompressConfig     `json:"compress" yaml:"compress"`
	Decompress   DecompressConfig   `json:"decompress" yaml:"decompress"`
	Dedupe       DedupeConfig       `json:"dedupe" yaml:"dedupe"`
	ForEach      []Config           `json:"for_each" yaml:"for_each"`
	Grok         GrokConfig         `json:"grok" yaml:"grok"`
	GroupBy      GroupByConfig      `json:"group_by" yaml:"group_by"`
	GroupByValue GroupByValueConfig `json:"group_by_value" yaml:"group_by_value"`
	InsertPart   InsertPartConfig   `json:"insert_part" yaml:"insert_part"`
	JMESPath     JMESPathConfig     `json:"jmespath" yaml:"jmespath"`
	JQ           JQConfig           `json:"jq" yaml:"jq"`
	JSONSchema   JSONSchemaConfig   `json:"json_schema" yaml:"json_schema"`
	Log          LogConfig          `json:"log" yaml:"log"`
	Metric       MetricConfig       `json:"metric" yaml:"metric"`
	Noop         struct{}           `json:"noop" yaml:"noop"`
	Plugin       any                `json:"plugin,omitempty" yaml:"plugin,omitempty"`
	Parallel     ParallelConfig     `json:"parallel" yaml:"parallel"`
	ParseLog     ParseLogConfig     `json:"parse_log" yaml:"parse_log"`
	RateLimit    RateLimitConfig    `json:"rate_limit" yaml:"rate_limit"`
	Resource     string             `json:"resource" yaml:"resource"`
	SelectParts  SelectPartsConfig  `json:"select_parts" yaml:"select_parts"`
	Sleep        SleepConfig        `json:"sleep" yaml:"sleep"`
	Split        SplitConfig        `json:"split" yaml:"split"`
	Subprocess   SubprocessConfig   `json:"subprocess" yaml:"subprocess"`
	Switch       SwitchConfig       `json:"switch" yaml:"switch"`
	SyncResponse struct{}           `json:"sync_response" yaml:"sync_response"`
	Try          []Config           `json:"try" yaml:"try"`
	While        WhileConfig        `json:"while" yaml:"while"`
	Workflow     WorkflowConfig     `json:"workflow" yaml:"workflow"`
	XML          XMLConfig          `json:"xml" yaml:"xml"`
}

Config is the all-encompassing configuration struct for all processor types.

Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values.

Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.

func (*Config) UnmarshalYAML

func (conf *Config) UnmarshalYAML(value *yaml.Node) error

UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.

type DecompressConfig

type DecompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
}

DecompressConfig contains configuration fields for the Decompress processor.

func NewDecompressConfig

func NewDecompressConfig() DecompressConfig

NewDecompressConfig returns a DecompressConfig with default values.

type DedupeConfig

type DedupeConfig struct {
	Cache          string `json:"cache" yaml:"cache"`
	Key            string `json:"key" yaml:"key"`
	DropOnCacheErr bool   `json:"drop_on_err" yaml:"drop_on_err"`
}

DedupeConfig contains configuration fields for the Dedupe processor.

func NewDedupeConfig

func NewDedupeConfig() DedupeConfig

NewDedupeConfig returns a DedupeConfig with default values.

type GrokConfig

type GrokConfig struct {
	Expressions        []string          `json:"expressions" yaml:"expressions"`
	RemoveEmpty        bool              `json:"remove_empty_values" yaml:"remove_empty_values"`
	NamedOnly          bool              `json:"named_captures_only" yaml:"named_captures_only"`
	UseDefaults        bool              `json:"use_default_patterns" yaml:"use_default_patterns"`
	PatternPaths       []string          `json:"pattern_paths" yaml:"pattern_paths"`
	PatternDefinitions map[string]string `json:"pattern_definitions" yaml:"pattern_definitions"`
}

GrokConfig contains configuration fields for the Grok processor.

func NewGrokConfig

func NewGrokConfig() GrokConfig

NewGrokConfig returns a GrokConfig with default values.

type GroupByConfig

type GroupByConfig []GroupByElement

GroupByConfig is a configuration struct containing fields for the GroupBy processor, which breaks message batches down into N batches of a smaller size according to conditions.

func NewGroupByConfig

func NewGroupByConfig() GroupByConfig

NewGroupByConfig returns a GroupByConfig with default values.

type GroupByElement

type GroupByElement struct {
	Check      string   `json:"check" yaml:"check"`
	Processors []Config `json:"processors" yaml:"processors"`
}

GroupByElement represents a group determined by a condition and a list of group specific processors.

type GroupByValueConfig

type GroupByValueConfig struct {
	Value string `json:"value" yaml:"value"`
}

GroupByValueConfig is a configuration struct containing fields for the GroupByValue processor, which breaks message batches down into N batches of a smaller size according to a function interpolated string evaluated per message part.
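The grouping idea can be sketched on plain strings. `groupByValue`, `firstLetter` and the use of a Go function as the key are assumptions made for illustration; the real processor evaluates an interpolated string per message part, and the order in which it emits groups is not specified here.

```go
package main

import "fmt"

// groupByValue breaks a batch into one batch per distinct key value. In this
// sketch groups are emitted in the order their key is first seen.
func groupByValue(batch []string, key func(string) string) [][]string {
	index := map[string]int{} // key value -> position in groups
	var groups [][]string
	for _, msg := range batch {
		k := key(msg)
		i, ok := index[k]
		if !ok {
			i = len(groups)
			index[k] = i
			groups = append(groups, nil)
		}
		groups[i] = append(groups[i], msg)
	}
	return groups
}

// firstLetter is a hypothetical key function standing in for the
// interpolated Value string.
func firstLetter(s string) string {
	if s == "" {
		return ""
	}
	return s[:1]
}

func main() {
	batch := []string{"apple", "banana", "avocado", "blueberry", "cherry"}
	for _, g := range groupByValue(batch, firstLetter) {
		fmt.Println(g)
	}
}
```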

func NewGroupByValueConfig

func NewGroupByValueConfig() GroupByValueConfig

NewGroupByValueConfig returns a GroupByValueConfig with default values.

type InsertPartConfig

type InsertPartConfig struct {
	Index   int    `json:"index" yaml:"index"`
	Content string `json:"content" yaml:"content"`
}

InsertPartConfig contains configuration fields for the InsertPart processor.

func NewInsertPartConfig

func NewInsertPartConfig() InsertPartConfig

NewInsertPartConfig returns an InsertPartConfig with default values.

type JMESPathConfig

type JMESPathConfig struct {
	Query string `json:"query" yaml:"query"`
}

JMESPathConfig contains configuration fields for the JMESPath processor.

func NewJMESPathConfig

func NewJMESPathConfig() JMESPathConfig

NewJMESPathConfig returns a JMESPathConfig with default values.

type JQConfig

type JQConfig struct {
	Query     string `json:"query" yaml:"query"`
	Raw       bool   `json:"raw" yaml:"raw"`
	OutputRaw bool   `json:"output_raw" yaml:"output_raw"`
}

JQConfig contains configuration fields for the JQ processor.

func NewJQConfig

func NewJQConfig() JQConfig

NewJQConfig returns a JQConfig with default values.

type JSONSchemaConfig

type JSONSchemaConfig struct {
	SchemaPath string `json:"schema_path" yaml:"schema_path"`
	Schema     string `json:"schema" yaml:"schema"`
}

JSONSchemaConfig is a configuration struct containing fields for the jsonschema processor.

func NewJSONSchemaConfig

func NewJSONSchemaConfig() JSONSchemaConfig

NewJSONSchemaConfig returns a JSONSchemaConfig with default values.

type LogConfig

type LogConfig struct {
	Level         string            `json:"level" yaml:"level"`
	Fields        map[string]string `json:"fields" yaml:"fields"`
	FieldsMapping string            `json:"fields_mapping" yaml:"fields_mapping"`
	Message       string            `json:"message" yaml:"message"`
}

LogConfig contains configuration fields for the Log processor.

func NewLogConfig

func NewLogConfig() LogConfig

NewLogConfig returns a LogConfig with default values.

type MetricConfig

type MetricConfig struct {
	Type   string            `json:"type" yaml:"type"`
	Name   string            `json:"name" yaml:"name"`
	Labels map[string]string `json:"labels" yaml:"labels"`
	Value  string            `json:"value" yaml:"value"`
}

MetricConfig contains configuration fields for the Metric processor.

func NewMetricConfig

func NewMetricConfig() MetricConfig

NewMetricConfig returns a MetricConfig with default values.

type ParallelConfig

type ParallelConfig struct {
	Cap        int      `json:"cap" yaml:"cap"`
	Processors []Config `json:"processors" yaml:"processors"`
}

ParallelConfig is a config struct containing fields for the Parallel processor.

func NewParallelConfig

func NewParallelConfig() ParallelConfig

NewParallelConfig returns a default ParallelConfig.

type ParseLogConfig

type ParseLogConfig struct {
	Format       string `json:"format" yaml:"format"`
	Codec        string `json:"codec" yaml:"codec"`
	BestEffort   bool   `json:"best_effort" yaml:"best_effort"`
	WithRFC3339  bool   `json:"allow_rfc3339" yaml:"allow_rfc3339"`
	WithYear     string `json:"default_year" yaml:"default_year"`
	WithTimezone string `json:"default_timezone" yaml:"default_timezone"`
}

ParseLogConfig contains configuration fields for the ParseLog processor.

func NewParseLogConfig

func NewParseLogConfig() ParseLogConfig

NewParseLogConfig returns a ParseLogConfig with default values.

type Pipeline

type Pipeline interface {
	// TransactionChan returns a channel used for consuming transactions from
	// this type. Every transaction received must be resolved before another
	// transaction will be sent.
	TransactionChan() <-chan message.Transaction

	// Consume starts the type receiving transactions from a Transactor.
	Consume(<-chan message.Transaction) error

	// TriggerCloseNow signals that the component should close immediately,
	// messages in flight will be dropped.
	TriggerCloseNow()

	// WaitForClose blocks until the component has closed down or the context is
	// cancelled. Closing occurs either when the input transaction channel is
	// closed and messages are flushed (and acked), or when TriggerCloseNow is
	// called.
	WaitForClose(ctx context.Context) error
}

Pipeline is an interface that defines channel-based consumer and producer methods for streaming data through a processing pipeline.

type PipelineConstructorFunc

type PipelineConstructorFunc func() (Pipeline, error)

PipelineConstructorFunc is a constructor to be called for each parallel stream pipeline thread in order to construct a custom pipeline implementation.

type RateLimitConfig

type RateLimitConfig struct {
	Resource string `json:"resource" yaml:"resource"`
}

RateLimitConfig contains configuration fields for the RateLimit processor.

func NewRateLimitConfig

func NewRateLimitConfig() RateLimitConfig

NewRateLimitConfig returns a RateLimitConfig with default values.

type SelectPartsConfig

type SelectPartsConfig struct {
	Parts []int `json:"parts" yaml:"parts"`
}

SelectPartsConfig contains configuration fields for the SelectParts processor.

func NewSelectPartsConfig

func NewSelectPartsConfig() SelectPartsConfig

NewSelectPartsConfig returns a SelectPartsConfig with default values.

type SleepConfig

type SleepConfig struct {
	Duration string `json:"duration" yaml:"duration"`
}

SleepConfig contains configuration fields for the Sleep processor.

func NewSleepConfig

func NewSleepConfig() SleepConfig

NewSleepConfig returns a SleepConfig with default values.

type SplitConfig

type SplitConfig struct {
	Size     int `json:"size" yaml:"size"`
	ByteSize int `json:"byte_size" yaml:"byte_size"`
}

SplitConfig is a configuration struct containing fields for the Split processor, which breaks message batches down into batches of a smaller size.
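A minimal sketch of splitting by the two limits, using plain strings as messages. The function name `split` and the treatment of zero as "no limit" are assumptions made for this example, not documented behaviour of the Split processor.

```go
package main

import "fmt"

// split breaks a batch into smaller batches. A new batch is started when the
// current one already holds size messages, or when adding the next message
// would push its total payload past byteSize bytes. A limit of zero is
// treated as disabled in this sketch.
func split(batch []string, size, byteSize int) [][]string {
	var out [][]string
	var cur []string
	curBytes := 0
	for _, msg := range batch {
		full := size > 0 && len(cur) >= size
		tooBig := byteSize > 0 && len(cur) > 0 && curBytes+len(msg) > byteSize
		if full || tooBig {
			out = append(out, cur)
			cur, curBytes = nil, 0
		}
		cur = append(cur, msg)
		curBytes += len(msg)
	}
	if len(cur) > 0 {
		out = append(out, cur)
	}
	return out
}

func main() {
	fmt.Println(split([]string{"a", "b", "c", "d", "e"}, 2, 0))
	fmt.Println(split([]string{"aaa", "bb", "c"}, 0, 4))
}
```

A single message larger than the byte limit still forms its own batch here rather than being dropped.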

func NewSplitConfig

func NewSplitConfig() SplitConfig

NewSplitConfig returns a SplitConfig with default values.

type SubprocessConfig

type SubprocessConfig struct {
	Name      string   `json:"name" yaml:"name"`
	Args      []string `json:"args" yaml:"args"`
	MaxBuffer int      `json:"max_buffer" yaml:"max_buffer"`
	CodecSend string   `json:"codec_send" yaml:"codec_send"`
	CodecRecv string   `json:"codec_recv" yaml:"codec_recv"`
}

SubprocessConfig contains configuration fields for the Subprocess processor.

func NewSubprocessConfig

func NewSubprocessConfig() SubprocessConfig

NewSubprocessConfig returns a SubprocessConfig with default values.

type SwitchCaseConfig

type SwitchCaseConfig struct {
	Check       string   `json:"check" yaml:"check"`
	Processors  []Config `json:"processors" yaml:"processors"`
	Fallthrough bool     `json:"fallthrough" yaml:"fallthrough"`
}

SwitchCaseConfig contains a condition, processors and other fields for an individual case in the Switch processor.

func NewSwitchCaseConfig

func NewSwitchCaseConfig() SwitchCaseConfig

NewSwitchCaseConfig returns a new SwitchCaseConfig with default values.

func (*SwitchCaseConfig) UnmarshalJSON

func (s *SwitchCaseConfig) UnmarshalJSON(bytes []byte) error

UnmarshalJSON ensures that when parsing configs that are in a map or slice the default values are still applied.
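One common way to get this effect is to decode into a defaults-populated value through a method-less alias type. The sketch below shows that pattern with a hypothetical `caseConfig` struct; the `Retries` field and its default of 3 are invented for illustration and are not fields of SwitchCaseConfig.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// caseConfig is a hypothetical config struct with a non-zero default.
type caseConfig struct {
	Check   string `json:"check"`
	Retries int    `json:"retries"`
}

// newCaseConfig returns a caseConfig with default values.
func newCaseConfig() caseConfig {
	return caseConfig{Retries: 3}
}

// UnmarshalJSON starts from the defaults, so fields missing from the input
// keep their default instead of falling back to the zero value.
func (c *caseConfig) UnmarshalJSON(data []byte) error {
	type plain caseConfig // same fields, no methods, so no recursion
	aliased := plain(newCaseConfig())
	if err := json.Unmarshal(data, &aliased); err != nil {
		return err
	}
	*c = caseConfig(aliased)
	return nil
}

// parseCases decodes a JSON array of case configs.
func parseCases(raw string) ([]caseConfig, error) {
	var cases []caseConfig
	err := json.Unmarshal([]byte(raw), &cases)
	return cases, err
}

func main() {
	cases, err := parseCases(`[{"check":"a"},{"check":"b","retries":7}]`)
	fmt.Println(cases, err)
}
```

Without the custom method, each slice element would be decoded into a zero value and the first case would end up with zero retries.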

func (*SwitchCaseConfig) UnmarshalYAML

func (s *SwitchCaseConfig) UnmarshalYAML(unmarshal func(any) error) error

UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.

type SwitchConfig

type SwitchConfig []SwitchCaseConfig

SwitchConfig is a config struct containing fields for the Switch processor.

func NewSwitchConfig

func NewSwitchConfig() SwitchConfig

NewSwitchConfig returns a default SwitchConfig.

type V1

type V1 interface {
	// Process a batch of messages into one or more resulting batches, or return
	// an error if the entire batch could not be processed. Currently the only
	// valid reason for returning an error is if the context was cancelled.
	//
	// If zero messages are returned and the error is nil then all messages are
	// filtered.
	ProcessBatch(ctx context.Context, b message.Batch) ([]message.Batch, error)

	// Close the component, blocks until either the underlying resources are
	// cleaned up or the context is cancelled. Returns an error if the context
	// is cancelled.
	Close(ctx context.Context) error
}

V1 is a common interface implemented by processors. The implementation of a V1 processor is responsible for all expected observability and error handling behaviour described within Benthos documentation.

func NewAutoObservedBatchedProcessor

func NewAutoObservedBatchedProcessor(typeStr string, p AutoObservedBatched, mgr component.Observability) V1

NewAutoObservedBatchedProcessor wraps an AutoObservedBatched processor with an implementation of V1 which handles observability information.

func NewAutoObservedProcessor

func NewAutoObservedProcessor(typeStr string, p AutoObserved, mgr component.Observability) V1

NewAutoObservedProcessor wraps an AutoObserved processor with an implementation of V1 which handles observability information.

func Unwrap

func Unwrap(p V1) V1

Unwrap attempts to access a wrapped processor from the provided implementation where applicable, otherwise the provided processor is returned. This is necessary when accessing raw implementations that could have been wrapped in a tracing mechanism (or other).
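A sketch of how such unwrapping can work, assuming wrappers expose their inner processor through an optional method. The `Proc` and `unwrapper` interfaces, the `UnwrapProc` method name, and the choice to peel every layer rather than just one are assumptions of this example, not confirmed details of the package.

```go
package main

import "fmt"

// Proc is a hypothetical stand-in for the V1 interface.
type Proc interface {
	Name() string
}

// unwrapper is a hypothetical optional interface implemented by wrappers.
type unwrapper interface {
	UnwrapProc() Proc
}

// unwrap peels wrappers until it reaches a processor that does not expose an
// inner one. A processor that was never wrapped is returned as is.
func unwrap(p Proc) Proc {
	for {
		w, ok := p.(unwrapper)
		if !ok {
			return p
		}
		p = w.UnwrapProc()
	}
}

// raw is an unwrapped processor.
type raw struct{}

func (raw) Name() string { return "raw" }

// traced wraps another processor, as a tracing layer might.
type traced struct{ inner Proc }

func (t traced) Name() string     { return "traced(" + t.inner.Name() + ")" }
func (t traced) UnwrapProc() Proc { return t.inner }

func main() {
	wrapped := traced{inner: traced{inner: raw{}}}
	fmt.Println(wrapped.Name())
	fmt.Println(unwrap(wrapped).Name())
}
```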

type WhileConfig

type WhileConfig struct {
	AtLeastOnce bool     `json:"at_least_once" yaml:"at_least_once"`
	MaxLoops    int      `json:"max_loops" yaml:"max_loops"`
	Check       string   `json:"check" yaml:"check"`
	Processors  []Config `json:"processors" yaml:"processors"`
}

WhileConfig is a config struct containing fields for the While processor.

func NewWhileConfig

func NewWhileConfig() WhileConfig

NewWhileConfig returns a default WhileConfig.

type WorkflowConfig

type WorkflowConfig struct {
	MetaPath        string                  `json:"meta_path" yaml:"meta_path"`
	Order           [][]string              `json:"order" yaml:"order"`
	BranchResources []string                `json:"branch_resources" yaml:"branch_resources"`
	Branches        map[string]BranchConfig `json:"branches" yaml:"branches"`
}

WorkflowConfig is a config struct containing fields for the Workflow processor.

func NewWorkflowConfig

func NewWorkflowConfig() WorkflowConfig

NewWorkflowConfig returns a default WorkflowConfig.

type XMLConfig

type XMLConfig struct {
	Operator string `json:"operator" yaml:"operator"`
	Cast     bool   `json:"cast" yaml:"cast"`
}

XMLConfig contains configuration fields for the XML processor.

func NewXMLConfig

func NewXMLConfig() XMLConfig

NewXMLConfig returns an XMLConfig with default values.
