Documentation ¶
Overview ¶
Package engine reads from any endpoint that provides expvar data and ships the data to Elasticsearch. You can inspect the metrics with Kibana.
Please refer to Go's expvar documentation for more information. The installation guide can be found on the GitHub page: https://github.com/arsham/expipe
At the heart of this package is the Engine. It acts as the glue between multiple Readers and a Recorder. Messages are transferred in a container called DataContainer, which is a list of DataType objects.
Example configuration ¶
Save it somewhere (let's call it expipe.yml for now):
settings:
    log_level: info

readers: # You can specify the applications you want to show the metrics
    FirstApp: # service name
        type: expvar # the type of reader. More to come soon!
        type_name: AppVastic # this will be the _type in elasticsearch
        endpoint: localhost:1234 # where the application
        routepath: /debug/vars # the endpoint that app provides the metrics
        interval: 500ms # every half a second, it will collect the metrics.
        timeout: 3s # after 3 seconds it gives up if the application is not responsive
        backoff: 10 # after the application has failed to respond 10 times, it will stop reading from it
    AnotherApplication:
        type: expvar
        type_name: this_is_awesome
        endpoint: localhost:1235
        routepath: /metrics
        timeout: 13s
        backoff: 10

recorders: # This section is where the data will be shipped to
    main_elasticsearch:
        type: elasticsearch # the type of recorder. More to come soon!
        endpoint: 127.0.0.1:9200
        index_name: expipe
        timeout: 8s
        backoff: 10
    the_other_elasticsearch:
        type: elasticsearch
        endpoint: 127.0.0.1:9201
        index_name: expipe
        timeout: 18s
        backoff: 10

routes: # You can specify metrics of which application will be recorded in which target
    route1:
        readers:
            - FirstApp
        recorders:
            - main_elasticsearch
    route2:
        readers:
            - FirstApp
            - AnotherApplication
        recorders:
            - main_elasticsearch
    route3: # Yes, you can have multiple!
        readers:
            - AnotherApplication
        recorders:
            - main_elasticsearch
            - the_other_elasticsearch
Then run the application:
expipe -c expipe.yml
You can mix and match the routes; the engine chooses a set-up that achieves your goal without duplicating the results. For instance, assume you set the routes like this:
readers:
    app_0:
        type: expvar
    app_1:
        type: expvar
    app_2:
        type: expvar
    app_3:
        type: expvar
    app_4:
        type: expvar
    app_5:
        type: expvar
    not_used_app:
        type: expvar # note that this one is not specified in the routes, therefore it is ignored
recorders:
    elastic_0:
        type: elasticsearch
    elastic_1:
        type: elasticsearch
    elastic_2:
        type: elasticsearch
    elastic_3:
        type: elasticsearch
routes:
    route1:
        readers:
            - app_0
            - app_2
            - app_4
        recorders:
            - elastic_1
    route2:
        readers:
            - app_0
            - app_5
        recorders:
            - elastic_2
            - elastic_3
    route3:
        readers:
            - app_1
            - app_2
        recorders:
            - elastic_0
            - elastic_1
Expipe sets up the engines so that each recorder receives data as follows:
elastic_0 records data from app_1, app_2
elastic_1 records data from app_0, app_1, app_2, app_4
elastic_2 records data from app_0, app_5
elastic_3 records data from app_0, app_5
You can change the numbers to your liking:
gc_types: # These inputs will be collected into one list and zero values will be removed
    memstats.PauseEnd
    memstats.PauseNs
memory_bytes: # These values will be transformed from bytes
    StackInuse: mb # To MB
    memstats.Alloc: gb # To GB
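To make the two mappings concrete, here is a small sketch of what "transformed from bytes" and "zero values will be removed" could look like. It is an illustration only: toUnit and dropZeros are hypothetical helpers, and the use of binary multiples (1 MB = 1024 * 1024 bytes) is an assumption rather than something the configuration states.

```go
package main

import "fmt"

// toUnit converts a byte count to the given unit. Binary multiples
// (1 MB = 1024 * 1024 bytes) are an assumption of this sketch.
func toUnit(bytes float64, unit string) (float64, error) {
	switch unit {
	case "kb":
		return bytes / 1024, nil
	case "mb":
		return bytes / (1024 * 1024), nil
	case "gb":
		return bytes / (1024 * 1024 * 1024), nil
	default:
		return 0, fmt.Errorf("unknown unit %q", unit)
	}
}

// dropZeros returns the non-zero values in their original order.
func dropZeros(values []uint64) []uint64 {
	out := make([]uint64, 0, len(values))
	for _, v := range values {
		if v != 0 {
			out = append(out, v)
		}
	}
	return out
}

func main() {
	mb, err := toUnit(3*1024*1024, "mb")
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Println("StackInuse in MB:", mb)
	fmt.Println("Pauses:", dropZeros([]uint64{0, 12, 0, 7}))
}
```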
To run the tests, run the following in the root of the application:
go test $(glide nv)
Or for testing readers:
go test ./readers
To show the coverage, use this gist: https://gist.github.com/arsham/f45f7e7eea7e18796bc1ed5ced9f9f4a. Then run:
gocover
It will open a browser tab and show you the coverage.
To run all benchmarks:
go test $(glide nv) -run=^$ -bench=.
To show the memory and CPU profiles, run the following in each folder:
BASENAME=$(basename $(pwd))
go test -run=^$ -bench=. -cpuprofile=cpu.out -benchmem -memprofile=mem.out
go tool pprof -pdf $BASENAME.test cpu.out > cpu.pdf && open cpu.pdf
go tool pprof -pdf $BASENAME.test mem.out > mem.pdf && open mem.pdf
License ¶
Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE file.
Index ¶
- Variables
- func Start(e Engine) chan struct{}
- func WithCtx(ctx context.Context) func(Engine) error
- func WithLogger(log tools.FieldLogger) func(Engine) error
- func WithReader(red reader.DataReader) func(Engine) error
- func WithRecorders(recs ...recorder.DataRecorder) func(Engine) error
- type Engine
- type JobError
- type Operator
- func (o Operator) Ctx() context.Context
- func (o Operator) Log() tools.FieldLogger
- func (o Operator) Reader() reader.DataReader
- func (o Operator) Recorders() map[string]recorder.DataRecorder
- func (o *Operator) SetCtx(ctx context.Context)
- func (o *Operator) SetLog(log tools.FieldLogger)
- func (o *Operator) SetReader(reader reader.DataReader)
- func (o *Operator) SetRecorders(recorders map[string]recorder.DataRecorder)
- func (o *Operator) String() string
- type PingError
- type Service
Constants ¶
This section is empty.
Variables ¶
var (
	ErrNoReader   = fmt.Errorf("no reader provided")
	ErrNoRecorder = fmt.Errorf("no recorder provided")
	ErrNoLogger   = fmt.Errorf("no logger provided")
	ErrNoCtx      = fmt.Errorf("no ctx provided")
)
Errors returned from Engine operations.
Functions ¶
func Start ¶
func Start(e Engine) chan struct{}
Start begins pulling data from the DataReader and ships it to the DataRecorder. When the context is cancelled or times out, the engine abandons its operations and returns an error if one occurred.
Example ¶
You need at least a pair of DataReader and DataRecorder to start an engine. In this example we are using the mocked versions.
log := tools.DiscardLogger()
ctx, cancel := context.WithCancel(context.Background())
recorded := make(chan string)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	recorded <- "Job was recorded"
}))
defer ts.Close()

red := getReader(log)
rec := getRecorders(log, ts.URL)
e, err := engineWithReadRecs(ctx, log, red, rec)
if err != nil {
	log.Fatalln("This error should not happen:", err)
}
done := make(chan struct{})
go func() {
	engine.Start(e)
	done <- struct{}{}
}()
fmt.Println("Engine creation success:", err == nil)
fmt.Println(<-recorded)

cancel()
<-done
fmt.Println("Client closed gracefully")
Output:
Engine creation success: true
Job was recorded
Client closed gracefully
func WithCtx ¶
func WithCtx(ctx context.Context) func(Engine) error
WithCtx uses ctx as the Engine's background context.
Example ¶
package main

import (
	"context"
	"fmt"

	"github.com/arsham/expipe/engine"
)

func main() {
	ctx := context.Background()
	o := &engine.Operator{}
	err := engine.WithCtx(ctx)(o)
	fmt.Println("Error:", err)
	fmt.Println("o.Ctx() == ctx:", o.Ctx() == ctx)
}
Output:
Error: <nil>
o.Ctx() == ctx: true
func WithLogger ¶
func WithLogger(log tools.FieldLogger) func(Engine) error
WithLogger sets the logger.
Example ¶
package main

import (
	"fmt"

	"github.com/arsham/expipe/engine"
	"github.com/arsham/expipe/tools"
)

func main() {
	log := tools.DiscardLogger()
	o := &engine.Operator{}
	err := engine.WithLogger(log)(o)
	fmt.Println("Error:", err)
	fmt.Println("o.Log() == log:", o.Log() == log)
}
Output:
Error: <nil>
o.Log() == log: true
func WithReader ¶ added in v0.11.0
func WithReader(red reader.DataReader) func(Engine) error
WithReader builds up the reader.
Example ¶
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"

	"github.com/arsham/expipe/engine"
	"github.com/arsham/expipe/reader"
	rdt "github.com/arsham/expipe/reader/testing"
	"github.com/arsham/expipe/tools"
)

func readerWithUrl(url string) reader.DataReader {
	log := tools.DiscardLogger()
	red, err := rdt.New(
		reader.WithLogger(log),
		reader.WithEndpoint(url),
		reader.WithName("reader_example"),
		reader.WithTypeName("typeName"),
		reader.WithInterval(time.Millisecond*100),
		reader.WithTimeout(time.Second),
		reader.WithBackoff(5),
	)
	if err != nil {
		log.Fatalln("This error should not happen:", err)
	}
	return red
}

func main() {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
	red := readerWithUrl(ts.URL)

	o := &engine.Operator{}
	err := engine.WithReader(red)(o)
	fmt.Println("Error:", err)
}
Output:
Error: <nil>
Example (PingError) ¶
If the DataReader fails to ping its endpoint, WithReader returns an error.
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"reflect"
	"time"

	"github.com/arsham/expipe/engine"
	"github.com/arsham/expipe/reader"
	rdt "github.com/arsham/expipe/reader/testing"
	"github.com/arsham/expipe/tools"
)

func readerWithUrl(url string) reader.DataReader {
	log := tools.DiscardLogger()
	red, err := rdt.New(
		reader.WithLogger(log),
		reader.WithEndpoint(url),
		reader.WithName("reader_example"),
		reader.WithTypeName("typeName"),
		reader.WithInterval(time.Millisecond*100),
		reader.WithTimeout(time.Second),
		reader.WithBackoff(5),
	)
	if err != nil {
		log.Fatalln("This error should not happen:", err)
	}
	return red
}

func main() {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
	ts.Close()
	red := readerWithUrl(ts.URL)

	o := &engine.Operator{}
	err := engine.WithReader(red)(o)
	fmt.Println("Error type:", reflect.TypeOf(err))
}
Output:
Error type: engine.PingError
func WithRecorders ¶ added in v0.11.0
func WithRecorders(recs ...recorder.DataRecorder) func(Engine) error
WithRecorders builds up the recorders and checks them.
Example ¶
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"

	"github.com/arsham/expipe/engine"
	"github.com/arsham/expipe/recorder"
	rct "github.com/arsham/expipe/recorder/testing"
	"github.com/arsham/expipe/tools"
)

func recorderWithUrl(url string) recorder.DataRecorder {
	log := tools.DiscardLogger()
	rec, err := rct.New(
		recorder.WithLogger(log),
		recorder.WithEndpoint(url),
		recorder.WithName("recorder_example"),
		recorder.WithIndexName("indexName"),
		recorder.WithTimeout(time.Second),
		recorder.WithBackoff(5),
	)
	if err != nil {
		log.Fatalln("This error should not happen:", err)
	}
	return rec
}

func main() {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
	rec := recorderWithUrl(ts.URL)

	o := &engine.Operator{}
	err := engine.WithRecorders(rec)(o)
	fmt.Println("Error:", err)
}
Output:
Error: <nil>
Example (PingError) ¶
If the DataRecorder fails to ping its endpoint, WithRecorders returns an error.
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"reflect"
	"time"

	"github.com/arsham/expipe/engine"
	"github.com/arsham/expipe/recorder"
	rct "github.com/arsham/expipe/recorder/testing"
	"github.com/arsham/expipe/tools"
)

func recorderWithUrl(url string) recorder.DataRecorder {
	log := tools.DiscardLogger()
	rec, err := rct.New(
		recorder.WithLogger(log),
		recorder.WithEndpoint(url),
		recorder.WithName("recorder_example"),
		recorder.WithIndexName("indexName"),
		recorder.WithTimeout(time.Second),
		recorder.WithBackoff(5),
	)
	if err != nil {
		log.Fatalln("This error should not happen:", err)
	}
	return rec
}

func main() {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
	ts.Close()
	rec := recorderWithUrl(ts.URL)

	o := &engine.Operator{}
	err := engine.WithRecorders(rec)(o)
	fmt.Println("Error type:", reflect.TypeOf(err))
}
Output:
Error type: engine.PingError
Types ¶
type Engine ¶
type Engine interface {
	fmt.Stringer
	SetCtx(context.Context)
	SetLog(tools.FieldLogger)
	SetRecorders(map[string]recorder.DataRecorder)
	SetReader(reader.DataReader)
	Ctx() context.Context
	Log() tools.FieldLogger
	Recorders() map[string]recorder.DataRecorder
	Reader() reader.DataReader
}
Engine is an interface to Operator's behaviour. This abstraction is very tight on purpose.
func New ¶
New generates the Engine based on the provided options.
Example ¶
You can pass your configuration.
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"

	"github.com/arsham/expipe/engine"
	"github.com/arsham/expipe/reader"
	rdt "github.com/arsham/expipe/reader/testing"
	"github.com/arsham/expipe/recorder"
	rct "github.com/arsham/expipe/recorder/testing"
	"github.com/arsham/expipe/tools"
)

func recorderWithUrl(url string) recorder.DataRecorder {
	log := tools.DiscardLogger()
	rec, err := rct.New(
		recorder.WithLogger(log),
		recorder.WithEndpoint(url),
		recorder.WithName("recorder_example"),
		recorder.WithIndexName("indexName"),
		recorder.WithTimeout(time.Second),
		recorder.WithBackoff(5),
	)
	if err != nil {
		log.Fatalln("This error should not happen:", err)
	}
	return rec
}

func readerWithUrl(url string) reader.DataReader {
	log := tools.DiscardLogger()
	red, err := rdt.New(
		reader.WithLogger(log),
		reader.WithEndpoint(url),
		reader.WithName("reader_example"),
		reader.WithTypeName("typeName"),
		reader.WithInterval(time.Millisecond*100),
		reader.WithTimeout(time.Second),
		reader.WithBackoff(5),
	)
	if err != nil {
		log.Fatalln("This error should not happen:", err)
	}
	return red
}

func main() {
	log := tools.DiscardLogger()
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
	ctx := context.Background()

	rec := recorderWithUrl(ts.URL)
	red := readerWithUrl(ts.URL)

	e, err := engine.New(
		engine.WithCtx(ctx),
		engine.WithLogger(log),
		engine.WithReader(red),
		engine.WithRecorders(rec),
	)
	fmt.Println("Error:", err)
	fmt.Println("Engine is nil:", e == nil)
}
Output:
Error: <nil>
Engine is nil: false
Example (Replaces) ¶
Note that if the same option is passed more than once, the last one replaces the earlier ones.
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"

	"github.com/arsham/expipe/engine"
	"github.com/arsham/expipe/reader"
	rdt "github.com/arsham/expipe/reader/testing"
	"github.com/arsham/expipe/recorder"
	rct "github.com/arsham/expipe/recorder/testing"
	"github.com/arsham/expipe/tools"
)

func recorderWithUrl(url string) recorder.DataRecorder {
	log := tools.DiscardLogger()
	rec, err := rct.New(
		recorder.WithLogger(log),
		recorder.WithEndpoint(url),
		recorder.WithName("recorder_example"),
		recorder.WithIndexName("indexName"),
		recorder.WithTimeout(time.Second),
		recorder.WithBackoff(5),
	)
	if err != nil {
		log.Fatalln("This error should not happen:", err)
	}
	return rec
}

func readerWithUrl(url string) reader.DataReader {
	log := tools.DiscardLogger()
	red, err := rdt.New(
		reader.WithLogger(log),
		reader.WithEndpoint(url),
		reader.WithName("reader_example"),
		reader.WithTypeName("typeName"),
		reader.WithInterval(time.Millisecond*100),
		reader.WithTimeout(time.Second),
		reader.WithBackoff(5),
	)
	if err != nil {
		log.Fatalln("This error should not happen:", err)
	}
	return red
}

func main() {
	log := tools.DiscardLogger()
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
	ctx1, cancel := context.WithCancel(context.Background())
	ctx2, cancel2 := context.WithCancel(context.Background())
	defer cancel()
	defer cancel2()

	rec := recorderWithUrl(ts.URL)
	red := readerWithUrl(ts.URL)

	e, err := engine.New(
		engine.WithCtx(ctx1),
		engine.WithCtx(ctx2),
		engine.WithLogger(log),
		engine.WithReader(red),
		engine.WithRecorders(rec),
	)
	fmt.Println("Error:", err)
	fmt.Println("e.Ctx() == ctx1:", e.Ctx() == ctx1)
	fmt.Println("e.Ctx() == ctx2:", e.Ctx() == ctx2)
}
Output:
Error: <nil>
e.Ctx() == ctx1: false
e.Ctx() == ctx2: true
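The replacement behaviour follows naturally from the functional options pattern, where each option is a function applied to the target in the order given. The sketch below shows the pattern in isolation; settings, option, withName, withRetries and build are hypothetical names, and stopping at the first failing option is an assumption of this sketch, not a documented property of New.

```go
package main

import (
	"errors"
	"fmt"
)

// settings is a hypothetical target; the real package applies its
// options to an Engine instead.
type settings struct {
	name    string
	retries int
}

// option mutates the settings or reports why it cannot.
type option func(*settings) error

func withName(name string) option {
	return func(s *settings) error {
		if name == "" {
			return errors.New("empty name")
		}
		s.name = name
		return nil
	}
}

func withRetries(n int) option {
	return func(s *settings) error {
		if n < 0 {
			return errors.New("negative retries")
		}
		s.retries = n
		return nil
	}
}

// build applies the options in order, so a later option overwrites an
// earlier one of the same kind. It stops at the first error.
func build(opts ...option) (*settings, error) {
	s := &settings{}
	for _, opt := range opts {
		if err := opt(s); err != nil {
			return nil, err
		}
	}
	return s, nil
}

func main() {
	s, err := build(withName("first"), withName("second"), withRetries(3))
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Println("Name:", s.name)
	fmt.Println("Retries:", s.retries)
}
```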
type JobError ¶
type JobError struct {
	Name string // Name of the operator; reader, recorder.
	ID   token.ID
	Err  error
}
JobError carries an error around in Engine operations.
type Operator ¶ added in v0.11.0
type Operator struct {
// contains filtered or unexported fields
}
Operator represents an Engine that receives information from a reader and ships it to multiple recorders. The Operator is allowed to change the index and type names at will. When the context times out or is cancelled, the Engine closes and returns. Use the shut down channel to signal the Engine to stop recording. The ctx context will create a new context based on the parent.
func (Operator) Log ¶ added in v0.11.0
func (o Operator) Log() tools.FieldLogger
Log returns the logger assigned to this Engine.
func (Operator) Reader ¶ added in v0.11.0
func (o Operator) Reader() reader.DataReader
Reader returns the reader.
func (Operator) Recorders ¶ added in v0.11.0
func (o Operator) Recorders() map[string]recorder.DataRecorder
Recorders returns the recorder map.
func (*Operator) SetLog ¶ added in v0.11.0
func (o *Operator) SetLog(log tools.FieldLogger)
SetLog sets the logger of this Engine.
func (*Operator) SetReader ¶ added in v0.11.0
func (o *Operator) SetReader(reader reader.DataReader)
SetReader sets the reader.
func (*Operator) SetRecorders ¶ added in v0.11.0
func (o *Operator) SetRecorders(recorders map[string]recorder.DataRecorder)
SetRecorders sets the recorder map.
type Service ¶ added in v0.11.0
type Service struct {
	Log       tools.FieldLogger
	Ctx       context.Context
	Conf      *config.ConfMap
	Configure func(...func(Engine) error) (Engine, error)
}
Service initialises Engines. Configure injects the input values into the Operator by calling each function on it.
func (*Service) Start ¶ added in v0.11.0
Start creates some Engines and returns a channel that is closed when their work is done. For each route, we need one engine that has multiple readers and writes to one recorder. When all recorders of one reader go out of scope, the Engine stops that reader because there is no destination. Each Engine runs in its own goroutine.