Documentation ¶
Overview ¶
Package database implements TimescaleDB-backed time series data storage.
Architecture:
- Uses TimescaleDB for optimized time series storage and querying
- Implements automatic partitioning for efficient data management
- Provides built-in support for time-based aggregations
- Designed for horizontal scalability
Example usage:
    repo, err := NewPostgresRepo("postgres://user:pass@localhost:5432/db")
    if err != nil {
        log.Fatal(err)
    }
    defer repo.Close()

    // Query time series data
    data, err := repo.Query(ctx, start, end, "1h", "AVG")
Index ¶
- type PostgresRepo
- func NewPostgresRepo(connStr string) (*PostgresRepo, error)
- func (s *PostgresRepo) BatchInsertTimeSeriesData(ctx context.Context, data []models.TimeSeriesData) error
- func (s *PostgresRepo) Close() error
- func (s *PostgresRepo) InsertTimeSeriesData(timestamp time.Time, value float64) error
- func (s *PostgresRepo) Query(ctx context.Context, start, end time.Time, window string, aggregation string) ([]models.TimeSeriesData, error)
- func (s *PostgresRepo) QueryTimeSeriesData(ctx context.Context, start, end time.Time, window string, aggregation string) ([]models.TimeSeriesData, error)
- type TimeSeriesRepository
Types ¶
type PostgresRepo ¶
type PostgresRepo struct {
// contains filtered or unexported fields
}
PostgresRepo implements TimeSeriesRepository using TimescaleDB.
Features:
- Automatic data partitioning by time
- Optimized time-based queries
- Transaction support for batch operations
- Connection pooling
Internal implementation uses TimescaleDB's hypertables for:
- Automatic chunk management
- Parallel query execution
- Time-bucket optimization
func NewPostgresRepo ¶
func NewPostgresRepo(connStr string) (*PostgresRepo, error)
NewPostgresRepo creates and initializes a new PostgresRepo.
The connection string should be in the format: "postgres://username:password@host:port/dbname?sslmode=disable"
The function will:
- Establish database connection
- Verify connectivity
- Initialize connection pool
Returns:
- *PostgresRepo: Initialized repository
- error: Connection or initialization error
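Because the constructor expects a URL-style connection string, a caller may want to check its shape before connecting. The following is a minimal sketch using only the standard library; checkConnString is a hypothetical helper, not part of this package, and the rules it enforces (scheme, host, database name) are assumptions drawn from the format shown above.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// checkConnString is a hypothetical helper (not part of this package).
// It checks that s has the shape postgres://user:pass@host:port/dbname.
func checkConnString(s string) error {
	u, err := url.Parse(s)
	if err != nil {
		return fmt.Errorf("parse connection string: %w", err)
	}
	if u.Scheme != "postgres" && u.Scheme != "postgresql" {
		return fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	if u.Hostname() == "" {
		return fmt.Errorf("missing host")
	}
	if strings.TrimPrefix(u.Path, "/") == "" {
		return fmt.Errorf("missing database name")
	}
	return nil
}

func main() {
	conn := "postgres://username:password@localhost:5432/dbname?sslmode=disable"
	if err := checkConnString(conn); err != nil {
		fmt.Println("invalid connection string:", err)
		return
	}
	fmt.Println("connection string looks well-formed")
}
```

A check like this only catches malformed input early; connectivity is still verified by NewPostgresRepo itself.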
func (*PostgresRepo) BatchInsertTimeSeriesData ¶
func (s *PostgresRepo) BatchInsertTimeSeriesData(ctx context.Context, data []models.TimeSeriesData) error
BatchInsertTimeSeriesData performs bulk data insertion.
The operation is atomic: either all data points are inserted or none. It uses a prepared statement inside a single transaction to reduce database round trips.
Parameters:
- ctx: Context for cancellation and timeout
- data: Slice of time series data points to insert
Transaction Flow:
- Begin transaction
- Prepare statement
- Execute batch inserts
- Commit or rollback
Returns error if:
- Transaction fails to start
- Statement preparation fails
- Any insert fails
- Commit fails
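The transaction flow above can be sketched without a database. This is an illustration under stated assumptions, not the package's implementation: Point stands in for models.TimeSeriesData, Tx is a hypothetical minimal transaction interface, and memTx is an in-memory fake. With database/sql, the corresponding calls would be BeginTx, PrepareContext, ExecContext, Commit and Rollback.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Point is a hypothetical stand-in for models.TimeSeriesData.
type Point struct {
	Timestamp time.Time
	Value     float64
}

// Tx is a hypothetical, minimal view of a transaction. With database/sql,
// Insert would be ExecContext on a statement prepared inside the transaction.
type Tx interface {
	Insert(ctx context.Context, p Point) error
	Commit() error
	Rollback() error
}

// batchInsert stores every point or none: any failure triggers a rollback.
func batchInsert(ctx context.Context, begin func(context.Context) (Tx, error), data []Point) (err error) {
	tx, err := begin(ctx)
	if err != nil {
		return fmt.Errorf("begin transaction: %w", err)
	}
	defer func() {
		if err != nil {
			_ = tx.Rollback() // discard partial work on any failure
		}
	}()
	for i, p := range data {
		if err = tx.Insert(ctx, p); err != nil {
			return fmt.Errorf("insert point %d: %w", i, err)
		}
	}
	if err = tx.Commit(); err != nil {
		return fmt.Errorf("commit: %w", err)
	}
	return nil
}

// memStore and memTx form an in-memory fake used only for this demo.
type memStore struct{ rows []Point }

type memTx struct {
	store   *memStore
	pending []Point
	failAt  int // index of the insert that should fail, or -1 for none
}

func (t *memTx) Insert(_ context.Context, p Point) error {
	if len(t.pending) == t.failAt {
		return errors.New("simulated insert failure")
	}
	t.pending = append(t.pending, p)
	return nil
}

func (t *memTx) Commit() error {
	t.store.rows = append(t.store.rows, t.pending...)
	t.pending = nil
	return nil
}

func (t *memTx) Rollback() error { t.pending = nil; return nil }

// demoBatch inserts three points, failing at index failAt (-1 for no
// failure), and reports how many rows ended up stored.
func demoBatch(failAt int) (int, error) {
	store := &memStore{}
	begin := func(context.Context) (Tx, error) {
		return &memTx{store: store, failAt: failAt}, nil
	}
	now := time.Now()
	data := []Point{{now, 1}, {now.Add(time.Minute), 2}, {now.Add(2 * time.Minute), 3}}
	err := batchInsert(context.Background(), begin, data)
	return len(store.rows), err
}

func main() {
	n, err := demoBatch(-1)
	fmt.Println("stored:", n, "err:", err)
	n, err = demoBatch(1)
	fmt.Println("stored:", n, "err:", err)
}
```

The deferred rollback keyed on the named return value keeps the error paths short: every early return leaves the store untouched.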
func (*PostgresRepo) Close ¶
func (s *PostgresRepo) Close() error
Close releases all database resources.
Call Close when the repository is no longer needed, typically with defer immediately after the repository is created.
func (*PostgresRepo) InsertTimeSeriesData ¶
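func (s *PostgresRepo) InsertTimeSeriesData(timestamp time.Time, value float64) error
InsertTimeSeriesData inserts a single time series data point. Returns an error if the insertion fails.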
func (*PostgresRepo) Query ¶
func (s *PostgresRepo) Query(
	ctx context.Context,
	start, end time.Time,
	window string,
	aggregation string,
) ([]models.TimeSeriesData, error)
Query implements the TimeSeriesRepository interface by delegating to QueryTimeSeriesData.
See QueryTimeSeriesData for detailed documentation.
func (*PostgresRepo) QueryTimeSeriesData ¶
func (s *PostgresRepo) QueryTimeSeriesData(
	ctx context.Context,
	start, end time.Time,
	window string,
	aggregation string,
) ([]models.TimeSeriesData, error)
QueryTimeSeriesData retrieves and aggregates time series data.
Parameters:
- ctx: Context for cancellation and timeout
- start: Beginning of time range (inclusive)
- end: End of time range (exclusive)
- window: Time bucket size ("1m", "5m", "1h", "1d")
- aggregation: Aggregation function ("MIN", "MAX", "AVG", "SUM")
SQL Implementation:
- Uses time_bucket() from TimescaleDB for efficient time-based grouping
- Implements dynamic aggregation selection via a CASE statement
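To make the shape of such a query concrete, here is a hedged sketch of building a time_bucket() query from the documented window and aggregation values. It deliberately uses a different technique from the CASE-based selection described above: it interpolates only whitelisted constants into the SQL text. buildQuery is a hypothetical helper, and the table name time_series_data and the columns ts and value are assumptions; the real schema is not shown in this documentation.

```go
package main

import "fmt"

// windows maps the documented window strings to PostgreSQL interval literals.
var windows = map[string]string{
	"1m": "1 minute",
	"5m": "5 minutes",
	"1h": "1 hour",
	"1d": "1 day",
}

// aggregations lists the documented aggregation functions.
var aggregations = map[string]bool{"MIN": true, "MAX": true, "AVG": true, "SUM": true}

// buildQuery is a hypothetical helper. Table and column names are
// assumptions. Only whitelisted constants reach the SQL text; the time
// range is passed as the bind parameters $1 (start) and $2 (end).
func buildQuery(window, aggregation string) (string, error) {
	interval, ok := windows[window]
	if !ok {
		return "", fmt.Errorf("unsupported window %q", window)
	}
	if !aggregations[aggregation] {
		return "", fmt.Errorf("unsupported aggregation %q", aggregation)
	}
	return fmt.Sprintf(
		"SELECT time_bucket('%s', ts) AS bucket, %s(value) AS value "+
			"FROM time_series_data WHERE ts >= $1 AND ts < $2 "+
			"GROUP BY bucket ORDER BY bucket",
		interval, aggregation), nil
}

func main() {
	q, err := buildQuery("1h", "AVG")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(q)
}
```

Rejecting unknown values before building the statement is what makes the string interpolation safe here; user input never reaches the SQL text directly.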
Returns:
- []models.TimeSeriesData: Slice of aggregated data points, one per time bucket
- error: Query execution error or invalid parameters
Example:
    data, err := repo.QueryTimeSeriesData(ctx,
        time.Now().Add(-24*time.Hour), // start
        time.Now(),                    // end
        "1h",                          // window
        "AVG",                         // aggregation
    )
type TimeSeriesRepository ¶
type TimeSeriesRepository interface {
	// InsertTimeSeriesData inserts a single time series data point.
	// Returns an error if the insertion fails.
	InsertTimeSeriesData(timestamp time.Time, value float64) error

	// Query retrieves time series data within the specified time range.
	// Supports different time windows (1m, 5m, 1h, 1d) and aggregation methods (MIN, MAX, AVG, SUM).
	// Returns the aggregated data points and any error encountered.
	Query(ctx context.Context, start, end time.Time, window string, aggregation string) ([]models.TimeSeriesData, error)

	// BatchInsertTimeSeriesData inserts multiple time series data points in a single transaction.
	// This method is optimized for bulk insertions by reducing database round trips.
	// Returns an error if any part of the batch insertion fails.
	BatchInsertTimeSeriesData(ctx context.Context, data []models.TimeSeriesData) error

	// Close releases any resources held by the repository.
	// Should be called when the repository is no longer needed.
	Close() error
}
TimeSeriesRepository defines the interface for time series operations.
This interface provides methods for:
- Single and batch data insertion
- Time series querying with aggregation
- Resource cleanup
Supported aggregations:
- MIN: Minimum value in time window
- MAX: Maximum value in time window
- AVG: Average value in time window
- SUM: Sum of values in time window
Supported time windows:
- 1m: One minute
- 5m: Five minutes
- 1h: One hour
- 1d: One day
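To illustrate what the aggregations and windows listed above mean for a result set, the following sketch performs the same kind of bucketing in memory. It is an assumption-laden illustration, not how the package computes results (that happens in the database): Point stands in for models.TimeSeriesData, aggregate is a hypothetical function, and bucket boundaries come from time.Time.Truncate, which may not match the database's bucket alignment in every case.

```go
package main

import (
	"fmt"
	"math"
	"sort"
	"time"
)

// Point is a hypothetical stand-in for models.TimeSeriesData.
type Point struct {
	Timestamp time.Time
	Value     float64
}

// aggregate groups points in the half-open range [start, end) into
// fixed-size windows and reduces each window with agg.
func aggregate(points []Point, start, end time.Time, window time.Duration, agg string) ([]Point, error) {
	switch agg {
	case "MIN", "MAX", "AVG", "SUM":
	default:
		return nil, fmt.Errorf("unsupported aggregation %q", agg)
	}
	if window <= 0 {
		return nil, fmt.Errorf("window must be positive")
	}
	type acc struct {
		min, max, sum float64
		n             int
	}
	buckets := map[int64]*acc{}
	for _, p := range points {
		if p.Timestamp.Before(start) || !p.Timestamp.Before(end) {
			continue // outside [start, end)
		}
		key := p.Timestamp.Truncate(window).UnixNano()
		a, ok := buckets[key]
		if !ok {
			a = &acc{min: math.Inf(1), max: math.Inf(-1)}
			buckets[key] = a
		}
		a.min = math.Min(a.min, p.Value)
		a.max = math.Max(a.max, p.Value)
		a.sum += p.Value
		a.n++
	}
	keys := make([]int64, 0, len(buckets))
	for k := range buckets {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
	out := make([]Point, 0, len(keys))
	for _, k := range keys {
		a := buckets[k]
		v := a.sum
		switch agg {
		case "MIN":
			v = a.min
		case "MAX":
			v = a.max
		case "AVG":
			v = a.sum / float64(a.n)
		}
		out = append(out, Point{time.Unix(0, k).UTC(), v})
	}
	return out, nil
}

// demoAggregate runs aggregate over fixed sample data with a one-hour
// window and returns only the aggregated values, in bucket order.
func demoAggregate(agg string) []float64 {
	base := time.Date(2024, 1, 1, 10, 0, 0, 0, time.UTC)
	points := []Point{
		{base.Add(5 * time.Minute), 2},
		{base.Add(35 * time.Minute), 4},
		{base.Add(70 * time.Minute), 6},
		{base.Add(2 * time.Hour), 100}, // equals end, so it is excluded
	}
	res, err := aggregate(points, base, base.Add(2*time.Hour), time.Hour, agg)
	if err != nil {
		return nil
	}
	vals := make([]float64, len(res))
	for i, p := range res {
		vals[i] = p.Value
	}
	return vals
}

func main() {
	for _, agg := range []string{"MIN", "MAX", "AVG", "SUM"} {
		fmt.Println(agg, demoAggregate(agg))
	}
}
```

In the sample data, the point whose timestamp equals the end of the range is dropped, which mirrors the inclusive start and exclusive end documented for QueryTimeSeriesData.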