Documentation ¶
Index ¶
- Constants
- Variables
- func BuildCompiler() string
- func BuildTimestamp() string
- func CanonicalTagKey(key string) string
- func CompressorNames() []string
- func DecoderNames() []string
- func DecompressorNames() []string
- func DisplayVersion() string
- func EncoderNames() []string
- func FlowNames() []string
- func GetTagInt64(t Tags, key string) int64
- func GetTagString(t Tags, key string) string
- func HttpHandleFunc(config string) http.HandlerFunc
- func InletNames() []string
- func LoadConfig(content string, cfg *PipelineConfig) error
- func NopCloser(w io.Writer) io.WriteCloser
- func OutletNames() []string
- func RegisterCompressor(reg *Compressor)
- func RegisterDecoder(reg *DecoderReg)
- func RegisterDecompressor(reg *Decompressor)
- func RegisterEncoder(reg *EncoderReg)
- func RegisterFlow(reg *FlowReg)
- func RegisterInlet(reg *InletReg)
- func RegisterOutlet(reg *OutletReg)
- func SetDefaultValueFormat(vf ValueFormat)
- func UnboxFields(fields []*Field) []any
- func UnregisterCompressor(name string)
- func UnregisterDecoder(name string)
- func UnregisterDecompressor(name string)
- func UnregisterEncoder(name string)
- func UnregisterFlow(name string)
- func VersionString() string
- type AND
- type BufferedFlow
- type Comparator
- type Compressor
- type Config
- func (c Config) GetBool(key string, defaultVal bool) bool
- func (c Config) GetConfig(key string, defaultVal Config) Config
- func (c Config) GetConfigSlice(key string, defaultVal []Config) []Config
- func (c Config) GetDuration(key string, defaultVal time.Duration) time.Duration
- func (c Config) GetFloat(key string, defaultVal float64) float64
- func (c Config) GetInt(key string, defaultVal int) int
- func (c Config) GetInt64(key string, defaultVal int64) int64
- func (c Config) GetIntSlice(key string, defaultVal []int) []int
- func (c Config) GetString(key string, defaultVal string) string
- func (c Config) GetStringSlice(key string, defaultVal []string) []string
- func (c Config) GetUint32(key string, defaultVal uint32) uint32
- func (c Config) GetValue(key string) *Value
- func (c Config) Set(key string, val any) Config
- func (c Config) Unset(key string) Config
- type Context
- func (ctx *Context) CircuitBreak()
- func (ctx *Context) Config() Config
- func (ctx *Context) Deadline() (deadline time.Time, ok bool)
- func (ctx *Context) Done() <-chan struct{}
- func (ctx *Context) Err() error
- func (ctx *Context) GetInject(id string) (InjectFunc, bool)
- func (ctx *Context) Inject(id string, cb InjectFunc)
- func (ctx *Context) Log(level slog.Level, msg string, keyvals ...interface{})
- func (ctx *Context) LogDebug(msg string, keyvals ...interface{})
- func (ctx *Context) LogEnabled(level slog.Level) bool
- func (ctx *Context) LogError(msg string, keyvals ...interface{})
- func (ctx *Context) LogInfo(msg string, keyvals ...interface{})
- func (ctx *Context) LogWarn(msg string, keyvals ...interface{})
- func (ctx *Context) PipelineName() string
- func (ctx *Context) SetContentEncoding(contentEncoding string)
- func (ctx *Context) SetContentLength(contentLength int)
- func (ctx *Context) SetContentType(contentType string)
- func (ctx *Context) Value(key interface{}) interface{}
- func (ctx *Context) WithConfig(conf Config) *Context
- func (ctx *Context) WithLogger(logger *slog.Logger) *Context
- func (ctx *Context) Writer() io.Writer
- type Decoder
- type DecoderConfig
- type DecoderReg
- type Decompressor
- type Encoder
- type EncoderConfig
- type EncoderReg
- type F
- type Field
- func (f *Field) BinaryField() *Field
- func (f *Field) BoolField() *Field
- func (v *Field) Clone() *Field
- func (f *Field) Convert(to Type) *Field
- func (v *Field) Copy(newName string) *Field
- func (f *Field) FloatField() *Field
- func (f *Field) Func(fn func(any) bool) bool
- func (f *Field) IntField() *Field
- func (f *Field) IsNull() bool
- func (f *Field) String() string
- func (f *Field) StringField() *Field
- func (f *Field) TimeField() *Field
- func (f *Field) Type() Type
- func (f *Field) UintField() *Field
- type Flow
- type FlowConfig
- type FlowFuncWrap
- type FlowFuncWrapOption
- type FlowHandler
- type FlowNextFunc
- type FlowReg
- type Flusher
- type InjectFunc
- type Inlet
- type InletConfig
- type InletFuncOption
- type InletFuncWrap
- type InletHandler
- func (in *InletHandler) AddFlow(flow *FlowHandler)
- func (in *InletHandler) Run() error
- func (in *InletHandler) Start()
- func (in *InletHandler) Stop()
- func (in *InletHandler) Via(flow *FlowHandler) *FlowHandler
- func (in *InletHandler) Walk(walker func(inletName string, kind string, step string, handler any))
- type InletNextFunc
- type InletReg
- type Key
- type OR
- type OpenCloser
- type Option
- func WithConfig(conf string) Option
- func WithConfigFile(path string) Option
- func WithLogWriter(w io.Writer) Option
- func WithLogger(logger *slog.Logger) Option
- func WithName(name string) Option
- func WithSetContentEncodingFunc(fn SetContentEncodingCallback) Option
- func WithSetContentLengthFunc(fn SetContentLengthCallback) Option
- func WithSetContentTypeFunc(fn SetContentTypeCallback) Option
- func WithVerbose(flag bool) Option
- func WithWriter(w io.Writer) Option
- type Outlet
- type OutletConfig
- type OutletFuncWrap
- type OutletHandler
- type OutletReg
- type PeriodicInlet
- type Pipeline
- func (p *Pipeline) AddFlow(name string, flow Flow) (*FlowHandler, error)
- func (p *Pipeline) AddInlet(name string, inlet Inlet) (*InletHandler, error)
- func (p *Pipeline) AddOutlet(name string, outlet Outlet) (*OutletHandler, error)
- func (p *Pipeline) Build() (returnErr error)
- func (p *Pipeline) Context() *Context
- func (p *Pipeline) Run() (returnErr error)
- func (p *Pipeline) Start()
- func (p *Pipeline) Stop() error
- func (p *Pipeline) Walk(walker func(pipelineName string, kind string, step string, handler any))
- type PipelineConfig
- type Predicate
- type RawValue
- type Reader
- type ReaderOption
- type Record
- type Row
- type SetContentEncodingCallback
- type SetContentLengthCallback
- type SetContentTypeCallback
- type Table
- func (tb *Table[T]) AddColumn(name string, t Type)
- func (tb *Table[T]) AddColumns(names []string, types []Type)
- func (tb *Table[T]) Clear()
- func (tb *Table[T]) Columns() []string
- func (tb *Table[T]) Compact() *Table[T]
- func (tb *Table[T]) Filter(filter Predicate) *Table[T]
- func (tb *Table[T]) Get(k T) *Row[T]
- func (tb *Table[T]) Keys() []T
- func (tb *Table[T]) Len() int
- func (tb *Table[T]) Rows() [][]*Field
- func (tb *Table[T]) Select(fields []string) (*Table[T], error)
- func (tb *Table[T]) Series(colName string) []*Field
- func (tb *Table[T]) SeriesByIdx(colIdx int) []*Field
- func (tb *Table[T]) SeriesFields(colName string) []*Field
- func (tb *Table[T]) SeriesFieldsByIdx(colIdx int) []*Field
- func (tb *Table[T]) Set(k T, fields ...*Field)
- func (tb *Table[T]) Split(filter Predicate) (*Table[T], *Table[T])
- func (tb *Table[T]) Types() []Type
- type Tags
- func (t Tags) Clear()
- func (t Tags) Clone() Tags
- func (t Tags) Del(key string)
- func (t Tags) Get(key string) *Value
- func (t Tags) IsEmpty() bool
- func (t Tags) IsNotEmpty() bool
- func (t Tags) Len() int
- func (t Tags) Merge(other Tags)
- func (t Tags) MergeWithPrefix(m Tags, prefix string)
- func (t Tags) MergeWithPrefixSuffix(m Tags, prefix, suffix string)
- func (t Tags) MergeWithSuffix(m Tags, suffix string)
- func (t Tags) Names() []string
- func (t Tags) Set(key string, value *Value)
- type Timeformatter
- type Type
- type Value
- func (val *Value) BinaryValue() *Value
- func (val *Value) Bool() (bool, bool)
- func (v *Value) BoolValue() *Value
- func (val *Value) Bytes() ([]byte, bool)
- func (v *Value) Clone() *Value
- func (val *Value) Eq(other any) bool
- func (val *Value) Float64() (float64, bool)
- func (val *Value) FloatValue() *Value
- func (val *Value) Format(vf ValueFormat) string
- func (val *Value) Gt(other any) bool
- func (val *Value) In(other any) bool
- func (val *Value) Int64() (int64, bool)
- func (val *Value) IntValue() *Value
- func (v *Value) IsNotNull() bool
- func (v *Value) IsNull() bool
- func (val *Value) Lt(other any) bool
- func (v *Value) Raw() any
- func (val *Value) String() (string, bool)
- func (f *Value) StringValue() *Value
- func (val *Value) Time() (time.Time, bool)
- func (val *Value) TimeValue() *Value
- func (v *Value) Type() Type
- func (val *Value) Uint64() (uint64, bool)
- func (val *Value) UintValue() *Value
- type ValueFormat
- type Version
- type Writer
Examples ¶
- AND
- Context.Inject
- DefaultValueFormat
- New
- New (Multi_inlets)
- NewReader (WithTypes)
- NewValue
- OR
- Pipeline.Walk
- Tags.Clear
- Tags.Clone
- Tags.Clone (Empty)
- Tags.Del
- Tags.Get
- Tags.IsEmpty
- Tags.IsNotEmpty
- Tags.Len
- Tags.Merge
- Tags.MergeWithPrefix
- Tags.MergeWithPrefixSuffix
- Tags.MergeWithSuffix
- Tags.Names
- Tags.Names (Empty)
- Tags.Set
- Value.BinaryValue
- Value.Bool
- Value.BoolValue
- Value.Bytes
- Value.Eq (Int_vs_float)
- Value.Eq (String_vs_float)
- Value.Eq (String_vs_string)
- Value.Float64
- Value.FloatValue
- Value.Gt
- Value.Gt (Bool)
- Value.Int64
- Value.IntValue
- Value.Lt
- Value.Lt (Bool)
- Value.String
- Value.StringValue
- Value.Time
- Value.TimeValue
- Value.Uint64
- Value.UintValue
- ValueFormat
- ValueFormat (Epoch)
Constants ¶
const TAG_INLET = "_in"
const TAG_TIMESTAMP = "_ts"
Variables ¶
var DefaultConfigString string
var DefaultTimeformatter = &Timeformatter{format: time.RFC3339, loc: time.Local}
var Now = time.Now
Now is a function that returns the current time It is used to generate the timestamp for each record The default value is time.Now Set this to a fixed time in the purpose of testing
Functions ¶
func BuildCompiler ¶
func BuildCompiler() string
func BuildTimestamp ¶
func BuildTimestamp() string
func CanonicalTagKey ¶ added in v0.0.2
func CompressorNames ¶
func CompressorNames() []string
func DecoderNames ¶
func DecoderNames() []string
func DecompressorNames ¶
func DecompressorNames() []string
func DisplayVersion ¶
func DisplayVersion() string
func EncoderNames ¶
func EncoderNames() []string
func GetTagInt64 ¶ added in v0.0.2
func GetTagString ¶ added in v0.0.2
func HttpHandleFunc ¶
func HttpHandleFunc(config string) http.HandlerFunc
HttpHandleFunc is a convenience function to create a http.HandlerFunc from a pipeline configuration
func InletNames ¶
func InletNames() []string
func LoadConfig ¶
func LoadConfig(content string, cfg *PipelineConfig) error
func OutletNames ¶
func OutletNames() []string
func RegisterCompressor ¶
func RegisterCompressor(reg *Compressor)
func RegisterDecoder ¶
func RegisterDecoder(reg *DecoderReg)
func RegisterDecompressor ¶
func RegisterDecompressor(reg *Decompressor)
func RegisterEncoder ¶
func RegisterEncoder(reg *EncoderReg)
func RegisterFlow ¶
func RegisterFlow(reg *FlowReg)
func RegisterInlet ¶
func RegisterInlet(reg *InletReg)
func RegisterOutlet ¶
func RegisterOutlet(reg *OutletReg)
func SetDefaultValueFormat ¶
func SetDefaultValueFormat(vf ValueFormat)
func UnboxFields ¶
func UnregisterCompressor ¶
func UnregisterCompressor(name string)
func UnregisterDecoder ¶
func UnregisterDecoder(name string)
func UnregisterDecompressor ¶
func UnregisterDecompressor(name string)
func UnregisterEncoder ¶
func UnregisterEncoder(name string)
func UnregisterFlow ¶
func UnregisterFlow(name string)
func VersionString ¶
func VersionString() string
Types ¶
type AND ¶
type AND struct {
// contains filtered or unexported fields
}
Example ¶
package main import ( "fmt" "github.com/OutOfBedlam/tine/engine" ) func main() { // AND predicate // name == "John" AND age > 18 p := engine.PredicateAND( engine.F{ColName: "name", Comparator: engine.EQ, Comparando: "John"}, engine.F{ColName: "age", Comparator: engine.GT, Comparando: 18}, ) // Record records := []engine.Record{ engine.NewRecord( engine.NewField("age", int64(19)), engine.NewField("name", "John"), ), engine.NewRecord( engine.NewField("age", int64(17)), engine.NewField("name", "John"), ), } // Apply AND predicate for _, r := range records { fmt.Println(p.Apply(r)) } }
Output: true false
type BufferedFlow ¶ added in v0.0.2
type BufferedFlow interface { Flow Flush(FlowNextFunc) }
type Comparator ¶
type Comparator string
const ( EQ Comparator = "==" NEQ Comparator = "!=" GT Comparator = ">" GTE Comparator = ">=" LT Comparator = "<" LTE Comparator = "<=" IN Comparator = "in" NOT_IN Comparator = "not in" CompFunc Comparator = "func" )
type Compressor ¶
type Compressor struct { Name string Factory func(io.Writer) io.WriteCloser ContentEncoding string }
func GetCompressor ¶
func GetCompressor(name string) *Compressor
type Config ¶
func (Config) GetConfigSlice ¶ added in v0.0.2
func (Config) GetDuration ¶
func (Config) GetIntSlice ¶ added in v0.0.2
func (Config) GetStringSlice ¶ added in v0.0.2
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
func (*Context) CircuitBreak ¶
func (ctx *Context) CircuitBreak()
func (*Context) GetInject ¶ added in v0.0.5
func (ctx *Context) GetInject(id string) (InjectFunc, bool)
func (*Context) Inject ¶ added in v0.0.5
func (ctx *Context) Inject(id string, cb InjectFunc)
Example ¶
package main import ( "os" "time" "github.com/OutOfBedlam/tine/engine" _ "github.com/OutOfBedlam/tine/plugins/args" _ "github.com/OutOfBedlam/tine/plugins/base" _ "github.com/OutOfBedlam/tine/plugins/exec" _ "github.com/OutOfBedlam/tine/plugins/psutil" ) func main() { // This example demonstrates how to use the exec inlet to run a command and dsl := ` [[inlets.args]] [[flows.select]] includes = ["**"] [[flows.inject]] id = "here" [[outlets.file]] path = "-" format = "json" ` // Make the output timestamp deterministic, so we can compare it // This line is required only for testing engine.Now = func() time.Time { return time.Unix(1721954797, 0) } // Build pipeline pipeline, err := engine.New(engine.WithConfig(dsl)) if err != nil { panic(err) } pipeline.Context().Inject("here", func(r []engine.Record) ([]engine.Record, error) { for i, rec := range r { r[i] = rec.AppendOrReplace(engine.NewField("msg", "hello world - here updated")) } return r, nil }) // Simulate the command line arguments os.Args = []string{"command", "command-arg", "--", "msg=hello world"} // Run the pipeline if err := pipeline.Run(); err != nil { panic(err) } }
Output: {"_in":"args","_ts":1721954797,"msg":"hello world - here updated"}
func (*Context) PipelineName ¶
func (*Context) SetContentEncoding ¶
func (*Context) SetContentLength ¶
func (*Context) SetContentType ¶
func (*Context) WithConfig ¶
type DecoderConfig ¶
type DecoderConfig struct { Reader io.Reader FormatOption ValueFormat Fields []string Types []Type }
type DecoderReg ¶
type DecoderReg struct { Name string Factory func(DecoderConfig) Decoder }
func GetDecoder ¶
func GetDecoder(name string) *DecoderReg
type Decompressor ¶
type Decompressor struct { Name string Factory func(io.Reader) io.ReadCloser }
func GetDecompressor ¶
func GetDecompressor(name string) *Decompressor
type EncoderConfig ¶
type EncoderConfig struct { Writer io.Writer Fields []string FormatOption ValueFormat }
type EncoderReg ¶
type EncoderReg struct { Name string Factory func(EncoderConfig) Encoder ContentType string }
func GetEncoder ¶
func GetEncoder(name string) *EncoderReg
type F ¶
type F struct { ColName string Comparator Comparator Comparando any }
type Field ¶
type Field struct { Name string `json:"name"` Value *Value `json:"value"` Tags Tags `json:"tags,omitempty"` }
Field is Value with name and Tags
func NewFieldWithValue ¶ added in v0.0.2
if name is empty or value is nil, return nil
func (*Field) BinaryField ¶
func (*Field) FloatField ¶
func (*Field) StringField ¶
type Flow ¶
type Flow interface { OpenCloser Process([]Record, FlowNextFunc) Parallelism() int }
func FanOutFlow ¶
func FlowWithFunc ¶
func FlowWithFunc(fn func([]Record) ([]Record, error), opts ...FlowFuncWrapOption) Flow
type FlowConfig ¶
type FlowFuncWrap ¶
type FlowFuncWrap struct {
// contains filtered or unexported fields
}
func (*FlowFuncWrap) Close ¶
func (fw *FlowFuncWrap) Close() error
func (*FlowFuncWrap) Open ¶
func (fw *FlowFuncWrap) Open() error
func (*FlowFuncWrap) Parallelism ¶
func (fw *FlowFuncWrap) Parallelism() int
func (*FlowFuncWrap) Process ¶
func (fw *FlowFuncWrap) Process(r []Record, cb FlowNextFunc)
type FlowFuncWrapOption ¶
type FlowFuncWrapOption func(*FlowFuncWrap)
func WithFlowFuncParallelism ¶
func WithFlowFuncParallelism(parallelism int) FlowFuncWrapOption
type FlowHandler ¶
type FlowHandler struct {
// contains filtered or unexported fields
}
func NewFlowHandler ¶
func NewFlowHandler(ctx *Context, name string, flow Flow) *FlowHandler
func (*FlowHandler) Start ¶
func (fh *FlowHandler) Start() error
func (*FlowHandler) Stop ¶
func (fh *FlowHandler) Stop() error
func (*FlowHandler) Via ¶
func (fh *FlowHandler) Via(next *FlowHandler) *FlowHandler
type FlowNextFunc ¶ added in v0.0.3
type FlowReg ¶
func GetFlowRegistry ¶
type InjectFunc ¶ added in v0.0.5
type Inlet ¶
type Inlet interface { OpenCloser Process(InletNextFunc) }
func InletWithFunc ¶ added in v0.0.3
func InletWithFunc(fn func() ([]Record, error), opts ...InletFuncOption) Inlet
type InletConfig ¶
type InletConfig struct { Plugin string Params Config Flows []FlowConfig }
type InletFuncOption ¶ added in v0.0.3
type InletFuncOption func(*InletFuncWrap)
func WithInterval ¶
func WithInterval(interval time.Duration) InletFuncOption
func WithRunCountLimit ¶
func WithRunCountLimit(limit int64) InletFuncOption
type InletFuncWrap ¶ added in v0.0.3
type InletFuncWrap struct {
// contains filtered or unexported fields
}
func (*InletFuncWrap) Close ¶ added in v0.0.3
func (in *InletFuncWrap) Close() error
func (*InletFuncWrap) Interval ¶ added in v0.0.3
func (in *InletFuncWrap) Interval() time.Duration
func (*InletFuncWrap) Open ¶ added in v0.0.3
func (in *InletFuncWrap) Open() error
func (*InletFuncWrap) Process ¶ added in v0.0.3
func (in *InletFuncWrap) Process(next InletNextFunc)
type InletHandler ¶
type InletHandler struct {
// contains filtered or unexported fields
}
func NewInletHandler ¶
func (*InletHandler) AddFlow ¶
func (in *InletHandler) AddFlow(flow *FlowHandler)
AddFlow adds a sub-flow to the inlet handler
func (*InletHandler) Run ¶
func (in *InletHandler) Run() error
func (*InletHandler) Start ¶
func (in *InletHandler) Start()
func (*InletHandler) Stop ¶
func (in *InletHandler) Stop()
func (*InletHandler) Via ¶
func (in *InletHandler) Via(flow *FlowHandler) *FlowHandler
Via connects the inlet handler to a flow handler
type InletNextFunc ¶ added in v0.0.3
type InletReg ¶
func GetInletRegistry ¶
type OR ¶
type OR struct {
// contains filtered or unexported fields
}
Example ¶
package main import ( "fmt" "github.com/OutOfBedlam/tine/engine" ) func main() { // OR predicate // name == "John" OR age < 18 p := engine.PredicateOR( engine.F{ColName: "name", Comparator: engine.EQ, Comparando: "John"}, engine.F{ColName: "age", Comparator: engine.LT, Comparando: 18}, ) // Record records := []engine.Record{ engine.NewRecord( engine.NewField("age", int64(19)), engine.NewField("name", "John"), ), engine.NewRecord( engine.NewField("age", int64(20)), engine.NewField("name", "Jane"), ), } // Apply OR predicate for _, r := range records { fmt.Println(p.Apply(r)) } }
Output: true false
type OpenCloser ¶
type Option ¶
func WithConfig ¶
WithConfig loads a TOML configuration string into a PipelineConfig struct
func WithConfigFile ¶
WithConfigFile loads a TOML configuration file into a PipelineConfig struct
func WithLogWriter ¶ added in v0.0.5
func WithLogger ¶
WithLogger sets the logger for the pipeline
func WithSetContentEncodingFunc ¶
func WithSetContentEncodingFunc(fn SetContentEncodingCallback) Option
WithSetContentEncodingFunc sets the callback function to set the content encoding
func WithSetContentLengthFunc ¶
func WithSetContentLengthFunc(fn SetContentLengthCallback) Option
WithSetContentLengthFunc sets the callback function to set the content length
func WithSetContentTypeFunc ¶
func WithSetContentTypeFunc(fn SetContentTypeCallback) Option
WithSetContentTypeFunc sets the callback function to set the content type
func WithVerbose ¶ added in v0.0.6
func WithWriter ¶
WithDefaults sets the default output writer for the pipeline
type OutletConfig ¶
type OutletFuncWrap ¶
type OutletFuncWrap struct {
// contains filtered or unexported fields
}
func (*OutletFuncWrap) Close ¶
func (out *OutletFuncWrap) Close() error
func (*OutletFuncWrap) Handle ¶
func (out *OutletFuncWrap) Handle(r []Record) error
func (*OutletFuncWrap) Open ¶
func (out *OutletFuncWrap) Open() error
type OutletHandler ¶
type OutletHandler struct {
// contains filtered or unexported fields
}
func NewOutletHandler ¶
func NewOutletHandler(ctx *Context, name string, outlet Outlet) (*OutletHandler, error)
func (*OutletHandler) Sink ¶
func (out *OutletHandler) Sink() chan<- []Record
func (*OutletHandler) Start ¶
func (out *OutletHandler) Start() error
func (*OutletHandler) Stop ¶
func (out *OutletHandler) Stop()
type OutletReg ¶
func GetOutletRegistry ¶
type PeriodicInlet ¶ added in v0.0.3
type Pipeline ¶
type Pipeline struct { PipelineConfig // contains filtered or unexported fields }
func New ¶
New creates a new pipeline with the given options
Example ¶
package main import ( "os" "time" "github.com/OutOfBedlam/tine/engine" _ "github.com/OutOfBedlam/tine/plugins/args" _ "github.com/OutOfBedlam/tine/plugins/base" _ "github.com/OutOfBedlam/tine/plugins/exec" _ "github.com/OutOfBedlam/tine/plugins/psutil" ) func main() { // This example demonstrates how to use the exec inlet to run a command and dsl := ` [[inlets.args]] [[flows.update]] set = [ { field = "msg", name_format = "pre_%s_suf" }, ] [[flows.select]] includes = ["**"] [[outlets.file]] path = "-" format = "json" ` // Make the output timestamp deterministic, so we can compare it // This line is required only for testing engine.Now = func() time.Time { return time.Unix(1721954797, 0) } // Build pipeline pipeline, err := engine.New(engine.WithConfig(dsl)) if err != nil { panic(err) } // Simulate the command line arguments os.Args = []string{"command", "command-arg", "--", "msg=hello world"} // Run the pipeline if err := pipeline.Run(); err != nil { panic(err) } }
Output: {"_in":"args","_ts":1721954797,"pre_msg_suf":"hello world"}
Example (Multi_inlets) ¶
package main import ( "time" "github.com/OutOfBedlam/tine/engine" _ "github.com/OutOfBedlam/tine/plugins/args" _ "github.com/OutOfBedlam/tine/plugins/base" _ "github.com/OutOfBedlam/tine/plugins/exec" _ "github.com/OutOfBedlam/tine/plugins/psutil" ) func main() { // This example demonstrates how to use the merge flow. dsl := ` [[inlets.file]] data = [ "a,1", ] format = "csv" [[inlets.exec]] commands = ["echo", "hello world"] count = 1 trim_space = true ignore_error = true [[flows.merge]] wait_limit = "1s" [[outlets.file]] path = "-" format = "json" ` // Make the output time deterministic. so we can compare it. // This line is not needed in production code. engine.Now = func() time.Time { return time.Unix(1721954797, 0) } // Create a new engine. pipeline, err := engine.New(engine.WithConfig(dsl)) if err != nil { panic(err) } // Run the engine. if err := pipeline.Run(); err != nil { panic(err) } }
Output: {"_ts":1721954797,"exec_stdout":"hello world","file_0":"a","file_1":"1"}
func (*Pipeline) AddFlow ¶
func (p *Pipeline) AddFlow(name string, flow Flow) (*FlowHandler, error)
AddFlow adds a flow to the pipeline
func (*Pipeline) AddInlet ¶
func (p *Pipeline) AddInlet(name string, inlet Inlet) (*InletHandler, error)
AddInlet adds an inlet to the pipeline
func (*Pipeline) AddOutlet ¶
func (p *Pipeline) AddOutlet(name string, outlet Outlet) (*OutletHandler, error)
AddOutlet adds an outlet to the pipeline
func (*Pipeline) Run ¶
Run the pipeline, this will start all inlets, outlets and flows and wait until the pipeline is stopped
func (*Pipeline) Start ¶
func (p *Pipeline) Start()
Start the pipeline, this will start all inlets, outlets and flows and returns immediately without waiting for the pipeline to stop
func (*Pipeline) Stop ¶
Stop the pipeline, this will stop all inlets, outlets and flows. Start() or Run() should be called before calling Stop()
func (*Pipeline) Walk ¶ added in v0.0.3
Example ¶
package main import ( "fmt" "time" "github.com/OutOfBedlam/tine/engine" _ "github.com/OutOfBedlam/tine/plugins/args" _ "github.com/OutOfBedlam/tine/plugins/base" _ "github.com/OutOfBedlam/tine/plugins/exec" _ "github.com/OutOfBedlam/tine/plugins/psutil" ) func main() { // Make the output timestamp deterministic, so we can compare it // This line is required only for testing engine.Now = func() time.Time { return time.Unix(1721954797, 0) } pipeline, err := engine.New( engine.WithConfigFile("testdata/args-file.toml"), engine.WithName("my-pipeline"), ) if err != nil { panic(err) } err = pipeline.Build() if err != nil { panic(err) } pipeline.Walk(func(pipelineName, kind, name string, step any) { // This will print the name of the step fmt.Println(pipelineName, kind, name) }) }
Output: my-pipeline inlets args my-pipeline inlets.args.flows dump my-pipeline flows fan-in my-pipeline flows select my-pipeline flows inject my-pipeline flows fan-out my-pipeline outlets file
type PipelineConfig ¶
type PipelineConfig struct { Name string Log util.LogConfig Defaults Config Inlets []InletConfig Outlets []OutletConfig Flows []FlowConfig }
type Reader ¶
type Reader struct { Format string Timeformat string Timezone string Compress string Fields []string Types []string // contains filtered or unexported fields }
func NewReader ¶
Example (WithTypes) ¶
package main import ( "time" "github.com/OutOfBedlam/tine/engine" _ "github.com/OutOfBedlam/tine/plugins/args" _ "github.com/OutOfBedlam/tine/plugins/base" _ "github.com/OutOfBedlam/tine/plugins/exec" _ "github.com/OutOfBedlam/tine/plugins/psutil" ) func main() { dsl := ` [[inlets.file]] data = [ "a,1,1.2345", "b,2,2.3456", "c,3,3.4567", ] format = "csv" fields = ["area", "ival", "fval"] types = ["string", "int", "float"] [[flows.select]] includes = ["#*", "area", "ival", "fval"] [[outlets.file]] path = "-" format = "json" ` // Make the output time deterministic. so we can compare it. // This line is not needed in production code. engine.Now = func() time.Time { return time.Unix(1721954797, 0) } // Create a new engine. pipeline, err := engine.New(engine.WithConfig(dsl)) if err != nil { panic(err) } // Run the engine. if err := pipeline.Run(); err != nil { panic(err) } }
Output: {"_in":"file","_ts":1721954797,"area":"a","fval":1.2345,"ival":1} {"_in":"file","_ts":1721954797,"area":"b","fval":2.3456,"ival":2} {"_in":"file","_ts":1721954797,"area":"c","fval":3.4567,"ival":3}
type ReaderOption ¶
type ReaderOption func(*Reader)
type Record ¶
type Record interface { // Field retursn field by name Field(name string) *Field // Fields returns fields in the order of names // if names is empty, return all fields Fields(names ...string) []*Field // FieldAt returns field by index FieldAt(index int) *Field // FieldsAt return fields in the order of indexes FieldsAt(indexes ...int) []*Field // Empty returns true if the record has no fields Empty() bool // Names returns all field names Names() []string // Append returns a new record with fields appended Append(...*Field) Record // AppendOrReplace returns a new record with fields appended or replaced if the field name already exists AppendOrReplace(...*Field) Record Tags() Tags }
type SetContentEncodingCallback ¶
type SetContentEncodingCallback func(contentEncoding string)
type SetContentLengthCallback ¶
type SetContentLengthCallback func(contentLength int)
type SetContentTypeCallback ¶
type SetContentTypeCallback func(contentType string)
type Table ¶
type Table[T Key] struct { // contains filtered or unexported fields }
func (*Table[T]) AddColumns ¶
AddColumns adds columns to the table
names and types should be same size, otherwise panic
func (*Table[T]) Compact ¶
Compact compacts records by predicate it removes records that do not match the predicate if predicate is nil, it does nothing. and returns the table itself. so that chain calls are possible.
tb := tb.Filter(...).Compact().Select(...)
func (*Table[T]) SeriesByIdx ¶
SeriesByIdx returns a series of a column by index
func (*Table[T]) SeriesFields ¶
SeriesFields returns a series of a column by name
func (*Table[T]) SeriesFieldsByIdx ¶
SeriesFieldsByIdx returns a series of a column by index
type Tags ¶ added in v0.0.2
func (Tags) Clear ¶ added in v0.0.2
func (t Tags) Clear()
Example ¶
package main import ( "fmt" "github.com/OutOfBedlam/tine/engine" ) func main() { tags := engine.Tags{ "key1": engine.NewValue("value1"), "key2": engine.NewValue("value2"), } tags.Clear() fmt.Println(tags) }
Output: map[]
func (Tags) Clone ¶ added in v0.0.2
Example ¶
package main import ( "fmt" "github.com/OutOfBedlam/tine/engine" ) func main() { tags := engine.Tags{ "key1": engine.NewValue("value1"), "key2": engine.NewValue("value2"), } tags2 := tags.Clone() str, _ := tags2.Get("key1").String() fmt.Println(str) }
Output: value1
Example (Empty) ¶
package main import ( "fmt" "github.com/OutOfBedlam/tine/engine" ) func main() { tags := engine.Tags{} tags2 := tags.Clone() fmt.Println(tags2) }
Output: map[]
func (Tags) Del ¶ added in v0.0.2
Example ¶
package main import ( "fmt" "github.com/OutOfBedlam/tine/engine" ) func main() { tags := engine.Tags{ "key1": engine.NewValue("value1"), "key2": engine.NewValue("value2"), } tags.Del("key1") fmt.Println(tags.Get("key1")) }
Output: <nil>
func (Tags) Get ¶ added in v0.0.2
Example ¶
package main import ( "fmt" "github.com/OutOfBedlam/tine/engine" ) func main() { tags := engine.Tags{ "key1": engine.NewValue("value1"), "key2": engine.NewValue("value2"), } str, _ := tags.Get("key1").String() fmt.Println(str) fmt.Println(tags.Get("key3")) }
Output: value1 <nil>
func (Tags) IsEmpty ¶ added in v0.0.2
Example ¶
package main import ( "fmt" "github.com/OutOfBedlam/tine/engine" ) func main() { tags := engine.Tags{} fmt.Println(tags.IsEmpty()) }
Output: true
func (Tags) IsNotEmpty ¶ added in v0.0.2
Example ¶
package main import ( "fmt" "github.com/OutOfBedlam/tine/engine" ) func main() { tags := engine.Tags{ "key1": engine.NewValue("value1"), "key2": engine.NewValue("value2"), } fmt.Println(tags.IsNotEmpty()) }
Output: true
func (Tags) Len ¶ added in v0.0.2
Example ¶
package main import ( "fmt" "github.com/OutOfBedlam/tine/engine" ) func main() { tags := engine.Tags{ "key1": engine.NewValue("value1"), "key2": engine.NewValue("value2"), } fmt.Println(tags.Len()) }
Output: 2
func (Tags) Merge ¶ added in v0.0.2
Example ¶
package main import ( "fmt" "github.com/OutOfBedlam/tine/engine" ) func main() { tags := engine.Tags{ "key1": engine.NewValue("value1"), "key2": engine.NewValue("value2"), } tags2 := engine.Tags{ "key3": engine.NewValue("value3"), "key4": engine.NewValue("value4"), } tags.Merge(tags2) str, _ := tags.Get("key3").String() fmt.Println(str) }
Output: value3
func (Tags) MergeWithPrefix ¶ added in v0.0.2
Example ¶
package main import ( "fmt" "github.com/OutOfBedlam/tine/engine" ) func main() { tags := engine.Tags{ "key1": engine.NewValue("value1"), "key2": engine.NewValue("value2"), } tags2 := engine.Tags{ "key3": engine.NewValue("value3"), "key4": engine.NewValue("value4"), } tags.MergeWithPrefix(tags2, "prefix.") str, _ := tags.Get("prefix.key3").String() fmt.Println(str) }
Output: value3
func (Tags) MergeWithPrefixSuffix ¶ added in v0.0.2
Example ¶
package main import ( "fmt" "github.com/OutOfBedlam/tine/engine" ) func main() { tags := engine.Tags{ "key1": engine.NewValue("value1"), "key2": engine.NewValue("value2"), } tags2 := engine.Tags{ "key3": engine.NewValue("value3"), "key4": engine.NewValue("value4"), } tags.MergeWithPrefixSuffix(tags2, "prefix.", ".suffix") str, _ := tags.Get("prefix.key3.suffix").String() fmt.Println(str) }
Output: value3
func (Tags) MergeWithSuffix ¶ added in v0.0.2
Example ¶
package main import ( "fmt" "github.com/OutOfBedlam/tine/engine" ) func main() { tags := engine.Tags{ "key1": engine.NewValue("value1"), "key2": engine.NewValue("value2"), } tags2 := engine.Tags{ "key3": engine.NewValue("value3"), "key4": engine.NewValue("value4"), } tags.MergeWithSuffix(tags2, ".suffix") str, _ := tags.Get("key3.suffix").String() fmt.Println(str) }
Output: value3
func (Tags) Names ¶ added in v0.0.2
Example ¶
package main import ( "fmt" "slices" "github.com/OutOfBedlam/tine/engine" ) func main() { tags := engine.Tags{ "key1": engine.NewValue("value1"), "key2": engine.NewValue("value2"), } names := tags.Names() slices.Sort(names) fmt.Println(names) }
Output: [key1 key2]
Example (Empty) ¶
package main import ( "fmt" "github.com/OutOfBedlam/tine/engine" ) func main() { tags := engine.Tags{} names := tags.Names() fmt.Println(names) }
Output: []
func (Tags) Set ¶ added in v0.0.2
Example ¶
package main import ( "fmt" "github.com/OutOfBedlam/tine/engine" ) func main() { tags := engine.Tags{ "key1": engine.NewValue("value1"), "key2": engine.NewValue("value2"), } tags.Set("key1", engine.NewValue("value3")) str, _ := tags.Get("key1").String() fmt.Println(str) }
Output: value3
type Timeformatter ¶
type Timeformatter struct {
// contains filtered or unexported fields
}
func NewTimeformatter ¶
func NewTimeformatter(format string) *Timeformatter
func NewTimeformatterWithLocation ¶
func NewTimeformatterWithLocation(format string, tz *time.Location) *Timeformatter
func (*Timeformatter) IsEpoch ¶
func (tf *Timeformatter) IsEpoch() bool
type Value ¶
type Value struct {
// contains filtered or unexported fields
}
func NewNullValue ¶
func NewUntypedNullValue ¶
func NewUntypedNullValue() *Value
func NewValue ¶
Example ¶
var df = DefaultValueFormat() var v, nv *Value v = NewValue("string") nv = NewNullValue(STRING) fmt.Println(v.Format(df), nv.Format(df)) v = NewValue(int64(-123)) nv = NewNullValue(INT) fmt.Println(v.Format(df), nv.Format(df)) v = NewValue(uint64(123)) nv = NewNullValue(UINT) fmt.Println(v.Format(df), nv.Format(df)) v = NewValue(float64(3.141592)) nv = NewNullValue(FLOAT) fmt.Println(v.Format(df), nv.Format(df)) v = NewValue(true) nv = NewNullValue(BOOL) fmt.Println(v.Format(df), nv.Format(df)) v = NewValue(time.Unix(1721865054, 0)) nv = NewNullValue(TIME) fmt.Println(v.Format(df), nv.Format(df)) v = NewValue([]byte("binary")) nv = NewNullValue(BINARY) fmt.Println(v.Format(df), nv.Format(df))
Output: string NULL -123 NULL 123 NULL 3.141592 NULL true NULL 2024-07-24T23:50:54Z NULL BINARY(6B) NULL
func (*Value) BinaryValue ¶
BinaryValue returns the binary value of the value. It will return a NewNullValue(BINARY) if it can not be converted to binary.
Example ¶
v := NewValue("binary") bv := v.BinaryValue() fmt.Println(hex.EncodeToString(bv.Raw().([]byte)))
Output: 62696e617279
func (*Value) Bool ¶
Example ¶
sv := NewValue("TRUE") bv, ok := sv.Bool() fmt.Println(bv, ok)
Output: true true
func (*Value) BoolValue ¶
BoolValue returns the bool value of the value. It will return a NewNullValue(BOOL) if it can not be converted to bool.
Example ¶
sv := NewValue("TRUE") bv := sv.BoolValue() fmt.Println(bv.Format(DefaultValueFormat()))
Output: true
func (*Value) Bytes ¶
Example ¶
v := NewValue("binary") bv, ok := v.Bytes() fmt.Println(hex.EncodeToString(bv), ok)
Output: 62696e617279 true
func (*Value) Eq ¶
Compare this value with other primitive type If other is nil, the result is not defined.
Example (Int_vs_float) ¶
v1 := NewValue(int64(123)) v2 := NewValue(123.0) fmt.Println(v1.Eq(v2), v1.Eq(123), v1.Eq(123.0), v1.Eq(123.4))
Output: true true true false
Example (String_vs_float) ¶
v1 := NewValue("123.456") v2 := NewValue(123.456) v3 := v2.StringValue() fmt.Println(v1.Eq(v2), v1.Eq(v3), v2.Eq("123.456"))
Output: true true true
Example (String_vs_string) ¶
v1 := NewValue("string") v2 := NewValue("string") fmt.Println(v1.Eq(v2), v1.Eq("string"), v1.Eq("not"))
Output: true true false
func (*Value) Float64 ¶
Example ¶
sv := NewValue("3.141592") iv, ok := sv.Float64() fmt.Println(iv, ok)
Output: 3.141592 true
func (*Value) FloatValue ¶
FloatValue returns the float value of the value. It will return a NewNullValue(FLOAT) if it can not be converted to float.
Example ¶
sv := NewValue("3.141592") iv := sv.FloatValue() fmt.Println(iv.Format(DefaultValueFormat()))
Output: 3.141592
func (*Value) Format ¶
func (val *Value) Format(vf ValueFormat) string
func (*Value) Gt ¶
Compare this value with other primitive type If other is nil, the result is not defined.
Example ¶
v1 := NewValue(int64(123)) v2 := NewValue(234.0) v3 := NewValue(345.4) fmt.Println(v2.Gt(v1), v1.Gt(v3), v3.Gt(v2))
Output: true false true
Example (Bool) ¶
v1 := NewValue(true) v2 := NewValue(false) fmt.Println(v1.Gt(v2))
Output: true
func (*Value) Int64 ¶
Example ¶
sv := NewValue("123") iv, ok := sv.Int64() fmt.Println(iv, ok)
Output: 123 true
func (*Value) IntValue ¶
IntValue returns the int value of the value. It will return a NewNullValue(INT) if it can not be converted to int.
Example ¶
sv := NewValue("123") iv := sv.IntValue() fmt.Println(iv.Format(DefaultValueFormat()))
Output: 123
func (*Value) Lt ¶
Compare this value with other primitive type If other is nil, the result is not defined.
Example ¶
v1 := NewValue(int64(123)) v2 := NewValue(234.0) v3 := NewValue(345.4) fmt.Println(v2.Lt(v1), v1.Lt(v2), v3.Lt(v2), v1.Lt(v3))
Output: false true false true
Example (Bool) ¶
v1 := NewValue(true) v2 := NewValue(true) fmt.Println(v2.Lt(v1))
Output: true
func (*Value) String ¶
Example ¶
v := NewValue(123.456) sv, ok := v.String() fmt.Println(sv, ok)
Output: 123.456 true
func (*Value) StringValue ¶
StringValue returns the string value of the value. It will return a NewNullValue(STRING) if it can not be converted to string.
Example ¶
v := NewValue(123.456) sv := v.StringValue() fmt.Println(sv.Format(DefaultValueFormat()))
Output: 123.456
func (*Value) Time ¶
Example ¶
v := NewValue(int64(1721865054)) tv, ok := v.Time() fmt.Println(tv.Unix(), ok)
Output: 1721865054 true
func (*Value) TimeValue ¶
TimeValue returns the time value of the value. It will return a NewNullValue(TIME) if it can not be converted to time.
Example ¶
v := NewValue(int64(1721865054)) tv := v.TimeValue() fmt.Println(tv.Format(DefaultValueFormat()))
Output: 2024-07-24T23:50:54Z
type ValueFormat ¶
type ValueFormat struct { Timeformat *Timeformatter Decimal int NullString string NullInt int NullFloat float64 NullTime time.Time NullBool bool }
Example ¶
tm := time.Unix(1721865054, 0) vf := DefaultValueFormat() vf.Timeformat = NewTimeformatterWithLocation("2006-01-02 15:04:05", time.UTC) vf.Decimal = 2 fmt.Println(NewValue(tm).Format(vf), NewValue(3.141592).Format(vf))
Output: 2024-07-24 23:50:54 3.14
Example (Epoch) ¶
tm := time.Unix(1721865054, 0) vf := DefaultValueFormat() vf.Timeformat = NewTimeformatter("s") vf.Decimal = 4 fmt.Println(NewValue(tm).Format(vf), NewValue(3.141592).Format(vf))
Output: 1721865054 3.1416
func DefaultValueFormat ¶
func DefaultValueFormat() ValueFormat
Example ¶
tm := time.Unix(1721865054, 0) vf := DefaultValueFormat() fmt.Println(NewValue(tm).Format(vf), NewValue(3.141592).Format(vf))
Output: 2024-07-24T23:50:54Z 3.141592
func (ValueFormat) FormatTime ¶
func (fo ValueFormat) FormatTime(tm time.Time) string
type Version ¶
type Version struct { Major int `json:"major"` Minor int `json:"minor"` Patch int `json:"patch"` GitSHA string `json:"git"` }
func GetVersion ¶
func GetVersion() *Version