Documentation ¶
Overview ¶
This package deals with the communication with AWS-Batch and adopting its APIs to the flyte-plugin model.
Index ¶
- Constants
- func FlyteTaskToBatchInput(ctx context.Context, tCtx pluginCore.TaskExecutionContext, ...) (batchInput *batch.SubmitJobInput, err error)
- func GetJobTaskLog(jobSize int, accountID, region, queue, jobID string) *idlCore.TaskLog
- func GetJobUri(jobSize int, accountID, region, queue, jobID string) string
- func GetTaskLinks(ctx context.Context, taskMeta pluginCore.TaskExecutionMetadata, ...) ([]*idlCore.TaskLog, error)
- func TerminateSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchClient Client, ...) error
- func UpdateBatchInputForArray(_ context.Context, batchInput *batch.SubmitJobInput, arraySize int64) *batch.SubmitJobInput
- type ArrayJobSummary
- type Attempt
- type BatchServiceClient
- type Client
- type Event
- type EventHandler
- type Executor
- func (e Executor) Abort(ctx context.Context, tCtx core.TaskExecutionContext) error
- func (e Executor) Finalize(ctx context.Context, tCtx core.TaskExecutionContext) error
- func (e Executor) GetID() string
- func (e Executor) GetProperties() core.PluginProperties
- func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error)
- func (e Executor) Start(ctx context.Context) error
- type Job
- type JobConfig
- type JobID
- type JobName
- type JobPhaseType
- type JobStatus
- type JobStore
- func (s JobStore) Get(jobName string) *Job
- func (s JobStore) GetOrCreate(jobName string, job *Job) (*Job, error)
- func (s JobStore) IsStarted() bool
- func (s *JobStore) Start(ctx context.Context) error
- func (s JobStore) SubmitJob(ctx context.Context, input *batch.SubmitJobInput) (jobID string, err error)
- type State
- func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata, ...) (newState *State, err error)
- func EnsureJobDefinition(ctx context.Context, tCtx pluginCore.TaskExecutionContext, cfg *config.Config, ...) (nextState *State, err error)
- func LaunchSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchClient Client, ...) (nextState *State, err error)
Constants ¶
const ( // Keep these in-sync with flyteAdmin @ // https://github.com/lyft/flyteadmin/blob/d1c61c34f62d8ee51964f47877802d070dfa9e98/pkg/manager/impl/execution_manager.go#L42-L43 PrimaryTaskQueueKey = "primary_queue" DynamicTaskQueueKey = "dynamic_queue" ChildTaskQueueKey = "child_queue" )
const ( LogStreamFormatter = "https://console.aws.amazon.com/cloudwatch/home?region=%v#logEventViewer:group=/aws/batch/job;stream=%v" ArrayJobFormatter = "https://console.aws.amazon.com/batch/home?region=%v#/jobs/%v" JobFormatter = "https://console.aws.amazon.com/batch/home?region=%v#/jobs/queue/arn:aws:batch:%v:%v:job-queue~2F%v/job/%v" )
const (
ArrayJobIndex = "BATCH_JOB_ARRAY_INDEX_VAR_NAME"
)
Variables ¶
This section is empty.
Functions ¶
func FlyteTaskToBatchInput ¶
func FlyteTaskToBatchInput(ctx context.Context, tCtx pluginCore.TaskExecutionContext, jobDefinition string, cfg *config2.Config) ( batchInput *batch.SubmitJobInput, err error)
Note that Name is not set on the result object. It's up to the caller to set the Name before creating the object in K8s.
func GetJobTaskLog ¶
func GetTaskLinks ¶
func GetTaskLinks(ctx context.Context, taskMeta pluginCore.TaskExecutionMetadata, jobStore *JobStore, state *State) ( []*idlCore.TaskLog, error)
func TerminateSubTasks ¶
func TerminateSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchClient Client, reason string) error
Attempts to terminate the AWS Job if one is recorded in the pluginState. This API is idempotent and should be safe to call multiple times on the same job. It'll result in multiple calls to AWS Batch in that case, however.
func UpdateBatchInputForArray ¶
func UpdateBatchInputForArray(_ context.Context, batchInput *batch.SubmitJobInput, arraySize int64) *batch.SubmitJobInput
Types ¶
type ArrayJobSummary ¶
type ArrayJobSummary map[JobPhaseType]int64
type BatchServiceClient ¶
type BatchServiceClient interface { SubmitJobWithContext(ctx a.Context, input *batch.SubmitJobInput, opts ...request.Option) (*batch.SubmitJobOutput, error) TerminateJobWithContext(ctx a.Context, input *batch.TerminateJobInput, opts ...request.Option) (*batch.TerminateJobOutput, error) DescribeJobsWithContext(ctx a.Context, input *batch.DescribeJobsInput, opts ...request.Option) (*batch.DescribeJobsOutput, error) RegisterJobDefinitionWithContext(ctx a.Context, input *batch.RegisterJobDefinitionInput, opts ...request.Option) (*batch.RegisterJobDefinitionOutput, error) }
BatchServiceClient is an interface on top of the native AWS Batch client to allow for mocking and alternative implementations.
type Client ¶
type Client interface { // Submits a new job to AWS Batch and retrieves job info. Note that submitted jobs will not have status populated. SubmitJob(ctx context.Context, input *batch.SubmitJobInput) (jobID string, err error) // Attempts to terminate a job. If the job hasn't started yet, it'll just get deleted. TerminateJob(ctx context.Context, jobID JobID, reason string) error // Retrieves jobs' details from AWS Batch. GetJobDetailsBatch(ctx context.Context, ids []JobID) ([]*batch.JobDetail, error) // Registers a new Job Definition with AWS Batch provided a name, image and role. RegisterJobDefinition(ctx context.Context, name, image, role string) (arn string, err error) // Gets the single region this client interacts with. GetRegion() string GetAccountID() string }
AWS Batch Client interface.
func NewBatchClient ¶
func NewBatchClient(awsClient aws.Client, getRateLimiter utils.RateLimiter, defaultRateLimiter utils.RateLimiter) Client
Initializes a new Batch Client that can be used to interact with AWS Batch.
func NewCustomBatchClient ¶
func NewCustomBatchClient(batchClient BatchServiceClient, accountId, region string, getRateLimiter utils.RateLimiter, defaultRateLimiter utils.RateLimiter) Client
type EventHandler ¶
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
func NewExecutor ¶
func (Executor) GetProperties ¶
func (e Executor) GetProperties() core.PluginProperties
func (Executor) Handle ¶
func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error)
type Job ¶
type JobConfig ¶
type JobConfig struct { PrimaryTaskQueue string `json:"primary_queue"` DynamicTaskQueue string `json:"dynamic_queue"` }
func (*JobConfig) MergeFromConfigMap ¶
func (*JobConfig) MergeFromKeyValuePairs ¶
func (j *JobConfig) MergeFromKeyValuePairs(pairs []*core.KeyValuePair) *JobConfig
type JobPhaseType ¶
type JobStatus ¶
type JobStatus struct { Phase JobPhaseType `json:"phase,omitempty"` Message string `json:"msg,omitempty"` }
type JobStore ¶
type JobStore struct { Client cache.AutoRefresh // contains filtered or unexported fields }
func NewJobStore ¶
func NewJobStore(ctx context.Context, batchClient Client, cfg config.JobStoreConfig, handler EventHandler, scope promutils.Scope) (JobStore, error)
Constructs a new in-memory store.
type State ¶
type State struct { *core.State ExternalJobID *string `json:"externalJobID"` JobDefinitionArn definition.JobDefinitionArn }
func CheckSubTasksState ¶
func EnsureJobDefinition ¶
func EnsureJobDefinition(ctx context.Context, tCtx pluginCore.TaskExecutionContext, cfg *config.Config, client Client, definitionCache definition.Cache, currentState *State) (nextState *State, err error)
func LaunchSubTasks ¶
func (State) GetExternalJobID ¶
func (State) GetJobDefinitionArn ¶
func (s State) GetJobDefinitionArn() definition.JobDefinitionArn
func (*State) SetExternalJobID ¶
func (*State) SetJobDefinitionArn ¶
func (s *State) SetJobDefinitionArn(arn definition.JobDefinitionArn) *State