stream

package
v1.11.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2022 License: MPL-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package stream provides a publish/subscribe system for events produced by changes to the state store.

Index

Constants

This section is empty.

Variables

View Source
var ErrSubForceClosed = errors.New("subscription closed by server, client must reset state and resubscribe")

ErrSubForceClosed is a error signalling the subscription has been closed. The client should Unsubscribe, then re-Subscribe.

Functions

This section is empty.

Types

type Event

type Event struct {
	Topic   Topic
	Index   uint64
	Payload Payload
}

Event is a structure with identifiers and a payload. Events are Published to EventPublisher and returned to Subscribers.

func NewCloseSubscriptionEvent

func NewCloseSubscriptionEvent(tokenSecretIDs []string) Event

NewCloseSubscriptionEvent returns a special Event that is handled by the stream package, and is never sent to subscribers. EventProcessor handles these events, and closes any subscriptions which were created using a token which matches any of the tokenSecretIDs.

tokenSecretIDs may contain duplicate IDs.

func (Event) IsEndOfSnapshot

func (e Event) IsEndOfSnapshot() bool

IsEndOfSnapshot returns true if this is a framing event that indicates the snapshot has completed. Subsequent events from Subscription.Next will be streamed as they occur.

func (Event) IsNewSnapshotToFollow

func (e Event) IsNewSnapshotToFollow() bool

IsNewSnapshotToFollow returns true if this is a framing event that indicates that the clients view is stale, and must be reset. Subsequent events from Subscription.Next will be a new snapshot, followed by an EndOfSnapshot event.

type EventPublisher

type EventPublisher struct {
	// contains filtered or unexported fields
}

EventPublisher receives change events from Publish, and sends the events to all subscribers of the event Topic.

func NewEventPublisher

func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher

NewEventPublisher returns an EventPublisher for publishing change events. Handlers are used to convert the memDB changes into events. A goroutine is run in the background to publish events to all subscribes. Cancelling the context will shutdown the goroutine, to free resources, and stop all publishing.

func (*EventPublisher) Publish

func (e *EventPublisher) Publish(events []Event)

Publish events to all subscribers of the event Topic. The events will be shared with all subscriptions, so the Payload used in Event.Payload must be immutable.

func (*EventPublisher) Run

func (e *EventPublisher) Run(ctx context.Context)

Run the event publisher until ctx is cancelled. Run should be called from a goroutine to forward events from Publish to all the appropriate subscribers.

func (*EventPublisher) Subscribe

func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)

Subscribe returns a new Subscription for the given request. A subscription will receive an initial snapshot of events matching the request if req.Index > 0. After the snapshot, events will be streamed as they are created. Subscriptions may be closed, forcing the client to resubscribe (for example if ACL policies changed or the state store is abandoned).

When the caller is finished with the subscription for any reason, it must call Subscription.Unsubscribe to free ACL tracking resources.

type NoOpEventPublisher

type NoOpEventPublisher struct{}

func (NoOpEventPublisher) Publish

func (NoOpEventPublisher) Publish([]Event)

func (NoOpEventPublisher) Run

func (NoOpEventPublisher) Subscribe

type Payload

type Payload interface {
	// HasReadPermission uses the acl.Authorizer to determine if the items in the
	// Payload are visible to the request. It returns true if the payload is
	// authorized for Read, otherwise returns false.
	HasReadPermission(authz acl.Authorizer) bool

	// Subject is used to identify which subscribers should be notified of this
	// event - e.g. those subscribing to health events for a particular service.
	// it is usually the normalized resource name (including the partition and
	// namespace if applicable).
	Subject() Subject
}

A Payload contains the topic-specific data in an event. The payload methods should not modify the state of the payload if the Event is being submitted to EventPublisher.Publish.

type PayloadEvents

type PayloadEvents struct {
	Items []Event
}

PayloadEvents is a Payload that may be returned by Subscription.Next when there are multiple events at an index.

Note that unlike most other Payload, PayloadEvents is mutable and it is NOT safe to send to EventPublisher.Publish.

func (*PayloadEvents) HasReadPermission added in v1.9.1

func (p *PayloadEvents) HasReadPermission(authz acl.Authorizer) bool

HasReadPermission filters the PayloadEvents to those which are authorized for reading by authz.

func (*PayloadEvents) Len added in v1.9.1

func (p *PayloadEvents) Len() int

func (PayloadEvents) Subject added in v1.10.8

func (PayloadEvents) Subject() Subject

Subject is required to satisfy the Payload interface but is not implemented by PayloadEvents. PayloadEvents structs are constructed by Subscription.Next *after* Subject has been used to dispatch the enclosed events to the correct buffer.

type SnapshotAppender

type SnapshotAppender interface {
	// Append events to the snapshot. Every event in the slice must have the same
	// Index, indicating that it is part of the same raft transaction.
	Append(events []Event)
}

SnapshotAppender appends groups of events to create a Snapshot of state.

type SnapshotFunc

type SnapshotFunc func(SubscribeRequest, SnapshotAppender) (index uint64, err error)

SnapshotFunc builds a snapshot for the subscription request, and appends the events to the Snapshot using SnapshotAppender. If err is not nil the SnapshotFunc must return a non-zero index.

type SnapshotHandlers

type SnapshotHandlers map[Topic]SnapshotFunc

SnapshotHandlers is a mapping of Topic to a function which produces a snapshot of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender. The nil Topic is reserved and should not be used.

type Subject added in v1.10.8

type Subject string

Subject identifies a portion of a topic for which a subscriber wishes to receive events (e.g. health events for a particular service) usually the normalized resource name (including partition and namespace if applicable).

type SubscribeRequest

type SubscribeRequest struct {
	// Topic to subscribe to
	Topic Topic
	// Key used to filter events in the topic. Only events matching the key will
	// be returned by the subscription. A blank key will return all events. Key
	// is generally the name of the resource.
	Key string
	// EnterpriseMeta is used to filter events in the topic. Only events matching
	// the partition and namespace will be returned by the subscription.
	EnterpriseMeta structs.EnterpriseMeta
	// Token that was used to authenticate the request. If any ACL policy
	// changes impact the token the subscription will be forcefully closed.
	Token string
	// Index is the last index the client received. If non-zero the
	// subscription will be resumed from this index. If the index is out-of-date
	// a NewSnapshotToFollow event will be sent.
	Index uint64
}

SubscribeRequest identifies the types of events the subscriber would like to receiver. Topic and Token are required.

func (SubscribeRequest) Subject added in v1.10.8

func (req SubscribeRequest) Subject() Subject

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription provides events on a Topic. Events may be filtered by Key. Events are returned by Next(), and may start with a Snapshot of events.

func (*Subscription) Next

func (s *Subscription) Next(ctx context.Context) (Event, error)

Next returns the next Event to deliver. It must only be called from a single goroutine concurrently as it mutates the Subscription.

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe()

Unsubscribe the subscription, freeing resources.

type Topic

type Topic fmt.Stringer

Topic is an identifier that partitions events. A subscription will only receive events which match the Topic.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL