Documentation ¶
Index ¶
- Constants
- Variables
- type AllocationStatus
- type EnqueueOwner
- type EventsRecorder
- type ExternalResource
- type KubeClient
- type Phase
- type PhaseInfo
- func PhaseInfoFailed(p Phase, err *core.ExecutionError, info *TaskInfo) PhaseInfo
- func PhaseInfoFailure(code, reason string, info *TaskInfo) PhaseInfo
- func PhaseInfoFailureWithCleanup(code, reason string, info *TaskInfo) PhaseInfo
- func PhaseInfoInitializing(t time.Time, version uint32, reason string, info *TaskInfo) PhaseInfo
- func PhaseInfoNotReady(t time.Time, version uint32, reason string) PhaseInfo
- func PhaseInfoQueued(t time.Time, version uint32, reason string) PhaseInfo
- func PhaseInfoQueuedWithTaskInfo(version uint32, reason string, info *TaskInfo) PhaseInfo
- func PhaseInfoRetryableFailure(code, reason string, info *TaskInfo) PhaseInfo
- func PhaseInfoRetryableFailureWithCleanup(code, reason string, info *TaskInfo) PhaseInfo
- func PhaseInfoRunning(version uint32, info *TaskInfo) PhaseInfo
- func PhaseInfoSuccess(info *TaskInfo) PhaseInfo
- func PhaseInfoSystemFailure(code, reason string, info *TaskInfo) PhaseInfo
- func PhaseInfoSystemFailureWithCleanup(code, reason string, info *TaskInfo) PhaseInfo
- func PhaseInfoSystemRetryableFailure(code, reason string, info *TaskInfo) PhaseInfo
- func PhaseInfoSystemRetryableFailureWithCleanup(code, reason string, info *TaskInfo) PhaseInfo
- func PhaseInfoWaitingForCache(version uint32, info *TaskInfo) PhaseInfo
- func PhaseInfoWaitingForResources(t time.Time, version uint32, reason string) PhaseInfodeprecated
- func PhaseInfoWaitingForResourcesInfo(t time.Time, version uint32, reason string, info *TaskInfo) PhaseInfo
- func (p PhaseInfo) CleanupOnFailure() bool
- func (p PhaseInfo) Err() *core.ExecutionError
- func (p PhaseInfo) Info() *TaskInfo
- func (p PhaseInfo) Phase() Phase
- func (p PhaseInfo) Reason() string
- func (p PhaseInfo) String() string
- func (p PhaseInfo) Version() uint32
- func (p PhaseInfo) WithVersion(version uint32) PhaseInfo
- type Plugin
- type PluginEntry
- type PluginLoader
- type PluginProperties
- type PluginStateReader
- type PluginStateWriter
- type ReasonInfo
- type ResourceConstraint
- type ResourceConstraintsSpec
- type ResourceManager
- type ResourceNamespace
- type ResourceRegistrar
- type SecretManager
- type SetupContext
- type SignalAsync
- type TaskExecutionContext
- type TaskExecutionID
- type TaskExecutionMetadata
- type TaskInfo
- type TaskOverrides
- type TaskReader
- type TaskTemplatePath
- type TaskType
- type Transition
- type TransitionType
Examples ¶
Constants ¶
const DefaultPhaseVersion = uint32(0)
const SystemErrorCode = "SystemError"
Variables ¶
var PhaseInfoUndefined = PhaseInfo{/* contains filtered or unexported fields */}
PhaseInfoUndefined should be used when the Phase is unknown usually associated with an error
var Phases = []Phase{ PhaseUndefined, PhaseNotReady, PhaseWaitingForResources, PhaseQueued, PhaseInitializing, PhaseRunning, PhaseSuccess, PhaseRetryableFailure, PhasePermanentFailure, PhaseWaitingForCache, PhaseAborted, }
var UnknownTransition = Transition{TransitionTypeEphemeral, PhaseInfoUndefined}
UnknownTransition is synonymous to UndefinedTransition. To be returned when an error is observed
Functions ¶
This section is empty.
Types ¶
type AllocationStatus ¶
type AllocationStatus int
const ( // This is the enum returned when there's an error AllocationUndefined AllocationStatus = iota // Go for it AllocationStatusGranted // This means that no resources are available globally. This is the only rejection message we use right now. AllocationStatusExhausted // We're not currently using this - but this would indicate that things globally are okay, but that your // own namespace is too busy AllocationStatusNamespaceQuotaExceeded )
func AllocationStatusString ¶
func AllocationStatusString(s string) (AllocationStatus, error)
AllocationStatusString retrieves an enum value from the enum constants string name. Throws an error if the param is not part of the enum.
func AllocationStatusValues ¶
func AllocationStatusValues() []AllocationStatus
AllocationStatusValues returns all values of the enum
func (AllocationStatus) IsAAllocationStatus ¶
func (i AllocationStatus) IsAAllocationStatus() bool
IsAAllocationStatus returns "true" if the value is listed in the enum definition. "false" otherwise
func (AllocationStatus) String ¶
func (i AllocationStatus) String() string
type EnqueueOwner ¶
type EnqueueOwner func(id types.NamespacedName) error
When a change is observed, the owning entity with id types.NamespacedName can be triggered for re-validation
type EventsRecorder ¶
Task events recorder, which get stored in the Admin. If this is invoked multiple times, multiple events will be sent to Admin. It is not recommended that one uses this interface, a transition will trigger an auto event to admin
type ExternalResource ¶
type ExternalResource struct { // A unique identifier for the external resource ExternalID string // Captures the status of caching for this external resource CacheStatus core.CatalogCacheStatus // A unique index for the external resource. Although the ID may change, this will remain the same // throughout task event reports and retries. Index uint32 // Log information for the external resource Logs []*core.TaskLog // The number of times this external resource has been attempted RetryAttempt uint32 // Phase (if exists) associated with the external resource Phase Phase }
type KubeClient ¶
type KubeClient interface { // GetClient returns a client configured with the Config GetClient() client.Client // GetCache returns a cache.Cache GetCache() cache.Cache }
TODO we may not want to expose this? A friendly controller-runtime client that gets passed to executors
type Phase ¶
type Phase int8
const ( // Does not mean an error, but simply states that we dont know the state in this round, try again later. But can be used to signal a system error too PhaseUndefined Phase = iota PhaseNotReady // Indicates plugin is not ready to submit the request as it is waiting for resources PhaseWaitingForResources // Indicates plugin has submitted the execution, but it has not started executing yet PhaseQueued // The system has started the pre-execution process, like container download, cluster startup etc PhaseInitializing // Indicates that the task has started executing PhaseRunning // Indicates that the task has completed successfully PhaseSuccess // Indicates that the Failure is recoverable, by re-executing the task if retries permit PhaseRetryableFailure // Indicate that the failure is non recoverable even if retries exist PhasePermanentFailure // Indicates the task is waiting for the cache to be populated so it can reuse results PhaseWaitingForCache // Indicate the task has been aborted PhaseAborted )
func PhaseString ¶
PhaseString retrieves an enum value from the enum constants string name. Throws an error if the param is not part of the enum.
func (Phase) IsAPhase ¶
IsAPhase returns "true" if the value is listed in the enum definition. "false" otherwise
func (Phase) IsTerminal ¶
Returns true if the given phase is failure, retryable failure or success
func (Phase) IsWaitingForResources ¶
type PhaseInfo ¶
type PhaseInfo struct {
// contains filtered or unexported fields
}
Additional info that should be sent to the front end. The Information is sent to the front-end if it meets certain criterion, for example currently, it is sent only if an event was not already sent for
func PhaseInfoFailed ¶
func PhaseInfoFailed(p Phase, err *core.ExecutionError, info *TaskInfo) PhaseInfo
func PhaseInfoFailure ¶
func PhaseInfoInitializing ¶
func PhaseInfoNotReady ¶
PhaseInfoNotReady represents the case the plugin is not ready to start
func PhaseInfoRunning ¶
func PhaseInfoSuccess ¶
func PhaseInfoSystemFailure ¶
func PhaseInfoWaitingForCache ¶
Creates a new PhaseInfo with phase set to PhaseWaitingForCache
func PhaseInfoWaitingForResourcesInfo ¶
func PhaseInfoWaitingForResourcesInfo(t time.Time, version uint32, reason string, info *TaskInfo) PhaseInfo
PhaseInfoWaitingForResourcesInfo represents the case the plugin is not ready to start
func (PhaseInfo) CleanupOnFailure ¶
func (PhaseInfo) Err ¶
func (p PhaseInfo) Err() *core.ExecutionError
func (PhaseInfo) WithVersion ¶
type Plugin ¶
type Plugin interface { // Unique ID for the plugin, should be ideally the same the ID in PluginEntry GetID() string // Properties desired by the plugin from the available set GetProperties() PluginProperties // The actual method that is invoked for every single task execution. The method should be a non blocking method. // It maybe invoked multiple times and hence all actions should be idempotent. If idempotency is not possible look at // Transitions to get some system level guarantees Handle(ctx context.Context, tCtx TaskExecutionContext) (Transition, error) // Called when the task is to be killed/aborted, because the top level entity was aborted or some other failure happened. // Abort should always be idempotent Abort(ctx context.Context, tCtx TaskExecutionContext) error // Finalize is always called, after Handle or Abort. Finalize should be an idempotent operation Finalize(ctx context.Context, tCtx TaskExecutionContext) error }
Interface for the core Kozmo plugin
func LoadPlugin ¶
func LoadPlugin(ctx context.Context, iCtx SetupContext, entry PluginEntry) (Plugin, error)
Loads and validates a plugin.
type PluginEntry ¶
type PluginEntry struct { // System wide unique identifier for the plugin ID TaskType // A list of all the task types for which this plugin is applicable. RegisteredTaskTypes []TaskType // A Lazy loading function, that will load the plugin. Plugins should be initialized in this method. It is guaranteed // that the plugin loader will be called before any Handle/Abort/Finalize functions are invoked LoadPlugin PluginLoader // Boolean that indicates if this plugin can be used as the default for unknown task types. There can only be // one default in the system IsDefault bool }
An entry that identifies the CorePlugin
type PluginLoader ¶
type PluginLoader func(ctx context.Context, iCtx SetupContext) (Plugin, error)
A Lazy loading function, that will load the plugin. Plugins should be initialized in this method. It is guaranteed that the plugin loader will be called before any Handle/Abort/Finalize functions are invoked
type PluginProperties ¶
type PluginProperties struct { // Instructs the execution engine to not attempt to cache lookup or write for the node. DisableNodeLevelCaching bool // Specifies the length of TaskExecutionID generated name. default: 50 GeneratedNameMaxLength *int }
System level properties that this Plugin supports
type PluginStateReader ¶
type PluginStateReader interface { // Retrieve state version that is currently stored GetStateVersion() uint8 // Retrieve the typed state in t from the stored value. It also returns the stateversion. // If there is no state, t will be zero value, stateversion will be 0 Get(t interface{}) (stateVersion uint8, err error) }
Read previously written plugin state (previous round)
type PluginStateWriter ¶
type PluginStateWriter interface { // Only the last call to this method is recorded. All previous calls are overwritten // This data is also not accessible until the next round. Put(stateVersion uint8, v interface{}) error // Resets the state to empty or zero value Reset() error }
Write new plugin state for a plugin
type ReasonInfo ¶
type ResourceConstraint ¶
type ResourceConstraint struct {
Value int64
}
type ResourceConstraintsSpec ¶
type ResourceConstraintsSpec struct { ProjectScopeResourceConstraint *ResourceConstraint NamespaceScopeResourceConstraint *ResourceConstraint }
ResourceConstraintsSpec is a contract that a plugin can specify with ResourceManager to force runtime quota-allocation constraints at different levels.
Setting constraints in a ResourceConstraintsSpec to nil objects is valid, meaning there's no constraint at the corresponding level. For example, a ResourceConstraintsSpec with nil ProjectScopeResourceConstraint and a non-nil NamespaceScopeResourceConstraint means that it only poses a cap at the namespace level. A zero-value ResourceConstraintsSpec means there's no constraints posed at any level.
type ResourceManager ¶
type ResourceManager interface { GetID() string // During execution time, plugins can call AllocateResource() to register a token to the token pool associated with a resource with the resource manager. // If granted an allocation, the token will be recorded in the corresponding token pool until the same plugin releases it. // When calling AllocateResource, the plugin needs to specify a ResourceConstraintsSpec which contains resource capping constraints at different levels. // The ResourceConstraint pointers in ResourceConstraintsSpec, however, can be set to nil to present a non-constraint at that level AllocateResource(ctx context.Context, namespace ResourceNamespace, allocationToken string, constraintsSpec ResourceConstraintsSpec) (AllocationStatus, error) // During execution time, after an outstanding request is completed, the plugin need to use ReleaseResource() to release the allocation of the corresponding token // from the token pool in order to gain back the quota taken by the token ReleaseResource(ctx context.Context, namespace ResourceNamespace, allocationToken string) error }
ResourceManager Interface 1. Terms and definitions
Resource: resource is an abstraction of anything that has a limited quota of units and can be claimed in a single unit or multiple units at once. At Kozmo's current state, a resource means a logical separation (e.g., a cluster) of an external service that allows a limited number of outstanding requests to be sent to.
Token: Kozmo uses a token to serve as the placeholder to represent a unit of resource. Kozmo resource manager manages resources by managing the tokens of the resources.
2. Description ResourceManager provides a task-type-specific pooling system for Kozmo Tasks. Plugin writers can optionally request for resources in their tasks, in single quantity.
3. Usage A Kozmo plugin registers the resources and the desired quota of each resource with ResourceRegistrar at the setup time of Kozmo Propeller. At the end of the setup time, Kozmo Propeller builds a ResourceManager based on these registration requests.
During runtime, the ResourceManager does two simple things: allocating tokens and releasing tokens. When a Kozmo task execution wants to send a request to an external service, the plugin should claim a unit of the corresponding resource. Specifically, an execution needs to generate a unique token, and register the token with ResourceManager by calling ResourceManager's AllocateResource() function. ResourceManager will check its current utilization and the allocation policy to decide whether or not to grant the request. Only when receiving the "AllocationGranted" status shall this execution move forward and send out the request. The granted token will be recorded in a token pool corresponding to the resource and managed by ResourceManager. When the request is done, the plugin will ask the resource manager to release the token by calling ResourceManager's ReleaseResource(), and the token will be erased from the corresponding pool.
4. Example Kozmo has a built-on Qubole plugin that allows Kozmo tasks to send out Hive commands to Qubole. In the plugin, a single Qubole cluster is a resource, and sending out a single Hive command to a Qubole cluster consumes a token of the corresponding resource. The resource allocation is achieved by the Qubole plugin calling status, err := AllocateResource(ctx, <cluster name>, <token string>, <constraint spec>) and the de-allocation is achieved by the plugin calling status, err := AllocateResource(ctx, <cluster name>, <token string>, <constraint spec>)
For example, status, err := AllocateResource(ctx, "default_cluster", "flkgiwd13-akjdoe-0", ResourceConstraintsSpec{}) When the corresponding Hive command finishes, the plugin needs to make the following function call to release the corresponding token err := ReleaseResource(ctx, "default_cluster", "flkgiwd13-akjdoe-0")
type ResourceNamespace ¶
type ResourceNamespace string
func (ResourceNamespace) CreateSubNamespace ¶
func (r ResourceNamespace) CreateSubNamespace(namespace ResourceNamespace) ResourceNamespace
type ResourceRegistrar ¶
type ResourceRegistrar interface {
RegisterResourceQuota(ctx context.Context, namespace ResourceNamespace, quota int) error
}
type SecretManager ¶
type SetupContext ¶
type SetupContext interface { // returns a callback mechanism that indicates that (workflow, task) is ready to be re-evaluated EnqueueOwner() EnqueueOwner // provides a k8s specific owner kind OwnerKind() string // a metrics scope to publish stats under MetricsScope() promutils.Scope // A kubernetes client to the bound cluster KubeClient() KubeClient // Returns a secret manager that can retrieve configured secrets for this plugin SecretManager() SecretManager // Returns a resource negotiator that the plugin can register resource quota against ResourceRegistrar() ResourceRegistrar }
Passed to the Loader function when setting up a plugin
type TaskExecutionContext ¶
type TaskExecutionContext interface { // Returns a resource manager that can be used to create reservations for limited resources ResourceManager() ResourceManager // Returns a secret manager that can retrieve configured secrets for this plugin SecretManager() SecretManager // Returns a method that allows a plugin to indicate that the task has a new update and can be invoked again to check for updates TaskRefreshIndicator() SignalAsync // Returns the max allowed dataset size that the outputwriter will accept MaxDatasetSizeBytes() int64 // Returns a handle to the currently configured storage backend that can be used to communicate with the tasks or write metadata DataStore() *storage.DataStore // Returns a reader that retrieves previously stored plugin internal state. the state itself is immutable PluginStateReader() PluginStateReader // Returns a TaskReader, to retrieve task details TaskReader() TaskReader // Returns an input reader to retrieve input data InputReader() io.InputReader // Returns a handle to the Task's execution metadata. TaskExecutionMetadata() TaskExecutionMetadata // Provides an output sync of type io.OutputWriter OutputWriter() io.OutputWriter // Get a handle to the PluginStateWriter. Any mutations to the plugins internal state can be persisted using this // These mutation will be visible in the next round PluginStateWriter() PluginStateWriter // Get a handle to catalog client Catalog() catalog.AsyncClient // Returns a handle to the Task events recorder, which get stored in the Admin. EventsRecorder() EventsRecorder }
An interface that is passed to every plugin invocation. It carries all meta and contextual information for the current task execution
type TaskExecutionID ¶
type TaskExecutionID interface { // GetGeneratedName returns the computed/generated name for the task id // deprecated: use GetGeneratedNameWithLength GetGeneratedName() string // GetGeneratedNameWith returns the generated name within a bounded length. If the name is smaller than minLength, // it'll get right-padded with character '0'. If the name is bigger than maxLength, it'll get hashed to fit within. GetGeneratedNameWith(minLength, maxLength int) (string, error) // GetID returns the underlying idl task identifier. GetID() core.TaskExecutionIdentifier // GetUniqueNodeID returns the fully-qualified Node ID that is unique within a // given workflow execution. GetUniqueNodeID() string }
TaskExecutionID is a simple Interface to expose the ExecutionID of the running Task
type TaskExecutionMetadata ¶
type TaskExecutionMetadata interface { // GetOwnerID returns the owning Kubernetes object GetOwnerID() types.NamespacedName // GetTaskExecutionID is a specially generated task execution id, that is guaranteed to be unique and consistent for subsequent calls GetTaskExecutionID() TaskExecutionID GetNamespace() string GetOwnerReference() v12.OwnerReference GetOverrides() TaskOverrides GetLabels() map[string]string GetMaxAttempts() uint32 GetAnnotations() map[string]string GetK8sServiceAccount() string GetSecurityContext() core.SecurityContext IsInterruptible() bool GetPlatformResources() *v1.ResourceRequirements GetInterruptibleFailureThreshold() int32 GetEnvironmentVariables() map[string]string }
TaskExecutionMetadata represents any execution information for a Task. It is used to communicate meta information about the execution or any previously stored information
type TaskInfo ¶
type TaskInfo struct { // log information for the task execution Logs []*core.TaskLog // This value represents the time the status occurred at. If not provided, it will be defaulted to the time Kozmo // checked the task status. OccurredAt *time.Time // This value represents the time the status was reported at. If not provided, will be defaulted to the current time // when Kozmo published the event. ReportedAt *time.Time // Custom Event information that the plugin would like to expose to the front-end CustomInfo *structpb.Struct // A collection of information about external resources launched by this task ExternalResources []*ExternalResource // Additional reasons for this case. Note, these are not included in the phase state. AdditionalReasons []ReasonInfo }
type TaskOverrides ¶
type TaskOverrides interface { GetResources() *v1.ResourceRequirements GetExtendedResources() *core.ExtendedResources GetConfig() *v1.ConfigMap }
TaskOverrides interface to expose any overrides that have been set for this task (like resource overrides etc)
type TaskReader ¶
type TaskReader interface { TaskTemplatePath // Returns the core TaskTemplate Read(ctx context.Context) (*core.TaskTemplate, error) }
An interface to access the TaskInformation
type TaskTemplatePath ¶
type TaskTemplatePath interface { // Returns the path Path(ctx context.Context) (storage.DataReference, error) }
An interface to access a remote/sharable location that contains the serialized TaskTemplate
type Transition ¶
type Transition struct {
// contains filtered or unexported fields
}
A Plugin Handle method returns a Transition. This transition indicates to the Kozmo framework that if the plugin wants to continue "Handle"ing this task, or if wants to move the task to success, attempt a retry or fail. The transition automatically sends an event to Admin service which shows the plugin provided information in the Console/cli etc The information to be published is in the PhaseInfo structure. Transition Type indicates the type of consistency for subsequent handle calls in case the phase info results in a non terminal state. the PhaseInfo structure is very important and is used to record events in Admin. Only if the Phase + PhaseVersion was not previously observed, will an event be published to Admin there are only a configurable number of phase-versions usable. Usually it is preferred to be a monotonically increasing sequence
func DoTransition ¶
func DoTransition(info PhaseInfo) Transition
Same as DoTransition, but TransitionTime is always Ephemeral
func DoTransitionType ¶
func DoTransitionType(ttype TransitionType, info PhaseInfo) Transition
Creates and returns a new Transition based on the PhaseInfo.Phase Phases: PhaseNotReady, PhaseQueued, PhaseInitializing, PhaseRunning will cause the system to continue invoking Handle
func (Transition) Info ¶
func (t Transition) Info() PhaseInfo
func (Transition) String ¶
func (t Transition) String() string
Example ¶
trns := DoTransitionType(TransitionTypeBarrier, PhaseInfoUndefined) fmt.Println(trns.String())
Output: TransitionTypeBarrier,Phase<PhaseUndefined:0 <nil> Reason:>
func (Transition) Type ¶
func (t Transition) Type() TransitionType
type TransitionType ¶
type TransitionType int
Type of Transition, refer to Transition to understand what transition means
const ( // The transition is eventually consistent. For all the state written may not be visible in the next call, but eventually will persist // Best to use when the plugin logic is completely idempotent. This is also the most performant option. TransitionTypeEphemeral TransitionType = iota // @deprecated support for Barrier type transitions has been deprecated // This transition tries its best to make the latest state visible for every consecutive read. But, it is possible // to go back in time, i.e. monotonic consistency is violated (in rare cases). TransitionTypeBarrier )
func TransitionTypeString ¶
func TransitionTypeString(s string) (TransitionType, error)
TransitionTypeString retrieves an enum value from the enum constants string name. Throws an error if the param is not part of the enum.
func TransitionTypeValues ¶
func TransitionTypeValues() []TransitionType
TransitionTypeValues returns all values of the enum
func (TransitionType) IsATransitionType ¶
func (i TransitionType) IsATransitionType() bool
IsATransitionType returns "true" if the value is listed in the enum definition. "false" otherwise
func (TransitionType) String ¶
func (i TransitionType) String() string
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package template exports the Render method Render Evaluates templates in each command with the equivalent value from passed args.
|
Package template exports the Render method Render Evaluates templates in each command with the equivalent value from passed args. |