agent

package
v2.0.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 2, 2022 License: Apache-2.0 Imports: 51 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// We group all communication errors into one bucket.
	// TODO: separate out the different comm issues (e.g. DNS vs Connection refused etc.)
	V2CommunicationErrCode = -100
	// e.g. invalid method.
	V2RequestErrCode = -200
)
View Source
const (
	DefaultModelMemoryBytes = uint64(1 * 1024 * 1024) // use 1MB default
)
View Source
const (
	DefaultReverseProxyHTTPPort = 9999
)
View Source
const (
	GRPCDebugServicePort = 7777
)
View Source
const (
	ReverseGRPCProxyPort = 9998
)

Variables

View Source
var ErrServerNotReady = errors.New("Server not ready")
View Source
var ErrV2BadRequest = errors.New("V2 Bad Request")

Functions

func NewAgentDebug

func NewAgentDebug(logger log.FieldLogger, port uint) *agentDebug

func NewReverseGRPCProxy

func NewReverseGRPCProxy(
	metricsHandler metrics.AgentMetricsHandler,
	logger log.FieldLogger, backendGRPCServerHost string,
	backendGRPCServerPort uint,
	servicePort uint,
	modelScalingStatsCollector *modelscaling.DataPlaneStatsCollector,
) *reverseGRPCProxy

func NewReverseHTTPProxy

func NewReverseHTTPProxy(
	logger log.FieldLogger,
	backendHTTPServerHost string,
	backendHTTPServerPort uint,
	servicePort uint,
	metrics metrics.AgentMetricsHandler,
	modelScalingStatsCollector *modelscaling.DataPlaneStatsCollector,
) *reverseHTTPProxy

func ParseReplicaConfig

func ParseReplicaConfig(json string) (*agent.ReplicaConfig, error)

Types

type AgentSubscriber

type AgentSubscriber struct {
	// contains filtered or unexported fields
}

type Client

type Client struct {
	ClientServices
	SchedulerGrpcClientOptions
	KubernetesOptions
	// contains filtered or unexported fields
}

func NewClient

func NewClient(serverName string,
	replicaIdx uint32,
	schedulerHost string,
	schedulerPlaintxtPort int,
	schedulerTlsPort int,
	logger log.FieldLogger,
	modelRepository repository.ModelRepository,
	v2Client *V2Client,
	replicaConfig *agent.ReplicaConfig,
	namespace string,
	reverseProxyHTTP interfaces.DependencyServiceInterface,
	reverseProxyGRPC interfaces.DependencyServiceInterface,
	agentDebugService interfaces.DependencyServiceInterface,
	modelScalingService interfaces.DependencyServiceInterface,
	drainerService interfaces.DependencyServiceInterface,
	metrics metrics.AgentMetricsHandler,
) *Client

func (*Client) LoadModel

func (c *Client) LoadModel(request *agent.ModelOperationMessage) error

func (*Client) Start

func (c *Client) Start() error

func (*Client) StartService

func (c *Client) StartService() error

func (*Client) Stop

func (c *Client) Stop()

func (*Client) UnloadAllModels

func (c *Client) UnloadAllModels() error

func (*Client) UnloadModel

func (c *Client) UnloadModel(request *agent.ModelOperationMessage) error

func (*Client) WaitReady

func (c *Client) WaitReady() error

type ClientServices

type ClientServices struct {
	ModelRepository repository.ModelRepository
}

type KubernetesOptions

type KubernetesOptions struct {
	// contains filtered or unexported fields
}

type LocalStateManager

type LocalStateManager struct {
	// contains filtered or unexported fields
}

LocalStateManager manages the state associated with models on the local agent.

func NewLocalStateManager

func NewLocalStateManager(
	modelVersions *ModelState,
	logger log.FieldLogger,
	v2Client *V2Client,
	totalMainMemoryBytes uint64,
	overCommitPercentage uint32,
	metrics metrics.AgentMetricsHandler,
) *LocalStateManager

func (*LocalStateManager) EnsureLoadModel

func (manager *LocalStateManager) EnsureLoadModel(modelId string) error

This should be called from the data plane (on an incoming inference request).

func (*LocalStateManager) GetAvailableMemoryBytes

func (manager *LocalStateManager) GetAvailableMemoryBytes() uint64

func (*LocalStateManager) GetAvailableMemoryBytesWithOverCommit

func (manager *LocalStateManager) GetAvailableMemoryBytesWithOverCommit() uint64

func (*LocalStateManager) GetOverCommitMemoryBytes

func (manager *LocalStateManager) GetOverCommitMemoryBytes() float32

func (*LocalStateManager) LoadModelVersion

func (manager *LocalStateManager) LoadModelVersion(modelVersionDetails *agent.ModelVersion) error

This should be called from the control plane (if called directly). The load request always comes with a versioned model name (only one version).

func (*LocalStateManager) UnloadModelVersion

func (manager *LocalStateManager) UnloadModelVersion(modelVersionDetails *agent.ModelVersion) error

This should be called from the control plane (if called directly).

type MLServerModelInfo

type MLServerModelInfo struct {
	Name  string
	State MLServerModelState
}

type MLServerModelState

type MLServerModelState string
const (
	MLServerModelState_UNKNOWN     MLServerModelState = "UNKNOWN"
	MLServerModelState_READY       MLServerModelState = "READY"
	MLServerModelState_UNAVAILABLE MLServerModelState = "UNAVAILABLE"
	MLServerModelState_LOADING     MLServerModelState = "LOADING"
	MLServerModelState_UNLOADING   MLServerModelState = "UNLOADING"
)

type ModelState

type ModelState struct {
	// contains filtered or unexported fields
}

func NewModelState

func NewModelState() *ModelState

type SchedulerAgent

type SchedulerAgent interface {
	// contains filtered or unexported methods
}

type SchedulerGrpcClientOptions

type SchedulerGrpcClientOptions struct {
	// contains filtered or unexported fields
}

type Server

type Server struct {
	pb.UnimplementedAgentServiceServer
	// contains filtered or unexported fields
}

func NewAgentServer

func NewAgentServer(
	logger log.FieldLogger,
	store store.ModelStore,
	scheduler scheduler.Scheduler,
	hub *coordinator.EventHub,
) *Server

func (*Server) AgentDrain

func (s *Server) AgentDrain(ctx context.Context, message *pb.AgentDrainRequest) (*pb.AgentDrainResponse, error)

func (*Server) AgentEvent

func (s *Server) AgentEvent(ctx context.Context, message *pb.ModelEventMessage) (*pb.ModelEventResponse, error)

func (*Server) ModelScalingTrigger

func (s *Server) ModelScalingTrigger(stream pb.AgentService_ModelScalingTriggerServer) error

func (*Server) StartGrpcServer

func (s *Server) StartGrpcServer(allowPlainTxt bool, agentPort uint, agentTlsPort uint) error

func (*Server) StopAgentStreams

func (s *Server) StopAgentStreams()

func (*Server) Subscribe

func (s *Server) Subscribe(request *pb.AgentSubscribeRequest, stream pb.AgentService_SubscribeServer) error

func (*Server) Sync

func (s *Server) Sync(modelName string)

type ServerKey

type ServerKey struct {
	// contains filtered or unexported fields
}

type V2Client

type V2Client struct {
	// contains filtered or unexported fields
}

func NewV2Client

func NewV2Client(host string, port int, logger log.FieldLogger, isGrpc bool) *V2Client

func (*V2Client) GetModels

func (v *V2Client) GetModels() ([]MLServerModelInfo, error)

func (*V2Client) LoadModel

func (v *V2Client) LoadModel(name string) *V2Err

func (*V2Client) Ready

func (v *V2Client) Ready() error

func (*V2Client) UnloadModel

func (v *V2Client) UnloadModel(name string) *V2Err

type V2Err

type V2Err struct {
	// contains filtered or unexported fields
}

V2Err is an error wrapper that holds client and server errors together with an error code. errCode should hold either a standard HTTP error code (for server errors) or one of the client communication error codes (defined above).

func (*V2Err) IsNotFound

func (v *V2Err) IsNotFound() bool

type V2ServerError

type V2ServerError struct {
	Error string `json:"error"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL