handlers

package
v0.0.0-...-dca2783 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 3, 2023 License: BSD-3-Clause Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	BatchSize = 1000
)

Variables

View Source
var (
	AggregateTimeGroupThresholds = map[string]int{
		"24h": 86400 * 2,
		"12h": 86400,
		"6h":  86400,
		"1h":  3600 * 4,
		"30m": 3600 * 2,
		"10m": 30 * 60,
		"1m":  2 * 60,
		"10s": 30,
	}
	AggregateIntervals = map[string]int32{
		"24h": 86400,
		"12h": 86400 / 2,
		"6h":  86400 / 4,
		"1h":  3600,
		"30m": 30 * 60,
		"10m": 10 * 60,
		"1m":  60,
		"10s": 10,
	}
)
View Source
var (
	AggregateNames = []string{"24h", "12h", "6h", "1h", "30m", "10m", "1m", "10s"}
)
View Source
var (
	EarliestPlausibleTime = time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)
)

Functions

func Logger

func Logger(ctx context.Context) *zap.Logger

func MakeAggregateTableName

func MakeAggregateTableName(suffix string, name string) string

func NewDefaultAggregatorConfig

func NewDefaultAggregatorConfig() *defaultAggregatorConfig

func NewStationModelRecordHandler

func NewStationModelRecordHandler(db *sqlxcache.DB) *stationModelRecordHandler

Types

type AggregateSensorKey

type AggregateSensorKey struct {
	SensorKey string
	ModuleID  int64
}

type Aggregated

type Aggregated struct {
	Time          time.Time
	Location      []float64
	Values        map[AggregateSensorKey]float64
	NumberSamples int32
}

type AggregatingHandler

type AggregatingHandler struct {
	// contains filtered or unexported fields
}

func NewAggregatingHandler

func NewAggregatingHandler(db *sqlxcache.DB, tsConfig *storage.TimeScaleDBConfig, tableSuffix string, completely, skipManual bool) *AggregatingHandler

func (*AggregatingHandler) OnData

func (*AggregatingHandler) OnDone

func (v *AggregatingHandler) OnDone(ctx context.Context) error

func (*AggregatingHandler) OnMeta

type AggregationFunction

type AggregationFunction interface {
	Apply(values []float64) (float64, error)
}

type Aggregator

type Aggregator struct {
	// contains filtered or unexported fields
}

func NewAggregator

func NewAggregator(db *sqlxcache.DB, tableSuffix string, stationID int32, batchSize int, config AggregatorConfig) *Aggregator

func (*Aggregator) AddSample

func (v *Aggregator) AddSample(ctx context.Context, sampled time.Time, location []float64, sensorKey AggregateSensorKey, value float64) error

func (*Aggregator) ClearNumberSamples

func (v *Aggregator) ClearNumberSamples(ctx context.Context) error

func (*Aggregator) Close

func (v *Aggregator) Close(ctx context.Context) error

func (*Aggregator) DeleteEmptyAggregates

func (v *Aggregator) DeleteEmptyAggregates(ctx context.Context) error

func (*Aggregator) NextTime

func (v *Aggregator) NextTime(ctx context.Context, sampled time.Time) error

type AggregatorConfig

type AggregatorConfig interface {
	Apply(sensorKey AggregateSensorKey, values []float64) (float64, error)
}

type AverageFunction

type AverageFunction struct {
}

func (*AverageFunction) Apply

func (f *AverageFunction) Apply(values []float64) (float64, error)

type InterestingnessHandler

type InterestingnessHandler struct {
	// contains filtered or unexported fields
}

func NewInterestingnessHandler

func NewInterestingnessHandler(db *sqlxcache.DB) *InterestingnessHandler

func (*InterestingnessHandler) Close

func (*InterestingnessHandler) ConsiderReading

func (ih *InterestingnessHandler) ConsiderReading(ctx context.Context, r *data.IncomingReading) error

type MaximumFunction

type MaximumFunction struct {
}

func (*MaximumFunction) Apply

func (f *MaximumFunction) Apply(values []float64) (float64, error)

type SensorAndModulePosition

type SensorAndModulePosition struct {
	ConfigurationID int64  `db:"configuration_id"`
	SensorID        int64  `db:"sensor_id"`
	ModuleIndex     uint32 `db:"module_index"`
	SensorIndex     uint32 `db:"sensor_index"`
}

type TimeScaleRecord

type TimeScaleRecord struct {
	Time      time.Time
	StationID int32
	ModuleID  int64
	SensorID  int64
	Location  []float64
	Value     float64
}

type TsDBHandler

type TsDBHandler struct {
	// contains filtered or unexported fields
}

func NewTsDbHandler

func NewTsDbHandler(db *sqlxcache.DB, tsConfig *storage.TimeScaleDBConfig, publisher jobs.MessagePublisher, completions *jobs.CompletionIDs) *TsDBHandler

func (*TsDBHandler) OnData

func (v *TsDBHandler) OnData(ctx context.Context, provision *data.Provision, rawData *pb.DataRecord, rawMeta *pb.DataRecord, db *data.DataRecord, meta *data.MetaRecord) error

func (*TsDBHandler) OnDone

func (v *TsDBHandler) OnDone(ctx context.Context) error

func (*TsDBHandler) OnMeta

func (v *TsDBHandler) OnMeta(ctx context.Context, provision *data.Provision, rawMeta *pb.DataRecord, meta *data.MetaRecord) error

type UpdateSensorValue

type UpdateSensorValue struct {
	ID    int64     `db:"id"`
	Value float64   `db:"reading_last"`
	Time  time.Time `db:"reading_time"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL