Documentation ¶
Index ¶
- type ContextReader
- type CtxReader
- type DataReader
- type InterTimer
- type MockConfig
- func (c *MockConfig) Backoff() int
- func (c *MockConfig) Endpoint() string
- func (c *MockConfig) Interval() time.Duration
- func (c *MockConfig) Logger() logrus.FieldLogger
- func (c *MockConfig) Name() string
- func (c *MockConfig) NewInstance(ctx context.Context, jobChan chan context.Context, ...) (DataReader, error)
- func (c *MockConfig) RoutePath() string
- func (c *MockConfig) Timeout() time.Duration
- func (c *MockConfig) TypeName() string
- type MockCtxReader
- type ReadJob
- type ReadJobResult
- type SimpleReader
- func (m *SimpleReader) Interval() time.Duration
- func (m *SimpleReader) JobChan() chan context.Context
- func (m *SimpleReader) Name() string
- func (m *SimpleReader) ResultChan() chan *ReadJobResult
- func (m *SimpleReader) Start(ctx context.Context) <-chan struct{}
- func (m *SimpleReader) Timeout() time.Duration
- func (m *SimpleReader) TypeName() string
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ContextReader ¶
type ContextReader interface {
	// Get reads from the url and returns DefaultClient errors.
	// This operation's deadline and cancellation depend on ctx.
	// You should close the Body when you have finished reading.
	Get(ctx context.Context) (*http.Response, error)
}
ContextReader reads from the url with the specified context.
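As a rough illustration of the contract above, the sketch below implements the interface with the standard library and shows that cancelling the context aborts the request. The names `urlReader`, `fetch` and `demo` are hypothetical and are not part of this package; the package's own CtxReader may be implemented differently.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// ContextReader repeats the interface documented above.
type ContextReader interface {
	Get(ctx context.Context) (*http.Response, error)
}

// urlReader is a hypothetical implementation; it is not the package's CtxReader.
type urlReader struct {
	url string
}

// Get builds a GET request bound to ctx, so a cancelled or expired ctx aborts it.
func (u *urlReader) Get(ctx context.Context) (*http.Response, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.url, nil)
	if err != nil {
		return nil, err
	}
	return http.DefaultClient.Do(req)
}

// fetch reads the whole body through any ContextReader and closes it afterwards.
func fetch(ctx context.Context, r ContextReader) (string, error) {
	resp, err := r.Get(ctx)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	b, err := io.ReadAll(resp.Body)
	return string(b), err
}

// demo serves a fixed body from a local test server and fetches it twice:
// once with a live context and once with an already cancelled one.
func demo() (body string, cancelled bool, err error) {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, "pong")
	}))
	defer ts.Close()

	r := &urlReader{url: ts.URL}
	body, err = fetch(context.Background(), r)
	if err != nil {
		return "", false, err
	}

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel before the request is made
	_, cerr := fetch(ctx, r)
	return body, errors.Is(cerr, context.Canceled), nil
}

func main() {
	body, cancelled, err := demo()
	fmt.Println(body, cancelled, err)
}
```

The caller, not the reader, owns the response body here, which is why `fetch` defers the `Close` call.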
type CtxReader ¶
type CtxReader struct {
// contains filtered or unexported fields
}
CtxReader implements ContextReader interface.
type DataReader ¶ added in v0.1.2
type DataReader interface {
	InterTimer

	// The engine will send a signal to this channel to inform the reader when to read
	// from the target.
	// The engine never blocks if the reader is not able to process the requests.
	// It is the reader's job to provide a large enough channel, otherwise it will cause goroutine leakage.
	// The context might be canceled depending on how the user sets the timeouts.
	JobChan() chan context.Context

	// The engine runs the reading job in another goroutine. Therefore it is the reader's job
	// not to send a lot of results back to the engine, otherwise it might crash
	// the readers and the application itself.
	ResultChan() chan *ReadJobResult

	// The reader's loop should be inside a goroutine, and return a done channel.
	// This channel should be closed once its work is finished and the reader wants to quit.
	// When the context times out or is canceled, the reader should return.
	Start(ctx context.Context) <-chan struct{}

	// TypeName is usually the application name.
	// Recorders should not intercept the engine for its decision, unless they have a
	// valid reason.
	TypeName() string

	// Name should return the representation string for this reader. Choose a very simple name.
	Name() string
}
DataReader receives job requests to read from the target, and sends the result of each read through the ResultChan channel.
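The loop described by the interface comments can be sketched as follows. This is only an outline under stated assumptions: `loopReader`, `result` and `run` are hypothetical names, `result` stands in for ReadJobResult with assumed fields, and the InterTimer, Name and TypeName methods are left out for brevity.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// result is a hypothetical stand-in for ReadJobResult.
type result struct {
	Time time.Time
	Err  error
}

// loopReader is a hypothetical reader with buffered channels, so the
// sender of jobs does not block while a read is in progress.
type loopReader struct {
	jobChan    chan context.Context
	resultChan chan *result
	read       func(ctx context.Context) error
}

func (l *loopReader) JobChan() chan context.Context { return l.jobChan }
func (l *loopReader) ResultChan() chan *result      { return l.resultChan }

// Start runs the loop in its own goroutine and returns a channel that is
// closed when the loop exits because ctx was cancelled or timed out.
func (l *loopReader) Start(ctx context.Context) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			select {
			case job := <-l.jobChan:
				err := l.read(job)
				select {
				case l.resultChan <- &result{Time: time.Now(), Err: err}:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()
	return done
}

// run issues one job, waits for its result, then cancels the reader and
// reports whether the done channel was closed within a second.
func run() (stopped bool, err error) {
	ctx, cancel := context.WithCancel(context.Background())
	r := &loopReader{
		jobChan:    make(chan context.Context, 10),
		resultChan: make(chan *result, 10),
		read:       func(context.Context) error { return nil },
	}
	done := r.Start(ctx)

	r.JobChan() <- ctx // ask for one read
	res := <-r.ResultChan()

	cancel()
	select {
	case <-done:
		stopped = true
	case <-time.After(time.Second):
	}
	return stopped, res.Err
}

func main() {
	stopped, err := run()
	fmt.Println(stopped, err)
}
```

Selecting on `ctx.Done()` while sending the result keeps the goroutine from leaking if nobody drains the result channel after cancellation.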
type InterTimer ¶ added in v0.1.1
InterTimer is required by the Engine so it can read the intervals and timeouts.
type MockConfig ¶ added in v0.2.1
type MockConfig struct {
	Name_      string
	TypeName_  string
	Endpoint_  string
	RoutePath_ string
	Timeout_   time.Duration
	Interval_  time.Duration
	Backoff_   int
	Logger_    logrus.FieldLogger
}
func NewMockConfig ¶ added in v0.2.1
func NewMockConfig(name, typeName string, log logrus.FieldLogger, endpoint, routepath string, interval, timeout time.Duration, backoff int) (*MockConfig, error)
func (*MockConfig) Backoff ¶ added in v0.2.1
func (c *MockConfig) Backoff() int
func (*MockConfig) Endpoint ¶ added in v0.2.1
func (c *MockConfig) Endpoint() string
func (*MockConfig) Interval ¶ added in v0.2.1
func (c *MockConfig) Interval() time.Duration
func (*MockConfig) Logger ¶ added in v0.2.1
func (c *MockConfig) Logger() logrus.FieldLogger
func (*MockConfig) Name ¶ added in v0.2.1
func (c *MockConfig) Name() string
func (*MockConfig) NewInstance ¶ added in v0.2.1
func (c *MockConfig) NewInstance(ctx context.Context, jobChan chan context.Context, resultChan chan *ReadJobResult) (DataReader, error)
func (*MockConfig) RoutePath ¶ added in v0.2.1
func (c *MockConfig) RoutePath() string
func (*MockConfig) Timeout ¶ added in v0.2.1
func (c *MockConfig) Timeout() time.Duration
func (*MockConfig) TypeName ¶ added in v0.2.1
func (c *MockConfig) TypeName() string
type MockCtxReader ¶
type MockCtxReader struct {
	ContextReadFunc func(ctx context.Context) (*http.Response, error)
	// contains filtered or unexported fields
}
func NewMockCtxReader ¶
func NewMockCtxReader(url string) *MockCtxReader
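A function field such as ContextReadFunc lets a test decide what Get returns without touching the network. The sketch below shows that pattern with a hypothetical `funcReader` type; it assumes, without confirmation from the source, that MockCtxReader delegates Get to ContextReadFunc in a similar way. `cannedResponse` and `readStub` are illustrative helpers, not part of this package.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// funcReader is a hypothetical stand-in shaped like MockCtxReader: the
// behaviour of Get is supplied by the test through a function field.
type funcReader struct {
	ContextReadFunc func(ctx context.Context) (*http.Response, error)
}

// Get delegates to the injected function.
func (f *funcReader) Get(ctx context.Context) (*http.Response, error) {
	return f.ContextReadFunc(ctx)
}

// cannedResponse builds a response in memory, without any network traffic.
func cannedResponse(body string) *http.Response {
	return &http.Response{
		StatusCode: http.StatusOK,
		Body:       io.NopCloser(strings.NewReader(body)),
	}
}

// readStub wires a stub that returns body and reads it back through Get.
func readStub(body string) (string, error) {
	r := &funcReader{
		ContextReadFunc: func(ctx context.Context) (*http.Response, error) {
			return cannedResponse(body), nil
		},
	}
	resp, err := r.Get(context.Background())
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	b, err := io.ReadAll(resp.Body)
	return string(b), err
}

func main() {
	got, err := readStub(`{"the key": "is the value!"}`)
	fmt.Println(got, err)
}
```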
type ReadJob ¶
type ReadJob struct {
	Ctx context.Context

	// The reader should always send an error message back, even if there are no errors.
	// If there was no error, just send nil.
	Err chan error
}
ReadJob is sent with a context and a channel to read the errors back.
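The "always reply, even with nil" rule can be sketched like this. The `handle`, `submit`, `readOK` and `readFail` functions are hypothetical and only illustrate the handshake; how the package itself consumes ReadJob is not shown in this documentation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ReadJob repeats the struct documented above.
type ReadJob struct {
	Ctx context.Context
	Err chan error
}

// handle is a hypothetical worker step: it runs read and always replies on
// job.Err, sending nil when the read succeeded.
func handle(job ReadJob, read func(context.Context) error) {
	job.Err <- read(job.Ctx)
}

// submit sends one job through handle and waits for the reply.
func submit(read func(context.Context) error) error {
	job := ReadJob{
		Ctx: context.Background(),
		Err: make(chan error, 1), // buffered so the worker never blocks on the reply
	}
	go handle(job, read)
	return <-job.Err
}

// readOK simulates a successful read.
func readOK(context.Context) error { return nil }

// readFail simulates a failed read.
func readFail(context.Context) error { return errors.New("target unreachable") }

func main() {
	fmt.Println(submit(readOK))
	fmt.Println(submit(readFail))
}
```

Because the sender waits on `job.Err`, a worker that skipped the reply on success would leave the sender blocked, which is why nil must still be sent.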
type ReadJobResult ¶
ReadJobResult is constructed every time a new record is fetched. The time is set after the request was successfully read.
type SimpleReader ¶ added in v0.1.1
type SimpleReader struct {
	StartFunc func() chan struct{}
	// contains filtered or unexported fields
}
SimpleReader is useful for testing purposes.
Example ¶
log := lib.DiscardLogger()
ctx, cancel := context.WithCancel(context.Background())
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	io.WriteString(w, `{"the key": "is the value!"}`)
}))
defer ts.Close()

jobChan := make(chan context.Context, 10)
resultChan := make(chan *ReadJobResult, 10)
ctxReader := NewCtxReader(ts.URL)
red, _ := NewSimpleReader(log, ctxReader, jobChan, resultChan, "reader_example", "reader_example", 10*time.Millisecond, 10*time.Millisecond)
done := red.Start(ctx)

// Keep the cancel function so the job's context can be released later.
job, cancelJob := context.WithCancel(ctx)

// Issuing a job
red.JobChan() <- job

// Now waiting for the results
res := <-red.ResultChan()
fmt.Println("Error:", res.Err)

// Let's read what it retrieved
buf := new(bytes.Buffer)
buf.ReadFrom(res.Res)
fmt.Println("Result is:", buf.String())

// The reader should finish gracefully
cancel()
<-done
fmt.Println("Reader has finished")

// We need to cancel the job now
cancelJob()
fmt.Println("All done!")

Output:
Error: <nil>
Result is: {"the key": "is the value!"}
Reader has finished
All done!
Example (Start1) ¶
log := lib.DiscardLogger()
ctx, cancel := context.WithCancel(context.Background())
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	io.WriteString(w, `{"the key": "is the value!"}`)
}))
defer ts.Close()

jobChan := make(chan context.Context)
resultChan := make(chan *ReadJobResult)
red, _ := NewSimpleReader(log, NewCtxReader(ts.URL), jobChan, resultChan, "reader_example", "reader_example", 10*time.Millisecond, 10*time.Millisecond)
done := red.Start(ctx)
fmt.Println("Reader has started its event loop!")

select {
case <-done:
	panic("Reader shouldn't have closed its done channel")
default:
	fmt.Println("Reader is working!")
}

cancel()
<-done
fmt.Println("Reader has stopped its event loop!")

Output:
Reader has started its event loop!
Reader is working!
Reader has stopped its event loop!
func NewSimpleReader ¶ added in v0.1.1
func NewSimpleReader(logger logrus.FieldLogger, ctxReader ContextReader, jobChan chan context.Context, resultChan chan *ReadJobResult, name, typeName string, interval, timeout time.Duration) (*SimpleReader, error)
func (*SimpleReader) Interval ¶ added in v0.1.1
func (m *SimpleReader) Interval() time.Duration
func (*SimpleReader) JobChan ¶ added in v0.1.1
func (m *SimpleReader) JobChan() chan context.Context
func (*SimpleReader) Name ¶ added in v0.1.1
func (m *SimpleReader) Name() string
func (*SimpleReader) ResultChan ¶ added in v0.1.1
func (m *SimpleReader) ResultChan() chan *ReadJobResult
func (*SimpleReader) Start ¶ added in v0.1.1
func (m *SimpleReader) Start(ctx context.Context) <-chan struct{}
func (*SimpleReader) Timeout ¶ added in v0.1.1
func (m *SimpleReader) Timeout() time.Duration
func (*SimpleReader) TypeName ¶ added in v0.2.1
func (m *SimpleReader) TypeName() string