control

package
v0.0.0-...-f78b0ad Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2020 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
// Task or Stage status values, numbered consecutively from 0 via iota.
const (
	StatusWaiting = iota // 0: first value of the iota sequence
	StatusReady
	StatusRunning
	StatusDone
	StatusTerminating
	StautsTerminated // NOTE(review): identifier is misspelled ("Stauts"); exported, so renaming would break callers
	StatusError
)

Task or Stage status

View Source
// Keys of arguments in nemesis.
// NOTE(review): the names mirror TickerTask, TimerTask and TickerTask.Delay;
// presumably they configure those wrappers — confirm against the callers.
const (
	Ticker = "ticker"
	Timer  = "timer"
	Delay  = "delay"
)

key of arguments in nemesis

View Source
// Key of the head task, followed by upper-case string identifiers.
// NOTE(review): the upper-case values look like task kind names (compare
// PartitionTask, ServiceTask, NewIOTask, EmptyTask) — confirm against usage.
const (
	Head      = "head"
	PARTITION = "PARTITION"
	SERVICE   = "SERVICE"
	IOERROR   = "IOERROR"
	IODELAY   = "IODELAY"
	EMPTY     = "EMPTY"
)

key of head task

Variables

View Source
// Sentinel errors exported by this package. Each is created with errors.New
// and carries only a fixed message string.
var (
	ErrNotFound       = errors.New("NotFound")
	ErrAlreadyRunning = errors.New("AlreadyRunning")
	ErrDuplicated     = errors.New("Duplicated")
)

Errors

Functions

This section is empty.

Types

type Dispatcher

// Dispatcher is responsible for transporting a task to an agent over RPC.
type Dispatcher interface {
	// Dispatch takes the task t and returns an error value.
	Dispatch(t *Task) error
}

Dispatcher is responsible for transporting a task to an agent via RPC

type EmptyTask

// EmptyTask is used as the header of a stage and for debugging.
type EmptyTask struct {
	Meta *TaskMeta // task meta (presumably what GetMeta returns — confirm)
}

EmptyTask is used as the header of stage and debug

func NewEmptyTask

func NewEmptyTask(name string) *EmptyTask

NewEmptyTask create a new empty task

func (*EmptyTask) Execute

func (t *EmptyTask) Execute(context.Context) error

Execute implement Task

func (*EmptyTask) GetLastTarget

func (t *EmptyTask) GetLastTarget() string

GetLastTarget implement Task

func (*EmptyTask) GetMeta

func (t *EmptyTask) GetMeta() *TaskMeta

GetMeta implement Task

func (*EmptyTask) Terminate

func (t *EmptyTask) Terminate(context.Context) error

Terminate implement Task

type Node

// Node contains the necessary info of a node.
type Node struct {
	Agent     *config.Agent // agent to process
	NemesisID string        // unique id of an instance of nemesis
}

Node contains necessary info of a node

func (*Node) GetAddress

func (n *Node) GetAddress() string

GetAddress return the address

type Partition

type Partition []*Node

Partition contains parts of the targets

func (Partition) String

func (p Partition) String() string

type PartitionTask

// PartitionTask is a special nemesis task which creates a partition with targets.
type PartitionTask struct {
	Meta           *TaskMeta              // task meta
	TargetList     []*config.Agent        // Agents to be selected
	Arguments      map[string]string      // arguments to run nemesis
	SelectFunction func() []*config.Agent // function to select targets
	Partitions     []Partition            // parts of targets
	LastTargetInfo string                 // return by GetLastTarget
}

PartitionTask is a special NemesisTask which creates a partition with targets

func NewPartitionTask

func NewPartitionTask(selectFunction func() []*config.Agent, name string, arguments map[string]string) *PartitionTask

NewPartitionTask creates an instance of partition nemesis with specified targets

func (*PartitionTask) Execute

func (t *PartitionTask) Execute(ctx context.Context) error

Execute implement Task

func (*PartitionTask) GetLastTarget

func (t *PartitionTask) GetLastTarget() string

GetLastTarget implement Task

func (*PartitionTask) GetMeta

func (t *PartitionTask) GetMeta() *TaskMeta

GetMeta implement Task

func (*PartitionTask) String

func (t *PartitionTask) String() string

func (*PartitionTask) Terminate

func (t *PartitionTask) Terminate(ctx context.Context) error

Terminate implement Task

type ServiceTask

// ServiceTask controls an instance of nemesis at the service level.
type ServiceTask struct {
	Meta           *TaskMeta              // task meta
	TargetList     []*Node                // targets to process
	Arguments      map[string]string      // arguments to run nemesis
	SelectFunction func() []*config.Agent // function to select targets
	LastTargetInfo string                 // return by GetLastTarget
	sync.Mutex                            // embedded lock; NOTE(review): which fields it guards is not visible here — confirm
}

ServiceTask controls an instance of nemesis at the service level

func NewIOTask

func NewIOTask(selectFunction func() []*config.Agent, name string, arguments map[string]string, typ int) *ServiceTask

NewIOTask create a new task to control a nemesis instance

func NewServiceTask

func NewServiceTask(selectFunction func() []*config.Agent, name string, arguments map[string]string, typ int) *ServiceTask

NewServiceTask create a new task to control a nemesis instance

func (*ServiceTask) Execute

func (t *ServiceTask) Execute(ctx context.Context) error

Execute implement Task

func (*ServiceTask) GetLastTarget

func (t *ServiceTask) GetLastTarget() string

GetLastTarget implement Task

func (*ServiceTask) GetMeta

func (t *ServiceTask) GetMeta() *TaskMeta

GetMeta implement Task

func (*ServiceTask) String

func (t *ServiceTask) String() string

func (*ServiceTask) Terminate

func (t *ServiceTask) Terminate(ctx context.Context) error

Terminate implement Task

type Stage

// Stage consists of a group of tasks with the same head. TODO Stage DAG
type Stage struct {
	Name       string                 // stage name
	TaskDict   map[string]Task        // nodes of DAG
	RunnerDict map[string]*TaskRunner // nodes of Runner
	Notify     chan Task              // notify to proceed the DAG
	Cancel     func()                 // call back to terminate this stage
	Running    bool                   // is running ?
	sync.Mutex                        // embedded lock; NOTE(review): which fields it guards is not visible here — confirm
}

Stage consists of a group of tasks with the same head. TODO Stage DAG

func NewStage

func NewStage(name string) *Stage

NewStage create a new stage

func (*Stage) Kill

func (s *Stage) Kill() error

Kill terminates the stage

func (*Stage) Run

func (s *Stage) Run(ctx context.Context) error

Run starts the stage

type Task

// Task is the interface implemented by EmptyTask, PartitionTask, ServiceTask,
// TickerTask and TimerTask.
type Task interface {
	// GetMeta returns the *TaskMeta of the task.
	GetMeta() *TaskMeta
	// Terminate takes a context and returns an error value.
	Terminate(context.Context) error
	// Execute takes a context and returns an error value.
	Execute(context.Context) error
	// GetLastTarget returns a string (see the LastTargetInfo fields of
	// PartitionTask and ServiceTask, documented as "return by GetLastTarget").
	GetLastTarget() string
}

Task interface

func WithTicker

func WithTicker(t Task, last, cycle time.Duration) Task

WithTicker wrap a task with ticker

type TaskManager

// TaskManager builds stages from tasks; currently only static stages are
// supported. TODO dynamic stages
type TaskManager struct {
	Name            string                   // name for logging
	Topology        config.Topology          // cluster topology
	RunnerDict      map[string]*TaskRunner   // dictionary to register runner
	Stop            chan struct{}            // stop notifier
	CancelFunction  func()                   // cancel all the tasks
	DataDir         string                   // dir to load stage
	StageDict       map[string]*Stage        // stage map
	StageConfigDict map[string]*config.Stage // stage configuration map
	sync.Mutex
}

TaskManager builds stages from tasks; currently only static stages are supported. TODO dynamic stages

func NewTaskManager

func NewTaskManager() *TaskManager

NewTaskManager create a task manager

func (*TaskManager) BuildStage

func (m *TaskManager) BuildStage(c *config.Stage) (*Stage, error)

BuildStage creates a stage according to config; currently only the task DAG is supported. TODO: Stage DAG

func (*TaskManager) BuildTask

func (m *TaskManager) BuildTask(c *config.Task) Task

BuildTask creates a task and selects an agent for the task

func (*TaskManager) Init

func (m *TaskManager) Init() error

Init setup task manager

func (*TaskManager) KillStage

func (m *TaskManager) KillStage(name string) error

KillStage terminate stage by name

func (*TaskManager) Register

func (m *TaskManager) Register(a *config.Agent)

Register tries adding an agent into the topology

func (*TaskManager) RunStage

func (m *TaskManager) RunStage(name string) error

RunStage start a stage by name

type TaskMeta

// TaskMeta contains the base info of a task.
type TaskMeta struct {
	ID         string          // task unique id
	Type       int             // task type
	Group      string          // task group name
	Name       string          // task name
	Upstream   map[string]Task // requires
	Downstream map[string]Task // provides
}

TaskMeta contains the base info of a task

type TaskRunner

// TaskRunner is a wrapper to run a task.
type TaskRunner struct {
	In          chan Task // notify for upstream
	Out         chan Task // notify for downstream
	Count       int       // upstream counter
	T           Task      // task wrapped
	Name        string    // 'TaskRunner'
	LasterError error     // last error in execution (NOTE(review): field name is likely a typo of "LastError"; exported, so left unchanged)
}

TaskRunner is a wrapper to run task

func NewTaskRunner

func NewTaskRunner(t Task, out chan Task) *TaskRunner

NewTaskRunner creates a new wrapper to check and run a task

func (*TaskRunner) Run

func (r *TaskRunner) Run(ctx context.Context)

Run executes the task and proceeds to the next step in the DAG

type TickerTask

// TickerTask is a wrapper which sets up a stateful Task with a ticker.
type TickerTask struct {
	T      Task          // inner task to run
	Delay  time.Duration // duration for a task to run
	Last   time.Duration // duration for a task to last
	Cycle  time.Duration // cycle to trigger the task
	Cancle func()        // cancel function (NOTE(review): field name is misspelled; exported, so left unchanged)
}

TickerTask is a wrapper which sets up a stateful Task with a ticker

func NewTickerTask

func NewTickerTask(t Task, last, cycle time.Duration) *TickerTask

NewTickerTask create a TickerTask

func (*TickerTask) Execute

func (t *TickerTask) Execute(ctx context.Context) error

Execute implement Task

func (*TickerTask) GetLastTarget

func (t *TickerTask) GetLastTarget() string

GetLastTarget implement Task

func (*TickerTask) GetMeta

func (t *TickerTask) GetMeta() *TaskMeta

GetMeta implement Task

func (*TickerTask) Terminate

func (t *TickerTask) Terminate(ctx context.Context) error

Terminate implement Task

type TimerTask

// TimerTask sets up a long-running task with a timeout.
type TimerTask struct {
	T       Task          // wrapped task (presumably the inner task to run, as in TickerTask — confirm)
	Timeout time.Duration // timeout for the task
	Cancel  func()        // NOTE(review): presumably a cancel callback, as in Stage.Cancel — confirm
}

TimerTask setup a long running task with timeout

func NewTimerTask

func NewTimerTask(t Task, timeout time.Duration) *TimerTask

NewTimerTask setup a Task with timeout

func (*TimerTask) Execute

func (t *TimerTask) Execute(ctx context.Context) error

Execute implement Task

func (*TimerTask) GetLastTarget

func (t *TimerTask) GetLastTarget() string

GetLastTarget implement Task

func (*TimerTask) GetMeta

func (t *TimerTask) GetMeta() *TaskMeta

GetMeta implement Task

func (*TimerTask) Terminate

func (t *TimerTask) Terminate(ctx context.Context) error

Terminate implement Task

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL