Documentation ¶
Index ¶
- Constants
- func LoadUnCompletedWfRuntime(wfRuntime *WfRuntime, wf *ResourceWorkflow) error
- type CreateHookFunc
- type CreateMementoStorageFunc
- type CreateMetaLoaderFunc
- type FlowMeta
- type FlowStatusType
- type Recover
- type ResourceWorkflow
- func (m *ResourceWorkflow) CommonWorkFlowMainEnter(ctx context.Context, resource statemachine.StateResource, flowName string, ...) error
- func (m *ResourceWorkflow) GetLastUnCompletedRuntime() (*WfRuntime, error)
- func (m *ResourceWorkflow) RetryInterruptedStep() error
- func (m *ResourceWorkflow) Run(ctx context.Context, flowName string, initContext map[string]interface{}) (err error)
- func (m *ResourceWorkflow) RunLastUnCompletedRuntime(ctx context.Context, flowName string, ignoreOtherUnCompleted bool) (oldWfName string, isWaiting bool, err error)
- func (m *ResourceWorkflow) RunUnCompletedRuntime(ctx context.Context, wfRuntime *WfRuntime) error
- type StepAction
- type StepGroupMeta
- type StepMeta
- type StepRuntime
- type StepStatusType
- type WfHook
- type WfManager
- func (m *WfManager) CheckRecovery(resource statemachine.StateResource) bool
- func (m *WfManager) CreateResourceWorkflow(resource statemachine.StateResource) (*ResourceWorkflow, error)
- func (m *WfManager) GetConfItem(confKey define.WFConfKey) string
- func (m *WfManager) RegisterConf(conf map[define.WFConfKey]string)
- func (m *WfManager) RegisterLogger(logger logr.Logger) *WfManager
- func (m *WfManager) RegisterRecover(recover Recover) *WfManager
- func (m *WfManager) RegisterStep(step StepAction)
- func (m *WfManager) RegisterSteps(steps ...StepAction)
- type WfMetaLoader
- type WfRuntime
- type WfRuntimeMemento
- type WfRuntimeMementoCareTaker
- func (cm *WfRuntimeMementoCareTaker) CreateMemento(flowName string) (*WfRuntimeMemento, error)
- func (cm *WfRuntimeMementoCareTaker) GetAllUnCompletedWorkflowRuntime() (map[string]*WfRuntime, error)
- func (cm *WfRuntimeMementoCareTaker) GetLastWorkflowRuntime() (string, *WfRuntime, error)
- func (cm *WfRuntimeMementoCareTaker) LoadMemento(wf *WfRuntime) (*WfRuntimeMemento, error)
- func (cm *WfRuntimeMementoCareTaker) SaveMemento(memento *WfRuntimeMemento) error
- type WfRuntimeMementoStorage
Constants ¶
View Source
const FlowMaxAutoRetryTimes = 10
Variables ¶
This section is empty.
Functions ¶
func LoadUnCompletedWfRuntime ¶
func LoadUnCompletedWfRuntime(wfRuntime *WfRuntime, wf *ResourceWorkflow) error
Types ¶
type CreateHookFunc ¶
type CreateMementoStorageFunc ¶
type CreateMementoStorageFunc func(resource statemachine.StateResource) (WfRuntimeMementoStorage, error)
type CreateMetaLoaderFunc ¶
type CreateMetaLoaderFunc func(resourceType string) (WfMetaLoader, error)
type FlowStatusType ¶
type FlowStatusType string
const (
	FlowStatusPrepared        FlowStatusType = "prepared"
	FlowStatusRunning         FlowStatusType = "running"
	FlowStatusFailed          FlowStatusType = "failed"
	FlowStatusWaiting         FlowStatusType = "waiting"
	FlowStatusFailedCompleted FlowStatusType = "failedAndCompleted" //失败,但不需要再次重试的状态
	FlowStatusCompleted       FlowStatusType = "completed"
)
type Recover ¶
type Recover interface { GetResourceRecoverInfo(resource statemachine.StateResource, conf map[define.WFConfKey]string) (recover bool, preStatus statemachine.State) CancelResourceRecover(resource statemachine.StateResource, conf map[define.WFConfKey]string) error SaveResourceInterruptInfo(resource statemachine.StateResource, conf map[define.WFConfKey]string, reason, message, prevState string) error }
type ResourceWorkflow ¶
type ResourceWorkflow struct { Logger logr.Logger Resource statemachine.StateResource WfManager *WfManager MementoCareTaker *WfRuntimeMementoCareTaker }
func CreateResourceWorkflow ¶
func CreateResourceWorkflow( resource statemachine.StateResource, wfManager *WfManager, ) (*ResourceWorkflow, error)
func (*ResourceWorkflow) CommonWorkFlowMainEnter ¶
func (m *ResourceWorkflow) CommonWorkFlowMainEnter(ctx context.Context, resource statemachine.StateResource, flowName string, ignoreUnCompleted bool, eventChecker statemachine.EventChecker) error
func (*ResourceWorkflow) GetLastUnCompletedRuntime ¶
func (m *ResourceWorkflow) GetLastUnCompletedRuntime() (*WfRuntime, error)
func (*ResourceWorkflow) RetryInterruptedStep ¶
func (m *ResourceWorkflow) RetryInterruptedStep() error
func (*ResourceWorkflow) RunLastUnCompletedRuntime ¶
func (*ResourceWorkflow) RunUnCompletedRuntime ¶
func (m *ResourceWorkflow) RunUnCompletedRuntime(ctx context.Context, wfRuntime *WfRuntime) error
type StepAction ¶
type StepGroupMeta ¶
type StepRuntime ¶
type StepRuntime struct {
	StepName             string                 `json:"StepName,omitempty"`
	StepStatus           StepStatusType         `json:"stepStatus,omitempty"`
	StepStartTime        string                 `json:"stepStartTime,omitempty"`
	LastStepCompleteTime string                 `json:"lastStepCompleteTime,omitempty"`
	ContextOutput        map[string]interface{} `json:"contextOutput,omitempty"` // 会传递给下一个step的context.
	RetryTimes           int                    `json:"retryTimes,omitempty"`
}
type StepStatusType ¶
type StepStatusType string
const ( StepStatusPrepared StepStatusType = "prepared" StepStatusInited StepStatusType = "inited" StepStatusFailed StepStatusType = "failed" StepStatusCompleted StepStatusType = "completed" StepStatusWaiting StepStatusType = "waiting" )
type WfHook ¶
type WfHook interface {
	// 流程初始化钩子
	OnWfInit() error
	// 流程结束时执行
	OnWfCompleted() error
	// 流程中断时执行
	OnWfInterrupt(*define.InterruptError) error
	// 流程步骤执行前运行
	OnStepInit(step *StepRuntime) error
	// 步骤等待状态时执行
	OnStepWaiting(step *StepRuntime) error
	// 流程步骤执行后运行
	OnStepCompleted(step *StepRuntime) error
}
type WfManager ¶
type WfManager struct {
	ResourceType string
	FlowMetaMap  map[string]*FlowMeta
	TypeRegistry map[string]reflect.Type
	Logger       logr.Logger
	// contains filtered or unexported fields
}
func CreateWfManager ¶
func CreateWfManager( resourceType string, workFlowMetaDir string, createMetaLoaderFunc CreateMetaLoaderFunc, createHookFunc CreateHookFunc, createMementoStorageFunc CreateMementoStorageFunc, ) (*WfManager, error)
func (*WfManager) CheckRecovery ¶
func (m *WfManager) CheckRecovery(resource statemachine.StateResource) bool
func (*WfManager) CreateResourceWorkflow ¶
func (m *WfManager) CreateResourceWorkflow(resource statemachine.StateResource) (*ResourceWorkflow, error)
func (*WfManager) RegisterLogger ¶
func (*WfManager) RegisterRecover ¶
func (*WfManager) RegisterStep ¶
func (m *WfManager) RegisterStep(step StepAction)
func (*WfManager) RegisterSteps ¶
func (m *WfManager) RegisterSteps(steps ...StepAction)
type WfMetaLoader ¶
type WfRuntime ¶
type WfRuntime struct {
	FlowStatus       FlowStatusType         `json:"flowStatus,omitempty"`
	RetryTimes       int                    `json:"retryTimes,omitempty"`
	FlowName         string                 `json:"flowName,omitempty"`
	IgnoreError      bool                   `json:"ignoreError,omitempty"` // 是否忽略报警.
	StartTime        string                 `json:"startTime,omitempty"`
	CompleteTime     string                 `json:"completeTime,omitempty"`
	ErrorMessage     string                 `json:"errorMessage,omitempty"`
	InitContext      map[string]interface{} `json:"initContext,omitempty"` // 初始化context. 会传给第一个步骤
	RunningSteps     []*StepRuntime         `json:"runningSteps,omitempty"`
	FlowHookIns      WfHook                 `json:"-"` // hook 函数用于在步骤完成/失败后调用,在k8s场景下,在这个方法会去执行落库的动作.
	ResourceWorkflow *ResourceWorkflow      `json:"-"`
	Logger           logr.Logger            `json:"-"`
}
+k8s:deepcopy-gen:
func CreateWfRuntime ¶
func CreateWfRuntime( flowName string, flowContext map[string]interface{}, wf *ResourceWorkflow, ) (*WfRuntime, error)
func (*WfRuntime) NewStepActionIns ¶
func (r *WfRuntime) NewStepActionIns(stepMeta *StepMeta) (StepAction, error)
type WfRuntimeMemento ¶
type WfRuntimeMemento struct {
// contains filtered or unexported fields
}
工作流运行时数据存储
type WfRuntimeMementoCareTaker ¶
type WfRuntimeMementoCareTaker struct { Name string MementoMap map[string]string MementoStorage WfRuntimeMementoStorage Namespace string FlowApplyResource string }
工作流运行时数据存储管理器
func CreateWfRuntimeMementoCareTaker ¶
func CreateWfRuntimeMementoCareTaker(resource statemachine.StateResource, createStorageFunc CreateMementoStorageFunc) (*WfRuntimeMementoCareTaker, error)
func (*WfRuntimeMementoCareTaker) CreateMemento ¶
func (cm *WfRuntimeMementoCareTaker) CreateMemento(flowName string) (*WfRuntimeMemento, error)
创建一个新的工作流运行时备忘录
func (*WfRuntimeMementoCareTaker) GetAllUnCompletedWorkflowRuntime ¶
func (cm *WfRuntimeMementoCareTaker) GetAllUnCompletedWorkflowRuntime() (map[string]*WfRuntime, error)
获得所有尚未完成的工作流
func (*WfRuntimeMementoCareTaker) GetLastWorkflowRuntime ¶
func (cm *WfRuntimeMementoCareTaker) GetLastWorkflowRuntime() (string, *WfRuntime, error)
获得最后执行的工作流
func (*WfRuntimeMementoCareTaker) LoadMemento ¶
func (cm *WfRuntimeMementoCareTaker) LoadMemento(wf *WfRuntime) (*WfRuntimeMemento, error)
根据已经存在的工作流运行时创建一个备忘录实例,用于更新备忘录
func (*WfRuntimeMementoCareTaker) SaveMemento ¶
func (cm *WfRuntimeMementoCareTaker) SaveMemento(memento *WfRuntimeMemento) error
保存备忘录
Click to show internal directories.
Click to hide internal directories.