flow

package
v2.14.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 29, 2023 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
// DefaultLineItemSeparate is a single tab character.
// NOTE(review): presumably the default separator between the items of one input line — confirm against the callers.
const DefaultLineItemSeparate = "\t"
View Source
// ErrorSeparate is a tab followed by the literal marker "QShellError:".
// NOTE(review): presumably used to delimit an error message appended to an output line — confirm against the callers.
const ErrorSeparate = "\tQShellError:"
View Source
// UnknownWorkCount is the int64 sentinel value -1.
// NOTE(review): presumably what WorkProvider.WorkTotalCount returns when the total cannot be determined — confirm.
const UnknownWorkCount = int64(-1)

Variables

This section is empty.

Functions

func NewBlockLimit added in v2.9.1

func NewBlockLimit(limitCount int, options ...AutoLimitOption) limit.BlockLimit

func UserCodeVerification

func UserCodeVerification() (success bool)

Types

type AutoLimitOption added in v2.9.1

// AutoLimitOption is a function that receives an *autoLimit and configures it.
// NewBlockLimit accepts a variadic list of these options; IncreaseLimitCount,
// IncreaseLimitCountPeriod, MaxLimitCount and MinLimitCount each return one.
type AutoLimitOption func(l *autoLimit)

func IncreaseLimitCount added in v2.9.1

func IncreaseLimitCount(count int) AutoLimitOption

func IncreaseLimitCountPeriod added in v2.9.1

func IncreaseLimitCountPeriod(period time.Duration) AutoLimitOption

func MaxLimitCount added in v2.9.1

func MaxLimitCount(count int) AutoLimitOption

func MinLimitCount added in v2.9.1

func MinLimitCount(count int) AutoLimitOption

type EventListener

// EventListener groups optional callback functions for flow and work events.
// Each field has a same-named method on *EventListener without the "Func" suffix;
// presumably each method forwards to its field when that field is set — confirm in the implementation.
type EventListener struct {
	FlowWillStartFunc func(flow *Flow) (err *data.CodeError)                          // callback for the flow-will-start event
	FlowWillEndFunc   func(flow *Flow) (err *data.CodeError)                          // callback for the flow-will-end event
	WillWorkFunc      func(work *WorkInfo) (shouldContinue bool, err *data.CodeError) // callback before a work is processed; returns whether to continue
	OnWorkSkipFunc    func(work *WorkInfo, result Result, err *data.CodeError)        // callback for a skipped work
	OnWorkSuccessFunc func(work *WorkInfo, result Result)                             // callback for a successful work
	OnWorkFailFunc    func(work *WorkInfo, err *data.CodeError)                       // callback for a failed work
}

func (*EventListener) FlowWillEnd

func (e *EventListener) FlowWillEnd(flow *Flow) (err *data.CodeError)

func (*EventListener) FlowWillStart

func (e *EventListener) FlowWillStart(flow *Flow) (err *data.CodeError)

func (*EventListener) OnWorkFail

func (e *EventListener) OnWorkFail(work *WorkInfo, err *data.CodeError)

func (*EventListener) OnWorkSkip

func (e *EventListener) OnWorkSkip(work *WorkInfo, result Result, err *data.CodeError)

func (*EventListener) OnWorkSuccess

func (e *EventListener) OnWorkSuccess(work *WorkInfo, result Result)

func (*EventListener) WillWork

func (e *EventListener) WillWork(work *WorkInfo) (shouldContinue bool, err *data.CodeError)

type Flow

// Flow bundles the parameters and collaborators of a work-processing flow:
// where work comes from, who processes it, and the optional hooks around processing.
type Flow struct {
	Info           Info           // parameters of the flow [optional]
	WorkProvider   WorkProvider   // supplier of work [required]
	WorkerProvider WorkerProvider // supplier of workers [required]

	DoWorkInfoListMaxCount int // maximum length of the works slice passed to Worker.DoWork; default: 250, minimum: 1

	DoWorkInfoListMinCount int // minimum length of the works slice passed to Worker.DoWork; default: 50, minimum: 1

	Limit         limit.BlockLimit // rate limit; NOTE(review): the original comment is truncated and does not say what is being limited
	EventListener EventListener    // listener for work-processing events [optional]
	Overseer      Overseer         // work overseer; covers the logic around whether a work has already been processed [optional]
	Skipper       Skipper          // logic deciding whether a work is skipped [optional]
	Redo          Redo             // logic deciding whether a work must be redone; a work may already have been done, but conditions can change by the next run so it needs processing again [optional]
	// contains filtered or unexported fields
}

func (*Flow) Check

func (f *Flow) Check() *data.CodeError

func (*Flow) Start

func (f *Flow) Start()

type FlowBuilder

// FlowBuilder is the final stage of the builder chain: it is returned by
// WorkerProvideBuilder.WorkerProvider, its setter methods return the same
// *FlowBuilder type for chaining, and Build returns the resulting *Flow.
type FlowBuilder struct {
	// contains filtered or unexported fields
}

func (*FlowBuilder) Build

func (b *FlowBuilder) Build() *Flow

func (*FlowBuilder) DoWorkListMaxCount

func (b *FlowBuilder) DoWorkListMaxCount(count int) *FlowBuilder

func (*FlowBuilder) DoWorkListMinCount added in v2.9.1

func (b *FlowBuilder) DoWorkListMinCount(count int) *FlowBuilder

func (*FlowBuilder) FlowWillEndFunc

func (b *FlowBuilder) FlowWillEndFunc(f func(flow *Flow) (err *data.CodeError)) *FlowBuilder

func (*FlowBuilder) FlowWillStartFunc

func (b *FlowBuilder) FlowWillStartFunc(f func(flow *Flow) (err *data.CodeError)) *FlowBuilder

func (*FlowBuilder) OnWillWork

func (b *FlowBuilder) OnWillWork(f func(workInfo *WorkInfo) (shouldContinue bool, err *data.CodeError)) *FlowBuilder

func (*FlowBuilder) OnWorkFail

func (b *FlowBuilder) OnWorkFail(f func(workInfo *WorkInfo, err *data.CodeError)) *FlowBuilder

func (*FlowBuilder) OnWorkSkip

func (b *FlowBuilder) OnWorkSkip(f func(workInfo *WorkInfo, result Result, err *data.CodeError)) *FlowBuilder

func (*FlowBuilder) OnWorkSuccess

func (b *FlowBuilder) OnWorkSuccess(f func(workInfo *WorkInfo, result Result)) *FlowBuilder

func (*FlowBuilder) SetDBOverseer added in v2.8.0

func (b *FlowBuilder) SetDBOverseer(dbPath string, blankWorkRecordBuilder func() *WorkRecord) *FlowBuilder

func (*FlowBuilder) SetLimit added in v2.9.1

func (b *FlowBuilder) SetLimit(limit limit.BlockLimit) *FlowBuilder

func (*FlowBuilder) SetOverseer added in v2.8.0

func (b *FlowBuilder) SetOverseer(overseer Overseer) *FlowBuilder

func (*FlowBuilder) SetOverseerEnable added in v2.9.0

func (b *FlowBuilder) SetOverseerEnable(enable bool) *FlowBuilder

func (*FlowBuilder) ShouldRedo

func (b *FlowBuilder) ShouldRedo(f func(workInfo *WorkInfo, workRecord *WorkRecord) (shouldRedo bool, cause *data.CodeError)) *FlowBuilder

func (*FlowBuilder) ShouldSkip

func (b *FlowBuilder) ShouldSkip(f func(workInfo *WorkInfo) (skip bool, cause *data.CodeError)) *FlowBuilder

type Info

// Info holds the parameters of a Flow.
type Info struct {
	Force                     bool // whether to run the Flow directly; when not forced, the user must enter a verification code
	WorkerCount               int  // number of workers
	MinWorkerCount            int  // minimum count (the original comment says "work", presumably workers); the count is reduced when a limit error is hit; minimum 1
	WorkerCountIncreasePeriod int  // period for increasing WorkerCount: if no limit error occurs within this period, an increase of WorkerCount is attempted; minimum 10s (presumably the int is in seconds — confirm)
	StopWhenWorkError         bool // whether to end the batch task when a work hits an execution error
}

func (*Info) Check

func (i *Info) Check() *data.CodeError

type Overseer

// Overseer is the work overseer. Per the Flow.Overseer field comment, it covers
// the logic around whether a work has already been processed.
type Overseer interface {
	// WillWork receives a work; presumably called before the work is processed — confirm.
	WillWork(work *WorkInfo)
	// WorkDone receives the record of a work; presumably called once the work has finished — confirm.
	WorkDone(record *WorkRecord)
	// GetWorkRecordIfHasDone reports whether the work has been done and, alongside it, a record.
	// NOTE(review): whether record is nil when hasDone is false is not visible here.
	GetWorkRecordIfHasDone(work *WorkInfo) (hasDone bool, record *WorkRecord)
}

func NewDBRecordOverseer

func NewDBRecordOverseer(dbPath string, blankWorkRecordBuilder func() *WorkRecord) (Overseer, *data.CodeError)

type Redo

// Redo decides whether a work that already has a record needs to be done again.
type Redo interface {

	// ShouldRedo
	// @Description: reports whether the work needs to be redone
	// @param work the work information
	// @param workRecord the record of this work
	// @return shouldRedo whether the work needs to be redone
	// @return cause the reason why the work needs to be redone or cannot be redone
	ShouldRedo(work *WorkInfo, workRecord *WorkRecord) (shouldRedo bool, cause *data.CodeError)
}

func NewRedo

func NewRedo(f func(work *WorkInfo, workRecord *WorkRecord) (shouldRedo bool, cause *data.CodeError)) Redo

type Result

// Result is the outcome of a work. It appears in WorkRecord.Result and in the
// success/skip callbacks of EventListener.
type Result interface {
	// IsValid reports whether the result is valid.
	// NOTE(review): what "valid" means is defined by each implementation and is not visible here.
	IsValid() bool
}

type Skipper

// Skipper holds the logic deciding whether a work is skipped.
type Skipper interface {
	// ShouldSkip reports whether the given work should be skipped, with the cause of that decision.
	ShouldSkip(work *WorkInfo) (skip bool, cause *data.CodeError)
}

func NewSkipper

func NewSkipper(f func(work *WorkInfo) (skip bool, cause *data.CodeError)) Skipper

type Work

// Work is a unit of work supplied to a Flow.
type Work interface {
	// WorkId returns the work's identifier as a string.
	// NOTE(review): presumably unique per work so it can key work records — confirm.
	WorkId() string
}

type WorkCreator

// WorkCreator builds a Work from a string. The file- and reader-based
// WorkProvider constructors take one as an argument.
type WorkCreator interface {
	// Create builds a Work from info, or returns an error.
	// NOTE(review): info is presumably one line of input — confirm.
	Create(info string) (work Work, err *data.CodeError)
}

func NewItemsWorkCreator

func NewItemsWorkCreator(separate string, minItemsCount int, creatorFunc func(items []string) (work Work, err *data.CodeError)) WorkCreator

func NewJsonWorkCreator

func NewJsonWorkCreator(blankQuotedWorkCreatFunc func() Work) WorkCreator

type WorkInfo

// WorkInfo pairs a Work with a string of data; it is the value handed to
// workers, listeners, skippers and overseers.
type WorkInfo struct {
	Data string `json:"data"` // serialized as "data"; NOTE(review): presumably the raw input the Work was created from — confirm
	Work Work   `json:"work"` // the work itself; serialized as "work"
}

type WorkProvideBuilder

// WorkProvideBuilder is the first stage of the builder chain: New returns it,
// and each of its WorkProvider* methods returns a *WorkerProvideBuilder.
type WorkProvideBuilder struct {
	// contains filtered or unexported fields
}

func New

func New(info Info) *WorkProvideBuilder

func (*WorkProvideBuilder) WorkProvider

func (b *WorkProvideBuilder) WorkProvider(provider WorkProvider) *WorkerProvideBuilder

func (*WorkProvideBuilder) WorkProviderWithArray

func (b *WorkProvideBuilder) WorkProviderWithArray(workList []Work) *WorkerProvideBuilder

func (*WorkProvideBuilder) WorkProviderWithChan added in v2.8.0

func (b *WorkProvideBuilder) WorkProviderWithChan(works <-chan Work) *WorkerProvideBuilder

func (*WorkProvideBuilder) WorkProviderWithFile

func (b *WorkProvideBuilder) WorkProviderWithFile(filePath string, enableStdin bool, creator WorkCreator) *WorkerProvideBuilder

type WorkProvider

// WorkProvider supplies work to a Flow.
type WorkProvider interface {
	// WorkTotalCount returns the total number of works as an int64.
	// NOTE(review): presumably UnknownWorkCount (-1) when the total is not known — confirm.
	WorkTotalCount() int64
	// Provide returns a work, whether more work remains, and any error.
	// NOTE(review): whether work can be nil when hasMore is false is not visible here.
	Provide() (hasMore bool, work *WorkInfo, err *data.CodeError)
}

func NewArrayWorkProvider

func NewArrayWorkProvider(works []Work) (WorkProvider, *data.CodeError)

func NewChanWorkProvider added in v2.8.0

func NewChanWorkProvider(works <-chan Work) (WorkProvider, *data.CodeError)

func NewFileWorkProvider

func NewFileWorkProvider(filePath string, creator WorkCreator) (WorkProvider, *data.CodeError)

func NewReaderWorkProvider

func NewReaderWorkProvider(reader io.Reader, creator WorkCreator) (WorkProvider, *data.CodeError)

func NewWorkProviderOfFile

func NewWorkProviderOfFile(filepath string, enableStdin bool, creator WorkCreator) (provider WorkProvider, err *data.CodeError)

type WorkRecord

// WorkRecord is the record of a work: the embedded WorkInfo together with the
// work's result and error.
type WorkRecord struct {
	*WorkInfo

	Result Result          `json:"result"` // result of the work; serialized as "result"
	Err    *data.CodeError `json:"err"`    // error of the work; serialized as "err"
}

type Worker

// Worker processes batches of work.
type Worker interface {

	// DoWork processes the given works.
	// @Description: the length of recordList must equal the length of workInfos
	// @param workInfos the list of works
	// @return recordList the list of work records
	// @return err the work error information
	DoWork(workInfos []*WorkInfo) (recordList []*WorkRecord, err *data.CodeError)
}

func NewSimpleWorker

func NewSimpleWorker(doFunc func(workInfo *WorkInfo) (Result, *data.CodeError)) Worker

func NewWorker

func NewWorker(doFunc func(workInfos []*WorkInfo) ([]*WorkRecord, *data.CodeError)) Worker

type WorkerProvideBuilder

// WorkerProvideBuilder is the middle stage of the builder chain: it is returned
// by the WorkProvideBuilder.WorkProvider* methods, and its WorkerProvider method
// returns a *FlowBuilder.
type WorkerProvideBuilder struct {
	// contains filtered or unexported fields
}

func (*WorkerProvideBuilder) WorkerProvider

func (b *WorkerProvideBuilder) WorkerProvider(provider WorkerProvider) *FlowBuilder

type WorkerProvider

// WorkerProvider supplies workers to a Flow.
type WorkerProvider interface {
	// Provide returns a Worker, or an error.
	// NOTE(review): whether each call yields a new Worker is not visible here.
	Provide() (worker Worker, err *data.CodeError)
}

func NewWorkerProvider

func NewWorkerProvider(builder func() (Worker, *data.CodeError)) WorkerProvider

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL