Documentation
¶
Index ¶
- Variables
- func RunGraphs(graphs ...Graph)
- type Checkpoint
- type Closeable
- type Connector
- type Context
- func (c *Context) Get(index int) interface{}
- func (c *Context) GetNodeID() uint16
- func (c *Context) GetNumPeers() uint16
- func (c *Context) GetReceiver() Receiver
- func (c *Context) GetStage() uint16
- func (c *Context) MakeSender(targetNodeId uint16) Sender
- func (c *Context) MakeSenders() []Sender
- func (c *Context) Put(index int, data interface{})
- func (c *Context) Start()
- func (c *Context) Termination() <-chan bool
- type Def
- func (def *Def) Apply(f Fn) *Def
- func (def *Def) Buffer(i int) *Def
- func (def *Def) Count() *Def
- func (def *Def) Filter(f interface{}) *Def
- func (def *Def) FlatMap(f interface{}) *Def
- func (def *Def) Fold(init interface{}, acc interface{}) *Def
- func (def *Def) Limit(i uint64) *Def
- func (def *Def) Map(f interface{}) *Def
- func (def *Def) Par(i int) *Def
- func (def *Def) TriggerEach(i int) *Def
- func (def *Def) TriggerEvery(i time.Duration) *Def
- type Element
- type Filter
- type FilterProcessor
- type Fn
- type FoldFn
- type Graph
- type Input
- type KVBinary
- type MapProcessor
- type Mapper
- type NetTransform
- type OrderedElementSet
- type Output
- type PC
- type PContext
- type Pipeline
- func (p *Pipeline) Apply(def *Def, f Fn) *Def
- func (p *Pipeline) Filter(that *Def, fn Filter) *Def
- func (p *Pipeline) Fold(that *Def, fn FoldFn) *Def
- func (p *Pipeline) Mapper(that *Def, fn Mapper) *Def
- func (p *Pipeline) Par(defaultPar int) *Pipeline
- func (p *Pipeline) Processor(that *Def, fn Processor) *Def
- func (p *Pipeline) Root(source Root) *Def
- func (p *Pipeline) Run()
- func (p *Pipeline) Sink(that *Def, fn Sink) *Def
- func (p *Pipeline) Transform(that *Def, fn NetTransform) *Def
- func (p *Pipeline) WithCoders(coders []Transform) *Pipeline
- type Processor
- type Receiver
- type Root
- type Sender
- type Sink
- type Stamp
- type Transform
- type Watermark
- type Work
- type WorkResult
- type WorkerGroup
Constants ¶
This section is empty.
Variables ¶
View Source
var AnyType = reflect.TypeOf([]interface{}{}).Elem()
View Source
var BinaryType = reflect.TypeOf([]byte{})
View Source
var ByteType = reflect.TypeOf(byte(0))
View Source
var ErrorType = reflect.TypeOf(fmt.Errorf("{}"))
View Source
var Int16Type = reflect.TypeOf(int16(0))
View Source
var Int32Type = reflect.TypeOf(int32(0))
View Source
var Int64Type = reflect.TypeOf(int64(0))
View Source
var IntType = reflect.TypeOf(int(0))
View Source
var KVBinaryType = reflect.TypeOf(&KVBinary{})
View Source
var StringType = reflect.TypeOf("")
Functions ¶
Types ¶
type Checkpoint ¶
type Checkpoint struct { Part int Data interface{} }
type Context ¶
type Context struct {
	Emit  func(element *Element)
	Close func()
	// contains filtered or unexported fields
}
func (*Context) GetNumPeers ¶
func (*Context) GetReceiver ¶
func (*Context) MakeSender ¶
func (*Context) MakeSenders ¶
func (*Context) Put ¶
FIXME: instead of Put and Get on Context, migrate all transforms that need them to materialized forms.
func (*Context) Termination ¶
type Def ¶
type Def struct {
	Type reflect.Type
	Fn   Fn
	Id   int
	Up   *Def
	// contains filtered or unexported fields
}
func (*Def) TriggerEach ¶
type Element ¶
type Element struct {
	Checkpoint Checkpoint
	Value      interface{}
	Stamp      Stamp
	FromNodeId uint16
	// contains filtered or unexported fields
}
type Filter ¶
func UserFilterFn ¶
func UserFilterFn(f interface{}) Filter
type FilterProcessor ¶
type FilterProcessor struct {
// contains filtered or unexported fields
}
func (*FilterProcessor) Materialize ¶
func (p *FilterProcessor) Materialize() func(input *Element, context PContext)
type FoldFn ¶
type FoldFn interface { InType() reflect.Type OutType() reflect.Type Process(interface{}) Collect() Element }
func UserFoldFn ¶
func UserFoldFn(initial interface{}, f interface{}) FoldFn
type MapProcessor ¶
type MapProcessor struct {
// contains filtered or unexported fields
}
func (*MapProcessor) Materialize ¶
func (p *MapProcessor) Materialize() func(input *Element, context PContext)
type Mapper ¶
type NetTransform ¶
type OrderedElementSet ¶
type OrderedElementSet struct {
// contains filtered or unexported fields
}
func NewOrderedElementSet ¶
func NewOrderedElementSet(cap int) *OrderedElementSet
func (*OrderedElementSet) AddElement ¶
func (set *OrderedElementSet) AddElement(elementToAdd *Element, context *Context)
type PC ¶
type PC struct { Uniq uint64 UpstreamNodeId uint16 Checkpoint *Checkpoint }
type PContext ¶
type PContext interface {
	//TODO EmitTime(ts time.Time)
	//TODO Checkpoint(partition int, data interface{})
	//TODO Emit(value interface{})
	Emit(*Element)
}
type Pipeline ¶
type Pipeline struct {
	Defs []*Def
	// contains filtered or unexported fields
}
func NewPipeline ¶
func NewPipeline() *Pipeline
func (*Pipeline) WithCoders ¶
type Processor ¶
type Processor interface {
	// Materialize() creates a single-routine context that will not be shared
	Materialize() func(input *Element, context PContext)
}
func UserFlatMapFn ¶
func UserFlatMapFn(f interface{}) Processor
type WorkResult ¶
type WorkResult struct {
// contains filtered or unexported fields
}
type WorkerGroup ¶
type WorkerGroup struct {
// contains filtered or unexported fields
}
func NewWorkerGroup ¶
func NewWorkerGroup(c *Context, p Processor) *WorkerGroup
func (*WorkerGroup) Start ¶
func (g *WorkerGroup) Start(input chan *Element) *WorkerGroup
Source Files
¶
Click to show internal directories.
Click to hide internal directories.