Documentation ¶
Index ¶
- Variables
- type AbstractOp
- type Comparator
- type Concat
- type FilterFunction
- type HashCodeFunc
- type IteratorFunc
- type JoinOp
- type KeyFunc
- type MapFunction
- type Miss
- type On
- type Op
- func Filter[T any](fn FilterFunction[T]) Op
- func Foreach[T any](fn IteratorFunc[T]) Op
- func Map[T, R any](fn MapFunction[T, R]) Op
- func NewCollect[T any](c generic.Collect[T]) Op
- func NewComparableDistinct() Op
- func NewDistinct[T any, R comparable](hashcode HashCodeFunc[T, R]) Op
- func NewGroup[T any, R comparable](keyFunc KeyFunc[T, R]) Op
- func NewHead(spl generic.Splittable) Op
- func NewLimit(count int64) Op
- func NewReduce[T, R any](init R, fn ReduceFunction[T, R]) Op
- func NewSkip(offset int64) Op
- func NewSort[T any](fn func(T, T) bool) Op
- func NewTail() Op
- func Parallel(options ...ParallelOption) Op
- func Peek[T any](fn IteratorFunc[T]) Op
- func UseFlipWindow[T any](size int) Op
- type ParallelOption
- type ReduceFunction
- type WorkerStrategy
Constants ¶
This section is empty.
Variables ¶
View Source
var ( IntSum = func() Op { return NewReduce[int, int](0, func(e1, e2 int) (int, error) { return functions.Sum(e1, e2), nil }) }() )
View Source
var Print = Foreach(functions.Print)
View Source
var PrintJson = Foreach(functions.PrintJson)
Functions ¶
This section is empty.
Types ¶
type AbstractOp ¶
type AbstractOp struct {
// contains filtered or unexported fields
}
func (*AbstractOp) Accept ¶
func (a *AbstractOp) Accept(a2 any) error
func (*AbstractOp) Begin ¶
func (a *AbstractOp) Begin(i int64)
func (*AbstractOp) End ¶
func (a *AbstractOp) End() (any, error)
func (*AbstractOp) Link ¶
func (a *AbstractOp) Link(next Op)
type Comparator ¶
type FilterFunction ¶
type HashCodeFunc ¶
type HashCodeFunc[T any, R comparable] func(T) (R, error)
type IteratorFunc ¶
type KeyFunc ¶ added in v0.0.2
type KeyFunc[T any, R comparable] func(T) (R, error)
type MapFunction ¶
type Op ¶
type Op interface {
	Begin(int64)
	End() (any, error)
	Accept(any) error
	Handle(consumer generic.Consumer) (any, error)
	Link(next Op)
}
func Filter ¶
func Filter[T any](fn FilterFunction[T]) Op
func Foreach ¶
func Foreach[T any](fn IteratorFunc[T]) Op
func Map ¶
func Map[T, R any](fn MapFunction[T, R]) Op
func NewComparableDistinct ¶
func NewComparableDistinct() Op
func NewDistinct ¶
func NewDistinct[T any, R comparable](hashcode HashCodeFunc[T, R]) Op
func NewHead ¶
func NewHead(spl generic.Splittable) Op
func NewReduce ¶
func NewReduce[T, R any](init R, fn ReduceFunction[T, R]) Op
func Parallel ¶
func Parallel(options ...ParallelOption) Op
func Peek ¶ added in v0.10.3
func Peek[T any](fn IteratorFunc[T]) Op
func UseFlipWindow ¶ added in v0.0.8
type ParallelOption ¶
type ParallelOption func(config *parallelConfig)
func WithFixedPool ¶
func WithFixedPool(size int) ParallelOption
type ReduceFunction ¶
type WorkerStrategy ¶
type WorkerStrategy int
WorkerStrategy defines the concurrency model.
const (
	// BufferPoolStrategy will trigger one goroutine for all the incoming messages from upstream.
	BufferPoolStrategy WorkerStrategy = 1
	// FixedPoolStrategy will set an upper size for the worker pool.
	// It will use a producer-consumer model to deal with the message stream, and needs a hasher to make the stream data more consistent,
	// or we can use a work-stealing method to speed up the consumer.
	// TODO: propagate the parallel flag downstream and transparently enable parallel processing in all following nodes.
	FixedPoolStrategy WorkerStrategy = 2
)
Click to show internal directories.
Click to hide internal directories.