Documentation ¶
Overview ¶
Package data implements Data API
Index ¶
- Constants
- Variables
- func NewMQTTConnector(registryClient registry.Client, storage Storage) (chan<- common.Notification, error)
- func NtfListener(s Storage, ntChan <-chan common.Notification)
- func NtfListenerMQTT(c *MQTTConnector, ntChan <-chan common.Notification)
- type DataPoint
- type DataSet
- type Entry
- type HTTPAPI
- type InfluxStorage
- func (s *InfluxStorage) CountSprintf(format string, a ...interface{}) (int64, error)
- func (s *InfluxStorage) Database() string
- func (s *InfluxStorage) FieldForType(t string) string
- func (s *InfluxStorage) MeasurementName(ds *registry.DataSource) string
- func (s *InfluxStorage) MeasurementNameFQ(ds *registry.DataSource) string
- func (s *InfluxStorage) NtfCreated(ds registry.DataSource, callback chan error)
- func (s *InfluxStorage) NtfDeleted(ds registry.DataSource, callback chan error)
- func (s *InfluxStorage) NtfUpdated(oldDS registry.DataSource, newDS registry.DataSource, callback chan error)
- func (s *InfluxStorage) Query(q Query, page, perPage int, sources ...*registry.DataSource) (DataSet, int, error)
- func (s *InfluxStorage) QuerySprintf(format string, a ...interface{}) (res []influx.Result, err error)
- func (s *InfluxStorage) Replication() int
- func (s *InfluxStorage) RetentionPolicyName(period string) string
- func (s *InfluxStorage) Submit(data map[string][]DataPoint, sources map[string]*registry.DataSource) error
- type MQTTConnector
- type Manager
- type MongoStorage
- func (s *MongoStorage) Database() string
- func (s *MongoStorage) FQMsrmt(ds *registry.DataSource) string
- func (s *MongoStorage) FieldForType(t string) string
- func (s *MongoStorage) Msrmt(ds *registry.DataSource) string
- func (s *MongoStorage) NtfCreated(ds registry.DataSource, callback chan error)
- func (s *MongoStorage) NtfDeleted(ds registry.DataSource, callback chan error)
- func (s *MongoStorage) NtfUpdated(oldDS registry.DataSource, newDS registry.DataSource, callback chan error)
- func (s *MongoStorage) Query(q Query, page, perPage int, sources ...*registry.DataSource) (DataSet, int, error)
- func (s *MongoStorage) Retention(ds *registry.DataSource) string
- func (s *MongoStorage) Submit(data map[string][]DataPoint, sources map[string]*registry.DataSource) error
- type Query
- type RecordSet
- type RemoteClient
- type Storage
- type Subscription
Constants ¶
const (
MaxPerPage = 1000
)
Variables ¶
var SupportedContentTypes = map[string]bool{ "application/senml+json": true, }
Supported content-types for data ingestion
Functions ¶
func NewMQTTConnector ¶
func NtfListener ¶
func NtfListener(s Storage, ntChan <-chan common.Notification)
Handles an incoming notification
func NtfListenerMQTT ¶
func NtfListenerMQTT(c *MQTTConnector, ntChan <-chan common.Notification)
Handles an incoming notification
Types ¶
type DataPoint ¶
DataPoint is a data record embedding a SenML Entry
func NewDataPoint ¶
func NewDataPoint() DataPoint
NewDataPoint returns a new DataPoint embedding a SenML Entry
type Entry ¶
type Entry struct { Name string `bson:"n,omitempty"` Units string `bson:"u,omitempty"` Value *float64 `bson:"v,omitempty"` StringValue *string `bson:"sv,omitempty"` BooleanValue *bool `bson:"bv,omitempty"` Sum *float64 `bson:"s,omitempty"` Time int64 `bson:"t,omitempty"` UpdateTime int64 `bson:"ut,omitempty"` }
Entry is a SenML measurement or parameter entry
type HTTPAPI ¶
type HTTPAPI struct {
// contains filtered or unexported fields
}
HTTPAPI describes the RESTful HTTP data API
func NewHTTPAPI ¶
NewHTTPAPI returns the configured Data API
func (*HTTPAPI) Query ¶
func (d *HTTPAPI) Query(w http.ResponseWriter, r *http.Request)
Query is a handler for querying data. Expected parameters: id(s); optional: pagination, query string.
func (*HTTPAPI) Submit ¶
func (d *HTTPAPI) Submit(w http.ResponseWriter, r *http.Request)
Submit is a handler for submitting a new data point. Expected parameters: id(s). TODO: check SupportedContentTypes instead of hard-coding SenML.
func (*HTTPAPI) SubmitWithoutID ¶
func (d *HTTPAPI) SubmitWithoutID(w http.ResponseWriter, r *http.Request)
SubmitWithoutID is a handler for submitting a new data point. Expected parameters: none.
type InfluxStorage ¶
type InfluxStorage struct {
// contains filtered or unexported fields
}
InfluxStorage implements an InfluxDB storage client for HDS Data API
func NewInfluxStorage ¶
func NewInfluxStorage(dataConf *common.DataConf) (*InfluxStorage, chan<- common.Notification, error)
NewInfluxStorage returns a new InfluxStorage
func (*InfluxStorage) CountSprintf ¶
func (s *InfluxStorage) CountSprintf(format string, a ...interface{}) (int64, error)
CountSprintf constructs a counting query for influxdb
func (*InfluxStorage) Database ¶
func (s *InfluxStorage) Database() string
Database returns database name
func (*InfluxStorage) FieldForType ¶
func (s *InfluxStorage) FieldForType(t string) string
FieldForType returns the field-name for HDS data types
func (*InfluxStorage) MeasurementName ¶ added in v0.5.0
func (s *InfluxStorage) MeasurementName(ds *registry.DataSource) string
MeasurementName returns formatted measurement name for a given data source
func (*InfluxStorage) MeasurementNameFQ ¶ added in v0.5.0
func (s *InfluxStorage) MeasurementNameFQ(ds *registry.DataSource) string
MeasurementNameFQ returns formatted fully-qualified measurement name
func (*InfluxStorage) NtfCreated ¶
func (s *InfluxStorage) NtfCreated(ds registry.DataSource, callback chan error)
NtfCreated handles the creation of a new data source
func (*InfluxStorage) NtfDeleted ¶
func (s *InfluxStorage) NtfDeleted(ds registry.DataSource, callback chan error)
NtfDeleted handles deletion of a data source
func (*InfluxStorage) NtfUpdated ¶
func (s *InfluxStorage) NtfUpdated(oldDS registry.DataSource, newDS registry.DataSource, callback chan error)
NtfUpdated handles updates of a data source
func (*InfluxStorage) Query ¶
func (s *InfluxStorage) Query(q Query, page, perPage int, sources ...*registry.DataSource) (DataSet, int, error)
Query retrieves data for specified data sources
func (*InfluxStorage) QuerySprintf ¶
func (s *InfluxStorage) QuerySprintf(format string, a ...interface{}) (res []influx.Result, err error)
QuerySprintf constructs a query for influxdb
func (*InfluxStorage) Replication ¶
func (s *InfluxStorage) Replication() int
Replication returns the InfluxDB replication factor
func (*InfluxStorage) RetentionPolicyName ¶ added in v0.5.0
func (s *InfluxStorage) RetentionPolicyName(period string) string
RetentionPolicyName returns formatted retention policy name for a given period
func (*InfluxStorage) Submit ¶
func (s *InfluxStorage) Submit(data map[string][]DataPoint, sources map[string]*registry.DataSource) error
Submit adds multiple data points for multiple data sources; data is a map where keys are data source ids.
type MQTTConnector ¶
func (*MQTTConnector) NtfCreated ¶
func (c *MQTTConnector) NtfCreated(ds registry.DataSource, callback chan error)
Handles the creation of a new data source
func (*MQTTConnector) NtfDeleted ¶
func (c *MQTTConnector) NtfDeleted(oldDS registry.DataSource, callback chan error)
Handles deletion of a data source
func (*MQTTConnector) NtfUpdated ¶
func (c *MQTTConnector) NtfUpdated(oldDS registry.DataSource, newDS registry.DataSource, callback chan error)
Handles updates of a data source
type MongoStorage ¶
type MongoStorage struct {
// contains filtered or unexported fields
}
MongoStorage implements a simple data storage back-end with MongoDB
func NewMongoStorage ¶
func NewMongoStorage(DSN string) (*MongoStorage, chan<- common.Notification, error)
NewMongoStorage returns a new MongoStorage given a DSN
func (*MongoStorage) FQMsrmt ¶
func (s *MongoStorage) FQMsrmt(ds *registry.DataSource) string
Fully qualified measurement name
func (*MongoStorage) FieldForType ¶
func (s *MongoStorage) FieldForType(t string) string
The field-name for HDS data types
func (*MongoStorage) Msrmt ¶
func (s *MongoStorage) Msrmt(ds *registry.DataSource) string
Formatted measurement name for a given data source
func (*MongoStorage) NtfCreated ¶
func (s *MongoStorage) NtfCreated(ds registry.DataSource, callback chan error)
Handles the creation of a new data source
func (*MongoStorage) NtfDeleted ¶
func (s *MongoStorage) NtfDeleted(ds registry.DataSource, callback chan error)
Handles deletion of a data source
func (*MongoStorage) NtfUpdated ¶
func (s *MongoStorage) NtfUpdated(oldDS registry.DataSource, newDS registry.DataSource, callback chan error)
Handles updates of a data source
func (*MongoStorage) Query ¶
func (s *MongoStorage) Query(q Query, page, perPage int, sources ...*registry.DataSource) (DataSet, int, error)
func (*MongoStorage) Retention ¶
func (s *MongoStorage) Retention(ds *registry.DataSource) string
Formatted retention policy name for a given data source
func (*MongoStorage) Submit ¶
func (s *MongoStorage) Submit(data map[string][]DataPoint, sources map[string]*registry.DataSource) error
Submit adds multiple data points for multiple data sources; data is a map where keys are data source ids.
type RecordSet ¶
type RecordSet struct {
	// URL is the URL of the returned recordset in the Data API
	URL string `json:"url"`
	// Data is a SenML object with data records, where
	// Name (bn and n) constitute the resource URL of the corresponding Data Source(s)
	Data DataSet `json:"data"`
	// Time is the time of query in milliseconds
	Time float64 `json:"time"`
	// Page is the current page in Data pagination
	Page int `json:"page"`
	// PerPage is the results per page in Data pagination
	PerPage int `json:"per_page"`
	// Total is the total number of pages in Data pagination
	Total int `json:"total"`
}
RecordSet describes the recordset returned on querying the Data API
type RemoteClient ¶
type RemoteClient struct {
// contains filtered or unexported fields
}
func NewRemoteClient ¶
func NewRemoteClient(serverEndpoint string, ticket *obtainer.Client) (*RemoteClient, error)
func (*RemoteClient) Submit ¶
func (c *RemoteClient) Submit(data []byte, contentType string, id ...string) error
Submit submits data for ingestion, where: data is a byte array with the actual data; contentType is the mime-type of the data (will be set in the header); id... is the ID (or array of IDs) of the data sources for which the data is being submitted.
type Storage ¶
type Storage interface {
	// Adds data points for multiple data sources
	// data is a map where keys are data source ids
	// sources is a map where keys are data source ids
	Submit(data map[string][]DataPoint, sources map[string]*registry.DataSource) error
	// Queries data for specified data sources
	Query(q Query, page, perPage int, sources ...*registry.DataSource) (DataSet, int, error)
	// Methods for handling notifications
	NtfCreated(ds registry.DataSource, callback chan error)
	NtfUpdated(old registry.DataSource, new registry.DataSource, callback chan error)
	NtfDeleted(ds registry.DataSource, callback chan error)
}
Storage is an interface of a Data storage backend
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}