Versions in this module

Expand all Collapse all

v4

v4.27.1 May 31, 2024

v4.27.0 May 31, 2024

Changes in this version

+ func AsyncReaderWithConnBackOff(boff backoff.BackOff) func(a *AsyncReader)
+ type Async interface
  + Close func(ctx context.Context) error
  + Connect func(ctx context.Context) error
  + ReadBatch func(ctx context.Context) (message.Batch, AsyncAckFn, error)
+ type AsyncAckFn func(context.Context, error) error
+ type AsyncCutOff struct
+ func NewAsyncCutOff(r Async) *AsyncCutOff
+ func (c *AsyncCutOff) Close(ctx context.Context) error
+ func (c *AsyncCutOff) Connect(ctx context.Context) error
+ func (c *AsyncCutOff) ReadBatch(ctx context.Context) (message.Batch, AsyncAckFn, error)
+ type AsyncPreserver struct
+ func NewAsyncPreserver(r Async) *AsyncPreserver
+ func (p *AsyncPreserver) Close(ctx context.Context) error
+ func (p *AsyncPreserver) Connect(ctx context.Context) error
+ func (p *AsyncPreserver) ReadBatch(ctx context.Context) (message.Batch, AsyncAckFn, error)
+ type AsyncReader struct
+ func (r *AsyncReader) Connected() bool
+ func (r *AsyncReader) TransactionChan() <-chan message.Transaction
+ func (r *AsyncReader) TriggerCloseNow()
+ func (r *AsyncReader) TriggerStopConsuming()
+ func (r *AsyncReader) WaitForClose(ctx context.Context) error
+ type Config struct
  + Label string
  + Plugin any
  + Processors []processor.Config
  + Type string
+ func FromAny(prov docs.Provider, value any) (conf Config, err error)
+ func NewConfig() Config
+ type Streamed interface
  + Connected func() bool
  + TransactionChan func() <-chan message.Transaction
  + TriggerCloseNow func()
  + TriggerStopConsuming func()
  + WaitForClose func(ctx context.Context) error
+ func NewAsyncReader(typeStr string, r Async, mgr component.Observability, ...) (Streamed, error)
+ func WrapWithPipelines(in Streamed, pipeConstructors ...iprocessor.PipelineConstructorFunc) (Streamed, error)
+ type WithPipeline struct
+ func WrapWithPipeline(in Streamed, pipeConstructor iprocessor.PipelineConstructorFunc) (*WithPipeline, error)
+ func (i *WithPipeline) Connected() bool
+ func (i *WithPipeline) TransactionChan() <-chan message.Transaction
+ func (i *WithPipeline) TriggerCloseNow()
+ func (i *WithPipeline) TriggerStopConsuming()
+ func (i *WithPipeline) WaitForClose(ctx context.Context) error