data

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 11, 2018 License: Apache-2.0 Imports: 26 Imported by: 2

Documentation

Overview

Package data implements Data API

Index

Constants

View Source
const (
	MaxPerPage = 1000
)

Variables

View Source
var SupportedContentTypes = map[string]bool{
	"application/senml+json": true,
}

Supported content-types for data ingestion

Functions

func NewMQTTConnector

func NewMQTTConnector(registryClient registry.Client, storage Storage) (chan<- common.Notification, error)

func NtfListener

func NtfListener(s Storage, ntChan <-chan common.Notification)

Handles an incoming notification

func NtfListenerMQTT

func NtfListenerMQTT(c *MQTTConnector, ntChan <-chan common.Notification)

Handles an incoming notification

Types

type DataPoint

type DataPoint struct {
	*senml.Entry
}

DataPoint is a data record embedding a SenML Entry

func NewDataPoint

func NewDataPoint() DataPoint

NewDataPoint returns a new DataPoint

func (*DataPoint) FromEntry

func (p *DataPoint) FromEntry(e senml.Entry) DataPoint

FromEntry returns a DataPoint given a SenML Entry

type DataSet

type DataSet struct {
	*senml.Message
	Entries []DataPoint `json:"e"`
}

DataSet is a set of DataPoints embedding a SenML Message

func NewDataSet

func NewDataSet() DataSet

NewDataSet returns a new DataSet

func (*DataSet) FromMessage

func (s *DataSet) FromMessage(m senml.Message) DataSet

FromMessage returns a DataSet given a SenML Message

type Entry

type Entry struct {
	Name         string   `bson:"n,omitempty"`
	Units        string   `bson:"u,omitempty"`
	Value        *float64 `bson:"v,omitempty"`
	StringValue  *string  `bson:"sv,omitempty"`
	BooleanValue *bool    `bson:"bv,omitempty"`
	Sum          *float64 `bson:"s,omitempty"`
	Time         int64    `bson:"t,omitempty"`
	UpdateTime   int64    `bson:"ut,omitempty"`
}

Entry is a measurement or parameter entry

type HTTPAPI

type HTTPAPI struct {
	// contains filtered or unexported fields
}

HTTPAPI describes the RESTful HTTP data API

func NewHTTPAPI

func NewHTTPAPI(registryClient registry.Client, storage Storage, autoRegistration bool) *HTTPAPI

NewHTTPAPI returns the configured Data API

func (*HTTPAPI) Query

func (d *HTTPAPI) Query(w http.ResponseWriter, r *http.Request)

Query is a handler for querying data. Expected parameters: id(s); optional: pagination, query string.

func (*HTTPAPI) Submit

func (d *HTTPAPI) Submit(w http.ResponseWriter, r *http.Request)

Submit is a handler for submitting a new data point. Expected parameters: id(s). TODO: check SupportedContentTypes instead of hard-coding SenML.

func (*HTTPAPI) SubmitWithoutID

func (d *HTTPAPI) SubmitWithoutID(w http.ResponseWriter, r *http.Request)

SubmitWithoutID is a handler for submitting a new data point. Expected parameters: none.

type InfluxStorage

type InfluxStorage struct {
	// contains filtered or unexported fields
}

InfluxStorage implements an InfluxDB storage client for HDS Data API

func NewInfluxStorage

func NewInfluxStorage(dataConf *common.DataConf) (*InfluxStorage, chan<- common.Notification, error)

NewInfluxStorage returns a new InfluxStorage

func (*InfluxStorage) CountSprintf

func (s *InfluxStorage) CountSprintf(format string, a ...interface{}) (int64, error)

CountSprintf constructs a counting query for influxdb

func (*InfluxStorage) Database

func (s *InfluxStorage) Database() string

Database returns database name

func (*InfluxStorage) FieldForType

func (s *InfluxStorage) FieldForType(t string) string

FieldForType returns the field-name for HDS data types

func (*InfluxStorage) MeasurementName added in v0.5.0

func (s *InfluxStorage) MeasurementName(ds *registry.DataSource) string

MeasurementName returns formatted measurement name for a given data source

func (*InfluxStorage) MeasurementNameFQ added in v0.5.0

func (s *InfluxStorage) MeasurementNameFQ(ds *registry.DataSource) string

MeasurementNameFQ returns formatted fully-qualified measurement name

func (*InfluxStorage) NtfCreated

func (s *InfluxStorage) NtfCreated(ds registry.DataSource, callback chan error)

NtfCreated handles the creation of a new data source

func (*InfluxStorage) NtfDeleted

func (s *InfluxStorage) NtfDeleted(ds registry.DataSource, callback chan error)

NtfDeleted handles deletion of a data source

func (*InfluxStorage) NtfUpdated

func (s *InfluxStorage) NtfUpdated(oldDS registry.DataSource, newDS registry.DataSource, callback chan error)

NtfUpdated handles updates of a data source

func (*InfluxStorage) Query

func (s *InfluxStorage) Query(q Query, page, perPage int, sources ...*registry.DataSource) (DataSet, int, error)

Query retrieves data for specified data sources

func (*InfluxStorage) QuerySprintf

func (s *InfluxStorage) QuerySprintf(format string, a ...interface{}) (res []influx.Result, err error)

QuerySprintf constructs a query for influxdb

func (*InfluxStorage) Replication

func (s *InfluxStorage) Replication() int

Replication returns the InfluxDB replication factor

func (*InfluxStorage) RetentionPolicyName added in v0.5.0

func (s *InfluxStorage) RetentionPolicyName(period string) string

RetentionPolicyName returns formatted retention policy name for a given period

func (*InfluxStorage) Submit

func (s *InfluxStorage) Submit(data map[string][]DataPoint, sources map[string]*registry.DataSource) error

Submit adds multiple data points for multiple data sources; data is a map where keys are data source ids.

type MQTTConnector

type MQTTConnector struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*MQTTConnector) NtfCreated

func (c *MQTTConnector) NtfCreated(ds registry.DataSource, callback chan error)

Handles the creation of a new data source

func (*MQTTConnector) NtfDeleted

func (c *MQTTConnector) NtfDeleted(oldDS registry.DataSource, callback chan error)

Handles deletion of a data source

func (*MQTTConnector) NtfUpdated

func (c *MQTTConnector) NtfUpdated(oldDS registry.DataSource, newDS registry.DataSource, callback chan error)

Handles updates of a data source

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

type MongoStorage

type MongoStorage struct {
	// contains filtered or unexported fields
}

MongoStorage implements a simple data storage back-end with MongoDB

func NewMongoStorage

func NewMongoStorage(DSN string) (*MongoStorage, chan<- common.Notification, error)

NewMongoStorage returns a new MongoStorage given a DSN

func (*MongoStorage) Database

func (s *MongoStorage) Database() string

Database name

func (*MongoStorage) FQMsrmt

func (s *MongoStorage) FQMsrmt(ds *registry.DataSource) string

Fully qualified measurement name

func (*MongoStorage) FieldForType

func (s *MongoStorage) FieldForType(t string) string

The field-name for HDS data types

func (*MongoStorage) Msrmt

func (s *MongoStorage) Msrmt(ds *registry.DataSource) string

Formatted measurement name for a given data source

func (*MongoStorage) NtfCreated

func (s *MongoStorage) NtfCreated(ds registry.DataSource, callback chan error)

Handles the creation of a new data source

func (*MongoStorage) NtfDeleted

func (s *MongoStorage) NtfDeleted(ds registry.DataSource, callback chan error)

Handles deletion of a data source

func (*MongoStorage) NtfUpdated

func (s *MongoStorage) NtfUpdated(oldDS registry.DataSource, newDS registry.DataSource, callback chan error)

Handles updates of a data source

func (*MongoStorage) Query

func (s *MongoStorage) Query(q Query, page, perPage int, sources ...*registry.DataSource) (DataSet, int, error)

func (*MongoStorage) Retention

func (s *MongoStorage) Retention(ds *registry.DataSource) string

Formatted retention policy name for a given data source

func (*MongoStorage) Submit

func (s *MongoStorage) Submit(data map[string][]DataPoint, sources map[string]*registry.DataSource) error

Submit adds multiple data points for multiple data sources; data is a map where keys are data source ids.

type Query

type Query struct {
	Start time.Time
	End   time.Time
	Sort  string
	Limit int
}

func ParseQueryParameters

func ParseQueryParameters(form url.Values) (Query, error)

type RecordSet

type RecordSet struct {
	// URL is the URL of the returned recordset in the Data API
	URL string `json:"url"`
	// Data is a SenML object with data records, where
	// Name (bn and n) constitute the resource URL of the corresponding Data Source(s)
	Data DataSet `json:"data"`
	// Time is the time of query in milliseconds
	Time float64 `json:"time"`
	// Page is the current page in Data pagination
	Page int `json:"page"`
	// PerPage is the results per page in Data pagination
	PerPage int `json:"per_page"`
	// Total is the total number of pages in Data pagination
	Total int `json:"total"`
}

RecordSet describes the recordset returned on querying the Data API

type RemoteClient

type RemoteClient struct {
	// contains filtered or unexported fields
}

func NewRemoteClient

func NewRemoteClient(serverEndpoint string, ticket *obtainer.Client) (*RemoteClient, error)

func (*RemoteClient) Query

func (c *RemoteClient) Query(q Query, page, perPage int, id ...string) (*RecordSet, error)

func (*RemoteClient) Submit

func (c *RemoteClient) Submit(data []byte, contentType string, id ...string) error

Submit data for ingestion, where: data is a byte array with the actual data; contentType is the MIME type of the data (will be set in the header); id... is the ID (or array of IDs) of the data sources for which the data is being submitted.

type Storage

type Storage interface {
	// Adds data points for multiple data sources
	// data is a map where keys are data source ids
	// sources is a map where keys are data source ids
	Submit(data map[string][]DataPoint, sources map[string]*registry.DataSource) error

	// Queries data for specified data sources
	Query(q Query, page, perPage int, sources ...*registry.DataSource) (DataSet, int, error)

	// Methods for handling notifications
	NtfCreated(ds registry.DataSource, callback chan error)
	NtfUpdated(old registry.DataSource, new registry.DataSource, callback chan error)
	NtfDeleted(ds registry.DataSource, callback chan error)
}

Storage is an interface of a Data storage backend

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL