Documentation ¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var (
	ErrNoDataPoints = errors.New("no data points found") // data does not exist
	ErrNoRowsData   = errors.New("no rows given")        // rows are empty
	ErrUnknown      = "UNKNOWN"
)
Functions ¶
func NewMemoryPartition ¶
func NewMemoryPartition(partitionDuration time.Duration, precision TimestampPrecision) partition
Types ¶
type Option ¶
type Option func(*Storage)
func WithLogger ¶
Defaults to a logger implementation that does nothing.
func WithPartitionDuration ¶
Defaults to 5 minutes.
func WithTimestampPrecision ¶
func WithTimestampPrecision(precision TimestampPrecision) Option
Defaults to Nanoseconds
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
Memory data tree, used just for the data code list:

  │                 │
 Read             Write
  │                 │
  │                 V
  │      ┌───────────────────┐ max: 1615010800
  ├─────>   Memory Partition
  │      └───────────────────┘ min: 1615007201
  │
  │      ┌───────────────────┐ max: 1615007200
  ├─────>   Memory Partition
  │      └───────────────────┘ min: 1615003601
  │
  │      ┌───────────────────┐ max: 1615003600
  └─────>   Memory Partition
         └───────────────────┘ min: 1615000000
func (*Storage) InsertRows ¶
Example ¶
stg, err := NewStorage( WithTimestampPrecision(Seconds), WithWriteTimeout(15*time.Second), WithRetention(1*time.Hour), WithLogger(logging), ) if err != nil { panic(err) } defer func() { if err := stg.Close(); err != nil { panic(err) } }() err = stg.InsertRows([]Row{ Row{ Name: "metric1", DataPoint: DataPoint{Timestamp: 1600000000, Value: 0.1}, }, }) if err != nil { panic(err) } points, err := stg.Select("metric1", nil, 1600000000, 1600000001) if err != nil { panic(err) } for _, p := range points { fmt.Printf("timestamp: %v, value: %v\n", p.Timestamp, p.Value) }
Output: timestamp: 1600000000, value: 0.1
Example (Concurrent) ¶
stg, err := NewStorage( WithTimestampPrecision(Seconds), ) if err != nil { panic(err) } defer stg.Close() // First insert in order to ensure min timestamp if err := stg.InsertRows([]Row{ {Name: "metric1", DataPoint: DataPoint{Timestamp: 1600000000}}, }); err != nil { panic(err) } var wg sync.WaitGroup for i := int64(1600000001); i < 1600000100; i++ { wg.Add(1) go func(timestamp int64) { if err := stg.InsertRows([]Row{ {Name: "metric1", DataPoint: DataPoint{Timestamp: timestamp}}, }); err != nil { panic(err) } wg.Done() }(i) } wg.Wait() points, err := stg.Select("metric1", nil, 1600000000, 1600000100) if err != nil { panic(err) } for _, p := range points { fmt.Printf("timestamp: %v, value: %v\n", p.Timestamp, p.Value) }
Output:
func (*Storage) Select ¶
Example (From_memory) ¶
tmpDir, err := os.MkdirTemp("", "storage-example") if err != nil { panic(err) } defer os.RemoveAll(tmpDir) stg, err := NewStorage( WithPartitionDuration(2*time.Hour), WithTimestampPrecision(Seconds), WithWriteTimeout(15*time.Second), WithRetention(1*time.Hour), WithLogger(logging), ) if err != nil { panic(err) } defer func() { if err := stg.Close(); err != nil { panic(err) } }() // Ingest data points of metric1 for timestamp := int64(1600000000); timestamp < 1600000050; timestamp++ { err := stg.InsertRows([]Row{ {Name: "metric1", DataPoint: DataPoint{Timestamp: timestamp, Value: 0.1}}, }) if err != nil { panic(err) } } // Ingest data points of metric2 for timestamp := int64(1600000050); timestamp < 1600000100; timestamp++ { err := stg.InsertRows([]Row{ {Name: "metric2", DataPoint: DataPoint{Timestamp: timestamp, Value: 0.2}}, }) if err != nil { panic(err) } } points, err := stg.Select("metric1", nil, 1600000000, 1600000050) if errors.Is(err, ErrNoDataPoints) { return } if err != nil { panic(err) } fmt.Println("Data points of metric1:") for _, p := range points { fmt.Printf("Timestamp: %v, Value: %v\n", p.Timestamp, p.Value) } points2, err := stg.Select("metric2", nil, 1600000050, 1600000100) if errors.Is(err, ErrNoDataPoints) { return } if err != nil { panic(err) } fmt.Println("Data points of metric2:") for _, p := range points2 { fmt.Printf("Timestamp: %v, Value: %v\n", p.Timestamp, p.Value) }
Output: Data points of metric1: Timestamp: 1600000000, Value: 0.1 Timestamp: 1600000001, Value: 0.1 Timestamp: 1600000002, Value: 0.1 Timestamp: 1600000003, Value: 0.1 Timestamp: 1600000004, Value: 0.1 Timestamp: 1600000005, Value: 0.1 Timestamp: 1600000006, Value: 0.1 Timestamp: 1600000007, Value: 0.1 Timestamp: 1600000008, Value: 0.1 Timestamp: 1600000009, Value: 0.1 Timestamp: 1600000010, Value: 0.1 Timestamp: 1600000011, Value: 0.1 Timestamp: 1600000012, Value: 0.1 Timestamp: 1600000013, Value: 0.1 Timestamp: 1600000014, Value: 0.1 Timestamp: 1600000015, Value: 0.1 Timestamp: 1600000016, Value: 0.1 Timestamp: 1600000017, Value: 0.1 Timestamp: 1600000018, Value: 0.1 Timestamp: 1600000019, Value: 0.1 Timestamp: 1600000020, Value: 0.1 Timestamp: 1600000021, Value: 0.1 Timestamp: 1600000022, Value: 0.1 Timestamp: 1600000023, Value: 0.1 Timestamp: 1600000024, Value: 0.1 Timestamp: 1600000025, Value: 0.1 Timestamp: 1600000026, Value: 0.1 Timestamp: 1600000027, Value: 0.1 Timestamp: 1600000028, Value: 0.1 Timestamp: 1600000029, Value: 0.1 Timestamp: 1600000030, Value: 0.1 Timestamp: 1600000031, Value: 0.1 Timestamp: 1600000032, Value: 0.1 Timestamp: 1600000033, Value: 0.1 Timestamp: 1600000034, Value: 0.1 Timestamp: 1600000035, Value: 0.1 Timestamp: 1600000036, Value: 0.1 Timestamp: 1600000037, Value: 0.1 Timestamp: 1600000038, Value: 0.1 Timestamp: 1600000039, Value: 0.1 Timestamp: 1600000040, Value: 0.1 Timestamp: 1600000041, Value: 0.1 Timestamp: 1600000042, Value: 0.1 Timestamp: 1600000043, Value: 0.1 Timestamp: 1600000044, Value: 0.1 Timestamp: 1600000045, Value: 0.1 Timestamp: 1600000046, Value: 0.1 Timestamp: 1600000047, Value: 0.1 Timestamp: 1600000048, Value: 0.1 Timestamp: 1600000049, Value: 0.1 Data points of metric2: Timestamp: 1600000050, Value: 0.2 Timestamp: 1600000051, Value: 0.2 Timestamp: 1600000052, Value: 0.2 Timestamp: 1600000053, Value: 0.2 Timestamp: 1600000054, Value: 0.2 Timestamp: 1600000055, Value: 0.2 Timestamp: 1600000056, Value: 0.2 
Timestamp: 1600000057, Value: 0.2 Timestamp: 1600000058, Value: 0.2 Timestamp: 1600000059, Value: 0.2 Timestamp: 1600000060, Value: 0.2 Timestamp: 1600000061, Value: 0.2 Timestamp: 1600000062, Value: 0.2 Timestamp: 1600000063, Value: 0.2 Timestamp: 1600000064, Value: 0.2 Timestamp: 1600000065, Value: 0.2 Timestamp: 1600000066, Value: 0.2 Timestamp: 1600000067, Value: 0.2 Timestamp: 1600000068, Value: 0.2 Timestamp: 1600000069, Value: 0.2 Timestamp: 1600000070, Value: 0.2 Timestamp: 1600000071, Value: 0.2 Timestamp: 1600000072, Value: 0.2 Timestamp: 1600000073, Value: 0.2 Timestamp: 1600000074, Value: 0.2 Timestamp: 1600000075, Value: 0.2 Timestamp: 1600000076, Value: 0.2 Timestamp: 1600000077, Value: 0.2 Timestamp: 1600000078, Value: 0.2 Timestamp: 1600000079, Value: 0.2 Timestamp: 1600000080, Value: 0.2 Timestamp: 1600000081, Value: 0.2 Timestamp: 1600000082, Value: 0.2 Timestamp: 1600000083, Value: 0.2 Timestamp: 1600000084, Value: 0.2 Timestamp: 1600000085, Value: 0.2 Timestamp: 1600000086, Value: 0.2 Timestamp: 1600000087, Value: 0.2 Timestamp: 1600000088, Value: 0.2 Timestamp: 1600000089, Value: 0.2 Timestamp: 1600000090, Value: 0.2 Timestamp: 1600000091, Value: 0.2 Timestamp: 1600000092, Value: 0.2 Timestamp: 1600000093, Value: 0.2 Timestamp: 1600000094, Value: 0.2 Timestamp: 1600000095, Value: 0.2 Timestamp: 1600000096, Value: 0.2 Timestamp: 1600000097, Value: 0.2 Timestamp: 1600000098, Value: 0.2 Timestamp: 1600000099, Value: 0.2
Example (From_memory_out_of_order) ¶
Out-of-order data points that have not yet been flushed are held in the buffer and do not appear in the results of Select.
stg, err := NewStorage( WithTimestampPrecision(Seconds), ) if err != nil { panic(err) } defer func() { if err := stg.Close(); err != nil { panic(err) } }() err = stg.InsertRows([]Row{ {Name: "metric1", DataPoint: DataPoint{Timestamp: 1600000000, Value: 0.1}}, {Name: "metric1", DataPoint: DataPoint{Timestamp: 1600000002, Value: 0.1}}, {Name: "metric1", DataPoint: DataPoint{Timestamp: 1600000001, Value: 0.1}}, {Name: "metric1", DataPoint: DataPoint{Timestamp: 1600000003, Value: 0.1}}, }) if err != nil { panic(err) } points, err := stg.Select("metric1", nil, 1600000000, 1600000003) if err != nil { panic(err) } for _, p := range points { fmt.Printf("Timestamp: %v, Value: %v\n", p.Timestamp, p.Value) } // Out-of-order data points are ignored because they will get merged when flushing.
Output: Timestamp: 1600000000, Value: 0.1 Timestamp: 1600000002, Value: 0.1 Timestamp: 1600000003, Value: 0.1
type StorageInterface ¶
type StorageInterface interface { Reader // The precision of timestamps is nanoseconds by default. It can be changed using WithTimestampPrecision. InsertRows(rows []Row) error // Close gracefully shutdowns by flushing any unwritten data to the underlying disk partition. Close() error }
func NewStorage ¶
func NewStorage(opts ...Option) (StorageInterface, error)
Example (WithPartitionDuration) ¶
stg, err := NewStorage( WithPartitionDuration(30*time.Minute), WithTimestampPrecision(Seconds), WithWriteTimeout(15*time.Second), WithRetention(1*time.Hour), WithLogger(logging), ) if err != nil { panic(err) } defer stg.Close()
Output:
type TimestampPrecision ¶
type TimestampPrecision int
const ( Nanoseconds TimestampPrecision = iota Microseconds Milliseconds Seconds )
func (TimestampPrecision) String ¶
func (p TimestampPrecision) String() string
Click to show internal directories.
Click to hide internal directories.