input

package
v4.25.0
Published: Mar 1, 2024 License: MIT Imports: 16 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func AsyncReaderWithConnBackOff added in v4.16.0

func AsyncReaderWithConnBackOff(boff backoff.BackOff) func(a *AsyncReader)

AsyncReaderWithConnBackOff sets the backoff used for limiting connection attempts. If the maximum number of retry attempts is reached, the input stops gracefully.
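The option follows the functional-options pattern: it returns a closure that mutates the reader during construction. Because this package is internal and cannot be imported from outside the module, the sketch below uses local stand-ins throughout. `BackOff`, `stop`, `limitedBackOff`, `reader`, `withConnBackOff`, `newReader` and `connectLoop` are all hypothetical names chosen for illustration; only the two-method shape of `BackOff` is taken from the `backoff.BackOff` type in the signature above. The retry loop is an assumption about how such a backoff could be consumed, not the package's actual code.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// BackOff is a local stand-in with the same two methods as backoff.BackOff.
type BackOff interface {
	NextBackOff() time.Duration
	Reset()
}

// stop is a stand-in sentinel meaning "no more retries".
const stop time.Duration = -1

// limitedBackOff waits a fixed interval and gives up after max retries.
type limitedBackOff struct {
	wait time.Duration
	max  int
	n    int
}

func (l *limitedBackOff) NextBackOff() time.Duration {
	if l.n >= l.max {
		return stop
	}
	l.n++
	return l.wait
}

func (l *limitedBackOff) Reset() { l.n = 0 }

// reader is a hypothetical stand-in for AsyncReader.
type reader struct {
	connBackOff BackOff
}

// withConnBackOff mirrors the shape of AsyncReaderWithConnBackOff.
func withConnBackOff(b BackOff) func(*reader) {
	return func(r *reader) { r.connBackOff = b }
}

// newReader applies each option in order; the default backoff is arbitrary.
func newReader(opts ...func(*reader)) *reader {
	r := &reader{connBackOff: &limitedBackOff{wait: time.Millisecond, max: 1}}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

// connectLoop retries connect until it succeeds or the backoff is exhausted.
func (r *reader) connectLoop(connect func() error) (int, error) {
	r.connBackOff.Reset()
	attempts := 0
	for {
		attempts++
		err := connect()
		if err == nil {
			return attempts, nil
		}
		wait := r.connBackOff.NextBackOff()
		if wait == stop {
			return attempts, err
		}
		time.Sleep(wait)
	}
}

// demo uses a source that never connects, allowing three retries.
func demo() (int, error) {
	r := newReader(withConnBackOff(&limitedBackOff{wait: time.Millisecond, max: 3}))
	return r.connectLoop(func() error { return errors.New("connection refused") })
}

func main() {
	attempts, err := demo()
	fmt.Println(attempts, err)
}
```

With three retries allowed, the loop makes one initial attempt plus three retries before returning the last connection error.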

Types

type Async added in v4.1.0

type Async interface {
	// Connect attempts to establish a connection to the source, if
	// unsuccessful returns an error. If the attempt is successful (or not
	// necessary) returns nil.
	Connect(ctx context.Context) error

	// ReadBatch attempts to read a new message from the source. If
	// successful a message is returned along with a function used to
	// acknowledge receipt of the returned message. It's safe to process the
	// returned message and read the next message asynchronously.
	ReadBatch(ctx context.Context) (message.Batch, AsyncAckFn, error)

	// Close triggers the shut down of this component and blocks until
	// completion or context cancellation.
	Close(ctx context.Context) error
}

Async is a type that reads Benthos messages from an external source and allows acknowledgements for a message batch to be propagated asynchronously.
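As a rough illustration of the contract, the sketch below implements the three methods over an in-memory slice. It is self-contained and does not import this package: `Batch` stands in for `message.Batch`, and `sliceInput`, `errEndOfInput` and `demo` are hypothetical names. How a real implementation signals the end of its input is not covered by this page, so the sentinel error here is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Batch is a local stand-in for message.Batch.
type Batch []string

// AsyncAckFn has the same signature as the type documented in this package.
type AsyncAckFn func(context.Context, error) error

// Async mirrors the interface shown above, using the stand-in Batch type.
type Async interface {
	Connect(ctx context.Context) error
	ReadBatch(ctx context.Context) (Batch, AsyncAckFn, error)
	Close(ctx context.Context) error
}

// errEndOfInput is a hypothetical sentinel for an exhausted source.
var errEndOfInput = errors.New("end of input")

// sliceInput serves batches from memory and counts successful acks.
type sliceInput struct {
	connected bool
	pending   []Batch
	acked     int
}

func (s *sliceInput) Connect(ctx context.Context) error {
	s.connected = true
	return nil
}

func (s *sliceInput) ReadBatch(ctx context.Context) (Batch, AsyncAckFn, error) {
	if err := ctx.Err(); err != nil {
		return nil, nil, err
	}
	if !s.connected {
		return nil, nil, errors.New("not connected")
	}
	if len(s.pending) == 0 {
		return nil, nil, errEndOfInput
	}
	b := s.pending[0]
	s.pending = s.pending[1:]
	// The ack function receives the delivery result for this batch.
	ack := func(ctx context.Context, res error) error {
		if res == nil {
			s.acked++
		}
		return nil
	}
	return b, ack, nil
}

func (s *sliceInput) Close(ctx context.Context) error {
	s.connected = false
	return nil
}

// demo reads every batch, acks each one, and reports the totals.
func demo() (read, acked int, err error) {
	ctx := context.Background()
	in := &sliceInput{pending: []Batch{{"a", "b"}, {"c"}}}
	var src Async = in
	if err = src.Connect(ctx); err != nil {
		return
	}
	for {
		batch, ack, rerr := src.ReadBatch(ctx)
		if errors.Is(rerr, errEndOfInput) {
			break
		}
		if rerr != nil {
			return read, in.acked, rerr
		}
		read += len(batch)
		if err = ack(ctx, nil); err != nil {
			return read, in.acked, err
		}
	}
	return read, in.acked, src.Close(ctx)
}

func main() {
	read, acked, err := demo()
	fmt.Println(read, acked, err)
}
```

The caller here acks synchronously for brevity; the interface comment above permits processing a batch and reading the next one concurrently.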

type AsyncAckFn added in v4.1.0

type AsyncAckFn func(context.Context, error) error

AsyncAckFn is a function used to acknowledge receipt of a message batch. The provided response indicates whether the message batch was successfully delivered. Returns an error if the acknowledgement was not propagated.

type AsyncCutOff added in v4.1.0

type AsyncCutOff struct {
	// contains filtered or unexported fields
}

AsyncCutOff is a wrapper for input.Async implementations that returns from Close immediately instead of waiting for the underlying reader to shut down. This is only useful when the underlying readable resource cannot be closed reliably and can block forever.
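One way such a wrapper could work is to run the blocking read in its own goroutine and race it against a close signal. The sketch below shows that technique with local stand-in types. `cutOff`, `stuckInput`, `errClosed` and `demo` are hypothetical names, and nothing here is claimed to match the package's actual implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

type Batch []string
type AsyncAckFn func(context.Context, error) error

// Async mirrors the interface in this package, using a stand-in Batch type.
type Async interface {
	Connect(ctx context.Context) error
	ReadBatch(ctx context.Context) (Batch, AsyncAckFn, error)
	Close(ctx context.Context) error
}

// errClosed is a hypothetical sentinel returned once the wrapper is closed.
var errClosed = errors.New("input closed")

type readResult struct {
	batch Batch
	ack   AsyncAckFn
	err   error
}

// cutOff abandons in-flight reads as soon as Close is called.
type cutOff struct {
	r         Async
	closeOnce sync.Once
	closed    chan struct{}
}

func newCutOff(r Async) *cutOff {
	return &cutOff{r: r, closed: make(chan struct{})}
}

func (c *cutOff) Connect(ctx context.Context) error { return c.r.Connect(ctx) }

func (c *cutOff) ReadBatch(ctx context.Context) (Batch, AsyncAckFn, error) {
	// Buffered so the reading goroutine can always deliver and exit,
	// even when nobody is waiting for the result any more.
	resCh := make(chan readResult, 1)
	go func() {
		b, ack, err := c.r.ReadBatch(ctx)
		resCh <- readResult{b, ack, err}
	}()
	select {
	case res := <-resCh:
		return res.batch, res.ack, res.err
	case <-ctx.Done():
		return nil, nil, ctx.Err()
	case <-c.closed:
		return nil, nil, errClosed
	}
}

// Close signals readers and closes the wrapped input in the background.
func (c *cutOff) Close(ctx context.Context) error {
	c.closeOnce.Do(func() { close(c.closed) })
	go func() { _ = c.r.Close(context.Background()) }()
	return nil
}

// stuckInput models a source whose reads never return.
type stuckInput struct{ never chan struct{} }

func (s *stuckInput) Connect(context.Context) error { return nil }
func (s *stuckInput) ReadBatch(context.Context) (Batch, AsyncAckFn, error) {
	<-s.never // blocks forever and ignores the context
	return nil, nil, nil
}
func (s *stuckInput) Close(context.Context) error { return nil }

// demo closes the wrapper while a read is blocked and returns the read error.
func demo() error {
	ctx := context.Background()
	c := newCutOff(&stuckInput{never: make(chan struct{})})
	go func() {
		time.Sleep(10 * time.Millisecond)
		_ = c.Close(ctx)
	}()
	_, _, err := c.ReadBatch(ctx)
	return err
}

func main() {
	fmt.Println(demo())
}
```

The trade-off in this design is that the goroutine blocked inside the wrapped read is abandoned rather than stopped, which is acceptable only when the underlying resource offers no reliable way to interrupt it.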

func NewAsyncCutOff added in v4.1.0

func NewAsyncCutOff(r Async) *AsyncCutOff

NewAsyncCutOff returns a new AsyncCutOff wrapper around an input.Async.

func (*AsyncCutOff) Close added in v4.6.0

func (c *AsyncCutOff) Close(ctx context.Context) error

Close triggers the asynchronous closing of the reader.

func (*AsyncCutOff) Connect added in v4.6.0

func (c *AsyncCutOff) Connect(ctx context.Context) error

Connect attempts to establish a connection to the source, if unsuccessful returns an error. If the attempt is successful (or not necessary) returns nil.

func (*AsyncCutOff) ReadBatch added in v4.6.0

func (c *AsyncCutOff) ReadBatch(ctx context.Context) (message.Batch, AsyncAckFn, error)

ReadBatch attempts to read a new message from the source.

type AsyncPreserver added in v4.1.0

type AsyncPreserver struct {
	// contains filtered or unexported fields
}

AsyncPreserver is a wrapper for input.Async implementations that keeps a buffer of sent messages until they are acknowledged. If an error occurs during message propagation the contents of the buffer will be resent instead of reading new messages until it is depleted. AsyncPreserver implements input.Async.

Wrapping an input with this type is useful when your source of messages doesn't have a concept of a NoAck (like Kafka), and instead of "rejecting" messages we always intend to simply retry them until success.
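The resend behaviour described above can be sketched as a queue of negatively acknowledged batches that is drained before the wrapped source is read again. The code below is a simplified, self-contained illustration with stand-in types. `source`, `unacked`, `preserver` and `demo` are hypothetical names, and details such as ordering guarantees or buffer limits in the real type are not covered by this page and are not modelled.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

type Batch []string
type AsyncAckFn func(context.Context, error) error

// source is a toy upstream that counts how many batches were acked.
type source struct {
	pending []Batch
	acked   int
}

func (s *source) ReadBatch(ctx context.Context) (Batch, AsyncAckFn, error) {
	if len(s.pending) == 0 {
		return nil, nil, errors.New("end of input")
	}
	b := s.pending[0]
	s.pending = s.pending[1:]
	return b, func(context.Context, error) error { s.acked++; return nil }, nil
}

// unacked pairs a batch with the upstream ack that is still owed for it.
type unacked struct {
	batch Batch
	ack   AsyncAckFn
}

// preserver resends failed batches before reading new ones.
type preserver struct {
	src    *source
	mu     sync.Mutex
	resend []unacked
}

func (p *preserver) ReadBatch(ctx context.Context) (Batch, AsyncAckFn, error) {
	p.mu.Lock()
	if len(p.resend) > 0 {
		item := p.resend[0]
		p.resend = p.resend[1:]
		p.mu.Unlock()
		return item.batch, p.wrap(item), nil
	}
	p.mu.Unlock()

	b, ack, err := p.src.ReadBatch(ctx)
	if err != nil {
		return nil, nil, err
	}
	return b, p.wrap(unacked{batch: b, ack: ack}), nil
}

// wrap queues the batch again on failure and only acks upstream on success.
func (p *preserver) wrap(item unacked) AsyncAckFn {
	return func(ctx context.Context, res error) error {
		if res != nil {
			p.mu.Lock()
			p.resend = append(p.resend, item)
			p.mu.Unlock()
			return nil
		}
		return item.ack(ctx, nil)
	}
}

// demo fails the first delivery, then lets the next two succeed.
func demo() (string, int) {
	ctx := context.Background()
	src := &source{pending: []Batch{{"a"}, {"b"}}}
	p := &preserver{src: src}
	order := ""
	outcomes := []error{errors.New("delivery failed"), nil, nil}
	for _, res := range outcomes {
		b, ack, err := p.ReadBatch(ctx)
		if err != nil {
			break
		}
		order += b[0]
		_ = ack(ctx, res)
	}
	return order, src.acked
}

func main() {
	order, acked := demo()
	fmt.Println(order, acked)
}
```

In this run the batch "a" is delivered twice before "b" is read, and the upstream source only ever sees successful acknowledgements.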

func NewAsyncPreserver added in v4.1.0

func NewAsyncPreserver(r Async) *AsyncPreserver

NewAsyncPreserver returns a new AsyncPreserver wrapper around an input.Async.

func (*AsyncPreserver) Close added in v4.6.0

func (p *AsyncPreserver) Close(ctx context.Context) error

Close triggers the shut down of this component and blocks until completion or context cancellation.

func (*AsyncPreserver) Connect added in v4.6.0

func (p *AsyncPreserver) Connect(ctx context.Context) error

Connect attempts to establish a connection to the source, if unsuccessful returns an error. If the attempt is successful (or not necessary) returns nil.

func (*AsyncPreserver) ReadBatch added in v4.6.0

func (p *AsyncPreserver) ReadBatch(ctx context.Context) (message.Batch, AsyncAckFn, error)

ReadBatch attempts to read a new message from the source.

type AsyncReader added in v4.1.0

type AsyncReader struct {
	// contains filtered or unexported fields
}

AsyncReader is an input implementation that reads messages from an input.Async component.

func (*AsyncReader) Connected added in v4.1.0

func (r *AsyncReader) Connected() bool

Connected returns a boolean indicating whether this input is currently connected to its target.

func (*AsyncReader) TransactionChan added in v4.1.0

func (r *AsyncReader) TransactionChan() <-chan message.Transaction

TransactionChan returns a transactions channel for consuming messages from this input type.

func (*AsyncReader) TriggerCloseNow added in v4.6.0

func (r *AsyncReader) TriggerCloseNow()

TriggerCloseNow triggers the shut down of this component but should not block the calling goroutine.

func (*AsyncReader) TriggerStopConsuming added in v4.6.0

func (r *AsyncReader) TriggerStopConsuming()

TriggerStopConsuming instructs the input to start shutting down resources once all pending messages are delivered and acknowledged. This call does not block.

func (*AsyncReader) WaitForClose added in v4.1.0

func (r *AsyncReader) WaitForClose(ctx context.Context) error

WaitForClose is a blocking call to wait until the component has finished shutting down and cleaning up resources.

type Config added in v4.1.0

type Config struct {
	Label      string             `json:"label" yaml:"label"`
	Type       string             `json:"type" yaml:"type"`
	Plugin     any                `json:"plugin,omitempty" yaml:"plugin,omitempty"`
	Processors []processor.Config `json:"processors" yaml:"processors"`
}

Config is the all-encompassing configuration struct for all input types.

Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.

func FromAny added in v4.25.0

func FromAny(prov docs.Provider, value any) (conf Config, err error)

func NewConfig added in v4.1.0

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values.

Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.

type Streamed

type Streamed interface {
	// TransactionChan returns a channel used for consuming transactions from
	// this type. Every transaction received must be resolved before another
	// transaction will be sent.
	TransactionChan() <-chan message.Transaction

	// Connected returns a boolean indicating whether this input is currently
	// connected to its target.
	Connected() bool

	// TriggerStopConsuming instructs the input to start shutting down resources
	// once all pending messages are delivered and acknowledged. This call does
	// not block.
	TriggerStopConsuming()

	// TriggerCloseNow triggers the shut down of this component but should not
	// block the calling goroutine.
	TriggerCloseNow()

	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(ctx context.Context) error
}

Streamed is a common interface implemented by inputs and provides channel based streaming APIs.
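A consumer of this interface typically ranges over the transaction channel, resolves each transaction, and then waits for shutdown. The sketch below illustrates that lifecycle with a toy input. `Transaction` is a simplified stand-in for `message.Transaction` (its fields are assumptions), and `toyInput`, `newToyInput` and `demo` are hypothetical names. It is not the AsyncReader implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Transaction is a simplified stand-in for message.Transaction.
type Transaction struct {
	Payload []string
	Ack     func(context.Context, error) error
}

// Streamed mirrors the method set listed above.
type Streamed interface {
	TransactionChan() <-chan Transaction
	Connected() bool
	TriggerStopConsuming()
	TriggerCloseNow()
	WaitForClose(ctx context.Context) error
}

// toyInput emits fixed batches, one unresolved transaction at a time.
type toyInput struct {
	tran                 chan Transaction
	stop, closeNow, done chan struct{}
	stopOnce, closeOnce  sync.Once
}

func newToyInput(batches ...[]string) *toyInput {
	t := &toyInput{tran: make(chan Transaction), stop: make(chan struct{}),
		closeNow: make(chan struct{}), done: make(chan struct{})}
	go t.loop(batches)
	return t
}

// loop sends each batch and waits for its ack before sending the next.
func (t *toyInput) loop(batches [][]string) {
	defer close(t.done)
	defer close(t.tran)
	for _, b := range batches {
		select {
		case <-t.stop: // graceful stop: nothing is pending at this point
			return
		default:
		}
		acked := make(chan struct{}, 1)
		ack := func(context.Context, error) error { acked <- struct{}{}; return nil }
		select {
		case t.tran <- Transaction{Payload: b, Ack: ack}:
		case <-t.stop:
			return
		case <-t.closeNow:
			return
		}
		select {
		case <-acked: // a graceful stop still waits for this pending ack
		case <-t.closeNow:
			return
		}
	}
}

func (t *toyInput) TransactionChan() <-chan Transaction { return t.tran }
func (t *toyInput) Connected() bool                     { return true }
func (t *toyInput) TriggerStopConsuming()               { t.stopOnce.Do(func() { close(t.stop) }) }
func (t *toyInput) TriggerCloseNow()                    { t.closeOnce.Do(func() { close(t.closeNow) }) }

func (t *toyInput) WaitForClose(ctx context.Context) error {
	select {
	case <-t.done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo consumes two messages, requests a graceful stop, then waits for close.
func demo() ([]string, error) {
	ctx := context.Background()
	var in Streamed = newToyInput([]string{"a"}, []string{"b"}, []string{"c"})
	var seen []string
	for tx := range in.TransactionChan() {
		seen = append(seen, tx.Payload...)
		if len(seen) == 2 {
			in.TriggerStopConsuming() // does not block
		}
		if err := tx.Ack(ctx, nil); err != nil {
			return seen, err
		}
	}
	return seen, in.WaitForClose(ctx)
}

func main() { fmt.Println(demo()) }
```

Requesting the stop before acknowledging the second transaction keeps the example deterministic: the toy input observes the stop signal before it offers the third batch, so "c" is never delivered.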

func NewAsyncReader added in v4.1.0

func NewAsyncReader(
	typeStr string,
	r Async,
	mgr component.Observability,
	opts ...func(a *AsyncReader),
) (Streamed, error)

NewAsyncReader creates a new AsyncReader input type.

func WrapWithPipelines added in v4.1.0

func WrapWithPipelines(in Streamed, pipeConstructors ...iprocessor.PipelineConstructorFunc) (Streamed, error)

WrapWithPipelines wraps an input with a variadic number of pipelines.

type WithPipeline added in v4.1.0

type WithPipeline struct {
	// contains filtered or unexported fields
}

WithPipeline is a type that wraps both an input type and a pipeline type by routing the input through the pipeline. It implements the input.Streamed interface in order to act like an ordinary input.

func WrapWithPipeline added in v4.1.0

func WrapWithPipeline(in Streamed, pipeConstructor iprocessor.PipelineConstructorFunc) (*WithPipeline, error)

WrapWithPipeline routes an input directly into a processing pipeline and returns a type that manages both and acts like an ordinary input.

func (*WithPipeline) Connected added in v4.1.0

func (i *WithPipeline) Connected() bool

Connected returns a boolean indicating whether this input is currently connected to its target.

func (*WithPipeline) TransactionChan added in v4.1.0

func (i *WithPipeline) TransactionChan() <-chan message.Transaction

TransactionChan returns the channel used for consuming transactions from this input.

func (*WithPipeline) TriggerCloseNow added in v4.6.0

func (i *WithPipeline) TriggerCloseNow()

TriggerCloseNow triggers the shut down of this component but should not block the calling goroutine.

func (*WithPipeline) TriggerStopConsuming added in v4.6.0

func (i *WithPipeline) TriggerStopConsuming()

TriggerStopConsuming instructs the input to start shutting down resources once all pending messages are delivered and acknowledged. This call does not block.

func (*WithPipeline) WaitForClose added in v4.1.0

func (i *WithPipeline) WaitForClose(ctx context.Context) error

WaitForClose is a blocking call to wait until the component has finished shutting down and cleaning up resources.

Directories

Path Synopsis
Package config contains reusable config definitions and parsers for inputs defined via the public/service package.