awsbatch

package
v1.14.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 6, 2024 License: Apache-2.0 Imports: 40 Imported by: 1

Documentation

Overview

This package handles communication with AWS Batch and adapts its APIs to the Flyte plugin model.

Index

Constants

View Source
// String keys identifying the primary, dynamic and child task queues.
// "primary_queue" and "dynamic_queue" are also the JSON tags of the
// PrimaryTaskQueue and DynamicTaskQueue fields of JobConfig.
// NOTE(review): presumably these are the keys read by
// JobConfig.MergeFromConfigMap / MergeFromKeyValuePairs — confirm against the
// implementation, which is not visible here.
const (
	PrimaryTaskQueueKey = "primary_queue"
	DynamicTaskQueueKey = "dynamic_queue"
	ChildTaskQueueKey   = "child_queue"
)
View Source
// Format strings for AWS console URLs. Each %v verb is a slot to be filled in
// by the caller, in the order it appears in the URL:
//
//   - LogStreamFormatter (2 verbs): region, then the log stream name within the
//     /aws/batch/job CloudWatch log group.
//   - ArrayJobFormatter (2 verbs): region, then the value placed after "#/jobs/".
//   - JobFormatter (5 verbs): region, the two colon-separated ARN components
//     that follow "arn:aws:batch:", the job queue name, then the job.
//
// NOTE(review): the two ARN components in JobFormatter are presumably region
// and account ID, matching the usual ARN layout — confirm against GetJobURI.
const (
	LogStreamFormatter = "https://console.aws.amazon.com/cloudwatch/home?region=%v#logEventViewer:group=/aws/batch/job;stream=%v"
	ArrayJobFormatter  = "https://console.aws.amazon.com/batch/home?region=%v#/jobs/%v"
	JobFormatter       = "https://console.aws.amazon.com/batch/home?region=%v#/jobs/queue/arn:aws:batch:%v:%v:job-queue~2F%v/job/%v"
)
View Source
// ArrayJobIndex is the fixed string "BATCH_JOB_ARRAY_INDEX_VAR_NAME".
// NOTE(review): presumably the name of an environment variable whose value
// names the variable holding a sub-task's array index — confirm where it is
// consumed; no usage is visible here.
const (
	ArrayJobIndex = "BATCH_JOB_ARRAY_INDEX_VAR_NAME"
)

Variables

This section is empty.

Functions

func FlyteTaskToBatchInput

func FlyteTaskToBatchInput(ctx context.Context, tCtx pluginCore.TaskExecutionContext, jobDefinition string, cfg *config2.Config) (
	batchInput *batch.SubmitJobInput, err error)

Note that Name is not set on the result object. It's up to the caller to set the Name before creating the object in K8s.

func GetJobTaskLog

func GetJobTaskLog(jobSize int, accountID, region, queue, jobID string) *idlCore.TaskLog

func GetJobURI

func GetJobURI(jobSize int, accountID, region, queue, jobID string) string

func GetTaskLinks

func GetTaskLinks(ctx context.Context, taskMeta pluginCore.TaskExecutionMetadata, jobStore *JobStore, state *State) (
	[]*idlCore.TaskLog, []*pluginCore.ExternalResource, error)

func TerminateSubTasks

func TerminateSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchClient Client, reason string, metrics ExecutorMetrics) error

Attempts to terminate the AWS Job if one is recorded in the pluginState. This API is idempotent and should be safe to call multiple times on the same job. It'll result in multiple calls to AWS Batch in that case, however.

func UpdateBatchInputForArray

func UpdateBatchInputForArray(_ context.Context, batchInput *batch.SubmitJobInput, arraySize int64) *batch.SubmitJobInput

Types

type ArrayJobSummary

// ArrayJobSummary maps a job phase to an int64 value.
// NOTE(review): presumably the number of sub-jobs currently in that phase —
// confirm against the code that populates it.
type ArrayJobSummary map[JobPhaseType]int64

type Attempt

// Attempt records one attempt of a job: the name of its log stream and the
// times it started and stopped.
//
// Note: encoding/json's omitempty option never omits struct values, so
// StartedAt and StoppedAt are always emitted (a zero time.Time is written as
// "0001-01-01T00:00:00Z"). Only LogStream is actually dropped when empty.
type Attempt struct {
	LogStream string    `json:"logStream,omitempty"`
	StartedAt time.Time `json:"startedAt,omitempty"`
	StoppedAt time.Time `json:"stoppedAt,omitempty"`
}

type BatchServiceClient

// BatchServiceClient lists the four AWS Batch operations this package needs:
// submit, terminate, describe and register-job-definition. Every method takes a
// context, an operation-specific input struct from the batch package and
// optional request.Option values, and returns the matching output struct plus
// an error.
//
// NOTE(review): the context parameter is typed a.Context; `a` is an import
// alias not visible here — presumably the AWS SDK's aws package. Confirm.
type BatchServiceClient interface {
	// SubmitJobWithContext submits the job described by input.
	SubmitJobWithContext(ctx a.Context, input *batch.SubmitJobInput, opts ...request.Option) (*batch.SubmitJobOutput, error)
	// TerminateJobWithContext terminates the job described by input.
	TerminateJobWithContext(ctx a.Context, input *batch.TerminateJobInput, opts ...request.Option) (*batch.TerminateJobOutput, error)
	// DescribeJobsWithContext fetches details for the jobs named in input.
	DescribeJobsWithContext(ctx a.Context, input *batch.DescribeJobsInput, opts ...request.Option) (*batch.DescribeJobsOutput, error)
	// RegisterJobDefinitionWithContext registers the job definition described by input.
	RegisterJobDefinitionWithContext(ctx a.Context, input *batch.RegisterJobDefinitionInput, opts ...request.Option) (*batch.RegisterJobDefinitionOutput, error)
}

BatchServiceClient is an interface on top of the native AWS Batch client to allow for mocking and alternative implementations.

type Client

// Client is the package's own, narrower view of AWS Batch. Unlike
// BatchServiceClient it works in terms of job IDs and plain strings rather
// than SDK input/output structs (SubmitJob's input being the exception).
type Client interface {
	// SubmitJob submits a new job to AWS Batch and returns its job ID. Note
	// that submitted jobs will not have status populated.
	SubmitJob(ctx context.Context, input *batch.SubmitJobInput) (jobID string, err error)

	// TerminateJob attempts to terminate the job with the given ID, recording
	// reason. If the job hasn't started yet, it'll just get deleted.
	TerminateJob(ctx context.Context, jobID JobID, reason string) error

	// GetJobDetailsBatch retrieves the details of the jobs with the given IDs
	// from AWS Batch.
	GetJobDetailsBatch(ctx context.Context, ids []JobID) ([]*batch.JobDetail, error)

	// RegisterJobDefinition registers a new Job Definition with AWS Batch
	// from a name, image, role and platformCapabilities string, and returns
	// the ARN of the registered definition.
	RegisterJobDefinition(ctx context.Context, name, image, role string, platformCapabilities string) (arn string, err error)

	// GetRegion gets the single region this client interacts with.
	GetRegion() string

	// GetAccountID returns an account ID as a string.
	// NOTE(review): presumably the AWS account the client operates under —
	// confirm against the implementation.
	GetAccountID() string
}

AWS Batch Client interface.

func NewBatchClient

func NewBatchClient(awsClient aws.Client,
	getRateLimiter utils.RateLimiter,
	defaultRateLimiter utils.RateLimiter) Client

Initializes a new Batch Client that can be used to interact with AWS Batch.

func NewCustomBatchClient

func NewCustomBatchClient(batchClient BatchServiceClient, accountID, region string,
	getRateLimiter utils.RateLimiter,
	defaultRateLimiter utils.RateLimiter) Client

type Event

// Event carries two Job pointers, OldJob and NewJob. It is the argument passed
// to EventHandler.Updated.
// NOTE(review): presumably the job's state before and after a change — confirm
// where events are emitted.
type Event struct {
	OldJob *Job
	NewJob *Job
}

type EventHandler

// EventHandler bundles the callbacks a caller can register for job events.
// It currently has a single callback, Updated, which receives a context and
// the Event. NewJobStore accepts an EventHandler.
type EventHandler struct {
	Updated func(ctx context.Context, event Event)
}

type Executor

// Executor is the type returned by NewExecutor. All of its fields are
// unexported; it is used through its methods (Abort, Finalize, GetID,
// GetProperties, Handle, Start).
type Executor struct {
	// contains filtered or unexported fields
}

func NewExecutor

func NewExecutor(ctx context.Context, awsClient aws.Client, cfg *batchConfig.Config,
	enqueueOwner core.EnqueueOwner, scope promutils.Scope) (Executor, error)

func (Executor) Abort

func (Executor) Finalize

func (e Executor) Finalize(ctx context.Context, tCtx core.TaskExecutionContext) error

func (Executor) GetID

func (e Executor) GetID() string

func (Executor) GetProperties

func (e Executor) GetProperties() core.PluginProperties

func (Executor) Handle

func (Executor) Start

func (e Executor) Start(ctx context.Context) error

type ExecutorMetrics

// ExecutorMetrics groups a metrics scope with five labeled counters: four for
// sub-tasks (submitted, succeeded, failed, queued) and one for terminated
// batch jobs. It is passed by value to TerminateSubTasks, CheckSubTasksState
// and LaunchSubTasks.
type ExecutorMetrics struct {
	Scope              promutils.Scope
	SubTasksSubmitted  labeled.Counter
	SubTasksSucceeded  labeled.Counter
	SubTasksFailed     labeled.Counter
	SubTasksQueued     labeled.Counter
	BatchJobTerminated labeled.Counter
}

type Job

// Job is the record kept for one AWS Batch job: its ID, a reference to its
// owner, the attempts made so far, its current status and, under the JSON key
// "array", any sub-jobs.
//
// The OwnerReference tag used to read `json:"owner.omitempty"` (a dot instead
// of a comma), which made the JSON key the literal string "owner.omitempty"
// and applied no option at all. It is now "owner" with the omitempty option,
// consistent with the other fields. Any data serialized under the old key will
// no longer populate OwnerReference when decoded.
//
// Note: encoding/json's omitempty never omits struct values, so Status (and
// OwnerReference, if types.NamespacedName is a struct) are always emitted.
type Job struct {
	ID             JobID                `json:"id,omitempty"`
	OwnerReference types.NamespacedName `json:"owner,omitempty"`
	Attempts       []Attempt            `json:"attempts,omitempty"`
	Status         JobStatus            `json:"status,omitempty"`
	SubJobs        []*Job               `json:"array,omitempty"`
}

func (Job) IsTerminal

func (j Job) IsTerminal() bool

func (Job) String

func (j Job) String() string

type JobConfig

// JobConfig holds the names of the primary and dynamic task queues. Its JSON
// keys are the same strings as PrimaryTaskQueueKey and DynamicTaskQueueKey.
// There is no field corresponding to ChildTaskQueueKey.
type JobConfig struct {
	PrimaryTaskQueue string `json:"primary_queue"`
	DynamicTaskQueue string `json:"dynamic_queue"`
}

func (*JobConfig) MergeFromConfigMap

func (j *JobConfig) MergeFromConfigMap(configMap *v1.ConfigMap) *JobConfig

func (*JobConfig) MergeFromKeyValuePairs

func (j *JobConfig) MergeFromKeyValuePairs(pairs []*core.KeyValuePair) *JobConfig

type JobID

// JobID is an alias for string (not a distinct type), so plain strings and
// JobID values are interchangeable without conversion.
type JobID = string

func GetJobID

func GetJobID(id JobID, index int) JobID

type JobName

// JobName is an alias for string (not a distinct type), so plain strings and
// JobName values are interchangeable without conversion.
type JobName = string

type JobPhaseType

// JobPhaseType is an alias for core.Phase; it is the Phase field type of
// JobStatus and the key type of ArrayJobSummary.
type JobPhaseType = core.Phase

type JobStatus

// JobStatus pairs a job's phase with a free-form message. In JSON the phase is
// stored under "phase" and the message under "msg".
type JobStatus struct {
	Phase   JobPhaseType `json:"phase,omitempty"`
	Message string       `json:"msg,omitempty"`
}

type JobStore

// JobStore embeds both Client and cache.AutoRefresh, so the methods of each
// are promoted onto it, and it adds unexported state of its own. It is built
// with NewJobStore, which documents it as an in-memory store.
//
// JobStore declares its own SubmitJob, which takes precedence over the method
// promoted from the embedded Client.
type JobStore struct {
	Client
	cache.AutoRefresh
	// contains filtered or unexported fields
}

func NewJobStore

func NewJobStore(ctx context.Context, batchClient Client, cfg config.JobStoreConfig,
	handler EventHandler, scope promutils.Scope) (JobStore, error)

Constructs a new in-memory store.

func (JobStore) Get

func (s JobStore) Get(jobName string) *Job

func (JobStore) GetOrCreate

func (s JobStore) GetOrCreate(jobName string, job *Job) (*Job, error)

func (JobStore) IsStarted

func (s JobStore) IsStarted() bool

func (*JobStore) Start

func (s *JobStore) Start(ctx context.Context) error

func (JobStore) SubmitJob

func (s JobStore) SubmitJob(ctx context.Context, input *batch.SubmitJobInput) (jobID string, err error)

Submits a new job to AWS Batch and retrieves job info. Note that submitted jobs will not have status populated.

type State

// State extends the embedded *core.State with two AWS Batch specific values:
// the external job ID and the ARN of the job definition. Both have getter and
// setter methods on State.
//
// ExternalJobID is a pointer, so nil can represent "no job recorded"; it is
// serialized under "externalJobID". JobDefinitionArn has no JSON tag and is
// therefore serialized under its Go field name.
type State struct {
	*core.State

	ExternalJobID    *string `json:"externalJobID"`
	JobDefinitionArn definition.JobDefinitionArn
}

func CheckSubTasksState

func CheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionContext, jobStore *JobStore,
	cfg *config.Config, currentState *State, metrics ExecutorMetrics) (newState *State, err error)

func EnsureJobDefinition

func EnsureJobDefinition(ctx context.Context, tCtx pluginCore.TaskExecutionContext, cfg *config.Config, client Client,
	definitionCache definition.Cache, currentState *State, terminalVersion uint32) (nextState *State, err error)

func LaunchSubTasks

func LaunchSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchClient Client, pluginConfig *config.Config,
	currentState *State, metrics ExecutorMetrics, terminalVersion uint32) (nextState *State, err error)

func (State) GetExternalJobID

func (s State) GetExternalJobID() *string

func (State) GetJobDefinitionArn

func (s State) GetJobDefinitionArn() definition.JobDefinitionArn

func (*State) SetExternalJobID

func (s *State) SetExternalJobID(jobID string) *State

func (*State) SetJobDefinitionArn

func (s *State) SetJobDefinitionArn(arn definition.JobDefinitionArn) *State

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL