Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type AckFunc ¶
AckFunc is a function used to acknowledge receipt of a message batch from a buffer. The provided error indicates whether the message batch was successfully delivered. Returns an error if the acknowledge was not propagated.
type ReaderWriter ¶
type ReaderWriter interface { // Read the next oldest message batch. If the buffer has a persisted store // the message is preserved until the returned AckFunc is called. Some // temporal buffer implementations such as windowers will ignore the ack // func. Read(context.Context) (types.Message, AckFunc, error) // Write a new message batch to the stack. Write(context.Context, types.Message, AckFunc) error // EndOfInput indicates to the buffer that the input has ended and that once // the buffer is depleted it should return types.ErrTypeClosed from Read in // order to gracefully shut down the pipeline. // // EndOfInput should be idempotent as it may be called more than once. EndOfInput() // Close the buffer and all resources it has, messages should no longer be // written or read by the implementation and it should clean up all // resources. Close(context.Context) error }
ReaderWriter is a read/write interface implemented by buffers.
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream wraps a read/write buffer implementation with a channel based streaming component that satisfies the internal Benthos Consumer and Producer interfaces.
func (*Stream) CloseAsync ¶
func (m *Stream) CloseAsync()
CloseAsync shuts down the Stream and stops processing messages.
func (*Stream) Consume ¶
func (m *Stream) Consume(msgs <-chan types.Transaction) error
Consume assigns a messages channel for the output to read.
func (*Stream) StopConsuming ¶
func (m *Stream) StopConsuming()
StopConsuming instructs the buffer to stop consuming messages and close once the buffer is empty.
func (*Stream) TransactionChan ¶
func (m *Stream) TransactionChan() <-chan types.Transaction
TransactionChan returns the channel used for consuming messages from this buffer.