Documentation ¶
Overview ¶
Package stream provides a publish/subscribe system for events produced by changes to the state store.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ErrSubscriptionClosed = errors.New("subscription closed by server, client must reset state and resubscribe")
ErrSubscriptionClosed is a error signalling the subscription has been closed. The client should Unsubscribe, then re-Subscribe.
Functions ¶
This section is empty.
Types ¶
type Event ¶
Event is a structure with identifiers and a payload. Events are Published to EventPublisher and returned to Subscribers.
func NewCloseSubscriptionEvent ¶
NewCloseSubscriptionEvent returns a special Event that is handled by the stream package, and is never sent to subscribers. EventProcessor handles these events, and closes any subscriptions which were created using a token which matches any of the tokenSecretIDs.
tokenSecretIDs may contain duplicate IDs.
func (Event) Filter ¶
Filter returns an Event filtered to only those Events where f returns true. If the second return value is false, every Event was removed by the filter.
func (Event) IsEndOfSnapshot ¶
IsEndOfSnapshot returns true if this is a framing event that indicates the snapshot has completed. Subsequent events from Subscription.Next will be streamed as they occur.
func (Event) IsNewSnapshotToFollow ¶
IsNewSnapshotToFollow returns true if this is a framing event that indicates that the clients view is stale, and must be reset. Subsequent events from Subscription.Next will be a new snapshot, followed by an EndOfSnapshot event.
type EventPublisher ¶
type EventPublisher struct {
// contains filtered or unexported fields
}
EventPublisher receives change events from Publish, and sends the events to all subscribers of the event Topic.
func NewEventPublisher ¶
func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher
NewEventPublisher returns an EventPublisher for publishing change events. Handlers are used to convert the memDB changes into events. A goroutine is run in the background to publish events to all subscribes. Cancelling the context will shutdown the goroutine, to free resources, and stop all publishing.
func (*EventPublisher) Publish ¶
func (e *EventPublisher) Publish(events []Event)
Publish events to all subscribers of the event Topic.
func (*EventPublisher) Run ¶
func (e *EventPublisher) Run(ctx context.Context)
Run the event publisher until ctx is cancelled. Run should be called from a goroutine to forward events from Publish to all the appropriate subscribers.
func (*EventPublisher) Subscribe ¶
func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
Subscribe returns a new Subscription for the given request. A subscription will receive an initial snapshot of events matching the request if req.Index > 0. After the snapshot, events will be streamed as they are created. Subscriptions may be closed, forcing the client to resubscribe (for example if ACL policies changed or the state store is abandoned).
When the caller is finished with the subscription for any reason, it must call Subscription.Unsubscribe to free ACL tracking resources.
type SnapshotAppender ¶
type SnapshotAppender interface { // Append events to the snapshot. Every event in the slice must have the same // Index, indicating that it is part of the same raft transaction. Append(events []Event) }
SnapshotAppender appends groups of events to create a Snapshot of state.
type SnapshotFunc ¶
type SnapshotFunc func(SubscribeRequest, SnapshotAppender) (index uint64, err error)
SnapshotFunc builds a snapshot for the subscription request, and appends the events to the Snapshot using SnapshotAppender. If err is not nil the SnapshotFunc must return a non-zero index.
type SnapshotHandlers ¶
type SnapshotHandlers map[Topic]SnapshotFunc
SnapshotHandlers is a mapping of Topic to a function which produces a snapshot of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender. The nil Topic is reserved and should not be used.
type SubscribeRequest ¶
SubscribeRequest identifies the types of events the subscriber would like to receiver. Topic and Token are required.
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription provides events on a Topic. Events may be filtered by Key. Events are returned by Next(), and may start with a Snapshot of events.
func (*Subscription) Next ¶
func (s *Subscription) Next(ctx context.Context) (Event, error)
Next returns the next Event to deliver. It must only be called from a single goroutine concurrently as it mutates the Subscription.
func (*Subscription) Unsubscribe ¶
func (s *Subscription) Unsubscribe()
Unsubscribe the subscription, freeing resources.