task

package
v0.0.0-...-034a78d
Warning

This package is not in the latest version of its module.

Published: Jul 31, 2019 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation


Constants

This section is empty.

Variables

var (
	ErrControllerClosed       = fmt.Errorf("coordinator/task: controller closed")
	ErrMaxTasksLimitExceeded  = fmt.Errorf("coordinator/task: tasks number can not greater than %d", maxTasksLimit)
	ErrTaskNameAlreadyExisted = fmt.Errorf("coordinator/task: task name already existed")
)

Functions

This section is empty.

Types

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

Controller is responsible for submitting tasks and for noticing and responding when a task's status changes.

TODO(damnever): API to notify task status changes.

  • we can simply watch the key: /task-coordinator/<version>/status/kinds/<task-kind>/names/<task-name>
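As a rough illustration of the key layout quoted in the TODO, the helper below assembles such a key from its parts. `statusKey` and the sample values are hypothetical; the real version segment and the way the package builds keys internally are not documented here.

```go
package main

import (
	"fmt"
	"path"
)

// statusKey builds a status key following the layout quoted in the TODO.
// The version, kind, and name values are supplied by the caller; note that
// path.Join also cleans the result, so names containing "/" or ".." would
// change the shape of the key.
func statusKey(version, kind, name string) string {
	return path.Join("/task-coordinator", version, "status", "kinds", kind, "names", name)
}

func main() {
	fmt.Println(statusKey("v1", "rebalance", "rebalance-2019-07-31"))
}
```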

func NewController

func NewController(ctx context.Context, cli state.Repository) *Controller

NewController creates a new controller.

func (*Controller) Close

func (c *Controller) Close() error

Close shuts down the Controller.

func (*Controller) Submit

func (c *Controller) Submit(kind Kind, name string, params []ControllerTaskParam) error

Submit submits a task with params and node IDs. A readable name that carries context information is recommended.

type ControllerTaskParam

type ControllerTaskParam struct {
	NodeID string
	Params ToBytes
}

ControllerTaskParam is the parameter for a task. NodeID is the unique ID of the node.
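One way to prepare the `params` argument for `Submit` is to create one `ControllerTaskParam` per target node. This is a sketch under assumptions: the types are redeclared locally, `rawJSON` and `buildParams` are hypothetical helpers, and JSON is assumed as the payload encoding only because `Task.Params` is a `json.RawMessage`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented types so the sketch compiles on its own.
type ToBytes interface {
	Bytes() []byte
}

type ControllerTaskParam struct {
	NodeID string
	Params ToBytes
}

// rawJSON is a hypothetical ToBytes implementation holding encoded JSON.
type rawJSON []byte

func (r rawJSON) Bytes() []byte { return r }

// buildParams creates one ControllerTaskParam per node ID, giving every node
// the same JSON-encoded payload.
func buildParams(nodeIDs []string, payload interface{}) ([]ControllerTaskParam, error) {
	data, err := json.Marshal(payload)
	if err != nil {
		return nil, err
	}
	params := make([]ControllerTaskParam, 0, len(nodeIDs))
	for _, id := range nodeIDs {
		params = append(params, ControllerTaskParam{NodeID: id, Params: rawJSON(data)})
	}
	return params, nil
}

func main() {
	params, err := buildParams([]string{"node-1", "node-2"}, map[string]int{"shard": 7})
	if err != nil {
		panic(err)
	}
	for _, p := range params {
		fmt.Printf("%s -> %s\n", p.NodeID, p.Params.Bytes())
	}
}
```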

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

Executor executes tasks on a node.

func NewExecutor

func NewExecutor(ctx context.Context, node *models.Node, cli state.Repository) *Executor

NewExecutor creates a new Executor. The keypfx must be the same as the Controller's.

func (*Executor) Close

func (e *Executor) Close() error

Close closes the Executor.

func (*Executor) Register

func (e *Executor) Register(procs ...Processor)

Register must be called before Run.

func (*Executor) Run

func (e *Executor) Run()

Run must be called after Register; otherwise it may panic. O(∩_∩)O~

type Kind

type Kind string

Kind is the task kind.

type Processor

type Processor interface {
	Kind() Kind
	RetryCount() int
	RetryBackOff() time.Duration
	Concurrency() int
	Process(ctx context.Context, task Task) error
}

Processor is responsible for processing the actual tasks. The caller must ensure that the Process func is goroutine-safe if it changes shared state.
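The sketch below shows what a goroutine-safe Processor might look like. The types are local copies (with `Task` trimmed to two fields), and `countingProcessor` and `runAll` are hypothetical; in particular, `runAll` only simulates concurrent calls and is not how the Executor schedules work. The mutex is what makes the shared counter safe.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Local copies of the documented types (Task is trimmed to two fields).
type Kind string

type Task struct {
	Kind Kind
	Name string
}

type Processor interface {
	Kind() Kind
	RetryCount() int
	RetryBackOff() time.Duration
	Concurrency() int
	Process(ctx context.Context, task Task) error
}

// countingProcessor is a hypothetical Processor that records how many tasks
// it has handled. The mutex guards the shared counter.
type countingProcessor struct {
	mu   sync.Mutex
	done int
}

func (p *countingProcessor) Kind() Kind                  { return "count" }
func (p *countingProcessor) RetryCount() int             { return 3 }
func (p *countingProcessor) RetryBackOff() time.Duration { return 100 * time.Millisecond }
func (p *countingProcessor) Concurrency() int            { return 4 }

func (p *countingProcessor) Process(ctx context.Context, task Task) error {
	if err := ctx.Err(); err != nil {
		return err // stop early if the caller cancelled
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	p.done++
	return nil
}

// runAll calls Process from Concurrency() goroutines and returns the final count.
func runAll(p *countingProcessor) int {
	var wg sync.WaitGroup
	for i := 0; i < p.Concurrency(); i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = p.Process(context.Background(), Task{Kind: p.Kind(), Name: fmt.Sprintf("task-%d", i)})
		}(i)
	}
	wg.Wait()
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.done
}

func main() {
	var p Processor = &countingProcessor{}
	fmt.Println(runAll(p.(*countingProcessor)))
}
```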

type State

type State uint8

State represents the task state.

const (
	StateCreated State = iota
	StateRunning
	StateDoneOK
	StateDoneErr
)
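Because the constants are declared with `iota`, they take the values 0 through 3 in order. A caller polling a task might want to know whether it has finished; the helper below is a hypothetical sketch that treats `StateDoneOK` and `StateDoneErr` as terminal, which is an assumption based on their names. The type is redeclared locally without its `String` method.

```go
package main

import "fmt"

// Local copy of the documented type and constants.
type State uint8

const (
	StateCreated State = iota
	StateRunning
	StateDoneOK
	StateDoneErr
)

// finished reports whether st is one of the two "done" states.
func finished(st State) bool {
	return st == StateDoneOK || st == StateDoneErr
}

func main() {
	for _, st := range []State{StateCreated, StateRunning, StateDoneOK, StateDoneErr} {
		fmt.Printf("%d finished=%t\n", st, finished(st))
	}
}
```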

func (State) String

func (st State) String() string

type Task

type Task struct {
	Kind     Kind            `json:"kind"`
	Name     string          `json:"name"`
	Executor string          `json:"executor"`
	Params   json.RawMessage `json:"params"`
	State    State           `json:"state"`
	ErrMsg   string          `json:"err_msg,omitempty"`
}

Task is the actual task with parameters and status.

func (Task) UnsafeMarshal

func (t Task) UnsafeMarshal() []byte

UnsafeMarshal marshals the task with the JSON encoder. It panics if an error occurs.

func (*Task) UnsafeUnmarshal

func (t *Task) UnsafeUnmarshal(data []byte)

UnsafeUnmarshal unmarshals the task with the JSON decoder. It panics if an error occurs.

type ToBytes

type ToBytes interface {
	Bytes() []byte
}

ToBytes can convert itself into bytes.
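Any type with a `Bytes() []byte` method satisfies this interface. The sketch below redeclares the interface locally and shows two candidates: the standard library's `*bytes.Buffer`, which already has such a method, and a hypothetical string-backed type named `text`.

```go
package main

import (
	"bytes"
	"fmt"
)

// Local copy of the documented interface.
type ToBytes interface {
	Bytes() []byte
}

// *bytes.Buffer already has a Bytes() []byte method, so it satisfies ToBytes.
var _ ToBytes = (*bytes.Buffer)(nil)

// text is a hypothetical string-backed implementation.
type text string

func (s text) Bytes() []byte { return []byte(s) }

// payloads returns two values that can both be used where ToBytes is expected.
func payloads() []ToBytes {
	return []ToBytes{
		bytes.NewBufferString(`{"shard":7}`),
		text(`{"shard":8}`),
	}
}

func main() {
	for _, p := range payloads() {
		fmt.Printf("%s\n", p.Bytes())
	}
}
```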
