agent

package
v0.17.11-pre1 Latest
Warning

This package is not in the latest version of its module.

Published: Sep 24, 2023 License: Apache-2.0 Imports: 40 Imported by: 0

Documentation

Constants

const (
	MinMonitorProtocolVersion api.MonitorProtoVersion = api.MonitorProtoV1_0
	MaxMonitorProtocolVersion api.MonitorProtoVersion = api.MonitorProtoV1_0
)
const (
	RunnerRestartMinWaitSeconds = 5
	RunnerRestartMaxWaitSeconds = 10
)

FIXME: make these timings configurable.
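The two constants above bound how long to wait before restarting a runner. A minimal sketch of how such bounds might be applied, assuming the wait is drawn uniformly at random between them (this page does not say how the value is chosen, and `restartWait` is a hypothetical helper):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	RunnerRestartMinWaitSeconds = 5
	RunnerRestartMaxWaitSeconds = 10
)

// restartWait is a hypothetical helper, not part of the package. It picks a
// uniformly random duration between the minimum and maximum wait, inclusive.
func restartWait() time.Duration {
	minWait := RunnerRestartMinWaitSeconds * time.Second
	maxWait := RunnerRestartMaxWaitSeconds * time.Second
	// Int63n(n) returns a value in [0, n), so add 1 to include maxWait itself.
	jitter := time.Duration(rand.Int63n(int64(maxWait-minWait) + 1))
	return minWait + jitter
}

func main() {
	fmt.Println("waiting", restartWait(), "before restarting the runner")
}
```

Randomizing within a range, rather than using a fixed delay, is a common way to avoid many runners restarting in lockstep.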

const (
	MinInformantProtocolVersion api.InformantProtoVersion = api.InformantProtoV1_0
	MaxInformantProtocolVersion api.InformantProtoVersion = api.InformantProtoV2_0
)

The autoscaler-agent currently supports v1.0 to v2.0 of the agent<->informant protocol.

If you update either of these values, make sure to also update VERSIONING.md.
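To illustrate what supporting a range of protocol versions can look like, here is a minimal sketch of picking a version from two overlapping ranges. The `protoVersion` type, the intermediate version names, and the `negotiate` function are all assumptions made for this example; the real types live in the api package and the actual negotiation logic is not shown on this page.

```go
package main

import "fmt"

// protoVersion is a hypothetical stand-in for api.InformantProtoVersion,
// assumed here to be an ordered integer type.
type protoVersion uint32

const (
	protoV1_0 protoVersion = iota + 1
	protoV1_1 // intermediate versions are invented for the example
	protoV1_2
	protoV2_0
)

const (
	minSupported = protoV1_0
	maxSupported = protoV2_0
)

// negotiate returns the highest version inside both our supported range and
// the peer's range [peerMin, peerMax], or false if the ranges do not overlap.
func negotiate(peerMin, peerMax protoVersion) (protoVersion, bool) {
	lo, hi := minSupported, maxSupported
	if peerMin > lo {
		lo = peerMin
	}
	if peerMax < hi {
		hi = peerMax
	}
	if lo > hi {
		return 0, false
	}
	return hi, true
}

func main() {
	v, ok := negotiate(protoV1_1, protoV1_2)
	fmt.Println(v, ok)
}
```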

const PluginProtocolVersion api.PluginProtoVersion = api.PluginProtoV2_0

PluginProtocolVersion is the current version of the agent<->scheduler plugin protocol in use by this autoscaler-agent.

Currently, each autoscaler-agent supports only one version at a time. In the future, this may change.

Variables

var (
	InformantServerAlreadyExitedError error = errors.New("Informant server has already exited")
	InformantServerSuspendedError     error = errors.New("Informant server is currently suspended")
	InformantServerUnconfirmedError   error = errors.New("Informant server has not yet been confirmed")
	InformantServerNotCurrentError    error = errors.New("Informant server has been replaced")
)

Functions

func IsNormalInformantError

func IsNormalInformantError(err error) bool

IsNormalInformantError returns true if the error is one of the "expected" errors that can occur in valid exchanges - due to unavoidable raciness or otherwise.
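A check like this is typically written with errors.Is so that wrapped errors still match. The sketch below assumes that all four sentinel errors listed under Variables count as "expected"; the page does not confirm which ones the real function accepts, and `isNormalInformantError` is a local reimplementation for illustration only.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the sentinel errors documented above.
var (
	InformantServerAlreadyExitedError = errors.New("Informant server has already exited")
	InformantServerSuspendedError     = errors.New("Informant server is currently suspended")
	InformantServerUnconfirmedError   = errors.New("Informant server has not yet been confirmed")
	InformantServerNotCurrentError    = errors.New("Informant server has been replaced")
)

// isNormalInformantError is an illustrative sketch. It assumes every sentinel
// above is an "expected" error, which the documentation does not state.
func isNormalInformantError(err error) bool {
	expected := []error{
		InformantServerAlreadyExitedError,
		InformantServerSuspendedError,
		InformantServerUnconfirmedError,
		InformantServerNotCurrentError,
	}
	for _, target := range expected {
		// errors.Is also matches errors wrapped with %w.
		if errors.Is(err, target) {
			return true
		}
	}
	return false
}

func main() {
	wrapped := fmt.Errorf("downscale failed: %w", InformantServerSuspendedError)
	fmt.Println(isNormalInformantError(wrapped))
	fmt.Println(isNormalInformantError(errors.New("connection refused")))
}
```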

Types

type AtomicUpdateState added in v0.17.7

type AtomicUpdateState struct {
	ComputeUnit      api.Resources
	Metrics          api.Metrics
	VM               api.VmInfo
	LastApproved     api.Resources
	RequestedUpscale api.MoreResources
	Config           api.ScalingConfig
}

AtomicUpdateState holds some pre-validated data for (*Runner).updateVMResources, fetched atomically (i.e. all at once, while holding r.lock) with the (*Runner).atomicState method

Because atomicState is able to return nil when there isn't yet enough information to update the VM's resources, some validation is already guaranteed by representing the data without pointers.

func (*AtomicUpdateState) DesiredVMState added in v0.17.7

func (s *AtomicUpdateState) DesiredVMState(allowDecrease bool) api.Resources

DesiredVMState calculates what the resource allocation to the VM should be, given the metrics and current state.

type Config

type Config struct {
	Scaling   ScalingConfig    `json:"scaling"`
	Informant InformantConfig  `json:"informant"`
	Metrics   MetricsConfig    `json:"metrics"`
	Scheduler SchedulerConfig  `json:"scheduler"`
	Monitor   MonitorConfig    `json:"monitor"`
	Billing   *billing.Config  `json:"billing,omitempty"`
	DumpState *DumpStateConfig `json:"dumpState"`
}

func ReadConfig

func ReadConfig(path string) (*Config, error)
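The json tags on Config indicate that the configuration is JSON. As a rough sketch of decoding such a document, the example below uses a trimmed-down local copy of two of the sub-structs. The sample values, the `parseConfig` helper, and the choice to reject unknown fields are assumptions; the behaviour of the real ReadConfig is not documented here.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// Trimmed-down local copies of the documented config structs.
type dumpStateConfig struct {
	Port           uint16 `json:"port"`
	TimeoutSeconds uint   `json:"timeoutSeconds"`
}

type metricsConfig struct {
	LoadMetricPrefix       string `json:"loadMetricPrefix"`
	RequestTimeoutSeconds  uint   `json:"requestTimeoutSeconds"`
	SecondsBetweenRequests uint   `json:"secondsBetweenRequests"`
}

type config struct {
	Metrics   metricsConfig    `json:"metrics"`
	DumpState *dumpStateConfig `json:"dumpState"`
}

// sampleConfig holds made-up values, for illustration only.
const sampleConfig = `{
  "metrics": {"loadMetricPrefix": "host_", "requestTimeoutSeconds": 2, "secondsBetweenRequests": 5},
  "dumpState": {"port": 10298, "timeoutSeconds": 5}
}`

// parseConfig is a hypothetical helper that decodes a JSON config document.
func parseConfig(data []byte) (*config, error) {
	dec := json.NewDecoder(bytes.NewReader(data))
	// Assumption: misspelled field names should fail loudly rather than be ignored.
	dec.DisallowUnknownFields()
	var c config
	if err := dec.Decode(&c); err != nil {
		return nil, fmt.Errorf("error decoding JSON config: %w", err)
	}
	return &c, nil
}

func main() {
	c, err := parseConfig([]byte(sampleConfig))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("metrics prefix %q, dump-state port %d\n", c.Metrics.LoadMetricPrefix, c.DumpState.Port)
}
```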

type Dispatcher added in v0.16.3

type Dispatcher struct {
	// contains filtered or unexported fields
}

The Dispatcher is the main object managing the websocket connection to the monitor. For more information on the protocol, see pkg/api/types.go

func NewDispatcher added in v0.16.3

func NewDispatcher(
	ctx context.Context,
	logger *zap.Logger,
	addr string,
	parent *InformantServer,
) (*Dispatcher, error)

Create a new Dispatcher, establishing a connection with the informant. Note that this does not immediately start the Dispatcher. Call Run() to start it.

func (*Dispatcher) Call added in v0.16.3

func (disp *Dispatcher) Call(
	ctx context.Context,
	timeout time.Duration,
	messageType string,
	message any,
) (*MonitorResult, error)

Make a request to the monitor and wait for a response. The value passed as message must be a valid value to send to the monitor. See the docs for SerializeInformantMessage for more.

func (*Dispatcher) HandleMessage added in v0.16.3

func (disp *Dispatcher) HandleMessage(
	ctx context.Context,
	logger *zap.Logger,
	handlers messageHandlerFuncs,
) error

Handle messages from the monitor. Make sure that all message types the monitor can send are included in the inner switch statement.

type DumpStateConfig added in v0.5.0

type DumpStateConfig struct {
	// Port is the port to serve on
	Port uint16 `json:"port"`
	// TimeoutSeconds gives the maximum duration, in seconds, that we allow for a request to dump
	// internal state.
	TimeoutSeconds uint `json:"timeoutSeconds"`
}

DumpStateConfig configures the endpoint to dump all internal state

type EnvArgs

type EnvArgs struct {
	// ConfigPath gives the path to read static configuration from. It is taken from the CONFIG_PATH
	// environment variable.
	ConfigPath string

	// K8sNodeName is the Kubernetes node the autoscaler agent is running on. It is taken from the
	// K8S_NODE_NAME environment variable, which is set equal to the pod's Spec.NodeName.
	//
	// The Kubernetes documentation doesn't say this, but the NodeName is always populated with the
	// final node the pod was placed on by the time the environment variables are set.
	K8sNodeName string

	// K8sPodIP is the IP address of the Kubernetes pod that this autoscaler-agent is running in
	K8sPodIP string
}

EnvArgs stores the static configuration data assigned to the autoscaler agent by its environment

func ArgsFromEnv

func ArgsFromEnv() (EnvArgs, error)
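A minimal sketch of populating a struct like EnvArgs from the environment follows. CONFIG_PATH and K8S_NODE_NAME come from the field comments above; the K8S_POD_IP variable name, the rule that every variable is required, and the `argsFromLookup` helper are assumptions made for this example.

```go
package main

import (
	"fmt"
	"os"
)

// envArgs is a local copy of the documented EnvArgs struct.
type envArgs struct {
	ConfigPath  string
	K8sNodeName string
	K8sPodIP    string
}

// argsFromLookup is a hypothetical helper. It takes the lookup function as a
// parameter (os.LookupEnv in production) so that it is easy to test.
func argsFromLookup(lookup func(string) (string, bool)) (envArgs, error) {
	var args envArgs
	fields := []struct {
		name string
		dest *string
	}{
		{"CONFIG_PATH", &args.ConfigPath},
		{"K8S_NODE_NAME", &args.K8sNodeName},
		{"K8S_POD_IP", &args.K8sPodIP}, // assumed name; not given in the docs
	}
	for _, f := range fields {
		value, ok := lookup(f.name)
		if !ok || value == "" {
			return envArgs{}, fmt.Errorf("missing required environment variable %s", f.name)
		}
		*f.dest = value
	}
	return args, nil
}

func main() {
	args, err := argsFromLookup(os.LookupEnv)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", args)
}
```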

type HttpMuxServer added in v0.15.0

type HttpMuxServer struct {
	// contains filtered or unexported fields
}

HttpMuxServer provides a way to multiplex multiple http.ServeMux instances on a single HTTP server, so that requests are routed to the correct ServeMux based on a prefix at the beginning of the URL path.

For example, if you call RegisterMux("foo", myMux), then all requests to "/foo/*" are routed to the myMux instance.

(Why is this needed? You could register all the routes directly in one giant http.ServeMux, but http.ServeMux doesn't provide any way to unregister patterns.)
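The idea can be sketched with a map from prefix to http.ServeMux, guarded by a lock so that muxes can be added and removed while the server is running. The `prefixMux` type below is a simplified, hypothetical stand-in for HttpMuxServer. Stripping the prefix before delegating and rejecting duplicate IDs are assumptions, not documented behaviour.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
)

// prefixMux is a hypothetical, simplified stand-in for HttpMuxServer.
type prefixMux struct {
	mu    sync.RWMutex
	muxes map[string]*http.ServeMux
}

func newPrefixMux() *prefixMux {
	return &prefixMux{muxes: make(map[string]*http.ServeMux)}
}

func (p *prefixMux) RegisterMux(id string, sub *http.ServeMux) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, ok := p.muxes[id]; ok {
		return fmt.Errorf("mux %q is already registered", id)
	}
	p.muxes[id] = sub
	return nil
}

func (p *prefixMux) UnregisterMux(id string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.muxes, id)
}

// ServeHTTP routes "/<id>/<rest>" to the mux registered under <id>.
func (p *prefixMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	id, rest, _ := strings.Cut(strings.TrimPrefix(r.URL.Path, "/"), "/")
	p.mu.RLock()
	sub, ok := p.muxes[id]
	p.mu.RUnlock()
	if !ok {
		http.NotFound(w, r)
		return
	}
	// Assumption: the sub-mux sees paths relative to its own root.
	r2 := r.Clone(r.Context())
	r2.URL.Path = "/" + rest
	r2.URL.RawPath = ""
	sub.ServeHTTP(w, r2)
}

// newStatusMux builds a small example mux with a single "/status" route.
func newStatusMux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/status", func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprintln(w, "ok")
	})
	return mux
}

// statusFor sends a GET for path through h and returns the status code.
func statusFor(h http.Handler, path string) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	server := newPrefixMux()
	_ = server.RegisterMux("foo", newStatusMux())
	fmt.Println("registered:", statusFor(server, "/foo/status"))
	server.UnregisterMux("foo")
	fmt.Println("unregistered:", statusFor(server, "/foo/status"))
}
```

Because the map entry is simply deleted on UnregisterMux, routes disappear without rebuilding a shared http.ServeMux, which is the limitation the parenthetical above describes.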

func StartHttpMuxServer added in v0.15.0

func StartHttpMuxServer(
	logger *zap.Logger,
	port int,
) (*HttpMuxServer, error)

Create a new HttpMuxServer, listening on the given port

func (*HttpMuxServer) RegisterMux added in v0.15.0

func (s *HttpMuxServer) RegisterMux(muxID string, subServer *http.ServeMux) error

func (*HttpMuxServer) ServeHTTP added in v0.15.0

func (s *HttpMuxServer) ServeHTTP(w http.ResponseWriter, r *http.Request)

Implements http.Handler

func (*HttpMuxServer) UnregisterMux added in v0.15.0

func (s *HttpMuxServer) UnregisterMux(muxID string)

type InformantConfig

type InformantConfig struct {
	// ServerPort is the port that the VM informant serves from
	ServerPort uint16 `json:"serverPort"`

	// CallbackPort is the port that the agent listens on for informant -> agent requests
	CallbackPort int `json:"callbackPort"`

	// RetryServerMinWaitSeconds gives the minimum duration, in seconds, that we must wait between the
	// start of one InformantServer and the next
	//
	// This "minimum wait" is only used when the
	RetryServerMinWaitSeconds uint `json:"retryServerMinWaitSeconds"`
	// RetryServerNormalWaitSeconds gives the typical duration, in seconds, that we wait between an
	// InformantServer failing and our retry.
	RetryServerNormalWaitSeconds uint `json:"retryServerNormalWaitSeconds"`
	// RegisterRetrySeconds gives the duration, in seconds, to wait between retrying a failed
	// register request.
	RegisterRetrySeconds uint `json:"registerRetrySeconds"`

	// RequestTimeoutSeconds gives the timeout for any individual request to the informant, except
	// for those with separately-defined values below.
	RequestTimeoutSeconds uint `json:"requestTimeoutSeconds"`
	// RegisterTimeoutSeconds gives the timeout duration, in seconds, for a register request.
	//
	// This is a separate field from RequestTimeoutSeconds because registering may require that the
	// informant suspend a previous agent, which could take longer.
	RegisterTimeoutSeconds uint `json:"registerTimeoutSeconds"`
	// DownscaleTimeoutSeconds gives the timeout duration, in seconds, for a downscale request.
	//
	// This is a separate field from RequestTimeoutSeconds because it's possible that downscaling may
	// require some non-trivial work that we want to allow to complete.
	DownscaleTimeoutSeconds uint `json:"downscaleTimeoutSeconds"`

	// UnhealthyAfterSilenceDurationSeconds gives the duration, in seconds, after which failing to
	// receive a successful request from the informant indicates that it is probably unhealthy.
	UnhealthyAfterSilenceDurationSeconds uint `json:"unhealthyAfterSilenceDurationSeconds"`
	// UnhealthyStartupGracePeriodSeconds gives the duration, in seconds, after which we will no
	// longer excuse total VM informant failures - i.e. when unhealthyAfterSilenceDurationSeconds
	// kicks in.
	UnhealthyStartupGracePeriodSeconds uint `json:"unhealthyStartupGracePeriodSeconds"`
}

type InformantServer

type InformantServer struct {
	// contains filtered or unexported fields
}

func NewInformantServer

func NewInformantServer(
	ctx context.Context,
	logger *zap.Logger,
	runner *Runner,
	updatedInformant util.CondChannelSender,
	upscaleRequested util.CondChannelSender,
) (*InformantServer, util.SignalReceiver[struct{}], error)

NewInformantServer starts an InformantServer, returning it and a signal receiver that will be signalled when it exits.

func (*InformantServer) Downscale

func (s *InformantServer) Downscale(ctx context.Context, logger *zap.Logger, to api.Resources) (*api.DownscaleResult, error)

Downscale makes a request to the informant's /downscale endpoint with the api.Resources

This method MUST be called while holding s.requestLock AND NOT s.runner.lock

func (*InformantServer) ExitStatus

func (s *InformantServer) ExitStatus() *InformantServerExitStatus

ExitStatus returns the InformantServerExitStatus associated with the server, if it has been instructed to exit

This method MUST NOT be called while holding s.runner.lock.

func (*InformantServer) HealthCheck added in v0.7.0

func (s *InformantServer) HealthCheck(ctx context.Context, logger *zap.Logger) (*api.InformantHealthCheckResp, error)

HealthCheck makes a request to the informant's /health-check endpoint, using the server's ID.

This method MUST be called while holding s.requestLock AND NOT s.runner.lock.

func (*InformantServer) MonitorDownscale added in v0.16.3

func (s *InformantServer) MonitorDownscale(ctx context.Context, logger *zap.Logger, to api.Resources) (*api.DownscaleResult, error)

MonitorDownscale is the equivalent of (*InformantServer).Downscale for when we're connected to a monitor.

This method MUST be called while holding s.requestLock AND NOT s.runner.lock

*Note*: Locking requestLock is not technically necessary, but it allows for better serialization. For example, if this function exits, we know that no other dispatcher calls will be made.

func (*InformantServer) MonitorHealthCheck added in v0.16.3

func (s *InformantServer) MonitorHealthCheck(ctx context.Context, logger *zap.Logger) error

MonitorHealthCheck is the equivalent of (*InformantServer).HealthCheck for when we're connected to a monitor.

This method MUST be called while holding s.requestLock AND NOT s.runner.lock

*Note*: Locking requestLock is not technically necessary, but it allows for better serialization. For example, if this function exits, we know that no other dispatcher calls will be made.

func (*InformantServer) MonitorUpscale added in v0.16.3

func (s *InformantServer) MonitorUpscale(ctx context.Context, logger *zap.Logger, to api.Resources) error

MonitorUpscale is the equivalent of (*InformantServer).Upscale for when we're connected to a monitor.

This method MUST be called while holding s.requestLock AND NOT s.runner.lock

*Note*: Locking requestLock is not technically necessary, but it allows for better serialization. For example, if this function exits, we know that no other dispatcher calls will be made.

func (*InformantServer) RegisterWithInformant

func (s *InformantServer) RegisterWithInformant(ctx context.Context, logger *zap.Logger) error

RegisterWithInformant sends a /register request to the VM Informant

If called after a prior success, this method will panic. If the server has already exited, this method will return InformantServerAlreadyExitedError.

On certain errors, this method will force the server to exit. This can be checked by calling s.ExitStatus() and checking for a non-nil result.

This method MUST NOT be called while holding s.requestLock OR s.runner.lock.

func (*InformantServer) Upscale

func (s *InformantServer) Upscale(ctx context.Context, logger *zap.Logger, to api.Resources) error

This method MUST be called while holding s.requestLock AND NOT s.runner.lock

type InformantServerExitStatus

type InformantServerExitStatus struct {
	// Err is the error, if any, that caused the server to exit. This is only non-nil when the context
	// used to start the server becomes canceled (i.e. the Runner is exiting).
	Err error
	// RetryShouldFix is true if simply retrying should resolve err. This is true when e.g. the
	// informant responds with a 404 to a downscale or upscale request - it might've restarted, so
	// we just need to re-register.
	RetryShouldFix bool
}

type InformantServerMode

type InformantServerMode string
const (
	InformantServerUnconfirmed InformantServerMode = "unconfirmed"
	InformantServerSuspended   InformantServerMode = "suspended"
	InformantServerRunning     InformantServerMode = "running"
)

type InformantServerState

type InformantServerState struct {
	Desc            api.AgentDesc              `json:"desc"`
	SeqNum          uint64                     `json:"seqNum"`
	ReceivedIDCheck bool                       `json:"receivedIDCheck"`
	MadeContact     bool                       `json:"madeContact"`
	ProtoVersion    *api.InformantProtoVersion `json:"protoVersion"`
	Mode            InformantServerMode        `json:"mode"`
	ExitStatus      *InformantServerExitStatus `json:"exitStatus"`
}

InformantServerState is the serializable state of the InformantServer, produced by calls to the Runner's State() method.

type MainRunner

type MainRunner struct {
	EnvArgs    EnvArgs
	Config     *Config
	KubeClient *kubernetes.Clientset
	VMClient   *vmclient.Clientset
}

func (MainRunner) Run

func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error

type MetricsConfig

type MetricsConfig struct {
	// LoadMetricPrefix is the prefix at the beginning of the load metrics that we use. For
	// node_exporter, this is "node_", and for vector it's "host_"
	LoadMetricPrefix string `json:"loadMetricPrefix"`
	// RequestTimeoutSeconds gives the timeout duration, in seconds, for metrics requests
	RequestTimeoutSeconds uint `json:"requestTimeoutSeconds"`
	// SecondsBetweenRequests sets the number of seconds to wait between metrics requests
	SecondsBetweenRequests uint `json:"secondsBetweenRequests"`
}

MetricsConfig defines a few parameters for metrics requests to the VM

type MonitorConfig added in v0.16.3

type MonitorConfig struct {
	ResponseTimeoutSeconds uint `json:"responseTimeoutSeconds"`
	// ConnectionTimeoutSeconds gives how long we may take to connect to the
	// monitor before cancelling.
	ConnectionTimeoutSeconds uint `json:"connectionTimeoutSeconds"`
}

type MonitorResult added in v0.16.3

type MonitorResult struct {
	Result       *api.DownscaleResult
	Confirmation *api.UpscaleConfirmation
	HealthCheck  *api.HealthCheck
}

MonitorResult represents the result of a (*Dispatcher).Call. Because the SignalSender passed in can only be generic over one type, this struct serves as a mock enum. Only one field should ever be non-nil, and it should always be clear which field is readable. For example, a caller that sends a HealthCheck message through Call should only read the HealthCheck field.
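One way to make the "exactly one field is non-nil" convention checkable is a small helper that reports which variant is set. The sketch below uses local placeholder types instead of the real api types, and the `kind` method is hypothetical.

```go
package main

import "fmt"

// Placeholder payload types; the real ones are defined in the api package.
type downscaleResult struct{ Ok bool }
type upscaleConfirmation struct{}
type healthCheck struct{}

// monitorResult mirrors the documented struct: a struct used as a mock enum.
type monitorResult struct {
	Result       *downscaleResult
	Confirmation *upscaleConfirmation
	HealthCheck  *healthCheck
}

// kind is a hypothetical helper that names the single populated field, or
// returns an error if zero or several fields are set.
func (r monitorResult) kind() (string, error) {
	var set []string
	if r.Result != nil {
		set = append(set, "Result")
	}
	if r.Confirmation != nil {
		set = append(set, "Confirmation")
	}
	if r.HealthCheck != nil {
		set = append(set, "HealthCheck")
	}
	if len(set) != 1 {
		return "", fmt.Errorf("expected exactly one field to be set, got %d", len(set))
	}
	return set[0], nil
}

func main() {
	res := monitorResult{HealthCheck: &healthCheck{}}
	fmt.Println(res.kind())
}
```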

type PromMetrics added in v0.6.0

type PromMetrics struct {
	// contains filtered or unexported fields
}

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner is the per-VM Pod god object responsible for handling everything

It primarily operates as a source of shared data for a number of long-running tasks. For additional general information, refer to the comment at the top of this file.

func (*Runner) Run

func (r *Runner) Run(ctx context.Context, logger *zap.Logger, vmInfoUpdated util.CondChannelReceiver) error

Run is the main entrypoint to the long-running per-VM pod tasks

func (*Runner) Spawn

func (r *Runner) Spawn(ctx context.Context, logger *zap.Logger, vmInfoUpdated util.CondChannelReceiver)

func (*Runner) State

func (r *Runner) State(ctx context.Context) (*RunnerState, error)

type RunnerState

type RunnerState struct {
	PodIP                 string                `json:"podIP"`
	VM                    api.VmInfo            `json:"vm"`
	LastMetrics           *api.Metrics          `json:"lastMetrics"`
	RequestedUpscale      api.MoreResources     `json:"requestedUpscale"`
	Scheduler             *SchedulerState       `json:"scheduler"`
	Server                *InformantServerState `json:"server"`
	Informant             *api.InformantDesc    `json:"informant"`
	ComputeUnit           *api.Resources        `json:"computeUnit"`
	LastApproved          *api.Resources        `json:"lastApproved"`
	LastSchedulerError    error                 `json:"lastSchedulerError"`
	LastInformantError    error                 `json:"lastInformantError"`
	BackgroundWorkerCount int64                 `json:"backgroundWorkerCount"`

	SchedulerRespondedWithMigration bool `json:"migrationStarted"`
}

RunnerState is the serializable state of the Runner, extracted by its State method

type ScalingConfig

type ScalingConfig struct {
	// RequestTimeoutSeconds gives the timeout duration, in seconds, for VM patch requests
	RequestTimeoutSeconds uint `json:"requestTimeoutSeconds"`
	// DefaultConfig gives the default scaling config, to be used if there is no configuration
	// supplied with the "autoscaling.neon.tech/config" annotation.
	DefaultConfig api.ScalingConfig `json:"defaultConfig"`
}

ScalingConfig defines the scheduling we use for scaling up and down

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler stores relevant state for a particular scheduler that a Runner is (or has been) connected to

func (*Scheduler) DoRequest

func (s *Scheduler) DoRequest(ctx context.Context, logger *zap.Logger, reqData *api.AgentRequest) (*api.PluginResponse, error)

DoRequest implements all of the tricky logic for requests sent to the scheduler plugin

This method checks:

  • That the response is semantically valid
  • That the response matches with the state of s.runner.vm, if s.runner.scheduler == s

This method may set:

  • s.fatalError
  • s.runner.{computeUnit,lastApproved,lastSchedulerError,schedulerRespondedWithMigration}, if s.runner.scheduler == s.

This method MAY ALSO call s.runner.shutdown(), if s.runner.scheduler == s.

This method MUST be called while holding s.runner.requestLock AND NOT s.runner.lock.

func (*Scheduler) Register

func (s *Scheduler) Register(ctx context.Context, logger *zap.Logger, signalOk func()) error

Register performs the initial request required to register with a scheduler

This method is called immediately after the Scheduler is created, and may be called subsequent times if the initial request fails.

signalOk will be called if the request succeeds, with s.runner.lock held - but only if s.runner.scheduler == s.

This method MUST be called while holding s.runner.requestLock AND NOT s.runner.lock

type SchedulerConfig

type SchedulerConfig struct {
	// SchedulerName is the name of the scheduler we're expecting to communicate with.
	//
	// Any VMs that don't have a matching Spec.SchedulerName will not be autoscaled.
	SchedulerName string `json:"schedulerName"`
	// RequestTimeoutSeconds gives the timeout duration, in seconds, for requests to the scheduler
	//
	// If zero, requests will have no timeout.
	RequestTimeoutSeconds uint `json:"requestTimeoutSeconds"`
	// RequestPort defines the port to access the scheduler's ✨special✨ API with
	RequestPort uint16 `json:"requestPort"`
}

SchedulerConfig defines a few parameters for scheduler requests
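The "if zero, requests will have no timeout" rule for RequestTimeoutSeconds can be expressed by choosing between context.WithCancel and context.WithTimeout. This is only a sketch of that convention; `requestContext` and `hasDeadline` are hypothetical helpers, and the real request code is not shown on this page.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// requestContext is a hypothetical helper. A timeout of zero seconds yields a
// context without a deadline; any other value sets one.
func requestContext(parent context.Context, timeoutSeconds uint) (context.Context, context.CancelFunc) {
	if timeoutSeconds == 0 {
		return context.WithCancel(parent)
	}
	return context.WithTimeout(parent, time.Duration(timeoutSeconds)*time.Second)
}

// hasDeadline reports whether a context built with the given timeout carries
// a deadline.
func hasDeadline(timeoutSeconds uint) bool {
	ctx, cancel := requestContext(context.Background(), timeoutSeconds)
	defer cancel()
	_, ok := ctx.Deadline()
	return ok
}

func main() {
	fmt.Println("timeout 0 has deadline:", hasDeadline(0))
	fmt.Println("timeout 5 has deadline:", hasDeadline(5))
}
```

Returning a cancel function in both branches lets callers always write `defer cancel()` without caring which case applied.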

type SchedulerState

type SchedulerState struct {
	Info       schedwatch.SchedulerInfo `json:"info"`
	Registered bool                     `json:"registered"`
	FatalError error                    `json:"fatalError"`
}

SchedulerState is the state of a Scheduler, constructed as part of a Runner's State method

type StateDump added in v0.5.0

type StateDump struct {
	Stopped   bool           `json:"stopped"`
	BuildInfo util.BuildInfo `json:"buildInfo"`
	Pods      []podStateDump `json:"pods"`
}

type VMUpdateReason

type VMUpdateReason string

VMUpdateReason provides context to (*Runner).updateVMResources about why an update to the VM's resources has been requested

const (
	UpdatedMetrics      VMUpdateReason = "metrics"
	UpscaleRequested    VMUpdateReason = "upscale requested"
	RegisteredScheduler VMUpdateReason = "scheduler"
	UpdatedVMInfo       VMUpdateReason = "updated VM info"
)
