Documentation ¶
Index ¶
- Constants
- Variables
- func EnableRetryQueue(size int)
- func GetAndResetMessageCount() uint32
- func GetRetryQueue() <-chan Message
- type AsyncMessageSource
- type Config
- type Consumer
- type ConsumerBase
- func (cons ConsumerBase) AddMainWorker(workers *sync.WaitGroup)
- func (cons ConsumerBase) AddWorker()
- func (cons *ConsumerBase) Configure(conf PluginConfig) error
- func (cons *ConsumerBase) Control() chan<- PluginControl
- func (cons *ConsumerBase) DefaultControlLoop(onRoll func())
- func (cons *ConsumerBase) Enqueue(data []byte, sequence uint64)
- func (cons *ConsumerBase) EnqueueCopy(data []byte, sequence uint64)
- func (cons *ConsumerBase) EnqueueMessage(msg Message)
- func (cons *ConsumerBase) ProcessCommand(command PluginControl, onRoll func()) bool
- func (cons ConsumerBase) SetWorkerWaitGroup(workers *sync.WaitGroup)
- func (cons *ConsumerBase) Streams() []MessageStreamID
- func (cons *ConsumerBase) TickerControlLoop(interval time.Duration, onRoll func(), onTick func())
- func (cons ConsumerBase) WorkerDone()
- type ConsumerError
- type Filter
- type Formatter
- type LinkableMessageSource
- type LogConsumer
- func (cons *LogConsumer) Configure(conf PluginConfig) error
- func (cons *LogConsumer) Consume(threads *sync.WaitGroup)
- func (cons *LogConsumer) Control() chan<- PluginControl
- func (cons LogConsumer) IsPaused() bool
- func (cons LogConsumer) Pause()
- func (cons LogConsumer) Resume()
- func (cons *LogConsumer) Streams() []MessageStreamID
- func (cons LogConsumer) Write(data []byte) (int, error)
- type MappedStream
- type Message
- type MessageBatch
- func (batch *MessageBatch) Append(msg Message) bool
- func (batch *MessageBatch) Flush(resource io.Writer, validate func() bool, onError func(error) bool)
- func (batch MessageBatch) IsEmpty() bool
- func (batch MessageBatch) ReachedSizeThreshold(size int) bool
- func (batch MessageBatch) ReachedTimeThreshold(timeout time.Duration) bool
- func (batch *MessageBatch) Touch()
- func (batch *MessageBatch) WaitForFlush(timeout time.Duration)
- type MessageSource
- type MessageStreamID
- type Plugin
- type PluginConfig
- func (conf PluginConfig) GetBool(key string, defaultValue bool) bool
- func (conf PluginConfig) GetInt(key string, defaultValue int) int
- func (conf PluginConfig) GetStreamMap(key string, defaultValue string) map[MessageStreamID]string
- func (conf PluginConfig) GetStreamRoutes(key string) map[MessageStreamID][]MessageStreamID
- func (conf PluginConfig) GetString(key string, defaultValue string) string
- func (conf PluginConfig) GetStringArray(key string, defaultValue []string) []string
- func (conf PluginConfig) GetStringMap(key string, defaultValue map[string]string) map[string]string
- func (conf PluginConfig) GetValue(key string, defaultValue interface{}) interface{}
- func (conf PluginConfig) HasValue(key string) bool
- func (conf PluginConfig) Override(key string, value interface{})
- func (conf *PluginConfig) Read(values shared.MarshalMap)
- func (conf PluginConfig) Validate() bool
- type PluginControl
- type PluginRunState
- type Producer
- type ProducerBase
- func (prod ProducerBase) AddMainWorker(workers *sync.WaitGroup)
- func (prod ProducerBase) AddWorker()
- func (prod *ProducerBase) Close(onMessage func(msg Message))
- func (prod *ProducerBase) Configure(conf PluginConfig) error
- func (prod *ProducerBase) Control() chan<- PluginControl
- func (prod *ProducerBase) DefaultControlLoop(onMessage func(msg Message), onRoll func())
- func (prod *ProducerBase) Enqueue(msg Message)
- func (prod *ProducerBase) Format(msg Message) ([]byte, MessageStreamID)
- func (prod *ProducerBase) GetFormatter() Formatter
- func (prod ProducerBase) GetTimeout() time.Duration
- func (prod *ProducerBase) Messages() chan<- Message
- func (prod ProducerBase) Next() (Message, bool)
- func (prod ProducerBase) NextNonBlocking(onMessage func(msg Message)) bool
- func (prod *ProducerBase) PauseAllStreams(capacity int)
- func (prod *ProducerBase) ProcessCommand(command PluginControl, onRoll func()) bool
- func (prod *ProducerBase) ResumeAllStreams()
- func (prod ProducerBase) SetWorkerWaitGroup(workers *sync.WaitGroup)
- func (prod *ProducerBase) Streams() []MessageStreamID
- func (prod *ProducerBase) TickerControlLoop(interval time.Duration, onMessage func(msg Message), onRoll func(), ...)
- func (prod ProducerBase) WorkerDone()
- type ProducerError
- type SerialMessageSource
- type Stream
- type StreamBase
- type StreamRegistry
- func (registry StreamRegistry) AddWildcardProducersToStream(stream Stream)
- func (registry StreamRegistry) ForEachStream(callback func(streamID MessageStreamID, stream Stream))
- func (registry StreamRegistry) GetStream(id MessageStreamID) Stream
- func (registry StreamRegistry) GetStreamByName(name string) Stream
- func (registry StreamRegistry) GetStreamName(streamID MessageStreamID) string
- func (registry *StreamRegistry) GetStreamOrFallback(streamID MessageStreamID) Stream
- func (registry StreamRegistry) IsStreamRegistered(id MessageStreamID) bool
- func (registry *StreamRegistry) Register(stream Stream, streamID MessageStreamID)
- func (registry *StreamRegistry) RegisterWildcardProducer(producers ...Producer)
Constants ¶
const ( // LogInternalStream is the name of the internal message channel (logs) LogInternalStream = "_GOLLUM_" // WildcardStream is the name of the "all streams" channel WildcardStream = "*" // DroppedStream is the name of the stream used to store dropped messages DroppedStream = "_DROPPED_" )
const ( // PluginControlStop will cause the consumer to halt and shutdown. PluginControlStop = PluginControl(1) // PluginControlRoll notifies the consumer about a reconnect or reopen request PluginControlRoll = PluginControl(2) )
Variables ¶
var ( // LogInternalStreamID is the ID of the "_GOLLUM_" stream LogInternalStreamID = GetStreamID(LogInternalStream) // WildcardStreamID is the ID of the "*" stream WildcardStreamID = GetStreamID(WildcardStream) // DroppedStreamID is the ID of the "_DROPPED_" stream DroppedStreamID = GetStreamID(DroppedStream) )
var MessageCount = uint32(0)
MessageCount holds the number of messages processed since the last call to GetAndResetMessageCount.
var StreamTypes = StreamRegistry{ // contains filtered or unexported fields }
StreamTypes is the global instance of StreamRegistry used to store all registered streams.
Functions ¶
func EnableRetryQueue ¶
func EnableRetryQueue(size int)
EnableRetryQueue creates a retried messages channel using the given size.
func GetAndResetMessageCount ¶
func GetAndResetMessageCount() uint32
GetAndResetMessageCount returns the current message counter and resets it to 0. This function is threadsafe.
func GetRetryQueue ¶
func GetRetryQueue() <-chan Message
GetRetryQueue returns read access to the retry queue.
Types ¶
type AsyncMessageSource ¶
type AsyncMessageSource interface { MessageSource // EnqueueResponse sends a message to the source of another message. EnqueueResponse(msg Message) }
AsyncMessageSource extends the MessageSource interface to allow a backchannel that simply forwards any message coming from the producer.
type Config ¶
type Config struct { Values []map[string]shared.MarshalMap Plugins []PluginConfig }
Config represents the top level config containing all plugin configs.
func ReadConfig ¶
ReadConfig parses a YAML config file into a new Config struct.
type Consumer ¶
type Consumer interface { // Consume should implement the main loop that fetches messages from a given // source and pushes it to the Message channel. Consume(*sync.WaitGroup) // Streams returns the streams this consumer is writing to. Streams() []MessageStreamID // Control returns write access to this consumer's control channel. // See PluginControl* constants. Control() chan<- PluginControl }
Consumer is an interface for plugins that receive data from outside sources and generate Message objects from this data.
type ConsumerBase ¶
type ConsumerBase struct {
// contains filtered or unexported fields
}
ConsumerBase base class. All consumers support a common subset of configuration options:
- "consumer.Something": Enable: true Stream:
- "error"
- "default"
Enable switches the consumer on or off. By default this value is set to true.
Stream contains either a single string or a list of strings defining the message channels this consumer will produce. By default this is set to "*" which means only producers set to consume "all streams" will get these messages.
func (ConsumerBase) AddMainWorker ¶
func (cons ConsumerBase) AddMainWorker(workers *sync.WaitGroup)
AddMainWorker adds the first worker to the waitgroup
func (ConsumerBase) AddWorker ¶
func (cons ConsumerBase) AddWorker()
AddWorker adds an additional worker to the waitgroup. Assumes that either AddMainWorker or SetWorkerWaitGroup has been called beforehand.
func (*ConsumerBase) Configure ¶
func (cons *ConsumerBase) Configure(conf PluginConfig) error
Configure initializes standard consumer values from a plugin config.
func (*ConsumerBase) Control ¶
func (cons *ConsumerBase) Control() chan<- PluginControl
Control returns write access to this consumer's control channel. See PluginControl* constants.
func (*ConsumerBase) DefaultControlLoop ¶
func (cons *ConsumerBase) DefaultControlLoop(onRoll func())
DefaultControlLoop provides a consumer mainloop that is sufficient for most usecases.
func (*ConsumerBase) Enqueue ¶
func (cons *ConsumerBase) Enqueue(data []byte, sequence uint64)
Enqueue creates a new message from a given byte slice and passes it to EnqueueMessage. Note that data is not copied, just referenced by the message.
func (*ConsumerBase) EnqueueCopy ¶
func (cons *ConsumerBase) EnqueueCopy(data []byte, sequence uint64)
EnqueueCopy behaves like Enqueue but creates a copy of data that is attached to the message.
func (*ConsumerBase) EnqueueMessage ¶
func (cons *ConsumerBase) EnqueueMessage(msg Message)
EnqueueMessage passes a given message to all streams. Only the StreamID of the message is modified, everything else is passed as-is.
func (*ConsumerBase) ProcessCommand ¶
func (cons *ConsumerBase) ProcessCommand(command PluginControl, onRoll func()) bool
ProcessCommand provides a callback-based way to react to the different consumer commands. Returns true if PluginControlStop was triggered.
func (ConsumerBase) SetWorkerWaitGroup ¶
func (cons ConsumerBase) SetWorkerWaitGroup(workers *sync.WaitGroup)
SetWorkerWaitGroup forwards to Plugin.SetWorkerWaitGroup for this consumer's internal plugin state. This method is also called by AddMainWorker.
func (*ConsumerBase) Streams ¶
func (cons *ConsumerBase) Streams() []MessageStreamID
Streams returns an array with all stream ids this consumer is writing to.
func (*ConsumerBase) TickerControlLoop ¶
func (cons *ConsumerBase) TickerControlLoop(interval time.Duration, onRoll func(), onTick func())
TickerControlLoop is like DefaultControlLoop but executes a given function at every given interval tick, too.
func (ConsumerBase) WorkerDone ¶
func (cons ConsumerBase) WorkerDone()
WorkerDone removes an additional worker from the waitgroup.
type ConsumerError ¶
type ConsumerError struct {
// contains filtered or unexported fields
}
ConsumerError can be used to return consumer related errors e.g. during a call to Configure
func NewConsumerError ¶
func NewConsumerError(args ...interface{}) ConsumerError
NewConsumerError creates a new ConsumerError
func (ConsumerError) Error ¶
func (err ConsumerError) Error() string
Error satisfies the error interface for the ConsumerError struct
type Filter ¶
Filter allows custom message filtering for ProducerBase derived plugins. Producers not deriving from ProducerBase might utilize this one, too.
type Formatter ¶
type Formatter interface { // Format transfers the message payload into a new format. The payload may // then be reassigned to the original or a new message. // In addition to that the formatter may change the stream of the message. Format(msg Message) ([]byte, MessageStreamID) }
Formatter is the interface definition for message formatters
type LinkableMessageSource ¶
type LinkableMessageSource interface { MessageSource // Link the message source to the message receiver. This makes it possible // to create stable "pipes" between e.g. a consumer and producer. Link(pipe interface{}) // IsLinked has to return true if Link executed successfully and does not // need to be called again. IsLinked() bool }
LinkableMessageSource extends the MessageSource interface to allow a pipe like behaviour between two components that communicate messages.
type LogConsumer ¶
type LogConsumer struct {
// contains filtered or unexported fields
}
LogConsumer is an internal consumer plugin used indirectly by the gollum log package.
func (*LogConsumer) Configure ¶
func (cons *LogConsumer) Configure(conf PluginConfig) error
Configure initializes this consumer with values from a plugin config.
func (*LogConsumer) Consume ¶
func (cons *LogConsumer) Consume(threads *sync.WaitGroup)
Consume starts listening for control statements
func (*LogConsumer) Control ¶
func (cons *LogConsumer) Control() chan<- PluginControl
Control returns a handle to the control channel
func (LogConsumer) IsPaused ¶
func (cons LogConsumer) IsPaused() bool
IsPaused returns false as Pause is not implemented
func (LogConsumer) Resume ¶
func (cons LogConsumer) Resume()
Resume is not implemented as Pause is not implemented.
func (*LogConsumer) Streams ¶
func (cons *LogConsumer) Streams() []MessageStreamID
Streams always returns an array with one member - the internal log stream
type MappedStream ¶
type MappedStream struct { StreamID MessageStreamID Stream Stream }
MappedStream holds a stream and the id the stream is assigned to.
type Message ¶
type Message struct { Data []byte StreamID MessageStreamID Source MessageSource Timestamp time.Time Sequence uint64 }
Message is a container used for storing the internal state of messages. This struct is passed between consumers and producers.
func NewMessage ¶
func NewMessage(source MessageSource, data []byte, sequence uint64) Message
NewMessage creates a new message from a given data stream
func (Message) Drop ¶
Drop pushes a message to the retry queue and sets the stream to _DROPPED_. This queue can be consumed by the loopback consumer. If no such consumer has been configured, the message is lost.
func (Message) Enqueue ¶
Enqueue is a convenience function to push a message to a channel while waiting for a timeout instead of just blocking. Passing a timeout of -1 will discard the message. Passing a timeout of 0 will always block. Messages that time out will be passed to the dropped queue if a Dropped consumer exists.
type MessageBatch ¶
type MessageBatch struct {
// contains filtered or unexported fields
}
MessageBatch is a helper class for producers to format and store messages into a single buffer that is flushed to an io.Writer. You can use the Reached* functions to determine whether a flush should be called, i.e. if a timeout or size threshold has been reached.
func NewMessageBatch ¶
func NewMessageBatch(size int, format Formatter) *MessageBatch
NewMessageBatch creates a new MessageBatch with a given size (in bytes) and a given formatter.
func (*MessageBatch) Append ¶
func (batch *MessageBatch) Append(msg Message) bool
Append formats a message and appends it to the internal buffer. If the message does not fit into the buffer this function returns false. If the message can never fit into the buffer (too large), true is returned and an error is logged.
func (*MessageBatch) Flush ¶
func (batch *MessageBatch) Flush(resource io.Writer, validate func() bool, onError func(error) bool)
Flush writes the content of the buffer to a given resource and resets the internal state, i.e. the buffer is empty after a call to Flush. Writing will be done in a separate go routine to be non-blocking.
The validate callback will be called after messages have been successfully written to the io.Writer. If validate returns false the buffer will not be reset (automatic retry). If validate is nil a return value of true is assumed (buffer reset).
The onError callback will be called if the io.Writer returned an error. If onError returns false the buffer will not be reset (automatic retry). If onError is nil a return value of true is assumed (buffer reset).
func (MessageBatch) IsEmpty ¶
func (batch MessageBatch) IsEmpty() bool
IsEmpty returns true if no data is stored in the buffer
func (MessageBatch) ReachedSizeThreshold ¶
func (batch MessageBatch) ReachedSizeThreshold(size int) bool
ReachedSizeThreshold returns true if the bytes stored in the buffer are above or equal to the size given. If there is no data this function returns false.
func (MessageBatch) ReachedTimeThreshold ¶
func (batch MessageBatch) ReachedTimeThreshold(timeout time.Duration) bool
ReachedTimeThreshold returns true if the last flush was more than timeout ago. If there is no data this function returns false.
func (*MessageBatch) Touch ¶
func (batch *MessageBatch) Touch()
Touch resets the timer queried by ReachedTimeThreshold, i.e. this resets the automatic flush timeout
func (*MessageBatch) WaitForFlush ¶
func (batch *MessageBatch) WaitForFlush(timeout time.Duration)
WaitForFlush blocks until the current flush command returns. Passing a timeout > 0 will unblock this function after the given duration at the latest.
type MessageSource ¶
type MessageSource interface { }
MessageSource defines methods that are common to all message sources. Currently this is only a placeholder.
type MessageStreamID ¶
type MessageStreamID uint64
MessageStreamID is the "compiled name" of a stream
func GetStreamID ¶
func GetStreamID(stream string) MessageStreamID
GetStreamID returns the integer representation of a given stream name.
type Plugin ¶
type Plugin interface {
Configure(conf PluginConfig) error
}
Plugin is the base class for any runtime class that can be configured and instantiated during runtime.
func NewPlugin ¶
func NewPlugin(config PluginConfig) (Plugin, error)
NewPlugin creates a new plugin from the type information stored in its config. This function internally calls NewPluginWithType.
func NewPluginWithType ¶
func NewPluginWithType(typename string, config PluginConfig) (Plugin, error)
NewPluginWithType creates a new plugin of a given type and initializes it using the given config (i.e. passes that config to Configure). The type passed to this function may differ from the type stored in the config. If the type is meant to match use NewPlugin instead of NewPluginWithType. This function returns nil, error if the plugin could not be instantiated or plugin, error if Configure failed.
type PluginConfig ¶
type PluginConfig struct { Typename string Enable bool Instances int Stream []string Settings shared.MarshalMap // contains filtered or unexported fields }
PluginConfig is a configuration for a specific plugin
func NewPluginConfig ¶
func NewPluginConfig(typename string) PluginConfig
NewPluginConfig creates a new plugin config with default values. By default the plugin is enabled, has a buffered channel with 4096 slots, has one instance, is bound to no streams and has no additional settings.
func (PluginConfig) GetBool ¶
func (conf PluginConfig) GetBool(key string, defaultValue bool) bool
GetBool tries to read a non-predefined, boolean value from a PluginConfig. If that value is not found defaultValue is returned.
func (PluginConfig) GetInt ¶
func (conf PluginConfig) GetInt(key string, defaultValue int) int
GetInt tries to read a non-predefined, integer value from a PluginConfig. If that value is not found defaultValue is returned.
func (PluginConfig) GetStreamMap ¶
func (conf PluginConfig) GetStreamMap(key string, defaultValue string) map[MessageStreamID]string
GetStreamMap tries to read a non-predefined, stream to string map from a plugin config. A mapping on the wildcard stream is always returned. The target is either defaultValue or a value defined by the config.
func (PluginConfig) GetStreamRoutes ¶
func (conf PluginConfig) GetStreamRoutes(key string) map[MessageStreamID][]MessageStreamID
GetStreamRoutes tries to read a non-predefined, stream to stream map from a plugin config. If no routes are defined an empty map is returned
func (PluginConfig) GetString ¶
func (conf PluginConfig) GetString(key string, defaultValue string) string
GetString tries to read a non-predefined, string value from a PluginConfig. If that value is not found defaultValue is returned.
func (PluginConfig) GetStringArray ¶
func (conf PluginConfig) GetStringArray(key string, defaultValue []string) []string
GetStringArray tries to read a non-predefined, string array from a PluginConfig. If that value is not found defaultValue is returned.
func (PluginConfig) GetStringMap ¶
GetStringMap tries to read a non-predefined, string to string map from a PluginConfig. If that value is not found defaultValue is returned.
func (PluginConfig) GetValue ¶
func (conf PluginConfig) GetValue(key string, defaultValue interface{}) interface{}
GetValue tries to read a non-predefined, untyped value from a PluginConfig. If that value is not found defaultValue is returned.
func (PluginConfig) HasValue ¶
func (conf PluginConfig) HasValue(key string) bool
HasValue returns true if the given key has been set as a config option. This function only takes non-predefined settings into account.
func (PluginConfig) Override ¶
func (conf PluginConfig) Override(key string, value interface{})
Override sets or overrides a configuration value for non-predefined options.
func (*PluginConfig) Read ¶
func (conf *PluginConfig) Read(values shared.MarshalMap)
Read analyzes a given key/value map to extract the configuration values valid for each plugin. All non-default values are written to the Settings member.
func (PluginConfig) Validate ¶
func (conf PluginConfig) Validate() bool
Validate should be called after a configuration has been processed. It will check the keys read from the config files against the keys requested up to this point. Unknown keys will be written to the error log.
type PluginControl ¶
type PluginControl int
PluginControl is an enumeration of the commands sent over the channel returned by Consumer.Control() and Producer.Control().
type PluginRunState ¶
type PluginRunState struct {
// contains filtered or unexported fields
}
PluginRunState is used in some plugins to store information about the execution state of the plugin (i.e. if it is running or not) as well as threading primitives that enable gollum to wait for a plugin to properly shut down.
func (*PluginRunState) AddWorker ¶
func (state *PluginRunState) AddWorker()
AddWorker adds a worker to the waitgroup configured by SetWorkerWaitGroup.
func (*PluginRunState) IsPaused ¶
func (state *PluginRunState) IsPaused() bool
IsPaused implements the MessageSource interface
func (*PluginRunState) Pause ¶
func (state *PluginRunState) Pause()
Pause implements the MessageSource interface
func (*PluginRunState) Resume ¶
func (state *PluginRunState) Resume()
Resume implements the MessageSource interface
func (*PluginRunState) SetWorkerWaitGroup ¶
func (state *PluginRunState) SetWorkerWaitGroup(workers *sync.WaitGroup)
SetWorkerWaitGroup sets the WaitGroup used to manage workers
func (*PluginRunState) WorkerDone ¶
func (state *PluginRunState) WorkerDone()
WorkerDone removes a worker from the waitgroup configured by SetWorkerWaitGroup.
type Producer ¶
type Producer interface { // Enqueue sends a message to the producer. The producer may reject // the message or drop it after a given timeout. Enqueue can block. Enqueue(msg Message) // Produce should implement a main loop that passes messages from the // message channel to some other service like the console. // This can be part of this function or a separate go routine. // Produce is always called as a go routine. Produce(workers *sync.WaitGroup) // Streams returns the streams this producer is listening to. Streams() []MessageStreamID // Control returns write access to this producer's control channel. // See PluginControl* constants. Control() chan<- PluginControl }
Producer is an interface for plugins that pass messages to other services, files or storages.
type ProducerBase ¶
type ProducerBase struct {
// contains filtered or unexported fields
}
ProducerBase base class. All producers support a common subset of configuration options:
- "producer.Something": Enable: true Channel: 1024 ChannelTimeout: 200 Formatter: "format.Envelope" Stream:
- "error"
- "default"
Enable switches the producer on or off. By default this value is set to true.
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer's queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be sent to the retry channel.
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to "*" which means "listen to all streams but the internal".
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward.
func (ProducerBase) AddMainWorker ¶
func (prod ProducerBase) AddMainWorker(workers *sync.WaitGroup)
AddMainWorker adds the first worker to the waitgroup
func (ProducerBase) AddWorker ¶
func (prod ProducerBase) AddWorker()
AddWorker adds an additional worker to the waitgroup. Assumes that either AddMainWorker or SetWorkerWaitGroup has been called beforehand.
func (*ProducerBase) Close ¶
func (prod *ProducerBase) Close(onMessage func(msg Message))
Close closes the internal message channel and sends all remaining messages to the given callback. This function is called by *ControlLoop after a quit command has been received.
func (*ProducerBase) Configure ¶
func (prod *ProducerBase) Configure(conf PluginConfig) error
Configure initializes the standard producer config values.
func (*ProducerBase) Control ¶
func (prod *ProducerBase) Control() chan<- PluginControl
Control returns write access to this producer's control channel. See PluginControl* constants.
func (*ProducerBase) DefaultControlLoop ¶
func (prod *ProducerBase) DefaultControlLoop(onMessage func(msg Message), onRoll func())
DefaultControlLoop provides a producer mainloop that is sufficient for most usecases. Before this function exits Close will be called.
func (*ProducerBase) Enqueue ¶
func (prod *ProducerBase) Enqueue(msg Message)
Enqueue will add the message to the internal channel so it can be processed by the producer main loop.
func (*ProducerBase) Format ¶
func (prod *ProducerBase) Format(msg Message) ([]byte, MessageStreamID)
Format calls the formatters Format function
func (*ProducerBase) GetFormatter ¶
func (prod *ProducerBase) GetFormatter() Formatter
GetFormatter returns the formatter of this producer
func (ProducerBase) GetTimeout ¶
func (prod ProducerBase) GetTimeout() time.Duration
GetTimeout returns the duration this producer will block before a message is dropped. A value of -1 will cause the message to drop. A value of 0 will cause the producer to always block.
func (*ProducerBase) Messages ¶
func (prod *ProducerBase) Messages() chan<- Message
Messages returns write access to the message channel this producer reads from.
func (ProducerBase) Next ¶
func (prod ProducerBase) Next() (Message, bool)
Next returns the latest message from the channel as well as the open state of the channel. This function blocks if the channel is empty.
func (ProducerBase) NextNonBlocking ¶
func (prod ProducerBase) NextNonBlocking(onMessage func(msg Message)) bool
NextNonBlocking calls a given callback if a message is queued or returns. Returns false if no message was received.
func (*ProducerBase) PauseAllStreams ¶
func (prod *ProducerBase) PauseAllStreams(capacity int)
PauseAllStreams sends the Pause() command to all streams this producer is listening to.
func (*ProducerBase) ProcessCommand ¶
func (prod *ProducerBase) ProcessCommand(command PluginControl, onRoll func()) bool
ProcessCommand provides a callback-based way to react to the different producer commands. Returns true if PluginControlStop was triggered.
func (*ProducerBase) ResumeAllStreams ¶
func (prod *ProducerBase) ResumeAllStreams()
ResumeAllStreams sends the Resume() command to all streams this producer is listening to.
func (ProducerBase) SetWorkerWaitGroup ¶
func (prod ProducerBase) SetWorkerWaitGroup(workers *sync.WaitGroup)
SetWorkerWaitGroup forwards to Plugin.SetWorkerWaitGroup for this producer's internal plugin state. This method is also called by AddMainWorker.
func (*ProducerBase) Streams ¶
func (prod *ProducerBase) Streams() []MessageStreamID
Streams returns the streams this producer is listening to.
func (*ProducerBase) TickerControlLoop ¶
func (prod *ProducerBase) TickerControlLoop(interval time.Duration, onMessage func(msg Message), onRoll func(), onTimeOut func())
TickerControlLoop is like DefaultControlLoop but executes a given function at every given interval tick, too. Before this function exits Close will be called.
func (ProducerBase) WorkerDone ¶
func (prod ProducerBase) WorkerDone()
WorkerDone removes an additional worker from the waitgroup.
type ProducerError ¶
type ProducerError struct {
// contains filtered or unexported fields
}
ProducerError can be used to return producer related errors e.g. during a call to Configure
func NewProducerError ¶
func NewProducerError(args ...interface{}) ProducerError
NewProducerError creates a new ProducerError
func (ProducerError) Error ¶
func (err ProducerError) Error() string
Error satisfies the error interface for the ProducerError struct
type SerialMessageSource ¶
type SerialMessageSource interface { AsyncMessageSource // Notify the end of the response stream ResponseDone() }
SerialMessageSource extends the AsyncMessageSource interface to allow waiting for all parts of the response to be submitted.
type Stream ¶
type Stream interface { // Pause causes this stream to go silent. Messages should be queued or cause // a blocking call. The passed capacity can be used to configure internal // channel for buffering incoming messages while this stream is paused. Pause(capacity int) // Resume causes this stream to send messages again after Pause() had been // called. Any buffered messages need to be sent by this method or by a // separate go routine. Resume() // AddProducer adds one or more producers to this stream, i.e. the producers // listening to messages on this stream. AddProducer(producers ...Producer) // Enqueue sends a given message to all registered producers Enqueue(msg Message) }
Stream defines the interface for all stream plugins
type StreamBase ¶
type StreamBase struct { Filter Filter Format Formatter Producers []Producer Distribute func(msg Message) // contains filtered or unexported fields }
StreamBase defines the standard stream implementation. New stream types should derive from this class. StreamBase allows streams to set and execute filters as well as format a message. Types derived from StreamBase should set the Distribute member instead of overloading the Enqueue method. See stream.Broadcast for default configuration values and examples.
func (*StreamBase) AddProducer ¶
func (stream *StreamBase) AddProducer(producers ...Producer)
AddProducer adds all producers to the list of known producers. Duplicates will be filtered.
func (*StreamBase) Configure ¶
func (stream *StreamBase) Configure(conf PluginConfig) error
Configure sets up all values required by StreamBase
func (*StreamBase) Enqueue ¶
func (stream *StreamBase) Enqueue(msg Message)
Enqueue checks the filter, formats the message and sends it to all producers registered. Functions deriving from StreamBase can set the Distribute member to hook into this function.
func (*StreamBase) Pause ¶
func (stream *StreamBase) Pause(capacity int)
Pause will cause this stream to go silent. Messages will be queued to an internal channel that can be configured in size by setting the capacity parameter. Pass a capacity of 0 to disable buffering. Calling Pause on an already paused stream is ignored.
func (*StreamBase) Resume ¶
func (stream *StreamBase) Resume()
Resume causes this stream to send messages again after Pause() had been called. Any buffered messages will be sent by a separate go routine. Calling Resume on a stream that is not paused is ignored.
type StreamRegistry ¶
type StreamRegistry struct {
// contains filtered or unexported fields
}
StreamRegistry holds streams mapped by their MessageStreamID as well as a reverse lookup of MessageStreamID to stream name.
func (StreamRegistry) AddWildcardProducersToStream ¶
func (registry StreamRegistry) AddWildcardProducersToStream(stream Stream)
AddWildcardProducersToStream adds all known wildcard producers to a given stream. The state of the wildcard list is undefined during the configuration phase.
func (StreamRegistry) ForEachStream ¶
func (registry StreamRegistry) ForEachStream(callback func(streamID MessageStreamID, stream Stream))
ForEachStream loops over all registered streams and calls the given function.
func (StreamRegistry) GetStream ¶
func (registry StreamRegistry) GetStream(id MessageStreamID) Stream
GetStream returns a registered stream or nil
func (StreamRegistry) GetStreamByName ¶
func (registry StreamRegistry) GetStreamByName(name string) Stream
GetStreamByName returns a registered stream by name. See GetStream.
func (StreamRegistry) GetStreamName ¶
func (registry StreamRegistry) GetStreamName(streamID MessageStreamID) string
GetStreamName does a reverse lookup for a given MessageStreamID and returns the corresponding name. If the MessageStreamID is not registered, an empty string is returned.
func (*StreamRegistry) GetStreamOrFallback ¶
func (registry *StreamRegistry) GetStreamOrFallback(streamID MessageStreamID) Stream
GetStreamOrFallback returns the stream for the given id if it is registered. If no stream is registered for the given id the default stream is used. The default stream is equivalent to an unconfigured stream.Broadcast with all wildcard producers already added.
func (StreamRegistry) IsStreamRegistered ¶
func (registry StreamRegistry) IsStreamRegistered(id MessageStreamID) bool
IsStreamRegistered returns true if the stream for the given id is registered.
func (*StreamRegistry) Register ¶
func (registry *StreamRegistry) Register(stream Stream, streamID MessageStreamID)
Register registers a stream plugin to a given stream id
func (*StreamRegistry) RegisterWildcardProducer ¶
func (registry *StreamRegistry) RegisterWildcardProducer(producers ...Producer)
RegisterWildcardProducer adds a new producer to the list of known wildcard producers. This list has to be added to new streams upon creation to send messages to producers listening to *. Duplicates will be filtered. The state of this list is undefined during the configuration phase.