Documentation ¶
Index ¶
- Constants
- Variables
- func IsValidStatus(status string) bool
- func IsValidWorkerType(workerType string) bool
- func NewDefinitionID(definition Definition) (string, error)
- func NewRunID(engine *string) (string, error)
- func NewTemplateID(t Template) (string, error)
- type CloudTrailNotifications
- type CloudTrailS3File
- type Conf
- type CreateTemplateRequest
- type CreateTemplateResponse
- type Definition
- func (d *Definition) DefaultOrderField() string
- func (d Definition) GetExecutableCommand(req ExecutionRequest) (string, error)
- func (d Definition) GetExecutableID() *string
- func (d Definition) GetExecutableResourceName() string
- func (d Definition) GetExecutableResources() *ExecutableResources
- func (d Definition) GetExecutableType() *ExecutableType
- func (d *Definition) IsValid() (bool, []string)
- func (d Definition) MarshalJSON() ([]byte, error)
- func (d *Definition) UpdateWith(other Definition)
- func (d *Definition) ValidOrderField(field string) bool
- func (d *Definition) ValidOrderFields() []string
- func (d *Definition) WrappedCommand() (string, error)
- type DefinitionExecutionRequest
- type DefinitionList
- type Detail
- type EmrEvent
- type EnvList
- type EnvVar
- type EventLabels
- type Executable
- type ExecutableResources
- type ExecutableType
- type ExecutionRequest
- type ExecutionRequestCommon
- type ExecutionRequestCustom
- type GroupsList
- type IOrderable
- type InvolvedObject
- type KubernetesEvent
- type Labels
- type LaunchRequest
- type LaunchRequestV2
- type Manager
- type Metadata
- type NodeList
- type PodEvent
- type PodEventList
- type PodEvents
- type PortsList
- type Record
- type Run
- type RunExceptions
- type RunList
- type RunTags
- type SQLStateManager
- func (sm *SQLStateManager) BatchUpdateWorkers(updates []Worker) (WorkersList, error)
- func (sm *SQLStateManager) CheckIdempotenceKey(idempotenceKey string) (string, error)
- func (sm *SQLStateManager) Cleanup() error
- func (sm *SQLStateManager) CreateDefinition(d Definition) error
- func (sm *SQLStateManager) CreateRun(r Run) error
- func (sm *SQLStateManager) CreateTemplate(t Template) error
- func (sm *SQLStateManager) DeleteDefinition(definitionID string) error
- func (sm *SQLStateManager) DriverOOM(executableID string, commandHash string) (bool, error)
- func (sm *SQLStateManager) EstimateExecutorCount(executableID string, commandHash string) (int64, error)
- func (sm *SQLStateManager) EstimateRunResources(executableID string, runID string) (TaskResources, error)
- func (sm *SQLStateManager) ExecutorOOM(executableID string, commandHash string) (bool, error)
- func (sm *SQLStateManager) GetDefinition(definitionID string) (Definition, error)
- func (sm *SQLStateManager) GetDefinitionByAlias(alias string) (Definition, error)
- func (sm *SQLStateManager) GetExecutableByTypeAndID(t ExecutableType, id string) (Executable, error)
- func (sm *SQLStateManager) GetLatestTemplateByTemplateName(templateName string) (bool, Template, error)
- func (sm *SQLStateManager) GetNodeLifecycle(executableID string, commandHash string) (string, error)
- func (sm *SQLStateManager) GetPodReAttemptRate() (float32, error)
- func (sm *SQLStateManager) GetResources(runID string) (Run, error)
- func (sm *SQLStateManager) GetRun(runID string) (Run, error)
- func (sm *SQLStateManager) GetRunByEMRJobId(emrJobId string) (Run, error)
- func (sm *SQLStateManager) GetTaskHistoricalRuntime(executableID string, runID string) (float32, error)
- func (sm *SQLStateManager) GetTemplateByID(templateID string) (Template, error)
- func (sm *SQLStateManager) GetTemplateByVersion(templateName string, templateVersion int64) (bool, Template, error)
- func (sm *SQLStateManager) GetWorker(workerType string, engine string) (w Worker, err error)
- func (sm *SQLStateManager) Initialize(conf config.Config) error
- func (sm *SQLStateManager) ListDefinitions(limit int, offset int, sortBy string, order string, ...) (DefinitionList, error)
- func (sm *SQLStateManager) ListFailingNodes() (NodeList, error)
- func (sm *SQLStateManager) ListGroups(limit int, offset int, name *string) (GroupsList, error)
- func (sm *SQLStateManager) ListRuns(limit int, offset int, sortBy string, order string, ...) (RunList, error)
- func (sm *SQLStateManager) ListTags(limit int, offset int, name *string) (TagsList, error)
- func (sm *SQLStateManager) ListTemplates(limit int, offset int, sortBy string, order string) (TemplateList, error)
- func (sm *SQLStateManager) ListTemplatesLatestOnly(limit int, offset int, sortBy string, order string) (TemplateList, error)
- func (sm *SQLStateManager) ListWorkers(engine string) (WorkersList, error)
- func (sm *SQLStateManager) Name() string
- func (sm *SQLStateManager) UpdateDefinition(definitionID string, updates Definition) (Definition, error)
- func (sm *SQLStateManager) UpdateRun(runID string, updates Run) (Run, error)
- func (sm *SQLStateManager) UpdateWorker(workerType string, updates Worker) (Worker, error)
- type Source
- type SparkExtension
- type SparkSubmitJobDriver
- type SpawnedRun
- type SpawnedRuns
- type Tags
- type TagsList
- type TaskResources
- type Template
- func (t *Template) DefaultOrderField() string
- func (t Template) GetExecutableCommand(req ExecutionRequest) (string, error)
- func (t Template) GetExecutableID() *string
- func (t Template) GetExecutableResourceName() string
- func (t Template) GetExecutableResources() *ExecutableResources
- func (t Template) GetExecutableType() *ExecutableType
- func (t *Template) IsValid() (bool, []string)
- func (t *Template) ValidOrderField(field string) bool
- func (t *Template) ValidOrderFields() []string
- type TemplateExecutionRequest
- type TemplateJSONSchema
- type TemplateList
- type TemplatePayload
- type TerminateJob
- type UserIdentity
- type UserInfo
- type Worker
- type WorkersList
Constants ¶
const DefinitionSelect = `` /* 934-byte string literal not displayed */
DefinitionSelect postgres specific query for definitions
const GetDefinitionByAliasSQL = DefinitionSelect + "\nwhere alias = $1"
GetDefinitionByAliasSQL get definition by alias
const GetDefinitionSQL = DefinitionSelect + "\nwhere definition_id = $1"
GetDefinitionSQL postgres specific query for getting a single definition
const GetRunSQL = RunSelect + "\nwhere run_id = $1"
GetRunSQL postgres specific query for getting a single run
const GetRunSQLByEMRJobId = RunSelect + "\nwhere spark_extension->>'emr_job_id' = $1"
const GetRunSQLForUpdate = GetRunSQL + " for update"
GetRunSQLForUpdate postgres specific query for getting a single run for update
const GetTemplateByIDSQL = TemplateSelect + "\nwhere template_id = $1"
GetTemplateByIDSQL postgres specific query for getting a single template
const GetTemplateByVersionSQL = TemplateSelect + "\nWHERE template_name = $1 AND version = $2 ORDER BY version DESC LIMIT 1;"
const GetTemplateLatestOnlySQL = TemplateSelect + "\nWHERE template_name = $1 ORDER BY version DESC LIMIT 1;"
GetTemplateLatestOnlySQL get the latest version of a specific template name.
const GetWorkerEngine = WorkerSelect + "\nwhere engine = $1"
const GetWorkerSQL = WorkerSelect + "\nwhere worker_type = $1 and engine = $2"
GetWorkerSQL postgres specific query for retrieving data for a specific worker type.
const GetWorkerSQLForUpdate = GetWorkerSQL + " for update"
GetWorkerSQLForUpdate postgres specific query for retrieving data for a specific worker type; locks the row.
const GroupsSelect = `
select distinct group_name from task_def
`
GroupsSelect postgres specific query for getting existing definition group_names
const ListDefinitionsSQL = DefinitionSelect + "\n%s %s limit $1 offset $2"
ListDefinitionsSQL postgres specific query for listing definitions
const ListFailingNodesSQL = `` /* 593-byte string literal not displayed */
const ListGroupsSQL = GroupsSelect + "\n%s order by group_name asc limit $1 offset $2"
ListGroupsSQL postgres specific query for listing definition group_names
const ListRunsSQL = RunSelect + "\n%s %s limit $1 offset $2"
ListRunsSQL postgres specific query for listing runs
const ListTagsSQL = TagsSelect + "\n%s order by text asc limit $1 offset $2"
ListTagsSQL postgres specific query for listing definition tags
const ListTemplatesLatestOnlySQL = `` /* 458-byte string literal not displayed */
ListTemplatesLatestOnlySQL lists the latest version of each distinct template name.
const ListTemplatesSQL = TemplateSelect + "\n%s limit $1 offset $2"
ListTemplatesSQL postgres specific query for listing templates
const ListWorkersSQL = WorkerSelect
ListWorkersSQL postgres specific query for listing workers
const PodReAttemptRate = `` /* 430-byte string literal not displayed */
const RunSelect = `` /* 2460-byte string literal not displayed */
RunSelect postgres specific query for runs
const TagsSelect = `
select distinct text from tags
`
TagsSelect postgres specific query for getting existing definition tags
const TaskExecutionRuntimeCommandSQL = `` /* 407-byte string literal not displayed */
const TaskIdempotenceKeyCheckSQL = `` /* 221-byte string literal not displayed */
const TaskResourcesDriverOOMSQL = `` /* 278-byte string literal not displayed */
const TaskResourcesExecutorCountSQL = `` /* 656-byte string literal not displayed */
const TaskResourcesExecutorNodeLifecycleSQL = `` /* 291-byte string literal not displayed */
const TaskResourcesExecutorOOMSQL = `` /* 331-byte string literal not displayed */
const TaskResourcesSelectCommandSQL = `` /* 578-byte string literal not displayed */
const TemplatePayloadKey = "template_payload"
const TemplateSelect = `` /* 325-byte string literal not displayed */
TemplateSelect selects a template
const WorkerSelect = `
select
worker_type as workertype,
count_per_instance as countperinstance,
engine
from worker
`
WorkerSelect postgres specific query for workers
Variables ¶
var CommandTemplate, _ = template.New("command").Parse(commandWrapper)
var DefaultEngine = EKSEngine
var DefaultLifecycle = SpotLifecycle
var DefaultTaskType = "task"
var EKSBackoffLimit = int32(0)
var EKSEngine = "eks"
var EKSSparkEngine = "eks-spark"
var Engines = []string{EKSEngine, EKSSparkEngine}
var GPUNodeTypes = []string{"p3.2xlarge", "p3.8xlarge", "p3.16xlarge", "g5.xlarge", "g5.2xlarge", "g5.4xlarge", "g5.8xlarge", "g5.12xlarge", "g5.16xlarge", "g5.24xlarge", "g5.48xlarge"}
var MaxCPU = int64(60000)
var MaxEphemeralStorage = int64(5000)
var MaxGPUCPU = int64(94000)
var MaxGPUMem = int64(376000)
var MaxLogLines = int64(256)
var MaxMem = int64(350000)
var MinCPU = int64(256)
var MinMem = int64(512)
var NodeLifeCycles = []string{OndemandLifecycle, SpotLifecycle}
var OndemandActiveDeadlineSeconds = int64(604800)
var OndemandLifecycle = "ondemand"
var SpotActiveDeadlineSeconds = int64(172800)
var SpotLifecycle = "spot"
var StatusNeedsRetry = "NEEDS_RETRY"
StatusNeedsRetry indicates the run failed for infrastructure reasons and needs to be retried.
var StatusPending = "PENDING"
StatusPending indicates the run has been allocated to a host and is in the process of launching
var StatusQueued = "QUEUED"
StatusQueued indicates the run is queued
var StatusRunning = "RUNNING"
StatusRunning indicates the run is running
var StatusStopped = "STOPPED"
StatusStopped means the run is finished
var TTLSecondsAfterFinished = int32(3600)
Functions ¶
func IsValidStatus ¶
IsValidStatus checks that the given status string is one of the valid statuses
func IsValidWorkerType ¶
func NewDefinitionID ¶
func NewDefinitionID(definition Definition) (string, error)
NewDefinitionID returns a new uuid for a Definition
func NewTemplateID ¶
NewTemplateID returns a new uuid for a Template
Types ¶
type CloudTrailNotifications ¶
type CloudTrailNotifications struct {
Records []Record `json:"Records"`
}
CloudTrail notification object that is persisted into the DB.
func (*CloudTrailNotifications) Marshal ¶
func (e *CloudTrailNotifications) Marshal() ([]byte, error)
Marshal method for CloudTrail SQS notifications.
func (*CloudTrailNotifications) Scan ¶
func (e *CloudTrailNotifications) Scan(value interface{}) error
Scan populates the notifications from a database value.
type CloudTrailS3File ¶
type CloudTrailS3File struct { S3Bucket string `json:"s3Bucket"` S3ObjectKey []string `json:"s3ObjectKey"` Done func() error }
SQS notification object for CloudTrail S3 files.
type CreateTemplateRequest ¶
type CreateTemplateRequest struct { TemplateName string `json:"template_name"` Schema TemplateJSONSchema `json:"schema"` CommandTemplate string `json:"command_template"` Defaults TemplatePayload `json:"defaults"` AvatarURI string `json:"avatar_uri"` ExecutableResources }
type CreateTemplateResponse ¶
type Definition ¶
type Definition struct { DefinitionID string `json:"definition_id"` GroupName string `json:"group_name,omitempty"` Alias string `json:"alias"` Command string `json:"command,omitempty"` TaskType string `json:"task_type,omitempty"` RequiresDocker bool `json:"requires_docker,omitempty" db:"requires_docker"` TargetCluster string `json:"target_cluster,omitempty" db:"target_cluster"` ExecutableResources }
Definition represents a task definition. It implements the `Executable` interface.
func (*Definition) DefaultOrderField ¶
func (d *Definition) DefaultOrderField() string
func (Definition) GetExecutableCommand ¶
func (d Definition) GetExecutableCommand(req ExecutionRequest) (string, error)
func (Definition) GetExecutableID ¶
func (d Definition) GetExecutableID() *string
GetExecutableID returns the ID of the executable (definition or template).
func (Definition) GetExecutableResourceName ¶
func (d Definition) GetExecutableResourceName() string
func (Definition) GetExecutableResources ¶
func (d Definition) GetExecutableResources() *ExecutableResources
func (Definition) GetExecutableType ¶
func (d Definition) GetExecutableType() *ExecutableType
GetExecutableType returns the executable type (definition or template).
func (*Definition) IsValid ¶
func (d *Definition) IsValid() (bool, []string)
IsValid returns true only if this is a valid definition with all required information
func (Definition) MarshalJSON ¶
func (d Definition) MarshalJSON() ([]byte, error)
func (*Definition) UpdateWith ¶
func (d *Definition) UpdateWith(other Definition)
UpdateWith updates this definition with information from another
func (*Definition) ValidOrderField ¶
func (d *Definition) ValidOrderField(field string) bool
func (*Definition) ValidOrderFields ¶
func (d *Definition) ValidOrderFields() []string
func (*Definition) WrappedCommand ¶
func (d *Definition) WrappedCommand() (string, error)
WrappedCommand returns the wrapped command for the definition. Wrapping ensures that output lines are logged and that the exit code is set.
type DefinitionExecutionRequest ¶
type DefinitionExecutionRequest struct {
*ExecutionRequestCommon
}
func (*DefinitionExecutionRequest) GetExecutionRequestCommon ¶
func (d *DefinitionExecutionRequest) GetExecutionRequestCommon() *ExecutionRequestCommon
GetExecutionRequestCommon returns the ExecutionRequestCommon, which holds the fields shared by the Template and Definition types.
func (*DefinitionExecutionRequest) GetExecutionRequestCustom ¶
func (d *DefinitionExecutionRequest) GetExecutionRequestCustom() *ExecutionRequestCustom
GetExecutionRequestCustom returns the custom execution-request fields. These are only relevant to the template type.
type DefinitionList ¶
type DefinitionList struct { Total int `json:"total"` Definitions []Definition `json:"definitions"` }
DefinitionList wraps a list of Definitions
func (*DefinitionList) MarshalJSON ¶
func (dl *DefinitionList) MarshalJSON() ([]byte, error)
type Detail ¶
type Detail struct { Severity *string `json:"severity,omitempty"` Name *string `json:"name,omitempty"` ID *string `json:"id,omitempty"` Arn *string `json:"arn,omitempty"` VirtualClusterID *string `json:"virtualClusterId,omitempty"` State *string `json:"state,omitempty"` CreatedBy *string `json:"createdBy,omitempty"` ReleaseLabel *string `json:"releaseLabel,omitempty"` ExecutionRoleArn *string `json:"executionRoleArn,omitempty"` FailureReason *string `json:"failureReason,omitempty"` StateDetails *string `json:"stateDetails,omitempty"` Message *string `json:"message,omitempty"` }
type EmrEvent ¶
type EmrEvent struct { Version *string `json:"version,omitempty"` ID *string `json:"id,omitempty"` DetailType *string `json:"detail-type,omitempty"` Source *string `json:"source,omitempty"` Account *string `json:"account,omitempty"` Time *string `json:"time,omitempty"` Region *string `json:"region,omitempty"` Resources []interface{} `json:"resources,omitempty"` Detail *Detail `json:"detail,omitempty"` Done func() error }
func UnmarshalEmrEvents ¶
type EnvList ¶
type EnvList []EnvVar
EnvList wraps a list of EnvVar
- abstraction to make it easier to read and write to db
type EventLabels ¶
type Executable ¶
type Executable interface { GetExecutableID() *string GetExecutableType() *ExecutableType GetExecutableResources() *ExecutableResources GetExecutableCommand(req ExecutionRequest) (string, error) GetExecutableResourceName() string // This will typically be an ARN. }
type ExecutableResources ¶
type ExecutableResources struct { Image string `json:"image"` Memory *int64 `json:"memory,omitempty"` Gpu *int64 `json:"gpu,omitempty"` Cpu *int64 `json:"cpu,omitempty"` EphemeralStorage *int64 `json:"ephemeral_storage,omitempty" db:"ephemeral_storage"` Env *EnvList `json:"env"` AdaptiveResourceAllocation *bool `json:"adaptive_resource_allocation,omitempty"` Ports *PortsList `json:"ports,omitempty"` Tags *Tags `json:"tags,omitempty"` }
ExecutableResources define the resources and flags required to run an executable.
type ExecutableType ¶
type ExecutableType string
const ( ExecutableTypeDefinition ExecutableType = "task_definition" ExecutableTypeTemplate ExecutableType = "template" )
type ExecutionRequest ¶
type ExecutionRequest interface { GetExecutionRequestCommon() *ExecutionRequestCommon GetExecutionRequestCustom() *ExecutionRequestCustom }
type ExecutionRequestCommon ¶
type ExecutionRequestCommon struct { ClusterName string `json:"cluster_name"` Env *EnvList `json:"env"` OwnerID string `json:"owner_id"` Command *string `json:"command"` Memory *int64 `json:"memory"` Cpu *int64 `json:"cpu"` Gpu *int64 `json:"gpu"` Engine *string `json:"engine"` EphemeralStorage *int64 `json:"ephemeral_storage"` NodeLifecycle *string `json:"node_lifecycle"` ActiveDeadlineSeconds *int64 `json:"active_deadline_seconds,omitempty"` SparkExtension *SparkExtension `json:"spark_extension,omitempty"` Description *string `json:"description,omitempty"` CommandHash *string `json:"command_hash,omitempty"` IdempotenceKey *string `json:"idempotence_key,omitempty"` Arch *string `json:"arch,omitempty"` Labels *Labels `json:"labels,omitempty"` ServiceAccount *string `json:"service_account,omitempty"` }
Common fields required to execute any Executable.
type ExecutionRequestCustom ¶
type ExecutionRequestCustom map[string]interface{}
func (*ExecutionRequestCustom) Scan ¶
func (e *ExecutionRequestCustom) Scan(value interface{}) error
Scan populates the custom execution-request fields from a database value.
type GroupsList ¶
GroupsList wraps a list of group names
type IOrderable ¶
type InvolvedObject ¶
type InvolvedObject struct { Kind string `json:"kind,omitempty"` Namespace string `json:"namespace,omitempty"` Name string `json:"name,omitempty"` Uid string `json:"uid,omitempty"` APIVersion string `json:"apiVersion,omitempty"` ResourceVersion string `json:"resourceVersion,omitempty"` FieldPath string `json:"fieldPath,omitempty"` Labels EventLabels `json:"labels,omitempty"` }
type KubernetesEvent ¶
type KubernetesEvent struct { Metadata Metadata `json:"metadata,omitempty"` Reason string `json:"reason,omitempty"` Message string `json:"message,omitempty"` Source Source `json:"source,omitempty"` FirstTimestamp string `json:"firstTimestamp,omitempty"` LastTimestamp string `json:"lastTimestamp,omitempty"` Count int64 `json:"count,omitempty"` Type string `json:"type,omitempty"` EventTime interface{} `json:"eventTime,omitempty"` ReportingComponent string `json:"reportingComponent,omitempty"` ReportingInstance string `json:"reportingInstance,omitempty"` InvolvedObject InvolvedObject `json:"involvedObject,omitempty"` Done func() error }
func (*KubernetesEvent) Marshal ¶
func (r *KubernetesEvent) Marshal() ([]byte, error)
type LaunchRequest ¶
type LaunchRequestV2 ¶
type LaunchRequestV2 struct { RunTags RunTags `json:"run_tags"` Command *string `json:"command,omitempty"` Memory *int64 `json:"memory,omitempty"` Cpu *int64 `json:"cpu,omitempty"` Gpu *int64 `json:"gpu,omitempty"` EphemeralStorage *int64 `json:"ephemeral_storage,omitempty"` Engine *string `json:"engine,omitempty"` NodeLifecycle *string `json:"node_lifecycle,omitempty"` ActiveDeadlineSeconds *int64 `json:"active_deadline_seconds,omitempty"` SparkExtension *SparkExtension `json:"spark_extension,omitempty"` ClusterName *string `json:"cluster,omitempty"` Env *EnvList `json:"env,omitempty"` Description *string `json:"description,omitempty"` CommandHash *string `json:"command_hash,omitempty"` IdempotenceKey *string `json:"idempotence_key,omitempty"` Arch *string `json:"arch,omitempty"` Labels *Labels `json:"labels,omitempty"` ServiceAccount *string `json:"service_account,omitempty"` }
type Manager ¶
type Manager interface { Name() string Initialize(conf config.Config) error Cleanup() error ListDefinitions( limit int, offset int, sortBy string, order string, filters map[string][]string, envFilters map[string]string) (DefinitionList, error) GetDefinition(definitionID string) (Definition, error) GetDefinitionByAlias(alias string) (Definition, error) UpdateDefinition(definitionID string, updates Definition) (Definition, error) CreateDefinition(d Definition) error DeleteDefinition(definitionID string) error ListRuns(limit int, offset int, sortBy string, order string, filters map[string][]string, envFilters map[string]string, engines []string) (RunList, error) EstimateRunResources(executableID string, commandHash string) (TaskResources, error) EstimateExecutorCount(executableID string, commandHash string) (int64, error) ExecutorOOM(executableID string, commandHash string) (bool, error) DriverOOM(executableID string, commandHash string) (bool, error) GetRun(runID string) (Run, error) CreateRun(r Run) error UpdateRun(runID string, updates Run) (Run, error) ListGroups(limit int, offset int, name *string) (GroupsList, error) ListTags(limit int, offset int, name *string) (TagsList, error) ListWorkers(engine string) (WorkersList, error) BatchUpdateWorkers(updates []Worker) (WorkersList, error) GetWorker(workerType string, engine string) (Worker, error) UpdateWorker(workerType string, updates Worker) (Worker, error) GetExecutableByTypeAndID(executableType ExecutableType, executableID string) (Executable, error) GetTemplateByID(templateID string) (Template, error) GetLatestTemplateByTemplateName(templateName string) (bool, Template, error) GetTemplateByVersion(templateName string, templateVersion int64) (bool, Template, error) ListTemplates(limit int, offset int, sortBy string, order string) (TemplateList, error) ListTemplatesLatestOnly(limit int, offset int, sortBy string, order string) (TemplateList, error) CreateTemplate(t Template) error ListFailingNodes() (NodeList, 
error) GetPodReAttemptRate() (float32, error) GetNodeLifecycle(executableID string, commandHash string) (string, error) GetTaskHistoricalRuntime(executableID string, runId string) (float32, error) CheckIdempotenceKey(idempotenceKey string) (string, error) GetRunByEMRJobId(string) (Run, error) }
Manager interface for CRUD operations on definitions and runs
type Metadata ¶
type Metadata struct { Name string `json:"name,omitempty"` Namespace string `json:"namespace,omitempty"` SelfLink string `json:"selfLink,omitempty"` Uid string `json:"uid,omitempty"` ResourceVersion string `json:"resourceVersion,omitempty"` CreationTimestamp string `json:"creationTimestamp,omitempty"` }
type PodEvent ¶
type PodEventList ¶
type PortsList ¶
type PortsList []int
PortsList wraps a list of int
- abstraction to make it easier to read and write to db
type Record ¶
type Record struct { UserIdentity UserIdentity `json:"userIdentity"` EventSource string `json:"eventSource"` EventName string `json:"eventName"` }
CloudTrail notification record.
type Run ¶
type Run struct { RunID string `json:"run_id"` DefinitionID string `json:"definition_id"` Alias string `json:"alias"` Image string `json:"image"` ClusterName string `json:"cluster"` ExitCode *int64 `json:"exit_code,omitempty"` Status string `json:"status"` QueuedAt *time.Time `json:"queued_at,omitempty"` StartedAt *time.Time `json:"started_at,omitempty"` FinishedAt *time.Time `json:"finished_at,omitempty"` InstanceID string `json:"-"` InstanceDNSName string `json:"-"` GroupName string `json:"group_name"` User string `json:"user,omitempty"` TaskType string `json:"task_type,omitempty"` Env *EnvList `json:"env,omitempty"` Command *string `json:"command,omitempty"` CommandHash *string `json:"command_hash,omitempty"` Memory *int64 `json:"memory,omitempty"` MemoryLimit *int64 `json:"memory_limit,omitempty"` Cpu *int64 `json:"cpu,omitempty"` CpuLimit *int64 `json:"cpu_limit,omitempty"` Gpu *int64 `json:"gpu,omitempty"` ExitReason *string `json:"exit_reason,omitempty"` Engine *string `json:"engine,omitempty"` NodeLifecycle *string `json:"node_lifecycle,omitempty"` EphemeralStorage *int64 `json:"ephemeral_storage,omitempty" db:"ephemeral_storage"` PodName *string `json:"pod_name,omitempty"` Namespace *string `json:"namespace,omitempty"` MaxMemoryUsed *int64 `json:"max_memory_used,omitempty"` MaxCpuUsed *int64 `json:"max_cpu_used,omitempty"` PodEvents *PodEvents `json:"pod_events,omitempty"` CloudTrailNotifications *CloudTrailNotifications `json:"cloudtrail_notifications,omitempty"` ExecutableID *string `json:"executable_id,omitempty"` ExecutableType *ExecutableType `json:"executable_type,omitempty"` ExecutionRequestCustom *ExecutionRequestCustom `json:"execution_request_custom,omitempty"` AttemptCount *int64 `json:"attempt_count,omitempty"` SpawnedRuns *SpawnedRuns `json:"spawned_runs,omitempty"` RunExceptions *RunExceptions `json:"run_exceptions,omitempty"` ActiveDeadlineSeconds *int64 `json:"active_deadline_seconds,omitempty"` SparkExtension *SparkExtension 
`json:"spark_extension,omitempty"` MetricsUri *string `json:"metrics_uri,omitempty"` Description *string `json:"description,omitempty"` IdempotenceKey *string `json:"idempotence_key,omitempty"` Arch *string `json:"arch,omitempty"` Labels Labels `json:"labels,omitempty"` RequiresDocker bool `json:"requires_docker,omitempty" db:"requires_docker"` ServiceAccount *string `json:"service_account,omitempty" db:"service_account"` }
Run represents a single run of a Definition
TODO:
Runs need to copy the run-relevant information from their associated definition when they are created, so that they always have correct information. Currently the definition can change during or after the run's creation and launch, which means the run may be acting on information that is no longer accessible.
func (*Run) DefaultOrderField ¶
func (Run) MarshalJSON ¶
func (*Run) UpdateWith ¶
UpdateWith updates this run with information from another
func (*Run) ValidOrderField ¶
func (*Run) ValidOrderFields ¶
type RunExceptions ¶
type RunExceptions []string
func (*RunExceptions) Scan ¶
func (e *RunExceptions) Scan(value interface{}) error
type RunTags ¶
type RunTags struct { OwnerEmail string `json:"owner_email"` TeamName string `json:"team_name"` OwnerID string `json:"owner_id"` }
RunTags represents which user is responsible for a task run
type SQLStateManager ¶
type SQLStateManager struct {
// contains filtered or unexported fields
}
SQLStateManager uses postgresql to manage state
func (*SQLStateManager) BatchUpdateWorkers ¶
func (sm *SQLStateManager) BatchUpdateWorkers(updates []Worker) (WorkersList, error)
BatchUpdateWorkers updates multiple workers.
func (*SQLStateManager) CheckIdempotenceKey ¶
func (sm *SQLStateManager) CheckIdempotenceKey(idempotenceKey string) (string, error)
func (*SQLStateManager) Cleanup ¶
func (sm *SQLStateManager) Cleanup() error
Cleanup closes any open resources.
func (*SQLStateManager) CreateDefinition ¶
func (sm *SQLStateManager) CreateDefinition(d Definition) error
CreateDefinition creates the passed-in definition object; it returns an error if the definition already exists.
func (*SQLStateManager) CreateRun ¶
func (sm *SQLStateManager) CreateRun(r Run) error
CreateRun creates the passed in run
func (*SQLStateManager) CreateTemplate ¶
func (sm *SQLStateManager) CreateTemplate(t Template) error
CreateTemplate creates a new template.
func (*SQLStateManager) DeleteDefinition ¶
func (sm *SQLStateManager) DeleteDefinition(definitionID string) error
DeleteDefinition deletes definition and associated runs and environment variables
func (*SQLStateManager) DriverOOM ¶
func (sm *SQLStateManager) DriverOOM(executableID string, commandHash string) (bool, error)
func (*SQLStateManager) EstimateExecutorCount ¶
func (sm *SQLStateManager) EstimateExecutorCount(executableID string, commandHash string) (int64, error)
func (*SQLStateManager) EstimateRunResources ¶
func (sm *SQLStateManager) EstimateRunResources(executableID string, runID string) (TaskResources, error)
func (*SQLStateManager) ExecutorOOM ¶
func (sm *SQLStateManager) ExecutorOOM(executableID string, commandHash string) (bool, error)
func (*SQLStateManager) GetDefinition ¶
func (sm *SQLStateManager) GetDefinition(definitionID string) (Definition, error)
GetDefinition returns a single definition by id
func (*SQLStateManager) GetDefinitionByAlias ¶
func (sm *SQLStateManager) GetDefinitionByAlias(alias string) (Definition, error)
GetDefinitionByAlias returns a single definition by alias.
func (*SQLStateManager) GetExecutableByTypeAndID ¶
func (sm *SQLStateManager) GetExecutableByTypeAndID(t ExecutableType, id string) (Executable, error)
GetExecutableByTypeAndID returns a single executable by type and ID.
func (*SQLStateManager) GetLatestTemplateByTemplateName ¶
func (sm *SQLStateManager) GetLatestTemplateByTemplateName(templateName string) (bool, Template, error)
GetLatestTemplateByTemplateName returns the latest version of a template of a specific template name.
func (*SQLStateManager) GetNodeLifecycle ¶
func (sm *SQLStateManager) GetNodeLifecycle(executableID string, commandHash string) (string, error)
func (*SQLStateManager) GetPodReAttemptRate ¶
func (sm *SQLStateManager) GetPodReAttemptRate() (float32, error)
func (*SQLStateManager) GetResources ¶
func (sm *SQLStateManager) GetResources(runID string) (Run, error)
func (*SQLStateManager) GetRun ¶
func (sm *SQLStateManager) GetRun(runID string) (Run, error)
GetRun gets run by id
func (*SQLStateManager) GetRunByEMRJobId ¶
func (sm *SQLStateManager) GetRunByEMRJobId(emrJobId string) (Run, error)
func (*SQLStateManager) GetTaskHistoricalRuntime ¶
func (sm *SQLStateManager) GetTaskHistoricalRuntime(executableID string, runID string) (float32, error)
func (*SQLStateManager) GetTemplateByID ¶
func (sm *SQLStateManager) GetTemplateByID(templateID string) (Template, error)
GetTemplateByID returns a single template by id.
func (*SQLStateManager) GetTemplateByVersion ¶
func (*SQLStateManager) GetWorker ¶
func (sm *SQLStateManager) GetWorker(workerType string, engine string) (w Worker, err error)
GetWorker returns data for a single worker.
func (*SQLStateManager) Initialize ¶
func (sm *SQLStateManager) Initialize(conf config.Config) error
Initialize creates tables if they do not exist
func (*SQLStateManager) ListDefinitions ¶
func (sm *SQLStateManager) ListDefinitions( limit int, offset int, sortBy string, order string, filters map[string][]string, envFilters map[string]string) (DefinitionList, error)
ListDefinitions returns a DefinitionList.
- limit: limit the result to this many definitions
- offset: start the results at this offset
- sortBy: sort by this field
- order: 'asc' or 'desc'
- filters: map of field filters on Definition, joined with AND
- envFilters: map of environment variable filters, joined with AND
func (*SQLStateManager) ListFailingNodes ¶
func (sm *SQLStateManager) ListFailingNodes() (NodeList, error)
func (*SQLStateManager) ListGroups ¶
func (sm *SQLStateManager) ListGroups(limit int, offset int, name *string) (GroupsList, error)
ListGroups returns a list of the existing group names.
func (*SQLStateManager) ListRuns ¶
func (sm *SQLStateManager) ListRuns(limit int, offset int, sortBy string, order string, filters map[string][]string, envFilters map[string]string, engines []string) (RunList, error)
ListRuns returns a RunList. Parameters: limit — limit the result to this many runs; offset — start the results at this offset; sortBy — sort by this field; order — 'asc' or 'desc'; filters — map of field filters on Run, joined with AND; envFilters — map of environment variable filters, joined with AND.
func (*SQLStateManager) ListTemplates ¶
func (sm *SQLStateManager) ListTemplates(limit int, offset int, sortBy string, order string) (TemplateList, error)
ListTemplates returns a list of templates from the database.
func (*SQLStateManager) ListTemplatesLatestOnly ¶
func (sm *SQLStateManager) ListTemplatesLatestOnly(limit int, offset int, sortBy string, order string) (TemplateList, error)
ListTemplatesLatestOnly returns a list of templates from the database, restricted (as its name indicates) to the latest version of each template.
func (*SQLStateManager) ListWorkers ¶
func (sm *SQLStateManager) ListWorkers(engine string) (WorkersList, error)
ListWorkers returns a list of workers.
func (*SQLStateManager) Name ¶
func (sm *SQLStateManager) Name() string
Name is the name of the state manager - matches value in configuration
func (*SQLStateManager) UpdateDefinition ¶
func (sm *SQLStateManager) UpdateDefinition(definitionID string, updates Definition) (Definition, error)
UpdateDefinition updates a definition - updates can be partial
func (*SQLStateManager) UpdateRun ¶
func (sm *SQLStateManager) UpdateRun(runID string, updates Run) (Run, error)
UpdateRun updates run with updates - can be partial
func (*SQLStateManager) UpdateWorker ¶
func (sm *SQLStateManager) UpdateWorker(workerType string, updates Worker) (Worker, error)
UpdateWorker updates a single worker.
type SparkExtension ¶
type SparkExtension struct { SparkSubmitJobDriver *SparkSubmitJobDriver `json:"spark_submit_job_driver,omitempty"` ApplicationConf []Conf `json:"application_conf,omitempty"` HiveConf []Conf `json:"hive_conf,omitempty"` EMRJobId *string `json:"emr_job_id,omitempty"` SparkAppId *string `json:"spark_app_id,omitempty"` EMRJobManifest *string `json:"emr_job_manifest,omitempty"` HistoryUri *string `json:"history_uri,omitempty"` MetricsUri *string `json:"metrics_uri,omitempty"` VirtualClusterId *string `json:"virtual_cluster_id,omitempty"` EMRReleaseLabel *string `json:"emr_release_label,omitempty"` ExecutorInitCommand *string `json:"executor_init_command,omitempty"` DriverInitCommand *string `json:"driver_init_command,omitempty"` AppUri *string `json:"app_uri,omitempty"` Executors []string `json:"executors,omitempty"` ExecutorOOM *bool `json:"executor_oom,omitempty"` DriverOOM *bool `json:"driver_oom,omitempty"` }
func UnmarshalSparkExtension ¶
func UnmarshalSparkExtension(data []byte) (SparkExtension, error)
func (*SparkExtension) Marshal ¶
func (r *SparkExtension) Marshal() ([]byte, error)
func (*SparkExtension) Scan ¶
func (e *SparkExtension) Scan(value interface{}) error
type SparkSubmitJobDriver ¶
type SparkSubmitJobDriver struct { EntryPoint *string `json:"entry_point,omitempty"` EntryPointArguments []*string `json:"entry_point_arguments,omitempty"` SparkSubmitConf []Conf `json:"spark_submit_conf,omitempty"` Files []string `json:"files,omitempty"` PyFiles []string `json:"py_files,omitempty"` Jars []string `json:"jars,omitempty"` Class *string `json:"class,omitempty"` WorkingDir *string `json:"working_dir,omitempty"` NumExecutors *int64 `json:"num_executors,omitempty"` ExecutorMemory *int64 `json:"executor_memory,omitempty"` }
type SpawnedRun ¶
type SpawnedRun struct {
RunID string `json:"run_id"`
}
type SpawnedRuns ¶
type SpawnedRuns []SpawnedRun
func (*SpawnedRuns) Scan ¶
func (e *SpawnedRuns) Scan(value interface{}) error
type Tags ¶
type Tags []string
Tags wraps a list of strings. The wrapper type makes it easier to read tags from and write them to the database.
type TaskResources ¶
TaskResources is an internal object for tracking CPU and memory resources.
type Template ¶
type Template struct { TemplateID string `json:"template_id"` TemplateName string `json:"template_name"` Version int64 `json:"version"` Schema TemplateJSONSchema `json:"schema"` CommandTemplate string `json:"command_template"` Defaults TemplatePayload `json:"defaults"` AvatarURI string `json:"avatar_uri"` ExecutableResources }
Template is the template object type. Its CommandTemplate field holds a Go template.
func (*Template) DefaultOrderField ¶
func (Template) GetExecutableCommand ¶
func (t Template) GetExecutableCommand(req ExecutionRequest) (string, error)
GetExecutableCommand renders the command for this Template.
func (Template) GetExecutableResourceName ¶
GetExecutableResourceName returns the Template ID.
func (Template) GetExecutableResources ¶
func (t Template) GetExecutableResources() *ExecutableResources
GetExecutableResources returns the default resources associated with this Template.
func (Template) GetExecutableType ¶
func (t Template) GetExecutableType() *ExecutableType
GetExecutableType returns the Template executable type.
func (*Template) ValidOrderField ¶
func (*Template) ValidOrderFields ¶
type TemplateExecutionRequest ¶
type TemplateExecutionRequest struct { *ExecutionRequestCommon TemplatePayload TemplatePayload `json:"template_payload"` DryRun bool `json:"dry_run,omitempty"` }
func (TemplateExecutionRequest) GetExecutionRequestCommon ¶
func (t TemplateExecutionRequest) GetExecutionRequestCommon() *ExecutionRequestCommon
GetExecutionRequestCommon returns the ExecutionRequestCommon associated with this Template execution request.
func (TemplateExecutionRequest) GetExecutionRequestCustom ¶
func (t TemplateExecutionRequest) GetExecutionRequestCustom() *ExecutionRequestCustom
GetExecutionRequestCustom returns the ExecutionRequestCustom associated with this Template execution request.
type TemplateJSONSchema ¶
type TemplateJSONSchema map[string]interface{}
TemplateJSONSchema holds a template's schema, expressed using JSON Schema types.
func (*TemplateJSONSchema) Scan ¶
func (tjs *TemplateJSONSchema) Scan(value interface{}) error
Scan reads a TemplateJSONSchema value from the database.
type TemplateList ¶
TemplateList wraps a list of Templates
func (*TemplateList) MarshalJSON ¶
func (tl *TemplateList) MarshalJSON() ([]byte, error)
MarshalJSON encodes the TemplateList as JSON.
type TemplatePayload ¶
type TemplatePayload map[string]interface{}
func (*TemplatePayload) Scan ¶
func (tjs *TemplatePayload) Scan(value interface{}) error
Scan reads a TemplatePayload value from the database.
type TerminateJob ¶
type UserIdentity ¶
type UserIdentity struct {
Arn string `json:"arn"`
}
UserIdentity holds the ARN of the user who performed the AWS API action.
type Worker ¶
type Worker struct { WorkerType string `json:"worker_type"` CountPerInstance int `json:"count_per_instance"` Engine string `json:"engine"` }
Worker represents a Flotilla Worker
func (*Worker) UpdateWith ¶
UpdateWith updates this worker with information from another worker.
type WorkersList ¶
WorkersList wraps a list of Workers