processors

package
v2.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 22, 2024 License: MIT Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Aggregator

type Aggregator struct {
	AggregatorFunc AggregatorFunc
	Store          string

	topology.DefaultNode
	// contains filtered or unexported fields
}

func (*Aggregator) Build

func (*Aggregator) Init

func (f *Aggregator) Init(ctx topology.NodeContext) error

func (*Aggregator) Name

func (f *Aggregator) Name() string

func (*Aggregator) ReadsFrom

func (f *Aggregator) ReadsFrom() []string

func (*Aggregator) Run

func (f *Aggregator) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, next bool, err error)

func (*Aggregator) Type

func (f *Aggregator) Type() topology.Type

func (*Aggregator) WritesAt

func (f *Aggregator) WritesAt() []string

type AggregatorFunc

type AggregatorFunc func(ctx context.Context, key, value, previous interface{}) (newAgg interface{}, err error)

type Branch

type Branch struct {
	topology.DefaultNode
}

func (*Branch) Build

func (*Branch) New

func (*Branch) Run

func (b *Branch) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)

func (*Branch) Type

func (b *Branch) Type() topology.Type

type BranchChild

type BranchChild struct {
	Name      string
	Predicate BranchPredicate

	topology.DefaultNode
}

func (*BranchChild) Build

func (*BranchChild) Run

func (b *BranchChild) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)

func (*BranchChild) Type

func (b *BranchChild) Type() topology.Type

type BranchDetails

type BranchDetails struct {
	Name      string
	Predicate BranchPredicate
}

type BranchPredicate

type BranchPredicate func(ctx context.Context, key interface{}, val interface{}) (bool, error)

type Each

type Each struct {
	EachFunc EachFunc

	topology.DefaultNode
}

func (*Each) Build

func (*Each) Name

func (f *Each) Name() string

func (*Each) Run

func (f *Each) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, next bool, err error)

func (*Each) Type

func (f *Each) Type() topology.Type

type EachFunc

type EachFunc func(ctx context.Context, key, value interface{})

type Filter

type Filter struct {
	FilterFunc FilterFunc

	topology.DefaultNode
}

func (*Filter) Build

func (*Filter) Name

func (f *Filter) Name() string

func (*Filter) Run

func (f *Filter) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, next bool, err error)

func (*Filter) Type

func (f *Filter) Type() topology.Type

type FilterFunc

type FilterFunc func(ctx context.Context, key, value interface{}) (bool, error)

type FlatMap

type FlatMap struct {
	FlatMapFunc FlatMapFunc

	topology.DefaultNode
}

func (*FlatMap) Build

func (*FlatMap) Name

func (f *FlatMap) Name() string

func (*FlatMap) Run

func (f *FlatMap) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, next bool, err error)

func (*FlatMap) Type

func (f *FlatMap) Type() topology.Type

type FlatMapFunc

type FlatMapFunc func(ctx context.Context, key, value interface{}) ([]topology.KeyValPair, error)

type FlatMapValues

type FlatMapValues struct {
	FlatMapValuesFunc FlatMapValuesFunc

	topology.DefaultNode
}

func (*FlatMapValues) Build

func (*FlatMapValues) Name

func (f *FlatMapValues) Name() string

func (*FlatMapValues) Run

func (f *FlatMapValues) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, next bool, err error)

func (*FlatMapValues) Type

func (f *FlatMapValues) Type() topology.Type

type FlatMapValuesFunc

type FlatMapValuesFunc func(ctx context.Context, key, value interface{}) ([]interface{}, error)

type GlobalTableJoiner

type GlobalTableJoiner struct {
	Store              string
	KeyMapper          KeyMapper
	ValueMapper        JoinValueMapper
	JoinType           JoinerType
	RightKeyLookupFunc ValueLookupFunc

	topology.DefaultNode
	// contains filtered or unexported fields
}

func (*GlobalTableJoiner) Build

func (*GlobalTableJoiner) Init

func (*GlobalTableJoiner) Join

func (j *GlobalTableJoiner) Join(ctx context.Context, key, leftVal interface{}) (joinedVal interface{}, err error)

func (*GlobalTableJoiner) ReadsFrom

func (j *GlobalTableJoiner) ReadsFrom() []string

func (*GlobalTableJoiner) Run

func (j *GlobalTableJoiner) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, next bool, err error)

func (*GlobalTableJoiner) StateType

func (j *GlobalTableJoiner) StateType() topology.StateType

func (*GlobalTableJoiner) Type

func (j *GlobalTableJoiner) Type() topology.Type

type JoinValueMapper

type JoinValueMapper func(ctx context.Context, left, right interface{}) (joined interface{}, err error)

type JoinerType

type JoinerType int

JoinerType represents a supported join type eg: LeftJoin, RightJoin, InnerJoin.

const (
	LeftJoin JoinerType = iota
	RightJoin
	InnerJoin
	OuterJoin
)

func (JoinerType) String

func (jt JoinerType) String() string

type KeyMapper

type KeyMapper func(ctx context.Context, key, value interface{}) (mappedKey interface{}, err error)

type KeySelector

type KeySelector struct {
	SelectKeyFunc SelectKeyFunc

	topology.DefaultNode
}

func (*KeySelector) Build

func (*KeySelector) Run

func (ks *KeySelector) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)

func (*KeySelector) Type

func (ks *KeySelector) Type() topology.Type

type Map

type Map struct {
	MapperFunc MapperFunc
	topology.DefaultNode
}

func (*Map) Build

func (t *Map) Build(ctx topology.SubTopologyContext) (topology.Node, error)

func (*Map) Name

func (t *Map) Name() string

func (*Map) Run

func (t *Map) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, next bool, err error)

func (*Map) Type

func (t *Map) Type() topology.Type

type MapValueFunc

type MapValueFunc func(ctx context.Context, key, value interface{}) (vOut interface{}, err error)

type MapperFunc

type MapperFunc func(ctx context.Context, key, value interface{}) (kOut, vOut interface{}, err error)

type Merger

type Merger struct {
	topology.DefaultNode
}

func (*Merger) Build

func (*Merger) Run

func (m *Merger) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)

func (*Merger) Type

func (m *Merger) Type() topology.Type

type SelectKeyFunc

type SelectKeyFunc func(ctx context.Context, key, value interface{}) (kOut interface{}, err error)

type Side

type Side int

Side represents the current side of the join eg: Left, Right.

const (
	LeftSide Side = iota
	RightSide
)

func (Side) String

func (jt Side) String() string

type StreamConverter

type StreamConverter struct {
	topology.DefaultNode
}

func (*StreamConverter) Build

func (*StreamConverter) Run

func (j *StreamConverter) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)

func (*StreamConverter) Type

func (j *StreamConverter) Type() topology.Type

type StreamJoiner

type StreamJoiner struct {
	OtherSideRequired   bool
	CurrentSide         Side
	OtherStoreName      string
	ValueMapper         JoinValueMapper
	OtherSideLookupFunc ValueLookupFunc
	OtherSideFilters    []FilterFunc

	topology.DefaultNode
	// contains filtered or unexported fields
}

func (*StreamJoiner) Build

func (*StreamJoiner) Init

func (sj *StreamJoiner) Init(ctx topology.NodeContext) error

func (*StreamJoiner) Name

func (sj *StreamJoiner) Name() string

func (*StreamJoiner) ReadsFrom

func (sj *StreamJoiner) ReadsFrom() []string

func (*StreamJoiner) Run

func (sj *StreamJoiner) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)

func (*StreamJoiner) Type

func (sj *StreamJoiner) Type() topology.Type

type StreamJoinerOption

type StreamJoinerOption func(joiner *StreamJoiner)

type TableConverter

type TableConverter struct {
	Store string

	topology.DefaultNode
	// contains filtered or unexported fields
}

func (*TableConverter) Build

func (*TableConverter) Init

func (*TableConverter) Run

func (j *TableConverter) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)

func (*TableConverter) Type

func (j *TableConverter) Type() topology.Type

func (*TableConverter) WritesAt

func (j *TableConverter) WritesAt() []string

type ValueLookupFunc

type ValueLookupFunc func(ctx context.Context,
	store stores.ReadOnlyStore, key, leftVal interface{}) (interface{}, error)

type ValueMapper

type ValueMapper struct {
	MapValueFunc MapValueFunc
	topology.DefaultNode
}

func (*ValueMapper) Build

func (*ValueMapper) Run

func (vt *ValueMapper) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)

func (*ValueMapper) Type

func (vt *ValueMapper) Type() topology.Type

type WindowJoiner

type WindowJoiner struct {
	OtherSideRequired         bool
	CurrentSide               Side
	Duration                  time.Duration
	StoreName, OtherStoreName string
	ValueMapper               JoinValueMapper

	topology.DefaultNode
	// contains filtered or unexported fields
}

func (*WindowJoiner) Build

func (*WindowJoiner) Init

func (sj *WindowJoiner) Init(ctx topology.NodeContext) error

func (*WindowJoiner) Name

func (sj *WindowJoiner) Name() string

func (*WindowJoiner) ReadsFrom

func (sj *WindowJoiner) ReadsFrom() []string

func (*WindowJoiner) Run

func (sj *WindowJoiner) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)

func (*WindowJoiner) Type

func (sj *WindowJoiner) Type() topology.Type

func (*WindowJoiner) WritesAt

func (sj *WindowJoiner) WritesAt() []string

type WindowJoinerOption

type WindowJoinerOption func(joiner *WindowJoiner)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL