blocks

package
v0.0.0-...-aaea172 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2013 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CREATE_OUT_CHAN = iota
	DELETE_OUT_CHAN = iota
)

Variables

View Source
var (
	AWSSQSAPIVersion    string = "2012-11-05"
	AWSSignatureVersion string = "4"
)
View Source
var (
	Library     BlockLibrary
	LibraryBlob string
)

Functions

func Avg

func Avg(b *Block)

Avg() is an online average The average for a stream of data is updated 1 data point at a time. Formula: mu_i+1 = mu_i * (n - 1) /n + (1/n) * x_i

func Blocked

func Blocked(b *Block)

func BuildLibrary

func BuildLibrary()

func Bunch

func Bunch(b *Block)

func Connection

func Connection(b *Block)

Connection accepts the input from a block and outputs it to another block. This block is a special case in that it requires an input and an output block to be created.

func Count

func Count(b *Block)

Count uses a priority queue to count the number of messages that have been sent to the count block over a duration of time in seconds.

func Date

func Date(b *Block)

func Filter

func Filter(b *Block)

Filter queries a message for all values that match the given Path parameter and compares those values to a rule given an operator and a comparator. If any value passes the filter operation then the message is broadcast. If no value satisfies the filter operation the message is ignored.

Filter is capable of traversing arrays that contain elements with and without keys. For example, if Path is set to

foo.bar[]

All elements within the "bar" array will be compared to the filter operation. In the case of

foo.bar[].baz

Only the value of the "baz" keys within elements of the "bar" array will be used for the filter operation.

There are four filter comparators: greater than "gt", less than "lt", equal to "eq" and subset of "subset".

gt, lt, eq operations are available for values of a number type. eq, subset operations are avilable for values of a string type. eq operations are availble for value of a bool or null type.

func FromNSQ

func FromNSQ(b *Block)

func FromSQS

func FromSQS(b *Block)

func Get

func Get(msg interface{}, branch ...string) (interface{}, error)

func GetS3

func GetS3(b *Block)

func GroupHistogram

func GroupHistogram(b *Block)

GroupHistogram is a group of histograms, where each histogram is indexed by one field in the incoming message, and the histogram captures a distrbuition over another field.

func HTTPGet

func HTTPGet(b *Block)

Get GETs an external JSON and emits it

func Histogram

func Histogram(b *Block)

func ListS3

func ListS3(b *Block)

func LongHTTP

func LongHTTP(b *Block)

LongHTTP creates a long-poll HTTP connection

func Mask

func Mask(b *Block)

Mask modifies a JSON stream with an additive key filter. Mask uses the JSON object recieved through the rule channel to determine which keys should be included in the resulting object. An empty JSON object ({}) is used as the notation to include all values for a key.

For instance, if the JSON rule is:

{"a":{}, "b":{"d":{}},"x":{}}

And an incoming message looks like:

{"a":24, "b":{"c":"test", "d":[1,3,4]}, "f":5, "x":{"y":5, "z":10}}

The resulting object after the application of Mask would be:

{"a":24, "b":{"d":[1,3,4]}, "x":{"y":5, "z":10}}

func Post

func Post(b *Block)

Post POSTs an input message to an HTTP endpoint.

func PostTo

func PostTo(b *Block)

PostTo accepts JSON through POSTs to the /in endpoint and broadcasts to other blocks.

func PostValue

func PostValue(b *Block)

func Random

func Random(b *Block)

func ScaleRange

func ScaleRange(b *Block)

ScaleRange() rescales data in an online fashion All data will be mapped to the specified range, but the position each point maps to may change as the empirical min and max values for the stream are updated. Formula: x_scaled = (x - min / (max - min)) * (scaled_max - scaled_min) + scaled_min

func Sd

func Sd(b *Block)

Sd calculates standard deviation in an online fashion using Welford's Algorithm. Ref: http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.302.7503&rep=rep1&type=pdf

func Set

func Set(m interface{}, key string, val interface{}) error

func SkeletonState

func SkeletonState(b *Block)

this is a skeleton state block. It doesn't do anything, but can be used as a template to make new state blocks.

func SkeletonTransfer

func SkeletonTransfer(b *Block)

func Sync

func Sync(b *Block)

func Ticker

func Ticker(b *Block)

func Timeseries

func Timeseries(b *Block)

func ToFile

func ToFile(b *Block)

func ToLog

func ToLog(b *Block)

ToLog prints recieved messages to the stream tools logger.

func ToNSQ

func ToNSQ(b *Block)

func Unpack

func Unpack(b *Block)

func Var

func Var(b *Block)

Var calculates variance in an online fashion using Welford's Algorithm. The Var() block is the Sd() block with the squared correction. Ref: http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.302.7503&rep=rep1&type=pdf

Types

type BMsg

type BMsg interface{}

type Block

type Block struct {
	BlockType string
	ID        string
	InChan    chan BMsg
	OutChans  map[string]chan BMsg
	Routes    map[string]chan BMsg
	AddChan   chan *OutChanMsg
	InBlocks  map[string]bool // bool is dumb.
	OutBlocks map[string]bool // bool is dumb.
	QuitChan  chan bool
}

func NewBlock

func NewBlock(name string, ID string) (*Block, error)

type BlockLibrary

type BlockLibrary map[string]*BlockTemplate

A block library is a collection of possible block templates

type BlockRoutine

type BlockRoutine func(*Block)

type BlockTemplate

type BlockTemplate struct {
	BlockType  string
	RouteNames []string
	// BlockRoutine is the central processing routine for a block. All the work gets done in here
	Routine BlockRoutine
}

Block is the basic interface for processing units in streamtools

type Message

type Message struct {
	// this is a list in case I'm ever brave enough to up the "MaxNumberOfMessages" away from 1
	Body          []string `xml:"ReceiveMessageResult>Message>Body"`
	ReceiptHandle []string `xml:"ReceiveMessageResult>Message>ReceiptHandle"`
}

type OutChanMsg

type OutChanMsg struct {
	// type of action to perform
	Action int
	// new channel to introduce to a block's outChan array
	OutChan chan BMsg
	// ID of the connection block
	ID string
}

type PQMessage

type PQMessage struct {
	// contains filtered or unexported fields
}

type PriorityQueue

type PriorityQueue []*PQMessage

A PriorityQueue implements heap.Interface and holds Items.

func (PriorityQueue) Len

func (pq PriorityQueue) Len() int

func (PriorityQueue) Less

func (pq PriorityQueue) Less(i, j int) bool

func (*PriorityQueue) PeekAndShift

func (pq *PriorityQueue) PeekAndShift(max time.Time, lag time.Duration) (interface{}, time.Duration)

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() interface{}

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x interface{})

func (PriorityQueue) Swap

func (pq PriorityQueue) Swap(i, j int)

type RouteResponse

type RouteResponse struct {
	Msg          BMsg
	ResponseChan chan BMsg
}

RouteResponse is passed into a block to query via established handlers

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL