reader

package
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 27, 2016 License: Apache-2.0 Imports: 7 Imported by: 10

Documentation

Overview

Package reader contains logic for reading from a provider. Any objects that implements the DataReader interface can be used in this system. The job should provide an io.ReadCloser and should produce a JSON object, otherwise the data will be rejected.

The data stream SHOULD not be closed. The engine WILL close it upon reading its contents.

Readers should ping their endpoint upon creation to make sure they can read from. Otherwise they should return an error indicating they cannot start.

When the context is cancelled, the reader should finish its job and return. The Time should be set when the data is read from the endpoint, otherwise it will lose its meaning. The engine will issue jobs based on the Interval, which is set in the configuration file.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ContextReader

type ContextReader interface {
	// Get reads from the url and returns DefaultClient errors.
	// This operation's deadline and cancellation depends on ctx.
	// You should close the Body when you finished reading.
	Get(ctx context.Context) (*http.Response, error)
}

ContextReader reads from the url with the specified context.

type CtxReader

type CtxReader struct {
	// contains filtered or unexported fields
}

CtxReader implements ContextReader interface.

func NewCtxReader

func NewCtxReader(url string) *CtxReader

NewCtxReader requires a sanitised url.

func (*CtxReader) Get

func (c *CtxReader) Get(ctx context.Context) (*http.Response, error)

Get uses GET verb for retrieving the data TODO: implement other verbs

Example (A)
ctxReader := NewCtxReader("bad url")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

res, err := ctxReader.Get(ctx)
fmt.Println(res)
fmt.Println(err != nil)
Output:

<nil>
true
Example (B)
resp := "my response"
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintf(w, resp)
}))
defer ts.Close()

ctxReader := NewCtxReader(ts.URL)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
res, err := ctxReader.Get(ctx)
defer res.Body.Close()

buf := new(bytes.Buffer)
buf.ReadFrom(res.Body)

fmt.Println("err == nil:", err == nil)
fmt.Println("res != nil:", res != nil)
fmt.Println("Response body:", buf.String())
Output:

err == nil: true
res != nil: true
Response body: my response

type DataReader added in v0.1.2

type DataReader interface {
	InterTimer

	// The engine will send a signal to this channel to inform the reader when
	// it is time to read from the target.
	// The engine never blocks if the reader is not able to process the requests.
	// This channel will be provided by the Engine.
	// The context might be cancelled depending how the user sets the time-outs.
	// The UUID associated with this job is inside the context. Readers are
	// advised to use this ID and pass them along.
	JobChan() chan context.Context

	// The engine runs the reading job in another goroutine. The engine will provide this channel, however
	// the reader should not send send a lot of data back to the engine, otherwise it might cause crash
	// on readers and the application itself.
	ResultChan() chan *ReadJobResult

	// The reader's loop should be inside a goroutine.
	// This channel should be closed once the worker receives a stop signal
	// and its work is finished. The response to the stop signal should happen
	// otherwise it will hang the Engine around.
	// When the context is timed-out or cancelled, the reader should return.
	Start(ctx context.Context, stop communication.StopChannel)

	// Mapper should return an instance of the datatype mapper.
	// Engine uses this object to present the data to recorders.
	Mapper() datatype.Mapper

	// TypeName is usually the application name and is set by the user in the configuration file.
	// Recorders should not intercept the engine for its decision, unless they have a
	// valid reason.
	TypeName() string

	// Name should return the representation string for this reader. Choose a very simple and unique name.
	Name() string
}

DataReader receives job requests to read from the target, and sends its success through the ResultChan channel.

type InterTimer added in v0.1.1

type InterTimer interface {
	Timeout() time.Duration  // is used for timing out reading from the endpoint.
	Interval() time.Duration // the engine will issue jobs based on this interval.
}

InterTimer is required by the Engine so it can read the intervals and time-outs.

type MockConfig added in v0.2.1

type MockConfig struct {
	MockName      string
	MockTypeName  string
	MockEndpoint  string
	MockRoutePath string
	MockTimeout   time.Duration
	MockInterval  time.Duration
	MockBackoff   int
	MockLogger    logrus.FieldLogger
}

MockConfig is used for instantiating a mock reader

func NewMockConfig added in v0.2.1

func NewMockConfig(name, typeName string, log logrus.FieldLogger, endpoint, routepath string, interval, timeout time.Duration, backoff int) (*MockConfig, error)

NewMockConfig returns a mocked version of the config

func (*MockConfig) Backoff added in v0.2.1

func (c *MockConfig) Backoff() int

Backoff returns the backoff

func (*MockConfig) Endpoint added in v0.2.1

func (c *MockConfig) Endpoint() string

Endpoint returns the endpoint

func (*MockConfig) Interval added in v0.2.1

func (c *MockConfig) Interval() time.Duration

Interval returns the interval

func (*MockConfig) Logger added in v0.2.1

func (c *MockConfig) Logger() logrus.FieldLogger

Logger returns the logger

func (*MockConfig) Name added in v0.2.1

func (c *MockConfig) Name() string

Name returns the name

func (*MockConfig) NewInstance added in v0.2.1

func (c *MockConfig) NewInstance(ctx context.Context, jobChan chan context.Context, resultChan chan *ReadJobResult, errChan chan<- communication.ErrorMessage) (DataReader, error)

NewInstance returns a mocked version of the config

func (*MockConfig) RoutePath added in v0.2.1

func (c *MockConfig) RoutePath() string

RoutePath returns the routepath

func (*MockConfig) Timeout added in v0.2.1

func (c *MockConfig) Timeout() time.Duration

Timeout returns the timeout

func (*MockConfig) TypeName added in v0.2.1

func (c *MockConfig) TypeName() string

TypeName returns the typename

type MockCtxReader

type MockCtxReader struct {
	ContextReadFunc func(ctx context.Context) (*http.Response, error)
	// contains filtered or unexported fields
}

MockCtxReader is the mocked version of CtxReader

func NewMockCtxReader

func NewMockCtxReader(url string) *MockCtxReader

NewMockCtxReader instantiates a MockCtxReader object and sets the url to be read from ContextReadFunc

func (*MockCtxReader) Get

func (m *MockCtxReader) Get(ctx context.Context) (*http.Response, error)

Get calls ContextReadFunc with ctx

type ReadJobResult

type ReadJobResult struct {
	ID       communication.JobID
	Time     time.Time
	TypeName string
	Res      io.ReadCloser
	Mapper   datatype.Mapper //TODO: refactor this out
}

ReadJobResult is constructed every time a new record is fetched. The time is set after the request was successfully read.

type SimpleReader added in v0.1.1

type SimpleReader struct {
	StartFunc func(communication.StopChannel)
	// contains filtered or unexported fields
}

SimpleReader is useful for testing purposes.

Example
log := lib.DiscardLogger()
ctx := context.Background()
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	io.WriteString(w, `{"the key": "is the value!"}`)
}))
defer ts.Close()

jobChan := make(chan context.Context, 10)
errorChan := make(chan communication.ErrorMessage, 10)
resultChan := make(chan *ReadJobResult, 10)
ctxReader := NewCtxReader(ts.URL)
red, _ := NewSimpleReader(log, ctxReader, jobChan, resultChan, errorChan, "reader_example", "reader_example", 10*time.Millisecond, 10*time.Millisecond)
stop := make(communication.StopChannel)
red.Start(ctx, stop)

// Issuing a job
red.JobChan() <- communication.NewReadJob(ctx)

// Lets check the errors
select {
case <-errorChan:
	panic("Wasn't expecting any errors")
default:
	fmt.Println("No errors reported")
}

res := <-red.ResultChan()
// Let's read what it retrieved
buf := new(bytes.Buffer)
buf.ReadFrom(res.Res)
fmt.Println("Result is:", buf.String())

done := make(chan struct{})
stop <- done
<-done
fmt.Println("Reader has finished")
// We need to cancel the job now
fmt.Println("All done!")
Output:

No errors reported
Result is: {"the key": "is the value!"}
Reader has finished
All done!
Example (Start)
log := lib.DiscardLogger()
ctx := context.Background()

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { io.WriteString(w, `{"the key": "is the value!"}`) }))
defer ts.Close()

jobChan := make(chan context.Context)
errorChan := make(chan communication.ErrorMessage)
resultChan := make(chan *ReadJobResult)

red, _ := NewSimpleReader(log, NewCtxReader(ts.URL), jobChan, resultChan, errorChan, "reader_example", "reader_example", 10*time.Millisecond, 10*time.Millisecond)
stop := make(communication.StopChannel)
red.Start(ctx, stop)

done := make(chan struct{})
stop <- done
<-done
fmt.Println("Reader has stopped its event loop!")
Output:

Reader has stopped its event loop!

func NewSimpleReader added in v0.1.1

func NewSimpleReader(
	log logrus.FieldLogger,
	ctxReader ContextReader,
	jobChan chan context.Context,
	resultChan chan *ReadJobResult,
	errorChan chan<- communication.ErrorMessage,
	name,
	typeName string,
	interval,
	timeout time.Duration,
) (*SimpleReader, error)

NewSimpleReader is a reader for using in tests

func (*SimpleReader) ErrorChan added in v0.4.0

func (m *SimpleReader) ErrorChan() chan<- communication.ErrorMessage

ErrorChan returns the errorchan

func (*SimpleReader) Interval added in v0.1.1

func (m *SimpleReader) Interval() time.Duration

Interval returns the interval

func (*SimpleReader) JobChan added in v0.1.1

func (m *SimpleReader) JobChan() chan context.Context

JobChan returns the jobchan

func (*SimpleReader) Mapper added in v0.3.0

func (m *SimpleReader) Mapper() datatype.Mapper

Mapper returns the mapper

func (*SimpleReader) Name added in v0.1.1

func (m *SimpleReader) Name() string

Name returns the name

func (*SimpleReader) ResultChan added in v0.1.1

func (m *SimpleReader) ResultChan() chan *ReadJobResult

ResultChan returns the resultchan

func (*SimpleReader) Start added in v0.1.1

Start executes the StartFunc if defined, otherwise continues normally

func (*SimpleReader) Timeout added in v0.1.1

func (m *SimpleReader) Timeout() time.Duration

Timeout returns the timeout

func (*SimpleReader) TypeName added in v0.2.1

func (m *SimpleReader) TypeName() string

TypeName returns the typename

Directories

Path Synopsis
Package expvar contains logic to read from an expvar provide.
Package expvar contains logic to read from an expvar provide.
Package self contains codes for recording expvastic's own metrics.
Package self contains codes for recording expvastic's own metrics.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL