Documentation ¶
Index ¶
- type Cache
- func (c *Cache) Add(ctx context.Context, key string, value []byte, ttl *time.Duration) error
- func (c *Cache) Close(ctx context.Context) error
- func (c *Cache) Delete(ctx context.Context, key string) error
- func (c *Cache) Get(ctx context.Context, key string) ([]byte, error)
- func (c *Cache) Set(ctx context.Context, key string, value []byte, ttl *time.Duration) error
- func (c *Cache) SetMulti(ctx context.Context, kvs map[string]cache.TTLItem) error
- type CacheItem
- type Input
- type Manager
- func (m *Manager) AccessCache(ctx context.Context, name string, fn func(cache.V1)) error
- func (m *Manager) AccessInput(ctx context.Context, name string, fn func(input.Streamed)) error
- func (m *Manager) AccessOutput(ctx context.Context, name string, fn func(output.Sync)) error
- func (m *Manager) AccessProcessor(ctx context.Context, name string, fn func(processor.V1)) error
- func (m *Manager) AccessRateLimit(ctx context.Context, name string, fn func(ratelimit.V1)) error
- func (m *Manager) BloblEnvironment() *bloblang.Environment
- func (m *Manager) EngineVersion() string
- func (m *Manager) Environment() *bundle.Environment
- func (m *Manager) FS() ifs.FS
- func (m *Manager) ForStream(id string) bundle.NewManagement
- func (m *Manager) GetGeneric(key any) (any, bool)
- func (m *Manager) GetOrSetGeneric(key, value any) (actual any, loaded bool)
- func (m *Manager) GetPipe(name string) (<-chan message.Transaction, error)
- func (m *Manager) IntoPath(segments ...string) bundle.NewManagement
- func (m *Manager) Label() string
- func (m *Manager) Logger() log.Modular
- func (m *Manager) Metrics() metrics.Type
- func (m *Manager) NewBuffer(conf buffer.Config) (buffer.Streamed, error)
- func (m *Manager) NewCache(conf cache.Config) (cache.V1, error)
- func (m *Manager) NewInput(conf input.Config) (input.Streamed, error)
- func (m *Manager) NewOutput(conf output.Config, pipelines ...processor.PipelineConstructorFunc) (output.Streamed, error)
- func (m *Manager) NewProcessor(conf processor.Config) (processor.V1, error)
- func (m *Manager) NewRateLimit(conf ratelimit.Config) (ratelimit.V1, error)
- func (m *Manager) NewScanner(conf scanner.Config) (scanner.Creator, error)
- func (m *Manager) Path() []string
- func (m *Manager) ProbeCache(name string) bool
- func (m *Manager) ProbeInput(name string) bool
- func (m *Manager) ProbeOutput(name string) bool
- func (m *Manager) ProbeProcessor(name string) bool
- func (m *Manager) ProbeRateLimit(name string) bool
- func (m *Manager) RegisterEndpoint(path, desc string, h http.HandlerFunc)
- func (m *Manager) RemoveCache(ctx context.Context, name string) error
- func (m *Manager) RemoveInput(ctx context.Context, name string) error
- func (m *Manager) RemoveOutput(ctx context.Context, name string) error
- func (m *Manager) RemoveProcessor(ctx context.Context, name string) error
- func (m *Manager) RemoveRateLimit(ctx context.Context, name string) error
- func (m *Manager) SetGeneric(key, value any)
- func (m *Manager) SetPipe(name string, t <-chan message.Transaction)
- func (m *Manager) StoreCache(ctx context.Context, name string, conf cache.Config) error
- func (m *Manager) StoreInput(ctx context.Context, name string, conf input.Config) error
- func (m *Manager) StoreOutput(ctx context.Context, name string, conf output.Config) error
- func (m *Manager) StoreProcessor(ctx context.Context, name string, conf processor.Config) error
- func (m *Manager) StoreRateLimit(ctx context.Context, name string, conf ratelimit.Config) error
- func (m *Manager) Tracer() trace.TracerProvider
- func (m *Manager) UnsetPipe(name string, t <-chan message.Transaction)
- func (m *Manager) WithAddedMetrics(m2 metrics.Type) bundle.NewManagement
- type OutputChanneled
- type OutputWriter
- func (o OutputWriter) ConnectionStatus() component.ConnectionStatuses
- func (o OutputWriter) TriggerCloseNow()
- func (o OutputWriter) TriggerStopConsuming()
- func (o OutputWriter) WaitForClose(ctx context.Context) error
- func (o OutputWriter) WriteTransaction(ctx context.Context, t message.Transaction) error
- type Processor
- type RateLimit
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Cache ¶
Cache provides a mock cache implementation.
type Input ¶
type Input struct { TChan chan message.Transaction // contains filtered or unexported fields }
Input provides a mocked input implementation.
func NewInput ¶
NewInput creates a new mock input that will return transactions containing a list of batches, then exit.
func (*Input) ConnectionStatus ¶ added in v4.31.0
func (f *Input) ConnectionStatus() component.ConnectionStatuses
ConnectionStatus returns the current connection activity.
func (*Input) TransactionChan ¶
func (f *Input) TransactionChan() <-chan message.Transaction
TransactionChan returns a transaction channel.
func (*Input) TriggerCloseNow ¶
func (f *Input) TriggerCloseNow()
TriggerCloseNow closes the input transaction channel.
func (*Input) TriggerStopConsuming ¶
func (f *Input) TriggerStopConsuming()
TriggerStopConsuming closes the input transaction channel.
type Manager ¶
type Manager struct { Version string Inputs map[string]*Input Caches map[string]map[string]CacheItem RateLimits map[string]RateLimit Outputs map[string]OutputWriter Processors map[string]Processor Pipes map[string]<-chan message.Transaction // OnRegisterEndpoint can be set in order to intercept endpoints registered // by components. OnRegisterEndpoint func(path string, h http.HandlerFunc) CustomFS ifs.FS M metrics.Type L log.Modular T trace.TracerProvider // contains filtered or unexported fields }
Manager provides a mock benthos manager that components can use to test interactions with fake resources.
func (*Manager) AccessCache ¶
AccessCache executes a closure on a cache resource.
func (*Manager) AccessInput ¶
AccessInput executes a closure on an input resource.
func (*Manager) AccessOutput ¶
AccessOutput executes a closure on an output resource.
func (*Manager) AccessProcessor ¶
AccessProcessor executes a closure on a processor resource.
func (*Manager) AccessRateLimit ¶
AccessRateLimit executes a closure on a rate limit resource.
func (*Manager) BloblEnvironment ¶
func (m *Manager) BloblEnvironment() *bloblang.Environment
BloblEnvironment always returns the global environment.
func (*Manager) EngineVersion ¶
EngineVersion returns the version stamp associated with the underlying benthos engine.
func (*Manager) Environment ¶
func (m *Manager) Environment() *bundle.Environment
Environment always returns the global environment.
func (*Manager) ForStream ¶
func (m *Manager) ForStream(id string) bundle.NewManagement
ForStream returns the same mock manager.
func (*Manager) GetGeneric ¶ added in v4.31.0
GetGeneric attempts to obtain and return a generic resource value by key.
func (*Manager) GetOrSetGeneric ¶ added in v4.31.0
GetOrSetGeneric attempts to obtain an existing value for a given key if present. Otherwise, it stores and returns the provided value. The loaded result is true if the value was loaded, false if stored.
func (*Manager) GetPipe ¶
func (m *Manager) GetPipe(name string) (<-chan message.Transaction, error)
GetPipe attempts to find a service wide transaction chan by its name.
func (*Manager) IntoPath ¶
func (m *Manager) IntoPath(segments ...string) bundle.NewManagement
IntoPath returns the same mock manager.
func (*Manager) NewOutput ¶
func (m *Manager) NewOutput(conf output.Config, pipelines ...processor.PipelineConstructorFunc) (output.Streamed, error)
NewOutput always errors on invalid type.
func (*Manager) NewProcessor ¶
NewProcessor always errors on invalid type.
func (*Manager) NewRateLimit ¶
NewRateLimit always errors on invalid type.
func (*Manager) NewScanner ¶
NewScanner attempts to create a new scanner component from a config.
func (*Manager) ProbeCache ¶
ProbeCache returns true if a cache resource exists under the provided name.
func (*Manager) ProbeInput ¶
ProbeInput returns true if an input resource exists under the provided name.
func (*Manager) ProbeOutput ¶
ProbeOutput returns true if an output resource exists under the provided name.
func (*Manager) ProbeProcessor ¶
ProbeProcessor returns true if a processor resource exists under the provided name.
func (*Manager) ProbeRateLimit ¶
ProbeRateLimit returns true if a rate limit resource exists under the provided name.
func (*Manager) RegisterEndpoint ¶
func (m *Manager) RegisterEndpoint(path, desc string, h http.HandlerFunc)
RegisterEndpoint registers a server wide HTTP endpoint.
func (*Manager) RemoveCache ¶
RemoveCache removes a resource.
func (*Manager) RemoveInput ¶
RemoveInput removes a resource.
func (*Manager) RemoveOutput ¶
RemoveOutput removes an output resource.
func (*Manager) RemoveProcessor ¶
RemoveProcessor removes a resource.
func (*Manager) RemoveRateLimit ¶
RemoveRateLimit removes a resource.
func (*Manager) SetGeneric ¶ added in v4.31.0
SetGeneric attempts to set a generic resource to a given value by key.
func (*Manager) SetPipe ¶
func (m *Manager) SetPipe(name string, t <-chan message.Transaction)
SetPipe registers a transaction chan under a name.
func (*Manager) StoreCache ¶
StoreCache always errors on invalid type.
func (*Manager) StoreInput ¶
StoreInput always errors on invalid type.
func (*Manager) StoreOutput ¶
StoreOutput always errors on invalid type.
func (*Manager) StoreProcessor ¶
StoreProcessor always errors on invalid type.
func (*Manager) StoreRateLimit ¶
StoreRateLimit always errors on invalid type.
func (*Manager) Tracer ¶
func (m *Manager) Tracer() trace.TracerProvider
Tracer returns a no-op tracer.
func (*Manager) UnsetPipe ¶
func (m *Manager) UnsetPipe(name string, t <-chan message.Transaction)
UnsetPipe removes a named transaction chan.
func (*Manager) WithAddedMetrics ¶
func (m *Manager) WithAddedMetrics(m2 metrics.Type) bundle.NewManagement
WithAddedMetrics returns the same mock manager.
type OutputChanneled ¶
type OutputChanneled struct {
TChan <-chan message.Transaction
}
OutputChanneled implements the output.Type interface around an exported transaction channel.
func (*OutputChanneled) ConnectionStatus ¶ added in v4.31.0
func (m *OutputChanneled) ConnectionStatus() component.ConnectionStatuses
ConnectionStatus returns the current status of the given component connection. The result is a slice in order to accommodate higher order components that wrap several others.
func (*OutputChanneled) Consume ¶
func (m *OutputChanneled) Consume(msgs <-chan message.Transaction) error
Consume sets the read channel. This implementation is NOT thread safe.
func (*OutputChanneled) TriggerCloseNow ¶
func (m *OutputChanneled) TriggerCloseNow()
TriggerCloseNow does nothing.
func (OutputChanneled) WaitForClose ¶
func (m OutputChanneled) WaitForClose(ctx context.Context) error
WaitForClose does nothing.
type OutputWriter ¶
type OutputWriter func(context.Context, message.Transaction) error
OutputWriter provides a mock implementation of types.OutputWriter.
func (OutputWriter) ConnectionStatus ¶ added in v4.31.0
func (o OutputWriter) ConnectionStatus() component.ConnectionStatuses
ConnectionStatus returns the current status of the given component connection. The result is a slice in order to accommodate higher order components that wrap several others.
func (OutputWriter) TriggerCloseNow ¶
func (o OutputWriter) TriggerCloseNow()
TriggerCloseNow does nothing.
func (OutputWriter) TriggerStopConsuming ¶
func (o OutputWriter) TriggerStopConsuming()
TriggerStopConsuming does nothing.
func (OutputWriter) WaitForClose ¶
func (o OutputWriter) WaitForClose(ctx context.Context) error
WaitForClose does nothing.
func (OutputWriter) WriteTransaction ¶
func (o OutputWriter) WriteTransaction(ctx context.Context, t message.Transaction) error
WriteTransaction attempts to write a transaction to an output.