flow

package module
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 25, 2024 License: MIT Imports: 27 Imported by: 1

Documentation

Index

Constants

View Source
const (
	StepScope uint8 = 16 * (iota + 1)
	ProcessScope
	FlowScope
)
View Source
const (
	RecoverIdle uint8 = 1 << iota
	RecoverRunning
	RecoverSuccess
	RecoverFailed uint8 = 32
)

Variables

View Source
var (

	// Entity was not skipped, but Step may still be cancelled because of CallbackFail.
	Pending = &StatusEnum{0b1 << 0, "Pending"}
	Pause   = &StatusEnum{0b1 << 1, "Pause"}

	// Entity recovering from suspension.
	Recovering = &StatusEnum{0b1 << 4, "Recovering"}
	// Entity can be recovered at an appropriate time.
	Suspend    = &StatusEnum{0b1 << 5, "Suspend"}
	Success    = &StatusEnum{0b1 << 15, "Success"}
	NormalMask = &StatusEnum{0b1<<16 - 1, "NormalMask"}

	Cancel       = &StatusEnum{0b1<<31 | 0b1<<16, "Cancel"}
	Timeout      = &StatusEnum{0b1<<31 | 0b1<<17, "Timeout"}
	Panic        = &StatusEnum{0b1<<31 | 0b1<<18, "Panic"}
	Error        = &StatusEnum{0b1<<31 | 0b1<<19, "Error"}
	Stop         = &StatusEnum{0b1<<31 | 0b1<<20, "Stop"}
	CallbackFail = &StatusEnum{0b1<<31 | 0b1<<21, "CallbackFail"}
	Failed       = &StatusEnum{0b1 << 31, "Failed"}
)

These variables are used to indicate the state of the unit.

Functions

func DisableEncrypt

func DisableEncrypt()

func RegisterType

func RegisterType[T any]()

func ResetDefaultCallback

func ResetDefaultCallback()

func SetEncryptor

func SetEncryptor(encryptor SymmetricEncryptor)

func SetIdGenerator

func SetIdGenerator(method func() string)

func SetLogger added in v0.0.7

func SetLogger(l LoggerI)

func SetMaxSerializeSize

func SetMaxSerializeSize(size int)

func SuspendPersist

func SuspendPersist(impl Persist)

Types

type BuildChain added in v0.0.3

type BuildChain interface {
	SyncPoint
	After(depends ...any) BuildChain
}

type CheckPoint

type CheckPoint interface {
	// contains filtered or unexported methods
}

type Comparable

type Comparable interface {
	Equality
	Less(other any) bool
}

type Constraint

type Constraint interface {
	If(condition func() bool) Constraint
	When(status ...*StatusEnum) Constraint
	Exclude(status ...*StatusEnum) Constraint
	OnlyFor(name ...string) Constraint
	NotFor(name ...string) Constraint
}

type Equality

type Equality interface {
	Equal(other any) bool
}

Equality func (ei *EqualityImpl) Equal(other any) bool is invalid func (ei *EqualityImpl) Equal(other any) bool is valid

type EventLayer

type EventLayer uint8
const (
	FlowLayer EventLayer = iota
	ProcLayer
	StepLayer
)

Event Layer

func (EventLayer) String

func (s EventLayer) String() string

type EventLevel

type EventLevel uint8
const (
	HintLevel EventLevel = iota
	InfoLevel
	WarnLevel
	ErrorLevel
	PanicLevel
)

func (EventLevel) String

func (s EventLevel) String() string

type EventStage

type EventStage uint8
const (
	InCallback EventStage = iota
	InPersist
	InSuspend
	InRecover
	InResource
)

func (EventStage) String

func (s EventStage) String() string

type ExecCondition

type ExecCondition interface {
	SkipWithDependents(excludes ...any) ExecCondition
	EQ(match string, expect any) ExecCondition
	NEQ(match string, expect any) ExecCondition
	GT(match string, expect any) ExecCondition
	GTE(match string, expect any) ExecCondition
	LT(match string, expect any) ExecCondition
	LTE(match string, expect any) ExecCondition
	Condition(func(step Step) bool) ExecCondition
	OR() ExecCondition
}

type FinishedProcess

type FinishedProcess interface {
	Steps() []FinishedStep
	Exceptions() []FinishedStep
	// contains filtered or unexported methods
}

type FinishedStep

type FinishedStep interface {
	// contains filtered or unexported methods
}

type FinishedWorkFlow

type FinishedWorkFlow interface {
	Processes() []FinishedProcess
	Recover() (FinishedWorkFlow, error)
	Exceptions() []FinishedProcess
	// contains filtered or unexported methods
}

func DoneFlow

func DoneFlow(name string, input map[string]any) FinishedWorkFlow

func RecoverFlow

func RecoverFlow(flowId string) (ret FinishedWorkFlow, err error)

type FlexEvent

type FlexEvent interface {
	Get(key string) (any, bool) // Get value from context, only available for step and process.
	// contains filtered or unexported methods
}

type FlowCallback

type FlowCallback interface {
	ProcessCallback
	BeforeFlow(must bool, callback func(WorkFlow) (keepOn bool, err error)) *decorator[WorkFlow]
	AfterFlow(must bool, callback func(WorkFlow) (keepOn bool, err error)) *decorator[WorkFlow]
}

func DefaultCallback

func DefaultCallback() FlowCallback

type FlowConfig

type FlowConfig interface {
	ProcessConfig
	EnableRecover() FlowConfig
	DisableRecover() FlowConfig
}

func DefaultConfig

func DefaultConfig() FlowConfig

type FlowController

type FlowController interface {
	Process(name string) (process ProcController, exist bool)
	Done() FinishedWorkFlow
	Resume() FlowController
	Pause() FlowController
	Stop() FlowController
	// contains filtered or unexported methods
}

func AsyncFlow

func AsyncFlow(name string, input map[string]any) FlowController

type FlowMeta

type FlowMeta struct {
	// contains filtered or unexported fields
}

func RegisterFlow

func RegisterFlow(name string) *FlowMeta

func (*FlowMeta) AfterFlow

func (f *FlowMeta) AfterFlow(must bool, callback func(WorkFlow) (keepOn bool, err error)) *decorator[WorkFlow]

func (*FlowMeta) BeforeFlow

func (f *FlowMeta) BeforeFlow(must bool, callback func(WorkFlow) (keepOn bool, err error)) *decorator[WorkFlow]

func (*FlowMeta) DisableDefaultCallback

func (f *FlowMeta) DisableDefaultCallback()

func (*FlowMeta) DisableRecover

func (f *FlowMeta) DisableRecover() FlowConfig

func (*FlowMeta) EnableRecover

func (f *FlowMeta) EnableRecover() FlowConfig

func (*FlowMeta) Name

func (fm *FlowMeta) Name() string

func (*FlowMeta) Process

func (fm *FlowMeta) Process(name string) *ProcessMeta

type FlowPersistence

type FlowPersistence interface {
	OnInsert(func(WorkFlow) error) FlowPersistence
	OnUpdate(func(WorkFlow) error) FlowPersistence
}

func FlowPersist

func FlowPersist() FlowPersistence

type HandlerRegister

type HandlerRegister interface {
	Handle(stage EventStage, handles ...func(event FlexEvent) (keepOn bool)) HandlerRegister
	Discard(stage EventStage, discards ...func(event FlexEvent) (keepOn bool)) HandlerRegister
	DisableLog(stages ...EventStage) HandlerRegister
	Capacity(capacity int) HandlerRegister
	MaxHandler(num int) HandlerRegister
	EventTimeoutSec(sec int) HandlerRegister
	Clear()
}

func EventHandler

func EventHandler() HandlerRegister

type LoggerI

type LoggerI interface {
	Debug(v ...interface{})
	Info(v ...interface{})
	Warn(v ...interface{})
	Error(v ...interface{})
	Debugf(format string, v ...interface{})
	Infof(format string, v ...interface{})
	Warnf(format string, v ...interface{})
	Errorf(format string, v ...interface{})
}

type Persist

type Persist interface {
	GetLatestRecord(rootUid string) (RecoverRecord, error)
	ListCheckpoints(recoverId string) ([]CheckPoint, error)
	UpdateRecordStatus(record RecoverRecord) error
	// save checkpoint and update workflow latest recordId
	SaveCheckpointAndRecord(checkpoint []CheckPoint, record RecoverRecord) error
}

type ProcController

type ProcController interface {
	Stop() ProcController
	Pause() ProcController
	Resume() ProcController
	Step(name string) (step StepController, exist bool)
	// contains filtered or unexported methods
}

type ProcPersistence

type ProcPersistence interface {
	OnInsert(func(Process) error) ProcPersistence
	OnUpdate(func(Process) error) ProcPersistence
}

func ProcPersist

func ProcPersist() ProcPersistence

type Process

type Process interface {
	// contains filtered or unexported methods
}

type ProcessCallback

type ProcessCallback interface {
	StepCallback
	DisableDefaultCallback()
	BeforeProcess(must bool, callback func(Process) (keepOn bool, err error)) *decorator[Process]
	AfterProcess(must bool, callback func(Process) (keepOn bool, err error)) *decorator[Process]
}

type ProcessConfig

type ProcessConfig interface {
	StepConfig
	ProcessTimeout(time.Duration) ProcessConfig
}

type ProcessMeta

type ProcessMeta struct {
	// contains filtered or unexported fields
}

func FlowWithProcess added in v0.0.4

func FlowWithProcess(name string) *ProcessMeta

func (*ProcessMeta) AfterProcess

func (p *ProcessMeta) AfterProcess(must bool, callback func(Process) (keepOn bool, err error)) *decorator[Process]

func (*ProcessMeta) BeforeProcess

func (p *ProcessMeta) BeforeProcess(must bool, callback func(Process) (keepOn bool, err error)) *decorator[Process]

func (*ProcessMeta) CustomStep added in v0.0.5

func (pm *ProcessMeta) CustomStep(run func(ctx Step) (any, error), name string, depends ...any) *StepMeta

func (*ProcessMeta) DisableDefaultCallback

func (p *ProcessMeta) DisableDefaultCallback()

func (*ProcessMeta) Flow added in v0.0.4

func (pm *ProcessMeta) Flow() *FlowMeta

func (*ProcessMeta) Follow added in v0.0.4

func (pm *ProcessMeta) Follow(steps ...func(ctx Step) (any, error)) BuildChain

This function was inspired by the liteflow project (https://github.com/dromara/liteflow), which implements similar functionality in Java. The implementation here is independently developed in Go, with a different approach and code structure.

func (*ProcessMeta) Merge

func (pm *ProcessMeta) Merge(name string)

Merge will not merge config, because there is no effective design yet for opting out of a merged config.

func (*ProcessMeta) Name

func (pm *ProcessMeta) Name() string

func (*ProcessMeta) Parallel added in v0.0.3

func (pm *ProcessMeta) Parallel(steps ...func(ctx Step) (any, error)) BuildChain

This function was inspired by the liteflow project (https://github.com/dromara/liteflow), which implements similar functionality in Java. The implementation here is independently developed in Go, with a different approach and code structure.

func (*ProcessMeta) ProcessTimeout

func (pc *ProcessMeta) ProcessTimeout(duration time.Duration) ProcessConfig

func (*ProcessMeta) SyncAll added in v0.0.4

func (pm *ProcessMeta) SyncAll(run func(ctx Step) (any, error), alias string) *StepMeta

type RecoverRecord

type RecoverRecord interface {
	GetName() string
	GetRootUid() string // root id is equal to workflow id
	GetRecoverId() string
	GetStatus() uint8
}

type Resource

type Resource interface {
	ProcessName() string
	ProcessID() string
	Entity() any
	Put(key string, value any) any
	Fetch(key string) (value any, exist bool)
	Update(entity any) any
	Clear() any
	// contains filtered or unexported methods
}

type ResourceManager

type ResourceManager interface {
	OnInitialize(func(res Resource, initParam any) (entity any, err error)) ResourceManager
	OnRecover(func(res Resource) error) ResourceManager
	OnSuspend(func(res Resource) error) ResourceManager
	OnRelease(func(res Resource) error) ResourceManager
}

func AddResource

func AddResource(name string) ResourceManager

type StatusEnum

type StatusEnum struct {
	// contains filtered or unexported fields
}

func (*StatusEnum) String added in v0.0.2

func (s *StatusEnum) String() string

type Step

type Step interface {
	// contains filtered or unexported methods
}

type StepCallback

type StepCallback interface {
	BeforeStep(must bool, callback func(Step) (keepOn bool, err error)) *decorator[Step]
	AfterStep(must bool, callback func(Step) (keepOn bool, err error)) *decorator[Step]
}

type StepConfig

type StepConfig interface {
	StepTimeout(time.Duration) StepConfig
	StepRetry(int) StepConfig
}

type StepController

type StepController interface {
	// contains filtered or unexported methods
}

type StepMeta

type StepMeta struct {
	// contains filtered or unexported fields
}

func (*StepMeta) Condition

func (c *StepMeta) Condition(f func(step Step) bool) ExecCondition

func (*StepMeta) EQ

func (c *StepMeta) EQ(match string, expect any) ExecCondition

func (*StepMeta) GT

func (c *StepMeta) GT(match string, expect any) ExecCondition

func (*StepMeta) GTE

func (c *StepMeta) GTE(match string, expect any) ExecCondition

func (*StepMeta) LT

func (c *StepMeta) LT(match string, expect any) ExecCondition

func (*StepMeta) LTE

func (c *StepMeta) LTE(match string, expect any) ExecCondition

func (*StepMeta) NEQ

func (c *StepMeta) NEQ(match string, expect any) ExecCondition

func (*StepMeta) Name

func (meta *StepMeta) Name() string

func (*StepMeta) Next

func (meta *StepMeta) Next(run func(ctx Step) (any, error), name string) *StepMeta

func (*StepMeta) OR

func (c *StepMeta) OR() ExecCondition

func (*StepMeta) Restrict

func (meta *StepMeta) Restrict(restrict map[string]any)

func (*StepMeta) Same

func (meta *StepMeta) Same(run func(ctx Step) (any, error), name string) *StepMeta

func (*StepMeta) SkipWithDependents

func (c *StepMeta) SkipWithDependents(excludes ...any) ExecCondition

func (*StepMeta) StepRetry

func (s *StepMeta) StepRetry(i int) StepConfig

func (*StepMeta) StepTimeout

func (s *StepMeta) StepTimeout(duration time.Duration) StepConfig

type StepPersistence

type StepPersistence interface {
	OnInsert(func(Step) error) StepPersistence
	OnUpdate(func(Step) error) StepPersistence
}

func StepPersist

func StepPersist() StepPersistence

type SymmetricEncryptor

type SymmetricEncryptor interface {
	NeedEncrypt(key string) bool
	GetSecret() []byte
	// contains filtered or unexported methods
}

func NewAES256Encryptor

func NewAES256Encryptor(secret []byte, needEncrypt ...string) SymmetricEncryptor

type SyncPoint added in v0.0.4

type SyncPoint interface {
	// contains filtered or unexported methods
}

type WorkFlow

type WorkFlow interface {
	// contains filtered or unexported methods
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL