service

package
v4.29.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 10, 2024 License: MIT Imports: 80 Imported by: 193

Documentation

Overview

Package service provides a high level API for registering custom plugin components and executing either a standard Benthos CLI, or programmatically building isolated pipelines with a StreamBuilder API.

For a video guide on Benthos plugins check out: https://youtu.be/uH6mKw-Ly0g And an example repo containing component plugins and tests can be found at: https://github.com/benthosdev/benthos-plugin-example

In order to add custom Bloblang functions and methods use the ./public/bloblang package.

Example (BufferPlugin)

This example demonstrates how to create a buffer plugin. Buffers are an advanced component type that most plugin authors aren't likely to require.

package main

import (
	"context"
	"sync"

	"github.com/redpanda-data/benthos/v4/public/service"

	// Import only pure Benthos components, switch with `components/all` for all
	// standard components.
	_ "github.com/redpanda-data/benthos/v4/public/components/pure"
)

// memoryBuffer is a batch buffer that holds written batches in a buffered
// channel until they are read.
type memoryBuffer struct {
	// messages carries batches from WriteBatch to ReadBatch.
	messages chan service.MessageBatch

	// endOfInputChan is closed by EndOfInput to tell ReadBatch that no
	// further batches will be written.
	endOfInputChan chan struct{}

	// closeOnce ensures endOfInputChan is closed at most once.
	closeOnce sync.Once
}

// newMemoryBuffer creates a memoryBuffer whose channel can hold n batches
// before WriteBatch blocks.
func newMemoryBuffer(n int) *memoryBuffer {
	buf := new(memoryBuffer)
	buf.messages = make(chan service.MessageBatch, n)
	buf.endOfInputChan = make(chan struct{})
	return buf
}

// WriteBatch queues the batch, giving up with the context error if ctx is
// done first. Once queued, the batch is acknowledged straight away.
func (m *memoryBuffer) WriteBatch(ctx context.Context, batch service.MessageBatch, aFn service.AckFunc) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case m.messages <- batch:
	}
	// Acknowledging as soon as the batch is queued weakens delivery
	// guarantees: the batch has not reached any output yet.
	return aFn(ctx, nil)
}

// yoloIgnoreNacks is the ack function handed out by ReadBatch. It ignores its
// arguments, so nacked batches are dropped rather than retried.
func yoloIgnoreNacks(context.Context, error) error {
	return nil
}

// ReadBatch returns the next queued batch. It returns the context error if
// ctx is done, and service.ErrEndOfBuffer once the end of input has been
// signalled and no batch is waiting.
func (m *memoryBuffer) ReadBatch(ctx context.Context) (service.MessageBatch, service.AckFunc, error) {
	select {
	case <-ctx.Done():
		return nil, nil, ctx.Err()
	case batch := <-m.messages:
		return batch, yoloIgnoreNacks, nil
	case <-m.endOfInputChan:
	}

	// The input has ended: hand out a batch if one is still queued, otherwise
	// report that the buffer is finished.
	select {
	case batch := <-m.messages:
		return batch, yoloIgnoreNacks, nil
	default:
	}
	return nil, nil, service.ErrEndOfBuffer
}

// EndOfInput signals that no more batches will be written. Calling it more
// than once is safe.
func (m *memoryBuffer) EndOfInput() {
	m.closeOnce.Do(func() { close(m.endOfInputChan) })
}

// Close has nothing to release and always returns nil.
func (m *memoryBuffer) Close(context.Context) error {
	return nil
}

// This example demonstrates how to create a buffer plugin. Buffers are an
// advanced component type that most plugin authors aren't likely to require.
func main() {
	bufferSpec := service.NewConfigSpec().
		Summary("Creates a lame memory buffer that loses data on forced restarts or service crashes.").
		Field(service.NewIntField("max_batches").Default(100))

	// The constructor reads the configured capacity and builds the buffer.
	newBuffer := func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchBuffer, error) {
		maxBatches, err := conf.FieldInt("max_batches")
		if err != nil {
			return nil, err
		}
		return newMemoryBuffer(maxBatches), nil
	}

	if err := service.RegisterBatchBuffer("lame_memory", bufferSpec, newBuffer); err != nil {
		panic(err)
	}

	// And then execute Benthos with:
	// service.RunCLI(context.Background())
}
Output:

Example (CachePlugin)

This example demonstrates how to create a cache plugin, where the implementation of the cache (type `LossyCache`) also contains fields that should be parsed within the Benthos config.

package main

import (
	"context"
	"time"

	"github.com/redpanda-data/benthos/v4/public/service"

	// Import only pure Benthos components, switch with `components/all` for all
	// standard components.
	_ "github.com/redpanda-data/benthos/v4/public/components/pure"
)

// LossyCache is a terrible cache example and silently drops items when the
// capacity is reached. It also doesn't respect TTLs.
type LossyCache struct {
	// capacity is the item count at which new writes start being dropped.
	capacity int

	// mDropped counts the writes that were dropped.
	mDropped *service.MetricCounter

	// items holds the cached values by key.
	items map[string][]byte
}

// storeOrDrop writes value under key while the cache is below capacity.
// Otherwise it discards the write and increments the dropped counter.
func (l *LossyCache) storeOrDrop(key string, value []byte) {
	if len(l.items) < l.capacity {
		l.items[key] = value
		return
	}
	// Dropped, whoopsie!
	l.mDropped.Incr(1)
}

// Get returns the value stored under key, or service.ErrKeyNotFound when the
// key is absent.
func (l *LossyCache) Get(ctx context.Context, key string) ([]byte, error) {
	value, found := l.items[key]
	if !found {
		return nil, service.ErrKeyNotFound
	}
	return value, nil
}

// Set stores value under key, silently dropping it when the cache is at
// capacity. The ttl is ignored. It always returns nil.
func (l *LossyCache) Set(ctx context.Context, key string, value []byte, ttl *time.Duration) error {
	l.storeOrDrop(key, value)
	return nil
}

// Add stores value under key only if the key is not already present,
// returning service.ErrKeyAlreadyExists otherwise. As with Set, the write is
// silently dropped at capacity and the ttl is ignored.
func (l *LossyCache) Add(ctx context.Context, key string, value []byte, ttl *time.Duration) error {
	if _, taken := l.items[key]; taken {
		return service.ErrKeyAlreadyExists
	}
	l.storeOrDrop(key, value)
	return nil
}

// Delete removes key from the cache. Removing an absent key is not an error.
func (l *LossyCache) Delete(ctx context.Context, key string) error {
	delete(l.items, key)
	return nil
}

// Close has nothing to release and always returns nil.
func (l *LossyCache) Close(ctx context.Context) error {
	return nil
}

// This example demonstrates how to create a cache plugin, where the
// implementation of the cache (type `LossyCache`) also contains fields that
// should be parsed within the Benthos config.
func main() {
	cacheSpec := service.NewConfigSpec().
		Summary("Creates a terrible cache with a fixed capacity.").
		Field(service.NewIntField("capacity").Default(100))

	// The constructor reads the capacity from the parsed config and wires a
	// metrics counter from the provided resources into the cache.
	newCache := func(conf *service.ParsedConfig, mgr *service.Resources) (service.Cache, error) {
		size, err := conf.FieldInt("capacity")
		if err != nil {
			return nil, err
		}

		cache := &LossyCache{
			capacity: size,
			mDropped: mgr.Metrics().NewCounter("dropped_just_cus"),
			items:    make(map[string][]byte, size),
		}
		return cache, nil
	}

	if err := service.RegisterCache("lossy", cacheSpec, newCache); err != nil {
		panic(err)
	}

	// And then execute Benthos with:
	// service.RunCLI(context.Background())
}
Output:

Example (InputPlugin)

This example demonstrates how to create an input plugin, which is configured by providing a struct containing the fields to be parsed from within the Benthos configuration.

package main

import (
	"context"
	"math/rand"

	"github.com/redpanda-data/benthos/v4/public/service"

	// Import only pure Benthos components, switch with `components/all` for all
	// standard components.
	_ "github.com/redpanda-data/benthos/v4/public/components/pure"
)

// GibberishInput is an input that produces messages of random bytes.
type GibberishInput struct {
	// length is the size in bytes of each generated message.
	length int
}

// Connect has nothing to establish and always returns nil.
func (g *GibberishInput) Connect(ctx context.Context) error {
	return nil
}

// Read builds a message of g.length random bytes, each in the range 32 to
// 125, and returns it with an ack function that does nothing.
func (g *GibberishInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) {
	payload := make([]byte, g.length)
	for i := 0; i < len(payload); i++ {
		payload[i] = byte(rand.Int()%94 + 32)
	}

	// A nack (when the error is non-nil) is handled automatically when we
	// construct using service.AutoRetryNacks, so we don't need to handle
	// nacks here.
	ack := func(context.Context, error) error {
		return nil
	}
	return service.NewMessage(payload), ack, nil
}

// Close has nothing to release and always returns nil.
func (g *GibberishInput) Close(ctx context.Context) error {
	return nil
}

// This example demonstrates how to create an input plugin, which is configured
// by providing a struct containing the fields to be parsed from within the
// Benthos configuration.
func main() {
	inputSpec := service.NewConfigSpec().
		Summary("Creates a load of gibberish, putting us all out of work.").
		Field(service.NewIntField("length").Default(100))

	// The input is wrapped with AutoRetryNacks so that it does not need its
	// own nack handling.
	newInput := func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) {
		size, err := conf.FieldInt("length")
		if err != nil {
			return nil, err
		}
		return service.AutoRetryNacks(&GibberishInput{length: size}), nil
	}

	if err := service.RegisterInput("gibberish", inputSpec, newInput); err != nil {
		panic(err)
	}

	// And then execute Benthos with:
	// service.RunCLI(context.Background())
}
Output:

Example (OutputBatchedPlugin)

This example demonstrates how to create a batched output plugin, which allows us to specify a batching mechanism and implement an interface that writes a batch of messages in one call.

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/redpanda-data/benthos/v4/public/service"

	// Import only pure Benthos components, switch with `components/all` for all
	// standard components.
	_ "github.com/redpanda-data/benthos/v4/public/components/pure"
)

// batchOfJSONWriter is a batch output that prints each batch to stdout as a
// single JSON object.
type batchOfJSONWriter struct{}

// Connect has nothing to establish and always returns nil.
func (b *batchOfJSONWriter) Connect(ctx context.Context) error {
	return nil
}

// WriteBatch prints one line of JSON holding the batch size under "count" and
// the structured form of every message under "objects". It returns the first
// error hit while structuring a message or marshalling the result.
func (b *batchOfJSONWriter) WriteBatch(ctx context.Context, msgs service.MessageBatch) error {
	// Left as a nil slice on purpose: an empty batch encodes "objects" as
	// null.
	var objects []any
	for i := range msgs {
		structured, err := msgs[i].AsStructured()
		if err != nil {
			return err
		}
		objects = append(objects, structured)
	}

	summary := map[string]any{
		"count":   len(msgs),
		"objects": objects,
	}
	encoded, err := json.Marshal(summary)
	if err != nil {
		return err
	}

	fmt.Println(string(encoded))
	return nil
}

// Close has nothing to release and always returns nil.
func (b *batchOfJSONWriter) Close(ctx context.Context) error {
	return nil
}

// This example demonstrates how to create a batched output plugin, which allows
// us to specify a batching mechanism and implement an interface that writes a
// batch of messages in one call.
func main() {
	// The full Benthos configuration of the stream.
	const streamConfig = `
input:
  generate:
    count: 5
    interval: 1ms
    mapping: |
      root.id = count("batched output example messages")
      root.text = "some stuff"

output:
  batched_json_stdout:
    batching:
      count: 5

logger:
  level: off
`

	batchedSpec := service.NewConfigSpec().
		Field(service.NewBatchPolicyField("batching"))

	// The constructor returns the output along with the batch policy parsed
	// from the "batching" field and a max in flight of one.
	newOutput := func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchOutput, service.BatchPolicy, int, error) {
		policy, err := conf.FieldBatchPolicy("batching")
		if err != nil {
			return nil, policy, 0, err
		}
		return &batchOfJSONWriter{}, policy, 1, nil
	}

	// Register our new output, whose config schema is a single batching
	// policy field.
	if err := service.RegisterBatchOutput("batched_json_stdout", batchedSpec, newOutput); err != nil {
		panic(err)
	}

	// Use the stream builder API to create a Benthos stream that uses our new
	// output type.
	builder := service.NewStreamBuilder()
	if err := builder.SetYAML(streamConfig); err != nil {
		panic(err)
	}

	// Build a stream with our configured components.
	stream, err := builder.Build()
	if err != nil {
		panic(err)
	}

	// And run it, blocking until it gracefully terminates once the generate
	// input has generated its messages and they have flushed through the
	// stream.
	if err := stream.Run(context.Background()); err != nil {
		panic(err)
	}
}
Output:

{"count":5,"objects":[{"id":1,"text":"some stuff"},{"id":2,"text":"some stuff"},{"id":3,"text":"some stuff"},{"id":4,"text":"some stuff"},{"id":5,"text":"some stuff"}]}
Example (OutputPlugin)

This example demonstrates how to create an output plugin. This example is for an implementation that does not require any configuration parameters, and therefore doesn't define any within the configuration specification.

package main

import (
	"context"
	"fmt"

	"github.com/redpanda-data/benthos/v4/public/service"

	// Import only pure Benthos components, switch with `components/all` for all
	// standard components.
	_ "github.com/redpanda-data/benthos/v4/public/components/pure"
)

// BlueOutput is an output that prints each message to stdout wrapped in ANSI
// colour escape sequences.
type BlueOutput struct{}

// Connect has nothing to establish and always returns nil.
func (b *BlueOutput) Connect(ctx context.Context) error {
	return nil
}

// Write prints the message bytes on their own line between the escape
// sequences "\033[01;34m" and "\033[m". It returns any error from reading
// the message bytes.
func (b *BlueOutput) Write(ctx context.Context, msg *service.Message) error {
	payload, err := msg.AsBytes()
	if err == nil {
		fmt.Printf("\033[01;34m%s\033[m\n", payload)
	}
	return err
}

// Close has nothing to release and always returns nil.
func (b *BlueOutput) Close(ctx context.Context) error {
	return nil
}

// This example demonstrates how to create an output plugin. This example is for
// an implementation that does not require any configuration parameters, and
// therefore doesn't define any within the configuration specification.
func main() {
	// The full Benthos configuration of the stream.
	const streamConfig = `
input:
  generate:
    count: 1
    interval: 1ms
    mapping: 'root = "hello world"'

output:
  blue_stdout: {}

logger:
  level: off
`

	// The constructor needs nothing from the config: it returns the output
	// with a max in flight of one.
	newOutput := func(conf *service.ParsedConfig, mgr *service.Resources) (service.Output, int, error) {
		return &BlueOutput{}, 1, nil
	}

	// Register our new output, which doesn't require a config schema.
	if err := service.RegisterOutput("blue_stdout", service.NewConfigSpec(), newOutput); err != nil {
		panic(err)
	}

	// Use the stream builder API to create a Benthos stream that uses our new
	// output type.
	builder := service.NewStreamBuilder()
	if err := builder.SetYAML(streamConfig); err != nil {
		panic(err)
	}

	// Build a stream with our configured components.
	stream, err := builder.Build()
	if err != nil {
		panic(err)
	}

	// And run it, blocking until it gracefully terminates once the generate
	// input has generated a message and it has flushed through the stream.
	if err := stream.Run(context.Background()); err != nil {
		panic(err)
	}
}
Output:

\033[01;34mhello world\033[m
Example (ProcessorPlugin)

This example demonstrates how to create a processor plugin. This example is for an implementation that does not require any configuration parameters, and therefore doesn't define any within the configuration specification.

package main

import (
	"bytes"
	"context"

	"github.com/redpanda-data/benthos/v4/public/service"

	// Import only required Benthos components, switch with `components/all` for
	// all standard components.
	_ "github.com/redpanda-data/benthos/v4/public/components/io"
	_ "github.com/redpanda-data/benthos/v4/public/components/pure"
)

// ReverseProcessor is a processor that reverses the bytes of each message.
type ReverseProcessor struct {
	// logger is used to report palindromes.
	logger *service.Logger
}

// Process replaces the message contents with the same bytes in reverse order
// and returns the message as a batch of one. The reversal is byte-wise. When
// the reversed bytes equal the original, an info log line is written.
func (r *ReverseProcessor) Process(ctx context.Context, m *service.Message) (service.MessageBatch, error) {
	original, err := m.AsBytes()
	if err != nil {
		return nil, err
	}

	size := len(original)
	reversed := make([]byte, size)
	for i := 0; i < size; i++ {
		reversed[i] = original[size-1-i]
	}

	if bytes.Equal(reversed, original) {
		r.logger.Infof("Woah! This is like totally a palindrome: %s", original)
	}

	m.SetBytes(reversed)
	return []*service.Message{m}, nil
}

// Close has nothing to release and always returns nil.
func (r *ReverseProcessor) Close(ctx context.Context) error {
	return nil
}

// This example demonstrates how to create a processor plugin. This example is
// for an implementation that does not require any configuration parameters, and
// therefore doesn't define any within the configuration specification.
func main() {
	// The full Benthos configuration of the stream.
	const streamConfig = `
input:
  generate:
    count: 1
    interval: 1ms
    mapping: 'root = "hello world"'

pipeline:
  processors:
    - reverse: {}

output:
  stdout: {}

logger:
  level: off
`

	// The constructor only needs the logger from the provided resources.
	newProcessor := func(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) {
		return &ReverseProcessor{logger: mgr.Logger()}, nil
	}

	// Register our new processor, which doesn't require a config schema.
	if err := service.RegisterProcessor("reverse", service.NewConfigSpec(), newProcessor); err != nil {
		panic(err)
	}

	// Build a Benthos stream that uses our new processor type.
	builder := service.NewStreamBuilder()
	if err := builder.SetYAML(streamConfig); err != nil {
		panic(err)
	}

	// Build a stream with our configured components.
	stream, err := builder.Build()
	if err != nil {
		panic(err)
	}

	// And run it, blocking until it gracefully terminates once the generate
	// input has generated a message and it has flushed through the stream.
	if err := stream.Run(context.Background()); err != nil {
		panic(err)
	}
}
Output:

dlrow olleh
Example (RateLimitPlugin)

This example demonstrates how to create a rate limit plugin, which is configured by providing a struct containing the fields to be parsed from within the Benthos configuration.

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/redpanda-data/benthos/v4/public/service"

	// Import only pure Benthos components, switch with `components/all` for all
	// standard components.
	_ "github.com/redpanda-data/benthos/v4/public/components/pure"
)

// RandomRateLimit is a rate limit that asks callers to wait for a random
// duration below a configured maximum.
type RandomRateLimit struct {
	// max is the exclusive upper bound of the wait durations returned by
	// Access.
	max time.Duration
}

// Access returns a random wait duration in the range [0, max) and a nil
// error. When max is zero or negative it returns a zero duration, as there is
// no valid range to pick from.
func (r *RandomRateLimit) Access(context.Context) (time.Duration, error) {
	// A zero maximum (for example a configured "0s") would otherwise cause an
	// integer divide by zero, and a negative one would produce negative
	// waits.
	if r.max <= 0 {
		return 0, nil
	}
	// Int63n works on int64, so the duration is not truncated on platforms
	// where int is 32 bits wide.
	return time.Duration(rand.Int63n(int64(r.max))), nil
}

// Close has nothing to release and always returns nil.
func (r *RandomRateLimit) Close(ctx context.Context) error {
	return nil
}

// This example demonstrates how to create a rate limit plugin, which is
// configured by providing a struct containing the fields to be parsed from
// within the Benthos configuration.
func main() {
	limitSpec := service.NewConfigSpec().
		Summary("A rate limit that's pretty much just random.").
		Description("I guess this isn't really that useful, sorry.").
		Field(service.NewStringField("maximum_duration").Default("1s"))

	// The constructor parses the configured duration string and fails with a
	// wrapped error when it is not a valid duration.
	newLimit := func(conf *service.ParsedConfig, mgr *service.Resources) (service.RateLimit, error) {
		raw, err := conf.FieldString("maximum_duration")
		if err != nil {
			return nil, err
		}

		limit, err := time.ParseDuration(raw)
		if err != nil {
			return nil, fmt.Errorf("invalid max duration: %w", err)
		}
		return &RandomRateLimit{max: limit}, nil
	}

	if err := service.RegisterRateLimit("random", limitSpec, newLimit); err != nil {
		panic(err)
	}

	// And then execute Benthos with:
	// service.RunCLI(context.Background())
}
Output:

Example (StreamBuilderConfig)

This example demonstrates how to use a stream builder to parse and execute a full Benthos config.

package main

import (
	"context"

	"github.com/redpanda-data/benthos/v4/public/service"

	// Import only required Benthos components, switch with `components/all` for
	// all standard components.
	_ "github.com/redpanda-data/benthos/v4/public/components/io"
	_ "github.com/redpanda-data/benthos/v4/public/components/pure"
)

// main parses a full Benthos config with a stream builder, then builds and
// runs the resulting stream. Any error along the way causes a panic.
func main() {
	// The full Benthos configuration of the stream.
	const streamConfig = `
input:
  generate:
    count: 1
    interval: 1ms
    mapping: 'root = "hello world"'

  processors:
    - mapping: 'root = content().uppercase()'

output:
  stdout: {}

logger:
  level: none
`

	must := func(err error) {
		if err != nil {
			panic(err)
		}
	}

	builder := service.NewStreamBuilder()
	must(builder.SetYAML(streamConfig))

	// Build a stream with our configured components.
	stream, err := builder.Build()
	must(err)

	// And run it, blocking until it gracefully terminates once the generate
	// input has generated a message and it has flushed through the stream.
	must(stream.Run(context.Background()))
}
Output:

HELLO WORLD
Example (StreamBuilderConfigAddMethods)

This example demonstrates how to use a stream builder to assemble a stream of Benthos components by adding snippets of configs for different component types, and then execute it. You can use the Add methods to append any number of components to the stream, following fan in and fan out patterns for inputs and outputs respectively.

package main

import (
	"context"

	"github.com/redpanda-data/benthos/v4/public/service"

	// Import only required Benthos components, switch with `components/all` for
	// all standard components.
	_ "github.com/redpanda-data/benthos/v4/public/components/io"
	_ "github.com/redpanda-data/benthos/v4/public/components/pure"
)

// main assembles a stream from separate input, processor, output and logger
// config snippets, then builds and runs it. Any error along the way causes a
// panic.
func main() {
	must := func(err error) {
		if err != nil {
			panic(err)
		}
	}

	builder := service.NewStreamBuilder()

	// Add each component in turn: input, processor, output, then the logger.
	must(builder.AddInputYAML(`
generate:
  count: 1
  interval: 1ms
  mapping: 'root = "hello world"'
`))
	must(builder.AddProcessorYAML(`mapping: 'root = content().uppercase()'`))
	must(builder.AddOutputYAML(`stdout: {}`))
	must(builder.SetLoggerYAML(`level: off`))

	// Build a stream with our configured components.
	stream, err := builder.Build()
	must(err)

	// And run it, blocking until it gracefully terminates once the generate
	// input has generated a message and it has flushed through the stream.
	must(stream.Run(context.Background()))
}
Output:

HELLO WORLD
Example (StreamBuilderMultipleStreams)

This example demonstrates using the stream builder API to create and run two independent streams.

package main

import (
	"context"
	"sync"

	"github.com/redpanda-data/benthos/v4/public/service"

	// Import only required Benthos components, switch with `components/all` for
	// all standard components.
	_ "github.com/redpanda-data/benthos/v4/public/components/io"
	_ "github.com/redpanda-data/benthos/v4/public/components/pure"
)

// main builds two independent streams from separate configs and runs them
// concurrently, returning once both have finished. Any error along the way
// causes a panic.
func main() {
	must := func(err error) {
		if err != nil {
			panic(err)
		}
	}

	// buildStream creates a fresh builder, applies the given full config and
	// builds the stream.
	buildStream := func(conf string) *service.Stream {
		builder := service.NewStreamBuilder()
		must(builder.SetYAML(conf))

		stream, err := builder.Build()
		must(err)
		return stream
	}

	// Build the first stream pipeline. Note that we configure each pipeline
	// with its HTTP server disabled as otherwise we would see a port collision
	// when they both attempt to bind to the default address `0.0.0.0:4195`.
	//
	// Alternatively, we could choose to configure each with their own address
	// with the field `http.address`, or we could call `SetHTTPMux` on the
	// builder in order to explicitly override the configured server.
	streamOne := buildStream(`
http:
  enabled: false

input:
  generate:
    count: 1
    interval: 1ms
    mapping: 'root = "hello world one"'

  processors:
    - mapping: 'root = content().uppercase()'

output:
  stdout: {}

logger:
  level: off
`)

	streamTwo := buildStream(`
http:
  enabled: false

input:
  generate:
    count: 1
    interval: 1ms
    mapping: 'root = "hello world two"'

  processors:
    - sleep:
        duration: 500ms
    - mapping: 'root = content().capitalize()'

output:
  stdout: {}

logger:
  level: off
`)

	// Run both streams in their own goroutines and wait for both to finish.
	var wg sync.WaitGroup
	for _, stream := range []*service.Stream{streamOne, streamTwo} {
		wg.Add(1)
		go func(s *service.Stream) {
			defer wg.Done()
			must(s.Run(context.Background()))
		}(stream)
	}
	wg.Wait()
}
Output:

HELLO WORLD ONE
Hello World Two
Example (StreamBuilderPush)

This example demonstrates how to use a stream builder to assemble a processing pipeline that you can push messages into and extract via closures.

package main

import (
	"bytes"
	"context"
	"fmt"
	"time"

	"github.com/redpanda-data/benthos/v4/public/service"

	// Import only required Benthos components, switch with `components/all` for
	// all standard components.
	_ "github.com/redpanda-data/benthos/v4/public/components/io"
	_ "github.com/redpanda-data/benthos/v4/public/components/pure"
)

// main builds a stream with two mapping processors, pushes two messages into
// it through a producer func, collects the results through a consumer func and
// prints them once the stream has stopped. Any error along the way causes a
// panic.
func main() {
	must := func(err error) {
		if err != nil {
			panic(err)
		}
	}

	builder := service.NewStreamBuilder()
	must(builder.SetLoggerYAML(`level: off`))
	must(builder.AddProcessorYAML(`mapping: 'root = content().uppercase()'`))
	must(builder.AddProcessorYAML(`mapping: 'root = "check this out: " + content()'`))

	// Obtain a closure func that allows us to push data into the stream, this
	// is treated like any other input, which also means it's possible to use
	// this along with regular configured inputs.
	sendFn, err := builder.AddProducerFunc()
	must(err)

	// Define a closure func that receives messages as an output of the stream.
	// It's also possible to use this along with regular configured outputs.
	var outputBuf bytes.Buffer
	collect := func(c context.Context, m *service.Message) error {
		payload, err := m.AsBytes()
		if err != nil {
			return err
		}
		_, err = fmt.Fprintf(&outputBuf, "received: %s\n", payload)
		return err
	}
	must(builder.AddConsumerFunc(collect))

	stream, err := builder.Build()
	must(err)

	// Push the messages from a separate goroutine, then ask the stream to
	// stop so that the Run call below returns.
	go func() {
		payloads := []string{
			"hello world",
			"I'm pushing data into the stream",
		}
		for _, payload := range payloads {
			must(sendFn(context.Background(), service.NewMessage([]byte(payload))))
		}
		must(stream.StopWithin(time.Second))
	}()

	// And run it, blocking until the goroutine above has pushed its messages
	// and stopped the stream.
	must(stream.Run(context.Background()))

	fmt.Println(outputBuf.String())
}
Output:

received: check this out: HELLO WORLD
received: check this out: I'M PUSHING DATA INTO THE STREAM

Index

Examples

Constants

View Source
// AutoRetryNacksToggleFieldName is the config field name "auto_replay_nacks".
// NOTE(review): presumably the field that toggles automatic retrying of
// nacked messages for inputs — confirm against where it is used.
const AutoRetryNacksToggleFieldName = "auto_replay_nacks"

Variables

View Source
var (
	// ErrKeyAlreadyExists is returned by a cache when adding a key that is
	// already present.
	ErrKeyAlreadyExists = errors.New("key already exists")
	// ErrKeyNotFound is returned by a cache when the requested key is not
	// present.
	ErrKeyNotFound      = errors.New("key does not exist")
)

Errors returned by cache types.

View Source
var (
	// ErrNotConnected is returned by inputs and outputs when their Read or
	// Write methods are called and the connection that they maintain is lost.
	// This error prompts the upstream component to call Connect until the
	// connection is re-established.
	ErrNotConnected = errors.New("not connected")

	// ErrEndOfInput is returned by inputs that have exhausted their source of
	// data to the point where subsequent Read calls will be ineffective. This
	// error prompts the upstream component to gracefully terminate the
	// pipeline.
	ErrEndOfInput = errors.New("end of input")

	// ErrEndOfBuffer is returned by a buffer Read/ReadBatch method when the
	// contents of the buffer have been emptied and the source of the data has
	// ended (as indicated by EndOfInput). This error prompts the upstream
	// component to gracefully terminate the pipeline.
	ErrEndOfBuffer = errors.New("end of buffer")
)
View Source
// ErrBlockingWrite is the error reported when a write attempt is aborted.
// NOTE(review): the error text says "blocking write" while the rendered
// documentation says "non-blocking write" — confirm which wording is intended.
var ErrBlockingWrite = errors.New("a blocking write attempt was aborted")

ErrBlockingWrite is returned when a non-blocking write is aborted

Functions

func Globs

func Globs(f fs.FS, paths ...string) ([]string, error)

Globs attempts to expand the glob patterns within a series of paths and returns the resulting expanded slice or an error.

func GlobsAndSuperPaths

func GlobsAndSuperPaths(f fs.FS, paths []string, extensions ...string) ([]string, error)

GlobsAndSuperPaths attempts to expand a list of paths, which may include glob patterns and super paths (the ... thing) to a list of explicit file paths. Extensions must be provided, and limit the file types that are captured with a super path.

func OSFS

func OSFS() fs.FS

OSFS provides an fs.FS implementation that simply calls into the os.

func OutputPerformanceDocs

func OutputPerformanceDocs(benefitsFromMaxInFlight, benefitsFromBatching bool) (content string)

OutputPerformanceDocs returns a string of markdown documentation that can be added to outputs as standard performance advice based on whether the output benefits from a max_in_flight field, batching or both.

func ReadFile

func ReadFile(f fs.FS, name string) ([]byte, error)

ReadFile opens a file from an fs.FS and reads all bytes. When the OpenFile method is available this will be used instead of Open with the RDONLY flag.

func RegisterBatchBuffer

func RegisterBatchBuffer(name string, spec *ConfigSpec, ctor BatchBufferConstructor) error

RegisterBatchBuffer attempts to register a new buffer plugin by providing a description of the configuration for the buffer and a constructor for the buffer processor. The constructor will be called for each instantiation of the component within a config.

Consumed message batches must be created by upstream components (inputs, etc) otherwise this buffer will simply receive batches containing single messages.

func RegisterBatchInput

func RegisterBatchInput(name string, spec *ConfigSpec, ctor BatchInputConstructor) error

RegisterBatchInput attempts to register a new batched input plugin by providing a description of the configuration for the plugin as well as a constructor for the input itself. The constructor will be called for each instantiation of the component within a config.

If your input implementation doesn't have a specific mechanism for dealing with a nack (when the AckFunc provides a non-nil error) then you can instead wrap your input implementation with AutoRetryNacksBatched to get automatic retries.

func RegisterBatchOutput

func RegisterBatchOutput(name string, spec *ConfigSpec, ctor BatchOutputConstructor) error

RegisterBatchOutput attempts to register a new output plugin by providing a description of the configuration for the plugin as well as a constructor for the output itself. The constructor will be called for each instantiation of the component within a config.

The constructor of a batch output is able to return a batch policy to be applied before calls to write are made, creating batches from the stream of messages. However, batches can also be created by upstream components (inputs, buffers, etc).

If a batch has been formed upstream it is possible that its size may exceed the policy specified in your constructor.

func RegisterBatchProcessor

func RegisterBatchProcessor(name string, spec *ConfigSpec, ctor BatchProcessorConstructor) error

RegisterBatchProcessor attempts to register a new processor plugin by providing a description of the configuration for the processor and a constructor for the processor itself. The constructor will be called for each instantiation of the component within a config.

func RegisterBatchScannerCreator

func RegisterBatchScannerCreator(name string, spec *ConfigSpec, ctor BatchScannerCreatorConstructor) error

RegisterBatchScannerCreator attempts to register a new batch scanner creator plugin by providing a description of the configuration for the plugin as well as a constructor for the scanner creator itself. The constructor will be called for each instantiation of the component within a config.

func RegisterCache

func RegisterCache(name string, spec *ConfigSpec, ctor CacheConstructor) error

RegisterCache attempts to register a new cache plugin by providing a description of the configuration for the plugin as well as a constructor for the cache itself. The constructor will be called for each instantiation of the component within a config.

func RegisterInput

func RegisterInput(name string, spec *ConfigSpec, ctor InputConstructor) error

RegisterInput attempts to register a new input plugin by providing a description of the configuration for the plugin as well as a constructor for the input itself. The constructor will be called for each instantiation of the component within a config.

If your input implementation doesn't have a specific mechanism for dealing with a nack (when the AckFunc provides a non-nil error) then you can instead wrap your input implementation with AutoRetryNacks to get automatic retries.

func RegisterMetricsExporter

func RegisterMetricsExporter(name string, spec *ConfigSpec, ctor MetricsExporterConstructor) error

RegisterMetricsExporter attempts to register a new metrics exporter plugin by providing a description of the configuration for the plugin as well as a constructor for the metrics exporter itself. The constructor will be called for each instantiation of the component within a config.

func RegisterOtelTracerProvider

func RegisterOtelTracerProvider(name string, spec *ConfigSpec, ctor OtelTracerProviderConstructor) error

RegisterOtelTracerProvider attempts to register a new open telemetry tracer provider plugin by providing a description of the configuration for the plugin as well as a constructor for the tracer provider itself. The constructor will be called for each instantiation of the component within a config.

Experimental: This type signature is experimental and therefore subject to change outside of major version releases.

func RegisterOutput

func RegisterOutput(name string, spec *ConfigSpec, ctor OutputConstructor) error

RegisterOutput attempts to register a new output plugin by providing a description of the configuration for the plugin as well as a constructor for the output itself. The constructor will be called for each instantiation of the component within a config.

func RegisterProcessor

func RegisterProcessor(name string, spec *ConfigSpec, ctor ProcessorConstructor) error

RegisterProcessor attempts to register a new processor plugin by providing a description of the configuration for the processor and a constructor for the processor itself. The constructor will be called for each instantiation of the component within a config.

For simple transformations consider implementing a Bloblang plugin method instead.

func RegisterRateLimit

func RegisterRateLimit(name string, spec *ConfigSpec, ctor RateLimitConstructor) error

RegisterRateLimit attempts to register a new rate limit plugin by providing a description of the configuration for the plugin as well as a constructor for the rate limit itself. The constructor will be called for each instantiation of the component within a config.

func RegisterTemplateYAML

func RegisterTemplateYAML(yamlStr string) error

RegisterTemplateYAML attempts to register a template, defined as a YAML document, to the global environment such that it may be used similarly to any other component plugin.

func RunCLI

func RunCLI(ctx context.Context, optFuncs ...CLIOptFunc)

RunCLI executes Benthos as a CLI, allowing users to specify a configuration file path(s) and execute subcommands for linting configs, testing configs, etc. This is how a standard distribution of Benthos operates.

This call blocks until either:

1. The service shuts down gracefully due to the inputs closing
2. A termination signal is received
3. The provided context has a deadline that is reached, triggering graceful termination
4. The provided context is cancelled (WARNING, this prevents graceful termination)

This function must only be called once during the entire lifecycle of your program, as it interacts with singleton state. In order to manage multiple Benthos stream lifecycles in a program use the StreamBuilder API instead.

func XFormatConfigJSON

func XFormatConfigJSON() ([]byte, error)

XFormatConfigJSON returns a byte slice of the Benthos configuration spec formatted as a JSON object. The schema of this method is undocumented and is not intended for general use.

Experimental: This method is not intended for general use and could have its signature and/or behaviour changed outside of major version bumps.

Types

type AckFunc

type AckFunc func(ctx context.Context, err error) error

AckFunc is a common function returned by inputs that must be called once for each message consumed. This function ensures that the source of the message receives either an acknowledgement (err is nil) or an error that can either be propagated upstream as a nack, or trigger a reattempt at delivering the same message.

If your input implementation doesn't have a specific mechanism for dealing with a nack then you can wrap your input implementation with AutoRetryNacks to get automatic retries.

type BatchBuffer

type BatchBuffer interface {
	// Write a batch of messages to the buffer, the batch is accompanied with an
	// acknowledge function. A non-nil error should be returned if it is not
	// possible to store the given message batch in the buffer.
	//
	// If a nil error is returned the buffer assumes responsibility for calling
	// the acknowledge function at least once during the lifetime of the
	// message.
	//
	// This could be at the point where the message is written to the buffer,
	// which weakens delivery guarantees but can be useful for decoupling the
	// input from downstream components. Alternatively, this could be when the
	// associated batch has been read from the buffer and acknowledged
	// downstream, which preserves delivery guarantees.
	WriteBatch(context.Context, MessageBatch, AckFunc) error

	// Read a batch of messages from the buffer. This call should block until
	// either a batch is ready to consume, the provided context is cancelled or
	// EndOfInput has been called which indicates that the buffer is no longer
	// being populated with new messages.
	//
	// The returned acknowledge function will be called when a consumed message
	// batch has been processed and sent downstream. It is up to the buffer
	// implementation whether the ack function is used, it might be used in
	// order to "commit" the removal of a message from the buffer in cases where
	// the buffer is a persisted storage solution, or in cases where the output
	// of the buffer is temporal (a windowing algorithm, etc) it might be
	// considered correct to simply drop message batches that are not acked.
	//
	// When the buffer is closed (EndOfInput has been called and no more
	// messages are available) this method should return an ErrEndOfBuffer in
	// order to indicate the end of the buffered stream.
	//
	// It is valid to return a batch of only one message.
	ReadBatch(context.Context) (MessageBatch, AckFunc, error)

	// EndOfInput indicates to the buffer that the input has ended and that once
	// the buffer is depleted it should return ErrEndOfBuffer from ReadBatch in
	// order to gracefully shut down the pipeline.
	//
	// EndOfInput should be idempotent as it may be called more than once.
	EndOfInput()

	Closer
}

BatchBuffer is an interface implemented by Buffers able to read and write message batches. Buffers are a component type that is placed after inputs and decouples the acknowledgement system of the inputs from the rest of the pipeline.

Buffers are useful for relieving back pressure from upstream components, or when implementing message aggregators where the concept of discrete messages running through a pipeline no longer applies (such as with windowing algorithms).

Buffers are advanced component types that weaken delivery guarantees of a Benthos pipeline. Therefore, if you aren't absolutely sure that a component you wish to build should be a buffer type then it likely shouldn't be.

type BatchBufferConstructor

type BatchBufferConstructor func(conf *ParsedConfig, mgr *Resources) (BatchBuffer, error)

BatchBufferConstructor is a func that's provided a configuration type and access to a service manager and must return an instantiation of a buffer based on the config, or an error.

Consumed message batches must be created by upstream components (inputs, etc) otherwise this buffer will simply receive batches containing single messages.

type BatchError

type BatchError struct {
	// contains filtered or unexported fields
}

BatchError groups the errors that were encountered while processing a collection (usually a batch) of messages and provides methods to iterate over these errors.

func NewBatchError

func NewBatchError(b MessageBatch, headline error) *BatchError

NewBatchError creates a BatchError that can be returned by batched outputs. The advantage of doing so is that nacks and retries can potentially be granularly dealt out in cases where only a subset of the batch failed.

A headline error must be supplied which will be exposed when upstream components do not support granular batch errors.

func (*BatchError) Error

func (err *BatchError) Error() string

Error returns the underlying error message.

func (*BatchError) Failed

func (err *BatchError) Failed(i int, merr error) *BatchError

Failed stores an error state for a particular message of a batch. Returns a pointer to the underlying error, allowing the method to be chained.

If Failed is not called then all messages are assumed to have failed. If it is called at least once then all message indexes that aren't explicitly failed are assumed to have been processed successfully.

func (*BatchError) IndexedErrors

func (err *BatchError) IndexedErrors() int

IndexedErrors returns the number of indexed errors that have been registered within a walkable error.

func (*BatchError) Unwrap

func (err *BatchError) Unwrap() error

func (*BatchError) WalkMessages deprecated

func (err *BatchError) WalkMessages(fn func(int, *Message, error) bool)

WalkMessages applies a closure to each message that was part of the request that caused this error. The closure is provided the message index, a pointer to the message, and its individual error, which may be nil if the message itself was processed successfully. The closure should return a bool which indicates whether the iteration should be continued.

Deprecated: This method is harmful and should be avoided as indexes are not guaranteed to match a hypothetical origin batch that they might be compared against. Use WalkMessagesIndexedBy instead.

func (*BatchError) WalkMessagesIndexedBy

func (err *BatchError) WalkMessagesIndexedBy(s *Indexer, fn func(int, *Message, error) bool)

WalkMessagesIndexedBy applies a closure to each message of a batch that is included in this batch error. A batch error represents errors that occurred to only a subset of a batch of messages, in which case it is possible to use this error in order to avoid re-processing or re-delivering messages that didn't fail.

However, the shape of the batch of messages at the time the errors occurred may differ significantly from the batch known by the component receiving this error. For example a processor that dispatches a batch to a list of child processors may receive a batch error that occurred after filtering and re-ordering has occurred to the batch. In such cases it is not possible to simply inspect the indexes of errored messages in order to associate them with the original batch as those indexes could have changed.

Therefore, in order to solve this particular use case it is possible to create a batch indexer before dispatching the batch to the child components. Then, when a batch error is received WalkMessagesIndexedBy can be used as a way to walk the errored messages with the indexes (and message contents) of the original batch.

Important! The order of messages walked is not guaranteed to match that of the source batch. It is also possible for any given index to be represented zero, one or more times.

type BatchInput

type BatchInput interface {
	// Establish a connection to the upstream service. Connect will always be
	// called first when a reader is instantiated, and will be continuously
	// called with back off until a nil error is returned.
	//
	// The provided context remains open only for the duration of the connecting
	// phase, and should not be used to establish the lifetime of the connection
	// itself.
	//
	// Once Connect returns a nil error the ReadBatch method will be called
	// until either ErrNotConnected is returned, or the reader is closed.
	Connect(context.Context) error

	// Read a message batch from a source, along with a function to be called
	// once the entire batch can be either acked (successfully sent or
	// intentionally filtered) or nacked (failed to be processed or dispatched
	// to the output).
	//
	// The AckFunc will be called for every message batch at least once, but
	// there are no guarantees as to when this will occur. If your input
	// implementation doesn't have a specific mechanism for dealing with a nack
	// then you can wrap your input implementation with AutoRetryNacksBatched to
	// get automatic retries.
	//
	// If this method returns ErrNotConnected then ReadBatch will not be called
	// again until Connect has returned a nil error. If ErrEndOfInput is
	// returned then ReadBatch will no longer be called and the pipeline
	// will gracefully terminate.
	ReadBatch(context.Context) (MessageBatch, AckFunc, error)

	Closer
}

BatchInput is an interface implemented by Benthos inputs that produce messages in batches, where there is a desire to process and send the batch as a logical group rather than as individual messages.

Calls to ReadBatch should block until either a message batch is ready to process, the connection is lost, or the provided context is cancelled.

func AutoRetryNacksBatched

func AutoRetryNacksBatched(i BatchInput) BatchInput

AutoRetryNacksBatched wraps a batched input implementation with a component that automatically reattempts messages that fail downstream. This is useful for inputs that do not support nacks, and therefore don't have an answer for when an ack func is called with an error.

When messages fail to be delivered they will be reattempted with back off until success or the stream is stopped.

func AutoRetryNacksBatchedToggled

func AutoRetryNacksBatchedToggled(c *ParsedConfig, i BatchInput) (BatchInput, error)

AutoRetryNacksBatchedToggled wraps an input implementation with AutoRetryNacksBatched only if a field defined by NewAutoRetryNacksToggleField has been set to true.

func InputBatchedWithMaxInFlight

func InputBatchedWithMaxInFlight(n int, i BatchInput) BatchInput

InputBatchedWithMaxInFlight wraps a batched input with a component that limits the number of batches being processed at a given time. When the limit is reached a new message batch will not be consumed until an ack/nack has been returned.

type BatchInputConstructor

type BatchInputConstructor func(conf *ParsedConfig, mgr *Resources) (BatchInput, error)

BatchInputConstructor is a func that's provided a configuration type and access to a service manager, and must return an instantiation of a batched reader based on the config, or an error.

type BatchOutput

type BatchOutput interface {
	// Establish a connection to the downstream service. Connect will always be
	// called first when a writer is instantiated, and will be continuously
	// called with back off until a nil error is returned.
	//
	// Once Connect returns a nil error the WriteBatch method will be called
	// until either ErrNotConnected is returned, or the writer is closed.
	Connect(context.Context) error

	// Write a batch of messages to a sink, or return an error if delivery is
	// not possible.
	//
	// If this method returns ErrNotConnected then WriteBatch will not be
	// called again until Connect has returned a nil error.
	WriteBatch(context.Context, MessageBatch) error

	Closer
}

BatchOutput is an interface implemented by Benthos outputs that require Benthos to batch messages before dispatch in order to improve throughput. Each call to WriteBatch should block until either all messages in the batch have been successfully or unsuccessfully sent, or the context is cancelled.

Multiple write calls can be performed in parallel, and the constructor of an output must provide a MaxInFlight parameter indicating the maximum number of parallel batched write calls the output supports.

type BatchOutputConstructor

type BatchOutputConstructor func(conf *ParsedConfig, mgr *Resources) (out BatchOutput, batchPolicy BatchPolicy, maxInFlight int, err error)

BatchOutputConstructor is a func that's provided a configuration type and access to a service manager, and must return an instantiation of a writer based on the config, a batching policy, and a maximum number of in-flight message batches to allow, or an error.

type BatchPolicy

type BatchPolicy struct {
	ByteSize int
	Count    int
	Check    string
	Period   string
	// contains filtered or unexported fields
}

BatchPolicy describes the mechanisms by which messages destined for a batch output should be batched. This is returned by constructors of batch outputs.

func (BatchPolicy) IsNoop

func (b BatchPolicy) IsNoop() bool

IsNoop returns true if the batching policy does not have any batching mechanisms configured.

func (BatchPolicy) NewBatcher

func (b BatchPolicy) NewBatcher(res *Resources) (*Batcher, error)

NewBatcher creates a batching mechanism from the policy.

type BatchProcessor

type BatchProcessor interface {
	// Process a batch of messages into one or more resulting batches, or return
	// an error if the entire batch could not be processed. If zero messages are
	// returned and the error is nil then all messages are filtered.
	//
	// The provided MessageBatch should NOT be modified, in order to return a
	// mutated batch a copy of the slice should be created instead.
	//
	// When an error is returned all of the input messages will continue down
	// the pipeline but will be marked with the error with *message.SetError,
	// and metrics and logs will be emitted.
	//
	// In order to add errors to individual messages of the batch for downstream
	// handling use *message.SetError(err) and return it in the resulting batch
	// with a nil error.
	//
	// The Message types returned MUST be derived from the provided messages,
	// and CANNOT be custom implementations of Message. In order to copy the
	// provided messages use the Copy method.
	ProcessBatch(context.Context, MessageBatch) ([]MessageBatch, error)

	Closer
}

BatchProcessor is a Benthos processor implementation that works against batches of messages, which allows windowed processing.

Message batches must be created by upstream components (inputs, buffers, etc) otherwise this processor will simply receive batches containing single messages.

type BatchProcessorConstructor

type BatchProcessorConstructor func(conf *ParsedConfig, mgr *Resources) (BatchProcessor, error)

BatchProcessorConstructor is a func that's provided a configuration type and access to a service manager and must return an instantiation of a processor based on the config, or an error.

Message batches must be created by upstream components (inputs, buffers, etc) otherwise this processor will simply receive batches containing single messages.

type BatchScanner

type BatchScanner interface {
	NextBatch(context.Context) (MessageBatch, AckFunc, error)
	Close(context.Context) error
}

BatchScanner is an interface implemented by instantiations of BatchScannerCreator responsible for consuming an io.ReadCloser and converting the stream of bytes into discrete message batches based on the underlying format of the scanner.

The returned ack func will be called by downstream components once the produced message batch has been successfully processed and delivered. Only once all message batches extracted from a BatchScanner have been acknowledged should the ack func provided at instantiation be called, unless an ack call is returned with an error.

Once the input data has been fully consumed io.EOF should be returned.

func AutoAggregateBatchScannerAcks

func AutoAggregateBatchScannerAcks(strm SimpleBatchScanner, aFn AckFunc) BatchScanner

AutoAggregateBatchScannerAcks wraps a simplified SimpleBatchScanner in a mechanism that automatically aggregates acknowledgments from yielded batches.

type BatchScannerCreator

type BatchScannerCreator interface {
	Create(io.ReadCloser, AckFunc, *ScannerSourceDetails) (BatchScanner, error)
	Close(context.Context) error
}

BatchScannerCreator is an interface implemented by Benthos scanner plugins. Calls to Create must create a new instantiation of BatchScanner that consumes the provided io.ReadCloser, produces batches of messages (batches containing a single message are valid) and calls the provided AckFunc once all derived data is delivered (or rejected).

type BatchScannerCreatorConstructor

type BatchScannerCreatorConstructor func(conf *ParsedConfig, mgr *Resources) (BatchScannerCreator, error)

BatchScannerCreatorConstructor is a func that's provided a configuration type and access to a service manager and must return an instantiation of a batch scanner creator.

type Batcher

type Batcher struct {
	// contains filtered or unexported fields
}

Batcher provides a batching mechanism where messages can be added one-by-one with a boolean return indicating whether the batching policy has been triggered.

Upon triggering the policy it is the responsibility of the owner of this batcher to call Flush, which returns all the pending messages in the batch.

This batcher may contain processors that are executed during the flush, therefore it is important to call Close when this batcher is no longer required, having also called Flush if appropriate.

func (*Batcher) Add

func (b *Batcher) Add(msg *Message) bool

Add a message to the batch. Returns true if the batching policy has been triggered by this new addition, in which case Flush should be called.

func (*Batcher) Close

func (b *Batcher) Close(ctx context.Context) error

Close the batching policy, which cleans up any resources used by batching processors.

func (*Batcher) Flush

func (b *Batcher) Flush(ctx context.Context) (batch MessageBatch, err error)

Flush pending messages into a batch, apply any batching processors that are part of the batching policy, and then return the result.

func (*Batcher) UntilNext

func (b *Batcher) UntilNext() (time.Duration, bool)

UntilNext returns a duration indicating how long until the current batch should be flushed due to a configured period. A boolean is also returned indicating whether the batching policy has a timed factor, if this is false then the duration returned should be ignored.

func (*Batcher) XUnwrapper

func (b *Batcher) XUnwrapper() any

XUnwrapper is for internal use only, do not use this.

type CLIOptBuilder

type CLIOptBuilder struct {
	// contains filtered or unexported fields
}

type CLIOptFunc

type CLIOptFunc func(*CLIOptBuilder)

CLIOptFunc defines an option to pass through the standard Benthos CLI in order to customise its behaviour.

func CLIOptAddTeeLogger

func CLIOptAddTeeLogger(l *slog.Logger) CLIOptFunc

CLIOptAddTeeLogger adds another logger to receive all log events from the service initialised via the CLI.

func CLIOptOnConfigParse

func CLIOptOnConfigParse(fn func(fn *ParsedConfig) error) CLIOptFunc

CLIOptOnConfigParse sets a closure function to be called when a main configuration file load has occurred.

If an error is returned this will be treated by the CLI the same as any other failure to parse the bootstrap config.

func CLIOptOnLoggerInit

func CLIOptOnLoggerInit(fn func(*Logger)) CLIOptFunc

CLIOptOnLoggerInit sets a closure to be called when the service-wide logger is initialised. A modified version can be returned, allowing you to mutate the fields and settings that it has.

func CLIOptSetBinaryName

func CLIOptSetBinaryName(n string) CLIOptFunc

CLIOptSetBinaryName overrides the default binary name in CLI help docs.

func CLIOptSetDefaultConfigPaths added in v4.28.1

func CLIOptSetDefaultConfigPaths(paths ...string) CLIOptFunc

CLIOptSetDefaultConfigPaths overrides the default paths used for detecting and loading config files when one was not provided explicitly with the --config flag.

func CLIOptSetDocumentationURL

func CLIOptSetDocumentationURL(n string) CLIOptFunc

CLIOptSetDocumentationURL overrides the default documentation URL in CLI help docs.

func CLIOptSetMainSchemaFrom

func CLIOptSetMainSchemaFrom(fn func() *ConfigSchema) CLIOptFunc

CLIOptSetMainSchemaFrom overrides the default Benthos configuration schema for another. A constructor is provided such that downstream components can still modify copies of the schema when needed.

NOTE: This transfers the configuration schema but NOT the Environment plugins themselves, which is the global set by default.

func CLIOptSetProductName

func CLIOptSetProductName(n string) CLIOptFunc

CLIOptSetProductName overrides the default product name in CLI help docs.

func CLIOptSetShowRunCommand added in v4.28.1

func CLIOptSetShowRunCommand(show bool) CLIOptFunc

CLIOptSetShowRunCommand determines whether a `run` subcommand should appear in CLI help and autocomplete.

func CLIOptSetVersion

func CLIOptSetVersion(version, dateBuilt string) CLIOptFunc

CLIOptSetVersion overrides the default version and date built stamps.

type Cache

type Cache interface {
	// Get a cache item.
	Get(ctx context.Context, key string) ([]byte, error)

	// Set a cache item, specifying an optional TTL. It is okay for caches to
	// ignore the ttl parameter if it isn't possible to implement.
	Set(ctx context.Context, key string, value []byte, ttl *time.Duration) error

	// Add is the same operation as Set except that it returns an error if the
	// key already exists. It is okay for caches to return nil on duplicates if
	// it isn't possible to implement.
	Add(ctx context.Context, key string, value []byte, ttl *time.Duration) error

	// Delete attempts to remove a key. If the key does not exist then it is
	// considered correct to return an error, however, for cache implementations
	// where it is difficult to determine this then it is acceptable to return
	// nil.
	Delete(ctx context.Context, key string) error

	Closer
}

Cache is an interface implemented by Benthos caches.

type CacheConstructor

type CacheConstructor func(conf *ParsedConfig, mgr *Resources) (Cache, error)

CacheConstructor is a func that's provided a configuration type and access to a service manager and must return an instantiation of a cache based on the config, or an error.

type CacheItem

type CacheItem struct {
	Key   string
	Value []byte
	TTL   *time.Duration
}

CacheItem represents an individual cache item.

type Closer

type Closer interface {
	// Close the component, blocks until either the underlying resources are
	// cleaned up or the context is cancelled. Returns an error if the context
	// is cancelled.
	Close(ctx context.Context) error
}

Closer is implemented by components that support stopping and cleaning up their underlying resources.

type ConfigField

type ConfigField struct {
	// contains filtered or unexported fields
}

ConfigField describes a field within a component configuration, to be added to a ConfigSpec.

func NewAnyField

func NewAnyField(name string) *ConfigField

NewAnyField describes a new config field that can assume any value type without triggering a config parse or linting error.

func NewAnyListField

func NewAnyListField(name string) *ConfigField

NewAnyListField describes a new config field consisting of a list of values that can assume any value type without triggering a config parse or linting error.

func NewAnyMapField

func NewAnyMapField(name string) *ConfigField

NewAnyMapField describes a new config field consisting of a map of values that can assume any value type without triggering a config parse or linting error.

func NewAutoRetryNacksToggleField

func NewAutoRetryNacksToggleField() *ConfigField

NewAutoRetryNacksToggleField creates a configuration field for toggling the behaviour of an input where nacks (rejections) of data result in the automatic replay of that data (the default). This field should be used for conditionally wrapping inputs with AutoRetryNacksToggled or AutoRetryNacksBatchedToggled.

func NewBackOffField

func NewBackOffField(name string, allowUnbounded bool, defaults *backoff.ExponentialBackOff) *ConfigField

NewBackOffField defines a new object type config field that describes an exponential back off policy, often used for timing retry attempts. It is then possible to extract a *backoff.ExponentialBackOff from the resulting parsed config with the method FieldBackOff.

It is possible to configure a back off policy that has no upper bound (no maximum elapsed time set). In cases where this would be problematic the allowUnbounded parameter should be set to `false` in order to add linting rules that ensure an upper bound is set.

The defaults struct is optional, and if provided will be used to establish default values for time interval fields. Otherwise the chosen defaults result in one minute of retry attempts, starting at 500ms intervals.

func NewBackOffToggledField

func NewBackOffToggledField(name string, allowUnbounded bool, defaults *backoff.ExponentialBackOff) *ConfigField

NewBackOffToggledField defines a new object type config field that describes an exponential back off policy, often used for timing retry attempts. It is then possible to extract a *backoff.ExponentialBackOff from the resulting parsed config with the method FieldBackOff. This Toggled variant includes a field `enabled` that is `false` by default.

It is possible to configure a back off policy that has no upper bound (no maximum elapsed time set). In cases where this would be problematic the allowUnbounded parameter should be set to `false` in order to add linting rules that ensure an upper bound is set.

The defaults struct is optional, and if provided will be used to establish default values for time interval fields. Otherwise the chosen defaults result in one minute of retry attempts, starting at 500ms intervals.

func NewBatchPolicyField

func NewBatchPolicyField(name string) *ConfigField

NewBatchPolicyField defines a new object type config field that describes a batching policy for batched outputs. It is then possible to extract a BatchPolicy from the resulting parsed config with the method FieldBatchPolicy.

func NewBloblangField

func NewBloblangField(name string) *ConfigField

NewBloblangField defines a new config field that describes a Bloblang mapping string. It is then possible to extract a *bloblang.Executor from the resulting parsed config with the method FieldBloblang.

func NewBoolField

func NewBoolField(name string) *ConfigField

NewBoolField describes a new bool type config field.

func NewDurationField

func NewDurationField(name string) *ConfigField

NewDurationField describes a new duration string type config field, allowing users to define a time interval with strings of the form 60s, 3m, etc.

func NewExtractTracingSpanMappingField

func NewExtractTracingSpanMappingField() *ConfigField

NewExtractTracingSpanMappingField returns a config field for mapping messages in order to extract distributed tracing information.

func NewFloatField

func NewFloatField(name string) *ConfigField

NewFloatField describes a new float type config field.

func NewFloatListField

func NewFloatListField(name string) *ConfigField

NewFloatListField describes a new config field consisting of a list of floats.

func NewFloatMapField

func NewFloatMapField(name string) *ConfigField

NewFloatMapField describes a new config field consisting of an object of arbitrary keys with float values.

func NewHTTPRequestAuthSignerFields

func NewHTTPRequestAuthSignerFields() []*ConfigField

NewHTTPRequestAuthSignerFields returns a list of config fields for adding authentication to HTTP requests. The options available with this field include OAuth (v1), basic authentication, and JWT as these are mechanisms that can be implemented by mutating a request object.

func NewInjectTracingSpanMappingField

func NewInjectTracingSpanMappingField() *ConfigField

NewInjectTracingSpanMappingField returns a field spec describing an inject tracing span mapping.

func NewInputField

func NewInputField(name string) *ConfigField

NewInputField defines a new input field, it is then possible to extract an OwnedInput from the resulting parsed config with the method FieldInput.

func NewInputListField

func NewInputListField(name string) *ConfigField

NewInputListField defines a new input list field, it is then possible to extract a list of OwnedInput from the resulting parsed config with the method FieldInputList.

func NewInputMapField

func NewInputMapField(name string) *ConfigField

NewInputMapField defines a new input map field, it is then possible to extract a map of OwnedInput from the resulting parsed config with the method FieldInputMap.

func NewInputMaxInFlightField

func NewInputMaxInFlightField() *ConfigField

NewInputMaxInFlightField returns a field spec for a common max_in_flight input field.

func NewIntField

func NewIntField(name string) *ConfigField

NewIntField describes a new int type config field.

func NewIntListField

func NewIntListField(name string) *ConfigField

NewIntListField describes a new config field consisting of a list of integers.

func NewIntMapField

func NewIntMapField(name string) *ConfigField

NewIntMapField describes a new config field consisting of an object of arbitrary keys with integer values.

func NewInternalField

func NewInternalField(ifield docs.FieldSpec) *ConfigField

NewInternalField returns a ConfigField derived from an internal package field spec. This function is for internal use only.

func NewInterpolatedStringEnumField

func NewInterpolatedStringEnumField(name string, options ...string) *ConfigField

NewInterpolatedStringEnumField defines a new config field that describes a dynamic string that supports Bloblang interpolation functions, but also has a discrete list of acceptable values. It is then possible to extract an *InterpolatedString from the resulting parsed config with the method FieldInterpolatedString. Verifying that the interpolated result matches one of the specified options must be done at runtime.

func NewInterpolatedStringField

func NewInterpolatedStringField(name string) *ConfigField

NewInterpolatedStringField defines a new config field that describes a dynamic string that supports Bloblang interpolation functions. It is then possible to extract an *InterpolatedString from the resulting parsed config with the method FieldInterpolatedString.

func NewInterpolatedStringListField

func NewInterpolatedStringListField(name string) *ConfigField

NewInterpolatedStringListField describes a new config field consisting of a list of interpolated string values. It is then possible to extract a slice of *InterpolatedString from the resulting parsed config with the method FieldInterpolatedStringList.

func NewInterpolatedStringMapField

func NewInterpolatedStringMapField(name string) *ConfigField

NewInterpolatedStringMapField describes a new config field consisting of an object of arbitrary keys with interpolated string values. It is then possible to extract a map of *InterpolatedString from the resulting parsed config with the method FieldInterpolatedStringMap.

func NewMetadataExcludeFilterField

func NewMetadataExcludeFilterField(name string) *ConfigField

NewMetadataExcludeFilterField creates a config field spec for describing which metadata keys to exclude for a given purpose, where all metadata is included by default. This includes prefix based and regular expression based methods. This field should be avoided in favour of NewMetadataFilterField as all components should be converging on opt-in metadata delivery. However, this field is useful for migrating existing components to the new plugin APIs with backwards compatibility.

func NewMetadataFilterField

func NewMetadataFilterField(name string) *ConfigField

NewMetadataFilterField creates a config field spec for describing which metadata keys to include for a given purpose. This includes prefix based and regular expression based methods. This field is often used for making metadata written to output destinations explicit.

func NewObjectField

func NewObjectField(name string, fields ...*ConfigField) *ConfigField

NewObjectField describes a new object type config field, consisting of one or more child fields.

func NewObjectListField

func NewObjectListField(name string, fields ...*ConfigField) *ConfigField

NewObjectListField describes a new list type config field consisting of objects with one or more child fields.

func NewObjectMapField

func NewObjectMapField(name string, fields ...*ConfigField) *ConfigField

NewObjectMapField describes a new map type config field consisting of objects with one or more child fields.

func NewOutputField

func NewOutputField(name string) *ConfigField

NewOutputField defines a new output field, it is then possible to extract an OwnedOutput from the resulting parsed config with the method FieldOutput.

func NewOutputListField

func NewOutputListField(name string) *ConfigField

NewOutputListField defines a new output list field, it is then possible to extract a list of OwnedOutput from the resulting parsed config with the method FieldOutputList.

func NewOutputMapField

func NewOutputMapField(name string) *ConfigField

NewOutputMapField defines a new output map field, it is then possible to extract a map of OwnedOutput from the resulting parsed config with the method FieldOutputMap.

func NewOutputMaxInFlightField

func NewOutputMaxInFlightField() *ConfigField

NewOutputMaxInFlightField creates a common field for determining the maximum number of in-flight messages an output should allow. This function is a short-hand way of creating an integer field with the common name max_in_flight, with a typical default of 64.

func NewProcessorField

func NewProcessorField(name string) *ConfigField

NewProcessorField defines a new processor field, it is then possible to extract an OwnedProcessor from the resulting parsed config with the method FieldProcessor.

func NewProcessorListField

func NewProcessorListField(name string) *ConfigField

NewProcessorListField defines a new processor list field, it is then possible to extract a list of OwnedProcessor from the resulting parsed config with the method FieldProcessorList.

func NewScannerField

func NewScannerField(name string) *ConfigField

NewScannerField defines a new scanner field, it is then possible to extract an OwnedScannerCreator from the resulting parsed config with the method FieldScanner.

func NewStringAnnotatedEnumField

func NewStringAnnotatedEnumField(name string, options map[string]string) *ConfigField

NewStringAnnotatedEnumField describes a new string type config field that can have one of a discrete list of values, where each value must be accompanied by a description that annotates its behaviour in the documentation.

func NewStringEnumField

func NewStringEnumField(name string, options ...string) *ConfigField

NewStringEnumField describes a new string type config field that can have one of a discrete list of values.

func NewStringField

func NewStringField(name string) *ConfigField

NewStringField describes a new string type config field.

func NewStringListField

func NewStringListField(name string) *ConfigField

NewStringListField describes a new config field consisting of a list of strings.

func NewStringListOfListsField

func NewStringListOfListsField(name string) *ConfigField

NewStringListOfListsField describes a new config field consisting of a list of lists of strings (a 2D array of strings).

func NewStringMapField

func NewStringMapField(name string) *ConfigField

NewStringMapField describes a new config field consisting of an object of arbitrary keys with string values.

func NewTLSField

func NewTLSField(name string) *ConfigField

NewTLSField defines a new object type config field that describes TLS settings for networked components. It is then possible to extract a *tls.Config from the resulting parsed config with the method FieldTLS.

func NewTLSToggledField

func NewTLSToggledField(name string) *ConfigField

NewTLSToggledField defines a new object type config field that describes TLS settings for networked components. This field differs from a standard TLSField as it includes a boolean field `enabled` which allows users to explicitly configure whether TLS should be enabled or not.

A *tls.Config as well as an enabled boolean value can be extracted from the resulting parsed config with the method FieldTLSToggled.

func NewURLField

func NewURLField(name string) *ConfigField

NewURLField defines a new config field that describes a string that should contain a valid URL. It is then possible to extract either a string or a *url.URL from the resulting parsed config with the methods FieldString or FieldURL respectively.

func NewURLListField

func NewURLListField(name string) *ConfigField

NewURLListField defines a new config field that describes an array of strings that should contain only valid URLs. It is then possible to extract either a string slice or a slice of *url.URL from the resulting parsed config with the methods FieldStringArray or FieldURLArray respectively.

func (*ConfigField) Advanced

func (c *ConfigField) Advanced() *ConfigField

Advanced marks a config field as being advanced, and therefore it will not appear in simplified documentation examples.

func (*ConfigField) Default

func (c *ConfigField) Default(v any) *ConfigField

Default specifies a default value that this field will assume if it is omitted from a provided config. Fields that do not have a default value are considered mandatory, and so parsing a config will fail in their absence.

func (*ConfigField) Deprecated

func (c *ConfigField) Deprecated() *ConfigField

Deprecated marks a config field as being deprecated, and therefore it will not appear in documentation examples.

func (*ConfigField) Description

func (c *ConfigField) Description(d string) *ConfigField

Description adds a description to the field which will be shown when printing documentation for the component config spec.

func (*ConfigField) Example

func (c *ConfigField) Example(e any) *ConfigField

Example adds an example value to the field which will be shown when printing documentation for the component config spec.

func (*ConfigField) Examples

func (c *ConfigField) Examples(e ...any) *ConfigField

Examples adds a variadic list of example values to the field which will be shown when printing documentation for the component config spec.

func (*ConfigField) LintRule

func (c *ConfigField) LintRule(blobl string) *ConfigField

LintRule adds a custom linting rule to the field in the form of a bloblang mapping. The mapping is provided the value of the field within a config as the context `this`, and if the mapping assigns to `root` an array of one or more strings these strings will be exposed to a config author as linting errors.

For example, if we wanted to add a linting rule for a string field that ensures the value contains only lowercase characters we might add the following linting rule:

`root = if this.lowercase() != this { [ "field must be lowercase" ] }`.

func (*ConfigField) Optional

func (c *ConfigField) Optional() *ConfigField

Optional specifies that a field is optional even when a default value has not been specified. When a field is marked as optional you can test its presence within a parsed config with the method Contains.

func (*ConfigField) Secret

func (c *ConfigField) Secret() *ConfigField

Secret marks this field as being a secret, which means it represents information that is generally considered sensitive such as passwords or access tokens.

func (*ConfigField) Version

func (c *ConfigField) Version(v string) *ConfigField

Version specifies the specific version at which this field was added to the component.

func (*ConfigField) XUnwrapper

func (c *ConfigField) XUnwrapper() any

XUnwrapper is for internal use only, do not use this.

type ConfigSchema

type ConfigSchema struct {
	// contains filtered or unexported fields
}

ConfigSchema contains the definitions of all config fields for the overall Benthos config as well as all component plugins. A schema can be used in order to analyse, export and import the schemas of varying distributions and versions of Benthos.

func ConfigSchemaFromJSONV0

func ConfigSchemaFromJSONV0(jBytes []byte) (*ConfigSchema, error)

ConfigSchemaFromJSONV0 attempts to parse a JSON serialised definition of an entire schema. Any plugins defined in the schema will be registered with the config schema environment and can be used for config linting and marshalling.

However, the environment cannot be used for instantiating a runnable pipeline as the constructors will be disabled. This allows applications to lint against plugin definitions that they themselves haven't imported.

func (*ConfigSchema) Environment

func (s *ConfigSchema) Environment() *Environment

Environment provides access to the environment referenced by this schema.

func (*ConfigSchema) Field

func (s *ConfigSchema) Field(f *ConfigField) *ConfigSchema

Field adds a field to the main config of a schema.

func (*ConfigSchema) Fields

func (s *ConfigSchema) Fields(fs ...*ConfigField) *ConfigSchema

Fields adds multiple fields to the main config of a schema.

func (*ConfigSchema) MarshalJSONSchema

func (s *ConfigSchema) MarshalJSONSchema() ([]byte, error)

MarshalJSONSchema attempts to marshal a JSON Schema definition containing the entire config and plugin ecosystem such that other applications can potentially execute their own linting and generation tools with it.

func (*ConfigSchema) MarshalJSONV0

func (s *ConfigSchema) MarshalJSONV0() ([]byte, error)

MarshalJSONV0 attempts to marshal a JSON document containing the entire config and plugin ecosystem schema such that other applications can potentially execute their own linting and generation tools with it.

func (*ConfigSchema) NewStreamConfigLinter

func (s *ConfigSchema) NewStreamConfigLinter() *StreamConfigLinter

NewStreamConfigLinter creates a component for linting stream configs against the schema.

func (*ConfigSchema) NewStreamConfigMarshaller

func (s *ConfigSchema) NewStreamConfigMarshaller() *StreamConfigMarshaller

NewStreamConfigMarshaller creates a component for marshalling stream configs, allowing you to print sanitised, redacted or hydrated configs in various formats.

func (*ConfigSchema) SetVersion

func (s *ConfigSchema) SetVersion(version, dateBuilt string) *ConfigSchema

SetVersion sets the version and date-built stamp associated with the schema.

func (*ConfigSchema) TemplateData added in v4.29.0

func (s *ConfigSchema) TemplateData(path ...string) (TemplateDataSchema, error)

TemplateData attempts to prepare a list of structs containing information for the fields within the section specified of the schema. This information can then be fed into a template in order to generate documentation for the section.

type ConfigSpec

type ConfigSpec struct {
	// contains filtered or unexported fields
}

ConfigSpec describes the configuration specification for a plugin component. This will be used for validating and linting configuration files and providing a parsed configuration struct to the plugin constructor.

func NewConfigSpec

func NewConfigSpec() *ConfigSpec

NewConfigSpec creates a new empty component configuration spec. If the plugin does not require configuration fields the result of this call is enough.

func (*ConfigSpec) Beta

func (c *ConfigSpec) Beta() *ConfigSpec

Beta sets a documentation label on the component indicating that its configuration spec is ready for beta testing, meaning backwards incompatible changes will not be made unless a fundamental problem is found. Plugins are considered experimental by default.

func (*ConfigSpec) Categories

func (c *ConfigSpec) Categories(categories ...string) *ConfigSpec

Categories adds one or more string tags to the component, these are used for arbitrarily grouping components in documentation.

func (*ConfigSpec) Deprecated

func (c *ConfigSpec) Deprecated() *ConfigSpec

Deprecated sets a documentation label on the component indicating that it is now deprecated. Plugins are considered experimental by default.

func (*ConfigSpec) Description

func (c *ConfigSpec) Description(description string) *ConfigSpec

Description adds a description to the plugin configuration spec that describes in more detail the behaviour of the component and how it should be used.

func (*ConfigSpec) EncodeJSON

func (c *ConfigSpec) EncodeJSON(v []byte) error

EncodeJSON attempts to parse a JSON object as a byte slice and uses it to populate the configuration spec. The schema of this method is undocumented and is not intended for general use.

Experimental: This method is not intended for general use and could have its signature and/or behaviour changed outside of major version bumps.

func (*ConfigSpec) Example

func (c *ConfigSpec) Example(title, summary, config string) *ConfigSpec

Example adds an example to the plugin configuration spec that demonstrates how the component can be used. An example has a title, summary, and a YAML configuration showing a real use case.

func (*ConfigSpec) Field

func (c *ConfigSpec) Field(f *ConfigField) *ConfigSpec

Field sets the specification of a field within the config spec, used for linting and generating documentation for the component.

If the provided field has an empty name then it is registered as the value at the root of the config spec.

func (*ConfigSpec) Fields

func (c *ConfigSpec) Fields(fs ...*ConfigField) *ConfigSpec

Fields sets the specification of multiple fields within the config spec, used for linting and generating documentation for the component.

If any of the provided fields have an empty name then they are registered as the value at the root of the config spec.

func (*ConfigSpec) Footnotes

func (c *ConfigSpec) Footnotes(description string) *ConfigSpec

Footnotes adds a description to the plugin configuration spec that appears towards the bottom of the documentation page, this is usually best for long winded lists of docs.

func (*ConfigSpec) LintRule

func (c *ConfigSpec) LintRule(blobl string) *ConfigSpec

LintRule adds a custom linting rule to the ConfigSpec in the form of a bloblang mapping. The mapping is provided the value of the fields within the ConfigSpec as the context `this`, and if the mapping assigns to `root` an array of one or more strings these strings will be exposed to a config author as linting errors.

For example, if we wanted to add a linting rule for several ConfigSpec fields that ensures some fields are mutually exclusive and some require others we might use the following:

root = match { this.exists("meow") && this.exists("woof") => [ "both `+"`meow`"+` and `+"`woof`"+` can't be set simultaneously" ], this.exists("reticulation") && (!this.exists("splines") || this.splines == "") => [ "`+"`splines`"+` is required when setting `+"`reticulation`"+`" ], }.

func (*ConfigSpec) ParseYAML

func (c *ConfigSpec) ParseYAML(yamlStr string, env *Environment) (*ParsedConfig, error)

ParseYAML attempts to parse a YAML document as the defined configuration spec and returns a parsed config view. The provided environment determines which child components and Bloblang functions can be created by the fields of the spec, you can leave the environment nil to use the global environment.

This method is intended for testing purposes and is not required for normal use of plugin components, as parsing is managed by other components.

func (*ConfigSpec) Stable

func (c *ConfigSpec) Stable() *ConfigSpec

Stable sets a documentation label on the component indicating that its configuration spec is stable. Plugins are considered experimental by default.

func (*ConfigSpec) Summary

func (c *ConfigSpec) Summary(summary string) *ConfigSpec

Summary adds a short summary to the plugin configuration spec that describes the general purpose of the component.

func (*ConfigSpec) SupportLevel added in v4.29.0

func (c *ConfigSpec) SupportLevel(l string) *ConfigSpec

SupportLevel adds an abstract label indicating the support level of the plugin.

func (*ConfigSpec) Version

func (c *ConfigSpec) Version(v string) *ConfigSpec

Version specifies that this component was introduced in a given version.

type ConfigView

type ConfigView struct {
	// contains filtered or unexported fields
}

ConfigView is a struct returned by a Benthos service environment when walking the list of registered components and provides access to information about the component.

func (*ConfigView) Description

func (c *ConfigView) Description() string

Description returns a documentation description of the component, often formatted as markdown.

func (*ConfigView) FormatJSON

func (c *ConfigView) FormatJSON() ([]byte, error)

FormatJSON returns a byte slice of the component configuration formatted as a JSON object. The schema of this method is undocumented and is not intended for general use.

Experimental: This method is not intended for general use and could have its signature and/or behaviour changed outside of major version bumps.

func (*ConfigView) IsDeprecated

func (c *ConfigView) IsDeprecated() bool

IsDeprecated returns true if the component is marked as deprecated.

func (*ConfigView) RenderDocs

func (c *ConfigView) RenderDocs() ([]byte, error)

RenderDocs creates a markdown file that documents the configuration of the component config view. This markdown may include Docusaurus react elements as it matches the documentation generated for the official Benthos website.

Experimental: This method is not intended for general use and could have its signature and/or behaviour changed outside of major version bumps.

func (*ConfigView) Summary

func (c *ConfigView) Summary() string

Summary returns a documentation summary of the component, often formatted as markdown.

func (*ConfigView) TemplateData added in v4.29.0

func (c *ConfigView) TemplateData() (TemplateDataPlugin, error)

TemplateData returns a struct containing useful documentation details, which can then be injected into a template in order to populate a documentation website automatically.

type Environment

type Environment struct {
	// contains filtered or unexported fields
}

Environment is a collection of Benthos component plugins that can be used in order to build and run streaming pipelines with access to different sets of plugins. This can be useful for sandboxing, testing, etc, but most plugin authors do not need to create an Environment and can simply use the global environment.

func GlobalEnvironment

func GlobalEnvironment() *Environment

GlobalEnvironment returns a reference to the global environment, adding plugins to this environment is equivalent to adding plugins using global functions.

func NewEmptyEnvironment

func NewEmptyEnvironment() *Environment

NewEmptyEnvironment creates a new environment with zero registered plugins.

func NewEnvironment

func NewEnvironment() *Environment

NewEnvironment creates a new environment that inherits all globally defined plugins, but can have plugins defined on it that are isolated.

func (*Environment) Clone

func (e *Environment) Clone() *Environment

Clone an environment, creating a new environment containing the same plugins that can be modified independently of the source.

func (*Environment) CoreConfigSchema

func (e *Environment) CoreConfigSchema(version, dateBuilt string) *ConfigSchema

CoreConfigSchema returns a config schema containing only the core Benthos pipeline fields (input, buffer, pipeline, output), and all plugin definitions from the environment.

func (*Environment) FullConfigSchema

func (e *Environment) FullConfigSchema(version, dateBuilt string) *ConfigSchema

FullConfigSchema returns a config schema containing all the standard config fields and all plugin definitions from the environment.

func (*Environment) GenerateSchema

func (e *Environment) GenerateSchema(version, dateBuilt string) *EnvironmentSchema

func (*Environment) GetBufferConfig

func (e *Environment) GetBufferConfig(name string) (*ConfigView, bool)

GetBufferConfig attempts to obtain a buffer configuration spec by the component name. Returns a nil ConfigView and false if the component is unknown.

func (*Environment) GetCacheConfig

func (e *Environment) GetCacheConfig(name string) (*ConfigView, bool)

GetCacheConfig attempts to obtain a cache configuration spec by the component name. Returns a nil ConfigView and false if the component is unknown.

func (*Environment) GetInputConfig

func (e *Environment) GetInputConfig(name string) (*ConfigView, bool)

GetInputConfig attempts to obtain an input configuration spec by the component name. Returns a nil ConfigView and false if the component is unknown.

func (*Environment) GetMetricsConfig

func (e *Environment) GetMetricsConfig(name string) (*ConfigView, bool)

GetMetricsConfig attempts to obtain a metrics exporter configuration spec by the component name. Returns a nil ConfigView and false if the component is unknown.

func (*Environment) GetOutputConfig

func (e *Environment) GetOutputConfig(name string) (*ConfigView, bool)

GetOutputConfig attempts to obtain an output configuration spec by the component name. Returns a nil ConfigView and false if the component is unknown.

func (*Environment) GetProcessorConfig

func (e *Environment) GetProcessorConfig(name string) (*ConfigView, bool)

GetProcessorConfig attempts to obtain a processor configuration spec by the component name. Returns a nil ConfigView and false if the component is unknown.

func (*Environment) GetRateLimitConfig

func (e *Environment) GetRateLimitConfig(name string) (*ConfigView, bool)

GetRateLimitConfig attempts to obtain a rate limit configuration spec by the component name. Returns a nil ConfigView and false if the component is unknown.

func (*Environment) GetScannerConfig

func (e *Environment) GetScannerConfig(name string) (*ConfigView, bool)

GetScannerConfig attempts to obtain a scanner configuration spec by the component name. Returns a nil ConfigView and false if the component is unknown.

func (*Environment) GetTracerConfig

func (e *Environment) GetTracerConfig(name string) (*ConfigView, bool)

GetTracerConfig attempts to obtain a tracer configuration spec by the component name. Returns a nil ConfigView and false if the component is unknown.

func (*Environment) NewStreamBuilder

func (e *Environment) NewStreamBuilder() *StreamBuilder

NewStreamBuilder creates a new StreamBuilder upon the defined environment, only components known to this environment will be available to the stream builder.

func (*Environment) NewStreamTemplateTester

func (e *Environment) NewStreamTemplateTester() *StreamTemplateTester

NewStreamTemplateTester creates a component for testing stream templates against the plugins known to this environment.

func (*Environment) RegisterBatchBuffer

func (e *Environment) RegisterBatchBuffer(name string, spec *ConfigSpec, ctor BatchBufferConstructor) error

RegisterBatchBuffer attempts to register a new buffer plugin by providing a description of the configuration for the buffer and a constructor for the buffer itself. The constructor will be called for each instantiation of the component within a config.

Consumed message batches must be created by upstream components (inputs, etc) otherwise this buffer will simply receive batches containing single messages.

func (*Environment) RegisterBatchInput

func (e *Environment) RegisterBatchInput(name string, spec *ConfigSpec, ctor BatchInputConstructor) error

RegisterBatchInput attempts to register a new batched input plugin by providing a description of the configuration for the plugin as well as a constructor for the input itself. The constructor will be called for each instantiation of the component within a config.

If your input implementation doesn't have a specific mechanism for dealing with a nack (when the AckFunc provides a non-nil error) then you can instead wrap your input implementation with AutoRetryNacksBatched to get automatic retries.

func (*Environment) RegisterBatchOutput

func (e *Environment) RegisterBatchOutput(name string, spec *ConfigSpec, ctor BatchOutputConstructor) error

RegisterBatchOutput attempts to register a new output plugin by providing a description of the configuration for the plugin as well as a constructor for the output itself. The constructor will be called for each instantiation of the component within a config.

The constructor of a batch output is able to return a batch policy to be applied before calls to write are made, creating batches from the stream of messages. However, batches can also be created by upstream components (inputs, buffers, etc).

If a batch has been formed upstream it is possible that its size may exceed the policy specified in your constructor.

func (*Environment) RegisterBatchProcessor

func (e *Environment) RegisterBatchProcessor(name string, spec *ConfigSpec, ctor BatchProcessorConstructor) error

RegisterBatchProcessor attempts to register a new processor plugin by providing a description of the configuration for the processor and a constructor for the processor itself. The constructor will be called for each instantiation of the component within a config.

Message batches must be created by upstream components (inputs, buffers, etc) otherwise this processor will simply receive batches containing single messages.

func (*Environment) RegisterBatchScannerCreator

func (e *Environment) RegisterBatchScannerCreator(name string, spec *ConfigSpec, ctor BatchScannerCreatorConstructor) error

RegisterBatchScannerCreator attempts to register a new batched scanner plugin by providing a description of the configuration for the plugin as well as a constructor for the scanner creator itself. The constructor will be called for each instantiation of the component within a config.

func (*Environment) RegisterCache

func (e *Environment) RegisterCache(name string, spec *ConfigSpec, ctor CacheConstructor) error

RegisterCache attempts to register a new cache plugin by providing a description of the configuration for the plugin as well as a constructor for the cache itself. The constructor will be called for each instantiation of the component within a config.

func (*Environment) RegisterInput

func (e *Environment) RegisterInput(name string, spec *ConfigSpec, ctor InputConstructor) error

RegisterInput attempts to register a new input plugin by providing a description of the configuration for the plugin as well as a constructor for the input itself. The constructor will be called for each instantiation of the component within a config.

If your input implementation doesn't have a specific mechanism for dealing with a nack (when the AckFunc provides a non-nil error) then you can instead wrap your input implementation with AutoRetryNacks to get automatic retries.

func (*Environment) RegisterMetricsExporter

func (e *Environment) RegisterMetricsExporter(name string, spec *ConfigSpec, ctor MetricsExporterConstructor) error

RegisterMetricsExporter attempts to register a new metrics exporter plugin by providing a description of the configuration for the plugin as well as a constructor for the metrics exporter itself.

func (*Environment) RegisterOtelTracerProvider

func (e *Environment) RegisterOtelTracerProvider(name string, spec *ConfigSpec, ctor OtelTracerProviderConstructor) error

RegisterOtelTracerProvider attempts to register a new open telemetry tracer provider plugin by providing a description of the configuration for the plugin as well as a constructor for the tracer provider itself. The constructor will be called for each instantiation of the component within a config.

Experimental: This type signature is experimental and therefore subject to change outside of major version releases.

func (*Environment) RegisterOutput

func (e *Environment) RegisterOutput(name string, spec *ConfigSpec, ctor OutputConstructor) error

RegisterOutput attempts to register a new output plugin by providing a description of the configuration for the plugin as well as a constructor for the output itself. The constructor will be called for each instantiation of the component within a config.

func (*Environment) RegisterProcessor

func (e *Environment) RegisterProcessor(name string, spec *ConfigSpec, ctor ProcessorConstructor) error

RegisterProcessor attempts to register a new processor plugin by providing a description of the configuration for the processor and a constructor for the processor itself. The constructor will be called for each instantiation of the component within a config.

For simple transformations consider implementing a Bloblang plugin method instead.

func (*Environment) RegisterRateLimit

func (e *Environment) RegisterRateLimit(name string, spec *ConfigSpec, ctor RateLimitConstructor) error

RegisterRateLimit attempts to register a new rate limit plugin by providing a description of the configuration for the plugin as well as a constructor for the rate limit itself. The constructor will be called for each instantiation of the component within a config.

func (*Environment) RegisterTemplateYAML

func (e *Environment) RegisterTemplateYAML(yamlStr string) error

RegisterTemplateYAML attempts to register a template, defined as a YAML document, to the environment such that it may be used similarly to any other component plugin.

func (*Environment) TemplateSchema added in v4.29.0

func (e *Environment) TemplateSchema(version, dateBuilt string) *ConfigSchema

TemplateSchema returns the schema for a Benthos template file.

func (*Environment) UseBloblangEnvironment

func (e *Environment) UseBloblangEnvironment(bEnv *bloblang.Environment)

UseBloblangEnvironment configures the service environment to restrict components constructed with it to a specific Bloblang environment.

func (*Environment) UseFS

func (e *Environment) UseFS(fs *FS)

UseFS configures the service environment to use an instantiation of *FS as its filesystem. This provides extra control over the file access of all Benthos components within the stream. However, this functionality is opt-in and there is no guarantee that plugin implementations will use this method of file access.

The underlying bloblang environment will also be configured to import mappings and other files via this file access method. In order to avoid this behaviour add a fresh bloblang environment via UseBloblangEnvironment _after_ setting this file access.

func (*Environment) WalkBuffers

func (e *Environment) WalkBuffers(fn func(name string, config *ConfigView))

WalkBuffers executes a provided function argument for every buffer component that has been registered to the environment.

func (*Environment) WalkCaches

func (e *Environment) WalkCaches(fn func(name string, config *ConfigView))

WalkCaches executes a provided function argument for every cache component that has been registered to the environment.

func (*Environment) WalkInputs

func (e *Environment) WalkInputs(fn func(name string, config *ConfigView))

WalkInputs executes a provided function argument for every input component that has been registered to the environment.

func (*Environment) WalkMetrics

func (e *Environment) WalkMetrics(fn func(name string, config *ConfigView))

WalkMetrics executes a provided function argument for every metrics component that has been registered to the environment. Note that metrics components available to an environment cannot be modified.

func (*Environment) WalkOutputs

func (e *Environment) WalkOutputs(fn func(name string, config *ConfigView))

WalkOutputs executes a provided function argument for every output component that has been registered to the environment.

func (*Environment) WalkProcessors

func (e *Environment) WalkProcessors(fn func(name string, config *ConfigView))

WalkProcessors executes a provided function argument for every processor component that has been registered to the environment.

func (*Environment) WalkRateLimits

func (e *Environment) WalkRateLimits(fn func(name string, config *ConfigView))

WalkRateLimits executes a provided function argument for every rate limit component that has been registered to the environment.

func (*Environment) WalkScanners

func (e *Environment) WalkScanners(fn func(name string, config *ConfigView))

WalkScanners executes a provided function argument for every scanner component that has been registered to the environment. Note that scanner components available to an environment cannot be modified.

func (*Environment) WalkTracers

func (e *Environment) WalkTracers(fn func(name string, config *ConfigView))

WalkTracers executes a provided function argument for every tracer component that has been registered to the environment. Note that tracer components available to an environment cannot be modified.

type EnvironmentSchema

type EnvironmentSchema struct {
	// contains filtered or unexported fields
}

EnvironmentSchema represents a schema definition for all components registered within the environment.

func (*EnvironmentSchema) Minimise

func (e *EnvironmentSchema) Minimise() *EnvironmentSchema

Minimise removes all documentation from the schema definition.

func (*EnvironmentSchema) ReduceToStatus

func (e *EnvironmentSchema) ReduceToStatus(status string) *EnvironmentSchema

ReduceToStatus removes all components that aren't of the given stability status.

func (*EnvironmentSchema) ToCUE

func (e *EnvironmentSchema) ToCUE() ([]byte, error)

ToCUE attempts to generate a CUE schema.

func (*EnvironmentSchema) XFlattened

func (e *EnvironmentSchema) XFlattened() map[string][]string

XFlattened returns a generic structure of the schema as a map of component types to component names.

Experimental: This method is experimental and therefore could be changed outside of major version releases.

type ErrBackOff

type ErrBackOff struct {
	Err  error
	Wait time.Duration
}

ErrBackOff is an error that plugins can optionally use to wrap another error, instructing upstream components to wait for a specified period of time before retrying the errored call.

Not all plugin methods support this error, for a list refer to the documentation of NewErrBackOff.

func NewErrBackOff

func NewErrBackOff(err error, wait time.Duration) *ErrBackOff

NewErrBackOff wraps an error with a specified time to wait. For specific plugin methods this will instruct upstream components to wait for the specified amount of time before re-attempting the errored call.

NOTE: ErrBackOff is opt-in for upstream components and therefore only a subset of plugin calls will respect this error. Currently the following methods are known to support ErrBackOff:

- Input.Connect
- BatchInput.Connect
- Output.Connect
- BatchOutput.Connect

func (*ErrBackOff) Error

func (e *ErrBackOff) Error() string

Error returns the Error string.

type FS

type FS struct {
	// contains filtered or unexported fields
}

FS implements a superset of fs.FS and includes goodies that benthos components specifically need.

func NewFS

func NewFS(filesystem fs.FS) *FS

NewFS provides a new instance of a filesystem. The fs.FS passed in can optionally implement methods from benthos ifs.FS.

func (*FS) MkdirAll

func (f *FS) MkdirAll(path string, perm fs.FileMode) error

MkdirAll creates a directory named path, along with any necessary parents, and returns nil, or else returns an error.

func (*FS) Open

func (f *FS) Open(name string) (fs.File, error)

Open opens the named file for reading.

func (*FS) OpenFile

func (f *FS) OpenFile(name string, flag int, perm fs.FileMode) (fs.File, error)

OpenFile is the generalized open call.

func (*FS) Remove

func (f *FS) Remove(name string) error

Remove removes the named file or (empty) directory.

func (*FS) Stat

func (f *FS) Stat(name string) (fs.FileInfo, error)

Stat returns a FileInfo describing the named file.

type FieldView

type FieldView struct {
	// contains filtered or unexported fields
}

FieldView provides a way to analyse a config field.

func (*FieldView) IsAdvanced

func (f *FieldView) IsAdvanced() bool

IsAdvanced returns true if the field is advanced.

type HTTPMultiplexer

type HTTPMultiplexer interface {
	HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))
}

HTTPMultiplexer is an interface supported by most HTTP multiplexers.

type Indexer

type Indexer struct {
	// contains filtered or unexported fields
}

Indexer encapsulates the ability to acquire the original index of a message from a derivative batch as it was when the indexer was created. This can be useful in situations where a batch is being dispatched to processors or outputs and a derivative batch needs to be associated with the origin.

func (*Indexer) IndexOf

func (s *Indexer) IndexOf(m *Message) int

IndexOf attempts to obtain the index of a message as it occurred within the origin batch known at the time the indexer was created. If the message is an orphan and does not originate from that batch then -1 is returned. It is possible that zero, one or more derivative messages yield any given index of the origin batch due to filtering and/or duplication enacted on the batch.

type Input

type Input interface {
	// Establish a connection to the upstream service. Connect will always be
	// called first when a reader is instantiated, and will be continuously
	// called with back off until a nil error is returned.
	//
	// The provided context remains open only for the duration of the connecting
	// phase, and should not be used to establish the lifetime of the connection
	// itself.
	//
	// Once Connect returns a nil error the Read method will be called until
	// either ErrNotConnected is returned, or the reader is closed.
	Connect(context.Context) error

	// Read a single message from a source, along with a function to be called
	// once the message can be either acked (successfully sent or intentionally
	// filtered) or nacked (failed to be processed or dispatched to the output).
	//
	// The AckFunc will be called for every message at least once, but there are
	// no guarantees as to when this will occur. If your input implementation
	// doesn't have a specific mechanism for dealing with a nack then you can
	// wrap your input implementation with AutoRetryNacks to get automatic
	// retries.
	//
	// If this method returns ErrNotConnected then Read will not be called again
	// until Connect has returned a nil error. If ErrEndOfInput is returned then
	// Read will no longer be called and the pipeline will gracefully terminate.
	Read(context.Context) (*Message, AckFunc, error)

	Closer
}

Input is an interface implemented by Benthos inputs. Calls to Read should block until either a message has been received, the connection is lost, or the provided context is cancelled.

func AutoRetryNacks

func AutoRetryNacks(i Input) Input

AutoRetryNacks wraps an input implementation with a component that automatically reattempts messages that fail downstream. This is useful for inputs that do not support nacks, and therefore don't have an answer for when an ack func is called with an error.

When messages fail to be delivered they will be reattempted with back off until success or the stream is stopped.

func AutoRetryNacksToggled

func AutoRetryNacksToggled(c *ParsedConfig, i Input) (Input, error)

AutoRetryNacksToggled wraps an input implementation with AutoRetryNacks only if a field defined by NewAutoRetryNacksToggleField has been set to true.

func InputWithMaxInFlight

func InputWithMaxInFlight(n int, i Input) Input

InputWithMaxInFlight wraps an input with a component that limits the number of messages being processed at a given time. When the limit is reached a new message will not be consumed until an ack/nack has been returned.

type InputConstructor

type InputConstructor func(conf *ParsedConfig, mgr *Resources) (Input, error)

InputConstructor is a func that's provided a configuration type and access to a service manager, and must return an instantiation of a reader based on the config, or an error.

type InterpolatedString

type InterpolatedString struct {
	// contains filtered or unexported fields
}

InterpolatedString resolves a string containing dynamic interpolation functions for a given message.

func NewInterpolatedString

func NewInterpolatedString(expr string) (*InterpolatedString, error)

NewInterpolatedString parses an interpolated string expression.

func (*InterpolatedString) Bytes

func (i *InterpolatedString) Bytes(m *Message) []byte

Bytes resolves the interpolated field for a given message as a byte slice.

Deprecated: Use TryBytes instead in order to capture errors.

func (*InterpolatedString) Static

func (i *InterpolatedString) Static() (string, bool)

Static returns the underlying contents of the interpolated string only if it contains zero dynamic expressions, and is therefore static, otherwise an empty string is returned. A second boolean value is also returned indicating whether the string was static, helping to distinguish between a static empty string versus a non-static string.

func (*InterpolatedString) String

func (i *InterpolatedString) String(m *Message) string

String resolves the interpolated field for a given message as a string.

Deprecated: Use TryString instead in order to capture errors.

func (*InterpolatedString) TryBytes

func (i *InterpolatedString) TryBytes(m *Message) ([]byte, error)

TryBytes resolves the interpolated field for a given message as a slice of bytes, returns an error if any interpolation functions fail.

func (*InterpolatedString) TryString

func (i *InterpolatedString) TryString(m *Message) (string, error)

TryString resolves the interpolated field for a given message as a string, returns an error if any interpolation functions fail.

type Lint

type Lint struct {
	Line   int
	Column int
	Type   LintType
	What   string
}

Lint represents a configuration file linting error.

func (Lint) Error

func (l Lint) Error() string

Error returns an error string.

type LintError

type LintError []Lint

LintError is an error type that represents one or more configuration file linting errors that were encountered.

func (LintError) Error

func (e LintError) Error() string

Error returns an error string.

type LintType

type LintType int

LintType is a discrete linting type. NOTE: These should be kept in sync with ./internal/docs/field.go:586.

const (
	// LintCustom means a custom linting rule failed.
	LintCustom LintType = iota

	// LintFailedRead means a configuration could not be read.
	LintFailedRead LintType = iota

	// LintMissingEnvVar means a configuration contained an environment variable
	// interpolation without a default and the variable was undefined.
	LintMissingEnvVar LintType = iota

	// LintInvalidOption means the field value was not one of the explicit list
	// of options.
	LintInvalidOption LintType = iota

	// LintBadLabel means the label contains invalid characters.
	LintBadLabel LintType = iota

	// LintMissingLabel means the label is missing when required.
	LintMissingLabel LintType = iota

	// LintDuplicateLabel means the label collides with another label.
	LintDuplicateLabel LintType = iota

	// LintBadBloblang means the field contains invalid Bloblang.
	LintBadBloblang LintType = iota

	// LintShouldOmit means the field should be omitted.
	LintShouldOmit LintType = iota

	// LintComponentMissing means a component value was expected but the type is
	// missing.
	LintComponentMissing LintType = iota

	// LintComponentNotFound means the specified component value is not
	// recognised.
	LintComponentNotFound LintType = iota

	// LintUnknown means the field is unknown.
	LintUnknown LintType = iota

	// LintMissing means a field was required but missing.
	LintMissing LintType = iota

	// LintExpectedArray means an array value was expected but something else
	// was provided.
	LintExpectedArray LintType = iota

	// LintExpectedObject means an object value was expected but something else
	// was provided.
	LintExpectedObject LintType = iota

	// LintExpectedScalar means a scalar value was expected but something else
	// was provided.
	LintExpectedScalar LintType = iota

	// LintDeprecated means a field is deprecated and should not be used.
	LintDeprecated LintType = iota
)

type Logger

type Logger struct {
	// contains filtered or unexported fields
}

Logger allows plugin authors to write custom logs from components that are exported the same way as native Benthos logs. It's safe to pass around a nil pointer for testing components.

func (*Logger) Debug

func (l *Logger) Debug(message string)

Debug logs a debug message.

func (*Logger) Debugf

func (l *Logger) Debugf(template string, args ...any)

Debugf logs a debug message using fmt.Sprintf when args are specified.

func (*Logger) Error

func (l *Logger) Error(message string)

Error logs an error message.

func (*Logger) Errorf

func (l *Logger) Errorf(template string, args ...any)

Errorf logs an error message using fmt.Sprintf when args are specified.

func (*Logger) Info

func (l *Logger) Info(message string)

Info logs an info message.

func (*Logger) Infof

func (l *Logger) Infof(template string, args ...any)

Infof logs an info message using fmt.Sprintf when args are specified.

func (*Logger) Trace

func (l *Logger) Trace(message string)

Trace logs a trace message.

func (*Logger) Tracef

func (l *Logger) Tracef(template string, args ...any)

Tracef logs a trace message using fmt.Sprintf when args are specified.

func (*Logger) Warn

func (l *Logger) Warn(message string)

Warn logs a warning message.

func (*Logger) Warnf

func (l *Logger) Warnf(template string, args ...any)

Warnf logs a warning message using fmt.Sprintf when args are specified.

func (*Logger) With

func (l *Logger) With(keyValuePairs ...any) *Logger

With adds a variadic set of fields to a logger. Each field must consist of a string key and a value of any type. An odd number of key/value pairs will therefore result in malformed log messages, but should never panic.

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message represents a single discrete message passing through a Benthos pipeline. It is safe to mutate the message via Set methods, but the underlying byte data should not be edited directly.

func NewInternalMessage

func NewInternalMessage(imsg *message.Part) *Message

NewInternalMessage returns a message wrapped around an instantiation of the internal message package. This function is for internal use only and intended as a scaffold for internal components migrating to the new APIs.

func NewMessage

func NewMessage(content []byte) *Message

NewMessage creates a new message with an initial raw bytes content. The initial content can be nil, which is recommended if you intend to set it with structured contents.

func (*Message) AsBytes

func (m *Message) AsBytes() ([]byte, error)

AsBytes returns the underlying byte array contents of a message or, if the contents are a structured type, attempts to marshal the contents as a JSON document and returns either the byte array result or an error.

It is NOT safe to mutate the contents of the returned slice.

func (*Message) AsStructured

func (m *Message) AsStructured() (any, error)

AsStructured returns the underlying structured contents of a message or, if the contents are a byte array, attempts to parse the bytes contents as a JSON document and returns either the structured result or an error.

It is NOT safe to mutate the contents of the returned value if it is a reference type (slice or map). In order to safely mutate the structured contents of a message use AsStructuredMut.

func (*Message) AsStructuredMut

func (m *Message) AsStructuredMut() (any, error)

AsStructuredMut returns the underlying structured contents of a message or, if the contents are a byte array, attempts to parse the bytes contents as a JSON document and returns either the structured result or an error.

It is safe to mutate the contents of the returned value even if it is a reference type (slice or map), as the structured contents will be lazily deep cloned if it is still owned by an upstream component.

func (*Message) BloblangMutate

func (m *Message) BloblangMutate(blobl *bloblang.Executor) (*Message, error)

BloblangMutate executes a parsed Bloblang mapping onto a message where the contents of the message are mutated directly rather than creating an entirely new object.

Returns the same message back in a mutated form, or an error if the mapping fails. If the mapping results in the root being deleted the returned message will be nil, which indicates it has been filtered.

Note that using a Mutate execution means certain functions within the Bloblang mapping will behave differently. In the root of the mapping the right-hand keywords `root` and `this` refer to the same mutable root of the output document.

func (*Message) BloblangMutateFrom

func (m *Message) BloblangMutateFrom(blobl *bloblang.Executor, from *Message) (*Message, error)

BloblangMutateFrom executes a parsed Bloblang mapping onto a message where the reference material for the mapping comes from a provided message rather than the target message of the map. Contents of the target message are mutated directly rather than creating an entirely new object.

Returns the same message back in a mutated form, or an error if the mapping fails. If the mapping results in the root being deleted the returned message will be nil, which indicates it has been filtered.

Note that using a MutateFrom execution means certain functions within the Bloblang mapping will behave differently. In the root of the mapping the right-hand keyword `root` refers to the same mutable root of the output document, but the keyword `this` refers to the message being provided as an argument.

func (*Message) BloblangQuery

func (m *Message) BloblangQuery(blobl *bloblang.Executor) (*Message, error)

BloblangQuery executes a parsed Bloblang mapping on a message and returns a message back or an error if the mapping fails. If the mapping results in the root being deleted the returned message will be nil, which indicates it has been filtered.

func (*Message) BloblangQueryValue

func (m *Message) BloblangQueryValue(blobl *bloblang.Executor) (any, error)

BloblangQueryValue executes a parsed Bloblang mapping on a message and returns the raw value result, or an error if the mapping fails. The error bloblang.ErrRootDeleted is returned if the root of the mapping value is deleted, this is in order to allow distinction between a real nil value and a deleted value.

func (*Message) Context

func (m *Message) Context() context.Context

Context returns a context associated with the message, or a background context in the absence of one.

func (*Message) Copy

func (m *Message) Copy() *Message

Copy creates a shallow copy of a message that is safe to mutate with Set methods without mutating the original. Both messages will share a context, and therefore a tracing ID, if one has been associated with them.

func (*Message) DeepCopy

func (m *Message) DeepCopy() *Message

DeepCopy creates a deep copy of a message and its contents that is safe to mutate with Set methods without mutating the original, and mutations on the inner (deep) contents of the source message will not mutate the copy.

This is required in situations where a component wishes to retain a copy of a message beyond the boundaries of a process or write command. This is specifically required for buffer implementations that operate by keeping a reference to the message.

func (*Message) GetError

func (m *Message) GetError() error

GetError returns an error associated with a message, or nil if there isn't one. Messages marked with errors can be handled using a range of methods outlined in https://docs.redpanda.com/redpanda-connect/configuration/error_handling.

func (*Message) MetaDelete

func (m *Message) MetaDelete(key string)

MetaDelete removes a key from the message metadata.

func (*Message) MetaGet

func (m *Message) MetaGet(key string) (string, bool)

MetaGet attempts to find a metadata key from the message and returns a string result and a boolean indicating whether it was found.

Strong advice: Use MetaGetMut instead.

func (*Message) MetaGetMut

func (m *Message) MetaGetMut(key string) (any, bool)

MetaGetMut attempts to find a metadata key from the message and returns the value if found, and a boolean indicating whether it was found. The value returned is mutable, and so it is safe to modify even though it may be a reference type such as a slice or map.

func (*Message) MetaSet

func (m *Message) MetaSet(key, value string)

MetaSet sets the value of a metadata key. If the value is an empty string the metadata key is deleted.

Strong advice: Use MetaSetMut instead.

func (*Message) MetaSetMut

func (m *Message) MetaSetMut(key string, value any)

MetaSetMut sets the value of a metadata key to any value. The value provided is stored as mutable, and therefore if it is a reference type such as a slice or map then it could be modified by a downstream component.

func (*Message) MetaWalk

func (m *Message) MetaWalk(fn func(string, string) error) error

MetaWalk iterates each metadata key/value pair and executes a provided closure on each iteration. To stop iterating, return an error from the closure. An error returned by the closure will be returned by this function.

Strong advice: Use MetaWalkMut instead.

func (*Message) MetaWalkMut

func (m *Message) MetaWalkMut(fn func(key string, value any) error) error

MetaWalkMut iterates each metadata key/value pair and executes a provided closure on each iteration. To stop iterating, return an error from the closure. An error returned by the closure will be returned by this function.

func (*Message) SetBytes

func (m *Message) SetBytes(b []byte)

SetBytes sets the underlying contents of the message as a byte slice.

func (*Message) SetError

func (m *Message) SetError(err error)

SetError marks the message as having failed a processing step and adds the error to it as context. Messages marked with errors can be handled using a range of methods outlined in https://docs.redpanda.com/redpanda-connect/configuration/error_handling.

func (*Message) SetStructured

func (m *Message) SetStructured(i any)

SetStructured sets the underlying contents of the message as a structured type. This structured value should be a scalar Go type, or either a map[string]interface{} or []interface{} containing the same types all the way through the hierarchy, this ensures that other processors are able to work with the contents and that they can be JSON marshalled when coerced into a byte array.

The provided structure is considered read-only, which means subsequent processors will need to fully clone the structure in order to perform mutations on the data.

func (*Message) SetStructuredMut

func (m *Message) SetStructuredMut(i any)

SetStructuredMut sets the underlying contents of the message as a structured type. This structured value should be a scalar Go type, or either a map[string]interface{} or []interface{} containing the same types all the way through the hierarchy, this ensures that other processors are able to work with the contents and that they can be JSON marshalled when coerced into a byte array.

The provided structure is considered mutable, which means subsequent processors might mutate the structure without performing a deep copy.

func (*Message) WithContext

func (m *Message) WithContext(ctx context.Context) *Message

WithContext returns a new message with a provided context associated with it.

type MessageBatch

type MessageBatch []*Message

MessageBatch describes a collection of one or more messages.

func ExecuteProcessors

func ExecuteProcessors(ctx context.Context, processors []*OwnedProcessor, inbatches ...MessageBatch) ([]MessageBatch, error)

ExecuteProcessors runs a set of batches through a series of processors. If a context error occurs during execution then this function terminates and returns the error.

However, for general processing errors unrelated to context cancellation the errors are marked against individual messages with the `SetError` method and processing continues against subsequent processors.

func (MessageBatch) AddSyncResponse

func (b MessageBatch) AddSyncResponse() error

AddSyncResponse attempts to add this batch of messages, in its exact current condition, to the synchronous response destined for the original source input of this data. Synchronous responses aren't supported by all inputs, and so it's possible that attempting to mark a batch as ready for a synchronous response will return an error.

func (MessageBatch) BloblangMutate

func (b MessageBatch) BloblangMutate(index int, blobl *bloblang.Executor) (*Message, error)

BloblangMutate executes a parsed Bloblang mapping onto a message within the batch, where the contents of the message are mutated directly rather than creating an entirely new object.

Returns the same message back in a mutated form, or an error if the mapping fails. If the mapping results in the root being deleted the returned message will be nil, which indicates it has been filtered.

This method allows mappings to perform windowed aggregations across message batches.

Note that using a Mutate execution means certain functions within the Bloblang mapping will behave differently. In the root of the mapping the right-hand keywords `root` and `this` refer to the same mutable root of the output document.

func (MessageBatch) BloblangQuery

func (b MessageBatch) BloblangQuery(index int, blobl *bloblang.Executor) (*Message, error)

BloblangQuery executes a parsed Bloblang mapping on a message batch, from the perspective of a particular message index, and returns a message back or an error if the mapping fails. If the mapping results in the root being deleted the returned message will be nil, which indicates it has been filtered.

This method allows mappings to perform windowed aggregations across message batches.

func (MessageBatch) BloblangQueryValue

func (b MessageBatch) BloblangQueryValue(index int, blobl *bloblang.Executor) (any, error)

BloblangQueryValue executes a parsed Bloblang mapping on a message batch, from the perspective of a particular message index, and returns the raw value result or an error if the mapping fails. The error bloblang.ErrRootDeleted is returned if the root of the mapping value is deleted, this is in order to allow distinction between a real nil value and a deleted value.

This method allows mappings to perform windowed aggregations across message batches.

func (MessageBatch) Copy

func (b MessageBatch) Copy() MessageBatch

Copy creates a new slice of the same messages, which can be modified without changing the contents of the original batch.

func (MessageBatch) DeepCopy

func (b MessageBatch) DeepCopy() MessageBatch

DeepCopy creates a new slice containing deep copies of the same messages, which can be modified without changing the contents of the original batch and which are unaffected by deep mutations performed on the source messages.

This is required in situations where a component wishes to retain a copy of a message batch beyond the boundaries of a process or write command. This is specifically required for buffer implementations that operate by keeping a reference to the message.

func (MessageBatch) Index

func (b MessageBatch) Index() *Indexer

Index mutates the batch in situ such that each message in the batch retains knowledge of where in the batch it currently resides. An indexer is then returned which can be used as a way of re-acquiring the original order of a batch derived from this one even after filtering, duplication and reordering has been done by other components.

This can be useful in situations where a batch of messages is going to be mutated outside of the control of this component (by processors, for example) in ways that may change the ordering or presence of messages in the resulting batch. Having an indexer that we created prior to this processing allows us to take the resulting batch and join the messages within to the messages we started with.

func (MessageBatch) InterpolatedBytes deprecated

func (b MessageBatch) InterpolatedBytes(index int, i *InterpolatedString) []byte

InterpolatedBytes resolves an interpolated string expression on a message batch, from the perspective of a particular message index.

This method allows interpolation functions to perform windowed aggregations across message batches, and is a more powerful way to interpolate strings than the standard .String method.

Deprecated: Use TryInterpolatedBytes instead.

func (MessageBatch) InterpolatedString deprecated

func (b MessageBatch) InterpolatedString(index int, i *InterpolatedString) string

InterpolatedString resolves an interpolated string expression on a message batch, from the perspective of a particular message index.

This method allows interpolation functions to perform windowed aggregations across message batches, and is a more powerful way to interpolate strings than the standard .String method.

Deprecated: Use TryInterpolatedString instead.

func (MessageBatch) TryInterpolatedBytes

func (b MessageBatch) TryInterpolatedBytes(index int, i *InterpolatedString) ([]byte, error)

TryInterpolatedBytes resolves an interpolated string expression on a message batch, from the perspective of a particular message index.

This method allows interpolation functions to perform windowed aggregations across message batches, and is a more powerful way to interpolate strings than the standard .String method.

func (MessageBatch) TryInterpolatedString

func (b MessageBatch) TryInterpolatedString(index int, i *InterpolatedString) (string, error)

TryInterpolatedString resolves an interpolated string expression on a message batch, from the perspective of a particular message index.

This method allows interpolation functions to perform windowed aggregations across message batches, and is a more powerful way to interpolate strings than the standard .String method.

func (MessageBatch) WalkWithBatchedErrors

func (b MessageBatch) WalkWithBatchedErrors(fn func(int, *Message) error) error

WalkWithBatchedErrors walks a batch and executes a closure function for each message. If the provided closure returns an error then iteration of the batch is not stopped and instead a *BatchError is created and populated.

The one exception to this behaviour is when an error is returned that is considered fatal such as ErrNotConnected, in which case iteration is terminated early and that error is returned immediately.

This is a useful pattern for batched outputs that deliver messages individually.

type MessageBatchHandlerFunc

type MessageBatchHandlerFunc func(context.Context, MessageBatch) error

MessageBatchHandlerFunc is a function signature defining a component that consumes Benthos message batches. An error must be returned if the context is cancelled, or if the messages could not be delivered or processed.

type MessageHandlerFunc

type MessageHandlerFunc func(context.Context, *Message) error

MessageHandlerFunc is a function signature defining a component that consumes Benthos messages. An error must be returned if the context is cancelled, or if the message could not be delivered or processed.

type MetadataExcludeFilter

type MetadataExcludeFilter struct {
	// contains filtered or unexported fields
}

MetadataExcludeFilter provides a configured mechanism for filtering metadata key/values from a message.

func (*MetadataExcludeFilter) Walk

func (m *MetadataExcludeFilter) Walk(msg *Message, fn func(key, value string) error) error

Walk iterates the filtered metadata key/value pairs from a message and executes a provided closure function for each pair. An error returned by the closure will be returned by this function and prevent subsequent pairs from being accessed.

func (*MetadataExcludeFilter) WalkMut

func (m *MetadataExcludeFilter) WalkMut(msg *Message, fn func(key string, value any) error) error

WalkMut iterates the filtered metadata key/value pairs as mutable structured values from a message and executes a provided closure function for each pair. An error returned by the closure will be returned by this function and prevent subsequent pairs from being accessed.

type MetadataFilter

type MetadataFilter struct {
	// contains filtered or unexported fields
}

MetadataFilter provides a configured mechanism for filtering metadata key/values from a message.

func (*MetadataFilter) IsEmpty

func (m *MetadataFilter) IsEmpty() bool

IsEmpty returns true if there aren't any rules configured for matching.

func (*MetadataFilter) Match

func (m *MetadataFilter) Match(k string) bool

Match returns true if the provided key matches the filter.

func (*MetadataFilter) Walk

func (m *MetadataFilter) Walk(msg *Message, fn func(key, value string) error) error

Walk iterates the filtered metadata key/value pairs from a message and executes a provided closure function for each pair. An error returned by the closure will be returned by this function and prevent subsequent pairs from being accessed.

func (*MetadataFilter) WalkMut

func (m *MetadataFilter) WalkMut(msg *Message, fn func(key string, value any) error) error

WalkMut iterates the filtered metadata key/value pairs as mutable structured values from a message and executes a provided closure function for each pair. An error returned by the closure will be returned by this function and prevent subsequent pairs from being accessed.

type MetricCounter

type MetricCounter struct {
	// contains filtered or unexported fields
}

MetricCounter represents a counter metric of a given name and labels.

func (*MetricCounter) Incr

func (c *MetricCounter) Incr(count int64, labelValues ...string)

Incr increments a counter metric by an integer amount, the number of label values must match the number and order of labels specified when the counter was created.

func (*MetricCounter) IncrFloat64

func (c *MetricCounter) IncrFloat64(count float64, labelValues ...string)

IncrFloat64 increments a counter metric by a decimal amount, the number of label values must match the number and order of labels specified when the counter was created.

type MetricGauge

type MetricGauge struct {
	// contains filtered or unexported fields
}

MetricGauge represents a gauge metric of a given name and labels.

func (*MetricGauge) Set

func (g *MetricGauge) Set(value int64, labelValues ...string)

Set a gauge metric, the number of label values must match the number and order of labels specified when the gauge was created.

func (*MetricGauge) SetFloat64

func (g *MetricGauge) SetFloat64(value float64, labelValues ...string)

SetFloat64 sets a gauge metric to a float64 value. Not all metrics exporters support floats, in which case the value will be cast to an int64. The number of label values must match the number and order of labels specified when the gauge was created.

type MetricTimer

type MetricTimer struct {
	// contains filtered or unexported fields
}

MetricTimer represents a timing metric of a given name and labels.

func (*MetricTimer) Timing

func (t *MetricTimer) Timing(delta int64, labelValues ...string)

Timing adds a delta to a timing metric. Delta should be measured in nanoseconds for consistency with other Benthos timing metrics.

The number of label values must match the number and order of labels specified when the timing was created.

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics allows plugin authors to emit custom metrics from components that are exported the same way as native Benthos metrics. It's safe to pass around a nil pointer for testing components.

func (*Metrics) NewCounter

func (m *Metrics) NewCounter(name string, labelKeys ...string) *MetricCounter

NewCounter creates a new counter metric with a name and variadic list of label keys.

func (*Metrics) NewGauge

func (m *Metrics) NewGauge(name string, labelKeys ...string) *MetricGauge

NewGauge creates a new gauge metric with a name and variadic list of label keys.

func (*Metrics) NewTimer

func (m *Metrics) NewTimer(name string, labelKeys ...string) *MetricTimer

NewTimer creates a new timer metric with a name and variadic list of label keys.

type MetricsExporter

type MetricsExporter interface {
	NewCounterCtor(name string, labelKeys ...string) MetricsExporterCounterCtor
	NewTimerCtor(name string, labelKeys ...string) MetricsExporterTimerCtor
	NewGaugeCtor(name string, labelKeys ...string) MetricsExporterGaugeCtor
	Close(ctx context.Context) error
}

MetricsExporter is an interface implemented by Benthos metrics exporters.

type MetricsExporterConstructor

type MetricsExporterConstructor func(conf *ParsedConfig, log *Logger) (MetricsExporter, error)

MetricsExporterConstructor is a func that's provided a configuration type and a logger and must return an instantiation of a metrics exporter based on the config, or an error.

type MetricsExporterCounter

type MetricsExporterCounter interface {
	// Incr increments a counter metric by an integer amount, the number of label values
	// must match the number and order of labels specified when the counter was
	// created.
	Incr(count int64)
}

MetricsExporterCounter represents a counter metric of a given name and labels.

type MetricsExporterCounterCtor

type MetricsExporterCounterCtor func(labelValues ...string) MetricsExporterCounter

MetricsExporterCounterCtor is a constructor for a MetricsExporterCounter that must be called with a variadic list of label values exactly matching the length and order of the label keys provided.

type MetricsExporterGauge

type MetricsExporterGauge interface {
	// Set sets a gauge metric with an int64 value, the number of label values must match the number and
	// order of labels specified when the gauge was created.
	Set(value int64)
}

MetricsExporterGauge represents a gauge metric of a given name and labels.

type MetricsExporterGaugeCtor

type MetricsExporterGaugeCtor func(labelValues ...string) MetricsExporterGauge

MetricsExporterGaugeCtor is a constructor for a MetricsExporterGauge that must be called with a variadic list of label values exactly matching the length and order of the label keys provided.

type MetricsExporterTimer

type MetricsExporterTimer interface {
	// Timing adds a delta to a timing metric. Delta should be measured in
	// nanoseconds for consistency with other Benthos timing metrics.
	//
	// The number of label values must match the number and order of labels
	// specified when the timing was created.
	Timing(delta int64)
}

MetricsExporterTimer represents a timing metric of a given name and labels.

type MetricsExporterTimerCtor

type MetricsExporterTimerCtor func(labelValues ...string) MetricsExporterTimer

MetricsExporterTimerCtor is a constructor for a MetricsExporterTimer that must be called with a variadic list of label values exactly matching the length and order of the label keys provided.

type MockResourcesOptFn

type MockResourcesOptFn func(*mock.Manager)

MockResourcesOptFn provides a func based optional argument to MockResources.

func MockResourcesOptAddCache

func MockResourcesOptAddCache(name string) MockResourcesOptFn

MockResourcesOptAddCache instantiates the resources type with a mock cache with a given name. Cached items are held in memory.

func MockResourcesOptAddRateLimit

func MockResourcesOptAddRateLimit(name string, fn func(context.Context) (time.Duration, error)) MockResourcesOptFn

MockResourcesOptAddRateLimit instantiates the resources type with a mock rate limit with a given name, the provided closure will be called for each invocation of the rate limit.

func MockResourcesOptUseLogger

func MockResourcesOptUseLogger(l *Logger) MockResourcesOptFn

MockResourcesOptUseLogger sets the logger to be used by components referencing these resources.

type OtelTracerProviderConstructor

type OtelTracerProviderConstructor func(conf *ParsedConfig) (trace.TracerProvider, error)

OtelTracerProviderConstructor is a func that's provided a configuration type and must return an instantiation of an OpenTelemetry tracer provider based on the config, or an error.

Experimental: This type signature is experimental and therefore subject to change outside of major version releases.

type Output

type Output interface {
	// Establish a connection to the downstream service. Connect will always be
	// called first when a writer is instantiated, and will be continuously
	// called with back off until a nil error is returned.
	//
	// The provided context remains open only for the duration of the connecting
	// phase, and should not be used to establish the lifetime of the connection
	// itself.
	//
	// Once Connect returns a nil error the write method will be called until
	// either ErrNotConnected is returned, or the writer is closed.
	Connect(context.Context) error

	// Write a message to a sink, or return an error if delivery is not
	// possible.
	//
	// If this method returns ErrNotConnected then write will not be called
	// again until Connect has returned a nil error.
	Write(context.Context, *Message) error

	Closer
}

Output is an interface implemented by Benthos outputs that support single message writes. Each call to Write should block until either the message has been successfully or unsuccessfully sent, or the context is cancelled.

Multiple write calls can be performed in parallel, and the constructor of an output must provide a MaxInFlight parameter indicating the maximum number of parallel write calls the output supports.

type OutputConstructor

type OutputConstructor func(conf *ParsedConfig, mgr *Resources) (out Output, maxInFlight int, err error)

OutputConstructor is a func that's provided a configuration type and access to a service manager, and must return an instantiation of a writer based on the config and a maximum number of in-flight messages to allow, or an error.

type OwnedInput

type OwnedInput struct {
	// contains filtered or unexported fields
}

OwnedInput provides direct ownership of an input extracted from a plugin config. Connectivity of the input is handled internally, and so the consumer of this type should only be concerned with reading messages and eventually calling Close to terminate the input.

func (*OwnedInput) BatchedWith

func (o *OwnedInput) BatchedWith(b *Batcher) *OwnedInput

BatchedWith returns a copy of the OwnedInput where messages will be batched according to the provided batcher.

func (*OwnedInput) Close

func (o *OwnedInput) Close(ctx context.Context) error

Close the input.

func (*OwnedInput) ReadBatch

func (o *OwnedInput) ReadBatch(ctx context.Context) (MessageBatch, AckFunc, error)

ReadBatch attempts to read a message batch from the input, along with a function to be called once the entire batch can be either acked (successfully sent or intentionally filtered) or nacked (failed to be processed or dispatched to the output).

If this method returns ErrEndOfInput then that indicates that the input has finished and will no longer yield new messages.

func (*OwnedInput) XUnwrapper

func (o *OwnedInput) XUnwrapper() any

XUnwrapper is for internal use only, do not use this.

type OwnedOutput

type OwnedOutput struct {
	// contains filtered or unexported fields
}

OwnedOutput provides direct ownership of an output extracted from a plugin config. Connectivity of the output is handled internally, and so the owner of this type should only be concerned with writing messages and eventually calling Close to terminate the output.

func (*OwnedOutput) BatchedWith

func (o *OwnedOutput) BatchedWith(b *Batcher) *OwnedOutput

BatchedWith returns a copy of the OwnedOutput where messages will be batched according to the provided batcher.

func (*OwnedOutput) Close

func (o *OwnedOutput) Close(ctx context.Context) error

Close the output.

func (*OwnedOutput) Prime

func (o *OwnedOutput) Prime() error

Prime attempts to establish the output connection ready for consuming data. This is done automatically once data is written. However, pre-emptively priming the connection before data is received is generally a better idea for short lived outputs as it'll speed up the first write.

func (*OwnedOutput) PrimeBuffered

func (o *OwnedOutput) PrimeBuffered(n int) error

PrimeBuffered performs the same output preparation as Prime but the internal transaction channel used for delivering data is buffered with the provided size. This allows for multiple write transactions to be written to the buffer and may improve the chance of delivery when using the WriteBatchNonBlocking method.

func (*OwnedOutput) Write

func (o *OwnedOutput) Write(ctx context.Context, m *Message) error

Write a message to the output, or return an error either if delivery is not possible or the context is cancelled.

func (*OwnedOutput) WriteBatch

func (o *OwnedOutput) WriteBatch(ctx context.Context, b MessageBatch) error

WriteBatch attempts to write a message batch to the output, and returns an error either if delivery is not possible or the context is cancelled.

func (*OwnedOutput) WriteBatchNonBlocking

func (o *OwnedOutput) WriteBatchNonBlocking(b MessageBatch, aFn AckFunc) error

WriteBatchNonBlocking attempts to write a message batch to the output, but if the write is blocked (the read channel is full or not being listened to) then the write is aborted immediately in order to prevent blocking the caller.

Instead of blocking until an acknowledgement of delivery is returned this method returns immediately and the provided acknowledgement function is called when appropriate.

If the write is aborted then ErrBlockingWrite is returned. An error may also be returned if the output cannot be primed.

func (*OwnedOutput) XUnwrapper

func (o *OwnedOutput) XUnwrapper() any

XUnwrapper is for internal use only, do not use this.

type OwnedProcessor

type OwnedProcessor struct {
	// contains filtered or unexported fields
}

OwnedProcessor provides direct ownership of a processor extracted from a plugin config.

func (*OwnedProcessor) Close

func (o *OwnedProcessor) Close(ctx context.Context) error

Close the processor, allowing it to clean up resources.

func (*OwnedProcessor) Process

func (o *OwnedProcessor) Process(ctx context.Context, msg *Message) (MessageBatch, error)

Process a single message, returns either a batch of zero or more resulting messages or an error if the message could not be processed.

func (*OwnedProcessor) ProcessBatch

func (o *OwnedProcessor) ProcessBatch(ctx context.Context, batch MessageBatch) ([]MessageBatch, error)

ProcessBatch attempts to process a batch of messages, returns zero or more batches of resulting messages, or an error if the context is cancelled during execution.

However, for general processing errors unrelated to context cancellation the error is marked against individual messages with the `SetError` method and a nil error is returned by this method.

func (*OwnedProcessor) XUnwrapper

func (o *OwnedProcessor) XUnwrapper() any

XUnwrapper is for internal use only, do not use this.

type OwnedScanner

type OwnedScanner struct {
	// contains filtered or unexported fields
}

OwnedScanner provides direct ownership of a scanner.

func (*OwnedScanner) Close

func (s *OwnedScanner) Close(ctx context.Context) error

Close the scanner, indicating that it will no longer be consumed.

func (*OwnedScanner) NextBatch

func (s *OwnedScanner) NextBatch(ctx context.Context) (MessageBatch, AckFunc, error)

NextBatch attempts to further consume the underlying reader in order to extract another message (or multiple).

type OwnedScannerCreator

type OwnedScannerCreator struct {
	// contains filtered or unexported fields
}

OwnedScannerCreator provides direct ownership of a batch scanner extracted from a plugin config.

func (*OwnedScannerCreator) Close

func (s *OwnedScannerCreator) Close(ctx context.Context) error

func (*OwnedScannerCreator) Create

Create a new scanner from an io.ReadCloser along with optional information about the source of the reader and a function to be called once the underlying data has been read and acknowledged in its entirety.

type ParsedConfig

type ParsedConfig struct {
	// contains filtered or unexported fields
}

ParsedConfig represents a plugin configuration that has been validated and parsed from a ConfigSpec, and allows plugin constructors to access configuration fields.

func (*ParsedConfig) Contains

func (p *ParsedConfig) Contains(path ...string) bool

Contains checks whether the parsed config contains a given field identified by its name.

func (*ParsedConfig) EngineVersion

func (p *ParsedConfig) EngineVersion() string

EngineVersion returns the version stamp associated with the underlying benthos engine. The version string is not guaranteed to match any particular scheme.

func (*ParsedConfig) FieldAny

func (p *ParsedConfig) FieldAny(path ...string) (any, error)

FieldAny accesses a field from the parsed config by its name that can assume any value type. If the field is not found an error is returned.

func (*ParsedConfig) FieldAnyList

func (p *ParsedConfig) FieldAnyList(path ...string) ([]*ParsedConfig, error)

FieldAnyList accesses a field that is a list of any value types from the parsed config by its name and returns the value as an array of *ParsedConfig types, where each one represents an object or value in the list. Returns an error if the field is not found, or is not a list of values.

func (*ParsedConfig) FieldAnyMap

func (p *ParsedConfig) FieldAnyMap(path ...string) (map[string]*ParsedConfig, error)

FieldAnyMap accesses a field that is an object of arbitrary keys and any values from the parsed config by its name and returns a map of *ParsedConfig types, where each one represents an object or value in the map. Returns an error if the field is not found, or is not an object.

func (*ParsedConfig) FieldBackOff

func (p *ParsedConfig) FieldBackOff(path ...string) (*backoff.ExponentialBackOff, error)

FieldBackOff accesses a field from a parsed config that was defined with NewBackOffField and returns a *backoff.ExponentialBackOff, or an error if the configuration was invalid.

func (*ParsedConfig) FieldBackOffToggled

func (p *ParsedConfig) FieldBackOffToggled(path ...string) (boff *backoff.ExponentialBackOff, enabled bool, err error)

FieldBackOffToggled accesses a field from a parsed config that was defined with NewBackOffField and returns a *backoff.ExponentialBackOff and a boolean flag indicating whether retries are explicitly enabled, or an error if the configuration was invalid.

func (*ParsedConfig) FieldBatchPolicy

func (p *ParsedConfig) FieldBatchPolicy(path ...string) (conf BatchPolicy, err error)

FieldBatchPolicy accesses a field from a parsed config that was defined with NewBatchPolicyField and returns a BatchPolicy, or an error if the configuration was invalid.

func (*ParsedConfig) FieldBloblang

func (p *ParsedConfig) FieldBloblang(path ...string) (*bloblang.Executor, error)

FieldBloblang accesses a field from a parsed config that was defined with NewBloblangField and returns either a *bloblang.Executor or an error if the mapping was invalid.

func (*ParsedConfig) FieldBool

func (p *ParsedConfig) FieldBool(path ...string) (bool, error)

FieldBool accesses a bool field from the parsed config by its name and returns the value. Returns an error if the field is not found or is not a bool.

func (*ParsedConfig) FieldDuration

func (p *ParsedConfig) FieldDuration(path ...string) (time.Duration, error)

FieldDuration accesses a duration string field from the parsed config by its name. If the field is not found or is not a valid duration string an error is returned.

func (*ParsedConfig) FieldFloat

func (p *ParsedConfig) FieldFloat(path ...string) (float64, error)

FieldFloat accesses a float field from the parsed config by its name and returns the value. Returns an error if the field is not found or is not a float.

func (*ParsedConfig) FieldFloatList

func (p *ParsedConfig) FieldFloatList(path ...string) ([]float64, error)

FieldFloatList accesses a field that is a list of floats from the parsed config by its name and returns the value. Returns an error if the field is not found, or is not a list of floats.

func (*ParsedConfig) FieldFloatMap

func (p *ParsedConfig) FieldFloatMap(path ...string) (map[string]float64, error)

FieldFloatMap accesses a field that is an object of arbitrary keys and float values from the parsed config by its name and returns the value. Returns an error if the field is not found, or is not an object of floats.

func (*ParsedConfig) FieldInput

func (p *ParsedConfig) FieldInput(path ...string) (*OwnedInput, error)

FieldInput accesses a field from a parsed config that was defined with NewInputField and returns an OwnedInput, or an error if the configuration was invalid.

func (*ParsedConfig) FieldInputList

func (p *ParsedConfig) FieldInputList(path ...string) ([]*OwnedInput, error)

FieldInputList accesses a field from a parsed config that was defined with NewInputListField and returns a slice of OwnedInput, or an error if the configuration was invalid.

func (*ParsedConfig) FieldInputMap

func (p *ParsedConfig) FieldInputMap(path ...string) (map[string]*OwnedInput, error)

FieldInputMap accesses a field from a parsed config that was defined with NewInputMapField and returns a map of OwnedInput, or an error if the configuration was invalid.

func (*ParsedConfig) FieldInt

func (p *ParsedConfig) FieldInt(path ...string) (int, error)

FieldInt accesses an int field from the parsed config by its name and returns the value. Returns an error if the field is not found or is not an int.

func (*ParsedConfig) FieldIntList

func (p *ParsedConfig) FieldIntList(path ...string) ([]int, error)

FieldIntList accesses a field that is a list of integers from the parsed config by its name and returns the value. Returns an error if the field is not found, or is not a list of integers.

func (*ParsedConfig) FieldIntMap

func (p *ParsedConfig) FieldIntMap(path ...string) (map[string]int, error)

FieldIntMap accesses a field that is an object of arbitrary keys and integer values from the parsed config by its name and returns the value. Returns an error if the field is not found, or is not an object of integers.

func (*ParsedConfig) FieldInterpolatedString

func (p *ParsedConfig) FieldInterpolatedString(path ...string) (*InterpolatedString, error)

FieldInterpolatedString accesses a field from a parsed config that was defined with NewInterpolatedStringField and returns either an *InterpolatedString or an error if the string was invalid.

func (*ParsedConfig) FieldInterpolatedStringList

func (p *ParsedConfig) FieldInterpolatedStringList(path ...string) ([]*InterpolatedString, error)

FieldInterpolatedStringList accesses a field that is a list of interpolated string values from the parsed config.

Returns an error if the field is not found, or is not a list of interpolated strings.

func (*ParsedConfig) FieldInterpolatedStringMap

func (p *ParsedConfig) FieldInterpolatedStringMap(path ...string) (map[string]*InterpolatedString, error)

FieldInterpolatedStringMap accesses a field that is an object of arbitrary keys and interpolated string values from the parsed config by its name and returns the value.

Returns an error if the field is not found, or is not an object of interpolated strings.

func (*ParsedConfig) FieldMaxInFlight

func (p *ParsedConfig) FieldMaxInFlight() (int, error)

FieldMaxInFlight accesses a field from a parsed config that was defined either with NewInputMaxInFlightField or NewOutputMaxInFlightField, and returns either an integer or an error if the value was invalid.

func (*ParsedConfig) FieldMetadataExcludeFilter

func (p *ParsedConfig) FieldMetadataExcludeFilter(path ...string) (f *MetadataExcludeFilter, err error)

FieldMetadataExcludeFilter accesses a field from a parsed config that was defined with NewMetadataExcludeFilterField and returns a MetadataExcludeFilter, or an error if the configuration was invalid.

func (*ParsedConfig) FieldMetadataFilter

func (p *ParsedConfig) FieldMetadataFilter(path ...string) (f *MetadataFilter, err error)

FieldMetadataFilter accesses a field from a parsed config that was defined with NewMetadataFilterField and returns a MetadataFilter, or an error if the configuration was invalid.

func (*ParsedConfig) FieldObjectList

func (p *ParsedConfig) FieldObjectList(path ...string) ([]*ParsedConfig, error)

FieldObjectList accesses a field that is a list of objects from the parsed config by its name and returns the value as an array of *ParsedConfig types, where each one represents an object in the list. Returns an error if the field is not found, or is not a list of objects.

func (*ParsedConfig) FieldObjectMap

func (p *ParsedConfig) FieldObjectMap(path ...string) (map[string]*ParsedConfig, error)

FieldObjectMap accesses a field that is a map of objects from the parsed config by its name and returns the value as a map of *ParsedConfig types, where each one represents an object in the map. Returns an error if the field is not found, or is not a map of objects.

func (*ParsedConfig) FieldOutput

func (p *ParsedConfig) FieldOutput(path ...string) (*OwnedOutput, error)

FieldOutput accesses a field from a parsed config that was defined with NewOutputField and returns an OwnedOutput, or an error if the configuration was invalid.

func (*ParsedConfig) FieldOutputList

func (p *ParsedConfig) FieldOutputList(path ...string) ([]*OwnedOutput, error)

FieldOutputList accesses a field from a parsed config that was defined with NewOutputListField and returns a slice of OwnedOutput, or an error if the configuration was invalid.

func (*ParsedConfig) FieldOutputMap

func (p *ParsedConfig) FieldOutputMap(path ...string) (map[string]*OwnedOutput, error)

FieldOutputMap accesses a field from a parsed config that was defined with NewOutputMapField and returns a map of OwnedOutput, or an error if the configuration was invalid.

func (*ParsedConfig) FieldProcessor

func (p *ParsedConfig) FieldProcessor(path ...string) (*OwnedProcessor, error)

FieldProcessor accesses a field from a parsed config that was defined with NewProcessorField and returns an OwnedProcessor, or an error if the configuration was invalid.

func (*ParsedConfig) FieldProcessorList

func (p *ParsedConfig) FieldProcessorList(path ...string) ([]*OwnedProcessor, error)

FieldProcessorList accesses a field from a parsed config that was defined with NewProcessorListField and returns a slice of OwnedProcessor, or an error if the configuration was invalid.

func (*ParsedConfig) FieldScanner

func (p *ParsedConfig) FieldScanner(path ...string) (*OwnedScannerCreator, error)

FieldScanner accesses a field from a parsed config that was defined with NewScannerField and returns an OwnedScannerCreator, or an error if the configuration was invalid.

func (*ParsedConfig) FieldString

func (p *ParsedConfig) FieldString(path ...string) (string, error)

FieldString accesses a string field from the parsed config by its name. If the field is not found or is not a string an error is returned.

func (*ParsedConfig) FieldStringList

func (p *ParsedConfig) FieldStringList(path ...string) ([]string, error)

FieldStringList accesses a field that is a list of strings from the parsed config by its name and returns the value. Returns an error if the field is not found, or is not a list of strings.

func (*ParsedConfig) FieldStringListOfLists

func (p *ParsedConfig) FieldStringListOfLists(path ...string) ([][]string, error)

FieldStringListOfLists accesses a field that is a list of lists of strings from the parsed config by its name and returns the value. Returns an error if the field is not found, or is not a list of lists of strings.

func (*ParsedConfig) FieldStringMap

func (p *ParsedConfig) FieldStringMap(path ...string) (map[string]string, error)

FieldStringMap accesses a field that is an object of arbitrary keys and string values from the parsed config by its name and returns the value. Returns an error if the field is not found, or is not an object of strings.

func (*ParsedConfig) FieldTLS

func (p *ParsedConfig) FieldTLS(path ...string) (*tls.Config, error)

FieldTLS accesses a field from a parsed config that was defined with NewTLSField and returns a *tls.Config, or an error if the configuration was invalid.

func (*ParsedConfig) FieldTLSToggled

func (p *ParsedConfig) FieldTLSToggled(path ...string) (tconf *tls.Config, enabled bool, err error)

FieldTLSToggled accesses a field from a parsed config that was defined with NewTLSToggledField and returns a *tls.Config and a boolean flag indicating whether tls is explicitly enabled, or an error if the configuration was invalid.

func (*ParsedConfig) FieldURL

func (p *ParsedConfig) FieldURL(path ...string) (*url.URL, error)

FieldURL accesses a field from a parsed config that was defined with NewURLField and returns either a *url.URL or an error if the string was invalid.

func (*ParsedConfig) FieldURLList

func (p *ParsedConfig) FieldURLList(path ...string) ([]*url.URL, error)

FieldURLList accesses a field from a parsed config that was defined with NewURLListField and returns either a []*url.URL or an error if one or more strings were invalid.

func (*ParsedConfig) HTTPRequestAuthSignerFromParsed

func (p *ParsedConfig) HTTPRequestAuthSignerFromParsed() (fn func(fs.FS, *http.Request) error, err error)

HTTPRequestAuthSignerFromParsed takes a parsed config which is expected to contain fields from NewHTTPRequestAuthSignerFields, and returns a func that applies those configured authentication mechanisms to a given HTTP request.

func (*ParsedConfig) Namespace

func (p *ParsedConfig) Namespace(path ...string) *ParsedConfig

Namespace returns a version of the parsed config at a given field namespace. This is useful for extracting multiple fields under the same grouping.

func (*ParsedConfig) WrapBatchInputExtractTracingSpanMapping

func (p *ParsedConfig) WrapBatchInputExtractTracingSpanMapping(inputName string, i BatchInput) (
	BatchInput, error)

WrapBatchInputExtractTracingSpanMapping wraps a BatchInput with a mechanism for extracting tracing spans using a Bloblang mapping.

func (*ParsedConfig) WrapBatchOutputExtractTracingSpanMapping

func (p *ParsedConfig) WrapBatchOutputExtractTracingSpanMapping(outputName string, o BatchOutput) (BatchOutput, error)

WrapBatchOutputExtractTracingSpanMapping wraps a BatchOutput with a mechanism for extracting tracing spans from the consumed message and merging them into the written result using a Bloblang mapping.

func (*ParsedConfig) WrapInputExtractTracingSpanMapping

func (p *ParsedConfig) WrapInputExtractTracingSpanMapping(inputName string, i Input) (Input, error)

WrapInputExtractTracingSpanMapping wraps an Input with a mechanism for extracting tracing spans from the consumed message using a Bloblang mapping.

func (*ParsedConfig) WrapOutputExtractTracingSpanMapping

func (p *ParsedConfig) WrapOutputExtractTracingSpanMapping(outputName string, o Output) (Output, error)

WrapOutputExtractTracingSpanMapping wraps an Output with a mechanism for extracting tracing spans from the consumed message and merging them into the written result using a Bloblang mapping.

type PrintLogger

type PrintLogger interface {
	Printf(format string, v ...any)
	Println(v ...any)
}

PrintLogger is a simple Print based interface implemented by custom loggers.

type Processor

type Processor interface {
	// Process a message into one or more resulting messages, or return an error
	// if the message could not be processed. If zero messages are returned and
	// the error is nil then the message is filtered.
	//
	// When an error is returned the input message will continue down the
	// pipeline but will be marked with the error with *message.SetError, and
	// metrics and logs will be emitted. The failed message can then be handled
	// with the patterns outlined in https://docs.redpanda.com/redpanda-connect/configuration/error_handling.
	//
	// The Message types returned MUST be derived from the provided message, and
	// CANNOT be custom implementations of Message. In order to copy the
	// provided message use the Copy method.
	Process(context.Context, *Message) (MessageBatch, error)

	Closer
}

Processor is a Benthos processor implementation that works against single messages.

type ProcessorConstructor

type ProcessorConstructor func(conf *ParsedConfig, mgr *Resources) (Processor, error)

ProcessorConstructor is a func that's provided a configuration type and access to a service manager and must return an instantiation of a processor based on the config, or an error.

type RateLimit

type RateLimit interface {
	// Access the rate limited resource. Returns a duration or an error if the
	// rate limit check fails. The returned duration is either zero (meaning the
	// resource may be accessed) or a reasonable length of time to wait before
	// requesting again.
	Access(context.Context) (time.Duration, error)

	Closer
}

RateLimit is an interface implemented by Benthos rate limits.

type RateLimitConstructor

type RateLimitConstructor func(conf *ParsedConfig, mgr *Resources) (RateLimit, error)

RateLimitConstructor is a func that's provided a configuration type and access to a service manager and must return an instantiation of a rate limit based on the config, or an error.

type ResourceInput

type ResourceInput struct {
	// contains filtered or unexported fields
}

ResourceInput provides access to an input resource.

func (*ResourceInput) ReadBatch

func (r *ResourceInput) ReadBatch(ctx context.Context) (MessageBatch, AckFunc, error)

ReadBatch attempts to read a message batch from the input, along with a function to be called once the entire batch can be either acked (successfully sent or intentionally filtered) or nacked (failed to be processed or dispatched to the output).

If this method returns ErrEndOfInput then that indicates that the input has finished and will no longer yield new messages.

type ResourceOutput

type ResourceOutput struct {
	// contains filtered or unexported fields
}

ResourceOutput provides access to an output resource.

func (*ResourceOutput) Write

func (o *ResourceOutput) Write(ctx context.Context, m *Message) error

Write a message to the output, or return an error either if delivery is not possible or the context is cancelled.

func (*ResourceOutput) WriteBatch

func (o *ResourceOutput) WriteBatch(ctx context.Context, b MessageBatch) error

WriteBatch attempts to write a message batch to the output, and returns an error either if delivery is not possible or the context is cancelled.

type Resources

type Resources struct {
	// contains filtered or unexported fields
}

Resources provides access to service-wide resources.

func MockResources

func MockResources(opts ...MockResourcesOptFn) *Resources

MockResources returns an instantiation of a resources struct that provides valid but ineffective methods and observability components. It is possible to instantiate this with mocked (in-memory) cache and rate limit types for testing purposes.

func (*Resources) AccessCache

func (r *Resources) AccessCache(ctx context.Context, name string, fn func(c Cache)) error

AccessCache attempts to access a cache resource by name. This action can block if CRUD operations are being actively performed on the resource.

func (*Resources) AccessInput

func (r *Resources) AccessInput(ctx context.Context, name string, fn func(i *ResourceInput)) error

AccessInput attempts to access an input resource by name.

func (*Resources) AccessOutput

func (r *Resources) AccessOutput(ctx context.Context, name string, fn func(o *ResourceOutput)) error

AccessOutput attempts to access an output resource by name. This action can block if CRUD operations are being actively performed on the resource.

func (*Resources) AccessRateLimit

func (r *Resources) AccessRateLimit(ctx context.Context, name string, fn func(r RateLimit)) error

AccessRateLimit attempts to access a rate limit resource by name. This action can block if CRUD operations are being actively performed on the resource.

func (*Resources) EngineVersion

func (r *Resources) EngineVersion() string

EngineVersion returns the version stamp associated with the underlying benthos engine. The version string is not guaranteed to match any particular scheme.

func (*Resources) FS

func (r *Resources) FS() *FS

FS returns an fs.FS implementation that provides isolation or customised behaviour for components that access the filesystem. For example, this might be used to tally files being accessed by components for observability purposes, or to customise where relative paths are resolved from.

Components should use this instead of accessing the os directly. However, the default behaviour of an environment FS is to access the OS from the directory the process is running from, which matches calling the os package directly.

func (*Resources) HasCache

func (r *Resources) HasCache(name string) bool

HasCache confirms whether a cache with a given name has been registered as a resource. This method is useful during component initialisation as it is defensive against ordering.

func (*Resources) HasInput

func (r *Resources) HasInput(name string) bool

HasInput confirms whether an input with a given name has been registered as a resource. This method is useful during component initialisation as it is defensive against ordering.

func (*Resources) HasOutput

func (r *Resources) HasOutput(name string) bool

HasOutput confirms whether an output with a given name has been registered as a resource. This method is useful during component initialisation as it is defensive against ordering.

func (*Resources) HasRateLimit

func (r *Resources) HasRateLimit(name string) bool

HasRateLimit confirms whether a rate limit with a given name has been registered as a resource. This method is useful during component initialisation as it is defensive against ordering.

func (*Resources) Label

func (r *Resources) Label() string

Label returns a label that identifies the component instantiation. This could be an explicit label set in config, or is otherwise a generated label based on the position of the component within a config.

func (*Resources) Logger

func (r *Resources) Logger() *Logger

Logger returns a logger preset with context about the component the resources were provided to.

func (*Resources) ManagedBatchOutput

func (r *Resources) ManagedBatchOutput(typeName string, maxInFlight int, b BatchOutput) (*OwnedOutput, error)

ManagedBatchOutput takes a BatchOutput implementation and wraps it within a mechanism that automatically manages QOL details such as connect/reconnect looping, max in flight, back pressure, and so on. This is similar to how an output would be executed within a standard Benthos pipeline.

func (*Resources) Metrics

func (r *Resources) Metrics() *Metrics

Metrics returns a mechanism for creating custom metrics.

func (*Resources) OtelTracer

func (r *Resources) OtelTracer() trace.TracerProvider

OtelTracer returns an open telemetry tracer provider that can be used to create new tracers.

Experimental: This type signature is experimental and therefore subject to change outside of major version releases.

func (*Resources) XUnwrapper

func (r *Resources) XUnwrapper() any

XUnwrapper is for internal use only, do not use this.

type ScannerSourceDetails

type ScannerSourceDetails struct {
	// contains filtered or unexported fields
}

ScannerSourceDetails contains exclusively optional information which could be used by scanner implementations in order to determine the underlying data format.

func NewScannerSourceDetails

func NewScannerSourceDetails() *ScannerSourceDetails

NewScannerSourceDetails creates a ScannerSourceDetails object with default values.

func (*ScannerSourceDetails) Name

func (r *ScannerSourceDetails) Name() string

Name returns a filename (or other equivalent name of the source), or an empty string if it has not been set.

func (*ScannerSourceDetails) SetName

func (r *ScannerSourceDetails) SetName(name string)

SetName sets a filename (or other equivalent name of the source) to details.

type SimpleBatchScanner

type SimpleBatchScanner interface {
	NextBatch(context.Context) (MessageBatch, error)
	Close(context.Context) error
}

SimpleBatchScanner is a reduced version of BatchScanner where managing the aggregation of acknowledgments from yielded message batches is omitted.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream executes a full Benthos stream and provides methods for performing status checks, terminating the stream, and blocking until the stream ends.

func (*Stream) Run

func (s *Stream) Run(ctx context.Context) (err error)

Run attempts to start the stream pipeline and blocks until either the stream has gracefully come to a stop, or the provided context is cancelled.

func (*Stream) Stop

func (s *Stream) Stop(ctx context.Context) (err error)

Stop attempts to close the stream gracefully, but if the context is closed or draws near to a deadline the attempt becomes less graceful.

An ungraceful shutdown increases the likelihood of processing duplicate messages on the next start up, but never results in dropped messages as long as the input source supports at-least-once delivery.

func (*Stream) StopWithin

func (s *Stream) StopWithin(timeout time.Duration) error

StopWithin attempts to close the stream within the specified timeout period. Initially the attempt is graceful, but as the timeout draws close the attempt becomes progressively less graceful.

An ungraceful shutdown increases the likelihood of processing duplicate messages on the next start up, but never results in dropped messages as long as the input source supports at-least-once delivery.

type StreamBuilder

type StreamBuilder struct {
	// contains filtered or unexported fields
}

StreamBuilder provides methods for building a Benthos stream configuration. When parsing Benthos configs this builder follows the schema and field defaults of a standard Benthos configuration. Environment variable interpolations are also parsed and resolved the same as regular configs.

Streams built with a stream builder have the HTTP server for exposing metrics and ready checks disabled by default, which is the only deviation away from a standard Benthos default configuration. In order to enable the server set the configuration field `http.enabled` to `true` explicitly, or use `SetHTTPMux` in order to provide an explicit HTTP multiplexer for registering those endpoints.

func NewStreamBuilder

func NewStreamBuilder() *StreamBuilder

NewStreamBuilder creates a new StreamBuilder.

func (*StreamBuilder) AddBatchConsumerFunc

func (s *StreamBuilder) AddBatchConsumerFunc(fn MessageBatchHandlerFunc) error

AddBatchConsumerFunc adds an output to the builder that executes a closure function argument for each message batch. If more than one output configuration is added they will automatically be composed within a fan out broker when the pipeline is built.

The provided MessageBatchHandlerFunc may be called from any number of goroutines, and therefore it is recommended to implement some form of throttling or mutex locking in cases where the call is non-blocking.

Only one consumer can be added to a stream builder, and subsequent calls will return an error.

Message batches must be created by upstream components (inputs, buffers, etc) otherwise message batches received by this consumer will contain only a single message.

func (*StreamBuilder) AddBatchProducerFunc

func (s *StreamBuilder) AddBatchProducerFunc() (MessageBatchHandlerFunc, error)

AddBatchProducerFunc adds an input to the builder that allows you to write message batches directly into the stream with a closure function. If any other input has or will be added to the stream builder they will be automatically composed within a broker when the pipeline is built.

The returned MessageBatchHandlerFunc can be called concurrently from any number of goroutines, and each call will block until all messages within the batch are successfully delivered downstream, were rejected (or otherwise could not be delivered) or the context is cancelled.

Only one producer func can be added to a stream builder, and subsequent calls will return an error.

func (*StreamBuilder) AddCacheYAML

func (s *StreamBuilder) AddCacheYAML(conf string) error

AddCacheYAML parses a cache YAML configuration and adds it to the builder as a resource.

func (*StreamBuilder) AddConsumerFunc

func (s *StreamBuilder) AddConsumerFunc(fn MessageHandlerFunc) error

AddConsumerFunc adds an output to the builder that executes a closure function argument for each message. If more than one output configuration is added they will automatically be composed within a fan out broker when the pipeline is built.

The provided MessageHandlerFunc may be called from any number of goroutines, and therefore it is recommended to implement some form of throttling or mutex locking in cases where the call is non-blocking.

Only one consumer can be added to a stream builder, and subsequent calls will return an error.

func (*StreamBuilder) AddInputYAML

func (s *StreamBuilder) AddInputYAML(conf string) error

AddInputYAML parses an input YAML configuration and adds it to the builder. If more than one input configuration is added they will automatically be composed within a broker when the pipeline is built.

func (*StreamBuilder) AddOutputYAML

func (s *StreamBuilder) AddOutputYAML(conf string) error

AddOutputYAML parses an output YAML configuration and adds it to the builder. If more than one output configuration is added they will automatically be composed within a fan out broker when the pipeline is built.

func (*StreamBuilder) AddProcessorYAML

func (s *StreamBuilder) AddProcessorYAML(conf string) error

AddProcessorYAML parses a processor YAML configuration and adds it to the builder to be executed within the pipeline.processors section, after all prior added processor configs.

func (*StreamBuilder) AddProducerFunc

func (s *StreamBuilder) AddProducerFunc() (MessageHandlerFunc, error)

AddProducerFunc adds an input to the builder that allows you to write messages directly into the stream with a closure function. If any other input has or will be added to the stream builder they will be automatically composed within a broker when the pipeline is built.

The returned MessageHandlerFunc can be called concurrently from any number of goroutines, and each call will block until the message is successfully delivered downstream, was rejected (or otherwise could not be delivered) or the context is cancelled.

Only one producer func can be added to a stream builder, and subsequent calls will return an error.

func (*StreamBuilder) AddRateLimitYAML

func (s *StreamBuilder) AddRateLimitYAML(conf string) error

AddRateLimitYAML parses a rate limit YAML configuration and adds it to the builder as a resource.

func (*StreamBuilder) AddResourcesYAML

func (s *StreamBuilder) AddResourcesYAML(conf string) error

AddResourcesYAML parses resource configurations and adds them to the config.

func (*StreamBuilder) AsYAML

func (s *StreamBuilder) AsYAML() (string, error)

AsYAML prints a YAML representation of the stream config as it has been currently built.

func (*StreamBuilder) Build

func (s *StreamBuilder) Build() (*Stream, error)

Build a Benthos stream pipeline according to the components specified by this stream builder.

func (*StreamBuilder) BuildTraced

func (s *StreamBuilder) BuildTraced() (*Stream, *TracingSummary, error)

BuildTraced creates a Benthos stream pipeline according to the components specified by this stream builder, where each major component (input, processor, output) is wrapped with a tracing module that, during the lifetime of the stream, aggregates tracing events into the returned *TracingSummary. Once the stream has ended the TracingSummary can be queried for events that occurred.

Experimental: The behaviour of this method could change outside of major version releases.

func (*StreamBuilder) DisableLinting

func (s *StreamBuilder) DisableLinting()

DisableLinting configures the stream builder to no longer lint YAML configs, allowing you to add snippets of config to the builder without failing on linting rules.

func (*StreamBuilder) SetBufferYAML

func (s *StreamBuilder) SetBufferYAML(conf string) error

SetBufferYAML parses a buffer YAML configuration and sets it to the builder to be placed between the input and the pipeline (processors) sections. This config will replace any prior configured buffer.

func (*StreamBuilder) SetEngineVersion

func (s *StreamBuilder) SetEngineVersion(ev string)

SetEngineVersion sets the version string representing the Benthos engine that components will see. By default a best attempt will be made to determine a version either from the benthos module import or a build-time flag.

func (*StreamBuilder) SetEnvVarLookupFunc

func (s *StreamBuilder) SetEnvVarLookupFunc(fn func(string) (string, bool))

SetEnvVarLookupFunc changes the behaviour of the stream builder so that the value of environment variable interpolations (of the form `${FOO}`) are obtained via a provided function rather than the default of os.LookupEnv.

TODO V5: Add context here, Travis is onto us.

func (*StreamBuilder) SetFields

func (s *StreamBuilder) SetFields(pathValues ...any) error

SetFields modifies the config by setting one or more fields identified by a dot path to a value. The argument must be a variadic list of pairs, where the first element is a string containing the target field dot path, and the second element is a typed value to set the field to.

func (*StreamBuilder) SetHTTPMux

func (s *StreamBuilder) SetHTTPMux(m HTTPMultiplexer)

SetHTTPMux sets an HTTP multiplexer to be used by stream components when registering endpoints instead of a new server spawned following the `http` fields of a Benthos config.

func (*StreamBuilder) SetLogger

func (s *StreamBuilder) SetLogger(l *slog.Logger)

SetLogger sets a custom logger via Go's standard logging interface, allowing you to replace the default Benthos logger with your own.

func (*StreamBuilder) SetLoggerYAML

func (s *StreamBuilder) SetLoggerYAML(conf string) error

SetLoggerYAML parses a logger YAML configuration and adds it to the builder such that all stream components emit logs through it.

func (*StreamBuilder) SetMetricsYAML

func (s *StreamBuilder) SetMetricsYAML(conf string) error

SetMetricsYAML parses a metrics YAML configuration and adds it to the builder such that all stream components emit metrics through it.

func (*StreamBuilder) SetPrintLogger

func (s *StreamBuilder) SetPrintLogger(l PrintLogger)

SetPrintLogger sets a custom logger supporting a simple Print based interface to be used by stream components. This custom logger will override any logging fields set via config.

func (*StreamBuilder) SetThreads

func (s *StreamBuilder) SetThreads(n int)

SetThreads configures the number of pipeline processor threads. By default the number will be zero, which means the thread count will match the number of logical CPUs on the machine.

func (*StreamBuilder) SetTracerYAML

func (s *StreamBuilder) SetTracerYAML(conf string) error

SetTracerYAML parses a tracer YAML configuration and adds it to the builder such that all stream components emit tracing spans through it.

func (*StreamBuilder) SetYAML

func (s *StreamBuilder) SetYAML(conf string) error

SetYAML parses a full Benthos config and uses it to configure the builder. If any inputs, processors, outputs, resources, etc, have previously been added to the builder they will be overridden by this new config.

func (*StreamBuilder) WalkComponents

func (s *StreamBuilder) WalkComponents(fn func(w *WalkedComponent) error) error

WalkComponents walks the Benthos configuration as it is currently built and for each component type (input, processor, output, etc) calls a provided function with a struct containing information about the component.

This can be useful for taking an inventory of the contents of a config.

type StreamConfigLinter

type StreamConfigLinter struct {
	// contains filtered or unexported fields
}

StreamConfigLinter provides utilities for linting stream configs.

func (*StreamConfigLinter) LintYAML

func (s *StreamConfigLinter) LintYAML(yamlBytes []byte) (lints []Lint, err error)

LintYAML attempts to parse a config in YAML format and, if successful, returns a slice of linting errors, or an error if the parsing failed.

func (*StreamConfigLinter) SetEnvVarLookupFunc

func (s *StreamConfigLinter) SetEnvVarLookupFunc(fn func(context.Context, string) (string, bool)) *StreamConfigLinter

SetEnvVarLookupFunc overrides the default environment variable lookup so that interpolations within a config are resolved by the provided closure function.

func (*StreamConfigLinter) SetRejectDeprecated

func (s *StreamConfigLinter) SetRejectDeprecated(v bool) *StreamConfigLinter

SetRejectDeprecated sets whether deprecated fields should trigger linting errors.

func (*StreamConfigLinter) SetRequireLabels

func (s *StreamConfigLinter) SetRequireLabels(v bool) *StreamConfigLinter

SetRequireLabels sets whether labels must be present for all components that support them.

func (*StreamConfigLinter) SetSkipEnvVarCheck

func (s *StreamConfigLinter) SetSkipEnvVarCheck(v bool) *StreamConfigLinter

SetSkipEnvVarCheck sets whether the linter should ignore cases where environment variables are referenced and do not exist.

type StreamConfigMarshaller

type StreamConfigMarshaller struct {
	// contains filtered or unexported fields
}

StreamConfigMarshaller provides utilities for marshalling stream configs, allowing you to print sanitised, redacted or hydrated configs.

func (*StreamConfigMarshaller) AnyToYAML

func (s *StreamConfigMarshaller) AnyToYAML(v any) (yamlStr string, err error)

AnyToYAML attempts to parse a config expressed as a structured value according to a stream config schema and then marshals it into a YAML string.

func (*StreamConfigMarshaller) SetFieldFilter

func (s *StreamConfigMarshaller) SetFieldFilter(fn func(view *FieldView, value any) bool) *StreamConfigMarshaller

SetFieldFilter sets a closure function to be used when preparing the marshalled config which determines whether the field is kept in the config, otherwise it is removed.

func (*StreamConfigMarshaller) SetHydrateExamples

func (s *StreamConfigMarshaller) SetHydrateExamples(v bool) *StreamConfigMarshaller

SetHydrateExamples sets whether to recurse and hydrate the config with default or example values when marshalling. This is useful for generating example configs.

func (*StreamConfigMarshaller) SetOmitDeprecated

func (s *StreamConfigMarshaller) SetOmitDeprecated(v bool) *StreamConfigMarshaller

SetOmitDeprecated sets whether deprecated fields should be omitted from the marshalled result.

func (*StreamConfigMarshaller) SetScrubSecrets

func (s *StreamConfigMarshaller) SetScrubSecrets(v bool) *StreamConfigMarshaller

SetScrubSecrets sets whether fields that contain secrets should be scrubbed when they contain raw values.

type StreamTemplateTester

type StreamTemplateTester struct {
	// contains filtered or unexported fields
}

StreamTemplateTester provides utilities for testing templates.

func (*StreamTemplateTester) LintYAML

func (s *StreamTemplateTester) LintYAML(yamlBytes []byte) (lints []Lint, err error)

LintYAML attempts to read a template defined in YAML format and lints it.

func (*StreamTemplateTester) RunYAML

func (s *StreamTemplateTester) RunYAML(yamlBytes []byte) (lints []Lint, err error)

RunYAML attempts to read a template defined in YAML format and runs any tests defined within the template.

type TemplatDataPluginExample added in v4.29.0

type TemplatDataPluginExample struct {
	// A title for the example.
	Title string

	// Summary of the example.
	Summary string

	// A config snippet to show.
	Config string
}

TemplatDataPluginExample contains a plugin example ready to inject into documentation.

type TemplateDataPlugin added in v4.29.0

type TemplateDataPlugin struct {
	// The name of the plugin.
	Name string

	// The component type of the plugin.
	Type string

	// A summary of the component's purpose.
	Summary string

	// A longer form description of the plugin.
	Description string

	// A general category of the plugin.
	Categories string

	// A list of examples of this plugin in action.
	Examples []TemplatDataPluginExample

	// A list of fields defined by the plugin.
	Fields []TemplateDataPluginField

	// Documentation that should be placed at the bottom of a page.
	Footnotes string

	// An example YAML config containing only common fields.
	CommonConfigYAML string

	// An example YAML config containing all fields.
	AdvancedConfigYAML string

	// A general stability status of the plugin.
	Status string

	// An abstract concept of support level.
	SupportLevel string

	// The version in which this plugin was added.
	Version string
}

TemplateDataPlugin contains information ready to inject within a documentation page.

type TemplateDataPluginField added in v4.29.0

type TemplateDataPluginField struct {
	// The description of the field.
	Description string

	// Whether the field contains secrets.
	IsSecret bool

	// Whether the field is interpolated.
	IsInterpolated bool

	// The type information of the field.
	Type string

	// The version in which this field was added.
	Version string

	// An array of enum options accompanied by a description.
	AnnotatedOptions [][2]string

	// An array of enum options, without annotations.
	Options []string

	// An array of example values.
	Examples []any

	// FullName describes the full dot path name of the field relative to
	// the root of the documented component.
	FullName string

	// ExamplesMarshalled is a list of examples marshalled into YAML format.
	ExamplesMarshalled []string

	// DefaultMarshalled is a marshalled string of the default value in JSON
	// format, if there is one.
	DefaultMarshalled string
}

TemplateDataPluginField provides information about a field for injecting into documentation templates.

type TemplateDataSchema added in v4.29.0

type TemplateDataSchema struct {
	// A list of fields defined by the plugin.
	Fields []TemplateDataPluginField

	// An example YAML config containing only common fields.
	CommonConfigYAML string

	// An example YAML config containing all fields.
	AdvancedConfigYAML string
}

TemplateDataSchema contains schema information ready to inject within a documentation page.

type TracingEvent

type TracingEvent struct {
	Type    TracingEventType
	Content string
	Meta    map[string]any
}

TracingEvent represents a single event that occurred within the stream.

Experimental: This type may change outside of major version releases.

type TracingEventType

type TracingEventType string

TracingEventType describes the type of tracing event a component might experience during a config run.

Experimental: This type may change outside of major version releases.

var (
	// Note: must match up with ./internal/bundle/tracing/events.go.
	TracingEventProduce TracingEventType = "PRODUCE"
	TracingEventConsume TracingEventType = "CONSUME"
	TracingEventDelete  TracingEventType = "DELETE"
	TracingEventError   TracingEventType = "ERROR"
	TracingEventUnknown TracingEventType = "UNKNOWN"
)

Various tracing event types.

Experimental: This type may change outside of major version releases.

type TracingSummary

type TracingSummary struct {
	// contains filtered or unexported fields
}

TracingSummary is a high level description of all traced events. When tracing a stream this should only be queried once the stream has ended.

Experimental: This type may change outside of major version releases.

func (*TracingSummary) InputEvents

func (s *TracingSummary) InputEvents() map[string][]TracingEvent

InputEvents returns a map of input labels to events traced during the execution of a stream pipeline.

Experimental: This method may change outside of major version releases.

func (*TracingSummary) OutputEvents

func (s *TracingSummary) OutputEvents() map[string][]TracingEvent

OutputEvents returns a map of output labels to events traced during the execution of a stream pipeline.

Experimental: This method may change outside of major version releases.

func (*TracingSummary) ProcessorEvents

func (s *TracingSummary) ProcessorEvents() map[string][]TracingEvent

ProcessorEvents returns a map of processor labels to events traced during the execution of a stream pipeline.

Experimental: This method may change outside of major version releases.

func (*TracingSummary) TotalInput

func (s *TracingSummary) TotalInput() uint64

TotalInput returns the total traced input messages received.

Experimental: This method may change outside of major version releases.

func (*TracingSummary) TotalOutput

func (s *TracingSummary) TotalOutput() uint64

TotalOutput returns the total traced output messages received.

Experimental: This method may change outside of major version releases.

func (*TracingSummary) TotalProcessorErrors

func (s *TracingSummary) TotalProcessorErrors() uint64

TotalProcessorErrors returns the total number of traced processor errors that occurred.

Experimental: This method may change outside of major version releases.

type WalkedComponent

type WalkedComponent struct {
	ComponentType string
	Name          string
	Label         string
	// contains filtered or unexported fields
}

WalkedComponent is a struct containing information about a component yielded via the WalkComponents method.

func (*WalkedComponent) ConfigYAML

func (w *WalkedComponent) ConfigYAML() string

ConfigYAML returns the configuration of a walked component in YAML form.

Directories

Path Synopsis
Package servicetest provides functions and utilities that might be useful for testing custom Benthos builds.
Package servicetest provides functions and utilities that might be useful for testing custom Benthos builds.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL