Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DefaultProcessor ¶ added in v1.0.1
type DefaultProcessor struct {
// contains filtered or unexported fields
}
func (*DefaultProcessor) Proccess ¶ added in v1.0.1
func (p *DefaultProcessor) Proccess(inTaskChan InTaskChan, outTaskChan OutTaskChan, ctx context.Context) (cancelAllProcess bool)
type Flow ¶ added in v0.0.4
type Flow interface { // AddNodeProcessors 添加流处理节点。 // outChanSize 表示该节点的输出通道大小。 // processors 为处理逻辑,每个Processor占用单独的goroutine,此方法开放性好。 AddNodeProcessors(outChanSize int, processors ...Processor) Node // AddNodeTaskHandlers 添加流处理节点。 // outChanSize 表示该节点的输出通道大小。 // taskHandlers 为处理逻辑,每个TaskHandler占用单独的goroutine,当需要追踪任务执行轨迹时,可以使用该方法。 AddNodeTaskHandlers(outChanSize int, taskHandlers ...TaskHandler) Node // Start 启动流处理引擎。 Start() // Await 等待流处理引擎执行完成。 Await() // Result 等待执行结果。该方法会阻塞,直到有结果输出。当ok为true时,结果有效。 // 当任务的执行结果有多个时,可循环调用该方法获取结果。 Result() (result interface{}, ok bool) }
Flow 表示流处理引擎。
type InTaskChan ¶ added in v0.0.3
type InTaskChan <-chan Task
type Node ¶ added in v1.0.0
type Node interface { Done() <-chan struct{} // Cancel cancels the task execution,the submitted tasks will be executed. Cancel() // SetProcessorSelector SetProcessorSelector(processorSelector ProcessorSelector) SubmitTask(task Task) error // contains filtered or unexported methods }
Node defines a node to process tasks
type OutTaskChan ¶ added in v0.0.3
type OutTaskChan chan<- Task
type Processor ¶
type Processor interface {
Proccess(inTasks InTaskChan, outTask OutTaskChan, ctx context.Context) (cancelAllProcess bool)
}
Processor processes task from the previous FlowNode, and place the result into outTask chan which will be proccessed by the next FlowNode. A node includes multiple processors which run concurrently.
func NewTaskHandlerProcessors ¶ added in v1.0.1
func NewTaskHandlerProcessors( taskAt func(task TraceableTask, nodeId int), onNewTaskCreated func(task TraceableTask, nodeId int), onTaskFinished func(task TraceableTask, nodeId int, err error), taskhandlers ...TaskHandler) []Processor
type ProcessorSelector ¶ added in v1.2.2
type ProcessorSelector interface { // DefineProcessorInTaskChan 为每个processor定义一个输入chan,返回值为输入chan的id DefineProcessorInTaskChan() (processorInTaskChanIndexes []int) // SelectInTaskChanIndex 将task分配给某个chan。 SelectInTaskChanIndex(task Task) (inTaskChanIndex int) }
ProcessorSelector 根据任务属性,决定将任务分发给哪个processor。
针对某个node,将输入TaskChan分解成多个TaskChan,作为node内processor的输入。node内的每个processor,都对应一个输入TaskChan。多个node可以使用同一个输入TaskChan。 应用场景: 1、当task之间存在依赖关系,即:task1执行完成之后才能执行task2,此时需要将task1和task2分配到同一个processor
type TaskHandler ¶ added in v1.0.1
type TaskHandler interface { // Handle 处理任务。如果返回的err不为nil,则会中断流处理。 // dispatch 用于转发任务至下一个节点。 Handle(inTask Task, dispatch func(outTask Task) error) (err error) // OnCompleted 当前节点的任务处理完毕时,回调该方法。如果返回的err不为nil,则会中断流处理。 OnCompleted(dispatch func(outTask Task) error) (err error) }
TaskHandler 定义任务处理逻辑。
func NewSortTaskHandler ¶ added in v1.2.2
func NewSortTaskHandler(taskStartId uint64, maxBlocking int) TaskHandler
NewSortTaskHandler 对任务按照taskId进行排序,taskId越小,越先被转发给下一个节点。 taskStartId:第一个任务的id maxBlocking:队列的最大长度。如果超过最大长度,则会转发队列中taskId最小的任务。
type TaskHandlerAdapter ¶ added in v1.1.0
type TaskHandlerAdapter struct { }
func (TaskHandlerAdapter) OnCompleted ¶ added in v1.1.0
func (h TaskHandlerAdapter) OnCompleted(dispatch func(outTask Task) error) (err error)
type TraceableFlow ¶ added in v1.2.2
type TraceableFlow interface { Flow // taskAt 用于跟踪任务。当TraceableTask流转到某个节点时,回调该方法。 SetTaskAt(taskAt func(task TraceableTask, nodeId int)) // onNewTaskCreated 当新的TraceableTask被创建时,回调该方法。 SetOnNewTaskCreated(onNewTaskCreated func(task TraceableTask, nodeId int)) // onTaskFinished 用于跟踪任务。当TraceableTask结束时,回调该方法。 SetOnTaskFinished(onTaskFinished func(task TraceableTask, nodeId int, err error)) }
type TraceableTask ¶ added in v1.1.0
type TraceableTask interface { // TaskId 用于获取TaskId,便于追踪。 TaskId() uint64 // Inner 用于获取底层的Task。 Inner() Task }
TraceableTask 表示可追踪的task。
func ToTraceableTask ¶ added in v1.1.0
func ToTraceableTask(taskId uint64, task Task) TraceableTask
ToTraceableTask 将Task转为TraceableTask。
Click to show internal directories.
Click to hide internal directories.