Documentation ¶
Functions ¶
func AsyncReaderWithConnBackOff ¶
func AsyncReaderWithConnBackOff(boff backoff.BackOff) func(a *AsyncReader)
AsyncReaderWithConnBackOff sets the backoff used for limiting connection attempts. If the maximum number of retry attempts is reached, the input stops gracefully.
Types ¶
type Async ¶
type Async interface {
	// Connect attempts to establish a connection to the source, if
	// unsuccessful returns an error. If the attempt is successful (or not
	// necessary) returns nil.
	Connect(ctx context.Context) error

	// ReadBatch attempts to read a new message from the source. If
	// successful a message is returned along with a function used to
	// acknowledge receipt of the returned message. It's safe to process the
	// returned message and read the next message asynchronously.
	ReadBatch(ctx context.Context) (message.Batch, AsyncAckFn, error)

	// Close triggers the shut down of this component and blocks until
	// completion or context cancellation.
	Close(ctx context.Context) error
}
Async is a type that reads Benthos messages from an external source and allows acknowledgements for a message batch to be propagated asynchronously.
type AsyncAckFn ¶
AsyncAckFn is a function used to acknowledge receipt of a message batch. The provided response indicates whether the message batch was successfully delivered. Returns an error if the acknowledgement was not propagated.
type AsyncCutOff ¶
type AsyncCutOff struct {
// contains filtered or unexported fields
}
AsyncCutOff is a wrapper for input.Async implementations that exits from WaitForClose immediately. This is only useful when the underlying readable resource cannot be closed reliably and can block forever.
func NewAsyncCutOff ¶
func NewAsyncCutOff(r Async) *AsyncCutOff
NewAsyncCutOff returns a new AsyncCutOff wrapper around an input.Async.
func (*AsyncCutOff) Close ¶
func (c *AsyncCutOff) Close(ctx context.Context) error
Close triggers the asynchronous closing of the reader.
func (*AsyncCutOff) Connect ¶
func (c *AsyncCutOff) Connect(ctx context.Context) error
Connect attempts to establish a connection to the source, if unsuccessful returns an error. If the attempt is successful (or not necessary) returns nil.
func (*AsyncCutOff) ReadBatch ¶
func (c *AsyncCutOff) ReadBatch(ctx context.Context) (message.Batch, AsyncAckFn, error)
ReadBatch attempts to read a new message from the source.
type AsyncPreserver ¶
type AsyncPreserver struct {
// contains filtered or unexported fields
}
AsyncPreserver is a wrapper for input.Async implementations that keeps a buffer of sent messages until they are acknowledged. If an error occurs during message propagation the contents of the buffer will be resent instead of reading new messages until it is depleted. AsyncPreserver implements input.Async.
Wrapping an input with this type is useful when your source of messages doesn't have a concept of a NoAck (like Kafka), and instead of "rejecting" messages we always intend to simply retry them until success.
func NewAsyncPreserver ¶
func NewAsyncPreserver(r Async) *AsyncPreserver
NewAsyncPreserver returns a new AsyncPreserver wrapper around an input.Async.
func (*AsyncPreserver) Close ¶
func (p *AsyncPreserver) Close(ctx context.Context) error
Close triggers the shut down of this component and blocks until completion or context cancellation.
func (*AsyncPreserver) Connect ¶
func (p *AsyncPreserver) Connect(ctx context.Context) error
Connect attempts to establish a connection to the source, if unsuccessful returns an error. If the attempt is successful (or not necessary) returns nil.
func (*AsyncPreserver) ReadBatch ¶
func (p *AsyncPreserver) ReadBatch(ctx context.Context) (message.Batch, AsyncAckFn, error)
ReadBatch attempts to read a new message from the source.
type AsyncReader ¶
type AsyncReader struct {
// contains filtered or unexported fields
}
AsyncReader is an input implementation that reads messages from an input.Async component.
func (*AsyncReader) Connected ¶
func (r *AsyncReader) Connected() bool
Connected returns a boolean indicating whether this input is currently connected to its target.
func (*AsyncReader) TransactionChan ¶
func (r *AsyncReader) TransactionChan() <-chan message.Transaction
TransactionChan returns a transactions channel for consuming messages from this input type.
func (*AsyncReader) TriggerCloseNow ¶
func (r *AsyncReader) TriggerCloseNow()
TriggerCloseNow triggers the shut down of this component but should not block the calling goroutine.
func (*AsyncReader) TriggerStopConsuming ¶
func (r *AsyncReader) TriggerStopConsuming()
TriggerStopConsuming instructs the input to start shutting down resources once all pending messages are delivered and acknowledged. This call does not block.
func (*AsyncReader) WaitForClose ¶
func (r *AsyncReader) WaitForClose(ctx context.Context) error
WaitForClose is a blocking call to wait until the component has finished shutting down and cleaning up resources.
type Config ¶
type Config struct {
	Label      string             `json:"label" yaml:"label"`
	Type       string             `json:"type" yaml:"type"`
	Plugin     any                `json:"plugin,omitempty" yaml:"plugin,omitempty"`
	Processors []processor.Config `json:"processors" yaml:"processors"`
}
Config is the all-encompassing configuration struct for all input types.
Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.
type Streamed ¶
type Streamed interface {
	// TransactionChan returns a channel used for consuming transactions from
	// this type. Every transaction received must be resolved before another
	// transaction will be sent.
	TransactionChan() <-chan message.Transaction

	// Connected returns a boolean indicating whether this input is currently
	// connected to its target.
	Connected() bool

	// TriggerStopConsuming instructs the input to start shutting down resources
	// once all pending messages are delivered and acknowledged. This call does
	// not block.
	TriggerStopConsuming()

	// TriggerCloseNow triggers the shut down of this component but should not
	// block the calling goroutine.
	TriggerCloseNow()

	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(ctx context.Context) error
}
Streamed is a common interface implemented by inputs and provides channel based streaming APIs.
func NewAsyncReader ¶
func NewAsyncReader(
	typeStr string,
	r Async,
	mgr component.Observability,
	opts ...func(a *AsyncReader),
) (Streamed, error)
NewAsyncReader creates a new AsyncReader input type.
func WrapWithPipelines ¶
func WrapWithPipelines(in Streamed, pipeConstructors ...iprocessor.PipelineConstructorFunc) (Streamed, error)
WrapWithPipelines wraps an input with a variadic number of pipelines.
type WithPipeline ¶
type WithPipeline struct {
// contains filtered or unexported fields
}
WithPipeline is a type that wraps both an input type and a pipeline type by routing the input through the pipeline, and implements the input.Type interface in order to act like an ordinary input.
func WrapWithPipeline ¶
func WrapWithPipeline(in Streamed, pipeConstructor iprocessor.PipelineConstructorFunc) (*WithPipeline, error)
WrapWithPipeline routes an input directly into a processing pipeline and returns a type that manages both and acts like an ordinary input.
func (*WithPipeline) Connected ¶
func (i *WithPipeline) Connected() bool
Connected returns a boolean indicating whether this input is currently connected to its target.
func (*WithPipeline) TransactionChan ¶
func (i *WithPipeline) TransactionChan() <-chan message.Transaction
TransactionChan returns the channel used for consuming transactions from this input.
func (*WithPipeline) TriggerCloseNow ¶
func (i *WithPipeline) TriggerCloseNow()
TriggerCloseNow triggers the shut down of this component but should not block the calling goroutine.
func (*WithPipeline) TriggerStopConsuming ¶
func (i *WithPipeline) TriggerStopConsuming()
TriggerStopConsuming instructs the input to start shutting down resources once all pending messages are delivered and acknowledged. This call does not block.
func (*WithPipeline) WaitForClose ¶
func (i *WithPipeline) WaitForClose(ctx context.Context) error
WaitForClose is a blocking call to wait until the component has finished shutting down and cleaning up resources.
Directories ¶
Path | Synopsis |
---|---|
 | Package config contains reusable config definitions and parsers for inputs defined via the public/service package. |