Documentation
¶
Index ¶
- Constants
- Variables
- func Blocked(b *Block)
- func BuildLibrary()
- func Connection(b *Block)
- func Count(b *Block)
- func Date(b *Block)
- func Filter(b *Block)
- func FromHTTPStream(b *Block)
- func FromNSQ(b *Block)
- func FromPost(b *Block)
- func FromSQS(b *Block)
- func GenRandom(b *Block)
- func GenTicker(b *Block)
- func Get(msg interface{}, branch ...string) (interface{}, error)
- func GetHTTP(b *Block)
- func GetS3(b *Block)
- func GroupHistogram(b *Block)
- func Histogram(b *Block)
- func Learn(b *Block)
- func LinearModel(b *Block)
- func ListS3(b *Block)
- func Map(b *Block)
- func Mask(b *Block)
- func Mean(b *Block)
- func MovingAverage(b *Block)
- func Pack(b *Block)
- func PostHTTP(b *Block)
- func Sd(b *Block)
- func Set(m interface{}, key string, val interface{}) error
- func SkeletonState(b *Block)
- func SkeletonTransfer(b *Block)
- func Sync(b *Block)
- func Ticker(b *Block)
- func Timeseries(b *Block)
- func ToBeanstalkd(b *Block)
- func ToElasticsearch(b *Block)
- func ToFile(b *Block)
- func ToLog(b *Block)
- func ToNSQ(b *Block)
- func ToWebsocket(b *Block)
- func Unpack(b *Block)
- func Var(b *Block)
- type BMsg
- type Block
- type BlockLibrary
- type BlockRoutine
- type BlockTemplate
- type Message
- type OutChanMsg
- type PQMessage
- type PriorityQueue
- func (pq PriorityQueue) Len() int
- func (pq PriorityQueue) Less(i, j int) bool
- func (pq *PriorityQueue) PeekAndShift(max time.Time, lag time.Duration) (interface{}, time.Duration)
- func (pq *PriorityQueue) Pop() interface{}
- func (pq *PriorityQueue) Push(x interface{})
- func (pq PriorityQueue) Swap(i, j int)
Constants ¶
const ( CREATE_OUT_CHAN = iota DELETE_OUT_CHAN = iota )
Variables ¶
var ( Library BlockLibrary LibraryBlob string )
Functions ¶
func BuildLibrary ¶
func BuildLibrary()
func Connection ¶
func Connection(b *Block)
Connection accepts the input from a block and outputs it to another block. This block is a special case in that it requires an input and an output block to be created.
func Count ¶
func Count(b *Block)
Count uses a priority queue to count the number of messages that have been sent to the count block over a duration of time in seconds.
Note that this is an exact count and therefore has O(N) memory requirements.
func FromHTTPStream ¶
func FromHTTPStream(b *Block)
creates a persistent HTTP connection, emitting all messages from the stream into streamtools
func FromNSQ ¶
func FromNSQ(b *Block)
connects to an NSQ topic and emits each message into streamtools.
func FromPost ¶
func FromPost(b *Block)
accepts JSON through POSTs to the /in endpoint and broadcasts to other blocks.
func FromSQS ¶
func FromSQS(b *Block)
hooks into an Amazon SQS, and emits every message it sees into streamtools
func GenRandom ¶
func GenRandom(b *Block)
emits a JSON blob full of random stuff. Set the Interval using a rule.
func GenTicker ¶
func GenTicker(b *Block)
emits the time. Specify the period - the time between emissions - in seconds as a rule.
func GetHTTP ¶
func GetHTTP(b *Block)
Get, on any inbound message, GETs an external JSON and emits it
func GetS3 ¶
func GetS3(b *Block)
Gets the key specified in the inbound message. Specify the bucket using a rule.
func GroupHistogram ¶
func GroupHistogram(b *Block)
GroupHistogram is a group of histograms, where each histogram is indexed by one field in the incoming message, and the histogram captures a distrbuition over another field.
func LinearModel ¶
func LinearModel(b *Block)
func ListS3 ¶
func ListS3(b *Block)
lists an S3 bucket, within a specified time inteval, starting with a specified prefix.
func Mask ¶
func Mask(b *Block)
Mask modifies a JSON stream with an additive key filter. Mask uses the JSON object recieved through the rule channel to determine which keys should be included in the resulting object. An empty JSON object ({}) is used as the notation to include all values for a key.
For instance, if the JSON rule is:
{"a":{}, "b":{"d":{}},"x":{}}
And an incoming message looks like:
{"a":24, "b":{"c":"test", "d":[1,3,4]}, "f":5, "x":{"y":5, "z":10}}
The resulting object after the application of Mask would be:
{"a":24, "b":{"d":[1,3,4]}, "x":{"y":5, "z":10}}
func Mean ¶
func Mean(b *Block)
Mean() is an online mean The mean for a stream of data is updated 1 data point at a time. Formula: mu_i+1 = mu_i * (n - 1) /n + (1/n) * x_i
func MovingAverage ¶
func MovingAverage(b *Block)
func Pack ¶
func Pack(b *Block)
Pack groups messages together by testing equality of keys, and emits the group of messages after no new messages have been added to the group for a specified amount of time.
func PostHTTP ¶
func PostHTTP(b *Block)
POSTs an input message to an HTTP endpoint and emits the response
func Sd ¶
func Sd(b *Block)
Sd calculates standard deviation in an online fashion using Welford's Algorithm. Ref: http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.302.7503&rep=rep1&type=pdf
func SkeletonState ¶
func SkeletonState(b *Block)
this is a skeleton state block. It doesn't do anything, but can be used as a template to make new state blocks.
func SkeletonTransfer ¶
func SkeletonTransfer(b *Block)
func Sync ¶
func Sync(b *Block)
The Sync block delays a message by a specified lag relative to a timestamp internal to the message. Specify which key holds the timestamp, and how long the lag should be, using a rule.
func Ticker ¶
func Ticker(b *Block)
emits the time. Specify the period - the time between emissions - in seconds as a rule.
func Timeseries ¶
func Timeseries(b *Block)
stores a specified key from the last `NumSamples` messages
func ToBeanstalkd ¶
func ToBeanstalkd(b *Block)
func ToElasticsearch ¶
func ToElasticsearch(b *Block)
Posts a message to a specified Elasticsearch index with the given type.
func Var ¶
func Var(b *Block)
Var calculates variance in an online fashion using Welford's Algorithm. The Var() block is the Sd() block with the squared correction. Ref: http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.302.7503&rep=rep1&type=pdf
Types ¶
type Block ¶
type BlockLibrary ¶
type BlockLibrary map[string]*BlockTemplate
A block library is a collection of possible block templates
type BlockRoutine ¶
type BlockRoutine func(*Block)
type BlockTemplate ¶
type BlockTemplate struct { BlockType string RouteNames []string // BlockRoutine is the central processing routine for a block. All the work gets done in here Routine BlockRoutine }
Block is the basic interface for processing units in streamtools
type OutChanMsg ¶
type PriorityQueue ¶
type PriorityQueue []*PQMessage
A PriorityQueue implements heap.Interface and holds Items.
func (PriorityQueue) Len ¶
func (pq PriorityQueue) Len() int
func (PriorityQueue) Less ¶
func (pq PriorityQueue) Less(i, j int) bool
func (*PriorityQueue) PeekAndShift ¶
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() interface{}
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(x interface{})
func (PriorityQueue) Swap ¶
func (pq PriorityQueue) Swap(i, j int)
Source Files
¶
- blocks.go
- connections.go
- count.go
- date.go
- filter.go
- fromHTTPStream.go
- fromNSQ.go
- fromPost.go
- fromSQS.go
- genRandom.go
- genTicker.go
- getHTTP.go
- getS3.go
- groupHistogram.go
- histogram.go
- jsonutils.go
- learn.go
- library.go
- linear_model.go
- listS3.go
- map.go
- mask.go
- mean.go
- movingAverage.go
- pack.go
- postHTTP.go
- priority_queue.go
- sd.go
- skeleton_state.go
- skeleton_transfer.go
- sync.go
- testBlocked.go
- testUtils.go
- ticker.go
- timeseries.go
- toBeanstalkd.go
- toElasticsearch.go
- toNSQ.go
- toWebsocket.go
- tofile.go
- tolog.go
- unpack.go
- utils.go
- var.go