influxdb2_helper

package module
v1.5.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 21, 2024 License: MIT Imports: 8 Imported by: 0

README

influxdb2-helper

Designed to simplify operations of InfluxDB v2

Usage

Import Module
package main

import idbHelper "github.com/zhangsq-ax/influxdb2-helper"

...
Create Helper Instance
...
// Build the helper from connection settings supplied through the environment.
// Go requires a trailing comma after the last element of a multi-line
// composite literal, so BucketName must end with one.
helper := idbHelper.NewInfluxdbHelper(&idbHelper.InfluxdbHelperOptions{
    ServerUrl: os.Getenv("SERVER_URL"),
    Token: os.Getenv("TOKEN"),
    OrgName: os.Getenv("ORG_NAME"),
    BucketName: os.Getenv("BUCKET_NAME"),
})
...
Query
Build Query Options
...
queryOpts := &idbHelper.QueryOptions{
    TimeRange: &[2]int64{1721059200000, 1721106016000},
    BucketName: "default",
    Measurement: "iot_state",
    Where: map[string]string{
        "deviceId": "71922044000721a",
    },
    Columns: []string{"_time", "deviceId", "..."},
    Limit: 100,
    Offset: 0,
    DescSort: true,
}
...

or

...
queryOpts := helper.NewQueryOptions("iot_state", map[string]string{}, []string{"_time", "deviceId", "..."}, 1721059200000, 1721106016000, 100, 0)
...

TimeRange - Required. The time range for querying data, UTC timestamp in milliseconds

BucketName - Required. The name of bucket to query

Measurement - Required. The name of measurement to query

Where - Optional. Tag-based query and filter conditions. Currently only the "and" relationship is supported between multiple conditions. By default, no conditional filtering is performed.

Columns - Optional. Columns returned by the query result. By default, all columns are returned.

Limit - Optional. The maximum number of records to return. By default there is no limit on the number of records returned, but be aware that this may cause performance issues.

Offset - Optional. The number of records to skip when returning query results. Used together with the Limit parameter to implement query result paging.

DescSort - Optional. Whether to sort the query results in reverse order based on time.

Query Data
...
result, err := helper.Query(context.Background(), queryOpts.String())
if err != nil {
  panic(err)
}
for result.Next() {
  fmt.Println(result.Record().Values())
}
...

or

...
result, err := helper.QueryByOptions(context.Background(), queryOpts)
if err != nil {
  panic(err)
}
for result.Next() {
  fmt.Println(result.Record().Values())
}
...
Write
Create InfluxDB Write Point
...
import influxdb2 "github.com/influxdata/influxdb-client-go/v2"
...
// influxdb2.NewPoint returns only the point (no error value), so it is
// assigned to a single variable here.
writePoint := influxdb2.NewPoint("iot_state", map[string]string{
  "deviceId": "xxxxxxxxxxxx",
}, map[string]any{
  "x": 0,
  "y": 0,
  "yaw": 0,
}, time.Now())
...

Or use the method provided by the helper

...
// Location carries the x, y and yaw values of a State sample.
// NOTE(review): the writePoint tags presumably tell ParseStructToWritePoint
// to emit each member as an InfluxDB field under the given name — confirm
// against the helper's implementation.
type Location struct {
  X float64 `json:"x" writePoint:"x,field"`
  Y float64 `json:"y" writePoint:"y,field"`
  Yaw float64 `json:"yaw" writePoint:"yaw,field"`
}
// State is the sample record that is decoded from JSON and handed to
// ParseStructToWritePoint.
// NOTE(review): the writePoint tags presumably mark DeviceId as an InfluxDB
// tag and Timestamp as the point time — confirm against the helper.
type State struct {
  DeviceId string `json:"deviceId" writePoint:"deviceId,tag"`
  // Location has no writePoint tag of its own; its members carry the tags.
  Location *Location `json:"location"`
  // The sample JSON in this README supplies a millisecond-scale Unix timestamp.
  Timestamp int64 `json:"timestamp" writePoint:",time"`
}

data := `{"deviceId":"xxxxxxxx", "location":{"x": 0, "y": 0, "yaw": 0}, "timestamp": 1721117718000}`

state := &State{}
err := json.Unmarshal([]byte(data), state)
if err != nil {
  panic(err)
}

writePoint, err := idbHelper.ParseStructToWritePoint("iot_state", state)
if err != nil {
  panic(err)
}
Write To InfluxDB
...
err := helper.Write(context.Background(), writePoint)
if err != nil {
  panic(err)
}
...

Or use a custom method to write directly

// Location holds the x, y and yaw values that writePointGenerator copies
// into the point's fields.
type Location struct {
  X float64 `json:"x"`
  Y float64 `json:"y"`
  Yaw float64 `json:"yaw"`
}
// State is the input record for writePointGenerator.
type State struct {
  // DeviceId is written as the "deviceId" tag.
  DeviceId string `json:"deviceId"`
  // Location supplies the "x", "y" and "yaw" fields.
  Location *Location `json:"location"`
  // Timestamp is a Unix time in milliseconds (converted with time.UnixMilli).
  Timestamp int64 `json:"timestamp"`
}

// writePointGenerator converts data, which must be a *State, into an InfluxDB
// write point for the given measurement.
//
// The device ID becomes the "deviceId" tag, the location's X, Y and Yaw become
// the "x", "y" and "yaw" fields, and the millisecond Timestamp becomes the
// point time. An error is returned when data is not a usable *State.
func writePointGenerator(data any, measurement string) (*write.Point, error) {
  state, ok := data.(*State)
  // A typed nil *State still satisfies the type assertion, so check for it
  // explicitly instead of dereferencing it below.
  if !ok || state == nil {
    return nil, fmt.Errorf("invalid data")
  }
  // Location is a pointer and may be unset; report that as an error rather
  // than panicking on the field accesses.
  if state.Location == nil {
    return nil, fmt.Errorf("invalid data: missing location")
  }
  ts := time.UnixMilli(state.Timestamp)
  tags := map[string]string{
    "deviceId": state.DeviceId,
  }
  fields := map[string]any{
    "x": state.Location.X,
    "y": state.Location.Y,
    "yaw": state.Location.Yaw,
  }

  return influxdb2.NewPoint(measurement, tags, fields, ts), nil
}

data := &State{
  DeviceId: "xxxxxxxx",
  Location: &Location{
    X: 0,
    Y: 0,
    Yaw: 0,
  },
  Timestamp: time.Now().UnixMilli(),
}

_, err := helper.WriteByGenerator(context.Background(), "iot_state", data, writePointGenerator)
if err != nil {
  panic(err)
}
Complete Example
package main

import (
	"context"
	"encoding/json"
	"fmt"
	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
	"github.com/influxdata/influxdb-client-go/v2/api/write"
	idbHelper "github.com/zhangsq-ax/influxdb2-helper"
	"os"
	"time"
)

// Location carries the x, y and yaw values of a State sample.
// NOTE(review): the writePoint tags presumably tell ParseStructToWritePoint
// to emit each member as an InfluxDB field under the given name — confirm
// against the helper's implementation.
type Location struct {
	X   float64 `json:"x" writePoint:"x,field"`
	Y   float64 `json:"y" writePoint:"y,field"`
	Yaw float64 `json:"yaw" writePoint:"yaw,field"`
}
// State is the sample record written by this example, both through
// ParseStructToWritePoint and through a generator callback.
// NOTE(review): the writePoint tags presumably mark DeviceId as an InfluxDB
// tag and Timestamp as the point time — confirm against the helper.
type State struct {
	DeviceId  string    `json:"deviceId" writePoint:"deviceId,tag"`
	Location  *Location `json:"location"`
	// Timestamp is set by newState to the current Unix time in milliseconds.
	Timestamp int64     `json:"timestamp" writePoint:",time"`
}

var (
	// stateData is the JSON template that newState decodes into a State. Its
	// timestamp of 0 is a placeholder that newState overwrites.
	stateData = `{"deviceId": "xxxxxxxx", "location": {"x": 0, "y": 0, "yaw": 0}, "timestamp": 0}`
)

// main writes two sample points to the "iot_state" measurement — one built
// from struct tags and one built by a generator callback — then queries the
// last hour of that measurement and prints every returned record.
//
// Connection settings are read from the SERVER_URL, TOKEN, ORG_NAME and
// BUCKET_NAME environment variables. Any failure panics.
func main() {
	ctx := context.Background()

	opts := &idbHelper.InfluxdbHelperOptions{
		ServerUrl:  os.Getenv("SERVER_URL"),
		Token:      os.Getenv("TOKEN"),
		OrgName:    os.Getenv("ORG_NAME"),
		BucketName: os.Getenv("BUCKET_NAME"),
	}
	helper := idbHelper.NewInfluxdbHelper(opts)

	// write point1 by struct
	point, err := idbHelper.ParseStructToWritePoint("iot_state", newState(""))
	if err != nil {
		panic(err)
	}
	if err = helper.Write(ctx, point); err != nil {
		panic(err)
	}

	// write point2 by generator
	state := newState("yyyyyyyy")
	_, err = helper.WriteByGenerator(ctx, "iot_state", state, func(data any, measurement string) (*write.Point, error) {
		state, ok := data.(*State)
		if !ok {
			return nil, fmt.Errorf("data type error")
		}
		ts := state.Timestamp
		fields := map[string]any{
			"x":   state.Location.X,
			"y":   state.Location.Y,
			"yaw": state.Location.Yaw,
		}
		tags := map[string]string{
			"deviceId": state.DeviceId,
		}

		return influxdb2.NewPoint(measurement, tags, fields, time.UnixMilli(ts)), nil
	})
	// This error was previously assigned but never inspected, so a failed
	// generator write went unnoticed.
	if err != nil {
		panic(err)
	}

	// NOTE(review): presumably gives the server time to make the freshly
	// written points queryable — confirm whether the helper writes
	// asynchronously.
	time.Sleep(5 * time.Second)

	// query
	now := time.Now()
	queryOpts := &idbHelper.QueryOptions{
		TimeRange:   &[2]int64{now.Add(-1 * time.Hour).UnixMilli(), now.UnixMilli()},
		BucketName:  os.Getenv("BUCKET_NAME"),
		Measurement: "iot_state",
		Columns: []string{
			"_time",
			"deviceId",
			"x",
			"y",
			"yaw",
		},
		Limit:    100,
		DescSort: true,
	}
	// QueryByOptions is the query entry point listed in the package's API
	// index; it takes the options directly.
	result, err := helper.QueryByOptions(ctx, queryOpts)
	if err != nil {
		panic(err)
	}
	for result.Next() {
		fmt.Println(result.Record().Values())
	}
	// Next returns false both at the end of the data and on a read/parse
	// error, so the iterator's error must be checked after the loop.
	if err = result.Err(); err != nil {
		panic(err)
	}
}

// newState decodes the stateData JSON template into a fresh State, stamps it
// with the current time in Unix milliseconds and, when deviceId is non-empty,
// replaces the decoded device ID with it.
func newState(deviceId string) *State {
	var decoded State
	// The decode error is intentionally ignored here.
	_ = json.Unmarshal([]byte(stateData), &decoded)

	decoded.Timestamp = time.Now().UnixMilli()
	if deviceId == "" {
		return &decoded
	}

	decoded.DeviceId = deviceId
	return &decoded
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ParseStructToWritePoint added in v1.1.0

func ParseStructToWritePoint(measurement string, s interface{}) (*write.Point, error)

ParseStructToWritePoint parses a struct into an InfluxDB write.Point

Types

type InfluxdbHelper

type InfluxdbHelper struct {
	// contains filtered or unexported fields
}

func NewInfluxdbHelper

func NewInfluxdbHelper(opts *InfluxdbHelperOptions, debug ...bool) *InfluxdbHelper

func (*InfluxdbHelper) BucketName added in v1.4.1

func (ih *InfluxdbHelper) BucketName() string

func (*InfluxdbHelper) Count added in v1.4.2

func (ih *InfluxdbHelper) Count(ctx context.Context, opts *QueryOptions, field string) (int64, error)

Count returns the count of the column in the query. The column must be a field in the measurement, not a tag.

func (*InfluxdbHelper) NewQueryOptions added in v1.3.0

func (ih *InfluxdbHelper) NewQueryOptions(measurement string, where map[string]string, columns []string, startTime int64, endTime int64, limit int64, offset int64) *QueryOptions

func (*InfluxdbHelper) QueryByOptions added in v1.2.0

func (ih *InfluxdbHelper) QueryByOptions(ctx context.Context, opts *QueryOptions) (*api.QueryTableResult, error)

func (*InfluxdbHelper) Write

func (ih *InfluxdbHelper) Write(ctx context.Context, point ...*write.Point) error

func (*InfluxdbHelper) WriteByGenerator added in v1.2.0

func (ih *InfluxdbHelper) WriteByGenerator(ctx context.Context, measurement string, data any, generator func(data any, measurement string) (*write.Point, error)) (*write.Point, error)

type InfluxdbHelperOptions

// InfluxdbHelperOptions holds the settings passed to NewInfluxdbHelper.
// NOTE(review): the per-field notes below are inferred from the field names
// and the README usage — confirm against the implementation.
type InfluxdbHelperOptions struct {
	ServerUrl  string // presumably the URL of the InfluxDB v2 server
	Token      string // presumably the API token used to authenticate
	OrgName    string // presumably the InfluxDB organization name
	BucketName string // presumably the bucket the helper works with
}

type QueryOptions added in v1.1.0

// QueryOptions describes a query against one measurement of a bucket.
type QueryOptions struct {
	TimeRange   *[2]int64 // Time range, [start time, end time], in milliseconds
	BucketName  string
	Measurement string
	Where       map[string]string // Filter conditions
	Fields      []string          // Field keys to fetch; must be specified, otherwise there are performance issues
	Columns     []string          // List of columns to return in the query result
	Limit       int64             // Limit on the number of records returned
	Offset      int64             // Offset into the query result
	DescSort    bool              // Whether to sort by time in descending order
}

func (*QueryOptions) CountString added in v1.4.2

func (qo *QueryOptions) CountString(field string) string

func (*QueryOptions) String added in v1.1.0

func (qo *QueryOptions) String() string

func (*QueryOptions) Validate added in v1.5.0

func (qo *QueryOptions) Validate() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL