Documentation ¶
Index ¶
- Constants
- Variables
- func BootemLoop(ctx context.Context, ps *PublisherSet) error
- func ModelCollectStartupMsgs(ctx context.Context, user model.User, known string, spec ModelSubscriptionSpec) ([]stream.MarshallableMsg, error)
- func ModelMakeFilter(spec *ModelSubscriptionSpec) (func(*ModelMsg) bool, error)
- func ModelMakeHydrator() func(*ModelMsg) (*ModelMsg, error)
- func ModelMakePermissionFilter(ctx context.Context, user model.User) (func(*ModelMsg) bool, error)
- func ModelVersionCollectStartupMsgs(ctx context.Context, user model.User, known string, ...) ([]stream.MarshallableMsg, error)
- func ModelVersionMakeFilter(spec *ModelVersionSubscriptionSpec) (func(*ModelVersionMsg) bool, error)
- func ModelVersionMakeHydrator() func(*ModelVersionMsg) (*ModelVersionMsg, error)
- func ModelVersionMakePermissionFilter(ctx context.Context, user model.User) (func(*ModelVersionMsg) bool, error)
- func ProjectCollectStartupMsgs(ctx context.Context, user model.User, known string, ...) ([]stream.MarshallableMsg, error)
- func ProjectMakeFilter(spec *ProjectSubscriptionSpec) (func(*ProjectMsg) bool, error)
- func ProjectMakeHydrator() func(*ProjectMsg) (*ProjectMsg, error)
- func ProjectMakePermissionFilter(ctx context.Context, user model.User) (func(*ProjectMsg) bool, error)
- func WriteAll(socketLike WebsocketLike, msgs []interface{}) error
- type CollectStartupMsgsFunc
- type JSONB
- type KnownKeySet
- type ModelMsg
- type ModelSubscriptionSpec
- type ModelVersionMsg
- type ModelVersionSubscriptionSpec
- type ProjectMsg
- type ProjectSubscriptionSpec
- type PublisherSet
- type StartupMsg
- type StreamAuthZ
- type StreamAuthZBasic
- func (a *StreamAuthZBasic) GetModelStreamableScopes(_ context.Context, _ model.User) (model.AccessScopeSet, error)
- func (a *StreamAuthZBasic) GetModelVersionStreamableScopes(_ context.Context, _ model.User) (model.AccessScopeSet, error)
- func (a *StreamAuthZBasic) GetPermissionChangeListener() (*pq.Listener, error)
- func (a *StreamAuthZBasic) GetProjectStreamableScopes(_ context.Context, _ model.User) (model.AccessScopeSet, error)
- type SubscriptionSet
- type SubscriptionSpecSet
- type Supervisor
- type WebsocketLike
- type WrappedWebsocket
Constants ¶
const ( // ModelVersionsDeleteKey specifies the key for delete model_versions. ModelVersionsDeleteKey = "modelversions_deleted" // ModelVersionsUpsertKey specifies the key for upsert model_versions. ModelVersionsUpsertKey = "modelversion" )
const ( // ModelsDeleteKey specifies the key for delete models. ModelsDeleteKey = "models_deleted" // ModelsUpsertKey specifies the key for upsert models. ModelsUpsertKey = "model" )
const ( // ProjectsDeleteKey specifies the key for delete projects. ProjectsDeleteKey = "projects_deleted" // ProjectsUpsertKey specifies the key for upsert projects. ProjectsUpsertKey = "project" )
Variables ¶
var AuthZProvider authz.AuthZProviderType[StreamAuthZ]
AuthZProvider provides StreamAuthZ implementations.
Functions ¶
func BootemLoop ¶
func BootemLoop(ctx context.Context, ps *PublisherSet) error
BootemLoop listens for permission changes, updates the PublisherSet to signal to boot streamers, returns an error in the event of a failure to listen.
func ModelCollectStartupMsgs ¶
func ModelCollectStartupMsgs( ctx context.Context, user model.User, known string, spec ModelSubscriptionSpec, ) ( []stream.MarshallableMsg, error, )
ModelCollectStartupMsgs collects ModelMsg's that were missed prior to startup. nolint: dupl
func ModelMakeFilter ¶
func ModelMakeFilter(spec *ModelSubscriptionSpec) (func(*ModelMsg) bool, error)
ModelMakeFilter creates a ModelMsg filter based on the given ModelSubscriptionSpec.
func ModelMakeHydrator ¶
ModelMakeHydrator returns a function that gets properties of a model by its id.
func ModelMakePermissionFilter ¶
ModelMakePermissionFilter returns a function that checks if a ModelMsg is in scope of the user permissions.
func ModelVersionCollectStartupMsgs ¶
func ModelVersionCollectStartupMsgs( ctx context.Context, user model.User, known string, spec ModelVersionSubscriptionSpec, ) ( []stream.MarshallableMsg, error, )
ModelVersionCollectStartupMsgs collects ModelVersionMsg's that were missed prior to startup. nolint: dupl
func ModelVersionMakeFilter ¶
func ModelVersionMakeFilter(spec *ModelVersionSubscriptionSpec) (func(*ModelVersionMsg) bool, error)
ModelVersionMakeFilter creates a ModelVersionMsg filter based on the given ModelVersionSubscriptionSpec.
func ModelVersionMakeHydrator ¶
func ModelVersionMakeHydrator() func(*ModelVersionMsg) (*ModelVersionMsg, error)
ModelVersionMakeHydrator returns a function that gets properties of a model version by its id.
func ModelVersionMakePermissionFilter ¶
func ModelVersionMakePermissionFilter(ctx context.Context, user model.User) (func(*ModelVersionMsg) bool, error)
ModelVersionMakePermissionFilter returns a function that checks if a ModelVersionMsg is in scope of the user permissions.
func ProjectCollectStartupMsgs ¶
func ProjectCollectStartupMsgs( ctx context.Context, user model.User, known string, spec ProjectSubscriptionSpec, ) ( []stream.MarshallableMsg, error, )
ProjectCollectStartupMsgs collects ProjectMsg's that were missed prior to startup. nolint: dupl
func ProjectMakeFilter ¶
func ProjectMakeFilter(spec *ProjectSubscriptionSpec) (func(*ProjectMsg) bool, error)
ProjectMakeFilter creates a ProjectMsg filter based on the given ProjectSubscriptionSpec.
func ProjectMakeHydrator ¶
func ProjectMakeHydrator() func(*ProjectMsg) (*ProjectMsg, error)
ProjectMakeHydrator returns a function that gets properties of a project by its id.
func ProjectMakePermissionFilter ¶
func ProjectMakePermissionFilter(ctx context.Context, user model.User) (func(*ProjectMsg) bool, error)
ProjectMakePermissionFilter returns a function that checks if a ProjectMsg is in scope of the user permissions.
func WriteAll ¶
func WriteAll(socketLike WebsocketLike, msgs []interface{}) error
WriteAll attempts to write all provided messages.
Types ¶
type CollectStartupMsgsFunc ¶
type CollectStartupMsgsFunc[S any] func( ctx context.Context, user model.User, known string, spec S, ) ( []stream.MarshallableMsg, error, )
CollectStartupMsgsFunc collects messages that were missed prior to startup.
type JSONB ¶
type JSONB interface{}
JSONB is the golang equivalent of the postgres jsonb column type.
type KnownKeySet ¶
type KnownKeySet struct { Projects string `json:"projects"` Models string `json:"models"` ModelVersions string `json:"modelversions"` }
KnownKeySet allows a client to describe which primary keys it knows of as existing, so the server can respond with which client-known keys have been deleted or disappeared, and also which server-known keys are not yet known to the client (appearances).
Each field of a KnownKeySet is a comma-separated list of int64s and ranges like "a,b-c,d".
type ModelMsg ¶
type ModelMsg struct { bun.BaseModel `bun:"table:models"` // immutable attributes ID int `bun:"id,pk" json:"id"` // mutable attributes Name string `bun:"name" json:"name"` Description string `bun:"description" json:"description"` Archived bool `bun:"archived" json:"archived"` CreationTime time.Time `bun:"creation_time" json:"creation_time"` Notes string `bun:"notes" json:"notes"` WorkspaceID int `bun:"workspace_id" json:"workspace_id"` UserID int `bun:"user_id" json:"user_id"` LastUpdatedTime time.Time `bun:"last_updated_time" json:"last_updated_time"` Metadata JSONB `bun:"metadata,type:jsonb" json:"metadata"` Labels []string `bun:"labels,array" json:"labels"` // metadata Seq int64 `bun:"seq" json:"seq"` }
ModelMsg is a stream.Msg.
determined:stream-gen source=server delete_msg=ModelsDeleted
type ModelSubscriptionSpec ¶
type ModelSubscriptionSpec struct { WorkspaceIDs []int `json:"workspace_ids"` ModelIDs []int `json:"model_ids"` UserIDs []int `json:"user_ids"` Since int64 `json:"since"` }
ModelSubscriptionSpec is what a user submits to define a Model subscription.
determined:stream-gen source=client
type ModelVersionMsg ¶
type ModelVersionMsg struct { bun.BaseModel `bun:"table:model_versions"` // immutable attributes ID int `bun:"id,pk" json:"id"` // mutable attributes Name string `bun:"name" json:"name"` Version int `bun:"version" json:"version"` CheckpointUUID string `bun:"checkpoint_uuid" json:"checkpoint_uuid"` CreationTime time.Time `bun:"creation_time" json:"creation_time"` LastUpdatedTime time.Time `bun:"last_updated_time" json:"last_updated_time"` Metadata JSONB `bun:"metadata,type:jsonb" json:"metadata"` ModelID int `bun:"model_id" json:"model_id"` UserID int `bun:"user_id" json:"user_id"` Comment string `bun:"comment" json:"comment"` Labels []string `bun:"labels,array" json:"labels"` Notes string `bun:"notes" json:"notes"` WorkspaceID string `json:"workspace_id"` // metadata Seq int64 `bun:"seq" json:"seq"` }
ModelVersionMsg is a stream.Msg.
determined:stream-gen source=server delete_msg=ModelVersionsDeleted
func (*ModelVersionMsg) DeleteMsg ¶
func (mm *ModelVersionMsg) DeleteMsg() *stream.DeleteMsg
DeleteMsg creates a ModelVersion stream delete message.
func (*ModelVersionMsg) GetID ¶
func (mm *ModelVersionMsg) GetID() int
GetID gets the ID from a ModelVersionMsg.
func (*ModelVersionMsg) SeqNum ¶
func (mm *ModelVersionMsg) SeqNum() int64
SeqNum gets the SeqNum from a ModelVersionMsg.
func (*ModelVersionMsg) UpsertMsg ¶
func (mm *ModelVersionMsg) UpsertMsg() *stream.UpsertMsg
UpsertMsg creates a ModelVersion stream upsert message.
type ModelVersionSubscriptionSpec ¶
type ModelVersionSubscriptionSpec struct { ModelIDs []int `json:"model_ids"` ModelVersionIDs []int `json:"model_version_ids"` UserIDs []int `json:"user_ids"` Since int64 `json:"since"` }
ModelVersionSubscriptionSpec is what a user submits to define a model_version subscription.
determined:stream-gen source=client
type ProjectMsg ¶
type ProjectMsg struct { bun.BaseModel `bun:"table:projects"` // immutable attributes ID int `bun:"id,pk" json:"id"` // mutable attributes Name string `bun:"name" json:"name"` Description string `bun:"description" json:"description"` Archived bool `bun:"archived" json:"archived"` CreatedAt time.Time `bun:"created_at" json:"created_at"` Notes JSONB `bun:"notes,type:jsonb" json:"notes"` WorkspaceID int `bun:"workspace_id" json:"workspace_id"` UserID int `bun:"user_id" json:"user_id"` Immutable bool `bun:"immutable" json:"immutable"` State model.WorkspaceState `bun:"state" json:"state"` Key string `bun:"key" json:"key"` // metadata Seq int64 `bun:"seq" json:"seq"` }
ProjectMsg is a stream.Msg.
determined:stream-gen source=server delete_msg=ProjectsDeleted
func (*ProjectMsg) DeleteMsg ¶
func (pm *ProjectMsg) DeleteMsg() *stream.DeleteMsg
DeleteMsg creates a Project stream delete message.
func (*ProjectMsg) SeqNum ¶
func (pm *ProjectMsg) SeqNum() int64
SeqNum gets the SeqNum from a ProjectMsg.
func (*ProjectMsg) UpsertMsg ¶
func (pm *ProjectMsg) UpsertMsg() *stream.UpsertMsg
UpsertMsg creates a Project stream upsert message.
type ProjectSubscriptionSpec ¶
type ProjectSubscriptionSpec struct { WorkspaceIDs []int `json:"workspace_ids"` ProjectIDs []int `json:"project_ids"` Since int64 `json:"since"` }
ProjectSubscriptionSpec is what a user submits to define a project subscription.
determined:stream-gen source=client
type PublisherSet ¶
type PublisherSet struct { DBAddress string Projects *stream.Publisher[*ProjectMsg] Models *stream.Publisher[*ModelMsg] ModelVersions *stream.Publisher[*ModelVersionMsg] // contains filtered or unexported fields }
PublisherSet contains all publishers, and handles all websockets. It will connect each websocket with the appropriate set of publishers, based on that websocket's subscriptions.
There is one PublisherSet for the whole process. It has one Publisher per streamable type.
func NewPublisherSet ¶
func NewPublisherSet(dbAddress string) *PublisherSet
NewPublisherSet constructor for PublisherSet.
func (*PublisherSet) Start ¶
func (ps *PublisherSet) Start(ctx context.Context) error
Start starts each Publisher in the PublisherSet.
func (*PublisherSet) StreamHandler ¶
func (ps *PublisherSet) StreamHandler( publisherSetCtx context.Context, socket *websocket.Conn, c echo.Context, ) error
StreamHandler is the Echo websocket endpoint handler for streaming updates, defaulting the prepare function to prepareWebsocketMessage().
type StartupMsg ¶
type StartupMsg struct { SyncID string `json:"sync_id"` Known KnownKeySet `json:"known"` Subscribe SubscriptionSpecSet `json:"subscribe"` }
StartupMsg is the first message a streaming client sends.
It declares initially known keys and also configures the initial subscriptions for the stream.
type StreamAuthZ ¶
type StreamAuthZ interface { // GetProjectStreamableScopes returns an AccessScopeSet where the user has permission to view projects. GetProjectStreamableScopes(ctx context.Context, curUser model.User) (model.AccessScopeSet, error) // GetModelStreamableScopes returns an AccessScopeSet where the user has permission to view models. GetModelStreamableScopes(ctx context.Context, curUser model.User) (model.AccessScopeSet, error) // GetModelVersionStreamableScopes returns an AccessScopeSet where the user has permission to view models. GetModelVersionStreamableScopes(ctx context.Context, curUser model.User) (model.AccessScopeSet, error) // GetPermissionChangeListener returns a pointer listener // listening for permission change notifications if applicable. GetPermissionChangeListener() (*pq.Listener, error) }
StreamAuthZ is the interface for streaming authorization.
type StreamAuthZBasic ¶
type StreamAuthZBasic struct{}
StreamAuthZBasic is classic OSS Determined authentication for streaming clients.
func (*StreamAuthZBasic) GetModelStreamableScopes ¶
func (a *StreamAuthZBasic) GetModelStreamableScopes( _ context.Context, _ model.User, ) (model.AccessScopeSet, error)
GetModelStreamableScopes always returns an AccessScopeSet with global permissions and a nil error.
func (*StreamAuthZBasic) GetModelVersionStreamableScopes ¶
func (a *StreamAuthZBasic) GetModelVersionStreamableScopes( _ context.Context, _ model.User, ) (model.AccessScopeSet, error)
GetModelVersionStreamableScopes always returns an AccessScopeSet with global permissions and a nil error.
func (*StreamAuthZBasic) GetPermissionChangeListener ¶
func (a *StreamAuthZBasic) GetPermissionChangeListener() (*pq.Listener, error)
GetPermissionChangeListener always returns a nil pointer and a nil error.
func (*StreamAuthZBasic) GetProjectStreamableScopes ¶
func (a *StreamAuthZBasic) GetProjectStreamableScopes( _ context.Context, _ model.User, ) (model.AccessScopeSet, error)
GetProjectStreamableScopes always returns an AccessScopeSet with global permissions and a nil error.
type SubscriptionSet ¶
type SubscriptionSet struct { Projects *subscriptionState[*ProjectMsg, ProjectSubscriptionSpec] Models *subscriptionState[*ModelMsg, ModelSubscriptionSpec] ModelVersions *subscriptionState[*ModelVersionMsg, ModelVersionSubscriptionSpec] }
SubscriptionSet is a set of all subscribers for this PublisherSet.
There is one SubscriptionSet for each websocket connection. It has one SubscriptionManager per streamable type.
func NewSubscriptionSet ¶
func NewSubscriptionSet( ctx context.Context, streamer *stream.Streamer, ps *PublisherSet, user model.User, spec SubscriptionSpecSet, ) (SubscriptionSet, error)
NewSubscriptionSet constructor for SubscriptionSet.
func (*SubscriptionSet) Startup ¶
func (ss *SubscriptionSet) Startup(ctx context.Context, user model.User, startupMsg StartupMsg) ( []interface{}, error, )
Startup handles starting up the Subscription objects in the SubscriptionSet.
func (*SubscriptionSet) UnregisterAll ¶
func (ss *SubscriptionSet) UnregisterAll()
UnregisterAll unregisters all Subscription's in the SubscriptionSet.
type SubscriptionSpecSet ¶
type SubscriptionSpecSet struct { Projects *ProjectSubscriptionSpec `json:"projects"` Models *ModelSubscriptionSpec `json:"models"` ModelVersion *ModelVersionSubscriptionSpec `json:"modelversions"` }
SubscriptionSpecSet is the set of subscription specs that can be sent in startup message.
type Supervisor ¶
type Supervisor struct {
// contains filtered or unexported fields
}
Supervisor manages the context for underlying PublisherSet.
func NewSupervisor ¶
func NewSupervisor(dbAddress string) *Supervisor
NewSupervisor creates a new Supervisor.
func (*Supervisor) Run ¶
func (ssup *Supervisor) Run(ctx context.Context) error
Run attempts to start up the publisher system and recovers in the event of a failure.
func (*Supervisor) Websocket ¶
func (ssup *Supervisor) Websocket(socket *websocket.Conn, c echo.Context) error
Websocket passes incoming stream request to the active PublisherSet's websocket handler, ensuring that in the event of a PublisherSet failure, stream requests can still routed during recovery.
type WebsocketLike ¶
WebsocketLike is an interface to describe a websocket and its dummy implementations.
type WrappedWebsocket ¶
WrappedWebsocket is a simple wrapper on a websocket connection.
func (*WrappedWebsocket) Write ¶
func (w *WrappedWebsocket) Write(msg interface{}) error
Write ensures the underlying msg is a websocket.PreparedMessage before writing it to websocket.