store

package
v0.0.0-...-9634a1a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 15, 2023 License: Apache-2.0 Imports: 17 Imported by: 0

README

influxdb_writer功能实现说明:

  1. 写主成功,同步备
  2. 写主成功, 同步写入备失败(或备无法连接),记录confidence
  3. 写主失败,不同步备

confidence写入说明:

  • 写入位置: _tsdb_meta-> confidence
  • 写入内容:
    • _id: 写入shard-id
    • db: db名字
    • table: measurement名字
    • time: 分钟或10分钟级别时间
    • status:取值如下
      • PRIMARY_FAILED // 主写失败,但一般写confidence也会失败
      • SYNC_SUCCESS // 同步成功
      • REPLICA_DISCONNECT // 备influxdb无法连接,需尝试同步
      • SYNC_FAILED_1 // 备同步失败1次
      • SYNC_FAILED_2 // 备同步失败2次
      • SYNC_FAILED_3 // 同步3次失败,不同步

influxdb_repair功能实现说明:

  • 定期(默认1分钟)扫描confidence记录,进行主备同步,若备influxdb无法连接,则无须同步。
  1. 读取_tsdb_meta-> confidence 下最近的200条confidence记录
  2. 读取confidence记录对应的influxdb数据
  3. 将读取的数据,写入备influxdb中。
    • 若写成功,
      • 删除confidence表相应的记录,并记录到 confidence_synced表中,并修改status 为 SYNC_SUCCESS
    • 若写失败
      • 若当前confidence的status 为 REPLICA_DISCONNECT ,修改status为 SYNC_FAILED_1
      • 若当前confidence的status 为 SYNC_FAILED_1 ,修改status为 SYNC_FAILED_2
      • 若当前confidence的status 为 SYNC_FAILED_2 ,则删除confidence表相应的记录,并记录到 confidence_synced表中,并修改status 为 SYNC_FAILED_3

Documentation

Index

Constants

View Source
const (
	SUCCESS = "success"
	FAILED  = "failed"
)
View Source
const (
	CONFIDENCE_DB                 = "_tsdb_meta"
	CONFIDENCE_MEASUREMENT        = "confidence"
	CONFIDENCE_MEASUREMENT_SYNCED = "confidence_synced"
	TAG_DB                        = "db"
	TAG_ID                        = "_id"
	TAG_MEASUREMENT               = "table"
	FIELD_STATUS                  = "status"

	SYNC_COUNT_ONCE         = 200  // 每次获取多少条记录进行同步
	SYNC_INTERVAL           = 60   // 两次同步间隔时间 单位:秒
	SYNC_START_DELAY        = 300  // 同步HB的开始时间相对当前时间的时延
	SYNC_FAILED_RETRY_TIMES = 3    // 同步失败,重试次数,
	MAX_BATCH_WRITE_POINTS  = 1024 // 批量写influxdb的数量
	RP_1S                   = "rp_1s"
	RP_1M                   = "rp_1m"
	DURATION_1S             = "1d" // 自动同步创建的db的RP默认值
	SHARD_DURATION_1S       = "2h"
	DURATION_1M             = "10d" // 自动同步创建的db的RP默认值
	SHARD_DURATION_1M       = "1d"
	DURATION_10M            = "100d"
	SHARD_DURATION_10M      = "10d"
)
View Source
const (
	QUEUE_FETCH_MAX_SIZE   = 1024
	DEFAULT_BATCH_SIZE     = 512 * 1024
	DEFAULT_FLUSH_TIMEOUT  = 5 // 单位 秒
	DEFAULT_QUEUE_SIZE     = 256 * 1024
	INFLUXDB_PRECISION_S   = "s"
	UNIX_TIMESTAMP_TO_TIME = (1969*365 + 1969/4 - 1969/100 + 1969/400) * 24 * 60 * 60
	TIME_BINARY_LEN        = 15
	MAX_ERR_MSG_LEN        = 512
)

Variables

This section is empty.

Functions

func GetRetentionPolicies

func GetRetentionPolicies(httpClient client.Client, db string) ([]string, error)

func MarshalTimestampTo

func MarshalTimestampTo(ts uint32, buffer []byte) int

func UpdateCQs

func UpdateCQs(httpAddr, user, password string) error

提供给droplet-ctl调用, 实现更新CQ:

1, 查询vtap_flow, vtap_packet, vtap_wan数据库的RP和CQ信息,提取CQ的参数
  - 包括原RP,目的RP,聚合时长,可累加聚合信息(以packet_rx为准),不可累加聚合(以rtt的处理为准)
2,对所有的数据库根据获取的CQ参数和当前的field信息再次下发CQ,
  - 若有新增field,则会对新增的field增加CQ命令
  - 若无新增field,则对已有的CQ重复下发,若和原来的CQ参数不一致,则报错。

Types

type ActionEnum

type ActionEnum uint8
const (
	ADD ActionEnum = iota
	DEL
	MOD
	SHOW
	UPDATE // 若版本升级,新增field,支持更新CQ
)

type AggrEnum

type AggrEnum uint8
const (
	SUM AggrEnum = iota
	MAX
	MIN
	AVG
)

func (AggrEnum) String

func (a AggrEnum) String() string

type CLIHandler

type CLIHandler struct {
	// contains filtered or unexported fields
}

func NewCLIHandler

func NewCLIHandler(httpAddr, user, password, dbGroup, action, baseRP, newRP, aggrSummable, aggrUnsummable, RPPrefix string, CQInterval, RPDuration int) (*CLIHandler, error)

func (*CLIHandler) Run

func (c *CLIHandler) Run() (string, error)

type CQHandler

type CQHandler struct {
	// contains filtered or unexported fields
}

func GetCQParams

func GetCQParams(cqInfos []CQInfo, db, rp string) *CQHandler

func (*CQHandler) Add

func (c *CQHandler) Add() error

func (*CQHandler) CQName

func (c *CQHandler) CQName(field string) string

func (*CQHandler) Del

func (c *CQHandler) Del() error

func (*CQHandler) Show

func (c *CQHandler) Show() string

func (*CQHandler) Update

func (c *CQHandler) Update() error

type CQInfo

type CQInfo struct {
	// contains filtered or unexported fields
}

func GetCQInfos

func GetCQInfos(httpClient client.Client) ([]CQInfo, error)

type Confidence

type Confidence struct {
	// contains filtered or unexported fields
}

type Counter

type Counter struct {
	WriteSuccessCount int64 `statsd:"write-success-count"`
	WriteFailedCount  int64 `statsd:"write-failed-count"`
}

type CounterRepair

type CounterRepair struct {
	SyncCount int64 `statsd:"sync-count"`
}

type DBCreateCtl

type DBCreateCtl struct {
	HttpClient client.Client
	ExistDBs   map[string]bool
	sync.RWMutex
}

type InfluxdbItem

type InfluxdbItem interface {
	MarshalToBytes([]byte) int
	GetDBName() string
	GetMeasurement() string
	GetTimestamp() uint32
	Release()
}

type InfluxdbPoint

type InfluxdbPoint struct {
	// contains filtered or unexported fields
}

func (*InfluxdbPoint) GetDBName

func (p *InfluxdbPoint) GetDBName() string

func (*InfluxdbPoint) GetMeasurement

func (p *InfluxdbPoint) GetMeasurement() string

func (*InfluxdbPoint) GetTimestamp

func (p *InfluxdbPoint) GetTimestamp() uint32

func (*InfluxdbPoint) MarshalToBytes

func (p *InfluxdbPoint) MarshalToBytes(buffer []byte) int

func (*InfluxdbPoint) Release

func (p *InfluxdbPoint) Release()

type InfluxdbWriter

type InfluxdbWriter struct {
	ReplicaEnabled bool
	DataQueues     queue.FixedMultiQueue
	ReplicaQueues  queue.FixedMultiQueue
	WriteDirect    bool

	Name                    string
	ShardID                 string
	QueueCount              int
	QueueWriterInfosPrimary []*WriterInfo
	QueueWriterInfosReplica []*WriterInfo

	DBCreateCtlPrimary DBCreateCtl
	DBCreateCtlReplica DBCreateCtl

	PrimaryClient client.Client
	BatchSize     int
	FlushTimeout  int64
	RP            RetentionPolicy
	// contains filtered or unexported fields
}

func NewInfluxdbWriter

func NewInfluxdbWriter(addrPrimary, addrReplica, httpUsername, httpPassword, name, shardID string, queueCount, queueSize int) (*InfluxdbWriter, error)

func (*InfluxdbWriter) Close

func (w *InfluxdbWriter) Close()

func (*InfluxdbWriter) Put

func (w *InfluxdbWriter) Put(queueID int, item ...interface{})

高性能接口,需要用户实现InfluxdbItem接口

func (*InfluxdbWriter) PutPoint

func (w *InfluxdbWriter) PutPoint(queueID int, db, measurement string, tag map[string]string, field map[string]int64, timestamp uint32)

普通接口,性能较差,易用

func (*InfluxdbWriter) Run

func (w *InfluxdbWriter) Run()

func (*InfluxdbWriter) SetBatchSize

func (w *InfluxdbWriter) SetBatchSize(size int)

func (*InfluxdbWriter) SetBatchTimeout

func (w *InfluxdbWriter) SetBatchTimeout(timeout int64)

func (*InfluxdbWriter) SetRetentionPolicy

func (w *InfluxdbWriter) SetRetentionPolicy(rp, duration, shardDuration string, defaultFlag bool)

func (*InfluxdbWriter) SetWriteDirect

func (w *InfluxdbWriter) SetWriteDirect(enabled bool)

type PointCache

type PointCache struct {
	// contains filtered or unexported fields
}

type RPHandler

type RPHandler struct {
	// contains filtered or unexported fields
}

func (*RPHandler) Add

func (r *RPHandler) Add() error

func (*RPHandler) Del

func (r *RPHandler) Del() error

func (*RPHandler) Mod

func (r *RPHandler) Mod() error

func (*RPHandler) Show

func (r *RPHandler) Show() string

type Repair

type Repair struct {
	utils.Closable
	// contains filtered or unexported fields
}

func NewRepair

func NewRepair(addrPrimary, addrReplica, httpUsername, httpPassword, rp, shardID, dbRegex string, start bool, syncStartDelay, syncInterval, syncCountOnce int) (*Repair, error)

func (*Repair) Close

func (r *Repair) Close()

func (*Repair) GetCounter

func (r *Repair) GetCounter() interface{}

func (*Repair) Run

func (r *Repair) Run()

type RepairStatus

type RepairStatus uint8
const (
	PRIMARY_FAILED            RepairStatus = iota // 主写失败,不同步
	SYNC_SUCCESS                                  // 同步成功
	REPLICA_DISCONNECT                            // 备influxdb无法连接,等待连接成功后, 再尝试同步
	SYNC_FAILED_1                                 // 备同步失败1次
	SYNC_FAILED_2                                 // 备同步失败2次
	SYNC_FAILED_3                                 // 备同步3次失败,不再同步
	SYNC_FAILED_SERIES_EXCEED                     // 备series数量超过限制, 同步失败, 不再同步

	STATUS_INVALID
)

type RetentionPolicy

type RetentionPolicy struct {
	// contains filtered or unexported fields
}

type WriterInfo

type WriterInfo struct {
	utils.Closable
	// contains filtered or unexported fields
}

func (*WriterInfo) Close

func (i *WriterInfo) Close()

func (*WriterInfo) GetCounter

func (i *WriterInfo) GetCounter() interface{}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL