orchestrator

package
v1.3.2-rc1
Warning

This package is not in the latest version of its module.

Published: May 30, 2024 License: Apache-2.0 Imports: 34 Imported by: 0

README

Package orchestrator

Package orchestrator is responsible for scheduling jobs on a network of compute nodes. It consists of the main components that enable job evaluation, scheduling, and planning. The primary components of the orchestrator package are as follows:

EvaluationBroker

EvaluationBroker is a queue of jobs that require reevaluation and comparison of the job's desired state and actual state in the network. Evaluations can be triggered by different sources, including job submission, updates, node failure, and more. Jobs are added to the EvaluationBroker when they need to be processed and their state compared to the desired state.

Worker

Worker represents a long-running goroutine that polls evaluations from the EvaluationBroker, passes each evaluation to the right scheduler based on the job type, and acknowledges the evaluation back to the EvaluationBroker once it has been processed successfully. Multiple workers can exist within a single orchestrator, processing evaluations concurrently.

Scheduler

Scheduler holds all the business logic required to compare the job's desired state and the observed state. Based on its observations, a scheduler can propose a plan that may include new executions, termination of existing ones, or approvals for pending executions. The scheduler is also responsible for finding placement for executions and ranking of nodes.

Planner

Planner executes the plan suggested by the scheduler. Existing planners include:

  • StateUpdater: This planner updates the job store with the desired changes resulting from the evaluation.
  • ComputeForwarder: This planner notifies compute nodes with the proposed changes and updates the job store with the final observed state if the notification is successful. It handles communication with the compute nodes.

Execution

Execution represents a mapping of a job to a compute node, where a job can have multiple active executions based on the desired executions count. Each execution is represented by two components:

The following sequence diagram shows the flow of an execution from the time it is created until it is completed, along with the different components that can mutate its state:

[Figure: img.png, PlantUML sequence diagram]

At a high level, the scheduler is responsible for updating the execution's desired state, while other components are responsible for updating the observed state as follows:

  • The endpoint receives requests from end users and callbacks from the compute nodes. Its primary responsibility is to update the execution's observed state and notify the scheduler of the new state by enqueuing an evaluation.

  • The scheduler polls an evaluation and creates new executions or updates the desired state of existing ones, with the help of the state updater. It does not modify the observed state.

  • The compute proxy receives the plan and updates the execution's observed state after notifying the compute node of the change.

Documentation

Overview

Package orchestrator is a generated GoMock package.

Index

Constants

const (
	EventTopicJobSubmission    models.EventTopic = "Submission"
	EventTopicJobScheduling    models.EventTopic = "Scheduling"
	EventTopicExecutionTimeout models.EventTopic = "Exec Timeout"
)
const (
	// The node is known to be not suitable to execute the job.
	RankUnsuitable int = -1
	// The node's suitability to execute the job is not known, so we could ask
	// it to bid and hope that it is able to accept.
	RankPossible int = 0
	// The node is known to be suitable to execute the job, so we should prefer
	// using it if we can.
	RankPreferred int = 10
)
const (
	WorkerStatusInit     = "Initialized"
	WorkerStatusStarting = "Starting"
	WorkerStatusRunning  = "Running"
	WorkerStatusStopping = "Stopping"
	WorkerStatusStopped  = "Stopped"
)
const (
	// DefaultHousekeepingWorkers is the default number of parallel workers for housekeeping tasks
	DefaultHousekeepingWorkers = 3
)

Variables

var (
	WorkerDequeueFaults = telemetry.Must(Meter.Int64Counter(
		"worker_dequeue_faults",
		metric.WithDescription("Number of times a worker failed to dequeue an evaluation"),
	))

	WorkerProcessFaults = telemetry.Must(Meter.Int64Counter(
		"worker_process_faults",
		metric.WithDescription("Number of times a worker failed to process an evaluation"),
	))

	WorkerAckFaults = telemetry.Must(Meter.Int64Counter(
		"worker_ack_faults",
		metric.WithDescription("Number of times a worker failed to ack an evaluation back to the broker"),
	))

	WorkerNackFaults = telemetry.Must(Meter.Int64Counter(
		"worker_nack_faults",
		metric.WithDescription("Number of times a worker failed to nack an evaluation back to the broker"),
	))
)

Metrics for monitoring worker

var (
	EvalBrokerReady = telemetry.Must(Meter.Int64ObservableUpDownCounter(
		"eval_broker_ready",
		metric.WithDescription("Evaluations ready to be processed"),
	))

	EvalBrokerInflight = telemetry.Must(Meter.Int64ObservableUpDownCounter(
		"eval_broker_inflight",
		metric.WithDescription("Evaluations currently being processed"),
	))

	EvalBrokerPending = telemetry.Must(Meter.Int64ObservableUpDownCounter(
		"eval_broker_pending",
		metric.WithDescription("Duplicate evaluations for the same jobID pending for an active evaluation to finish"),
	))

	EvalBrokerWaiting = telemetry.Must(Meter.Int64ObservableUpDownCounter(
		"eval_broker_waiting",
		metric.WithDescription("Evaluations delayed and waiting to be processed"),
	))

	EvalBrokerCancelable = telemetry.Must(Meter.Int64ObservableUpDownCounter(
		"eval_broker_cancelable",
		metric.WithDescription("Duplicate evaluations for the same jobID that can be canceled"),
	))
)

Metrics for monitoring evaluation broker

var (
	Meter = otel.GetMeterProvider().Meter("orchestrator")
)

Functions

func EvalTypeAttribute

func EvalTypeAttribute(evaluationType string) metric.MeasurementOption

func ExecStoppedByExecutionTimeoutEvent added in v1.3.1

func ExecStoppedByExecutionTimeoutEvent(timeout time.Duration) models.Event

func ExecStoppedByJobStopEvent added in v1.3.1

func ExecStoppedByJobStopEvent() models.Event

func ExecStoppedByNodeRejectedEvent added in v1.3.1

func ExecStoppedByNodeRejectedEvent() models.Event

func ExecStoppedByNodeUnhealthyEvent added in v1.3.1

func ExecStoppedByNodeUnhealthyEvent() models.Event

func ExecStoppedByOversubscriptionEvent added in v1.3.1

func ExecStoppedByOversubscriptionEvent() models.Event

func JobExhaustedRetriesEvent added in v1.3.1

func JobExhaustedRetriesEvent() models.Event

func JobStoppedEvent added in v1.3.1

func JobStoppedEvent(reason string) models.Event

func JobSubmittedEvent added in v1.3.1

func JobSubmittedEvent() models.Event

func JobTranslatedEvent added in v1.3.1

func JobTranslatedEvent(old, new *models.Job) models.Event

Types

type BaseEndpoint

type BaseEndpoint struct {
	// contains filtered or unexported fields
}

func NewBaseEndpoint

func NewBaseEndpoint(params *BaseEndpointParams) *BaseEndpoint

func (*BaseEndpoint) GetResults added in v1.1.4

func (e *BaseEndpoint) GetResults(ctx context.Context, request *GetResultsRequest) (GetResultsResponse, error)

GetResults returns the results of a job

func (*BaseEndpoint) ReadLogs

func (*BaseEndpoint) StopJob

func (e *BaseEndpoint) StopJob(ctx context.Context, request *StopJobRequest) (StopJobResponse, error)

func (*BaseEndpoint) SubmitJob

func (e *BaseEndpoint) SubmitJob(ctx context.Context, request *SubmitJobRequest) (*SubmitJobResponse, error)

SubmitJob submits a job to the evaluation broker.

type BaseEndpointParams

type BaseEndpointParams struct {
	ID                string
	Store             jobstore.Store
	EventEmitter      EventEmitter
	ComputeProxy      compute.Endpoint
	JobTransformer    transformer.JobTransformer
	TaskTranslator    translation.TranslatorProvider
	ResultTransformer transformer.ResultTransformer
}

type ErrNoMatchingNodes

type ErrNoMatchingNodes struct {
}

ErrNoMatchingNodes is returned when there are no matching nodes in the network to run a job.

func NewErrNoMatchingNodes

func NewErrNoMatchingNodes() ErrNoMatchingNodes

func (ErrNoMatchingNodes) Error

func (e ErrNoMatchingNodes) Error() string

type ErrNotEnoughNodes

type ErrNotEnoughNodes struct {
	RequestedNodes int
	AvailableNodes []NodeRank
}

ErrNotEnoughNodes is returned when there are not enough nodes in the network to run a job.

func NewErrNotEnoughNodes

func NewErrNotEnoughNodes(requestedNodes int, availableNodes []NodeRank) ErrNotEnoughNodes

func (ErrNotEnoughNodes) Details added in v1.3.1

func (e ErrNotEnoughNodes) Details() map[string]string

func (ErrNotEnoughNodes) Error

func (e ErrNotEnoughNodes) Error() string

func (ErrNotEnoughNodes) Retryable added in v1.3.1

func (e ErrNotEnoughNodes) Retryable() bool

func (ErrNotEnoughNodes) SuitableNodes added in v1.3.1

func (e ErrNotEnoughNodes) SuitableNodes() int

type ErrSchedulerNotFound

type ErrSchedulerNotFound struct {
	EvaluationType string
}

ErrSchedulerNotFound is returned when the scheduler is not found for a given evaluation type

func NewErrSchedulerNotFound

func NewErrSchedulerNotFound(evaluationType string) ErrSchedulerNotFound

func (ErrSchedulerNotFound) Error

func (e ErrSchedulerNotFound) Error() string

type EvaluationBroker

type EvaluationBroker interface {
	// Enqueue adds an evaluation to the broker
	// - If the evaluation is already in the broker, it will do nothing
	// - If another evaluation with the same job ID is in the broker, it will not make the new eval
	//   visible until the active eval is Ack'd.
	// - If the evaluation has a WaitUntil time, it will not be visible until that time has passed.
	// - Otherwise the evaluation will be visible to dequeue immediately
	Enqueue(evaluation *models.Evaluation) error

	// EnqueueAll is used to enqueue many evaluations. The map allows evaluations
	// that are being re-enqueued to include their receipt handle.
	// If the evaluation is already in the broker, in flight, and with matching receipt handle, it will
	// re-enqueue the evaluation to be processed again after the previous one is Ack'd.
	EnqueueAll(evaluation map[*models.Evaluation]string) error

	// Dequeue is used to perform a blocking dequeue. The next available evaluation
	// is returned as well as a unique receiptHandle identifier for this dequeue.
	// The receipt handle changes every time the same evaluation is dequeued, such as
	// after a Nack, timeout, state restore or possibly broker lease change.
	// This ensures that previous inflight Dequeue cannot conflict with a
	// Dequeue of the same evaluation after the state change.
	Dequeue(types []string, timeout time.Duration) (*models.Evaluation, string, error)

	// Inflight checks if an EvalID has been delivered but not acknowledged
	// and returns the associated receipt handle for the evaluation.
	Inflight(evaluationID string) (string, bool)

	// InflightExtend resets the Nack timer for the evaluationID if the
	// receipt handle matches and the eval is inflight
	InflightExtend(evaluationID, receiptHandle string) error

	// Ack is used to acknowledge a successful evaluation.
	// The evaluation will be removed from the broker.
	Ack(evalID string, receiptHandle string) error

	// Nack is used to negatively acknowledge an evaluation.
	// The evaluation can be re-enqueued to be processed again
	// without having to wait for the dequeue visibility timeout.
	Nack(evalID string, receiptHandle string) error
}

EvaluationBroker is used to manage brokering of evaluations. When an evaluation is created, due to a change in a job specification or a node, we put it into the broker. The broker sorts evaluations by priority and job type. This allows us to dequeue the highest priority work first, while also allowing sub-schedulers to only dequeue work they know how to handle.

The broker must provide at-least-once delivery semantics. It relies on explicit Ack/Nack messages to handle this. If a delivery is not Ack'd in a sufficient time span, it will be assumed Nack'd.

The broker must also make sure there is a single inflight evaluation per job, and that multiple enqueued evaluations for the same job can be represented as a single most recent evaluation.
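The receipt-handle rules described above can be illustrated with a small standalone sketch. Everything in it (`memBroker`, string evaluation IDs, the `receipt-N` handle format) is hypothetical and is not the package's implementation; it is single-goroutine and leaves out priorities, WaitUntil delays, per-job deduplication, and the Nack timeout. It only shows why a handle from an earlier delivery cannot acknowledge a later delivery of the same evaluation.

```go
package main

import (
	"errors"
	"fmt"
)

// memBroker is a hypothetical in-memory illustration of receipt handles.
type memBroker struct {
	ready    []string          // evaluation IDs visible to Dequeue
	inflight map[string]string // evaluation ID -> current receipt handle
	seq      int               // counter used to mint unique receipt handles
}

func newMemBroker() *memBroker {
	return &memBroker{inflight: map[string]string{}}
}

// Enqueue makes an evaluation visible unless it is already queued or in flight.
func (b *memBroker) Enqueue(evalID string) {
	if _, ok := b.inflight[evalID]; ok {
		return
	}
	for _, id := range b.ready {
		if id == evalID {
			return
		}
	}
	b.ready = append(b.ready, evalID)
}

// Dequeue hands out the next evaluation together with a fresh receipt handle.
func (b *memBroker) Dequeue() (string, string, error) {
	if len(b.ready) == 0 {
		return "", "", errors.New("no evaluations ready")
	}
	evalID := b.ready[0]
	b.ready = b.ready[1:]
	b.seq++
	handle := fmt.Sprintf("receipt-%d", b.seq)
	b.inflight[evalID] = handle
	return evalID, handle, nil
}

// check verifies that the handle belongs to the current delivery.
func (b *memBroker) check(evalID, handle string) error {
	current, ok := b.inflight[evalID]
	if !ok {
		return fmt.Errorf("evaluation %s is not in flight", evalID)
	}
	if current != handle {
		return fmt.Errorf("stale receipt handle for evaluation %s", evalID)
	}
	return nil
}

// Ack removes the evaluation from the broker.
func (b *memBroker) Ack(evalID, handle string) error {
	if err := b.check(evalID, handle); err != nil {
		return err
	}
	delete(b.inflight, evalID)
	return nil
}

// Nack puts the evaluation back on the ready queue for redelivery.
func (b *memBroker) Nack(evalID, handle string) error {
	if err := b.check(evalID, handle); err != nil {
		return err
	}
	delete(b.inflight, evalID)
	b.ready = append(b.ready, evalID)
	return nil
}

func main() {
	b := newMemBroker()
	b.Enqueue("eval-1")
	id, first, _ := b.Dequeue()
	_ = b.Nack(id, first) // processing failed, ask for redelivery
	_, second, _ := b.Dequeue()
	fmt.Println("ack with first handle:", b.Ack(id, first))
	fmt.Println("ack with second handle:", b.Ack(id, second))
}
```

In this sketch the first Ack is rejected because the Nack and redelivery minted a new handle, which is the conflict the Dequeue comment describes.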

type EventEmitter

type EventEmitter struct {
	// contains filtered or unexported fields
}

func NewEventEmitter

func NewEventEmitter(params EventEmitterParams) EventEmitter

func (EventEmitter) EmitBidAccepted

func (e EventEmitter) EmitBidAccepted(
	ctx context.Context, request compute.BidAcceptedRequest, response compute.BidAcceptedResponse)

func (EventEmitter) EmitBidReceived

func (e EventEmitter) EmitBidReceived(
	ctx context.Context, result compute.BidResult)

func (EventEmitter) EmitBidRejected

func (e EventEmitter) EmitBidRejected(
	ctx context.Context, request compute.BidRejectedRequest, response compute.BidRejectedResponse)

func (EventEmitter) EmitComputeFailure

func (e EventEmitter) EmitComputeFailure(ctx context.Context, executionID string, err error)

func (EventEmitter) EmitEvent

func (e EventEmitter) EmitEvent(ctx context.Context, event model.JobEvent) error

func (EventEmitter) EmitEventSilently

func (e EventEmitter) EmitEventSilently(ctx context.Context, event model.JobEvent)

func (EventEmitter) EmitJobCreated

func (e EventEmitter) EmitJobCreated(
	ctx context.Context, job models.Job)

func (EventEmitter) EmitRunComplete

func (e EventEmitter) EmitRunComplete(ctx context.Context, response compute.RunResult)

type EventEmitterParams

type EventEmitterParams struct {
	EventConsumer eventhandler.JobEventHandler
}

A quick workaround to publish job events locally, as we still have some types that rely on job events to update their states (e.g. localdb) and to take actions (e.g. websockets and logging). TODO: create a strongly typed local event emitter similar to the libp2p event bus, and update localdb directly from the requester instead of consuming events.

type GetResultsRequest added in v1.1.4

type GetResultsRequest struct {
	JobID string
}

type GetResultsResponse added in v1.1.4

type GetResultsResponse struct {
	Results []*models.SpecConfig
}

type Housekeeping added in v1.3.1

type Housekeeping struct {
	// contains filtered or unexported fields
}

func NewHousekeeping added in v1.3.1

func NewHousekeeping(params HousekeepingParams) (*Housekeeping, error)

func (*Housekeeping) IsRunning added in v1.3.1

func (h *Housekeeping) IsRunning() bool

IsRunning returns true if the housekeeping task is running

func (*Housekeeping) ShouldRun added in v1.3.1

func (h *Housekeeping) ShouldRun() bool

ShouldRun returns true if the housekeeping task should run. This is just a placeholder for now until we introduce leader election or lease management for housekeeping when we introduce more than one orchestrator.

func (*Housekeeping) Start added in v1.3.1

func (h *Housekeeping) Start(ctx context.Context)

Start starts the housekeeping task

func (*Housekeeping) Stop added in v1.3.1

func (h *Housekeeping) Stop(ctx context.Context)

type HousekeepingParams added in v1.3.1

type HousekeepingParams struct {
	JobStore jobstore.Store
	// Interval is the interval at which housekeeping tasks are run
	Interval time.Duration
	// Workers is the maximum number of parallel workers for housekeeping tasks
	Workers int
	// TimeoutBuffer is the buffer time to add to the execution timeout
	// It is better that compute nodes timeout and report the failure before the orchestrator does.
	// This buffer is added to the execution timeout to allow for this.
	TimeoutBuffer time.Duration
	// Clock is the clock used for time-based operations.
	// If not provided, the system clock is used.
	Clock clock.Clock
}
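The TimeoutBuffer comment implies a simple deadline calculation: the orchestrator waits for the execution timeout plus the buffer before acting, so a compute node that times out on schedule can report first. The sketch below is one way to express that arithmetic. `isExpired` and `expiredAfter` are hypothetical helpers, and treating a non-positive timeout as "never expires" is an assumption of this sketch, not documented behaviour.

```go
package main

import (
	"fmt"
	"time"
)

// isExpired reports whether an execution started at startedAt has outlived
// its timeout plus the orchestrator-side buffer at time now.
func isExpired(startedAt time.Time, timeout, buffer time.Duration, now time.Time) bool {
	if timeout <= 0 {
		return false // assumption: no timeout configured means never expire
	}
	deadline := startedAt.Add(timeout + buffer)
	return now.After(deadline)
}

// expiredAfter is a convenience wrapper that takes the elapsed time directly.
func expiredAfter(elapsed, timeout, buffer time.Duration) bool {
	start := time.Unix(0, 0)
	return isExpired(start, timeout, buffer, start.Add(elapsed))
}

func main() {
	timeout, buffer := 60*time.Second, 10*time.Second
	for _, elapsed := range []time.Duration{59 * time.Second, 65 * time.Second, 71 * time.Second} {
		fmt.Printf("elapsed=%s expired=%t\n", elapsed, expiredAfter(elapsed, timeout, buffer))
	}
}
```

With a 60 second timeout and a 10 second buffer, an execution that has run for 65 seconds is past its own timeout but still inside the window in which the compute node is expected to report the failure itself.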

type MappedSchedulerProvider

type MappedSchedulerProvider struct {
	// contains filtered or unexported fields
}

func NewMappedSchedulerProvider

func NewMappedSchedulerProvider(schedulers map[string]Scheduler) *MappedSchedulerProvider

func (*MappedSchedulerProvider) EnabledSchedulers

func (p *MappedSchedulerProvider) EnabledSchedulers() []string

func (*MappedSchedulerProvider) Scheduler

func (p *MappedSchedulerProvider) Scheduler(jobType string) (Scheduler, error)
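A map-backed provider of this shape can be sketched in a few lines. The types below (`scheduler`, `noopScheduler`, `mappedProvider`, `errSchedulerNotFound`) are simplified local stand-ins, not the package's own, and the job type names are only examples. Returning the enabled types in sorted order is a choice made here for deterministic output; the documentation does not state an ordering.

```go
package main

import (
	"fmt"
	"sort"
)

// scheduler is a simplified, hypothetical stand-in for the Scheduler interface.
type scheduler interface {
	Process(evalID string) error
}

// noopScheduler accepts every evaluation without doing any work.
type noopScheduler struct{}

func (noopScheduler) Process(string) error { return nil }

// errSchedulerNotFound mirrors the shape of ErrSchedulerNotFound.
type errSchedulerNotFound struct {
	EvaluationType string
}

func (e errSchedulerNotFound) Error() string {
	return fmt.Sprintf("scheduler not found for evaluation type %q", e.EvaluationType)
}

// mappedProvider looks schedulers up by job type in a plain map.
type mappedProvider struct {
	schedulers map[string]scheduler
}

// Scheduler returns the scheduler registered for jobType, or a typed error.
func (p *mappedProvider) Scheduler(jobType string) (scheduler, error) {
	s, ok := p.schedulers[jobType]
	if !ok {
		return nil, errSchedulerNotFound{EvaluationType: jobType}
	}
	return s, nil
}

// EnabledSchedulers lists the registered job types in sorted order.
func (p *mappedProvider) EnabledSchedulers() []string {
	types := make([]string, 0, len(p.schedulers))
	for t := range p.schedulers {
		types = append(types, t)
	}
	sort.Strings(types)
	return types
}

func main() {
	p := &mappedProvider{schedulers: map[string]scheduler{
		"batch":   noopScheduler{},
		"service": noopScheduler{},
	}}
	fmt.Println(p.EnabledSchedulers())
	_, err := p.Scheduler("daemon")
	fmt.Println(err)
}
```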

type MockEvaluationBroker

type MockEvaluationBroker struct {
	// contains filtered or unexported fields
}

MockEvaluationBroker is a mock of EvaluationBroker interface.

func NewMockEvaluationBroker

func NewMockEvaluationBroker(ctrl *gomock.Controller) *MockEvaluationBroker

NewMockEvaluationBroker creates a new mock instance.

func (*MockEvaluationBroker) Ack

func (m *MockEvaluationBroker) Ack(evalID, receiptHandle string) error

Ack mocks base method.

func (*MockEvaluationBroker) Dequeue

func (m *MockEvaluationBroker) Dequeue(types []string, timeout time.Duration) (*models.Evaluation, string, error)

Dequeue mocks base method.

func (*MockEvaluationBroker) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockEvaluationBroker) Enqueue

func (m *MockEvaluationBroker) Enqueue(evaluation *models.Evaluation) error

Enqueue mocks base method.

func (*MockEvaluationBroker) EnqueueAll

func (m *MockEvaluationBroker) EnqueueAll(evaluation map[*models.Evaluation]string) error

EnqueueAll mocks base method.

func (*MockEvaluationBroker) Inflight

func (m *MockEvaluationBroker) Inflight(evaluationID string) (string, bool)

Inflight mocks base method.

func (*MockEvaluationBroker) InflightExtend

func (m *MockEvaluationBroker) InflightExtend(evaluationID, receiptHandle string) error

InflightExtend mocks base method.

func (*MockEvaluationBroker) Nack

func (m *MockEvaluationBroker) Nack(evalID, receiptHandle string) error

Nack mocks base method.

type MockEvaluationBrokerMockRecorder

type MockEvaluationBrokerMockRecorder struct {
	// contains filtered or unexported fields
}

MockEvaluationBrokerMockRecorder is the mock recorder for MockEvaluationBroker.

func (*MockEvaluationBrokerMockRecorder) Ack

func (mr *MockEvaluationBrokerMockRecorder) Ack(evalID, receiptHandle interface{}) *gomock.Call

Ack indicates an expected call of Ack.

func (*MockEvaluationBrokerMockRecorder) Dequeue

func (mr *MockEvaluationBrokerMockRecorder) Dequeue(types, timeout interface{}) *gomock.Call

Dequeue indicates an expected call of Dequeue.

func (*MockEvaluationBrokerMockRecorder) Enqueue

func (mr *MockEvaluationBrokerMockRecorder) Enqueue(evaluation interface{}) *gomock.Call

Enqueue indicates an expected call of Enqueue.

func (*MockEvaluationBrokerMockRecorder) EnqueueAll

func (mr *MockEvaluationBrokerMockRecorder) EnqueueAll(evaluation interface{}) *gomock.Call

EnqueueAll indicates an expected call of EnqueueAll.

func (*MockEvaluationBrokerMockRecorder) Inflight

func (mr *MockEvaluationBrokerMockRecorder) Inflight(evaluationID interface{}) *gomock.Call

Inflight indicates an expected call of Inflight.

func (*MockEvaluationBrokerMockRecorder) InflightExtend

func (mr *MockEvaluationBrokerMockRecorder) InflightExtend(evaluationID, receiptHandle interface{}) *gomock.Call

InflightExtend indicates an expected call of InflightExtend.

func (*MockEvaluationBrokerMockRecorder) Nack

func (mr *MockEvaluationBrokerMockRecorder) Nack(evalID, receiptHandle interface{}) *gomock.Call

Nack indicates an expected call of Nack.

type MockNodeDiscoverer

type MockNodeDiscoverer struct {
	// contains filtered or unexported fields
}

MockNodeDiscoverer is a mock of NodeDiscoverer interface.

func NewMockNodeDiscoverer

func NewMockNodeDiscoverer(ctrl *gomock.Controller) *MockNodeDiscoverer

NewMockNodeDiscoverer creates a new mock instance.

func (*MockNodeDiscoverer) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockNodeDiscoverer) List added in v1.3.1

List mocks base method.

type MockNodeDiscovererMockRecorder

type MockNodeDiscovererMockRecorder struct {
	// contains filtered or unexported fields
}

MockNodeDiscovererMockRecorder is the mock recorder for MockNodeDiscoverer.

func (*MockNodeDiscovererMockRecorder) List added in v1.3.1

func (mr *MockNodeDiscovererMockRecorder) List(ctx interface{}, filter ...interface{}) *gomock.Call

List indicates an expected call of List.

type MockNodeRanker

type MockNodeRanker struct {
	// contains filtered or unexported fields
}

MockNodeRanker is a mock of NodeRanker interface.

func NewMockNodeRanker

func NewMockNodeRanker(ctrl *gomock.Controller) *MockNodeRanker

NewMockNodeRanker creates a new mock instance.

func (*MockNodeRanker) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockNodeRanker) RankNodes

func (m *MockNodeRanker) RankNodes(ctx context.Context, job models.Job, nodes []models.NodeInfo) ([]NodeRank, error)

RankNodes mocks base method.

type MockNodeRankerMockRecorder

type MockNodeRankerMockRecorder struct {
	// contains filtered or unexported fields
}

MockNodeRankerMockRecorder is the mock recorder for MockNodeRanker.

func (*MockNodeRankerMockRecorder) RankNodes

func (mr *MockNodeRankerMockRecorder) RankNodes(ctx, job, nodes interface{}) *gomock.Call

RankNodes indicates an expected call of RankNodes.

type MockNodeSelector added in v1.1.0

type MockNodeSelector struct {
	// contains filtered or unexported fields
}

MockNodeSelector is a mock of NodeSelector interface.

func NewMockNodeSelector added in v1.1.0

func NewMockNodeSelector(ctrl *gomock.Controller) *MockNodeSelector

NewMockNodeSelector creates a new mock instance.

func (*MockNodeSelector) AllNodes added in v1.1.0

func (m *MockNodeSelector) AllNodes(ctx context.Context) ([]models.NodeInfo, error)

AllNodes mocks base method.

func (*MockNodeSelector) EXPECT added in v1.1.0

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockNodeSelector) MatchingNodes added in v1.3.2

func (m *MockNodeSelector) MatchingNodes(ctx context.Context, job *models.Job) ([]NodeRank, []NodeRank, error)

MatchingNodes mocks base method.

type MockNodeSelectorMockRecorder added in v1.1.0

type MockNodeSelectorMockRecorder struct {
	// contains filtered or unexported fields
}

MockNodeSelectorMockRecorder is the mock recorder for MockNodeSelector.

func (*MockNodeSelectorMockRecorder) AllNodes added in v1.1.0

func (mr *MockNodeSelectorMockRecorder) AllNodes(ctx interface{}) *gomock.Call

AllNodes indicates an expected call of AllNodes.

func (*MockNodeSelectorMockRecorder) MatchingNodes added in v1.3.2

func (mr *MockNodeSelectorMockRecorder) MatchingNodes(ctx, job interface{}) *gomock.Call

MatchingNodes indicates an expected call of MatchingNodes.

type MockPlanner

type MockPlanner struct {
	// contains filtered or unexported fields
}

MockPlanner is a mock of Planner interface.

func NewMockPlanner

func NewMockPlanner(ctrl *gomock.Controller) *MockPlanner

NewMockPlanner creates a new mock instance.

func (*MockPlanner) EXPECT

func (m *MockPlanner) EXPECT() *MockPlannerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockPlanner) Process

func (m *MockPlanner) Process(ctx context.Context, plan *models.Plan) error

Process mocks base method.

type MockPlannerMockRecorder

type MockPlannerMockRecorder struct {
	// contains filtered or unexported fields
}

MockPlannerMockRecorder is the mock recorder for MockPlanner.

func (*MockPlannerMockRecorder) Process

func (mr *MockPlannerMockRecorder) Process(ctx, plan interface{}) *gomock.Call

Process indicates an expected call of Process.

type MockRetryStrategy

type MockRetryStrategy struct {
	// contains filtered or unexported fields
}

MockRetryStrategy is a mock of RetryStrategy interface.

func NewMockRetryStrategy

func NewMockRetryStrategy(ctrl *gomock.Controller) *MockRetryStrategy

NewMockRetryStrategy creates a new mock instance.

func (*MockRetryStrategy) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockRetryStrategy) ShouldRetry

func (m *MockRetryStrategy) ShouldRetry(ctx context.Context, request RetryRequest) bool

ShouldRetry mocks base method.

type MockRetryStrategyMockRecorder

type MockRetryStrategyMockRecorder struct {
	// contains filtered or unexported fields
}

MockRetryStrategyMockRecorder is the mock recorder for MockRetryStrategy.

func (*MockRetryStrategyMockRecorder) ShouldRetry

func (mr *MockRetryStrategyMockRecorder) ShouldRetry(ctx, request interface{}) *gomock.Call

ShouldRetry indicates an expected call of ShouldRetry.

type MockScheduler

type MockScheduler struct {
	// contains filtered or unexported fields
}

MockScheduler is a mock of Scheduler interface.

func NewMockScheduler

func NewMockScheduler(ctrl *gomock.Controller) *MockScheduler

NewMockScheduler creates a new mock instance.

func (*MockScheduler) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockScheduler) Process

func (m *MockScheduler) Process(ctx context.Context, eval *models.Evaluation) error

Process mocks base method.

type MockSchedulerMockRecorder

type MockSchedulerMockRecorder struct {
	// contains filtered or unexported fields
}

MockSchedulerMockRecorder is the mock recorder for MockScheduler.

func (*MockSchedulerMockRecorder) Process

func (mr *MockSchedulerMockRecorder) Process(ctx, eval interface{}) *gomock.Call

Process indicates an expected call of Process.

type MockSchedulerProvider

type MockSchedulerProvider struct {
	// contains filtered or unexported fields
}

MockSchedulerProvider is a mock of SchedulerProvider interface.

func NewMockSchedulerProvider

func NewMockSchedulerProvider(ctrl *gomock.Controller) *MockSchedulerProvider

NewMockSchedulerProvider creates a new mock instance.

func (*MockSchedulerProvider) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockSchedulerProvider) EnabledSchedulers

func (m *MockSchedulerProvider) EnabledSchedulers() []string

EnabledSchedulers mocks base method.

func (*MockSchedulerProvider) Scheduler

func (m *MockSchedulerProvider) Scheduler(jobType string) (Scheduler, error)

Scheduler mocks base method.

type MockSchedulerProviderMockRecorder

type MockSchedulerProviderMockRecorder struct {
	// contains filtered or unexported fields
}

MockSchedulerProviderMockRecorder is the mock recorder for MockSchedulerProvider.

func (*MockSchedulerProviderMockRecorder) EnabledSchedulers

func (mr *MockSchedulerProviderMockRecorder) EnabledSchedulers() *gomock.Call

EnabledSchedulers indicates an expected call of EnabledSchedulers.

func (*MockSchedulerProviderMockRecorder) Scheduler

func (mr *MockSchedulerProviderMockRecorder) Scheduler(jobType interface{}) *gomock.Call

Scheduler indicates an expected call of Scheduler.

type NodeDiscoverer

type NodeDiscoverer interface {
	List(ctx context.Context, filter ...routing.NodeStateFilter) ([]models.NodeState, error)
}

NodeDiscoverer discovers nodes in the network that are suitable to execute a job. NodeDiscoverer is a subset of the routing.NodeInfoStore interface.

type NodeRank

type NodeRank struct {
	NodeInfo models.NodeInfo
	Rank     int
	Reason   string

	// Retryable should be true only if the system could defer this job until
	// later and the rank could change without any human intervention on the
	// assessed node. I.e. it should only reflect transient things like node
	// usage, capacity or approval status.
	//
	// E.g. if this node is excluded because it does not support a required
	// feature, this could be fixed if the feature was configured at the other
	// node, but Retryable should be false because this is unlikely to happen
	// over the lifetime of the job.
	Retryable bool
}

NodeRank represents a node and its rank. The higher the rank, the more preferable a node is to execute the job. A negative rank means the node is not suitable to execute the job.

func (NodeRank) MarshalZerologObject

func (r NodeRank) MarshalZerologObject(e *zerolog.Event)

func (NodeRank) MeetsRequirement

func (r NodeRank) MeetsRequirement() bool

Returns whether the node meets the requirements to run the job.
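As an illustration of how the rank constants and MeetsRequirement fit together, the sketch below splits ranked nodes into candidates (highest rank first) and rejected nodes. It uses local stand-in types; `nodeRank`, `partition`, and the reason strings are hypothetical, and the rule "rank above RankUnsuitable" is an assumption drawn from the statement that a negative rank means the node is not suitable.

```go
package main

import (
	"fmt"
	"sort"
)

// Local copies of the documented rank values.
const (
	rankUnsuitable = -1
	rankPossible   = 0
	rankPreferred  = 10
)

// nodeRank is a trimmed-down, hypothetical stand-in for NodeRank.
type nodeRank struct {
	NodeID string
	Rank   int
	Reason string
}

// meetsRequirement assumes a node qualifies when its rank is not negative.
func (r nodeRank) meetsRequirement() bool {
	return r.Rank > rankUnsuitable
}

// partition returns the suitable nodes ordered by descending rank, followed
// by the nodes that were filtered out.
func partition(ranks []nodeRank) (matching, rejected []nodeRank) {
	for _, r := range ranks {
		if r.meetsRequirement() {
			matching = append(matching, r)
		} else {
			rejected = append(rejected, r)
		}
	}
	sort.SliceStable(matching, func(i, j int) bool {
		return matching[i].Rank > matching[j].Rank
	})
	return matching, rejected
}

func main() {
	matching, rejected := partition([]nodeRank{
		{NodeID: "node-a", Rank: rankPossible, Reason: "suitability unknown"},
		{NodeID: "node-b", Rank: rankUnsuitable, Reason: "missing required engine"},
		{NodeID: "node-c", Rank: rankPreferred, Reason: "inputs already local"},
	})
	for _, r := range matching {
		fmt.Printf("candidate %s (rank %d): %s\n", r.NodeID, r.Rank, r.Reason)
	}
	for _, r := range rejected {
		fmt.Printf("rejected %s: %s\n", r.NodeID, r.Reason)
	}
}
```

The two return values have the same shape as the pair of `[]NodeRank` slices in the MatchingNodes signature, though how the package itself computes them is not shown on this page.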

type NodeRanker

type NodeRanker interface {
	RankNodes(ctx context.Context, job models.Job, nodes []models.NodeInfo) ([]NodeRank, error)
}

NodeRanker ranks nodes based on their suitability to execute a job.

type NodeSelectionConstraints added in v1.3.1

type NodeSelectionConstraints struct {
	RequireConnected bool
	RequireApproval  bool
}

type NodeSelector added in v1.1.0

type NodeSelector interface {
	// AllNodes returns all nodes in the network.
	AllNodes(ctx context.Context) ([]models.NodeInfo, error)

	// MatchingNodes returns the nodes that match the job constraints, ordered by rank in descending order.
	// It also returns the nodes that were filtered out, and an error if any.
	MatchingNodes(ctx context.Context, job *models.Job) ([]NodeRank, []NodeRank, error)
}

NodeSelector selects nodes based on their suitability to execute a job.

type Planner

type Planner interface {
	Process(ctx context.Context, plan *models.Plan) error
}

Planner executes the plan generated by the scheduler. It is responsible for the actual update of the job state in the database, as well as the placement of tasks on compute nodes.
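Because a Planner is a single-method interface, several planners can be composed so that one plan is first persisted and then forwarded to compute nodes. The sketch below shows that composition with local stand-in types. `plan`, `plannerFunc`, `chain`, and `errNotify` are hypothetical, and whether the package composes its planners this way is an assumption, not something stated on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// plan is a hypothetical stand-in for models.Plan.
type plan struct {
	JobID         string
	NewExecutions []string
}

// planner mirrors the Planner interface without the context argument.
type planner interface {
	Process(p *plan) error
}

// plannerFunc adapts a plain function to the planner interface.
type plannerFunc func(p *plan) error

func (f plannerFunc) Process(p *plan) error { return f(p) }

// chain runs planners in order and stops at the first failure.
type chain []planner

func (c chain) Process(p *plan) error {
	for i, pl := range c {
		if err := pl.Process(p); err != nil {
			return fmt.Errorf("planner %d: %w", i, err)
		}
	}
	return nil
}

// errNotify simulates a compute node that cannot be reached.
var errNotify = errors.New("compute node unreachable")

func main() {
	var steps []string
	updateStore := plannerFunc(func(p *plan) error {
		steps = append(steps, "stored desired state for "+p.JobID)
		return nil
	})
	notifyNodes := plannerFunc(func(p *plan) error {
		steps = append(steps, fmt.Sprintf("notified nodes of %d new executions", len(p.NewExecutions)))
		return nil
	})
	err := chain{updateStore, notifyNodes}.Process(&plan{JobID: "job-1", NewExecutions: []string{"exec-1"}})
	fmt.Println(steps, err)
}
```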

type ReadLogsRequest

type ReadLogsRequest struct {
	JobID       string
	ExecutionID string
	Tail        bool
	Follow      bool
}

type ReadLogsResponse

type ReadLogsResponse struct {
	Address           string
	ExecutionComplete bool
}

type RetryRequest

type RetryRequest struct {
	JobID string
}

type RetryStrategy

type RetryStrategy interface {
	// ShouldRetry returns true if the job can be retried.
	ShouldRetry(ctx context.Context, request RetryRequest) bool
}

type Scheduler

type Scheduler interface {
	// Process handles a new evaluation. It applies the necessary logic to determine
	// task placements based on the provided evaluation.
	Process(ctx context.Context, eval *models.Evaluation) error
}

Scheduler encapsulates the business logic of a scheduler. It processes evaluations one at a time, generating task placements based on the provided evaluation. The scheduler focuses on business logic, while other components handle the underlying infrastructure and coordination between the orchestrator and compute nodes.

type SchedulerProvider

type SchedulerProvider interface {
	// Scheduler returns a scheduler for the given job type
	Scheduler(jobType string) (Scheduler, error)

	// EnabledSchedulers returns a list of enabled schedulers (job types)
	EnabledSchedulers() []string
}

SchedulerProvider returns a scheduler instance that is capable of handling jobs requiring scheduling.

type StopJobRequest

type StopJobRequest struct {
	JobID         string
	Reason        string
	UserTriggered bool
}

type StopJobResponse

type StopJobResponse struct {
	EvaluationID string
}

type SubmitJobRequest

type SubmitJobRequest struct {
	Job *models.Job
}

type SubmitJobResponse

type SubmitJobResponse struct {
	JobID        string
	EvaluationID string
	Warnings     []string
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker is a long-running process that dequeues evaluations, invokes the scheduler to process the evaluation, and acknowledges or rejects the evaluation back to the broker. The worker is single-threaded and processes one evaluation at a time. An orchestrator can have multiple workers working in parallel.

func NewWorker

func NewWorker(params WorkerParams) *Worker

NewWorker returns a new Worker instance.

func (*Worker) Start

func (w *Worker) Start(ctx context.Context)

Start triggers the worker to start processing evaluations. The worker can only start once, and subsequent calls to Start will be ignored.

func (*Worker) Status

func (w *Worker) Status() string

Status returns the current status of the worker.

func (*Worker) Stop

func (w *Worker) Stop()

Stop triggers the worker to stop processing evaluations. The worker will stop after the in-flight evaluation is processed.

type WorkerParams

type WorkerParams struct {
	// SchedulerProvider is responsible for providing the scheduler instance
	// based on the evaluation type.
	SchedulerProvider SchedulerProvider
	// EvaluationBroker is the broker used for handling evaluations.
	EvaluationBroker EvaluationBroker
	// DequeueTimeout is the maximum duration for dequeueing an evaluation.
	DequeueTimeout time.Duration
	// DequeueFailureBackoff defines the backoff strategy when dequeueing an evaluation fails.
	DequeueFailureBackoff backoff.Backoff
}
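A single worker iteration, as described for Worker above, can be sketched as: dequeue an evaluation, look up the scheduler for its type, process it, then Ack on success or Nack on failure. The code below uses simplified local interfaces. `evaluation`, `broker`, `scheduler`, `processOne`, `recordingBroker`, and `errProcess` are hypothetical names, and the real worker also handles dequeue timeouts, backoff, status transitions, and metrics, none of which are modelled here.

```go
package main

import (
	"errors"
	"fmt"
)

// evaluation is a hypothetical stand-in for models.Evaluation.
type evaluation struct {
	ID   string
	Type string
}

// broker and scheduler are simplified versions of the package's interfaces.
type broker interface {
	Dequeue() (*evaluation, string, error)
	Ack(evalID, receiptHandle string) error
	Nack(evalID, receiptHandle string) error
}

type scheduler interface {
	Process(eval *evaluation) error
}

// schedulerFunc adapts a plain function to the scheduler interface.
type schedulerFunc func(*evaluation) error

func (f schedulerFunc) Process(e *evaluation) error { return f(e) }

// errProcess simulates a scheduler failure.
var errProcess = errors.New("scheduler failed")

// processOne runs one worker iteration against the broker.
func processOne(b broker, schedulers map[string]scheduler) error {
	eval, handle, err := b.Dequeue()
	if err != nil {
		return fmt.Errorf("dequeue: %w", err)
	}
	s, ok := schedulers[eval.Type]
	if !ok {
		_ = b.Nack(eval.ID, handle) // nobody here can handle it; allow redelivery
		return fmt.Errorf("no scheduler for evaluation type %q", eval.Type)
	}
	if err := s.Process(eval); err != nil {
		_ = b.Nack(eval.ID, handle) // failed; allow redelivery without waiting
		return fmt.Errorf("process: %w", err)
	}
	return b.Ack(eval.ID, handle)
}

// recordingBroker delivers one evaluation and records Ack and Nack calls.
type recordingBroker struct {
	next   *evaluation
	acked  []string
	nacked []string
}

func (r *recordingBroker) Dequeue() (*evaluation, string, error) {
	if r.next == nil {
		return nil, "", errors.New("no evaluations ready")
	}
	e := r.next
	r.next = nil
	return e, "receipt-1", nil
}

func (r *recordingBroker) Ack(id, _ string) error  { r.acked = append(r.acked, id); return nil }
func (r *recordingBroker) Nack(id, _ string) error { r.nacked = append(r.nacked, id); return nil }

func main() {
	b := &recordingBroker{next: &evaluation{ID: "eval-1", Type: "batch"}}
	schedulers := map[string]scheduler{
		"batch": schedulerFunc(func(*evaluation) error { return nil }),
	}
	fmt.Println(processOne(b, schedulers), b.acked, b.nacked)
}
```

A long-running worker would call something like `processOne` in a loop until it is asked to stop, which matches the documented behaviour that Stop takes effect after the in-flight evaluation is processed.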

Directories

Path Synopsis
selection
