Documentation ¶
Index ¶
- Constants
- Variables
- type Dispatcher
- type EmptyTask
- type Node
- type Partition
- type PartitionTask
- type ServiceTask
- type Stage
- type Task
- type TaskManager
- func (m *TaskManager) BuildStage(c *config.Stage) (*Stage, error)
- func (m *TaskManager) BuildTask(c *config.Task) Task
- func (m *TaskManager) Init() error
- func (m *TaskManager) KillStage(name string) error
- func (m *TaskManager) Register(a *config.Agent)
- func (m *TaskManager) RunStage(name string) error
- type TaskMeta
- type TaskRunner
- type TickerTask
- type TimerTask
Constants ¶
const ( StatusWaiting = iota StatusReady StatusRunning StatusDone StatusTerminating StautsTerminated StatusError )
Task or Stage status
const ( Ticker = "ticker" Timer = "timer" Delay = "delay" )
key of arguments in nemesis
const ( Head = "head" PARTITION = "PARTITION" SERVICE = "SERVICE" IOERROR = "IOERROR" IODELAY = "IODELAY" EMPTY = "EMPTY" )
key of head task
Variables ¶
var ( ErrNotFound = errors.New("NotFound") ErrAlreadyRunning = errors.New("AlreadyRunning") ErrDuplicated = errors.New("Duplicated") )
Errors
Functions ¶
This section is empty.
Types ¶
type Dispatcher ¶
Dispatcher is responsible for transporting tasks to agents over RPC.
type EmptyTask ¶
type EmptyTask struct {
Meta *TaskMeta
}
EmptyTask is used as the head task of a stage and for debugging.
func (*EmptyTask) GetLastTarget ¶
GetLastTarget implements Task.
type Node ¶
type Node struct { Agent *config.Agent // agent to process NemesisID string // unique id of a instance of nemesis }
Node contains the necessary information about a node.
type PartitionTask ¶
type PartitionTask struct { Meta *TaskMeta TargetList []*config.Agent // Agents to be selected Arguments map[string]string // arguments to run nemesis SelectFunction func() []*config.Agent // function to select targets Partitions []Partition // parts of targets LastTargetInfo string // return by GetLastTarget }
PartitionTask is a special NemesisTask that creates a partition with targets.
func NewPartitionTask ¶
func NewPartitionTask(selectFunction func() []*config.Agent, name string, arguments map[string]string) *PartitionTask
NewPartitionTask creates an instance of a partition nemesis with the specified targets.
func (*PartitionTask) Execute ¶
func (t *PartitionTask) Execute(ctx context.Context) error
Execute implements Task.
func (*PartitionTask) GetLastTarget ¶
func (t *PartitionTask) GetLastTarget() string
GetLastTarget implements Task.
func (*PartitionTask) String ¶
func (t *PartitionTask) String() string
type ServiceTask ¶
type ServiceTask struct { Meta *TaskMeta // task meta TargetList []*Node // targets to process Arguments map[string]string // arguments to run nemesis SelectFunction func() []*config.Agent // function to select targets LastTargetInfo string // return by GetLastTarget sync.Mutex }
ServiceTask controls an instance of a nemesis at the service level.
func NewIOTask ¶
func NewIOTask(selectFunction func() []*config.Agent, name string, arguments map[string]string, typ int) *ServiceTask
NewIOTask creates a new task to control a nemesis instance.
func NewServiceTask ¶
func NewServiceTask(selectFunction func() []*config.Agent, name string, arguments map[string]string, typ int) *ServiceTask
NewServiceTask creates a new task to control a nemesis instance.
func (*ServiceTask) Execute ¶
func (t *ServiceTask) Execute(ctx context.Context) error
Execute implements Task.
func (*ServiceTask) GetLastTarget ¶
func (t *ServiceTask) GetLastTarget() string
GetLastTarget implements Task.
func (*ServiceTask) String ¶
func (t *ServiceTask) String() string
type Stage ¶
type Stage struct { Name string // stage name TaskDict map[string]Task // nodes of DAG RunnerDict map[string]*TaskRunner // nodes of Runner Notify chan Task // notify to proceed the DAG Cancel func() // call back to terminate this stage Running bool // is running ? sync.Mutex }
Stage consists of a group of tasks with the same head. TODO: Stage DAG
type Task ¶
type Task interface { GetMeta() *TaskMeta Terminate(context.Context) error Execute(context.Context) error GetLastTarget() string }
Task interface
type TaskManager ¶
type TaskManager struct { Name string // name for loggging Topology config.Topology // cluster topology RunnerDict map[string]*TaskRunner // dictionary to register runner Stop chan struct{} // stop notifier CancelFunction func() // cancel all the tasks DataDir string // dir to load stage StageDict map[string]*Stage // stage map StageConfigDict map[string]*config.Stage // stage configuration map sync.Mutex }
TaskManager builds stages from tasks; currently only static stages are supported. TODO: dynamic stages
func (*TaskManager) BuildStage ¶
func (m *TaskManager) BuildStage(c *config.Stage) (*Stage, error)
BuildStage creates a stage according to the config; currently only a task DAG is supported. TODO: Stage DAG
func (*TaskManager) BuildTask ¶
func (m *TaskManager) BuildTask(c *config.Task) Task
BuildTask creates a task and selects an agent for the task.
func (*TaskManager) KillStage ¶
func (m *TaskManager) KillStage(name string) error
KillStage terminates a stage by name.
func (*TaskManager) Register ¶
func (m *TaskManager) Register(a *config.Agent)
Register tries to add an agent to the topology.
func (*TaskManager) RunStage ¶
func (m *TaskManager) RunStage(name string) error
RunStage starts a stage by name.
type TaskMeta ¶
type TaskMeta struct { ID string // task unique id Type int // task type Group string // task group name Name string // task name Upstream map[string]Task // requires Downstream map[string]Task // privodes }
TaskMeta contains the basic information of a task.
type TaskRunner ¶
type TaskRunner struct { In chan Task // notify for upstream Out chan Task // notify for downstram Count int // upstream counter T Task // task wrapped Name string // 'TaskRunner' LasterError error // last error in execution }
TaskRunner is a wrapper that runs a task.
func NewTaskRunner ¶
func NewTaskRunner(t Task, out chan Task) *TaskRunner
NewTaskRunner creates a new wrapper to check and run a task.
func (*TaskRunner) Run ¶
func (r *TaskRunner) Run(ctx context.Context)
Run executes the task and advances the step in the DAG.
type TickerTask ¶
type TickerTask struct { T Task // inner task to run Delay time.Duration // duration for a task to run Last time.Duration // duration for a task to last Cycle time.Duration // cycle to trigger the task Cancle func() // cancle function }
TickerTask is a wrapper which sets up a stateful Task with a ticker.
func NewTickerTask ¶
func NewTickerTask(t Task, last, cycle time.Duration) *TickerTask
NewTickerTask creates a TickerTask.
func (*TickerTask) Execute ¶
func (t *TickerTask) Execute(ctx context.Context) error
Execute implements Task.
func (*TickerTask) GetLastTarget ¶
func (t *TickerTask) GetLastTarget() string
GetLastTarget implements Task.
type TimerTask ¶
TimerTask sets up a long-running task with a timeout.
func NewTimerTask ¶
NewTimerTask sets up a Task with a timeout.
func (*TimerTask) GetLastTarget ¶
GetLastTarget implements Task.