mstreamer

v0.1.1
Published: May 16, 2023 License: MIT Imports: 18 Imported by: 0

README

mstreamer

General-purpose streamer framework written in Go

Documentation

Constants

const (
	// TBool is a boolean
	TBool = 'b'
	// TInt is an integer
	TInt = 'i'
	// TUint is an unsigned integer
	TUint = 'u'
	// TFloat is a float
	TFloat = 'f'
	// TString is a string
	TString = 's'
	// TNil is a nil value
	TNil = 'n'
)

Variables

This section is empty.

Functions

func ParseValue

func ParseValue(t FieldType, value string) interface{}

ParseValue parses a string value according to the given FieldType

Types

type ComposedFilter

type ComposedFilter func(flts ...Filter) (Filter, error)

ComposedFilter composes a set of filters in a chain

type ComposedInput

type ComposedInput func(Source, Encoder) (Input, error)

ComposedInput composes a Source and Encoder and returns an Input function

type ComposedOutput

type ComposedOutput func(Decoder, Sinker) (Output, error)

ComposedOutput composes a Decoder and a Sinker and returns an Output function

type Decoder

type Decoder func(Feedback, MeasureReader) (io.ReadCloser, error)

Decoder takes a Feedback function and a MeasureReader and returns a generic reader and an error, if any. A Decoder should read measures from the received MeasureReader, transform them and write the results to the returned stream. Initialization errors should be returned using the error object. Runtime errors should be sent to the Feedback function.

func NewDecoder

func NewDecoder(adapter DecoderAdapter) (Decoder, error)

NewDecoder takes a DecoderAdapter and returns a Decoder

func NewGenericDecoder

func NewGenericDecoder(decw DecoderToWriter) (Decoder, error)

NewGenericDecoder takes a DecoderToWriter and returns a Decoder

type DecoderAdapter

type DecoderAdapter func(Feedback, MeasureReader, io.Writer)

DecoderAdapter takes a Feedback function, a MeasureReader and an io.Writer

type DecoderToWriter

type DecoderToWriter func(Measure, io.Writer) error

DecoderToWriter takes a Measure and an io.Writer and returns an error, if any

type EncodeToWriter

type EncodeToWriter func(interface{}, MeasureWriter) error

EncodeToWriter takes a value and a MeasureWriter and returns an error, if any

type Encoder

type Encoder func(Feedback, io.ReadCloser) (MeasureReader, error)

Encoder takes a Feedback function and a stream reader and returns a MeasureReader and an error, if any. An Encoder should read from the received stream and write encoded metrics into a writer function. The data will be available to other systems through the returned reader. Initialization errors should be returned using the error object. Runtime errors should be sent to the Feedback function.

func NewEncoder

func NewEncoder(adapter EncoderAdapter) (Encoder, error)

NewEncoder builds an encoder function

func NewFromJSONEncoder

func NewFromJSONEncoder(data interface{}, encode EncodeToWriter) (Encoder, error)

NewFromJSONEncoder takes a data value and an EncodeToWriter and returns an Encoder

type EncoderAdapter

type EncoderAdapter func(Feedback, io.Reader, MeasureWriter)

EncoderAdapter takes a Feedback function, an io.Reader and a MeasureWriter

type Feedback

type Feedback func(format string, a ...interface{})

Feedback takes a format string and a list of values and assembles them into a string. It was created to receive feedback (info and error messages) from other components in a centralized way
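As an illustration of the centralized-feedback idea, the sketch below redeclares the Feedback signature locally and backs it with an in-memory collector. The `collector` type and its `Feedback` method are hypothetical helpers, not part of mstreamer, and the collector is not safe for concurrent use.

```go
package main

import (
	"fmt"
	"strings"
)

// Feedback mirrors the signature documented above (local copy for the sketch).
type Feedback func(format string, a ...interface{})

// collector is a hypothetical helper that gathers feedback messages in memory.
type collector struct {
	msgs []string
}

// Feedback returns a Feedback function that appends formatted messages to c.
func (c *collector) Feedback() Feedback {
	return func(format string, a ...interface{}) {
		c.msgs = append(c.msgs, fmt.Sprintf(format, a...))
	}
}

func main() {
	c := &collector{}
	fb := c.Feedback()
	// Components report informational and error messages the same way.
	fb("read %d measures", 3)
	fb("sink error: %s", "connection refused")
	fmt.Println(strings.Join(c.msgs, "\n"))
}
```

Passing one such function to every component keeps runtime messages in a single place, which seems to be the purpose described above.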

type Field

type Field struct {
	Name string      `json:"name,omitempty"`
	Type FieldType   `json:"type,omitempty"`
	Data interface{} `json:"data,omitempty"`
}

Field is the non-indexed portion of a measure where one can input metrics

func ParseField

func ParseField(name, kind, value string) Field

ParseField parses a field from strings

func (*Field) Compare

func (f *Field) Compare(o Field) (int, error)

Compare returns -1 if less, 0 if equal and +1 if greater, or an error if the fields are not comparable

type FieldType

type FieldType byte

FieldType represents the type of a field's data

func FieldValueType

func FieldValueType(value interface{}) FieldType

FieldValueType returns the type of a field
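One plausible way to map Go values to the type constants listed under Constants is a type switch. This is a sketch only: `valueType` is a hypothetical stand-in for FieldValueType, the constants are redeclared locally (typed as FieldType here for convenience), and the fallback to TNil for unsupported types is an assumption, not documented behavior.

```go
package main

import "fmt"

// FieldType and the constants are local copies of the documented declarations.
type FieldType byte

const (
	TBool   FieldType = 'b'
	TInt    FieldType = 'i'
	TUint   FieldType = 'u'
	TFloat  FieldType = 'f'
	TString FieldType = 's'
	TNil    FieldType = 'n'
)

// valueType is a hypothetical stand-in for FieldValueType.
func valueType(v interface{}) FieldType {
	switch v.(type) {
	case nil:
		return TNil
	case bool:
		return TBool
	case int, int8, int16, int32, int64:
		return TInt
	case uint, uint8, uint16, uint32, uint64:
		return TUint
	case float32, float64:
		return TFloat
	case string:
		return TString
	default:
		// Assumption: unsupported types are reported as nil.
		return TNil
	}
}

func main() {
	for _, v := range []interface{}{true, -7, uint(7), 1.5, "cpu", nil} {
		fmt.Printf("%v -> %c\n", v, valueType(v))
	}
}
```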

func ParseType

func ParseType(str string) FieldType

ParseType tries to convert a string into a FieldType

type Filter

type Filter func(Feedback, MeasureReader) (MeasureReader, error)

Filter takes a Feedback function and a MeasureReader and returns another MeasureReader. A Filter should read measures from the received MeasureReader, apply some kind of filter logic and write the results to the output reader. Initialization errors should be returned using the error object. Runtime errors should be sent to the Feedback function.
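The Filter contract can be sketched with locally redeclared types: a constructor reports initialization problems through its error result, and the returned Filter wraps the incoming reader. Everything below is illustrative. `sliceReader`, `renamingReader` and `newNameInjector` are hypothetical names, the Measure struct is trimmed to two fields, and signalling end of stream with io.EOF is an assumed convention.

```go
package main

import (
	"fmt"
	"io"
)

// Local copies of the documented signatures (Measure is trimmed for brevity).
type Feedback func(format string, a ...interface{})

type Measure struct {
	Name string
	Time int64
}

type MeasureReader interface {
	Read(*Measure) error
}

type Filter func(Feedback, MeasureReader) (MeasureReader, error)

// sliceReader serves measures from memory; io.EOF marks the end (assumption).
type sliceReader struct{ items []Measure }

func (r *sliceReader) Read(m *Measure) error {
	if len(r.items) == 0 {
		return io.EOF
	}
	*m = r.items[0]
	r.items = r.items[1:]
	return nil
}

// renamingReader wraps another reader and overwrites each measure's name.
type renamingReader struct {
	src  MeasureReader
	name string
}

func (r *renamingReader) Read(m *Measure) error {
	if err := r.src.Read(m); err != nil {
		return err
	}
	m.Name = r.name
	return nil
}

// newNameInjector is a hypothetical analogue of NewNameInjectorFilter.
func newNameInjector(name string) (Filter, error) {
	if name == "" {
		// Initialization errors are returned through the error value.
		return nil, fmt.Errorf("name must not be empty")
	}
	return func(fb Feedback, in MeasureReader) (MeasureReader, error) {
		// Runtime information goes to the Feedback function.
		fb("injecting name %q", name)
		return &renamingReader{src: in, name: name}, nil
	}, nil
}

func main() {
	flt, err := newNameInjector("cpu")
	if err != nil {
		panic(err)
	}
	fb := func(format string, a ...interface{}) { fmt.Printf(format+"\n", a...) }
	src := &sliceReader{items: []Measure{{Name: "a", Time: 1}, {Name: "b", Time: 2}}}
	out, err := flt(fb, src)
	if err != nil {
		panic(err)
	}
	var m Measure
	for out.Read(&m) == nil {
		fmt.Println(m.Name, m.Time)
	}
}
```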

func NewByPassFilter

func NewByPassFilter() (Filter, error)

NewByPassFilter takes no arguments and returns a Filter. As the name suggests, the filter presumably passes every measure through unchanged

func NewComposedFilter

func NewComposedFilter(flts ...Filter) (Filter, error)

NewComposedFilter takes a list of Filters and returns a new Filter that is a composition of all input filters running sequentially from left to right
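Left-to-right composition can be sketched by feeding each filter the reader produced by the previous one. `compose`, `suffix` and `readerFunc` are hypothetical helpers over locally redeclared types; rejecting nil filters is an assumption about what a sensible constructor might check.

```go
package main

import "fmt"

// Local copies of the documented signatures (Measure is trimmed for brevity).
type Feedback func(format string, a ...interface{})

type Measure struct{ Name string }

type MeasureReader interface {
	Read(*Measure) error
}

type Filter func(Feedback, MeasureReader) (MeasureReader, error)

// readerFunc adapts a plain function to the MeasureReader interface.
type readerFunc func(*Measure) error

func (f readerFunc) Read(m *Measure) error { return f(m) }

// compose is a hypothetical analogue of NewComposedFilter.
func compose(flts ...Filter) (Filter, error) {
	for i, f := range flts {
		if f == nil {
			return nil, fmt.Errorf("filter %d is nil", i)
		}
	}
	return func(fb Feedback, in MeasureReader) (MeasureReader, error) {
		cur := in
		// Each filter reads from the output of the one to its left.
		for _, f := range flts {
			next, err := f(fb, cur)
			if err != nil {
				return nil, err
			}
			cur = next
		}
		return cur, nil
	}, nil
}

// suffix returns a filter that appends s to every measure name.
func suffix(s string) Filter {
	return func(_ Feedback, in MeasureReader) (MeasureReader, error) {
		return readerFunc(func(m *Measure) error {
			if err := in.Read(m); err != nil {
				return err
			}
			m.Name += s
			return nil
		}), nil
	}
}

func main() {
	src := readerFunc(func(m *Measure) error { m.Name = "cpu"; return nil })
	flt, err := compose(suffix(".a"), suffix(".b"))
	if err != nil {
		panic(err)
	}
	out, err := flt(func(string, ...interface{}) {}, src)
	if err != nil {
		panic(err)
	}
	var m Measure
	if err := out.Read(&m); err != nil {
		panic(err)
	}
	fmt.Println(m.Name) // prints "cpu.a.b": ".a" is applied before ".b"
}
```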

func NewFieldInjectorFilter

func NewFieldInjectorFilter(fields ...Field) (Filter, error)

NewFieldInjectorFilter takes a list of fields and returns a Filter that injects those fields into every measure received

func NewFilter

func NewFilter(adapter FilterAdapter, finalizer FinalizeAdapter) (Filter, error)

NewFilter takes an adapter that implements the filter logic and returns a brand new Filter to be used in pipelines

func NewLogFilter

func NewLogFilter(label string) (Filter, error)

NewLogFilter takes a label and returns a Filter that prints every Measure, tagged with the label, to console output

func NewMeasureCountFilter

func NewMeasureCountFilter() (Filter, error)

NewMeasureCountFilter returns a Filter that counts the number of measures received

func NewNameInjectorFilter

func NewNameInjectorFilter(name string) (Filter, error)

NewNameInjectorFilter takes a name and returns a Filter that injects that name into every measure received

func NewNameSanityFilter

func NewNameSanityFilter() (Filter, error)

NewNameSanityFilter takes no arguments and returns a Filter. As the name suggests, the filter presumably sanitizes the name of every measure received

func NewTagInjectorFilter

func NewTagInjectorFilter(tags ...Tag) (Filter, error)

NewTagInjectorFilter takes a list of tags and returns a Filter that injects those tags into every measure received

func NewTimeInjectorFilter

func NewTimeInjectorFilter(time int64) (Filter, error)

NewTimeInjectorFilter takes a time and returns a Filter that injects that time into every measure received

type FilterAdapter

type FilterAdapter func(Feedback, *Measure, MeasureWriter)

FilterAdapter takes a Feedback, a Measure pointer and a MeasureWriter. Create a filter using the Measure as input, and write the filtered Measure information to the MeasureWriter. Use the Feedback function to report any errors or debug information.

type FilteredInput

type FilteredInput func(Input, Filter) (Input, error)

FilteredInput composes an Input and a Filter and returns an Input function

type FilteredOutput

type FilteredOutput func(Filter, Output) (Output, error)

FilteredOutput composes a Filter and an Output and returns an Output function

type FinalizeAdapter

type FinalizeAdapter func(Feedback, MeasureWriter)

FinalizeAdapter is a function called when the filter stream has been closed

type Helpers

type Helpers struct {
	IOPipe    func() (*io.PipeReader, *io.PipeWriter)
	IOcopy    func(io.Writer, io.Reader)
	NewWriter func(io.Writer) MeasureWriter
	NewReader func(io.Reader) MeasureReader
}

Helpers is a struct with Helper functions to be used inside components

type IOPipeline

type IOPipeline func(Input, Filter, Output) (Runnable, error)

IOPipeline takes an input, a filter and an output

type Input

type Input func(Feedback) (MeasureReader, error)

Input takes a Feedback and returns a reader

func NewComposedInput

func NewComposedInput(src Source, enc Encoder) (Input, error)

NewComposedInput composes a Source and an Encoder and returns an Input function

func NewFilteredInput

func NewFilteredInput(inp Input, flt Filter) (Input, error)

NewFilteredInput composes an Input and a Filter and returns an Input function

func NewInputFromProducer

func NewInputFromProducer(producer func(f Feedback, w MeasureWriter)) (Input, error)

NewInputFromProducer takes a producer function and returns an Input function

func NewMergedInput

func NewMergedInput(inputs ...Input) (Input, error)

NewMergedInput takes a list of inputs and merges them into a single input

type Measure

type Measure struct {
	Name string  `json:"name"`
	Tags []Tag   `json:"tags,omitempty"`
	Flds []Field `json:"flds"`
	Time int64   `json:"time"`
}

Measure is the single unit of data we pass through systems
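The struct tags above determine the JSON shape of a measure. The sketch below copies the documented struct definitions locally and encodes them with the standard encoding/json package. It assumes no custom marshalling: with a plain byte-based FieldType the type is emitted as a number, which may differ from what the real package produces. The sample values and the unit of Time are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented types.
type FieldType byte

type Tag struct {
	Name string `json:"name,omitempty"`
	Data string `json:"data,omitempty"`
}

type Field struct {
	Name string      `json:"name,omitempty"`
	Type FieldType   `json:"type,omitempty"`
	Data interface{} `json:"data,omitempty"`
}

type Measure struct {
	Name string  `json:"name"`
	Tags []Tag   `json:"tags,omitempty"`
	Flds []Field `json:"flds"`
	Time int64   `json:"time"`
}

// encode is a hypothetical helper that renders a measure as a JSON string.
func encode(m Measure) (string, error) {
	b, err := json.Marshal(m)
	return string(b), err
}

func main() {
	m := Measure{
		Name: "cpu",
		Tags: []Tag{{Name: "host", Data: "web-1"}},
		Flds: []Field{{Name: "load", Type: 'f', Data: 0.75}},
		Time: 1684195200,
	}
	s, err := encode(m)
	if err != nil {
		panic(err)
	}
	// 'f' is byte 102, so the type appears as the number 102 in this sketch.
	fmt.Println(s)

	// Tags are omitted when empty; flds has no omitempty and encodes as null.
	s, _ = encode(Measure{})
	fmt.Println(s)
}
```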

func (*Measure) Field

func (m *Measure) Field(name string) (Field, error)

Field returns the field

func (*Measure) FieldValue

func (m *Measure) FieldValue(name string) (interface{}, error)

FieldValue returns the value of a field

func (*Measure) MD5

func (m *Measure) MD5() []byte

MD5 calculates an MD5 hash from this measure's tags

func (*Measure) SHA1

func (m *Measure) SHA1() []byte

SHA1 calculates a SHA-1 hash from this measure's tags

func (*Measure) Tag

func (m *Measure) Tag(name string) (Tag, error)

Tag returns the tag

func (*Measure) TagValue

func (m *Measure) TagValue(name string) (string, error)

TagValue returns the value of a tag

type MeasureReader

type MeasureReader interface {
	Read(*Measure) error
}

MeasureReader takes a Measure pointer, modifies it and returns an error, if any

func NewReader

func NewReader(r io.Reader) MeasureReader

NewReader reads measures from an io.Reader

type MeasureWriter

type MeasureWriter interface {
	Write(Measure) error
}

MeasureWriter takes a Measure struct, sends it over any kind of transport and returns an error, if any

func NewWriter

func NewWriter(w io.Writer) MeasureWriter

NewWriter sends measures over an io.Writer
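The wire format used by NewReader and NewWriter is not documented here. As a sketch of how the two interfaces can be paired over a byte stream, the example below assumes a stream of JSON values, one per measure, and uses hypothetical `newWriter`, `newReader` and `roundTrip` helpers over locally redeclared types.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// Local copies of the documented interfaces (Measure is trimmed for brevity).
type Measure struct {
	Name string `json:"name"`
	Time int64  `json:"time"`
}

type MeasureReader interface {
	Read(*Measure) error
}

type MeasureWriter interface {
	Write(Measure) error
}

// jsonWriter encodes each measure as one JSON value (assumed wire format).
type jsonWriter struct{ enc *json.Encoder }

func (w jsonWriter) Write(m Measure) error { return w.enc.Encode(m) }

// jsonReader decodes one measure per call and returns io.EOF at end of stream.
type jsonReader struct{ dec *json.Decoder }

func (r jsonReader) Read(m *Measure) error { return r.dec.Decode(m) }

func newWriter(w io.Writer) MeasureWriter { return jsonWriter{enc: json.NewEncoder(w)} }
func newReader(r io.Reader) MeasureReader { return jsonReader{dec: json.NewDecoder(r)} }

// roundTrip writes the measures to a buffer and reads them back.
func roundTrip(in []Measure) ([]Measure, error) {
	var buf bytes.Buffer
	w := newWriter(&buf)
	for _, m := range in {
		if err := w.Write(m); err != nil {
			return nil, err
		}
	}
	r := newReader(&buf)
	var out []Measure
	for {
		var m Measure
		err := r.Read(&m)
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		out = append(out, m)
	}
}

func main() {
	out, err := roundTrip([]Measure{{Name: "cpu", Time: 1}, {Name: "mem", Time: 2}})
	if err != nil {
		panic(err)
	}
	for _, m := range out {
		fmt.Println(m.Name, m.Time)
	}
}
```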

type MergedInput

type MergedInput func(...Input) (Input, error)

MergedInput takes a list of inputs and merges them into a single input

type MergedOutput

type MergedOutput func(...Output) (Output, error)

MergedOutput takes a list of outputs and returns a single output function

type Output

type Output func(Feedback, MeasureReader) error

Output takes a Feedback and a Reader and outputs to whatever place

func NewComposedOutput

func NewComposedOutput(dec Decoder, snk Sinker) (Output, error)

NewComposedOutput composes a Decoder and a Sinker and returns an Output function

func NewFilteredOutput

func NewFilteredOutput(flt Filter, out Output) (Output, error)

NewFilteredOutput composes a Filter and an Output and returns an Output function

func NewMergedOutput

func NewMergedOutput(outputs ...Output) (Output, error)

NewMergedOutput takes a list of outputs and returns a single output function

type PushSinker

type PushSinker func(Feedback, io.ReadCloser) error

PushSinker takes a Feedback function and an io.ReadCloser and returns an error, if any

type Runnable

type Runnable func(Feedback) error

Runnable takes a Feedback function and runs whatever logic inside and returns an error if any

func NewIOPipeline

func NewIOPipeline(i Input, f Filter, o Output) (Runnable, error)

NewIOPipeline takes one component of each simple type
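A pipeline of this shape can be sketched as a Runnable that opens the input, wraps it with the filter and hands the result to the output. The types are redeclared locally, and `newPipeline`, `fromSlice`, `bypass` and `drain` are hypothetical helpers. How the real NewIOPipeline schedules the stages (for example, whether it uses goroutines) is not documented here, so this sketch simply runs them in sequence.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Local copies of the documented signatures (Measure is trimmed for brevity).
type Feedback func(format string, a ...interface{})
type Measure struct{ Name string }
type MeasureReader interface {
	Read(*Measure) error
}
type Input func(Feedback) (MeasureReader, error)
type Filter func(Feedback, MeasureReader) (MeasureReader, error)
type Output func(Feedback, MeasureReader) error
type Runnable func(Feedback) error

// newPipeline is a hypothetical analogue of NewIOPipeline.
func newPipeline(i Input, f Filter, o Output) (Runnable, error) {
	if i == nil || f == nil || o == nil {
		return nil, errors.New("pipeline needs an input, a filter and an output")
	}
	return func(fb Feedback) error {
		src, err := i(fb)
		if err != nil {
			return err
		}
		filtered, err := f(fb, src)
		if err != nil {
			return err
		}
		return o(fb, filtered)
	}, nil
}

// sliceReader serves measures from memory; io.EOF marks the end (assumption).
type sliceReader struct{ items []Measure }

func (r *sliceReader) Read(m *Measure) error {
	if len(r.items) == 0 {
		return io.EOF
	}
	*m = r.items[0]
	r.items = r.items[1:]
	return nil
}

// fromSlice returns an Input that replays the given measures.
func fromSlice(items ...Measure) Input {
	return func(Feedback) (MeasureReader, error) {
		return &sliceReader{items: append([]Measure(nil), items...)}, nil
	}
}

// bypass is a Filter that returns its input reader unchanged.
func bypass(_ Feedback, r MeasureReader) (MeasureReader, error) { return r, nil }

// drain is an Output that reports every measure name through Feedback.
func drain(fb Feedback, r MeasureReader) error {
	for {
		var m Measure
		err := r.Read(&m)
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		fb("got %s", m.Name)
	}
}

func main() {
	run, err := newPipeline(fromSlice(Measure{Name: "cpu"}, Measure{Name: "mem"}), bypass, drain)
	if err != nil {
		panic(err)
	}
	if err := run(func(format string, a ...interface{}) { fmt.Printf(format+"\n", a...) }); err != nil {
		panic(err)
	}
}
```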

type Sinker

type Sinker func(Feedback, io.ReadCloser) error

Sinker takes a Feedback function and a generic reader and returns an error, if any. A Sinker should send the received data to other systems. Initialization errors should be returned using the error object. Runtime errors should be sent to the Feedback function.

func NewBasicHTTPSinker

func NewBasicHTTPSinker(url, user, pwd string) (Sinker, error)

NewBasicHTTPSinker takes a URL, a user name and a password and returns a Sinker

func NewPushSinker

func NewPushSinker(push PushSinker) (Sinker, error)

NewPushSinker takes a PushSinker function and returns a Sinker

func NewSinker

func NewSinker(adapter SinkerAdapter) (Sinker, error)

NewSinker takes a SinkerAdapter and returns a Sinker

func NewStdoutSinker

func NewStdoutSinker() (Sinker, error)

NewStdoutSinker takes no arguments and returns a Sinker; judging by its name, the Sinker writes to standard output

func NewTCPSinker

func NewTCPSinker(addr string) (Sinker, error)

NewTCPSinker takes an address and returns a Sinker

func NewWriteCloserSinker

func NewWriteCloserSinker(out io.WriteCloser) (Sinker, error)

NewWriteCloserSinker takes an io.WriteCloser and returns a Sinker

func NewWriterSinker

func NewWriterSinker(out io.Writer) (Sinker, error)

NewWriterSinker takes an io.Writer and returns a Sinker

type SinkerAdapter

type SinkerAdapter func(Feedback, io.ReadCloser) error

SinkerAdapter takes a Feedback function and an io.ReadCloser and returns an error, if any

type Source

type Source func(Feedback) (io.ReadCloser, error)

Source takes a Feedback function, gets data from whatever source and returns that data in an io.ReadCloser object. Initialization errors should be returned using the error object. Runtime errors should be sent to the Feedback function.
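A minimal Source can be sketched by wrapping an in-memory string in an io.ReadCloser. The types are redeclared locally, and `newStringSource` and `readAll` are hypothetical helpers used only to show where initialization errors and runtime feedback would go.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// Local copies of the documented signatures.
type Feedback func(format string, a ...interface{})
type Source func(Feedback) (io.ReadCloser, error)

// newStringSource is a hypothetical constructor that serves a fixed payload.
func newStringSource(payload string) (Source, error) {
	if payload == "" {
		// Initialization errors are returned through the error value.
		return nil, errors.New("payload must not be empty")
	}
	return func(fb Feedback) (io.ReadCloser, error) {
		// Runtime information goes to the Feedback function.
		fb("serving %d bytes", len(payload))
		return io.NopCloser(strings.NewReader(payload)), nil
	}, nil
}

// readAll opens the source, reads everything and closes the stream.
func readAll(src Source, fb Feedback) (string, error) {
	rc, err := src(fb)
	if err != nil {
		return "", err
	}
	defer rc.Close()
	b, err := io.ReadAll(rc)
	return string(b), err
}

func main() {
	src, err := newStringSource("cpu,host=web-1 load=0.75")
	if err != nil {
		panic(err)
	}
	fb := func(format string, a ...interface{}) { fmt.Printf(format+"\n", a...) }
	data, err := readAll(src, fb)
	if err != nil {
		panic(err)
	}
	fmt.Println(data)
}
```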

func NewBasicHTTPSource

func NewBasicHTTPSource(url, user, pwd string) (Source, error)

NewBasicHTTPSource is a basic HTTP Source

func NewGetterSource

func NewGetterSource(get SourceGetter) (Source, error)

NewGetterSource takes a SourceGetter and returns a Source

func NewSource

func NewSource(adapter SourceAdapter) (Source, error)

NewSource takes a SourceAdapter and returns a Source component that uses the adapter as its input stream

type SourceAdapter

type SourceAdapter func(Feedback, io.Writer)

SourceAdapter takes a Feedback function and an io.Writer

type SourceGetter

type SourceGetter func() (io.ReadCloser, error)

SourceGetter takes no arguments and returns an io.ReadCloser and an error, if any

type Tag

type Tag struct {
	Name string `json:"name,omitempty"`
	Data string `json:"data,omitempty"`
}

Tag is the indexed portion of the data. Systems should index it to speed up lookups

func MakeTag

func MakeTag(name, value string) Tag

MakeTag creates a tag from a name and a value
