stream

package
v0.750.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 19, 2024 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ProcessKnown

func ProcessKnown(known string, exist []int64) (string, []int64, error)

ProcessKnown takes what the client reports as known keys, combined with which keys the server knows exist, and returns what the client should be told is deleted and which keys the client is not yet aware of, which should be hydrated by querying the database.

Parameters: - `known` is a sorted range-encoded string, directly from the REST API. - `exist` is a sorted list of ints, likely from the database or some in-memory cache.

Return Values: - `gone` is a range-encoded string, suitable for returning over the REST API. - `new` is a list of ints, which are PKs that will need hydrating from the database.

Types

type DeleteMsg

type DeleteMsg struct {
	Key     string
	Deleted string
}

DeleteMsg represents a deletion in the stream.

func (DeleteMsg) MarshalJSON

func (d DeleteMsg) MarshalJSON() ([]byte, error)

MarshalJSON returns a json marshaled DeleteMsg.

type Event

type Event[T Msg] struct {
	Before T `json:"before"`
	After  T `json:"after"`
}

Event contains the old and new version a Msg. Inserts will have Before==nil, deletions will have After==nil.

type MarshallableMsg

type MarshallableMsg interface {
	MarshalJSON() ([]byte, error)
}

MarshallableMsg is an intermediary message that is ready to be marshaled and broadcast.

type Msg

type Msg interface {
	GetID() int
	SeqNum() int64
	UpsertMsg() *UpsertMsg
	DeleteMsg() *DeleteMsg
}

Msg is an object with a message and a sequence number and json marshal caching.

type Publisher

type Publisher[T Msg] struct {
	Lock          sync.Mutex
	Subscriptions set.Set[*Subscription[T]]
	WakeupID      int64
	// Hydrate an UpsertMsg.
	Hydrator func(T) (T, error)
}

Publisher is responsible for publishing messages of type T to streamers associate with active subscriptions.

func NewPublisher

func NewPublisher[T Msg](hydrator func(T) (T, error)) *Publisher[T]

NewPublisher creates a new Publisher for message type T.

func (*Publisher[T]) Broadcast

func (p *Publisher[T]) Broadcast(events []Event[T], idToSaturatedMsg map[int]*UpsertMsg)

Broadcast receives a list of events, determines if they are applicable to the publisher's subscriptions, and sends appropriate messages to corresponding streamers.

func (*Publisher[T]) CloseAllStreamers

func (p *Publisher[T]) CloseAllStreamers()

CloseAllStreamers closes all streamers associated with this Publisher.

func (*Publisher[T]) HydrateMsg

func (p *Publisher[T]) HydrateMsg(msg T, idToSaturatedMsg map[int]*UpsertMsg)

HydrateMsg queries the DB by the ID from rawMsg of a upsert or fallin event and get the full record.

type Streamer

type Streamer struct {
	Cond *sync.Cond
	// Msgs are pre-marshaled messages to send to the streaming client.
	Msgs []interface{}
	// Closed is set externally, and noticed eventually.
	Closed bool
	// PrepareFn is a user defined function that prepares Msgs for broadcast
	PrepareFn func(message MarshallableMsg) interface{}
}

Streamer aggregates many events and wakeups into a single slice of pre-marshaled messages. One streamer may be associated with many Subscription[T]'s, but it should only have at most one Subscription per type T. One Streamer is intended to belong to one websocket connection.

func NewStreamer

func NewStreamer(prepareFn func(message MarshallableMsg) interface{}) *Streamer

NewStreamer creates a new Steamer.

func (*Streamer) Close

func (s *Streamer) Close()

Close closes a streamer.

type Subscription

type Subscription[T Msg] struct {
	// Which streamer is collecting messages from this Subscription?
	Streamer *Streamer
	// Which publisher should we connect to when active?
	Publisher *Publisher[T]
	// contains filtered or unexported fields
}

Subscription manages a streamer's subscription to messages of type T.

func NewSubscription

func NewSubscription[T Msg](
	streamer *Streamer,
	publisher *Publisher[T],
	permFilter func(T) bool,
	filterFn func(T) bool,
) Subscription[T]

NewSubscription creates a new Subscription to messages of type T.

func (*Subscription[T]) Register

func (s *Subscription[T]) Register()

Register a Subscription with its Publisher.

func (*Subscription[T]) Unregister

func (s *Subscription[T]) Unregister()

Unregister removes a Subscription from its Publisher.

type SyncMsg

type SyncMsg struct {
	SyncID   string `json:"sync_id"`
	Complete bool   `json:"complete"`
}

SyncMsg is the server response to a StartupMsg once it's been handled.

func (SyncMsg) MarshalJSON

func (sm SyncMsg) MarshalJSON() ([]byte, error)

MarshalJSON returns a json marshaled SyncMsg.

type UpsertMsg

type UpsertMsg struct {
	JSONKey string
	Msg
}

UpsertMsg represents an upsert in the stream.

func (UpsertMsg) MarshalJSON

func (u UpsertMsg) MarshalJSON() ([]byte, error)

MarshalJSON returns a json marshaled UpsertMsg.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL