Documentation ¶
Overview ¶
Package processor contains logical message processors that can be pipelined within benthos using the pipeline.Processor type.
Index ¶
- Variables
- func Descriptions() string
- type BlobToMulti
- type BoundsCheck
- type BoundsCheckConfig
- type Combine
- type CombineConfig
- type Config
- type Decompress
- type DecompressConfig
- type HashSample
- type HashSampleConfig
- type InsertPart
- type InsertPartConfig
- type MultiToBlob
- type Noop
- type Sample
- type SampleConfig
- type SelectParts
- type SelectPartsConfig
- type SetJSON
- type SetJSONConfig
- type Split
- type Type
- func New(conf Config, log log.Modular, stats metrics.Type) (Type, error)
- func NewBlobToMulti(conf Config, log log.Modular, stats metrics.Type) (Type, error)
- func NewBoundsCheck(conf Config, log log.Modular, stats metrics.Type) (Type, error)
- func NewCombine(conf Config, log log.Modular, stats metrics.Type) (Type, error)
- func NewDecompress(conf Config, log log.Modular, stats metrics.Type) (Type, error)
- func NewHashSample(conf Config, log log.Modular, stats metrics.Type) (Type, error)
- func NewInsertPart(conf Config, log log.Modular, stats metrics.Type) (Type, error)
- func NewMultiToBlob(conf Config, log log.Modular, stats metrics.Type) (Type, error)
- func NewNoop(conf Config, log log.Modular, stats metrics.Type) (Type, error)
- func NewSample(conf Config, log log.Modular, stats metrics.Type) (Type, error)
- func NewSelectParts(conf Config, log log.Modular, stats metrics.Type) (Type, error)
- func NewSetJSON(conf Config, log log.Modular, stats metrics.Type) (Type, error)
- func NewSplit(conf Config, log log.Modular, stats metrics.Type) (Type, error)
- func NewUnarchive(conf Config, log log.Modular, stats metrics.Type) (Type, error)
- type Unarchive
- type UnarchiveConfig
Constants ¶
This section is empty.
Variables ¶
var (
ErrEmptyTargetPath = errors.New("target path is empty")
)
Errors for the SetJSON type.
Functions ¶
func Descriptions ¶
func Descriptions() string
Descriptions returns a formatted string of collated descriptions of each type.
Types ¶
type BlobToMulti ¶
type BlobToMulti struct {
// contains filtered or unexported fields
}
BlobToMulti is a processor that takes messages with a single part in a benthos multiple part blob format and decodes them into multiple part messages.
func (*BlobToMulti) ProcessMessage ¶
ProcessMessage takes a message with 1 part in multiple part blob format and returns a multiple part message by decoding it.
type BoundsCheck ¶
type BoundsCheck struct {
// contains filtered or unexported fields
}
BoundsCheck is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.
func (*BoundsCheck) ProcessMessage ¶
ProcessMessage checks each message against a set of bounds.
type BoundsCheckConfig ¶
type BoundsCheckConfig struct { MaxParts int `json:"max_parts" yaml:"max_parts"` MinParts int `json:"min_parts" yaml:"min_parts"` MaxPartSize int `json:"max_part_size" yaml:"max_part_size"` }
BoundsCheckConfig contains any bounds configuration for the BoundsCheck processor.
func NewBoundsCheckConfig ¶
func NewBoundsCheckConfig() BoundsCheckConfig
NewBoundsCheckConfig returns a BoundsCheckConfig with default values.
type Combine ¶ added in v0.3.1
type Combine struct {
// contains filtered or unexported fields
}
Combine is a processor that takes messages with a single part in a benthos multiple part blob format and decodes them into multiple part messages.
func (*Combine) ProcessMessage ¶ added in v0.3.1
ProcessMessage takes a single message and buffers it, drops it, returning a NoAck response, until eventually it has N buffered messages, at which point it combines those messages into one multiple part message which is sent on.
type CombineConfig ¶ added in v0.3.1
type CombineConfig struct {
Parts int `json:"parts" yaml:"parts"`
}
CombineConfig contains configuration for the Combine processor.
func NewCombineConfig ¶ added in v0.3.1
func NewCombineConfig() CombineConfig
NewCombineConfig returns a CombineConfig with default values.
type Config ¶
type Config struct { Type string `json:"type" yaml:"type"` BlobToMulti struct{} `json:"blob_to_multi" yaml:"blob_to_multi"` BoundsCheck BoundsCheckConfig `json:"bounds_check" yaml:"bounds_check"` Combine CombineConfig `json:"combine" yaml:"combine"` Decompress DecompressConfig `json:"decompress" yaml:"decompress"` HashSample HashSampleConfig `json:"hash_sample" yaml:"hash_sample"` InsertPart InsertPartConfig `json:"insert_part" yaml:"insert_part"` MultiToBlob struct{} `json:"multi_to_blob" yaml:"multi_to_blob"` Sample SampleConfig `json:"sample" yaml:"sample"` SelectParts SelectPartsConfig `json:"select_parts" yaml:"select_parts"` SetJSON SetJSONConfig `json:"set_json" yaml:"set_json"` Split struct{} `json:"split" yaml:"split"` Unarchive UnarchiveConfig `json:"unarchive" yaml:"unarchive"` }
Config is the all encompassing configuration struct for all processor types.
func NewConfig ¶
func NewConfig() Config
NewConfig returns a configuration struct fully populated with default values.
func (*Config) UnmarshalJSON ¶
UnmarshalJSON ensures that when parsing configs that are in a slice the default values are still applied.
func (*Config) UnmarshalYAML ¶
UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.
type Decompress ¶ added in v0.7.4
type Decompress struct {
// contains filtered or unexported fields
}
Decompress is a processor that can selectively decompress parts of a message as a chosen compression algorithm.
func (*Decompress) ProcessMessage ¶ added in v0.7.4
ProcessMessage takes a message, attempts to decompress parts of the message, and returns the result.
type DecompressConfig ¶ added in v0.7.4
type DecompressConfig struct { Algorithm string `json:"algorithm" yaml:"algorithm"` Parts []int `json:"parts" yaml:"parts"` }
DecompressConfig contains any configuration for the Decompress processor.
func NewDecompressConfig ¶ added in v0.7.4
func NewDecompressConfig() DecompressConfig
NewDecompressConfig returns a DecompressConfig with default values.
type HashSample ¶ added in v0.6.11
type HashSample struct {
// contains filtered or unexported fields
}
HashSample is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.
func (*HashSample) ProcessMessage ¶ added in v0.6.11
ProcessMessage checks each message against a set of bounds.
type HashSampleConfig ¶ added in v0.6.11
type HashSampleConfig struct { RetainMin float64 `json:"retain_min" yaml:"retain_min"` RetainMax float64 `json:"retain_max" yaml:"retain_max"` Parts []int `json:"parts" yaml:"parts"` // message parts to hash }
HashSampleConfig contains any configuration for the HashSample processor.
func NewHashSampleConfig ¶ added in v0.6.11
func NewHashSampleConfig() HashSampleConfig
NewHashSampleConfig returns a HashSampleConfig with default values.
type InsertPart ¶ added in v0.6.19
type InsertPart struct {
// contains filtered or unexported fields
}
InsertPart is a processor that inserts a new message part at a specific index.
func (*InsertPart) ProcessMessage ¶ added in v0.6.19
ProcessMessage prepends a new message part to the message.
type InsertPartConfig ¶ added in v0.6.19
type InsertPartConfig struct { Index int `json:"index" yaml:"index"` Content string `json:"content" yaml:"content"` }
InsertPartConfig contains any configuration for the InsertPart processor.
func NewInsertPartConfig ¶ added in v0.6.19
func NewInsertPartConfig() InsertPartConfig
NewInsertPartConfig returns a InsertPartConfig with default values.
type MultiToBlob ¶
type MultiToBlob struct {
// contains filtered or unexported fields
}
MultiToBlob is a processor that takes messages with potentially multiple parts and converts them into a single part message using the benthos binary format. This message can be converted back to multiple parts using the BlobToMulti processor.
func (MultiToBlob) ProcessMessage ¶
ProcessMessage takes a message of > 0 parts and returns a single part message that can be later converted back to the original parts.
type Sample ¶ added in v0.3.2
type Sample struct {
// contains filtered or unexported fields
}
Sample is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.
type SampleConfig ¶ added in v0.3.2
SampleConfig contains any configuration for the Sample processor.
func NewSampleConfig ¶ added in v0.3.2
func NewSampleConfig() SampleConfig
NewSampleConfig returns a SampleConfig with default values.
type SelectParts ¶ added in v0.4.1
type SelectParts struct {
// contains filtered or unexported fields
}
SelectParts is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.
func (*SelectParts) ProcessMessage ¶ added in v0.4.1
ProcessMessage extracts a set of parts from each message.
type SelectPartsConfig ¶ added in v0.4.1
type SelectPartsConfig struct {
Parts []int `json:"parts" yaml:"parts"`
}
SelectPartsConfig contains any configuration for the SelectParts processor.
func NewSelectPartsConfig ¶ added in v0.4.1
func NewSelectPartsConfig() SelectPartsConfig
NewSelectPartsConfig returns a SelectPartsConfig with default values.
type SetJSON ¶ added in v0.7.2
type SetJSON struct {
// contains filtered or unexported fields
}
SetJSON is a processor that inserts a new message part at a specific index.
type SetJSONConfig ¶ added in v0.7.2
type SetJSONConfig struct { Part int `json:"part" yaml:"part"` Path string `json:"path" yaml:"path"` Value rawJSONValue `json:"value" yaml:"value"` }
SetJSONConfig contains any configuration for the SetJSON processor.
func NewSetJSONConfig ¶ added in v0.7.2
func NewSetJSONConfig() SetJSONConfig
NewSetJSONConfig returns a SetJSONConfig with default values.
type Split ¶ added in v0.7.6
type Split struct {
// contains filtered or unexported fields
}
Split is a processor that splits messages into a message per part.
type Type ¶
type Type interface { // ProcessMessage attempts to process a message. Since processing can fail // this call returns both a slice of messages in case of success or a // response in case of failure. If the slice of messages is empty the // response should be returned to the source. ProcessMessage(msg *types.Message) ([]*types.Message, types.Response) }
Type reads a message, performs a processing operation, and returns either a slice of messages resulting from the process to be propagated through the, pipeline, or a response that should be sent back to the source instead.
func NewBlobToMulti ¶
NewBlobToMulti returns a BlobToMulti processor.
func NewBoundsCheck ¶
NewBoundsCheck returns a BoundsCheck processor.
func NewCombine ¶ added in v0.3.1
NewCombine returns a Combine processor.
func NewDecompress ¶ added in v0.7.4
NewDecompress returns a Decompress processor.
func NewHashSample ¶ added in v0.6.11
NewHashSample returns a HashSample processor.
func NewInsertPart ¶ added in v0.6.19
NewInsertPart returns a InsertPart processor.
func NewMultiToBlob ¶
NewMultiToBlob returns a MultiToBlob processor.
func NewSelectParts ¶ added in v0.4.1
NewSelectParts returns a SelectParts processor.
func NewSetJSON ¶ added in v0.7.2
NewSetJSON returns a SetJSON processor.
type Unarchive ¶ added in v0.7.4
type Unarchive struct {
// contains filtered or unexported fields
}
Unarchive is a processor that can selectively unarchive parts of a message as a chosen archive type.
type UnarchiveConfig ¶ added in v0.7.4
type UnarchiveConfig struct { Format string `json:"format" yaml:"format"` Parts []int `json:"parts" yaml:"parts"` }
UnarchiveConfig contains any configuration for the Unarchive processor.
func NewUnarchiveConfig ¶ added in v0.7.4
func NewUnarchiveConfig() UnarchiveConfig
NewUnarchiveConfig returns a UnarchiveConfig with default values.