Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var (
    // DefaultExponentialBackoffConfig provides a default configuration for
    // exponential backoff.
    DefaultExponentialBackoffConfig = ExponentialBackoffConfig{
        Base:   time.Second,
        Factor: time.Second,
        Max:    20 * time.Second,
    }
)
var (
    // ErrSinkClosed is returned if a write is issued to a sink that has been
    // closed. If encountered, the error should be considered terminal and
    // retries will not be successful.
    ErrSinkClosed = fmt.Errorf("events: sink closed")
)
Functions ¶
This section is empty.
Types ¶
type Breaker ¶
type Breaker struct {
// contains filtered or unexported fields
}
Breaker implements a circuit breaker retry strategy.
The current implementation never drops events.
func NewBreaker ¶
NewBreaker returns a breaker that will back off after the threshold has been tripped. A Breaker is thread safe and may be shared by many goroutines.
type Broadcaster ¶
type Broadcaster struct {
// contains filtered or unexported fields
}
Broadcaster sends events to multiple, reliable Sinks. The goal of this component is to dispatch events to configured endpoints. Reliability can be provided by wrapping incoming sinks.
func NewBroadcaster ¶
func NewBroadcaster(sinks ...Sink) *Broadcaster
NewBroadcaster returns a Broadcaster that dispatches to the provided sinks. The broadcaster's behavior is affected by the properties of each sink. Generally, a sink should accept all messages and deal with reliability on its own. Wrapping sinks with Queue and RetryingSink is recommended here.
func (*Broadcaster) Add ¶
func (b *Broadcaster) Add(sink Sink) error
Add the sink to the broadcaster.
The provided sink must be comparable with equality. Typically, this just works with a regular pointer type.
func (*Broadcaster) Close ¶
func (b *Broadcaster) Close() error
Close the broadcaster, ensuring that all messages are flushed to the underlying sink before returning.
func (*Broadcaster) Remove ¶
func (b *Broadcaster) Remove(sink Sink) error
Remove the provided sink.
func (*Broadcaster) String ¶
func (b *Broadcaster) String() string
func (*Broadcaster) Write ¶
func (b *Broadcaster) Write(event Event) error
Write accepts an event to be dispatched to all sinks. This method is designed never to fail and should not block. The caller cedes the memory to the broadcaster and should not modify the event after calling Write.
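The Add, Remove, and Write behavior described above can be pictured with a small synchronous fan-out. This is only a sketch under stated assumptions: `fanout` and `memSink` are hypothetical names, the local `Event` and `Sink` types are stand-ins, and the real Broadcaster may well dispatch asynchronously. It mainly shows why sinks must be comparable with equality.

```go
package main

import "fmt"

// Event and Sink are local stand-ins for the package's types.
type Event interface{}

type Sink interface {
	Write(event Event) error
	Close() error
}

// memSink records every event it receives.
type memSink struct{ events []Event }

func (s *memSink) Write(e Event) error { s.events = append(s.events, e); return nil }
func (s *memSink) Close() error        { return nil }

// fanout is a hypothetical, synchronous, single-goroutine stand-in for a
// broadcaster.
type fanout struct{ sinks []Sink }

// add registers sink unless an equal value is already present.
func (f *fanout) add(sink Sink) {
	for _, s := range f.sinks {
		if s == sink {
			return
		}
	}
	f.sinks = append(f.sinks, sink)
}

// remove drops the first sink that compares equal to the argument.
func (f *fanout) remove(sink Sink) {
	for i, s := range f.sinks {
		if s == sink {
			f.sinks = append(f.sinks[:i], f.sinks[i+1:]...)
			return
		}
	}
}

// write hands the event to every registered sink and ignores their errors,
// leaving reliability to the sinks themselves.
func (f *fanout) write(e Event) {
	for _, s := range f.sinks {
		_ = s.Write(e)
	}
}

func main() {
	a, b := &memSink{}, &memSink{}
	f := &fanout{}
	f.add(a)
	f.add(a) // same pointer, so it compares equal and is ignored
	f.add(b)
	f.write("first")
	f.remove(b)
	f.write("second")
	fmt.Println(len(a.events), len(b.events))
}
```

Pointer-typed sinks compare by identity, which is why a regular pointer type "just works" for Add and Remove.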
type Channel ¶
type Channel struct {
    C chan Event
    // contains filtered or unexported fields
}
Channel provides a sink that can be listened on. The writer and channel listener must operate in separate goroutines.
Consumers should listen on Channel.C until Closed is closed.
func NewChannel ¶
NewChannel returns a channel. If buffer is zero, the channel is unbuffered.
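The requirement that writer and listener run in separate goroutines follows from how an unbuffered channel hands off values. The sketch below illustrates this with a hypothetical `channelSink` that only mirrors the shape of Channel. The `done` signal, `newChannelSink`, and `listen` are assumptions made for the example, not the package's API.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Event is a local stand-in for the package's Event type.
type Event interface{}

// errSinkClosed stands in for the package's ErrSinkClosed.
var errSinkClosed = errors.New("events: sink closed")

// channelSink is a hypothetical stand-in with the same shape as Channel:
// an exported channel C plus an unexported closed signal.
type channelSink struct {
	C    chan Event
	done chan struct{}
	once sync.Once
}

func newChannelSink(buffer int) *channelSink {
	return &channelSink{C: make(chan Event, buffer), done: make(chan struct{})}
}

// Write blocks until the event is handed to C or the sink is closed.
func (c *channelSink) Write(event Event) error {
	select {
	case <-c.done:
		return errSinkClosed
	default:
	}
	select {
	case c.C <- event:
		return nil
	case <-c.done:
		return errSinkClosed
	}
}

// Close signals listeners and writers that the sink is finished.
func (c *channelSink) Close() error {
	c.once.Do(func() { close(c.done) })
	return nil
}

// listen consumes events in its own goroutine until the sink is closed and
// then delivers everything it received.
func listen(c *channelSink) <-chan []Event {
	out := make(chan []Event, 1)
	go func() {
		var got []Event
		for {
			select {
			case e := <-c.C:
				got = append(got, e)
			case <-c.done:
				out <- got
				return
			}
		}
	}()
	return out
}

func main() {
	c := newChannelSink(0) // unbuffered: every Write waits for the listener
	result := listen(c)
	for _, e := range []Event{"a", "b", "c"} {
		if err := c.Write(e); err != nil {
			fmt.Println("write failed:", err)
		}
	}
	c.Close()
	fmt.Println(len(<-result), c.Write("late"))
}
```

If the listener ran in the writer's goroutine, the first Write on the unbuffered channel would block forever, because nothing would be receiving.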
type ExponentialBackoff ¶
type ExponentialBackoff struct {
// contains filtered or unexported fields
}
ExponentialBackoff implements random backoff with exponentially increasing bounds as the number of consecutive failures increases.
func NewExponentialBackoff ¶
func NewExponentialBackoff(config ExponentialBackoffConfig) *ExponentialBackoff
NewExponentialBackoff returns an exponential backoff strategy with the desired config. Because config is passed by value it cannot be nil; pass DefaultExponentialBackoffConfig to use the default configuration.
func (*ExponentialBackoff) Failure ¶
func (b *ExponentialBackoff) Failure(event Event, err error) bool
Failure increments the failure counter.
func (*ExponentialBackoff) Proceed ¶
func (b *ExponentialBackoff) Proceed(event Event) time.Duration
Proceed returns the next randomly bounded exponential backoff time.
func (*ExponentialBackoff) Success ¶
func (b *ExponentialBackoff) Success(event Event)
Success resets the failures counter.
type ExponentialBackoffConfig ¶
type ExponentialBackoffConfig struct {
    // Base is the minimum bound for backing off after failure.
    Base time.Duration

    // Factor sets the amount of time by which the backoff grows with each
    // failure.
    Factor time.Duration

    // Max is the absolute maximum bound for a single backoff.
    Max time.Duration
}
ExponentialBackoffConfig configures backoff parameters.
Note that these parameters operate on the upper bound for choosing a random value. For example, at Base=1s, a random value in [0,1s) will be chosen for the backoff value.
type Filter ¶
type Filter struct {
// contains filtered or unexported fields
}
Filter provides an event sink that sends only events that are accepted by a Matcher. None of Filter's methods are goroutine safe.
type MatcherFunc ¶
MatcherFunc implements matcher with just a function.
func (MatcherFunc) Match ¶
func (fn MatcherFunc) Match(event Event) bool
Match calls the wrapped function.
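The function-adapter pattern behind MatcherFunc can be sketched as below. The declarations of `Matcher` and `MatcherFunc` are not shown in this documentation, so the ones here are assumptions inferred from the Match method above. `accepted` is a hypothetical helper that selects the events a filter would forward.

```go
package main

import (
	"fmt"
	"strings"
)

// Event is a local stand-in for the package's Event type.
type Event interface{}

// Matcher is assumed to be a single-method interface, as implied by the
// documented Match method.
type Matcher interface {
	Match(event Event) bool
}

// MatcherFunc adapts a plain function to the Matcher interface. The
// underlying function type is an assumption.
type MatcherFunc func(event Event) bool

// Match calls the wrapped function.
func (fn MatcherFunc) Match(event Event) bool { return fn(event) }

// accepted returns the events the matcher accepts, in their original order.
func accepted(m Matcher, events ...Event) []Event {
	var out []Event
	for _, e := range events {
		if m.Match(e) {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	// Accept only string events that start with "error:".
	errorsOnly := MatcherFunc(func(event Event) bool {
		s, ok := event.(string)
		return ok && strings.HasPrefix(s, "error:")
	})
	fmt.Println(accepted(errorsOnly, "info: started", "error: disk full", 42))
}
```

The adapter saves declaring a named struct type for every one-off predicate.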
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue accepts all messages into a queue for asynchronous consumption by a sink. It is unbounded and thread safe but the sink must be reliable or events will be dropped.
type RetryStrategy ¶
type RetryStrategy interface {
    // Proceed is called before every event send. If Proceed returns a
    // positive, non-zero duration, the retryer will back off by the provided
    // duration.
    //
    // An event is provided, but may be ignored.
    Proceed(event Event) time.Duration

    // Failure reports a failure to the strategy. If this method returns true,
    // the event should be dropped.
    Failure(event Event, err error) bool

    // Success should be called when an event is sent successfully.
    Success(event Event)
}
RetryStrategy defines a strategy for retrying event sink writes.
All methods should be goroutine safe.
type RetryingSink ¶
type RetryingSink struct {
// contains filtered or unexported fields
}
RetryingSink retries the write until success or an ErrSinkClosed is returned. The underlying sink must have a non-zero probability (p > 0) of succeeding, or the sink will block indefinitely. Retry is configured with a RetryStrategy. Concurrent calls to a retrying sink are serialized through the sink, meaning that if one is in-flight, another will not proceed.
func NewRetryingSink ¶
func NewRetryingSink(sink Sink, strategy RetryStrategy) *RetryingSink
NewRetryingSink returns a sink that will retry writes to the wrapped sink, backing off on failure. The strategy parameter determines when to back off and when to drop an event.
func (*RetryingSink) Close ¶
func (rs *RetryingSink) Close() error
Close closes the sink and the underlying sink.
func (*RetryingSink) String ¶
func (rs *RetryingSink) String() string
func (*RetryingSink) Write ¶
func (rs *RetryingSink) Write(event Event) error
Write attempts to flush the event to the downstream sink until it succeeds or the sink is closed.
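How a retry loop might combine a Sink with a RetryStrategy is sketched below. This is a simplified, single-goroutine illustration under assumptions, not the package's code: `retryWrite`, `flakySink`, `countingStrategy`, and `errSinkClosed` are hypothetical names, and returning nil for a dropped event is a choice made for the sketch.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Event, Sink, and RetryStrategy are local stand-ins for the package's types.
type Event interface{}

type Sink interface {
	Write(event Event) error
	Close() error
}

type RetryStrategy interface {
	Proceed(event Event) time.Duration
	Failure(event Event, err error) bool
	Success(event Event)
}

// errSinkClosed stands in for the package's ErrSinkClosed.
var errSinkClosed = errors.New("events: sink closed")

// retryWrite is a hypothetical retry loop: wait if the strategy asks for it,
// write, and stop on success, on a closed sink, or when told to drop.
func retryWrite(sink Sink, strategy RetryStrategy, event Event) error {
	for {
		if wait := strategy.Proceed(event); wait > 0 {
			time.Sleep(wait)
		}
		err := sink.Write(event)
		if err == nil {
			strategy.Success(event)
			return nil
		}
		if errors.Is(err, errSinkClosed) {
			return err // terminal: retrying cannot succeed
		}
		if strategy.Failure(event, err) {
			return nil // the strategy asked for the event to be dropped
		}
	}
}

// flakySink fails the first failN writes, then accepts events until closed.
type flakySink struct {
	failN  int
	closed bool
	events []Event
}

func (s *flakySink) Write(e Event) error {
	if s.closed {
		return errSinkClosed
	}
	if s.failN > 0 {
		s.failN--
		return errors.New("temporary failure")
	}
	s.events = append(s.events, e)
	return nil
}

func (s *flakySink) Close() error { s.closed = true; return nil }

// countingStrategy never waits and never drops; it only counts failures.
type countingStrategy struct{ failures int }

func (c *countingStrategy) Proceed(Event) time.Duration { return 0 }
func (c *countingStrategy) Failure(Event, error) bool   { c.failures++; return false }
func (c *countingStrategy) Success(Event)               {}

func main() {
	sink := &flakySink{failN: 2}
	strategy := &countingStrategy{}
	err := retryWrite(sink, strategy, "hello")
	fmt.Println(err, strategy.failures, len(sink.events))
}
```

With a strategy that never drops and a sink that never succeeds, this loop would spin forever, which is the blocking behavior the RetryingSink documentation warns about.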
type Sink ¶
type Sink interface {
    // Write an event to the Sink. If no error is returned, the caller will
    // assume that all events have been committed to the sink. If an error is
    // received, the caller may retry sending the event.
    Write(event Event) error

    // Close the sink, possibly waiting for pending events to flush.
    Close() error
}
Sink accepts and sends events.
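Implementing the interface takes two methods. The sketch below shows a hypothetical in-memory sink that reports a closed-sink error after Close, in the spirit of ErrSinkClosed. `memorySink` and `errSinkClosed` are illustrative names, and the local `Event` and `Sink` declarations are stand-ins so the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Event and Sink are local stand-ins for the package's types.
type Event interface{}

type Sink interface {
	Write(event Event) error
	Close() error
}

// errSinkClosed stands in for the package's ErrSinkClosed.
var errSinkClosed = errors.New("events: sink closed")

// memorySink is a hypothetical Sink that keeps accepted events in memory.
type memorySink struct {
	mu     sync.Mutex // guards closed and events
	closed bool
	events []Event
}

// Compile-time check that memorySink satisfies the interface.
var _ Sink = (*memorySink)(nil)

// Write stores the event, or reports the terminal error once closed.
func (s *memorySink) Write(event Event) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return errSinkClosed
	}
	s.events = append(s.events, event)
	return nil
}

// Close marks the sink closed; there is nothing pending to flush here.
func (s *memorySink) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.closed = true
	return nil
}

func main() {
	s := &memorySink{}
	fmt.Println(s.Write("first"))
	s.Close()
	err := s.Write("second")
	fmt.Println(errors.Is(err, errSinkClosed), len(s.events))
}
```

A caller that sees the closed-sink error should stop retrying, since the documentation describes that error as terminal.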