Documentation
Index
- func Run(stream Stream, agg Agg, sink Sink, checkpoint uint) error
- type Agg
- type Aggs
- type CSVSink
- type Diff
- type ErrRow
- type ErrUnknownField
- type Extractor
- type GroupBy
- type Kurtosis
- type Max
- type Mean
- type Metric
- type Metrics
- type Min
- type Row
- type SequentialGroupBy
- type Sink
- type Skew
- type Stream
- type Sum
- type Variance
Examples
Constants
This section is empty.
Variables
This section is empty.
Functions
func Run
func Run(stream Stream, agg Agg, sink Sink, checkpoint uint) error
Run applies an Agg to a Stream and writes the results to a Sink. It displays the current progress at every multiple of checkpoint.
Example
```go
// For the sake of example we inline the data, but usually it should be
// located in a file, database, or some other source
in := `name,£,bangers
Del Boy,-42,1
Rodney,1001,1
Rodney,1002,2
Del Boy,42,0
Grandad,0,3`

// Define a Stream
stream, _ := NewCSVStream(strings.NewReader(in))

// Define an Agg
agg := NewGroupBy(
	"name",
	func() Agg {
		return Aggs{
			NewExtractor("£", NewMean(), NewSum()),
			NewExtractor("bangers", NewSum()),
		}
	},
)

// Define a Sink
sink := NewCSVSink(os.Stdout)

// Run
Run(stream, agg, sink, 0)
```

Output:

```
bangers_sum,name,£_mean,£_sum
1,Del Boy,0,0
3,Grandad,0,0
3,Rodney,1001.5,2003
```
Types
type Aggs
type Aggs []Agg
Aggs is itself an Agg, which allows several Aggs to be combined and run as one, as in the Run example.
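A minimal sketch of how a slice type can itself satisfy an interface. It assumes an Agg interface with the Update(Row) error and Collect() <-chan Row methods that SequentialGroupBy exposes, a hypothetical Row type defined as map[string]string, and a hypothetical counter Agg. The sketch merges the members' results into one Row, mirroring how the Run example output puts the bangers and £ columns of a group on the same line; none of these definitions are confirmed to match the package.

```go
package main

import "fmt"

// Row is a hypothetical stand-in for the package's Row type.
type Row map[string]string

// Agg is an assumed interface, modeled on the Update and Collect
// methods documented for SequentialGroupBy.
type Agg interface {
	Update(row Row) error
	Collect() <-chan Row
}

// Aggs is a slice of Aggs that is itself an Agg.
type Aggs []Agg

// Update forwards the Row to every member, stopping at the first error.
func (aggs Aggs) Update(row Row) error {
	for _, agg := range aggs {
		if err := agg.Update(row); err != nil {
			return err
		}
	}
	return nil
}

// Collect merges the fields of every member's results into a single Row.
func (aggs Aggs) Collect() <-chan Row {
	out := make(chan Row, 1)
	merged := Row{}
	for _, agg := range aggs {
		for row := range agg.Collect() {
			for k, v := range row {
				merged[k] = v
			}
		}
	}
	out <- merged
	close(out)
	return out
}

// counter is a hypothetical Agg that counts the Rows it sees.
type counter struct {
	name string
	n    int
}

func (c *counter) Update(row Row) error { c.n++; return nil }

func (c *counter) Collect() <-chan Row {
	out := make(chan Row, 1)
	out <- Row{c.name: fmt.Sprint(c.n)}
	close(out)
	return out
}

func main() {
	var agg Agg = Aggs{&counter{name: "a"}, &counter{name: "b"}}
	for _, r := range []Row{{"x": "1"}, {"x": "2"}} {
		if err := agg.Update(r); err != nil {
			panic(err)
		}
	}
	for row := range agg.Collect() {
		fmt.Println(row["a"], row["b"]) // 2 2
	}
}
```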
type CSVSink
type CSVSink struct {
// contains filtered or unexported fields
}
CSVSink persists the output of an Agg's Collect method to a CSV file. The columns are sorted in lexical order.
func NewCSVSink
NewCSVSink returns a CSVSink which persists results to the given file.
func NewCSVSinkFromPath
NewCSVSinkFromPath returns a CSVSink which persists results to the given path.
type Diff
type Diff struct {
Metric Metric
// contains filtered or unexported fields
}
Diff feeds the successive differences x[i+1] - x[i] of a stream of values to its Metric. This can be used in conjunction with a GroupBy to compute rolling statistics.
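The differencing step can be sketched as follows, assuming a hypothetical Metric interface with Update(float64) and Value() float64 methods (the package's Metric interface is not shown in this documentation). A simple sum stands in for the wrapped Metric. The first value only primes the previous value, so no difference is fed until the second value arrives.

```go
package main

import "fmt"

// Metric is a hypothetical interface; the package's real one may differ.
type Metric interface {
	Update(x float64)
	Value() float64
}

// sum is a hypothetical Metric that adds up the values it receives.
type sum struct{ total float64 }

func (s *sum) Update(x float64) { s.total += x }

func (s *sum) Value() float64 { return s.total }

// diff feeds x[i+1] - x[i] to the wrapped Metric.
type diff struct {
	metric Metric
	prev   float64
	seen   bool
}

func (d *diff) Update(x float64) {
	if d.seen {
		d.metric.Update(x - d.prev)
	}
	d.prev, d.seen = x, true
}

func (d *diff) Value() float64 { return d.metric.Value() }

func main() {
	d := &diff{metric: &sum{}}
	for _, x := range []float64{3, 5, 4, 10} {
		d.Update(x)
	}
	fmt.Println(d.Value()) // 7: (5-3) + (4-5) + (10-4)
}
```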
type ErrUnknownField
type ErrUnknownField struct {
// contains filtered or unexported fields
}
An ErrUnknownField occurs when trying to access a Row field that does not exist.
func (ErrUnknownField) Error
func (e ErrUnknownField) Error() string
Error implements the error interface.
type Extractor
An Extractor is the simplest kind of Agg. It feeds the output of Extract to a Metric.
func NewExtractor
NewExtractor returns an Extractor that parses the given field as a float64 and feeds the value to the given Metrics.
type GroupBy
type GroupBy struct {
By string
NewAgg func() Agg
SortResults bool
// contains filtered or unexported fields
}
GroupBy maintains one Agg instance per group.
func NewGroupBy
NewGroupBy returns a GroupBy that maintains an Agg for each distinct value of the given field.
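A minimal sketch of the group-by idea: one Agg per distinct value of the By field, created lazily with a NewAgg-style constructor. The Row, Agg, and count types are hypothetical simplifications, and a plain error stands in for ErrUnknownField. Results are always emitted in sorted key order here, which mirrors the alphabetical order of names in the Run example output; in the real GroupBy this is presumably controlled by the SortResults field.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

// Row and Agg are hypothetical simplifications of the package's types.
type Row map[string]string

type Agg interface {
	Update(row Row) error
	Collect() <-chan Row
}

// count is a hypothetical Agg that counts Rows.
type count struct{ n int }

func (c *count) Update(Row) error { c.n++; return nil }

func (c *count) Collect() <-chan Row {
	out := make(chan Row, 1)
	out <- Row{"count": strconv.Itoa(c.n)}
	close(out)
	return out
}

// groupBy keeps one Agg per distinct value of the by field.
type groupBy struct {
	by     string
	newAgg func() Agg
	groups map[string]Agg
}

func (g *groupBy) Update(row Row) error {
	key, ok := row[g.by]
	if !ok {
		return fmt.Errorf("unknown field %q", g.by)
	}
	agg, ok := g.groups[key]
	if !ok {
		agg = g.newAgg() // first Row of this group
		g.groups[key] = agg
	}
	return agg.Update(row)
}

// Collect emits each group's results, tagged with its key, in sorted key order.
func (g *groupBy) Collect() <-chan Row {
	keys := make([]string, 0, len(g.groups))
	for k := range g.groups {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	out := make(chan Row)
	go func() {
		defer close(out)
		for _, k := range keys {
			for row := range g.groups[k].Collect() {
				row[g.by] = k
				out <- row
			}
		}
	}()
	return out
}

func main() {
	g := &groupBy{by: "name", newAgg: func() Agg { return &count{} }, groups: map[string]Agg{}}
	for _, name := range []string{"Rodney", "Del Boy", "Rodney"} {
		if err := g.Update(Row{"name": name}); err != nil {
			panic(err)
		}
	}
	for row := range g.Collect() {
		fmt.Println(row["name"], row["count"])
	}
	// Del Boy 1
	// Rodney 2
}
```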
type Kurtosis
type Kurtosis struct {
// contains filtered or unexported fields
}
Kurtosis computes a running kurtosis using an extension of Welford's algorithm.
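A minimal sketch of extending Welford's algorithm to the fourth moment, using the standard single-pass update for the central moment sums M2, M3, and M4. The documentation does not say whether the package reports excess kurtosis; this sketch returns the population excess kurtosis n*M4/M2^2 - 3, which is NaN until M2 is nonzero. The type and method names are hypothetical.

```go
package main

import "fmt"

// kurtosis tracks the running central moment sums needed for kurtosis.
type kurtosis struct {
	n    float64
	mean float64
	m2   float64
	m3   float64
	m4   float64
}

func (k *kurtosis) Update(x float64) {
	n1 := k.n
	k.n++
	delta := x - k.mean
	deltaN := delta / k.n
	deltaN2 := deltaN * deltaN
	term1 := delta * deltaN * n1
	k.mean += deltaN
	// m4 is updated before m3, and both before m2, because each
	// update uses the previous values of the lower moments.
	k.m4 += term1*deltaN2*(k.n*k.n-3*k.n+3) + 6*deltaN2*k.m2 - 4*deltaN*k.m3
	k.m3 += term1*deltaN*(k.n-2) - 3*deltaN*k.m2
	k.m2 += term1
}

// Value returns the population excess kurtosis (0 for a normal distribution).
func (k *kurtosis) Value() float64 {
	return k.n*k.m4/(k.m2*k.m2) - 3
}

func main() {
	k := &kurtosis{}
	for _, x := range []float64{1, 2, 3, 4, 5} {
		k.Update(x)
	}
	fmt.Printf("%.2f\n", k.Value()) // -1.30
}
```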
type Max
type Max struct {
// contains filtered or unexported fields
}
Max computes the maximum value of a column.
type Mean
type Mean struct {
// contains filtered or unexported fields
}
Mean computes a running average. The result is an approximation, but it is accurate enough for most purposes.
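A running mean can be maintained in constant memory with the common incremental update mean += (x - mean) / n. This is a sketch of that technique under a hypothetical type name; the documentation does not say which update the package uses. Each step still involves floating-point rounding, so the result can differ from the exact mean in the last few bits.

```go
package main

import "fmt"

// mean is a hypothetical running-average Metric.
type mean struct {
	n     float64
	value float64
}

// Update folds x into the average without storing past values.
func (m *mean) Update(x float64) {
	m.n++
	m.value += (x - m.value) / m.n
}

func (m *mean) Value() float64 { return m.value }

func main() {
	m := &mean{}
	for _, x := range []float64{1001, 1002} {
		m.Update(x)
	}
	fmt.Println(m.Value()) // 1001.5
}
```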
type Metrics
type Metrics []Metric
Metrics is itself a Metric, which allows several Metrics to be combined into one.
type Min
type Min struct {
// contains filtered or unexported fields
}
Min computes the minimum value of a column.
type SequentialGroupBy
type SequentialGroupBy struct {
By string
NewAgg func() Agg
Sink Sink
// contains filtered or unexported fields
}
SequentialGroupBy maintains a single Agg instance. Whenever a new group key is encountered, the results of the current Agg are flushed and a new Agg is started. This is practical for large datasets whose rows are already ordered by group, since only one group is held in memory at a time.
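A sketch of the sequential strategy, in which a single Agg is kept and flushed whenever the group key changes. It only yields one result per group if each group's rows are contiguous; a key that reappears later starts a new group. The counter Agg, the map-based Row, and the flush callback standing in for the Sink are hypothetical, and error handling is omitted.

```go
package main

import "fmt"

// Row is a hypothetical stand-in for the package's Row type.
type Row map[string]string

// counter is a hypothetical Agg that counts the Rows of one group.
type counter struct{ n int }

func (c *counter) Update(Row) { c.n++ }

// seqGroupBy keeps a single counter and flushes it whenever the key changes.
type seqGroupBy struct {
	by    string
	key   string
	agg   *counter
	flush func(key string, n int) // stands in for writing to a Sink
}

func (s *seqGroupBy) Update(row Row) {
	key := row[s.by]
	if s.agg != nil && key != s.key {
		s.Flush() // a new group starts: emit the previous one
	}
	if s.agg == nil {
		s.key, s.agg = key, &counter{}
	}
	s.agg.Update(row)
}

// Flush emits the current group's result and resets the Agg.
func (s *seqGroupBy) Flush() {
	if s.agg != nil {
		s.flush(s.key, s.agg.n)
		s.agg = nil
	}
}

func main() {
	s := &seqGroupBy{by: "name", flush: func(k string, n int) { fmt.Println(k, n) }}
	for _, name := range []string{"a", "a", "b", "a"} {
		s.Update(Row{"name": name})
	}
	s.Flush() // the last group must be flushed explicitly
	// a 2
	// b 1
	// a 1
}
```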
func NewSequentialGroupBy
func NewSequentialGroupBy(by string, newAgg func() Agg, sink Sink) *SequentialGroupBy
NewSequentialGroupBy returns a SequentialGroupBy that groups Rows by the given field, using newAgg to create the Agg of each group and sink to persist its results.
func (SequentialGroupBy) Collect
func (sgb SequentialGroupBy) Collect() <-chan Row
Collect streams the output of the current Agg's Collect method.
func (*SequentialGroupBy) Flush
func (sgb *SequentialGroupBy) Flush() error
Flush writes the results of the Agg and resets it.
func (*SequentialGroupBy) Update
func (sgb *SequentialGroupBy) Update(row Row) error
Update updates the Agg of the Row's group.
type Skew
type Skew struct {
// contains filtered or unexported fields
}
Skew computes a running skew using an extension of Welford's algorithm.
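Skewness can be tracked in one pass by extending Welford's update to the third central moment sum M3. This sketch follows the standard single-pass formulas and returns the population skewness sqrt(n)*M3/M2^1.5. The documentation does not specify which skewness estimator the package reports, the type name is hypothetical, and the result is NaN until M2 is nonzero.

```go
package main

import (
	"fmt"
	"math"
)

// skew tracks the running central moment sums needed for skewness.
type skew struct {
	n    float64
	mean float64
	m2   float64
	m3   float64
}

func (s *skew) Update(x float64) {
	n1 := s.n
	s.n++
	delta := x - s.mean
	deltaN := delta / s.n
	term1 := delta * deltaN * n1
	s.mean += deltaN
	// m3 uses the previous m2, so it is updated first.
	s.m3 += term1*deltaN*(s.n-2) - 3*deltaN*s.m2
	s.m2 += term1
}

// Value returns the population skewness sqrt(n) * m3 / m2^(3/2).
func (s *skew) Value() float64 {
	return math.Sqrt(s.n) * s.m3 / math.Pow(s.m2, 1.5)
}

func main() {
	s := &skew{}
	for _, x := range []float64{0, 0, 3} {
		s.Update(x)
	}
	fmt.Printf("%.4f\n", s.Value()) // 0.7071
}
```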
type Stream
type Stream chan ErrRow
A Stream yields Rows one by one until its source is depleted.
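Because Stream is a channel type, a producer goroutine can send Rows and close the channel once the source is depleted, while consumers simply range over it. This sketch assumes a hypothetical ErrRow struct pairing a Row with an error (the documentation does not show ErrRow's fields), and newSliceStream is a hypothetical analogue of NewStream.

```go
package main

import "fmt"

// Row is a hypothetical stand-in for the package's Row type.
type Row map[string]string

// ErrRow is a hypothetical pairing of a Row with a read error.
type ErrRow struct {
	Row Row
	Err error
}

type Stream chan ErrRow

// newSliceStream is a hypothetical analogue of NewStream.
func newSliceStream(rows []Row) Stream {
	s := make(Stream)
	go func() {
		defer close(s) // closing signals that the source is depleted
		for _, r := range rows {
			s <- ErrRow{Row: r}
		}
	}()
	return s
}

func main() {
	for er := range newSliceStream([]Row{{"name": "Del Boy"}, {"name": "Rodney"}}) {
		if er.Err != nil {
			panic(er.Err)
		}
		fmt.Println(er.Row["name"])
	}
	// Del Boy
	// Rodney
}
```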
func NewCSVStream
NewCSVStream returns a Stream that reads CSV-formatted data from an io.Reader.
func NewCSVStreamFromPath
NewCSVStreamFromPath returns a Stream from the CSV file at the given path.
func NewFuncStream
NewFuncStream returns a Stream that calls the given function n times and yields the resulting Rows.
func NewStream
NewStream returns a Stream from a slice of Rows. It is mainly here for demonstration and testing purposes.
func ZipStreams
ZipStreams returns a Stream that iterates over multiple Streams, one after the other. This is convenient for going through a dataset that has been split into multiple parts.
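Since ZipStreams goes through its Streams one after the other (a concatenation rather than an element-wise zip), it can be sketched as a goroutine that drains each input channel in turn. Row, ErrRow, and Stream are redefined here as hypothetical stand-ins, and fromSlice and zipStreams are illustrative names, not the package's API.

```go
package main

import "fmt"

// Row, ErrRow, and Stream are hypothetical stand-ins for the package's types.
type Row map[string]string

type ErrRow struct {
	Row Row
	Err error
}

type Stream chan ErrRow

// fromSlice is a hypothetical helper that turns Rows into a closed, buffered Stream.
func fromSlice(rows ...Row) Stream {
	s := make(Stream, len(rows))
	for _, r := range rows {
		s <- ErrRow{Row: r}
	}
	close(s)
	return s
}

// zipStreams drains each Stream in order into a single output Stream.
func zipStreams(streams ...Stream) Stream {
	out := make(Stream)
	go func() {
		defer close(out)
		for _, s := range streams {
			for er := range s {
				out <- er
			}
		}
	}()
	return out
}

func main() {
	part1 := fromSlice(Row{"id": "1"}, Row{"id": "2"})
	part2 := fromSlice(Row{"id": "3"})
	for er := range zipStreams(part1, part2) {
		fmt.Println(er.Row["id"])
	}
	// 1
	// 2
	// 3
}
```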
type Sum
type Sum struct {
// contains filtered or unexported fields
}
Sum computes a running sum.
type Variance
type Variance struct {
// contains filtered or unexported fields
}
Variance computes a running variance using Welford's algorithm.
func NewVariance
func NewVariance() *Variance
NewVariance returns a Variance that computes the variance of the values it receives.
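Welford's algorithm keeps the count, the running mean, and M2 (the sum of squared deviations from the current mean), updating all three in a single pass. The sketch below uses hypothetical names and returns the population variance M2/n; the documentation does not say whether the package divides by n or by n-1.

```go
package main

import "fmt"

// variance is a hypothetical Metric implementing Welford's algorithm.
type variance struct {
	n    float64
	mean float64
	m2   float64 // sum of squared deviations from the current mean
}

func (v *variance) Update(x float64) {
	v.n++
	delta := x - v.mean
	v.mean += delta / v.n
	v.m2 += delta * (x - v.mean) // uses both the old and the new mean
}

// Value returns the population variance; m2/(n-1) would give the sample variance.
func (v *variance) Value() float64 {
	if v.n == 0 {
		return 0
	}
	return v.m2 / v.n
}

func main() {
	v := &variance{}
	for _, x := range []float64{2, 4, 4, 4, 5, 5, 7, 9} {
		v.Update(x)
	}
	fmt.Printf("%.1f\n", v.Value()) // 4.0
}
```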