Documentation ¶
Index ¶
- Constants
- func GetRetentionPolicies(httpClient client.Client, db string) ([]string, error)
- func MarshalTimestampTo(ts uint32, buffer []byte) int
- func UpdateCQs(httpAddr, user, password string) error
- type ActionEnum
- type AggrEnum
- type CLIHandler
- type CQHandler
- type CQInfo
- type Confidence
- type Counter
- type CounterRepair
- type DBCreateCtl
- type InfluxdbItem
- type InfluxdbPoint
- type InfluxdbWriter
- func (w *InfluxdbWriter) Close()
- func (w *InfluxdbWriter) Put(queueID int, item ...interface{})
- func (w *InfluxdbWriter) PutPoint(queueID int, db, measurement string, tag map[string]string, ...)
- func (w *InfluxdbWriter) Run()
- func (w *InfluxdbWriter) SetBatchSize(size int)
- func (w *InfluxdbWriter) SetBatchTimeout(timeout int64)
- func (w *InfluxdbWriter) SetRetentionPolicy(rp, duration, shardDuration string, defaultFlag bool)
- func (w *InfluxdbWriter) SetWriteDirect(enabled bool)
- type PointCache
- type RPHandler
- type Repair
- type RepairStatus
- type RetentionPolicy
- type WriterInfo
Constants ¶
View Source
const (
	SUCCESS = "success"
	FAILED  = "failed"
)
View Source
const (
	CONFIDENCE_DB                 = "_tsdb_meta"
	CONFIDENCE_MEASUREMENT        = "confidence"
	CONFIDENCE_MEASUREMENT_SYNCED = "confidence_synced"
	TAG_DB                        = "db"
	TAG_ID                        = "_id"
	TAG_MEASUREMENT               = "table"
	FIELD_STATUS                  = "status"
	SYNC_COUNT_ONCE               = 200  // number of records fetched per sync pass
	SYNC_INTERVAL                 = 60   // interval between two syncs, in seconds
	SYNC_START_DELAY              = 300  // delay of the HB sync start time relative to the current time
	SYNC_FAILED_RETRY_TIMES       = 3    // number of retries after a sync failure
	MAX_BATCH_WRITE_POINTS        = 1024 // number of points per batch write to influxdb
	RP_1S                         = "rp_1s"
	RP_1M                         = "rp_1m"
	DURATION_1S                   = "1d" // default RP value for DBs created by automatic sync
	SHARD_DURATION_1S             = "2h"
	DURATION_1M                   = "10d" // default RP value for DBs created by automatic sync
	SHARD_DURATION_1M             = "1d"
	DURATION_10M                  = "100d"
	SHARD_DURATION_10M            = "10d"
)
View Source
const (
	QUEUE_FETCH_MAX_SIZE   = 1024
	DEFAULT_BATCH_SIZE     = 512 * 1024
	DEFAULT_FLUSH_TIMEOUT  = 5 // unit: seconds
	DEFAULT_QUEUE_SIZE     = 256 * 1024
	INFLUXDB_PRECISION_S   = "s"
	UNIX_TIMESTAMP_TO_TIME = (1969*365 + 1969/4 - 1969/100 + 1969/400) * 24 * 60 * 60
	TIME_BINARY_LEN        = 15
	MAX_ERR_MSG_LEN        = 512
)
Variables ¶
This section is empty.
Functions ¶
func GetRetentionPolicies ¶
func MarshalTimestampTo ¶
Types ¶
type ActionEnum ¶
type ActionEnum uint8
const (
	ADD ActionEnum = iota
	DEL
	MOD
	SHOW
	UPDATE // when a version upgrade adds new fields, supports updating the CQ
)
type CLIHandler ¶
type CLIHandler struct {
// contains filtered or unexported fields
}
func NewCLIHandler ¶
func NewCLIHandler(httpAddr, user, password, dbGroup, action, baseRP, newRP, aggrSummable, aggrUnsummable, RPPrefix string, CQInterval, RPDuration int) (*CLIHandler, error)
func (*CLIHandler) Run ¶
func (c *CLIHandler) Run() (string, error)
type CQHandler ¶
type CQHandler struct {
// contains filtered or unexported fields
}
func GetCQParams ¶
type Confidence ¶
type Confidence struct {
// contains filtered or unexported fields
}
type CounterRepair ¶
type CounterRepair struct {
SyncCount int64 `statsd:"sync-count"`
}
type DBCreateCtl ¶
type InfluxdbItem ¶
type InfluxdbPoint ¶
type InfluxdbPoint struct {
// contains filtered or unexported fields
}
func (*InfluxdbPoint) GetDBName ¶
func (p *InfluxdbPoint) GetDBName() string
func (*InfluxdbPoint) GetMeasurement ¶
func (p *InfluxdbPoint) GetMeasurement() string
func (*InfluxdbPoint) GetTimestamp ¶
func (p *InfluxdbPoint) GetTimestamp() uint32
func (*InfluxdbPoint) MarshalToBytes ¶
func (p *InfluxdbPoint) MarshalToBytes(buffer []byte) int
func (*InfluxdbPoint) Release ¶
func (p *InfluxdbPoint) Release()
type InfluxdbWriter ¶
type InfluxdbWriter struct {
	ReplicaEnabled bool
	DataQueues     queue.FixedMultiQueue
	ReplicaQueues  queue.FixedMultiQueue
	WriteDirect    bool

	Name                    string
	ShardID                 string
	QueueCount              int
	QueueWriterInfosPrimary []*WriterInfo
	QueueWriterInfosReplica []*WriterInfo

	DBCreateCtlPrimary DBCreateCtl
	DBCreateCtlReplica DBCreateCtl

	PrimaryClient client.Client
	BatchSize     int
	FlushTimeout  int64
	RP            RetentionPolicy
	// contains filtered or unexported fields
}
func NewInfluxdbWriter ¶
func NewInfluxdbWriter(addrPrimary, addrReplica, httpUsername, httpPassword, name, shardID string, queueCount, queueSize int) (*InfluxdbWriter, error)
func (*InfluxdbWriter) Close ¶
func (w *InfluxdbWriter) Close()
func (*InfluxdbWriter) Put ¶
func (w *InfluxdbWriter) Put(queueID int, item ...interface{})
High-performance interface; the caller must implement the InfluxdbItem interface.
func (*InfluxdbWriter) PutPoint ¶
func (w *InfluxdbWriter) PutPoint(queueID int, db, measurement string, tag map[string]string, field map[string]int64, timestamp uint32)
General-purpose interface: easy to use, but with lower performance.
func (*InfluxdbWriter) Run ¶
func (w *InfluxdbWriter) Run()
func (*InfluxdbWriter) SetBatchSize ¶
func (w *InfluxdbWriter) SetBatchSize(size int)
func (*InfluxdbWriter) SetBatchTimeout ¶
func (w *InfluxdbWriter) SetBatchTimeout(timeout int64)
func (*InfluxdbWriter) SetRetentionPolicy ¶
func (w *InfluxdbWriter) SetRetentionPolicy(rp, duration, shardDuration string, defaultFlag bool)
func (*InfluxdbWriter) SetWriteDirect ¶
func (w *InfluxdbWriter) SetWriteDirect(enabled bool)
type PointCache ¶
type PointCache struct {
// contains filtered or unexported fields
}
type Repair ¶
func (*Repair) GetCounter ¶
func (r *Repair) GetCounter() interface{}
type RepairStatus ¶
type RepairStatus uint8
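const (
	PRIMARY_FAILED            RepairStatus = iota // primary write failed; not synced
	SYNC_SUCCESS                                  // sync succeeded
	REPLICA_DISCONNECT                            // replica influxdb unreachable; retry the sync once the connection succeeds
	SYNC_FAILED_1                                 // replica sync failed once
	SYNC_FAILED_2                                 // replica sync failed twice
	SYNC_FAILED_3                                 // replica sync failed 3 times; no further sync attempts
	SYNC_FAILED_SERIES_EXCEED                     // replica series count exceeded the limit; sync failed, no further sync attempts
	STATUS_INVALID
)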
type RetentionPolicy ¶
type RetentionPolicy struct {
// contains filtered or unexported fields
}
type WriterInfo ¶
func (*WriterInfo) Close ¶
func (i *WriterInfo) Close()
func (*WriterInfo) GetCounter ¶
func (i *WriterInfo) GetCounter() interface{}
Click to show internal directories.
Click to hide internal directories.