Documentation ¶
Overview ¶
Package processor contains implementations of types.Processor, which perform an arbitrary operation on a message and either return one or more messages to be propagated towards a sink, or a response to be sent back to the message source.
Index ¶
- Constants
- Variables
- func ClearFail(part *message.Part)
- func ExecuteAll(procs []processor.V1, msgs ...*message.Batch) ([]*message.Batch, error)
- func ExecuteCatchAll(procs []processor.V1, msgs ...*message.Batch) ([]*message.Batch, error)
- func ExecuteTryAll(procs []processor.V1, msgs ...*message.Batch) ([]*message.Batch, error)
- func FlagErr(part *message.Part, err error)
- func FlagFail(part *message.Part)
- func GetFail(part *message.Part) string
- func HasFailed(part *message.Part) bool
- func IteratePartsWithSpanV2(operationName string, parts []int, msg *message.Batch, ...)
- func New(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (processor.V1, error)
- func NewBloblangFromExecutor(exec *mapping.Executor, log log.Modular) processor.V2Batched
- func NewLog(conf Config, mgr interop.Manager, logger log.Modular, stats metrics.Type) (processor.V1, error)
- func NewMetric(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (processor.V1, error)
- func NewNoop(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (processor.V1, error)
- func NewResource(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (processor.V1, error)
- func NewSyncResponse(conf Config, mgr interop.Manager, logger log.Modular, stats metrics.Type) (processor.V1, error)
- func WalkConstructors(fn func(ConstructorFunc, docs.ComponentSpec))
- type AWKConfig
- type ArchiveConfig
- type AvroConfig
- type BloblangConfig
- type BoundsCheckConfig
- type Branch
- type BranchConfig
- type CacheConfig
- type CatchConfig
- type Category
- type CompressConfig
- type Config
- type ConstructorFunc
- type DecompressConfig
- type DedupeConfig
- type ForEachConfig
- type GrokConfig
- type GroupByConfig
- type GroupByElement
- type GroupByValueConfig
- type HTTPConfig
- type InsertPartConfig
- type JMESPathConfig
- type JQConfig
- type JSONSchemaConfig
- type Log
- type LogConfig
- type Metric
- type MetricConfig
- type MongoDBConfig
- type Noop
- type NoopConfig
- type ParallelConfig
- type ParseLogConfig
- type ProtobufConfig
- type RateLimitConfig
- type RedisConfig
- type Resource
- type SelectPartsConfig
- type SleepConfig
- type SplitConfig
- type SubprocessConfig
- type SwitchCaseConfig
- type SwitchConfig
- type SyncResponse
- type SyncResponseConfig
- type TryConfig
- type TypeSpec
- type UnarchiveConfig
- type WhileConfig
- type Workflow
- type WorkflowConfig
- type XMLConfig
Constants ¶
const ( TypeArchive = "archive" TypeAvro = "avro" TypeAWK = "awk" TypeBloblang = "bloblang" TypeBoundsCheck = "bounds_check" TypeBranch = "branch" TypeCache = "cache" TypeCatch = "catch" TypeCompress = "compress" TypeDecompress = "decompress" TypeDedupe = "dedupe" TypeForEach = "for_each" TypeGrok = "grok" TypeGroupBy = "group_by" TypeGroupByValue = "group_by_value" TypeHTTP = "http" TypeInsertPart = "insert_part" TypeJMESPath = "jmespath" TypeJQ = "jq" TypeJSONSchema = "json_schema" TypeLog = "log" TypeMetric = "metric" TypeMongoDB = "mongodb" TypeNoop = "noop" TypeParallel = "parallel" TypeParseLog = "parse_log" TypeProtobuf = "protobuf" TypeRateLimit = "rate_limit" TypeRedis = "redis" TypeResource = "resource" TypeSelectParts = "select_parts" TypeSleep = "sleep" TypeSplit = "split" TypeSubprocess = "subprocess" TypeSwitch = "switch" TypeSyncResponse = "sync_response" TypeTry = "try" TypeThrottle = "throttle" TypeUnarchive = "unarchive" TypeWhile = "while" TypeWorkflow = "workflow" TypeXML = "xml" )
String constants representing each processor type. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl
Variables ¶
var Constructors = map[string]TypeSpec{}
Constructors is a map of all processor types with their specs.
var DocsUsesBatches = `` /* 177-byte string literal not displayed */
DocsUsesBatches is a documentation paragraph regarding processors that benefit from input level batching.
var FailFlagKey = message.FailFlagKey
FailFlagKey is a metadata key used for flagging processor errors in Benthos. If a message part has any non-empty value for this metadata key then it will be interpreted as having failed a processor step somewhere in the pipeline.
Functions ¶
func ExecuteAll ¶
ExecuteAll attempts to apply a slice of processors to a message. Returns N resulting messages or a response. The response may indicate either a NoAck in the event of the message being buffered or an unrecoverable error.
func ExecuteCatchAll ¶
ExecuteCatchAll attempts to apply a slice of processors to only those messages that have failed a processing step. Returns N resulting messages or a response.
func ExecuteTryAll ¶
ExecuteTryAll attempts to apply a slice of processors to messages. If a message has failed a processing step it is prevented from being sent to subsequent processors. Returns N resulting messages or a response. The response may indicate either a NoAck in the event of the message being buffered or an unrecoverable error.
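The difference between the try and catch execution modes described above can be illustrated with a minimal, self-contained sketch. The `part` and `step` types and the `tryAll` and `catchAll` helpers are hypothetical stand-ins invented for this example; they are not the package's real types or signatures, and the sketch deliberately leaves the fail flag in place after catching.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// part is a hypothetical stand-in for *message.Part.
type part struct {
	data string
	fail string // non-empty once a step has failed for this part
}

// step is a hypothetical per-part processor.
type step func(p *part) error

// run applies one step to one part and flags the part if the step errors.
func run(s step, p *part) {
	if err := s(p); err != nil {
		p.fail = err.Error()
	}
}

// tryAll applies every step in order, skipping parts that are already flagged.
func tryAll(steps []step, parts []*part) {
	for _, s := range steps {
		for _, p := range parts {
			if p.fail == "" {
				run(s, p)
			}
		}
	}
}

// catchAll applies the steps only to parts that are flagged as failed.
// Clearing the flag afterwards is left to the caller in this sketch.
func catchAll(steps []step, parts []*part) {
	for _, p := range parts {
		if p.fail == "" {
			continue
		}
		for _, s := range steps {
			run(s, p)
		}
	}
}

func rejectEmpty(p *part) error {
	if p.data == "" {
		return errors.New("empty payload")
	}
	return nil
}

func upper(p *part) error {
	p.data = strings.ToUpper(p.data)
	return nil
}

func fallback(p *part) error {
	p.data = "fallback"
	return nil
}

func main() {
	parts := []*part{{data: "ok"}, {data: ""}}
	tryAll([]step{rejectEmpty, upper}, parts)
	catchAll([]step{fallback}, parts)
	for _, p := range parts {
		fmt.Printf("%q failed=%t\n", p.data, p.fail != "")
	}
}
```

In this sketch the healthy part passes through both try steps, while the empty part is flagged by the first step, skipped by the second, and then handled only by the catch step.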
func FlagErr ¶
FlagErr marks a message part as having failed at a processing step with an error message. If the error is nil the message part remains unchanged.
func GetFail ¶
GetFail returns an error string for a message part if it has failed, or an empty string if not.
func IteratePartsWithSpanV2 ¶
func IteratePartsWithSpanV2( operationName string, parts []int, msg *message.Batch, iter func(int, *tracing.Span, *message.Part) error, )
IteratePartsWithSpanV2 iterates the parts of a message according to a slice of indexes (if empty all parts are iterated) and calls a func for each part along with a tracing span for that part. If an error is returned the part is flagged as failed and the span has the error logged.
func New ¶
func New( conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type, ) (processor.V1, error)
New creates a processor type based on a processor configuration.
func NewBloblangFromExecutor ¶
NewBloblangFromExecutor returns a new bloblang processor from an executor.
func NewLog ¶
func NewLog( conf Config, mgr interop.Manager, logger log.Modular, stats metrics.Type, ) (processor.V1, error)
NewLog returns a Log processor.
func NewMetric ¶
func NewMetric( conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type, ) (processor.V1, error)
NewMetric returns a Metric processor.
func NewNoop ¶
func NewNoop( conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type, ) (processor.V1, error)
NewNoop returns a Noop processor.
func NewResource ¶
func NewResource( conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type, ) (processor.V1, error)
NewResource returns a resource processor.
func NewSyncResponse ¶
func NewSyncResponse( conf Config, mgr interop.Manager, logger log.Modular, stats metrics.Type, ) (processor.V1, error)
NewSyncResponse returns a SyncResponse processor.
func WalkConstructors ¶
func WalkConstructors(fn func(ConstructorFunc, docs.ComponentSpec))
WalkConstructors iterates each component constructor.
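The relationship between the Constructors map, New and WalkConstructors follows a common registry pattern. The sketch below shows that pattern in isolation; every type and function in it (`processor`, `typeSpec`, `newProcessor`, `walkConstructors`) is a hypothetical simplification, and visiting entries in sorted name order is an assumption made so the output is deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// processor is a hypothetical, minimal processor interface.
type processor interface {
	Process(in string) string
}

// constructorFunc builds a processor.
type constructorFunc func() (processor, error)

// typeSpec pairs a constructor with a short description.
type typeSpec struct {
	constructor constructorFunc
	Summary     string
}

// constructors maps a type name to its spec.
var constructors = map[string]typeSpec{}

type noop struct{}

func (noop) Process(in string) string { return in }

func init() {
	constructors["noop"] = typeSpec{
		constructor: func() (processor, error) { return noop{}, nil },
		Summary:     "Does nothing and returns the input unchanged.",
	}
}

// newProcessor looks up the named type and invokes its constructor.
func newProcessor(typeName string) (processor, error) {
	spec, ok := constructors[typeName]
	if !ok {
		return nil, fmt.Errorf("processor type %q not recognised", typeName)
	}
	return spec.constructor()
}

// walkConstructors visits every registered type in sorted name order.
func walkConstructors(fn func(name string, spec typeSpec)) {
	names := make([]string, 0, len(constructors))
	for name := range constructors {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		fn(name, constructors[name])
	}
}

func main() {
	p, err := newProcessor("noop")
	if err != nil {
		panic(err)
	}
	fmt.Println(p.Process("hello"))

	walkConstructors(func(name string, spec typeSpec) {
		fmt.Println(name, "-", spec.Summary)
	})
}
```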
Types ¶
type AWKConfig ¶
type AWKConfig struct { Codec string `json:"codec" yaml:"codec"` Program string `json:"program" yaml:"program"` }
AWKConfig contains configuration fields for the AWK processor.
func NewAWKConfig ¶
func NewAWKConfig() AWKConfig
NewAWKConfig returns an AWKConfig with default values.
type ArchiveConfig ¶
type ArchiveConfig struct { Format string `json:"format" yaml:"format"` Path string `json:"path" yaml:"path"` }
ArchiveConfig contains configuration fields for the Archive processor.
func NewArchiveConfig ¶
func NewArchiveConfig() ArchiveConfig
NewArchiveConfig returns an ArchiveConfig with default values.
type AvroConfig ¶
type AvroConfig struct { Operator string `json:"operator" yaml:"operator"` Encoding string `json:"encoding" yaml:"encoding"` Schema string `json:"schema" yaml:"schema"` SchemaPath string `json:"schema_path" yaml:"schema_path"` }
AvroConfig contains configuration fields for the Avro processor.
func NewAvroConfig ¶
func NewAvroConfig() AvroConfig
NewAvroConfig returns an AvroConfig with default values.
type BloblangConfig ¶
type BloblangConfig string
BloblangConfig contains configuration fields for the Bloblang processor.
func NewBloblangConfig ¶
func NewBloblangConfig() BloblangConfig
NewBloblangConfig returns a BloblangConfig with default values.
type BoundsCheckConfig ¶
type BoundsCheckConfig struct { MaxParts int `json:"max_parts" yaml:"max_parts"` MinParts int `json:"min_parts" yaml:"min_parts"` MaxPartSize int `json:"max_part_size" yaml:"max_part_size"` MinPartSize int `json:"min_part_size" yaml:"min_part_size"` }
BoundsCheckConfig contains configuration fields for the BoundsCheck processor.
func NewBoundsCheckConfig ¶
func NewBoundsCheckConfig() BoundsCheckConfig
NewBoundsCheckConfig returns a BoundsCheckConfig with default values.
type Branch ¶
type Branch struct {
// contains filtered or unexported fields
}
Branch contains conditions and maps for transforming a batch of messages into a subset of request messages, and mapping results from those requests back into the original message batch.
func (*Branch) CloseAsync ¶
func (b *Branch) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Branch) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type BranchConfig ¶
type BranchConfig struct { RequestMap string `json:"request_map" yaml:"request_map"` Processors []Config `json:"processors" yaml:"processors"` ResultMap string `json:"result_map" yaml:"result_map"` }
BranchConfig contains configuration fields for the Branch processor.
func NewBranchConfig ¶
func NewBranchConfig() BranchConfig
NewBranchConfig returns a BranchConfig with default values.
type CacheConfig ¶
type CacheConfig struct { Resource string `json:"resource" yaml:"resource"` Operator string `json:"operator" yaml:"operator"` Key string `json:"key" yaml:"key"` Value string `json:"value" yaml:"value"` TTL string `json:"ttl" yaml:"ttl"` }
CacheConfig contains configuration fields for the Cache processor.
func NewCacheConfig ¶
func NewCacheConfig() CacheConfig
NewCacheConfig returns a CacheConfig with default values.
type CatchConfig ¶
type CatchConfig []Config
CatchConfig is a config struct containing fields for the Catch processor.
func NewCatchConfig ¶
func NewCatchConfig() CatchConfig
NewCatchConfig returns a default CatchConfig.
type CompressConfig ¶
type CompressConfig struct { Algorithm string `json:"algorithm" yaml:"algorithm"` Level int `json:"level" yaml:"level"` }
CompressConfig contains configuration fields for the Compress processor.
func NewCompressConfig ¶
func NewCompressConfig() CompressConfig
NewCompressConfig returns a CompressConfig with default values.
type Config ¶
type Config struct {
	Label string `json:"label" yaml:"label"`
	Type string `json:"type" yaml:"type"`
	Archive ArchiveConfig `json:"archive" yaml:"archive"`
	Avro AvroConfig `json:"avro" yaml:"avro"`
	AWK AWKConfig `json:"awk" yaml:"awk"`
	Bloblang BloblangConfig `json:"bloblang" yaml:"bloblang"`
	BoundsCheck BoundsCheckConfig `json:"bounds_check" yaml:"bounds_check"`
	Branch BranchConfig `json:"branch" yaml:"branch"`
	Cache CacheConfig `json:"cache" yaml:"cache"`
	Catch CatchConfig `json:"catch" yaml:"catch"`
	Compress CompressConfig `json:"compress" yaml:"compress"`
	Decompress DecompressConfig `json:"decompress" yaml:"decompress"`
	Dedupe DedupeConfig `json:"dedupe" yaml:"dedupe"`
	ForEach ForEachConfig `json:"for_each" yaml:"for_each"`
	Grok GrokConfig `json:"grok" yaml:"grok"`
	GroupBy GroupByConfig `json:"group_by" yaml:"group_by"`
	GroupByValue GroupByValueConfig `json:"group_by_value" yaml:"group_by_value"`
	HTTP HTTPConfig `json:"http" yaml:"http"`
	InsertPart InsertPartConfig `json:"insert_part" yaml:"insert_part"`
	JMESPath JMESPathConfig `json:"jmespath" yaml:"jmespath"`
	JQ JQConfig `json:"jq" yaml:"jq"`
	JSONSchema JSONSchemaConfig `json:"json_schema" yaml:"json_schema"`
	Log LogConfig `json:"log" yaml:"log"`
	Metric MetricConfig `json:"metric" yaml:"metric"`
	MongoDB MongoDBConfig `json:"mongodb" yaml:"mongodb"`
	Noop NoopConfig `json:"noop" yaml:"noop"`
	Plugin interface{} `json:"plugin,omitempty" yaml:"plugin,omitempty"`
	Parallel ParallelConfig `json:"parallel" yaml:"parallel"`
	ParseLog ParseLogConfig `json:"parse_log" yaml:"parse_log"`
	ProcessBatch ForEachConfig `json:"process_batch" yaml:"process_batch"`
	Protobuf ProtobufConfig `json:"protobuf" yaml:"protobuf"`
	RateLimit RateLimitConfig `json:"rate_limit" yaml:"rate_limit"`
	Redis RedisConfig `json:"redis" yaml:"redis"`
	Resource string `json:"resource" yaml:"resource"`
	SelectParts SelectPartsConfig `json:"select_parts" yaml:"select_parts"`
	Sleep SleepConfig `json:"sleep" yaml:"sleep"`
	Split SplitConfig `json:"split" yaml:"split"`
	Subprocess SubprocessConfig `json:"subprocess" yaml:"subprocess"`
	Switch SwitchConfig `json:"switch" yaml:"switch"`
	SyncResponse SyncResponseConfig `json:"sync_response" yaml:"sync_response"`
	Try TryConfig `json:"try" yaml:"try"`
	Unarchive UnarchiveConfig `json:"unarchive" yaml:"unarchive"`
	While WhileConfig `json:"while" yaml:"while"`
	Workflow WorkflowConfig `json:"workflow" yaml:"workflow"`
	XML XMLConfig `json:"xml" yaml:"xml"`
}
Config is the all encompassing configuration struct for all processor types. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl
type ConstructorFunc ¶
ConstructorFunc is a func signature able to construct a processor.
type DecompressConfig ¶
type DecompressConfig struct {
Algorithm string `json:"algorithm" yaml:"algorithm"`
}
DecompressConfig contains configuration fields for the Decompress processor.
func NewDecompressConfig ¶
func NewDecompressConfig() DecompressConfig
NewDecompressConfig returns a DecompressConfig with default values.
type DedupeConfig ¶
type DedupeConfig struct { Cache string `json:"cache" yaml:"cache"` Key string `json:"key" yaml:"key"` DropOnCacheErr bool `json:"drop_on_err" yaml:"drop_on_err"` }
DedupeConfig contains configuration fields for the Dedupe processor.
func NewDedupeConfig ¶
func NewDedupeConfig() DedupeConfig
NewDedupeConfig returns a DedupeConfig with default values.
type ForEachConfig ¶
type ForEachConfig []Config
ForEachConfig is a config struct containing fields for the ForEach processor.
func NewForEachConfig ¶
func NewForEachConfig() ForEachConfig
NewForEachConfig returns a default ForEachConfig.
type GrokConfig ¶
type GrokConfig struct { Expressions []string `json:"expressions" yaml:"expressions"` RemoveEmpty bool `json:"remove_empty_values" yaml:"remove_empty_values"` NamedOnly bool `json:"named_captures_only" yaml:"named_captures_only"` UseDefaults bool `json:"use_default_patterns" yaml:"use_default_patterns"` PatternPaths []string `json:"pattern_paths" yaml:"pattern_paths"` PatternDefinitions map[string]string `json:"pattern_definitions" yaml:"pattern_definitions"` }
GrokConfig contains configuration fields for the Grok processor.
func NewGrokConfig ¶
func NewGrokConfig() GrokConfig
NewGrokConfig returns a GrokConfig with default values.
type GroupByConfig ¶
type GroupByConfig []GroupByElement
GroupByConfig is a configuration struct containing fields for the GroupBy processor, which breaks message batches down into N batches of a smaller size according to conditions.
func NewGroupByConfig ¶
func NewGroupByConfig() GroupByConfig
NewGroupByConfig returns a GroupByConfig with default values.
type GroupByElement ¶
type GroupByElement struct { Check string `json:"check" yaml:"check"` Processors []Config `json:"processors" yaml:"processors"` }
GroupByElement represents a group determined by a condition and a list of group specific processors.
type GroupByValueConfig ¶
type GroupByValueConfig struct {
Value string `json:"value" yaml:"value"`
}
GroupByValueConfig is a configuration struct containing fields for the GroupByValue processor, which breaks message batches down into N batches of a smaller size according to a function interpolated string evaluated per message part.
func NewGroupByValueConfig ¶
func NewGroupByValueConfig() GroupByValueConfig
NewGroupByValueConfig returns a GroupByValueConfig with default values.
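Grouping a batch by a per-part value, as GroupByValueConfig describes, can be sketched as follows. The batch is modelled as a plain `[]string` and the interpolated value as a Go function, both of which are simplifications for illustration; emitting groups in first-seen order is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// groupByValue splits a batch into smaller batches, one per distinct value
// returned by the value function. Groups are emitted in the order in which
// their value was first seen (an assumption of this sketch).
func groupByValue(batch []string, value func(string) string) [][]string {
	index := map[string]int{} // value -> position in groups
	var groups [][]string
	for _, p := range batch {
		k := value(p)
		i, ok := index[k]
		if !ok {
			i = len(groups)
			index[k] = i
			groups = append(groups, nil)
		}
		groups[i] = append(groups[i], p)
	}
	return groups
}

// prefix returns the text before the first colon, standing in for an
// interpolated value such as a metadata field.
func prefix(s string) string {
	return strings.SplitN(s, ":", 2)[0]
}

func main() {
	batch := []string{"eu:1", "us:2", "eu:3", "ap:4", "us:5"}
	for _, g := range groupByValue(batch, prefix) {
		fmt.Println(g)
	}
}
```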
type HTTPConfig ¶
type HTTPConfig struct { BatchAsMultipart bool `json:"batch_as_multipart" yaml:"batch_as_multipart"` Parallel bool `json:"parallel" yaml:"parallel"` ihttpdocs.Config `json:",inline" yaml:",inline"` }
HTTPConfig contains configuration fields for the HTTP processor.
func NewHTTPConfig ¶
func NewHTTPConfig() HTTPConfig
NewHTTPConfig returns an HTTPConfig with default values.
type InsertPartConfig ¶
type InsertPartConfig struct { Index int `json:"index" yaml:"index"` Content string `json:"content" yaml:"content"` }
InsertPartConfig contains configuration fields for the InsertPart processor.
func NewInsertPartConfig ¶
func NewInsertPartConfig() InsertPartConfig
NewInsertPartConfig returns an InsertPartConfig with default values.
type JMESPathConfig ¶
type JMESPathConfig struct {
Query string `json:"query" yaml:"query"`
}
JMESPathConfig contains configuration fields for the JMESPath processor.
func NewJMESPathConfig ¶
func NewJMESPathConfig() JMESPathConfig
NewJMESPathConfig returns a JMESPathConfig with default values.
type JQConfig ¶
type JQConfig struct { Query string `json:"query" yaml:"query"` Raw bool `json:"raw" yaml:"raw"` OutputRaw bool `json:"output_raw" yaml:"output_raw"` }
JQConfig contains configuration fields for the JQ processor.
type JSONSchemaConfig ¶
type JSONSchemaConfig struct { SchemaPath string `json:"schema_path" yaml:"schema_path"` Schema string `json:"schema" yaml:"schema"` }
JSONSchemaConfig is a configuration struct containing fields for the jsonschema processor.
func NewJSONSchemaConfig ¶
func NewJSONSchemaConfig() JSONSchemaConfig
NewJSONSchemaConfig returns a JSONSchemaConfig with default values.
type Log ¶
type Log struct {
// contains filtered or unexported fields
}
Log is a processor that prints a log event each time it processes a message.
func (*Log) CloseAsync ¶
func (l *Log) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Log) ProcessMessage ¶
ProcessMessage logs an event and returns the message unchanged.
type LogConfig ¶
type LogConfig struct { Level string `json:"level" yaml:"level"` Fields map[string]string `json:"fields" yaml:"fields"` FieldsMapping string `json:"fields_mapping" yaml:"fields_mapping"` Message string `json:"message" yaml:"message"` }
LogConfig contains configuration fields for the Log processor.
func NewLogConfig ¶
func NewLogConfig() LogConfig
NewLogConfig returns a LogConfig with default values.
type Metric ¶
type Metric struct {
// contains filtered or unexported fields
}
Metric is a processor that creates a metric from extracted values from a message part.
func (*Metric) CloseAsync ¶
func (m *Metric) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Metric) ProcessMessage ¶
ProcessMessage applies the processor to a message.
type MetricConfig ¶
type MetricConfig struct { Type string `json:"type" yaml:"type"` Name string `json:"name" yaml:"name"` Labels map[string]string `json:"labels" yaml:"labels"` Value string `json:"value" yaml:"value"` }
MetricConfig contains configuration fields for the Metric processor.
func NewMetricConfig ¶
func NewMetricConfig() MetricConfig
NewMetricConfig returns a MetricConfig with default values.
type MongoDBConfig ¶
type MongoDBConfig struct { MongoDB client.Config `json:",inline" yaml:",inline"` WriteConcern client.WriteConcern `json:"write_concern" yaml:"write_concern"` Operation string `json:"operation" yaml:"operation"` FilterMap string `json:"filter_map" yaml:"filter_map"` DocumentMap string `json:"document_map" yaml:"document_map"` Upsert bool `json:"upsert" yaml:"upsert"` HintMap string `json:"hint_map" yaml:"hint_map"` RetryConfig retries.Config `json:",inline" yaml:",inline"` JSONMarshalMode client.JSONMarshalMode `json:"json_marshal_mode" yaml:"json_marshal_mode"` }
MongoDBConfig contains configuration fields for the MongoDB processor.
func NewMongoDBConfig ¶
func NewMongoDBConfig() MongoDBConfig
NewMongoDBConfig returns a MongoDBConfig with default values.
type Noop ¶
type Noop struct{}
Noop is a no-op processor that does nothing.
func (*Noop) CloseAsync ¶
func (c *Noop) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Noop) ProcessMessage ¶
ProcessMessage does nothing and returns the message unchanged.
type NoopConfig ¶
type NoopConfig struct{}
NoopConfig configures the no-op processor.
func NewNoopConfig ¶
func NewNoopConfig() NoopConfig
NewNoopConfig creates a new default no-op processor config.
type ParallelConfig ¶
type ParallelConfig struct { Cap int `json:"cap" yaml:"cap"` Processors []Config `json:"processors" yaml:"processors"` }
ParallelConfig is a config struct containing fields for the Parallel processor.
func NewParallelConfig ¶
func NewParallelConfig() ParallelConfig
NewParallelConfig returns a default ParallelConfig.
type ParseLogConfig ¶
type ParseLogConfig struct { Format string `json:"format" yaml:"format"` Codec string `json:"codec" yaml:"codec"` BestEffort bool `json:"best_effort" yaml:"best_effort"` WithRFC3339 bool `json:"allow_rfc3339" yaml:"allow_rfc3339"` WithYear string `json:"default_year" yaml:"default_year"` WithTimezone string `json:"default_timezone" yaml:"default_timezone"` }
ParseLogConfig contains configuration fields for the ParseLog processor.
func NewParseLogConfig ¶
func NewParseLogConfig() ParseLogConfig
NewParseLogConfig returns a ParseLogConfig with default values.
type ProtobufConfig ¶
type ProtobufConfig struct { Operator string `json:"operator" yaml:"operator"` Message string `json:"message" yaml:"message"` ImportPaths []string `json:"import_paths" yaml:"import_paths"` }
ProtobufConfig contains configuration fields for the Protobuf processor.
func NewProtobufConfig ¶
func NewProtobufConfig() ProtobufConfig
NewProtobufConfig returns a ProtobufConfig with default values.
type RateLimitConfig ¶
type RateLimitConfig struct {
Resource string `json:"resource" yaml:"resource"`
}
RateLimitConfig contains configuration fields for the RateLimit processor.
func NewRateLimitConfig ¶
func NewRateLimitConfig() RateLimitConfig
NewRateLimitConfig returns a RateLimitConfig with default values.
type RedisConfig ¶
type RedisConfig struct {
	bredis.Config `json:",inline" yaml:",inline"`
	Operator string `json:"operator" yaml:"operator"`
	Key string `json:"key" yaml:"key"`

	// TODO: V4 replace this
	Retries int `json:"retries" yaml:"retries"`
	RetryPeriod string `json:"retry_period" yaml:"retry_period"`
}
RedisConfig contains configuration fields for the Redis processor.
func NewRedisConfig ¶
func NewRedisConfig() RedisConfig
NewRedisConfig returns a RedisConfig with default values.
type Resource ¶
type Resource struct {
// contains filtered or unexported fields
}
Resource is a processor that returns the result of a processor resource.
func (*Resource) CloseAsync ¶
func (r *Resource) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Resource) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type SelectPartsConfig ¶
type SelectPartsConfig struct {
Parts []int `json:"parts" yaml:"parts"`
}
SelectPartsConfig contains configuration fields for the SelectParts processor.
func NewSelectPartsConfig ¶
func NewSelectPartsConfig() SelectPartsConfig
NewSelectPartsConfig returns a SelectPartsConfig with default values.
type SleepConfig ¶
type SleepConfig struct {
Duration string `json:"duration" yaml:"duration"`
}
SleepConfig contains configuration fields for the Sleep processor.
func NewSleepConfig ¶
func NewSleepConfig() SleepConfig
NewSleepConfig returns a SleepConfig with default values.
type SplitConfig ¶
type SplitConfig struct { Size int `json:"size" yaml:"size"` ByteSize int `json:"byte_size" yaml:"byte_size"` }
SplitConfig is a configuration struct containing fields for the Split processor, which breaks message batches down into batches of a smaller size.
func NewSplitConfig ¶
func NewSplitConfig() SplitConfig
NewSplitConfig returns a SplitConfig with default values.
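As a rough illustration of the size-based behaviour that SplitConfig configures, the sketch below breaks a batch into consecutive batches of at most `size` parts. It models a batch as `[]string`, ignores the `byte_size` field entirely, and treats a size below 1 as "do not split"; all three are assumptions of this example.

```go
package main

import "fmt"

// splitBySize breaks a batch into consecutive batches of at most size parts.
// A size below 1 returns the batch unsplit, an assumption of this sketch.
func splitBySize(batch []string, size int) [][]string {
	if size < 1 || len(batch) <= size {
		return [][]string{batch}
	}
	var out [][]string
	for start := 0; start < len(batch); start += size {
		end := start + size
		if end > len(batch) {
			end = len(batch)
		}
		out = append(out, batch[start:end])
	}
	return out
}

func main() {
	batch := []string{"a", "b", "c", "d", "e"}
	for _, b := range splitBySize(batch, 2) {
		fmt.Println(b)
	}
}
```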
type SubprocessConfig ¶
type SubprocessConfig struct { Name string `json:"name" yaml:"name"` Args []string `json:"args" yaml:"args"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` CodecSend string `json:"codec_send" yaml:"codec_send"` CodecRecv string `json:"codec_recv" yaml:"codec_recv"` }
SubprocessConfig contains configuration fields for the Subprocess processor.
func NewSubprocessConfig ¶
func NewSubprocessConfig() SubprocessConfig
NewSubprocessConfig returns a SubprocessConfig with default values.
type SwitchCaseConfig ¶
type SwitchCaseConfig struct { Check string `json:"check" yaml:"check"` Processors []Config `json:"processors" yaml:"processors"` Fallthrough bool `json:"fallthrough" yaml:"fallthrough"` }
SwitchCaseConfig contains a condition, processors and other fields for an individual case in the Switch processor.
func NewSwitchCaseConfig ¶
func NewSwitchCaseConfig() SwitchCaseConfig
NewSwitchCaseConfig returns a new SwitchCaseConfig with default values.
func (*SwitchCaseConfig) UnmarshalJSON ¶
func (s *SwitchCaseConfig) UnmarshalJSON(bytes []byte) error
UnmarshalJSON ensures that when parsing configs that are in a map or slice the default values are still applied.
func (*SwitchCaseConfig) UnmarshalYAML ¶
func (s *SwitchCaseConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.
type SwitchConfig ¶
type SwitchConfig []SwitchCaseConfig
SwitchConfig is a config struct containing fields for the Switch processor.
func NewSwitchConfig ¶
func NewSwitchConfig() SwitchConfig
NewSwitchConfig returns a default SwitchConfig.
type SyncResponse ¶
type SyncResponse struct {
// contains filtered or unexported fields
}
SyncResponse is a processor that sets the message in its current state as a synchronous response to be returned to the message source.
func (*SyncResponse) CloseAsync ¶
func (s *SyncResponse) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*SyncResponse) ProcessMessage ¶
ProcessMessage sets the message as a synchronous response and returns the message unchanged.
func (*SyncResponse) WaitForClose ¶
func (s *SyncResponse) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the processor has closed down.
type SyncResponseConfig ¶
type SyncResponseConfig struct{}
SyncResponseConfig contains configuration fields for the SyncResponse processor.
func NewSyncResponseConfig ¶
func NewSyncResponseConfig() SyncResponseConfig
NewSyncResponseConfig returns a SyncResponseConfig with default values.
type TryConfig ¶
type TryConfig []Config
TryConfig is a config struct containing fields for the Try processor.
type TypeSpec ¶
type TypeSpec struct {
	// UsesBatches indicates whether this processor's functionality is best
	// applied on messages that are already batched.
	UsesBatches bool
	Status docs.Status
	Version string
	Summary string
	Description string
	Categories []Category
	Footnotes string
	FieldSpecs docs.FieldSpecs
	Examples []docs.AnnotatedExample
	// contains filtered or unexported fields
}
TypeSpec contains a constructor and a usage description for each processor type.
type UnarchiveConfig ¶
type UnarchiveConfig struct {
Format string `json:"format" yaml:"format"`
}
UnarchiveConfig contains configuration fields for the Unarchive processor.
func NewUnarchiveConfig ¶
func NewUnarchiveConfig() UnarchiveConfig
NewUnarchiveConfig returns an UnarchiveConfig with default values.
type WhileConfig ¶
type WhileConfig struct { AtLeastOnce bool `json:"at_least_once" yaml:"at_least_once"` MaxLoops int `json:"max_loops" yaml:"max_loops"` Check string `json:"check" yaml:"check"` Processors []Config `json:"processors" yaml:"processors"` }
WhileConfig is a config struct containing fields for the While processor.
func NewWhileConfig ¶
func NewWhileConfig() WhileConfig
NewWhileConfig returns a default WhileConfig.
type Workflow ¶
type Workflow struct {
// contains filtered or unexported fields
}
Workflow is a processor that applies a list of child processors to a new payload mapped from the original, and after processing attempts to overlay the results back onto the original payloads according to more mappings.
func NewWorkflow ¶
func NewWorkflow(conf WorkflowConfig, mgr interop.Manager) (*Workflow, error)
NewWorkflow instantiates a new workflow processor.
func (*Workflow) CloseAsync ¶
func (w *Workflow) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Workflow) ProcessMessage ¶
ProcessMessage applies workflow stages to each part of a message type.
type WorkflowConfig ¶
type WorkflowConfig struct { MetaPath string `json:"meta_path" yaml:"meta_path"` Order [][]string `json:"order" yaml:"order"` BranchResources []string `json:"branch_resources" yaml:"branch_resources"` Branches map[string]BranchConfig `json:"branches" yaml:"branches"` }
WorkflowConfig is a config struct containing fields for the Workflow processor.
func NewWorkflowConfig ¶
func NewWorkflowConfig() WorkflowConfig
NewWorkflowConfig returns a default WorkflowConfig.
Source Files ¶
- archive.go
- avro.go
- awk.go
- bloblang.go
- bounds_check.go
- branch.go
- cache.go
- catch.go
- compress.go
- config_mongodb.go
- config_redis.go
- constructor.go
- decompress.go
- dedupe.go
- docs.go
- for_each.go
- grok.go
- group_by.go
- group_by_value.go
- http.go
- insert_part.go
- jmespath.go
- jq.go
- jsonschema.go
- log.go
- metric.go
- noop.go
- package.go
- parallel.go
- parse_log.go
- protobuf.go
- rate_limit.go
- resource.go
- select_parts.go
- sleep.go
- split.go
- subprocess.go
- switch.go
- sync_response.go
- try.go
- unarchive.go
- util.go
- while.go
- workflow.go
- workflow_branch_map.go
- xml.go