Documentation
Types
type AckFunc
AckFunc is a function used to acknowledge receipt of a message batch from a buffer. The provided error indicates whether the message batch was successfully delivered. It returns an error if the acknowledgement was not propagated.
type Config
type Config struct {
	Type   string `json:"type" yaml:"type"`
	Plugin any    `json:"plugin,omitempty" yaml:"plugin,omitempty"`
}
Config is the all-encompassing configuration struct for all buffer types.

Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.
type ReaderWriter
type ReaderWriter interface {
	// Read the next oldest message batch. If the buffer has a persisted store
	// the message is preserved until the returned AckFunc is called. Some
	// temporal buffer implementations such as windowers will ignore the ack
	// func.
	Read(context.Context) (message.Batch, AckFunc, error)

	// Write a new message batch to the stack.
	Write(context.Context, message.Batch, AckFunc) error

	// EndOfInput indicates to the buffer that the input has ended and that once
	// the buffer is depleted it should return component.ErrTypeClosed from Read in
	// order to gracefully shut down the pipeline.
	//
	// EndOfInput should be idempotent as it may be called more than once.
	EndOfInput()

	// Close the buffer and all resources it has, messages should no longer be
	// written or read by the implementation and it should clean up all
	// resources.
	Close(context.Context) error
}
ReaderWriter is a read/write interface implemented by buffers.
type Stream
type Stream struct {
	// contains filtered or unexported fields
}
Stream wraps a read/write buffer implementation with a channel-based streaming component that satisfies the internal Benthos Consumer and Producer interfaces.
func (*Stream) Consume
func (m *Stream) Consume(msgs <-chan message.Transaction) error
Consume assigns the transaction channel that the Stream reads messages from.
func (*Stream) TransactionChan
func (m *Stream) TransactionChan() <-chan message.Transaction
TransactionChan returns the channel used for consuming messages from this buffer.
func (*Stream) TriggerCloseNow
func (m *Stream) TriggerCloseNow()
TriggerCloseNow shuts down the Stream and stops processing messages.
func (*Stream) TriggerStopConsuming
func (m *Stream) TriggerStopConsuming()
TriggerStopConsuming instructs the buffer to stop consuming messages and close once the buffer is empty.
type Streamed
type Streamed interface {
	// TransactionChan returns a channel used for consuming transactions from
	// this type. Every transaction received must be resolved before another
	// transaction will be sent.
	TransactionChan() <-chan message.Transaction

	// Consume starts the type receiving transactions from a Transactor.
	Consume(<-chan message.Transaction) error

	// TriggerStopConsuming instructs the buffer to cut off the producer it is
	// consuming from. It will then enter a mode whereby messages can only be
	// read, and when the buffer is empty it will shut down.
	TriggerStopConsuming()

	// TriggerCloseNow triggers the shut down of this component but should not
	// block the calling goroutine.
	TriggerCloseNow()

	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(ctx context.Context) error
}
Streamed is an interface implemented by all buffer types that provides stream based methods.
func NewStream
func NewStream(typeStr string, buffer ReaderWriter, mgr component.Observability) Streamed
NewStream creates a new Producer/Consumer around a buffer.