Documentation ¶
Index ¶
- type BaseProcessor
- func (p *BaseProcessor) Duration() time.Duration
- func (p *BaseProcessor) Execute()
- func (p *BaseProcessor) From(senders ...IProcessor)
- func (p *BaseProcessor) In() *InPort
- func (p *BaseProcessor) Name() string
- func (p *BaseProcessor) Out() *OutPort
- func (p *BaseProcessor) Pause()
- func (p *BaseProcessor) Resume()
- func (p *BaseProcessor) SetContext(ctx context.Context)
- func (p *BaseProcessor) Subscribe(eventHandlers ...EventHandler)
- func (p *BaseProcessor) To(receivers ...IProcessor)
- type DoneFunc
- type EventHandler
- type IProcessor
- type InPort
- type MockAddTransform
- type MockMultiTransform
- type MockSleepTransform
- type NextFunc
- type OutPort
- type Pipeline
- func (pipeline *Pipeline) Add(proc IProcessor) *Pipeline
- func (pipeline *Pipeline) Last() IProcessor
- func (pipeline *Pipeline) Out() <-chan interface{}
- func (pipeline *Pipeline) Pause()
- func (pipeline *Pipeline) Resume()
- func (pipeline *Pipeline) Run()
- func (pipeline *Pipeline) String() string
- func (pipeline *Pipeline) Wait(f func(v interface{}) error) error
- type Sink
- type Source
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BaseProcessor ¶
type BaseProcessor struct {
// contains filtered or unexported fields
}
BaseProcessor 是 IProcessor 的适配器(提供默认实现),这样嵌入它的下层 struct 就可以只实现自己需要的方法了
func NewBaseProcessor ¶
func NewBaseProcessor(name string) BaseProcessor
func (*BaseProcessor) Duration ¶
func (p *BaseProcessor) Duration() time.Duration
func (*BaseProcessor) Execute ¶
func (p *BaseProcessor) Execute()
func (*BaseProcessor) From ¶
func (p *BaseProcessor) From(senders ...IProcessor)
func (*BaseProcessor) In ¶
func (p *BaseProcessor) In() *InPort
func (*BaseProcessor) Name ¶
func (p *BaseProcessor) Name() string
func (*BaseProcessor) Out ¶
func (p *BaseProcessor) Out() *OutPort
func (*BaseProcessor) Pause ¶
func (p *BaseProcessor) Pause()
func (*BaseProcessor) Resume ¶
func (p *BaseProcessor) Resume()
func (*BaseProcessor) SetContext ¶
func (p *BaseProcessor) SetContext(ctx context.Context)
func (*BaseProcessor) Subscribe ¶
func (p *BaseProcessor) Subscribe(eventHandlers ...EventHandler)
func (*BaseProcessor) To ¶
func (p *BaseProcessor) To(receivers ...IProcessor)
type EventHandler ¶
type EventHandler interface{}
type IProcessor ¶
type IProcessor interface {
	// Name 处理器的名称
	Name() string
	// Pause TODO 暂停?啥意思?先不处理了吗?
	Pause()
	// Resume 恢复?听起来像是跟上面那个方法对应的...
	Resume()
	// Execute 当前处理器的执行逻辑,不同的处理器的主要区别就是在这里
	Execute()
	// In 返回当前处理器的输入流
	In() *InPort
	// Out 返回当前处理器的输出流
	Out() *OutPort
	// Duration 当前处理器持续执行了多长时间了
	Duration() time.Duration
	To(...IProcessor)
	From(...IProcessor)
	SetContext(context.Context)
}
IProcessor 处理器接口:定义可加入 Pipeline 的处理节点需要实现的方法,包括名称、暂停/恢复、执行逻辑、输入/输出端口、执行时长,以及与上下游处理器的连接(To/From)和上下文设置
func NewMockAddTransform ¶
func NewMockAddTransform(name string) IProcessor
func NewMockMultiTransform ¶
func NewMockMultiTransform(name string) IProcessor
func NewMockSleepTransform ¶
func NewMockSleepTransform(name string, ms int) IProcessor
func NewSink ¶
func NewSink(name string) IProcessor
func NewSource ¶
func NewSource(name string) IProcessor
type InPort ¶
type InPort struct {
// contains filtered or unexported fields
}
InPort 用来表示一个输入流,给Processor提供数据输入
type MockAddTransform ¶
type MockAddTransform struct {
BaseProcessor
}
func (*MockAddTransform) Execute ¶
func (p *MockAddTransform) Execute()
type MockMultiTransform ¶
type MockMultiTransform struct {
BaseProcessor
}
func (*MockMultiTransform) Execute ¶
func (p *MockMultiTransform) Execute()
type MockSleepTransform ¶
type MockSleepTransform struct {
	BaseProcessor
	// contains filtered or unexported fields
}
func (*MockSleepTransform) Execute ¶
func (p *MockSleepTransform) Execute()
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
func NewPipeline ¶
func (*Pipeline) Add ¶
func (pipeline *Pipeline) Add(proc IProcessor) *Pipeline
func (*Pipeline) Last ¶
func (pipeline *Pipeline) Last() IProcessor
Click to show internal directories.
Click to hide internal directories.