Documentation ¶
Index ¶
- Constants
- Variables
- func Avg(b *Block)
- func Blocked(b *Block)
- func BuildLibrary()
- func Bunch(b *Block)
- func Connection(b *Block)
- func Count(b *Block)
- func Date(b *Block)
- func Filter(b *Block)
- func FromNSQ(b *Block)
- func FromSQS(b *Block)
- func Get(msg interface{}, branch ...string) (interface{}, error)
- func GetS3(b *Block)
- func GroupHistogram(b *Block)
- func HTTPGet(b *Block)
- func Histogram(b *Block)
- func ListS3(b *Block)
- func LongHTTP(b *Block)
- func Mask(b *Block)
- func Post(b *Block)
- func PostTo(b *Block)
- func PostValue(b *Block)
- func Random(b *Block)
- func ScaleRange(b *Block)
- func Sd(b *Block)
- func Set(m interface{}, key string, val interface{}) error
- func SkeletonState(b *Block)
- func SkeletonTransfer(b *Block)
- func Sync(b *Block)
- func Ticker(b *Block)
- func Timeseries(b *Block)
- func ToFile(b *Block)
- func ToLog(b *Block)
- func ToNSQ(b *Block)
- func Unpack(b *Block)
- func Var(b *Block)
- type BMsg
- type Block
- type BlockLibrary
- type BlockRoutine
- type BlockTemplate
- type Message
- type OutChanMsg
- type PQMessage
- type PriorityQueue
- func (pq PriorityQueue) Len() int
- func (pq PriorityQueue) Less(i, j int) bool
- func (pq *PriorityQueue) PeekAndShift(max time.Time, lag time.Duration) (interface{}, time.Duration)
- func (pq *PriorityQueue) Pop() interface{}
- func (pq *PriorityQueue) Push(x interface{})
- func (pq PriorityQueue) Swap(i, j int)
- type RouteResponse
Constants ¶
const (
	CREATE_OUT_CHAN = iota
	DELETE_OUT_CHAN = iota
)
Variables ¶
var (
	AWSSQSAPIVersion    string = "2012-11-05"
	AWSSignatureVersion string = "4"
)
var (
	Library     BlockLibrary
	LibraryBlob string
)
Functions ¶
func Avg ¶
func Avg(b *Block)
Avg computes an online average: the average for a stream of data is updated one data point at a time. Formula: mu_n = mu_(n-1) * (n - 1) / n + (1 / n) * x_n, where n is the number of data points seen so far.
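The update rule can be sketched as a small accumulator. This is a minimal illustration of the formula, not the block's actual implementation; the OnlineAvg type and its Add method are hypothetical names.

```go
package main

import "fmt"

// OnlineAvg keeps a running mean without storing past values.
// The type and method names are illustrative, not part of the package.
type OnlineAvg struct {
	n  float64 // number of data points seen so far
	mu float64 // current mean
}

// Add folds one data point into the mean and returns the updated value:
// mu_n = mu_(n-1) * (n - 1) / n + x_n / n
func (a *OnlineAvg) Add(x float64) float64 {
	a.n++
	a.mu = a.mu*(a.n-1)/a.n + x/a.n
	return a.mu
}

func main() {
	var a OnlineAvg
	for _, x := range []float64{2, 4, 6, 8} {
		fmt.Println(a.Add(x)) // prints 2, 3, 4, 5 on successive lines
	}
}
```

Only the count and the current mean are stored, so memory use stays constant regardless of stream length.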
func BuildLibrary ¶
func BuildLibrary()
func Connection ¶
func Connection(b *Block)
Connection accepts the input from a block and outputs it to another block. This block is a special case in that it requires an input and an output block to be created.
func Count ¶
func Count(b *Block)
Count uses a priority queue to count the number of messages that have been sent to the count block over a duration of time in seconds.
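One way to picture a priority-queue-based window count is a min-heap of arrival times from which expired entries are evicted before counting. The sketch below uses the standard container/heap package; WindowCounter, timeHeap, and demo are hypothetical names, and the block's own queue (PriorityQueue) may behave differently.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// timeHeap is a min-heap of arrival times (oldest first).
type timeHeap []time.Time

func (h timeHeap) Len() int            { return len(h) }
func (h timeHeap) Less(i, j int) bool  { return h[i].Before(h[j]) }
func (h timeHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *timeHeap) Push(x interface{}) { *h = append(*h, x.(time.Time)) }
func (h *timeHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// WindowCounter counts messages that arrived within the last window.
type WindowCounter struct {
	window time.Duration
	times  timeHeap
}

// Add records one message arrival.
func (c *WindowCounter) Add(t time.Time) { heap.Push(&c.times, t) }

// Count evicts arrivals older than now-window and returns how many remain.
func (c *WindowCounter) Count(now time.Time) int {
	cutoff := now.Add(-c.window)
	for c.times.Len() > 0 && c.times[0].Before(cutoff) {
		heap.Pop(&c.times)
	}
	return c.times.Len()
}

// demo feeds arrivals at 0s, 4s and 8s into a 10s window and counts at 9s and 12s.
func demo() (int, int) {
	start := time.Unix(0, 0)
	c := &WindowCounter{window: 10 * time.Second}
	for _, s := range []int{0, 4, 8} {
		c.Add(start.Add(time.Duration(s) * time.Second))
	}
	first := c.Count(start.Add(9 * time.Second))
	second := c.Count(start.Add(12 * time.Second))
	return first, second
}

func main() {
	fmt.Println(demo()) // prints "3 2"
}
```

Because the oldest arrival is always at the root of the heap, eviction stops as soon as one unexpired entry is found.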
func Filter ¶
func Filter(b *Block)
Filter queries a message for all values that match the given Path parameter and compares those values to a rule given an operator and a comparator. If any value passes the filter operation then the message is broadcast. If no value satisfies the filter operation the message is ignored.
Filter is capable of traversing arrays that contain elements with and without keys. For example, if Path is set to
foo.bar[]
All elements within the "bar" array will be compared to the filter operation. In the case of
foo.bar[].baz
Only the value of the "baz" keys within elements of the "bar" array will be used for the filter operation.
There are four filter comparators: greater than "gt", less than "lt", equal to "eq" and subset of "subset".
gt, lt, eq operations are available for values of a number type. eq, subset operations are available for values of a string type. eq operations are available for values of a bool or null type.
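The type rules above can be sketched as a type switch over decoded JSON values. This is an illustration only: passes and anyPasses are hypothetical helpers, and the reading of "subset" as "the comparator occurs within the value" is an assumption, since the text does not define the direction of the test.

```go
package main

import (
	"fmt"
	"strings"
)

// passes reports whether value satisfies "value <op> comparator".
// Numbers support gt, lt, eq; strings support eq and subset;
// bools and null (nil) support eq only.
func passes(value interface{}, op string, comparator interface{}) bool {
	switch v := value.(type) {
	case float64: // encoding/json decodes every JSON number to float64
		c, ok := comparator.(float64)
		if !ok {
			return false
		}
		switch op {
		case "gt":
			return v > c
		case "lt":
			return v < c
		case "eq":
			return v == c
		}
	case string:
		c, ok := comparator.(string)
		if !ok {
			return false
		}
		switch op {
		case "eq":
			return v == c
		case "subset":
			// Assumption: the comparator must occur within the value.
			return strings.Contains(v, c)
		}
	case bool:
		c, ok := comparator.(bool)
		return ok && op == "eq" && v == c
	case nil:
		return op == "eq" && comparator == nil
	}
	return false
}

// anyPasses mirrors the broadcast rule: one passing value is enough.
func anyPasses(values []interface{}, op string, comparator interface{}) bool {
	for _, v := range values {
		if passes(v, op, comparator) {
			return true
		}
	}
	return false
}

func main() {
	// Values as they might be collected from a path such as foo.bar[].baz.
	values := []interface{}{1.0, 7.0, 3.0}
	fmt.Println(anyPasses(values, "gt", 5.0))  // true: 7 > 5
	fmt.Println(anyPasses(values, "gt", 10.0)) // false: no value exceeds 10
}
```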
func GroupHistogram ¶
func GroupHistogram(b *Block)
GroupHistogram is a group of histograms, where each histogram is indexed by one field in the incoming message, and the histogram captures a distribution over another field.
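As a rough picture of the data structure, a group of histograms can be modelled as a map from the grouping field's value to a map of counts. The GroupHist type and the "host"/"status" field names are hypothetical; the block's real state and output format are not described here.

```go
package main

import "fmt"

// GroupHist maps a group key to a histogram of value counts.
type GroupHist map[string]map[string]int

// Add counts one occurrence of value in the histogram for group.
func (g GroupHist) Add(group, value string) {
	if g[group] == nil {
		g[group] = make(map[string]int)
	}
	g[group][value]++
}

func main() {
	g := GroupHist{}
	// Hypothetical messages: distribution of "status" indexed by "host".
	g.Add("web1", "200")
	g.Add("web1", "200")
	g.Add("web1", "500")
	g.Add("web2", "200")
	fmt.Println(g["web1"]["200"], g["web1"]["500"], g["web2"]["200"]) // prints "2 1 1"
}
```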
func Mask ¶
func Mask(b *Block)
Mask modifies a JSON stream with an additive key filter. Mask uses the JSON object received through the rule channel to determine which keys should be included in the resulting object. An empty JSON object ({}) is used as the notation to include all values for a key.
For instance, if the JSON rule is:
{"a":{}, "b":{"d":{}},"x":{}}
And an incoming message looks like:
{"a":24, "b":{"c":"test", "d":[1,3,4]}, "f":5, "x":{"y":5, "z":10}}
The resulting object after the application of Mask would be:
{"a":24, "b":{"d":[1,3,4]}, "x":{"y":5, "z":10}}
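The example above can be reproduced with a short recursive function over decoded JSON. This is a sketch under stated assumptions, not the block's code: applyMask and maskJSON are hypothetical names, and passing a non-object value through unchanged when the rule expects nested keys is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// applyMask keeps only the keys of msg that also appear in rule.
// An empty rule object ({}) keeps the whole value under that key.
func applyMask(rule map[string]interface{}, msg interface{}) interface{} {
	if len(rule) == 0 {
		return msg
	}
	obj, ok := msg.(map[string]interface{})
	if !ok {
		// Assumption: a non-object value cannot be narrowed, so keep it as-is.
		return msg
	}
	out := make(map[string]interface{})
	for key, sub := range rule {
		val, present := obj[key]
		if !present {
			continue
		}
		subRule, _ := sub.(map[string]interface{})
		out[key] = applyMask(subRule, val)
	}
	return out
}

// maskJSON decodes both documents, applies the mask and re-encodes the result.
func maskJSON(ruleJSON, msgJSON string) (string, error) {
	var rule map[string]interface{}
	var msg interface{}
	if err := json.Unmarshal([]byte(ruleJSON), &rule); err != nil {
		return "", err
	}
	if err := json.Unmarshal([]byte(msgJSON), &msg); err != nil {
		return "", err
	}
	out, err := json.Marshal(applyMask(rule, msg))
	return string(out), err
}

func main() {
	rule := `{"a":{}, "b":{"d":{}},"x":{}}`
	msg := `{"a":24, "b":{"c":"test", "d":[1,3,4]}, "f":5, "x":{"y":5, "z":10}}`
	out, err := maskJSON(rule, msg)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out) // {"a":24,"b":{"d":[1,3,4]},"x":{"y":5,"z":10}}
}
```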
func PostTo ¶
func PostTo(b *Block)
PostTo accepts JSON through POSTs to the /in endpoint and broadcasts to other blocks.
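The general pattern of accepting JSON over HTTP and forwarding it on a channel can be sketched with the standard net/http package. How streamtools actually registers the /in endpoint is not shown here; inHandler and postOnce are hypothetical helpers, and the request is exercised in memory with net/http/httptest rather than a live server.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// inHandler decodes a JSON POST body and forwards the message on out.
func inHandler(out chan<- interface{}) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "POST only", http.StatusMethodNotAllowed)
			return
		}
		var msg interface{}
		if err := json.NewDecoder(r.Body).Decode(&msg); err != nil {
			http.Error(w, "invalid JSON", http.StatusBadRequest)
			return
		}
		out <- msg
		w.WriteHeader(http.StatusOK)
	}
}

// postOnce sends body to /in in memory and returns the HTTP status
// together with the forwarded message (nil if nothing was forwarded).
func postOnce(body string) (int, interface{}) {
	out := make(chan interface{}, 1) // buffered so the handler never blocks here
	mux := http.NewServeMux()
	mux.HandleFunc("/in", inHandler(out))

	req := httptest.NewRequest(http.MethodPost, "/in", strings.NewReader(body))
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, req)

	select {
	case msg := <-out:
		return rec.Code, msg
	default:
		return rec.Code, nil
	}
}

func main() {
	code, msg := postOnce(`{"a": 1}`)
	fmt.Println(code, msg)
	code, msg = postOnce("not json")
	fmt.Println(code, msg)
}
```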
func ScaleRange ¶
func ScaleRange(b *Block)
ScaleRange rescales data in an online fashion. All data will be mapped to the specified range, but the position each point maps to may change as the empirical min and max values for the stream are updated. Formula: x_scaled = ((x - min) / (max - min)) * (scaled_max - scaled_min) + scaled_min
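A minimal sketch of online rescaling tracks the empirical min and max and applies the min-max formula x_scaled = ((x - min) / (max - min)) * (scaled_max - scaled_min) + scaled_min to each new point. Scaler and NewScaler are hypothetical names, and returning the bottom of the target range while max equals min is an assumption made to avoid dividing by zero.

```go
package main

import "fmt"

// Scaler maps a stream onto [lo, hi] using the extremes seen so far.
type Scaler struct {
	lo, hi   float64 // target range (scaled_min, scaled_max)
	min, max float64 // empirical extremes of the stream
	seen     bool    // false until the first point arrives
}

// NewScaler returns a Scaler targeting the range [lo, hi].
func NewScaler(lo, hi float64) *Scaler { return &Scaler{lo: lo, hi: hi} }

// Scale updates the empirical extremes with x and returns its scaled value.
func (s *Scaler) Scale(x float64) float64 {
	if !s.seen {
		s.min, s.max, s.seen = x, x, true
	}
	if x < s.min {
		s.min = x
	}
	if x > s.max {
		s.max = x
	}
	if s.max == s.min {
		// Assumption: with no spread yet, map to the bottom of the range.
		return s.lo
	}
	return (x-s.min)/(s.max-s.min)*(s.hi-s.lo) + s.lo
}

func main() {
	s := NewScaler(0, 1)
	for _, x := range []float64{10, 20, 15, 30, 20} {
		fmt.Println(x, "->", s.Scale(x))
	}
}
```

In this run the value 20 first maps to 1 and later to 0.5, because the observed maximum has grown to 30 in between.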
func Sd ¶
func Sd(b *Block)
Sd calculates standard deviation in an online fashion using Welford's Algorithm. Ref: http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.302.7503&rep=rep1&type=pdf
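Welford's algorithm keeps a running mean and a running sum of squared deviations, so the standard deviation is available after every point without revisiting earlier data. The sketch below is illustrative: the Welford type is a hypothetical name, and dividing by n (population variance) rather than n - 1 is an assumption, since the text does not say which the block uses.

```go
package main

import (
	"fmt"
	"math"
)

// Welford accumulates the statistics needed for an online variance.
type Welford struct {
	n    float64 // number of data points seen
	mean float64 // running mean
	m2   float64 // running sum of squared deviations from the mean
}

// Add folds one data point into the running statistics.
func (w *Welford) Add(x float64) {
	w.n++
	delta := x - w.mean
	w.mean += delta / w.n
	w.m2 += delta * (x - w.mean) // uses the mean before and after the update
}

// Variance returns the population variance (assumption: divide by n).
func (w *Welford) Variance() float64 {
	if w.n == 0 {
		return 0
	}
	return w.m2 / w.n
}

// Sd returns the standard deviation, the square root of the variance.
func (w *Welford) Sd() float64 { return math.Sqrt(w.Variance()) }

func main() {
	var w Welford
	for _, x := range []float64{2, 4, 4, 4, 5, 5, 7, 9} {
		w.Add(x)
	}
	// For this data the mean is 5, the population variance 4 and the sd 2,
	// up to floating-point rounding.
	fmt.Printf("mean=%.3f var=%.3f sd=%.3f\n", w.mean, w.Variance(), w.Sd())
}
```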
func SkeletonState ¶
func SkeletonState(b *Block)
SkeletonState is a skeleton state block. It doesn't do anything, but can be used as a template to make new state blocks.
func SkeletonTransfer ¶
func SkeletonTransfer(b *Block)
func Timeseries ¶
func Timeseries(b *Block)
func Var ¶
func Var(b *Block)
Var calculates variance in an online fashion using Welford's Algorithm. The Var() block is the Sd() block with the squared correction. Ref: http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.302.7503&rep=rep1&type=pdf
Types ¶
type Block ¶
type BlockLibrary ¶
type BlockLibrary map[string]*BlockTemplate
A block library is a collection of possible block templates.
type BlockRoutine ¶
type BlockRoutine func(*Block)
type BlockTemplate ¶
type BlockTemplate struct {
	BlockType  string
	RouteNames []string
	// BlockRoutine is the central processing routine for a block. All the work gets done in here
	Routine BlockRoutine
}
Block is the basic interface for processing units in streamtools
type OutChanMsg ¶
type PriorityQueue ¶
type PriorityQueue []*PQMessage
A PriorityQueue implements heap.Interface and holds PQMessages.
func (PriorityQueue) Len ¶
func (pq PriorityQueue) Len() int
func (PriorityQueue) Less ¶
func (pq PriorityQueue) Less(i, j int) bool
func (*PriorityQueue) PeekAndShift ¶
func (pq *PriorityQueue) PeekAndShift(max time.Time, lag time.Duration) (interface{}, time.Duration)
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() interface{}
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(x interface{})
func (PriorityQueue) Swap ¶
func (pq PriorityQueue) Swap(i, j int)
type RouteResponse ¶
RouteResponse is passed into a block to query via established handlers
Source Files ¶
- avg.go
- blocks.go
- bunch.go
- connections.go
- count.go
- date.go
- filter.go
- fromNSQ.go
- fromSQS.go
- get.go
- getS3.go
- groupHistogram.go
- histogram.go
- jsonutils.go
- library.go
- listS3.go
- long_http.go
- mask.go
- post.go
- post_value.go
- postto.go
- priority_queue.go
- random.go
- scale_range.go
- sd.go
- skeleton_state.go
- skeleton_transfer.go
- sync.go
- testBlocked.go
- testUtils.go
- ticker.go
- timeseries.go
- toNSQ.go
- tofile.go
- tolog.go
- unpack.go
- utils.go
- var.go