benthos

module
v3.65.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 10, 2022 License: MIT

README

Benthos

godoc for Jeffail/benthos goreportcard for Jeffail/benthos Build Status Discord invite Docs site

Benthos is a high performance and resilient stream processor, able to connect various sources and sinks in a range of brokering patterns and perform hydration, enrichments, transformations and filters on payloads.

It comes with a powerful mapping language, is easy to deploy and monitor, and ready to drop into your pipeline either as a static binary, docker image, or serverless function, making it cloud native as heck.

Benthos is fully declarative, with stream pipelines defined in a single config file, allowing you to specify connectors and a list of processing stages:

input:
  gcp_pubsub:
    project: foo
    subscription: bar

pipeline:
  processors:
    - bloblang: |
        root.message = this
        root.meta.link_count = this.links.length()
        root.user.age = this.user.age.number()

output:
  redis_streams:
    url: tcp://TODO:6379
    stream: baz
    max_in_flight: 20

Delivery Guarantees

Yep, we got 'em. Benthos implements transaction based resiliency with back pressure. When connecting to at-least-once sources and sinks it guarantees at-least-once delivery without needing to persist messages during transit.

Supported Sources & Sinks

Apache Pulsar, AWS (DynamoDB, Kinesis, S3, SQS, SNS), Azure (Blob storage, Queue storage, Table storage), Cassandra, Elasticsearch, File, GCP (Pub/Sub, Cloud storage), HDFS, HTTP (server and client, including websockets), Kafka, Memcached, MQTT, Nanomsg, NATS, NATS JetStream, NATS Streaming, NSQ, AMQP 0.91 (RabbitMQ), AMQP 1, Redis (streams, list, pubsub, hashes), MongoDB, SQL (MySQL, PostgreSQL, Clickhouse, MSSQL), Stdin/Stdout, TCP & UDP, sockets and ZMQ4.

Connectors are being added constantly, if something you want is missing then open an issue.

Documentation

If you want to dive fully into Benthos then don't waste your time in this dump, check out the documentation site.

For guidance on how to configure more advanced stream processing concepts such as stream joins, enrichment workflows, etc, check out the cookbooks section.

For guidance on building your own custom plugins in Go check out the public APIs.

Install

Grab a binary for your OS from here. Or use this script:

curl -Lsf https://sh.benthos.dev | bash

Or pull the docker image:

docker pull jeffail/benthos

Benthos can also be installed via Homebrew:

brew install benthos

For more information check out the getting started guide.

Run

benthos -c ./config.yaml

Or, with docker:

# Using a config file
docker run --rm -v /path/to/your/config.yaml:/benthos.yaml jeffail/benthos

# Using a series of -s flags
docker run --rm -p 4195:4195 jeffail/benthos \
  -s "input.type=http_server" \
  -s "output.type=kafka" \
  -s "output.kafka.addresses=kafka-server:9092" \
  -s "output.kafka.topic=benthos_topic"

Monitoring

Health Checks

Benthos serves two HTTP endpoints for health checks:

  • /ping can be used as a liveness probe as it always returns a 200.
  • /ready can be used as a readiness probe as it serves a 200 only when both the input and output are connected, otherwise a 503 is returned.

Metrics

Benthos exposes lots of metrics either to Statsd, Prometheus or for debugging purposes an HTTP endpoint that returns a JSON formatted object.

Tracing

Benthos also emits tracing events to a tracer of your choice (currently only Jaeger is supported) which can be used to visualise the processors within a pipeline.

Configuration

Benthos provides lots of tools for making configuration discovery, debugging and organisation easy. You can read about them here.

Build

Build with Go (1.16 or later):

git clone git@github.com:Jeffail/benthos
cd benthos
make

Lint

Benthos uses golangci-lint for linting, which you can install with:

curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin

And then run it with make lint.

Plugins

It's pretty easy to write your own custom plugins for Benthos in Go, for information check out the API docs, and for inspiration there's an example repo demonstrating a variety of plugin implementations.

Docker Builds

There's a multi-stage Dockerfile for creating a Benthos docker image which results in a minimal image from scratch. You can build it with:

make docker

Then use the image:

docker run --rm \
	-v /path/to/your/benthos.yaml:/config.yaml \
	-v /tmp/data:/data \
	-p 4195:4195 \
	benthos -c /config.yaml

ZMQ4 Support

Benthos supports ZMQ4 for both data input and output. To add this you need to install libzmq4 and use the compile time flag when building Benthos:

make TAGS=ZMQ4

Or to build a docker image using CGO, which includes ZMQ:

make docker-cgo

Contributing

Contributions are welcome, please read the guidelines, come and chat (links are on the community page), and watch your back.

Directories

Path Synopsis
cmd
internal
batch
Package batch contains internal utilities for interacting with message batches.
Package batch contains internal utilities for interacting with message batches.
bloblang/field
Package field implements a bloblang interpolation function templating syntax used in some dynamic fields within Benthos.
Package field implements a bloblang interpolation function templating syntax used in some dynamic fields within Benthos.
bloblang/mapping
Package mapping provides a parser for the full bloblang mapping spec.
Package mapping provides a parser for the full bloblang mapping spec.
bloblang/query
Package query provides a parser for the right-hand side query part of the bloblang spec.
Package query provides a parser for the right-hand side query part of the bloblang spec.
bundle
Package bundle contains singletons referenced throughout the Benthos codebase that allow imported components to add their constructors and documentation to a service.
Package bundle contains singletons referenced throughout the Benthos codebase that allow imported components to add their constructors and documentation to a service.
checkpoint
Package checkpoint implements a mechanism for tracking checkpointed integer offsets for sequential read at-least-once queue systems such as Kafka or Kinesis.
Package checkpoint implements a mechanism for tracking checkpointed integer offsets for sequential read at-least-once queue systems such as Kafka or Kinesis.
docs
Package docs provides useful functions for creating documentation from Benthos components
Package docs provides useful functions for creating documentation from Benthos components
impl/generic
Package generic contains component implementations that do not require external dependencies.
Package generic contains component implementations that do not require external dependencies.
interop
Package interop provides utilities for initializing Benthos components that default to the old APIs, but when the provided manager supports it the new APIs are used.
Package interop provides utilities for initializing Benthos components that default to the old APIs, but when the provided manager supports it the new APIs are used.
interop/plugins
Package plugins provides a way to have the new components for plugins-v2 able to access and "pull" the old style plugins without introducing cyclic dependencies.
Package plugins provides a way to have the new components for plugins-v2 able to access and "pull" the old style plugins without introducing cyclic dependencies.
mqttconf
Package mqtt contains supporting utils shared between MQTT reader and writer.
Package mqtt contains supporting utils shared between MQTT reader and writer.
tracing
Package tracing implements utility functions for interacting with a global tracing system.
Package tracing implements utility functions for interacting with a global tracing system.
xml
Package xml is a temporary way to convert XML to JSON.
Package xml is a temporary way to convert XML to JSON.
lib
api
Package api implements a type used for creating the Benthos HTTP API.
Package api implements a type used for creating the Benthos HTTP API.
bloblang
Package bloblang is DEPRECATED.
Package bloblang is DEPRECATED.
broker
Package broker implements types used for routing inputs to outputs in non-trivial arrangements, such as fan-out or fan-in models.
Package broker implements types used for routing inputs to outputs in non-trivial arrangements, such as fan-out or fan-in models.
buffer
Package buffer is both a types.Consumer and types.Producer implementation that is able to sit between other stream components, effectively decoupling their transaction channels by storing messages in a buffer implementation.
Package buffer is both a types.Consumer and types.Producer implementation that is able to sit between other stream components, effectively decoupling their transaction channels by storing messages in a buffer implementation.
buffer/parallel
Package parallel contains implementations of various buffer types where the buffer can be consumed by any number of parallel consumer threads.
Package parallel contains implementations of various buffer types where the buffer can be consumed by any number of parallel consumer threads.
buffer/single
Package single contains implementations of various buffer types where the buffer can only be consumed by a single thread (but any number of writers).
Package single contains implementations of various buffer types where the buffer can only be consumed by a single thread (but any number of writers).
cache
Package cache implements the types.Cache interface for storing and retrieving key/value pairs from a range of storage strategies.
Package cache implements the types.Cache interface for storing and retrieving key/value pairs from a range of storage strategies.
condition
Package condition contains logical operators that, based on their configuration, return boolean values from messages under certain circumstances.
Package condition contains logical operators that, based on their configuration, return boolean values from messages under certain circumstances.
config
Package config defines the full configuration structure used by the Benthos service, including methods of normalising and linting user configuration.
Package config defines the full configuration structure used by the Benthos service, including methods of normalising and linting user configuration.
input
Package input defines consumers for aggregating data from a variety of sources.
Package input defines consumers for aggregating data from a variety of sources.
input/reader
Package reader defines implementations of an interface for generic message reading from various third party sources.
Package reader defines implementations of an interface for generic message reading from various third party sources.
log
Package log contains utilities for logging in a modular interface.
Package log contains utilities for logging in a modular interface.
manager
Package manager implements the types.Manager interface used for creating and sharing resources across a Benthos service.
Package manager implements the types.Manager interface used for creating and sharing resources across a Benthos service.
message
Package message contains implementations of types.Message.
Package message contains implementations of types.Message.
message/batch
Package batch provides tooling for creating and executing Benthos message batch policies.
Package batch provides tooling for creating and executing Benthos message batch policies.
message/io
Package io provides tooling for serialising and deserialising messages.
Package io provides tooling for serialising and deserialising messages.
message/mapper
Package mapper implements ways of splicing and mapping batches of messages so that their derivative parts can be processed individually and merged back into the original batch.
Package mapper implements ways of splicing and mapping batches of messages so that their derivative parts can be processed individually and merged back into the original batch.
message/metadata
Package metadata contains implementations of types.Metadata.
Package metadata contains implementations of types.Metadata.
message/roundtrip
Package roundtrip provides tooling for composing request/response behaviour through messages.
Package roundtrip provides tooling for composing request/response behaviour through messages.
message/tracing
Package tracing implements utility functions for recording opentracing events for messages.
Package tracing implements utility functions for recording opentracing events for messages.
metrics
Package metrics contains a type for aggregating and propagating metrics to various services based on configuration.
Package metrics contains a type for aggregating and propagating metrics to various services based on configuration.
output
Package output defines all sinks for sending Benthos messages to a variety of third party destinations.
Package output defines all sinks for sending Benthos messages to a variety of third party destinations.
output/writer
Package writer defines implementations of an interface for generic message writing that outputs to various third party sinks.
Package writer defines implementations of an interface for generic message writing that outputs to various third party sinks.
pipeline
Package pipeline contains structures that implement both the Producer and Consumer interfaces.
Package pipeline contains structures that implement both the Producer and Consumer interfaces.
processor
Package processor contains implementations of types.Processor, which perform an arbitrary operation on a message and either returns >0 messages to be propagated towards a sink, or a response to be sent back to the message source.
Package processor contains implementations of types.Processor, which perform an arbitrary operation on a message and either returns >0 messages to be propagated towards a sink, or a response to be sent back to the message source.
ratelimit
Package ratelimit implements the types.RateLimit interface for limiting access to resources shared service wide.
Package ratelimit implements the types.RateLimit interface for limiting access to resources shared service wide.
response
Package response contains implementations of types.Response.
Package response contains implementations of types.Response.
serverless
Package serverless contains shared components for serverless distributions of Benthos.
Package serverless contains shared components for serverless distributions of Benthos.
serverless/lambda
Package lambda contains the execution logic for running Benthos as an AWS lambda function.
Package lambda contains the execution logic for running Benthos as an AWS lambda function.
service
Package service contains the main execution logic of the Benthos service, which is responsible for parsing commandline flags, running a stream and capturing termination signals.
Package service contains the main execution logic of the Benthos service, which is responsible for parsing commandline flags, running a stream and capturing termination signals.
service/test
Package test implements the Benthos service unit testing command.
Package test implements the Benthos service unit testing command.
stream
Package stream creates and manages a full Benthos stream pipeline, consisting of an input layer of consumers, an optional buffer layer, a processing pipelines layer, and an output layer of producers:
Package stream creates and manages a full Benthos stream pipeline, consisting of an input layer of consumers, an optional buffer layer, a processing pipelines layer, and an output layer of producers:
stream/manager
Package manager creates and manages multiple streams, providing an API for performing CRUD operations.
Package manager creates and manages multiple streams, providing an API for performing CRUD operations.
test/integration
Package integration implements integration tests using docker.
Package integration implements integration tests using docker.
tracer
Package tracer contains components able to send opentracing events.
Package tracer contains components able to send opentracing events.
types
Package types defines any general structs and interfaces used throughout the benthos code base.
Package types defines any general structs and interfaces used throughout the benthos code base.
util/checkpoint
Package checkpoint implements a mechanism for tracking checkpointed integer offsets for sequential read at-least-once queue systems such as Kafka or Kinesis.
Package checkpoint implements a mechanism for tracking checkpointed integer offsets for sequential read at-least-once queue systems such as Kafka or Kinesis.
util/config
Package config contains utilities for reading and parsing service configuration files.
Package config contains utilities for reading and parsing service configuration files.
util/disk
Package disk contains cross platform disk statistics utilities.
Package disk contains cross platform disk statistics utilities.
util/http/auth
Package auth provides configuration fields and implementations of HTTP request authentication strategies.
Package auth provides configuration fields and implementations of HTTP request authentication strategies.
util/retries
Package retries implements backoff strategies around a standard configuration scheme.
Package retries implements backoff strategies around a standard configuration scheme.
util/text
Package text includes utilities for working with text that might contain variables.
Package text includes utilities for working with text that might contain variables.
util/throttle
Package throttle implements throttle strategies.
Package throttle implements throttle strategies.
util/tls
Package tls provides Benthos configuration fields and wrappers for a crypto/tls config.
Package tls provides Benthos configuration fields and wrappers for a crypto/tls config.
public
bloblang
Package bloblang provides high level APIs for registering custom Bloblang plugins, as well as for parsing and executing Bloblang mappings.
Package bloblang provides high level APIs for registering custom Bloblang plugins, as well as for parsing and executing Bloblang mappings.
components/all
Package all imports all component implementations that ship with the open source Benthos repo.
Package all imports all component implementations that ship with the open source Benthos repo.
components/legacy
Package legacy imports old legacy component definitions (and plugins), and also walks them during init in order to register their docs and constructors using the new APIs.
Package legacy imports old legacy component definitions (and plugins), and also walks them during init in order to register their docs and constructors using the new APIs.
service
Package service provides a high level API for registering custom plugin components and executing either a standard Benthos CLI, or programmatically building isolated pipelines with a StreamBuilder API.
Package service provides a high level API for registering custom plugin components and executing either a standard Benthos CLI, or programmatically building isolated pipelines with a StreamBuilder API.
resources
website

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL