buffer

package
v4.32.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 16, 2024 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AckFunc

type AckFunc func(context.Context, error) error

AckFunc is a function used to acknowledge receipt of a message batch from a buffer. The provided error indicates whether the message batch was successfully delivered. Returns an error if the acknowledge was not propagated.

type Config

type Config struct {
	Type   string `json:"type" yaml:"type"`
	Plugin any    `json:"plugin,omitempty" yaml:"plugin,omitempty"`
}

Config is the all encompassing configuration struct for all buffer types. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.

func FromAny

func FromAny(prov docs.Provider, value any) (conf Config, err error)

FromAny returns a buffer config from a parsed config, yaml node or map.

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values.

type ReaderWriter

type ReaderWriter interface {
	// Read the next oldest message batch. If the buffer has a persisted store
	// the message is preserved until the returned AckFunc is called. Some
	// temporal buffer implementations such as windowers will ignore the ack
	// func.
	Read(context.Context) (message.Batch, AckFunc, error)

	// Write a new message batch to the stack.
	Write(context.Context, message.Batch, AckFunc) error

	// EndOfInput indicates to the buffer that the input has ended and that once
	// the buffer is depleted it should return component.ErrTypeClosed from Read in
	// order to gracefully shut down the pipeline.
	//
	// EndOfInput should be idempotent as it may be called more than once.
	EndOfInput()

	// Close the buffer and all resources it has, messages should no longer be
	// written or read by the implementation and it should clean up all
	// resources.
	Close(context.Context) error
}

ReaderWriter is a read/write interface implemented by buffers.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream wraps a read/write buffer implementation with a channel based streaming component that satisfies the internal Benthos Consumer and Producer interfaces.

func (*Stream) Consume

func (m *Stream) Consume(msgs <-chan message.Transaction) error

Consume assigns a messages channel for the output to read.

func (*Stream) TransactionChan

func (m *Stream) TransactionChan() <-chan message.Transaction

TransactionChan returns the channel used for consuming messages from this buffer.

func (*Stream) TriggerCloseNow

func (m *Stream) TriggerCloseNow()

TriggerCloseNow shuts down the Stream and stops processing messages.

func (*Stream) TriggerStopConsuming

func (m *Stream) TriggerStopConsuming()

TriggerStopConsuming instructs the buffer to stop consuming messages and close once the buffer is empty.

func (*Stream) WaitForClose

func (m *Stream) WaitForClose(ctx context.Context) error

WaitForClose blocks until the Stream output has closed down.

type Streamed

type Streamed interface {
	// TransactionChan returns a channel used for consuming transactions from
	// this type. Every transaction received must be resolved before another
	// transaction will be sent.
	TransactionChan() <-chan message.Transaction

	// Consume starts the type receiving transactions from a Transactor.
	Consume(<-chan message.Transaction) error

	// TriggerStopConsuming instructs the buffer to cut off the producer it is
	// consuming from. It will then enter a mode whereby messages can only be
	// read, and when the buffer is empty it will shut down.
	TriggerStopConsuming()

	// TriggerCloseNow triggers the shut down of this component but should not
	// block the calling goroutine.
	TriggerCloseNow()

	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(ctx context.Context) error
}

Streamed is an interface implemented by all buffer types that provides stream based methods.

func NewStream

func NewStream(typeStr string, buffer ReaderWriter, mgr component.Observability) Streamed

NewStream creates a new Producer/Consumer around a buffer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL