events

package module
v0.0.0-...-9940093 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 14, 2024 License: Apache-2.0 Imports: 7 Imported by: 760

README

Docker Events Package

GoDoc

The Docker events package implements a composable event distribution package for Go.

Originally created to implement the notifications in Docker Registry 2, we've found the pattern to be useful in other applications. This package is most of the same code with slightly updated interfaces. Much of the internals have been made available.

Usage

The events package centers around a Sink type. Events are written with calls to Sink.Write(event Event). Sinks can be wired up in various configurations to achieve interesting behavior.

The canonical example is that employed by the docker/distribution/notifications package. Let's say we have a type httpSink where we'd like to queue notifications. As a rule, it should send a single http request and return an error if it fails:

func (h *httpSink) Write(event Event) error {
	p, err := json.Marshal(event)
	if err != nil {
		return err
	}
	body := bytes.NewReader(p)
	resp, err := h.client.Post(h.url, "application/json", body)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	
	if resp.Status != 200 {
		return errors.New("unexpected status")
	}

	return nil
}

// implement (*httpSink).Close()

With just that, we can start using components from this package. One can call (*httpSink).Write to send events as the body of a post request to a configured URL.

Retries

HTTP can be unreliable. The first feature we'd like is to have some retry:

hs := newHTTPSink(/*...*/)
retry := NewRetryingSink(hs, NewBreaker(5, time.Second))

We now have a sink that will retry events against the httpSink until they succeed. The retry will backoff for one second after 5 consecutive failures using the breaker strategy.

Queues

This isn't quite enough. We we want a sink that doesn't block while we are waiting for events to be sent. Let's add a Queue:

queue := NewQueue(retry)

Now, we have an unbounded queue that will work through all events sent with (*Queue).Write. Events can be added asynchronously to the queue without blocking the current execution path. This is ideal for use in an http request.

Broadcast

It usually turns out that you want to send to more than one listener. We can use Broadcaster to support this:

var broadcast = NewBroadcaster() // make it available somewhere in your application.
broadcast.Add(queue) // add your queue!
broadcast.Add(queue2) // and another!

With the above, we can now call broadcast.Write in our http handlers and have all the events distributed to each queue. Because the events are queued, not listener blocks another.

Extending

For the most part, the above is sufficient for a lot of applications. However, extending the above functionality can be done implementing your own Sink. The behavior and semantics of the sink can be completely dependent on the application requirements. The interface is provided below for reference:

type Sink {
	Write(Event) error
	Close() error
}

Application behavior can be controlled by how Write behaves. The examples above are designed to queue the message and return as quickly as possible. Other implementations may block until the event is committed to durable storage.

Copyright © 2016 Docker, Inc. go-events is licensed under the Apache License, Version 2.0. See LICENSE for the full license text.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// DefaultExponentialBackoffConfig provides a default configuration for
	// exponential backoff.
	DefaultExponentialBackoffConfig = ExponentialBackoffConfig{
		Base:   time.Second,
		Factor: time.Second,
		Max:    20 * time.Second,
	}
)
View Source
var (
	// ErrSinkClosed is returned if a write is issued to a sink that has been
	// closed. If encountered, the error should be considered terminal and
	// retries will not be successful.
	ErrSinkClosed = fmt.Errorf("events: sink closed")
)

Functions

This section is empty.

Types

type Breaker

type Breaker struct {
	// contains filtered or unexported fields
}

Breaker implements a circuit breaker retry strategy.

The current implementation never drops events.

func NewBreaker

func NewBreaker(threshold int, backoff time.Duration) *Breaker

NewBreaker returns a breaker that will backoff after the threshold has been tripped. A Breaker is thread safe and may be shared by many goroutines.

func (*Breaker) Failure

func (b *Breaker) Failure(event Event, err error) bool

Failure records the failure and latest failure time.

func (*Breaker) Proceed

func (b *Breaker) Proceed(event Event) time.Duration

Proceed checks the failures against the threshold.

func (*Breaker) Success

func (b *Breaker) Success(event Event)

Success resets the breaker.

type Broadcaster

type Broadcaster struct {
	// contains filtered or unexported fields
}

Broadcaster sends events to multiple, reliable Sinks. The goal of this component is to dispatch events to configured endpoints. Reliability can be provided by wrapping incoming sinks.

func NewBroadcaster

func NewBroadcaster(sinks ...Sink) *Broadcaster

NewBroadcaster appends one or more sinks to the list of sinks. The broadcaster behavior will be affected by the properties of the sink. Generally, the sink should accept all messages and deal with reliability on its own. Use of EventQueue and RetryingSink should be used here.

func (*Broadcaster) Add

func (b *Broadcaster) Add(sink Sink) error

Add the sink to the broadcaster.

The provided sink must be comparable with equality. Typically, this just works with a regular pointer type.

func (*Broadcaster) Close

func (b *Broadcaster) Close() error

Close the broadcaster, ensuring that all messages are flushed to the underlying sink before returning.

func (*Broadcaster) Remove

func (b *Broadcaster) Remove(sink Sink) error

Remove the provided sink.

func (*Broadcaster) String

func (b *Broadcaster) String() string

func (*Broadcaster) Write

func (b *Broadcaster) Write(event Event) error

Write accepts an event to be dispatched to all sinks. This method will never fail and should never block (hopefully!). The caller cedes the memory to the broadcaster and should not modify it after calling write.

type Channel

type Channel struct {
	C chan Event
	// contains filtered or unexported fields
}

Channel provides a sink that can be listened on. The writer and channel listener must operate in separate goroutines.

Consumers should listen on Channel.C until Closed is closed.

func NewChannel

func NewChannel(buffer int) *Channel

NewChannel returns a channel. If buffer is zero, the channel is unbuffered.

func (*Channel) Close

func (ch *Channel) Close() error

Close the channel sink.

func (*Channel) Done

func (ch *Channel) Done() chan struct{}

Done returns a channel that will always proceed once the sink is closed.

func (*Channel) String

func (ch *Channel) String() string

func (*Channel) Write

func (ch *Channel) Write(event Event) error

Write the event to the channel. Must be called in a separate goroutine from the listener.

type Event

type Event interface{}

Event marks items that can be sent as events.

type ExponentialBackoff

type ExponentialBackoff struct {
	// contains filtered or unexported fields
}

ExponentialBackoff implements random backoff with exponentially increasing bounds as the number consecutive failures increase.

func NewExponentialBackoff

func NewExponentialBackoff(config ExponentialBackoffConfig) *ExponentialBackoff

NewExponentialBackoff returns an exponential backoff strategy with the desired config. If config is nil, the default is returned.

func (*ExponentialBackoff) Failure

func (b *ExponentialBackoff) Failure(event Event, err error) bool

Failure increments the failure counter.

func (*ExponentialBackoff) Proceed

func (b *ExponentialBackoff) Proceed(event Event) time.Duration

Proceed returns the next randomly bound exponential backoff time.

func (*ExponentialBackoff) Success

func (b *ExponentialBackoff) Success(event Event)

Success resets the failures counter.

type ExponentialBackoffConfig

type ExponentialBackoffConfig struct {
	// Base is the minimum bound for backing off after failure.
	Base time.Duration

	// Factor sets the amount of time by which the backoff grows with each
	// failure.
	Factor time.Duration

	// Max is the absolute maxiumum bound for a single backoff.
	Max time.Duration
}

ExponentialBackoffConfig configures backoff parameters.

Note that these parameters operate on the upper bound for choosing a random value. For example, at Base=1s, a random value in [0,1s) will be chosen for the backoff value.

type Filter

type Filter struct {
	// contains filtered or unexported fields
}

Filter provides an event sink that sends only events that are accepted by a Matcher. No methods on filter are goroutine safe.

func (*Filter) Close

func (f *Filter) Close() error

Close the filter and allow no more events to pass through.

func (*Filter) Write

func (f *Filter) Write(event Event) error

Write an event to the filter.

type Matcher

type Matcher interface {
	Match(event Event) bool
}

Matcher matches events.

type MatcherFunc

type MatcherFunc func(event Event) bool

MatcherFunc implements matcher with just a function.

func (MatcherFunc) Match

func (fn MatcherFunc) Match(event Event) bool

Match calls the wrapped function.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue accepts all messages into a queue for asynchronous consumption by a sink. It is unbounded and thread safe but the sink must be reliable or events will be dropped.

func NewQueue

func NewQueue(dst Sink) *Queue

NewQueue returns a queue to the provided Sink dst.

func (*Queue) Close

func (eq *Queue) Close() error

Close shutsdown the event queue, flushing

func (*Queue) Write

func (eq *Queue) Write(event Event) error

Write accepts the events into the queue, only failing if the queue has been closed.

type RetryStrategy

type RetryStrategy interface {
	// Proceed is called before every event send. If proceed returns a
	// positive, non-zero integer, the retryer will back off by the provided
	// duration.
	//
	// An event is provided, by may be ignored.
	Proceed(event Event) time.Duration

	// Failure reports a failure to the strategy. If this method returns true,
	// the event should be dropped.
	Failure(event Event, err error) bool

	// Success should be called when an event is sent successfully.
	Success(event Event)
}

RetryStrategy defines a strategy for retrying event sink writes.

All methods should be goroutine safe.

type RetryingSink

type RetryingSink struct {
	// contains filtered or unexported fields
}

RetryingSink retries the write until success or an ErrSinkClosed is returned. Underlying sink must have p > 0 of succeeding or the sink will block. Retry is configured with a RetryStrategy. Concurrent calls to a retrying sink are serialized through the sink, meaning that if one is in-flight, another will not proceed.

func NewRetryingSink

func NewRetryingSink(sink Sink, strategy RetryStrategy) *RetryingSink

NewRetryingSink returns a sink that will retry writes to a sink, backing off on failure. Parameters threshold and backoff adjust the behavior of the circuit breaker.

func (*RetryingSink) Close

func (rs *RetryingSink) Close() error

Close closes the sink and the underlying sink.

func (*RetryingSink) String

func (rs *RetryingSink) String() string

func (*RetryingSink) Write

func (rs *RetryingSink) Write(event Event) error

Write attempts to flush the events to the downstream sink until it succeeds or the sink is closed.

type Sink

type Sink interface {
	// Write an event to the Sink. If no error is returned, the caller will
	// assume that all events have been committed to the sink. If an error is
	// received, the caller may retry sending the event.
	Write(event Event) error

	// Close the sink, possibly waiting for pending events to flush.
	Close() error
}

Sink accepts and sends events.

func NewFilter

func NewFilter(dst Sink, matcher Matcher) Sink

NewFilter returns a new filter that will send to events to dst that return true for Matcher.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL