data

package
v1.0.0-beta.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 17, 2020 License: Apache-2.0 Imports: 33 Imported by: 2

Documentation

Overview

Package data implements the Data API.

Package senmltest implements senml testing utilities

Index

Constants

View Source
const (
	MaxPerPage = 1000

	// Values for ParamDenormalize
	TimeField       = "time"
	TimeFieldShort  = "t"
	NameField       = "name"
	NameFieldShort  = "n"
	UnitField       = "unit"
	UnitFieldShort  = "u"
	ValueField      = "value"
	ValueFieldShort = "v"
	SumField        = "sum"
	SumFieldShort   = "s"
)
View Source
const (
	INFLUXDB       = "influxdb"
	MONGODB        = "mongodb"
	SENMLSTORE     = "senmlstore"
	SQLITE         = "sqlite"
	DRIVER_SQLITE3 = "sqlite3"
)

Variables

This section is empty.

Functions

func CompareRecords

func CompareRecords(r1 senml.Record, r2 senml.Record) (same bool)

func CompareSenml

func CompareSenml(s1 senml.Pack, s2 senml.Pack) (same bool)

func Diff_name_diff_types

func Diff_name_diff_types() senml.Pack

func RegisterGRPCAPI

func RegisterGRPCAPI(srv *grpc.Server, c Controller)

RegisterGRPCAPI registers the Data API with the given gRPC server.

func Same_name_same_types

func Same_name_same_types(count int, series registry.TimeSeries, decremental bool) senml.Pack

func SupportedBackends added in v0.6.1

func SupportedBackends(name string) bool

SupportedBackends returns true if the named backend is listed as supported.

Types

type API added in v0.6.1

type API struct {
	// contains filtered or unexported fields
}

API describes the RESTful HTTP data API

func NewAPI added in v0.6.1

func NewAPI(c Controller) *API

NewAPI returns the configured Data API

func (*API) Delete

func (api *API) Delete(w http.ResponseWriter, r *http.Request)

func (*API) Query added in v0.6.1

func (api *API) Query(w http.ResponseWriter, r *http.Request)

Query is a handler for querying data. Expected parameters: id(s); optional: pagination, query string.

func (*API) Submit added in v0.6.1

func (api *API) Submit(w http.ResponseWriter, r *http.Request)

Submit is a handler for submitting a new data point. Expected parameters: id(s).

func (*API) SubmitWithoutID added in v0.6.1

func (api *API) SubmitWithoutID(w http.ResponseWriter, r *http.Request)

SubmitWithoutID is a handler for submitting a new data point. Expected parameters: none.

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

func NewController

func NewController(registry registry.Controller, storage Storage, autoRegistration bool) *Controller

NewController returns the configured Data API controller.

func (Controller) Count

func (c Controller) Count(ctx context.Context, q Query, seriesNames []string) (total int, retErr common.Error)

func (Controller) Delete

func (c Controller) Delete(ctx context.Context, seriesNames []string, from time.Time, to time.Time) (retErr common.Error)

func (Controller) QueryPage

func (c Controller) QueryPage(ctx context.Context, q Query, ids []string) (pack senml.Pack, total *int, retErr common.Error)

func (Controller) QueryStream

func (c Controller) QueryStream(ctx context.Context, q Query, ids []string, sendFunc sendFunction) (retErr common.Error)

func (Controller) Submit

func (c Controller) Submit(ctx context.Context, senmlPack senml.Pack, ids []string) common.Error

TODO: Return the appropriate error code so that callers can propagate it correctly, e.g. as gRPC status codes or HTTP error responses.

func (Controller) Subscribe

func (c Controller) Subscribe(seriesNames ...string) (chan interface{}, common.Error)

func (Controller) Unsubscribe

func (c Controller) Unsubscribe(channel chan interface{}, names ...string)

type DenormMask

type DenormMask int32

DenormMask specifies which fields need to be denormalized.

const (
	DenormMaskName DenormMask = 1 << iota
	DenormMaskTime
	DenormMaskUnit
	DenormMaskValue
	DenormMaskSum
)

type GrpcAPI

type GrpcAPI struct {
	// contains filtered or unexported fields
}

GrpcAPI describes the gRPC data API

func (GrpcAPI) Count

func (a GrpcAPI) Count(ctx context.Context, request *_go.QueryRequest) (*_go.CountResponse, error)

func (GrpcAPI) Delete

func (a GrpcAPI) Delete(ctx context.Context, request *_go.DeleteRequest) (*_go.Void, error)

func (GrpcAPI) Query

func (a GrpcAPI) Query(request *_go.QueryRequest, stream _go.Data_QueryServer) (err error)

func (GrpcAPI) Submit

func (a GrpcAPI) Submit(stream _go.Data_SubmitServer) error

func (GrpcAPI) Subscribe

func (a GrpcAPI) Subscribe(request *_go.SubscribeRequest, stream _go.Data_SubscribeServer) error

type GrpcClient

type GrpcClient struct {
	Client _go.DataClient
}

func NewGrpcClient

func NewGrpcClient(serverEndpoint string) (*GrpcClient, error)

func (*GrpcClient) Count

func (c *GrpcClient) Count(series []string, q Query) (total int, err error)

func (*GrpcClient) Delete

func (c *GrpcClient) Delete(seriesNames []string, from time.Time, to time.Time) error

func (*GrpcClient) Query

func (c *GrpcClient) Query(seriesNames []string, q Query) (senml.Pack, error)

TODO facilitate aborting of the query (using channels)

func (*GrpcClient) Submit

func (c *GrpcClient) Submit(pack senml.Pack) error

func (*GrpcClient) Subscribe

func (c *GrpcClient) Subscribe(ctx context.Context, seriesNames ...string) (chan ResponsePack, error)

type MQTTConnector

type MQTTConnector struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewMQTTConnector

func NewMQTTConnector(storage Storage, clientID string) (*MQTTConnector, error)

func (*MQTTConnector) CreateHandler added in v0.6.1

func (c *MQTTConnector) CreateHandler(ts registry.TimeSeries) error

CreateHandler handles the creation of a new time series

func (*MQTTConnector) DeleteHandler added in v0.6.1

func (c *MQTTConnector) DeleteHandler(oldTS registry.TimeSeries) error

DeleteHandler handles deletion of a time series

func (*MQTTConnector) Start added in v0.6.1

func (c *MQTTConnector) Start(reg registry.Controller) error

func (*MQTTConnector) UpdateHandler added in v0.6.1

func (c *MQTTConnector) UpdateHandler(oldTs registry.TimeSeries, newTS registry.TimeSeries) error

UpdateHandler handles updates of a time series

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

type Query

type Query struct {
	// From is the Time from which the data needs to be fetched
	From time.Time
	// To is the Time up to which the data needs to be fetched
	To time.Time

	// SortAsc if set to true, oldest measurements are listed first in the resulting pack. If set to false, latest entries are listed first.
	SortAsc bool

	// PerPage: in case of paginated query, number of measurements returned as part of the query. In case of streamed query, number of measurements per pack in the time series
	PerPage int

	// Denormalize is a set of flags to be set based on the fields to be denormalized (Base field)
	Denormalize DenormMask

	// AggrFunc is the function performing the aggregation
	AggrFunc string

	// AggrWindow is the duration for aggregation
	AggrWindow time.Duration

	// Limit is applicable only for streamed queries
	Limit int

	// Offset is applicable only for streamed queries
	Offset int

	// Page is applicable only for paginated queries
	Page int

	// Count: if enabled, it will return the total number of entries to the query.
	// applicable only for paginated queries
	Count bool
}

func ParseQueryParameters

func ParseQueryParameters(form url.Values) (Query, common.Error)

type RecordSet

type RecordSet struct {
	// SelfLink is the SelfLink of the returned recordset in the Data API
	SelfLink string `json:"selfLink"`

	// Data is a SenML object with data records, where
	// Name (bn and n) constitute the resource BrokerURL of the corresponding time series
	Data senml.Pack `json:"data"`

	// TimeTook is the time the query took, in seconds
	TimeTook float64 `json:"took"`

	// NextLink is the link to the next page of the same query, in case there are more entries to follow
	NextLink string `json:"nextLink,omitempty"`

	//Total number of entries
	Count *int `json:"Count,omitempty"`
}

RecordSet describes the recordset returned on querying the Data API

type RemoteClient

type RemoteClient struct {
	// contains filtered or unexported fields
}

func NewRemoteClient

func NewRemoteClient(serverEndpoint string, ticket *obtainer.Client) (*RemoteClient, error)

func (*RemoteClient) Query

func (c *RemoteClient) Query(q Query, id ...string) (*RecordSet, error)

func (*RemoteClient) Submit

func (c *RemoteClient) Submit(data []byte, contentType string, id ...string) error

Submit submits data for ingestion, where: data is a byte array with the actual data; contentType is the MIME type of the data (will be set in the header); id... is the ID (or array of IDs) of the time series for which the data is being submitted.

type ResponsePack

type ResponsePack struct {
	// contains filtered or unexported fields
}

type SqlStorage

type SqlStorage struct {
	// contains filtered or unexported fields
}

SqlStorage implements a SqlDB storage client for HDS Data API

func NewSqlStorage

func NewSqlStorage(conf common.DataConf) (storage *SqlStorage, disconnect_func func() error, err error)

func (*SqlStorage) Count

func (s *SqlStorage) Count(ctx context.Context, q Query, series ...*registry.TimeSeries) (int, error)

func (*SqlStorage) CreateHandler

func (s *SqlStorage) CreateHandler(ts registry.TimeSeries) error

CreateHandler handles the creation of a new TimeSeries

func (*SqlStorage) Delete

func (s *SqlStorage) Delete(ctx context.Context, series []*registry.TimeSeries, from time.Time, to time.Time) (err error)

func (*SqlStorage) DeleteHandler

func (s *SqlStorage) DeleteHandler(ts registry.TimeSeries) error

DeleteHandler handles deletion of a TimeSeries

func (*SqlStorage) Disconnect

func (s *SqlStorage) Disconnect() error

func (*SqlStorage) QueryPage

func (s *SqlStorage) QueryPage(ctx context.Context, q Query, series ...*registry.TimeSeries) (pack senml.Pack, total *int, err error)

func (*SqlStorage) QueryStream

func (s *SqlStorage) QueryStream(ctx context.Context, q Query, sendFunc sendFunction, series ...*registry.TimeSeries) error

func (*SqlStorage) Submit

func (s *SqlStorage) Submit(ctx context.Context, data map[string]senml.Pack, series map[string]*registry.TimeSeries) (err error)

func (*SqlStorage) UpdateHandler

func (s *SqlStorage) UpdateHandler(oldDS registry.TimeSeries, newDS registry.TimeSeries) error

UpdateHandler handles updates of a TimeSeries

type Storage

type Storage interface {
	// Adds data points for multiple time series
	// data is a map where keys are time series ids
	// series is a map where keys are time series ids
	Submit(ctx context.Context, data map[string]senml.Pack, series map[string]*registry.TimeSeries) error

	// Queries data for specified time series
	//QueryPage(q QueryPage, page, PerPage int, series ...*registry.TimeSeries) (senml.Pack, int, error)
	QueryPage(ctx context.Context, q Query, series ...*registry.TimeSeries) (pack senml.Pack, total *int, err error)

	QueryStream(ctx context.Context, q Query, sendFunc sendFunction, series ...*registry.TimeSeries) error

	Count(ctx context.Context, q Query, series ...*registry.TimeSeries) (total int, err error)

	// Delete the data within a given time range
	Delete(ctx context.Context, series []*registry.TimeSeries, from time.Time, to time.Time) (err error)

	// EventListener includes methods for event handling
	registry.EventListener
}

Storage is an interface of a Data storage backend

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL