Documentation ¶
Functions ¶
func IterateBatchedSend ¶
IterateBatchedSend executes the closure fn on each message of a batch. The closure is expected to attempt a send and return an error. A returned error is added to a batch error so that callers can handle failures per message index.
However, a fatal error, such as a connection loss or a shutdown, is returned immediately.
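The signature of IterateBatchedSend is not shown on this page, so the following is only a minimal sketch of the behaviour described above. The names `iterateSend`, `batchError`, `errFatal` and `failedCount` are hypothetical, and a plain `[]string` stands in for a message batch.

```go
package main

import (
	"errors"
	"fmt"
)

// errFatal is a hypothetical stand-in for a fatal transport error such as a
// lost connection.
var errFatal = errors.New("connection lost")

// batchError is a hypothetical error type that records which batch indexes
// failed, so callers can retry or reject individual messages.
type batchError struct {
	failed map[int]error
}

func (b *batchError) Error() string {
	return fmt.Sprintf("%d message(s) in the batch failed", len(b.failed))
}

// iterateSend calls fn for every message of the batch. A fatal error aborts
// the iteration immediately; any other error is recorded against the index
// of the message that caused it.
func iterateSend(batch []string, fn func(i int, msg string) error) error {
	var bErr *batchError
	for i, msg := range batch {
		err := fn(i, msg)
		if err == nil {
			continue
		}
		if errors.Is(err, errFatal) {
			return err
		}
		if bErr == nil {
			bErr = &batchError{failed: map[int]error{}}
		}
		bErr.failed[i] = err
	}
	if bErr == nil {
		return nil // avoid wrapping a nil pointer in a non-nil error
	}
	return bErr
}

// failedCount sends a batch through a fake sender that rejects "bad" and
// loses its connection on "fatal". It returns the number of failed indexes,
// or -1 when the send was aborted by a fatal error.
func failedCount(batch []string) int {
	err := iterateSend(batch, func(i int, msg string) error {
		switch msg {
		case "bad":
			return errors.New("rejected")
		case "fatal":
			return errFatal
		}
		return nil
	})
	if err == nil {
		return 0
	}
	var bErr *batchError
	if errors.As(err, &bErr) {
		return len(bErr.failed)
	}
	return -1
}

func main() {
	fmt.Println(failedCount([]string{"a", "bad", "c", "bad"}))
}
```

Keeping the failed indexes in the error lets a caller resend only the rejected messages instead of the whole batch.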
Types ¶
type AsyncSink ¶
type AsyncSink interface {
	// Connect attempts to establish a connection to the sink, if
	// unsuccessful returns an error. If the attempt is successful (or not
	// necessary) returns nil.
	Connect(ctx context.Context) error

	// WriteBatch should block until either the message is sent (and
	// acknowledged) to a sink, or a transport specific error has occurred, or
	// the Type is closed.
	WriteBatch(ctx context.Context, msg message.Batch) error

	// Close is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	Close(ctx context.Context) error
}
AsyncSink is a type that writes Benthos messages to a third-party sink. If the protocol supports a form of acknowledgement then it will be returned by the call to WriteBatch.
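As an illustration of the three-method contract, here is a minimal in-memory sketch. It does not import the real package: `Batch` is a hypothetical stand-in for `message.Batch`, and `memorySink`, `errNotConnected` and `writeAll` are invented for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Batch is a hypothetical stand-in for message.Batch.
type Batch [][]byte

// errNotConnected is a hypothetical error for writes made before Connect.
var errNotConnected = errors.New("not connected")

// memorySink is a hypothetical sink that keeps written payloads in memory.
// Its method set mirrors AsyncSink, with Batch in place of message.Batch.
type memorySink struct {
	mu        sync.Mutex
	connected bool
	written   []string
}

// Connect always succeeds here; a real sink would dial its target.
func (m *memorySink) Connect(ctx context.Context) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.connected = true
	return nil
}

// WriteBatch stores every payload and returns nil once all are stored,
// which plays the role of the acknowledgement.
func (m *memorySink) WriteBatch(ctx context.Context, b Batch) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	if !m.connected {
		return errNotConnected
	}
	for _, p := range b {
		m.written = append(m.written, string(p))
	}
	return nil
}

// Close marks the sink as disconnected.
func (m *memorySink) Close(ctx context.Context) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.connected = false
	return nil
}

// writeAll connects, writes one batch, closes, and returns what was stored.
func writeAll(payloads ...string) ([]string, error) {
	ctx := context.Background()
	sink := &memorySink{}
	if err := sink.Connect(ctx); err != nil {
		return nil, err
	}
	batch := make(Batch, 0, len(payloads))
	for _, p := range payloads {
		batch = append(batch, []byte(p))
	}
	if err := sink.WriteBatch(ctx, batch); err != nil {
		return nil, err
	}
	if err := sink.Close(ctx); err != nil {
		return nil, err
	}
	return sink.written, nil
}

func main() {
	got, err := writeAll("hello", "world")
	fmt.Println(got, err)
}
```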
type AsyncWriter ¶
type AsyncWriter struct {
// contains filtered or unexported fields
}
AsyncWriter is an output type that writes messages to an AsyncSink.
func (*AsyncWriter) Connected ¶
func (w *AsyncWriter) Connected() bool
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*AsyncWriter) Consume ¶
func (w *AsyncWriter) Consume(ts <-chan message.Transaction) error
Consume assigns a channel of message transactions for the output to read from.
func (*AsyncWriter) TriggerCloseNow ¶
func (w *AsyncWriter) TriggerCloseNow()
TriggerCloseNow shuts down the output and stops processing messages.
func (*AsyncWriter) WaitForClose ¶
func (w *AsyncWriter) WaitForClose(ctx context.Context) error
WaitForClose blocks until the output has closed down.
type Config ¶
type Config struct {
	Label      string             `json:"label" yaml:"label"`
	Type       string             `json:"type" yaml:"type"`
	Plugin     any                `json:"plugin,omitempty" yaml:"plugin,omitempty"`
	Processors []processor.Config `json:"processors" yaml:"processors"`
}
Config is the all-encompassing configuration struct for all output types.
Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.
type Streamed ¶
type Streamed interface {
	// Consume starts the type receiving transactions from a Transactor.
	Consume(<-chan message.Transaction) error

	// Connected returns a boolean indicating whether this output is currently
	// connected to its target.
	Connected() bool

	// TriggerCloseNow triggers the shut down of this component but should not
	// block the calling goroutine.
	TriggerCloseNow()

	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(ctx context.Context) error
}
Streamed is a common interface implemented by outputs and provides channel-based streaming APIs.
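The channel-based lifecycle (Consume, then TriggerCloseNow or a closed channel, then WaitForClose) can be sketched as follows. This is an assumption-laden illustration, not the package's implementation: `transaction` is a hypothetical stand-in for `message.Transaction`, and `chanOutput` and `deliver` are invented names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// transaction is a hypothetical stand-in for message.Transaction: a payload
// plus a channel on which the output reports the result of the write.
type transaction struct {
	payload string
	ack     chan<- error
}

// chanOutput is a hypothetical output with a Streamed-like method set.
type chanOutput struct {
	closeOnce sync.Once
	closeNow  chan struct{}
	done      chan struct{}
	delivered []string
}

func newChanOutput() *chanOutput {
	return &chanOutput{closeNow: make(chan struct{}), done: make(chan struct{})}
}

// Consume starts a goroutine that reads transactions until the channel is
// closed or TriggerCloseNow is called, acknowledging each one.
func (o *chanOutput) Consume(ts <-chan transaction) error {
	go func() {
		defer close(o.done)
		for {
			select {
			case t, open := <-ts:
				if !open {
					return
				}
				o.delivered = append(o.delivered, t.payload)
				t.ack <- nil
			case <-o.closeNow:
				return
			}
		}
	}()
	return nil
}

// Connected always reports true because this sketch has no real target.
func (o *chanOutput) Connected() bool { return true }

// TriggerCloseNow signals shutdown without blocking the caller.
func (o *chanOutput) TriggerCloseNow() {
	o.closeOnce.Do(func() { close(o.closeNow) })
}

// WaitForClose blocks until the consuming goroutine exits or ctx ends.
func (o *chanOutput) WaitForClose(ctx context.Context) error {
	select {
	case <-o.done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// deliver pushes payloads through the output one at a time, waiting for
// each acknowledgement, and returns what the output received.
func deliver(payloads ...string) []string {
	out := newChanOutput()
	ts := make(chan transaction)
	if err := out.Consume(ts); err != nil {
		return nil
	}
	ack := make(chan error)
	for _, p := range payloads {
		ts <- transaction{payload: p, ack: ack}
		<-ack
	}
	close(ts)
	_ = out.WaitForClose(context.Background())
	return out.delivered
}

func main() {
	fmt.Println(deliver("a", "b", "c"))
}
```

Reading `delivered` only after WaitForClose returns keeps the sketch free of data races, since closing `done` happens after the last append.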
func NewAsyncWriter ¶
func NewAsyncWriter(typeStr string, maxInflight int, w AsyncSink, mgr component.Observability) (Streamed, error)
NewAsyncWriter creates a Streamed implementation around an AsyncSink.
func OnlySinglePayloads ¶
OnlySinglePayloads expands message batches into individual payloads, respecting the max in flight of the wrapped output. This is a more efficient way of feeding messages into an output that handles its own batching mechanism internally, or does not support batching at all.
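The expansion step alone can be sketched as below. The signature of OnlySinglePayloads is not shown on this page, so `expand`, `writeSingly` and `countWrites` are hypothetical names, `[]string` stands in for a message batch, and the max in flight limit is deliberately not modelled. Stopping at the first write error is also an assumption of this sketch.

```go
package main

import "fmt"

// expand turns one batch into a slice of single-message batches.
func expand(batch []string) [][]string {
	out := make([][]string, 0, len(batch))
	for _, msg := range batch {
		out = append(out, []string{msg})
	}
	return out
}

// writeSingly hands each message to write as its own batch and stops at the
// first error (an assumption made for this sketch).
func writeSingly(batch []string, write func(single []string) error) error {
	for _, single := range expand(batch) {
		if err := write(single); err != nil {
			return err
		}
	}
	return nil
}

// countWrites reports how many times the wrapped writer is invoked.
func countWrites(batch []string) int {
	calls := 0
	_ = writeSingly(batch, func(single []string) error {
		calls++
		return nil
	})
	return calls
}

func main() {
	fmt.Println(countWrites([]string{"a", "b", "c"}))
}
```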
func WrapWithPipelines ¶
func WrapWithPipelines(out Streamed, pipeConstructors ...iprocessor.PipelineConstructorFunc) (Streamed, error)
WrapWithPipelines wraps an output with a variadic number of pipelines.
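One way to picture the variadic wrapping is as repeated single wrapping. The sketch below uses plain functions instead of the real types: `sink`, `stage`, `wrapWithStage`, `wrapWithStages` and `process` are hypothetical names, and iterating in reverse so that the first stage runs first is an assumption of this sketch rather than a documented guarantee.

```go
package main

import (
	"fmt"
	"strings"
)

// sink is a hypothetical stand-in for an output.
type sink func(msg string)

// stage is a hypothetical stand-in for a processing pipeline.
type stage func(msg string) string

// wrapWithStage returns a sink that runs s before handing the result to out.
func wrapWithStage(out sink, s stage) sink {
	return func(msg string) { out(s(msg)) }
}

// wrapWithStages wraps out with every stage. Wrapping in reverse order
// makes the first stage in the argument list the first one to run.
func wrapWithStages(out sink, stages ...stage) sink {
	for i := len(stages) - 1; i >= 0; i-- {
		out = wrapWithStage(out, stages[i])
	}
	return out
}

// process runs msg through an upper-casing stage and then a suffix stage,
// returning what finally reaches the sink.
func process(msg string) string {
	var got string
	out := wrapWithStages(
		func(m string) { got = m },
		strings.ToUpper,
		func(m string) string { return m + "!" },
	)
	out(msg)
	return got
}

func main() {
	fmt.Println(process("hello"))
}
```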
type Sync ¶
type Sync interface {
	// WriteTransaction attempts to write a transaction to an output.
	WriteTransaction(context.Context, message.Transaction) error

	// Connected returns a boolean indicating whether this output is currently
	// connected to its target.
	Connected() bool

	// TriggerStopConsuming instructs the output to start shutting down
	// resources once all pending messages are delivered and acknowledged.
	TriggerStopConsuming()

	// TriggerCloseNow triggers the shut down of this component but should not
	// block the calling goroutine.
	TriggerCloseNow()

	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(ctx context.Context) error
}
Sync is a common interface implemented by outputs and provides synchronous writing APIs.
type WithPipeline ¶
type WithPipeline struct {
// contains filtered or unexported fields
}
WithPipeline is a type that wraps both an output type and a pipeline type by routing the pipeline through the output, and implements the Streamed interface in order to act like an ordinary output.
func WrapWithPipeline ¶
func WrapWithPipeline(out Streamed, pipeConstructor iprocessor.PipelineConstructorFunc) (*WithPipeline, error)
WrapWithPipeline routes a processing pipeline directly into an output and returns a type that manages both and acts like an ordinary output.
func (*WithPipeline) Connected ¶
func (i *WithPipeline) Connected() bool
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*WithPipeline) Consume ¶
func (i *WithPipeline) Consume(tsChan <-chan message.Transaction) error
Consume starts the type listening to a message channel from a producer.
func (*WithPipeline) TriggerCloseNow ¶
func (i *WithPipeline) TriggerCloseNow()
TriggerCloseNow triggers a closure of this object but does not block.
func (*WithPipeline) WaitForClose ¶
func (i *WithPipeline) WaitForClose(ctx context.Context) error
WaitForClose is a blocking call to wait until the object has finished closing down and cleaning up resources.