processors

package
v0.0.0-...-e6fb8a6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 16, 2022 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseProcessor

type BaseProcessor struct {
	// contains filtered or unexported fields
}

BaseProcessor Processor的适配器,这样再下层的struct就可以只实现自己需要的方法了

func NewBaseProcessor

func NewBaseProcessor(name string) BaseProcessor

func (*BaseProcessor) Duration

func (p *BaseProcessor) Duration() time.Duration

func (*BaseProcessor) Execute

func (p *BaseProcessor) Execute()

func (*BaseProcessor) From

func (p *BaseProcessor) From(senders ...IProcessor)

func (*BaseProcessor) In

func (p *BaseProcessor) In() *InPort

func (*BaseProcessor) Name

func (p *BaseProcessor) Name() string

func (*BaseProcessor) Out

func (p *BaseProcessor) Out() *OutPort

func (*BaseProcessor) Pause

func (p *BaseProcessor) Pause()

func (*BaseProcessor) Resume

func (p *BaseProcessor) Resume()

func (*BaseProcessor) SetContext

func (p *BaseProcessor) SetContext(ctx context.Context)

func (*BaseProcessor) Subscribe

func (p *BaseProcessor) Subscribe(eventHandlers ...EventHandler)

func (*BaseProcessor) To

func (p *BaseProcessor) To(receivers ...IProcessor)

type DoneFunc

type DoneFunc func()

type EventHandler

type EventHandler interface{}

type IProcessor

type IProcessor interface {

	// Name 处理器的名称
	Name() string

	// Pause TODO 暂停?啥意思?先不处理了吗?
	Pause()

	// Resume 恢复?听起来像是跟上面那个方法对应的...
	Resume()

	// Execute 当前处理器的执行逻辑,不同的处理器的主要区别就是在这里
	Execute()

	// In 返回当前处理器的输入流
	In() *InPort

	// Out 返回当前处理器的输出流
	Out() *OutPort

	// Duration 当前处理器持续执行了多长时间了
	Duration() time.Duration

	To(...IProcessor)

	From(...IProcessor)

	SetContext(context.Context)
}

IProcessor 处理器?干嘛的处理啊???我好懵...

func NewMockAddTransform

func NewMockAddTransform(name string) IProcessor

func NewMockMultiTransform

func NewMockMultiTransform(name string) IProcessor

func NewMockSleepTransform

func NewMockSleepTransform(name string, ms int) IProcessor

func NewSink

func NewSink(name string) IProcessor

func NewSource

func NewSource(name string) IProcessor

type InPort

type InPort struct {
	// contains filtered or unexported fields
}

InPort 用来表示一个输入流,给Processor提供数据输入

func NewInPort

func NewInPort(name string) *InPort

func (*InPort) AddEdge

func (pt *InPort) AddEdge(rpt *OutPort)

func (*InPort) Close

func (pt *InPort) Close()

func (*InPort) From

func (pt *InPort) From(rpt *OutPort)

func (*InPort) Name

func (pt *InPort) Name() string

func (*InPort) Recv

func (pt *InPort) Recv() <-chan interface{}

Recv 从输入流中读取数据

func (*InPort) Send

func (pt *InPort) Send(v interface{})

Send 往输入流中写入数据

type MockAddTransform

type MockAddTransform struct {
	BaseProcessor
}

func (*MockAddTransform) Execute

func (p *MockAddTransform) Execute()

type MockMultiTransform

type MockMultiTransform struct {
	BaseProcessor
}

func (*MockMultiTransform) Execute

func (p *MockMultiTransform) Execute()

type MockSleepTransform

type MockSleepTransform struct {
	BaseProcessor
	// contains filtered or unexported fields
}

func (*MockSleepTransform) Execute

func (p *MockSleepTransform) Execute()

type NextFunc

type NextFunc func(interface{})

type OutPort

type OutPort struct {
	// contains filtered or unexported fields
}

func NewOutPort

func NewOutPort(name string) *OutPort

func (*OutPort) AddEdge

func (pt *OutPort) AddEdge(rpt *InPort)

func (*OutPort) Close

func (pt *OutPort) Close()

func (*OutPort) IsClose

func (pt *OutPort) IsClose() bool

func (*OutPort) Name

func (pt *OutPort) Name() string

func (*OutPort) Send

func (pt *OutPort) Send(v interface{})

func (*OutPort) To

func (pt *OutPort) To(rpt *InPort)

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

func NewPipeline

func NewPipeline(ctx context.Context) *Pipeline

func (*Pipeline) Add

func (pipeline *Pipeline) Add(proc IProcessor) *Pipeline

func (*Pipeline) Last

func (pipeline *Pipeline) Last() IProcessor

func (*Pipeline) Out

func (pipeline *Pipeline) Out() <-chan interface{}

func (*Pipeline) Pause

func (pipeline *Pipeline) Pause()

func (*Pipeline) Resume

func (pipeline *Pipeline) Resume()

func (*Pipeline) Run

func (pipeline *Pipeline) Run()

func (*Pipeline) String

func (pipeline *Pipeline) String() string

func (*Pipeline) Wait

func (pipeline *Pipeline) Wait(f func(v interface{}) error) error

type Sink

type Sink struct {
	BaseProcessor
}

Sink 用于输出数据的

func (*Sink) In

func (p *Sink) In() *InPort

type Source

type Source struct {
	BaseProcessor
}

Source 表示一个数据源

func (*Source) Out

func (p *Source) Out() *OutPort

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL