Documentation ¶
Index ¶
- Constants
- Variables
- func DiffTasksState(old DesiredState, new DesiredState) ([]string, []string, []string)
- func DiffWorkersState(old WorkerView, new WorkerView) ([]string, []string, []string)
- type ClusterConfig
- func (c *ClusterConfig) GetAssignPath() string
- func (c *ClusterConfig) GetBeesPath() string
- func (c *ClusterConfig) GetClusterPath() string
- func (c *ClusterConfig) GetLeaderPath() string
- func (c *ClusterConfig) GetMasterPath() string
- func (c *ClusterConfig) GetMyTaskPath(id string) string
- func (c *ClusterConfig) GetMyTaskStatusPath(id string) string
- func (c *ClusterConfig) GetMyWorkerPath(id string) string
- func (c *ClusterConfig) GetNotifierPath() string
- func (c *ClusterConfig) GetTaskAssignPath(id string) string
- func (c *ClusterConfig) GetTasksPath() string
- func (c *ClusterConfig) GetWorkerAssignPath(id string) string
- func (c *ClusterConfig) GetWorkersPath() string
- func (c *ClusterConfig) Validate() error
- type Crawler
- type DesiredState
- type Forwarder
- type Implementer
- type Master
- func (m *Master) AddTask(req *pb.TaskSpec) (*pb.WorkerSpec, error)
- func (m *Master) DeleteTask(taskID string) error
- func (m *Master) GetTask(taskID string, loadStatus bool) (*pb.TaskSpec, error)
- func (m *Master) GetWorker(id string) (*pb.WorkerSpec, error)
- func (m *Master) ListTasks(workerID string, loadStatus bool) ([]*pb.TaskSpec, error)
- func (m *Master) ListWorkers() ([]*pb.WorkerSpec, error)
- func (m *Master) Run()
- func (m *Master) Stop()
- type NotLeaderError
- type QuotaBasedScheduler
- type Scheduler
- type Score
- type TaskView
- type Worker
- type WorkerView
- type WorkloadBasedScheduler
Constants ¶
const ( BeesBaseNode = "/bees" PollTimeout = 60 * time.Second BatchBackoffTimeoutMS = 1000 ForwardTimeout = 10 * time.Second )
Variables ¶
var ( ErrInvalidClusterID = errors.New("invalid cluster id") ErrInvalidZKEndpoints = errors.New("invalid zookeeper endpoints") ErrInvalidMasterID = errors.New("invalid master id") ErrInvalidWorkerID = errors.New("invalid worker id") ErrInvalidTaskID = errors.New("invalid task id") ErrNoWorker = errors.New("no such worker") ErrNoAvailableWorker = errors.New("no available worker") ErrWorkerResourceExhausted = errors.New("worker resource exhausted") ErrNoTask = errors.New("no such task") ErrTaskExists = errors.New("task exists") ErrBadAddress = errors.New("bad address") )
Functions ¶
func DiffTasksState ¶
func DiffTasksState(old DesiredState, new DesiredState) ([]string, []string, []string)
DiffTasksState picks out what was added/deleted/changed between the old DesiredState and the new DesiredState.
func DiffWorkersState ¶
func DiffWorkersState(old WorkerView, new WorkerView) ([]string, []string, []string)
DiffWorkersState picks out what was added/deleted/changed between the old WorkerView and the new WorkerView.
Types ¶
type ClusterConfig ¶
type ClusterConfig struct { ID string `json:"cluster_id"` ZKEndpoints []string `json:"zk_endpoints"` }
ClusterConfig denotes the cluster config.
func LoadClusterConfig ¶
func LoadClusterConfig(filename string) (*ClusterConfig, error)
LoadClusterConfig loads cluster config from target file.
func (*ClusterConfig) GetAssignPath ¶
func (c *ClusterConfig) GetAssignPath() string
GetAssignPath gets "/bees/{cluster_id}/assign".
func (*ClusterConfig) GetBeesPath ¶
func (c *ClusterConfig) GetBeesPath() string
GetBeesPath gets "/bees".
func (*ClusterConfig) GetClusterPath ¶
func (c *ClusterConfig) GetClusterPath() string
GetClusterPath gets "/bees/{cluster_id}".
func (*ClusterConfig) GetLeaderPath ¶
func (c *ClusterConfig) GetLeaderPath() string
GetLeaderPath gets "/bees/{cluster_id}/leader".
func (*ClusterConfig) GetMasterPath ¶
func (c *ClusterConfig) GetMasterPath() string
GetMasterPath gets "/bees/{cluster_id}/master".
func (*ClusterConfig) GetMyTaskPath ¶
func (c *ClusterConfig) GetMyTaskPath(id string) string
GetMyTaskPath gets "/bees/{cluster_id}/tasks/{task_id}".
func (*ClusterConfig) GetMyTaskStatusPath ¶
func (c *ClusterConfig) GetMyTaskStatusPath(id string) string
GetMyTaskStatusPath gets "/bees/{cluster_id}/tasks/{task_id}/status".
func (*ClusterConfig) GetMyWorkerPath ¶
func (c *ClusterConfig) GetMyWorkerPath(id string) string
GetMyWorkerPath gets "/bees/{cluster_id}/workers/{worker_id}".
func (*ClusterConfig) GetNotifierPath ¶
func (c *ClusterConfig) GetNotifierPath() string
GetNotifierPath gets "/bees/{cluster_id}/notifier".
func (*ClusterConfig) GetTaskAssignPath ¶
func (c *ClusterConfig) GetTaskAssignPath(id string) string
GetTaskAssignPath gets "/bees/{cluster_id}/assign/{worker_id}/{task_id}".
func (*ClusterConfig) GetTasksPath ¶
func (c *ClusterConfig) GetTasksPath() string
GetTasksPath gets "/bees/{cluster_id}/tasks".
func (*ClusterConfig) GetWorkerAssignPath ¶
func (c *ClusterConfig) GetWorkerAssignPath(id string) string
GetWorkerAssignPath gets "/bees/{cluster_id}/assign/{worker_id}".
func (*ClusterConfig) GetWorkersPath ¶
func (c *ClusterConfig) GetWorkersPath() string
GetWorkersPath gets "/bees/{cluster_id}/workers".
func (*ClusterConfig) Validate ¶
func (c *ClusterConfig) Validate() error
Validate validates the cluster config.
type DesiredState ¶
type Forwarder ¶
type Forwarder struct {
// contains filtered or unexported fields
}
Forwarder forwards the request to the next opened grpc connection.
func NewForwarder ¶
func NewForwarder(opts ...grpc.DialOption) *Forwarder
NewForwarder creates a new forwarder instance.
type Implementer ¶
Implementer interface provides strategy to implement task.
type Master ¶
type Master struct {
// contains filtered or unexported fields
}
Master provides a task scheduler with HA and auto reconcile support.
func NewMaster ¶
func NewMaster(conf *ClusterConfig, id string) (*Master, error)
NewMaster creates a new master instance.
func (*Master) DeleteTask ¶
DeleteTask deletes a specific task.
func (*Master) GetWorker ¶
func (m *Master) GetWorker(id string) (*pb.WorkerSpec, error)
GetWorker gets a specific worker.
func (*Master) ListWorkers ¶
func (m *Master) ListWorkers() ([]*pb.WorkerSpec, error)
ListWorkers lists all workers.
type NotLeaderError ¶
type NotLeaderError struct {
Leader string
}
func (*NotLeaderError) Error ¶
func (e *NotLeaderError) Error() string
type QuotaBasedScheduler ¶
type QuotaBasedScheduler struct { }
QuotaBasedScheduler schedules a task to a worker that has the corresponding labels and whose quotas are not exceeded.
func (*QuotaBasedScheduler) Assign ¶
func (qbs *QuotaBasedScheduler) Assign(workers WorkerView, task *pb.TaskSpec) (*pb.WorkerSpec, error)
Assign assigns task to a worker with low workload.
func (*QuotaBasedScheduler) Rebalance ¶
func (qbs *QuotaBasedScheduler) Rebalance(workers WorkerView, tasks TaskView) (map[string]string, error)
Rebalance rebalances tasks among workers.
type Scheduler ¶
type Scheduler interface { Assign(workers WorkerView, task *pb.TaskSpec) (*pb.WorkerSpec, error) Rebalance(workers WorkerView, tasks TaskView) (map[string]string, error) }
Scheduler interface provides strategy to assign task to a worker.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker represents a worker with given labels and resource quotas.
func NewWorker ¶
func NewWorker(conf *ClusterConfig, spec *pb.WorkerSpec) (*Worker, error)
NewWorker creates a new worker instance.
func (*Worker) GetDesiredState ¶
func (w *Worker) GetDesiredState() <-chan DesiredState
GetDesiredState returns a channel which provides the desired task states for the current worker. It is the caller's responsibility to calculate state diffs and change the real-world state.
func (*Worker) Run ¶
func (w *Worker) Run()
Run runs worker main loop which watches task assignment changes.
func (*Worker) SetTaskStatus ¶
func (w *Worker) SetTaskStatus(id string, status *pb.TaskStatus) error
SetTaskStatus sets a user-defined status, which can be read by the master. Note that the status is bound to the current worker and will be invalidated on task reassignment.
type WorkerView ¶
type WorkerView map[string]pb.WorkerSpec
type WorkloadBasedScheduler ¶
type WorkloadBasedScheduler struct { }
WorkloadBasedScheduler schedules task to worker with corresponding labels and low workload.
func (*WorkloadBasedScheduler) Assign ¶
func (wbs *WorkloadBasedScheduler) Assign(workers WorkerView, task *pb.TaskSpec) (*pb.WorkerSpec, error)
Assign assigns task to a worker with low workload.
func (*WorkloadBasedScheduler) Rebalance ¶
func (wbs *WorkloadBasedScheduler) Rebalance(workers WorkerView, tasks TaskView) (map[string]string, error)
Rebalance rebalances tasks among workers.