stream

package
v1.16.105

This package is not in the latest version of its module.

Published: Aug 14, 2023 License: MPL-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Package stream provides a publish/subscribe system for events produced by changes to the state store.

Index

Constants

This section is empty.

Variables

var ErrShuttingDown = errors.New("subscription closed by server, server is shutting down")

ErrShuttingDown is an error to signal that the subscription has been closed because the server is shutting down. The client should subscribe to a different server to get streaming event updates.

var ErrSubForceClosed = errors.New("subscription closed by server, client must reset state and resubscribe")

ErrSubForceClosed is an error signalling that the subscription has been closed. The client should Unsubscribe, then re-Subscribe.
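
The two sentinel errors call for different reactions from a client. The following is a minimal sketch of how a caller might branch on them with errors.Is. The error variables are local stand-ins that copy the messages above so the example is self-contained, and nextStep is a hypothetical helper, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented sentinel errors. Real code would
// reference stream.ErrShuttingDown and stream.ErrSubForceClosed instead.
var (
	ErrShuttingDown   = errors.New("subscription closed by server, server is shutting down")
	ErrSubForceClosed = errors.New("subscription closed by server, client must reset state and resubscribe")
)

// nextStep is a hypothetical helper that maps an error seen while reading a
// subscription to the reaction the documentation recommends.
func nextStep(err error) string {
	switch {
	case err == nil:
		return "continue"
	case errors.Is(err, ErrShuttingDown):
		return "subscribe to a different server"
	case errors.Is(err, ErrSubForceClosed):
		return "unsubscribe, reset state, resubscribe"
	default:
		return "report error"
	}
}

func main() {
	// errors.Is also matches sentinel errors that have been wrapped.
	wrapped := fmt.Errorf("reading events: %w", ErrSubForceClosed)
	fmt.Println(nextStep(wrapped))
	fmt.Println(nextStep(ErrShuttingDown))
}
```

Comparing with errors.Is rather than == keeps the check working if an intermediate layer wraps the error.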

Functions

This section is empty.

Types

type Event

type Event struct {
	Topic   Topic
	Index   uint64
	Payload Payload
}

Event is a structure with identifiers and a payload. Events are Published to EventPublisher and returned to Subscribers.

func NewCloseSubscriptionEvent

func NewCloseSubscriptionEvent(tokenSecretIDs []string) Event

NewCloseSubscriptionEvent returns a special Event that is handled by the stream package and is never sent to subscribers. EventPublisher handles these events and closes any subscriptions that were created using a token matching any of the tokenSecretIDs.

tokenSecretIDs may contain duplicate IDs.

func (Event) IsEndOfSnapshot

func (e Event) IsEndOfSnapshot() bool

IsEndOfSnapshot returns true if this is a framing event that indicates the snapshot has completed. Subsequent events from Subscription.Next will be streamed as they occur.

func (Event) IsFramingEvent

func (e Event) IsFramingEvent() bool

IsFramingEvent returns true if this is a framing event (e.g. EndOfSnapshot or NewSnapshotToFollow).

func (Event) IsNewSnapshotToFollow

func (e Event) IsNewSnapshotToFollow() bool

IsNewSnapshotToFollow returns true if this is a framing event that indicates that the client's view is stale and must be reset. Subsequent events from Subscription.Next will be a new snapshot, followed by an EndOfSnapshot event.
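
The framing events described above imply a small state machine on the client side: accumulate snapshot events, mark the view usable at EndOfSnapshot, and discard everything at NewSnapshotToFollow. The sketch below illustrates that idea only. The frame, event, and view types are hypothetical stand-ins; with the real package the frame would be derived from Event.IsEndOfSnapshot and Event.IsNewSnapshotToFollow.

```go
package main

import "fmt"

// frame is a hypothetical stand-in for the result of the Event.Is* checks.
type frame int

const (
	dataEvent frame = iota
	endOfSnapshot
	newSnapshotToFollow
)

// event is a simplified stand-in for stream.Event.
type event struct {
	Frame frame
	Key   string
	Value string
}

// view is a hypothetical client-side materialized view.
type view struct {
	items map[string]string
	ready bool // true once a complete snapshot has been received
}

func newView() *view { return &view{items: map[string]string{}} }

// apply updates the view for a single event.
func (v *view) apply(e event) {
	switch e.Frame {
	case newSnapshotToFollow:
		// The view is stale: drop it and wait for the new snapshot.
		v.items = map[string]string{}
		v.ready = false
	case endOfSnapshot:
		// Snapshot complete; later events are live updates.
		v.ready = true
	default:
		v.items[e.Key] = e.Value
	}
}

func main() {
	v := newView()
	for _, e := range []event{
		{Frame: dataEvent, Key: "web", Value: "passing"},
		{Frame: endOfSnapshot},
		{Frame: newSnapshotToFollow},
		{Frame: dataEvent, Key: "api", Value: "passing"},
		{Frame: endOfSnapshot},
	} {
		v.apply(e)
	}
	fmt.Println(v.ready, v.items)
}
```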

type EventPublisher

type EventPublisher struct {
	// contains filtered or unexported fields
}

EventPublisher receives change events from Publish, and sends the events to all subscribers of the event Topic.

func NewEventPublisher

func NewEventPublisher(snapCacheTTL time.Duration) *EventPublisher

NewEventPublisher returns an EventPublisher for publishing change events. Handlers are used to convert the memDB changes into events. Events are delivered to all subscribers by a background goroutine running Run. Cancelling the context passed to Run shuts down that goroutine, frees its resources, and stops all publishing.

func (*EventPublisher) Publish

func (e *EventPublisher) Publish(events []Event)

Publish events to all subscribers of the event Topic. The events will be shared with all subscriptions, so the Payload used in Event.Payload must be immutable.

func (*EventPublisher) RefreshTopic

func (e *EventPublisher) RefreshTopic(topic Topic) error

func (*EventPublisher) RegisterHandler

func (e *EventPublisher) RegisterHandler(topic Topic, handler SnapshotFunc, supportsWildcard bool) error

RegisterHandler registers a new snapshot handler function. All handlers are expected to be registered before the event publisher is Run. Handler registration is therefore not concurrency safe, and access to handlers is not synchronized internally. Setting supportsWildcard to true allows consumers to subscribe to events on this topic with *any* subject (by requesting SubjectWildcard), but the handler function must support this.

func (*EventPublisher) Run

func (e *EventPublisher) Run(ctx context.Context)

Run the event publisher until ctx is cancelled. Run should be called from a goroutine to forward events from Publish to all the appropriate subscribers.

func (*EventPublisher) Subscribe

func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)

Subscribe returns a new Subscription for the given request. If req.Index is 0, the subscription receives an initial snapshot of events matching the request. If req.Index is non-zero, the subscription resumes from that index or, if the index is out of date, receives a NewSnapshotToFollow event followed by a new snapshot. After the snapshot, events will be streamed as they are created. Subscriptions may be closed, forcing the client to resubscribe (for example if ACL policies changed or the state store is abandoned).

When the caller is finished with the subscription for any reason, it must call Subscription.Unsubscribe to free ACL tracking resources.
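
Because Unsubscribe must run however the caller stops reading, a deferred call is the simplest way to guarantee it. The sketch below shows that pattern against stand-in types. The subscription interface, fakeSub, consume, and run are all hypothetical, and Next is simplified to return a string rather than an Event.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// subscription is a hypothetical stand-in for *stream.Subscription.
type subscription interface {
	Next(ctx context.Context) (string, error)
	Unsubscribe()
}

var errClosed = errors.New("subscription closed")

// fakeSub yields its queued events, then fails with errClosed.
type fakeSub struct {
	queue        []string
	unsubscribed bool
}

func (f *fakeSub) Next(ctx context.Context) (string, error) {
	if err := ctx.Err(); err != nil {
		return "", err
	}
	if len(f.queue) == 0 {
		return "", errClosed
	}
	e := f.queue[0]
	f.queue = f.queue[1:]
	return e, nil
}

func (f *fakeSub) Unsubscribe() { f.unsubscribed = true }

// consume reads until Next fails. The deferred Unsubscribe runs on every
// return path, including errors and context cancellation.
func consume(ctx context.Context, sub subscription, handle func(string)) error {
	defer sub.Unsubscribe()
	for {
		e, err := sub.Next(ctx)
		if err != nil {
			return err
		}
		handle(e)
	}
}

// run drives consume over a fake subscription and reports how many events
// were handled and whether Unsubscribe was called.
func run(events []string) (int, bool) {
	sub := &fakeSub{queue: events}
	n := 0
	_ = consume(context.Background(), sub, func(string) { n++ })
	return n, sub.unsubscribed
}

func main() {
	n, released := run([]string{"a", "b", "c"})
	fmt.Println(n, released)
}
```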

type NoOpEventPublisher

type NoOpEventPublisher struct{}

func (NoOpEventPublisher) Publish

func (NoOpEventPublisher) Publish([]Event)

func (NoOpEventPublisher) RegisterHandler

func (NoOpEventPublisher) RegisterHandler(Topic, SnapshotFunc, bool) error

func (NoOpEventPublisher) Run

func (NoOpEventPublisher) Subscribe

type Payload

type Payload interface {
	// HasReadPermission uses the acl.Authorizer to determine if the items in the
	// Payload are visible to the request. It returns true if the payload is
	// authorized for Read, otherwise returns false.
	HasReadPermission(authz acl.Authorizer) bool

	// Subject is used to identify which subscribers should be notified of this
	// event - e.g. those subscribing to health events for a particular service.
	// It is usually the normalized resource name (including the partition and
	// namespace if applicable).
	Subject() Subject

	// ToSubscriptionEvent is used to convert streaming events to their
	// serializable equivalent.
	ToSubscriptionEvent(idx uint64) *pbsubscribe.Event
}

A Payload contains the topic-specific data in an event. The payload methods should not modify the state of the payload if the Event is being submitted to EventPublisher.Publish.

type PayloadEvents

type PayloadEvents struct {
	Items []Event
}

PayloadEvents is a Payload that may be returned by Subscription.Next when there are multiple events at an index.

Note that unlike most other Payload implementations, PayloadEvents is mutable and it is NOT safe to send to EventPublisher.Publish.

func (*PayloadEvents) HasReadPermission

func (p *PayloadEvents) HasReadPermission(authz acl.Authorizer) bool

HasReadPermission filters the PayloadEvents to those which are authorized for reading by authz.
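
Filtering a batch in place is what makes such a payload mutable and therefore unsafe to share between subscriptions. The sketch below shows the general technique with stand-in types. The authorizer, item, and batch types are hypothetical, and returning whether any items remain is an assumption, since the return semantics are not spelled out above.

```go
package main

import "fmt"

// authorizer is a hypothetical stand-in for acl.Authorizer.
type authorizer func(name string) bool

// item is a simplified stand-in for an Event inside a batch.
type item struct{ Name string }

// batch is a simplified stand-in for PayloadEvents.
type batch struct{ Items []item }

// filter keeps only the items that can is willing to show, modifying the
// batch in place, and reports whether any items remain (an assumption).
func (b *batch) filter(can authorizer) bool {
	kept := b.Items[:0] // reuse the backing array: this mutates the batch
	for _, it := range b.Items {
		if can(it.Name) {
			kept = append(kept, it)
		}
	}
	b.Items = kept
	return len(b.Items) > 0
}

// visibleNames is a small driver that filters a batch and returns the names
// that survive.
func visibleNames(names []string, can authorizer) []string {
	b := &batch{}
	for _, n := range names {
		b.Items = append(b.Items, item{Name: n})
	}
	b.filter(can)
	out := make([]string, 0, len(b.Items))
	for _, it := range b.Items {
		out = append(out, it.Name)
	}
	return out
}

func main() {
	onlyWeb := func(name string) bool { return name == "web" }
	fmt.Println(visibleNames([]string{"web", "db", "web"}, onlyWeb))
}
```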

func (*PayloadEvents) Len

func (p *PayloadEvents) Len() int

func (PayloadEvents) Subject

func (PayloadEvents) Subject() Subject

Subject is required to satisfy the Payload interface but is not implemented by PayloadEvents. PayloadEvents structs are constructed by Subscription.Next *after* Subject has been used to dispatch the enclosed events to the correct buffer.

func (PayloadEvents) ToSubscriptionEvent

func (p PayloadEvents) ToSubscriptionEvent(idx uint64) *pbsubscribe.Event

type SnapshotAppender

type SnapshotAppender interface {
	// Append events to the snapshot. Every event in the slice must have the same
	// Index, indicating that it is part of the same raft transaction.
	Append(events []Event)
}

SnapshotAppender appends groups of events to create a Snapshot of state.

type SnapshotFunc

type SnapshotFunc func(SubscribeRequest, SnapshotAppender) (index uint64, err error)

SnapshotFunc builds a snapshot for the subscription request, and appends the events to the Snapshot using SnapshotAppender.

Note: index MUST NOT be zero if any events were appended.
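
The two constraints above (every event in one Append shares an Index, and the returned index is non-zero whenever events were appended) can be sketched as follows. The event, appender, and sliceAppender types are local stand-ins, and snapshotNames is a hypothetical function whose simplified signature takes a store index and a list of names instead of a SubscribeRequest.

```go
package main

import (
	"errors"
	"fmt"
)

// event is a simplified stand-in for stream.Event.
type event struct {
	Index uint64
	Name  string
}

// appender mirrors the shape of SnapshotAppender for the stand-in event type.
type appender interface {
	Append(events []event)
}

// sliceAppender records every appended group of events.
type sliceAppender struct{ groups [][]event }

func (a *sliceAppender) Append(events []event) { a.groups = append(a.groups, events) }

// snapshotNames is a hypothetical snapshot function. It appends one group of
// events, all carrying storeIndex, and returns that index.
func snapshotNames(storeIndex uint64, names []string, buf appender) (uint64, error) {
	if len(names) == 0 {
		return storeIndex, nil // nothing appended, so any index is acceptable
	}
	if storeIndex == 0 {
		return 0, errors.New("index must not be zero when events are appended")
	}
	events := make([]event, 0, len(names))
	for _, n := range names {
		events = append(events, event{Index: storeIndex, Name: n})
	}
	buf.Append(events)
	return storeIndex, nil
}

// appended runs snapshotNames and reports the number of events appended,
// the returned index, and any error.
func appended(storeIndex uint64, names []string) (int, uint64, error) {
	buf := &sliceAppender{}
	idx, err := snapshotNames(storeIndex, names, buf)
	n := 0
	for _, g := range buf.groups {
		n += len(g)
	}
	return n, idx, err
}

func main() {
	n, idx, err := appended(42, []string{"web", "api"})
	fmt.Println(n, idx, err)
}
```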

type SnapshotHandlers

type SnapshotHandlers map[Topic]SnapshotFunc

SnapshotHandlers is a mapping of Topic to a function which produces a snapshot of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender. The nil Topic is reserved and should not be used.

type StringSubject

type StringSubject string

StringSubject can be used as a Subject for Events sent to the EventPublisher.

const (
	// SubjectNone is used when all events on a given topic are "global" and not
	// further partitioned by subject. For example: the "CA Roots" topic which is
	// used to notify subscribers when the global set of CA root certificates changes.
	SubjectNone StringSubject = "none"

	// SubjectWildcard is used to subscribe to all events on a topic, regardless
	// of their subject. For example: mesh gateways need to consume *all* service
	// resolver config entries.
	//
	// Note: not all topics support wildcard subscriptions.
	SubjectWildcard StringSubject = "♣"
)
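
To illustrate how the two constants are meant to be read, the sketch below defines a hypothetical predicate, wantsEvent, that decides whether a requested subject covers an event's subject. It is an illustration of the semantics described in the comments above, not a description of how EventPublisher dispatches events internally. The type and constants are redeclared locally so the example is self-contained.

```go
package main

import "fmt"

// Local copies of the documented type and constants. Real code would use
// stream.StringSubject, stream.SubjectNone, and stream.SubjectWildcard.
type StringSubject string

const (
	SubjectNone     StringSubject = "none"
	SubjectWildcard StringSubject = "♣"
)

func (s StringSubject) String() string { return string(s) }

// wantsEvent is a hypothetical predicate. A wildcard request covers every
// subject, but only on topics that support wildcards. Any other request
// covers exactly the subject it names.
func wantsEvent(requested, eventSubject fmt.Stringer, topicSupportsWildcard bool) bool {
	if requested.String() == SubjectWildcard.String() {
		return topicSupportsWildcard
	}
	return requested.String() == eventSubject.String()
}

func main() {
	fmt.Println(wantsEvent(SubjectWildcard, StringSubject("web"), true))
	fmt.Println(wantsEvent(StringSubject("web"), StringSubject("api"), true))
	fmt.Println(wantsEvent(SubjectNone, SubjectNone, false))
}
```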

func (StringSubject) String

func (s StringSubject) String() string

type StringTopic

type StringTopic string

StringTopic can be used as a Topic for Events sent to the EventPublisher.

func (StringTopic) String

func (s StringTopic) String() string

type Subject

type Subject fmt.Stringer

Subject identifies a portion of a topic for which a subscriber wishes to receive events (e.g. health events for a particular service). It is usually the normalized resource name (including partition and namespace if applicable).

type SubscribeRequest

type SubscribeRequest struct {
	// Topic to subscribe to (e.g. service health).
	Topic Topic

	// Subject identifies the subset of Topic events the subscriber wishes to
	// receive (e.g. events for a specific service). SubjectNone may be provided
	// if all events on the given topic are "global" and not further partitioned
	// by subject.
	Subject Subject

	// Token that was used to authenticate the request. If any ACL policy
	// changes impact the token the subscription will be forcefully closed.
	Token string

	// Index is the last index the client received. If non-zero the
	// subscription will be resumed from this index. If the index is out-of-date
	// a NewSnapshotToFollow event will be sent.
	Index uint64
}

SubscribeRequest identifies the types of events the subscriber would like to receive. Topic, Subject, and Token are required.

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription provides events on a Topic. Events may be filtered by Subject. Events are returned by Next, and may start with a snapshot of events.

func (*Subscription) Next

func (s *Subscription) Next(ctx context.Context) (Event, error)

Next returns the next Event to deliver. It must only be called from one goroutine at a time, because it mutates the Subscription.
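
When several goroutines need to process events, one way to respect this constraint is to let a single goroutine own the calls to Next and hand results to workers over a channel. The sketch below shows that pattern with stand-in types. The nexter interface, sliceSub, pump, sumWithWorkers, and demoSum are all hypothetical, and Next is simplified to return an int rather than an Event.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// nexter is a hypothetical stand-in for the Next method of a subscription.
type nexter interface {
	Next(ctx context.Context) (int, error)
}

var errDone = errors.New("no more events")

// sliceSub yields a fixed list of values and then fails with errDone.
type sliceSub struct{ vals []int }

func (s *sliceSub) Next(ctx context.Context) (int, error) {
	if err := ctx.Err(); err != nil {
		return 0, err
	}
	if len(s.vals) == 0 {
		return 0, errDone
	}
	v := s.vals[0]
	s.vals = s.vals[1:] // unsynchronized mutation: concurrent callers would race
	return v, nil
}

// pump calls Next from exactly one goroutine and forwards each value on the
// returned channel, which is closed when Next fails or ctx is cancelled.
func pump(ctx context.Context, sub nexter) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			v, err := sub.Next(ctx)
			if err != nil {
				return
			}
			select {
			case out <- v:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// sumWithWorkers processes the pumped values with several workers.
func sumWithWorkers(ctx context.Context, sub nexter, workers int) int {
	events := pump(ctx, sub)
	var (
		mu    sync.Mutex
		total int
		wg    sync.WaitGroup
	)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range events {
				mu.Lock()
				total += v
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return total
}

// demoSum runs the pattern over the values 1 through 4 with three workers.
func demoSum() int {
	return sumWithWorkers(context.Background(), &sliceSub{vals: []int{1, 2, 3, 4}}, 3)
}

func main() {
	fmt.Println(demoSum())
}
```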

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe()

Unsubscribe the subscription, freeing resources.

type Topic

type Topic fmt.Stringer

Topic is an identifier that partitions events. A subscription will only receive events which match the Topic.
