Documentation ¶
Index ¶
- Constants
- Variables
- func Cap(in <-chan interface{}, out chan<- interface{}, errs chan<- error)
- func IF(t l.Tfunc, check IfFunc) l.Tfunc
- func IFElse(t, e l.Tfunc, check IfFunc) l.Tfunc
- func OnlyIF(t l.Tfunc, check IfFunc) l.Tfunc
- func Tap(otherOut chan<- interface{}) func(<-chan interface{}, chan<- interface{}, chan<- error)
- func Tee(targets ...line.Tfunc) line.Tfunc
- type Batch
- type Buffer
- type Cmd
- type Commander
- type Count
- type ErrorHandler
- type Fanout
- type FanoutIncludeTypes
- type FanoutMsgTypesFunc
- type FanoutTfunc
- type Group
- type GroupByFunc
- type GroupMsg
- type Head
- type If
- type IfFunc
- type Progress
- type RateLimit
- type ReduceFunc
- type SQL
- type ShardMany
- type ShardManyKeyFunc
- type Sort
- type Tail
Examples ¶
Constants ¶
const (
ANY_TYPE = "_any_type_"
)
Variables ¶
var ErrSQLTypeConversionError = errors.New("unknown type to convert to SQL")
ErrSQLTypeConversionError is the error for a value of an unknown type that cannot be converted to SQL.
Functions ¶
func Cap ¶
func Cap(in <-chan interface{}, out chan<- interface{}, errs chan<- error)
Cap will cap off a pipeline with a no-op. This can be useful when embedding a pipeline and you don't want to forward messages back to the parent pipeline.
func IF ¶
IF applies an IfFunc to see if the Tfunc should get the message; if not, the message is passed through.
func OnlyIF ¶
OnlyIF applies an IfFunc to see if the Tfunc should get the message; if not, the message is ignored.
Types ¶
type Batch ¶
type Batch struct { N int Timeout time.Duration ByteLimit int // contains filtered or unexported fields }
Batch will take N messages and create a batch message that has the slice of the messages as the metadata key "batch". It will also combine the bodies into the body of the batch, separated by newlines, if CombineBody is true.
func CloseableBatch ¶
CloseableBatch creates a new batch
func (*Batch) Close ¶
func (b *Batch) Close()
Close will send a signal to the batcher to break out and shutdown. This will send one final batch of whatever is left.
func (Batch) T ¶
T implements the pipeline transform interface.
Example (NoTimeout) ¶
package main import ( "fmt" l "github.com/MasteryConnect/pipe/line" "github.com/MasteryConnect/pipe/message" "github.com/MasteryConnect/pipe/x" ) func main() { l.New().SetP(func(out chan<- interface{}, errs chan<- error) { for i := 0; i < 4; i++ { out <- "foo" } }).Add( x.Batch{N: 3}.T, l.Inline(func(m interface{}) (interface{}, error) { if b, ok := m.(message.Batch); ok { return fmt.Sprintf("batch of size %d", b.Size()), nil } return fmt.Sprintf("want *x.BatchMsg bot %T", m), nil }), l.Stdout, ).Run() }
Output: batch of size 3 batch of size 1
Example (WithByteLimit) ¶
package main import ( "fmt" l "github.com/MasteryConnect/pipe/line" "github.com/MasteryConnect/pipe/message" "github.com/MasteryConnect/pipe/x" ) func main() { l.New().SetP(func(out chan<- interface{}, errs chan<- error) { for i := 0; i < 10; i++ { out <- "foo" } }).Add( x.Batch{N: 4, ByteLimit: 10}.T, l.Inline(func(m interface{}) (interface{}, error) { if b, ok := m.(message.Batch); ok { return fmt.Sprintf("batch of size %d", b.Size()), nil } return fmt.Sprintf("want *x.BatchMsg bot %T", m), nil }), l.Stdout, ).Run() }
Output: batch of size 3 batch of size 3 batch of size 3 batch of size 1
Example (WithTimeout) ¶
package main import ( "fmt" "time" l "github.com/MasteryConnect/pipe/line" "github.com/MasteryConnect/pipe/message" "github.com/MasteryConnect/pipe/x" ) func main() { l.New().SetP(func(out chan<- interface{}, errs chan<- error) { for i := 0; i < 2; i++ { out <- "foo" } // cause the producer to delay to allow the // timeout to trigger a batch that isn't full time.Sleep(10 * time.Millisecond) for i := 0; i < 2; i++ { out <- "foo" } }).Add( x.Batch{N: 3, Timeout: 9 * time.Millisecond}.T, l.Inline(func(m interface{}) (interface{}, error) { if b, ok := m.(message.Batch); ok { return fmt.Sprintf("batch of size %d", b.Size()), nil } return fmt.Sprintf("want *x.BatchMsg bot %T", m), nil }), l.Stdout, ).Run() }
Output: batch of size 2 batch of size 2
type Buffer ¶
type Buffer struct {
N int
}
Buffer will create a buffer of size N to help "drain" a previous step.
Example ¶
package main import ( "fmt" l "github.com/MasteryConnect/pipe/line" "github.com/MasteryConnect/pipe/x" ) func main() { gateComplete := make(chan bool) gate := 0 flush := make(chan bool) flushed := false msgsToSend := 100 after := x.Count{Silent: true} l.New().SetP(func(out chan<- interface{}, errs chan<- error) { for i := 0; i < msgsToSend; i++ { out <- "foo" } // now wait for the buffer to load before checking the counts <-gateComplete fmt.Printf("before %d after %d\n", msgsToSend, after.Val()) flush <- true }).Add( func(in <-chan interface{}, out chan<- interface{}, errs chan<- error) { for m := range in { gate++ if gate == msgsToSend { gateComplete <- true } out <- m } }, // Buffer will take in 100 messages regardless of downstream x.Buffer{N: msgsToSend}.T, l.Inline(func(m interface{}) (interface{}, error) { // don't process anything until we are flushed if !flushed { <-flush flushed = true } return m, nil // passthrough }), after.Use, ).Run() fmt.Printf("before %d after %d\n", msgsToSend, after.Val()) }
Output: before 100 after 0 before 100 after 100
type Cmd ¶
type Cmd struct { Name string Args []string NoStdin bool // don't send the msg.String() to stdin of the command }
Cmd will execute a system command
func (Cmd) T ¶
T is the transform function for the pipe/line. It will run the shell command for each message.
Example ¶
package main import ( l "github.com/MasteryConnect/pipe/line" "github.com/MasteryConnect/pipe/x" ) func main() { l.New().SetP(func(out chan<- interface{}, errs chan<- error) { out <- nil out <- nil }).Add( x.Command("echo", "foo").T, l.Stdout, ).Run() }
Output: foo foo
Example (Template_args) ¶
package main import ( l "github.com/MasteryConnect/pipe/line" "github.com/MasteryConnect/pipe/x" ) func main() { l.New().SetP(func(out chan<- interface{}, errs chan<- error) { out <- "ls" out <- "echo" }).Add( x.Command("echo", "{{.}}").T, l.Stdout, ).Run() }
Output: ls echo
Example (Template_name) ¶
package main import ( l "github.com/MasteryConnect/pipe/line" "github.com/MasteryConnect/pipe/x" ) func main() { l.New().SetP(func(out chan<- interface{}, errs chan<- error) { out <- "ls" out <- "echo" }).Add( x.Command("{{.}}", "foo").T, l.Stdout, ).Run() }
Output: foo
type Count ¶
type Count struct { Live bool Silent bool Mod int64 Raw bool Mul int64 AutoMod bool // contains filtered or unexported fields }
Count counts up the messages coming down the pipe. Options:
Live: show the counts real-time to the console Silent: don't print to the console at all This is useful if you want to use the count programmatically instead of for the user on the console Mod: Only print counts that are modulus this number Raw: don't humanize the printed value (no commas) Mul: multiply the printed number by this amount instead of 1 for the batch message itself AutoMod: automatically determine the modulus to avoid printing to the console too much
Example ¶
package main import ( l "github.com/MasteryConnect/pipe/line" "github.com/MasteryConnect/pipe/x" ) func main() { l.New().SetP(func(out chan<- interface{}, errs chan<- error) { for i := 0; i < 4; i++ { out <- "foo" } }).Add( x.Count{}.T, ).Run() }
Output: 4
Example (Silent) ¶
package main import ( "fmt" l "github.com/MasteryConnect/pipe/line" "github.com/MasteryConnect/pipe/x" ) func main() { c := x.Count{Silent: true} l.New().SetP(func(out chan<- interface{}, errs chan<- error) { for i := 0; i < 4; i++ { out <- "foo" } }).Add( c.Use, // Use is on the pointer so the counted value is kept ).Run() fmt.Println(c.Val()) }
Output: 4
type ErrorHandler ¶
type ErrorHandler struct { TaskToTry l.InlineTfunc ErrorHandler l.Tfunc }
func (ErrorHandler) T ¶
func (eh ErrorHandler) T(in <-chan interface{}, out chan<- interface{}, errs chan<- error)
Process a message through the 'try' function. If there is an error, then pass it to the error handler. The error handler can determine what should happen, e.g. log in a special way and pass the error on, or ignore the error
type Fanout ¶
type Fanout struct {
// contains filtered or unexported fields
}
func NewFanout ¶
func NewFanout(tfuncs []FanoutTfunc, msgTypes FanoutMsgTypesFunc) *Fanout
Create a new Fanout with FanoutMsgTypesFunc to get the types (names) of a message with the intention of filtering messages to the provided FanoutTfunc's. The filtering only works if a FanoutTfunc implements FanoutIncludeTypes. If no FanoutMsgTypesFunc is passed in, then FanoutTfunc's get all messages
type FanoutIncludeTypes ¶
type FanoutIncludeTypes interface {
I() []string
}
A FanoutTfunc can implement this interface to tell Fanout that it only wants the message types returned by I() sent to it.
type FanoutMsgTypesFunc ¶
type FanoutMsgTypesFunc func(msg interface{}) (types []string)
Pass this in when creating Fanout if you want to include certain messages sent to a FanoutTfunc. When a message is passed to this function it should return the types (names) of the msg. These along with a FanoutTfunc's implementation of FanoutIncludeTypes will be used to determine the FanoutTfunc's the message is sent to.
type FanoutTfunc ¶
type FanoutTfunc interface {
T(<-chan interface{}, chan<- interface{}, chan<- error)
}
The Tfunc's Fanout fans out messages to.
type Group ¶
type Group struct { By GroupByFunc Size int Reduce ReduceFunc // contains filtered or unexported fields }
Group is the grouping transformer for pipe/line.
Example ¶
package main import ( "fmt" l "github.com/MasteryConnect/pipe/line" "github.com/MasteryConnect/pipe/x" ) func main() { l.New().SetP(func(out chan<- interface{}, errs chan<- error) { for i := 0; i < 10; i++ { out <- i } }).Add( x.NewGroup(3, func(msg interface{}) []string { if msg.(int)%2 == 0 { return []string{"even"} // put in the "even" group } return []string{"odd"} // put in the "odd" group }).T, l.I(func(msg interface{}) (interface{}, error) { group := msg.(*x.GroupMsg) // type cast to the group message return fmt.Sprintf("-- %s --\n%v\n", group.Name, group.Batch), nil }), l.Stdout, ).Run() }
Output: -- even -- 0 2 4 -- odd -- 1 3 5 -- even -- 6 8 -- odd -- 7 9
func NewReduceGroup ¶
func NewReduceGroup(by GroupByFunc, reduce ReduceFunc) *Group
NewReduceGroup makes a new Group with a reducer
type GroupByFunc ¶
type GroupByFunc func(msg interface{}) (groups []string)
GroupByFunc is the function that is used to get the group names to group a message by.
type Head ¶
type Head struct {
N int // limit message to N
}
Head only allows the first N messages through, then stops the pipeline.
type IfFunc ¶
type IfFunc func(interface{}) bool
IfFunc is a func passed in to determine if the message should be used.
type Progress ¶
type Progress struct {
*pb.ProgressBar
}
Progress shows the progress of the stream
func NewProgress ¶
NewProgress creates a new progress transformer
func (*Progress) AddToTotal ¶
AddToTotal will pass through messages in a stream and add the count to the total.
Example ¶
package main import ( "bytes" "fmt" l "github.com/MasteryConnect/pipe/line" "github.com/MasteryConnect/pipe/x" ) func main() { // unknown total so set to 0 pb := x.NewProgress(0) // some message to send down stream msg := bytes.NewBufferString("foo") l.New().SetP(func(out chan<- interface{}, errs chan<- error) { for range [30]struct{}{} { // loop 30 times out <- msg } }).Add( pb.AddToTotal, // increments the total for the progress ).Run() fmt.Println(pb.Total) }
Output: 30
func (*Progress) T ¶
T will add each message to the progress bar progress
Example (KnownTotal) ¶
package main import ( "bytes" "time" l "github.com/MasteryConnect/pipe/line" "github.com/MasteryConnect/pipe/x" ) func main() { // known total of 3 pb := x.NewProgress(3) l.New().SetP(func(out chan<- interface{}, errs chan<- error) { out <- bytes.NewBufferString("1") out <- bytes.NewBufferString("2") out <- bytes.NewBufferString("3") }).Add( x.RateLimit{N: 1, Per: time.Second}.T, // do some work pb.T, // increments the progress bar ).Run() }
Output:
Example (UnknownTotal) ¶
package main import ( "bytes" "time" l "github.com/MasteryConnect/pipe/line" "github.com/MasteryConnect/pipe/x" ) func main() { // unknown total so set to 0 pb := x.NewProgress(0) l.New().SetP(func(out chan<- interface{}, errs chan<- error) { out <- bytes.NewBufferString("1") out <- bytes.NewBufferString("2") out <- bytes.NewBufferString("3") }).Add( pb.AddToTotal, // increments the total for the progress x.RateLimit{N: 1, Per: time.Second}.T, // do some work pb.T, // increments the progress bar ).Run() }
Output:
type RateLimit ¶
type RateLimit struct { N int64 Per time.Duration // smooth out the rate instead of bursting Smooth bool }
RateLimit limits how many messages can go through in a time frame.
func (RateLimit) T ¶
T is the Tfunc for the rate limiter
Example ¶
package main import ( "time" l "github.com/MasteryConnect/pipe/line" "github.com/MasteryConnect/pipe/x" ) func main() { l.New().SetP(func(out chan<- interface{}, errs chan<- error) { out <- "1" out <- "2" out <- "3" }).Add( x.RateLimit{N: 1, Per: time.Millisecond}.T, l.Stdout, ).Run() }
Output: 1 2 3
type ReduceFunc ¶
type ReduceFunc func(memo, msg interface{}) (newmemo interface{})
ReduceFunc defines the reducer you can optionally specify. The purpose is to define how a group of messages is collapsed down into a single message.
type SQL ¶
type SQL struct { Table string // optional table to apply the mutation to MaskKeys []string // useful for logging without sensitive values like passwords NumberArgs bool // use $1 style placeholders for the resulting queries }
SQL will transform a message to a query. It is batch aware
func (SQL) I ¶
I implements the InlineTfunc interface
Example ¶
package main import ( l "github.com/MasteryConnect/pipe/line" "github.com/MasteryConnect/pipe/message" "github.com/MasteryConnect/pipe/x" ) func main() { l.New().SetP(func(out chan<- interface{}, errs chan<- error) { out <- message.NewRecordFromMSI(map[string]interface{}{ "foo": "bar", }) }).Add( l.I(x.SQL{Table: "foo"}.I), l.Stdout, ).Run() }
Output: INSERT INTO foo (foo) VALUES ('bar')
func (SQL) SQLInsertFromBatch ¶
SQLInsertFromBatch converts a batch message to a bulk INSERT query message
func (SQL) T ¶
T implements the Tfunc interface
Example ¶
package main import ( l "github.com/MasteryConnect/pipe/line" "github.com/MasteryConnect/pipe/message" "github.com/MasteryConnect/pipe/x" ) func main() { l.New().SetP(func(out chan<- interface{}, errs chan<- error) { out <- message.NewRecordFromMSI(map[string]interface{}{ "foo": "bar", }) }).Add( x.SQL{Table: "foo"}.T, l.Stdout, ).Run() }
Output: INSERT INTO foo (foo) VALUES ('bar')
Example (Batch) ¶
package main import ( l "github.com/MasteryConnect/pipe/line" "github.com/MasteryConnect/pipe/message" "github.com/MasteryConnect/pipe/x" ) func main() { l.New().SetP(func(out chan<- interface{}, errs chan<- error) { out <- message.NewRecordFromMSI(map[string]interface{}{ "foo": "bar", }) out <- message.NewRecordFromMSI(map[string]interface{}{ "foo": "bar2", }) }).Add( x.Batch{N: 2}.T, x.SQL{Table: "foo"}.T, l.Stdout, ).Run() }
Output: INSERT INTO foo (foo) VALUES ('bar'),('bar2')
type ShardMany ¶
type ShardMany struct {
// contains filtered or unexported fields
}
func NewShardMany ¶
func NewShardMany(concurrency int, tfunc l.Tfunc, shardManyKeyFunc ShardManyKeyFunc) (*ShardMany, error)
Create a new ShardMany instance. concurrency - the number of goroutines (shards); tfunc - the pipeline/function to process each message; shardManyKeyFunc - when passed a message, this function will return a key.
func (*ShardMany) T ¶
The transformer function. Each message pulled from the in channel will be:
- Passed to the ShardManyKeyFunc to get the message's key
- Sent to the shard processor (channel) for messages with the key from step 1
To determine which shard to pass the message to, the key is turned into an int, and then the int of the key is mapped to a shard by using the modulus operator with the number of shards (concurrency).
type ShardManyKeyFunc ¶
type ShardManyKeyFunc func(msg interface{}) (key []byte)
Given a message, return a key for the message. Multiple messages can return the same key if they should be processed in the order they come down the in channel.