processor

package
v4.0.0-rc1
Warning

This package is not in the latest version of its module.

Published: Mar 20, 2022 License: MIT Imports: 70 Imported by: 0

Documentation

Overview

Package processor contains implementations of types.Processor, which perform an arbitrary operation on a message and either return one or more messages to be propagated towards a sink, or a response to be sent back to the message source.

Index

Constants

const (
	TypeArchive      = "archive"
	TypeAvro         = "avro"
	TypeAWK          = "awk"
	TypeBloblang     = "bloblang"
	TypeBoundsCheck  = "bounds_check"
	TypeBranch       = "branch"
	TypeCache        = "cache"
	TypeCatch        = "catch"
	TypeCompress     = "compress"
	TypeDecompress   = "decompress"
	TypeDedupe       = "dedupe"
	TypeForEach      = "for_each"
	TypeGrok         = "grok"
	TypeGroupBy      = "group_by"
	TypeGroupByValue = "group_by_value"
	TypeHTTP         = "http"
	TypeInsertPart   = "insert_part"
	TypeJMESPath     = "jmespath"
	TypeJQ           = "jq"
	TypeJSONSchema   = "json_schema"
	TypeLog          = "log"
	TypeMetric       = "metric"
	TypeMongoDB      = "mongodb"
	TypeNoop         = "noop"
	TypeParallel     = "parallel"
	TypeParseLog     = "parse_log"
	TypeProtobuf     = "protobuf"
	TypeRateLimit    = "rate_limit"
	TypeRedis        = "redis"
	TypeResource     = "resource"
	TypeSelectParts  = "select_parts"
	TypeSleep        = "sleep"
	TypeSplit        = "split"
	TypeSubprocess   = "subprocess"
	TypeSwitch       = "switch"
	TypeSyncResponse = "sync_response"
	TypeTry          = "try"
	TypeThrottle     = "throttle"
	TypeUnarchive    = "unarchive"
	TypeWhile        = "while"
	TypeWorkflow     = "workflow"
	TypeXML          = "xml"
)

String constants representing each processor type. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl

Variables

var Constructors = map[string]TypeSpec{}

Constructors is a map of all processor types with their specs.

var DocsUsesBatches = `` /* 177-byte string literal not displayed */

DocsUsesBatches is a documentation paragraph regarding processors that benefit from input level batching.

var FailFlagKey = message.FailFlagKey

FailFlagKey is a metadata key used for flagging processor errors in Benthos. If a message part has any non-empty value for this metadata key then it will be interpreted as having failed a processor step somewhere in the pipeline.

Functions

func ClearFail

func ClearFail(part *message.Part)

ClearFail removes any existing failure flags from a message part.

func ExecuteAll

func ExecuteAll(procs []processor.V1, msgs ...*message.Batch) ([]*message.Batch, error)

ExecuteAll attempts to execute a slice of processors on a message. Returns N resulting messages or a response. The response may indicate either a NoAck in the event of the message being buffered or an unrecoverable error.

func ExecuteCatchAll

func ExecuteCatchAll(procs []processor.V1, msgs ...*message.Batch) ([]*message.Batch, error)

ExecuteCatchAll attempts to execute a slice of processors on only those messages that have failed a processing step. Returns N resulting messages or a response.

func ExecuteTryAll

func ExecuteTryAll(procs []processor.V1, msgs ...*message.Batch) ([]*message.Batch, error)

ExecuteTryAll attempts to execute a slice of processors on messages. If a message has failed a processing step it is prevented from being sent to subsequent processors. Returns N resulting messages or a response. The response may indicate either a NoAck in the event of the message being buffered or an unrecoverable error.
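The difference between the try and catch execution modes can be illustrated with a small standalone sketch. It does not use the real processor.V1 or message.Batch types: `part`, `step`, `tryAll`, and `catchAll` are hypothetical stand-ins, and the sketch works per part rather than per batch. Whether the real ExecuteCatchAll clears failure flags is not modelled here.

```go
package main

import (
	"errors"
	"fmt"
)

// part is a hypothetical stand-in for a message part.
// A non-empty fail field means the part is flagged as failed.
type part struct {
	data string
	fail string
}

// step is a hypothetical per-part processor.
type step func(p *part) error

// apply runs one step and flags the part if the step returns an error.
func apply(s step, p *part) {
	if err := s(p); err != nil {
		p.fail = err.Error()
	}
}

// tryAll runs every step in order but skips parts that are already flagged.
func tryAll(steps []step, parts []*part) {
	for _, s := range steps {
		for _, p := range parts {
			if p.fail == "" {
				apply(s, p)
			}
		}
	}
}

// catchAll runs the steps only on parts that are flagged as failed.
func catchAll(steps []step, parts []*part) {
	for _, s := range steps {
		for _, p := range parts {
			if p.fail != "" {
				apply(s, p)
			}
		}
	}
}

// rejectEmpty fails any part with an empty payload.
func rejectEmpty(p *part) error {
	if p.data == "" {
		return errors.New("empty payload")
	}
	return nil
}

// exclaim appends a marker so it is visible which parts it touched.
func exclaim(p *part) error { p.data += "!"; return nil }

// fallback replaces the payload of a failed part.
func fallback(p *part) error { p.data = "fallback"; return nil }

func main() {
	parts := []*part{{data: "a"}, {data: ""}}
	tryAll([]step{rejectEmpty, exclaim}, parts)
	catchAll([]step{fallback}, parts)
	for _, p := range parts {
		fmt.Printf("data=%q fail=%q\n", p.data, p.fail)
	}
}
```

In this sketch the second part fails `rejectEmpty`, so `exclaim` never sees it, and only `fallback` is applied to it afterwards.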

func FlagErr

func FlagErr(part *message.Part, err error)

FlagErr marks a message part as having failed at a processing step with an error message. If the error is nil the message part remains unchanged.

func FlagFail

func FlagFail(part *message.Part)

FlagFail marks a message part as having failed at a processing step.

func GetFail

func GetFail(part *message.Part) string

GetFail returns an error string for a message part if it has failed, or an empty string if not.

func HasFailed

func HasFailed(part *message.Part) bool

HasFailed checks whether a message part has failed a processing step.
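The flag helpers above (FlagErr, GetFail, HasFailed, ClearFail) all revolve around one metadata key. The following is a minimal sketch of that idea using only the standard library. The `part` type and the key value `example_fail_flag` are assumptions made for illustration; the real message.Part type and the real value of FailFlagKey are not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// failFlagKey is a hypothetical stand-in for processor.FailFlagKey.
const failFlagKey = "example_fail_flag"

// errExample is a sample error used in the demonstration below.
var errExample = errors.New("schema mismatch")

// part is a simplified, hypothetical stand-in for *message.Part.
type part struct {
	meta map[string]string
}

func newPart() *part { return &part{meta: map[string]string{}} }

// flagErr stores the error text under the flag key; a nil error is a no-op.
func flagErr(p *part, err error) {
	if err != nil {
		p.meta[failFlagKey] = err.Error()
	}
}

// getFail returns the stored error text, or "" when the part has not failed.
func getFail(p *part) string { return p.meta[failFlagKey] }

// hasFailed reports whether a non-empty flag value is present.
func hasFailed(p *part) bool { return getFail(p) != "" }

// clearFail removes the flag from the part.
func clearFail(p *part) { delete(p.meta, failFlagKey) }

func main() {
	p := newPart()
	flagErr(p, nil)
	fmt.Println(hasFailed(p))
	flagErr(p, errExample)
	fmt.Println(hasFailed(p), getFail(p))
	clearFail(p)
	fmt.Println(hasFailed(p))
}
```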

func IteratePartsWithSpanV2

func IteratePartsWithSpanV2(
	operationName string, parts []int, msg *message.Batch,
	iter func(int, *tracing.Span, *message.Part) error,
)

IteratePartsWithSpanV2 iterates the parts of a message according to a slice of indexes (if empty all parts are iterated) and calls a func for each part along with a tracing span for that part. If an error is returned the part is flagged as failed and the span has the error logged.
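The index-selection rule described above (an empty slice means every part) can be sketched as follows. This is a simplified illustration, not the package's implementation: tracing spans are omitted, `part` and `iterateParts` are hypothetical names, and silently skipping out-of-range indexes is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// part is a hypothetical stand-in for a message part.
type part struct {
	data string
	fail string
}

// iterateParts calls fn for each selected part. An empty indexes slice
// selects every part. A returned error flags the part as failed.
func iterateParts(indexes []int, batch []*part, fn func(int, *part) error) {
	visit := func(i int) {
		if err := fn(i, batch[i]); err != nil {
			batch[i].fail = err.Error()
		}
	}
	if len(indexes) == 0 {
		for i := range batch {
			visit(i)
		}
		return
	}
	for _, i := range indexes {
		if i < 0 || i >= len(batch) {
			continue // assumption: out-of-range indexes are ignored
		}
		visit(i)
	}
}

// failOnEmpty is a sample callback that rejects empty payloads.
func failOnEmpty(_ int, p *part) error {
	if p.data == "" {
		return errors.New("empty payload")
	}
	return nil
}

func main() {
	batch := []*part{{data: "a"}, {data: ""}, {data: ""}}
	iterateParts([]int{0, 1}, batch, failOnEmpty)
	for i, p := range batch {
		fmt.Printf("%d fail=%q\n", i, p.fail)
	}
}
```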

func New

func New(
	conf Config,
	mgr interop.Manager,
	log log.Modular,
	stats metrics.Type,
) (processor.V1, error)

New creates a processor type based on a processor configuration.

func NewBloblangFromExecutor

func NewBloblangFromExecutor(exec *mapping.Executor, log log.Modular) processor.V2Batched

NewBloblangFromExecutor returns a new bloblang processor from an executor.

func NewLog

func NewLog(
	conf Config, mgr interop.Manager, logger log.Modular, stats metrics.Type,
) (processor.V1, error)

NewLog returns a Log processor.

func NewMetric

func NewMetric(
	conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type,
) (processor.V1, error)

NewMetric returns a Metric processor.

func NewNoop

func NewNoop(
	conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type,
) (processor.V1, error)

NewNoop returns a Noop processor.

func NewResource

func NewResource(
	conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type,
) (processor.V1, error)

NewResource returns a resource processor.

func NewSyncResponse

func NewSyncResponse(
	conf Config, mgr interop.Manager, logger log.Modular, stats metrics.Type,
) (processor.V1, error)

NewSyncResponse returns a SyncResponse processor.

func WalkConstructors

func WalkConstructors(fn func(ConstructorFunc, docs.ComponentSpec))

WalkConstructors iterates each component constructor.
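The relationship between a registry map such as Constructors and a walker such as WalkConstructors can be sketched with the standard library alone. The `typeSpec` type, the `register` helper, and the sorted iteration order are assumptions for illustration; the real function passes a ConstructorFunc and a docs.ComponentSpec to its callback.

```go
package main

import (
	"fmt"
	"sort"
)

// typeSpec is a hypothetical, trimmed-down stand-in for TypeSpec.
type typeSpec struct {
	summary     string
	constructor func() string
}

// constructors plays the role of the package-level registry map.
var constructors = map[string]typeSpec{}

// register adds or replaces a spec under the given type name.
func register(name string, spec typeSpec) { constructors[name] = spec }

// walkConstructors calls fn once per registered type, in name order so
// that output is stable (Go map iteration order is random).
func walkConstructors(fn func(name string, spec typeSpec)) {
	names := make([]string, 0, len(constructors))
	for name := range constructors {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		fn(name, constructors[name])
	}
}

func main() {
	register("noop", typeSpec{summary: "does nothing", constructor: func() string { return "noop" }})
	register("log", typeSpec{summary: "prints a log event", constructor: func() string { return "log" }})
	walkConstructors(func(name string, spec typeSpec) {
		fmt.Printf("%s: %s\n", name, spec.summary)
	})
}
```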

Types

type AWKConfig

type AWKConfig struct {
	Codec   string `json:"codec" yaml:"codec"`
	Program string `json:"program" yaml:"program"`
}

AWKConfig contains configuration fields for the AWK processor.

func NewAWKConfig

func NewAWKConfig() AWKConfig

NewAWKConfig returns an AWKConfig with default values.

type ArchiveConfig

type ArchiveConfig struct {
	Format string `json:"format" yaml:"format"`
	Path   string `json:"path" yaml:"path"`
}

ArchiveConfig contains configuration fields for the Archive processor.

func NewArchiveConfig

func NewArchiveConfig() ArchiveConfig

NewArchiveConfig returns an ArchiveConfig with default values.

type AvroConfig

type AvroConfig struct {
	Operator   string `json:"operator" yaml:"operator"`
	Encoding   string `json:"encoding" yaml:"encoding"`
	Schema     string `json:"schema" yaml:"schema"`
	SchemaPath string `json:"schema_path" yaml:"schema_path"`
}

AvroConfig contains configuration fields for the Avro processor.

func NewAvroConfig

func NewAvroConfig() AvroConfig

NewAvroConfig returns an AvroConfig with default values.

type BloblangConfig

type BloblangConfig string

BloblangConfig contains configuration fields for the Bloblang processor.

func NewBloblangConfig

func NewBloblangConfig() BloblangConfig

NewBloblangConfig returns a BloblangConfig with default values.

type BoundsCheckConfig

type BoundsCheckConfig struct {
	MaxParts    int `json:"max_parts" yaml:"max_parts"`
	MinParts    int `json:"min_parts" yaml:"min_parts"`
	MaxPartSize int `json:"max_part_size" yaml:"max_part_size"`
	MinPartSize int `json:"min_part_size" yaml:"min_part_size"`
}

BoundsCheckConfig contains configuration fields for the BoundsCheck processor.

func NewBoundsCheckConfig

func NewBoundsCheckConfig() BoundsCheckConfig

NewBoundsCheckConfig returns a BoundsCheckConfig with default values.

type Branch

type Branch struct {
	// contains filtered or unexported fields
}

Branch contains conditions and maps for transforming a batch of messages into a subset of request messages, and mapping results from those requests back into the original message batch.

func (*Branch) CloseAsync

func (b *Branch) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Branch) ProcessMessage

func (b *Branch) ProcessMessage(msg *message.Batch) ([]*message.Batch, error)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Branch) WaitForClose

func (b *Branch) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type BranchConfig

type BranchConfig struct {
	RequestMap string   `json:"request_map" yaml:"request_map"`
	Processors []Config `json:"processors" yaml:"processors"`
	ResultMap  string   `json:"result_map" yaml:"result_map"`
}

BranchConfig contains configuration fields for the Branch processor.

func NewBranchConfig

func NewBranchConfig() BranchConfig

NewBranchConfig returns a BranchConfig with default values.

type CacheConfig

type CacheConfig struct {
	Resource string `json:"resource" yaml:"resource"`
	Operator string `json:"operator" yaml:"operator"`
	Key      string `json:"key" yaml:"key"`
	Value    string `json:"value" yaml:"value"`
	TTL      string `json:"ttl" yaml:"ttl"`
}

CacheConfig contains configuration fields for the Cache processor.

func NewCacheConfig

func NewCacheConfig() CacheConfig

NewCacheConfig returns a CacheConfig with default values.

type CatchConfig

type CatchConfig []Config

CatchConfig is a config struct containing fields for the Catch processor.

func NewCatchConfig

func NewCatchConfig() CatchConfig

NewCatchConfig returns a default CatchConfig.

type Category

type Category string

Category describes the general purpose of a processor.

var (
	CategoryMapping     Category = "Mapping"
	CategoryParsing     Category = "Parsing"
	CategoryIntegration Category = "Integration"
	CategoryComposition Category = "Composition"
	CategoryUtility     Category = "Utility"
)

Processor categories

type CompressConfig

type CompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Level     int    `json:"level" yaml:"level"`
}

CompressConfig contains configuration fields for the Compress processor.

func NewCompressConfig

func NewCompressConfig() CompressConfig

NewCompressConfig returns a CompressConfig with default values.

type Config

type Config struct {
	Label        string             `json:"label" yaml:"label"`
	Type         string             `json:"type" yaml:"type"`
	Archive      ArchiveConfig      `json:"archive" yaml:"archive"`
	Avro         AvroConfig         `json:"avro" yaml:"avro"`
	AWK          AWKConfig          `json:"awk" yaml:"awk"`
	Bloblang     BloblangConfig     `json:"bloblang" yaml:"bloblang"`
	BoundsCheck  BoundsCheckConfig  `json:"bounds_check" yaml:"bounds_check"`
	Branch       BranchConfig       `json:"branch" yaml:"branch"`
	Cache        CacheConfig        `json:"cache" yaml:"cache"`
	Catch        CatchConfig        `json:"catch" yaml:"catch"`
	Compress     CompressConfig     `json:"compress" yaml:"compress"`
	Decompress   DecompressConfig   `json:"decompress" yaml:"decompress"`
	Dedupe       DedupeConfig       `json:"dedupe" yaml:"dedupe"`
	ForEach      ForEachConfig      `json:"for_each" yaml:"for_each"`
	Grok         GrokConfig         `json:"grok" yaml:"grok"`
	GroupBy      GroupByConfig      `json:"group_by" yaml:"group_by"`
	GroupByValue GroupByValueConfig `json:"group_by_value" yaml:"group_by_value"`
	HTTP         HTTPConfig         `json:"http" yaml:"http"`
	InsertPart   InsertPartConfig   `json:"insert_part" yaml:"insert_part"`
	JMESPath     JMESPathConfig     `json:"jmespath" yaml:"jmespath"`
	JQ           JQConfig           `json:"jq" yaml:"jq"`
	JSONSchema   JSONSchemaConfig   `json:"json_schema" yaml:"json_schema"`
	Log          LogConfig          `json:"log" yaml:"log"`
	Metric       MetricConfig       `json:"metric" yaml:"metric"`
	MongoDB      MongoDBConfig      `json:"mongodb" yaml:"mongodb"`
	Noop         NoopConfig         `json:"noop" yaml:"noop"`
	Plugin       interface{}        `json:"plugin,omitempty" yaml:"plugin,omitempty"`
	Parallel     ParallelConfig     `json:"parallel" yaml:"parallel"`
	ParseLog     ParseLogConfig     `json:"parse_log" yaml:"parse_log"`
	ProcessBatch ForEachConfig      `json:"process_batch" yaml:"process_batch"`
	Protobuf     ProtobufConfig     `json:"protobuf" yaml:"protobuf"`
	RateLimit    RateLimitConfig    `json:"rate_limit" yaml:"rate_limit"`
	Redis        RedisConfig        `json:"redis" yaml:"redis"`
	Resource     string             `json:"resource" yaml:"resource"`
	SelectParts  SelectPartsConfig  `json:"select_parts" yaml:"select_parts"`
	Sleep        SleepConfig        `json:"sleep" yaml:"sleep"`
	Split        SplitConfig        `json:"split" yaml:"split"`
	Subprocess   SubprocessConfig   `json:"subprocess" yaml:"subprocess"`
	Switch       SwitchConfig       `json:"switch" yaml:"switch"`
	SyncResponse SyncResponseConfig `json:"sync_response" yaml:"sync_response"`
	Try          TryConfig          `json:"try" yaml:"try"`
	Unarchive    UnarchiveConfig    `json:"unarchive" yaml:"unarchive"`
	While        WhileConfig        `json:"while" yaml:"while"`
	Workflow     WorkflowConfig     `json:"workflow" yaml:"workflow"`
	XML          XMLConfig          `json:"xml" yaml:"xml"`
}

Config is the all-encompassing configuration struct for all processor types. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl

func (*Config) UnmarshalYAML

func (conf *Config) UnmarshalYAML(value *yaml.Node) error

UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.

type ConstructorFunc

type ConstructorFunc func(Config, interop.Manager, log.Modular, metrics.Type) (processor.V1, error)

ConstructorFunc is a func signature able to construct a processor.

type DecompressConfig

type DecompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
}

DecompressConfig contains configuration fields for the Decompress processor.

func NewDecompressConfig

func NewDecompressConfig() DecompressConfig

NewDecompressConfig returns a DecompressConfig with default values.

type DedupeConfig

type DedupeConfig struct {
	Cache          string `json:"cache" yaml:"cache"`
	Key            string `json:"key" yaml:"key"`
	DropOnCacheErr bool   `json:"drop_on_err" yaml:"drop_on_err"`
}

DedupeConfig contains configuration fields for the Dedupe processor.

func NewDedupeConfig

func NewDedupeConfig() DedupeConfig

NewDedupeConfig returns a DedupeConfig with default values.

type ForEachConfig

type ForEachConfig []Config

ForEachConfig is a config struct containing fields for the ForEach processor.

func NewForEachConfig

func NewForEachConfig() ForEachConfig

NewForEachConfig returns a default ForEachConfig.

type GrokConfig

type GrokConfig struct {
	Expressions        []string          `json:"expressions" yaml:"expressions"`
	RemoveEmpty        bool              `json:"remove_empty_values" yaml:"remove_empty_values"`
	NamedOnly          bool              `json:"named_captures_only" yaml:"named_captures_only"`
	UseDefaults        bool              `json:"use_default_patterns" yaml:"use_default_patterns"`
	PatternPaths       []string          `json:"pattern_paths" yaml:"pattern_paths"`
	PatternDefinitions map[string]string `json:"pattern_definitions" yaml:"pattern_definitions"`
}

GrokConfig contains configuration fields for the Grok processor.

func NewGrokConfig

func NewGrokConfig() GrokConfig

NewGrokConfig returns a GrokConfig with default values.

type GroupByConfig

type GroupByConfig []GroupByElement

GroupByConfig is a configuration struct containing fields for the GroupBy processor, which breaks message batches down into N batches of a smaller size according to conditions.

func NewGroupByConfig

func NewGroupByConfig() GroupByConfig

NewGroupByConfig returns a GroupByConfig with default values.

type GroupByElement

type GroupByElement struct {
	Check      string   `json:"check" yaml:"check"`
	Processors []Config `json:"processors" yaml:"processors"`
}

GroupByElement represents a group determined by a condition and a list of group specific processors.

type GroupByValueConfig

type GroupByValueConfig struct {
	Value string `json:"value" yaml:"value"`
}

GroupByValueConfig is a configuration struct containing fields for the GroupByValue processor, which breaks message batches down into N batches of a smaller size according to a function interpolated string evaluated per message part.
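Grouping a batch by a per-part value can be sketched as below. This is an illustration under stated assumptions: parts are plain strings, the `key` callback stands in for the interpolated `value` string, and emitting groups in first-seen order is an assumption rather than documented behaviour.

```go
package main

import "fmt"

// groupByValue splits a batch into one group per distinct key, keeping
// the original order of parts within each group.
func groupByValue(parts []string, key func(string) string) [][]string {
	index := map[string]int{} // key -> position of its group in groups
	var groups [][]string
	for _, p := range parts {
		k := key(p)
		i, ok := index[k]
		if !ok {
			i = len(groups)
			index[k] = i
			groups = append(groups, nil)
		}
		groups[i] = append(groups[i], p)
	}
	return groups
}

// firstLetter is a sample key function: the first byte of the part.
func firstLetter(s string) string {
	if s == "" {
		return ""
	}
	return s[:1]
}

func main() {
	fmt.Println(groupByValue([]string{"apple", "bean", "avocado"}, firstLetter))
}
```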

func NewGroupByValueConfig

func NewGroupByValueConfig() GroupByValueConfig

NewGroupByValueConfig returns a GroupByValueConfig with default values.

type HTTPConfig

type HTTPConfig struct {
	BatchAsMultipart bool `json:"batch_as_multipart" yaml:"batch_as_multipart"`
	Parallel         bool `json:"parallel" yaml:"parallel"`
	ihttpdocs.Config `json:",inline" yaml:",inline"`
}

HTTPConfig contains configuration fields for the HTTP processor.

func NewHTTPConfig

func NewHTTPConfig() HTTPConfig

NewHTTPConfig returns an HTTPConfig with default values.

type InsertPartConfig

type InsertPartConfig struct {
	Index   int    `json:"index" yaml:"index"`
	Content string `json:"content" yaml:"content"`
}

InsertPartConfig contains configuration fields for the InsertPart processor.

func NewInsertPartConfig

func NewInsertPartConfig() InsertPartConfig

NewInsertPartConfig returns an InsertPartConfig with default values.

type JMESPathConfig

type JMESPathConfig struct {
	Query string `json:"query" yaml:"query"`
}

JMESPathConfig contains configuration fields for the JMESPath processor.

func NewJMESPathConfig

func NewJMESPathConfig() JMESPathConfig

NewJMESPathConfig returns a JMESPathConfig with default values.

type JQConfig

type JQConfig struct {
	Query     string `json:"query" yaml:"query"`
	Raw       bool   `json:"raw" yaml:"raw"`
	OutputRaw bool   `json:"output_raw" yaml:"output_raw"`
}

JQConfig contains configuration fields for the JQ processor.

func NewJQConfig

func NewJQConfig() JQConfig

NewJQConfig returns a JQConfig with default values.

type JSONSchemaConfig

type JSONSchemaConfig struct {
	SchemaPath string `json:"schema_path" yaml:"schema_path"`
	Schema     string `json:"schema" yaml:"schema"`
}

JSONSchemaConfig is a configuration struct containing fields for the jsonschema processor.

func NewJSONSchemaConfig

func NewJSONSchemaConfig() JSONSchemaConfig

NewJSONSchemaConfig returns a JSONSchemaConfig with default values.

type Log

type Log struct {
	// contains filtered or unexported fields
}

Log is a processor that prints a log event each time it processes a message.

func (*Log) CloseAsync

func (l *Log) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Log) ProcessMessage

func (l *Log) ProcessMessage(msg *message.Batch) ([]*message.Batch, error)

ProcessMessage logs an event and returns the message unchanged.

func (*Log) WaitForClose

func (l *Log) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type LogConfig

type LogConfig struct {
	Level         string            `json:"level" yaml:"level"`
	Fields        map[string]string `json:"fields" yaml:"fields"`
	FieldsMapping string            `json:"fields_mapping" yaml:"fields_mapping"`
	Message       string            `json:"message" yaml:"message"`
}

LogConfig contains configuration fields for the Log processor.

func NewLogConfig

func NewLogConfig() LogConfig

NewLogConfig returns a LogConfig with default values.

type Metric

type Metric struct {
	// contains filtered or unexported fields
}

Metric is a processor that creates a metric from extracted values from a message part.

func (*Metric) CloseAsync

func (m *Metric) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Metric) ProcessMessage

func (m *Metric) ProcessMessage(msg *message.Batch) ([]*message.Batch, error)

ProcessMessage applies the processor to a message.

func (*Metric) WaitForClose

func (m *Metric) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type MetricConfig

type MetricConfig struct {
	Type   string            `json:"type" yaml:"type"`
	Name   string            `json:"name" yaml:"name"`
	Labels map[string]string `json:"labels" yaml:"labels"`
	Value  string            `json:"value" yaml:"value"`
}

MetricConfig contains configuration fields for the Metric processor.

func NewMetricConfig

func NewMetricConfig() MetricConfig

NewMetricConfig returns a MetricConfig with default values.

type MongoDBConfig

type MongoDBConfig struct {
	MongoDB      client.Config       `json:",inline" yaml:",inline"`
	WriteConcern client.WriteConcern `json:"write_concern" yaml:"write_concern"`

	Operation       string                 `json:"operation" yaml:"operation"`
	FilterMap       string                 `json:"filter_map" yaml:"filter_map"`
	DocumentMap     string                 `json:"document_map" yaml:"document_map"`
	Upsert          bool                   `json:"upsert" yaml:"upsert"`
	HintMap         string                 `json:"hint_map" yaml:"hint_map"`
	RetryConfig     retries.Config         `json:",inline" yaml:",inline"`
	JSONMarshalMode client.JSONMarshalMode `json:"json_marshal_mode" yaml:"json_marshal_mode"`
}

MongoDBConfig contains configuration fields for the MongoDB processor.

func NewMongoDBConfig

func NewMongoDBConfig() MongoDBConfig

NewMongoDBConfig returns a MongoDBConfig with default values.

type Noop

type Noop struct{}

Noop is a no-op processor that does nothing.

func (*Noop) CloseAsync

func (c *Noop) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Noop) ProcessMessage

func (c *Noop) ProcessMessage(msg *message.Batch) ([]*message.Batch, error)

ProcessMessage does nothing and returns the message unchanged.

func (*Noop) WaitForClose

func (c *Noop) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.
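Every processor type on this page exposes the same three methods: ProcessMessage, CloseAsync, and WaitForClose. The sketch below shows what satisfying such a contract looks like. It assumes the interface consists of exactly those three methods, and it uses a hypothetical `batch` type and `processorV1` interface in place of message.Batch and processor.V1.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// batch is a hypothetical stand-in for *message.Batch.
type batch []string

// processorV1 mirrors the three methods each processor here exposes.
type processorV1 interface {
	ProcessMessage(b batch) ([]batch, error)
	CloseAsync()
	WaitForClose(timeout time.Duration) error
}

// upper is a stateless example processor that upper-cases every part.
type upper struct{}

// ProcessMessage returns a single resulting batch and never fails.
func (upper) ProcessMessage(b batch) ([]batch, error) {
	out := make(batch, len(b))
	for i, p := range b {
		out[i] = strings.ToUpper(p)
	}
	return []batch{out}, nil
}

// CloseAsync has nothing to shut down for a stateless processor.
func (upper) CloseAsync() {}

// WaitForClose returns immediately because there are no resources to release.
func (upper) WaitForClose(time.Duration) error { return nil }

// Compile-time check that upper satisfies the interface.
var _ processorV1 = upper{}

func main() {
	var p processorV1 = upper{}
	out, err := p.ProcessMessage(batch{"a", "b"})
	fmt.Println(out, err)
	p.CloseAsync()
	fmt.Println(p.WaitForClose(time.Second))
}
```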

type NoopConfig

type NoopConfig struct{}

NoopConfig configures the no-op processor.

func NewNoopConfig

func NewNoopConfig() NoopConfig

NewNoopConfig creates a new default no-op processor config.

type ParallelConfig

type ParallelConfig struct {
	Cap        int      `json:"cap" yaml:"cap"`
	Processors []Config `json:"processors" yaml:"processors"`
}

ParallelConfig is a config struct containing fields for the Parallel processor.

func NewParallelConfig

func NewParallelConfig() ParallelConfig

NewParallelConfig returns a default ParallelConfig.

type ParseLogConfig

type ParseLogConfig struct {
	Format       string `json:"format" yaml:"format"`
	Codec        string `json:"codec" yaml:"codec"`
	BestEffort   bool   `json:"best_effort" yaml:"best_effort"`
	WithRFC3339  bool   `json:"allow_rfc3339" yaml:"allow_rfc3339"`
	WithYear     string `json:"default_year" yaml:"default_year"`
	WithTimezone string `json:"default_timezone" yaml:"default_timezone"`
}

ParseLogConfig contains configuration fields for the ParseLog processor.

func NewParseLogConfig

func NewParseLogConfig() ParseLogConfig

NewParseLogConfig returns a ParseLogConfig with default values.

type ProtobufConfig

type ProtobufConfig struct {
	Operator    string   `json:"operator" yaml:"operator"`
	Message     string   `json:"message" yaml:"message"`
	ImportPaths []string `json:"import_paths" yaml:"import_paths"`
}

ProtobufConfig contains configuration fields for the Protobuf processor.

func NewProtobufConfig

func NewProtobufConfig() ProtobufConfig

NewProtobufConfig returns a ProtobufConfig with default values.

type RateLimitConfig

type RateLimitConfig struct {
	Resource string `json:"resource" yaml:"resource"`
}

RateLimitConfig contains configuration fields for the RateLimit processor.

func NewRateLimitConfig

func NewRateLimitConfig() RateLimitConfig

NewRateLimitConfig returns a RateLimitConfig with default values.

type RedisConfig

type RedisConfig struct {
	bredis.Config `json:",inline" yaml:",inline"`
	Operator      string `json:"operator" yaml:"operator"`
	Key           string `json:"key" yaml:"key"`

	// TODO: V4 replace this
	Retries     int    `json:"retries" yaml:"retries"`
	RetryPeriod string `json:"retry_period" yaml:"retry_period"`
}

RedisConfig contains configuration fields for the Redis processor.

func NewRedisConfig

func NewRedisConfig() RedisConfig

NewRedisConfig returns a RedisConfig with default values.

type Resource

type Resource struct {
	// contains filtered or unexported fields
}

Resource is a processor that returns the result of a processor resource.

func (*Resource) CloseAsync

func (r *Resource) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Resource) ProcessMessage

func (r *Resource) ProcessMessage(msg *message.Batch) (msgs []*message.Batch, res error)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

func (*Resource) WaitForClose

func (r *Resource) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type SelectPartsConfig

type SelectPartsConfig struct {
	Parts []int `json:"parts" yaml:"parts"`
}

SelectPartsConfig contains configuration fields for the SelectParts processor.

func NewSelectPartsConfig

func NewSelectPartsConfig() SelectPartsConfig

NewSelectPartsConfig returns a SelectPartsConfig with default values.

type SleepConfig

type SleepConfig struct {
	Duration string `json:"duration" yaml:"duration"`
}

SleepConfig contains configuration fields for the Sleep processor.

func NewSleepConfig

func NewSleepConfig() SleepConfig

NewSleepConfig returns a SleepConfig with default values.

type SplitConfig

type SplitConfig struct {
	Size     int `json:"size" yaml:"size"`
	ByteSize int `json:"byte_size" yaml:"byte_size"`
}

SplitConfig is a configuration struct containing fields for the Split processor, which breaks message batches down into batches of a smaller size.
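Breaking a batch into smaller batches by part count can be sketched as follows. Only the `size` field is modelled; `byte_size` is left out. Treating a non-positive size as "do not split" is an assumption made for this sketch.

```go
package main

import "fmt"

// split breaks a batch into consecutive batches of at most size parts.
// A non-positive size leaves the batch untouched (assumption).
func split(batch []string, size int) [][]string {
	if size <= 0 || len(batch) <= size {
		return [][]string{batch}
	}
	var out [][]string
	for start := 0; start < len(batch); start += size {
		end := start + size
		if end > len(batch) {
			end = len(batch)
		}
		out = append(out, batch[start:end])
	}
	return out
}

func main() {
	fmt.Println(split([]string{"a", "b", "c", "d", "e"}, 2))
}
```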

func NewSplitConfig

func NewSplitConfig() SplitConfig

NewSplitConfig returns a SplitConfig with default values.

type SubprocessConfig

type SubprocessConfig struct {
	Name      string   `json:"name" yaml:"name"`
	Args      []string `json:"args" yaml:"args"`
	MaxBuffer int      `json:"max_buffer" yaml:"max_buffer"`
	CodecSend string   `json:"codec_send" yaml:"codec_send"`
	CodecRecv string   `json:"codec_recv" yaml:"codec_recv"`
}

SubprocessConfig contains configuration fields for the Subprocess processor.

func NewSubprocessConfig

func NewSubprocessConfig() SubprocessConfig

NewSubprocessConfig returns a SubprocessConfig with default values.

type SwitchCaseConfig

type SwitchCaseConfig struct {
	Check       string   `json:"check" yaml:"check"`
	Processors  []Config `json:"processors" yaml:"processors"`
	Fallthrough bool     `json:"fallthrough" yaml:"fallthrough"`
}

SwitchCaseConfig contains a condition, processors and other fields for an individual case in the Switch processor.

func NewSwitchCaseConfig

func NewSwitchCaseConfig() SwitchCaseConfig

NewSwitchCaseConfig returns a new SwitchCaseConfig with default values.

func (*SwitchCaseConfig) UnmarshalJSON

func (s *SwitchCaseConfig) UnmarshalJSON(bytes []byte) error

UnmarshalJSON ensures that when parsing configs that are in a map or slice the default values are still applied.
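The usual Go technique for keeping defaults when a config sits inside a map or slice is to pre-populate a value and decode over it through an alias type. The sketch below shows that pattern with `encoding/json`. The `logLikeConfig` type and its `INFO` default are hypothetical and were chosen only so that a non-zero default is visible; they are not this package's types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// logLikeConfig is a hypothetical config with one non-zero default.
type logLikeConfig struct {
	Level   string `json:"level"`
	Message string `json:"message"`
}

// newLogLikeConfig returns the config populated with default values.
func newLogLikeConfig() logLikeConfig { return logLikeConfig{Level: "INFO"} }

// UnmarshalJSON starts from the defaults and overlays the parsed fields.
func (c *logLikeConfig) UnmarshalJSON(b []byte) error {
	// The alias type has no methods, so decoding into it does not
	// recurse back into this UnmarshalJSON.
	type alias logLikeConfig
	a := alias(newLogLikeConfig())
	if err := json.Unmarshal(b, &a); err != nil {
		return err
	}
	*c = logLikeConfig(a)
	return nil
}

// parse decodes a JSON array of configs.
func parse(raw string) ([]logLikeConfig, error) {
	var out []logLikeConfig
	err := json.Unmarshal([]byte(raw), &out)
	return out, err
}

func main() {
	confs, err := parse(`[{"message":"hello"},{"level":"DEBUG"}]`)
	fmt.Println(confs, err)
}
```

Without the custom method, the first element would decode with an empty `Level`, because slice elements are created as zero values rather than through the constructor.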

func (*SwitchCaseConfig) UnmarshalYAML

func (s *SwitchCaseConfig) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.

type SwitchConfig

type SwitchConfig []SwitchCaseConfig

SwitchConfig is a config struct containing fields for the Switch processor.

func NewSwitchConfig

func NewSwitchConfig() SwitchConfig

NewSwitchConfig returns a default SwitchConfig.

type SyncResponse

type SyncResponse struct {
	// contains filtered or unexported fields
}

SyncResponse is a processor that stores the message in its current state as a synchronous response to be returned to the message source.

func (*SyncResponse) CloseAsync

func (s *SyncResponse) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*SyncResponse) ProcessMessage

func (s *SyncResponse) ProcessMessage(msg *message.Batch) ([]*message.Batch, error)

ProcessMessage stores the message as a synchronous response and returns the message unchanged.

func (*SyncResponse) WaitForClose

func (s *SyncResponse) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type SyncResponseConfig

type SyncResponseConfig struct{}

SyncResponseConfig contains configuration fields for the SyncResponse processor.

func NewSyncResponseConfig

func NewSyncResponseConfig() SyncResponseConfig

NewSyncResponseConfig returns a SyncResponseConfig with default values.

type TryConfig

type TryConfig []Config

TryConfig is a config struct containing fields for the Try processor.

func NewTryConfig

func NewTryConfig() TryConfig

NewTryConfig returns a default TryConfig.

type TypeSpec

type TypeSpec struct {

	// UsesBatches indicates whether this processor's functionality is best
	// applied on messages that are already batched.
	UsesBatches bool

	Status      docs.Status
	Version     string
	Summary     string
	Description string
	Categories  []Category
	Footnotes   string

	FieldSpecs docs.FieldSpecs
	Examples   []docs.AnnotatedExample
	// contains filtered or unexported fields
}

TypeSpec holds a constructor and a usage description for each processor type.

type UnarchiveConfig

type UnarchiveConfig struct {
	Format string `json:"format" yaml:"format"`
}

UnarchiveConfig contains configuration fields for the Unarchive processor.

func NewUnarchiveConfig

func NewUnarchiveConfig() UnarchiveConfig

NewUnarchiveConfig returns an UnarchiveConfig with default values.

type WhileConfig

type WhileConfig struct {
	AtLeastOnce bool     `json:"at_least_once" yaml:"at_least_once"`
	MaxLoops    int      `json:"max_loops" yaml:"max_loops"`
	Check       string   `json:"check" yaml:"check"`
	Processors  []Config `json:"processors" yaml:"processors"`
}

WhileConfig is a config struct containing fields for the While processor.
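How the `at_least_once`, `max_loops`, and `check` fields could interact is sketched below. This is an interpretation of the field names, not a description of the actual processor: `check` and `body` are plain callbacks here, and treating a `max_loops` of zero as "no limit" is an assumption.

```go
package main

import "fmt"

// whileConfig is a hypothetical stand-in for two WhileConfig fields.
type whileConfig struct {
	AtLeastOnce bool
	MaxLoops    int
}

// runWhile runs body while check returns true and reports the loop count.
// With AtLeastOnce the first check is skipped. A positive MaxLoops caps
// the number of iterations; zero means no cap (assumption).
func runWhile(cfg whileConfig, check func() bool, body func()) int {
	loops := 0
	run := cfg.AtLeastOnce || check()
	for run {
		body()
		loops++
		if cfg.MaxLoops > 0 && loops >= cfg.MaxLoops {
			break
		}
		run = check()
	}
	return loops
}

func main() {
	always := func() bool { return true }
	never := func() bool { return false }
	fmt.Println(runWhile(whileConfig{MaxLoops: 3}, always, func() {}))
	fmt.Println(runWhile(whileConfig{AtLeastOnce: true}, never, func() {}))
}
```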

func NewWhileConfig

func NewWhileConfig() WhileConfig

NewWhileConfig returns a default WhileConfig.

type Workflow

type Workflow struct {
	// contains filtered or unexported fields
}

Workflow is a processor that applies a list of child processors to a new payload mapped from the original, and after processing attempts to overlay the results back onto the original payloads according to more mappings.

func NewWorkflow

func NewWorkflow(conf WorkflowConfig, mgr interop.Manager) (*Workflow, error)

NewWorkflow instantiates a new workflow processor.

func (*Workflow) CloseAsync

func (w *Workflow) CloseAsync()

CloseAsync shuts down the processor and stops processing requests.

func (*Workflow) Flow

func (w *Workflow) Flow() [][]string

Flow returns the calculated workflow as a 2D slice.
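One way to read a 2D slice like the one Flow returns is as a list of tiers: tiers run one after another, and the branches inside a tier are independent of each other. The sketch below executes such a slice under that reading. Running branches within a tier concurrently is an assumption of this sketch, and the branch names are made up.

```go
package main

import (
	"fmt"
	"sync"
)

// runFlow executes tiers in order. Branches inside one tier run
// concurrently, and the next tier starts only after all have finished.
func runFlow(flow [][]string, run func(branch string)) {
	for _, tier := range flow {
		var wg sync.WaitGroup
		for _, name := range tier {
			wg.Add(1)
			go func(n string) {
				defer wg.Done()
				run(n)
			}(name)
		}
		wg.Wait()
	}
}

// record runs the flow and returns branch names in completion order.
func record(flow [][]string) []string {
	var mu sync.Mutex
	var order []string
	runFlow(flow, func(branch string) {
		mu.Lock()
		defer mu.Unlock()
		order = append(order, branch)
	})
	return order
}

func main() {
	flow := [][]string{{"fetch"}, {"enrich", "score"}, {"merge"}}
	fmt.Println(record(flow))
}
```

The order of `enrich` and `score` in the output may vary between runs, while `fetch` always comes first and `merge` last.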

func (*Workflow) ProcessMessage

func (w *Workflow) ProcessMessage(msg *message.Batch) ([]*message.Batch, error)

ProcessMessage applies workflow stages to each part of a message type.

func (*Workflow) WaitForClose

func (w *Workflow) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type WorkflowConfig

type WorkflowConfig struct {
	MetaPath        string                  `json:"meta_path" yaml:"meta_path"`
	Order           [][]string              `json:"order" yaml:"order"`
	BranchResources []string                `json:"branch_resources" yaml:"branch_resources"`
	Branches        map[string]BranchConfig `json:"branches" yaml:"branches"`
}

WorkflowConfig is a config struct containing fields for the Workflow processor.

func NewWorkflowConfig

func NewWorkflowConfig() WorkflowConfig

NewWorkflowConfig returns a default WorkflowConfig.

type XMLConfig

type XMLConfig struct {
	Operator string `json:"operator" yaml:"operator"`
	Cast     bool   `json:"cast" yaml:"cast"`
}

XMLConfig contains configuration fields for the XML processor.

func NewXMLConfig

func NewXMLConfig() XMLConfig

NewXMLConfig returns an XMLConfig with default values.
