core

package
v0.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 10, 2015 License: Apache-2.0 Imports: 12 Imported by: 65

Documentation

Overview

Package core is a generated protocol buffer package.

It is generated from these files:

message.proto

It has these top-level messages:

SerializedMessage

Index

Constants

View Source
const (
	// LogInternalStream is the name of the internal message channel (logs)
	LogInternalStream = "_GOLLUM_"
	// WildcardStream is the name of the "all streams" channel
	WildcardStream = "*"
	// DroppedStream is the name of the stream used to store dropped messages
	DroppedStream = "_DROPPED_"
	// MessageStateOk is returned if the message could be delivered
	MessageStateOk = MessageState(iota)
	// MessageStateTimeout is returned if a message timed out
	MessageStateTimeout = MessageState(iota)
	// MessageStateDiscard is returned if a message should be discarded
	MessageStateDiscard = MessageState(iota)
)
View Source
const (
	// PluginControlStopProducer will cause any producer to halt and shutdown.
	PluginControlStopProducer = PluginControl(iota)
	// PluginControlStopConsumer will cause any consumer to halt and shutdown.
	PluginControlStopConsumer = PluginControl(iota)
	// PluginControlRoll notifies the consumer/producer about a reconnect or reopen request
	PluginControlRoll = PluginControl(iota)
	// PluginControlFuseBurn notfies a producer to burn a fuse or a consumer that a fuse has been burned
	PluginControlFuseBurn = PluginControl(iota)
	// PluginControlFuseActive notfies a producer to activate a fuse or a consumer that a fuse has been activated
	PluginControlFuseActive = PluginControl(iota)
)
View Source
const (

	// PluginStateWaiting is set when a plugin is active but currently unable to process data
	PluginStateWaiting = PluginState(iota)
	// PluginStateActive is set when a plugin is ready to process data
	PluginStateActive = PluginState(iota)
	// PluginStateStopping is set when a plugin is about to stop
	PluginStateStopping = PluginState(iota)
	// PluginStateDead is set when a plugin is unable to process any data
	PluginStateDead = PluginState(iota)
)

Variables

View Source
var (
	// LogInternalStreamID is the ID of the "_GOLLUM_" stream
	LogInternalStreamID = GetStreamID(LogInternalStream)
	// WildcardStreamID is the ID of the "*" stream
	WildcardStreamID = GetStreamID(WildcardStream)
	// DroppedStreamID is the ID of the "_DROPPED_" stream
	DroppedStreamID = GetStreamID(DroppedStream)
)
View Source
var PluginRegistry = pluginRegistry{
	// contains filtered or unexported fields
}

PluginRegistry holds all plugins by their name

View Source
var StreamRegistry = streamRegistry{
	// contains filtered or unexported fields
}

StreamRegistry is the global instance of streamRegistry used to store the all registered streams.

Functions

func CountDiscardedMessage added in v0.4.0

func CountDiscardedMessage()

CountDiscardedMessage increases the discarded messages counter by 1

func CountDroppedMessage added in v0.4.0

func CountDroppedMessage()

CountDroppedMessage increases the dropped messages counter by 1

func CountFilteredMessage added in v0.4.0

func CountFilteredMessage()

CountFilteredMessage increases the filtered messages counter by 1

func CountNoRouteForMessage added in v0.4.0

func CountNoRouteForMessage()

CountNoRouteForMessage increases the "no route" counter by 1

func CountProcessedMessage added in v0.4.0

func CountProcessedMessage()

CountProcessedMessage increases the messages counter by 1

func GetAndResetMessageCount

func GetAndResetMessageCount() (messages, dropped, discarded, filtered, noroute uint32)

GetAndResetMessageCount returns the current message counters and resets them to 0. This function is threadsafe.

Types

type AssemblyFunc added in v0.4.0

type AssemblyFunc func([]Message)

AssemblyFunc is the function signature for callbacks passed to the Flush method.

type AsyncMessageSource

type AsyncMessageSource interface {
	MessageSource

	// EnqueueResponse sends a message to the source of another message.
	EnqueueResponse(msg Message)
}

AsyncMessageSource extends the MessageSource interface to allow a backchannel that simply forwards any message coming from the producer.

type Config

type Config struct {
	Values  []map[string]shared.MarshalMap
	Plugins []PluginConfig
}

Config represents the top level config containing all plugin clonfigs

func ReadConfig

func ReadConfig(path string) (*Config, error)

ReadConfig parses a YAML config file into a new Config struct.

type Consumer

type Consumer interface {
	PluginWithState
	MessageSource

	// Consume should implement to main loop that fetches messages from a given
	// source and pushes it to the Message channel.
	Consume(*sync.WaitGroup)

	// Streams returns the streams this consumer is writing to.
	Streams() []MessageStreamID

	// Control returns write access to this consumer's control channel.
	// See PluginControl* constants.
	Control() chan<- PluginControl
}

Consumer is an interface for plugins that receive data from outside sources and generate Message objects from this data.

type ConsumerBase

type ConsumerBase struct {
	// contains filtered or unexported fields
}

ConsumerBase base class All consumers support a common subset of configuration options:

  • "consumer.Something": Enable: true ID: "" Fuse: "" Stream:
  • "error"
  • "default"

Enable switches the consumer on or off. By default this value is set to true.

ID allows this consumer to be found by other plugins by name. By default this is set to "" which does not register this consumer.

Stream contains either a single string or a list of strings defining the message channels this consumer will produce. By default this is set to "*" which means only producers set to consume "all streams" will get these messages.

Fuse defines the name of a fuse to observe for this consumer. Producer may "burn" the fuse when they encounter errors. Consumers may react on this by e.g. closing connections to notify any writing services of the problem. Set to "" by default which disables the fuse feature for this consumer. It is up to the consumer implementation to react on a broken fuse in an appropriate manner.

func (*ConsumerBase) AddMainWorker

func (cons *ConsumerBase) AddMainWorker(workers *sync.WaitGroup)

AddMainWorker adds the first worker to the waitgroup

func (*ConsumerBase) AddWorker

func (cons *ConsumerBase) AddWorker()

AddWorker adds an additional worker to the waitgroup. Assumes that either MarkAsActive or SetWaitGroup has been called beforehand.

func (*ConsumerBase) Configure

func (cons *ConsumerBase) Configure(conf PluginConfig) error

Configure initializes standard consumer values from a plugin config.

func (*ConsumerBase) Control

func (cons *ConsumerBase) Control() chan<- PluginControl

Control returns write access to this consumer's control channel. See ConsumerControl* constants.

func (*ConsumerBase) ControlLoop added in v0.4.0

func (cons *ConsumerBase) ControlLoop()

ControlLoop listens to the control channel and triggers callbacks for these messags. Upon stop control message doExit will be set to true.

func (*ConsumerBase) Enqueue

func (cons *ConsumerBase) Enqueue(data []byte, sequence uint64)

Enqueue creates a new message from a given byte slice and passes it to EnqueueMessage. Note that data is not copied, just referenced by the message.

func (*ConsumerBase) EnqueueCopy

func (cons *ConsumerBase) EnqueueCopy(data []byte, sequence uint64)

EnqueueCopy behaves like Enqueue but creates a copy of data that is attached to the message.

func (*ConsumerBase) EnqueueMessage

func (cons *ConsumerBase) EnqueueMessage(msg Message)

EnqueueMessage passes a given message to all streams. Only the StreamID of the message is modified, everything else is passed as-is.

func (*ConsumerBase) GetState added in v0.4.0

func (cons *ConsumerBase) GetState() PluginState

GetState returns the state this plugin is currently in

func (*ConsumerBase) IsActive added in v0.4.0

func (cons *ConsumerBase) IsActive() bool

IsActive returns true if GetState() returns active

func (*ConsumerBase) IsActiveOrStopping added in v0.4.0

func (cons *ConsumerBase) IsActiveOrStopping() bool

IsActiveOrStopping is a shortcut for prod.IsActive() || prod.IsStopping()

func (*ConsumerBase) IsBlocked added in v0.4.0

func (cons *ConsumerBase) IsBlocked() bool

IsBlocked returns true if GetState() returns waiting

func (*ConsumerBase) IsFuseBurned added in v0.4.1

func (cons *ConsumerBase) IsFuseBurned() bool

IsFuseBurned returns true if the fuse linked to this consumer has been burned. If no fuse is attached, false is returned.

func (*ConsumerBase) IsStopping added in v0.4.0

func (cons *ConsumerBase) IsStopping() bool

IsStopping returns true if GetState() returns stopping

func (*ConsumerBase) SetFuseActiveCallback added in v0.4.1

func (cons *ConsumerBase) SetFuseActiveCallback(onFuseActive func())

SetFuseActiveCallback sets the function to be called upon PluginControlFuseActive

func (*ConsumerBase) SetFuseBurnedCallback added in v0.4.1

func (cons *ConsumerBase) SetFuseBurnedCallback(onFuseBurned func())

SetFuseBurnedCallback sets the function to be called upon PluginControlFuseBurned

func (*ConsumerBase) SetRollCallback added in v0.4.0

func (cons *ConsumerBase) SetRollCallback(onRoll func())

SetRollCallback sets the function to be called upon PluginControlRoll

func (*ConsumerBase) SetStopCallback added in v0.4.0

func (cons *ConsumerBase) SetStopCallback(onStop func())

SetStopCallback sets the function to be called upon PluginControlStop

func (*ConsumerBase) SetWorkerWaitGroup

func (cons *ConsumerBase) SetWorkerWaitGroup(workers *sync.WaitGroup)

SetWorkerWaitGroup forwards to Plugin.SetWorkerWaitGroup for this consumer's internal plugin state. This method is also called by AddMainWorker.

func (*ConsumerBase) Streams

func (cons *ConsumerBase) Streams() []MessageStreamID

Streams returns an array with all stream ids this consumer is writing to.

func (*ConsumerBase) TickerControlLoop

func (cons *ConsumerBase) TickerControlLoop(interval time.Duration, onTick func())

TickerControlLoop is like MessageLoop but executes a given function at every given interval tick, too. Note that the interval is not exact. If the onTick function takes longer than interval, the next tick will be delayed until onTick finishes.

func (*ConsumerBase) WaitOnFuse added in v0.4.1

func (cons *ConsumerBase) WaitOnFuse()

WaitOnFuse blocks if the fuse linked to this consumer has been burned. If no fuse is bound this function does nothing.

func (*ConsumerBase) WorkerDone

func (cons *ConsumerBase) WorkerDone()

WorkerDone removes an additional worker to the waitgroup.

type Distributor added in v0.4.0

type Distributor func(msg Message)

Distributor is a callback typedef for methods processing messages

type Filter

type Filter interface {
	Accepts(msg Message) bool
}

Filter allows custom message filtering for ProducerBase derived plugins. Producers not deriving from ProducerBase might utilize this one, too.

type Formatter

type Formatter interface {
	// Format transfers the message payload into a new format. The payload may
	// then be reassigned to the original or a new message.
	// In addition to that the formatter may change the stream of the message.
	Format(msg Message) ([]byte, MessageStreamID)
}

Formatter is the interface definition for message formatters

type LinkableMessageSource

type LinkableMessageSource interface {
	MessageSource
	// Link the message source to the message receiver. This makes it possible
	// to create stable "pipes" between e.g. a consumer and producer.
	Link(pipe interface{})

	// IsLinked has to return true if Link executed successfull and does not
	// need to be called again.
	IsLinked() bool
}

LinkableMessageSource extends the MessageSource interface to allow a pipe like behaviour between two components that communicate messages.

type LogConsumer

type LogConsumer struct {
	Consumer
	// contains filtered or unexported fields
}

LogConsumer is an internal consumer plugin used indirectly by the gollum log package.

func (*LogConsumer) Configure

func (cons *LogConsumer) Configure(conf PluginConfig) error

Configure initializes this consumer with values from a plugin config.

func (*LogConsumer) Consume

func (cons *LogConsumer) Consume(threads *sync.WaitGroup)

Consume starts listening for control statements

func (*LogConsumer) Control

func (cons *LogConsumer) Control() chan<- PluginControl

Control returns a handle to the control channel

func (*LogConsumer) GetState added in v0.4.0

func (cons *LogConsumer) GetState() PluginState

GetState always returns PluginStateActive

func (*LogConsumer) Streams

func (cons *LogConsumer) Streams() []MessageStreamID

Streams always returns an array with one member - the internal log stream

func (LogConsumer) Write

func (cons LogConsumer) Write(data []byte) (int, error)

Write fullfills the io.Writer interface

type MappedStream

type MappedStream struct {
	StreamID MessageStreamID
	Stream   Stream
}

MappedStream holds a stream and the id the stream is assgined to

type Message

type Message struct {
	Data         []byte
	StreamID     MessageStreamID
	PrevStreamID MessageStreamID
	Source       MessageSource
	Timestamp    time.Time
	Sequence     uint64
}

Message is a container used for storing the internal state of messages. This struct is passed between consumers and producers.

func DeserializeMessage added in v0.4.0

func DeserializeMessage(data []byte) (Message, error)

DeserializeMessage generates a message from a string produced by Message.Serialize.

func NewMessage

func NewMessage(source MessageSource, data []byte, sequence uint64) Message

NewMessage creates a new message from a given data stream

func (Message) Enqueue

func (msg Message) Enqueue(channel chan<- Message, timeout time.Duration) MessageState

Enqueue is a convenience function to push a message to a channel while waiting for a timeout instead of just blocking. Passing a timeout of -1 will discard the message. Passing a timout of 0 will always block. Messages that time out will be passed to the dropped queue if a Dropped consumer exists. The source parameter is used when a message is dropped, i.e. it is passed to the Drop function.

func (Message) Route added in v0.4.0

func (msg Message) Route(targetID MessageStreamID)

Route enqueues this message to the given stream. If the stream does not exist, a default stream (broadcast) is created.

func (Message) Serialize added in v0.4.0

func (msg Message) Serialize() ([]byte, error)

Serialize generates a string containing all data that can be preserved over shutdown (i.e. no data directly referencing runtime components).

func (Message) String

func (msg Message) String() string

String implements the stringer interface

type MessageBatch

type MessageBatch struct {
	// contains filtered or unexported fields
}

MessageBatch is a helper class for producers to format and store messages into a single buffer that is flushed to an io.Writer. You can use the Reached* functions to determine whether a flush should be called, i.e. if a timeout or size threshold has been reached.

func NewMessageBatch

func NewMessageBatch(maxMessageCount int) MessageBatch

NewMessageBatch creates a new MessageBatch with a given size (in bytes) and a given formatter.

func (*MessageBatch) Append

func (batch *MessageBatch) Append(msg Message) bool

Append formats a message and appends it to the internal buffer. If the message does not fit into the buffer this function returns false. If the message can never fit into the buffer (too large), true is returned and an error is logged.

func (*MessageBatch) AppendOrBlock added in v0.4.0

func (batch *MessageBatch) AppendOrBlock(msg Message) bool

AppendOrBlock works like Append but will block until Append returns true. If the batch was closed during this call, false is returned.

func (*MessageBatch) AppendOrFlush added in v0.4.0

func (batch *MessageBatch) AppendOrFlush(msg Message, flushBuffer func(), canBlock func() bool, drop func(Message))

AppendOrFlush is a common combinatorial pattern of Append and AppendOrBlock. If append fails, flush is called, followed by AppendOrBlock if blocking is allowed. If AppendOrBlock fails (or blocking is not allowed) the message is dropped.

func (*MessageBatch) Close added in v0.4.0

func (batch *MessageBatch) Close(assemble AssemblyFunc, timeout time.Duration)

Close disables Append, calls flush and waits for this call to finish. Timeout is passed to WaitForFlush.

func (*MessageBatch) Flush

func (batch *MessageBatch) Flush(assemble AssemblyFunc)

Flush writes the content of the buffer to a given resource and resets the internal state, i.e. the buffer is empty after a call to Flush. Writing will be done in a separate go routine to be non-blocking.

The validate callback will be called after messages have been successfully written to the io.Writer. If validate returns false the buffer will not be resetted (automatic retry). If validate is nil a return value of true is assumed (buffer reset).

The onError callback will be called if the io.Writer returned an error. If onError returns false the buffer will not be resetted (automatic retry). If onError is nil a return value of true is assumed (buffer reset).

func (MessageBatch) IsClosed added in v0.4.0

func (batch MessageBatch) IsClosed() bool

IsClosed returns true of Close has been called at least once.

func (MessageBatch) IsEmpty

func (batch MessageBatch) IsEmpty() bool

IsEmpty returns true if no data is stored in the front buffer, i.e. if no data is scheduled for flushing.

func (*MessageBatch) Len added in v0.4.0

func (batch *MessageBatch) Len() int

Len returns the length of one buffer

func (MessageBatch) ReachedSizeThreshold

func (batch MessageBatch) ReachedSizeThreshold(size int) bool

ReachedSizeThreshold returns true if the bytes stored in the buffer are above or equal to the size given. If there is no data this function returns false.

func (MessageBatch) ReachedTimeThreshold

func (batch MessageBatch) ReachedTimeThreshold(timeout time.Duration) bool

ReachedTimeThreshold returns true if the last flush was more than timeout ago. If there is no data this function returns false.

func (*MessageBatch) Touch

func (batch *MessageBatch) Touch()

Touch resets the timer queried by ReachedTimeThreshold, i.e. this resets the automatic flush timeout

func (*MessageBatch) WaitForFlush

func (batch *MessageBatch) WaitForFlush(timeout time.Duration)

WaitForFlush blocks until the current flush command returns. Passing a timeout > 0 will unblock this function after the given duration at the latest.

type MessageSource

type MessageSource interface {
	// IsActive returns true if the source can produce messages
	IsActive() bool

	// IsBlocked returns true if the source cannot produce messages
	IsBlocked() bool
}

MessageSource defines methods that are common to all message sources. Currently this is only a placeholder.

type MessageState added in v0.4.0

type MessageState int

MessageState is used as a return value for the Enqueu method

type MessageStreamID

type MessageStreamID uint64

MessageStreamID is the "compiled name" of a stream

func GetStreamID

func GetStreamID(stream string) MessageStreamID

GetStreamID returns the integer representation of a given stream name.

type Plugin

type Plugin interface {
	// Configure is called during NewPluginWithType
	Configure(conf PluginConfig) error
}

Plugin is the base class for any runtime class that can be configured and instantiated during runtim.

func NewPlugin

func NewPlugin(config PluginConfig) (Plugin, error)

NewPlugin creates a new plugin from the type information stored in its config. This function internally calls NewPluginWithType.

func NewPluginWithType

func NewPluginWithType(typename string, config PluginConfig) (Plugin, error)

NewPluginWithType creates a new plugin of a given type and initializes it using the given config (i.e. passes that config to Configure). The type passed to this function may differ from the type stored in the config. If the type is meant to match use NewPlugin instead of NewPluginWithType. This function returns nil, error if the plugin could not be instantiated or plugin, error if Configure failed.

type PluginConfig

type PluginConfig struct {
	ID        string
	Typename  string
	Enable    bool
	Instances int
	Stream    []string
	Settings  shared.MarshalMap
	// contains filtered or unexported fields
}

PluginConfig is a configuration for a specific plugin

func NewPluginConfig

func NewPluginConfig(typename string) PluginConfig

NewPluginConfig creates a new plugin config with default values. By default the plugin is enabled, has one instance, is bound to no streams and has no additional settings.

func (PluginConfig) GetBool

func (conf PluginConfig) GetBool(key string, defaultValue bool) bool

GetBool tries to read a non-predefined, boolean value from a PluginConfig. If that value is not found defaultValue is returned.

func (PluginConfig) GetInt

func (conf PluginConfig) GetInt(key string, defaultValue int) int

GetInt tries to read a non-predefined, integer value from a PluginConfig. If that value is not found defaultValue is returned.

func (PluginConfig) GetStreamArray added in v0.4.0

func (conf PluginConfig) GetStreamArray(key string, defaultValue []MessageStreamID) []MessageStreamID

GetStreamArray tries to read a non-predefined string array from a pluginconfig and translates all values to streamIds. If the key is not found defaultValue is returned.

func (PluginConfig) GetStreamMap

func (conf PluginConfig) GetStreamMap(key string, defaultValue string) map[MessageStreamID]string

GetStreamMap tries to read a non-predefined, stream to string map from a plugin config. A mapping on the wildcard stream is always returned. The target is either defaultValue or a value defined by the config.

func (PluginConfig) GetStreamRoutes

func (conf PluginConfig) GetStreamRoutes(key string) map[MessageStreamID][]MessageStreamID

GetStreamRoutes tries to read a non-predefined, stream to stream map from a plugin config. If no routes are defined an empty map is returned

func (PluginConfig) GetString

func (conf PluginConfig) GetString(key string, defaultValue string) string

GetString tries to read a non-predefined, string value from a PluginConfig. If that value is not found defaultValue is returned.

func (PluginConfig) GetStringArray

func (conf PluginConfig) GetStringArray(key string, defaultValue []string) []string

GetStringArray tries to read a non-predefined, string array from a PluginConfig. If that value is not found defaultValue is returned.

func (PluginConfig) GetStringMap

func (conf PluginConfig) GetStringMap(key string, defaultValue map[string]string) map[string]string

GetStringMap tries to read a non-predefined, string to string map from a PluginConfig. If the key is not found defaultValue is returned.

func (PluginConfig) GetValue

func (conf PluginConfig) GetValue(key string, defaultValue interface{}) interface{}

GetValue tries to read a non-predefined, untyped value from a PluginConfig. If that value is not found defaultValue is returned.

func (PluginConfig) HasValue

func (conf PluginConfig) HasValue(key string) bool

HasValue returns true if the given key has been set as a config option. This function only takes non-predefined settings into account.

func (PluginConfig) Override

func (conf PluginConfig) Override(key string, value interface{})

Override sets or override a configuration value for non-predefined options.

func (*PluginConfig) Read

func (conf *PluginConfig) Read(values shared.MarshalMap)

Read analyzes a given key/value map to extract the configuration values valid for each plugin. All non-default values are written to the Settings member.

func (PluginConfig) Validate

func (conf PluginConfig) Validate() bool

Validate should be called after a configuration has been processed. It will check the keys read from the config files against the keys requested up to this point. Unknown keys will be written to the error log.

type PluginControl

type PluginControl int

PluginControl is an enumeration used to pass signals to plugins

type PluginRunState

type PluginRunState struct {
	// contains filtered or unexported fields
}

PluginRunState is used in some plugins to store information about the execution state of the plugin (i.e. if it is running or not) as well as threading primitives that enable gollum to wait for a plugin top properly shut down.

func NewPluginRunState added in v0.4.0

func NewPluginRunState() *PluginRunState

NewPluginRunState creates a new plugin state helper

func (*PluginRunState) AddWorker

func (state *PluginRunState) AddWorker()

AddWorker adds a worker to the waitgroup configured by SetWorkerWaitGroup.

func (*PluginRunState) GetState added in v0.4.0

func (state *PluginRunState) GetState() PluginState

GetState returns the current plugin state casted to the correct type

func (*PluginRunState) SetState added in v0.4.0

func (state *PluginRunState) SetState(nextState PluginState)

SetState sets a new plugin state casted to the correct type

func (*PluginRunState) SetWorkerWaitGroup

func (state *PluginRunState) SetWorkerWaitGroup(workers *sync.WaitGroup)

SetWorkerWaitGroup sets the WaitGroup used to manage workers

func (*PluginRunState) WorkerDone

func (state *PluginRunState) WorkerDone()

WorkerDone removes a worker from the waitgroup configured by SetWorkerWaitGroup.

type PluginState added in v0.4.0

type PluginState int32

PluginState is an enumeration used to describe the current working state of a plugin

type PluginWithState added in v0.4.0

type PluginWithState interface {
	Plugin
	GetState() PluginState
}

PluginWithState allows certain plugins to give information about their runstate

type Producer

type Producer interface {
	PluginWithState
	MessageSource

	// Enqueue sends a message to the producer. The producer may reject
	// the message or drop it after a given timeout. Enqueue can block.
	Enqueue(msg Message, timeout *time.Duration)

	// Produce should implement a main loop that passes messages from the
	// message channel to some other service like the console.
	// This can be part of this function or a separate go routine.
	// Produce is always called as a go routine.
	Produce(workers *sync.WaitGroup)

	// Streams returns the streams this producer is listening to.
	Streams() []MessageStreamID

	// Control returns write access to this producer's control channel.
	// See ProducerControl* constants.
	Control() chan<- PluginControl

	// DropStream returns the id of the stream to drop messages to.
	GetDropStreamID() MessageStreamID

	// AddDependency is called whenever a producer is registered that sends
	// messages to this producer. Dependencies are used to resolve shutdown
	// conflicts.
	AddDependency(Producer)

	// DependsOn returns true if this plugin has a direct or indirect dependency
	// on the given producer
	DependsOn(Producer) bool
}

Producer is an interface for plugins that pass messages to other services, files or storages.

type ProducerBase

type ProducerBase struct {
	// contains filtered or unexported fields
}

ProducerBase base class All producers support a common subset of configuration options:

  • "producer.Something": Enable: true ID: "" Channel: 8192 ChannelTimeoutMs: 0 ShutdownTimeoutMs: 3000 Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" Fuse: "" FuseTimeoutSec: 5 Stream:
  • "console"

Enable switches the consumer on or off. By default this value is set to true.

ID allows this producer to be found by other plugins by name. By default this is set to "" which does not register this producer.

Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.

ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer's queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be send to the retry channel.

ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer to process than this duration, messages will be dropped during shutdown.

Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to "*" which means "listen to all streams but the internal".

DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.

Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.

Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format.

Fuse defines the name of a fuse to burn if e.g. the producer encounteres a lost connection. Each producer defines its own fuse breaking logic if necessary / applyable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to "".

FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer's implementation. By default this setting is set to 10.

func (*ProducerBase) Accepts added in v0.4.0

func (prod *ProducerBase) Accepts(msg Message) bool

Accepts calls the filters Accepts function

func (*ProducerBase) AddDependency added in v0.4.0

func (prod *ProducerBase) AddDependency(dep Producer)

AddDependency is called whenever a producer is registered that sends messages to this producer. Dependencies are used to resolve shutdown conflicts.

func (*ProducerBase) AddMainWorker

func (prod *ProducerBase) AddMainWorker(workers *sync.WaitGroup)

AddMainWorker adds the first worker to the waitgroup

func (*ProducerBase) AddWorker

func (prod *ProducerBase) AddWorker()

AddWorker adds an additional worker to the waitgroup. Assumes that either MarkAsActive or SetWaitGroup has been called beforehand.

func (*ProducerBase) CloseMessageChannel added in v0.4.0

func (prod *ProducerBase) CloseMessageChannel(handleMessage func(msg Message)) bool

CloseMessageChannel closes and empties the internal channel with a given timeout per message. If flushing messages runs into this timeout all remaining messages will be dropped by using the producer.Drop function. If a timout has been detected, false is returned.

func (*ProducerBase) Configure

func (prod *ProducerBase) Configure(conf PluginConfig) error

Configure initializes the standard producer config values.

func (*ProducerBase) Control

func (prod *ProducerBase) Control() chan<- PluginControl

Control returns write access to this producer's control channel. See PluginControl* constants.

func (*ProducerBase) ControlLoop added in v0.4.0

func (prod *ProducerBase) ControlLoop()

ControlLoop listens to the control channel and triggers callbacks for these messags. Upon stop control message doExit will be set to true.

func (*ProducerBase) DependsOn added in v0.4.0

func (prod *ProducerBase) DependsOn(dep Producer) bool

DependsOn returns true if this plugin has a direct or indirect dependency on the given producer

func (*ProducerBase) Drop added in v0.4.0

func (prod *ProducerBase) Drop(msg Message)

Drop routes the message to the configured drop stream.

func (*ProducerBase) Enqueue

func (prod *ProducerBase) Enqueue(msg Message, timeout *time.Duration)

Enqueue will add the message to the internal channel so it can be processed by the producer main loop. A timeout value != nil will overwrite the channel timeout value for this call.

func (*ProducerBase) Format

func (prod *ProducerBase) Format(msg Message) ([]byte, MessageStreamID)

Format calls the formatters Format function

func (*ProducerBase) GetDropStreamID added in v0.4.0

func (prod *ProducerBase) GetDropStreamID() MessageStreamID

GetDropStreamID returns the id of the stream to drop messages to.

func (*ProducerBase) GetFilter added in v0.4.0

func (prod *ProducerBase) GetFilter() Filter

GetFilter returns the filter of this producer

func (*ProducerBase) GetFormatter

func (prod *ProducerBase) GetFormatter() Formatter

GetFormatter returns the formatter of this producer

func (*ProducerBase) GetFuse added in v0.4.1

func (prod *ProducerBase) GetFuse() *shared.Fuse

GetFuse returns the fuse bound to this producer or nil if no fuse name has been set.

func (ProducerBase) GetShutdownTimeout added in v0.4.0

func (prod ProducerBase) GetShutdownTimeout() time.Duration

GetShutdownTimeout returns the duration this producer will wait during close before messages get dropped.

func (*ProducerBase) GetState added in v0.4.0

func (prod *ProducerBase) GetState() PluginState

GetState returns the state this plugin is currently in

func (ProducerBase) GetTimeout

func (prod ProducerBase) GetTimeout() time.Duration

GetTimeout returns the duration this producer will block before a message is dropped. A value of -1 will cause the message to drop. A value of 0 will cause the producer to always block.

func (*ProducerBase) IsActive added in v0.4.0

func (prod *ProducerBase) IsActive() bool

IsActive returns true if GetState() returns active or waiting

func (*ProducerBase) IsActiveOrStopping added in v0.4.0

func (prod *ProducerBase) IsActiveOrStopping() bool

IsActiveOrStopping is a shortcut for prod.IsActive() || prod.IsStopping()

func (*ProducerBase) IsBlocked added in v0.4.0

func (prod *ProducerBase) IsBlocked() bool

IsBlocked returns true if GetState() returns waiting

func (*ProducerBase) IsStopping added in v0.4.0

func (prod *ProducerBase) IsStopping() bool

IsStopping returns true if GetState() returns stopping

func (*ProducerBase) MessageControlLoop added in v0.4.0

func (prod *ProducerBase) MessageControlLoop(onMessage func(Message))

MessageControlLoop provides a producer mainloop that is sufficient for most usecases. ControlLoop will be called in a separate go routine. This function will block until a stop signal is received.

func (*ProducerBase) Messages

func (prod *ProducerBase) Messages() chan<- Message

Messages returns write access to the message channel this producer reads from.

func (*ProducerBase) Next

func (prod *ProducerBase) Next() (Message, bool)

Next returns the latest message from the channel as well as the open state of the channel. This function blocks if the channel is empty.

func (*ProducerBase) NextNonBlocking

func (prod *ProducerBase) NextNonBlocking(onMessage func(msg Message)) bool

NextNonBlocking calls a given callback if a message is queued or returns. Returns false if no message was received.

func (*ProducerBase) PauseAllStreams

func (prod *ProducerBase) PauseAllStreams(capacity int)

PauseAllStreams sends the Pause() command to all streams this producer is listening to.

func (*ProducerBase) ResumeAllStreams

func (prod *ProducerBase) ResumeAllStreams()

ResumeAllStreams sends the Resume() command to all streams this producer is listening to.

func (*ProducerBase) SetCheckFuseCallback added in v0.4.1

func (prod *ProducerBase) SetCheckFuseCallback(onCheckFuse func() bool)

SetCheckFuseCallback sets the function to be called upon PluginControlCheckFuse. The callback has to return true to trigger a fuse reactivation. If nil is passed as a callback PluginControlCheckFuse will reactivate the fuse immediately.

func (*ProducerBase) SetRollCallback added in v0.4.0

func (prod *ProducerBase) SetRollCallback(onRoll func())

SetRollCallback sets the function to be called upon PluginControlRoll

func (*ProducerBase) SetStopCallback added in v0.4.0

func (prod *ProducerBase) SetStopCallback(onStop func())

SetStopCallback sets the function to be called upon PluginControlStop

func (*ProducerBase) SetWorkerWaitGroup

func (prod *ProducerBase) SetWorkerWaitGroup(workers *sync.WaitGroup)

SetWorkerWaitGroup forwards to Plugin.SetWorkerWaitGroup for this consumer's internal plugin state. This method is also called by AddMainWorker.

func (*ProducerBase) Streams

func (prod *ProducerBase) Streams() []MessageStreamID

Streams returns the streams this producer is listening to.

func (*ProducerBase) TickerMessageControlLoop added in v0.4.0

func (prod *ProducerBase) TickerMessageControlLoop(onMessage func(Message), interval time.Duration, onTimeOut func())

TickerMessageControlLoop is like MessageLoop but executes a given function at every given interval tick, too. If the onTick function takes longer than interval, the next tick will be delayed until onTick finishes.

func (*ProducerBase) WaitForDependencies added in v0.4.0

func (prod *ProducerBase) WaitForDependencies(waitForState PluginState, timeout time.Duration)

WaitForDependencies waits until all dependencies reach the given runstate. A timeout > 0 can be given to work around possible blocking situations.

func (*ProducerBase) WorkerDone

func (prod *ProducerBase) WorkerDone()

WorkerDone removes an additional worker to the waitgroup.

type SerialMessageSource

type SerialMessageSource interface {
	AsyncMessageSource

	// Notify the end of the response stream
	ResponseDone()
}

SerialMessageSource extends the AsyncMessageSource interface to allow waiting for all parts of the response to be submitted.

type SerializedMessage added in v0.4.0

type SerializedMessage struct {
	StreamID         *uint64 `protobuf:"varint,1,req" json:"StreamID,omitempty"`
	PrevStreamID     *uint64 `protobuf:"varint,2,req" json:"PrevStreamID,omitempty"`
	Timestamp        *int64  `protobuf:"varint,3,req" json:"Timestamp,omitempty"`
	Sequence         *uint64 `protobuf:"varint,4,req" json:"Sequence,omitempty"`
	Data             []byte  `protobuf:"bytes,5,req" json:"Data,omitempty"`
	XXX_unrecognized []byte  `json:"-"`
}

func (*SerializedMessage) GetData added in v0.4.0

func (m *SerializedMessage) GetData() []byte

func (*SerializedMessage) GetPrevStreamID added in v0.4.0

func (m *SerializedMessage) GetPrevStreamID() uint64

func (*SerializedMessage) GetSequence added in v0.4.0

func (m *SerializedMessage) GetSequence() uint64

func (*SerializedMessage) GetStreamID added in v0.4.0

func (m *SerializedMessage) GetStreamID() uint64

func (*SerializedMessage) GetTimestamp added in v0.4.0

func (m *SerializedMessage) GetTimestamp() int64

func (*SerializedMessage) ProtoMessage added in v0.4.0

func (*SerializedMessage) ProtoMessage()

func (*SerializedMessage) Reset added in v0.4.0

func (m *SerializedMessage) Reset()

func (*SerializedMessage) String added in v0.4.0

func (m *SerializedMessage) String() string

type Stream

type Stream interface {
	// GetBoundStreamID returns the stream id this plugin is bound to.
	GetBoundStreamID() MessageStreamID

	// Pause causes this stream to go silent. Messages should be queued or cause
	// a blocking call. The passed capacity can be used to configure internal
	// channel for buffering incoming messages while this stream is paused.
	Pause(capacity int)

	// Resume causes this stream to send messages again after Pause() had been
	// called. Any buffered messages need to be sent by this method or by a
	// separate go routine.
	Resume()

	// Flush calls Resume and blocks until resume finishes
	Flush()

	// AddProducer adds one or more producers to this stream, i.e. the producers
	// listening to messages on this stream.
	AddProducer(producers ...Producer)

	// Enqueue sends a given message to all registered producers
	Enqueue(msg Message)

	// GetProducers returns the producers bound to this stream
	GetProducers() []Producer
}

Stream defines the interface for all stream plugins

type StreamBase

type StreamBase struct {
	Filter    Filter
	Format    Formatter
	Producers []Producer
	Timeout   *time.Duration
	// contains filtered or unexported fields
}

StreamBase defines the standard stream implementation. New stream types should derive from this class. StreamBase allows streams to set and execute filters as well as format a message. Types derived from StreamBase should set the Distribute member instead of overloading the Enqueue method. See stream.Broadcast for default configuration values and examples.

func (*StreamBase) AddProducer

func (stream *StreamBase) AddProducer(producers ...Producer)

AddProducer adds all producers to the list of known producers. Duplicates will be filtered.

func (*StreamBase) Broadcast added in v0.4.0

func (stream *StreamBase) Broadcast(msg Message)

Broadcast enqueues the given message to all producers attached to this stream.

func (*StreamBase) ConfigureStream added in v0.4.0

func (stream *StreamBase) ConfigureStream(conf PluginConfig, distribute Distributor) error

ConfigureStream sets up all values requred by StreamBase.

func (*StreamBase) Enqueue

func (stream *StreamBase) Enqueue(msg Message)

Enqueue checks the filter, formats the message and sends it to all producers registered. Functions deriving from StreamBase can set the Distribute member to hook into this function.

func (*StreamBase) Flush added in v0.4.0

func (stream *StreamBase) Flush()

Flush calls Resume and blocks until resume finishes

func (*StreamBase) GetBoundStreamID added in v0.4.0

func (stream *StreamBase) GetBoundStreamID() MessageStreamID

GetBoundStreamID returns the id of the stream this plugin is bound to.

func (*StreamBase) GetProducers added in v0.4.0

func (stream *StreamBase) GetProducers() []Producer

GetProducers returns the producers bound to this stream

func (*StreamBase) Pause

func (stream *StreamBase) Pause(capacity int)

Pause will cause this stream to go silent. Messages will be queued to an internal channel that can be configured in size by setting the capacity parameter. Pass a capacity of 0 to disable buffering. Calling Pause on an already paused stream is ignored.

func (*StreamBase) Resume

func (stream *StreamBase) Resume()

Resume causes this stream to send messages again after Pause() had been called. Any buffered messages will be sent by a separate go routine. Calling Resume on a stream that is not paused is ignored.

func (*StreamBase) Route added in v0.4.0

func (stream *StreamBase) Route(msg Message, targetID MessageStreamID)

Route is called by Enqueue after a message has been accepted and formatted. This encapsulates the main logic of sending messages to producers or to another stream if necessary.

type WriterAssembly added in v0.4.0

type WriterAssembly struct {
	// contains filtered or unexported fields
}

WriterAssembly is a helper struct for io.Writer compatible classes that use messagebatch.

func NewWriterAssembly added in v0.4.0

func NewWriterAssembly(writer io.Writer, flush func(Message), formatter Formatter) WriterAssembly

NewWriterAssembly creates a new adapter between io.Writer and the MessageBatch AssemblyFunc function signature

func (*WriterAssembly) Flush added in v0.4.0

func (asm *WriterAssembly) Flush(messages []Message)

Flush is an AssemblyFunc compatible implementation to pass all messages from a MessageBatch to e.g. the Drop function of a producer. Flush will also be called by Write if the io.Writer reported an error.

func (*WriterAssembly) SetErrorHandler added in v0.4.0

func (asm *WriterAssembly) SetErrorHandler(handleError func(error) bool)

SetErrorHandler sets a callback that is called if an error occured. HandleError needs to return true to prevent messages to be flushed.

func (*WriterAssembly) SetFlush added in v0.4.0

func (asm *WriterAssembly) SetFlush(flush func(Message))

SetFlush changes the bound flush function

func (*WriterAssembly) SetValidator added in v0.4.0

func (asm *WriterAssembly) SetValidator(validate func() bool)

SetValidator sets a callback that is called if a write was successfull. Validate needs to return true to prevent messages to be flushed.

func (*WriterAssembly) SetWriter added in v0.4.0

func (asm *WriterAssembly) SetWriter(writer io.Writer)

SetWriter changes the writer interface used during Assemble

func (*WriterAssembly) Write added in v0.4.0

func (asm *WriterAssembly) Write(messages []Message)

Write is an AssemblyFunc compatible implementation to pass all messages from a MessageBatch to an io.Writer. Messages are formatted using a given formatter. If the io.Writer fails to write the assembled buffer all messages are passed to the FLush() method.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL