Documentation
¶
Index ¶
- Constants
- Variables
- func NewSchedulerServer(ctx context.Context, opts ...SchedulerServerOption) *schedulerServer
- type Agent
- type Broker
- func (b *Broker) CalcAgentStats() <-chan []agentStats
- func (b *Broker) CalcConsumerdStats() <-chan []consumerdStats
- func (b *Broker) GetAgent(uuid string) (agent *Agent, ok bool)
- func (b *Broker) NewAgentTaskStream(stream types.Scheduler_StreamIncomingTasksServer) error
- func (b *Broker) NewConsumerdTaskStream(stream types.Scheduler_StreamOutgoingTasksServer) error
- func (b *Broker) PreReceive(rt *route, req *types.CompileRequest) (action HookAction)
- func (b *Broker) TaskStats() taskStats
- type BrokerOption
- type BrokerOptions
- type Consumerd
- type HookAction
- type Optimizer
- type Router
- type RouterHook
- type RouterOption
- type RouterOptions
- type SchedulerServerOption
- type SchedulerServerOptions
- type ToolchainWatcher
Constants ¶
const MaxTokens = 1000
MaxTokens is an arbitrary upper limit on the number of concurrent tasks an agent can run.
Variables ¶
var ( ErrNoAgents = errors.New("No available agents can run this task") ErrStreamClosed = errors.New("Task stream closed") ErrRequestRejected = errors.New("The task has been rejected by the server") ErrInvalidToolchain = errors.New("Invalid or nil toolchain") )
var SchedulerServerContext context.Context
Functions ¶
func NewSchedulerServer ¶
func NewSchedulerServer( ctx context.Context, opts ...SchedulerServerOption, ) *schedulerServer
Types ¶
type Agent ¶
type Agent struct { *sync.RWMutex Toolchains *metrics.Toolchains Stream types.Scheduler_StreamIncomingTasksServer AvailableTokens chan struct{} LockedTokens chan struct{} // contains filtered or unexported fields }
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
func NewBroker ¶
func NewBroker( ctx context.Context, tcw ToolchainWatcher, opts ...BrokerOption, ) *Broker
func (*Broker) CalcAgentStats ¶
func (b *Broker) CalcAgentStats() <-chan []agentStats
func (*Broker) CalcConsumerdStats ¶
func (b *Broker) CalcConsumerdStats() <-chan []consumerdStats
func (*Broker) NewAgentTaskStream ¶
func (b *Broker) NewAgentTaskStream( stream types.Scheduler_StreamIncomingTasksServer, ) error
func (*Broker) NewConsumerdTaskStream ¶
func (b *Broker) NewConsumerdTaskStream( stream types.Scheduler_StreamOutgoingTasksServer, ) error
func (*Broker) PreReceive ¶
func (b *Broker) PreReceive( rt *route, req *types.CompileRequest, ) (action HookAction)
type BrokerOption ¶
type BrokerOption func(*BrokerOptions)
func CacheClient ¶
func CacheClient(client types.CacheClient) BrokerOption
func MonitorClient ¶
func MonitorClient(client types.MonitorClient) BrokerOption
type BrokerOptions ¶
type BrokerOptions struct {
// contains filtered or unexported fields
}
func (*BrokerOptions) Apply ¶
func (o *BrokerOptions) Apply(opts ...BrokerOption)
type Consumerd ¶
type Consumerd struct { *sync.RWMutex Toolchains *metrics.Toolchains Stream types.Scheduler_StreamOutgoingTasksServer // contains filtered or unexported fields }
type HookAction ¶
type HookAction int
const ( ProcessRequestNormally HookAction = iota RejectRequest RequestIntercepted )
type Optimizer ¶
type Optimizer struct {
// contains filtered or unexported fields
}
Optimizer is responsible for adjusting the preferred remote usage limits in real-time based on the state of the cluster.
By default, the Optimizer will set the preferred remote usage limit to be the sum of all agents' usage limits.
If a cache server is running and posts a cache usage percent, the limit controller will increase the remote usage limit by that percentage, to account for the fact that on average that percent of tasks will never make it to an agent and will therefore not affect actual remote usage.
Each agent posts its CPU stats every second. These stats contain total CPU usage in nanoseconds and cgroup throttling info. Since we know how many tasks are running on any agent at any given time, we can use the average number of tasks running during each interval, together with the CPU usage for that interval, to solve an optimization problem of simultaneously maximizing CPU usage (or rather, minimizing unused CPU) and minimizing throttling. Too much throttling means the cgroup is overloaded, but too little CPU usage means the agent is not being used to its full capacity.
func NewOptimizer ¶
func (*Optimizer) UsageLimitMultiplierChanged ¶
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
func (*Router) AddReceiver ¶
func (*Router) UpdateSenderToolchains ¶
func (r *Router) UpdateSenderToolchains( uuid string, tcs *metrics.Toolchains, )
type RouterHook ¶
type RouterHook interface {
PreReceive(*route, request) HookAction
}
type RouterOption ¶
type RouterOption func(*RouterOptions)
func WithHooks ¶
func WithHooks(hooks ...RouterHook) RouterOption
type RouterOptions ¶
type RouterOptions struct {
// contains filtered or unexported fields
}
func (*RouterOptions) Apply ¶
func (o *RouterOptions) Apply(opts ...RouterOption)
type SchedulerServerOption ¶
type SchedulerServerOption func(*SchedulerServerOptions)
func WithCacheClient ¶
func WithCacheClient(cacheClient types.CacheClient) SchedulerServerOption
func WithMonitorClient ¶
func WithMonitorClient(monClient types.MonitorClient) SchedulerServerOption
type SchedulerServerOptions ¶
type SchedulerServerOptions struct {
// contains filtered or unexported fields
}
func (*SchedulerServerOptions) Apply ¶
func (o *SchedulerServerOptions) Apply(opts ...SchedulerServerOption)
type ToolchainWatcher ¶
type ToolchainWatcher interface {
WatchToolchains(uuid string) chan *metrics.Toolchains
}
func NewDefaultToolchainWatcher ¶
func NewDefaultToolchainWatcher( ctx context.Context, client types.MonitorClient, ) ToolchainWatcher