Documentation ¶
Index ¶
- Constants
- Variables
- func CopySlice[T Copyable[T]](slice []T) []T
- func FromLabelSelectorRequirements(requirements ...*LabelSelectorRequirement) ([]labels.Requirement, error)
- func IsDefaultEngineType(kind string) bool
- func NewErrInvalidPagingToken(s string, msg string) error
- func NormalizeSlice[T Normalizable](slice []T)
- func ValidateSlice[T Validatable](slice []T) error
- type AllocatedResources
- type BaseError
- func (e *BaseError) Details() map[string]string
- func (e *BaseError) Error() string
- func (e *BaseError) FailsExecution() bool
- func (e *BaseError) Hint() string
- func (e *BaseError) Retryable() bool
- func (e *BaseError) WithDetails(details map[string]string) *BaseError
- func (e *BaseError) WithFailsExecution() *BaseError
- func (e *BaseError) WithHint(hint string) *BaseError
- func (e *BaseError) WithRetryable() *BaseError
- type BuildVersionInfo
- type ComputeNodeInfo
- type Copyable
- type DebugInfo
- type DebugInfoProvider
- type Evaluation
- func (e *Evaluation) Copy() *Evaluation
- func (e *Evaluation) NewDelayedEvaluation(waitUntil time.Time) *Evaluation
- func (e *Evaluation) Normalize() *Evaluation
- func (e *Evaluation) ShouldEnqueue() bool
- func (e *Evaluation) String() string
- func (e *Evaluation) TerminalStatus() bool
- func (e *Evaluation) UpdateModifyTime()
- func (e *Evaluation) WithComment(comment string) *Evaluation
- func (e *Evaluation) WithJob(job *Job) *Evaluation
- func (e *Evaluation) WithJobID(jobID string) *Evaluation
- func (e *Evaluation) WithNamespace(namespace string) *Evaluation
- func (e *Evaluation) WithPriority(priority int) *Evaluation
- func (e *Evaluation) WithStatus(status string) *Evaluation
- func (e *Evaluation) WithTriggeredBy(triggeredBy string) *Evaluation
- func (e *Evaluation) WithType(jobType string) *Evaluation
- func (e *Evaluation) WithWaitUntil(waitUntil time.Time) *Evaluation
- type EvaluationReceipt
- type Event
- func (e *Event) WithDetail(key, value string) *Event
- func (e *Event) WithDetails(details map[string]string) *Event
- func (e *Event) WithError(err error) *Event
- func (e *Event) WithFailsExecution(failsExecution bool) *Event
- func (e *Event) WithHint(hint string) *Event
- func (e *Event) WithMessage(message string) *Event
- func (e *Event) WithRetryable(retryable bool) *Event
- type EventTopic
- type Execution
- func (e *Execution) AllocateResources(taskID string, resources Resources)
- func (e *Execution) Copy() *Execution
- func (e *Execution) GetCreateTime() time.Time
- func (e *Execution) GetModifyTime() time.Time
- func (e *Execution) IsDiscarded() bool
- func (e *Execution) IsExpired(expirationTime time.Time) bool
- func (e *Execution) IsTerminalComputeState() bool
- func (e *Execution) IsTerminalDesiredState() bool
- func (e *Execution) IsTerminalState() bool
- func (e *Execution) JobNamespacedID() NamespacedID
- func (e *Execution) Normalize()
- func (e *Execution) String() string
- func (e *Execution) TotalAllocatedResources() *Resources
- func (e *Execution) Validate() error
- type ExecutionDesiredStateType
- type ExecutionLog
- type ExecutionLogType
- type ExecutionStateType
- type FailureInjectionComputeConfig
- type FailureInjectionRequesterConfig
- type GPU
- type GPUVendor
- type HasDetails
- type HasFailsExecution
- type HasHint
- type HasRetryable
- type InputSource
- type Job
- func (j *Job) AllStorageTypes() []string
- func (j *Job) Copy() *Job
- func (j *Job) GetCreateTime() time.Time
- func (j *Job) GetModifyTime() time.Time
- func (j *Job) IsExpired(expirationTime time.Time) bool
- func (j *Job) IsLongRunning() bool
- func (j *Job) IsTerminal() bool
- func (j *Job) MetricAttributes() []attribute.KeyValue
- func (j *Job) NamespacedID() NamespacedID
- func (j *Job) Normalize()
- func (j *Job) SanitizeSubmission() (warnings []string)
- func (j *Job) String() string
- func (j *Job) Task() *Task
- func (j *Job) Validate() error
- func (j *Job) ValidateSubmission() error
- type JobEvent
- type JobEventType
- type JobHistory
- type JobHistoryType
- type JobSelectionDataLocality
- type JobSelectionPolicy
- type JobStateType
- type LabelSelectorRequirement
- type LabelsProvider
- type NamespacedID
- type Network
- type NetworkConfig
- type NetworkConfigBuilder
- type NodeConnectionState
- type NodeInfo
- type NodeInfoDecorator
- type NodeMembershipState
- type NodeState
- type NodeStateProvider
- type NodeType
- type NoopNodeInfoDecorator
- type Normalizable
- type PagingToken
- type PagingTokenParams
- type Plan
- func (p *Plan) AppendApprovedExecution(execution *Execution)
- func (p *Plan) AppendEvaluation(eval *Evaluation)
- func (p *Plan) AppendExecution(execution *Execution)
- func (p *Plan) AppendStoppedExecution(execution *Execution, event Event)
- func (p *Plan) IsJobFailed() bool
- func (p *Plan) MarkJobCompleted()
- func (p *Plan) MarkJobFailed(event Event)
- func (p *Plan) MarkJobQueued(event Event)
- func (p *Plan) MarkJobRunningIfEligible()
- type PlanExecutionDesiredUpdate
- type Resources
- func (r *Resources) Add(other Resources) *Resources
- func (r *Resources) Copy() *Resources
- func (r *Resources) IsZero() bool
- func (r *Resources) LessThan(other Resources) bool
- func (r *Resources) LessThanEq(other Resources) bool
- func (r *Resources) Max(other Resources) *Resources
- func (r *Resources) Merge(other Resources) *Resources
- func (r *Resources) Multiply(factor float64) *Resources
- func (r *Resources) String() string
- func (r *Resources) Sub(other Resources) *Resources
- func (r *Resources) Validate() error
- type ResourcesConfig
- type ResourcesConfigBuilder
- func (r *ResourcesConfigBuilder) Build() (*ResourcesConfig, error)
- func (r *ResourcesConfigBuilder) BuildOrDie() *ResourcesConfig
- func (r *ResourcesConfigBuilder) CPU(cpu string) *ResourcesConfigBuilder
- func (r *ResourcesConfigBuilder) Disk(disk string) *ResourcesConfigBuilder
- func (r *ResourcesConfigBuilder) GPU(gpu string) *ResourcesConfigBuilder
- func (r *ResourcesConfigBuilder) Memory(memory string) *ResourcesConfigBuilder
- type ResultPath
- type RunCommandResult
- type SpecConfig
- func (s *SpecConfig) Copy() *SpecConfig
- func (s *SpecConfig) IsEmpty() bool
- func (s *SpecConfig) IsType(t string) bool
- func (s *SpecConfig) MarshalZerologObject(e *zerolog.Event)
- func (s *SpecConfig) MetricAttributes() []attribute.KeyValue
- func (s *SpecConfig) Normalize()
- func (s *SpecConfig) Validate() error
- func (s *SpecConfig) ValidateAllowBlank() error
- func (s *SpecConfig) WithParam(key string, value interface{}) *SpecConfig
- type State
- type StateChange
- type Task
- type TaskBuilder
- func (b *TaskBuilder) Build() (*Task, error)
- func (b *TaskBuilder) BuildOrDie() *Task
- func (b *TaskBuilder) Engine(engine *SpecConfig) *TaskBuilder
- func (b *TaskBuilder) InputSources(inputSources ...*InputSource) *TaskBuilder
- func (b *TaskBuilder) Meta(key string, value string) *TaskBuilder
- func (b *TaskBuilder) Name(name string) *TaskBuilder
- func (b *TaskBuilder) Network(network *NetworkConfig) *TaskBuilder
- func (b *TaskBuilder) Publisher(publisher *SpecConfig) *TaskBuilder
- func (b *TaskBuilder) ResourcesConfig(resourcesConfig *ResourcesConfig) *TaskBuilder
- func (b *TaskBuilder) ResultPaths(resultPaths ...*ResultPath) *TaskBuilder
- func (b *TaskBuilder) Timeouts(timeouts *TimeoutConfig) *TaskBuilder
- type TimeoutConfig
- type Validatable
Constants ¶
const ( // JobTypeService represents a long-running job that runs on a desired number of nodes // matching the specified constraints. JobTypeService = "service" // JobTypeDaemon represents a long-running job that runs on all nodes matching the // specified constraints. JobTypeDaemon = "daemon" // JobTypeBatch represents a batch job that runs to completion on the desired number // of nodes matching the specified constraints. JobTypeBatch = "batch" // JobTypeOps represents a batch job that runs to completion on all nodes matching // the specified constraints. JobTypeOps = "ops" )
const ( EngineNoop = "noop" EngineDocker = "docker" EngineWasm = "wasm" )
const ( StorageSourceNoop = "noop" StorageSourceIPFS = "ipfs" StorageSourceURL = "urlDownload" StorageSourceS3 = "s3" StorageSourceS3PreSigned = "s3PreSigned" StorageSourceInline = "inline" StorageSourceLocalDirectory = "localDirectory" )
const ( PublisherNoop = "noop" PublisherIPFS = "ipfs" PublisherS3 = "s3" PublisherLocal = "local" )
const ( DownloadFilenameStdout = "stdout" DownloadFilenameStderr = "stderr" DownloadFilenameExitCode = "exitCode" DownloadCIDsFolderName = "raw" DownloadFolderPerm = 0755 DownloadFilePerm = 0644 )
const ( MetaReservedPrefix = "bacalhau.org/" MetaRequesterID = "bacalhau.org/requester.id" MetaClientID = "bacalhau.org/client.id" // Job provenance metadata used to track the origin of a job where // it may have been translated from another job. MetaDerivedFrom = "bacalhau.org/derivedFrom" MetaTranslatedBy = "bacalhau.org/translatedBy" )
const ( DetailsKeyIsError = "IsError" DetailsKeyHint = "Hint" DetailsKeyRetryable = "Retryable" DetailsKeyFailsExecution = "FailsExecution" )
const ( EvalStatusBlocked = "blocked" EvalStatusPending = "pending" EvalStatusComplete = "complete" EvalStatusFailed = "failed" EvalStatusCancelled = "canceled" )
const ( EvalTriggerJobRegister = "job-register" EvalTriggerJobCancel = "job-cancel" EvalTriggerJobQueue = "job-queue" EvalTriggerJobTimeout = "job-timeout" EvalTriggerExecFailure = "exec-failure" EvalTriggerExecUpdate = "exec-update" EvalTriggerExecTimeout = "exec-timeout" )
const (
// DefaultNamespace is the default namespace.
DefaultNamespace = "default"
)
Variables ¶
var EngineNames = []string{ EngineDocker, EngineWasm, }
var NodeMembership = membershipContainer{ UNKNOWN: NodeMembershipState{unknown}, PENDING: NodeMembershipState{pending}, APPROVED: NodeMembershipState{approved}, REJECTED: NodeMembershipState{rejected}, }
var NodeStates = livenessContainer{ CONNECTED: NodeConnectionState{connected}, DISCONNECTED: NodeConnectionState{disconnected}, }
var PublisherNames = []string{ PublisherNoop, PublisherIPFS, PublisherS3, PublisherLocal, }
var StoragesNames = []string{ StorageSourceIPFS, StorageSourceInline, StorageSourceLocalDirectory, StorageSourceS3, StorageSourceS3PreSigned, StorageSourceURL, }
Functions ¶
func FromLabelSelectorRequirements ¶
func FromLabelSelectorRequirements(requirements ...*LabelSelectorRequirement) ([]labels.Requirement, error)
func IsDefaultEngineType ¶ added in v1.2.1
func NewErrInvalidPagingToken ¶ added in v1.2.2
func NormalizeSlice ¶
func NormalizeSlice[T Normalizable](slice []T)
func ValidateSlice ¶
func ValidateSlice[T Validatable](slice []T) error
Types ¶
type AllocatedResources ¶
AllocatedResources is the set of resources to be used by an execution, which may be resources allocated to a single task or a set of tasks in the future.
func (*AllocatedResources) Copy ¶
func (a *AllocatedResources) Copy() *AllocatedResources
func (*AllocatedResources) Total ¶
func (a *AllocatedResources) Total() *Resources
Total returns the total resources allocated
type BaseError ¶ added in v1.3.1
type BaseError struct {
// contains filtered or unexported fields
}
BaseError is a custom error type in Go that provides additional fields and methods for more detailed error handling. It implements the error interface, as well as additional interfaces for providing a hint, indicating whether the error is retryable, whether it fails execution, and for providing additional details.
func NewBaseError ¶ added in v1.3.1
NewBaseError is a constructor function that creates a new BaseError with only the message field set.
func (*BaseError) Details ¶ added in v1.3.1
Details is a method that returns the details field of BaseError.
func (*BaseError) Error ¶ added in v1.3.1
Error is a method that returns the message field of BaseError. This method makes BaseError satisfy the error interface.
func (*BaseError) FailsExecution ¶ added in v1.3.1
FailsExecution is a method that returns the failsExecution field of BaseError.
func (*BaseError) Retryable ¶ added in v1.3.1
Retryable is a method that returns the retryable field of BaseError.
func (*BaseError) WithDetails ¶ added in v1.3.1
WithDetails is a method that sets the details field of BaseError and returns the BaseError itself for chaining.
func (*BaseError) WithFailsExecution ¶ added in v1.3.1
WithFailsExecution is a method that sets the failsExecution field of BaseError and returns the BaseError itself for chaining.
func (*BaseError) WithHint ¶ added in v1.3.1
WithHint is a method that sets the hint field of BaseError and returns the BaseError itself for chaining.
func (*BaseError) WithRetryable ¶ added in v1.3.1
WithRetryable is a method that sets the retryable field of BaseError and returns the BaseError itself for chaining.
type BuildVersionInfo ¶
type BuildVersionInfo struct { Major string `json:"Major,omitempty" example:"0"` Minor string `json:"Minor,omitempty" example:"3"` GitVersion string `json:"GitVersion" example:"v0.3.12"` GitCommit string `json:"GitCommit" example:"d612b63108f2b5ce1ab2b9e02444eb1dac1d922d"` BuildDate time.Time `json:"BuildDate" example:"2022-11-16T14:03:31Z"` GOOS string `json:"GOOS" example:"linux"` GOARCH string `json:"GOARCH" example:"amd64"` }
BuildVersionInfo is the version of a Bacalhau binary (either client or server)
type ComputeNodeInfo ¶
type ComputeNodeInfo struct { ExecutionEngines []string `json:"ExecutionEngines"` Publishers []string `json:"Publishers"` StorageSources []string `json:"StorageSources"` MaxCapacity Resources `json:"MaxCapacity"` QueueUsedCapacity Resources `json:"QueueCapacity"` AvailableCapacity Resources `json:"AvailableCapacity"` MaxJobRequirements Resources `json:"MaxJobRequirements"` RunningExecutions int `json:"RunningExecutions"` EnqueuedExecutions int `json:"EnqueuedExecutions"` }
ComputeNodeInfo contains metadata about the current state and abilities of a compute node. Compute Nodes share this state with Requester nodes by including it in the NodeInfo they share across the network.
type Copyable ¶
type Copyable[T any] interface { Copy() T }
Copyable is an interface for types that can be copied
type DebugInfo ¶ added in v1.5.0
type DebugInfo struct { Component string `json:"component"` Info interface{} `json:"info"` }
type DebugInfoProvider ¶ added in v1.5.0
type Evaluation ¶
type Evaluation struct { // ID is the unique identifier of the evaluation. ID string `json:"ID"` // Namespace is the namespace the evaluation is created in Namespace string `json:"Namespace"` // JobID is the unique identifier of the job. JobID string `json:"JobID"` // TriggeredBy is the root cause that triggered the evaluation. TriggeredBy string `json:"TriggeredBy"` // Priority is the priority of the evaluation. // e.g. 50 is higher priority than 10, and so will be evaluated first. Priority int `json:"Priority"` // Type is the type of the job that needs to be evaluated. Type string `json:"Type"` // Status is the current status of the evaluation. Status string `json:"Status"` // Comment is to provide additional information about the evaluation. Comment string `json:"Comment"` // WaitUntil is the time until which the evaluation should be ignored, such as to implement backoff when // repeatedly failing to assess a job. WaitUntil time.Time `json:"WaitUntil"` CreateTime int64 `json:"CreateTime"` ModifyTime int64 `json:"ModifyTime"` }
Evaluation is just to ask the scheduler to reassess if additional job instances must be scheduled or if existing ones must be stopped. It is possible that no action is required if the scheduler sees the desired job state matches the observed state. This allows the triggers (e.g. APIs, Node Manager) to be lightweight and submit evaluation on state changes without having to do complex logic to decide what actions to take.
func NewEvaluation ¶ added in v1.3.1
func NewEvaluation() *Evaluation
NewEvaluation creates a new Evaluation.
func (*Evaluation) Copy ¶
func (e *Evaluation) Copy() *Evaluation
func (*Evaluation) NewDelayedEvaluation ¶ added in v1.3.2
func (e *Evaluation) NewDelayedEvaluation(waitUntil time.Time) *Evaluation
NewDelayedEvaluation creates a new Evaluation from current one with a WaitUntil time.
func (*Evaluation) Normalize ¶ added in v1.3.1
func (e *Evaluation) Normalize() *Evaluation
Normalize ensures that the Evaluation is in a valid state.
func (*Evaluation) ShouldEnqueue ¶
func (e *Evaluation) ShouldEnqueue() bool
ShouldEnqueue checks if a given Evaluation should be enqueued into the evaluation_broker
func (*Evaluation) String ¶
func (e *Evaluation) String() string
func (*Evaluation) TerminalStatus ¶
func (e *Evaluation) TerminalStatus() bool
TerminalStatus returns if the current status is terminal and will no longer transition.
func (*Evaluation) UpdateModifyTime ¶
func (e *Evaluation) UpdateModifyTime()
UpdateModifyTime makes sure that time always moves forward, taking into account that different server clocks can drift apart.
func (*Evaluation) WithComment ¶ added in v1.3.1
func (e *Evaluation) WithComment(comment string) *Evaluation
WithComment sets the Comment of the Evaluation.
func (*Evaluation) WithJob ¶ added in v1.3.2
func (e *Evaluation) WithJob(job *Job) *Evaluation
WithJob sets the JobID, Type, Priority and Namespace of the Evaluation.
func (*Evaluation) WithJobID ¶ added in v1.3.1
func (e *Evaluation) WithJobID(jobID string) *Evaluation
WithJobID sets the JobID of the Evaluation.
func (*Evaluation) WithNamespace ¶ added in v1.3.1
func (e *Evaluation) WithNamespace(namespace string) *Evaluation
WithNamespace sets the Namespace of the Evaluation.
func (*Evaluation) WithPriority ¶ added in v1.3.1
func (e *Evaluation) WithPriority(priority int) *Evaluation
WithPriority sets the Priority of the Evaluation.
func (*Evaluation) WithStatus ¶ added in v1.3.1
func (e *Evaluation) WithStatus(status string) *Evaluation
WithStatus sets the Status of the Evaluation.
func (*Evaluation) WithTriggeredBy ¶ added in v1.3.1
func (e *Evaluation) WithTriggeredBy(triggeredBy string) *Evaluation
WithTriggeredBy sets the TriggeredBy of the Evaluation.
func (*Evaluation) WithType ¶ added in v1.3.1
func (e *Evaluation) WithType(jobType string) *Evaluation
WithType sets the Type of the Evaluation.
func (*Evaluation) WithWaitUntil ¶ added in v1.3.1
func (e *Evaluation) WithWaitUntil(waitUntil time.Time) *Evaluation
WithWaitUntil sets the WaitUntil of the Evaluation.
type EvaluationReceipt ¶
type EvaluationReceipt struct { Evaluation *Evaluation `json:"Evaluation"` // ReceiptHandle is a unique identifier when dequeue an Evaluation from a broker. ReceiptHandle string `json:"ReceiptHandle"` }
EvaluationReceipt is a pair of an Evaluation and its ReceiptHandle.
type Event ¶ added in v1.3.1
type Event struct { // A human-readable string giving the user all the information they need to // understand and respond to an Event, if a response is required. Message string `json:"Message"` // The topic of the event. See the documentation on EventTopic. Topic EventTopic `json:"Topic"` // The moment the event occurred, which may be different to the moment it // was recorded. Timestamp time.Time `json:"Timestamp"` // Any additional metadata that the system or user may need to know about // the event in order to handle it properly. Details map[string]string `json:"Details,omitempty"` }
Event represents a progress report made by the system in its attempt to run a job. Events are generated by the orchestrator and also passed back to the orchestrator from the compute node.
Events may be delivered in an async fashion – i.e., they may arrive much later than the moment they occurred.
func EventFromError ¶ added in v1.3.1
func EventFromError(topic EventTopic, err error) Event
EventFromError converts an error into an Event tagged with the passed event topic.
This method allows errors to implement extra interfaces (above) to do "attribute-based error reporting". The design principle is that errors can report a set of extra flags that have well defined semantics which the system can then respond to with specific behavior. This allows introducing or refactoring error types without higher-level components needing to be modified – they simply continue to respond to the presence of attributes.
This is instead of the system having a centralized set of known error types and programming in specific behavior in response to them, which is brittle and requires updating all of the error responses when the types change.
func NewEvent ¶ added in v1.3.1
func NewEvent(topic EventTopic) *Event
NewEvent returns a new Event with the given topic.
func (*Event) WithDetail ¶ added in v1.3.1
WithDetail returns a new Event with the given detail and topic.
func (*Event) WithDetails ¶ added in v1.3.1
WithDetails returns a new Event with the given details and topic.
func (*Event) WithFailsExecution ¶ added in v1.3.1
WithFailsExecution returns a new Event with the given fails execution flag.
func (*Event) WithMessage ¶ added in v1.3.1
WithMessage returns a new Event with the given message and topic.
func (*Event) WithRetryable ¶ added in v1.3.1
WithRetryable returns a new Event with the given retryable flag.
type EventTopic ¶ added in v1.3.1
type EventTopic string
EventTopic is a high level categorisation that can be applied to an event. It should be a human-readable string with no dynamic content. They are used to disambiguate events from the same component occurring in different contexts. For example, an event emitted by S3 storage used as an input source and the same event emitted by S3 storage used as a publisher would be tagged with different topics.
EventTopics do not need to conform to a centralized list – each module should use event topics that make sense for their own logic. Event topics SHOULD be unique.
type Execution ¶
type Execution struct { // ID of the execution (UUID) ID string `json:"ID"` // Namespace is the namespace the execution is created in Namespace string `json:"Namespace"` // ID of the evaluation that generated this execution EvalID string `json:"EvalID"` // Name is a logical name of the execution. Name string `json:"Name"` // NodeID is the node this is being placed on NodeID string `json:"NodeID"` // Job is the parent job of the task being allocated. // This is copied at execution time to avoid issues if the job // definition is updated. JobID string `json:"JobID"` // TODO: evaluate using a copy of the job instead of a pointer Job *Job `json:"Job,omitempty"` // AllocatedResources is the total resources allocated for the execution tasks. AllocatedResources *AllocatedResources `json:"AllocatedResources"` // DesiredState of the execution on the compute node DesiredState State[ExecutionDesiredStateType] `json:"DesiredState"` // ComputeState observed state of the execution on the compute node ComputeState State[ExecutionStateType] `json:"ComputeState"` // the published results for this execution PublishedResult *SpecConfig `json:"PublishedResult"` // RunOutput is the output of the run command // TODO: evaluate removing this from execution spec in favour of calling `bacalhau logs` RunOutput *RunCommandResult `json:"RunOutput"` // PreviousExecution is the execution that this execution is replacing PreviousExecution string `json:"PreviousExecution"` // NextExecution is the execution that this execution is being replaced by NextExecution string `json:"NextExecution"` // FollowupEvalID captures a follow up evaluation created to handle a failed execution // that can be rescheduled in the future FollowupEvalID string `json:"FollowupEvalID"` // Revision is incremented each time the execution is updated. Revision uint64 `json:"Revision"` // CreateTime is the time the execution has finished scheduling and been // verified by the plan applier. 
CreateTime int64 `json:"CreateTime"` // ModifyTime is the time the execution was last updated. ModifyTime int64 `json:"ModifyTime"` }
Execution is used to allocate the placement of a task group to a node.
func (*Execution) AllocateResources ¶
AllocateResources allocates resources to a task
func (*Execution) GetCreateTime ¶ added in v1.1.0
GetCreateTime returns the creation time
func (*Execution) GetModifyTime ¶ added in v1.1.0
GetModifyTime returns the modify time
func (*Execution) IsDiscarded ¶
IsDiscarded returns true if the execution has failed, been cancelled or rejected.
func (*Execution) IsExpired ¶ added in v1.3.1
IsExpired returns true if the execution is still running beyond the expiration time. We return true if the execution is in the bid accepted state (i.e. running) and the modify time is older than the expiration time.
func (*Execution) IsTerminalComputeState ¶
IsTerminalComputeState returns true if the execution observed state is terminal
func (*Execution) IsTerminalDesiredState ¶
IsTerminalDesiredState returns true if the execution desired state is terminal
func (*Execution) IsTerminalState ¶
IsTerminalState returns true if the execution desired or observed state is terminal
func (*Execution) JobNamespacedID ¶
func (e *Execution) JobNamespacedID() NamespacedID
func (*Execution) Normalize ¶
func (e *Execution) Normalize()
Normalize Allocation to ensure fields are initialized to the expectations of this version of Bacalhau. Should be called when restoring persisted Executions or receiving Executions from Bacalhau clients potentially on an older version of Bacalhau.
func (*Execution) TotalAllocatedResources ¶
type ExecutionDesiredStateType ¶
type ExecutionDesiredStateType int
const ( ExecutionDesiredStatePending ExecutionDesiredStateType = iota ExecutionDesiredStateRunning ExecutionDesiredStateStopped )
func (ExecutionDesiredStateType) String ¶
func (i ExecutionDesiredStateType) String() string
type ExecutionLog ¶ added in v1.2.2
type ExecutionLog struct { Type ExecutionLogType Line string }
type ExecutionLogType ¶ added in v1.2.2
type ExecutionLogType int
const ( ExecutionLogTypeSTDOUT ExecutionLogType ExecutionLogTypeSTDERR )
type ExecutionStateType ¶
type ExecutionStateType int
ExecutionStateType The state of an execution. An execution represents a single attempt to execute a job on a node. A compute node can have multiple executions for the same job due to retries, but there can only be a single active execution per node at any given time.
const ( ExecutionStateUndefined ExecutionStateType = iota // ExecutionStateNew The execution has been created, but not pushed to a compute node yet. ExecutionStateNew // ExecutionStateAskForBid A node has been selected to execute a job, and is being asked to bid on the job. ExecutionStateAskForBid // ExecutionStateAskForBidAccepted compute node has accepted the ask for bid. ExecutionStateAskForBidAccepted // ExecutionStateAskForBidRejected compute node has rejected the ask for bid. ExecutionStateAskForBidRejected // ExecutionStateBidAccepted requester has accepted the bid, and the execution is expected to be running on the compute node. ExecutionStateBidAccepted // aka running // ExecutionStateBidRejected requester has rejected the bid. ExecutionStateBidRejected // ExecutionStateCompleted The execution has been completed, and the result has been published. ExecutionStateCompleted // ExecutionStateFailed The execution has failed. ExecutionStateFailed // ExecutionStateCancelled The execution has been canceled by the user ExecutionStateCancelled )
TODO: change states to reflect non-bidding scheduling
func (ExecutionStateType) IsTermainl ¶ added in v1.3.1
func (s ExecutionStateType) IsTermainl() bool
func (ExecutionStateType) IsUndefined ¶
func (s ExecutionStateType) IsUndefined() bool
IsUndefined returns true if the execution state is undefined
func (ExecutionStateType) String ¶
func (i ExecutionStateType) String() string
type FailureInjectionComputeConfig ¶
type FailureInjectionComputeConfig struct {
IsBadActor bool
}
type FailureInjectionRequesterConfig ¶
type FailureInjectionRequesterConfig struct {
IsBadActor bool `yaml:"IsBadActor"`
}
type GPU ¶ added in v1.2.0
type GPU struct { // Self-reported index of the device in the system Index uint64 // Model name of the GPU e.g. Tesla T4 Name string // Maker of the GPU, e.g. NVidia, AMD, Intel Vendor GPUVendor // Total GPU memory in mebibytes (MiB) Memory uint64 // PCI address of the device, in the format AAAA:BB:CC.C // Used to discover the correct device rendering cards PCIAddress string }
type HasDetails ¶ added in v1.3.1
type HasFailsExecution ¶ added in v1.3.1
type HasFailsExecution interface { // FailsExecution Whether this error means that the associated execution cannot // continue. // // If a component raises an error with FailsExecution() as true, // the hosting executor should report the execution as failed and stop any // further steps. FailsExecution() bool }
type HasHint ¶ added in v1.3.1
type HasHint interface { // Hint A human-readable string that advises the user on how they might solve the error. Hint() string }
type HasRetryable ¶ added in v1.3.1
type HasRetryable interface { // Retryable Whether the error could be retried, assuming the same input and // node configuration; i.e. the error is transient and due to network // capacity or service outage. // // If a component raises an error with Retryable() as true, the system may // retry the last action after some length of time. If it is false, it // should not try the action again, and may choose an alternative action or // fail the job. Retryable() bool }
type InputSource ¶
type InputSource struct { // Source is the source of the artifact to be downloaded, e.g a URL, S3 bucket, etc. Source *SpecConfig `json:"Source"` // Alias is an optional reference to this input source that can be used for // dynamic linking to this input. (e.g. dynamic import in wasm by alias) Alias string `json:"Alias"` // Target is the path where the artifact should be mounted on Target string `json:"Target"` }
func (*InputSource) Copy ¶
func (a *InputSource) Copy() *InputSource
Copy returns a deep copy of the artifact
func (*InputSource) MarshalZerologObject ¶ added in v1.2.2
func (a *InputSource) MarshalZerologObject(e *zerolog.Event)
func (*InputSource) Normalize ¶
func (a *InputSource) Normalize()
Normalize normalizes the artifact's source and target
func (*InputSource) Validate ¶
func (a *InputSource) Validate() error
Validate validates the artifact's source and target
type Job ¶
type Job struct { // ID is a unique identifier assigned to this job. // It helps to distinguish jobs with the same name after they have been deleted and re-created. // The ID is generated by the server and should not be set directly by the client. ID string `json:"ID"` // Name is the logical name of the job used to refer to it. // Submitting a job with the same name as an existing job will result in an // update to the existing job. Name string `json:"Name"` // Namespace is the namespace this job is running in. Namespace string `json:"Namespace"` // Type is the type of job this is, e.g. "daemon" or "batch". Type string `json:"Type"` // Priority defines the scheduling priority of this job. Priority int `json:"Priority"` // Count is the number of replicas that should be scheduled. Count int `json:"Count"` // Constraints is a selector which must be true for the compute node to run this job. Constraints []*LabelSelectorRequirement `json:"Constraints"` // Meta is used to associate arbitrary metadata with this job. Meta map[string]string `json:"Meta"` // Labels is used to associate arbitrary labels with this job, which can be used // for filtering. // key=value Labels map[string]string `json:"Labels"` Tasks []*Task `json:"Tasks"` // State is the current state of the job. State State[JobStateType] `json:"State"` // Version is a per-job monotonically increasing version number that is incremented // on each job specification update. Version uint64 `json:"Version"` // Revision is a per-job monotonically increasing revision number that is incremented // on each update to the job's state or specification Revision uint64 `json:"Revision"` CreateTime int64 `json:"CreateTime"` ModifyTime int64 `json:"ModifyTime"` }
func (*Job) AllStorageTypes ¶
AllStorageTypes returns keys of all storage types required by the job
func (*Job) Copy ¶
Copy returns a deep copy of the Job. It is expected that callers use recover. This method can panic if the deep copy fails, as it uses reflection.
func (*Job) GetCreateTime ¶
GetCreateTime returns the creation time
func (*Job) GetModifyTime ¶
GetModifyTime returns the modify time
func (*Job) IsExpired ¶ added in v1.4.0
IsExpired returns true if the job is still running beyond the expiration time
func (*Job) IsLongRunning ¶ added in v1.1.0
IsLongRunning returns true if the job is long running
func (*Job) IsTerminal ¶
IsTerminal returns true if the job is in a terminal state
func (*Job) MetricAttributes ¶ added in v1.2.1
func (*Job) NamespacedID ¶
func (j *Job) NamespacedID() NamespacedID
NamespacedID returns the namespaced id useful for logging
func (*Job) Normalize ¶
func (j *Job) Normalize()
Normalize is used to canonicalize fields in the Job. This should be called when registering a Job.
func (*Job) SanitizeSubmission ¶ added in v1.1.0
SanitizeSubmission is used to sanitize a job for reasonable configuration when it is submitted.
func (*Job) ValidateSubmission ¶
ValidateSubmission is used to check a job for reasonable configuration when it is submitted. It is a subset of Validate that does not check fields with defaults, such as job ID
type JobEvent ¶ added in v1.5.0
type JobEvent struct { JobID string `json:"JobID,omitempty" example:"9304c616-291f-41ad-b862-54e133c0149e"` // compute execution identifier ExecutionID string `json:"ExecutionID,omitempty" example:"9304c616-291f-41ad-b862-54e133c0149e"` // the node that emitted this event SourceNodeID string `json:"SourceNodeID,omitempty" example:"QmXaXu9N5GNetatsvwnTfQqNtSeKAD6uCmarbh3LMRYAcF"` // the node that this event is for // e.g. "AcceptJobBid" was emitted by Requester but it is targeting the compute node TargetNodeID string `json:"TargetNodeID,omitempty" example:"QmdZQ7ZbhnvWY1J12XYKGHApJ6aufKyLNSvf8jZBrBaAVL"` EventName JobEventType `json:"EventName,omitempty"` Status string `json:"Status,omitempty" example:"Got results proposal of length: 0"` EventTime time.Time `json:"EventTime,omitempty" example:"2022-11-17T13:32:55.756658941Z"` }
TODO remove this https://github.com/bacalhau-project/bacalhau/issues/4185
type JobEventType ¶ added in v1.5.0
type JobEventType int
const ( // Job has been created on the requester node JobEventCreated JobEventType // a compute node bid on a job JobEventBid // a requester node accepted or rejected a job bid JobEventBidAccepted JobEventBidRejected // a compute node had an error running a job JobEventComputeError // a compute node completed running a job JobEventResultsProposed // a Requester node accepted the results from a node for a job JobEventResultsAccepted // a Requester node rejected the results from a node for a job JobEventResultsRejected // once the results have been accepted or rejected // the compute node will publish them and issue this event JobEventResultsPublished // a requester node declared an error running a job JobEventError // a user canceled a job JobEventCanceled // a job has been completed JobEventCompleted )
func JobEventTypes ¶ added in v1.5.0
func JobEventTypes() []JobEventType
func ParseJobEventType ¶ added in v1.5.0
func ParseJobEventType(str string) (JobEventType, error)
func (JobEventType) IsTerminal ¶ added in v1.5.0
func (je JobEventType) IsTerminal() bool
IsTerminal returns true if the given event type signals the end of the lifecycle of a job. After this, all nodes can safely ignore the job.
func (JobEventType) IsUndefined ¶ added in v1.5.0
func (je JobEventType) IsUndefined() bool
func (JobEventType) MarshalText ¶ added in v1.5.0
func (je JobEventType) MarshalText() ([]byte, error)
func (JobEventType) String ¶ added in v1.5.0
func (i JobEventType) String() string
func (*JobEventType) UnmarshalText ¶ added in v1.5.0
func (je *JobEventType) UnmarshalText(text []byte) (err error)
type JobHistory ¶
type JobHistory struct { Type JobHistoryType `json:"Type"` JobID string `json:"JobID"` NodeID string `json:"NodeID,omitempty"` ExecutionID string `json:"ExecutionID,omitempty"` JobState *StateChange[JobStateType] `json:"JobState,omitempty"` ExecutionState *StateChange[ExecutionStateType] `json:"ExecutionState,omitempty"` NewRevision uint64 `json:"NewRevision"` Comment string `json:"Comment,omitempty"` Event Event `json:"Event,omitempty"` Time time.Time `json:"Time"` }
JobHistory represents a single event in the history of a job. An event can be at the job level, or execution (node) level.
{Job,Execution}State fields will only be present if the Type field is of the matching type.
func (JobHistory) Occurred ¶ added in v1.3.1
func (jh JobHistory) Occurred() time.Time
Occurred returns when the action that triggered an update to job history actually occurred.
The Time field represents the moment that the JobHistory item was recorded, i.e. it is almost always set to time.Now() when creating the object. This is different to the Event.Timestamp which represents when the source of the history update actually occurred.
type JobHistoryType ¶
type JobHistoryType int
const ( JobHistoryTypeUndefined JobHistoryType = iota JobHistoryTypeJobLevel JobHistoryTypeExecutionLevel )
func (JobHistoryType) MarshalText ¶
func (s JobHistoryType) MarshalText() ([]byte, error)
func (JobHistoryType) String ¶
func (i JobHistoryType) String() string
func (*JobHistoryType) UnmarshalText ¶
func (s *JobHistoryType) UnmarshalText(text []byte) (err error)
type JobSelectionDataLocality ¶ added in v1.5.0
type JobSelectionDataLocality int64
Job selection policy configuration
const ( Local JobSelectionDataLocality = 0 // local Anywhere JobSelectionDataLocality = 1 // anywhere )
func ParseJobSelectionDataLocality ¶ added in v1.5.0
func ParseJobSelectionDataLocality(s string) (ret JobSelectionDataLocality, err error)
func (JobSelectionDataLocality) MarshalYAML ¶
func (i JobSelectionDataLocality) MarshalYAML() (interface{}, error)
func (JobSelectionDataLocality) String ¶ added in v1.5.0
func (i JobSelectionDataLocality) String() string
func (*JobSelectionDataLocality) UnmarshalText ¶ added in v1.5.0
func (i *JobSelectionDataLocality) UnmarshalText(text []byte) error
func (*JobSelectionDataLocality) UnmarshalYAML ¶
func (i *JobSelectionDataLocality) UnmarshalYAML(value *yaml.Node) error
type JobSelectionPolicy ¶ added in v1.5.0
type JobSelectionPolicy struct { // this describes if we should run a job based on // where the data is located - i.e. if the data is "local" // or if the data is "anywhere" Locality JobSelectionDataLocality `json:"locality" yaml:"Locality"` // should we reject jobs that don't specify any data // the default is "accept" RejectStatelessJobs bool `json:"reject_stateless_jobs" yaml:"RejectStatelessJobs"` // should we accept jobs that specify networking // the default is "reject" AcceptNetworkedJobs bool `json:"accept_networked_jobs" yaml:"AcceptNetworkedJobs"` // external hooks that decide if we should take on the job or not // if either of these are given they will override the data locality settings ProbeHTTP string `json:"probe_http,omitempty" yaml:"ProbeHTTP"` ProbeExec string `json:"probe_exec,omitempty" yaml:"ProbeExec"` }
JobSelectionPolicy describes the rules for how a compute node selects an incoming job.
type JobStateType ¶
type JobStateType int
const ( JobStateTypeUndefined JobStateType = iota // JobStateTypePending is the state of a job that has been submitted but not // yet scheduled. JobStateTypePending // JobStateTypeQueued is the state of a job that has been evaluated but no // matching nodes are available yet. JobStateTypeQueued // JobStateTypeRunning is the state of a job that has been scheduled, with at // least one active execution. JobStateTypeRunning // JobStateTypeCompleted is the state of a job that has successfully completed. // Only valid for batch jobs. JobStateTypeCompleted // JobStateTypeFailed is the state of a job that has failed. JobStateTypeFailed // JobStateTypeStopped is the state of a job that has been stopped by the user. JobStateTypeStopped )
func JobStateTypes ¶ added in v1.1.0
func JobStateTypes() []JobStateType
func (JobStateType) IsTerminal ¶ added in v1.3.2
func (s JobStateType) IsTerminal() bool
IsTerminal returns true if the job state is terminal
func (JobStateType) IsUndefined ¶
func (s JobStateType) IsUndefined() bool
IsUndefined returns true if the job state is undefined
func (JobStateType) MarshalText ¶ added in v1.1.0
func (s JobStateType) MarshalText() ([]byte, error)
func (JobStateType) String ¶
func (i JobStateType) String() string
func (*JobStateType) UnmarshalText ¶ added in v1.1.0
func (s *JobStateType) UnmarshalText(text []byte) (err error)
type LabelSelectorRequirement ¶
type LabelSelectorRequirement struct { // key is the label key that the selector applies to. Key string `json:"Key"` // operator represents a key's relationship to a set of values. // Valid operators are In, NotIn, Exists and DoesNotExist. Operator selection.Operator `json:"Operator"` // values is an array of string values. If the operator is In or NotIn, // the values array must be non-empty. If the operator is Exists or DoesNotExist, // the values array must be empty. This array is replaced during a strategic merge patch. Values []string `json:"Values,omitempty"` }
LabelSelectorRequirement is a selector that contains values, a key, and an operator that relates the key and values. These are based on the labels library from the kubernetes package. We use labels.Requirement to represent the label selector requirements in the command line arguments, as the library supports multiple parsing formats, and we also use it when matching selectors to labels, as that's what the library expects. However, labels.Requirements are not serializable, so we need to convert them to LabelSelectorRequirements.
func ToLabelSelectorRequirements ¶
func ToLabelSelectorRequirements(requirements ...labels.Requirement) []LabelSelectorRequirement
func (*LabelSelectorRequirement) Copy ¶
func (r *LabelSelectorRequirement) Copy() *LabelSelectorRequirement
func (*LabelSelectorRequirement) String ¶
func (r *LabelSelectorRequirement) String() string
func (*LabelSelectorRequirement) Validate ¶
func (r *LabelSelectorRequirement) Validate() error
type LabelsProvider ¶ added in v1.2.1
func MergeLabelsInOrder ¶ added in v1.2.1
func MergeLabelsInOrder(providers ...LabelsProvider) LabelsProvider
type NamespacedID ¶
NamespacedID is a tuple of an ID and a namespace
func NewNamespacedID ¶
func NewNamespacedID(id, ns string) NamespacedID
NewNamespacedID returns a new namespaced ID given the ID and namespace
func (NamespacedID) String ¶
func (n NamespacedID) String() string
type Network ¶
type Network int
const ( // NetworkNone specifies that the job does not require networking. NetworkNone Network = iota // NetworkFull specifies that the job requires unfiltered raw IP networking. NetworkFull // NetworkHTTP specifies that the job requires HTTP networking to certain domains. // // The model is: the job specifier submits a job with the domain(s) it will // need to communicate with, the compute provider uses this to make some // decision about the risk of the job and bids accordingly, and then at run // time the traffic is limited to only the domain(s) specified. // // As a command, something like: // // bacalhau docker run —network=http —domain=crates.io —domain=github.com -i ipfs://Qmy1234myd4t4,dst=/code rust/compile // // The “risk” for the compute provider is that the job does something that // violates its terms, the terms of its hosting provider or ISP, or even the // law in its jurisdiction (e.g. accessing and spreading illegal content, // performing cyberattacks). So the same sort of risk as operating a Tor // exit node. // // The risk for the job specifier is that we are operating in an environment // they are paying for, so there is an incentive to hijack that environment // (e.g. via a compromised package download that runs a crypto miner on // install, and uses up all the paid-for job time). Having the traffic // enforced to only domains specified makes those sorts of attacks much // trickier and less valuable. // // The compute provider might well enforce its limits by other means, but // having the domains specified up front allows it to skip bidding on jobs // it knows will fail in its executor. So this is hopefully a better UX for // job specifiers who can have their job picked up only by someone who will // run it successfully. NetworkHTTP )
func ParseNetwork ¶
func (Network) MarshalText ¶
func (*Network) UnmarshalText ¶
type NetworkConfig ¶
type NetworkConfig struct { Type Network `json:"Type"` Domains []string `json:"Domains,omitempty"` }
func (*NetworkConfig) Copy ¶
func (n *NetworkConfig) Copy() *NetworkConfig
func (*NetworkConfig) Disabled ¶
func (n *NetworkConfig) Disabled() bool
Disabled returns whether network connections should be completely disabled according to this config.
func (*NetworkConfig) DomainSet ¶
func (n *NetworkConfig) DomainSet() []string
DomainSet returns the "unique set" of domains from the network config. Domains listed multiple times and any subdomain that is also matched by a wildcard are removed.
This is something of an implementation detail – it matches the behavior expected by our Docker HTTP gateway, which complains and/or fails to start if these requirements are not met.
func (*NetworkConfig) Normalize ¶
func (n *NetworkConfig) Normalize()
Normalize ensures that the network config is in a consistent state.
func (*NetworkConfig) Validate ¶
func (n *NetworkConfig) Validate() (err error)
Validate returns an error if any of the fields do not pass validation, or nil otherwise.
type NetworkConfigBuilder ¶
type NetworkConfigBuilder struct {
// contains filtered or unexported fields
}
func NewNetworkConfigBuilder ¶
func NewNetworkConfigBuilder() *NetworkConfigBuilder
func NewNetworkConfigBuilderFromNetwork ¶
func NewNetworkConfigBuilderFromNetwork(network *NetworkConfig) *NetworkConfigBuilder
func (*NetworkConfigBuilder) Build ¶
func (b *NetworkConfigBuilder) Build() (*NetworkConfig, error)
func (*NetworkConfigBuilder) BuildOrDie ¶
func (b *NetworkConfigBuilder) BuildOrDie() *NetworkConfig
BuildOrDie is a helper that wraps Build and panics on error.
func (*NetworkConfigBuilder) Domains ¶
func (b *NetworkConfigBuilder) Domains(domains ...string) *NetworkConfigBuilder
func (*NetworkConfigBuilder) Type ¶
func (b *NetworkConfigBuilder) Type(typ Network) *NetworkConfigBuilder
type NodeConnectionState ¶ added in v1.3.1
type NodeConnectionState struct {
// contains filtered or unexported fields
}
TODO: if we ever pass a pointer to this type and use `==` comparison on it, we're gonna have a bad time; implement an `Equal()` method for this type and default to it.
func ParseConnection ¶ added in v1.3.1
func ParseConnection(a any) NodeConnectionState
func (NodeConnectionState) MarshalJSON ¶ added in v1.3.1
func (s NodeConnectionState) MarshalJSON() ([]byte, error)
func (*NodeConnectionState) UnmarshalJSON ¶ added in v1.3.1
func (s *NodeConnectionState) UnmarshalJSON(b []byte) error
type NodeInfo ¶
type NodeInfo struct { // TODO replace all access on this field with the `ID()` method NodeID string `json:"NodeID"` NodeType NodeType `json:"NodeType"` Labels map[string]string `json:"Labels"` ComputeNodeInfo *ComputeNodeInfo `json:"ComputeNodeInfo,omitempty" yaml:",omitempty"` BacalhauVersion BuildVersionInfo `json:"BacalhauVersion"` }
NodeInfo contains metadata about a node on the network. Compute nodes share their NodeInfo with Requester nodes to further their view of the network's conditions. ComputeNodeInfo is non-nil iff the NodeType is NodeTypeCompute. TODO(walid): add Validate() method to NodeInfo and make sure it is called in all the places where it is initialized.
func (NodeInfo) IsComputeNode ¶
IsComputeNode returns true if the node is a compute node
type NodeInfoDecorator ¶ added in v1.2.1
type NodeMembershipState ¶ added in v1.3.1
type NodeMembershipState struct {
// contains filtered or unexported fields
}
TODO: if we ever pass a pointer to this type and use `==` comparison on it, we're gonna have a bad time; implement an `Equal()` method for this type and default to it.
func Parse ¶ added in v1.2.2
func Parse(a any) NodeMembershipState
func (NodeMembershipState) IsUndefined ¶ added in v1.3.2
func (t NodeMembershipState) IsUndefined() bool
func (NodeMembershipState) MarshalJSON ¶ added in v1.3.1
func (t NodeMembershipState) MarshalJSON() ([]byte, error)
func (*NodeMembershipState) UnmarshalJSON ¶ added in v1.3.1
func (t *NodeMembershipState) UnmarshalJSON(b []byte) error
type NodeState ¶ added in v1.3.1
type NodeState struct { Info NodeInfo `json:"Info"` Membership NodeMembershipState `json:"Membership"` Connection NodeConnectionState `json:"Connection"` }
NodeState contains metadata about the state of a node on the network. Requester nodes maintain a NodeState for each node they are aware of. The NodeState represents a Requester node's view of another node on the network.
type NodeStateProvider ¶ added in v1.3.1
type NodeType ¶
type NodeType int
const ( NodeTypeRequester NodeType NodeTypeCompute )
func ParseNodeType ¶
func (NodeType) MarshalText ¶ added in v1.1.0
func (*NodeType) UnmarshalText ¶ added in v1.1.0
type NoopNodeInfoDecorator ¶ added in v1.2.1
type NoopNodeInfoDecorator struct{}
NoopNodeInfoDecorator is a decorator that does nothing
func (NoopNodeInfoDecorator) DecorateNodeInfo ¶ added in v1.2.1
func (n NoopNodeInfoDecorator) DecorateNodeInfo(ctx context.Context, nodeInfo NodeInfo) NodeInfo
type Normalizable ¶
type Normalizable interface {
Normalize()
}
Normalizable is an interface for types that can be normalized (e.g. empty maps are converted to nil)
type PagingToken ¶ added in v1.2.2
func NewPagingToken ¶ added in v1.2.2
func NewPagingToken(params *PagingTokenParams) *PagingToken
func NewPagingTokenFromString ¶ added in v1.2.2
func NewPagingTokenFromString(s string) (*PagingToken, error)
func (*PagingToken) RawString ¶ added in v1.2.2
func (pagingToken *PagingToken) RawString() string
func (*PagingToken) String ¶ added in v1.2.2
func (pagingToken *PagingToken) String() string
String returns the token as a base 64 encoded string where each field is delimited.
type PagingTokenParams ¶ added in v1.2.2
type Plan ¶
type Plan struct { EvalID string `json:"EvalID"` EvalReceipt string `json:"EvalReceipt"` // TODO: passing the evalID should be enough once we persist evaluations Eval *Evaluation `json:"Eval,omitempty"` Priority int `json:"Priority"` Job *Job `json:"Job,omitempty"` DesiredJobState JobStateType `json:"DesiredJobState,omitempty"` Event Event `json:"Event,omitempty"` // NewExecutions holds the executions to be created. NewExecutions []*Execution `json:"NewExecutions,omitempty"` UpdatedExecutions map[string]*PlanExecutionDesiredUpdate `json:"UpdatedExecutions,omitempty"` // NewEvaluations holds the evaluations to be created, such as delayed evaluations when no nodes are available. NewEvaluations []*Evaluation `json:"NewEvaluations,omitempty"` }
Plan holds actions as a result of processing an evaluation by the scheduler.
func (*Plan) AppendApprovedExecution ¶
AppendApprovedExecution marks an execution as accepted and ready to be started.
func (*Plan) AppendEvaluation ¶ added in v1.3.2
func (p *Plan) AppendEvaluation(eval *Evaluation)
AppendEvaluation appends the evaluation to the plan evaluations.
func (*Plan) AppendExecution ¶
AppendExecution appends the execution to the plan executions.
func (*Plan) AppendStoppedExecution ¶
AppendStoppedExecution marks an execution to be stopped.
func (*Plan) IsJobFailed ¶ added in v1.3.2
IsJobFailed returns true if the plan is marking the job as failed
func (*Plan) MarkJobCompleted ¶
func (p *Plan) MarkJobCompleted()
func (*Plan) MarkJobFailed ¶
func (*Plan) MarkJobQueued ¶ added in v1.4.0
MarkJobQueued marks the job as queued.
func (*Plan) MarkJobRunningIfEligible ¶ added in v1.1.3
func (p *Plan) MarkJobRunningIfEligible()
MarkJobRunningIfEligible updates the job state to "Running" under certain conditions.
type PlanExecutionDesiredUpdate ¶
type PlanExecutionDesiredUpdate struct { Execution *Execution `json:"Execution"` DesiredState ExecutionDesiredStateType `json:"DesiredState"` Event Event `json:"Event"` }
type Resources ¶
type Resources struct { // CPU units CPU float64 `json:"CPU,omitempty"` // Memory in bytes Memory uint64 `json:"Memory,omitempty"` // Disk in bytes Disk uint64 `json:"Disk,omitempty"` // GPU units GPU uint64 `json:"GPU,omitempty"` // GPU details GPUs []GPU `json:"GPUs,omitempty"` }
func (*Resources) LessThanEq ¶
type ResourcesConfig ¶
type ResourcesConfig struct { // CPU https://github.com/BTBurke/k8sresource string CPU string `json:"CPU,omitempty"` // Memory github.com/dustin/go-humanize string Memory string `json:"Memory,omitempty"` // Disk github.com/dustin/go-humanize string Disk string `json:"Disk,omitempty"` GPU string `json:"GPU,omitempty"` }
func (*ResourcesConfig) Copy ¶
func (r *ResourcesConfig) Copy() *ResourcesConfig
Copy returns a deep copy of the resources
func (*ResourcesConfig) Normalize ¶
func (r *ResourcesConfig) Normalize()
Normalize normalizes the resources
func (*ResourcesConfig) ToResources ¶
func (r *ResourcesConfig) ToResources() (*Resources, error)
ToResources converts the resources config to resources
func (*ResourcesConfig) Validate ¶
func (r *ResourcesConfig) Validate() error
Validate returns an error if the resources are invalid
type ResourcesConfigBuilder ¶
type ResourcesConfigBuilder struct {
// contains filtered or unexported fields
}
func NewResourcesConfigBuilder ¶
func NewResourcesConfigBuilder() *ResourcesConfigBuilder
func (*ResourcesConfigBuilder) Build ¶
func (r *ResourcesConfigBuilder) Build() (*ResourcesConfig, error)
func (*ResourcesConfigBuilder) BuildOrDie ¶
func (r *ResourcesConfigBuilder) BuildOrDie() *ResourcesConfig
BuildOrDie is the same as Build, but panics if an error occurs
func (*ResourcesConfigBuilder) CPU ¶
func (r *ResourcesConfigBuilder) CPU(cpu string) *ResourcesConfigBuilder
func (*ResourcesConfigBuilder) Disk ¶
func (r *ResourcesConfigBuilder) Disk(disk string) *ResourcesConfigBuilder
func (*ResourcesConfigBuilder) GPU ¶
func (r *ResourcesConfigBuilder) GPU(gpu string) *ResourcesConfigBuilder
func (*ResourcesConfigBuilder) Memory ¶
func (r *ResourcesConfigBuilder) Memory(memory string) *ResourcesConfigBuilder
type ResultPath ¶
type ResultPath struct { // Name Name string `json:"Name"` // The path to the file/dir Path string `json:"Path"` }
func (*ResultPath) Normalize ¶
func (p *ResultPath) Normalize()
Normalize normalizes the path to a canonical form
type RunCommandResult ¶
type RunCommandResult struct { // stdout of the run. Yaml provided for `describe` output STDOUT string `json:"Stdout"` // bool describing if stdout was truncated StdoutTruncated bool `json:"StdoutTruncated"` // stderr of the run. STDERR string `json:"stderr"` // bool describing if stderr was truncated StderrTruncated bool `json:"StderrTruncated"` // exit code of the run. ExitCode int `json:"ExitCode"` // Runner error ErrorMsg string `json:"ErrorMsg"` }
func NewRunCommandResult ¶
func NewRunCommandResult() *RunCommandResult
type SpecConfig ¶
type SpecConfig struct { // Type of the config Type string `json:"Type"` // Params is a map of the config params Params map[string]interface{} `json:"Params,omitempty"` }
func NewSpecConfig ¶
func NewSpecConfig(t string) *SpecConfig
NewSpecConfig returns a new spec config
func (*SpecConfig) Copy ¶
func (s *SpecConfig) Copy() *SpecConfig
Copy returns a shallow copy of the spec config. TODO: implement deep copy if the value is a nested map, slice or Copyable.
func (*SpecConfig) IsEmpty ¶ added in v1.1.0
func (s *SpecConfig) IsEmpty() bool
IsEmpty returns true if the spec config is empty
func (*SpecConfig) IsType ¶
func (s *SpecConfig) IsType(t string) bool
IsType returns true if the current SpecConfig is of the given type t.
func (*SpecConfig) MarshalZerologObject ¶ added in v1.2.2
func (s *SpecConfig) MarshalZerologObject(e *zerolog.Event)
func (*SpecConfig) MetricAttributes ¶ added in v1.2.2
func (s *SpecConfig) MetricAttributes() []attribute.KeyValue
func (*SpecConfig) Normalize ¶
func (s *SpecConfig) Normalize()
func (*SpecConfig) Validate ¶
func (s *SpecConfig) Validate() error
func (*SpecConfig) ValidateAllowBlank ¶ added in v1.1.0
func (s *SpecConfig) ValidateAllowBlank() error
ValidateAllowBlank is the same as Validate but allows blank types. This is useful for when you want to validate a spec config that is optional.
func (*SpecConfig) WithParam ¶
func (s *SpecConfig) WithParam(key string, value interface{}) *SpecConfig
WithParam adds a param to the spec config
type State ¶
type State[T any] struct { // StateType is the current state of the object. StateType T `json:"StateType"` // Message is a human readable message describing the state. Message string `json:"Message,omitempty"` }
State is a generic struct for representing the state of an object, with an optional human readable message.
func NewExecutionDesiredState ¶
func NewExecutionDesiredState(stateType ExecutionDesiredStateType) State[ExecutionDesiredStateType]
NewExecutionDesiredState returns a new ExecutionDesiredStateType with the specified state type
func NewExecutionState ¶
func NewExecutionState(stateType ExecutionStateType) State[ExecutionStateType]
NewExecutionState returns a new ExecutionState with the specified state type
func NewJobState ¶
func NewJobState(stateType JobStateType) State[JobStateType]
NewJobState returns a new JobState with the specified state type
func (State[T]) WithMessage ¶
WithMessage returns a new State with the specified message.
type StateChange ¶
type StateChange[StateType any] struct { Previous StateType `json:"Previous,omitempty"` New StateType `json:"New,omitempty"` }
StateChange represents a change in state of one of the state types.
type Task ¶
type Task struct { // Name of the task Name string `json:"Name"` Engine *SpecConfig `json:"Engine"` Publisher *SpecConfig `json:"Publisher"` // Map of environment variables to be used by the driver Env map[string]string `json:"Env,omitempty"` // Meta is used to associate arbitrary metadata with this task. Meta map[string]string `json:"Meta,omitempty"` // InputSources is a list of remote artifacts to be downloaded before running the task // and mounted into the task. InputSources []*InputSource `json:"InputSources,omitempty"` // ResultPaths is a list of task volumes to be included in the task's published result ResultPaths []*ResultPath `json:"ResultPaths,omitempty"` // ResourcesConfig is the resources needed by this task ResourcesConfig *ResourcesConfig `json:"Resources,omitempty"` Network *NetworkConfig `json:"Network,omitempty"` Timeouts *TimeoutConfig `json:"Timeouts,omitempty"` }
func (*Task) AllStorageTypes ¶
func (*Task) MetricAttributes ¶ added in v1.2.1
func (*Task) ToBuilder ¶
func (t *Task) ToBuilder() *TaskBuilder
ToBuilder returns a new task builder with the same values as the task
func (*Task) ValidateSubmission ¶ added in v1.1.0
ValidateSubmission is used to check a task for reasonable configuration when it is submitted. It is a subset of Validate that does not check fields with defaults, such as timeouts and resources.
type TaskBuilder ¶
type TaskBuilder struct {
// contains filtered or unexported fields
}
func NewTaskBuilder ¶
func NewTaskBuilder() *TaskBuilder
func NewTaskBuilderFromTask ¶
func NewTaskBuilderFromTask(task *Task) *TaskBuilder
func (*TaskBuilder) Build ¶
func (b *TaskBuilder) Build() (*Task, error)
func (*TaskBuilder) BuildOrDie ¶
func (b *TaskBuilder) BuildOrDie() *Task
BuildOrDie is the same as Build, but panics if an error occurs
func (*TaskBuilder) Engine ¶
func (b *TaskBuilder) Engine(engine *SpecConfig) *TaskBuilder
func (*TaskBuilder) InputSources ¶
func (b *TaskBuilder) InputSources(inputSources ...*InputSource) *TaskBuilder
func (*TaskBuilder) Meta ¶ added in v1.2.1
func (b *TaskBuilder) Meta(key string, value string) *TaskBuilder
func (*TaskBuilder) Name ¶
func (b *TaskBuilder) Name(name string) *TaskBuilder
func (*TaskBuilder) Network ¶
func (b *TaskBuilder) Network(network *NetworkConfig) *TaskBuilder
func (*TaskBuilder) Publisher ¶
func (b *TaskBuilder) Publisher(publisher *SpecConfig) *TaskBuilder
func (*TaskBuilder) ResourcesConfig ¶
func (b *TaskBuilder) ResourcesConfig(resourcesConfig *ResourcesConfig) *TaskBuilder
func (*TaskBuilder) ResultPaths ¶
func (b *TaskBuilder) ResultPaths(resultPaths ...*ResultPath) *TaskBuilder
func (*TaskBuilder) Timeouts ¶
func (b *TaskBuilder) Timeouts(timeouts *TimeoutConfig) *TaskBuilder
type TimeoutConfig ¶
type TimeoutConfig struct { // ExecutionTimeout is the maximum amount of time a task is allowed to run in seconds. // Zero means no timeout, such as for a daemon task. ExecutionTimeout int64 `json:"ExecutionTimeout,omitempty"` // QueueTimeout is the maximum amount of time a task is allowed to wait in the orchestrator // queue in seconds before being scheduled. Zero means no timeout. QueueTimeout int64 `json:"QueueTimeout,omitempty"` // TotalTimeout is the maximum amount of time a task is allowed to complete in seconds. // This includes the time spent in the queue, the time spent executing and the time spent retrying. // Zero means no timeout. TotalTimeout int64 `json:"TotalTimeout,omitempty"` }
TimeoutConfig is the configuration for timeout related settings, such as execution, queue and total timeouts.
func (*TimeoutConfig) Copy ¶
func (t *TimeoutConfig) Copy() *TimeoutConfig
Copy returns a deep copy of the timeout config.
func (*TimeoutConfig) GetExecutionTimeout ¶
func (t *TimeoutConfig) GetExecutionTimeout() time.Duration
GetExecutionTimeout returns the execution timeout duration. It returns ExecutionTimeout if configured, otherwise the TotalTimeout value, otherwise 0.
func (*TimeoutConfig) GetQueueTimeout ¶ added in v1.3.2
func (t *TimeoutConfig) GetQueueTimeout() time.Duration
GetQueueTimeout returns the queue timeout duration. It returns QueueTimeout if configured, otherwise 0. We don't fall back to TotalTimeout, to allow users to disable queueing if no nodes were available.
func (*TimeoutConfig) GetTotalTimeout ¶ added in v1.3.2
func (t *TimeoutConfig) GetTotalTimeout() time.Duration
GetTotalTimeout returns the total timeout duration
func (*TimeoutConfig) Validate ¶
func (t *TimeoutConfig) Validate() error
type Validatable ¶
type Validatable interface {
Validate() error
}
Validatable is an interface for types that can be validated
Source Files ¶
- buildversion.go
- constants.go
- debug.go
- engine.go
- error.go
- evaluation.go
- event.go
- execution.go
- execution_desired_state_string.go
- execution_log.go
- execution_state_string.go
- failure_injection.go
- input_source.go
- interfaces.go
- job.go
- job_event_string.go
- job_history.go
- job_history_string.go
- job_selection_pilicy.go
- job_string.go
- jobevent.go
- jobselectiondatalocality_string.go
- namespace.go
- network.go
- network_string.go
- node_approval.go
- node_connection.go
- node_info.go
- node_info_string.go
- node_state.go
- paging_token.go
- path.go
- plan.go
- resource.go
- selector.go
- spec_config.go
- state.go
- task.go
- task_builder.go
- timeout.go
- utils.go