go-stream
go-stream is a library for stream processing and analysis. It is meant to be embedded in Go programs
that need to process large volumes of streaming data.
A stream is a graph that connects a source to one or more operators and terminates at a sink.
Operators pass data to one another over Go channels. An example graph that encodes objects with Snappy:
var from *util.MemoryBuffer
// fill up from
var to *util.MemoryBuffer
ch := stream.NewOrderedChain()
ch.Add(source.NewNextReaderSource(from))
// keep the counter: it is dereferenced in the log output below
timingOp, counter, dur := timing.NewTimingOp()
ch.Add(timingOp)
ch.Add(compress.NewSnappyEncodeOp())
ch.Add(sink.NewWriterSink(to))
ch.Start()
log.Printf("RES: Compress Snappy.\t\tRatio %v", float64(to.ByteSize())/float64(from.ByteSize()))
log.Printf("RES: Compress Snappy.\t\tBuffered Items: %d\tItem: %v\ttook: %v\trate: %d\tsize: %E\tsize/item: %E",
	to.Len(), *counter, *dur, int(float64(*counter)/(*dur).Seconds()),
	float64(to.ByteSize()), float64(to.ByteSize())/float64(*counter))
Operators are the main components of a chain.
They process tuples to produce results. Sources are operators with no input. Sinks are operators
with no output. Every operator implements stream.Operator. An operator that takes input also implements stream.In; one that produces output also implements stream.Out.
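The three roles can be sketched with the standard library alone. This is a minimal illustration, not go-stream's API: the names source, double, and sink are hypothetical, and plain int values stand in for stream.Object. It shows a stage that only produces output, a stage with both input and output, and a stage that only consumes input.

```go
package main

import "fmt"

// source has no input channel. It emits each item and closes its output when done.
func source(items []int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, it := range items {
			out <- it
		}
	}()
	return out
}

// double is an operator with both an input and an output channel.
func double(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * 2
		}
	}()
	return out
}

// sink has no output channel. It drains its input and returns what it saw.
func sink(in <-chan int) []int {
	var got []int
	for v := range in {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(sink(double(source([]int{1, 2, 3})))) // prints [2 4 6]
}
```

Closing each output channel when the input is exhausted is what lets the downstream stage's range loop end, so shutdown propagates from source to sink.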
Mappers give a simple way to implement operators. mapper.NewOp() takes a function of the form
func(input stream.Object, out Outputer), which processes the input and writes its results to the Outputer.
Mappers are automatically parallelized. Generators let each parallel mapper keep its own thread-local state through closures.
Mappers can also run special logic after they have finished processing the last tuple.
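The idea of a generator that hands each parallel worker its own closure state, plus a hook that runs after the last tuple, can be sketched as follows. This is a standard-library illustration under stated assumptions, not the mapper package's API: mapFn, generator, runMapper, and sumGenerator are hypothetical names, and int stands in for stream.Object.

```go
package main

import (
	"fmt"
	"sync"
)

// mapFn is a hypothetical mapper callback: one input, plus an emit function for outputs.
type mapFn func(input int, emit func(int))

// generator is called once per worker. It returns that worker's callback
// and a finish hook to run after the worker has seen its last input.
type generator func() (process mapFn, finish func(emit func(int)))

// runMapper feeds the inputs to the given number of workers and collects everything they emit.
func runMapper(in []int, workers int, gen generator) []int {
	inCh := make(chan int)
	var mu sync.Mutex
	var out []int
	emit := func(v int) { // shared across workers, so guarded by a mutex
		mu.Lock()
		out = append(out, v)
		mu.Unlock()
	}
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			process, finish := gen() // state captured here is private to this worker
			for v := range inCh {
				process(v, emit)
			}
			finish(emit)
		}()
	}
	for _, v := range in {
		inCh <- v
	}
	close(inCh)
	wg.Wait()
	return out
}

// sumGenerator keeps a running sum in a closure variable and emits it only at the end.
func sumGenerator() (mapFn, func(emit func(int))) {
	local := 0
	process := func(input int, emit func(int)) { local += input }
	finish := func(emit func(int)) { emit(local) }
	return process, finish
}

func main() {
	partials := runMapper([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, 3, sumGenerator)
	total := 0
	for _, p := range partials {
		total += p
	}
	fmt.Println(len(partials), total) // prints 3 55
}
```

Because each worker owns its local variable, the per-item path needs no locking. Only the shared emit function is synchronized.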
You can also split the data of a chain across other chains. stream.Fanout takes each input and copies it to N other chains.
Distributor takes each input and puts it onto one of N chains according to a mapping function.
Chains can be ordered or unordered. Ordered chains preserve the order of tuples from input to output
(although the operators still use parallelism).
Installing:
go get github.com/cevian/go-stream/stream
Compiling:
go build
Testing:
go test