recorder

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2016 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DataRecorder

// DataRecorder is the contract a recorder implements to receive RecordJob
// values and ship them to a repository.
type DataRecorder interface {
	// InterTimer is embedded, so every DataRecorder also exposes Timeout and Interval.
	InterTimer

	// PayloadChan returns the channel on which RecordJob values are sent to the recorder.
	// The recorder should not block when a RecordJob is sent to this channel.
	PayloadChan() chan *RecordJob

	// Start begins the recorder's work. The recorder's loop should run inside a
	// goroutine, and Start should return a done channel.
	// The done channel should be closed once its work is finished and it wants to quit.
	// When the context is timed out or canceled, the recorder should return.
	Start(ctx context.Context) <-chan struct{}

	// Name should return the representation string for this recorder. Choose a very simple name.
	Name() string

	// IndexName comes from the configuration, but the engine takes over.
	// Recorders should not override the engine's decision unless they have a
	// valid reason.
	IndexName() string
}

DataRecorder is an interface for shipping data to a repository. The repository should have the concepts of index/database and type/table abstractions. See Elasticsearch for more information. A recorder should send nil to the Err channel of the RecordJob object if no error occurs.

type InterTimer added in v0.1.1

// InterTimer exposes the timing settings that the Engine reads from a component.
type InterTimer interface {
	// Timeout returns the component's timeout as a time.Duration.
	Timeout() time.Duration
	// Interval returns the component's interval as a time.Duration.
	Interval() time.Duration
}

InterTimer is required by the Engine so it can read the intervals and timeouts.

type MockConfig added in v0.2.1

// MockConfig is a configuration value with exported fields, from which a
// DataRecorder can be created through its NewInstance method.
//
// NOTE(review): each field carries a trailing underscore, presumably to avoid
// clashing with the accessor method of the same name (Name, Endpoint, Timeout,
// ...); the method bodies are not visible here, so confirm the mapping.
type MockConfig struct {
	Name_      string             // presumably returned by Name() — TODO confirm
	Endpoint_  string             // presumably returned by Endpoint() — TODO confirm
	Timeout_   time.Duration      // presumably returned by Timeout() — TODO confirm
	Interval_  time.Duration      // presumably returned by Interval() — TODO confirm
	Backoff_   int                // presumably returned by Backoff(); meaning of the count is not shown here
	IndexName_ string             // presumably returned by IndexName() — TODO confirm
	Logger_    logrus.FieldLogger // presumably returned by Logger() — TODO confirm
}

MockConfig holds the necessary configuration for setting up an Elasticsearch recorder endpoint.

func NewMockConfig added in v0.2.1

func NewMockConfig(name string, log logrus.FieldLogger, endpoint string, interval, timeout time.Duration, backoff int, indexName string) (*MockConfig, error)

func (*MockConfig) Backoff added in v0.2.1

func (m *MockConfig) Backoff() int

func (*MockConfig) Endpoint added in v0.2.1

func (m *MockConfig) Endpoint() string

func (*MockConfig) IndexName added in v0.2.1

func (m *MockConfig) IndexName() string

func (*MockConfig) Interval added in v0.2.1

func (m *MockConfig) Interval() time.Duration

func (*MockConfig) Logger added in v0.2.1

func (m *MockConfig) Logger() logrus.FieldLogger

func (*MockConfig) Name added in v0.2.1

func (m *MockConfig) Name() string

func (*MockConfig) NewInstance added in v0.2.1

func (m *MockConfig) NewInstance(ctx context.Context, payloadChan chan *RecordJob) (DataRecorder, error)

func (*MockConfig) RoutePath added in v0.2.1

func (m *MockConfig) RoutePath() string

func (*MockConfig) Timeout added in v0.2.1

func (m *MockConfig) Timeout() time.Duration

type RecordJob

// RecordJob is the unit of work handed to a recorder: a payload to be
// recorded, together with its context and destination names.
type RecordJob struct {
	// Ctx is the context the job is sent with.
	Ctx context.Context
	// Payload is the data to be recorded.
	Payload datatype.DataContainer
	// IndexName is the index/database name provided by the engine for this job.
	IndexName string
	// TypeName is the type/table name provided by the engine for this job.
	TypeName string
	Time     time.Time // Is used for timeseries data
	// Err is a send-only channel for reporting the outcome; the recorder
	// should send nil on it if no error occurs.
	Err chan<- error
}

RecordJob is sent with a context and a payload to be recorded. If the TypeName and IndexName differ from the previous ones, the recorder should use the ones the engine provides.

type SimpleRecorder added in v0.1.1

// SimpleRecorder is a recorder type whose exported fields are two RWMutexes
// and three function hooks. Its unexported fields are not shown on this page.
//
// NOTE(review): the hooks presumably let callers substitute the behaviour of
// the PayloadChan, Error and Start methods — confirm against the method bodies.
type SimpleRecorder struct {
	// Pmu is presumably the lock associated with PayloadChanFunc — TODO confirm.
	Pmu             sync.RWMutex
	PayloadChanFunc func() chan *RecordJob
	ErrorFunc       func() error
	// Smu is presumably the lock associated with StartFunc — TODO confirm.
	Smu             sync.RWMutex
	// StartFunc returns a bidirectional chan struct{}, whereas the Start method
	// returns a receive-only <-chan struct{}.
	StartFunc       func() chan struct{}
	// contains filtered or unexported fields
}
Example
// This example starts a SimpleRecorder against a local test server, sends the
// same job twice, reads each result from the job's error channel, and then
// stops the recorder by cancelling the context.
log := lib.DiscardLogger()
ctx, cancel := context.WithCancel(context.Background())
// The test server stands in for the recording endpoint; its handler only prints a message.
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	fmt.Println("I have received the payload!")
}))
defer ts.Close()

payloadChan := make(chan *RecordJob)
// Arguments after payloadChan are: name, endpoint, index name, interval, timeout.
// NOTE(review): the constructor's error is discarded here; real code should check it.
rec, _ := NewSimpleRecorder(ctx, log, payloadChan, "reader_example", ts.URL, "intexName", 10*time.Millisecond, 10*time.Millisecond)
done := rec.Start(ctx)

// errChan is unbuffered; each job's result is received from it explicitly below.
errChan := make(chan error)
job := &RecordJob{
	Ctx:       ctx,
	Payload:   nil,
	IndexName: "my index",
	Time:      time.Now(),
	Err:       errChan,
}
// Issuing a job
rec.PayloadChan() <- job

// Now waiting for the result of the first job
res := <-errChan
fmt.Println("Error:", res)
// Issuing another job (the same RecordJob value is reused)
rec.PayloadChan() <- job
// Make sure you drain the errors
<-errChan

// The recorder should finish gracefully: cancel the context, then wait for
// the done channel returned by Start.
cancel()
<-done
fmt.Println("Readed has finished")

// Nothing more is sent after this point; closing the payload channel is left
// disabled below.
fmt.Println("Finished sending!")
// close(rec.PayloadChan())
Output:

Example (Start1)
// This example shows the lifecycle of Start: the done channel stays open while
// the context is live, and the example waits on it after cancelling the context.
log := lib.DiscardLogger()
ctx, cancel := context.WithCancel(context.Background())

// The test server replies to every request with a fixed JSON body.
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { io.WriteString(w, `{"the key": "is the value!"}`) }))
defer ts.Close()

payloadChan := make(chan *RecordJob)
// Arguments after payloadChan are: name, endpoint, index name, interval, timeout.
// NOTE(review): the constructor's error is discarded here; real code should check it.
rec, _ := NewSimpleRecorder(ctx, log, payloadChan, "reader_example", ts.URL, "intexName", 10*time.Millisecond, 10*time.Millisecond)
done := rec.Start(ctx)

fmt.Println("Recorder has started its event loop!")

// Non-blocking check: the default branch runs when done is not yet readable,
// i.e. the recorder has not closed its done channel.
select {
case <-done:
	panic("Recorder shouldn't have closed its done channel")
default:
	fmt.Println("Recorder is working!")
}

// Cancel the context and block until the recorder signals on done.
cancel()
<-done
fmt.Println("Recorder has stopped its event loop!")
Output:

Recorder has started its event loop!
Recorder is working!
Recorder has stopped its event loop!

func NewSimpleRecorder added in v0.1.1

func NewSimpleRecorder(ctx context.Context, logger logrus.FieldLogger, payloadChan chan *RecordJob, name, endpoint, indexName string, interval, timeout time.Duration) (*SimpleRecorder, error)

func (*SimpleRecorder) Error added in v0.1.1

func (s *SimpleRecorder) Error() error

func (*SimpleRecorder) IndexName added in v0.1.1

func (s *SimpleRecorder) IndexName() string

func (*SimpleRecorder) Interval added in v0.1.1

func (s *SimpleRecorder) Interval() time.Duration

func (*SimpleRecorder) Name added in v0.1.1

func (s *SimpleRecorder) Name() string

func (*SimpleRecorder) PayloadChan added in v0.1.1

func (s *SimpleRecorder) PayloadChan() chan *RecordJob

func (*SimpleRecorder) Start added in v0.1.1

func (s *SimpleRecorder) Start(ctx context.Context) <-chan struct{}

func (*SimpleRecorder) Timeout added in v0.1.1

func (s *SimpleRecorder) Timeout() time.Duration

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL