recorder

package
v0.9.1
Published: Mar 28, 2018 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Package recorder contains logic to record data into a database. The payload is guaranteed to be JSON marshallable. Any type that implements the DataRecorder interface can be used in this system.

Important Notes

When the context is cancelled, the recorder should finish its job and return.
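
The cancellation rule can be sketched as follows. This is a minimal illustration, not code from the package: `slowRecorder`, its `delay` field, and `recordAfterCancel` are hypothetical names, and the `Record` signature is simplified to take raw bytes instead of a `*Job`. Following the Notes on DataRecorder, the sketch returns the context's error when the context is done.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// slowRecorder is a hypothetical recorder whose work takes `delay` to finish.
type slowRecorder struct {
	delay time.Duration
}

// Record waits for the simulated write to finish, but returns early with the
// context's error when the context is cancelled or times out.
func (s slowRecorder) Record(ctx context.Context, payload []byte) error {
	select {
	case <-time.After(s.delay):
		return nil // the simulated write completed
	case <-ctx.Done():
		return ctx.Err()
	}
}

// recordAfterCancel cancels the context first and then asks the recorder to
// record, so Record has to give up immediately.
func recordAfterCancel() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return slowRecorder{delay: time.Second}.Record(ctx, []byte(`{"key":"value"}`))
}

func main() {
	fmt.Println(recordAfterCancel()) // prints "context canceled"
}
```

A real recorder would typically pass the context on to its HTTP or database client rather than select on a timer.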

Variables

var (
	ErrEmptyName       = fmt.Errorf("name cannot be empty")
	ErrEmptyEndpoint   = fmt.Errorf("endpoint cannot be empty")
	ErrEmptyIndexName  = fmt.Errorf("index_name cannot be empty")
	ErrBackoffExceeded = fmt.Errorf("endpoint gone too long")
	ErrPingNotCalled   = fmt.Errorf("the caller forgot to ask me pinging")
)

ErrEmptyName is the error when the package name is empty.

ErrEmptyEndpoint is the error when the given endpoint is empty.

ErrEmptyIndexName is the error when the index_name is an empty string.

ErrBackoffExceeded is the error when the endpoint's absence has exceeded the backoff value. It is not strictly an error; rather, it points to an error that occurred in the past.

ErrPingNotCalled is the error when the caller calls Record without calling Ping first.

Functions

func WithBackoff added in v0.9.0

func WithBackoff(backoff int) func(Constructor) error

WithBackoff sets the backoff of the recorder.

func WithEndpoint added in v0.9.0

func WithEndpoint(endpoint string) func(Constructor) error

WithEndpoint sets the endpoint of the recorder.

func WithIndexName added in v0.9.0

func WithIndexName(indexName string) func(Constructor) error

WithIndexName sets the indexName of the recorder.

func WithLogger added in v0.9.0

func WithLogger(log internal.FieldLogger) func(Constructor) error

WithLogger sets the logger of the recorder.

func WithName added in v0.9.0

func WithName(name string) func(Constructor) error

WithName sets the name of the recorder.

func WithTimeout added in v0.9.0

func WithTimeout(timeout time.Duration) func(Constructor) error

WithTimeout sets the timeout of the recorder.
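
The functions above all return a `func(Constructor) error`, which is the shape of the functional options pattern. The sketch below shows one way such options might be consumed. It does not use the real package: `constructor` mirrors only two methods of the Constructor interface, and `settings`, `withName`, `withTimeout`, `build`, and the empty-name check are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// constructor mirrors part of the Constructor interface for illustration.
type constructor interface {
	SetName(name string)
	SetTimeout(timeout time.Duration)
}

// settings is a hypothetical type that satisfies constructor.
type settings struct {
	name    string
	timeout time.Duration
}

func (s *settings) SetName(name string)              { s.name = name }
func (s *settings) SetTimeout(timeout time.Duration) { s.timeout = timeout }

var errEmptyName = errors.New("name cannot be empty")

// withName is a stand-in for WithName; the validation is an assumption.
func withName(name string) func(constructor) error {
	return func(c constructor) error {
		if name == "" {
			return errEmptyName
		}
		c.SetName(name)
		return nil
	}
}

// withTimeout is a stand-in for WithTimeout and performs no validation.
func withTimeout(timeout time.Duration) func(constructor) error {
	return func(c constructor) error {
		c.SetTimeout(timeout)
		return nil
	}
}

// build applies each option in order and stops at the first error.
func build(opts ...func(constructor) error) (*settings, error) {
	s := &settings{}
	for _, opt := range opts {
		if err := opt(s); err != nil {
			return nil, err
		}
	}
	return s, nil
}

func main() {
	s, err := build(withName("es"), withTimeout(5*time.Second))
	fmt.Println(s.name, s.timeout, err)

	_, err = build(withName(""))
	fmt.Println(err)
}
```

Returning an error from each option lets a constructor reject a bad value at the point where it is set, instead of validating the whole object afterwards.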

Types

type Constructor added in v0.8.1

type Constructor interface {
	SetLogger(logger internal.FieldLogger)
	SetName(name string)
	SetIndexName(indexName string)
	SetEndpoint(endpoint string)
	SetTimeout(timeout time.Duration)
	SetBackoff(backoff int)
}

Constructor is an interface for setting up an object for testing.

type DataRecorder

type DataRecorder interface {
	Name() string
	Ping() error
	IndexName() string
	Timeout() time.Duration
	Record(context.Context, *Job) error
}

DataRecorder receives a payload for shipping data to a repository. The repository should have the concept of index/database and type/table abstractions. See ElasticSearch for more information.

Notes

Recorders should not change the index name coming in the payload unless they have a valid reason. The engine might add a date to this index name if the user has asked for it in the configuration file.

Ping() should ping the endpoint and return nil if it was successful. The engine will not launch the reader if the ping result is an error.

IndexName() comes from the configuration, but the engine may override it in the Job it sends.

Record() should record the Job and report any errors. When the context times out or is cancelled, the recorder should return with the context's error.
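
A minimal sketch of a type that follows these notes, assuming an in-memory store instead of a real endpoint. Nothing here comes from the package itself: `job` mirrors only two fields of Job, and `memRecorder`, `errPingNotCalled`, and `run` are hypothetical stand-ins.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// job mirrors the fields of Job that this sketch needs.
type job struct {
	IndexName string
	Payload   []byte
}

var errPingNotCalled = errors.New("the caller forgot to ask me pinging")

// memRecorder is a hypothetical recorder that keeps payloads in memory,
// keyed by index name.
type memRecorder struct {
	name    string
	index   string
	pinged  bool
	records map[string][][]byte
}

func (m *memRecorder) Name() string           { return m.name }
func (m *memRecorder) IndexName() string      { return m.index }
func (m *memRecorder) Timeout() time.Duration { return time.Second }

// Ping has no endpoint to contact here, so it only marks the recorder ready.
func (m *memRecorder) Ping() error {
	m.pinged = true
	return nil
}

// Record refuses to run before Ping, returns the context's error when the
// context is done, and files the payload under the index name in the job.
func (m *memRecorder) Record(ctx context.Context, j *job) error {
	if !m.pinged {
		return errPingNotCalled
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	m.records[j.IndexName] = append(m.records[j.IndexName], j.Payload)
	return nil
}

// run records once before Ping and once after it, and reports both results
// plus the number of payloads stored under the job's index name.
func run() (before error, after error, stored int) {
	ctx := context.Background()
	rec := &memRecorder{name: "mem", index: "expipe", records: map[string][][]byte{}}
	j := &job{IndexName: "expipe-2018.03.28", Payload: []byte(`{"key":"value"}`)}

	before = rec.Record(ctx, j)
	if err := rec.Ping(); err != nil {
		return before, err, 0
	}
	after = rec.Record(ctx, j)
	return before, after, len(rec.records[j.IndexName])
}

func main() {
	fmt.Println(run())
}
```

The payload is stored under the job's index name rather than the recorder's own, which is the behaviour the first note asks for.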

Example

This example shows when a record job is issued, the recorder hits the endpoint.

package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"

	"github.com/arsham/expipe/datatype"
	"github.com/arsham/expipe/recorder"
	"github.com/arsham/expipe/recorder/testing"
)

func main() {
	ctx := context.Background()
	receivedPayload := make(chan string)
	pinged := false
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if !pinged {
			pinged = true
			return
		}
		receivedPayload <- "I have received the payload!"
	}))
	defer ts.Close()

	rec := testing.GetRecorder(ts.URL)
	// Ping first and check the result before recording any jobs.
	if err := rec.Ping(); err != nil {
		panic(err)
	}
	fmt.Println("Pinging successful")
	payload := datatype.New([]datatype.DataType{
		datatype.StringType{Key: "key", Value: "value"},
	})
	job := &recorder.Job{
		Payload:   payload,
		IndexName: "my index",
		Time:      time.Now(),
	}

	go func() {
		err := rec.Record(ctx, job)
		if err != nil {
			panic("Wasn't expecting any errors")
		}
	}()
	fmt.Println(<-receivedPayload)
	fmt.Println("No errors reported")

	go rec.Record(ctx, job) // Issuing another job
	fmt.Println(<-receivedPayload)

}
Output:

Pinging successful
I have received the payload!
No errors reported
I have received the payload!

type ErrEndpointNotAvailable added in v0.5.0

type ErrEndpointNotAvailable struct {
	Endpoint string
	Err      error
}

ErrEndpointNotAvailable is the error when the endpoint is not available.

func (ErrEndpointNotAvailable) Error added in v0.5.0

func (e ErrEndpointNotAvailable) Error() string

type ErrInvalidEndpoint added in v0.4.0

type ErrInvalidEndpoint string

ErrInvalidEndpoint is the error when the endpoint is not a valid URL.

func (ErrInvalidEndpoint) Error added in v0.4.0

func (e ErrInvalidEndpoint) Error() string

type ErrInvalidIndexName added in v0.7.0

type ErrInvalidIndexName string

ErrInvalidIndexName is the error when the index name is invalid.

func (ErrInvalidIndexName) Error added in v0.7.0

func (e ErrInvalidIndexName) Error() string

type ErrLowBackoffValue added in v0.4.0

type ErrLowBackoffValue int64

ErrLowBackoffValue is the error when the backoff value is too low.

func (ErrLowBackoffValue) Error added in v0.4.0

func (e ErrLowBackoffValue) Error() string

type ErrLowTimeout added in v0.8.1

type ErrLowTimeout time.Duration

ErrLowTimeout is the error when the timeout is zero.

func (ErrLowTimeout) Error added in v0.8.1

func (e ErrLowTimeout) Error() string

type ErrParseTimeOut added in v0.4.0

type ErrParseTimeOut struct {
	Timeout string
	Err     error
}

ErrParseTimeOut is the error when the timeout cannot be parsed.

func (ErrParseTimeOut) Error added in v0.4.0

func (e ErrParseTimeOut) Error() string
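
Because the error types above are distinct Go types, a caller can branch on them with a type switch. The sketch below is only an illustration of that idea: `errEndpointNotAvailable` and `errInvalidEndpoint` are local mirrors of two of the types, their message formats are assumptions, and `describe` is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// errEndpointNotAvailable mirrors ErrEndpointNotAvailable.
type errEndpointNotAvailable struct {
	Endpoint string
	Err      error
}

// Error formats the message; the wording is an assumption.
func (e errEndpointNotAvailable) Error() string {
	return fmt.Sprintf("endpoint (%s) not available: %v", e.Endpoint, e.Err)
}

// errInvalidEndpoint mirrors ErrInvalidEndpoint.
type errInvalidEndpoint string

// Error formats the message; the wording is an assumption.
func (e errInvalidEndpoint) Error() string {
	return "invalid endpoint: " + string(e)
}

// describe is a hypothetical helper that picks a reaction per error type.
func describe(err error) string {
	switch e := err.(type) {
	case errEndpointNotAvailable:
		return "retry later: " + e.Endpoint
	case errInvalidEndpoint:
		return "fix configuration: " + string(e)
	default:
		return "unexpected: " + err.Error()
	}
}

func main() {
	fmt.Println(describe(errEndpointNotAvailable{Endpoint: "http://localhost:9200", Err: errors.New("connection refused")}))
	fmt.Println(describe(errInvalidEndpoint("localhost:9200")))
	fmt.Println(describe(errors.New("boom")))
}
```

A type switch only matches the error value itself, so it would miss one of these errors if a caller wrapped it in another error first.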

type Job added in v0.7.0

type Job struct {
	// ID is the job ID generated at the time the payload was generated.
	ID token.ID

	// Payload has a Bytes() method for returning the data.
	// It is guaranteed to be json marshallable.
	Payload datatype.DataContainer

	// Time is the recorded time at the time of fetching data by the readers.
	// You should use this value to fetch the content of the payload
	Time time.Time

	// IndexName might be different from the one set in the recorder.
	// The engine might decide to change it, and you have to use the provided one.
	IndexName string

	// TypeName comes from the configuration of readers.
	TypeName string
}

Job is sent with a context and a payload to be recorded. If the TypeName and IndexName differ from the previous ones, the recorder should use the ones the engine provides. If any errors occur, recorders should return them from Record.

Directories

Package elasticsearch contains logic to record data to an elasticsearch index.
Package testing is a test suite for recorders.
