reader

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2016 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ContextReader

type ContextReader interface {
	// Get reads from the url and returns DefaultClient errors.
	// This operation's deadline and cancellation depends on ctx.
	// You should close the Body when you finished reading.
	Get(ctx context.Context) (*http.Response, error)
}

ContextReader reads from the url with the specified context.

type CtxReader

type CtxReader struct {
	// contains filtered or unexported fields
}

CtxReader implements ContextReader interface.

func NewCtxReader

func NewCtxReader(url string) *CtxReader

NewCtxReader requires a sanitised url.

func (*CtxReader) Get

func (c *CtxReader) Get(ctx context.Context) (*http.Response, error)

Get uses GET verb for retreiving the data TODO: implement other verbs

type DataReader added in v0.1.2

type DataReader interface {
	InterTimer

	// The engine will send a signal to this channel to inform the reader when to read
	// from the target.
	// The engine never blocks if the reader is not able to process the requests.
	// It is the reader's job to provide a large enough channel, otherwise it will cause goroutine leakage.
	// The context might be canceled depending how the user sets the timeouts.
	JobChan() chan context.Context

	// The engine runs the reading job in another goroutine. Therefore it is the reader's job
	// not to send send a lot of results back to the engine, otherwise it might cause crash
	// on readers and the application itself.
	ResultChan() chan *ReadJobResult

	// The reader's loop should be inside a goroutine, and return a done channel.
	// This channel should be closed once its work is finished and the reader wants to quit.
	// When the context is timedout or canceled, the reader should return.
	Start(ctx context.Context) <-chan struct{}

	// TypeName is usually the application name.
	// Recorders should not intercept the engine for its decision, unless they have a
	// valid reason.
	TypeName() string

	// Name should return the representation string for this reader. Choose a very simple name.
	Name() string
}

DataReader recieves job requests to read from the target, and sends its success through the ResultChan channel.

type InterTimer added in v0.1.1

type InterTimer interface {
	Timeout() time.Duration
	Interval() time.Duration
}

InterTimer is required by the Engine so it can read the intervals and timeouts.

type MockConfig added in v0.2.1

type MockConfig struct {
	Name_      string
	TypeName_  string
	Endpoint_  string
	RoutePath_ string
	Timeout_   time.Duration
	Interval_  time.Duration
	Backoff_   int
	Logger_    logrus.FieldLogger
}

func NewMockConfig added in v0.2.1

func NewMockConfig(name, typeName string, log logrus.FieldLogger, endpoint, routepath string, interval, timeout time.Duration, backoff int) (*MockConfig, error)

func (*MockConfig) Backoff added in v0.2.1

func (c *MockConfig) Backoff() int

func (*MockConfig) Endpoint added in v0.2.1

func (c *MockConfig) Endpoint() string

func (*MockConfig) Interval added in v0.2.1

func (c *MockConfig) Interval() time.Duration

func (*MockConfig) Logger added in v0.2.1

func (c *MockConfig) Logger() logrus.FieldLogger

func (*MockConfig) Name added in v0.2.1

func (c *MockConfig) Name() string

func (*MockConfig) NewInstance added in v0.2.1

func (c *MockConfig) NewInstance(ctx context.Context, jobChan chan context.Context, resultChan chan *ReadJobResult) (DataReader, error)

func (*MockConfig) RoutePath added in v0.2.1

func (c *MockConfig) RoutePath() string

func (*MockConfig) Timeout added in v0.2.1

func (c *MockConfig) Timeout() time.Duration

func (*MockConfig) TypeName added in v0.2.1

func (c *MockConfig) TypeName() string

type MockCtxReader

type MockCtxReader struct {
	ContextReadFunc func(ctx context.Context) (*http.Response, error)
	// contains filtered or unexported fields
}

func NewMockCtxReader

func NewMockCtxReader(url string) *MockCtxReader

func (*MockCtxReader) Get

func (m *MockCtxReader) Get(ctx context.Context) (*http.Response, error)

type ReadJob

type ReadJob struct {
	Ctx context.Context

	// The reader should always send an error messge back, even if there is no errors.
	// In case there were no error, just send nil.
	Err chan error
}

ReadJob is sent with a context and a channel to read the errors back.

type ReadJobResult

type ReadJobResult struct {
	Time     time.Time
	TypeName string
	Res      io.ReadCloser
	Err      error
}

ReadJobResult is constructed everytime a new record is fetched. The time is set after the request was successfully read.

type SimpleReader added in v0.1.1

type SimpleReader struct {
	StartFunc func() chan struct{}
	// contains filtered or unexported fields
}

SimpleReader is useful for testing purposes.

Example
log := lib.DiscardLogger()
ctx, cancel := context.WithCancel(context.Background())
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	io.WriteString(w, `{"the key": "is the value!"}`)
}))
defer ts.Close()

jobChan := make(chan context.Context, 10)
resultChan := make(chan *ReadJobResult, 10)
ctxReader := NewCtxReader(ts.URL)
red, _ := NewSimpleReader(log, ctxReader, jobChan, resultChan, "reader_example", "reader_example", 10*time.Millisecond, 10*time.Millisecond)
done := red.Start(ctx)

job, _ := context.WithCancel(ctx)
// Issueing a job
red.JobChan() <- job

// Now waiting for the results
res := <-red.ResultChan()
fmt.Println("Error:", res.Err)

// Let's read what it retreived
buf := new(bytes.Buffer)
buf.ReadFrom(res.Res)
fmt.Println("Result is:", buf.String())

// The reader should finish gracefully
cancel()
<-done
fmt.Println("Readed has finished")
// We need to cancel the job now
fmt.Println("All done!")
Output:

Error: <nil>
Result is: {"the key": "is the value!"}
Readed has finished
All done!
Example (Start1)
log := lib.DiscardLogger()
ctx, cancel := context.WithCancel(context.Background())

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { io.WriteString(w, `{"the key": "is the value!"}`) }))
defer ts.Close()

jobChan := make(chan context.Context)
resultChan := make(chan *ReadJobResult)

red, _ := NewSimpleReader(log, NewCtxReader(ts.URL), jobChan, resultChan, "reader_example", "reader_example", 10*time.Millisecond, 10*time.Millisecond)
done := red.Start(ctx)
fmt.Println("Reader has started its event loop!")

select {
case <-done:
	panic("Reader shouldn't have closed its done channel")
default:
	fmt.Println("Reader is working!")
}

cancel()
<-done
fmt.Println("Reader has stopped its event loop!")
Output:

Reader has started its event loop!
Reader is working!
Reader has stopped its event loop!

func NewSimpleReader added in v0.1.1

func NewSimpleReader(logger logrus.FieldLogger, ctxReader ContextReader, jobChan chan context.Context, resultChan chan *ReadJobResult, name, typeName string, interval, timeout time.Duration) (*SimpleReader, error)

func (*SimpleReader) Interval added in v0.1.1

func (m *SimpleReader) Interval() time.Duration

func (*SimpleReader) JobChan added in v0.1.1

func (m *SimpleReader) JobChan() chan context.Context

func (*SimpleReader) Name added in v0.1.1

func (m *SimpleReader) Name() string

func (*SimpleReader) ResultChan added in v0.1.1

func (m *SimpleReader) ResultChan() chan *ReadJobResult

func (*SimpleReader) Start added in v0.1.1

func (m *SimpleReader) Start(ctx context.Context) <-chan struct{}

func (*SimpleReader) Timeout added in v0.1.1

func (m *SimpleReader) Timeout() time.Duration

func (*SimpleReader) TypeName added in v0.2.1

func (m *SimpleReader) TypeName() string

Directories

Path Synopsis
Package self contains codes for recording expvars own metrics
Package self contains codes for recording expvars own metrics

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL