service

package
v4.14.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2023 License: MIT Imports: 59 Imported by: 0

Documentation

Overview

Package service provides a high level API for registering custom plugin components and executing either a standard Benthos CLI, or programmatically building isolated pipelines with a StreamBuilder API.

For a video guide on Benthos plugins check out: https://youtu.be/uH6mKw-Ly0g And an example repo containing component plugins and tests can be found at: https://github.com/benthosdev/benthos-plugin-example

In order to add custom Bloblang functions and methods use the ./public/bloblang package.

Example (BufferPlugin)

This example demonstrates how to create a buffer plugin. Buffers are an advanced component type that most plugin authors aren't likely to require.

package main

import (
	"context"
	"sync"

	"github.com/dafanshu/benthos/v4/public/service"

	// Import only pure Benthos components, switch with `components/all` for all
	// standard components.
	_ "github.com/dafanshu/benthos/v4/public/components/pure"
)

// memoryBuffer is a buffer implementation that keeps message batches in a
// buffered channel and uses a second channel to signal that the input has
// ended.
type memoryBuffer struct {
	// messages carries batches from WriteBatch to ReadBatch.
	messages       chan service.MessageBatch
	// endOfInputChan is closed by EndOfInput to tell ReadBatch that no more
	// batches will be written.
	endOfInputChan chan struct{}
	// closeOnce guards the close of endOfInputChan so that repeated calls to
	// EndOfInput do not close the channel twice.
	closeOnce      sync.Once
}

// newMemoryBuffer returns a memoryBuffer whose message channel can hold up to
// n batches before writes start blocking.
func newMemoryBuffer(n int) *memoryBuffer {
	buf := new(memoryBuffer)
	buf.messages = make(chan service.MessageBatch, n)
	buf.endOfInputChan = make(chan struct{})
	return buf
}

// WriteBatch stores batch in the buffer, blocking while the buffer is full. If
// ctx is cancelled before the batch can be stored the context error is
// returned and aFn is not called.
func (m *memoryBuffer) WriteBatch(ctx context.Context, batch service.MessageBatch, aFn service.AckFunc) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case m.messages <- batch:
		// The batch is acknowledged as soon as it is buffered, which weakens
		// delivery guarantees because nothing downstream has seen it yet.
		return aFn(ctx, nil)
	}
}

// yoloIgnoreNacks is an acknowledgement function that discards whatever it is
// given, including nacks (non-nil errors), and always reports success. Batches
// that are nacked are therefore dropped.
func yoloIgnoreNacks(_ context.Context, _ error) error { return nil }

// ReadBatch blocks until a batch is available, ctx is cancelled, or the end of
// input has been signalled. Once the input has ended and no buffered batch
// remains it returns service.ErrEndOfBuffer. Returned batches are paired with
// yoloIgnoreNacks, so nacks from downstream are ignored.
func (m *memoryBuffer) ReadBatch(ctx context.Context) (service.MessageBatch, service.AckFunc, error) {
	select {
	case <-ctx.Done():
		return nil, nil, ctx.Err()
	case batch := <-m.messages:
		return batch, yoloIgnoreNacks, nil
	case <-m.endOfInputChan:
		// Fall through to the non-blocking drain below.
	}

	// The input has ended: hand out anything still buffered, and only report
	// the end of the buffer once the channel is empty.
	select {
	case batch := <-m.messages:
		return batch, yoloIgnoreNacks, nil
	default:
	}
	return nil, nil, service.ErrEndOfBuffer
}

// EndOfInput signals that no further batches will be written by closing
// endOfInputChan. It is safe to call more than once; only the first call
// closes the channel.
func (m *memoryBuffer) EndOfInput() {
	signalEnd := func() { close(m.endOfInputChan) }
	m.closeOnce.Do(signalEnd)
}

// Close is a no-op for this buffer: it releases nothing and always returns
// nil.
func (m *memoryBuffer) Close(ctx context.Context) error {
	// Nothing to clean up
	return nil
}

// This example demonstrates how to create a buffer plugin. Buffers are an
// advanced component type that most plugin authors aren't likely to require.
//
// main registers the plugin under the name "lame_memory" with a single integer
// field, "max_batches", that sets the capacity of the buffer.
func main() {
	spec := service.NewConfigSpec().
		Summary("Creates a lame memory buffer that loses data on forced restarts or service crashes.").
		Field(service.NewIntField("max_batches").Default(100))

	// The constructor is called with the parsed config and builds one buffer.
	ctor := func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchBuffer, error) {
		maxBatches, err := conf.FieldInt("max_batches")
		if err != nil {
			return nil, err
		}
		return newMemoryBuffer(maxBatches), nil
	}

	if err := service.RegisterBatchBuffer("lame_memory", spec, ctor); err != nil {
		panic(err)
	}

	// And then execute Benthos with:
	// service.RunCLI(context.Background())
}
Output:

Example (CachePlugin)

This example demonstrates how to create a cache plugin, where the implementation of the cache (type `LossyCache`) also contains fields that should be parsed within the Benthos config.

package main

import (
	"context"
	"time"

	"github.com/dafanshu/benthos/v4/public/service"

	// Import only pure Benthos components, switch with `components/all` for all
	// standard components.
	_ "github.com/dafanshu/benthos/v4/public/components/pure"
)

// LossyCache is a terrible cache example and silently drops items when the
// capacity is reached. It also doesn't respect TTLs.
type LossyCache struct {
	// capacity is the number of stored items at which Set and Add start
	// dropping writes.
	capacity int
	// mDropped is incremented by one for every write that is dropped.
	mDropped *service.MetricCounter
	// items holds the cached values by key.
	items    map[string][]byte
}

// Get returns the value stored under key, or service.ErrKeyNotFound when the
// key is not present in the cache.
func (l *LossyCache) Get(ctx context.Context, key string) ([]byte, error) {
	value, found := l.items[key]
	if !found {
		return nil, service.ErrKeyNotFound
	}
	return value, nil
}

// Set stores value under key while the cache holds fewer items than its
// capacity. Once the capacity is reached the write is dropped, the dropped
// counter is incremented, and nil is still returned. The ttl is ignored.
func (l *LossyCache) Set(ctx context.Context, key string, value []byte, ttl *time.Duration) error {
	if len(l.items) < l.capacity {
		l.items[key] = value
		return nil
	}

	// Dropped, whoopsie!
	l.mDropped.Incr(1)
	return nil
}

// Add stores value under key only when the key is not already present,
// returning service.ErrKeyAlreadyExists otherwise. As with Set, a write that
// arrives once the capacity is reached is dropped and counted, and nil is
// returned. The ttl is ignored.
func (l *LossyCache) Add(ctx context.Context, key string, value []byte, ttl *time.Duration) error {
	_, present := l.items[key]
	switch {
	case present:
		return service.ErrKeyAlreadyExists
	case len(l.items) >= l.capacity:
		// Dropped, whoopsie!
		l.mDropped.Incr(1)
	default:
		l.items[key] = value
	}
	return nil
}

// Delete removes key from the cache. Deleting a key that is not present is not
// treated as an error; nil is always returned.
func (l *LossyCache) Delete(ctx context.Context, key string) error {
	delete(l.items, key)
	return nil
}

// Close is a no-op for this cache and always returns nil.
func (l *LossyCache) Close(ctx context.Context) error {
	return nil
}

// This example demonstrates how to create a cache plugin, where the
// implementation of the cache (type `LossyCache`) also contains fields that
// should be parsed within the Benthos config.
//
// main registers the plugin under the name "lossy" with a single integer
// field, "capacity".
func main() {
	spec := service.NewConfigSpec().
		Summary("Creates a terrible cache with a fixed capacity.").
		Field(service.NewIntField("capacity").Default(100))

	ctor := func(conf *service.ParsedConfig, mgr *service.Resources) (service.Cache, error) {
		capacity, err := conf.FieldInt("capacity")
		if err != nil {
			return nil, err
		}

		// The drop counter is obtained from the metrics of the resources
		// handed to the constructor.
		cache := &LossyCache{
			capacity: capacity,
			mDropped: mgr.Metrics().NewCounter("dropped_just_cus"),
			items:    make(map[string][]byte, capacity),
		}
		return cache, nil
	}

	if err := service.RegisterCache("lossy", spec, ctor); err != nil {
		panic(err)
	}

	// And then execute Benthos with:
	// service.RunCLI(context.Background())
}
Output:

Example (InputPlugin)

This example demonstrates how to create an input plugin, which is configured by providing a struct containing the fields to be parsed from within the Benthos configuration.

package main

import (
	"context"
	"math/rand"

	"github.com/dafanshu/benthos/v4/public/service"

	// Import only pure Benthos components, switch with `components/all` for all
	// standard components.
	_ "github.com/dafanshu/benthos/v4/public/components/pure"
)

// GibberishInput is an input that produces messages made of random bytes.
type GibberishInput struct {
	// length is the number of bytes in every generated message.
	length int
}

// Connect is a no-op for this input and always returns nil.
func (g *GibberishInput) Connect(ctx context.Context) error {
	return nil
}

// Read produces a new message of g.length random bytes, each in the range
// 32 to 125 inclusive, together with an acknowledgement function that does
// nothing.
func (g *GibberishInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) {
	payload := make([]byte, g.length)
	for i := 0; i < len(payload); i++ {
		payload[i] = byte(32 + rand.Int()%94)
	}

	// A nack (when err is non-nil) is handled automatically when the input is
	// constructed using service.AutoRetryNacks, so there is nothing to do
	// here for either an ack or a nack.
	ack := func(ctx context.Context, err error) error {
		return nil
	}
	return service.NewMessage(payload), ack, nil
}

// Close is a no-op for this input and always returns nil.
func (g *GibberishInput) Close(ctx context.Context) error {
	return nil
}

// This example demonstrates how to create an input plugin, which is configured
// by providing a struct containing the fields to be parsed from within the
// Benthos configuration.
//
// main registers the plugin under the name "gibberish" with a single integer
// field, "length".
func main() {
	spec := service.NewConfigSpec().
		Summary("Creates a load of gibberish, putting us all out of work.").
		Field(service.NewIntField("length").Default(100))

	err := service.RegisterInput("gibberish", spec,
		func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) {
			n, err := conf.FieldInt("length")
			if err != nil {
				return nil, err
			}
			// Wrapping with AutoRetryNacks means the input itself does not
			// need to deal with nacks.
			in := &GibberishInput{length: n}
			return service.AutoRetryNacks(in), nil
		})
	if err != nil {
		panic(err)
	}

	// And then execute Benthos with:
	// service.RunCLI(context.Background())
}
Output:

Example (OutputBatchedPlugin)

This example demonstrates how to create a batched output plugin, which allows us to specify a batching mechanism and implement an interface that writes a batch of messages in one call.

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/dafanshu/benthos/v4/public/service"

	// Import only pure Benthos components, switch with `components/all` for all
	// standard components.
	_ "github.com/dafanshu/benthos/v4/public/components/pure"
)

// batchOfJSONWriter is a stateless batch output whose WriteBatch method prints
// each batch to stdout as a single JSON object.
type batchOfJSONWriter struct{}

// Connect is a no-op for this output and always returns nil.
func (b *batchOfJSONWriter) Connect(ctx context.Context) error {
	return nil
}

// WriteBatch prints the whole batch to stdout as one JSON object with two
// keys: "count", the number of messages in the batch, and "objects", the
// structured form of each message in batch order. It returns the first error
// hit while reading a message as structured data or while encoding the
// object, in which case nothing is printed.
func (b *batchOfJSONWriter) WriteBatch(ctx context.Context, msgs service.MessageBatch) error {
	// Deliberately left nil until the first append so that an empty batch
	// encodes exactly as before.
	var objs []any
	for i := range msgs {
		obj, err := msgs[i].AsStructured()
		if err != nil {
			return err
		}
		objs = append(objs, obj)
	}

	payload := map[string]any{
		"count":   len(msgs),
		"objects": objs,
	}
	encoded, err := json.Marshal(payload)
	if err != nil {
		return err
	}

	fmt.Println(string(encoded))
	return nil
}

// Close is a no-op for this output and always returns nil.
func (b *batchOfJSONWriter) Close(ctx context.Context) error {
	return nil
}

// This example demonstrates how to create a batched output plugin, which allows
// us to specify a batching mechanism and implement an interface that writes a
// batch of messages in one call.
//
// main registers the output as "batched_json_stdout" and then runs a stream
// that generates five messages and writes them through it as one batch.
func main() {
	// The config spec exposes a single batch policy field named "batching".
	spec := service.NewConfigSpec().
		Field(service.NewBatchPolicyField("batching"))

	// The constructor hands back the output, the batch policy parsed from the
	// config, and a max in flight of 1.
	ctor := func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchOutput, service.BatchPolicy, int, error) {
		policy, err := conf.FieldBatchPolicy("batching")
		if err != nil {
			return nil, policy, 0, err
		}
		return &batchOfJSONWriter{}, policy, 1, nil
	}

	if err := service.RegisterBatchOutput("batched_json_stdout", spec, ctor); err != nil {
		panic(err)
	}

	// Use the stream builder API to create a Benthos stream that uses our new
	// output type.
	builder := service.NewStreamBuilder()

	// The full Benthos configuration of the stream.
	streamConf := `
input:
  generate:
    count: 5
    interval: 1ms
    mapping: |
      root.id = count("batched output example messages")
      root.text = "some stuff"

output:
  batched_json_stdout:
    batching:
      count: 5
`
	if err := builder.SetYAML(streamConf); err != nil {
		panic(err)
	}

	// Build a stream with our configured components.
	stream, err := builder.Build()
	if err != nil {
		panic(err)
	}

	// Run it, blocking until it gracefully terminates once the generate input
	// has produced its messages and they have flushed through the stream.
	if err := stream.Run(context.Background()); err != nil {
		panic(err)
	}
}
Output:

{"count":5,"objects":[{"id":1,"text":"some stuff"},{"id":2,"text":"some stuff"},{"id":3,"text":"some stuff"},{"id":4,"text":"some stuff"},{"id":5,"text":"some stuff"}]}
Example (OutputPlugin)

This example demonstrates how to create an output plugin. This example is for an implementation that does not require any configuration parameters, and therefore doesn't define any within the configuration specification.

package main

import (
	"context"
	"fmt"

	"github.com/dafanshu/benthos/v4/public/service"

	// Import only pure Benthos components, switch with `components/all` for all
	// standard components.
	_ "github.com/dafanshu/benthos/v4/public/components/pure"
)

// BlueOutput is a stateless output whose Write method prints each message to
// stdout between the escape sequences "\033[01;34m" and "\033[m".
type BlueOutput struct{}

// Connect is a no-op for this output and always returns nil.
func (b *BlueOutput) Connect(ctx context.Context) error {
	return nil
}

// Write prints the bytes of msg to stdout, wrapped in escape sequences and
// followed by a newline. If the message contents cannot be obtained the error
// is returned and nothing is printed.
func (b *BlueOutput) Write(ctx context.Context, msg *service.Message) error {
	content, err := msg.AsBytes()
	if err == nil {
		fmt.Printf("\033[01;34m%s\033[m\n", content)
	}
	return err
}

// Close is a no-op for this output and always returns nil.
func (b *BlueOutput) Close(ctx context.Context) error {
	return nil
}

// This example demonstrates how to create an output plugin. The implementation
// does not require any configuration parameters, and therefore doesn't define
// any within the configuration specification.
//
// main registers the output as "blue_stdout" and then runs a stream that
// generates a single message and writes it through the new output.
func main() {
	// The constructor returns the output together with a max in flight of 1.
	ctor := func(conf *service.ParsedConfig, mgr *service.Resources) (service.Output, int, error) {
		return &BlueOutput{}, 1, nil
	}

	// An empty config spec is enough because the output has no fields.
	if err := service.RegisterOutput("blue_stdout", service.NewConfigSpec(), ctor); err != nil {
		panic(err)
	}

	// Use the stream builder API to create a Benthos stream that uses our new
	// output type.
	builder := service.NewStreamBuilder()

	// The full Benthos configuration of the stream.
	streamConf := `
input:
  generate:
    count: 1
    interval: 1ms
    mapping: 'root = "hello world"'

output:
  blue_stdout: {}
`
	if err := builder.SetYAML(streamConf); err != nil {
		panic(err)
	}

	// Build a stream with our configured components.
	stream, err := builder.Build()
	if err != nil {
		panic(err)
	}

	// Run it, blocking until it gracefully terminates once the generate input
	// has generated a message and it has flushed through the stream.
	if err := stream.Run(context.Background()); err != nil {
		panic(err)
	}
}
Output:

\033[01;34mhello world\033[m
Example (ProcessorPlugin)

This example demonstrates how to create a processor plugin. This example is for an implementation that does not require any configuration parameters, and therefore doesn't define any within the configuration specification.

package main

import (
	"bytes"
	"context"

	"github.com/dafanshu/benthos/v4/public/service"

	// Import only required Benthos components, switch with `components/all` for
	// all standard components.
	_ "github.com/dafanshu/benthos/v4/public/components/io"
	_ "github.com/dafanshu/benthos/v4/public/components/pure"
)

// ReverseProcessor is a processor that reverses the bytes of each message.
type ReverseProcessor struct {
	// logger is used to report messages whose contents read the same in both
	// directions.
	logger *service.Logger
}

// Process replaces the contents of m with its bytes in reverse order and
// returns m as a batch of one message. When the reversed contents equal the
// original contents an info level log line is written. An error obtaining the
// message bytes is returned as is.
func (r *ReverseProcessor) Process(ctx context.Context, m *service.Message) (service.MessageBatch, error) {
	original, err := m.AsBytes()
	if err != nil {
		return nil, err
	}

	// Walk the input forwards while filling the copy from the back.
	reversed := make([]byte, len(original))
	for src, dst := 0, len(original)-1; dst >= 0; src, dst = src+1, dst-1 {
		reversed[dst] = original[src]
	}

	if bytes.Equal(reversed, original) {
		r.logger.Infof("Woah! This is like totally a palindrome: %s", original)
	}

	m.SetBytes(reversed)
	return service.MessageBatch{m}, nil
}

// Close is a no-op for this processor and always returns nil.
func (r *ReverseProcessor) Close(ctx context.Context) error {
	return nil
}

// This example demonstrates how to create a processor plugin. The
// implementation does not require any configuration parameters, and therefore
// doesn't define any within the configuration specification.
//
// main registers the processor as "reverse" and then runs a stream that
// generates a single message, reverses it and writes it to stdout.
func main() {
	// The processor takes its logger from the resources handed to the
	// constructor.
	ctor := func(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) {
		return &ReverseProcessor{logger: mgr.Logger()}, nil
	}

	// An empty config spec is enough because the processor has no fields.
	if err := service.RegisterProcessor("reverse", service.NewConfigSpec(), ctor); err != nil {
		panic(err)
	}

	// Build a Benthos stream that uses our new processor type.
	builder := service.NewStreamBuilder()

	// The full Benthos configuration of the stream.
	streamConf := `
input:
  generate:
    count: 1
    interval: 1ms
    mapping: 'root = "hello world"'

pipeline:
  processors:
    - reverse: {}

output:
  stdout: {}
`
	if err := builder.SetYAML(streamConf); err != nil {
		panic(err)
	}

	// Build a stream with our configured components.
	stream, err := builder.Build()
	if err != nil {
		panic(err)
	}

	// Run it, blocking until it gracefully terminates once the generate input
	// has generated a message and it has flushed through the stream.
	if err := stream.Run(context.Background()); err != nil {
		panic(err)
	}
}
Output:

dlrow olleh
Example (RateLimitPlugin)

This example demonstrates how to create a rate limit plugin, which is configured by providing a struct containing the fields to be parsed from within the Benthos configuration.

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/dafanshu/benthos/v4/public/service"

	// Import only pure Benthos components, switch with `components/all` for all
	// standard components.
	_ "github.com/dafanshu/benthos/v4/public/components/pure"
)

// RandomRateLimit is a rate limit whose Access method returns a random
// duration bounded by max.
type RandomRateLimit struct {
	// max is the exclusive upper bound of the durations returned by Access.
	max time.Duration
}

// Access returns a random duration in the range [0, max) and a nil error.
//
// A non-positive max (for example a configured "maximum_duration" of "0s")
// yields a duration of zero. Without this guard a max of zero would panic
// with an integer divide by zero.
func (r *RandomRateLimit) Access(context.Context) (time.Duration, error) {
	if r.max <= 0 {
		return 0, nil
	}
	// Int63n works on int64, so the bound is not truncated on platforms where
	// int is only 32 bits wide.
	return time.Duration(rand.Int63n(int64(r.max))), nil
}

// Close is a no-op for this rate limit and always returns nil.
func (r *RandomRateLimit) Close(ctx context.Context) error {
	return nil
}

// This example demonstrates how to create a rate limit plugin, which is
// configured by providing a struct containing the fields to be parsed from
// within the Benthos configuration.
//
// main registers the plugin under the name "random" with a single string
// field, "maximum_duration", which is parsed as a Go duration.
func main() {
	spec := service.NewConfigSpec().
		Summary("A rate limit that's pretty much just random.").
		Description("I guess this isn't really that useful, sorry.").
		Field(service.NewStringField("maximum_duration").Default("1s"))

	ctor := func(conf *service.ParsedConfig, mgr *service.Resources) (service.RateLimit, error) {
		raw, err := conf.FieldString("maximum_duration")
		if err != nil {
			return nil, err
		}

		// The field is a plain string, so it is parsed into a duration here.
		limit, err := time.ParseDuration(raw)
		if err != nil {
			return nil, fmt.Errorf("invalid max duration: %w", err)
		}
		return &RandomRateLimit{max: limit}, nil
	}

	if err := service.RegisterRateLimit("random", spec, ctor); err != nil {
		panic(err)
	}

	// And then execute Benthos with:
	// service.RunCLI(context.Background())
}
Output:

Example (StreamBuilderConfig)

This example demonstrates how to use a stream builder to parse and execute a full Benthos config.

package main

import (
	"context"

	"github.com/dafanshu/benthos/v4/public/service"

	// Import only required Benthos components, switch with `components/all` for
	// all standard components.
	_ "github.com/dafanshu/benthos/v4/public/components/io"
	_ "github.com/dafanshu/benthos/v4/public/components/pure"
)

// main parses a full Benthos config with a stream builder, builds the stream
// and runs it to completion. Any error along the way causes a panic.
func main() {
	must := func(err error) {
		if err != nil {
			panic(err)
		}
	}

	builder := service.NewStreamBuilder()

	// The full Benthos configuration of the stream.
	must(builder.SetYAML(`
input:
  generate:
    count: 1
    interval: 1ms
    mapping: 'root = "hello world"'

  processors:
    - mapping: 'root = content().uppercase()'

output:
  stdout: {}

logger:
  level: none
`))

	// Build a stream with our configured components.
	stream, err := builder.Build()
	must(err)

	// Run it, blocking until it gracefully terminates once the generate input
	// has generated a message and it has flushed through the stream.
	must(stream.Run(context.Background()))
}
Output:

HELLO WORLD
Example (StreamBuilderConfigAddMethods)

This example demonstrates how to use a stream builder to assemble a stream of Benthos components by adding snippets of configs for different component types, and then execute it. You can use the Add methods to append any number of components to the stream, following fan in and fan out patterns for inputs and outputs respectively.

package main

import (
	"context"

	"github.com/dafanshu/benthos/v4/public/service"

	// Import only required Benthos components, switch with `components/all` for
	// all standard components.
	_ "github.com/dafanshu/benthos/v4/public/components/io"
	_ "github.com/dafanshu/benthos/v4/public/components/pure"
)

// main assembles a stream from separate config snippets (one input, one
// processor, one output and the logger settings), builds it and runs it to
// completion. Any error along the way causes a panic.
func main() {
	must := func(err error) {
		if err != nil {
			panic(err)
		}
	}

	builder := service.NewStreamBuilder()

	must(builder.AddInputYAML(`
generate:
  count: 1
  interval: 1ms
  mapping: 'root = "hello world"'
`))
	must(builder.AddProcessorYAML(`mapping: 'root = content().uppercase()'`))
	must(builder.AddOutputYAML(`stdout: {}`))
	must(builder.SetLoggerYAML(`level: off`))

	// Build a stream with our configured components.
	stream, err := builder.Build()
	must(err)

	// Run it, blocking until it gracefully terminates once the generate input
	// has generated a message and it has flushed through the stream.
	must(stream.Run(context.Background()))
}
Output:

HELLO WORLD
Example (StreamBuilderMultipleStreams)

This example demonstrates using the stream builder API to create and run two independent streams.

package main

import (
	"context"
	"sync"

	"github.com/dafanshu/benthos/v4/public/service"

	// Import only required Benthos components, switch with `components/all` for
	// all standard components.
	_ "github.com/dafanshu/benthos/v4/public/components/io"
	_ "github.com/dafanshu/benthos/v4/public/components/pure"
)

// main builds two independent streams with the stream builder API, runs them
// concurrently and waits for both to finish. Any error along the way causes a
// panic.
func main() {
	must := func(err error) {
		if err != nil {
			panic(err)
		}
	}

	// Each pipeline is configured with its HTTP server disabled, as otherwise
	// there would be a port collision when both attempt to bind to the default
	// address `0.0.0.0:4195`.
	//
	// Alternatively, each could be given its own address with the field
	// `http.address`, or `SetHTTPMux` could be called on the builder in order
	// to explicitly override the configured server.
	first := service.NewStreamBuilder()
	must(first.SetYAML(`
http:
  enabled: false

input:
  generate:
    count: 1
    interval: 1ms
    mapping: 'root = "hello world one"'

  processors:
    - mapping: 'root = content().uppercase()'

output:
  stdout: {}
`))

	streamOne, err := first.Build()
	must(err)

	// The second stream sleeps before mapping its message.
	second := service.NewStreamBuilder()
	must(second.SetYAML(`
http:
  enabled: false

input:
  generate:
    count: 1
    interval: 1ms
    mapping: 'root = "hello world two"'

  processors:
    - sleep:
        duration: 500ms
    - mapping: 'root = content().capitalize()'

output:
  stdout: {}
`))

	streamTwo, err := second.Build()
	must(err)

	var wg sync.WaitGroup
	wg.Add(2)

	// runStream runs one stream to completion and marks it as done.
	runStream := func(run func(context.Context) error) {
		defer wg.Done()
		must(run(context.Background()))
	}
	go runStream(streamOne.Run)
	go runStream(streamTwo.Run)

	wg.Wait()
}
Output:

HELLO WORLD ONE
Hello World Two
Example (StreamBuilderPush)

This example demonstrates how to use a stream builder to assemble a processing pipeline that you can push messages into and extract via closures.

package main

import (
	"bytes"
	"context"
	"fmt"
	"time"

	"github.com/dafanshu/benthos/v4/public/service"

	// Import only required Benthos components, switch with `components/all` for
	// all standard components.
	_ "github.com/dafanshu/benthos/v4/public/components/io"
	_ "github.com/dafanshu/benthos/v4/public/components/pure"
)

// main assembles a processing pipeline that messages are pushed into through
// a producer closure and collected from through a consumer closure, then
// prints everything that was collected. Any error along the way causes a
// panic.
func main() {
	must := func(err error) {
		if err != nil {
			panic(err)
		}
	}

	builder := service.NewStreamBuilder()
	must(builder.SetLoggerYAML(`level: off`))

	// Two processors: one uppercases the content, the other adds a prefix.
	must(builder.AddProcessorYAML(`mapping: 'root = content().uppercase()'`))
	must(builder.AddProcessorYAML(`mapping: 'root = "check this out: " + content()'`))

	// Obtain a closure func that allows us to push data into the stream. It is
	// treated like any other input, which also means it's possible to use it
	// along with regular configured inputs.
	sendFn, err := builder.AddProducerFunc()
	must(err)

	// Define a closure func that receives messages as an output of the stream.
	// It's also possible to use this along with regular configured outputs.
	var outputBuf bytes.Buffer
	consume := func(c context.Context, m *service.Message) error {
		msgBytes, err := m.AsBytes()
		if err != nil {
			return err
		}

		_, err = fmt.Fprintf(&outputBuf, "received: %s\n", msgBytes)
		return err
	}
	must(builder.AddConsumerFunc(consume))

	stream, err := builder.Build()
	must(err)

	// Push two messages from a separate goroutine and then ask the stream to
	// stop within a second.
	go func() {
		payloads := []string{
			"hello world",
			"I'm pushing data into the stream",
		}
		for _, payload := range payloads {
			must(sendFn(context.Background(), service.NewMessage([]byte(payload))))
		}
		must(stream.StopWithin(time.Second))
	}()

	// Run the stream, blocking until it terminates after the goroutine above
	// has asked it to stop.
	must(stream.Run(context.Background()))

	fmt.Println(outputBuf.String())
}
Output:

received: check this out: HELLO WORLD
received: check this out: I'M PUSHING DATA INTO THE STREAM

Index

Examples

Constants

This section is empty.

Variables

View Source
// Errors returned by cache types.
var (
	// ErrKeyAlreadyExists reports that a key is already present in a cache.
	ErrKeyAlreadyExists = errors.New("key already exists")
	// ErrKeyNotFound reports that a key is not present in a cache.
	ErrKeyNotFound      = errors.New("key does not exist")
)

Errors returned by cache types.

View Source
// Errors returned by inputs, outputs and buffers to signal connection loss or
// the end of a stream of data.
var (
	// ErrNotConnected is returned by inputs and outputs when their Read or
	// Write methods are called and the connection that they maintain is lost.
	// This error prompts the upstream component to call Connect until the
	// connection is re-established.
	ErrNotConnected = errors.New("not connected")

	// ErrEndOfInput is returned by inputs that have exhausted their source of
	// data to the point where subsequent Read calls will be ineffective. This
	// error prompts the upstream component to gracefully terminate the
	// pipeline.
	ErrEndOfInput = errors.New("end of input")

	// ErrEndOfBuffer is returned by a buffer Read/ReadBatch method when the
	// contents of the buffer have been emptied and the source of the data has
	// ended (as indicated by EndOfInput). This error prompts the upstream
	// component to gracefully terminate the pipeline.
	ErrEndOfBuffer = errors.New("end of buffer")
)

Functions

func RegisterBatchBuffer

func RegisterBatchBuffer(name string, spec *ConfigSpec, ctor BatchBufferConstructor) error

RegisterBatchBuffer attempts to register a new buffer plugin by providing a description of the configuration for the buffer and a constructor for the buffer itself. The constructor will be called for each instantiation of the component within a config.

Consumed message batches must be created by upstream components (inputs, etc) otherwise this buffer will simply receive batches containing single messages.

func RegisterBatchInput

func RegisterBatchInput(name string, spec *ConfigSpec, ctor BatchInputConstructor) error

RegisterBatchInput attempts to register a new batched input plugin by providing a description of the configuration for the plugin as well as a constructor for the input itself. The constructor will be called for each instantiation of the component within a config.

If your input implementation doesn't have a specific mechanism for dealing with a nack (when the AckFunc provides a non-nil error) then you can instead wrap your input implementation with AutoRetryNacksBatched to get automatic retries.

func RegisterBatchOutput

func RegisterBatchOutput(name string, spec *ConfigSpec, ctor BatchOutputConstructor) error

RegisterBatchOutput attempts to register a new output plugin by providing a description of the configuration for the plugin as well as a constructor for the output itself. The constructor will be called for each instantiation of the component within a config.

The constructor of a batch output is able to return a batch policy to be applied before calls to write are made, creating batches from the stream of messages. However, batches can also be created by upstream components (inputs, buffers, etc).

If a batch has been formed upstream it is possible that its size may exceed the policy specified in your constructor.

func RegisterBatchProcessor

func RegisterBatchProcessor(name string, spec *ConfigSpec, ctor BatchProcessorConstructor) error

RegisterBatchProcessor attempts to register a new processor plugin by providing a description of the configuration for the processor and a constructor for the processor itself. The constructor will be called for each instantiation of the component within a config.

func RegisterCache

func RegisterCache(name string, spec *ConfigSpec, ctor CacheConstructor) error

RegisterCache attempts to register a new cache plugin by providing a description of the configuration for the plugin as well as a constructor for the cache itself. The constructor will be called for each instantiation of the component within a config.

func RegisterInput

func RegisterInput(name string, spec *ConfigSpec, ctor InputConstructor) error

RegisterInput attempts to register a new input plugin by providing a description of the configuration for the plugin as well as a constructor for the input itself. The constructor will be called for each instantiation of the component within a config.

If your input implementation doesn't have a specific mechanism for dealing with a nack (when the AckFunc provides a non-nil error) then you can instead wrap your input implementation with AutoRetryNacks to get automatic retries.

func RegisterMetricsExporter

func RegisterMetricsExporter(name string, spec *ConfigSpec, ctor MetricsExporterConstructor) error

RegisterMetricsExporter attempts to register a new metrics exporter plugin by providing a description of the configuration for the plugin as well as a constructor for the metrics exporter itself. The constructor will be called for each instantiation of the component within a config.

func RegisterOtelTracerProvider

func RegisterOtelTracerProvider(name string, spec *ConfigSpec, ctor OtelTracerProviderConstructor) error

RegisterOtelTracerProvider attempts to register a new open telemetry tracer provider plugin by providing a description of the configuration for the plugin as well as a constructor for the tracer provider itself. The constructor will be called for each instantiation of the component within a config.

Experimental: This type signature is experimental and therefore subject to change outside of major version releases.

func RegisterOutput

func RegisterOutput(name string, spec *ConfigSpec, ctor OutputConstructor) error

RegisterOutput attempts to register a new output plugin by providing a description of the configuration for the plugin as well as a constructor for the output itself. The constructor will be called for each instantiation of the component within a config.

func RegisterProcessor

func RegisterProcessor(name string, spec *ConfigSpec, ctor ProcessorConstructor) error

RegisterProcessor attempts to register a new processor plugin by providing a description of the configuration for the processor and a constructor for the processor itself. The constructor will be called for each instantiation of the component within a config.

For simple transformations consider implementing a Bloblang plugin method instead.

func RegisterRateLimit

func RegisterRateLimit(name string, spec *ConfigSpec, ctor RateLimitConstructor) error

RegisterRateLimit attempts to register a new rate limit plugin by providing a description of the configuration for the plugin as well as a constructor for the rate limit itself. The constructor will be called for each instantiation of the component within a config.

func RunCLI

func RunCLI(ctx context.Context)

RunCLI executes Benthos as a CLI, allowing users to specify a configuration file path(s) and execute subcommands for linting configs, testing configs, etc. This is how a standard distribution of Benthos operates.

This call blocks until either:

1. The service shuts down gracefully due to the inputs closing
2. A termination signal is received
3. The provided context is cancelled

This function must only be called once during the entire lifecycle of your program, as it interacts with singleton state. In order to manage multiple Benthos stream lifecycles in a program use the StreamBuilder API instead.

func XFormatConfigJSON

func XFormatConfigJSON() ([]byte, error)

XFormatConfigJSON returns a byte slice of the Benthos configuration spec formatted as a JSON object. The schema of this method is undocumented and is not intended for general use.

Experimental: This method is not intended for general use and could have its signature and/or behaviour changed outside of major version bumps.

Types

type AckFunc

// AckFunc is a common function returned by inputs that must be called once for
// each message consumed. A nil err acknowledges the message; a non-nil err is
// a nack.
type AckFunc func(ctx context.Context, err error) error

AckFunc is a common function returned by inputs that must be called once for each message consumed. This function ensures that the source of the message receives either an acknowledgement (err is nil) or an error that can either be propagated upstream as a nack, or trigger a reattempt at delivering the same message.

If your input implementation doesn't have a specific mechanism for dealing with a nack then you can wrap your input implementation with AutoRetryNacks to get automatic retries.

type BatchBuffer

type BatchBuffer interface {
	// Write a batch of messages to the buffer, the batch is accompanied with an
	// acknowledge function. A non-nil error should be returned if it is not
	// possible to store the given message batch in the buffer.
	//
	// If a nil error is returned the buffer assumes responsibility for calling
	// the acknowledge function at least once during the lifetime of the
	// message.
	//
	// This could be at the point where the message is written to the buffer,
	// which weakens delivery guarantees but can be useful for decoupling the
	// input from downstream components. Alternatively, this could be when the
	// associated batch has been read from the buffer and acknowledged
	// downstream, which preserves delivery guarantees.
	WriteBatch(context.Context, MessageBatch, AckFunc) error

	// Read a batch of messages from the buffer. This call should block until
	// either a batch is ready to consume, the provided context is cancelled or
	// EndOfInput has been called which indicates that the buffer is no longer
	// being populated with new messages.
	//
	// The returned acknowledge function will be called when a consumed message
	// batch has been processed and sent downstream. It is up to the buffer
	// implementation whether the ack function is used, it might be used in
	// order to "commit" the removal of a message from the buffer in cases where
	// the buffer is a persisted storage solution, or in cases where the output
	// of the buffer is temporal (a windowing algorithm, etc) it might be
	// considered correct to simply drop message batches that are not acked.
	//
	// When the buffer is closed (EndOfInput has been called and no more
	// messages are available) this method should return an ErrEndOfBuffer in
	// order to indicate the end of the buffered stream.
	//
	// It is valid to return a batch of only one message.
	ReadBatch(context.Context) (MessageBatch, AckFunc, error)

	// EndOfInput indicates to the buffer that the input has ended and that once
	// the buffer is depleted it should return ErrEndOfBuffer from ReadBatch in
	// order to gracefully shut down the pipeline.
	//
	// EndOfInput should be idempotent as it may be called more than once.
	EndOfInput()

	Closer
}

BatchBuffer is an interface implemented by Buffers able to read and write message batches. Buffers are a component type placed after inputs that decouples the acknowledgement system of the inputs from the rest of the pipeline.

Buffers are useful for relieving back pressure from upstream components, or for implementing message aggregators where the concept of discrete messages running through a pipeline no longer applies (such as with windowing algorithms).

Buffers are advanced component types that weaken delivery guarantees of a Benthos pipeline. Therefore, if you aren't absolutely sure that a component you wish to build should be a buffer type then it likely shouldn't be.

type BatchBufferConstructor

type BatchBufferConstructor func(conf *ParsedConfig, mgr *Resources) (BatchBuffer, error)

BatchBufferConstructor is a func that's provided a configuration type and access to a service manager and must return an instantiation of a buffer based on the config, or an error.

Consumed message batches must be created by upstream components (inputs, etc) otherwise this buffer will simply receive batches containing single messages.

type BatchError

type BatchError struct {
	// contains filtered or unexported fields
}

BatchError groups the errors that were encountered while processing a collection (usually a batch) of messages and provides methods to iterate over these errors.

func NewBatchError

func NewBatchError(b MessageBatch, headline error) *BatchError

NewBatchError creates a BatchError that can be returned by batched outputs. The advantage of doing so is that nacks and retries can potentially be granularly dealt out in cases where only a subset of the batch failed.

A headline error must be supplied which will be exposed when upstream components do not support granular batch errors.

func (*BatchError) Error

func (err *BatchError) Error() string

Error returns the underlying error message.

func (*BatchError) Failed

func (err *BatchError) Failed(i int, merr error) *BatchError

Failed stores an error state for a particular message of a batch. Returns a pointer to the underlying error, allowing the method to be chained.

If Failed is not called then all messages are assumed to have failed. If it is called at least once then all message indexes that aren't explicitly failed are assumed to have been processed successfully.

func (*BatchError) IndexedErrors

func (err *BatchError) IndexedErrors() int

IndexedErrors returns the number of indexed errors that have been registered within a walkable error.

func (*BatchError) Unwrap

func (err *BatchError) Unwrap() error

func (*BatchError) WalkMessages

func (err *BatchError) WalkMessages(fn func(int, *Message, error) bool)

WalkMessages applies a closure to each message that was part of the request that caused this error. The closure is provided the message index, a pointer to the message, and its individual error, which may be nil if the message itself was processed successfully. The closure should return a bool which indicates whether the iteration should be continued.

type BatchInput

type BatchInput interface {
	// Establish a connection to the upstream service. Connect will always be
	// called first when a reader is instantiated, and will be continuously
	// called with back off until a nil error is returned.
	//
	// The provided context remains open only for the duration of the connecting
	// phase, and should not be used to establish the lifetime of the connection
	// itself.
	//
	// Once Connect returns a nil error the ReadBatch method will be called
	// until either ErrNotConnected is returned, or the reader is closed.
	Connect(context.Context) error

	// Read a message batch from a source, along with a function to be called
	// once the entire batch can be either acked (successfully sent or
	// intentionally filtered) or nacked (failed to be processed or dispatched
	// to the output).
	//
	// The AckFunc will be called for every message batch at least once, but
	// there are no guarantees as to when this will occur. If your input
	// implementation doesn't have a specific mechanism for dealing with a nack
	// then you can wrap your input implementation with AutoRetryNacksBatched to
	// get automatic retries.
	//
	// If this method returns ErrNotConnected then ReadBatch will not be called
	// again until Connect has returned a nil error. If ErrEndOfInput is
	// returned then ReadBatch will no longer be called and the pipeline will
	// gracefully terminate.
	ReadBatch(context.Context) (MessageBatch, AckFunc, error)

	Closer
}

BatchInput is an interface implemented by Benthos inputs that produce messages in batches, where there is a desire to process and send the batch as a logical group rather than as individual messages.

Calls to ReadBatch should block until either a message batch is ready to process, the connection is lost, or the provided context is cancelled.

func AutoRetryNacksBatched

func AutoRetryNacksBatched(i BatchInput) BatchInput

AutoRetryNacksBatched wraps a batched input implementation with a component that automatically reattempts messages that fail downstream. This is useful for inputs that do not support nacks, and therefore don't have an answer for when an ack func is called with an error.

When messages fail to be delivered they will be reattempted with back off until success or the stream is stopped.

func InputBatchedWithMaxInFlight

func InputBatchedWithMaxInFlight(n int, i BatchInput) BatchInput

InputBatchedWithMaxInFlight wraps a batched input with a component that limits the number of batches being processed at a given time. When the limit is reached a new message batch will not be consumed until an ack/nack has been returned.

type BatchInputConstructor

type BatchInputConstructor func(conf *ParsedConfig, mgr *Resources) (BatchInput, error)

BatchInputConstructor is a func that's provided a configuration type and access to a service manager, and must return an instantiation of a batched reader based on the config, or an error.

type BatchOutput

type BatchOutput interface {
	// Establish a connection to the downstream service. Connect will always be
	// called first when a writer is instantiated, and will be continuously
	// called with back off until a nil error is returned.
	//
	// Once Connect returns a nil error the write method will be called until
	// either ErrNotConnected is returned, or the writer is closed.
	Connect(context.Context) error

	// Write a batch of messages to a sink, or return an error if delivery is
	// not possible.
	//
	// If this method returns ErrNotConnected then write will not be called
	// again until Connect has returned a nil error.
	WriteBatch(context.Context, MessageBatch) error

	Closer
}

BatchOutput is an interface implemented by Benthos outputs that require Benthos to batch messages before dispatch in order to improve throughput. Each call to WriteBatch should block until either all messages in the batch have been successfully or unsuccessfully sent, or the context is cancelled.

Multiple write calls can be performed in parallel, and the constructor of an output must provide a MaxInFlight parameter indicating the maximum number of parallel batched write calls the output supports.

type BatchOutputConstructor

type BatchOutputConstructor func(conf *ParsedConfig, mgr *Resources) (out BatchOutput, batchPolicy BatchPolicy, maxInFlight int, err error)

BatchOutputConstructor is a func that's provided a configuration type and access to a service manager, and must return an instantiation of a writer based on the config, a batching policy, and a maximum number of in-flight message batches to allow, or an error.

type BatchPolicy

type BatchPolicy struct {
	ByteSize int
	Count    int
	Check    string
	Period   string
	// contains filtered or unexported fields
}

BatchPolicy describes the mechanisms by which messages destined for a batch output should be batched. This is returned by constructors of batch outputs.

func (BatchPolicy) NewBatcher

func (b BatchPolicy) NewBatcher(res *Resources) (*Batcher, error)

NewBatcher creates a batching mechanism from the policy.

type BatchProcessor

type BatchProcessor interface {
	// Process a batch of messages into one or more resulting batches, or return
	// an error if the entire batch could not be processed. If zero messages are
	// returned and the error is nil then all messages are filtered.
	//
	// The provided MessageBatch should NOT be modified, in order to return a
	// mutated batch a copy of the slice should be created instead.
	//
	// When an error is returned all of the input messages will continue down
	// the pipeline but will be marked with the error with *message.SetError,
	// and metrics and logs will be emitted.
	//
	// In order to add errors to individual messages of the batch for downstream
	// handling use *message.SetError(err) and return it in the resulting batch
	// with a nil error.
	//
	// The Message types returned MUST be derived from the provided messages,
	// and CANNOT be custom implementations of Message. In order to copy the
	// provided messages use the Copy method.
	ProcessBatch(context.Context, MessageBatch) ([]MessageBatch, error)

	Closer
}

BatchProcessor is a Benthos processor implementation that works against batches of messages, which allows windowed processing.

Message batches must be created by upstream components (inputs, buffers, etc) otherwise this processor will simply receive batches containing single messages.

type BatchProcessorConstructor

type BatchProcessorConstructor func(conf *ParsedConfig, mgr *Resources) (BatchProcessor, error)

BatchProcessorConstructor is a func that's provided a configuration type and access to a service manager and must return an instantiation of a processor based on the config, or an error.

Message batches must be created by upstream components (inputs, buffers, etc) otherwise this processor will simply receive batches containing single messages.

type Batcher

type Batcher struct {
	// contains filtered or unexported fields
}

Batcher provides a batching mechanism where messages can be added one-by-one with a boolean return indicating whether the batching policy has been triggered.

Upon triggering the policy it is the responsibility of the owner of this batcher to call Flush, which returns all the pending messages in the batch.

This batcher may contain processors that are executed during the flush, therefore it is important to call Close when this batcher is no longer required, having also called Flush if appropriate.

func (*Batcher) Add

func (b *Batcher) Add(msg *Message) bool

Add a message to the batch. Returns true if the batching policy has been triggered by this new addition, in which case Flush should be called.

func (*Batcher) Close

func (b *Batcher) Close(ctx context.Context) error

Close the batching policy, which cleans up any resources used by batching processors.

func (*Batcher) Flush

func (b *Batcher) Flush(ctx context.Context) (batch MessageBatch, err error)

Flush pending messages into a batch, apply any batching processors that are part of the batching policy, and then return the result.

func (*Batcher) UntilNext

func (b *Batcher) UntilNext() (time.Duration, bool)

UntilNext returns a duration indicating how long until the current batch should be flushed due to a configured period. A boolean is also returned indicating whether the batching policy has a timed factor, if this is false then the duration returned should be ignored.

type Cache

type Cache interface {
	// Get a cache item.
	Get(ctx context.Context, key string) ([]byte, error)

	// Set a cache item, specifying an optional TTL. It is okay for caches to
	// ignore the ttl parameter if it isn't possible to implement.
	Set(ctx context.Context, key string, value []byte, ttl *time.Duration) error

	// Add is the same operation as Set except that it returns an error if the
	// key already exists. It is okay for caches to return nil on duplicates if
	// it isn't possible to implement.
	Add(ctx context.Context, key string, value []byte, ttl *time.Duration) error

	// Delete attempts to remove a key. If the key does not exist then it is
	// considered correct to return an error, however, for cache implementations
	// where it is difficult to determine this then it is acceptable to return
	// nil.
	Delete(ctx context.Context, key string) error

	Closer
}

Cache is an interface implemented by Benthos caches.

type CacheConstructor

type CacheConstructor func(conf *ParsedConfig, mgr *Resources) (Cache, error)

CacheConstructor is a func that's provided a configuration type and access to a service manager and must return an instantiation of a cache based on the config, or an error.

type CacheItem

type CacheItem struct {
	Key   string
	Value []byte
	TTL   *time.Duration
}

CacheItem represents an individual cache item.

type Closer

type Closer interface {
	// Close the component, blocks until either the underlying resources are
	// cleaned up or the context is cancelled. Returns an error if the context
	// is cancelled.
	Close(ctx context.Context) error
}

Closer is implemented by components that support stopping and cleaning up their underlying resources.

type ConfigField

type ConfigField struct {
	// contains filtered or unexported fields
}

ConfigField describes a field within a component configuration, to be added to a ConfigSpec.

func NewAnyField

func NewAnyField(name string) *ConfigField

NewAnyField describes a new config field that can assume any value type without triggering a config parse or linting error.

func NewAnyListField

func NewAnyListField(name string) *ConfigField

NewAnyListField describes a new config field consisting of a list of values that can assume any value type without triggering a config parse or linting error.

func NewAnyMapField

func NewAnyMapField(name string) *ConfigField

NewAnyMapField describes a new config field consisting of a map of values that can assume any value type without triggering a config parse or linting error.

func NewBackOffField

func NewBackOffField(name string, allowUnbounded bool, defaults *backoff.ExponentialBackOff) *ConfigField

NewBackOffField defines a new object type config field that describes an exponential back off policy, often used for timing retry attempts. It is then possible to extract a *backoff.ExponentialBackOff from the resulting parsed config with the method FieldBackOff.

It is possible to configure a back off policy that has no upper bound (no maximum elapsed time set). In cases where this would be problematic the parameter allowUnbounded should be set to `false` in order to add linting rules that ensure an upper bound is set.

The defaults struct is optional, and if provided will be used to establish default values for time interval fields. Otherwise the chosen defaults result in one minute of retry attempts, starting at 500ms intervals.

func NewBackOffToggledField

func NewBackOffToggledField(name string, allowUnbounded bool, defaults *backoff.ExponentialBackOff) *ConfigField

NewBackOffToggledField defines a new object type config field that describes an exponential back off policy, often used for timing retry attempts. It is then possible to extract a *backoff.ExponentialBackOff from the resulting parsed config with the method FieldBackOff. This Toggled variant includes a field `enabled` that is `false` by default.

It is possible to configure a back off policy that has no upper bound (no maximum elapsed time set). In cases where this would be problematic the parameter allowUnbounded should be set to `false` in order to add linting rules that ensure an upper bound is set.

The defaults struct is optional, and if provided will be used to establish default values for time interval fields. Otherwise the chosen defaults result in one minute of retry attempts, starting at 500ms intervals.

func NewBatchPolicyField

func NewBatchPolicyField(name string) *ConfigField

NewBatchPolicyField defines a new object type config field that describes a batching policy for batched outputs. It is then possible to extract a BatchPolicy from the resulting parsed config with the method FieldBatchPolicy.

func NewBloblangField

func NewBloblangField(name string) *ConfigField

NewBloblangField defines a new config field that describes a Bloblang mapping string. It is then possible to extract a *bloblang.Executor from the resulting parsed config with the method FieldBloblang.

func NewBoolField

func NewBoolField(name string) *ConfigField

NewBoolField describes a new bool type config field.

func NewDurationField

func NewDurationField(name string) *ConfigField

NewDurationField describes a new duration string type config field, allowing users to define a time interval with strings of the form 60s, 3m, etc.

func NewFloatField

func NewFloatField(name string) *ConfigField

NewFloatField describes a new float type config field.

func NewInputField

func NewInputField(name string) *ConfigField

NewInputField defines a new input field, it is then possible to extract an OwnedInput from the resulting parsed config with the method FieldInput.

func NewInputListField

func NewInputListField(name string) *ConfigField

NewInputListField defines a new input list field, it is then possible to extract a list of OwnedInput from the resulting parsed config with the method FieldInputList.

func NewInputMaxInFlightField

func NewInputMaxInFlightField() *ConfigField

NewInputMaxInFlightField returns a field spec for a common max_in_flight input field.

func NewIntField

func NewIntField(name string) *ConfigField

NewIntField describes a new int type config field.

func NewIntListField

func NewIntListField(name string) *ConfigField

NewIntListField describes a new config field consisting of a list of integers.

func NewIntMapField

func NewIntMapField(name string) *ConfigField

NewIntMapField describes a new config field consisting of an object of arbitrary keys with integer values.

func NewInternalField

func NewInternalField(ifield docs.FieldSpec) *ConfigField

NewInternalField returns a ConfigField derived from an internal package field spec. This function is for internal use only.

func NewInterpolatedStringField

func NewInterpolatedStringField(name string) *ConfigField

NewInterpolatedStringField defines a new config field that describes a dynamic string that supports Bloblang interpolation functions. It is then possible to extract an *InterpolatedString from the resulting parsed config with the method FieldInterpolatedString.

func NewInterpolatedStringListField

func NewInterpolatedStringListField(name string) *ConfigField

NewInterpolatedStringListField describes a new config field consisting of a list of interpolated string values. It is then possible to extract a slice of *InterpolatedString from the resulting parsed config with the method FieldInterpolatedStringList.

func NewInterpolatedStringMapField

func NewInterpolatedStringMapField(name string) *ConfigField

NewInterpolatedStringMapField describes a new config field consisting of an object of arbitrary keys with interpolated string values. It is then possible to extract a map of *InterpolatedString from the resulting parsed config with the method FieldInterpolatedStringMap.

func NewMetadataFilterField

func NewMetadataFilterField(name string) *ConfigField

NewMetadataFilterField creates a config field spec for describing which metadata keys to include for a given purpose. This includes prefix based and regular expression based methods. This field is often used for making metadata written to output destinations explicit.

func NewObjectField

func NewObjectField(name string, fields ...*ConfigField) *ConfigField

NewObjectField describes a new object type config field, consisting of one or more child fields.

func NewObjectListField

func NewObjectListField(name string, fields ...*ConfigField) *ConfigField

NewObjectListField describes a new list type config field consisting of objects with one or more child fields.

func NewOutputField

func NewOutputField(name string) *ConfigField

NewOutputField defines a new output field, it is then possible to extract an OwnedOutput from the resulting parsed config with the method FieldOutput.

func NewOutputListField

func NewOutputListField(name string) *ConfigField

NewOutputListField defines a new output list field, it is then possible to extract a list of OwnedOutput from the resulting parsed config with the method FieldOutputList.

func NewProcessorField

func NewProcessorField(name string) *ConfigField

NewProcessorField defines a new processor field, it is then possible to extract an OwnedProcessor from the resulting parsed config with the method FieldProcessor.

func NewProcessorListField

func NewProcessorListField(name string) *ConfigField

NewProcessorListField defines a new processor list field, it is then possible to extract a list of OwnedProcessor from the resulting parsed config with the method FieldProcessorList.

func NewStringAnnotatedEnumField

func NewStringAnnotatedEnumField(name string, options map[string]string) *ConfigField

NewStringAnnotatedEnumField describes a new string type config field that can have one of a discrete list of values, where each value must be accompanied by a description that annotates its behaviour in the documentation.

func NewStringEnumField

func NewStringEnumField(name string, options ...string) *ConfigField

NewStringEnumField describes a new string type config field that can have one of a discrete list of values.

func NewStringField

func NewStringField(name string) *ConfigField

NewStringField describes a new string type config field.

func NewStringListField

func NewStringListField(name string) *ConfigField

NewStringListField describes a new config field consisting of a list of strings.

func NewStringMapField

func NewStringMapField(name string) *ConfigField

NewStringMapField describes a new config field consisting of an object of arbitrary keys with string values.

func NewTLSField

func NewTLSField(name string) *ConfigField

NewTLSField defines a new object type config field that describes TLS settings for networked components. It is then possible to extract a *tls.Config from the resulting parsed config with the method FieldTLS.

func NewTLSToggledField

func NewTLSToggledField(name string) *ConfigField

NewTLSToggledField defines a new object type config field that describes TLS settings for networked components. This field differs from a standard TLSField as it includes a boolean field `enabled` which allows users to explicitly configure whether TLS should be enabled or not.

A *tls.Config as well as an enabled boolean value can be extracted from the resulting parsed config with the method FieldTLSToggled.

func NewURLField

func NewURLField(name string) *ConfigField

NewURLField defines a new config field that describes a string that should contain a valid URL. It is then possible to extract either a string or a *url.URL from the resulting parsed config with the methods FieldString or FieldURL respectively.

func (*ConfigField) Advanced

func (c *ConfigField) Advanced() *ConfigField

Advanced marks a config field as being advanced, and therefore it will not appear in simplified documentation examples.

func (*ConfigField) Default

func (c *ConfigField) Default(v any) *ConfigField

Default specifies a default value that this field will assume if it is omitted from a provided config. Fields that do not have a default value are considered mandatory, and so parsing a config will fail in their absence.

func (*ConfigField) Deprecated

func (c *ConfigField) Deprecated() *ConfigField

Deprecated marks a config field as being deprecated, and therefore it will not appear in documentation examples.

func (*ConfigField) Description

func (c *ConfigField) Description(d string) *ConfigField

Description adds a description to the field which will be shown when printing documentation for the component config spec.

func (*ConfigField) Example

func (c *ConfigField) Example(e any) *ConfigField

Example adds an example value to the field which will be shown when printing documentation for the component config spec.

func (*ConfigField) LintRule

func (c *ConfigField) LintRule(blobl string) *ConfigField

LintRule adds a custom linting rule to the field in the form of a bloblang mapping. The mapping is provided the value of the field within a config as the context `this`, and if the mapping assigns to `root` an array of one or more strings these strings will be exposed to a config author as linting errors.

For example, if we wanted to add a linting rule for a string field that ensures the value contains only lowercase characters we might add the following linting rule:

`root = if this.lowercase() != this { [ "field must be lowercase" ] }`.

func (*ConfigField) Optional

func (c *ConfigField) Optional() *ConfigField

Optional specifies that a field is optional even when a default value has not been specified. When a field is marked as optional you can test its presence within a parsed config with the method Contains.

func (*ConfigField) Secret

func (c *ConfigField) Secret() *ConfigField

Secret marks this field as being a secret, which means it represents information that is generally considered sensitive such as passwords or access tokens.

func (*ConfigField) Version

func (c *ConfigField) Version(v string) *ConfigField

Version specifies the specific version at which this field was added to the component.

func (*ConfigField) XUnwrapper

func (c *ConfigField) XUnwrapper() any

XUnwrapper is for internal use only, do not use this.

type ConfigSpec

type ConfigSpec struct {
	// contains filtered or unexported fields
}

ConfigSpec describes the configuration specification for a plugin component. This will be used for validating and linting configuration files and providing a parsed configuration struct to the plugin constructor.

func NewConfigSpec

func NewConfigSpec() *ConfigSpec

NewConfigSpec creates a new empty component configuration spec. If the plugin does not require configuration fields the result of this call is enough.

func (*ConfigSpec) Beta

func (c *ConfigSpec) Beta() *ConfigSpec

Beta sets a documentation label on the component indicating that its configuration spec is ready for beta testing, meaning backwards incompatible changes will not be made unless a fundamental problem is found. Plugins are considered experimental by default.

func (*ConfigSpec) Categories

func (c *ConfigSpec) Categories(categories ...string) *ConfigSpec

Categories adds one or more string tags to the component, these are used for arbitrarily grouping components in documentation.

func (*ConfigSpec) Deprecated

func (c *ConfigSpec) Deprecated() *ConfigSpec

Deprecated sets a documentation label on the component indicating that it is now deprecated. Plugins are considered experimental by default.

func (*ConfigSpec) Description

func (c *ConfigSpec) Description(description string) *ConfigSpec

Description adds a description to the plugin configuration spec that describes in more detail the behaviour of the component and how it should be used.

func (*ConfigSpec) EncodeJSON

func (c *ConfigSpec) EncodeJSON(v []byte) error

EncodeJSON attempts to parse a JSON object as a byte slice and uses it to populate the configuration spec. The schema of this method is undocumented and is not intended for general use.

Experimental: This method is not intended for general use and could have its signature and/or behaviour changed outside of major version bumps.

func (*ConfigSpec) Example

func (c *ConfigSpec) Example(title, summary, config string) *ConfigSpec

Example adds an example to the plugin configuration spec that demonstrates how the component can be used. An example has a title, summary, and a YAML configuration showing a real use case.

func (*ConfigSpec) Field

func (c *ConfigSpec) Field(f *ConfigField) *ConfigSpec

Field sets the specification of a field within the config spec, used for linting and generating documentation for the component.

If the provided field has an empty name then it is registered as the value at the root of the config spec.

When creating a spec with a struct constructor the fields from that struct will already be inferred. However, setting a field explicitly is sometimes useful for enriching the field documentation with more information.

func (*ConfigSpec) Fields

func (c *ConfigSpec) Fields(fs ...*ConfigField) *ConfigSpec

Fields sets the specification of multiple fields within the config spec, used for linting and generating documentation for the component.

If any of the provided fields have an empty name then they are registered as the value at the root of the config spec.

When creating a spec with a struct constructor the fields from that struct will already be inferred. However, setting fields explicitly is sometimes useful for enriching their documentation with more information.

func (*ConfigSpec) Footnotes

func (c *ConfigSpec) Footnotes(description string) *ConfigSpec

Footnotes adds a description to the plugin configuration spec that appears towards the bottom of the documentation page, this is usually best for long winded lists of docs.

func (*ConfigSpec) LintRule

func (c *ConfigSpec) LintRule(blobl string) *ConfigSpec

LintRule adds a custom linting rule to the ConfigSpec in the form of a bloblang mapping. The mapping is provided the value of the fields within the ConfigSpec as the context `this`, and if the mapping assigns to `root` an array of one or more strings these strings will be exposed to a config author as linting errors.

For example, if we wanted to add a linting rule for several ConfigSpec fields that ensures some fields are mutually exclusive and some require others we might use the following:

	root = match {
	  this.exists("meow") && this.exists("woof") => [ "both `meow` and `woof` can't be set simultaneously" ],
	  this.exists("reticulation") && (!this.exists("splines") || this.splines == "") => [ "`splines` is required when setting `reticulation`" ],
	}

func (*ConfigSpec) ParseYAML

func (c *ConfigSpec) ParseYAML(yamlStr string, env *Environment) (*ParsedConfig, error)

ParseYAML attempts to parse a YAML document as the defined configuration spec and returns a parsed config view. The provided environment determines which child components and Bloblang functions can be created by the fields of the spec, you can leave the environment nil to use the global environment.

This method is intended for testing purposes and is not required for normal use of plugin components, as parsing is managed by other components.

func (*ConfigSpec) Stable

func (c *ConfigSpec) Stable() *ConfigSpec

Stable sets a documentation label on the component indicating that its configuration spec is stable. Plugins are considered experimental by default.

func (*ConfigSpec) Summary

func (c *ConfigSpec) Summary(summary string) *ConfigSpec

Summary adds a short summary to the plugin configuration spec that describes the general purpose of the component.

func (*ConfigSpec) Version

func (c *ConfigSpec) Version(v string) *ConfigSpec

Version specifies that this component was introduced in a given version.

type ConfigView

type ConfigView struct {
	// contains filtered or unexported fields
}

ConfigView is a struct returned by a Benthos service environment when walking the list of registered components and provides access to information about the component.

func (*ConfigView) Description

func (c *ConfigView) Description() string

Description returns a documentation description of the component, often formatted as markdown.

func (*ConfigView) FormatJSON

func (c *ConfigView) FormatJSON() ([]byte, error)

FormatJSON returns a byte slice of the component configuration formatted as a JSON object. The schema of this method is undocumented and is not intended for general use.

Experimental: This method is not intended for general use and could have its signature and/or behaviour changed outside of major version bumps.

func (*ConfigView) IsDeprecated

func (c *ConfigView) IsDeprecated() bool

IsDeprecated returns true if the component is marked as deprecated.

func (*ConfigView) RenderDocs

func (c *ConfigView) RenderDocs() ([]byte, error)

RenderDocs creates a markdown file that documents the configuration of the component config view. This markdown may include Docusaurus react elements as it matches the documentation generated for the official Benthos website.

Experimental: This method is not intended for general use and could have its signature and/or behaviour changed outside of major version bumps.

func (*ConfigView) Summary

func (c *ConfigView) Summary() string

Summary returns a documentation summary of the component, often formatted as markdown.

type Environment

type Environment struct {
	// contains filtered or unexported fields
}

Environment is a collection of Benthos component plugins that can be used in order to build and run streaming pipelines with access to different sets of plugins. This can be useful for sandboxing, testing, etc, but most plugin authors do not need to create an Environment and can simply use the global environment.

func GlobalEnvironment

func GlobalEnvironment() *Environment

GlobalEnvironment returns a reference to the global environment. Adding plugins to this environment is equivalent to adding plugins using the global functions.

func NewEnvironment

func NewEnvironment() *Environment

NewEnvironment creates a new environment that inherits all globally defined plugins, but can have plugins defined on it that are isolated.

func (*Environment) Clone

func (e *Environment) Clone() *Environment

Clone an environment, creating a new environment containing the same plugins that can be modified independently of the source.

func (*Environment) NewStreamBuilder

func (e *Environment) NewStreamBuilder() *StreamBuilder

NewStreamBuilder creates a new StreamBuilder upon the defined environment, only components known to this environment will be available to the stream builder.

func (*Environment) RegisterBatchBuffer

func (e *Environment) RegisterBatchBuffer(name string, spec *ConfigSpec, ctor BatchBufferConstructor) error

RegisterBatchBuffer attempts to register a new buffer plugin by providing a description of the configuration for the buffer and a constructor for the buffer itself. The constructor will be called for each instantiation of the component within a config.

Consumed message batches must be created by upstream components (inputs, etc) otherwise this buffer will simply receive batches containing single messages.

func (*Environment) RegisterBatchInput

func (e *Environment) RegisterBatchInput(name string, spec *ConfigSpec, ctor BatchInputConstructor) error

RegisterBatchInput attempts to register a new batched input plugin by providing a description of the configuration for the plugin as well as a constructor for the input itself. The constructor will be called for each instantiation of the component within a config.

If your input implementation doesn't have a specific mechanism for dealing with a nack (when the AckFunc provides a non-nil error) then you can instead wrap your input implementation with AutoRetryNacksBatched to get automatic retries.

func (*Environment) RegisterBatchOutput

func (e *Environment) RegisterBatchOutput(name string, spec *ConfigSpec, ctor BatchOutputConstructor) error

RegisterBatchOutput attempts to register a new output plugin by providing a description of the configuration for the plugin as well as a constructor for the output itself. The constructor will be called for each instantiation of the component within a config.

The constructor of a batch output is able to return a batch policy to be applied before calls to write are made, creating batches from the stream of messages. However, batches can also be created by upstream components (inputs, buffers, etc).

If a batch has been formed upstream it is possible that its size may exceed the policy specified in your constructor.

func (*Environment) RegisterBatchProcessor

func (e *Environment) RegisterBatchProcessor(name string, spec *ConfigSpec, ctor BatchProcessorConstructor) error

RegisterBatchProcessor attempts to register a new processor plugin by providing a description of the configuration for the processor and a constructor for the processor itself. The constructor will be called for each instantiation of the component within a config.

Message batches must be created by upstream components (inputs, buffers, etc) otherwise this processor will simply receive batches containing single messages.

func (*Environment) RegisterCache

func (e *Environment) RegisterCache(name string, spec *ConfigSpec, ctor CacheConstructor) error

RegisterCache attempts to register a new cache plugin by providing a description of the configuration for the plugin as well as a constructor for the cache itself. The constructor will be called for each instantiation of the component within a config.

func (*Environment) RegisterInput

func (e *Environment) RegisterInput(name string, spec *ConfigSpec, ctor InputConstructor) error

RegisterInput attempts to register a new input plugin by providing a description of the configuration for the plugin as well as a constructor for the input itself. The constructor will be called for each instantiation of the component within a config.

If your input implementation doesn't have a specific mechanism for dealing with a nack (when the AckFunc provides a non-nil error) then you can instead wrap your input implementation with AutoRetryNacks to get automatic retries.

func (*Environment) RegisterMetricsExporter

func (e *Environment) RegisterMetricsExporter(name string, spec *ConfigSpec, ctor MetricsExporterConstructor) error

RegisterMetricsExporter attempts to register a new metrics exporter plugin by providing a description of the configuration for the plugin as well as a constructor for the metrics exporter itself.

func (*Environment) RegisterOtelTracerProvider

func (e *Environment) RegisterOtelTracerProvider(name string, spec *ConfigSpec, ctor OtelTracerProviderConstructor) error

RegisterOtelTracerProvider attempts to register a new open telemetry tracer provider plugin by providing a description of the configuration for the plugin as well as a constructor for the tracer provider itself. The constructor will be called for each instantiation of the component within a config.

Experimental: This type signature is experimental and therefore subject to change outside of major version releases.

func (*Environment) RegisterOutput

func (e *Environment) RegisterOutput(name string, spec *ConfigSpec, ctor OutputConstructor) error

RegisterOutput attempts to register a new output plugin by providing a description of the configuration for the plugin as well as a constructor for the output itself. The constructor will be called for each instantiation of the component within a config.

func (*Environment) RegisterProcessor

func (e *Environment) RegisterProcessor(name string, spec *ConfigSpec, ctor ProcessorConstructor) error

RegisterProcessor attempts to register a new processor plugin by providing a description of the configuration for the processor and a constructor for the processor itself. The constructor will be called for each instantiation of the component within a config.

For simple transformations consider implementing a Bloblang plugin method instead.

func (*Environment) RegisterRateLimit

func (e *Environment) RegisterRateLimit(name string, spec *ConfigSpec, ctor RateLimitConstructor) error

RegisterRateLimit attempts to register a new rate limit plugin by providing a description of the configuration for the plugin as well as a constructor for the rate limit itself. The constructor will be called for each instantiation of the component within a config.

func (*Environment) UseBloblangEnvironment

func (e *Environment) UseBloblangEnvironment(bEnv *bloblang.Environment)

UseBloblangEnvironment configures the service environment to restrict components constructed with it to a specific Bloblang environment.

func (*Environment) UseFS

func (e *Environment) UseFS(fs *FS)

UseFS configures the service environment to use an implementation of ifs.FS as its filesystem.

func (*Environment) WalkBuffers

func (e *Environment) WalkBuffers(fn func(name string, config *ConfigView))

WalkBuffers executes a provided function argument for every buffer component that has been registered to the environment.

func (*Environment) WalkCaches

func (e *Environment) WalkCaches(fn func(name string, config *ConfigView))

WalkCaches executes a provided function argument for every cache component that has been registered to the environment.

func (*Environment) WalkInputs

func (e *Environment) WalkInputs(fn func(name string, config *ConfigView))

WalkInputs executes a provided function argument for every input component that has been registered to the environment.

func (*Environment) WalkMetrics

func (e *Environment) WalkMetrics(fn func(name string, config *ConfigView))

WalkMetrics executes a provided function argument for every metrics component that has been registered to the environment. Note that metrics components available to an environment cannot be modified.

func (*Environment) WalkOutputs

func (e *Environment) WalkOutputs(fn func(name string, config *ConfigView))

WalkOutputs executes a provided function argument for every output component that has been registered to the environment.

func (*Environment) WalkProcessors

func (e *Environment) WalkProcessors(fn func(name string, config *ConfigView))

WalkProcessors executes a provided function argument for every processor component that has been registered to the environment.

func (*Environment) WalkRateLimits

func (e *Environment) WalkRateLimits(fn func(name string, config *ConfigView))

WalkRateLimits executes a provided function argument for every rate limit component that has been registered to the environment.

func (*Environment) WalkTracers

func (e *Environment) WalkTracers(fn func(name string, config *ConfigView))

WalkTracers executes a provided function argument for every tracer component that has been registered to the environment. Note that tracer components available to an environment cannot be modified.

type FS

type FS struct {
	// contains filtered or unexported fields
}

FS implements a superset of fs.FS and includes goodies that benthos components specifically need.

func NewFS

func NewFS(filesystem fs.FS) *FS

NewFS provides a new instance of a filesystem. The fs.FS passed in can optionally implement methods from benthos ifs.FS

func (*FS) MkdirAll

func (f *FS) MkdirAll(path string, perm fs.FileMode) error

MkdirAll creates a directory named path, along with any necessary parents, and returns nil, or else returns an error.

func (*FS) Open

func (f *FS) Open(name string) (fs.File, error)

Open opens the named file for reading.

func (*FS) OpenFile

func (f *FS) OpenFile(name string, flag int, perm fs.FileMode) (fs.File, error)

OpenFile is the generalized open call.

func (*FS) Remove

func (f *FS) Remove(name string) error

Remove removes the named file or (empty) directory.

func (*FS) Stat

func (f *FS) Stat(name string) (fs.FileInfo, error)

Stat returns a FileInfo describing the named file.

type HTTPMultiplexer

type HTTPMultiplexer interface {
	HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))
}

HTTPMultiplexer is an interface supported by most HTTP multiplexers.

type Input

type Input interface {
	// Establish a connection to the upstream service. Connect will always be
	// called first when a reader is instantiated, and will be continuously
	// called with back off until a nil error is returned.
	//
	// The provided context remains open only for the duration of the connecting
	// phase, and should not be used to establish the lifetime of the connection
	// itself.
	//
	// Once Connect returns a nil error the Read method will be called until
	// either ErrNotConnected is returned, or the reader is closed.
	Connect(context.Context) error

	// Read a single message from a source, along with a function to be called
	// once the message can be either acked (successfully sent or intentionally
	// filtered) or nacked (failed to be processed or dispatched to the output).
	//
	// The AckFunc will be called for every message at least once, but there are
	// no guarantees as to when this will occur. If your input implementation
	// doesn't have a specific mechanism for dealing with a nack then you can
	// wrap your input implementation with AutoRetryNacks to get automatic
	// retries.
	//
	// If this method returns ErrNotConnected then Read will not be called again
	// until Connect has returned a nil error. If ErrEndOfInput is returned then
	// Read will no longer be called and the pipeline will gracefully terminate.
	Read(context.Context) (*Message, AckFunc, error)

	Closer
}

Input is an interface implemented by Benthos inputs. Calls to Read should block until either a message has been received, the connection is lost, or the provided context is cancelled.

func AutoRetryNacks

func AutoRetryNacks(i Input) Input

AutoRetryNacks wraps an input implementation with a component that automatically reattempts messages that fail downstream. This is useful for inputs that do not support nacks, and therefore don't have an answer for when an ack func is called with an error.

When messages fail to be delivered they will be reattempted with back off until success or the stream is stopped.

func InputWithMaxInFlight

func InputWithMaxInFlight(n int, i Input) Input

InputWithMaxInFlight wraps an input with a component that limits the number of messages being processed at a given time. When the limit is reached a new message will not be consumed until an ack/nack has been returned.

type InputConstructor

type InputConstructor func(conf *ParsedConfig, mgr *Resources) (Input, error)

InputConstructor is a func that's provided a configuration type and access to a service manager, and must return an instantiation of a reader based on the config, or an error.

type InterpolatedString

type InterpolatedString struct {
	// contains filtered or unexported fields
}

InterpolatedString resolves a string containing dynamic interpolation functions for a given message.

func NewInterpolatedString

func NewInterpolatedString(expr string) (*InterpolatedString, error)

NewInterpolatedString parses an interpolated string expression.

func (*InterpolatedString) Bytes

func (i *InterpolatedString) Bytes(m *Message) []byte

Bytes resolves the interpolated field for a given message as a byte slice.

Deprecated: Use TryBytes instead in order to capture errors.

func (*InterpolatedString) String

func (i *InterpolatedString) String(m *Message) string

String resolves the interpolated field for a given message as a string.

Deprecated: Use TryString instead in order to capture errors.

func (*InterpolatedString) TryBytes

func (i *InterpolatedString) TryBytes(m *Message) ([]byte, error)

TryBytes resolves the interpolated field for a given message as a slice of bytes, returns an error if any interpolation functions fail.

func (*InterpolatedString) TryString

func (i *InterpolatedString) TryString(m *Message) (string, error)

TryString resolves the interpolated field for a given message as a string, returns an error if any interpolation functions fail.

type Lint

type Lint struct {
	Line   int
	Column int
	Type   LintType
	What   string
}

Lint represents a configuration file linting error.

func (Lint) Error

func (l Lint) Error() string

Error returns an error string.

type LintError

type LintError []Lint

LintError is an error type that represents one or more configuration file linting errors that were encountered.

func (LintError) Error

func (e LintError) Error() string

Error returns an error string.

type LintType

type LintType int

LintType is a discrete linting type. NOTE: These should be kept in sync with ./internal/docs/field.go:586.

const (
	// LintCustom means a custom linting rule failed.
	LintCustom LintType = iota

	// LintFailedRead means a configuration could not be read.
	LintFailedRead LintType = iota

	// LintInvalidOption means the field value was not one of the explicit list
	// of options.
	LintInvalidOption LintType = iota

	// LintBadLabel means the label contains invalid characters.
	LintBadLabel LintType = iota

	// LintMissingLabel means the label is missing when required.
	LintMissingLabel LintType = iota

	// LintDuplicateLabel means the label collides with another label.
	LintDuplicateLabel LintType = iota

	// LintBadBloblang means the field contains invalid Bloblang.
	LintBadBloblang LintType = iota

	// LintShouldOmit means the field should be omitted.
	LintShouldOmit LintType = iota

	// LintComponentMissing means a component value was expected but the type is
	// missing.
	LintComponentMissing LintType = iota

	// LintComponentNotFound means the specified component value is not
	// recognised.
	LintComponentNotFound LintType = iota

	// LintUnknown means the field is unknown.
	LintUnknown LintType = iota

	// LintMissing means a field was required but missing.
	LintMissing LintType = iota

	// LintExpectedArray means an array value was expected but something else
	// was provided.
	LintExpectedArray LintType = iota

	// LintExpectedObject means an object value was expected but something else
	// was provided.
	LintExpectedObject LintType = iota

	// LintExpectedScalar means a scalar value was expected but something else
	// was provided.
	LintExpectedScalar LintType = iota

	// LintDeprecated means a field is deprecated and should not be used.
	LintDeprecated LintType = iota
)

type Logger

type Logger struct {
	// contains filtered or unexported fields
}

Logger allows plugin authors to write custom logs from components that are exported the same way as native Benthos logs. It's safe to pass around a nil pointer for testing components.

func (*Logger) Debug

func (l *Logger) Debug(message string)

Debug logs a debug message.

func (*Logger) Debugf

func (l *Logger) Debugf(template string, args ...any)

Debugf logs a debug message using fmt.Sprintf when args are specified.

func (*Logger) Error

func (l *Logger) Error(message string)

Error logs an error message.

func (*Logger) Errorf

func (l *Logger) Errorf(template string, args ...any)

Errorf logs an error message using fmt.Sprintf when args are specified.

func (*Logger) Info

func (l *Logger) Info(message string)

Info logs an info message.

func (*Logger) Infof

func (l *Logger) Infof(template string, args ...any)

Infof logs an info message using fmt.Sprintf when args are specified.

func (*Logger) Trace

func (l *Logger) Trace(message string)

Trace logs a trace message.

func (*Logger) Tracef

func (l *Logger) Tracef(template string, args ...any)

Tracef logs a trace message using fmt.Sprintf when args are specified.

func (*Logger) Warn

func (l *Logger) Warn(message string)

Warn logs a warning message.

func (*Logger) Warnf

func (l *Logger) Warnf(template string, args ...any)

Warnf logs a warning message using fmt.Sprintf when args are specified.

func (*Logger) With

func (l *Logger) With(keyValuePairs ...any) *Logger

With adds a variadic set of fields to a logger. Each field must consist of a string key and a value of any type. An odd number of arguments will therefore result in malformed log messages, but should never panic.

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message represents a single discrete message passing through a Benthos pipeline. It is safe to mutate the message via Set methods, but the underlying byte data should not be edited directly.

func NewMessage

func NewMessage(content []byte) *Message

NewMessage creates a new message with an initial raw bytes content. The initial content can be nil, which is recommended if you intend to set it with structured contents.

func (*Message) AsBytes

func (m *Message) AsBytes() ([]byte, error)

AsBytes returns the underlying byte array contents of a message or, if the contents are a structured type, attempts to marshal the contents as a JSON document and returns either the byte array result or an error.

It is NOT safe to mutate the contents of the returned slice.

func (*Message) AsStructured

func (m *Message) AsStructured() (any, error)

AsStructured returns the underlying structured contents of a message or, if the contents are a byte array, attempts to parse the bytes contents as a JSON document and returns either the structured result or an error.

It is NOT safe to mutate the contents of the returned value if it is a reference type (slice or map). In order to safely mutate the structured contents of a message use AsStructuredMut.

func (*Message) AsStructuredMut

func (m *Message) AsStructuredMut() (any, error)

AsStructuredMut returns the underlying structured contents of a message or, if the contents are a byte array, attempts to parse the bytes contents as a JSON document and returns either the structured result or an error.

It is safe to mutate the contents of the returned value even if it is a reference type (slice or map), as the structured contents will be lazily deep cloned if it is still owned by an upstream component.

func (*Message) BloblangMutate

func (m *Message) BloblangMutate(blobl *bloblang.Executor) (*Message, error)

BloblangMutate executes a parsed Bloblang mapping onto a message where the contents of the message are mutated directly rather than creating an entirely new object.

Returns the same message back in a mutated form, or an error if the mapping fails. If the mapping results in the root being deleted the returned message will be nil, which indicates it has been filtered.

Note that using overlay means certain functions within the Bloblang mapping will behave differently. In the root of the mapping the right-hand keywords `root` and `this` refer to the same mutable root of the output document.

func (*Message) BloblangQuery

func (m *Message) BloblangQuery(blobl *bloblang.Executor) (*Message, error)

BloblangQuery executes a parsed Bloblang mapping on a message and returns a message back or an error if the mapping fails. If the mapping results in the root being deleted the returned message will be nil, which indicates it has been filtered.

func (*Message) Context

func (m *Message) Context() context.Context

Context returns a context associated with the message, or a background context in the absence of one.

func (*Message) Copy

func (m *Message) Copy() *Message

Copy creates a shallow copy of a message that is safe to mutate with Set methods without mutating the original. Both messages will share a context, and therefore a tracing ID, if one has been associated with them.

func (*Message) DeepCopy

func (m *Message) DeepCopy() *Message

DeepCopy creates a deep copy of a message and its contents that is safe to mutate with Set methods without mutating the original, and mutations on the inner (deep) contents of the source message will not mutate the copy.

This is required in situations where a component wishes to retain a copy of a message beyond the boundaries of a process or write command. This is specifically required for buffer implementations that operate by keeping a reference to the message.

func (*Message) GetError

func (m *Message) GetError() error

GetError returns an error associated with a message, or nil if there isn't one. Messages marked with errors can be handled using a range of methods outlined in https://www.benthos.dev/docs/configuration/error_handling.

func (*Message) MetaDelete

func (m *Message) MetaDelete(key string)

MetaDelete removes a key from the message metadata.

func (*Message) MetaGet

func (m *Message) MetaGet(key string) (string, bool)

MetaGet attempts to find a metadata key from the message and returns a string result and a boolean indicating whether it was found.

Strong advice: Use MetaGetMut instead.

func (*Message) MetaGetMut

func (m *Message) MetaGetMut(key string) (any, bool)

MetaGetMut attempts to find a metadata key from the message and returns the value if found, and a boolean indicating whether it was found. The value returned is mutable, and so it is safe to modify even though it may be a reference type such as a slice or map.

func (*Message) MetaSet

func (m *Message) MetaSet(key, value string)

MetaSet sets the value of a metadata key. If the value is an empty string the metadata key is deleted.

Strong advice: Use MetaSetMut instead.

func (*Message) MetaSetMut

func (m *Message) MetaSetMut(key string, value any)

MetaSetMut sets the value of a metadata key to any value. The value provided is stored as mutable, and therefore if it is a reference type such as a slice or map then it could be modified by a downstream component.

func (*Message) MetaWalk

func (m *Message) MetaWalk(fn func(string, string) error) error

MetaWalk iterates each metadata key/value pair and executes a provided closure on each iteration. To stop iterating, return an error from the closure. An error returned by the closure will be returned by this function.

Strong advice: Use MetaWalkMut instead.

func (*Message) MetaWalkMut

func (m *Message) MetaWalkMut(fn func(key string, value any) error) error

MetaWalkMut iterates each metadata key/value pair and executes a provided closure on each iteration. To stop iterating, return an error from the closure. An error returned by the closure will be returned by this function.

func (*Message) SetBytes

func (m *Message) SetBytes(b []byte)

SetBytes sets the underlying contents of the message as a byte slice.

func (*Message) SetError

func (m *Message) SetError(err error)

SetError marks the message as having failed a processing step and adds the error to it as context. Messages marked with errors can be handled using a range of methods outlined in https://www.benthos.dev/docs/configuration/error_handling.

func (*Message) SetStructured

func (m *Message) SetStructured(i any)

SetStructured sets the underlying contents of the message as a structured type. This structured value should be a scalar Go type, or either a map[string]interface{} or []interface{} containing the same types all the way through the hierarchy, this ensures that other processors are able to work with the contents and that they can be JSON marshalled when coerced into a byte array.

The provided structure is considered read-only, which means subsequent processors will need to fully clone the structure in order to perform mutations on the data.

func (*Message) SetStructuredMut

func (m *Message) SetStructuredMut(i any)

SetStructuredMut sets the underlying contents of the message as a structured type. This structured value should be a scalar Go type, or either a map[string]interface{} or []interface{} containing the same types all the way through the hierarchy, this ensures that other processors are able to work with the contents and that they can be JSON marshalled when coerced into a byte array.

The provided structure is considered mutable, which means subsequent processors might mutate the structure without performing a deep copy.

func (*Message) WithContext

func (m *Message) WithContext(ctx context.Context) *Message

WithContext returns a new message with a provided context associated with it.

type MessageBatch

type MessageBatch []*Message

MessageBatch describes a collection of one or more messages.

func ExecuteProcessors

func ExecuteProcessors(ctx context.Context, processors []*OwnedProcessor, inbatches ...MessageBatch) ([]MessageBatch, error)

ExecuteProcessors runs a set of batches through a series of processors. If a context error occurs during execution then this function terminates and returns the error.

However, for general processing errors unrelated to context cancellation the errors are marked against individual messages with the `SetError` method and processing continues against subsequent processors.

func (MessageBatch) BloblangMutate

func (b MessageBatch) BloblangMutate(index int, blobl *bloblang.Executor) (*Message, error)

BloblangMutate executes a parsed Bloblang mapping onto a message within the batch, where the contents of the message are mutated directly rather than creating an entirely new object.

Returns the same message back in a mutated form, or an error if the mapping fails. If the mapping results in the root being deleted the returned message will be nil, which indicates it has been filtered.

This method allows mappings to perform windowed aggregations across message batches.

Note that using overlay means certain functions within the Bloblang mapping will behave differently. In the root of the mapping the right-hand keywords `root` and `this` refer to the same mutable root of the output document.

func (MessageBatch) BloblangQuery

func (b MessageBatch) BloblangQuery(index int, blobl *bloblang.Executor) (*Message, error)

BloblangQuery executes a parsed Bloblang mapping on a message batch, from the perspective of a particular message index, and returns a message back or an error if the mapping fails. If the mapping results in the root being deleted the returned message will be nil, which indicates it has been filtered.

This method allows mappings to perform windowed aggregations across message batches.

func (MessageBatch) Copy

func (b MessageBatch) Copy() MessageBatch

Copy creates a new slice of the same messages, which can be modified without changing the contents of the original batch.

func (MessageBatch) DeepCopy

func (b MessageBatch) DeepCopy() MessageBatch

DeepCopy creates a new slice of the same messages, which can be modified without changing the contents of the original batch and are unchanged from deep mutations performed on the source message.

This is required in situations where a component wishes to retain a copy of a message batch beyond the boundaries of a process or write command. This is specifically required for buffer implementations that operate by keeping a reference to the message.

func (MessageBatch) InterpolatedBytes deprecated

func (b MessageBatch) InterpolatedBytes(index int, i *InterpolatedString) []byte

InterpolatedBytes resolves an interpolated string expression on a message batch, from the perspective of a particular message index.

This method allows interpolation functions to perform windowed aggregations across message batches, and is a more powerful way to interpolate strings than the standard .String method.

Deprecated: Use TryInterpolatedBytes instead.

func (MessageBatch) InterpolatedString deprecated

func (b MessageBatch) InterpolatedString(index int, i *InterpolatedString) string

InterpolatedString resolves an interpolated string expression on a message batch, from the perspective of a particular message index.

This method allows interpolation functions to perform windowed aggregations across message batches, and is a more powerful way to interpolate strings than the standard .String method.

Deprecated: Use TryInterpolatedString instead.

func (MessageBatch) TryInterpolatedBytes

func (b MessageBatch) TryInterpolatedBytes(index int, i *InterpolatedString) ([]byte, error)

TryInterpolatedBytes resolves an interpolated string expression on a message batch, from the perspective of a particular message index.

This method allows interpolation functions to perform windowed aggregations across message batches, and is a more powerful way to interpolate strings than the standard .String method.

func (MessageBatch) TryInterpolatedString

func (b MessageBatch) TryInterpolatedString(index int, i *InterpolatedString) (string, error)

TryInterpolatedString resolves an interpolated string expression on a message batch, from the perspective of a particular message index.

This method allows interpolation functions to perform windowed aggregations across message batches, and is a more powerful way to interpolate strings than the standard .String method.

type MessageBatchHandlerFunc

type MessageBatchHandlerFunc func(context.Context, MessageBatch) error

MessageBatchHandlerFunc is a function signature defining a component that consumes Benthos message batches. An error must be returned if the context is cancelled, or if the messages could not be delivered or processed.

type MessageHandlerFunc

type MessageHandlerFunc func(context.Context, *Message) error

MessageHandlerFunc is a function signature defining a component that consumes Benthos messages. An error must be returned if the context is cancelled, or if the message could not be delivered or processed.

type MetadataFilter

type MetadataFilter struct {
	// contains filtered or unexported fields
}

MetadataFilter provides a configured mechanism for filtering metadata key/values from a message.

func (*MetadataFilter) Walk

func (m *MetadataFilter) Walk(msg *Message, fn func(key, value string) error) error

Walk iterates the filtered metadata key/value pairs from a message and executes a provided closure function for each pair. An error returned by the closure will be returned by this function and prevent subsequent pairs from being accessed.

type MetricCounter

type MetricCounter struct {
	// contains filtered or unexported fields
}

MetricCounter represents a counter metric of a given name and labels.

func (*MetricCounter) Incr

func (c *MetricCounter) Incr(count int64, labelValues ...string)

Incr increments a counter metric by an amount, the number of label values must match the number and order of labels specified when the counter was created.

type MetricGauge

type MetricGauge struct {
	// contains filtered or unexported fields
}

MetricGauge represents a gauge metric of a given name and labels.

func (*MetricGauge) Set

func (g *MetricGauge) Set(value int64, labelValues ...string)

Set a gauge metric, the number of label values must match the number and order of labels specified when the gauge was created.

type MetricTimer

type MetricTimer struct {
	// contains filtered or unexported fields
}

MetricTimer represents a timing metric of a given name and labels.

func (*MetricTimer) Timing

func (t *MetricTimer) Timing(delta int64, labelValues ...string)

Timing adds a delta to a timing metric. Delta should be measured in nanoseconds for consistency with other Benthos timing metrics.

The number of label values must match the number and order of labels specified when the timing was created.

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics allows plugin authors to emit custom metrics from components that are exported the same way as native Benthos metrics. It's safe to pass around a nil pointer for testing components.

func (*Metrics) NewCounter

func (m *Metrics) NewCounter(name string, labelKeys ...string) *MetricCounter

NewCounter creates a new counter metric with a name and variadic list of label keys.

func (*Metrics) NewGauge

func (m *Metrics) NewGauge(name string, labelKeys ...string) *MetricGauge

NewGauge creates a new gauge metric with a name and variadic list of label keys.

func (*Metrics) NewTimer

func (m *Metrics) NewTimer(name string, labelKeys ...string) *MetricTimer

NewTimer creates a new timer metric with a name and variadic list of label keys.

type MetricsExporter

type MetricsExporter interface {
	NewCounterCtor(name string, labelKeys ...string) MetricsExporterCounterCtor
	NewTimerCtor(name string, labelKeys ...string) MetricsExporterTimerCtor
	NewGaugeCtor(name string, labelKeys ...string) MetricsExporterGaugeCtor
	Close(ctx context.Context) error
}

MetricsExporter is an interface implemented by Benthos metrics exporters.

type MetricsExporterConstructor

type MetricsExporterConstructor func(conf *ParsedConfig, log *Logger) (MetricsExporter, error)

MetricsExporterConstructor is a func that's provided a configuration type and a logger, and must return an instantiation of a metrics exporter based on the config, or an error.

type MetricsExporterCounter

type MetricsExporterCounter interface {
	// Incr increments a counter metric by an amount, the number of label values
	// must match the number and order of labels specified when the counter was
	// created.
	Incr(count int64)
}

MetricsExporterCounter represents a counter metric of a given name and labels.

type MetricsExporterCounterCtor

type MetricsExporterCounterCtor func(labelValues ...string) MetricsExporterCounter

MetricsExporterCounterCtor is a constructor for a MetricsExporterCounter that must be called with a variadic list of label values exactly matching the length and order of the label keys provided.

type MetricsExporterGauge

type MetricsExporterGauge interface {
	// Set a gauge metric, the number of label values must match the number and
	// order of labels specified when the gauge was created.
	Set(value int64)
}

MetricsExporterGauge represents a gauge metric of a given name and labels.

type MetricsExporterGaugeCtor

type MetricsExporterGaugeCtor func(labelValues ...string) MetricsExporterGauge

MetricsExporterGaugeCtor is a constructor for a MetricsExporterGauge that must be called with a variadic list of label values exactly matching the length and order of the label keys provided.

type MetricsExporterTimer

type MetricsExporterTimer interface {
	// Timing adds a delta to a timing metric. Delta should be measured in
	// nanoseconds for consistency with other Benthos timing metrics.
	//
	// The number of label values must match the number and order of labels
	// specified when the timing was created.
	Timing(delta int64)
}

MetricsExporterTimer represents a timing metric of a given name and labels.

type MetricsExporterTimerCtor

type MetricsExporterTimerCtor func(labelValues ...string) MetricsExporterTimer

MetricsExporterTimerCtor is a constructor for a MetricsExporterTimer that must be called with a variadic list of label values exactly matching the length and order of the label keys provided.

type MockResourcesOptFn

type MockResourcesOptFn func(*mock.Manager)

MockResourcesOptFn provides a func based optional argument to MockResources.

func MockResourcesOptAddCache

func MockResourcesOptAddCache(name string) MockResourcesOptFn

MockResourcesOptAddCache instantiates the resources type with a mock cache with a given name. Cached items are held in memory.

func MockResourcesOptAddRateLimit

func MockResourcesOptAddRateLimit(name string, fn func(context.Context) (time.Duration, error)) MockResourcesOptFn

MockResourcesOptAddRateLimit instantiates the resources type with a mock rate limit with a given name, the provided closure will be called for each invocation of the rate limit.

type OtelTracerProviderConstructor

type OtelTracerProviderConstructor func(conf *ParsedConfig) (trace.TracerProvider, error)

OtelTracerProviderConstructor is a func that's provided a configuration type and must return an instantiation of an open telemetry tracer provider based on the config, or an error.

Experimental: This type signature is experimental and therefore subject to change outside of major version releases.

type Output

type Output interface {
	// Establish a connection to the downstream service. Connect will always be
	// called first when a writer is instantiated, and will be continuously
	// called with back off until a nil error is returned.
	//
	// The provided context remains open only for the duration of the connecting
	// phase, and should not be used to establish the lifetime of the connection
	// itself.
	//
	// Once Connect returns a nil error the write method will be called until
	// either ErrNotConnected is returned, or the writer is closed.
	Connect(context.Context) error

	// Write a message to a sink, or return an error if delivery is not
	// possible.
	//
	// If this method returns ErrNotConnected then write will not be called
	// again until Connect has returned a nil error.
	Write(context.Context, *Message) error

	Closer
}

Output is an interface implemented by Benthos outputs that support single message writes. Each call to Write should block until either the message has been successfully or unsuccessfully sent, or the context is cancelled.

Multiple write calls can be performed in parallel, and the constructor of an output must provide a MaxInFlight parameter indicating the maximum number of parallel write calls the output supports.

type OutputConstructor

type OutputConstructor func(conf *ParsedConfig, mgr *Resources) (out Output, maxInFlight int, err error)

OutputConstructor is a func that's provided a configuration type and access to a service manager, and must return an instantiation of a writer based on the config and a maximum number of in-flight messages to allow, or an error.

type OwnedInput

type OwnedInput struct {
	// contains filtered or unexported fields
}

OwnedInput provides direct ownership of an input extracted from a plugin config. Connectivity of the input is handled internally, and so the consumer of this type should only be concerned with reading messages and eventually calling Close to terminate the input.

func (*OwnedInput) BatchedWith

func (o *OwnedInput) BatchedWith(b *Batcher) *OwnedInput

BatchedWith returns a copy of the OwnedInput where messages will be batched according to the provided batcher.

func (*OwnedInput) Close

func (o *OwnedInput) Close(ctx context.Context) error

Close the input.

func (*OwnedInput) ReadBatch

func (o *OwnedInput) ReadBatch(ctx context.Context) (MessageBatch, AckFunc, error)

ReadBatch attempts to read a message batch from the input, along with a function to be called once the entire batch can be either acked (successfully sent or intentionally filtered) or nacked (failed to be processed or dispatched to the output).

If this method returns ErrEndOfInput then that indicates that the input has finished and will no longer yield new messages.

func (*OwnedInput) XUnwrapper

func (o *OwnedInput) XUnwrapper() any

XUnwrapper is for internal use only, do not use this.

type OwnedOutput

type OwnedOutput struct {
	// contains filtered or unexported fields
}

OwnedOutput provides direct ownership of an output extracted from a plugin config. Connectivity of the output is handled internally, and so the owner of this type should only be concerned with writing messages and eventually calling Close to terminate the output.

func (*OwnedOutput) Close

func (o *OwnedOutput) Close(ctx context.Context) error

Close the output.

func (*OwnedOutput) Write

func (o *OwnedOutput) Write(ctx context.Context, m *Message) error

Write a message to the output, or return an error either if delivery is not possible or the context is cancelled.

func (*OwnedOutput) WriteBatch

func (o *OwnedOutput) WriteBatch(ctx context.Context, b MessageBatch) error

WriteBatch attempts to write a message batch to the output, and returns an error either if delivery is not possible or the context is cancelled.

type OwnedProcessor

type OwnedProcessor struct {
	// contains filtered or unexported fields
}

OwnedProcessor provides direct ownership of a processor extracted from a plugin config.

func (*OwnedProcessor) Close

func (o *OwnedProcessor) Close(ctx context.Context) error

Close the processor, allowing it to clean up resources.

func (*OwnedProcessor) Process

func (o *OwnedProcessor) Process(ctx context.Context, msg *Message) (MessageBatch, error)

Process a single message, returns either a batch of zero or more resulting messages or an error if the message could not be processed.

func (*OwnedProcessor) ProcessBatch

func (o *OwnedProcessor) ProcessBatch(ctx context.Context, batch MessageBatch) ([]MessageBatch, error)

ProcessBatch attempts to process a batch of messages, returns zero or more batches of resulting messages, or an error if the context is cancelled during execution.

However, for general processing errors unrelated to context cancellation the error is marked against individual messages with the `SetError` method and a nil error is returned by this method.

type ParsedConfig

type ParsedConfig struct {
	// contains filtered or unexported fields
}

ParsedConfig represents a plugin configuration that has been validated and parsed from a ConfigSpec, and allows plugin constructors to access configuration fields.

func (*ParsedConfig) Contains

func (p *ParsedConfig) Contains(path ...string) bool

Contains checks whether the parsed config contains a given field identified by its name.

func (*ParsedConfig) FieldAny

func (p *ParsedConfig) FieldAny(path ...string) (any, error)

FieldAny accesses a field from the parsed config by its name that can assume any value type. If the field is not found an error is returned.

func (*ParsedConfig) FieldAnyList

func (p *ParsedConfig) FieldAnyList(path ...string) ([]*ParsedConfig, error)

FieldAnyList accesses a field that is a list of any value types from the parsed config by its name and returns the value as an array of *ParsedConfig types, where each one represents an object or value in the list. Returns an error if the field is not found, or is not a list of values.

func (*ParsedConfig) FieldAnyMap

func (p *ParsedConfig) FieldAnyMap(path ...string) (map[string]*ParsedConfig, error)

FieldAnyMap accesses a field that is an object of arbitrary keys and any values from the parsed config by its name and returns a map of *ParsedConfig types, where each one represents an object or value in the map. Returns an error if the field is not found, or is not an object.

func (*ParsedConfig) FieldBackOff

func (p *ParsedConfig) FieldBackOff(path ...string) (*backoff.ExponentialBackOff, error)

FieldBackOff accesses a field from a parsed config that was defined with NewBackOffField and returns a *backoff.ExponentialBackOff, or an error if the configuration was invalid.

func (*ParsedConfig) FieldBackOffToggled

func (p *ParsedConfig) FieldBackOffToggled(path ...string) (boff *backoff.ExponentialBackOff, enabled bool, err error)

FieldBackOffToggled accesses a field from a parsed config that was defined with NewBackOffField and returns a *backoff.ExponentialBackOff and a boolean flag indicating whether retries are explicitly enabled, or an error if the configuration was invalid.

func (*ParsedConfig) FieldBatchPolicy

func (p *ParsedConfig) FieldBatchPolicy(path ...string) (conf BatchPolicy, err error)

FieldBatchPolicy accesses a field from a parsed config that was defined with NewBatchPolicyField and returns a BatchPolicy, or an error if the configuration was invalid.

func (*ParsedConfig) FieldBloblang

func (p *ParsedConfig) FieldBloblang(path ...string) (*bloblang.Executor, error)

FieldBloblang accesses a field from a parsed config that was defined with NewBloblangField and returns either a *bloblang.Executor or an error if the mapping was invalid.

func (*ParsedConfig) FieldBool

func (p *ParsedConfig) FieldBool(path ...string) (bool, error)

FieldBool accesses a bool field from the parsed config by its name and returns the value. Returns an error if the field is not found or is not a bool.

func (*ParsedConfig) FieldDuration

func (p *ParsedConfig) FieldDuration(path ...string) (time.Duration, error)

FieldDuration accesses a duration string field from the parsed config by its name. If the field is not found or is not a valid duration string an error is returned.

func (*ParsedConfig) FieldFloat

func (p *ParsedConfig) FieldFloat(path ...string) (float64, error)

FieldFloat accesses a float field from the parsed config by its name and returns the value. Returns an error if the field is not found or is not a float.

func (*ParsedConfig) FieldInput

func (p *ParsedConfig) FieldInput(path ...string) (*OwnedInput, error)

FieldInput accesses a field from a parsed config that was defined with NewInputField and returns an OwnedInput, or an error if the configuration was invalid.

func (*ParsedConfig) FieldInputList

func (p *ParsedConfig) FieldInputList(path ...string) ([]*OwnedInput, error)

FieldInputList accesses a field from a parsed config that was defined with NewInputListField and returns a slice of OwnedInput, or an error if the configuration was invalid.

func (*ParsedConfig) FieldInt

func (p *ParsedConfig) FieldInt(path ...string) (int, error)

FieldInt accesses an int field from the parsed config by its name and returns the value. Returns an error if the field is not found or is not an int.

func (*ParsedConfig) FieldIntList

func (p *ParsedConfig) FieldIntList(path ...string) ([]int, error)

FieldIntList accesses a field that is a list of integers from the parsed config by its name and returns the value. Returns an error if the field is not found, or is not a list of integers.

func (*ParsedConfig) FieldIntMap

func (p *ParsedConfig) FieldIntMap(path ...string) (map[string]int, error)

FieldIntMap accesses a field that is an object of arbitrary keys and integer values from the parsed config by its name and returns the value. Returns an error if the field is not found, or is not an object of integers.

func (*ParsedConfig) FieldInterpolatedString

func (p *ParsedConfig) FieldInterpolatedString(path ...string) (*InterpolatedString, error)

FieldInterpolatedString accesses a field from a parsed config that was defined with NewInterpolatedStringField and returns either an *InterpolatedString or an error if the string was invalid.

func (*ParsedConfig) FieldInterpolatedStringList

func (p *ParsedConfig) FieldInterpolatedStringList(path ...string) ([]*InterpolatedString, error)

FieldInterpolatedStringList accesses a field that is a list of interpolated string values from the parsed config.

Returns an error if the field is not found, or is not a list of interpolated strings.

func (*ParsedConfig) FieldInterpolatedStringMap

func (p *ParsedConfig) FieldInterpolatedStringMap(path ...string) (map[string]*InterpolatedString, error)

FieldInterpolatedStringMap accesses a field that is an object of arbitrary keys and interpolated string values from the parsed config by its name and returns the value.

Returns an error if the field is not found, or is not an object of interpolated strings.

func (*ParsedConfig) FieldMetadataFilter

func (p *ParsedConfig) FieldMetadataFilter(path ...string) (f *MetadataFilter, err error)

FieldMetadataFilter accesses a field from a parsed config that was defined with NewMetadataFilterField and returns a MetadataFilter, or an error if the configuration was invalid.

func (*ParsedConfig) FieldObjectList

func (p *ParsedConfig) FieldObjectList(path ...string) ([]*ParsedConfig, error)

FieldObjectList accesses a field that is a list of objects from the parsed config by its name and returns the value as an array of *ParsedConfig types, where each one represents an object in the list. Returns an error if the field is not found, or is not a list of objects.

func (*ParsedConfig) FieldOutput

func (p *ParsedConfig) FieldOutput(path ...string) (*OwnedOutput, error)

FieldOutput accesses a field from a parsed config that was defined with NewOutputField and returns an OwnedOutput, or an error if the configuration was invalid.

func (*ParsedConfig) FieldOutputList

func (p *ParsedConfig) FieldOutputList(path ...string) ([]*OwnedOutput, error)

FieldOutputList accesses a field from a parsed config that was defined with NewOutputListField and returns a slice of OwnedOutput, or an error if the configuration was invalid.

func (*ParsedConfig) FieldProcessor

func (p *ParsedConfig) FieldProcessor(path ...string) (*OwnedProcessor, error)

FieldProcessor accesses a field from a parsed config that was defined with NewProcessorField and returns an OwnedProcessor, or an error if the configuration was invalid.

func (*ParsedConfig) FieldProcessorList

func (p *ParsedConfig) FieldProcessorList(path ...string) ([]*OwnedProcessor, error)

FieldProcessorList accesses a field from a parsed config that was defined with NewProcessorListField and returns a slice of OwnedProcessor, or an error if the configuration was invalid.

func (*ParsedConfig) FieldString

func (p *ParsedConfig) FieldString(path ...string) (string, error)

FieldString accesses a string field from the parsed config by its name. If the field is not found or is not a string an error is returned.

func (*ParsedConfig) FieldStringList

func (p *ParsedConfig) FieldStringList(path ...string) ([]string, error)

FieldStringList accesses a field that is a list of strings from the parsed config by its name and returns the value. Returns an error if the field is not found, or is not a list of strings.

func (*ParsedConfig) FieldStringMap

func (p *ParsedConfig) FieldStringMap(path ...string) (map[string]string, error)

FieldStringMap accesses a field that is an object of arbitrary keys and string values from the parsed config by its name and returns the value. Returns an error if the field is not found, or is not an object of strings.

func (*ParsedConfig) FieldTLS

func (p *ParsedConfig) FieldTLS(path ...string) (*tls.Config, error)

FieldTLS accesses a field from a parsed config that was defined with NewTLSField and returns a *tls.Config, or an error if the configuration was invalid.

func (*ParsedConfig) FieldTLSToggled

func (p *ParsedConfig) FieldTLSToggled(path ...string) (tconf *tls.Config, enabled bool, err error)

FieldTLSToggled accesses a field from a parsed config that was defined with NewTLSToggledField and returns a *tls.Config and a boolean flag indicating whether tls is explicitly enabled, or an error if the configuration was invalid.

func (*ParsedConfig) FieldURL

func (p *ParsedConfig) FieldURL(path ...string) (*url.URL, error)

FieldURL accesses a field from a parsed config that was defined with NewURLField and returns either a *url.URL or an error if the string was invalid.

func (*ParsedConfig) Namespace

func (p *ParsedConfig) Namespace(path ...string) *ParsedConfig

Namespace returns a version of the parsed config at a given field namespace. This is useful for extracting multiple fields under the same grouping.

type PrintLogger

type PrintLogger interface {
	Printf(format string, v ...any)
	Println(v ...any)
}

PrintLogger is a simple Print based interface implemented by custom loggers.

type Processor

type Processor interface {
	// Process a message into one or more resulting messages, or return an error
	// if the message could not be processed. If zero messages are returned and
	// the error is nil then the message is filtered.
	//
	// When an error is returned the input message will continue down the
	// pipeline but will be marked with the error with *message.SetError, and
	// metrics and logs will be emitted. The failed message can then be handled
	// with the patterns outlined in https://www.benthos.dev/docs/configuration/error_handling.
	//
	// The Message types returned MUST be derived from the provided message, and
	// CANNOT be custom implementations of Message. In order to copy the
	// provided message use the Copy method.
	Process(context.Context, *Message) (MessageBatch, error)

	Closer
}

Processor is a Benthos processor implementation that works against single messages.

type ProcessorConstructor

type ProcessorConstructor func(conf *ParsedConfig, mgr *Resources) (Processor, error)

ProcessorConstructor is a func that's provided a configuration type and access to a service manager and must return an instantiation of a processor based on the config, or an error.

type RateLimit

type RateLimit interface {
	// Access the rate limited resource. Returns a duration or an error if the
	// rate limit check fails. The returned duration is either zero (meaning the
	// resource may be accessed) or a reasonable length of time to wait before
	// requesting again.
	Access(context.Context) (time.Duration, error)

	Closer
}

RateLimit is an interface implemented by Benthos rate limits.

type RateLimitConstructor

type RateLimitConstructor func(conf *ParsedConfig, mgr *Resources) (RateLimit, error)

RateLimitConstructor is a func that's provided a configuration type and access to a service manager and must return an instantiation of a rate limit based on the config, or an error.

type ResourceInput

type ResourceInput struct {
	// contains filtered or unexported fields
}

ResourceInput provides access to an input resource.

func (*ResourceInput) ReadBatch

func (r *ResourceInput) ReadBatch(ctx context.Context) (MessageBatch, AckFunc, error)

ReadBatch attempts to read a message batch from the input, along with a function to be called once the entire batch can be either acked (successfully sent or intentionally filtered) or nacked (failed to be processed or dispatched to the output).

If this method returns ErrEndOfInput then that indicates that the input has finished and will no longer yield new messages.

type ResourceOutput

type ResourceOutput struct {
	// contains filtered or unexported fields
}

ResourceOutput provides access to an output resource.

func (*ResourceOutput) Write

func (o *ResourceOutput) Write(ctx context.Context, m *Message) error

Write a message to the output, or return an error either if delivery is not possible or the context is cancelled.

func (*ResourceOutput) WriteBatch

func (o *ResourceOutput) WriteBatch(ctx context.Context, b MessageBatch) error

WriteBatch attempts to write a message batch to the output, and returns an error either if delivery is not possible or the context is cancelled.

type Resources

type Resources struct {
	// contains filtered or unexported fields
}

Resources provides access to service-wide resources.

func MockResources

func MockResources(opts ...MockResourcesOptFn) *Resources

MockResources returns an instantiation of a resources struct that provides valid but ineffective methods and observability components. It is possible to instantiate this with mocked (in-memory) cache and rate limit types for testing purposes.

func (*Resources) AccessCache

func (r *Resources) AccessCache(ctx context.Context, name string, fn func(c Cache)) error

AccessCache attempts to access a cache resource by name. This action can block if CRUD operations are being actively performed on the resource.

func (*Resources) AccessInput

func (r *Resources) AccessInput(ctx context.Context, name string, fn func(i *ResourceInput)) error

AccessInput attempts to access an input resource by name.

func (*Resources) AccessOutput

func (r *Resources) AccessOutput(ctx context.Context, name string, fn func(o *ResourceOutput)) error

AccessOutput attempts to access an output resource by name. This action can block if CRUD operations are being actively performed on the resource.

func (*Resources) AccessRateLimit

func (r *Resources) AccessRateLimit(ctx context.Context, name string, fn func(r RateLimit)) error

AccessRateLimit attempts to access a rate limit resource by name. This action can block if CRUD operations are being actively performed on the resource.

func (*Resources) FS

func (r *Resources) FS() *FS

FS returns an fs.FS implementation that provides isolation or customised behaviour for components that access the filesystem. For example, this might be used to tally files being accessed by components for observability purposes, or to customise where relative paths are resolved from.

Components should use this instead of accessing the os directly. However, the default behaviour of an environment FS is to access the OS from the directory the process is running from, which matches calling the os package directly.

func (*Resources) HasCache

func (r *Resources) HasCache(name string) bool

HasCache confirms whether a cache with a given name has been registered as a resource. This method is useful during component initialisation as it is defensive against ordering.

func (*Resources) HasInput

func (r *Resources) HasInput(name string) bool

HasInput confirms whether an input with a given name has been registered as a resource. This method is useful during component initialisation as it is defensive against ordering.

func (*Resources) HasOutput

func (r *Resources) HasOutput(name string) bool

HasOutput confirms whether an output with a given name has been registered as a resource. This method is useful during component initialisation as it is defensive against ordering.

func (*Resources) HasRateLimit

func (r *Resources) HasRateLimit(name string) bool

HasRateLimit confirms whether a rate limit with a given name has been registered as a resource. This method is useful during component initialisation as it is defensive against ordering.

func (*Resources) Label

func (r *Resources) Label() string

Label returns a label that identifies the component instantiation. This could be an explicit label set in config, or is otherwise a generated label based on the position of the component within a config.

func (*Resources) Logger

func (r *Resources) Logger() *Logger

Logger returns a logger preset with context about the component the resources were provided to.

func (*Resources) Metrics

func (r *Resources) Metrics() *Metrics

Metrics returns a mechanism for creating custom metrics.

func (*Resources) OtelTracer

func (r *Resources) OtelTracer() trace.TracerProvider

OtelTracer returns an open telemetry tracer provider that can be used to create new tracers.

Experimental: This type signature is experimental and therefore subject to change outside of major version releases.

func (*Resources) XUnwrapper

func (r *Resources) XUnwrapper() any

XUnwrapper is for internal use only, do not use this.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream executes a full Benthos stream and provides methods for performing status checks, terminating the stream, and blocking until the stream ends.

func (*Stream) Run

func (s *Stream) Run(ctx context.Context) (err error)

Run attempts to start the stream pipeline and blocks until either the stream has gracefully come to a stop, or the provided context is cancelled.

func (*Stream) Stop

func (s *Stream) Stop(ctx context.Context) (err error)

Stop attempts to close the stream gracefully, but if the context is closed or draws near to a deadline the attempt becomes less graceful.

An ungraceful shutdown increases the likelihood of processing duplicate messages on the next start up, but never results in dropped messages as long as the input source supports at-least-once delivery.

func (*Stream) StopWithin

func (s *Stream) StopWithin(timeout time.Duration) error

StopWithin attempts to close the stream within the specified timeout period. Initially the attempt is graceful, but as the timeout draws close the attempt becomes progressively less graceful.

An ungraceful shutdown increases the likelihood of processing duplicate messages on the next start up, but never results in dropped messages as long as the input source supports at-least-once delivery.

type StreamBuilder

type StreamBuilder struct {
	// contains filtered or unexported fields
}

StreamBuilder provides methods for building a Benthos stream configuration. When parsing Benthos configs this builder follows the schema and field defaults of a standard Benthos configuration. Environment variable interpolations are also parsed and resolved the same as regular configs.

Streams built with a stream builder have the HTTP server for exposing metrics and ready checks disabled by default, which is the only deviation away from a standard Benthos default configuration. In order to enable the server set the configuration field `http.enabled` to `true` explicitly, or use `SetHTTPMux` in order to provide an explicit HTTP multiplexer for registering those endpoints.

func NewStreamBuilder

func NewStreamBuilder() *StreamBuilder

NewStreamBuilder creates a new StreamBuilder.

func (*StreamBuilder) AddBatchConsumerFunc

func (s *StreamBuilder) AddBatchConsumerFunc(fn MessageBatchHandlerFunc) error

AddBatchConsumerFunc adds an output to the builder that executes a closure function argument for each message batch. If more than one output configuration is added they will automatically be composed within a fan out broker when the pipeline is built.

The provided MessageBatchHandlerFunc may be called from any number of goroutines, and therefore it is recommended to implement some form of throttling or mutex locking in cases where the call is non-blocking.

Only one consumer can be added to a stream builder, and subsequent calls will return an error.

Message batches must be created by upstream components (inputs, buffers, etc.), otherwise each message batch received by this consumer will contain only a single message.

func (*StreamBuilder) AddBatchProducerFunc

func (s *StreamBuilder) AddBatchProducerFunc() (MessageBatchHandlerFunc, error)

AddBatchProducerFunc adds an input to the builder that allows you to write message batches directly into the stream with a closure function. If any other input has or will be added to the stream builder they will be automatically composed within a broker when the pipeline is built.

The returned MessageBatchHandlerFunc can be called concurrently from any number of goroutines, and each call will block until all messages within the batch have been successfully delivered downstream, have been rejected (or otherwise could not be delivered), or the context is cancelled.

Only one producer func can be added to a stream builder, and subsequent calls will return an error.

func (*StreamBuilder) AddCacheYAML

func (s *StreamBuilder) AddCacheYAML(conf string) error

AddCacheYAML parses a cache YAML configuration and adds it to the builder as a resource.

func (*StreamBuilder) AddConsumerFunc

func (s *StreamBuilder) AddConsumerFunc(fn MessageHandlerFunc) error

AddConsumerFunc adds an output to the builder that executes a closure function argument for each message. If more than one output configuration is added they will automatically be composed within a fan out broker when the pipeline is built.

The provided MessageHandlerFunc may be called from any number of goroutines, and therefore it is recommended to implement some form of throttling or mutex locking in cases where the call is non-blocking.

Only one consumer can be added to a stream builder, and subsequent calls will return an error.

func (*StreamBuilder) AddInputYAML

func (s *StreamBuilder) AddInputYAML(conf string) error

AddInputYAML parses an input YAML configuration and adds it to the builder. If more than one input configuration is added they will automatically be composed within a broker when the pipeline is built.

func (*StreamBuilder) AddOutputYAML

func (s *StreamBuilder) AddOutputYAML(conf string) error

AddOutputYAML parses an output YAML configuration and adds it to the builder. If more than one output configuration is added they will automatically be composed within a fan out broker when the pipeline is built.

func (*StreamBuilder) AddProcessorYAML

func (s *StreamBuilder) AddProcessorYAML(conf string) error

AddProcessorYAML parses a processor YAML configuration and adds it to the builder to be executed within the pipeline.processors section, after all prior added processor configs.

func (*StreamBuilder) AddProducerFunc

func (s *StreamBuilder) AddProducerFunc() (MessageHandlerFunc, error)

AddProducerFunc adds an input to the builder that allows you to write messages directly into the stream with a closure function. If any other input has or will be added to the stream builder they will be automatically composed within a broker when the pipeline is built.

The returned MessageHandlerFunc can be called concurrently from any number of goroutines, and each call will block until the message has been successfully delivered downstream, has been rejected (or otherwise could not be delivered), or the context is cancelled.

Only one producer func can be added to a stream builder, and subsequent calls will return an error.

func (*StreamBuilder) AddRateLimitYAML

func (s *StreamBuilder) AddRateLimitYAML(conf string) error

AddRateLimitYAML parses a rate limit YAML configuration and adds it to the builder as a resource.

func (*StreamBuilder) AddResourcesYAML

func (s *StreamBuilder) AddResourcesYAML(conf string) error

AddResourcesYAML parses resource configurations and adds them to the config.

func (*StreamBuilder) AsYAML

func (s *StreamBuilder) AsYAML() (string, error)

AsYAML prints a YAML representation of the stream config as it has been currently built.

func (*StreamBuilder) Build

func (s *StreamBuilder) Build() (*Stream, error)

Build a Benthos stream pipeline according to the components specified by this stream builder.

func (*StreamBuilder) BuildTraced

func (s *StreamBuilder) BuildTraced() (*Stream, *TracingSummary, error)

BuildTraced creates a Benthos stream pipeline according to the components specified by this stream builder, where each major component (input, processor, output) is wrapped with a tracing module that, during the lifetime of the stream, aggregates tracing events into the returned *TracingSummary. Once the stream has ended the TracingSummary can be queried for events that occurred.

Experimental: The behaviour of this method could change outside of major version releases.

func (*StreamBuilder) DisableLinting

func (s *StreamBuilder) DisableLinting()

DisableLinting configures the stream builder to no longer lint YAML configs, allowing you to add snippets of config to the builder without failing on linting rules.

func (*StreamBuilder) SetBufferYAML

func (s *StreamBuilder) SetBufferYAML(conf string) error

SetBufferYAML parses a buffer YAML configuration and sets it to the builder to be placed between the input and the pipeline (processors) sections. This config will replace any prior configured buffer.

func (*StreamBuilder) SetEnvVarLookupFunc

func (s *StreamBuilder) SetEnvVarLookupFunc(fn func(string) (string, bool))

SetEnvVarLookupFunc changes the behaviour of the stream builder so that the values of environment variable interpolations (of the form `${FOO}`) are obtained via a provided function rather than the default of os.LookupEnv.

func (*StreamBuilder) SetFields

func (s *StreamBuilder) SetFields(pathValues ...any) error

SetFields modifies the config by setting one or more fields identified by a dot path to a value. The argument must be a variadic list of pairs, where the first element is a string containing the target field dot path, and the second element is a typed value to set the field to.

func (*StreamBuilder) SetHTTPMux

func (s *StreamBuilder) SetHTTPMux(m HTTPMultiplexer)

SetHTTPMux sets an HTTP multiplexer to be used by stream components when registering endpoints instead of a new server spawned following the `http` fields of a Benthos config.

func (*StreamBuilder) SetLoggerYAML

func (s *StreamBuilder) SetLoggerYAML(conf string) error

SetLoggerYAML parses a logger YAML configuration and adds it to the builder such that all stream components emit logs through it.

func (*StreamBuilder) SetMetricsYAML

func (s *StreamBuilder) SetMetricsYAML(conf string) error

SetMetricsYAML parses a metrics YAML configuration and adds it to the builder such that all stream components emit metrics through it.

func (*StreamBuilder) SetPrintLogger

func (s *StreamBuilder) SetPrintLogger(l PrintLogger)

SetPrintLogger sets a custom logger supporting a simple Print based interface to be used by stream components. This custom logger will override any logging fields set via config.

func (*StreamBuilder) SetThreads

func (s *StreamBuilder) SetThreads(n int)

SetThreads configures the number of pipeline processor threads. By default the number will be zero, which means the thread count will match the number of logical CPUs on the machine.

func (*StreamBuilder) SetTracerYAML

func (s *StreamBuilder) SetTracerYAML(conf string) error

SetTracerYAML parses a tracer YAML configuration and adds it to the builder such that all stream components emit tracing spans through it.

func (*StreamBuilder) SetYAML

func (s *StreamBuilder) SetYAML(conf string) error

SetYAML parses a full Benthos config and uses it to configure the builder. If any inputs, processors, outputs, resources, etc, have previously been added to the builder they will be overridden by this new config.

func (*StreamBuilder) WalkComponents

func (s *StreamBuilder) WalkComponents(fn func(w *WalkedComponent) error) error

WalkComponents walks the Benthos configuration as it is currently built and for each component type (input, processor, output, etc) calls a provided function with a struct containing information about the component.

This can be useful for taking an inventory of the contents of a config.

type TracingEvent

type TracingEvent struct {
	Type    TracingEventType
	Content string
	Meta    map[string]any
}

TracingEvent represents a single event that occurred within the stream.

Experimental: This type may change outside of major version releases.

type TracingEventType

type TracingEventType string

TracingEventType describes the type of tracing event a component might experience during a config run.

Experimental: This type may change outside of major version releases.

var (
	// Note: must match up with ./internal/bundle/tracing/events.go.
	TracingEventProduce TracingEventType = "PRODUCE"
	TracingEventConsume TracingEventType = "CONSUME"
	TracingEventDelete  TracingEventType = "DELETE"
	TracingEventError   TracingEventType = "ERROR"
	TracingEventUnknown TracingEventType = "UNKNOWN"
)

Various tracing event types.

Experimental: This type may change outside of major version releases.

type TracingSummary

type TracingSummary struct {
	// contains filtered or unexported fields
}

TracingSummary is a high level description of all traced events. When tracing a stream this should only be queried once the stream has ended.

Experimental: This type may change outside of major version releases.

func (*TracingSummary) InputEvents

func (s *TracingSummary) InputEvents() map[string][]TracingEvent

InputEvents returns a map of input labels to events traced during the execution of a stream pipeline.

Experimental: This method may change outside of major version releases.

func (*TracingSummary) OutputEvents

func (s *TracingSummary) OutputEvents() map[string][]TracingEvent

OutputEvents returns a map of output labels to events traced during the execution of a stream pipeline.

Experimental: This method may change outside of major version releases.

func (*TracingSummary) ProcessorEvents

func (s *TracingSummary) ProcessorEvents() map[string][]TracingEvent

ProcessorEvents returns a map of processor labels to events traced during the execution of a stream pipeline.

Experimental: This method may change outside of major version releases.

func (*TracingSummary) TotalInput

func (s *TracingSummary) TotalInput() uint64

TotalInput returns the total traced input messages received.

Experimental: This method may change outside of major version releases.

func (*TracingSummary) TotalOutput

func (s *TracingSummary) TotalOutput() uint64

TotalOutput returns the total traced output messages received.

Experimental: This method may change outside of major version releases.

func (*TracingSummary) TotalProcessorErrors

func (s *TracingSummary) TotalProcessorErrors() uint64

TotalProcessorErrors returns the total number of traced processor errors that occurred.

Experimental: This method may change outside of major version releases.

type WalkedComponent

type WalkedComponent struct {
	ComponentType string
	Name          string
	Label         string
	// contains filtered or unexported fields
}

WalkedComponent is a struct containing information about a component yielded via the WalkComponents method.

func (*WalkedComponent) ConfigYAML

func (w *WalkedComponent) ConfigYAML() string

ConfigYAML returns the configuration of a walked component in YAML form.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL