stream

package
v0.750.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 19, 2024 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// ModelVersionsDeleteKey specifies the key for model_versions deletions.
	ModelVersionsDeleteKey = "modelversions_deleted"
	// ModelVersionsUpsertKey specifies the key for model_versions upserts.
	ModelVersionsUpsertKey = "modelversion"
)
View Source
const (
	// ModelsDeleteKey specifies the key for model deletions.
	ModelsDeleteKey = "models_deleted"
	// ModelsUpsertKey specifies the key for model upserts.
	ModelsUpsertKey = "model"
)
View Source
const (
	// ProjectsDeleteKey specifies the key for project deletions.
	ProjectsDeleteKey = "projects_deleted"
	// ProjectsUpsertKey specifies the key for project upserts.
	ProjectsUpsertKey = "project"
)

Variables

AuthZProvider provides StreamAuthZ implementations.

Functions

func BootemLoop

func BootemLoop(ctx context.Context, ps *PublisherSet) error

BootemLoop listens for permission changes and updates the PublisherSet to signal that streamers should be booted. It returns an error in the event of a failure to listen.

func ModelCollectStartupMsgs

func ModelCollectStartupMsgs(
	ctx context.Context,
	user model.User,
	known string,
	spec ModelSubscriptionSpec,
) (
	[]stream.MarshallableMsg, error,
)

ModelCollectStartupMsgs collects ModelMsg's that were missed prior to startup. nolint: dupl

func ModelMakeFilter

func ModelMakeFilter(spec *ModelSubscriptionSpec) (func(*ModelMsg) bool, error)

ModelMakeFilter creates a ModelMsg filter based on the given ModelSubscriptionSpec.

func ModelMakeHydrator

func ModelMakeHydrator() func(*ModelMsg) (*ModelMsg, error)

ModelMakeHydrator returns a function that gets properties of a model by its id.

func ModelMakePermissionFilter

func ModelMakePermissionFilter(ctx context.Context, user model.User) (func(*ModelMsg) bool, error)

ModelMakePermissionFilter returns a function that checks if a ModelMsg is in scope of the user permissions.

func ModelVersionCollectStartupMsgs

func ModelVersionCollectStartupMsgs(
	ctx context.Context,
	user model.User,
	known string,
	spec ModelVersionSubscriptionSpec,
) (
	[]stream.MarshallableMsg, error,
)

ModelVersionCollectStartupMsgs collects ModelVersionMsg's that were missed prior to startup. nolint: dupl

func ModelVersionMakeFilter

func ModelVersionMakeFilter(spec *ModelVersionSubscriptionSpec) (func(*ModelVersionMsg) bool, error)

ModelVersionMakeFilter creates a ModelVersionMsg filter based on the given ModelVersionSubscriptionSpec.

func ModelVersionMakeHydrator

func ModelVersionMakeHydrator() func(*ModelVersionMsg) (*ModelVersionMsg, error)

ModelVersionMakeHydrator returns a function that gets properties of a model version by its id.

func ModelVersionMakePermissionFilter

func ModelVersionMakePermissionFilter(ctx context.Context, user model.User) (func(*ModelVersionMsg) bool, error)

ModelVersionMakePermissionFilter returns a function that checks if a ModelVersionMsg is in scope of the user permissions.

func ProjectCollectStartupMsgs

func ProjectCollectStartupMsgs(
	ctx context.Context,
	user model.User,
	known string,
	spec ProjectSubscriptionSpec,
) (
	[]stream.MarshallableMsg, error,
)

ProjectCollectStartupMsgs collects ProjectMsg's that were missed prior to startup. nolint: dupl

func ProjectMakeFilter

func ProjectMakeFilter(spec *ProjectSubscriptionSpec) (func(*ProjectMsg) bool, error)

ProjectMakeFilter creates a ProjectMsg filter based on the given ProjectSubscriptionSpec.

func ProjectMakeHydrator

func ProjectMakeHydrator() func(*ProjectMsg) (*ProjectMsg, error)

ProjectMakeHydrator returns a function that gets properties of a project by its id.

func ProjectMakePermissionFilter

func ProjectMakePermissionFilter(ctx context.Context, user model.User) (func(*ProjectMsg) bool, error)

ProjectMakePermissionFilter returns a function that checks if a ProjectMsg is in scope of the user permissions.

func WriteAll

func WriteAll(socketLike WebsocketLike, msgs []interface{}) error

WriteAll attempts to write all provided messages.

Types

type CollectStartupMsgsFunc

type CollectStartupMsgsFunc[S any] func(
	ctx context.Context,
	user model.User,
	known string,
	spec S,
) (
	[]stream.MarshallableMsg, error,
)

CollectStartupMsgsFunc collects messages that were missed prior to startup.

type JSONB

type JSONB interface{}

JSONB is the golang equivalent of the postgres jsonb column type.

type KnownKeySet

type KnownKeySet struct {
	// Projects lists the project keys the client knows of, as a comma-separated
	// list of int64s and ranges like "a,b-c,d".
	Projects      string `json:"projects"`
	// Models lists the model keys the client knows of, in the same format.
	Models        string `json:"models"`
	// ModelVersions lists the model version keys the client knows of, in the same format.
	ModelVersions string `json:"modelversions"`
}

KnownKeySet allows a client to describe which primary keys it knows of as existing, so the server can respond with which client-known keys have been deleted or disappeared, and also which server-known keys are not yet known to the client (appearances).

Each field of a KnownKeySet is a comma-separated list of int64s and ranges like "a,b-c,d".

type ModelMsg

type ModelMsg struct {
	bun.BaseModel `bun:"table:models"`

	// immutable attributes
	ID int `bun:"id,pk" json:"id"`

	// mutable attributes
	Name            string    `bun:"name" json:"name"`
	Description     string    `bun:"description" json:"description"`
	Archived        bool      `bun:"archived" json:"archived"`
	CreationTime    time.Time `bun:"creation_time" json:"creation_time"`
	Notes           string    `bun:"notes" json:"notes"`
	WorkspaceID     int       `bun:"workspace_id" json:"workspace_id"`
	UserID          int       `bun:"user_id" json:"user_id"`
	LastUpdatedTime time.Time `bun:"last_updated_time" json:"last_updated_time"`
	Metadata        JSONB     `bun:"metadata,type:jsonb" json:"metadata"`
	Labels          []string  `bun:"labels,array" json:"labels"`

	// metadata
	Seq int64 `bun:"seq" json:"seq"`
}

ModelMsg is a stream.Msg.

determined:stream-gen source=server delete_msg=ModelsDeleted

func (*ModelMsg) DeleteMsg

func (mm *ModelMsg) DeleteMsg() *stream.DeleteMsg

DeleteMsg creates a model stream delete message.

func (*ModelMsg) GetID

func (mm *ModelMsg) GetID() int

GetID gets the ID from a ModelMsg.

func (*ModelMsg) SeqNum

func (mm *ModelMsg) SeqNum() int64

SeqNum gets the SeqNum from a ModelMsg.

func (*ModelMsg) UpsertMsg

func (mm *ModelMsg) UpsertMsg() *stream.UpsertMsg

UpsertMsg creates a model stream upsert message.

type ModelSubscriptionSpec

type ModelSubscriptionSpec struct {
	WorkspaceIDs []int `json:"workspace_ids"`
	ModelIDs     []int `json:"model_ids"`
	UserIDs      []int `json:"user_ids"`
	Since        int64 `json:"since"`
}

ModelSubscriptionSpec is what a user submits to define a Model subscription.

determined:stream-gen source=client

type ModelVersionMsg

type ModelVersionMsg struct {
	bun.BaseModel `bun:"table:model_versions"`

	// immutable attributes
	ID int `bun:"id,pk" json:"id"`

	// mutable attributes
	Name            string    `bun:"name" json:"name"`
	Version         int       `bun:"version" json:"version"`
	CheckpointUUID  string    `bun:"checkpoint_uuid" json:"checkpoint_uuid"`
	CreationTime    time.Time `bun:"creation_time" json:"creation_time"`
	LastUpdatedTime time.Time `bun:"last_updated_time" json:"last_updated_time"`
	Metadata        JSONB     `bun:"metadata,type:jsonb" json:"metadata"`
	ModelID         int       `bun:"model_id" json:"model_id"`
	UserID          int       `bun:"user_id" json:"user_id"`
	Comment         string    `bun:"comment" json:"comment"`
	Labels          []string  `bun:"labels,array" json:"labels"`
	Notes           string    `bun:"notes" json:"notes"`
	// NOTE(review): unlike ModelMsg and ProjectMsg, where WorkspaceID is an int
	// with a bun tag, WorkspaceID here is a string and carries no bun tag.
	// Confirm this difference is intentional.
	WorkspaceID     string    `json:"workspace_id"`
	// metadata
	Seq int64 `bun:"seq" json:"seq"`
}

ModelVersionMsg is a stream.Msg.

determined:stream-gen source=server delete_msg=ModelVersionsDeleted

func (*ModelVersionMsg) DeleteMsg

func (mm *ModelVersionMsg) DeleteMsg() *stream.DeleteMsg

DeleteMsg creates a ModelVersion stream delete message.

func (*ModelVersionMsg) GetID

func (mm *ModelVersionMsg) GetID() int

GetID gets the ID from a ModelVersionMsg.

func (*ModelVersionMsg) SeqNum

func (mm *ModelVersionMsg) SeqNum() int64

SeqNum gets the SeqNum from a ModelVersionMsg.

func (*ModelVersionMsg) UpsertMsg

func (mm *ModelVersionMsg) UpsertMsg() *stream.UpsertMsg

UpsertMsg creates a ModelVersion stream upsert message.

type ModelVersionSubscriptionSpec

type ModelVersionSubscriptionSpec struct {
	ModelIDs        []int `json:"model_ids"`
	ModelVersionIDs []int `json:"model_version_ids"`
	UserIDs         []int `json:"user_ids"`
	Since           int64 `json:"since"`
}

ModelVersionSubscriptionSpec is what a user submits to define a model_version subscription.

determined:stream-gen source=client

type ProjectMsg

type ProjectMsg struct {
	bun.BaseModel `bun:"table:projects"`

	// immutable attributes
	ID int `bun:"id,pk" json:"id"`

	// mutable attributes
	Name        string               `bun:"name" json:"name"`
	Description string               `bun:"description" json:"description"`
	Archived    bool                 `bun:"archived" json:"archived"`
	CreatedAt   time.Time            `bun:"created_at" json:"created_at"`
	Notes       JSONB                `bun:"notes,type:jsonb" json:"notes"`
	WorkspaceID int                  `bun:"workspace_id" json:"workspace_id"`
	UserID      int                  `bun:"user_id" json:"user_id"`
	Immutable   bool                 `bun:"immutable" json:"immutable"`
	State       model.WorkspaceState `bun:"state" json:"state"`
	Key         string               `bun:"key" json:"key"`

	// metadata
	Seq int64 `bun:"seq" json:"seq"`
}

ProjectMsg is a stream.Msg.

determined:stream-gen source=server delete_msg=ProjectsDeleted

func (*ProjectMsg) DeleteMsg

func (pm *ProjectMsg) DeleteMsg() *stream.DeleteMsg

DeleteMsg creates a Project stream delete message.

func (*ProjectMsg) GetID

func (pm *ProjectMsg) GetID() int

GetID gets the ID from a ProjectMsg.

func (*ProjectMsg) SeqNum

func (pm *ProjectMsg) SeqNum() int64

SeqNum gets the SeqNum from a ProjectMsg.

func (*ProjectMsg) UpsertMsg

func (pm *ProjectMsg) UpsertMsg() *stream.UpsertMsg

UpsertMsg creates a Project stream upsert message.

type ProjectSubscriptionSpec

type ProjectSubscriptionSpec struct {
	WorkspaceIDs []int `json:"workspace_ids"`
	ProjectIDs   []int `json:"project_ids"`
	Since        int64 `json:"since"`
}

ProjectSubscriptionSpec is what a user submits to define a project subscription.

determined:stream-gen source=client

type PublisherSet

type PublisherSet struct {
	DBAddress     string
	Projects      *stream.Publisher[*ProjectMsg]
	Models        *stream.Publisher[*ModelMsg]
	ModelVersions *stream.Publisher[*ModelVersionMsg]
	// contains filtered or unexported fields
}

PublisherSet contains all publishers, and handles all websockets. It will connect each websocket with the appropriate set of publishers, based on that websocket's subscriptions.

There is one PublisherSet for the whole process. It has one Publisher per streamable type.

func NewPublisherSet

func NewPublisherSet(dbAddress string) *PublisherSet

NewPublisherSet is the constructor for PublisherSet.

func (*PublisherSet) Start

func (ps *PublisherSet) Start(ctx context.Context) error

Start starts each Publisher in the PublisherSet.

func (*PublisherSet) StreamHandler

func (ps *PublisherSet) StreamHandler(
	publisherSetCtx context.Context,
	socket *websocket.Conn,
	c echo.Context,
) error

StreamHandler is the Echo websocket endpoint handler for streaming updates, defaulting the prepare function to prepareWebsocketMessage().

type StartupMsg

type StartupMsg struct {
	SyncID    string              `json:"sync_id"`
	Known     KnownKeySet         `json:"known"`
	Subscribe SubscriptionSpecSet `json:"subscribe"`
}

StartupMsg is the first message a streaming client sends.

It declares initially known keys and also configures the initial subscriptions for the stream.

type StreamAuthZ

type StreamAuthZ interface {
	// GetProjectStreamableScopes returns an AccessScopeSet where the user has permission to view projects.
	GetProjectStreamableScopes(ctx context.Context, curUser model.User) (model.AccessScopeSet, error)

	// GetModelStreamableScopes returns an AccessScopeSet where the user has permission to view models.
	GetModelStreamableScopes(ctx context.Context, curUser model.User) (model.AccessScopeSet, error)

	// GetModelVersionStreamableScopes returns an AccessScopeSet where the user has permission to view
	// model versions.
	GetModelVersionStreamableScopes(ctx context.Context, curUser model.User) (model.AccessScopeSet, error)

	// GetPermissionChangeListener returns a pointer to a listener for
	// permission change notifications, if applicable.
	GetPermissionChangeListener() (*pq.Listener, error)
}

StreamAuthZ is the interface for streaming authorization.

type StreamAuthZBasic

type StreamAuthZBasic struct{}

StreamAuthZBasic is classic OSS Determined authorization for streaming clients.

func (*StreamAuthZBasic) GetModelStreamableScopes

func (a *StreamAuthZBasic) GetModelStreamableScopes(
	_ context.Context,
	_ model.User,
) (model.AccessScopeSet, error)

GetModelStreamableScopes always returns an AccessScopeSet with global permissions and a nil error.

func (*StreamAuthZBasic) GetModelVersionStreamableScopes

func (a *StreamAuthZBasic) GetModelVersionStreamableScopes(
	_ context.Context,
	_ model.User,
) (model.AccessScopeSet, error)

GetModelVersionStreamableScopes always returns an AccessScopeSet with global permissions and a nil error.

func (*StreamAuthZBasic) GetPermissionChangeListener

func (a *StreamAuthZBasic) GetPermissionChangeListener() (*pq.Listener, error)

GetPermissionChangeListener always returns a nil pointer and a nil error.

func (*StreamAuthZBasic) GetProjectStreamableScopes

func (a *StreamAuthZBasic) GetProjectStreamableScopes(
	_ context.Context,
	_ model.User,
) (model.AccessScopeSet, error)

GetProjectStreamableScopes always returns an AccessScopeSet with global permissions and a nil error.

type SubscriptionSet

type SubscriptionSet struct {
	Projects      *subscriptionState[*ProjectMsg, ProjectSubscriptionSpec]
	Models        *subscriptionState[*ModelMsg, ModelSubscriptionSpec]
	ModelVersions *subscriptionState[*ModelVersionMsg, ModelVersionSubscriptionSpec]
}

SubscriptionSet is a set of all subscribers for this PublisherSet.

There is one SubscriptionSet for each websocket connection. It has one subscription state per streamable type.

func NewSubscriptionSet

func NewSubscriptionSet(
	ctx context.Context,
	streamer *stream.Streamer,
	ps *PublisherSet,
	user model.User,
	spec SubscriptionSpecSet,
) (SubscriptionSet, error)

NewSubscriptionSet is the constructor for SubscriptionSet.

func (*SubscriptionSet) Startup

func (ss *SubscriptionSet) Startup(ctx context.Context, user model.User, startupMsg StartupMsg) (
	[]interface{}, error,
)

Startup handles starting up the Subscription objects in the SubscriptionSet.

func (*SubscriptionSet) UnregisterAll

func (ss *SubscriptionSet) UnregisterAll()

UnregisterAll unregisters all Subscription's in the SubscriptionSet.

type SubscriptionSpecSet

type SubscriptionSpecSet struct {
	Projects     *ProjectSubscriptionSpec      `json:"projects"`
	Models       *ModelSubscriptionSpec        `json:"models"`
	// NOTE(review): this field is singular (ModelVersion) while its JSON key and the
	// corresponding fields on KnownKeySet, PublisherSet and SubscriptionSet are plural
	// (ModelVersions). Renaming would break callers, so it is only flagged here.
	ModelVersion *ModelVersionSubscriptionSpec `json:"modelversions"`
}

SubscriptionSpecSet is the set of subscription specs that can be sent in startup message.

type Supervisor

type Supervisor struct {
	// contains filtered or unexported fields
}

Supervisor manages the context for underlying PublisherSet.

func NewSupervisor

func NewSupervisor(dbAddress string) *Supervisor

NewSupervisor creates a new Supervisor.

func (*Supervisor) Run

func (ssup *Supervisor) Run(ctx context.Context) error

Run attempts to start up the publisher system and recovers in the event of a failure.

func (*Supervisor) Websocket

func (ssup *Supervisor) Websocket(socket *websocket.Conn, c echo.Context) error

Websocket passes incoming stream requests to the active PublisherSet's websocket handler, ensuring that in the event of a PublisherSet failure, stream requests can still be routed during recovery.

type WebsocketLike

type WebsocketLike interface {
	ReadJSON(interface{}) error
	Write(interface{}) error
}

WebsocketLike is an interface to describe a websocket and its dummy implementations.

type WrappedWebsocket

type WrappedWebsocket struct {
	*websocket.Conn
}

WrappedWebsocket is a simple wrapper on a websocket connection.

func (*WrappedWebsocket) Write

func (w *WrappedWebsocket) Write(msg interface{}) error

Write ensures the underlying msg is a websocket.PreparedMessage before writing it to websocket.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL