define

package
v0.0.0-...-0fdf8e3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 21, 2022 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

View Source
// Cycle execution modes for a metric task.
//
// Every constant is explicitly typed as CycleModeType. In a const block an
// explicit "= value" does not inherit the type of the previous line, so the
// type has to be repeated on each declaration; otherwise the constant is an
// untyped integer.
const (
	// CycleModeTypeEnd (1): run after the cycle has ended.
	CycleModeTypeEnd CycleModeType = 1
	// CycleModeTypeInnerDay (2): compute once per day while the cycle is in progress.
	CycleModeTypeInnerDay CycleModeType = 2
	// CycleModeTypeAlways (10).
	// NOTE(review): the source does not document this mode; presumably "always run" — confirm.
	CycleModeTypeAlways CycleModeType = 10
)
View Source
// Task cycle granularities, numbered 1 through 6.
// These are untyped integer constants.
const (
	// TaskCycleTypeYear is the yearly cycle.
	TaskCycleTypeYear = 1
	// TaskCycleTypeQuarter is the quarterly cycle.
	TaskCycleTypeQuarter = 2
	// TaskCycleTypeMonthly is the monthly cycle.
	TaskCycleTypeMonthly = 3
	// TaskCycleTypeWeekly is the weekly cycle.
	TaskCycleTypeWeekly = 4
	// TaskCycleTypeDay is the daily cycle.
	TaskCycleTypeDay = 5
	// TaskCycleTypeHour is the hourly cycle.
	TaskCycleTypeHour = 6
)
View Source
// LockKeyPrefix is the string prefix "metric:task:lock:".
// NOTE(review): presumably prepended to task names to build lock keys — confirm against callers.
const LockKeyPrefix = "metric:task:lock:"

Variables

This section is empty.

Functions

This section is empty.

Types

type Aggregator

// Aggregator is the plugin interface that calculates a metric from records.
type Aggregator interface {
	// Name returns the plugin name; it must be globally unique.
	Name() string
	// Run executes the plugin for the given key and record.
	Run(ctx context.Context, key string, data Record) error
	// SetConfig is called during initialization to pass in the configuration.
	SetConfig(ctx context.Context, config []byte) error
	// Description describes the configuration.
	Description() string
	// Metric returns the calculated value; it is read when the result is stored.
	Metric(ctx context.Context) (string, float64)
	// MetricExtra is undocumented in the source.
	// NOTE(review): presumably returns an extra key/value pair and whether it is set — confirm.
	MetricExtra(ctx context.Context) (string, interface{}, bool)
	// SetMetricMetadata passes the metric metadata to the plugin.
	SetMetricMetadata(ctx context.Context, data MetricMetadata) error
}

Aggregator calculates a metric.

type AggregatorInput

// AggregatorInput bundles a key, the record input channel, the metric output
// channel, the aggregator plugins and a cancel function for the key's worker.
//
// Plan (from the original note): work out how to support multiple levels by
// carrying the previous Aggregator into the next aggregator.
type AggregatorInput struct {
	Key string

	Input             chan Record
	Output            chan MetricData
	Plugins           []Aggregator
	CancelKeyWorkerFn context.CancelFunc
}

AggregatorInput Plan: 如何支持多级, 上一级Aggregator带入到next aggregator 中

type AggregatorPlugins

// AggregatorPlugins holds a record input channel, an OutputData value and a
// raw configuration blob (JSON key "config").
type AggregatorPlugins struct {
	Input  chan Record
	Output OutputData
	Config []byte `json:"config"`
}

type Collect

// Collect is the plugin interface for collecting data.
type Collect interface {
	// Name returns the plugin name; it must be globally unique.
	Name() string
	// Keys returns the dimensions used for the calculation.
	Keys(ctx context.Context) ([]string, error)
	// Run executes the plugin for key over [start, end] and sends records to input.
	// NOTE(review): the units and inclusivity of start/end are not shown here — confirm.
	Run(ctx context.Context, key string, start, end uint64, input chan Record) error

	// SetConfig modifies the configuration.
	SetConfig(ctx context.Context, config []byte) error
	// Description describes the configuration.
	Description() string
}

Collect 收集数据插件

type CollectInput

// CollectInput bundles a Collect plugin with the field and label names, the
// channel of MetricData for the output plugin, and the metric metadata.
type CollectInput struct {
	Plugin          Collect
	Fields          []string
	Labels          []string
	OutputPluginChn chan MetricData
	MetricMetadata  MetricMetadata
}

type CycleModeType

// CycleModeType is the cycle execution mode: 1 runs after the cycle has
// ended, 2 computes once per day during the cycle.
type CycleModeType int8

CycleModeType 周期执行方式,1 周期结束后执行,2周期中每天计算一次

type Filter

// Filter is the plugin interface for transforming data.
type Filter interface {
	// Name returns the plugin name; it must be globally unique.
	Name() string
	// Run executes the plugin for the given key and record.
	Run(ctx context.Context, key string, data Record) error
	// SetConfig is called during initialization to pass in the configuration.
	SetConfig(ctx context.Context, config []byte) error
	// Description describes the configuration.
	Description() string
}

Filter transforms data.

type FilterInput

// FilterInput bundles the record input and output channels, the key, the
// filter plugins and a cancel function for the key's worker.
type FilterInput struct {
	Input             chan Record
	Output            chan Record
	Key               string
	Plugins           []Filter
	CancelKeyWorkerFn context.CancelFunc
}

type FilterPlugins

// FilterPlugins is a filter plugin reference: the plugin name (JSON "name")
// and its raw configuration bytes (JSON "config").
type FilterPlugins struct {
	Name   string `json:"name"`
	Config []byte `json:"config"`
}

type InputParams

// InputParams carries a method name, a list of field names and a start/end
// pair, serialized as JSON "method", "fields", "start" and "end".
// NOTE(review): the meaning and units of Start/End are not shown here — confirm.
type InputParams struct {
	Method string   `json:"method"`
	Fields []string `json:"fields"`
	Start  int32    `json:"start"`
	End    int32    `json:"end"`
}

type Lock

// Lock is a keyed lock abstraction.
type Lock interface {
	// Lock locks key. lockedExpireMinute is a time.Duration despite its name.
	// NOTE(review): presumably the bool reports whether the lock was acquired
	// and the duration is the lock's expiry — confirm against implementations.
	Lock(ctx context.Context, key string, lockedExpireMinute time.Duration) (bool, error)
	// Unlock unlocks key.
	Unlock(ctx context.Context, key string) error
}

type MetricData

// MetricData is the data produced by a calculation: a metric key, a map of
// named float64 values (JSON "value") and a map of extra values (JSON "extra").
type MetricData struct {
	MetricKey string
	Value     map[string]float64     `json:"value"`
	Extra     map[string]interface{} `json:"extra"`
}

MetricData 计算产生的数据

type MetricMetadata

// MetricMetadata describes a metric: its name, start/end, cycle, cycle mode
// and last finish time.
// NOTE(review): the units of Start, End and LastFinishTime are not shown here — confirm.
type MetricMetadata struct {
	// Metric name.
	MetricName string        `json:"name"  `
	Start      uint64        `json:"start"`
	End        uint64        `json:"end"`
	Cycle      TaskCycleType `json:"cycle"`
	CycleMode  CycleModeType `json:"interval_mode"`
	// Output plugins use this value to decide whether the data needs to be updated.
	LastFinishTime uint64 `json:"last_finish_time"`
}

type MetricPower

// MetricPower maps string keys to int values. Scan and Value methods are
// declared on it (sql.Scanner / driver.Valuer).
type MetricPower map[string]int

func (*MetricPower) Scan

func (m *MetricPower) Scan(value interface{}) error

Scan scans value into the MetricPower and implements the sql.Scanner interface.

func (MetricPower) Value

func (m MetricPower) Value() (driver.Value, error)

Value returns the JSON value and implements the driver.Valuer interface.

type MetricTask

// MetricTask is the definition of a metric task: its name, cycle settings,
// status, start time, plugin configuration (collect, filters, aggregators,
// output), last finish time and output index name. Fields carry both JSON
// and gorm column tags.
type MetricTask struct {
	// Task name, which is also the metric name.
	TaskName string `json:"task_name"  gorm:"column:task_name"`
	// Metric cycle (1 year, 2 quarter, 3 month, 4 week, 5 day, 6 hour) (required by the API).
	TaskCycle TaskCycleType `json:"task_cycle" gorm:"column:task_cycle"`
	// Cycle execution mode: 1 run after the cycle has ended, 2 compute once per day during the cycle.
	CycleMode CycleModeType `json:"cycle_mode" gorm:"column:cycle_mode"`
	// Number of cycles to look back and calculate.
	CalculateCycle uint8 `json:"calculate_cycle" gorm:"column:calculate_cycle"`
	// Task status: 1 normal (may run), 2 paused (not executed), 3 pending deletion.
	TaskStatus StatusEnumType `json:"task_status" gorm:"column:task_status"`

	// Time at which processing of the task starts; the end time can be derived from start + cycle.
	TaskStart   uint64                              `json:"task_start" gorm:"column:task_start"`
	Collect     MetricTaskPluginCollectConfig       `json:"collect" gorm:"column:collect"`
	Filters     MetricTaskPluginConfigArr           `json:"filters" gorm:"column:filters"`
	Aggregators MetricTaskPluginAggregatorConfigArr `json:"aggregators" gorm:"column:aggregators"`
	Output      MetricTaskPluginOutputConfig        `json:"output" gorm:"column:output"`
	//Power          MetricPower                         `json:"power" gorm:"column:power"`
	LastFinishTime uint64 `json:"last_finish_time" gorm:"column:last_finish_time"`

	OutputIndexName string `json:"output_index_name" gorm:"column:output_index_name"`
}

func (MetricTask) Map

func (m MetricTask) Map() map[string]interface{}

type MetricTaskImpl

// MetricTaskImpl is the storage interface for metric tasks.
type MetricTaskImpl interface {
	// Get returns metric tasks.
	// NOTE(review): whether this returns all tasks or a filtered subset is not shown here — confirm.
	Get(ctx context.Context) ([]MetricTask, error)
	// TaskDone records completion of the task called name with the given
	// next cycle time and last finish time.
	TaskDone(ctx context.Context, name string, nextCycleTime, lastFinishTime uint64) error
	// Add adds a task together with extra key/value data.
	Add(ctx context.Context, info MetricTask, extra map[string]interface{}) error
	// ModifyOutputIndexName changes the output index name of the task called taskName.
	ModifyOutputIndexName(ctx context.Context, taskName, indexName string) error
}

type MetricTaskPluginAggregatorConfig

// MetricTaskPluginAggregatorConfig is the configuration of one aggregator
// plugin: its name, its raw config and an optional next plugin config.
type MetricTaskPluginAggregatorConfig struct {
	Name   string    `json:"name" gorm:"column:name"`
	Config RAWConfig `json:"config" gorm:"column:config"`
	// Not enabled yet.
	Next *MetricTaskPluginConfig `json:"next" gorm:"column:next"`
}

type MetricTaskPluginAggregatorConfigArr

// MetricTaskPluginAggregatorConfigArr is a list of aggregator plugin configs.
// gorm has trouble deserializing JSON types, so Scan and Value need to be
// implemented at the field level.
type MetricTaskPluginAggregatorConfigArr []MetricTaskPluginAggregatorConfig

MetricTaskPluginAggregatorConfigArr gorm 对json类型反序列有问题, 需要在字段的维度实现Scan 和 Value

func (*MetricTaskPluginAggregatorConfigArr) Scan

func (m *MetricTaskPluginAggregatorConfigArr) Scan(value interface{}) error

Scan scans value into the MetricTaskPluginAggregatorConfigArr and implements the sql.Scanner interface.

func (MetricTaskPluginAggregatorConfigArr) Value

Value returns the JSON value and implements the driver.Valuer interface.

type MetricTaskPluginCollectConfig

// MetricTaskPluginCollectConfig is the configuration of the collect plugin:
// the plugin name and its raw config.
type MetricTaskPluginCollectConfig struct {
	Name   string    `json:"name" gorm:"column:name"`
	Config RAWConfig `json:"config" gorm:"column:config"`
}

func (*MetricTaskPluginCollectConfig) Scan

func (c *MetricTaskPluginCollectConfig) Scan(value interface{}) error

Scan scans value into the MetricTaskPluginCollectConfig and implements the sql.Scanner interface.

func (MetricTaskPluginCollectConfig) Value

Value returns the JSON value and implements the driver.Valuer interface.

type MetricTaskPluginConfig

// MetricTaskPluginConfig is a generic plugin configuration: the plugin name
// and its raw config.
type MetricTaskPluginConfig struct {
	Name   string    `json:"name" gorm:"column:name"`
	Config RAWConfig `json:"config" gorm:"column:config"`
}

type MetricTaskPluginConfigArr

// MetricTaskPluginConfigArr is a list of plugin configs.
// gorm has trouble deserializing JSON types, so Scan and Value need to be
// implemented at the field level.
type MetricTaskPluginConfigArr []MetricTaskPluginConfig

MetricTaskPluginConfigArr gorm 对json类型反序列有问题, 需要在字段的维度实现Scan 和 Value

func (*MetricTaskPluginConfigArr) Scan

func (m *MetricTaskPluginConfigArr) Scan(value interface{}) error

Scan scans value into the MetricTaskPluginConfigArr and implements the sql.Scanner interface.

func (MetricTaskPluginConfigArr) Value

Value returns the JSON value and implements the driver.Valuer interface.

type MetricTaskPluginOutputConfig

// MetricTaskPluginOutputConfig is the configuration of the output plugin; it
// holds only the plugin name.
type MetricTaskPluginOutputConfig struct {
	Name string `json:"name" gorm:"column:name"`
}

func (*MetricTaskPluginOutputConfig) Scan

func (m *MetricTaskPluginOutputConfig) Scan(value interface{}) error

Scan scans value into the MetricTaskPluginOutputConfig and implements the sql.Scanner interface.

func (MetricTaskPluginOutputConfig) Value

Value returns the JSON value and implements the driver.Valuer interface.

type Output

// Output is the plugin interface that writes records to storage.
type Output interface {
	// Name returns the plugin name; it must be globally unique.
	Name() string
	// Write writes the data to the target.
	Write(ctx context.Context, data OutputData) error
	// Exists reports whether the plugin already holds an entry for metricKey.
	// It is called before the data has been calculated.
	// NOTE(review): the original comment is garbled ("相同的问题件"); the
	// "same entry" reading is an interpretation — confirm.
	Exists(ctx context.Context, metricKey string) (bool, error)
	// Description describes the configuration.
	Description() string
	// IndexName returns the index name as a string.
	IndexName(ctx context.Context) (string, error)
	// SetMetricMetadata passes the metric metadata to the plugin.
	SetMetricMetadata(ctx context.Context, data MetricMetadata) error
}

Output writes records to storage.

type OutputData

// OutputData is the value handed to Output.Write; it embeds MetricData.
type OutputData struct {
	MetricData
}

type OutputInput

// OutputInput bundles an Output plugin with its MetricData input channel and
// the metric metadata.
type OutputInput struct {
	Plugin         Output
	Input          chan MetricData
	MetricDataDesc MetricMetadata
}

type RAWConfig

// RAWConfig is a raw plugin configuration held as bytes. MarshalJSON and
// UnmarshalJSON methods are declared on it.
type RAWConfig []byte

func (RAWConfig) MarshalJSON

func (d RAWConfig) MarshalJSON() ([]byte, error)

func (*RAWConfig) UnmarshalJSON

func (d *RAWConfig) UnmarshalJSON(data []byte) error

type Record

// Record is a single data record passed between plugins.
type Record interface {
	// Data returns the record's string key/value data.
	Data() map[string]string
	// Update sets key to value.
	// NOTE(review): presumably updates the map returned by Data — confirm.
	Update(key, value string) error
	// Field returns the record's numeric fields.
	Field() map[string]float64
	// UUID returns the record's identifier. NewRecord's documentation says the
	// uuid is used for de-duplication.
	UUID() string
}

func NewRecord

func NewRecord(uuid string, data map[string]string, field map[string]float64) Record

NewRecord

 Params:
	uuid: 数据去重使用, 可能是多个字段聚合。重复的数据会被过滤

type StatusEnumType

// StatusEnumType is the status of a task, stored as an int8.
type StatusEnumType int8

// Task statuses, numbered consecutively from 1.
const (
	// StatusEnumTypeNormal (1) marks a task that is being executed.
	StatusEnumTypeNormal StatusEnumType = iota + 1
	// StatusEnumTypePaused (2) marks a task that has been paused.
	StatusEnumTypePaused
	// StatusEnumTypeDelete (3) marks a task that needs to be cleaned up.
	StatusEnumTypeDelete
)

type TaskCycleType

// TaskCycleType is the cycle granularity of a task, stored as a uint8.
type TaskCycleType uint8

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL