Versions in this module Expand all Collapse all v0 v0.0.1 Aug 19, 2019 Changes in this version + var AnyType = reflect.TypeOf([]interface{}{}).Elem() + var BinaryType = reflect.TypeOf([]byte{}) + var ByteType = reflect.TypeOf(byte(0)) + var ErrorType = reflect.TypeOf(fmt.Errorf("{}")) + var Int16Type = reflect.TypeOf(int16(0)) + var Int32Type = reflect.TypeOf(int32(0)) + var Int64Type = reflect.TypeOf(int64(0)) + var IntType = reflect.TypeOf(int(0)) + var KVBinaryType = reflect.TypeOf(&KVBinary{}) + var StringType = reflect.TypeOf("") + func RunGraphs(graphs ...Graph) + type Checkpoint struct + Data interface{} + Part int + type Closeable interface + Close func(*Context) error + type Connector interface + GetNodeID func() uint16 + GetNumPeers func() uint16 + MakeReceiver func(stage uint16) Receiver + NewSender func(nodeId uint16, stage uint16) Sender + type Context struct + Close func() + Emit func(element *Element) + func NewContext(connector Connector, stageId uint16, def *Def) *Context + func (c *Context) Get(index int) interface{} + func (c *Context) GetNodeID() uint16 + func (c *Context) GetNumPeers() uint16 + func (c *Context) GetReceiver() Receiver + func (c *Context) GetStage() uint16 + func (c *Context) MakeSender(targetNodeId uint16) Sender + func (c *Context) MakeSenders() []Sender + func (c *Context) Put(index int, data interface{}) + func (c *Context) Start() + func (c *Context) Termination() <-chan bool + type Def struct + Fn Fn + Id int + Type reflect.Type + Up *Def + func (def *Def) Apply(f Fn) *Def + func (def *Def) Buffer(i int) *Def + func (def *Def) Count() *Def + func (def *Def) Filter(f interface{}) *Def + func (def *Def) FlatMap(f interface{}) *Def + func (def *Def) Fold(init interface{}, acc interface{}) *Def + func (def *Def) Limit(i uint64) *Def + func (def *Def) Map(f interface{}) *Def + func (def *Def) Par(i int) *Def + func (def *Def) TriggerEach(i int) *Def + func (def *Def) TriggerEvery(i time.Duration) *Def + type Element struct + 
Checkpoint Checkpoint + FromNodeId uint16 + Stamp Stamp + Value interface{} + func (e *Element) Ack() + type Filter interface + Materialize func() func(input interface{}) bool + Type func() reflect.Type + func UserFilterFn(f interface{}) Filter + type FilterProcessor struct + func (p *FilterProcessor) Materialize() func(input *Element, context PContext) + type Fn interface + type FoldFn interface + Collect func() Element + InType func() reflect.Type + OutType func() reflect.Type + Process func(interface{}) + func UserFoldFn(initial interface{}, f interface{}) FoldFn + type Graph []*Context + func ConnectStages(connector Connector, pipeline *Pipeline) Graph + type Input interface + InType func() reflect.Type + type KVBinary struct + Key []byte + Value []byte + type MapProcessor struct + func (p *MapProcessor) Materialize() func(input *Element, context PContext) + type Mapper interface + InType func() reflect.Type + Materialize func() func(input interface{}) interface{} + OutType func() reflect.Type + func UserMapFn(f interface{}) Mapper + type NetTransform interface + InType func() reflect.Type + OutType func() reflect.Type + Run func(<-chan *Element, *Context) + type OrderedElementSet struct + func NewOrderedElementSet(cap int) *OrderedElementSet + func (set *OrderedElementSet) AddElement(elementToAdd *Element, context *Context) + type Output interface + OutType func() reflect.Type + type PC struct + Checkpoint *Checkpoint + Uniq uint64 + UpstreamNodeId uint16 + type PContext interface + Emit func(*Element) + type Pipeline struct + Defs []*Def + func NewPipeline() *Pipeline + func (p *Pipeline) Apply(def *Def, f Fn) *Def + func (p *Pipeline) Filter(that *Def, fn Filter) *Def + func (p *Pipeline) Fold(that *Def, fn FoldFn) *Def + func (p *Pipeline) Mapper(that *Def, fn Mapper) *Def + func (p *Pipeline) Par(defaultPar int) *Pipeline + func (p *Pipeline) Processor(that *Def, fn Processor) *Def + func (p *Pipeline) Root(source Root) *Def + func (p *Pipeline) Run() + 
func (p *Pipeline) Sink(that *Def, fn Sink) *Def + func (p *Pipeline) Transform(that *Def, fn NetTransform) *Def + func (p *Pipeline) WithCoders(coders []Transform) *Pipeline + type Processor interface + Materialize func() func(input *Element, context PContext) + func UserFlatMapFn(f interface{}) Processor + type Receiver interface + Ack func(upstreamNodeId uint16, uniq uint64) error + Elements func() <-chan *Element + type Root interface + Commit func(Watermark, *Context) error + OutType func() reflect.Type + Run func(*Context) + type Sender interface + Acks func() <-chan uint64 + Close func() error + Eos func() + Send func(element *Element) + type Sink interface + Flush func(*Context) error + InType func() reflect.Type + Process func(*Element, *Context) + type Stamp struct + Uniq uint64 + Unix int64 + func (s *Stamp) String() string + func (s *Stamp) Valid() bool + type Transform interface + InType func() reflect.Type + OutType func() reflect.Type + type Watermark map[int]interface{} + type Work struct + func (w *Work) Emit(e *Element) + type WorkResult struct + type WorkerGroup struct + func NewWorkerGroup(c *Context, p Processor) *WorkerGroup + func (g *WorkerGroup) Start(input chan *Element) *WorkerGroup