engine

package
v0.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 25, 2024 License: BSD-3-Clause Imports: 25 Imported by: 0

Documentation

Index

Examples

Constants

View Source
const TAG_INLET = "_in"
View Source
const TAG_TIMESTAMP = "_ts"

Variables

View Source
var DefaultConfigString string
View Source
var DefaultTimeformatter = &Timeformatter{format: time.RFC3339, loc: time.Local}
View Source
var Now = time.Now

Now is a function that returns the current time It is used to generate the timestamp for each record The default value is time.Now Set this to a fixed time in the purpose of testing

Functions

func BuildCompiler

func BuildCompiler() string

func BuildTimestamp

func BuildTimestamp() string

func CanonicalTagKey added in v0.0.2

func CanonicalTagKey(key string) string

func CompressorNames

func CompressorNames() []string

func DecoderNames

func DecoderNames() []string

func DecompressorNames

func DecompressorNames() []string

func DisplayVersion

func DisplayVersion() string

func EncoderNames

func EncoderNames() []string

func FlowNames

func FlowNames() []string

func GetTagInt64 added in v0.0.2

func GetTagInt64(t Tags, key string) int64

func GetTagString added in v0.0.2

func GetTagString(t Tags, key string) string

func HttpHandleFunc

func HttpHandleFunc(config string) http.HandlerFunc

HttpHandleFunc is a convenience function to create a http.HandlerFunc from a pipeline configuration

func InletNames

func InletNames() []string

func LoadConfig

func LoadConfig(content string, cfg *PipelineConfig) error

func NopCloser

func NopCloser(w io.Writer) io.WriteCloser

func OutletNames

func OutletNames() []string

func RegisterCompressor

func RegisterCompressor(reg *Compressor)

func RegisterDecoder

func RegisterDecoder(reg *DecoderReg)

func RegisterDecompressor

func RegisterDecompressor(reg *Decompressor)

func RegisterEncoder

func RegisterEncoder(reg *EncoderReg)

func RegisterFlow

func RegisterFlow(reg *FlowReg)

func RegisterInlet

func RegisterInlet(reg *InletReg)

func RegisterOutlet

func RegisterOutlet(reg *OutletReg)

func SetDefaultValueFormat

func SetDefaultValueFormat(vf ValueFormat)

func UnboxFields

func UnboxFields(fields []*Field) []any

func UnregisterCompressor

func UnregisterCompressor(name string)

func UnregisterDecoder

func UnregisterDecoder(name string)

func UnregisterDecompressor

func UnregisterDecompressor(name string)

func UnregisterEncoder

func UnregisterEncoder(name string)

func UnregisterFlow

func UnregisterFlow(name string)

func VersionString

func VersionString() string

Types

type AND

type AND struct {
	// contains filtered or unexported fields
}
Example
package main

import (
	"fmt"

	"github.com/OutOfBedlam/tine/engine"
)

func main() {
	// AND predicate
	// name == "John" AND age > 18
	p := engine.PredicateAND(
		engine.F{ColName: "name", Comparator: engine.EQ, Comparando: "John"},
		engine.F{ColName: "age", Comparator: engine.GT, Comparando: 18},
	)
	// Record
	records := []engine.Record{
		engine.NewRecord(
			engine.NewField("age", int64(19)),
			engine.NewField("name", "John"),
		),
		engine.NewRecord(
			engine.NewField("age", int64(17)),
			engine.NewField("name", "John"),
		),
	}
	// Apply AND predicate
	for _, r := range records {
		fmt.Println(p.Apply(r))
	}
}
Output:

true
false

func (AND) Apply

func (of AND) Apply(r Record) bool

type BufferedFlow added in v0.0.2

type BufferedFlow interface {
	Flow
	Flush(FlowNextFunc)
}

type Comparator

type Comparator string
const (
	EQ       Comparator = "=="
	NEQ      Comparator = "!="
	GT       Comparator = ">"
	GTE      Comparator = ">="
	LT       Comparator = "<"
	LTE      Comparator = "<="
	IN       Comparator = "in"
	NOT_IN   Comparator = "not in"
	CompFunc Comparator = "func"
)

type Compressor

type Compressor struct {
	Name            string
	Factory         func(io.Writer) io.WriteCloser
	ContentEncoding string
}

func GetCompressor

func GetCompressor(name string) *Compressor

type Config

type Config map[string]any

func NewConfig

func NewConfig() Config

func (Config) GetBool

func (c Config) GetBool(key string, defaultVal bool) bool

func (Config) GetConfig

func (c Config) GetConfig(key string, defaultVal Config) Config

func (Config) GetConfigSlice added in v0.0.2

func (c Config) GetConfigSlice(key string, defaultVal []Config) []Config

func (Config) GetDuration

func (c Config) GetDuration(key string, defaultVal time.Duration) time.Duration

func (Config) GetFloat

func (c Config) GetFloat(key string, defaultVal float64) float64

func (Config) GetInt

func (c Config) GetInt(key string, defaultVal int) int

func (Config) GetInt64

func (c Config) GetInt64(key string, defaultVal int64) int64

func (Config) GetIntSlice added in v0.0.2

func (c Config) GetIntSlice(key string, defaultVal []int) []int

func (Config) GetString

func (c Config) GetString(key string, defaultVal string) string

func (Config) GetStringSlice added in v0.0.2

func (c Config) GetStringSlice(key string, defaultVal []string) []string

func (Config) GetUint32

func (c Config) GetUint32(key string, defaultVal uint32) uint32

func (Config) GetValue added in v0.0.6

func (c Config) GetValue(key string) *Value

func (Config) Set

func (c Config) Set(key string, val any) Config

func (Config) Unset

func (c Config) Unset(key string) Config

type Context

type Context struct {
	// contains filtered or unexported fields
}

func (*Context) CircuitBreak

func (ctx *Context) CircuitBreak()

func (*Context) Config

func (ctx *Context) Config() Config

func (*Context) Deadline

func (ctx *Context) Deadline() (deadline time.Time, ok bool)

func (*Context) Done

func (ctx *Context) Done() <-chan struct{}

func (*Context) Err

func (ctx *Context) Err() error

func (*Context) GetInject added in v0.0.5

func (ctx *Context) GetInject(id string) (InjectFunc, bool)

func (*Context) Inject added in v0.0.5

func (ctx *Context) Inject(id string, cb InjectFunc)
Example
package main

import (
	"os"
	"time"

	"github.com/OutOfBedlam/tine/engine"
	_ "github.com/OutOfBedlam/tine/plugins/args"
	_ "github.com/OutOfBedlam/tine/plugins/base"
	_ "github.com/OutOfBedlam/tine/plugins/exec"
	_ "github.com/OutOfBedlam/tine/plugins/psutil"
)

func main() {
	// This example demonstrates how to use the exec inlet to run a command and
	dsl := `
	[[inlets.args]]
	[[flows.select]]
		includes = ["**"]
	[[flows.inject]]
		id = "here"
	[[outlets.file]]
		path = "-"
		format = "json"
	`
	// Make the output timestamp deterministic, so we can compare it
	// This line is required only for testing
	engine.Now = func() time.Time { return time.Unix(1721954797, 0) }
	// Build pipeline
	pipeline, err := engine.New(engine.WithConfig(dsl))
	if err != nil {
		panic(err)
	}

	pipeline.Context().Inject("here", func(r []engine.Record) ([]engine.Record, error) {
		for i, rec := range r {
			r[i] = rec.AppendOrReplace(engine.NewField("msg", "hello world - here updated"))
		}
		return r, nil
	})

	// Simulate the command line arguments
	os.Args = []string{"command", "command-arg", "--", "msg=hello world"}
	// Run the pipeline
	if err := pipeline.Run(); err != nil {
		panic(err)
	}
}
Output:

{"_in":"args","_ts":1721954797,"msg":"hello world - here updated"}

func (*Context) Log added in v0.0.6

func (ctx *Context) Log(level slog.Level, msg string, keyvals ...interface{})

func (*Context) LogDebug

func (ctx *Context) LogDebug(msg string, keyvals ...interface{})

func (*Context) LogEnabled added in v0.0.5

func (ctx *Context) LogEnabled(level slog.Level) bool

func (*Context) LogError

func (ctx *Context) LogError(msg string, keyvals ...interface{})

func (*Context) LogInfo

func (ctx *Context) LogInfo(msg string, keyvals ...interface{})

func (*Context) LogWarn

func (ctx *Context) LogWarn(msg string, keyvals ...interface{})

func (*Context) PipelineName

func (ctx *Context) PipelineName() string

func (*Context) SetContentEncoding

func (ctx *Context) SetContentEncoding(contentEncoding string)

func (*Context) SetContentLength

func (ctx *Context) SetContentLength(contentLength int)

func (*Context) SetContentType

func (ctx *Context) SetContentType(contentType string)

func (*Context) Value

func (ctx *Context) Value(key interface{}) interface{}

func (*Context) WithConfig

func (ctx *Context) WithConfig(conf Config) *Context

func (*Context) WithLogger

func (ctx *Context) WithLogger(logger *slog.Logger) *Context

func (*Context) Writer

func (ctx *Context) Writer() io.Writer

type Decoder

type Decoder interface {
	Decode() ([]Record, error)
}

type DecoderConfig

type DecoderConfig struct {
	Reader       io.Reader
	FormatOption ValueFormat
	Fields       []string
	Types        []Type
}

type DecoderReg

type DecoderReg struct {
	Name    string
	Factory func(DecoderConfig) Decoder
}

func GetDecoder

func GetDecoder(name string) *DecoderReg

type Decompressor

type Decompressor struct {
	Name    string
	Factory func(io.Reader) io.ReadCloser
}

func GetDecompressor

func GetDecompressor(name string) *Decompressor

type Encoder

type Encoder interface {
	Encode([]Record) error
}

type EncoderConfig

type EncoderConfig struct {
	Writer       io.Writer
	Fields       []string
	FormatOption ValueFormat
}

type EncoderReg

type EncoderReg struct {
	Name        string
	Factory     func(EncoderConfig) Encoder
	ContentType string
}

func GetEncoder

func GetEncoder(name string) *EncoderReg

type F

type F struct {
	ColName    string
	Comparator Comparator
	Comparando any
}

func (F) Apply

func (f F) Apply(r Record) bool

type Field

type Field struct {
	Name  string `json:"name"`
	Value *Value `json:"value"`
	Tags  Tags   `json:"tags,omitempty"`
}

Field is Value with name and Tags

func NewField added in v0.0.2

func NewField[T RawValue](name string, value T) *Field

func NewFieldWithValue added in v0.0.2

func NewFieldWithValue(name string, value *Value) *Field

if name is empty or value is nil, return nil

func (*Field) BinaryField

func (f *Field) BinaryField() *Field

func (*Field) BoolField

func (f *Field) BoolField() *Field

func (*Field) Clone

func (v *Field) Clone() *Field

Clone returns a deep copy of the field

func (*Field) Convert

func (f *Field) Convert(to Type) *Field

func (*Field) Copy

func (v *Field) Copy(newName string) *Field

Copy returns a shallow copy of the field with a new name

func (*Field) FloatField

func (f *Field) FloatField() *Field

func (*Field) Func

func (f *Field) Func(fn func(any) bool) bool

func (*Field) IntField

func (f *Field) IntField() *Field

func (*Field) IsNull

func (f *Field) IsNull() bool

func (*Field) String

func (f *Field) String() string

func (*Field) StringField

func (f *Field) StringField() *Field

func (*Field) TimeField

func (f *Field) TimeField() *Field

func (*Field) Type

func (f *Field) Type() Type

func (*Field) UintField

func (f *Field) UintField() *Field

type Flow

type Flow interface {
	OpenCloser
	Process([]Record, FlowNextFunc)
	Parallelism() int
}

func FanInFlow

func FanInFlow(ctx *Context) Flow

func FanOutFlow

func FanOutFlow(ctx *Context) Flow

func FlowWithFunc

func FlowWithFunc(fn func([]Record) ([]Record, error), opts ...FlowFuncWrapOption) Flow

type FlowConfig

type FlowConfig struct {
	Plugin string
	Params Config
}

type FlowFuncWrap

type FlowFuncWrap struct {
	// contains filtered or unexported fields
}

func (*FlowFuncWrap) Close

func (fw *FlowFuncWrap) Close() error

func (*FlowFuncWrap) Open

func (fw *FlowFuncWrap) Open() error

func (*FlowFuncWrap) Parallelism

func (fw *FlowFuncWrap) Parallelism() int

func (*FlowFuncWrap) Process

func (fw *FlowFuncWrap) Process(r []Record, cb FlowNextFunc)

type FlowFuncWrapOption

type FlowFuncWrapOption func(*FlowFuncWrap)

func WithFlowFuncParallelism

func WithFlowFuncParallelism(parallelism int) FlowFuncWrapOption

type FlowHandler

type FlowHandler struct {
	// contains filtered or unexported fields
}

func NewFlowHandler

func NewFlowHandler(ctx *Context, name string, flow Flow) *FlowHandler

func (*FlowHandler) Start

func (fh *FlowHandler) Start() error

func (*FlowHandler) Stop

func (fh *FlowHandler) Stop() error

func (*FlowHandler) Via

func (fh *FlowHandler) Via(next *FlowHandler) *FlowHandler

type FlowNextFunc added in v0.0.3

type FlowNextFunc func([]Record, error)

type FlowReg

type FlowReg struct {
	Name    string
	Factory func(ctx *Context) Flow
}

func GetFlowRegistry

func GetFlowRegistry(name string) *FlowReg

type Flusher added in v0.0.4

type Flusher interface {
	Flush()
}

type InjectFunc added in v0.0.5

type InjectFunc func([]Record) ([]Record, error)

type Inlet

type Inlet interface {
	OpenCloser
	Process(InletNextFunc)
}

func InletWithFunc added in v0.0.3

func InletWithFunc(fn func() ([]Record, error), opts ...InletFuncOption) Inlet

type InletConfig

type InletConfig struct {
	Plugin string
	Params Config
	Flows  []FlowConfig
}

type InletFuncOption added in v0.0.3

type InletFuncOption func(*InletFuncWrap)

func WithInterval

func WithInterval(interval time.Duration) InletFuncOption

func WithRunCountLimit

func WithRunCountLimit(limit int64) InletFuncOption

type InletFuncWrap added in v0.0.3

type InletFuncWrap struct {
	// contains filtered or unexported fields
}

func (*InletFuncWrap) Close added in v0.0.3

func (in *InletFuncWrap) Close() error

func (*InletFuncWrap) Interval added in v0.0.3

func (in *InletFuncWrap) Interval() time.Duration

func (*InletFuncWrap) Open added in v0.0.3

func (in *InletFuncWrap) Open() error

func (*InletFuncWrap) Process added in v0.0.3

func (in *InletFuncWrap) Process(next InletNextFunc)

type InletHandler

type InletHandler struct {
	// contains filtered or unexported fields
}

func NewInletHandler

func NewInletHandler(ctx *Context, name string, inlet Inlet, outCh chan<- []Record) (*InletHandler, error)

func (*InletHandler) AddFlow

func (in *InletHandler) AddFlow(flow *FlowHandler)

AddFlow adds a sub-flow to the inlet handler

func (*InletHandler) Run

func (in *InletHandler) Run() error

func (*InletHandler) Start

func (in *InletHandler) Start()

func (*InletHandler) Stop

func (in *InletHandler) Stop()

func (*InletHandler) Via

func (in *InletHandler) Via(flow *FlowHandler) *FlowHandler

Via connects the inlet handler to a flow handler

func (*InletHandler) Walk added in v0.0.3

func (in *InletHandler) Walk(walker func(inletName string, kind string, step string, handler any))

type InletNextFunc added in v0.0.3

type InletNextFunc func([]Record, error)

type InletReg

type InletReg struct {
	Name    string
	Factory func(*Context) Inlet
}

func GetInletRegistry

func GetInletRegistry(name string) *InletReg

func (*InletReg) New

func (ir *InletReg) New(ctx *Context) Inlet

type Key

type Key interface {
	cmp.Ordered
}

type OR

type OR struct {
	// contains filtered or unexported fields
}
Example
package main

import (
	"fmt"

	"github.com/OutOfBedlam/tine/engine"
)

func main() {
	// OR predicate
	// name == "John" OR age < 18
	p := engine.PredicateOR(
		engine.F{ColName: "name", Comparator: engine.EQ, Comparando: "John"},
		engine.F{ColName: "age", Comparator: engine.LT, Comparando: 18},
	)
	// Record
	records := []engine.Record{
		engine.NewRecord(
			engine.NewField("age", int64(19)),
			engine.NewField("name", "John"),
		),
		engine.NewRecord(
			engine.NewField("age", int64(20)),
			engine.NewField("name", "Jane"),
		),
	}
	// Apply OR predicate
	for _, r := range records {
		fmt.Println(p.Apply(r))
	}
}
Output:

true
false

func (OR) Apply

func (of OR) Apply(r Record) bool

type OpenCloser

type OpenCloser interface {
	Open() error
	Close() error
}

type Option

type Option func(*Pipeline) error

func WithConfig

func WithConfig(conf string) Option

WithConfig loads a TOML configuration string into a PipelineConfig struct

func WithConfigFile

func WithConfigFile(path string) Option

WithConfigFile loads a TOML configuration file into a PipelineConfig struct

func WithLogWriter added in v0.0.5

func WithLogWriter(w io.Writer) Option

func WithLogger

func WithLogger(logger *slog.Logger) Option

WithLogger sets the logger for the pipeline

func WithName

func WithName(name string) Option

WithName sets the name of the pipeline

func WithSetContentEncodingFunc

func WithSetContentEncodingFunc(fn SetContentEncodingCallback) Option

WithSetContentEncodingFunc sets the callback function to set the content encoding

func WithSetContentLengthFunc

func WithSetContentLengthFunc(fn SetContentLengthCallback) Option

WithSetContentLengthFunc sets the callback function to set the content length

func WithSetContentTypeFunc

func WithSetContentTypeFunc(fn SetContentTypeCallback) Option

WithSetContentTypeFunc sets the callback function to set the content type

func WithVerbose added in v0.0.6

func WithVerbose(flag bool) Option

func WithWriter

func WithWriter(w io.Writer) Option

WithDefaults sets the default output writer for the pipeline

type Outlet

type Outlet interface {
	OpenCloser
	Handle(r []Record) error
}

func OutletWithFunc

func OutletWithFunc(fn func([]Record) error) Outlet

type OutletConfig

type OutletConfig struct {
	Plugin string
	Params Config
}

type OutletFuncWrap

type OutletFuncWrap struct {
	// contains filtered or unexported fields
}

func (*OutletFuncWrap) Close

func (out *OutletFuncWrap) Close() error

func (*OutletFuncWrap) Handle

func (out *OutletFuncWrap) Handle(r []Record) error

func (*OutletFuncWrap) Open

func (out *OutletFuncWrap) Open() error

type OutletHandler

type OutletHandler struct {
	// contains filtered or unexported fields
}

func NewOutletHandler

func NewOutletHandler(ctx *Context, name string, outlet Outlet) (*OutletHandler, error)

func (*OutletHandler) Sink

func (out *OutletHandler) Sink() chan<- []Record

func (*OutletHandler) Start

func (out *OutletHandler) Start() error

func (*OutletHandler) Stop

func (out *OutletHandler) Stop()

type OutletReg

type OutletReg struct {
	Name    string
	Factory func(*Context) Outlet
}

func GetOutletRegistry

func GetOutletRegistry(name string) *OutletReg

func (*OutletReg) New

func (or *OutletReg) New(ctx *Context) Outlet

type PeriodicInlet added in v0.0.3

type PeriodicInlet interface {
	Inlet
	Interval() time.Duration
}

type Pipeline

type Pipeline struct {
	PipelineConfig
	// contains filtered or unexported fields
}

func New

func New(opts ...Option) (*Pipeline, error)

New creates a new pipeline with the given options

Example
package main

import (
	"os"
	"time"

	"github.com/OutOfBedlam/tine/engine"
	_ "github.com/OutOfBedlam/tine/plugins/args"
	_ "github.com/OutOfBedlam/tine/plugins/base"
	_ "github.com/OutOfBedlam/tine/plugins/exec"
	_ "github.com/OutOfBedlam/tine/plugins/psutil"
)

func main() {
	// This example demonstrates how to use the exec inlet to run a command and
	dsl := `
	[[inlets.args]]
	[[flows.update]]
		set = [
			{ field = "msg", name_format = "pre_%s_suf" },
		]
	[[flows.select]]
		includes = ["**"]
	[[outlets.file]]
		path = "-"
		format = "json"
	`
	// Make the output timestamp deterministic, so we can compare it
	// This line is required only for testing
	engine.Now = func() time.Time { return time.Unix(1721954797, 0) }
	// Build pipeline
	pipeline, err := engine.New(engine.WithConfig(dsl))
	if err != nil {
		panic(err)
	}
	// Simulate the command line arguments
	os.Args = []string{"command", "command-arg", "--", "msg=hello world"}
	// Run the pipeline
	if err := pipeline.Run(); err != nil {
		panic(err)
	}
}
Output:

{"_in":"args","_ts":1721954797,"pre_msg_suf":"hello world"}
Example (Multi_inlets)
package main

import (
	"time"

	"github.com/OutOfBedlam/tine/engine"
	_ "github.com/OutOfBedlam/tine/plugins/args"
	_ "github.com/OutOfBedlam/tine/plugins/base"
	_ "github.com/OutOfBedlam/tine/plugins/exec"
	_ "github.com/OutOfBedlam/tine/plugins/psutil"
)

func main() {
	// This example demonstrates how to use the merge flow.
	dsl := `
	[[inlets.file]]
		data = [
			"a,1",
		]
		format = "csv"
	[[inlets.exec]]
		commands = ["echo", "hello world"]
		count = 1
		trim_space = true
		ignore_error = true
	[[flows.merge]]
		wait_limit = "1s"
	[[outlets.file]]
		path = "-"
		format = "json"
	`
	// Make the output time deterministic. so we can compare it.
	// This line is not needed in production code.
	engine.Now = func() time.Time { return time.Unix(1721954797, 0) }
	// Create a new engine.
	pipeline, err := engine.New(engine.WithConfig(dsl))
	if err != nil {
		panic(err)
	}
	// Run the engine.
	if err := pipeline.Run(); err != nil {
		panic(err)
	}
}
Output:

{"_ts":1721954797,"exec_stdout":"hello world","file_0":"a","file_1":"1"}

func (*Pipeline) AddFlow

func (p *Pipeline) AddFlow(name string, flow Flow) (*FlowHandler, error)

AddFlow adds a flow to the pipeline

func (*Pipeline) AddInlet

func (p *Pipeline) AddInlet(name string, inlet Inlet) (*InletHandler, error)

AddInlet adds an inlet to the pipeline

func (*Pipeline) AddOutlet

func (p *Pipeline) AddOutlet(name string, outlet Outlet) (*OutletHandler, error)

AddOutlet adds an outlet to the pipeline

func (*Pipeline) Build

func (p *Pipeline) Build() (returnErr error)

Build the pipeline, this will create all inlets, outlets and flows

func (*Pipeline) Context

func (p *Pipeline) Context() *Context

Context returns the context of the pipeline

func (*Pipeline) Run

func (p *Pipeline) Run() (returnErr error)

Run the pipeline, this will start all inlets, outlets and flows and wait until the pipeline is stopped

func (*Pipeline) Start

func (p *Pipeline) Start()

Start the pipeline, this will start all inlets, outlets and flows and returns immediately without waiting for the pipeline to stop

func (*Pipeline) Stop

func (p *Pipeline) Stop() error

Stop the pipeline, this will stop all inlets, outlets and flows. Start() or Run() should be called before calling Stop()

func (*Pipeline) Walk added in v0.0.3

func (p *Pipeline) Walk(walker func(pipelineName string, kind string, step string, handler any))
Example
package main

import (
	"fmt"
	"time"

	"github.com/OutOfBedlam/tine/engine"
	_ "github.com/OutOfBedlam/tine/plugins/args"
	_ "github.com/OutOfBedlam/tine/plugins/base"
	_ "github.com/OutOfBedlam/tine/plugins/exec"
	_ "github.com/OutOfBedlam/tine/plugins/psutil"
)

func main() {
	// Make the output timestamp deterministic, so we can compare it
	// This line is required only for testing
	engine.Now = func() time.Time { return time.Unix(1721954797, 0) }

	pipeline, err := engine.New(
		engine.WithConfigFile("testdata/args-file.toml"),
		engine.WithName("my-pipeline"),
	)
	if err != nil {
		panic(err)
	}
	err = pipeline.Build()
	if err != nil {
		panic(err)
	}
	pipeline.Walk(func(pipelineName, kind, name string, step any) {
		// This will print the name of the step
		fmt.Println(pipelineName, kind, name)
	})

}
Output:

my-pipeline inlets args
my-pipeline inlets.args.flows dump
my-pipeline flows fan-in
my-pipeline flows select
my-pipeline flows inject
my-pipeline flows fan-out
my-pipeline outlets file

type PipelineConfig

type PipelineConfig struct {
	Name     string
	Log      util.LogConfig
	Defaults Config
	Inlets   []InletConfig
	Outlets  []OutletConfig
	Flows    []FlowConfig
}

type Predicate

type Predicate interface {
	Apply(Record) bool
}

func PredicateAND added in v0.0.2

func PredicateAND(l, r Predicate) Predicate

func PredicateOR added in v0.0.2

func PredicateOR(l, r Predicate) Predicate

type RawValue

type RawValue interface {
	string | bool | []byte | int64 | uint64 | float64 | time.Time
}

type Reader

type Reader struct {
	Format     string
	Timeformat string
	Timezone   string
	Compress   string
	Fields     []string
	Types      []string
	// contains filtered or unexported fields
}

func NewReader

func NewReader(r io.Reader, cfg Config) (*Reader, error)
Example (WithTypes)
package main

import (
	"time"

	"github.com/OutOfBedlam/tine/engine"
	_ "github.com/OutOfBedlam/tine/plugins/args"
	_ "github.com/OutOfBedlam/tine/plugins/base"
	_ "github.com/OutOfBedlam/tine/plugins/exec"
	_ "github.com/OutOfBedlam/tine/plugins/psutil"
)

func main() {
	dsl := `
	[[inlets.file]]
		data = [
			"a,1,1.2345", 
			"b,2,2.3456", 
			"c,3,3.4567",
		]
		format = "csv"
		fields = ["area", "ival", "fval"]
		types  = ["string", "int", "float"]
	[[flows.select]]
		includes = ["#*", "area", "ival", "fval"]
	[[outlets.file]]
		path = "-"
		format = "json"
	`
	// Make the output time deterministic. so we can compare it.
	// This line is not needed in production code.
	engine.Now = func() time.Time { return time.Unix(1721954797, 0) }
	// Create a new engine.
	pipeline, err := engine.New(engine.WithConfig(dsl))
	if err != nil {
		panic(err)
	}
	// Run the engine.
	if err := pipeline.Run(); err != nil {
		panic(err)
	}
}
Output:

{"_in":"file","_ts":1721954797,"area":"a","fval":1.2345,"ival":1}
{"_in":"file","_ts":1721954797,"area":"b","fval":2.3456,"ival":2}
{"_in":"file","_ts":1721954797,"area":"c","fval":3.4567,"ival":3}

func (*Reader) Close

func (rd *Reader) Close() error

func (*Reader) Read

func (rd *Reader) Read() ([]Record, error)

type ReaderOption

type ReaderOption func(*Reader)

type Record

type Record interface {
	// Field retursn field by name
	Field(name string) *Field

	// Fields returns fields in the order of names
	// if names is empty, return all fields
	Fields(names ...string) []*Field

	// FieldAt returns field by index
	FieldAt(index int) *Field

	// FieldsAt return fields in the order of indexes
	FieldsAt(indexes ...int) []*Field

	// Empty returns true if the record has no fields
	Empty() bool

	// Names returns all field names
	Names() []string

	// Append returns a new record with fields appended
	Append(...*Field) Record

	// AppendOrReplace returns a new record with fields appended or replaced if the field name already exists
	AppendOrReplace(...*Field) Record

	Tags() Tags
}

func NewRecord

func NewRecord(fields ...*Field) Record

type Row

type Row[T Key] struct {
	Key    T
	Fields []*Field
}

func NewRow

func NewRow[T Key](key T, cap int) *Row[T]

type SetContentEncodingCallback

type SetContentEncodingCallback func(contentEncoding string)

type SetContentLengthCallback

type SetContentLengthCallback func(contentLength int)

type SetContentTypeCallback

type SetContentTypeCallback func(contentType string)

type Table

type Table[T Key] struct {
	// contains filtered or unexported fields
}

func NewTable

func NewTable[T Key]() *Table[T]

func (*Table[T]) AddColumn

func (tb *Table[T]) AddColumn(name string, t Type)

AddColumn adds a column to the table

func (*Table[T]) AddColumns

func (tb *Table[T]) AddColumns(names []string, types []Type)

AddColumns adds columns to the table

names and types should be same size, otherwise panic

func (*Table[T]) Clear

func (tb *Table[T]) Clear()

Clear removes all records from the table

func (*Table[T]) Columns

func (tb *Table[T]) Columns() []string

Columns returns the column names

func (*Table[T]) Compact

func (tb *Table[T]) Compact() *Table[T]

Compact compacts records by predicate it removes records that do not match the predicate if predicate is nil, it does nothing. and returns the table itself. so that chain calls are possible.

tb := tb.Filter(...).Compact().Select(...)

func (*Table[T]) Filter

func (tb *Table[T]) Filter(filter Predicate) *Table[T]

Filter returns a new table with filtered records

func (*Table[T]) Get

func (tb *Table[T]) Get(k T) *Row[T]

func (*Table[T]) Keys

func (tb *Table[T]) Keys() []T

func (*Table[T]) Len

func (tb *Table[T]) Len() int

Len returns the number of records in the table

func (*Table[T]) Rows

func (tb *Table[T]) Rows() [][]*Field

func (*Table[T]) Select

func (tb *Table[T]) Select(fields []string) (*Table[T], error)

Select returns a new table with selected columns

func (*Table[T]) Series

func (tb *Table[T]) Series(colName string) []*Field

Series returns a series of a column by name

func (*Table[T]) SeriesByIdx

func (tb *Table[T]) SeriesByIdx(colIdx int) []*Field

SeriesByIdx returns a series of a column by index

func (*Table[T]) SeriesFields

func (tb *Table[T]) SeriesFields(colName string) []*Field

SeriesFields returns a series of a column by name

func (*Table[T]) SeriesFieldsByIdx

func (tb *Table[T]) SeriesFieldsByIdx(colIdx int) []*Field

SeriesFieldsByIdx returns a series of a column by index

func (*Table[T]) Set

func (tb *Table[T]) Set(k T, fields ...*Field)

func (*Table[T]) Split

func (tb *Table[T]) Split(filter Predicate) (*Table[T], *Table[T])

func (*Table[T]) Types

func (tb *Table[T]) Types() []Type

Types returns the column types

type Tags added in v0.0.2

type Tags map[string]*Value

func (Tags) Clear added in v0.0.2

func (t Tags) Clear()
Example
package main

import (
	"fmt"

	"github.com/OutOfBedlam/tine/engine"
)

func main() {
	tags := engine.Tags{
		"key1": engine.NewValue("value1"),
		"key2": engine.NewValue("value2"),
	}
	tags.Clear()
	fmt.Println(tags)
}
Output:

map[]

func (Tags) Clone added in v0.0.2

func (t Tags) Clone() Tags
Example
package main

import (
	"fmt"

	"github.com/OutOfBedlam/tine/engine"
)

func main() {
	tags := engine.Tags{
		"key1": engine.NewValue("value1"),
		"key2": engine.NewValue("value2"),
	}
	tags2 := tags.Clone()
	str, _ := tags2.Get("key1").String()
	fmt.Println(str)
}
Output:

value1
Example (Empty)
package main

import (
	"fmt"

	"github.com/OutOfBedlam/tine/engine"
)

func main() {
	tags := engine.Tags{}
	tags2 := tags.Clone()
	fmt.Println(tags2)
}
Output:

map[]

func (Tags) Del added in v0.0.2

func (t Tags) Del(key string)
Example
package main

import (
	"fmt"

	"github.com/OutOfBedlam/tine/engine"
)

func main() {
	tags := engine.Tags{
		"key1": engine.NewValue("value1"),
		"key2": engine.NewValue("value2"),
	}
	tags.Del("key1")
	fmt.Println(tags.Get("key1"))
}
Output:

<nil>

func (Tags) Get added in v0.0.2

func (t Tags) Get(key string) *Value
Example
package main

import (
	"fmt"

	"github.com/OutOfBedlam/tine/engine"
)

func main() {
	tags := engine.Tags{
		"key1": engine.NewValue("value1"),
		"key2": engine.NewValue("value2"),
	}
	str, _ := tags.Get("key1").String()
	fmt.Println(str)
	fmt.Println(tags.Get("key3"))
}
Output:

value1
<nil>

func (Tags) IsEmpty added in v0.0.2

func (t Tags) IsEmpty() bool
Example
package main

import (
	"fmt"

	"github.com/OutOfBedlam/tine/engine"
)

func main() {
	tags := engine.Tags{}
	fmt.Println(tags.IsEmpty())
}
Output:

true

func (Tags) IsNotEmpty added in v0.0.2

func (t Tags) IsNotEmpty() bool
Example
package main

import (
	"fmt"

	"github.com/OutOfBedlam/tine/engine"
)

func main() {
	tags := engine.Tags{
		"key1": engine.NewValue("value1"),
		"key2": engine.NewValue("value2"),
	}
	fmt.Println(tags.IsNotEmpty())
}
Output:

true

func (Tags) Len added in v0.0.2

func (t Tags) Len() int
Example
package main

import (
	"fmt"

	"github.com/OutOfBedlam/tine/engine"
)

func main() {
	tags := engine.Tags{
		"key1": engine.NewValue("value1"),
		"key2": engine.NewValue("value2"),
	}
	fmt.Println(tags.Len())
}
Output:

2

func (Tags) Merge added in v0.0.2

func (t Tags) Merge(other Tags)
Example
package main

import (
	"fmt"

	"github.com/OutOfBedlam/tine/engine"
)

func main() {
	tags := engine.Tags{
		"key1": engine.NewValue("value1"),
		"key2": engine.NewValue("value2"),
	}
	tags2 := engine.Tags{
		"key3": engine.NewValue("value3"),
		"key4": engine.NewValue("value4"),
	}
	tags.Merge(tags2)
	str, _ := tags.Get("key3").String()
	fmt.Println(str)
}
Output:

value3

func (Tags) MergeWithPrefix added in v0.0.2

func (t Tags) MergeWithPrefix(m Tags, prefix string)
Example
package main

import (
	"fmt"

	"github.com/OutOfBedlam/tine/engine"
)

func main() {
	tags := engine.Tags{
		"key1": engine.NewValue("value1"),
		"key2": engine.NewValue("value2"),
	}
	tags2 := engine.Tags{
		"key3": engine.NewValue("value3"),
		"key4": engine.NewValue("value4"),
	}
	tags.MergeWithPrefix(tags2, "prefix.")
	str, _ := tags.Get("prefix.key3").String()
	fmt.Println(str)
}
Output:

value3

func (Tags) MergeWithPrefixSuffix added in v0.0.2

func (t Tags) MergeWithPrefixSuffix(m Tags, prefix, suffix string)
Example
package main

import (
	"fmt"

	"github.com/OutOfBedlam/tine/engine"
)

func main() {
	tags := engine.Tags{
		"key1": engine.NewValue("value1"),
		"key2": engine.NewValue("value2"),
	}
	tags2 := engine.Tags{
		"key3": engine.NewValue("value3"),
		"key4": engine.NewValue("value4"),
	}
	tags.MergeWithPrefixSuffix(tags2, "prefix.", ".suffix")
	str, _ := tags.Get("prefix.key3.suffix").String()
	fmt.Println(str)
}
Output:

value3

func (Tags) MergeWithSuffix added in v0.0.2

func (t Tags) MergeWithSuffix(m Tags, suffix string)
Example
package main

import (
	"fmt"

	"github.com/OutOfBedlam/tine/engine"
)

func main() {
	tags := engine.Tags{
		"key1": engine.NewValue("value1"),
		"key2": engine.NewValue("value2"),
	}
	tags2 := engine.Tags{
		"key3": engine.NewValue("value3"),
		"key4": engine.NewValue("value4"),
	}
	tags.MergeWithSuffix(tags2, ".suffix")
	str, _ := tags.Get("key3.suffix").String()
	fmt.Println(str)
}
Output:

value3

func (Tags) Names added in v0.0.2

func (t Tags) Names() []string
Example
package main

import (
	"fmt"
	"slices"

	"github.com/OutOfBedlam/tine/engine"
)

func main() {
	tags := engine.Tags{
		"key1": engine.NewValue("value1"),
		"key2": engine.NewValue("value2"),
	}
	names := tags.Names()
	slices.Sort(names)
	fmt.Println(names)
}
Output:

[key1 key2]
Example (Empty)
package main

import (
	"fmt"

	"github.com/OutOfBedlam/tine/engine"
)

func main() {
	tags := engine.Tags{}
	names := tags.Names()
	fmt.Println(names)
}
Output:

[]

func (Tags) Set added in v0.0.2

func (t Tags) Set(key string, value *Value)
Example
package main

import (
	"fmt"

	"github.com/OutOfBedlam/tine/engine"
)

func main() {
	tags := engine.Tags{
		"key1": engine.NewValue("value1"),
		"key2": engine.NewValue("value2"),
	}
	tags.Set("key1", engine.NewValue("value3"))
	str, _ := tags.Get("key1").String()
	fmt.Println(str)
}
Output:

value3

type Timeformatter

type Timeformatter struct {
	// contains filtered or unexported fields
}

func NewTimeformatter

func NewTimeformatter(format string) *Timeformatter

func NewTimeformatterWithLocation

func NewTimeformatterWithLocation(format string, tz *time.Location) *Timeformatter

func (*Timeformatter) Epoch

func (tf *Timeformatter) Epoch(t time.Time) int64

func (*Timeformatter) Format

func (tf *Timeformatter) Format(t time.Time) string

func (*Timeformatter) IsEpoch

func (tf *Timeformatter) IsEpoch() bool

func (*Timeformatter) Parse

func (tf *Timeformatter) Parse(str string) (time.Time, error)

type Type

type Type byte
const (
	UNTYPED Type = 0
	BOOL    Type = 'b' // bool
	INT     Type = 'i' // int64
	UINT    Type = 'u' // uint64
	FLOAT   Type = 'f' // float64
	STRING  Type = 's' // string
	TIME    Type = 't' // time.Time
	BINARY  Type = 'B' // *BinaryType
)

func (Type) String

func (typ Type) String() string

type Value

type Value struct {
	// contains filtered or unexported fields
}

func NewNullValue

func NewNullValue(kind Type) *Value

func NewUntypedNullValue

func NewUntypedNullValue() *Value

func NewValue

func NewValue[T RawValue](data T) *Value
Example
var df = DefaultValueFormat()
var v, nv *Value

v = NewValue("string")
nv = NewNullValue(STRING)
fmt.Println(v.Format(df), nv.Format(df))

v = NewValue(int64(-123))
nv = NewNullValue(INT)
fmt.Println(v.Format(df), nv.Format(df))

v = NewValue(uint64(123))
nv = NewNullValue(UINT)
fmt.Println(v.Format(df), nv.Format(df))

v = NewValue(float64(3.141592))
nv = NewNullValue(FLOAT)
fmt.Println(v.Format(df), nv.Format(df))

v = NewValue(true)
nv = NewNullValue(BOOL)
fmt.Println(v.Format(df), nv.Format(df))

v = NewValue(time.Unix(1721865054, 0))
nv = NewNullValue(TIME)
fmt.Println(v.Format(df), nv.Format(df))

v = NewValue([]byte("binary"))
nv = NewNullValue(BINARY)
fmt.Println(v.Format(df), nv.Format(df))
Output:

string NULL
-123 NULL
123 NULL
3.141592 NULL
true NULL
2024-07-24T23:50:54Z NULL
BINARY(6B) NULL

func (*Value) BinaryValue

func (val *Value) BinaryValue() *Value

BinaryValue returns the binary value of the value. It will return a NewNullValue(BINARY) if it can not be converted to binary.

Example
v := NewValue("binary")
bv := v.BinaryValue()
fmt.Println(hex.EncodeToString(bv.Raw().([]byte)))
Output:

62696e617279

func (*Value) Bool

func (val *Value) Bool() (bool, bool)
Example
sv := NewValue("TRUE")
bv, ok := sv.Bool()
fmt.Println(bv, ok)
Output:

true true

func (*Value) BoolValue

func (v *Value) BoolValue() *Value

BoolValue returns the bool value of the value. It will return a NewNullValue(BOOL) if it can not be converted to bool.

Example
sv := NewValue("TRUE")
bv := sv.BoolValue()
fmt.Println(bv.Format(DefaultValueFormat()))
Output:

true

func (*Value) Bytes

func (val *Value) Bytes() ([]byte, bool)
Example
v := NewValue("binary")
bv, ok := v.Bytes()
fmt.Println(hex.EncodeToString(bv), ok)
Output:

62696e617279 true

func (*Value) Clone

func (v *Value) Clone() *Value

func (*Value) Eq

func (val *Value) Eq(other any) bool

Compare this value with other primitive type If other is nil, the result is not defined.

Example (Int_vs_float)
v1 := NewValue(int64(123))
v2 := NewValue(123.0)
fmt.Println(v1.Eq(v2), v1.Eq(123), v1.Eq(123.0), v1.Eq(123.4))
Output:

true true true false
Example (String_vs_float)
v1 := NewValue("123.456")
v2 := NewValue(123.456)
v3 := v2.StringValue()
fmt.Println(v1.Eq(v2), v1.Eq(v3), v2.Eq("123.456"))
Output:

true true true
Example (String_vs_string)
v1 := NewValue("string")
v2 := NewValue("string")
fmt.Println(v1.Eq(v2), v1.Eq("string"), v1.Eq("not"))
Output:

true true false

func (*Value) Float64

func (val *Value) Float64() (float64, bool)
Example
sv := NewValue("3.141592")
iv, ok := sv.Float64()
fmt.Println(iv, ok)
Output:

3.141592 true

func (*Value) FloatValue

func (val *Value) FloatValue() *Value

FloatValue returns the float value of the value. It will return a NewNullValue(FLOAT) if it can not be converted to float.

Example
sv := NewValue("3.141592")
iv := sv.FloatValue()
fmt.Println(iv.Format(DefaultValueFormat()))
Output:

3.141592

func (*Value) Format

func (val *Value) Format(vf ValueFormat) string

func (*Value) Gt

func (val *Value) Gt(other any) bool

Compare this value with other primitive type If other is nil, the result is not defined.

Example
v1 := NewValue(int64(123))
v2 := NewValue(234.0)
v3 := NewValue(345.4)
fmt.Println(v2.Gt(v1), v1.Gt(v3), v3.Gt(v2))
Output:

true false true
Example (Bool)
v1 := NewValue(true)
v2 := NewValue(false)
fmt.Println(v1.Gt(v2))
Output:

true

func (*Value) In

func (val *Value) In(other any) bool

func (*Value) Int64

func (val *Value) Int64() (int64, bool)
Example
sv := NewValue("123")
iv, ok := sv.Int64()
fmt.Println(iv, ok)
Output:

123 true

func (*Value) IntValue

func (val *Value) IntValue() *Value

IntValue returns the int value of the value. It will return a NewNullValue(INT) if it can not be converted to int.

Example
sv := NewValue("123")
iv := sv.IntValue()
fmt.Println(iv.Format(DefaultValueFormat()))
Output:

123

func (*Value) IsNotNull added in v0.0.3

func (v *Value) IsNotNull() bool

func (*Value) IsNull

func (v *Value) IsNull() bool

func (*Value) Lt

func (val *Value) Lt(other any) bool

Compare this value with other primitive type If other is nil, the result is not defined.

Example
v1 := NewValue(int64(123))
v2 := NewValue(234.0)
v3 := NewValue(345.4)
fmt.Println(v2.Lt(v1), v1.Lt(v2), v3.Lt(v2), v1.Lt(v3))
Output:

false true false true
Example (Bool)
v1 := NewValue(true)
v2 := NewValue(true)
fmt.Println(v2.Lt(v1))
Output:

true

func (*Value) Raw

func (v *Value) Raw() any

func (*Value) String

func (val *Value) String() (string, bool)
Example
v := NewValue(123.456)
sv, ok := v.String()
fmt.Println(sv, ok)
Output:

123.456 true

func (*Value) StringValue

func (f *Value) StringValue() *Value

StringValue returns the string value of the value. It will return a NewNullValue(STRING) if it can not be converted to string.

Example
v := NewValue(123.456)
sv := v.StringValue()
fmt.Println(sv.Format(DefaultValueFormat()))
Output:

123.456

func (*Value) Time

func (val *Value) Time() (time.Time, bool)
Example
v := NewValue(int64(1721865054))
tv, ok := v.Time()
fmt.Println(tv.Unix(), ok)
Output:

1721865054 true

func (*Value) TimeValue

func (val *Value) TimeValue() *Value

TimeValue returns the time value of the value. It will return a NewNullValue(TIME) if it can not be converted to time.

Example
v := NewValue(int64(1721865054))
tv := v.TimeValue()
fmt.Println(tv.Format(DefaultValueFormat()))
Output:

2024-07-24T23:50:54Z

func (*Value) Type

func (v *Value) Type() Type

func (*Value) Uint64

func (val *Value) Uint64() (uint64, bool)
Example
sv := NewValue("123")
iv, ok := sv.Uint64()
fmt.Println(iv, ok)
Output:

123 true

func (*Value) UintValue

func (val *Value) UintValue() *Value

UintValue returns the uint value of the value. It will return a NewNullValue(UINT) if it can not be converted to uint.

Example
sv := NewValue("123")
iv := sv.UintValue()
fmt.Println(iv.Format(DefaultValueFormat()))
Output:

123

type ValueFormat

type ValueFormat struct {
	Timeformat *Timeformatter
	Decimal    int
	NullString string
	NullInt    int
	NullFloat  float64
	NullTime   time.Time
	NullBool   bool
}
Example
tm := time.Unix(1721865054, 0)
vf := DefaultValueFormat()
vf.Timeformat = NewTimeformatterWithLocation("2006-01-02 15:04:05", time.UTC)
vf.Decimal = 2
fmt.Println(NewValue(tm).Format(vf), NewValue(3.141592).Format(vf))
Output:

2024-07-24 23:50:54 3.14
Example (Epoch)
tm := time.Unix(1721865054, 0)
vf := DefaultValueFormat()
vf.Timeformat = NewTimeformatter("s")
vf.Decimal = 4
fmt.Println(NewValue(tm).Format(vf), NewValue(3.141592).Format(vf))
Output:

1721865054 3.1416

func DefaultValueFormat

func DefaultValueFormat() ValueFormat
Example
tm := time.Unix(1721865054, 0)
vf := DefaultValueFormat()
fmt.Println(NewValue(tm).Format(vf), NewValue(3.141592).Format(vf))
Output:

2024-07-24T23:50:54Z 3.141592

func (ValueFormat) FormatTime

func (fo ValueFormat) FormatTime(tm time.Time) string

type Version

type Version struct {
	Major  int    `json:"major"`
	Minor  int    `json:"minor"`
	Patch  int    `json:"patch"`
	GitSHA string `json:"git"`
}

func GetVersion

func GetVersion() *Version

type Writer

type Writer struct {
	Format     string
	Timeformat string
	Timezone   string
	Decimal    int
	Compress   string
	Fields     []string

	ContentType     string
	ContentEncoding string
	// contains filtered or unexported fields
}

func NewWriter

func NewWriter(w io.Writer, cfg Config) (*Writer, error)

func (*Writer) Close

func (rw *Writer) Close() error

func (*Writer) Write

func (rw *Writer) Write(recs []Record) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL