Documentation ¶
Overview ¶
Package service provides a high level API for registering custom plugin components and executing either a standard Benthos CLI, or programmatically building isolated pipelines with a StreamBuilder API.
For a video guide on Benthos plugins check out: https://youtu.be/uH6mKw-Ly0g And an example repo containing component plugins and tests can be found at: https://github.com/benthosdev/benthos-plugin-example
In order to add custom Bloblang functions and methods use the ./public/bloblang package.
Example (BufferPlugin) ¶
This example demonstrates how to create a buffer plugin. Buffers are an advanced component type that most plugin authors aren't likely to require.
package main import ( "context" "sync" "github.com/Jeffail/benthos/v3/public/service" // Import all standard Benthos components _ "github.com/Jeffail/benthos/v3/public/components/all" ) type memoryBuffer struct { messages chan service.MessageBatch endOfInputChan chan struct{} closeOnce sync.Once } func newMemoryBuffer(n int) *memoryBuffer { return &memoryBuffer{ messages: make(chan service.MessageBatch, n), endOfInputChan: make(chan struct{}), } } func (m *memoryBuffer) WriteBatch(ctx context.Context, batch service.MessageBatch, aFn service.AckFunc) error { select { case m.messages <- batch: case <-ctx.Done(): return ctx.Err() } // We weaken delivery guarantees here by acknowledging receipt of our batch // immediately. return aFn(ctx, nil) } func yoloIgnoreNacks(context.Context, error) error { // YOLO: Drop messages that are nacked return nil } func (m *memoryBuffer) ReadBatch(ctx context.Context) (service.MessageBatch, service.AckFunc, error) { select { case msg := <-m.messages: return msg, yoloIgnoreNacks, nil case <-ctx.Done(): return nil, nil, ctx.Err() case <-m.endOfInputChan: // Input has ended, so return ErrEndOfBuffer if our buffer is empty. select { case msg := <-m.messages: return msg, yoloIgnoreNacks, nil default: return nil, nil, service.ErrEndOfBuffer } } } func (m *memoryBuffer) EndOfInput() { m.closeOnce.Do(func() { close(m.endOfInputChan) }) } func (m *memoryBuffer) Close(ctx context.Context) error { // Nothing to clean up return nil } // This example demonstrates how to create a buffer plugin. Buffers are an // advanced component type that most plugin authors aren't likely to require. func main() { configSpec := service.NewConfigSpec(). Summary("Creates a lame memory buffer that loses data on forced restarts or service crashes."). Field(service.NewIntField("max_batches").Default(100)) err := service.RegisterBatchBuffer("lame_memory", configSpec, func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchBuffer, error) { capacity, err := conf.FieldInt("max_batches") if err != nil { return nil, err } return newMemoryBuffer(capacity), nil }) if err != nil { panic(err) } // And then execute Benthos with: // service.RunCLI(context.Background()) }
Output:
Example (CachePlugin) ¶
This example demonstrates how to create a cache plugin, where the implementation of the cache (type `LossyCache`) also contains fields that should be parsed within the Benthos config.
package main import ( "context" "time" "github.com/Jeffail/benthos/v3/public/service" // Import all standard Benthos components _ "github.com/Jeffail/benthos/v3/public/components/all" ) // LossyCache is a terrible cache example and silently drops items when the // capacity is reached. It also doesn't respect TTLs. type LossyCache struct { capacity int mDropped *service.MetricCounter items map[string][]byte } func (l *LossyCache) Get(ctx context.Context, key string) ([]byte, error) { if b, ok := l.items[key]; ok { return b, nil } return nil, service.ErrKeyNotFound } func (l *LossyCache) Set(ctx context.Context, key string, value []byte, ttl *time.Duration) error { if len(l.items) >= l.capacity { // Dropped, whoopsie! l.mDropped.Incr(1) return nil } l.items[key] = value return nil } func (l *LossyCache) Add(ctx context.Context, key string, value []byte, ttl *time.Duration) error { if _, exists := l.items[key]; exists { return service.ErrKeyAlreadyExists } if len(l.items) >= l.capacity { // Dropped, whoopsie! l.mDropped.Incr(1) return nil } l.items[key] = value return nil } func (l *LossyCache) Delete(ctx context.Context, key string) error { delete(l.items, key) return nil } func (l *LossyCache) Close(ctx context.Context) error { return nil } // This example demonstrates how to create a cache plugin, where the // implementation of the cache (type `LossyCache`) also contains fields that // should be parsed within the Benthos config. func main() { configSpec := service.NewConfigSpec(). Summary("Creates a terrible cache with a fixed capacity."). Field(service.NewIntField("capacity").Default(100)) err := service.RegisterCache("lossy", configSpec, func(conf *service.ParsedConfig, mgr *service.Resources) (service.Cache, error) { capacity, err := conf.FieldInt("capacity") if err != nil { return nil, err } return &LossyCache{ capacity: capacity, mDropped: mgr.Metrics().NewCounter("dropped_just_cus"), items: make(map[string][]byte, capacity), }, nil }) if err != nil { panic(err) } // And then execute Benthos with: // service.RunCLI(context.Background()) }
Output:
Example (InputPlugin) ¶
This example demonstrates how to create an input plugin, which is configured by providing a struct containing the fields to be parsed from within the Benthos configuration.
package main import ( "context" "math/rand" "github.com/Jeffail/benthos/v3/public/service" // Import all standard Benthos components _ "github.com/Jeffail/benthos/v3/public/components/all" ) type GibberishInput struct { length int } func (g *GibberishInput) Connect(ctx context.Context) error { return nil } func (g *GibberishInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) { b := make([]byte, g.length) for k := range b { b[k] = byte((rand.Int() % 94) + 32) } return service.NewMessage(b), func(ctx context.Context, err error) error { // A nack (when err is non-nil) is handled automatically when we // construct using service.AutoRetryNacks, so we don't need to handle // nacks here. return nil }, nil } func (g *GibberishInput) Close(ctx context.Context) error { return nil } // This example demonstrates how to create an input plugin, which is configured // by providing a struct containing the fields to be parsed from within the // Benthos configuration. func main() { configSpec := service.NewConfigSpec(). Summary("Creates a load of gibberish, putting us all out of work."). Field(service.NewIntField("length").Default(100)) constructor := func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) { length, err := conf.FieldInt("length") if err != nil { return nil, err } return service.AutoRetryNacks(&GibberishInput{length}), nil } err := service.RegisterInput("gibberish", configSpec, constructor) if err != nil { panic(err) } // And then execute Benthos with: // service.RunCLI(context.Background()) }
Output:
Example (OutputBatchedPlugin) ¶
This example demonstrates how to create a batched output plugin, which allows us to specify a batching mechanism and implement an interface that writes a batch of messages in one call.
package main import ( "context" "encoding/json" "fmt" "github.com/Jeffail/benthos/v3/public/service" // Import all standard Benthos components _ "github.com/Jeffail/benthos/v3/public/components/all" ) type batchOfJSONWriter struct{} func (b *batchOfJSONWriter) Connect(ctx context.Context) error { return nil } func (b *batchOfJSONWriter) WriteBatch(ctx context.Context, msgs service.MessageBatch) error { var messageObjs []interface{} for _, msg := range msgs { msgObj, err := msg.AsStructured() if err != nil { return err } messageObjs = append(messageObjs, msgObj) } outBytes, err := json.Marshal(map[string]interface{}{ "count": len(msgs), "objects": messageObjs, }) if err != nil { return err } fmt.Println(string(outBytes)) return nil } func (b *batchOfJSONWriter) Close(ctx context.Context) error { return nil } // This example demonstrates how to create a batched output plugin, which allows // us to specify a batching mechanism and implement an interface that writes a // batch of messages in one call. func main() { spec := service.NewConfigSpec(). Field(service.NewBatchPolicyField("batching")) // Register our new output, which doesn't require a config schema. err := service.RegisterBatchOutput( "batched_json_stdout", spec, func(conf *service.ParsedConfig, mgr *service.Resources) (out service.BatchOutput, policy service.BatchPolicy, maxInFlight int, err error) { if policy, err = conf.FieldBatchPolicy("batching"); err != nil { return } maxInFlight = 1 out = &batchOfJSONWriter{} return }) if err != nil { panic(err) } // Use the stream builder API to create a Benthos stream that uses our new // output type. builder := service.NewStreamBuilder() // Set the full Benthos configuration of the stream. err = builder.SetYAML(` input: generate: count: 5 interval: 1ms mapping: | root.id = count("batched output example messages") root.text = "some stuff" output: batched_json_stdout: batching: count: 5 `) if err != nil { panic(err) } // Build a stream with our configured components. stream, err := builder.Build() if err != nil { panic(err) } // And run it, blocking until it gracefully terminates once the generate // input has generated a message and it has flushed through the stream. if err = stream.Run(context.Background()); err != nil { panic(err) } }
Output: {"count":5,"objects":[{"id":1,"text":"some stuff"},{"id":2,"text":"some stuff"},{"id":3,"text":"some stuff"},{"id":4,"text":"some stuff"},{"id":5,"text":"some stuff"}]}
Example (OutputPlugin) ¶
This example demonstrates how to create an output plugin. This example is for an implementation that does not require any configuration parameters, and therefore doesn't defined any within the configuration specification.
package main import ( "context" "fmt" "github.com/Jeffail/benthos/v3/public/service" // Import all standard Benthos components _ "github.com/Jeffail/benthos/v3/public/components/all" ) type BlueOutput struct{} func (b *BlueOutput) Connect(ctx context.Context) error { return nil } func (b *BlueOutput) Write(ctx context.Context, msg *service.Message) error { content, err := msg.AsBytes() if err != nil { return err } fmt.Printf("\033[01;34m%s\033[m\n", content) return nil } func (b *BlueOutput) Close(ctx context.Context) error { return nil } // This example demonstrates how to create an output plugin. This example is for // an implementation that does not require any configuration parameters, and // therefore doesn't defined any within the configuration specification. func main() { // Register our new output, which doesn't require a config schema. err := service.RegisterOutput( "blue_stdout", service.NewConfigSpec(), func(conf *service.ParsedConfig, mgr *service.Resources) (out service.Output, maxInFlight int, err error) { return &BlueOutput{}, 1, nil }) if err != nil { panic(err) } // Use the stream builder API to create a Benthos stream that uses our new // output type. builder := service.NewStreamBuilder() // Set the full Benthos configuration of the stream. err = builder.SetYAML(` input: generate: count: 1 interval: 1ms mapping: 'root = "hello world"' output: blue_stdout: {} `) if err != nil { panic(err) } // Build a stream with our configured components. stream, err := builder.Build() if err != nil { panic(err) } // And run it, blocking until it gracefully terminates once the generate // input has generated a message and it has flushed through the stream. if err = stream.Run(context.Background()); err != nil { panic(err) } }
Output: �[01;34mhello world�[m
Example (ProcessorPlugin) ¶
This example demonstrates how to create a processor plugin. This example is for an implementation that does not require any configuration parameters, and therefore doesn't defined any within the configuration specification.
package main import ( "bytes" "context" "github.com/Jeffail/benthos/v3/public/service" // Import all standard Benthos components _ "github.com/Jeffail/benthos/v3/public/components/all" ) type ReverseProcessor struct { logger *service.Logger } func (r *ReverseProcessor) Process(ctx context.Context, m *service.Message) (service.MessageBatch, error) { bytesContent, err := m.AsBytes() if err != nil { return nil, err } newBytes := make([]byte, len(bytesContent)) for i, b := range bytesContent { newBytes[len(newBytes)-i-1] = b } if bytes.Equal(newBytes, bytesContent) { r.logger.Infof("Woah! This is like totally a palindrome: %s", bytesContent) } m.SetBytes(newBytes) return []*service.Message{m}, nil } func (r *ReverseProcessor) Close(ctx context.Context) error { return nil } // This example demonstrates how to create a processor plugin. This example is // for an implementation that does not require any configuration parameters, and // therefore doesn't defined any within the configuration specification. func main() { // Register our new processor, which doesn't require a config schema. err := service.RegisterProcessor( "reverse", service.NewConfigSpec(), func(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) { return &ReverseProcessor{logger: mgr.Logger()}, nil }) if err != nil { panic(err) } // Build a Benthos stream that uses our new output type. builder := service.NewStreamBuilder() // Set the full Benthos configuration of the stream. err = builder.SetYAML(` input: generate: count: 1 interval: 1ms mapping: 'root = "hello world"' pipeline: processors: - reverse: {} output: stdout: {} `) if err != nil { panic(err) } // Build a stream with our configured components. stream, err := builder.Build() if err != nil { panic(err) } // And run it, blocking until it gracefully terminates once the generate // input has generated a message and it has flushed through the stream. if err = stream.Run(context.Background()); err != nil { panic(err) } }
Output: dlrow olleh
Example (RateLimitPlugin) ¶
This example demonstrates how to create a rate limit plugin, which is configured by providing a struct containing the fields to be parsed from within the Benthos configuration.
package main import ( "context" "fmt" "math/rand" "time" "github.com/Jeffail/benthos/v3/public/service" // Import all standard Benthos components _ "github.com/Jeffail/benthos/v3/public/components/all" ) type RandomRateLimit struct { max time.Duration } func (r *RandomRateLimit) Access(context.Context) (time.Duration, error) { return time.Duration(rand.Int() % int(r.max)), nil } func (r *RandomRateLimit) Close(ctx context.Context) error { return nil } // This example demonstrates how to create a rate limit plugin, which is // configured by providing a struct containing the fields to be parsed from // within the Benthos configuration. func main() { configSpec := service.NewConfigSpec(). Summary("A rate limit that's pretty much just random."). Description("I guess this isn't really that useful, sorry."). Field(service.NewStringField("maximum_duration").Default("1s")) constructor := func(conf *service.ParsedConfig, mgr *service.Resources) (service.RateLimit, error) { maxDurStr, err := conf.FieldString("maximum_duration") if err != nil { return nil, err } maxDuration, err := time.ParseDuration(maxDurStr) if err != nil { return nil, fmt.Errorf("invalid max duration: %w", err) } return &RandomRateLimit{maxDuration}, nil } err := service.RegisterRateLimit("random", configSpec, constructor) if err != nil { panic(err) } // And then execute Benthos with: // service.RunCLI(context.Background()) }
Output:
Example (StreamBuilderConfig) ¶
This example demonstrates how to use a stream builder to parse and execute a full Benthos config.
package main import ( "context" "github.com/Jeffail/benthos/v3/public/service" // Import all standard Benthos components _ "github.com/Jeffail/benthos/v3/public/components/all" ) func main() { panicOnErr := func(err error) { if err != nil { panic(err) } } builder := service.NewStreamBuilder() // Set the full Benthos configuration of the stream. err := builder.SetYAML(` input: generate: count: 1 interval: 1ms mapping: 'root = "hello world"' pipeline: processors: - bloblang: 'root = content().uppercase()' output: stdout: {} `) panicOnErr(err) // Build a stream with our configured components. stream, err := builder.Build() panicOnErr(err) // And run it, blocking until it gracefully terminates once the generate // input has generated a message and it has flushed through the stream. err = stream.Run(context.Background()) panicOnErr(err) }
Output: HELLO WORLD
Example (StreamBuilderConfigAddMethods) ¶
This example demonstrates how to use a stream builder to assemble a stream of Benthos components by adding snippets of configs for different component types, and then execute it. You can use the Add methods to append any number of components to the stream, following fan in and fan out patterns for inputs and outputs respectively.
package main import ( "context" "github.com/Jeffail/benthos/v3/public/service" // Import all standard Benthos components _ "github.com/Jeffail/benthos/v3/public/components/all" ) func main() { panicOnErr := func(err error) { if err != nil { panic(err) } } builder := service.NewStreamBuilder() err := builder.AddInputYAML(` generate: count: 1 interval: 1ms mapping: 'root = "hello world"' `) panicOnErr(err) err = builder.AddProcessorYAML(`bloblang: 'root = content().uppercase()'`) panicOnErr(err) err = builder.AddOutputYAML(`stdout: {}`) panicOnErr(err) // Build a stream with our configured components. stream, err := builder.Build() panicOnErr(err) // And run it, blocking until it gracefully terminates once the generate // input has generated a message and it has flushed through the stream. err = stream.Run(context.Background()) panicOnErr(err) }
Output: HELLO WORLD
Example (StreamBuilderMultipleStreams) ¶
This example demonstrates using the stream builder API to create and run two independent streams.
package main import ( "context" "sync" "github.com/Jeffail/benthos/v3/public/service" // Import all standard Benthos components _ "github.com/Jeffail/benthos/v3/public/components/all" ) func main() { panicOnErr := func(err error) { if err != nil { panic(err) } } // Build the first stream pipeline. Note that we configure each pipeline // with its HTTP server disabled as otherwise we would see a port collision // when they both attempt to bind to the default address `0.0.0.0:4195`. // // Alternatively, we could choose to configure each with their own address // with the field `http.address`, or we could call `SetHTTPMux` on the // builder in order to explicitly override the configured server. builderOne := service.NewStreamBuilder() err := builderOne.SetYAML(` http: enabled: false input: generate: count: 1 interval: 1ms mapping: 'root = "hello world one"' pipeline: processors: - bloblang: 'root = content().uppercase()' output: stdout: {} `) panicOnErr(err) streamOne, err := builderOne.Build() panicOnErr(err) builderTwo := service.NewStreamBuilder() err = builderTwo.SetYAML(` http: enabled: false input: generate: count: 1 interval: 1ms mapping: 'root = "hello world two"' pipeline: processors: - sleep: duration: 500ms - bloblang: 'root = content().capitalize()' output: stdout: {} `) panicOnErr(err) streamTwo, err := builderTwo.Build() panicOnErr(err) var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() panicOnErr(streamOne.Run(context.Background())) }() go func() { defer wg.Done() panicOnErr(streamTwo.Run(context.Background())) }() wg.Wait() }
Output: HELLO WORLD ONE Hello World Two
Example (StreamBuilderPush) ¶
This example demonstrates how to use a stream builder to assemble a processing pipeline that you can push messages into and extract via closures.
package main import ( "bytes" "context" "fmt" "time" "github.com/Jeffail/benthos/v3/public/service" // Import all standard Benthos components _ "github.com/Jeffail/benthos/v3/public/components/all" ) func main() { panicOnErr := func(err error) { if err != nil { panic(err) } } builder := service.NewStreamBuilder() err := builder.SetLoggerYAML(`level: NONE`) panicOnErr(err) err = builder.AddProcessorYAML(`bloblang: 'root = content().uppercase()'`) panicOnErr(err) err = builder.AddProcessorYAML(`bloblang: 'root = "check this out: " + content()'`) panicOnErr(err) // Obtain a closure func that allows us to push data into the stream, this // is treated like any other input, which also means it's possible to use // this along with regular configured inputs. sendFn, err := builder.AddProducerFunc() panicOnErr(err) // Define a closure func that receives messages as an output of the stream. // It's also possible to use this along with regular configured outputs. var outputBuf bytes.Buffer err = builder.AddConsumerFunc(func(c context.Context, m *service.Message) error { msgBytes, err := m.AsBytes() if err != nil { return err } _, err = fmt.Fprintf(&outputBuf, "received: %s\n", msgBytes) return err }) panicOnErr(err) stream, err := builder.Build() panicOnErr(err) go func() { perr := sendFn(context.Background(), service.NewMessage([]byte("hello world"))) panicOnErr(perr) perr = sendFn(context.Background(), service.NewMessage([]byte("I'm pushing data into the stream"))) panicOnErr(perr) perr = stream.StopWithin(time.Second) panicOnErr(perr) }() // And run it, blocking until it gracefully terminates once the generate // input has generated a message and it has flushed through the stream. err = stream.Run(context.Background()) panicOnErr(err) fmt.Println(outputBuf.String()) }
Output: received: check this out: HELLO WORLD received: check this out: I'M PUSHING DATA INTO THE STREAM
Index ¶
- Variables
- func RegisterBatchBuffer(name string, spec *ConfigSpec, ctor BatchBufferConstructor) error
- func RegisterBatchInput(name string, spec *ConfigSpec, ctor BatchInputConstructor) error
- func RegisterBatchOutput(name string, spec *ConfigSpec, ctor BatchOutputConstructor) error
- func RegisterBatchProcessor(name string, spec *ConfigSpec, ctor BatchProcessorConstructor) error
- func RegisterCache(name string, spec *ConfigSpec, ctor CacheConstructor) error
- func RegisterInput(name string, spec *ConfigSpec, ctor InputConstructor) error
- func RegisterOutput(name string, spec *ConfigSpec, ctor OutputConstructor) error
- func RegisterProcessor(name string, spec *ConfigSpec, ctor ProcessorConstructor) error
- func RegisterRateLimit(name string, spec *ConfigSpec, ctor RateLimitConstructor) error
- func RunCLI(ctx context.Context)
- type AckFunc
- type BatchBuffer
- type BatchBufferConstructor
- type BatchInput
- type BatchInputConstructor
- type BatchOutput
- type BatchOutputConstructor
- type BatchPolicy
- type BatchProcessor
- type BatchProcessorConstructor
- type Cache
- type CacheConstructor
- type Closer
- type ConfigField
- func NewBatchPolicyField(name string) *ConfigField
- func NewBloblangField(name string) *ConfigField
- func NewBoolField(name string) *ConfigField
- func NewFloatField(name string) *ConfigField
- func NewIntField(name string) *ConfigField
- func NewIntListField(name string) *ConfigField
- func NewIntMapField(name string) *ConfigField
- func NewInterpolatedStringField(name string) *ConfigField
- func NewObjectField(name string, fields ...*ConfigField) *ConfigField
- func NewStringEnumField(name string, options ...string) *ConfigField
- func NewStringField(name string) *ConfigField
- func NewStringListField(name string) *ConfigField
- func NewStringMapField(name string) *ConfigField
- func NewTLSField(name string) *ConfigField
- type ConfigSpec
- func (c *ConfigSpec) Beta() *ConfigSpec
- func (c *ConfigSpec) Categories(categories ...string) *ConfigSpec
- func (c *ConfigSpec) Description(description string) *ConfigSpec
- func (c *ConfigSpec) EncodeJSON(v []byte) error
- func (c *ConfigSpec) Example(title, summary, config string) *ConfigSpec
- func (c *ConfigSpec) Field(f *ConfigField) *ConfigSpec
- func (c *ConfigSpec) ParseYAML(yamlStr string, env *Environment) (*ParsedConfig, error)
- func (c *ConfigSpec) Stable() *ConfigSpec
- func (c *ConfigSpec) Summary(summary string) *ConfigSpec
- func (c *ConfigSpec) Version(v string) *ConfigSpec
- type ConfigStructConstructordeprecated
- type ConfigView
- type Environment
- func (e *Environment) Clone() *Environment
- func (e *Environment) NewStreamBuilder() *StreamBuilder
- func (e *Environment) RegisterBatchBuffer(name string, spec *ConfigSpec, ctor BatchBufferConstructor) error
- func (e *Environment) RegisterBatchInput(name string, spec *ConfigSpec, ctor BatchInputConstructor) error
- func (e *Environment) RegisterBatchOutput(name string, spec *ConfigSpec, ctor BatchOutputConstructor) error
- func (e *Environment) RegisterBatchProcessor(name string, spec *ConfigSpec, ctor BatchProcessorConstructor) error
- func (e *Environment) RegisterCache(name string, spec *ConfigSpec, ctor CacheConstructor) error
- func (e *Environment) RegisterInput(name string, spec *ConfigSpec, ctor InputConstructor) error
- func (e *Environment) RegisterOutput(name string, spec *ConfigSpec, ctor OutputConstructor) error
- func (e *Environment) RegisterProcessor(name string, spec *ConfigSpec, ctor ProcessorConstructor) error
- func (e *Environment) RegisterRateLimit(name string, spec *ConfigSpec, ctor RateLimitConstructor) error
- func (e *Environment) UseBloblangEnvironment(bEnv *bloblang.Environment)
- func (e *Environment) WalkBuffers(fn func(name string, config *ConfigView))
- func (e *Environment) WalkCaches(fn func(name string, config *ConfigView))
- func (e *Environment) WalkInputs(fn func(name string, config *ConfigView))
- func (e *Environment) WalkOutputs(fn func(name string, config *ConfigView))
- func (e *Environment) WalkProcessors(fn func(name string, config *ConfigView))
- func (e *Environment) WalkRateLimits(fn func(name string, config *ConfigView))
- type HTTPMultiplexer
- type Input
- type InputConstructor
- type InterpolatedString
- type Lint
- type LintError
- type Logger
- func (l *Logger) Debug(message string)
- func (l *Logger) Debugf(template string, args ...interface{})
- func (l *Logger) Error(message string)
- func (l *Logger) Errorf(template string, args ...interface{})
- func (l *Logger) Info(message string)
- func (l *Logger) Infof(template string, args ...interface{})
- func (l *Logger) Warn(message string)
- func (l *Logger) Warnf(template string, args ...interface{})
- func (l *Logger) With(keyValuePairs ...interface{}) *Logger
- type Message
- func (m *Message) AsBytes() ([]byte, error)
- func (m *Message) AsStructured() (interface{}, error)
- func (m *Message) AsStructuredMut() (interface{}, error)
- func (m *Message) BloblangQuery(blobl *bloblang.Executor) (*Message, error)
- func (m *Message) Context() context.Context
- func (m *Message) Copy() *Message
- func (m *Message) GetError() error
- func (m *Message) MetaDelete(key string)
- func (m *Message) MetaGet(key string) (string, bool)
- func (m *Message) MetaSet(key, value string)
- func (m *Message) MetaWalk(fn func(string, string) error) error
- func (m *Message) SetBytes(b []byte)
- func (m *Message) SetError(err error)
- func (m *Message) SetStructured(i interface{})
- func (m *Message) WithContext(ctx context.Context) *Message
- type MessageBatch
- type MessageBatchHandlerFunc
- type MessageHandlerFunc
- type MetricCounter
- type MetricGauge
- type MetricTimer
- type Metrics
- type Output
- type OutputConstructor
- type ParsedConfig
- func (p *ParsedConfig) AsStruct() interface{}deprecated
- func (p *ParsedConfig) FieldBatchPolicy(path ...string) (conf BatchPolicy, err error)
- func (p *ParsedConfig) FieldBloblang(path ...string) (*bloblang.Executor, error)
- func (p *ParsedConfig) FieldBool(path ...string) (bool, error)
- func (p *ParsedConfig) FieldFloat(path ...string) (float64, error)
- func (p *ParsedConfig) FieldInt(path ...string) (int, error)
- func (p *ParsedConfig) FieldIntList(path ...string) ([]int, error)
- func (p *ParsedConfig) FieldIntMap(path ...string) (map[string]int, error)
- func (p *ParsedConfig) FieldInterpolatedString(path ...string) (*InterpolatedString, error)
- func (p *ParsedConfig) FieldString(path ...string) (string, error)
- func (p *ParsedConfig) FieldStringList(path ...string) ([]string, error)
- func (p *ParsedConfig) FieldStringMap(path ...string) (map[string]string, error)
- func (p *ParsedConfig) FieldTLS(path ...string) (*tls.Config, error)
- func (p *ParsedConfig) Namespace(path ...string) *ParsedConfig
- type PrintLogger
- type Processor
- type ProcessorConstructor
- type RateLimit
- type RateLimitConstructor
- type Resources
- type Stream
- type StreamBuilder
- func (s *StreamBuilder) AddBatchConsumerFunc(fn MessageBatchHandlerFunc) error
- func (s *StreamBuilder) AddBatchProducerFunc() (MessageBatchHandlerFunc, error)
- func (s *StreamBuilder) AddCacheYAML(conf string) error
- func (s *StreamBuilder) AddConsumerFunc(fn MessageHandlerFunc) error
- func (s *StreamBuilder) AddInputYAML(conf string) error
- func (s *StreamBuilder) AddOutputYAML(conf string) error
- func (s *StreamBuilder) AddProcessorYAML(conf string) error
- func (s *StreamBuilder) AddProducerFunc() (MessageHandlerFunc, error)
- func (s *StreamBuilder) AddRateLimitYAML(conf string) error
- func (s *StreamBuilder) AddResourcesYAML(conf string) error
- func (s *StreamBuilder) AsYAML() (string, error)
- func (s *StreamBuilder) Build() (*Stream, error)
- func (s *StreamBuilder) SetBufferYAML(conf string) error
- func (s *StreamBuilder) SetFields(pathValues ...interface{}) error
- func (s *StreamBuilder) SetHTTPMux(m HTTPMultiplexer)
- func (s *StreamBuilder) SetLoggerYAML(conf string) error
- func (s *StreamBuilder) SetMetricsYAML(conf string) error
- func (s *StreamBuilder) SetPrintLogger(l PrintLogger)
- func (s *StreamBuilder) SetThreads(n int)
- func (s *StreamBuilder) SetYAML(conf string) error
Examples ¶
- Package (BufferPlugin)
- Package (CachePlugin)
- Package (InputPlugin)
- Package (OutputBatchedPlugin)
- Package (OutputPlugin)
- Package (ProcessorPlugin)
- Package (RateLimitPlugin)
- Package (StreamBuilderConfig)
- Package (StreamBuilderConfigAddMethods)
- Package (StreamBuilderMultipleStreams)
- Package (StreamBuilderPush)
Constants ¶
This section is empty.
Variables ¶
var ( ErrKeyAlreadyExists = errors.New("key already exists") ErrKeyNotFound = errors.New("key does not exist") )
Errors returned by cache types.
var ( // ErrNotConnected is returned by inputs and outputs when their Read or // Write methods are called and the connection that they maintain is lost. // This error prompts the upstream component to call Connect until the // connection is re-established. ErrNotConnected = errors.New("not connected") // ErrEndOfInput is returned by inputs that have exhausted their source of // data to the point where subsequent Read calls will be ineffective. This // error prompts the upstream component to gracefully terminate the // pipeline. ErrEndOfInput = errors.New("end of input") // ErrEndOfBuffer is returned by a buffer Read/ReadBatch method when the // contents of the buffer has been emptied and the source of the data is // ended (as indicated by EndOfInput). This error prompts the upstream // component to gracefully terminate the pipeline. ErrEndOfBuffer = errors.New("end of buffer") )
Functions ¶
func RegisterBatchBuffer ¶ added in v3.53.0
func RegisterBatchBuffer(name string, spec *ConfigSpec, ctor BatchBufferConstructor) error
RegisterBatchBuffer attempts to register a new buffer plugin by providing a description of the configuration for the buffer and a constructor for the buffer processor. The constructor will be called for each instantiation of the component within a config.
Consumed message batches must be created by upstream components (inputs, etc) otherwise this buffer will simply receive batches containing single messages.
func RegisterBatchInput ¶ added in v3.52.0
func RegisterBatchInput(name string, spec *ConfigSpec, ctor BatchInputConstructor) error
RegisterBatchInput attempts to register a new batched input plugin by providing a description of the configuration for the plugin as well as a constructor for the input itself. The constructor will be called for each instantiation of the component within a config.
If your input implementation doesn't have a specific mechanism for dealing with a nack (when the AckFunc provides a non-nil error) then you can instead wrap your input implementation with AutoRetryNacksBatched to get automatic retries.
func RegisterBatchOutput ¶
func RegisterBatchOutput(name string, spec *ConfigSpec, ctor BatchOutputConstructor) error
RegisterBatchOutput attempts to register a new output plugin by providing a description of the configuration for the plugin as well as a constructor for the output itself. The constructor will be called for each instantiation of the component within a config.
The constructor of a batch output is able to return a batch policy to be applied before calls to write are made, creating batches from the stream of messages. However, batches can also be created by upstream components (inputs, buffers, etc).
If a batch has been formed upstream it is possible that its size may exceed the policy specified in your constructor.
func RegisterBatchProcessor ¶
func RegisterBatchProcessor(name string, spec *ConfigSpec, ctor BatchProcessorConstructor) error
RegisterBatchProcessor attempts to register a new processor plugin by providing a description of the configuration for the processor and a constructor for the processor itself. The constructor will be called for each instantiation of the component within a config.
func RegisterCache ¶
func RegisterCache(name string, spec *ConfigSpec, ctor CacheConstructor) error
RegisterCache attempts to register a new cache plugin by providing a description of the configuration for the plugin as well as a constructor for the cache itself. The constructor will be called for each instantiation of the component within a config.
func RegisterInput ¶
func RegisterInput(name string, spec *ConfigSpec, ctor InputConstructor) error
RegisterInput attempts to register a new input plugin by providing a description of the configuration for the plugin as well as a constructor for the input itself. The constructor will be called for each instantiation of the component within a config.
If your input implementation doesn't have a specific mechanism for dealing with a nack (when the AckFunc provides a non-nil error) then you can instead wrap your input implementation with AutoRetryNacks to get automatic retries.
func RegisterOutput ¶
func RegisterOutput(name string, spec *ConfigSpec, ctor OutputConstructor) error
RegisterOutput attempts to register a new output plugin by providing a description of the configuration for the plugin as well as a constructor for the output itself. The constructor will be called for each instantiation of the component within a config.
func RegisterProcessor ¶
func RegisterProcessor(name string, spec *ConfigSpec, ctor ProcessorConstructor) error
RegisterProcessor attempts to register a new processor plugin by providing a description of the configuration for the processor and a constructor for the processor itself. The constructor will be called for each instantiation of the component within a config.
For simple transformations consider implementing a Bloblang plugin method instead.
func RegisterRateLimit ¶
func RegisterRateLimit(name string, spec *ConfigSpec, ctor RateLimitConstructor) error
RegisterRateLimit attempts to register a new rate limit plugin by providing a description of the configuration for the plugin as well as a constructor for the rate limit itself. The constructor will be called for each instantiation of the component within a config.
func RunCLI ¶
RunCLI executes Benthos as a CLI, allowing users to specify a configuration file path(s) and execute subcommands for linting configs, testing configs, etc. This is how a standard distribution of Benthos operates.
This call blocks until either:
1. The service shuts down gracefully due to the inputs closing 2. A termination signal is received 3. The provided context is cancelled
This function must only be called once during the entire lifecycle of your program, as it interacts with singleton state. In order to manage multiple Benthos stream lifecycles in a program use the StreamBuilder API instead.
Types ¶
type AckFunc ¶
AckFunc is a common function returned by inputs that must be called once for each message consumed. This function ensures that the source of the message receives either an acknowledgement (err is nil) or an error that can either be propagated upstream as a nack, or trigger a reattempt at delivering the same message.
If your input implementation doesn't have a specific mechanism for dealing with a nack then you can wrap your input implementation with AutoRetryNacks to get automatic retries.
type BatchBuffer ¶ added in v3.53.0
type BatchBuffer interface { // Write a batch of messages to the buffer, the batch is accompanied with an // acknowledge function. A non-nil error should be returned if it is not // possible to store the given message batch in the buffer. // // If a nil error is returned the buffer assumes responsibility for calling // the acknowledge function at least once during the lifetime of the // message. // // This could be at the point where the message is written to the buffer, // which weakens delivery guarantees but can be useful for decoupling the // input from downstream components. Alternatively, this could be when the // associated batch has been read from the buffer and acknowledged // downstream, which preserves delivery guarantees. WriteBatch(context.Context, MessageBatch, AckFunc) error // Read a batch of messages from the buffer. This call should block until // either a batch is ready to consume, the provided context is cancelled or // EndOfInput has been called which indicates that the buffer is no longer // being populated with new messages. // // The returned acknowledge function will be called when a consumed message // batch has been processed and sent downstream. It is up to the buffer // implementation whether the ack function is used, it might be used in // order to "commit" the removal of a message from the buffer in cases where // the buffer is a persisted storage solution, or in cases where the output // of the buffer is temporal (a windowing algorithm, etc) it might be // considered correct to simply drop message batches that are not acked. // // When the buffer is closed (EndOfInput has been called and no more // messages are available) this method should return an ErrEndOfBuffer in // order to indicate the end of the buffered stream. // // It is valid to return a batch of only one message. ReadBatch(context.Context) (MessageBatch, AckFunc, error) // EndOfInput indicates to the buffer that the input has ended and that once // the buffer is depleted it should return ErrEndOfBuffer from ReadBatch in // order to gracefully shut down the pipeline. // // EndOfInput should be idempotent as it may be called more than once. EndOfInput() Closer }
BatchBuffer is an interface implemented by Buffers able to read and write message batches. Buffers are a component type that are placed after inputs, and decouples the acknowledgement system of the inputs from the rest of the pipeline.
Buffers are useful when implementing buffers intended to relieve back pressure from upstream components, or when implementing message aggregators where the concept of discrete messages running through a pipeline no longer applies (such as with windowing algorithms).
Buffers are advanced component types that weaken delivery guarantees of a Benthos pipeline. Therefore, if you aren't absolutely sure that a component you wish to build should be a buffer type then it likely shouldn't be.
type BatchBufferConstructor ¶ added in v3.53.0
type BatchBufferConstructor func(conf *ParsedConfig, mgr *Resources) (BatchBuffer, error)
BatchBufferConstructor is a func that's provided a configuration type and access to a service manager and must return an instantiation of a buffer based on the config, or an error.
Consumed message batches must be created by upstream components (inputs, etc) otherwise this buffer will simply receive batches containing single messages.
type BatchInput ¶ added in v3.52.0
type BatchInput interface { // Establish a connection to the upstream service. Connect will always be // called first when a reader is instantiated, and will be continuously // called with back off until a nil error is returned. // // Once Connect returns a nil error the Read method will be called until // either ErrNotConnected is returned, or the reader is closed. Connect(context.Context) error // Read a message batch from a source, along with a function to be called // once the entire batch can be either acked (successfully sent or // intentionally filtered) or nacked (failed to be processed or dispatched // to the output). // // The AckFunc will be called for every message batch at least once, but // there are no guarantees as to when this will occur. If your input // implementation doesn't have a specific mechanism for dealing with a nack // then you can wrap your input implementation with AutoRetryNacksBatched to // get automatic retries. // // If this method returns ErrNotConnected then ReadBatch will not be called // again until Connect has returned a nil error. If ErrEndOfInput is // returned then Read will no longer be called and the pipeline will // gracefully terminate. ReadBatch(context.Context) (MessageBatch, AckFunc, error) Closer }
BatchInput is an interface implemented by Benthos inputs that produce messages in batches, where there is a desire to process and send the batch as a logical group rather than as individual messages.
Calls to ReadBatch should block until either a message batch is ready to process, the connection is lost, or the provided context is cancelled.
func AutoRetryNacksBatched ¶ added in v3.52.0
func AutoRetryNacksBatched(i BatchInput) BatchInput
AutoRetryNacksBatched wraps a batched input implementation with a component that automatically reattempts messages that fail downstream. This is useful for inputs that do not support nacks, and therefore don't have an answer for when an ack func is called with an error.
When messages fail to be delivered they will be reattempted with back off until success or the stream is stopped.
type BatchInputConstructor ¶ added in v3.52.0
type BatchInputConstructor func(conf *ParsedConfig, mgr *Resources) (BatchInput, error)
BatchInputConstructor is a func that's provided a configuration type and access to a service manager, and must return an instantiation of a batched reader based on the config, or an error.
type BatchOutput ¶
type BatchOutput interface { // Establish a connection to the downstream service. Connect will always be // called first when a writer is instantiated, and will be continuously // called with back off until a nil error is returned. // // Once Connect returns a nil error the write method will be called until // either ErrNotConnected is returned, or the writer is closed. Connect(context.Context) error // Write a batch of messages to a sink, or return an error if delivery is // not possible. // // If this method returns ErrNotConnected then write will not be called // again until Connect has returned a nil error. WriteBatch(context.Context, MessageBatch) error Closer }
BatchOutput is an interface implemented by Benthos outputs that require Benthos to batch messages before dispatch in order to improve throughput. Each call to WriteBatch should block until either all messages in the batch have been successfully or unsuccessfully sent, or the context is cancelled.
Multiple write calls can be performed in parallel, and the constructor of an output must provide a MaxInFlight parameter indicating the maximum number of parallel batched write calls the output supports.
type BatchOutputConstructor ¶
type BatchOutputConstructor func(conf *ParsedConfig, mgr *Resources) (out BatchOutput, batchPolicy BatchPolicy, maxInFlight int, err error)
BatchOutputConstructor is a func that's provided a configuration type and access to a service manager, and must return an instantiation of a writer based on the config, a batching policy, and a maximum number of in-flight message batches to allow, or an error.
type BatchPolicy ¶
type BatchPolicy struct { ByteSize int Count int Check string Period string // contains filtered or unexported fields }
BatchPolicy describes the mechanisms by which batching should be performed of messages destined for a Batch output. This is returned by constructors of batch outputs.
type BatchProcessor ¶
type BatchProcessor interface { // Process a batch of messages into one or more resulting batches, or return // an error if the entire batch could not be processed. If zero messages are // returned and the error is nil then all messages are filtered. // // The provided MessageBatch should NOT be modified, in order to return a // mutated batch a copy of the slice should be created instead. // // When an error is returned all of the input messages will continue down // the pipeline but will be marked with the error with *message.SetError, // and metrics and logs will be emitted. // // In order to add errors to individual messages of the batch for downstream // handling use *message.SetError(err) and return it in the resulting batch // with a nil error. // // The Message types returned MUST be derived from the provided messages, // and CANNOT be custom implementations of Message. In order to copy the // provided messages use CopyMessage. ProcessBatch(context.Context, MessageBatch) ([]MessageBatch, error) Closer }
BatchProcessor is a Benthos processor implementation that works against batches of messages, which allows windowed processing.
Message batches must be created by upstream components (inputs, buffers, etc) otherwise this processor will simply receive batches containing single messages.
type BatchProcessorConstructor ¶
type BatchProcessorConstructor func(conf *ParsedConfig, mgr *Resources) (BatchProcessor, error)
BatchProcessorConstructor is a func that's provided a configuration type and access to a service manager and must return an instantiation of a processor based on the config, or an error.
Message batches must be created by upstream components (inputs, buffers, etc) otherwise this processor will simply receive batches containing single messages.
type Cache ¶
type Cache interface { // Get a cache item. Get(ctx context.Context, key string) ([]byte, error) // Set a cache item, specifying an optional TTL. It is okay for caches to // ignore the ttl parameter if it isn't possible to implement. Set(ctx context.Context, key string, value []byte, ttl *time.Duration) error // Add is the same operation as Set except that it returns an error if the // key already exists. It is okay for caches to return nil on duplicates if // it isn't possible to implement. Add(ctx context.Context, key string, value []byte, ttl *time.Duration) error // Delete attempts to remove a key. If the key does not exist then it is // considered correct to return an error, however, for cache implementations // where it is difficult to determine this then it is acceptable to return // nil. Delete(ctx context.Context, key string) error Closer }
Cache is an interface implemented by Benthos caches.
type CacheConstructor ¶
type CacheConstructor func(conf *ParsedConfig, mgr *Resources) (Cache, error)
CacheConstructor is a func that's provided a configuration type and access to a service manager and must return an instantiation of a cache based on the config, or an error.
type Closer ¶
type Closer interface { // Close the component, blocks until either the underlying resources are // cleaned up or the context is cancelled. Returns an error if the context // is cancelled. Close(ctx context.Context) error }
Closer is implemented by components that support stopping and cleaning up their underlying resources.
type ConfigField ¶
type ConfigField struct {
// contains filtered or unexported fields
}
ConfigField describes a field within a component configuration, to be added to a ConfigSpec.
func NewBatchPolicyField ¶
func NewBatchPolicyField(name string) *ConfigField
NewBatchPolicyField defines a new object type config field that describes a batching policy for batched outputs. It is then possible to extract a BatchPolicy from the resulting parsed config with the method FieldBatchPolicy.
func NewBloblangField ¶
func NewBloblangField(name string) *ConfigField
NewBloblangField defines a new config field that describes a Bloblang mapping string. It is then possible to extract a *bloblang.Executor from the resulting parsed config with the method FieldBloblang.
func NewBoolField ¶
func NewBoolField(name string) *ConfigField
NewBoolField describes a new bool type config field.
func NewFloatField ¶
func NewFloatField(name string) *ConfigField
NewFloatField describes a new float type config field.
func NewIntField ¶
func NewIntField(name string) *ConfigField
NewIntField describes a new int type config field.
func NewIntListField ¶ added in v3.57.0
func NewIntListField(name string) *ConfigField
NewIntListField describes a new config field consisting of a list of integers.
func NewIntMapField ¶ added in v3.57.0
func NewIntMapField(name string) *ConfigField
NewIntMapField describes a new config field consisting of an object of arbitrary keys with integer values.
func NewInterpolatedStringField ¶
func NewInterpolatedStringField(name string) *ConfigField
NewInterpolatedStringField defines a new config field that describes a dynamic string that supports Bloblang interpolation functions. It is then possible to extract an *InterpolatedString from the resulting parsed config with the method FieldInterpolatedString.
func NewObjectField ¶
func NewObjectField(name string, fields ...*ConfigField) *ConfigField
NewObjectField describes a new object type config field, consisting of one or more child fields.
func NewStringEnumField ¶ added in v3.55.0
func NewStringEnumField(name string, options ...string) *ConfigField
NewStringEnumField describes a new string type config field that can have one of a discrete list of values.
func NewStringField ¶
func NewStringField(name string) *ConfigField
NewStringField describes a new string type config field.
func NewStringListField ¶
func NewStringListField(name string) *ConfigField
NewStringListField describes a new config field consisting of a list of strings.
func NewStringMapField ¶ added in v3.57.0
func NewStringMapField(name string) *ConfigField
NewStringMapField describes a new config field consisting of an object of arbitrary keys with string values.
func NewTLSField ¶
func NewTLSField(name string) *ConfigField
NewTLSField defines a new object type config field that describes TLS settings for networked components. It is then possible to extract a *tls.Config from the resulting parsed config with the method FieldTLS.
func (*ConfigField) Advanced ¶
func (c *ConfigField) Advanced() *ConfigField
Advanced marks a config field as being advanced, and therefore it will not appear in simplified documentation examples.
func (*ConfigField) Default ¶
func (c *ConfigField) Default(v interface{}) *ConfigField
Default specifies a default value that this field will assume if it is omitted from a provided config. Fields that do not have a default value are considered mandatory, and so parsing a config will fail in their absence.
func (*ConfigField) Description ¶
func (c *ConfigField) Description(d string) *ConfigField
Description adds a description to the field which will be shown when printing documentation for the component config spec.
func (*ConfigField) Example ¶
func (c *ConfigField) Example(e interface{}) *ConfigField
Example adds an example value to the field which will be shown when printing documentation for the component config spec.
type ConfigSpec ¶
type ConfigSpec struct {
// contains filtered or unexported fields
}
ConfigSpec describes the configuration specification for a plugin component. This will be used for validating and linting configuration files and providing a parsed configuration struct to the plugin constructor.
func NewConfigSpec ¶
func NewConfigSpec() *ConfigSpec
NewConfigSpec creates a new empty component configuration spec. If the plugin does not require configuration fields the result of this call is enough.
func NewStructConfigSpec
deprecated
func NewStructConfigSpec(ctor ConfigStructConstructor) (*ConfigSpec, error)
NewStructConfigSpec creates a new component configuration spec around a constructor func. The provided constructor func will be used during parsing in order to validate and return fields for the plugin from a configuration file.
Deprecated: This config mechanism exists only as an interim solution for plugin authors migrating from the previous APIs.
func (*ConfigSpec) Beta ¶
func (c *ConfigSpec) Beta() *ConfigSpec
Beta sets a documentation label on the component indicating that its configuration spec is ready for beta testing, meaning backwards incompatible changes will not be made unless a fundamental problem is found. Plugins are considered experimental by default.
func (*ConfigSpec) Categories ¶
func (c *ConfigSpec) Categories(categories ...string) *ConfigSpec
Categories adds one or more string tags to the component, these are used for arbitrarily grouping components in documentation.
func (*ConfigSpec) Description ¶
func (c *ConfigSpec) Description(description string) *ConfigSpec
Description adds a description to the plugin configuration spec that describes in more detail the behaviour of the component and how it should be used.
func (*ConfigSpec) EncodeJSON ¶
func (c *ConfigSpec) EncodeJSON(v []byte) error
EncodeJSON attempts to parse a JSON object as a byte slice and uses it to populate the configuration spec. The schema of this method is undocumented and is not intended for general use.
EXPERIMENTAL: This method is not intended for general use and could have its signature and/or behaviour changed outside of major version bumps.
func (*ConfigSpec) Example ¶
func (c *ConfigSpec) Example(title, summary, config string) *ConfigSpec
Example adds an example to the plugin configuration spec that demonstrates how the component can be used. An example has a title, summary, and a YAML configuration showing a real use case.
func (*ConfigSpec) Field ¶
func (c *ConfigSpec) Field(f *ConfigField) *ConfigSpec
Field sets the specification of a field within the config spec, used for linting and generating documentation for the component.
When creating a spec with a struct constructor the fields from that struct will already be inferred. However, setting a field explicitly is sometimes useful for enriching the field documentation with more information.
func (*ConfigSpec) ParseYAML ¶ added in v3.55.0
func (c *ConfigSpec) ParseYAML(yamlStr string, env *Environment) (*ParsedConfig, error)
ParseYAML attempts to parse a YAML document as the defined configuration spec and returns a parsed config view. The provided environment determines which child components and Bloblang functions can be created by the fields of the spec, you can leave the environment nil to use the global environment.
This method is intended for testing purposes and is not required for normal use of plugin components, as parsing is managed by other components.
func (*ConfigSpec) Stable ¶
func (c *ConfigSpec) Stable() *ConfigSpec
Stable sets a documentation label on the component indicating that its configuration spec is stable. Plugins are considered experimental by default.
func (*ConfigSpec) Summary ¶
func (c *ConfigSpec) Summary(summary string) *ConfigSpec
Summary adds a short summary to the plugin configuration spec that describes the general purpose of the component.
func (*ConfigSpec) Version ¶
func (c *ConfigSpec) Version(v string) *ConfigSpec
Version specifies that this component was introduced in a given version.
type ConfigStructConstructor
deprecated
type ConfigStructConstructor func() interface{}
ConfigStructConstructor is a function signature that must return a pointer to a struct to be used for parsing configuration fields of a component plugin, ideally instanciated with default field values.
The function will be called each time a parsed configuration file contains the plugin type, and the returned struct will be unmarshalled as YAML using gopkg.in/yaml.v3.
The returned value must be a pointer type in order to be properly unmarshalled during config parsing.
Deprecated: This config mechanism exists only as an interim solution for plugin authors migrating from the previous APIs.
type ConfigView ¶
type ConfigView struct {
// contains filtered or unexported fields
}
ConfigView is a struct returned by a Benthos service environment when walking the list of registered components and provides access to information about the component.
func (*ConfigView) Description ¶
func (c *ConfigView) Description() string
Description returns a documentation description of the component, often formatted as markdown.
func (*ConfigView) FormatJSON ¶
func (c *ConfigView) FormatJSON() ([]byte, error)
FormatJSON returns a byte slice of the component configuration formatted as a JSON object. The schema of this method is undocumented and is not intended for general use.
EXPERIMENTAL: This method is not intended for general use and could have its signature and/or behaviour changed outside of major version bumps.
func (*ConfigView) IsDeprecated ¶ added in v3.53.0
func (c *ConfigView) IsDeprecated() bool
IsDeprecated returns true if the component is marked as deprecated.
func (*ConfigView) Summary ¶
func (c *ConfigView) Summary() string
Summary returns a documentation summary of the component, often formatted as markdown.
type Environment ¶
type Environment struct {
// contains filtered or unexported fields
}
Environment is a collection of Benthos component plugins that can be used in order to build and run streaming pipelines with access to different sets of plugins. This can be useful for sandboxing, testing, etc, but most plugin authors do not need to create an Environment and can simply use the global environment.
func NewEnvironment ¶
func NewEnvironment() *Environment
NewEnvironment creates a new environment that inherits all globally defined plugins, but can have plugins defined on it that are isolated.
func (*Environment) Clone ¶
func (e *Environment) Clone() *Environment
Clone an environment, creating a new environment containing the same plugins that can be modified independently of the source.
func (*Environment) NewStreamBuilder ¶
func (e *Environment) NewStreamBuilder() *StreamBuilder
NewStreamBuilder creates a new StreamBuilder upon the defined environment, only components known to this environment will be available to the stream builder.
func (*Environment) RegisterBatchBuffer ¶ added in v3.53.0
func (e *Environment) RegisterBatchBuffer(name string, spec *ConfigSpec, ctor BatchBufferConstructor) error
RegisterBatchBuffer attempts to register a new buffer plugin by providing a description of the configuration for the buffer and a constructor for the buffer processor. The constructor will be called for each instantiation of the component within a config.
Consumed message batches must be created by upstream components (inputs, etc) otherwise this buffer will simply receive batches containing single messages.
func (*Environment) RegisterBatchInput ¶ added in v3.52.0
func (e *Environment) RegisterBatchInput(name string, spec *ConfigSpec, ctor BatchInputConstructor) error
RegisterBatchInput attempts to register a new batched input plugin by providing a description of the configuration for the plugin as well as a constructor for the input itself. The constructor will be called for each instantiation of the component within a config.
If your input implementation doesn't have a specific mechanism for dealing with a nack (when the AckFunc provides a non-nil error) then you can instead wrap your input implementation with AutoRetryNacksBatched to get automatic retries.
func (*Environment) RegisterBatchOutput ¶
func (e *Environment) RegisterBatchOutput(name string, spec *ConfigSpec, ctor BatchOutputConstructor) error
RegisterBatchOutput attempts to register a new output plugin by providing a description of the configuration for the plugin as well as a constructor for the output itself. The constructor will be called for each instantiation of the component within a config.
The constructor of a batch output is able to return a batch policy to be applied before calls to write are made, creating batches from the stream of messages. However, batches can also be created by upstream components (inputs, buffers, etc).
If a batch has been formed upstream it is possible that its size may exceed the policy specified in your constructor.
func (*Environment) RegisterBatchProcessor ¶
func (e *Environment) RegisterBatchProcessor(name string, spec *ConfigSpec, ctor BatchProcessorConstructor) error
RegisterBatchProcessor attempts to register a new processor plugin by providing a description of the configuration for the processor and a constructor for the processor itself. The constructor will be called for each instantiation of the component within a config.
Message batches must be created by upstream components (inputs, buffers, etc) otherwise this processor will simply receive batches containing single messages.
func (*Environment) RegisterCache ¶
func (e *Environment) RegisterCache(name string, spec *ConfigSpec, ctor CacheConstructor) error
RegisterCache attempts to register a new cache plugin by providing a description of the configuration for the plugin as well as a constructor for the cache itself. The constructor will be called for each instantiation of the component within a config.
func (*Environment) RegisterInput ¶
func (e *Environment) RegisterInput(name string, spec *ConfigSpec, ctor InputConstructor) error
RegisterInput attempts to register a new input plugin by providing a description of the configuration for the plugin as well as a constructor for the input itself. The constructor will be called for each instantiation of the component within a config.
If your input implementation doesn't have a specific mechanism for dealing with a nack (when the AckFunc provides a non-nil error) then you can instead wrap your input implementation with AutoRetryNacks to get automatic retries.
func (*Environment) RegisterOutput ¶
func (e *Environment) RegisterOutput(name string, spec *ConfigSpec, ctor OutputConstructor) error
RegisterOutput attempts to register a new output plugin by providing a description of the configuration for the plugin as well as a constructor for the output itself. The constructor will be called for each instantiation of the component within a config.
func (*Environment) RegisterProcessor ¶
func (e *Environment) RegisterProcessor(name string, spec *ConfigSpec, ctor ProcessorConstructor) error
RegisterProcessor attempts to register a new processor plugin by providing a description of the configuration for the processor and a constructor for the processor itself. The constructor will be called for each instantiation of the component within a config.
For simple transformations consider implementing a Bloblang plugin method instead.
func (*Environment) RegisterRateLimit ¶
func (e *Environment) RegisterRateLimit(name string, spec *ConfigSpec, ctor RateLimitConstructor) error
RegisterRateLimit attempts to register a new rate limit plugin by providing a description of the configuration for the plugin as well as a constructor for the rate limit itself. The constructor will be called for each instantiation of the component within a config.
func (*Environment) UseBloblangEnvironment ¶
func (e *Environment) UseBloblangEnvironment(bEnv *bloblang.Environment)
UseBloblangEnvironment configures the service environment to restrict components constructed with it to a specific Bloblang environment.
func (*Environment) WalkBuffers ¶
func (e *Environment) WalkBuffers(fn func(name string, config *ConfigView))
WalkBuffers executes a provided function argument for every buffer component that has been registered to the environment.
func (*Environment) WalkCaches ¶
func (e *Environment) WalkCaches(fn func(name string, config *ConfigView))
WalkCaches executes a provided function argument for every cache component that has been registered to the environment.
func (*Environment) WalkInputs ¶
func (e *Environment) WalkInputs(fn func(name string, config *ConfigView))
WalkInputs executes a provided function argument for every input component that has been registered to the environment.
func (*Environment) WalkOutputs ¶
func (e *Environment) WalkOutputs(fn func(name string, config *ConfigView))
WalkOutputs executes a provided function argument for every output component that has been registered to the environment.
func (*Environment) WalkProcessors ¶
func (e *Environment) WalkProcessors(fn func(name string, config *ConfigView))
WalkProcessors executes a provided function argument for every processor component that has been registered to the environment.
func (*Environment) WalkRateLimits ¶
func (e *Environment) WalkRateLimits(fn func(name string, config *ConfigView))
WalkRateLimits executes a provided function argument for every rate limit component that has been registered to the environment.
type HTTPMultiplexer ¶
type HTTPMultiplexer interface {
HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))
}
HTTPMultiplexer is an interface supported by most HTTP multiplexers.
type Input ¶
type Input interface { // Establish a connection to the upstream service. Connect will always be // called first when a reader is instantiated, and will be continuously // called with back off until a nil error is returned. // // Once Connect returns a nil error the Read method will be called until // either ErrNotConnected is returned, or the reader is closed. Connect(context.Context) error // Read a single message from a source, along with a function to be called // once the message can be either acked (successfully sent or intentionally // filtered) or nacked (failed to be processed or dispatched to the output). // // The AckFunc will be called for every message at least once, but there are // no guarantees as to when this will occur. If your input implementation // doesn't have a specific mechanism for dealing with a nack then you can // wrap your input implementation with AutoRetryNacks to get automatic // retries. // // If this method returns ErrNotConnected then Read will not be called again // until Connect has returned a nil error. If ErrEndOfInput is returned then // Read will no longer be called and the pipeline will gracefully terminate. Read(context.Context) (*Message, AckFunc, error) Closer }
Input is an interface implemented by Benthos inputs. Calls to Read should block until either a message has been received, the connection is lost, or the provided context is cancelled.
func AutoRetryNacks ¶
AutoRetryNacks wraps an input implementation with a component that automatically reattempts messages that fail downstream. This is useful for inputs that do not support nacks, and therefore don't have an answer for when an ack func is called with an error.
When messages fail to be delivered they will be reattempted with back off until success or the stream is stopped.
type InputConstructor ¶
type InputConstructor func(conf *ParsedConfig, mgr *Resources) (Input, error)
InputConstructor is a func that's provided a configuration type and access to a service manager, and must return an instantiation of a reader based on the config, or an error.
type InterpolatedString ¶
type InterpolatedString struct {
// contains filtered or unexported fields
}
InterpolatedString resolves a string containing dynamic interpolation functions for a given message.
func NewInterpolatedString ¶
func NewInterpolatedString(expr string) (*InterpolatedString, error)
NewInterpolatedString parses an interpolated string expression.
func (*InterpolatedString) Bytes ¶
func (i *InterpolatedString) Bytes(m *Message) []byte
Bytes resolves the interpolated field for a given message as a byte slice.
func (*InterpolatedString) String ¶
func (i *InterpolatedString) String(m *Message) string
String resolves the interpolated field for a given message as a string.
type LintError ¶
type LintError []Lint
LintError is an error type that represents one or more configuration file linting errors that were encountered.
type Logger ¶
type Logger struct {
// contains filtered or unexported fields
}
Logger allows plugin authors to write custom logs from components that are exported the same way as native Benthos logs. It's safe to pass around a nil pointer for testing components.
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
Message represents a single discrete message passing through a Benthos pipeline. It is safe to mutate the message via Set methods, but the underlying byte data should not be edited directly.
func NewMessage ¶
NewMessage creates a new message with an initial raw bytes content. The initial content can be nil, which is recommended if you intend to set it with structured contents.
func (*Message) AsBytes ¶
AsBytes returns the underlying byte array contents of a message or, if the contents are a structured type, attempts to marshal the contents as a JSON document and returns either the byte array result or an error.
It is NOT safe to mutate the contents of the returned slice.
func (*Message) AsStructured ¶
AsStructured returns the underlying structured contents of a message or, if the contents are a byte array, attempts to parse the bytes contents as a JSON document and returns either the structured result or an error.
It is NOT safe to mutate the contents of the returned value if it is a reference type (slice or map). In order to safely mutate the structured contents of a message use AsStructuredMut.
func (*Message) AsStructuredMut ¶
AsStructuredMut returns the underlying structured contents of a message or, if the contents are a byte array, attempts to parse the bytes contents as a JSON document and returns either the structured result or an error.
It is safe to mutate the contents of the returned value even if it is a reference type (slice or map), as the structured contents will be lazily deep cloned if it is still owned by an upstream component.
func (*Message) BloblangQuery ¶
BloblangQuery executes a parsed Bloblang mapping on a message and returns a message back or an error if the mapping fails. If the mapping results in the root being deleted the returned message will be nil, which indicates it has been filtered.
func (*Message) Context ¶ added in v3.52.0
Context returns a context associated with the message, or a background context in the absence of one.
func (*Message) Copy ¶
Copy creates a shallow copy of a message that is safe to mutate with Set methods without mutating the original. Both messages will share a context, and therefore a tracing ID, if one has been associated with them.
Note that this does not perform a deep copy of the byte or structured contents of the message, and therefore it is not safe to perform inline mutations on those values without copying them.
func (*Message) GetError ¶ added in v3.53.0
GetError returns an error associated with a message, or nil if there isn't one. Messages marked with errors can be handled using a range of methods outlined in https://www.benthos.dev/docs/configuration/error_handling.
func (*Message) MetaDelete ¶
MetaDelete removes a key from the message metadata.
func (*Message) MetaGet ¶
MetaGet attempts to find a metadata key from the message and returns a string result and a boolean indicating whether it was found.
func (*Message) MetaSet ¶
MetaSet sets the value of a metadata key. If the value is an empty string the metadata key is deleted.
func (*Message) MetaWalk ¶
MetaWalk iterates each metadata key/value pair and executes a provided closure on each iteration. To stop iterating, return an error from the closure. An error returned by the closure will be returned by this function.
func (*Message) SetError ¶
SetError marks the message as having failed a processing step and adds the error to it as context. Messages marked with errors can be handled using a range of methods outlined in https://www.benthos.dev/docs/configuration/error_handling.
func (*Message) SetStructured ¶
func (m *Message) SetStructured(i interface{})
SetStructured sets the underlying contents of the message as a structured type. This structured value should be a scalar Go type, or either a map[string]interface{} or []interface{} containing the same types all the way through the hierarchy, this ensures that other processors are able to work with the contents and that they can be JSON marshalled when coerced into a byte array.
type MessageBatch ¶
type MessageBatch []*Message
MessageBatch describes a collection of one or more messages.
func (MessageBatch) BloblangQuery ¶
BloblangQuery executes a parsed Bloblang mapping on a message batch, from the perspective of a particular message index, and returns a message back or an error if the mapping fails. If the mapping results in the root being deleted the returned message will be nil, which indicates it has been filtered.
This method allows mappings to perform windowed aggregations across message batches.
func (MessageBatch) Copy ¶
func (b MessageBatch) Copy() MessageBatch
Copy creates a new slice of the same messages, which can be modified without changing the contents of the original batch.
func (MessageBatch) InterpolatedString ¶
func (b MessageBatch) InterpolatedString(index int, i *InterpolatedString) string
InterpolatedString resolves an interpolated string expression on a message batch, from the perspective of a particular message index.
This method allows interpolation functions to perform windowed aggregations across message batches, and is a more powerful way to interpolate strings than the standard .String method.
type MessageBatchHandlerFunc ¶ added in v3.54.0
type MessageBatchHandlerFunc func(context.Context, MessageBatch) error
MessageBatchHandlerFunc is a function signature defining a component that consumes Benthos message batches. An error must be returned if the context is cancelled, or if the messages could not be delivered or processed.
type MessageHandlerFunc ¶
MessageHandlerFunc is a function signature defining a component that consumes Benthos messages. An error must be returned if the context is cancelled, or if the message could not be delivered or processed.
type MetricCounter ¶
type MetricCounter struct {
// contains filtered or unexported fields
}
MetricCounter represents a counter metric of a given name and labels.
func (*MetricCounter) Incr ¶
func (c *MetricCounter) Incr(count int64, labelValues ...string)
Incr increments a counter metric by an amount, the number of label values must match the number and order of labels specified when the counter was created.
type MetricGauge ¶
type MetricGauge struct {
// contains filtered or unexported fields
}
MetricGauge represents a gauge metric of a given name and labels.
func (*MetricGauge) Set ¶
func (g *MetricGauge) Set(value int64, labelValues ...string)
Set a gauge metric, the number of label values must match the number and order of labels specified when the gauge was created.
type MetricTimer ¶
type MetricTimer struct {
// contains filtered or unexported fields
}
MetricTimer represents a timing metric of a given name and labels.
func (*MetricTimer) Timing ¶
func (t *MetricTimer) Timing(delta int64, labelValues ...string)
Timing adds a delta to a timing metric, the number of label values must match the number and order of labels specified when the timing was created.
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
Metrics allows plugin authors to emit custom metrics from components that are exported the same way as native Benthos metrics. It's safe to pass around a nil pointer for testing components.
func (*Metrics) NewCounter ¶
func (m *Metrics) NewCounter(name string, labelKeys ...string) *MetricCounter
NewCounter creates a new counter metric with a name and variant list of label keys.
type Output ¶
type Output interface { // Establish a connection to the downstream service. Connect will always be // called first when a writer is instantiated, and will be continuously // called with back off until a nil error is returned. // // Once Connect returns a nil error the write method will be called until // either ErrNotConnected is returned, or the writer is closed. Connect(context.Context) error // Write a message to a sink, or return an error if delivery is not // possible. // // If this method returns ErrNotConnected then write will not be called // again until Connect has returned a nil error. Write(context.Context, *Message) error Closer }
Output is an interface implemented by Benthos outputs that support single message writes. Each call to Write should block until either the message has been successfully or unsuccessfully sent, or the context is cancelled.
Multiple write calls can be performed in parallel, and the constructor of an output must provide a MaxInFlight parameter indicating the maximum number of parallel write calls the output supports.
type OutputConstructor ¶
type OutputConstructor func(conf *ParsedConfig, mgr *Resources) (out Output, maxInFlight int, err error)
OutputConstructor is a func that's provided a configuration type and access to a service manager, and must return an instantiation of a writer based on the config and a maximum number of in-flight messages to allow, or an error.
type ParsedConfig ¶
type ParsedConfig struct {
// contains filtered or unexported fields
}
ParsedConfig represents a plugin configuration that has been validated and parsed from a ConfigSpec, and allows plugin constructors to access configuration fields.
The correct way to access configuration fields depends on how the configuration spec was built. For example, if the spec was established with a struct constructor then the method AsStruct should be used in order to access the parsed struct.
func (*ParsedConfig) AsStruct
deprecated
func (p *ParsedConfig) AsStruct() interface{}
AsStruct returns the root of the parsed config. If the configuration spec was built around a config constructor then the value returned will match the type returned by the constructor, otherwise it will be a generic map[string]interface{} type.
Deprecated: This config mechanism exists only as an interim solution for plugin authors migrating from the previous APIs.
func (*ParsedConfig) FieldBatchPolicy ¶
func (p *ParsedConfig) FieldBatchPolicy(path ...string) (conf BatchPolicy, err error)
FieldBatchPolicy accesses a field from a parsed config that was defined with NewBatchPolicyField and returns a BatchPolicy, or an error if the configuration was invalid.
func (*ParsedConfig) FieldBloblang ¶
func (p *ParsedConfig) FieldBloblang(path ...string) (*bloblang.Executor, error)
FieldBloblang accesses a field from a parsed config that was defined with NewBloblangField and returns either a *bloblang.Executor or an error if the mapping was invalid.
func (*ParsedConfig) FieldBool ¶
func (p *ParsedConfig) FieldBool(path ...string) (bool, error)
FieldBool accesses a bool field from the parsed config by its name and returns the value. Returns an error if the field is not found or is not a bool.
This method is not valid when the configuration spec was built around a config constructor.
func (*ParsedConfig) FieldFloat ¶
func (p *ParsedConfig) FieldFloat(path ...string) (float64, error)
FieldFloat accesses a float field from the parsed config by its name and returns the value. Returns an error if the field is not found or is not a float.
This method is not valid when the configuration spec was built around a config constructor.
func (*ParsedConfig) FieldInt ¶
func (p *ParsedConfig) FieldInt(path ...string) (int, error)
FieldInt accesses an int field from the parsed config by its name and returns the value. Returns an error if the field is not found or is not an int.
This method is not valid when the configuration spec was built around a config constructor.
func (*ParsedConfig) FieldIntList ¶ added in v3.57.0
func (p *ParsedConfig) FieldIntList(path ...string) ([]int, error)
FieldIntList accesses a field that is a list of integers from the parsed config by its name and returns the value. Returns an error if the field is not found, or is not a list of integers.
This method is not valid when the configuration spec was built around a config constructor.
func (*ParsedConfig) FieldIntMap ¶ added in v3.57.0
func (p *ParsedConfig) FieldIntMap(path ...string) (map[string]int, error)
FieldIntMap accesses a field that is an object of arbitrary keys and integer values from the parsed config by its name and returns the value. Returns an error if the field is not found, or is not an object of integers.
This method is not valid when the configuration spec was built around a config constructor.
func (*ParsedConfig) FieldInterpolatedString ¶
func (p *ParsedConfig) FieldInterpolatedString(path ...string) (*InterpolatedString, error)
FieldInterpolatedString accesses a field from a parsed config that was defined with NewInterpolatedStringField and returns either an *InterpolatedString or an error if the string was invalid.
func (*ParsedConfig) FieldString ¶
func (p *ParsedConfig) FieldString(path ...string) (string, error)
FieldString accesses a string field from the parsed config by its name. If the field is not found or is not a string an error is returned.
This method is not valid when the configuration spec was built around a config constructor.
func (*ParsedConfig) FieldStringList ¶
func (p *ParsedConfig) FieldStringList(path ...string) ([]string, error)
FieldStringList accesses a field that is a list of strings from the parsed config by its name and returns the value. Returns an error if the field is not found, or is not a list of strings.
This method is not valid when the configuration spec was built around a config constructor.
func (*ParsedConfig) FieldStringMap ¶ added in v3.57.0
func (p *ParsedConfig) FieldStringMap(path ...string) (map[string]string, error)
FieldStringMap accesses a field that is an object of arbitrary keys and string values from the parsed config by its name and returns the value. Returns an error if the field is not found, or is not an object of strings.
This method is not valid when the configuration spec was built around a config constructor.
func (*ParsedConfig) FieldTLS ¶
func (p *ParsedConfig) FieldTLS(path ...string) (*tls.Config, error)
FieldTLS accesses a field from a parsed config that was defined with NewTLSField and returns a *tls.Config, or an error if the configuration was invalid.
func (*ParsedConfig) Namespace ¶ added in v3.53.0
func (p *ParsedConfig) Namespace(path ...string) *ParsedConfig
Namespace returns a version of the parsed config at a given field namespace. This is useful for extracting multiple fields under the same grouping.
type PrintLogger ¶
type PrintLogger interface { Printf(format string, v ...interface{}) Println(v ...interface{}) }
PrintLogger is a simple Print based interface implemented by custom loggers.
type Processor ¶
type Processor interface { // Process a message into one or more resulting messages, or return an error // if the message could not be processed. If zero messages are returned and // the error is nil then the message is filtered. // // When an error is returned the input message will continue down the // pipeline but will be marked with the error with *message.SetError, and // metrics and logs will be emitted. The failed message can then be handled // with the patterns outlined in https://www.benthos.dev/docs/configuration/error_handling. // // The Message types returned MUST be derived from the provided message, and // CANNOT be custom implementations of Message. In order to copy the // provided message use CopyMessage. Process(context.Context, *Message) (MessageBatch, error) Closer }
Processor is a Benthos processor implementation that works against single messages.
type ProcessorConstructor ¶
type ProcessorConstructor func(conf *ParsedConfig, mgr *Resources) (Processor, error)
ProcessorConstructor is a func that's provided a configuration type and access to a service manager and must return an instantiation of a processor based on the config, or an error.
type RateLimit ¶
type RateLimit interface { // Access the rate limited resource. Returns a duration or an error if the // rate limit check fails. The returned duration is either zero (meaning the // resource may be accessed) or a reasonable length of time to wait before // requesting again. Access(context.Context) (time.Duration, error) Closer }
RateLimit is an interface implemented by Benthos rate limits.
type RateLimitConstructor ¶
type RateLimitConstructor func(conf *ParsedConfig, mgr *Resources) (RateLimit, error)
RateLimitConstructor is a func that's provided a configuration type and access to a service manager and must return an instantiation of a rate limit based on the config, or an error.
type Resources ¶
type Resources struct {
// contains filtered or unexported fields
}
Resources provides access to service-wide resources.
func (*Resources) AccessCache ¶
AccessCache attempts to access a cache resource by name. This action can block if CRUD operations are being actively performed on the resource.
func (*Resources) AccessRateLimit ¶
AccessRateLimit attempts to access a rate limit resource by name. This action can block if CRUD operations are being actively performed on the resource.
func (*Resources) Label ¶
Label returns a label that identifies the component instantiation. This could be an explicit label set in config, or is otherwise a generated label based on the position of the component within a config.
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream executes a full Benthos stream and provides methods for performing status checks, terminating the stream, and blocking until the stream ends.
func (*Stream) Run ¶
Run attempts to start the stream pipeline and blocks until either the stream has gracefully come to a stop, or the provided context is cancelled.
func (*Stream) StopWithin ¶
StopWithin attempts to close the stream within the specified timeout period. Initially the attempt is graceful, but as the timeout draws close the attempt becomes progressively less graceful.
An ungraceful shutdown increases the likelihood of processing duplicate messages on the next start up, but never results in dropped messages as long as the input source supports at-least-once delivery.
type StreamBuilder ¶
type StreamBuilder struct {
// contains filtered or unexported fields
}
StreamBuilder provides methods for building a Benthos stream configuration. When parsing Benthos configs this builder follows the schema and field defaults of a standard Benthos configuration. Environment variable interpolations are also parsed and resolved the same as regular configs.
Benthos streams register HTTP endpoints by default that expose metrics and ready checks. If your intention is to execute multiple streams in the same process then it is recommended that you disable the HTTP server in config, or use `SetHTTPMux` with prefixed multiplexers in order to share it across the streams.
func NewStreamBuilder ¶
func NewStreamBuilder() *StreamBuilder
NewStreamBuilder creates a new StreamBuilder.
func (*StreamBuilder) AddBatchConsumerFunc ¶ added in v3.54.0
func (s *StreamBuilder) AddBatchConsumerFunc(fn MessageBatchHandlerFunc) error
AddBatchConsumerFunc adds an output to the builder that executes a closure function argument for each message batch. If more than one output configuration is added they will automatically be composed within a fan out broker when the pipeline is built.
The provided MessageBatchHandlerFunc may be called from any number of goroutines, and therefore it is recommended to implement some form of throttling or mutex locking in cases where the call is non-blocking.
Only one consumer can be added to a stream builder, and subsequent calls will return an error.
Message batches must be created by upstream components (inputs, buffers, etc) otherwise message batches received by this consumer will have a single message contents.
func (*StreamBuilder) AddBatchProducerFunc ¶ added in v3.54.0
func (s *StreamBuilder) AddBatchProducerFunc() (MessageBatchHandlerFunc, error)
AddBatchProducerFunc adds an input to the builder that allows you to write message batches directly into the stream with a closure function. If any other input has or will be added to the stream builder they will be automatically composed within a broker when the pipeline is built.
The returned MessageBatchHandlerFunc can be called concurrently from any number of goroutines, and each call will block until all messages within the batch are successfully delivered downstream, were rejected (or otherwise could not be delivered) or the context is cancelled.
Only one producer func can be added to a stream builder, and subsequent calls will return an error.
func (*StreamBuilder) AddCacheYAML ¶
func (s *StreamBuilder) AddCacheYAML(conf string) error
AddCacheYAML parses a cache YAML configuration and adds it to the builder as a resource.
func (*StreamBuilder) AddConsumerFunc ¶
func (s *StreamBuilder) AddConsumerFunc(fn MessageHandlerFunc) error
AddConsumerFunc adds an output to the builder that executes a closure function argument for each message. If more than one output configuration is added they will automatically be composed within a fan out broker when the pipeline is built.
The provided MessageHandlerFunc may be called from any number of goroutines, and therefore it is recommended to implement some form of throttling or mutex locking in cases where the call is non-blocking.
Only one consumer can be added to a stream builder, and subsequent calls will return an error.
func (*StreamBuilder) AddInputYAML ¶
func (s *StreamBuilder) AddInputYAML(conf string) error
AddInputYAML parses an input YAML configuration and adds it to the builder. If more than one input configuration is added they will automatically be composed within a broker when the pipeline is built.
func (*StreamBuilder) AddOutputYAML ¶
func (s *StreamBuilder) AddOutputYAML(conf string) error
AddOutputYAML parses an output YAML configuration and adds it to the builder. If more than one output configuration is added they will automatically be composed within a fan out broker when the pipeline is built.
func (*StreamBuilder) AddProcessorYAML ¶
func (s *StreamBuilder) AddProcessorYAML(conf string) error
AddProcessorYAML parses a processor YAML configuration and adds it to the builder to be executed within the pipeline.processors section, after all prior added processor configs.
func (*StreamBuilder) AddProducerFunc ¶
func (s *StreamBuilder) AddProducerFunc() (MessageHandlerFunc, error)
AddProducerFunc adds an input to the builder that allows you to write messages directly into the stream with a closure function. If any other input has or will be added to the stream builder they will be automatically composed within a broker when the pipeline is built.
The returned MessageHandlerFunc can be called concurrently from any number of goroutines, and each call will block until the message is successfully delivered downstream, was rejected (or otherwise could not be delivered) or the context is cancelled.
Only one producer func can be added to a stream builder, and subsequent calls will return an error.
func (*StreamBuilder) AddRateLimitYAML ¶
func (s *StreamBuilder) AddRateLimitYAML(conf string) error
AddRateLimitYAML parses a rate limit YAML configuration and adds it to the builder as a resource.
func (*StreamBuilder) AddResourcesYAML ¶
func (s *StreamBuilder) AddResourcesYAML(conf string) error
AddResourcesYAML parses resource configurations and adds them to the config.
func (*StreamBuilder) AsYAML ¶
func (s *StreamBuilder) AsYAML() (string, error)
AsYAML prints a YAML representation of the stream config as it has been currently built.
func (*StreamBuilder) Build ¶
func (s *StreamBuilder) Build() (*Stream, error)
Build a Benthos stream pipeline according to the components specified by this stream builder.
func (*StreamBuilder) SetBufferYAML ¶ added in v3.53.0
func (s *StreamBuilder) SetBufferYAML(conf string) error
SetBufferYAML parses a buffer YAML configuration and sets it to the builder to be placed between the input and the pipeline (processors) sections. This config will replace any prior configured buffer.
func (*StreamBuilder) SetFields ¶ added in v3.53.0
func (s *StreamBuilder) SetFields(pathValues ...interface{}) error
SetFields modifies the config by setting one or more fields identified by a dot path to a value. The argument must be a variadic list of pairs, where the first element is a string containing the target field dot path, and the second element is a typed value to set the field to.
func (*StreamBuilder) SetHTTPMux ¶
func (s *StreamBuilder) SetHTTPMux(m HTTPMultiplexer)
SetHTTPMux sets an HTTP multiplexer to be used by stream components when registering endpoints instead of a new server spawned following the `http` fields of a Benthos config.
func (*StreamBuilder) SetLoggerYAML ¶
func (s *StreamBuilder) SetLoggerYAML(conf string) error
SetLoggerYAML parses a logger YAML configuration and adds it to the builder such that all stream components emit logs through it.
func (*StreamBuilder) SetMetricsYAML ¶
func (s *StreamBuilder) SetMetricsYAML(conf string) error
SetMetricsYAML parses a metrics YAML configuration and adds it to the builder such that all stream components emit metrics through it.
func (*StreamBuilder) SetPrintLogger ¶
func (s *StreamBuilder) SetPrintLogger(l PrintLogger)
SetPrintLogger sets a custom logger supporting a simple Print based interface to be used by stream components. This custom logger will override any logging fields set via config.
func (*StreamBuilder) SetThreads ¶
func (s *StreamBuilder) SetThreads(n int)
SetThreads configures the number of pipeline processor threads should be configured. By default the number will be zero, which means the thread count will match the number of logical CPUs on the machine.
func (*StreamBuilder) SetYAML ¶
func (s *StreamBuilder) SetYAML(conf string) error
SetYAML parses a full Benthos config and uses it to configure the builder. If any inputs, processors, outputs, resources, etc, have previously been added to the builder they will be overridden by this new config.
Source Files ¶
- buffer.go
- cache.go
- config.go
- config_batch_policy.go
- config_bloblang.go
- config_interpolated_string.go
- config_tls.go
- environment.go
- input.go
- input_auto_retry.go
- input_auto_retry_batched.go
- interpolated_string.go
- logger.go
- message.go
- metrics.go
- output.go
- package.go
- plugins.go
- processor.go
- rate_limit.go
- resources.go
- service.go
- stream.go
- stream_builder.go