events

package
v0.0.0-...-efe2023 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 28, 2020 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Event

type Event struct {
	TypeURL string
	ID      xid.ID
	Payload proto.Message
}

Event is what is sent on the wire.

func (*Event) ToAPIEvent

func (event *Event) ToAPIEvent() (*types.Event, error)

ToAPIEvent serializes Event as proto message machine.Event.

type Publisher

type Publisher interface {
	Publish(proto.Message)
}

Publisher defines a runtime event publisher.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream represents the runtime event stream.

Internally, Stream is implemented as a circular buffer of `Event`. The `e.stream` slice is allocated at its initial capacity, and its size doesn't change throughout the lifetime of the Stream.

To explain the internals, let's call the `Publish()` method the 'Publisher' (it might have multiple callers) and each `Watch()` handler a 'Consumer'.

For the Publisher, `Stream` keeps `e.writePos`, the write offset into `e.stream`. The offset `e.writePos` is only ever incremented; the real write index is `e.writePos % e.cap`.

Each Consumer captures the position it starts consuming from as `pos`. This value is local to each Consumer, as Consumers are free to work at their own pace. The following diagram shows the Publisher and three Consumers:

                                               Consumer 3                         Consumer 2
                                               pos = 27                           pos = 34
e.stream []Event                               |                                  |
                                               |                                  |
+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
| 0  | 1  | 2  | 3  | 4  | 5  | 6  | 7  | 8  | 9  | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 |
+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
                                     |                                  |
                                     |                                  |
                                     Consumer 1                         Publisher
                                     pos = 43                           e.writePos = 50

The capacity of Stream in this diagram is 18. The Publisher has already published 50 events, so it has wrapped around `e.stream` twice (50 = 2*18 + 14).

Consumer 1 is keeping up with the Publisher: it has 50-43 = 7 events to catch up (in buffer indexes, 14-7 = 7).

Consumer 2 is reading events published before the last wraparound; it has 50-34 = 16 events to catch up. That is a large backlog, but as 16 is still within the capacity of 18, it can still catch up.

Consumer 3 is in trouble: 50-27 = 23 > 18 (capacity), so its read position has already been overwritten. It can't read consistent data, so it should error out.
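The arithmetic for the three Consumers can be checked with a small sketch. The function names `backlog` and `overrun` are hypothetical, and the check compares against the full capacity as the text above does; it ignores the safety gap that `NewStream` also accepts.

```go
package main

import "fmt"

// backlog returns how many events a consumer at pos still has to read.
func backlog(writePos, pos int64) int64 {
	return writePos - pos
}

// overrun reports whether the consumer's read position has already been
// overwritten by the publisher (backlog larger than the buffer capacity).
func overrun(writePos, pos int64, capacity int) bool {
	return backlog(writePos, pos) > int64(capacity)
}

func main() {
	const writePos, capacity = 50, 18
	for _, pos := range []int64{43, 34, 27} {
		fmt.Println(pos, backlog(writePos, pos), overrun(writePos, pos, capacity))
	}
	// prints:
	// 43 7 false
	// 34 16 false
	// 27 23 true
}
```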

Synchronization: at the moment, a single mutex protects `e.stream` and `e.writePos`. Each Consumer keeps its position in a local variable, so the position itself requires no synchronization. When a Consumer catches up with the Publisher, it sleeps on a condition variable until the Publisher wakes it on the next publish.
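A minimal sketch of this locking scheme, using `sync.Mutex` and `sync.Cond` from the standard library, might look like the following. The `stream` type, its string events, and the `next` and `consume` helpers are assumptions made for illustration; overrun detection is omitted to keep the example short.

```go
package main

import (
	"fmt"
	"sync"
)

// stream is a hypothetical stand-in for Stream with string events.
type stream struct {
	mu       sync.Mutex // protects buf and writePos
	cond     *sync.Cond // signalled on every publish
	buf      []string
	writePos int64
}

func newStream(capacity int) *stream {
	s := &stream{buf: make([]string, capacity)}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// publish writes one event under the mutex and wakes sleeping consumers.
func (s *stream) publish(ev string) {
	s.mu.Lock()
	s.buf[s.writePos%int64(len(s.buf))] = ev
	s.writePos++
	s.mu.Unlock()
	s.cond.Broadcast()
}

// next blocks until the event at pos has been published, then returns it.
func (s *stream) next(pos int64) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	for pos >= s.writePos { // caught up: sleep until the next publish
		s.cond.Wait()
	}
	return s.buf[pos%int64(len(s.buf))]
}

// consume reads n events from position 0; pos is local to this consumer.
func consume(s *stream, n int) []string {
	out := make([]string, 0, n)
	for pos := int64(0); pos < int64(n); pos++ {
		out = append(out, s.next(pos))
	}
	return out
}

func main() {
	s := newStream(8)
	done := make(chan []string)
	go func() { done <- consume(s, 3) }()
	for _, ev := range []string{"a", "b", "c"} {
		s.publish(ev)
	}
	fmt.Println(<-done) // prints "[a b c]"
}
```

The wait sits in a loop because `Cond.Wait` can return when the condition still does not hold for this particular consumer, so the position is rechecked after every wakeup.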

func NewStream

func NewStream(cap, gap int) *Stream

NewStream initializes and returns the v1alpha1 runtime event stream.

Argument cap is the capacity of the event stream buffer, in events. Argument gap is a safety gap that keeps consumers separated from the publisher. The maximum available event history is (cap-gap).

func (*Stream) Publish

func (e *Stream) Publish(msg proto.Message)

Publish implements the Events interface.

func (*Stream) Watch

func (e *Stream) Watch(f WatchFunc, opt ...WatchOptionFunc) error

Watch implements the Events interface.


type Streamer

type Streamer interface {
	Watcher
	Publisher
}

Streamer defines the runtime event stream.

type WatchFunc

type WatchFunc func(<-chan Event)

WatchFunc defines the watcher callback function.

type WatchOptionFunc

type WatchOptionFunc func(opts *WatchOptions) error

WatchOptionFunc defines the options for the watcher.

func WithTailDuration

func WithTailDuration(dur time.Duration) WatchOptionFunc

WithTailDuration sets up Watcher to return events with timestamp >= (now - tailDuration).

func WithTailEvents

func WithTailEvents(number int) WatchOptionFunc

WithTailEvents sets up Watcher to return the specified number of past events.

If number is negative, all the available past events are returned.
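One plausible way to turn a tail request into a starting position is sketched below. This is an assumption about the arithmetic, not the package's actual code: `tailStart` is a hypothetical helper, and the gap value of 4 is chosen only for the example. It limits the history to (cap-gap), as described for `NewStream`, and never goes below position 0.

```go
package main

import "fmt"

// tailStart returns the position a new consumer would start reading from.
// A negative n asks for all available history; the history is capped at
// capacity-gap events and at the number of events published so far.
func tailStart(writePos int64, capacity, gap, n int) int64 {
	maxHistory := int64(capacity - gap)
	want := int64(n)
	if n < 0 || want > maxHistory {
		want = maxHistory
	}
	if want > writePos {
		want = writePos
	}
	return writePos - want
}

func main() {
	fmt.Println(tailStart(50, 18, 4, 5))  // 45: the last 5 events
	fmt.Println(tailStart(50, 18, 4, -1)) // 36: all 18-4 = 14 available events
	fmt.Println(tailStart(3, 18, 4, -1))  // 0: only 3 events exist so far
}
```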

func WithTailID

func WithTailID(id xid.ID) WatchOptionFunc

WithTailID sets up Watcher to return events with ID > TailID.

type WatchOptions

type WatchOptions struct {
	// Return that many past events.
	//
	// If TailEvents is negative, return all the events available.
	TailEvents int
	// Start at ID > specified.
	TailID xid.ID
	// Start at timestamp Now() - TailDuration.
	TailDuration time.Duration
}

WatchOptions defines options for the watch call.

Only one of TailEvents, TailID or TailDuration should be non-zero.

type Watcher

type Watcher interface {
	Watch(WatchFunc, ...WatchOptionFunc) error
}

Watcher defines a runtime event watcher.
