Documentation
¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func FlattenFlow ¶
Example ¶
package main import ( "time" "github.com/OutOfBedlam/tine/engine" _ "github.com/OutOfBedlam/tine/plugin/codec/csv" _ "github.com/OutOfBedlam/tine/plugin/codec/json" _ "github.com/OutOfBedlam/tine/plugin/flows/base" _ "github.com/OutOfBedlam/tine/plugin/inlets/file" _ "github.com/OutOfBedlam/tine/plugin/outlets/file" ) func main() { dsl := ` [[inlets.file]] data = [ "a,1", "b,2", "c,3", ] format = "csv" [[flows.flatten]] name_infix = "::" [[flows.select]] includes = ["**"] [[outlets.file]] path = "-" format = "csv" ` // Make the output timestamp deterministic, so we can compare it // This line is required only for testing count := int64(0) engine.Now = func() time.Time { count++; return time.Unix(1721954797+count, 0) } // Create a new pipeline pipeline, err := engine.New(engine.WithConfig(dsl)) if err != nil { panic(err) } // Run the pipeline if err := pipeline.Run(); err != nil { panic(err) } }
Output: 1721954798,file::0,a 1721954798,file::1,1 1721954799,file::0,b 1721954799,file::1,2 1721954800,file::0,c 1721954800,file::1,3
func MergeFlow ¶
Example ¶
package main import ( "time" "github.com/OutOfBedlam/tine/engine" _ "github.com/OutOfBedlam/tine/plugin/codec/csv" _ "github.com/OutOfBedlam/tine/plugin/codec/json" _ "github.com/OutOfBedlam/tine/plugin/flows/base" _ "github.com/OutOfBedlam/tine/plugin/inlets/exec" _ "github.com/OutOfBedlam/tine/plugin/inlets/file" _ "github.com/OutOfBedlam/tine/plugin/outlets/file" ) func main() { // This example demonstrates how to use the merge flow. dsl := ` [[inlets.file]] data = [ "a,1", ] format = "csv" [[inlets.exec]] commands = ["echo", "hello world"] count = 1 trim_space = true ignore_error = true [[flows.merge]] wait_limit = "1s" [[outlets.file]] path = "-" format = "json" ` // Make the output time deterministic. so we can compare it. // This line is not needed in production code. engine.Now = func() time.Time { return time.Unix(1721954797, 0) } // Create a new engine. pipeline, err := engine.New(engine.WithConfig(dsl)) if err != nil { panic(err) } // Run the engine. if err := pipeline.Run(); err != nil { panic(err) } }
Output: {"_ts":1721954797,"exec.stdout":"hello world","file.0":"a","file.1":"1"}
func SelectFlow ¶ added in v0.0.2
Example ¶
package main import ( "time" "github.com/OutOfBedlam/tine/engine" _ "github.com/OutOfBedlam/tine/plugin/codec/csv" _ "github.com/OutOfBedlam/tine/plugin/codec/json" _ "github.com/OutOfBedlam/tine/plugin/flows/base" _ "github.com/OutOfBedlam/tine/plugin/inlets/file" _ "github.com/OutOfBedlam/tine/plugin/outlets/file" ) func main() { dsl := ` [[inlets.file]] data = [ "a,1", "b,2", "c,3", ] format = "csv" [[flows.select]] includes = ["**", "not-exist", "#_ts", "1"] [[outlets.file]] path = "-" format = "csv" ` // Make the output timestamp deterministic, so we can compare it // This line is required only for testing count := int64(0) engine.Now = func() time.Time { count++; return time.Unix(1721954797+count, 0) } // Create a new pipeline pipeline, err := engine.New(engine.WithConfig(dsl)) if err != nil { panic(err) } // Run the pipeline if err := pipeline.Run(); err != nil { panic(err) } }
Output: file,1721954798,a,1,,1721954798,1 file,1721954799,b,2,,1721954799,2 file,1721954800,c,3,,1721954800,3
Example (Tag) ¶
package main import ( "time" "github.com/OutOfBedlam/tine/engine" _ "github.com/OutOfBedlam/tine/plugin/codec/csv" _ "github.com/OutOfBedlam/tine/plugin/codec/json" _ "github.com/OutOfBedlam/tine/plugin/flows/base" _ "github.com/OutOfBedlam/tine/plugin/inlets/file" _ "github.com/OutOfBedlam/tine/plugin/outlets/file" ) func main() { dsl := ` [[inlets.file]] data = [ "a,1", "b,2", "c,3", ] format = "csv" [[flows.select]] includes = ["#_in", "*"] [[outlets.file]] path = "-" format = "csv" ` // Make the output timestamp deterministic, so we can compare it // This line is required only for testing count := int64(0) engine.Now = func() time.Time { count++; return time.Unix(1721954797+count, 0) } // Create a new pipeline pipeline, err := engine.New(engine.WithConfig(dsl)) if err != nil { panic(err) } // Run the pipeline if err := pipeline.Run(); err != nil { panic(err) } }
Output: file,a,1 file,b,2 file,c,3
func UpdateFlow ¶ added in v0.0.3
Example ¶
package main import ( "time" "github.com/OutOfBedlam/tine/engine" _ "github.com/OutOfBedlam/tine/plugin/codec/csv" _ "github.com/OutOfBedlam/tine/plugin/codec/json" _ "github.com/OutOfBedlam/tine/plugin/flows/base" _ "github.com/OutOfBedlam/tine/plugin/inlets/file" _ "github.com/OutOfBedlam/tine/plugin/outlets/file" ) func main() { dsl := ` [[inlets.file]] data = [ "James,1,1.23,true", "Jane,2,2.34,false", "Scott,3,3.45,true", ] fields = ["my_name", "my_int", "my_float", "flag"] format = "csv" [[flows.update]] set = [ { field = "my_name", name = "new_name" }, { field = "my_int", value = 10 }, { field = "my_float", value = 9.87, name = "new_float" }, { field = "flag", value = true, name = "new_flag" }, { tag = "_in", value = "mine" }, ] [[flows.select]] includes = ["#_in", "*"] [[outlets.file]] format = "json" ` // Make the output timestamp deterministic, so we can compare it // This line is required only for testing engine.Now = func() time.Time { return time.Unix(1721954797, 0) } // Create a new pipeline pipeline, err := engine.New(engine.WithConfig(dsl)) if err != nil { panic(err) } // Run the pipeline if err := pipeline.Run(); err != nil { panic(err) } }
Output: {"_in":"mine","my_int":"10","new_flag":true,"new_float":9.87,"new_name":"James"} {"_in":"mine","my_int":"10","new_flag":true,"new_float":9.87,"new_name":"Jane"} {"_in":"mine","my_int":"10","new_flag":true,"new_float":9.87,"new_name":"Scott"}
Types ¶
This section is empty.
Source Files
¶
Click to show internal directories.
Click to hide internal directories.