pipeline

package
v0.0.0-...-af67c94 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 8, 2021 | License: MIT | Imports: 7 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by the package. Each is created with errors.New
// and carries only a fixed message.
// NOTE(review): this listing does not show which functions return them —
// confirm against the package source before relying on a specific caller.
var (
	// ErrNilOutput has the message "source step output is nil".
	ErrNilOutput = errors.New("source step output is nil")
	// ErrTooFew has the message "too few steps".
	ErrTooFew    = errors.New("too few steps")
)
View Source
// ErrDrop is a sentinel error with the message "dropped data".
// NOTE(review): presumably reported when a Data item is discarded instead of
// being forwarded — confirm against the functions that return it.
var ErrDrop = errors.New("dropped data")
View Source
// ErrEOS is a sentinel error with the message "end of stream".
// NOTE(review): presumably reported when an input channel has no more data —
// confirm against the functions that return it.
var ErrEOS = errors.New("end of stream")

Functions

func Register

func Register(name string, builder StepBuilder) error

func StepMakeChan

func StepMakeChan(o plugin.Options) chan Data

func StepSend

func StepSend(ctx context.Context, o plugin.Options, out chan Data, data Data) error

func Unregister

func Unregister(name string) error

Types

type Data

// Data is an alias for store.Store. It is the element type of the channels
// that steps receive from and send on.
type Data = store.Store

func StepReceive

func StepReceive(ctx context.Context, o plugin.Options, in <-chan Data) (Data, error)

type MarshallableStep

// MarshallableStep is a Step that additionally exposes methods for encoding
// itself into, and decoding a Step from, a byte slice.
type MarshallableStep interface {
	Step
	// Size returns an int.
	// NOTE(review): presumably the number of bytes MarshalToSizedBuffer
	// needs — confirm against an implementation.
	Size() int
	// MarshalToSizedBuffer takes a byte slice and returns an int and an error.
	// NOTE(review): presumably it writes the encoded step into the slice and
	// returns the byte count — confirm against an implementation.
	MarshalToSizedBuffer([]byte) (int, error)
	// Unmarshal takes a byte slice and returns a Step and an error; the
	// result is delivered as a return value, not through an out-parameter.
	Unmarshal([]byte) (Step, error)
}

type NamedStep

// NamedStep is implemented by values that can report a name.
// The interface does not embed Step, so any type with a Name() string method
// satisfies it.
type NamedStep interface {
	// Name returns the name as a string.
	Name() string
}

type Pipeline

// Pipeline is an opaque struct: it exposes no exported fields. Values are
// returned by New and FromJSON, and are operated on through the
// pointer-receiver methods listed for *Pipeline (AddStep, ConnectSteps,
// Run, Reset, and so on).
type Pipeline struct {
	// contains filtered or unexported fields
}

func FromJSON

func FromJSON(json string) Pipeline

func New

func New() Pipeline

func (*Pipeline) AddOrFindStep

func (p *Pipeline) AddOrFindStep(step Step) (int, error)

func (*Pipeline) AddStep

func (p *Pipeline) AddStep(step Step) (int, error)

func (*Pipeline) ConnectSteps

func (p *Pipeline) ConnectSteps(steps ...Step) (<-chan Data, error)

func (*Pipeline) ConnectStepsById

func (p *Pipeline) ConnectStepsById(ids ...int) (<-chan Data, error)

func (*Pipeline) FindStep

func (p *Pipeline) FindStep(step Step) (int, error)

func (*Pipeline) GetStep

func (p *Pipeline) GetStep(id int) (Step, bool)

func (*Pipeline) Reset

func (p *Pipeline) Reset() ([]<-chan Data, error)

func (*Pipeline) Run

func (p *Pipeline) Run(ctx context.Context)

type Step

// Step is the interface accepted by the Pipeline methods that add, find and
// connect steps. A step is given a receive-only channel of Data, exposes a
// receive-only channel of Data, and can be run and reset.
type Step interface {
	// In hands the step a receive-only channel of Data.
	// NOTE(review): presumably this becomes the step's input — confirm.
	In(<-chan Data)
	// Out returns a receive-only channel of Data.
	// NOTE(review): presumably the step's output — confirm.
	Out() <-chan Data
	// Run takes a context and returns nothing.
	// NOTE(review): whether it blocks until ctx is done, and how errors are
	// surfaced, is not visible in this listing — confirm.
	Run(ctx context.Context)
	// Reset takes no arguments and returns nothing; its effect is defined by
	// the implementation.
	Reset()
}

func NewStep

func NewStep(name string, opts ...options.Option) (Step, error)

type StepBuilder

// StepBuilder is a constructor function type: given zero or more
// options.Option values it returns a Step or an error. Register accepts a
// StepBuilder together with a name.
type StepBuilder func(opts ...options.Option) (Step, error)

type StepConfigurator

// StepConfigurator is implemented by values that expose a configuration
// getter and an options-based setter.
type StepConfigurator interface {
	// Config returns a value of unspecified type (interface{}).
	// NOTE(review): presumably the current configuration; the concrete type
	// is implementation-defined — confirm.
	Config() interface{}
	// SetConfig accepts zero or more options.Option values and returns nothing.
	// NOTE(review): presumably applies them to the configuration — confirm.
	SetConfig(opts ...options.Option)
}

Directories

Path Synopsis
fps

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL