Documentation ¶
Index ¶
- Constants
- Variables
- func IsNormalInformantError(err error) bool
- func RunBillingMetricsCollector(backgroundCtx context.Context, conf *BillingConfig, store VMStoreForNode)
- type BillingConfig
- type Config
- type DumpStateConfig
- type EnvArgs
- type InformantConfig
- type InformantServer
- func (s *InformantServer) Downscale(ctx context.Context, to api.Resources) (*api.DownscaleResult, error)
- func (s *InformantServer) ExitStatus() *InformantServerExitStatus
- func (s *InformantServer) HealthCheck(ctx context.Context) (*api.InformantHealthCheckResp, error)
- func (s *InformantServer) RegisterWithInformant(ctx context.Context) error
- func (s *InformantServer) Upscale(ctx context.Context, to api.Resources) error
- type InformantServerExitStatus
- type InformantServerMode
- type InformantServerState
- type MainRunner
- type MetricsConfig
- type PromMetrics
- type Runner
- type RunnerLogger
- type RunnerState
- type ScalingConfig
- type Scheduler
- type SchedulerConfig
- type SchedulerState
- type StateDump
- type VMNodeIndex
- type VMStoreForNode
- type VMUpdateReason
Constants ¶
const (
	RunnerRestartMinWaitSeconds = 5
	RunnerRestartMaxWaitSeconds = 10
)
FIXME: make these timings configurable.
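Example (sketch): one plausible way a minimum and maximum wait like these could be used is to pick a random delay between them before restarting a Runner. The helper sampleRestartWait below is hypothetical and not part of this package; how the package actually chooses the delay is an assumption here.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Values copied from the constants documented above.
const (
	RunnerRestartMinWaitSeconds = 5
	RunnerRestartMaxWaitSeconds = 10
)

// sampleRestartWait picks a wait uniformly from [min, max) seconds.
// Hypothetical helper: the package may choose the delay differently.
func sampleRestartWait() time.Duration {
	span := float64(RunnerRestartMaxWaitSeconds - RunnerRestartMinWaitSeconds)
	secs := float64(RunnerRestartMinWaitSeconds) + rand.Float64()*span
	return time.Duration(secs * float64(time.Second))
}

func main() {
	fmt.Println("restarting runner after", sampleRestartWait())
}
```

Randomizing the delay spreads restarts out so that many Runners failing at once do not all retry at the same instant.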
const (
	MinInformantProtocolVersion api.InformantProtoVersion = api.InformantProtoV1_0
	MaxInformantProtocolVersion api.InformantProtoVersion = api.InformantProtoV1_2
)
The autoscaler-agent currently supports v1.0 to v1.2 of the agent<->informant protocol.
If you update either of these values, make sure to also update VERSIONING.md.
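Example (sketch): a supported version range is typically intersected with the range the peer advertises, and the highest common version is used. The type protoVersion, the extra protoV1_3 value, and pickVersion are hypothetical stand-ins for illustration; the real negotiation between agent and informant may differ.

```go
package main

import "fmt"

// protoVersion is a hypothetical stand-in for api.InformantProtoVersion.
type protoVersion uint32

const (
	protoV1_0 protoVersion = iota + 1
	protoV1_1
	protoV1_2
	protoV1_3 // hypothetical newer version, used only by this example
)

// The agent's supported range, mirroring the Min/Max constants above.
const (
	minSupported = protoV1_0
	maxSupported = protoV1_2
)

// pickVersion returns the highest version inside both the agent's range and
// the peer's [peerMin, peerMax] range. ok is false if the ranges do not overlap.
func pickVersion(peerMin, peerMax protoVersion) (v protoVersion, ok bool) {
	lo, hi := minSupported, maxSupported
	if peerMin > lo {
		lo = peerMin
	}
	if peerMax < hi {
		hi = peerMax
	}
	if lo > hi {
		return 0, false
	}
	return hi, true
}

func main() {
	v, ok := pickVersion(protoV1_1, protoV1_3)
	fmt.Println("chosen version:", v, "compatible:", ok)
}
```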
const (
EndpointLabel string = "neon/endpoint-id"
)
const PluginProtocolVersion api.PluginProtoVersion = api.PluginProtoV2_0
PluginProtocolVersion is the current version of the agent<->scheduler plugin in use by this autoscaler-agent.
Currently, each autoscaler-agent supports only one version at a time. In the future, this may change.
Variables ¶
var (
	InformantServerAlreadyExitedError error = errors.New("Informant server has already exited")
	InformantServerSuspendedError     error = errors.New("Informant server is currently suspended")
	InformantServerUnconfirmedError   error = errors.New("Informant server has not yet been confirmed")
	InformantServerNotCurrentError    error = errors.New("Informant server has been replaced")
)
Functions ¶
func IsNormalInformantError ¶
IsNormalInformantError returns true if the error is one of the "expected" errors that can occur in valid exchanges - due to unavoidable raciness or otherwise.
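Example (sketch): a check like this is commonly written with errors.Is against sentinel errors, so that wrapped errors still match. The lowercase names below are local stand-ins for the package variables, and which of the sentinels count as "expected" is an assumption, not taken from the package source.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the sentinel errors listed under Variables.
var (
	errAlreadyExited = errors.New("Informant server has already exited")
	errSuspended     = errors.New("Informant server is currently suspended")
	errUnconfirmed   = errors.New("Informant server has not yet been confirmed")
	errNotCurrent    = errors.New("Informant server has been replaced")
)

// isNormalInformantError reports whether err is one of the errors that can
// arise from ordinary races. ASSUMPTION: the set chosen here is illustrative.
func isNormalInformantError(err error) bool {
	return errors.Is(err, errSuspended) ||
		errors.Is(err, errUnconfirmed) ||
		errors.Is(err, errNotCurrent)
}

// wrap adds context while keeping the sentinel reachable through errors.Is.
func wrap(err error) error {
	return fmt.Errorf("request failed: %w", err)
}

func main() {
	for _, err := range []error{wrap(errSuspended), errAlreadyExited} {
		fmt.Printf("%v -> normal: %t\n", err, isNormalInformantError(err))
	}
}
```

A caller can use the result to decide whether to log at a lower severity or simply retry, rather than treating the failure as a bug.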
func RunBillingMetricsCollector ¶ added in v0.1.9
func RunBillingMetricsCollector(
	backgroundCtx context.Context,
	conf *BillingConfig,
	store VMStoreForNode,
)
Types ¶
type BillingConfig ¶ added in v0.1.9
type BillingConfig struct {
	URL                  string `json:"url"`
	CPUMetricName        string `json:"cpuMetricName"`
	ActiveTimeMetricName string `json:"activeTimeMetricName"`
	CollectEverySeconds  uint   `json:"collectEverySeconds"`
	PushEverySeconds     uint   `json:"pushEverySeconds"`
	PushTimeoutSeconds   uint   `json:"pushTimeoutSeconds"`
}
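Example (sketch): the JSON tags above determine how a billing section would be written in a config file. The struct is copied from the declaration; the URL, metric names, and intervals in sampleBilling are made-up values for illustration, and parseBilling is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// BillingConfig mirrors the declaration documented above.
type BillingConfig struct {
	URL                  string `json:"url"`
	CPUMetricName        string `json:"cpuMetricName"`
	ActiveTimeMetricName string `json:"activeTimeMetricName"`
	CollectEverySeconds  uint   `json:"collectEverySeconds"`
	PushEverySeconds     uint   `json:"pushEverySeconds"`
	PushTimeoutSeconds   uint   `json:"pushTimeoutSeconds"`
}

// sampleBilling uses illustrative values only.
const sampleBilling = `{
	"url": "http://billing.example.internal/usage",
	"cpuMetricName": "example_cpu_seconds",
	"activeTimeMetricName": "example_active_seconds",
	"collectEverySeconds": 4,
	"pushEverySeconds": 30,
	"pushTimeoutSeconds": 2
}`

// parseBilling decodes a billing section from JSON text.
func parseBilling(data string) (*BillingConfig, error) {
	var c BillingConfig
	if err := json.Unmarshal([]byte(data), &c); err != nil {
		return nil, fmt.Errorf("decoding billing config: %w", err)
	}
	return &c, nil
}

func main() {
	c, err := parseBilling(sampleBilling)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", *c)
}
```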
type Config ¶
type Config struct {
	Scaling   ScalingConfig    `json:"scaling"`
	Informant InformantConfig  `json:"informant"`
	Metrics   MetricsConfig    `json:"metrics"`
	Scheduler SchedulerConfig  `json:"scheduler"`
	Billing   *BillingConfig   `json:"billing,omitempty"`
	DumpState *DumpStateConfig `json:"dumpState"`
}
func ReadConfig ¶
type DumpStateConfig ¶ added in v0.5.0
type DumpStateConfig struct {
	// Port is the port to serve on
	Port uint16 `json:"port"`
	// TimeoutSeconds gives the maximum duration, in seconds, that we allow for a request to dump
	// internal state.
	TimeoutSeconds uint `json:"timeoutSeconds"`
}
DumpStateConfig configures the endpoint to dump all internal state
type EnvArgs ¶
type EnvArgs struct {
	// ConfigPath gives the path to read static configuration from. It is taken from the CONFIG_PATH
	// environment variable.
	ConfigPath string
	// K8sNodeName is the Kubernetes node the autoscaler agent is running on. It is taken from the
	// K8S_NODE_NAME environment variable, which is set equal to the pod's Spec.NodeName.
	//
	// The Kubernetes documentation doesn't say this, but the NodeName is always populated with the
	// final node the pod was placed on by the time the environment variables are set.
	K8sNodeName string
	// K8sPodIP is the IP address of the Kubernetes pod that this autoscaler-agent is running in
	K8sPodIP string
}
EnvArgs stores the static configuration data assigned to the autoscaler agent by its environment
func ArgsFromEnv ¶
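Example (sketch): the signature of ArgsFromEnv is not shown on this page, so the following only illustrates how EnvArgs could be filled from the environment. CONFIG_PATH and K8S_NODE_NAME come from the field comments above; K8S_POD_IP is an assumed variable name, and argsFromLookup is a hypothetical helper that takes the lookup function as a parameter so it can be exercised without touching the real environment.

```go
package main

import (
	"fmt"
	"os"
)

// EnvArgs mirrors the struct documented above.
type EnvArgs struct {
	ConfigPath  string
	K8sNodeName string
	K8sPodIP    string
}

// argsFromLookup fills EnvArgs using lookup, which has the same shape as
// os.LookupEnv. It fails on the first missing or empty variable.
func argsFromLookup(lookup func(string) (string, bool)) (EnvArgs, error) {
	var args EnvArgs
	fields := []struct {
		name string
		dst  *string
	}{
		{"CONFIG_PATH", &args.ConfigPath},
		{"K8S_NODE_NAME", &args.K8sNodeName},
		{"K8S_POD_IP", &args.K8sPodIP}, // ASSUMPTION: variable name not given in the docs
	}
	for _, f := range fields {
		v, ok := lookup(f.name)
		if !ok || v == "" {
			return EnvArgs{}, fmt.Errorf("missing required environment variable %s", f.name)
		}
		*f.dst = v
	}
	return args, nil
}

func main() {
	args, err := argsFromLookup(os.LookupEnv)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", args)
}
```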
type InformantConfig ¶
type InformantConfig struct {
	// ServerPort is the port that the VM informant serves from
	ServerPort uint16 `json:"serverPort"`
	// RetryServerMinWaitSeconds gives the minimum duration, in seconds, that we must wait between the
	// start of one InformantServer and the next
	//
	// This "minimum wait" is only used when the
	RetryServerMinWaitSeconds uint `json:"retryServerMinWaitSeconds"`
	// RetryServerNormalWaitSeconds gives the typical duration, in seconds, that we wait between an
	// InformantServer failing and our retry.
	RetryServerNormalWaitSeconds uint `json:"retryServerNormalWaitSeconds"`
	// RegisterRetrySeconds gives the duration, in seconds, to wait between retrying a failed
	// register request.
	RegisterRetrySeconds uint `json:"registerRetrySeconds"`
	// RequestTimeoutSeconds gives the timeout for any individual request to the informant, except
	// for those with separately-defined values below.
	RequestTimeoutSeconds uint `json:"requestTimeoutSeconds"`
	// RegisterTimeoutSeconds gives the timeout duration, in seconds, for a register request.
	//
	// This is a separate field from RequestTimeoutSeconds because registering may require that the
	// informant suspend a previous agent, which could take longer.
	RegisterTimeoutSeconds uint `json:"registerTimeoutSeconds"`
	// DownscaleTimeoutSeconds gives the timeout duration, in seconds, for a downscale request.
	//
	// This is a separate field from RequestTimeoutSeconds because it's possible that downscaling may
	// require some non-trivial work that we want to allow to complete.
	DownscaleTimeoutSeconds uint `json:"downscaleTimeoutSeconds"`
	// UnhealthyAfterSilenceDurationSeconds gives the duration, in seconds, after which failing to
	// receive a successful request from the informant indicates that it is probably unhealthy.
	UnhealthyAfterSilenceDurationSeconds uint `json:"unhealthyAfterSilenceDurationSeconds"`
	// UnhealthyStartupGracePeriodSeconds gives the duration, in seconds, after which we will no
	// longer excuse total VM informant failures - i.e. when unhealthyAfterSilenceDurationSeconds
	// kicks in.
	UnhealthyStartupGracePeriodSeconds uint `json:"unhealthyStartupGracePeriodSeconds"`
}
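Example (sketch): the three timeout fields suggest that each request kind selects its own deadline, falling back to RequestTimeoutSeconds for everything else. The informantTimeouts struct is a trimmed copy of the fields above; requestKind and timeoutFor are hypothetical names, and the values are illustrative.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// informantTimeouts holds only the timeout fields of InformantConfig.
type informantTimeouts struct {
	RequestTimeoutSeconds   uint
	RegisterTimeoutSeconds  uint
	DownscaleTimeoutSeconds uint
}

// requestKind is a hypothetical label for the kind of informant request.
type requestKind int

const (
	kindOther requestKind = iota
	kindRegister
	kindDownscale
)

// timeoutFor returns the timeout for a request kind: register and downscale
// have dedicated fields, everything else uses the general request timeout.
func timeoutFor(c informantTimeouts, k requestKind) time.Duration {
	secs := c.RequestTimeoutSeconds
	switch k {
	case kindRegister:
		secs = c.RegisterTimeoutSeconds
	case kindDownscale:
		secs = c.DownscaleTimeoutSeconds
	}
	return time.Duration(secs) * time.Second
}

func main() {
	conf := informantTimeouts{
		RequestTimeoutSeconds:   1,
		RegisterTimeoutSeconds:  5,
		DownscaleTimeoutSeconds: 3,
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeoutFor(conf, kindRegister))
	defer cancel()
	deadline, _ := ctx.Deadline()
	fmt.Println("register request must finish within", time.Until(deadline).Round(time.Second))
}
```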
type InformantServer ¶
type InformantServer struct {
// contains filtered or unexported fields
}
func NewInformantServer ¶
func NewInformantServer(
	ctx context.Context,
	runner *Runner,
	updatedInformant util.CondChannelSender,
	upscaleRequested util.CondChannelSender,
) (*InformantServer, util.SignalReceiver, error)
NewInformantServer starts an InformantServer, returning it and a signal receiver that will be signalled when it exits.
func (*InformantServer) Downscale ¶
func (s *InformantServer) Downscale(ctx context.Context, to api.Resources) (*api.DownscaleResult, error)
Downscale makes a request to the informant's /downscale endpoint with the api.Resources
This method MUST NOT be called while holding s.runner.lock OR s.requestLock.
func (*InformantServer) ExitStatus ¶
func (s *InformantServer) ExitStatus() *InformantServerExitStatus
ExitStatus returns the InformantServerExitStatus associated with the server, if it has been instructed to exit
This method MUST NOT be called while holding s.runner.lock.
func (*InformantServer) HealthCheck ¶ added in v0.7.0
func (s *InformantServer) HealthCheck(ctx context.Context) (*api.InformantHealthCheckResp, error)
HealthCheck makes a request to the informant's /health-check endpoint, using the server's ID.
This method MUST be called while holding s.requestLock AND NOT s.runner.lock.
func (*InformantServer) RegisterWithInformant ¶
func (s *InformantServer) RegisterWithInformant(ctx context.Context) error
RegisterWithInformant sends a /register request to the VM Informant
If called after a prior success, this method will panic. If the server has already exited, this method will return InformantServerAlreadyExitedError.
On certain errors, this method will force the server to exit. This can be checked by calling s.ExitStatus() and checking for a non-nil result.
This method MUST NOT be called while holding s.requestLock OR s.runner.lock.
type InformantServerExitStatus ¶
type InformantServerExitStatus struct {
	// Err is the error, if any, that caused the server to exit. This is only non-nil when the context
	// used to start the server becomes canceled (i.e. the Runner is exiting).
	Err error
	// RetryShouldFix is true if simply retrying should resolve err. This is true when e.g. the
	// informant responds with a 404 to a downscale or upscale request - it might've restarted, so
	// we just need to re-register.
	RetryShouldFix bool
}
type InformantServerMode ¶
type InformantServerMode string
const (
	InformantServerUnconfirmed InformantServerMode = "unconfirmed"
	InformantServerSuspended   InformantServerMode = "suspended"
	InformantServerRunning     InformantServerMode = "running"
)
type InformantServerState ¶
type InformantServerState struct {
	Desc            api.AgentDesc              `json:"desc"`
	SeqNum          uint64                     `json:"seqNum"`
	ReceivedIDCheck bool                       `json:"receivedIDCheck"`
	MadeContact     bool                       `json:"madeContact"`
	ProtoVersion    *api.InformantProtoVersion `json:"protoVersion"`
	Mode            InformantServerMode        `json:"mode"`
	ExitStatus      *InformantServerExitStatus `json:"exitStatus"`
}
InformantServerState is the serializable state of the InformantServer, produced by calls to the Runner's State() method.
type MainRunner ¶
type MetricsConfig ¶
type MetricsConfig struct {
	// LoadMetricPrefix is the prefix at the beginning of the load metrics that we use. For
	// node_exporter, this is "node_", and for vector it's "host_"
	LoadMetricPrefix string `json:"loadMetricPrefix"`
	// RequestTimeoutSeconds gives the timeout duration, in seconds, for metrics requests
	RequestTimeoutSeconds uint `json:"requestTimeoutSeconds"`
	// SecondsBetweenRequests sets the number of seconds to wait between metrics requests
	SecondsBetweenRequests uint `json:"secondsBetweenRequests"`
}
MetricsConfig defines a few parameters for metrics requests to the VM
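Example (sketch): LoadMetricPrefix lets the same lookup work against either exporter, since node_exporter reports node_load1 and vector reports host_load1. The function findLoad1 and the sample text are hypothetical; this minimal parser handles only unlabelled "name value" lines and is not how the package necessarily reads metrics.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// sampleMetrics is illustrative Prometheus text-format output.
const sampleMetrics = `# TYPE host_load1 gauge
host_load1 0.5
# TYPE host_load5 gauge
host_load5 0.25
`

// findLoad1 looks for the 1-minute load average under the configured prefix.
// It skips comment lines and any line that is not exactly "name value".
func findLoad1(prefix, body string) (float64, bool) {
	want := prefix + "load1"
	for _, line := range strings.Split(body, "\n") {
		fields := strings.Fields(line)
		if len(fields) != 2 || fields[0] != want {
			continue
		}
		v, err := strconv.ParseFloat(fields[1], 64)
		if err != nil {
			return 0, false
		}
		return v, true
	}
	return 0, false
}

func main() {
	for _, prefix := range []string{"host_", "node_"} {
		v, ok := findLoad1(prefix, sampleMetrics)
		fmt.Printf("prefix %q: value=%v found=%t\n", prefix, v, ok)
	}
}
```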
type PromMetrics ¶ added in v0.6.0
type PromMetrics struct {
// contains filtered or unexported fields
}
type Runner ¶
type Runner struct {
// contains filtered or unexported fields
}
Runner is the per-VM Pod god object responsible for handling everything
It primarily operates as a source of shared data for a number of long-running tasks. For additional general information, refer to the comment at the top of this file.
type RunnerLogger ¶
type RunnerLogger struct {
// contains filtered or unexported fields
}
func (RunnerLogger) Errorf ¶
func (l RunnerLogger) Errorf(format string, args ...interface{})
func (RunnerLogger) Fatalf ¶
func (l RunnerLogger) Fatalf(format string, args ...interface{})
func (RunnerLogger) Infof ¶
func (l RunnerLogger) Infof(format string, args ...interface{})
func (RunnerLogger) Warningf ¶
func (l RunnerLogger) Warningf(format string, args ...interface{})
type RunnerState ¶
type RunnerState struct {
	LogPrefix                       string                `json:"logPrefix"`
	PodIP                           string                `json:"podIP"`
	VM                              api.VmInfo            `json:"vm"`
	LastMetrics                     *api.Metrics          `json:"lastMetrics"`
	Scheduler                       *SchedulerState       `json:"scheduler"`
	Server                          *InformantServerState `json:"server"`
	Informant                       *api.InformantDesc    `json:"informant"`
	ComputeUnit                     *api.Resources        `json:"computeUnit"`
	LastApproved                    *api.Resources        `json:"lastApproved"`
	LastSchedulerError              error                 `json:"lastSchedulerError"`
	LastInformantError              error                 `json:"lastInformantError"`
	BackgroundWorkerCount           int64                 `json:"backgroundWorkerCount"`
	SchedulerRespondedWithMigration bool                  `json:"migrationStarted"`
}
RunnerState is the serializable state of the Runner, extracted by its State method
type ScalingConfig ¶
type ScalingConfig struct {
	// RequestTimeoutSeconds gives the timeout duration, in seconds, for VM patch requests
	RequestTimeoutSeconds uint `json:"requestTimeoutSeconds"`
	// DefaultConfig gives the default scaling config, to be used if there is no configuration
	// supplied with the "autoscaling.neon.tech/config" annotation.
	DefaultConfig api.ScalingConfig `json:"defaultConfig"`
}
ScalingConfig defines the scheduling we use for scaling up and down
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler stores relevant state for a particular scheduler that a Runner is (or has) connected to
func (*Scheduler) DoRequest ¶
func (s *Scheduler) DoRequest(ctx context.Context, reqData *api.AgentRequest) (*api.PluginResponse, error)
DoRequest implements all of the tricky logic for requests sent to the scheduler plugin
This method checks:
- That the response is semantically valid
- That the response matches with the state of s.runner.vm, if s.runner.scheduler == s
This method may set:
- s.fatalError
- s.runner.{computeUnit,lastApproved,lastSchedulerError,schedulerRespondedWithMigration}, if s.runner.scheduler == s.
This method MAY ALSO call s.runner.shutdown(), if s.runner.scheduler == s.
This method MUST be called while holding s.runner.requestLock AND NOT s.runner.lock.
func (*Scheduler) Register ¶
Register performs the initial request required to register with a scheduler
This method is called immediately after the Scheduler is created, and may be called subsequent times if the initial request fails.
signalOk will be called if the request succeeds, with s.runner.lock held - but only if s.runner.scheduler == s.
This method MUST be called while holding s.runner.requestLock AND NOT s.runner.lock
type SchedulerConfig ¶
type SchedulerConfig struct {
	// SchedulerName is the name of the scheduler we're expecting to communicate with.
	//
	// Any VMs that don't have a matching Spec.SchedulerName will not be autoscaled.
	SchedulerName string `json:"schedulerName"`
	// RequestTimeoutSeconds gives the timeout duration, in seconds, for requests to the scheduler
	//
	// If zero, requests will have no timeout.
	RequestTimeoutSeconds uint `json:"requestTimeoutSeconds"`
	// RequestPort defines the port to access the scheduler's ✨special✨ API with
	RequestPort uint16 `json:"requestPort"`
}
SchedulerConfig defines a few parameters for scheduler requests
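Example (sketch): the "if zero, requests will have no timeout" rule for RequestTimeoutSeconds can be expressed by choosing between a cancel-only context and a deadline context. The helpers requestContext and hasDeadline are hypothetical and only illustrate that rule.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// requestContext derives a context for a scheduler request. A zero timeout
// yields a context with no deadline; otherwise the deadline is set from the
// configured number of seconds.
func requestContext(parent context.Context, timeoutSeconds uint) (context.Context, context.CancelFunc) {
	if timeoutSeconds == 0 {
		return context.WithCancel(parent)
	}
	return context.WithTimeout(parent, time.Duration(timeoutSeconds)*time.Second)
}

// hasDeadline reports whether a request made with the given setting would
// carry a deadline at all.
func hasDeadline(timeoutSeconds uint) bool {
	ctx, cancel := requestContext(context.Background(), timeoutSeconds)
	defer cancel()
	_, ok := ctx.Deadline()
	return ok
}

func main() {
	for _, secs := range []uint{0, 5} {
		fmt.Printf("requestTimeoutSeconds=%d -> deadline set: %t\n", secs, hasDeadline(secs))
	}
}
```

Returning a cancel function in both branches keeps the call site identical regardless of whether a timeout is configured.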
type SchedulerState ¶
type SchedulerState struct {
	LogPrefix  string        `json:"logPrefix"`
	Info       schedulerInfo `json:"info"`
	Registered bool          `json:"registered"`
	FatalError error         `json:"fatalError"`
}
SchedulerState is the state of a Scheduler, constructed as part of a Runner's State method
type VMNodeIndex ¶ added in v0.6.0
type VMNodeIndex struct {
// contains filtered or unexported fields
}
VMNodeIndex is a util.WatchIndex that stores all of the VMs for a particular node
We have to implement this ourselves because K8s does not (as of 2023-04-04) support field selectors on CRDs, so we can't have the API server filter out VMs for us.
For more info, see: https://github.com/kubernetes/kubernetes/issues/53459
This comment was particularly instructive: https://github.com/kubernetes/kubernetes/issues/53459#issuecomment-1146200268
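Example (sketch): because the API server cannot filter by node here, the filtering has to happen client-side as objects arrive. The simplified index below shows that idea with the same four operations (Add, Update, Delete, List). The vm struct and nodeIndex type are hypothetical stand-ins; the real VMNodeIndex works on vmapi.VirtualMachine and satisfies util.WatchIndex, whose exact contract is not shown on this page.

```go
package main

import (
	"fmt"
	"sort"
)

// vm is a hypothetical, minimal stand-in for vmapi.VirtualMachine.
type vm struct {
	UID      string
	Name     string
	NodeName string
}

// nodeIndex keeps only the VMs that are placed on one particular node.
type nodeIndex struct {
	node string
	vms  map[string]*vm // keyed by UID
}

func newNodeIndex(node string) *nodeIndex {
	return &nodeIndex{node: node, vms: make(map[string]*vm)}
}

// Add stores v only if it belongs to this index's node.
func (i *nodeIndex) Add(v *vm) {
	if v.NodeName == i.node {
		i.vms[v.UID] = v
	}
}

// Delete removes v; deleting an absent VM is a no-op.
func (i *nodeIndex) Delete(v *vm) { delete(i.vms, v.UID) }

// Update replaces oldVM with newVM, dropping it if it moved to another node.
func (i *nodeIndex) Update(oldVM, newVM *vm) {
	i.Delete(oldVM)
	i.Add(newVM)
}

// List returns the indexed VMs sorted by UID for stable output.
func (i *nodeIndex) List() []*vm {
	out := make([]*vm, 0, len(i.vms))
	for _, v := range i.vms {
		out = append(out, v)
	}
	sort.Slice(out, func(a, b int) bool { return out[a].UID < out[b].UID })
	return out
}

func main() {
	idx := newNodeIndex("node-a")
	idx.Add(&vm{UID: "1", Name: "vm-one", NodeName: "node-a"})
	idx.Add(&vm{UID: "2", Name: "vm-two", NodeName: "node-b"}) // filtered out
	for _, v := range idx.List() {
		fmt.Println(v.Name, "on", v.NodeName)
	}
}
```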
func NewVMNodeIndex ¶ added in v0.6.0
func NewVMNodeIndex(node string) *VMNodeIndex
func (*VMNodeIndex) Add ¶ added in v0.6.0
func (i *VMNodeIndex) Add(vm *vmapi.VirtualMachine)
func (*VMNodeIndex) Delete ¶ added in v0.6.0
func (i *VMNodeIndex) Delete(vm *vmapi.VirtualMachine)
func (*VMNodeIndex) List ¶ added in v0.6.0
func (i *VMNodeIndex) List() []*vmapi.VirtualMachine
func (*VMNodeIndex) Update ¶ added in v0.6.0
func (i *VMNodeIndex) Update(oldVM, newVM *vmapi.VirtualMachine)
type VMStoreForNode ¶ added in v0.6.0
type VMStoreForNode = util.IndexedWatchStore[vmapi.VirtualMachine, *VMNodeIndex]
type VMUpdateReason ¶
type VMUpdateReason string
VMUpdateReason provides context to (*Runner).updateVMResources about why an update to the VM's resources has been requested
const (
	UpdatedMetrics      VMUpdateReason = "metrics"
	UpscaleRequested    VMUpdateReason = "upscale requested"
	RegisteredScheduler VMUpdateReason = "scheduler"
	UpdatedVMInfo       VMUpdateReason = "updated VM info"
)