Versions in this module Expand all Collapse all v0 v0.3.2 Jun 22, 2015 Changes in this version + const DroppedStream + const LogInternalStream + const PluginControlRoll + const PluginControlStop + const WildcardStream + var DroppedStreamID = GetStreamID(DroppedStream) + var LogInternalStreamID = GetStreamID(LogInternalStream) + var MessageCount = uint32(0) + var StreamTypes = StreamRegistry + var WildcardStreamID = GetStreamID(WildcardStream) + func EnableRetryQueue(size int) + func GetAndResetMessageCount() uint32 + func GetRetryQueue() <-chan Message + type AsyncMessageSource interface + EnqueueResponse func(msg Message) + type Config struct + Plugins []PluginConfig + Values []map[string]shared.MarshalMap + func ReadConfig(path string) (*Config, error) + type Consumer interface + Consume func(*sync.WaitGroup) + Control func() chan<- PluginControl + Streams func() []MessageStreamID + type ConsumerBase struct + func (cons *ConsumerBase) Configure(conf PluginConfig) error + func (cons *ConsumerBase) Control() chan<- PluginControl + func (cons *ConsumerBase) DefaultControlLoop(onRoll func()) + func (cons *ConsumerBase) Enqueue(data []byte, sequence uint64) + func (cons *ConsumerBase) EnqueueCopy(data []byte, sequence uint64) + func (cons *ConsumerBase) EnqueueMessage(msg Message) + func (cons *ConsumerBase) ProcessCommand(command PluginControl, onRoll func()) bool + func (cons *ConsumerBase) Streams() []MessageStreamID + func (cons *ConsumerBase) TickerControlLoop(interval time.Duration, onRoll func(), onTick func()) + func (cons ConsumerBase) AddMainWorker(workers *sync.WaitGroup) + func (cons ConsumerBase) AddWorker() + func (cons ConsumerBase) SetWorkerWaitGroup(workers *sync.WaitGroup) + func (cons ConsumerBase) WorkerDone() + type ConsumerError struct + func NewConsumerError(args ...interface{}) ConsumerError + func (err ConsumerError) Error() string + type Filter interface + Accepts func(msg Message) bool + type Formatter interface + Format func(msg Message) ([]byte, MessageStreamID) + type LinkableMessageSource interface + IsLinked func() bool + Link func(pipe interface{}) + type LogConsumer struct + func (cons *LogConsumer) Configure(conf PluginConfig) error + func (cons *LogConsumer) Consume(threads *sync.WaitGroup) + func (cons *LogConsumer) Control() chan<- PluginControl + func (cons *LogConsumer) Streams() []MessageStreamID + func (cons LogConsumer) IsPaused() bool + func (cons LogConsumer) Pause() + func (cons LogConsumer) Resume() + func (cons LogConsumer) Write(data []byte) (int, error) + type MappedStream struct + Stream Stream + StreamID MessageStreamID + type Message struct + Data []byte + Sequence uint64 + Source MessageSource + StreamID MessageStreamID + Timestamp time.Time + func NewMessage(source MessageSource, data []byte, sequence uint64) Message + func (msg Message) Drop(timeout time.Duration) + func (msg Message) Enqueue(channel chan<- Message, timeout time.Duration) + func (msg Message) Retry(timeout time.Duration) + func (msg Message) String() string + type MessageBatch struct + func NewMessageBatch(size int, format Formatter) *MessageBatch + func (batch *MessageBatch) Append(msg Message) bool + func (batch *MessageBatch) Flush(resource io.Writer, validate func() bool, onError func(error) bool) + func (batch *MessageBatch) Touch() + func (batch *MessageBatch) WaitForFlush(timeout time.Duration) + func (batch MessageBatch) IsEmpty() bool + func (batch MessageBatch) ReachedSizeThreshold(size int) bool + func (batch MessageBatch) ReachedTimeThreshold(timeout time.Duration) bool + type MessageSource interface + type MessageStreamID uint64 + func GetStreamID(stream string) MessageStreamID + type Plugin interface + Configure func(conf PluginConfig) error + func NewPlugin(config PluginConfig) (Plugin, error) + func NewPluginWithType(typename string, config PluginConfig) (Plugin, error) + type PluginConfig struct + Enable bool + Instances int + Settings shared.MarshalMap + Stream []string + Typename string + func NewPluginConfig(typename string) PluginConfig + func (conf *PluginConfig) Read(values shared.MarshalMap) + func (conf PluginConfig) GetBool(key string, defaultValue bool) bool + func (conf PluginConfig) GetInt(key string, defaultValue int) int + func (conf PluginConfig) GetStreamMap(key string, defaultValue string) map[MessageStreamID]string + func (conf PluginConfig) GetStreamRoutes(key string) map[MessageStreamID][]MessageStreamID + func (conf PluginConfig) GetString(key string, defaultValue string) string + func (conf PluginConfig) GetStringArray(key string, defaultValue []string) []string + func (conf PluginConfig) GetStringMap(key string, defaultValue map[string]string) map[string]string + func (conf PluginConfig) GetValue(key string, defaultValue interface{}) interface{} + func (conf PluginConfig) HasValue(key string) bool + func (conf PluginConfig) Override(key string, value interface{}) + func (conf PluginConfig) Validate() bool + type PluginControl int + type PluginRunState struct + func (state *PluginRunState) AddWorker() + func (state *PluginRunState) IsPaused() bool + func (state *PluginRunState) Pause() + func (state *PluginRunState) Resume() + func (state *PluginRunState) SetWorkerWaitGroup(workers *sync.WaitGroup) + func (state *PluginRunState) WorkerDone() + type Producer interface + Control func() chan<- PluginControl + Enqueue func(msg Message) + Produce func(workers *sync.WaitGroup) + Streams func() []MessageStreamID + type ProducerBase struct + func (prod *ProducerBase) Close(onMessage func(msg Message)) + func (prod *ProducerBase) Configure(conf PluginConfig) error + func (prod *ProducerBase) Control() chan<- PluginControl + func (prod *ProducerBase) DefaultControlLoop(onMessage func(msg Message), onRoll func()) + func (prod *ProducerBase) Enqueue(msg Message) + func (prod *ProducerBase) Format(msg Message) ([]byte, MessageStreamID) + func (prod *ProducerBase) GetFormatter() Formatter + func (prod *ProducerBase) Messages() chan<- Message + func (prod *ProducerBase) PauseAllStreams(capacity int) + func (prod *ProducerBase) ProcessCommand(command PluginControl, onRoll func()) bool + func (prod *ProducerBase) ResumeAllStreams() + func (prod *ProducerBase) Streams() []MessageStreamID + func (prod *ProducerBase) TickerControlLoop(interval time.Duration, onMessage func(msg Message), onRoll func(), ...) + func (prod ProducerBase) AddMainWorker(workers *sync.WaitGroup) + func (prod ProducerBase) AddWorker() + func (prod ProducerBase) GetTimeout() time.Duration + func (prod ProducerBase) Next() (Message, bool) + func (prod ProducerBase) NextNonBlocking(onMessage func(msg Message)) bool + func (prod ProducerBase) SetWorkerWaitGroup(workers *sync.WaitGroup) + func (prod ProducerBase) WorkerDone() + type ProducerError struct + func NewProducerError(args ...interface{}) ProducerError + func (err ProducerError) Error() string + type SerialMessageSource interface + ResponseDone func() + type Stream interface + AddProducer func(producers ...Producer) + Enqueue func(msg Message) + Pause func(capacity int) + Resume func() + type StreamBase struct + Distribute func(msg Message) + Filter Filter + Format Formatter + Producers []Producer + func (stream *StreamBase) AddProducer(producers ...Producer) + func (stream *StreamBase) Configure(conf PluginConfig) error + func (stream *StreamBase) Enqueue(msg Message) + func (stream *StreamBase) Pause(capacity int) + func (stream *StreamBase) Resume() + type StreamRegistry struct + func (registry *StreamRegistry) GetStreamOrFallback(streamID MessageStreamID) Stream + func (registry *StreamRegistry) Register(stream Stream, streamID MessageStreamID) + func (registry *StreamRegistry) RegisterWildcardProducer(producers ...Producer) + func (registry StreamRegistry) AddWildcardProducersToStream(stream Stream) + func (registry StreamRegistry) ForEachStream(callback func(streamID MessageStreamID, stream Stream)) + func (registry StreamRegistry) GetStream(id MessageStreamID) Stream + func (registry StreamRegistry) GetStreamByName(name string) Stream + func (registry StreamRegistry) GetStreamName(streamID MessageStreamID) string + func (registry StreamRegistry) IsStreamRegistered(id MessageStreamID) bool