Documentation ¶
Index ¶
- func LintYAMLBytes(lintConf docs.LintConfig, rawBytes []byte) ([]docs.Lint, error)
- func ShouldReread(err error) bool
- func Spec() docs.FieldSpecs
- func SpecWithoutStream(spec docs.FieldSpecs) docs.FieldSpecs
- type ErrMissingEnvVars
- type ErrNoReread
- type MainUpdateFunc
- type OptFunc
- func OptAddOverrides(overrides ...string) OptFunc
- func OptSetFullSpec(spec func() docs.FieldSpecs) OptFunc
- func OptSetLintConfig(lConf docs.LintConfig) OptFunc
- func OptSetStreamPaths(streamsPaths ...string) OptFunc
- func OptTestSuffix(suffix string) OptFunc
- func OptUseEnvLookupFunc(fn func(context.Context, string) (string, bool)) OptFunc
- func OptUseFS(fs ifs.FS) OptFunc
- type Reader
- func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool) error
- func (r *Reader) Close(ctx context.Context) error
- func (r *Reader) Read() (conf Type, pConf *docs.ParsedConfig, lints []string, err error)
- func (r *Reader) ReadFileEnvSwap(ctx context.Context, path string) (configBytes []byte, lints []docs.Lint, modTime time.Time, err error)
- func (r *Reader) ReadStreams(confs map[string]stream.Config) (lints []string, err error)
- func (r *Reader) ReadYAMLFileLinted(ctx context.Context, spec docs.FieldSpecs, path string, skipEnvVarCheck bool, ...) (Type, []docs.Lint, error)
- func (r *Reader) ReplaceEnvVariables(ctx context.Context, inBytes []byte) (replaced []byte, err error)
- func (r *Reader) SubscribeConfigChanges(fn MainUpdateFunc) error
- func (r *Reader) SubscribeStreamChanges(fn StreamUpdateFunc) error
- func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPath string) error
- func (r *Reader) TriggerResourceDelete(mgr bundle.NewManagement, path string) error
- func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, path string) error
- func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path string) error
- type StreamUpdateFunc
- type Type
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func LintYAMLBytes ¶
LintYAMLBytes attempts to report errors within a user config. Returns a slice of lint results.
func ShouldReread ¶
ShouldReread returns true if the error returned from an update trigger is non nil and also temporal, and therefore it is worth trying the update again even if the content has not changed.
func Spec ¶
func Spec() docs.FieldSpecs
Spec returns a docs.FieldSpec for an entire Benthos configuration.
func SpecWithoutStream ¶
func SpecWithoutStream(spec docs.FieldSpecs) docs.FieldSpecs
SpecWithoutStream describes a stream config without the core stream fields.
Types ¶
type ErrMissingEnvVars ¶
type ErrMissingEnvVars struct { Variables []string // Our best attempt at parsing the config that's missing variables by simply // inserting an empty string. There's a good chance this is still a valid // config! :) BestAttempt []byte }
ErrMissingEnvVars is returned when attempting environment variable interpolations where the referenced environment variables are missing.
func (*ErrMissingEnvVars) Error ¶
func (e *ErrMissingEnvVars) Error() string
Error returns a rather sweet error message.
type ErrNoReread ¶
type ErrNoReread struct {
// contains filtered or unexported fields
}
ErrNoReread is an error type returned from update triggers that indicates an attempt should not be re-made unless the source file has been modified.
func (*ErrNoReread) Error ¶
func (e *ErrNoReread) Error() string
Error returns a human readable error string.
type MainUpdateFunc ¶
MainUpdateFunc is a closure function called whenever a main config has been updated. If an error is returned then the attempt will be made again after a grace period.
type OptFunc ¶
type OptFunc func(*Reader)
OptFunc is an opt function that changes the behaviour of a config reader.
func OptAddOverrides ¶
OptAddOverrides adds one or more override expressions to the config reader, each of the form `path=value`.
func OptSetFullSpec ¶
func OptSetFullSpec(spec func() docs.FieldSpecs) OptFunc
OptSetFullSpec overrides the default general config spec with the provided one.
func OptSetLintConfig ¶
func OptSetLintConfig(lConf docs.LintConfig) OptFunc
OptSetLintConfig sets the config used for linting files.
func OptSetStreamPaths ¶
OptSetStreamPaths marks this config reader as operating in streams mode, and adds a list of paths to obtain individual stream configs from.
func OptTestSuffix ¶
OptTestSuffix configures the suffix given to unit test definition files, this is used in order to exclude unit tests from being run in streams mode with arbitrary directory walking.
func OptUseEnvLookupFunc ¶ added in v4.31.0
OptUseEnvLookupFunc overrides the default environment variable lookup func.
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader provides utilities for parsing a Benthos config as a main file with a collection of resource files, and options such as overrides.
func (*Reader) BeginFileWatching ¶
func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool) error
BeginFileWatching creates a goroutine that watches all active configuration files for changes. If a resource is changed then it is swapped out automatically through the provided manager. If a main config or stream config changes then the closures registered with either SubscribeConfigChanges or SubscribeStreamChanges will be called.
WARNING: Either SubscribeConfigChanges or SubscribeStreamChanges must be called before this, as otherwise it is unsafe to register them during watching.
func (*Reader) ReadFileEnvSwap ¶ added in v4.31.0
func (r *Reader) ReadFileEnvSwap(ctx context.Context, path string) (configBytes []byte, lints []docs.Lint, modTime time.Time, err error)
ReadFileEnvSwap reads a file and replaces any environment variable interpolations before returning the contents. Linting errors are returned if the file has an unexpected higher level format, such as invalid utf-8 encoding.
An modTime timestamp is returned if the modtime of the file is available.
func (*Reader) ReadStreams ¶
ReadStreams attempts to read Benthos stream configs from one or more paths. Stream configs are extracted and added to a provided map, where the id is derived from the path of the stream config file.
func (*Reader) ReadYAMLFileLinted ¶ added in v4.31.0
func (r *Reader) ReadYAMLFileLinted(ctx context.Context, spec docs.FieldSpecs, path string, skipEnvVarCheck bool, lConf docs.LintConfig) (Type, []docs.Lint, error)
ReadYAMLFileLinted will attempt to read a configuration file path into a structure. Returns an array of lint messages or an error.
func (*Reader) ReplaceEnvVariables ¶ added in v4.31.0
func (r *Reader) ReplaceEnvVariables(ctx context.Context, inBytes []byte) (replaced []byte, err error)
ReplaceEnvVariables will search a blob of data for the pattern `${FOO:bar}`, where `FOO` is an environment variable name and `bar` is a default value. The `bar` section (including the colon) can be left out if there is no appropriate default value for the field.
For each aforementioned pattern found in the blob the contents of the respective environment variable will be read and will replace the pattern. If the environment variable is empty or does not exist then either the default value is used or the field will be left empty.
func (*Reader) SubscribeConfigChanges ¶
func (r *Reader) SubscribeConfigChanges(fn MainUpdateFunc) error
SubscribeConfigChanges registers a closure function to be called whenever the main configuration file is updated.
The provided closure should return true if the stream was successfully replaced.
func (*Reader) SubscribeStreamChanges ¶
func (r *Reader) SubscribeStreamChanges(fn StreamUpdateFunc) error
SubscribeStreamChanges registers a closure to be called whenever the configuration of a stream is updated.
The provided closure should return true if the stream was successfully replaced.
func (*Reader) TriggerMainUpdate ¶
TriggerMainUpdate attempts to re-read the main configuration file, trigger the provided main update func, and apply changes to resources to the provided manager as appropriate.
func (*Reader) TriggerResourceDelete ¶
func (r *Reader) TriggerResourceDelete(mgr bundle.NewManagement, path string) error
TriggerResourceDelete attempts to remove all resources that originated from a given file.
func (*Reader) TriggerResourceUpdate ¶
TriggerResourceUpdate attempts to re-read a resource configuration file and apply changes to the provided manager as appropriate.
func (*Reader) TriggerStreamUpdate ¶
TriggerStreamUpdate attempts to re-read a stream configuration file, and trigger the provided stream update func.
type StreamUpdateFunc ¶
StreamUpdateFunc is a closure function called whenever a stream config has been updated. If an error is returned then the attempt will be made again after a grace period.
When the provided config is nil it is a signal that the stream has been deleted, and it is expected that the provided update func should shut that stream down.
type Type ¶
type Type struct { HTTP api.Config `yaml:"http"` stream.Config `yaml:",inline"` manager.ResourceConfig `yaml:",inline"` Logger log.Config `yaml:"logger"` Metrics metrics.Config `yaml:"metrics"` Tracer tracer.Config `yaml:"tracer"` SystemCloseDelay string `yaml:"shutdown_delay"` SystemCloseTimeout string `yaml:"shutdown_timeout"` Tests []any `yaml:"tests"` // contains filtered or unexported fields }
Type is the Benthos service configuration struct.
func FromParsed ¶
FromParsed extracts the Benthos service fields from the parsed config and returns a Benthos service config.
func (*Type) GetRawSource ¶
GetRawSource returns the Type raw source.