stream

package
v0.21.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 2, 2018 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package stream creates and manages a full Benthos logical stream, consisting of an input layer of sources, a buffer layer, a processing pipelines layer, and an output layer of sinks.

Inputs -> Buffer -> Processing Pipelines -> Outputs

Example (SplitToBatch)

ExampleSplitToBatch Demonstrates running a Kafka to Kafka stream where each incoming message is parsed as a line delimited blob of payloads and the payloads are sent on as a single batch of messages.

package main

import (
	"bytes"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/Jeffail/benthos/lib/message"
	"github.com/Jeffail/benthos/lib/types"
)

// SplitToBatch is a types.Processor implementation that reads a single message
// containing line delimited payloads and splits the payloads into a single
// batch of messages per line.
type SplitToBatch struct{}

// ProcessMessage splits messages of a batch by lines and sends them onwards as
// a batch of messages.
func (p SplitToBatch) ProcessMessage(m types.Message) ([]types.Message, types.Response) {
	var splitParts [][]byte
	m.Iter(func(i int, b []byte) error {
		splitParts = append(splitParts, bytes.Split(b, []byte("\n"))...)
		return nil
	})

	return []types.Message{message.New(splitParts)}, nil
}

// ExampleSplitToBatch Demonstrates running a Kafka to Kafka stream where each
// incoming message is parsed as a line delimited blob of payloads and the
// payloads are sent on as a single batch of messages.
func main() {
	conf := NewConfig()

	conf.Input.Type = "kafka"
	conf.Input.Kafka.Addresses = []string{
		"localhost:9092",
	}
	conf.Input.Kafka.Topic = "example_topic_one"

	conf.Output.Type = "kafka"
	conf.Output.Kafka.Addresses = []string{
		"localhost:9092",
	}
	conf.Output.Kafka.Topic = "example_topic_two"

	s, err := New(conf, OptAddProcessors(func() (types.Processor, error) {
		return SplitToBatch{}, nil
	}))
	if err != nil {
		panic(err)
	}

	defer s.Stop(time.Second)

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

	// Wait for termination signal
	select {
	case <-sigChan:
		log.Println("Received SIGTERM, the service is closing.")
	}
}
Output:

Example (SplitToMessages)

ExampleSplitToMessages demonstrates running a Kafka to Kafka stream where each incoming message is parsed as a line delimited blob of payloads and the payloads are sent on as a single message per payload.

package main

import (
	"bytes"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/Jeffail/benthos/lib/message"
	"github.com/Jeffail/benthos/lib/types"
)

// SplitToMessages is a types.Processor implementation that reads a single
// message containing line delimited payloads and splits the payloads into a
// single message per line.
type SplitToMessages struct{}

// ProcessMessage splits messages of a batch by lines and sends them onwards as
// an individual message per payload.
func (p SplitToMessages) ProcessMessage(m types.Message) ([]types.Message, types.Response) {
	var splitParts [][]byte
	m.Iter(func(i int, b []byte) error {
		splitParts = append(splitParts, bytes.Split(b, []byte("\n"))...)
		return nil
	})

	messages := make([]types.Message, len(splitParts))
	for i, part := range splitParts {
		messages[i] = message.New([][]byte{part})
	}
	return messages, nil
}

// ExampleSplitToMessages demonstrates running a Kafka to Kafka stream where
// each incoming message is parsed as a line delimited blob of payloads and the
// payloads are sent on as a single message per payload.
func main() {
	conf := NewConfig()

	conf.Input.Type = "kafka"
	conf.Input.Kafka.Addresses = []string{
		"localhost:9092",
	}
	conf.Input.Kafka.Topic = "example_topic_one"

	conf.Output.Type = "kafka"
	conf.Output.Kafka.Addresses = []string{
		"localhost:9092",
	}
	conf.Output.Kafka.Topic = "example_topic_two"

	s, err := New(conf, OptAddProcessors(func() (types.Processor, error) {
		return SplitToMessages{}, nil
	}))
	if err != nil {
		panic(err)
	}

	defer s.Stop(time.Second)

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

	// Wait for termination signal
	select {
	case <-sigChan:
		log.Println("Received SIGTERM, the service is closing.")
	}
}
Output:

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func OptAddInputPipelines

func OptAddInputPipelines(pipes ...types.PipelineConstructorFunc) func(*Type)

OptAddInputPipelines adds additional pipelines that will be constructed for each input of the Benthos stream.

func OptAddOutputPipelines

func OptAddOutputPipelines(pipes ...types.PipelineConstructorFunc) func(*Type)

OptAddOutputPipelines adds additional pipelines that will be constructed for each output of the Benthos stream.

func OptAddProcessors

func OptAddProcessors(procs ...types.ProcessorConstructorFunc) func(*Type)

OptAddProcessors adds additional processors that will be constructed for each logical thread of the processing pipeline layer of the Benthos stream.

func OptOnClose

func OptOnClose(onClose func()) func(*Type)

OptOnClose sets a closure to be called when the stream closes.

func OptSetLogger

func OptSetLogger(log log.Modular) func(*Type)

OptSetLogger sets the logging output to be used by all components of the stream.

func OptSetManager

func OptSetManager(mgr types.Manager) func(*Type)

OptSetManager sets the service manager to be used by all components of the stream.

func OptSetStats

func OptSetStats(stats metrics.Type) func(*Type)

OptSetStats sets the metrics aggregator to be used by all components of the stream.

Types

type Config

type Config struct {
	Input    input.Config    `json:"input" yaml:"input"`
	Buffer   buffer.Config   `json:"buffer" yaml:"buffer"`
	Pipeline pipeline.Config `json:"pipeline" yaml:"pipeline"`
	Output   output.Config   `json:"output" yaml:"output"`
}

Config is a configuration struct for a Benthos stream.

func NewConfig

func NewConfig() Config

NewConfig returns a new configuration with default values.

func (Config) Sanitised

func (c Config) Sanitised() (interface{}, error)

Sanitised returns a sanitised copy of the Benthos configuration, meaning fields of no consequence (unused inputs, outputs, processors etc) are excluded.

type Type

type Type struct {
	// contains filtered or unexported fields
}

Type creates and manages the lifetime of a Benthos stream.

func New

func New(conf Config, opts ...func(*Type)) (*Type, error)

New creates a new stream.Type.

func (*Type) Stop

func (t *Type) Stop(timeout time.Duration) error

Stop attempts to close the stream within the specified timeout period. Initially the attempt is graceful, but as the timeout draws close the attempt becomes progressively less graceful.

Directories

Path Synopsis
Package manager creates and manages multiple streams, providing an API for performing CRUD operations.
Package manager creates and manages multiple streams, providing an API for performing CRUD operations.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL