task

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2024 License: MPL-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ByState

func ByState(i, j *Task) int

ByState sorts tasks according to the following order:

1. running (ordered by last updated, descending)
2. queued (ordered by last updated, ascending)
3. pending (ordered by last updated, ascending)
4. finished (ordered by last updated, descending)

func StartEnqueuer

func StartEnqueuer(ctx context.Context, tasks *Service)

func StartRunner

func StartRunner(ctx context.Context, logger logging.Interface, tasks *Service, maxTasks int) func() error

Types

type CreateOptions

// CreateOptions are the options accepted by Service.Create when creating a
// task. The After* fields are optional callbacks, each invoked with the task
// at the point in its lifecycle described on the field.
type CreateOptions struct {
	// Resource that the task belongs to.
	Parent resource.Resource
	// Program command and any subcommands, e.g. plan, state rm, etc.
	Command []string
	// Args to pass to the program.
	Args []string
	// Path relative to the pug working directory in which to run the command.
	Path string
	// Environment variables.
	Env []string
	// A blocking task blocks other tasks from running on the module or
	// workspace.
	Blocking bool
	// Globally exclusive task: at most one such task can be running at a
	// time.
	Exclusive bool
	// Set to true to indicate that the task produces JSON output.
	JSON bool
	// Skip the queue and start the task immediately.
	Immediate bool
	// Call this function after the task has successfully finished.
	AfterExited func(*Task)
	// Call this function after the task is enqueued.
	AfterQueued func(*Task)
	// Call this function after the task starts running.
	AfterRunning func(*Task)
	// Call this function after the task fails with an error.
	AfterError func(*Task)
	// Call this function after the task is successfully canceled.
	AfterCanceled func(*Task)
	// Call this function after the task is successfully created.
	AfterCreate func(*Task)
	// Call this function after the task terminates for whatever reason.
	AfterFinish func(*Task)
}

type Func

// Func is a function that creates a task for the resource with the given ID,
// returning the created task or an error.
type Func func(resource.ID) (*Task, error)

Func is a function that creates a task.

type ListOptions

// ListOptions are the options accepted by Service.List for filtering and
// ordering the tasks it returns.
type ListOptions struct {
	// Filter tasks by those with a matching module path. Optional.
	Path *string
	// Filter tasks by status: match task if it has one of these statuses.
	// Optional.
	Status []Status
	// Order tasks by oldest first (true), or newest first (false).
	Oldest bool
	// Only return those tasks that are blocking. If false, both blocking and
	// non-blocking tasks are returned.
	Blocking bool
	// Only return those tasks that are exclusive. If false, both exclusive and
	// non-exclusive tasks are returned.
	Exclusive bool
	// Filter tasks by those with one of the following commands.
	Command [][]string
	// Filter tasks by only those that have an ancestor with the given ID.
	// Defaults to the zero value, which is the ID of the abstract global
	// entity to which all resources belong.
	Ancestor resource.ID
}

type Multi

// Multi is a group of tasks, each one carrying out the same underlying task
// but on a different resource, e.g. terraform init on multiple modules.
type Multi []*Task

Multi is a group of tasks, each one carrying out the same underlying task but on a different resource, e.g. terraform init on multiple modules.

func CreateMulti

func CreateMulti(fn Func, ids ...resource.ID) (multi Multi, errs []error)

CreateMulti invokes the task-creating func for each resource ID. If the func fails and returns an error, the error is added to errs.

func (Multi) Wait

func (m Multi) Wait()

type Service

// Service manages tasks. Its methods create, enqueue, cancel, delete, get and
// list tasks, and let callers subscribe to task events.
type Service struct {
	// NOTE(review): presumably the broker through which the task events
	// returned by Subscribe are published; confirm against the implementation.
	Broker *pubsub.Broker[*Task]
	// contains filtered or unexported fields
}

func NewService

func NewService(opts ServiceOptions) *Service

func (*Service) Cancel

func (s *Service) Cancel(taskID resource.ID) (*Task, error)

func (*Service) Counter

func (s *Service) Counter() int

func (*Service) Create

func (s *Service) Create(opts CreateOptions) (*Task, error)

Create creates a task. The task is placed into a pending state and must be enqueued before it will be processed.

func (*Service) Delete

func (s *Service) Delete(taskID resource.ID) error

func (*Service) Enqueue

func (s *Service) Enqueue(taskID resource.ID) (*Task, error)

Enqueue moves the task onto the global queue for processing.

func (*Service) Get

func (s *Service) Get(taskID resource.ID) (*Task, error)

func (*Service) List

func (s *Service) List(opts ListOptions) []*Task

func (*Service) Subscribe

func (s *Service) Subscribe(ctx context.Context) <-chan resource.Event[*Task]

type ServiceOptions

// ServiceOptions are the options accepted by NewService when constructing a
// Service.
type ServiceOptions struct {
	// NOTE(review): presumably the CLI program that tasks execute; confirm.
	Program string
	// Logger used by the service.
	Logger  logging.Interface
	// NOTE(review): presumably the pug working directory that task paths are
	// relative to (see CreateOptions.Path); confirm.
	Workdir internal.Workdir
}

type Status

// Status is a stage in the lifecycle of a task.
type Status string

Status is a stage in the lifecycle of a task.

// The stages in the lifecycle of a task. A newly created task is pending
// until it is enqueued.
// NOTE(review): Exited, Errored and Canceled presumably correspond to the
// AfterExited, AfterError and AfterCanceled callbacks respectively, and
// together make up the "finished" statuses; confirm against IsFinished.
const (
	Pending  Status = "pending"
	Queued   Status = "queued"
	Running  Status = "running"
	Exited   Status = "exited"
	Errored  Status = "errored"
	Canceled Status = "canceled"
)

type Task

// Task is an execution of a CLI program.
type Task struct {
	resource.Resource

	// Command, Args, Path, Blocking, Env, JSON and Immediate share their names
	// and types with the corresponding CreateOptions fields (presumably copied
	// from them on creation; confirm). State is the task's current Status.
	Command   []string
	Args      []string
	Path      string
	Blocking  bool
	State     Status
	Env       []string
	JSON      bool
	Immediate bool

	// Timestamps for the task.
	// NOTE(review): Updated is presumably the "last updated" time that ByState
	// orders by; confirm.
	Created time.Time
	Updated time.Time

	// Nil until task finishes with an error.
	Err error

	// Call this function after the task has successfully finished.
	AfterExited func(*Task)
	// Call this function after the task is enqueued.
	AfterQueued func(*Task)
	// Call this function after the task starts running.
	AfterRunning func(*Task)
	// Call this function after the task fails with an error.
	AfterError func(*Task)
	// Call this function after the task is successfully canceled.
	AfterCanceled func(*Task)
	// Call this function after the task terminates for whatever reason.
	AfterFinish func(*Task)
	// contains filtered or unexported fields
}

Task is an execution of a CLI program.

func (*Task) CommandString

func (t *Task) CommandString() string

func (*Task) Elapsed

func (t *Task) Elapsed(s Status) time.Duration

Elapsed returns the length of time the task has been in the given status.

func (*Task) IsActive

func (t *Task) IsActive() bool

func (*Task) IsFinished

func (t *Task) IsFinished() bool

func (*Task) LogValue

func (t *Task) LogValue() slog.Value

func (*Task) NewReader

func (t *Task) NewReader() io.Reader

NewReader provides a reader from which to read the task output from start to end.

func (*Task) Wait

func (t *Task) Wait() error

Wait waits for the task to complete. If the task completes unsuccessfully then the returned error is non-nil.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL