Documentation ¶
Overview ¶
This package deals with the communication with AWS-Batch and adopting its APIs to the flyte-plugin model.
Example (NewContainerResources) ¶
defaultValues := []*core.Resources_ResourceEntry{ { Name: core.Resources_CPU, Value: "1", }, } overrideValues := []*core.Resources_ResourceEntry{ { Name: core.Resources_CPU, Value: "3", }, } res := newContainerResources(context.TODO(), nil, overrideValues, defaultValues) fmt.Printf("Computed Cpu: %v\n", res.Cpus)
Output: Computed Cpu: 3
Index ¶
- Constants
- func FlyteTaskToBatchInput(ctx context.Context, tCtx pluginCore.TaskExecutionContext, ...) (batchInput *batch.SubmitJobInput, err error)
- func GetJobTaskLog(jobSize int, accountID, region, queue, jobID string) *idlCore.TaskLog
- func GetJobUri(jobSize int, accountID, region, queue, jobID string) string
- func GetTaskLinks(ctx context.Context, taskMeta pluginCore.TaskExecutionMetadata, ...) ([]*idlCore.TaskLog, error)
- func TerminateSubTasks(ctx context.Context, batchClient Client, jobID string) error
- func UpdateBatchInputForArray(_ context.Context, batchInput *batch.SubmitJobInput, arraySize int64) *batch.SubmitJobInput
- type ArrayJobSummary
- type Attempt
- type BatchServiceClient
- type Client
- type Event
- type EventHandler
- type Executor
- func (e Executor) Abort(ctx context.Context, tCtx core.TaskExecutionContext) error
- func (e Executor) Finalize(ctx context.Context, tCtx core.TaskExecutionContext) error
- func (e Executor) GetID() string
- func (e Executor) GetProperties() core.PluginProperties
- func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error)
- func (e Executor) Start(ctx context.Context) error
- type Job
- type JobConfig
- type JobID
- type JobName
- type JobPhaseType
- type JobStatus
- type JobStore
- func (s JobStore) Get(jobName string) *Job
- func (s JobStore) GetOrCreate(jobName string, job *Job) (*Job, error)
- func (s JobStore) IsStarted() bool
- func (s *JobStore) Start(ctx context.Context) error
- func (s JobStore) SubmitJob(ctx context.Context, input *batch.SubmitJobInput) (jobID string, err error)
- type State
- func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata, ...) (newState *State, err error)
- func EnsureJobDefinition(ctx context.Context, tCtx pluginCore.TaskExecutionContext, cfg *config.Config, ...) (nextState *State, err error)
- func LaunchSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchClient Client, ...) (nextState *State, err error)
Examples ¶
Constants ¶
View Source
const ( CpusKey = "cpu" GpusKey = "gpu" MemoryMbKey = "memory" )
View Source
const ( DefaultCPU = 2 DefaultMemory = 700 )
View Source
const ( // Keep these in-sync with flyteAdmin @ // https://github.com/lyft/flyteadmin/blob/d1c61c34f62d8ee51964f47877802d070dfa9e98/pkg/manager/impl/execution_manager.go#L42-L43 PrimaryTaskQueueKey = "primary_queue" DynamicTaskQueueKey = "dynamic_queue" ChildTaskQueueKey = "child_queue" )
View Source
const ( LogStreamFormatter = "https://console.aws.amazon.com/cloudwatch/home?region=%v#logEventViewer:group=/aws/batch/job;stream=%v" ArrayJobFormatter = "https://console.aws.amazon.com/batch/home?region=%v#/jobs/%v" JobFormatter = "https://console.aws.amazon.com/batch/home?region=%v#/jobs/queue/arn:aws:batch:%v:%v:job-queue~2F%v/job/%v" )
View Source
const (
ArrayJobIndex = "BATCH_JOB_ARRAY_INDEX_VAR_NAME"
)
View Source
const (
NoScale resource.Scale = 0
)
Variables ¶
This section is empty.
Functions ¶
func FlyteTaskToBatchInput ¶
func FlyteTaskToBatchInput(ctx context.Context, tCtx pluginCore.TaskExecutionContext, jobDefinition string, cfg *config2.Config) ( batchInput *batch.SubmitJobInput, err error)
Note that Name is not set on the result object. It's up to the caller to set the Name before creating the object in K8s.
func GetJobTaskLog ¶
func GetTaskLinks ¶
func GetTaskLinks(ctx context.Context, taskMeta pluginCore.TaskExecutionMetadata, jobStore *JobStore, state *State) ( []*idlCore.TaskLog, error)
func TerminateSubTasks ¶
func UpdateBatchInputForArray ¶
func UpdateBatchInputForArray(_ context.Context, batchInput *batch.SubmitJobInput, arraySize int64) *batch.SubmitJobInput
Types ¶
type ArrayJobSummary ¶
type ArrayJobSummary map[JobPhaseType]int64
type BatchServiceClient ¶
type BatchServiceClient interface { SubmitJobWithContext(ctx a.Context, input *batch.SubmitJobInput, opts ...request.Option) (*batch.SubmitJobOutput, error) TerminateJobWithContext(ctx a.Context, input *batch.TerminateJobInput, opts ...request.Option) (*batch.TerminateJobOutput, error) DescribeJobsWithContext(ctx a.Context, input *batch.DescribeJobsInput, opts ...request.Option) (*batch.DescribeJobsOutput, error) RegisterJobDefinitionWithContext(ctx a.Context, input *batch.RegisterJobDefinitionInput, opts ...request.Option) (*batch.RegisterJobDefinitionOutput, error) }
BatchServiceClient is an interface on top of the native AWS Batch client to allow for mocking and alternative implementations.
type Client ¶
type Client interface { // Submits a new job to AWS Batch and retrieves job info. Note that submitted jobs will not have status populated. SubmitJob(ctx context.Context, input *batch.SubmitJobInput) (jobID string, err error) // Attempts to terminate a job. If the job hasn't started yet, it'll just get deleted. TerminateJob(ctx context.Context, jobID JobID, reason string) error // Retrieves jobs' details from AWS Batch. GetJobDetailsBatch(ctx context.Context, ids []JobID) ([]*batch.JobDetail, error) // Registers a new Job Definition with AWS Batch provided a name, image and role. RegisterJobDefinition(ctx context.Context, name, image, role string) (arn string, err error) // Gets the single region this client interacts with. GetRegion() string GetAccountID() string }
AWS Batch Client interface.
func NewBatchClient ¶
func NewBatchClient(awsClient aws.Client, getRateLimiter utils.RateLimiter, defaultRateLimiter utils.RateLimiter) Client
Initializes a new Batch Client that can be used to interact with AWS Batch.
func NewCustomBatchClient ¶
func NewCustomBatchClient(batchClient BatchServiceClient, accountId, region string, getRateLimiter utils.RateLimiter, defaultRateLimiter utils.RateLimiter) Client
type EventHandler ¶
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
func NewExecutor ¶
func (Executor) GetProperties ¶
func (e Executor) GetProperties() core.PluginProperties
func (Executor) Handle ¶
func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error)
type Job ¶
type JobConfig ¶
type JobConfig struct { PrimaryTaskQueue string `json:"primary_queue"` DynamicTaskQueue string `json:"dynamic_queue"` }
func (*JobConfig) MergeFromConfigMap ¶
func (*JobConfig) MergeFromKeyValuePairs ¶
func (j *JobConfig) MergeFromKeyValuePairs(pairs []*core.KeyValuePair) *JobConfig
type JobPhaseType ¶
type JobStatus ¶
type JobStatus struct { Phase JobPhaseType `json:"phase,omitempty"` Message string `json:"msg,omitempty"` }
type JobStore ¶
type JobStore struct { Client cache.AutoRefresh // contains filtered or unexported fields }
func NewJobStore ¶
func NewJobStore(ctx context.Context, batchClient Client, cfg config.JobStoreConfig, handler EventHandler, scope promutils.Scope) (JobStore, error)
Constructs a new in-memory store.
type State ¶
type State struct { *core.State ExternalJobID *string `json:"externalJobID"` JobDefinitionArn definition.JobDefinitionArn }
func CheckSubTasksState ¶
func EnsureJobDefinition ¶
func EnsureJobDefinition(ctx context.Context, tCtx pluginCore.TaskExecutionContext, cfg *config.Config, client Client, definitionCache definition.Cache, currentState *State) (nextState *State, err error)
func LaunchSubTasks ¶
func (State) GetExternalJobID ¶
func (State) GetJobDefinitionArn ¶
func (s State) GetJobDefinitionArn() definition.JobDefinitionArn
func (*State) SetExternalJobID ¶
func (*State) SetJobDefinitionArn ¶
func (s *State) SetJobDefinitionArn(arn definition.JobDefinitionArn) *State
Source Files ¶
Click to show internal directories.
Click to hide internal directories.