reader

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 21, 2016 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ContextReader

type ContextReader interface {
	// Get reads from the url and returns DefaultClient errors.
	// This operation's deadline and cancellation depends on ctx.
	// You should close the Body when you finished reading.
	Get(ctx context.Context) (*http.Response, error)
}

ContextReader reads from the url with the specified context.

type CtxReader

type CtxReader struct {
	// contains filtered or unexported fields
}

CtxReader implements ContextReader interface.

func NewCtxReader

func NewCtxReader(url string) *CtxReader

NewCtxReader requires a sanitised url.

func (*CtxReader) Get

func (c *CtxReader) Get(ctx context.Context) (*http.Response, error)

Get uses GET verb for retreiving the data TODO: implement other verbs

type DataReader added in v0.1.2

type DataReader interface {
	InterTimer

	// The engine will send a signal to this channel to inform the reader when to read
	// from the target.
	// The engine never blocks if the reader is not able to process the requests.
	// It is the reader's job to provide a large enough channel, otherwise it will cause goroutine leakage.
	// The context might be canceled depending how the user sets the timeouts.
	JobChan() chan context.Context

	// The engine runs the reading job in another goroutine. Therefore it is the reader's job
	// not to send send a lot of results back to the engine, otherwise it might cause crash
	// on readers and the application itself.
	ResultChan() chan *ReadJobResult

	// The reader's loop should be inside a goroutine, and return a done channel.
	// This channel should be closed once its work is finished and the reader wants to quit.
	// When the context is timedout or canceled, the reader should return.
	Start(ctx context.Context) chan struct{}

	// Name should return the representation string for this reader. Choose a very simple name.
	Name() string
}

DataReader recieves job requests to read from the target, and sends its success through the ResultChan channel.

type InterTimer added in v0.1.1

type InterTimer interface {
	Timeout() time.Duration
	Interval() time.Duration
}

InterTimer is required by the Engine so it can read the intervals and timeouts.

type MockCtxReader

type MockCtxReader struct {
	ContextReadFunc func(ctx context.Context) (*http.Response, error)
	// contains filtered or unexported fields
}

func NewMockCtxReader

func NewMockCtxReader(url string) *MockCtxReader

func (*MockCtxReader) Get

func (m *MockCtxReader) Get(ctx context.Context) (*http.Response, error)

type ReadJob

type ReadJob struct {
	Ctx context.Context

	// The reader should always send an error messge back, even if there is no errors.
	// In case there were no error, just send nil.
	Err chan error
}

ReadJob is sent with a context and a channel to read the errors back.

type ReadJobResult

type ReadJobResult struct {
	Time time.Time
	Res  io.ReadCloser
	Err  error
}

ReadJobResult is constructed everytime a new record is fetched. The time is set after the request was successfully read.

type SimpleReader added in v0.1.1

type SimpleReader struct {
	StartFunc func() chan struct{}
	// contains filtered or unexported fields
}

SimpleReader is useful for testing purposes.

Example
log := lib.DiscardLogger()
ctx, cancel := context.WithCancel(context.Background())
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	io.WriteString(w, `{"the key": "is the value!"}`)
}))
defer ts.Close()

ctxReader := NewCtxReader(ts.URL)
rdr, _ := NewSimpleReader(log, ctxReader, "reader_example", 10*time.Millisecond, 10*time.Millisecond)
done := rdr.Start(ctx)

job, _ := context.WithCancel(ctx)
// Issueing a job
rdr.JobChan() <- job

// Now waiting for the results
res := <-rdr.ResultChan()
fmt.Println("Error:", res.Err)

// Let's read what it retreived
buf := new(bytes.Buffer)
buf.ReadFrom(res.Res)
fmt.Println("Result is:", buf.String())

// The reader should finish gracefully
cancel()
<-done
fmt.Println("Readed has finished")
// We need to cancel the job now
fmt.Println("All done!")
Output:

Error: <nil>
Result is: {"the key": "is the value!"}
Readed has finished
All done!

func NewSimpleReader added in v0.1.1

func NewSimpleReader(logger logrus.FieldLogger, ctxReader ContextReader, name string, interval, timeout time.Duration) (*SimpleReader, error)

func (*SimpleReader) Interval added in v0.1.1

func (m *SimpleReader) Interval() time.Duration

func (*SimpleReader) JobChan added in v0.1.1

func (m *SimpleReader) JobChan() chan context.Context

func (*SimpleReader) Name added in v0.1.1

func (m *SimpleReader) Name() string

func (*SimpleReader) ResultChan added in v0.1.1

func (m *SimpleReader) ResultChan() chan *ReadJobResult

func (*SimpleReader) Start added in v0.1.1

func (m *SimpleReader) Start(ctx context.Context) chan struct{}

func (*SimpleReader) Timeout added in v0.1.1

func (m *SimpleReader) Timeout() time.Duration

Directories

Path Synopsis
Package self contains codes for recording expvars own metrics
Package self contains codes for recording expvars own metrics

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL