mock

package
v4.42.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2024 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Cache

type Cache struct {
	Values map[string]CacheItem
}

Cache provides a mock cache implementation.

func (*Cache) Add

func (c *Cache) Add(ctx context.Context, key string, value []byte, ttl *time.Duration) error

Add a mock cache item.

func (*Cache) Close

func (c *Cache) Close(ctx context.Context) error

Close does nothing.

func (*Cache) Delete

func (c *Cache) Delete(ctx context.Context, key string) error

Delete a mock cache item.

func (*Cache) Get

func (c *Cache) Get(ctx context.Context, key string) ([]byte, error)

Get a mock cache item.

func (*Cache) Set

func (c *Cache) Set(ctx context.Context, key string, value []byte, ttl *time.Duration) error

Set a mock cache item.

func (*Cache) SetMulti

func (c *Cache) SetMulti(ctx context.Context, kvs map[string]cache.TTLItem) error

SetMulti sets multiple mock cache items.

type CacheItem

type CacheItem struct {
	Value string
	TTL   *time.Duration
}

CacheItem represents a cached value/ttl pair.

type Input

type Input struct {
	TChan chan message.Transaction
	// contains filtered or unexported fields
}

Input provides a mocked input implementation.

func NewInput

func NewInput(batches []message.Batch) *Input

NewInput creates a new mock input that will return transactions containing a list of batches, then exit.

func (*Input) ConnectionStatus added in v4.31.0

func (f *Input) ConnectionStatus() component.ConnectionStatuses

ConnectionStatus returns the current connection activity.

func (*Input) TransactionChan

func (f *Input) TransactionChan() <-chan message.Transaction

TransactionChan returns a transaction channel.

func (*Input) TriggerCloseNow

func (f *Input) TriggerCloseNow()

TriggerCloseNow closes the input transaction channel.

func (*Input) TriggerStopConsuming

func (f *Input) TriggerStopConsuming()

TriggerStopConsuming closes the input transaction channel.

func (*Input) WaitForClose

func (f *Input) WaitForClose(ctx context.Context) error

WaitForClose does nothing.

type Manager

type Manager struct {
	Version string

	Inputs     map[string]*Input
	Caches     map[string]map[string]CacheItem
	RateLimits map[string]RateLimit
	Outputs    map[string]OutputWriter
	Processors map[string]Processor
	Pipes      map[string]<-chan message.Transaction

	// OnRegisterEndpoint can be set in order to intercept endpoints registered
	// by components.
	OnRegisterEndpoint func(path string, h http.HandlerFunc)

	CustomFS ifs.FS
	M        metrics.Type
	L        log.Modular
	T        trace.TracerProvider
	// contains filtered or unexported fields
}

Manager provides a mock benthos manager that components can use to test interactions with fake resources.

func NewManager

func NewManager() *Manager

NewManager provides a new mock manager.

func (*Manager) AccessCache

func (m *Manager) AccessCache(ctx context.Context, name string, fn func(cache.V1)) error

AccessCache executes a closure on a cache resource.

func (*Manager) AccessInput

func (m *Manager) AccessInput(ctx context.Context, name string, fn func(input.Streamed)) error

AccessInput executes a closure on an input resource.

func (*Manager) AccessOutput

func (m *Manager) AccessOutput(ctx context.Context, name string, fn func(output.Sync)) error

AccessOutput executes a closure on an output resource.

func (*Manager) AccessProcessor

func (m *Manager) AccessProcessor(ctx context.Context, name string, fn func(processor.V1)) error

AccessProcessor executes a closure on a processor resource.

func (*Manager) AccessRateLimit

func (m *Manager) AccessRateLimit(ctx context.Context, name string, fn func(ratelimit.V1)) error

AccessRateLimit executes a closure on a rate limit resource.

func (*Manager) BloblEnvironment

func (m *Manager) BloblEnvironment() *bloblang.Environment

BloblEnvironment always returns the global environment.

func (*Manager) EngineVersion

func (m *Manager) EngineVersion() string

EngineVersion returns the version stamp associated with the underlying benthos engine.

func (*Manager) Environment

func (m *Manager) Environment() *bundle.Environment

Environment always returns the global environment.

func (*Manager) FS

func (m *Manager) FS() ifs.FS

FS returns CustomFS, which wraps the os package unless overridden.

func (*Manager) ForStream

func (m *Manager) ForStream(id string) bundle.NewManagement

ForStream returns the same mock manager.

func (*Manager) GetGeneric added in v4.31.0

func (m *Manager) GetGeneric(key any) (any, bool)

GetGeneric attempts to obtain and return a generic resource value by key.

func (*Manager) GetOrSetGeneric added in v4.31.0

func (m *Manager) GetOrSetGeneric(key, value any) (actual any, loaded bool)

GetOrSetGeneric attempts to obtain an existing value for a given key if present. Otherwise, it stores and returns the provided value. The loaded result is true if the value was loaded, false if stored.

func (*Manager) GetPipe

func (m *Manager) GetPipe(name string) (<-chan message.Transaction, error)

GetPipe attempts to find a service wide transaction chan by its name.

func (*Manager) IntoPath

func (m *Manager) IntoPath(segments ...string) bundle.NewManagement

IntoPath returns the same mock manager.

func (*Manager) Label

func (m *Manager) Label() string

Label always returns empty.

func (*Manager) Logger

func (m *Manager) Logger() log.Modular

Logger returns a no-op logger.

func (*Manager) Metrics

func (m *Manager) Metrics() metrics.Type

Metrics returns a no-op metrics implementation.

func (*Manager) NewBuffer

func (m *Manager) NewBuffer(conf buffer.Config) (buffer.Streamed, error)

NewBuffer always errors on invalid type.

func (*Manager) NewCache

func (m *Manager) NewCache(conf cache.Config) (cache.V1, error)

NewCache always errors on invalid type.

func (*Manager) NewInput

func (m *Manager) NewInput(conf input.Config) (input.Streamed, error)

NewInput always errors on invalid type.

func (*Manager) NewOutput

func (m *Manager) NewOutput(conf output.Config, pipelines ...processor.PipelineConstructorFunc) (output.Streamed, error)

NewOutput always errors on invalid type.

func (*Manager) NewProcessor

func (m *Manager) NewProcessor(conf processor.Config) (processor.V1, error)

NewProcessor always errors on invalid type.

func (*Manager) NewRateLimit

func (m *Manager) NewRateLimit(conf ratelimit.Config) (ratelimit.V1, error)

NewRateLimit always errors on invalid type.

func (*Manager) NewScanner

func (m *Manager) NewScanner(conf scanner.Config) (scanner.Creator, error)

NewScanner attempts to create a new scanner component from a config.

func (*Manager) Path

func (m *Manager) Path() []string

Path always returns empty.

func (*Manager) ProbeCache

func (m *Manager) ProbeCache(name string) bool

ProbeCache returns true if a cache resource exists under the provided name.

func (*Manager) ProbeInput

func (m *Manager) ProbeInput(name string) bool

ProbeInput returns true if an input resource exists under the provided name.

func (*Manager) ProbeOutput

func (m *Manager) ProbeOutput(name string) bool

ProbeOutput returns true if an output resource exists under the provided name.

func (*Manager) ProbeProcessor

func (m *Manager) ProbeProcessor(name string) bool

ProbeProcessor returns true if a processor resource exists under the provided name.

func (*Manager) ProbeRateLimit

func (m *Manager) ProbeRateLimit(name string) bool

ProbeRateLimit returns true if a rate limit resource exists under the provided name.

func (*Manager) RegisterEndpoint

func (m *Manager) RegisterEndpoint(path, desc string, h http.HandlerFunc)

RegisterEndpoint registers a server wide HTTP endpoint.

func (*Manager) RemoveCache

func (m *Manager) RemoveCache(ctx context.Context, name string) error

RemoveCache removes a cache resource.

func (*Manager) RemoveInput

func (m *Manager) RemoveInput(ctx context.Context, name string) error

RemoveInput removes an input resource.

func (*Manager) RemoveOutput

func (m *Manager) RemoveOutput(ctx context.Context, name string) error

RemoveOutput removes an output resource.

func (*Manager) RemoveProcessor

func (m *Manager) RemoveProcessor(ctx context.Context, name string) error

RemoveProcessor removes a processor resource.

func (*Manager) RemoveRateLimit

func (m *Manager) RemoveRateLimit(ctx context.Context, name string) error

RemoveRateLimit removes a rate limit resource.

func (*Manager) SetGeneric added in v4.31.0

func (m *Manager) SetGeneric(key, value any)

SetGeneric attempts to set a generic resource to a given value by key.

func (*Manager) SetPipe

func (m *Manager) SetPipe(name string, t <-chan message.Transaction)

SetPipe registers a transaction chan under a name.

func (*Manager) StoreCache

func (m *Manager) StoreCache(ctx context.Context, name string, conf cache.Config) error

StoreCache always errors on invalid type.

func (*Manager) StoreInput

func (m *Manager) StoreInput(ctx context.Context, name string, conf input.Config) error

StoreInput always errors on invalid type.

func (*Manager) StoreOutput

func (m *Manager) StoreOutput(ctx context.Context, name string, conf output.Config) error

StoreOutput always errors on invalid type.

func (*Manager) StoreProcessor

func (m *Manager) StoreProcessor(ctx context.Context, name string, conf processor.Config) error

StoreProcessor always errors on invalid type.

func (*Manager) StoreRateLimit

func (m *Manager) StoreRateLimit(ctx context.Context, name string, conf ratelimit.Config) error

StoreRateLimit always errors on invalid type.

func (*Manager) Tracer

func (m *Manager) Tracer() trace.TracerProvider

Tracer returns a no-op tracer.

func (*Manager) UnsetPipe

func (m *Manager) UnsetPipe(name string, t <-chan message.Transaction)

UnsetPipe removes a named transaction chan.

func (*Manager) WithAddedMetrics

func (m *Manager) WithAddedMetrics(m2 metrics.Type) bundle.NewManagement

WithAddedMetrics returns the same mock manager.

type OutputChanneled

type OutputChanneled struct {
	TChan <-chan message.Transaction
}

OutputChanneled implements the output.Type interface around an exported transaction channel.

func (*OutputChanneled) ConnectionStatus added in v4.31.0

func (m *OutputChanneled) ConnectionStatus() component.ConnectionStatuses

ConnectionStatus returns the current status of the given component connection. The result is a slice in order to accommodate higher order components that wrap several others.

func (*OutputChanneled) Consume

func (m *OutputChanneled) Consume(msgs <-chan message.Transaction) error

Consume sets the read channel. This implementation is NOT thread safe.

func (*OutputChanneled) TriggerCloseNow

func (m *OutputChanneled) TriggerCloseNow()

TriggerCloseNow does nothing.

func (OutputChanneled) WaitForClose

func (m OutputChanneled) WaitForClose(ctx context.Context) error

WaitForClose does nothing.

type OutputWriter

type OutputWriter func(context.Context, message.Transaction) error

OutputWriter provides a mock implementation of types.OutputWriter.

func (OutputWriter) ConnectionStatus added in v4.31.0

func (o OutputWriter) ConnectionStatus() component.ConnectionStatuses

ConnectionStatus returns the current status of the given component connection. The result is a slice in order to accommodate higher order components that wrap several others.

func (OutputWriter) TriggerCloseNow

func (o OutputWriter) TriggerCloseNow()

TriggerCloseNow does nothing.

func (OutputWriter) TriggerStopConsuming

func (o OutputWriter) TriggerStopConsuming()

TriggerStopConsuming does nothing.

func (OutputWriter) WaitForClose

func (o OutputWriter) WaitForClose(ctx context.Context) error

WaitForClose does nothing.

func (OutputWriter) WriteTransaction

func (o OutputWriter) WriteTransaction(ctx context.Context, t message.Transaction) error

WriteTransaction attempts to write a transaction to an output.

type Processor

type Processor func(message.Batch) ([]message.Batch, error)

Processor provides a mock processor implementation around a closure.

func (Processor) Close

func (p Processor) Close(context.Context) error

Close does nothing.

func (Processor) ProcessBatch

func (p Processor) ProcessBatch(ctx context.Context, b message.Batch) ([]message.Batch, error)

ProcessBatch returns the closure result executed on a batch.

type RateLimit

type RateLimit func(context.Context) (time.Duration, error)

RateLimit provides a mock rate limit implementation around a closure.

func (RateLimit) Access

func (r RateLimit) Access(ctx context.Context) (time.Duration, error)

Access the rate limit.

func (RateLimit) Close

func (r RateLimit) Close(ctx context.Context) error

Close does nothing.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL