Documentation ¶
Overview ¶
Package data implements Data API
Package senmltest implements senml testing utilities
Index ¶
- Constants
- func CompareRecords(r1 senml.Record, r2 senml.Record) (same bool)
- func CompareSenml(s1 senml.Pack, s2 senml.Pack) (same bool)
- func Diff_name_diff_types() senml.Pack
- func FromSenmlTime(t float64) time.Time
- func RegisterGRPCAPI(srv *grpc.Server, c Controller)
- func Same_name_same_types(count int, series registry.TimeSeries, decremental bool) senml.Pack
- func SupportedBackends(name string) bool
- func ToSenmlTime(t time.Time) float64
- type API
- type Controller
- func (c Controller) Count(ctx context.Context, q Query, seriesNames []string) (total int, retErr common.Error)
- func (c Controller) Delete(ctx context.Context, seriesNames []string, from time.Time, to time.Time) (retErr common.Error)
- func (c Controller) QueryPage(ctx context.Context, q Query, ids []string) (pack senml.Pack, total *int, retErr common.Error)
- func (c Controller) QueryStream(ctx context.Context, q Query, ids []string, sendFunc sendFunction) (retErr common.Error)
- func (c Controller) Submit(ctx context.Context, senmlPack senml.Pack, ids []string) common.Error
- func (c Controller) Subscribe(seriesNames ...string) (chan interface{}, common.Error)
- func (c Controller) Unsubscribe(channel chan interface{}, names ...string)
- type DenormMask
- type GrpcAPI
- func (a GrpcAPI) Count(ctx context.Context, request *_go.QueryRequest) (*_go.CountResponse, error)
- func (a GrpcAPI) Delete(ctx context.Context, request *_go.DeleteRequest) (*_go.Void, error)
- func (a GrpcAPI) Query(request *_go.QueryRequest, stream _go.Data_QueryServer) (err error)
- func (a GrpcAPI) Submit(stream _go.Data_SubmitServer) error
- func (a GrpcAPI) Subscribe(request *_go.SubscribeRequest, stream _go.Data_SubscribeServer) error
- type GrpcClient
- func (c *GrpcClient) CloseSubmitStream(stream _go.Data_SubmitClient) error
- func (c *GrpcClient) Count(ctx context.Context, series []string, q Query) (total int, err error)
- func (c *GrpcClient) CreateSubmitStream(ctx context.Context) (stream _go.Data_SubmitClient, err error)
- func (c *GrpcClient) Delete(seriesNames []string, from time.Time, to time.Time) error
- func (c *GrpcClient) Query(ctx context.Context, seriesNames []string, q Query) (senml.Pack, error)
- func (c *GrpcClient) QueryStream(ctx context.Context, seriesNames []string, q Query) (chan ResponsePack, error)
- func (c *GrpcClient) Submit(ctx context.Context, pack senml.Pack) error
- func (c *GrpcClient) SubmitToStream(stream _go.Data_SubmitClient, pack senml.Pack) error
- func (c *GrpcClient) Subscribe(ctx context.Context, seriesNames ...string) (chan ResponsePack, error)
- type MQTTConnector
- type Manager
- type Query
- type RecordSet
- type RemoteClient
- type ResponsePack
- type SqlStorage
- func (s *SqlStorage) Count(ctx context.Context, q Query, series ...*registry.TimeSeries) (int, error)
- func (s *SqlStorage) CreateHandler(ts registry.TimeSeries) error
- func (s *SqlStorage) Delete(ctx context.Context, series []*registry.TimeSeries, from time.Time, ...) (err error)
- func (s *SqlStorage) DeleteHandler(ts registry.TimeSeries) error
- func (s *SqlStorage) Disconnect() error
- func (s *SqlStorage) QueryPage(ctx context.Context, q Query, series ...*registry.TimeSeries) (pack senml.Pack, total *int, err error)
- func (s *SqlStorage) QueryStream(ctx context.Context, q Query, sendFunc sendFunction, ...) error
- func (s *SqlStorage) Submit(ctx context.Context, data map[string]senml.Pack, ...) (err error)
- func (s *SqlStorage) UpdateHandler(oldDS registry.TimeSeries, newDS registry.TimeSeries) error
- type Storage
- type Subscription
Constants ¶
const ( MaxPerPage = 1000 //value for ParamDenormalize TimeField = "time" TimeFieldShort = "t" NameField = "name" NameFieldShort = "n" UnitField = "unit" UnitFieldShort = "u" ValueField = "value" ValueFieldShort = "v" SumField = "sum" SumFieldShort = "s" )
const ( INFLUXDB = "influxdb" MONGODB = "mongodb" SENMLSTORE = "senmlstore" SQLITE = "sqlite" DRIVER_SQLITE3 = "sqlite3" )
Variables ¶
This section is empty.
Functions ¶
func CompareRecords ¶
func CompareRecords(r1 senml.Record, r2 senml.Record) (same bool)
func CompareSenml ¶
func CompareSenml(s1 senml.Pack, s2 senml.Pack) (same bool)
func Diff_name_diff_types ¶
func Diff_name_diff_types() senml.Pack
func FromSenmlTime ¶
func RegisterGRPCAPI ¶
func RegisterGRPCAPI(srv *grpc.Server, c Controller)
Register the Data API to the server
func Same_name_same_types ¶
func Same_name_same_types(count int, series registry.TimeSeries, decremental bool) senml.Pack
func SupportedBackends ¶ added in v0.6.1
SupportedBackends returns true if the backend is listed as true
func ToSenmlTime ¶
Types ¶
type API ¶ added in v0.6.1
type API struct {
// contains filtered or unexported fields
}
API describes the RESTful HTTP data API
func (*API) Query ¶ added in v0.6.1
func (api *API) Query(w http.ResponseWriter, r *http.Request)
QueryPage is a handler for querying data Expected parameters: id(s), optional: pagination, query string
func (*API) Submit ¶ added in v0.6.1
func (api *API) Submit(w http.ResponseWriter, r *http.Request)
Submit is a handler for submitting a new data point Expected parameters: id(s)
func (*API) SubmitWithoutID ¶ added in v0.6.1
func (api *API) SubmitWithoutID(w http.ResponseWriter, r *http.Request)
SubmitWithoutID is a handler for submitting a new data point Expected parameters: none
type Controller ¶
type Controller struct {
// contains filtered or unexported fields
}
func NewController ¶
func NewController(registry registry.Controller, storage Storage, autoRegistration bool) *Controller
NewAPI returns the configured Data API
func (Controller) QueryStream ¶
func (Controller) Submit ¶
TODO: Return right code in return so that right code is returned by callers. e.g. Grpc code or http error responses.
func (Controller) Subscribe ¶
func (c Controller) Subscribe(seriesNames ...string) (chan interface{}, common.Error)
func (Controller) Unsubscribe ¶
func (c Controller) Unsubscribe(channel chan interface{}, names ...string)
type DenormMask ¶
type DenormMask int32
Specifying which field needs to be denormalized
const ( DenormMaskName DenormMask = 1 << iota DenormMaskTime DenormMaskUnit DenormMaskValue DenormMaskSum )
type GrpcAPI ¶
type GrpcAPI struct {
// contains filtered or unexported fields
}
API describes the RESTful HTTP data API
func (GrpcAPI) Count ¶
func (a GrpcAPI) Count(ctx context.Context, request *_go.QueryRequest) (*_go.CountResponse, error)
func (GrpcAPI) Query ¶
func (a GrpcAPI) Query(request *_go.QueryRequest, stream _go.Data_QueryServer) (err error)
func (GrpcAPI) Subscribe ¶
func (a GrpcAPI) Subscribe(request *_go.SubscribeRequest, stream _go.Data_SubscribeServer) error
type GrpcClient ¶
type GrpcClient struct {
Client _go.DataClient
}
func NewGrpcClient ¶
func NewGrpcClient(serverEndpoint string, opts ...grpc.DialOption) (*GrpcClient, error)
func (*GrpcClient) CloseSubmitStream ¶
func (c *GrpcClient) CloseSubmitStream(stream _go.Data_SubmitClient) error
func (*GrpcClient) CreateSubmitStream ¶
func (c *GrpcClient) CreateSubmitStream(ctx context.Context) (stream _go.Data_SubmitClient, err error)
func (*GrpcClient) QueryStream ¶
func (c *GrpcClient) QueryStream(ctx context.Context, seriesNames []string, q Query) (chan ResponsePack, error)
func (*GrpcClient) SubmitToStream ¶
func (c *GrpcClient) SubmitToStream(stream _go.Data_SubmitClient, pack senml.Pack) error
func (*GrpcClient) Subscribe ¶
func (c *GrpcClient) Subscribe(ctx context.Context, seriesNames ...string) (chan ResponsePack, error)
type MQTTConnector ¶
func NewMQTTConnector ¶
func NewMQTTConnector(storage Storage, clientID string) (*MQTTConnector, error)
func (*MQTTConnector) CreateHandler ¶ added in v0.6.1
func (c *MQTTConnector) CreateHandler(ts registry.TimeSeries) error
CreateHandler handles the creation of a new time series
func (*MQTTConnector) DeleteHandler ¶ added in v0.6.1
func (c *MQTTConnector) DeleteHandler(oldTS registry.TimeSeries) error
DeleteHandler handles deletion of a time series
func (*MQTTConnector) Start ¶ added in v0.6.1
func (c *MQTTConnector) Start(reg registry.Controller) error
func (*MQTTConnector) UpdateHandler ¶ added in v0.6.1
func (c *MQTTConnector) UpdateHandler(oldTs registry.TimeSeries, newTS registry.TimeSeries) error
UpdateHandler handles updates of a time series
type Query ¶
type Query struct { // From is the Time from which the data needs to be fetched From time.Time // Time to which the data needs to be fetched To time.Time // SortAsc if set to true, oldest measurements are listed first in the resulting pack. If set to false, latest entries are listed first. SortAsc bool // PerPage: in case of paginated query, number of measurements returned as part of the query. In case of streamed query, number of measurements per pack in the time series PerPage int // Denormalize is a set of flags to be set based on the fields to be denormalized (Base field) Denormalize DenormMask // AggrFunc is the function performing the aggregation AggrFunc string // AggrWindow is the duration for aggregation AggrWindow time.Duration // Limit is applicable only for streamed queries Limit int // Offset is applicable only for streamed queries Offset int // Page is applicable only for paginated queries Page int // Count: if enabled, it will return the total number of entries to the query. // applicable only for paginated queries Count bool }
type RecordSet ¶
type RecordSet struct { // SelfLink is the SelfLink of the returned recordset in the Data API SelfLink string `json:"selfLink"` // Data is a SenML object with data records, where // Name (bn and n) constitute the resource BrokerURL of the corresponding time series Data senml.Pack `json:"data"` // Time is the time of query in seconds TimeTook float64 `json:"took"` //Next link for the same query, in case there more entries to follow for the same query NextLink string `json:"nextLink,omitempty"` //Total number of entries Count *int `json:"Count,omitempty"` }
RecordSet describes the recordset returned on querying the Data API
type RemoteClient ¶
type RemoteClient struct {
// contains filtered or unexported fields
}
func NewRemoteClient ¶
func NewRemoteClient(serverEndpoint string, ticket *obtainer.Client) (*RemoteClient, error)
func (*RemoteClient) Query ¶
func (c *RemoteClient) Query(q Query, id ...string) (*RecordSet, error)
func (*RemoteClient) Submit ¶
func (c *RemoteClient) Submit(data []byte, contentType string, id ...string) error
Submit data for ingestion, where: data - is a byte array with actual data contentType - mime-type of the data (will be set in the header) id... - ID (or array of IDs) of time series for which the data is being submitted
type ResponsePack ¶
type ResponsePack struct { Pack senml.Pack Err error }
type SqlStorage ¶
type SqlStorage struct {
// contains filtered or unexported fields
}
SqlStorage implements a SqlDB storage client for HDS Data API
func NewSqlStorage ¶
func NewSqlStorage(conf common.DataConf) (storage *SqlStorage, disconnect_func func() error, err error)
func (*SqlStorage) Count ¶
func (s *SqlStorage) Count(ctx context.Context, q Query, series ...*registry.TimeSeries) (int, error)
func (*SqlStorage) CreateHandler ¶
func (s *SqlStorage) CreateHandler(ts registry.TimeSeries) error
CreateHandler handles the creation of a new TimeSeries
func (*SqlStorage) Delete ¶
func (s *SqlStorage) Delete(ctx context.Context, series []*registry.TimeSeries, from time.Time, to time.Time) (err error)
func (*SqlStorage) DeleteHandler ¶
func (s *SqlStorage) DeleteHandler(ts registry.TimeSeries) error
DeleteHandler handles deletion of a TimeSeries
func (*SqlStorage) Disconnect ¶
func (s *SqlStorage) Disconnect() error
func (*SqlStorage) QueryPage ¶
func (s *SqlStorage) QueryPage(ctx context.Context, q Query, series ...*registry.TimeSeries) (pack senml.Pack, total *int, err error)
func (*SqlStorage) QueryStream ¶
func (s *SqlStorage) QueryStream(ctx context.Context, q Query, sendFunc sendFunction, series ...*registry.TimeSeries) error
func (*SqlStorage) Submit ¶
func (s *SqlStorage) Submit(ctx context.Context, data map[string]senml.Pack, series map[string]*registry.TimeSeries) (err error)
func (*SqlStorage) UpdateHandler ¶
func (s *SqlStorage) UpdateHandler(oldDS registry.TimeSeries, newDS registry.TimeSeries) error
UpdateHandler handles updates of a TimeSeries
type Storage ¶
type Storage interface { // Adds data points for multiple time series // data is a map where keys are time series ids // series is a map where keys are time series ids Submit(ctx context.Context, data map[string]senml.Pack, series map[string]*registry.TimeSeries) error // Queries data for specified time series //QueryPage(q QueryPage, page, PerPage int, series ...*registry.TimeSeries) (senml.Pack, int, error) QueryPage(ctx context.Context, q Query, series ...*registry.TimeSeries) (pack senml.Pack, total *int, err error) QueryStream(ctx context.Context, q Query, sendFunc sendFunction, series ...*registry.TimeSeries) error Count(ctx context.Context, q Query, series ...*registry.TimeSeries) (total int, err error) // Delete the data within a given time range Delete(ctx context.Context, series []*registry.TimeSeries, from time.Time, to time.Time) (err error) // EventListener includes methods for event handling registry.EventListener }
Storage is an interface of a Data storage backend
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}