stream

package
v0.11.4
Published: May 15, 2018 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package stream creates and manages a complete Benthos logical stream, which consists of four layers: an input layer of sources, a buffer layer, a processing pipeline layer, and an output layer of sinks.

Inputs -> Buffer -> Processing Pipelines -> Outputs

Constants

This section is empty.

Variables

This section is empty.

Functions

func OptAddInputPipelines

func OptAddInputPipelines(pipes ...pipeline.ConstructorFunc) func(*Type)

OptAddInputPipelines adds additional pipelines that will be constructed for each input of the Benthos stream.

func OptAddOutputPipelines

func OptAddOutputPipelines(pipes ...pipeline.ConstructorFunc) func(*Type)

OptAddOutputPipelines adds additional pipelines that will be constructed for each output of the Benthos stream.

func OptAddProcessors

func OptAddProcessors(procs ...pipeline.ProcConstructorFunc) func(*Type)

OptAddProcessors adds additional processors that will be constructed for each logical thread of the processing pipeline layer of the Benthos stream.

func OptOnClose

func OptOnClose(onClose func()) func(*Type)

OptOnClose sets a closure to be called when the stream closes.

func OptSetLogger

func OptSetLogger(log log.Modular) func(*Type)

OptSetLogger sets the logging output to be used by all components of the stream.

func OptSetManager

func OptSetManager(mgr types.Manager) func(*Type)

OptSetManager sets the service manager to be used by all components of the stream.

func OptSetStats

func OptSetStats(stats metrics.Type) func(*Type)

OptSetStats sets the metrics aggregator to be used by all components of the stream.

Types

type Config

type Config struct {
	Input    input.Config    `json:"input" yaml:"input"`
	Buffer   buffer.Config   `json:"buffer" yaml:"buffer"`
	Pipeline pipeline.Config `json:"pipeline" yaml:"pipeline"`
	Output   output.Config   `json:"output" yaml:"output"`
}

Config is a configuration struct for a Benthos stream.
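The struct tags on Config determine the top-level keys of a serialised stream configuration. The sketch below shows this with a stand-in struct: `streamConfig` copies the field names and JSON tags of Config, while `section` is a hypothetical placeholder for the real `input.Config`, `buffer.Config`, `pipeline.Config`, and `output.Config` types, whose fields are not listed on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// section is a hypothetical stand-in for the per-layer config types.
type section map[string]interface{}

// streamConfig mirrors the field names and JSON tags of stream.Config.
type streamConfig struct {
	Input    section `json:"input"`
	Buffer   section `json:"buffer"`
	Pipeline section `json:"pipeline"`
	Output   section `json:"output"`
}

// topLevelKeys serialises an empty stand-in config to show its key layout.
func topLevelKeys() (string, error) {
	conf := streamConfig{
		Input:    section{},
		Buffer:   section{},
		Pipeline: section{},
		Output:   section{},
	}
	b, err := json.Marshal(conf)
	return string(b), err
}

func main() {
	s, err := topLevelKeys()
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // prints {"input":{},"buffer":{},"pipeline":{},"output":{}}
}
```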

func NewConfig

func NewConfig() Config

NewConfig returns a new configuration with default values.

func (Config) Sanitised

func (c Config) Sanitised() (interface{}, error)

Sanitised returns a sanitised copy of the Benthos configuration, meaning fields of no consequence (unused inputs, outputs, processors etc) are excluded.
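One way to picture this kind of sanitisation is sketched below. It assumes a layout in which a component config carries a "type" field plus one section per available component type, so that only the section named by "type" matters. That layout, the `sanitise` helper, and the sample keys are assumptions for illustration; the real method returns `(interface{}, error)` and its internals are not described here.

```go
package main

import "fmt"

// sanitise returns a copy of conf holding only the "type" field and the
// section that "type" names. The layout (one section per component type,
// selected by "type") is an assumption made for this sketch.
func sanitise(conf map[string]interface{}) map[string]interface{} {
	out := map[string]interface{}{}
	t, ok := conf["type"].(string)
	if !ok {
		return out // no usable type field: nothing is of consequence
	}
	out["type"] = t
	if sect, exists := conf[t]; exists {
		out[t] = sect
	}
	return out
}

func main() {
	conf := map[string]interface{}{
		"type":  "file",
		"file":  map[string]interface{}{"path": "in.txt"},
		"stdin": map[string]interface{}{}, // unused, so it is dropped
	}
	fmt.Println(sanitise(conf))
}
```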

type Type

type Type struct {
	// contains filtered or unexported fields
}

Type creates and manages the lifetime of a Benthos stream.

func New

func New(conf Config, opts ...func(*Type)) (*Type, error)

New creates a new stream.Type from the given configuration, applying each option function in opts.
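The `opts ...func(*Type)` parameter follows the functional options pattern: each Opt function returns a closure that mutates the stream under construction. The sketch below reproduces the pattern with stand-in names. `Stream`, `newStream`, `optSetName`, and `optOnClose` are hypothetical and exist only so the example compiles without the Benthos module; the order in which the real constructor applies options is likewise an assumption.

```go
package main

import "fmt"

// Stream is a hypothetical stand-in for stream.Type.
type Stream struct {
	name    string
	onClose func()
}

// optSetName is a hypothetical option that overrides the default name.
func optSetName(name string) func(*Stream) {
	return func(s *Stream) { s.name = name }
}

// optOnClose is a hypothetical option modelled on the shape of OptOnClose.
func optOnClose(fn func()) func(*Stream) {
	return func(s *Stream) { s.onClose = fn }
}

// newStream sets defaults first, then applies each option in order.
func newStream(opts ...func(*Stream)) *Stream {
	s := &Stream{name: "default", onClose: func() {}}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

// Close invokes the registered closure.
func (s *Stream) Close() { s.onClose() }

func main() {
	closed := false
	s := newStream(optSetName("example"), optOnClose(func() { closed = true }))
	s.Close()
	fmt.Println(s.name, closed) // prints "example true"
}
```

A benefit of this design is that new options can be added without changing the constructor's signature, and callers that pass no options get the defaults.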

func (*Type) Stop

func (t *Type) Stop(timeout time.Duration) error

Stop attempts to close the stream within the specified timeout period. The attempt is graceful at first, but becomes progressively less graceful as the timeout approaches.
