flow

package
v0.3.0
Warning

This package is not in the latest version of its module.

Published: May 28, 2024 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Keygen

func Keygen(prefix string) func() string

Types

type Plan added in v0.3.0

type Plan struct {
	// contains filtered or unexported fields
}

func New added in v0.3.0

func New(opt ...glow.NetworkOpt) *Plan

func (*Plan) Draw added in v0.3.0

func (p *Plan) Draw(name string) *Plan

func (*Plan) Error added in v0.3.0

func (p *Plan) Error() error

Error retrieves any error that occurred during the building and execution of the pipeline.

func (*Plan) Run added in v0.3.0

func (p *Plan) Run(ctx context.Context) *Plan

func (*Plan) Step added in v0.3.0

func (p *Plan) Step(opt ...StepOpt) *Plan

func (*Plan) Stop added in v0.3.0

func (p *Plan) Stop() *Plan

func (*Plan) Uptime added in v0.3.0

func (p *Plan) Uptime(uf func(d time.Duration)) *Plan

type Seq

type Seq struct {
	// contains filtered or unexported fields
}

func Sequential

func Sequential(opt ...glow.NetworkOpt) *Seq

func (*Seq) Capture

func (s *Seq) Capture(cf func(ctx context.Context, in any) error, opt ...StepOpt) *Seq

func (*Seq) Collect

func (s *Seq) Collect(cb func([]any), compare func(a any, b any) int, opt ...StepOpt) *Seq

func (*Seq) Combine

func (s *Seq) Combine(opt ...StepOpt) *Seq

func (*Seq) Count

func (s *Seq) Count(cb func(num int), opt ...StepOpt) *Seq

func (*Seq) Draw

func (s *Seq) Draw(name string) *Seq

func (*Seq) Error

func (s *Seq) Error() error

func (*Seq) Filter

func (s *Seq) Filter(ff func(in any) bool, opt ...StepOpt) *Seq

func (*Seq) Map

func (s *Seq) Map(mf func(ctx context.Context, in any, emit func(any)) error, opt ...StepOpt) *Seq

func (*Seq) Peek

func (s *Seq) Peek(pf func(in any), opt ...StepOpt) *Seq

func (*Seq) Read

func (s *Seq) Read(rf func(ctx context.Context, emit func(any)) error, opt ...StepOpt) *Seq

func (*Seq) Run

func (s *Seq) Run(ctx context.Context) *Seq

func (*Seq) Stop added in v0.0.2

func (s *Seq) Stop() *Seq

func (*Seq) Uptime

func (s *Seq) Uptime(uf func(d time.Duration)) *Seq

type Step

type Step struct {
	// contains filtered or unexported fields
}

type StepKind

type StepKind int
const (
	ReadStep StepKind = iota
	MapStep
	FilterStep
	CaptureStep
	CollectStep
	CountStep
	PeekStep
	CombineStep
)

func (StepKind) String

func (k StepKind) String() string

type StepOpt added in v0.1.0

type StepOpt func(*stepOpts)

func Capture added in v0.3.0

func Capture(cf func(ctx context.Context, in any) error) StepOpt

Capture captures each element in the input data stream and feeds it to a capturing function. The capturing function receives a context and captured data point. Being a terminal step in the pipeline, it does not emit data.

func Collect added in v0.3.0

func Collect(cb func([]any), compare func(a any, b any) int) StepOpt

Collect aggregates all elements in the input data stream and passes the collection to the provided callback. It accepts a compare function so that the collected data points can be sorted as they arrive. The compare function must return

< 0 if a is less than b,
= 0 if a equals b,
> 0 if a is greater than b.

As a terminal step in the pipeline, it does not emit data, marking the end of the data processing flow.
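A compare function that follows this contract might look like the sketch below. `compareInts` is a hypothetical name, and the `sort.Slice` call in `main` is only a stand-in that shows how such a function orders values; how the package applies it internally is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sort"
)

// compareInts is a hypothetical compare function for int data points.
// Values that are not ints fall back to 0 through the failed assertion.
func compareInts(a any, b any) int {
	x, _ := a.(int)
	y, _ := b.(int)
	switch {
	case x < y:
		return -1
	case x > y:
		return 1
	default:
		return 0
	}
}

func main() {
	items := []any{3, 1, 2}
	// Stand-in for the sorting step: order the items with compareInts.
	sort.Slice(items, func(i, j int) bool {
		return compareInts(items[i], items[j]) < 0
	})
	// A callback with the func([]any) shape receives the whole collection.
	cb := func(all []any) { fmt.Println(all) }
	cb(items)
}
```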

func Combine added in v0.3.0

func Combine() StepOpt

Combine merges the elements of multiple streams into a single stream, concatenating all the data points from the individual streams in the order they arrive. This forces the flow to become linear, ensuring that each data point is processed sequentially. The combined stream can then be broadcast or distributed.
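The general idea of merging several streams into one can be illustrated with a plain fan-in over channels. This is a generic sketch of the pattern, not the package's implementation; `merge` and `fromSlice` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"sync"
)

// merge forwards every value from the input streams to one output channel,
// in whatever order the values arrive. The output closes once all inputs end.
func merge(streams ...<-chan any) <-chan any {
	out := make(chan any)
	var wg sync.WaitGroup
	for _, s := range streams {
		wg.Add(1)
		go func(s <-chan any) {
			defer wg.Done()
			for v := range s {
				out <- v
			}
		}(s)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// fromSlice returns a closed, pre-filled channel holding the given values.
func fromSlice(vs ...any) <-chan any {
	ch := make(chan any, len(vs))
	for _, v := range vs {
		ch <- v
	}
	close(ch)
	return ch
}

func main() {
	n := 0
	for range merge(fromSlice(1, 2), fromSlice("a", "b", "c")) {
		n++
	}
	fmt.Println(n) // 5: every value from both streams passes through once
}
```

A single consumer reading from the merged channel sees one value at a time, which is the sense in which the flow becomes linear.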

func Connection added in v0.3.0

func Connection(key ...string) StepOpt

Connection sets up a connection between a Step and the Steps identified by the provided key(s). The provided keys represent upstream steps, enabling data to flow from these Steps to the current Step. Upstream steps can either distribute or broadcast data.

func Count added in v0.3.0

func Count(cb func(num int)) StepOpt

Count keeps track of the number of elements in the input data stream. Being a terminal step in the pipeline, it does not emit data.
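A callback for this step only needs the `func(num int)` shape. The sketch below keeps the most recently reported value in an atomic field; `lastCount` is a hypothetical helper, and whether the package reports the count once or repeatedly is not stated here, so the code makes no assumption either way.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// lastCount remembers the latest value passed to its set method.
type lastCount struct{ n int64 }

// set matches func(num int) and can be used as the count callback.
func (c *lastCount) set(num int) { atomic.StoreInt64(&c.n, int64(num)) }

// get returns the most recently reported count.
func (c *lastCount) get() int { return int(atomic.LoadInt64(&c.n)) }

func main() {
	c := &lastCount{}
	// Stand-in for the pipeline reporting a count to the callback.
	c.set(3)
	fmt.Println(c.get())
}
```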

func Distributor

func Distributor() StepOpt

Distributor enables a Step to distribute work among the next Step and its replicas. See

  • Replicas

func Filter added in v0.3.0

func Filter(ff func(in any) bool) StepOpt

Filter applies a filtering function to each element in the input data stream. The filtering function is invoked with an input element and returns a boolean indicating whether the element should be retained or not. If the filtering function returns true, the element is passed through to the output stream; otherwise, it is discarded. This step serves as an intermediate step facilitating data filtering operations.
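A filtering function only has to satisfy `func(in any) bool`. In the sketch below, `isEven` is a hypothetical example, and the loop in `main` stands in for the pipeline applying it to each element.

```go
package main

import "fmt"

// isEven is a hypothetical filter: it retains even ints and
// discards everything else, including values of other types.
func isEven(in any) bool {
	n, ok := in.(int)
	return ok && n%2 == 0
}

func main() {
	var kept []any
	// Stand-in for the pipeline: pass through only what the filter accepts.
	for _, v := range []any{1, 2, "three", 4} {
		if isEven(v) {
			kept = append(kept, v)
		}
	}
	fmt.Println(kept) // [2 4]
}
```

Because elements arrive as `any`, checking the type assertion result keeps the filter from panicking on unexpected input.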

func Map added in v0.3.0

func Map(mf func(ctx context.Context, in any, emit func(any)) error) StepOpt

Map applies a mapping function to each element in the input data stream. The mapper function is invoked with a context, an input element, and an emit function. It processes each input element and emits zero or more transformed data points using the 'emit' function. The emitted data can be of any type. Typically, this step is an intermediate step enabling data transformation operations.

func Peek added in v0.3.0

func Peek(pf func(in any)) StepOpt

Peek allows observing the data stream without modifying it, typically for debugging, logging, or monitoring purposes.

func Read added in v0.3.0

func Read(rf func(context.Context, func(any)) error) StepOpt

Read retrieves data from a specified source using a provided reader function. The reader function is called with a context and an emit function, responsible for reading data and emitting it. The emitted data can be of any type. Usually, this is the first step, feeding data for processing on to subsequent steps.

func Replicas added in v0.3.0

func Replicas(v int) StepOpt

Replicas sets the number of replicas for the Step, determining how many instances of the Step will run concurrently. Depending on whether the preceding Step is in distributing or broadcasting mode, these replicas will either operate in a synchronized manner, collaborating on the same stream of data, or function independently, each handling the full stream.
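The difference between the two modes can be illustrated with plain goroutines and channels. This is a generic sketch of the concepts, not the package's implementation: `distribute` and `broadcast` are hypothetical functions, and both assume at least one replica.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// distribute lets all replicas read from one shared channel, so each
// item is handled exactly once across the whole group.
func distribute(items []any, replicas int) int64 {
	in := make(chan any)
	var handled int64
	var wg sync.WaitGroup
	for i := 0; i < replicas; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range in {
				atomic.AddInt64(&handled, 1)
			}
		}()
	}
	for _, it := range items {
		in <- it
	}
	close(in)
	wg.Wait()
	return handled
}

// broadcast gives every replica its own channel, so each replica
// independently sees the full stream.
func broadcast(items []any, replicas int) int64 {
	var handled int64
	var wg sync.WaitGroup
	chans := make([]chan any, replicas)
	for i := range chans {
		chans[i] = make(chan any)
		wg.Add(1)
		go func(in <-chan any) {
			defer wg.Done()
			for range in {
				atomic.AddInt64(&handled, 1)
			}
		}(chans[i])
	}
	for _, it := range items {
		for _, ch := range chans {
			ch <- it
		}
	}
	for _, ch := range chans {
		close(ch)
	}
	wg.Wait()
	return handled
}

func main() {
	items := []any{1, 2, 3, 4}
	// 4 items, 3 replicas: 4 handlings when distributed, 12 when broadcast.
	fmt.Println(distribute(items, 3), broadcast(items, 3))
}
```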

func StepKey added in v0.2.0

func StepKey(v string) StepOpt

StepKey sets a unique key for the Step.
