storage

package
v1.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 21, 2023 License: Apache-2.0 Imports: 9 Imported by: 3

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
// Sentinel values exposed by the package.
var (
	ErrNoDataPoints = errors.New("no data points found") // the data does not exist
	ErrNoRowsData   = errors.New("no rows given")        // the given rows are empty
	// NOTE(review): despite the Err prefix, ErrUnknown is a plain string, not
	// an error value, so it cannot be matched with errors.Is.
	ErrUnknown      = "UNKNOWN"
)

Functions

func NewMemoryPartition

func NewMemoryPartition(partitionDuration time.Duration, precision TimestampPrecision) partition

Types

type DataPoint

// DataPoint is a single value paired with the timestamp it belongs to.
type DataPoint struct {
	// The actual value. This field must be set.
	Value float64
	// Unix timestamp. Its precision is nanoseconds by default and can be
	// changed using WithTimestampPrecision.
	Timestamp int64
}

type Label

// Label is a time-series label: a key-value pair attached to a Row.
type Label struct {
	Key   string
	Value string
}

Label is a time-series label.

type Option

type Option func(*Storage)

func WithLogger

func WithLogger(logger *logger.Logger) Option

Defaults to a logger implementation that does nothing.

func WithPartitionDuration

func WithPartitionDuration(duration time.Duration) Option

Defaults to 5 minutes.

func WithRetention

func WithRetention(retention time.Duration) Option

Defaults to 1d.

func WithTimestampPrecision

func WithTimestampPrecision(precision TimestampPrecision) Option

Defaults to Nanoseconds.

func WithWriteTimeout

func WithWriteTimeout(timeout time.Duration) Option

Defaults to 15s.

type Reader

// Reader is the read side of the storage.
type Reader interface {
	// Select returns the data points stored for the metric identified by
	// name and labels whose timestamps lie in the range given by start and
	// end.
	// NOTE(review): the examples compare the returned error against
	// ErrNoDataPoints with errors.Is — confirm when it is returned and
	// whether end is inclusive.
	Select(name string, labels []Label, start, end int64) (points []*DataPoint, err error)
}

type Row

// Row is one data point together with the metric it belongs to.
type Row struct {
	// The unique name of the metric.
	Name string
	// Optional key-value properties for more detailed identification.
	Labels []Label
	// The embedded data point. This field must be set.
	DataPoint
}

type Storage

// Storage is the concrete storage type. Option functions configure it, and it
// provides the Close, InsertRows and Select methods.
type Storage struct {
	// contains filtered or unexported fields
}
	In-memory data tree, shown as a list of memory partitions (newest first). Reads reach every partition; writes go to the newest one.
	│                 │
	Read              Write
	│                 │
	│                 V
	│      ┌───────────────────┐ max: 1615010800
	├─────>   Memory Partition
	│      └───────────────────┘ min: 1615007201
	│
	│      ┌───────────────────┐ max: 1615007200
	├─────>   Memory Partition
	│      └───────────────────┘ min: 1615003601
	│
	│      ┌───────────────────┐ max: 1615003600
	└─────>   Memory Partition
	       └───────────────────┘ min: 1615000000

func (*Storage) Close

func (s *Storage) Close() error

func (*Storage) InsertRows

func (s *Storage) InsertRows(rows []Row) error
Example
// Create a storage that interprets timestamps as seconds.
stg, err := NewStorage(
	WithTimestampPrecision(Seconds),
	WithWriteTimeout(15*time.Second),
	WithRetention(1*time.Hour),
	WithLogger(logging),
)
if err != nil {
	panic(err)
}
// Close the storage on exit and surface any error it reports.
defer func() {
	if err := stg.Close(); err != nil {
		panic(err)
	}
}()

// Insert a single data point for metric1. The element type is omitted from
// the slice literal, matching the other examples.
err = stg.InsertRows([]Row{
	{Name: "metric1", DataPoint: DataPoint{Timestamp: 1600000000, Value: 0.1}},
})
if err != nil {
	panic(err)
}

// Read the point back and print it.
points, err := stg.Select("metric1", nil, 1600000000, 1600000001)
if err != nil {
	panic(err)
}
for _, p := range points {
	fmt.Printf("timestamp: %v, value: %v\n", p.Timestamp, p.Value)
}
Output:

timestamp: 1600000000, value: 0.1
Example (Concurrent)
stg, err := NewStorage(
	WithTimestampPrecision(Seconds),
)
if err != nil {
	panic(err)
}
// Check the error returned by Close, as the other examples do, instead of
// silently dropping it.
defer func() {
	if err := stg.Close(); err != nil {
		panic(err)
	}
}()

// First insert in order to ensure min timestamp
if err := stg.InsertRows([]Row{
	{Name: "metric1", DataPoint: DataPoint{Timestamp: 1600000000}},
}); err != nil {
	panic(err)
}

// Insert the remaining points from one goroutine per timestamp.
var wg sync.WaitGroup
for i := int64(1600000001); i < 1600000100; i++ {
	wg.Add(1)
	go func(timestamp int64) {
		// Deferred so the WaitGroup is released on every exit path.
		defer wg.Done()
		if err := stg.InsertRows([]Row{
			{Name: "metric1", DataPoint: DataPoint{Timestamp: timestamp}},
		}); err != nil {
			panic(err)
		}
	}(i)
}
// Wait for every writer before reading.
wg.Wait()

points, err := stg.Select("metric1", nil, 1600000000, 1600000100)
if err != nil {
	panic(err)
}
for _, p := range points {
	fmt.Printf("timestamp: %v, value: %v\n", p.Timestamp, p.Value)
}
Output:

func (*Storage) Select

func (s *Storage) Select(name string, labels []Label, start, end int64) ([]*DataPoint, error)
Example (From_memory)
// NOTE(review): tmpDir is created and removed here but is never passed to
// NewStorage in this snippet — confirm whether it is still needed.
tmpDir, err := os.MkdirTemp("", "storage-example")
if err != nil {
	panic(err)
}
defer os.RemoveAll(tmpDir)

// Create a storage with 2-hour partitions and second-precision timestamps.
stg, err := NewStorage(
	WithPartitionDuration(2*time.Hour),
	WithTimestampPrecision(Seconds),
	WithWriteTimeout(15*time.Second),
	WithRetention(1*time.Hour),
	WithLogger(logging),
)
if err != nil {
	panic(err)
}
// Close the storage on exit and surface any error it reports.
defer func() {
	if err := stg.Close(); err != nil {
		panic(err)
	}
}()

// Ingest data points of metric1
for timestamp := int64(1600000000); timestamp < 1600000050; timestamp++ {
	err := stg.InsertRows([]Row{
		{Name: "metric1", DataPoint: DataPoint{Timestamp: timestamp, Value: 0.1}},
	})
	if err != nil {
		panic(err)
	}
}
// Ingest data points of metric2
for timestamp := int64(1600000050); timestamp < 1600000100; timestamp++ {
	err := stg.InsertRows([]Row{
		{Name: "metric2", DataPoint: DataPoint{Timestamp: timestamp, Value: 0.2}},
	})
	if err != nil {
		panic(err)
	}
}

// Read back metric1. ErrNoDataPoints ends the example quietly; any other
// error panics.
points, err := stg.Select("metric1", nil, 1600000000, 1600000050)
if errors.Is(err, ErrNoDataPoints) {
	return
}
if err != nil {
	panic(err)
}
fmt.Println("Data points of metric1:")
for _, p := range points {
	fmt.Printf("Timestamp: %v, Value: %v\n", p.Timestamp, p.Value)
}

// Read back metric2 with the same error handling.
points2, err := stg.Select("metric2", nil, 1600000050, 1600000100)
if errors.Is(err, ErrNoDataPoints) {
	return
}
if err != nil {
	panic(err)
}
fmt.Println("Data points of metric2:")
for _, p := range points2 {
	fmt.Printf("Timestamp: %v, Value: %v\n", p.Timestamp, p.Value)
}
Output:

Data points of metric1:
Timestamp: 1600000000, Value: 0.1
Timestamp: 1600000001, Value: 0.1
Timestamp: 1600000002, Value: 0.1
Timestamp: 1600000003, Value: 0.1
Timestamp: 1600000004, Value: 0.1
Timestamp: 1600000005, Value: 0.1
Timestamp: 1600000006, Value: 0.1
Timestamp: 1600000007, Value: 0.1
Timestamp: 1600000008, Value: 0.1
Timestamp: 1600000009, Value: 0.1
Timestamp: 1600000010, Value: 0.1
Timestamp: 1600000011, Value: 0.1
Timestamp: 1600000012, Value: 0.1
Timestamp: 1600000013, Value: 0.1
Timestamp: 1600000014, Value: 0.1
Timestamp: 1600000015, Value: 0.1
Timestamp: 1600000016, Value: 0.1
Timestamp: 1600000017, Value: 0.1
Timestamp: 1600000018, Value: 0.1
Timestamp: 1600000019, Value: 0.1
Timestamp: 1600000020, Value: 0.1
Timestamp: 1600000021, Value: 0.1
Timestamp: 1600000022, Value: 0.1
Timestamp: 1600000023, Value: 0.1
Timestamp: 1600000024, Value: 0.1
Timestamp: 1600000025, Value: 0.1
Timestamp: 1600000026, Value: 0.1
Timestamp: 1600000027, Value: 0.1
Timestamp: 1600000028, Value: 0.1
Timestamp: 1600000029, Value: 0.1
Timestamp: 1600000030, Value: 0.1
Timestamp: 1600000031, Value: 0.1
Timestamp: 1600000032, Value: 0.1
Timestamp: 1600000033, Value: 0.1
Timestamp: 1600000034, Value: 0.1
Timestamp: 1600000035, Value: 0.1
Timestamp: 1600000036, Value: 0.1
Timestamp: 1600000037, Value: 0.1
Timestamp: 1600000038, Value: 0.1
Timestamp: 1600000039, Value: 0.1
Timestamp: 1600000040, Value: 0.1
Timestamp: 1600000041, Value: 0.1
Timestamp: 1600000042, Value: 0.1
Timestamp: 1600000043, Value: 0.1
Timestamp: 1600000044, Value: 0.1
Timestamp: 1600000045, Value: 0.1
Timestamp: 1600000046, Value: 0.1
Timestamp: 1600000047, Value: 0.1
Timestamp: 1600000048, Value: 0.1
Timestamp: 1600000049, Value: 0.1
Data points of metric2:
Timestamp: 1600000050, Value: 0.2
Timestamp: 1600000051, Value: 0.2
Timestamp: 1600000052, Value: 0.2
Timestamp: 1600000053, Value: 0.2
Timestamp: 1600000054, Value: 0.2
Timestamp: 1600000055, Value: 0.2
Timestamp: 1600000056, Value: 0.2
Timestamp: 1600000057, Value: 0.2
Timestamp: 1600000058, Value: 0.2
Timestamp: 1600000059, Value: 0.2
Timestamp: 1600000060, Value: 0.2
Timestamp: 1600000061, Value: 0.2
Timestamp: 1600000062, Value: 0.2
Timestamp: 1600000063, Value: 0.2
Timestamp: 1600000064, Value: 0.2
Timestamp: 1600000065, Value: 0.2
Timestamp: 1600000066, Value: 0.2
Timestamp: 1600000067, Value: 0.2
Timestamp: 1600000068, Value: 0.2
Timestamp: 1600000069, Value: 0.2
Timestamp: 1600000070, Value: 0.2
Timestamp: 1600000071, Value: 0.2
Timestamp: 1600000072, Value: 0.2
Timestamp: 1600000073, Value: 0.2
Timestamp: 1600000074, Value: 0.2
Timestamp: 1600000075, Value: 0.2
Timestamp: 1600000076, Value: 0.2
Timestamp: 1600000077, Value: 0.2
Timestamp: 1600000078, Value: 0.2
Timestamp: 1600000079, Value: 0.2
Timestamp: 1600000080, Value: 0.2
Timestamp: 1600000081, Value: 0.2
Timestamp: 1600000082, Value: 0.2
Timestamp: 1600000083, Value: 0.2
Timestamp: 1600000084, Value: 0.2
Timestamp: 1600000085, Value: 0.2
Timestamp: 1600000086, Value: 0.2
Timestamp: 1600000087, Value: 0.2
Timestamp: 1600000088, Value: 0.2
Timestamp: 1600000089, Value: 0.2
Timestamp: 1600000090, Value: 0.2
Timestamp: 1600000091, Value: 0.2
Timestamp: 1600000092, Value: 0.2
Timestamp: 1600000093, Value: 0.2
Timestamp: 1600000094, Value: 0.2
Timestamp: 1600000095, Value: 0.2
Timestamp: 1600000096, Value: 0.2
Timestamp: 1600000097, Value: 0.2
Timestamp: 1600000098, Value: 0.2
Timestamp: 1600000099, Value: 0.2
Example (From_memory_out_of_order)

Out-of-order data points that have not yet been flushed remain in the buffer and do not appear in Select results.

// Create a storage that interprets timestamps as seconds.
stg, err := NewStorage(
	WithTimestampPrecision(Seconds),
)
if err != nil {
	panic(err)
}
// Close the storage on exit and surface any error it reports.
defer func() {
	if err := stg.Close(); err != nil {
		panic(err)
	}
}()
// The third row (1600000001) arrives after 1600000002, i.e. out of order.
err = stg.InsertRows([]Row{
	{Name: "metric1", DataPoint: DataPoint{Timestamp: 1600000000, Value: 0.1}},
	{Name: "metric1", DataPoint: DataPoint{Timestamp: 1600000002, Value: 0.1}},
	{Name: "metric1", DataPoint: DataPoint{Timestamp: 1600000001, Value: 0.1}},
	{Name: "metric1", DataPoint: DataPoint{Timestamp: 1600000003, Value: 0.1}},
})
if err != nil {
	panic(err)
}
// Select the whole inserted range and print what comes back.
points, err := stg.Select("metric1", nil, 1600000000, 1600000003)
if err != nil {
	panic(err)
}
for _, p := range points {
	fmt.Printf("Timestamp: %v, Value: %v\n", p.Timestamp, p.Value)
}

// Out-of-order data points are ignored because they will get merged when flushing.
Output:

Timestamp: 1600000000, Value: 0.1
Timestamp: 1600000002, Value: 0.1
Timestamp: 1600000003, Value: 0.1

type StorageInterface

// StorageInterface is the interface returned by NewStorage. It combines
// Reader with the write and shutdown operations.
type StorageInterface interface {
	Reader
	// InsertRows ingests the given rows.
	// The precision of timestamps is nanoseconds by default. It can be changed using WithTimestampPrecision.
	InsertRows(rows []Row) error
	// Close gracefully shuts down the storage by flushing any unwritten data to the underlying disk partition.
	Close() error
}

func NewStorage

func NewStorage(opts ...Option) (StorageInterface, error)
Example (WithPartitionDuration)
// Create a storage whose partitions each span 30 minutes.
stg, err := NewStorage(
	WithPartitionDuration(30*time.Minute),
	WithTimestampPrecision(Seconds),
	WithWriteTimeout(15*time.Second),
	WithRetention(1*time.Hour),
	WithLogger(logging),
)
if err != nil {
	panic(err)
}
// Check the error returned by Close, as the other examples do, instead of
// silently dropping it.
defer func() {
	if err := stg.Close(); err != nil {
		panic(err)
	}
}()
Output:

type TimestampPrecision

// TimestampPrecision is the precision of the timestamps handled by the
// storage. It is set with WithTimestampPrecision.
type TimestampPrecision int
// Supported precisions. Nanoseconds is the zero value and the documented
// default.
const (
	Nanoseconds TimestampPrecision = iota
	Microseconds
	Milliseconds
	Seconds
)

func (TimestampPrecision) String

func (p TimestampPrecision) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL