Documentation ¶
Overview ¶
Package mode defines and implents output strategies with failover or load balancing modes for use by output plugins.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNoConnectionConfigured indicates no configured connections for publishing. ErrNoConnectionConfigured = errors.New("No connection configured") )
var ErrNoHostsConfigured = errors.New("no host configuration found")
ErrNoHostsConfigured indicates missing host or hosts configuration
var ( // ErrTempBulkFailure indicates PublishEvents fail temporary to retry. ErrTempBulkFailure = errors.New("temporary bulk send failure") )
Functions ¶
Types ¶
type AsyncLoadBalancerMode ¶
type AsyncLoadBalancerMode struct {
// contains filtered or unexported fields
}
AsyncLoadBalancerMode balances the sending of events between multiple connections.
The balancing algorithm is mostly pull-based, with multiple workers trying to pull some amount of work from a shared queue. Workers will try to get a new work item only if they have a working/active connection. Workers without active connection do not participate until a connection has been re-established. Due to the pull based nature the algorithm will load-balance events by random with workers having less latencies/turn-around times potentially getting more work items then other workers with higher latencies. Thusly the algorithm dynamically adapts to resource availability of server events are forwarded to.
Workers not participating in the load-balancing will continuously try to reconnect to their configured endpoints. Once a new connection has been established, these workers will participate in in load-balancing again.
If a connection becomes unavailable, the events are rescheduled for another connection to pick up. Rescheduling events is limited to a maximum number of send attempts. If events have not been send after maximum number of allowed attemps has been passed, they will be dropped.
Like network connections, distributing events to workers is subject to timeout. If no worker is available to pickup a message for sending, the message will be dropped internally after max_retries. If mode or message requires guaranteed send, message is retried infinitely.
func NewAsyncLoadBalancerMode ¶
func NewAsyncLoadBalancerMode( clients []AsyncProtocolClient, maxAttempts int, waitRetry, timeout, maxWaitRetry time.Duration, ) (*AsyncLoadBalancerMode, error)
NewAsyncLoadBalancerMode create a new load balancer connection mode.
func (*AsyncLoadBalancerMode) Close ¶
func (m *AsyncLoadBalancerMode) Close() error
Close stops all workers and closes all open connections. In flight events are signaled as failed.
func (*AsyncLoadBalancerMode) PublishEvent ¶
func (m *AsyncLoadBalancerMode) PublishEvent( signaler outputs.Signaler, opts outputs.Options, event common.MapStr, ) error
PublishEvent forwards the event to some load balancing worker.
func (*AsyncLoadBalancerMode) PublishEvents ¶
func (m *AsyncLoadBalancerMode) PublishEvents( signaler outputs.Signaler, opts outputs.Options, events []common.MapStr, ) error
PublishEvents forwards events to some load balancing worker.
type AsyncProtocolClient ¶
type AsyncProtocolClient interface { Connectable AsyncPublishEvents(cb func([]common.MapStr, error), events []common.MapStr) error AsyncPublishEvent(cb func(error), event common.MapStr) error }
AsyncProtocolClient interface is a output plugin specfic client implementation for asynchronous encoding and publishing events.
func MakeAsyncClients ¶
func MakeAsyncClients( config *common.Config, newClient func(string) (AsyncProtocolClient, error), ) ([]AsyncProtocolClient, error)
func NewAsyncFailoverClient ¶
func NewAsyncFailoverClient(clients []AsyncProtocolClient) []AsyncProtocolClient
type Connectable ¶
type Connectable interface { // Connect establishes a connection to the clients sink. // The connection attempt shall report an error if no connection could been // established within the given time interval. A timeout value of 0 == wait // forever. Connect(timeout time.Duration) error // Close closes the established connection. Close() error // IsConnected indicates the clients connection state. If connection has // been lost while publishing events, IsConnected must return false. As long as // IsConnected returns false, an output plugin might try to re-establish the // connection by calling Connect. IsConnected() bool }
type ConnectionMode ¶
type ConnectionMode interface { // Close will stop the modes it's publisher loop and close all it's // associated clients Close() error // PublishEvents will send all events (potentially asynchronous) to its // clients. PublishEvents(trans outputs.Signaler, opts outputs.Options, events []common.MapStr) error // PublishEvent will send an event to its clients. PublishEvent(trans outputs.Signaler, opts outputs.Options, event common.MapStr) error }
ConnectionMode takes care of connecting to hosts and potentially doing load balancing and/or failover
func NewAsyncConnectionMode ¶
func NewAsyncConnectionMode( clients []AsyncProtocolClient, failover bool, maxAttempts int, waitRetry, timeout, maxWaitRetry time.Duration, ) (ConnectionMode, error)
func NewConnectionMode ¶
func NewConnectionMode( clients []ProtocolClient, failover bool, maxAttempts int, waitRetry, timeout, maxWaitRetry time.Duration, ) (ConnectionMode, error)
type LoadBalancerMode ¶
type LoadBalancerMode struct {
// contains filtered or unexported fields
}
LoadBalancerMode balances the sending of events between multiple connections.
The balancing algorithm is mostly pull-based, with multiple workers trying to pull some amount of work from a shared queue. Workers will try to get a new work item only if they have a working/active connection. Workers without active connection do not participate until a connection has been re-established. Due to the pull based nature the algorithm will load-balance events by random with workers having less latencies/turn-around times potentially getting more work items then other workers with higher latencies. Thusly the algorithm dynamically adapts to resource availability of server events are forwarded to.
Workers not participating in the load-balancing will continuously try to reconnect to their configured endpoints. Once a new connection has been established, these workers will participate in in load-balancing again.
If a connection becomes unavailable, the events are rescheduled for another connection to pick up. Rescheduling events is limited to a maximum number of send attempts. If events have not been send after maximum number of allowed attemps has been passed, they will be dropped.
Distributing events to workers is subject to timeout. If no worker is available to pickup a message for sending, the message will be dropped internally.
func NewLoadBalancerMode ¶
func NewLoadBalancerMode( clients []ProtocolClient, maxAttempts int, waitRetry, timeout, maxWaitRetry time.Duration, ) (*LoadBalancerMode, error)
NewLoadBalancerMode create a new load balancer connection mode.
func (*LoadBalancerMode) Close ¶
func (m *LoadBalancerMode) Close() error
Close waits for the workers to end and connections to close.
func (*LoadBalancerMode) PublishEvent ¶
func (m *LoadBalancerMode) PublishEvent( signaler outputs.Signaler, opts outputs.Options, event common.MapStr, ) error
PublishEvent forwards the event to some load balancing worker.
func (*LoadBalancerMode) PublishEvents ¶
func (m *LoadBalancerMode) PublishEvents( signaler outputs.Signaler, opts outputs.Options, events []common.MapStr, ) error
PublishEvents forwards events to some load balancing worker.
type ProtocolClient ¶
type ProtocolClient interface { Connectable // PublishEvents sends events to the clients sink. On failure or timeout err // must be set. If connection has been lost, IsConnected must return false // in future calls. // PublishEvents is free to publish only a subset of given events, even in // error case. On return nextEvents contains all events not yet published. PublishEvents(events []common.MapStr) (nextEvents []common.MapStr, err error) // PublishEvent sends one event to the clients sink. On failure and error is // returned. PublishEvent(event common.MapStr) error }
ProtocolClient interface is a output plugin specific client implementation for encoding and publishing events. A ProtocolClient must be able to connection to it's sink and indicate connection failures in order to be reconnected byte the output plugin.
func MakeClients ¶
func MakeClients( config *common.Config, newClient func(string) (ProtocolClient, error), ) ([]ProtocolClient, error)
MakeClients will create a list from of ProtocolClient instances from outputer configuration host list and client factory function.
func NewFailoverClient ¶
func NewFailoverClient(clients []ProtocolClient) []ProtocolClient
type SingleConnectionMode ¶
type SingleConnectionMode struct {
// contains filtered or unexported fields
}
SingleConnectionMode sends all Output on one single connection. If connection is not available, the output plugin blocks until the connection is either available again or the connection mode is closed by Close.
func NewSingleConnectionMode ¶
func NewSingleConnectionMode( client ProtocolClient, maxAttempts int, waitRetry, timeout, maxWaitRetry time.Duration, ) (*SingleConnectionMode, error)
NewSingleConnectionMode creates a new single connection mode using exactly one ProtocolClient connection.
func (*SingleConnectionMode) Close ¶
func (s *SingleConnectionMode) Close() error
Close closes the underlying connection.
func (*SingleConnectionMode) PublishEvent ¶
func (s *SingleConnectionMode) PublishEvent( signaler outputs.Signaler, opts outputs.Options, event common.MapStr, ) error
PublishEvent forwards a single event. On failure PublishEvent tries to reconnect.
func (*SingleConnectionMode) PublishEvents ¶
func (s *SingleConnectionMode) PublishEvents( signaler outputs.Signaler, opts outputs.Options, events []common.MapStr, ) error
PublishEvents tries to publish the events with retries if connection becomes unavailable. On failure PublishEvents tries to reconnect.