Documentation ¶
Overview ¶
Package server implements RNNR main logic to manage tasks and worker nodes.
Index ¶
- Constants
- func CheckTask(task *models.Task, node *models.Node, res chan<- *models.Task, ...)
- func GetNodeResources(node *models.Node) (*proto.Info, error)
- func RemoteCancel(task *models.Task, node *models.Node) error
- func RemoteCheck(task *models.Task, address string) error
- func RemoteRun(task *models.Task, address string) error
- type DB
- func (d *DB) AddNode(n *models.Node) error
- func (d *DB) GetNode(host string) (*models.Node, error)
- func (d *DB) GetTask(id string, view models.View) (*models.Task, error)
- func (d *DB) ListNodes(active *bool) ([]*models.Node, error)
- func (d *DB) ListTasks(limit, skip int64, view models.View, nodes []string, states []models.State) ([]*models.Task, error)
- func (d *DB) SaveTask(t *models.Task) error
- func (d *DB) UpdateNode(n *models.Node) error
- func (d *DB) UpdateTask(t *models.Task) error
- type Docker
- type Main
- func (m *Main) CancelTask(id string) error
- func (m *Main) CheckTasks() error
- func (m *Main) CreateTask(t *models.Task) error
- func (m *Main) DisableNode(host string, cancel bool) error
- func (m *Main) EnableNode(node *models.Node) error
- func (m *Main) GetTask(id string, view models.View) (*models.Task, error)
- func (m *Main) InitializeTasks() error
- func (m *Main) ListNodes(active *bool) ([]*models.Node, error)
- func (m *Main) ListTasks(namePrefix string, limit int64, start int64, view models.View, nodes []string, ...) (*models.ListTasksResponse, error)
- func (m *Main) RequestNode(resources *models.Resources) (*models.Node, error)
- func (m *Main) RunTask(task *models.Task, node *models.Node, res chan<- *models.Task, ...)
- func (m *Main) RunTasks() error
- func (m *Main) StartTaskManager(sleepTime time.Duration)
- func (m *Main) UpdateNodesWorkload(nodes []*models.Node) error
- type NetworkError
- type NoActiveNodes
- type NoEnoughResources
- type Worker
- func (w *Worker) CheckContainer(ctx context.Context, container *proto.Container) (*proto.State, error)
- func (w *Worker) GetInfo(context.Context, *empty.Empty) (*proto.Info, error)
- func (w *Worker) RunContainer(ctx context.Context, container *proto.Container) (*empty.Empty, error)
- func (w *Worker) StopContainer(ctx context.Context, container *proto.Container) (*empty.Empty, error)
Constants ¶
const ( // TaskCollection is the collection name for tasks TaskCollection = "tasks" // NodeCollection is the collection name for nodes NodeCollection = "nodes" )
Variables ¶
This section is empty.
Functions ¶
func GetNodeResources ¶ added in v1.2.1
GetNodeResources gets node resource information.
func RemoteCancel ¶ added in v1.2.1
RemoteCancel cancels remotely a task.
func RemoteCheck ¶ added in v1.2.1
RemoteCheck checks remotely a task.
Types ¶
type DB ¶ added in v1.2.1
type DB struct {
// contains filtered or unexported fields
}
DB wraps MongoDB client to provides task- and node-related operations.
func MongoConnect ¶ added in v1.2.1
MongoConnect creates a MongoDB client.
func (*DB) AddNode ¶ added in v1.2.1
AddNode activates a node. If already registered it updates node fields with same ID.
func (*DB) ListNodes ¶ added in v1.2.1
ListNodes returns worker nodes (disabled included). Set active to return active (enabled) or disable nodes.
func (*DB) ListTasks ¶ added in v1.2.1
func (d *DB) ListTasks(limit, skip int64, view models.View, nodes []string, states []models.State) ([]*models.Task, error)
ListTasks retrieves tasks that match given worker nodes and states. Pagination is done via limit and skip parameters. view defines task fields to be returned.
Minimal returns only task ID and state.
Basic returns all fields except Logs.ExecutorLogs.Stdout, Logs.ExecutorLogs.Stderr, Inputs.Content and Logs.SystemLogs.
Full returns all fields.
func (*DB) SaveTask ¶ added in v1.2.1
SaveTask stores a task. It will set Task.Created and Task.Updated to current local time.
func (*DB) UpdateNode ¶ added in v1.2.1
UpdateNode updates node information.
type Docker ¶ added in v1.2.1
type Docker struct {
// contains filtered or unexported fields
}
Docker struct wraps Docker client
func DockerConnect ¶ added in v1.2.1
DockerConnect creates a Docker client using environment variables
func (*Docker) RemoveContainer ¶ added in v1.2.1
RemoveContainer removes a container.
type Main ¶ added in v1.2.1
type Main struct { Router *mux.Router DB *DB ServiceInfo *models.ServiceInfo }
Main is a main instance.
func NewMain ¶ added in v1.2.1
NewMain creates a server and initializes Task and Node endpoints. database is URI to MongoDB (without database name, which is rnnr). sleepTimes defines the time in seconds that main will sleep after task management iteration.
func (*Main) CancelTask ¶ added in v1.2.1
CancelTask cancels a task by its ID.
func (*Main) CheckTasks ¶ added in v1.2.1
CheckTasks will iterate over running tasks checking if they have been completed well or not. It runs concurrently.
func (*Main) CreateTask ¶ added in v1.2.1
CreateTask creates a task with new ID and queue state.
func (*Main) DisableNode ¶ added in v1.2.1
DisableNode updates node availability removing usage information. Cancel argument will cancels remote tasks and puts them back to queue.
func (*Main) EnableNode ¶ added in v1.2.1
EnableNode inserts or enables a worker node. Use the current computational resources of the node if it had not been defined.
func (*Main) InitializeTasks ¶ added in v1.2.1
InitializeTasks iterates over all Queued tasks requesting a computing node for each task. The selected node is assigned to perform the task. The task changes to the Initializing state. If no active node has enough computing resources to perform the task the same is kept in queue.
func (*Main) ListNodes ¶ added in v1.2.1
ListNodes returns worker nodes (disabled included). Set active to return active (enabled) or disable nodes.
func (*Main) ListTasks ¶ added in v1.2.1
func (m *Main) ListTasks(namePrefix string, limit int64, start int64, view models.View, nodes []string, states []models.State) (*models.ListTasksResponse, error)
ListTasks returns all tasks.
func (*Main) RequestNode ¶ added in v1.2.1
RequestNode selects a node that have enough computing resource to execute task. If there is no active node it returns NoActiveNodes error. If there is some active node but none of them is able to process then it returns NoEnoughResources error. Once found a node it will update in database.
func (*Main) RunTask ¶ added in v1.2.1
func (m *Main) RunTask(task *models.Task, node *models.Node, res chan<- *models.Task, wg *sync.WaitGroup)
RunTask remotely starts a task.
func (*Main) StartTaskManager ¶ added in v1.2.1
StartTaskManager starts task management. It will iterate over: 1) queued tasks; 2) initialized tasks; and 3) running tasks. Then it will sleepTime seconds and start over.
type NetworkError ¶ added in v1.2.1
type NetworkError struct {
// contains filtered or unexported fields
}
NetworkError represents a network error.
type NoActiveNodes ¶ added in v1.2.1
type NoActiveNodes struct {
// contains filtered or unexported fields
}
NoActiveNodes error is returned when there is no active node for processing tasks remotely.
type NoEnoughResources ¶ added in v1.2.1
type NoEnoughResources struct {
// contains filtered or unexported fields
}
NoEnoughResources error is returned when none os active node have enough computing resources to process task.
type Worker ¶ added in v1.2.1
type Worker struct { proto.UnimplementedWorkerServer Info *proto.Info Docker *Docker }
Worker struct wraps service info and Docker connection.
func NewWorker ¶ added in v1.2.1
func NewWorker(cpuCores int32, ramGb float64, volumes []string, user, group string) (*Worker, error)
NewWorker creates a Worker. If cpuCores or ramGb is not defined (equal to 0) it will guess the available resources. It will warn if the defined values are bigger than guessed values.
func (*Worker) CheckContainer ¶ added in v1.2.1
func (w *Worker) CheckContainer(ctx context.Context, container *proto.Container) (*proto.State, error)
CheckContainer checks if container is running.