data

package
v0.6.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2019 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Overview

Package data implements Data API

Index

Constants

View Source
const (
	INFLUXDB = "influxdb"
	MONGODB  = "mongodb"
)
View Source
const (
	MaxPerPage = 1000
)

Variables

This section is empty.

Functions

func SupportedBackends added in v0.6.1

func SupportedBackends(name string) bool

SupportedBackends returns true if the backend is listed as true

Types

type API added in v0.6.1

type API struct {
	// contains filtered or unexported fields
}

API describes the RESTful HTTP data API

func NewAPI added in v0.6.1

func NewAPI(registry registry.Storage, storage Storage, autoRegistration bool) *API

NewAPI returns the configured Data API

func (*API) Query added in v0.6.1

func (api *API) Query(w http.ResponseWriter, r *http.Request)

Query is a handler for querying data Expected parameters: id(s), optional: pagination, query string

func (*API) Submit added in v0.6.1

func (api *API) Submit(w http.ResponseWriter, r *http.Request)

Submit is a handler for submitting a new data point Expected parameters: id(s)

func (*API) SubmitWithoutID added in v0.6.1

func (api *API) SubmitWithoutID(w http.ResponseWriter, r *http.Request)

SubmitWithoutID is a handler for submitting a new data point Expected parameters: none

type InfluxStorage

type InfluxStorage struct {
	// contains filtered or unexported fields
}

InfluxStorage implements a InfluxDB storage client for HDS Data API

func NewInfluxStorage

func NewInfluxStorage(conf common.DataConf, retentionPeriods []string) (*InfluxStorage, error)

NewInfluxStorage returns a new InfluxStorage

func (*InfluxStorage) ChangeRetentionPolicy added in v0.5.2

func (s *InfluxStorage) ChangeRetentionPolicy(measurement, countField, oldRP, newRP string) error

func (*InfluxStorage) CountSprintf

func (s *InfluxStorage) CountSprintf(format string, a ...interface{}) (int64, error)

CountSprintf constructs a counting query for influxdb

func (*InfluxStorage) CreateHandler added in v0.6.1

func (s *InfluxStorage) CreateHandler(ds registry.DataSource) error

CreateHandler handles the creation of a new data source

func (*InfluxStorage) Database

func (s *InfluxStorage) Database() string

Database returns database name

func (*InfluxStorage) DeleteHandler added in v0.6.1

func (s *InfluxStorage) DeleteHandler(ds registry.DataSource) error

DeleteHandler handles deletion of a data source

func (*InfluxStorage) FieldForType

func (s *InfluxStorage) FieldForType(t string) string

FieldForType returns the field-name for HDS data types

func (*InfluxStorage) MeasurementName added in v0.5.0

func (s *InfluxStorage) MeasurementName(id string) string

MeasurementName returns formatted measurement name for a given data source

func (*InfluxStorage) MeasurementNameFQ added in v0.5.0

func (s *InfluxStorage) MeasurementNameFQ(retention, measurementName string) string

MeasurementNameFQ returns formatted fully-qualified measurement name

func (*InfluxStorage) ParseDuration added in v0.5.2

func (s *InfluxStorage) ParseDuration(durationStr string) (time.Duration, error)

func (*InfluxStorage) Query

func (s *InfluxStorage) Query(q Query, page, perPage int, sources ...*registry.DataSource) (senml.Pack, int, error)

Query retrieves data for specified data sources

func (*InfluxStorage) QuerySprintf

func (s *InfluxStorage) QuerySprintf(format string, a ...interface{}) (res []influx.Result, err error)

QuerySprintf constructs a query for influxdb

func (*InfluxStorage) Replication

func (s *InfluxStorage) Replication() int

Replication returns Influxdb Replication factor

func (*InfluxStorage) RetentionPolicyName added in v0.5.0

func (s *InfluxStorage) RetentionPolicyName(period string) string

RetentionPolicyName returns formatted retention policy name for a given period

func (*InfluxStorage) Submit

func (s *InfluxStorage) Submit(data map[string]senml.Pack, sources map[string]*registry.DataSource) error

Submit adds multiple data points for multiple data sources data is a map where keys are data source ids

func (*InfluxStorage) UpdateHandler added in v0.6.1

func (s *InfluxStorage) UpdateHandler(oldDS registry.DataSource, newDS registry.DataSource) error

UpdateHandler handles updates of a data source

type MQTTConnector

type MQTTConnector struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewMQTTConnector

func NewMQTTConnector(storage Storage) (*MQTTConnector, error)

func (*MQTTConnector) CreateHandler added in v0.6.1

func (c *MQTTConnector) CreateHandler(ds registry.DataSource) error

CreateHandler handles the creation of a new data source

func (*MQTTConnector) DeleteHandler added in v0.6.1

func (c *MQTTConnector) DeleteHandler(oldDS registry.DataSource) error

DeleteHandler handles deletion of a data source

func (*MQTTConnector) Start added in v0.6.1

func (c *MQTTConnector) Start(registry registry.Storage) error

func (*MQTTConnector) UpdateHandler added in v0.6.1

func (c *MQTTConnector) UpdateHandler(oldDS registry.DataSource, newDS registry.DataSource) error

UpdateHandler handles updates of a data source

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

type Query

type Query struct {
	Start time.Time
	End   time.Time
	Sort  string
	Limit int
}

func ParseQueryParameters

func ParseQueryParameters(form url.Values) (Query, error)

type RecordSet

type RecordSet struct {
	// URL is the URL of the returned recordset in the Data API
	URL string `json:"url"`
	// Data is a SenML object with data records, where
	// Name (bn and n) constitute the resource URL of the corresponding Data Source(s)
	Data senml.Pack `json:"data"`
	// Time is the time of query in milliseconds
	Time float64 `json:"time"`
	// Page is the current page in Data pagination
	Page int `json:"page"`
	// PerPage is the results per page in Data pagination
	PerPage int `json:"per_page"`
	// Total is the total records in Data pagination
	Total int `json:"total"`
}

RecordSet describes the recordset returned on querying the Data API

type RemoteClient

type RemoteClient struct {
	// contains filtered or unexported fields
}

func NewRemoteClient

func NewRemoteClient(serverEndpoint string, ticket *obtainer.Client) (*RemoteClient, error)

func (*RemoteClient) Query

func (c *RemoteClient) Query(q Query, page, perPage int, id ...string) (*RecordSet, error)

func (*RemoteClient) Submit

func (c *RemoteClient) Submit(data []byte, contentType string, id ...string) error

Submit data for ingestion, where: data - is a byte array with actual data contentType - mime-type of the data (will be set in the header) id... - ID (or array of IDs) of data sources for which the data is being submitted

type Storage

type Storage interface {
	// Adds data points for multiple data sources
	// data is a map where keys are data source ids
	// sources is a map where keys are data source ids
	Submit(data map[string]senml.Pack, sources map[string]*registry.DataSource) error

	// Queries data for specified data sources
	Query(q Query, page, perPage int, sources ...*registry.DataSource) (senml.Pack, int, error)

	// EventListener includes methods for event handling
	registry.EventListener
}

Storage is an interface of a Data storage backend

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL