Documentation ¶
Overview ¶
Package buffer - Links an input and an output together and buffers messages in between.
Index ¶
- func Descriptions() string
- type Config
- type Empty
- func (e *Empty) CloseAsync()
- func (e *Empty) ErrorsChan() <-chan []error
- func (e *Empty) MessageChan() <-chan types.Message
- func (e *Empty) ResponseChan() <-chan types.Response
- func (e *Empty) StartListening(responses <-chan types.Response) error
- func (e *Empty) StartReceiving(msgs <-chan types.Message) error
- func (e *Empty) WaitForClose(timeout time.Duration) error
- type MockType
- func (m MockType) CloseAsync()
- func (m *MockType) ErrorsChan() <-chan []error
- func (m *MockType) MessageChan() <-chan types.Message
- func (m *MockType) ResponseChan() <-chan types.Response
- func (m *MockType) StartListening(res <-chan types.Response) error
- func (m *MockType) StartReceiving(msgs <-chan types.Message) error
- func (m MockType) WaitForClose(time.Duration) error
- type StackBuffer
- func (m *StackBuffer) CloseAsync()
- func (m *StackBuffer) ErrorsChan() <-chan []error
- func (m *StackBuffer) MessageChan() <-chan types.Message
- func (m *StackBuffer) ResponseChan() <-chan types.Response
- func (m *StackBuffer) StartListening(responses <-chan types.Response) error
- func (m *StackBuffer) StartReceiving(msgs <-chan types.Message) error
- func (m *StackBuffer) WaitForClose(timeout time.Duration) error
- type Type
- func Construct(conf Config, log log.Modular, stats metrics.Aggregator) (Type, error)
- func NewEmpty(config Config, log log.Modular, stats metrics.Aggregator) (Type, error)
- func NewMemory(config Config, log log.Modular, stats metrics.Aggregator) (Type, error)
- func NewMmapFile(config Config, log log.Modular, stats metrics.Aggregator) (Type, error)
- func NewStackBuffer(buffer ring.MessageStack, stats metrics.Aggregator) Type
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Descriptions ¶
func Descriptions() string
Descriptions - Returns a formatted string of collated descriptions of each type.
Types ¶
type Config ¶
type Config struct { Type string `json:"type" yaml:"type"` Mmap ring.MmapConfig `json:"mmap_file" yaml:"mmap_file"` Memory ring.MemoryConfig `json:"memory" yaml:"memory"` }
Config - The all-encompassing configuration struct for all buffer types.
type Empty ¶
type Empty struct {
// contains filtered or unexported fields
}
Empty - Empty buffer, simply forwards messages on directly.
func (*Empty) CloseAsync ¶
func (e *Empty) CloseAsync()
CloseAsync - Shuts down the Empty buffer and stops processing messages.
func (*Empty) ErrorsChan ¶
ErrorsChan - Returns the errors channel.
func (*Empty) MessageChan ¶
MessageChan - Returns the channel used for consuming messages from this buffer.
func (*Empty) ResponseChan ¶
ResponseChan - Returns the response channel.
func (*Empty) StartListening ¶
StartListening - Sets the channel for reading responses.
func (*Empty) StartReceiving ¶
StartReceiving - Assigns a messages channel for the output to read.
type MockType ¶
type MockType struct { Messages chan types.Message Responses chan types.Response Errors chan []error }
MockType - Implements the buffer.Type interface.
func (*MockType) ErrorsChan ¶
ErrorsChan - Returns the errors channel.
func (*MockType) MessageChan ¶
MessageChan - Returns the messages channel.
func (*MockType) ResponseChan ¶
ResponseChan - Returns the response channel.
func (*MockType) StartListening ¶
StartListening - Sets the channel used for reading responses.
func (*MockType) StartReceiving ¶
StartReceiving - Sets the read channel. This implementation is NOT thread safe.
type StackBuffer ¶
type StackBuffer struct {
// contains filtered or unexported fields
}
StackBuffer - An agent that wraps an output with a message buffer.
func (*StackBuffer) CloseAsync ¶
func (m *StackBuffer) CloseAsync()
CloseAsync - Shuts down the StackBuffer output and stops processing messages.
func (*StackBuffer) ErrorsChan ¶
func (m *StackBuffer) ErrorsChan() <-chan []error
ErrorsChan - Returns the errors channel.
func (*StackBuffer) MessageChan ¶
func (m *StackBuffer) MessageChan() <-chan types.Message
MessageChan - Returns the channel used for consuming messages from this buffer.
func (*StackBuffer) ResponseChan ¶
func (m *StackBuffer) ResponseChan() <-chan types.Response
ResponseChan - Returns the response channel.
func (*StackBuffer) StartListening ¶
func (m *StackBuffer) StartListening(responses <-chan types.Response) error
StartListening - Sets the channel for reading responses.
func (*StackBuffer) StartReceiving ¶
func (m *StackBuffer) StartReceiving(msgs <-chan types.Message) error
StartReceiving - Assigns a messages channel for the output to read.
func (*StackBuffer) WaitForClose ¶
func (m *StackBuffer) WaitForClose(timeout time.Duration) error
WaitForClose - Blocks until the StackBuffer output has closed down.
type Type ¶
type Type interface { types.Producer types.Consumer types.Closable // ErrorsChan - Returns the channel used for returning any accumulated errors. This needs // reading in the same select block where messages are sent as the errors can occur at any time. ErrorsChan() <-chan []error }
Type - The standard interface of an agent type.
func NewMmapFile ¶
NewMmapFile - Create a buffer held in memory and persisted to file through memory map.
func NewStackBuffer ¶
func NewStackBuffer(buffer ring.MessageStack, stats metrics.Aggregator) Type
NewStackBuffer - Create a new buffered agent type.