processor

package
v4.4.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 19, 2022 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExecuteAll

func ExecuteAll(procs []V1, msgs ...*message.Batch) ([]*message.Batch, error)

ExecuteAll attempts to execute a slice of processors to a message. Returns N resulting messages or a response. The response may indicate either a NoAck in the event of the message being buffered or an unrecoverable error.

func ExecuteCatchAll

func ExecuteCatchAll(procs []V1, msgs ...*message.Batch) ([]*message.Batch, error)

ExecuteCatchAll attempts to execute a slice of processors to only messages that have failed a processing step. Returns N resulting messages or a response.

func ExecuteTryAll

func ExecuteTryAll(procs []V1, msgs ...*message.Batch) ([]*message.Batch, error)

ExecuteTryAll attempts to execute a slice of processors to messages, if a message has failed a processing step it is prevented from being sent to subsequent processors. Returns N resulting messages or a response. The response may indicate either a NoAck in the event of the message being buffered or an unrecoverable error.

func IteratePartsWithSpanV2

func IteratePartsWithSpanV2(
	tracer trace.TracerProvider,
	operationName string, parts []int, msg *message.Batch,
	iter func(int, *tracing.Span, *message.Part) error,
)

IteratePartsWithSpanV2 iterates the parts of a message according to a slice of indexes (if empty all parts are iterated) and calls a func for each part along with a tracing span for that part. If an error is returned the part is flagged as failed and the span has the error logged.

func MarkErr

func MarkErr(part *message.Part, span *tracing.Span, err error)

MarkErr marks a message part as having failed. This includes modifying metadata to contain this error as well as adding the error to a tracing span if the message has one.

Types

type AWKConfig

type AWKConfig struct {
	Codec   string `json:"codec" yaml:"codec"`
	Program string `json:"program" yaml:"program"`
}

AWKConfig contains configuration fields for the AWK processor.

func NewAWKConfig

func NewAWKConfig() AWKConfig

NewAWKConfig returns a AWKConfig with default values.

type AvroConfig

type AvroConfig struct {
	Operator   string `json:"operator" yaml:"operator"`
	Encoding   string `json:"encoding" yaml:"encoding"`
	Schema     string `json:"schema" yaml:"schema"`
	SchemaPath string `json:"schema_path" yaml:"schema_path"`
}

AvroConfig contains configuration fields for the Avro processor.

func NewAvroConfig

func NewAvroConfig() AvroConfig

NewAvroConfig returns a AvroConfig with default values.

type BoundsCheckConfig

type BoundsCheckConfig struct {
	MaxParts    int `json:"max_parts" yaml:"max_parts"`
	MinParts    int `json:"min_parts" yaml:"min_parts"`
	MaxPartSize int `json:"max_part_size" yaml:"max_part_size"`
	MinPartSize int `json:"min_part_size" yaml:"min_part_size"`
}

BoundsCheckConfig contains configuration fields for the BoundsCheck processor.

func NewBoundsCheckConfig

func NewBoundsCheckConfig() BoundsCheckConfig

NewBoundsCheckConfig returns a BoundsCheckConfig with default values.

type BranchConfig

type BranchConfig struct {
	RequestMap string   `json:"request_map" yaml:"request_map"`
	Processors []Config `json:"processors" yaml:"processors"`
	ResultMap  string   `json:"result_map" yaml:"result_map"`
}

BranchConfig contains configuration fields for the Branch processor.

func NewBranchConfig

func NewBranchConfig() BranchConfig

NewBranchConfig returns a BranchConfig with default values.

type CacheConfig

type CacheConfig struct {
	Resource string `json:"resource" yaml:"resource"`
	Operator string `json:"operator" yaml:"operator"`
	Key      string `json:"key" yaml:"key"`
	Value    string `json:"value" yaml:"value"`
	TTL      string `json:"ttl" yaml:"ttl"`
}

CacheConfig contains configuration fields for the Cache processor.

func NewCacheConfig

func NewCacheConfig() CacheConfig

NewCacheConfig returns a CacheConfig with default values.

type CompressConfig

type CompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Level     int    `json:"level" yaml:"level"`
}

CompressConfig contains configuration fields for the Compress processor.

func NewCompressConfig

func NewCompressConfig() CompressConfig

NewCompressConfig returns a CompressConfig with default values.

type Config

type Config struct {
	Label        string             `json:"label" yaml:"label"`
	Type         string             `json:"type" yaml:"type"`
	Avro         AvroConfig         `json:"avro" yaml:"avro"`
	AWK          AWKConfig          `json:"awk" yaml:"awk"`
	Bloblang     string             `json:"bloblang" yaml:"bloblang"`
	BoundsCheck  BoundsCheckConfig  `json:"bounds_check" yaml:"bounds_check"`
	Branch       BranchConfig       `json:"branch" yaml:"branch"`
	Cache        CacheConfig        `json:"cache" yaml:"cache"`
	Catch        []Config           `json:"catch" yaml:"catch"`
	Compress     CompressConfig     `json:"compress" yaml:"compress"`
	Decompress   DecompressConfig   `json:"decompress" yaml:"decompress"`
	Dedupe       DedupeConfig       `json:"dedupe" yaml:"dedupe"`
	ForEach      []Config           `json:"for_each" yaml:"for_each"`
	Grok         GrokConfig         `json:"grok" yaml:"grok"`
	GroupBy      GroupByConfig      `json:"group_by" yaml:"group_by"`
	GroupByValue GroupByValueConfig `json:"group_by_value" yaml:"group_by_value"`
	HTTP         HTTPConfig         `json:"http" yaml:"http"`
	InsertPart   InsertPartConfig   `json:"insert_part" yaml:"insert_part"`
	JMESPath     JMESPathConfig     `json:"jmespath" yaml:"jmespath"`
	JQ           JQConfig           `json:"jq" yaml:"jq"`
	JSONSchema   JSONSchemaConfig   `json:"json_schema" yaml:"json_schema"`
	Log          LogConfig          `json:"log" yaml:"log"`
	Metric       MetricConfig       `json:"metric" yaml:"metric"`
	MongoDB      MongoDBConfig      `json:"mongodb" yaml:"mongodb"`
	Noop         struct{}           `json:"noop" yaml:"noop"`
	Plugin       interface{}        `json:"plugin,omitempty" yaml:"plugin,omitempty"`
	Parallel     ParallelConfig     `json:"parallel" yaml:"parallel"`
	ParseLog     ParseLogConfig     `json:"parse_log" yaml:"parse_log"`
	Protobuf     ProtobufConfig     `json:"protobuf" yaml:"protobuf"`
	RateLimit    RateLimitConfig    `json:"rate_limit" yaml:"rate_limit"`
	Resource     string             `json:"resource" yaml:"resource"`
	SelectParts  SelectPartsConfig  `json:"select_parts" yaml:"select_parts"`
	Sleep        SleepConfig        `json:"sleep" yaml:"sleep"`
	Split        SplitConfig        `json:"split" yaml:"split"`
	Subprocess   SubprocessConfig   `json:"subprocess" yaml:"subprocess"`
	Switch       SwitchConfig       `json:"switch" yaml:"switch"`
	SyncResponse struct{}           `json:"sync_response" yaml:"sync_response"`
	Try          []Config           `json:"try" yaml:"try"`
	While        WhileConfig        `json:"while" yaml:"while"`
	Workflow     WorkflowConfig     `json:"workflow" yaml:"workflow"`
	XML          XMLConfig          `json:"xml" yaml:"xml"`
}

Config is the all encompassing configuration struct for all processor types. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl

func (*Config) UnmarshalYAML

func (conf *Config) UnmarshalYAML(value *yaml.Node) error

UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.

type DecompressConfig

type DecompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
}

DecompressConfig contains configuration fields for the Decompress processor.

func NewDecompressConfig

func NewDecompressConfig() DecompressConfig

NewDecompressConfig returns a DecompressConfig with default values.

type DedupeConfig

type DedupeConfig struct {
	Cache          string `json:"cache" yaml:"cache"`
	Key            string `json:"key" yaml:"key"`
	DropOnCacheErr bool   `json:"drop_on_err" yaml:"drop_on_err"`
}

DedupeConfig contains configuration fields for the Dedupe processor.

func NewDedupeConfig

func NewDedupeConfig() DedupeConfig

NewDedupeConfig returns a DedupeConfig with default values.

type GrokConfig

type GrokConfig struct {
	Expressions        []string          `json:"expressions" yaml:"expressions"`
	RemoveEmpty        bool              `json:"remove_empty_values" yaml:"remove_empty_values"`
	NamedOnly          bool              `json:"named_captures_only" yaml:"named_captures_only"`
	UseDefaults        bool              `json:"use_default_patterns" yaml:"use_default_patterns"`
	PatternPaths       []string          `json:"pattern_paths" yaml:"pattern_paths"`
	PatternDefinitions map[string]string `json:"pattern_definitions" yaml:"pattern_definitions"`
}

GrokConfig contains configuration fields for the Grok processor.

func NewGrokConfig

func NewGrokConfig() GrokConfig

NewGrokConfig returns a GrokConfig with default values.

type GroupByConfig

type GroupByConfig []GroupByElement

GroupByConfig is a configuration struct containing fields for the GroupBy processor, which breaks message batches down into N batches of a smaller size according to conditions.

func NewGroupByConfig

func NewGroupByConfig() GroupByConfig

NewGroupByConfig returns a GroupByConfig with default values.

type GroupByElement

type GroupByElement struct {
	Check      string   `json:"check" yaml:"check"`
	Processors []Config `json:"processors" yaml:"processors"`
}

GroupByElement represents a group determined by a condition and a list of group specific processors.

type GroupByValueConfig

type GroupByValueConfig struct {
	Value string `json:"value" yaml:"value"`
}

GroupByValueConfig is a configuration struct containing fields for the GroupByValue processor, which breaks message batches down into N batches of a smaller size according to a function interpolated string evaluated per message part.

func NewGroupByValueConfig

func NewGroupByValueConfig() GroupByValueConfig

NewGroupByValueConfig returns a GroupByValueConfig with default values.

type HTTPConfig

type HTTPConfig struct {
	BatchAsMultipart bool `json:"batch_as_multipart" yaml:"batch_as_multipart"`
	Parallel         bool `json:"parallel" yaml:"parallel"`
	ihttpdocs.Config `json:",inline" yaml:",inline"`
}

HTTPConfig contains configuration fields for the HTTP processor.

func NewHTTPConfig

func NewHTTPConfig() HTTPConfig

NewHTTPConfig returns a HTTPConfig with default values.

type InsertPartConfig

type InsertPartConfig struct {
	Index   int    `json:"index" yaml:"index"`
	Content string `json:"content" yaml:"content"`
}

InsertPartConfig contains configuration fields for the InsertPart processor.

func NewInsertPartConfig

func NewInsertPartConfig() InsertPartConfig

NewInsertPartConfig returns a InsertPartConfig with default values.

type JMESPathConfig

type JMESPathConfig struct {
	Query string `json:"query" yaml:"query"`
}

JMESPathConfig contains configuration fields for the JMESPath processor.

func NewJMESPathConfig

func NewJMESPathConfig() JMESPathConfig

NewJMESPathConfig returns a JMESPathConfig with default values.

type JQConfig

type JQConfig struct {
	Query     string `json:"query" yaml:"query"`
	Raw       bool   `json:"raw" yaml:"raw"`
	OutputRaw bool   `json:"output_raw" yaml:"output_raw"`
}

JQConfig contains configuration fields for the JQ processor.

func NewJQConfig

func NewJQConfig() JQConfig

NewJQConfig returns a JQConfig with default values.

type JSONSchemaConfig

type JSONSchemaConfig struct {
	SchemaPath string `json:"schema_path" yaml:"schema_path"`
	Schema     string `json:"schema" yaml:"schema"`
}

JSONSchemaConfig is a configuration struct containing fields for the jsonschema processor.

func NewJSONSchemaConfig

func NewJSONSchemaConfig() JSONSchemaConfig

NewJSONSchemaConfig returns a JSONSchemaConfig with default values.

type LogConfig

type LogConfig struct {
	Level         string            `json:"level" yaml:"level"`
	Fields        map[string]string `json:"fields" yaml:"fields"`
	FieldsMapping string            `json:"fields_mapping" yaml:"fields_mapping"`
	Message       string            `json:"message" yaml:"message"`
}

LogConfig contains configuration fields for the Log processor.

func NewLogConfig

func NewLogConfig() LogConfig

NewLogConfig returns a LogConfig with default values.

type MetricConfig

type MetricConfig struct {
	Type   string            `json:"type" yaml:"type"`
	Name   string            `json:"name" yaml:"name"`
	Labels map[string]string `json:"labels" yaml:"labels"`
	Value  string            `json:"value" yaml:"value"`
}

MetricConfig contains configuration fields for the Metric processor.

func NewMetricConfig

func NewMetricConfig() MetricConfig

NewMetricConfig returns a MetricConfig with default values.

type MongoDBConfig

type MongoDBConfig struct {
	MongoDB      client.Config       `json:",inline" yaml:",inline"`
	WriteConcern client.WriteConcern `json:"write_concern" yaml:"write_concern"`

	Operation       string                 `json:"operation" yaml:"operation"`
	FilterMap       string                 `json:"filter_map" yaml:"filter_map"`
	DocumentMap     string                 `json:"document_map" yaml:"document_map"`
	Upsert          bool                   `json:"upsert" yaml:"upsert"`
	HintMap         string                 `json:"hint_map" yaml:"hint_map"`
	RetryConfig     retries.Config         `json:",inline" yaml:",inline"`
	JSONMarshalMode client.JSONMarshalMode `json:"json_marshal_mode" yaml:"json_marshal_mode"`
}

MongoDBConfig contains configuration fields for the MongoDB processor.

func NewMongoDBConfig

func NewMongoDBConfig() MongoDBConfig

NewMongoDBConfig returns a MongoDBConfig with default values.

type ParallelConfig

type ParallelConfig struct {
	Cap        int      `json:"cap" yaml:"cap"`
	Processors []Config `json:"processors" yaml:"processors"`
}

ParallelConfig is a config struct containing fields for the Parallel processor.

func NewParallelConfig

func NewParallelConfig() ParallelConfig

NewParallelConfig returns a default ParallelConfig.

type ParseLogConfig

type ParseLogConfig struct {
	Format       string `json:"format" yaml:"format"`
	Codec        string `json:"codec" yaml:"codec"`
	BestEffort   bool   `json:"best_effort" yaml:"best_effort"`
	WithRFC3339  bool   `json:"allow_rfc3339" yaml:"allow_rfc3339"`
	WithYear     string `json:"default_year" yaml:"default_year"`
	WithTimezone string `json:"default_timezone" yaml:"default_timezone"`
}

ParseLogConfig contains configuration fields for the ParseLog processor.

func NewParseLogConfig

func NewParseLogConfig() ParseLogConfig

NewParseLogConfig returns a ParseLogConfig with default values.

type Pipeline

type Pipeline interface {
	// TransactionChan returns a channel used for consuming transactions from
	// this type. Every transaction received must be resolved before another
	// transaction will be sent.
	TransactionChan() <-chan message.Transaction

	// Consume starts the type receiving transactions from a Transactor.
	Consume(<-chan message.Transaction) error

	// CloseAsync triggers the shut down of this component but should not block
	// the calling goroutine.
	CloseAsync()

	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(timeout time.Duration) error
}

Pipeline is an interface that implements channel based based consumer and producer methods for streaming data through a processing pipeline.

type PipelineConstructorFunc

type PipelineConstructorFunc func() (Pipeline, error)

PipelineConstructorFunc is a constructor to be called for each parallel stream pipeline thread in order to construct a custom pipeline implementation.

type ProtobufConfig

type ProtobufConfig struct {
	Operator    string   `json:"operator" yaml:"operator"`
	Message     string   `json:"message" yaml:"message"`
	ImportPaths []string `json:"import_paths" yaml:"import_paths"`
}

ProtobufConfig contains configuration fields for the Protobuf processor.

func NewProtobufConfig

func NewProtobufConfig() ProtobufConfig

NewProtobufConfig returns a ProtobufConfig with default values.

type RateLimitConfig

type RateLimitConfig struct {
	Resource string `json:"resource" yaml:"resource"`
}

RateLimitConfig contains configuration fields for the RateLimit processor.

func NewRateLimitConfig

func NewRateLimitConfig() RateLimitConfig

NewRateLimitConfig returns a RateLimitConfig with default values.

type SelectPartsConfig

type SelectPartsConfig struct {
	Parts []int `json:"parts" yaml:"parts"`
}

SelectPartsConfig contains configuration fields for the SelectParts processor.

func NewSelectPartsConfig

func NewSelectPartsConfig() SelectPartsConfig

NewSelectPartsConfig returns a SelectPartsConfig with default values.

type SleepConfig

type SleepConfig struct {
	Duration string `json:"duration" yaml:"duration"`
}

SleepConfig contains configuration fields for the Sleep processor.

func NewSleepConfig

func NewSleepConfig() SleepConfig

NewSleepConfig returns a SleepConfig with default values.

type SplitConfig

type SplitConfig struct {
	Size     int `json:"size" yaml:"size"`
	ByteSize int `json:"byte_size" yaml:"byte_size"`
}

SplitConfig is a configuration struct containing fields for the Split processor, which breaks message batches down into batches of a smaller size.

func NewSplitConfig

func NewSplitConfig() SplitConfig

NewSplitConfig returns a SplitConfig with default values.

type SubprocessConfig

type SubprocessConfig struct {
	Name      string   `json:"name" yaml:"name"`
	Args      []string `json:"args" yaml:"args"`
	MaxBuffer int      `json:"max_buffer" yaml:"max_buffer"`
	CodecSend string   `json:"codec_send" yaml:"codec_send"`
	CodecRecv string   `json:"codec_recv" yaml:"codec_recv"`
}

SubprocessConfig contains configuration fields for the Subprocess processor.

func NewSubprocessConfig

func NewSubprocessConfig() SubprocessConfig

NewSubprocessConfig returns a SubprocessConfig with default values.

type SwitchCaseConfig

type SwitchCaseConfig struct {
	Check       string   `json:"check" yaml:"check"`
	Processors  []Config `json:"processors" yaml:"processors"`
	Fallthrough bool     `json:"fallthrough" yaml:"fallthrough"`
}

SwitchCaseConfig contains a condition, processors and other fields for an individual case in the Switch processor.

func NewSwitchCaseConfig

func NewSwitchCaseConfig() SwitchCaseConfig

NewSwitchCaseConfig returns a new SwitchCaseConfig with default values.

func (*SwitchCaseConfig) UnmarshalJSON

func (s *SwitchCaseConfig) UnmarshalJSON(bytes []byte) error

UnmarshalJSON ensures that when parsing configs that are in a map or slice the default values are still applied.

func (*SwitchCaseConfig) UnmarshalYAML

func (s *SwitchCaseConfig) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.

type SwitchConfig

type SwitchConfig []SwitchCaseConfig

SwitchConfig is a config struct containing fields for the Switch processor.

func NewSwitchConfig

func NewSwitchConfig() SwitchConfig

NewSwitchConfig returns a default SwitchConfig.

type V1

type V1 interface {
	// ProcessMessage attempts to process a message. This method returns both a
	// slice of messages or a response indicating whether messages were dropped
	// due to an intermittent error or were intentionally filtered.
	//
	// If an error occurs due to the contents of a message being invalid and you
	// wish to expose this as a recoverable fault you can use FlagErr to flag a
	// message as having failed without dropping it.
	//
	// More information about this form of error handling can be found at:
	// https://www.benthos.dev/docs/configuration/error_handling
	ProcessMessage(*message.Batch) ([]*message.Batch, error)

	// CloseAsync triggers the shut down of this component but should not block
	// the calling goroutine.
	CloseAsync()

	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(timeout time.Duration) error
}

V1 is a common interface implemented by processors.

func NewV2BatchedToV1Processor

func NewV2BatchedToV1Processor(typeStr string, p V2Batched, mgr component.Observability) V1

NewV2BatchedToV1Processor wraps a processor.V2Batched with a struct that implements types.Processor.

func NewV2ToV1Processor

func NewV2ToV1Processor(typeStr string, p V2, mgr component.Observability) V1

NewV2ToV1Processor wraps a processor.V2 with a struct that implements V1.

type V2

type V2 interface {
	// Process a message into one or more resulting messages, or return an error
	// if the message could not be processed. If zero messages are returned and
	// the error is nil then the message is filtered.
	Process(ctx context.Context, p *message.Part) ([]*message.Part, error)

	// Close the component, blocks until either the underlying resources are
	// cleaned up or the context is cancelled. Returns an error if the context
	// is cancelled.
	Close(ctx context.Context) error
}

V2 is a simpler interface to implement than V1.

type V2Batched

type V2Batched interface {
	// Process a batch of messages into one or more resulting batches, or return
	// an error if the entire batch could not be processed. If zero messages are
	// returned and the error is nil then all messages are filtered.
	ProcessBatch(ctx context.Context, spans []*tracing.Span, b *message.Batch) ([]*message.Batch, error)

	// Close the component, blocks until either the underlying resources are
	// cleaned up or the context is cancelled. Returns an error if the context
	// is cancelled.
	Close(ctx context.Context) error
}

V2Batched is a simpler interface to implement than V1 and allows batch-wide processing.

type WhileConfig

type WhileConfig struct {
	AtLeastOnce bool     `json:"at_least_once" yaml:"at_least_once"`
	MaxLoops    int      `json:"max_loops" yaml:"max_loops"`
	Check       string   `json:"check" yaml:"check"`
	Processors  []Config `json:"processors" yaml:"processors"`
}

WhileConfig is a config struct containing fields for the While processor.

func NewWhileConfig

func NewWhileConfig() WhileConfig

NewWhileConfig returns a default WhileConfig.

type WorkflowConfig

type WorkflowConfig struct {
	MetaPath        string                  `json:"meta_path" yaml:"meta_path"`
	Order           [][]string              `json:"order" yaml:"order"`
	BranchResources []string                `json:"branch_resources" yaml:"branch_resources"`
	Branches        map[string]BranchConfig `json:"branches" yaml:"branches"`
}

WorkflowConfig is a config struct containing fields for the Workflow processor.

func NewWorkflowConfig

func NewWorkflowConfig() WorkflowConfig

NewWorkflowConfig returns a default WorkflowConfig.

type XMLConfig

type XMLConfig struct {
	Operator string `json:"operator" yaml:"operator"`
	Cast     bool   `json:"cast" yaml:"cast"`
}

XMLConfig contains configuration fields for the XML processor.

func NewXMLConfig

func NewXMLConfig() XMLConfig

NewXMLConfig returns a XMLConfig with default values.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL