connect

module
v0.26.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 27, 2018 License: MIT

README

Benthos

godoc for Jeffail/benthos goreportcard for Jeffail/benthos Build Status

Benthos is a high performance and resilient message streaming service, able to connect various sources and sinks and perform arbitrary actions, transformations and filters on payloads. It is easy to deploy and monitor, and ready to drop into your pipeline either as a static binary or a docker image. It can also be used as a framework for building your own resilient stream processors in Go.

A Benthos stream consists of four layers: inputs, optional buffer, processor workers and outputs. Inputs and outputs can be combined in a range of broker patterns. It is possible to run multiple isolated streams within a single Benthos instance using --streams mode, and perform CRUD operations on the running streams via REST endpoints.

Delivery Guarantees

Benthos is crash resilient by default. When connecting to at-least-once sources and sinks without a buffer it guarantees at-least-once delivery without needing to persist messages during transit.

When running a Benthos stream with a buffer there are various options for choosing a level of resiliency that meets your needs.

Supported Sources & Sinks

Documentation

Documentation for Benthos components, concepts and recommendations can be found in the docs directory.

For building your own stream processors using Benthos as a framework check out the stream package, which also includes some examples.

For some applied examples of Benthos such as streaming and deduplicating the Twitter firehose to Kafka check out the cookbook section.

Run

benthos -c ./config.yaml

Or, with docker:

# Send HTTP /POST data to Kafka:
docker run --rm \
	-e "INPUT_TYPE=http_server" \
	-e "OUTPUT_TYPE=kafka" \
	-e "OUTPUT_KAFKA_ADDRESSES=kafka-server:9092" \
	-e "OUTPUT_KAFKA_TOPIC=benthos_topic" \
	-p 4195:4195 \
	jeffail/benthos

# Using your own config file:
docker run --rm -v /path/to/your/config.yaml:/benthos.yaml jeffail/benthos

Metrics

Benthos exposes lots of metrics either to Statsd, Prometheus or for debugging purposes an HTTP endpoint that returns a JSON formatted object.

Configuration

The configuration file for a Benthos stream is made up of four main sections; input, buffer, pipeline, output. If we were to pipe stdin directly to Kafka it would look like this:

input:
  type: stdin
buffer:
  type: none
pipeline:
  threads: 1
  processors: []
output:
  type: kafka
  kafka:
    addresses:
    - localhost:9092
    topic: benthos_stream

There are also sections for setting logging, metrics and HTTP server options.

Benthos provides lots of tools for making configuration discovery and debugging easy. You can read about them here.

You can also find runnable example configs demonstrating each input, output, buffer and processor option here.

Environment Variables

It is possible to select fields inside a configuration file to be set via environment variables. The docker image, for example, is built with a config file where all common fields can be set this way.

Install

Build with Go:

make deps
make

Or, pull the docker image:

docker pull jeffail/benthos

Or, grab a binary for your OS from here.

Docker Builds

There's a multi-stage Dockerfile for creating a Benthos docker image which results in a minimal image from scratch. You can build it with:

make docker

Then use the image:

docker run --rm \
	-v /path/to/your/benthos.yaml:/config.yaml \
	-v /tmp/data:/data \
	-p 4195:4195 \
	benthos -c /config.yaml

There are a few examples here that show you some ways of setting up Benthos containers using docker-compose.

ZMQ4 Support

Benthos supports ZMQ4 for both data input and output. To add this you need to install libzmq4 and use the compile time flag when building Benthos:

make TAGS=ZMQ4

Contributing

Contributions are welcome, please read the guidelines.

Directories

Path Synopsis
cmd
lib
Package lib contains a collection of packages used to create a Benthos stream pipeline.
Package lib contains a collection of packages used to create a Benthos stream pipeline.
api
Package api implements a type used for creating the Benthos HTTP API.
Package api implements a type used for creating the Benthos HTTP API.
broker
Package broker implements types used for routing inputs to outputs in non-trivial arrangements, such as fan-out or fan-in models.
Package broker implements types used for routing inputs to outputs in non-trivial arrangements, such as fan-out or fan-in models.
buffer
Package buffer is both a types.Consumer and types.Producer implementation that is able to sit between other stream components, effectively decoupling their transaction channels by storing messages in a buffer implementation.
Package buffer is both a types.Consumer and types.Producer implementation that is able to sit between other stream components, effectively decoupling their transaction channels by storing messages in a buffer implementation.
buffer/parallel
Package parallel contains implementations of various buffer types where the buffer can be consumed by any number of parallel consumer threads.
Package parallel contains implementations of various buffer types where the buffer can be consumed by any number of parallel consumer threads.
buffer/single
Package single contains implementations of various buffer types where the buffer can only be consumed by a single thread (but any number of writers).
Package single contains implementations of various buffer types where the buffer can only be consumed by a single thread (but any number of writers).
cache
Package cache implements the types.Cache interface for storing and retrieving key/value pairs from a range of storage strategies.
Package cache implements the types.Cache interface for storing and retrieving key/value pairs from a range of storage strategies.
input
Package input defines consumers for aggregating data from a variety of sources.
Package input defines consumers for aggregating data from a variety of sources.
input/reader
Package reader defines implementations of an interface for generic message reading from various third party sources.
Package reader defines implementations of an interface for generic message reading from various third party sources.
log
Package log contains utilities for logging in a modular interface.
Package log contains utilities for logging in a modular interface.
manager
Package manager implements the types.Manager interface used for creating and sharing resources across a Benthos service.
Package manager implements the types.Manager interface used for creating and sharing resources across a Benthos service.
message
Package message contains implementations of types.Message.
Package message contains implementations of types.Message.
message/mapper
Package mapper implements ways of splicing and mapping batches of messages so that their derivative parts can be processed individually and merged back into the original batch.
Package mapper implements ways of splicing and mapping batches of messages so that their derivative parts can be processed individually and merged back into the original batch.
message/metadata
Package metadata contains implementations of types.Metadata.
Package metadata contains implementations of types.Metadata.
metrics
Package metrics contains a type for aggregating and propagating metrics to various services based on configuration.
Package metrics contains a type for aggregating and propagating metrics to various services based on configuration.
output
Package output defines all sinks for sending Benthos messages to a variety of third party destinations.
Package output defines all sinks for sending Benthos messages to a variety of third party destinations.
output/writer
Package writer defines implementations of an interface for generic message writing that outputs to various third party sinks.
Package writer defines implementations of an interface for generic message writing that outputs to various third party sinks.
pipeline
Package pipeline contains structures that implement both the Producer and Consumer interfaces.
Package pipeline contains structures that implement both the Producer and Consumer interfaces.
processor
Package processor contains implementations of types.Processor, which perform an arbitrary operation on a message and either returns >0 messages to be propagated towards a sink, or a response to be sent back to the message source.
Package processor contains implementations of types.Processor, which perform an arbitrary operation on a message and either returns >0 messages to be propagated towards a sink, or a response to be sent back to the message source.
processor/condition
Package condition contains logical operators that, based on their configuration, return boolean values from messages under certain circumstances.
Package condition contains logical operators that, based on their configuration, return boolean values from messages under certain circumstances.
response
Package response contains implementations of types.Response.
Package response contains implementations of types.Response.
stream
Package stream creates and manages a full Benthos stream pipeline, consisting of an input layer of consumers, an optional buffer layer, a processing pipelines layer, and an output layer of producers:
Package stream creates and manages a full Benthos stream pipeline, consisting of an input layer of consumers, an optional buffer layer, a processing pipelines layer, and an output layer of producers:
stream/manager
Package manager creates and manages multiple streams, providing an API for performing CRUD operations.
Package manager creates and manages multiple streams, providing an API for performing CRUD operations.
test/integration
Package integration implements integration tests using docker.
Package integration implements integration tests using docker.
types
Package types defines any general structs and interfaces used throughout the benthos code base.
Package types defines any general structs and interfaces used throughout the benthos code base.
util/config
Package config contains utilities for reading and parsing service configuration files.
Package config contains utilities for reading and parsing service configuration files.
util/disk
Package disk contains cross platform disk statistics utilities.
Package disk contains cross platform disk statistics utilities.
util/http/auth
Package auth provides configuration fields and implementations of HTTP request authentication strategies.
Package auth provides configuration fields and implementations of HTTP request authentication strategies.
util/service
Package service contains utilities for bootstrapping a server such as config, logging and metrics.
Package service contains utilities for bootstrapping a server such as config, logging and metrics.
util/text
Package text includes utilities for working with text that might contain variables.
Package text includes utilities for working with text that might contain variables.
util/throttle
Package throttle implements throttle strategies.
Package throttle implements throttle strategies.
util/tls
Package tls provides Benthos configuration fields and wrappers for a crypto/tls config.
Package tls provides Benthos configuration fields and wrappers for a crypto/tls config.
public
bundle/free Module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL