flow

package
v0.0.0-...-6b04d54 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 13, 2020 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
// Worker event type values (the type of WorkerEvent.Type) and the package's
// default channel size.
const (
	// The string value of each event type differs from its Go identifier,
	// e.g. WorkerInitError is "WORKER_INIT_FAILED" and WorkerAdded is "ADDED".
	WorkerInitError WorkerEventType = "WORKER_INIT_FAILED"
	WorkerAdded     WorkerEventType = "ADDED"
	WorkerModified  WorkerEventType = "MODIFIED"
	WorkerDeleted   WorkerEventType = "DELETED"
	WorkerError     WorkerEventType = "ERROR"
	WorkerFailed    WorkerEventType = "FAILED"
	WorkerSucceeded WorkerEventType = "SUCCEEDED"

	// DefaultChanSize is 100. Presumably the default buffer size for channels
	// created by this package — TODO confirm at the use sites.
	DefaultChanSize int32 = 100
)

Variables

View Source
// FlowStatusKey maps the integers 0 through 12 to flow status names.
//
// The names and their order match the FlowStatus constants, which are declared
// with iota starting at FLOW_CREATED = 0. The two lists are maintained by hand,
// so a status added to one must be added to the other at the same position.
// The map is keyed by int, not FlowStatus, so a FlowStatus value needs an
// explicit int conversion to index it.
var FlowStatusKey = map[int]string{
	0:  "FLOW_CREATED",
	1:  "FLOW_WAITINGTOSTART",
	2:  "FLOW_STARTING",
	3:  "FLOW_STARTED",
	4:  "FLOW_WAITING",
	5:  "FLOW_RUNNING",
	6:  "FLOW_COMPLETING",
	7:  "FLOW_CANCELLING",
	8:  "FLOW_STOPPING",
	9:  "FLOW_FAILED",
	10: "FLOW_STOPPED",
	11: "FLOW_COMPLETED",
	12: "FLOW_CANCELLED",
}

Functions

func ErrTaskComplete

func ErrTaskComplete() error

func FlowStatusToString

func FlowStatusToString(key FlowStatus) string

func GetClient

func GetClient(hfConfig *config.KubeConfig) (client *kube.Clientset, fnerr error)

func InvalidFlowID

func InvalidFlowID() error

InvalidFlowID : Error object

func InvalidFlowIdError

func InvalidFlowIdError(flowId string) error

func InvalidFlowParamsError

func InvalidFlowParamsError(flowId string) error

func InvalidTaskError

func InvalidTaskError(taskID string) error

InvalidTaskError : Raised when invalid task Id is passed to API

func InvalidTaskID

func InvalidTaskID() error

InvalidTaskID : Error object

func InvalidTaskStatusError

func InvalidTaskStatusError() error

InvalidTaskStatusError : Error object

func InvalidWorkerFlowCombo

func InvalidWorkerFlowCombo() error

InvalidWorkerFlowCombo : Error object

func NewFlowEngine

func NewFlowEngine(
	qs *queryServer,
	db db_pkg.DatabaseContext,
	logger storage.ObjectAPIServer,
	c *config.Config) (*flowEngine, error)

func NewQueryServer

func NewQueryServer(db db_pkg.DatabaseContext) *queryServer

func NewWorkPoolWatcher

func NewWorkPoolWatcher() chan WorkerEvent

func TaskWorkerExistsError

func TaskWorkerExistsError(flowId, taskId string) error

Types

type ComputeOptions

// ComputeOptions has one exported field, InCluster; its remaining fields are
// unexported and not shown in this listing.
type ComputeOptions struct {

	// true when the master runs inside cluster
	InCluster bool
	// contains filtered or unexported fields
}

Worker/pod details used to generate a Kubernetes namespace.

type Flow

// Flow is a flow reference made of two strings, Id and Version.
// It is the type returned by FlowRef and embedded by value in FlowAttrs,
// FlowTaskWorker, WorkerAttrs, WorkerEvent and TaskStatusChangeRequest.
type Flow struct {
	Id      string
	Version string
}

func FlowRef

func FlowRef(id string) Flow

type FlowAttrs

// FlowAttrs is the attribute record for a flow: its Flow reference, status,
// timestamps, completion text, environment variables, tasks and configuration.
// It is the type returned by NewFlowAttrs and by the FlowEngine methods.
type FlowAttrs struct {
	// Flow is the Id/Version reference for this record.
	Flow Flow

	// mounted file systems
	// NOTE(review): the meaning of the map's keys and values is not visible
	// here — confirm against the code that populates it.
	OpenMounts map[string]string

	// Status is one of the FLOW_* constants of type FlowStatus.
	Status FlowStatus

	// Timestamps; presumably each is set when the flow reaches the
	// corresponding stage — TODO confirm.
	Created   time.Time
	Started   time.Time
	Completed time.Time
	Failed    time.Time

	CompletionText string

	EnvVars map[string]string

	// support multiple tasks in future releases
	// Keyed by string — presumably the task id; verify against AddTask.
	Tasks map[string]TaskAttrs `json:"Tasks"`

	FlowConfig *FlowConfig `json:"FlowConfig"`
	// contains filtered or unexported fields
}

TODO: Add version to flow at some point

func NewFlowAttrs

func NewFlowAttrs(fc *FlowConfig) *FlowAttrs

func (*FlowAttrs) AddTask

func (f *FlowAttrs) AddTask(taskConfig *TaskConfig) *TaskAttrs

func (*FlowAttrs) FirstTask

func (f *FlowAttrs) FirstTask() *TaskAttrs

func (*FlowAttrs) FlowStatus

func (f *FlowAttrs) FlowStatus() string

func (*FlowAttrs) IsComplete

func (f *FlowAttrs) IsComplete() bool

func (*FlowAttrs) IsCompleted

func (fi *FlowAttrs) IsCompleted() bool

func (*FlowAttrs) IsCreated

func (fi *FlowAttrs) IsCreated() bool

func (*FlowAttrs) IsFailed

func (fi *FlowAttrs) IsFailed() bool

func (*FlowAttrs) IsStarted

func (fi *FlowAttrs) IsStarted() bool

func (*FlowAttrs) IsStarting

func (fi *FlowAttrs) IsStarting() bool

type FlowAttrsMessage

// FlowAttrsMessage carries a FlowMessageType together with pointers to flow
// attributes, a slice of task attributes, and a single task's attributes.
//
// NOTE(review): the task fields use the qualified type tasks.TaskAttrs, while
// FlowAttrs.Tasks uses an unqualified TaskAttrs — confirm whether these are
// the same type.
type FlowAttrsMessage struct {
	Type       FlowMessageType
	FlowAttrs  *FlowAttrs
	TasksAttrs *[]tasks.TaskAttrs
	TaskAttrs  *tasks.TaskAttrs
}

type FlowConfig

// FlowConfig is the configuration passed to NewFlowAttrs and referenced from
// FlowAttrs.FlowConfig. Its only field is MountMap.
type FlowConfig struct {
	MountMap MountMap
}

type FlowEngine

// FlowEngine is the interface for starting flows, launching flows and
// streaming flow logs.
type FlowEngine interface {
	// StartFlow takes a flow id and a task id and returns the flow's
	// attributes or an error.
	StartFlow(flowId, taskId string) (*FlowAttrs, error)
	// LaunchFlow takes a repo name, branch name, commit id, command string
	// and a map of environment variables, and returns the flow's attributes
	// or an error.
	LaunchFlow(repoName string, branchName string, commitId string, cmdString string, evars map[string]string) (*FlowAttrs, error)
	// LogStream returns an io.ReadCloser for the given flow id.
	// Presumably the caller must Close the returned reader — TODO confirm.
	LogStream(flowId string) (io.ReadCloser, error)
}

type FlowMessage

// FlowMessage is a message tagged with a FlowMessageType. It can carry a flow
// reference, one task or a slice of tasks, string forms of the flow and task
// status, environment variables, repo messages and a command string.
// NOTE(review): which fields are populated for each Type is not visible here —
// confirm against the senders.
type FlowMessage struct {
	Type          FlowMessageType
	Flow          *Flow
	Tasks         *[]tasks.Task
	FlowStatusStr string
	Task          *tasks.Task
	TaskStatusStr string
	EnvVars       map[string]string

	Repos  []*ws.RepoMessage
	CmdStr string
}

type FlowMessageType

// FlowMessageType is the integer tag stored in the Type field of FlowMessage
// and FlowAttrsMessage.
type FlowMessageType int
// Defined FlowMessageType values. They are explicit (10, 20, 30), not iota.
const (
	FlowRequest  FlowMessageType = 10
	FlowResponse FlowMessageType = 20
	FlowData     FlowMessageType = 30
)

type FlowOutRepoRequest

// FlowOutRepoRequest is an empty struct; it declares no fields.
type FlowOutRepoRequest struct {
}

type FlowOutRepoResponse

// FlowOutRepoResponse carries pointers to a ws.Repo, a ws.Branch and a
// ws.Commit.
type FlowOutRepoResponse struct {
	Repo   *ws.Repo
	Branch *ws.Branch
	Commit *ws.Commit
}

type FlowServer

// FlowServer has no exported fields; it is used through its pointer-receiver
// methods (LaunchFlow, LogStream, RegisterWorker, UpdateWorkerTaskStatus and
// others).
type FlowServer struct {
	// contains filtered or unexported fields
}

func (*FlowServer) Close

func (fs *FlowServer) Close()

func (*FlowServer) DetachTaskWorker

func (fs *FlowServer) DetachTaskWorker(workerId, flowId, taskId string) error

func (*FlowServer) GetCommandLogPath

func (fs *FlowServer) GetCommandLogPath(taskId string) string

func (*FlowServer) GetFlowAttr

func (fs *FlowServer) GetFlowAttr(flowId string) (*FlowAttrs, error)

func (*FlowServer) GetFlowLogPath

func (fs *FlowServer) GetFlowLogPath(flowId string) string

func (*FlowServer) GetModel

func (fs *FlowServer) GetModel(flow Flow) (repo *ws.Repo, branch *ws.Branch, commit *ws.Commit, fnErr error)

func (*FlowServer) GetOrCreateModel

func (fs *FlowServer) GetOrCreateModel(flow Flow) (repo *ws.Repo, branch *ws.Branch, commit *ws.Commit, fnErr error)

func (*FlowServer) GetOrCreateOutput

func (fs *FlowServer) GetOrCreateOutput(flow Flow) (*ws.Repo, *ws.Branch, *ws.Commit, error)

func (*FlowServer) GetOutput

func (fs *FlowServer) GetOutput(flow Flow) (*ws.Repo, *ws.Branch, *ws.Commit, error)

func (*FlowServer) GetTaskLog

func (fs *FlowServer) GetTaskLog(flowId string) ([]byte, int, error)

func (*FlowServer) GetTaskLogPath

func (fs *FlowServer) GetTaskLogPath(taskId string) string

func (*FlowServer) LaunchFlow

func (fs *FlowServer) LaunchFlow(repoName, branchName, commitId, cmdStr string, evars map[string]string) (*FlowAttrs, error)

func (*FlowServer) LogStream

func (fs *FlowServer) LogStream(flow_id string) (io.ReadCloser, error)

func (*FlowServer) NewModel

func (fs *FlowServer) NewModel(flow Flow) (*ws.Repo, *ws.Branch, *ws.Commit, error)

func (*FlowServer) NewOutput

func (fs *FlowServer) NewOutput(flow Flow) (*ws.Repo, *ws.Branch, *ws.Commit, error)

func (*FlowServer) RegisterWorker

func (fs *FlowServer) RegisterWorker(flowId string, taskId string, ipaddr string) (*WorkerAttrs, error)

func (*FlowServer) UpdateWorkerTaskStatus

func (fs *FlowServer) UpdateWorkerTaskStatus(worker Worker, tsr *TaskStatusChangeRequest) (*TaskStatusChangeResponse, error)

type FlowStatus

// FlowStatus is the integer status of a flow; it is the type of
// FlowAttrs.Status and the argument of FlowStatusToString.
type FlowStatus int
// FlowStatus values, assigned by iota in declaration order:
// FLOW_CREATED is 0 and FLOW_CANCELLED is 12.
// FlowStatusKey lists the same names against the same integers by hand, so
// inserting or reordering a constant here requires the matching change there.
const (
	FLOW_CREATED FlowStatus = iota
	FLOW_WAITINGTOSTART
	FLOW_STARTING
	FLOW_STARTED
	FLOW_WAITING
	FLOW_RUNNING
	FLOW_COMPLETING
	FLOW_CANCELLING
	FLOW_STOPPING
	FLOW_FAILED
	FLOW_STOPPED
	FLOW_COMPLETED
	FLOW_CANCELLED
)

type FlowTaskWorker

// FlowTaskWorker groups a Worker, a Flow and a Task with a creation time.
// Note the JSON keys are mixed case: "Worker", "Flow" and "Task" are
// capitalised while "created" is lower case.
type FlowTaskWorker struct {
	Worker  Worker    `json:"Worker"`
	Flow    Flow      `json:"Flow"`
	Task    Task      `json:"Task"`
	Created time.Time `json:"created"`
}

type NewFlowLaunchRequest

// NewFlowLaunchRequest holds a repo, branch and commit (by value) and a
// command string. These correspond to the repo, branch, commit and command
// arguments of LaunchFlow, which takes them as plain strings.
type NewFlowLaunchRequest struct {
	Repo      ws.Repo
	Branch    ws.Branch
	Commit    ws.Commit
	CmdString string
}

type NewFlowLaunchResponse

// NewFlowLaunchResponse holds a task status, a string for that status, and
// pointers to a task and a flow.
type NewFlowLaunchResponse struct {
	TaskStatus    tasks.TaskStatus
	// TaskStatusStr is presumably the string form of TaskStatus — TODO confirm.
	TaskStatusStr string
	Task          *tasks.Task
	Flow          *Flow
}

type PodKeeper

// PodKeeper has no exported fields. It is returned by NewDefaultPodKeeper and
// NewWorkerPool.
//
// *PodKeeper declares every method listed in the WorkerPool interface
// (WorkerExists, AssignWorker, ReleaseWorker, Watch, CloseWatch,
// SaveWorkerLog, LogStream) with matching signatures.
type PodKeeper struct {
	// contains filtered or unexported fields
}

generate packages / functions for creating/destroying workers

func NewDefaultPodKeeper

func NewDefaultPodKeeper(
	config *config.Config,
	db db_pkg.DatabaseContext,
	logger storage.ObjectAPIServer) (*PodKeeper, error)

func NewWorkerPool

func NewWorkerPool(config *config.Config, db db_pkg.DatabaseContext, logger storage.ObjectAPIServer) (*PodKeeper, error)

TODO: add worker limits

func (*PodKeeper) AssignWorker

func (pk *PodKeeper) AssignWorker(taskId string, flowAttrs *FlowAttrs, masterIp string, masterPort int32, masterExtPort int32) error

launch a new namespace config with K8s

func (*PodKeeper) CloseWatch

func (pk *PodKeeper) CloseWatch()

func (*PodKeeper) LogStream

func (pk *PodKeeper) LogStream(flowId string) (io.ReadCloser, error)

Create a new channel and a corresponding goroutine to call the pod log, then return the channel and let the reader read from it.

func (*PodKeeper) ReleaseWorker

func (pk *PodKeeper) ReleaseWorker(flow Flow) error

task Id?

func (*PodKeeper) SaveMessageToWorkerLog

func (pk *PodKeeper) SaveMessageToWorkerLog(s string, worker Worker, flow Flow) error

func (*PodKeeper) SavePodLog

func (pk *PodKeeper) SavePodLog(podId, logDir, logName string) error

func (*PodKeeper) SaveWorkerLog

func (pk *PodKeeper) SaveWorkerLog(worker Worker, flow Flow) error

func (*PodKeeper) Watch

func (pk *PodKeeper) Watch(eventCh chan WorkerEvent)

func (*PodKeeper) WorkerExists

func (pk *PodKeeper) WorkerExists(flowId, taskId string) bool

type TaskStatusChangeRequest

// TaskStatusChangeRequest is the request type taken by
// FlowServer.UpdateWorkerTaskStatus. It names a flow and a task and carries a
// task status and a message.
type TaskStatusChangeRequest struct {
	Flow       Flow
	Task       tasks.Task
	// TaskStatus is presumably the status the task is changing to — TODO confirm.
	TaskStatus tasks.TaskStatus
	Message    string
}

type TaskStatusChangeResponse

// TaskStatusChangeResponse is the response type returned by
// FlowServer.UpdateWorkerTaskStatus; it carries a pointer to the flow's
// attributes.
type TaskStatusChangeResponse struct {
	FlowAttrs *FlowAttrs
}

type Worker

// Worker describes a worker by three strings: its Id, its PodId and its
// PodPhase.
type Worker struct {
	Id       string
	PodId    string
	// PodPhase is presumably the Kubernetes pod phase of the worker's pod —
	// TODO confirm.
	PodPhase string
}

type WorkerAttrs

// WorkerAttrs is the attribute record for a worker: the worker, the flow and
// task it is tied to, its IP string, start and completion times, an error
// string and a status. It is the type returned by FlowServer.RegisterWorker.
type WorkerAttrs struct {
	Worker    Worker       `json:"Worker"`
	Flow      Flow         `json:"Flow"`
	Ip        string       `json:"ip"`
	Task      Task         `json:"Task"`
	Started   time.Time    `json:"started"`
	Completed time.Time    `json:"completed"`
	Error     string       `json:"error"`
	Status    WorkerStatus `json:"WorkerStatus"` // WORKER_REGISTERED, WORKER_RUNNING, WORKER_FAILED or WORKER_COMPLETED
}

type WorkerEvent

// WorkerEvent is the element type of the channel returned by
// NewWorkPoolWatcher and accepted by Watch. It carries an event type plus the
// worker, flow and task the event refers to.
type WorkerEvent struct {
	// Type is one of the Worker* WorkerEventType constants.
	Type WorkerEventType

	Worker Worker
	Flow   Flow
	Task   tsk_pkg.Task
}

type WorkerEventType

// WorkerEventType is the string tag of a WorkerEvent. The package's defined
// values are the constants WorkerInitError, WorkerAdded, WorkerModified,
// WorkerDeleted, WorkerError, WorkerFailed and WorkerSucceeded.
type WorkerEventType string

func WorkerEventFromStr

func WorkerEventFromStr(evt string) WorkerEventType

type WorkerPool

// WorkerPool is the interface for checking, assigning and releasing workers,
// watching worker events, and saving and streaming worker logs.
type WorkerPool interface {
	// WorkerExists reports a bool for the given flow id and task id.
	WorkerExists(flowId, taskId string) bool
	// AssignWorker takes a task id, the flow's attributes, and the master's
	// IP, port and external port.
	AssignWorker(taskId string, flowAttrs *FlowAttrs, masterIp string, masterPort, masterExtPort int32) error
	// ReleaseWorker takes only a Flow; no task id is passed.
	ReleaseWorker(flow Flow) error

	// Watch takes the channel of WorkerEvent values to use; CloseWatch takes
	// no arguments. Presumably Watch sends events on eventCh until CloseWatch
	// is called — TODO confirm.
	Watch(eventCh chan WorkerEvent)
	CloseWatch()

	// SaveWorkerLog takes a worker and a flow and returns an error.
	SaveWorkerLog(worker Worker, flow Flow) error
	// LogStream returns an io.ReadCloser for the given flow id.
	LogStream(flowId string) (io.ReadCloser, error)
}

type WorkerStatus

// WorkerStatus is the integer status of a worker; it is the type of
// WorkerAttrs.Status.
type WorkerStatus int
// WorkerStatus values, assigned by iota in declaration order:
// WORKER_REGISTERED is 0 and WORKER_COMPLETED is 3.
const (
	WORKER_REGISTERED WorkerStatus = iota
	WORKER_RUNNING
	WORKER_FAILED
	WORKER_COMPLETED
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL