Documentation
¶
Overview ¶
Package recorder contains logic to record data into a database. Any objects that implements the DataRecorder interface can be used in this system.
Recorders should ping their endpoint upon creation to make sure they can access. Otherwise they should return an error indicating they cannot start.
When the context is canceled, the recorder should finish its job and return. The Time is used by the Engine for changing the index name. It is useful for cleaning up the old data.
Index ¶
- type DataRecorder
- type MockConfig
- func (m *MockConfig) Backoff() int
- func (m *MockConfig) Endpoint() string
- func (m *MockConfig) IndexName() string
- func (m *MockConfig) Logger() logrus.FieldLogger
- func (m *MockConfig) Name() string
- func (m *MockConfig) NewInstance(ctx context.Context, payloadChan chan *RecordJob) (DataRecorder, error)
- func (m *MockConfig) RoutePath() string
- func (m *MockConfig) Timeout() time.Duration
- type RecordJob
- type SimpleRecorder
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DataRecorder ¶
type DataRecorder interface { // Timeout is required by the Engine so it can read the timeouts. Timeout() time.Duration // The Engine provides this channel and sends the payload through this channel. // Recorder should not block when RecordJob is sent to this channel. PayloadChan() chan *RecordJob // The recorder's loop should be inside a goroutine, and return a done channel. // The done channel should be closed one it's work is finished and wants to quit. // When the context is timedout or canceled, the recorder should return. Start(ctx context.Context) <-chan struct{} // Name should return the representation string for this recorder. Choose a very simple and unique name. Name() string // IndexName comes from the configuration, but the engine takes over. // Recorders should not intercept the engine for its decision, unless they have a // valid reason. IndexName() string }
DataRecorder in an interface for shipping data to a repository. The repository should have the concept of index/database and type/table abstractions. See ElasticSearch for more information. Recorder should send nil to Err channel of the RecordJob object if no error occurs.
type MockConfig ¶ added in v0.2.1
type MockConfig struct { Name_ string Endpoint_ string Timeout_ time.Duration Backoff_ int IndexName_ string Logger_ logrus.FieldLogger }
MockConfig holds the necessary configuration for setting up an elasticsearch reader endpoint.
func NewMockConfig ¶ added in v0.2.1
func NewMockConfig(name string, log logrus.FieldLogger, endpoint string, timeout time.Duration, backoff int, indexName string) (*MockConfig, error)
func (*MockConfig) Backoff ¶ added in v0.2.1
func (m *MockConfig) Backoff() int
func (*MockConfig) Endpoint ¶ added in v0.2.1
func (m *MockConfig) Endpoint() string
func (*MockConfig) IndexName ¶ added in v0.2.1
func (m *MockConfig) IndexName() string
func (*MockConfig) Logger ¶ added in v0.2.1
func (m *MockConfig) Logger() logrus.FieldLogger
func (*MockConfig) Name ¶ added in v0.2.1
func (m *MockConfig) Name() string
func (*MockConfig) NewInstance ¶ added in v0.2.1
func (m *MockConfig) NewInstance(ctx context.Context, payloadChan chan *RecordJob) (DataRecorder, error)
func (*MockConfig) RoutePath ¶ added in v0.2.1
func (m *MockConfig) RoutePath() string
func (*MockConfig) Timeout ¶ added in v0.2.1
func (m *MockConfig) Timeout() time.Duration
type RecordJob ¶
type RecordJob struct { Ctx context.Context Payload datatype.DataContainer IndexName string TypeName string Time time.Time // Is used for timeseries data Err chan<- error }
RecordJob is sent with a context and a payload to be recorded. If the TypeName and IndexName are different than the previous one, the recorder should use the ones engine provides.
type SimpleRecorder ¶ added in v0.1.1
type SimpleRecorder struct { Pmu sync.RWMutex PayloadChanFunc func() chan *RecordJob ErrorFunc func() error Smu sync.RWMutex StartFunc func() chan struct{} // contains filtered or unexported fields }
Example ¶
log := lib.DiscardLogger() ctx, cancel := context.WithCancel(context.Background()) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Println("I have received the payload!") })) defer ts.Close() payloadChan := make(chan *RecordJob) rec, _ := NewSimpleRecorder(ctx, log, payloadChan, "reader_example", ts.URL, "intexName", 10*time.Millisecond) done := rec.Start(ctx) errChan := make(chan error) job := &RecordJob{ Ctx: ctx, Payload: nil, IndexName: "my index", Time: time.Now(), Err: errChan, } // Issueing a job rec.PayloadChan() <- job // Now waiting for the results res := <-errChan fmt.Println("Error:", res) // Issueing another job rec.PayloadChan() <- job // Make sure you drain the errors <-errChan // The recorder should finish gracefully cancel() <-done fmt.Println("Readed has finished") // We need to cancel the job now fmt.Println("Finished sending!") // close(rec.PayloadChan())
Output:
Example (Start) ¶
log := lib.DiscardLogger() ctx, cancel := context.WithCancel(context.Background()) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { io.WriteString(w, `{"the key": "is the value!"}`) })) defer ts.Close() payloadChan := make(chan *RecordJob) rec, _ := NewSimpleRecorder(ctx, log, payloadChan, "reader_example", ts.URL, "intexName", 10*time.Millisecond) done := rec.Start(ctx) fmt.Println("Recorder has started its event loop!") select { case <-done: panic("Recorder shouldn't have closed its done channel") default: fmt.Println("Recorder is working!") } cancel() <-done fmt.Println("Recorder has stopped its event loop!")
Output: Recorder has started its event loop! Recorder is working! Recorder has stopped its event loop!
func NewSimpleRecorder ¶ added in v0.1.1
func NewSimpleRecorder(ctx context.Context, logger logrus.FieldLogger, payloadChan chan *RecordJob, name, endpoint, indexName string, timeout time.Duration) (*SimpleRecorder, error)
func (*SimpleRecorder) Error ¶ added in v0.1.1
func (s *SimpleRecorder) Error() error
func (*SimpleRecorder) IndexName ¶ added in v0.1.1
func (s *SimpleRecorder) IndexName() string
func (*SimpleRecorder) Name ¶ added in v0.1.1
func (s *SimpleRecorder) Name() string
func (*SimpleRecorder) PayloadChan ¶ added in v0.1.1
func (s *SimpleRecorder) PayloadChan() chan *RecordJob
func (*SimpleRecorder) Start ¶ added in v0.1.1
func (s *SimpleRecorder) Start(ctx context.Context) <-chan struct{}
func (*SimpleRecorder) Timeout ¶ added in v0.1.1
func (s *SimpleRecorder) Timeout() time.Duration
Directories
¶
Path | Synopsis |
---|---|
Package elasticsearch contains logic to record data to an elasticsearch index.
|
Package elasticsearch contains logic to record data to an elasticsearch index. |