Documentation ¶
Overview ¶
Package stream creates and manages a full Benthos logical stream, consisting of an input layer of sources, a buffer layer, a processing pipelines layer, and an output layer of sinks.
Inputs -> Buffer -> Processing Pipelines -> Outputs
Example (SplitToBatch) ¶
ExampleSplitToBatch Demonstrates running a Kafka to Kafka stream where each incoming message is parsed as a line delimited blob of payloads and the payloads are sent on as a single batch of messages.
package main import ( "bytes" "log" "os" "os/signal" "syscall" "time" "github.com/Jeffail/benthos/lib/input" "github.com/Jeffail/benthos/lib/message" "github.com/Jeffail/benthos/lib/output" "github.com/Jeffail/benthos/lib/types" ) // SplitToBatch is a types.Processor implementation that reads a single message // containing line delimited payloads and splits the payloads into a single // batch of messages per line. type SplitToBatch struct{} // ProcessMessage splits messages of a batch by lines and sends them onwards as // a batch of messages. func (p SplitToBatch) ProcessMessage(m types.Message) ([]types.Message, types.Response) { var splitParts [][]byte m.Iter(func(i int, b types.Part) error { splitParts = append(splitParts, bytes.Split(b.Get(), []byte("\n"))...) return nil }) return []types.Message{message.New(splitParts)}, nil } // ExampleSplitToBatch Demonstrates running a Kafka to Kafka stream where each // incoming message is parsed as a line delimited blob of payloads and the // payloads are sent on as a single batch of messages. func main() { conf := NewConfig() conf.Input.Type = input.TypeKafka conf.Input.Kafka.Addresses = []string{ "localhost:9092", } conf.Input.Kafka.Topic = "example_topic_one" conf.Output.Type = output.TypeKafka conf.Output.Kafka.Addresses = []string{ "localhost:9092", } conf.Output.Kafka.Topic = "example_topic_two" s, err := New(conf, OptAddProcessors(func() (types.Processor, error) { return SplitToBatch{}, nil })) if err != nil { panic(err) } defer s.Stop(time.Second) sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) // Wait for termination signal select { case <-sigChan: log.Println("Received SIGTERM, the service is closing.") } }
Output:
Example (SplitToMessages) ¶
ExampleSplitToMessages demonstrates running a Kafka to Kafka stream where each incoming message is parsed as a line delimited blob of payloads and the payloads are sent on as a single message per payload.
package main import ( "bytes" "log" "os" "os/signal" "syscall" "time" "github.com/Jeffail/benthos/lib/input" "github.com/Jeffail/benthos/lib/message" "github.com/Jeffail/benthos/lib/output" "github.com/Jeffail/benthos/lib/types" ) // SplitToMessages is a types.Processor implementation that reads a single // message containing line delimited payloads and splits the payloads into a // single message per line. type SplitToMessages struct{} // ProcessMessage splits messages of a batch by lines and sends them onwards as // an individual message per payload. func (p SplitToMessages) ProcessMessage(m types.Message) ([]types.Message, types.Response) { var splitParts [][]byte m.Iter(func(i int, b types.Part) error { splitParts = append(splitParts, bytes.Split(b.Get(), []byte("\n"))...) return nil }) messages := make([]types.Message, len(splitParts)) for i, part := range splitParts { messages[i] = message.New([][]byte{part}) } return messages, nil } // ExampleSplitToMessages demonstrates running a Kafka to Kafka stream where // each incoming message is parsed as a line delimited blob of payloads and the // payloads are sent on as a single message per payload. func main() { conf := NewConfig() conf.Input.Type = input.TypeKafka conf.Input.Kafka.Addresses = []string{ "localhost:9092", } conf.Input.Kafka.Topic = "example_topic_one" conf.Output.Type = output.TypeKafka conf.Output.Kafka.Addresses = []string{ "localhost:9092", } conf.Output.Kafka.Topic = "example_topic_two" s, err := New(conf, OptAddProcessors(func() (types.Processor, error) { return SplitToMessages{}, nil })) if err != nil { panic(err) } defer s.Stop(time.Second) sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) // Wait for termination signal select { case <-sigChan: log.Println("Received SIGTERM, the service is closing.") } }
Output:
Index ¶
- func OptAddInputPipelines(pipes ...types.PipelineConstructorFunc) func(*Type)
- func OptAddOutputPipelines(pipes ...types.PipelineConstructorFunc) func(*Type)
- func OptAddProcessors(procs ...types.ProcessorConstructorFunc) func(*Type)
- func OptOnClose(onClose func()) func(*Type)
- func OptSetLogger(log log.Modular) func(*Type)
- func OptSetManager(mgr types.Manager) func(*Type)
- func OptSetStats(stats metrics.Type) func(*Type)
- type Config
- type Type
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func OptAddInputPipelines ¶
func OptAddInputPipelines(pipes ...types.PipelineConstructorFunc) func(*Type)
OptAddInputPipelines adds additional pipelines that will be constructed for each input of the Benthos stream.
func OptAddOutputPipelines ¶
func OptAddOutputPipelines(pipes ...types.PipelineConstructorFunc) func(*Type)
OptAddOutputPipelines adds additional pipelines that will be constructed for each output of the Benthos stream.
func OptAddProcessors ¶
func OptAddProcessors(procs ...types.ProcessorConstructorFunc) func(*Type)
OptAddProcessors adds additional processors that will be constructed for each logical thread of the processing pipeline layer of the Benthos stream.
func OptOnClose ¶
func OptOnClose(onClose func()) func(*Type)
OptOnClose sets a closure to be called when the stream closes.
func OptSetLogger ¶
OptSetLogger sets the logging output to be used by all components of the stream.
func OptSetManager ¶
OptSetManager sets the service manager to be used by all components of the stream.
func OptSetStats ¶
OptSetStats sets the metrics aggregator to be used by all components of the stream.
Types ¶
type Config ¶
type Config struct { Input input.Config `json:"input" yaml:"input"` Buffer buffer.Config `json:"buffer" yaml:"buffer"` Pipeline pipeline.Config `json:"pipeline" yaml:"pipeline"` Output output.Config `json:"output" yaml:"output"` }
Config is a configuration struct for a Benthos stream.