webhook

package
v0.0.0-...-dca2783 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 3, 2023 License: BSD-3-Clause Imports: 31 Imported by: 0

Documentation

Index

Constants

View Source
const (
	WebHookSourceID          = int32(0)
	WebHookSensorPrefix      = "wh"
	WebHookRecentWindowHours = 48
)
View Source
const (
	AggregatingBatchSize = 100
)
View Source
const (
	BatchSize = 1000
)
View Source
const (
	ExtractedKey = "extracted"
)

Variables

This section is empty.

Functions

func Logger

func Logger(ctx context.Context) *zap.Logger

func MaxValue

func MaxValue(values []float64) float64

func NewSourceAggregatorConfig

func NewSourceAggregatorConfig() *sourceAggregatorConfig

Types

type AssociatedAttribute

type AssociatedAttribute struct {
	Priority  int32
	Attribute *data.StationAttributeSlot
}

type CsvMessageSource

type CsvMessageSource struct {
	// contains filtered or unexported fields
}

func NewCsvMessageSource

func NewCsvMessageSource(path string, schemaID int32, verbose bool) *CsvMessageSource

func (*CsvMessageSource) NextBatch

func (s *CsvMessageSource) NextBatch(ctx context.Context, batch *MessageBatch) error

type DatabaseMessageSource

type DatabaseMessageSource struct {
	// contains filtered or unexported fields
}

func NewDatabaseMessageSource

func NewDatabaseMessageSource(db *sqlxcache.DB, schemaID int32, messageID int64, resume bool) *DatabaseMessageSource

func (*DatabaseMessageSource) NextBatch

func (s *DatabaseMessageSource) NextBatch(ctx context.Context, batch *MessageBatch) error

type EmptySource

type EmptySource struct {
}

func (*EmptySource) NextBatch

func (s *EmptySource) NextBatch(ctx context.Context, batch *MessageBatch) error

type EvaluationError

type EvaluationError struct {
	Query    string
	NoReturn bool
}

func (*EvaluationError) Error

func (m *EvaluationError) Error() string

type Extracted

type Extracted struct {
}

type Extractor

type Extractor interface {
	Extract(ctx context.Context, complete interface{}, source interface{}) (interface{}, error)
}

func FindExtractor

func FindExtractor(ctxt context.Context, name string) (Extractor, error)

type FkLoRaLocation

type FkLoRaLocation struct {
	Longitude float32 `json:"longitude"`
	Latitude  float32 `json:"latitude"`
	Altitude  float32 `json:"altitude"`
}

type FkLoRaReadings

type FkLoRaReadings struct {
	Packet   byte      `json:"packet"`
	Age      uint64    `json:"age"`
	Block    uint64    `json:"block"`
	Readings []float32 `json:"readings"`
}

type FkLoRaStatus

type FkLoRaStatus struct {
	BatteryBusVoltage float32 `json:"battery_bus_v"`
	BatteryMinV       float32 `json:"battery_min_v"`
	BatteryMaxV       float32 `json:"battery_max_v"`
	SolarMinV         float32 `json:"solar_min_v"`
	SolarMaxV         float32 `json:"solar_max_v"`
}

type FkLocationExtractor

type FkLocationExtractor struct {
}

func (*FkLocationExtractor) Extract

func (e *FkLocationExtractor) Extract(ctx context.Context, complete interface{}, source interface{}) (interface{}, error)

type FkReadingsExtractor

type FkReadingsExtractor struct {
}

func (*FkReadingsExtractor) Extract

func (e *FkReadingsExtractor) Extract(ctx context.Context, complete interface{}, source interface{}) (interface{}, error)

type FkStatusExtractor

type FkStatusExtractor struct {
}

func (*FkStatusExtractor) Extract

func (e *FkStatusExtractor) Extract(ctx context.Context, complete interface{}, source interface{}) (interface{}, error)

type JqCache

type JqCache struct {
	// contains filtered or unexported fields
}

type MessageBatch

type MessageBatch struct {
	Messages  []*WebHookMessage
	Schemas   map[int32]*MessageSchemaRegistration
	StartTime time.Time
	// contains filtered or unexported fields
}

type MessageSchema

type MessageSchema struct {
	Station  *MessageSchemaStation   `json:"station"` // Deprecated
	Stations []*MessageSchemaStation `json:"stations"`
}

type MessageSchemaAttribute

type MessageSchemaAttribute struct {
	Name       string `json:"name"`
	Expression string `json:"expression"`
	Location   bool   `json:"location"`
	Associated bool   `json:"associated"`
	Status     bool   `json:"status"`
	Hidden     bool   `json:"hidden"`
}

type MessageSchemaExtractor

type MessageSchemaExtractor struct {
	Source     string `json:"source"`
	Type       string `json:"type"`
	Expression string `json:"expression"`
}

type MessageSchemaModule

type MessageSchemaModule struct {
	Key     string                 `json:"key"`
	Name    *string                `json:"name"`
	Sensors []*MessageSchemaSensor `json:"sensors"`
}

func (*MessageSchemaModule) KeyPrefix

func (m *MessageSchemaModule) KeyPrefix() string

type MessageSchemaRegistration

type MessageSchemaRegistration struct {
	ID              int32      `db:"id"`
	OwnerID         int32      `db:"owner_id"`
	ProjectID       *int32     `db:"project_id"`
	Name            string     `db:"name"`
	Token           []byte     `db:"token"`
	Body            []byte     `db:"body"`
	ReceivedAt      *time.Time `db:"received_at"`
	ProcessedAt     *time.Time `db:"processed_at"`
	ProcessInterval *int32     `db:"process_interval"`
	// contains filtered or unexported fields
}

func (*MessageSchemaRegistration) Parse

type MessageSchemaRepository

type MessageSchemaRepository struct {
	// contains filtered or unexported fields
}

func NewMessageSchemaRepository

func NewMessageSchemaRepository(db *sqlxcache.DB) (rr *MessageSchemaRepository)

func (*MessageSchemaRepository) QueryAllSchemas

func (*MessageSchemaRepository) QuerySchemas

func (*MessageSchemaRepository) QuerySchemasPendingProcessing

func (rr *MessageSchemaRepository) QuerySchemasPendingProcessing(ctx context.Context) ([]*MessageSchemaRegistration, error)

func (*MessageSchemaRepository) StartProcessingSchema

func (rr *MessageSchemaRepository) StartProcessingSchema(ctx context.Context, schemaID int32) error

type MessageSchemaSensor

type MessageSchemaSensor struct {
	Key                 string                  `json:"key"`
	Name                string                  `json:"name"`
	ConditionExpression string                  `json:"condition"`
	Expression          string                  `json:"expression"`
	Battery             bool                    `json:"battery"`
	Transient           bool                    `json:"transient"`
	UnitOfMeasure       *string                 `json:"units"`
	Filter              *[]float64              `json:"filter"`
	Extractor           *MessageSchemaExtractor `json:"extractor"`
}

type MessageSchemaStation

type MessageSchemaStation struct {
	Key                  string                    `json:"key"`
	Model                string                    `json:"model"`
	Flatten              bool                      `json:"flatten"`
	ConditionExpression  string                    `json:"condition"`
	IdentifierExpression string                    `json:"identifier"`
	NameExpression       string                    `json:"name"`
	ReceivedExpression   string                    `json:"received"`
	Extractors           []*MessageSchemaExtractor `json:"extractors"`
	Modules              []*MessageSchemaModule    `json:"modules"`
	Attributes           []*MessageSchemaAttribute `json:"attributes"`
}

type MessageSource

type MessageSource interface {
	NextBatch(ctx context.Context, batch *MessageBatch) error
}

type MessagesRepository

type MessagesRepository struct {
	// contains filtered or unexported fields
}

func NewMessagesRepository

func NewMessagesRepository(db *sqlxcache.DB) (rr *MessagesRepository)

func (*MessagesRepository) QueryBatchBySchemaIDForProcessing

func (rr *MessagesRepository) QueryBatchBySchemaIDForProcessing(ctx context.Context, batch *MessageBatch, schemaID int32) error

func (*MessagesRepository) QueryMessageForProcessing

func (rr *MessagesRepository) QueryMessageForProcessing(ctx context.Context, batch *MessageBatch, messageID int64, verbose bool) error

func (*MessagesRepository) ResumeOnMessage

func (rr *MessagesRepository) ResumeOnMessage(ctx context.Context, batch *MessageBatch, messageID int64) error

type ModelAdapter

type ModelAdapter struct {
	// contains filtered or unexported fields
}

func NewModelAdapter

func NewModelAdapter(db *sqlxcache.DB) (m *ModelAdapter)

func (*ModelAdapter) Close

func (m *ModelAdapter) Close(ctx context.Context) error

func (*ModelAdapter) Save

type ParsedAttribute

type ParsedAttribute struct {
	JSONValue  interface{} `json:"json_value"`
	Location   bool        `json:"location"`
	Associated bool        `json:"associated"`
	Status     bool        `json:"status"`
	Hidden     bool        `json:"hidden"`
}

type ParsedMessage

type ParsedMessage struct {
	Original   *WebHookMessage
	Schema     *MessageSchemaStation
	SchemaID   int32
	OwnerID    int32
	ProjectID  *int32
	DeviceID   []byte
	ReceivedAt *time.Time
	Data       []*ParsedReading
	DeviceName *string
	Attributes map[string]*ParsedAttribute
}

type ParsedReading

type ParsedReading struct {
	Key             string  `json:"key"`
	ModuleKeyPrefix string  `json:"module_key_prefix"`
	FullSensorKey   string  `json:"full_sensor_key"`
	Value           float64 `json:"value"`
	Battery         bool    `json:"battery"`
	Transient       bool    `json:"transient"`
}

type ProcessSchema

type ProcessSchema struct {
	SchemaID int32 `json:"schema_id"`
}

type ProcessSchemaHandler

type ProcessSchemaHandler struct {
	// contains filtered or unexported fields
}

func NewProcessSchemaHandler

func NewProcessSchemaHandler(db *sqlxcache.DB, metrics *logging.Metrics, publisher jobs.MessagePublisher) *ProcessSchemaHandler

func (*ProcessSchemaHandler) Handle

type SourceAggregator

type SourceAggregator struct {
	// contains filtered or unexported fields
}

func NewSourceAggregator

func NewSourceAggregator(db *sqlxcache.DB, tsConfig *storage.TimeScaleDBConfig, verbose, legacy bool) *SourceAggregator

func (*SourceAggregator) ProcessSource

func (i *SourceAggregator) ProcessSource(ctx context.Context, source MessageSource, startTime time.Time) error

type WebHookMessage

type WebHookMessage struct {
	ID        int64     `db:"id" json:"id"`
	CreatedAt time.Time `db:"created_at" json:"created_at"`
	SchemaID  *int32    `db:"schema_id" json:"schema_id"`
	Headers   *string   `db:"headers" json:"headers"`
	Body      []byte    `db:"body" json:"body"`
}

func (*WebHookMessage) Parse

func (m *WebHookMessage) Parse(ctx context.Context, cache *JqCache, schemas map[int32]*MessageSchemaRegistration) ([]*ParsedMessage, error)

type WebHookMessageReceived

type WebHookMessageReceived struct {
	SchemaID  int32 `json:"ttn_schema_id"`
	MessageID int64 `json:"ttn_message_id"`
}

type WebHookMessageReceivedHandler

type WebHookMessageReceivedHandler struct {
	// contains filtered or unexported fields
}

func NewWebHookMessageReceivedHandler

func NewWebHookMessageReceivedHandler(db *sqlxcache.DB, metrics *logging.Metrics, publisher jobs.MessagePublisher, tsConfig *storage.TimeScaleDBConfig, verbose bool) *WebHookMessageReceivedHandler

func (*WebHookMessageReceivedHandler) Handle

type WebHookService

type WebHookService struct {
	// contains filtered or unexported fields
}

func NewWebHookService

func NewWebHookService(ctx context.Context, options *common.ServiceOptions) *WebHookService

func (*WebHookService) JWTAuth

func (s *WebHookService) JWTAuth(ctx context.Context, token string, scheme *security.JWTScheme) (context.Context, error)

func (*WebHookService) Webhook

func (c *WebHookService) Webhook(ctx context.Context, payload *whService.WebhookPayload, bodyReader io.ReadCloser) error

type WebHookStation

type WebHookStation struct {
	Provision       *data.Provision
	Configuration   *data.StationConfiguration
	Station         *data.Station
	Module          *data.StationModule
	Sensors         []*data.ModuleSensor
	SensorPrefix    string
	Attributes      map[string]*data.StationAttributeSlot
	Associated      map[string]*AssociatedAttribute
	LastReadingTime *time.Time
}

func (*WebHookStation) FindAttribute

func (s *WebHookStation) FindAttribute(name string) *data.StationAttributeSlot

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL