Versions in this module Expand all Collapse all v4 v4.14.1 Apr 26, 2023 Changes in this version + var ErrEndOfBuffer = errors.New("end of buffer") + var ErrEndOfInput = errors.New("end of input") + var ErrKeyAlreadyExists = errors.New("key already exists") + var ErrKeyNotFound = errors.New("key does not exist") + var ErrNotConnected = errors.New("not connected") + func RegisterBatchBuffer(name string, spec *ConfigSpec, ctor BatchBufferConstructor) error + func RegisterBatchInput(name string, spec *ConfigSpec, ctor BatchInputConstructor) error + func RegisterBatchOutput(name string, spec *ConfigSpec, ctor BatchOutputConstructor) error + func RegisterBatchProcessor(name string, spec *ConfigSpec, ctor BatchProcessorConstructor) error + func RegisterCache(name string, spec *ConfigSpec, ctor CacheConstructor) error + func RegisterInput(name string, spec *ConfigSpec, ctor InputConstructor) error + func RegisterMetricsExporter(name string, spec *ConfigSpec, ctor MetricsExporterConstructor) error + func RegisterOtelTracerProvider(name string, spec *ConfigSpec, ctor OtelTracerProviderConstructor) error + func RegisterOutput(name string, spec *ConfigSpec, ctor OutputConstructor) error + func RegisterProcessor(name string, spec *ConfigSpec, ctor ProcessorConstructor) error + func RegisterRateLimit(name string, spec *ConfigSpec, ctor RateLimitConstructor) error + func RunCLI(ctx context.Context) + func XFormatConfigJSON() ([]byte, error) + type AckFunc func(ctx context.Context, err error) error + type BatchBuffer interface + EndOfInput func() + ReadBatch func(context.Context) (MessageBatch, AckFunc, error) + WriteBatch func(context.Context, MessageBatch, AckFunc) error + type BatchBufferConstructor func(conf *ParsedConfig, mgr *Resources) (BatchBuffer, error) + type BatchError struct + func NewBatchError(b MessageBatch, headline error) *BatchError + func (err *BatchError) Error() string + func (err *BatchError) Failed(i int, merr error) *BatchError + func (err *BatchError) IndexedErrors() int + func (err *BatchError) Unwrap() error + func (err *BatchError) WalkMessages(fn func(int, *Message, error) bool) + type BatchInput interface + Connect func(context.Context) error + ReadBatch func(context.Context) (MessageBatch, AckFunc, error) + func AutoRetryNacksBatched(i BatchInput) BatchInput + func InputBatchedWithMaxInFlight(n int, i BatchInput) BatchInput + type BatchInputConstructor func(conf *ParsedConfig, mgr *Resources) (BatchInput, error) + type BatchOutput interface + Connect func(context.Context) error + WriteBatch func(context.Context, MessageBatch) error + type BatchOutputConstructor func(conf *ParsedConfig, mgr *Resources) (out BatchOutput, batchPolicy BatchPolicy, maxInFlight int, err error) + type BatchPolicy struct + ByteSize int + Check string + Count int + Period string + func (b BatchPolicy) NewBatcher(res *Resources) (*Batcher, error) + type BatchProcessor interface + ProcessBatch func(context.Context, MessageBatch) ([]MessageBatch, error) + type BatchProcessorConstructor func(conf *ParsedConfig, mgr *Resources) (BatchProcessor, error) + type Batcher struct + func (b *Batcher) Add(msg *Message) bool + func (b *Batcher) Close(ctx context.Context) error + func (b *Batcher) Flush(ctx context.Context) (batch MessageBatch, err error) + func (b *Batcher) UntilNext() (time.Duration, bool) + type Cache interface + Add func(ctx context.Context, key string, value []byte, ttl *time.Duration) error + Delete func(ctx context.Context, key string) error + Get func(ctx context.Context, key string) ([]byte, error) + Set func(ctx context.Context, key string, value []byte, ttl *time.Duration) error + type CacheConstructor func(conf *ParsedConfig, mgr *Resources) (Cache, error) + type CacheItem struct + Key string + TTL *time.Duration + Value []byte + type Closer interface + Close func(ctx context.Context) error + type ConfigField struct + func NewAnyField(name string) *ConfigField + func NewAnyListField(name string) *ConfigField + func NewAnyMapField(name string) *ConfigField + func NewBackOffField(name string, allowUnbounded bool, defaults *backoff.ExponentialBackOff) *ConfigField + func NewBackOffToggledField(name string, allowUnbounded bool, defaults *backoff.ExponentialBackOff) *ConfigField + func NewBatchPolicyField(name string) *ConfigField + func NewBloblangField(name string) *ConfigField + func NewBoolField(name string) *ConfigField + func NewDurationField(name string) *ConfigField + func NewFloatField(name string) *ConfigField + func NewInputField(name string) *ConfigField + func NewInputListField(name string) *ConfigField + func NewInputMaxInFlightField() *ConfigField + func NewIntField(name string) *ConfigField + func NewIntListField(name string) *ConfigField + func NewIntMapField(name string) *ConfigField + func NewInternalField(ifield docs.FieldSpec) *ConfigField + func NewInterpolatedStringField(name string) *ConfigField + func NewInterpolatedStringListField(name string) *ConfigField + func NewInterpolatedStringMapField(name string) *ConfigField + func NewMetadataFilterField(name string) *ConfigField + func NewObjectField(name string, fields ...*ConfigField) *ConfigField + func NewObjectListField(name string, fields ...*ConfigField) *ConfigField + func NewOutputField(name string) *ConfigField + func NewOutputListField(name string) *ConfigField + func NewProcessorField(name string) *ConfigField + func NewProcessorListField(name string) *ConfigField + func NewStringAnnotatedEnumField(name string, options map[string]string) *ConfigField + func NewStringEnumField(name string, options ...string) *ConfigField + func NewStringField(name string) *ConfigField + func NewStringListField(name string) *ConfigField + func NewStringMapField(name string) *ConfigField + func NewTLSField(name string) *ConfigField + func NewTLSToggledField(name string) *ConfigField + func NewURLField(name string) *ConfigField + func (c *ConfigField) Advanced() *ConfigField + func (c *ConfigField) Default(v any) *ConfigField + func (c *ConfigField) Deprecated() *ConfigField + func (c *ConfigField) Description(d string) *ConfigField + func (c *ConfigField) Example(e any) *ConfigField + func (c *ConfigField) LintRule(blobl string) *ConfigField + func (c *ConfigField) Optional() *ConfigField + func (c *ConfigField) Secret() *ConfigField + func (c *ConfigField) Version(v string) *ConfigField + func (c *ConfigField) XUnwrapper() any + type ConfigSpec struct + func NewConfigSpec() *ConfigSpec + func (c *ConfigSpec) Beta() *ConfigSpec + func (c *ConfigSpec) Categories(categories ...string) *ConfigSpec + func (c *ConfigSpec) Deprecated() *ConfigSpec + func (c *ConfigSpec) Description(description string) *ConfigSpec + func (c *ConfigSpec) EncodeJSON(v []byte) error + func (c *ConfigSpec) Example(title, summary, config string) *ConfigSpec + func (c *ConfigSpec) Field(f *ConfigField) *ConfigSpec + func (c *ConfigSpec) Fields(fs ...*ConfigField) *ConfigSpec + func (c *ConfigSpec) Footnotes(description string) *ConfigSpec + func (c *ConfigSpec) LintRule(blobl string) *ConfigSpec + func (c *ConfigSpec) ParseYAML(yamlStr string, env *Environment) (*ParsedConfig, error) + func (c *ConfigSpec) Stable() *ConfigSpec + func (c *ConfigSpec) Summary(summary string) *ConfigSpec + func (c *ConfigSpec) Version(v string) *ConfigSpec + type ConfigView struct + func (c *ConfigView) Description() string + func (c *ConfigView) FormatJSON() ([]byte, error) + func (c *ConfigView) IsDeprecated() bool + func (c *ConfigView) RenderDocs() ([]byte, error) + func (c *ConfigView) Summary() string + type Environment struct + func GlobalEnvironment() *Environment + func NewEnvironment() *Environment + func (e *Environment) Clone() *Environment + func (e *Environment) NewStreamBuilder() *StreamBuilder + func (e *Environment) RegisterBatchBuffer(name string, spec *ConfigSpec, ctor BatchBufferConstructor) error + func (e *Environment) RegisterBatchInput(name string, spec *ConfigSpec, ctor BatchInputConstructor) error + func (e *Environment) RegisterBatchOutput(name string, spec *ConfigSpec, ctor BatchOutputConstructor) error + func (e *Environment) RegisterBatchProcessor(name string, spec *ConfigSpec, ctor BatchProcessorConstructor) error + func (e *Environment) RegisterCache(name string, spec *ConfigSpec, ctor CacheConstructor) error + func (e *Environment) RegisterInput(name string, spec *ConfigSpec, ctor InputConstructor) error + func (e *Environment) RegisterMetricsExporter(name string, spec *ConfigSpec, ctor MetricsExporterConstructor) error + func (e *Environment) RegisterOtelTracerProvider(name string, spec *ConfigSpec, ctor OtelTracerProviderConstructor) error + func (e *Environment) RegisterOutput(name string, spec *ConfigSpec, ctor OutputConstructor) error + func (e *Environment) RegisterProcessor(name string, spec *ConfigSpec, ctor ProcessorConstructor) error + func (e *Environment) RegisterRateLimit(name string, spec *ConfigSpec, ctor RateLimitConstructor) error + func (e *Environment) UseBloblangEnvironment(bEnv *bloblang.Environment) + func (e *Environment) UseFS(fs *FS) + func (e *Environment) WalkBuffers(fn func(name string, config *ConfigView)) + func (e *Environment) WalkCaches(fn func(name string, config *ConfigView)) + func (e *Environment) WalkInputs(fn func(name string, config *ConfigView)) + func (e *Environment) WalkMetrics(fn func(name string, config *ConfigView)) + func (e *Environment) WalkOutputs(fn func(name string, config *ConfigView)) + func (e *Environment) WalkProcessors(fn func(name string, config *ConfigView)) + func (e *Environment) WalkRateLimits(fn func(name string, config *ConfigView)) + func (e *Environment) WalkTracers(fn func(name string, config *ConfigView)) + type FS struct + func NewFS(filesystem fs.FS) *FS + func (f *FS) MkdirAll(path string, perm fs.FileMode) error + func (f *FS) Open(name string) (fs.File, error) + func (f *FS) OpenFile(name string, flag int, perm fs.FileMode) (fs.File, error) + func (f *FS) Remove(name string) error + func (f *FS) Stat(name string) (fs.FileInfo, error) + type HTTPMultiplexer interface + HandleFunc func(pattern string, handler func(http.ResponseWriter, *http.Request)) + type Input interface + Connect func(context.Context) error + Read func(context.Context) (*Message, AckFunc, error) + func AutoRetryNacks(i Input) Input + func InputWithMaxInFlight(n int, i Input) Input + type InputConstructor func(conf *ParsedConfig, mgr *Resources) (Input, error) + type InterpolatedString struct + func NewInterpolatedString(expr string) (*InterpolatedString, error) + func (i *InterpolatedString) Bytes(m *Message) []byte + func (i *InterpolatedString) String(m *Message) string + func (i *InterpolatedString) TryBytes(m *Message) ([]byte, error) + func (i *InterpolatedString) TryString(m *Message) (string, error) + type Lint struct + Column int + Line int + Type LintType + What string + func (l Lint) Error() string + type LintError []Lint + func (e LintError) Error() string + type LintType int + const LintBadBloblang + const LintBadLabel + const LintComponentMissing + const LintComponentNotFound + const LintCustom + const LintDeprecated + const LintDuplicateLabel + const LintExpectedArray + const LintExpectedObject + const LintExpectedScalar + const LintFailedRead + const LintInvalidOption + const LintMissing + const LintMissingLabel + const LintShouldOmit + const LintUnknown + type Logger struct + func (l *Logger) Debug(message string) + func (l *Logger) Debugf(template string, args ...any) + func (l *Logger) Error(message string) + func (l *Logger) Errorf(template string, args ...any) + func (l *Logger) Info(message string) + func (l *Logger) Infof(template string, args ...any) + func (l *Logger) Trace(message string) + func (l *Logger) Tracef(template string, args ...any) + func (l *Logger) Warn(message string) + func (l *Logger) Warnf(template string, args ...any) + func (l *Logger) With(keyValuePairs ...any) *Logger + type Message struct + func NewMessage(content []byte) *Message + func (m *Message) AsBytes() ([]byte, error) + func (m *Message) AsStructured() (any, error) + func (m *Message) AsStructuredMut() (any, error) + func (m *Message) BloblangMutate(blobl *bloblang.Executor) (*Message, error) + func (m *Message) BloblangQuery(blobl *bloblang.Executor) (*Message, error) + func (m *Message) Context() context.Context + func (m *Message) Copy() *Message + func (m *Message) DeepCopy() *Message + func (m *Message) GetError() error + func (m *Message) MetaDelete(key string) + func (m *Message) MetaGet(key string) (string, bool) + func (m *Message) MetaGetMut(key string) (any, bool) + func (m *Message) MetaSet(key, value string) + func (m *Message) MetaSetMut(key string, value any) + func (m *Message) MetaWalk(fn func(string, string) error) error + func (m *Message) MetaWalkMut(fn func(key string, value any) error) error + func (m *Message) SetBytes(b []byte) + func (m *Message) SetError(err error) + func (m *Message) SetStructured(i any) + func (m *Message) SetStructuredMut(i any) + func (m *Message) WithContext(ctx context.Context) *Message + type MessageBatch []*Message + func ExecuteProcessors(ctx context.Context, processors []*OwnedProcessor, inbatches ...MessageBatch) ([]MessageBatch, error) + func (b MessageBatch) BloblangMutate(index int, blobl *bloblang.Executor) (*Message, error) + func (b MessageBatch) BloblangQuery(index int, blobl *bloblang.Executor) (*Message, error) + func (b MessageBatch) Copy() MessageBatch + func (b MessageBatch) DeepCopy() MessageBatch + func (b MessageBatch) InterpolatedBytes(index int, i *InterpolatedString) []byte + func (b MessageBatch) InterpolatedString(index int, i *InterpolatedString) string + func (b MessageBatch) TryInterpolatedBytes(index int, i *InterpolatedString) ([]byte, error) + func (b MessageBatch) TryInterpolatedString(index int, i *InterpolatedString) (string, error) + type MessageBatchHandlerFunc func(context.Context, MessageBatch) error + type MessageHandlerFunc func(context.Context, *Message) error + type MetadataFilter struct + func (m *MetadataFilter) Walk(msg *Message, fn func(key, value string) error) error + type MetricCounter struct + func (c *MetricCounter) Incr(count int64, labelValues ...string) + type MetricGauge struct + func (g *MetricGauge) Set(value int64, labelValues ...string) + type MetricTimer struct + func (t *MetricTimer) Timing(delta int64, labelValues ...string) + type Metrics struct + func (m *Metrics) NewCounter(name string, labelKeys ...string) *MetricCounter + func (m *Metrics) NewGauge(name string, labelKeys ...string) *MetricGauge + func (m *Metrics) NewTimer(name string, labelKeys ...string) *MetricTimer + type MetricsExporter interface + Close func(ctx context.Context) error + NewCounterCtor func(name string, labelKeys ...string) MetricsExporterCounterCtor + NewGaugeCtor func(name string, labelKeys ...string) MetricsExporterGaugeCtor + NewTimerCtor func(name string, labelKeys ...string) MetricsExporterTimerCtor + type MetricsExporterConstructor func(conf *ParsedConfig, log *Logger) (MetricsExporter, error) + type MetricsExporterCounter interface + Incr func(count int64) + type MetricsExporterCounterCtor func(labelValues ...string) MetricsExporterCounter + type MetricsExporterGauge interface + Set func(value int64) + type MetricsExporterGaugeCtor func(labelValues ...string) MetricsExporterGauge + type MetricsExporterTimer interface + Timing func(delta int64) + type MetricsExporterTimerCtor func(labelValues ...string) MetricsExporterTimer + type MockResourcesOptFn func(*mock.Manager) + func MockResourcesOptAddCache(name string) MockResourcesOptFn + func MockResourcesOptAddRateLimit(name string, fn func(context.Context) (time.Duration, error)) MockResourcesOptFn + type OtelTracerProviderConstructor func(conf *ParsedConfig) (trace.TracerProvider, error) + type Output interface + Connect func(context.Context) error + Write func(context.Context, *Message) error + type OutputConstructor func(conf *ParsedConfig, mgr *Resources) (out Output, maxInFlight int, err error) + type OwnedInput struct + func (o *OwnedInput) BatchedWith(b *Batcher) *OwnedInput + func (o *OwnedInput) Close(ctx context.Context) error + func (o *OwnedInput) ReadBatch(ctx context.Context) (MessageBatch, AckFunc, error) + func (o *OwnedInput) XUnwrapper() any + type OwnedOutput struct + func (o *OwnedOutput) Close(ctx context.Context) error + func (o *OwnedOutput) Write(ctx context.Context, m *Message) error + func (o *OwnedOutput) WriteBatch(ctx context.Context, b MessageBatch) error + type OwnedProcessor struct + func (o *OwnedProcessor) Close(ctx context.Context) error + func (o *OwnedProcessor) Process(ctx context.Context, msg *Message) (MessageBatch, error) + func (o *OwnedProcessor) ProcessBatch(ctx context.Context, batch MessageBatch) ([]MessageBatch, error) + type ParsedConfig struct + func (p *ParsedConfig) Contains(path ...string) bool + func (p *ParsedConfig) FieldAny(path ...string) (any, error) + func (p *ParsedConfig) FieldAnyList(path ...string) ([]*ParsedConfig, error) + func (p *ParsedConfig) FieldAnyMap(path ...string) (map[string]*ParsedConfig, error) + func (p *ParsedConfig) FieldBackOff(path ...string) (*backoff.ExponentialBackOff, error) + func (p *ParsedConfig) FieldBackOffToggled(path ...string) (boff *backoff.ExponentialBackOff, enabled bool, err error) + func (p *ParsedConfig) FieldBatchPolicy(path ...string) (conf BatchPolicy, err error) + func (p *ParsedConfig) FieldBloblang(path ...string) (*bloblang.Executor, error) + func (p *ParsedConfig) FieldBool(path ...string) (bool, error) + func (p *ParsedConfig) FieldDuration(path ...string) (time.Duration, error) + func (p *ParsedConfig) FieldFloat(path ...string) (float64, error) + func (p *ParsedConfig) FieldInput(path ...string) (*OwnedInput, error) + func (p *ParsedConfig) FieldInputList(path ...string) ([]*OwnedInput, error) + func (p *ParsedConfig) FieldInt(path ...string) (int, error) + func (p *ParsedConfig) FieldIntList(path ...string) ([]int, error) + func (p *ParsedConfig) FieldIntMap(path ...string) (map[string]int, error) + func (p *ParsedConfig) FieldInterpolatedString(path ...string) (*InterpolatedString, error) + func (p *ParsedConfig) FieldInterpolatedStringList(path ...string) ([]*InterpolatedString, error) + func (p *ParsedConfig) FieldInterpolatedStringMap(path ...string) (map[string]*InterpolatedString, error) + func (p *ParsedConfig) FieldMetadataFilter(path ...string) (f *MetadataFilter, err error) + func (p *ParsedConfig) FieldObjectList(path ...string) ([]*ParsedConfig, error) + func (p *ParsedConfig) FieldOutput(path ...string) (*OwnedOutput, error) + func (p *ParsedConfig) FieldOutputList(path ...string) ([]*OwnedOutput, error) + func (p *ParsedConfig) FieldProcessor(path ...string) (*OwnedProcessor, error) + func (p *ParsedConfig) FieldProcessorList(path ...string) ([]*OwnedProcessor, error) + func (p *ParsedConfig) FieldString(path ...string) (string, error) + func (p *ParsedConfig) FieldStringList(path ...string) ([]string, error) + func (p *ParsedConfig) FieldStringMap(path ...string) (map[string]string, error) + func (p *ParsedConfig) FieldTLS(path ...string) (*tls.Config, error) + func (p *ParsedConfig) FieldTLSToggled(path ...string) (tconf *tls.Config, enabled bool, err error) + func (p *ParsedConfig) FieldURL(path ...string) (*url.URL, error) + func (p *ParsedConfig) Namespace(path ...string) *ParsedConfig + type PrintLogger interface + Printf func(format string, v ...any) + Println func(v ...any) + type Processor interface + Process func(context.Context, *Message) (MessageBatch, error) + type ProcessorConstructor func(conf *ParsedConfig, mgr *Resources) (Processor, error) + type RateLimit interface + Access func(context.Context) (time.Duration, error) + type RateLimitConstructor func(conf *ParsedConfig, mgr *Resources) (RateLimit, error) + type ResourceInput struct + func (r *ResourceInput) ReadBatch(ctx context.Context) (MessageBatch, AckFunc, error) + type ResourceOutput struct + func (o *ResourceOutput) Write(ctx context.Context, m *Message) error + func (o *ResourceOutput) WriteBatch(ctx context.Context, b MessageBatch) error + type Resources struct + func MockResources(opts ...MockResourcesOptFn) *Resources + func (r *Resources) AccessCache(ctx context.Context, name string, fn func(c Cache)) error + func (r *Resources) AccessInput(ctx context.Context, name string, fn func(i *ResourceInput)) error + func (r *Resources) AccessOutput(ctx context.Context, name string, fn func(o *ResourceOutput)) error + func (r *Resources) AccessRateLimit(ctx context.Context, name string, fn func(r RateLimit)) error + func (r *Resources) FS() *FS + func (r *Resources) HasCache(name string) bool + func (r *Resources) HasInput(name string) bool + func (r *Resources) HasOutput(name string) bool + func (r *Resources) HasRateLimit(name string) bool + func (r *Resources) Label() string + func (r *Resources) Logger() *Logger + func (r *Resources) Metrics() *Metrics + func (r *Resources) OtelTracer() trace.TracerProvider + func (r *Resources) XUnwrapper() any + type Stream struct + func (s *Stream) Run(ctx context.Context) (err error) + func (s *Stream) Stop(ctx context.Context) (err error) + func (s *Stream) StopWithin(timeout time.Duration) error + type StreamBuilder struct + func NewStreamBuilder() *StreamBuilder + func (s *StreamBuilder) AddBatchConsumerFunc(fn MessageBatchHandlerFunc) error + func (s *StreamBuilder) AddBatchProducerFunc() (MessageBatchHandlerFunc, error) + func (s *StreamBuilder) AddCacheYAML(conf string) error + func (s *StreamBuilder) AddConsumerFunc(fn MessageHandlerFunc) error + func (s *StreamBuilder) AddInputYAML(conf string) error + func (s *StreamBuilder) AddOutputYAML(conf string) error + func (s *StreamBuilder) AddProcessorYAML(conf string) error + func (s *StreamBuilder) AddProducerFunc() (MessageHandlerFunc, error) + func (s *StreamBuilder) AddRateLimitYAML(conf string) error + func (s *StreamBuilder) AddResourcesYAML(conf string) error + func (s *StreamBuilder) AsYAML() (string, error) + func (s *StreamBuilder) Build() (*Stream, error) + func (s *StreamBuilder) BuildTraced() (*Stream, *TracingSummary, error) + func (s *StreamBuilder) DisableLinting() + func (s *StreamBuilder) SetBufferYAML(conf string) error + func (s *StreamBuilder) SetEnvVarLookupFunc(fn func(string) (string, bool)) + func (s *StreamBuilder) SetFields(pathValues ...any) error + func (s *StreamBuilder) SetHTTPMux(m HTTPMultiplexer) + func (s *StreamBuilder) SetLoggerYAML(conf string) error + func (s *StreamBuilder) SetMetricsYAML(conf string) error + func (s *StreamBuilder) SetPrintLogger(l PrintLogger) + func (s *StreamBuilder) SetThreads(n int) + func (s *StreamBuilder) SetTracerYAML(conf string) error + func (s *StreamBuilder) SetYAML(conf string) error + func (s *StreamBuilder) WalkComponents(fn func(w *WalkedComponent) error) error + type TracingEvent struct + Content string + Meta map[string]any + Type TracingEventType + type TracingEventType string + var TracingEventConsume TracingEventType = "CONSUME" + var TracingEventDelete TracingEventType = "DELETE" + var TracingEventError TracingEventType = "ERROR" + var TracingEventProduce TracingEventType = "PRODUCE" + var TracingEventUnknown TracingEventType = "UNKNOWN" + type TracingSummary struct + func (s *TracingSummary) InputEvents() map[string][]TracingEvent + func (s *TracingSummary) OutputEvents() map[string][]TracingEvent + func (s *TracingSummary) ProcessorEvents() map[string][]TracingEvent + func (s *TracingSummary) TotalInput() uint64 + func (s *TracingSummary) TotalOutput() uint64 + func (s *TracingSummary) TotalProcessorErrors() uint64 + type WalkedComponent struct + ComponentType string + Label string + Name string + func (w *WalkedComponent) ConfigYAML() string Other modules containing this package github.com/dafanshu/benthos/v3