tasklifecycle

package
v1.3.11
Warning

This package is not in the latest version of its module.
Published: Mar 10, 2023 License: MPL-2.0 Imports: 7 Imported by: 1

Documentation

Overview

Package tasklifecycle manages the execution order of tasks based on their lifecycle configuration. Its main structs are the Coordinator and the Gate.

The Coordinator is used by an allocRunner to signal whether a taskRunner is allowed to start. It does so using a set of Gates, one for each task lifecycle configuration.

The Gate provides a channel that can be used to block its listener on demand. This is done by calling the Gate's Open() and Close() methods, which activate or deactivate a producer at the other end of the channel.

The allocRunner feeds task state updates to the Coordinator that then uses this information to determine which Gates it should open or close. Each Gate is connected to a taskRunner with a matching lifecycle configuration.

In the diagrams below, a solid line from a Gate indicates that it's open (active), while a dashed line indicates that it's closed (inactive). A taskRunner connected to an open Gate is allowed to run, while one that is connected to a closed Gate is blocked.

The Open/Close control line represents the Coordinator calling the Open() and Close() methods of the Gates.

In this state, the Coordinator is allowing prestart tasks to run, while blocking the main tasks.

         ┌────────┐
         │ ALLOC  │
         │ RUNNER │
         └───┬────┘
             │
         Task state
             │
┌────────────▼────────────┐
│Current state:           │
│Prestart                 │         ┌─────────────┐
│                         │         │ TASK RUNNER │
│     ┌───────────────────┼─────────┤ (Prestart)  │
│     │                   │         └─────────────┘
│     │                   │
│     │                   │         ┌─────────────┐
│     │ COORDINATOR       │         │ TASK RUNNER │
│     │             ┌─ ─ ─┼─ ─ ─ ─┬╶┤   (Main)    │
│     │             ╷     │       ╷ └─────────────┘
│     │             ╷     │       ╷
│     │             ╷     │       ╷ ┌─────────────┐
│   Prestart       Main   │       ╷ │ TASK RUNNER │
└─────┬─┬───────────┬─┬───┘       └╶┤   (Main)    │
      │ │Open/      ╷ │Open/        └─────────────┘
      │ │Close      ╷ │Close
   ┌──┴─▼─┐      ┌──┴─▼─┐
   │ GATE │      │ GATE │
   └──────┘      └──────┘

When the prestart task completes, the allocRunner sends a new batch of task states to the Coordinator. The Coordinator then transitions to a state in which it closes the Gate for prestart tasks, blocking their execution, and opens the Gate for main tasks, allowing them to start.

         ┌────────┐
         │ ALLOC  │
         │ RUNNER │
         └───┬────┘
             │
         Task state
             │
┌────────────▼────────────┐
│Current state:           │
│Main                     │         ┌─────────────┐
│                         │         │ TASK RUNNER │
│     ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┼─ ─ ─ ─ ─┤ (Prestart)  │
│     ╷                   │         └─────────────┘
│     ╷                   │
│     ╷                   │         ┌─────────────┐
│     ╷ COORDINATOR       │         │ TASK RUNNER │
│     ╷             ┌─────┼───────┬─┤   (Main)    │
│     ╷             │     │       │ └─────────────┘
│     ╷             │     │       │
│     ╷             │     │       │ ┌─────────────┐
│   Prestart       Main   │       │ │ TASK RUNNER │
└─────┼─┬───────────┬─┬───┘       └─┤   (Main)    │
      ╷ │Open/      │ │Open/        └─────────────┘
      ╷ │Close      │ │Close
   ┌──┴─▼─┐      ┌──┴─▼─┐
   │ GATE │      │ GATE │
   └──────┘      └──────┘

Diagram source: https://asciiflow.com/#/share/eJyrVspLzE1VssorzcnRUcpJrEwtUrJSqo5RqohRsjI0MDTViVGqBDKNLA2ArJLUihIgJ0ZJAQYeTdmDB8XE5CGrVHD08fF3BjPRZYJC%2Ffxcg7DIEGk6VDWyUEhicbZCcUliSSp2hfgNR6BpxCmDmelcWlSUmlcCsdkKm62%2BiZmo7kEOCOK8jtVmrGZiMVchxDHYGzXEYSpIspVUpKAREOQaHOIYFKKpgGkvjcIDp8kk2t7zaEoDcWgCmsnO%2Fv5BLp5%2BjiH%2BQVhNbkKLjyY8LtNFAyDdCgoavo6efppQ0%2FDorkETrQGypxDtrxmkmEyiK8iJ24CiVGAeKyqBGgPNVWjmYk%2FrVE7X8LhBiwtEcQRSBcT%2B%2Bs4KyK5D4pOewlFMRglfuDy6vmkoLoaL1yDLwXUquDuGuCogq4aLYDd9CnbT0V2uVKtUCwCqNQgp)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RequireTaskAllowed

func RequireTaskAllowed(t testing.T, c *Coordinator, task *structs.Task)

func RequireTaskBlocked

func RequireTaskBlocked(t testing.T, c *Coordinator, task *structs.Task)

func WaitNotInitUntil

func WaitNotInitUntil(c *Coordinator, until time.Duration, errorFunc func())

Types

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

Coordinator controls when tasks with a given lifecycle configuration are allowed to start and run.

It behaves like a finite state machine where each state transition blocks or allows some task lifecycle types to run.
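To make the state-machine idea concrete, here is a minimal sketch covering only the three states named in this documentation ("init", prestart, and main). The names sketchState, nextState, and openGates are hypothetical and are not part of this package. The transition rules are assumptions made for illustration, and the real Coordinator may have more states and different rules.

```go
package main

// sketchState is a hypothetical stand-in for the Coordinator's internal
// state. Only the states named in the overview are modeled here.
type sketchState string

const (
	stateInit     sketchState = "init"
	statePrestart sketchState = "prestart"
	stateMain     sketchState = "main"
)

// nextState returns the state that follows current.
// Assumption: the first update moves "init" to prestart, and prestart
// moves to main only once the prestart tasks have completed.
func nextState(current sketchState, prestartDone bool) sketchState {
	switch current {
	case stateInit:
		return statePrestart
	case statePrestart:
		if prestartDone {
			return stateMain
		}
	}
	return current
}

// openGates reports which lifecycle gates are open in state s.
// In "init" every gate is closed, so all tasks are blocked.
func openGates(s sketchState) map[string]bool {
	return map[string]bool{
		"prestart": s == statePrestart,
		"main":     s == stateMain,
	}
}
```

In this sketch each state opens exactly one gate, which mirrors the two diagrams in the overview: the prestart state has a solid line to the prestart Gate only, and the main state has a solid line to the main Gate only.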

func NewCoordinator

func NewCoordinator(logger hclog.Logger, tasks []*structs.Task, shutdownCh <-chan struct{}) *Coordinator

NewCoordinator returns a new Coordinator with all tasks initially blocked.

func (*Coordinator) Restart

func (c *Coordinator) Restart()

Restart sets the Coordinator state back to "init" and is used to coordinate a full alloc restart. Since all tasks will run again, they need to be pending before they are allowed to proceed.

func (*Coordinator) Restore

func (c *Coordinator) Restore(states map[string]*structs.TaskState)

Restore is used to set the Coordinator FSM to the correct state when an alloc is restored. It must be called before the allocRunner is running.

func (*Coordinator) StartConditionForTask

func (c *Coordinator) StartConditionForTask(task *structs.Task) <-chan struct{}

StartConditionForTask returns a channel that is unblocked when the task is allowed to run.

func (*Coordinator) TaskStateUpdated

func (c *Coordinator) TaskStateUpdated(states map[string]*structs.TaskState)

TaskStateUpdated notifies that a task state has changed. This may cause the Coordinator to transition to another state.

type Gate

type Gate struct {
	// contains filtered or unexported fields
}

Gate is used by the Coordinator to block or allow tasks from running.

It provides a channel that taskRunners listen on to determine when they are allowed to run. The Gate has an infinite loop that is either feeding this channel (therefore allowing listeners to proceed) or not doing anything (causing listeners to block and wait).

The Coordinator uses the Gate Open() and Close() methods to control this producer loop.
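The producer loop described above can be sketched as follows. This is a simplified illustration and not the package's actual implementation: the type sketchGate and its fields are hypothetical, and the technique shown (switching the send channel to nil while the gate is closed) is just one way to build such a loop.

```go
package main

// sketchGate is a hypothetical, simplified stand-in for Gate.
type sketchGate struct {
	sendCh     chan struct{}   // listeners receive from this channel
	updateCh   chan bool       // carries open (true) and close (false) requests
	shutdownCh <-chan struct{} // stops the producer loop when closed
}

// newSketchGate returns a gate that is initially closed.
func newSketchGate(shutdownCh <-chan struct{}) *sketchGate {
	g := &sketchGate{
		sendCh:     make(chan struct{}),
		updateCh:   make(chan bool),
		shutdownCh: shutdownCh,
	}
	go g.run()
	return g
}

// WaitCh returns the channel that listeners block on.
func (g *sketchGate) WaitCh() <-chan struct{} { return g.sendCh }

// Open lets listeners proceed. It is a no-op after shutdown.
func (g *sketchGate) Open() { g.update(true) }

// Close blocks listeners. It is a no-op after shutdown.
func (g *sketchGate) Close() { g.update(false) }

func (g *sketchGate) update(open bool) {
	select {
	case g.updateCh <- open:
	case <-g.shutdownCh:
	}
}

// run is the producer loop. While the gate is closed, out is nil, so the
// send case can never be selected and listeners stay blocked.
func (g *sketchGate) run() {
	isOpen := false
	for {
		var out chan struct{}
		if isOpen {
			out = g.sendCh
		}
		select {
		case out <- struct{}{}:
		case isOpen = <-g.updateCh:
		case <-g.shutdownCh:
			return
		}
	}
}
```

Because the channel is unbuffered, a listener only receives a value while the loop is actively sending, so closing the gate takes effect for every receive that starts afterwards.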

func NewGate

func NewGate(shutdownCh <-chan struct{}) *Gate

NewGate returns a new Gate that is initially closed. The Gate should not be used after the shutdownCh is closed.

func (*Gate) Close

func (g *Gate) Close()

Close is used to block listeners from proceeding. If the gate shutdownCh channel is closed, this method is a no-op so callers should check its state.

func (*Gate) Open

func (g *Gate) Open()

Open is used to allow listeners to proceed. If the gate shutdownCh channel is closed, this method is a no-op so callers should check its state.

func (*Gate) WaitCh

func (g *Gate) WaitCh() <-chan struct{}

WaitCh returns a channel that the listener must block on before starting its task.

Callers must also check the state of the shutdownCh used to create the Gate to avoid blocking indefinitely.
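A listener can follow this advice by selecting on both channels at once. The helper below is a hypothetical sketch: waitForStart and errShutdown are illustrative names that do not exist in this package, and startCh stands for whatever WaitCh() or StartConditionForTask() returned.

```go
package main

import "errors"

// errShutdown is a hypothetical error returned when shutdown wins the race.
var errShutdown = errors.New("shut down before the task was allowed to start")

// waitForStart blocks until startCh delivers a value or shutdownCh is
// closed. Watching shutdownCh prevents the caller from blocking forever
// on a gate whose producer loop has already stopped.
func waitForStart(startCh, shutdownCh <-chan struct{}) error {
	select {
	case <-startCh:
		return nil
	case <-shutdownCh:
		return errShutdown
	}
}
```

If both channels are ready at the same time, Go's select picks one of the cases at random, so a caller that must never start after shutdown would need to check shutdownCh again after waitForStart returns nil.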
