Documentation ¶
Overview ¶
A package for manipulating streams of events. An event can be any piece of data.
Index ¶
- Variables
- func Drain(s Stream) error
- func Len(s Stream) (int, error)
- func ParseJson(data []byte) (interface{}, error)
- func ParseJsonR(data io.Reader) (interface{}, error)
- func Register(name string, fn Function)
- func RegisterDecoder(name string, d Decoder)
- func RegisterDefault()
- func RegisterDefaultDecoders()
- func RegisterDefaultEncoders()
- func RegisterEncoder(name string, e Encoder)
- type Context
- type Decoder
- type Encoder
- type Event
- type FArg
- type Function
- type Stream
- func And(streams ...Stream) Stream
- func Chan(ch chan Event) Stream
- func Decode(s Stream, d Decoder) Stream
- func Ema(stream Stream, alpha float64) Stream
- func Empty() Stream
- func Encode(s Stream, e Encoder) Stream
- func EqVal(stream Stream, evt Event) Stream
- func Filter(stream Stream, flags Stream) Stream
- func GetField(stream Stream, field string) Stream
- func Join(streams ...Stream) Stream
- func LessEqVal(stream Stream, val float64) Stream
- func LessVal(stream Stream, val float64) Stream
- func List(events []Event) Stream
- func Map(stream Stream, fn func(Event) (Event, error)) Stream
- func MaxBy(datas Stream, vals Stream) Stream
- func MinBy(datas Stream, vals Stream) Stream
- func MoreEqVal(stream Stream, val float64) Stream
- func MoreVal(stream Stream, val float64) Stream
- func NeqVal(stream Stream, evt Event) Stream
- func Or(streams ...Stream) Stream
- func Repeat(stream Stream) Stream
- func RollingMaxBy(datas Stream, vals Stream) Stream
- func RollingMaxByAll(datas Stream, vals Stream) Stream
- func RollingMinBy(datas Stream, vals Stream) Stream
- func RollingMinByAll(datas Stream, vals Stream) Stream
- func Run(stream Stream, defs []string) (Stream, error)
- func SetField(datas Stream, vals Stream, field string) Stream
- func Sprintf(stream Stream, sfmt string, fields []string) Stream
- func StringAppend(stream Stream, suf string) Stream
- func StringPrepend(stream Stream, pref string) Stream
- func Zip(streams ...Stream) Stream
- type StreamContext
- type StreamMultiplexer
Constants ¶
This section is empty.
Variables ¶
var DebugLog = true
var EOI error = errors.New("End of iteration")
An error that represents the end of iteration.
Functions ¶
func Drain ¶
Drains a stream.
If the stream defines its own Drain() method, this function uses it, since that implementation may be more efficient.
func Len ¶
Returns the length of a stream.
If the stream defines its own Len() int method, this function uses it, since that implementation may be more efficient.
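The dispatch described above can be sketched with an optional interface and a type assertion. This is a minimal sketch and not the package's implementation: Event, Stream and EOI are local stand-ins with assumed shapes, and lener, sliceStream, opaque and length are hypothetical names. Counting by reading the stream to its end is also an assumption about the fallback path.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's types; their exact shapes are assumed.
type Event interface{}

type Stream interface {
	Next() (Event, error)
}

var EOI = errors.New("End of iteration")

// lener is the optional interface a stream may implement to report its length.
type lener interface {
	Len() int
}

// sliceStream is a hypothetical slice-backed stream that knows its own length.
type sliceStream struct {
	events []Event
	pos    int
}

func (s *sliceStream) Next() (Event, error) {
	if s.pos >= len(s.events) {
		return nil, EOI
	}
	evt := s.events[s.pos]
	s.pos++
	return evt, nil
}

// Len reports the number of events not yet read, without consuming them.
func (s *sliceStream) Len() int { return len(s.events) - s.pos }

// opaque hides the Len method of the wrapped stream, forcing the slow path.
type opaque struct{ inner Stream }

func (o opaque) Next() (Event, error) { return o.inner.Next() }

// length uses the stream's own Len method when present and otherwise
// counts events by reading the stream to its end.
func length(s Stream) (int, error) {
	if l, ok := s.(lener); ok {
		return l.Len(), nil
	}
	n := 0
	for {
		_, err := s.Next()
		if err == EOI {
			return n, nil
		}
		if err != nil {
			return n, err
		}
		n++
	}
}

func main() {
	fast, _ := length(&sliceStream{events: []Event{1, 2, 3}})
	slow, _ := length(opaque{&sliceStream{events: []Event{1, 2}}})
	fmt.Println(fast, slow)
}
```

Note that in this sketch the fallback path consumes the stream, while the fast path does not.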
func ParseJsonR ¶
A routine for parsing JSON into an interface{} that, unlike the standard one, parses integer numbers as int64 rather than float64.
func RegisterDecoder ¶
Registers a decoder by name so that it can be built from JSON definitions.
func RegisterDefault ¶
func RegisterDefault()
Registers the stream functions predefined by this library.
func RegisterDefaultDecoders ¶
func RegisterDefaultDecoders()
func RegisterDefaultEncoders ¶
func RegisterDefaultEncoders()
func RegisterEncoder ¶
Registers an encoder by name so that it can be built from JSON definitions.
Types ¶
type Context ¶
type Context map[string]*StreamContext
A context used by functions that build a stream from a JSON definition.
type Event ¶
type Event interface{}
Event interface represents just about anything you want. It only exists for type safety.
type FArg ¶
type FArg interface{}
An argument to a stream function. It might be anything and only exists for type safety.
type Stream ¶
A Stream is an interface for any stream of data.
It has only one method, which returns the next event or an error. The error might be EOI, in which case you just need to stop iterating, or any other error.
Calling the Next method on a Stream after it already returned an error is unspecified and may be unsafe.
Calling the Next method on a Stream from multiple goroutines at once is unspecified and may be unsafe.
Typical iteration over a stream looks like this:
func iter(s Stream, callback func(Event)) error {
	for {
		evt, err := s.Next()
		if err == EOI {
			break
		}
		if err != nil {
			return err
		}
		callback(evt)
	}
	return nil
}
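To make the contract concrete, here is a minimal sketch of a type that satisfies it, together with a loop that reads it to the end. Event, Stream and EOI are redeclared locally with assumed shapes so the example is self-contained; sliceStream and collect are hypothetical names that are not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's types; their exact shapes are assumed.
type Event interface{}

type Stream interface {
	Next() (Event, error)
}

var EOI = errors.New("End of iteration")

// sliceStream is a hypothetical stream that yields the events of a slice in order.
type sliceStream struct {
	events []Event
	pos    int
}

// Next returns the next event, or EOI once the slice is exhausted.
func (s *sliceStream) Next() (Event, error) {
	if s.pos >= len(s.events) {
		return nil, EOI
	}
	evt := s.events[s.pos]
	s.pos++
	return evt, nil
}

// collect reads a stream until EOI and returns every event it produced.
// Any other error stops the iteration and is returned to the caller.
func collect(s Stream) ([]Event, error) {
	var out []Event
	for {
		evt, err := s.Next()
		if err == EOI {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, evt)
	}
}

func main() {
	events, err := collect(&sliceStream{events: []Event{1, "two", 3.0}})
	fmt.Println(len(events), err)
}
```

The loop never calls Next again after an error, in line with the warning above.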
func And ¶
Takes multiple boolean streams and creates a boolean stream by AND-ing the corresponding events of the original streams.
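A minimal sketch of this combination, assuming that the input streams advance in lockstep and that the result ends as soon as any input ends. Event, Stream and EOI are local stand-ins with assumed shapes; sliceStream and andStream are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's types; their exact shapes are assumed.
type Event interface{}

type Stream interface {
	Next() (Event, error)
}

var EOI = errors.New("End of iteration")

// sliceStream is a hypothetical stream that yields the events of a slice in order.
type sliceStream struct {
	events []Event
	pos    int
}

func (s *sliceStream) Next() (Event, error) {
	if s.pos >= len(s.events) {
		return nil, EOI
	}
	evt := s.events[s.pos]
	s.pos++
	return evt, nil
}

// andStream combines boolean streams position by position.
type andStream struct{ inputs []Stream }

// Next reads one event from every input, so the inputs stay in lockstep
// even when the result is already known to be false.
func (a *andStream) Next() (Event, error) {
	result := true
	for _, in := range a.inputs {
		evt, err := in.Next()
		if err != nil {
			return nil, err
		}
		b, ok := evt.(bool)
		if !ok {
			return nil, errors.New("and: event is not a bool")
		}
		result = result && b
	}
	return result, nil
}

func main() {
	s := &andStream{inputs: []Stream{
		&sliceStream{events: []Event{true, true, false}},
		&sliceStream{events: []Event{true, false, false}},
	}}
	for {
		evt, err := s.Next()
		if err != nil {
			break
		}
		fmt.Println(evt)
	}
}
```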
func EqVal ¶
Creates a boolean stream whose events are true when the corresponding event of the original stream is equal to the given value, and false otherwise.
func Filter ¶
Takes a data stream and a flags stream and produces a stream with the events from the data stream for which the corresponding flag is true.
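The pairing of data events with flags can be sketched as follows. This is an illustration under assumptions, not the package's implementation: Event, Stream and EOI are local stand-ins with assumed shapes, sliceStream and filterStream are hypothetical names, and the sketch assumes the result ends when either input ends.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's types; their exact shapes are assumed.
type Event interface{}

type Stream interface {
	Next() (Event, error)
}

var EOI = errors.New("End of iteration")

// sliceStream is a hypothetical stream that yields the events of a slice in order.
type sliceStream struct {
	events []Event
	pos    int
}

func (s *sliceStream) Next() (Event, error) {
	if s.pos >= len(s.events) {
		return nil, EOI
	}
	evt := s.events[s.pos]
	s.pos++
	return evt, nil
}

// filterStream pairs each data event with a flag and keeps the flagged ones.
type filterStream struct{ data, flags Stream }

// Next skips data events until it finds one whose flag is true.
func (f *filterStream) Next() (Event, error) {
	for {
		evt, err := f.data.Next()
		if err != nil {
			return nil, err
		}
		flag, err := f.flags.Next()
		if err != nil {
			return nil, err
		}
		keep, ok := flag.(bool)
		if !ok {
			return nil, errors.New("filter: flag is not a bool")
		}
		if keep {
			return evt, nil
		}
	}
}

func main() {
	s := &filterStream{
		data:  &sliceStream{events: []Event{"a", "b", "c"}},
		flags: &sliceStream{events: []Event{true, false, true}},
	}
	for {
		evt, err := s.Next()
		if err != nil {
			break
		}
		fmt.Println(evt)
	}
}
```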
func GetField ¶
Creates a stream whose events are the values of the given field in the events of the original stream, with events treated as JSON objects. Returns an error if the field doesn't exist.
The field might be deep inside, as in "object.value.data".
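A dotted path such as "object.value.data" can be resolved by walking nested maps one key at a time. The sketch below assumes events are map[string]interface{} values as produced by JSON decoding; getField is a hypothetical helper that works on a single event and is not the package's function.

```go
package main

import (
	"fmt"
	"strings"
)

// getField resolves a dotted path inside a JSON-like value.
// It fails if an intermediate value is not an object or a key is missing.
func getField(evt interface{}, path string) (interface{}, error) {
	cur := evt
	for _, key := range strings.Split(path, ".") {
		obj, ok := cur.(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("field %q: value before %q is not an object", path, key)
		}
		cur, ok = obj[key]
		if !ok {
			return nil, fmt.Errorf("field %q: missing key %q", path, key)
		}
	}
	return cur, nil
}

func main() {
	evt := map[string]interface{}{
		"object": map[string]interface{}{
			"value": map[string]interface{}{"data": 7},
		},
	}
	v, err := getField(evt, "object.value.data")
	fmt.Println(v, err)
	_, err = getField(evt, "object.missing")
	fmt.Println(err != nil)
}
```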
func LessEqVal ¶
Creates a boolean stream whose events are true when the corresponding event of the original stream is less than or equal to the given value, and false otherwise.
The original stream must consist of numbers.
func LessVal ¶
Creates a boolean stream whose events are true when the corresponding event of the original stream is less than the given value, and false otherwise.
The original stream must consist of numbers.
func MaxBy ¶
Takes a data stream and a numbers stream and produces a stream with the event from the data stream whose corresponding value is maximal.
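The selection can be sketched as a single pass that tracks the best value seen so far. This is only an illustration: Event, Stream and EOI are local stand-ins with assumed shapes, sliceStream and maxBy are hypothetical names, values are assumed to be float64, ties are resolved in favour of the earliest event, and the sketch returns the winning event directly rather than wrapping it in a stream.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's types; their exact shapes are assumed.
type Event interface{}

type Stream interface {
	Next() (Event, error)
}

var EOI = errors.New("End of iteration")

// sliceStream is a hypothetical stream that yields the events of a slice in order.
type sliceStream struct {
	events []Event
	pos    int
}

func (s *sliceStream) Next() (Event, error) {
	if s.pos >= len(s.events) {
		return nil, EOI
	}
	evt := s.events[s.pos]
	s.pos++
	return evt, nil
}

// maxBy reads both streams in lockstep and returns the data event paired
// with the largest value. It returns EOI if the data stream is empty.
func maxBy(datas, vals Stream) (Event, error) {
	var best Event
	var bestVal float64
	found := false
	for {
		d, err := datas.Next()
		if err == EOI {
			break
		}
		if err != nil {
			return nil, err
		}
		v, err := vals.Next()
		if err != nil {
			return nil, err
		}
		num, ok := v.(float64)
		if !ok {
			return nil, errors.New("max by: value is not a float64")
		}
		if !found || num > bestVal {
			best, bestVal, found = d, num, true
		}
	}
	if !found {
		return nil, EOI
	}
	return best, nil
}

func main() {
	best, err := maxBy(
		&sliceStream{events: []Event{"a", "b", "c"}},
		&sliceStream{events: []Event{1.0, 5.0, 3.0}},
	)
	fmt.Println(best, err)
}
```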
func MinBy ¶
Takes a data stream and a numbers stream and produces a stream with the event from the data stream whose corresponding value is minimal.
func MoreEqVal ¶
Creates a boolean stream whose events are true when the corresponding event of the original stream is greater than or equal to the given value, and false otherwise.
The original stream must consist of numbers.
func MoreVal ¶
Creates a boolean stream whose events are true when the corresponding event of the original stream is greater than the given value, and false otherwise.
The original stream must consist of numbers.
func NeqVal ¶
Creates a boolean stream whose events are false when the corresponding event of the original stream is equal to the given value, and true otherwise.
func Or ¶
Takes multiple boolean streams and creates a boolean stream by OR-ing the corresponding events of the original streams.
func RollingMaxBy ¶
Takes a data stream and a numbers stream and produces a stream with the events from the data stream at which the maximal corresponding value changes.
func RollingMaxByAll ¶
Takes a data stream and a numbers stream and produces a stream with the events from the data stream for which the current value is maximal.
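The difference between RollingMaxBy and RollingMaxByAll is easiest to see on an example. The sketch below encodes one plausible reading of the two descriptions, and that reading is an assumption: the first variant emits an event only when the running maximum increases, while the second also emits events whose value ties the running maximum. Event, Stream and EOI are local stand-ins with assumed shapes; sliceStream and rollingMax are hypothetical names, and values are assumed to be float64.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's types; their exact shapes are assumed.
type Event interface{}

type Stream interface {
	Next() (Event, error)
}

var EOI = errors.New("End of iteration")

// sliceStream is a hypothetical stream that yields the events of a slice in order.
type sliceStream struct {
	events []Event
	pos    int
}

func (s *sliceStream) Next() (Event, error) {
	if s.pos >= len(s.events) {
		return nil, EOI
	}
	evt := s.events[s.pos]
	s.pos++
	return evt, nil
}

// rollingMax emits data events as the running maximum of vals evolves.
// With all set to false it emits only on a new maximum (assumed RollingMaxBy
// behaviour); with all set to true it also emits on ties (assumed
// RollingMaxByAll behaviour).
type rollingMax struct {
	datas, vals Stream
	max         float64
	seen        bool
	all         bool
}

func (r *rollingMax) Next() (Event, error) {
	for {
		d, err := r.datas.Next()
		if err != nil {
			return nil, err
		}
		v, err := r.vals.Next()
		if err != nil {
			return nil, err
		}
		num, ok := v.(float64)
		if !ok {
			return nil, errors.New("rolling max: value is not a float64")
		}
		isNew := !r.seen || num > r.max
		isTie := r.seen && num == r.max
		if isNew {
			r.max, r.seen = num, true
		}
		if isNew || (r.all && isTie) {
			return d, nil
		}
	}
}

func main() {
	data := []Event{"a", "b", "c", "d", "e"}
	vals := []Event{1.0, 3.0, 3.0, 2.0, 5.0}
	for _, all := range []bool{false, true} {
		s := &rollingMax{datas: &sliceStream{events: data}, vals: &sliceStream{events: vals}, all: all}
		var out []Event
		for {
			evt, err := s.Next()
			if err != nil {
				break
			}
			out = append(out, evt)
		}
		fmt.Println(all, out)
	}
}
```

Under this reading, the input values 1, 3, 3, 2, 5 select events a, b, e in the first variant and a, b, c, e in the second.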
func RollingMinBy ¶
Takes a data stream and a numbers stream and produces a stream with the events from the data stream at which the minimal corresponding value changes.
func RollingMinByAll ¶
Takes a data stream and a numbers stream and produces a stream with the events from the data stream for which the current value is minimal.
func SetField ¶
Creates a stream whose events are the events of the first stream with the given field set to the value of the corresponding event of the second stream, with events treated as JSON objects.
The field might be deep inside, as in "object.value.data".
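Setting a nested field can be sketched as a walk down to the parent object followed by an assignment. The sketch assumes events are map[string]interface{} values, modifies the event in place, and creates missing intermediate objects; all three choices are assumptions rather than documented behaviour, and setField is a hypothetical helper that works on a single event.

```go
package main

import (
	"fmt"
	"strings"
)

// setField assigns val at a dotted path inside a JSON-like object.
// Missing intermediate objects are created; an existing intermediate
// value that is not an object is reported as an error.
func setField(evt map[string]interface{}, path string, val interface{}) error {
	keys := strings.Split(path, ".")
	obj := evt
	for _, key := range keys[:len(keys)-1] {
		next, ok := obj[key]
		if !ok {
			child := map[string]interface{}{}
			obj[key] = child
			obj = child
			continue
		}
		child, ok := next.(map[string]interface{})
		if !ok {
			return fmt.Errorf("field %q: %q is not an object", path, key)
		}
		obj = child
	}
	obj[keys[len(keys)-1]] = val
	return nil
}

func main() {
	evt := map[string]interface{}{
		"object": map[string]interface{}{"name": "x"},
	}
	err := setField(evt, "object.value.data", 7)
	fmt.Println(evt, err)
}
```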
func StringAppend ¶
Takes a stream of strings and appends a given string to each of them.
func StringPrepend ¶
Takes a stream of strings and prepends a given string to each of them.
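Both string helpers are per-event transformations, so they can be expressed with a function of the same shape that Map accepts in the index, func(Event) (Event, error). Whether the package actually builds them on Map is an assumption. Event, Stream and EOI are local stand-ins with assumed shapes; sliceStream, mapStream, stringAppend and stringPrepend are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's types; their exact shapes are assumed.
type Event interface{}

type Stream interface {
	Next() (Event, error)
}

var EOI = errors.New("End of iteration")

// sliceStream is a hypothetical stream that yields the events of a slice in order.
type sliceStream struct {
	events []Event
	pos    int
}

func (s *sliceStream) Next() (Event, error) {
	if s.pos >= len(s.events) {
		return nil, EOI
	}
	evt := s.events[s.pos]
	s.pos++
	return evt, nil
}

// mapStream applies fn to every event of the input stream.
type mapStream struct {
	in Stream
	fn func(Event) (Event, error)
}

func (m *mapStream) Next() (Event, error) {
	evt, err := m.in.Next()
	if err != nil {
		return nil, err
	}
	return m.fn(evt)
}

// affix builds a stream that adds pref before and suf after each string event.
func affix(in Stream, pref, suf string) Stream {
	return &mapStream{in: in, fn: func(evt Event) (Event, error) {
		s, ok := evt.(string)
		if !ok {
			return nil, errors.New("affix: event is not a string")
		}
		return pref + s + suf, nil
	}}
}

func stringAppend(in Stream, suf string) Stream   { return affix(in, "", suf) }
func stringPrepend(in Stream, pref string) Stream { return affix(in, pref, "") }

func main() {
	s := stringPrepend(stringAppend(&sliceStream{events: []Event{"a", "b"}}, "!"), ">")
	for {
		evt, err := s.Next()
		if err != nil {
			break
		}
		fmt.Println(evt)
	}
}
```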
type StreamContext ¶
type StreamContext struct {
// contains filtered or unexported fields
}
Data associated with a stream, used for building streams from JSON definitions.
Currently, this is just the multiplexer that allows a stream variable to be used multiple times.
type StreamMultiplexer ¶
type StreamMultiplexer struct {
// contains filtered or unexported fields
}
func Multiplexer ¶
func Multiplexer(stream Stream) *StreamMultiplexer
Creates a multiplexer from a stream.
func (*StreamMultiplexer) New ¶
func (self *StreamMultiplexer) New() Stream
Creates a stream that pulls from the base stream.
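One way such a multiplexer could work is to buffer events read from the base stream so that every derived stream sees the full sequence. The documentation does not state this, so the buffering strategy and the "each derived stream sees every event" semantics are assumptions. Event, Stream and EOI are local stand-ins with assumed shapes; sliceStream, multiplexer and muxStream are hypothetical names. The sketch is not safe for concurrent use and its buffer grows without bound.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's types; their exact shapes are assumed.
type Event interface{}

type Stream interface {
	Next() (Event, error)
}

var EOI = errors.New("End of iteration")

// sliceStream is a hypothetical stream that yields the events of a slice in order.
type sliceStream struct {
	events []Event
	pos    int
}

func (s *sliceStream) Next() (Event, error) {
	if s.pos >= len(s.events) {
		return nil, EOI
	}
	evt := s.events[s.pos]
	s.pos++
	return evt, nil
}

// multiplexer remembers every event read from base, plus the final error.
type multiplexer struct {
	base Stream
	buf  []Event
	err  error
}

// muxStream is one reader over the shared buffer with its own position.
type muxStream struct {
	m   *multiplexer
	pos int
}

// New returns a stream that replays the base stream from its beginning.
func (m *multiplexer) New() Stream { return &muxStream{m: m} }

// Next serves buffered events first and pulls from base only when this
// reader has caught up. The base stream is never called again after an error.
func (s *muxStream) Next() (Event, error) {
	m := s.m
	if s.pos < len(m.buf) {
		evt := m.buf[s.pos]
		s.pos++
		return evt, nil
	}
	if m.err != nil {
		return nil, m.err
	}
	evt, err := m.base.Next()
	if err != nil {
		m.err = err
		return nil, err
	}
	m.buf = append(m.buf, evt)
	s.pos++
	return evt, nil
}

func main() {
	m := &multiplexer{base: &sliceStream{events: []Event{1, 2, 3}}}
	a, b := m.New(), m.New()
	x, _ := a.Next()
	y, _ := b.Next()
	fmt.Println(x, y)
}
```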