Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ProcessKnown ¶
ProcessKnown takes what the client reports as known keys, combined with which keys the server knows exist, and returns what the client should be told is deleted and which keys the client is not yet aware of, which should be hydrated by querying the database.
Parameters: - `known` is a sorted range-encoded string, directly from the REST API. - `exist` is a sorted list of ints, likely from the database or some in-memory cache.
Return Values: - `gone` is a range-encoded string, suitable for returning over the REST API. - `new` is a list of ints, which are PKs that will need hydrating from the database.
Types ¶
type DeleteMsg ¶
DeleteMsg represents a deletion in the stream.
func (DeleteMsg) MarshalJSON ¶
MarshalJSON returns a json marshaled DeleteMsg.
type Event ¶
type Event[T Msg] struct { Before T `json:"before"` After T `json:"after"` }
Event contains the old and new version a Msg. Inserts will have Before==nil, deletions will have After==nil.
type MarshallableMsg ¶
MarshallableMsg is an intermediary message that is ready to be marshaled and broadcast.
type Publisher ¶
type Publisher[T Msg] struct { Lock sync.Mutex Subscriptions set.Set[*Subscription[T]] WakeupID int64 // Hydrate an UpsertMsg. Hydrator func(T) (T, error) }
Publisher is responsible for publishing messages of type T to streamers associate with active subscriptions.
func NewPublisher ¶
NewPublisher creates a new Publisher for message type T.
func (*Publisher[T]) Broadcast ¶
Broadcast receives a list of events, determines if they are applicable to the publisher's subscriptions, and sends appropriate messages to corresponding streamers.
func (*Publisher[T]) CloseAllStreamers ¶
func (p *Publisher[T]) CloseAllStreamers()
CloseAllStreamers closes all streamers associated with this Publisher.
func (*Publisher[T]) HydrateMsg ¶
HydrateMsg queries the DB by the ID from rawMsg of a upsert or fallin event and get the full record.
type Streamer ¶
type Streamer struct { Cond *sync.Cond // Msgs are pre-marshaled messages to send to the streaming client. Msgs []interface{} // Closed is set externally, and noticed eventually. Closed bool // PrepareFn is a user defined function that prepares Msgs for broadcast PrepareFn func(message MarshallableMsg) interface{} }
Streamer aggregates many events and wakeups into a single slice of pre-marshaled messages. One streamer may be associated with many Subscription[T]'s, but it should only have at most one Subscription per type T. One Streamer is intended to belong to one websocket connection.
func NewStreamer ¶
func NewStreamer(prepareFn func(message MarshallableMsg) interface{}) *Streamer
NewStreamer creates a new Steamer.
type Subscription ¶
type Subscription[T Msg] struct { // Which streamer is collecting messages from this Subscription? Streamer *Streamer // Which publisher should we connect to when active? Publisher *Publisher[T] // contains filtered or unexported fields }
Subscription manages a streamer's subscription to messages of type T.
func NewSubscription ¶
func NewSubscription[T Msg]( streamer *Streamer, publisher *Publisher[T], permFilter func(T) bool, filterFn func(T) bool, ) Subscription[T]
NewSubscription creates a new Subscription to messages of type T.
func (*Subscription[T]) Register ¶
func (s *Subscription[T]) Register()
Register a Subscription with its Publisher.
func (*Subscription[T]) Unregister ¶
func (s *Subscription[T]) Unregister()
Unregister removes a Subscription from its Publisher.
type SyncMsg ¶
SyncMsg is the server response to a StartupMsg once it's been handled.
func (SyncMsg) MarshalJSON ¶
MarshalJSON returns a json marshaled SyncMsg.