Documentation
¶
Overview ¶
Package recorder contains logic to record data into a database. The payload is guaranteed to be json marshallable. Any types that implements the DataRecorder interface can be used in this system.
Important Notes ¶
When the context is cancelled, the recorder should finish its job and return.
Index ¶
- Variables
- func SetBackoff(backoff int) func(Constructor) error
- func SetEndpoint(endpoint string) func(Constructor) error
- func SetIndexName(indexName string) func(Constructor) error
- func SetLogger(log internal.FieldLogger) func(Constructor) error
- func SetName(name string) func(Constructor) error
- func SetTimeout(timeout time.Duration) func(Constructor) error
- type Constructor
- type DataRecorder
- type ErrEndpointNotAvailable
- type ErrInvalidEndpoint
- type ErrInvalidIndexName
- type ErrLowBackoffValue
- type ErrLowTimeout
- type ErrParseTimeOut
- type Job
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrEmptyName is the error when the package name is empty. ErrEmptyName = fmt.Errorf("name cannot be empty") // ErrEmptyEndpoint is the error when the given endpoint is empty. ErrEmptyEndpoint = fmt.Errorf("endpoint cannot be empty") // ErrEmptyIndexName is the error when the index_name is an empty string. ErrEmptyIndexName = fmt.Errorf("index_name cannot be empty") // ErrBackoffExceeded is the error when the endpoint's absence has exceeded the backoff value. // It is not strictly an error, it is however a pointer to an error in the past. ErrBackoffExceeded = fmt.Errorf("endpoint gone too long") // ErrPingNotCalled is the error if the caller calls the record without pinging. ErrPingNotCalled = fmt.Errorf("the caller forgot to ask me pinging") )
Functions ¶
func SetBackoff ¶ added in v0.8.1
func SetBackoff(backoff int) func(Constructor) error
SetBackoff sets the backoff of the recorder
func SetEndpoint ¶ added in v0.8.1
func SetEndpoint(endpoint string) func(Constructor) error
SetEndpoint sets the endpoint of the recorder
func SetIndexName ¶ added in v0.8.1
func SetIndexName(indexName string) func(Constructor) error
SetIndexName sets the indexName of the recorder
func SetLogger ¶ added in v0.8.1
func SetLogger(log internal.FieldLogger) func(Constructor) error
SetLogger sets the log of the recorder
func SetName ¶ added in v0.8.1
func SetName(name string) func(Constructor) error
SetName sets the name of the recorder
func SetTimeout ¶ added in v0.8.1
func SetTimeout(timeout time.Duration) func(Constructor) error
SetTimeout sets the timeout of the recorder
Types ¶
type Constructor ¶ added in v0.8.1
type Constructor interface { // SetLogger is for setting the Logger SetLogger(logger internal.FieldLogger) // SetName is for setting the Name SetName(name string) // SetIndexName is for setting the IndexName SetIndexName(indexName string) // SetEndpoint is for setting the Endpoint SetEndpoint(endpoint string) // SetTimeout is for setting the Timeout SetTimeout(timeout time.Duration) // SetBackoff is for setting the Backoff SetBackoff(backoff int) }
Constructor is an interface for setting up an object for testing.
type DataRecorder ¶
type DataRecorder interface { // Name should return the representation string for this recorder. // Choose a very simple and unique name. Name() string // Ping should ping the endpoint and return nil if was successful. // The Engine will not launch the reader if the ping result is an error. Ping() error // IndexName comes from the configuration, but the engine takes over. IndexName() string // Timeout is required by the Engine so it can read the time-outs. Timeout() time.Duration // The recorder should record the Job and report the errors. // When the context is timed-out or cancelled, the recorder should return // with the context's error. Record(context.Context, *Job) error }
DataRecorder receives a payload for shipping data to a repository. The repository should have the concept of index/database and type/table abstractions. See ElasticSearch for more information.
Notes ¶
Recorders should not change the index name coming in the payload unless they have a valid reason. The engine might add a date to this index name if the user has specified in the configuration file.
Example ¶
This example shows when a record job is issued, the recorder hits the endpoint.
package main import ( "context" "fmt" "net/http" "net/http/httptest" "time" "github.com/arsham/expipe/internal/datatype" "github.com/arsham/expipe/recorder" "github.com/arsham/expipe/recorder/testing" ) func main() { ctx := context.Background() receivedPayload := make(chan string) pinged := false ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if !pinged { pinged = true return } receivedPayload <- "I have received the payload!" })) defer ts.Close() rec := testing.GetRecorder(ts.URL) rec.Ping() fmt.Println("Pinging successful") payload := datatype.New([]datatype.DataType{ datatype.StringType{Key: "key", Value: "value"}, }) job := &recorder.Job{ Payload: payload, IndexName: "my index", Time: time.Now(), } go func() { err := rec.Record(ctx, job) if err != nil { panic("Wasn't expecting any errors") } }() fmt.Println(<-receivedPayload) fmt.Println("No errors reported") go rec.Record(ctx, job) // Issuing another job fmt.Println(<-receivedPayload) }
Output: Pinging successful I have received the payload! No errors reported I have received the payload!
type ErrEndpointNotAvailable ¶ added in v0.5.0
ErrEndpointNotAvailable is the error when the endpoint is not available.
func (ErrEndpointNotAvailable) Error ¶ added in v0.5.0
func (e ErrEndpointNotAvailable) Error() string
type ErrInvalidEndpoint ¶ added in v0.4.0
type ErrInvalidEndpoint string
ErrInvalidEndpoint is the error when the endpoint is not a valid url
func (ErrInvalidEndpoint) Error ¶ added in v0.4.0
func (e ErrInvalidEndpoint) Error() string
type ErrInvalidIndexName ¶ added in v0.7.0
type ErrInvalidIndexName string
ErrInvalidIndexName is the error when the index name is invalid.
func (ErrInvalidIndexName) Error ¶ added in v0.7.0
func (e ErrInvalidIndexName) Error() string
type ErrLowBackoffValue ¶ added in v0.4.0
type ErrLowBackoffValue int64
ErrLowBackoffValue is the error when the endpoint is not a valid url
func (ErrLowBackoffValue) Error ¶ added in v0.4.0
func (e ErrLowBackoffValue) Error() string
func (ErrLowBackoffValue) LowBackoffValue ¶ added in v0.4.0
func (ErrLowBackoffValue) LowBackoffValue()
LowBackoffValue defines the behaviour of the error
type ErrLowTimeout ¶ added in v0.8.1
ErrLowTimeout is the error when the interval is zero
func (ErrLowTimeout) Error ¶ added in v0.8.1
func (e ErrLowTimeout) Error() string
type ErrParseTimeOut ¶ added in v0.4.0
ErrParseTimeOut is for when the timeout cannot be parsed
func (ErrParseTimeOut) Error ¶ added in v0.4.0
func (e ErrParseTimeOut) Error() string
type Job ¶ added in v0.7.0
type Job struct { // ID is the job ID generated at the time the payload was generated. ID token.ID // Payload has a Bytes() method for returning the data. // It is guaranteed to be json marshallable. Payload datatype.DataContainer // Time is the recorded time at the time of fetching data by the readers. // You should use this value to fetch the content of the payload Time time.Time // IndexName might be different than the one is set in the recorder. // Engine might decide to change it and you have to use the provided one. IndexName string // TypeName comes from the configuration of readers. TypeName string }
Job is sent with a context and a payload to be recorded. If the TypeName and IndexName are different than the previous one, the recorder should use the ones engine provides. If any errors occurred, recorders should return the error on Read return value.
Directories
¶
Path | Synopsis |
---|---|
Package elasticsearch contains logic to record data to an elasticsearch index.
|
Package elasticsearch contains logic to record data to an elasticsearch index. |
Package testing is a test suit for recorders.
|
Package testing is a test suit for recorders. |