agent

package
v0.1.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 26, 2023 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MinInformantProtocolVersion api.InformantProtoVersion = api.InformantProtoV1_0
	MaxInformantProtocolVersion api.InformantProtoVersion = api.InformantProtoV1_1
)

The autoscaler-agent currently supports v1.0 to v1.1 of the agent<->informant protocol.

If you update either of these values, make sure to also update VERSIONING.md.

View Source
const PluginProtocolVersion api.PluginProtoVersion = api.PluginProtoV1_0

PluginProtocolVersion is the current version of the agent<->scheduler plugin in use by this autoscaler-agent.

Currently, each autoscaler-agent supports only one version at a time. In the future, this may change.

Variables

View Source
var (
	InformantServerAlreadyExitedError error = errors.New("Informant server has already exited")
	InformantServerSuspendedError     error = errors.New("Informant server is currently suspended")
	InformantServerUnconfirmedError   error = errors.New("Informant server has not yet been confirmed")
	InformantServerNotCurrentError    error = errors.New("Informant server has been replaced")
)

Functions

func IsNormalInformantError

func IsNormalInformantError(err error) bool

IsNormalInformantError returns true if the error is one of the "expected" errors that can occur in valid exchanges - due to unavoidable raciness or otherwise.

Types

type Config

type Config struct {
	Scaling   ScalingConfig   `json:"scaling"`
	Informant InformantConfig `json:"informant"`
	Metrics   MetricsConfig   `json:"metrics"`
	Scheduler SchedulerConfig `json:"scheduler"`
}

func ReadConfig

func ReadConfig(path string) (*Config, error)

type EnvArgs

type EnvArgs struct {
	// ConfigPath gives the path to read static configuration from. It is taken from the CONFIG_PATH
	// environment variable.
	ConfigPath string

	// K8sNodeName is the Kubernetes node the autoscaler agent is running on. It is taken from the
	// K8S_NODE_NAME environment variable, which is set equal to the pod's Spec.NodeName.
	//
	// The Kubernetes documentation doesn't say this, but the NodeName is always populated with the
	// final node the pod was placed on by the time the environment variables are set.
	K8sNodeName string

	// K8sPodIP is the IP address of the Kubernetes pod that this autoscaler-agent is running in
	K8sPodIP string
}

EnvArgs stores the static configuration data assigned to the autoscaler agent by its environment

func ArgsFromEnv

func ArgsFromEnv() (EnvArgs, error)

type InformantConfig

type InformantConfig struct {
	// ServerPort is the port that the VM informant serves from
	ServerPort uint16 `json:"serverPort"`

	// RetryServerMinWaitSeconds gives the minimum duration, in seconds, that we must wait between the
	// start of one InformantServer and the next
	//
	// This "minimum wait" is only used when thethe
	RetryServerMinWaitSeconds uint `json:"retryServerMinWaitSeconds"`
	// RetryServerNormalWaitSeconds gives the typical duration, in seconds, that we wait between an
	// InformantServer failing and our retry.
	RetryServerNormalWaitSeconds uint `json:"retryServerNormalWaitSeconds"`
	// RegisterRetrySeconds gives the duration, in seconds, to wait between retrying a failed
	// register request.
	RegisterRetrySeconds uint `json:"registerRetrySeconds"`

	// RequestTimeoutSeconds gives the timeout for any individual request to the informant, except
	// for those with separately-defined values below.
	RequestTimeoutSeconds uint `json:"requestTimeoutSeconds"`
	// RegisterTimeoutSeconds gives the timeout duration, in seconds, for a register request.
	//
	// This is a separate field from RequestTimeoutSeconds because registering may require that the
	// informant suspend a previous agent, which could take longer.
	RegisterTimeoutSeconds uint `json:"registerTimeoutSeconds"`
	// DownscaleTimeoutSeconds gives the timeout duration, in seconds, for a downscale request.
	//
	// This is a separate field from RequestTimeoutSeconds it's possible that downscaling may
	// require some non-trivial work that we want to allow to complete.
	DownscaleTimeoutSeconds uint `json:"downscaleTimeoutSeconds"`
}

type InformantServer

type InformantServer struct {
	// contains filtered or unexported fields
}

func NewInformantServer

func NewInformantServer(
	ctx context.Context,
	runner *Runner,
	updatedInformant util.CondChannelSender,
	upscaleRequested util.CondChannelSender,
) (*InformantServer, util.SignalReceiver, error)

NewInformantServer starts an InformantServer, returning it and a signal receiver that will be signalled when it exits.

func (*InformantServer) Downscale

Downscale makes a request to the informant's /downscale endpoit with the api.Resources

This method MUST NOT be called while holding i.server.runner.lock OR i.server.requestLock.

func (*InformantServer) ExitStatus

func (s *InformantServer) ExitStatus() *InformantServerExitStatus

ExitStatus returns the InformantServerExitStatus associated with the server, if it has been instructed to exit

This method MUST NOT be called while holding s.runner.lock.

func (*InformantServer) RegisterWithInformant

func (s *InformantServer) RegisterWithInformant(ctx context.Context) error

RegisterWithInformant sends a /register request to the VM Informant

If called after a prior success, this method will panic. If the server has already exited, this method will return InformantServerAlreadyExitedError.

On certain errors, this method will force the server to exit. This can be checked by calling s.ExitStatus() and checking for a non-nil result.

This method MUST NOT be called while holding s.requestLock OR s.runner.lock.

func (*InformantServer) Upscale

func (s *InformantServer) Upscale(ctx context.Context, to api.Resources) error

func (*InformantServer) Valid

func (s *InformantServer) Valid() error

Valid checks if the InformantServer is good to use for communication, returning an error if not

This method can return errors for a number of unavoidably-racy protocol states - errors from this method should be handled as unusual, but not unexpected. Any error returned will be one of InformantServer{AlreadyExited,Suspended,Confirmed}Error.

This method MUST be called while holding s.runner.lock.

type InformantServerExitStatus

type InformantServerExitStatus struct {
	// Err is the non-nil error that caused the server to exit
	Err error
	// RetryShouldFix is true if simply retrying should resolve err. This is true when e.g. the
	// informant responds with a 404 to a downscale or upscale request - it might've restarted, so
	// we just need to re-register.
	RetryShouldFix bool
}

type InformantServerMode

type InformantServerMode string
const (
	InformantServerUnconfirmed InformantServerMode = "unconfirmed"
	InformantServerSuspended   InformantServerMode = "suspended"
	InformantServerRunning     InformantServerMode = "running"
)

type InformantServerState

type InformantServerState struct {
	Desc            api.AgentDesc              `json:"desc"`
	SeqNum          uint64                     `json:"seqNum"`
	ReceivedIDCheck bool                       `json:"receivedIDCheck"`
	MadeContact     bool                       `json:"madeContact"`
	ProtoVersion    *api.InformantProtoVersion `json:"protoVersion"`
	Mode            InformantServerMode        `json:"mode"`
	ExitStatus      *InformantServerExitStatus `json:"exitStatus"`
}

InformantServerState is the serializable state of the InformantServer, produced by calls to the Runner's State() method.

type MainRunner

type MainRunner struct {
	EnvArgs    EnvArgs
	Config     *Config
	KubeClient *kubernetes.Clientset
	VMClient   *vmclient.Clientset
}

func (MainRunner) Run

func (r MainRunner) Run() error

type MetricsConfig

type MetricsConfig struct {
	// LoadMetricPrefix is the prefix at the beginning of the load metrics that we use. For
	// node_exporter, this is "node_", and for vector it's "host_"
	LoadMetricPrefix string `json:"loadMetricPrefix"`
	// RequestTimeoutSeconds gives the timeout duration, in seconds, for metrics requests
	RequestTimeoutSeconds uint `json:"requestTimeoutSeconds"`
	// SecondsBetweenRequests sets the number of seconds to wait between metrics requests
	SecondsBetweenRequests uint `json:"secondsBetweenRequests"`
}

MetricsConfig defines a few parameters for metrics requests to the VM

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner is per-VM Pod god object responsible for handling everything

It primarily operates as a source of shared data for a number of long-running tasks. For additional general information, refer to the comment at the top of this file.

func (*Runner) Run

func (r *Runner) Run(ctx context.Context, vmName string) error

Run is the main entrypoint to the long-running per-VM pod tasks

func (*Runner) Spawn

func (r *Runner) Spawn(ctx context.Context, vmName string)

func (*Runner) State

func (r *Runner) State() RunnerState

type RunnerLogger

type RunnerLogger struct {
	// contains filtered or unexported fields
}

func (RunnerLogger) Errorf

func (l RunnerLogger) Errorf(format string, args ...interface{})

func (RunnerLogger) Fatalf

func (l RunnerLogger) Fatalf(format string, args ...interface{})

func (RunnerLogger) Infof

func (l RunnerLogger) Infof(format string, args ...interface{})

func (RunnerLogger) Warningf

func (l RunnerLogger) Warningf(format string, args ...interface{})

type RunnerState

type RunnerState struct {
	LogPrefix             string                `json:"logPrefix"`
	PodIP                 string                `json:"podIP"`
	VM                    api.VmInfo            `json:"vm"`
	LastMetrics           *api.Metrics          `json:"lastMetrics"`
	Scheduler             *SchedulerState       `json:"scheduler"`
	Server                *InformantServerState `json:"server"`
	Informant             *api.InformantDesc    `json:"informant"`
	ComputeUnit           *api.Resources        `json:"computeUnit"`
	LastApproved          *api.Resources        `json:"lastApproved"`
	LastSchedulerError    error                 `json:"lastSchedulerError"`
	LastInformantError    error                 `json:"lastInformantError"`
	BackgroundWorkerCount int64                 `json:"backgroundWorkerCount"`
}

RunnerState is the serializable state of the Runner, extracted by its State method

type ScalingConfig

type ScalingConfig struct {
	// RequestTimeoutSeconds gives the timeout duration, in seconds, for VM patch requests
	RequestTimeoutSeconds uint `json:"requestTimeoutSeconds"`
}

ScalingConfig defines the scheduling we use for scaling up and down

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler stores relevant state for a particular scheduler that a Runner is (or has) connected to

Each Scheduler has an associated background worker that checks for deadlocks and panics if it cannot acquire requestLock within a reasonable delay.

Scheduler has some magic to clean up its deadlock checker. Here's what's going on: Basically, a pointer to schedulerInternal is expected to function /kind of/ like a weak pointer to Scheduler. All non-background usage is through Scheduler, and all background usage is through schedulerInternal - so we attach a finalizer to Scheduler that cancels the context of the background workers once the Scheduler gets GC'd.

Any direct usage of schedulerInternal outside of the construction of a Scheduler MUST be considered invalid, and rejected as such.

func (*Scheduler) DoRequest

func (s *Scheduler) DoRequest(ctx context.Context, reqData *api.AgentRequest) (*api.PluginResponse, error)

SendRequest implements all of the tricky logic for requests sent to the scheduler plugin

This method checks: * That the response is semantically valid * That the response matches with the state of s.runner.vm, if s.runner.scheduler == s

This method may set: * s.fatalError * s.runner.{computeUnit,lastApproved,lastSchedulerError}, if s.runner.scheduler == s

FIXME: when support for migration is added back, this method needs to similarly handle that.

This method MUST be called while holding s.requestLock AND NOT s.runner.lock. s.requestLock will not be released during a call to this method.

func (*Scheduler) Register

func (s *Scheduler) Register(ctx context.Context, signalOk func()) error

Register performs the initial request required to register with a scheduler

This method is called immediately after the Scheduler is created, and may be called subsequent times if the initial request fails.

signalOk will be called if the request succeeds, with s.runner.lock held - but only if s.runner.scheduler == s.

This method MUST be called while holding s.requestLock AND NOT s.runner.lock

type SchedulerConfig

type SchedulerConfig struct {
	// SchedulerName is the name of the scheduler we're expecting to communicate with.
	//
	// Any VMs that don't have a matching Spec.SchedulerName will not be autoscaled.
	SchedulerName string `json:"schedulerName"`
	// RequestTimeoutSeconds gives the timeout duration, in seconds, for requests to the scheduler
	//
	// If zero, requests will have no timeout.
	RequestTimeoutSeconds uint `json:"requestTimeoutSeconds"`
	// RequestPort defines the port to access the scheduler's ✨special✨ API with
	RequestPort uint16 `json:"requestPort"`
}

SchedulerConfig defines a few parameters for scheduler requests

type SchedulerState

type SchedulerState struct {
	LogPrefix  string        `json:"logPrefix"`
	Info       schedulerInfo `json:"info"`
	Registered bool          `json:"registered"`
	FatalError error         `json:"fatalError"`
}

SchedulerState is the state of a Scheduler, constructed as part of a Runner's State Method

type VMUpdateReason

type VMUpdateReason string

VMUpdateReason provides context to (*Runner).updateVMResources about why an update to the VM's resources has been requested

const (
	UpdatedMetrics      VMUpdateReason = "metrics"
	UpscaleRequested    VMUpdateReason = "upscale requested"
	RegisteredScheduler VMUpdateReason = "scheduler"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL