Documentation ¶
Index ¶
- Constants
- Variables
- func IsNormalInformantError(err error) bool
- type Config
- type Dispatcher
- type DumpStateConfig
- type EnvArgs
- type HttpMuxServer
- type InformantConfig
- type InformantServer
- func (s *InformantServer) Downscale(ctx context.Context, logger *zap.Logger, to api.Resources) (*api.DownscaleResult, error)
- func (s *InformantServer) ExitStatus() *InformantServerExitStatus
- func (s *InformantServer) HealthCheck(ctx context.Context, logger *zap.Logger) (*api.InformantHealthCheckResp, error)
- func (s *InformantServer) MonitorDownscale(ctx context.Context, logger *zap.Logger, to api.Resources) (*api.DownscaleResult, error)
- func (s *InformantServer) MonitorHealthCheck(ctx context.Context, logger *zap.Logger) error
- func (s *InformantServer) MonitorUpscale(ctx context.Context, logger *zap.Logger, to api.Resources) error
- func (s *InformantServer) RegisterWithInformant(ctx context.Context, logger *zap.Logger) error
- func (s *InformantServer) Upscale(ctx context.Context, logger *zap.Logger, to api.Resources) error
- type InformantServerExitStatus
- type InformantServerMode
- type InformantServerState
- type MainRunner
- type MetricsConfig
- type MonitorConfig
- type MonitorResult
- type PromMetrics
- type Runner
- type RunnerState
- type ScalingConfig
- type Scheduler
- type SchedulerConfig
- type SchedulerState
- type StateDump
- type VMUpdateReason
Constants ¶
const (
	MinMonitorProtocolVersion api.MonitorProtoVersion = api.MonitorProtoV1_0
	MaxMonitorProtocolVersion api.MonitorProtoVersion = api.MonitorProtoV1_0
)
const (
	RunnerRestartMinWaitSeconds = 5
	RunnerRestartMaxWaitSeconds = 10
)
FIXME: make these timings configurable.
const (
	MinInformantProtocolVersion api.InformantProtoVersion = api.InformantProtoV1_0
	MaxInformantProtocolVersion api.InformantProtoVersion = api.InformantProtoV2_0
)
The autoscaler-agent currently supports v1.0 to v2.0 of the agent<->informant protocol.
If you update either of these values, make sure to also update VERSIONING.md.
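A supported version range like the one above is typically used to pick a version both sides understand. The following is a minimal sketch of that idea, not the agent's actual negotiation code: `protoVersion`, `versionRange`, `highestCommon`, and the numeric version values are all hypothetical stand-ins for the real `api` types.

```go
package main

import "fmt"

// protoVersion is a hypothetical stand-in for api.InformantProtoVersion.
// The numeric values below are made up for illustration.
type protoVersion uint32

const (
	protoV1_0 protoVersion = 1
	protoV2_0 protoVersion = 4
)

// versionRange is a hypothetical inclusive range of supported versions.
type versionRange struct {
	min, max protoVersion
}

// highestCommon returns the highest version contained in both ranges.
// The boolean is false when the ranges do not overlap.
func highestCommon(a, b versionRange) (protoVersion, bool) {
	lo := a.min
	if b.min > lo {
		lo = b.min
	}
	hi := a.max
	if b.max < hi {
		hi = b.max
	}
	if lo > hi {
		return 0, false
	}
	return hi, true
}

func main() {
	agent := versionRange{min: protoV1_0, max: protoV2_0}
	// Assumed range reported by the other side of the protocol.
	informant := versionRange{min: 2, max: 3}

	v, ok := highestCommon(agent, informant)
	fmt.Println(v, ok)
}
```

Choosing the highest shared version is one common policy; the real protocol may select differently.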
const PluginProtocolVersion api.PluginProtoVersion = api.PluginProtoV2_0
PluginProtocolVersion is the current version of the agent<->scheduler plugin protocol in use by this autoscaler-agent.
Currently, each autoscaler-agent supports only one version at a time. In the future, this may change.
Variables ¶
var (
	InformantServerAlreadyExitedError error = errors.New("Informant server has already exited")
	InformantServerSuspendedError     error = errors.New("Informant server is currently suspended")
	InformantServerUnconfirmedError   error = errors.New("Informant server has not yet been confirmed")
	InformantServerNotCurrentError    error = errors.New("Informant server has been replaced")
)
Functions ¶
func IsNormalInformantError ¶
IsNormalInformantError returns true if the error is one of the "expected" errors that can occur in valid exchanges - due to unavoidable raciness or otherwise.
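A check of this kind is usually written with errors.Is against a set of sentinel errors. The sketch below is self-contained and uses local copies of the error values; the name `isExpected` and the decision to treat all four sentinels as "expected" are assumptions, since the documentation does not list which errors count as normal.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the package's sentinel errors, for illustration only.
var (
	errAlreadyExited = errors.New("Informant server has already exited")
	errSuspended     = errors.New("Informant server is currently suspended")
	errUnconfirmed   = errors.New("Informant server has not yet been confirmed")
	errNotCurrent    = errors.New("Informant server has been replaced")
)

// isExpected reports whether err is, or wraps, one of the sentinels above.
// Treating all four as "expected" is an assumption made for this sketch.
func isExpected(err error) bool {
	for _, target := range []error{errAlreadyExited, errSuspended, errUnconfirmed, errNotCurrent} {
		if errors.Is(err, target) {
			return true
		}
	}
	return false
}

func main() {
	wrapped := fmt.Errorf("downscale failed: %w", errSuspended)
	fmt.Println(isExpected(wrapped))
	fmt.Println(isExpected(errors.New("connection refused")))
}
```

Using errors.Is rather than `==` keeps the check working when callers wrap the sentinel with additional context.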
Types ¶
type Config ¶
type Config struct {
	Scaling   ScalingConfig    `json:"scaling"`
	Informant InformantConfig  `json:"informant"`
	Metrics   MetricsConfig    `json:"metrics"`
	Scheduler SchedulerConfig  `json:"scheduler"`
	Monitor   MonitorConfig    `json:"monitor"`
	Billing   *billing.Config  `json:"billing,omitempty"`
	DumpState *DumpStateConfig `json:"dumpState"`
}
func ReadConfig ¶
type Dispatcher ¶ added in v0.16.3
type Dispatcher struct {
// contains filtered or unexported fields
}
The Dispatcher is the main object managing the websocket connection to the monitor. For more information on the protocol, see pkg/api/types.go
func NewDispatcher ¶ added in v0.16.3
func NewDispatcher(logger *zap.Logger, addr string, parent *InformantServer) (disp *Dispatcher, _ error)
Create a new Dispatcher, establishing a connection with the informant. Note that this does not immediately start the Dispatcher. Call Run() to start it.
func (*Dispatcher) Call ¶ added in v0.16.3
func (disp *Dispatcher) Call(ctx context.Context, sender util.SignalSender[*MonitorResult], message any) error
Make a request to the monitor. The dispatcher will handle returning a response on the provided SignalSender. The value passed into message must be a valid value to send to the monitor. See the docs for SerializeInformantMessage.
func (*Dispatcher) HandleMessage ¶ added in v0.16.3
func (disp *Dispatcher) HandleMessage(
	ctx context.Context,
	logger *zap.Logger,
	handlers messageHandlerFuncs,
) error
Handle messages from the monitor. Make sure that all message types the monitor can send are included in the inner switch statement.
type DumpStateConfig ¶ added in v0.5.0
type DumpStateConfig struct {
	// Port is the port to serve on
	Port uint16 `json:"port"`
	// TimeoutSeconds gives the maximum duration, in seconds, that we allow for a request to dump
	// internal state.
	TimeoutSeconds uint `json:"timeoutSeconds"`
}
DumpStateConfig configures the endpoint to dump all internal state
type EnvArgs ¶
type EnvArgs struct {
	// ConfigPath gives the path to read static configuration from. It is taken from the CONFIG_PATH
	// environment variable.
	ConfigPath string
	// K8sNodeName is the Kubernetes node the autoscaler agent is running on. It is taken from the
	// K8S_NODE_NAME environment variable, which is set equal to the pod's Spec.NodeName.
	//
	// The Kubernetes documentation doesn't say this, but the NodeName is always populated with the
	// final node the pod was placed on by the time the environment variables are set.
	K8sNodeName string
	// K8sPodIP is the IP address of the Kubernetes pod that this autoscaler-agent is running in
	K8sPodIP string
}
EnvArgs stores the static configuration data assigned to the autoscaler agent by its environment
func ArgsFromEnv ¶
type HttpMuxServer ¶ added in v0.15.0
type HttpMuxServer struct {
// contains filtered or unexported fields
}
HttpMuxServer provides a way to multiplex multiple http.ServeMux instances on a single HTTP server, so that requests are routed to the correct ServeMux based on a prefix at the beginning of the URL path.
For example, if you call RegisterMux("foo", myMux), then all requests to "/foo/*" are routed to the myMux instance.
(Why is this needed? You could register all the routes directly in one giant http.ServeMux, but http.ServeMux doesn't provide any way to unregister patterns.)
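The routing idea described above can be sketched with a map of sub-muxes keyed by the first path segment. This is a simplified illustration, not the package's implementation: `prefixMux` and its methods are hypothetical, and stripping the prefix before forwarding is an assumption about how the sub-mux sees the path.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
)

// prefixMux is a hypothetical, simplified stand-in for HttpMuxServer.
type prefixMux struct {
	mu    sync.RWMutex
	muxes map[string]*http.ServeMux
}

func newPrefixMux() *prefixMux {
	return &prefixMux{muxes: make(map[string]*http.ServeMux)}
}

// register adds a sub-mux under id, failing if id is already taken.
func (p *prefixMux) register(id string, mux *http.ServeMux) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, ok := p.muxes[id]; ok {
		return fmt.Errorf("mux %q is already registered", id)
	}
	p.muxes[id] = mux
	return nil
}

// unregister removes the sub-mux for id, which a plain http.ServeMux cannot do.
func (p *prefixMux) unregister(id string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.muxes, id)
}

// ServeHTTP strips the leading "/<id>" segment and forwards to the matching mux.
// Stripping the prefix is an assumption made for this sketch.
func (p *prefixMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	id, rest, _ := strings.Cut(strings.TrimPrefix(r.URL.Path, "/"), "/")
	p.mu.RLock()
	mux, ok := p.muxes[id]
	p.mu.RUnlock()
	if !ok {
		http.NotFound(w, r)
		return
	}
	r2 := r.Clone(r.Context())
	r2.URL.Path = "/" + rest
	mux.ServeHTTP(w, r2)
}

// status issues an in-memory GET request and returns the response status code.
func status(h http.Handler, path string) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

// demo returns the status of "/foo/hello" before and after unregistering "foo".
func demo() (before, after int) {
	p := newPrefixMux()
	sub := http.NewServeMux()
	sub.HandleFunc("/hello", func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, "hi")
	})
	if err := p.register("foo", sub); err != nil {
		panic(err)
	}
	before = status(p, "/foo/hello")
	p.unregister("foo")
	after = status(p, "/foo/hello")
	return before, after
}

func main() {
	fmt.Println(demo())
}
```

Keeping each sub-server in its own http.ServeMux means a whole group of routes can be dropped with a single map delete.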
func StartHttpMuxServer ¶ added in v0.15.0
func StartHttpMuxServer(logger *zap.Logger, port int) (*HttpMuxServer, error)
Create a new HttpMuxServer, listening on the given port
func (*HttpMuxServer) RegisterMux ¶ added in v0.15.0
func (s *HttpMuxServer) RegisterMux(muxID string, subServer *http.ServeMux) error
func (*HttpMuxServer) ServeHTTP ¶ added in v0.15.0
func (s *HttpMuxServer) ServeHTTP(w http.ResponseWriter, r *http.Request)
Implements http.Handler
func (*HttpMuxServer) UnregisterMux ¶ added in v0.15.0
func (s *HttpMuxServer) UnregisterMux(muxID string)
type InformantConfig ¶
type InformantConfig struct {
	// ServerPort is the port that the VM informant serves from
	ServerPort uint16 `json:"serverPort"`
	// CallbackPort is the port that the agent listens on for informant -> agent requests
	CallbackPort int `json:"callbackPort"`
	// RetryServerMinWaitSeconds gives the minimum duration, in seconds, that we must wait between the
	// start of one InformantServer and the next
	//
	// This "minimum wait" is only used when the
	RetryServerMinWaitSeconds uint `json:"retryServerMinWaitSeconds"`
	// RetryServerNormalWaitSeconds gives the typical duration, in seconds, that we wait between an
	// InformantServer failing and our retry.
	RetryServerNormalWaitSeconds uint `json:"retryServerNormalWaitSeconds"`
	// RegisterRetrySeconds gives the duration, in seconds, to wait between retrying a failed
	// register request.
	RegisterRetrySeconds uint `json:"registerRetrySeconds"`
	// RequestTimeoutSeconds gives the timeout for any individual request to the informant, except
	// for those with separately-defined values below.
	RequestTimeoutSeconds uint `json:"requestTimeoutSeconds"`
	// RegisterTimeoutSeconds gives the timeout duration, in seconds, for a register request.
	//
	// This is a separate field from RequestTimeoutSeconds because registering may require that the
	// informant suspend a previous agent, which could take longer.
	RegisterTimeoutSeconds uint `json:"registerTimeoutSeconds"`
	// DownscaleTimeoutSeconds gives the timeout duration, in seconds, for a downscale request.
	//
	// This is a separate field from RequestTimeoutSeconds because it's possible that downscaling may
	// require some non-trivial work that we want to allow to complete.
	DownscaleTimeoutSeconds uint `json:"downscaleTimeoutSeconds"`
	// UnhealthyAfterSilenceDurationSeconds gives the duration, in seconds, after which failing to
	// receive a successful request from the informant indicates that it is probably unhealthy.
	UnhealthyAfterSilenceDurationSeconds uint `json:"unhealthyAfterSilenceDurationSeconds"`
	// UnhealthyStartupGracePeriodSeconds gives the duration, in seconds, after which we will no
	// longer excuse total VM informant failures - i.e. when unhealthyAfterSilenceDurationSeconds
	// kicks in.
	UnhealthyStartupGracePeriodSeconds uint `json:"unhealthyStartupGracePeriodSeconds"`
}
type InformantServer ¶
type InformantServer struct {
// contains filtered or unexported fields
}
func NewInformantServer ¶
func NewInformantServer(
	ctx context.Context,
	logger *zap.Logger,
	runner *Runner,
	updatedInformant util.CondChannelSender,
	upscaleRequested util.CondChannelSender,
) (*InformantServer, util.SignalReceiver[struct{}], error)
NewInformantServer starts an InformantServer, returning it and a signal receiver that will be signalled when it exits.
func (*InformantServer) Downscale ¶
func (s *InformantServer) Downscale(ctx context.Context, logger *zap.Logger, to api.Resources) (*api.DownscaleResult, error)
Downscale makes a request to the informant's /downscale endpoint with the api.Resources
This method MUST be called while holding s.requestLock AND NOT s.runner.lock
func (*InformantServer) ExitStatus ¶
func (s *InformantServer) ExitStatus() *InformantServerExitStatus
ExitStatus returns the InformantServerExitStatus associated with the server, if it has been instructed to exit
This method MUST NOT be called while holding s.runner.lock.
func (*InformantServer) HealthCheck ¶ added in v0.7.0
func (s *InformantServer) HealthCheck(ctx context.Context, logger *zap.Logger) (*api.InformantHealthCheckResp, error)
HealthCheck makes a request to the informant's /health-check endpoint, using the server's ID.
This method MUST be called while holding s.requestLock AND NOT s.runner.lock.
func (*InformantServer) MonitorDownscale ¶ added in v0.16.3
func (s *InformantServer) MonitorDownscale(ctx context.Context, logger *zap.Logger, to api.Resources) (*api.DownscaleResult, error)
MonitorDownscale is the equivalent of (*InformantServer).Downscale for when we're connected to a monitor.
This method MUST be called while holding s.requestLock AND NOT s.runner.lock.
*Note*: Locking requestLock is not technically necessary, but it allows for better serialization. For example, if this function exits, we know that no other dispatcher calls will be made.
func (*InformantServer) MonitorHealthCheck ¶ added in v0.16.3
MonitorHealthCheck is the equivalent of (*InformantServer).HealthCheck for when we're connected to a monitor.
This method MUST be called while holding s.requestLock AND NOT s.runner.lock.
*Note*: Locking requestLock is not technically necessary, but it allows for better serialization. For example, if this function exits, we know that no other dispatcher calls will be made.
func (*InformantServer) MonitorUpscale ¶ added in v0.16.3
func (s *InformantServer) MonitorUpscale(ctx context.Context, logger *zap.Logger, to api.Resources) error
MonitorUpscale is the equivalent of (*InformantServer).Upscale for when we're connected to a monitor.
This method MUST be called while holding s.requestLock AND NOT s.runner.lock.
*Note*: Locking requestLock is not technically necessary, but it allows for better serialization. For example, if this function exits, we know that no other dispatcher calls will be made.
func (*InformantServer) RegisterWithInformant ¶
RegisterWithInformant sends a /register request to the VM Informant
If called after a prior success, this method will panic. If the server has already exited, this method will return InformantServerAlreadyExitedError.
On certain errors, this method will force the server to exit. This can be checked by calling s.ExitStatus() and checking for a non-nil result.
This method MUST NOT be called while holding s.requestLock OR s.runner.lock.
type InformantServerExitStatus ¶
type InformantServerExitStatus struct {
	// Err is the error, if any, that caused the server to exit. This is only non-nil when the context
	// used to start the server becomes canceled (i.e. the Runner is exiting).
	Err error
	// RetryShouldFix is true if simply retrying should resolve err. This is true when e.g. the
	// informant responds with a 404 to a downscale or upscale request - it might've restarted, so
	// we just need to re-register.
	RetryShouldFix bool
}
type InformantServerMode ¶
type InformantServerMode string
const (
	InformantServerUnconfirmed InformantServerMode = "unconfirmed"
	InformantServerSuspended   InformantServerMode = "suspended"
	InformantServerRunning     InformantServerMode = "running"
)
type InformantServerState ¶
type InformantServerState struct {
	Desc            api.AgentDesc              `json:"desc"`
	SeqNum          uint64                     `json:"seqNum"`
	ReceivedIDCheck bool                       `json:"receivedIDCheck"`
	MadeContact     bool                       `json:"madeContact"`
	ProtoVersion    *api.InformantProtoVersion `json:"protoVersion"`
	Mode            InformantServerMode        `json:"mode"`
	ExitStatus      *InformantServerExitStatus `json:"exitStatus"`
}
InformantServerState is the serializable state of the InformantServer, produced by calls to the Runner's State() method.
type MainRunner ¶
type MetricsConfig ¶
type MetricsConfig struct {
	// LoadMetricPrefix is the prefix at the beginning of the load metrics that we use. For
	// node_exporter, this is "node_", and for vector it's "host_"
	LoadMetricPrefix string `json:"loadMetricPrefix"`
	// RequestTimeoutSeconds gives the timeout duration, in seconds, for metrics requests
	RequestTimeoutSeconds uint `json:"requestTimeoutSeconds"`
	// SecondsBetweenRequests sets the number of seconds to wait between metrics requests
	SecondsBetweenRequests uint `json:"secondsBetweenRequests"`
}
MetricsConfig defines a few parameters for metrics requests to the VM
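As an illustration of how such a section of the JSON configuration might be decoded, here is a minimal sketch. The local `metricsConfig` type mirrors the fields and JSON tags of MetricsConfig; `parseMetricsConfig`, `requestTimeout`, the non-zero validation rule, and the sample values are assumptions and not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// metricsConfig mirrors the fields and JSON tags of MetricsConfig.
type metricsConfig struct {
	LoadMetricPrefix       string `json:"loadMetricPrefix"`
	RequestTimeoutSeconds  uint   `json:"requestTimeoutSeconds"`
	SecondsBetweenRequests uint   `json:"secondsBetweenRequests"`
}

// parseMetricsConfig decodes raw JSON and rejects zero durations.
// The validation rule is an assumption made for this sketch.
func parseMetricsConfig(raw []byte) (*metricsConfig, error) {
	var c metricsConfig
	if err := json.Unmarshal(raw, &c); err != nil {
		return nil, fmt.Errorf("decoding metrics config: %w", err)
	}
	if c.RequestTimeoutSeconds == 0 || c.SecondsBetweenRequests == 0 {
		return nil, fmt.Errorf("metrics config durations must be non-zero")
	}
	return &c, nil
}

// requestTimeout converts the configured seconds into a time.Duration.
func (c *metricsConfig) requestTimeout() time.Duration {
	return time.Duration(c.RequestTimeoutSeconds) * time.Second
}

func main() {
	// Sample values chosen for illustration only.
	raw := []byte(`{"loadMetricPrefix": "host_", "requestTimeoutSeconds": 2, "secondsBetweenRequests": 5}`)
	c, err := parseMetricsConfig(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(c.LoadMetricPrefix, c.requestTimeout())
}
```

Storing plain seconds in the config and converting to time.Duration at the point of use keeps the JSON easy to write by hand.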
type MonitorConfig ¶ added in v0.16.3
type MonitorConfig struct {
ResponseTimeoutSeconds uint `json:"responseTimeoutSeconds"`
}
type MonitorResult ¶ added in v0.16.3
type MonitorResult struct {
	Result       *api.DownscaleResult
	Confirmation *api.UpscaleConfirmation
	HealthCheck  *api.HealthCheck
}
MonitorResult represents the result of a (*Dispatcher).Call. Because the SignalSender passed in can only be generic over one type, this struct acts as a mock enum. Only one field should ever be non-nil, and it should always be clear which field is readable. For example, a caller that sends a health check through Call should only read the HealthCheck field.
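The "exactly one field is non-nil" convention can be checked mechanically. The sketch below uses local placeholder types in place of the `api` types; `monitorResult.validate` is a hypothetical helper, not a method the package provides.

```go
package main

import "fmt"

// Hypothetical placeholders for api.DownscaleResult, api.UpscaleConfirmation
// and api.HealthCheck.
type downscaleResult struct {
	Ok     bool
	Status string
}
type upscaleConfirmation struct{}
type healthCheck struct{}

// monitorResult mirrors the shape of MonitorResult: a struct used as a
// one-of, where exactly one pointer field is expected to be set.
type monitorResult struct {
	Result       *downscaleResult
	Confirmation *upscaleConfirmation
	HealthCheck  *healthCheck
}

// validate returns an error unless exactly one field is non-nil.
func (r monitorResult) validate() error {
	n := 0
	if r.Result != nil {
		n++
	}
	if r.Confirmation != nil {
		n++
	}
	if r.HealthCheck != nil {
		n++
	}
	if n != 1 {
		return fmt.Errorf("expected exactly one non-nil field, got %d", n)
	}
	return nil
}

func main() {
	ok := monitorResult{HealthCheck: &healthCheck{}}
	bad := monitorResult{Result: &downscaleResult{Ok: true}, HealthCheck: &healthCheck{}}
	fmt.Println(ok.validate())
	fmt.Println(bad.validate())
}
```

A check like this is cheap to run in tests or debug builds, where a violated invariant would otherwise surface as a nil dereference far from its cause.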
type PromMetrics ¶ added in v0.6.0
type PromMetrics struct {
// contains filtered or unexported fields
}
type Runner ¶
type Runner struct {
// contains filtered or unexported fields
}
Runner is the per-VM Pod god object responsible for handling everything.
It primarily operates as a source of shared data for a number of long-running tasks. For additional general information, refer to the comment at the top of this file.
func (*Runner) Run ¶
func (r *Runner) Run(ctx context.Context, logger *zap.Logger, vmInfoUpdated util.CondChannelReceiver) error
Run is the main entrypoint to the long-running per-VM pod tasks
type RunnerState ¶
type RunnerState struct {
	PodIP                           string                `json:"podIP"`
	VM                              api.VmInfo            `json:"vm"`
	LastMetrics                     *api.Metrics          `json:"lastMetrics"`
	Scheduler                       *SchedulerState       `json:"scheduler"`
	Server                          *InformantServerState `json:"server"`
	Informant                       *api.InformantDesc    `json:"informant"`
	ComputeUnit                     *api.Resources        `json:"computeUnit"`
	LastApproved                    *api.Resources        `json:"lastApproved"`
	LastSchedulerError              error                 `json:"lastSchedulerError"`
	LastInformantError              error                 `json:"lastInformantError"`
	BackgroundWorkerCount           int64                 `json:"backgroundWorkerCount"`
	SchedulerRespondedWithMigration bool                  `json:"migrationStarted"`
}
RunnerState is the serializable state of the Runner, extracted by its State method
type ScalingConfig ¶
type ScalingConfig struct {
	// RequestTimeoutSeconds gives the timeout duration, in seconds, for VM patch requests
	RequestTimeoutSeconds uint `json:"requestTimeoutSeconds"`
	// DefaultConfig gives the default scaling config, to be used if there is no configuration
	// supplied with the "autoscaling.neon.tech/config" annotation.
	DefaultConfig api.ScalingConfig `json:"defaultConfig"`
}
ScalingConfig defines the scheduling we use for scaling up and down
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler stores relevant state for a particular scheduler that a Runner is (or has been) connected to
func (*Scheduler) DoRequest ¶
func (s *Scheduler) DoRequest(ctx context.Context, logger *zap.Logger, reqData *api.AgentRequest) (*api.PluginResponse, error)
DoRequest implements all of the tricky logic for requests sent to the scheduler plugin
This method checks:
- That the response is semantically valid
- That the response matches the state of s.runner.vm, if s.runner.scheduler == s
This method may set:
- s.fatalError
- s.runner.{computeUnit,lastApproved,lastSchedulerError,schedulerRespondedWithMigration}, if s.runner.scheduler == s.
This method MAY ALSO call s.runner.shutdown(), if s.runner.scheduler == s.
This method MUST be called while holding s.runner.requestLock AND NOT s.runner.lock.
func (*Scheduler) Register ¶
Register performs the initial request required to register with a scheduler
This method is called immediately after the Scheduler is created, and may be called subsequent times if the initial request fails.
signalOk will be called if the request succeeds, with s.runner.lock held - but only if s.runner.scheduler == s.
This method MUST be called while holding s.runner.requestLock AND NOT s.runner.lock
type SchedulerConfig ¶
type SchedulerConfig struct {
	// SchedulerName is the name of the scheduler we're expecting to communicate with.
	//
	// Any VMs that don't have a matching Spec.SchedulerName will not be autoscaled.
	SchedulerName string `json:"schedulerName"`
	// RequestTimeoutSeconds gives the timeout duration, in seconds, for requests to the scheduler
	//
	// If zero, requests will have no timeout.
	RequestTimeoutSeconds uint `json:"requestTimeoutSeconds"`
	// RequestPort defines the port to access the scheduler's ✨special✨ API with
	RequestPort uint16 `json:"requestPort"`
}
SchedulerConfig defines a few parameters for scheduler requests
type SchedulerState ¶
type SchedulerState struct {
	Info       schedwatch.SchedulerInfo `json:"info"`
	Registered bool                     `json:"registered"`
	FatalError error                    `json:"fatalError"`
}
SchedulerState is the state of a Scheduler, constructed as part of a Runner's State method
type VMUpdateReason ¶
type VMUpdateReason string
VMUpdateReason provides context to (*Runner).updateVMResources about why an update to the VM's resources has been requested
const (
	UpdatedMetrics      VMUpdateReason = "metrics"
	UpscaleRequested    VMUpdateReason = "upscale requested"
	RegisteredScheduler VMUpdateReason = "scheduler"
	UpdatedVMInfo       VMUpdateReason = "updated VM info"
)