buffer

package
v0.40.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 12, 2018 License: MIT Imports: 17 Imported by: 8

Documentation

Overview

Package buffer is both a types.Consumer and types.Producer implementation that is able to sit between other stream components, effectively decoupling their transaction channels by storing messages in a buffer implementation.

Buffers are not needed within Benthos, and should not be used unless there is a specific problem to be solved with one.

Index

Constants

View Source
const (
	TypeMemory = "memory"
	TypeMMAP   = "mmap_file"
	TypeNone   = "none"
)

String constants representing each buffer type.

Variables

View Source
var Constructors = map[string]TypeSpec{}

Constructors is a map of all buffer types with their specs.

Functions

func Descriptions

func Descriptions() string

Descriptions returns a formatted string of collated descriptions of each type.

func SanitiseConfig added in v0.8.4

func SanitiseConfig(conf Config) (interface{}, error)

SanitiseConfig returns a sanitised version of the Config, meaning sections that aren't relevant to behaviour are removed.

Types

type Config

type Config struct {
	Type   string                  `json:"type" yaml:"type"`
	Memory single.MemoryConfig     `json:"memory" yaml:"memory"`
	Mmap   single.MmapBufferConfig `json:"mmap_file" yaml:"mmap_file"`
	None   struct{}                `json:"none" yaml:"none"`
}

Config is the all encompassing configuration struct for all buffer types.

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values.

type Empty

type Empty struct {
	// contains filtered or unexported fields
}

Empty is an empty buffer, simply forwards messages on directly.

func (*Empty) CloseAsync

func (e *Empty) CloseAsync()

CloseAsync shuts down the StackBuffer output and stops processing messages.

func (*Empty) Consume added in v0.19.0

func (e *Empty) Consume(msgs <-chan types.Transaction) error

Consume assigns a messages channel for the output to read.

func (*Empty) ErrorsChan

func (e *Empty) ErrorsChan() <-chan []error

ErrorsChan returns the errors channel.

func (*Empty) StopConsuming added in v0.13.0

func (e *Empty) StopConsuming()

StopConsuming instructs the buffer to no longer consume data.

func (*Empty) TransactionChan added in v0.9.0

func (e *Empty) TransactionChan() <-chan types.Transaction

TransactionChan returns the channel used for consuming messages from this input.

func (*Empty) WaitForClose

func (e *Empty) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the StackBuffer output has closed down.

type Parallel added in v0.10.1

type Parallel interface {
	// NextMessage reads the next oldest message, the message is preserved until
	// the returned AckFunc is called.
	NextMessage() (types.Message, parallel.AckFunc, error)

	// PushMessage adds a new message to the stack. Returns the backlog in
	// bytes.
	PushMessage(types.Message) (int, error)

	// CloseOnceEmpty closes the Buffer once the buffer has been emptied, this
	// is a way for a writer to signal to a reader that it is finished writing
	// messages, and therefore the reader can close once it is caught up. This
	// call blocks until the close is completed.
	CloseOnceEmpty()

	// Close closes the Buffer so that blocked readers or writers become
	// unblocked.
	Close()
}

Parallel represents a method of buffering messages such that they can be consumed by any number of parallel consumers, and can be acknowledged in any order.

type ParallelWrapper added in v0.10.1

type ParallelWrapper struct {
	// contains filtered or unexported fields
}

ParallelWrapper wraps a buffer with a Producer/Consumer interface.

func (*ParallelWrapper) CloseAsync added in v0.10.1

func (m *ParallelWrapper) CloseAsync()

CloseAsync shuts down the ParallelWrapper and stops processing messages.

func (*ParallelWrapper) Consume added in v0.19.0

func (m *ParallelWrapper) Consume(msgs <-chan types.Transaction) error

Consume assigns a messages channel for the output to read.

func (*ParallelWrapper) StopConsuming added in v0.13.0

func (m *ParallelWrapper) StopConsuming()

StopConsuming instructs the buffer to stop consuming messages and close once the buffer is empty.

func (*ParallelWrapper) TransactionChan added in v0.10.1

func (m *ParallelWrapper) TransactionChan() <-chan types.Transaction

TransactionChan returns the channel used for consuming messages from this buffer.

func (*ParallelWrapper) WaitForClose added in v0.10.1

func (m *ParallelWrapper) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the ParallelWrapper output has closed down.

type Single added in v0.10.1

type Single interface {
	// ShiftMessage removes the oldest message from the stack. Returns the
	// backlog in bytes.
	ShiftMessage() (int, error)

	// NextMessage reads the oldest message, the message is preserved until
	// ShiftMessage is called.
	NextMessage() (types.Message, error)

	// PushMessage adds a new message to the stack. Returns the backlog in
	// bytes.
	PushMessage(types.Message) (int, error)

	// CloseOnceEmpty closes the Buffer once the buffer has been emptied, this
	// is a way for a writer to signal to a reader that it is finished writing
	// messages, and therefore the reader can close once it is caught up. This
	// call blocks until the close is completed.
	CloseOnceEmpty()

	// Close closes the Buffer so that blocked readers or writers become
	// unblocked.
	Close()
}

Single represents a method of buffering sequential messages, supporting only a single, sequential consumer.

type SingleWrapper added in v0.10.1

type SingleWrapper struct {
	// contains filtered or unexported fields
}

SingleWrapper wraps a buffer with a Producer/Consumer interface.

func (*SingleWrapper) CloseAsync added in v0.10.1

func (m *SingleWrapper) CloseAsync()

CloseAsync shuts down the SingleWrapper and stops processing messages.

func (*SingleWrapper) Consume added in v0.19.0

func (m *SingleWrapper) Consume(msgs <-chan types.Transaction) error

Consume assigns a messages channel for the output to read.

func (*SingleWrapper) StopConsuming added in v0.13.0

func (m *SingleWrapper) StopConsuming()

StopConsuming instructs the buffer to stop consuming messages and close once the buffer is empty.

func (*SingleWrapper) TransactionChan added in v0.10.1

func (m *SingleWrapper) TransactionChan() <-chan types.Transaction

TransactionChan returns the channel used for consuming messages from this buffer.

func (*SingleWrapper) WaitForClose added in v0.10.1

func (m *SingleWrapper) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the SingleWrapper output has closed down.

type Type

type Type interface {
	types.Producer
	types.Consumer
	types.Closable

	// StopConsuming instructs the buffer to cut off the producer it is
	// consuming from. It will then enter a mode whereby messages can only be
	// read, and when the buffer is empty it will shut down.
	StopConsuming()
}

Type is an interface implemented by all buffer types.

func New added in v0.0.2

func New(conf Config, log log.Modular, stats metrics.Type) (Type, error)

New creates a buffer type based on a buffer configuration.

func NewEmpty

func NewEmpty(config Config, log log.Modular, stats metrics.Type) (Type, error)

NewEmpty creates a new buffer interface but doesn't buffer messages.

func NewMemory

func NewMemory(config Config, log log.Modular, stats metrics.Type) (Type, error)

NewMemory - Create a buffer held in memory.

func NewMmapFile

func NewMmapFile(config Config, log log.Modular, stats metrics.Type) (Type, error)

NewMmapFile creates a buffer held in memory and persisted to file through memory map.

func NewParallelWrapper added in v0.10.1

func NewParallelWrapper(
	conf Config,
	buffer Parallel,
	log log.Modular,
	stats metrics.Type,
) Type

NewParallelWrapper creates a new Producer/Consumer around a buffer.

func NewSingleWrapper added in v0.10.1

func NewSingleWrapper(
	conf Config,
	buffer Single,
	log log.Modular,
	stats metrics.Type,
) Type

NewSingleWrapper creates a new Producer/Consumer around a buffer.

type TypeSpec added in v0.9.1

type TypeSpec struct {
	// contains filtered or unexported fields
}

TypeSpec is a constructor and usage description for each buffer type.

Directories

Path Synopsis
Package parallel contains implementations of various buffer types where the buffer can be consumed by any number of parallel consumer threads.
Package parallel contains implementations of various buffer types where the buffer can be consumed by any number of parallel consumer threads.
Package single contains implementations of various buffer types where the buffer can only be consumed by a single thread (but any number of writers).
Package single contains implementations of various buffer types where the buffer can only be consumed by a single thread (but any number of writers).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL