input

package
v1.4.1-rc2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2025 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AsyncReaderWithConnBackOff

func AsyncReaderWithConnBackOff(boff backoff.BackOff) func(a *AsyncReader)

AsyncReaderWithConnBackOff set the backoff used for limiting connection attempts. If the maximum number of retry attempts is reached then the input will gracefully stop.

Types

type Async

type Async interface {
	// Connect attempts to establish a connection to the source, if
	// unsuccessful returns an error. If the attempt is successful (or not
	// necessary) returns nil.
	Connect(ctx context.Context) error

	// ReadBatch attempts to read a new message from the source. If
	// successful a message is returned along with a function used to
	// acknowledge receipt of the returned message. It's safe to process the
	// returned message and read the next message asynchronously.
	ReadBatch(ctx context.Context) (message.Batch, AsyncAckFn, error)

	// Close triggers the shut down of this component and blocks until
	// completion or context cancellation.
	Close(ctx context.Context) error
}

Async is a type that reads Bento messages from an external source and allows acknowledgements for a message batch to be propagated asynchronously.

type AsyncAckFn

type AsyncAckFn func(context.Context, error) error

AsyncAckFn is a function used to acknowledge receipt of a message batch. The provided response indicates whether the message batch was successfully delivered. Returns an error if the acknowledge was not propagated.

type AsyncCutOff

type AsyncCutOff struct {
	// contains filtered or unexported fields
}

AsyncCutOff is a wrapper for input.Async implementations that exits from WaitForClose immediately. This is only useful when the underlying readable resource cannot be closed reliably and can block forever.

func NewAsyncCutOff

func NewAsyncCutOff(r Async) *AsyncCutOff

NewAsyncCutOff returns a new AsyncCutOff wrapper around a input.Async.

func (*AsyncCutOff) Close

func (c *AsyncCutOff) Close(ctx context.Context) error

Close triggers the asynchronous closing of the reader.

func (*AsyncCutOff) Connect

func (c *AsyncCutOff) Connect(ctx context.Context) error

Connect attempts to establish a connection to the source, if unsuccessful returns an error. If the attempt is successful (or not necessary) returns nil.

func (*AsyncCutOff) ReadBatch

func (c *AsyncCutOff) ReadBatch(ctx context.Context) (message.Batch, AsyncAckFn, error)

ReadBatch attempts to read a new message from the source.

type AsyncPreserver

type AsyncPreserver struct {
	// contains filtered or unexported fields
}

AsyncPreserver is a wrapper for input.Async implementations that keeps a buffer of sent messages until they are acknowledged. If an error occurs during message propagation the contents of the buffer will be resent instead of reading new messages until it is depleted. AsyncPreserver implements input.Async.

Wrapping an input with this type is useful when your source of messages doesn't have a concept of a NoAck (like Kafka), and instead of "rejecting" messages we always intend to simply retry them until success.

func NewAsyncPreserver

func NewAsyncPreserver(r Async) *AsyncPreserver

NewAsyncPreserver returns a new AsyncPreserver wrapper around a input.Async.

func (*AsyncPreserver) Close

func (p *AsyncPreserver) Close(ctx context.Context) error

Close triggers the shut down of this component and blocks until completion or context cancellation.

func (*AsyncPreserver) Connect

func (p *AsyncPreserver) Connect(ctx context.Context) error

Connect attempts to establish a connection to the source, if unsuccessful returns an error. If the attempt is successful (or not necessary) returns nil.

func (*AsyncPreserver) ReadBatch

func (p *AsyncPreserver) ReadBatch(ctx context.Context) (message.Batch, AsyncAckFn, error)

ReadBatch attempts to read a new message from the source.

type AsyncReader

type AsyncReader struct {
	// contains filtered or unexported fields
}

AsyncReader is an input implementation that reads messages from an input.Async component.

func (*AsyncReader) ConnectionStatus added in v1.2.0

func (r *AsyncReader) ConnectionStatus() component.ConnectionStatuses

ConnectionStatus returns the current status of the given component connection. The result is a slice in order to accommodate higher order components that wrap several others.

func (*AsyncReader) TransactionChan

func (r *AsyncReader) TransactionChan() <-chan message.Transaction

TransactionChan returns a transactions channel for consuming messages from this input type.

func (*AsyncReader) TriggerCloseNow

func (r *AsyncReader) TriggerCloseNow()

TriggerCloseNow triggers the shut down of this component but should not block the calling goroutine.

func (*AsyncReader) TriggerStopConsuming

func (r *AsyncReader) TriggerStopConsuming()

TriggerStopConsuming instructs the input to start shutting down resources once all pending messages are delivered and acknowledged. This call does not block.

func (*AsyncReader) WaitForClose

func (r *AsyncReader) WaitForClose(ctx context.Context) error

WaitForClose is a blocking call to wait until the component has finished shutting down and cleaning up resources.

type Config

type Config struct {
	Label      string             `json:"label" yaml:"label"`
	Type       string             `json:"type" yaml:"type"`
	Plugin     any                `json:"plugin,omitempty" yaml:"plugin,omitempty"`
	Processors []processor.Config `json:"processors" yaml:"processors"`
}

Config is the all encompassing configuration struct for all input types. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.

func FromAny

func FromAny(prov docs.Provider, value any) (conf Config, err error)

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.

type Streamed

type Streamed interface {
	// TransactionChan returns a channel used for consuming transactions from
	// this type. Every transaction received must be resolved before another
	// transaction will be sent.
	TransactionChan() <-chan message.Transaction

	// ConnectionStatus returns the current status of the given component
	// connection. The result is a slice in order to accommodate higher order
	// components that wrap several others.
	ConnectionStatus() component.ConnectionStatuses

	// TriggerStopConsuming instructs the input to start shutting down resources
	// once all pending messages are delivered and acknowledged. This call does
	// not block.
	TriggerStopConsuming()

	// TriggerCloseNow triggers the shut down of this component but should not
	// block the calling goroutine.
	TriggerCloseNow()

	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(ctx context.Context) error
}

Streamed is a common interface implemented by inputs and provides channel based streaming APIs.

func NewAsyncReader

func NewAsyncReader(
	typeStr string,
	r Async,
	mgr component.Observability,
	opts ...func(a *AsyncReader),
) (Streamed, error)

NewAsyncReader creates a new AsyncReader input type.

func WrapWithPipelines

func WrapWithPipelines(in Streamed, pipeConstructors ...iprocessor.PipelineConstructorFunc) (Streamed, error)

WrapWithPipelines wraps an input with a variadic number of pipelines.

type WithPipeline

type WithPipeline struct {
	// contains filtered or unexported fields
}

WithPipeline is a type that wraps both an input type and a pipeline type by routing the input through the pipeline, and implements the input.Type interface in order to act like an ordinary input.

func WrapWithPipeline

func WrapWithPipeline(in Streamed, pipeConstructor iprocessor.PipelineConstructorFunc) (*WithPipeline, error)

WrapWithPipeline routes an input directly into a processing pipeline and returns a type that manages both and acts like an ordinary input.

func (*WithPipeline) ConnectionStatus added in v1.2.0

func (i *WithPipeline) ConnectionStatus() component.ConnectionStatuses

ConnectionStatus returns the current status of the connection of the wrapped component.

func (*WithPipeline) TransactionChan

func (i *WithPipeline) TransactionChan() <-chan message.Transaction

TransactionChan returns the channel used for consuming transactions from this input.

func (*WithPipeline) TriggerCloseNow

func (i *WithPipeline) TriggerCloseNow()

TriggerCloseNow triggers the shut down of this component but should not block the calling goroutine.

func (*WithPipeline) TriggerStopConsuming

func (i *WithPipeline) TriggerStopConsuming()

TriggerStopConsuming instructs the input to start shutting down resources once all pending messages are delivered and acknowledged. This call does not block.

func (*WithPipeline) WaitForClose

func (i *WithPipeline) WaitForClose(ctx context.Context) error

WaitForClose is a blocking call to wait until the component has finished shutting down and cleaning up resources.

Directories

Path Synopsis
Package config contains reusable config definitions and parsers for inputs defined via the public/service package.
Package config contains reusable config definitions and parsers for inputs defined via the public/service package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL