Documentation ¶
Index ¶
- Constants
- Variables
- func ErrTaskComplete() error
- func FlowStatusToString(key FlowStatus) string
- func GetClient(hfConfig *config.KubeConfig) (client *kube.Clientset, fnerr error)
- func InvalidFlowID() error
- func InvalidFlowIdError(flowId string) error
- func InvalidFlowParamsError(flowId string) error
- func InvalidTaskError(taskID string) error
- func InvalidTaskID() error
- func InvalidTaskStatusError() error
- func InvalidWorkerFlowCombo() error
- func NewFlowEngine(qs *queryServer, db db_pkg.DatabaseContext, logger storage.ObjectAPIServer, ...) (*flowEngine, error)
- func NewQueryServer(db db_pkg.DatabaseContext) *queryServer
- func NewWorkPoolWatcher() chan WorkerEvent
- func TaskWorkerExistsError(flowId, taskId string) error
- type ComputeOptions
- type Flow
- type FlowAttrs
- func (f *FlowAttrs) AddTask(taskConfig *TaskConfig) *TaskAttrs
- func (f *FlowAttrs) FirstTask() *TaskAttrs
- func (f *FlowAttrs) FlowStatus() string
- func (f *FlowAttrs) IsComplete() bool
- func (fi *FlowAttrs) IsCompleted() bool
- func (fi *FlowAttrs) IsCreated() bool
- func (fi *FlowAttrs) IsFailed() bool
- func (fi *FlowAttrs) IsStarted() bool
- func (fi *FlowAttrs) IsStarting() bool
- type FlowAttrsMessage
- type FlowConfig
- type FlowEngine
- type FlowMessage
- type FlowMessageType
- type FlowOutRepoRequest
- type FlowOutRepoResponse
- type FlowServer
- func (fs *FlowServer) Close()
- func (fs *FlowServer) DetachTaskWorker(workerId, flowId, taskId string) error
- func (fs *FlowServer) GetCommandLogPath(taskId string) string
- func (fs *FlowServer) GetFlowAttr(flowId string) (*FlowAttrs, error)
- func (fs *FlowServer) GetFlowLogPath(flowId string) string
- func (fs *FlowServer) GetModel(flow Flow) (repo *ws.Repo, branch *ws.Branch, commit *ws.Commit, fnErr error)
- func (fs *FlowServer) GetOrCreateModel(flow Flow) (repo *ws.Repo, branch *ws.Branch, commit *ws.Commit, fnErr error)
- func (fs *FlowServer) GetOrCreateOutput(flow Flow) (*ws.Repo, *ws.Branch, *ws.Commit, error)
- func (fs *FlowServer) GetOutput(flow Flow) (*ws.Repo, *ws.Branch, *ws.Commit, error)
- func (fs *FlowServer) GetTaskLog(flowId string) ([]byte, int, error)
- func (fs *FlowServer) GetTaskLogPath(taskId string) string
- func (fs *FlowServer) LaunchFlow(repoName, branchName, commitId, cmdStr string, evars map[string]string) (*FlowAttrs, error)
- func (fs *FlowServer) LogStream(flow_id string) (io.ReadCloser, error)
- func (fs *FlowServer) NewModel(flow Flow) (*ws.Repo, *ws.Branch, *ws.Commit, error)
- func (fs *FlowServer) NewOutput(flow Flow) (*ws.Repo, *ws.Branch, *ws.Commit, error)
- func (fs *FlowServer) RegisterWorker(flowId string, taskId string, ipaddr string) (*WorkerAttrs, error)
- func (fs *FlowServer) UpdateWorkerTaskStatus(worker Worker, tsr *TaskStatusChangeRequest) (*TaskStatusChangeResponse, error)
- type FlowStatus
- type FlowTaskWorker
- type NewFlowLaunchRequest
- type NewFlowLaunchResponse
- type PodKeeper
- func (pk *PodKeeper) AssignWorker(taskId string, flowAttrs *FlowAttrs, masterIp string, masterPort int32, ...) error
- func (pk *PodKeeper) CloseWatch()
- func (pk *PodKeeper) LogStream(flowId string) (io.ReadCloser, error)
- func (pk *PodKeeper) ReleaseWorker(flow Flow) error
- func (pk *PodKeeper) SaveMessageToWorkerLog(s string, worker Worker, flow Flow) error
- func (pk *PodKeeper) SavePodLog(podId, logDir, logName string) error
- func (pk *PodKeeper) SaveWorkerLog(worker Worker, flow Flow) error
- func (pk *PodKeeper) Watch(eventCh chan WorkerEvent)
- func (pk *PodKeeper) WorkerExists(flowId, taskId string) bool
- type TaskStatusChangeRequest
- type TaskStatusChangeResponse
- type Worker
- type WorkerAttrs
- type WorkerEvent
- type WorkerEventType
- type WorkerPool
- type WorkerStatus
Constants ¶
View Source
const ( WorkerInitError WorkerEventType = "WORKER_INIT_FAILED" WorkerAdded WorkerEventType = "ADDED" WorkerModified WorkerEventType = "MODIFIED" WorkerDeleted WorkerEventType = "DELETED" WorkerError WorkerEventType = "ERROR" WorkerFailed WorkerEventType = "FAILED" WorkerSucceeded WorkerEventType = "SUCCEEDED" DefaultChanSize int32 = 100 )
Variables ¶
View Source
var FlowStatusKey = map[int]string{
0: "FLOW_CREATED",
1: "FLOW_WAITINGTOSTART",
2: "FLOW_STARTING",
3: "FLOW_STARTED",
4: "FLOW_WAITING",
5: "FLOW_RUNNING",
6: "FLOW_COMPLETING",
7: "FLOW_CANCELLING",
8: "FLOW_STOPPING",
9: "FLOW_FAILED",
10: "FLOW_STOPPED",
11: "FLOW_COMPLETED",
12: "FLOW_CANCELLED",
}
Functions ¶
func ErrTaskComplete ¶
func ErrTaskComplete() error
func FlowStatusToString ¶
func FlowStatusToString(key FlowStatus) string
func InvalidFlowIdError ¶
func InvalidFlowParamsError ¶
func InvalidTaskError ¶
InvalidTaskError : Raised when invalid task Id is passed to API
func InvalidTaskStatusError ¶
func InvalidTaskStatusError() error
InvalidTaskStatusError : Error object
func InvalidWorkerFlowCombo ¶
func InvalidWorkerFlowCombo() error
InvalidWorkerFlowCombo : Error object
func NewFlowEngine ¶
func NewFlowEngine( qs *queryServer, db db_pkg.DatabaseContext, logger storage.ObjectAPIServer, c *config.Config) (*flowEngine, error)
func NewQueryServer ¶
func NewQueryServer(db db_pkg.DatabaseContext) *queryServer
func NewWorkPoolWatcher ¶
func NewWorkPoolWatcher() chan WorkerEvent
func TaskWorkerExistsError ¶
Types ¶
type ComputeOptions ¶
type ComputeOptions struct { // true when the master runs inside cluster InCluster bool // contains filtered or unexported fields }
Worker/Pod details to generate Kubernetes namespace
type FlowAttrs ¶
type FlowAttrs struct { Flow Flow // mounted file systems OpenMounts map[string]string // value - look at constants above Status FlowStatus Created time.Time Started time.Time Completed time.Time Failed time.Time CompletionText string EnvVars map[string]string // support multiple tasks in future releases Tasks map[string]TaskAttrs `json:"Tasks"` FlowConfig *FlowConfig `json:"FlowConfig"` // contains filtered or unexported fields }
TODO: Add version to flow at some point
func NewFlowAttrs ¶
func NewFlowAttrs(fc *FlowConfig) *FlowAttrs
func (*FlowAttrs) FlowStatus ¶
func (*FlowAttrs) IsComplete ¶
func (*FlowAttrs) IsCompleted ¶
func (*FlowAttrs) IsStarting ¶
type FlowAttrsMessage ¶
type FlowConfig ¶
type FlowConfig struct {
MountMap MountMap
}
type FlowEngine ¶
type FlowMessage ¶
type FlowMessageType ¶
type FlowMessageType int
const ( FlowRequest FlowMessageType = 10 FlowResponse FlowMessageType = 20 FlowData FlowMessageType = 30 )
type FlowOutRepoRequest ¶
type FlowOutRepoRequest struct { }
type FlowOutRepoResponse ¶
type FlowServer ¶
type FlowServer struct {
// contains filtered or unexported fields
}
func NewFlowServer ¶
func NewFlowServer(config *config.Config, db db_pkg.DatabaseContext, obj storage.ObjectAPIServer, wsapi ws.ApiServer, logger storage.ObjectAPIServer) (*FlowServer, error)
func (*FlowServer) Close ¶
func (fs *FlowServer) Close()
func (*FlowServer) DetachTaskWorker ¶
func (fs *FlowServer) DetachTaskWorker(workerId, flowId, taskId string) error
func (*FlowServer) GetCommandLogPath ¶
func (fs *FlowServer) GetCommandLogPath(taskId string) string
func (*FlowServer) GetFlowAttr ¶
func (fs *FlowServer) GetFlowAttr(flowId string) (*FlowAttrs, error)
func (*FlowServer) GetFlowLogPath ¶
func (fs *FlowServer) GetFlowLogPath(flowId string) string
func (*FlowServer) GetOrCreateModel ¶
func (*FlowServer) GetOrCreateOutput ¶
func (*FlowServer) GetTaskLog ¶
func (fs *FlowServer) GetTaskLog(flowId string) ([]byte, int, error)
func (*FlowServer) GetTaskLogPath ¶
func (fs *FlowServer) GetTaskLogPath(taskId string) string
func (*FlowServer) LaunchFlow ¶
func (*FlowServer) LogStream ¶
func (fs *FlowServer) LogStream(flow_id string) (io.ReadCloser, error)
func (*FlowServer) RegisterWorker ¶
func (fs *FlowServer) RegisterWorker(flowId string, taskId string, ipaddr string) (*WorkerAttrs, error)
func (*FlowServer) UpdateWorkerTaskStatus ¶
func (fs *FlowServer) UpdateWorkerTaskStatus(worker Worker, tsr *TaskStatusChangeRequest) (*TaskStatusChangeResponse, error)
type FlowStatus ¶
type FlowStatus int
const ( FLOW_CREATED FlowStatus = iota FLOW_WAITINGTOSTART FLOW_STARTING FLOW_STARTED FLOW_WAITING FLOW_RUNNING FLOW_COMPLETING FLOW_CANCELLING FLOW_STOPPING FLOW_FAILED FLOW_STOPPED FLOW_COMPLETED FLOW_CANCELLED )
type FlowTaskWorker ¶
type NewFlowLaunchRequest ¶
type NewFlowLaunchResponse ¶
type PodKeeper ¶
type PodKeeper struct {
// contains filtered or unexported fields
}
generate packages / functions for creating/destroying workers
func NewDefaultPodKeeper ¶
func NewDefaultPodKeeper( config *config.Config, db db_pkg.DatabaseContext, logger storage.ObjectAPIServer) (*PodKeeper, error)
func NewWorkerPool ¶
func NewWorkerPool(config *config.Config, db db_pkg.DatabaseContext, logger storage.ObjectAPIServer) (*PodKeeper, error)
TODO: add worker limits
func (*PodKeeper) AssignWorker ¶
func (pk *PodKeeper) AssignWorker(taskId string, flowAttrs *FlowAttrs, masterIp string, masterPort int32, masterExtPort int32) error
launch a new namespace config with K8s
func (*PodKeeper) CloseWatch ¶
func (pk *PodKeeper) CloseWatch()
func (*PodKeeper) LogStream ¶
func (pk *PodKeeper) LogStream(flowId string) (io.ReadCloser, error)
create a new channel and corresponding goroutine to call pod log, return the channel, and let the reader read
func (*PodKeeper) SaveMessageToWorkerLog ¶
func (*PodKeeper) SavePodLog ¶
func (*PodKeeper) SaveWorkerLog ¶
func (*PodKeeper) Watch ¶
func (pk *PodKeeper) Watch(eventCh chan WorkerEvent)
func (*PodKeeper) WorkerExists ¶
type TaskStatusChangeRequest ¶
type TaskStatusChangeResponse ¶
type TaskStatusChangeResponse struct {
FlowAttrs *FlowAttrs
}
type WorkerAttrs ¶
type WorkerAttrs struct { Worker Worker `json:"Worker"` Flow Flow `json:"Flow"` Ip string `json:"ip"` Task Task `json:"Task"` Started time.Time `json:"started"` Completed time.Time `json:"completed"` Error string `json:"error"` Status WorkerStatus `json:"WorkerStatus"` // REGISTERED, RUNNING, FAILED, STOPPED }
type WorkerEvent ¶
type WorkerEvent struct { Type WorkerEventType Worker Worker Flow Flow Task tsk_pkg.Task }
type WorkerEventType ¶
type WorkerEventType string
func WorkerEventFromStr ¶
func WorkerEventFromStr(evt string) WorkerEventType
type WorkerPool ¶
type WorkerPool interface { WorkerExists(flowId, taskId string) bool AssignWorker(taskId string, flowAttrs *FlowAttrs, masterIp string, masterPort, masterExtPort int32) error ReleaseWorker(flow Flow) error Watch(eventCh chan WorkerEvent) CloseWatch() SaveWorkerLog(worker Worker, flow Flow) error LogStream(flowId string) (io.ReadCloser, error) }
type WorkerStatus ¶
type WorkerStatus int
const ( WORKER_REGISTERED WorkerStatus = iota WORKER_RUNNING WORKER_FAILED WORKER_COMPLETED )
Click to show internal directories.
Click to hide internal directories.