Documentation ¶
Index ¶
- type Aggregator
- func (f *Aggregator) Build(ctx topology.SubTopologyContext) (topology.Node, error)
- func (f *Aggregator) Init(ctx topology.NodeContext) error
- func (f *Aggregator) Name() string
- func (f *Aggregator) ReadsFrom() []string
- func (f *Aggregator) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, next bool, err error)
- func (f *Aggregator) Type() topology.Type
- func (f *Aggregator) WritesAt() []string
- type AggregatorFunc
- type Branch
- func (b *Branch) Build(ctx topology.SubTopologyContext) (topology.Node, error)
- func (b *Branch) New(ctx topology.SubTopologyContext) (topology.Node, error)
- func (b *Branch) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
- func (b *Branch) Type() topology.Type
- type BranchChild
- type BranchDetails
- type BranchPredicate
- type Each
- type EachFunc
- type Filter
- type FilterFunc
- type FlatMap
- type FlatMapFunc
- type FlatMapValues
- type FlatMapValuesFunc
- type GlobalTableJoiner
- func (j *GlobalTableJoiner) Build(_ topology.SubTopologyContext) (topology.Node, error)
- func (j *GlobalTableJoiner) Init(ctx topology.NodeContext) error
- func (j *GlobalTableJoiner) Join(ctx context.Context, key, leftVal interface{}) (joinedVal interface{}, err error)
- func (j *GlobalTableJoiner) ReadsFrom() []string
- func (j *GlobalTableJoiner) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, next bool, err error)
- func (j *GlobalTableJoiner) StateType() topology.StateType
- func (j *GlobalTableJoiner) Type() topology.Type
- type JoinValueMapper
- type JoinerType
- type KeyMapper
- type KeySelector
- type Map
- type MapValueFunc
- type MapperFunc
- type Merger
- type SelectKeyFunc
- type Side
- type StreamConverter
- type StreamJoiner
- func (sj *StreamJoiner) Build(_ topology.SubTopologyContext) (topology.Node, error)
- func (sj *StreamJoiner) Init(ctx topology.NodeContext) error
- func (sj *StreamJoiner) Name() string
- func (sj *StreamJoiner) ReadsFrom() []string
- func (sj *StreamJoiner) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
- func (sj *StreamJoiner) Type() topology.Type
- type StreamJoinerOption
- type TableConverter
- func (j *TableConverter) Build(ctx topology.SubTopologyContext) (topology.Node, error)
- func (j *TableConverter) Init(ctx topology.NodeContext) error
- func (j *TableConverter) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
- func (j *TableConverter) Type() topology.Type
- func (j *TableConverter) WritesAt() []string
- type ValueLookupFunc
- type ValueMapper
- type WindowJoiner
- func (sj *WindowJoiner) Build(ctx topology.SubTopologyContext) (topology.Node, error)
- func (sj *WindowJoiner) Init(ctx topology.NodeContext) error
- func (sj *WindowJoiner) Name() string
- func (sj *WindowJoiner) ReadsFrom() []string
- func (sj *WindowJoiner) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
- func (sj *WindowJoiner) Type() topology.Type
- func (sj *WindowJoiner) WritesAt() []string
- type WindowJoinerOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Aggregator ¶
type Aggregator struct { AggregatorFunc AggregatorFunc Store string topology.DefaultNode // contains filtered or unexported fields }
func (*Aggregator) Build ¶
func (f *Aggregator) Build(ctx topology.SubTopologyContext) (topology.Node, error)
func (*Aggregator) Init ¶
func (f *Aggregator) Init(ctx topology.NodeContext) error
func (*Aggregator) Name ¶
func (f *Aggregator) Name() string
func (*Aggregator) ReadsFrom ¶
func (f *Aggregator) ReadsFrom() []string
func (*Aggregator) Run ¶
func (f *Aggregator) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, next bool, err error)
func (*Aggregator) Type ¶
func (f *Aggregator) Type() topology.Type
func (*Aggregator) WritesAt ¶
func (f *Aggregator) WritesAt() []string
type AggregatorFunc ¶
type Branch ¶
type Branch struct {
topology.DefaultNode
}
type BranchChild ¶
type BranchChild struct { Name string Predicate BranchPredicate topology.DefaultNode }
func (*BranchChild) Build ¶
func (b *BranchChild) Build(ctx topology.SubTopologyContext) (topology.Node, error)
func (*BranchChild) Run ¶
func (b *BranchChild) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
func (*BranchChild) Type ¶
func (b *BranchChild) Type() topology.Type
type BranchDetails ¶
type BranchDetails struct { Name string Predicate BranchPredicate }
type BranchPredicate ¶
type Filter ¶
type Filter struct { FilterFunc FilterFunc topology.DefaultNode }
type FlatMap ¶
type FlatMap struct { FlatMapFunc FlatMapFunc topology.DefaultNode }
type FlatMapFunc ¶
type FlatMapFunc func(ctx context.Context, key, value interface{}) ([]topology.KeyValPair, error)
type FlatMapValues ¶
type FlatMapValues struct { FlatMapValuesFunc FlatMapValuesFunc topology.DefaultNode }
func (*FlatMapValues) Build ¶
func (f *FlatMapValues) Build(ctx topology.SubTopologyContext) (topology.Node, error)
func (*FlatMapValues) Name ¶
func (f *FlatMapValues) Name() string
func (*FlatMapValues) Run ¶
func (f *FlatMapValues) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, next bool, err error)
func (*FlatMapValues) Type ¶
func (f *FlatMapValues) Type() topology.Type
type FlatMapValuesFunc ¶
type GlobalTableJoiner ¶
type GlobalTableJoiner struct { Store string KeyMapper KeyMapper ValueMapper JoinValueMapper JoinType JoinerType RightKeyLookupFunc ValueLookupFunc topology.DefaultNode // contains filtered or unexported fields }
func (*GlobalTableJoiner) Build ¶
func (j *GlobalTableJoiner) Build(_ topology.SubTopologyContext) (topology.Node, error)
func (*GlobalTableJoiner) Init ¶
func (j *GlobalTableJoiner) Init(ctx topology.NodeContext) error
func (*GlobalTableJoiner) Join ¶
func (j *GlobalTableJoiner) Join(ctx context.Context, key, leftVal interface{}) (joinedVal interface{}, err error)
func (*GlobalTableJoiner) ReadsFrom ¶
func (j *GlobalTableJoiner) ReadsFrom() []string
func (*GlobalTableJoiner) Run ¶
func (j *GlobalTableJoiner) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, next bool, err error)
func (*GlobalTableJoiner) StateType ¶
func (j *GlobalTableJoiner) StateType() topology.StateType
func (*GlobalTableJoiner) Type ¶
func (j *GlobalTableJoiner) Type() topology.Type
type JoinValueMapper ¶
type JoinerType ¶
type JoinerType int
JoinerType represents a supported join type eg: LeftJoin, RightJoin, InnerJoin.
const ( LeftJoin JoinerType = iota RightJoin InnerJoin OuterJoin )
func (JoinerType) String ¶
func (jt JoinerType) String() string
type KeySelector ¶
type KeySelector struct { SelectKeyFunc SelectKeyFunc topology.DefaultNode }
func (*KeySelector) Build ¶
func (ks *KeySelector) Build(ctx topology.SubTopologyContext) (topology.Node, error)
func (*KeySelector) Run ¶
func (ks *KeySelector) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
func (*KeySelector) Type ¶
func (ks *KeySelector) Type() topology.Type
type MapValueFunc ¶
type MapperFunc ¶
type Merger ¶
type Merger struct {
topology.DefaultNode
}
type SelectKeyFunc ¶
type StreamConverter ¶
type StreamConverter struct {
topology.DefaultNode
}
func (*StreamConverter) Build ¶
func (j *StreamConverter) Build(ctx topology.SubTopologyContext) (topology.Node, error)
func (*StreamConverter) Run ¶
func (j *StreamConverter) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
func (*StreamConverter) Type ¶
func (j *StreamConverter) Type() topology.Type
type StreamJoiner ¶
type StreamJoiner struct { OtherSideRequired bool CurrentSide Side OtherStoreName string ValueMapper JoinValueMapper OtherSideLookupFunc ValueLookupFunc OtherSideFilters []FilterFunc topology.DefaultNode // contains filtered or unexported fields }
func (*StreamJoiner) Build ¶
func (sj *StreamJoiner) Build(_ topology.SubTopologyContext) (topology.Node, error)
func (*StreamJoiner) Init ¶
func (sj *StreamJoiner) Init(ctx topology.NodeContext) error
func (*StreamJoiner) Name ¶
func (sj *StreamJoiner) Name() string
func (*StreamJoiner) ReadsFrom ¶
func (sj *StreamJoiner) ReadsFrom() []string
func (*StreamJoiner) Run ¶
func (sj *StreamJoiner) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
func (*StreamJoiner) Type ¶
func (sj *StreamJoiner) Type() topology.Type
type StreamJoinerOption ¶
type StreamJoinerOption func(joiner *StreamJoiner)
type TableConverter ¶
type TableConverter struct { Store string topology.DefaultNode // contains filtered or unexported fields }
func (*TableConverter) Build ¶
func (j *TableConverter) Build(ctx topology.SubTopologyContext) (topology.Node, error)
func (*TableConverter) Init ¶
func (j *TableConverter) Init(ctx topology.NodeContext) error
func (*TableConverter) Run ¶
func (j *TableConverter) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
func (*TableConverter) Type ¶
func (j *TableConverter) Type() topology.Type
func (*TableConverter) WritesAt ¶
func (j *TableConverter) WritesAt() []string
type ValueLookupFunc ¶
type ValueLookupFunc func(ctx context.Context, store stores.ReadOnlyStore, key, leftVal interface{}) (interface{}, error)
type ValueMapper ¶
type ValueMapper struct { MapValueFunc MapValueFunc topology.DefaultNode }
func (*ValueMapper) Build ¶
func (vt *ValueMapper) Build(ctx topology.SubTopologyContext) (topology.Node, error)
func (*ValueMapper) Run ¶
func (vt *ValueMapper) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
func (*ValueMapper) Type ¶
func (vt *ValueMapper) Type() topology.Type
type WindowJoiner ¶
type WindowJoiner struct { OtherSideRequired bool CurrentSide Side Duration time.Duration StoreName, OtherStoreName string ValueMapper JoinValueMapper topology.DefaultNode // contains filtered or unexported fields }
func (*WindowJoiner) Build ¶
func (sj *WindowJoiner) Build(ctx topology.SubTopologyContext) (topology.Node, error)
func (*WindowJoiner) Init ¶
func (sj *WindowJoiner) Init(ctx topology.NodeContext) error
func (*WindowJoiner) Name ¶
func (sj *WindowJoiner) Name() string
func (*WindowJoiner) ReadsFrom ¶
func (sj *WindowJoiner) ReadsFrom() []string
func (*WindowJoiner) Run ¶
func (sj *WindowJoiner) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
func (*WindowJoiner) Type ¶
func (sj *WindowJoiner) Type() topology.Type
func (*WindowJoiner) WritesAt ¶
func (sj *WindowJoiner) WritesAt() []string
type WindowJoinerOption ¶
type WindowJoinerOption func(joiner *WindowJoiner)
Click to show internal directories.
Click to hide internal directories.