pipeline

package
v0.0.0-...-34e0b2d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 9, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrEmptyPipeline     = errors.New("EmptyPipeline")
	ErrNilPosition       = errors.New("SeekToNilPosition")
	ErrRegisterDispather = errors.New("FailToRegisterDispather")
)

Functions

func NewNodeCtx

func NewNodeCtx(node Node) *nodeCtx

func WrapErrRegDispather

func WrapErrRegDispather(err error) error

Types

type BaseNode

type BaseNode struct {
	// contains filtered or unexported fields
}

func NewBaseNode

func NewBaseNode(name string, maxQueryLength int32) *BaseNode

func (*BaseNode) MaxQueueLength

func (node *BaseNode) MaxQueueLength() int32

MaxQueueLength returns the length of the pipeline input channel.

func (*BaseNode) Name

func (node *BaseNode) Name() string

Name returns the name of the Node.

type Msg

type Msg interface{}

type Node

type Node interface {
	Name() string
	MaxQueueLength() int32
	Operate(in Msg) Msg
}

type Pipeline

type Pipeline interface {
	Add(node ...Node)
	Start() error
	Close()
}

type StreamPipeline

type StreamPipeline interface {
	Pipeline
	ConsumeMsgStream(position *msgpb.MsgPosition) error
}

func NewPipelineWithStream

func NewPipelineWithStream(dispatcher msgdispatcher.Client, nodeTtInterval time.Duration, enableTtChecker bool, vChannel string) StreamPipeline

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL