base

package
v0.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 25, 2024 License: BSD-3-Clause Imports: 16 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func DamperFlow

func DamperFlow(ctx *engine.Context) engine.Flow
Example
package main

import (
	"time"

	"github.com/OutOfBedlam/tine/engine"
	_ "github.com/OutOfBedlam/tine/plugins/base"
)

func main() {
	recipe := `
		name="pipeline-1"
		[log]
			path = "-"
			level = "warn"
			no_color = true
			timeformat = "no-time-for-test"
		[[inlets.file]]
			data = [
				"a,100",
				"b,200",
				"c,300",
				"d,400",
			]
		[[flows.damper]]
			buffer_limit = 2
		[[flows.select]]
			includes = ["#_ts", "*"]
		[[outlets.file]]
			path = "-"
			format = "json"
	`
	seq := int64(0)
	engine.Now = func() time.Time { seq++; return time.Unix(1721954797+seq, 0) }
	pipe, err := engine.New(engine.WithConfig(recipe))
	if err != nil {
		panic(err)
	}
	err = pipe.Run()
	if err != nil {
		panic(err)
	}

}
Output:

{"0":"a","1":"100","_ts":1721954798}
{"0":"b","1":"200","_ts":1721954799}
{"0":"c","1":"300","_ts":1721954800}
{"0":"d","1":"400","_ts":1721954801}

func DumpFlow

func DumpFlow(ctx *engine.Context) engine.Flow
Example
package main

import (
	"github.com/OutOfBedlam/tine/engine"
	_ "github.com/OutOfBedlam/tine/plugins/base"
)

func main() {
	recipe := `
		name="pipeline-1"
		[log]
			path = "-"
			level = "warn"
			no_color = true
			timeformat = "no-time-for-test"
		[[inlets.file]]
			data = [
				"a,100",
				"b,200",
			]
		[[inlets.file.flows.dump]]
			level = "warn"
		[[flows.dump]]
			level = "error"
		[[outlets.file]]
			path = "-"
			format = "json"
	`
	pipe, err := engine.New(engine.WithConfig(recipe))
	if err != nil {
		panic(err)
	}
	err = pipe.Run()
	if err != nil {
		panic(err)
	}

}
Output:

no-time-for-test WRN pipeline pipeline-1 flow-dump rec=1/2 0=a 1=100
no-time-for-test WRN pipeline pipeline-1 flow-dump rec=2/2 0=b 1=200
no-time-for-test ERR pipeline pipeline-1 flow-dump rec=1/2 0=a 1=100
no-time-for-test ERR pipeline pipeline-1 flow-dump rec=2/2 0=b 1=200
{"0":"a","1":"100"}
{"0":"b","1":"200"}

func FileInlet

func FileInlet(ctx *engine.Context) engine.Inlet
Example
package main

import (
	"time"

	"github.com/OutOfBedlam/tine/engine"
	_ "github.com/OutOfBedlam/tine/plugins/base"
)

func main() {
	dsl := `
	[[inlets.file]]
		data = [
			"a,1", 
			"b,2", 
			"c,3",
		]
		format = "csv"
	[[outlets.file]]
		path = "-"
		format = "csv"
	`
	// Make the output timestamp deterministic, so we can compare it
	// This line is required only for testing
	engine.Now = func() time.Time { return time.Unix(1721954797, 0) }
	// Create a new pipeline
	pipeline, err := engine.New(engine.WithConfig(dsl))
	if err != nil {
		panic(err)
	}
	if err := pipeline.Run(); err != nil {
		panic(err)
	}
}
Output:

a,1
b,2
c,3
Example (Fields)
package main

import (
	"time"

	"github.com/OutOfBedlam/tine/engine"
	_ "github.com/OutOfBedlam/tine/plugins/base"
)

func main() {
	dsl := `
	[[inlets.file]]
		data = [
			"1,key1,1722642405,1.234",
			"2,key2,1722642406,2.345",
		]
		format = "csv"
		fields = ["line", "name", "time", "value"]
	[[outlets.file]]
		format = "json"
	`
	// Make the output timestamp deterministic, so we can compare it
	// This line is required only for testing
	engine.Now = func() time.Time { return time.Unix(1721954797, 0) }
	// Create a new pipeline
	pipeline, err := engine.New(engine.WithConfig(dsl))
	if err != nil {
		panic(err)
	}
	if err := pipeline.Run(); err != nil {
		panic(err)
	}
}
Output:

{"line":"1","name":"key1","time":"1722642405","value":"1.234"}
{"line":"2","name":"key2","time":"1722642406","value":"2.345"}
Example (File)
package main

import (
	"time"

	"github.com/OutOfBedlam/tine/engine"
	_ "github.com/OutOfBedlam/tine/plugins/base"
)

func main() {
	dsl := `
	[[inlets.file]]
		path = "testdata/testdata.csv"
		format = "csv"
		fields = ["line", "name", "time", "value"]
		types  = ["int", "string", "time", "float"]
	[[outlets.file]]
		path = "-"
		format = "json"
		decimal = 2
	`
	// Make the output timestamp deterministic, so we can compare it
	// This line is required only for testing
	engine.Now = func() time.Time { return time.Unix(1721954797, 0) }
	// Create a new pipeline
	pipeline, err := engine.New(engine.WithConfig(dsl))
	if err != nil {
		panic(err)
	}
	if err := pipeline.Run(); err != nil {
		panic(err)
	}
}
Output:

{"line":1,"name":"key1","time":1722642405,"value":1.23}
{"line":2,"name":"key2","time":1722642406,"value":2.35}

func FileOutlet

func FileOutlet(ctx *engine.Context) engine.Outlet
Example
package main

import (
	"time"

	"github.com/OutOfBedlam/tine/engine"
	_ "github.com/OutOfBedlam/tine/plugins/base"
)

func main() {
	dsl := `
	[[inlets.file]]
		data = [
			"a,1", 
			"b,2", 
			"c,3",
		]
		format = "csv"
	[[outlets.file]]
		path = "-"
		format = "csv"
	`
	// Make the output timestamp deterministic, so we can compare it
	// This line is required only for testing
	engine.Now = func() time.Time { return time.Unix(1721954797, 0) }
	// Create a new pipeline
	pipeline, err := engine.New(engine.WithConfig(dsl))
	if err != nil {
		panic(err)
	}
	if err := pipeline.Run(); err != nil {
		panic(err)
	}
}
Output:

a,1
b,2
c,3

func FlattenFlow

func FlattenFlow(ctx *engine.Context) engine.Flow
Example
package main

import (
	"time"

	"github.com/OutOfBedlam/tine/engine"
	_ "github.com/OutOfBedlam/tine/plugins/base"
)

func main() {
	dsl := `
	[[inlets.file]]
		data = [
			"a,1", 
			"b,2", 
			"c,3",
		]
		format = "csv"
	[[flows.flatten]]
		name_infix = "::"
	[[flows.select]]
		includes = ["**"]
	[[outlets.file]]
		path = "-"
		format = "csv"
	`
	// Make the output timestamp deterministic, so we can compare it
	// This line is required only for testing
	count := int64(0)
	engine.Now = func() time.Time { count++; return time.Unix(1721954797+count, 0) }
	// Create a new pipeline
	pipeline, err := engine.New(engine.WithConfig(dsl))
	if err != nil {
		panic(err)
	}
	// Run the pipeline
	if err := pipeline.Run(); err != nil {
		panic(err)
	}
}
Output:

1721954798,file::0,a
1721954798,file::1,1
1721954799,file::0,b
1721954799,file::1,2
1721954800,file::0,c
1721954800,file::1,3

func InjectFlow

func InjectFlow(ctx *engine.Context) engine.Flow
Example
package main

import (
	"os"
	"time"

	"github.com/OutOfBedlam/tine/engine"
	_ "github.com/OutOfBedlam/tine/plugins/args"
	_ "github.com/OutOfBedlam/tine/plugins/base"
)

func main() {
	// This example demonstrates how to use the exec inlet to run a command and
	dsl := `
	[[inlets.args]]
	[[flows.select]]
		includes = ["**"]
	[[flows.inject]]
		id = "here"
	[[outlets.file]]
		path = "-"
		format = "json"
	`
	// Make the output timestamp deterministic, so we can compare it
	// This line is required only for testing
	engine.Now = func() time.Time { return time.Unix(1721954797, 0) }
	// Build pipeline
	pipeline, err := engine.New(engine.WithConfig(dsl))
	if err != nil {
		panic(err)
	}

	pipeline.Context().Inject("here", func(r []engine.Record) ([]engine.Record, error) {
		for i, rec := range r {
			r[i] = rec.AppendOrReplace(engine.NewField("msg", "hello world - here updated"))
		}
		return r, nil
	})

	// Simulate the command line arguments
	os.Args = []string{"command", "command-arg", "--", "msg=hello world"}
	// Run the pipeline
	if err := pipeline.Run(); err != nil {
		panic(err)
	}
}
Output:

{"_in":"args","_ts":1721954797,"msg":"hello world - here updated"}

func MergeFlow

func MergeFlow(ctx *engine.Context) engine.Flow
Example
package main

import (
	"time"

	"github.com/OutOfBedlam/tine/engine"
	_ "github.com/OutOfBedlam/tine/plugins/base"
	_ "github.com/OutOfBedlam/tine/plugins/exec"
	_ "github.com/OutOfBedlam/tine/plugins/psutil"
)

func main() {
	// This example demonstrates how to use the merge flow.
	dsl := `
	[[inlets.file]]
		data = [
			"a,1",
		]
		format = "csv"
	[[inlets.exec]]
		commands = ["echo", "hello world"]
		count = 1
		trim_space = true
		ignore_error = true
	[[flows.merge]]
		wait_limit = "1s"
	[[outlets.file]]
		path = "-"
		format = "json"
	`
	// Make the output time deterministic. so we can compare it.
	// This line is not needed in production code.
	engine.Now = func() time.Time { return time.Unix(1721954797, 0) }
	// Create a new engine.
	pipeline, err := engine.New(engine.WithConfig(dsl))
	if err != nil {
		panic(err)
	}
	// Run the engine.
	if err := pipeline.Run(); err != nil {
		panic(err)
	}
}
Output:

{"_ts":1721954797,"exec_stdout":"hello world","file_0":"a","file_1":"1"}

func NewCSVDecoder

func NewCSVDecoder(conf engine.DecoderConfig) engine.Decoder

func NewCSVEncoder

func NewCSVEncoder(c engine.EncoderConfig) engine.Encoder

func NewJSONDecoder

func NewJSONDecoder(c engine.DecoderConfig) engine.Decoder

func NewJSONEncoder

func NewJSONEncoder(c engine.EncoderConfig) engine.Encoder

func SelectFlow

func SelectFlow(ctx *engine.Context) engine.Flow
Example
package main

import (
	"time"

	"github.com/OutOfBedlam/tine/engine"
	_ "github.com/OutOfBedlam/tine/plugins/base"
)

func main() {
	dsl := `
	[[inlets.file]]
		data = [
			"a,1", 
			"b,2", 
			"c,3",
		]
		format = "csv"
	[[flows.select]]
		includes = ["**", "not-exist", "#_ts", "1"]
	[[outlets.file]]
		path = "-"
		format = "csv"
	`
	// Make the output timestamp deterministic, so we can compare it
	// This line is required only for testing
	count := int64(0)
	engine.Now = func() time.Time { count++; return time.Unix(1721954797+count, 0) }
	// Create a new pipeline
	pipeline, err := engine.New(engine.WithConfig(dsl))
	if err != nil {
		panic(err)
	}
	// Run the pipeline
	if err := pipeline.Run(); err != nil {
		panic(err)
	}
}
Output:

file,1721954798,a,1,,1721954798,1
file,1721954799,b,2,,1721954799,2
file,1721954800,c,3,,1721954800,3
Example (Tag)
package main

import (
	"time"

	"github.com/OutOfBedlam/tine/engine"
	_ "github.com/OutOfBedlam/tine/plugins/base"
)

func main() {
	dsl := `
	[[inlets.file]]
		data = [
			"a,1", 
			"b,2", 
			"c,3",
		]
		format = "csv"
		fields = ["area", "ival"]
		types = ["string", "int"]
	[[flows.select]]
		includes = ["#_in", "#non_exist", "*"]
	[[outlets.file]]
		format = "json"
	`
	// Make the output timestamp deterministic, so we can compare it
	// This line is required only for testing
	count := int64(0)
	engine.Now = func() time.Time { count++; return time.Unix(1721954797+count, 0) }
	// Create a new pipeline
	pipeline, err := engine.New(engine.WithConfig(dsl))
	if err != nil {
		panic(err)
	}
	// Run the pipeline
	if err := pipeline.Run(); err != nil {
		panic(err)
	}
}
Output:

{"_in":"file","area":"a","ival":1,"non_exist":null}
{"_in":"file","area":"b","ival":2,"non_exist":null}
{"_in":"file","area":"c","ival":3,"non_exist":null}

func StringFields

func StringFields(r engine.Record) []string

func StringFieldsWithFormat

func StringFieldsWithFormat(r engine.Record, tf *engine.Timeformatter, decimal int) []string

func UpdateFlow

func UpdateFlow(ctx *engine.Context) engine.Flow
Example
package main

import (
	"time"

	"github.com/OutOfBedlam/tine/engine"
	_ "github.com/OutOfBedlam/tine/plugins/base"
)

func main() {
	dsl := `
	[[inlets.file]]
		data = [
			"James,1,1.23,true", 
			"Jane,2,2.34,false", 
			"Scott,3,3.45,true",
		]
		fields = ["my_name", "my_int", "my_float", "flag"]
		format = "csv"
	[[flows.update]]
		set = [
			{ field = "my_name", name = "new_name" },
			{ field = "my_int", value = 10 },
			{ field = "my_float", value = 9.87, name = "new_float" },
			{ field = "flag", value = true, name = "new_flag" },
			{ tag = "_in", value = "mine" },
		]
	[[flows.select]]
		includes = ["#_in", "*"]
	[[outlets.file]]
		format = "json"
	`
	// Make the output timestamp deterministic, so we can compare it
	// This line is required only for testing
	engine.Now = func() time.Time { return time.Unix(1721954797, 0) }
	// Create a new pipeline
	pipeline, err := engine.New(engine.WithConfig(dsl))
	if err != nil {
		panic(err)
	}
	// Run the pipeline
	if err := pipeline.Run(); err != nil {
		panic(err)
	}
}
Output:

{"_in":"mine","my_int":"10","new_flag":true,"new_float":9.87,"new_name":"James"}
{"_in":"mine","my_int":"10","new_flag":true,"new_float":9.87,"new_name":"Jane"}
{"_in":"mine","my_int":"10","new_flag":true,"new_float":9.87,"new_name":"Scott"}

Types

type CSVDecoder

type CSVDecoder struct {
	engine.DecoderConfig
	// contains filtered or unexported fields
}

func (*CSVDecoder) Decode

func (cr *CSVDecoder) Decode() ([]engine.Record, error)

type CSVEncoder

type CSVEncoder struct {
	engine.EncoderConfig
	// contains filtered or unexported fields
}
Example
package main

import (
	"time"

	"github.com/OutOfBedlam/tine/engine"
	_ "github.com/OutOfBedlam/tine/plugins/base"
)

func main() {
	dsl := `
	[[inlets.file]]
		data = [
			"a,1,1.234,true,2024/08/09 16:01:02", 
			"b,2,2.345,false,2024/08/09 16:03:04", 
			"c,3,3.456,true,2024/08/09 16:05:06",
		]
		format = "csv"
		timeformat = "2006/01/02 15:04:05"
		tz = "UTC"
		fields = ["area","ival","fval","bval","tval"]
		types  = ["string", "int", "float", "bool", "time"]
	[[flows.select]]
		includes = ["#*", "ival", "area", "ival", "fval", "bval", "tval"]
	[[outlets.file]]
		path = "-"
		format = "csv"
	`
	// Mock the current time
	count := int64(0)
	engine.Now = func() time.Time { count++; return time.Unix(1721954797+count, 0) }

	// Create a new pipeline
	pipeline, err := engine.New(engine.WithConfig(dsl))
	if err != nil {
		panic(err)
	}
	if err := pipeline.Run(); err != nil {
		panic(err)
	}
}
Output:

file,1721954798,1,a,1,1.234,true,1723219262
file,1721954799,2,b,2,2.345,false,1723219384
file,1721954800,3,c,3,3.456,true,1723219506

func (*CSVEncoder) Encode

func (cw *CSVEncoder) Encode(recs []engine.Record) error

type JSONDecoder

type JSONDecoder struct {
	engine.DecoderConfig
	// contains filtered or unexported fields
}
Example
package main

import (
	"time"

	"github.com/OutOfBedlam/tine/engine"
	_ "github.com/OutOfBedlam/tine/plugins/base"
)

func main() {
	dsl := `
	[[inlets.file]]
		path = "testdata/testdata.json"
		format = "json"
	[[flows.select]]
		includes = ["#*", "area", "bval", "ival", "fval", "time"]
	[[outlets.file]]
		path = "-"
		format = "json"
		decimal = 2
	`
	// Make the output timestamp deterministic, so we can compare it
	// This line is required only for testing
	engine.Now = func() time.Time { return time.Unix(1721954797, 0) }
	// Create a new pipeline
	pipeline, err := engine.New(engine.WithConfig(dsl))
	if err != nil {
		panic(err)
	}
	if err := pipeline.Run(); err != nil {
		panic(err)
	}
}
Output:

{"_in":"file","_ts":1721954797,"area":"a","bval":true,"fval":1.23,"ival":1.00,"time":"2020-01-01T00:00:00Z"}
{"_in":"file","_ts":1721954797,"area":"b","bval":true,"fval":2.35,"ival":2.00,"time":"2020-01-02T00:00:00Z"}
Example (Data)
package main

import (
	"time"

	"github.com/OutOfBedlam/tine/engine"
	_ "github.com/OutOfBedlam/tine/plugins/base"
)

func main() {
	dsl := `
	[[inlets.file]]
		data = [
			'{"area": "a","ival": 1,"fval": 1.234,"time": "2020-01-01T00:00:00Z","bval": true}',
			'{"area": "b","ival": 2,"fval": 2.345,"time": "2020-01-02T00:00:00Z","bval": true}',
		]
		format = "json"
	[[flows.select]]
		includes = ["#*", "area", "bval", "ival", "fval", "time"]
	[[outlets.file]]
		path = "-"
		format = "json"
		decimal = 2
	`
	// Make the output timestamp deterministic, so we can compare it
	// This line is required only for testing
	engine.Now = func() time.Time { return time.Unix(1721954797, 0) }
	// Create a new pipeline
	pipeline, err := engine.New(engine.WithConfig(dsl))
	if err != nil {
		panic(err)
	}
	if err := pipeline.Run(); err != nil {
		panic(err)
	}
}
Output:

{"_in":"file","_ts":1721954797,"area":"a","bval":true,"fval":1.23,"ival":1.00,"time":"2020-01-01T00:00:00Z"}
{"_in":"file","_ts":1721954797,"area":"b","bval":true,"fval":2.35,"ival":2.00,"time":"2020-01-02T00:00:00Z"}

func (*JSONDecoder) Decode

func (jd *JSONDecoder) Decode() ([]engine.Record, error)

type JSONEncoder

type JSONEncoder struct {
	engine.EncoderConfig
	// contains filtered or unexported fields
}
Example
package main

import (
	"time"

	"github.com/OutOfBedlam/tine/engine"
	_ "github.com/OutOfBedlam/tine/plugins/base"
)

func main() {
	dsl := `
	[[inlets.file]]
		data = [
			"a,1,1.2345", 
			"b,2,2.3456", 
			"c,3,3.4567",
		]
		format = "csv"
		fields = ["area", "ival", "fval"]
		types  = ["string", "int", "float"]
	[[flows.select]]
		includes = ["#*", "area", "ival", "fval"]
	[[outlets.file]]
		path = "-"
		format = "json"
	`

	// Mock the current time
	count := int64(0)
	engine.Now = func() time.Time { count++; return time.Unix(1721954797+count, 0) }

	// Create a new pipeline
	pipeline, err := engine.New(engine.WithConfig(dsl))
	if err != nil {
		panic(err)
	}
	if err := pipeline.Run(); err != nil {
		panic(err)
	}
}
Output:

{"_in":"file","_ts":1721954798,"area":"a","fval":1.2345,"ival":1}
{"_in":"file","_ts":1721954799,"area":"b","fval":2.3456,"ival":2}
{"_in":"file","_ts":1721954800,"area":"c","fval":3.4567,"ival":3}
Example (Decimal)
package main

import (
	"time"

	"github.com/OutOfBedlam/tine/engine"
	_ "github.com/OutOfBedlam/tine/plugins/base"
)

func main() {
	dsl := `
	[[inlets.file]]
		data = [
			"a,1,1.2345", 
			"b,2,2.3456", 
			"c,3,3.4567",
		]
		format = "csv"
		fields = ["area", "ival", "fval"]
		types  = ["string", "int", "float"]
	[[flows.select]]
		includes = ["#*", "area", "ival", "fval"]
	[[outlets.file]]
		path = "-"
		format = "json"
		decimal = 2
	`

	// Mock the current time
	count := int64(0)
	engine.Now = func() time.Time { count++; return time.Unix(1721954797+count, 0) }

	// Create a new pipeline
	pipeline, err := engine.New(engine.WithConfig(dsl))
	if err != nil {
		panic(err)
	}
	if err := pipeline.Run(); err != nil {
		panic(err)
	}
}
Output:

{"_in":"file","_ts":1721954798,"area":"a","fval":1.23,"ival":1}
{"_in":"file","_ts":1721954799,"area":"b","fval":2.35,"ival":2}
{"_in":"file","_ts":1721954800,"area":"c","fval":3.46,"ival":3}

func (*JSONEncoder) Encode

func (jw *JSONEncoder) Encode(recs []engine.Record) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL