Documentation
¶
Index ¶
- type ContextReader
- type CtxReader
- type DataReader
- type InterTimer
- type MockCtxReader
- type ReadJob
- type ReadJobResult
- type SimpleReader
- func (m *SimpleReader) Interval() time.Duration
- func (m *SimpleReader) JobChan() chan context.Context
- func (m *SimpleReader) Name() string
- func (m *SimpleReader) ResultChan() chan *ReadJobResult
- func (m *SimpleReader) Start(ctx context.Context) chan struct{}
- func (m *SimpleReader) Timeout() time.Duration
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ContextReader ¶
type ContextReader interface { // Get reads from the url and returns DefaultClient errors. // This operation's deadline and cancellation depends on ctx. // You should close the Body when you finished reading. Get(ctx context.Context) (*http.Response, error) }
ContextReader reads from the url with the specified context.
type CtxReader ¶
type CtxReader struct {
// contains filtered or unexported fields
}
CtxReader implements ContextReader interface.
type DataReader ¶ added in v0.1.2
type DataReader interface { InterTimer // The engine will send a signal to this channel to inform the reader when to read // from the target. // The engine never blocks if the reader is not able to process the requests. // It is the reader's job to provide a large enough channel, otherwise it will cause goroutine leakage. // The context might be canceled depending how the user sets the timeouts. JobChan() chan context.Context // The engine runs the reading job in another goroutine. Therefore it is the reader's job // not to send send a lot of results back to the engine, otherwise it might cause crash // on readers and the application itself. ResultChan() chan *ReadJobResult // The reader's loop should be inside a goroutine, and return a done channel. // This channel should be closed once its work is finished and the reader wants to quit. // When the context is timedout or canceled, the reader should return. Start(ctx context.Context) chan struct{} // Name should return the representation string for this reader. Choose a very simple name. Name() string }
DataReader recieves job requests to read from the target, and sends its success through the ResultChan channel.
type InterTimer ¶ added in v0.1.1
InterTimer is required by the Engine so it can read the intervals and timeouts.
type MockCtxReader ¶
type MockCtxReader struct { ContextReadFunc func(ctx context.Context) (*http.Response, error) // contains filtered or unexported fields }
func NewMockCtxReader ¶
func NewMockCtxReader(url string) *MockCtxReader
type ReadJob ¶
type ReadJob struct { Ctx context.Context // The reader should always send an error messge back, even if there is no errors. // In case there were no error, just send nil. Err chan error }
ReadJob is sent with a context and a channel to read the errors back.
type ReadJobResult ¶
type ReadJobResult struct { Time time.Time Res io.ReadCloser Err error }
ReadJobResult is constructed everytime a new record is fetched. The time is set after the request was successfully read.
type SimpleReader ¶ added in v0.1.1
type SimpleReader struct { StartFunc func() chan struct{} // contains filtered or unexported fields }
SimpleReader is useful for testing purposes.
Example ¶
log := lib.DiscardLogger() ctx, cancel := context.WithCancel(context.Background()) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { io.WriteString(w, `{"the key": "is the value!"}`) })) defer ts.Close() ctxReader := NewCtxReader(ts.URL) rdr, _ := NewSimpleReader(log, ctxReader, "reader_example", 10*time.Millisecond, 10*time.Millisecond) done := rdr.Start(ctx) job, _ := context.WithCancel(ctx) // Issueing a job rdr.JobChan() <- job // Now waiting for the results res := <-rdr.ResultChan() fmt.Println("Error:", res.Err) // Let's read what it retreived buf := new(bytes.Buffer) buf.ReadFrom(res.Res) fmt.Println("Result is:", buf.String()) // The reader should finish gracefully cancel() <-done fmt.Println("Readed has finished") // We need to cancel the job now fmt.Println("All done!")
Output: Error: <nil> Result is: {"the key": "is the value!"} Readed has finished All done!
func NewSimpleReader ¶ added in v0.1.1
func NewSimpleReader(logger logrus.FieldLogger, ctxReader ContextReader, name string, interval, timeout time.Duration) (*SimpleReader, error)
func (*SimpleReader) Interval ¶ added in v0.1.1
func (m *SimpleReader) Interval() time.Duration
func (*SimpleReader) JobChan ¶ added in v0.1.1
func (m *SimpleReader) JobChan() chan context.Context
func (*SimpleReader) Name ¶ added in v0.1.1
func (m *SimpleReader) Name() string
func (*SimpleReader) ResultChan ¶ added in v0.1.1
func (m *SimpleReader) ResultChan() chan *ReadJobResult
func (*SimpleReader) Start ¶ added in v0.1.1
func (m *SimpleReader) Start(ctx context.Context) chan struct{}
func (*SimpleReader) Timeout ¶ added in v0.1.1
func (m *SimpleReader) Timeout() time.Duration
Click to show internal directories.
Click to hide internal directories.