Documentation ¶
Overview ¶
Package stream provides a publish/subscribe system for events produced by changes to the state store.
Index ¶
- Variables
- type Event
- type EventPublisher
- func (e *EventPublisher) Publish(events []Event)
- func (e *EventPublisher) RefreshTopic(topic Topic) error
- func (e *EventPublisher) RegisterHandler(topic Topic, handler SnapshotFunc) error
- func (e *EventPublisher) Run(ctx context.Context)
- func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
- type NoOpEventPublisher
- type Payload
- type PayloadEvents
- type SnapshotAppender
- type SnapshotFunc
- type SnapshotHandlers
- type StringSubject
- type StringTopic
- type Subject
- type SubscribeRequest
- type Subscription
- type Topic
Constants ¶
This section is empty.
Variables ¶
var ErrShuttingDown = errors.New("subscription closed by server, server is shutting down")
ErrShuttingDown is an error to signal that the subscription has been closed because the server is shutting down. The client should subscribe to a different server to get streaming event updates.
var ErrSubForceClosed = errors.New("subscription closed by server, client must reset state and resubscribe")
ErrSubForceClosed is a error signalling the subscription has been closed. The client should Unsubscribe, then re-Subscribe.
Functions ¶
This section is empty.
Types ¶
type Event ¶
Event is a structure with identifiers and a payload. Events are Published to EventPublisher and returned to Subscribers.
func NewCloseSubscriptionEvent ¶
NewCloseSubscriptionEvent returns a special Event that is handled by the stream package, and is never sent to subscribers. EventProcessor handles these events, and closes any subscriptions which were created using a token which matches any of the tokenSecretIDs.
tokenSecretIDs may contain duplicate IDs.
func (Event) IsEndOfSnapshot ¶
IsEndOfSnapshot returns true if this is a framing event that indicates the snapshot has completed. Subsequent events from Subscription.Next will be streamed as they occur.
func (Event) IsFramingEvent ¶ added in v1.12.0
IsFramingEvent returns true if this is a framing event (e.g. EndOfSnapshot or NewSnapshotToFollow).
func (Event) IsNewSnapshotToFollow ¶
IsNewSnapshotToFollow returns true if this is a framing event that indicates that the clients view is stale, and must be reset. Subsequent events from Subscription.Next will be a new snapshot, followed by an EndOfSnapshot event.
type EventPublisher ¶
type EventPublisher struct {
// contains filtered or unexported fields
}
EventPublisher receives change events from Publish, and sends the events to all subscribers of the event Topic.
func NewEventPublisher ¶
func NewEventPublisher(snapCacheTTL time.Duration) *EventPublisher
NewEventPublisher returns an EventPublisher for publishing change events. Handlers are used to convert the memDB changes into events. A goroutine is run in the background to publish events to all subscribes. Cancelling the context will shutdown the goroutine, to free resources, and stop all publishing.
func (*EventPublisher) Publish ¶
func (e *EventPublisher) Publish(events []Event)
Publish events to all subscribers of the event Topic. The events will be shared with all subscriptions, so the Payload used in Event.Payload must be immutable.
func (*EventPublisher) RefreshTopic ¶ added in v1.12.0
func (e *EventPublisher) RefreshTopic(topic Topic) error
func (*EventPublisher) RegisterHandler ¶ added in v1.12.0
func (e *EventPublisher) RegisterHandler(topic Topic, handler SnapshotFunc) error
RegisterHandler will register a new snapshot handler function. The expectation is that all handlers get registered prior to the event publisher being Run. Handler registration is therefore not concurrency safe and access to handlers is internally not synchronized.
func (*EventPublisher) Run ¶
func (e *EventPublisher) Run(ctx context.Context)
Run the event publisher until ctx is cancelled. Run should be called from a goroutine to forward events from Publish to all the appropriate subscribers.
func (*EventPublisher) Subscribe ¶
func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
Subscribe returns a new Subscription for the given request. A subscription will receive an initial snapshot of events matching the request if req.Index > 0. After the snapshot, events will be streamed as they are created. Subscriptions may be closed, forcing the client to resubscribe (for example if ACL policies changed or the state store is abandoned).
When the caller is finished with the subscription for any reason, it must call Subscription.Unsubscribe to free ACL tracking resources.
type NoOpEventPublisher ¶
type NoOpEventPublisher struct{}
func (NoOpEventPublisher) Publish ¶
func (NoOpEventPublisher) Publish([]Event)
func (NoOpEventPublisher) RegisterHandler ¶ added in v1.12.0
func (NoOpEventPublisher) RegisterHandler(Topic, SnapshotFunc) error
func (NoOpEventPublisher) Run ¶
func (NoOpEventPublisher) Run(context.Context)
func (NoOpEventPublisher) Subscribe ¶
func (NoOpEventPublisher) Subscribe(*SubscribeRequest) (*Subscription, error)
type Payload ¶
type Payload interface { // HasReadPermission uses the acl.Authorizer to determine if the items in the // Payload are visible to the request. It returns true if the payload is // authorized for Read, otherwise returns false. HasReadPermission(authz acl.Authorizer) bool // Subject is used to identify which subscribers should be notified of this // event - e.g. those subscribing to health events for a particular service. // it is usually the normalized resource name (including the partition and // namespace if applicable). Subject() Subject // ToSubscriptionEvent is used to convert streaming events to their // serializable equivalent. ToSubscriptionEvent(idx uint64) *pbsubscribe.Event }
A Payload contains the topic-specific data in an event. The payload methods should not modify the state of the payload if the Event is being submitted to EventPublisher.Publish.
type PayloadEvents ¶
type PayloadEvents struct {
Items []Event
}
PayloadEvents is a Payload that may be returned by Subscription.Next when there are multiple events at an index.
Note that unlike most other Payload, PayloadEvents is mutable and it is NOT safe to send to EventPublisher.Publish.
func (*PayloadEvents) HasReadPermission ¶ added in v1.9.1
func (p *PayloadEvents) HasReadPermission(authz acl.Authorizer) bool
HasReadPermission filters the PayloadEvents to those which are authorized for reading by authz.
func (*PayloadEvents) Len ¶ added in v1.9.1
func (p *PayloadEvents) Len() int
func (PayloadEvents) Subject ¶ added in v1.10.8
func (PayloadEvents) Subject() Subject
Subject is required to satisfy the Payload interface but is not implemented by PayloadEvents. PayloadEvents structs are constructed by Subscription.Next *after* Subject has been used to dispatch the enclosed events to the correct buffer.
func (PayloadEvents) ToSubscriptionEvent ¶ added in v1.13.0
func (p PayloadEvents) ToSubscriptionEvent(idx uint64) *pbsubscribe.Event
type SnapshotAppender ¶
type SnapshotAppender interface { // Append events to the snapshot. Every event in the slice must have the same // Index, indicating that it is part of the same raft transaction. Append(events []Event) }
SnapshotAppender appends groups of events to create a Snapshot of state.
type SnapshotFunc ¶
type SnapshotFunc func(SubscribeRequest, SnapshotAppender) (index uint64, err error)
SnapshotFunc builds a snapshot for the subscription request, and appends the events to the Snapshot using SnapshotAppender. If err is not nil the SnapshotFunc must return a non-zero index.
type SnapshotHandlers ¶
type SnapshotHandlers map[Topic]SnapshotFunc
SnapshotHandlers is a mapping of Topic to a function which produces a snapshot of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender. The nil Topic is reserved and should not be used.
type StringSubject ¶ added in v1.13.0
type StringSubject string
StringSubject can be used as a Subject for Events sent to the EventPublisher
const SubjectNone StringSubject = "none"
SubjectNone is used when all events on a given topic are "global" and not further partitioned by subject. For example: the "CA Roots" topic which is used to notify subscribers when the global set CA root certificates changes.
func (StringSubject) String ¶ added in v1.13.0
func (s StringSubject) String() string
type StringTopic ¶ added in v1.13.0
type StringTopic string
StringTopic can be used as a Topic for Events sent to the EventPublisher
func (StringTopic) String ¶ added in v1.13.0
func (s StringTopic) String() string
type Subject ¶ added in v1.10.8
Subject identifies a portion of a topic for which a subscriber wishes to receive events (e.g. health events for a particular service) usually the normalized resource name (including partition and namespace if applicable).
type SubscribeRequest ¶
type SubscribeRequest struct { // Topic to subscribe to (e.g. service health). Topic Topic // Subject identifies the subset of Topic events the subscriber wishes to // receive (e.g. events for a specific service). SubjectNone may be provided // if all events on the given topic are "global" and not further partitioned // by subject. Subject Subject // Token that was used to authenticate the request. If any ACL policy // changes impact the token the subscription will be forcefully closed. Token string // Index is the last index the client received. If non-zero the // subscription will be resumed from this index. If the index is out-of-date // a NewSnapshotToFollow event will be sent. Index uint64 }
SubscribeRequest identifies the types of events the subscriber would like to receive. Topic, Subject, and Token are required.
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription provides events on a Topic. Events may be filtered by Key. Events are returned by Next(), and may start with a Snapshot of events.
func (*Subscription) Next ¶
func (s *Subscription) Next(ctx context.Context) (Event, error)
Next returns the next Event to deliver. It must only be called from a single goroutine concurrently as it mutates the Subscription.
func (*Subscription) Unsubscribe ¶
func (s *Subscription) Unsubscribe()
Unsubscribe the subscription, freeing resources.