scheduler

package
v0.0.0-...-211e212 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 28, 2021 License: GPL-3.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const MaxTokens = 1000

MaxTokens is an arbitrary upper limit on the number of concurrent tasks an agent can run.

Variables

View Source
var (
	ErrNoAgents         = errors.New("No available agents can run this task")
	ErrStreamClosed     = errors.New("Task stream closed")
	ErrRequestRejected  = errors.New("The task has been rejected by the server")
	ErrInvalidToolchain = errors.New("Invalid or nil toolchain")
)
View Source
var SchedulerServerContext context.Context

Functions

func NewSchedulerServer

func NewSchedulerServer(
	ctx context.Context,
	opts ...SchedulerServerOption,
) *schedulerServer

Types

type Agent

type Agent struct {
	*sync.RWMutex

	Toolchains *metrics.Toolchains
	Stream     types.Scheduler_StreamIncomingTasksServer

	AvailableTokens chan struct{}
	LockedTokens    chan struct{}
	// contains filtered or unexported fields
}

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(
	ctx context.Context,
	tcw ToolchainWatcher,
	opts ...BrokerOption,
) *Broker

func (*Broker) CalcAgentStats

func (b *Broker) CalcAgentStats() <-chan []agentStats

func (*Broker) CalcConsumerdStats

func (b *Broker) CalcConsumerdStats() <-chan []consumerdStats

func (*Broker) GetAgent

func (b *Broker) GetAgent(uuid string) (agent *Agent, ok bool)

func (*Broker) NewAgentTaskStream

func (b *Broker) NewAgentTaskStream(
	stream types.Scheduler_StreamIncomingTasksServer,
) error

func (*Broker) NewConsumerdTaskStream

func (b *Broker) NewConsumerdTaskStream(
	stream types.Scheduler_StreamOutgoingTasksServer,
) error

func (*Broker) PreReceive

func (b *Broker) PreReceive(
	rt *route,
	req *types.CompileRequest,
) (action HookAction)

func (*Broker) TaskStats

func (b *Broker) TaskStats() taskStats

type BrokerOption

type BrokerOption func(*BrokerOptions)

func CacheClient

func CacheClient(client types.CacheClient) BrokerOption

func MonitorClient

func MonitorClient(client types.MonitorClient) BrokerOption

type BrokerOptions

type BrokerOptions struct {
	// contains filtered or unexported fields
}

func (*BrokerOptions) Apply

func (o *BrokerOptions) Apply(opts ...BrokerOption)

type Consumerd

type Consumerd struct {
	*sync.RWMutex

	Toolchains *metrics.Toolchains
	Stream     types.Scheduler_StreamOutgoingTasksServer
	// contains filtered or unexported fields
}

type HookAction

type HookAction int
const (
	ProcessRequestNormally HookAction = iota
	RejectRequest
	RequestIntercepted
)

type Optimizer

type Optimizer struct {
	// contains filtered or unexported fields
}

Optimizer is responsible for adjusting the preferred remote usage limits in real-time based on the state of the cluster.

By default, the Optimizer will set the preferred remote usage limit to be the sum of all agents' usage limits.

If a cache server is running and posts a cache usage percentage, the Optimizer will increase the remote usage limit by that percentage, to account for the fact that, on average, that percentage of tasks will never make it to an agent and will therefore not affect actual remote usage.

Each agent posts its CPU stats every second. These stats contain total CPU usage in nanoseconds and cgroup throttling info. Since we know how many tasks are running on any agent at any given time, we can use the average number of tasks running during each interval and the CPU usage for that interval to solve an optimization problem of simultaneously maximizing CPU usage (or rather, minimizing unused CPU) and minimizing throttling. Too much throttling means the cgroup is overloaded, but too little CPU usage means the agent is not being used to its full capacity.

func NewOptimizer

func NewOptimizer(
	ctx context.Context,
	client types.MonitorClient,
	broker *Broker,
) *Optimizer

func (*Optimizer) UsageLimitMultiplierChanged

func (o *Optimizer) UsageLimitMultiplierChanged() <-chan float64

type Router

type Router struct {
	// contains filtered or unexported fields
}

func NewRouter

func NewRouter(ctx context.Context, opts ...RouterOption) *Router

func (*Router) AddReceiver

func (r *Router) AddReceiver(agent *Agent) <-chan request

func (*Router) AddSender

func (r *Router) AddSender(cd *Consumerd)

func (*Router) GetRoutes

func (r *Router) GetRoutes() *types.RouteList

func (*Router) Route

func (r *Router) Route(ctx context.Context, req request) error

func (*Router) UpdateSenderToolchains

func (r *Router) UpdateSenderToolchains(
	uuid string,
	tcs *metrics.Toolchains,
)

type RouterHook

type RouterHook interface {
	PreReceive(*route, request) HookAction
}

type RouterOption

type RouterOption func(*RouterOptions)

func WithHooks

func WithHooks(hooks ...RouterHook) RouterOption

type RouterOptions

type RouterOptions struct {
	// contains filtered or unexported fields
}

func (*RouterOptions) Apply

func (o *RouterOptions) Apply(opts ...RouterOption)

type SchedulerServerOption

type SchedulerServerOption func(*SchedulerServerOptions)

func WithCacheClient

func WithCacheClient(cacheClient types.CacheClient) SchedulerServerOption

func WithMonitorClient

func WithMonitorClient(monClient types.MonitorClient) SchedulerServerOption

type SchedulerServerOptions

type SchedulerServerOptions struct {
	// contains filtered or unexported fields
}

func (*SchedulerServerOptions) Apply

type ToolchainWatcher

type ToolchainWatcher interface {
	WatchToolchains(uuid string) chan *metrics.Toolchains
}

func NewDefaultToolchainWatcher

func NewDefaultToolchainWatcher(
	ctx context.Context,
	client types.MonitorClient,
) ToolchainWatcher

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL