Documentation ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Stream ¶
type Stream struct {
	// contains filtered or unexported fields
}
Stream represents the runtime event stream.
Internally, Stream is implemented as a circular buffer of `Event`. The `e.stream` slice is allocated at its initial capacity, and its size doesn't change throughout the lifetime of the Stream.
To explain the internals, let's call the `Publish()` method the 'Publisher' (it may have multiple callers), and each `Watch()` handler a 'Consumer'.
For the Publisher, `Stream` keeps `e.writePos`, the write offset into `e.stream`. The offset `e.writePos` is only ever incremented; the real write index is `e.writePos % e.cap`.
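The mapping from the ever-growing write offset to a buffer slot can be illustrated with a small sketch. The function name `writeIndex` and the sample values are assumptions made for this example; they are not part of the package.

```go
package main

import "fmt"

// writeIndex maps a monotonically increasing write offset to a slot in a
// fixed-size circular buffer. The name is hypothetical; the package keeps
// the equivalent state in unexported fields.
func writeIndex(writePos int64, capacity int) int {
	return int(writePos % int64(capacity))
}

func main() {
	const capacity = 18 // illustrative capacity
	for _, pos := range []int64{0, 17, 18, 50} {
		fmt.Printf("writePos=%d -> index %d\n", pos, writeIndex(pos, capacity))
	}
}
```

Because the offset never wraps, the difference between two offsets is always the number of events published between them, regardless of how many times the buffer itself has wrapped around.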
Each Consumer captures the position it starts consuming from as `pos`. This value is local to each Consumer, because Consumers are free to work at their own pace. The following diagram shows the Publisher and three Consumers:
                                                   Consumer 3                         Consumer 2
                                                   pos = 27                           pos = 34
    e.stream []Event                               |                                  |
                                                   |                                  |
    +----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
    | 0  | 1  | 2  | 3  | 4  | 5  | 6  | 7  | 8  | 9  | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 |
    +----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
                                         |                                  |
                                         |                                  |
                                         Consumer 1                         Publisher
                                         pos = 43                           e.writePos = 50
The capacity of the Stream in this diagram is 18. The Publisher has already published 50 events, so it has fully overwritten `e.stream` twice.
Consumer 1 is trying to keep up with the Publisher: it has 50-43 = 7 events to catch up (14-7 = 7 in buffer indices).
Consumer 2 is reading events published before the last wraparound: it has 50-34 = 16 events to catch up. That is a lot, but as long as it stays within the capacity it can still catch up.
Consumer 3 is in trouble: 50-27 = 23 > 18 (capacity), so its read position has already been overwritten. It can't read consistent data, so it should error out.
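The three cases in the diagram can be reproduced with a short sketch. The function `lagStatus` and its status strings are hypothetical, and the check ignores the safety gap that `NewStream` accepts; it only compares the backlog with the raw capacity, as the text above does.

```go
package main

import "fmt"

// lagStatus classifies a consumer by how far its position trails the
// publisher. Hypothetical helper: the safety gap is deliberately ignored.
func lagStatus(writePos, pos, capacity int) string {
	backlog := writePos - pos
	switch {
	case backlog == 0:
		return "caught up"
	case backlog > capacity:
		return "overrun" // the slot at pos has already been overwritten
	default:
		return "behind"
	}
}

func main() {
	const capacity, writePos = 18, 50
	for i, pos := range []int{43, 34, 27} {
		fmt.Printf("Consumer %d: backlog=%d, %s\n",
			i+1, writePos-pos, lagStatus(writePos, pos, capacity))
	}
}
```

With the diagram's values, Consumers 1 and 2 are reported as behind (backlogs of 7 and 16) and Consumer 3 as overrun (backlog of 23).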
Synchronization: at the moment a single mutex protects `e.stream` and `e.writePos`. Each Consumer keeps its position in a local variable, so the position doesn't require synchronization. If a Consumer catches up with the Publisher, it sleeps on a condition variable until the Publisher wakes it up on the next publish.
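The following is a minimal sketch of that synchronization scheme using `sync.Mutex` and `sync.Cond` from the standard library. The `ring` type, its methods, and the string payload are all assumptions made for illustration; the real Stream stores `Event` values and its implementation may differ in detail.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ring is a toy stand-in for Stream: one mutex guards buf and writePos,
// and a condition variable wakes consumers that have caught up.
type ring struct {
	mu       sync.Mutex
	cond     *sync.Cond
	buf      []string
	writePos int
}

var errOverrun = errors.New("consumer position was overwritten")

func newRing(capacity int) *ring {
	r := &ring{buf: make([]string, capacity)}
	r.cond = sync.NewCond(&r.mu)
	return r
}

// publish stores ev in the next slot and wakes all sleeping consumers.
func (r *ring) publish(ev string) {
	r.mu.Lock()
	r.buf[r.writePos%len(r.buf)] = ev
	r.writePos++
	r.mu.Unlock()
	r.cond.Broadcast()
}

// next returns the event at pos, sleeping while the consumer is caught up.
// pos is owned by the caller, so it needs no locking of its own.
func (r *ring) next(pos int) (string, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for pos == r.writePos {
		r.cond.Wait() // releases mu while sleeping
	}
	if r.writePos-pos > len(r.buf) {
		return "", errOverrun
	}
	return r.buf[pos%len(r.buf)], nil
}

func main() {
	r := newRing(4)
	done := make(chan struct{})
	go func() { // consumer
		defer close(done)
		for pos := 0; pos < 3; pos++ {
			ev, err := r.next(pos)
			if err != nil {
				fmt.Println("error:", err)
				return
			}
			fmt.Println("got", ev)
		}
	}()
	for _, ev := range []string{"a", "b", "c"} {
		r.publish(ev)
	}
	<-done
}
```

`Broadcast` is used rather than `Signal` because several consumers may be waiting at once, and each of them has to re-check its own position after waking up.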
func NewStream ¶
NewStream initializes and returns the v1alpha1 runtime event stream.
Argument cap is the maximum capacity of the event stream. Argument gap is a safety gap that separates consumers from the publisher. The maximum available event history is (cap-gap).
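To make the (cap-gap) relation concrete, here is a small sketch. Both helper functions are hypothetical, and the exact boundary comparison (`<=` here) is an assumption; the documentation only states that the available history is cap-gap.

```go
package main

import "fmt"

// usableHistory returns how many past events remain readable when gap
// slots are reserved to keep consumers away from the publisher.
func usableHistory(capacity, gap int) int {
	return capacity - gap
}

// readable reports whether a consumer at pos is still inside the usable
// history. The inclusive boundary is an assumption made for this sketch.
func readable(writePos, pos, capacity, gap int) bool {
	return writePos-pos <= usableHistory(capacity, gap)
}

func main() {
	const capacity, gap = 1000, 10 // illustrative values only
	fmt.Println("usable history:", usableHistory(capacity, gap))
	fmt.Println("backlog 500 readable:", readable(5000, 4500, capacity, gap))
	fmt.Println("backlog 995 readable:", readable(5000, 4005, capacity, gap))
}
```

The gap trades a little history for safety: a consumer reading close to the oldest slot gets an error before the publisher can overwrite the data it is reading.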
type WatchOptionFunc ¶
type WatchOptionFunc func(opts *WatchOptions) error
WatchOptionFunc defines the options for the watcher.
func WithTailDuration ¶
func WithTailDuration(dur time.Duration) WatchOptionFunc
WithTailDuration sets up Watcher to return events with timestamp >= (now - tailDuration).
func WithTailEvents ¶
func WithTailEvents(number int) WatchOptionFunc
WithTailEvents sets up Watcher to return the specified number of past events.
If number is negative, all the available past events are returned.
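One way to picture the tail semantics is as a calculation of the position a new consumer starts from. The helper `tailStart` and its `history` parameter (the number of past events still available) are assumptions made for this sketch, not the package's actual code.

```go
package main

import "fmt"

// tailStart returns the starting position for a consumer that asked for
// the last n events. A negative n means "everything still available".
// Hypothetical helper: history is the number of readable past events.
func tailStart(writePos, history, n int) int {
	if n < 0 || n > history {
		n = history
	}
	if n > writePos {
		n = writePos // fewer events were published than requested
	}
	return writePos - n
}

func main() {
	fmt.Println(tailStart(50, 18, 5))  // last 5 events
	fmt.Println(tailStart(50, 18, -1)) // all available events
	fmt.Println(tailStart(3, 18, -1))  // only 3 events exist so far
}
```

A request for zero past events starts the consumer at the current write position, so it only sees events published after the watch begins.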
func WithTailID ¶
func WithTailID(id xid.ID) WatchOptionFunc
WithTailID sets up Watcher to return events with ID > TailID.
type WatchOptions ¶
type WatchOptions struct {
	// Return that many past events.
	//
	// If TailEvents is negative, return all the events available.
	TailEvents int
	// Start at ID > specified.
	TailID xid.ID
	// Start at timestamp Now() - TailDuration.
	TailDuration time.Duration
}
WatchOptions defines options for the watch call.
Only one of TailEvents, TailID or TailDuration should be non-zero.
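The option functions follow the functional options pattern. The sketch below mirrors the documented types with lowercase, local stand-ins so that it compiles on its own: `eventID` is a placeholder for `xid.ID`, and `buildOptions` with its "only one non-zero field" check is an assumption about how a caller might enforce the rule above, not the package's own validation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// eventID is a placeholder standing in for xid.ID in this sketch.
type eventID [12]byte

// watchOptions mirrors the documented WatchOptions fields.
type watchOptions struct {
	TailEvents   int
	TailID       eventID
	TailDuration time.Duration
}

type watchOptionFunc func(opts *watchOptions) error

func withTailEvents(n int) watchOptionFunc {
	return func(opts *watchOptions) error {
		opts.TailEvents = n
		return nil
	}
}

func withTailDuration(d time.Duration) watchOptionFunc {
	return func(opts *watchOptions) error {
		opts.TailDuration = d
		return nil
	}
}

var errConflictingTail = errors.New("only one of TailEvents, TailID or TailDuration may be set")

// buildOptions applies each setter in order and then rejects combinations
// with more than one non-zero tail field (hypothetical validation).
func buildOptions(setters ...watchOptionFunc) (watchOptions, error) {
	var opts watchOptions
	for _, apply := range setters {
		if err := apply(&opts); err != nil {
			return watchOptions{}, err
		}
	}
	nonZero := 0
	if opts.TailEvents != 0 {
		nonZero++
	}
	if opts.TailID != (eventID{}) {
		nonZero++
	}
	if opts.TailDuration != 0 {
		nonZero++
	}
	if nonZero > 1 {
		return watchOptions{}, errConflictingTail
	}
	return opts, nil
}

func main() {
	opts, err := buildOptions(withTailEvents(10))
	fmt.Println(opts.TailEvents, err)

	_, err = buildOptions(withTailEvents(10), withTailDuration(time.Minute))
	fmt.Println(err)
}
```

Passing no options leaves every field at its zero value, which under this reading means no past events are requested.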
type Watcher ¶
type Watcher interface {
	Watch(WatchFunc, ...WatchOptionFunc) error
}
Watcher defines a runtime event watcher.