Documentation ¶
Overview ¶
Package reader contains logic for reading from a provider. Any object that implements the DataReader interface can be used in this system. Each job should provide an io.ReadCloser whose contents form a JSON object; otherwise the data will be rejected.
The reader SHOULD NOT close the data stream. The engine WILL close it after reading its contents.
Readers should ping their endpoint upon creation to make sure they can read from it. Otherwise they should return an error indicating that they cannot start.
When the context is cancelled, the reader should finish its job and return. The Time should be set when the data is read from the endpoint; otherwise it loses its meaning. The engine issues jobs based on the Interval, which is set in the configuration file.
Index ¶
- Constants
- Variables
- func TestReaderConstruction(t *testing.T, setup setupFunc)
- func TestReaderEndpointManeuvers(t *testing.T, ...)
- func TestReaderEssentials(t *testing.T, ...)
- type DataReader
- type ErrEndpointNotAvailable
- type ErrInvalidEndpoint
- type InterTimer
- type MockConfig
- func (c *MockConfig) Backoff() int
- func (c *MockConfig) Endpoint() string
- func (c *MockConfig) Interval() time.Duration
- func (c *MockConfig) Logger() logrus.FieldLogger
- func (c *MockConfig) Name() string
- func (c *MockConfig) NewInstance(ctx context.Context, jobChan chan context.Context, ...) (DataReader, error)
- func (c *MockConfig) RoutePath() string
- func (c *MockConfig) Timeout() time.Duration
- func (c *MockConfig) TypeName() string
- type ReadJobResult
- type SimpleReader
- func (m *SimpleReader) Interval() time.Duration
- func (m *SimpleReader) JobChan() chan context.Context
- func (m *SimpleReader) Mapper() datatype.Mapper
- func (m *SimpleReader) Name() string
- func (m *SimpleReader) ResultChan() chan *ReadJobResult
- func (m *SimpleReader) Start(ctx context.Context, stop communication.StopChannel)
- func (m *SimpleReader) Timeout() time.Duration
- func (m *SimpleReader) TypeName() string
Examples ¶
Constants ¶
const (
	// GenericReaderReceivesJobTestCase invokes TestGenericReaderReceivesJob test
	GenericReaderReceivesJobTestCase = iota

	// ReaderSendsResultTestCase invokes TestReaderSendsResult test
	ReaderSendsResultTestCase

	// ReaderReadsOnBufferedChanTestCase invokes TestReaderReadsOnBufferedChan test
	ReaderReadsOnBufferedChanTestCase

	// ReaderDrainsAfterClosingContextTestCase invokes TestReaderDrainsAfterClosingContext test
	ReaderDrainsAfterClosingContextTestCase

	// ReaderClosesTestCase invokes TestReaderCloses test
	ReaderClosesTestCase

	// ReaderClosesWithBufferedChansTestCase invokes TestReaderClosesWithBufferedChans test
	ReaderClosesWithBufferedChansTestCase

	// ReaderWithNoValidURLErrorsTestCase invokes TestReaderWithNoValidURLErrors test
	ReaderWithNoValidURLErrorsTestCase

	// ReaderErrorsOnEndpointDisapearsTestCase invokes TestReaderErrorsOnEndpointDisapears test
	ReaderErrorsOnEndpointDisapearsTestCase
)
Variables ¶
var (
	// ErrEmptyName is the error when the package name is empty.
	ErrEmptyName = fmt.Errorf("name cannot be empty")

	// ErrEmptyEndpoint is the error when the given endpoint is empty.
	ErrEmptyEndpoint = fmt.Errorf("endpoint cannot be empty")

	// ErrEmptyTypeName is the error when the type_name is an empty string.
	ErrEmptyTypeName = fmt.Errorf("type_name cannot be empty")
)
Functions ¶
func TestReaderConstruction ¶ added in v0.5.0
TestReaderConstruction runs all essential tests on object construction.
func TestReaderEndpointManeuvers ¶ added in v0.5.0
func TestReaderEndpointManeuvers(t *testing.T, setup func(testCase int, endpoint string) (red DataReader, errorChan chan communication.ErrorMessage))
TestReaderEndpointManeuvers runs all tests regarding the endpoint changing state
func TestReaderEssentials ¶ added in v0.5.0
func TestReaderEssentials(t *testing.T, setup func(testCase int) (red DataReader, errorChan chan communication.ErrorMessage, testMessage string, teardown func()))
TestReaderEssentials runs all essential tests
Types ¶
type DataReader ¶ added in v0.1.2
type DataReader interface {
	InterTimer

	// The engine will send a signal to this channel to inform the reader when
	// it is time to read from the target.
	// The engine never blocks if the reader is not able to process the requests.
	// This channel will be provided by the Engine.
	// The context might be cancelled depending on how the user sets the time-outs.
	// The UUID associated with this job is inside the context. Readers are
	// advised to use this ID and pass it along.
	JobChan() chan context.Context

	// The engine runs the reading job in another goroutine. The engine will provide this channel; however,
	// the reader should not send a lot of data back to the engine, otherwise it might crash
	// the readers and the application itself.
	ResultChan() chan *ReadJobResult

	// The reader's loop should be inside a goroutine.
	// This channel should be closed once the worker receives a stop signal
	// and its work is finished. The reader must respond to the stop signal,
	// otherwise it will leave the Engine hanging.
	// When the context is timed-out or cancelled, the reader should return.
	Start(ctx context.Context, stop communication.StopChannel)

	// Mapper should return an instance of the datatype mapper.
	// Engine uses this object to present the data to recorders.
	Mapper() datatype.Mapper

	// TypeName is usually the application name and is set by the user in the configuration file.
	// Recorders should not intercept the engine for its decision, unless they have a
	// valid reason.
	TypeName() string

	// Name should return the representation string for this reader. Choose a very simple and unique name.
	Name() string
}
DataReader receives job requests to read from the target, and sends its results through the ResultChan channel.
type ErrEndpointNotAvailable ¶ added in v0.5.0
ErrEndpointNotAvailable is the error when the endpoint is not available.
func (ErrEndpointNotAvailable) EndpointNotAvailable ¶ added in v0.5.0
func (ErrEndpointNotAvailable) EndpointNotAvailable()
EndpointNotAvailable defines the behaviour of the error
func (ErrEndpointNotAvailable) Error ¶ added in v0.5.0
func (e ErrEndpointNotAvailable) Error() string
type ErrInvalidEndpoint ¶ added in v0.5.0
type ErrInvalidEndpoint string
ErrInvalidEndpoint is the error when the endpoint is not a valid url
func (ErrInvalidEndpoint) Error ¶ added in v0.5.0
func (e ErrInvalidEndpoint) Error() string
func (ErrInvalidEndpoint) InvalidEndpoint ¶ added in v0.5.0
func (ErrInvalidEndpoint) InvalidEndpoint()
InvalidEndpoint defines the behaviour of the error
type InterTimer ¶ added in v0.1.1
type InterTimer interface {
	Timeout() time.Duration  // is used for timing out reading from the endpoint.
	Interval() time.Duration // the engine will issue jobs based on this interval.
}
InterTimer is required by the Engine so it can read the intervals and time-outs.
type MockConfig ¶ added in v0.2.1
type MockConfig struct {
	MockName     string
	MockTypeName string
	MockEndpoint string
	MockRoutePath string
	MockTimeout  time.Duration
	MockInterval time.Duration
	MockBackoff  int
	MockLogger   logrus.FieldLogger
}
MockConfig is used for instantiating a mock reader
func NewMockConfig ¶ added in v0.2.1
func NewMockConfig(name, typeName string, log logrus.FieldLogger, endpoint, routepath string, interval, timeout time.Duration, backoff int) (*MockConfig, error)
NewMockConfig returns a mocked version of the config
func (*MockConfig) Backoff ¶ added in v0.2.1
func (c *MockConfig) Backoff() int
Backoff returns the backoff
func (*MockConfig) Endpoint ¶ added in v0.2.1
func (c *MockConfig) Endpoint() string
Endpoint returns the endpoint
func (*MockConfig) Interval ¶ added in v0.2.1
func (c *MockConfig) Interval() time.Duration
Interval returns the interval
func (*MockConfig) Logger ¶ added in v0.2.1
func (c *MockConfig) Logger() logrus.FieldLogger
Logger returns the logger
func (*MockConfig) NewInstance ¶ added in v0.2.1
func (c *MockConfig) NewInstance(ctx context.Context, jobChan chan context.Context, resultChan chan *ReadJobResult, errChan chan<- communication.ErrorMessage) (DataReader, error)
NewInstance returns a mocked DataReader built from this config
func (*MockConfig) RoutePath ¶ added in v0.2.1
func (c *MockConfig) RoutePath() string
RoutePath returns the routepath
func (*MockConfig) Timeout ¶ added in v0.2.1
func (c *MockConfig) Timeout() time.Duration
Timeout returns the timeout
func (*MockConfig) TypeName ¶ added in v0.2.1
func (c *MockConfig) TypeName() string
TypeName returns the typename
type ReadJobResult ¶
type ReadJobResult struct {
	ID       communication.JobID
	Time     time.Time
	TypeName string
	Res      io.ReadCloser
	Mapper   datatype.Mapper //TODO: refactor this out
}
ReadJobResult is constructed every time a new record is fetched. The time is set after the request was successfully read.
type SimpleReader ¶ added in v0.1.1
type SimpleReader struct { StartFunc func(communication.StopChannel) // contains filtered or unexported fields }
SimpleReader is useful for testing purposes.
Example ¶
log := lib.DiscardLogger()
ctx := context.Background()
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	io.WriteString(w, `{"the key": "is the value!"}`)
}))
defer ts.Close()

jobChan := make(chan context.Context, 10)
errorChan := make(chan communication.ErrorMessage, 10)
resultChan := make(chan *ReadJobResult, 10)

red, _ := NewSimpleReader(log, ts.URL, jobChan, resultChan, errorChan, "reader_example", "reader_example", 10*time.Millisecond, 10*time.Millisecond)
stop := make(communication.StopChannel)
red.Start(ctx, stop)

// Issuing a job
red.JobChan() <- communication.NewReadJob(ctx)

// Let's check the errors
select {
case <-errorChan:
	panic("Wasn't expecting any errors")
default:
	fmt.Println("No errors reported")
}

res := <-red.ResultChan()
// Let's read what it retrieved
buf := new(bytes.Buffer)
buf.ReadFrom(res.Res)
fmt.Println("Result is:", buf.String())

done := make(chan struct{})
stop <- done
<-done
fmt.Println("Reader has finished")

// We need to cancel the job now
fmt.Println("All done!")

Output:

No errors reported
Result is: {"the key": "is the value!"}
Reader has finished
All done!
Example (Start) ¶
log := lib.DiscardLogger()
ctx := context.Background()
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	io.WriteString(w, `{"the key": "is the value!"}`)
}))
defer ts.Close()

jobChan := make(chan context.Context)
errorChan := make(chan communication.ErrorMessage)
resultChan := make(chan *ReadJobResult)

red, _ := NewSimpleReader(log, ts.URL, jobChan, resultChan, errorChan, "reader_example", "reader_example", 10*time.Millisecond, 10*time.Millisecond)
stop := make(communication.StopChannel)
red.Start(ctx, stop)

done := make(chan struct{})
stop <- done
<-done
fmt.Println("Reader has stopped its event loop!")

Output:

Reader has stopped its event loop!
func NewSimpleReader ¶ added in v0.1.1
func NewSimpleReader(
	log logrus.FieldLogger,
	endpoint string,
	jobChan chan context.Context,
	resultChan chan *ReadJobResult,
	errorChan chan<- communication.ErrorMessage,
	name, typeName string,
	interval, timeout time.Duration,
) (*SimpleReader, error)
NewSimpleReader returns a SimpleReader for use in tests
func (*SimpleReader) Interval ¶ added in v0.1.1
func (m *SimpleReader) Interval() time.Duration
Interval returns the interval
func (*SimpleReader) JobChan ¶ added in v0.1.1
func (m *SimpleReader) JobChan() chan context.Context
JobChan returns the job channel
func (*SimpleReader) Mapper ¶ added in v0.3.0
func (m *SimpleReader) Mapper() datatype.Mapper
Mapper returns the mapper
func (*SimpleReader) Name ¶ added in v0.1.1
func (m *SimpleReader) Name() string
Name returns the name
func (*SimpleReader) ResultChan ¶ added in v0.1.1
func (m *SimpleReader) ResultChan() chan *ReadJobResult
ResultChan returns the result channel
func (*SimpleReader) Start ¶ added in v0.1.1
func (m *SimpleReader) Start(ctx context.Context, stop communication.StopChannel)
Start executes the StartFunc if defined, otherwise continues normally
func (*SimpleReader) Timeout ¶ added in v0.1.1
func (m *SimpleReader) Timeout() time.Duration
Timeout returns the timeout
func (*SimpleReader) TypeName ¶ added in v0.2.1
func (m *SimpleReader) TypeName() string
TypeName returns the type name