Documentation ¶
Overview ¶
Package stream creates and manages a full Benthos stream pipeline, consisting of an input layer of consumers, an optional buffer layer, a processing pipelines layer, and an output layer of producers:
Inputs -> Buffer -> Processing Pipelines -> Outputs
The number of parallel input consumers, processing pipelines, and output producers depends on the configuration of the stream.
Custom Stream Processors ¶
It is possible to construct a stream with your own custom processor implementations embedded within it. This results in your processors being executed for each discrete message batch at the end of any other configured processors.
Your custom processors will be constructed once per pipeline processing thread, e.g. with four pipeline processing threads the pipeline would look like this:
Inputs -> Buffer -> Processing Pipeline -> Custom Processor -> Outputs \ Processing Pipeline -> Custom Processor / \ Processing Pipeline -> Custom Processor / \ Processing Pipeline -> Custom Processor /
Plugins ¶
Benthos components (inputs, processors, conditions, outputs, etc) are pluggable by design, and can be complemented with your custom implementations by calling RegisterPlugin on a component package.
This method is more complicated than simply adding a custom stream processor, but allows you to use your custom implementations in the same flexible way that native Benthos types can be used.
Message Batches ¶
In Benthos every message is a batch, and it is the configuration of a stream that determines the size of each batch (usually 1.) Therefore all processors, including your custom implementations, support batches.
Sometimes your custom processors will require batches of a certain size in order to function. It is recommended that you perform message batching using the standard Benthos batch or combine processors, as it will ensure resiliency throughout the stream pipeline. For example, you can add a batch processor to your input layer:
conf := NewConfig() conf.Input.Type = input.TypeKafka conf.Input.Kafka.Addresses = []string{"localhost:9092"} conf.Input.Kafka.Topic = "example_topic_one" conf.Input.Processors = append(conf.Input.Processors, processor.NewConfig()) conf.Input.Processors[0].Type = processor.TypeBatch conf.Input.Processors[0].Batch.ByteSize = 10000000 // 10MB
Horizontal Scaling ¶
The standard set of processors of a Benthos stream are stateless and can therefore be horizontally scaled without impacting the results. Horizontal scaling therefore only depends on the sources of data of a stream.
Most message queues/protocols provide mechanisms to automatically distribute messages horizontally across consumers. Horizontally scaling Benthos is therefore as simple as applying those means.
Kafka, for example, allows you to distribute messages across partitions, which can either be statically distributed across consumers or, using the kafka input type, can be dynamically distributed across consumers.
Vertical Scaling ¶
Vertically scaled message processing can be done in Benthos with parallel processing pipelines, where the number of threads is configurable in the pipeline second of a stream configuration. However, in order to saturate those processing threads your configuration needs one of two things: multiple parallel inputs or a memory buffer.
Adding a memory buffer is a simple way of scaling a single input consumer across processing threads, but this removes the automatic delivery guarantees that Benthos provides.
Instead, it is recommended that you create parallel input sources, the number of which should at least match the number of processing threads. This retains the delivery guarantees of your sources and sinks by keeping them tightly coupled and is done by configuring a broker input type, for example, processing across four threads with eight parallel consumers:
// Create a Kafka input with automatic partition balancing inputConf := input.NewConfig() inputConf.Type = input.TypeKafkaBalanced inputConf.KafkaBalanced.Addresses = []string{"localhost:9092"} inputConf.KafkaBalanced.Topics = []string{"example_topic_one"} // Create a decompression processor (default gzip) processorConf := processor.NewConfig() processorConf.Type = processor.TypeDecompress // Create a stream with eight parallel consumers and four processing threads conf := NewConfig() conf.Input.Type = input.TypeBroker conf.Input.Broker.Inputs = append(conf.Input.Broker.Inputs, inputConf) conf.Input.Broker.Copies = 8 conf.Pipeline.Processors = append(conf.Pipeline.Processors, processorConf) conf.Pipeline.Threads = 4
Delivery Guarantees ¶
A Benthos stream, without a buffer (the default), guarantees at-least-once message delivery matching the source and sink protocols used. Meaning if you are consuming a Kafka stream and producing to a Kafka stream then Benthos matches the at-least-once delivery guarantees of Kafka.
If you configure a stream with a buffer then your delivery guarantees will depend on the resiliency of the buffer method you have chosen.
Processor Idempotency ¶
Benthos processors are usually stateless operations that are idempotent by their nature, meaning duplicate messages travelling the pipeline do not impact the result of the processor itself.
If your custom processors are stateful and exhibit side effects you will need to implement your own tooling in order to guarantee exactly-once processing of messages.
Example (Base64Encoder) ¶
ExampleBase64Encoder demonstrates running a Kafka to Kafka stream where each incoming message is encoded with base64.
package main import ( "bytes" "encoding/base64" "log" "os" "os/signal" "syscall" "time" "github.com/Jeffail/benthos/v3/lib/input" "github.com/Jeffail/benthos/v3/lib/output" "github.com/Jeffail/benthos/v3/lib/types" ) // Base64Encoder is a types.Processor implementation that base64 encodes // all messages travelling through a Benthos stream. type Base64Encoder struct{} // ProcessMessage base64 encodes all messages. func (p Base64Encoder) ProcessMessage(m types.Message) ([]types.Message, types.Response) { // Create a copy of the original message result := m.Copy() // For each message part replace its contents with the base64 encoded // version. result.Iter(func(i int, part types.Part) error { var buf bytes.Buffer e := base64.NewEncoder(base64.StdEncoding, &buf) e.Write(part.Get()) e.Close() part.Set(buf.Bytes()) return nil }) return []types.Message{result}, nil } // CloseAsync shuts down the processor and stops processing requests. func (p Base64Encoder) CloseAsync() { // Do nothing as our processor doesn't require resource cleanup. } // WaitForClose blocks until the processor has closed down. func (p Base64Encoder) WaitForClose(timeout time.Duration) error { // Do nothing as our processor doesn't require resource cleanup. return nil } // ExampleBase64Encoder demonstrates running a Kafka to Kafka stream where each // incoming message is encoded with base64. func main() { conf := NewConfig() conf.Input.Type = input.TypeKafka conf.Input.Kafka.Addresses = []string{ "localhost:9092", } conf.Input.Kafka.Topic = "example_topic_one" conf.Output.Type = output.TypeKafka conf.Output.Kafka.Addresses = []string{ "localhost:9092", } conf.Output.Kafka.Topic = "example_topic_two" s, err := New(conf, OptAddProcessors(func() (types.Processor, error) { return Base64Encoder{}, nil })) if err != nil { panic(err) } defer s.Stop(time.Second) sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) // Wait for termination signal <-sigChan log.Println("Received SIGTERM, the service is closing.") }
Output:
Example (SplitToBatch) ¶
ExampleSplitToBatch demonstrates running a Kafka to Kafka stream where each incoming message is parsed as a line delimited blob of payloads and the payloads are sent on as a single batch of messages.
package main import ( "bytes" "log" "os" "os/signal" "syscall" "time" "github.com/Jeffail/benthos/v3/lib/input" "github.com/Jeffail/benthos/v3/lib/message" "github.com/Jeffail/benthos/v3/lib/output" "github.com/Jeffail/benthos/v3/lib/types" ) // SplitToBatch is a types.Processor implementation that reads a single message // containing line delimited payloads and splits the payloads into a single // batch of messages per line. type SplitToBatch struct{} // ProcessMessage splits messages of a batch by lines and sends them onwards as // a batch of messages. func (p SplitToBatch) ProcessMessage(m types.Message) ([]types.Message, types.Response) { var splitParts [][]byte m.Iter(func(i int, b types.Part) error { splitParts = append(splitParts, bytes.Split(b.Get(), []byte("\n"))...) return nil }) return []types.Message{message.New(splitParts)}, nil } // CloseAsync shuts down the processor and stops processing requests. func (p SplitToBatch) CloseAsync() { // Do nothing as our processor doesn't require resource cleanup. } // WaitForClose blocks until the processor has closed down. func (p SplitToBatch) WaitForClose(timeout time.Duration) error { // Do nothing as our processor doesn't require resource cleanup. return nil } // ExampleSplitToBatch demonstrates running a Kafka to Kafka stream where each // incoming message is parsed as a line delimited blob of payloads and the // payloads are sent on as a single batch of messages. func main() { conf := NewConfig() conf.Input.Type = input.TypeKafka conf.Input.Kafka.Addresses = []string{ "localhost:9092", } conf.Input.Kafka.Topic = "example_topic_one" conf.Output.Type = output.TypeKafka conf.Output.Kafka.Addresses = []string{ "localhost:9092", } conf.Output.Kafka.Topic = "example_topic_two" s, err := New(conf, OptAddProcessors(func() (types.Processor, error) { return SplitToBatch{}, nil })) if err != nil { panic(err) } defer s.Stop(time.Second) sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) // Wait for termination signal <-sigChan log.Println("Received SIGTERM, the service is closing.") }
Output:
Example (SplitToMessages) ¶
ExampleSplitToMessages demonstrates running a Kafka to Kafka stream where each incoming message is parsed as a line delimited blob of payloads and the payloads are sent on as a single message per payload.
package main import ( "bytes" "log" "os" "os/signal" "syscall" "time" "github.com/Jeffail/benthos/v3/lib/input" "github.com/Jeffail/benthos/v3/lib/message" "github.com/Jeffail/benthos/v3/lib/output" "github.com/Jeffail/benthos/v3/lib/types" ) // SplitToMessages is a types.Processor implementation that reads a single // message containing line delimited payloads and splits the payloads into a // single message per line. type SplitToMessages struct{} // ProcessMessage splits messages of a batch by lines and sends them onwards as // an individual message per payload. func (p SplitToMessages) ProcessMessage(m types.Message) ([]types.Message, types.Response) { var splitParts [][]byte m.Iter(func(i int, b types.Part) error { splitParts = append(splitParts, bytes.Split(b.Get(), []byte("\n"))...) return nil }) messages := make([]types.Message, len(splitParts)) for i, part := range splitParts { messages[i] = message.New([][]byte{part}) } return messages, nil } // CloseAsync shuts down the processor and stops processing requests. func (p SplitToMessages) CloseAsync() { // Do nothing as our processor doesn't require resource cleanup. } // WaitForClose blocks until the processor has closed down. func (p SplitToMessages) WaitForClose(timeout time.Duration) error { // Do nothing as our processor doesn't require resource cleanup. return nil } // ExampleSplitToMessages demonstrates running a Kafka to Kafka stream where // each incoming message is parsed as a line delimited blob of payloads and the // payloads are sent on as a single message per payload. func main() { conf := NewConfig() conf.Input.Type = input.TypeKafka conf.Input.Kafka.Addresses = []string{ "localhost:9092", } conf.Input.Kafka.Topic = "example_topic_one" conf.Output.Type = output.TypeKafka conf.Output.Kafka.Addresses = []string{ "localhost:9092", } conf.Output.Kafka.Topic = "example_topic_two" s, err := New(conf, OptAddProcessors(func() (types.Processor, error) { return SplitToMessages{}, nil })) if err != nil { panic(err) } defer s.Stop(time.Second) sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) // Wait for termination signal <-sigChan log.Println("Received SIGTERM, the service is closing.") }
Output:
Example (YamlConfig) ¶
ExampleYAMLConfig demonstrates running a Benthos stream with a configuration parsed from a YAML file and a custom processor.
package main import ( "log" "os" "os/signal" "syscall" "time" "github.com/Jeffail/benthos/v3/lib/types" yaml "gopkg.in/yaml.v3" ) // CustomProcessor is a types.Processor implementation that does nothing. type CustomProcessor struct{} // ProcessMessage does nothing. func (p CustomProcessor) ProcessMessage(m types.Message) ([]types.Message, types.Response) { return []types.Message{m}, nil } // CloseAsync shuts down the processor and stops processing requests. func (p CustomProcessor) CloseAsync() { // Do nothing as our processor doesn't require resource cleanup. } // WaitForClose blocks until the processor has closed down. func (p CustomProcessor) WaitForClose(timeout time.Duration) error { // Do nothing as our processor doesn't require resource cleanup. return nil } // ExampleYAMLConfig demonstrates running a Benthos stream with a configuration // parsed from a YAML file and a custom processor. func main() { confBytes, err := os.ReadFile("./foo.yaml") if err != nil { panic(err) } conf := NewConfig() if err = yaml.Unmarshal(confBytes, &conf); err != nil { panic(err) } s, err := New(conf, OptAddProcessors(func() (types.Processor, error) { return CustomProcessor{}, nil })) if err != nil { panic(err) } defer s.Stop(time.Second) sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) // Wait for termination signal <-sigChan log.Println("Received SIGTERM, the service is closing.") }
Output:
Index ¶
- func OptAddProcessors(procs ...types.ProcessorConstructorFunc) func(*Type)
- func OptOnClose(onClose func()) func(*Type)
- func OptSetLogSimple(l log.PrintFormatter) func(*Type)
- func OptSetLogger(l log.Modular) func(*Type)
- func OptSetManager(mgr types.Manager) func(*Type)
- func OptSetStats(stats metrics.Type) func(*Type)
- func Spec() docs.FieldSpecs
- type Config
- type Type
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func OptAddProcessors ¶
func OptAddProcessors(procs ...types.ProcessorConstructorFunc) func(*Type)
OptAddProcessors adds additional processors that will be constructed for each logical thread of the processing pipeline layer of the Benthos stream.
func OptOnClose ¶
func OptOnClose(onClose func()) func(*Type)
OptOnClose sets a closure to be called when the stream closes.
func OptSetLogSimple ¶
func OptSetLogSimple(l log.PrintFormatter) func(*Type)
OptSetLogSimple sets the logging output to a simpler log interface (implemented by the standard *log.Logger.)
func OptSetLogger ¶
OptSetLogger sets the logging output to be used by all components of the stream. To avoid implementing the log.Modular interface with a custom logger consider using OptSetLogSimple instead.
func OptSetManager ¶
OptSetManager sets the service manager to be used by all components of the stream.
func OptSetStats ¶
OptSetStats sets the metrics aggregator to be used by all components of the stream.
func Spec ¶ added in v3.47.0
func Spec() docs.FieldSpecs
Spec returns a docs.FieldSpec for a stream configuration.
Types ¶
type Config ¶
type Config struct { Input input.Config `json:"input" yaml:"input"` Buffer buffer.Config `json:"buffer" yaml:"buffer"` Pipeline pipeline.Config `json:"pipeline" yaml:"pipeline"` Output output.Config `json:"output" yaml:"output"` }
Config is a configuration struct representing all four layers of a Benthos stream.