Documentation ¶
Functions ¶
func Compose ¶
func Compose[T any](op1 WithSink[T], op2 WithSource[T])
Compose sets the sink of op1 and the source of op2.
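The wiring that Compose describes can be sketched with local stand-in types. This is a minimal sketch, not the package's implementation: `chanWrapper`, `composeSketch`, `producer`, `consumer`, and `sumThroughComposed` are hypothetical names, and the sketch assumes that a single unbuffered channel is created and handed to both operators.

```go
package main

import "fmt"

// DataChannel, WithSink and WithSource mirror the interfaces documented here.
type DataChannel[T any] interface {
	Channel() chan T
	Finish()
}

type WithSink[T any] interface{ SetSink(channel DataChannel[T]) }

type WithSource[T any] interface{ SetSource(channel DataChannel[T]) }

// chanWrapper is a hypothetical DataChannel implementation.
type chanWrapper[T any] struct{ ch chan T }

func (c *chanWrapper[T]) Channel() chan T { return c.ch }
func (c *chanWrapper[T]) Finish()         { close(c.ch) }

// composeSketch is an assumed implementation: one channel becomes
// the sink of op1 and the source of op2.
func composeSketch[T any](op1 WithSink[T], op2 WithSource[T]) {
	ch := &chanWrapper[T]{ch: make(chan T)}
	op1.SetSink(ch)
	op2.SetSource(ch)
}

// producer and consumer are hypothetical operators that only store the channel.
type producer struct{ sink DataChannel[int] }

func (p *producer) SetSink(ch DataChannel[int]) { p.sink = ch }

type consumer struct{ source DataChannel[int] }

func (c *consumer) SetSource(ch DataChannel[int]) { c.source = ch }

// sumThroughComposed sends values through the composed pair and sums them.
func sumThroughComposed(values []int) int {
	p, c := &producer{}, &consumer{}
	composeSketch[int](p, c)
	go func() {
		for _, v := range values {
			p.sink.Channel() <- v
		}
		p.sink.Finish() // signal the consumer that no more data will arrive
	}()
	sum := 0
	for v := range c.source.Channel() {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(sumThroughComposed([]int{1, 2, 3}))
}
```

The type argument is passed explicitly to `composeSketch` so that the sketch does not rely on method-based type inference, which older Go releases lack.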
Types ¶
type AsyncOperator ¶
type AsyncOperator[T, R any] struct {
	// contains filtered or unexported fields
}
AsyncOperator processes data asynchronously.
For example, if the sink of AsyncOperator op1 and the source of op2 use the same channel, op2's workers handle the results from op1.
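The chaining described above can be illustrated with plain goroutines and channels. This is a sketch only and does not use AsyncOperator or its worker pool: `stage` and `runChain` are hypothetical helpers, and the sketch assumes each stage closes its sink once all of its workers have returned.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// stage is a hypothetical stand-in for an asynchronous operator: workerNum
// goroutines read from source, apply transform, and write to sink.
// The sink is closed after every worker has returned.
func stage[T, R any](source <-chan T, sink chan<- R, workerNum int, transform func(T) R) {
	var wg sync.WaitGroup
	for i := 0; i < workerNum; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range source {
				sink <- transform(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(sink)
	}()
}

// runChain connects two stages through one shared channel and returns the
// sorted results (worker scheduling makes the arrival order nondeterministic).
func runChain(inputs []int) []string {
	in := make(chan int)
	shared := make(chan int) // sink of stage 1 and source of stage 2
	out := make(chan string)

	stage[int, int](in, shared, 2, func(v int) int { return v * 2 })
	stage[int, string](shared, out, 2, func(v int) string { return fmt.Sprintf("v=%d", v) })

	go func() {
		for _, v := range inputs {
			in <- v
		}
		close(in)
	}()

	var results []string
	for s := range out {
		results = append(results, s)
	}
	sort.Strings(results)
	return results
}

func main() {
	fmt.Println(runChain([]int{1, 2, 3}))
}
```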
func NewAsyncOperator ¶
func NewAsyncOperator[T, R any](ctx context.Context, pool *workerpool.WorkerPool[T, R]) *AsyncOperator[T, R]
NewAsyncOperator creates an AsyncOperator.
func NewAsyncOperatorWithTransform ¶
func NewAsyncOperatorWithTransform[T, R any](
	ctx context.Context,
	name string,
	workerNum int,
	transform func(T) R,
) *AsyncOperator[T, R]
NewAsyncOperatorWithTransform creates an AsyncOperator with a transform function.
func (*AsyncOperator[T, R]) Close ¶
func (c *AsyncOperator[T, R]) Close() error
Close implements the Close method of the Operator interface.
func (*AsyncOperator[T, R]) Open ¶
func (c *AsyncOperator[T, R]) Open() error
Open implements the Open method of the Operator interface.
func (*AsyncOperator[T, R]) SetSink ¶
func (c *AsyncOperator[T, R]) SetSink(ch DataChannel[R])
SetSink sets the sink channel.
func (*AsyncOperator[T, R]) SetSource ¶
func (c *AsyncOperator[T, R]) SetSource(ch DataChannel[T])
SetSource sets the source channel.
func (*AsyncOperator[T, R]) String ¶
func (*AsyncOperator[T, R]) String() string
String returns the operator's name.
type AsyncPipeline ¶
type AsyncPipeline struct {
// contains filtered or unexported fields
}
AsyncPipeline wraps a list of Operators. The dataflow is from the first operator to the last operator.
func NewAsyncPipeline ¶
func NewAsyncPipeline(ops ...Operator) *AsyncPipeline
NewAsyncPipeline creates a new AsyncPipeline.
func (*AsyncPipeline) Execute ¶
func (p *AsyncPipeline) Execute() error
Execute opens all operators and runs asynchronously.
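How a pipeline might open its operators can be sketched as follows. This is an illustration under stated assumptions, not AsyncPipeline's actual code: `pipelineSketch`, `recordingOp`, and `runPipeline` are hypothetical, and the rollback behaviour (closing already-opened operators in reverse order when an Open call fails) is an assumption that the documentation above does not confirm.

```go
package main

import (
	"errors"
	"fmt"
)

// Operator mirrors the interface documented in this package.
type Operator interface {
	Open() error
	Close() error
	String() string
}

// pipelineSketch is a hypothetical stand-in for AsyncPipeline.
type pipelineSketch struct{ ops []Operator }

// execute opens operators from first to last. Assumption: if one fails to
// open, the operators opened before it are closed in reverse order.
func (p *pipelineSketch) execute() error {
	for i, op := range p.ops {
		if err := op.Open(); err != nil {
			for j := i - 1; j >= 0; j-- {
				_ = p.ops[j].Close()
			}
			return fmt.Errorf("open %s: %w", op, err)
		}
	}
	return nil
}

// recordingOp is a hypothetical operator that records its lifecycle calls.
type recordingOp struct {
	name    string
	openErr error
	log     *[]string
}

func (o *recordingOp) Open() error {
	*o.log = append(*o.log, "open "+o.name)
	return o.openErr
}

func (o *recordingOp) Close() error {
	*o.log = append(*o.log, "close "+o.name)
	return nil
}

func (o *recordingOp) String() string { return o.name }

// runPipeline builds a three-operator pipeline. failAt names the operator
// whose Open call fails; pass "" for no failure.
func runPipeline(failAt string) ([]string, error) {
	var log []string
	var ops []Operator
	for _, name := range []string{"read", "transform", "write"} {
		op := &recordingOp{name: name, log: &log}
		if name == failAt {
			op.openErr = errors.New("boom")
		}
		ops = append(ops, op)
	}
	err := (&pipelineSketch{ops: ops}).execute()
	return log, err
}

func main() {
	log, err := runPipeline("write")
	fmt.Println(log, err)
}
```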
type DataChannel ¶
type DataChannel[T any] interface {
	Channel() chan T
	Finish()
}
DataChannel is a channel that can be used to transfer data between operators.
type Operator ¶
type Operator interface {
	Open() error
	// Close waits for the task to finish and closes the operator.
	// TODO: the wait part should be separated from the close part.
	Close() error
	String() string
}
Operator is the basic operation unit in the task execution.
type SimpleDataChannel ¶
type SimpleDataChannel[T any] struct {
	// contains filtered or unexported fields
}
SimpleDataChannel is a simple implementation of DataChannel.
func NewSimpleDataChannel ¶
func NewSimpleDataChannel[T any](ch chan T) *SimpleDataChannel[T]
NewSimpleDataChannel creates a new SimpleDataChannel.
func (*SimpleDataChannel[T]) Channel ¶
func (s *SimpleDataChannel[T]) Channel() chan T
Channel returns the underlying channel of the SimpleDataChannel.
func (*SimpleDataChannel[T]) Finish ¶
func (s *SimpleDataChannel[T]) Finish()
Finish closes the underlying channel of the SimpleDataChannel.
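The Channel and Finish contract can be illustrated with a local type that satisfies DataChannel. This is a sketch rather than SimpleDataChannel's source: `sketchDataChannel`, `drain`, and `sendAndDrain` are hypothetical names. It shows that a consumer ranging over Channel() stops once Finish has closed the channel and the remaining buffered values have been read.

```go
package main

import "fmt"

// DataChannel mirrors the interface documented in this package.
type DataChannel[T any] interface {
	Channel() chan T
	Finish()
}

// sketchDataChannel is a hypothetical implementation that wraps a Go channel.
type sketchDataChannel[T any] struct{ ch chan T }

func (s *sketchDataChannel[T]) Channel() chan T { return s.ch }

// Finish closes the wrapped channel; it must be called at most once.
func (s *sketchDataChannel[T]) Finish() { close(s.ch) }

// drain reads until the channel is closed and empty.
func drain[T any](dc DataChannel[T]) []T {
	var out []T
	for v := range dc.Channel() {
		out = append(out, v)
	}
	return out
}

// sendAndDrain buffers all values, finishes the channel, then drains it.
func sendAndDrain(values []string) []string {
	var dc DataChannel[string] = &sketchDataChannel[string]{
		ch: make(chan string, len(values)),
	}
	for _, v := range values {
		dc.Channel() <- v
	}
	dc.Finish()
	return drain(dc)
}

func main() {
	fmt.Println(sendAndDrain([]string{"a", "b"}))
}
```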
type WithSink ¶
type WithSink[T any] interface {
	// SetSink sets the sink of the operator.
	// Operator implementations should call the Finish method of the sink when they are done.
	SetSink(channel DataChannel[T])
}
WithSink is an interface that can be used to set the sink of an operator.
type WithSource ¶
type WithSource[T any] interface {
	SetSource(channel DataChannel[T])
}
WithSource is an interface that can be used to set the source of an operator.