buffer

package
v3.65.3 Latest

Warning: This package is not in the latest version of its module.
Published: Jul 27, 2022 | License: MIT | Imports: 12 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewStream

func NewStream(typeStr string, buffer ReaderWriter, log log.Modular, stats metrics.Type) buffer.Type

NewStream creates a new Producer/Consumer around a buffer.

Types

type AckFunc

type AckFunc func(context.Context, error) error

AckFunc is a function used to acknowledge receipt of a message batch from a buffer. The provided error indicates whether the message batch was successfully delivered: nil means success, non-nil means delivery failed. AckFunc returns an error if the acknowledgement could not be propagated.

type ReaderWriter

type ReaderWriter interface {
	// Read the next oldest message batch. If the buffer has a persisted store
	// the message is preserved until the returned AckFunc is called. Some
	// temporal buffer implementations such as windowers will ignore the ack
	// func.
	Read(context.Context) (types.Message, AckFunc, error)

	// Write a new message batch to the buffer.
	Write(context.Context, types.Message, AckFunc) error

	// EndOfInput indicates to the buffer that the input has ended and that once
	// the buffer is depleted it should return types.ErrTypeClosed from Read in
	// order to gracefully shut down the pipeline.
	//
	// EndOfInput should be idempotent as it may be called more than once.
	EndOfInput()

	// Close the buffer and release all of its resources. After Close, the
	// implementation should no longer write or read messages.
	Close(context.Context) error
}

ReaderWriter is a read/write interface implemented by buffers.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream wraps a read/write buffer implementation with a channel based streaming component that satisfies the internal Benthos Consumer and Producer interfaces.

func (*Stream) CloseAsync

func (m *Stream) CloseAsync()

CloseAsync shuts down the Stream and stops processing messages.

func (*Stream) Consume

func (m *Stream) Consume(msgs <-chan types.Transaction) error

Consume assigns the channel of message transactions that the Stream reads from.

func (*Stream) StopConsuming

func (m *Stream) StopConsuming()

StopConsuming instructs the buffer to stop consuming messages and close once the buffer is empty.

func (*Stream) TransactionChan

func (m *Stream) TransactionChan() <-chan types.Transaction

TransactionChan returns the channel used for consuming messages from this buffer.

func (*Stream) WaitForClose

func (m *Stream) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the Stream output has closed down.
