models

package
v1.6.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 12, 2025 License: Apache-2.0 Imports: 30 Imported by: 1

Documentation

Index

Constants

View Source
const (
	// DefaultNamespace is the default namespace.
	DefaultNamespace = "default"

	// EnvVarPrefix is the prefix used for all Bacalhau executions environment variables
	EnvVarPrefix = "BACALHAU_"
)
View Source
const (
	// JobTypeService represents a long-running job that runs on a desired number of nodes
	// matching the specified constraints.
	JobTypeService = "service"

	// JobTypeDaemon represents a long-running job that runs on all nodes matching the
	// specified constraints.
	JobTypeDaemon = "daemon"

	// JobTypeBatch represents a batch job that runs to completion on the desired number
	// of nodes matching the specified constraints.
	JobTypeBatch = "batch"

	// JobTypeOps represents a batch job that runs to completion on all nodes matching
	// the specified constraints.
	JobTypeOps = "ops"
)
View Source
const (
	EngineNoop   = "noop"
	EngineDocker = "docker"
	EngineWasm   = "wasm"
)
View Source
const (
	StorageSourceNoop           = "noop"
	StorageSourceIPFS           = "ipfs"
	StorageSourceURL            = "urlDownload"
	StorageSourceS3             = "s3"
	StorageSourceS3PreSigned    = "s3PreSigned"
	StorageSourceInline         = "inline"
	StorageSourceLocalDirectory = "localDirectory"
)
View Source
const (
	PublisherNoop  = "noop"
	PublisherIPFS  = "ipfs"
	PublisherS3    = "s3"
	PublisherLocal = "local"
)
View Source
const (
	DownloadFilenameStdout   = "stdout"
	DownloadFilenameStderr   = "stderr"
	DownloadFilenameExitCode = "exitCode"
	DownloadCIDsFolderName   = "raw"
	DownloadFolderPerm       = 0755
	DownloadFilePerm         = 0644
)
View Source
const (
	MetaReservedPrefix       = "bacalhau.org/"
	MetaOrchestratorIDLegacy = "bacalhau.org/requester.id"
	MetaOrchestratorID       = "bacalhau.org/orchestrator.id"
	// MetaOrchestratorProtocol indicates which orchestrator-compute protocol is used
	MetaOrchestratorProtocol = "bacalhau.org/orchestrator.protocol"

	MetaServerInstallationID = "bacalhau.org/server.installation.id"
	MetaServerInstanceID     = "bacalhau.org/server.instance.id"
	MetaClientInstallationID = "bacalhau.org/client.installation.id"
	MetaClientInstanceID     = "bacalhau.org/client.instance.id"
)
View Source
const (
	EvalStatusBlocked   = "blocked"
	EvalStatusPending   = "pending"
	EvalStatusComplete  = "complete"
	EvalStatusFailed    = "failed"
	EvalStatusCancelled = "canceled"
)
View Source
const (
	EvalTriggerJobRegister = "job-register"
	EvalTriggerJobCancel   = "job-cancel"
	EvalTriggerJobQueue    = "job-queue"
	EvalTriggerJobTimeout  = "job-timeout"

	EvalTriggerExecFailure = "exec-failure"
	EvalTriggerExecUpdate  = "exec-update"
	EvalTriggerExecTimeout = "exec-timeout"
)
View Source
const (
	DetailsKeyIsError        = "IsError"
	DetailsKeyHint           = "Hint"
	DetailsKeyRetryable      = "Retryable"
	DetailsKeyFailsExecution = "FailsExecution"
	DetailsKeyNewState       = "NewState"
	DetailsKeyErrorCode      = "ErrorCode"
)

Variables

View Source
var EngineNames = []string{
	EngineDocker,
	EngineWasm,
}
View Source
var NoTimeout = time.Duration(math.MaxInt64).Truncate(time.Second)
View Source
var NodeMembership = membershipContainer{
	UNKNOWN:  NodeMembershipState{unknown},
	PENDING:  NodeMembershipState{pending},
	APPROVED: NodeMembershipState{approved},
	REJECTED: NodeMembershipState{rejected},
}
View Source
var NodeStates = livenessContainer{
	CONNECTED:    NodeConnectionState{connected},
	DISCONNECTED: NodeConnectionState{disconnected},
}

Functions

func CopySlice

func CopySlice[T Copyable[T]](slice []T) []T

func FromLabelSelectorRequirements

func FromLabelSelectorRequirements(requirements ...*LabelSelectorRequirement) ([]labels.Requirement, error)

func IsDefaultEngineType added in v1.2.1

func IsDefaultEngineType(kind string) bool

func NewErrInvalidPagingToken added in v1.2.2

func NewErrInvalidPagingToken(s string, msg string) error

func NormalizeSlice

func NormalizeSlice[T Normalizable](slice []T)

func ValidateSlice

func ValidateSlice[T Validatable](slice []T) error

Types

type AllocatedResources

type AllocatedResources struct {
	Tasks map[string]*Resources `json:"Tasks"`
}

AllocatedResources is the set of resources to be used by an execution, which may be resources allocated to a single task, or to a set of tasks in the future.

func (*AllocatedResources) Copy

func (*AllocatedResources) Total

func (a *AllocatedResources) Total() *Resources

Total returns the total resources allocated

type BaseNodeInfoProvider added in v1.6.0

type BaseNodeInfoProvider struct {
	// contains filtered or unexported fields
}

func NewBaseNodeInfoProvider added in v1.6.0

func NewBaseNodeInfoProvider(params BaseNodeInfoProviderParams) *BaseNodeInfoProvider

func (*BaseNodeInfoProvider) GetNodeInfo added in v1.6.0

func (n *BaseNodeInfoProvider) GetNodeInfo(ctx context.Context) NodeInfo

GetNodeInfo returns the node info for the node.

func (*BaseNodeInfoProvider) RegisterLabelProvider added in v1.6.0

func (n *BaseNodeInfoProvider) RegisterLabelProvider(provider LabelsProvider)

RegisterLabelProvider registers a label provider with the node info provider.

func (*BaseNodeInfoProvider) RegisterNodeInfoDecorator added in v1.6.0

func (n *BaseNodeInfoProvider) RegisterNodeInfoDecorator(decorator NodeInfoDecorator)

RegisterNodeInfoDecorator registers a node info decorator with the node info provider.

type BaseNodeInfoProviderParams added in v1.6.0

type BaseNodeInfoProviderParams struct {
	NodeID             string
	LabelsProvider     LabelsProvider
	BacalhauVersion    BuildVersionInfo
	SupportedProtocols []Protocol
}

type BuildVersionInfo

type BuildVersionInfo struct {
	Major      string    `json:"Major,omitempty" example:"0"`
	Minor      string    `json:"Minor,omitempty" example:"3"`
	GitVersion string    `json:"GitVersion" example:"v0.3.12"`
	GitCommit  string    `json:"GitCommit" example:"d612b63108f2b5ce1ab2b9e02444eb1dac1d922d"`
	BuildDate  time.Time `json:"BuildDate" example:"2022-11-16T14:03:31Z"`
	GOOS       string    `json:"GOOS" example:"linux"`
	GOARCH     string    `json:"GOARCH" example:"amd64"`
}

BuildVersionInfo is the version of a Bacalhau binary (either client or server)

func (*BuildVersionInfo) Copy added in v1.6.0

type ComputeNodeInfo

type ComputeNodeInfo struct {
	ExecutionEngines   []string  `json:"ExecutionEngines"`
	Publishers         []string  `json:"Publishers"`
	StorageSources     []string  `json:"StorageSources"`
	MaxCapacity        Resources `json:"MaxCapacity"`
	QueueUsedCapacity  Resources `json:"QueueCapacity"`
	AvailableCapacity  Resources `json:"AvailableCapacity"`
	MaxJobRequirements Resources `json:"MaxJobRequirements"`
	RunningExecutions  int       `json:"RunningExecutions"`
	EnqueuedExecutions int       `json:"EnqueuedExecutions"`
}

ComputeNodeInfo contains metadata about the current state and abilities of a compute node. Compute Nodes share this state with Requester nodes by including it in the NodeInfo they share across the network.

func (*ComputeNodeInfo) Copy added in v1.6.0

func (c *ComputeNodeInfo) Copy() *ComputeNodeInfo

Copy returns a copy of the ComputeNodeInfo.

type ConnectionState added in v1.6.0

type ConnectionState struct {
	// Connection status
	Status NodeConnectionState `json:"Status"` // Connected, Disconnected, etc.

	// Last successful heartbeat timestamp
	LastHeartbeat time.Time `json:"LastHeartbeat"`

	// Message sequencing for reliable delivery
	LastComputeSeqNum      uint64 `json:"LastComputeSeqNum,omitempty"`      // Last seq received from compute node
	LastOrchestratorSeqNum uint64 `json:"LastOrchestratorSeqNum,omitempty"` // Last seq received from orchestrator

	// Connection tracking
	ConnectedSince    time.Time `json:"ConnectedSince"`
	DisconnectedSince time.Time `json:"DisconnectedSince"`
	LastError         string    `json:"LastError,omitempty"`
}

ConnectionState tracks node's connectivity and messaging state

type Copyable

type Copyable[T any] interface {
	Copy() T
}

Copyable is an interface for types that can be copied

type DebugInfo added in v1.5.0

type DebugInfo struct {
	Component string      `json:"component"`
	Info      interface{} `json:"info"`
}

type DebugInfoProvider added in v1.5.0

type DebugInfoProvider interface {
	GetDebugInfo(ctx context.Context) (DebugInfo, error)
}

type DecoratorNodeInfoProvider added in v1.6.0

type DecoratorNodeInfoProvider interface {
	NodeInfoProvider
	RegisterNodeInfoDecorator(decorator NodeInfoDecorator)
	RegisterLabelProvider(provider LabelsProvider)
}

type Evaluation

type Evaluation struct {
	// ID is the unique identifier of the evaluation.
	ID string `json:"ID"`

	// Namespace is the namespace the evaluation is created in
	Namespace string `json:"Namespace"`

	// JobID is the unique identifier of the job.
	JobID string `json:"JobID"`

	// TriggeredBy is the root cause that triggered the evaluation.
	TriggeredBy string `json:"TriggeredBy"`

	// Priority is the priority of the evaluation.
	// e.g. 50 is higher priority than 10, and so will be evaluated first.
	Priority int `json:"Priority"`

	// Type is the type of the job that needs to be evaluated.
	Type string `json:"Type"`

	// Status is the current status of the evaluation.
	Status string `json:"Status"`

	// Comment is to provide additional information about the evaluation.
	Comment string `json:"Comment"`

	// WaitUntil is the time until which the evaluation should be ignored, such as to implement backoff when
	// repeatedly failing to assess a job.
	WaitUntil time.Time `json:"WaitUntil"`

	CreateTime int64 `json:"CreateTime"`
	ModifyTime int64 `json:"ModifyTime"`
}

Evaluation is just to ask the scheduler to reassess if additional job instances must be scheduled or if existing ones must be stopped. It is possible that no action is required if the scheduler sees the desired job state matches the observed state. This allows the triggers (e.g. APIs, Node Manager) to be lightweight and submit evaluation on state changes without having to do complex logic to decide what actions to take.

func NewEvaluation added in v1.3.1

func NewEvaluation() *Evaluation

NewEvaluation creates a new Evaluation.

func (*Evaluation) Copy

func (e *Evaluation) Copy() *Evaluation

func (*Evaluation) NewDelayedEvaluation added in v1.3.2

func (e *Evaluation) NewDelayedEvaluation(waitUntil time.Time) *Evaluation

NewDelayedEvaluation creates a new Evaluation from current one with a WaitUntil time.

func (*Evaluation) Normalize added in v1.3.1

func (e *Evaluation) Normalize() *Evaluation

Normalize ensures that the Evaluation is in a valid state.

func (*Evaluation) ShouldEnqueue

func (e *Evaluation) ShouldEnqueue() bool

ShouldEnqueue checks if a given Evaluation should be enqueued into the evaluation_broker

func (*Evaluation) String

func (e *Evaluation) String() string

func (*Evaluation) TerminalStatus

func (e *Evaluation) TerminalStatus() bool

TerminalStatus returns if the current status is terminal and will no longer transition.

func (*Evaluation) UpdateModifyTime

func (e *Evaluation) UpdateModifyTime()

UpdateModifyTime makes sure that time always moves forward, taking into account that different server clocks can drift apart.

func (*Evaluation) WithComment added in v1.3.1

func (e *Evaluation) WithComment(comment string) *Evaluation

WithComment sets the Comment of the Evaluation.

func (*Evaluation) WithJob added in v1.3.2

func (e *Evaluation) WithJob(job *Job) *Evaluation

WithJob sets the JobID, Type, Priority and Namespace of the Evaluation.

func (*Evaluation) WithJobID added in v1.3.1

func (e *Evaluation) WithJobID(jobID string) *Evaluation

WithJobID sets the JobID of the Evaluation.

func (*Evaluation) WithNamespace added in v1.3.1

func (e *Evaluation) WithNamespace(namespace string) *Evaluation

WithNamespace sets the Namespace of the Evaluation.

func (*Evaluation) WithPriority added in v1.3.1

func (e *Evaluation) WithPriority(priority int) *Evaluation

WithPriority sets the Priority of the Evaluation.

func (*Evaluation) WithStatus added in v1.3.1

func (e *Evaluation) WithStatus(status string) *Evaluation

WithStatus sets the Status of the Evaluation.

func (*Evaluation) WithTriggeredBy added in v1.3.1

func (e *Evaluation) WithTriggeredBy(triggeredBy string) *Evaluation

WithTriggeredBy sets the TriggeredBy of the Evaluation.

func (*Evaluation) WithType added in v1.3.1

func (e *Evaluation) WithType(jobType string) *Evaluation

WithType sets the Type of the Evaluation.

func (*Evaluation) WithWaitUntil added in v1.3.1

func (e *Evaluation) WithWaitUntil(waitUntil time.Time) *Evaluation

WithWaitUntil sets the WaitUntil of the Evaluation.

type EvaluationReceipt

type EvaluationReceipt struct {
	Evaluation *Evaluation `json:"Evaluation"`
	// ReceiptHandle is a unique identifier when dequeue an Evaluation from a broker.
	ReceiptHandle string `json:"ReceiptHandle"`
}

EvaluationReceipt is a pair of an Evaluation and its ReceiptHandle.

type Event added in v1.3.1

type Event struct {
	// A human-readable string giving the user all the information they need to
	// understand and respond to an Event, if a response is required.
	Message string `json:"Message"`

	// The topic of the event. See the documentation on EventTopic.
	Topic EventTopic `json:"Topic"`

	// The moment the event occurred, which may be different to the moment it
	// was recorded.
	Timestamp time.Time `json:"Timestamp"`

	// Any additional metadata that the system or user may need to know about
	// the event in order to handle it properly.
	Details map[string]string `json:"Details,omitempty"`
}

Event represents a progress report made by the system in its attempt to run a job. Events are generated by the orchestrator and also passed back to the orchestrator from the compute node.

Events may be delivered in an async fashion – i.e., they may arrive much later than the moment they occurred.

func EventFromError added in v1.3.1

func EventFromError(topic EventTopic, err error) *Event

EventFromError converts an error into an Event tagged with the passed event topic.

This method allows errors to implement extra interfaces (such as HasDetails, HasHint, HasRetryable and HasFailsExecution, documented below) to do "attribute-based error reporting". The design principle is that errors can report a set of extra flags that have well-defined semantics which the system can then respond to with specific behavior. This allows introducing or refactoring error types without higher-level components needing to be modified – they simply continue to respond to the presence of attributes.

This is instead of the system having a centralized set of known error types and programming in specific behavior in response to them, which is brittle and requires updating all of the error responses when the types change.

func NewEvent added in v1.3.1

func NewEvent(topic EventTopic) *Event

NewEvent returns a new Event with the given topic.

func (*Event) GetJobStateIfPresent added in v1.5.0

func (e *Event) GetJobStateIfPresent() (JobStateType, error)

GetJobStateIfPresent returns the job state from the event, if it is a state update.

func (*Event) HasError added in v1.5.0

func (e *Event) HasError() bool

HasError returns true if the event is an error.

func (*Event) HasStateUpdate added in v1.5.0

func (e *Event) HasStateUpdate() bool

HasStateUpdate returns true if the event is a state update.

func (*Event) WithDetail added in v1.3.1

func (e *Event) WithDetail(key, value string) *Event

WithDetail returns a new Event with the given detail and topic.

func (*Event) WithDetails added in v1.3.1

func (e *Event) WithDetails(details map[string]string) *Event

WithDetails returns a new Event with the given details and topic.

func (*Event) WithError added in v1.3.1

func (e *Event) WithError(err error) *Event

WithError returns a new Event with the given error.

func (*Event) WithErrorCode added in v1.5.0

func (e *Event) WithErrorCode(errorCode string) *Event

WithErrorCode returns a new Event with the given error code.

func (*Event) WithFailsExecution added in v1.3.1

func (e *Event) WithFailsExecution(failsExecution bool) *Event

WithFailsExecution returns a new Event with the given fails execution flag.

func (*Event) WithHint added in v1.3.1

func (e *Event) WithHint(hint string) *Event

WithHint returns a new Event with the given hint.

func (*Event) WithMessage added in v1.3.1

func (e *Event) WithMessage(message string) *Event

WithMessage returns a new Event with the given message and topic.

func (*Event) WithRetryable added in v1.3.1

func (e *Event) WithRetryable(retryable bool) *Event

WithRetryable returns a new Event with the given retryable flag.

type EventTopic added in v1.3.1

type EventTopic string

EventTopic is a high level categorisation that can be applied to an event. It should be a human-readable string with no dynamic content. They are used to disambiguate events from the same component occurring in different contexts. For example, an event emitted by S3 storage used as an input source and the same event emitted by S3 storage used as a publisher would be tagged with different topics.

EventTopics do not need to conform to a centralized list – each module should use event topics that make sense for their own logic. Event topics SHOULD be unique.

type Execution

type Execution struct {
	// ID of the execution (UUID)
	ID string `json:"ID"`

	// Namespace is the namespace the execution is created in
	Namespace string `json:"Namespace"`

	// ID of the evaluation that generated this execution
	EvalID string `json:"EvalID"`

	// Name is a logical name of the execution.
	Name string `json:"Name"`

	// NodeID is the node this is being placed on
	NodeID string `json:"NodeID"`

	// Job is the parent job of the task being allocated.
	// This is copied at execution time to avoid issues if the job
	// definition is updated.
	JobID string `json:"JobID"`
	// TODO: evaluate using a copy of the job instead of a pointer
	Job *Job `json:"Job,omitempty"`

	// AllocatedResources is the total resources allocated for the execution tasks.
	AllocatedResources *AllocatedResources `json:"AllocatedResources"`

	// DesiredState of the execution on the compute node
	DesiredState State[ExecutionDesiredStateType] `json:"DesiredState"`

	// ComputeState observed state of the execution on the compute node
	ComputeState State[ExecutionStateType] `json:"ComputeState"`

	// the published results for this execution
	PublishedResult *SpecConfig `json:"PublishedResult"`

	// RunOutput is the output of the run command
	// TODO: evaluate removing this from execution spec in favour of calling `bacalhau job logs`
	RunOutput *RunCommandResult `json:"RunOutput"`

	// PreviousExecution is the execution that this execution is replacing
	PreviousExecution string `json:"PreviousExecution"`

	// NextExecution is the execution that this execution is being replaced by
	NextExecution string `json:"NextExecution"`

	// FollowupEvalID captures a follow up evaluation created to handle a failed execution
	// that can be rescheduled in the future
	FollowupEvalID string `json:"FollowupEvalID"`

	// PartitionIndex is the index of this execution in the job's total partitions (0-based)
	// Only relevant when Job.Count > 1
	PartitionIndex int `json:"PartitionIndex,omitempty"`

	// Revision is incremented each time the execution is updated.
	Revision uint64 `json:"Revision"`

	// CreateTime is the time the execution has finished scheduling and been
	// verified by the plan applier.
	CreateTime int64 `json:"CreateTime"`
	// ModifyTime is the time the execution was last updated.
	ModifyTime int64 `json:"ModifyTime"`
}

Execution is used to allocate the placement of a task group to a node.

func (*Execution) AllocateResources

func (e *Execution) AllocateResources(taskID string, resources Resources)

AllocateResources allocates resources to a task

func (*Execution) Copy

func (e *Execution) Copy() *Execution

Copy provides a copy of the execution and deep copies the job.

func (*Execution) GetCreateTime added in v1.1.0

func (e *Execution) GetCreateTime() time.Time

GetCreateTime returns the creation time

func (*Execution) GetModifyTime added in v1.1.0

func (e *Execution) GetModifyTime() time.Time

GetModifyTime returns the modify time

func (*Execution) IsDiscarded

func (e *Execution) IsDiscarded() bool

IsDiscarded returns true if the execution has failed, been cancelled or rejected.

func (*Execution) IsExpired added in v1.3.1

func (e *Execution) IsExpired(expirationTime time.Time) bool

IsExpired returns true if the execution is still running beyond the expiration time. That is, it returns true if the execution is in the bid accepted state (i.e. running) and its modify time is older than the expiration time.

func (*Execution) IsTerminalComputeState

func (e *Execution) IsTerminalComputeState() bool

IsTerminalComputeState returns true if the execution observed state is terminal

func (*Execution) IsTerminalDesiredState

func (e *Execution) IsTerminalDesiredState() bool

IsTerminalDesiredState returns true if the execution desired state is terminal

func (*Execution) IsTerminalState

func (e *Execution) IsTerminalState() bool

IsTerminalState returns true if the execution desired or observed state is terminal.

func (*Execution) JobNamespacedID

func (e *Execution) JobNamespacedID() NamespacedID

func (*Execution) Normalize

func (e *Execution) Normalize()

Normalize normalizes the Execution to ensure fields are initialized to the expectations of this version of Bacalhau. It should be called when restoring persisted Executions or receiving Executions from Bacalhau clients potentially on an older version of Bacalhau.

func (*Execution) OrchestrationProtocol added in v1.5.2

func (e *Execution) OrchestrationProtocol() Protocol

OrchestrationProtocol is the protocol used to orchestrate the execution

func (*Execution) String

func (e *Execution) String() string

func (*Execution) TotalAllocatedResources

func (e *Execution) TotalAllocatedResources() *Resources

func (*Execution) Validate

func (e *Execution) Validate() error

Validate is used to check an execution for reasonable configuration.

type ExecutionDesiredStateType

type ExecutionDesiredStateType int
const (
	ExecutionDesiredStatePending ExecutionDesiredStateType = iota
	ExecutionDesiredStateRunning
	ExecutionDesiredStateStopped
)

func (ExecutionDesiredStateType) String

func (i ExecutionDesiredStateType) String() string

type ExecutionLog added in v1.2.2

type ExecutionLog struct {
	Type ExecutionLogType
	Line string
}

type ExecutionLogType added in v1.2.2

type ExecutionLogType int
const (
	ExecutionLogTypeSTDOUT ExecutionLogType
	ExecutionLogTypeSTDERR
)

type ExecutionStateType

type ExecutionStateType int

ExecutionStateType The state of an execution. An execution represents a single attempt to execute a job on a node. A compute node can have multiple executions for the same job due to retries, but there can only be a single active execution per node at any given time.

const (
	ExecutionStateUndefined ExecutionStateType = iota
	// ExecutionStateNew The execution has been created, but not pushed to a compute node yet.
	ExecutionStateNew
	// ExecutionStateAskForBid A node has been selected to execute a job, and is being asked to bid on the job.
	ExecutionStateAskForBid
	// ExecutionStateAskForBidAccepted compute node has accepted the ask for bid.
	ExecutionStateAskForBidAccepted
	// ExecutionStateAskForBidRejected compute node has rejected the ask for bid.
	ExecutionStateAskForBidRejected
	// ExecutionStateBidAccepted requester has accepted the bid, and the execution is expected to be running on the compute node.
	ExecutionStateBidAccepted // aka running
	// ExecutionStateRunning The execution is running on the compute node.
	ExecutionStateRunning
	// ExecutionStatePublishing The execution has completed, and the result is being published.
	ExecutionStatePublishing
	// ExecutionStateBidRejected requester has rejected the bid.
	ExecutionStateBidRejected
	// ExecutionStateCompleted The execution has been completed, and the result has been published.
	ExecutionStateCompleted
	// ExecutionStateFailed The execution has failed.
	ExecutionStateFailed
	// ExecutionStateCancelled The execution has been canceled by the user
	ExecutionStateCancelled
)

TODO: change states to reflect non-bidding scheduling

func ExecutionStateTypes added in v1.5.2

func ExecutionStateTypes() []ExecutionStateType

func (ExecutionStateType) IsExecuting added in v1.5.2

func (s ExecutionStateType) IsExecuting() bool

IsExecuting returns true if the execution is running in the backend

func (ExecutionStateType) IsTerminal added in v1.5.1

func (s ExecutionStateType) IsTerminal() bool

func (ExecutionStateType) IsUndefined

func (s ExecutionStateType) IsUndefined() bool

IsUndefined returns true if the execution state is undefined

func (ExecutionStateType) String

func (i ExecutionStateType) String() string

type ExecutionUpsert added in v1.5.2

type ExecutionUpsert struct {
	// Current represents the new state of the execution
	Current *Execution
	// Previous represents the old state of the execution, nil if this is a new execution
	Previous *Execution
	// Events contains the list of events associated with this state change
	Events []*Event
}

ExecutionUpsert represents a change in execution state, containing the current and previous execution states along with associated events. It is used for tracking and propagating execution state changes across nodes.

func (ExecutionUpsert) HasStateChange added in v1.5.2

func (u ExecutionUpsert) HasStateChange() bool

HasStateChange returns true if there are changes in either desired or compute state

type FailureInjectionConfig added in v1.5.0

type FailureInjectionConfig struct {
	IsBadActor bool `yaml:"IsBadActor"`
}

type GPU added in v1.2.0

type GPU struct {
	// Self-reported index of the device in the system
	Index uint64
	// Model name of the GPU e.g. Tesla T4
	Name string
	// Maker of the GPU, e.g. NVidia, AMD, Intel
	Vendor GPUVendor
	// Total GPU memory in mebibytes (MiB)
	Memory uint64
	// PCI address of the device, in the format AAAA:BB:CC.C
	// Used to discover the correct device rendering cards
	PCIAddress string
}

func (GPU) Less added in v1.6.0

func (g GPU) Less(other GPU) bool

Less compares this GPU with another for sorting/ordering purposes. The comparison order is: Index, Name, Vendor, Memory, PCIAddress.

type GPUVendor added in v1.2.0

type GPUVendor string
const (
	GPUVendorNvidia GPUVendor = "NVIDIA"
	GPUVendorAMDATI GPUVendor = "AMD/ATI"
	GPUVendorIntel  GPUVendor = "Intel"
)

type HasDetails added in v1.3.1

type HasDetails interface {
	// Details An extra set of metadata provided by the error.
	Details() map[string]string
}

type HasFailsExecution added in v1.3.1

type HasFailsExecution interface {
	// FailsExecution Whether this error means that the associated execution cannot
	// continue.
	//
	// If a component raises an error with FailsExecution() as true,
	// the hosting executor should report the execution as failed and stop any
	// further steps.
	FailsExecution() bool
}

type HasHint added in v1.3.1

type HasHint interface {
	// Hint A human-readable string that advises the user on how they might solve the error.
	Hint() string
}

type HasRetryable added in v1.3.1

type HasRetryable interface {
	// Retryable Whether the error could be retried, assuming the same input and
	// node configuration; i.e. the error is transient and due to network
	// capacity or service outage.
	//
	// If a component raises an error with Retryable() as true, the system may
	// retry the last action after some length of time. If it is false, it
	// should not try the action again, and may choose an alternative action or
	// fail the job.
	Retryable() bool
}

type InputSource

type InputSource struct {
	// Source is the source of the artifact to be downloaded, e.g a URL, S3 bucket, etc.
	Source *SpecConfig `json:"Source"`

	// Alias is an optional reference to this input source that can be used for
	// dynamic linking to this input. (e.g. dynamic import in wasm by alias)
	Alias string `json:"Alias"`

	// Target is the path where the artifact should be mounted on
	Target string `json:"Target"`
}

func (*InputSource) Copy

func (a *InputSource) Copy() *InputSource

Copy returns a deep copy of the artifact

func (*InputSource) MarshalZerologObject added in v1.2.2

func (a *InputSource) MarshalZerologObject(e *zerolog.Event)

func (*InputSource) Normalize

func (a *InputSource) Normalize()

Normalize normalizes the artifact's source and target

func (*InputSource) Validate

func (a *InputSource) Validate() error

Validate validates the artifact's source and target

type Job

type Job struct {
	// ID is a unique identifier assigned to this job.
	// It helps to distinguish jobs with the same name after they have been deleted and re-created.
	// The ID is generated by the server and should not be set directly by the client.
	ID string `json:"ID"`

	// Name is the logical name of the job used to refer to it.
	// Submitting a job with the same name as an existing job will result in an
	// update to the existing job.
	Name string `json:"Name"`

	// Namespace is the namespace this job is running in.
	Namespace string `json:"Namespace"`

	// Type is the type of job this is, e.g. "daemon" or "batch".
	Type string `json:"Type"`

	// Priority defines the scheduling priority of this job.
	Priority int `json:"Priority"`

	// Count is the number of replicas that should be scheduled.
	Count int `json:"Count"`

	// Constraints is a selector which must be true for the compute node to run this job.
	Constraints []*LabelSelectorRequirement `json:"Constraints"`

	// Meta is used to associate arbitrary metadata with this job.
	Meta map[string]string `json:"Meta"`

	// Labels is used to associate arbitrary labels with this job, which can be used
	// for filtering.
	// key=value
	Labels map[string]string `json:"Labels"`

	Tasks []*Task `json:"Tasks"`

	// State is the current state of the job.
	State State[JobStateType] `json:"State"`

	// Version is a per-job monotonically increasing version number that is incremented
	// on each job specification update.
	Version uint64 `json:"Version"`

	// Revision is a per-job monotonically increasing revision number that is incremented
	// on each update to the job's state or specification
	Revision uint64 `json:"Revision"`

	CreateTime int64 `json:"CreateTime"`
	ModifyTime int64 `json:"ModifyTime"`
}

func (*Job) AllStorageTypes

func (j *Job) AllStorageTypes() []string

AllStorageTypes returns keys of all storage types required by the job

func (*Job) Copy

func (j *Job) Copy() *Job

Copy returns a deep copy of the Job. Because it uses reflection, this method can panic if the deep copy fails, so callers are expected to use recover.

func (*Job) GetCreateTime

func (j *Job) GetCreateTime() time.Time

GetCreateTime returns the creation time

func (*Job) GetModifyTime

func (j *Job) GetModifyTime() time.Time

GetModifyTime returns the modify time

func (*Job) IsExpired added in v1.4.0

func (j *Job) IsExpired(expirationTime time.Time) bool

IsExpired returns true if the job is still running beyond the expiration time

func (*Job) IsLongRunning added in v1.1.0

func (j *Job) IsLongRunning() bool

IsLongRunning returns true if the job is long running

func (*Job) IsTerminal

func (j *Job) IsTerminal() bool

IsTerminal returns true if the job is in a terminal state

func (*Job) MetricAttributes added in v1.2.1

func (j *Job) MetricAttributes() []attribute.KeyValue

func (*Job) NamespacedID

func (j *Job) NamespacedID() NamespacedID

NamespacedID returns the namespaced id useful for logging

func (*Job) Normalize

func (j *Job) Normalize()

Normalize is used to canonicalize fields in the Job. This should be called when registering a Job.

func (*Job) OrchestrationProtocol added in v1.5.2

func (j *Job) OrchestrationProtocol() Protocol

OrchestrationProtocol returns the orchestrator protocol for the job

func (*Job) OrchestratorID added in v1.5.2

func (j *Job) OrchestratorID() string

OrchestratorID returns the orchestrator ID for the job from its metadata

func (*Job) SanitizeSubmission added in v1.1.0

func (j *Job) SanitizeSubmission() (warnings []string)

SanitizeSubmission is used to sanitize a job for reasonable configuration when it is submitted.

func (*Job) String

func (j *Job) String() string

func (*Job) Task

func (j *Job) Task() *Task

Task returns the job's task. TODO: remove this once we have multiple tasks per job.

func (*Job) Validate

func (j *Job) Validate() error

Validate is used to check a job for reasonable configuration

func (*Job) ValidateSubmission

func (j *Job) ValidateSubmission() error

ValidateSubmission is used to check a job for reasonable configuration when it is submitted. It is a subset of Validate that does not check fields with defaults, such as job ID

type JobHistory

type JobHistory struct {
	SeqNum      uint64         `json:"SeqNum"`
	Type        JobHistoryType `json:"Type"`
	JobID       string         `json:"JobID"`
	ExecutionID string         `json:"ExecutionID,omitempty"`
	Event       Event          `json:"Event,omitempty"`
	Time        time.Time      `json:"Time"`

	// TODO: remove with v1.5
	// Deprecated: Left for backward compatibility with v1.4.x clients
	JobState *StateChange[JobStateType] `json:"JobState,omitempty"`
	// Deprecated: Left for backward compatibility with v1.4.x clients
	ExecutionState *StateChange[ExecutionStateType] `json:"ExecutionState,omitempty"`
}

JobHistory represents a single event in the history of a job. An event can be at the job level, or execution (node) level.

{Job,Execution}State fields will only be present if the Type field is of the matching type.

func (JobHistory) IsExecutionLevel added in v1.5.0

func (jh JobHistory) IsExecutionLevel() bool

IsExecutionLevel returns true if the JobHistory is at the execution level.

func (JobHistory) IsJobLevel added in v1.5.0

func (jh JobHistory) IsJobLevel() bool

IsJobLevel returns true if the JobHistory is at the job level.

func (JobHistory) Occurred added in v1.3.1

func (jh JobHistory) Occurred() time.Time

Occurred returns when the action that triggered an update to job history actually occurred.

The Time field represents the moment that the JobHistory item was recorded, i.e. it is almost always set to time.Now() when creating the object. This is different from the Event.Timestamp, which represents when the source of the history update actually occurred.

type JobHistoryType

type JobHistoryType int
const (
	JobHistoryTypeUndefined JobHistoryType = iota
	JobHistoryTypeJobLevel
	JobHistoryTypeExecutionLevel
)

func (JobHistoryType) MarshalText

func (s JobHistoryType) MarshalText() ([]byte, error)

func (JobHistoryType) String

func (i JobHistoryType) String() string

func (*JobHistoryType) UnmarshalText

func (s *JobHistoryType) UnmarshalText(text []byte) (err error)

type JobSelectionDataLocality added in v1.5.0

type JobSelectionDataLocality int64

Job selection policy configuration

const (
	Local    JobSelectionDataLocality = 0 // local
	Anywhere JobSelectionDataLocality = 1 // anywhere
)

func ParseJobSelectionDataLocality added in v1.5.0

func ParseJobSelectionDataLocality(s string) (ret JobSelectionDataLocality, err error)

func (JobSelectionDataLocality) MarshalText added in v1.5.0

func (i JobSelectionDataLocality) MarshalText() ([]byte, error)

func (JobSelectionDataLocality) String added in v1.5.0

func (i JobSelectionDataLocality) String() string

func (*JobSelectionDataLocality) UnmarshalText added in v1.5.0

func (i *JobSelectionDataLocality) UnmarshalText(text []byte) error

type JobSelectionPolicy added in v1.5.0

type JobSelectionPolicy struct {
	// this describes if we should run a job based on
	// where the data is located - i.e. if the data is "local"
	// or if the data is "anywhere"
	Locality JobSelectionDataLocality `json:"locality" yaml:"Locality"`
	// should we reject jobs that don't specify any data
	// the default is "accept"
	RejectStatelessJobs bool `json:"reject_stateless_jobs" yaml:"RejectStatelessJobs"`
	// should we accept jobs that specify networking
	// the default is "reject"
	AcceptNetworkedJobs bool `json:"accept_networked_jobs" yaml:"AcceptNetworkedJobs"`
	// external hooks that decide if we should take on the job or not
	// if either of these are given they will override the data locality settings
	ProbeHTTP string `json:"probe_http,omitempty" yaml:"ProbeHTTP"`
	ProbeExec string `json:"probe_exec,omitempty" yaml:"ProbeExec"`
}

JobSelectionPolicy describes the rules for how a compute node selects an incoming job.

type JobStateType

type JobStateType int
const (
	JobStateTypeUndefined JobStateType = iota

	// JobStateTypePending is the state of a job that has been submitted but not
	// yet scheduled.
	JobStateTypePending

	// JobStateTypeQueued is the state of a job that has been evaluated but no
	// matching nodes are available yet.
	JobStateTypeQueued

	// JobStateTypeRunning is the state of a job that has been scheduled, with at
	// least one active execution.
	JobStateTypeRunning

	// JobStateTypeCompleted is the state of a job that has successfully completed.
	// Only valid for batch jobs.
	JobStateTypeCompleted

	// JobStateTypeFailed is the state of a job that has failed.
	JobStateTypeFailed

	// JobStateTypeStopped is the state of a job that has been stopped by the user.
	JobStateTypeStopped
)

func JobStateTypes added in v1.1.0

func JobStateTypes() []JobStateType

func (JobStateType) IsTerminal added in v1.3.2

func (s JobStateType) IsTerminal() bool

IsTerminal returns true if the job state is terminal

func (JobStateType) IsUndefined

func (s JobStateType) IsUndefined() bool

IsUndefined returns true if the job state is undefined

func (JobStateType) MarshalText added in v1.1.0

func (s JobStateType) MarshalText() ([]byte, error)

func (JobStateType) String

func (i JobStateType) String() string

func (*JobStateType) UnmarshalText added in v1.1.0

func (s *JobStateType) UnmarshalText(text []byte) (err error)

type LabelSelectorRequirement

type LabelSelectorRequirement struct {
	// key is the label key that the selector applies to.
	Key string `json:"Key"`
	// operator represents a key's relationship to a set of values.
	// Valid operators are In, NotIn, Exists and DoesNotExist.
	Operator selection.Operator `json:"Operator"`
	// values is an array of string values. If the operator is In or NotIn,
	// the values array must be non-empty. If the operator is Exists or DoesNotExist,
	// the values array must be empty. This array is replaced during a strategic merge patch.
	Values []string `json:"Values,omitempty"`
}

LabelSelectorRequirement is a selector that contains values, a key, and an operator that relates the key and values. These are based on the labels library from the Kubernetes package. We use labels.Requirement to represent label selector requirements in command line arguments, because the library supports multiple parsing formats. We also use it when matching selectors to labels, because that is what the library expects. However, labels.Requirements are not serializable, so we need to convert them to LabelSelectorRequirements.

func ToLabelSelectorRequirements

func ToLabelSelectorRequirements(requirements ...labels.Requirement) []LabelSelectorRequirement

func (*LabelSelectorRequirement) Copy

func (*LabelSelectorRequirement) String

func (r *LabelSelectorRequirement) String() string

func (*LabelSelectorRequirement) Validate

func (r *LabelSelectorRequirement) Validate() error

type LabelsProvider added in v1.2.1

type LabelsProvider interface {
	GetLabels(ctx context.Context) map[string]string
}

func MergeLabelsInOrder added in v1.2.1

func MergeLabelsInOrder(providers ...LabelsProvider) LabelsProvider

type NamespacedID

type NamespacedID struct {
	ID        string `json:"ID"`
	Namespace string `json:"Namespace"`
}

NamespacedID is a tuple of an ID and a namespace

func NewNamespacedID

func NewNamespacedID(id, ns string) NamespacedID

NewNamespacedID returns a new namespaced ID given the ID and namespace

func (NamespacedID) String

func (n NamespacedID) String() string

type Network

type Network int
const (
	// NetworkNone specifies that the job does not require networking.
	NetworkNone Network = iota

	// NetworkFull specifies that the job requires unfiltered raw IP networking.
	NetworkFull

	// NetworkHTTP specifies that the job requires HTTP networking to certain domains.
	//
	// The model is: the job specifier submits a job with the domain(s) it will
	// need to communicate with, the compute provider uses this to make some
	// decision about the risk of the job and bids accordingly, and then at run
	// time the traffic is limited to only the domain(s) specified.
	//
	// As a command, something like:
	//
	//  bacalhau docker run --network=http --domain=crates.io --domain=github.com -i ipfs://Qmy1234myd4t4,dst=/code rust/compile
	//
	// The “risk” for the compute provider is that the job does something that
	// violates its terms, the terms of its hosting provider or ISP, or even the
	// law in its jurisdiction (e.g. accessing and spreading illegal content,
	// performing cyberattacks). So the same sort of risk as operating a Tor
	// exit node.
	//
	// The risk for the job specifier is that we are operating in an environment
	// they are paying for, so there is an incentive to hijack that environment
	// (e.g. via a compromised package download that runs a crypto miner on
	// install, and uses up all the paid-for job time). Having the traffic
	// enforced to only domains specified makes those sorts of attacks much
	// trickier and less valuable.
	//
	// The compute provider might well enforce its limits by other means, but
	// having the domains specified up front allows it to skip bidding on jobs
	// it knows will fail in its executor. So this is hopefully a better UX for
	// job specifiers who can have their job picked up only by someone who will
	// run it successfully.
	NetworkHTTP
)

func ParseNetwork

func ParseNetwork(s string) (Network, error)

func (Network) MarshalText

func (n Network) MarshalText() ([]byte, error)

func (Network) String

func (i Network) String() string

func (*Network) UnmarshalText

func (n *Network) UnmarshalText(text []byte) (err error)

type NetworkConfig

type NetworkConfig struct {
	Type    Network  `json:"Type"`
	Domains []string `json:"Domains,omitempty"`
}

func (*NetworkConfig) Copy

func (n *NetworkConfig) Copy() *NetworkConfig

func (*NetworkConfig) Disabled

func (n *NetworkConfig) Disabled() bool

Disabled returns whether network connections should be completely disabled according to this config.

func (*NetworkConfig) DomainSet

func (n *NetworkConfig) DomainSet() []string

DomainSet returns the "unique set" of domains from the network config. Domains listed multiple times and any subdomain that is also matched by a wildcard is removed.

This is something of an implementation detail – it matches the behavior expected by our Docker HTTP gateway, which complains and/or fails to start if these requirements are not met.

func (*NetworkConfig) Normalize

func (n *NetworkConfig) Normalize()

Normalize ensures that the network config is in a consistent state.

func (*NetworkConfig) Validate

func (n *NetworkConfig) Validate() (err error)

Validate returns an error if any of the fields do not pass validation, or nil otherwise.

type NodeConnectionState added in v1.3.1

type NodeConnectionState struct {
	// contains filtered or unexported fields
}

TODO: if we ever pass a pointer to this type and use `==` comparison on it, we're going to have a bad time. Implement an `Equal()` method for this type and default to it.

func ParseConnection added in v1.3.1

func ParseConnection(a any) NodeConnectionState

func (NodeConnectionState) IsValid added in v1.3.1

func (t NodeConnectionState) IsValid() bool

func (NodeConnectionState) MarshalJSON added in v1.3.1

func (s NodeConnectionState) MarshalJSON() ([]byte, error)

func (NodeConnectionState) String added in v1.3.1

func (t NodeConnectionState) String() string

func (*NodeConnectionState) UnmarshalJSON added in v1.3.1

func (s *NodeConnectionState) UnmarshalJSON(b []byte) error

type NodeInfo

type NodeInfo struct {
	NodeID   string            `json:"NodeID"`
	NodeType NodeType          `json:"NodeType"`
	Labels   map[string]string `json:"Labels"`
	// SupportedProtocols indicates which communication protocols this node supports
	SupportedProtocols []Protocol       `json:"SupportedProtocols"`
	ComputeNodeInfo    ComputeNodeInfo  `json:"ComputeNodeInfo,omitempty" yaml:",omitempty"`
	BacalhauVersion    BuildVersionInfo `json:"BacalhauVersion"`
}

NodeInfo contains metadata about a node on the network. Compute nodes share their NodeInfo with Requester nodes to further the Requester nodes' view of the network's conditions. ComputeNodeInfo is populated iff the NodeType is NodeTypeCompute. TODO(walid): add Validate() method to NodeInfo and make sure it is called in all the places where it is initialized

func (*NodeInfo) Copy added in v1.6.0

func (n *NodeInfo) Copy() *NodeInfo

Copy returns a deep copy of the NodeInfo

func (NodeInfo) HasStaticConfigChanged added in v1.6.0

func (n NodeInfo) HasStaticConfigChanged(other NodeInfo) bool

HasStaticConfigChanged returns true if the static/configuration aspects of this node have changed compared to other. It ignores dynamic operational fields like queue capacity and execution counts that change frequently during normal operation.

func (NodeInfo) ID added in v1.1.4

func (n NodeInfo) ID() string

ID returns the node ID

func (NodeInfo) IsComputeNode

func (n NodeInfo) IsComputeNode() bool

IsComputeNode returns true if the node is a compute node

type NodeInfoDecorator added in v1.2.1

type NodeInfoDecorator interface {
	DecorateNodeInfo(ctx context.Context, nodeInfo NodeInfo) NodeInfo
}

type NodeInfoProvider

type NodeInfoProvider interface {
	GetNodeInfo(ctx context.Context) NodeInfo
}

type NodeMembershipState added in v1.3.1

type NodeMembershipState struct {
	// contains filtered or unexported fields
}

TODO: if we ever pass a pointer to this type and use `==` comparison on it, we're going to have a bad time. Implement an `Equal()` method for this type and default to it.

func Parse added in v1.2.2

func Parse(a any) NodeMembershipState

func (NodeMembershipState) IsUndefined added in v1.3.2

func (t NodeMembershipState) IsUndefined() bool

func (NodeMembershipState) IsValid added in v1.3.1

func (t NodeMembershipState) IsValid() bool

func (NodeMembershipState) MarshalJSON added in v1.3.1

func (t NodeMembershipState) MarshalJSON() ([]byte, error)

func (NodeMembershipState) String added in v1.3.1

func (t NodeMembershipState) String() string

func (*NodeMembershipState) UnmarshalJSON added in v1.3.1

func (t *NodeMembershipState) UnmarshalJSON(b []byte) error

type NodeState added in v1.3.1

type NodeState struct {
	// Durable node information
	Info       NodeInfo            `json:"Info"`
	Membership NodeMembershipState `json:"Membership"`

	// Deprecated: Use ConnectionState.Status instead
	Connection NodeConnectionState `json:"Connection"`

	// Connection and messaging state
	ConnectionState ConnectionState `json:"ConnectionState"`
}

NodeState contains metadata about the state of a node on the network. Requester nodes maintain a NodeState for each node they are aware of. The NodeState represents a Requester node's view of another node on the network.

func (*NodeState) IsConnected added in v1.6.0

func (s *NodeState) IsConnected() bool

type NodeStateProvider added in v1.3.1

type NodeStateProvider interface {
	GetNodeState(ctx context.Context) NodeState
}

type NodeType

type NodeType int
const (
	NodeTypeRequester NodeType
	NodeTypeCompute
)

func ParseNodeType

func ParseNodeType(s string) (NodeType, error)

func (NodeType) MarshalText added in v1.1.0

func (e NodeType) MarshalText() ([]byte, error)

func (NodeType) String

func (i NodeType) String() string

func (*NodeType) UnmarshalText added in v1.1.0

func (e *NodeType) UnmarshalText(text []byte) (err error)

type NoopNodeInfoDecorator added in v1.2.1

type NoopNodeInfoDecorator struct{}

NoopNodeInfoDecorator is a decorator that does nothing

func (NoopNodeInfoDecorator) DecorateNodeInfo added in v1.2.1

func (n NoopNodeInfoDecorator) DecorateNodeInfo(ctx context.Context, nodeInfo NodeInfo) NodeInfo

type Normalizable

type Normalizable interface {
	Normalize()
}

Normalizable is an interface for types that can be normalized (e.g. empty maps are converted to nil)

type PagingToken added in v1.2.2

type PagingToken struct {
	SortBy      string
	SortReverse bool
	Limit       uint32
	Offset      uint64
}

func NewPagingToken added in v1.2.2

func NewPagingToken(params *PagingTokenParams) *PagingToken

func NewPagingTokenFromString added in v1.2.2

func NewPagingTokenFromString(s string) (*PagingToken, error)

func (*PagingToken) RawString added in v1.2.2

func (pagingToken *PagingToken) RawString() string

func (*PagingToken) String added in v1.2.2

func (pagingToken *PagingToken) String() string

String returns the token as a base 64 encoded string where each field is delimited.

type PagingTokenParams added in v1.2.2

type PagingTokenParams struct {
	SortBy      string
	SortReverse bool
	Limit       uint32
	Offset      uint64
}

type Plan

type Plan struct {
	EvalID      string      `json:"EvalID"`
	EvalReceipt string      `json:"EvalReceipt"`
	Eval        *Evaluation `json:"Eval,omitempty"`
	Priority    int         `json:"Priority"`

	Job *Job `json:"Job,omitempty"`

	DesiredJobState JobStateType `json:"DesiredJobState,omitempty"`
	UpdateMessage   string       `json:"Message,omitempty"`

	// NewExecutions holds the executions to be created.
	NewExecutions []*Execution `json:"NewExecutions,omitempty"`

	UpdatedExecutions map[string]*PlanExecutionDesiredUpdate `json:"UpdatedExecutions,omitempty"`

	// NewEvaluations holds the evaluations to be created, such as delayed evaluations when no nodes are available.
	NewEvaluations []*Evaluation `json:"NewEvaluations,omitempty"`

	JobEvents       []Event            `json:"JobEvents,omitempty"`
	ExecutionEvents map[string][]Event `json:"ExecutionEvents,omitempty"`
}

Plan holds actions as a result of processing an evaluation by the scheduler.

func NewPlan

func NewPlan(eval *Evaluation, job *Job) *Plan

NewPlan creates a new Plan instance.

func (*Plan) AppendApprovedExecution

func (p *Plan) AppendApprovedExecution(execution *Execution, event Event)

AppendApprovedExecution marks an execution as accepted and ready to be started.

func (*Plan) AppendEvaluation added in v1.3.2

func (p *Plan) AppendEvaluation(eval *Evaluation)

AppendEvaluation appends the evaluation to the plan evaluations.

func (*Plan) AppendExecution

func (p *Plan) AppendExecution(execution *Execution, event Event)

AppendExecution appends the execution to the plan executions.

func (*Plan) AppendExecutionEvent added in v1.5.0

func (p *Plan) AppendExecutionEvent(executionID string, event Event)

AppendExecutionEvent appends the event to the execution events.

func (*Plan) AppendJobEvent added in v1.5.0

func (p *Plan) AppendJobEvent(event Event)

AppendJobEvent appends the event to the job events.

func (*Plan) AppendStoppedExecution

func (p *Plan) AppendStoppedExecution(execution *Execution, event Event)

AppendStoppedExecution marks an execution to be stopped.

func (*Plan) IsJobFailed added in v1.3.2

func (p *Plan) IsJobFailed() bool

IsJobFailed returns true if the plan is marking the job as failed

func (*Plan) MarkJobCompleted

func (p *Plan) MarkJobCompleted(event Event)

func (*Plan) MarkJobFailed

func (p *Plan) MarkJobFailed(event Event)

func (*Plan) MarkJobQueued added in v1.4.0

func (p *Plan) MarkJobQueued(event Event)

MarkJobQueued marks the job as queued.

func (*Plan) MarkJobRunningIfEligible added in v1.1.3

func (p *Plan) MarkJobRunningIfEligible(event Event) bool

MarkJobRunningIfEligible updates the job state to "Running" under certain conditions.

type PlanExecutionDesiredUpdate

type PlanExecutionDesiredUpdate struct {
	Execution    *Execution                `json:"Execution"`
	DesiredState ExecutionDesiredStateType `json:"DesiredState"`
	Event        Event                     `json:"Event"`
}

type Protocol added in v1.5.2

type Protocol string
const (
	// ProtocolNCLV1 is nats based async protocol based on NCL library.
	// Currently in development and not yet default protocol.
	ProtocolNCLV1 Protocol = "ncl/v1"

	// ProtocolBProtocolV2 is nats based request/response protocol.
	// Currently the default protocol while NCL is under development.
	ProtocolBProtocolV2 Protocol = "bprotocol/v2"
)

func GetPreferredProtocol added in v1.5.2

func GetPreferredProtocol(availableProtocols []Protocol) Protocol

GetPreferredProtocol accepts a slice of available protocols and returns the preferred protocol based on the order of preference.

func (Protocol) String added in v1.5.2

func (p Protocol) String() string

String implements the Stringer interface

type Resources

type Resources struct {
	// CPU units
	CPU float64 `json:"CPU,omitempty"`
	// Memory in bytes
	Memory uint64 `json:"Memory,omitempty"`
	// Disk in bytes
	Disk uint64 `json:"Disk,omitempty"`
	// GPU units
	GPU uint64 `json:"GPU,omitempty"`
	// GPU details
	GPUs []GPU `json:"GPUs,omitempty"`
}

func (*Resources) Add

func (r *Resources) Add(other Resources) *Resources

Add returns the sum of the resources

func (*Resources) Copy

func (r *Resources) Copy() *Resources

Copy returns a deep copy of the resources

func (*Resources) IsZero

func (r *Resources) IsZero() bool

func (*Resources) LessThan

func (r *Resources) LessThan(other Resources) bool

func (*Resources) LessThanEq

func (r *Resources) LessThanEq(other Resources) bool

func (*Resources) Max

func (r *Resources) Max(other Resources) *Resources

func (*Resources) Merge

func (r *Resources) Merge(other Resources) *Resources

Merge merges the resources, preferring the current resources

func (*Resources) Multiply added in v1.3.2

func (r *Resources) Multiply(factor float64) *Resources

Multiply returns the product of the resources

func (*Resources) String

func (r *Resources) String() string

String returns a string representation of the Resources.

func (*Resources) Sub

func (r *Resources) Sub(other Resources) *Resources

func (*Resources) Validate

func (r *Resources) Validate() error

Validate returns an error if the resources are invalid

type ResourcesConfig

type ResourcesConfig struct {
	// CPU https://github.com/BTBurke/k8sresource string
	CPU string `json:"CPU,omitempty"`
	// Memory github.com/dustin/go-humanize string
	Memory string `json:"Memory,omitempty"`
	// Disk github.com/dustin/go-humanize string
	Disk string `json:"Disk,omitempty"`
	GPU  string `json:"GPU,omitempty"`
}

func (*ResourcesConfig) Copy

func (r *ResourcesConfig) Copy() *ResourcesConfig

Copy returns a deep copy of the resources

func (*ResourcesConfig) Normalize

func (r *ResourcesConfig) Normalize()

Normalize normalizes the resources

func (*ResourcesConfig) ToResources

func (r *ResourcesConfig) ToResources() (*Resources, error)

ToResources converts the resources config to resources

func (*ResourcesConfig) Validate

func (r *ResourcesConfig) Validate() error

Validate returns an error if the resources are invalid

type ResultPath

type ResultPath struct {
	// Name
	Name string `json:"Name"`
	// The path to the file/dir
	Path string `json:"Path"`
}

func (*ResultPath) Copy

func (p *ResultPath) Copy() *ResultPath

Copy returns a copy of the path

func (*ResultPath) Normalize

func (p *ResultPath) Normalize()

Normalize normalizes the path to a canonical form

func (*ResultPath) Validate

func (p *ResultPath) Validate() error

Validate validates the path

type RunCommandResult

type RunCommandResult struct {
	// stdout of the run. Yaml provided for `describe` output
	STDOUT string `json:"Stdout"`

	// bool describing if stdout was truncated
	StdoutTruncated bool `json:"StdoutTruncated"`

	// stderr of the run.
	STDERR string `json:"stderr"`

	// bool describing if stderr was truncated
	StderrTruncated bool `json:"StderrTruncated"`

	// exit code of the run.
	ExitCode int `json:"ExitCode"`

	// Runner error
	ErrorMsg string `json:"ErrorMsg"`
}

func NewRunCommandResult

func NewRunCommandResult() *RunCommandResult

func (*RunCommandResult) Copy added in v1.5.2

type SpecConfig

type SpecConfig struct {
	// Type of the config
	Type string `json:"Type" yaml:"Type,omitempty"`

	// Params is a map of the config params
	Params map[string]interface{} `json:"Params,omitempty" yaml:"Params,omitempty"`
}

func NewSpecConfig

func NewSpecConfig(t string) *SpecConfig

NewSpecConfig returns a new spec config

func (*SpecConfig) Copy

func (s *SpecConfig) Copy() *SpecConfig

Copy returns a shallow copy of the spec config TODO: implement deep copy if the value is a nested map, slice or Copyable

func (*SpecConfig) IsEmpty added in v1.1.0

func (s *SpecConfig) IsEmpty() bool

IsEmpty returns true if the spec config is empty

func (*SpecConfig) IsType

func (s *SpecConfig) IsType(t string) bool

IsType returns true if the current SpecConfig is of type t.

func (*SpecConfig) MarshalZerologObject added in v1.2.2

func (s *SpecConfig) MarshalZerologObject(e *zerolog.Event)

func (*SpecConfig) MetricAttributes added in v1.2.2

func (s *SpecConfig) MetricAttributes() []attribute.KeyValue

func (*SpecConfig) Normalize

func (s *SpecConfig) Normalize()

func (*SpecConfig) Validate

func (s *SpecConfig) Validate() error

func (*SpecConfig) ValidateAllowBlank added in v1.1.0

func (s *SpecConfig) ValidateAllowBlank() error

ValidateAllowBlank is the same as Validate but allows blank types. This is useful for when you want to validate a spec config that is optional.

func (*SpecConfig) WithParam

func (s *SpecConfig) WithParam(key string, value interface{}) *SpecConfig

WithParam adds a param to the spec config

type State

type State[T any] struct {
	// StateType is the current state of the object.
	StateType T `json:"StateType"`

	// Message is a human readable message describing the state.
	Message string `json:"Message,omitempty"`

	// Details is a map of additional details about the state.
	Details map[string]string `json:"Details,omitempty"`
}

State is a generic struct for representing the state of an object, with an optional human readable message.

func NewExecutionDesiredState

func NewExecutionDesiredState(stateType ExecutionDesiredStateType) State[ExecutionDesiredStateType]

NewExecutionDesiredState returns a new ExecutionDesiredStateType with the specified state type

func NewExecutionState

func NewExecutionState(stateType ExecutionStateType) State[ExecutionStateType]

NewExecutionState returns a new ExecutionState with the specified state type

func NewJobState

func NewJobState(stateType JobStateType) State[JobStateType]

NewJobState returns a new JobState with the specified state type

func (State[T]) WithDetail added in v1.5.0

func (s State[T]) WithDetail(key, value string) State[T]

WithDetail returns a new State with the specified detail.

func (State[T]) WithDetails added in v1.5.0

func (s State[T]) WithDetails(details map[string]string) State[T]

WithDetails returns a new State with the specified details.

func (State[T]) WithMessage

func (s State[T]) WithMessage(message string) State[T]

WithMessage returns a new State with the specified message.

type StateChange

type StateChange[StateType any] struct {
	Previous StateType `json:"Previous,omitempty"`
	New      StateType `json:"New,omitempty"`
}

StateChange represents a change in state of one of the state types.

type Task

type Task struct {
	// Name of the task
	Name string `json:"Name"`

	Engine *SpecConfig `json:"Engine"`

	Publisher *SpecConfig `json:"Publisher"`

	// Map of environment variables to be used by the driver
	Env map[string]string `json:"Env,omitempty"`

	// Meta is used to associate arbitrary metadata with this task.
	Meta map[string]string `json:"Meta,omitempty"`

	// InputSources is a list of remote artifacts to be downloaded before running the task
	// and mounted into the task.
	InputSources []*InputSource `json:"InputSources,omitempty"`

	// ResultPaths is a list of task volumes to be included in the task's published result
	ResultPaths []*ResultPath `json:"ResultPaths,omitempty"`

	// ResourcesConfig is the resources needed by this task
	ResourcesConfig *ResourcesConfig `json:"Resources,omitempty"`

	Network *NetworkConfig `json:"Network,omitempty"`

	Timeouts *TimeoutConfig `json:"Timeouts,omitempty"`
}

func (*Task) AllStorageTypes

func (t *Task) AllStorageTypes() []string

func (*Task) Copy

func (t *Task) Copy() *Task

func (*Task) MetricAttributes added in v1.2.1

func (t *Task) MetricAttributes() []attribute.KeyValue

func (*Task) Normalize

func (t *Task) Normalize()

func (*Task) Validate

func (t *Task) Validate() error

Validate is used to check a task for reasonable configuration.

func (*Task) ValidateSubmission added in v1.1.0

func (t *Task) ValidateSubmission() error

ValidateSubmission is used to check a task for reasonable configuration when it is submitted. It is a subset of Validate that does not check fields with defaults, such as timeouts and resources.

type TimeoutConfig

type TimeoutConfig struct {
	// ExecutionTimeout is the maximum amount of time a task is allowed to run in seconds.
	// Zero means no timeout, such as for a daemon task.
	ExecutionTimeout int64 `json:"ExecutionTimeout,omitempty"`
	// QueueTimeout is the maximum amount of time a task is allowed to wait in the orchestrator
	// queue in seconds before being scheduled. Zero means no timeout.
	QueueTimeout int64 `json:"QueueTimeout,omitempty"`
	// TotalTimeout is the maximum amount of time a task is allowed to complete in seconds.
	// This includes the time spent in the queue, the time spent executing and the time spent retrying.
	// Zero means no timeout.
	TotalTimeout int64 `json:"TotalTimeout,omitempty"`
}

TimeoutConfig is the configuration for timeout-related settings, such as execution, queue and total timeouts.

func (*TimeoutConfig) Copy

func (t *TimeoutConfig) Copy() *TimeoutConfig

Copy returns a deep copy of the timeout config.

func (*TimeoutConfig) GetExecutionTimeout

func (t *TimeoutConfig) GetExecutionTimeout() time.Duration

GetExecutionTimeout returns the execution timeout duration. It returns ExecutionTimeout if configured, otherwise the TotalTimeout value, otherwise 0.

func (*TimeoutConfig) GetQueueTimeout added in v1.3.2

func (t *TimeoutConfig) GetQueueTimeout() time.Duration

GetQueueTimeout returns the queue timeout duration. It returns QueueTimeout if configured, otherwise 0. We don't fall back to TotalTimeout, so that users can disable queueing when no nodes are available.

func (*TimeoutConfig) GetTotalTimeout added in v1.3.2

func (t *TimeoutConfig) GetTotalTimeout() time.Duration

GetTotalTimeout returns the total timeout duration

func (*TimeoutConfig) Validate

func (t *TimeoutConfig) Validate() error

Validate is used to check a timeout config for reasonable configuration. This is called after server side defaults are applied.

func (*TimeoutConfig) ValidateSubmission added in v1.5.0

func (t *TimeoutConfig) ValidateSubmission() error

ValidateSubmission is used to check a timeout config for reasonable configuration when it is submitted.

type Validatable

type Validatable interface {
	Validate() error
}

Validatable is an interface for types that can be validated

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL