core

package
v0.5.2
Published: Mar 30, 2018 License: Apache-2.0 Imports: 26 Imported by: 65

Documentation

Overview

Package core is a generated protocol buffer package.

It is generated from these files:

message.proto

It has these top-level messages:

SerializedMessageData
SerializedMessage

Constants

const (
	// MessageQueueOk is returned if the message could be delivered
	MessageQueueOk = MessageQueueResult(iota)
	// MessageQueueTimeout is returned if a message timed out
	MessageQueueTimeout = MessageQueueResult(iota)
	// MessageQueueDiscard is returned if a message should be discarded
	MessageQueueDiscard = MessageQueueResult(iota)
)
const (
	MetricActiveWorkers      = "Plugins:ActiveWorkers"
	MetricPluginsInit        = "Plugins:State:Initializing"
	MetricPluginsActive      = "Plugins:State:Active"
	MetricPluginsWaiting     = "Plugins:State:Waiting"
	MetricPluginsPrepareStop = "Plugins:State:PrepareStop"
	MetricPluginsStopping    = "Plugins:State:Stopping"
	MetricPluginsDead        = "Plugins:State:Dead"
)

MetricActiveWorkers, MetricPluginsInit, MetricPluginsActive, MetricPluginsWaiting, MetricPluginsPrepareStop, MetricPluginsStopping and MetricPluginsDead are metric name strings.

const (
	// ModulateResultContinue indicates that a message can be passed along.
	ModulateResultContinue = ModulateResult(iota)
	// ModulateResultFallback indicates that a fallback path should be used and
	// that no further modulator should be called.
	ModulateResultFallback = ModulateResult(iota)
	// ModulateResultDiscard indicates that a message should be discarded and
	// that no further modulators should be called.
	ModulateResultDiscard = ModulateResult(iota)
)
const (
	// PluginControlStopProducer will cause any producer to halt and shutdown.
	PluginControlStopProducer = PluginControl(iota)
	// PluginControlStopConsumer will cause any consumer to halt and shutdown.
	PluginControlStopConsumer = PluginControl(iota)
	// PluginControlRoll notifies the consumer/producer about a reconnect or reopen request
	PluginControlRoll = PluginControl(iota)
)
const (

	// PluginStateInitializing is set when a plugin is not yet configured
	PluginStateInitializing = PluginState(iota)
	// PluginStateWaiting is set when a plugin is active but currently unable to process data
	PluginStateWaiting = PluginState(iota)
	// PluginStateActive is set when a plugin is ready to process data
	PluginStateActive = PluginState(iota)
	// PluginStatePrepareStop is set when a plugin should prepare for shutdown
	PluginStatePrepareStop = PluginState(iota)
	// PluginStateStopping is set when a plugin is about to stop
	PluginStateStopping = PluginState(iota)
	// PluginStateDead is set when a plugin is unable to process any data
	PluginStateDead = PluginState(iota)
)
const (
	// PluginStructTagDefault contains the tag name for default values
	PluginStructTagDefault = "default"
	// PluginStructTagMetric contains the tag name for metric values
	PluginStructTagMetric = "metric"
)
const (
	// InvalidStream is used for invalid stream handling and maps to ""
	InvalidStream = ""
	// LogInternalStream is the name of the internal message channel (logs)
	LogInternalStream = "_GOLLUM_"
	// TraceInternalStream is the name of the internal trace channel (-tm flag)
	TraceInternalStream = "_TRACE_"
	// WildcardStream is the name of the "all routers" channel
	WildcardStream = "*"
)
const (
	// FilterResultMessageAccept indicates that a message can be passed along and continue
	FilterResultMessageAccept = FilterResult(0)
)
const (
	MetricMessagesRoutedAvg = "Messages:Routed:AvgPerSec"
)

MetricMessagesRoutedAvg is used as a key for storing message throughput

Variables

var (
	// InvalidStreamID denotes an invalid stream for functions returning stream IDs
	InvalidStreamID = GetStreamID(InvalidStream)
	// LogInternalStreamID is the ID of the "_GOLLUM_" stream
	LogInternalStreamID = GetStreamID(LogInternalStream)
	// WildcardStreamID is the ID of the "*" stream
	WildcardStreamID = GetStreamID(WildcardStream)
	// TraceInternalStreamID is the ID of the "_TRACE_" stream
	TraceInternalStreamID = GetStreamID(TraceInternalStream)
)
var (
	// GeneratedRouterPrefix is prepended to all generated routers
	GeneratedRouterPrefix = "_GENERATED_"
)
var (
	// MessageDataPool is the pool used for message payloads.
	// This pool should be used to allocate temporary buffers for e.g.
	// formatters.
	MessageDataPool = tcontainer.NewBytePoolWithSize(2)
)
var MessageTrace = func(msg *Message, pluginID string, comment string) {}

MessageTrace provides the MessageTrace() function. By default this function does nothing.

var PluginRegistry = pluginRegistry{
	// contains filtered or unexported fields
}

PluginRegistry holds all plugins by their name

var StreamRegistry = streamRegistry{
	// contains filtered or unexported fields
}

StreamRegistry is the global instance of streamRegistry used to store all registered routers.

var TypeRegistry = treflect.NewTypeRegistry()

TypeRegistry is the global typeRegistry instance. Use this instance to register plugins.

Functions

func ActivateMessageTrace added in v0.5.0

func ActivateMessageTrace()

ActivateMessageTrace sets a MessageTrace function that dumps out the message trace.

func CountConsumers added in v0.5.0

func CountConsumers()

CountConsumers increases the consumer counter by 1

func CountFallbackRouters added in v0.5.0

func CountFallbackRouters()

CountFallbackRouters increases the fallback stream counter by 1

func CountMessageDiscarded added in v0.5.0

func CountMessageDiscarded()

CountMessageDiscarded increases the discarded messages counter by 1

func CountMessageRouted added in v0.5.0

func CountMessageRouted()

CountMessageRouted increases the messages counter by 1

func CountMessagesEnqueued added in v0.5.0

func CountMessagesEnqueued()

CountMessagesEnqueued increases the enqueued messages counter by 1

func CountProducers added in v0.5.0

func CountProducers()

CountProducers increases the producer counter by 1

func CountRouters added in v0.5.0

func CountRouters()

CountRouters increases the stream counter by 1

func DeactivateMessageTrace added in v0.5.0

func DeactivateMessageTrace()

DeactivateMessageTrace resets the MessageTrace function to its default. This method is necessary for unit testing.
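The MessageTrace variable together with ActivateMessageTrace and DeactivateMessageTrace follows the common "swappable function variable" pattern. The following is a minimal, self-contained sketch of that pattern only. The names message, traceLog, activateTrace and deactivateTrace are hypothetical stand-ins, and the recording behaviour is an assumption rather than what the package actually dumps.

```go
package main

import "fmt"

// message is a hypothetical stand-in for core.Message.
type message struct{ payload string }

// traceLog collects trace lines so the effect of the hook is observable.
var traceLog []string

// noopTrace mirrors the documented default: it does nothing.
func noopTrace(msg *message, pluginID string, comment string) {}

// messageTrace is a package-level function variable, like core.MessageTrace.
var messageTrace = noopTrace

// activateTrace installs a recording trace function (assumed behaviour).
func activateTrace() {
	messageTrace = func(msg *message, pluginID string, comment string) {
		traceLog = append(traceLog, fmt.Sprintf("%s: %s (%s)", pluginID, comment, msg.payload))
	}
}

// deactivateTrace restores the no-op default, which keeps unit tests isolated.
func deactivateTrace() { messageTrace = noopTrace }

func main() {
	msg := &message{payload: "hello"}
	messageTrace(msg, "consumer.A", "not recorded") // default is a no-op
	activateTrace()
	messageTrace(msg, "producer.B", "enqueued") // recorded
	deactivateTrace()
	messageTrace(msg, "producer.B", "not recorded either")
	fmt.Println(traceLog)
}
```

Because callers always go through the variable, tracing costs a single indirect call when it is disabled.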

func DiscardMessage added in v0.5.0

func DiscardMessage(msg *Message, pluginID string, comment string)

DiscardMessage increases the discard statistic and discards the given message.

func GetVersionNumber added in v0.5.0

func GetVersionNumber() int64

GetVersionNumber returns a version number based on the semantic version.

func GetVersionString added in v0.5.0

func GetVersionString() string

GetVersionString returns a semantic version string.

func Route added in v0.5.0

func Route(msg *Message, router Router) error

Route tries to enqueue a message to the given stream. This function also handles redirections enforced by formatters.

func RouteOriginal added in v0.5.0

func RouteOriginal(msg *Message, router Router) error

RouteOriginal restores the original message and routes it by using a given router.

Types

type AssemblyFunc added in v0.4.0

type AssemblyFunc func([]*Message)

AssemblyFunc is the function signature for callbacks passed to the Flush method.

type AsyncMessageSource

type AsyncMessageSource interface {
	MessageSource

	// EnqueueResponse sends a message to the source of another message.
	EnqueueResponse(msg *Message)
}

AsyncMessageSource extends the MessageSource interface to allow a backchannel that simply forwards any message coming from the producer.

type BatchedProducer added in v0.5.0

type BatchedProducer struct {
	DirectProducer `gollumdoc:"embed_type"`
	Batch          MessageBatch
	// contains filtered or unexported fields
}

BatchedProducer producer

This type defines a common base type that can be used by producers that work better when sending batches of messages instead of single ones. Batched producers will collect messages and flush them after certain limits have been reached. The flush process will be done in the background. Message collection continues non-blocking unless flushing takes longer than filling up the internal buffer again.

Parameters

- Batch/MaxCount: Defines the maximum number of messages per batch. If this limit is reached a flush is always triggered. By default this parameter is set to 8192.

- Batch/FlushCount: Defines the minimum number of messages required to flush a batch. If this limit is reached a flush might be triggered. By default this parameter is set to 4096.

- Batch/TimeoutSec: Defines the maximum time in seconds messages can stay in the internal buffer before being flushed. By default this parameter is set to 5.
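The interaction of the three Batch parameters above can be sketched as a small decision function. This is an illustration under stated assumptions, not the package's implementation: batchLimits, flushDecision and decideFlush are hypothetical names, and "might be triggered" is modelled as a separate mayFlush result.

```go
package main

import (
	"fmt"
	"time"
)

// batchLimits mirrors the documented Batch/* parameters (hypothetical type).
type batchLimits struct {
	maxCount   int           // Batch/MaxCount
	flushCount int           // Batch/FlushCount
	timeout    time.Duration // Batch/TimeoutSec
}

// defaultLimits returns the documented default values.
func defaultLimits() batchLimits {
	return batchLimits{maxCount: 8192, flushCount: 4096, timeout: 5 * time.Second}
}

type flushDecision int

const (
	noFlush   flushDecision = iota // keep collecting
	mayFlush                       // FlushCount reached, flush is allowed
	mustFlush                      // MaxCount or timeout reached
)

// decideFlush maps the number of queued messages and the age of the oldest
// one to a decision. An empty batch is never flushed.
func decideFlush(l batchLimits, queued int, age time.Duration) flushDecision {
	switch {
	case queued <= 0:
		return noFlush
	case queued >= l.maxCount, age >= l.timeout:
		return mustFlush
	case queued >= l.flushCount:
		return mayFlush
	default:
		return noFlush
	}
}

func main() {
	l := defaultLimits()
	fmt.Println(decideFlush(l, 100, time.Second) == noFlush)
	fmt.Println(decideFlush(l, 5000, time.Second) == mayFlush)
	fmt.Println(decideFlush(l, 8192, time.Second) == mustFlush)
	fmt.Println(decideFlush(l, 100, 6*time.Second) == mustFlush)
}
```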

func (*BatchedProducer) BatchMessageLoop added in v0.5.0

func (prod *BatchedProducer) BatchMessageLoop(workers *sync.WaitGroup, onBatchFlush func() AssemblyFunc)

BatchMessageLoop starts the TickerMessageControlLoop() for a batched producer.

func (*BatchedProducer) Configure added in v0.5.0

func (prod *BatchedProducer) Configure(conf PluginConfigReader)

Configure initializes the standard producer config values.

func (*BatchedProducer) DefaultClose added in v0.5.0

func (prod *BatchedProducer) DefaultClose()

DefaultClose defines the default closing process

func (*BatchedProducer) Enqueue added in v0.5.0

func (prod *BatchedProducer) Enqueue(msg *Message, timeout time.Duration)

Enqueue will add the message to the internal channel so it can be processed by the producer main loop. A timeout value != nil will overwrite the channel timeout value for this call.

type BufferedProducer added in v0.5.0

type BufferedProducer struct {
	DirectProducer `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

BufferedProducer plugin base type

This type defines a common BufferedProducer base class. Producers may derive from this class.

Parameters

- Channel: This value defines the capacity of the message buffer. By default this parameter is set to "8192".

- ChannelTimeoutMs: This value defines a timeout for each message before the message will be discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to "0".

func (*BufferedProducer) CloseMessageChannel added in v0.5.0

func (prod *BufferedProducer) CloseMessageChannel(handleMessage func(*Message)) (empty bool)

CloseMessageChannel first calls DrainMessageChannel with the shutdown timeout, closes the channel afterwards and calls DrainMessageChannel again to make sure all messages are actually gone. The return value indicates whether the channel is empty or not.

func (*BufferedProducer) Configure added in v0.5.0

func (prod *BufferedProducer) Configure(conf PluginConfigReader)

Configure initializes the standard producer config values.

func (*BufferedProducer) DefaultClose added in v0.5.0

func (prod *BufferedProducer) DefaultClose()

DefaultClose is the function registered to onStop by default. It calls CloseMessageChannel with the message handling function passed to any of the control functions. If no such call happens, this function does nothing.

func (*BufferedProducer) DefaultDrain added in v0.5.0

func (prod *BufferedProducer) DefaultDrain()

DefaultDrain is the function registered to onPrepareStop by default. It calls DrainMessageChannel with the message handling function passed to any of the control functions. If no such call happens, this function does nothing.

func (*BufferedProducer) DrainMessageChannel added in v0.5.0

func (prod *BufferedProducer) DrainMessageChannel(handleMessage func(*Message), timeout time.Duration) bool

DrainMessageChannel empties the message channel. This function returns after the queue has been empty for a given amount of time or when the queue has been closed and no more messages are available. The return value indicates whether the channel is empty or not.
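The drain behaviour described above (return once the queue stayed empty for a given time, or once it is closed and exhausted) can be sketched with a plain channel. This is a hedged illustration, not the package's code: drain, its parameters and the use of string messages are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// drain hands every queued message to handle. It returns when the queue has
// been idle for the given duration, or when it is closed and fully consumed.
// The result reports whether the queue was empty at that point.
func drain(queue <-chan string, handle func(string), idle time.Duration) bool {
	for {
		select {
		case msg, ok := <-queue:
			if !ok {
				return true // closed and nothing left
			}
			handle(msg)
		case <-time.After(idle): // a fresh idle timer for every iteration
			return len(queue) == 0
		}
	}
}

func main() {
	queue := make(chan string, 4)
	queue <- "a"
	queue <- "b"
	queue <- "c"

	handled := 0
	empty := drain(queue, func(string) { handled++ }, 10*time.Millisecond)
	fmt.Println(handled, empty)

	close(queue)
	fmt.Println(drain(queue, func(string) { handled++ }, 10*time.Millisecond))
}
```

Calling drain once before and once after close matches the two-step sequence described for CloseMessageChannel.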

func (*BufferedProducer) Enqueue added in v0.5.0

func (prod *BufferedProducer) Enqueue(msg *Message, timeout time.Duration)

Enqueue will add the message to the internal channel so it can be processed by the producer main loop. A timeout value != nil will overwrite the channel timeout value for this call.

func (*BufferedProducer) GetQueueTimeout added in v0.5.0

func (prod *BufferedProducer) GetQueueTimeout() time.Duration

GetQueueTimeout returns the duration this producer will block before a message is sent to the fallback. A value of -1 will cause the message to drop. A value of 0 will cause the producer to always block.

func (*BufferedProducer) MessageControlLoop added in v0.5.0

func (prod *BufferedProducer) MessageControlLoop(onMessage func(*Message))

MessageControlLoop provides a producer main loop that is sufficient for most use cases. ControlLoop will be called in a separate go routine. This function will block until a stop signal is received.

func (*BufferedProducer) TickerMessageControlLoop added in v0.5.0

func (prod *BufferedProducer) TickerMessageControlLoop(onMessage func(*Message), interval time.Duration, onTimeOut func())

TickerMessageControlLoop is like MessageControlLoop but also executes a given function (onTimeOut) at every interval tick. If that function takes longer than interval, the next tick will be delayed until it finishes.

type Config

type Config struct {
	Values  map[string]tcontainer.MarshalMap
	Plugins []PluginConfig
}

Config represents the top level config containing all plugin configs.

func ReadConfig

func ReadConfig(buffer []byte) (*Config, error)

ReadConfig creates a config from a yaml byte stream.

func ReadConfigFromFile added in v0.5.0

func ReadConfigFromFile(path string) (*Config, error)

ReadConfigFromFile parses a YAML config file into a new Config struct.

func (*Config) GetConsumers added in v0.5.0

func (conf *Config) GetConsumers() []PluginConfig

GetConsumers returns all consumer plugins from the config

func (*Config) GetProducers added in v0.5.0

func (conf *Config) GetProducers() []PluginConfig

GetProducers returns all producer plugins from the config

func (*Config) GetRouters added in v0.5.0

func (conf *Config) GetRouters() []PluginConfig

GetRouters returns all stream plugins from the config

func (*Config) Validate added in v0.5.0

func (conf *Config) Validate() error

Validate checks all plugin configs and plugins for validity, i.e. it checks for mandatory fields and for a correct implementation of the consumer, producer or stream interface. It does NOT call Configure for each plugin.

type Configurable added in v0.5.0

type Configurable interface {
	// Configure is called during NewPluginWithType
	Configure(PluginConfigReader)
}

Configurable defines an interface for structs that can be configured using a PluginConfigReader.

type Consumer

type Consumer interface {
	PluginWithState
	MessageSource

	// Consume should implement the main loop that fetches messages from a given
	// source and pushes them to the Message channel.
	Consume(*sync.WaitGroup)

	// Control returns write access to this consumer's control channel.
	// See PluginControl* constants.
	Control() chan<- PluginControl

	// GetShutdownTimeout returns the duration gollum will wait for this consumer
	// before canceling the shutdown process.
	GetShutdownTimeout() time.Duration
}

Consumer is an interface for plugins that receive data from outside sources and generate Message objects from this data.

type DirectProducer added in v0.5.0

type DirectProducer struct {
	SimpleProducer `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

DirectProducer plugin base type

This type defines a common base class for producers.

func (*DirectProducer) Configure added in v0.5.0

func (prod *DirectProducer) Configure(conf PluginConfigReader)

Configure initializes the standard producer config values.

func (*DirectProducer) Enqueue added in v0.5.0

func (prod *DirectProducer) Enqueue(msg *Message, timeout time.Duration)

Enqueue will add the message to the internal channel so it can be processed by the producer main loop. A timeout value != nil will overwrite the channel timeout value for this call.

func (*DirectProducer) MessageControlLoop added in v0.5.0

func (prod *DirectProducer) MessageControlLoop(onMessage func(*Message))

MessageControlLoop provides a producer main loop that is sufficient for most use cases. ControlLoop will be called in a separate go routine. This function will block until a stop signal is received.

func (*DirectProducer) TickerMessageControlLoop added in v0.5.0

func (prod *DirectProducer) TickerMessageControlLoop(onMessage func(*Message),
	interval time.Duration, onTimeOut func())

TickerMessageControlLoop is like MessageControlLoop but also executes a given function (onTimeOut) at every interval tick. If that function takes longer than interval, the next tick will be delayed until it finishes.

type Filter

type Filter interface {
	ApplyFilter(msg *Message) (FilterResult, error)
}

A Filter defines an analysis step inside the message pipeline. A Filter also has to implement the modulator interface.

type FilterArray added in v0.5.0

type FilterArray []Filter

FilterArray is a type wrapper around []Filter that represents an array of filters.

func (FilterArray) ApplyFilter added in v0.5.0

func (filters FilterArray) ApplyFilter(msg *Message) (FilterResult, error)

ApplyFilter calls ApplyFilter on every filter. It returns FilterResultMessageReject in case of an error or if one filter rejects.
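The "every filter must accept" rule can be sketched as a short-circuiting loop. This is a simplified illustration: filterResult, filter, applyAll and the two example filters are hypothetical, and the real FilterResult additionally carries a stream ID for rejected messages, which is left out here.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

type filterResult int

const (
	accept filterResult = iota
	reject
)

// filter is a hypothetical, function-based stand-in for the Filter interface.
type filter func(payload string) (filterResult, error)

// applyAll runs the filters in order and stops at the first error or
// rejection. Only a payload accepted by every filter is accepted.
func applyAll(filters []filter, payload string) (filterResult, error) {
	for _, f := range filters {
		result, err := f(payload)
		if err != nil {
			return reject, err
		}
		if result != accept {
			return result, nil
		}
	}
	return accept, nil
}

// nonEmpty treats an empty payload as an error.
func nonEmpty(payload string) (filterResult, error) {
	if payload == "" {
		return reject, errors.New("empty payload")
	}
	return accept, nil
}

// noDebug rejects payloads that start with "DEBUG".
func noDebug(payload string) (filterResult, error) {
	if strings.HasPrefix(payload, "DEBUG") {
		return reject, nil
	}
	return accept, nil
}

func main() {
	chain := []filter{nonEmpty, noDebug}
	for _, p := range []string{"INFO started", "DEBUG noisy", ""} {
		result, err := applyAll(chain, p)
		fmt.Printf("%q -> accepted=%t err=%v\n", p, result == accept, err)
	}
}
```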

type FilterModulator added in v0.5.0

type FilterModulator struct {
	Filter Filter
}

FilterModulator is a wrapper to provide a Filter as a Modulator

func NewFilterModulator added in v0.5.0

func NewFilterModulator(filter Filter) *FilterModulator

NewFilterModulator returns an instance of FilterModulator.

func (*FilterModulator) ApplyFilter added in v0.5.0

func (filterModulator *FilterModulator) ApplyFilter(msg *Message) (FilterResult, error)

ApplyFilter calls the Filter.ApplyFilter method

func (*FilterModulator) Modulate added in v0.5.0

func (filterModulator *FilterModulator) Modulate(msg *Message) ModulateResult

Modulate implementation for Filters
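Wrapping a filter so that it can be used wherever a modulator is expected is an adapter pattern. The sketch below shows the idea with hypothetical local types. The mapping of "rejected or failed" to a discard result is an assumption made for illustration, and the real Modulate may also select a fallback path.

```go
package main

import "fmt"

// message is a hypothetical stand-in for core.Message.
type message struct{ payload string }

// filter and modulator are simplified stand-ins for the package interfaces.
type filter interface {
	applyFilter(msg *message) (accepted bool, err error)
}

type modulateResult int

const (
	modulateContinue modulateResult = iota
	modulateDiscard
)

type modulator interface {
	modulate(msg *message) modulateResult
}

// filterModulator adapts a filter to the modulator interface.
type filterModulator struct{ inner filter }

// modulate translates the filter outcome into a modulate result.
// Assumption: errors and rejections both lead to a discard.
func (fm *filterModulator) modulate(msg *message) modulateResult {
	accepted, err := fm.inner.applyFilter(msg)
	if err != nil || !accepted {
		return modulateDiscard
	}
	return modulateContinue
}

// maxLength is an example filter that accepts payloads up to limit bytes.
type maxLength struct{ limit int }

func (f maxLength) applyFilter(msg *message) (bool, error) {
	return len(msg.payload) <= f.limit, nil
}

func main() {
	var m modulator = &filterModulator{inner: maxLength{limit: 5}}
	fmt.Println(m.modulate(&message{payload: "short"}) == modulateContinue)
	fmt.Println(m.modulate(&message{payload: "too long"}) == modulateDiscard)
}
```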

type FilterResult added in v0.5.0

type FilterResult uint64

FilterResult defines a set of results for filtering

func FilterResultMessageReject added in v0.5.0

func FilterResultMessageReject(stream MessageStreamID) FilterResult

FilterResultMessageReject indicates that a message is filtered and not passed along the stream.

func (FilterResult) GetStreamID added in v0.5.0

func (f FilterResult) GetStreamID() MessageStreamID

GetStreamID converts a FilterResult to a stream id

type Formatter

type Formatter interface {
	ApplyFormatter(msg *Message) error
	CanBeApplied(msg *Message) bool
}

A Formatter defines a modification step inside the message pipeline. A Formatter also has to implement the modulator interface.

type FormatterArray added in v0.5.0

type FormatterArray []Formatter

FormatterArray is a type wrapper around []Formatter that represents an array of formatters.

func (FormatterArray) ApplyFormatter added in v0.5.0

func (formatters FormatterArray) ApplyFormatter(msg *Message) error

ApplyFormatter calls ApplyFormatter on every formatter

func (FormatterArray) CanBeApplied added in v0.5.0

func (formatters FormatterArray) CanBeApplied(msg *Message) bool

CanBeApplied returns true if the array is not empty

type FormatterModulator added in v0.5.0

type FormatterModulator struct {
	Formatter Formatter
}

FormatterModulator is a wrapper to provide a Formatter as a Modulator

func NewFormatterModulator added in v0.5.0

func NewFormatterModulator(formatter Formatter) *FormatterModulator

NewFormatterModulator returns an instance of FormatterModulator.

func (*FormatterModulator) ApplyFormatter added in v0.5.0

func (formatterModulator *FormatterModulator) ApplyFormatter(msg *Message) error

ApplyFormatter calls the Formatter.ApplyFormatter method

func (*FormatterModulator) CanBeApplied added in v0.5.0

func (formatterModulator *FormatterModulator) CanBeApplied(msg *Message) bool

CanBeApplied returns true if the array is not empty

func (*FormatterModulator) Modulate added in v0.5.0

func (formatterModulator *FormatterModulator) Modulate(msg *Message) ModulateResult

Modulate implementation for Formatter

type GetAppliedContent added in v0.5.0

type GetAppliedContent func(msg *Message) []byte

GetAppliedContent is a function type used to get message content from the payload or metadata for later handling by plugins.

func GetAppliedContentGetFunction added in v0.5.0

func GetAppliedContentGetFunction(applyTo string) GetAppliedContent

GetAppliedContentGetFunction returns a GetAppliedContent function

type LogConsumer

type LogConsumer struct {
	Consumer
	// contains filtered or unexported fields
}

LogConsumer is an internal consumer plugin used indirectly by the gollum log package.

func (*LogConsumer) Configure

func (cons *LogConsumer) Configure(conf PluginConfigReader)

Configure initializes this consumer with values from a plugin config.

func (*LogConsumer) Consume

func (cons *LogConsumer) Consume(threads *sync.WaitGroup)

Consume starts listening for control statements

func (*LogConsumer) Control

func (cons *LogConsumer) Control() chan<- PluginControl

Control returns a handle to the control channel

func (*LogConsumer) Fire added in v0.5.0

func (cons *LogConsumer) Fire(logrusEntry *logrus.Entry) error

Fire and Levels() implement the logrus.Hook interface

func (*LogConsumer) GetID added in v0.5.0

func (cons *LogConsumer) GetID() string

GetID returns the pluginID of the message source

func (*LogConsumer) GetShutdownTimeout added in v0.5.0

func (cons *LogConsumer) GetShutdownTimeout() time.Duration

GetShutdownTimeout always returns 1 millisecond

func (*LogConsumer) GetState added in v0.4.0

func (cons *LogConsumer) GetState() PluginState

GetState always returns PluginStateActive

func (*LogConsumer) IsBlocked added in v0.5.0

func (cons *LogConsumer) IsBlocked() bool

IsBlocked always returns false

func (*LogConsumer) Levels added in v0.5.0

func (cons *LogConsumer) Levels() []logrus.Level

Levels and Fire() implement the logrus.Hook interface

func (*LogConsumer) Streams

func (cons *LogConsumer) Streams() []MessageStreamID

Streams always returns an array with one member - the internal log stream

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message is a container used for storing the internal state of messages. This struct is passed between consumers and producers.

func DeserializeMessage added in v0.4.0

func DeserializeMessage(data []byte) (*Message, error)

DeserializeMessage generates a message from a byte array produced by Message.Serialize. Please note that the payload is restored but the original data is not. Because of this, FreezeOriginal can be called again after this call.

func NewMessage

func NewMessage(source MessageSource, data []byte, metadata Metadata, streamID MessageStreamID) *Message

NewMessage creates a new message from a given data stream by copying data.

func (*Message) Clone added in v0.5.0

func (msg *Message) Clone() *Message

Clone returns a copy of this message, i.e. the payload is duplicated. The created timestamp is copied, too.

func (*Message) CloneOriginal added in v0.5.0

func (msg *Message) CloneOriginal() *Message

CloneOriginal returns a copy of this message with the original payload and stream. If FreezeOriginal has not been called before, it will be called at this point so that all subsequent calls will use the same original.

func (*Message) ExtendPayload added in v0.5.0

func (msg *Message) ExtendPayload(size int) []byte

ExtendPayload changes the size of the stored buffer. The current content will be preserved. If content does not need to be preserved, use ResizePayload.

func (*Message) FreezeOriginal added in v0.5.0

func (msg *Message) FreezeOriginal()

FreezeOriginal will take the current state of the message and store it as the "original" message. Please note that this function only affects payload and metadata and can only be called once for each message. Additional calls will have no effect. The original stream ID can be changed at any time by using SetOrigStreamID.
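The freeze-once behaviour, together with CloneOriginal freezing on demand, can be sketched as follows. This is a minimal model with hypothetical names (message, freezeOriginal, cloneOriginal) that tracks the payload only. Metadata and stream IDs are left out.

```go
package main

import "fmt"

// message is a hypothetical, payload-only model of core.Message.
type message struct {
	payload  []byte
	original []byte
	frozen   bool
}

func newMessage(data string) *message {
	return &message{payload: []byte(data)}
}

// freezeOriginal snapshots the current payload. Only the first call has an
// effect, later calls are ignored.
func (m *message) freezeOriginal() {
	if m.frozen {
		return
	}
	m.original = append([]byte(nil), m.payload...)
	m.frozen = true
}

// cloneOriginal returns a new message carrying the frozen payload. If nothing
// has been frozen yet, the current state becomes the original.
func (m *message) cloneOriginal() *message {
	m.freezeOriginal()
	return &message{payload: append([]byte(nil), m.original...)}
}

func main() {
	msg := newMessage("raw input")
	msg.freezeOriginal()
	msg.payload = []byte("formatted output")
	msg.freezeOriginal() // ignored, the original is already frozen
	fmt.Printf("current=%q original=%q\n", msg.payload, msg.cloneOriginal().payload)
}
```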

func (*Message) GetCreationTime added in v0.5.0

func (msg *Message) GetCreationTime() time.Time

GetCreationTime returns the time when this message was created.

func (*Message) GetMetadata added in v0.5.0

func (msg *Message) GetMetadata() Metadata

GetMetadata returns the current Metadata. If no metadata is present, the metadata map will be created by this call.

func (*Message) GetOrigRouter added in v0.5.0

func (msg *Message) GetOrigRouter() Router

GetOrigRouter returns the stream object behind the original StreamID.

func (*Message) GetOrigStreamID added in v0.5.0

func (msg *Message) GetOrigStreamID() MessageStreamID

GetOrigStreamID returns the original/first streamID

func (*Message) GetPayload added in v0.5.0

func (msg *Message) GetPayload() []byte

GetPayload returns the stored data

func (*Message) GetPrevRouter added in v0.5.0

func (msg *Message) GetPrevRouter() Router

GetPrevRouter returns the stream object behind the previous StreamID.

func (*Message) GetPrevStreamID added in v0.5.0

func (msg *Message) GetPrevStreamID() MessageStreamID

GetPrevStreamID returns the last "hop" of this message.

func (*Message) GetRouter added in v0.5.0

func (msg *Message) GetRouter() Router

GetRouter returns the stream object behind the current StreamID.

func (*Message) GetSource added in v0.5.0

func (msg *Message) GetSource() MessageSource

GetSource returns the message's source (can be nil).

func (*Message) GetStreamID added in v0.5.0

func (msg *Message) GetStreamID() MessageStreamID

GetStreamID returns the stream this message is currently routed to.

func (*Message) ResizePayload added in v0.5.0

func (msg *Message) ResizePayload(size int) []byte

ResizePayload changes the size of the stored buffer. The current content is not guaranteed to be preserved. If content needs to be preserved, use ExtendPayload.
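The difference between extending and resizing a buffer can be shown on plain byte slices. This is a sketch of the general technique only. extendPayload and resizePayload are hypothetical free functions, and the package may manage its buffers differently (for example through MessageDataPool).

```go
package main

import "fmt"

// extendPayload returns a buffer of the requested size that still starts
// with the old content.
func extendPayload(buf []byte, size int) []byte {
	if size <= cap(buf) {
		return buf[:size] // enough capacity, reuse the backing array
	}
	grown := make([]byte, size)
	copy(grown, buf) // preserve the existing content
	return grown
}

// resizePayload returns a buffer of the requested size without copying, so
// the old content may be lost when a new buffer has to be allocated.
func resizePayload(buf []byte, size int) []byte {
	if size <= cap(buf) {
		return buf[:size]
	}
	return make([]byte, size)
}

func main() {
	buf := make([]byte, 3, 3)
	copy(buf, "abc")

	extended := extendPayload(buf, 6)
	resized := resizePayload(buf, 6)
	fmt.Printf("extended keeps %q, lengths: %d and %d\n", extended[:3], len(extended), len(resized))
}
```

Skipping the copy is what makes the resize variant cheaper when the caller is going to overwrite the whole buffer anyway.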

func (*Message) Serialize added in v0.4.0

func (msg *Message) Serialize() ([]byte, error)

Serialize generates a new payload containing all data that can be preserved over shutdown (i.e. no data directly referencing runtime components). The serialized data is based on the current message state and does not preserve the original data created by FreezeOriginal.

func (*Message) SetStreamID added in v0.5.0

func (msg *Message) SetStreamID(streamID MessageStreamID)

SetStreamID sets a new stream and stores the current one in the previous stream field. This method does not affect the original stream ID.

func (*Message) SetlStreamIDAsOriginal added in v0.5.0

func (msg *Message) SetlStreamIDAsOriginal(streamID MessageStreamID)

SetlStreamIDAsOriginal acts like SetStreamID but always sets the original stream ID, too. This method should be used before a message is routed for the first time (e.g. in a consumer) or when the original stream should change.

func (*Message) StorePayload added in v0.5.0

func (msg *Message) StorePayload(data []byte)

StorePayload copies data into the message's data buffer. If the existing buffer can hold data it is resized, otherwise a new buffer will be allocated.

func (*Message) String

func (msg *Message) String() string

String implements the stringer interface

func (*Message) TryGetMetadata added in v0.5.0

func (msg *Message) TryGetMetadata() Metadata

TryGetMetadata returns the current Metadata. If no metadata is present, nil will be returned.

type MessageBatch

type MessageBatch struct {
	// contains filtered or unexported fields
}

MessageBatch is a helper class for producers to format and store messages into a single buffer that is flushed to an io.Writer. You can use the Reached* functions to determine whether a flush should be called, i.e. if a timeout or size threshold has been reached.

func NewMessageBatch

func NewMessageBatch(maxMessageCount int) MessageBatch

NewMessageBatch creates a new MessageBatch that can hold up to maxMessageCount messages.

func (*MessageBatch) AfterFlushDo added in v0.4.2

func (batch *MessageBatch) AfterFlushDo(callback func() error) error

AfterFlushDo calls a function after a currently running flush is done. It also blocks any flush during the execution of callback. It returns the error returned by callback.

func (*MessageBatch) Append

func (batch *MessageBatch) Append(msg *Message) bool

Append formats a message and appends it to the internal buffer. If the message does not fit into the buffer this function returns false. If the message can never fit into the buffer (too large), true is returned and an error is logged.

func (*MessageBatch) AppendOrBlock added in v0.4.0

func (batch *MessageBatch) AppendOrBlock(msg *Message) bool

AppendOrBlock works like Append but will block until Append returns true. If the batch was closed during this call, false is returned.

func (*MessageBatch) AppendOrFlush added in v0.4.0

func (batch *MessageBatch) AppendOrFlush(msg *Message, flushBuffer func(), canBlock func() bool, tryFallback func(*Message))

AppendOrFlush is a common combinatorial pattern of Append and AppendOrBlock. If append fails, flush is called, followed by AppendOrBlock if blocking is allowed. If AppendOrBlock fails (or blocking is not allowed) the message is sent to the fallback.
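The append, flush, retry, fallback sequence described above can be sketched with a small bounded batch. This is a simplified, single-goroutine model under stated assumptions: batch, add and addOrFlush are hypothetical, flushing is synchronous, and the retry is a plain second attempt instead of a blocking AppendOrBlock.

```go
package main

import "fmt"

// batch is a hypothetical bounded buffer of messages.
type batch struct {
	items []string
	limit int
}

// add stores msg and reports false when the batch is full.
func (b *batch) add(msg string) bool {
	if len(b.items) >= b.limit {
		return false
	}
	b.items = append(b.items, msg)
	return true
}

// addOrFlush tries to add msg. When the batch is full it flushes, retries if
// blocking is allowed, and hands the message to tryFallback otherwise.
func (b *batch) addOrFlush(msg string, flush func(), canBlock func() bool, tryFallback func(string)) {
	if b.add(msg) {
		return
	}
	flush()
	if canBlock() && b.add(msg) {
		return
	}
	tryFallback(msg)
}

func main() {
	b := &batch{limit: 2}
	var flushed, dropped []string
	flush := func() {
		flushed = append(flushed, b.items...)
		b.items = b.items[:0]
	}
	fallback := func(msg string) { dropped = append(dropped, msg) }
	always := func() bool { return true }

	for _, msg := range []string{"a", "b", "c"} {
		b.addOrFlush(msg, flush, always, fallback)
	}
	fmt.Println(flushed, b.items, dropped)
}
```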

func (*MessageBatch) Close added in v0.4.0

func (batch *MessageBatch) Close(assemble AssemblyFunc, timeout time.Duration)

Close disables Append, calls flush and waits for this call to finish. Timeout is passed to WaitForFlush.

func (*MessageBatch) Flush

func (batch *MessageBatch) Flush(assemble AssemblyFunc)

Flush writes the content of the buffer to a given resource and resets the internal state, i.e. the buffer is empty after a call to Flush. Writing will be done in a separate go routine to be non-blocking.

The validate callback will be called after messages have been successfully written to the io.Writer. If validate returns false the buffer will not be reset (automatic retry). If validate is nil a return value of true is assumed (buffer reset).

The onError callback will be called if the io.Writer returned an error. If onError returns false the buffer will not be reset (automatic retry). If onError is nil a return value of true is assumed (buffer reset).

func (MessageBatch) IsClosed added in v0.4.0

func (batch MessageBatch) IsClosed() bool

IsClosed returns true if Close has been called at least once.

func (MessageBatch) IsEmpty

func (batch MessageBatch) IsEmpty() bool

IsEmpty returns true if no data is stored in the front buffer, i.e. if no data is scheduled for flushing.

func (*MessageBatch) Len added in v0.4.0

func (batch *MessageBatch) Len() int

Len returns the length of one buffer

func (MessageBatch) ReachedSizeThreshold

func (batch MessageBatch) ReachedSizeThreshold(size int) bool

ReachedSizeThreshold returns true if the bytes stored in the buffer are above or equal to the size given. If there is no data this function returns false.

func (MessageBatch) ReachedTimeThreshold

func (batch MessageBatch) ReachedTimeThreshold(timeout time.Duration) bool

ReachedTimeThreshold returns true if the last flush was more than timeout ago. If there is no data this function returns false.

func (*MessageBatch) Touch

func (batch *MessageBatch) Touch()

Touch resets the timer queried by ReachedTimeThreshold, i.e. this resets the automatic flush timeout

func (*MessageBatch) WaitForFlush

func (batch *MessageBatch) WaitForFlush(timeout time.Duration)

WaitForFlush blocks until the current flush command returns. Passing a timeout > 0 will unblock this function after the given duration at the latest.

type MessageData added in v0.5.0

type MessageData struct {
	// contains filtered or unexported fields
}

MessageData is a container for the message payload, streamID and an optional message key. The struct is used by Message.data for the current message data and by orig for the original message data.

type MessageQueue added in v0.5.0

type MessageQueue chan *Message

MessageQueue is the type used for transferring messages between plugins

func NewMessageQueue added in v0.5.0

func NewMessageQueue(capacity int) MessageQueue

NewMessageQueue creates a new message buffer of the given capacity

func (MessageQueue) Close added in v0.5.0

func (channel MessageQueue) Close()

Close stops the buffer from being able to receive messages

func (MessageQueue) GetNumQueued added in v0.5.0

func (channel MessageQueue) GetNumQueued() int

GetNumQueued returns the number of queued messages. Please note that this information can be extremely volatile in multithreaded environments.

func (MessageQueue) IsEmpty added in v0.5.0

func (channel MessageQueue) IsEmpty() bool

IsEmpty returns true if no element is currently stored in the channel. Please note that this information can be extremely volatile in multithreaded environments.

func (MessageQueue) Pop added in v0.5.0

func (channel MessageQueue) Pop() (*Message, bool)

Pop returns a message from the buffer

func (MessageQueue) PopWithTimeout added in v0.5.0

func (channel MessageQueue) PopWithTimeout(maxDuration time.Duration) (*Message, bool)

PopWithTimeout returns a message from the buffer with a runtime <= maxDuration. If the channel is empty or the timeout is hit, the second return value is false.
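A bounded wait on a channel is commonly written with select and a timer. The sketch below shows that pattern under the assumption that the queue is a plain buffered channel; queue and popWithTimeout are hypothetical names and strings stand in for *Message.

```go
package main

import (
	"fmt"
	"time"
)

// queue is a hypothetical stand-in for a channel based message queue.
type queue chan string

// popWithTimeout waits at most maxDuration for a message. The second
// return value is false if nothing arrived in time or the queue is closed.
func (q queue) popWithTimeout(maxDuration time.Duration) (string, bool) {
	timer := time.NewTimer(maxDuration)
	defer timer.Stop()

	select {
	case msg, ok := <-q:
		return msg, ok
	case <-timer.C:
		return "", false
	}
}

func main() {
	q := make(queue, 1)
	q <- "hello"

	msg, ok := q.popWithTimeout(50 * time.Millisecond)
	fmt.Println(msg, ok)

	// The queue is empty now, so this call returns after the timeout.
	msg, ok = q.popWithTimeout(time.Millisecond)
	fmt.Println(msg, ok)
}
```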

func (MessageQueue) Push added in v0.5.0

func (channel MessageQueue) Push(msg *Message, timeout time.Duration) (state MessageQueueResult)

Push adds a message to the MessageStream, waiting for a timeout instead of just blocking. Passing a timeout of -1 will discard the message. Passing a timeout of 0 will always block. Messages that time out will be sent to the fallback queue if a Dropped consumer exists. The source parameter is used when a message is sent to the fallback, i.e. it is passed to the Drop function.
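The three timeout modes can be sketched on a plain channel. This is an illustration, not this package's code: queue, push and the result constants are hypothetical names that mirror MessageQueueOk, MessageQueueTimeout and MessageQueueDiscard. One point is an assumption, because the text above only says the message is discarded: here a negative timeout tries once without waiting and discards only if the queue is full.

```go
package main

import (
	"fmt"
	"time"
)

// queue is a hypothetical stand-in for a channel based message queue.
type queue chan string

type result int

const (
	resultOk result = iota
	resultTimeout
	resultDiscard
)

// push adds msg to the queue according to the timeout mode.
func (q queue) push(msg string, timeout time.Duration) result {
	switch {
	case timeout == 0:
		q <- msg // block until there is room
		return resultOk

	case timeout < 0:
		// Assumption: try once, never wait.
		select {
		case q <- msg:
			return resultOk
		default:
			return resultDiscard
		}
	}

	// Positive timeout: wait for room, but not longer than timeout.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case q <- msg:
		return resultOk
	case <-timer.C:
		return resultTimeout
	}
}

func main() {
	q := make(queue, 1)
	fmt.Println(q.push("a", 0) == resultOk)
	fmt.Println(q.push("b", -1) == resultDiscard)
	fmt.Println(q.push("c", time.Millisecond) == resultTimeout)
}
```

Handling a timed-out or discarded message (for example routing it to a fallback) is left to the caller in this sketch.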

type MessageQueueResult added in v0.5.0

type MessageQueueResult int

MessageQueueResult is used as a return value for the Enqueue method

type MessageSource

type MessageSource interface {
	// IsActive returns true if the source can produce messages
	IsActive() bool

	// IsBlocked returns true if the source cannot produce messages
	IsBlocked() bool

	// GetID returns the pluginID of the message source
	GetID() string
}

MessageSource defines methods that are common to all message sources. Currently this is only a placeholder.

type MessageStreamID

type MessageStreamID uint64

MessageStreamID is the "compiled name" of a stream

func GetStreamID

func GetStreamID(stream string) MessageStreamID

GetStreamID is deprecated

func (MessageStreamID) GetName added in v0.5.0

func (streamID MessageStreamID) GetName() string

GetName resolves the name of the streamID
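One way to picture a "compiled name" is a hash of the stream name plus a registry for the reverse lookup. The sketch below follows that idea. The choice of FNV-1a and the names streamID, getStreamID and registry are assumptions made for illustration; nothing above states how this package actually derives its IDs.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// streamID is a hypothetical stand-in for a compiled stream name.
type streamID uint64

var (
	registryMu sync.RWMutex
	registry   = map[streamID]string{} // reverse lookup: ID to name
)

// getStreamID hashes the name (assumption: FNV-1a) and remembers the
// mapping so the name can be resolved later.
func getStreamID(name string) streamID {
	h := fnv.New64a()
	h.Write([]byte(name))
	id := streamID(h.Sum64())

	registryMu.Lock()
	registry[id] = name
	registryMu.Unlock()
	return id
}

// getName resolves an ID back to its name, or "" if it is unknown.
func (id streamID) getName() string {
	registryMu.RLock()
	defer registryMu.RUnlock()
	return registry[id]
}

func main() {
	id := getStreamID("access_log")
	fmt.Println(id.getName(), id == getStreamID("access_log"))
}
```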

type Metadata added in v0.5.0

type Metadata map[string][]byte

Metadata is a map for optional metadata which can be set by consumers and modulators.

func (Metadata) Clone added in v0.5.0

func (meta Metadata) Clone() (clone Metadata)

Clone creates an exact copy of this metadata map.

func (Metadata) Delete added in v0.5.0

func (meta Metadata) Delete(key string)

Delete removes the given key from the map

func (Metadata) GetValue added in v0.5.0

func (meta Metadata) GetValue(key string) []byte

GetValue returns a metadata value by key. A value is returned even if the key is not set; in that case it is an empty byte array.

func (Metadata) GetValueString added in v0.5.0

func (meta Metadata) GetValueString(key string) string

GetValueString casts the results of GetValue to a string

func (Metadata) SetValue added in v0.5.0

func (meta Metadata) SetValue(key string, value []byte)

SetValue sets a key/value pair in the metadata.

func (Metadata) TryGetValue added in v0.5.0

func (meta Metadata) TryGetValue(key string) ([]byte, bool)

TryGetValue behaves like GetValue but returns a second value which denotes if the key was set or not.

func (Metadata) TryGetValueString added in v0.5.0

func (meta Metadata) TryGetValueString(key string) (string, bool)

TryGetValueString casts the data result of TryGetValue to string

func (Metadata) TrySetValue added in v0.5.0

func (meta Metadata) TrySetValue(key string, value []byte) bool

TrySetValue sets a key/value pair only if the key already exists.
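The difference between the plain and the Try variants is easiest to see side by side. The sketch below re-creates the documented behavior on a local type; metadata and its lowercase methods are hypothetical stand-ins, not this package's code.

```go
package main

import "fmt"

// metadata mirrors the declared type map[string][]byte.
type metadata map[string][]byte

// tryGetValue returns the value and whether the key was set. For a
// missing key the value is an empty byte array.
func (m metadata) tryGetValue(key string) ([]byte, bool) {
	if v, ok := m[key]; ok {
		return v, true
	}
	return []byte{}, false
}

// getValue is tryGetValue without the second return value.
func (m metadata) getValue(key string) []byte {
	v, _ := m.tryGetValue(key)
	return v
}

// trySetValue overwrites an existing key and reports whether it did so.
// Unknown keys are not created.
func (m metadata) trySetValue(key string, value []byte) bool {
	if _, ok := m[key]; !ok {
		return false
	}
	m[key] = value
	return true
}

func main() {
	meta := metadata{"host": []byte("web-1")}

	fmt.Println(string(meta.getValue("host")), len(meta.getValue("missing")))

	_, found := meta.tryGetValue("missing")
	fmt.Println(found)

	fmt.Println(meta.trySetValue("missing", []byte("x")), meta.trySetValue("host", []byte("web-2")))
	fmt.Println(string(meta.getValue("host")))
}
```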

type ModulateResult added in v0.5.0

type ModulateResult int

ModulateResult defines a set of results used to control the message flow induced by Modulator actions.

type ModulateResultError added in v0.5.0

type ModulateResultError struct {
	// contains filtered or unexported fields
}

ModulateResultError is used by modulators to return a problem that happened during the modulation process, caused by the modulator.

func NewModulateResultError added in v0.5.0

func NewModulateResultError(message string, values ...interface{}) ModulateResultError

NewModulateResultError creates a new ModulateResultError with the given message.

func (ModulateResultError) Error added in v0.5.0

func (p ModulateResultError) Error() string

Error fulfills the Go error interface.

type Modulator added in v0.5.0

type Modulator interface {
	// Modulate processes the given message for analysis or modification.
	// The result of this function defines how and if a message proceeds
	// along the ModulateResult.
	Modulate(msg *Message) ModulateResult
}

A Modulator defines a modification or analysis step inside the message ModulateResult. It may alter messages or stop the ModulateResult for this message.

type ModulatorArray added in v0.5.0

type ModulatorArray []Modulator

ModulatorArray is a type wrapper for []Modulator that makes an array of modulators compatible with the Modulator interface.

func (ModulatorArray) Modulate added in v0.5.0

func (modulators ModulatorArray) Modulate(msg *Message) ModulateResult

Modulate calls Modulate on every Modulator in the array and reacts according to the definition of each ModulateResult state.
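A chain of modulators that stops at the first non-continue result could look like the sketch below. All types are local, hypothetical stand-ins (message, modulator, modulatorFunc, modulatorArray). The early return follows the constant descriptions, which say that no further modulators are called after a fallback or discard result.

```go
package main

import (
	"fmt"
	"strings"
)

type message struct{ payload string }

type modulateResult int

const (
	resultContinue modulateResult = iota
	resultFallback
	resultDiscard
)

type modulator interface {
	modulate(msg *message) modulateResult
}

// modulatorFunc adapts a plain function to the modulator interface.
type modulatorFunc func(msg *message) modulateResult

func (f modulatorFunc) modulate(msg *message) modulateResult { return f(msg) }

// modulatorArray makes a slice of modulators usable as one modulator.
type modulatorArray []modulator

// modulate runs each step in order and stops at the first result that
// is not resultContinue.
func (mods modulatorArray) modulate(msg *message) modulateResult {
	for _, m := range mods {
		if res := m.modulate(msg); res != resultContinue {
			return res
		}
	}
	return resultContinue
}

// dropEmpty discards messages without payload.
func dropEmpty(msg *message) modulateResult {
	if msg.payload == "" {
		return resultDiscard
	}
	return resultContinue
}

// toUpper rewrites the payload in upper case.
func toUpper(msg *message) modulateResult {
	msg.payload = strings.ToUpper(msg.payload)
	return resultContinue
}

func main() {
	chain := modulatorArray{modulatorFunc(dropEmpty), modulatorFunc(toUpper)}

	msg := &message{payload: "hello"}
	res := chain.modulate(msg)
	fmt.Println(res == resultContinue, msg.payload)

	res = chain.modulate(&message{})
	fmt.Println(res == resultDiscard)
}
```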

type Plugin

type Plugin interface {
	Configurable
}

Plugin is the base class for any runtime class that can be configured and instantiated during runtime.

func NewPluginWithConfig added in v0.5.0

func NewPluginWithConfig(config PluginConfig) (Plugin, error)

NewPluginWithConfig creates a new plugin from the type information stored in its config. This function internally calls NewPluginWithType.

type PluginConfig

type PluginConfig struct {
	ID       string
	Typename string
	Enable   bool
	Settings tcontainer.MarshalMap
	// contains filtered or unexported fields
}

PluginConfig is a configuration for a specific plugin

func NewNestedPluginConfig added in v0.5.0

func NewNestedPluginConfig(defaultTypename string, values tcontainer.MarshalMap) (PluginConfig, error)

NewNestedPluginConfig creates a pluginconfig based on a given set of config values. The plugin created does not have an id, i.e. it is considered anonymous.

func NewPluginConfig

func NewPluginConfig(pluginID string, defaultTypename string) PluginConfig

NewPluginConfig creates a new plugin config with default values. By default the plugin is enabled, has one instance and has no additional settings. Passing an empty pluginID makes the plugin anonymous. The defaultTypename may be overridden by a later call to read.

func (*PluginConfig) Override

func (conf *PluginConfig) Override(key string, value interface{})

Override sets or overrides a configuration value for non-predefined options.

func (*PluginConfig) Read

func (conf *PluginConfig) Read(values tcontainer.MarshalMap) error

Read analyzes a given key/value map to extract the configuration values valid for each plugin. All non-default values are written to the Settings member. All keys will be converted to lowercase.

func (PluginConfig) Validate

func (conf PluginConfig) Validate() error

Validate should be called after a configuration has been processed. It will check the keys read from the config files against the keys requested up to this point. Unknown keys will be returned as errors.
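The idea behind Read and Validate, lowercased keys plus a record of which keys were requested, can be sketched as follows. The config type, newConfig, get and validate are hypothetical, and the error text is invented for the example.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// config is a hypothetical stand-in that remembers which keys were read.
type config struct {
	settings  map[string]interface{}
	requested map[string]bool
}

// newConfig stores all keys in lowercase.
func newConfig(values map[string]interface{}) *config {
	c := &config{
		settings:  make(map[string]interface{}, len(values)),
		requested: make(map[string]bool),
	}
	for key, value := range values {
		c.settings[strings.ToLower(key)] = value
	}
	return c
}

// get looks up a key case-insensitively and marks it as requested.
func (c *config) get(key string) (interface{}, bool) {
	key = strings.ToLower(key)
	c.requested[key] = true
	value, ok := c.settings[key]
	return value, ok
}

// validate reports every stored key that was never requested.
func (c *config) validate() error {
	var unknown []string
	for key := range c.settings {
		if !c.requested[key] {
			unknown = append(unknown, key)
		}
	}
	if len(unknown) == 0 {
		return nil
	}
	sort.Strings(unknown) // stable message regardless of map order
	return fmt.Errorf("unknown configuration keys: %s", strings.Join(unknown, ", "))
}

func main() {
	c := newConfig(map[string]interface{}{"Enable": true, "Typo": 1})
	c.get("enable")
	fmt.Println(c.validate())
}
```

Because validation depends on the keys requested so far, it only gives a meaningful result once the plugin has finished reading its options.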

type PluginConfigReader added in v0.5.0

type PluginConfigReader struct {
	WithError PluginConfigReaderWithError
	Errors    *tgo.ErrorStack
}

PluginConfigReader provides another convenience wrapper on top of a PluginConfigReaderWithError. All functions of this reader are stripped from returning errors. Errors from calls are collected internally and can be accessed at any given point.
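The relationship between the two readers can be sketched like this: one reader returns (value, error), and a wrapper drops the error from the signature and stores it instead. rawReader and collectingReader are hypothetical names, and a plain error slice stands in for the error stack.

```go
package main

import (
	"fmt"
	"strconv"
)

// rawReader is a hypothetical reader whose getters return (value, error).
type rawReader struct{ settings map[string]string }

func (r rawReader) getInt(key string, defaultValue int64) (int64, error) {
	raw, ok := r.settings[key]
	if !ok {
		return defaultValue, nil // a missing key is not an error
	}
	value, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		return defaultValue, fmt.Errorf("key %q: %w", key, err)
	}
	return value, nil
}

// collectingReader wraps rawReader. Its getters return only the value;
// errors are collected and can be inspected at any later point.
type collectingReader struct {
	withError rawReader
	errs      []error
}

func (r *collectingReader) getInt(key string, defaultValue int64) int64 {
	value, err := r.withError.getInt(key, defaultValue)
	if err != nil {
		r.errs = append(r.errs, err)
	}
	return value
}

func main() {
	reader := &collectingReader{withError: rawReader{settings: map[string]string{
		"workers": "4",
		"timeout": "soon", // not a number
	}}}

	workers := reader.getInt("workers", 1)
	timeout := reader.getInt("timeout", 30)
	fmt.Println(workers, timeout, len(reader.errs))
}
```

This style keeps a long run of option reads free of per-call error checks, with one check of the collected errors at the end.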

func NewPluginConfigReader added in v0.5.0

func NewPluginConfigReader(config *PluginConfig) PluginConfigReader

NewPluginConfigReader creates a new reader on top of a given config.

func NewPluginConfigReaderFromReader added in v0.5.0

func NewPluginConfigReaderFromReader(reader PluginConfigReaderWithError) PluginConfigReader

NewPluginConfigReaderFromReader encapsulates a WithError reader that is already attached to a config to read from.

func (*PluginConfigReader) Configure added in v0.5.0

func (reader *PluginConfigReader) Configure(item interface{}) error

Configure will configure a given item by scanning for plugin struct tags and calling the Configure method. Nested types will be traversed automatically.

func (*PluginConfigReader) GetArray added in v0.5.0

func (reader *PluginConfigReader) GetArray(key string, defaultValue []interface{}) []interface{}

GetArray tries to read an untyped array from a PluginConfig. If that value is not found defaultValue is returned.

func (*PluginConfigReader) GetBool added in v0.5.0

func (reader *PluginConfigReader) GetBool(key string, defaultValue bool) bool

GetBool tries to read a boolean value from a PluginConfig. If that value is not found defaultValue is returned.

func (*PluginConfigReader) GetFilterArray added in v0.5.0

func (reader *PluginConfigReader) GetFilterArray(key string, logger logrus.FieldLogger, defaultValue FilterArray) FilterArray

GetFilterArray returns an array of filter plugins.

func (*PluginConfigReader) GetFormatterArray added in v0.5.0

func (reader *PluginConfigReader) GetFormatterArray(key string, logger logrus.FieldLogger, defaultValue FormatterArray) FormatterArray

GetFormatterArray returns an array of formatter plugins.

func (*PluginConfigReader) GetID added in v0.5.0

func (reader *PluginConfigReader) GetID() string

GetID returns the plugin's id

func (*PluginConfigReader) GetInt added in v0.5.0

func (reader *PluginConfigReader) GetInt(key string, defaultValue int64) int64

GetInt tries to read an integer value from a PluginConfig. If that value is not found defaultValue is returned.

func (*PluginConfigReader) GetLogger added in v0.5.0

func (reader *PluginConfigReader) GetLogger() logrus.FieldLogger

GetLogger creates a new scoped logger (logrus.FieldLogger) for the plugin contained in this config.

func (*PluginConfigReader) GetMap added in v0.5.0

func (reader *PluginConfigReader) GetMap(key string, defaultValue tcontainer.MarshalMap) tcontainer.MarshalMap

GetMap tries to read a MarshalMap from a PluginConfig. If that value is not found defaultValue is returned.

func (*PluginConfigReader) GetModulatorArray added in v0.5.0

func (reader *PluginConfigReader) GetModulatorArray(key string, logger logrus.FieldLogger, defaultValue ModulatorArray) ModulatorArray

GetModulatorArray reads an array of modulator plugins

func (*PluginConfigReader) GetPlugin added in v0.5.0

func (reader *PluginConfigReader) GetPlugin(key string, defaultType string, defaultConfig tcontainer.MarshalMap) Plugin

GetPlugin creates a nested plugin from a config map. The default type has to be passed and is overridden if the config specifies a type. The value stored in the config can either be a string or a map. If a map is found it is used to override defaultConfig. If a string is found it is used to override defaultType.
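The string-or-map rule can be sketched with a type switch. resolveNested is a hypothetical helper, and so are the type names "demo.Default" and "demo.Custom". One assumption: "override defaultConfig" is read here as merging the map's keys over a copy of the defaults.

```go
package main

import "fmt"

// resolveNested picks the type name and settings for a nested plugin
// from a config value that is either a string or a map.
func resolveNested(value interface{}, defaultType string, defaultConfig map[string]interface{}) (string, map[string]interface{}) {
	typeName := defaultType

	// Copy the defaults so the caller's map is never modified.
	config := make(map[string]interface{}, len(defaultConfig))
	for key, v := range defaultConfig {
		config[key] = v
	}

	switch v := value.(type) {
	case string:
		// A string overrides the default type.
		typeName = v
	case map[string]interface{}:
		// A map overrides default settings (assumption: key by key).
		for key, val := range v {
			config[key] = val
		}
	}
	return typeName, config
}

func main() {
	defaults := map[string]interface{}{"Delimiter": "\n", "Size": 8}

	typeName, config := resolveNested("demo.Custom", "demo.Default", defaults)
	fmt.Println(typeName, len(config))

	typeName, config = resolveNested(map[string]interface{}{"Size": 16}, "demo.Default", defaults)
	fmt.Println(typeName, config["Size"])
}
```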

func (*PluginConfigReader) GetPluginArray added in v0.5.0

func (reader *PluginConfigReader) GetPluginArray(key string, defaultValue []Plugin) []Plugin

GetPluginArray tries to read an array of plugins (type to config) from a PluginConfig. If that value is not found defaultValue is returned.

func (*PluginConfigReader) GetStreamArray added in v0.5.0

func (reader *PluginConfigReader) GetStreamArray(key string, defaultValue []MessageStreamID) []MessageStreamID

GetStreamArray tries to read a string array from a pluginconfig and translates all values to streamIds. If the key is not found defaultValue is returned.

func (*PluginConfigReader) GetStreamID added in v0.5.0

func (reader *PluginConfigReader) GetStreamID(key string, defaultValue MessageStreamID) MessageStreamID

GetStreamID tries to read a StreamID value from a PluginConfig. If that value is not found defaultValue is returned.

func (*PluginConfigReader) GetStreamMap added in v0.5.0

func (reader *PluginConfigReader) GetStreamMap(key string, defaultValue string) map[MessageStreamID]string

GetStreamMap tries to read a stream to string map from a plugin config. A mapping on the wildcard stream is always returned. The target is either defaultValue or a value defined by the config.

func (*PluginConfigReader) GetStreamRoutes added in v0.5.0

func (reader *PluginConfigReader) GetStreamRoutes(key string, defaultValue map[MessageStreamID][]MessageStreamID) map[MessageStreamID][]MessageStreamID

GetStreamRoutes tries to read a stream to stream map from a plugin config. If no routes are defined an empty map is returned

func (*PluginConfigReader) GetString added in v0.5.0

func (reader *PluginConfigReader) GetString(key string, defaultValue string) string

GetString tries to read a string value from a PluginConfig. If that value is not found defaultValue is returned.

func (*PluginConfigReader) GetStringArray added in v0.5.0

func (reader *PluginConfigReader) GetStringArray(key string, defaultValue []string) []string

GetStringArray tries to read a string array from a PluginConfig. If that value is not found defaultValue is returned.

func (*PluginConfigReader) GetStringMap added in v0.5.0

func (reader *PluginConfigReader) GetStringMap(key string, defaultValue map[string]string) map[string]string

GetStringMap tries to read a string to string map from a PluginConfig. If the key is not found defaultValue is returned.

func (*PluginConfigReader) GetSubLogger added in v0.5.0

func (reader *PluginConfigReader) GetSubLogger(subScope string) logrus.FieldLogger

GetSubLogger creates a new sub-scoped logger (logrus.FieldLogger) for the plugin contained in this config

func (*PluginConfigReader) GetTypename added in v0.5.0

func (reader *PluginConfigReader) GetTypename() string

GetTypename returns the plugin's typename

func (*PluginConfigReader) GetURL added in v0.5.0

func (reader *PluginConfigReader) GetURL(key string, defaultValue string) *url.URL

GetURL tries to read a URL struct from a PluginConfig. If that value is not found nil is returned.

func (*PluginConfigReader) GetUint added in v0.5.0

func (reader *PluginConfigReader) GetUint(key string, defaultValue uint64) uint64

GetUint tries to read an unsigned integer value from a PluginConfig. If that value is not found defaultValue is returned.

func (*PluginConfigReader) GetValue added in v0.5.0

func (reader *PluginConfigReader) GetValue(key string, defaultValue interface{}) interface{}

GetValue tries to read an untyped value from a PluginConfig. If that value is not found defaultValue is returned.

func (*PluginConfigReader) HasValue added in v0.5.0

func (reader *PluginConfigReader) HasValue(key string) bool

HasValue returns true if the given key has been set as a config option. This function only takes settings into account.

type PluginConfigReaderWithError added in v0.5.0

type PluginConfigReaderWithError struct {
	// contains filtered or unexported fields
}

PluginConfigReaderWithError is a read-only wrapper on top of a plugin config that provides convenience functions for accessing values from the wrapped config.

func NewPluginConfigReaderWithError added in v0.5.0

func NewPluginConfigReaderWithError(config *PluginConfig) PluginConfigReaderWithError

NewPluginConfigReaderWithError creates a new raw reader for a given config. In contrast to the PluginConfigReader all functions return an error next to the value.

func (PluginConfigReaderWithError) GetArray added in v0.5.0

func (reader PluginConfigReaderWithError) GetArray(key string, defaultValue []interface{}) ([]interface{}, error)

GetArray tries to read an untyped array from a PluginConfig. If that value is not found defaultValue is returned.

func (PluginConfigReaderWithError) GetBool added in v0.5.0

func (reader PluginConfigReaderWithError) GetBool(key string, defaultValue bool) (bool, error)

GetBool tries to read a boolean value from a PluginConfig. If that value is not found defaultValue is returned.

func (PluginConfigReaderWithError) GetFilterArray added in v0.5.0

func (reader PluginConfigReaderWithError) GetFilterArray(key string, logger logrus.FieldLogger,
	defaultValue FilterArray) (FilterArray, error)

GetFilterArray returns an array of filter plugins.

func (PluginConfigReaderWithError) GetFloat added in v0.5.0

func (reader PluginConfigReaderWithError) GetFloat(key string, defaultValue float64) (float64, error)

GetFloat tries to read a float value from a PluginConfig. If that value is not found defaultValue is returned.

func (PluginConfigReaderWithError) GetFormatterArray added in v0.5.0

func (reader PluginConfigReaderWithError) GetFormatterArray(key string, logger logrus.FieldLogger, defaultValue FormatterArray) (FormatterArray, error)

GetFormatterArray returns an array of formatter plugins.

func (PluginConfigReaderWithError) GetID added in v0.5.0

func (reader PluginConfigReaderWithError) GetID() string

GetID returns the plugin's id

func (PluginConfigReaderWithError) GetInt added in v0.5.0

func (reader PluginConfigReaderWithError) GetInt(key string, defaultValue int64) (int64, error)

GetInt tries to read an integer value from a PluginConfig. If that value is not found defaultValue is returned.

func (PluginConfigReaderWithError) GetLogger added in v0.5.0

func (reader PluginConfigReaderWithError) GetLogger() logrus.FieldLogger

GetLogger creates a logger scoped for the plugin contained in this config.

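func (PluginConfigReaderWithError) GetMap added in v0.5.0

func (reader PluginConfigReaderWithError) GetMap(key string, defaultValue tcontainer.MarshalMap) (tcontainer.MarshalMap, error)

GetMap tries to read a MarshalMap from a PluginConfig. If that value is not found defaultValue is returned.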

func (PluginConfigReaderWithError) GetModulatorArray added in v0.5.0

func (reader PluginConfigReaderWithError) GetModulatorArray(key string, logger logrus.FieldLogger,
	defaultValue ModulatorArray) (ModulatorArray, error)

GetModulatorArray returns an array of modulator plugins.

func (PluginConfigReaderWithError) GetPlugin added in v0.5.0

func (reader PluginConfigReaderWithError) GetPlugin(key string, defaultType string, defaultConfig tcontainer.MarshalMap) (Plugin, error)

GetPlugin creates a nested plugin from a config map. The default type has to be passed and is overridden if the config specifies a type. The value stored in the config can either be a string or a map. If a map is found it is used to override defaultConfig. If a string is found it is used to override defaultType.

func (PluginConfigReaderWithError) GetPluginArray added in v0.5.0

func (reader PluginConfigReaderWithError) GetPluginArray(key string, defaultValue []Plugin) ([]Plugin, error)

GetPluginArray tries to read an array of plugins (type to config) from a PluginConfig. If that value is not found defaultValue is returned.

func (PluginConfigReaderWithError) GetStreamArray added in v0.5.0

func (reader PluginConfigReaderWithError) GetStreamArray(key string, defaultValue []MessageStreamID) ([]MessageStreamID, error)

GetStreamArray tries to read a string array from a pluginconfig and translates all values to streamIds. If the key is not found defaultValue is returned.

func (PluginConfigReaderWithError) GetStreamID added in v0.5.0

func (reader PluginConfigReaderWithError) GetStreamID(key string, defaultValue MessageStreamID) (MessageStreamID, error)

GetStreamID tries to read a StreamID value from a PluginConfig. If that value is not found defaultValue is returned.

func (PluginConfigReaderWithError) GetStreamMap added in v0.5.0

func (reader PluginConfigReaderWithError) GetStreamMap(key string, defaultValue string) (map[MessageStreamID]string, error)

GetStreamMap tries to read a stream to string map from a plugin config. A mapping on the wildcard stream is always returned. The target is either defaultValue or a value defined by the config.

func (PluginConfigReaderWithError) GetStreamRoutes added in v0.5.0

func (reader PluginConfigReaderWithError) GetStreamRoutes(key string, defaultValue map[MessageStreamID][]MessageStreamID) (map[MessageStreamID][]MessageStreamID, error)

GetStreamRoutes tries to read a stream to stream map from a plugin config. If no routes are defined an empty map is returned

func (PluginConfigReaderWithError) GetString added in v0.5.0

func (reader PluginConfigReaderWithError) GetString(key string, defaultValue string) (string, error)

GetString tries to read a string value from a PluginConfig. If that value is not found defaultValue is returned.

func (PluginConfigReaderWithError) GetStringArray added in v0.5.0

func (reader PluginConfigReaderWithError) GetStringArray(key string, defaultValue []string) ([]string, error)

GetStringArray tries to read a string array from a PluginConfig. If that value is not found defaultValue is returned.

func (PluginConfigReaderWithError) GetStringMap added in v0.5.0

func (reader PluginConfigReaderWithError) GetStringMap(key string, defaultValue map[string]string) (map[string]string, error)

GetStringMap tries to read a string to string map from a PluginConfig. If the key is not found defaultValue is returned.

func (PluginConfigReaderWithError) GetSubLogger added in v0.5.0

func (reader PluginConfigReaderWithError) GetSubLogger(subScope string) logrus.FieldLogger

GetSubLogger creates a logger scoped for the plugin contained in this config, with an additional subscope.

func (PluginConfigReaderWithError) GetTypename added in v0.5.0

func (reader PluginConfigReaderWithError) GetTypename() string

GetTypename returns the plugin's typename

func (PluginConfigReaderWithError) GetURL added in v0.5.0

func (reader PluginConfigReaderWithError) GetURL(key string, defaultValue string) (*url.URL, error)

GetURL tries to read a URL struct from a PluginConfig. If that value is not found nil is returned.

func (PluginConfigReaderWithError) GetUint added in v0.5.0

func (reader PluginConfigReaderWithError) GetUint(key string, defaultValue uint64) (uint64, error)

GetUint tries to read an unsigned integer value from a PluginConfig. If that value is not found defaultValue is returned.

func (PluginConfigReaderWithError) GetValue added in v0.5.0

func (reader PluginConfigReaderWithError) GetValue(key string, defaultValue interface{}) interface{}

GetValue tries to read an untyped value from a PluginConfig. If that value is not found defaultValue is returned.

func (PluginConfigReaderWithError) HasValue added in v0.5.0

func (reader PluginConfigReaderWithError) HasValue(key string) bool

HasValue returns true if the given key has been set as a config option. This function only takes settings into account.

type PluginControl

type PluginControl int

PluginControl is an enumeration used to pass signals to plugins

type PluginMetric added in v0.5.0

type PluginMetric struct {
}

PluginMetric class for plugin based metrics

func (*PluginMetric) DecWorker added in v0.5.0

func (metric *PluginMetric) DecWorker()

DecWorker decreases the active worker count.

func (*PluginMetric) IncWorker added in v0.5.0

func (metric *PluginMetric) IncWorker()

IncWorker increases the active worker count.

func (*PluginMetric) Init added in v0.5.0

func (metric *PluginMetric) Init()

Init increases the plugin state init count.

func (*PluginMetric) UpdateStateMetric added in v0.5.0

func (metric *PluginMetric) UpdateStateMetric(prevState, nextState string)

UpdateStateMetric decreases the count for the previous plugin state and increases the count for the next plugin state.

type PluginRunState

type PluginRunState struct {
	// contains filtered or unexported fields
}

PluginRunState is used in some plugins to store information about the execution state of the plugin (i.e. if it is running or not) as well as threading primitives that enable gollum to wait for a plugin to properly shut down.

func NewPluginRunState added in v0.4.0

func NewPluginRunState() *PluginRunState

NewPluginRunState creates a new plugin state helper

func (*PluginRunState) AddWorker

func (state *PluginRunState) AddWorker()

AddWorker adds a worker to the waitgroup configured by SetWorkerWaitGroup.

func (*PluginRunState) GetState added in v0.4.0

func (state *PluginRunState) GetState() PluginState

GetState returns the current plugin state cast to the correct type.

func (*PluginRunState) GetStateString added in v0.5.0

func (state *PluginRunState) GetStateString() string

GetStateString returns the current state as string

func (*PluginRunState) SetState added in v0.4.0

func (state *PluginRunState) SetState(nextState PluginState)

SetState sets a new plugin state cast to the correct type.

func (*PluginRunState) SetWorkerWaitGroup

func (state *PluginRunState) SetWorkerWaitGroup(workers *sync.WaitGroup)

SetWorkerWaitGroup sets the WaitGroup used to manage workers

func (*PluginRunState) WorkerDone

func (state *PluginRunState) WorkerDone()

WorkerDone removes a worker from the waitgroup configured by SetWorkerWaitGroup.
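The worker bookkeeping follows the usual sync.WaitGroup pattern: register a worker before it starts, mark it done when it exits, and let the owner of the WaitGroup wait. The sketch below uses a hypothetical runState type with lowercase method names; it is not this package's implementation.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runState is a hypothetical stand-in that holds the shared WaitGroup.
type runState struct{ workers *sync.WaitGroup }

func (s *runState) setWorkerWaitGroup(workers *sync.WaitGroup) { s.workers = workers }
func (s *runState) addWorker()                                 { s.workers.Add(1) }
func (s *runState) workerDone()                                { s.workers.Done() }

// runWorkers starts n workers and returns how many finished after
// waiting for all of them.
func runWorkers(n int) int {
	var wg sync.WaitGroup
	var finished int64

	state := &runState{}
	state.setWorkerWaitGroup(&wg)

	for i := 0; i < n; i++ {
		state.addWorker() // register before the goroutine starts
		go func() {
			defer state.workerDone()
			atomic.AddInt64(&finished, 1)
		}()
	}

	wg.Wait() // the owner of the WaitGroup blocks until all workers are done
	return int(atomic.LoadInt64(&finished))
}

func main() {
	fmt.Println(runWorkers(4))
}
```

Registering the worker before starting the goroutine avoids the case where Wait returns before a late worker has been counted.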

type PluginState added in v0.4.0

type PluginState int32

PluginState is an enumeration used to describe the current working state of a plugin

type PluginStructTag added in v0.5.0

type PluginStructTag reflect.StructTag

PluginStructTag extends reflect.StructTag by convenience methods used to retrieve auto configure values used by PluginConfigReader.

func (PluginStructTag) GetBool added in v0.5.0

func (tag PluginStructTag) GetBool() bool

GetBool returns the default boolean value for an auto configured field. When not set, false is returned.

func (PluginStructTag) GetByteArray added in v0.5.0

func (tag PluginStructTag) GetByteArray() []byte

GetByteArray returns the default byte array value for an auto configured field. When not set an empty array is returned.

func (PluginStructTag) GetInt added in v0.5.0

func (tag PluginStructTag) GetInt() int64

GetInt returns the default integer value for an auto configured field. When not set, 0 is returned. Metrics are not applied to this value.

func (PluginStructTag) GetMetricScale added in v0.5.0

func (tag PluginStructTag) GetMetricScale() int64

GetMetricScale returns the scale as defined by the "metric" tag as a number.

func (PluginStructTag) GetStream added in v0.5.0

func (tag PluginStructTag) GetStream() MessageStreamID

GetStream returns the default message stream value for an auto configured field. When not set, InvalidStream is returned.

func (PluginStructTag) GetStreamArray added in v0.5.0

func (tag PluginStructTag) GetStreamArray() []MessageStreamID

GetStreamArray returns the default message stream array value for an auto configured field. When not set an empty array is returned.

func (PluginStructTag) GetString added in v0.5.0

func (tag PluginStructTag) GetString() string

GetString returns the default string value for an auto configured field. When not set, "" is returned.

func (PluginStructTag) GetStringArray added in v0.5.0

func (tag PluginStructTag) GetStringArray() []string

GetStringArray returns the default string array value for an auto configured field. When not set an empty array is returned.

func (PluginStructTag) GetUint added in v0.5.0

func (tag PluginStructTag) GetUint() uint64

GetUint returns the default unsigned integer value for an auto configured field. When not set, 0 is returned. Metrics are not applied to this value.
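Reading default values from struct tags can be sketched with reflect.StructTag. The tag keys "config" and "default" and the names pluginTag, tagOf and example are assumptions made for illustration; the text above does not name the tag keys this package uses.

```go
package main

import (
	"fmt"
	"reflect"
	"strconv"
)

// pluginTag wraps reflect.StructTag with typed accessors.
type pluginTag reflect.StructTag

// getInt parses the (assumed) "default" key; 0 if unset or invalid.
func (tag pluginTag) getInt() int64 {
	raw, ok := reflect.StructTag(tag).Lookup("default")
	if !ok {
		return 0
	}
	value, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		return 0
	}
	return value
}

// getString returns the (assumed) "default" key; "" if unset.
func (tag pluginTag) getString() string {
	return reflect.StructTag(tag).Get("default")
}

// example is a hypothetical plugin struct with auto configured fields.
type example struct {
	Workers int64  `config:"Workers" default:"4"`
	Name    string `config:"Name"`
}

// tagOf returns the tag of the named field of struct value v.
func tagOf(v interface{}, field string) pluginTag {
	f, ok := reflect.TypeOf(v).FieldByName(field)
	if !ok {
		return ""
	}
	return pluginTag(f.Tag)
}

func main() {
	fmt.Println(tagOf(example{}, "Workers").getInt())
	fmt.Println(tagOf(example{}, "Name").getString() == "")
}
```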

type PluginWithState added in v0.4.0

type PluginWithState interface {
	Plugin
	// GetState returns the current state of a plugin
	GetState() PluginState
}

PluginWithState allows certain plugins to give information about their runstate

type Producer

type Producer interface {
	PluginWithState
	MessageSource

	// Enqueue sends a message to the producer. The producer may reject
	// the message or try a fallback after a given timeout. Enqueue can block.
	Enqueue(msg *Message, timeout time.Duration)

	// Produce should implement a main loop that passes messages from the
	// message channel to some other service like the console.
	// This can be part of this function or a separate go routine.
	// Produce is always called as a go routine.
	Produce(workers *sync.WaitGroup)

	// Streams returns the streamIDs this producer is listening to.
	Streams() []MessageStreamID

	// Control returns write access to this producer's control channel.
	// See PluginControl* constants.
	Control() chan<- PluginControl

	// GetShutdownTimeout returns the duration gollum will wait for this producer
	// before canceling the shutdown process.
	GetShutdownTimeout() time.Duration
}

Producer is an interface for plugins that pass messages to other services, files or storages.

type Router added in v0.5.0

type Router interface {
	Modulator

	// GetStreamID returns the stream id this plugin is bound to.
	GetStreamID() MessageStreamID

	// GetID returns the pluginID of the message source
	GetID() string

	// AddProducer adds one or more producers to this stream, i.e. the producers
	// listening to messages on this stream.
	AddProducer(producers ...Producer)

	// Enqueue sends a given message to all registered end points.
	// This function is called by Route() which should be preferred over this
	// function when sending messages.
	Enqueue(msg *Message) error

	// GetTimeout returns the timeout configured for this router
	GetTimeout() time.Duration

	// Start starts the router by the coordinator.StartPlugins() method
	Start() error
}

Router defines the interface for all stream plugins

type ScopedLogger added in v0.5.0

type ScopedLogger interface {
	GetLogger() logrus.FieldLogger
}

ScopedLogger defines an interface for structs that have a log scope attached

type ScopedModulator added in v0.5.0

type ScopedModulator interface {
	Modulator

	// SetLogger defines the log scope for this modulator.
	SetLogger(logger logrus.FieldLogger)
}

ScopedModulator extends the Modulator interface by adding a log scope. This interface is implemented by modulators that are embedded in plugins that already have their own scope.

type SerialMessageSource

type SerialMessageSource interface {
	AsyncMessageSource

	// Notify the end of the response stream
	ResponseDone()
}

SerialMessageSource extends the AsyncMessageSource interface to allow waiting for all parts of the response to be submitted.

type SerializedMessage added in v0.4.0

type SerializedMessage struct {
	StreamID         *uint64                `protobuf:"varint,1,req,name=StreamID" json:"StreamID,omitempty"`
	Data             *SerializedMessageData `protobuf:"bytes,2,req,name=Data" json:"Data,omitempty"`
	PrevStreamID     *uint64                `protobuf:"varint,3,opt,name=PrevStreamID" json:"PrevStreamID,omitempty"`
	OrigStreamID     *uint64                `protobuf:"varint,4,opt,name=OrigStreamID" json:"OrigStreamID,omitempty"`
	Timestamp        *int64                 `protobuf:"varint,5,opt,name=Timestamp" json:"Timestamp,omitempty"`
	Original         *SerializedMessageData `protobuf:"bytes,6,opt,name=Original" json:"Original,omitempty"`
	XXX_unrecognized []byte                 `json:"-"`
}

func (*SerializedMessage) Descriptor added in v0.5.0

func (*SerializedMessage) Descriptor() ([]byte, []int)

func (*SerializedMessage) GetData added in v0.4.0

func (m *SerializedMessage) GetData() *SerializedMessageData

func (*SerializedMessage) GetOrigStreamID added in v0.5.0

func (m *SerializedMessage) GetOrigStreamID() uint64

func (*SerializedMessage) GetOriginal added in v0.5.0

func (m *SerializedMessage) GetOriginal() *SerializedMessageData

func (*SerializedMessage) GetPrevStreamID added in v0.4.0

func (m *SerializedMessage) GetPrevStreamID() uint64

func (*SerializedMessage) GetStreamID added in v0.4.0

func (m *SerializedMessage) GetStreamID() uint64

func (*SerializedMessage) GetTimestamp added in v0.4.0

func (m *SerializedMessage) GetTimestamp() int64

func (*SerializedMessage) ProtoMessage added in v0.4.0

func (*SerializedMessage) ProtoMessage()

func (*SerializedMessage) Reset added in v0.4.0

func (m *SerializedMessage) Reset()

func (*SerializedMessage) String added in v0.4.0

func (m *SerializedMessage) String() string
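
The getters listed above are generated for proto2 fields that are stored as pointers. As a minimal sketch of the shape such generated getters usually take, the following uses a local stand-in type (serializedMessage is hypothetical and mirrors only two fields; it is not the generated struct). The point it illustrates is that the getters can be called safely on a nil message or an unset field.

```go
package main

import "fmt"

// serializedMessage is a hypothetical stand-in that mirrors two pointer
// fields of SerializedMessage. It is not the generated type.
type serializedMessage struct {
	StreamID     *uint64
	PrevStreamID *uint64
}

// GetStreamID returns the zero value when the receiver or the field is nil.
func (m *serializedMessage) GetStreamID() uint64 {
	if m != nil && m.StreamID != nil {
		return *m.StreamID
	}
	return 0
}

// GetPrevStreamID follows the same nil-safe pattern for an optional field.
func (m *serializedMessage) GetPrevStreamID() uint64 {
	if m != nil && m.PrevStreamID != nil {
		return *m.PrevStreamID
	}
	return 0
}

func main() {
	id := uint64(42)
	msg := &serializedMessage{StreamID: &id}
	var none *serializedMessage

	// Set field, unset field, nil receiver.
	fmt.Println(msg.GetStreamID(), msg.GetPrevStreamID(), none.GetStreamID())
}
```

Because of this pattern, callers generally do not need to nil-check optional fields before reading them through a getter.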

type SerializedMessageData added in v0.5.0

type SerializedMessageData struct {
	Data             []byte            `protobuf:"bytes,1,req,name=Data" json:"Data,omitempty"`
	Metadata         map[string][]byte `` /* 136-byte string literal not displayed */
	XXX_unrecognized []byte            `json:"-"`
}

func (*SerializedMessageData) Descriptor added in v0.5.0

func (*SerializedMessageData) Descriptor() ([]byte, []int)

func (*SerializedMessageData) GetData added in v0.5.0

func (m *SerializedMessageData) GetData() []byte

func (*SerializedMessageData) GetMetadata added in v0.5.0

func (m *SerializedMessageData) GetMetadata() map[string][]byte

func (*SerializedMessageData) ProtoMessage added in v0.5.0

func (*SerializedMessageData) ProtoMessage()

func (*SerializedMessageData) Reset added in v0.5.0

func (m *SerializedMessageData) Reset()

func (*SerializedMessageData) String added in v0.5.0

func (m *SerializedMessageData) String() string

type SetAppliedContent added in v0.5.0

type SetAppliedContent func(msg *Message, content []byte)

SetAppliedContent is a func() to store message content to payload or meta data

func GetAppliedContentSetFunction added in v0.5.0

func GetAppliedContentSetFunction(applyTo string) SetAppliedContent

GetAppliedContentSetFunction returns SetAppliedContent function to store message content
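
A rough sketch of how such a setter could be selected, assuming the ApplyTo convention described for SimpleFormatter in this documentation ("" targets the payload, any other value names a metadata field). The message type and the getSetFunction helper are hypothetical stand-ins, not the package's implementation.

```go
package main

import "fmt"

// message is a hypothetical stand-in for core.Message.
type message struct {
	payload  []byte
	metadata map[string][]byte
}

// setAppliedContent mirrors the shape of the SetAppliedContent func type.
type setAppliedContent func(msg *message, content []byte)

// getSetFunction assumes that "" selects the payload and that any other
// value is the name of a metadata field.
func getSetFunction(applyTo string) setAppliedContent {
	if applyTo == "" {
		return func(msg *message, content []byte) {
			msg.payload = content
		}
	}
	return func(msg *message, content []byte) {
		if msg.metadata == nil {
			msg.metadata = make(map[string][]byte)
		}
		msg.metadata[applyTo] = content
	}
}

func main() {
	msg := &message{}
	getSetFunction("")(msg, []byte("body"))
	getSetFunction("host")(msg, []byte("node-1"))
	fmt.Printf("%s %s\n", msg.payload, msg.metadata["host"])
}
```

Resolving the target once and returning a closure keeps the per-message path free of string comparisons, which may be one reason for exposing the setter as a function value.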

type SimpleConsumer added in v0.5.0

type SimpleConsumer struct {
	Logger logrus.FieldLogger
	// contains filtered or unexported fields
}

SimpleConsumer consumer

This type defines a common base class for all consumers. All consumer plugins should derive from this class, as all required basic functions are already implemented in a general way.

Parameters

- Streams: Defines a list of streams a consumer will send to. This parameter is mandatory. When using "*" messages will be sent only to the internal "*" stream. It will NOT send messages to all streams. By default this parameter is set to an empty list.

- ShutdownTimeoutMs: Defines the maximum time in milliseconds a consumer is allowed to take to shut down. After this timeout the consumer is always considered to have shut down. By default this parameter is set to 1000.

- Modulators: Defines a list of modulators to be applied to a message before it is sent to the list of streams. If a modulator specifies a stream, the message is only sent to that specific stream. A message is saved as original after all modulators have been applied. By default this parameter is set to an empty list.

- ModulatorRoutines: Defines the number of go routines reserved for modulating messages. Setting this parameter to 0 will use as many go routines as the specific consumer plugin is using for fetching data. Any other value will force the given number of go routines to be used. By default this parameter is set to 0.

- ModulatorQueueSize: Defines the size of the channel used to buffer messages before they are fetched by the next free modulator go routine. If the ModulatorRoutines parameter is set to 0 this parameter is ignored. By default this parameter is set to 1024.
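
The interplay of ModulatorRoutines and ModulatorQueueSize can be pictured as a small worker pool reading from a buffered channel. The sketch below covers only the non-zero ModulatorRoutines case and uses plain strings instead of messages; runModulators and its parameters are hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// runModulators starts `routines` goroutines (must be > 0) that read from a
// channel buffered to queueSize and apply modulate to every input.
// The order of the returned results is not guaranteed.
func runModulators(inputs []string, routines, queueSize int, modulate func(string) string) []string {
	queue := make(chan string, queueSize)
	var (
		mu      sync.Mutex
		results []string
		wg      sync.WaitGroup
	)
	for i := 0; i < routines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range queue {
				out := modulate(msg)
				mu.Lock()
				results = append(results, out)
				mu.Unlock()
			}
		}()
	}
	// The fetching side only blocks once the buffer is full.
	for _, msg := range inputs {
		queue <- msg
	}
	close(queue)
	wg.Wait()
	return results
}

func main() {
	out := runModulators([]string{"a", "b", "c"}, 2, 1024, strings.ToUpper)
	fmt.Println(len(out))
}
```

A larger queue lets the fetching side run ahead of slow modulators at the cost of memory; with ModulatorRoutines set to 0 no such queue is involved, which matches the note that ModulatorQueueSize is ignored in that case.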

func (*SimpleConsumer) AddHealthCheckAt added in v0.5.0

func (cons *SimpleConsumer) AddHealthCheckAt(path string, callback thealthcheck.CallbackFunc)

AddHealthCheckAt adds a health check at a subpath (http://<addr>:<port>/<plugin_id><path>)

func (*SimpleConsumer) AddMainWorker added in v0.5.0

func (cons *SimpleConsumer) AddMainWorker(workers *sync.WaitGroup)

AddMainWorker adds the first worker to the waitgroup

func (*SimpleConsumer) AddWorker added in v0.5.0

func (cons *SimpleConsumer) AddWorker()

AddWorker adds an additional worker to the waitgroup. Assumes that either AddMainWorker or SetWorkerWaitGroup has been called beforehand.

func (*SimpleConsumer) Configure added in v0.5.0

func (cons *SimpleConsumer) Configure(conf PluginConfigReader)

Configure initializes standard consumer values from a plugin config.

func (*SimpleConsumer) Control added in v0.5.0

func (cons *SimpleConsumer) Control() chan<- PluginControl

Control returns write access to this consumer's control channel. See PluginControl* constants.

func (*SimpleConsumer) ControlLoop added in v0.5.0

func (cons *SimpleConsumer) ControlLoop()

ControlLoop listens to the control channel and triggers callbacks for these messages. Upon stop control message doExit will be set to true.

func (*SimpleConsumer) Enqueue added in v0.5.0

func (cons *SimpleConsumer) Enqueue(data []byte)

Enqueue creates a new message from a given byte slice and passes it to EnqueueMessage. Data is copied to the message.

func (*SimpleConsumer) EnqueueWithMetadata added in v0.5.0

func (cons *SimpleConsumer) EnqueueWithMetadata(data []byte, metaData Metadata)

EnqueueWithMetadata works like Enqueue and additionally allows metadata to be set directly.

func (*SimpleConsumer) GetID added in v0.5.0

func (cons *SimpleConsumer) GetID() string

GetID returns the ID of this consumer

func (*SimpleConsumer) GetLogger added in v0.5.0

func (cons *SimpleConsumer) GetLogger() logrus.FieldLogger

GetLogger returns the logger scoped to this plugin

func (*SimpleConsumer) GetShutdownTimeout added in v0.5.0

func (cons *SimpleConsumer) GetShutdownTimeout() time.Duration

GetShutdownTimeout returns the duration gollum will wait for this consumer before canceling the shutdown process.

func (*SimpleConsumer) GetState added in v0.5.0

func (cons *SimpleConsumer) GetState() PluginState

GetState returns the state this plugin is currently in

func (*SimpleConsumer) IsActive added in v0.5.0

func (cons *SimpleConsumer) IsActive() bool

IsActive returns true if GetState() returns initializing, active, waiting or prepareStop.

func (*SimpleConsumer) IsActiveOrStopping added in v0.5.0

func (cons *SimpleConsumer) IsActiveOrStopping() bool

IsActiveOrStopping is a shortcut for cons.IsActive() || cons.IsStopping()

func (*SimpleConsumer) IsBlocked added in v0.5.0

func (cons *SimpleConsumer) IsBlocked() bool

IsBlocked returns true if GetState() returns waiting

func (*SimpleConsumer) IsStopping added in v0.5.0

func (cons *SimpleConsumer) IsStopping() bool

IsStopping returns true if GetState() returns prepareStop, stopping or dead
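
The state predicates above can be summarized in a few lines. This is a sketch using a local enum whose names follow the plugin state metrics listed in the constants section; pluginState and the helper functions are hypothetical stand-ins. Note that prepareStop counts as both active and stopping.

```go
package main

import "fmt"

// pluginState is a hypothetical stand-in for core.PluginState.
type pluginState int

const (
	stateInitializing pluginState = iota
	stateActive
	stateWaiting
	statePrepareStop
	stateStopping
	stateDead
)

// isActive mirrors the documented IsActive semantics.
func isActive(s pluginState) bool {
	switch s {
	case stateInitializing, stateActive, stateWaiting, statePrepareStop:
		return true
	}
	return false
}

// isBlocked mirrors the documented IsBlocked semantics.
func isBlocked(s pluginState) bool {
	return s == stateWaiting
}

// isStopping mirrors the documented IsStopping semantics.
func isStopping(s pluginState) bool {
	switch s {
	case statePrepareStop, stateStopping, stateDead:
		return true
	}
	return false
}

// isActiveOrStopping is the documented shortcut.
func isActiveOrStopping(s pluginState) bool {
	return isActive(s) || isStopping(s)
}

func main() {
	fmt.Println(isActive(statePrepareStop), isStopping(statePrepareStop))
	fmt.Println(isActive(stateDead), isActiveOrStopping(stateDead))
}
```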

func (*SimpleConsumer) SetPrepareStopCallback added in v0.5.0

func (cons *SimpleConsumer) SetPrepareStopCallback(onPrepareStop func())

SetPrepareStopCallback sets the function to be called upon PluginControlPrepareStop

func (*SimpleConsumer) SetRollCallback added in v0.5.0

func (cons *SimpleConsumer) SetRollCallback(onRoll func())

SetRollCallback sets the function to be called upon PluginControlRoll

func (*SimpleConsumer) SetStopCallback added in v0.5.0

func (cons *SimpleConsumer) SetStopCallback(onStop func())

SetStopCallback sets the function to be called upon PluginControlStop

func (*SimpleConsumer) SetWorkerWaitGroup added in v0.5.0

func (cons *SimpleConsumer) SetWorkerWaitGroup(workers *sync.WaitGroup)

SetWorkerWaitGroup forwards to Plugin.SetWorkerWaitGroup for this consumer's internal plugin state. This method is also called by AddMainWorker.

func (*SimpleConsumer) TickerControlLoop added in v0.5.0

func (cons *SimpleConsumer) TickerControlLoop(interval time.Duration, onTick func())

TickerControlLoop is like ControlLoop but additionally executes a given function at every interval tick. Note that the interval is not exact. If the onTick function takes longer than interval, the next tick will be delayed until onTick finishes.
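
A loop of this kind can be sketched as a select over the control channel and a ticker. The code below is an illustration under assumptions, not the package's implementation: pluginControl, the two control values, tickerControlLoop and demo are hypothetical, and only stop and roll are handled.

```go
package main

import (
	"fmt"
	"time"
)

// pluginControl is a hypothetical stand-in for core.PluginControl.
type pluginControl int

const (
	controlStop pluginControl = iota
	controlRoll
)

// tickerControlLoop dispatches control messages to callbacks and calls
// onTick on every ticker event until a stop message arrives.
func tickerControlLoop(control <-chan pluginControl, interval time.Duration, onTick, onRoll, onStop func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case cmd := <-control:
			switch cmd {
			case controlRoll:
				onRoll()
			case controlStop:
				onStop()
				return
			}
		case <-ticker.C:
			// onTick runs inline, so a slow callback delays the next tick.
			onTick()
		}
	}
}

// demo runs the loop briefly and reports what was observed.
func demo() (ticks int, rolled, stopped bool) {
	control := make(chan pluginControl)
	done := make(chan struct{})
	go func() {
		defer close(done)
		tickerControlLoop(control, 5*time.Millisecond,
			func() { ticks++ },
			func() { rolled = true },
			func() { stopped = true })
	}()
	time.Sleep(50 * time.Millisecond)
	control <- controlRoll
	control <- controlStop
	<-done
	return ticks, rolled, stopped
}

func main() {
	ticks, rolled, stopped := demo()
	fmt.Println(ticks > 0, rolled, stopped)
}
```

Since the callback runs on the loop goroutine, control messages are also not processed while it executes, so long-running tick work is better handed off to another goroutine.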

func (*SimpleConsumer) WorkerDone added in v0.5.0

func (cons *SimpleConsumer) WorkerDone()

WorkerDone removes an additional worker from the waitgroup.
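
The worker bookkeeping of AddMainWorker, AddWorker and WorkerDone maps naturally onto a sync.WaitGroup. The following sketch assumes that AddMainWorker stores the wait group and registers one worker, as the descriptions above suggest; workerState and runWorkers are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// workerState is a hypothetical stand-in for the plugin's worker tracking.
type workerState struct {
	workers *sync.WaitGroup
}

// AddMainWorker stores the wait group and registers the first worker.
func (s *workerState) AddMainWorker(workers *sync.WaitGroup) {
	s.workers = workers
	s.AddWorker()
}

// AddWorker registers one more worker. The wait group must be set already.
func (s *workerState) AddWorker() { s.workers.Add(1) }

// WorkerDone unregisters one worker.
func (s *workerState) WorkerDone() { s.workers.Done() }

// runWorkers starts n extra workers next to the main worker and returns
// how many of them finished before the wait group was released.
func runWorkers(n int) int {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		finished int
	)
	state := &workerState{}
	state.AddMainWorker(&wg)
	for i := 0; i < n; i++ {
		state.AddWorker()
		go func() {
			defer state.WorkerDone()
			mu.Lock()
			finished++
			mu.Unlock()
		}()
	}
	state.WorkerDone() // the main worker is done
	wg.Wait()
	return finished
}

func main() {
	fmt.Println(runWorkers(3))
}
```

Calling AddWorker before starting each goroutine, rather than inside it, avoids a race between Add and Wait.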

type SimpleFilter added in v0.5.0

type SimpleFilter struct {
	Logger logrus.FieldLogger
	// contains filtered or unexported fields
}

SimpleFilter plugin base type

This type defines a common base class for all Filters. All filter plugins should derive from this class but don't necessarily need to.

Parameters

- FilteredStream: This value defines the stream filtered messages get sent to. You can disable this behavior by setting the value to "". By default this parameter is set to "".

func (*SimpleFilter) Configure added in v0.5.0

func (filter *SimpleFilter) Configure(conf PluginConfigReader) error

Configure sets up all values required by SimpleFilter.

func (*SimpleFilter) GetFilterResultMessageReject added in v0.5.0

func (filter *SimpleFilter) GetFilterResultMessageReject() FilterResult

GetFilterResultMessageReject returns a FilterResultMessageReject with the stream set to GetfilteredStreamID()

func (*SimpleFilter) GetLogger added in v0.5.0

func (filter *SimpleFilter) GetLogger() logrus.FieldLogger

GetLogger returns this plugin's scoped logger

func (*SimpleFilter) SetLogger added in v0.5.0

func (filter *SimpleFilter) SetLogger(logger logrus.FieldLogger)

SetLogger sets the scoped logger to be used for this filter

type SimpleFormatter added in v0.5.0

type SimpleFormatter struct {
	Logger            logrus.FieldLogger
	GetAppliedContent GetAppliedContent
	SetAppliedContent SetAppliedContent
	SkipIfEmpty       bool `config:"SkipIfEmpty"`
}

SimpleFormatter formatter

This type defines a common base class for formatters. Formatter plugins may derive from this class.

Parameters

- ApplyTo: This value chooses the part of the message the formatting should be applied to. Use "" to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to "".

- SkipIfEmpty: When set to true, this formatter will not be applied to data that is empty or, in the case of metadata, does not exist. By default this parameter is set to false.

func (*SimpleFormatter) CanBeApplied added in v0.5.0

func (format *SimpleFormatter) CanBeApplied(msg *Message) bool

CanBeApplied returns true if the formatter can be applied to this message
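
One plausible reading of how CanBeApplied relates to the ApplyTo and SkipIfEmpty parameters is sketched below. The message and formatter types are hypothetical stand-ins, and the rule itself (skip only when SkipIfEmpty is set and the targeted content is empty or missing) is an assumption derived from the parameter descriptions.

```go
package main

import "fmt"

// message is a hypothetical stand-in for core.Message.
type message struct {
	payload  []byte
	metadata map[string][]byte
}

// formatter holds the two documented SimpleFormatter parameters.
type formatter struct {
	applyTo     string
	skipIfEmpty bool
}

// content returns the part of the message selected by applyTo.
func (f formatter) content(msg *message) []byte {
	if f.applyTo == "" {
		return msg.payload
	}
	return msg.metadata[f.applyTo] // nil when the key does not exist
}

// canBeApplied reports false only when skipIfEmpty is set and the
// targeted content is empty or missing.
func (f formatter) canBeApplied(msg *message) bool {
	return !f.skipIfEmpty || len(f.content(msg)) > 0
}

func main() {
	msg := &message{payload: []byte("data")}
	fmt.Println(formatter{applyTo: "", skipIfEmpty: true}.canBeApplied(msg))
	fmt.Println(formatter{applyTo: "host", skipIfEmpty: true}.canBeApplied(msg))
	fmt.Println(formatter{applyTo: "host", skipIfEmpty: false}.canBeApplied(msg))
}
```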

func (*SimpleFormatter) Configure added in v0.5.0

func (format *SimpleFormatter) Configure(conf PluginConfigReader)

Configure sets up all values required by SimpleFormatter.

func (*SimpleFormatter) GetLogger added in v0.5.0

func (format *SimpleFormatter) GetLogger() logrus.FieldLogger

GetLogger returns the scoped logger of this plugin

func (*SimpleFormatter) SetLogger added in v0.5.0

func (format *SimpleFormatter) SetLogger(logger logrus.FieldLogger)

SetLogger sets the scoped logger to be used for this formatter

type SimpleProducer added in v0.5.0

type SimpleProducer struct {
	Logger logrus.FieldLogger
	// contains filtered or unexported fields
}

SimpleProducer producer

This type defines a common base class for all producers. All producer plugins should derive from this class, as all required basic functions are already implemented here in a general way.

Parameters

- Streams: Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying "*" causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.

- FallbackStream: Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to "" will cause messages to be discarded when delivery fails.

- ShutdownTimeoutMs: Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.

- Modulators: Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.

func (*SimpleProducer) AddHealthCheck added in v0.5.0

func (prod *SimpleProducer) AddHealthCheck(callback thealthcheck.CallbackFunc)

AddHealthCheck adds a health check at the default URL (http://<addr>:<port>/<plugin_id>)

func (*SimpleProducer) AddHealthCheckAt added in v0.5.0

func (prod *SimpleProducer) AddHealthCheckAt(path string, callback thealthcheck.CallbackFunc)

AddHealthCheckAt adds a health check at a subpath (http://<addr>:<port>/<plugin_id><path>)

func (*SimpleProducer) AddMainWorker added in v0.5.0

func (prod *SimpleProducer) AddMainWorker(workers *sync.WaitGroup)

AddMainWorker adds the first worker to the waitgroup

func (*SimpleProducer) AddWorker added in v0.5.0

func (prod *SimpleProducer) AddWorker()

AddWorker adds an additional worker to the waitgroup. Assumes that either AddMainWorker or SetWorkerWaitGroup has been called beforehand.

func (*SimpleProducer) Configure added in v0.5.0

func (prod *SimpleProducer) Configure(conf PluginConfigReader)

Configure initializes the standard producer config values.

func (*SimpleProducer) Control added in v0.5.0

func (prod *SimpleProducer) Control() chan<- PluginControl

Control returns write access to this producer's control channel. See PluginControl* constants.

func (*SimpleProducer) ControlLoop added in v0.5.0

func (prod *SimpleProducer) ControlLoop()

ControlLoop listens to the control channel and triggers callbacks for these messages. Upon stop control message doExit will be set to true.

func (*SimpleProducer) GetID added in v0.5.0

func (prod *SimpleProducer) GetID() string

GetID returns the ID of this producer

func (*SimpleProducer) GetLogger added in v0.5.0

func (prod *SimpleProducer) GetLogger() logrus.FieldLogger

GetLogger returns the logging scope of this plugin

func (*SimpleProducer) GetShutdownTimeout added in v0.5.0

func (prod *SimpleProducer) GetShutdownTimeout() time.Duration

GetShutdownTimeout returns the duration gollum will wait for this producer before canceling the shutdown process.

func (*SimpleProducer) GetState added in v0.5.0

func (prod *SimpleProducer) GetState() PluginState

GetState returns the state this plugin is currently in

func (*SimpleProducer) HasContinueAfterModulate added in v0.5.0

func (prod *SimpleProducer) HasContinueAfterModulate(msg *Message) bool

HasContinueAfterModulate applies all modulators via Modulate, handles the ModulateResult, and returns whether processing of the message should continue. This method provides the default modulate handling for producers.
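
A typical way to act on a ModulateResult in a producer is sketched below. The mapping used here (continue returns true, fallback hands the message to the fallback path, discard drops it) is an assumption based on the ModulateResult constant descriptions; the producer type and its fields are hypothetical.

```go
package main

import "fmt"

// modulateResult is a hypothetical stand-in for core.ModulateResult.
type modulateResult int

const (
	modulateResultContinue modulateResult = iota
	modulateResultFallback
	modulateResultDiscard
)

type message struct{ data string }

// producer is a reduced stand-in; fallback collects messages that would
// be passed to TryFallback in a real producer.
type producer struct {
	modulate func(*message) modulateResult
	fallback []*message
}

// hasContinueAfterModulate returns true only if the message should be
// processed further by the producer.
func (p *producer) hasContinueAfterModulate(msg *message) bool {
	switch p.modulate(msg) {
	case modulateResultContinue:
		return true
	case modulateResultFallback:
		p.fallback = append(p.fallback, msg)
		return false
	default: // discard
		return false
	}
}

func main() {
	p := &producer{modulate: func(m *message) modulateResult {
		if m.data == "" {
			return modulateResultFallback
		}
		return modulateResultContinue
	}}
	fmt.Println(p.hasContinueAfterModulate(&message{data: "ok"}))
	fmt.Println(p.hasContinueAfterModulate(&message{}), len(p.fallback))
}
```

In a producer's message handler this would usually appear as an early return: if the check yields false, the message has already been dealt with.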

func (*SimpleProducer) IsActive added in v0.5.0

func (prod *SimpleProducer) IsActive() bool

IsActive returns true if GetState() returns initializing, active, waiting, or prepareStop

func (*SimpleProducer) IsActiveOrStopping added in v0.5.0

func (prod *SimpleProducer) IsActiveOrStopping() bool

IsActiveOrStopping is a shortcut for prod.IsActive() || prod.IsStopping()

func (*SimpleProducer) IsBlocked added in v0.5.0

func (prod *SimpleProducer) IsBlocked() bool

IsBlocked returns true if GetState() returns waiting

func (*SimpleProducer) IsStopping added in v0.5.0

func (prod *SimpleProducer) IsStopping() bool

IsStopping returns true if GetState() returns prepareStop, stopping or dead

func (*SimpleProducer) Modulate added in v0.5.0

func (prod *SimpleProducer) Modulate(msg *Message) ModulateResult

Modulate applies all modulators from this producer to a given message. This implementation handles routing and discarding of messages.

func (*SimpleProducer) SetPrepareStopCallback added in v0.5.0

func (prod *SimpleProducer) SetPrepareStopCallback(onPrepareStop func())

SetPrepareStopCallback sets the function to be called upon PluginControlPrepareStop

func (*SimpleProducer) SetRollCallback added in v0.5.0

func (prod *SimpleProducer) SetRollCallback(onRoll func())

SetRollCallback sets the function to be called upon PluginControlRoll

func (*SimpleProducer) SetStopCallback added in v0.5.0

func (prod *SimpleProducer) SetStopCallback(onStop func())

SetStopCallback sets the function to be called upon PluginControlStop

func (*SimpleProducer) SetWorkerWaitGroup added in v0.5.0

func (prod *SimpleProducer) SetWorkerWaitGroup(workers *sync.WaitGroup)

SetWorkerWaitGroup forwards to Plugin.SetWorkerWaitGroup for this producer's internal plugin state. This method is also called by AddMainWorker.

func (*SimpleProducer) Streams added in v0.5.0

func (prod *SimpleProducer) Streams() []MessageStreamID

Streams returns the IDs of the streams this producer is listening to.

func (*SimpleProducer) TickerControlLoop added in v0.5.0

func (prod *SimpleProducer) TickerControlLoop(interval time.Duration, onTimeOut func())

TickerControlLoop is like ControlLoop but additionally executes a given function at every interval tick. If the onTimeOut function takes longer than interval, the next tick will be delayed until onTimeOut finishes.

func (*SimpleProducer) TryFallback added in v0.5.0

func (prod *SimpleProducer) TryFallback(msg *Message)

TryFallback routes the message to the configured fallback stream.

func (*SimpleProducer) WorkerDone added in v0.5.0

func (prod *SimpleProducer) WorkerDone()

WorkerDone removes an additional worker from the waitgroup.

type SimpleRouter added in v0.5.0

type SimpleRouter struct {
	Producers []Producer

	Logger logrus.FieldLogger
	// contains filtered or unexported fields
}

SimpleRouter plugin base type

This type defines a common base class for routers. All routers should derive from this class, but do not necessarily need to.

Parameters

- Stream: This value specifies the name of the stream this plugin is supposed to read messages from.

- Filters: This value defines an optional list of Filter plugins to connect to this router.

- TimeoutMs: This value sets a timeout in milliseconds until a message should be handled by the router. You can disable this behavior by setting it to "0". By default this parameter is set to "0".

func (*SimpleRouter) AddHealthCheck added in v0.5.0

func (router *SimpleRouter) AddHealthCheck(callback thealthcheck.CallbackFunc)

AddHealthCheck adds a health check at the default URL (http://<addr>:<port>/<plugin_id>)

func (*SimpleRouter) AddHealthCheckAt added in v0.5.0

func (router *SimpleRouter) AddHealthCheckAt(path string, callback thealthcheck.CallbackFunc)

AddHealthCheckAt adds a health check at a subpath (http://<addr>:<port>/<plugin_id><path>)

func (*SimpleRouter) AddProducer added in v0.5.0

func (router *SimpleRouter) AddProducer(producers ...Producer)

AddProducer adds all producers to the list of known producers. Duplicates will be filtered.
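
The duplicate filtering mentioned above could look like the following sketch. It assumes duplicates are detected by identity (the same producer instance); the producer interface, namedProducer and router types are hypothetical stand-ins.

```go
package main

import "fmt"

// producer is a reduced, hypothetical stand-in for core.Producer.
type producer interface {
	GetID() string
}

type namedProducer struct{ id string }

func (p *namedProducer) GetID() string { return p.id }

// router is a reduced stand-in for SimpleRouter.
type router struct {
	producers []producer
}

// AddProducer appends every producer that is not yet known.
func (r *router) AddProducer(producers ...producer) {
	for _, prod := range producers {
		known := false
		for _, existing := range r.producers {
			if existing == prod {
				known = true
				break
			}
		}
		if !known {
			r.producers = append(r.producers, prod)
		}
	}
}

func main() {
	a, b := &namedProducer{id: "a"}, &namedProducer{id: "b"}
	r := &router{}
	r.AddProducer(a, b, a)
	r.AddProducer(b)
	fmt.Println(len(r.producers))
}
```

The linear scan keeps insertion order and is cheap for the small producer lists a router usually holds.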

func (*SimpleRouter) Configure added in v0.5.0

func (router *SimpleRouter) Configure(conf PluginConfigReader)

Configure sets up all values required by SimpleRouter.

func (*SimpleRouter) GetID added in v0.5.0

func (router *SimpleRouter) GetID() string

GetID returns the ID of this router

func (*SimpleRouter) GetLogger added in v0.5.0

func (router *SimpleRouter) GetLogger() logrus.FieldLogger

GetLogger returns the logging scope of this plugin

func (*SimpleRouter) GetProducers added in v0.5.0

func (router *SimpleRouter) GetProducers() []Producer

GetProducers returns the producers bound to this stream

func (*SimpleRouter) GetStreamID added in v0.5.0

func (router *SimpleRouter) GetStreamID() MessageStreamID

GetStreamID returns the id of the stream this plugin is bound to.

func (*SimpleRouter) GetTimeout added in v0.5.0

func (router *SimpleRouter) GetTimeout() time.Duration

GetTimeout returns the timeout set for this router

func (*SimpleRouter) Modulate added in v0.5.0

func (router *SimpleRouter) Modulate(msg *Message) ModulateResult

Modulate calls all modulators in their order of definition
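
Calling modulators in their order of definition can be sketched as a simple loop. In line with the ModulateResult constant descriptions, this sketch stops at the first result other than continue; the types and the modulateAll helper are hypothetical, and modulators are reduced to plain functions.

```go
package main

import (
	"fmt"
	"strings"
)

// modulateResult is a hypothetical stand-in for core.ModulateResult.
type modulateResult int

const (
	modulateResultContinue modulateResult = iota
	modulateResultFallback
	modulateResultDiscard
)

type message struct{ data string }

// modulator reduces the Modulator interface to a function for brevity.
type modulator func(*message) modulateResult

// modulateAll applies the modulators in order and returns the first
// result that is not "continue".
func modulateAll(msg *message, modulators []modulator) modulateResult {
	for _, modulate := range modulators {
		if result := modulate(msg); result != modulateResultContinue {
			return result
		}
	}
	return modulateResultContinue
}

func main() {
	upper := func(m *message) modulateResult {
		m.data = strings.ToUpper(m.data)
		return modulateResultContinue
	}
	dropEmpty := func(m *message) modulateResult {
		if m.data == "" {
			return modulateResultDiscard
		}
		return modulateResultContinue
	}
	msg := &message{data: "hello"}
	fmt.Println(modulateAll(msg, []modulator{dropEmpty, upper}) == modulateResultContinue, msg.data)
}
```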

type StreamMetric added in v0.5.0

type StreamMetric struct {
	// contains filtered or unexported fields
}

StreamMetric class for stream based metrics

func GetStreamMetric added in v0.5.0

func GetStreamMetric(streamID MessageStreamID) StreamMetric

GetStreamMetric returns a StreamMetric instance for the given MessageStreamID

func (*StreamMetric) CountMessageDiscarded added in v0.5.0

func (metric *StreamMetric) CountMessageDiscarded()

CountMessageDiscarded increases the discarded messages counter by 1

func (*StreamMetric) CountMessageRouted added in v0.5.0

func (metric *StreamMetric) CountMessageRouted()

CountMessageRouted increases the messages counter by 1

type WriterAssembly added in v0.4.0

type WriterAssembly struct {
	// contains filtered or unexported fields
}

WriterAssembly is a helper struct for io.Writer compatible classes that use message batch.

func NewWriterAssembly added in v0.4.0

func NewWriterAssembly(writer io.Writer, flush func(*Message), modulator Modulator) WriterAssembly

NewWriterAssembly creates a new adapter between io.Writer and the MessageBatch AssemblyFunc function signature

func (*WriterAssembly) Flush added in v0.4.0

func (asm *WriterAssembly) Flush(messages []*Message)

Flush is an AssemblyFunc compatible implementation to pass all messages from a MessageBatch to e.g. the Drop function of a producer. Flush will also be called by Write if the io.Writer reported an error.

func (*WriterAssembly) SetErrorHandler added in v0.4.0

func (asm *WriterAssembly) SetErrorHandler(handleError func(error) bool)

SetErrorHandler sets a callback that is called if an error occurred. HandleError needs to return true to prevent messages from being flushed.

func (*WriterAssembly) SetFlush added in v0.4.0

func (asm *WriterAssembly) SetFlush(flush func(*Message))

SetFlush changes the bound flush function

func (*WriterAssembly) SetValidator added in v0.4.0

func (asm *WriterAssembly) SetValidator(validate func() bool)

SetValidator sets a callback that is called if a write was successful. Validate needs to return true to prevent messages from being flushed.

func (*WriterAssembly) SetWriter added in v0.4.0

func (asm *WriterAssembly) SetWriter(writer io.Writer)

SetWriter changes the writer interface used during Assemble

func (*WriterAssembly) Write added in v0.4.0

func (asm *WriterAssembly) Write(messages []*Message)

Write is an AssemblyFunc compatible implementation to pass all messages from a MessageBatch to an io.Writer. Messages are formatted using a given formatter. If the io.Writer fails to write the assembled buffer all messages are passed to the Flush() method.
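
The relationship between Write and Flush can be illustrated with a reduced sketch: all payloads are assembled into one buffer, written in a single call, and handed to the flush function if the write fails. The types below are hypothetical stand-ins; the error handler, validator and modulator of the real WriterAssembly are left out.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

type message struct{ payload []byte }

// writerAssembly is a reduced stand-in for WriterAssembly.
type writerAssembly struct {
	writer io.Writer
	flush  func(*message)
}

// Write assembles all payloads and writes them at once. On a write error
// the whole batch is passed to Flush.
func (asm *writerAssembly) Write(messages []*message) {
	var buffer []byte
	for _, msg := range messages {
		buffer = append(buffer, msg.payload...)
	}
	if _, err := asm.writer.Write(buffer); err != nil {
		asm.Flush(messages)
	}
}

// Flush passes every message to the bound flush function.
func (asm *writerAssembly) Flush(messages []*message) {
	for _, msg := range messages {
		asm.flush(msg)
	}
}

// memoryWriter collects written bytes; failingWriter always fails.
type memoryWriter struct{ data []byte }

func (w *memoryWriter) Write(p []byte) (int, error) {
	w.data = append(w.data, p...)
	return len(p), nil
}

type failingWriter struct{}

func (failingWriter) Write([]byte) (int, error) {
	return 0, errors.New("write failed")
}

func main() {
	batch := []*message{{payload: []byte("a\n")}, {payload: []byte("b\n")}}
	out := &memoryWriter{}
	flushed := 0
	asm := writerAssembly{writer: out, flush: func(*message) { flushed++ }}

	asm.Write(batch) // succeeds, nothing is flushed
	asm.writer = failingWriter{}
	asm.Write(batch) // fails, both messages are flushed

	fmt.Printf("%q %d\n", out.data, flushed)
}
```

In a producer the flush function would typically route messages to a fallback, so a failed batch write does not silently lose data.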
