Documentation ¶
Index ¶
- Constants
- Variables
- func CreateMap(ctx context.Context, services *BackgroundServices) gue.WorkMap
- func Logger(ctx context.Context) *zap.Logger
- func NewMuxRecordHandler(handlers []RecordHandler) *muxRecordHandler
- func NewNoopRecordHandler() *noopRecordHandler
- func NotifyMentions(mentions []*Mention) []*data.Notification
- func ParseStationIDs(raw *string) []int32
- func Register(ctx context.Context, services *BackgroundServices, ...)
- func SignS3URL(svc *s3.S3, url string) (signed string, err error)
- type AggregateQueryParams
- type AggregateSummary
- type AsyncFileWriter
- type AsyncReaderFunc
- type AsyncWriterFunc
- type BackgroundServices
- type CanExport
- type CsvExporter
- func (e *CsvExporter) Export(ctx context.Context, urls []string) error
- func (e *CsvExporter) OnData(ctx context.Context, rawRecord *pb.DataRecord, rawMetaUnused *pb.DataRecord, ...) error
- func (e *CsvExporter) OnDone(ctx context.Context) (err error)
- func (e *CsvExporter) OnMeta(ctx context.Context, recordNumber int64, rawRecord *pb.DataRecord, ...) error
- func (e *CsvExporter) OnSignedMeta(ctx context.Context, signedRecord *pb.SignedRecord, rawRecord *pb.DataRecord, ...) error
- func (e *CsvExporter) Prepare(ctx context.Context, urls []string) error
- type DataQuerier
- func (dq *DataQuerier) GetIDs(ctx context.Context, mas []ModuleAndSensor) (*SensorDatabaseIDs, error)
- func (dq *DataQuerier) GetStationIDs(ctx context.Context, stationIDs []int32) (*SensorDatabaseIDs, error)
- func (dq *DataQuerier) QueryMeta(ctx context.Context, qp *QueryParams) (qm *QueryMeta, err error)
- func (dq *DataQuerier) QueryOuterValues(ctx context.Context, aqp *AggregateQueryParams) (rr []*DataRow, err error)
- type DataRow
- type DescribeStationLocationHandler
- type DirtyRange
- type ExportDataHandler
- type FileArchives
- type FkbHandler
- type FkbWalker
- type HandlerCollectionHandler
- func (v *HandlerCollectionHandler) OnData(ctx context.Context, provision *data.Provision, rawData *pb.DataRecord, ...) error
- func (v *HandlerCollectionHandler) OnDone(ctx context.Context) error
- func (v *HandlerCollectionHandler) OnMeta(ctx context.Context, provision *data.Provision, rawMeta *pb.DataRecord, ...) error
- type IngestStationHandler
- func (h *IngestStationHandler) IngestionCompleted(ctx context.Context, m *messages.IngestionCompleted, mc *jobs.MessageContext) error
- func (h *IngestStationHandler) IngestionFailed(ctx context.Context, m *messages.IngestionFailed, mc *jobs.MessageContext) error
- func (h *IngestStationHandler) Start(ctx context.Context, m *messages.IngestStation, mc *jobs.MessageContext) error
- type IngestionReceivedHandler
- type IngestionSaga
- type JsonLinesExporter
- func (e *JsonLinesExporter) Export(ctx context.Context, urls []string) error
- func (e *JsonLinesExporter) OnData(ctx context.Context, rawRecord *pb.DataRecord, rawMetaUnused *pb.DataRecord, ...) error
- func (e *JsonLinesExporter) OnDone(ctx context.Context) (err error)
- func (e *JsonLinesExporter) OnMeta(ctx context.Context, recordNumber int64, rawRecord *pb.DataRecord, ...) error
- func (e *JsonLinesExporter) OnSignedMeta(ctx context.Context, signedRecord *pb.SignedRecord, rawRecord *pb.DataRecord, ...) error
- func (e *JsonLinesExporter) Prepare(ctx context.Context, urls []string) error
- type Mention
- type MessageCollection
- type ModuleAndSensor
- type OnWalkProgress
- type OurTransportMessageFunc
- type ProgressReader
- type QueriedModuleID
- type QueryMeta
- type QueryParams
- type RawQueryParams
- type RecordAdder
- func (ra *RecordAdder) OnData(ctx context.Context, rawRecord *pb.DataRecord, rawMetaUnused *pb.DataRecord, ...) error
- func (ra *RecordAdder) OnDone(ctx context.Context) (err error)
- func (ra *RecordAdder) OnMeta(ctx context.Context, recordNumber int64, rawRecord *pb.DataRecord, ...) error
- func (ra *RecordAdder) OnSignedMeta(ctx context.Context, signedRecord *pb.SignedRecord, rawRecord *pb.DataRecord, ...) error
- func (ra *RecordAdder) WriteRecords(ctx context.Context, ingestion *data.Ingestion) (info *WriteInfo, err error)
- type RecordHandler
- type RefreshMaterializedViewsHandler
- type RefreshStationHandler
- type RefreshWindows
- type SensorDataBatchHandler
- type SensorDataModifiedHandler
- type SensorDatabaseIDs
- type SensorMeta
- type StationIngestionSaga
- type StationMeta
- type StationRefresher
- type UnmarshalFunc
- type WalkProgress
- type WalkRecordsInfo
- type WorkFailed
- type WriteInfo
Constants ¶
const CompactFieldSets = true
const (
MaximumDataRecordLength = 1024 * 10
)
const (
MaximumRetriesBeforeTrash = 3
)
const (
SecondsBetweenProgressUpdates = 1
)
Variables ¶
var (
ErrEmptyRefresh = errors.New("empty refresh")
)
Functions ¶
func CreateMap ¶
func CreateMap(ctx context.Context, services *BackgroundServices) gue.WorkMap
func NewMuxRecordHandler ¶
func NewMuxRecordHandler(handlers []RecordHandler) *muxRecordHandler
func NewNoopRecordHandler ¶
func NewNoopRecordHandler() *noopRecordHandler
func NotifyMentions ¶
func NotifyMentions(mentions []*Mention) []*data.Notification
func ParseStationIDs ¶
func ParseStationIDs(raw *string) []int32
func Register ¶
func Register(ctx context.Context, services *BackgroundServices, work map[string]gue.WorkFunc, message interface{}, handler OurTransportMessageFunc)
Types ¶
type AggregateQueryParams ¶
type AggregateSummary ¶
type AggregateSummary struct {
    NumberRecords int64                 `db:"number_records" json:"numberRecords"`
    Start         *data.NumericWireTime `db:"start" json:"start"`
    End           *data.NumericWireTime `db:"end" json:"end"`
}
type AsyncFileWriter ¶
type AsyncFileWriter struct {
// contains filtered or unexported fields
}
func NewAsyncFileWriter ¶
func NewAsyncFileWriter(readFunc AsyncReaderFunc, writeFunc AsyncWriterFunc) *AsyncFileWriter
type BackgroundServices ¶
type BackgroundServices struct {
// contains filtered or unexported fields
}
func NewBackgroundServices ¶
func NewBackgroundServices(database *sqlxcache.DB, dbpool *pgxpool.Pool, metrics *logging.Metrics, fileArchives *FileArchives, que *gue.Client, timeScaleConfig *storage.TimeScaleDBConfig, locations *data.DescribeLocations) *BackgroundServices
type CsvExporter ¶
type CsvExporter struct {
// contains filtered or unexported fields
}
func NewCsvExporter ¶
func NewCsvExporter(files files.FileArchive, metrics *logging.Metrics, writer io.Writer, progress OnWalkProgress) (self *CsvExporter)
func (*CsvExporter) OnData ¶
func (e *CsvExporter) OnData(ctx context.Context, rawRecord *pb.DataRecord, rawMetaUnused *pb.DataRecord, bytes []byte) error
func (*CsvExporter) OnMeta ¶
func (e *CsvExporter) OnMeta(ctx context.Context, recordNumber int64, rawRecord *pb.DataRecord, bytes []byte) error
func (*CsvExporter) OnSignedMeta ¶
func (e *CsvExporter) OnSignedMeta(ctx context.Context, signedRecord *pb.SignedRecord, rawRecord *pb.DataRecord, bytes []byte) error
type DataQuerier ¶
type DataQuerier struct {
// contains filtered or unexported fields
}
func NewDataQuerier ¶
func NewDataQuerier(db *sqlxcache.DB) *DataQuerier
func (*DataQuerier) GetIDs ¶
func (dq *DataQuerier) GetIDs(ctx context.Context, mas []ModuleAndSensor) (*SensorDatabaseIDs, error)
func (*DataQuerier) GetStationIDs ¶
func (dq *DataQuerier) GetStationIDs(ctx context.Context, stationIDs []int32) (*SensorDatabaseIDs, error)
func (*DataQuerier) QueryMeta ¶
func (dq *DataQuerier) QueryMeta(ctx context.Context, qp *QueryParams) (qm *QueryMeta, err error)
func (*DataQuerier) QueryOuterValues ¶
func (dq *DataQuerier) QueryOuterValues(ctx context.Context, aqp *AggregateQueryParams) (rr []*DataRow, err error)
type DataRow ¶
type DataRow struct {
    Time      data.NumericWireTime `db:"time" json:"time"`
    StationID *int32               `db:"station_id" json:"stationId,omitempty"`
    SensorID  *int64               `db:"sensor_id" json:"sensorId,omitempty"`
    ModuleID  *string              `db:"module_id" json:"moduleId,omitempty"`
    Location  *data.Location       `db:"location" json:"location,omitempty"`
    Value     *float64             `db:"value" json:"value,omitempty"`

    // Deprecated
    Id        *int64 `db:"id" json:"-"`
    TimeGroup *int32 `db:"time_group" json:"-"`

    // TSDB
    BucketSamples *int32     `json:"-"`
    DataStart     *time.Time `json:"-"`
    DataEnd       *time.Time `json:"-"`
    AverageValue  *float64   `json:"avg,omitempty"`
    MinimumValue  *float64   `json:"min,omitempty"`
    MaximumValue  *float64   `json:"max,omitempty"`
    LastValue     *float64   `json:"last,omitempty"`
}
func (*DataRow) CoerceNaNs ¶
func (row *DataRow) CoerceNaNs()
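Because the value fields are pointers tagged omitempty, a sparse DataRow serializes cleanly with encoding/json. A minimal sketch, assuming CoerceNaNs clears NaN values (which encoding/json rejects); the standard-library imports (encoding/json, math) are omitted:

func encodeRow() ([]byte, error) {
    stationID := int32(42)
    value := math.NaN()
    row := &DataRow{
        StationID: &stationID,
        Value:     &value,
    }
    // Assumption: CoerceNaNs removes NaN values so json.Marshal does not fail on them.
    row.CoerceNaNs()
    return json.Marshal(row)
}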
type DescribeStationLocationHandler ¶
type DescribeStationLocationHandler struct {
// contains filtered or unexported fields
}
func NewDescribeStationLocationHandler ¶
func NewDescribeStationLocationHandler(db *sqlxcache.DB, metrics *logging.Metrics, publisher jobs.MessagePublisher, locations *data.DescribeLocations) *DescribeStationLocationHandler
func (*DescribeStationLocationHandler) Handle ¶
func (h *DescribeStationLocationHandler) Handle(ctx context.Context, m *messages.StationLocationUpdated, j *gue.Job) error
type DirtyRange ¶
type ExportDataHandler ¶
type ExportDataHandler struct {
// contains filtered or unexported fields
}
func NewExportDataHandler ¶
func NewExportDataHandler(db *sqlxcache.DB, files files.FileArchive, metrics *logging.Metrics) *ExportDataHandler
func (*ExportDataHandler) Handle ¶
func (h *ExportDataHandler) Handle(ctx context.Context, m *messages.ExportData) error
type FileArchives ¶
type FileArchives struct {
    Ingestion files.FileArchive
    Media     files.FileArchive
    Exported  files.FileArchive
}
type FkbHandler ¶
type FkbHandler interface {
    OnSignedMeta(ctx context.Context, signedRecord *pb.SignedRecord, rawRecord *pb.DataRecord, bytes []byte) error
    OnMeta(ctx context.Context, recordNumber int64, rawMeta *pb.DataRecord, bytes []byte) error
    OnData(ctx context.Context, rawData *pb.DataRecord, rawMeta *pb.DataRecord, bytes []byte) error
    OnDone(ctx context.Context) error
}
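Any type with these four methods can be handed to NewFkbWalker. A minimal sketch of a counting handler and its wiring; countingHandler is hypothetical, and the archive and metrics values are assumed to come from the caller:

type countingHandler struct {
    meta int64
    data int64
}

func (h *countingHandler) OnSignedMeta(ctx context.Context, signedRecord *pb.SignedRecord, rawRecord *pb.DataRecord, bytes []byte) error {
    return nil
}

func (h *countingHandler) OnMeta(ctx context.Context, recordNumber int64, rawMeta *pb.DataRecord, bytes []byte) error {
    h.meta++
    return nil
}

func (h *countingHandler) OnData(ctx context.Context, rawData *pb.DataRecord, rawMeta *pb.DataRecord, bytes []byte) error {
    h.data++
    return nil
}

func (h *countingHandler) OnDone(ctx context.Context) error {
    log.Printf("meta=%d data=%d", h.meta, h.data)
    return nil
}

func walk(archive files.FileArchive, metrics *logging.Metrics) *FkbWalker {
    progress := func(ctx context.Context, p WalkProgress) error { return nil }
    return NewFkbWalker(archive, metrics, &countingHandler{}, progress, false)
}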
type FkbWalker ¶
type FkbWalker struct {
// contains filtered or unexported fields
}
func NewFkbWalker ¶
func NewFkbWalker(files files.FileArchive, metrics *logging.Metrics, handler FkbHandler, progress OnWalkProgress, verbose bool) (ra *FkbWalker)
type HandlerCollectionHandler ¶
type HandlerCollectionHandler struct {
// contains filtered or unexported fields
}
func NewHandlerCollectionHandler ¶
func NewHandlerCollectionHandler(handlers []RecordHandler) *HandlerCollectionHandler
func (*HandlerCollectionHandler) OnData ¶
func (v *HandlerCollectionHandler) OnData(ctx context.Context, provision *data.Provision, rawData *pb.DataRecord, rawMeta *pb.DataRecord, db *data.DataRecord, meta *data.MetaRecord) error
func (*HandlerCollectionHandler) OnDone ¶
func (v *HandlerCollectionHandler) OnDone(ctx context.Context) error
func (*HandlerCollectionHandler) OnMeta ¶
func (v *HandlerCollectionHandler) OnMeta(ctx context.Context, provision *data.Provision, rawMeta *pb.DataRecord, meta *data.MetaRecord) error
type IngestStationHandler ¶
type IngestStationHandler struct {
// contains filtered or unexported fields
}
func NewIngestStationHandler ¶
func NewIngestStationHandler(db *sqlxcache.DB, dbpool *pgxpool.Pool, files files.FileArchive, metrics *logging.Metrics, publisher jobs.MessagePublisher, tsConfig *storage.TimeScaleDBConfig) *IngestStationHandler
func (*IngestStationHandler) IngestionCompleted ¶
func (h *IngestStationHandler) IngestionCompleted(ctx context.Context, m *messages.IngestionCompleted, mc *jobs.MessageContext) error
func (*IngestStationHandler) IngestionFailed ¶
func (h *IngestStationHandler) IngestionFailed(ctx context.Context, m *messages.IngestionFailed, mc *jobs.MessageContext) error
func (*IngestStationHandler) Start ¶
func (h *IngestStationHandler) Start(ctx context.Context, m *messages.IngestStation, mc *jobs.MessageContext) error
type IngestionReceivedHandler ¶
type IngestionReceivedHandler struct {
// contains filtered or unexported fields
}
func NewIngestionReceivedHandler ¶
func NewIngestionReceivedHandler(db *sqlxcache.DB, dbpool *pgxpool.Pool, files files.FileArchive, metrics *logging.Metrics, publisher jobs.MessagePublisher, tsConfig *storage.TimeScaleDBConfig) *IngestionReceivedHandler
func (*IngestionReceivedHandler) BatchCompleted ¶
func (h *IngestionReceivedHandler) BatchCompleted(ctx context.Context, m *messages.SensorDataBatchCommitted, mc *jobs.MessageContext) error
func (*IngestionReceivedHandler) Start ¶
func (h *IngestionReceivedHandler) Start(ctx context.Context, m *messages.IngestionReceived, mc *jobs.MessageContext) error
type IngestionSaga ¶
type IngestionSaga struct {
    QueuedID  int64           `json:"queued_id"`
    UserID    int32           `json:"user_id"`
    StationID *int32          `json:"station_id"`
    DataStart time.Time       `json:"data_start"`
    DataEnd   time.Time       `json:"data_end"`
    Required  map[string]bool `json:"required"`
    Completed map[string]bool `json:"completed"`
}
func (*IngestionSaga) IsCompleted ¶
func (s *IngestionSaga) IsCompleted() bool
type JsonLinesExporter ¶
type JsonLinesExporter struct {
// contains filtered or unexported fields
}
func NewJsonLinesExporter ¶
func NewJsonLinesExporter(files files.FileArchive, metrics *logging.Metrics, writer io.Writer, progress OnWalkProgress) (self *JsonLinesExporter)
func (*JsonLinesExporter) Export ¶
func (e *JsonLinesExporter) Export(ctx context.Context, urls []string) error
func (*JsonLinesExporter) OnData ¶
func (e *JsonLinesExporter) OnData(ctx context.Context, rawRecord *pb.DataRecord, rawMetaUnused *pb.DataRecord, bytes []byte) error
func (*JsonLinesExporter) OnDone ¶
func (e *JsonLinesExporter) OnDone(ctx context.Context) (err error)
func (*JsonLinesExporter) OnMeta ¶
func (e *JsonLinesExporter) OnMeta(ctx context.Context, recordNumber int64, rawRecord *pb.DataRecord, bytes []byte) error
func (*JsonLinesExporter) OnSignedMeta ¶
func (e *JsonLinesExporter) OnSignedMeta(ctx context.Context, signedRecord *pb.SignedRecord, rawRecord *pb.DataRecord, bytes []byte) error
type Mention ¶
type MessageCollection ¶
func ReadLengthPrefixedCollection ¶
func ReadLengthPrefixedCollection(ctx context.Context, maximumMessageLength uint64, r io.Reader, f UnmarshalFunc) (pbs MessageCollection, totalBytesRead int, err error)
ReadLengthPrefixedCollection reads a collection of protocol buffer messages from the supplied reader. Each message is presumed to be prefixed by a 32-bit varint giving the size of the ensuing message. The UnmarshalFunc argument is a caller-supplied callback used to convert the raw bytes of each message into the desired message type. The protocol buffer message collection is returned, along with the total number of bytes read and any error arising. For more detail on this approach, see the official protocol buffer documentation: https://developers.google.com/protocol-buffers/docs/techniques#streaming.
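A sketch of reading such a stream, assuming UnmarshalFunc has roughly the shape func([]byte) (proto.Message, error), that MessageCollection is a slice of the decoded messages, and that the pb types satisfy the proto.Unmarshal of whichever protobuf runtime is in use; verify against the actual definitions:

func readDataRecords(ctx context.Context, r io.Reader) (int, error) {
    unmarshal := func(b []byte) (proto.Message, error) { // assumed UnmarshalFunc shape
        var record pb.DataRecord
        if err := proto.Unmarshal(b, &record); err != nil {
            return nil, err
        }
        return &record, nil
    }
    records, totalBytes, err := ReadLengthPrefixedCollection(ctx, MaximumDataRecordLength, r, unmarshal)
    if err != nil {
        return 0, err
    }
    log.Printf("decoded %d records (%d bytes)", len(records), totalBytes)
    return len(records), nil
}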
type ModuleAndSensor ¶
type OnWalkProgress ¶
type OnWalkProgress func(ctx context.Context, progress WalkProgress) error
type OurTransportMessageFunc ¶
type OurTransportMessageFunc func(ctx context.Context, j *gue.Job, services *BackgroundServices, tm *jobs.TransportMessage, mc *jobs.MessageContext) error
type ProgressReader ¶
type ProgressReader struct {
    io.ReadCloser
    // contains filtered or unexported fields
}
func (*ProgressReader) Close ¶
func (r *ProgressReader) Close() error
type QueriedModuleID ¶
type QueryMeta ¶
type QueryMeta struct {
    Sensors  map[int64]*SensorMeta
    Stations map[int32]*StationMeta
}
type QueryParams ¶
type QueryParams struct {
    Start      time.Time         `json:"start"`
    End        time.Time         `json:"end"`
    Stations   []int32           `json:"stations"`
    Sensors    []ModuleAndSensor `json:"sensors"`
    Resolution int32             `json:"resolution"`
    Aggregate  string            `json:"aggregate"`
    Tail       int32             `json:"tail"`
    Complete   bool              `json:"complete"`
    Backend    string            `json:"backend"`

    Eternity        bool `json:"eternity"`
    BeginningOfTime bool `json:"beginning_of_time"`
    EndOfTime       bool `json:"end_of_time"`
}
func ExportQueryParams ¶
func ExportQueryParams(de *data.DataExport) (*QueryParams, error)
type RawQueryParams ¶
type RawQueryParams struct {
    Start      *int64  `json:"start"`
    End        *int64  `json:"end"`
    Resolution *int32  `json:"resolution"`
    Stations   *string `json:"stations"`
    Sensors    *string `json:"sensors"`
    Modules    *string `json:"modules"`
    Aggregate  *string `json:"aggregate"`
    Tail       *int32  `json:"tail"`
    Complete   *bool   `json:"complete"`
    Backend    *string `json:"backend"`
}
func (*RawQueryParams) BuildQueryParams ¶
func (raw *RawQueryParams) BuildQueryParams() (qp *QueryParams, err error)
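RawQueryParams carries the wire form of a query (optional fields, delimited ID lists) and BuildQueryParams turns it into a concrete QueryParams. A hedged sketch of assembling one by hand; the epoch-seconds timestamps and the comma-separated station list are assumptions, consistent with ParseStationIDs taking a *string:

func buildLastDay() (*QueryParams, error) {
    start := time.Now().Add(-24 * time.Hour).Unix() // assumed to be Unix seconds
    end := time.Now().Unix()
    stations := "32,33" // assumed comma-separated
    resolution := int32(1000)

    raw := &RawQueryParams{
        Start:      &start,
        End:        &end,
        Stations:   &stations,
        Resolution: &resolution,
    }
    return raw.BuildQueryParams()
}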
type RecordAdder ¶
type RecordAdder struct {
// contains filtered or unexported fields
}
func NewRecordAdder ¶
func NewRecordAdder(db *sqlxcache.DB, files files.FileArchive, metrics *logging.Metrics, handler RecordHandler, verbose bool, saveData bool) (ra *RecordAdder)
func (*RecordAdder) OnData ¶
func (ra *RecordAdder) OnData(ctx context.Context, rawRecord *pb.DataRecord, rawMetaUnused *pb.DataRecord, bytes []byte) error
func (*RecordAdder) OnMeta ¶
func (ra *RecordAdder) OnMeta(ctx context.Context, recordNumber int64, rawRecord *pb.DataRecord, bytes []byte) error
func (*RecordAdder) OnSignedMeta ¶
func (ra *RecordAdder) OnSignedMeta(ctx context.Context, signedRecord *pb.SignedRecord, rawRecord *pb.DataRecord, bytes []byte) error
func (*RecordAdder) WriteRecords ¶
func (ra *RecordAdder) WriteRecords(ctx context.Context, ingestion *data.Ingestion) (info *WriteInfo, err error)
type RecordHandler ¶
type RecordHandler interface {
    OnMeta(ctx context.Context, provision *data.Provision, rawMeta *pb.DataRecord, db *data.MetaRecord) error
    OnData(ctx context.Context, provision *data.Provision, rawData *pb.DataRecord, rawMeta *pb.DataRecord, db *data.DataRecord, meta *data.MetaRecord) error
    OnDone(ctx context.Context) error
}
func NewAllHandlers ¶
func NewAllHandlers(db *sqlxcache.DB, tsConfig *storage.TimeScaleDBConfig, publisher jobs.MessagePublisher, completions *jobs.CompletionIDs) RecordHandler
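A RecordHandler receives each decoded meta and data record as a RecordAdder saves them, and NewHandlerCollectionHandler fans one stream out to several handlers. A sketch of wiring the stock handlers together with a hypothetical custom one; all dependency values are assumed to come from the caller:

func addRecords(ctx context.Context, db *sqlxcache.DB, archive files.FileArchive, metrics *logging.Metrics,
    tsConfig *storage.TimeScaleDBConfig, publisher jobs.MessagePublisher, completions *jobs.CompletionIDs,
    custom RecordHandler, ingestion *data.Ingestion) (*WriteInfo, error) {
    all := NewAllHandlers(db, tsConfig, publisher, completions)
    combined := NewHandlerCollectionHandler([]RecordHandler{all, custom})
    adder := NewRecordAdder(db, archive, metrics, combined, false /* verbose */, true /* saveData */)
    return adder.WriteRecords(ctx, ingestion)
}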
type RefreshMaterializedViewsHandler ¶
type RefreshMaterializedViewsHandler struct {
// contains filtered or unexported fields
}
func NewRefreshMaterializedViewsHandler ¶
func NewRefreshMaterializedViewsHandler(metrics *logging.Metrics, tsConfig *storage.TimeScaleDBConfig) *RefreshMaterializedViewsHandler
func (*RefreshMaterializedViewsHandler) RefreshView ¶
func (h *RefreshMaterializedViewsHandler) RefreshView(ctx context.Context, m *messages.RefreshMaterializedView, mc *jobs.MessageContext) error
func (*RefreshMaterializedViewsHandler) Start ¶
func (h *RefreshMaterializedViewsHandler) Start(ctx context.Context, m *messages.RefreshAllMaterializedViews, mc *jobs.MessageContext) error
type RefreshStationHandler ¶
type RefreshStationHandler struct {
// contains filtered or unexported fields
}
func NewRefreshStationHandler ¶
func NewRefreshStationHandler(db *sqlxcache.DB, tsConfig *storage.TimeScaleDBConfig) *RefreshStationHandler
func (*RefreshStationHandler) Handle ¶
func (h *RefreshStationHandler) Handle(ctx context.Context, m *messages.RefreshStation) error
type RefreshWindows ¶
type RefreshWindows struct {
// contains filtered or unexported fields
}
func NewRefreshWindows ¶
func NewRefreshWindows(pool *pgxpool.Pool) *RefreshWindows
func (*RefreshWindows) QueryForDirty ¶
func (rw *RefreshWindows) QueryForDirty(ctx context.Context) ([]*DirtyRange, error)
type SensorDataBatchHandler ¶
type SensorDataBatchHandler struct {
// contains filtered or unexported fields
}
func NewSensorDataBatchHandler ¶
func NewSensorDataBatchHandler(metrics *logging.Metrics, publisher jobs.MessagePublisher, tsConfig *storage.TimeScaleDBConfig) *SensorDataBatchHandler
func (*SensorDataBatchHandler) Handle ¶
func (h *SensorDataBatchHandler) Handle(ctx context.Context, m *messages.SensorDataBatch, j *gue.Job, mc *jobs.MessageContext) error
type SensorDataModifiedHandler ¶
type SensorDataModifiedHandler struct {
// contains filtered or unexported fields
}
func NewSensorDataModifiedHandler ¶
func NewSensorDataModifiedHandler(db *sqlxcache.DB, metrics *logging.Metrics, publisher jobs.MessagePublisher, tsConfig *storage.TimeScaleDBConfig) *SensorDataModifiedHandler
func (*SensorDataModifiedHandler) Handle ¶
func (h *SensorDataModifiedHandler) Handle(ctx context.Context, m *messages.SensorDataModified, j *gue.Job) error
type SensorDatabaseIDs ¶
type SensorMeta ¶
type StationIngestionSaga ¶
type StationIngestionSaga struct {
    UserID     int32           `json:"user_id"`
    StationID  int32           `json:"station_id"`
    Ingestions []int64         `json:"ingestions"`
    Required   map[int64]int64 `json:"required"`
    Completed  map[int64]bool  `json:"completed"`
}
func (*StationIngestionSaga) HasMoreIngestions ¶
func (s *StationIngestionSaga) HasMoreIngestions() bool
func (*StationIngestionSaga) IsCompleted ¶
func (s *StationIngestionSaga) IsCompleted() bool
func (*StationIngestionSaga) NextIngestionID ¶
func (s *StationIngestionSaga) NextIngestionID() int64
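The saga's exported fields make its bookkeeping visible. A hedged sketch of driving it; the reading of Completed as a per-ingestion done flag is an assumption based on the field names:

func markDone(saga *StationIngestionSaga, ingestionID int64) {
    // Assumption: completed ingestions are recorded by ID in Completed.
    saga.Completed[ingestionID] = true
    if saga.IsCompleted() {
        log.Printf("station %d: all %d ingestions processed", saga.StationID, len(saga.Ingestions))
    } else if saga.HasMoreIngestions() {
        log.Printf("next ingestion: %d", saga.NextIngestionID())
    }
}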
type StationMeta ¶
type StationRefresher ¶
type StationRefresher struct {
// contains filtered or unexported fields
}
func NewStationRefresher ¶
func NewStationRefresher(db *sqlxcache.DB, tsConfig *storage.TimeScaleDBConfig, tableSuffix string) (sr *StationRefresher, err error)
type WalkProgress ¶
type WalkProgress struct {
// contains filtered or unexported fields
}
type WalkRecordsInfo ¶
type WorkFailed ¶
type WorkFailed struct {
Work *jobs.TransportMessage `json:"work"`
}
Source Files ¶
- all.go
- async_file_writer.go
- describe_station_location_handler.go
- export_data_handler.go
- fkb_walker.go
- gostream.go
- handlers.go
- ingest_station_handler.go
- ingestion_received_handler.go
- logging.go
- map.go
- mentions.go
- record_adder.go
- refresh_materialized_views_handler.go
- refresh_station_handler.go
- sensor_data_batch_handler.go
- sensor_data_modified_handler.go
- sensors.go
- station_refresher.go
- utilities.go