Documentation ¶
Index ¶
- Variables
- func Cancelable(ctx context.Context, c <-chan *Measurement) <-chan *Measurement
- func CodeFromName(name string) string
- func FilterMeasurements(ctx context.Context, in <-chan *Measurement, logger *logrus.Entry, ...) <-chan *Measurement
- func GenFromSlice(ctx context.Context, measurements []Measurement) <-chan *Measurement
- func NPrecision(value float64, precision int) nulltype.NullFloat64
- func NTruncCoord(value float64) nulltype.NullFloat64
- func Precision(value float64, precision int) float64
- func SinkToSlice(ctx context.Context, in <-chan *Measurement) <-chan []*Measurement
- func Split(ctx context.Context, in <-chan *Measurement) (<-chan *Measurement, <-chan *Measurement)
- func ToEPSG4326(x, y float64, projDefinition string) (float64, float64, error)
- func TruncCoord(value float64) float64
- type BatchableOptions
- type CSVStreamOptions
- type ClientOptions
- type CodesFilter
- type Doc
- type Error
- type ErrorResponse
- type ErrorType
- type Gauge
- type GaugeID
- type Gauges
- type HTTPClient
- func (client *HTTPClient) Do(req *http.Request, opts *RequestOptions) (*http.Response, error)
- func (client *HTTPClient) EnsureCookie(fromURL string, force bool) error
- func (client *HTTPClient) Get(url string, opts *RequestOptions) (resp *http.Response, err error)
- func (client *HTTPClient) GetAsDoc(url string, opts *RequestOptions) (*Doc, error)
- func (client *HTTPClient) GetAsJSON(url string, dest interface{}, opts *RequestOptions) error
- func (client *HTTPClient) GetAsString(url string, opts *RequestOptions) (string, error)
- func (client *HTTPClient) GetAsXML(url string, dest interface{}, opts *RequestOptions) error
- func (client *HTTPClient) PostForm(url string, data url.Values, opts *RequestOptions) (resp *http.Response, req *http.Request, err error)
- func (client *HTTPClient) PostFormAsString(url string, data url.Values, opts *RequestOptions) (result string, req *http.Request, err error)
- func (client *HTTPClient) SaveCookies()
- func (client *HTTPClient) StreamCSV(url string, handler func(row []string) error, opts CSVStreamOptions) error
- type HTime
- type HarvestMode
- type JobDescription
- type JobScheduler
- type LatestFilter
- type Location
- type LoggingScript
- type Measurement
- type Measurements
- type MeasurementsFactory
- type MeasurementsFilter
- type RequestOptions
- type Script
- type ScriptDescriptor
- type ScriptFactory
- type ScriptRegistry
- func (r *ScriptRegistry) Create(name string, options interface{}) (Script, HarvestMode, error)
- func (r *ScriptRegistry) CreateFromReader(name string, optsReader io.Reader) (Script, HarvestMode, error)
- func (r *ScriptRegistry) GetMode(name string) (HarvestMode, error)
- func (r *ScriptRegistry) List() []ScriptDescriptor
- func (r *ScriptRegistry) ParseJSONOptions(name string, inputs ...json.RawMessage) (interface{}, error)
- func (r *ScriptRegistry) Register(d *ScriptDescriptor)
- type Status
- type StringSet
Constants ¶
This section is empty.
Variables ¶
var Client = NewClient(ClientOptions{ UserAgent: "whitewater.guide robot", Timeout: 60, WithoutTLS: false, Proxy: "", }, nil)
Client is the default client for scripts. It will be reinitialized during server creation. This default value will be used in tests.
var ErrScriptNotFound = errors.New("script not found")
ErrScriptNotFound means that script is not registered in registry
Functions ¶
func Cancelable ¶
func Cancelable(ctx context.Context, c <-chan *Measurement) <-chan *Measurement
Cancelable is helper function that cancels channel once context is canceled
func CodeFromName ¶ added in v1.12.0
CodeFromName is used to generate stable ids for gauges that only have name
func FilterMeasurements ¶
func FilterMeasurements(ctx context.Context, in <-chan *Measurement, logger *logrus.Entry, filters ...MeasurementsFilter) <-chan *Measurement
FilterMeasurements filters channel of measurements using any number of filters. It also supports context cancellation.
func GenFromSlice ¶
func GenFromSlice(ctx context.Context, measurements []Measurement) <-chan *Measurement
GenFromSlice creates channel of Measurements from slice
func NPrecision ¶
NPrecision is like Precision but for nullables
func NTruncCoord ¶
func NTruncCoord(value float64) nulltype.NullFloat64
NTruncCoord is same as TruncCoord but for nullable values
func SinkToSlice ¶
func SinkToSlice(ctx context.Context, in <-chan *Measurement) <-chan []*Measurement
SinkToSlice takes channel of measurements and produces channel that will receive slice of measurements once input channel is closed or context is canceled
func Split ¶
func Split(ctx context.Context, in <-chan *Measurement) (<-chan *Measurement, <-chan *Measurement)
Split splits one channel of measurements into two
func ToEPSG4326 ¶
ToEPSG4326 converts coordinate from given coordinate system definition to EPSG4326. Definition can be obtained using following url: https://epsg.io/<EPSG_CODE>.proj4 (for example, https://epsg.io/31257.proj4; human-friendly page is https://epsg.io/31257). Coordinates are rounded to 5-digits precision (~1 meter), see https://en.wikipedia.org/wiki/Decimal_degrees
func TruncCoord ¶
TruncCoord truncates locations to 5 digits precision [link](https://en.wikipedia.org/wiki/Decimal_degrees)
Types ¶
type BatchableOptions ¶ added in v1.7.0
type BatchableOptions interface {
GetBatchSize() int
}
BatchableOptions is an interface that options of batched script must implement
type CSVStreamOptions ¶
type CSVStreamOptions struct { // CSV separator symbol Comma rune // Decoder, defaults to UTF-8 Decoder *encoding.Decoder // Number of rows at the beginning of file that do not contain data HeaderHeight int // Number of colums. If a row contains different number of columns, the stream will stop with error // For header rows this is ignored NumColumns int // Extra HTTPClient options *RequestOptions }
CSVStreamOptions contains commons options for streaming data from CSV files
type ClientOptions ¶
type ClientOptions struct { UserAgent string `desc:"User agent for requests sent from scripts. Leave empty to use fake browser agent"` Timeout int64 `desc:"Request timeout in seconds"` WithoutTLS bool `desc:"Disable TLS for some gauges"` Proxy string `desc:"HTTP client proxy (for example, you can use mitm for local development)"` }
ClientOptions are HTTPClient options that can be passed as args at startup
type CodesFilter ¶ added in v1.25.0
type CodesFilter struct {
Codes StringSet
}
CodesFilter returns measurements filter that accepts only measurements with given gauge codes
type Error ¶
type ErrorResponse ¶
type ErrorResponse struct { *Error HTTPStatusCode int `json:"-"` StatusText string `json:"status,omitempty"` // user-level status message ReqID string `json:"request_id,omitempty"` }
func NewErrorResponse ¶
func NewErrorResponse(e error, message string, code int) *ErrorResponse
func (*ErrorResponse) Render ¶
func (e *ErrorResponse) Render(w http.ResponseWriter, r *http.Request) error
type Gauge ¶
type Gauge struct { GaugeID Name string `json:"name"` // This is webpage URL from original source URL string `json:"url,omitempty"` // Water level unit, e.g, "m"/"ft"/"cm" LevelUnit string `json:"levelUnit,omitempty"` // Water flow/discharge unit, e.g. "cfs"/"m3/s" FlowUnit string `json:"flowUnit,omitempty"` // Station location, if known Location *Location `json:"location,omitempty"` }
Gauge represents gauge/station from upstream source
func GenerateRandGauge ¶
GenerateRandGauge generates random gauge for testing purposes
type GaugeID ¶
type GaugeID struct { // id of script from gorge's script registry Script string `json:"script"` // unique gauge code from upstream // if no code is provided (ex. Georgia) script must generate it's own stable unique codes Code string `json:"code"` }
GaugeID identifies gauge using script and code pair
type Gauges ¶
type Gauges []Gauge
Gauges is slice of Gauge with helper methods for sorting
func GaugeSinkToSlice ¶ added in v1.6.0
GaugeSinkToSlice converts gauges channels to struct and error
type HTTPClient ¶
type HTTPClient struct { *http.Client PersistentJar *jar.Jar UserAgent string // contains filtered or unexported fields }
HTTPClient is like default client, but with some convenience methods for common scenarios
func NewClient ¶
func NewClient(opts ClientOptions, logger *logrus.Entry) *HTTPClient
NewClient constructs new HTTPClient with options
func (*HTTPClient) Do ¶
func (client *HTTPClient) Do(req *http.Request, opts *RequestOptions) (*http.Response, error)
Do is same as http.Client.Do, but sets extra headers
func (*HTTPClient) EnsureCookie ¶
func (client *HTTPClient) EnsureCookie(fromURL string, force bool) error
EnsureCookie makes sure that cookies from given URL are present and will be sent with further requests. Some scripts will not return correct data unless cookies are present.
func (*HTTPClient) Get ¶
func (client *HTTPClient) Get(url string, opts *RequestOptions) (resp *http.Response, err error)
Get is same as http.Client.Get, but sets extra headers
func (*HTTPClient) GetAsDoc ¶ added in v1.9.0
func (client *HTTPClient) GetAsDoc(url string, opts *RequestOptions) (*Doc, error)
GetAsDoc is shortcut for http.Client.Get to get HTML docs for goquery.
func (*HTTPClient) GetAsJSON ¶
func (client *HTTPClient) GetAsJSON(url string, dest interface{}, opts *RequestOptions) error
GetAsJSON is shortcut for http.Client.Get to get response as JSON
func (*HTTPClient) GetAsString ¶
func (client *HTTPClient) GetAsString(url string, opts *RequestOptions) (string, error)
GetAsString is shortcut for http.Client.Get to get response as string
func (*HTTPClient) GetAsXML ¶
func (client *HTTPClient) GetAsXML(url string, dest interface{}, opts *RequestOptions) error
GetAsXML is shortcut for http.Client.Get to get response as XML
func (*HTTPClient) PostForm ¶
func (client *HTTPClient) PostForm(url string, data url.Values, opts *RequestOptions) (resp *http.Response, req *http.Request, err error)
PostForm is like http.Client.PostForm but with extra options
func (*HTTPClient) PostFormAsString ¶
func (client *HTTPClient) PostFormAsString(url string, data url.Values, opts *RequestOptions) (result string, req *http.Request, err error)
PostFormAsString is shortcut for http.Client.PostForm to get response as string
func (*HTTPClient) SaveCookies ¶
func (client *HTTPClient) SaveCookies()
SaveCookies dumps cookies to disk, so in case of service restart they are not lost
func (*HTTPClient) StreamCSV ¶
func (client *HTTPClient) StreamCSV(url string, handler func(row []string) error, opts CSVStreamOptions) error
StreamCSV reads CSV file from given URL and streams it by calling handler for each row
type HTime ¶
HTime is just a wrapper around time which marshals/unmarshals to/from RFC3339 string and can be stored in DB
func (HTime) MarshalJSON ¶
MarshalJSON implements json.Marshaler interface
func (*HTime) UnmarshalJSON ¶
UnmarshalJSON implements json.Unmarshaler interface
type HarvestMode ¶
type HarvestMode int
HarvestMode determines how the schedule for script is generated
const ( // AllAtOnce scripts harvest all gauges in one batch. Such sources usually provide one file with latest measurements for every gauge AllAtOnce HarvestMode = iota // OneByOne scripts harvest only one gauge at a time. // For such scripts a schedule will be generated so that all the gauges are uniformly harvested during period of 1 hour OneByOne // Batched scripts are like one by one, except that multiple codes are provided to Harvest function // Options must implement BatchableOptions Batched )
func (*HarvestMode) MarshalJSON ¶
func (m *HarvestMode) MarshalJSON() ([]byte, error)
MarshalJSON implements json.Marshaler interface
func (HarvestMode) String ¶
func (m HarvestMode) String() string
func (HarvestMode) TSName ¶
func (m HarvestMode) TSName() string
TSName is required to generate typescript enum
func (*HarvestMode) UnmarshalJSON ¶
func (m *HarvestMode) UnmarshalJSON(bytes []byte) error
UnmarshalJSON implements json.Unmarshaler interface
type JobDescription ¶
type JobDescription struct { // UUID. Represents paired entity id in client's service and are generated on client side ID string `json:"id" structs:"id"` // id of script from gorge's script registry Script string `json:"script" structs:"script"` // a map with keys being gauge codes and value being pieces of json representing harvest options for that gauge // pass `{}` or `null` if no options are given for the gauge Gauges map[string]json.RawMessage `json:"gauges" structs:"codes" ts_type:"{[key: string]: any} | null"` // cron expression, ignored for OneByOne scripts. AllAtOnce script will run on this cron schedule Cron string `json:"cron" structs:"cron"` // harvest options for the entire script. For example, upstream credentials Options json.RawMessage `json:"options" structs:"options,omitempty" ts_type:"{[key: string]: any} | null"` // When used as input this must be nil Status *Status `json:"status,omitempty"` }
JobDescription represents task that will run on schedule and harvest measurements from some source
func (*JobDescription) Bind ¶
func (j *JobDescription) Bind(r *http.Request) error
Bind implements go-chi Binder interface
func (*JobDescription) Scan ¶
func (j *JobDescription) Scan(src interface{}) error
Scan implements sql Scanner interface https://golang.org/pkg/database/sql/#Scanner
type JobScheduler ¶
type JobScheduler interface { Start() Stop() AddJob(description JobDescription) error DeleteJob(jobID string) error // ListNext returns map where values are times when scripts will run next time // If jobID is empty, ListNext lists next times for all running scripts. And map keys are script ids // If jobID is not empty, this will return next times for all codes of this one-by-one job, and map keys are gauge codes ListNext(jobID string) map[string]HTime }
JobScheduler is responsible for running harvest jobs on schedule
type LatestFilter ¶ added in v1.25.0
type LatestFilter struct { Latest map[GaugeID]Measurement After time.Time }
LatestFilter returns measurements filter that accepts only measurements that are either (a) newer than latest measurement for same gauge in the map, or (b) if no latest measurement is found, no older than "After" date
type Location ¶
type Location struct { Latitude float64 `json:"latitude"` Longitude float64 `json:"longitude"` Altitude float64 `json:"altitude,omitempty"` }
Location is EPSG4326 coordinate
type LoggingScript ¶
type LoggingScript struct {
// contains filtered or unexported fields
}
LoggingScript is used to inject loggers into scripts. It must be embedded into concrete scripts.
func (*LoggingScript) GetLogger ¶
func (s *LoggingScript) GetLogger() *logrus.Entry
GetLogger returns injected logger, or discarding logger if none has been injected
func (*LoggingScript) SetLogger ¶
func (s *LoggingScript) SetLogger(logger *logrus.Entry)
SetLogger injects logger into script
type Measurement ¶
type Measurement struct { GaugeID Timestamp HTime `json:"timestamp" ts_type:"string"` // Level is null when gauge doesn't provide it or is temporary broken Level nulltype.NullFloat64 `json:"level" ts_type:"number | null"` // Flow is null when gauge doesn't provide it or is temporary broken Flow nulltype.NullFloat64 `json:"flow" ts_type:"number | null"` }
Measurement represents water level and/or flow value returned by gauge at the timestamp
func GenerateRandMeasurement ¶
func GenerateRandMeasurement(script string, code string, value float64, min float64, max float64) Measurement
GenerateRandMeasurement generates random measurement for testing purposes. If value is not 0, this value will be returned for both level and flow; otherwise level and flow will be random numbers in [min, max] range.
type Measurements ¶
type Measurements []*Measurement
Measurements is helper for sorting
func HarvestSlice ¶
func HarvestSlice(script Script, codes StringSet, since int64) (Measurements, error)
HarvestSlice is test helper that runs script's harvest and returns result as slice
func (Measurements) Len ¶
func (m Measurements) Len() int
func (Measurements) Less ¶
func (m Measurements) Less(i, j int) bool
func (Measurements) Swap ¶
func (m Measurements) Swap(i, j int)
type MeasurementsFactory ¶
MeasurementsFactory is helper used in tests
func (*MeasurementsFactory) GenMany ¶
func (f *MeasurementsFactory) GenMany(size int) []Measurement
GenMany will generate many measurements using GenOne in steps of 1
func (*MeasurementsFactory) GenManyPtr ¶
func (f *MeasurementsFactory) GenManyPtr(size int) []*Measurement
GenManyPtr is like GenMany, but returns slice of pointers
func (*MeasurementsFactory) GenOne ¶
func (f *MeasurementsFactory) GenOne(offset int, args ...interface{}) Measurement
GenOne makes new measurement by adding offset hours to base time and offset as number to base value. Variadic args can be used to supply overrides in this order: value, code, script.
func (*MeasurementsFactory) GenOnePtr ¶
func (f *MeasurementsFactory) GenOnePtr(offset int, args ...interface{}) *Measurement
GenOnePtr is like GenOne, but returns pointer to Measurement
type MeasurementsFilter ¶
type MeasurementsFilter interface {
// contains filtered or unexported methods
}
MeasurementsFilter is used to skip unwanted measurements based on code, timestamp, etc...
type RequestOptions ¶
type RequestOptions struct { // When set to true, requests will be sent with random user-agent FakeAgent bool // Headers to set on request Headers map[string]string // Request will not save cookies SkipCookies bool }
RequestOptions are additional per-request options
type Script ¶
type Script interface { ListGauges() (Gauges, error) // Harvests measurements from upstream and writes them to recv channel, then closes both channels. // If unrecoverable error happens during this process, writes it into errs channel and closes both channels. // codes are set of gauge codes to harvest from upstream. It's meant to be passed to upstream. The script itself should not make use of it. // since is the last timestamp (usually for one-by-one scripts) that is meannt to be passed to upstream. The script itself should not make use of it. Harvest(ctx context.Context, recv chan<- *Measurement, errs chan<- error, codes StringSet, since int64) SetLogger(logger *logrus.Entry) GetLogger() *logrus.Entry }
Script represents bunch of methods to harvest measurements and gauges from certain upstream source
type ScriptDescriptor ¶
type ScriptDescriptor struct { Name string `json:"name"` // Description is human-readable name of data source, something that you can google Description string `json:"description"` Mode HarvestMode `json:"mode"` DefaultOptions func() interface{} `json:"-"` Factory ScriptFactory `json:"-"` }
ScriptDescriptor represents a script registered in gorge
type ScriptFactory ¶
ScriptFactory creates an instance of script and provides it with options. It must fail if generic options cannot be cast to script's internal options.
type ScriptRegistry ¶
type ScriptRegistry struct {
// contains filtered or unexported fields
}
ScriptRegistry is where all the scripts we can use must be registered
func (*ScriptRegistry) Create ¶
func (r *ScriptRegistry) Create(name string, options interface{}) (Script, HarvestMode, error)
Create new instance of a script with given options. options must be instance of a script's internal options
func (*ScriptRegistry) CreateFromReader ¶
func (r *ScriptRegistry) CreateFromReader(name string, optsReader io.Reader) (Script, HarvestMode, error)
CreateFromReader creates new instance of a script. Options are provided in a form of reader. This reader must provide JSON options that can be unmarshalled into instance of script's internal options
func (*ScriptRegistry) GetMode ¶
func (r *ScriptRegistry) GetMode(name string) (HarvestMode, error)
GetMode returns harvest mode of a registered script
func (*ScriptRegistry) List ¶
func (r *ScriptRegistry) List() []ScriptDescriptor
List lists all registered scripts
func (*ScriptRegistry) ParseJSONOptions ¶
func (r *ScriptRegistry) ParseJSONOptions(name string, inputs ...json.RawMessage) (interface{}, error)
ParseJSONOptions parses raw json messages with script's options and merges them into one instance of script's internal options
func (*ScriptRegistry) Register ¶
func (r *ScriptRegistry) Register(d *ScriptDescriptor)
Register new script in registry
type Status ¶
type Status struct { Success bool `json:"success"` // When job was executed last time (has nothing to do wuth measurements timestamps) Timestamp HTime `json:"timestamp" ts_type:"string"` // Error, if last time job failed Error string `json:"error,omitempty"` // Number of harvested measurements Count int `json:"count"` // When this job will run next time Next *HTime `json:"next,omitempty" ts_type:"string"` }
Status contains information about job's last execution
type StringSet ¶
type StringSet map[string]struct{}
StringSet is a helper for sets of gauge codes
func GaugesCodes ¶
func GaugesCodes(m map[string]json.RawMessage) StringSet
GaugesCodes is helper method used to get codes from job description