state

package
v0.0.0-...-a193300 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 11, 2024 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const DefinitionSelect = `` /* 934-byte string literal not displayed */

DefinitionSelect postgres specific query for definitions

View Source
const GetDefinitionByAliasSQL = DefinitionSelect + "\nwhere alias = $1"

GetDefinitionByAliasSQL get definition by alias

View Source
const GetDefinitionSQL = DefinitionSelect + "\nwhere definition_id = $1"

GetDefinitionSQL postgres specific query for getting a single definition

View Source
const GetRunSQL = RunSelect + "\nwhere run_id = $1"

GetRunSQL postgres specific query for getting a single run

View Source
const GetRunSQLByEMRJobId = RunSelect + "\nwhere spark_extension->>'emr_job_id' = $1"
View Source
const GetRunSQLForUpdate = GetRunSQL + " for update"

GetRunSQLForUpdate postgres specific query for getting a single run for update

View Source
const GetTemplateByIDSQL = TemplateSelect + "\nwhere template_id = $1"

GetTemplateByIDSQL postgres specific query for getting a single template

View Source
const GetTemplateByVersionSQL = TemplateSelect + "\nWHERE template_name = $1 AND version = $2 ORDER BY version DESC LIMIT 1;"
View Source
const GetTemplateLatestOnlySQL = TemplateSelect + "\nWHERE template_name = $1 ORDER BY version DESC LIMIT 1;"

GetTemplateLatestOnlySQL get the latest version of a specific template name.

View Source
const GetWorkerEngine = WorkerSelect + "\nwhere engine = $1"
View Source
const GetWorkerSQL = WorkerSelect + "\nwhere worker_type = $1 and engine = $2"

GetWorkerSQL postgres specific query for retrieving data for a specific worker type.

View Source
const GetWorkerSQLForUpdate = GetWorkerSQL + " for update"

GetWorkerSQLForUpdate postgres specific query for retrieving data for a specific worker type; locks the row.

View Source
const GroupsSelect = `
select distinct group_name from task_def
`

GroupsSelect postgres specific query for getting existing definition group_names

View Source
const ListDefinitionsSQL = DefinitionSelect + "\n%s %s limit $1 offset $2"

ListDefinitionsSQL postgres specific query for listing definitions

View Source
const ListFailingNodesSQL = `` /* 593-byte string literal not displayed */
View Source
const ListGroupsSQL = GroupsSelect + "\n%s order by group_name asc limit $1 offset $2"

ListGroupsSQL postgres specific query for listing definition group_names

View Source
const ListRunsSQL = RunSelect + "\n%s %s limit $1 offset $2"

ListRunsSQL postgres specific query for listing runs

View Source
const ListTagsSQL = TagsSelect + "\n%s order by text asc limit $1 offset $2"

ListTagsSQL postgres specific query for listing definition tags

View Source
const ListTemplatesLatestOnlySQL = `` /* 458-byte string literal not displayed */

ListTemplatesLatestOnlySQL lists the latest version of each distinct template name.

View Source
const ListTemplatesSQL = TemplateSelect + "\n%s limit $1 offset $2"

ListTemplatesSQL postgres specific query for listing templates

View Source
const ListWorkersSQL = WorkerSelect

ListWorkersSQL postgres specific query for listing workers

View Source
const PodReAttemptRate = `` /* 430-byte string literal not displayed */
View Source
const RunSelect = `` /* 2460-byte string literal not displayed */

RunSelect postgres specific query for runs

View Source
const TagsSelect = `
select distinct text from tags
`

TagsSelect postgres specific query for getting existing definition tags

View Source
const TaskExecutionRuntimeCommandSQL = `` /* 407-byte string literal not displayed */
View Source
const TaskIdempotenceKeyCheckSQL = `` /* 221-byte string literal not displayed */
View Source
const TaskResourcesDriverOOMSQL = `` /* 278-byte string literal not displayed */
View Source
const TaskResourcesExecutorCountSQL = `` /* 656-byte string literal not displayed */
View Source
const TaskResourcesExecutorNodeLifecycleSQL = `` /* 291-byte string literal not displayed */
View Source
const TaskResourcesExecutorOOMSQL = `` /* 331-byte string literal not displayed */
View Source
const TaskResourcesSelectCommandSQL = `` /* 578-byte string literal not displayed */
View Source
const TemplatePayloadKey = "template_payload"
View Source
const TemplateSelect = `` /* 325-byte string literal not displayed */

TemplateSelect selects a template

View Source
const WorkerSelect = `
  select
    worker_type        as workertype,
    count_per_instance as countperinstance,
    engine
  from worker
`

WorkerSelect postgres specific query for workers

Variables

View Source
var CommandTemplate, _ = template.New("command").Parse(commandWrapper)
View Source
var DefaultEngine = EKSEngine
View Source
var DefaultLifecycle = SpotLifecycle
View Source
var DefaultTaskType = "task"
View Source
var EKSBackoffLimit = int32(0)
View Source
var EKSEngine = "eks"
View Source
var EKSSparkEngine = "eks-spark"
View Source
var GPUNodeTypes = []string{"p3.2xlarge", "p3.8xlarge", "p3.16xlarge", "g5.xlarge", "g5.2xlarge", "g5.4xlarge", "g5.8xlarge", "g5.12xlarge", "g5.16xlarge", "g5.24xlarge", "g5.48xlarge"}
View Source
var MaxCPU = int64(60000)
View Source
var MaxEphemeralStorage = int64(5000)
View Source
var MaxGPUCPU = int64(94000)
View Source
var MaxGPUMem = int64(376000)
View Source
var MaxLogLines = int64(256)
View Source
var MaxMem = int64(350000)
View Source
var MinCPU = int64(256)
View Source
var MinMem = int64(512)
View Source
var OndemandActiveDeadlineSeconds = int64(604800)
View Source
var OndemandLifecycle = "ondemand"
View Source
var SpotActiveDeadlineSeconds = int64(172800)
View Source
var SpotLifecycle = "spot"
View Source
var StatusNeedsRetry = "NEEDS_RETRY"

StatusNeedsRetry indicates the run failed for infra reasons and needs to be retried

View Source
var StatusPending = "PENDING"

StatusPending indicates the run has been allocated to a host and is in the process of launching

View Source
var StatusQueued = "QUEUED"

StatusQueued indicates the run is queued

View Source
var StatusRunning = "RUNNING"

StatusRunning indicates the run is running

View Source
var StatusStopped = "STOPPED"

StatusStopped means the run is finished

View Source
var TTLSecondsAfterFinished = int32(3600)
View Source
var WorkerTypes = map[string]bool{
	"retry":  true,
	"submit": true,
	"status": true,
}

Functions

func IsValidStatus

func IsValidStatus(status string) bool

IsValidStatus checks that the given status string is one of the valid statuses

func IsValidWorkerType

func IsValidWorkerType(workerType string) bool

func NewDefinitionID

func NewDefinitionID(definition Definition) (string, error)

NewDefinitionID returns a new uuid for a Definition

func NewRunID

func NewRunID(engine *string) (string, error)

NewRunID returns a new uuid for a Run

func NewTemplateID

func NewTemplateID(t Template) (string, error)

NewTemplateID returns a new uuid for a Template

Types

type CloudTrailNotifications

type CloudTrailNotifications struct {
	Records []Record `json:"Records"`
}

CloudTrail notification object that is persisted into the DB.

func (*CloudTrailNotifications) Marshal

func (e *CloudTrailNotifications) Marshal() ([]byte, error)

Marshal method for CloudTrail SQS notifications.

func (*CloudTrailNotifications) Scan

func (e *CloudTrailNotifications) Scan(value interface{}) error

Scan from db

func (CloudTrailNotifications) Value

Value to db

type CloudTrailS3File

type CloudTrailS3File struct {
	S3Bucket    string   `json:"s3Bucket"`
	S3ObjectKey []string `json:"s3ObjectKey"`
	Done        func() error
}

SQS notification object for CloudTrail S3 files.

type Conf

type Conf struct {
	Name  *string `json:"name,omitempty"`
	Value *string `json:"value,omitempty"`
}

type CreateTemplateRequest

type CreateTemplateRequest struct {
	TemplateName    string             `json:"template_name"`
	Schema          TemplateJSONSchema `json:"schema"`
	CommandTemplate string             `json:"command_template"`
	Defaults        TemplatePayload    `json:"defaults"`
	AvatarURI       string             `json:"avatar_uri"`
	ExecutableResources
}

type CreateTemplateResponse

type CreateTemplateResponse struct {
	DidCreate bool     `json:"did_create"`
	Template  Template `json:"template,omitempty"`
}

type Definition

type Definition struct {
	DefinitionID   string `json:"definition_id"`
	GroupName      string `json:"group_name,omitempty"`
	Alias          string `json:"alias"`
	Command        string `json:"command,omitempty"`
	TaskType       string `json:"task_type,omitempty"`
	RequiresDocker bool   `json:"requires_docker,omitempty" db:"requires_docker"`
	TargetCluster  string `json:"target_cluster,omitempty" db:"target_cluster"`
	ExecutableResources
}

Definition is a task definition. It implements the `Executable` interface.

func (*Definition) DefaultOrderField

func (d *Definition) DefaultOrderField() string

func (Definition) GetExecutableCommand

func (d Definition) GetExecutableCommand(req ExecutionRequest) (string, error)

func (Definition) GetExecutableID

func (d Definition) GetExecutableID() *string

Return definition or template id

func (Definition) GetExecutableResourceName

func (d Definition) GetExecutableResourceName() string

func (Definition) GetExecutableResources

func (d Definition) GetExecutableResources() *ExecutableResources

func (Definition) GetExecutableType

func (d Definition) GetExecutableType() *ExecutableType

Returns definition or template

func (*Definition) IsValid

func (d *Definition) IsValid() (bool, []string)

IsValid returns true only if this is a valid definition with all required information

func (Definition) MarshalJSON

func (d Definition) MarshalJSON() ([]byte, error)

func (*Definition) UpdateWith

func (d *Definition) UpdateWith(other Definition)

UpdateWith updates this definition with information from another

func (*Definition) ValidOrderField

func (d *Definition) ValidOrderField(field string) bool

func (*Definition) ValidOrderFields

func (d *Definition) ValidOrderFields() []string

func (*Definition) WrappedCommand

func (d *Definition) WrappedCommand() (string, error)

WrappedCommand returns the wrapped command for the definition.

  • wrapping ensures lines are logged and the exit code is set

type DefinitionExecutionRequest

type DefinitionExecutionRequest struct {
	*ExecutionRequestCommon
}

func (*DefinitionExecutionRequest) GetExecutionRequestCommon

func (d *DefinitionExecutionRequest) GetExecutionRequestCommon() *ExecutionRequestCommon

Returns ExecutionRequestCommon, common between Template and Definition types

func (*DefinitionExecutionRequest) GetExecutionRequestCustom

func (d *DefinitionExecutionRequest) GetExecutionRequestCustom() *ExecutionRequestCustom

Only relevant to the template type

type DefinitionList

type DefinitionList struct {
	Total       int          `json:"total"`
	Definitions []Definition `json:"definitions"`
}

DefinitionList wraps a list of Definitions

func (*DefinitionList) MarshalJSON

func (dl *DefinitionList) MarshalJSON() ([]byte, error)

type Detail

type Detail struct {
	Severity         *string `json:"severity,omitempty"`
	Name             *string `json:"name,omitempty"`
	ID               *string `json:"id,omitempty"`
	Arn              *string `json:"arn,omitempty"`
	VirtualClusterID *string `json:"virtualClusterId,omitempty"`
	State            *string `json:"state,omitempty"`
	CreatedBy        *string `json:"createdBy,omitempty"`
	ReleaseLabel     *string `json:"releaseLabel,omitempty"`
	ExecutionRoleArn *string `json:"executionRoleArn,omitempty"`
	FailureReason    *string `json:"failureReason,omitempty"`
	StateDetails     *string `json:"stateDetails,omitempty"`
	Message          *string `json:"message,omitempty"`
}

type EmrEvent

type EmrEvent struct {
	Version    *string       `json:"version,omitempty"`
	ID         *string       `json:"id,omitempty"`
	DetailType *string       `json:"detail-type,omitempty"`
	Source     *string       `json:"source,omitempty"`
	Account    *string       `json:"account,omitempty"`
	Time       *string       `json:"time,omitempty"`
	Region     *string       `json:"region,omitempty"`
	Resources  []interface{} `json:"resources,omitempty"`
	Detail     *Detail       `json:"detail,omitempty"`
	Done       func() error
}

func UnmarshalEmrEvents

func UnmarshalEmrEvents(data []byte) (EmrEvent, error)

func (*EmrEvent) Marshal

func (r *EmrEvent) Marshal() ([]byte, error)

type EnvList

type EnvList []EnvVar

EnvList wraps a list of EnvVar

  • abstraction to make it easier to read and write to db

func (*EnvList) Scan

func (e *EnvList) Scan(value interface{}) error

Scan from db

func (EnvList) Value

func (e EnvList) Value() (driver.Value, error)

Value to db

type EnvVar

type EnvVar struct {
	Name  string `json:"name"`
	Value string `json:"value"`
}

EnvVar represents a single environment variable for either a definition or a run

type EventLabels

type EventLabels struct {
	ControllerUid string `json:"controller-uid,omitempty"`
	JobName       string `json:"job-name,omitempty"`
	ClusterName   string `json:"cluster-name,omitempty"`
}

type Executable

type Executable interface {
	GetExecutableID() *string
	GetExecutableType() *ExecutableType
	GetExecutableResources() *ExecutableResources
	GetExecutableCommand(req ExecutionRequest) (string, error)
	GetExecutableResourceName() string // This will typically be an ARN.
}

type ExecutableResources

type ExecutableResources struct {
	Image                      string     `json:"image"`
	Memory                     *int64     `json:"memory,omitempty"`
	Gpu                        *int64     `json:"gpu,omitempty"`
	Cpu                        *int64     `json:"cpu,omitempty"`
	EphemeralStorage           *int64     `json:"ephemeral_storage,omitempty" db:"ephemeral_storage"`
	Env                        *EnvList   `json:"env"`
	AdaptiveResourceAllocation *bool      `json:"adaptive_resource_allocation,omitempty"`
	Ports                      *PortsList `json:"ports,omitempty"`
	Tags                       *Tags      `json:"tags,omitempty"`
}

ExecutableResources define the resources and flags required to run an executable.

type ExecutableType

type ExecutableType string
const (
	ExecutableTypeDefinition ExecutableType = "task_definition"
	ExecutableTypeTemplate   ExecutableType = "template"
)

type ExecutionRequest

type ExecutionRequest interface {
	GetExecutionRequestCommon() *ExecutionRequestCommon
	GetExecutionRequestCustom() *ExecutionRequestCustom
}

type ExecutionRequestCommon

type ExecutionRequestCommon struct {
	ClusterName           string          `json:"cluster_name"`
	Env                   *EnvList        `json:"env"`
	OwnerID               string          `json:"owner_id"`
	Command               *string         `json:"command"`
	Memory                *int64          `json:"memory"`
	Cpu                   *int64          `json:"cpu"`
	Gpu                   *int64          `json:"gpu"`
	Engine                *string         `json:"engine"`
	EphemeralStorage      *int64          `json:"ephemeral_storage"`
	NodeLifecycle         *string         `json:"node_lifecycle"`
	ActiveDeadlineSeconds *int64          `json:"active_deadline_seconds,omitempty"`
	SparkExtension        *SparkExtension `json:"spark_extension,omitempty"`
	Description           *string         `json:"description,omitempty"`
	CommandHash           *string         `json:"command_hash,omitempty"`
	IdempotenceKey        *string         `json:"idempotence_key,omitempty"`
	Arch                  *string         `json:"arch,omitempty"`
	Labels                *Labels         `json:"labels,omitempty"`
	ServiceAccount        *string         `json:"service_account,omitempty"`
}

Common fields required to execute any Executable.

type ExecutionRequestCustom

type ExecutionRequestCustom map[string]interface{}

func (*ExecutionRequestCustom) Scan

func (e *ExecutionRequestCustom) Scan(value interface{}) error

Scan from db

func (ExecutionRequestCustom) Value

Value to db

type GroupsList

type GroupsList struct {
	Groups []string
	Total  int
}

GroupsList wraps a list of group names

type IOrderable

type IOrderable interface {
	ValidOrderField(field string) bool
	ValidOrderFields() []string
	DefaultOrderField() string
}

type InvolvedObject

type InvolvedObject struct {
	Kind            string      `json:"kind,omitempty"`
	Namespace       string      `json:"namespace,omitempty"`
	Name            string      `json:"name,omitempty"`
	Uid             string      `json:"uid,omitempty"`
	APIVersion      string      `json:"apiVersion,omitempty"`
	ResourceVersion string      `json:"resourceVersion,omitempty"`
	FieldPath       string      `json:"fieldPath,omitempty"`
	Labels          EventLabels `json:"labels,omitempty"`
}

type KubernetesEvent

type KubernetesEvent struct {
	Metadata           Metadata       `json:"metadata,omitempty"`
	Reason             string         `json:"reason,omitempty"`
	Message            string         `json:"message,omitempty"`
	Source             Source         `json:"source,omitempty"`
	FirstTimestamp     string         `json:"firstTimestamp,omitempty"`
	LastTimestamp      string         `json:"lastTimestamp,omitempty"`
	Count              int64          `json:"count,omitempty"`
	Type               string         `json:"type,omitempty"`
	EventTime          interface{}    `json:"eventTime,omitempty"`
	ReportingComponent string         `json:"reportingComponent,omitempty"`
	ReportingInstance  string         `json:"reportingInstance,omitempty"`
	InvolvedObject     InvolvedObject `json:"involvedObject,omitempty"`
	Done               func() error
}

func (*KubernetesEvent) Marshal

func (r *KubernetesEvent) Marshal() ([]byte, error)

type Labels

type Labels map[string]string

func (*Labels) Scan

func (e *Labels) Scan(value interface{}) error

func (Labels) Value

func (e Labels) Value() (driver.Value, error)

Value to db

type LaunchRequest

type LaunchRequest struct {
	ClusterName *string  `json:"cluster,omitempty"`
	Env         *EnvList `json:"env,omitempty"`
}

type LaunchRequestV2

type LaunchRequestV2 struct {
	RunTags               RunTags         `json:"run_tags"`
	Command               *string         `json:"command,omitempty"`
	Memory                *int64          `json:"memory,omitempty"`
	Cpu                   *int64          `json:"cpu,omitempty"`
	Gpu                   *int64          `json:"gpu,omitempty"`
	EphemeralStorage      *int64          `json:"ephemeral_storage,omitempty"`
	Engine                *string         `json:"engine,omitempty"`
	NodeLifecycle         *string         `json:"node_lifecycle,omitempty"`
	ActiveDeadlineSeconds *int64          `json:"active_deadline_seconds,omitempty"`
	SparkExtension        *SparkExtension `json:"spark_extension,omitempty"`
	ClusterName           *string         `json:"cluster,omitempty"`
	Env                   *EnvList        `json:"env,omitempty"`
	Description           *string         `json:"description,omitempty"`
	CommandHash           *string         `json:"command_hash,omitempty"`
	IdempotenceKey        *string         `json:"idempotence_key,omitempty"`
	Arch                  *string         `json:"arch,omitempty"`
	Labels                *Labels         `json:"labels,omitempty"`
	ServiceAccount        *string         `json:"service_account,omitempty"`
}

type Manager

type Manager interface {
	Name() string
	Initialize(conf config.Config) error
	Cleanup() error
	ListDefinitions(
		limit int, offset int, sortBy string,
		order string, filters map[string][]string,
		envFilters map[string]string) (DefinitionList, error)
	GetDefinition(definitionID string) (Definition, error)
	GetDefinitionByAlias(alias string) (Definition, error)
	UpdateDefinition(definitionID string, updates Definition) (Definition, error)
	CreateDefinition(d Definition) error
	DeleteDefinition(definitionID string) error

	ListRuns(limit int, offset int, sortBy string, order string, filters map[string][]string, envFilters map[string]string, engines []string) (RunList, error)
	EstimateRunResources(executableID string, commandHash string) (TaskResources, error)
	EstimateExecutorCount(executableID string, commandHash string) (int64, error)
	ExecutorOOM(executableID string, commandHash string) (bool, error)
	DriverOOM(executableID string, commandHash string) (bool, error)

	GetRun(runID string) (Run, error)
	CreateRun(r Run) error
	UpdateRun(runID string, updates Run) (Run, error)

	ListGroups(limit int, offset int, name *string) (GroupsList, error)
	ListTags(limit int, offset int, name *string) (TagsList, error)

	ListWorkers(engine string) (WorkersList, error)
	BatchUpdateWorkers(updates []Worker) (WorkersList, error)
	GetWorker(workerType string, engine string) (Worker, error)
	UpdateWorker(workerType string, updates Worker) (Worker, error)

	GetExecutableByTypeAndID(executableType ExecutableType, executableID string) (Executable, error)

	GetTemplateByID(templateID string) (Template, error)
	GetLatestTemplateByTemplateName(templateName string) (bool, Template, error)
	GetTemplateByVersion(templateName string, templateVersion int64) (bool, Template, error)
	ListTemplates(limit int, offset int, sortBy string, order string) (TemplateList, error)
	ListTemplatesLatestOnly(limit int, offset int, sortBy string, order string) (TemplateList, error)
	CreateTemplate(t Template) error

	ListFailingNodes() (NodeList, error)
	GetPodReAttemptRate() (float32, error)
	GetNodeLifecycle(executableID string, commandHash string) (string, error)
	GetTaskHistoricalRuntime(executableID string, runId string) (float32, error)
	CheckIdempotenceKey(idempotenceKey string) (string, error)

	GetRunByEMRJobId(string) (Run, error)
}

Manager interface for CRUD operations on definitions and runs

func NewStateManager

func NewStateManager(conf config.Config, logger log.Logger) (Manager, error)

NewStateManager sets up and configures a new statemanager - if no `state_manager` is configured, will use postgres

type Metadata

type Metadata struct {
	Name              string `json:"name,omitempty"`
	Namespace         string `json:"namespace,omitempty"`
	SelfLink          string `json:"selfLink,omitempty"`
	Uid               string `json:"uid,omitempty"`
	ResourceVersion   string `json:"resourceVersion,omitempty"`
	CreationTimestamp string `json:"creationTimestamp,omitempty"`
}

type NodeList

type NodeList []string

type PodEvent

type PodEvent struct {
	Timestamp    *time.Time `json:"timestamp,omitempty"`
	EventType    string     `json:"event_type"`
	Reason       string     `json:"reason"`
	SourceObject string     `json:"source_object"`
	Message      string     `json:"message"`
}

func (*PodEvent) Equal

func (w *PodEvent) Equal(other PodEvent) bool

type PodEventList

type PodEventList struct {
	Total     int       `json:"total"`
	PodEvents PodEvents `json:"pod_events"`
}

type PodEvents

type PodEvents []PodEvent

func (*PodEvents) Scan

func (e *PodEvents) Scan(value interface{}) error

Scan from db

func (PodEvents) Value

func (e PodEvents) Value() (driver.Value, error)

Value to db

type PortsList

type PortsList []int

PortsList wraps a list of int

  • abstraction to make it easier to read and write to db

func (*PortsList) Scan

func (e *PortsList) Scan(value interface{}) error

Scan from db

func (PortsList) Value

func (e PortsList) Value() (driver.Value, error)

Value to db

type Record

type Record struct {
	UserIdentity UserIdentity `json:"userIdentity"`
	EventSource  string       `json:"eventSource"`
	EventName    string       `json:"eventName"`
}

CloudTrail notification record.

func (*Record) Equal

func (w *Record) Equal(other Record) bool

Equal is an equality helper method for Record.

func (*Record) String

func (w *Record) String() string

String helper method for Record.

type Run

type Run struct {
	RunID                   string                   `json:"run_id"`
	DefinitionID            string                   `json:"definition_id"`
	Alias                   string                   `json:"alias"`
	Image                   string                   `json:"image"`
	ClusterName             string                   `json:"cluster"`
	ExitCode                *int64                   `json:"exit_code,omitempty"`
	Status                  string                   `json:"status"`
	QueuedAt                *time.Time               `json:"queued_at,omitempty"`
	StartedAt               *time.Time               `json:"started_at,omitempty"`
	FinishedAt              *time.Time               `json:"finished_at,omitempty"`
	InstanceID              string                   `json:"-"`
	InstanceDNSName         string                   `json:"-"`
	GroupName               string                   `json:"group_name"`
	User                    string                   `json:"user,omitempty"`
	TaskType                string                   `json:"task_type,omitempty"`
	Env                     *EnvList                 `json:"env,omitempty"`
	Command                 *string                  `json:"command,omitempty"`
	CommandHash             *string                  `json:"command_hash,omitempty"`
	Memory                  *int64                   `json:"memory,omitempty"`
	MemoryLimit             *int64                   `json:"memory_limit,omitempty"`
	Cpu                     *int64                   `json:"cpu,omitempty"`
	CpuLimit                *int64                   `json:"cpu_limit,omitempty"`
	Gpu                     *int64                   `json:"gpu,omitempty"`
	ExitReason              *string                  `json:"exit_reason,omitempty"`
	Engine                  *string                  `json:"engine,omitempty"`
	NodeLifecycle           *string                  `json:"node_lifecycle,omitempty"`
	EphemeralStorage        *int64                   `json:"ephemeral_storage,omitempty" db:"ephemeral_storage"`
	PodName                 *string                  `json:"pod_name,omitempty"`
	Namespace               *string                  `json:"namespace,omitempty"`
	MaxMemoryUsed           *int64                   `json:"max_memory_used,omitempty"`
	MaxCpuUsed              *int64                   `json:"max_cpu_used,omitempty"`
	PodEvents               *PodEvents               `json:"pod_events,omitempty"`
	CloudTrailNotifications *CloudTrailNotifications `json:"cloudtrail_notifications,omitempty"`
	ExecutableID            *string                  `json:"executable_id,omitempty"`
	ExecutableType          *ExecutableType          `json:"executable_type,omitempty"`
	ExecutionRequestCustom  *ExecutionRequestCustom  `json:"execution_request_custom,omitempty"`
	AttemptCount            *int64                   `json:"attempt_count,omitempty"`
	SpawnedRuns             *SpawnedRuns             `json:"spawned_runs,omitempty"`
	RunExceptions           *RunExceptions           `json:"run_exceptions,omitempty"`
	ActiveDeadlineSeconds   *int64                   `json:"active_deadline_seconds,omitempty"`
	SparkExtension          *SparkExtension          `json:"spark_extension,omitempty"`
	MetricsUri              *string                  `json:"metrics_uri,omitempty"`
	Description             *string                  `json:"description,omitempty"`
	IdempotenceKey          *string                  `json:"idempotence_key,omitempty"`
	Arch                    *string                  `json:"arch,omitempty"`
	Labels                  Labels                   `json:"labels,omitempty"`
	RequiresDocker          bool                     `json:"requires_docker,omitempty" db:"requires_docker"`
	ServiceAccount          *string                  `json:"service_account,omitempty" db:"service_account"`
}

Run represents a single run of a Definition

TODO:

Runs need to -copy- the run relevant information
from their associated definition when they are
created so they always have correct info. Currently
the definition can change during or after the run
is created and launched meaning the run is acting
on information that is no longer accessible.

func (*Run) DefaultOrderField

func (r *Run) DefaultOrderField() string

func (Run) MarshalJSON

func (r Run) MarshalJSON() ([]byte, error)

func (*Run) UpdateWith

func (d *Run) UpdateWith(other Run)

UpdateWith updates this run with information from another

func (*Run) ValidOrderField

func (r *Run) ValidOrderField(field string) bool

func (*Run) ValidOrderFields

func (r *Run) ValidOrderFields() []string

type RunExceptions

type RunExceptions []string

func (*RunExceptions) Scan

func (e *RunExceptions) Scan(value interface{}) error

func (RunExceptions) Value

func (e RunExceptions) Value() (driver.Value, error)

Value to db

type RunList

type RunList struct {
	Total int   `json:"total"`
	Runs  []Run `json:"history"`
}

RunList wraps a list of Runs

type RunTags

type RunTags struct {
	OwnerEmail string `json:"owner_email"`
	TeamName   string `json:"team_name"`
	OwnerID    string `json:"owner_id"`
}

RunTags represents which user is responsible for a task run

type SQLStateManager

type SQLStateManager struct {
	// contains filtered or unexported fields
}

SQLStateManager uses postgresql to manage state

func (*SQLStateManager) BatchUpdateWorkers

func (sm *SQLStateManager) BatchUpdateWorkers(updates []Worker) (WorkersList, error)

BatchUpdateWorkers updates multiple workers.

func (*SQLStateManager) CheckIdempotenceKey

func (sm *SQLStateManager) CheckIdempotenceKey(idempotenceKey string) (string, error)

func (*SQLStateManager) Cleanup

func (sm *SQLStateManager) Cleanup() error

Cleanup closes any open resources

func (*SQLStateManager) CreateDefinition

func (sm *SQLStateManager) CreateDefinition(d Definition) error

CreateDefinition creates the passed in definition object - error if definition already exists

func (*SQLStateManager) CreateRun

func (sm *SQLStateManager) CreateRun(r Run) error

CreateRun creates the passed in run

func (*SQLStateManager) CreateTemplate

func (sm *SQLStateManager) CreateTemplate(t Template) error

CreateTemplate creates a new template.

func (*SQLStateManager) DeleteDefinition

func (sm *SQLStateManager) DeleteDefinition(definitionID string) error

DeleteDefinition deletes definition and associated runs and environment variables

func (*SQLStateManager) DriverOOM

func (sm *SQLStateManager) DriverOOM(executableID string, commandHash string) (bool, error)

func (*SQLStateManager) EstimateExecutorCount

func (sm *SQLStateManager) EstimateExecutorCount(executableID string, commandHash string) (int64, error)

func (*SQLStateManager) EstimateRunResources

func (sm *SQLStateManager) EstimateRunResources(executableID string, runID string) (TaskResources, error)

func (*SQLStateManager) ExecutorOOM

func (sm *SQLStateManager) ExecutorOOM(executableID string, commandHash string) (bool, error)

func (*SQLStateManager) GetDefinition

func (sm *SQLStateManager) GetDefinition(definitionID string) (Definition, error)

GetDefinition returns a single definition by id

func (*SQLStateManager) GetDefinitionByAlias

func (sm *SQLStateManager) GetDefinitionByAlias(alias string) (Definition, error)

GetDefinitionByAlias returns a single definition by alias

func (*SQLStateManager) GetExecutableByTypeAndID

func (sm *SQLStateManager) GetExecutableByTypeAndID(t ExecutableType, id string) (Executable, error)

GetExecutableByTypeAndID returns a single executable by type and id.

func (*SQLStateManager) GetLatestTemplateByTemplateName

func (sm *SQLStateManager) GetLatestTemplateByTemplateName(templateName string) (bool, Template, error)

GetLatestTemplateByTemplateName returns the latest version of a template of a specific template name.

func (*SQLStateManager) GetNodeLifecycle

func (sm *SQLStateManager) GetNodeLifecycle(executableID string, commandHash string) (string, error)

func (*SQLStateManager) GetPodReAttemptRate

func (sm *SQLStateManager) GetPodReAttemptRate() (float32, error)

func (*SQLStateManager) GetResources

func (sm *SQLStateManager) GetResources(runID string) (Run, error)

func (*SQLStateManager) GetRun

func (sm *SQLStateManager) GetRun(runID string) (Run, error)

GetRun gets run by id

func (*SQLStateManager) GetRunByEMRJobId

func (sm *SQLStateManager) GetRunByEMRJobId(emrJobId string) (Run, error)

func (*SQLStateManager) GetTaskHistoricalRuntime

func (sm *SQLStateManager) GetTaskHistoricalRuntime(executableID string, runID string) (float32, error)

func (*SQLStateManager) GetTemplateByID

func (sm *SQLStateManager) GetTemplateByID(templateID string) (Template, error)

GetTemplateByID returns a single template by id.

func (*SQLStateManager) GetTemplateByVersion

func (sm *SQLStateManager) GetTemplateByVersion(templateName string, templateVersion int64) (bool, Template, error)

func (*SQLStateManager) GetWorker

func (sm *SQLStateManager) GetWorker(workerType string, engine string) (w Worker, err error)

GetWorker returns data for a single worker.

func (*SQLStateManager) Initialize

func (sm *SQLStateManager) Initialize(conf config.Config) error

Initialize creates tables if they do not exist

func (*SQLStateManager) ListDefinitions

func (sm *SQLStateManager) ListDefinitions(
	limit int, offset int, sortBy string,
	order string, filters map[string][]string,
	envFilters map[string]string) (DefinitionList, error)

ListDefinitions returns a DefinitionList.

  • limit: limit the result to this many definitions
  • offset: start the results at this offset
  • sortBy: sort by this field
  • order: 'asc' or 'desc'
  • filters: map of field filters on Definition - joined with AND
  • envFilters: map of environment variable filters - joined with AND

func (*SQLStateManager) ListFailingNodes

func (sm *SQLStateManager) ListFailingNodes() (NodeList, error)

func (*SQLStateManager) ListGroups

func (sm *SQLStateManager) ListGroups(limit int, offset int, name *string) (GroupsList, error)

ListGroups returns a list of the existing group names.

func (*SQLStateManager) ListRuns

func (sm *SQLStateManager) ListRuns(limit int, offset int, sortBy string, order string, filters map[string][]string, envFilters map[string]string, engines []string) (RunList, error)

ListRuns returns a RunList.

  • limit: limit the result to this many runs
  • offset: start the results at this offset
  • sortBy: sort by this field
  • order: 'asc' or 'desc'
  • filters: map of field filters on Run - joined with AND
  • envFilters: map of environment variable filters - joined with AND

func (*SQLStateManager) ListTags

func (sm *SQLStateManager) ListTags(limit int, offset int, name *string) (TagsList, error)

ListTags returns a list of the existing tags.

func (*SQLStateManager) ListTemplates

func (sm *SQLStateManager) ListTemplates(limit int, offset int, sortBy string, order string) (TemplateList, error)

ListTemplates returns list of templates from the database.

func (*SQLStateManager) ListTemplatesLatestOnly

func (sm *SQLStateManager) ListTemplatesLatestOnly(limit int, offset int, sortBy string, order string) (TemplateList, error)

ListTemplatesLatestOnly returns a list of templates from the database, including only the latest version of each template.

func (*SQLStateManager) ListWorkers

func (sm *SQLStateManager) ListWorkers(engine string) (WorkersList, error)

ListWorkers returns list of workers

func (*SQLStateManager) Name

func (sm *SQLStateManager) Name() string

Name is the name of the state manager - matches value in configuration

func (*SQLStateManager) UpdateDefinition

func (sm *SQLStateManager) UpdateDefinition(definitionID string, updates Definition) (Definition, error)

UpdateDefinition updates a definition - updates can be partial

func (*SQLStateManager) UpdateRun

func (sm *SQLStateManager) UpdateRun(runID string, updates Run) (Run, error)

UpdateRun updates run with updates - can be partial

func (*SQLStateManager) UpdateWorker

func (sm *SQLStateManager) UpdateWorker(workerType string, updates Worker) (Worker, error)

UpdateWorker updates a single worker.

type Source

type Source struct {
	Component string `json:"component,omitempty"`
	Host      string `json:"host,omitempty"`
}

type SparkExtension

type SparkExtension struct {
	SparkSubmitJobDriver *SparkSubmitJobDriver `json:"spark_submit_job_driver,omitempty"`
	ApplicationConf      []Conf                `json:"application_conf,omitempty"`
	HiveConf             []Conf                `json:"hive_conf,omitempty"`
	EMRJobId             *string               `json:"emr_job_id,omitempty"`
	SparkAppId           *string               `json:"spark_app_id,omitempty"`
	EMRJobManifest       *string               `json:"emr_job_manifest,omitempty"`
	HistoryUri           *string               `json:"history_uri,omitempty"`
	MetricsUri           *string               `json:"metrics_uri,omitempty"`
	VirtualClusterId     *string               `json:"virtual_cluster_id,omitempty"`
	EMRReleaseLabel      *string               `json:"emr_release_label,omitempty"`
	ExecutorInitCommand  *string               `json:"executor_init_command,omitempty"`
	DriverInitCommand    *string               `json:"driver_init_command,omitempty"`
	AppUri               *string               `json:"app_uri,omitempty"`
	Executors            []string              `json:"executors,omitempty"`
	ExecutorOOM          *bool                 `json:"executor_oom,omitempty"`
	DriverOOM            *bool                 `json:"driver_oom,omitempty"`
}

func UnmarshalSparkExtension

func UnmarshalSparkExtension(data []byte) (SparkExtension, error)

func (*SparkExtension) Marshal

func (r *SparkExtension) Marshal() ([]byte, error)

func (*SparkExtension) Scan

func (e *SparkExtension) Scan(value interface{}) error

func (SparkExtension) Value

func (e SparkExtension) Value() (driver.Value, error)

Value to db

type SparkSubmitJobDriver

type SparkSubmitJobDriver struct {
	EntryPoint          *string   `json:"entry_point,omitempty"`
	EntryPointArguments []*string `json:"entry_point_arguments,omitempty"`
	SparkSubmitConf     []Conf    `json:"spark_submit_conf,omitempty"`
	Files               []string  `json:"files,omitempty"`
	PyFiles             []string  `json:"py_files,omitempty"`
	Jars                []string  `json:"jars,omitempty"`
	Class               *string   `json:"class,omitempty"`
	WorkingDir          *string   `json:"working_dir,omitempty"`
	NumExecutors        *int64    `json:"num_executors,omitempty"`
	ExecutorMemory      *int64    `json:"executor_memory,omitempty"`
}

type SpawnedRun

type SpawnedRun struct {
	RunID string `json:"run_id"`
}

type SpawnedRuns

type SpawnedRuns []SpawnedRun

func (*SpawnedRuns) Scan

func (e *SpawnedRuns) Scan(value interface{}) error

func (SpawnedRuns) Value

func (e SpawnedRuns) Value() (driver.Value, error)

Value to db

type Tags

type Tags []string

Tags wraps a list of strings

  • abstraction to make it easier to read and write to db

func (*Tags) Scan

func (e *Tags) Scan(value interface{}) error

Scan from db

func (Tags) Value

func (e Tags) Value() (driver.Value, error)

Value to db

type TagsList

type TagsList struct {
	Tags  []string
	Total int
}

TagsList wraps a list of tag names

type TaskResources

type TaskResources struct {
	Cpu    int64 `json:"cpu"`
	Memory int64 `json:"memory"`
}

Internal object for tracking cpu / memory resources.

type Template

type Template struct {
	TemplateID      string             `json:"template_id"`
	TemplateName    string             `json:"template_name"`
	Version         int64              `json:"version"`
	Schema          TemplateJSONSchema `json:"schema"`
	CommandTemplate string             `json:"command_template"`
	Defaults        TemplatePayload    `json:"defaults"`
	AvatarURI       string             `json:"avatar_uri"`
	ExecutableResources
}

Template Object Type. The CommandTemplate is a Go Template type.

func (*Template) DefaultOrderField

func (t *Template) DefaultOrderField() string

func (Template) GetExecutableCommand

func (t Template) GetExecutableCommand(req ExecutionRequest) (string, error)

Renders the executable command for that Template.

func (Template) GetExecutableID

func (t Template) GetExecutableID() *string

Returns Template ID

func (Template) GetExecutableResourceName

func (t Template) GetExecutableResourceName() string

Returns the Template Id.

func (Template) GetExecutableResources

func (t Template) GetExecutableResources() *ExecutableResources

Returns default resources associated with that Template.

func (Template) GetExecutableType

func (t Template) GetExecutableType() *ExecutableType

Returns Template Type

func (*Template) IsValid

func (t *Template) IsValid() (bool, []string)

Checks validity of a template.

func (*Template) ValidOrderField

func (t *Template) ValidOrderField(field string) bool

func (*Template) ValidOrderFields

func (t *Template) ValidOrderFields() []string

type TemplateExecutionRequest

type TemplateExecutionRequest struct {
	*ExecutionRequestCommon
	TemplatePayload TemplatePayload `json:"template_payload"`
	DryRun          bool            `json:"dry_run,omitempty"`
}

func (TemplateExecutionRequest) GetExecutionRequestCommon

func (t TemplateExecutionRequest) GetExecutionRequestCommon() *ExecutionRequestCommon

Returns ExecutionRequestCommon associated with a Template type.

func (TemplateExecutionRequest) GetExecutionRequestCustom

func (t TemplateExecutionRequest) GetExecutionRequestCustom() *ExecutionRequestCustom

Returns ExecutionRequestCustom associated with a Template type.

type TemplateJSONSchema

type TemplateJSONSchema map[string]interface{}

Templates use JSON Schema types.

func (*TemplateJSONSchema) Scan

func (tjs *TemplateJSONSchema) Scan(value interface{}) error

Scan from db

func (TemplateJSONSchema) Value

func (tjs TemplateJSONSchema) Value() (driver.Value, error)

Value to db

type TemplateList

type TemplateList struct {
	Total     int        `json:"total"`
	Templates []Template `json:"templates"`
}

TemplateList wraps a list of Templates

func (*TemplateList) MarshalJSON

func (tl *TemplateList) MarshalJSON() ([]byte, error)

MarshalJSON marshals a TemplateList to JSON.

type TemplatePayload

type TemplatePayload map[string]interface{}

func (*TemplatePayload) Scan

func (tjs *TemplatePayload) Scan(value interface{}) error

Scan from db

func (TemplatePayload) Value

func (tjs TemplatePayload) Value() (driver.Value, error)

Value to db

type TerminateJob

type TerminateJob struct {
	RunID    string
	UserInfo UserInfo
}

type UserIdentity

type UserIdentity struct {
	Arn string `json:"arn"`
}

ARN of the user who performed the AWS API action.

type UserInfo

type UserInfo struct {
	Name  string `json:"name"`
	Email string `json:"email"`
}

Information about the user making the API calls.

type Worker

type Worker struct {
	WorkerType       string `json:"worker_type"`
	CountPerInstance int    `json:"count_per_instance"`
	Engine           string `json:"engine"`
}

Worker represents a Flotilla Worker

func (*Worker) UpdateWith

func (w *Worker) UpdateWith(other Worker)

UpdateWith updates this worker with information from another

type WorkersList

type WorkersList struct {
	Total   int      `json:"total"`
	Workers []Worker `json:"workers"`
}

WorkersList wraps a list of Workers

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL