database

package
v0.1.2 Latest
Warning

This package is not in the latest version of its module.
Published: Nov 25, 2024 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package database implements TimescaleDB-backed time series data storage.

Architecture:

  • Uses TimescaleDB for optimized time series storage and querying
  • Implements automatic partitioning for efficient data management
  • Provides built-in support for time-based aggregations
  • Designed for horizontal scalability

Example usage:

repo, err := NewPostgresRepo("postgres://user:pass@localhost:5432/db")
if err != nil {
    log.Fatal(err)
}
defer repo.Close()

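// Query hourly averages for the last 24 hours.
ctx := context.Background()
end := time.Now()
start := end.Add(-24 * time.Hour)
data, err := repo.Query(ctx, start, end, "1h", "AVG")
if err != nil {
    log.Printf("query failed: %v", err)
    return
}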

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type PostgresRepo

type PostgresRepo struct {
	// contains filtered or unexported fields
}

PostgresRepo implements TimeSeriesRepository using TimescaleDB.

Features:

  • Automatic data partitioning by time
  • Optimized time-based queries
  • Transaction support for batch operations
  • Connection pooling

Internal implementation uses TimescaleDB's hypertables for:

  • Automatic chunk management
  • Parallel query execution
  • Time-bucket optimization

func NewPostgresRepo

func NewPostgresRepo(connStr string) (*PostgresRepo, error)

NewPostgresRepo creates and initializes a new PostgresRepo.

The connection string should be in the format: "postgres://username:password@host:port/dbname?sslmode=disable"
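
A minimal sketch of checking a connection string against the format above before it is handed to a constructor. The helper name checkConnStr and the specific checks are assumptions made for illustration; nothing here is taken from the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"strings"
)

// checkConnStr is a hypothetical helper that verifies a connection string
// looks like "postgres://username:password@host:port/dbname?sslmode=disable".
// Error messages never echo the input, since it may contain a password.
func checkConnStr(connStr string) error {
	u, err := url.Parse(connStr)
	if err != nil {
		return errors.New("connection string is not a valid URL")
	}
	if u.Scheme != "postgres" && u.Scheme != "postgresql" {
		return fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	if u.Hostname() == "" {
		return errors.New("missing host")
	}
	if strings.TrimPrefix(u.Path, "/") == "" {
		return errors.New("missing database name")
	}
	return nil
}

func main() {
	for _, s := range []string{
		"postgres://user:pass@localhost:5432/db?sslmode=disable",
		"mysql://localhost/db",
		"postgres://localhost:5432/",
	} {
		fmt.Println(checkConnStr(s))
	}
}
```

Failing early on a malformed string gives a clearer error than a driver-level connection failure would.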

The function will:

  1. Establish database connection
  2. Verify connectivity
  3. Initialize connection pool

Returns:

  • *PostgresRepo: Initialized repository
  • error: Connection or initialization error

func (*PostgresRepo) BatchInsertTimeSeriesData

func (s *PostgresRepo) BatchInsertTimeSeriesData(ctx context.Context, data []models.TimeSeriesData) error

BatchInsertTimeSeriesData performs bulk data insertion.

The operation is atomic: either all data points are inserted or none are. It executes a prepared statement inside a single transaction, which reduces database round trips.

Parameters:

  • ctx: Context for cancellation and timeout
  • data: Slice of time series data points to insert

Transaction Flow:

  1. Begin transaction
  2. Prepare statement
  3. Execute batch inserts
  4. Commit or rollback

Returns error if:

  • Transaction fails to start
  • Statement preparation fails
  • Any insert fails
  • Commit fails

func (*PostgresRepo) Close

func (s *PostgresRepo) Close() error

Close releases all database resources.

Should be called when the repository is no longer needed. Typically deferred after repository creation.

func (*PostgresRepo) InsertTimeSeriesData

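func (s *PostgresRepo) InsertTimeSeriesData(timestamp time.Time, value float64) error

InsertTimeSeriesData inserts a single time series data point. Returns an error if the insertion fails.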

func (*PostgresRepo) Query

func (s *PostgresRepo) Query(
	ctx context.Context,
	start, end time.Time,
	window string,
	aggregation string,
) ([]models.TimeSeriesData, error)

Query implements the TimeSeriesRepository interface. It delegates to QueryTimeSeriesData, which holds the actual implementation.

See QueryTimeSeriesData for detailed documentation.

func (*PostgresRepo) QueryTimeSeriesData

func (s *PostgresRepo) QueryTimeSeriesData(
	ctx context.Context,
	start, end time.Time,
	window string,
	aggregation string,
) ([]models.TimeSeriesData, error)

QueryTimeSeriesData retrieves and aggregates time series data.

Parameters:

  • ctx: Context for cancellation and timeout
  • start: Beginning of time range (inclusive)
  • end: End of time range (exclusive)
  • window: Time bucket size ("1m", "5m", "1h", "1d")
  • aggregation: Aggregation function ("MIN", "MAX", "AVG", "SUM")

SQL Implementation:

  • Uses time_bucket() from TimescaleDB for efficient time-based grouping
  • Implements dynamic aggregation selection via CASE statement
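
One way the parameter validation and the SQL described above might fit together is sketched below. The helper checkQueryParams, the query text, and the time_series table with its time and value columns are assumptions for illustration only; the package's actual query is not shown in this documentation.

```go
package main

import "fmt"

// intervals maps the documented window strings to PostgreSQL interval text.
var intervals = map[string]string{
	"1m": "1 minute",
	"5m": "5 minutes",
	"1h": "1 hour",
	"1d": "1 day",
}

// aggregations lists the documented aggregation names.
var aggregations = map[string]bool{"MIN": true, "MAX": true, "AVG": true, "SUM": true}

// aggregateQuery is a hypothetical query. $1 is the bucket width, $2 the
// aggregation name, $3 the inclusive start and $4 the exclusive end.
const aggregateQuery = `
SELECT time_bucket($1::interval, time) AS bucket,
       CASE $2::text
           WHEN 'MIN' THEN MIN(value)
           WHEN 'MAX' THEN MAX(value)
           WHEN 'AVG' THEN AVG(value)
           WHEN 'SUM' THEN SUM(value)
       END AS value
FROM time_series
WHERE time >= $3 AND time < $4
GROUP BY bucket
ORDER BY bucket`

// checkQueryParams is a hypothetical helper that rejects unsupported values
// and returns the interval text to bind as $1.
func checkQueryParams(window, aggregation string) (string, error) {
	interval, ok := intervals[window]
	if !ok {
		return "", fmt.Errorf("unsupported window %q", window)
	}
	if !aggregations[aggregation] {
		return "", fmt.Errorf("unsupported aggregation %q", aggregation)
	}
	return interval, nil
}

func main() {
	interval, err := checkQueryParams("1h", "AVG")
	fmt.Println(interval, err)

	_, err = checkQueryParams("2h", "AVG")
	fmt.Println(err)

	fmt.Println(aggregateQuery)
}
```

Passing the window and aggregation as bound parameters, after checking them against fixed lists, keeps user input out of the SQL text.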

Returns:

  • []models.TimeSeriesData: Slice of aggregated data points
  • error: Query execution error or invalid parameters

Example:

now := time.Now() // capture once so start and end share the same reference
data, err := repo.QueryTimeSeriesData(ctx,
    now.Add(-24*time.Hour), // start (inclusive)
    now,                    // end (exclusive)
    "1h",                   // window
    "AVG",                  // aggregation
)

type TimeSeriesRepository

type TimeSeriesRepository interface {
	// InsertTimeSeriesData inserts a single time series data point.
	// Returns an error if the insertion fails.
	InsertTimeSeriesData(timestamp time.Time, value float64) error

	// Query retrieves time series data within the specified time range.
	// Supports different time windows (1m, 5m, 1h, 1d) and aggregation methods (MIN, MAX, AVG, SUM).
	// Returns the aggregated data points and any error encountered.
	Query(ctx context.Context, start, end time.Time, window string, aggregation string) ([]models.TimeSeriesData, error)

	// BatchInsertTimeSeriesData inserts multiple time series data points in a single transaction.
	// This method is optimized for bulk insertions by reducing database round trips.
	// Returns an error if any part of the batch insertion fails.
	BatchInsertTimeSeriesData(ctx context.Context, data []models.TimeSeriesData) error

	// Close releases any resources held by the repository.
	// Should be called when the repository is no longer needed.
	Close() error
}

TimeSeriesRepository defines the interface for time series operations.

This interface provides methods for:

  • Single and batch data insertion
  • Time series querying with aggregation
  • Resource cleanup

Supported aggregations:

  • MIN: Minimum value in time window
  • MAX: Maximum value in time window
  • AVG: Average value in time window
  • SUM: Sum of values in time window

Supported time windows:

  • 1m: One minute
  • 5m: Five minutes
  • 1h: One hour
  • 1d: One day

Directories

Path Synopsis
Package mocks is a generated GoMock package.
