bundle

package
v4.32.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 16, 2024 License: MIT Imports: 23 Imported by: 0

Documentation

Overview

Package bundle contains singletons referenced throughout the Benthos codebase that allow imported components to add their constructors and documentation to a service.

Each component type has it's own singleton bundle containing all imported implementations of the component, and from this bundle more can be derived that modify the components that are available.

Index

Constants

This section is empty.

Variables

View Source
var AllBuffers = &BufferSet{
	specs: map[string]bufferSpec{},
}

AllBuffers is a set containing every single buffer that has been imported.

View Source
var AllCaches = &CacheSet{
	specs: map[string]cacheSpec{},
}

AllCaches is a set containing every single cache that has been imported.

View Source
var AllInputs = &InputSet{
	specs: map[string]inputSpec{},
}

AllInputs is a set containing every single input that has been imported.

View Source
var AllMetrics = &MetricsSet{
	specs: map[string]metricsSpec{},
}

AllMetrics is a set containing every single metrics that has been imported.

View Source
var AllOutputs = &OutputSet{
	specs: map[string]outputSpec{},
}

AllOutputs is a set containing every single output that has been imported.

View Source
var AllProcessors = &ProcessorSet{
	specs: map[string]processorSpec{},
}

AllProcessors is a set containing every single processor that has been imported.

View Source
var AllRateLimits = &RateLimitSet{
	specs: map[string]rateLimitSpec{},
}

AllRateLimits is a set containing every single ratelimit that has been imported.

View Source
var AllScanners = &ScannerSet{
	specs: map[string]scannerSpec{},
}

AllScanners is a set containing every single scanner that has been imported.

View Source
var AllTracers = &TracerSet{
	specs: map[string]tracerSpec{},
}

AllTracers is a set containing every single tracer that has been imported.

View Source
var GlobalEnvironment = &Environment{
	buffers:    AllBuffers,
	caches:     AllCaches,
	inputs:     AllInputs,
	outputs:    AllOutputs,
	processors: AllProcessors,
	rateLimits: AllRateLimits,
	metrics:    AllMetrics,
	tracers:    AllTracers,
	scanners:   AllScanners,
}

GlobalEnvironment contains service-wide singleton bundles.

Functions

This section is empty.

Types

type BufferConstructor

type BufferConstructor func(buffer.Config, NewManagement) (buffer.Streamed, error)

BufferConstructor constructs an buffer component.

type BufferSet

type BufferSet struct {
	// contains filtered or unexported fields
}

BufferSet contains an explicit set of buffers available to a Benthos service.

func (*BufferSet) Add

func (s *BufferSet) Add(constructor BufferConstructor, spec docs.ComponentSpec) error

Add a new buffer to this set by providing a spec (name, documentation, and constructor).

func (*BufferSet) Docs

func (s *BufferSet) Docs() []docs.ComponentSpec

Docs returns a slice of buffer specs, which document each method.

func (*BufferSet) DocsFor

func (s *BufferSet) DocsFor(name string) (docs.ComponentSpec, bool)

DocsFor returns the documentation for a given component name, returns a boolean indicating whether the component name exists.

func (*BufferSet) Init

func (s *BufferSet) Init(conf buffer.Config, mgr NewManagement) (buffer.Streamed, error)

Init attempts to initialise an buffer from a config.

func (*BufferSet) Without added in v4.32.0

func (s *BufferSet) Without(names ...string) *BufferSet

Without creates a clone of the set excluding a variadic list of components.

type CacheConstructor

type CacheConstructor func(cache.Config, NewManagement) (cache.V1, error)

CacheConstructor constructs an cache component.

type CacheSet

type CacheSet struct {
	// contains filtered or unexported fields
}

CacheSet contains an explicit set of caches available to a Benthos service.

func (*CacheSet) Add

func (s *CacheSet) Add(constructor CacheConstructor, spec docs.ComponentSpec) error

Add a new cache to this set by providing a spec (name, documentation, and constructor).

func (*CacheSet) Docs

func (s *CacheSet) Docs() []docs.ComponentSpec

Docs returns a slice of cache specs, which document each method.

func (*CacheSet) DocsFor

func (s *CacheSet) DocsFor(name string) (docs.ComponentSpec, bool)

DocsFor returns the documentation for a given component name, returns a boolean indicating whether the component name exists.

func (*CacheSet) Init

func (s *CacheSet) Init(conf cache.Config, mgr NewManagement) (cache.V1, error)

Init attempts to initialise an cache from a config.

func (*CacheSet) Without added in v4.32.0

func (s *CacheSet) Without(names ...string) *CacheSet

Without creates a clone of the set excluding a variadic list of components.

type Environment

type Environment struct {
	// contains filtered or unexported fields
}

Environment is a collection of Benthos component plugins that can be used in order to build and run streaming pipelines with access to different sets of plugins. This is useful for sandboxing, testing, etc.

func NewEnvironment

func NewEnvironment() *Environment

NewEnvironment creates an empty environment.

func (*Environment) BufferAdd

func (e *Environment) BufferAdd(constructor BufferConstructor, spec docs.ComponentSpec) error

BufferAdd adds a new buffer to this environment by providing a constructor and documentation.

func (*Environment) BufferDocs

func (e *Environment) BufferDocs() []docs.ComponentSpec

BufferDocs returns a slice of buffer specs, which document each method.

func (*Environment) BufferInit

func (e *Environment) BufferInit(conf buffer.Config, mgr NewManagement) (buffer.Streamed, error)

BufferInit attempts to initialise a buffer from a config.

func (*Environment) CacheAdd

func (e *Environment) CacheAdd(constructor CacheConstructor, spec docs.ComponentSpec) error

CacheAdd adds a new cache to this environment by providing a constructor and documentation.

func (*Environment) CacheDocs

func (e *Environment) CacheDocs() []docs.ComponentSpec

CacheDocs returns a slice of cache specs, which document each method.

func (*Environment) CacheInit

func (e *Environment) CacheInit(conf cache.Config, mgr NewManagement) (cache.V1, error)

CacheInit attempts to initialise a cache from a config.

func (*Environment) Clone

func (e *Environment) Clone() *Environment

Clone an existing environment to a new one that can be modified independently.

func (*Environment) GetDocs

func (e *Environment) GetDocs(name string, ctype docs.Type) (docs.ComponentSpec, bool)

GetDocs returns a documentation spec for an implementation of a component.

func (*Environment) InputAdd

func (e *Environment) InputAdd(constructor InputConstructor, spec docs.ComponentSpec) error

InputAdd adds a new input to this environment by providing a constructor and documentation.

func (*Environment) InputDocs

func (e *Environment) InputDocs() []docs.ComponentSpec

InputDocs returns a slice of input specs, which document each method.

func (*Environment) InputInit

func (e *Environment) InputInit(conf input.Config, mgr NewManagement) (input.Streamed, error)

InputInit attempts to initialise an input from a config.

func (*Environment) MetricsAdd

func (e *Environment) MetricsAdd(constructor MetricConstructor, spec docs.ComponentSpec) error

MetricsAdd adds a new metrics exporter to this environment by providing a constructor and documentation.

func (*Environment) MetricsDocs

func (e *Environment) MetricsDocs() []docs.ComponentSpec

MetricsDocs returns a slice of metrics exporter specs.

func (*Environment) MetricsInit

func (e *Environment) MetricsInit(conf metrics.Config, nm NewManagement) (*metrics.Namespaced, error)

MetricsInit attempts to initialise a metrics exporter from a config.

func (*Environment) OutputAdd

func (e *Environment) OutputAdd(constructor OutputConstructor, spec docs.ComponentSpec) error

OutputAdd adds a new output to this environment by providing a constructor and documentation.

func (*Environment) OutputDocs

func (e *Environment) OutputDocs() []docs.ComponentSpec

OutputDocs returns a slice of output specs, which document each method.

func (*Environment) OutputInit

func (e *Environment) OutputInit(
	conf output.Config,
	mgr NewManagement,
	pipelines ...processor.PipelineConstructorFunc,
) (output.Streamed, error)

OutputInit attempts to initialise a output from a config.

func (*Environment) ProcessorAdd

func (e *Environment) ProcessorAdd(constructor ProcessorConstructor, spec docs.ComponentSpec) error

ProcessorAdd adds a new processor to this environment by providing a constructor and documentation.

func (*Environment) ProcessorDocs

func (e *Environment) ProcessorDocs() []docs.ComponentSpec

ProcessorDocs returns a slice of processor specs, which document each method.

func (*Environment) ProcessorInit

func (e *Environment) ProcessorInit(conf processor.Config, mgr NewManagement) (processor.V1, error)

ProcessorInit attempts to initialise a processor from a config.

func (*Environment) RateLimitAdd

func (e *Environment) RateLimitAdd(constructor RateLimitConstructor, spec docs.ComponentSpec) error

RateLimitAdd adds a new ratelimit to this environment by providing a constructor and documentation.

func (*Environment) RateLimitDocs

func (e *Environment) RateLimitDocs() []docs.ComponentSpec

RateLimitDocs returns a slice of ratelimit specs, which document each method.

func (*Environment) RateLimitInit

func (e *Environment) RateLimitInit(conf ratelimit.Config, mgr NewManagement) (ratelimit.V1, error)

RateLimitInit attempts to initialise a ratelimit from a config.

func (*Environment) ScannerAdd

func (e *Environment) ScannerAdd(constructor ScannerConstructor, spec docs.ComponentSpec) error

ScannerAdd adds a new scanner to this environment by providing a constructor and documentation.

func (*Environment) ScannerDocs

func (e *Environment) ScannerDocs() []docs.ComponentSpec

ScannerDocs returns a slice of scanner specs.

func (*Environment) ScannerInit

func (e *Environment) ScannerInit(conf scanner.Config, nm NewManagement) (scanner.Creator, error)

ScannerInit attempts to initialise a scanner creator from a config.

func (*Environment) TracersAdd

func (e *Environment) TracersAdd(constructor TracerConstructor, spec docs.ComponentSpec) error

TracersAdd adds a new tracers exporter to this environment by providing a constructor and documentation.

func (*Environment) TracersDocs

func (e *Environment) TracersDocs() []docs.ComponentSpec

TracersDocs returns a slice of tracers exporter specs.

func (*Environment) TracersInit

func (e *Environment) TracersInit(conf tracer.Config, nm NewManagement) (trace.TracerProvider, error)

TracersInit attempts to initialise a tracers exporter from a config.

func (*Environment) With added in v4.32.0

func (e *Environment) With(names ...string) *Environment

With creates a clone of an existing environment with only a variadic list of plugin names included from the resulting environment.

func (*Environment) Without added in v4.32.0

func (e *Environment) Without(names ...string) *Environment

Without creates a clone of an existing environment with a variadic list of plugin names excluded from the resulting environment.

func (*Environment) WithoutBuffers added in v4.32.0

func (e *Environment) WithoutBuffers(names ...string) *Environment

WithoutBuffers returns a copy of Environment with a cloned plugin registry of buffers, where the specified plugins are not included.

func (*Environment) WithoutCaches added in v4.32.0

func (e *Environment) WithoutCaches(names ...string) *Environment

WithoutCaches returns a copy of Environment with a cloned plugin registry of caches, where the specified plugins are not included.

func (*Environment) WithoutInputs added in v4.32.0

func (e *Environment) WithoutInputs(names ...string) *Environment

WithoutInputs returns a copy of Environment with a cloned plugin registry of inputs, where the specified plugins are not included.

func (*Environment) WithoutMetrics added in v4.32.0

func (e *Environment) WithoutMetrics(names ...string) *Environment

WithoutMetrics returns a copy of Environment with a cloned plugin registry of metrics, where the specified plugins are not included.

func (*Environment) WithoutOutputs added in v4.32.0

func (e *Environment) WithoutOutputs(names ...string) *Environment

WithoutOutputs returns a copy of Environment with a cloned plugin registry of outputs, where the specified plugins are not included.

func (*Environment) WithoutProcessors added in v4.32.0

func (e *Environment) WithoutProcessors(names ...string) *Environment

WithoutProcessors returns a copy of Environment with a cloned plugin registry of processors, where the specified plugins are not included.

func (*Environment) WithoutRateLimits added in v4.32.0

func (e *Environment) WithoutRateLimits(names ...string) *Environment

WithoutRateLimits returns a copy of Environment with a cloned plugin registry of rate limits, where the specified plugins are not included.

func (*Environment) WithoutScanners added in v4.32.0

func (e *Environment) WithoutScanners(names ...string) *Environment

WithoutScanners returns a copy of Environment with a cloned plugin registry of scanners, where the specified plugins are not included.

func (*Environment) WithoutTracers added in v4.32.0

func (e *Environment) WithoutTracers(names ...string) *Environment

WithoutTracers returns a copy of Environment with a cloned plugin registry of tracers, where the specified plugins are not included.

type InputConstructor

type InputConstructor func(input.Config, NewManagement) (input.Streamed, error)

InputConstructor constructs an input component.

type InputSet

type InputSet struct {
	// contains filtered or unexported fields
}

InputSet contains an explicit set of inputs available to a Benthos service.

func (*InputSet) Add

func (s *InputSet) Add(constructor InputConstructor, spec docs.ComponentSpec) error

Add a new input to this set by providing a constructor and documentation.

func (*InputSet) Docs

func (s *InputSet) Docs() []docs.ComponentSpec

Docs returns a slice of input specs, which document each method.

func (*InputSet) DocsFor

func (s *InputSet) DocsFor(name string) (docs.ComponentSpec, bool)

DocsFor returns the documentation for a given component name, returns a boolean indicating whether the component name exists.

func (*InputSet) Init

func (s *InputSet) Init(conf input.Config, mgr NewManagement) (input.Streamed, error)

Init attempts to initialise an input from a config.

func (*InputSet) Without added in v4.32.0

func (s *InputSet) Without(names ...string) *InputSet

Without creates a clone of the set excluding a variadic list of components.

type MetricConstructor

type MetricConstructor func(conf metrics.Config, nm NewManagement) (metrics.Type, error)

MetricConstructor constructs an metrics component.

type MetricsSet

type MetricsSet struct {
	// contains filtered or unexported fields
}

MetricsSet contains an explicit set of metrics available to a Benthos service.

func (*MetricsSet) Add

func (s *MetricsSet) Add(constructor MetricConstructor, spec docs.ComponentSpec) error

Add a new metrics to this set by providing a spec (name, documentation, and constructor).

func (*MetricsSet) Docs

func (s *MetricsSet) Docs() []docs.ComponentSpec

Docs returns a slice of metrics specs, which document each method.

func (*MetricsSet) DocsFor

func (s *MetricsSet) DocsFor(name string) (docs.ComponentSpec, bool)

DocsFor returns the documentation for a given component name, returns a boolean indicating whether the component name exists.

func (*MetricsSet) Init

Init attempts to initialise an metrics from a config.

func (*MetricsSet) Without added in v4.32.0

func (s *MetricsSet) Without(names ...string) *MetricsSet

Without creates a clone of the set excluding a variadic list of components.

type NewManagement

type NewManagement interface {
	ForStream(id string) NewManagement
	IntoPath(segments ...string) NewManagement
	WithAddedMetrics(m metrics.Type) NewManagement

	EngineVersion() string

	Path() []string
	Label() string

	Metrics() metrics.Type
	Logger() log.Modular
	Tracer() trace.TracerProvider
	FS() ifs.FS
	Environment() *Environment
	BloblEnvironment() *bloblang.Environment

	RegisterEndpoint(path, desc string, h http.HandlerFunc)

	NewBuffer(conf buffer.Config) (buffer.Streamed, error)
	NewCache(conf cache.Config) (cache.V1, error)
	NewInput(conf input.Config) (input.Streamed, error)
	NewProcessor(conf processor.Config) (processor.V1, error)
	NewOutput(conf output.Config, pipelines ...processor.PipelineConstructorFunc) (output.Streamed, error)
	NewRateLimit(conf ratelimit.Config) (ratelimit.V1, error)
	NewScanner(conf scanner.Config) (scanner.Creator, error)

	ProbeCache(name string) bool
	AccessCache(ctx context.Context, name string, fn func(cache.V1)) error
	StoreCache(ctx context.Context, name string, conf cache.Config) error
	RemoveCache(ctx context.Context, name string) error

	ProbeInput(name string) bool
	AccessInput(ctx context.Context, name string, fn func(input.Streamed)) error
	StoreInput(ctx context.Context, name string, conf input.Config) error
	RemoveInput(ctx context.Context, name string) error

	ProbeProcessor(name string) bool
	AccessProcessor(ctx context.Context, name string, fn func(processor.V1)) error
	StoreProcessor(ctx context.Context, name string, conf processor.Config) error
	RemoveProcessor(ctx context.Context, name string) error

	ProbeOutput(name string) bool
	AccessOutput(ctx context.Context, name string, fn func(output.Sync)) error
	StoreOutput(ctx context.Context, name string, conf output.Config) error
	RemoveOutput(ctx context.Context, name string) error

	ProbeRateLimit(name string) bool
	AccessRateLimit(ctx context.Context, name string, fn func(ratelimit.V1)) error
	StoreRateLimit(ctx context.Context, name string, conf ratelimit.Config) error
	RemoveRateLimit(ctx context.Context, name string) error

	GetPipe(name string) (<-chan message.Transaction, error)
	SetPipe(name string, t <-chan message.Transaction)
	UnsetPipe(name string, t <-chan message.Transaction)

	GetGeneric(key any) (any, bool)
	GetOrSetGeneric(key, value any) (actual any, loaded bool)
	SetGeneric(key, value any)
}

NewManagement defines the latest API for a Benthos manager, which will become the only API (internally) in Benthos V4.

type OutputConstructor

OutputConstructor constructs an output component.

type OutputSet

type OutputSet struct {
	// contains filtered or unexported fields
}

OutputSet contains an explicit set of outputs available to a Benthos service.

func (*OutputSet) Add

func (s *OutputSet) Add(constructor OutputConstructor, spec docs.ComponentSpec) error

Add a new output to this set by providing a spec (name, documentation, and constructor).

func (*OutputSet) Docs

func (s *OutputSet) Docs() []docs.ComponentSpec

Docs returns a slice of output specs, which document each method.

func (*OutputSet) DocsFor

func (s *OutputSet) DocsFor(name string) (docs.ComponentSpec, bool)

DocsFor returns the documentation for a given component name, returns a boolean indicating whether the component name exists.

func (*OutputSet) Init

Init attempts to initialise an output from a config.

func (*OutputSet) Without added in v4.32.0

func (s *OutputSet) Without(names ...string) *OutputSet

Without creates a clone of the set excluding a variadic list of components.

type ProcessorConstructor

type ProcessorConstructor func(conf processor.Config, mgr NewManagement) (processor.V1, error)

ProcessorConstructor constructs an processor component.

type ProcessorSet

type ProcessorSet struct {
	// contains filtered or unexported fields
}

ProcessorSet contains an explicit set of processors available to a Benthos service.

func (*ProcessorSet) Add

func (s *ProcessorSet) Add(constructor ProcessorConstructor, spec docs.ComponentSpec) error

Add a new processor to this set by providing a spec (name, documentation, and constructor).

func (*ProcessorSet) Docs

func (s *ProcessorSet) Docs() []docs.ComponentSpec

Docs returns a slice of processor specs, which document each method.

func (*ProcessorSet) DocsFor

func (s *ProcessorSet) DocsFor(name string) (docs.ComponentSpec, bool)

DocsFor returns the documentation for a given component name, returns a boolean indicating whether the component name exists.

func (*ProcessorSet) Init

Init attempts to initialise an processor from a config.

func (*ProcessorSet) Without added in v4.32.0

func (s *ProcessorSet) Without(names ...string) *ProcessorSet

Without creates a clone of the set excluding a variadic list of components.

type RateLimitConstructor

type RateLimitConstructor func(ratelimit.Config, NewManagement) (ratelimit.V1, error)

RateLimitConstructor constructs an ratelimit component.

type RateLimitSet

type RateLimitSet struct {
	// contains filtered or unexported fields
}

RateLimitSet contains an explicit set of ratelimits available to a Benthos service.

func (*RateLimitSet) Add

func (s *RateLimitSet) Add(constructor RateLimitConstructor, spec docs.ComponentSpec) error

Add a new ratelimit to this set by providing a spec (name, documentation, and constructor).

func (*RateLimitSet) Docs

func (s *RateLimitSet) Docs() []docs.ComponentSpec

Docs returns a slice of ratelimit specs, which document each method.

func (*RateLimitSet) DocsFor

func (s *RateLimitSet) DocsFor(name string) (docs.ComponentSpec, bool)

DocsFor returns the documentation for a given component name, returns a boolean indicating whether the component name exists.

func (*RateLimitSet) Init

Init attempts to initialise an ratelimit from a config.

func (*RateLimitSet) Without added in v4.32.0

func (s *RateLimitSet) Without(names ...string) *RateLimitSet

Without creates a clone of the set excluding a variadic list of components.

type ScannerConstructor

type ScannerConstructor func(scanner.Config, NewManagement) (scanner.Creator, error)

ScannerConstructor constructs a scanner component.

type ScannerSet

type ScannerSet struct {
	// contains filtered or unexported fields
}

ScannerSet contains an explicit set of scanners available to a Benthos service.

func (*ScannerSet) Add

func (s *ScannerSet) Add(constructor ScannerConstructor, spec docs.ComponentSpec) error

Add a new scanner to this set by providing a spec (name, documentation, and constructor).

func (*ScannerSet) Docs

func (s *ScannerSet) Docs() []docs.ComponentSpec

Docs returns a slice of scanner specs, which document each method.

func (*ScannerSet) DocsFor

func (s *ScannerSet) DocsFor(name string) (docs.ComponentSpec, bool)

DocsFor returns the documentation for a given component name, returns a boolean indicating whether the component name exists.

func (*ScannerSet) Init

Init attempts to initialise a scanner from a config.

func (*ScannerSet) Without added in v4.32.0

func (s *ScannerSet) Without(names ...string) *ScannerSet

Without creates a clone of the set excluding a variadic list of components.

type TracerConstructor

type TracerConstructor func(tracer.Config, NewManagement) (trace.TracerProvider, error)

TracerConstructor constructs an tracer component.

type TracerSet

type TracerSet struct {
	// contains filtered or unexported fields
}

TracerSet contains an explicit set of tracers available to a Benthos service.

func (*TracerSet) Add

func (s *TracerSet) Add(constructor TracerConstructor, spec docs.ComponentSpec) error

Add a new tracer to this set by providing a spec (name, documentation, and constructor).

func (*TracerSet) Docs

func (s *TracerSet) Docs() []docs.ComponentSpec

Docs returns a slice of tracer specs, which document each method.

func (*TracerSet) DocsFor

func (s *TracerSet) DocsFor(name string) (docs.ComponentSpec, bool)

DocsFor returns the documentation for a given component name, returns a boolean indicating whether the component name exists.

func (*TracerSet) Init

Init attempts to initialise an tracer from a config.

func (*TracerSet) Without added in v4.32.0

func (s *TracerSet) Without(names ...string) *TracerSet

Without creates a clone of the set excluding a variadic list of components.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL