task

package
v0.5.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 19, 2024 License: MPL-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ByState

func ByState(i, j *Task) int

ByState sorts tasks according to the following order:

1. running (ordered by last updated desc)
2. queued (ordered by last updated asc)
3. pending (ordered by last updated asc)
4. finished (ordered by last updated desc)

func SortGroupsByCreated added in v0.2.0

func SortGroupsByCreated(i, j *Group) int

func StartEnqueuer

func StartEnqueuer(tasks *Service)

func StartRunner

func StartRunner(ctx context.Context, logger logging.Interface, tasks *Service, maxTasks int) func()

StartRunner starts the task runner and returns a function that waits for running tasks to finish.

Types

type Dependencies added in v0.5.2

type Dependencies struct {
	// ModuleIDs is a list of module IDs.
	// NOTE(review): presumably the modules whose tasks must finish before this
	// task may start -- confirm against the code that populates it.
	ModuleIDs []resource.ID
	// InverseDependencyOrder inverts the order of module dependencies, i.e. if
	// module A depends on module B, then a task specified for module B will
	// only be started once any tasks specified on module A have completed. This
	// is useful when carrying out a `terraform destroy`. All specs in the task
	// group must set InverseDependencyOrder to the same value otherwise an
	// error is raised.
	InverseDependencyOrder bool
}

Dependencies specifies that the task respect its module's dependencies: any tasks belonging to its module's dependencies must have finished successfully before this task can be started. This only makes sense in the context of a task group, in which multiple tasks are created.

type Execution added in v0.5.2

type Execution struct {
	// Program is the program to execute. Defaults to the `program` pug config
	// option.
	Program string
	// TerraformCommand is the terraform command, including sub commands, e.g.
	// plan, state rm, etc. Ignored if Program is non-empty.
	TerraformCommand []string
	// Args are the arguments to pass to the program.
	Args []string
}

Execution specifies the program and arguments to execute.

type Group added in v0.2.0

type Group struct {
	// Embedded resource ID of the group.
	resource.ID

	Created      time.Time // presumably when the group was created (cf. SortGroupsByCreated); TODO confirm
	Command      string    // presumably the command the group's tasks run; TODO confirm
	Tasks        []*Task   // tasks belonging to the group
	CreateErrors []error   // presumably errors from creating the group's tasks; TODO confirm
}

func (*Group) Errored added in v0.2.0

func (g *Group) Errored() int

func (*Group) Exited added in v0.2.0

func (g *Group) Exited() int

func (*Group) Finished added in v0.2.0

func (g *Group) Finished() int

func (*Group) IncludesTask added in v0.2.0

func (g *Group) IncludesTask(taskID resource.ID) bool

func (*Group) String added in v0.2.0

func (g *Group) String() string

type Identifier added in v0.5.2

type Identifier string

Identifier uniquely identifies the type of task.

type ListOptions

type ListOptions struct {
	// Path filters tasks to those with a matching module path. Optional.
	Path *string
	// Status filters tasks by status: a task matches if it has one of these
	// statuses. Optional.
	Status []Status
	// Oldest orders tasks by oldest first (true), or newest first (false).
	Oldest bool
	// Blocking, if true, returns only those tasks that are blocking. If false,
	// both blocking and non-blocking tasks are returned.
	Blocking bool
	// Exclusive, if true, returns only those tasks that are exclusive. If
	// false, both exclusive and non-exclusive tasks are returned.
	Exclusive bool
}

type Service

type Service struct {
	TaskBroker  *pubsub.Broker[*Task]  // pubsub broker carrying *Task values
	GroupBroker *pubsub.Broker[*Group] // pubsub broker carrying *Group values
	// contains filtered or unexported fields
}

func NewService

func NewService(opts ServiceOptions) *Service

func (*Service) AddGroup added in v0.3.4

func (s *Service) AddGroup(group *Group)

AddGroup adds a task group to the DB.

func (*Service) Cancel

func (s *Service) Cancel(taskID resource.ID) (*Task, error)

func (*Service) Counter

func (s *Service) Counter() int

func (*Service) Create

func (s *Service) Create(spec Spec) (*Task, error)

Create a task. The task is placed into a pending state and requires enqueuing before it'll be processed.

func (*Service) CreateGroup added in v0.2.0

func (s *Service) CreateGroup(specs ...Spec) (*Group, error)

Create a task group from one or more task specs. An error is returned if zero specs are provided, or if it fails to create at least one task.

func (*Service) Delete

func (s *Service) Delete(taskID resource.ID) error

func (*Service) Enqueue

func (s *Service) Enqueue(taskID resource.ID) (*Task, error)

Enqueue moves the task onto the global queue for processing.

func (*Service) Get

func (s *Service) Get(taskID resource.ID) (*Task, error)

func (*Service) GetGroup added in v0.3.4

func (s *Service) GetGroup(groupID resource.ID) (*Group, error)

func (*Service) List

func (s *Service) List(opts ListOptions) []*Task

func (*Service) ListGroups added in v0.2.0

func (s *Service) ListGroups() []*Group

type ServiceOptions

type ServiceOptions struct {
	// NOTE(review): none of these fields is documented upstream. Judging by
	// names and types alone: Program is presumably the default program to
	// execute (compare Execution.Program), UserEnvs and UserArgs are
	// presumably extra environment variables and arguments for tasks, and
	// Terragrunt presumably toggles terragrunt mode -- confirm against
	// NewService before relying on any of this.
	Program    string
	Logger     logging.Interface
	Workdir    internal.Workdir
	UserEnvs   []string
	UserArgs   []string
	Terragrunt bool
}

type Spec added in v0.5.0

type Spec struct {
	// ModuleID is the ID of the module the task belongs to. If nil, the task
	// does not belong to a module.
	ModuleID *resource.ID
	// WorkspaceID is the ID of the workspace the task belongs to. If nil, the
	// task does not belong to a workspace.
	WorkspaceID *resource.ID
	// Execution specifies the execution of a program.
	Execution Execution
	// AdditionalExecution specifies the execution of another program. The
	// program is only executed if the first program exits successfully.
	AdditionalExecution *Execution
	// Identifier uniquely identifies the type of task.
	Identifier Identifier
	// Path relative to the pug working directory in which to run the command.
	Path string
	// Environment variables.
	Env []string
	// A blocking task blocks other tasks from running on the module or
	// workspace.
	Blocking bool
	// Globally exclusive task - at most one such task can be running.
	Exclusive bool
	// Set to true to indicate that the task produces JSON output.
	JSON bool
	// Skip queue and immediately start task.
	Immediate bool
	// Wait blocks until the task has finished.
	Wait bool
	// Description assigns an optional description to the task to display to the
	// user, overriding the default of displaying the command.
	Description string
	// Call this function before the task has successfully finished. The
	// returned Summary sets the task summary, and the error, if non-nil, deems
	// the task to have failed and places the task into an errored state.
	BeforeExited func(*Task) (Summary, error)
	// Call this function after the task has successfully finished.
	AfterExited func(*Task)
	// Call this function after the task is enqueued.
	AfterQueued func(*Task)
	// Call this function after the task starts running.
	AfterRunning func(*Task)
	// Call this function after the task fails with an error.
	AfterError func(*Task)
	// Call this function after the task is successfully canceled.
	AfterCanceled func(*Task)
	// Call this function after the task is successfully created.
	AfterCreate func(*Task)
	// Call this function after the task terminates for whatever reason.
	AfterFinish func(*Task)
	// Dependencies specifies that the task respect its module's dependencies.
	// Only makes sense when the task is specified as part of a task group. All
	// specs in the task group must set Dependencies to either nil, or to
	// non-nil.
	Dependencies *Dependencies
	// contains filtered or unexported fields
}

Spec is a specification for creating a task.

type SpecFunc added in v0.5.0

type SpecFunc func(resource.ID) (Spec, error)

SpecFunc is a function that creates a spec.

type Status

type Status string

Status is a stage in the lifecycle of a task.

const (
	Pending  Status = "pending"  // created but not yet enqueued
	Queued   Status = "queued"   // enqueued on the global queue, awaiting processing
	Running  Status = "running"  // started running
	Exited   Status = "exited"   // finished successfully
	Errored  Status = "errored"  // failed with an error
	Canceled Status = "canceled" // canceled

	// MaxStatusLen is the length of Canceled ("canceled"), which is the
	// longest of the status strings declared in this block.
	MaxStatusLen = len(Canceled)
)

func (Status) IsFinal added in v0.5.0

func (s Status) IsFinal() bool

IsFinal returns true if the state is a final state.

type Summary added in v0.5.0

type Summary interface {
	// String returns the summary rendered as a string.
	String() string
}

Summary summarises the outcome of a task.

type Task

type Task struct {
	// Embedded resource ID of the task.
	resource.ID

	ModuleID            *resource.ID
	WorkspaceID         *resource.ID
	Identifier          Identifier
	Program             string
	Args                []string
	AdditionalExecution *Execution
	Path                string
	Blocking            bool
	State               Status
	JSON                bool
	Immediate           bool
	AdditionalEnv       []string
	DependsOn           []resource.ID
	// Summary summarises the outcome of a task to the end-user.
	Summary     Summary
	Description string

	// Timestamps. NOTE(review): presumably Created is set once at creation and
	// Updated on each state change (ByState orders by "last updated") --
	// confirm.
	Created time.Time
	Updated time.Time

	// Nil until task finishes with an error.
	Err error

	// Retain a copy of the Spec used to originally create the task so that
	// the task can be retried.
	Spec Spec

	// Lifecycle callbacks. NOTE(review): these share names and signatures with
	// the callback fields on Spec and are presumably copied from the Spec when
	// the task is created -- confirm.
	AfterCreate   func(*Task)
	AfterQueued   func(*Task)
	AfterRunning  func(*Task)
	BeforeExited  func(*Task) (Summary, error)
	AfterExited   func(*Task)
	AfterError    func(*Task)
	AfterCanceled func(*Task)
	AfterFinish   func(*Task)
	// contains filtered or unexported fields
}

Task is an execution of a CLI program.

func (*Task) Elapsed

func (t *Task) Elapsed(s Status) time.Duration

Elapsed returns the length of time the task has been in the given status.

func (*Task) IsActive

func (t *Task) IsActive() bool

func (*Task) LogValue

func (t *Task) LogValue() slog.Value

func (*Task) NewReader

func (t *Task) NewReader(combined bool) io.Reader

NewReader returns a reader which contains what has been written thus far to the task buffer. Set combined to true to receive stderr as well as stdout.

func (*Task) NewStreamer added in v0.5.2

func (t *Task) NewStreamer() <-chan []byte

NewStreamer returns a stream of output from the task; the channel is closed when the task has finished.

func (*Task) String added in v0.2.0

func (t *Task) String() string

func (*Task) Wait

func (t *Task) Wait() error

Wait for task to complete successfully. If the task completes unsuccessfully then the returned error is non-nil.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL