task

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 18, 2024 License: BSD-3-Clause Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrNonRetriable is a sentinel error with the message "non-retriable error".
// NOTE(review): presumably the marker that NonRetriable attaches to an error so
// callers can detect it with errors.Is — confirm against the implementation.
var ErrNonRetriable = errors.New("non-retriable error")
View Source
// PipeModes is a fixed-size array holding one zero value of each of the four
// pipe mode types: PipeOrdered, PipeParallel, PipeRace and PipeAny.
var PipeModes = [...]PipeMode{
	PipeOrdered{},
	PipeParallel{},
	PipeRace{},
	PipeAny{},
}

PipeModes is a list of all pipe modes.

Functions

func NonRetriable

func NonRetriable(err error) error

Types

type Controller

// Controller is the interface passed to ITask.Launch. It embeds Worker and adds
// reporting, a stop-notification channel and a way to launch subtasks.
type Controller interface {
	Worker
	// Report accepts a Report value.
	// NOTE(review): presumably publishes the latest process statistics — confirm.
	Report(Report)
	// Stopping returns a receive-only signal channel.
	// NOTE(review): presumably closed or signaled once a stop is requested — confirm.
	Stopping() <-chan struct{}
	// Launch takes a context, a subtask and any number of functions that receive
	// *Options, and returns a receive-only error channel.
	// NOTE(review): presumably the channel yields the subtask's exit reason — confirm.
	Launch(ctx context.Context, subtask Task, modifiers ...func(*Options)) <-chan error
}

type ITask

// ITask is the single-method interface embedded in Task.
type ITask interface {
	// Launch receives a Controller and returns a receive-only error channel,
	// named exitReason in the signature.
	Launch(Controller) (exitReason <-chan error)
}

type Options

// Options groups the retry, memory-limit, uptime and timeout settings described
// on each field below.
// NOTE(review): StopTimeout is the only field whose JSON tag lacks omitempty —
// confirm that this is intentional.
type Options struct {
	RetryDisabled     bool                  `json:"retry_disabled,omitempty"`      // If set, no restarts will be attempted if the process fails to start initially.
	RetryBackoff      util.ParsableDuration `json:"retry_backoff,omitempty"`       // The time to wait between retries.
	RetryBackoffScale float64               `json:"retry_backoff_scale,omitempty"` // The scale factor to apply to the retry backoff.
	RetrySuccess      bool                  `json:"retry_success,omitempty"`       // If set, the process will be restarted even if it exits with status 0.
	RetryLimit        util.TimerRate        `json:"retry_limit,omitempty"`         // Maximum number of consecutive restarts within the period before the process is considered "errored".
	MaxMemory         util.ParsableSize     `json:"max_memory,omitempty"`          // Maximum amount of memory the process is allowed to use, <= 0 means unlimited.
	MinUptime         util.ParsableDuration `json:"min_uptime,omitempty"`          // Minimum uptime of the process before it is considered "started", <= 0 means immediate.
	ExecTimeout       util.ParsableDuration `json:"exec_timeout,omitempty"`        // The time to wait for a process to exit before killing it, <= 0 means never.
	StartTimeout      util.ParsableDuration `json:"start_timeout,omitempty"`       // The time to wait for a process to start before killing it, <= 0 means never.
	StopTimeout       util.ParsableDuration `json:"stop_timeout"`                  // The time to wait for a process to stop before killing it, <= 0 means immediate.
}

func (*Options) WithDefaults

func (o *Options) WithDefaults()

type Pipe

// Pipe combines a list of subtasks with the PipeMode that governs them.
type Pipe struct {
	Mode     PipeMode `json:"-"`   // Pipe mode; tagged json:"-", so it is not part of the struct's JSON fields.
	Subtasks []Task   `json:"sub"` // The subtasks, stored under the JSON key "sub".
}

func (*Pipe) Launch

func (p *Pipe) Launch(controller Controller) (exitReason <-chan error)

type PipeAny

type PipeAny struct{}

PipeAny (pipe:any) runs all subtasks in parallel and cancels the rest as soon as any of them succeeds.

func (PipeAny) String

func (PipeAny) String() string

type PipeMode

// PipeMode is the interface satisfied by the pipe mode types collected in
// PipeModes (PipeOrdered, PipeParallel, PipeRace and PipeAny).
// NOTE(review): the rendered listing hides further methods; if they are
// unexported, the interface cannot be implemented outside this package — confirm.
type PipeMode interface {
	// String returns a string for the mode.
	// NOTE(review): presumably the mode's textual name — confirm.
	String() string
	// contains filtered or unexported methods
}

type PipeOrdered

type PipeOrdered struct{}

PipeOrdered (pipe:ordered) runs all subtasks one after another and cancels the rest as soon as any of them fails.

func (PipeOrdered) String

func (PipeOrdered) String() string

type PipeParallel

// PipeParallel is one of the pipe modes listed in PipeModes.
// NOTE(review): it is the only pipe mode without a description; presumably it runs
// all subtasks in parallel without cancelling on the first result — confirm and document.
type PipeParallel struct{}

func (PipeParallel) String

func (PipeParallel) String() string

type PipeRace

type PipeRace struct{}

PipeRace (pipe:race) runs all subtasks in parallel and cancels the rest as soon as any of them succeeds or fails.

func (PipeRace) String

func (PipeRace) String() string

type Report

// Report is the value returned by InspectProcess and Worker.Inspect, and accepted
// by Controller.Report.
// NOTE(review): the units of Cpu and Mem are not visible here — confirm before
// relying on them.
type Report struct {
	Pid        []int32   `json:"pid"`         // Process IDs; a slice, so one report can carry several.
	Cpu        float64   `json:"cpu"`         // CPU figure (unit unconfirmed, see note above).
	Mem        float64   `json:"mem"`         // Memory figure (unit unconfirmed, see note above).
	Username   string    `json:"usr"`         // User name, stored under the JSON key "usr".
	CreateTime time.Time `json:"create_time"` // Creation timestamp.
}

func InspectProcess

func InspectProcess(proc *process.Process) (r Report)

func (Report) IsZero

func (r Report) IsZero() bool

func (Report) String

func (r Report) String() string

type Status

type Status uint8

Status is the status code for a session.

// Flag bits occupying the upper bits of a Status value. The status constants are
// formed by OR-ing a small sequential code with zero or more of these flags.
const (
	FlagTransition Status = 0x20 // Bit 5; set on Starting and Stopping.
	FlagAlive      Status = 0x40 // Bit 6; set on Starting, Stopping, Running and Retrying.
	FlagError      Status = 0x80 // Bit 7; set on Errored and the Timeout* statuses.
)
// Status values. Each constant takes its position in this block (iota, 0..10) as
// a sequential code and ORs in the flag bits that classify it, so the numeric
// values are not contiguous. The resulting value is noted on each line.
const (
	Idle         Status = iota                              // 0x00: no flags.
	Complete     Status = iota                              // 0x01: no flags.
	Cancelled    Status = iota                              // 0x02: no flags.
	Starting     Status = iota | FlagAlive | FlagTransition // 0x63: 3 | alive | transition.
	Stopping     Status = iota | FlagAlive | FlagTransition // 0x64: 4 | alive | transition.
	Running      Status = iota | FlagAlive                  // 0x45: 5 | alive.
	Retrying     Status = iota | FlagAlive                  // 0x46: 6 | alive.
	Errored      Status = iota | FlagError                  // 0x87: 7 | error.
	TimeoutStop  Status = iota | FlagError                  // 0x88: 8 | error.
	TimeoutStart Status = iota | FlagError                  // 0x89: 9 | error.
	TimeoutExec  Status = iota | FlagError                  // 0x8A: 10 | error.
)

func StatusFromErr

func StatusFromErr(err error) Status

func StatusFromName

func StatusFromName(name string) Status

func (Status) Error

func (s Status) Error() string

func (Status) Icon

func (s Status) Icon() string

func (Status) IsAlive

func (s Status) IsAlive() bool

func (Status) IsDead

func (s Status) IsDead() bool

func (Status) IsError

func (s Status) IsError() bool

func (Status) IsTransition

func (s Status) IsTransition() bool

func (Status) Join

func (s Status) Join(err error) error

func (Status) MarshalJSON

func (s Status) MarshalJSON() ([]byte, error)

func (Status) String

func (s Status) String() string

func (*Status) UnmarshalJSON

func (s *Status) UnmarshalJSON(data []byte) (err error)

type StatusOrError

// StatusOrError is an alias for error. Status declares an Error() string method,
// so a Status value can be carried in a StatusOrError alongside ordinary errors.
type StatusOrError = error

type Task

// Task wraps an ITask implementation together with an automarshal.ID. It has its
// own MarshalJSON and UnmarshalJSON methods.
// NOTE(review): presumably the ID selects the concrete ITask type during JSON
// decoding — confirm against the automarshal package.
type Task struct {
	ITask
	automarshal.ID
}

func (Task) MarshalJSON

func (t Task) MarshalJSON() ([]byte, error)

func (*Task) UnmarshalJSON

func (t *Task) UnmarshalJSON(data []byte) error

type TaskEx

// TaskEx is an alternative launch interface: rather than receiving a Controller,
// the task receives a context and an Options value and returns a Worker.
type TaskEx interface {
	// LaunchEx takes ctx and options and returns a Worker.
	LaunchEx(ctx context.Context, options Options) Worker
}

type TaskFetch

// TaskFetch is a task described by a URL, a method and a body. It can also be
// populated from a single string through UnmarshalInline.
// NOTE(review): presumably performs an HTTP request when launched — confirm.
type TaskFetch struct {
	Url    string `json:"url"`    // Target URL.
	Method string `json:"method"` // Request method.
	Body   string `json:"body"`   // Request body.
}

func (*TaskFetch) Launch

func (h *TaskFetch) Launch(ctx Controller) <-chan error

func (*TaskFetch) UnmarshalInline

func (h *TaskFetch) UnmarshalInline(text string) (err error)

type TaskNoop

// TaskNoop is an empty struct that has a Launch method, so it can serve as a task.
// NOTE(review): presumably it does nothing and completes immediately — confirm.
type TaskNoop struct{}

func (TaskNoop) Launch

func (t TaskNoop) Launch(ctx Controller) <-chan error

type TaskRun

// TaskRun describes a process to run: the executable, its arguments, working
// directory, environment, instance count and optional proxy settings. It can also
// be populated from a single string through UnmarshalInline.
// NOTE(review): Cwd is the only serialized field whose JSON tag lacks omitempty —
// confirm that this is intentional.
type TaskRun struct {
	Foreign string            `json:"-"`               // The foreign language to run.
	Exec    string            `json:"exec,omitempty"`  // The executable used to run the script.
	Args    []string          `json:"args,omitempty"`  // Arguments passed.
	Cwd     string            `json:"cwd"`             // Working directory.
	Env     map[string]string `json:"env,omitempty"`   // The environment variables to set.
	N       int               `json:"n,omitempty"`     // The number of instances to launch, >1 will run as cluster with special env.
	Proxy   *revproxy.Options `json:"proxy,omitempty"` // The proxy options.

}

func (*TaskRun) Launch

func (h *TaskRun) Launch(ctx Controller) <-chan error

func (*TaskRun) UnmarshalInline

func (h *TaskRun) UnmarshalInline(text string) (err error)

func (*TaskRun) WithDefaults

func (t *TaskRun) WithDefaults()

type TaskWait

// TaskWait is a task configured by a single duration. It can also be populated
// from a single string through UnmarshalInline.
// NOTE(review): presumably it waits for Duration and then completes — confirm.
type TaskWait struct {
	Duration util.ParsableDuration `json:"duration"` // The configured duration.
}

func (*TaskWait) Launch

func (h *TaskWait) Launch(ctx Controller) <-chan error

func (*TaskWait) UnmarshalInline

func (h *TaskWait) UnmarshalInline(text string) (err error)

type TaskWithOpts

// TaskWithOpts is an optional interface for tasks that want to adjust Options.
// NOTE(review): presumably checked for by the launcher before a task starts — confirm.
type TaskWithOpts interface {
	// Configure receives a pointer to an Options value, which it may modify in place.
	Configure(*Options)
}

type Whiteboard

// Whiteboard is a string-keyed store with Set, Get, Clear and Fork methods. It can
// be attached to a context with WithContext and retrieved with
// WhiteboardFromContext. Its fields are not shown in this listing.
// NOTE(review): all methods use value receivers, so the state is presumably shared
// through an internal reference — confirm.
type Whiteboard struct {
	// contains filtered or unexported fields
}

func NewWhiteboard

func NewWhiteboard() Whiteboard

func WhiteboardFromContext

func WhiteboardFromContext(ctx context.Context) Whiteboard

func (Whiteboard) Clear

func (w Whiteboard) Clear()

func (Whiteboard) Fork

func (w Whiteboard) Fork(key string) Whiteboard

func (Whiteboard) Get

func (w Whiteboard) Get(key string, out any) error

func (Whiteboard) Set

func (w Whiteboard) Set(key string, value any)

func (Whiteboard) WithContext

func (w Whiteboard) WithContext(ctx context.Context) context.Context

type Worker

// Worker is the handle for a launched task, as returned by NewWorker and
// TaskEx.LaunchEx. It embeds context.Context, so a Worker can be passed wherever a
// context is expected.
type Worker interface {
	// Accessors for the worker's task, options, namespace, logger, current
	// status (a Status or another error) and whiteboard.
	Task() Task
	Options() Options
	Namespace() string
	Logger() *clog.Logger
	Status() StatusOrError
	Whiteboard() *Whiteboard

	context.Context
	// Inspect returns a Report.
	Inspect() Report
	// Run returns an error.
	// NOTE(review): presumably blocks until the task finishes — confirm.
	Run() error
	// Stop and Kill take no arguments and return nothing.
	// NOTE(review): presumably Stop is the graceful variant bounded by
	// Options.StopTimeout and Kill is immediate — confirm.
	Stop()
	Kill()
	// Traverse takes a callback that receives a Worker and returns a bool.
	// NOTE(review): presumably visits this worker and its sub-workers, with the
	// return value controlling whether traversal continues — confirm.
	Traverse(func(Worker) bool)
}

func NewWorker

func NewWorker(ctx context.Context, task Task, options Options) (w Worker)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL