Documentation ¶
Index ¶
- Constants
- func Logger(ctx context.Context) *zap.Logger
- func MaxValue(values []float64) float64
- func NewSourceAggregatorConfig() *sourceAggregatorConfig
- type AssociatedAttribute
- type CsvMessageSource
- type DatabaseMessageSource
- type EmptySource
- type EvaluationError
- type Extracted
- type Extractor
- type FkLoRaLocation
- type FkLoRaReadings
- type FkLoRaStatus
- type FkLocationExtractor
- type FkReadingsExtractor
- type FkStatusExtractor
- type JqCache
- type MessageBatch
- type MessageSchema
- type MessageSchemaAttribute
- type MessageSchemaExtractor
- type MessageSchemaModule
- type MessageSchemaRegistration
- type MessageSchemaRepository
- func (rr *MessageSchemaRepository) QueryAllSchemas(ctx context.Context) ([]*MessageSchemaRegistration, error)
- func (rr *MessageSchemaRepository) QuerySchemas(ctx context.Context, batch *MessageBatch) (map[int32]*MessageSchemaRegistration, error)
- func (rr *MessageSchemaRepository) QuerySchemasPendingProcessing(ctx context.Context) ([]*MessageSchemaRegistration, error)
- func (rr *MessageSchemaRepository) StartProcessingSchema(ctx context.Context, schemaID int32) error
- type MessageSchemaSensor
- type MessageSchemaStation
- type MessageSource
- type MessagesRepository
- func (rr *MessagesRepository) QueryBatchBySchemaIDForProcessing(ctx context.Context, batch *MessageBatch, schemaID int32) error
- func (rr *MessagesRepository) QueryMessageForProcessing(ctx context.Context, batch *MessageBatch, messageID int64, verbose bool) error
- func (rr *MessagesRepository) ResumeOnMessage(ctx context.Context, batch *MessageBatch, messageID int64) error
- type ModelAdapter
- type ParsedAttribute
- type ParsedMessage
- type ParsedReading
- type ProcessSchema
- type ProcessSchemaHandler
- type SourceAggregator
- type WebHookMessage
- type WebHookMessageReceived
- type WebHookMessageReceivedHandler
- type WebHookService
- type WebHookStation
Constants ¶
View Source
const ( WebHookSourceID = int32(0) WebHookSensorPrefix = "wh" WebHookRecentWindowHours = 48 )
View Source
const (
AggregatingBatchSize = 100
)
View Source
const (
BatchSize = 1000
)
View Source
const (
ExtractedKey = "extracted"
)
Variables ¶
This section is empty.
Functions ¶
func NewSourceAggregatorConfig ¶
func NewSourceAggregatorConfig() *sourceAggregatorConfig
Types ¶
type AssociatedAttribute ¶
type AssociatedAttribute struct { Priority int32 Attribute *data.StationAttributeSlot }
type CsvMessageSource ¶
type CsvMessageSource struct {
// contains filtered or unexported fields
}
func NewCsvMessageSource ¶
func NewCsvMessageSource(path string, schemaID int32, verbose bool) *CsvMessageSource
func (*CsvMessageSource) NextBatch ¶
func (s *CsvMessageSource) NextBatch(ctx context.Context, batch *MessageBatch) error
type DatabaseMessageSource ¶
type DatabaseMessageSource struct {
// contains filtered or unexported fields
}
func (*DatabaseMessageSource) NextBatch ¶
func (s *DatabaseMessageSource) NextBatch(ctx context.Context, batch *MessageBatch) error
type EmptySource ¶
type EmptySource struct { }
func (*EmptySource) NextBatch ¶
func (s *EmptySource) NextBatch(ctx context.Context, batch *MessageBatch) error
type EvaluationError ¶
func (*EvaluationError) Error ¶
func (m *EvaluationError) Error() string
type Extractor ¶
type FkLoRaLocation ¶
type FkLoRaReadings ¶
type FkLoRaStatus ¶
type FkLocationExtractor ¶
type FkLocationExtractor struct { }
type FkReadingsExtractor ¶
type FkReadingsExtractor struct { }
type FkStatusExtractor ¶
type FkStatusExtractor struct { }
type MessageBatch ¶
type MessageBatch struct { Messages []*WebHookMessage Schemas map[int32]*MessageSchemaRegistration StartTime time.Time // contains filtered or unexported fields }
type MessageSchema ¶
type MessageSchema struct { Station *MessageSchemaStation `json:"station"` // Deprecated Stations []*MessageSchemaStation `json:"stations"` }
type MessageSchemaAttribute ¶
type MessageSchemaExtractor ¶
type MessageSchemaModule ¶
type MessageSchemaModule struct { Key string `json:"key"` Name *string `json:"name"` Sensors []*MessageSchemaSensor `json:"sensors"` }
func (*MessageSchemaModule) KeyPrefix ¶
func (m *MessageSchemaModule) KeyPrefix() string
type MessageSchemaRegistration ¶
type MessageSchemaRegistration struct { ID int32 `db:"id"` OwnerID int32 `db:"owner_id"` ProjectID *int32 `db:"project_id"` Name string `db:"name"` Token []byte `db:"token"` Body []byte `db:"body"` ReceivedAt *time.Time `db:"received_at"` ProcessedAt *time.Time `db:"processed_at"` ProcessInterval *int32 `db:"process_interval"` // contains filtered or unexported fields }
func (*MessageSchemaRegistration) Parse ¶
func (r *MessageSchemaRegistration) Parse() (*MessageSchema, error)
type MessageSchemaRepository ¶
type MessageSchemaRepository struct {
// contains filtered or unexported fields
}
func NewMessageSchemaRepository ¶
func NewMessageSchemaRepository(db *sqlxcache.DB) (rr *MessageSchemaRepository)
func (*MessageSchemaRepository) QueryAllSchemas ¶
func (rr *MessageSchemaRepository) QueryAllSchemas(ctx context.Context) ([]*MessageSchemaRegistration, error)
func (*MessageSchemaRepository) QuerySchemas ¶
func (rr *MessageSchemaRepository) QuerySchemas(ctx context.Context, batch *MessageBatch) (map[int32]*MessageSchemaRegistration, error)
func (*MessageSchemaRepository) QuerySchemasPendingProcessing ¶
func (rr *MessageSchemaRepository) QuerySchemasPendingProcessing(ctx context.Context) ([]*MessageSchemaRegistration, error)
func (*MessageSchemaRepository) StartProcessingSchema ¶
func (rr *MessageSchemaRepository) StartProcessingSchema(ctx context.Context, schemaID int32) error
type MessageSchemaSensor ¶
type MessageSchemaSensor struct { Key string `json:"key"` Name string `json:"name"` ConditionExpression string `json:"condition"` Expression string `json:"expression"` Battery bool `json:"battery"` Transient bool `json:"transient"` UnitOfMeasure *string `json:"units"` Filter *[]float64 `json:"filter"` Extractor *MessageSchemaExtractor `json:"extractor"` }
type MessageSchemaStation ¶
type MessageSchemaStation struct { Key string `json:"key"` Model string `json:"model"` Flatten bool `json:"flatten"` ConditionExpression string `json:"condition"` IdentifierExpression string `json:"identifier"` NameExpression string `json:"name"` ReceivedExpression string `json:"received"` Extractors []*MessageSchemaExtractor `json:"extractors"` Modules []*MessageSchemaModule `json:"modules"` Attributes []*MessageSchemaAttribute `json:"attributes"` }
type MessageSource ¶
type MessageSource interface {
NextBatch(ctx context.Context, batch *MessageBatch) error
}
type MessagesRepository ¶
type MessagesRepository struct {
// contains filtered or unexported fields
}
func NewMessagesRepository ¶
func NewMessagesRepository(db *sqlxcache.DB) (rr *MessagesRepository)
func (*MessagesRepository) QueryBatchBySchemaIDForProcessing ¶
func (rr *MessagesRepository) QueryBatchBySchemaIDForProcessing(ctx context.Context, batch *MessageBatch, schemaID int32) error
func (*MessagesRepository) QueryMessageForProcessing ¶
func (rr *MessagesRepository) QueryMessageForProcessing(ctx context.Context, batch *MessageBatch, messageID int64, verbose bool) error
func (*MessagesRepository) ResumeOnMessage ¶
func (rr *MessagesRepository) ResumeOnMessage(ctx context.Context, batch *MessageBatch, messageID int64) error
type ModelAdapter ¶
type ModelAdapter struct {
// contains filtered or unexported fields
}
func NewModelAdapter ¶
func NewModelAdapter(db *sqlxcache.DB) (m *ModelAdapter)
func (*ModelAdapter) Save ¶
func (m *ModelAdapter) Save(ctx context.Context, pm *ParsedMessage) (*WebHookStation, error)
type ParsedAttribute ¶
type ParsedMessage ¶
type ParsedMessage struct { Original *WebHookMessage Schema *MessageSchemaStation SchemaID int32 OwnerID int32 ProjectID *int32 DeviceID []byte ReceivedAt *time.Time Data []*ParsedReading DeviceName *string Attributes map[string]*ParsedAttribute }
type ParsedReading ¶
type ProcessSchema ¶
type ProcessSchema struct {
SchemaID int32 `json:"schema_id"`
}
type ProcessSchemaHandler ¶
type ProcessSchemaHandler struct {
// contains filtered or unexported fields
}
func NewProcessSchemaHandler ¶
func NewProcessSchemaHandler(db *sqlxcache.DB, metrics *logging.Metrics, publisher jobs.MessagePublisher) *ProcessSchemaHandler
func (*ProcessSchemaHandler) Handle ¶
func (h *ProcessSchemaHandler) Handle(ctx context.Context, m *ProcessSchema) error
type SourceAggregator ¶
type SourceAggregator struct {
// contains filtered or unexported fields
}
func NewSourceAggregator ¶
func NewSourceAggregator(db *sqlxcache.DB, tsConfig *storage.TimeScaleDBConfig, verbose, legacy bool) *SourceAggregator
func (*SourceAggregator) ProcessSource ¶
func (i *SourceAggregator) ProcessSource(ctx context.Context, source MessageSource, startTime time.Time) error
type WebHookMessage ¶
type WebHookMessage struct { ID int64 `db:"id" json:"id"` CreatedAt time.Time `db:"created_at" json:"created_at"` SchemaID *int32 `db:"schema_id" json:"schema_id"` Headers *string `db:"headers" json:"headers"` Body []byte `db:"body" json:"body"` }
func (*WebHookMessage) Parse ¶
func (m *WebHookMessage) Parse(ctx context.Context, cache *JqCache, schemas map[int32]*MessageSchemaRegistration) ([]*ParsedMessage, error)
type WebHookMessageReceived ¶
type WebHookMessageReceivedHandler ¶
type WebHookMessageReceivedHandler struct {
// contains filtered or unexported fields
}
func NewWebHookMessageReceivedHandler ¶
func NewWebHookMessageReceivedHandler(db *sqlxcache.DB, metrics *logging.Metrics, publisher jobs.MessagePublisher, tsConfig *storage.TimeScaleDBConfig, verbose bool) *WebHookMessageReceivedHandler
func (*WebHookMessageReceivedHandler) Handle ¶
func (h *WebHookMessageReceivedHandler) Handle(ctx context.Context, m *WebHookMessageReceived) error
type WebHookService ¶
type WebHookService struct {
// contains filtered or unexported fields
}
func NewWebHookService ¶
func NewWebHookService(ctx context.Context, options *common.ServiceOptions) *WebHookService
func (*WebHookService) Webhook ¶
func (c *WebHookService) Webhook(ctx context.Context, payload *whService.WebhookPayload, bodyReader io.ReadCloser) error
type WebHookStation ¶
type WebHookStation struct { Provision *data.Provision Configuration *data.StationConfiguration Station *data.Station Module *data.StationModule Sensors []*data.ModuleSensor SensorPrefix string Attributes map[string]*data.StationAttributeSlot Associated map[string]*AssociatedAttribute LastReadingTime *time.Time }
func (*WebHookStation) FindAttribute ¶
func (s *WebHookStation) FindAttribute(name string) *data.StationAttributeSlot
Click to show internal directories.
Click to hide internal directories.