stream

package
v1.9.0-beta1
Warning

This package is not in the latest version of its module.

Published: Oct 12, 2020 License: MPL-2.0 Imports: 6 Imported by: 0

Documentation

Overview

Package stream provides a publish/subscribe system for events produced by changes to the state store.

Index

Constants

This section is empty.

Variables

var ErrSubscriptionClosed = errors.New("subscription closed by server, client must reset state and resubscribe")

ErrSubscriptionClosed is an error signalling that the subscription has been closed. The client should Unsubscribe, then re-Subscribe.
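
A minimal sketch of how a caller might react to this error, assuming a consumer loop that reads until Next fails. The fakeSub type and the consume and demo helpers are hypothetical, and the error is redeclared locally so the example is self-contained. Real code would compare against stream.ErrSubscriptionClosed.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ErrSubscriptionClosed is redeclared locally so this sketch compiles on its
// own; real code would compare against stream.ErrSubscriptionClosed.
var ErrSubscriptionClosed = errors.New("subscription closed by server, client must reset state and resubscribe")

// fakeSub is a hypothetical stand-in for a subscription. It yields its queued
// keys, then reports that the server closed the subscription.
type fakeSub struct {
	keys         []string
	unsubscribed bool
}

func (f *fakeSub) Next(ctx context.Context) (string, error) {
	if err := ctx.Err(); err != nil {
		return "", err
	}
	if len(f.keys) == 0 {
		return "", ErrSubscriptionClosed
	}
	key := f.keys[0]
	f.keys = f.keys[1:]
	return key, nil
}

func (f *fakeSub) Unsubscribe() { f.unsubscribed = true }

// consume reads until Next fails. It always unsubscribes, and reports whether
// the caller should reset its state and subscribe again.
func consume(ctx context.Context, sub *fakeSub) ([]string, bool, error) {
	defer sub.Unsubscribe()
	var got []string
	for {
		key, err := sub.Next(ctx)
		if errors.Is(err, ErrSubscriptionClosed) {
			return got, true, nil
		}
		if err != nil {
			return got, false, err
		}
		got = append(got, key)
	}
}

// demo runs consume against a fake subscription holding two keys.
func demo() (got []string, resubscribe, unsubscribed bool, err error) {
	sub := &fakeSub{keys: []string{"web", "db"}}
	got, resubscribe, err = consume(context.Background(), sub)
	return got, resubscribe, sub.unsubscribed, err
}

func main() {
	got, resubscribe, unsubscribed, err := demo()
	fmt.Println(got, resubscribe, unsubscribed, err)
}
```

Using errors.Is rather than == keeps the check working if the error is ever wrapped on its way to the caller.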

Functions

This section is empty.

Types

type Event

type Event struct {
	Topic   Topic
	Key     string
	Index   uint64
	Payload interface{}
}

Event is a structure with identifiers and a payload. Events are Published to EventPublisher and returned to Subscribers.

func NewCloseSubscriptionEvent

func NewCloseSubscriptionEvent(tokenSecretIDs []string) Event

NewCloseSubscriptionEvent returns a special Event that is handled by the stream package, and is never sent to subscribers. EventProcessor handles these events, and closes any subscriptions which were created using a token which matches any of the tokenSecretIDs.

tokenSecretIDs may contain duplicate IDs.

func (Event) Filter

func (e Event) Filter(f func(Event) bool) (Event, bool)

Filter returns an Event filtered to only those Events where f returns true. If the second return value is false, every Event was removed by the filter.

func (Event) IsEndOfSnapshot

func (e Event) IsEndOfSnapshot() bool

IsEndOfSnapshot returns true if this is a framing event that indicates the snapshot has completed. Subsequent events from Subscription.Next will be streamed as they occur.

func (Event) IsNewSnapshotToFollow

func (e Event) IsNewSnapshotToFollow() bool

IsNewSnapshotToFollow returns true if this is a framing event that indicates that the client's view is stale and must be reset. Subsequent events from Subscription.Next will be a new snapshot, followed by an EndOfSnapshot event.
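
The two framing events can be read as a small state machine on the client side. The sketch below is one possible way to handle them, not the package's own code. The fakeEvent and view types and the replay helper are hypothetical stand-ins that only mimic the IsEndOfSnapshot and IsNewSnapshotToFollow checks.

```go
package main

import "fmt"

// fakeEvent is a hypothetical stand-in exposing the two framing checks
// documented on Event.
type fakeEvent struct {
	key           string
	endOfSnapshot bool
	newSnapshot   bool
}

func (e fakeEvent) IsEndOfSnapshot() bool       { return e.endOfSnapshot }
func (e fakeEvent) IsNewSnapshotToFollow() bool { return e.newSnapshot }

// view is a hypothetical client-side materialized view.
type view struct {
	keys []string
	live bool // true once the current snapshot has completed
}

// apply updates the view for one event returned by Next.
func (v *view) apply(e fakeEvent) {
	switch {
	case e.IsNewSnapshotToFollow():
		// The view is stale: drop everything and wait for the new snapshot.
		v.keys, v.live = nil, false
	case e.IsEndOfSnapshot():
		// Snapshot complete: later events are live updates.
		v.live = true
	default:
		v.keys = append(v.keys, e.key)
	}
}

// replay applies a sequence of events to an empty view.
func replay(events []fakeEvent) view {
	var v view
	for _, e := range events {
		v.apply(e)
	}
	return v
}

func main() {
	v := replay([]fakeEvent{
		{key: "stale"},
		{newSnapshot: true},
		{key: "web"},
		{key: "db"},
		{endOfSnapshot: true},
		{key: "cache"},
	})
	fmt.Println(v.keys, v.live)
}
```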

func (Event) Len

func (e Event) Len() int

Len returns the number of events contained within this event. If the Payload is a []Event, the length of that slice is returned. Otherwise 1 is returned.
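
The documented behaviour of Len and Filter on batched events can be sketched as follows. This is an illustration written from the descriptions above, not the package's source. The Event type is redeclared locally with Topic simplified to fmt.Stringer, and the handling of a non-batch Payload in Filter is an assumption.

```go
package main

import "fmt"

// Event mirrors the documented fields; it is a local copy for illustration.
type Event struct {
	Topic   fmt.Stringer
	Key     string
	Index   uint64
	Payload interface{}
}

// Len follows the documented rule: the slice length for a []Event payload,
// otherwise 1.
func (e Event) Len() int {
	if batch, ok := e.Payload.([]Event); ok {
		return len(batch)
	}
	return 1
}

// Filter keeps only the events for which f returns true. The second return
// value is false when nothing survives the filter.
func (e Event) Filter(f func(Event) bool) (Event, bool) {
	batch, ok := e.Payload.([]Event)
	if !ok {
		// Assumption: a single event is kept or dropped as a whole.
		return e, f(e)
	}
	kept := make([]Event, 0, len(batch))
	for _, item := range batch {
		if f(item) {
			kept = append(kept, item)
		}
	}
	if len(kept) == 0 {
		return Event{}, false
	}
	e.Payload = kept // e is a copy, so the original batch is untouched
	return e, true
}

func main() {
	batch := Event{Index: 7, Payload: []Event{
		{Key: "web", Index: 7},
		{Key: "db", Index: 7},
	}}
	onlyWeb := func(ev Event) bool { return ev.Key == "web" }
	filtered, ok := batch.Filter(onlyWeb)
	fmt.Println(batch.Len(), filtered.Len(), ok)
}
```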

type EventPublisher

type EventPublisher struct {
	// contains filtered or unexported fields
}

EventPublisher receives change events from Publish, and sends the events to all subscribers of the event Topic.

func NewEventPublisher

func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher

NewEventPublisher returns an EventPublisher for publishing change events. Handlers are used to convert the memDB changes into events. Run must be called in a goroutine to publish events to all subscribers. Cancelling the context passed to Run shuts down that goroutine, frees its resources, and stops all publishing.

func (*EventPublisher) Publish

func (e *EventPublisher) Publish(events []Event)

Publish events to all subscribers of the event Topic.

func (*EventPublisher) Run

func (e *EventPublisher) Run(ctx context.Context)

Run the event publisher until ctx is cancelled. Run should be called from a goroutine to forward events from Publish to all the appropriate subscribers.
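
The lifecycle described here (start Run in a goroutine, publish, cancel the context to stop) can be illustrated with a much-simplified stand-in. The miniPublisher type below is hypothetical and uses plain strings and channels. It shows the pattern only and says nothing about how EventPublisher is implemented internally.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// miniPublisher is a hypothetical, simplified stand-in for EventPublisher.
type miniPublisher struct {
	publishCh chan []string
	mu        sync.Mutex
	subs      []chan string
}

func newMiniPublisher() *miniPublisher {
	return &miniPublisher{publishCh: make(chan []string, 64)}
}

// Publish queues a batch of events; Run forwards them to subscribers.
func (p *miniPublisher) Publish(events []string) {
	if len(events) > 0 {
		p.publishCh <- events
	}
}

// Subscribe registers a buffered channel that receives forwarded events.
func (p *miniPublisher) Subscribe() <-chan string {
	ch := make(chan string, 64)
	p.mu.Lock()
	p.subs = append(p.subs, ch)
	p.mu.Unlock()
	return ch
}

// Run forwards published batches until ctx is cancelled.
func (p *miniPublisher) Run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case events := <-p.publishCh:
			p.mu.Lock()
			for _, ch := range p.subs {
				for _, ev := range events {
					ch <- ev
				}
			}
			p.mu.Unlock()
		}
	}
}

// demo starts Run in a goroutine, publishes one batch, then shuts down.
func demo() []string {
	ctx, cancel := context.WithCancel(context.Background())
	p := newMiniPublisher()
	done := make(chan struct{})
	go func() {
		defer close(done)
		p.Run(ctx)
	}()
	sub := p.Subscribe()
	p.Publish([]string{"a", "b"})
	got := []string{<-sub, <-sub}
	cancel() // stops the forwarding goroutine
	<-done   // wait for Run to return
	return got
}

func main() {
	fmt.Println(demo())
}
```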

func (*EventPublisher) Subscribe

func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)

Subscribe returns a new Subscription for the given request. A subscription will receive an initial snapshot of events matching the request if req.Index > 0. After the snapshot, events will be streamed as they are created. Subscriptions may be closed, forcing the client to resubscribe (for example if ACL policies changed or the state store is abandoned).

When the caller is finished with the subscription for any reason, it must call Subscription.Unsubscribe to free ACL tracking resources.

type SnapshotAppender

type SnapshotAppender interface {
	// Append events to the snapshot. Every event in the slice must have the same
	// Index, indicating that it is part of the same raft transaction.
	Append(events []Event)
}

SnapshotAppender appends groups of events to create a Snapshot of state.

type SnapshotFunc

type SnapshotFunc func(SubscribeRequest, SnapshotAppender) (index uint64, err error)

SnapshotFunc builds a snapshot for the subscription request, and appends the events to the Snapshot using SnapshotAppender. If err is not nil the SnapshotFunc must return a non-zero index.

type SnapshotHandlers

type SnapshotHandlers map[Topic]SnapshotFunc

SnapshotHandlers is a mapping of Topic to a function which produces a snapshot of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender. The nil Topic is reserved and should not be used.
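
A minimal sketch of how a SnapshotFunc might be written and registered in a SnapshotHandlers map. The types are redeclared locally from the signatures above so the example compiles on its own. The stringTopic type, the topicService value, the sliceAppender type, the index 42, and the payload string are all hypothetical.

```go
package main

import "fmt"

// Local copies of the documented types, for illustration only.
type Topic fmt.Stringer

type Event struct {
	Topic   Topic
	Key     string
	Index   uint64
	Payload interface{}
}

type SubscribeRequest struct {
	Topic Topic
	Key   string
	Token string
	Index uint64
}

type SnapshotAppender interface {
	Append(events []Event)
}

type SnapshotFunc func(SubscribeRequest, SnapshotAppender) (index uint64, err error)

type SnapshotHandlers map[Topic]SnapshotFunc

// stringTopic is a hypothetical Topic implementation.
type stringTopic string

func (t stringTopic) String() string { return string(t) }

const topicService stringTopic = "service"

// sliceAppender is a hypothetical SnapshotAppender that collects events.
type sliceAppender struct{ events []Event }

func (a *sliceAppender) Append(events []Event) { a.events = append(a.events, events...) }

// serviceSnapshot appends one group of events sharing the same Index and
// returns that index.
func serviceSnapshot(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
	const index = 42 // hypothetical state store index
	buf.Append([]Event{
		{Topic: req.Topic, Key: req.Key, Index: index, Payload: "web is healthy"},
	})
	return index, nil
}

// demo looks up the handler for the request's Topic and builds a snapshot.
func demo() (uint64, int, error) {
	handlers := SnapshotHandlers{topicService: serviceSnapshot}
	req := SubscribeRequest{Topic: topicService, Key: "web", Token: "hypothetical-token"}
	snap, ok := handlers[req.Topic]
	if !ok {
		return 0, 0, fmt.Errorf("no snapshot handler for topic %v", req.Topic)
	}
	buf := &sliceAppender{}
	index, err := snap(req, buf)
	return index, len(buf.events), err
}

func main() {
	fmt.Println(demo())
}
```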

type SubscribeRequest

type SubscribeRequest struct {
	Topic Topic
	Key   string
	Token string
	Index uint64
}

SubscribeRequest identifies the types of events the subscriber would like to receive. Topic and Token are required.

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription provides events on a Topic. Events may be filtered by Key. Events are returned by Next(), and may start with a Snapshot of events.

func (*Subscription) Next

func (s *Subscription) Next(ctx context.Context) (Event, error)

Next returns the next Event to deliver. It must only be called from a single goroutine at a time, as it mutates the Subscription.

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe()

Unsubscribe the subscription, freeing resources.

type Topic

type Topic fmt.Stringer

Topic is an identifier that partitions events. A subscription will only receive events which match the Topic.
