Documentation ¶
Overview ¶
Package processor contains implementations of types.Processor, which perform an arbitrary operation on a message and either returns >0 messages to be propagated towards a sink, or a response to be sent back to the message source.
Index ¶
- Constants
- Variables
- func Block(typeStr, reason string)
- func ClearFail(part types.Part)
- func Descriptions() string
- func DocumentPlugin(typeString, description string, configSanitiser PluginConfigSanitiser)
- func ExecuteAll(procs []types.Processor, msgs ...types.Message) ([]types.Message, types.Response)
- func ExecuteCatchAll(procs []types.Processor, msgs ...types.Message) ([]types.Message, types.Response)
- func ExecuteTryAll(procs []types.Processor, msgs ...types.Message) ([]types.Message, types.Response)
- func FlagErr(part types.Part, err error)
- func FlagFail(part types.Part)
- func GetFail(part types.Part) string
- func HasFailed(part types.Part) bool
- func IteratePartsWithSpan(operationName string, parts []int, msg types.Message, ...)
- func PluginCount() int
- func PluginDescriptions() string
- func RegisterPlugin(typeString string, configConstructor PluginConfigConstructor, ...)
- func SanitiseConfig(conf Config) (interface{}, error)
- func WalkConstructors(fn func(ConstructorFunc, docs.ComponentSpec))
- type AWK
- type AWKConfig
- type Archive
- type ArchiveConfig
- type Avro
- type AvroConfig
- type Batch
- type BatchConfig
- type Bloblang
- type BloblangConfig
- type BoundsCheck
- type BoundsCheckConfig
- type Branch
- type BranchConfig
- type Cache
- type CacheConfig
- type Catch
- type CatchConfig
- type Category
- type Compress
- type CompressConfig
- type Conditional
- type ConditionalConfig
- type Config
- type ConstructorFunc
- type DAGDepsConfig
- type Decode
- type DecodeConfig
- type Decompress
- type DecompressConfig
- type Dedupe
- type DedupeConfig
- type DepProcessMapConfig
- type Encode
- type EncodeConfig
- type Filter
- type FilterConfig
- type FilterParts
- type FilterPartsConfig
- type ForEach
- type ForEachConfig
- type Grok
- type GrokConfig
- type GroupBy
- type GroupByConfig
- type GroupByElement
- type GroupByValue
- type GroupByValueConfig
- type HTTP
- type HTTPConfig
- type Hash
- type HashConfig
- type HashSample
- type HashSampleConfig
- type InsertPart
- type InsertPartConfig
- type JMESPath
- type JMESPathConfig
- type JQ
- type JQConfig
- type JSON
- type JSONConfig
- type JSONSchema
- type JSONSchemaConfig
- type Lambda
- type LambdaConfig
- type Log
- type LogConfig
- type MergeJSON
- type MergeJSONConfig
- type Metadata
- type MetadataConfig
- type Metric
- type MetricConfig
- type MongoDBConfig
- type Noop
- type NoopConfig
- type Number
- type NumberConfig
- type Parallel
- type ParallelConfig
- type ParseLog
- type ParseLogConfig
- type PluginConfigConstructor
- type PluginConfigSanitiser
- type PluginConstructor
- type ProcessDAG
- type ProcessDAGConfig
- type ProcessField
- type ProcessFieldConfig
- type ProcessMap
- func (p *ProcessMap) CloseAsync()
- func (p *ProcessMap) CreateResult(msg types.Message) error
- func (p *ProcessMap) OverlayResult(payload, response types.Message) ([]int, error)
- func (p *ProcessMap) ProcessMessage(msg types.Message) ([]types.Message, types.Response)
- func (p *ProcessMap) TargetsProvided() []string
- func (p *ProcessMap) TargetsUsed() []string
- func (p *ProcessMap) WaitForClose(timeout time.Duration) error
- type ProcessMapConfig
- type Protobuf
- type ProtobufConfig
- type RateLimit
- type RateLimitConfig
- type Redis
- type RedisConfig
- type Resource
- type SQL
- type SQLConfig
- type Sample
- type SampleConfig
- type SelectParts
- type SelectPartsConfig
- type Sleep
- type SleepConfig
- type Split
- type SplitConfig
- type Subprocess
- type SubprocessConfig
- type Switch
- type SwitchCaseConfig
- type SwitchConfig
- type SyncResponse
- type SyncResponseConfig
- type Text
- type TextConfig
- type Throttle
- type ThrottleConfig
- type Try
- type TryConfig
- type Type
- func New(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAWK(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAWSLambda(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewArchive(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAvro(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewBatch(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewBloblang(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewBloblangFromExecutor(exec *mapping.Executor, log log.Modular, stats metrics.Type) Type
- func NewBoundsCheck(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewBranch(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewCache(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewCatch(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewCompress(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewConditional(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewDecode(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewDecompress(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewDedupe(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewEncode(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewFilter(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewFilterParts(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewForEach(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewGrok(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewGroupBy(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewGroupByValue(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewHTTP(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewHash(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewHashSample(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewInsertPart(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewJMESPath(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewJQ(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewJSON(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewJSONSchema(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewLambda(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewLog(conf Config, mgr types.Manager, logger log.Modular, stats metrics.Type) (Type, error)
- func NewMergeJSON(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewMetadata(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewMetric(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewNoop(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewNumber(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewParallel(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewParseLog(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewProcessBatch(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewProcessDAG(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewProcessField(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewProtobuf(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewRateLimit(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewRedis(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewResource(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewSQL(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewSample(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewSelectParts(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewSleep(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewSplit(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewSubprocess(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewSwitch(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewSyncResponse(conf Config, mgr types.Manager, logger log.Modular, stats metrics.Type) (Type, error)
- func NewText(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewThrottle(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewTry(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewUnarchive(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewWhile(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewWorkflow(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewXML(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- type TypeSpec
- type Unarchive
- type UnarchiveConfig
- type While
- type WhileConfig
- type Workflow
- type WorkflowConfig
- type XML
- type XMLConfig
Constants ¶
const ( TypeArchive = "archive" TypeAvro = "avro" TypeAWK = "awk" TypeAWSLambda = "aws_lambda" TypeBatch = "batch" TypeBloblang = "bloblang" TypeBoundsCheck = "bounds_check" TypeBranch = "branch" TypeCache = "cache" TypeCatch = "catch" TypeCompress = "compress" TypeConditional = "conditional" TypeDecode = "decode" TypeDecompress = "decompress" TypeDedupe = "dedupe" TypeEncode = "encode" TypeFilter = "filter" TypeFilterParts = "filter_parts" TypeForEach = "for_each" TypeGrok = "grok" TypeGroupBy = "group_by" TypeGroupByValue = "group_by_value" TypeHash = "hash" TypeHashSample = "hash_sample" TypeHTTP = "http" TypeInsertPart = "insert_part" TypeJMESPath = "jmespath" TypeJQ = "jq" TypeJSON = "json" TypeJSONSchema = "json_schema" TypeLambda = "lambda" TypeLog = "log" TypeMergeJSON = "merge_json" TypeMetadata = "metadata" TypeMetric = "metric" TypeMongoDB = "mongodb" TypeNoop = "noop" TypeNumber = "number" TypeParallel = "parallel" TypeParseLog = "parse_log" TypeProcessBatch = "process_batch" TypeProcessDAG = "process_dag" TypeProcessField = "process_field" TypeProcessMap = "process_map" TypeProtobuf = "protobuf" TypeRateLimit = "rate_limit" TypeRedis = "redis" TypeResource = "resource" TypeSample = "sample" TypeSelectParts = "select_parts" TypeSleep = "sleep" TypeSplit = "split" TypeSQL = "sql" TypeSubprocess = "subprocess" TypeSwitch = "switch" TypeSyncResponse = "sync_response" TypeText = "text" TypeTry = "try" TypeThrottle = "throttle" TypeUnarchive = "unarchive" TypeWhile = "while" TypeWorkflow = "workflow" TypeXML = "xml" )
String constants representing each processor type. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl
Variables ¶
var Constructors = map[string]TypeSpec{}
Constructors is a map of all processor types with their specs.
var DocsUsesBatches = `` /* 177-byte string literal not displayed */
DocsUsesBatches returns a documentation paragraph regarding processors that benefit from input level batching.
var FailFlagKey = types.FailFlagKey
FailFlagKey is a metadata key used for flagging processor errors in Benthos. If a message part has any non-empty value for this metadata key then it will be interpretted as having failed a processor step somewhere in the pipeline.
var PartsFieldSpec = docs.FieldInt(
"parts",
`An optional array of message indexes of a batch that the processor should apply to.
If left empty all messages are processed. This field is only applicable when
batching messages [at the input level](/docs/configuration/batching).
Indexes can be negative, and if so the part will be selected from the end
counting backwards starting from -1.`,
).Array().Advanced()
PartsFieldSpec documents the parts field common to many processor types.
Functions ¶
func Block ¶
func Block(typeStr, reason string)
Block replaces the constructor of a Benthos processor such that its construction will always return an error. This is useful for building strict pipelines where certain processors should not be available. NOTE: This does not remove the processor from the configuration spec, and normalisation will still work the same for blocked processors.
EXPERIMENTAL: This function is experimental and therefore subject to change outside of major version releases.
func Descriptions ¶
func Descriptions() string
Descriptions returns a formatted string of collated descriptions of each type.
func DocumentPlugin ¶
func DocumentPlugin( typeString, description string, configSanitiser PluginConfigSanitiser, )
DocumentPlugin adds a description and an optional configuration sanitiser function to the definition of a registered plugin. This improves the documentation generated by PluginDescriptions.
func ExecuteAll ¶
ExecuteAll attempts to execute a slice of processors to a message. Returns N resulting messages or a response. The response may indicate either a NoAck in the event of the message being buffered or an unrecoverable error.
func ExecuteCatchAll ¶
func ExecuteCatchAll(procs []types.Processor, msgs ...types.Message) ([]types.Message, types.Response)
ExecuteCatchAll attempts to execute a slice of processors to only messages that have failed a processing step. Returns N resulting messages or a response. The response may indicate either a NoAck in the event of the message being buffered or an unrecoverable error.
func ExecuteTryAll ¶
func ExecuteTryAll(procs []types.Processor, msgs ...types.Message) ([]types.Message, types.Response)
ExecuteTryAll attempts to execute a slice of processors to messages, if a message has failed a processing step it is prevented from being sent to subsequent processors. Returns N resulting messages or a response. The response may indicate either a NoAck in the event of the message being buffered or an unrecoverable error.
func FlagErr ¶
FlagErr marks a message part as having failed at a processing step with an error message. If the error is nil the message part remains unchanged.
func GetFail ¶ added in v3.26.0
GetFail returns an error string for a message part if it has failed, or an empty string if not.
func IteratePartsWithSpan ¶
func IteratePartsWithSpan( operationName string, parts []int, msg types.Message, iter func(int, opentracing.Span, types.Part) error, )
IteratePartsWithSpan iterates the parts of a message according to a slice of indexes (if empty all parts are iterated) and calls a func for each part along with a tracing span for that part. If an error is returned the part is flagged as failed and the span has the error logged.
func PluginCount ¶
func PluginCount() int
PluginCount returns the number of registered plugins. This does NOT count the standard set of components.
func PluginDescriptions ¶
func PluginDescriptions() string
PluginDescriptions generates and returns a markdown formatted document listing each registered plugin and an example configuration for it.
func RegisterPlugin ¶
func RegisterPlugin( typeString string, configConstructor PluginConfigConstructor, constructor PluginConstructor, )
RegisterPlugin registers a plugin by a unique name so that it can be constructed similar to regular processors. If configuration is not needed for this plugin then configConstructor can be nil. A constructor for the plugin itself must be provided.
func SanitiseConfig ¶
SanitiseConfig returns a sanitised version of the Config, meaning sections that aren't relevant to behaviour are removed.
func WalkConstructors ¶ added in v3.43.0
func WalkConstructors(fn func(ConstructorFunc, docs.ComponentSpec))
WalkConstructors iterates each component constructor.
Types ¶
type AWK ¶
type AWK struct {
// contains filtered or unexported fields
}
AWK is a processor that executes AWK programs on a message part and replaces the contents with the result.
func (*AWK) CloseAsync ¶
func (a *AWK) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*AWK) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type AWKConfig ¶
type AWKConfig struct { Parts []int `json:"parts" yaml:"parts"` Codec string `json:"codec" yaml:"codec"` Program string `json:"program" yaml:"program"` }
AWKConfig contains configuration fields for the AWK processor.
func NewAWKConfig ¶
func NewAWKConfig() AWKConfig
NewAWKConfig returns a AWKConfig with default values.
type Archive ¶
type Archive struct {
// contains filtered or unexported fields
}
Archive is a processor that can selectively archive parts of a message into a single part using a chosen archive type.
func (*Archive) CloseAsync ¶
func (d *Archive) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Archive) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type ArchiveConfig ¶
type ArchiveConfig struct { Format string `json:"format" yaml:"format"` Path string `json:"path" yaml:"path"` }
ArchiveConfig contains configuration fields for the Archive processor.
func NewArchiveConfig ¶
func NewArchiveConfig() ArchiveConfig
NewArchiveConfig returns a ArchiveConfig with default values.
type Avro ¶
type Avro struct {
// contains filtered or unexported fields
}
Avro is a processor that performs an operation on an Avro payload.
func (*Avro) CloseAsync ¶
func (p *Avro) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Avro) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type AvroConfig ¶
type AvroConfig struct { Parts []int `json:"parts" yaml:"parts"` Operator string `json:"operator" yaml:"operator"` Encoding string `json:"encoding" yaml:"encoding"` Schema string `json:"schema" yaml:"schema"` SchemaPath string `json:"schema_path" yaml:"schema_path"` }
AvroConfig contains configuration fields for the Avro processor.
func NewAvroConfig ¶
func NewAvroConfig() AvroConfig
NewAvroConfig returns a AvroConfig with default values.
type Batch ¶
type Batch struct {
// contains filtered or unexported fields
}
Batch is a processor that combines messages into a batch until a size limit or other condition is reached, at which point the batch is sent out. When a message is combined without yet producing a batch a NoAck response is returned, which is interpretted as source types as an instruction to send another message through but hold off on acknowledging this one.
Eventually, when the batch reaches its target size, the batch is sent through the pipeline as a single message and an acknowledgement for that message determines whether the whole batch of messages are acknowledged.
TODO: V4 Remove me.
func (*Batch) CloseAsync ¶
func (b *Batch) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Batch) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type BatchConfig ¶
type BatchConfig struct { ByteSize int `json:"byte_size" yaml:"byte_size"` Count int `json:"count" yaml:"count"` Condition condition.Config `json:"condition" yaml:"condition"` Period string `json:"period" yaml:"period"` }
BatchConfig contains configuration fields for the Batch processor.
func NewBatchConfig ¶
func NewBatchConfig() BatchConfig
NewBatchConfig returns a BatchConfig with default values.
type Bloblang ¶ added in v3.13.0
type Bloblang struct {
// contains filtered or unexported fields
}
Bloblang is a processor that performs a Bloblang mapping.
func (*Bloblang) CloseAsync ¶ added in v3.13.0
func (b *Bloblang) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Bloblang) ProcessMessage ¶ added in v3.13.0
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type BloblangConfig ¶ added in v3.13.0
type BloblangConfig string
BloblangConfig contains configuration fields for the Bloblang processor.
func NewBloblangConfig ¶ added in v3.13.0
func NewBloblangConfig() BloblangConfig
NewBloblangConfig returns a BloblangConfig with default values.
type BoundsCheck ¶
type BoundsCheck struct {
// contains filtered or unexported fields
}
BoundsCheck is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.
func (*BoundsCheck) CloseAsync ¶
func (m *BoundsCheck) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*BoundsCheck) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
func (*BoundsCheck) WaitForClose ¶
func (m *BoundsCheck) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the processor has closed down.
type BoundsCheckConfig ¶
type BoundsCheckConfig struct { MaxParts int `json:"max_parts" yaml:"max_parts"` MinParts int `json:"min_parts" yaml:"min_parts"` MaxPartSize int `json:"max_part_size" yaml:"max_part_size"` MinPartSize int `json:"min_part_size" yaml:"min_part_size"` }
BoundsCheckConfig contains configuration fields for the BoundsCheck processor.
func NewBoundsCheckConfig ¶
func NewBoundsCheckConfig() BoundsCheckConfig
NewBoundsCheckConfig returns a BoundsCheckConfig with default values.
type Branch ¶ added in v3.25.0
type Branch struct {
// contains filtered or unexported fields
}
Branch contains conditions and maps for transforming a batch of messages into a subset of request messages, and mapping results from those requests back into the original message batch.
func (*Branch) CloseAsync ¶ added in v3.25.0
func (b *Branch) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Branch) ProcessMessage ¶ added in v3.25.0
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type BranchConfig ¶ added in v3.25.0
type BranchConfig struct { RequestMap string `json:"request_map" yaml:"request_map"` Processors []Config `json:"processors" yaml:"processors"` ResultMap string `json:"result_map" yaml:"result_map"` }
BranchConfig contains configuration fields for the Branch processor.
func NewBranchConfig ¶ added in v3.25.0
func NewBranchConfig() BranchConfig
NewBranchConfig returns a BranchConfig with default values.
func (BranchConfig) Sanitise ¶ added in v3.25.0
func (b BranchConfig) Sanitise() (map[string]interface{}, error)
Sanitise the configuration into a minimal structure that can be printed without changing the intent.
type Cache ¶
type Cache struct {
// contains filtered or unexported fields
}
Cache is a processor that stores or retrieves data from a cache for each message of a batch via an interpolated key.
func (*Cache) CloseAsync ¶
func (c *Cache) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Cache) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type CacheConfig ¶
type CacheConfig struct { Cache string `json:"cache" yaml:"cache"` Resource string `json:"resource" yaml:"resource"` Parts []int `json:"parts" yaml:"parts"` Operator string `json:"operator" yaml:"operator"` Key string `json:"key" yaml:"key"` Value string `json:"value" yaml:"value"` TTL string `json:"ttl" yaml:"ttl"` }
CacheConfig contains configuration fields for the Cache processor.
func NewCacheConfig ¶
func NewCacheConfig() CacheConfig
NewCacheConfig returns a CacheConfig with default values.
type Catch ¶
type Catch struct {
// contains filtered or unexported fields
}
Catch is a processor that applies a list of child processors to each message of a batch individually, where processors are skipped for messages that failed a previous processor step.
func (*Catch) CloseAsync ¶
func (p *Catch) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Catch) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type CatchConfig ¶
type CatchConfig []Config
CatchConfig is a config struct containing fields for the Catch processor.
func NewCatchConfig ¶
func NewCatchConfig() CatchConfig
NewCatchConfig returns a default CatchConfig.
type Category ¶ added in v3.26.0
type Category string
Category describes the general purpose of a processor.
type Compress ¶
type Compress struct {
// contains filtered or unexported fields
}
Compress is a processor that can selectively compress parts of a message as a chosen compression algorithm.
func (*Compress) CloseAsync ¶
func (c *Compress) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Compress) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type CompressConfig ¶
type CompressConfig struct { Algorithm string `json:"algorithm" yaml:"algorithm"` Level int `json:"level" yaml:"level"` Parts []int `json:"parts" yaml:"parts"` }
CompressConfig contains configuration fields for the Compress processor.
func NewCompressConfig ¶
func NewCompressConfig() CompressConfig
NewCompressConfig returns a CompressConfig with default values.
type Conditional ¶
type Conditional struct {
// contains filtered or unexported fields
}
Conditional is a processor that only applies child processors under a certain condition.
func (*Conditional) CloseAsync ¶
func (c *Conditional) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Conditional) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
func (*Conditional) WaitForClose ¶
func (c *Conditional) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the processor has closed down.
type ConditionalConfig ¶
type ConditionalConfig struct { Condition condition.Config `json:"condition" yaml:"condition"` Processors []Config `json:"processors" yaml:"processors"` ElseProcessors []Config `json:"else_processors" yaml:"else_processors"` }
ConditionalConfig is a config struct containing fields for the Conditional processor.
func NewConditionalConfig ¶
func NewConditionalConfig() ConditionalConfig
NewConditionalConfig returns a default ConditionalConfig.
type Config ¶
type Config struct { Label string `json:"label" yaml:"label"` Type string `json:"type" yaml:"type"` Archive ArchiveConfig `json:"archive" yaml:"archive"` Avro AvroConfig `json:"avro" yaml:"avro"` AWK AWKConfig `json:"awk" yaml:"awk"` AWSLambda LambdaConfig `json:"aws_lambda" yaml:"aws_lambda"` Batch BatchConfig `json:"batch" yaml:"batch"` Bloblang BloblangConfig `json:"bloblang" yaml:"bloblang"` BoundsCheck BoundsCheckConfig `json:"bounds_check" yaml:"bounds_check"` Branch BranchConfig `json:"branch" yaml:"branch"` Cache CacheConfig `json:"cache" yaml:"cache"` Catch CatchConfig `json:"catch" yaml:"catch"` Compress CompressConfig `json:"compress" yaml:"compress"` Conditional ConditionalConfig `json:"conditional" yaml:"conditional"` Decode DecodeConfig `json:"decode" yaml:"decode"` Decompress DecompressConfig `json:"decompress" yaml:"decompress"` Dedupe DedupeConfig `json:"dedupe" yaml:"dedupe"` Encode EncodeConfig `json:"encode" yaml:"encode"` Filter FilterConfig `json:"filter" yaml:"filter"` FilterParts FilterPartsConfig `json:"filter_parts" yaml:"filter_parts"` ForEach ForEachConfig `json:"for_each" yaml:"for_each"` Grok GrokConfig `json:"grok" yaml:"grok"` GroupBy GroupByConfig `json:"group_by" yaml:"group_by"` GroupByValue GroupByValueConfig `json:"group_by_value" yaml:"group_by_value"` Hash HashConfig `json:"hash" yaml:"hash"` HashSample HashSampleConfig `json:"hash_sample" yaml:"hash_sample"` HTTP HTTPConfig `json:"http" yaml:"http"` InsertPart InsertPartConfig `json:"insert_part" yaml:"insert_part"` JMESPath JMESPathConfig `json:"jmespath" yaml:"jmespath"` JQ JQConfig `json:"jq" yaml:"jq"` JSON JSONConfig `json:"json" yaml:"json"` JSONSchema JSONSchemaConfig `json:"json_schema" yaml:"json_schema"` Lambda LambdaConfig `json:"lambda" yaml:"lambda"` Log LogConfig `json:"log" yaml:"log"` MergeJSON MergeJSONConfig `json:"merge_json" yaml:"merge_json"` Metadata MetadataConfig `json:"metadata" yaml:"metadata"` Metric MetricConfig `json:"metric" yaml:"metric"` MongoDB MongoDBConfig `json:"mongodb" yaml:"mongodb"` Noop NoopConfig `json:"noop" yaml:"noop"` Number NumberConfig `json:"number" yaml:"number"` Plugin interface{} `json:"plugin,omitempty" yaml:"plugin,omitempty"` Parallel ParallelConfig `json:"parallel" yaml:"parallel"` ParseLog ParseLogConfig `json:"parse_log" yaml:"parse_log"` ProcessBatch ForEachConfig `json:"process_batch" yaml:"process_batch"` ProcessDAG ProcessDAGConfig `json:"process_dag" yaml:"process_dag"` ProcessField ProcessFieldConfig `json:"process_field" yaml:"process_field"` ProcessMap ProcessMapConfig `json:"process_map" yaml:"process_map"` Protobuf ProtobufConfig `json:"protobuf" yaml:"protobuf"` RateLimit RateLimitConfig `json:"rate_limit" yaml:"rate_limit"` Redis RedisConfig `json:"redis" yaml:"redis"` Resource string `json:"resource" yaml:"resource"` Sample SampleConfig `json:"sample" yaml:"sample"` SelectParts SelectPartsConfig `json:"select_parts" yaml:"select_parts"` Sleep SleepConfig `json:"sleep" yaml:"sleep"` Split SplitConfig `json:"split" yaml:"split"` SQL SQLConfig `json:"sql" yaml:"sql"` Subprocess SubprocessConfig `json:"subprocess" yaml:"subprocess"` Switch SwitchConfig `json:"switch" yaml:"switch"` SyncResponse SyncResponseConfig `json:"sync_response" yaml:"sync_response"` Text TextConfig `json:"text" yaml:"text"` Try TryConfig `json:"try" yaml:"try"` Throttle ThrottleConfig `json:"throttle" yaml:"throttle"` Unarchive UnarchiveConfig `json:"unarchive" yaml:"unarchive"` While WhileConfig `json:"while" yaml:"while"` Workflow WorkflowConfig `json:"workflow" yaml:"workflow"` XML XMLConfig `json:"xml" yaml:"xml"` }
Config is the all encompassing configuration struct for all processor types. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl
func NewConfig ¶
func NewConfig() Config
NewConfig returns a configuration struct fully populated with default values. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl
type ConstructorFunc ¶ added in v3.43.0
ConstructorFunc is a func signature able to construct a processor.
func GetDeprecatedPlugin ¶ added in v3.43.0
func GetDeprecatedPlugin(name string) (ConstructorFunc, bool)
GetDeprecatedPlugin returns a constructor for an old plugin if it exists.
type DAGDepsConfig ¶
type DAGDepsConfig struct {
Dependencies []string `json:"dependencies" yaml:"dependencies"`
}
DAGDepsConfig is a config containing dependency based configuration values for a ProcessDAG child.
func NewDAGDepsConfig ¶
func NewDAGDepsConfig() DAGDepsConfig
NewDAGDepsConfig returns a default DAGDepsConfig.
func (*DAGDepsConfig) UnmarshalJSON ¶
func (p *DAGDepsConfig) UnmarshalJSON(bytes []byte) error
UnmarshalJSON ensures that when parsing configs that are in a slice the default values are still applied.
func (*DAGDepsConfig) UnmarshalYAML ¶
func (p *DAGDepsConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.
type Decode ¶
type Decode struct {
// contains filtered or unexported fields
}
Decode is a processor that can selectively decode parts of a message following a chosen scheme.
func (*Decode) CloseAsync ¶
func (c *Decode) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Decode) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type DecodeConfig ¶
type DecodeConfig struct { Scheme string `json:"scheme" yaml:"scheme"` Parts []int `json:"parts" yaml:"parts"` }
DecodeConfig contains configuration fields for the Decode processor.
func NewDecodeConfig ¶
func NewDecodeConfig() DecodeConfig
NewDecodeConfig returns a DecodeConfig with default values.
type Decompress ¶
type Decompress struct {
// contains filtered or unexported fields
}
Decompress is a processor that can decompress parts of a message following a chosen compression algorithm.
func (*Decompress) CloseAsync ¶
func (d *Decompress) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Decompress) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
func (*Decompress) WaitForClose ¶
func (d *Decompress) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the processor has closed down.
type DecompressConfig ¶
type DecompressConfig struct { Algorithm string `json:"algorithm" yaml:"algorithm"` Parts []int `json:"parts" yaml:"parts"` }
DecompressConfig contains configuration fields for the Decompress processor.
func NewDecompressConfig ¶
func NewDecompressConfig() DecompressConfig
NewDecompressConfig returns a DecompressConfig with default values.
type Dedupe ¶
type Dedupe struct {
// contains filtered or unexported fields
}
Dedupe is a processor that deduplicates messages either by hashing the full contents of message parts or by hashing the value of an interpolated string.
func (*Dedupe) CloseAsync ¶
func (d *Dedupe) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Dedupe) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type DedupeConfig ¶
type DedupeConfig struct { Cache string `json:"cache" yaml:"cache"` HashType string `json:"hash" yaml:"hash"` Parts []int `json:"parts" yaml:"parts"` // message parts to hash Key string `json:"key" yaml:"key"` DropOnCacheErr bool `json:"drop_on_err" yaml:"drop_on_err"` }
DedupeConfig contains configuration fields for the Dedupe processor.
func NewDedupeConfig ¶
func NewDedupeConfig() DedupeConfig
NewDedupeConfig returns a DedupeConfig with default values.
type DepProcessMapConfig ¶
type DepProcessMapConfig struct { DAGDepsConfig `json:",inline" yaml:",inline"` ProcessMapConfig `json:",inline" yaml:",inline"` }
DepProcessMapConfig contains a superset of a ProcessMap config and some DAG specific fields.
func NewDepProcessMapConfig ¶
func NewDepProcessMapConfig() DepProcessMapConfig
NewDepProcessMapConfig returns a default DepProcessMapConfig.
type Encode ¶
type Encode struct {
// contains filtered or unexported fields
}
Encode is a processor that can selectively encode parts of a message following a chosen scheme.
func (*Encode) CloseAsync ¶
func (c *Encode) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Encode) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type EncodeConfig ¶
type EncodeConfig struct { Scheme string `json:"scheme" yaml:"scheme"` Parts []int `json:"parts" yaml:"parts"` }
EncodeConfig contains configuration fields for the Encode processor.
func NewEncodeConfig ¶
func NewEncodeConfig() EncodeConfig
NewEncodeConfig returns a EncodeConfig with default values.
type Filter ¶
type Filter struct {
// contains filtered or unexported fields
}
Filter is a processor that checks each message against a condition and rejects the message if a condition returns false.
func (*Filter) CloseAsync ¶
func (c *Filter) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Filter) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type FilterConfig ¶
FilterConfig contains configuration fields for the Filter processor.
func NewFilterConfig ¶
func NewFilterConfig() FilterConfig
NewFilterConfig returns a FilterConfig with default values.
func (FilterConfig) MarshalYAML ¶
func (f FilterConfig) MarshalYAML() (interface{}, error)
MarshalYAML prints the child condition instead of {}.
type FilterParts ¶
type FilterParts struct {
// contains filtered or unexported fields
}
FilterParts is a processor that checks each part from a message against a condition and removes the part if the condition returns false.
func (*FilterParts) CloseAsync ¶
func (c *FilterParts) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*FilterParts) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
func (*FilterParts) WaitForClose ¶
func (c *FilterParts) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the processor has closed down.
type FilterPartsConfig ¶
FilterPartsConfig contains configuration fields for the FilterParts processor.
func NewFilterPartsConfig ¶
func NewFilterPartsConfig() FilterPartsConfig
NewFilterPartsConfig returns a FilterPartsConfig with default values.
func (FilterPartsConfig) MarshalYAML ¶
func (f FilterPartsConfig) MarshalYAML() (interface{}, error)
MarshalYAML prints the child condition instead of {}.
type ForEach ¶
type ForEach struct {
// contains filtered or unexported fields
}
ForEach is a processor that applies a list of child processors to each message of a batch individually.
func (*ForEach) CloseAsync ¶
func (p *ForEach) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*ForEach) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type ForEachConfig ¶
type ForEachConfig []Config
ForEachConfig is a config struct containing fields for the ForEach processor.
func NewForEachConfig ¶
func NewForEachConfig() ForEachConfig
NewForEachConfig returns a default ForEachConfig.
type Grok ¶
type Grok struct {
// contains filtered or unexported fields
}
Grok is a processor that executes Grok queries on a message part and replaces the contents with the result.
func (*Grok) CloseAsync ¶
func (g *Grok) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Grok) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type GrokConfig ¶
type GrokConfig struct { Parts []int `json:"parts" yaml:"parts"` Expressions []string `json:"expressions" yaml:"expressions"` RemoveEmpty bool `json:"remove_empty_values" yaml:"remove_empty_values"` NamedOnly bool `json:"named_captures_only" yaml:"named_captures_only"` UseDefaults bool `json:"use_default_patterns" yaml:"use_default_patterns"` To string `json:"output_format" yaml:"output_format"` PatternPaths []string `json:"pattern_paths" yaml:"pattern_paths"` PatternDefinitions map[string]string `json:"pattern_definitions" yaml:"pattern_definitions"` // TODO: V4 Remove this Patterns []string `json:"patterns" yaml:"patterns"` }
GrokConfig contains configuration fields for the Grok processor.
func NewGrokConfig ¶
func NewGrokConfig() GrokConfig
NewGrokConfig returns a GrokConfig with default values.
type GroupBy ¶
type GroupBy struct {
// contains filtered or unexported fields
}
GroupBy is a processor that group_bys messages into a message per part.
func (*GroupBy) CloseAsync ¶
func (g *GroupBy) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*GroupBy) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type GroupByConfig ¶
type GroupByConfig []GroupByElement
GroupByConfig is a configuration struct containing fields for the GroupBy processor, which breaks message batches down into N batches of a smaller size according to conditions.
func NewGroupByConfig ¶
func NewGroupByConfig() GroupByConfig
NewGroupByConfig returns a GroupByConfig with default values.
type GroupByElement ¶
type GroupByElement struct { Condition condition.Config `json:"condition" yaml:"condition"` Check string `json:"check" yaml:"check"` Processors []Config `json:"processors" yaml:"processors"` }
GroupByElement represents a group determined by a condition and a list of group specific processors.
type GroupByValue ¶
type GroupByValue struct {
// contains filtered or unexported fields
}
GroupByValue is a processor that breaks message batches down into N batches of a smaller size according to a function interpolated string evaluated per message part.
func (*GroupByValue) CloseAsync ¶
func (g *GroupByValue) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*GroupByValue) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
func (*GroupByValue) WaitForClose ¶
func (g *GroupByValue) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the processor has closed down.
type GroupByValueConfig ¶
type GroupByValueConfig struct {
Value string `json:"value" yaml:"value"`
}
GroupByValueConfig is a configuration struct containing fields for the GroupByValue processor, which breaks message batches down into N batches of a smaller size according to a function interpolated string evaluated per message part.
func NewGroupByValueConfig ¶
func NewGroupByValueConfig() GroupByValueConfig
NewGroupByValueConfig returns a GroupByValueConfig with default values.
type HTTP ¶
type HTTP struct {
// contains filtered or unexported fields
}
HTTP is a processor that performs an HTTP request using the message as the request body, and returns the response.
func (*HTTP) CloseAsync ¶
func (h *HTTP) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*HTTP) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type HTTPConfig ¶
type HTTPConfig struct { Parallel bool `json:"parallel" yaml:"parallel"` MaxParallel int `json:"max_parallel" yaml:"max_parallel"` Client client.Config `json:"request" yaml:"request"` client.Config `json:",inline" yaml:",inline"` }
HTTPConfig contains configuration fields for the HTTP processor.
func NewHTTPConfig ¶
func NewHTTPConfig() HTTPConfig
NewHTTPConfig returns a HTTPConfig with default values.
type Hash ¶
type Hash struct {
// contains filtered or unexported fields
}
Hash is a processor that can selectively hash parts of a message following a chosen algorithm.
func (*Hash) CloseAsync ¶
func (c *Hash) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Hash) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type HashConfig ¶
type HashConfig struct { Parts []int `json:"parts" yaml:"parts"` Algorithm string `json:"algorithm" yaml:"algorithm"` Key string `json:"key" yaml:"key"` }
HashConfig contains configuration fields for the Hash processor.
func NewHashConfig ¶
func NewHashConfig() HashConfig
NewHashConfig returns a HashConfig with default values.
type HashSample ¶
type HashSample struct {
// contains filtered or unexported fields
}
HashSample is a processor that removes messages based on a sample factor by hashing its contents.
func (*HashSample) CloseAsync ¶
func (s *HashSample) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*HashSample) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
func (*HashSample) WaitForClose ¶
func (s *HashSample) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the processor has closed down.
type HashSampleConfig ¶
type HashSampleConfig struct { RetainMin float64 `json:"retain_min" yaml:"retain_min"` RetainMax float64 `json:"retain_max" yaml:"retain_max"` Parts []int `json:"parts" yaml:"parts"` // message parts to hash }
HashSampleConfig contains configuration fields for the HashSample processor.
func NewHashSampleConfig ¶
func NewHashSampleConfig() HashSampleConfig
NewHashSampleConfig returns a HashSampleConfig with default values.
type InsertPart ¶
type InsertPart struct {
// contains filtered or unexported fields
}
InsertPart is a processor that inserts a new message part at a specific index.
func (*InsertPart) CloseAsync ¶
func (p *InsertPart) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*InsertPart) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
func (*InsertPart) WaitForClose ¶
func (p *InsertPart) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the processor has closed down.
type InsertPartConfig ¶
type InsertPartConfig struct { Index int `json:"index" yaml:"index"` Content string `json:"content" yaml:"content"` }
InsertPartConfig contains configuration fields for the InsertPart processor.
func NewInsertPartConfig ¶
func NewInsertPartConfig() InsertPartConfig
NewInsertPartConfig returns a InsertPartConfig with default values.
type JMESPath ¶
type JMESPath struct {
// contains filtered or unexported fields
}
JMESPath is a processor that executes JMESPath queries on a message part and replaces the contents with the result.
func (*JMESPath) CloseAsync ¶
func (p *JMESPath) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*JMESPath) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type JMESPathConfig ¶
type JMESPathConfig struct { Parts []int `json:"parts" yaml:"parts"` Query string `json:"query" yaml:"query"` }
JMESPathConfig contains configuration fields for the JMESPath processor.
func NewJMESPathConfig ¶
func NewJMESPathConfig() JMESPathConfig
NewJMESPathConfig returns a JMESPathConfig with default values.
type JQ ¶ added in v3.27.0
type JQ struct {
// contains filtered or unexported fields
}
JQ is a processor that passes messages through gojq.
func (*JQ) CloseAsync ¶ added in v3.27.0
func (*JQ) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*JQ) ProcessMessage ¶ added in v3.27.0
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type JQConfig ¶ added in v3.27.0
JQConfig contains configuration fields for the JQ processor.
func NewJQConfig ¶ added in v3.27.0
func NewJQConfig() JQConfig
NewJQConfig returns a JQConfig with default values.
type JSON ¶
type JSON struct {
// contains filtered or unexported fields
}
JSON is a processor that performs an operation on a JSON payload.
func (*JSON) CloseAsync ¶
func (p *JSON) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*JSON) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type JSONConfig ¶
type JSONConfig struct { Parts []int `json:"parts" yaml:"parts"` Operator string `json:"operator" yaml:"operator"` Path string `json:"path" yaml:"path"` Value rawJSONValue `json:"value" yaml:"value"` }
JSONConfig contains configuration fields for the JSON processor.
func NewJSONConfig ¶
func NewJSONConfig() JSONConfig
NewJSONConfig returns a JSONConfig with default values.
type JSONSchema ¶ added in v3.5.0
type JSONSchema struct {
// contains filtered or unexported fields
}
JSONSchema is a processor that validates messages against a specified json schema.
func (*JSONSchema) CloseAsync ¶ added in v3.5.0
func (s *JSONSchema) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*JSONSchema) ProcessMessage ¶ added in v3.5.0
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
func (*JSONSchema) WaitForClose ¶ added in v3.5.0
func (s *JSONSchema) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the processor has closed down.
type JSONSchemaConfig ¶ added in v3.5.0
type JSONSchemaConfig struct { Parts []int `json:"parts" yaml:"parts"` SchemaPath string `json:"schema_path" yaml:"schema_path"` Schema string `json:"schema" yaml:"schema"` }
JSONSchemaConfig is a configuration struct containing fields for the jsonschema processor.
func NewJSONSchemaConfig ¶ added in v3.5.0
func NewJSONSchemaConfig() JSONSchemaConfig
NewJSONSchemaConfig returns a JSONSchemaConfig with default values.
type Lambda ¶
type Lambda struct {
// contains filtered or unexported fields
}
Lambda is a processor that invokes an AWS Lambda using the message as the request body, and returns the response.
func (*Lambda) CloseAsync ¶
func (l *Lambda) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Lambda) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type LambdaConfig ¶
type LambdaConfig struct { client.Config `json:",inline" yaml:",inline"` Parallel bool `json:"parallel" yaml:"parallel"` }
LambdaConfig contains configuration fields for the Lambda processor.
func NewLambdaConfig ¶
func NewLambdaConfig() LambdaConfig
NewLambdaConfig returns a LambdaConfig with default values.
type Log ¶
type Log struct {
// contains filtered or unexported fields
}
Log is a processor that prints a log event each time it processes a message.
func (*Log) CloseAsync ¶
func (l *Log) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Log) ProcessMessage ¶
ProcessMessage logs an event and returns the message unchanged.
type LogConfig ¶
type LogConfig struct { Level string `json:"level" yaml:"level"` Fields map[string]string `json:"fields" yaml:"fields"` FieldsMapping string `json:"fields_mapping" yaml:"fields_mapping"` Message string `json:"message" yaml:"message"` }
LogConfig contains configuration fields for the Log processor.
func NewLogConfig ¶
func NewLogConfig() LogConfig
NewLogConfig returns a LogConfig with default values.
type MergeJSON ¶
type MergeJSON struct {
// contains filtered or unexported fields
}
MergeJSON is a processor that merges JSON parsed message parts into a single value.
func (*MergeJSON) CloseAsync ¶
func (p *MergeJSON) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*MergeJSON) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type MergeJSONConfig ¶
type MergeJSONConfig struct { Parts []int `json:"parts" yaml:"parts"` RetainParts bool `json:"retain_parts" yaml:"retain_parts"` }
MergeJSONConfig contains configuration fields for the MergeJSON processor.
func NewMergeJSONConfig ¶
func NewMergeJSONConfig() MergeJSONConfig
NewMergeJSONConfig returns a MergeJSONConfig with default values.
type Metadata ¶
type Metadata struct {
// contains filtered or unexported fields
}
Metadata is a processor that performs an operation on the Metadata of a message.
func (*Metadata) CloseAsync ¶
func (p *Metadata) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Metadata) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type MetadataConfig ¶
type MetadataConfig struct { Parts []int `json:"parts" yaml:"parts"` Operator string `json:"operator" yaml:"operator"` Key string `json:"key" yaml:"key"` Value string `json:"value" yaml:"value"` }
MetadataConfig contains configuration fields for the Metadata processor.
func NewMetadataConfig ¶
func NewMetadataConfig() MetadataConfig
NewMetadataConfig returns a MetadataConfig with default values.
type Metric ¶
type Metric struct {
// contains filtered or unexported fields
}
Metric is a processor that creates a metric from extracted values from a message part.
func (*Metric) CloseAsync ¶
func (m *Metric) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Metric) ProcessMessage ¶
ProcessMessage applies the processor to a message
type MetricConfig ¶
type MetricConfig struct { Parts []int `json:"parts" yaml:"parts"` Type string `json:"type" yaml:"type"` Path string `json:"path" yaml:"path"` Name string `json:"name" yaml:"name"` Labels map[string]string `json:"labels" yaml:"labels"` Value string `json:"value" yaml:"value"` }
MetricConfig contains configuration fields for the Metric processor.
func NewMetricConfig ¶
func NewMetricConfig() MetricConfig
NewMetricConfig returns a MetricConfig with default values.
type MongoDBConfig ¶ added in v3.43.0
type MongoDBConfig struct { MongoDB client.Config `json:",inline" yaml:",inline"` WriteConcern client.WriteConcern `json:"write_concern" yaml:"write_concern"` Parts []int `json:"parts" yaml:"parts"` Operation string `json:"operation" yaml:"operation"` FilterMap string `json:"filter_map" yaml:"filter_map"` DocumentMap string `json:"document_map" yaml:"document_map"` HintMap string `json:"hint_map" yaml:"hint_map"` RetryConfig retries.Config `json:",inline" yaml:",inline"` }
MongoDBConfig contains configuration fields for the MongoDB processor.
func NewMongoDBConfig ¶ added in v3.43.0
func NewMongoDBConfig() MongoDBConfig
NewMongoDBConfig returns a MongoDBConfig with default values.
type Noop ¶
type Noop struct{}
Noop is a no-op processor that does nothing.
func (*Noop) CloseAsync ¶
func (c *Noop) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Noop) ProcessMessage ¶
ProcessMessage does nothing and returns the message unchanged.
type NoopConfig ¶ added in v3.33.0
type NoopConfig struct{}
NoopConfig configures the no-op processor.
func NewNoopConfig ¶ added in v3.33.0
func NewNoopConfig() NoopConfig
NewNoopConfig creates a new default no-op processor config.
type Number ¶
type Number struct {
// contains filtered or unexported fields
}
Number is a processor that performs number based operations on payloads.
func (*Number) CloseAsync ¶
func (n *Number) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Number) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type NumberConfig ¶
type NumberConfig struct { Parts []int `json:"parts" yaml:"parts"` Operator string `json:"operator" yaml:"operator"` Value interface{} `json:"value" yaml:"value"` }
NumberConfig contains configuration fields for the Number processor.
func NewNumberConfig ¶
func NewNumberConfig() NumberConfig
NewNumberConfig returns a NumberConfig with default values.
type Parallel ¶
type Parallel struct {
// contains filtered or unexported fields
}
Parallel is a processor that applies a list of child processors to each message of a batch individually.
func (*Parallel) CloseAsync ¶
func (p *Parallel) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Parallel) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type ParallelConfig ¶
type ParallelConfig struct { Cap int `json:"cap" yaml:"cap"` Processors []Config `json:"processors" yaml:"processors"` }
ParallelConfig is a config struct containing fields for the Parallel processor.
func NewParallelConfig ¶
func NewParallelConfig() ParallelConfig
NewParallelConfig returns a default ParallelConfig.
type ParseLog ¶ added in v3.10.0
type ParseLog struct {
// contains filtered or unexported fields
}
ParseLog is a processor that parses properly formatted messages.
func (*ParseLog) CloseAsync ¶ added in v3.10.0
func (s *ParseLog) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*ParseLog) ProcessMessage ¶ added in v3.10.0
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type ParseLogConfig ¶ added in v3.10.0
type ParseLogConfig struct { Parts []int `json:"parts" yaml:"parts"` Format string `json:"format" yaml:"format"` Codec string `json:"codec" yaml:"codec"` BestEffort bool `json:"best_effort" yaml:"best_effort"` WithRFC3339 bool `json:"allow_rfc3339" yaml:"allow_rfc3339"` WithYear string `json:"default_year" yaml:"default_year"` WithTimezone string `json:"default_timezone" yaml:"default_timezone"` }
ParseLogConfig contains configuration fields for the ParseLog processor.
func NewParseLogConfig ¶ added in v3.10.0
func NewParseLogConfig() ParseLogConfig
NewParseLogConfig returns a ParseLogConfig with default values.
type PluginConfigConstructor ¶
type PluginConfigConstructor func() interface{}
PluginConfigConstructor is a func that returns a pointer to a new and fully populated configuration struct for a plugin type.
type PluginConfigSanitiser ¶
type PluginConfigSanitiser func(conf interface{}) interface{}
PluginConfigSanitiser is a function that takes a configuration object for a plugin and returns a sanitised (minimal) version of it for printing in examples and plugin documentation.
This function is useful for when a plugins configuration struct is very large and complex, but can sometimes be expressed in a more concise way without losing the original intent.
type PluginConstructor ¶
type PluginConstructor func( config interface{}, manager types.Manager, logger log.Modular, metrics metrics.Type, ) (types.Processor, error)
PluginConstructor is a func that constructs a Benthos processor plugin. These are plugins that are specific to certain use cases, experimental, private or otherwise unfit for widespread general use. Any number of plugins can be specified when using Benthos as a framework.
The configuration object will be the result of the PluginConfigConstructor after overlaying the user configuration.
type ProcessDAG ¶
type ProcessDAG struct {
// contains filtered or unexported fields
}
ProcessDAG is a processor that applies a list of child processors to a new payload mapped from the original, and after processing attempts to overlay the results back onto the original payloads according to more mappings.
func (*ProcessDAG) CloseAsync ¶
func (p *ProcessDAG) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*ProcessDAG) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
func (*ProcessDAG) WaitForClose ¶
func (p *ProcessDAG) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the processor has closed down.
type ProcessDAGConfig ¶
type ProcessDAGConfig map[string]DepProcessMapConfig
ProcessDAGConfig is a config struct containing fields for the ProcessDAG processor.
func NewProcessDAGConfig ¶
func NewProcessDAGConfig() ProcessDAGConfig
NewProcessDAGConfig returns a default ProcessDAGConfig.
type ProcessField ¶
type ProcessField struct {
// contains filtered or unexported fields
}
ProcessField is a processor that applies a list of child processors to a field extracted from the original payload.
func (*ProcessField) CloseAsync ¶
func (p *ProcessField) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*ProcessField) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
func (*ProcessField) WaitForClose ¶
func (p *ProcessField) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the processor has closed down.
type ProcessFieldConfig ¶
type ProcessFieldConfig struct { Parts []int `json:"parts" yaml:"parts"` Codec string `json:"codec" yaml:"codec"` Path string `json:"path" yaml:"path"` ResultType string `json:"result_type" yaml:"result_type"` Processors []Config `json:"processors" yaml:"processors"` }
ProcessFieldConfig is a config struct containing fields for the ProcessField processor.
func NewProcessFieldConfig ¶
func NewProcessFieldConfig() ProcessFieldConfig
NewProcessFieldConfig returns a default ProcessFieldConfig.
type ProcessMap ¶
type ProcessMap struct {
// contains filtered or unexported fields
}
ProcessMap is a processor that applies a list of child processors to a new payload mapped from the original, and after processing attempts to overlay the results back onto the original payloads according to more mappings.
func NewProcessMap ¶
func NewProcessMap( conf ProcessMapConfig, mgr types.Manager, log log.Modular, stats metrics.Type, ) (*ProcessMap, error)
NewProcessMap returns a ProcessField processor.
func (*ProcessMap) CloseAsync ¶
func (p *ProcessMap) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*ProcessMap) CreateResult ¶
func (p *ProcessMap) CreateResult(msg types.Message) error
CreateResult performs reduction and child processors to a payload. The size of the payload will remain unchanged, where reduced indexes are nil. This result can be overlayed onto the original message in order to complete the map.
func (*ProcessMap) OverlayResult ¶
func (p *ProcessMap) OverlayResult(payload, response types.Message) ([]int, error)
OverlayResult attempts to merge the result of a process_map with the original
payload as per the map specified in the postmap and postmap_optional fields.
func (*ProcessMap) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
func (*ProcessMap) TargetsProvided ¶
func (p *ProcessMap) TargetsProvided() []string
TargetsProvided returns a list of targets provided by this processor derived from its postmap and postmap_optional fields.
func (*ProcessMap) TargetsUsed ¶
func (p *ProcessMap) TargetsUsed() []string
TargetsUsed returns a list of target dependencies of this processor derived from its premap and premap_optional fields.
func (*ProcessMap) WaitForClose ¶
func (p *ProcessMap) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the processor has closed down.
type ProcessMapConfig ¶
type ProcessMapConfig struct { Parts []int `json:"parts" yaml:"parts"` Conditions []condition.Config `json:"conditions" yaml:"conditions"` Premap map[string]string `json:"premap" yaml:"premap"` PremapOptional map[string]string `json:"premap_optional" yaml:"premap_optional"` Postmap map[string]string `json:"postmap" yaml:"postmap"` PostmapOptional map[string]string `json:"postmap_optional" yaml:"postmap_optional"` Processors []Config `json:"processors" yaml:"processors"` }
ProcessMapConfig is a config struct containing fields for the ProcessMap processor.
func NewProcessMapConfig ¶
func NewProcessMapConfig() ProcessMapConfig
NewProcessMapConfig returns a default ProcessMapConfig.
func (ProcessMapConfig) Sanitise ¶
func (p ProcessMapConfig) Sanitise() (map[string]interface{}, error)
Sanitise the configuration into a minimal structure that can be printed without changing the intent.
func (*ProcessMapConfig) UnmarshalJSON ¶
func (p *ProcessMapConfig) UnmarshalJSON(bytes []byte) error
UnmarshalJSON ensures that when parsing configs that are in a slice the default values are still applied.
func (*ProcessMapConfig) UnmarshalYAML ¶
func (p *ProcessMapConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.
type Protobuf ¶ added in v3.25.0
type Protobuf struct {
// contains filtered or unexported fields
}
Protobuf is a processor that performs an operation on an Protobuf payload.
func (*Protobuf) CloseAsync ¶ added in v3.25.0
func (p *Protobuf) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Protobuf) ProcessMessage ¶ added in v3.25.0
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type ProtobufConfig ¶ added in v3.25.0
type ProtobufConfig struct { Parts []int `json:"parts" yaml:"parts"` Operator string `json:"operator" yaml:"operator"` Message string `json:"message" yaml:"message"` ImportPaths []string `json:"import_paths" yaml:"import_paths"` ImportPath string `json:"import_path" yaml:"import_path"` }
ProtobufConfig contains configuration fields for the Protobuf processor.
func NewProtobufConfig ¶ added in v3.25.0
func NewProtobufConfig() ProtobufConfig
NewProtobufConfig returns a ProtobufConfig with default values.
type RateLimit ¶
type RateLimit struct {
// contains filtered or unexported fields
}
RateLimit is a processor that performs an RateLimit request using the message as the request body, and returns the response.
func (*RateLimit) CloseAsync ¶
func (r *RateLimit) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*RateLimit) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type RateLimitConfig ¶
type RateLimitConfig struct {
Resource string `json:"resource" yaml:"resource"`
}
RateLimitConfig contains configuration fields for the RateLimit processor.
func NewRateLimitConfig ¶
func NewRateLimitConfig() RateLimitConfig
NewRateLimitConfig returns a RateLimitConfig with default values.
type Redis ¶ added in v3.1.0
type Redis struct {
// contains filtered or unexported fields
}
Redis is a processor that performs redis operations
func (*Redis) CloseAsync ¶ added in v3.1.0
func (r *Redis) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Redis) ProcessMessage ¶ added in v3.1.0
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type RedisConfig ¶ added in v3.1.0
type RedisConfig struct { bredis.Config `json:",inline" yaml:",inline"` Parts []int `json:"parts" yaml:"parts"` Operator string `json:"operator" yaml:"operator"` Key string `json:"key" yaml:"key"` Retries int `json:"retries" yaml:"retries"` RetryPeriod string `json:"retry_period" yaml:"retry_period"` }
RedisConfig contains configuration fields for the Redis processor.
func NewRedisConfig ¶ added in v3.1.0
func NewRedisConfig() RedisConfig
NewRedisConfig returns a RedisConfig with default values.
type Resource ¶ added in v3.6.0
type Resource struct {
// contains filtered or unexported fields
}
Resource is a processor that returns the result of a processor resource.
func (*Resource) CloseAsync ¶ added in v3.6.0
func (r *Resource) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Resource) ProcessMessage ¶ added in v3.6.0
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type SQL ¶
type SQL struct {
// contains filtered or unexported fields
}
SQL is a processor that executes an SQL query for each message.
func (*SQL) CloseAsync ¶
func (s *SQL) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*SQL) ProcessMessage ¶
ProcessMessage logs an event and returns the message unchanged.
type SQLConfig ¶
type SQLConfig struct { Driver string `json:"driver" yaml:"driver"` DataSourceName string `json:"data_source_name" yaml:"data_source_name"` DSN string `json:"dsn" yaml:"dsn"` Query string `json:"query" yaml:"query"` UnsafeDynamicQuery bool `json:"unsafe_dynamic_query" yaml:"unsafe_dynamic_query"` Args []string `json:"args" yaml:"args"` ArgsMapping string `json:"args_mapping" yaml:"args_mapping"` ResultCodec string `json:"result_codec" yaml:"result_codec"` }
SQLConfig contains configuration fields for the SQL processor.
func NewSQLConfig ¶
func NewSQLConfig() SQLConfig
NewSQLConfig returns a SQLConfig with default values.
type Sample ¶
type Sample struct {
// contains filtered or unexported fields
}
Sample is a processor that drops messages based on a random sample.
func (*Sample) CloseAsync ¶
func (s *Sample) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Sample) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type SampleConfig ¶
type SampleConfig struct { Retain float64 `json:"retain" yaml:"retain"` RandomSeed int64 `json:"seed" yaml:"seed"` }
SampleConfig contains configuration fields for the Sample processor.
func NewSampleConfig ¶
func NewSampleConfig() SampleConfig
NewSampleConfig returns a SampleConfig with default values.
type SelectParts ¶
type SelectParts struct {
// contains filtered or unexported fields
}
SelectParts is a processor that selects parts from a message to append to a new message.
func (*SelectParts) CloseAsync ¶
func (m *SelectParts) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*SelectParts) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
func (*SelectParts) WaitForClose ¶
func (m *SelectParts) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the processor has closed down.
type SelectPartsConfig ¶
type SelectPartsConfig struct {
Parts []int `json:"parts" yaml:"parts"`
}
SelectPartsConfig contains configuration fields for the SelectParts processor.
func NewSelectPartsConfig ¶
func NewSelectPartsConfig() SelectPartsConfig
NewSelectPartsConfig returns a SelectPartsConfig with default values.
type Sleep ¶
type Sleep struct {
// contains filtered or unexported fields
}
Sleep is a processor that limits the stream of a pipeline to one message batch per period specified.
func (*Sleep) CloseAsync ¶
func (s *Sleep) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Sleep) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type SleepConfig ¶
type SleepConfig struct {
Duration string `json:"duration" yaml:"duration"`
}
SleepConfig contains configuration fields for the Sleep processor.
func NewSleepConfig ¶
func NewSleepConfig() SleepConfig
NewSleepConfig returns a SleepConfig with default values.
type Split ¶
type Split struct {
// contains filtered or unexported fields
}
Split is a processor that splits messages into a message per part.
func (*Split) CloseAsync ¶
func (s *Split) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Split) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type SplitConfig ¶
type SplitConfig struct { Size int `json:"size" yaml:"size"` ByteSize int `json:"byte_size" yaml:"byte_size"` }
SplitConfig is a configuration struct containing fields for the Split processor, which breaks message batches down into batches of a smaller size.
func NewSplitConfig ¶
func NewSplitConfig() SplitConfig
NewSplitConfig returns a SplitConfig with default values.
type Subprocess ¶
type Subprocess struct {
// contains filtered or unexported fields
}
Subprocess is a processor that executes a command.
func (*Subprocess) CloseAsync ¶
func (e *Subprocess) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Subprocess) ProcessMessage ¶
ProcessMessage logs an event and returns the message unchanged.
func (*Subprocess) WaitForClose ¶
func (e *Subprocess) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the processor has closed down.
type SubprocessConfig ¶
type SubprocessConfig struct { Parts []int `json:"parts" yaml:"parts"` Name string `json:"name" yaml:"name"` Args []string `json:"args" yaml:"args"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` CodecSend string `json:"codec_send" yaml:"codec_send"` CodecRecv string `json:"codec_recv" yaml:"codec_recv"` }
SubprocessConfig contains configuration fields for the Subprocess processor.
func NewSubprocessConfig ¶
func NewSubprocessConfig() SubprocessConfig
NewSubprocessConfig returns a SubprocessConfig with default values.
type Switch ¶
type Switch struct {
// contains filtered or unexported fields
}
Switch is a processor that only applies child processors under a certain condition.
func (*Switch) CloseAsync ¶
func (s *Switch) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Switch) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type SwitchCaseConfig ¶
type SwitchCaseConfig struct { Condition condition.Config `json:"condition" yaml:"condition"` Check string `json:"check" yaml:"check"` Processors []Config `json:"processors" yaml:"processors"` Fallthrough bool `json:"fallthrough" yaml:"fallthrough"` }
SwitchCaseConfig contains a condition, processors and other fields for an individual case in the Switch processor.
func NewSwitchCaseConfig ¶
func NewSwitchCaseConfig() SwitchCaseConfig
NewSwitchCaseConfig returns a new SwitchCaseConfig with default values.
func (*SwitchCaseConfig) UnmarshalJSON ¶
func (s *SwitchCaseConfig) UnmarshalJSON(bytes []byte) error
UnmarshalJSON ensures that when parsing configs that are in a map or slice the default values are still applied.
func (*SwitchCaseConfig) UnmarshalYAML ¶
func (s *SwitchCaseConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.
type SwitchConfig ¶
type SwitchConfig []SwitchCaseConfig
SwitchConfig is a config struct containing fields for the Switch processor.
func NewSwitchConfig ¶
func NewSwitchConfig() SwitchConfig
NewSwitchConfig returns a default SwitchConfig.
type SyncResponse ¶ added in v3.7.0
type SyncResponse struct {
// contains filtered or unexported fields
}
SyncResponse is a processor that prints a log event each time it processes a message.
func (*SyncResponse) CloseAsync ¶ added in v3.7.0
func (s *SyncResponse) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*SyncResponse) ProcessMessage ¶ added in v3.7.0
ProcessMessage logs an event and returns the message unchanged.
func (*SyncResponse) WaitForClose ¶ added in v3.7.0
func (s *SyncResponse) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the processor has closed down.
type SyncResponseConfig ¶ added in v3.7.0
type SyncResponseConfig struct{}
SyncResponseConfig contains configuration fields for the SyncResponse processor.
func NewSyncResponseConfig ¶ added in v3.7.0
func NewSyncResponseConfig() SyncResponseConfig
NewSyncResponseConfig returns a SyncResponseConfig with default values.
type Text ¶
type Text struct {
// contains filtered or unexported fields
}
Text is a processor that performs a text based operation on a payload.
func (*Text) CloseAsync ¶
func (t *Text) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Text) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type TextConfig ¶
type TextConfig struct { Parts []int `json:"parts" yaml:"parts"` Operator string `json:"operator" yaml:"operator"` Arg string `json:"arg" yaml:"arg"` Value string `json:"value" yaml:"value"` }
TextConfig contains configuration fields for the Text processor.
func NewTextConfig ¶
func NewTextConfig() TextConfig
NewTextConfig returns a TextConfig with default values.
type Throttle ¶
type Throttle struct {
// contains filtered or unexported fields
}
Throttle is a processor that limits the stream of a pipeline to one message batch per period specified.
func (*Throttle) CloseAsync ¶
func (m *Throttle) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Throttle) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type ThrottleConfig ¶
type ThrottleConfig struct {
Period string `json:"period" yaml:"period"`
}
ThrottleConfig contains configuration fields for the Throttle processor.
func NewThrottleConfig ¶
func NewThrottleConfig() ThrottleConfig
NewThrottleConfig returns a ThrottleConfig with default values.
type Try ¶
type Try struct {
// contains filtered or unexported fields
}
Try is a processor that applies a list of child processors to each message of a batch individually, where processors are skipped for messages that failed a previous processor step.
func (*Try) CloseAsync ¶
func (p *Try) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Try) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type TryConfig ¶
type TryConfig []Config
TryConfig is a config struct containing fields for the Try processor.
type Type ¶
type Type interface { // ProcessMessage attempts to process a message. Since processing can fail // this call returns both a slice of messages in case of success or a // response in case of failure. If the slice of messages is empty the // response should be returned to the source. ProcessMessage(msg types.Message) ([]types.Message, types.Response) types.Closable }
Type reads a message, performs a processing operation, and returns either a slice of messages resulting from the process to be propagated through the pipeline, or a response that should be sent back to the source instead.
func NewAWSLambda ¶ added in v3.36.0
func NewAWSLambda( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewAWSLambda returns a Lambda processor.
func NewArchive ¶
func NewArchive( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewArchive returns a Archive processor.
func NewBloblang ¶ added in v3.13.0
func NewBloblang( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewBloblang returns a Bloblang processor.
func NewBloblangFromExecutor ¶ added in v3.43.0
NewBloblangFromExecutor returns a Bloblang processor.
func NewBoundsCheck ¶
func NewBoundsCheck( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewBoundsCheck returns a BoundsCheck processor.
func NewBranch ¶ added in v3.25.0
func NewBranch( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewBranch creates a new branch processor.
func NewCompress ¶
func NewCompress( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewCompress returns a Compress processor.
func NewConditional ¶
func NewConditional( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewConditional returns a Conditional processor.
func NewDecode ¶
func NewDecode( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewDecode returns a Decode processor.
func NewDecompress ¶
func NewDecompress( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewDecompress returns a Decompress processor.
func NewDedupe ¶
func NewDedupe( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewDedupe returns a Dedupe processor.
func NewEncode ¶
func NewEncode( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewEncode returns a Encode processor.
func NewFilter ¶
func NewFilter( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewFilter returns a Filter processor.
func NewFilterParts ¶
func NewFilterParts( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewFilterParts returns a FilterParts processor.
func NewForEach ¶
func NewForEach( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewForEach returns a ForEach processor.
func NewGroupBy ¶
func NewGroupBy( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewGroupBy returns a GroupBy processor.
func NewGroupByValue ¶
func NewGroupByValue( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewGroupByValue returns a GroupByValue processor.
func NewHashSample ¶
func NewHashSample( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewHashSample returns a HashSample processor.
func NewInsertPart ¶
func NewInsertPart( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewInsertPart returns a InsertPart processor.
func NewJMESPath ¶
func NewJMESPath( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewJMESPath returns a JMESPath processor.
func NewJSONSchema ¶ added in v3.5.0
func NewJSONSchema( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewJSONSchema returns a JSONSchema processor.
func NewLambda ¶
func NewLambda( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewLambda returns a Lambda processor.
func NewLog ¶
func NewLog( conf Config, mgr types.Manager, logger log.Modular, stats metrics.Type, ) (Type, error)
NewLog returns a Log processor.
func NewMergeJSON ¶
func NewMergeJSON( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewMergeJSON returns a MergeJSON processor.
func NewMetadata ¶
func NewMetadata( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewMetadata returns a Metadata processor.
func NewMetric ¶
func NewMetric( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewMetric returns a Metric processor.
func NewNumber ¶
func NewNumber( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewNumber returns a Number processor.
func NewParallel ¶
func NewParallel( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewParallel returns a Parallel processor.
func NewParseLog ¶ added in v3.10.0
func NewParseLog( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewParseLog returns a ParseLog processor.
func NewProcessBatch ¶
func NewProcessBatch( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewProcessBatch returns a ForEach processor.
func NewProcessDAG ¶
func NewProcessDAG( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewProcessDAG returns a ProcessField processor.
func NewProcessField ¶
func NewProcessField( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewProcessField returns a ProcessField processor.
func NewProtobuf ¶ added in v3.25.0
func NewProtobuf( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewProtobuf returns an Protobuf processor.
func NewRateLimit ¶
func NewRateLimit( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewRateLimit returns a RateLimit processor.
func NewResource ¶ added in v3.6.0
func NewResource( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewResource returns a resource processor.
func NewSample ¶
func NewSample( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewSample returns a Sample processor.
func NewSelectParts ¶
func NewSelectParts( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewSelectParts returns a SelectParts processor.
func NewSubprocess ¶
func NewSubprocess( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewSubprocess returns a Subprocess processor.
func NewSwitch ¶
func NewSwitch( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewSwitch returns a Switch processor.
func NewSyncResponse ¶ added in v3.7.0
func NewSyncResponse( conf Config, mgr types.Manager, logger log.Modular, stats metrics.Type, ) (Type, error)
NewSyncResponse returns a SyncResponse processor.
func NewThrottle ¶
func NewThrottle( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewThrottle returns a Throttle processor.
func NewUnarchive ¶
func NewUnarchive( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewUnarchive returns a Unarchive processor.
type TypeSpec ¶
type TypeSpec struct { // UsesBatches indicates whether this processors functionality is best // applied on messages that are already batched. UsesBatches bool Status docs.Status Version string Summary string Description string Categories []Category Footnotes string FieldSpecs docs.FieldSpecs Examples []docs.AnnotatedExample // contains filtered or unexported fields }
TypeSpec Constructor and a usage description for each processor type.
type Unarchive ¶
type Unarchive struct {
// contains filtered or unexported fields
}
Unarchive is a processor that can selectively unarchive parts of a message following a chosen archive type.
func (*Unarchive) CloseAsync ¶
func (d *Unarchive) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Unarchive) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type UnarchiveConfig ¶
type UnarchiveConfig struct { Format string `json:"format" yaml:"format"` Parts []int `json:"parts" yaml:"parts"` }
UnarchiveConfig contains configuration fields for the Unarchive processor.
func NewUnarchiveConfig ¶
func NewUnarchiveConfig() UnarchiveConfig
NewUnarchiveConfig returns a UnarchiveConfig with default values.
type While ¶
type While struct {
// contains filtered or unexported fields
}
While is a processor that applies child processors for as long as a child condition resolves to true.
func (*While) CloseAsync ¶
func (w *While) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*While) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
type WhileConfig ¶
type WhileConfig struct { AtLeastOnce bool `json:"at_least_once" yaml:"at_least_once"` MaxLoops int `json:"max_loops" yaml:"max_loops"` Check string `json:"check" yaml:"check"` Condition condition.Config `json:"condition" yaml:"condition"` Processors []Config `json:"processors" yaml:"processors"` }
WhileConfig is a config struct containing fields for the While processor.
func NewWhileConfig ¶
func NewWhileConfig() WhileConfig
NewWhileConfig returns a default WhileConfig.
type Workflow ¶ added in v3.6.0
type Workflow struct {
// contains filtered or unexported fields
}
Workflow is a processor that applies a list of child processors to a new payload mapped from the original, and after processing attempts to overlay the results back onto the original payloads according to more mappings.
func (*Workflow) CloseAsync ¶ added in v3.6.0
func (w *Workflow) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*Workflow) ProcessMessage ¶ added in v3.6.0
ProcessMessage applies workflow stages to each part of a message type.
type WorkflowConfig ¶ added in v3.6.0
type WorkflowConfig struct { MetaPath string `json:"meta_path" yaml:"meta_path"` Order [][]string `json:"order" yaml:"order"` BranchResources []string `json:"branch_resources" yaml:"branch_resources"` Branches map[string]BranchConfig `json:"branches" yaml:"branches"` Stages map[string]DepProcessMapConfig `json:"stages" yaml:"stages"` }
WorkflowConfig is a config struct containing fields for the Workflow processor.
func NewWorkflowConfig ¶ added in v3.6.0
func NewWorkflowConfig() WorkflowConfig
NewWorkflowConfig returns a default WorkflowConfig.
type XML ¶
type XML struct {
// contains filtered or unexported fields
}
XML is a processor that performs an operation on a XML payload.
func (*XML) CloseAsync ¶
func (p *XML) CloseAsync()
CloseAsync shuts down the processor and stops processing requests.
func (*XML) ProcessMessage ¶
ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.
Source Files ¶
- archive.go
- avro.go
- awk.go
- aws_lambda.go
- batch.go
- bloblang.go
- bounds_check.go
- branch.go
- cache.go
- catch.go
- compress.go
- conditional.go
- constructor.go
- decode.go
- decompress.go
- dedupe.go
- docs.go
- encode.go
- filter.go
- filter_parts.go
- for_each.go
- grok.go
- group_by.go
- group_by_value.go
- hash.go
- hash_sample.go
- http.go
- insert_part.go
- jmespath.go
- jq.go
- json.go
- jsonschema.go
- log.go
- merge_json.go
- metadata.go
- metric.go
- mongodb.go
- noop.go
- number.go
- package.go
- parallel.go
- parse_log.go
- plugin.go
- process_dag.go
- process_field.go
- process_map.go
- protobuf.go
- rate_limit.go
- redis.go
- resource.go
- sample.go
- select_parts.go
- sleep.go
- split.go
- sql.go
- sql_deprecated.go
- sql_extra.go
- subprocess.go
- switch.go
- switch_deprecated.go
- sync_response.go
- text.go
- throttle.go
- try.go
- type.go
- unarchive.go
- util.go
- while.go
- workflow.go
- workflow_branch_map.go
- workflow_deprecated.go
- xml.go