core

package
v1.28.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 19, 2021 License: MIT Imports: 30 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Client is the default client for scripts.
// It will be reinitialized during server creation; this default value
// (60-second timeout, TLS enabled, no proxy) is the one used in tests.
var Client = NewClient(ClientOptions{
	UserAgent:  "whitewater.guide robot",
	Timeout:    60,
	WithoutTLS: false,
	Proxy:      "",
}, nil)

Client is the default client for scripts. It will be reinitialized during server creation. This default value will be used in tests.

View Source
// ErrScriptNotFound means that a script is not registered in the registry.
var ErrScriptNotFound = errors.New("script not found")

ErrScriptNotFound means that script is not registered in registry

Functions

func Cancelable

func Cancelable(ctx context.Context, c <-chan *Measurement) <-chan *Measurement

Cancelable is helper function that cancels channel once context is canceled

func CodeFromName added in v1.12.0

func CodeFromName(name string) string

CodeFromName is used to generate stable ids for gauges that only have name

func FilterMeasurements

func FilterMeasurements(ctx context.Context, in <-chan *Measurement, logger *logrus.Entry, filters ...MeasurementsFilter) <-chan *Measurement

FilterMeasurements filters a channel of measurements using any number of filters. It also supports context cancelation.

func GenFromSlice

func GenFromSlice(ctx context.Context, measurements []Measurement) <-chan *Measurement

GenFromSlice creates channel of Measurements from slice

func NPrecision

func NPrecision(value float64, precision int) nulltype.NullFloat64

NPrecision is like Precision but for nullables

func NTruncCoord

func NTruncCoord(value float64) nulltype.NullFloat64

NTruncCoord is same as TruncCoord but for nullable values

func Precision

func Precision(value float64, precision int) float64

Precision truncates float64 to given precision

func SinkToSlice

func SinkToSlice(ctx context.Context, in <-chan *Measurement) <-chan []*Measurement

SinkToSlice takes channel of measurements and produces channel that will receive slice of measurements once input channel is closed or context is canceled

func Split

func Split(ctx context.Context, in <-chan *Measurement) (<-chan *Measurement, <-chan *Measurement)

Split splits one channel of measurements into two

func ToEPSG4326

func ToEPSG4326(x, y float64, projDefinition string) (float64, float64, error)

ToEPSG4326 converts a coordinate from the given coordinate system definition to EPSG4326. The definition can be obtained using the following URL: https://epsg.io/<EPSG_CODE>.proj4. For example: https://epsg.io/31257.proj4. The human-friendly page is https://epsg.io/31257. Coordinates are rounded to 5-digit precision (~1 meter), see https://en.wikipedia.org/wiki/Decimal_degrees.

func TruncCoord

func TruncCoord(value float64) float64

TruncCoord truncates locations to 5 digits precision [link](https://en.wikipedia.org/wiki/Decimal_degrees)

Types

type BatchableOptions added in v1.7.0

// BatchableOptions is an interface that the options of a batched script must implement.
type BatchableOptions interface {
	// GetBatchSize returns the batch size.
	// NOTE(review): presumably the number of gauge codes handed to one harvest call — confirm against the scheduler.
	GetBatchSize() int
}

BatchableOptions is an interface that options of batched script must implement

type CSVStreamOptions

// CSVStreamOptions contains common options for streaming data from CSV files.
type CSVStreamOptions struct {
	// CSV separator symbol
	Comma rune
	// Decoder, defaults to UTF-8
	Decoder *encoding.Decoder
	// Number of rows at the beginning of file that do not contain data
	HeaderHeight int
	// Number of columns. If a row contains a different number of columns, the stream will stop with an error.
	// For header rows this is ignored
	NumColumns int
	// Extra HTTPClient options
	*RequestOptions
}

CSVStreamOptions contains commons options for streaming data from CSV files

type ClientOptions

// ClientOptions are HTTPClient options that can be passed as args at startup.
// The `desc` tag on each field carries its human-readable description.
type ClientOptions struct {
	UserAgent  string `desc:"User agent for requests sent from scripts. Leave empty to use fake browser agent"`
	Timeout    int64  `desc:"Request timeout in seconds"`
	WithoutTLS bool   `desc:"Disable TLS for some gauges"`
	Proxy      string `desc:"HTTP client proxy (for example, you can use mitm for local development)"`
}

ClientOptions are HTTPClient options that can be passed as args at startup

type CodesFilter added in v1.25.0

// CodesFilter is a measurements filter that accepts only measurements with given gauge codes.
type CodesFilter struct {
	// Codes is the set of accepted gauge codes.
	Codes StringSet
}

CodesFilter is a measurements filter that accepts only measurements with the given gauge codes

type Doc added in v1.9.0

// Doc extends goquery.Document with a Close() method.
type Doc struct {
	*goquery.Document
	// contains filtered or unexported fields
}

Doc extends goquery.Document with a Close() method

func (*Doc) Close added in v1.9.0

func (doc *Doc) Close()

Close closes underlying resp body

type Error

// Error bundles an underlying error with a message and a key/value context map.
// Only Msg is serialized to JSON (under the "error" key); Err and Ctx are excluded.
type Error struct {
	Err error                  `json:"-"`
	Msg string                 `json:"error"`
	Ctx map[string]interface{} `json:"-"`
}

func NewErr

func NewErr(err error, ctx ...interface{}) *Error

func WrapErr

func WrapErr(err error, msg string, ctx ...interface{}) *Error

func (*Error) Error

func (e *Error) Error() string

func (*Error) Unwrap

func (e *Error) Unwrap() error

func (*Error) With

func (e *Error) With(key string, val interface{}) *Error

func (*Error) WithMap

func (e *Error) WithMap(m map[string]interface{}) *Error

type ErrorResponse

// ErrorResponse embeds *Error and adds an HTTP status code, a status text and a request id.
// HTTPStatusCode is excluded from the JSON body; StatusText and ReqID are omitted when empty.
type ErrorResponse struct {
	*Error
	HTTPStatusCode int    `json:"-"`
	StatusText     string `json:"status,omitempty"` // user-level status message
	ReqID          string `json:"request_id,omitempty"`
}

func NewErrorResponse

func NewErrorResponse(e error, message string, code int) *ErrorResponse

func (*ErrorResponse) Render

type ErrorType

// ErrorType is an integer-backed type.
// NOTE(review): its constants and intended use are not visible here — confirm before relying on specific values.
type ErrorType int

type Gauge

// Gauge represents a gauge/station from an upstream source.
// It embeds GaugeID, so the script and code fields are part of the gauge itself.
type Gauge struct {
	GaugeID
	// Name of the gauge
	Name string `json:"name"`
	// This is webpage URL from original source
	URL string `json:"url,omitempty"`
	// Water level unit, e.g, "m"/"ft"/"cm"
	LevelUnit string `json:"levelUnit,omitempty"`
	// Water flow/discharge unit, e.g. "cfs"/"m3/s"
	FlowUnit string `json:"flowUnit,omitempty"`
	// Station location, if known
	Location *Location `json:"location,omitempty"`
}

Gauge represents gauge/station from upstream source

func GenerateRandGauge

func GenerateRandGauge(script string, index int) Gauge

GenerateRandGauge generates random gauge for testing purposes

type GaugeID

// GaugeID identifies a gauge using a script and code pair.
type GaugeID struct {
	// id of script from gorge's script registry
	Script string `json:"script"`
	// unique gauge code from upstream
	// if no code is provided (ex. Georgia) script must generate its own stable unique codes
	Code string `json:"code"`
}

GaugeID identifies gauge using script and code pair

func (*GaugeID) Less

func (id *GaugeID) Less(other *GaugeID) bool

Less is helper for sorting gauges

type Gauges

// Gauges is a slice of Gauge with helper methods for sorting.
type Gauges []Gauge

Gauges is slice of Gauge with helper methods for sorting

func GaugeSinkToSlice added in v1.6.0

func GaugeSinkToSlice(gauges chan *Gauge, errs chan error) (Gauges, error)

GaugeSinkToSlice converts gauges channels to struct and error

func (Gauges) Len

func (g Gauges) Len() int

func (Gauges) Less

func (g Gauges) Less(i, j int) bool

func (Gauges) Swap

func (g Gauges) Swap(i, j int)

type HTTPClient

// HTTPClient is like the default client, but with some convenience methods for common scenarios.
// It embeds *http.Client.
type HTTPClient struct {
	*http.Client
	PersistentJar *jar.Jar
	UserAgent     string
	// contains filtered or unexported fields
}

HTTPClient is like the default client, but with some convenience methods for common scenarios

func NewClient

func NewClient(opts ClientOptions, logger *logrus.Entry) *HTTPClient

NewClient constructs new HTTPClient with options

func (*HTTPClient) Do

func (client *HTTPClient) Do(req *http.Request, opts *RequestOptions) (*http.Response, error)

Do is same as http.Client.Do, but sets extra headers

func (*HTTPClient) EnsureCookie

func (client *HTTPClient) EnsureCookie(fromURL string, force bool) error

EnsureCookie makes sure that cookies from the given URL are present and will be sent with further requests. Some scripts will not return correct data unless cookies are present.

func (*HTTPClient) Get

func (client *HTTPClient) Get(url string, opts *RequestOptions) (resp *http.Response, err error)

Get is same as http.Client.Get, but sets extra headers

func (*HTTPClient) GetAsDoc added in v1.9.0

func (client *HTTPClient) GetAsDoc(url string, opts *RequestOptions) (*Doc, error)

GetAsDoc is shortcut for http.Client.Get to get HTML docs for goquery.

func (*HTTPClient) GetAsJSON

func (client *HTTPClient) GetAsJSON(url string, dest interface{}, opts *RequestOptions) error

GetAsJSON is shortcut for http.Client.Get to get response as JSON

func (*HTTPClient) GetAsString

func (client *HTTPClient) GetAsString(url string, opts *RequestOptions) (string, error)

GetAsString is shortcut for http.Client.Get to get response as string

func (*HTTPClient) GetAsXML

func (client *HTTPClient) GetAsXML(url string, dest interface{}, opts *RequestOptions) error

GetAsXML is shortcut for http.Client.Get to get response as XML

func (*HTTPClient) PostForm

func (client *HTTPClient) PostForm(url string, data url.Values, opts *RequestOptions) (resp *http.Response, req *http.Request, err error)

PostForm is like http.Client.PostForm but with extra options

func (*HTTPClient) PostFormAsString

func (client *HTTPClient) PostFormAsString(url string, data url.Values, opts *RequestOptions) (result string, req *http.Request, err error)

PostFormAsString is shortcut for http.Client.PostForm to get response as string

func (*HTTPClient) SaveCookies

func (client *HTTPClient) SaveCookies()

SaveCookies dumps cookies to disk, so in case of service restart they are not lost

func (*HTTPClient) StreamCSV

func (client *HTTPClient) StreamCSV(url string, handler func(row []string) error, opts CSVStreamOptions) error

StreamCSV reads CSV file from given URL and streams it by calling handler for each row

type HTime

// HTime is a wrapper around time.Time which marshals/unmarshals to/from
// an RFC3339 string and can be stored in DB.
type HTime struct {
	time.Time
}

HTime is just a wrapper around time which marshals/unmarshals to/from RFC3339 string and can be stored in DB

func (HTime) MarshalJSON

func (t HTime) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler interface

func (*HTime) Scan

func (t *HTime) Scan(src interface{}) error

Scan implements sql.Scanner interface

func (*HTime) UnmarshalJSON

func (t *HTime) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler interface

func (HTime) Value

func (t HTime) Value() (driver.Value, error)

Value implements driver Valuer interface.

type HarvestMode

// HarvestMode determines how the schedule for a script is generated.
type HarvestMode int

HarvestMode determines how the schedule for script is generated

// Supported harvest modes. Values are assigned with iota, starting from AllAtOnce = 0.
const (
	// AllAtOnce scripts harvest all gauges in one batch. Such sources usually provide one file with latest measurements for every gauge
	AllAtOnce HarvestMode = iota
	// OneByOne scripts harvest only one gauge at a time.
	// For such scripts a schedule will be generated so that all the gauges are uniformly harvested during period of 1 hour
	OneByOne
	// Batched scripts are like one by one, except that multiple codes are provided to Harvest function
	// Options must implement BatchableOptions
	Batched
)

func (*HarvestMode) MarshalJSON

func (m *HarvestMode) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler interface

func (HarvestMode) String

func (m HarvestMode) String() string

func (HarvestMode) TSName

func (m HarvestMode) TSName() string

TSName is required to generate typescript enum

func (*HarvestMode) UnmarshalJSON

func (m *HarvestMode) UnmarshalJSON(bytes []byte) error

UnmarshalJSON implements json.Unmarshaler interface

type JobDescription

// JobDescription represents a task that will run on schedule and harvest measurements from some source.
type JobDescription struct {
	// UUID. Represents the paired entity id in the client's service and is generated on the client side
	ID string `json:"id" structs:"id"`
	// id of script from gorge's script registry
	Script string `json:"script" structs:"script"`
	// a map with keys being gauge codes and value being pieces of json representing harvest options for that gauge
	// pass `{}` or `null` if no options are given for the gauge
	Gauges map[string]json.RawMessage `json:"gauges" structs:"codes" ts_type:"{[key: string]: any} | null"`
	// cron expression, ignored for OneByOne scripts. AllAtOnce script will run on this cron schedule
	Cron string `json:"cron" structs:"cron"`
	// harvest options for the entire script. For example, upstream credentials
	Options json.RawMessage `json:"options" structs:"options,omitempty" ts_type:"{[key: string]: any} | null"`
	// When used as input this must be nil
	Status *Status `json:"status,omitempty"`
}

JobDescription represents task that will run on schedule and harvest measurements from some source

func (*JobDescription) Bind

func (j *JobDescription) Bind(r *http.Request) error

Bind implements go-chi Binder interface

func (*JobDescription) Scan

func (j *JobDescription) Scan(src interface{}) error

Scan implements sql.Scanner interface https://golang.org/pkg/database/sql/#Scanner

type JobScheduler

// JobScheduler is responsible for running harvest jobs on schedule.
type JobScheduler interface {
	Start()
	Stop()
	AddJob(description JobDescription) error
	DeleteJob(jobID string) error
	// ListNext returns map where values are times when scripts will run next time
	// If jobID is empty, ListNext lists next times for all running scripts. And map keys are script ids
	// If jobID is not empty, this will return next times for all codes of this one-by-one job, and map keys are gauge codes
	ListNext(jobID string) map[string]HTime
}

JobScheduler is responsible for running harvest jobs on schedule

type LatestFilter added in v1.25.0

// LatestFilter is a measurements filter that accepts only measurements that are either
// newer than the latest measurement for the same gauge in the Latest map, or,
// if no latest measurement is found, no older than the After date.
type LatestFilter struct {
	Latest map[GaugeID]Measurement
	After  time.Time
}

LatestFilter is a measurements filter that accepts only measurements that are either (a) newer than the latest measurement for the same gauge in the map, or (b) if no latest measurement is found, no older than the "After" date.

type Location

// Location is an EPSG4326 coordinate. Altitude is omitted from JSON when zero.
type Location struct {
	Latitude  float64 `json:"latitude"`
	Longitude float64 `json:"longitude"`
	Altitude  float64 `json:"altitude,omitempty"`
}

Location is EPSG4326 coordinate

type LoggingScript

// LoggingScript is used to inject loggers into scripts.
// It must be embedded into concrete scripts.
type LoggingScript struct {
	// contains filtered or unexported fields
}

LoggingScript is used to inject loggers into scripts. It must be embedded into concrete scripts.

func (*LoggingScript) GetLogger

func (s *LoggingScript) GetLogger() *logrus.Entry

GetLogger returns injected logger, or discarding logger if none has been injected

func (*LoggingScript) SetLogger

func (s *LoggingScript) SetLogger(logger *logrus.Entry)

SetLogger injects logger into script

type Measurement

// Measurement represents the water level and/or flow value returned by a gauge at the timestamp.
// It embeds GaugeID to identify the gauge the value belongs to.
type Measurement struct {
	GaugeID
	Timestamp HTime `json:"timestamp" ts_type:"string"`
	// Level is null when gauge doesn't provide it or is temporarily broken
	Level nulltype.NullFloat64 `json:"level" ts_type:"number | null"`
	// Flow is null when gauge doesn't provide it or is temporarily broken
	Flow nulltype.NullFloat64 `json:"flow" ts_type:"number | null"`
}

Measurement represents water level and/or flow value returned by gauge at the timestamp

func GenerateRandMeasurement

func GenerateRandMeasurement(script string, code string, value float64, min float64, max float64) Measurement

GenerateRandMeasurement generates a random measurement for testing purposes. If value is not 0, this value will be returned for both level and flow; otherwise level and flow will be random numbers in the [min, max] range.

type Measurements

// Measurements is a slice of Measurement pointers used as a helper for sorting.
type Measurements []*Measurement

Measurements is helper for sorting

func HarvestSlice

func HarvestSlice(script Script, codes StringSet, since int64) (Measurements, error)

HarvestSlice is test helper that runs script's harvest and returns result as slice

func (Measurements) Len

func (m Measurements) Len() int

func (Measurements) Less

func (m Measurements) Less(i, j int) bool

func (Measurements) Swap

func (m Measurements) Swap(i, j int)

type MeasurementsFactory

// MeasurementsFactory is a helper used in tests.
// NOTE(review): Time and Value appear to be the base time and base value that GenOne offsets — confirm against its implementation.
type MeasurementsFactory struct {
	Time   time.Time
	Script string
	Code   string
	Value  float64
}

MeasurementsFactory is helper used in tests

func (*MeasurementsFactory) GenMany

func (f *MeasurementsFactory) GenMany(size int) []Measurement

GenMany will generate many measurements using GenOne in steps of 1

func (*MeasurementsFactory) GenManyPtr

func (f *MeasurementsFactory) GenManyPtr(size int) []*Measurement

GenManyPtr is like GenMany, but returns slice of pointers

func (*MeasurementsFactory) GenOne

func (f *MeasurementsFactory) GenOne(offset int, args ...interface{}) Measurement

GenOne makes a new measurement by adding offset hours to the base time and offset (as a number) to the base value. Variadic args can be used to supply overrides in this order: value, code, script.

func (*MeasurementsFactory) GenOnePtr

func (f *MeasurementsFactory) GenOnePtr(offset int, args ...interface{}) *Measurement

GenOnePtr is like GenOne, but returns pointer to Measurement

type MeasurementsFilter

// MeasurementsFilter is used to skip unwanted measurements based on code, timestamp, etc.
type MeasurementsFilter interface {
	// contains filtered or unexported methods
}

MeasurementsFilter is used to skip unwanted measurements based on code, timestamp, etc...

type RequestOptions

// RequestOptions are additional per-request options.
type RequestOptions struct {
	// When set to true, requests will be sent with random user-agent
	FakeAgent bool
	// Headers to set on request
	Headers map[string]string
	// Request will not save cookies
	SkipCookies bool
}

RequestOptions are additional per-request options

type Script

// Script represents a bunch of methods to harvest measurements and gauges from a certain upstream source.
type Script interface {
	// ListGauges returns the gauges of this source, or an error.
	ListGauges() (Gauges, error)
	// Harvests measurements from upstream and writes them to recv channel, then closes both channels.
	// If unrecoverable error happens during this process, writes it into errs channel and closes both channels.
	// codes are set of gauge codes to harvest from upstream. It's meant to be passed to upstream. The script itself should not make use of it.
	// since is the last timestamp (usually for one-by-one scripts) that is meant to be passed to upstream. The script itself should not make use of it.
	Harvest(ctx context.Context, recv chan<- *Measurement, errs chan<- error, codes StringSet, since int64)
	// SetLogger sets the logger used by the script.
	SetLogger(logger *logrus.Entry)
	// GetLogger returns the logger used by the script.
	GetLogger() *logrus.Entry
}

Script represents bunch of methods to harvest measurements and gauges from certain upstream source

type ScriptDescriptor

// ScriptDescriptor represents a script registered in gorge.
// DefaultOptions and Factory are functions and are excluded from JSON.
type ScriptDescriptor struct {
	Name string `json:"name"`
	// Description is human-readable name of data source, something that you can google
	Description    string             `json:"description"`
	Mode           HarvestMode        `json:"mode"`
	DefaultOptions func() interface{} `json:"-"`
	Factory        ScriptFactory      `json:"-"`
}

ScriptDescriptor represents a script registered in gorge

type ScriptFactory

// ScriptFactory creates an instance of a script and provides it with options.
// It must fail if the generic options cannot be cast to the script's internal options.
type ScriptFactory func(name string, options interface{}) (Script, error)

ScriptFactory creates an instance of script and provides it with options. It must fail if generic options cannot be cast to script's internal options.

type ScriptRegistry

// ScriptRegistry is where all the scripts we can use must be registered.
type ScriptRegistry struct {
	// contains filtered or unexported fields
}

ScriptRegistry is where all the scripts we can use must be registered

func NewRegistry

func NewRegistry() *ScriptRegistry

NewRegistry creates new ScriptRegistry

func (*ScriptRegistry) Create

func (r *ScriptRegistry) Create(name string, options interface{}) (Script, HarvestMode, error)

Create new instance of a script with given options. options must be instance of a script's internal options

func (*ScriptRegistry) CreateFromReader

func (r *ScriptRegistry) CreateFromReader(name string, optsReader io.Reader) (Script, HarvestMode, error)

CreateFromReader create new instance of a script. Options are provided in a form of reader. This reader must provide JSON options that can be unmarshalled into instance of script's internal options

func (*ScriptRegistry) GetMode

func (r *ScriptRegistry) GetMode(name string) (HarvestMode, error)

GetMode returns harvest mode of a registered script

func (*ScriptRegistry) List

func (r *ScriptRegistry) List() []ScriptDescriptor

List lists all registered scripts

func (*ScriptRegistry) ParseJSONOptions

func (r *ScriptRegistry) ParseJSONOptions(name string, inputs ...json.RawMessage) (interface{}, error)

ParseJSONOptions parses raw json messages with script's options and merges them into one instance of script's internal options

func (*ScriptRegistry) Register

func (r *ScriptRegistry) Register(d *ScriptDescriptor)

Register new script in registry

type Status

// Status contains information about a job's last execution.
type Status struct {
	// Whether the last execution succeeded
	Success bool `json:"success"`
	// When job was executed last time (has nothing to do with measurements timestamps)
	Timestamp HTime `json:"timestamp" ts_type:"string"`
	// Error, if last time job failed
	Error string `json:"error,omitempty"`
	// Number of harvested measurements
	Count int `json:"count"`
	// When this job will run next time
	Next *HTime `json:"next,omitempty" ts_type:"string"`
}

Status contains information about job's last execution

type StringSet

// StringSet is a helper for sets of gauge codes.
// Members are the map keys; the values are empty structs.
type StringSet map[string]struct{}

StringSet is a helper for sets of gauge codes

func GaugesCodes

func GaugesCodes(m map[string]json.RawMessage) StringSet

GaugesCodes is helper method used to get codes from job description

func (StringSet) Contains

func (set StringSet) Contains(str string) bool

Contains checks if given code is contained in set

func (StringSet) Only

func (set StringSet) Only() (string, error)

Only returns the only code of the set as a string. If the set does not contain exactly 1 code, it returns an error.

func (StringSet) Slice

func (set StringSet) Slice() []string

Slice converts StringSet to slice

func (StringSet) String

func (set StringSet) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL