operator

package
Version: v1.1.0-beta.0...-6463db6
Published: Jan 10, 2025 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Compose

func Compose[T any](op1 WithSink[T], op2 WithSource[T])

Compose sets the sink of op1 and the source of op2.
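
To make the wiring concrete, here is a minimal sketch of what Compose might do. The interface shapes are re-declared locally so the example runs without the module. The body of `compose`, the unbuffered channel, and the `simpleChan`, `producer`, and `consumer` types are assumptions made for illustration, not the package's actual implementation.

```go
package main

import "fmt"

// Local stand-ins mirroring the interface shapes documented on this page.
type DataChannel[T any] interface {
	Channel() chan T
	Finish()
}

type WithSink[T any] interface{ SetSink(ch DataChannel[T]) }
type WithSource[T any] interface{ SetSource(ch DataChannel[T]) }

// simpleChan is a hypothetical DataChannel backed by a plain Go channel.
type simpleChan[T any] struct{ ch chan T }

func (s *simpleChan[T]) Channel() chan T { return s.ch }
func (s *simpleChan[T]) Finish()         { close(s.ch) }

// compose is an assumed implementation: both operators receive one shared channel.
func compose[T any](op1 WithSink[T], op2 WithSource[T]) {
	ch := &simpleChan[T]{ch: make(chan T)}
	op1.SetSink(ch)
	op2.SetSource(ch)
}

// producer and consumer are hypothetical operators used only in this sketch.
type producer struct{ sink DataChannel[int] }

func (p *producer) SetSink(ch DataChannel[int]) { p.sink = ch }

type consumer struct{ source DataChannel[int] }

func (c *consumer) SetSource(ch DataChannel[int]) { c.source = ch }

// sumThrough sends values through the composed channel and returns their sum.
func sumThrough(values []int) int {
	p, c := &producer{}, &consumer{}
	compose[int](p, c)
	go func() {
		for _, v := range values {
			p.sink.Channel() <- v
		}
		p.sink.Finish() // the upstream side signals completion
	}()
	total := 0
	for v := range c.source.Channel() {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sumThrough([]int{1, 2, 3})) // prints 6
}
```

The type argument is spelled out in `compose[int](p, c)` so the call does not depend on inferring T from the operators' method sets, which older Go toolchains cannot do.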

Types

type AsyncOperator

type AsyncOperator[T workerpool.TaskMayPanic, R any] struct {
	// contains filtered or unexported fields
}

AsyncOperator processes data asynchronously.

For example, if the sink of AsyncOperator op1 and the source of op2 use the same channel, then op2's workers handle the results produced by op1.
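
The shared-channel arrangement can be sketched with plain goroutines. This example does not use the workerpool package. The `stage` helper, the worker counts, and the transform functions are hypothetical, chosen only to show how one stage's sink becomes the next stage's source.

```go
package main

import (
	"fmt"
	"sync"
)

// stage is a hypothetical stand-in for an async operator: workerNum goroutines
// read from source, apply transform, and write to sink. The sink is closed
// once every worker has drained the source.
func stage[T, R any](source <-chan T, sink chan<- R, workerNum int, transform func(T) R) {
	var wg sync.WaitGroup
	for i := 0; i < workerNum; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range source {
				sink <- transform(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(sink)
	}()
}

// runTwoStages chains two stages through one shared channel and sums the output.
func runTwoStages(inputs []int) int {
	in := make(chan int)
	shared := make(chan int) // sink of stage 1 and source of stage 2
	out := make(chan int)

	stage(in, shared, 2, func(v int) int { return v * 2 })
	stage(shared, out, 2, func(v int) int { return v + 1 })

	go func() {
		for _, v := range inputs {
			in <- v
		}
		close(in)
	}()

	sum := 0
	for v := range out {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(runTwoStages([]int{1, 2, 3})) // prints 15
}
```

With more than one worker per stage the order of results is not deterministic, which is why the sketch aggregates with a sum rather than comparing an ordered slice.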

func NewAsyncOperator

func NewAsyncOperator[T workerpool.TaskMayPanic, R any](ctx context.Context, pool *workerpool.WorkerPool[T, R]) *AsyncOperator[T, R]

NewAsyncOperator creates an AsyncOperator.

func NewAsyncOperatorWithTransform

func NewAsyncOperatorWithTransform[T workerpool.TaskMayPanic, R any](
	ctx context.Context,
	name string,
	workerNum int,
	transform func(T) R,
) *AsyncOperator[T, R]

NewAsyncOperatorWithTransform creates an AsyncOperator with a transform function.

func (*AsyncOperator[T, R]) Close

func (c *AsyncOperator[T, R]) Close() error

Close implements the Close method of the Operator interface.

func (*AsyncOperator[T, R]) GetWorkerPoolSize

func (c *AsyncOperator[T, R]) GetWorkerPoolSize() int32

GetWorkerPoolSize returns the worker pool size.

func (*AsyncOperator[T, R]) Open

func (c *AsyncOperator[T, R]) Open() error

Open implements the Open method of the Operator interface.

func (*AsyncOperator[T, R]) SetSink

func (c *AsyncOperator[T, R]) SetSink(ch DataChannel[R])

SetSink sets the sink channel.

func (*AsyncOperator[T, R]) SetSource

func (c *AsyncOperator[T, R]) SetSource(ch DataChannel[T])

SetSource sets the source channel.

func (*AsyncOperator[T, R]) String

func (*AsyncOperator[T, R]) String() string

String returns the name of the operator.

func (*AsyncOperator[T, R]) TuneWorkerPoolSize

func (c *AsyncOperator[T, R]) TuneWorkerPoolSize(workerNum int32)

TuneWorkerPoolSize tunes the worker pool size.

type AsyncPipeline

type AsyncPipeline struct {
	// contains filtered or unexported fields
}

AsyncPipeline wraps a list of Operators. Data flows from the first operator to the last.

func NewAsyncPipeline

func NewAsyncPipeline(ops ...Operator) *AsyncPipeline

NewAsyncPipeline creates a new AsyncPipeline.

func (*AsyncPipeline) Close

func (p *AsyncPipeline) Close() error

Close waits for all tasks to finish.

func (*AsyncPipeline) Execute

func (p *AsyncPipeline) Execute() error

Execute opens all operators; the operators then run asynchronously.

func (*AsyncPipeline) GetLocalIngestModeReaderAndWriter

func (p *AsyncPipeline) GetLocalIngestModeReaderAndWriter() (operator1, operator2 Operator)

GetLocalIngestModeReaderAndWriter returns the reader and writer in the local ingest mode.

func (*AsyncPipeline) String

func (p *AsyncPipeline) String() string

String shows the pipeline.
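
The AsyncPipeline lifecycle described above (Execute opens every operator, Close waits for them) might look roughly like the following. The `pipeline` and `recorder` types are hypothetical. Opening and closing in declaration order, and returning the first error, are assumptions rather than documented behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// Operator mirrors the interface shape documented on this page.
type Operator interface {
	Open() error
	Close() error
	String() string
}

// pipeline is a hypothetical stand-in for AsyncPipeline.
type pipeline struct{ ops []Operator }

// Execute opens the operators from first to last and stops at the first error.
func (p *pipeline) Execute() error {
	for _, op := range p.ops {
		if err := op.Open(); err != nil {
			return err
		}
	}
	return nil
}

// Close closes every operator in order and reports the first error seen.
func (p *pipeline) Close() error {
	var firstErr error
	for _, op := range p.ops {
		if err := op.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func (p *pipeline) String() string {
	names := make([]string, 0, len(p.ops))
	for _, op := range p.ops {
		names = append(names, op.String())
	}
	return "pipeline[" + strings.Join(names, " -> ") + "]"
}

// recorder is a hypothetical operator that logs its lifecycle calls.
type recorder struct {
	name string
	log  *[]string
}

func (r *recorder) Open() error    { *r.log = append(*r.log, "open "+r.name); return nil }
func (r *recorder) Close() error   { *r.log = append(*r.log, "close "+r.name); return nil }
func (r *recorder) String() string { return r.name }

// lifecycle runs a two-operator pipeline and returns the recorded call order.
func lifecycle() []string {
	var log []string
	p := &pipeline{ops: []Operator{
		&recorder{name: "reader", log: &log},
		&recorder{name: "writer", log: &log},
	}}
	if err := p.Execute(); err != nil {
		return nil
	}
	if err := p.Close(); err != nil {
		return nil
	}
	return log
}

func main() {
	fmt.Println(lifecycle())
}
```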

type DataChannel

type DataChannel[T any] interface {
	Channel() chan T
	Finish()
}

DataChannel is a channel that can be used to transfer data between operators.

type Operator

type Operator interface {
	Open() error
	// Close waits for the task to finish and closes the operator.
	// TODO: the wait part should be separated from the close part.
	Close() error
	String() string
}

Operator is the basic operation unit in the task execution.
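
As an illustration of the Open/Close contract, the sketch below shows one way a type could satisfy Operator, with Close waiting for the work that Open started. The interface is re-declared locally, and `counterOp` is a hypothetical operator invented for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// Operator mirrors the interface shape documented on this page.
type Operator interface {
	Open() error
	Close() error
	String() string
}

// counterOp is a hypothetical operator: Open starts a background goroutine
// that counts to n, and Close waits for that goroutine to finish.
type counterOp struct {
	n     int
	count int
	wg    sync.WaitGroup
}

// Compile-time check that *counterOp satisfies Operator.
var _ Operator = (*counterOp)(nil)

func (c *counterOp) Open() error {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		for i := 0; i < c.n; i++ {
			c.count++
		}
	}()
	return nil
}

// Close blocks until the background work is done.
func (c *counterOp) Close() error {
	c.wg.Wait()
	return nil
}

func (c *counterOp) String() string { return fmt.Sprintf("counterOp(n=%d)", c.n) }

// runCounter opens and closes the operator, then returns the final count.
// Reading count is safe here because wg.Wait returns only after the goroutine exits.
func runCounter(n int) int {
	op := &counterOp{n: n}
	if err := op.Open(); err != nil {
		return -1
	}
	if err := op.Close(); err != nil {
		return -1
	}
	return op.count
}

func main() {
	fmt.Println(runCounter(1000)) // prints 1000
}
```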

type SimpleDataChannel

type SimpleDataChannel[T any] struct {
	// contains filtered or unexported fields
}

SimpleDataChannel is a simple implementation of DataChannel.

func NewSimpleDataChannel

func NewSimpleDataChannel[T any](ch chan T) *SimpleDataChannel[T]

NewSimpleDataChannel creates a new SimpleDataChannel.

func (*SimpleDataChannel[T]) Channel

func (s *SimpleDataChannel[T]) Channel() chan T

Channel returns the underlying channel of the SimpleDataChannel.

func (*SimpleDataChannel[T]) Finish

func (s *SimpleDataChannel[T]) Finish()

Finish closes the underlying channel of the SimpleDataChannel.
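
A DataChannel implementation along these lines can be very small. The sketch below is a local approximation, not the package's source: `dataChan` and `newDataChan` are hypothetical names, and the buffered channel is chosen only so the example can run in a single goroutine.

```go
package main

import "fmt"

// dataChan is a hypothetical type with the same method set as DataChannel.
type dataChan[T any] struct{ ch chan T }

func newDataChan[T any](ch chan T) *dataChan[T] { return &dataChan[T]{ch: ch} }

// Channel returns the underlying channel.
func (d *dataChan[T]) Channel() chan T { return d.ch }

// Finish closes the underlying channel, which ends any range loop reading from it.
func (d *dataChan[T]) Finish() { close(d.ch) }

// drain writes values into a buffered channel, finishes it, and reads everything back.
func drain(values []string) []string {
	dc := newDataChan(make(chan string, len(values)))
	for _, v := range values {
		dc.Channel() <- v
	}
	dc.Finish()

	var got []string
	for v := range dc.Channel() { // buffered values stay readable after close
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(drain([]string{"a", "b", "c"})) // prints [a b c]
}
```

Because closing an already closed Go channel panics, a Finish written this way should be called exactly once, by the side that produces the data.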

type WithSink

type WithSink[T any] interface {
	// SetSink sets the sink of the operator.
	// Operator implementations should call the Finish method of the sink when they are done.
	SetSink(channel DataChannel[T])
}

WithSink is an interface that can be used to set the sink of an operator.

type WithSource

type WithSource[T any] interface {
	SetSource(channel DataChannel[T])
}

WithSource is an interface that can be used to set the source of an operator.
