Documentation ¶
Overview ¶
Package service provides a high-level API for registering custom plugin components and then either executing a standard Benthos CLI or programmatically building isolated pipelines with a StreamBuilder API.
For a video guide on Benthos plugins check out https://youtu.be/uH6mKw-Ly0g. An example repo containing component plugins and tests can be found at https://github.com/benthosdev/benthos-plugin-example.
In order to add custom Bloblang functions and methods use the ./public/bloblang package.
Example (BufferPlugin) ¶
This example demonstrates how to create a buffer plugin. Buffers are an advanced component type that most plugin authors aren't likely to require.
package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/utilitywarehouse/data-infra-benthos/v4/public/service"

	// Import all standard Benthos components
	_ "github.com/utilitywarehouse/data-infra-benthos/v4/public/components/all"
)

// memoryBuffer is a buffer plugin that holds batches in a channel. Batches
// are acknowledged as soon as they are written, so anything still held in the
// channel is lost if the process stops.
type memoryBuffer struct {
	// messages carries the batches between WriteBatch and ReadBatch.
	messages chan service.MessageBatch
	// endOfInputChan is closed (once) by EndOfInput.
	endOfInputChan chan struct{}
	closeOnce      sync.Once
}

// newMemoryBuffer returns a buffer whose channel has capacity n. n must not be
// negative, because make panics when given a negative channel capacity.
func newMemoryBuffer(n int) *memoryBuffer {
	return &memoryBuffer{
		messages:       make(chan service.MessageBatch, n),
		endOfInputChan: make(chan struct{}),
	}
}

// WriteBatch hands batch to the channel, blocking until it is accepted or ctx
// is cancelled (in which case the context error is returned), and then calls
// aFn with a nil error.
func (m *memoryBuffer) WriteBatch(ctx context.Context, batch service.MessageBatch, aFn service.AckFunc) error {
	select {
	case m.messages <- batch:
	case <-ctx.Done():
		return ctx.Err()
	}
	// We weaken delivery guarantees here by acknowledging receipt of our batch
	// immediately.
	return aFn(ctx, nil)
}

// yoloIgnoreNacks is the ack function returned alongside every batch read
// from the buffer. It ignores both arguments and always returns nil.
func yoloIgnoreNacks(context.Context, error) error {
	// YOLO: Drop messages that are nacked
	return nil
}

// ReadBatch returns the next buffered batch, blocking until one is available,
// ctx is cancelled, or the end of input has been signalled. Once the end of
// input has been signalled and no batch is immediately available it returns
// service.ErrEndOfBuffer.
func (m *memoryBuffer) ReadBatch(ctx context.Context) (service.MessageBatch, service.AckFunc, error) {
	select {
	case msg := <-m.messages:
		return msg, yoloIgnoreNacks, nil
	case <-ctx.Done():
		return nil, nil, ctx.Err()
	case <-m.endOfInputChan:
		// Input has ended, so return ErrEndOfBuffer if our buffer is empty.
		// The non-blocking receive gives any remaining batch a chance to be
		// delivered first.
		select {
		case msg := <-m.messages:
			return msg, yoloIgnoreNacks, nil
		default:
			return nil, nil, service.ErrEndOfBuffer
		}
	}
}

// EndOfInput closes endOfInputChan. The sync.Once makes repeated calls safe.
func (m *memoryBuffer) EndOfInput() {
	m.closeOnce.Do(func() {
		close(m.endOfInputChan)
	})
}

// Close has nothing to release and always returns nil.
func (m *memoryBuffer) Close(ctx context.Context) error {
	// Nothing to clean up
	return nil
}

// This example demonstrates how to create a buffer plugin. Buffers are an
// advanced component type that most plugin authors aren't likely to require.
func main() {
	configSpec := service.NewConfigSpec().
		Summary("Creates a lame memory buffer that loses data on forced restarts or service crashes.").
		Field(service.NewIntField("max_batches").Default(100))

	err := service.RegisterBatchBuffer("lame_memory", configSpec,
		func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchBuffer, error) {
			capacity, err := conf.FieldInt("max_batches")
			if err != nil {
				return nil, err
			}
			// A negative capacity would panic inside make, so reject it as a
			// configuration error instead.
			if capacity < 0 {
				return nil, fmt.Errorf("max_batches must not be negative, got %d", capacity)
			}
			return newMemoryBuffer(capacity), nil
		})
	if err != nil {
		panic(err)
	}

	// And then execute Benthos with:
	// service.RunCLI(context.Background())
}
Output:
Example (CachePlugin) ¶
This example demonstrates how to create a cache plugin, where the implementation of the cache (type `LossyCache`) also contains fields that should be parsed within the Benthos config.
package main

import (
	"context"
	"time"

	"github.com/utilitywarehouse/data-infra-benthos/v4/public/service"

	// Import all standard Benthos components
	_ "github.com/utilitywarehouse/data-infra-benthos/v4/public/components/all"
)

// LossyCache is a deliberately poor cache: once it holds `capacity` items any
// further write is silently discarded (and counted in mDropped), and TTLs are
// ignored entirely.
type LossyCache struct {
	capacity int
	mDropped *service.MetricCounter
	items    map[string][]byte
}

// Get returns the value stored under key, or service.ErrKeyNotFound when the
// key is absent.
func (c *LossyCache) Get(ctx context.Context, key string) ([]byte, error) {
	value, found := c.items[key]
	if !found {
		return nil, service.ErrKeyNotFound
	}
	return value, nil
}

// Set stores value under key unless the cache is already at capacity, in
// which case the write is dropped, the dropped counter is incremented and nil
// is still returned. The ttl argument is not used.
func (c *LossyCache) Set(ctx context.Context, key string, value []byte, ttl *time.Duration) error {
	if len(c.items) < c.capacity {
		c.items[key] = value
		return nil
	}
	// Dropped, whoopsie!
	c.mDropped.Incr(1)
	return nil
}

// Add behaves like Set, except that it returns service.ErrKeyAlreadyExists
// when the key is already present.
func (c *LossyCache) Add(ctx context.Context, key string, value []byte, ttl *time.Duration) error {
	if _, present := c.items[key]; present {
		return service.ErrKeyAlreadyExists
	}
	// From here on the capacity check and the write are exactly those of Set.
	return c.Set(ctx, key, value, ttl)
}

// Delete removes key from the cache. Deleting an absent key is not an error.
func (c *LossyCache) Delete(ctx context.Context, key string) error {
	delete(c.items, key)
	return nil
}

// Close has nothing to release and always returns nil.
func (c *LossyCache) Close(ctx context.Context) error {
	return nil
}

// This example demonstrates how to create a cache plugin, where the
// implementation of the cache (type `LossyCache`) also contains fields that
// should be parsed within the Benthos config.
func main() {
	spec := service.NewConfigSpec().
		Summary("Creates a terrible cache with a fixed capacity.").
		Field(service.NewIntField("capacity").Default(100))

	// newLossyCache builds a LossyCache from the parsed "capacity" field.
	newLossyCache := func(conf *service.ParsedConfig, mgr *service.Resources) (service.Cache, error) {
		limit, err := conf.FieldInt("capacity")
		if err != nil {
			return nil, err
		}
		cache := &LossyCache{
			capacity: limit,
			mDropped: mgr.Metrics().NewCounter("dropped_just_cus"),
			items:    make(map[string][]byte, limit),
		}
		return cache, nil
	}

	if err := service.RegisterCache("lossy", spec, newLossyCache); err != nil {
		panic(err)
	}

	// And then execute Benthos with:
	// service.RunCLI(context.Background())
}
Output:
Example (InputPlugin) ¶
This example demonstrates how to create an input plugin, which is configured by providing a struct containing the fields to be parsed from within the Benthos configuration.
package main

import (
	"context"
	"fmt"
	"math/rand"

	"github.com/utilitywarehouse/data-infra-benthos/v4/public/service"

	// Import all standard Benthos components
	_ "github.com/utilitywarehouse/data-infra-benthos/v4/public/components/all"
)

// GibberishInput is an input that produces messages made of random bytes.
type GibberishInput struct {
	// length is the number of bytes in each generated message. It must not be
	// negative, because Read passes it straight to make.
	length int
}

// Connect has nothing to set up and always returns nil.
func (g *GibberishInput) Connect(ctx context.Context) error {
	return nil
}

// Read returns a new message of g.length random bytes, each in the range
// 32..125, together with an ack function that does nothing.
func (g *GibberishInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) {
	b := make([]byte, g.length)
	for k := range b {
		// 94 consecutive byte values starting at 32 (the space character).
		b[k] = byte(rand.Intn(94) + 32)
	}
	return service.NewMessage(b), func(ctx context.Context, err error) error {
		// A nack (when err is non-nil) is handled automatically when we
		// construct using service.AutoRetryNacks, so we don't need to handle
		// nacks here.
		return nil
	}, nil
}

// Close has nothing to release and always returns nil.
func (g *GibberishInput) Close(ctx context.Context) error {
	return nil
}

// This example demonstrates how to create an input plugin, which is configured
// by providing a struct containing the fields to be parsed from within the
// Benthos configuration.
func main() {
	configSpec := service.NewConfigSpec().
		Summary("Creates a load of gibberish, putting us all out of work.").
		Field(service.NewIntField("length").Default(100))

	constructor := func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) {
		length, err := conf.FieldInt("length")
		if err != nil {
			return nil, err
		}
		// A negative length would make every Read panic inside make, so
		// reject it as a configuration error up front.
		if length < 0 {
			return nil, fmt.Errorf("length must not be negative, got %d", length)
		}
		return service.AutoRetryNacks(&GibberishInput{length: length}), nil
	}

	err := service.RegisterInput("gibberish", configSpec, constructor)
	if err != nil {
		panic(err)
	}

	// And then execute Benthos with:
	// service.RunCLI(context.Background())
}
Output:
Example (OutputBatchedPlugin) ¶
This example demonstrates how to create a batched output plugin, which allows us to specify a batching mechanism and implement an interface that writes a batch of messages in one call.
package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/utilitywarehouse/data-infra-benthos/v4/public/service"

	// Import all standard Benthos components
	_ "github.com/utilitywarehouse/data-infra-benthos/v4/public/components/all"
)

// batchOfJSONWriter is a batched output that prints each batch to stdout as a
// single JSON object.
type batchOfJSONWriter struct{}

// Connect has nothing to set up and always returns nil.
func (b *batchOfJSONWriter) Connect(ctx context.Context) error {
	return nil
}

// WriteBatch converts every message of msgs to its structured form and prints
// one JSON object with the keys "count" (number of messages) and "objects"
// (the structured messages, in batch order). It returns the first conversion
// or marshalling error encountered, in which case nothing is printed.
func (b *batchOfJSONWriter) WriteBatch(ctx context.Context, msgs service.MessageBatch) error {
	messageObjs := make([]interface{}, 0, len(msgs))
	for _, msg := range msgs {
		msgObj, err := msg.AsStructured()
		if err != nil {
			return err
		}
		messageObjs = append(messageObjs, msgObj)
	}

	outBytes, err := json.Marshal(map[string]interface{}{
		"count":   len(msgs),
		"objects": messageObjs,
	})
	if err != nil {
		return err
	}

	fmt.Println(string(outBytes))
	return nil
}

// Close has nothing to release and always returns nil.
func (b *batchOfJSONWriter) Close(ctx context.Context) error {
	return nil
}

// This example demonstrates how to create a batched output plugin, which allows
// us to specify a batching mechanism and implement an interface that writes a
// batch of messages in one call.
func main() {
	spec := service.NewConfigSpec().
		Field(service.NewBatchPolicyField("batching"))

	// Register our new output. Its config schema has a single field,
	// "batching", from which the batch policy is read.
	err := service.RegisterBatchOutput(
		"batched_json_stdout", spec,
		func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchOutput, service.BatchPolicy, int, error) {
			policy, err := conf.FieldBatchPolicy("batching")
			if err != nil {
				return nil, policy, 0, err
			}
			// A max-in-flight of 1 is returned alongside the output.
			return &batchOfJSONWriter{}, policy, 1, nil
		})
	if err != nil {
		panic(err)
	}

	// Use the stream builder API to create a Benthos stream that uses our new
	// output type.
	builder := service.NewStreamBuilder()

	// Set the full Benthos configuration of the stream.
	err = builder.SetYAML(`
input:
  generate:
    count: 5
    interval: 1ms
    mapping: |
      root.id = count("batched output example messages")
      root.text = "some stuff"

output:
  batched_json_stdout:
    batching:
      count: 5
`)
	if err != nil {
		panic(err)
	}

	// Build a stream with our configured components.
	stream, err := builder.Build()
	if err != nil {
		panic(err)
	}

	// And run it, blocking until it gracefully terminates once the generate
	// input has generated its five messages and they have flushed through the
	// stream.
	if err = stream.Run(context.Background()); err != nil {
		panic(err)
	}
}
Output: {"count":5,"objects":[{"id":1,"text":"some stuff"},{"id":2,"text":"some stuff"},{"id":3,"text":"some stuff"},{"id":4,"text":"some stuff"},{"id":5,"text":"some stuff"}]}
Example (OutputPlugin) ¶
This example demonstrates how to create an output plugin. This example is for an implementation that does not require any configuration parameters, and therefore doesn't define any within the configuration specification.
package main

import (
	"context"
	"fmt"

	"github.com/utilitywarehouse/data-infra-benthos/v4/public/service"

	// Import all standard Benthos components
	_ "github.com/utilitywarehouse/data-infra-benthos/v4/public/components/all"
)

// BlueOutput is an output that prints each message to stdout wrapped in the
// escape sequences "\033[01;34m" and "\033[m".
type BlueOutput struct{}

// Connect has nothing to set up and always returns nil.
func (b *BlueOutput) Connect(ctx context.Context) error {
	return nil
}

// Write prints the bytes of msg, followed by a newline, between the two
// escape sequences. It returns the error from msg.AsBytes if the content
// cannot be obtained.
func (b *BlueOutput) Write(ctx context.Context, msg *service.Message) error {
	content, err := msg.AsBytes()
	if err != nil {
		return err
	}
	fmt.Printf("\033[01;34m%s\033[m\n", content)
	return nil
}

// Close has nothing to release and always returns nil.
func (b *BlueOutput) Close(ctx context.Context) error {
	return nil
}

// This example demonstrates how to create an output plugin. This example is for
// an implementation that does not require any configuration parameters, and
// therefore doesn't define any within the configuration specification.
func main() {
	// Register our new output, which doesn't require a config schema.
	err := service.RegisterOutput(
		"blue_stdout", service.NewConfigSpec(),
		func(conf *service.ParsedConfig, mgr *service.Resources) (out service.Output, maxInFlight int, err error) {
			return &BlueOutput{}, 1, nil
		})
	if err != nil {
		panic(err)
	}

	// Use the stream builder API to create a Benthos stream that uses our new
	// output type.
	builder := service.NewStreamBuilder()

	// Set the full Benthos configuration of the stream.
	err = builder.SetYAML(`
input:
  generate:
    count: 1
    interval: 1ms
    mapping: 'root = "hello world"'

output:
  blue_stdout: {}
`)
	if err != nil {
		panic(err)
	}

	// Build a stream with our configured components.
	stream, err := builder.Build()
	if err != nil {
		panic(err)
	}

	// And run it, blocking until it gracefully terminates once the generate
	// input has generated a message and it has flushed through the stream.
	if err = stream.Run(context.Background()); err != nil {
		panic(err)
	}
}
Output: \033[01;34mhello world\033[m
Example (ProcessorPlugin) ¶
This example demonstrates how to create a processor plugin. This example is for an implementation that does not require any configuration parameters, and therefore doesn't define any within the configuration specification.
package main

import (
	"bytes"
	"context"

	"github.com/utilitywarehouse/data-infra-benthos/v4/public/service"

	// Import all standard Benthos components
	_ "github.com/utilitywarehouse/data-infra-benthos/v4/public/components/all"
)

// ReverseProcessor is a processor that reverses the bytes of each message.
type ReverseProcessor struct {
	logger *service.Logger
}

// Process replaces the content of m with its bytes in reverse order and
// returns m as a single-message batch. When the reversed content equals the
// original, an info-level log line is emitted as well. The reversal is
// byte-wise, so multi-byte UTF-8 sequences are not kept intact.
func (r *ReverseProcessor) Process(ctx context.Context, m *service.Message) (service.MessageBatch, error) {
	bytesContent, err := m.AsBytes()
	if err != nil {
		return nil, err
	}

	// Write into a fresh slice rather than reversing in place, so the
	// original content is still available for the palindrome check below.
	newBytes := make([]byte, len(bytesContent))
	for i, b := range bytesContent {
		newBytes[len(newBytes)-i-1] = b
	}

	if bytes.Equal(newBytes, bytesContent) {
		r.logger.Infof("Woah! This is like totally a palindrome: %s", bytesContent)
	}

	m.SetBytes(newBytes)
	return []*service.Message{m}, nil
}

// Close has nothing to release and always returns nil.
func (r *ReverseProcessor) Close(ctx context.Context) error {
	return nil
}

// This example demonstrates how to create a processor plugin. This example is
// for an implementation that does not require any configuration parameters, and
// therefore doesn't define any within the configuration specification.
func main() {
	// Register our new processor, which doesn't require a config schema.
	err := service.RegisterProcessor(
		"reverse", service.NewConfigSpec(),
		func(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) {
			return &ReverseProcessor{logger: mgr.Logger()}, nil
		})
	if err != nil {
		panic(err)
	}

	// Build a Benthos stream that uses our new processor type.
	builder := service.NewStreamBuilder()

	// Set the full Benthos configuration of the stream.
	err = builder.SetYAML(`
input:
  generate:
    count: 1
    interval: 1ms
    mapping: 'root = "hello world"'

pipeline:
  processors:
    - reverse: {}

output:
  stdout: {}
`)
	if err != nil {
		panic(err)
	}

	// Build a stream with our configured components.
	stream, err := builder.Build()
	if err != nil {
		panic(err)
	}

	// And run it, blocking until it gracefully terminates once the generate
	// input has generated a message and it has flushed through the stream.
	if err = stream.Run(context.Background()); err != nil {
		panic(err)
	}
}
Output: dlrow olleh
Example (RateLimitPlugin) ¶
This example demonstrates how to create a rate limit plugin, which is configured by providing a struct containing the fields to be parsed from within the Benthos configuration.
package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/utilitywarehouse/data-infra-benthos/v4/public/service"

	// Import all standard Benthos components
	_ "github.com/utilitywarehouse/data-infra-benthos/v4/public/components/all"
)

// RandomRateLimit is a rate limit whose Access method returns a random
// duration.
type RandomRateLimit struct {
	// max is the exclusive upper bound of the durations returned by Access.
	max time.Duration
}

// Access returns a random duration in the half-open range [0, max). When max
// is not positive it returns zero instead of panicking, which is what a
// modulo or rand.Int63n call with a non-positive bound would do.
func (r *RandomRateLimit) Access(context.Context) (time.Duration, error) {
	if r.max <= 0 {
		return 0, nil
	}
	// Int63n works on int64, the underlying type of time.Duration, so no
	// truncation occurs on platforms where int is 32 bits wide.
	return time.Duration(rand.Int63n(int64(r.max))), nil
}

// Close has nothing to release and always returns nil.
func (r *RandomRateLimit) Close(ctx context.Context) error {
	return nil
}

// This example demonstrates how to create a rate limit plugin, which is
// configured by providing a struct containing the fields to be parsed from
// within the Benthos configuration.
func main() {
	configSpec := service.NewConfigSpec().
		Summary("A rate limit that's pretty much just random.").
		Description("I guess this isn't really that useful, sorry.").
		Field(service.NewStringField("maximum_duration").Default("1s"))

	constructor := func(conf *service.ParsedConfig, mgr *service.Resources) (service.RateLimit, error) {
		maxDurStr, err := conf.FieldString("maximum_duration")
		if err != nil {
			return nil, err
		}
		maxDuration, err := time.ParseDuration(maxDurStr)
		if err != nil {
			return nil, fmt.Errorf("invalid max duration: %w", err)
		}
		// A zero or negative bound leaves no valid range to pick from, so
		// treat it as a configuration error.
		if maxDuration <= 0 {
			return nil, fmt.Errorf("invalid max duration: must be greater than zero, got %v", maxDuration)
		}
		return &RandomRateLimit{max: maxDuration}, nil
	}

	err := service.RegisterRateLimit("random", configSpec, constructor)
	if err != nil {
		panic(err)
	}

	// And then execute Benthos with:
	// service.RunCLI(context.Background())
}
Output:
Example (StreamBuilderConfig) ¶
This example demonstrates how to use a stream builder to parse and execute a full Benthos config.
package main

import (
	"context"

	"github.com/utilitywarehouse/data-infra-benthos/v4/public/service"

	// Import all standard Benthos components
	_ "github.com/utilitywarehouse/data-infra-benthos/v4/public/components/all"
)

// This example demonstrates how to use a stream builder to parse and execute a
// full Benthos config.
func main() {
	// panicOnErr keeps the example short: any non-nil error aborts the program.
	panicOnErr := func(err error) {
		if err != nil {
			panic(err)
		}
	}

	builder := service.NewStreamBuilder()

	// Set the full Benthos configuration of the stream.
	err := builder.SetYAML(`
input:
  generate:
    count: 1
    interval: 1ms
    mapping: 'root = "hello world"'

pipeline:
  processors:
    - bloblang: 'root = content().uppercase()'

output:
  stdout: {}

logger:
  level: none
`)
	panicOnErr(err)

	// Build a stream with our configured components.
	stream, err := builder.Build()
	panicOnErr(err)

	// And run it, blocking until it gracefully terminates once the generate
	// input has generated a message and it has flushed through the stream.
	err = stream.Run(context.Background())
	panicOnErr(err)
}
Output: HELLO WORLD
Example (StreamBuilderConfigAddMethods) ¶
This example demonstrates how to use a stream builder to assemble a stream of Benthos components by adding snippets of configs for different component types, and then execute it. You can use the Add methods to append any number of components to the stream, following fan in and fan out patterns for inputs and outputs respectively.
package main

import (
	"context"

	"github.com/utilitywarehouse/data-infra-benthos/v4/public/service"

	// Import all standard Benthos components
	_ "github.com/utilitywarehouse/data-infra-benthos/v4/public/components/all"
)

// This example demonstrates how to use a stream builder to assemble a stream of
// Benthos components by adding snippets of configs for different component
// types, and then execute it. You can use the Add methods to append any number
// of components to the stream, following fan in and fan out patterns for
// inputs and outputs respectively.
func main() {
	// panicOnErr keeps the example short: any non-nil error aborts the program.
	panicOnErr := func(err error) {
		if err != nil {
			panic(err)
		}
	}

	builder := service.NewStreamBuilder()

	err := builder.AddInputYAML(`
generate:
  count: 1
  interval: 1ms
  mapping: 'root = "hello world"'
`)
	panicOnErr(err)

	err = builder.AddProcessorYAML(`bloblang: 'root = content().uppercase()'`)
	panicOnErr(err)

	err = builder.AddOutputYAML(`stdout: {}`)
	panicOnErr(err)

	err = builder.SetLoggerYAML(`level: off`)
	panicOnErr(err)

	// Build a stream with our configured components.
	stream, err := builder.Build()
	panicOnErr(err)

	// And run it, blocking until it gracefully terminates once the generate
	// input has generated a message and it has flushed through the stream.
	err = stream.Run(context.Background())
	panicOnErr(err)
}
Output: HELLO WORLD
Example (StreamBuilderMultipleStreams) ¶
This example demonstrates using the stream builder API to create and run two independent streams.
package main

import (
	"context"
	"sync"

	"github.com/utilitywarehouse/data-infra-benthos/v4/public/service"

	// Import all standard Benthos components
	_ "github.com/utilitywarehouse/data-infra-benthos/v4/public/components/all"
)

// This example demonstrates using the stream builder API to create and run two
// independent streams.
func main() {
	// panicOnErr keeps the example short: any non-nil error aborts the program.
	panicOnErr := func(err error) {
		if err != nil {
			panic(err)
		}
	}

	// Build the first stream pipeline. Note that we configure each pipeline
	// with its HTTP server disabled as otherwise we would see a port collision
	// when they both attempt to bind to the default address `0.0.0.0:4195`.
	//
	// Alternatively, we could choose to configure each with their own address
	// with the field `http.address`, or we could call `SetHTTPMux` on the
	// builder in order to explicitly override the configured server.
	builderOne := service.NewStreamBuilder()

	err := builderOne.SetYAML(`
http:
  enabled: false

input:
  generate:
    count: 1
    interval: 1ms
    mapping: 'root = "hello world one"'

pipeline:
  processors:
    - bloblang: 'root = content().uppercase()'

output:
  stdout: {}
`)
	panicOnErr(err)

	streamOne, err := builderOne.Build()
	panicOnErr(err)

	// Build the second stream pipeline. Its 500ms sleep processor delays its
	// single message relative to the first stream's.
	builderTwo := service.NewStreamBuilder()

	err = builderTwo.SetYAML(`
http:
  enabled: false

input:
  generate:
    count: 1
    interval: 1ms
    mapping: 'root = "hello world two"'

pipeline:
  processors:
    - sleep:
        duration: 500ms
    - bloblang: 'root = content().capitalize()'

output:
  stdout: {}
`)
	panicOnErr(err)

	streamTwo, err := builderTwo.Build()
	panicOnErr(err)

	// Run both streams concurrently and wait for both Run calls to return.
	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		defer wg.Done()
		panicOnErr(streamOne.Run(context.Background()))
	}()
	go func() {
		defer wg.Done()
		panicOnErr(streamTwo.Run(context.Background()))
	}()

	wg.Wait()
}
Output: HELLO WORLD ONE Hello World Two
Example (StreamBuilderPush) ¶
This example demonstrates how to use a stream builder to assemble a processing pipeline that you can push messages into and extract via closures.
package main

import (
	"bytes"
	"context"
	"fmt"
	"time"

	"github.com/utilitywarehouse/data-infra-benthos/v4/public/service"

	// Import all standard Benthos components
	_ "github.com/utilitywarehouse/data-infra-benthos/v4/public/components/all"
)

// This example demonstrates how to use a stream builder to assemble a
// processing pipeline that you can push messages into and extract via
// closures.
func main() {
	// must aborts the example on the first non-nil error.
	must := func(e error) {
		if e != nil {
			panic(e)
		}
	}

	builder := service.NewStreamBuilder()
	must(builder.SetLoggerYAML(`level: off`))

	// Two processors, applied in the order they are added: uppercase the
	// content, then prefix it.
	for _, procConf := range []string{
		`bloblang: 'root = content().uppercase()'`,
		`bloblang: 'root = "check this out: " + content()'`,
	} {
		must(builder.AddProcessorYAML(procConf))
	}

	// The producer func is how this program feeds messages into the stream.
	// It is treated like any other input, so it can be combined with regular
	// configured inputs.
	push, err := builder.AddProducerFunc()
	must(err)

	// The consumer func receives every message leaving the stream and appends
	// it to an in-memory buffer. It can likewise be combined with regular
	// configured outputs.
	var received bytes.Buffer
	collect := func(c context.Context, m *service.Message) error {
		payload, err := m.AsBytes()
		if err != nil {
			return err
		}
		_, err = fmt.Fprintf(&received, "received: %s\n", payload)
		return err
	}
	must(builder.AddConsumerFunc(collect))

	stream, err := builder.Build()
	must(err)

	// Push two messages from a separate goroutine and then ask the stream to
	// stop, allowing it up to one second to do so.
	go func() {
		payloads := []string{
			"hello world",
			"I'm pushing data into the stream",
		}
		for _, p := range payloads {
			must(push(context.Background(), service.NewMessage([]byte(p))))
		}
		must(stream.StopWithin(time.Second))
	}()

	// Run blocks until the stream has terminated, after which everything the
	// consumer collected is printed.
	must(stream.Run(context.Background()))

	fmt.Println(received.String())
}
Output: received: check this out: HELLO WORLD received: check this out: I'M PUSHING DATA INTO THE STREAM
Index ¶
- Variables
- func RegisterBatchBuffer(name string, spec *ConfigSpec, ctor BatchBufferConstructor) error
- func RegisterBatchInput(name string, spec *ConfigSpec, ctor BatchInputConstructor) error
- func RegisterBatchOutput(name string, spec *ConfigSpec, ctor BatchOutputConstructor) error
- func RegisterBatchProcessor(name string, spec *ConfigSpec, ctor BatchProcessorConstructor) error
- func RegisterCache(name string, spec *ConfigSpec, ctor CacheConstructor) error
- func RegisterInput(name string, spec *ConfigSpec, ctor InputConstructor) error
- func RegisterMetricsExporter(name string, spec *ConfigSpec, ctor MetricsExporterConstructor) error
- func RegisterOtelTracerProvider(name string, spec *ConfigSpec, ctor OtelTracerProviderConstructor) error
- func RegisterOutput(name string, spec *ConfigSpec, ctor OutputConstructor) error
- func RegisterProcessor(name string, spec *ConfigSpec, ctor ProcessorConstructor) error
- func RegisterRateLimit(name string, spec *ConfigSpec, ctor RateLimitConstructor) error
- func RunCLI(ctx context.Context)
- func XFormatConfigJSON() ([]byte, error)
- type AckFunc
- type BatchBuffer
- type BatchBufferConstructor
- type BatchInput
- type BatchInputConstructor
- type BatchOutput
- type BatchOutputConstructor
- type BatchPolicy
- type BatchProcessor
- type BatchProcessorConstructor
- type Batcher
- type Cache
- type CacheConstructor
- type CacheItem
- type Closer
- type ConfigField
- func NewAnyField(name string) *ConfigField
- func NewAnyListField(name string) *ConfigField
- func NewBackOffField(name string, allowUnbounded bool, defaults *backoff.ExponentialBackOff) *ConfigField
- func NewBackOffToggledField(name string, allowUnbounded bool, defaults *backoff.ExponentialBackOff) *ConfigField
- func NewBatchPolicyField(name string) *ConfigField
- func NewBloblangField(name string) *ConfigField
- func NewBoolField(name string) *ConfigField
- func NewDurationField(name string) *ConfigField
- func NewFloatField(name string) *ConfigField
- func NewInputField(name string) *ConfigField
- func NewInputListField(name string) *ConfigField
- func NewIntField(name string) *ConfigField
- func NewIntListField(name string) *ConfigField
- func NewIntMapField(name string) *ConfigField
- func NewInternalField(ifield docs.FieldSpec) *ConfigField
- func NewInterpolatedStringField(name string) *ConfigField
- func NewInterpolatedStringMapField(name string) *ConfigField
- func NewMetadataFilterField(name string) *ConfigField
- func NewObjectField(name string, fields ...*ConfigField) *ConfigField
- func NewObjectListField(name string, fields ...*ConfigField) *ConfigField
- func NewOutputField(name string) *ConfigField
- func NewOutputListField(name string) *ConfigField
- func NewProcessorField(name string) *ConfigField
- func NewProcessorListField(name string) *ConfigField
- func NewStringAnnotatedEnumField(name string, options map[string]string) *ConfigField
- func NewStringEnumField(name string, options ...string) *ConfigField
- func NewStringField(name string) *ConfigField
- func NewStringListField(name string) *ConfigField
- func NewStringMapField(name string) *ConfigField
- func NewTLSField(name string) *ConfigField
- func NewTLSToggledField(name string) *ConfigField
- func (c *ConfigField) Advanced() *ConfigField
- func (c *ConfigField) Default(v interface{}) *ConfigField
- func (c *ConfigField) Deprecated() *ConfigField
- func (c *ConfigField) Description(d string) *ConfigField
- func (c *ConfigField) Example(e interface{}) *ConfigField
- func (c *ConfigField) LintRule(blobl string) *ConfigField
- func (c *ConfigField) Optional() *ConfigField
- func (c *ConfigField) Version(v string) *ConfigField
- type ConfigSpec
- func (c *ConfigSpec) Beta() *ConfigSpec
- func (c *ConfigSpec) Categories(categories ...string) *ConfigSpec
- func (c *ConfigSpec) Deprecated() *ConfigSpec
- func (c *ConfigSpec) Description(description string) *ConfigSpec
- func (c *ConfigSpec) EncodeJSON(v []byte) error
- func (c *ConfigSpec) Example(title, summary, config string) *ConfigSpec
- func (c *ConfigSpec) Field(f *ConfigField) *ConfigSpec
- func (c *ConfigSpec) LintRule(blobl string) *ConfigSpec
- func (c *ConfigSpec) ParseYAML(yamlStr string, env *Environment) (*ParsedConfig, error)
- func (c *ConfigSpec) Stable() *ConfigSpec
- func (c *ConfigSpec) Summary(summary string) *ConfigSpec
- func (c *ConfigSpec) Version(v string) *ConfigSpec
- type ConfigView
- type Environment
- func (e *Environment) Clone() *Environment
- func (e *Environment) NewStreamBuilder() *StreamBuilder
- func (e *Environment) RegisterBatchBuffer(name string, spec *ConfigSpec, ctor BatchBufferConstructor) error
- func (e *Environment) RegisterBatchInput(name string, spec *ConfigSpec, ctor BatchInputConstructor) error
- func (e *Environment) RegisterBatchOutput(name string, spec *ConfigSpec, ctor BatchOutputConstructor) error
- func (e *Environment) RegisterBatchProcessor(name string, spec *ConfigSpec, ctor BatchProcessorConstructor) error
- func (e *Environment) RegisterCache(name string, spec *ConfigSpec, ctor CacheConstructor) error
- func (e *Environment) RegisterInput(name string, spec *ConfigSpec, ctor InputConstructor) error
- func (e *Environment) RegisterMetricsExporter(name string, spec *ConfigSpec, ctor MetricsExporterConstructor) error
- func (e *Environment) RegisterOtelTracerProvider(name string, spec *ConfigSpec, ctor OtelTracerProviderConstructor) error
- func (e *Environment) RegisterOutput(name string, spec *ConfigSpec, ctor OutputConstructor) error
- func (e *Environment) RegisterProcessor(name string, spec *ConfigSpec, ctor ProcessorConstructor) error
- func (e *Environment) RegisterRateLimit(name string, spec *ConfigSpec, ctor RateLimitConstructor) error
- func (e *Environment) UseBloblangEnvironment(bEnv *bloblang.Environment)
- func (e *Environment) WalkBuffers(fn func(name string, config *ConfigView))
- func (e *Environment) WalkCaches(fn func(name string, config *ConfigView))
- func (e *Environment) WalkInputs(fn func(name string, config *ConfigView))
- func (e *Environment) WalkMetrics(fn func(name string, config *ConfigView))
- func (e *Environment) WalkOutputs(fn func(name string, config *ConfigView))
- func (e *Environment) WalkProcessors(fn func(name string, config *ConfigView))
- func (e *Environment) WalkRateLimits(fn func(name string, config *ConfigView))
- func (e *Environment) WalkTracers(fn func(name string, config *ConfigView))
- type HTTPMultiplexer
- type Input
- type InputConstructor
- type InterpolatedString
- type Lint
- type LintError
- type Logger
- func (l *Logger) Debug(message string)
- func (l *Logger) Debugf(template string, args ...interface{})
- func (l *Logger) Error(message string)
- func (l *Logger) Errorf(template string, args ...interface{})
- func (l *Logger) Info(message string)
- func (l *Logger) Infof(template string, args ...interface{})
- func (l *Logger) Trace(message string)
- func (l *Logger) Tracef(template string, args ...interface{})
- func (l *Logger) Warn(message string)
- func (l *Logger) Warnf(template string, args ...interface{})
- func (l *Logger) With(keyValuePairs ...interface{}) *Logger
- type Message
- func (m *Message) AsBytes() ([]byte, error)
- func (m *Message) AsStructured() (interface{}, error)
- func (m *Message) AsStructuredMut() (interface{}, error)
- func (m *Message) BloblangQuery(blobl *bloblang.Executor) (*Message, error)
- func (m *Message) Context() context.Context
- func (m *Message) Copy() *Message
- func (m *Message) GetError() error
- func (m *Message) MetaDelete(key string)
- func (m *Message) MetaGet(key string) (string, bool)
- func (m *Message) MetaSet(key, value string)
- func (m *Message) MetaWalk(fn func(string, string) error) error
- func (m *Message) SetBytes(b []byte)
- func (m *Message) SetError(err error)
- func (m *Message) SetStructured(i interface{})
- func (m *Message) WithContext(ctx context.Context) *Message
- type MessageBatch
- type MessageBatchHandlerFunc
- type MessageHandlerFunc
- type MetadataFilter
- type MetricCounter
- type MetricGauge
- type MetricTimer
- type Metrics
- type MetricsExporter
- type MetricsExporterConstructor
- type MetricsExporterCounter
- type MetricsExporterCounterCtor
- type MetricsExporterGauge
- type MetricsExporterGaugeCtor
- type MetricsExporterTimer
- type MetricsExporterTimerCtor
- type MockResourcesOptFn
- type OtelTracerProviderConstructor
- type Output
- type OutputConstructor
- type OwnedInput
- type OwnedOutput
- type OwnedProcessor
- type ParsedConfig
- func (p *ParsedConfig) Contains(path ...string) bool
- func (p *ParsedConfig) FieldAny(path ...string) (interface{}, error)
- func (p *ParsedConfig) FieldAnyList(path ...string) ([]*ParsedConfig, error)
- func (p *ParsedConfig) FieldBackOff(path ...string) (*backoff.ExponentialBackOff, error)
- func (p *ParsedConfig) FieldBackOffToggled(path ...string) (boff *backoff.ExponentialBackOff, enabled bool, err error)
- func (p *ParsedConfig) FieldBatchPolicy(path ...string) (conf BatchPolicy, err error)
- func (p *ParsedConfig) FieldBloblang(path ...string) (*bloblang.Executor, error)
- func (p *ParsedConfig) FieldBool(path ...string) (bool, error)
- func (p *ParsedConfig) FieldDuration(path ...string) (time.Duration, error)
- func (p *ParsedConfig) FieldFloat(path ...string) (float64, error)
- func (p *ParsedConfig) FieldInput(path ...string) (*OwnedInput, error)
- func (p *ParsedConfig) FieldInputList(path ...string) ([]*OwnedInput, error)
- func (p *ParsedConfig) FieldInt(path ...string) (int, error)
- func (p *ParsedConfig) FieldIntList(path ...string) ([]int, error)
- func (p *ParsedConfig) FieldIntMap(path ...string) (map[string]int, error)
- func (p *ParsedConfig) FieldInterpolatedString(path ...string) (*InterpolatedString, error)
- func (p *ParsedConfig) FieldInterpolatedStringMap(path ...string) (map[string]*InterpolatedString, error)
- func (p *ParsedConfig) FieldMetadataFilter(path ...string) (f *MetadataFilter, err error)
- func (p *ParsedConfig) FieldObjectList(path ...string) ([]*ParsedConfig, error)
- func (p *ParsedConfig) FieldOutput(path ...string) (*OwnedOutput, error)
- func (p *ParsedConfig) FieldOutputList(path ...string) ([]*OwnedOutput, error)
- func (p *ParsedConfig) FieldProcessor(path ...string) (*OwnedProcessor, error)
- func (p *ParsedConfig) FieldProcessorList(path ...string) ([]*OwnedProcessor, error)
- func (p *ParsedConfig) FieldString(path ...string) (string, error)
- func (p *ParsedConfig) FieldStringList(path ...string) ([]string, error)
- func (p *ParsedConfig) FieldStringMap(path ...string) (map[string]string, error)
- func (p *ParsedConfig) FieldTLS(path ...string) (*tls.Config, error)
- func (p *ParsedConfig) FieldTLSToggled(path ...string) (tconf *tls.Config, enabled bool, err error)
- func (p *ParsedConfig) Namespace(path ...string) *ParsedConfig
- type PrintLogger
- type Processor
- type ProcessorConstructor
- type RateLimit
- type RateLimitConstructor
- type ResourceInput
- type ResourceOutput
- type Resources
- func (r *Resources) AccessCache(ctx context.Context, name string, fn func(c Cache)) error
- func (r *Resources) AccessInput(ctx context.Context, name string, fn func(i *ResourceInput)) error
- func (r *Resources) AccessOutput(ctx context.Context, name string, fn func(o *ResourceOutput)) error
- func (r *Resources) AccessRateLimit(ctx context.Context, name string, fn func(r RateLimit)) error
- func (r *Resources) HasCache(name string) bool
- func (r *Resources) HasInput(name string) bool
- func (r *Resources) HasOutput(name string) bool
- func (r *Resources) HasRateLimit(name string) bool
- func (r *Resources) Label() string
- func (r *Resources) Logger() *Logger
- func (r *Resources) Metrics() *Metrics
- func (r *Resources) OtelTracer() trace.TracerProvider
- type Stream
- type StreamBuilder
- func (s *StreamBuilder) AddBatchConsumerFunc(fn MessageBatchHandlerFunc) error
- func (s *StreamBuilder) AddBatchProducerFunc() (MessageBatchHandlerFunc, error)
- func (s *StreamBuilder) AddCacheYAML(conf string) error
- func (s *StreamBuilder) AddConsumerFunc(fn MessageHandlerFunc) error
- func (s *StreamBuilder) AddInputYAML(conf string) error
- func (s *StreamBuilder) AddOutputYAML(conf string) error
- func (s *StreamBuilder) AddProcessorYAML(conf string) error
- func (s *StreamBuilder) AddProducerFunc() (MessageHandlerFunc, error)
- func (s *StreamBuilder) AddRateLimitYAML(conf string) error
- func (s *StreamBuilder) AddResourcesYAML(conf string) error
- func (s *StreamBuilder) AsYAML() (string, error)
- func (s *StreamBuilder) Build() (*Stream, error)
- func (s *StreamBuilder) BuildTraced() (*Stream, *TracingSummary, error)
- func (s *StreamBuilder) DisableLinting()
- func (s *StreamBuilder) SetBufferYAML(conf string) error
- func (s *StreamBuilder) SetFields(pathValues ...interface{}) error
- func (s *StreamBuilder) SetHTTPMux(m HTTPMultiplexer)
- func (s *StreamBuilder) SetLoggerYAML(conf string) error
- func (s *StreamBuilder) SetMetricsYAML(conf string) error
- func (s *StreamBuilder) SetPrintLogger(l PrintLogger)
- func (s *StreamBuilder) SetThreads(n int)
- func (s *StreamBuilder) SetTracerYAML(conf string) error
- func (s *StreamBuilder) SetYAML(conf string) error
- type TracingEvent
- type TracingEventType
- type TracingSummary
- func (s *TracingSummary) InputEvents() map[string][]TracingEvent
- func (s *TracingSummary) OutputEvents() map[string][]TracingEvent
- func (s *TracingSummary) ProcessorEvents() map[string][]TracingEvent
- func (s *TracingSummary) TotalInput() uint64
- func (s *TracingSummary) TotalOutput() uint64
- func (s *TracingSummary) TotalProcessorErrors() uint64
Examples ¶
- Package (BufferPlugin)
- Package (CachePlugin)
- Package (InputPlugin)
- Package (OutputBatchedPlugin)
- Package (OutputPlugin)
- Package (ProcessorPlugin)
- Package (RateLimitPlugin)
- Package (StreamBuilderConfig)
- Package (StreamBuilderConfigAddMethods)
- Package (StreamBuilderMultipleStreams)
- Package (StreamBuilderPush)
Constants ¶
This section is empty.
Variables ¶
var ( ErrKeyAlreadyExists = errors.New("key already exists") ErrKeyNotFound = errors.New("key does not exist") )
Errors returned by cache types.
var ( // ErrNotConnected is returned by inputs and outputs when their Read or // Write methods are called and the connection that they maintain is lost. // This error prompts the upstream component to call Connect until the // connection is re-established. ErrNotConnected = errors.New("not connected") // ErrEndOfInput is returned by inputs that have exhausted their source of // data to the point where subsequent Read calls will be ineffective. This // error prompts the upstream component to gracefully terminate the // pipeline. ErrEndOfInput = errors.New("end of input") // ErrEndOfBuffer is returned by a buffer Read/ReadBatch method when the // contents of the buffer has been emptied and the source of the data is // ended (as indicated by EndOfInput). This error prompts the upstream // component to gracefully terminate the pipeline. ErrEndOfBuffer = errors.New("end of buffer") )
Functions ¶
func RegisterBatchBuffer ¶
func RegisterBatchBuffer(name string, spec *ConfigSpec, ctor BatchBufferConstructor) error
RegisterBatchBuffer attempts to register a new buffer plugin by providing a description of the configuration for the buffer and a constructor for the buffer itself. The constructor will be called for each instantiation of the component within a config.
Consumed message batches must be created by upstream components (inputs, etc) otherwise this buffer will simply receive batches containing single messages.
func RegisterBatchInput ¶
func RegisterBatchInput(name string, spec *ConfigSpec, ctor BatchInputConstructor) error
RegisterBatchInput attempts to register a new batched input plugin by providing a description of the configuration for the plugin as well as a constructor for the input itself. The constructor will be called for each instantiation of the component within a config.
If your input implementation doesn't have a specific mechanism for dealing with a nack (when the AckFunc provides a non-nil error) then you can instead wrap your input implementation with AutoRetryNacksBatched to get automatic retries.
func RegisterBatchOutput ¶
func RegisterBatchOutput(name string, spec *ConfigSpec, ctor BatchOutputConstructor) error
RegisterBatchOutput attempts to register a new output plugin by providing a description of the configuration for the plugin as well as a constructor for the output itself. The constructor will be called for each instantiation of the component within a config.
The constructor of a batch output is able to return a batch policy to be applied before calls to write are made, creating batches from the stream of messages. However, batches can also be created by upstream components (inputs, buffers, etc).
If a batch has been formed upstream it is possible that its size may exceed the policy specified in your constructor.
func RegisterBatchProcessor ¶
func RegisterBatchProcessor(name string, spec *ConfigSpec, ctor BatchProcessorConstructor) error
RegisterBatchProcessor attempts to register a new processor plugin by providing a description of the configuration for the processor and a constructor for the processor itself. The constructor will be called for each instantiation of the component within a config.
func RegisterCache ¶
func RegisterCache(name string, spec *ConfigSpec, ctor CacheConstructor) error
RegisterCache attempts to register a new cache plugin by providing a description of the configuration for the plugin as well as a constructor for the cache itself. The constructor will be called for each instantiation of the component within a config.
func RegisterInput ¶
func RegisterInput(name string, spec *ConfigSpec, ctor InputConstructor) error
RegisterInput attempts to register a new input plugin by providing a description of the configuration for the plugin as well as a constructor for the input itself. The constructor will be called for each instantiation of the component within a config.
If your input implementation doesn't have a specific mechanism for dealing with a nack (when the AckFunc provides a non-nil error) then you can instead wrap your input implementation with AutoRetryNacks to get automatic retries.
func RegisterMetricsExporter ¶
func RegisterMetricsExporter(name string, spec *ConfigSpec, ctor MetricsExporterConstructor) error
RegisterMetricsExporter attempts to register a new metrics exporter plugin by providing a description of the configuration for the plugin as well as a constructor for the metrics exporter itself. The constructor will be called for each instantiation of the component within a config.
func RegisterOtelTracerProvider ¶
func RegisterOtelTracerProvider(name string, spec *ConfigSpec, ctor OtelTracerProviderConstructor) error
RegisterOtelTracerProvider attempts to register a new open telemetry tracer provider plugin by providing a description of the configuration for the plugin as well as a constructor for the tracer provider itself. The constructor will be called for each instantiation of the component within a config.
Experimental: This type signature is experimental and therefore subject to change outside of major version releases.
func RegisterOutput ¶
func RegisterOutput(name string, spec *ConfigSpec, ctor OutputConstructor) error
RegisterOutput attempts to register a new output plugin by providing a description of the configuration for the plugin as well as a constructor for the output itself. The constructor will be called for each instantiation of the component within a config.
func RegisterProcessor ¶
func RegisterProcessor(name string, spec *ConfigSpec, ctor ProcessorConstructor) error
RegisterProcessor attempts to register a new processor plugin by providing a description of the configuration for the processor and a constructor for the processor itself. The constructor will be called for each instantiation of the component within a config.
For simple transformations consider implementing a Bloblang plugin method instead.
func RegisterRateLimit ¶
func RegisterRateLimit(name string, spec *ConfigSpec, ctor RateLimitConstructor) error
RegisterRateLimit attempts to register a new rate limit plugin by providing a description of the configuration for the plugin as well as a constructor for the rate limit itself. The constructor will be called for each instantiation of the component within a config.
func RunCLI ¶
RunCLI executes Benthos as a CLI, allowing users to specify one or more configuration file paths and execute subcommands for linting configs, testing configs, etc. This is how a standard distribution of Benthos operates.
This call blocks until either:
1. The service shuts down gracefully due to the inputs closing
2. A termination signal is received
3. The provided context is cancelled
This function must only be called once during the entire lifecycle of your program, as it interacts with singleton state. In order to manage multiple Benthos stream lifecycles in a program use the StreamBuilder API instead.
func XFormatConfigJSON ¶
XFormatConfigJSON returns a byte slice of the Benthos configuration spec formatted as a JSON object. The schema of this method is undocumented and is not intended for general use.
Experimental: This method is not intended for general use and could have its signature and/or behaviour changed outside of major version bumps.
Types ¶
type AckFunc ¶
AckFunc is a common function returned by inputs that must be called once for each message consumed. This function ensures that the source of the message receives either an acknowledgement (err is nil) or an error that can either be propagated upstream as a nack, or trigger a reattempt at delivering the same message.
If your input implementation doesn't have a specific mechanism for dealing with a nack then you can wrap your input implementation with AutoRetryNacks to get automatic retries.
type BatchBuffer ¶
type BatchBuffer interface { // Write a batch of messages to the buffer, the batch is accompanied with an // acknowledge function. A non-nil error should be returned if it is not // possible to store the given message batch in the buffer. // // If a nil error is returned the buffer assumes responsibility for calling // the acknowledge function at least once during the lifetime of the // message. // // This could be at the point where the message is written to the buffer, // which weakens delivery guarantees but can be useful for decoupling the // input from downstream components. Alternatively, this could be when the // associated batch has been read from the buffer and acknowledged // downstream, which preserves delivery guarantees. WriteBatch(context.Context, MessageBatch, AckFunc) error // Read a batch of messages from the buffer. This call should block until // either a batch is ready to consume, the provided context is cancelled or // EndOfInput has been called which indicates that the buffer is no longer // being populated with new messages. // // The returned acknowledge function will be called when a consumed message // batch has been processed and sent downstream. It is up to the buffer // implementation whether the ack function is used, it might be used in // order to "commit" the removal of a message from the buffer in cases where // the buffer is a persisted storage solution, or in cases where the output // of the buffer is temporal (a windowing algorithm, etc) it might be // considered correct to simply drop message batches that are not acked. // // When the buffer is closed (EndOfInput has been called and no more // messages are available) this method should return an ErrEndOfBuffer in // order to indicate the end of the buffered stream. // // It is valid to return a batch of only one message. 
ReadBatch(context.Context) (MessageBatch, AckFunc, error) // EndOfInput indicates to the buffer that the input has ended and that once // the buffer is depleted it should return ErrEndOfBuffer from ReadBatch in // order to gracefully shut down the pipeline. // // EndOfInput should be idempotent as it may be called more than once. EndOfInput() Closer }
BatchBuffer is an interface implemented by Buffers able to read and write message batches. Buffers are a component type that is placed after inputs, and that decouples the acknowledgement system of the inputs from the rest of the pipeline.
Buffers are useful for relieving back pressure from upstream components, or for implementing message aggregators where the concept of discrete messages running through a pipeline no longer applies (such as with windowing algorithms).
Buffers are advanced component types that weaken delivery guarantees of a Benthos pipeline. Therefore, if you aren't absolutely sure that a component you wish to build should be a buffer type then it likely shouldn't be.
type BatchBufferConstructor ¶
type BatchBufferConstructor func(conf *ParsedConfig, mgr *Resources) (BatchBuffer, error)
BatchBufferConstructor is a func that's provided a configuration type and access to a service manager and must return an instantiation of a buffer based on the config, or an error.
Consumed message batches must be created by upstream components (inputs, etc) otherwise this buffer will simply receive batches containing single messages.
type BatchInput ¶
type BatchInput interface { // Establish a connection to the upstream service. Connect will always be // called first when a reader is instantiated, and will be continuously // called with back off until a nil error is returned. // // The provided context remains open only for the duration of the connecting // phase, and should not be used to establish the lifetime of the connection // itself. // // Once Connect returns a nil error the Read method will be called until // either ErrNotConnected is returned, or the reader is closed. Connect(context.Context) error // Read a message batch from a source, along with a function to be called // once the entire batch can be either acked (successfully sent or // intentionally filtered) or nacked (failed to be processed or dispatched // to the output). // // The AckFunc will be called for every message batch at least once, but // there are no guarantees as to when this will occur. If your input // implementation doesn't have a specific mechanism for dealing with a nack // then you can wrap your input implementation with AutoRetryNacksBatched to // get automatic retries. // // If this method returns ErrNotConnected then ReadBatch will not be called // again until Connect has returned a nil error. If ErrEndOfInput is // returned then Read will no longer be called and the pipeline will // gracefully terminate. ReadBatch(context.Context) (MessageBatch, AckFunc, error) Closer }
BatchInput is an interface implemented by Benthos inputs that produce messages in batches, where there is a desire to process and send the batch as a logical group rather than as individual messages.
Calls to ReadBatch should block until either a message batch is ready to process, the connection is lost, or the provided context is cancelled.
func AutoRetryNacksBatched ¶
func AutoRetryNacksBatched(i BatchInput) BatchInput
AutoRetryNacksBatched wraps a batched input implementation with a component that automatically reattempts messages that fail downstream. This is useful for inputs that do not support nacks, and therefore don't have an answer for when an ack func is called with an error.
When messages fail to be delivered they will be reattempted with back off until success or the stream is stopped.
type BatchInputConstructor ¶
type BatchInputConstructor func(conf *ParsedConfig, mgr *Resources) (BatchInput, error)
BatchInputConstructor is a func that's provided a configuration type and access to a service manager, and must return an instantiation of a batched reader based on the config, or an error.
type BatchOutput ¶
type BatchOutput interface { // Establish a connection to the downstream service. Connect will always be // called first when a writer is instantiated, and will be continuously // called with back off until a nil error is returned. // // Once Connect returns a nil error the write method will be called until // either ErrNotConnected is returned, or the writer is closed. Connect(context.Context) error // Write a batch of messages to a sink, or return an error if delivery is // not possible. // // If this method returns ErrNotConnected then write will not be called // again until Connect has returned a nil error. WriteBatch(context.Context, MessageBatch) error Closer }
BatchOutput is an interface implemented by Benthos outputs that require Benthos to batch messages before dispatch in order to improve throughput. Each call to WriteBatch should block until either all messages in the batch have been successfully or unsuccessfully sent, or the context is cancelled.
Multiple write calls can be performed in parallel, and the constructor of an output must provide a MaxInFlight parameter indicating the maximum number of parallel batched write calls the output supports.
type BatchOutputConstructor ¶
type BatchOutputConstructor func(conf *ParsedConfig, mgr *Resources) (out BatchOutput, batchPolicy BatchPolicy, maxInFlight int, err error)
BatchOutputConstructor is a func that's provided a configuration type and access to a service manager, and must return an instantiation of a writer based on the config, a batching policy, and a maximum number of in-flight message batches to allow, or an error.
type BatchPolicy ¶
type BatchPolicy struct { ByteSize int Count int Check string Period string // contains filtered or unexported fields }
BatchPolicy describes the mechanisms by which messages destined for a batch output should be batched. This is returned by constructors of batch outputs.
func (BatchPolicy) NewBatcher ¶
func (b BatchPolicy) NewBatcher(res *Resources) (*Batcher, error)
NewBatcher creates a batching mechanism from the policy.
type BatchProcessor ¶
type BatchProcessor interface { // Process a batch of messages into one or more resulting batches, or return // an error if the entire batch could not be processed. If zero messages are // returned and the error is nil then all messages are filtered. // // The provided MessageBatch should NOT be modified, in order to return a // mutated batch a copy of the slice should be created instead. // // When an error is returned all of the input messages will continue down // the pipeline but will be marked with the error with *message.SetError, // and metrics and logs will be emitted. // // In order to add errors to individual messages of the batch for downstream // handling use *message.SetError(err) and return it in the resulting batch // with a nil error. // // The Message types returned MUST be derived from the provided messages, // and CANNOT be custom implementations of Message. In order to copy the // provided messages use the Copy method. ProcessBatch(context.Context, MessageBatch) ([]MessageBatch, error) Closer }
BatchProcessor is a Benthos processor implementation that works against batches of messages, which allows windowed processing.
Message batches must be created by upstream components (inputs, buffers, etc) otherwise this processor will simply receive batches containing single messages.
type BatchProcessorConstructor ¶
type BatchProcessorConstructor func(conf *ParsedConfig, mgr *Resources) (BatchProcessor, error)
BatchProcessorConstructor is a func that's provided a configuration type and access to a service manager and must return an instantiation of a processor based on the config, or an error.
Message batches must be created by upstream components (inputs, buffers, etc) otherwise this processor will simply receive batches containing single messages.
type Batcher ¶
type Batcher struct {
// contains filtered or unexported fields
}
Batcher provides a batching mechanism where messages can be added one-by-one with a boolean return indicating whether the batching policy has been triggered.
Upon triggering the policy it is the responsibility of the owner of this batcher to call Flush, which returns all the pending messages in the batch.
This batcher may contain processors that are executed during the flush, therefore it is important to call Close when this batcher is no longer required, having also called Flush if appropriate.
func (*Batcher) Add ¶
Add a message to the batch. Returns true if the batching policy has been triggered by this new addition, in which case Flush should be called.
func (*Batcher) Close ¶
Close the batching policy, which cleans up any resources used by batching processors.
func (*Batcher) Flush ¶
func (b *Batcher) Flush(ctx context.Context) (batch MessageBatch, err error)
Flush pending messages into a batch, apply any batching processors that are part of the batching policy, and then return the result.
type Cache ¶
type Cache interface { // Get a cache item. Get(ctx context.Context, key string) ([]byte, error) // Set a cache item, specifying an optional TTL. It is okay for caches to // ignore the ttl parameter if it isn't possible to implement. Set(ctx context.Context, key string, value []byte, ttl *time.Duration) error // Add is the same operation as Set except that it returns an error if the // key already exists. It is okay for caches to return nil on duplicates if // it isn't possible to implement. Add(ctx context.Context, key string, value []byte, ttl *time.Duration) error // Delete attempts to remove a key. If the key does not exist then it is // considered correct to return an error, however, for cache implementations // where it is difficult to determine this then it is acceptable to return // nil. Delete(ctx context.Context, key string) error Closer }
Cache is an interface implemented by Benthos caches.
type CacheConstructor ¶
type CacheConstructor func(conf *ParsedConfig, mgr *Resources) (Cache, error)
CacheConstructor is a func that's provided a configuration type and access to a service manager and must return an instantiation of a cache based on the config, or an error.
type Closer ¶
type Closer interface { // Close the component, blocks until either the underlying resources are // cleaned up or the context is cancelled. Returns an error if the context // is cancelled. Close(ctx context.Context) error }
Closer is implemented by components that support stopping and cleaning up their underlying resources.
type ConfigField ¶
type ConfigField struct {
// contains filtered or unexported fields
}
ConfigField describes a field within a component configuration, to be added to a ConfigSpec.
func NewAnyField ¶
func NewAnyField(name string) *ConfigField
NewAnyField describes a new config field that can assume any value type without triggering a config parse or linting error.
func NewAnyListField ¶
func NewAnyListField(name string) *ConfigField
NewAnyListField describes a new config field consisting of a list of values that can assume any value type without triggering a config parse or linting error.
func NewBackOffField ¶
func NewBackOffField(name string, allowUnbounded bool, defaults *backoff.ExponentialBackOff) *ConfigField
NewBackOffField defines a new object type config field that describes an exponential back off policy, often used for timing retry attempts. It is then possible to extract a *backoff.ExponentialBackOff from the resulting parsed config with the method FieldBackOff.
It is possible to configure a back off policy that has no upper bound (no maximum elapsed time set). In cases where this would be problematic the field allowUnbounded should be set `false` in order to add linting rules that ensure an upper bound is set.
The defaults struct is optional, and if provided will be used to establish default values for time interval fields. Otherwise the chosen defaults result in one minute of retry attempts, starting at 500ms intervals.
func NewBackOffToggledField ¶
func NewBackOffToggledField(name string, allowUnbounded bool, defaults *backoff.ExponentialBackOff) *ConfigField
NewBackOffToggledField defines a new object type config field that describes an exponential back off policy, often used for timing retry attempts. It is then possible to extract a *backoff.ExponentialBackOff, along with an enabled boolean value, from the resulting parsed config with the method FieldBackOffToggled. This Toggled variant includes a field `enabled` that is `false` by default.
It is possible to configure a back off policy that has no upper bound (no maximum elapsed time set). In cases where this would be problematic the field allowUnbounded should be set `false` in order to add linting rules that ensure an upper bound is set.
The defaults struct is optional, and if provided will be used to establish default values for time interval fields. Otherwise the chosen defaults result in one minute of retry attempts, starting at 500ms intervals.
func NewBatchPolicyField ¶
func NewBatchPolicyField(name string) *ConfigField
NewBatchPolicyField defines a new object type config field that describes a batching policy for batched outputs. It is then possible to extract a BatchPolicy from the resulting parsed config with the method FieldBatchPolicy.
func NewBloblangField ¶
func NewBloblangField(name string) *ConfigField
NewBloblangField defines a new config field that describes a Bloblang mapping string. It is then possible to extract a *bloblang.Executor from the resulting parsed config with the method FieldBloblang.
func NewBoolField ¶
func NewBoolField(name string) *ConfigField
NewBoolField describes a new bool type config field.
func NewDurationField ¶
func NewDurationField(name string) *ConfigField
NewDurationField describes a new duration string type config field, allowing users to define a time interval with strings of the form 60s, 3m, etc.
func NewFloatField ¶
func NewFloatField(name string) *ConfigField
NewFloatField describes a new float type config field.
func NewInputField ¶
func NewInputField(name string) *ConfigField
NewInputField defines a new input field, it is then possible to extract an OwnedInput from the resulting parsed config with the method FieldInput.
func NewInputListField ¶
func NewInputListField(name string) *ConfigField
NewInputListField defines a new input list field, it is then possible to extract a list of OwnedInput from the resulting parsed config with the method FieldInputList.
func NewIntField ¶
func NewIntField(name string) *ConfigField
NewIntField describes a new int type config field.
func NewIntListField ¶
func NewIntListField(name string) *ConfigField
NewIntListField describes a new config field consisting of a list of integers.
func NewIntMapField ¶
func NewIntMapField(name string) *ConfigField
NewIntMapField describes a new config field consisting of an object of arbitrary keys with integer values.
func NewInternalField ¶
func NewInternalField(ifield docs.FieldSpec) *ConfigField
NewInternalField returns a ConfigField derived from an internal package field spec. This function is for internal use only.
func NewInterpolatedStringField ¶
func NewInterpolatedStringField(name string) *ConfigField
NewInterpolatedStringField defines a new config field that describes a dynamic string that supports Bloblang interpolation functions. It is then possible to extract an *InterpolatedString from the resulting parsed config with the method FieldInterpolatedString.
func NewInterpolatedStringMapField ¶
func NewInterpolatedStringMapField(name string) *ConfigField
NewInterpolatedStringMapField describes a new config field consisting of an object of arbitrary keys with interpolated string values. It is then possible to extract a map of *InterpolatedString values from the resulting parsed config with the method FieldInterpolatedStringMap.
func NewMetadataFilterField ¶
func NewMetadataFilterField(name string) *ConfigField
NewMetadataFilterField creates a config field spec for describing which metadata keys to include for a given purpose. This includes prefix based and regular expression based methods. This field is often used for making metadata written to output destinations explicit.
func NewObjectField ¶
func NewObjectField(name string, fields ...*ConfigField) *ConfigField
NewObjectField describes a new object type config field, consisting of one or more child fields.
func NewObjectListField ¶
func NewObjectListField(name string, fields ...*ConfigField) *ConfigField
NewObjectListField describes a new list type config field consisting of objects with one or more child fields.
func NewOutputField ¶
func NewOutputField(name string) *ConfigField
NewOutputField defines a new output field, it is then possible to extract an OwnedOutput from the resulting parsed config with the method FieldOutput.
func NewOutputListField ¶
func NewOutputListField(name string) *ConfigField
NewOutputListField defines a new output list field, it is then possible to extract a list of OwnedOutput from the resulting parsed config with the method FieldOutputList.
func NewProcessorField ¶
func NewProcessorField(name string) *ConfigField
NewProcessorField defines a new processor field, it is then possible to extract an OwnedProcessor from the resulting parsed config with the method FieldProcessor.
func NewProcessorListField ¶
func NewProcessorListField(name string) *ConfigField
NewProcessorListField defines a new processor list field, it is then possible to extract a list of OwnedProcessor from the resulting parsed config with the method FieldProcessorList.
func NewStringAnnotatedEnumField ¶
func NewStringAnnotatedEnumField(name string, options map[string]string) *ConfigField
NewStringAnnotatedEnumField describes a new string type config field that can have one of a discrete list of values, where each value must be accompanied by a description that annotates its behaviour in the documentation.
func NewStringEnumField ¶
func NewStringEnumField(name string, options ...string) *ConfigField
NewStringEnumField describes a new string type config field that can have one of a discrete list of values.
func NewStringField ¶
func NewStringField(name string) *ConfigField
NewStringField describes a new string type config field.
func NewStringListField ¶
func NewStringListField(name string) *ConfigField
NewStringListField describes a new config field consisting of a list of strings.
func NewStringMapField ¶
func NewStringMapField(name string) *ConfigField
NewStringMapField describes a new config field consisting of an object of arbitrary keys with string values.
func NewTLSField ¶
func NewTLSField(name string) *ConfigField
NewTLSField defines a new object type config field that describes TLS settings for networked components. It is then possible to extract a *tls.Config from the resulting parsed config with the method FieldTLS.
func NewTLSToggledField ¶
func NewTLSToggledField(name string) *ConfigField
NewTLSToggledField defines a new object type config field that describes TLS settings for networked components. This field differs from a standard TLSField as it includes a boolean field `enabled` which allows users to explicitly configure whether TLS should be enabled or not.
A *tls.Config as well as an enabled boolean value can be extracted from the resulting parsed config with the method FieldTLSToggled.
func (*ConfigField) Advanced ¶
func (c *ConfigField) Advanced() *ConfigField
Advanced marks a config field as being advanced, and therefore it will not appear in simplified documentation examples.
func (*ConfigField) Default ¶
func (c *ConfigField) Default(v interface{}) *ConfigField
Default specifies a default value that this field will assume if it is omitted from a provided config. Fields that do not have a default value are considered mandatory, and so parsing a config will fail in their absence.
func (*ConfigField) Deprecated ¶
func (c *ConfigField) Deprecated() *ConfigField
Deprecated marks a config field as being deprecated, and therefore it will not appear in documentation examples.
func (*ConfigField) Description ¶
func (c *ConfigField) Description(d string) *ConfigField
Description adds a description to the field which will be shown when printing documentation for the component config spec.
func (*ConfigField) Example ¶
func (c *ConfigField) Example(e interface{}) *ConfigField
Example adds an example value to the field which will be shown when printing documentation for the component config spec.
func (*ConfigField) LintRule ¶
func (c *ConfigField) LintRule(blobl string) *ConfigField
LintRule adds a custom linting rule to the field in the form of a bloblang mapping. The mapping is provided the value of the field within a config as the context `this`, and if the mapping assigns to `root` an array of one or more strings these strings will be exposed to a config author as linting errors.
For example, if we wanted to add a linting rule for a string field that ensures the value contains only lowercase characters we might add the following linting rule:
`root = if this.lowercase() != this { [ "field must be lowercase" ] }`
func (*ConfigField) Optional ¶
func (c *ConfigField) Optional() *ConfigField
Optional specifies that a field is optional even when a default value has not been specified. When a field is marked as optional you can test its presence within a parsed config with the method Contains.
func (*ConfigField) Version ¶
func (c *ConfigField) Version(v string) *ConfigField
Version specifies the specific version at which this field was added to the component.
type ConfigSpec ¶
type ConfigSpec struct {
// contains filtered or unexported fields
}
ConfigSpec describes the configuration specification for a plugin component. This will be used for validating and linting configuration files and providing a parsed configuration struct to the plugin constructor.
func NewConfigSpec ¶
func NewConfigSpec() *ConfigSpec
NewConfigSpec creates a new empty component configuration spec. If the plugin does not require configuration fields the result of this call is enough.
func (*ConfigSpec) Beta ¶
func (c *ConfigSpec) Beta() *ConfigSpec
Beta sets a documentation label on the component indicating that its configuration spec is ready for beta testing, meaning backwards incompatible changes will not be made unless a fundamental problem is found. Plugins are considered experimental by default.
func (*ConfigSpec) Categories ¶
func (c *ConfigSpec) Categories(categories ...string) *ConfigSpec
Categories adds one or more string tags to the component, these are used for arbitrarily grouping components in documentation.
func (*ConfigSpec) Deprecated ¶
func (c *ConfigSpec) Deprecated() *ConfigSpec
Deprecated sets a documentation label on the component indicating that it is now deprecated. Plugins are considered experimental by default.
func (*ConfigSpec) Description ¶
func (c *ConfigSpec) Description(description string) *ConfigSpec
Description adds a description to the plugin configuration spec that describes in more detail the behaviour of the component and how it should be used.
func (*ConfigSpec) EncodeJSON ¶
func (c *ConfigSpec) EncodeJSON(v []byte) error
EncodeJSON attempts to parse a JSON object as a byte slice and uses it to populate the configuration spec. The schema of this method is undocumented and is not intended for general use.
Experimental: This method is not intended for general use and could have its signature and/or behaviour changed outside of major version bumps.
func (*ConfigSpec) Example ¶
func (c *ConfigSpec) Example(title, summary, config string) *ConfigSpec
Example adds an example to the plugin configuration spec that demonstrates how the component can be used. An example has a title, summary, and a YAML configuration showing a real use case.
func (*ConfigSpec) Field ¶
func (c *ConfigSpec) Field(f *ConfigField) *ConfigSpec
Field sets the specification of a field within the config spec, used for linting and generating documentation for the component.
If the provided field has an empty name then it is registered as the value at the root of the config spec.
When creating a spec with a struct constructor the fields from that struct will already be inferred. However, setting a field explicitly is sometimes useful for enriching the field documentation with more information.
func (*ConfigSpec) LintRule ¶
func (c *ConfigSpec) LintRule(blobl string) *ConfigSpec
LintRule adds a custom linting rule to the ConfigSpec in the form of a bloblang mapping. The mapping is provided the value of the fields within the ConfigSpec as the context `this`, and if the mapping assigns to `root` an array of one or more strings these strings will be exposed to a config author as linting errors.
For example, if we wanted to add a linting rule for several ConfigSpec fields that ensures some fields are mutually exclusive and some require others we might use the following:
`root = match { this.exists("meow") && this.exists("woof") => [ "both `+"`meow`"+` and `+"`woof`"+` can't be set simultaneously" ], this.exists("reticulation") && (!this.exists("splines") || this.splines == "") => [ "`+"`splines`"+` is required when setting `+"`reticulation`"+`" ], }`
func (*ConfigSpec) ParseYAML ¶
func (c *ConfigSpec) ParseYAML(yamlStr string, env *Environment) (*ParsedConfig, error)
ParseYAML attempts to parse a YAML document as the defined configuration spec and returns a parsed config view. The provided environment determines which child components and Bloblang functions can be created by the fields of the spec, you can leave the environment nil to use the global environment.
This method is intended for testing purposes and is not required for normal use of plugin components, as parsing is managed by other components.
func (*ConfigSpec) Stable ¶
func (c *ConfigSpec) Stable() *ConfigSpec
Stable sets a documentation label on the component indicating that its configuration spec is stable. Plugins are considered experimental by default.
func (*ConfigSpec) Summary ¶
func (c *ConfigSpec) Summary(summary string) *ConfigSpec
Summary adds a short summary to the plugin configuration spec that describes the general purpose of the component.
func (*ConfigSpec) Version ¶
func (c *ConfigSpec) Version(v string) *ConfigSpec
Version specifies that this component was introduced in a given version.
type ConfigView ¶
type ConfigView struct {
// contains filtered or unexported fields
}
ConfigView is a struct returned by a Benthos service environment when walking the list of registered components and provides access to information about the component.
func (*ConfigView) Description ¶
func (c *ConfigView) Description() string
Description returns a documentation description of the component, often formatted as markdown.
func (*ConfigView) FormatJSON ¶
func (c *ConfigView) FormatJSON() ([]byte, error)
FormatJSON returns a byte slice of the component configuration formatted as a JSON object. The schema of this method is undocumented and is not intended for general use.
Experimental: This method is not intended for general use and could have its signature and/or behaviour changed outside of major version bumps.
func (*ConfigView) IsDeprecated ¶
func (c *ConfigView) IsDeprecated() bool
IsDeprecated returns true if the component is marked as deprecated.
func (*ConfigView) RenderDocs ¶
func (c *ConfigView) RenderDocs() ([]byte, error)
RenderDocs creates a markdown file that documents the configuration of the component config view. This markdown may include Docusaurus react elements as it matches the documentation generated for the official Benthos website.
Experimental: This method is not intended for general use and could have its signature and/or behaviour changed outside of major version bumps.
func (*ConfigView) Summary ¶
func (c *ConfigView) Summary() string
Summary returns a documentation summary of the component, often formatted as markdown.
type Environment ¶
type Environment struct {
// contains filtered or unexported fields
}
Environment is a collection of Benthos component plugins that can be used in order to build and run streaming pipelines with access to different sets of plugins. This can be useful for sandboxing, testing, etc, but most plugin authors do not need to create an Environment and can simply use the global environment.
func GlobalEnvironment ¶
func GlobalEnvironment() *Environment
GlobalEnvironment returns a reference to the global environment. Adding plugins to this environment is equivalent to adding plugins using the global functions.
func NewEnvironment ¶
func NewEnvironment() *Environment
NewEnvironment creates a new environment that inherits all globally defined plugins, but can have plugins defined on it that are isolated.
func (*Environment) Clone ¶
func (e *Environment) Clone() *Environment
Clone an environment, creating a new environment containing the same plugins that can be modified independently of the source.
func (*Environment) NewStreamBuilder ¶
func (e *Environment) NewStreamBuilder() *StreamBuilder
NewStreamBuilder creates a new StreamBuilder upon the defined environment, only components known to this environment will be available to the stream builder.
func (*Environment) RegisterBatchBuffer ¶
func (e *Environment) RegisterBatchBuffer(name string, spec *ConfigSpec, ctor BatchBufferConstructor) error
RegisterBatchBuffer attempts to register a new buffer plugin by providing a description of the configuration for the buffer and a constructor for the buffer itself. The constructor will be called for each instantiation of the component within a config.
Consumed message batches must be created by upstream components (inputs, etc) otherwise this buffer will simply receive batches containing single messages.
func (*Environment) RegisterBatchInput ¶
func (e *Environment) RegisterBatchInput(name string, spec *ConfigSpec, ctor BatchInputConstructor) error
RegisterBatchInput attempts to register a new batched input plugin by providing a description of the configuration for the plugin as well as a constructor for the input itself. The constructor will be called for each instantiation of the component within a config.
If your input implementation doesn't have a specific mechanism for dealing with a nack (when the AckFunc provides a non-nil error) then you can instead wrap your input implementation with AutoRetryNacksBatched to get automatic retries.
func (*Environment) RegisterBatchOutput ¶
func (e *Environment) RegisterBatchOutput(name string, spec *ConfigSpec, ctor BatchOutputConstructor) error
RegisterBatchOutput attempts to register a new output plugin by providing a description of the configuration for the plugin as well as a constructor for the output itself. The constructor will be called for each instantiation of the component within a config.
The constructor of a batch output is able to return a batch policy to be applied before calls to write are made, creating batches from the stream of messages. However, batches can also be created by upstream components (inputs, buffers, etc).
If a batch has been formed upstream it is possible that its size may exceed the policy specified in your constructor.
func (*Environment) RegisterBatchProcessor ¶
func (e *Environment) RegisterBatchProcessor(name string, spec *ConfigSpec, ctor BatchProcessorConstructor) error
RegisterBatchProcessor attempts to register a new processor plugin by providing a description of the configuration for the processor and a constructor for the processor itself. The constructor will be called for each instantiation of the component within a config.
Message batches must be created by upstream components (inputs, buffers, etc) otherwise this processor will simply receive batches containing single messages.
func (*Environment) RegisterCache ¶
func (e *Environment) RegisterCache(name string, spec *ConfigSpec, ctor CacheConstructor) error
RegisterCache attempts to register a new cache plugin by providing a description of the configuration for the plugin as well as a constructor for the cache itself. The constructor will be called for each instantiation of the component within a config.
func (*Environment) RegisterInput ¶
func (e *Environment) RegisterInput(name string, spec *ConfigSpec, ctor InputConstructor) error
RegisterInput attempts to register a new input plugin by providing a description of the configuration for the plugin as well as a constructor for the input itself. The constructor will be called for each instantiation of the component within a config.
If your input implementation doesn't have a specific mechanism for dealing with a nack (when the AckFunc provides a non-nil error) then you can instead wrap your input implementation with AutoRetryNacks to get automatic retries.
func (*Environment) RegisterMetricsExporter ¶
func (e *Environment) RegisterMetricsExporter(name string, spec *ConfigSpec, ctor MetricsExporterConstructor) error
RegisterMetricsExporter attempts to register a new metrics exporter plugin by providing a description of the configuration for the plugin as well as a constructor for the metrics exporter itself.
func (*Environment) RegisterOtelTracerProvider ¶
func (e *Environment) RegisterOtelTracerProvider(name string, spec *ConfigSpec, ctor OtelTracerProviderConstructor) error
RegisterOtelTracerProvider attempts to register a new open telemetry tracer provider plugin by providing a description of the configuration for the plugin as well as a constructor for the tracer provider itself. The constructor will be called for each instantiation of the component within a config.
Experimental: This type signature is experimental and therefore subject to change outside of major version releases.
func (*Environment) RegisterOutput ¶
func (e *Environment) RegisterOutput(name string, spec *ConfigSpec, ctor OutputConstructor) error
RegisterOutput attempts to register a new output plugin by providing a description of the configuration for the plugin as well as a constructor for the output itself. The constructor will be called for each instantiation of the component within a config.
func (*Environment) RegisterProcessor ¶
func (e *Environment) RegisterProcessor(name string, spec *ConfigSpec, ctor ProcessorConstructor) error
RegisterProcessor attempts to register a new processor plugin by providing a description of the configuration for the processor and a constructor for the processor itself. The constructor will be called for each instantiation of the component within a config.
For simple transformations consider implementing a Bloblang plugin method instead.
func (*Environment) RegisterRateLimit ¶
func (e *Environment) RegisterRateLimit(name string, spec *ConfigSpec, ctor RateLimitConstructor) error
RegisterRateLimit attempts to register a new rate limit plugin by providing a description of the configuration for the plugin as well as a constructor for the rate limit itself. The constructor will be called for each instantiation of the component within a config.
func (*Environment) UseBloblangEnvironment ¶
func (e *Environment) UseBloblangEnvironment(bEnv *bloblang.Environment)
UseBloblangEnvironment configures the service environment to restrict components constructed with it to a specific Bloblang environment.
func (*Environment) WalkBuffers ¶
func (e *Environment) WalkBuffers(fn func(name string, config *ConfigView))
WalkBuffers executes a provided function argument for every buffer component that has been registered to the environment.
func (*Environment) WalkCaches ¶
func (e *Environment) WalkCaches(fn func(name string, config *ConfigView))
WalkCaches executes a provided function argument for every cache component that has been registered to the environment.
func (*Environment) WalkInputs ¶
func (e *Environment) WalkInputs(fn func(name string, config *ConfigView))
WalkInputs executes a provided function argument for every input component that has been registered to the environment.
func (*Environment) WalkMetrics ¶
func (e *Environment) WalkMetrics(fn func(name string, config *ConfigView))
WalkMetrics executes a provided function argument for every metrics component that has been registered to the environment. Note that metrics components available to an environment cannot be modified.
func (*Environment) WalkOutputs ¶
func (e *Environment) WalkOutputs(fn func(name string, config *ConfigView))
WalkOutputs executes a provided function argument for every output component that has been registered to the environment.
func (*Environment) WalkProcessors ¶
func (e *Environment) WalkProcessors(fn func(name string, config *ConfigView))
WalkProcessors executes a provided function argument for every processor component that has been registered to the environment.
func (*Environment) WalkRateLimits ¶
func (e *Environment) WalkRateLimits(fn func(name string, config *ConfigView))
WalkRateLimits executes a provided function argument for every rate limit component that has been registered to the environment.
func (*Environment) WalkTracers ¶
func (e *Environment) WalkTracers(fn func(name string, config *ConfigView))
WalkTracers executes a provided function argument for every tracer component that has been registered to the environment. Note that tracer components available to an environment cannot be modified.
type HTTPMultiplexer ¶
type HTTPMultiplexer interface {
HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))
}
HTTPMultiplexer is an interface supported by most HTTP multiplexers.
type Input ¶
type Input interface { // Establish a connection to the upstream service. Connect will always be // called first when a reader is instantiated, and will be continuously // called with back off until a nil error is returned. // // The provided context remains open only for the duration of the connecting // phase, and should not be used to establish the lifetime of the connection // itself. // // Once Connect returns a nil error the Read method will be called until // either ErrNotConnected is returned, or the reader is closed. Connect(context.Context) error // Read a single message from a source, along with a function to be called // once the message can be either acked (successfully sent or intentionally // filtered) or nacked (failed to be processed or dispatched to the output). // // The AckFunc will be called for every message at least once, but there are // no guarantees as to when this will occur. If your input implementation // doesn't have a specific mechanism for dealing with a nack then you can // wrap your input implementation with AutoRetryNacks to get automatic // retries. // // If this method returns ErrNotConnected then Read will not be called again // until Connect has returned a nil error. If ErrEndOfInput is returned then // Read will no longer be called and the pipeline will gracefully terminate. Read(context.Context) (*Message, AckFunc, error) Closer }
Input is an interface implemented by Benthos inputs. Calls to Read should block until either a message has been received, the connection is lost, or the provided context is cancelled.
func AutoRetryNacks ¶
AutoRetryNacks wraps an input implementation with a component that automatically reattempts messages that fail downstream. This is useful for inputs that do not support nacks, and therefore don't have an answer for when an ack func is called with an error.
When messages fail to be delivered they will be reattempted with back off until success or the stream is stopped.
type InputConstructor ¶
type InputConstructor func(conf *ParsedConfig, mgr *Resources) (Input, error)
InputConstructor is a func that's provided a configuration type and access to a service manager, and must return an instantiation of a reader based on the config, or an error.
type InterpolatedString ¶
type InterpolatedString struct {
// contains filtered or unexported fields
}
InterpolatedString resolves a string containing dynamic interpolation functions for a given message.
func NewInterpolatedString ¶
func NewInterpolatedString(expr string) (*InterpolatedString, error)
NewInterpolatedString parses an interpolated string expression.
func (*InterpolatedString) Bytes ¶
func (i *InterpolatedString) Bytes(m *Message) []byte
Bytes resolves the interpolated field for a given message as a byte slice.
func (*InterpolatedString) String ¶
func (i *InterpolatedString) String(m *Message) string
String resolves the interpolated field for a given message as a string.
type LintError ¶
type LintError []Lint
LintError is an error type that represents one or more configuration file linting errors that were encountered.
type Logger ¶
type Logger struct {
// contains filtered or unexported fields
}
Logger allows plugin authors to write custom logs from components that are exported the same way as native Benthos logs. It's safe to pass around a nil pointer for testing components.
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
Message represents a single discrete message passing through a Benthos pipeline. It is safe to mutate the message via Set methods, but the underlying byte data should not be edited directly.
func NewMessage ¶
NewMessage creates a new message with an initial raw bytes content. The initial content can be nil, which is recommended if you intend to set it with structured contents.
func (*Message) AsBytes ¶
AsBytes returns the underlying byte array contents of a message or, if the contents are a structured type, attempts to marshal the contents as a JSON document and returns either the byte array result or an error.
It is NOT safe to mutate the contents of the returned slice.
func (*Message) AsStructured ¶
AsStructured returns the underlying structured contents of a message or, if the contents are a byte array, attempts to parse the bytes contents as a JSON document and returns either the structured result or an error.
It is NOT safe to mutate the contents of the returned value if it is a reference type (slice or map). In order to safely mutate the structured contents of a message use AsStructuredMut.
func (*Message) AsStructuredMut ¶
AsStructuredMut returns the underlying structured contents of a message or, if the contents are a byte array, attempts to parse the bytes contents as a JSON document and returns either the structured result or an error.
It is safe to mutate the contents of the returned value even if it is a reference type (slice or map), as the structured contents will be lazily deep cloned if it is still owned by an upstream component.
func (*Message) BloblangQuery ¶
BloblangQuery executes a parsed Bloblang mapping on a message and returns a message back or an error if the mapping fails. If the mapping results in the root being deleted the returned message will be nil, which indicates it has been filtered.
func (*Message) Context ¶
Context returns a context associated with the message, or a background context in the absence of one.
func (*Message) Copy ¶
Copy creates a shallow copy of a message that is safe to mutate with Set methods without mutating the original. Both messages will share a context, and therefore a tracing ID, if one has been associated with them.
Note that this does not perform a deep copy of the byte or structured contents of the message, and therefore it is not safe to perform inline mutations on those values without copying them.
func (*Message) GetError ¶
GetError returns an error associated with a message, or nil if there isn't one. Messages marked with errors can be handled using a range of methods outlined in https://www.benthos.dev/docs/configuration/error_handling.
func (*Message) MetaDelete ¶
MetaDelete removes a key from the message metadata.
func (*Message) MetaGet ¶
MetaGet attempts to find a metadata key from the message and returns a string result and a boolean indicating whether it was found.
func (*Message) MetaSet ¶
MetaSet sets the value of a metadata key. If the value is an empty string the metadata key is deleted.
func (*Message) MetaWalk ¶
MetaWalk iterates each metadata key/value pair and executes a provided closure on each iteration. To stop iterating, return an error from the closure. An error returned by the closure will be returned by this function.
func (*Message) SetError ¶
SetError marks the message as having failed a processing step and adds the error to it as context. Messages marked with errors can be handled using a range of methods outlined in https://www.benthos.dev/docs/configuration/error_handling.
func (*Message) SetStructured ¶
func (m *Message) SetStructured(i interface{})
SetStructured sets the underlying contents of the message as a structured type. This structured value should be a scalar Go type, or either a map[string]interface{} or []interface{} containing the same types all the way through the hierarchy, this ensures that other processors are able to work with the contents and that they can be JSON marshalled when coerced into a byte array.
type MessageBatch ¶
type MessageBatch []*Message
MessageBatch describes a collection of one or more messages.
func ExecuteProcessors ¶
func ExecuteProcessors(ctx context.Context, processors []*OwnedProcessor, inbatches ...MessageBatch) ([]MessageBatch, error)
ExecuteProcessors runs a set of batches through a series of processors. If a context error occurs during execution then this function terminates and returns the error.
However, for general processing errors unrelated to context cancellation the errors are marked against individual messages with the `SetError` method and processing continues against subsequent processors.
func (MessageBatch) BloblangQuery ¶
BloblangQuery executes a parsed Bloblang mapping on a message batch, from the perspective of a particular message index, and returns a message back or an error if the mapping fails. If the mapping results in the root being deleted the returned message will be nil, which indicates it has been filtered.
This method allows mappings to perform windowed aggregations across message batches.
func (MessageBatch) Copy ¶
func (b MessageBatch) Copy() MessageBatch
Copy creates a new slice of the same messages, which can be modified without changing the contents of the original batch.
func (MessageBatch) InterpolatedBytes ¶
func (b MessageBatch) InterpolatedBytes(index int, i *InterpolatedString) []byte
InterpolatedBytes resolves an interpolated string expression on a message batch, from the perspective of a particular message index.
This method allows interpolation functions to perform windowed aggregations across message batches, and is a more powerful way to interpolate strings than the standard .String method.
func (MessageBatch) InterpolatedString ¶
func (b MessageBatch) InterpolatedString(index int, i *InterpolatedString) string
InterpolatedString resolves an interpolated string expression on a message batch, from the perspective of a particular message index.
This method allows interpolation functions to perform windowed aggregations across message batches, and is a more powerful way to interpolate strings than the standard .String method.
type MessageBatchHandlerFunc ¶
type MessageBatchHandlerFunc func(context.Context, MessageBatch) error
MessageBatchHandlerFunc is a function signature defining a component that consumes Benthos message batches. An error must be returned if the context is cancelled, or if the messages could not be delivered or processed.
type MessageHandlerFunc ¶
MessageHandlerFunc is a function signature defining a component that consumes Benthos messages. An error must be returned if the context is cancelled, or if the message could not be delivered or processed.
type MetadataFilter ¶
type MetadataFilter struct {
// contains filtered or unexported fields
}
MetadataFilter provides a configured mechanism for filtering metadata key/values from a message.
func (*MetadataFilter) Walk ¶
func (m *MetadataFilter) Walk(msg *Message, fn func(key, value string) error) error
Walk iterates the filtered metadata key/value pairs from a message and executes a provided closure function for each pair. An error returned by the closure will be returned by this function and prevent subsequent pairs from being accessed.
type MetricCounter ¶
type MetricCounter struct {
// contains filtered or unexported fields
}
MetricCounter represents a counter metric of a given name and labels.
func (*MetricCounter) Incr ¶
func (c *MetricCounter) Incr(count int64, labelValues ...string)
Incr increments a counter metric by an amount, the number of label values must match the number and order of labels specified when the counter was created.
type MetricGauge ¶
type MetricGauge struct {
// contains filtered or unexported fields
}
MetricGauge represents a gauge metric of a given name and labels.
func (*MetricGauge) Set ¶
func (g *MetricGauge) Set(value int64, labelValues ...string)
Set a gauge metric, the number of label values must match the number and order of labels specified when the gauge was created.
type MetricTimer ¶
type MetricTimer struct {
// contains filtered or unexported fields
}
MetricTimer represents a timing metric of a given name and labels.
func (*MetricTimer) Timing ¶
func (t *MetricTimer) Timing(delta int64, labelValues ...string)
Timing adds a delta to a timing metric. Delta should be measured in nanoseconds for consistency with other Benthos timing metrics.
The number of label values must match the number and order of labels specified when the timing was created.
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
Metrics allows plugin authors to emit custom metrics from components that are exported the same way as native Benthos metrics. It's safe to pass around a nil pointer for testing components.
func (*Metrics) NewCounter ¶
func (m *Metrics) NewCounter(name string, labelKeys ...string) *MetricCounter
NewCounter creates a new counter metric with a name and variadic list of label keys.
type MetricsExporter ¶
type MetricsExporter interface { NewCounterCtor(name string, labelKeys ...string) MetricsExporterCounterCtor NewTimerCtor(name string, labelKeys ...string) MetricsExporterTimerCtor NewGaugeCtor(name string, labelKeys ...string) MetricsExporterGaugeCtor Close(ctx context.Context) error }
MetricsExporter is an interface implemented by Benthos metrics exporters.
type MetricsExporterConstructor ¶
type MetricsExporterConstructor func(conf *ParsedConfig, log *Logger) (MetricsExporter, error)
MetricsExporterConstructor is a func that's provided a configuration type and a logger, and must return an instantiation of a metrics exporter based on the config, or an error.
type MetricsExporterCounter ¶
type MetricsExporterCounter interface { // Incr increments a counter metric by an amount, the number of label values // must match the number and order of labels specified when the counter was // created. Incr(count int64) }
MetricsExporterCounter represents a counter metric of a given name and labels.
type MetricsExporterCounterCtor ¶
type MetricsExporterCounterCtor func(labelValues ...string) MetricsExporterCounter
MetricsExporterCounterCtor is a constructor for a MetricsExporterCounter that must be called with a variadic list of label values exactly matching the length and order of the label keys provided.
type MetricsExporterGauge ¶
type MetricsExporterGauge interface { // Set a gauge metric, the number of label values must match the number and // order of labels specified when the gauge was created. Set(value int64) }
MetricsExporterGauge represents a gauge metric of a given name and labels.
type MetricsExporterGaugeCtor ¶
type MetricsExporterGaugeCtor func(labelValues ...string) MetricsExporterGauge
MetricsExporterGaugeCtor is a constructor for a MetricsExporterGauge that must be called with a variadic list of label values exactly matching the length and order of the label keys provided.
type MetricsExporterTimer ¶
type MetricsExporterTimer interface { // Timing adds a delta to a timing metric. Delta should be measured in // nanoseconds for consistency with other Benthos timing metrics. // // The number of label values must match the number and order of labels // specified when the timing was created. Timing(delta int64) }
MetricsExporterTimer represents a timing metric of a given name and labels.
type MetricsExporterTimerCtor ¶
type MetricsExporterTimerCtor func(labelValues ...string) MetricsExporterTimer
MetricsExporterTimerCtor is a constructor for a MetricsExporterTimer that must be called with a variadic list of label values exactly matching the length and order of the label keys provided.
type MockResourcesOptFn ¶
MockResourcesOptFn provides a func based optional argument to MockResources.
func MockResourcesOptAddCache ¶
func MockResourcesOptAddCache(name string) MockResourcesOptFn
MockResourcesOptAddCache instantiates the resources type with a mock cache with a given name. Cached items are held in memory.
func MockResourcesOptAddRateLimit ¶
func MockResourcesOptAddRateLimit(name string, fn func(context.Context) (time.Duration, error)) MockResourcesOptFn
MockResourcesOptAddRateLimit instantiates the resources type with a mock rate limit with a given name, the provided closure will be called for each invocation of the rate limit.
type OtelTracerProviderConstructor ¶
type OtelTracerProviderConstructor func(conf *ParsedConfig) (trace.TracerProvider, error)
OtelTracerProviderConstructor is a func that's provided a configuration type and must return an instantiation of an open telemetry tracer provider, or an error.
Experimental: This type signature is experimental and therefore subject to change outside of major version releases.
type Output ¶
type Output interface { // Establish a connection to the downstream service. Connect will always be // called first when a writer is instantiated, and will be continuously // called with back off until a nil error is returned. // // The provided context remains open only for the duration of the connecting // phase, and should not be used to establish the lifetime of the connection // itself. // // Once Connect returns a nil error the write method will be called until // either ErrNotConnected is returned, or the writer is closed. Connect(context.Context) error // Write a message to a sink, or return an error if delivery is not // possible. // // If this method returns ErrNotConnected then write will not be called // again until Connect has returned a nil error. Write(context.Context, *Message) error Closer }
Output is an interface implemented by Benthos outputs that support single message writes. Each call to Write should block until either the message has been successfully or unsuccessfully sent, or the context is cancelled.
Multiple write calls can be performed in parallel, and the constructor of an output must provide a MaxInFlight parameter indicating the maximum number of parallel write calls the output supports.
type OutputConstructor ¶
type OutputConstructor func(conf *ParsedConfig, mgr *Resources) (out Output, maxInFlight int, err error)
OutputConstructor is a func that's provided a configuration type and access to a service manager, and must return an instantiation of a writer based on the config and a maximum number of in-flight messages to allow, or an error.
type OwnedInput ¶
type OwnedInput struct {
// contains filtered or unexported fields
}
OwnedInput provides direct ownership of an input extracted from a plugin config. Connectivity of the input is handled internally, and so the consumer of this type should only be concerned with reading messages and eventually calling Close to terminate the input.
func (*OwnedInput) ReadBatch ¶
func (o *OwnedInput) ReadBatch(ctx context.Context) (MessageBatch, AckFunc, error)
ReadBatch attempts to read a message batch from the input, along with a function to be called once the entire batch can be either acked (successfully sent or intentionally filtered) or nacked (failed to be processed or dispatched to the output).
If this method returns ErrEndOfInput then that indicates that the input has finished and will no longer yield new messages.
type OwnedOutput ¶
type OwnedOutput struct {
// contains filtered or unexported fields
}
OwnedOutput provides direct ownership of an output extracted from a plugin config. Connectivity of the output is handled internally, and so the owner of this type should only be concerned with writing messages and eventually calling Close to terminate the output.
func (*OwnedOutput) Close ¶
func (o *OwnedOutput) Close(ctx context.Context) error
Close the output.
func (*OwnedOutput) Write ¶
func (o *OwnedOutput) Write(ctx context.Context, m *Message) error
Write a message to the output, or return an error either if delivery is not possible or the context is cancelled.
func (*OwnedOutput) WriteBatch ¶
func (o *OwnedOutput) WriteBatch(ctx context.Context, b MessageBatch) error
WriteBatch attempts to write a message batch to the output, and returns an error either if delivery is not possible or the context is cancelled.
type OwnedProcessor ¶
type OwnedProcessor struct {
// contains filtered or unexported fields
}
OwnedProcessor provides direct ownership of a processor extracted from a plugin config.
func (*OwnedProcessor) Close ¶
func (o *OwnedProcessor) Close(ctx context.Context) error
Close the processor, allowing it to clean up resources.
func (*OwnedProcessor) Process ¶
func (o *OwnedProcessor) Process(ctx context.Context, msg *Message) (MessageBatch, error)
Process a single message, returns either a batch of zero or more resulting messages or an error if the message could not be processed.
func (*OwnedProcessor) ProcessBatch ¶
func (o *OwnedProcessor) ProcessBatch(ctx context.Context, batch MessageBatch) ([]MessageBatch, error)
ProcessBatch attempts to process a batch of messages, returns zero or more batches of resulting messages, or an error if the context is cancelled during execution.
However, for general processing errors unrelated to context cancellation the error is marked against individual messages with the `SetError` method and a nil error is returned by this method.
type ParsedConfig ¶
type ParsedConfig struct {
// contains filtered or unexported fields
}
ParsedConfig represents a plugin configuration that has been validated and parsed from a ConfigSpec, and allows plugin constructors to access configuration fields.
func (*ParsedConfig) Contains ¶
func (p *ParsedConfig) Contains(path ...string) bool
Contains checks whether the parsed config contains a given field identified by its name.
func (*ParsedConfig) FieldAny ¶
func (p *ParsedConfig) FieldAny(path ...string) (interface{}, error)
FieldAny accesses a field from the parsed config by its name that can assume any value type. If the field is not found an error is returned.
func (*ParsedConfig) FieldAnyList ¶
func (p *ParsedConfig) FieldAnyList(path ...string) ([]*ParsedConfig, error)
FieldAnyList accesses a field that is a list of any value types from the parsed config by its name and returns the value as an array of *ParsedConfig types, where each one represents an object or value in the list. Returns an error if the field is not found, or is not a list of values.
func (*ParsedConfig) FieldBackOff ¶
func (p *ParsedConfig) FieldBackOff(path ...string) (*backoff.ExponentialBackOff, error)
FieldBackOff accesses a field from a parsed config that was defined with NewBackOffField and returns a *backoff.ExponentialBackOff, or an error if the configuration was invalid.
func (*ParsedConfig) FieldBackOffToggled ¶
func (p *ParsedConfig) FieldBackOffToggled(path ...string) (boff *backoff.ExponentialBackOff, enabled bool, err error)
FieldBackOffToggled accesses a field from a parsed config that was defined with NewBackOffField and returns a *backoff.ExponentialBackOff and a boolean flag indicating whether retries are explicitly enabled, or an error if the configuration was invalid.
func (*ParsedConfig) FieldBatchPolicy ¶
func (p *ParsedConfig) FieldBatchPolicy(path ...string) (conf BatchPolicy, err error)
FieldBatchPolicy accesses a field from a parsed config that was defined with NewBatchPolicyField and returns a BatchPolicy, or an error if the configuration was invalid.
func (*ParsedConfig) FieldBloblang ¶
func (p *ParsedConfig) FieldBloblang(path ...string) (*bloblang.Executor, error)
FieldBloblang accesses a field from a parsed config that was defined with NewBloblangField and returns either a *bloblang.Executor or an error if the mapping was invalid.
func (*ParsedConfig) FieldBool ¶
func (p *ParsedConfig) FieldBool(path ...string) (bool, error)
FieldBool accesses a bool field from the parsed config by its name and returns the value. Returns an error if the field is not found or is not a bool.
func (*ParsedConfig) FieldDuration ¶
func (p *ParsedConfig) FieldDuration(path ...string) (time.Duration, error)
FieldDuration accesses a duration string field from the parsed config by its name. If the field is not found or is not a valid duration string an error is returned.
func (*ParsedConfig) FieldFloat ¶
func (p *ParsedConfig) FieldFloat(path ...string) (float64, error)
FieldFloat accesses a float field from the parsed config by its name and returns the value. Returns an error if the field is not found or is not a float.
func (*ParsedConfig) FieldInput ¶
func (p *ParsedConfig) FieldInput(path ...string) (*OwnedInput, error)
FieldInput accesses a field from a parsed config that was defined with NewInputField and returns an OwnedInput, or an error if the configuration was invalid.
func (*ParsedConfig) FieldInputList ¶
func (p *ParsedConfig) FieldInputList(path ...string) ([]*OwnedInput, error)
FieldInputList accesses a field from a parsed config that was defined with NewInputListField and returns a slice of OwnedInput, or an error if the configuration was invalid.
func (*ParsedConfig) FieldInt ¶
func (p *ParsedConfig) FieldInt(path ...string) (int, error)
FieldInt accesses an int field from the parsed config by its name and returns the value. Returns an error if the field is not found or is not an int.
func (*ParsedConfig) FieldIntList ¶
func (p *ParsedConfig) FieldIntList(path ...string) ([]int, error)
FieldIntList accesses a field that is a list of integers from the parsed config by its name and returns the value. Returns an error if the field is not found, or is not a list of integers.
func (*ParsedConfig) FieldIntMap ¶
func (p *ParsedConfig) FieldIntMap(path ...string) (map[string]int, error)
FieldIntMap accesses a field that is an object of arbitrary keys and integer values from the parsed config by its name and returns the value. Returns an error if the field is not found, or is not an object of integers.
func (*ParsedConfig) FieldInterpolatedString ¶
func (p *ParsedConfig) FieldInterpolatedString(path ...string) (*InterpolatedString, error)
FieldInterpolatedString accesses a field from a parsed config that was defined with NewInterpolatedStringField and returns either an *InterpolatedString or an error if the string was invalid.
func (*ParsedConfig) FieldInterpolatedStringMap ¶
func (p *ParsedConfig) FieldInterpolatedStringMap(path ...string) (map[string]*InterpolatedString, error)
FieldInterpolatedStringMap accesses a field that is an object of arbitrary keys and interpolated string values from the parsed config by its name and returns the value.
Returns an error if the field is not found, or is not an object of interpolated strings.
func (*ParsedConfig) FieldMetadataFilter ¶
func (p *ParsedConfig) FieldMetadataFilter(path ...string) (f *MetadataFilter, err error)
FieldMetadataFilter accesses a field from a parsed config that was defined with NewMetadataFilterField and returns a MetadataFilter, or an error if the configuration was invalid.
func (*ParsedConfig) FieldObjectList ¶
func (p *ParsedConfig) FieldObjectList(path ...string) ([]*ParsedConfig, error)
FieldObjectList accesses a field that is a list of objects from the parsed config by its name and returns the value as an array of *ParsedConfig types, where each one represents an object in the list. Returns an error if the field is not found, or is not a list of objects.
func (*ParsedConfig) FieldOutput ¶
func (p *ParsedConfig) FieldOutput(path ...string) (*OwnedOutput, error)
FieldOutput accesses a field from a parsed config that was defined with NewOutputField and returns an OwnedOutput, or an error if the configuration was invalid.
func (*ParsedConfig) FieldOutputList ¶
func (p *ParsedConfig) FieldOutputList(path ...string) ([]*OwnedOutput, error)
FieldOutputList accesses a field from a parsed config that was defined with NewOutputListField and returns a slice of OwnedOutput, or an error if the configuration was invalid.
func (*ParsedConfig) FieldProcessor ¶
func (p *ParsedConfig) FieldProcessor(path ...string) (*OwnedProcessor, error)
FieldProcessor accesses a field from a parsed config that was defined with NewProcessorField and returns an OwnedProcessor, or an error if the configuration was invalid.
func (*ParsedConfig) FieldProcessorList ¶
func (p *ParsedConfig) FieldProcessorList(path ...string) ([]*OwnedProcessor, error)
FieldProcessorList accesses a field from a parsed config that was defined with NewProcessorListField and returns a slice of OwnedProcessor, or an error if the configuration was invalid.
func (*ParsedConfig) FieldString ¶
func (p *ParsedConfig) FieldString(path ...string) (string, error)
FieldString accesses a string field from the parsed config by its name. If the field is not found or is not a string an error is returned.
func (*ParsedConfig) FieldStringList ¶
func (p *ParsedConfig) FieldStringList(path ...string) ([]string, error)
FieldStringList accesses a field that is a list of strings from the parsed config by its name and returns the value. Returns an error if the field is not found, or is not a list of strings.
func (*ParsedConfig) FieldStringMap ¶
func (p *ParsedConfig) FieldStringMap(path ...string) (map[string]string, error)
FieldStringMap accesses a field that is an object of arbitrary keys and string values from the parsed config by its name and returns the value. Returns an error if the field is not found, or is not an object of strings.
func (*ParsedConfig) FieldTLS ¶
func (p *ParsedConfig) FieldTLS(path ...string) (*tls.Config, error)
FieldTLS accesses a field from a parsed config that was defined with NewTLSField and returns a *tls.Config, or an error if the configuration was invalid.
func (*ParsedConfig) FieldTLSToggled ¶
FieldTLSToggled accesses a field from a parsed config that was defined with NewTLSToggledField and returns a *tls.Config and a boolean flag indicating whether tls is explicitly enabled, or an error if the configuration was invalid.
func (*ParsedConfig) Namespace ¶
func (p *ParsedConfig) Namespace(path ...string) *ParsedConfig
Namespace returns a version of the parsed config at a given field namespace. This is useful for extracting multiple fields under the same grouping.
type PrintLogger ¶
type PrintLogger interface { Printf(format string, v ...interface{}) Println(v ...interface{}) }
PrintLogger is a simple Print based interface implemented by custom loggers.
type Processor ¶
type Processor interface { // Process a message into one or more resulting messages, or return an error // if the message could not be processed. If zero messages are returned and // the error is nil then the message is filtered. // // When an error is returned the input message will continue down the // pipeline but will be marked with the error with *message.SetError, and // metrics and logs will be emitted. The failed message can then be handled // with the patterns outlined in https://www.benthos.dev/docs/configuration/error_handling. // // The Message types returned MUST be derived from the provided message, and // CANNOT be custom implementations of Message. In order to copy the // provided message use the Copy method. Process(context.Context, *Message) (MessageBatch, error) Closer }
Processor is a Benthos processor implementation that works against single messages.
type ProcessorConstructor ¶
type ProcessorConstructor func(conf *ParsedConfig, mgr *Resources) (Processor, error)
ProcessorConstructor is a func that's provided a configuration type and access to a service manager and must return an instantiation of a processor based on the config, or an error.
type RateLimit ¶
type RateLimit interface { // Access the rate limited resource. Returns a duration or an error if the // rate limit check fails. The returned duration is either zero (meaning the // resource may be accessed) or a reasonable length of time to wait before // requesting again. Access(context.Context) (time.Duration, error) Closer }
RateLimit is an interface implemented by Benthos rate limits.
type RateLimitConstructor ¶
type RateLimitConstructor func(conf *ParsedConfig, mgr *Resources) (RateLimit, error)
RateLimitConstructor is a func that's provided a configuration type and access to a service manager and must return an instantiation of a rate limit based on the config, or an error.
type ResourceInput ¶
type ResourceInput struct {
// contains filtered or unexported fields
}
ResourceInput provides access to an input resource.
func (*ResourceInput) ReadBatch ¶
func (r *ResourceInput) ReadBatch(ctx context.Context) (MessageBatch, AckFunc, error)
ReadBatch attempts to read a message batch from the input, along with a function to be called once the entire batch can be either acked (successfully sent or intentionally filtered) or nacked (failed to be processed or dispatched to the output).
If this method returns ErrEndOfInput then that indicates that the input has finished and will no longer yield new messages.
type ResourceOutput ¶
type ResourceOutput struct {
// contains filtered or unexported fields
}
ResourceOutput provides access to an output resource.
func (*ResourceOutput) Write ¶
func (o *ResourceOutput) Write(ctx context.Context, m *Message) error
Write a message to the output, or return an error either if delivery is not possible or the context is cancelled.
func (*ResourceOutput) WriteBatch ¶
func (o *ResourceOutput) WriteBatch(ctx context.Context, b MessageBatch) error
WriteBatch attempts to write a message batch to the output, and returns an error either if delivery is not possible or the context is cancelled.
type Resources ¶
type Resources struct {
// contains filtered or unexported fields
}
Resources provides access to service-wide resources.
func MockResources ¶
func MockResources(opts ...MockResourcesOptFn) *Resources
MockResources returns an instantiation of a resources struct that provides valid but ineffective methods and observability components. It is possible to instantiate this with mocked (in-memory) cache and rate limit types for testing purposes.
func (*Resources) AccessCache ¶
AccessCache attempts to access a cache resource by name. This action can block if CRUD operations are being actively performed on the resource.
func (*Resources) AccessInput ¶
AccessInput attempts to access an input resource by name.
func (*Resources) AccessOutput ¶
func (r *Resources) AccessOutput(ctx context.Context, name string, fn func(o *ResourceOutput)) error
AccessOutput attempts to access an output resource by name. This action can block if CRUD operations are being actively performed on the resource.
func (*Resources) AccessRateLimit ¶
AccessRateLimit attempts to access a rate limit resource by name. This action can block if CRUD operations are being actively performed on the resource.
func (*Resources) HasCache ¶
HasCache confirms whether a cache with a given name has been registered as a resource. This method is useful during component initialisation as it is defensive against ordering.
func (*Resources) HasInput ¶
HasInput confirms whether an input with a given name has been registered as a resource. This method is useful during component initialisation as it is defensive against ordering.
func (*Resources) HasOutput ¶
HasOutput confirms whether an output with a given name has been registered as a resource. This method is useful during component initialisation as it is defensive against ordering.
func (*Resources) HasRateLimit ¶
HasRateLimit confirms whether a rate limit with a given name has been registered as a resource. This method is useful during component initialisation as it is defensive against ordering.
func (*Resources) Label ¶
Label returns a label that identifies the component instantiation. This could be an explicit label set in config, or is otherwise a generated label based on the position of the component within a config.
func (*Resources) Logger ¶
Logger returns a logger preset with context about the component the resources were provided to.
func (*Resources) OtelTracer ¶
func (r *Resources) OtelTracer() trace.TracerProvider
OtelTracer returns an open telemetry tracer provider that can be used to create new tracers.
Experimental: This type signature is experimental and therefore subject to change outside of major version releases.
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream executes a full Benthos stream and provides methods for performing status checks, terminating the stream, and blocking until the stream ends.
func (*Stream) Run ¶
Run attempts to start the stream pipeline and blocks until either the stream has gracefully come to a stop, or the provided context is cancelled.
func (*Stream) StopWithin ¶
StopWithin attempts to close the stream within the specified timeout period. Initially the attempt is graceful, but as the timeout draws close the attempt becomes progressively less graceful.
An ungraceful shutdown increases the likelihood of processing duplicate messages on the next start up, but never results in dropped messages as long as the input source supports at-least-once delivery.
type StreamBuilder ¶
type StreamBuilder struct {
// contains filtered or unexported fields
}
StreamBuilder provides methods for building a Benthos stream configuration. When parsing Benthos configs this builder follows the schema and field defaults of a standard Benthos configuration. Environment variable interpolations are also parsed and resolved the same as regular configs.
Benthos streams register HTTP endpoints by default that expose metrics and ready checks. If your intention is to execute multiple streams in the same process then it is recommended that you disable the HTTP server in config, or use `SetHTTPMux` with prefixed multiplexers in order to share it across the streams.
func NewStreamBuilder ¶
func NewStreamBuilder() *StreamBuilder
NewStreamBuilder creates a new StreamBuilder.
func (*StreamBuilder) AddBatchConsumerFunc ¶
func (s *StreamBuilder) AddBatchConsumerFunc(fn MessageBatchHandlerFunc) error
AddBatchConsumerFunc adds an output to the builder that executes a closure function argument for each message batch. If more than one output configuration is added they will automatically be composed within a fan out broker when the pipeline is built.
The provided MessageBatchHandlerFunc may be called from any number of goroutines, and therefore it is recommended to implement some form of throttling or mutex locking in cases where the call is non-blocking.
Only one consumer can be added to a stream builder, and subsequent calls will return an error.
Message batches must be created by upstream components (inputs, buffers, etc), otherwise each message batch received by this consumer will contain only a single message.
func (*StreamBuilder) AddBatchProducerFunc ¶
func (s *StreamBuilder) AddBatchProducerFunc() (MessageBatchHandlerFunc, error)
AddBatchProducerFunc adds an input to the builder that allows you to write message batches directly into the stream with a closure function. If any other input has been or will be added to the stream builder they will be automatically composed within a broker when the pipeline is built.
The returned MessageBatchHandlerFunc can be called concurrently from any number of goroutines, and each call will block until all messages within the batch are successfully delivered downstream, were rejected (or otherwise could not be delivered) or the context is cancelled.
Only one producer func can be added to a stream builder, and subsequent calls will return an error.
func (*StreamBuilder) AddCacheYAML ¶
func (s *StreamBuilder) AddCacheYAML(conf string) error
AddCacheYAML parses a cache YAML configuration and adds it to the builder as a resource.
func (*StreamBuilder) AddConsumerFunc ¶
func (s *StreamBuilder) AddConsumerFunc(fn MessageHandlerFunc) error
AddConsumerFunc adds an output to the builder that executes a closure function argument for each message. If more than one output configuration is added they will automatically be composed within a fan out broker when the pipeline is built.
The provided MessageHandlerFunc may be called from any number of goroutines, and therefore it is recommended to implement some form of throttling or mutex locking in cases where the call is non-blocking.
Only one consumer can be added to a stream builder, and subsequent calls will return an error.
func (*StreamBuilder) AddInputYAML ¶
func (s *StreamBuilder) AddInputYAML(conf string) error
AddInputYAML parses an input YAML configuration and adds it to the builder. If more than one input configuration is added they will automatically be composed within a broker when the pipeline is built.
func (*StreamBuilder) AddOutputYAML ¶
func (s *StreamBuilder) AddOutputYAML(conf string) error
AddOutputYAML parses an output YAML configuration and adds it to the builder. If more than one output configuration is added they will automatically be composed within a fan out broker when the pipeline is built.
func (*StreamBuilder) AddProcessorYAML ¶
func (s *StreamBuilder) AddProcessorYAML(conf string) error
AddProcessorYAML parses a processor YAML configuration and adds it to the builder to be executed within the pipeline.processors section, after all prior added processor configs.
func (*StreamBuilder) AddProducerFunc ¶
func (s *StreamBuilder) AddProducerFunc() (MessageHandlerFunc, error)
AddProducerFunc adds an input to the builder that allows you to write messages directly into the stream with a closure function. If any other input has been or will be added to the stream builder they will be automatically composed within a broker when the pipeline is built.
The returned MessageHandlerFunc can be called concurrently from any number of goroutines, and each call will block until the message is successfully delivered downstream, was rejected (or otherwise could not be delivered) or the context is cancelled.
Only one producer func can be added to a stream builder, and subsequent calls will return an error.
func (*StreamBuilder) AddRateLimitYAML ¶
func (s *StreamBuilder) AddRateLimitYAML(conf string) error
AddRateLimitYAML parses a rate limit YAML configuration and adds it to the builder as a resource.
func (*StreamBuilder) AddResourcesYAML ¶
func (s *StreamBuilder) AddResourcesYAML(conf string) error
AddResourcesYAML parses resource configurations and adds them to the config.
func (*StreamBuilder) AsYAML ¶
func (s *StreamBuilder) AsYAML() (string, error)
AsYAML prints a YAML representation of the stream config as it has been currently built.
func (*StreamBuilder) Build ¶
func (s *StreamBuilder) Build() (*Stream, error)
Build a Benthos stream pipeline according to the components specified by this stream builder.
func (*StreamBuilder) BuildTraced ¶
func (s *StreamBuilder) BuildTraced() (*Stream, *TracingSummary, error)
BuildTraced creates a Benthos stream pipeline according to the components specified by this stream builder, where each major component (input, processor, output) is wrapped with a tracing module that, during the lifetime of the stream, aggregates tracing events into the returned *TracingSummary. Once the stream has ended the TracingSummary can be queried for events that occurred.
Experimental: The behaviour of this method could change outside of major version releases.
func (*StreamBuilder) DisableLinting ¶
func (s *StreamBuilder) DisableLinting()
DisableLinting configures the stream builder to no longer lint YAML configs, allowing you to add snippets of config to the builder without failing on linting rules.
func (*StreamBuilder) SetBufferYAML ¶
func (s *StreamBuilder) SetBufferYAML(conf string) error
SetBufferYAML parses a buffer YAML configuration and sets it to the builder to be placed between the input and the pipeline (processors) sections. This config will replace any prior configured buffer.
func (*StreamBuilder) SetFields ¶
func (s *StreamBuilder) SetFields(pathValues ...interface{}) error
SetFields modifies the config by setting one or more fields identified by a dot path to a value. The argument must be a variadic list of pairs, where the first element is a string containing the target field dot path, and the second element is a typed value to set the field to.
func (*StreamBuilder) SetHTTPMux ¶
func (s *StreamBuilder) SetHTTPMux(m HTTPMultiplexer)
SetHTTPMux sets an HTTP multiplexer to be used by stream components when registering endpoints instead of a new server spawned following the `http` fields of a Benthos config.
func (*StreamBuilder) SetLoggerYAML ¶
func (s *StreamBuilder) SetLoggerYAML(conf string) error
SetLoggerYAML parses a logger YAML configuration and adds it to the builder such that all stream components emit logs through it.
func (*StreamBuilder) SetMetricsYAML ¶
func (s *StreamBuilder) SetMetricsYAML(conf string) error
SetMetricsYAML parses a metrics YAML configuration and adds it to the builder such that all stream components emit metrics through it.
func (*StreamBuilder) SetPrintLogger ¶
func (s *StreamBuilder) SetPrintLogger(l PrintLogger)
SetPrintLogger sets a custom logger supporting a simple Print based interface to be used by stream components. This custom logger will override any logging fields set via config.
func (*StreamBuilder) SetThreads ¶
func (s *StreamBuilder) SetThreads(n int)
SetThreads configures the number of pipeline processor threads that should be used. By default the number will be zero, which means the thread count will match the number of logical CPUs on the machine.
func (*StreamBuilder) SetTracerYAML ¶
func (s *StreamBuilder) SetTracerYAML(conf string) error
SetTracerYAML parses a tracer YAML configuration and adds it to the builder such that all stream components emit tracing spans through it.
func (*StreamBuilder) SetYAML ¶
func (s *StreamBuilder) SetYAML(conf string) error
SetYAML parses a full Benthos config and uses it to configure the builder. If any inputs, processors, outputs, resources, etc, have previously been added to the builder they will be overridden by this new config.
type TracingEvent ¶
type TracingEvent struct {
	Type    TracingEventType
	Content string
}
TracingEvent represents a single event that occurred within the stream.
Experimental: This type may change outside of major version releases.
type TracingEventType ¶
type TracingEventType string
TracingEventType describes the type of tracing event a component might experience during a config run.
Experimental: This type may change outside of major version releases.
var (
	// Note: must match up with ./internal/bundle/tracing/events.go
	TracingEventProduce TracingEventType = "PRODUCE"
	TracingEventConsume TracingEventType = "CONSUME"
	TracingEventDelete  TracingEventType = "DELETE"
	TracingEventError   TracingEventType = "ERROR"
	TracingEventUnknown TracingEventType = "UNKNOWN"
)
Various tracing event types.
Experimental: This type may change outside of major version releases.
type TracingSummary ¶
type TracingSummary struct {
// contains filtered or unexported fields
}
TracingSummary is a high level description of all traced events. When tracing a stream this should only be queried once the stream has ended.
Experimental: This type may change outside of major version releases.
func (*TracingSummary) InputEvents ¶
func (s *TracingSummary) InputEvents() map[string][]TracingEvent
InputEvents returns a map of input labels to events traced during the execution of a stream pipeline.
Experimental: This method may change outside of major version releases.
func (*TracingSummary) OutputEvents ¶
func (s *TracingSummary) OutputEvents() map[string][]TracingEvent
OutputEvents returns a map of output labels to events traced during the execution of a stream pipeline.
Experimental: This method may change outside of major version releases.
func (*TracingSummary) ProcessorEvents ¶
func (s *TracingSummary) ProcessorEvents() map[string][]TracingEvent
ProcessorEvents returns a map of processor labels to events traced during the execution of a stream pipeline.
Experimental: This method may change outside of major version releases.
func (*TracingSummary) TotalInput ¶
func (s *TracingSummary) TotalInput() uint64
TotalInput returns the total traced input messages received.
Experimental: This method may change outside of major version releases.
func (*TracingSummary) TotalOutput ¶
func (s *TracingSummary) TotalOutput() uint64
TotalOutput returns the total traced output messages received.
Experimental: This method may change outside of major version releases.
func (*TracingSummary) TotalProcessorErrors ¶
func (s *TracingSummary) TotalProcessorErrors() uint64
TotalProcessorErrors returns the total number of traced processor errors that occurred.
Experimental: This method may change outside of major version releases.
Source Files ¶
- buffer.go
- cache.go
- config.go
- config_backoff.go
- config_batch_policy.go
- config_bloblang.go
- config_input.go
- config_interpolated_string.go
- config_metadata_filter.go
- config_output.go
- config_processor.go
- config_tls.go
- config_util.go
- environment.go
- input.go
- input_auto_retry.go
- input_auto_retry_batched.go
- interpolated_string.go
- logger.go
- message.go
- metrics.go
- output.go
- package.go
- plugins.go
- processor.go
- rate_limit.go
- resources.go
- service.go
- stream.go
- stream_builder.go
- tracing.go