Documentation ¶
Overview ¶
Package core is a generated protocol buffer package.
It is generated from these files:
message.proto
It has these top-level messages:
SerializedMessageData SerializedMessage
Index ¶
- Constants
- Variables
- func ActivateMessageTrace()
- func CountConsumers()
- func CountFallbackRouters()
- func CountMessageDiscarded()
- func CountMessageRouted()
- func CountMessagesEnqueued()
- func CountProducers()
- func CountRouters()
- func DeactivateMessageTrace()
- func DiscardMessage(msg *Message, pluginID string, comment string)
- func GetVersionNumber() int64
- func GetVersionString() string
- func Route(msg *Message, router Router) error
- func RouteOriginal(msg *Message, router Router) error
- type AssemblyFunc
- type AsyncMessageSource
- type BatchedProducer
- type BufferedProducer
- func (prod *BufferedProducer) CloseMessageChannel(handleMessage func(*Message)) (empty bool)
- func (prod *BufferedProducer) Configure(conf PluginConfigReader)
- func (prod *BufferedProducer) DefaultClose()
- func (prod *BufferedProducer) DefaultDrain()
- func (prod *BufferedProducer) DrainMessageChannel(handleMessage func(*Message), timeout time.Duration) bool
- func (prod *BufferedProducer) Enqueue(msg *Message, timeout time.Duration)
- func (prod *BufferedProducer) GetQueueTimeout() time.Duration
- func (prod *BufferedProducer) MessageControlLoop(onMessage func(*Message))
- func (prod *BufferedProducer) TickerMessageControlLoop(onMessage func(*Message), interval time.Duration, onTimeOut func())
- type Config
- type Configurable
- type Consumer
- type DirectProducer
- func (prod *DirectProducer) Configure(conf PluginConfigReader)
- func (prod *DirectProducer) Enqueue(msg *Message, timeout time.Duration)
- func (prod *DirectProducer) MessageControlLoop(onMessage func(*Message))
- func (prod *DirectProducer) TickerMessageControlLoop(onMessage func(*Message), interval time.Duration, onTimeOut func())
- type Filter
- type FilterArray
- type FilterModulator
- type FilterResult
- type Formatter
- type FormatterArray
- type FormatterModulator
- type GetAppliedContent
- type LogConsumer
- func (cons *LogConsumer) Configure(conf PluginConfigReader)
- func (cons *LogConsumer) Consume(threads *sync.WaitGroup)
- func (cons *LogConsumer) Control() chan<- PluginControl
- func (cons *LogConsumer) Fire(logrusEntry *logrus.Entry) error
- func (cons *LogConsumer) GetID() string
- func (cons *LogConsumer) GetShutdownTimeout() time.Duration
- func (cons *LogConsumer) GetState() PluginState
- func (cons *LogConsumer) IsBlocked() bool
- func (cons *LogConsumer) Levels() []logrus.Level
- func (cons *LogConsumer) Streams() []MessageStreamID
- type Message
- func (msg *Message) Clone() *Message
- func (msg *Message) CloneOriginal() *Message
- func (msg *Message) ExtendPayload(size int) []byte
- func (msg *Message) FreezeOriginal()
- func (msg *Message) GetCreationTime() time.Time
- func (msg *Message) GetMetadata() Metadata
- func (msg *Message) GetOrigRouter() Router
- func (msg *Message) GetOrigStreamID() MessageStreamID
- func (msg *Message) GetPayload() []byte
- func (msg *Message) GetPrevRouter() Router
- func (msg *Message) GetPrevStreamID() MessageStreamID
- func (msg *Message) GetRouter() Router
- func (msg *Message) GetSource() MessageSource
- func (msg *Message) GetStreamID() MessageStreamID
- func (msg *Message) ResizePayload(size int) []byte
- func (msg *Message) Serialize() ([]byte, error)
- func (msg *Message) SetStreamID(streamID MessageStreamID)
- func (msg *Message) SetlStreamIDAsOriginal(streamID MessageStreamID)
- func (msg *Message) StorePayload(data []byte)
- func (msg *Message) String() string
- func (msg *Message) TryGetMetadata() Metadata
- type MessageBatch
- func (batch *MessageBatch) AfterFlushDo(callback func() error) error
- func (batch *MessageBatch) Append(msg *Message) bool
- func (batch *MessageBatch) AppendOrBlock(msg *Message) bool
- func (batch *MessageBatch) AppendOrFlush(msg *Message, flushBuffer func(), canBlock func() bool, ...)
- func (batch *MessageBatch) Close(assemble AssemblyFunc, timeout time.Duration)
- func (batch *MessageBatch) Flush(assemble AssemblyFunc)
- func (batch MessageBatch) IsClosed() bool
- func (batch MessageBatch) IsEmpty() bool
- func (batch *MessageBatch) Len() int
- func (batch MessageBatch) ReachedSizeThreshold(size int) bool
- func (batch MessageBatch) ReachedTimeThreshold(timeout time.Duration) bool
- func (batch *MessageBatch) Touch()
- func (batch *MessageBatch) WaitForFlush(timeout time.Duration)
- type MessageData
- type MessageQueue
- func (channel MessageQueue) Close()
- func (channel MessageQueue) GetNumQueued() int
- func (channel MessageQueue) IsEmpty() bool
- func (channel MessageQueue) Pop() (*Message, bool)
- func (channel MessageQueue) PopWithTimeout(maxDuration time.Duration) (*Message, bool)
- func (channel MessageQueue) Push(msg *Message, timeout time.Duration) (state MessageQueueResult)
- type MessageQueueResult
- type MessageSource
- type MessageStreamID
- type Metadata
- func (meta Metadata) Clone() (clone Metadata)
- func (meta Metadata) Delete(key string)
- func (meta Metadata) GetValue(key string) []byte
- func (meta Metadata) GetValueString(key string) string
- func (meta Metadata) SetValue(key string, value []byte)
- func (meta Metadata) TryGetValue(key string) ([]byte, bool)
- func (meta Metadata) TryGetValueString(key string) (string, bool)
- func (meta Metadata) TrySetValue(key string, value []byte) bool
- type ModulateResult
- type ModulateResultError
- type Modulator
- type ModulatorArray
- type Plugin
- type PluginConfig
- type PluginConfigReader
- func (reader *PluginConfigReader) Configure(item interface{}) error
- func (reader *PluginConfigReader) GetArray(key string, defaultValue []interface{}) []interface{}
- func (reader *PluginConfigReader) GetBool(key string, defaultValue bool) bool
- func (reader *PluginConfigReader) GetFilterArray(key string, logger logrus.FieldLogger, defaultValue FilterArray) FilterArray
- func (reader *PluginConfigReader) GetFormatterArray(key string, logger logrus.FieldLogger, defaultValue FormatterArray) FormatterArray
- func (reader *PluginConfigReader) GetID() string
- func (reader *PluginConfigReader) GetInt(key string, defaultValue int64) int64
- func (reader *PluginConfigReader) GetLogger() logrus.FieldLogger
- func (reader *PluginConfigReader) GetMap(key string, defaultValue tcontainer.MarshalMap) tcontainer.MarshalMap
- func (reader *PluginConfigReader) GetModulatorArray(key string, logger logrus.FieldLogger, defaultValue ModulatorArray) ModulatorArray
- func (reader *PluginConfigReader) GetPlugin(key string, defaultType string, defaultConfig tcontainer.MarshalMap) Plugin
- func (reader *PluginConfigReader) GetPluginArray(key string, defaultValue []Plugin) []Plugin
- func (reader *PluginConfigReader) GetStreamArray(key string, defaultValue []MessageStreamID) []MessageStreamID
- func (reader *PluginConfigReader) GetStreamID(key string, defaultValue MessageStreamID) MessageStreamID
- func (reader *PluginConfigReader) GetStreamMap(key string, defaultValue string) map[MessageStreamID]string
- func (reader *PluginConfigReader) GetStreamRoutes(key string, defaultValue map[MessageStreamID][]MessageStreamID) map[MessageStreamID][]MessageStreamID
- func (reader *PluginConfigReader) GetString(key string, defaultValue string) string
- func (reader *PluginConfigReader) GetStringArray(key string, defaultValue []string) []string
- func (reader *PluginConfigReader) GetStringMap(key string, defaultValue map[string]string) map[string]string
- func (reader *PluginConfigReader) GetSubLogger(subScope string) logrus.FieldLogger
- func (reader *PluginConfigReader) GetTypename() string
- func (reader *PluginConfigReader) GetURL(key string, defaultValue string) *url.URL
- func (reader *PluginConfigReader) GetUint(key string, defaultValue uint64) uint64
- func (reader *PluginConfigReader) GetValue(key string, defaultValue interface{}) interface{}
- func (reader *PluginConfigReader) HasValue(key string) bool
- type PluginConfigReaderWithError
- func (reader PluginConfigReaderWithError) GetArray(key string, defaultValue []interface{}) ([]interface{}, error)
- func (reader PluginConfigReaderWithError) GetBool(key string, defaultValue bool) (bool, error)
- func (reader PluginConfigReaderWithError) GetFilterArray(key string, logger logrus.FieldLogger, defaultValue FilterArray) (FilterArray, error)
- func (reader PluginConfigReaderWithError) GetFloat(key string, defaultValue float64) (float64, error)
- func (reader PluginConfigReaderWithError) GetFormatterArray(key string, logger logrus.FieldLogger, defaultValue FormatterArray) (FormatterArray, error)
- func (reader PluginConfigReaderWithError) GetID() string
- func (reader PluginConfigReaderWithError) GetInt(key string, defaultValue int64) (int64, error)
- func (reader PluginConfigReaderWithError) GetLogger() logrus.FieldLogger
- func (reader PluginConfigReaderWithError) GetMap(key string, defaultValue tcontainer.MarshalMap) (tcontainer.MarshalMap, error)
- func (reader PluginConfigReaderWithError) GetModulatorArray(key string, logger logrus.FieldLogger, defaultValue ModulatorArray) (ModulatorArray, error)
- func (reader PluginConfigReaderWithError) GetPlugin(key string, defaultType string, defaultConfig tcontainer.MarshalMap) (Plugin, error)
- func (reader PluginConfigReaderWithError) GetPluginArray(key string, defaultValue []Plugin) ([]Plugin, error)
- func (reader PluginConfigReaderWithError) GetStreamArray(key string, defaultValue []MessageStreamID) ([]MessageStreamID, error)
- func (reader PluginConfigReaderWithError) GetStreamID(key string, defaultValue MessageStreamID) (MessageStreamID, error)
- func (reader PluginConfigReaderWithError) GetStreamMap(key string, defaultValue string) (map[MessageStreamID]string, error)
- func (reader PluginConfigReaderWithError) GetStreamRoutes(key string, defaultValue map[MessageStreamID][]MessageStreamID) (map[MessageStreamID][]MessageStreamID, error)
- func (reader PluginConfigReaderWithError) GetString(key string, defaultValue string) (string, error)
- func (reader PluginConfigReaderWithError) GetStringArray(key string, defaultValue []string) ([]string, error)
- func (reader PluginConfigReaderWithError) GetStringMap(key string, defaultValue map[string]string) (map[string]string, error)
- func (reader PluginConfigReaderWithError) GetSubLogger(subScope string) logrus.FieldLogger
- func (reader PluginConfigReaderWithError) GetTypename() string
- func (reader PluginConfigReaderWithError) GetURL(key string, defaultValue string) (*url.URL, error)
- func (reader PluginConfigReaderWithError) GetUint(key string, defaultValue uint64) (uint64, error)
- func (reader PluginConfigReaderWithError) GetValue(key string, defaultValue interface{}) interface{}
- func (reader PluginConfigReaderWithError) HasValue(key string) bool
- type PluginControl
- type PluginMetric
- type PluginRunState
- func (state *PluginRunState) AddWorker()
- func (state *PluginRunState) GetState() PluginState
- func (state *PluginRunState) GetStateString() string
- func (state *PluginRunState) SetState(nextState PluginState)
- func (state *PluginRunState) SetWorkerWaitGroup(workers *sync.WaitGroup)
- func (state *PluginRunState) WorkerDone()
- type PluginState
- type PluginStructTag
- func (tag PluginStructTag) GetBool() bool
- func (tag PluginStructTag) GetByteArray() []byte
- func (tag PluginStructTag) GetInt() int64
- func (tag PluginStructTag) GetMetricScale() int64
- func (tag PluginStructTag) GetStream() MessageStreamID
- func (tag PluginStructTag) GetStreamArray() []MessageStreamID
- func (tag PluginStructTag) GetString() string
- func (tag PluginStructTag) GetStringArray() []string
- func (tag PluginStructTag) GetUint() uint64
- type PluginWithState
- type Producer
- type Router
- type ScopedLogger
- type ScopedModulator
- type SerialMessageSource
- type SerializedMessage
- func (*SerializedMessage) Descriptor() ([]byte, []int)
- func (m *SerializedMessage) GetData() *SerializedMessageData
- func (m *SerializedMessage) GetOrigStreamID() uint64
- func (m *SerializedMessage) GetOriginal() *SerializedMessageData
- func (m *SerializedMessage) GetPrevStreamID() uint64
- func (m *SerializedMessage) GetStreamID() uint64
- func (m *SerializedMessage) GetTimestamp() int64
- func (*SerializedMessage) ProtoMessage()
- func (m *SerializedMessage) Reset()
- func (m *SerializedMessage) String() string
- type SerializedMessageData
- func (*SerializedMessageData) Descriptor() ([]byte, []int)
- func (m *SerializedMessageData) GetData() []byte
- func (m *SerializedMessageData) GetMetadata() map[string][]byte
- func (*SerializedMessageData) ProtoMessage()
- func (m *SerializedMessageData) Reset()
- func (m *SerializedMessageData) String() string
- type SetAppliedContent
- type SimpleConsumer
- func (cons *SimpleConsumer) AddHealthCheckAt(path string, callback thealthcheck.CallbackFunc)
- func (cons *SimpleConsumer) AddMainWorker(workers *sync.WaitGroup)
- func (cons *SimpleConsumer) AddWorker()
- func (cons *SimpleConsumer) Configure(conf PluginConfigReader)
- func (cons *SimpleConsumer) Control() chan<- PluginControl
- func (cons *SimpleConsumer) ControlLoop()
- func (cons *SimpleConsumer) Enqueue(data []byte)
- func (cons *SimpleConsumer) EnqueueWithMetadata(data []byte, metaData Metadata)
- func (cons *SimpleConsumer) GetID() string
- func (cons *SimpleConsumer) GetLogger() logrus.FieldLogger
- func (cons *SimpleConsumer) GetShutdownTimeout() time.Duration
- func (cons *SimpleConsumer) GetState() PluginState
- func (cons *SimpleConsumer) IsActive() bool
- func (cons *SimpleConsumer) IsActiveOrStopping() bool
- func (cons *SimpleConsumer) IsBlocked() bool
- func (cons *SimpleConsumer) IsStopping() bool
- func (cons *SimpleConsumer) SetPrepareStopCallback(onPrepareStop func())
- func (cons *SimpleConsumer) SetRollCallback(onRoll func())
- func (cons *SimpleConsumer) SetStopCallback(onStop func())
- func (cons *SimpleConsumer) SetWorkerWaitGroup(workers *sync.WaitGroup)
- func (cons *SimpleConsumer) TickerControlLoop(interval time.Duration, onTick func())
- func (cons *SimpleConsumer) WorkerDone()
- type SimpleFilter
- type SimpleFormatter
- type SimpleProducer
- func (prod *SimpleProducer) AddHealthCheck(callback thealthcheck.CallbackFunc)
- func (prod *SimpleProducer) AddHealthCheckAt(path string, callback thealthcheck.CallbackFunc)
- func (prod *SimpleProducer) AddMainWorker(workers *sync.WaitGroup)
- func (prod *SimpleProducer) AddWorker()
- func (prod *SimpleProducer) Configure(conf PluginConfigReader)
- func (prod *SimpleProducer) Control() chan<- PluginControl
- func (prod *SimpleProducer) ControlLoop()
- func (prod *SimpleProducer) GetID() string
- func (prod *SimpleProducer) GetLogger() logrus.FieldLogger
- func (prod *SimpleProducer) GetShutdownTimeout() time.Duration
- func (prod *SimpleProducer) GetState() PluginState
- func (prod *SimpleProducer) HasContinueAfterModulate(msg *Message) bool
- func (prod *SimpleProducer) IsActive() bool
- func (prod *SimpleProducer) IsActiveOrStopping() bool
- func (prod *SimpleProducer) IsBlocked() bool
- func (prod *SimpleProducer) IsStopping() bool
- func (prod *SimpleProducer) Modulate(msg *Message) ModulateResult
- func (prod *SimpleProducer) SetPrepareStopCallback(onPrepareStop func())
- func (prod *SimpleProducer) SetRollCallback(onRoll func())
- func (prod *SimpleProducer) SetStopCallback(onStop func())
- func (prod *SimpleProducer) SetWorkerWaitGroup(workers *sync.WaitGroup)
- func (prod *SimpleProducer) Streams() []MessageStreamID
- func (prod *SimpleProducer) TickerControlLoop(interval time.Duration, onTimeOut func())
- func (prod *SimpleProducer) TryFallback(msg *Message)
- func (prod *SimpleProducer) WorkerDone()
- type SimpleRouter
- func (router *SimpleRouter) AddHealthCheck(callback thealthcheck.CallbackFunc)
- func (router *SimpleRouter) AddHealthCheckAt(path string, callback thealthcheck.CallbackFunc)
- func (router *SimpleRouter) AddProducer(producers ...Producer)
- func (router *SimpleRouter) Configure(conf PluginConfigReader)
- func (router *SimpleRouter) GetID() string
- func (router *SimpleRouter) GetLogger() logrus.FieldLogger
- func (router *SimpleRouter) GetProducers() []Producer
- func (router *SimpleRouter) GetStreamID() MessageStreamID
- func (router *SimpleRouter) GetTimeout() time.Duration
- func (router *SimpleRouter) Modulate(msg *Message) ModulateResult
- type StreamMetric
- type WriterAssembly
- func (asm *WriterAssembly) Flush(messages []*Message)
- func (asm *WriterAssembly) SetErrorHandler(handleError func(error) bool)
- func (asm *WriterAssembly) SetFlush(flush func(*Message))
- func (asm *WriterAssembly) SetValidator(validate func() bool)
- func (asm *WriterAssembly) SetWriter(writer io.Writer)
- func (asm *WriterAssembly) Write(messages []*Message)
Constants ¶
const ( // MessageQueueOk is returned if the message could be delivered MessageQueueOk = MessageQueueResult(iota) // MessageQueueTimeout is returned if a message timed out MessageQueueTimeout = MessageQueueResult(iota) // MessageQueueDiscard is returned if a message should be discarded MessageQueueDiscard = MessageQueueResult(iota) )
const ( MetricActiveWorkers = "Plugins:ActiveWorkers" MetricPluginsInit = "Plugins:State:Initializing" MetricPluginsActive = "Plugins:State:Active" MetricPluginsWaiting = "Plugins:State:Waiting" MetricPluginsPrepareStop = "Plugins:State:PrepareStop" MetricPluginsStopping = "Plugins:State:Stopping" MetricPluginsDead = "Plugins:State:Dead" )
MetricActiveWorkers metric string MetricPluginsInit metric string MetricPluginsActive metric string MetricPluginsWaiting metric string MetricPluginsPrepareStop metric string MetricPluginsStopping metric string MetricPluginsDead metric string
const ( // ModulateResultContinue indicates that a message can be passed along. ModulateResultContinue = ModulateResult(iota) // ModulateResultFallback indicates that a fallback path should be used and // that no further modulator should be called. ModulateResultFallback = ModulateResult(iota) // ModulateResultDiscard indicates that a message should be discarded and // that no further modulators should be called. ModulateResultDiscard = ModulateResult(iota) )
const ( // PluginControlStopProducer will cause any producer to halt and shutdown. PluginControlStopProducer = PluginControl(iota) // PluginControlStopConsumer will cause any consumer to halt and shutdown. PluginControlStopConsumer = PluginControl(iota) // PluginControlRoll notifies the consumer/producer about a reconnect or reopen request PluginControlRoll = PluginControl(iota) )
const ( // PluginStateInitializing is set when a plugin is not yet configured PluginStateInitializing = PluginState(iota) // PluginStateWaiting is set when a plugin is active but currently unable to process data PluginStateWaiting = PluginState(iota) // PluginStateActive is set when a plugin is ready to process data PluginStateActive = PluginState(iota) // PluginStatePrepareStop is set when a plugin should prepare for shutdown PluginStatePrepareStop = PluginState(iota) // PluginStateStopping is set when a plugin is about to stop PluginStateStopping = PluginState(iota) // PluginStateDead is set when a plugin is unable to process any data PluginStateDead = PluginState(iota) )
const ( // PluginStructTagDefault contains the tag name for default values PluginStructTagDefault = "default" // PluginStructTagMetric contains the tag name for metric values PluginStructTagMetric = "metric" )
const ( // InvalidStream is used for invalid stream handling and maps to "" InvalidStream = "" // LogInternalStream is the name of the internal message channel (logs) LogInternalStream = "_GOLLUM_" // TraceInternalStream is the name of the internal trace channel (-tm flag) TraceInternalStream = "_TRACE_" // WildcardStream is the name of the "all routers" channel WildcardStream = "*" )
const ( // FilterResultMessageAccept indicates that a message can be passed along and continue FilterResultMessageAccept = FilterResult(0) )
const (
MetricMessagesRoutedAvg = "Messages:Routed:AvgPerSec"
)
MetricMessagesRoutedAvg is used as a key for storing message throughput
Variables ¶
var ( // InvalidStreamID denotes an invalid stream for functions returning stream IDs InvalidStreamID = GetStreamID(InvalidStream) // LogInternalStreamID is the ID of the "_GOLLUM_" stream LogInternalStreamID = GetStreamID(LogInternalStream) // WildcardStreamID is the ID of the "*" stream WildcardStreamID = GetStreamID(WildcardStream) // TraceInternalStreamID is the ID of the "_TRACE_" stream TraceInternalStreamID = GetStreamID(TraceInternalStream) )
var (
// GeneratedRouterPrefix is prepended to all generated routers
GeneratedRouterPrefix = "_GENERATED_"
)
MessageTrace provides the MessageTrace() function. By default this function does nothing.
var PluginRegistry = pluginRegistry{ // contains filtered or unexported fields }
PluginRegistry holds all plugins by their name
var StreamRegistry = streamRegistry{ // contains filtered or unexported fields }
StreamRegistry is the global instance of streamRegistry used to store all registered routers.
var TypeRegistry = treflect.NewTypeRegistry()
TypeRegistry is the global typeRegistry instance. Use this instance to register plugins.
Functions ¶
func ActivateMessageTrace ¶ added in v0.5.0
func ActivateMessageTrace()
ActivateMessageTrace sets a MessageTrace function that dumps out the message trace.
func CountConsumers ¶ added in v0.5.0
func CountConsumers()
CountConsumers increases the consumer counter by 1
func CountFallbackRouters ¶ added in v0.5.0
func CountFallbackRouters()
CountFallbackRouters increases the fallback stream counter by 1
func CountMessageDiscarded ¶ added in v0.5.0
func CountMessageDiscarded()
CountMessageDiscarded increases the discarded messages counter by 1
func CountMessageRouted ¶ added in v0.5.0
func CountMessageRouted()
CountMessageRouted increases the messages counter by 1
func CountMessagesEnqueued ¶ added in v0.5.0
func CountMessagesEnqueued()
CountMessagesEnqueued increases the enqueued messages counter by 1
func CountProducers ¶ added in v0.5.0
func CountProducers()
CountProducers increases the producer counter by 1
func CountRouters ¶ added in v0.5.0
func CountRouters()
CountRouters increases the stream counter by 1
func DeactivateMessageTrace ¶ added in v0.5.0
func DeactivateMessageTrace()
DeactivateMessageTrace resets the MessageTrace function to its default. This function is necessary for unit testing.
func DiscardMessage ¶ added in v0.5.0
DiscardMessage increases the discard statistic and discards the given message.
func GetVersionNumber ¶ added in v0.5.0
func GetVersionNumber() int64
GetVersionNumber returns a version number based on semantic versioning.
func GetVersionString ¶ added in v0.5.0
func GetVersionString() string
GetVersionString returns a semantic version string.
func Route ¶ added in v0.5.0
Route tries to enqueue a message to the given stream. This function also handles redirections enforced by formatters.
Types ¶
type AssemblyFunc ¶ added in v0.4.0
type AssemblyFunc func([]*Message)
AssemblyFunc is the function signature for callbacks passed to the Flush method.
type AsyncMessageSource ¶
type AsyncMessageSource interface { MessageSource // EnqueueResponse sends a message to the source of another message. EnqueueResponse(msg *Message) }
AsyncMessageSource extends the MessageSource interface to allow a backchannel that simply forwards any message coming from the producer.
type BatchedProducer ¶ added in v0.5.0
type BatchedProducer struct { DirectProducer `gollumdoc:"embed_type"` Batch MessageBatch // contains filtered or unexported fields }
BatchedProducer producer
This type defines a common base type that can be used by producers that work better when sending batches of messages instead of single ones. Batched producers will collect messages and flush them after certain limits have been reached. The flush process will be done in the background. Message collection continues non-blocking unless flushing takes longer than filling up the internal buffer again.
Parameters ¶
- Batch/MaxCount: Defines the maximum number of messages per batch. If this limit is reached a flush is always triggered. By default this parameter is set to 8192.
- Batch/FlushCount: Defines the minimum number of messages required to flush a batch. If this limit is reached a flush might be triggered. By default this parameter is set to 4096.
- Batch/TimeoutSec: Defines the maximum time in seconds messages can stay in the internal buffer before being flushed. By default this parameter is set to 5.
func (*BatchedProducer) BatchMessageLoop ¶ added in v0.5.0
func (prod *BatchedProducer) BatchMessageLoop(workers *sync.WaitGroup, onBatchFlush func() AssemblyFunc)
BatchMessageLoop starts the TickerMessageControlLoop() for a batched producer.
func (*BatchedProducer) Configure ¶ added in v0.5.0
func (prod *BatchedProducer) Configure(conf PluginConfigReader)
Configure initializes the standard producer config values.
func (*BatchedProducer) DefaultClose ¶ added in v0.5.0
func (prod *BatchedProducer) DefaultClose()
DefaultClose defines the default closing process
func (*BatchedProducer) Enqueue ¶ added in v0.5.0
func (prod *BatchedProducer) Enqueue(msg *Message, timeout time.Duration)
Enqueue will add the message to the internal channel so it can be processed by the producer main loop. A timeout value != nil will overwrite the channel timeout value for this call.
type BufferedProducer ¶ added in v0.5.0
type BufferedProducer struct { DirectProducer `gollumdoc:"embed_type"` // contains filtered or unexported fields }
BufferedProducer plugin base type
This type defines a common BufferedProducer base class. Producers may derive from this class.
Parameters ¶
- Channel: This value defines the capacity of the message buffer. By default this parameter is set to "8192".
- ChannelTimeoutMs: This value defines a timeout for each message before the message will be discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to "0".
func (*BufferedProducer) CloseMessageChannel ¶ added in v0.5.0
func (prod *BufferedProducer) CloseMessageChannel(handleMessage func(*Message)) (empty bool)
CloseMessageChannel first calls DrainMessageChannel with the shutdown timeout, closes the channel afterwards and calls DrainMessageChannel again to make sure all messages are actually gone. The return value indicates whether the channel is empty or not.
func (*BufferedProducer) Configure ¶ added in v0.5.0
func (prod *BufferedProducer) Configure(conf PluginConfigReader)
Configure initializes the standard producer config values.
func (*BufferedProducer) DefaultClose ¶ added in v0.5.0
func (prod *BufferedProducer) DefaultClose()
DefaultClose is the function registered to onStop by default. It calls CloseMessageChannel with the message handling function passed to any of the control functions. If no such call happens, this function does nothing.
func (*BufferedProducer) DefaultDrain ¶ added in v0.5.0
func (prod *BufferedProducer) DefaultDrain()
DefaultDrain is the function registered to onPrepareStop by default. It calls DrainMessageChannel with the message handling function passed to any of the control functions. If no such call happens, this function does nothing.
func (*BufferedProducer) DrainMessageChannel ¶ added in v0.5.0
func (prod *BufferedProducer) DrainMessageChannel(handleMessage func(*Message), timeout time.Duration) bool
DrainMessageChannel empties the message channel. This function returns after the queue has been empty for a given amount of time or when the queue has been closed and no more messages are available. The return value indicates whether the channel is empty or not.
func (*BufferedProducer) Enqueue ¶ added in v0.5.0
func (prod *BufferedProducer) Enqueue(msg *Message, timeout time.Duration)
Enqueue will add the message to the internal channel so it can be processed by the producer main loop. A timeout value != nil will overwrite the channel timeout value for this call.
func (*BufferedProducer) GetQueueTimeout ¶ added in v0.5.0
func (prod *BufferedProducer) GetQueueTimeout() time.Duration
GetQueueTimeout returns the duration this producer will block before a message is sent to the fallback. A value of -1 will cause the message to drop. A value of 0 will cause the producer to always block.
func (*BufferedProducer) MessageControlLoop ¶ added in v0.5.0
func (prod *BufferedProducer) MessageControlLoop(onMessage func(*Message))
MessageControlLoop provides a producer main loop that is sufficient for most use cases. ControlLoop will be called in a separate goroutine. This function will block until a stop signal is received.
func (*BufferedProducer) TickerMessageControlLoop ¶ added in v0.5.0
func (prod *BufferedProducer) TickerMessageControlLoop(onMessage func(*Message), interval time.Duration, onTimeOut func())
TickerMessageControlLoop is like MessageControlLoop but also executes the given onTimeOut function at every interval tick. If onTimeOut takes longer than interval, the next tick will be delayed until onTimeOut finishes.
type Config ¶
type Config struct { Values map[string]tcontainer.MarshalMap Plugins []PluginConfig }
Config represents the top level config containing all plugin configs.
func ReadConfig ¶
ReadConfig creates a config from a yaml byte stream.
func ReadConfigFromFile ¶ added in v0.5.0
ReadConfigFromFile parses a YAML config file into a new Config struct.
func (*Config) GetConsumers ¶ added in v0.5.0
func (conf *Config) GetConsumers() []PluginConfig
GetConsumers returns all consumer plugins from the config
func (*Config) GetProducers ¶ added in v0.5.0
func (conf *Config) GetProducers() []PluginConfig
GetProducers returns all producer plugins from the config
func (*Config) GetRouters ¶ added in v0.5.0
func (conf *Config) GetRouters() []PluginConfig
GetRouters returns all stream plugins from the config
type Configurable ¶ added in v0.5.0
type Configurable interface { // Configure is called during NewPluginWithType Configure(PluginConfigReader) }
Configurable defines an interface for structs that can be configured using a PluginConfigReader.
type Consumer ¶
type Consumer interface { PluginWithState MessageSource // Consume should implement the main loop that fetches messages from a given // source and pushes them to the Message channel. Consume(*sync.WaitGroup) // Control returns write access to this consumer's control channel. // See PluginControl* constants. Control() chan<- PluginControl // GetShutdownTimeout returns the duration gollum will wait for this consumer // before canceling the shutdown process. GetShutdownTimeout() time.Duration }
Consumer is an interface for plugins that receive data from outside sources and generate Message objects from this data.
type DirectProducer ¶ added in v0.5.0
type DirectProducer struct { SimpleProducer `gollumdoc:"embed_type"` // contains filtered or unexported fields }
DirectProducer plugin base type
This type defines a common base class for producers.
func (*DirectProducer) Configure ¶ added in v0.5.0
func (prod *DirectProducer) Configure(conf PluginConfigReader)
Configure initializes the standard producer config values.
func (*DirectProducer) Enqueue ¶ added in v0.5.0
func (prod *DirectProducer) Enqueue(msg *Message, timeout time.Duration)
Enqueue will add the message to the internal channel so it can be processed by the producer main loop. A timeout value != nil will overwrite the channel timeout value for this call.
func (*DirectProducer) MessageControlLoop ¶ added in v0.5.0
func (prod *DirectProducer) MessageControlLoop(onMessage func(*Message))
MessageControlLoop provides a producer main loop that is sufficient for most use cases. ControlLoop will be called in a separate goroutine. This function will block until a stop signal is received.
func (*DirectProducer) TickerMessageControlLoop ¶ added in v0.5.0
func (prod *DirectProducer) TickerMessageControlLoop(onMessage func(*Message), interval time.Duration, onTimeOut func())
TickerMessageControlLoop is like MessageControlLoop but also executes a given function at every interval tick. If the onTimeOut function takes longer than interval, the next tick will be delayed until onTimeOut finishes.
type Filter ¶
type Filter interface {
ApplyFilter(msg *Message) (FilterResult, error)
}
A Filter defines an analysis step inside the message. A Filter also has to implement the Modulator interface.
type FilterArray ¶ added in v0.5.0
type FilterArray []Filter
FilterArray is a type wrapper for []Filter that represents an array of filters.
func (FilterArray) ApplyFilter ¶ added in v0.5.0
func (filters FilterArray) ApplyFilter(msg *Message) (FilterResult, error)
ApplyFilter calls ApplyFilter on every filter. It returns FilterResultMessageReject in case of an error or if one filter rejects.
type FilterModulator ¶ added in v0.5.0
type FilterModulator struct {
Filter Filter
}
FilterModulator is a wrapper to provide a Filter as a Modulator
func NewFilterModulator ¶ added in v0.5.0
func NewFilterModulator(filter Filter) *FilterModulator
NewFilterModulator returns an instance of FilterModulator.
func (*FilterModulator) ApplyFilter ¶ added in v0.5.0
func (filterModulator *FilterModulator) ApplyFilter(msg *Message) (FilterResult, error)
ApplyFilter calls the Filter.ApplyFilter method
func (*FilterModulator) Modulate ¶ added in v0.5.0
func (filterModulator *FilterModulator) Modulate(msg *Message) ModulateResult
Modulate implementation for Filters
type FilterResult ¶ added in v0.5.0
type FilterResult uint64
FilterResult defines a set of results for filtering
func FilterResultMessageReject ¶ added in v0.5.0
func FilterResultMessageReject(stream MessageStreamID) FilterResult
FilterResultMessageReject indicates that a message is filtered and not passed along the stream.
func (FilterResult) GetStreamID ¶ added in v0.5.0
func (f FilterResult) GetStreamID() MessageStreamID
GetStreamID converts a FilterResult to a stream id
type Formatter ¶
A Formatter defines a modification step inside the message. A Formatter also has to implement the Modulator interface.
type FormatterArray ¶ added in v0.5.0
type FormatterArray []Formatter
FormatterArray is a type wrapper for []Formatter that represents an array of formatters.
func (FormatterArray) ApplyFormatter ¶ added in v0.5.0
func (formatters FormatterArray) ApplyFormatter(msg *Message) error
ApplyFormatter calls ApplyFormatter on every formatter
func (FormatterArray) CanBeApplied ¶ added in v0.5.0
func (formatters FormatterArray) CanBeApplied(msg *Message) bool
CanBeApplied returns true if the array is not empty
type FormatterModulator ¶ added in v0.5.0
type FormatterModulator struct {
Formatter Formatter
}
FormatterModulator is a wrapper to provide a Formatter as a Modulator
func NewFormatterModulator ¶ added in v0.5.0
func NewFormatterModulator(formatter Formatter) *FormatterModulator
NewFormatterModulator returns an instance of FormatterModulator.
func (*FormatterModulator) ApplyFormatter ¶ added in v0.5.0
func (formatterModulator *FormatterModulator) ApplyFormatter(msg *Message) error
ApplyFormatter calls the Formatter.ApplyFormatter method
func (*FormatterModulator) CanBeApplied ¶ added in v0.5.0
func (formatterModulator *FormatterModulator) CanBeApplied(msg *Message) bool
CanBeApplied returns true if the array is not empty
func (*FormatterModulator) Modulate ¶ added in v0.5.0
func (formatterModulator *FormatterModulator) Modulate(msg *Message) ModulateResult
Modulate implementation for Formatter
type GetAppliedContent ¶ added in v0.5.0
GetAppliedContent is a func() to get message content from payload or meta data for later handling by plugins
func GetAppliedContentGetFunction ¶ added in v0.5.0
func GetAppliedContentGetFunction(applyTo string) GetAppliedContent
GetAppliedContentGetFunction returns a GetAppliedContent function
type LogConsumer ¶
type LogConsumer struct { Consumer // contains filtered or unexported fields }
LogConsumer is an internal consumer plugin used indirectly by the gollum log package.
func (*LogConsumer) Configure ¶
func (cons *LogConsumer) Configure(conf PluginConfigReader)
Configure initializes this consumer with values from a plugin config.
func (*LogConsumer) Consume ¶
func (cons *LogConsumer) Consume(threads *sync.WaitGroup)
Consume starts listening for control statements
func (*LogConsumer) Control ¶
func (cons *LogConsumer) Control() chan<- PluginControl
Control returns a handle to the control channel
func (*LogConsumer) Fire ¶ added in v0.5.0
func (cons *LogConsumer) Fire(logrusEntry *logrus.Entry) error
Fire and Levels() implement the logrus.Hook interface
func (*LogConsumer) GetID ¶ added in v0.5.0
func (cons *LogConsumer) GetID() string
GetID returns the pluginID of the message source
func (*LogConsumer) GetShutdownTimeout ¶ added in v0.5.0
func (cons *LogConsumer) GetShutdownTimeout() time.Duration
GetShutdownTimeout always returns 1 millisecond
func (*LogConsumer) GetState ¶ added in v0.4.0
func (cons *LogConsumer) GetState() PluginState
GetState always returns PluginStateActive
func (*LogConsumer) IsBlocked ¶ added in v0.5.0
func (cons *LogConsumer) IsBlocked() bool
IsBlocked always returns false
func (*LogConsumer) Levels ¶ added in v0.5.0
func (cons *LogConsumer) Levels() []logrus.Level
Levels and Fire() implement the logrus.Hook interface
func (*LogConsumer) Streams ¶
func (cons *LogConsumer) Streams() []MessageStreamID
Streams always returns an array with one member - the internal log stream
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
Message is a container used for storing the internal state of messages. This struct is passed between consumers and producers.
func DeserializeMessage ¶ added in v0.4.0
DeserializeMessage generates a message from a byte array produced by Message.Serialize. Please note that the payload is restored but the original data is not. Because of this, FreezeOriginal can be called again after this call.
func NewMessage ¶
func NewMessage(source MessageSource, data []byte, metadata Metadata, streamID MessageStreamID) *Message
NewMessage creates a new message from a given data stream by copying data.
func (*Message) Clone ¶ added in v0.5.0
Clone returns a copy of this message, i.e. the payload is duplicated. The created timestamp is copied, too.
func (*Message) CloneOriginal ¶ added in v0.5.0
CloneOriginal returns a copy of this message with the original payload and stream. If FreezeOriginal has not been called before, it will be called at this point so that all subsequent calls will use the same original.
func (*Message) ExtendPayload ¶ added in v0.5.0
ExtendPayload changes the size of the stored buffer. The current content will be preserved. If content does not need to be preserved, use ResizePayload.
func (*Message) FreezeOriginal ¶ added in v0.5.0
func (msg *Message) FreezeOriginal()
FreezeOriginal will take the current state of the message and store it as the "original" message. This function can only be called once for each message. Please note that this function only affects payload and metadata and can only be called once. Additional calls will have no effect. The original stream ID can be changed at any time by using SetOrigStreamID.
func (*Message) GetCreationTime ¶ added in v0.5.0
GetCreationTime returns the time when this message was created.
func (*Message) GetMetadata ¶ added in v0.5.0
GetMetadata returns the current Metadata. If no metadata is present, the metadata map will be created by this call.
func (*Message) GetOrigRouter ¶ added in v0.5.0
GetOrigRouter returns the stream object behind the original StreamID.
func (*Message) GetOrigStreamID ¶ added in v0.5.0
func (msg *Message) GetOrigStreamID() MessageStreamID
GetOrigStreamID returns the original/first streamID
func (*Message) GetPayload ¶ added in v0.5.0
GetPayload returns the stored data
func (*Message) GetPrevRouter ¶ added in v0.5.0
GetPrevRouter returns the stream object behind the previous StreamID.
func (*Message) GetPrevStreamID ¶ added in v0.5.0
func (msg *Message) GetPrevStreamID() MessageStreamID
GetPrevStreamID returns the last "hop" of this message.
func (*Message) GetRouter ¶ added in v0.5.0
GetRouter returns the stream object behind the current StreamID.
func (*Message) GetSource ¶ added in v0.5.0
func (msg *Message) GetSource() MessageSource
GetSource returns the message's source (can be nil).
func (*Message) GetStreamID ¶ added in v0.5.0
func (msg *Message) GetStreamID() MessageStreamID
GetStreamID returns the stream this message is currently routed to.
func (*Message) ResizePayload ¶ added in v0.5.0
ResizePayload changes the size of the stored buffer. The current content is not guaranteed to be preserved. If content needs to be preserved, use ExtendPayload.
func (*Message) Serialize ¶ added in v0.4.0
Serialize generates a new payload containing all data that can be preserved over shutdown (i.e. no data directly referencing runtime components). The serialized data is based on the current message state and does not preserve the original data created by FreezeOriginal.
func (*Message) SetStreamID ¶ added in v0.5.0
func (msg *Message) SetStreamID(streamID MessageStreamID)
SetStreamID sets a new stream and stores the current one in the previous stream field. This method does not affect the original stream ID.
func (*Message) SetlStreamIDAsOriginal ¶ added in v0.5.0
func (msg *Message) SetlStreamIDAsOriginal(streamID MessageStreamID)
SetlStreamIDAsOriginal acts like SetStreamID but always sets the original stream ID, too. This method should be used before a message is routed for the first time (e.g. in a consumer) or when the original stream should change.
func (*Message) StorePayload ¶ added in v0.5.0
StorePayload copies data into the hold data buffer. If the buffer can hold data it is resized, otherwise a new buffer will be allocated.
func (*Message) String ¶
String implements the stringer interface
type MessageBatch ¶
type MessageBatch struct {
// contains filtered or unexported fields
}
MessageBatch is a helper class for producers to format and store messages into a single buffer that is flushed to an io.Writer. You can use the Reached* functions to determine whether a flush should be called, i.e. if a timeout or size threshold has been reached.
func NewMessageBatch ¶
func NewMessageBatch(maxMessageCount int) MessageBatch
NewMessageBatch creates a new MessageBatch that can hold up to maxMessageCount messages.
func (*MessageBatch) AfterFlushDo ¶ added in v0.4.2
func (batch *MessageBatch) AfterFlushDo(callback func() error) error
AfterFlushDo calls a function after a currently running flush is done. It also blocks any flush during the execution of callback. Returns the error returned by callback
func (*MessageBatch) Append ¶
func (batch *MessageBatch) Append(msg *Message) bool
Append formats a message and appends it to the internal buffer. If the message does not fit into the buffer this function returns false. If the message can never fit into the buffer (too large), true is returned and an error is logged.
func (*MessageBatch) AppendOrBlock ¶ added in v0.4.0
func (batch *MessageBatch) AppendOrBlock(msg *Message) bool
AppendOrBlock works like Append but will block until Append returns true. If the batch was closed during this call, false is returned.
func (*MessageBatch) AppendOrFlush ¶ added in v0.4.0
func (batch *MessageBatch) AppendOrFlush(msg *Message, flushBuffer func(), canBlock func() bool, tryFallback func(*Message))
AppendOrFlush is a common combinatorial pattern of Append and AppendOrBlock. If append fails, flush is called, followed by AppendOrBlock if blocking is allowed. If AppendOrBlock fails (or blocking is not allowed) the message is sent to the fallback.
func (*MessageBatch) Close ¶ added in v0.4.0
func (batch *MessageBatch) Close(assemble AssemblyFunc, timeout time.Duration)
Close disables Append, calls flush and waits for this call to finish. Timeout is passed to WaitForFlush.
func (*MessageBatch) Flush ¶
func (batch *MessageBatch) Flush(assemble AssemblyFunc)
Flush writes the content of the buffer to a given resource and resets the internal state, i.e. the buffer is empty after a call to Flush. Writing will be done in a separate go routine to be non-blocking.
The validate callback will be called after messages have been successfully written to the io.Writer. If validate returns false the buffer will not be reset (automatic retry). If validate is nil a return value of true is assumed (buffer reset).
The onError callback will be called if the io.Writer returned an error. If onError returns false the buffer will not be reset (automatic retry). If onError is nil a return value of true is assumed (buffer reset).
func (MessageBatch) IsClosed ¶ added in v0.4.0
func (batch MessageBatch) IsClosed() bool
IsClosed returns true if Close has been called at least once.
func (MessageBatch) IsEmpty ¶
func (batch MessageBatch) IsEmpty() bool
IsEmpty returns true if no data is stored in the front buffer, i.e. if no data is scheduled for flushing.
func (*MessageBatch) Len ¶ added in v0.4.0
func (batch *MessageBatch) Len() int
Len returns the length of one buffer
func (MessageBatch) ReachedSizeThreshold ¶
func (batch MessageBatch) ReachedSizeThreshold(size int) bool
ReachedSizeThreshold returns true if the bytes stored in the buffer are above or equal to the size given. If there is no data this function returns false.
func (MessageBatch) ReachedTimeThreshold ¶
func (batch MessageBatch) ReachedTimeThreshold(timeout time.Duration) bool
ReachedTimeThreshold returns true if the last flush was more than timeout ago. If there is no data this function returns false.
func (*MessageBatch) Touch ¶
func (batch *MessageBatch) Touch()
Touch resets the timer queried by ReachedTimeThreshold, i.e. this resets the automatic flush timeout
func (*MessageBatch) WaitForFlush ¶
func (batch *MessageBatch) WaitForFlush(timeout time.Duration)
WaitForFlush blocks until the current flush command returns. Passing a timeout > 0 will unblock this function after the given duration at the latest.
type MessageData ¶ added in v0.5.0
type MessageData struct {
// contains filtered or unexported fields
}
MessageData is a container for the message payload, streamID and an optional message key The struct is used by Message.data for the current message data and orig for the original message data
type MessageQueue ¶ added in v0.5.0
type MessageQueue chan *Message
MessageQueue is the type used for transferring messages between plugins
func NewMessageQueue ¶ added in v0.5.0
func NewMessageQueue(capacity int) MessageQueue
NewMessageQueue creates a new message buffer of the given capacity
func (MessageQueue) Close ¶ added in v0.5.0
func (channel MessageQueue) Close()
Close stops the buffer from being able to receive messages
func (MessageQueue) GetNumQueued ¶ added in v0.5.0
func (channel MessageQueue) GetNumQueued() int
GetNumQueued returns the number of queued messages. Please note that this information can be extremely volatile in multithreaded environments.
func (MessageQueue) IsEmpty ¶ added in v0.5.0
func (channel MessageQueue) IsEmpty() bool
IsEmpty returns true if no element is currently stored in the channel. Please note that this information can be extremely volatile in multithreaded environments.
func (MessageQueue) Pop ¶ added in v0.5.0
func (channel MessageQueue) Pop() (*Message, bool)
Pop returns a message from the buffer
func (MessageQueue) PopWithTimeout ¶ added in v0.5.0
func (channel MessageQueue) PopWithTimeout(maxDuration time.Duration) (*Message, bool)
PopWithTimeout returns a message from the buffer with a runtime <= maxDuration. If the channel is empty or the timeout is hit, the second return value is false.
func (MessageQueue) Push ¶ added in v0.5.0
func (channel MessageQueue) Push(msg *Message, timeout time.Duration) (state MessageQueueResult)
Push adds a message to the MessageStream, waiting for a timeout instead of just blocking. Passing a timeout of -1 will discard the message. Passing a timeout of 0 will always block. Messages that time out will be sent to the fallback queue if a Dropped consumer exists. The source parameter is used when a message is sent to the fallback, i.e. it is passed to the Drop function.
type MessageQueueResult ¶ added in v0.5.0
type MessageQueueResult int
MessageQueueResult is used as a return value for the Enqueue method
type MessageSource ¶
type MessageSource interface { // IsActive returns true if the source can produce messages IsActive() bool // IsBlocked returns true if the source cannot produce messages IsBlocked() bool // GetID returns the pluginID of the message source GetID() string }
MessageSource defines methods that are common to all message sources. Currently this is only a placeholder.
type MessageStreamID ¶
type MessageStreamID uint64
MessageStreamID is the "compiled name" of a stream
func (MessageStreamID) GetName ¶ added in v0.5.0
func (streamID MessageStreamID) GetName() string
GetName resolves the name of the streamID
type Metadata ¶ added in v0.5.0
Metadata is a map for optional metadata which can be set by consumers and modulators.
func (Metadata) Clone ¶ added in v0.5.0
Clone creates an exact copy of this metadata map.
func (Metadata) Delete ¶ added in v0.5.0
Delete removes the given key from the map
func (Metadata) GetValue ¶ added in v0.5.0
GetValue returns a meta data value by key. This function returns a value if key is not set, too. In that case it will return an empty byte array.
func (Metadata) GetValueString ¶ added in v0.5.0
GetValueString casts the results of GetValue to a string
func (Metadata) SetValue ¶ added in v0.5.0
SetValue sets a key/value pair in the metadata.
func (Metadata) TryGetValue ¶ added in v0.5.0
TryGetValue behaves like GetValue but returns a second value which denotes if the key was set or not.
func (Metadata) TryGetValueString ¶ added in v0.5.0
TryGetValueString casts the data result of TryGetValue to string
type ModulateResult ¶ added in v0.5.0
type ModulateResult int
ModulateResult defines a set of results used to control the message flow induced by Modulator actions.
type ModulateResultError ¶ added in v0.5.0
type ModulateResultError struct {
// contains filtered or unexported fields
}
ModulateResultError is used by modulators to return a problem that happened during the modulation process, caused by the modulator.
func NewModulateResultError ¶ added in v0.5.0
func NewModulateResultError(message string, values ...interface{}) ModulateResultError
NewModulateResultError creates a new ModulateResultError with the given message.
func (ModulateResultError) Error ¶ added in v0.5.0
func (p ModulateResultError) Error() string
Error fulfills the golang error interface.
type Modulator ¶ added in v0.5.0
type Modulator interface { // Modulate processes the given message for analysis or modification. // The result of this function defines how and if a message proceeds // along the ModulateResult. Modulate(msg *Message) ModulateResult }
A Modulator defines a modification or analysis step inside the message ModulateResult. It may alter messages or stop the ModulateResult for this message.
type ModulatorArray ¶ added in v0.5.0
type ModulatorArray []Modulator
ModulatorArray is a type wrapper to []Modulator to make array of modulators compatible with the modulator interface
func (ModulatorArray) Modulate ¶ added in v0.5.0
func (modulators ModulatorArray) Modulate(msg *Message) ModulateResult
Modulate calls Modulate on every Modulator in the array and reacts according to the definition of each ModulateResult state.
type Plugin ¶
type Plugin interface { Configurable }
Plugin is the base class for any runtime class that can be configured and instantiated during runtime.
func NewPluginWithConfig ¶ added in v0.5.0
func NewPluginWithConfig(config PluginConfig) (Plugin, error)
NewPluginWithConfig creates a new plugin from the type information stored in its config. This function internally calls NewPluginWithType.
type PluginConfig ¶
type PluginConfig struct { ID string Typename string Enable bool Settings tcontainer.MarshalMap // contains filtered or unexported fields }
PluginConfig is a configuration for a specific plugin
func NewNestedPluginConfig ¶ added in v0.5.0
func NewNestedPluginConfig(defaultTypename string, values tcontainer.MarshalMap) (PluginConfig, error)
NewNestedPluginConfig creates a pluginconfig based on a given set of config values. The plugin created does not have an id, i.e. it is considered anonymous.
func NewPluginConfig ¶
func NewPluginConfig(pluginID string, defaultTypename string) PluginConfig
NewPluginConfig creates a new plugin config with default values. By default the plugin is enabled, has one instance and has no additional settings. Passing an empty pluginID makes the plugin anonymous. The defaultTypename may be overridden by a later call to read.
func (*PluginConfig) Override ¶
func (conf *PluginConfig) Override(key string, value interface{})
Override sets or overrides a configuration value for non-predefined options.
func (*PluginConfig) Read ¶
func (conf *PluginConfig) Read(values tcontainer.MarshalMap) error
Read analyzes a given key/value map to extract the configuration values valid for each plugin. All non-default values are written to the Settings member. All keys will be converted to lowercase.
func (PluginConfig) Validate ¶
func (conf PluginConfig) Validate() error
Validate should be called after a configuration has been processed. It will check the keys read from the config files against the keys requested up to this point. Unknown keys will be returned as errors
type PluginConfigReader ¶ added in v0.5.0
type PluginConfigReader struct { WithError PluginConfigReaderWithError Errors *tgo.ErrorStack }
PluginConfigReader provides another convenience wrapper on top of a PluginConfigReaderWithError. All functions of this reader are stripped from returning errors. Errors from calls are collected internally and can be accessed at any given point.
func NewPluginConfigReader ¶ added in v0.5.0
func NewPluginConfigReader(config *PluginConfig) PluginConfigReader
NewPluginConfigReader creates a new reader on top of a given config.
func NewPluginConfigReaderFromReader ¶ added in v0.5.0
func NewPluginConfigReaderFromReader(reader PluginConfigReaderWithError) PluginConfigReader
NewPluginConfigReaderFromReader encapsulates a WithError reader that is already attached to a config to read from.
func (*PluginConfigReader) Configure ¶ added in v0.5.0
func (reader *PluginConfigReader) Configure(item interface{}) error
Configure will configure a given item by scanning for plugin struct tags and calling the Configure method. Nested types will be traversed automatically.
func (*PluginConfigReader) GetArray ¶ added in v0.5.0
func (reader *PluginConfigReader) GetArray(key string, defaultValue []interface{}) []interface{}
GetArray tries to read an untyped array from a PluginConfig. If that value is not found defaultValue is returned.
func (*PluginConfigReader) GetBool ¶ added in v0.5.0
func (reader *PluginConfigReader) GetBool(key string, defaultValue bool) bool
GetBool tries to read a boolean value from a PluginConfig. If that value is not found defaultValue is returned.
func (*PluginConfigReader) GetFilterArray ¶ added in v0.5.0
func (reader *PluginConfigReader) GetFilterArray(key string, logger logrus.FieldLogger, defaultValue FilterArray) FilterArray
GetFilterArray returns an array of filter plugins.
func (*PluginConfigReader) GetFormatterArray ¶ added in v0.5.0
func (reader *PluginConfigReader) GetFormatterArray(key string, logger logrus.FieldLogger, defaultValue FormatterArray) FormatterArray
GetFormatterArray returns an array of formatter plugins.
func (*PluginConfigReader) GetID ¶ added in v0.5.0
func (reader *PluginConfigReader) GetID() string
GetID returns the plugin's id
func (*PluginConfigReader) GetInt ¶ added in v0.5.0
func (reader *PluginConfigReader) GetInt(key string, defaultValue int64) int64
GetInt tries to read an integer value from a PluginConfig. If that value is not found defaultValue is returned.
func (*PluginConfigReader) GetLogger ¶ added in v0.5.0
func (reader *PluginConfigReader) GetLogger() logrus.FieldLogger
GetLogger creates a new scoped logger (logrus.FieldLogger) for the plugin contained in this config.
func (*PluginConfigReader) GetMap ¶ added in v0.5.0
func (reader *PluginConfigReader) GetMap(key string, defaultValue tcontainer.MarshalMap) tcontainer.MarshalMap
GetMap tries to read a MarshalMap from a PluginConfig. If that value is not found defaultValue is returned.
func (*PluginConfigReader) GetModulatorArray ¶ added in v0.5.0
func (reader *PluginConfigReader) GetModulatorArray(key string, logger logrus.FieldLogger, defaultValue ModulatorArray) ModulatorArray
GetModulatorArray reads an array of modulator plugins
func (*PluginConfigReader) GetPlugin ¶ added in v0.5.0
func (reader *PluginConfigReader) GetPlugin(key string, defaultType string, defaultConfig tcontainer.MarshalMap) Plugin
GetPlugin creates a nested plugin from a config map. The default type has to be passed and is overridden if the config specifies a type. The value stored in the config can either be a string or a map. If a map is found it is used to override defaultConfig. If a string is found it is used to override defaultType.
func (*PluginConfigReader) GetPluginArray ¶ added in v0.5.0
func (reader *PluginConfigReader) GetPluginArray(key string, defaultValue []Plugin) []Plugin
GetPluginArray tries to read an array of plugins (type to config) from a PluginConfig. If that value is not found defaultValue is returned.
func (*PluginConfigReader) GetStreamArray ¶ added in v0.5.0
func (reader *PluginConfigReader) GetStreamArray(key string, defaultValue []MessageStreamID) []MessageStreamID
GetStreamArray tries to read a string array from a pluginconfig and translates all values to streamIds. If the key is not found defaultValue is returned.
func (*PluginConfigReader) GetStreamID ¶ added in v0.5.0
func (reader *PluginConfigReader) GetStreamID(key string, defaultValue MessageStreamID) MessageStreamID
GetStreamID tries to read a StreamID value from a PluginConfig. If that value is not found defaultValue is returned.
func (*PluginConfigReader) GetStreamMap ¶ added in v0.5.0
func (reader *PluginConfigReader) GetStreamMap(key string, defaultValue string) map[MessageStreamID]string
GetStreamMap tries to read a stream to string map from a plugin config. A mapping on the wildcard stream is always returned. The target is either defaultValue or a value defined by the config.
func (*PluginConfigReader) GetStreamRoutes ¶ added in v0.5.0
func (reader *PluginConfigReader) GetStreamRoutes(key string, defaultValue map[MessageStreamID][]MessageStreamID) map[MessageStreamID][]MessageStreamID
GetStreamRoutes tries to read a stream to stream map from a plugin config. If no routes are defined an empty map is returned
func (*PluginConfigReader) GetString ¶ added in v0.5.0
func (reader *PluginConfigReader) GetString(key string, defaultValue string) string
GetString tries to read a string value from a PluginConfig. If that value is not found defaultValue is returned.
func (*PluginConfigReader) GetStringArray ¶ added in v0.5.0
func (reader *PluginConfigReader) GetStringArray(key string, defaultValue []string) []string
GetStringArray tries to read a string array from a PluginConfig. If that value is not found defaultValue is returned.
func (*PluginConfigReader) GetStringMap ¶ added in v0.5.0
func (reader *PluginConfigReader) GetStringMap(key string, defaultValue map[string]string) map[string]string
GetStringMap tries to read a string to string map from a PluginConfig. If the key is not found defaultValue is returned.
func (*PluginConfigReader) GetSubLogger ¶ added in v0.5.0
func (reader *PluginConfigReader) GetSubLogger(subScope string) logrus.FieldLogger
GetSubLogger creates a new sub-scoped logger (logrus.FieldLogger) for the plugin contained in this config
func (*PluginConfigReader) GetTypename ¶ added in v0.5.0
func (reader *PluginConfigReader) GetTypename() string
GetTypename returns the plugin's typename
func (*PluginConfigReader) GetURL ¶ added in v0.5.0
func (reader *PluginConfigReader) GetURL(key string, defaultValue string) *url.URL
GetURL tries to read a URL struct from a PluginConfig. If that value is not found nil is returned.
func (*PluginConfigReader) GetUint ¶ added in v0.5.0
func (reader *PluginConfigReader) GetUint(key string, defaultValue uint64) uint64
GetUint tries to read an unsigned integer value from a PluginConfig. If that value is not found defaultValue is returned.
func (*PluginConfigReader) GetValue ¶ added in v0.5.0
func (reader *PluginConfigReader) GetValue(key string, defaultValue interface{}) interface{}
GetValue tries to read an untyped value from a PluginConfig. If that value is not found defaultValue is returned.
func (*PluginConfigReader) HasValue ¶ added in v0.5.0
func (reader *PluginConfigReader) HasValue(key string) bool
HasValue returns true if the given key has been set as a config option. This function only takes settings into account.
type PluginConfigReaderWithError ¶ added in v0.5.0
type PluginConfigReaderWithError struct {
// contains filtered or unexported fields
}
PluginConfigReaderWithError is a read-only wrapper on top of a plugin config that provides convenience functions for accessing values from the wrapped config.
func NewPluginConfigReaderWithError ¶ added in v0.5.0
func NewPluginConfigReaderWithError(config *PluginConfig) PluginConfigReaderWithError
NewPluginConfigReaderWithError creates a new raw reader for a given config. In contrast to the PluginConfigReader, all functions return an error next to the value.
func (PluginConfigReaderWithError) GetArray ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetArray(key string, defaultValue []interface{}) ([]interface{}, error)
GetArray tries to read an untyped array from a PluginConfig. If that value is not found defaultValue is returned.
func (PluginConfigReaderWithError) GetBool ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetBool(key string, defaultValue bool) (bool, error)
GetBool tries to read a boolean value from a PluginConfig. If that value is not found defaultValue is returned.
func (PluginConfigReaderWithError) GetFilterArray ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetFilterArray(key string, logger logrus.FieldLogger, defaultValue FilterArray) (FilterArray, error)
GetFilterArray returns an array of filter plugins.
func (PluginConfigReaderWithError) GetFloat ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetFloat(key string, defaultValue float64) (float64, error)
GetFloat tries to read a float value from a PluginConfig. If that value is not found defaultValue is returned.
func (PluginConfigReaderWithError) GetFormatterArray ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetFormatterArray(key string, logger logrus.FieldLogger, defaultValue FormatterArray) (FormatterArray, error)
GetFormatterArray returns an array of formatter plugins.
func (PluginConfigReaderWithError) GetID ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetID() string
GetID returns the plugin's id
func (PluginConfigReaderWithError) GetInt ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetInt(key string, defaultValue int64) (int64, error)
GetInt tries to read an integer value from a PluginConfig. If that value is not found defaultValue is returned.
func (PluginConfigReaderWithError) GetLogger ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetLogger() logrus.FieldLogger
GetLogger creates a logger scoped for the plugin contained in this config.
func (PluginConfigReaderWithError) GetMap ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetMap(key string, defaultValue tcontainer.MarshalMap) (tcontainer.MarshalMap, error)
GetMap tries to read a MarshalMap from a PluginConfig. If that value is not found defaultValue is returned.
func (PluginConfigReaderWithError) GetModulatorArray ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetModulatorArray(key string, logger logrus.FieldLogger, defaultValue ModulatorArray) (ModulatorArray, error)
GetModulatorArray returns an array of modulator plugins.
func (PluginConfigReaderWithError) GetPlugin ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetPlugin(key string, defaultType string, defaultConfig tcontainer.MarshalMap) (Plugin, error)
GetPlugin creates a nested plugin from a config map. The default type has to be passed and is overridden if the config specifies a type. The value stored in the config can either be a string or a map. If a map is found it is used to override defaultConfig. If a string is found it is used to override defaultType.
func (PluginConfigReaderWithError) GetPluginArray ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetPluginArray(key string, defaultValue []Plugin) ([]Plugin, error)
GetPluginArray tries to read an array of plugins (type to config) from a PluginConfig. If that value is not found defaultValue is returned.
func (PluginConfigReaderWithError) GetStreamArray ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetStreamArray(key string, defaultValue []MessageStreamID) ([]MessageStreamID, error)
GetStreamArray tries to read a string array from a pluginconfig and translates all values to streamIds. If the key is not found defaultValue is returned.
func (PluginConfigReaderWithError) GetStreamID ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetStreamID(key string, defaultValue MessageStreamID) (MessageStreamID, error)
GetStreamID tries to read a StreamID value from a PluginConfig. If that value is not found defaultValue is returned.
func (PluginConfigReaderWithError) GetStreamMap ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetStreamMap(key string, defaultValue string) (map[MessageStreamID]string, error)
GetStreamMap tries to read a stream to string map from a plugin config. A mapping on the wildcard stream is always returned. The target is either defaultValue or a value defined by the config.
func (PluginConfigReaderWithError) GetStreamRoutes ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetStreamRoutes(key string, defaultValue map[MessageStreamID][]MessageStreamID) (map[MessageStreamID][]MessageStreamID, error)
GetStreamRoutes tries to read a stream to stream map from a plugin config. If no routes are defined an empty map is returned
func (PluginConfigReaderWithError) GetString ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetString(key string, defaultValue string) (string, error)
GetString tries to read a string value from a PluginConfig. If that value is not found defaultValue is returned.
func (PluginConfigReaderWithError) GetStringArray ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetStringArray(key string, defaultValue []string) ([]string, error)
GetStringArray tries to read a string array from a PluginConfig. If that value is not found defaultValue is returned.
func (PluginConfigReaderWithError) GetStringMap ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetStringMap(key string, defaultValue map[string]string) (map[string]string, error)
GetStringMap tries to read a string to string map from a PluginConfig. If the key is not found defaultValue is returned.
func (PluginConfigReaderWithError) GetSubLogger ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetSubLogger(subScope string) logrus.FieldLogger
GetSubLogger creates a logger scoped for the plugin contained in this config, with an additional subscope.
func (PluginConfigReaderWithError) GetTypename ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetTypename() string
GetTypename returns the plugin's typename
func (PluginConfigReaderWithError) GetURL ¶ added in v0.5.0
GetURL tries to read a URL struct from a PluginConfig. If that value is not found nil is returned.
func (PluginConfigReaderWithError) GetUint ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetUint(key string, defaultValue uint64) (uint64, error)
GetUint tries to read an unsigned integer value from a PluginConfig. If that value is not found defaultValue is returned.
func (PluginConfigReaderWithError) GetValue ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) GetValue(key string, defaultValue interface{}) interface{}
GetValue tries to read an untyped value from a PluginConfig. If that value is not found defaultValue is returned.
func (PluginConfigReaderWithError) HasValue ¶ added in v0.5.0
func (reader PluginConfigReaderWithError) HasValue(key string) bool
HasValue returns true if the given key has been set as a config option. This function only takes settings into account.
type PluginControl ¶
type PluginControl int
PluginControl is an enumeration used to pass signals to plugins
type PluginMetric ¶ added in v0.5.0
type PluginMetric struct { }
PluginMetric class for plugin based metrics
func (*PluginMetric) DecWorker ¶ added in v0.5.0
func (metric *PluginMetric) DecWorker()
DecWorker decreases the active worker count
func (*PluginMetric) IncWorker ¶ added in v0.5.0
func (metric *PluginMetric) IncWorker()
IncWorker increases the active worker count
func (*PluginMetric) Init ¶ added in v0.5.0
func (metric *PluginMetric) Init()
Init increases the plugin state init count
func (*PluginMetric) UpdateStateMetric ¶ added in v0.5.0
func (metric *PluginMetric) UpdateStateMetric(prevState, nextState string)
UpdateStateMetric decreases the prev plugin state and increases the next plugin state
type PluginRunState ¶
type PluginRunState struct {
// contains filtered or unexported fields
}
PluginRunState is used in some plugins to store information about the execution state of the plugin (i.e. if it is running or not) as well as threading primitives that enable gollum to wait for a plugin to properly shut down.
func NewPluginRunState ¶ added in v0.4.0
func NewPluginRunState() *PluginRunState
NewPluginRunState creates a new plugin state helper
func (*PluginRunState) AddWorker ¶
func (state *PluginRunState) AddWorker()
AddWorker adds a worker to the waitgroup configured by SetWorkerWaitGroup.
func (*PluginRunState) GetState ¶ added in v0.4.0
func (state *PluginRunState) GetState() PluginState
GetState returns the current plugin state casted to the correct type
func (*PluginRunState) GetStateString ¶ added in v0.5.0
func (state *PluginRunState) GetStateString() string
GetStateString returns the current state as string
func (*PluginRunState) SetState ¶ added in v0.4.0
func (state *PluginRunState) SetState(nextState PluginState)
SetState sets a new plugin state casted to the correct type
func (*PluginRunState) SetWorkerWaitGroup ¶
func (state *PluginRunState) SetWorkerWaitGroup(workers *sync.WaitGroup)
SetWorkerWaitGroup sets the WaitGroup used to manage workers
func (*PluginRunState) WorkerDone ¶
func (state *PluginRunState) WorkerDone()
WorkerDone removes a worker from the waitgroup configured by SetWorkerWaitGroup.
type PluginState ¶ added in v0.4.0
type PluginState int32
PluginState is an enumeration used to describe the current working state of a plugin
type PluginStructTag ¶ added in v0.5.0
PluginStructTag extends reflect.StructTag by convenience methods used to retrieve auto configure values used by PluginConfigReader.
func (PluginStructTag) GetBool ¶ added in v0.5.0
func (tag PluginStructTag) GetBool() bool
GetBool returns the default boolean value for an auto configured field. When not set, false is returned.
func (PluginStructTag) GetByteArray ¶ added in v0.5.0
func (tag PluginStructTag) GetByteArray() []byte
GetByteArray returns the default byte array value for an auto configured field. When not set an empty array is returned.
func (PluginStructTag) GetInt ¶ added in v0.5.0
func (tag PluginStructTag) GetInt() int64
GetInt returns the default integer value for an auto configured field. When not set, 0 is returned. Metrics are not applied to this value.
func (PluginStructTag) GetMetricScale ¶ added in v0.5.0
func (tag PluginStructTag) GetMetricScale() int64
GetMetricScale returns the scale as defined by the "metric" tag as a number.
func (PluginStructTag) GetStream ¶ added in v0.5.0
func (tag PluginStructTag) GetStream() MessageStreamID
GetStream returns the default message stream value for an auto configured field. When not set, InvalidStream is returned.
func (PluginStructTag) GetStreamArray ¶ added in v0.5.0
func (tag PluginStructTag) GetStreamArray() []MessageStreamID
GetStreamArray returns the default message stream array value for an auto configured field. When not set an empty array is returned.
func (PluginStructTag) GetString ¶ added in v0.5.0
func (tag PluginStructTag) GetString() string
GetString returns the default string value for an auto configured field. When not set, "" is returned.
func (PluginStructTag) GetStringArray ¶ added in v0.5.0
func (tag PluginStructTag) GetStringArray() []string
GetStringArray returns the default string array value for an auto configured field. When not set an empty array is returned.
func (PluginStructTag) GetUint ¶ added in v0.5.0
func (tag PluginStructTag) GetUint() uint64
GetUint returns the default unsigned integer value for an auto configured field. When not set, 0 is returned. Metrics are not applied to this value.
type PluginWithState ¶ added in v0.4.0
type PluginWithState interface { Plugin // GetState returns the current state of a plugin GetState() PluginState }
PluginWithState allows certain plugins to give information about their runstate
type Producer ¶
type Producer interface { PluginWithState MessageSource // Enqueue sends a message to the producer. The producer may reject // the message or try a fallback after a given timeout. Enqueue can block. Enqueue(msg *Message, timeout time.Duration) // Produce should implement a main loop that passes messages from the // message channel to some other service like the console. // This can be part of this function or a separate go routine. // Produce is always called as a go routine. Produce(workers *sync.WaitGroup) // Streams returns the streamIDs this producer is listening to. Streams() []MessageStreamID // Control returns write access to this producer's control channel. // See ProducerControl* constants. Control() chan<- PluginControl // GetShutdownTimeout returns the duration gollum will wait for this producer // before canceling the shutdown process. GetShutdownTimeout() time.Duration }
Producer is an interface for plugins that pass messages to other services, files or storages.
type Router ¶ added in v0.5.0
type Router interface { Modulator // StreamID returns the stream id this plugin is bound to. GetStreamID() MessageStreamID // GetID returns the pluginID of the message source GetID() string // AddProducer adds one or more producers to this stream, i.e. the producers // listening to messages on this stream. AddProducer(producers ...Producer) // Enqueue sends a given message to all registered end points. // This function is called by Route() which should be preferred over this // function when sending messages. Enqueue(msg *Message) error // GetTimeout returns the timeout configured for this router GetTimeout() time.Duration // Start starts the router by the coordinator.StartPlugins() method Start() error }
Router defines the interface for all stream plugins
type ScopedLogger ¶ added in v0.5.0
type ScopedLogger interface {
GetLogger() logrus.FieldLogger
}
ScopedLogger defines an interface for structs that have a log scope attached
type ScopedModulator ¶ added in v0.5.0
type ScopedModulator interface { Modulator // SetLogger defines the log scope for this modulator. SetLogger(logger logrus.FieldLogger) }
ScopedModulator extends the Modulator interface by adding a log scope. This interface is implemented by modulators that are embedded in plugins that already have their own scope.
type SerialMessageSource ¶
type SerialMessageSource interface { AsyncMessageSource // Notify the end of the response stream ResponseDone() }
SerialMessageSource extends the AsyncMessageSource interface to allow waiting for all parts of the response to be submitted.
type SerializedMessage ¶ added in v0.4.0
type SerializedMessage struct { StreamID *uint64 `protobuf:"varint,1,req,name=StreamID" json:"StreamID,omitempty"` Data *SerializedMessageData `protobuf:"bytes,2,req,name=Data" json:"Data,omitempty"` PrevStreamID *uint64 `protobuf:"varint,3,opt,name=PrevStreamID" json:"PrevStreamID,omitempty"` OrigStreamID *uint64 `protobuf:"varint,4,opt,name=OrigStreamID" json:"OrigStreamID,omitempty"` Timestamp *int64 `protobuf:"varint,5,opt,name=Timestamp" json:"Timestamp,omitempty"` Original *SerializedMessageData `protobuf:"bytes,6,opt,name=Original" json:"Original,omitempty"` XXX_unrecognized []byte `json:"-"` }
func (*SerializedMessage) Descriptor ¶ added in v0.5.0
func (*SerializedMessage) Descriptor() ([]byte, []int)
func (*SerializedMessage) GetData ¶ added in v0.4.0
func (m *SerializedMessage) GetData() *SerializedMessageData
func (*SerializedMessage) GetOrigStreamID ¶ added in v0.5.0
func (m *SerializedMessage) GetOrigStreamID() uint64
func (*SerializedMessage) GetOriginal ¶ added in v0.5.0
func (m *SerializedMessage) GetOriginal() *SerializedMessageData
func (*SerializedMessage) GetPrevStreamID ¶ added in v0.4.0
func (m *SerializedMessage) GetPrevStreamID() uint64
func (*SerializedMessage) GetStreamID ¶ added in v0.4.0
func (m *SerializedMessage) GetStreamID() uint64
func (*SerializedMessage) GetTimestamp ¶ added in v0.4.0
func (m *SerializedMessage) GetTimestamp() int64
func (*SerializedMessage) ProtoMessage ¶ added in v0.4.0
func (*SerializedMessage) ProtoMessage()
func (*SerializedMessage) Reset ¶ added in v0.4.0
func (m *SerializedMessage) Reset()
func (*SerializedMessage) String ¶ added in v0.4.0
func (m *SerializedMessage) String() string
type SerializedMessageData ¶ added in v0.5.0
type SerializedMessageData struct { Data []byte `protobuf:"bytes,1,req,name=Data" json:"Data,omitempty"` Metadata map[string][]byte `` /* 136-byte string literal not displayed */ XXX_unrecognized []byte `json:"-"` }
func (*SerializedMessageData) Descriptor ¶ added in v0.5.0
func (*SerializedMessageData) Descriptor() ([]byte, []int)
func (*SerializedMessageData) GetData ¶ added in v0.5.0
func (m *SerializedMessageData) GetData() []byte
func (*SerializedMessageData) GetMetadata ¶ added in v0.5.0
func (m *SerializedMessageData) GetMetadata() map[string][]byte
func (*SerializedMessageData) ProtoMessage ¶ added in v0.5.0
func (*SerializedMessageData) ProtoMessage()
func (*SerializedMessageData) Reset ¶ added in v0.5.0
func (m *SerializedMessageData) Reset()
func (*SerializedMessageData) String ¶ added in v0.5.0
func (m *SerializedMessageData) String() string
type SetAppliedContent ¶ added in v0.5.0
SetAppliedContent is a func() to store message content to payload or meta data
func GetAppliedContentSetFunction ¶ added in v0.5.0
func GetAppliedContentSetFunction(applyTo string) SetAppliedContent
GetAppliedContentSetFunction returns SetAppliedContent function to store message content
type SimpleConsumer ¶ added in v0.5.0
type SimpleConsumer struct { Logger logrus.FieldLogger // contains filtered or unexported fields }
SimpleConsumer consumer
This type defines a common baseclass for all consumers. All consumer plugins should derive from this class as all required basic functions are already implemented in a general way.
Parameters ¶
- Streams: Defines a list of streams a consumer will send to. This parameter is mandatory. When using "*" messages will be sent only to the internal "*" stream. It will NOT send messages to all streams. By default this parameter is set to an empty list.
- ShutdownTimeoutMs: Defines the maximum time in milliseconds a consumer is allowed to take to shut down. After this timeout the consumer is always considered to have shut down. By default this parameter is set to 1000.
- Modulators: Defines a list of modulators to be applied to a message before it is sent to the list of streams. If a modulator specifies a stream, the message is only sent to that specific stream. A message is saved as original after all modulators have been applied. By default this parameter is set to an empty list.
- ModulatorRoutines: Defines the number of go routines reserved for modulating messages. Setting this parameter to 0 will use as many go routines as the specific consumer plugin is using for fetching data. Any other value will force the given number of go routines to be used. By default this parameter is set to 0
- ModulatorQueueSize: Defines the size of the channel used to buffer messages before they are fetched by the next free modulator go routine. If the ModulatorRoutines parameter is set to 0 this parameter is ignored. By default this parameter is set to 1024.
func (*SimpleConsumer) AddHealthCheckAt ¶ added in v0.5.0
func (cons *SimpleConsumer) AddHealthCheckAt(path string, callback thealthcheck.CallbackFunc)
AddHealthCheckAt adds a health check at a subpath (http://<addr>:<port>/<plugin_id><path>)
func (*SimpleConsumer) AddMainWorker ¶ added in v0.5.0
func (cons *SimpleConsumer) AddMainWorker(workers *sync.WaitGroup)
AddMainWorker adds the first worker to the waitgroup
func (*SimpleConsumer) AddWorker ¶ added in v0.5.0
func (cons *SimpleConsumer) AddWorker()
AddWorker adds an additional worker to the waitgroup. Assumes that either MarkAsActive or SetWaitGroup has been called beforehand.
func (*SimpleConsumer) Configure ¶ added in v0.5.0
func (cons *SimpleConsumer) Configure(conf PluginConfigReader)
Configure initializes standard consumer values from a plugin config.
func (*SimpleConsumer) Control ¶ added in v0.5.0
func (cons *SimpleConsumer) Control() chan<- PluginControl
Control returns write access to this consumer's control channel. See ConsumerControl* constants.
func (*SimpleConsumer) ControlLoop ¶ added in v0.5.0
func (cons *SimpleConsumer) ControlLoop()
ControlLoop listens to the control channel and triggers callbacks for these messages. Upon stop control message doExit will be set to true.
func (*SimpleConsumer) Enqueue ¶ added in v0.5.0
func (cons *SimpleConsumer) Enqueue(data []byte)
Enqueue creates a new message from a given byte slice and passes it to EnqueueMessage. Data is copied to the message.
func (*SimpleConsumer) EnqueueWithMetadata ¶ added in v0.5.0
func (cons *SimpleConsumer) EnqueueWithMetadata(data []byte, metaData Metadata)
EnqueueWithMetadata works like EnqueueWithSequence and allows to set meta data directly
func (*SimpleConsumer) GetID ¶ added in v0.5.0
func (cons *SimpleConsumer) GetID() string
GetID returns the ID of this consumer
func (*SimpleConsumer) GetLogger ¶ added in v0.5.0
func (cons *SimpleConsumer) GetLogger() logrus.FieldLogger
GetLogger returns the logger scoped to this plugin
func (*SimpleConsumer) GetShutdownTimeout ¶ added in v0.5.0
func (cons *SimpleConsumer) GetShutdownTimeout() time.Duration
GetShutdownTimeout returns the duration gollum will wait for this consumer before canceling the shutdown process.
func (*SimpleConsumer) GetState ¶ added in v0.5.0
func (cons *SimpleConsumer) GetState() PluginState
GetState returns the state this plugin is currently in
func (*SimpleConsumer) IsActive ¶ added in v0.5.0
func (cons *SimpleConsumer) IsActive() bool
IsActive returns true if GetState() returns initialize, active, waiting or prepareStop.
func (*SimpleConsumer) IsActiveOrStopping ¶ added in v0.5.0
func (cons *SimpleConsumer) IsActiveOrStopping() bool
IsActiveOrStopping is a shortcut for cons.IsActive() || cons.IsStopping()
func (*SimpleConsumer) IsBlocked ¶ added in v0.5.0
func (cons *SimpleConsumer) IsBlocked() bool
IsBlocked returns true if GetState() returns waiting
func (*SimpleConsumer) IsStopping ¶ added in v0.5.0
func (cons *SimpleConsumer) IsStopping() bool
IsStopping returns true if GetState() returns prepareStop, stopping or dead
func (*SimpleConsumer) SetPrepareStopCallback ¶ added in v0.5.0
func (cons *SimpleConsumer) SetPrepareStopCallback(onPrepareStop func())
SetPrepareStopCallback sets the function to be called upon PluginControlPrepareStop
func (*SimpleConsumer) SetRollCallback ¶ added in v0.5.0
func (cons *SimpleConsumer) SetRollCallback(onRoll func())
SetRollCallback sets the function to be called upon PluginControlRoll
func (*SimpleConsumer) SetStopCallback ¶ added in v0.5.0
func (cons *SimpleConsumer) SetStopCallback(onStop func())
SetStopCallback sets the function to be called upon PluginControlStop
func (*SimpleConsumer) SetWorkerWaitGroup ¶ added in v0.5.0
func (cons *SimpleConsumer) SetWorkerWaitGroup(workers *sync.WaitGroup)
SetWorkerWaitGroup forwards to Plugin.SetWorkerWaitGroup for this consumer's internal plugin state. This method is also called by AddMainWorker.
func (*SimpleConsumer) TickerControlLoop ¶ added in v0.5.0
func (cons *SimpleConsumer) TickerControlLoop(interval time.Duration, onTick func())
TickerControlLoop is like ControlLoop but executes a given function at every given interval tick, too. Note that the interval is not exact. If the onTick function takes longer than interval, the next tick will be delayed until onTick finishes.
func (*SimpleConsumer) WorkerDone ¶ added in v0.5.0
func (cons *SimpleConsumer) WorkerDone()
WorkerDone removes an additional worker from the waitgroup.
type SimpleFilter ¶ added in v0.5.0
type SimpleFilter struct { Logger logrus.FieldLogger // contains filtered or unexported fields }
SimpleFilter plugin base type
This type defines a common base class for all Filters. All filter plugins should derive from this class but don't necessarily need to.
Parameters ¶
- FilteredStream: This value defines the stream filtered messages get sent to. You can disable this behavior by setting the value to "". By default this parameter is set to "".
func (*SimpleFilter) Configure ¶ added in v0.5.0
func (filter *SimpleFilter) Configure(conf PluginConfigReader) error
Configure sets up all values required by SimpleFilter.
func (*SimpleFilter) GetFilterResultMessageReject ¶ added in v0.5.0
func (filter *SimpleFilter) GetFilterResultMessageReject() FilterResult
GetFilterResultMessageReject returns a FilterResultMessageReject with the stream set to GetfilteredStreamID()
func (*SimpleFilter) GetLogger ¶ added in v0.5.0
func (filter *SimpleFilter) GetLogger() logrus.FieldLogger
GetLogger returns this plugin's scoped logger
func (*SimpleFilter) SetLogger ¶ added in v0.5.0
func (filter *SimpleFilter) SetLogger(logger logrus.FieldLogger)
SetLogger sets the scoped logger to be used for this filter
type SimpleFormatter ¶ added in v0.5.0
type SimpleFormatter struct { Logger logrus.FieldLogger GetAppliedContent GetAppliedContent SetAppliedContent SetAppliedContent SkipIfEmpty bool `config:"SkipIfEmpty"` }
SimpleFormatter formatter
This type defines a common baseclass for formatters. Formatter plugins may derive from this class.
Parameters ¶
- ApplyTo: This value chooses the part of the message the formatting should be applied to. Use "" to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to "".
- SkipIfEmpty: When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
func (*SimpleFormatter) CanBeApplied ¶ added in v0.5.0
func (format *SimpleFormatter) CanBeApplied(msg *Message) bool
CanBeApplied returns true if the formatter can be applied to this message
func (*SimpleFormatter) Configure ¶ added in v0.5.0
func (format *SimpleFormatter) Configure(conf PluginConfigReader)
Configure sets up all values required by SimpleFormatter.
func (*SimpleFormatter) GetLogger ¶ added in v0.5.0
func (format *SimpleFormatter) GetLogger() logrus.FieldLogger
GetLogger returns the scoped logger of this plugin
func (*SimpleFormatter) SetLogger ¶ added in v0.5.0
func (format *SimpleFormatter) SetLogger(logger logrus.FieldLogger)
SetLogger sets the scoped logger to be used for this formatter
type SimpleProducer ¶ added in v0.5.0
type SimpleProducer struct { Logger logrus.FieldLogger // contains filtered or unexported fields }
SimpleProducer producer
This type defines a common baseclass for all producers. All producer plugins should derive from this class as all required basic functions are already implemented here in a general way.
Parameters ¶
- Streams: Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying "*" causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.
- FallbackStream: Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to "" will cause messages to be discarded when delivery fails.
- ShutdownTimeoutMs: Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.
- Modulators: Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
func (*SimpleProducer) AddHealthCheck ¶ added in v0.5.0
func (prod *SimpleProducer) AddHealthCheck(callback thealthcheck.CallbackFunc)
AddHealthCheck adds a health check at the default URL (http://<addr>:<port>/<plugin_id>)
func (*SimpleProducer) AddHealthCheckAt ¶ added in v0.5.0
func (prod *SimpleProducer) AddHealthCheckAt(path string, callback thealthcheck.CallbackFunc)
AddHealthCheckAt adds a health check at a subpath (http://<addr>:<port>/<plugin_id><path>)
func (*SimpleProducer) AddMainWorker ¶ added in v0.5.0
func (prod *SimpleProducer) AddMainWorker(workers *sync.WaitGroup)
AddMainWorker adds the first worker to the waitgroup
func (*SimpleProducer) AddWorker ¶ added in v0.5.0
func (prod *SimpleProducer) AddWorker()
AddWorker adds an additional worker to the waitgroup. Assumes that either MarkAsActive or SetWaitGroup has been called beforehand.
func (*SimpleProducer) Configure ¶ added in v0.5.0
func (prod *SimpleProducer) Configure(conf PluginConfigReader)
Configure initializes the standard producer config values.
func (*SimpleProducer) Control ¶ added in v0.5.0
func (prod *SimpleProducer) Control() chan<- PluginControl
Control returns write access to this producer's control channel. See PluginControl* constants.
func (*SimpleProducer) ControlLoop ¶ added in v0.5.0
func (prod *SimpleProducer) ControlLoop()
ControlLoop listens to the control channel and triggers callbacks for these messages. Upon stop control message doExit will be set to true.
func (*SimpleProducer) GetID ¶ added in v0.5.0
func (prod *SimpleProducer) GetID() string
GetID returns the ID of this producer
func (*SimpleProducer) GetLogger ¶ added in v0.5.0
func (prod *SimpleProducer) GetLogger() logrus.FieldLogger
GetLogger returns the logging scope of this plugin
func (*SimpleProducer) GetShutdownTimeout ¶ added in v0.5.0
func (prod *SimpleProducer) GetShutdownTimeout() time.Duration
GetShutdownTimeout returns the duration gollum will wait for this producer before canceling the shutdown process.
func (*SimpleProducer) GetState ¶ added in v0.5.0
func (prod *SimpleProducer) GetState() PluginState
GetState returns the state this plugin is currently in
func (*SimpleProducer) HasContinueAfterModulate ¶ added in v0.5.0
func (prod *SimpleProducer) HasContinueAfterModulate(msg *Message) bool
HasContinueAfterModulate applies all modulators via Modulate, handles the ModulateResult and returns whether processing of the message should continue. This method is the default modulate handling for producers.
func (*SimpleProducer) IsActive ¶ added in v0.5.0
func (prod *SimpleProducer) IsActive() bool
IsActive returns true if GetState() returns initializing, active, waiting, or prepareStop
func (*SimpleProducer) IsActiveOrStopping ¶ added in v0.5.0
func (prod *SimpleProducer) IsActiveOrStopping() bool
IsActiveOrStopping is a shortcut for prod.IsActive() || prod.IsStopping()
func (*SimpleProducer) IsBlocked ¶ added in v0.5.0
func (prod *SimpleProducer) IsBlocked() bool
IsBlocked returns true if GetState() returns waiting
func (*SimpleProducer) IsStopping ¶ added in v0.5.0
func (prod *SimpleProducer) IsStopping() bool
IsStopping returns true if GetState() returns prepareStop, stopping or dead
func (*SimpleProducer) Modulate ¶ added in v0.5.0
func (prod *SimpleProducer) Modulate(msg *Message) ModulateResult
Modulate applies all modulators from this producer to a given message. This implementation handles routing and discarding of messages.
func (*SimpleProducer) SetPrepareStopCallback ¶ added in v0.5.0
func (prod *SimpleProducer) SetPrepareStopCallback(onPrepareStop func())
SetPrepareStopCallback sets the function to be called upon PluginControlPrepareStop
func (*SimpleProducer) SetRollCallback ¶ added in v0.5.0
func (prod *SimpleProducer) SetRollCallback(onRoll func())
SetRollCallback sets the function to be called upon PluginControlRoll
func (*SimpleProducer) SetStopCallback ¶ added in v0.5.0
func (prod *SimpleProducer) SetStopCallback(onStop func())
SetStopCallback sets the function to be called upon PluginControlStop
func (*SimpleProducer) SetWorkerWaitGroup ¶ added in v0.5.0
func (prod *SimpleProducer) SetWorkerWaitGroup(workers *sync.WaitGroup)
SetWorkerWaitGroup forwards to Plugin.SetWorkerWaitGroup for this producer's internal plugin state. This method is also called by AddMainWorker.
func (*SimpleProducer) Streams ¶ added in v0.5.0
func (prod *SimpleProducer) Streams() []MessageStreamID
Streams returns the routers this producer is listening to.
func (*SimpleProducer) TickerControlLoop ¶ added in v0.5.0
func (prod *SimpleProducer) TickerControlLoop(interval time.Duration, onTimeOut func())
TickerControlLoop is like ControlLoop but executes a given function at every given interval tick, too. If the onTimeOut function takes longer than interval, the next tick will be delayed until onTimeOut finishes.
func (*SimpleProducer) TryFallback ¶ added in v0.5.0
func (prod *SimpleProducer) TryFallback(msg *Message)
TryFallback routes the message to the configured fallback stream.
func (*SimpleProducer) WorkerDone ¶ added in v0.5.0
func (prod *SimpleProducer) WorkerDone()
WorkerDone removes an additional worker from the waitgroup.
type SimpleRouter ¶ added in v0.5.0
type SimpleRouter struct { Producers []Producer Logger logrus.FieldLogger // contains filtered or unexported fields }
SimpleRouter plugin base type
This type defines a common baseclass for routers. All routers should derive from this class, but don't necessarily need to.
Parameters ¶
- Stream: This value specifies the name of the stream this plugin is supposed to read messages from.
- Filters: This value defines an optional list of Filter plugins to connect to this router.
- TimeoutMs: This value sets a timeout in milliseconds until a message should be handled by the router. You can disable this behavior by setting it to "0". By default this parameter is set to "0".
func (*SimpleRouter) AddHealthCheck ¶ added in v0.5.0
func (router *SimpleRouter) AddHealthCheck(callback thealthcheck.CallbackFunc)
AddHealthCheck adds a health check at the default URL (http://<addr>:<port>/<plugin_id>)
func (*SimpleRouter) AddHealthCheckAt ¶ added in v0.5.0
func (router *SimpleRouter) AddHealthCheckAt(path string, callback thealthcheck.CallbackFunc)
AddHealthCheckAt adds a health check at a subpath (http://<addr>:<port>/<plugin_id><path>)
func (*SimpleRouter) AddProducer ¶ added in v0.5.0
func (router *SimpleRouter) AddProducer(producers ...Producer)
AddProducer adds all producers to the list of known producers. Duplicates will be filtered.
func (*SimpleRouter) Configure ¶ added in v0.5.0
func (router *SimpleRouter) Configure(conf PluginConfigReader)
Configure sets up all values required by SimpleRouter.
func (*SimpleRouter) GetID ¶ added in v0.5.0
func (router *SimpleRouter) GetID() string
GetID returns the ID of this router
func (*SimpleRouter) GetLogger ¶ added in v0.5.0
func (router *SimpleRouter) GetLogger() logrus.FieldLogger
GetLogger returns the logging scope of this plugin
func (*SimpleRouter) GetProducers ¶ added in v0.5.0
func (router *SimpleRouter) GetProducers() []Producer
GetProducers returns the producers bound to this stream
func (*SimpleRouter) GetStreamID ¶ added in v0.5.0
func (router *SimpleRouter) GetStreamID() MessageStreamID
GetStreamID returns the id of the stream this plugin is bound to.
func (*SimpleRouter) GetTimeout ¶ added in v0.5.0
func (router *SimpleRouter) GetTimeout() time.Duration
GetTimeout returns the timeout set for this router
func (*SimpleRouter) Modulate ¶ added in v0.5.0
func (router *SimpleRouter) Modulate(msg *Message) ModulateResult
Modulate calls all modulators in their order of definition
type StreamMetric ¶ added in v0.5.0
type StreamMetric struct {
// contains filtered or unexported fields
}
StreamMetric class for stream based metrics
func GetStreamMetric ¶ added in v0.5.0
func GetStreamMetric(streamID MessageStreamID) StreamMetric
GetStreamMetric returns a StreamMetric instance for the given MessageStreamID
func (*StreamMetric) CountMessageDiscarded ¶ added in v0.5.0
func (metric *StreamMetric) CountMessageDiscarded()
CountMessageDiscarded increases the discarded messages counter by 1
func (*StreamMetric) CountMessageRouted ¶ added in v0.5.0
func (metric *StreamMetric) CountMessageRouted()
CountMessageRouted increases the messages counter by 1
type WriterAssembly ¶ added in v0.4.0
type WriterAssembly struct {
// contains filtered or unexported fields
}
WriterAssembly is a helper struct for io.Writer compatible classes that use a message batch.
func NewWriterAssembly ¶ added in v0.4.0
func NewWriterAssembly(writer io.Writer, flush func(*Message), modulator Modulator) WriterAssembly
NewWriterAssembly creates a new adapter between io.Writer and the MessageBatch AssemblyFunc function signature
func (*WriterAssembly) Flush ¶ added in v0.4.0
func (asm *WriterAssembly) Flush(messages []*Message)
Flush is an AssemblyFunc compatible implementation to pass all messages from a MessageBatch to e.g. the Drop function of a producer. Flush will also be called by Write if the io.Writer reported an error.
func (*WriterAssembly) SetErrorHandler ¶ added in v0.4.0
func (asm *WriterAssembly) SetErrorHandler(handleError func(error) bool)
SetErrorHandler sets a callback that is called if an error occurred. handleError needs to return true to prevent messages from being flushed.
func (*WriterAssembly) SetFlush ¶ added in v0.4.0
func (asm *WriterAssembly) SetFlush(flush func(*Message))
SetFlush changes the bound flush function
func (*WriterAssembly) SetValidator ¶ added in v0.4.0
func (asm *WriterAssembly) SetValidator(validate func() bool)
SetValidator sets a callback that is called if a write was successful. validate needs to return true to prevent messages from being flushed.
func (*WriterAssembly) SetWriter ¶ added in v0.4.0
func (asm *WriterAssembly) SetWriter(writer io.Writer)
SetWriter changes the writer interface used during Assemble
func (*WriterAssembly) Write ¶ added in v0.4.0
func (asm *WriterAssembly) Write(messages []*Message)
Write is an AssemblyFunc compatible implementation to pass all messages from a MessageBatch to an io.Writer. Messages are formatted using a given formatter. If the io.Writer fails to write the assembled buffer, all messages are passed to the Flush() method.
Source Files ¶
- batchedproducer.go
- bufferedproducer.go
- config.go
- consumer.go
- directproducer.go
- errors.go
- filter.go
- filtermodulator.go
- filterresult.go
- formatter.go
- formattermodulator.go
- logconsumer.go
- message.go
- message.pb.go
- messagebatch.go
- messagehelper.go
- messagequeue.go
- messagesource.go
- messagetracer.go
- metadata.go
- metrics.go
- modulator.go
- plugin.go
- pluginconfig.go
- pluginconfigreader.go
- pluginconfigreaderwitherror.go
- pluginregistry.go
- pluginstructtag.go
- producer.go
- router.go
- simpleconsumer.go
- simplefilter.go
- simpleformatter.go
- simpleproducer.go
- simplerouter.go
- streamregistry.go
- streams.go
- version.go
- writerassembly.go