Documentation ¶
Overview ¶
Package buffer is both a types.Consumer and types.Producer implementation that is able to sit between other stream components, effectively decoupling their transaction channels by storing messages in a buffer implementation.
Buffers are not needed within Benthos, and should not be used unless there is a specific problem to be solved with one.
Index ¶
- Constants
- Variables
- func Descriptions() string
- func SanitiseConfig(conf Config) (interface{}, error)
- type Config
- type Empty
- type EnabledBatchPolicyConfig
- type MemoryConfig
- type Parallel
- type ParallelBatcher
- type ParallelWrapper
- type Single
- type SingleWrapper
- type Type
- func New(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewEmpty(config Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewMemory(config Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewParallelBatcher(batcher *batch.Policy, child Type, log log.Modular, stats metrics.Type) Type
- func NewParallelWrapper(conf Config, buffer Parallel, log log.Modular, stats metrics.Type) Type
- func NewSingleWrapper(conf Config, buffer Single, log log.Modular, stats metrics.Type) Type
- type TypeSpec
Constants ¶
const ( TypeMemory = "memory" TypeNone = "none" )
String constants representing each buffer type.
Variables ¶
var Constructors = map[string]TypeSpec{}
Constructors is a map of all buffer types with their specs.
Functions ¶
func Descriptions ¶
func Descriptions() string
Descriptions returns a formatted string of collated descriptions of each type.
func SanitiseConfig ¶
SanitiseConfig returns a sanitised version of the Config, meaning sections that aren't relevant to behaviour are removed.
Types ¶
type Config ¶
type Config struct { Type string `json:"type" yaml:"type"` Memory MemoryConfig `json:"memory" yaml:"memory"` None struct{} `json:"none" yaml:"none"` }
Config is the all encompassing configuration struct for all buffer types.
type Empty ¶
type Empty struct {
// contains filtered or unexported fields
}
Empty is an empty buffer, simply forwards messages on directly.
func (*Empty) CloseAsync ¶
func (e *Empty) CloseAsync()
CloseAsync shuts down the StackBuffer output and stops processing messages.
func (*Empty) Consume ¶
func (e *Empty) Consume(msgs <-chan types.Transaction) error
Consume assigns a messages channel for the output to read.
func (*Empty) ErrorsChan ¶
ErrorsChan returns the errors channel.
func (*Empty) StopConsuming ¶
func (e *Empty) StopConsuming()
StopConsuming instructs the buffer to no longer consume data.
func (*Empty) TransactionChan ¶
func (e *Empty) TransactionChan() <-chan types.Transaction
TransactionChan returns the channel used for consuming messages from this input.
type EnabledBatchPolicyConfig ¶
type EnabledBatchPolicyConfig struct { Enabled bool `json:"enabled" yaml:"enabled"` batch.PolicyConfig `json:",inline" yaml:",inline"` }
EnabledBatchPolicyConfig is a batch.PolicyConfig with an enable field.
type MemoryConfig ¶
type MemoryConfig struct { Limit int `json:"limit" yaml:"limit"` BatchPolicy EnabledBatchPolicyConfig `json:"batch_policy" yaml:"batch_policy"` }
MemoryConfig is config values for a purely memory based ring buffer type.
func NewMemoryConfig ¶
func NewMemoryConfig() MemoryConfig
NewMemoryConfig creates a new MemoryConfig with default values.
type Parallel ¶
type Parallel interface { // NextMessage reads the next oldest message, the message is preserved until // the returned AckFunc is called. NextMessage() (types.Message, parallel.AckFunc, error) // PushMessage adds a new message to the stack. Returns the backlog in // bytes. PushMessage(types.Message) (int, error) // CloseOnceEmpty closes the Buffer once the buffer has been emptied, this // is a way for a writer to signal to a reader that it is finished writing // messages, and therefore the reader can close once it is caught up. This // call blocks until the close is completed. CloseOnceEmpty() // Close closes the Buffer so that blocked readers or writers become // unblocked. Close() }
Parallel represents a method of buffering messages such that they can be consumed by any number of parallel consumers, and can be acknowledged in any order.
type ParallelBatcher ¶
type ParallelBatcher struct {
// contains filtered or unexported fields
}
ParallelBatcher wraps a buffer with a Producer/Consumer interface.
func (*ParallelBatcher) CloseAsync ¶
func (m *ParallelBatcher) CloseAsync()
CloseAsync shuts down the ParallelBatcher and stops processing messages.
func (*ParallelBatcher) Consume ¶
func (m *ParallelBatcher) Consume(msgs <-chan types.Transaction) error
Consume assigns a messages channel for the output to read.
func (*ParallelBatcher) StopConsuming ¶
func (m *ParallelBatcher) StopConsuming()
StopConsuming instructs the buffer to stop consuming messages and close once the buffer is empty.
func (*ParallelBatcher) TransactionChan ¶
func (m *ParallelBatcher) TransactionChan() <-chan types.Transaction
TransactionChan returns the channel used for consuming messages from this buffer.
func (*ParallelBatcher) WaitForClose ¶
func (m *ParallelBatcher) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the ParallelBatcher output has closed down.
type ParallelWrapper ¶
type ParallelWrapper struct {
// contains filtered or unexported fields
}
ParallelWrapper wraps a buffer with a Producer/Consumer interface.
func (*ParallelWrapper) CloseAsync ¶
func (m *ParallelWrapper) CloseAsync()
CloseAsync shuts down the ParallelWrapper and stops processing messages.
func (*ParallelWrapper) Consume ¶
func (m *ParallelWrapper) Consume(msgs <-chan types.Transaction) error
Consume assigns a messages channel for the output to read.
func (*ParallelWrapper) StopConsuming ¶
func (m *ParallelWrapper) StopConsuming()
StopConsuming instructs the buffer to stop consuming messages and close once the buffer is empty.
func (*ParallelWrapper) TransactionChan ¶
func (m *ParallelWrapper) TransactionChan() <-chan types.Transaction
TransactionChan returns the channel used for consuming messages from this buffer.
func (*ParallelWrapper) WaitForClose ¶
func (m *ParallelWrapper) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the ParallelWrapper output has closed down.
type Single ¶
type Single interface { // ShiftMessage removes the oldest message from the stack. Returns the // backlog in bytes. ShiftMessage() (int, error) // NextMessage reads the oldest message, the message is preserved until // ShiftMessage is called. NextMessage() (types.Message, error) // PushMessage adds a new message to the stack. Returns the backlog in // bytes. PushMessage(types.Message) (int, error) // CloseOnceEmpty closes the Buffer once the buffer has been emptied, this // is a way for a writer to signal to a reader that it is finished writing // messages, and therefore the reader can close once it is caught up. This // call blocks until the close is completed. CloseOnceEmpty() // Close closes the Buffer so that blocked readers or writers become // unblocked. Close() }
Single represents a method of buffering sequential messages, supporting only a single, sequential consumer.
type SingleWrapper ¶
type SingleWrapper struct {
// contains filtered or unexported fields
}
SingleWrapper wraps a buffer with a Producer/Consumer interface.
func (*SingleWrapper) CloseAsync ¶
func (m *SingleWrapper) CloseAsync()
CloseAsync shuts down the SingleWrapper and stops processing messages.
func (*SingleWrapper) Consume ¶
func (m *SingleWrapper) Consume(msgs <-chan types.Transaction) error
Consume assigns a messages channel for the output to read.
func (*SingleWrapper) StopConsuming ¶
func (m *SingleWrapper) StopConsuming()
StopConsuming instructs the buffer to stop consuming messages and close once the buffer is empty.
func (*SingleWrapper) TransactionChan ¶
func (m *SingleWrapper) TransactionChan() <-chan types.Transaction
TransactionChan returns the channel used for consuming messages from this buffer.
func (*SingleWrapper) WaitForClose ¶
func (m *SingleWrapper) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the SingleWrapper output has closed down.
type Type ¶
type Type interface { types.Producer types.Consumer types.Closable // StopConsuming instructs the buffer to cut off the producer it is // consuming from. It will then enter a mode whereby messages can only be // read, and when the buffer is empty it will shut down. StopConsuming() }
Type is an interface implemented by all buffer types.
func NewParallelBatcher ¶
func NewParallelBatcher( batcher *batch.Policy, child Type, log log.Modular, stats metrics.Type, ) Type
NewParallelBatcher creates a new Producer/Consumer around a buffer.
func NewParallelWrapper ¶
NewParallelWrapper creates a new Producer/Consumer around a buffer.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package parallel contains implementations of various buffer types where the buffer can be consumed by any number of parallel consumer threads.
|
Package parallel contains implementations of various buffer types where the buffer can be consumed by any number of parallel consumer threads. |
Package single contains implementations of various buffer types where the buffer can only be consumed by a single thread (but any number of writers).
|
Package single contains implementations of various buffer types where the buffer can only be consumed by a single thread (but any number of writers). |