workflow

package
v0.15.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 14, 2022 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Sequential means that the tasks will run sequentially.
	Sequential level = iota
	//Parallel means that the tasks will run in parallel.
	Parallel
)
View Source
const HTTPHeaderRedactedMessage string = "*** redacted by sanitizeRequestHeader() ***"

Variables

This section is empty.

Functions

func AvailableFactories

func AvailableFactories() map[string]bool

AvailableFactories returns a map with the names of the available factories as keys and 'true' as value.

func Register

func Register(factoryName string, factory Factory)

Register lets implementations register Factory objects. Typically called at init() time.

func StartManager

func StartManager(m *Manager) (*sync.WaitGroup, context.Context, context.CancelFunc)

StartManager starts a manager. This function should only be used for tests purposes.

func Unregister

func Unregister(name string)

Unregister removes a factory object. Typically called from a flag to remove dangerous workflows.

func VerifyAllTasksDone

func VerifyAllTasksDone(ctx context.Context, ts *topo.Server, uuid string) error

VerifyAllTasksDone checks that all tasks are done in a workflow. This should only be used for test purposes.

Types

type Action

type Action struct {
	Name    string      `json:"name"`
	State   ActionState `json:"state,omitempty"`
	Style   ActionStyle `json:"style,omitempty"`
	Message string      `json:"message"`
}

Action must match node.ts Action.

type ActionListener

type ActionListener interface {
	// Action is called when the user requests an action on a node.
	// 'path' is the node's Path value and 'name' is the invoked action's name.
	Action(ctx context.Context, path, name string) error
}

ActionListener is an interface for receiving notifications about actions triggered from workflow UI.

type ActionParameters

type ActionParameters struct {
	// Path is the path of the Node the action was performed on.
	Path string `json:"path"`

	// Name is the Name of the Action.
	Name string `json:"name"`
}

ActionParameters describe an action initiated by the user.

type ActionState

type ActionState int

ActionState constants need to match node.ts.ActionState.

const (
	// ActionStateUnknown is an unknown value and should never be set.
	ActionStateUnknown ActionState = 0

	// ActionStateEnabled is for when the action is enabled.
	ActionStateEnabled ActionState = 1

	// ActionStateDisabled is for when the action is disabled.
	ActionStateDisabled ActionState = 2
)

type ActionStyle

type ActionStyle int

ActionStyle constants need to match node.ts.ActionStyle.

const (
	// ActionStyleUnknown is an unknown value and should never be set.
	ActionStyleUnknown ActionStyle = 0

	// ActionStyleNormal will just trigger the action.
	ActionStyleNormal ActionStyle = 1

	// ActionStyleWarning will display a warning dialog to confirm
	// action with Action.Message.
	ActionStyleWarning ActionStyle = 2

	// ActionStyleWaiting highlights to the user that the process
	// is waiting on the execution of the action.
	ActionStyleWaiting ActionStyle = 3

	// ActionStyleTriggered is a state where the button is greyed
	// out and cannot be pressed.
	ActionStyleTriggered ActionStyle = 4
)

type CheckpointWriter

type CheckpointWriter struct {
	// contains filtered or unexported fields
}

CheckpointWriter saves the checkpoint data into topology server.

func NewCheckpointWriter

func NewCheckpointWriter(ts *topo.Server, checkpoint *workflowpb.WorkflowCheckpoint, wi *topo.WorkflowInfo) *CheckpointWriter

NewCheckpointWriter creates a CheckpointWriter.

func (*CheckpointWriter) UpdateTask

func (c *CheckpointWriter) UpdateTask(taskID string, status workflowpb.TaskState, err error) error

UpdateTask updates the task status in the checkpointing copy and saves the full checkpoint to the topology server.

type Factory

type Factory interface {
	// Init initializes the private parts of the workflow object.
	// The passed in workflow will have its Uuid, FactoryName and State
	// variable filled it. This Init method should fill in the
	// Name and Data attributes, based on the provided args.
	// This is called during the Manager.Create phase and will initially
	// checkpoint the workflow in the topology.
	// The Manager object is passed to Init method since the resharding workflow
	// will use the topology server in Manager.
	Init(m *Manager, w *workflowpb.Workflow, args []string) error

	// Instantiate loads a workflow from the proto representation
	// into an in-memory Workflow object. rootNode is the root UI node
	// representing the workflow.
	Instantiate(m *Manager, w *workflowpb.Workflow, rootNode *Node) (Workflow, error)
}

Factory can create the initial version of a Workflow, or instantiate them from a serialized version.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager is the main Workflow manager object. Its management API allows it to create, start and stop workflows.

func NewManager

func NewManager(ts *topo.Server) *Manager

NewManager creates an initialized Manager.

func (*Manager) Create

func (m *Manager) Create(ctx context.Context, factoryName string, args []string) (string, error)

Create creates a workflow from the given factory name with the provided args. Returns the unique UUID of the workflow. The workflowpb.Workflow object is saved in the topo server after creation.

func (*Manager) Delete

func (m *Manager) Delete(ctx context.Context, uuid string) error

Delete deletes the finished or not started workflow.

func (*Manager) HandleHTTPLongPolling

func (m *Manager) HandleHTTPLongPolling(pattern string)

HandleHTTPLongPolling registers the streaming-over-HTTP APIs.

func (*Manager) HandleHTTPWebSocket

func (m *Manager) HandleHTTPWebSocket(pattern string)

HandleHTTPWebSocket registers the WebSocket handler.

func (*Manager) NodeManager

func (m *Manager) NodeManager() *NodeManager

NodeManager returns the NodeManager used by the Manager. It is meant to be used by the running workflows.

func (*Manager) Run

func (m *Manager) Run(ctx context.Context)

Run is the main entry point for the Manager. It will read each checkpoint from the topo Server, and for the ones that are in the Running state, will load them in memory and run them. It will not return until ctx is canceled.

func (*Manager) SetRedirectFunc

func (m *Manager) SetRedirectFunc(rf func() (string, error))

SetRedirectFunc sets the redirect function to use.

func (*Manager) SetSanitizeHTTPHeaders added in v0.14.0

func (m *Manager) SetSanitizeHTTPHeaders(to bool)

SetSanitizeHTTPHeaders - toggles m.sanitizeHTTPHeaders on/off

func (*Manager) Start

func (m *Manager) Start(ctx context.Context, uuid string) error

Start will start a Workflow. It will load it in memory, update its status to Running, and call its Run() method.

func (*Manager) Stop

func (m *Manager) Stop(ctx context.Context, uuid string) error

Stop stops the running workflow. It will cancel its context and wait for it to exit.

func (*Manager) TopoServer

func (m *Manager) TopoServer() *topo.Server

TopoServer returns the topo.Server used by the Manager. It is meant to be used by the running workflows.

func (*Manager) Wait

func (m *Manager) Wait(ctx context.Context, uuid string) error

Wait waits for the provided workflow to end.

func (*Manager) WaitUntilRunning

func (m *Manager) WaitUntilRunning()

WaitUntilRunning blocks until Run() has progressed to a state where the manager can start workflows. It is mainly used by tests.

func (*Manager) WorkflowForTesting

func (m *Manager) WorkflowForTesting(uuid string) (Workflow, error)

WorkflowForTesting returns the Workflow object of the running workflow identified by uuid. The method is used in unit tests to inject mocks.

func (*Manager) WorkflowInfoForTesting

func (m *Manager) WorkflowInfoForTesting(uuid string) (*topo.WorkflowInfo, error)

WorkflowInfoForTesting returns the WorkflowInfo object of the running workflow identified by uuid. The method is used in unit tests to manipulate checkpoint.

type Node

type Node struct {

	// Listener will be notified about actions invoked on this node.
	Listener ActionListener `json:"-"`

	Name            string                   `json:"name"`
	PathName        string                   `json:"pathName"`
	Path            string                   `json:"path"`
	Children        []*Node                  `json:"children,omitempty"`
	LastChanged     int64                    `json:"lastChanged"`
	CreateTime      int64                    `json:"createTime"`
	Progress        int                      `json:"progress"`
	ProgressMessage string                   `json:"progressMsg"`
	State           workflowpb.WorkflowState `json:"state"`
	Display         NodeDisplay              `json:"display,omitempty"`
	Message         string                   `json:"message"`
	Log             string                   `json:"log"`
	Disabled        bool                     `json:"disabled"`
	Actions         []*Action                `json:"actions"`
	// contains filtered or unexported fields
}

Node is the UI representation of a Workflow toplevel object, or of a Workflow task. It is just meant to be a tree, and the various Workflow implementations can expose a tree of Nodes that represent what they are doing.

Actions are meant to be buttons, that will trigger the Action callback of a workflow.

In order for the web UIs to be notified when changing this structure, any change to this Node has to be done inside a Modify() function.

It should match the Node object described in web/vtctld2/src/app/workflows/node.ts as it is exposed as JSON to the Angular 2 web app.

func NewNode

func NewNode() *Node

NewNode is a helper function to create new UI Node struct.

func (*Node) BroadcastChanges

func (n *Node) BroadcastChanges(updateChildren bool) error

BroadcastChanges sends the new contents of the node to the watchers.

func (*Node) GetChildByPath

func (n *Node) GetChildByPath(subPath string) (*Node, error)

GetChildByPath returns the child node given the relative path to this node. The caller must ensure that the node tree is not modified during the call.

type NodeDisplay

type NodeDisplay int

NodeDisplay constants need to match node.ts.Display.

const (
	// NodeDisplayUnknown is an unknown value and should never be set.
	NodeDisplayUnknown NodeDisplay = 0

	// NodeDisplayIndeterminate is a progress bar that doesn't have
	// a current value, but just shows movement.
	NodeDisplayIndeterminate NodeDisplay = 1

	// NodeDisplayDeterminate is a progress bar driven by the
	// Progress field.
	NodeDisplayDeterminate NodeDisplay = 2

	// NodeDisplayNone shows no progress bar or status.
	NodeDisplayNone NodeDisplay = 3
)

type NodeManager

type NodeManager struct {
	// contains filtered or unexported fields
}

NodeManager manages the Node tree.

func NewNodeManager

func NewNodeManager() *NodeManager

NewNodeManager returns a new NodeManager.

func (*NodeManager) Action

func (m *NodeManager) Action(ctx context.Context, ap *ActionParameters) error

Action is called by the UI agents to trigger actions.

func (*NodeManager) AddRootNode

func (m *NodeManager) AddRootNode(n *Node) error

AddRootNode adds a toplevel Node to the NodeManager, and broadcasts the Node to the listeners.

func (*NodeManager) CloseWatcher

func (m *NodeManager) CloseWatcher(i int)

CloseWatcher unregisters the watcher from this Manager.

func (*NodeManager) GetAndWatchFullTree

func (m *NodeManager) GetAndWatchFullTree(notifications chan []byte) ([]byte, int, error)

GetAndWatchFullTree returns the JSON representation of the entire Node tree, and registers a watcher to monitor changes to the tree.

func (*NodeManager) GetFullTree

func (m *NodeManager) GetFullTree() ([]byte, error)

GetFullTree returns the JSON representation of the entire Node tree.

func (*NodeManager) RemoveRootNode

func (m *NodeManager) RemoveRootNode(n *Node)

RemoveRootNode removes a toplevel Node from the NodeManager, and broadcasts the change to the listeners.

type ParallelRunner

type ParallelRunner struct {
	// contains filtered or unexported fields
}

ParallelRunner is used to control executing tasks concurrently. Each phase has its own ParallelRunner object.

func NewParallelRunner

func NewParallelRunner(ctx context.Context, rootUINode *Node, cp *CheckpointWriter, tasks []*workflowpb.Task, executeFunc func(context.Context, *workflowpb.Task) error, concurrencyLevel level, enableApprovals bool) *ParallelRunner

NewParallelRunner returns a new ParallelRunner.

func (*ParallelRunner) Action

func (p *ParallelRunner) Action(ctx context.Context, path, name string) error

Action handles retrying, approval of the first task and approval of the remaining tasks actions. It implements the interface ActionListener.

func (*ParallelRunner) Run

func (p *ParallelRunner) Run() error

Run is the entry point for controlling task executions.

type PhaseType

type PhaseType string

PhaseType is used to store the phase name in a workflow.

type SleepWorkflow

type SleepWorkflow struct {
	// contains filtered or unexported fields
}

SleepWorkflow implements the Workflow interface. It is a test Workflow that only sleeps for a provided interval. It is meant to test all the plumbing and corner cases of the workflow library.

func (*SleepWorkflow) Action

func (sw *SleepWorkflow) Action(ctx context.Context, path, name string) error

Action is part of the workflow.ActionListener interface.

func (*SleepWorkflow) Run

func (sw *SleepWorkflow) Run(ctx context.Context, manager *Manager, wi *topo.WorkflowInfo) error

Run is part of the workflow.Workflow interface. It updates the UI every second, and checkpoints every 5 seconds.

type SleepWorkflowData

type SleepWorkflowData struct {
	// Duration is how long we need to sleep total.
	Duration int

	// Slept is how long we've already slept.
	Slept int

	// Paused is true if we should not be making any progress.
	Paused bool
}

SleepWorkflowData is the data structure serialized as JSON in Workflow.Data.

type SleepWorkflowFactory

type SleepWorkflowFactory struct{}

SleepWorkflowFactory is the factory to register the Sleep workflows.

func (*SleepWorkflowFactory) Init

func (f *SleepWorkflowFactory) Init(_ *Manager, w *workflowpb.Workflow, args []string) error

Init is part of the workflow.Factory interface.

func (*SleepWorkflowFactory) Instantiate

func (f *SleepWorkflowFactory) Instantiate(_ *Manager, w *workflowpb.Workflow, rootNode *Node) (Workflow, error)

Instantiate is part of the workflow.Factory interface.

type Update

type Update struct {
	// Redirect is set to the URL to go to if we are not the
	// primary.  It is only set in the initial response, and if set
	// then no other field in this structure is set.
	Redirect string `json:"redirect,omitempty"`

	// Index is the watcher index. It is only set in the initial
	// tree.
	Index int `json:"index,omitempty"`

	// Nodes is a list of nodes to update.
	Nodes []*Node `json:"nodes,omitempty"`

	// Deletes is a list of toplevel paths to delete.
	Deletes []string `json:"deletes,omitempty"`

	// FullUpdate is set to true if this is a full refresh of the data.
	FullUpdate bool `json:"fullUpdate,omitempty"`
}

Update is the data structure we send on the websocket or on the long-polling HTTP connection to the clients.

type Workflow

type Workflow interface {
	// Run runs the Workflow within the provided WorkflowManager.
	// It should return ctx.Err() if ctx.Done() is closed.  The
	// Run method can alter the provided WorkflowInfo to save its
	// state (and it can checkpoint that new value by saving it
	// into the manager's topo Server).
	Run(ctx context.Context, manager *Manager, wi *topo.WorkflowInfo) error
}

Workflow is a running instance of a job.

Directories

Path Synopsis
Package topovalidator contains a workflow that validates the topology data.
Package topovalidator contains a workflow that validates the topology data.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL