Documentation ¶
Overview ¶
Package reader contains logic for reading from a provider. Any object that implements the DataReader interface can be used in this system. The job should provide an io.ReadCloser and should produce a JSON object, otherwise the data will be rejected.
The data stream SHOULD NOT be closed. The engine WILL close it after reading its contents.
Readers should ping their endpoint upon creation to make sure they can read from it. Otherwise they should return an error indicating they cannot start.
When the context is canceled, the reader should finish its job and return. The Time should be set when the data is read from the endpoint, otherwise it will lose its meaning. The engine will issue jobs based on the Interval, which is set in the configuration file.
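The creation and time-stamping rules above can be sketched as follows. This is only an illustration under stated assumptions: demoReader, result, newDemoReader and the ping and read callbacks are hypothetical names, not part of this package, and the endpoint is simulated with plain functions instead of a real HTTP call.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// result is a hypothetical record; Time is stamped when the data is read.
type result struct {
	Time time.Time
	Body string
}

// demoReader is an illustrative reader, not the package's SimpleReader.
type demoReader struct {
	read func(ctx context.Context) (string, error)
}

// newDemoReader pings the endpoint once and refuses to start on failure.
// Both callbacks are assumptions standing in for real endpoint calls.
func newDemoReader(ctx context.Context, ping func(context.Context) error,
	read func(context.Context) (string, error)) (*demoReader, error) {
	if err := ping(ctx); err != nil {
		return nil, fmt.Errorf("endpoint not available: %w", err)
	}
	return &demoReader{read: read}, nil
}

// fetch reads from the endpoint and stamps the time right after the read.
func (r *demoReader) fetch(ctx context.Context) (*result, error) {
	body, err := r.read(ctx)
	if err != nil {
		return nil, err
	}
	return &result{Time: time.Now(), Body: body}, nil
}

var errDown = errors.New("connection refused")

func readOK(context.Context) (string, error) { return `{"ok": true}`, nil }

// startUnreachable tries to create a reader whose ping fails.
func startUnreachable() error {
	down := func(context.Context) error { return errDown }
	_, err := newDemoReader(context.Background(), down, readOK)
	return err
}

// readOnce creates a reader whose ping succeeds and reads one record.
func readOnce() (*result, error) {
	ctx := context.Background()
	up := func(context.Context) error { return nil }
	r, err := newDemoReader(ctx, up, readOK)
	if err != nil {
		return nil, err
	}
	return r.fetch(ctx)
}

func main() {
	fmt.Println("unreachable endpoint:", startUnreachable())
	res, err := readOnce()
	if err != nil {
		panic(err)
	}
	fmt.Println("body:", res.Body, "time set:", !res.Time.IsZero())
}
```

Failing in the constructor keeps a misconfigured reader from ever entering the engine's job loop.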
Index ¶
- type ContextReader
- type CtxReader
- type DataReader
- type InterTimer
- type MockConfig
- func (c *MockConfig) Backoff() int
- func (c *MockConfig) Endpoint() string
- func (c *MockConfig) Interval() time.Duration
- func (c *MockConfig) Logger() logrus.FieldLogger
- func (c *MockConfig) Name() string
- func (c *MockConfig) NewInstance(ctx context.Context, jobChan chan context.Context, ...) (DataReader, error)
- func (c *MockConfig) RoutePath() string
- func (c *MockConfig) Timeout() time.Duration
- func (c *MockConfig) TypeName() string
- type MockCtxReader
- type ReadJob
- type ReadJobResult
- type SimpleReader
- func (m *SimpleReader) Interval() time.Duration
- func (m *SimpleReader) JobChan() chan context.Context
- func (m *SimpleReader) Name() string
- func (m *SimpleReader) ResultChan() chan *ReadJobResult
- func (m *SimpleReader) Start(ctx context.Context) <-chan struct{}
- func (m *SimpleReader) Timeout() time.Duration
- func (m *SimpleReader) TypeName() string
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ContextReader ¶
type ContextReader interface {
	// Get reads from the url and returns DefaultClient errors.
	// This operation's deadline and cancellation depend on ctx.
	// You should close the Body when you have finished reading.
	Get(ctx context.Context) (*http.Response, error)
}
ContextReader reads from the url with the specified context.
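A context-bound GET of this shape could be written with the standard library as below. This is a sketch, not the package's CtxReader: urlReader is a hypothetical type and the URL is a placeholder. The demo cancels the context before the call, so the request is abandoned before any connection is attempted.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
)

// urlReader is a hypothetical ContextReader implementation for illustration.
type urlReader struct {
	url string
}

// Get issues a GET request bound to ctx. The caller closes the Body.
func (u *urlReader) Get(ctx context.Context) (*http.Response, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.url, nil)
	if err != nil {
		return nil, err
	}
	return http.DefaultClient.Do(req)
}

// canceledGet calls Get with an already canceled context and reports whether
// the returned error wraps context.Canceled.
func canceledGet() bool {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	r := &urlReader{url: "http://127.0.0.1:1/"} // placeholder address
	resp, err := r.Get(ctx)
	if err == nil {
		resp.Body.Close()
		return false
	}
	return errors.Is(err, context.Canceled)
}

func main() {
	fmt.Println("request stopped by cancellation:", canceledGet())
}
```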
type CtxReader ¶
type CtxReader struct {
// contains filtered or unexported fields
}
CtxReader implements ContextReader interface.
type DataReader ¶ added in v0.1.2
type DataReader interface {
	InterTimer

	// The engine will send a signal to this channel to inform the reader when it is time to read
	// from the target.
	// The engine never blocks if the reader is not able to process the requests.
	// This channel will be provided by the Engine.
	// The context might be canceled depending on how the user sets the timeouts.
	JobChan() chan context.Context

	// The engine runs the reading job in another goroutine. The engine will provide this channel, however
	// the reader should not send a lot of data back to the engine, otherwise it might cause a crash
	// in the readers and the application itself.
	ResultChan() chan *ReadJobResult

	// The reader's loop should be inside a goroutine, and return a done channel.
	// This channel should be closed once its work is finished and the reader wants to quit.
	// When the context is timed out or canceled, the reader should return.
	Start(ctx context.Context) <-chan struct{}

	// TypeName is usually the application name and is set by the user in the configuration file.
	// Recorders should not intercept the engine for its decision, unless they have a
	// valid reason.
	TypeName() string

	// Name should return the representation string for this reader. Choose a very simple and unique name.
	Name() string
}
DataReader receives job requests to read from the target, and sends its success through the ResultChan channel.
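The Start contract described in the interface comments (loop in a goroutine, done channel closed on exit, return on cancellation) might look roughly like this. Everything here is a local stand-in: loopReader and jobResult are hypothetical, and the fields of jobResult are assumptions because the ReadJobResult fields are not listed on this page.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// jobResult is a local stand-in for ReadJobResult; its fields are assumed.
type jobResult struct {
	Time time.Time
	Body string
	Err  error
}

// loopReader is an illustrative reader, not the package's SimpleReader.
type loopReader struct {
	jobChan    chan context.Context
	resultChan chan *jobResult
	fetch      func(ctx context.Context) (string, error)
}

// Start runs the event loop in a goroutine and returns a done channel that is
// closed once the loop has exited.
func (r *loopReader) Start(ctx context.Context) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			select {
			case job := <-r.jobChan:
				body, err := r.fetch(job)
				res := &jobResult{Time: time.Now(), Body: body, Err: err}
				select {
				case r.resultChan <- res:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()
	return done
}

// runOnce starts a reader, issues one job, and shuts the reader down.
func runOnce() (string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	r := &loopReader{
		jobChan:    make(chan context.Context),
		resultChan: make(chan *jobResult),
		fetch: func(context.Context) (string, error) {
			return `{"the key": "is the value!"}`, nil
		},
	}
	done := r.Start(ctx)
	r.jobChan <- ctx
	res := <-r.resultChan
	cancel()
	<-done
	return res.Body, res.Err
}

func main() {
	body, err := runOnce()
	fmt.Println("Error:", err)
	fmt.Println("Result is:", body)
}
```

Guarding the result send with ctx.Done() keeps the loop from blocking forever if the engine stops listening during shutdown.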
type InterTimer ¶ added in v0.1.1
type InterTimer interface {
	Timeout() time.Duration  // is used for timing out reading from the endpoint.
	Interval() time.Duration // the engine will issue jobs based on this interval.
}
InterTimer is required by the Engine so it can read the intervals and timeouts.
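One way an engine could consume these two values is sketched below: a ticker paced by Interval, and a per-job context bounded by Timeout. This is an assumption about the engine's behaviour, not its actual code; fixedTimer and issueJobs are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// interTimer mirrors the InterTimer interface shown above.
type interTimer interface {
	Timeout() time.Duration
	Interval() time.Duration
}

// fixedTimer is a hypothetical implementation with fixed values.
type fixedTimer struct {
	timeout, interval time.Duration
}

func (f fixedTimer) Timeout() time.Duration  { return f.timeout }
func (f fixedTimer) Interval() time.Duration { return f.interval }

// issueJobs waits one Interval per job and counts the job contexts that carry
// a deadline derived from Timeout. It stops after n jobs or when ctx is done.
func issueJobs(ctx context.Context, it interTimer, n int) int {
	ticker := time.NewTicker(it.Interval())
	defer ticker.Stop()

	withDeadline := 0
	for i := 0; i < n; i++ {
		select {
		case <-ticker.C:
			job, cancel := context.WithTimeout(ctx, it.Timeout())
			if _, ok := job.Deadline(); ok {
				withDeadline++
			}
			cancel() // a real engine would hand job to the reader first
		case <-ctx.Done():
			return withDeadline
		}
	}
	return withDeadline
}

// demoIssue issues three jobs with short, arbitrary durations.
func demoIssue() int {
	it := fixedTimer{timeout: 50 * time.Millisecond, interval: 5 * time.Millisecond}
	return issueJobs(context.Background(), it, 3)
}

func main() {
	fmt.Println("jobs issued with a deadline:", demoIssue())
}
```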
type MockConfig ¶ added in v0.2.1
type MockConfig struct {
	Name_      string
	TypeName_  string
	Endpoint_  string
	RoutePath_ string
	Timeout_   time.Duration
	Interval_  time.Duration
	Backoff_   int
	Logger_    logrus.FieldLogger
}
func NewMockConfig ¶ added in v0.2.1
func NewMockConfig(name, typeName string, log logrus.FieldLogger, endpoint, routepath string, interval, timeout time.Duration, backoff int) (*MockConfig, error)
func (*MockConfig) Backoff ¶ added in v0.2.1
func (c *MockConfig) Backoff() int
func (*MockConfig) Endpoint ¶ added in v0.2.1
func (c *MockConfig) Endpoint() string
func (*MockConfig) Interval ¶ added in v0.2.1
func (c *MockConfig) Interval() time.Duration
func (*MockConfig) Logger ¶ added in v0.2.1
func (c *MockConfig) Logger() logrus.FieldLogger
func (*MockConfig) Name ¶ added in v0.2.1
func (c *MockConfig) Name() string
func (*MockConfig) NewInstance ¶ added in v0.2.1
func (c *MockConfig) NewInstance(ctx context.Context, jobChan chan context.Context, resultChan chan *ReadJobResult) (DataReader, error)
func (*MockConfig) RoutePath ¶ added in v0.2.1
func (c *MockConfig) RoutePath() string
func (*MockConfig) Timeout ¶ added in v0.2.1
func (c *MockConfig) Timeout() time.Duration
func (*MockConfig) TypeName ¶ added in v0.2.1
func (c *MockConfig) TypeName() string
type MockCtxReader ¶
type MockCtxReader struct {
	ContextReadFunc func(ctx context.Context) (*http.Response, error)
	// contains filtered or unexported fields
}
func NewMockCtxReader ¶
func NewMockCtxReader(url string) *MockCtxReader
type ReadJob ¶
type ReadJob struct {
	Ctx context.Context

	// The reader should always send an error message back, even if there are no errors, otherwise it will
	// cause goroutine leakage. In case there was no error, just send nil.
	Err chan error
}
ReadJob is sent with a context and a channel to read the errors back.
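The rule in the Err comment (always reply, with nil on success) can be illustrated with a small sketch. readJob below is a local copy of the struct, and handle, issue, okWork and failWork are hypothetical helpers written for this example only.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// readJob mirrors the ReadJob struct shown above.
type readJob struct {
	Ctx context.Context
	Err chan error
}

// handle runs work and always replies on job.Err, sending nil on success,
// so the goroutine waiting for the reply is never left blocked.
func handle(job readJob, work func(context.Context) error) {
	job.Err <- work(job.Ctx)
}

// issue sends one job to a worker goroutine and waits for its reply.
func issue(work func(context.Context) error) error {
	job := readJob{Ctx: context.Background(), Err: make(chan error)}
	go handle(job, work)
	return <-job.Err
}

var errRead = errors.New("read failed")

func okWork(context.Context) error   { return nil }
func failWork(context.Context) error { return errRead }

func main() {
	fmt.Println("success reply:", issue(okWork))
	fmt.Println("failure reply:", issue(failWork))
}
```

If handle skipped the send on success, issue would block on the receive forever, which is the goroutine leak the comment warns about.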
type ReadJobResult ¶
ReadJobResult is constructed every time a new record is fetched. The time is set after the request was successfully read.
type SimpleReader ¶ added in v0.1.1
type SimpleReader struct {
	StartFunc func() chan struct{}
	// contains filtered or unexported fields
}
SimpleReader is useful for testing purposes.
Example ¶
log := lib.DiscardLogger()
ctx, cancel := context.WithCancel(context.Background())
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	io.WriteString(w, `{"the key": "is the value!"}`)
}))
defer ts.Close()

jobChan := make(chan context.Context, 10)
resultChan := make(chan *ReadJobResult, 10)
ctxReader := NewCtxReader(ts.URL)
red, _ := NewSimpleReader(log, ctxReader, jobChan, resultChan, "reader_example", "reader_example", 10*time.Millisecond, 10*time.Millisecond)
done := red.Start(ctx)

job, jobCancel := context.WithCancel(ctx)

// Issuing a job
red.JobChan() <- job

// Now waiting for the results
res := <-red.ResultChan()
fmt.Println("Error:", res.Err)

// Let's read what it retrieved
buf := new(bytes.Buffer)
buf.ReadFrom(res.Res)
fmt.Println("Result is:", buf.String())

// The reader should finish gracefully
cancel()
<-done
fmt.Println("Reader has finished")

// We need to cancel the job now
jobCancel()
fmt.Println("All done!")

Output:

Error: <nil>
Result is: {"the key": "is the value!"}
Reader has finished
All done!
Example (Start) ¶
log := lib.DiscardLogger()
ctx, cancel := context.WithCancel(context.Background())
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	io.WriteString(w, `{"the key": "is the value!"}`)
}))
defer ts.Close()

jobChan := make(chan context.Context)
resultChan := make(chan *ReadJobResult)
red, _ := NewSimpleReader(log, NewCtxReader(ts.URL), jobChan, resultChan, "reader_example", "reader_example", 10*time.Millisecond, 10*time.Millisecond)
done := red.Start(ctx)
fmt.Println("Reader has started its event loop!")

select {
case <-done:
	panic("Reader shouldn't have closed its done channel")
default:
	fmt.Println("Reader is working!")
}

cancel()
<-done
fmt.Println("Reader has stopped its event loop!")

Output:

Reader has started its event loop!
Reader is working!
Reader has stopped its event loop!
func NewSimpleReader ¶ added in v0.1.1
func NewSimpleReader(logger logrus.FieldLogger, ctxReader ContextReader, jobChan chan context.Context, resultChan chan *ReadJobResult, name, typeName string, interval, timeout time.Duration) (*SimpleReader, error)
func (*SimpleReader) Interval ¶ added in v0.1.1
func (m *SimpleReader) Interval() time.Duration
func (*SimpleReader) JobChan ¶ added in v0.1.1
func (m *SimpleReader) JobChan() chan context.Context
func (*SimpleReader) Name ¶ added in v0.1.1
func (m *SimpleReader) Name() string
func (*SimpleReader) ResultChan ¶ added in v0.1.1
func (m *SimpleReader) ResultChan() chan *ReadJobResult
func (*SimpleReader) Start ¶ added in v0.1.1
func (m *SimpleReader) Start(ctx context.Context) <-chan struct{}
func (*SimpleReader) Timeout ¶ added in v0.1.1
func (m *SimpleReader) Timeout() time.Duration
func (*SimpleReader) TypeName ¶ added in v0.2.1
func (m *SimpleReader) TypeName() string