Documentation ¶
Index ¶
- Constants
- func AvailableFactories() map[string]bool
- func Register(factoryName string, factory Factory)
- func StartManager(m *Manager) (*sync.WaitGroup, context.Context, context.CancelFunc)
- func Unregister(name string)
- func VerifyAllTasksDone(ctx context.Context, ts *topo.Server, uuid string) error
- type Action
- type ActionListener
- type ActionParameters
- type ActionState
- type ActionStyle
- type CheckpointWriter
- type Factory
- type Manager
- func (m *Manager) Create(ctx context.Context, factoryName string, args []string) (string, error)
- func (m *Manager) Delete(ctx context.Context, uuid string) error
- func (m *Manager) HandleHTTPLongPolling(pattern string)
- func (m *Manager) HandleHTTPWebSocket(pattern string)
- func (m *Manager) NodeManager() *NodeManager
- func (m *Manager) Run(ctx context.Context)
- func (m *Manager) SetRedirectFunc(rf func() (string, error))
- func (m *Manager) Start(ctx context.Context, uuid string) error
- func (m *Manager) Stop(ctx context.Context, uuid string) error
- func (m *Manager) TopoServer() *topo.Server
- func (m *Manager) Wait(ctx context.Context, uuid string) error
- func (m *Manager) WaitUntilRunning()
- func (m *Manager) WorkflowForTesting(uuid string) (Workflow, error)
- func (m *Manager) WorkflowInfoForTesting(uuid string) (*topo.WorkflowInfo, error)
- type Node
- type NodeDisplay
- type NodeManager
- func (m *NodeManager) Action(ctx context.Context, ap *ActionParameters) error
- func (m *NodeManager) AddRootNode(n *Node) error
- func (m *NodeManager) CloseWatcher(i int)
- func (m *NodeManager) GetAndWatchFullTree(notifications chan []byte) ([]byte, int, error)
- func (m *NodeManager) GetFullTree() ([]byte, error)
- func (m *NodeManager) RemoveRootNode(n *Node)
- type ParallelRunner
- type PhaseType
- type SleepWorkflow
- type SleepWorkflowData
- type SleepWorkflowFactory
- type Update
- type Workflow
Constants ¶
const (
	// Sequential means that the tasks will run sequentially.
	Sequential level = iota
	// Parallel means that the tasks will run in parallel.
	Parallel
)
Variables ¶
This section is empty.
Functions ¶
func AvailableFactories ¶
AvailableFactories returns a map with the names of the available factories as keys and 'true' as value.
func Register ¶
Register lets implementations register Factory objects. Typically called at init() time.
func StartManager ¶
StartManager starts a manager. This function should only be used for testing purposes.
func Unregister ¶
func Unregister(name string)
Unregister removes a factory object. Typically called from a flag to remove dangerous workflows.
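The registry functions above can be pictured with a small self-contained sketch. It does not import this package: `Factory` is an empty stand-in, and `registry`, `newRegistry` and the lower-case method names are hypothetical. Rejecting duplicate names is an assumption of the sketch, since the documentation does not say how Register treats them.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Factory is a local stand-in for the package's Factory interface.
// The documented interface has Init and Instantiate methods; they are
// omitted here so the sketch compiles on its own.
type Factory interface{}

// registry is a hypothetical container used only for illustration.
type registry struct {
	mu        sync.Mutex
	factories map[string]Factory
}

func newRegistry() *registry {
	return &registry{factories: make(map[string]Factory)}
}

// register stores a factory under a name. Rejecting duplicates is an
// assumption of this sketch, not documented behavior.
func (r *registry) register(name string, f Factory) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.factories[name]; ok {
		return fmt.Errorf("factory %q already registered", name)
	}
	r.factories[name] = f
	return nil
}

// unregister removes a factory; unknown names are ignored.
func (r *registry) unregister(name string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.factories, name)
}

// availableFactories returns the factory names as keys and true as value.
func (r *registry) availableFactories() map[string]bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := make(map[string]bool, len(r.factories))
	for name := range r.factories {
		out[name] = true
	}
	return out
}

func main() {
	r := newRegistry()
	_ = r.register("sleep", struct{}{})     // illustrative factory name
	_ = r.register("dangerous", struct{}{}) // illustrative factory name
	r.unregister("dangerous")

	names := make([]string, 0)
	for n := range r.availableFactories() {
		names = append(names, n)
	}
	sort.Strings(names)
	fmt.Println(names) // prints [sleep]
}
```

Returning a fresh map from `availableFactories` keeps callers from mutating the registry's internal state.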
Types ¶
type Action ¶
type Action struct {
	Name    string      `json:"name"`
	State   ActionState `json:"state,omitempty"`
	Style   ActionStyle `json:"style,omitempty"`
	Message string      `json:"message"`
}
Action must match node.ts Action.
type ActionListener ¶
type ActionListener interface {
	// Action is called when the user requests an action on a node.
	// 'path' is the node's Path value and 'name' is the invoked action's name.
	Action(ctx context.Context, path, name string) error
}
ActionListener is an interface for receiving notifications about actions triggered from workflow UI.
type ActionParameters ¶
type ActionParameters struct {
	// Path is the path of the Node the action was performed on.
	Path string `json:"path"`
	// Name is the Name of the Action.
	Name string `json:"name"`
}
ActionParameters describe an action initiated by the user.
type ActionState ¶
type ActionState int
ActionState constants need to match node.ts.ActionState.
const (
	// ActionStateUnknown is an unknown value and should never be set.
	ActionStateUnknown ActionState = 0
	// ActionStateEnabled is for when the action is enabled.
	ActionStateEnabled ActionState = 1
	// ActionStateDisabled is for when the action is disabled.
	ActionStateDisabled ActionState = 2
)
type ActionStyle ¶
type ActionStyle int
ActionStyle constants need to match node.ts.ActionStyle.
const (
	// ActionStyleUnknown is an unknown value and should never be set.
	ActionStyleUnknown ActionStyle = 0
	// ActionStyleNormal will just trigger the action.
	ActionStyleNormal ActionStyle = 1
	// ActionStyleWarning will display a warning dialog to confirm
	// action with Action.Message.
	ActionStyleWarning ActionStyle = 2
	// ActionStyleWaiting highlights to the user that the process
	// is waiting on the execution of the action.
	ActionStyleWaiting ActionStyle = 3
	// ActionStyleTriggered is a state where the button is greyed
	// out and cannot be pressed.
	ActionStyleTriggered ActionStyle = 4
)
type CheckpointWriter ¶
type CheckpointWriter struct {
// contains filtered or unexported fields
}
CheckpointWriter saves the checkpoint data into topology server.
func NewCheckpointWriter ¶
func NewCheckpointWriter(ts *topo.Server, checkpoint *workflowpb.WorkflowCheckpoint, wi *topo.WorkflowInfo) *CheckpointWriter
NewCheckpointWriter creates a CheckpointWriter.
func (*CheckpointWriter) UpdateTask ¶
func (c *CheckpointWriter) UpdateTask(taskID string, status workflowpb.TaskState, err error) error
UpdateTask updates the task status in the checkpointing copy and saves the full checkpoint to the topology server.
type Factory ¶
type Factory interface {
	// Init initializes the private parts of the workflow object.
	// The passed in workflow will have its Uuid, FactoryName and State
	// variables filled in. This Init method should fill in the
	// Name and Data attributes, based on the provided args.
	// This is called during the Manager.Create phase and will initially
	// checkpoint the workflow in the topology.
	// The Manager object is passed to Init method since the resharding workflow
	// will use the topology server in Manager.
	Init(m *Manager, w *workflowpb.Workflow, args []string) error
	// Instantiate loads a workflow from the proto representation
	// into an in-memory Workflow object. rootNode is the root UI node
	// representing the workflow.
	Instantiate(m *Manager, w *workflowpb.Workflow, rootNode *Node) (Workflow, error)
}
Factory can create the initial version of a Workflow, or instantiate them from a serialized version.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager is the main Workflow manager object. Its management API allows it to create, start and stop workflows.
func NewManager ¶
NewManager creates an initialized Manager.
func (*Manager) Create ¶
Create creates a workflow from the given factory name with the provided args. Returns the unique UUID of the workflow. The workflowpb.Workflow object is saved in the topo server after creation.
func (*Manager) HandleHTTPLongPolling ¶
HandleHTTPLongPolling registers the streaming-over-HTTP APIs.
func (*Manager) HandleHTTPWebSocket ¶
HandleHTTPWebSocket registers the WebSocket handler.
func (*Manager) NodeManager ¶
func (m *Manager) NodeManager() *NodeManager
NodeManager returns the NodeManager used by the Manager. It is meant to be used by the running workflows.
func (*Manager) Run ¶
Run is the main entry point for the Manager. It will read each checkpoint from the topo Server, and for the ones that are in the Running state, will load them in memory and run them. It will not return until ctx is canceled.
func (*Manager) SetRedirectFunc ¶
SetRedirectFunc sets the redirect function to use.
func (*Manager) Start ¶
Start will start a Workflow. It will load it in memory, update its status to Running, and call its Run() method.
func (*Manager) Stop ¶
Stop stops the running workflow. It will cancel its context and wait for it to exit.
func (*Manager) TopoServer ¶
TopoServer returns the topo.Server used by the Manager. It is meant to be used by the running workflows.
func (*Manager) WaitUntilRunning ¶
func (m *Manager) WaitUntilRunning()
WaitUntilRunning blocks until Run() has progressed to a state where the manager can start workflows. It is mainly used by tests.
func (*Manager) WorkflowForTesting ¶
WorkflowForTesting returns the Workflow object of the running workflow identified by uuid. The method is used in unit tests to inject mocks.
func (*Manager) WorkflowInfoForTesting ¶
func (m *Manager) WorkflowInfoForTesting(uuid string) (*topo.WorkflowInfo, error)
WorkflowInfoForTesting returns the WorkflowInfo object of the running workflow identified by uuid. The method is used in unit tests to manipulate the checkpoint.
type Node ¶
type Node struct {
	// Listener will be notified about actions invoked on this node.
	Listener ActionListener `json:"-"`

	Name            string                   `json:"name"`
	PathName        string                   `json:"pathName"`
	Path            string                   `json:"path"`
	Children        []*Node                  `json:"children,omitempty"`
	LastChanged     int64                    `json:"lastChanged"`
	CreateTime      int64                    `json:"createTime"`
	Progress        int                      `json:"progress"`
	ProgressMessage string                   `json:"progressMsg"`
	State           workflowpb.WorkflowState `json:"state"`
	Display         NodeDisplay              `json:"display,omitempty"`
	Message         string                   `json:"message"`
	Log             string                   `json:"log"`
	Disabled        bool                     `json:"disabled"`
	Actions         []*Action                `json:"actions"`
	// contains filtered or unexported fields
}
Node is the UI representation of a Workflow toplevel object, or of a Workflow task. It is just meant to be a tree, and the various Workflow implementations can expose a tree of Nodes that represent what they are doing.
Actions are meant to be buttons, that will trigger the Action callback of a workflow.
In order for the web UIs to be notified when changing this structure, any change to this Node has to be done inside a Modify() function.
It should match the Node object described in web/vtctld2/src/app/workflows/node.ts as it is exposed as JSON to the Angular 2 web app.
func (*Node) BroadcastChanges ¶
BroadcastChanges sends the new contents of the node to the watchers.
type NodeDisplay ¶
type NodeDisplay int
NodeDisplay constants need to match node.ts.Display.
const (
	// NodeDisplayUnknown is an unknown value and should never be set.
	NodeDisplayUnknown NodeDisplay = 0
	// NodeDisplayIndeterminate is a progress bar that doesn't have
	// a current value, but just shows movement.
	NodeDisplayIndeterminate NodeDisplay = 1
	// NodeDisplayDeterminate is a progress bar driven by the
	// Progress field.
	NodeDisplayDeterminate NodeDisplay = 2
	// NodeDisplayNone shows no progress bar or status.
	NodeDisplayNone NodeDisplay = 3
)
type NodeManager ¶
type NodeManager struct {
// contains filtered or unexported fields
}
NodeManager manages the Node tree.
func (*NodeManager) Action ¶
func (m *NodeManager) Action(ctx context.Context, ap *ActionParameters) error
Action is called by the UI agents to trigger actions.
func (*NodeManager) AddRootNode ¶
func (m *NodeManager) AddRootNode(n *Node) error
AddRootNode adds a toplevel Node to the NodeManager, and broadcasts the Node to the listeners.
func (*NodeManager) CloseWatcher ¶
func (m *NodeManager) CloseWatcher(i int)
CloseWatcher unregisters the watcher from this Manager.
func (*NodeManager) GetAndWatchFullTree ¶
func (m *NodeManager) GetAndWatchFullTree(notifications chan []byte) ([]byte, int, error)
GetAndWatchFullTree returns the JSON representation of the entire Node tree, and registers a watcher to monitor changes to the tree.
func (*NodeManager) GetFullTree ¶
func (m *NodeManager) GetFullTree() ([]byte, error)
GetFullTree returns the JSON representation of the entire Node tree.
func (*NodeManager) RemoveRootNode ¶
func (m *NodeManager) RemoveRootNode(n *Node)
RemoveRootNode removes a toplevel Node from the NodeManager, and broadcasts the change to the listeners.
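The watcher methods above (GetAndWatchFullTree, CloseWatcher, AddRootNode) follow a register-then-broadcast pattern, which the following sketch models. It is a simplified stand-in, not the package's implementation: `node`, `nodeManager` and the lower-case method names are hypothetical, the broadcast payload is the bare node JSON rather than whatever the real manager sends, and dropping notifications for a full channel is an assumption of the sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// node is a trimmed stand-in for Node with only two of its JSON fields.
type node struct {
	Name string `json:"name"`
	Path string `json:"path"`
}

// nodeManager is a hypothetical, simplified model of NodeManager.
type nodeManager struct {
	mu        sync.Mutex
	roots     []*node
	watchers  map[int]chan []byte
	nextIndex int
}

func newNodeManager() *nodeManager {
	return &nodeManager{roots: []*node{}, watchers: make(map[int]chan []byte)}
}

// getFullTree returns the JSON representation of all root nodes.
func (m *nodeManager) getFullTree() ([]byte, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return json.Marshal(m.roots)
}

// getAndWatchFullTree returns the current tree and registers the channel
// as a watcher. The returned index identifies the watcher for closeWatcher.
func (m *nodeManager) getAndWatchFullTree(notifications chan []byte) ([]byte, int, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	tree, err := json.Marshal(m.roots)
	if err != nil {
		return nil, 0, err
	}
	i := m.nextIndex
	m.nextIndex++
	m.watchers[i] = notifications
	return tree, i, nil
}

// closeWatcher unregisters the watcher with the given index.
func (m *nodeManager) closeWatcher(i int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.watchers, i)
}

// addRootNode appends a root node and notifies every watcher.
func (m *nodeManager) addRootNode(n *node) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	data, err := json.Marshal(n)
	if err != nil {
		return err
	}
	m.roots = append(m.roots, n)
	for _, ch := range m.watchers {
		select {
		case ch <- data:
		default: // assumption: skip watchers whose channel is full
		}
	}
	return nil
}

func main() {
	m := newNodeManager()
	notifications := make(chan []byte, 1)
	tree, idx, _ := m.getAndWatchFullTree(notifications)
	fmt.Println(string(tree), idx) // prints [] 0

	_ = m.addRootNode(&node{Name: "sleep", Path: "/uuid1"})
	fmt.Println(string(<-notifications)) // prints {"name":"sleep","path":"/uuid1"}
	m.closeWatcher(idx)
}
```

Taking the snapshot and registering the watcher under the same lock means no change can slip in between the initial tree and the first notification.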
type ParallelRunner ¶
type ParallelRunner struct {
// contains filtered or unexported fields
}
ParallelRunner is used to control executing tasks concurrently. Each phase has its own ParallelRunner object.
func NewParallelRunner ¶
func NewParallelRunner(ctx context.Context, rootUINode *Node, cp *CheckpointWriter, tasks []*workflowpb.Task, executeFunc func(context.Context, *workflowpb.Task) error, concurrencyLevel level, enableApprovals bool) *ParallelRunner
NewParallelRunner returns a new ParallelRunner.
func (*ParallelRunner) Action ¶
func (p *ParallelRunner) Action(ctx context.Context, path, name string) error
Action handles the retry action, the approval of the first task, and the approval of the remaining tasks. It implements the ActionListener interface.
func (*ParallelRunner) Run ¶
func (p *ParallelRunner) Run() error
Run is the entry point for controlling task executions.
type SleepWorkflow ¶
type SleepWorkflow struct {
// contains filtered or unexported fields
}
SleepWorkflow implements the Workflow interface. It is a test Workflow that only sleeps for a provided interval. It is meant to test all the plumbing and corner cases of the workflow library.
func (*SleepWorkflow) Action ¶
func (sw *SleepWorkflow) Action(ctx context.Context, path, name string) error
Action is part of the workflow.ActionListener interface.
func (*SleepWorkflow) Run ¶
func (sw *SleepWorkflow) Run(ctx context.Context, manager *Manager, wi *topo.WorkflowInfo) error
Run is part of the workflow.Workflow interface. It updates the UI every second, and checkpoints every 5 seconds.
type SleepWorkflowData ¶
type SleepWorkflowData struct {
	// Duration is how long we need to sleep total.
	Duration int
	// Slept is how long we've already slept.
	Slept int
	// Paused is true if we should not be making any progress.
	Paused bool
}
SleepWorkflowData is the data structure serialized as JSON in Workflow.Data.
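Since the struct carries no JSON tags, the serialized keys are the Go field names. The sketch below copies the struct locally and round-trips it through encoding/json; `encode` and `decode` are hypothetical helpers, and how the real workflow stores the result in Workflow.Data is not shown.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SleepWorkflowData is copied from the type definition above.
type SleepWorkflowData struct {
	Duration int
	Slept    int
	Paused   bool
}

// encode serializes the data as a JSON string.
func encode(d SleepWorkflowData) (string, error) {
	b, err := json.Marshal(d)
	return string(b), err
}

// decode parses a JSON string back into the struct.
func decode(s string) (SleepWorkflowData, error) {
	var d SleepWorkflowData
	err := json.Unmarshal([]byte(s), &d)
	return d, err
}

func main() {
	s, _ := encode(SleepWorkflowData{Duration: 60, Slept: 15})
	fmt.Println(s) // prints {"Duration":60,"Slept":15,"Paused":false}

	d, _ := decode(s)
	fmt.Println(d.Duration - d.Slept) // prints 45
}
```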
type SleepWorkflowFactory ¶
type SleepWorkflowFactory struct{}
SleepWorkflowFactory is the factory to register the Sleep workflows.
func (*SleepWorkflowFactory) Init ¶
func (f *SleepWorkflowFactory) Init(_ *Manager, w *workflowpb.Workflow, args []string) error
Init is part of the workflow.Factory interface.
func (*SleepWorkflowFactory) Instantiate ¶
func (f *SleepWorkflowFactory) Instantiate(_ *Manager, w *workflowpb.Workflow, rootNode *Node) (Workflow, error)
Instantiate is part of the workflow.Factory interface.
type Update ¶
type Update struct {
	// Redirect is set to the URL to go to if we are not the
	// master. It is only set in the initial response, and if set
	// then no other field in this structure is set.
	Redirect string `json:"redirect,omitempty"`
	// Index is the watcher index. It is only set in the initial
	// tree.
	Index int `json:"index,omitempty"`
	// Nodes is a list of nodes to update.
	Nodes []*Node `json:"nodes,omitempty"`
	// Deletes is a list of toplevel paths to delete.
	Deletes []string `json:"deletes,omitempty"`
	// FullUpdate is set to true if this is a full refresh of the data.
	FullUpdate bool `json:"fullUpdate,omitempty"`
}
Update is the data structure we send on the websocket or on the long-polling HTTP connection to the clients.
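Because every field is tagged `omitempty`, each kind of message carries only the fields that apply to it. The sketch below shows the resulting JSON for three cases. `Update` is copied from the definition above, `Node` is trimmed to two fields so the example compiles on its own, and `encodeUpdate`, the URL and the paths are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Node is a trimmed stand-in for the real Node type.
type Node struct {
	Name string `json:"name"`
	Path string `json:"path"`
}

// Update is copied from the type definition above.
type Update struct {
	Redirect   string   `json:"redirect,omitempty"`
	Index      int      `json:"index,omitempty"`
	Nodes      []*Node  `json:"nodes,omitempty"`
	Deletes    []string `json:"deletes,omitempty"`
	FullUpdate bool     `json:"fullUpdate,omitempty"`
}

// encodeUpdate is a hypothetical helper returning the JSON text.
func encodeUpdate(u Update) string {
	b, err := json.Marshal(u)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	// Redirect-only response; the URL is made up for illustration.
	fmt.Println(encodeUpdate(Update{Redirect: "http://vtctld.example.com/"}))
	// prints {"redirect":"http://vtctld.example.com/"}

	// Initial full tree for a watcher with index 3.
	fmt.Println(encodeUpdate(Update{
		Index:      3,
		Nodes:      []*Node{{Name: "sleep", Path: "/uuid1"}},
		FullUpdate: true,
	}))
	// prints {"index":3,"nodes":[{"name":"sleep","path":"/uuid1"}],"fullUpdate":true}

	// Incremental update that deletes one toplevel path.
	fmt.Println(encodeUpdate(Update{Deletes: []string{"/uuid1"}}))
	// prints {"deletes":["/uuid1"]}
}
```

One consequence of `omitempty` worth keeping in mind is that an Index of 0 is omitted from the JSON entirely, so a client decoding the message sees no `index` key in that case.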
type Workflow ¶
type Workflow interface {
	// Run runs the Workflow within the provided WorkflowManager.
	// It should return ctx.Err() if ctx.Done() is closed. The
	// Run method can alter the provided WorkflowInfo to save its
	// state (and it can checkpoint that new value by saving it
	// into the manager's topo Server).
	Run(ctx context.Context, manager *Manager, wi *topo.WorkflowInfo) error
}
Workflow is a running instance of a job.
Directories ¶
Path | Synopsis |
---|---|
| Package topovalidator contains a workflow that validates the topology data. |