Documentation ¶
Overview ¶
Package core is a generated protocol buffer package.
It is generated from these files:
message.proto
It has these top-level messages:
SerializedMessage
Index ¶
- Constants
- Variables
- func CountDiscardedMessage()
- func CountDroppedMessage()
- func CountFilteredMessage()
- func CountNoRouteForMessage()
- func CountProcessedMessage()
- func GetAndResetMessageCount() (messages, dropped, discarded, filtered, noroute uint32)
- type AssemblyFunc
- type AsyncMessageSource
- type Config
- type Consumer
- type ConsumerBase
- func (cons *ConsumerBase) AddMainWorker(workers *sync.WaitGroup)
- func (cons *ConsumerBase) AddWorker()
- func (cons *ConsumerBase) Configure(conf PluginConfig) error
- func (cons *ConsumerBase) Control() chan<- PluginControl
- func (cons *ConsumerBase) ControlLoop()
- func (cons *ConsumerBase) Enqueue(data []byte, sequence uint64)
- func (cons *ConsumerBase) EnqueueCopy(data []byte, sequence uint64)
- func (cons *ConsumerBase) EnqueueMessage(msg Message)
- func (cons *ConsumerBase) GetState() PluginState
- func (cons *ConsumerBase) IsActive() bool
- func (cons *ConsumerBase) IsActiveOrStopping() bool
- func (cons *ConsumerBase) IsBlocked() bool
- func (cons *ConsumerBase) IsFuseBurned() bool
- func (cons *ConsumerBase) IsStopping() bool
- func (cons *ConsumerBase) SetFuseActiveCallback(onFuseActive func())
- func (cons *ConsumerBase) SetFuseBurnedCallback(onFuseBurned func())
- func (cons *ConsumerBase) SetRollCallback(onRoll func())
- func (cons *ConsumerBase) SetStopCallback(onStop func())
- func (cons *ConsumerBase) SetWorkerWaitGroup(workers *sync.WaitGroup)
- func (cons *ConsumerBase) Streams() []MessageStreamID
- func (cons *ConsumerBase) TickerControlLoop(interval time.Duration, onTick func())
- func (cons *ConsumerBase) WaitOnFuse()
- func (cons *ConsumerBase) WorkerDone()
- type Distributor
- type Filter
- type Formatter
- type LinkableMessageSource
- type LogConsumer
- func (cons *LogConsumer) Configure(conf PluginConfig) error
- func (cons *LogConsumer) Consume(threads *sync.WaitGroup)
- func (cons *LogConsumer) Control() chan<- PluginControl
- func (cons *LogConsumer) GetState() PluginState
- func (cons *LogConsumer) Streams() []MessageStreamID
- func (cons LogConsumer) Write(data []byte) (int, error)
- type MappedStream
- type Message
- type MessageBatch
- func (batch *MessageBatch) AfterFlushDo(callback func() error) error
- func (batch *MessageBatch) Append(msg Message) bool
- func (batch *MessageBatch) AppendOrBlock(msg Message) bool
- func (batch *MessageBatch) AppendOrFlush(msg Message, flushBuffer func(), canBlock func() bool, drop func(Message))
- func (batch *MessageBatch) Close(assemble AssemblyFunc, timeout time.Duration)
- func (batch *MessageBatch) Flush(assemble AssemblyFunc)
- func (batch MessageBatch) IsClosed() bool
- func (batch MessageBatch) IsEmpty() bool
- func (batch *MessageBatch) Len() int
- func (batch MessageBatch) ReachedSizeThreshold(size int) bool
- func (batch MessageBatch) ReachedTimeThreshold(timeout time.Duration) bool
- func (batch *MessageBatch) Touch()
- func (batch *MessageBatch) WaitForFlush(timeout time.Duration)
- type MessageSource
- type MessageState
- type MessageStreamID
- type Plugin
- type PluginConfig
- func (conf PluginConfig) GetBool(key string, defaultValue bool) bool
- func (conf PluginConfig) GetInt(key string, defaultValue int) int
- func (conf PluginConfig) GetStreamArray(key string, defaultValue []MessageStreamID) []MessageStreamID
- func (conf PluginConfig) GetStreamMap(key string, defaultValue string) map[MessageStreamID]string
- func (conf PluginConfig) GetStreamRoutes(key string) map[MessageStreamID][]MessageStreamID
- func (conf PluginConfig) GetString(key string, defaultValue string) string
- func (conf PluginConfig) GetStringArray(key string, defaultValue []string) []string
- func (conf PluginConfig) GetStringMap(key string, defaultValue map[string]string) map[string]string
- func (conf PluginConfig) GetValue(key string, defaultValue interface{}) interface{}
- func (conf PluginConfig) HasValue(key string) bool
- func (conf PluginConfig) Override(key string, value interface{})
- func (conf *PluginConfig) Read(values shared.MarshalMap)
- func (conf PluginConfig) Validate() bool
- type PluginControl
- type PluginRunState
- type PluginState
- type PluginWithState
- type Producer
- type ProducerBase
- func (prod *ProducerBase) Accepts(msg Message) bool
- func (prod *ProducerBase) AddDependency(dep Producer)
- func (prod *ProducerBase) AddMainWorker(workers *sync.WaitGroup)
- func (prod *ProducerBase) AddWorker()
- func (prod *ProducerBase) CloseMessageChannel(handleMessage func(msg Message)) bool
- func (prod *ProducerBase) Configure(conf PluginConfig) error
- func (prod *ProducerBase) Control() chan<- PluginControl
- func (prod *ProducerBase) ControlLoop()
- func (prod *ProducerBase) DependsOn(dep Producer) bool
- func (prod *ProducerBase) Drop(msg Message)
- func (prod *ProducerBase) Enqueue(msg Message, timeout *time.Duration)
- func (prod *ProducerBase) Format(msg Message) ([]byte, MessageStreamID)
- func (prod *ProducerBase) GetDropStreamID() MessageStreamID
- func (prod *ProducerBase) GetFilter() Filter
- func (prod *ProducerBase) GetFormatter() Formatter
- func (prod *ProducerBase) GetFuse() *shared.Fuse
- func (prod *ProducerBase) GetShutdownTimeout() time.Duration
- func (prod *ProducerBase) GetState() PluginState
- func (prod *ProducerBase) GetTimeout() time.Duration
- func (prod *ProducerBase) IsActive() bool
- func (prod *ProducerBase) IsActiveOrStopping() bool
- func (prod *ProducerBase) IsBlocked() bool
- func (prod *ProducerBase) IsStopping() bool
- func (prod *ProducerBase) MessageControlLoop(onMessage func(Message))
- func (prod *ProducerBase) Messages() chan<- Message
- func (prod *ProducerBase) Next() (Message, bool)
- func (prod *ProducerBase) NextNonBlocking(onMessage func(msg Message)) bool
- func (prod *ProducerBase) PauseAllStreams(capacity int)
- func (prod *ProducerBase) ResumeAllStreams()
- func (prod *ProducerBase) SetCheckFuseCallback(onCheckFuse func() bool)
- func (prod *ProducerBase) SetRollCallback(onRoll func())
- func (prod *ProducerBase) SetStopCallback(onStop func())
- func (prod *ProducerBase) SetWorkerWaitGroup(workers *sync.WaitGroup)
- func (prod *ProducerBase) Streams() []MessageStreamID
- func (prod *ProducerBase) TickerMessageControlLoop(onMessage func(Message), interval time.Duration, onTimeOut func())
- func (prod *ProducerBase) WaitForDependencies(waitForState PluginState, timeout time.Duration)
- func (prod *ProducerBase) WorkerDone()
- type SerialMessageSource
- type SerializedMessage
- func (m *SerializedMessage) GetData() []byte
- func (m *SerializedMessage) GetPrevStreamID() uint64
- func (m *SerializedMessage) GetSequence() uint64
- func (m *SerializedMessage) GetStreamID() uint64
- func (m *SerializedMessage) GetTimestamp() int64
- func (*SerializedMessage) ProtoMessage()
- func (m *SerializedMessage) Reset()
- func (m *SerializedMessage) String() string
- type Stream
- type StreamBase
- func (stream *StreamBase) AddProducer(producers ...Producer)
- func (stream *StreamBase) Broadcast(msg Message)
- func (stream *StreamBase) ConfigureStream(conf PluginConfig, distribute Distributor) error
- func (stream *StreamBase) Enqueue(msg Message)
- func (stream *StreamBase) Flush()
- func (stream *StreamBase) GetBoundStreamID() MessageStreamID
- func (stream *StreamBase) GetProducers() []Producer
- func (stream *StreamBase) Pause(capacity int)
- func (stream *StreamBase) Resume()
- func (stream *StreamBase) Route(msg Message, targetID MessageStreamID)
- type WriterAssembly
- func (asm *WriterAssembly) Flush(messages []Message)
- func (asm *WriterAssembly) SetErrorHandler(handleError func(error) bool)
- func (asm *WriterAssembly) SetFlush(flush func(Message))
- func (asm *WriterAssembly) SetValidator(validate func() bool)
- func (asm *WriterAssembly) SetWriter(writer io.Writer)
- func (asm *WriterAssembly) Write(messages []Message)
Constants ¶
const ( // LogInternalStream is the name of the internal message channel (logs) LogInternalStream = "_GOLLUM_" // WildcardStream is the name of the "all streams" channel WildcardStream = "*" // DroppedStream is the name of the stream used to store dropped messages DroppedStream = "_DROPPED_" // MessageStateOk is returned if the message could be delivered MessageStateOk = MessageState(iota) // MessageStateTimeout is returned if a message timed out MessageStateTimeout = MessageState(iota) // MessageStateDiscard is returned if a message should be discarded MessageStateDiscard = MessageState(iota) )
const ( // PluginControlStopProducer will cause any producer to halt and shutdown. PluginControlStopProducer = PluginControl(iota) // PluginControlStopConsumer will cause any consumer to halt and shutdown. PluginControlStopConsumer = PluginControl(iota) // PluginControlRoll notifies the consumer/producer about a reconnect or reopen request PluginControlRoll = PluginControl(iota) // PluginControlFuseBurn notfies a producer to burn a fuse or a consumer that a fuse has been burned PluginControlFuseBurn = PluginControl(iota) // PluginControlFuseActive notfies a producer to activate a fuse or a consumer that a fuse has been activated PluginControlFuseActive = PluginControl(iota) )
const ( // PluginStateInitializing is set when a plugin is not yet configured PluginStateInitializing = PluginState(iota) // PluginStateWaiting is set when a plugin is active but currently unable to process data PluginStateWaiting = PluginState(iota) // PluginStateActive is set when a plugin is ready to process data PluginStateActive = PluginState(iota) // PluginStateStopping is set when a plugin is about to stop PluginStateStopping = PluginState(iota) // PluginStateDead is set when a plugin is unable to process any data PluginStateDead = PluginState(iota) )
Variables ¶
var ( // LogInternalStreamID is the ID of the "_GOLLUM_" stream LogInternalStreamID = StreamRegistry.GetStreamID(LogInternalStream) // WildcardStreamID is the ID of the "*" stream WildcardStreamID = StreamRegistry.GetStreamID(WildcardStream) // DroppedStreamID is the ID of the "_DROPPED_" stream DroppedStreamID = StreamRegistry.GetStreamID(DroppedStream) // InvalidStreamID is a placeholder and must never be used for any stream InvalidStreamID = MessageStreamID(0) )
var PluginRegistry = pluginRegistry{ // contains filtered or unexported fields }
PluginRegistry holds all plugins by their name
var StreamRegistry = streamRegistry{ // contains filtered or unexported fields }
StreamRegistry is the global instance of streamRegistry used to store the all registered streams.
Functions ¶
func CountDiscardedMessage ¶ added in v0.4.0
func CountDiscardedMessage()
CountDiscardedMessage increases the discarded messages counter by 1
func CountDroppedMessage ¶ added in v0.4.0
func CountDroppedMessage()
CountDroppedMessage increases the dropped messages counter by 1
func CountFilteredMessage ¶ added in v0.4.0
func CountFilteredMessage()
CountFilteredMessage increases the filtered messages counter by 1
func CountNoRouteForMessage ¶ added in v0.4.0
func CountNoRouteForMessage()
CountNoRouteForMessage increases the "no route" counter by 1
func CountProcessedMessage ¶ added in v0.4.0
func CountProcessedMessage()
CountProcessedMessage increases the messages counter by 1
func GetAndResetMessageCount ¶
func GetAndResetMessageCount() (messages, dropped, discarded, filtered, noroute uint32)
GetAndResetMessageCount returns the current message counters and resets them to 0. This function is threadsafe.
Types ¶
type AssemblyFunc ¶ added in v0.4.0
type AssemblyFunc func([]Message)
AssemblyFunc is the function signature for callbacks passed to the Flush method.
type AsyncMessageSource ¶
type AsyncMessageSource interface { MessageSource // EnqueueResponse sends a message to the source of another message. EnqueueResponse(msg Message) }
AsyncMessageSource extends the MessageSource interface to allow a backchannel that simply forwards any message coming from the producer.
type Config ¶
type Config struct { Values []map[string]shared.MarshalMap Plugins []PluginConfig }
Config represents the top level config containing all plugin clonfigs
func ReadConfig ¶
ReadConfig parses a YAML config file into a new Config struct.
type Consumer ¶
type Consumer interface { PluginWithState MessageSource // Consume should implement to main loop that fetches messages from a given // source and pushes it to the Message channel. Consume(*sync.WaitGroup) // Streams returns the streams this consumer is writing to. Streams() []MessageStreamID // Control returns write access to this consumer's control channel. // See PluginControl* constants. Control() chan<- PluginControl }
Consumer is an interface for plugins that receive data from outside sources and generate Message objects from this data.
type ConsumerBase ¶
type ConsumerBase struct {
// contains filtered or unexported fields
}
ConsumerBase plugin base type This type defines a common baseclass for all consumers. All consumer plugins should derive from this class but don't necessarily need to. Configuration example:
- "consumer.Foobar": Enable: true ID: "" Fuse: "" Stream:
- "foo"
- "bar"
Enable switches the consumer on or off. By default this value is set to true.
ID allows this consumer to be found by other plugins by name. By default this is set to "" which does not register this consumer.
Stream contains either a single string or a list of strings defining the message channels this consumer will produce. By default this is set to "*" which means only producers set to consume "all streams" will get these messages.
Fuse defines the name of a fuse to observe for this consumer. Producer may "burn" the fuse when they encounter errors. Consumers may react on this by e.g. closing connections to notify any writing services of the problem. Set to "" by default which disables the fuse feature for this consumer. It is up to the consumer implementation to react on a broken fuse in an appropriate manner.
func (*ConsumerBase) AddMainWorker ¶
func (cons *ConsumerBase) AddMainWorker(workers *sync.WaitGroup)
AddMainWorker adds the first worker to the waitgroup
func (*ConsumerBase) AddWorker ¶
func (cons *ConsumerBase) AddWorker()
AddWorker adds an additional worker to the waitgroup. Assumes that either MarkAsActive or SetWaitGroup has been called beforehand.
func (*ConsumerBase) Configure ¶
func (cons *ConsumerBase) Configure(conf PluginConfig) error
Configure initializes standard consumer values from a plugin config.
func (*ConsumerBase) Control ¶
func (cons *ConsumerBase) Control() chan<- PluginControl
Control returns write access to this consumer's control channel. See ConsumerControl* constants.
func (*ConsumerBase) ControlLoop ¶ added in v0.4.0
func (cons *ConsumerBase) ControlLoop()
ControlLoop listens to the control channel and triggers callbacks for these messags. Upon stop control message doExit will be set to true.
func (*ConsumerBase) Enqueue ¶
func (cons *ConsumerBase) Enqueue(data []byte, sequence uint64)
Enqueue creates a new message from a given byte slice and passes it to EnqueueMessage. Note that data is not copied, just referenced by the message.
func (*ConsumerBase) EnqueueCopy ¶
func (cons *ConsumerBase) EnqueueCopy(data []byte, sequence uint64)
EnqueueCopy behaves like Enqueue but creates a copy of data that is attached to the message.
func (*ConsumerBase) EnqueueMessage ¶
func (cons *ConsumerBase) EnqueueMessage(msg Message)
EnqueueMessage passes a given message to all streams. Only the StreamID of the message is modified, everything else is passed as-is.
func (*ConsumerBase) GetState ¶ added in v0.4.0
func (cons *ConsumerBase) GetState() PluginState
GetState returns the state this plugin is currently in
func (*ConsumerBase) IsActive ¶ added in v0.4.0
func (cons *ConsumerBase) IsActive() bool
IsActive returns true if GetState() returns active
func (*ConsumerBase) IsActiveOrStopping ¶ added in v0.4.0
func (cons *ConsumerBase) IsActiveOrStopping() bool
IsActiveOrStopping is a shortcut for prod.IsActive() || prod.IsStopping()
func (*ConsumerBase) IsBlocked ¶ added in v0.4.0
func (cons *ConsumerBase) IsBlocked() bool
IsBlocked returns true if GetState() returns waiting
func (*ConsumerBase) IsFuseBurned ¶ added in v0.4.1
func (cons *ConsumerBase) IsFuseBurned() bool
IsFuseBurned returns true if the fuse linked to this consumer has been burned. If no fuse is attached, false is returned.
func (*ConsumerBase) IsStopping ¶ added in v0.4.0
func (cons *ConsumerBase) IsStopping() bool
IsStopping returns true if GetState() returns stopping
func (*ConsumerBase) SetFuseActiveCallback ¶ added in v0.4.1
func (cons *ConsumerBase) SetFuseActiveCallback(onFuseActive func())
SetFuseActiveCallback sets the function to be called upon PluginControlFuseActive
func (*ConsumerBase) SetFuseBurnedCallback ¶ added in v0.4.1
func (cons *ConsumerBase) SetFuseBurnedCallback(onFuseBurned func())
SetFuseBurnedCallback sets the function to be called upon PluginControlFuseBurned
func (*ConsumerBase) SetRollCallback ¶ added in v0.4.0
func (cons *ConsumerBase) SetRollCallback(onRoll func())
SetRollCallback sets the function to be called upon PluginControlRoll
func (*ConsumerBase) SetStopCallback ¶ added in v0.4.0
func (cons *ConsumerBase) SetStopCallback(onStop func())
SetStopCallback sets the function to be called upon PluginControlStop
func (*ConsumerBase) SetWorkerWaitGroup ¶
func (cons *ConsumerBase) SetWorkerWaitGroup(workers *sync.WaitGroup)
SetWorkerWaitGroup forwards to Plugin.SetWorkerWaitGroup for this consumer's internal plugin state. This method is also called by AddMainWorker.
func (*ConsumerBase) Streams ¶
func (cons *ConsumerBase) Streams() []MessageStreamID
Streams returns an array with all stream ids this consumer is writing to.
func (*ConsumerBase) TickerControlLoop ¶
func (cons *ConsumerBase) TickerControlLoop(interval time.Duration, onTick func())
TickerControlLoop is like MessageLoop but executes a given function at every given interval tick, too. Note that the interval is not exact. If the onTick function takes longer than interval, the next tick will be delayed until onTick finishes.
func (*ConsumerBase) WaitOnFuse ¶ added in v0.4.1
func (cons *ConsumerBase) WaitOnFuse()
WaitOnFuse blocks if the fuse linked to this consumer has been burned. If no fuse is bound this function does nothing.
func (*ConsumerBase) WorkerDone ¶
func (cons *ConsumerBase) WorkerDone()
WorkerDone removes an additional worker to the waitgroup.
type Distributor ¶ added in v0.4.0
type Distributor func(msg Message)
Distributor is a callback typedef for methods processing messages
type Filter ¶
Filter allows custom message filtering for ProducerBase derived plugins. Producers not deriving from ProducerBase might utilize this one, too.
type Formatter ¶
type Formatter interface { // Format transfers the message payload into a new format. The payload may // then be reassigned to the original or a new message. // In addition to that the formatter may change the stream of the message. Format(msg Message) ([]byte, MessageStreamID) }
Formatter is the interface definition for message formatters
type LinkableMessageSource ¶
type LinkableMessageSource interface { MessageSource // Link the message source to the message receiver. This makes it possible // to create stable "pipes" between e.g. a consumer and producer. Link(pipe interface{}) // IsLinked has to return true if Link executed successful and does not // need to be called again. IsLinked() bool }
LinkableMessageSource extends the MessageSource interface to allow a pipe like behaviour between two components that communicate messages.
type LogConsumer ¶
type LogConsumer struct { Consumer // contains filtered or unexported fields }
LogConsumer is an internal consumer plugin used indirectly by the gollum log package.
func (*LogConsumer) Configure ¶
func (cons *LogConsumer) Configure(conf PluginConfig) error
Configure initializes this consumer with values from a plugin config.
func (*LogConsumer) Consume ¶
func (cons *LogConsumer) Consume(threads *sync.WaitGroup)
Consume starts listening for control statements
func (*LogConsumer) Control ¶
func (cons *LogConsumer) Control() chan<- PluginControl
Control returns a handle to the control channel
func (*LogConsumer) GetState ¶ added in v0.4.0
func (cons *LogConsumer) GetState() PluginState
GetState always returns PluginStateActive
func (*LogConsumer) Streams ¶
func (cons *LogConsumer) Streams() []MessageStreamID
Streams always returns an array with one member - the internal log stream
type MappedStream ¶
type MappedStream struct { StreamID MessageStreamID Stream Stream }
MappedStream holds a stream and the id the stream is assgined to
type Message ¶
type Message struct { Data []byte StreamID MessageStreamID PrevStreamID MessageStreamID Source MessageSource Timestamp time.Time Sequence uint64 }
Message is a container used for storing the internal state of messages. This struct is passed between consumers and producers.
func DeserializeMessage ¶ added in v0.4.0
DeserializeMessage generates a message from a string produced by Message.Serialize.
func NewMessage ¶
func NewMessage(source MessageSource, data []byte, sequence uint64) Message
NewMessage creates a new message from a given data stream
func (Message) Enqueue ¶
func (msg Message) Enqueue(channel chan<- Message, timeout time.Duration) MessageState
Enqueue is a convenience function to push a message to a channel while waiting for a timeout instead of just blocking. Passing a timeout of -1 will discard the message. Passing a timout of 0 will always block. Messages that time out will be passed to the dropped queue if a Dropped consumer exists. The source parameter is used when a message is dropped, i.e. it is passed to the Drop function.
func (Message) Route ¶ added in v0.4.0
func (msg Message) Route(targetID MessageStreamID)
Route enqueues this message to the given stream. If the stream does not exist, a default stream (broadcast) is created.
type MessageBatch ¶
type MessageBatch struct {
// contains filtered or unexported fields
}
MessageBatch is a helper class for producers to format and store messages into a single buffer that is flushed to an io.Writer. You can use the Reached* functions to determine whether a flush should be called, i.e. if a timeout or size threshold has been reached.
func NewMessageBatch ¶
func NewMessageBatch(maxMessageCount int) MessageBatch
NewMessageBatch creates a new MessageBatch with a given size (in bytes) and a given formatter.
func (*MessageBatch) AfterFlushDo ¶ added in v0.4.2
func (batch *MessageBatch) AfterFlushDo(callback func() error) error
AfterFlushDo calls a function after a currently running flush is done. It also blocks any flush during the execution of callback. Returns the error returned by callback
func (*MessageBatch) Append ¶
func (batch *MessageBatch) Append(msg Message) bool
Append formats a message and appends it to the internal buffer. If the message does not fit into the buffer this function returns false. If the message can never fit into the buffer (too large), true is returned and an error is logged.
func (*MessageBatch) AppendOrBlock ¶ added in v0.4.0
func (batch *MessageBatch) AppendOrBlock(msg Message) bool
AppendOrBlock works like Append but will block until Append returns true. If the batch was closed during this call, false is returned.
func (*MessageBatch) AppendOrFlush ¶ added in v0.4.0
func (batch *MessageBatch) AppendOrFlush(msg Message, flushBuffer func(), canBlock func() bool, drop func(Message))
AppendOrFlush is a common combinatorial pattern of Append and AppendOrBlock. If append fails, flush is called, followed by AppendOrBlock if blocking is allowed. If AppendOrBlock fails (or blocking is not allowed) the message is dropped.
func (*MessageBatch) Close ¶ added in v0.4.0
func (batch *MessageBatch) Close(assemble AssemblyFunc, timeout time.Duration)
Close disables Append, calls flush and waits for this call to finish. Timeout is passed to WaitForFlush.
func (*MessageBatch) Flush ¶
func (batch *MessageBatch) Flush(assemble AssemblyFunc)
Flush writes the content of the buffer to a given resource and resets the internal state, i.e. the buffer is empty after a call to Flush. Writing will be done in a separate go routine to be non-blocking.
The validate callback will be called after messages have been successfully written to the io.Writer. If validate returns false the buffer will not be resetted (automatic retry). If validate is nil a return value of true is assumed (buffer reset).
The onError callback will be called if the io.Writer returned an error. If onError returns false the buffer will not be resetted (automatic retry). If onError is nil a return value of true is assumed (buffer reset).
func (MessageBatch) IsClosed ¶ added in v0.4.0
func (batch MessageBatch) IsClosed() bool
IsClosed returns true of Close has been called at least once.
func (MessageBatch) IsEmpty ¶
func (batch MessageBatch) IsEmpty() bool
IsEmpty returns true if no data is stored in the front buffer, i.e. if no data is scheduled for flushing.
func (*MessageBatch) Len ¶ added in v0.4.0
func (batch *MessageBatch) Len() int
Len returns the length of one buffer
func (MessageBatch) ReachedSizeThreshold ¶
func (batch MessageBatch) ReachedSizeThreshold(size int) bool
ReachedSizeThreshold returns true if the bytes stored in the buffer are above or equal to the size given. If there is no data this function returns false.
func (MessageBatch) ReachedTimeThreshold ¶
func (batch MessageBatch) ReachedTimeThreshold(timeout time.Duration) bool
ReachedTimeThreshold returns true if the last flush was more than timeout ago. If there is no data this function returns false.
func (*MessageBatch) Touch ¶
func (batch *MessageBatch) Touch()
Touch resets the timer queried by ReachedTimeThreshold, i.e. this resets the automatic flush timeout
func (*MessageBatch) WaitForFlush ¶
func (batch *MessageBatch) WaitForFlush(timeout time.Duration)
WaitForFlush blocks until the current flush command returns. Passing a timeout > 0 will unblock this function after the given duration at the latest.
type MessageSource ¶
type MessageSource interface { // IsActive returns true if the source can produce messages IsActive() bool // IsBlocked returns true if the source cannot produce messages IsBlocked() bool }
MessageSource defines methods that are common to all message sources. Currently this is only a placeholder.
type MessageState ¶ added in v0.4.0
type MessageState int
MessageState is used as a return value for the Enqueu method
type MessageStreamID ¶
type MessageStreamID uint64
MessageStreamID is the "compiled name" of a stream
type Plugin ¶
type Plugin interface { // Configure is called during NewPluginWithType Configure(conf PluginConfig) error }
Plugin is the base class for any runtime class that can be configured and instantiated during runtim.
func NewPlugin ¶
func NewPlugin(config PluginConfig) (Plugin, error)
NewPlugin creates a new plugin from the type information stored in its config. This function internally calls NewPluginWithType.
func NewPluginWithType ¶
func NewPluginWithType(typename string, config PluginConfig) (Plugin, error)
NewPluginWithType creates a new plugin of a given type and initializes it using the given config (i.e. passes that config to Configure). The type passed to this function may differ from the type stored in the config. If the type is meant to match use NewPlugin instead of NewPluginWithType. This function returns nil, error if the plugin could not be instantiated or plugin, error if Configure failed.
type PluginConfig ¶
type PluginConfig struct { ID string Typename string Enable bool Instances int Stream []string Settings shared.MarshalMap // contains filtered or unexported fields }
PluginConfig is a configuration for a specific plugin
func NewPluginConfig ¶
func NewPluginConfig(typename string) PluginConfig
NewPluginConfig creates a new plugin config with default values. By default the plugin is enabled, has one instance, is bound to no streams and has no additional settings.
func (PluginConfig) GetBool ¶
func (conf PluginConfig) GetBool(key string, defaultValue bool) bool
GetBool tries to read a non-predefined, boolean value from a PluginConfig. If that value is not found defaultValue is returned.
func (PluginConfig) GetInt ¶
func (conf PluginConfig) GetInt(key string, defaultValue int) int
GetInt tries to read a non-predefined, integer value from a PluginConfig. If that value is not found defaultValue is returned.
func (PluginConfig) GetStreamArray ¶ added in v0.4.0
func (conf PluginConfig) GetStreamArray(key string, defaultValue []MessageStreamID) []MessageStreamID
GetStreamArray tries to read a non-predefined string array from a pluginconfig and translates all values to streamIds. If the key is not found defaultValue is returned.
func (PluginConfig) GetStreamMap ¶
func (conf PluginConfig) GetStreamMap(key string, defaultValue string) map[MessageStreamID]string
GetStreamMap tries to read a non-predefined, stream to string map from a plugin config. A mapping on the wildcard stream is always returned. The target is either defaultValue or a value defined by the config.
func (PluginConfig) GetStreamRoutes ¶
func (conf PluginConfig) GetStreamRoutes(key string) map[MessageStreamID][]MessageStreamID
GetStreamRoutes tries to read a non-predefined, stream to stream map from a plugin config. If no routes are defined an empty map is returned
func (PluginConfig) GetString ¶
func (conf PluginConfig) GetString(key string, defaultValue string) string
GetString tries to read a non-predefined, string value from a PluginConfig. If that value is not found defaultValue is returned.
func (PluginConfig) GetStringArray ¶
func (conf PluginConfig) GetStringArray(key string, defaultValue []string) []string
GetStringArray tries to read a non-predefined, string array from a PluginConfig. If that value is not found defaultValue is returned.
func (PluginConfig) GetStringMap ¶
GetStringMap tries to read a non-predefined, string to string map from a PluginConfig. If the key is not found defaultValue is returned.
func (PluginConfig) GetValue ¶
func (conf PluginConfig) GetValue(key string, defaultValue interface{}) interface{}
GetValue tries to read a non-predefined, untyped value from a PluginConfig. If that value is not found defaultValue is returned.
func (PluginConfig) HasValue ¶
func (conf PluginConfig) HasValue(key string) bool
HasValue returns true if the given key has been set as a config option. This function only takes non-predefined settings into account.
func (PluginConfig) Override ¶
func (conf PluginConfig) Override(key string, value interface{})
Override sets or override a configuration value for non-predefined options.
func (*PluginConfig) Read ¶
func (conf *PluginConfig) Read(values shared.MarshalMap)
Read analyzes a given key/value map to extract the configuration values valid for each plugin. All non-default values are written to the Settings member.
func (PluginConfig) Validate ¶
func (conf PluginConfig) Validate() bool
Validate should be called after a configuration has been processed. It will check the keys read from the config files against the keys requested up to this point. Unknown keys will be written to the error log.
type PluginControl ¶
type PluginControl int
PluginControl is an enumeration used to pass signals to plugins
type PluginRunState ¶
type PluginRunState struct {
// contains filtered or unexported fields
}
PluginRunState is used in some plugins to store information about the execution state of the plugin (i.e. if it is running or not) as well as threading primitives that enable gollum to wait for a plugin top properly shut down.
func NewPluginRunState ¶ added in v0.4.0
func NewPluginRunState() *PluginRunState
NewPluginRunState creates a new plugin state helper
func (*PluginRunState) AddWorker ¶
func (state *PluginRunState) AddWorker()
AddWorker adds a worker to the waitgroup configured by SetWorkerWaitGroup.
func (*PluginRunState) GetState ¶ added in v0.4.0
func (state *PluginRunState) GetState() PluginState
GetState returns the current plugin state casted to the correct type
func (*PluginRunState) SetState ¶ added in v0.4.0
func (state *PluginRunState) SetState(nextState PluginState)
SetState sets a new plugin state casted to the correct type
func (*PluginRunState) SetWorkerWaitGroup ¶
func (state *PluginRunState) SetWorkerWaitGroup(workers *sync.WaitGroup)
SetWorkerWaitGroup sets the WaitGroup used to manage workers
func (*PluginRunState) WorkerDone ¶
func (state *PluginRunState) WorkerDone()
WorkerDone removes a worker from the waitgroup configured by SetWorkerWaitGroup.
type PluginState ¶ added in v0.4.0
type PluginState int32
PluginState is an enumeration used to describe the current working state of a plugin
type PluginWithState ¶ added in v0.4.0
type PluginWithState interface { Plugin GetState() PluginState }
PluginWithState allows certain plugins to give information about their runstate
type Producer ¶
type Producer interface { PluginWithState MessageSource // Enqueue sends a message to the producer. The producer may reject // the message or drop it after a given timeout. Enqueue can block. Enqueue(msg Message, timeout *time.Duration) // Produce should implement a main loop that passes messages from the // message channel to some other service like the console. // This can be part of this function or a separate go routine. // Produce is always called as a go routine. Produce(workers *sync.WaitGroup) // Streams returns the streams this producer is listening to. Streams() []MessageStreamID // Control returns write access to this producer's control channel. // See ProducerControl* constants. Control() chan<- PluginControl // DropStream returns the id of the stream to drop messages to. GetDropStreamID() MessageStreamID // AddDependency is called whenever a producer is registered that sends // messages to this producer. Dependencies are used to resolve shutdown // conflicts. AddDependency(Producer) // DependsOn returns true if this plugin has a direct or indirect dependency // on the given producer DependsOn(Producer) bool }
Producer is an interface for plugins that pass messages to other services, files or storages.
type ProducerBase ¶
type ProducerBase struct {
// contains filtered or unexported fields
}
ProducerBase plugin base type This type defines a common baseclass for producers. All producers should derive from this class, but not necessarily need to. Configuration example:
- "producer.Foobar": Enable: true ID: "" Channel: 8192 ChannelTimeoutMs: 0 ShutdownTimeoutMs: 3000 Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" Fuse: "" FuseTimeoutSec: 5 Stream:
- "foo"
- "bar"
Enable switches the consumer on or off. By default this value is set to true.
ID allows this producer to be found by other plugins by name. By default this is set to "" which does not register this producer.
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer's queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be send to the retry channel.
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer to process than this duration, messages will be dropped during shutdown.
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to "*" which means "listen to all streams but the internal".
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format.
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary / applyable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to "".
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer's implementation. By default this setting is set to 10.
func (*ProducerBase) Accepts ¶ added in v0.4.0
func (prod *ProducerBase) Accepts(msg Message) bool
Accepts returns false if one filter in the list returns false
func (*ProducerBase) AddDependency ¶ added in v0.4.0
func (prod *ProducerBase) AddDependency(dep Producer)
AddDependency is called whenever a producer is registered that sends messages to this producer. Dependencies are used to resolve shutdown conflicts.
func (*ProducerBase) AddMainWorker ¶
func (prod *ProducerBase) AddMainWorker(workers *sync.WaitGroup)
AddMainWorker adds the first worker to the waitgroup
func (*ProducerBase) AddWorker ¶
func (prod *ProducerBase) AddWorker()
AddWorker adds an additional worker to the waitgroup. Assumes that either MarkAsActive or SetWaitGroup has been called beforehand.
func (*ProducerBase) CloseMessageChannel ¶ added in v0.4.0
func (prod *ProducerBase) CloseMessageChannel(handleMessage func(msg Message)) bool
CloseMessageChannel closes and empties the internal channel with a given timeout per message. If flushing messages runs into this timeout all remaining messages will be dropped by using the producer.Drop function. If a timout has been detected, false is returned.
func (*ProducerBase) Configure ¶
func (prod *ProducerBase) Configure(conf PluginConfig) error
Configure initializes the standard producer config values.
func (*ProducerBase) Control ¶
func (prod *ProducerBase) Control() chan<- PluginControl
Control returns write access to this producer's control channel. See PluginControl* constants.
func (*ProducerBase) ControlLoop ¶ added in v0.4.0
func (prod *ProducerBase) ControlLoop()
ControlLoop listens to the control channel and triggers callbacks for these messags. Upon stop control message doExit will be set to true.
func (*ProducerBase) DependsOn ¶ added in v0.4.0
func (prod *ProducerBase) DependsOn(dep Producer) bool
DependsOn returns true if this plugin has a direct or indirect dependency on the given producer
func (*ProducerBase) Drop ¶ added in v0.4.0
func (prod *ProducerBase) Drop(msg Message)
Drop routes the message to the configured drop stream.
func (*ProducerBase) Enqueue ¶
func (prod *ProducerBase) Enqueue(msg Message, timeout *time.Duration)
Enqueue will add the message to the internal channel so it can be processed by the producer main loop. A timeout value != nil will overwrite the channel timeout value for this call.
func (*ProducerBase) Format ¶
func (prod *ProducerBase) Format(msg Message) ([]byte, MessageStreamID)
Format calls the formatters Format function
func (*ProducerBase) GetDropStreamID ¶ added in v0.4.0
func (prod *ProducerBase) GetDropStreamID() MessageStreamID
GetDropStreamID returns the id of the stream to drop messages to.
func (*ProducerBase) GetFilter ¶ added in v0.4.0
func (prod *ProducerBase) GetFilter() Filter
GetFilter returns the first filter of this producer
func (*ProducerBase) GetFormatter ¶
func (prod *ProducerBase) GetFormatter() Formatter
GetFormatter returns the formatter of this producer
func (*ProducerBase) GetFuse ¶ added in v0.4.1
func (prod *ProducerBase) GetFuse() *shared.Fuse
GetFuse returns the fuse bound to this producer or nil if no fuse name has been set.
func (*ProducerBase) GetShutdownTimeout ¶ added in v0.4.0
func (prod *ProducerBase) GetShutdownTimeout() time.Duration
GetShutdownTimeout returns the duration this producer will wait during close before messages get dropped.
func (*ProducerBase) GetState ¶ added in v0.4.0
func (prod *ProducerBase) GetState() PluginState
GetState returns the state this plugin is currently in
func (*ProducerBase) GetTimeout ¶
func (prod *ProducerBase) GetTimeout() time.Duration
GetTimeout returns the duration this producer will block before a message is dropped. A value of -1 will cause the message to drop. A value of 0 will cause the producer to always block.
func (*ProducerBase) IsActive ¶ added in v0.4.0
func (prod *ProducerBase) IsActive() bool
IsActive returns true if GetState() returns active or waiting
func (*ProducerBase) IsActiveOrStopping ¶ added in v0.4.0
func (prod *ProducerBase) IsActiveOrStopping() bool
IsActiveOrStopping is a shortcut for prod.IsActive() || prod.IsStopping()
func (*ProducerBase) IsBlocked ¶ added in v0.4.0
func (prod *ProducerBase) IsBlocked() bool
IsBlocked returns true if GetState() returns waiting
func (*ProducerBase) IsStopping ¶ added in v0.4.0
func (prod *ProducerBase) IsStopping() bool
IsStopping returns true if GetState() returns stopping
func (*ProducerBase) MessageControlLoop ¶ added in v0.4.0
func (prod *ProducerBase) MessageControlLoop(onMessage func(Message))
MessageControlLoop provides a producer mainloop that is sufficient for most usecases. ControlLoop will be called in a separate go routine. This function will block until a stop signal is received.
func (*ProducerBase) Messages ¶
func (prod *ProducerBase) Messages() chan<- Message
Messages returns write access to the message channel this producer reads from.
func (*ProducerBase) Next ¶
func (prod *ProducerBase) Next() (Message, bool)
Next returns the latest message from the channel as well as the open state of the channel. This function blocks if the channel is empty.
func (*ProducerBase) NextNonBlocking ¶
func (prod *ProducerBase) NextNonBlocking(onMessage func(msg Message)) bool
NextNonBlocking calls a given callback if a message is queued or returns. Returns false if no message was received.
func (*ProducerBase) PauseAllStreams ¶
func (prod *ProducerBase) PauseAllStreams(capacity int)
PauseAllStreams sends the Pause() command to all streams this producer is listening to.
func (*ProducerBase) ResumeAllStreams ¶
func (prod *ProducerBase) ResumeAllStreams()
ResumeAllStreams sends the Resume() command to all streams this producer is listening to.
func (*ProducerBase) SetCheckFuseCallback ¶ added in v0.4.1
func (prod *ProducerBase) SetCheckFuseCallback(onCheckFuse func() bool)
SetCheckFuseCallback sets the function to be called upon PluginControlCheckFuse. The callback has to return true to trigger a fuse reactivation. If nil is passed as a callback PluginControlCheckFuse will reactivate the fuse immediately.
func (*ProducerBase) SetRollCallback ¶ added in v0.4.0
func (prod *ProducerBase) SetRollCallback(onRoll func())
SetRollCallback sets the function to be called upon PluginControlRoll
func (*ProducerBase) SetStopCallback ¶ added in v0.4.0
func (prod *ProducerBase) SetStopCallback(onStop func())
SetStopCallback sets the function to be called upon PluginControlStop
func (*ProducerBase) SetWorkerWaitGroup ¶
func (prod *ProducerBase) SetWorkerWaitGroup(workers *sync.WaitGroup)
SetWorkerWaitGroup forwards to Plugin.SetWorkerWaitGroup for this consumer's internal plugin state. This method is also called by AddMainWorker.
func (*ProducerBase) Streams ¶
func (prod *ProducerBase) Streams() []MessageStreamID
Streams returns the streams this producer is listening to.
func (*ProducerBase) TickerMessageControlLoop ¶ added in v0.4.0
func (prod *ProducerBase) TickerMessageControlLoop(onMessage func(Message), interval time.Duration, onTimeOut func())
TickerMessageControlLoop is like MessageLoop but executes a given function at every given interval tick, too. If the onTick function takes longer than interval, the next tick will be delayed until onTick finishes.
func (*ProducerBase) WaitForDependencies ¶ added in v0.4.0
func (prod *ProducerBase) WaitForDependencies(waitForState PluginState, timeout time.Duration)
WaitForDependencies waits until all dependencies reach the given runstate. A timeout > 0 can be given to work around possible blocking situations.
func (*ProducerBase) WorkerDone ¶
func (prod *ProducerBase) WorkerDone()
WorkerDone removes an additional worker to the waitgroup.
type SerialMessageSource ¶
type SerialMessageSource interface { AsyncMessageSource // Notify the end of the response stream ResponseDone() }
SerialMessageSource extends the AsyncMessageSource interface to allow waiting for all parts of the response to be submitted.
type SerializedMessage ¶ added in v0.4.0
type SerializedMessage struct { StreamID *uint64 `protobuf:"varint,1,req" json:"StreamID,omitempty"` PrevStreamID *uint64 `protobuf:"varint,2,req" json:"PrevStreamID,omitempty"` Timestamp *int64 `protobuf:"varint,3,req" json:"Timestamp,omitempty"` Sequence *uint64 `protobuf:"varint,4,req" json:"Sequence,omitempty"` Data []byte `protobuf:"bytes,5,req" json:"Data,omitempty"` XXX_unrecognized []byte `json:"-"` }
func (*SerializedMessage) GetData ¶ added in v0.4.0
func (m *SerializedMessage) GetData() []byte
func (*SerializedMessage) GetPrevStreamID ¶ added in v0.4.0
func (m *SerializedMessage) GetPrevStreamID() uint64
func (*SerializedMessage) GetSequence ¶ added in v0.4.0
func (m *SerializedMessage) GetSequence() uint64
func (*SerializedMessage) GetStreamID ¶ added in v0.4.0
func (m *SerializedMessage) GetStreamID() uint64
func (*SerializedMessage) GetTimestamp ¶ added in v0.4.0
func (m *SerializedMessage) GetTimestamp() int64
func (*SerializedMessage) ProtoMessage ¶ added in v0.4.0
func (*SerializedMessage) ProtoMessage()
func (*SerializedMessage) Reset ¶ added in v0.4.0
func (m *SerializedMessage) Reset()
func (*SerializedMessage) String ¶ added in v0.4.0
func (m *SerializedMessage) String() string
type Stream ¶
type Stream interface { // GetBoundStreamID returns the stream id this plugin is bound to. GetBoundStreamID() MessageStreamID // Pause causes this stream to go silent. Messages should be queued or cause // a blocking call. The passed capacity can be used to configure internal // channel for buffering incoming messages while this stream is paused. Pause(capacity int) // Resume causes this stream to send messages again after Pause() had been // called. Any buffered messages need to be sent by this method or by a // separate go routine. Resume() // Flush calls Resume and blocks until resume finishes Flush() // AddProducer adds one or more producers to this stream, i.e. the producers // listening to messages on this stream. AddProducer(producers ...Producer) // Enqueue sends a given message to all registered producers Enqueue(msg Message) // GetProducers returns the producers bound to this stream GetProducers() []Producer }
Stream defines the interface for all stream plugins
type StreamBase ¶
type StreamBase struct { Filter Filter Format Formatter Producers []Producer Timeout *time.Duration // contains filtered or unexported fields }
StreamBase plugin base type This type defines the standard stream implementation. New stream types should derive from this class. StreamBase allows streams to set and execute filters as well as format a message. Types derived from StreamBase should set the Distribute member instead of overloading the Enqueue method. Configuration Example
- "stream.Foobar" Enable: true Stream: "streamToConfigure" Formatter: "format.Forward" Filter: "filter.All" TimeoutMs: 0
Enable can be set to false to disable this stream configuration but leave it in the config for future use. Set to true by default.
Stream defines the stream to configure. This is a mandatory setting and has no default value.
Formatter defines the first formatter to apply to the messages passing through this stream. By default this is set to "format.Forward".
Filter defines the filter to apply to the messages passing through this stream. By default this is et to "filter.All".
TimeoutMs defines an optional timeout that can be used to wait for producers attached to this stream to unblock. This setting overwrites the corresponding producer setting for this (and only this) stream.
func (*StreamBase) AddProducer ¶
func (stream *StreamBase) AddProducer(producers ...Producer)
AddProducer adds all producers to the list of known producers. Duplicates will be filtered.
func (*StreamBase) Broadcast ¶ added in v0.4.0
func (stream *StreamBase) Broadcast(msg Message)
Broadcast enqueues the given message to all producers attached to this stream.
func (*StreamBase) ConfigureStream ¶ added in v0.4.0
func (stream *StreamBase) ConfigureStream(conf PluginConfig, distribute Distributor) error
ConfigureStream sets up all values required by StreamBase.
func (*StreamBase) Enqueue ¶
func (stream *StreamBase) Enqueue(msg Message)
Enqueue checks the filter, formats the message and sends it to all producers registered. Functions deriving from StreamBase can set the Distribute member to hook into this function.
func (*StreamBase) Flush ¶ added in v0.4.0
func (stream *StreamBase) Flush()
Flush calls Resume and blocks until resume finishes
func (*StreamBase) GetBoundStreamID ¶ added in v0.4.0
func (stream *StreamBase) GetBoundStreamID() MessageStreamID
GetBoundStreamID returns the id of the stream this plugin is bound to.
func (*StreamBase) GetProducers ¶ added in v0.4.0
func (stream *StreamBase) GetProducers() []Producer
GetProducers returns the producers bound to this stream
func (*StreamBase) Pause ¶
func (stream *StreamBase) Pause(capacity int)
Pause will cause this stream to go silent. Messages will be queued to an internal channel that can be configured in size by setting the capacity parameter. Pass a capacity of 0 to disable buffering. Calling Pause on an already paused stream is ignored.
func (*StreamBase) Resume ¶
func (stream *StreamBase) Resume()
Resume causes this stream to send messages again after Pause() had been called. Any buffered messages will be sent by a separate go routine. Calling Resume on a stream that is not paused is ignored.
func (*StreamBase) Route ¶ added in v0.4.0
func (stream *StreamBase) Route(msg Message, targetID MessageStreamID)
Route is called by Enqueue after a message has been accepted and formatted. This encapsulates the main logic of sending messages to producers or to another stream if necessary.
type WriterAssembly ¶ added in v0.4.0
type WriterAssembly struct {
// contains filtered or unexported fields
}
WriterAssembly is a helper struct for io.Writer compatible classes that use messagebatch.
func NewWriterAssembly ¶ added in v0.4.0
func NewWriterAssembly(writer io.Writer, flush func(Message), formatter Formatter) WriterAssembly
NewWriterAssembly creates a new adapter between io.Writer and the MessageBatch AssemblyFunc function signature
func (*WriterAssembly) Flush ¶ added in v0.4.0
func (asm *WriterAssembly) Flush(messages []Message)
Flush is an AssemblyFunc compatible implementation to pass all messages from a MessageBatch to e.g. the Drop function of a producer. Flush will also be called by Write if the io.Writer reported an error.
func (*WriterAssembly) SetErrorHandler ¶ added in v0.4.0
func (asm *WriterAssembly) SetErrorHandler(handleError func(error) bool)
SetErrorHandler sets a callback that is called if an error occurred. HandleError needs to return true to prevent messages to be flushed.
func (*WriterAssembly) SetFlush ¶ added in v0.4.0
func (asm *WriterAssembly) SetFlush(flush func(Message))
SetFlush changes the bound flush function
func (*WriterAssembly) SetValidator ¶ added in v0.4.0
func (asm *WriterAssembly) SetValidator(validate func() bool)
SetValidator sets a callback that is called if a write was successful. Validate needs to return true to prevent messages to be flushed.
func (*WriterAssembly) SetWriter ¶ added in v0.4.0
func (asm *WriterAssembly) SetWriter(writer io.Writer)
SetWriter changes the writer interface used during Assemble
func (*WriterAssembly) Write ¶ added in v0.4.0
func (asm *WriterAssembly) Write(messages []Message)
Write is an AssemblyFunc compatible implementation to pass all messages from a MessageBatch to an io.Writer. Messages are formatted using a given formatter. If the io.Writer fails to write the assembled buffer all messages are passed to the FLush() method.