buffer

package
v0.11.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 15, 2018 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package buffer links an input and an output together and buffers messages in between.

Index

Constants

This section is empty.

Variables

View Source
var Constructors = map[string]TypeSpec{}

Constructors is a map of all buffer types with their specs.

Functions

func Descriptions

func Descriptions() string

Descriptions returns a formatted string of collated descriptions of each type.

func SanitiseConfig added in v0.8.4

func SanitiseConfig(conf Config) (interface{}, error)

SanitiseConfig returns a sanitised version of the Config, meaning sections that aren't relevant to behaviour are removed.

Types

type Config

type Config struct {
	Type   string                  `json:"type" yaml:"type"`
	Badger parallel.BadgerConfig   `json:"badger" yaml:"badger"`
	Memory single.MemoryConfig     `json:"memory" yaml:"memory"`
	Mmap   single.MmapBufferConfig `json:"mmap_file" yaml:"mmap_file"`
	None   struct{}                `json:"none" yaml:"none"`
}

Config is the all encompassing configuration struct for all input types.

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values.

type Empty

type Empty struct {
	// contains filtered or unexported fields
}

Empty is an empty buffer, simply forwards messages on directly.

func (*Empty) CloseAsync

func (e *Empty) CloseAsync()

CloseAsync shuts down the StackBuffer output and stops processing messages.

func (*Empty) ErrorsChan

func (e *Empty) ErrorsChan() <-chan []error

ErrorsChan returns the errors channel.

func (*Empty) StartReceiving

func (e *Empty) StartReceiving(msgs <-chan types.Transaction) error

StartReceiving assigns a messages channel for the output to read.

func (*Empty) TransactionChan added in v0.9.0

func (e *Empty) TransactionChan() <-chan types.Transaction

TransactionChan returns the channel used for consuming messages from this input.

func (*Empty) WaitForClose

func (e *Empty) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the StackBuffer output has closed down.

type Parallel added in v0.10.1

type Parallel interface {
	// NextMessage reads the next oldest message, the message is preserved until
	// the returned AckFunc is called.
	NextMessage() (types.Message, parallel.AckFunc, error)

	// PushMessage adds a new message to the stack. Returns the backlog in
	// bytes.
	PushMessage(types.Message) (int, error)

	// CloseOnceEmpty closes the Buffer once the buffer has been emptied, this
	// is a way for a writer to signal to a reader that it is finished writing
	// messages, and therefore the reader can close once it is caught up. This
	// call blocks until the close is completed.
	CloseOnceEmpty()

	// Close closes the Buffer so that blocked readers or writers become
	// unblocked.
	Close()
}

Parallel represents a method of buffering sequential messages, supporting only a single consumer.

type ParallelWrapper added in v0.10.1

type ParallelWrapper struct {
	// contains filtered or unexported fields
}

ParallelWrapper wraps a buffer with a Producer/Consumer interface.

func (*ParallelWrapper) CloseAsync added in v0.10.1

func (m *ParallelWrapper) CloseAsync()

CloseAsync shuts down the ParallelWrapper and stops processing messages.

func (*ParallelWrapper) StartReceiving added in v0.10.1

func (m *ParallelWrapper) StartReceiving(msgs <-chan types.Transaction) error

StartReceiving assigns a messages channel for the output to read.

func (*ParallelWrapper) TransactionChan added in v0.10.1

func (m *ParallelWrapper) TransactionChan() <-chan types.Transaction

TransactionChan returns the channel used for consuming messages from this buffer.

func (*ParallelWrapper) WaitForClose added in v0.10.1

func (m *ParallelWrapper) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the ParallelWrapper output has closed down.

type Single added in v0.10.1

type Single interface {
	// ShiftMessage removes the oldest message from the stack. Returns the
	// backlog in bytes.
	ShiftMessage() (int, error)

	// NextMessage reads the oldest message, the message is preserved until
	// ShiftMessage is called.
	NextMessage() (types.Message, error)

	// PushMessage adds a new message to the stack. Returns the backlog in
	// bytes.
	PushMessage(types.Message) (int, error)

	// CloseOnceEmpty closes the Buffer once the buffer has been emptied, this
	// is a way for a writer to signal to a reader that it is finished writing
	// messages, and therefore the reader can close once it is caught up. This
	// call blocks until the close is completed.
	CloseOnceEmpty()

	// Close closes the Buffer so that blocked readers or writers become
	// unblocked.
	Close()
}

Single represents a method of buffering sequential messages, supporting only a single consumer.

type SingleWrapper added in v0.10.1

type SingleWrapper struct {
	// contains filtered or unexported fields
}

SingleWrapper wraps a buffer with a Producer/Consumer interface.

func (*SingleWrapper) CloseAsync added in v0.10.1

func (m *SingleWrapper) CloseAsync()

CloseAsync shuts down the SingleWrapper and stops processing messages.

func (*SingleWrapper) StartReceiving added in v0.10.1

func (m *SingleWrapper) StartReceiving(msgs <-chan types.Transaction) error

StartReceiving assigns a messages channel for the output to read.

func (*SingleWrapper) TransactionChan added in v0.10.1

func (m *SingleWrapper) TransactionChan() <-chan types.Transaction

TransactionChan returns the channel used for consuming messages from this buffer.

func (*SingleWrapper) WaitForClose added in v0.10.1

func (m *SingleWrapper) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the SingleWrapper output has closed down.

type Type

type Type interface {
	types.Producer
	types.Consumer
	types.Closable
}

Type is the standard interface of an agent type.

func New added in v0.0.2

func New(conf Config, log log.Modular, stats metrics.Type) (Type, error)

New creates an input type based on an input configuration.

func NewBadger added in v0.10.1

func NewBadger(config Config, log log.Modular, stats metrics.Type) (Type, error)

NewBadger creates a buffer around a badger k/v db.

func NewEmpty

func NewEmpty(config Config, log log.Modular, stats metrics.Type) (Type, error)

NewEmpty creates a new buffer interface but doesn't buffer messages.

func NewMemory

func NewMemory(config Config, log log.Modular, stats metrics.Type) (Type, error)

NewMemory - Create a buffer held in memory.

func NewMmapFile

func NewMmapFile(config Config, log log.Modular, stats metrics.Type) (Type, error)

NewMmapFile creates a buffer held in memory and persisted to file through memory map.

func NewParallelWrapper added in v0.10.1

func NewParallelWrapper(
	conf Config,
	buffer Parallel,
	log log.Modular,
	stats metrics.Type,
) Type

NewParallelWrapper creates a new Producer/Consumer around a buffer.

func NewSingleWrapper added in v0.10.1

func NewSingleWrapper(
	conf Config,
	buffer Single,
	log log.Modular,
	stats metrics.Type,
) Type

NewSingleWrapper creates a new Producer/Consumer around a buffer.

type TypeSpec added in v0.9.1

type TypeSpec struct {
	// contains filtered or unexported fields
}

TypeSpec is a constructor and usage description for each buffer type.

Directories

Path Synopsis
Package parallel contains implementations of various buffer types where the buffer can be consumed by any number of parallel consumer threads.
Package parallel contains implementations of various buffer types where the buffer can be consumed by any number of parallel consumer threads.
Package single contains implementations of various buffer types where the buffer can only be consumed by a single thread (but any number of writers).
Package single contains implementations of various buffer types where the buffer can only be consumed by a single thread (but any number of writers).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL