Documentation ¶
Index ¶
- Constants
- Variables
- func NewAgentDebug(logger log.FieldLogger, port uint) *agentDebug
- func NewReverseGRPCProxy(metricsHandler metrics.AgentMetricsHandler, logger log.FieldLogger, ...) *reverseGRPCProxy
- func NewReverseHTTPProxy(logger log.FieldLogger, backendHTTPServerHost string, ...) *reverseHTTPProxy
- func ParseReplicaConfig(json string) (*agent.ReplicaConfig, error)
- type AgentSubscriber
- type Client
- func (c *Client) LoadModel(request *agent.ModelOperationMessage) error
- func (c *Client) Start() error
- func (c *Client) StartService() error
- func (c *Client) Stop()
- func (c *Client) UnloadAllModels() error
- func (c *Client) UnloadModel(request *agent.ModelOperationMessage) error
- func (c *Client) WaitReady() error
- type ClientServices
- type KubernetesOptions
- type LocalStateManager
- func (manager *LocalStateManager) EnsureLoadModel(modelId string) error
- func (manager *LocalStateManager) GetAvailableMemoryBytes() uint64
- func (manager *LocalStateManager) GetAvailableMemoryBytesWithOverCommit() uint64
- func (manager *LocalStateManager) GetOverCommitMemoryBytes() float32
- func (manager *LocalStateManager) LoadModelVersion(modelVersionDetails *agent.ModelVersion) error
- func (manager *LocalStateManager) UnloadModelVersion(modelVersionDetails *agent.ModelVersion) error
- type MLServerModelInfo
- type MLServerModelState
- type ModelState
- type SchedulerAgent
- type SchedulerGrpcClientOptions
- type Server
- func (s *Server) AgentDrain(ctx context.Context, message *pb.AgentDrainRequest) (*pb.AgentDrainResponse, error)
- func (s *Server) AgentEvent(ctx context.Context, message *pb.ModelEventMessage) (*pb.ModelEventResponse, error)
- func (s *Server) ModelScalingTrigger(stream pb.AgentService_ModelScalingTriggerServer) error
- func (s *Server) StartGrpcServer(allowPlainTxt bool, agentPort uint, agentTlsPort uint) error
- func (s *Server) StopAgentStreams()
- func (s *Server) Subscribe(request *pb.AgentSubscribeRequest, stream pb.AgentService_SubscribeServer) error
- func (s *Server) Sync(modelName string)
- type ServerKey
- type V2Client
- type V2Err
- type V2ServerError
Constants ¶
View Source
const (
// we define all communication errors into one bucket
// TODO: separate out the different comm issues (e.g. DNS vs Connection refused etc.)
V2CommunicationErrCode = -100
// i.e. invalid method etc.
V2RequestErrCode = -200
)
View Source
const (
DefaultModelMemoryBytes = uint64(1 * 1024 * 1024) // use 1MB default
)
View Source
const (
DefaultReverseProxyHTTPPort = 9999
)
View Source
const (
GRPCDebugServicePort = 7777
)
View Source
const (
ReverseGRPCProxyPort = 9998
)
Variables ¶
View Source
var ErrServerNotReady = errors.New("Server not ready")
View Source
var ErrV2BadRequest = errors.New("V2 Bad Request")
Functions ¶
func NewAgentDebug ¶
func NewAgentDebug(logger log.FieldLogger, port uint) *agentDebug
func NewReverseGRPCProxy ¶
func NewReverseGRPCProxy( metricsHandler metrics.AgentMetricsHandler, logger log.FieldLogger, backendGRPCServerHost string, backendGRPCServerPort uint, servicePort uint, modelScalingStatsCollector *modelscaling.DataPlaneStatsCollector, ) *reverseGRPCProxy
func NewReverseHTTPProxy ¶
func NewReverseHTTPProxy( logger log.FieldLogger, backendHTTPServerHost string, backendHTTPServerPort uint, servicePort uint, metrics metrics.AgentMetricsHandler, modelScalingStatsCollector *modelscaling.DataPlaneStatsCollector, ) *reverseHTTPProxy
func ParseReplicaConfig ¶
func ParseReplicaConfig(json string) (*agent.ReplicaConfig, error)
Types ¶
type AgentSubscriber ¶
type AgentSubscriber struct {
// contains filtered or unexported fields
}
type Client ¶
type Client struct {
ClientServices
SchedulerGrpcClientOptions
KubernetesOptions
// contains filtered or unexported fields
}
func NewClient ¶
func NewClient(serverName string, replicaIdx uint32, schedulerHost string, schedulerPlaintxtPort int, schedulerTlsPort int, logger log.FieldLogger, modelRepository repository.ModelRepository, v2Client *V2Client, replicaConfig *agent.ReplicaConfig, namespace string, reverseProxyHTTP interfaces.DependencyServiceInterface, reverseProxyGRPC interfaces.DependencyServiceInterface, agentDebugService interfaces.DependencyServiceInterface, modelScalingService interfaces.DependencyServiceInterface, drainerService interfaces.DependencyServiceInterface, metrics metrics.AgentMetricsHandler, ) *Client
func (*Client) StartService ¶
func (*Client) UnloadAllModels ¶
func (*Client) UnloadModel ¶
func (c *Client) UnloadModel(request *agent.ModelOperationMessage) error
type ClientServices ¶
type ClientServices struct {
ModelRepository repository.ModelRepository
}
type KubernetesOptions ¶
type KubernetesOptions struct {
// contains filtered or unexported fields
}
type LocalStateManager ¶
type LocalStateManager struct {
// contains filtered or unexported fields
}
LocalStateManager manages the state associated with models on the local agent.
func NewLocalStateManager ¶
func NewLocalStateManager( modelVersions *ModelState, logger log.FieldLogger, v2Client *V2Client, totalMainMemoryBytes uint64, overCommitPercentage uint32, metrics metrics.AgentMetricsHandler, ) *LocalStateManager
func (*LocalStateManager) EnsureLoadModel ¶
func (manager *LocalStateManager) EnsureLoadModel(modelId string) error
EnsureLoadModel should be called from the data plane (on incoming inference).
func (*LocalStateManager) GetAvailableMemoryBytes ¶
func (manager *LocalStateManager) GetAvailableMemoryBytes() uint64
func (*LocalStateManager) GetAvailableMemoryBytesWithOverCommit ¶
func (manager *LocalStateManager) GetAvailableMemoryBytesWithOverCommit() uint64
func (*LocalStateManager) GetOverCommitMemoryBytes ¶
func (manager *LocalStateManager) GetOverCommitMemoryBytes() float32
func (*LocalStateManager) LoadModelVersion ¶
func (manager *LocalStateManager) LoadModelVersion(modelVersionDetails *agent.ModelVersion) error
LoadModelVersion should be called from the control plane (if called directly). The load request will always come with a versioned model name (only one version).
func (*LocalStateManager) UnloadModelVersion ¶
func (manager *LocalStateManager) UnloadModelVersion(modelVersionDetails *agent.ModelVersion) error
UnloadModelVersion should be called from the control plane (if called directly).
type MLServerModelInfo ¶
type MLServerModelInfo struct { Name string State MLServerModelState }
type MLServerModelState ¶
type MLServerModelState string
const ( MLServerModelState_UNKNOWN MLServerModelState = "UNKNOWN" MLServerModelState_READY MLServerModelState = "READY" MLServerModelState_UNAVAILABLE MLServerModelState = "UNAVAILABLE" MLServerModelState_LOADING MLServerModelState = "LOADING" MLServerModelState_UNLOADING MLServerModelState = "UNLOADING" )
type ModelState ¶
type ModelState struct {
// contains filtered or unexported fields
}
func NewModelState ¶
func NewModelState() *ModelState
type SchedulerAgent ¶
type SchedulerAgent interface {
// contains filtered or unexported methods
}
type SchedulerGrpcClientOptions ¶
type SchedulerGrpcClientOptions struct {
// contains filtered or unexported fields
}
type Server ¶
type Server struct {
pb.UnimplementedAgentServiceServer
// contains filtered or unexported fields
}
func NewAgentServer ¶
func NewAgentServer( logger log.FieldLogger, store store.ModelStore, scheduler scheduler.Scheduler, hub *coordinator.EventHub, ) *Server
func (*Server) AgentDrain ¶
func (s *Server) AgentDrain(ctx context.Context, message *pb.AgentDrainRequest) (*pb.AgentDrainResponse, error)
func (*Server) AgentEvent ¶
func (s *Server) AgentEvent(ctx context.Context, message *pb.ModelEventMessage) (*pb.ModelEventResponse, error)
func (*Server) ModelScalingTrigger ¶
func (s *Server) ModelScalingTrigger(stream pb.AgentService_ModelScalingTriggerServer) error
func (*Server) StartGrpcServer ¶
func (*Server) StopAgentStreams ¶
func (s *Server) StopAgentStreams()
func (*Server) Subscribe ¶
func (s *Server) Subscribe(request *pb.AgentSubscribeRequest, stream pb.AgentService_SubscribeServer) error
type V2Client ¶
type V2Client struct {
// contains filtered or unexported fields
}
func NewV2Client ¶
func (*V2Client) GetModels ¶
func (v *V2Client) GetModels() ([]MLServerModelInfo, error)
func (*V2Client) UnloadModel ¶
type V2Err ¶
type V2Err struct {
// contains filtered or unexported fields
}
V2Err is an error wrapper holding client and server errors plus an error code. errCode should hold either a standard HTTP error code (for server errors) or one of the client communication error codes (defined above).
func (*V2Err) IsNotFound ¶
type V2ServerError ¶
type V2ServerError struct {
Error string `json:"error"`
}
Source Files ¶
Click to show internal directories.
Click to hide internal directories.