core

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 21, 2024 License: Apache-2.0 Imports: 14 Imported by: 9

Documentation

Index

Examples

Constants

View Source
const DefaultPhaseVersion = uint32(0)
View Source
const SystemErrorCode = "SystemError"

Variables

View Source
var PhaseInfoUndefined = PhaseInfo{/* contains filtered or unexported fields */}

PhaseInfoUndefined should be used when the Phase is unknown usually associated with an error

UnknownTransition is synonymous to UndefinedTransition. To be returned when an error is observed

Functions

This section is empty.

Types

type AllocationStatus

type AllocationStatus int
const (
	// This is the enum returned when there's an error
	AllocationUndefined AllocationStatus = iota

	// Go for it
	AllocationStatusGranted

	// This means that no resources are available globally.  This is the only rejection message we use right now.
	AllocationStatusExhausted

	// We're not currently using this - but this would indicate that things globally are okay, but that your
	// own namespace is too busy
	AllocationStatusNamespaceQuotaExceeded
)

func AllocationStatusString

func AllocationStatusString(s string) (AllocationStatus, error)

AllocationStatusString retrieves an enum value from the enum constants string name. Throws an error if the param is not part of the enum.

func AllocationStatusValues

func AllocationStatusValues() []AllocationStatus

AllocationStatusValues returns all values of the enum

func (AllocationStatus) IsAAllocationStatus

func (i AllocationStatus) IsAAllocationStatus() bool

IsAAllocationStatus returns "true" if the value is listed in the enum definition. "false" otherwise

func (AllocationStatus) String

func (i AllocationStatus) String() string

type EnqueueOwner

type EnqueueOwner func(id types.NamespacedName) error

When a change is observed, the owning entity with id types.NamespacedName can be triggered for re-validation

type EventsRecorder

type EventsRecorder interface {
	RecordRaw(ctx context.Context, ev PhaseInfo) error
}

Task events recorder, which get stored in the Admin. If this is invoked multiple times, multiple events will be sent to Admin. It is not recommended that one uses this interface, a transition will trigger an auto event to admin

type ExternalResource

type ExternalResource struct {
	// A unique identifier for the external resource
	ExternalID string
	// Captures the status of caching for this external resource
	CacheStatus core.CatalogCacheStatus
	// A unique index for the external resource. Although the ID may change, this will remain the same
	// throughout task event reports and retries.
	Index uint32
	// Log information for the external resource
	Logs []*core.TaskLog
	// The number of times this external resource has been attempted
	RetryAttempt uint32
	// Phase (if exists) associated with the external resource
	Phase Phase
}

type KubeClient

type KubeClient interface {
	// GetClient returns a client configured with the Config
	GetClient() client.Client

	// GetCache returns a cache.Cache
	GetCache() cache.Cache
}

TODO we may not want to expose this? A friendly controller-runtime client that gets passed to executors

type Phase

type Phase int8
const (
	// Does not mean an error, but simply states that we dont know the state in this round, try again later. But can be used to signal a system error too
	PhaseUndefined Phase = iota
	PhaseNotReady
	// Indicates plugin is not ready to submit the request as it is waiting for resources
	PhaseWaitingForResources
	// Indicates plugin has submitted the execution, but it has not started executing yet
	PhaseQueued
	// The system has started the pre-execution process, like container download, cluster startup etc
	PhaseInitializing
	// Indicates that the task has started executing
	PhaseRunning
	// Indicates that the task has completed successfully
	PhaseSuccess
	// Indicates that the Failure is recoverable, by re-executing the task if retries permit
	PhaseRetryableFailure
	// Indicate that the failure is non recoverable even if retries exist
	PhasePermanentFailure
	// Indicates the task is waiting for the cache to be populated so it can reuse results
	PhaseWaitingForCache
	// Indicate the task has been aborted
	PhaseAborted
)

func PhaseString

func PhaseString(s string) (Phase, error)

PhaseString retrieves an enum value from the enum constants string name. Throws an error if the param is not part of the enum.

func PhaseValues

func PhaseValues() []Phase

PhaseValues returns all values of the enum

func (Phase) IsAPhase

func (i Phase) IsAPhase() bool

IsAPhase returns "true" if the value is listed in the enum definition. "false" otherwise

func (Phase) IsAborted

func (p Phase) IsAborted() bool

func (Phase) IsFailure

func (p Phase) IsFailure() bool

func (Phase) IsSuccess

func (p Phase) IsSuccess() bool

func (Phase) IsTerminal

func (p Phase) IsTerminal() bool

Returns true if the given phase is failure, retryable failure or success

func (Phase) IsWaitingForResources

func (p Phase) IsWaitingForResources() bool

func (Phase) String

func (i Phase) String() string

type PhaseInfo

type PhaseInfo struct {
	// contains filtered or unexported fields
}

Additional info that should be sent to the front end. The Information is sent to the front-end if it meets certain criterion, for example currently, it is sent only if an event was not already sent for

func PhaseInfoFailed

func PhaseInfoFailed(p Phase, err *core.ExecutionError, info *TaskInfo) PhaseInfo

func PhaseInfoFailure

func PhaseInfoFailure(code, reason string, info *TaskInfo) PhaseInfo

func PhaseInfoFailureWithCleanup

func PhaseInfoFailureWithCleanup(code, reason string, info *TaskInfo) PhaseInfo

func PhaseInfoInitializing

func PhaseInfoInitializing(t time.Time, version uint32, reason string, info *TaskInfo) PhaseInfo

func PhaseInfoNotReady

func PhaseInfoNotReady(t time.Time, version uint32, reason string) PhaseInfo

PhaseInfoNotReady represents the case the plugin is not ready to start

func PhaseInfoQueued

func PhaseInfoQueued(t time.Time, version uint32, reason string) PhaseInfo

func PhaseInfoQueuedWithTaskInfo

func PhaseInfoQueuedWithTaskInfo(version uint32, reason string, info *TaskInfo) PhaseInfo

func PhaseInfoRetryableFailure

func PhaseInfoRetryableFailure(code, reason string, info *TaskInfo) PhaseInfo

func PhaseInfoRetryableFailureWithCleanup

func PhaseInfoRetryableFailureWithCleanup(code, reason string, info *TaskInfo) PhaseInfo

func PhaseInfoRunning

func PhaseInfoRunning(version uint32, info *TaskInfo) PhaseInfo

func PhaseInfoSuccess

func PhaseInfoSuccess(info *TaskInfo) PhaseInfo

func PhaseInfoSystemFailure

func PhaseInfoSystemFailure(code, reason string, info *TaskInfo) PhaseInfo

func PhaseInfoSystemFailureWithCleanup

func PhaseInfoSystemFailureWithCleanup(code, reason string, info *TaskInfo) PhaseInfo

func PhaseInfoSystemRetryableFailure

func PhaseInfoSystemRetryableFailure(code, reason string, info *TaskInfo) PhaseInfo

func PhaseInfoSystemRetryableFailureWithCleanup

func PhaseInfoSystemRetryableFailureWithCleanup(code, reason string, info *TaskInfo) PhaseInfo

func PhaseInfoWaitingForCache

func PhaseInfoWaitingForCache(version uint32, info *TaskInfo) PhaseInfo

Creates a new PhaseInfo with phase set to PhaseWaitingForCache

func PhaseInfoWaitingForResources deprecated

func PhaseInfoWaitingForResources(t time.Time, version uint32, reason string) PhaseInfo

Deprecated: Please use PhaseInfoWaitingForResourcesInfo instead

func PhaseInfoWaitingForResourcesInfo

func PhaseInfoWaitingForResourcesInfo(t time.Time, version uint32, reason string, info *TaskInfo) PhaseInfo

PhaseInfoWaitingForResourcesInfo represents the case the plugin is not ready to start

func (PhaseInfo) CleanupOnFailure

func (p PhaseInfo) CleanupOnFailure() bool

func (PhaseInfo) Err

func (p PhaseInfo) Err() *core.ExecutionError

func (PhaseInfo) Info

func (p PhaseInfo) Info() *TaskInfo

func (PhaseInfo) Phase

func (p PhaseInfo) Phase() Phase

func (PhaseInfo) Reason

func (p PhaseInfo) Reason() string

func (PhaseInfo) String

func (p PhaseInfo) String() string

func (PhaseInfo) Version

func (p PhaseInfo) Version() uint32

func (PhaseInfo) WithVersion

func (p PhaseInfo) WithVersion(version uint32) PhaseInfo

type Plugin

type Plugin interface {
	// Unique ID for the plugin, should be ideally the same the ID in PluginEntry
	GetID() string
	// Properties desired by the plugin from the available set
	GetProperties() PluginProperties
	// The actual method that is invoked for every single task execution. The method should be a non blocking method.
	// It maybe invoked multiple times and hence all actions should be idempotent. If idempotency is not possible look at
	// Transitions to get some system level guarantees
	Handle(ctx context.Context, tCtx TaskExecutionContext) (Transition, error)
	// Called when the task is to be killed/aborted, because the top level entity was aborted or some other failure happened.
	// Abort should always be idempotent
	Abort(ctx context.Context, tCtx TaskExecutionContext) error
	// Finalize is always called, after Handle or Abort. Finalize should be an idempotent operation
	Finalize(ctx context.Context, tCtx TaskExecutionContext) error
}

Interface for the core Kozmo plugin

func LoadPlugin

func LoadPlugin(ctx context.Context, iCtx SetupContext, entry PluginEntry) (Plugin, error)

Loads and validates a plugin.

type PluginEntry

type PluginEntry struct {
	// System wide unique identifier for the plugin
	ID TaskType
	// A list of all the task types for which this plugin is applicable.
	RegisteredTaskTypes []TaskType
	// A Lazy loading function, that will load the plugin. Plugins should be initialized in this method. It is guaranteed
	// that the plugin loader will be called before any Handle/Abort/Finalize functions are invoked
	LoadPlugin PluginLoader
	// Boolean that indicates if this plugin can be used as the default for unknown task types. There can only be
	// one default in the system
	IsDefault bool
}

An entry that identifies the CorePlugin

type PluginLoader

type PluginLoader func(ctx context.Context, iCtx SetupContext) (Plugin, error)

A Lazy loading function, that will load the plugin. Plugins should be initialized in this method. It is guaranteed that the plugin loader will be called before any Handle/Abort/Finalize functions are invoked

type PluginProperties

type PluginProperties struct {
	// Instructs the execution engine to not attempt to cache lookup or write for the node.
	DisableNodeLevelCaching bool
	// Specifies the length of TaskExecutionID generated name. default: 50
	GeneratedNameMaxLength *int
}

System level properties that this Plugin supports

type PluginStateReader

type PluginStateReader interface {
	// Retrieve state version that is currently stored
	GetStateVersion() uint8
	// Retrieve the typed state in t from the stored value. It also returns the stateversion.
	// If there is no state, t will be zero value, stateversion will be 0
	Get(t interface{}) (stateVersion uint8, err error)
}

Read previously written plugin state (previous round)

type PluginStateWriter

type PluginStateWriter interface {
	// Only the last call to this method is recorded. All previous calls are overwritten
	// This data is also not accessible until the next round.
	Put(stateVersion uint8, v interface{}) error
	// Resets the state to empty or zero value
	Reset() error
}

Write new plugin state for a plugin

type ReasonInfo

type ReasonInfo struct {
	Reason     string
	OccurredAt *time.Time
}

type ResourceConstraint

type ResourceConstraint struct {
	Value int64
}

type ResourceConstraintsSpec

type ResourceConstraintsSpec struct {
	ProjectScopeResourceConstraint   *ResourceConstraint
	NamespaceScopeResourceConstraint *ResourceConstraint
}

ResourceConstraintsSpec is a contract that a plugin can specify with ResourceManager to force runtime quota-allocation constraints at different levels.

Setting constraints in a ResourceConstraintsSpec to nil objects is valid, meaning there's no constraint at the corresponding level. For example, a ResourceConstraintsSpec with nil ProjectScopeResourceConstraint and a non-nil NamespaceScopeResourceConstraint means that it only poses a cap at the namespace level. A zero-value ResourceConstraintsSpec means there's no constraints posed at any level.

type ResourceManager

type ResourceManager interface {
	GetID() string
	// During execution time, plugins can call AllocateResource() to register a token to the token pool associated with a resource with the resource manager.
	// If granted an allocation, the token will be recorded in the corresponding token pool until the same plugin releases it.
	// When calling AllocateResource, the plugin needs to specify a ResourceConstraintsSpec which contains resource capping constraints at different levels.
	// The ResourceConstraint pointers in ResourceConstraintsSpec, however, can be set to nil to present a non-constraint at that level
	AllocateResource(ctx context.Context, namespace ResourceNamespace, allocationToken string, constraintsSpec ResourceConstraintsSpec) (AllocationStatus, error)
	// During execution time, after an outstanding request is completed, the plugin need to use ReleaseResource() to release the allocation of the corresponding token
	// from the token pool in order to gain back the quota taken by the token
	ReleaseResource(ctx context.Context, namespace ResourceNamespace, allocationToken string) error
}

ResourceManager Interface 1. Terms and definitions

  • Resource: resource is an abstraction of anything that has a limited quota of units and can be claimed in a single unit or multiple units at once. At Kozmo's current state, a resource means a logical separation (e.g., a cluster) of an external service that allows a limited number of outstanding requests to be sent to.

  • Token: Kozmo uses a token to serve as the placeholder to represent a unit of resource. Kozmo resource manager manages resources by managing the tokens of the resources.

    2. Description ResourceManager provides a task-type-specific pooling system for Kozmo Tasks. Plugin writers can optionally request for resources in their tasks, in single quantity.

    3. Usage A Kozmo plugin registers the resources and the desired quota of each resource with ResourceRegistrar at the setup time of Kozmo Propeller. At the end of the setup time, Kozmo Propeller builds a ResourceManager based on these registration requests.

    During runtime, the ResourceManager does two simple things: allocating tokens and releasing tokens. When a Kozmo task execution wants to send a request to an external service, the plugin should claim a unit of the corresponding resource. Specifically, an execution needs to generate a unique token, and register the token with ResourceManager by calling ResourceManager's AllocateResource() function. ResourceManager will check its current utilization and the allocation policy to decide whether or not to grant the request. Only when receiving the "AllocationGranted" status shall this execution move forward and send out the request. The granted token will be recorded in a token pool corresponding to the resource and managed by ResourceManager. When the request is done, the plugin will ask the resource manager to release the token by calling ResourceManager's ReleaseResource(), and the token will be erased from the corresponding pool.

    4. Example Kozmo has a built-on Qubole plugin that allows Kozmo tasks to send out Hive commands to Qubole. In the plugin, a single Qubole cluster is a resource, and sending out a single Hive command to a Qubole cluster consumes a token of the corresponding resource. The resource allocation is achieved by the Qubole plugin calling status, err := AllocateResource(ctx, <cluster name>, <token string>, <constraint spec>) and the de-allocation is achieved by the plugin calling status, err := AllocateResource(ctx, <cluster name>, <token string>, <constraint spec>)

    For example, status, err := AllocateResource(ctx, "default_cluster", "flkgiwd13-akjdoe-0", ResourceConstraintsSpec{}) When the corresponding Hive command finishes, the plugin needs to make the following function call to release the corresponding token err := ReleaseResource(ctx, "default_cluster", "flkgiwd13-akjdoe-0")

type ResourceNamespace

type ResourceNamespace string

func (ResourceNamespace) CreateSubNamespace

func (r ResourceNamespace) CreateSubNamespace(namespace ResourceNamespace) ResourceNamespace

type ResourceRegistrar

type ResourceRegistrar interface {
	RegisterResourceQuota(ctx context.Context, namespace ResourceNamespace, quota int) error
}

type SecretManager

type SecretManager interface {
	Get(ctx context.Context, key string) (string, error)
}

type SetupContext

type SetupContext interface {
	// returns a callback mechanism that indicates that (workflow, task) is ready to be re-evaluated
	EnqueueOwner() EnqueueOwner
	// provides a k8s specific owner kind
	OwnerKind() string
	// a metrics scope to publish stats under
	MetricsScope() promutils.Scope
	// A kubernetes client to the bound cluster
	KubeClient() KubeClient
	// Returns a secret manager that can retrieve configured secrets for this plugin
	SecretManager() SecretManager
	// Returns a resource negotiator that the plugin can register resource quota against
	ResourceRegistrar() ResourceRegistrar
}

Passed to the Loader function when setting up a plugin

type SignalAsync

type SignalAsync func(ctx context.Context)

A simple fire-and-forget func

type TaskExecutionContext

type TaskExecutionContext interface {
	// Returns a resource manager that can be used to create reservations for limited resources
	ResourceManager() ResourceManager

	// Returns a secret manager that can retrieve configured secrets for this plugin
	SecretManager() SecretManager

	// Returns a method that allows a plugin to indicate that the task has a new update and can be invoked again to check for updates
	TaskRefreshIndicator() SignalAsync

	// Returns the max allowed dataset size that the outputwriter will accept
	MaxDatasetSizeBytes() int64

	// Returns a handle to the currently configured storage backend that can be used to communicate with the tasks or write metadata
	DataStore() *storage.DataStore

	// Returns a reader that retrieves previously stored plugin internal state. the state itself is immutable
	PluginStateReader() PluginStateReader

	// Returns a TaskReader, to retrieve task details
	TaskReader() TaskReader

	// Returns an input reader to retrieve input data
	InputReader() io.InputReader

	// Returns a handle to the Task's execution metadata.
	TaskExecutionMetadata() TaskExecutionMetadata

	// Provides an output sync of type io.OutputWriter
	OutputWriter() io.OutputWriter

	// Get a handle to the PluginStateWriter. Any mutations to the plugins internal state can be persisted using this
	// These mutation will be visible in the next round
	PluginStateWriter() PluginStateWriter

	// Get a handle to catalog client
	Catalog() catalog.AsyncClient

	// Returns a handle to the Task events recorder, which get stored in the Admin.
	EventsRecorder() EventsRecorder
}

An interface that is passed to every plugin invocation. It carries all meta and contextual information for the current task execution

type TaskExecutionID

type TaskExecutionID interface {
	// GetGeneratedName returns the computed/generated name for the task id
	// deprecated: use GetGeneratedNameWithLength
	GetGeneratedName() string

	// GetGeneratedNameWith returns the generated name within a bounded length. If the name is smaller than minLength,
	// it'll get right-padded with character '0'. If the name is bigger than maxLength, it'll get hashed to fit within.
	GetGeneratedNameWith(minLength, maxLength int) (string, error)

	// GetID returns the underlying idl task identifier.
	GetID() core.TaskExecutionIdentifier

	// GetUniqueNodeID returns the fully-qualified Node ID that is unique within a
	// given workflow execution.
	GetUniqueNodeID() string
}

TaskExecutionID is a simple Interface to expose the ExecutionID of the running Task

type TaskExecutionMetadata

type TaskExecutionMetadata interface {
	// GetOwnerID returns the owning Kubernetes object
	GetOwnerID() types.NamespacedName
	// GetTaskExecutionID is a specially generated task execution id, that is guaranteed to be unique and consistent for subsequent calls
	GetTaskExecutionID() TaskExecutionID
	GetNamespace() string
	GetOwnerReference() v12.OwnerReference
	GetOverrides() TaskOverrides
	GetLabels() map[string]string
	GetMaxAttempts() uint32
	GetAnnotations() map[string]string
	GetK8sServiceAccount() string
	GetSecurityContext() core.SecurityContext
	IsInterruptible() bool
	GetPlatformResources() *v1.ResourceRequirements
	GetInterruptibleFailureThreshold() int32
	GetEnvironmentVariables() map[string]string
}

TaskExecutionMetadata represents any execution information for a Task. It is used to communicate meta information about the execution or any previously stored information

type TaskInfo

type TaskInfo struct {
	// log information for the task execution
	Logs []*core.TaskLog
	// This value represents the time the status occurred at. If not provided, it will be defaulted to the time Kozmo
	// checked the task status.
	OccurredAt *time.Time
	// This value represents the time the status was reported at. If not provided, will be defaulted to the current time
	// when Kozmo published the event.
	ReportedAt *time.Time
	// Custom Event information that the plugin would like to expose to the front-end
	CustomInfo *structpb.Struct
	// A collection of information about external resources launched by this task
	ExternalResources []*ExternalResource
	// Additional reasons for this case. Note, these are not included in the phase state.
	AdditionalReasons []ReasonInfo
}

func (*TaskInfo) String

func (t *TaskInfo) String() string

type TaskOverrides

type TaskOverrides interface {
	GetResources() *v1.ResourceRequirements
	GetExtendedResources() *core.ExtendedResources
	GetConfig() *v1.ConfigMap
}

TaskOverrides interface to expose any overrides that have been set for this task (like resource overrides etc)

type TaskReader

type TaskReader interface {
	TaskTemplatePath
	// Returns the core TaskTemplate
	Read(ctx context.Context) (*core.TaskTemplate, error)
}

An interface to access the TaskInformation

type TaskTemplatePath

type TaskTemplatePath interface {
	// Returns the path
	Path(ctx context.Context) (storage.DataReference, error)
}

An interface to access a remote/sharable location that contains the serialized TaskTemplate

type TaskType

type TaskType = string

type Transition

type Transition struct {
	// contains filtered or unexported fields
}

A Plugin Handle method returns a Transition. This transition indicates to the Kozmo framework that if the plugin wants to continue "Handle"ing this task, or if wants to move the task to success, attempt a retry or fail. The transition automatically sends an event to Admin service which shows the plugin provided information in the Console/cli etc The information to be published is in the PhaseInfo structure. Transition Type indicates the type of consistency for subsequent handle calls in case the phase info results in a non terminal state. the PhaseInfo structure is very important and is used to record events in Admin. Only if the Phase + PhaseVersion was not previously observed, will an event be published to Admin there are only a configurable number of phase-versions usable. Usually it is preferred to be a monotonically increasing sequence

func DoTransition

func DoTransition(info PhaseInfo) Transition

Same as DoTransition, but TransitionTime is always Ephemeral

func DoTransitionType

func DoTransitionType(ttype TransitionType, info PhaseInfo) Transition

Creates and returns a new Transition based on the PhaseInfo.Phase Phases: PhaseNotReady, PhaseQueued, PhaseInitializing, PhaseRunning will cause the system to continue invoking Handle

func (Transition) Info

func (t Transition) Info() PhaseInfo

func (Transition) String

func (t Transition) String() string
Example
trns := DoTransitionType(TransitionTypeBarrier, PhaseInfoUndefined)
fmt.Println(trns.String())
Output:

TransitionTypeBarrier,Phase<PhaseUndefined:0 <nil> Reason:>

func (Transition) Type

func (t Transition) Type() TransitionType

type TransitionType

type TransitionType int

Type of Transition, refer to Transition to understand what transition means

const (
	// The transition is eventually consistent. For all the state written may not be visible in the next call, but eventually will persist
	// Best to use when the plugin logic is completely idempotent. This is also the most performant option.
	TransitionTypeEphemeral TransitionType = iota
	// @deprecated support for Barrier type transitions has been deprecated
	// This transition tries its best to make the latest state visible for every consecutive read. But, it is possible
	// to go back in time, i.e. monotonic consistency is violated (in rare cases).
	TransitionTypeBarrier
)

func TransitionTypeString

func TransitionTypeString(s string) (TransitionType, error)

TransitionTypeString retrieves an enum value from the enum constants string name. Throws an error if the param is not part of the enum.

func TransitionTypeValues

func TransitionTypeValues() []TransitionType

TransitionTypeValues returns all values of the enum

func (TransitionType) IsATransitionType

func (i TransitionType) IsATransitionType() bool

IsATransitionType returns "true" if the value is listed in the enum definition. "false" otherwise

func (TransitionType) String

func (i TransitionType) String() string

Directories

Path Synopsis
Package template exports the Render method Render Evaluates templates in each command with the equivalent value from passed args.
Package template exports the Render method Render Evaluates templates in each command with the equivalent value from passed args.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL