Documentation
¶
Index ¶
- type DataRecorder
- type InterTimer
- type MockConfig
- func (m *MockConfig) Backoff() int
- func (m *MockConfig) Endpoint() string
- func (m *MockConfig) IndexName() string
- func (m *MockConfig) Interval() time.Duration
- func (m *MockConfig) Logger() logrus.FieldLogger
- func (m *MockConfig) Name() string
- func (m *MockConfig) NewInstance(ctx context.Context, payloadChan chan *RecordJob) (DataRecorder, error)
- func (m *MockConfig) RoutePath() string
- func (m *MockConfig) Timeout() time.Duration
- type RecordJob
- type SimpleRecorder
- func (s *SimpleRecorder) Error() error
- func (s *SimpleRecorder) IndexName() string
- func (s *SimpleRecorder) Interval() time.Duration
- func (s *SimpleRecorder) Name() string
- func (s *SimpleRecorder) PayloadChan() chan *RecordJob
- func (s *SimpleRecorder) Start(ctx context.Context) <-chan struct{}
- func (s *SimpleRecorder) Timeout() time.Duration
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DataRecorder ¶
type DataRecorder interface { InterTimer // Recorder should not block when RecordJob is sent to this channel. PayloadChan() chan *RecordJob // The recorder's loop should be inside a goroutine, and return a done channel. // The done channel should be closed one it's work is finished and wants to quit. // When the context is timedout or canceled, the recorder should return. Start(ctx context.Context) <-chan struct{} // Name should return the representation string for this recorder. Choose a very simple name. Name() string // IndexName comes from the configuration, but the engine takes over. // Recorders should not intercept the engine for its decision, unless they have a // valid reason. IndexName() string }
DataRecorder in an interface for shipping data to a repository. The repository should have the concept of index/database and type/table abstractions. See ElasticSearch for more information. Recorder should send nil to Err channel of the RecordJob object if no error occurs.
type InterTimer ¶ added in v0.1.1
InterTimer is required by the Engine so it can read the intervals and timeouts.
type MockConfig ¶ added in v0.2.1
type MockConfig struct { Name_ string Endpoint_ string Timeout_ time.Duration Interval_ time.Duration Backoff_ int IndexName_ string Logger_ logrus.FieldLogger }
MockConfig holds the necessary configuration for setting up an elasticsearch reader endpoint.
func NewMockConfig ¶ added in v0.2.1
func NewMockConfig(name string, log logrus.FieldLogger, endpoint string, interval, timeout time.Duration, backoff int, indexName string) (*MockConfig, error)
func (*MockConfig) Backoff ¶ added in v0.2.1
func (m *MockConfig) Backoff() int
func (*MockConfig) Endpoint ¶ added in v0.2.1
func (m *MockConfig) Endpoint() string
func (*MockConfig) IndexName ¶ added in v0.2.1
func (m *MockConfig) IndexName() string
func (*MockConfig) Interval ¶ added in v0.2.1
func (m *MockConfig) Interval() time.Duration
func (*MockConfig) Logger ¶ added in v0.2.1
func (m *MockConfig) Logger() logrus.FieldLogger
func (*MockConfig) Name ¶ added in v0.2.1
func (m *MockConfig) Name() string
func (*MockConfig) NewInstance ¶ added in v0.2.1
func (m *MockConfig) NewInstance(ctx context.Context, payloadChan chan *RecordJob) (DataRecorder, error)
func (*MockConfig) RoutePath ¶ added in v0.2.1
func (m *MockConfig) RoutePath() string
func (*MockConfig) Timeout ¶ added in v0.2.1
func (m *MockConfig) Timeout() time.Duration
type RecordJob ¶
type RecordJob struct { Ctx context.Context Payload datatype.DataContainer IndexName string TypeName string Time time.Time // Is used for timeseries data Err chan<- error }
RecordJob is sent with a context and a payload to be recorded. If the TypeName and IndexName are different than the previous one, the recorder should use the ones engine provides
type SimpleRecorder ¶ added in v0.1.1
type SimpleRecorder struct { Pmu sync.RWMutex PayloadChanFunc func() chan *RecordJob ErrorFunc func() error Smu sync.RWMutex StartFunc func() chan struct{} // contains filtered or unexported fields }
Example ¶
log := lib.DiscardLogger() ctx, cancel := context.WithCancel(context.Background()) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Println("I have received the payload!") })) defer ts.Close() payloadChan := make(chan *RecordJob) rec, _ := NewSimpleRecorder(ctx, log, payloadChan, "reader_example", ts.URL, "intexName", 10*time.Millisecond, 10*time.Millisecond) done := rec.Start(ctx) errChan := make(chan error) job := &RecordJob{ Ctx: ctx, Payload: nil, IndexName: "my index", Time: time.Now(), Err: errChan, } // Issueing a job rec.PayloadChan() <- job // Now waiting for the results res := <-errChan fmt.Println("Error:", res) // Issueing another job rec.PayloadChan() <- job // Make sure you drain the errors <-errChan // The recorder should finish gracefully cancel() <-done fmt.Println("Readed has finished") // We need to cancel the job now fmt.Println("Finished sending!") // close(rec.PayloadChan())
Output:
Example (Start1) ¶
log := lib.DiscardLogger() ctx, cancel := context.WithCancel(context.Background()) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { io.WriteString(w, `{"the key": "is the value!"}`) })) defer ts.Close() payloadChan := make(chan *RecordJob) rec, _ := NewSimpleRecorder(ctx, log, payloadChan, "reader_example", ts.URL, "intexName", 10*time.Millisecond, 10*time.Millisecond) done := rec.Start(ctx) fmt.Println("Recorder has started its event loop!") select { case <-done: panic("Recorder shouldn't have closed its done channel") default: fmt.Println("Recorder is working!") } cancel() <-done fmt.Println("Recorder has stopped its event loop!")
Output: Recorder has started its event loop! Recorder is working! Recorder has stopped its event loop!
func NewSimpleRecorder ¶ added in v0.1.1
func NewSimpleRecorder(ctx context.Context, logger logrus.FieldLogger, payloadChan chan *RecordJob, name, endpoint, indexName string, interval, timeout time.Duration) (*SimpleRecorder, error)
func (*SimpleRecorder) Error ¶ added in v0.1.1
func (s *SimpleRecorder) Error() error
func (*SimpleRecorder) IndexName ¶ added in v0.1.1
func (s *SimpleRecorder) IndexName() string
func (*SimpleRecorder) Interval ¶ added in v0.1.1
func (s *SimpleRecorder) Interval() time.Duration
func (*SimpleRecorder) Name ¶ added in v0.1.1
func (s *SimpleRecorder) Name() string
func (*SimpleRecorder) PayloadChan ¶ added in v0.1.1
func (s *SimpleRecorder) PayloadChan() chan *RecordJob
func (*SimpleRecorder) Start ¶ added in v0.1.1
func (s *SimpleRecorder) Start(ctx context.Context) <-chan struct{}
func (*SimpleRecorder) Timeout ¶ added in v0.1.1
func (s *SimpleRecorder) Timeout() time.Duration
Click to show internal directories.
Click to hide internal directories.