processor

package
v4.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2022 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExecuteAll added in v4.1.0

func ExecuteAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)

ExecuteAll attempts to execute a slice of processors to a message. Returns N resulting messages or a response. The response may indicate either a NoAck in the event of the message being buffered or an unrecoverable error.

func ExecuteCatchAll added in v4.1.0

func ExecuteCatchAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)

ExecuteCatchAll attempts to execute a slice of processors to only messages that have failed a processing step. Returns N resulting messages or a response.

func ExecuteTryAll added in v4.1.0

func ExecuteTryAll(ctx context.Context, procs []V1, msgs ...message.Batch) ([]message.Batch, error)

ExecuteTryAll attempts to execute a slice of processors to messages, if a message has failed a processing step it is prevented from being sent to subsequent processors. Returns N resulting messages or a response. The response may indicate either a NoAck in the event of the message being buffered or an unrecoverable error.

func MarkErr

func MarkErr(part *message.Part, span *tracing.Span, err error)

MarkErr marks a message part as having failed. This includes modifying metadata to contain this error as well as adding the error to a tracing span if the message has one.

Types

type AWKConfig added in v4.1.0

type AWKConfig struct {
	Codec   string `json:"codec" yaml:"codec"`
	Program string `json:"program" yaml:"program"`
}

AWKConfig contains configuration fields for the AWK processor.

func NewAWKConfig added in v4.1.0

func NewAWKConfig() AWKConfig

NewAWKConfig returns a AWKConfig with default values.

type AvroConfig added in v4.1.0

type AvroConfig struct {
	Operator   string `json:"operator" yaml:"operator"`
	Encoding   string `json:"encoding" yaml:"encoding"`
	Schema     string `json:"schema" yaml:"schema"`
	SchemaPath string `json:"schema_path" yaml:"schema_path"`
}

AvroConfig contains configuration fields for the Avro processor.

func NewAvroConfig added in v4.1.0

func NewAvroConfig() AvroConfig

NewAvroConfig returns a AvroConfig with default values.

type BoundsCheckConfig added in v4.1.0

type BoundsCheckConfig struct {
	MaxParts    int `json:"max_parts" yaml:"max_parts"`
	MinParts    int `json:"min_parts" yaml:"min_parts"`
	MaxPartSize int `json:"max_part_size" yaml:"max_part_size"`
	MinPartSize int `json:"min_part_size" yaml:"min_part_size"`
}

BoundsCheckConfig contains configuration fields for the BoundsCheck processor.

func NewBoundsCheckConfig added in v4.1.0

func NewBoundsCheckConfig() BoundsCheckConfig

NewBoundsCheckConfig returns a BoundsCheckConfig with default values.

type BranchConfig added in v4.1.0

type BranchConfig struct {
	RequestMap string   `json:"request_map" yaml:"request_map"`
	Processors []Config `json:"processors" yaml:"processors"`
	ResultMap  string   `json:"result_map" yaml:"result_map"`
}

BranchConfig contains configuration fields for the Branch processor.

func NewBranchConfig added in v4.1.0

func NewBranchConfig() BranchConfig

NewBranchConfig returns a BranchConfig with default values.

type CacheConfig added in v4.1.0

type CacheConfig struct {
	Resource string `json:"resource" yaml:"resource"`
	Operator string `json:"operator" yaml:"operator"`
	Key      string `json:"key" yaml:"key"`
	Value    string `json:"value" yaml:"value"`
	TTL      string `json:"ttl" yaml:"ttl"`
}

CacheConfig contains configuration fields for the Cache processor.

func NewCacheConfig added in v4.1.0

func NewCacheConfig() CacheConfig

NewCacheConfig returns a CacheConfig with default values.

type CompressConfig added in v4.1.0

type CompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Level     int    `json:"level" yaml:"level"`
}

CompressConfig contains configuration fields for the Compress processor.

func NewCompressConfig added in v4.1.0

func NewCompressConfig() CompressConfig

NewCompressConfig returns a CompressConfig with default values.

type Config added in v4.1.0

type Config struct {
	Label        string             `json:"label" yaml:"label"`
	Type         string             `json:"type" yaml:"type"`
	Avro         AvroConfig         `json:"avro" yaml:"avro"`
	AWK          AWKConfig          `json:"awk" yaml:"awk"`
	Bloblang     string             `json:"bloblang" yaml:"bloblang"`
	BoundsCheck  BoundsCheckConfig  `json:"bounds_check" yaml:"bounds_check"`
	Branch       BranchConfig       `json:"branch" yaml:"branch"`
	Cache        CacheConfig        `json:"cache" yaml:"cache"`
	Catch        []Config           `json:"catch" yaml:"catch"`
	Compress     CompressConfig     `json:"compress" yaml:"compress"`
	Decompress   DecompressConfig   `json:"decompress" yaml:"decompress"`
	Dedupe       DedupeConfig       `json:"dedupe" yaml:"dedupe"`
	ForEach      []Config           `json:"for_each" yaml:"for_each"`
	Grok         GrokConfig         `json:"grok" yaml:"grok"`
	GroupBy      GroupByConfig      `json:"group_by" yaml:"group_by"`
	GroupByValue GroupByValueConfig `json:"group_by_value" yaml:"group_by_value"`
	HTTP         HTTPConfig         `json:"http" yaml:"http"`
	InsertPart   InsertPartConfig   `json:"insert_part" yaml:"insert_part"`
	JMESPath     JMESPathConfig     `json:"jmespath" yaml:"jmespath"`
	JQ           JQConfig           `json:"jq" yaml:"jq"`
	JSONSchema   JSONSchemaConfig   `json:"json_schema" yaml:"json_schema"`
	Log          LogConfig          `json:"log" yaml:"log"`
	Metric       MetricConfig       `json:"metric" yaml:"metric"`
	MongoDB      MongoDBConfig      `json:"mongodb" yaml:"mongodb"`
	Noop         struct{}           `json:"noop" yaml:"noop"`
	Plugin       any                `json:"plugin,omitempty" yaml:"plugin,omitempty"`
	Parallel     ParallelConfig     `json:"parallel" yaml:"parallel"`
	ParseLog     ParseLogConfig     `json:"parse_log" yaml:"parse_log"`
	Protobuf     ProtobufConfig     `json:"protobuf" yaml:"protobuf"`
	RateLimit    RateLimitConfig    `json:"rate_limit" yaml:"rate_limit"`
	Resource     string             `json:"resource" yaml:"resource"`
	SelectParts  SelectPartsConfig  `json:"select_parts" yaml:"select_parts"`
	Sleep        SleepConfig        `json:"sleep" yaml:"sleep"`
	Split        SplitConfig        `json:"split" yaml:"split"`
	Subprocess   SubprocessConfig   `json:"subprocess" yaml:"subprocess"`
	Switch       SwitchConfig       `json:"switch" yaml:"switch"`
	SyncResponse struct{}           `json:"sync_response" yaml:"sync_response"`
	Try          []Config           `json:"try" yaml:"try"`
	While        WhileConfig        `json:"while" yaml:"while"`
	Workflow     WorkflowConfig     `json:"workflow" yaml:"workflow"`
	XML          XMLConfig          `json:"xml" yaml:"xml"`
}

Config is the all encompassing configuration struct for all processor types. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl

func NewConfig added in v4.1.0

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl

func (*Config) UnmarshalYAML added in v4.1.0

func (conf *Config) UnmarshalYAML(value *yaml.Node) error

UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.

type DecompressConfig added in v4.1.0

type DecompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
}

DecompressConfig contains configuration fields for the Decompress processor.

func NewDecompressConfig added in v4.1.0

func NewDecompressConfig() DecompressConfig

NewDecompressConfig returns a DecompressConfig with default values.

type DedupeConfig added in v4.1.0

type DedupeConfig struct {
	Cache          string `json:"cache" yaml:"cache"`
	Key            string `json:"key" yaml:"key"`
	DropOnCacheErr bool   `json:"drop_on_err" yaml:"drop_on_err"`
}

DedupeConfig contains configuration fields for the Dedupe processor.

func NewDedupeConfig added in v4.1.0

func NewDedupeConfig() DedupeConfig

NewDedupeConfig returns a DedupeConfig with default values.

type GrokConfig added in v4.1.0

type GrokConfig struct {
	Expressions        []string          `json:"expressions" yaml:"expressions"`
	RemoveEmpty        bool              `json:"remove_empty_values" yaml:"remove_empty_values"`
	NamedOnly          bool              `json:"named_captures_only" yaml:"named_captures_only"`
	UseDefaults        bool              `json:"use_default_patterns" yaml:"use_default_patterns"`
	PatternPaths       []string          `json:"pattern_paths" yaml:"pattern_paths"`
	PatternDefinitions map[string]string `json:"pattern_definitions" yaml:"pattern_definitions"`
}

GrokConfig contains configuration fields for the Grok processor.

func NewGrokConfig added in v4.1.0

func NewGrokConfig() GrokConfig

NewGrokConfig returns a GrokConfig with default values.

type GroupByConfig added in v4.1.0

type GroupByConfig []GroupByElement

GroupByConfig is a configuration struct containing fields for the GroupBy processor, which breaks message batches down into N batches of a smaller size according to conditions.

func NewGroupByConfig added in v4.1.0

func NewGroupByConfig() GroupByConfig

NewGroupByConfig returns a GroupByConfig with default values.

type GroupByElement added in v4.1.0

type GroupByElement struct {
	Check      string   `json:"check" yaml:"check"`
	Processors []Config `json:"processors" yaml:"processors"`
}

GroupByElement represents a group determined by a condition and a list of group specific processors.

type GroupByValueConfig added in v4.1.0

type GroupByValueConfig struct {
	Value string `json:"value" yaml:"value"`
}

GroupByValueConfig is a configuration struct containing fields for the GroupByValue processor, which breaks message batches down into N batches of a smaller size according to a function interpolated string evaluated per message part.

func NewGroupByValueConfig added in v4.1.0

func NewGroupByValueConfig() GroupByValueConfig

NewGroupByValueConfig returns a GroupByValueConfig with default values.

type HTTPConfig added in v4.1.0

type HTTPConfig struct {
	BatchAsMultipart    bool `json:"batch_as_multipart" yaml:"batch_as_multipart"`
	Parallel            bool `json:"parallel" yaml:"parallel"`
	oldconfig.OldConfig `json:",inline" yaml:",inline"`
}

HTTPConfig contains configuration fields for the HTTP processor.

func NewHTTPConfig added in v4.1.0

func NewHTTPConfig() HTTPConfig

NewHTTPConfig returns a HTTPConfig with default values.

type InsertPartConfig added in v4.1.0

type InsertPartConfig struct {
	Index   int    `json:"index" yaml:"index"`
	Content string `json:"content" yaml:"content"`
}

InsertPartConfig contains configuration fields for the InsertPart processor.

func NewInsertPartConfig added in v4.1.0

func NewInsertPartConfig() InsertPartConfig

NewInsertPartConfig returns a InsertPartConfig with default values.

type JMESPathConfig added in v4.1.0

type JMESPathConfig struct {
	Query string `json:"query" yaml:"query"`
}

JMESPathConfig contains configuration fields for the JMESPath processor.

func NewJMESPathConfig added in v4.1.0

func NewJMESPathConfig() JMESPathConfig

NewJMESPathConfig returns a JMESPathConfig with default values.

type JQConfig added in v4.1.0

type JQConfig struct {
	Query     string `json:"query" yaml:"query"`
	Raw       bool   `json:"raw" yaml:"raw"`
	OutputRaw bool   `json:"output_raw" yaml:"output_raw"`
}

JQConfig contains configuration fields for the JQ processor.

func NewJQConfig added in v4.1.0

func NewJQConfig() JQConfig

NewJQConfig returns a JQConfig with default values.

type JSONSchemaConfig added in v4.1.0

type JSONSchemaConfig struct {
	SchemaPath string `json:"schema_path" yaml:"schema_path"`
	Schema     string `json:"schema" yaml:"schema"`
}

JSONSchemaConfig is a configuration struct containing fields for the jsonschema processor.

func NewJSONSchemaConfig added in v4.1.0

func NewJSONSchemaConfig() JSONSchemaConfig

NewJSONSchemaConfig returns a JSONSchemaConfig with default values.

type LogConfig added in v4.1.0

type LogConfig struct {
	Level         string            `json:"level" yaml:"level"`
	Fields        map[string]string `json:"fields" yaml:"fields"`
	FieldsMapping string            `json:"fields_mapping" yaml:"fields_mapping"`
	Message       string            `json:"message" yaml:"message"`
}

LogConfig contains configuration fields for the Log processor.

func NewLogConfig added in v4.1.0

func NewLogConfig() LogConfig

NewLogConfig returns a LogConfig with default values.

type MetricConfig added in v4.1.0

type MetricConfig struct {
	Type   string            `json:"type" yaml:"type"`
	Name   string            `json:"name" yaml:"name"`
	Labels map[string]string `json:"labels" yaml:"labels"`
	Value  string            `json:"value" yaml:"value"`
}

MetricConfig contains configuration fields for the Metric processor.

func NewMetricConfig added in v4.1.0

func NewMetricConfig() MetricConfig

NewMetricConfig returns a MetricConfig with default values.

type MongoDBConfig added in v4.1.0

type MongoDBConfig struct {
	MongoDB      client.Config       `json:",inline" yaml:",inline"`
	WriteConcern client.WriteConcern `json:"write_concern" yaml:"write_concern"`

	Operation       string                 `json:"operation" yaml:"operation"`
	FilterMap       string                 `json:"filter_map" yaml:"filter_map"`
	DocumentMap     string                 `json:"document_map" yaml:"document_map"`
	Upsert          bool                   `json:"upsert" yaml:"upsert"`
	HintMap         string                 `json:"hint_map" yaml:"hint_map"`
	RetryConfig     retries.Config         `json:",inline" yaml:",inline"`
	JSONMarshalMode client.JSONMarshalMode `json:"json_marshal_mode" yaml:"json_marshal_mode"`
}

MongoDBConfig contains configuration fields for the MongoDB processor.

func NewMongoDBConfig added in v4.1.0

func NewMongoDBConfig() MongoDBConfig

NewMongoDBConfig returns a MongoDBConfig with default values.

type ParallelConfig added in v4.1.0

type ParallelConfig struct {
	Cap        int      `json:"cap" yaml:"cap"`
	Processors []Config `json:"processors" yaml:"processors"`
}

ParallelConfig is a config struct containing fields for the Parallel processor.

func NewParallelConfig added in v4.1.0

func NewParallelConfig() ParallelConfig

NewParallelConfig returns a default ParallelConfig.

type ParseLogConfig added in v4.1.0

type ParseLogConfig struct {
	Format       string `json:"format" yaml:"format"`
	Codec        string `json:"codec" yaml:"codec"`
	BestEffort   bool   `json:"best_effort" yaml:"best_effort"`
	WithRFC3339  bool   `json:"allow_rfc3339" yaml:"allow_rfc3339"`
	WithYear     string `json:"default_year" yaml:"default_year"`
	WithTimezone string `json:"default_timezone" yaml:"default_timezone"`
}

ParseLogConfig contains configuration fields for the ParseLog processor.

func NewParseLogConfig added in v4.1.0

func NewParseLogConfig() ParseLogConfig

NewParseLogConfig returns a ParseLogConfig with default values.

type Pipeline

type Pipeline interface {
	// TransactionChan returns a channel used for consuming transactions from
	// this type. Every transaction received must be resolved before another
	// transaction will be sent.
	TransactionChan() <-chan message.Transaction

	// Consume starts the type receiving transactions from a Transactor.
	Consume(<-chan message.Transaction) error

	// TriggerCloseNow signals that the component should close immediately,
	// messages in flight will be dropped.
	TriggerCloseNow()

	// WaitForClose blocks until the component has closed down or the context is
	// cancelled. Closing occurs either when the input transaction channel is
	// closed and messages are flushed (and acked), or when CloseNowAsync is
	// called.
	WaitForClose(ctx context.Context) error
}

Pipeline is an interface that implements channel based based consumer and producer methods for streaming data through a processing pipeline.

type PipelineConstructorFunc

type PipelineConstructorFunc func() (Pipeline, error)

PipelineConstructorFunc is a constructor to be called for each parallel stream pipeline thread in order to construct a custom pipeline implementation.

type ProtobufConfig added in v4.1.0

type ProtobufConfig struct {
	Operator    string   `json:"operator" yaml:"operator"`
	Message     string   `json:"message" yaml:"message"`
	ImportPaths []string `json:"import_paths" yaml:"import_paths"`
}

ProtobufConfig contains configuration fields for the Protobuf processor.

func NewProtobufConfig added in v4.1.0

func NewProtobufConfig() ProtobufConfig

NewProtobufConfig returns a ProtobufConfig with default values.

type RateLimitConfig added in v4.1.0

type RateLimitConfig struct {
	Resource string `json:"resource" yaml:"resource"`
}

RateLimitConfig contains configuration fields for the RateLimit processor.

func NewRateLimitConfig added in v4.1.0

func NewRateLimitConfig() RateLimitConfig

NewRateLimitConfig returns a RateLimitConfig with default values.

type SelectPartsConfig added in v4.1.0

type SelectPartsConfig struct {
	Parts []int `json:"parts" yaml:"parts"`
}

SelectPartsConfig contains configuration fields for the SelectParts processor.

func NewSelectPartsConfig added in v4.1.0

func NewSelectPartsConfig() SelectPartsConfig

NewSelectPartsConfig returns a SelectPartsConfig with default values.

type SleepConfig added in v4.1.0

type SleepConfig struct {
	Duration string `json:"duration" yaml:"duration"`
}

SleepConfig contains configuration fields for the Sleep processor.

func NewSleepConfig added in v4.1.0

func NewSleepConfig() SleepConfig

NewSleepConfig returns a SleepConfig with default values.

type SplitConfig added in v4.1.0

type SplitConfig struct {
	Size     int `json:"size" yaml:"size"`
	ByteSize int `json:"byte_size" yaml:"byte_size"`
}

SplitConfig is a configuration struct containing fields for the Split processor, which breaks message batches down into batches of a smaller size.

func NewSplitConfig added in v4.1.0

func NewSplitConfig() SplitConfig

NewSplitConfig returns a SplitConfig with default values.

type SubprocessConfig added in v4.1.0

type SubprocessConfig struct {
	Name      string   `json:"name" yaml:"name"`
	Args      []string `json:"args" yaml:"args"`
	MaxBuffer int      `json:"max_buffer" yaml:"max_buffer"`
	CodecSend string   `json:"codec_send" yaml:"codec_send"`
	CodecRecv string   `json:"codec_recv" yaml:"codec_recv"`
}

SubprocessConfig contains configuration fields for the Subprocess processor.

func NewSubprocessConfig added in v4.1.0

func NewSubprocessConfig() SubprocessConfig

NewSubprocessConfig returns a SubprocessConfig with default values.

type SwitchCaseConfig added in v4.1.0

type SwitchCaseConfig struct {
	Check       string   `json:"check" yaml:"check"`
	Processors  []Config `json:"processors" yaml:"processors"`
	Fallthrough bool     `json:"fallthrough" yaml:"fallthrough"`
}

SwitchCaseConfig contains a condition, processors and other fields for an individual case in the Switch processor.

func NewSwitchCaseConfig added in v4.1.0

func NewSwitchCaseConfig() SwitchCaseConfig

NewSwitchCaseConfig returns a new SwitchCaseConfig with default values.

func (*SwitchCaseConfig) UnmarshalJSON added in v4.1.0

func (s *SwitchCaseConfig) UnmarshalJSON(bytes []byte) error

UnmarshalJSON ensures that when parsing configs that are in a map or slice the default values are still applied.

func (*SwitchCaseConfig) UnmarshalYAML added in v4.1.0

func (s *SwitchCaseConfig) UnmarshalYAML(unmarshal func(any) error) error

UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.

type SwitchConfig added in v4.1.0

type SwitchConfig []SwitchCaseConfig

SwitchConfig is a config struct containing fields for the Switch processor.

func NewSwitchConfig added in v4.1.0

func NewSwitchConfig() SwitchConfig

NewSwitchConfig returns a default SwitchConfig.

type V1

type V1 interface {
	// Process a batch of messages into one or more resulting batches, or return
	// an error if the entire batch could not be processed. If zero messages are
	// returned and the error is nil then all messages are filtered.
	ProcessBatch(ctx context.Context, b message.Batch) ([]message.Batch, error)

	// Close the component, blocks until either the underlying resources are
	// cleaned up or the context is cancelled. Returns an error if the context
	// is cancelled.
	Close(ctx context.Context) error
}

V1 is a common interface implemented by processors.

func NewV2BatchedToV1Processor

func NewV2BatchedToV1Processor(typeStr string, p V2Batched, mgr component.Observability) V1

NewV2BatchedToV1Processor wraps a processor.V2Batched with a struct that implements types.Processor.

func NewV2ToV1Processor

func NewV2ToV1Processor(typeStr string, p V2, mgr component.Observability) V1

NewV2ToV1Processor wraps a processor.V2 with a struct that implements V1.

type V2

type V2 interface {
	// Process a message into one or more resulting messages, or return an error
	// if the message could not be processed. If zero messages are returned and
	// the error is nil then the message is filtered.
	Process(ctx context.Context, p *message.Part) ([]*message.Part, error)

	// Close the component, blocks until either the underlying resources are
	// cleaned up or the context is cancelled. Returns an error if the context
	// is cancelled.
	Close(ctx context.Context) error
}

V2 is a simpler interface to implement than V1.

type V2Batched

type V2Batched interface {
	// Process a batch of messages into one or more resulting batches, or return
	// an error if the entire batch could not be processed. If zero messages are
	// returned and the error is nil then all messages are filtered.
	ProcessBatch(ctx context.Context, spans []*tracing.Span, b message.Batch) ([]message.Batch, error)

	// Close the component, blocks until either the underlying resources are
	// cleaned up or the context is cancelled. Returns an error if the context
	// is cancelled.
	Close(ctx context.Context) error
}

V2Batched is a simpler interface to implement than V1 and allows batch-wide processing.

type WhileConfig added in v4.1.0

type WhileConfig struct {
	AtLeastOnce bool     `json:"at_least_once" yaml:"at_least_once"`
	MaxLoops    int      `json:"max_loops" yaml:"max_loops"`
	Check       string   `json:"check" yaml:"check"`
	Processors  []Config `json:"processors" yaml:"processors"`
}

WhileConfig is a config struct containing fields for the While processor.

func NewWhileConfig added in v4.1.0

func NewWhileConfig() WhileConfig

NewWhileConfig returns a default WhileConfig.

type WorkflowConfig added in v4.1.0

type WorkflowConfig struct {
	MetaPath        string                  `json:"meta_path" yaml:"meta_path"`
	Order           [][]string              `json:"order" yaml:"order"`
	BranchResources []string                `json:"branch_resources" yaml:"branch_resources"`
	Branches        map[string]BranchConfig `json:"branches" yaml:"branches"`
}

WorkflowConfig is a config struct containing fields for the Workflow processor.

func NewWorkflowConfig added in v4.1.0

func NewWorkflowConfig() WorkflowConfig

NewWorkflowConfig returns a default WorkflowConfig.

type XMLConfig added in v4.1.0

type XMLConfig struct {
	Operator string `json:"operator" yaml:"operator"`
	Cast     bool   `json:"cast" yaml:"cast"`
}

XMLConfig contains configuration fields for the XML processor.

func NewXMLConfig added in v4.1.0

func NewXMLConfig() XMLConfig

NewXMLConfig returns a XMLConfig with default values.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL