Documentation ¶
Functions ¶
func AsyncReaderWithConnBackOff ¶ added in v4.16.0
func AsyncReaderWithConnBackOff(boff backoff.BackOff) func(a *AsyncReader)
AsyncReaderWithConnBackOff sets the backoff used to limit connection attempts. If the maximum number of retry attempts is reached, the input stops gracefully.
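The retry behaviour described above can be sketched as follows. This is only an illustration under stated assumptions, not the package's implementation: `backOff`, `fixedBackOff`, `stop`, `errGaveUp` and `connectWithBackOff` are hypothetical local stand-ins, since the real option takes a `backoff.BackOff` and this internal package cannot be imported from outside its module.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// stop is a hypothetical sentinel meaning "no more retries".
const stop time.Duration = -1

// backOff is a local stand-in for a backoff policy (an assumption for this
// sketch, not the real backoff.BackOff interface).
type backOff interface {
	NextBackOff() time.Duration
}

// fixedBackOff waits the same interval each time and gives up after max retries.
type fixedBackOff struct {
	interval time.Duration
	max      int
	tries    int
}

func (f *fixedBackOff) NextBackOff() time.Duration {
	if f.tries >= f.max {
		return stop
	}
	f.tries++
	return f.interval
}

var errGaveUp = errors.New("connection attempts exhausted")

// connectWithBackOff calls connect until it succeeds, the policy returns
// stop, or ctx is cancelled. It returns the number of attempts made.
func connectWithBackOff(ctx context.Context, connect func(context.Context) error, b backOff) (int, error) {
	attempts := 0
	for {
		attempts++
		if err := connect(ctx); err == nil {
			return attempts, nil
		}
		wait := b.NextBackOff()
		if wait == stop {
			// Retries exhausted: report it so the caller can stop gracefully.
			return attempts, errGaveUp
		}
		select {
		case <-time.After(wait):
		case <-ctx.Done():
			return attempts, ctx.Err()
		}
	}
}

// backOffDemo runs the loop against a source that always refuses connections.
func backOffDemo() (int, error) {
	failing := func(context.Context) error { return errors.New("refused") }
	policy := &fixedBackOff{interval: time.Millisecond, max: 3}
	return connectWithBackOff(context.Background(), failing, policy)
}

func main() {
	attempts, err := backOffDemo()
	fmt.Println(attempts, err)
}
```

With a policy that allows three retries, the loop makes one initial attempt plus three retries before giving up.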
Types ¶
type Async ¶ added in v4.1.0
type Async interface {
	// Connect attempts to establish a connection to the source, if
	// unsuccessful returns an error. If the attempt is successful (or not
	// necessary) returns nil.
	Connect(ctx context.Context) error

	// ReadBatch attempts to read a new message from the source. If
	// successful a message is returned along with a function used to
	// acknowledge receipt of the returned message. It's safe to process the
	// returned message and read the next message asynchronously.
	ReadBatch(ctx context.Context) (message.Batch, AsyncAckFn, error)

	// Close triggers the shut down of this component and blocks until
	// completion or context cancellation.
	Close(ctx context.Context) error
}
Async is a type that reads Benthos messages from an external source and allows acknowledgements for a message batch to be propagated asynchronously.
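As a rough illustration of the contract above, here is a minimal in-memory source and a loop that drains it. The types `batch`, `ackFn` and `async` are simplified local stand-ins for `message.Batch`, `AsyncAckFn` and `Async`; the exact shape of the acknowledgement function and the `errEndOfInput` sentinel are assumptions made for this sketch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// batch and ackFn are simplified stand-ins for message.Batch and AsyncAckFn.
// The ackFn signature is an assumption based on the description of AsyncAckFn.
type batch []string
type ackFn func(ctx context.Context, res error) error

// async mirrors the three methods of the documented Async interface.
type async interface {
	Connect(ctx context.Context) error
	ReadBatch(ctx context.Context) (batch, ackFn, error)
	Close(ctx context.Context) error
}

// errEndOfInput is a hypothetical sentinel signalling an exhausted source.
var errEndOfInput = errors.New("end of input")

// sliceInput serves batches from memory and counts successful acknowledgements.
type sliceInput struct {
	batches []batch
	acked   int
}

func (s *sliceInput) Connect(ctx context.Context) error { return nil }

func (s *sliceInput) ReadBatch(ctx context.Context) (batch, ackFn, error) {
	if len(s.batches) == 0 {
		return nil, nil, errEndOfInput
	}
	b := s.batches[0]
	s.batches = s.batches[1:]
	ack := func(ctx context.Context, res error) error {
		if res == nil {
			s.acked++ // a nil result means the batch was delivered
		}
		return nil
	}
	return b, ack, nil
}

func (s *sliceInput) Close(ctx context.Context) error { return nil }

// drain connects, reads until the source is exhausted, and acknowledges each
// batch as delivered. It returns the number of messages read.
func drain(ctx context.Context, in async) (int, error) {
	if err := in.Connect(ctx); err != nil {
		return 0, err
	}
	defer in.Close(ctx)
	n := 0
	for {
		b, ack, err := in.ReadBatch(ctx)
		if errors.Is(err, errEndOfInput) {
			return n, nil
		}
		if err != nil {
			return n, err
		}
		n += len(b)
		if err := ack(ctx, nil); err != nil {
			return n, err
		}
	}
}

// runDrainDemo drains two batches holding three messages in total.
func runDrainDemo() (messages, acked int) {
	in := &sliceInput{batches: []batch{{"a", "b"}, {"c"}}}
	n, _ := drain(context.Background(), in)
	return n, in.acked
}

func main() {
	messages, acked := runDrainDemo()
	fmt.Println(messages, acked)
}
```

The drain loop here acknowledges synchronously for brevity; the interface permits reading the next batch before earlier acknowledgements have been issued.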
type AsyncAckFn ¶ added in v4.1.0
AsyncAckFn is a function used to acknowledge receipt of a message batch. The provided response indicates whether the message batch was successfully delivered. Returns an error if the acknowledgement was not propagated.
type AsyncCutOff ¶ added in v4.1.0
type AsyncCutOff struct {
// contains filtered or unexported fields
}
AsyncCutOff is a wrapper for input.Async implementations that exits from WaitForClose immediately. This is only useful when the underlying readable resource cannot be closed reliably and can block forever.
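The idea of cutting off a resource that may block forever can be sketched like this. It is a simplified illustration, not the AsyncCutOff source: `reader`, `stuckReader`, `cutOff` and `errClosed` are hypothetical names, and context arguments are omitted to keep the example short.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errClosed is a hypothetical sentinel returned once the wrapper is closed.
var errClosed = errors.New("reader closed")

// reader is a simplified stand-in for a source whose calls may never return.
type reader interface {
	Read() (string, error)
	Close() error
}

// stuckReader blocks forever in both Read and Close.
type stuckReader struct{ never chan struct{} }

func (s *stuckReader) Read() (string, error) { <-s.never; return "", nil }
func (s *stuckReader) Close() error          { <-s.never; return nil }

// cutOff wraps a reader so that closing never waits on the wrapped resource.
type cutOff struct {
	r      reader
	closed chan struct{}
	once   sync.Once
}

func newCutOff(r reader) *cutOff {
	return &cutOff{r: r, closed: make(chan struct{})}
}

type readResult struct {
	msg string
	err error
}

// Read runs the wrapped read in a goroutine and abandons it if the wrapper
// is closed first.
func (c *cutOff) Read() (string, error) {
	resCh := make(chan readResult, 1) // buffered so the goroutine never blocks on send
	go func() {
		m, err := c.r.Read()
		resCh <- readResult{m, err}
	}()
	select {
	case res := <-resCh:
		return res.msg, res.err
	case <-c.closed:
		return "", errClosed
	}
}

// Close signals pending reads and starts closing the wrapped reader in the
// background, returning without waiting for it.
func (c *cutOff) Close() error {
	c.once.Do(func() {
		close(c.closed)
		go c.r.Close() // may never return; deliberately not awaited
	})
	return nil
}

// cutOffDemo closes the wrapper while a read is stuck and returns the read error.
func cutOffDemo() error {
	c := newCutOff(&stuckReader{never: make(chan struct{})})
	go func() {
		time.Sleep(10 * time.Millisecond)
		c.Close()
	}()
	_, err := c.Read()
	return err
}

func main() {
	fmt.Println(cutOffDemo())
}
```

The trade-off is that the abandoned goroutines are leaked, which is why this pattern only makes sense when the underlying resource genuinely cannot be closed reliably.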
func NewAsyncCutOff ¶ added in v4.1.0
func NewAsyncCutOff(r Async) *AsyncCutOff
NewAsyncCutOff returns a new AsyncCutOff wrapper around an input.Async.
func (*AsyncCutOff) Close ¶ added in v4.6.0
func (c *AsyncCutOff) Close(ctx context.Context) error
Close triggers the asynchronous closing of the reader.
func (*AsyncCutOff) Connect ¶ added in v4.6.0
func (c *AsyncCutOff) Connect(ctx context.Context) error
Connect attempts to establish a connection to the source, if unsuccessful returns an error. If the attempt is successful (or not necessary) returns nil.
func (*AsyncCutOff) ReadBatch ¶ added in v4.6.0
func (c *AsyncCutOff) ReadBatch(ctx context.Context) (message.Batch, AsyncAckFn, error)
ReadBatch attempts to read a new message from the source.
type AsyncPreserver ¶ added in v4.1.0
type AsyncPreserver struct {
// contains filtered or unexported fields
}
AsyncPreserver is a wrapper for input.Async implementations that keeps a buffer of sent messages until they are acknowledged. If an error occurs during message propagation the contents of the buffer will be resent instead of reading new messages until it is depleted. AsyncPreserver implements input.Async.
Wrapping an input with this type is useful when your source of messages doesn't have a concept of a NoAck (like Kafka), and instead of "rejecting" messages we always intend to simply retry them until success.
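A minimal sketch of the resend behaviour, assuming a much simpler model than the real type: single string messages instead of batches, no contexts, and hypothetical names (`source`, `sliceSource`, `preserver`, `errEmpty`). A message that is negatively acknowledged is queued and served again before anything new is read from the source.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ackFn reports the delivery result for one message; nil means success.
type ackFn func(res error)

// source is a stand-in for an input with no concept of rejecting a message.
type source interface {
	Read() (string, error)
}

// errEmpty is a hypothetical sentinel for an exhausted source.
var errEmpty = errors.New("no more messages")

type sliceSource struct{ msgs []string }

func (s *sliceSource) Read() (string, error) {
	if len(s.msgs) == 0 {
		return "", errEmpty
	}
	m := s.msgs[0]
	s.msgs = s.msgs[1:]
	return m, nil
}

// preserver keeps failed messages and replays them before reading new ones.
type preserver struct {
	mu     sync.Mutex
	src    source
	resend []string
}

func (p *preserver) Read() (string, ackFn, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	var msg string
	if len(p.resend) > 0 {
		// Drain the resend queue before touching the source.
		msg = p.resend[0]
		p.resend = p.resend[1:]
	} else {
		var err error
		if msg, err = p.src.Read(); err != nil {
			return "", nil, err
		}
	}
	ack := func(res error) {
		if res == nil {
			return // delivered, nothing to keep
		}
		p.mu.Lock()
		p.resend = append(p.resend, msg)
		p.mu.Unlock()
	}
	return msg, ack, nil
}

// preserverDemo fails the first delivery and records the order of reads.
func preserverDemo() []string {
	p := &preserver{src: &sliceSource{msgs: []string{"a", "b"}}}
	var seen []string
	failedOnce := false
	for {
		msg, ack, err := p.Read()
		if err != nil {
			return seen
		}
		seen = append(seen, msg)
		if !failedOnce {
			failedOnce = true
			ack(errors.New("delivery failed"))
			continue
		}
		ack(nil)
	}
}

func main() {
	fmt.Println(preserverDemo())
}
```

In this run the first message is read twice, once for the failed delivery and once for the retry, before the second message is read.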
func NewAsyncPreserver ¶ added in v4.1.0
func NewAsyncPreserver(r Async) *AsyncPreserver
NewAsyncPreserver returns a new AsyncPreserver wrapper around an input.Async.
func (*AsyncPreserver) Close ¶ added in v4.6.0
func (p *AsyncPreserver) Close(ctx context.Context) error
Close triggers the shut down of this component and blocks until completion or context cancellation.
func (*AsyncPreserver) Connect ¶ added in v4.6.0
func (p *AsyncPreserver) Connect(ctx context.Context) error
Connect attempts to establish a connection to the source, if unsuccessful returns an error. If the attempt is successful (or not necessary) returns nil.
func (*AsyncPreserver) ReadBatch ¶ added in v4.6.0
func (p *AsyncPreserver) ReadBatch(ctx context.Context) (message.Batch, AsyncAckFn, error)
ReadBatch attempts to read a new message from the source.
type AsyncReader ¶ added in v4.1.0
type AsyncReader struct {
// contains filtered or unexported fields
}
AsyncReader is an input implementation that reads messages from an input.Async component.
func (*AsyncReader) Connected ¶ added in v4.1.0
func (r *AsyncReader) Connected() bool
Connected returns a boolean indicating whether this input is currently connected to its target.
func (*AsyncReader) TransactionChan ¶ added in v4.1.0
func (r *AsyncReader) TransactionChan() <-chan message.Transaction
TransactionChan returns a transactions channel for consuming messages from this input type.
func (*AsyncReader) TriggerCloseNow ¶ added in v4.6.0
func (r *AsyncReader) TriggerCloseNow()
TriggerCloseNow triggers the shut down of this component but should not block the calling goroutine.
func (*AsyncReader) TriggerStopConsuming ¶ added in v4.6.0
func (r *AsyncReader) TriggerStopConsuming()
TriggerStopConsuming instructs the input to start shutting down resources once all pending messages are delivered and acknowledged. This call does not block.
func (*AsyncReader) WaitForClose ¶ added in v4.1.0
func (r *AsyncReader) WaitForClose(ctx context.Context) error
WaitForClose is a blocking call to wait until the component has finished shutting down and cleaning up resources.
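One plausible way to combine the three shutdown methods is to request a graceful stop, wait for a bounded time, and only then force the close. The sketch below assumes that ordering; `closer`, `slowInput` and `shutdown` are hypothetical names, and `closer` only mirrors the method signatures documented here.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// closer mirrors the shutdown-related methods documented for AsyncReader.
type closer interface {
	TriggerStopConsuming()
	TriggerCloseNow()
	WaitForClose(ctx context.Context) error
}

// slowInput is a stand-in that never completes a graceful stop but closes
// promptly when forced.
type slowInput struct {
	done chan struct{}
	once sync.Once
}

func (s *slowInput) TriggerStopConsuming() {} // pretend messages stay pending

func (s *slowInput) TriggerCloseNow() {
	s.once.Do(func() { close(s.done) })
}

func (s *slowInput) WaitForClose(ctx context.Context) error {
	select {
	case <-s.done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// shutdown asks for a graceful stop, waits up to grace, then forces the close
// and waits up to grace again. It reports whether forcing was needed.
func shutdown(in closer, grace time.Duration) (forced bool, err error) {
	in.TriggerStopConsuming()

	ctx, cancel := context.WithTimeout(context.Background(), grace)
	err = in.WaitForClose(ctx)
	cancel()
	if err == nil {
		return false, nil
	}

	in.TriggerCloseNow()
	ctx, cancel = context.WithTimeout(context.Background(), grace)
	defer cancel()
	return true, in.WaitForClose(ctx)
}

// shutdownDemo shuts down an input that only responds to a forced close.
func shutdownDemo() (bool, error) {
	return shutdown(&slowInput{done: make(chan struct{})}, 10*time.Millisecond)
}

func main() {
	forced, err := shutdownDemo()
	fmt.Println(forced, err)
}
```

Bounding both waits with a context keeps the caller from hanging if the component never finishes cleaning up.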
type Config ¶ added in v4.1.0
type Config struct {
	Label      string             `json:"label" yaml:"label"`
	Type       string             `json:"type" yaml:"type"`
	Plugin     any                `json:"plugin,omitempty" yaml:"plugin,omitempty"`
	Processors []processor.Config `json:"processors" yaml:"processors"`
}
Config is the all-encompassing configuration struct for all input types.
Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.
type Streamed ¶
type Streamed interface {
	// TransactionChan returns a channel used for consuming transactions from
	// this type. Every transaction received must be resolved before another
	// transaction will be sent.
	TransactionChan() <-chan message.Transaction

	// Connected returns a boolean indicating whether this input is currently
	// connected to its target.
	Connected() bool

	// TriggerStopConsuming instructs the input to start shutting down resources
	// once all pending messages are delivered and acknowledged. This call does
	// not block.
	TriggerStopConsuming()

	// TriggerCloseNow triggers the shut down of this component but should not
	// block the calling goroutine.
	TriggerCloseNow()

	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(ctx context.Context) error
}
Streamed is a common interface implemented by inputs and provides channel based streaming APIs.
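The rule that every transaction must be resolved before the next one is sent can be illustrated with plain channels. This is a sketch under assumptions: `transaction` is a simplified stand-in for `message.Transaction` (a payload plus a response channel), and `produce` and `consume` are hypothetical helpers, not part of this package.

```go
package main

import (
	"context"
	"fmt"
)

// transaction is a simplified stand-in for message.Transaction: a payload
// plus a channel on which the consumer reports the delivery result.
type transaction struct {
	payload string
	resC    chan<- error
}

// produce sends one transaction per message and waits for each to be
// resolved before sending the next. The channel is closed when it finishes.
func produce(ctx context.Context, msgs []string) <-chan transaction {
	tranC := make(chan transaction)
	go func() {
		defer close(tranC)
		for _, m := range msgs {
			resC := make(chan error, 1)
			select {
			case tranC <- transaction{payload: m, resC: resC}:
			case <-ctx.Done():
				return
			}
			// Block until the consumer resolves this transaction.
			select {
			case <-resC:
			case <-ctx.Done():
				return
			}
		}
	}()
	return tranC
}

// consume reads transactions until the channel closes, resolving each one
// with a nil error to signal successful delivery.
func consume(tranC <-chan transaction) []string {
	var out []string
	for t := range tranC {
		out = append(out, t.payload)
		t.resC <- nil
	}
	return out
}

// consumeDemo pipes three messages from the producer to the consumer.
func consumeDemo() []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	return consume(produce(ctx, []string{"x", "y", "z"}))
}

func main() {
	fmt.Println(consumeDemo())
}
```

If the consumer forgot to resolve a transaction, the producer in this sketch would stall until the context was cancelled, which is the failure mode the interface comment warns about.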
func NewAsyncReader ¶ added in v4.1.0
func NewAsyncReader(
	typeStr string,
	r Async,
	mgr component.Observability,
	opts ...func(a *AsyncReader),
) (Streamed, error)
NewAsyncReader creates a new AsyncReader input type.
func WrapWithPipelines ¶ added in v4.1.0
func WrapWithPipelines(in Streamed, pipeConstructors ...iprocessor.PipelineConstructorFunc) (Streamed, error)
WrapWithPipelines wraps an input with a variadic number of pipelines.
type WithPipeline ¶ added in v4.1.0
type WithPipeline struct {
// contains filtered or unexported fields
}
WithPipeline is a type that wraps both an input and a pipeline by routing the input through the pipeline. It implements the Streamed interface so that it can act like an ordinary input.
func WrapWithPipeline ¶ added in v4.1.0
func WrapWithPipeline(in Streamed, pipeConstructor iprocessor.PipelineConstructorFunc) (*WithPipeline, error)
WrapWithPipeline routes an input directly into a processing pipeline and returns a type that manages both and acts like an ordinary input.
func (*WithPipeline) Connected ¶ added in v4.1.0
func (i *WithPipeline) Connected() bool
Connected returns a boolean indicating whether this input is currently connected to its target.
func (*WithPipeline) TransactionChan ¶ added in v4.1.0
func (i *WithPipeline) TransactionChan() <-chan message.Transaction
TransactionChan returns the channel used for consuming transactions from this input.
func (*WithPipeline) TriggerCloseNow ¶ added in v4.6.0
func (i *WithPipeline) TriggerCloseNow()
TriggerCloseNow triggers the shut down of this component but should not block the calling goroutine.
func (*WithPipeline) TriggerStopConsuming ¶ added in v4.6.0
func (i *WithPipeline) TriggerStopConsuming()
TriggerStopConsuming instructs the input to start shutting down resources once all pending messages are delivered and acknowledged. This call does not block.
func (*WithPipeline) WaitForClose ¶ added in v4.1.0
func (i *WithPipeline) WaitForClose(ctx context.Context) error
WaitForClose is a blocking call to wait until the component has finished shutting down and cleaning up resources.
Directories ¶
Path | Synopsis |
---|---|
Package config contains reusable config definitions and parsers for inputs defined via the public/service package. |