Documentation ¶
Index ¶
- Constants
- Variables
- func MapToRawQuery(scope, agg string, params map[string]string) (string, error)
- func NormalizeColumn(col, typ string) string
- func NormalizeID(fn string, p *Property) string
- func NormalizeKey(keys, typ string) string
- func NormalizeName(key string) string
- func NormalizeRequest(req *Request) error
- func RegisterQueryParser(name string, parser QLParser)
- func RegisterResponseFormater(name string, formater Formater)
- func Unmarshal(input, output interface{}) error
- type Aggregation
- type Column
- type Context
- type Flag
- type Flags
- type Formater
- type Function
- type Group
- type GroupFilter
- type Order
- type Property
- type QLParser
- type Queryer
- type Request
- type Response
- type TimeAlign
- type ValueRange
Constants ¶
View Source
const ( FlagColumnFunc = Flag(0) FlagOrderBy = Flag(1) FlagReduce = Flag(2) )
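Flag values are passed to Function.Aggregations through its variadic flags parameter; FlagColumnFunc, FlagOrderBy and FlagReduce are the defined values.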
Variables ¶
View Source
// Formats maps a name to a Formater. It starts out empty.
var Formats = make(map[string]Formater)
Formats maps a format name to its Formater. The map is empty at declaration.
View Source
var Functions = map[string]func(col *Column) Function{ "max": func(col *Column) Function { return &esValueFunction{ esFunction: esFunction{ Column: col, supportOrderBy: true, reduceSupport: true, AggGatter: func(c *Column, ctx *Context, flags ...Flag) (elastic.Aggregation, error) { if c.Property.IsScript() { return elastic.NewMaxAggregation().Script(elastic.NewScript(c.Property.Script)), nil } return elastic.NewMaxAggregation().Field(c.Property.Key), nil }, }, ValueGetter: func(ctx *Context, id string, aggs elastic.Aggregations) (*elastic.AggregationValueMetric, bool) { return aggs.Max(id) }, } }, "min": func(col *Column) Function { return &esValueFunction{ esFunction: esFunction{ Column: col, supportOrderBy: true, reduceSupport: true, AggGatter: func(c *Column, ctx *Context, flags ...Flag) (elastic.Aggregation, error) { if c.Property.IsScript() { return elastic.NewMinAggregation().Script(elastic.NewScript(c.Property.Script)), nil } return elastic.NewMinAggregation().Field(c.Property.Key), nil }, }, ValueGetter: func(ctx *Context, id string, aggs elastic.Aggregations) (*elastic.AggregationValueMetric, bool) { return aggs.Min(id) }, } }, "avg": func(col *Column) Function { return &esValueFunction{ esFunction: esFunction{ Column: col, supportOrderBy: true, reduceSupport: true, AggGatter: func(c *Column, ctx *Context, flags ...Flag) (elastic.Aggregation, error) { if c.Property.IsScript() { return elastic.NewAvgAggregation().Script(elastic.NewScript(c.Property.Script)), nil } return elastic.NewAvgAggregation().Field(c.Property.Key), nil }, }, ValueGetter: func(ctx *Context, id string, aggs elastic.Aggregations) (*elastic.AggregationValueMetric, bool) { return aggs.Avg(id) }, } }, "sum": func(col *Column) Function { return &esValueFunction{ esFunction: esFunction{ Column: col, supportOrderBy: true, reduceSupport: true, AggGatter: func(c *Column, ctx *Context, flags ...Flag) (elastic.Aggregation, error) { if c.Property.IsScript() { return 
elastic.NewSumAggregation().Script(elastic.NewScript(c.Property.Script)), nil } return elastic.NewSumAggregation().Field(c.Property.Key), nil }, }, ValueGetter: func(ctx *Context, id string, aggs elastic.Aggregations) (*elastic.AggregationValueMetric, bool) { return aggs.Sum(id) }, } }, "count": func(col *Column) Function { return &esValueFunction{ esFunction: esFunction{ Column: col, supportOrderBy: true, reduceSupport: true, AggGatter: func(c *Column, ctx *Context, flags ...Flag) (elastic.Aggregation, error) { if c.Property.IsScript() { return elastic.NewValueCountAggregation().Script(elastic.NewScript(c.Property.Script)), nil } return elastic.NewValueCountAggregation().Field(c.Property.Key), nil }, }, ValueGetter: func(ctx *Context, id string, aggs elastic.Aggregations) (*elastic.AggregationValueMetric, bool) { return aggs.ValueCount(id) }, } }, "cardinality": cardinalityFunction, "distinct": cardinalityFunction, "countPercent": func(col *Column) Function { return &esFunction{ Column: col, supportOrderBy: true, reduceSupport: false, AggGatter: func(c *Column, ctx *Context, flags ...Flag) (elastic.Aggregation, error) { if c.Property.IsScript() { return elastic.NewValueCountAggregation().Script(elastic.NewScript(c.Property.Script)), nil } return elastic.NewValueCountAggregation().Field(c.Property.Key), nil }, ValueGetter: func(ctx *Context, id string, aggs elastic.Aggregations) (interface{}, error) { agg, ok := aggs.ValueCount(id) if !ok { return nil, fmt.Errorf("invalid %s aggregation type", col.FuncName) } if agg.Value == nil { return float64(0), nil } var total int64 if ctx.Resp != nil && ctx.Resp.Hits != nil { total = ctx.Resp.Hits.TotalHits } if total == 0 { return float64(0), nil } return *agg.Value * float64(100) / float64(total), nil }, } }, "sumPercent": func(col *Column) Function { return &esFunction{ Column: col, supportOrderBy: true, reduceSupport: false, AggGatter: func(c *Column, ctx *Context, flags ...Flag) (elastic.Aggregation, error) { if 
!Flags(flags).IsOrderBy() && len(ctx.Req.GroupBy) > 0 { ctx.Source.Aggregation(col.ID, elastic.NewSumAggregation().Field(c.Property.Key)) } if c.Property.IsScript() { return elastic.NewSumAggregation().Script(elastic.NewScript(c.Property.Script)), nil } return elastic.NewSumAggregation().Field(c.Property.Key), nil }, ValueGetter: func(ctx *Context, id string, aggs elastic.Aggregations) (interface{}, error) { agg, ok := aggs.ValueCount(id) if !ok { return nil, fmt.Errorf("invalid %s aggregation type", col.FuncName) } if agg.Value == nil { return float64(0), nil } var total float64 if len(ctx.Req.GroupBy) > 0 { totalAgg, ok := ctx.Resp.Aggregations.Sum(id) if !ok { return nil, fmt.Errorf("fail to get global sum aggregation") } if totalAgg.Value != nil { total = *totalAgg.Value } } if total == 0 { return float64(0), nil } return *agg.Value * float64(100) / float64(total), nil }, } }, "range": func(col *Column) Function { return &esFunction{ Column: col, supportOrderBy: false, reduceSupport: false, AggGatter: func(c *Column, ctx *Context, flags ...Flag) (elastic.Aggregation, error) { agg := elastic.NewRangeAggregation() if c.Property.IsScript() { agg.Script(elastic.NewScript(c.Property.Script)) } else { agg.Field(c.Property.Key) } for _, item := range c.Params { value, ok := item.(*ValueRange) if !ok { return nil, fmt.Errorf("invalid range params type") } agg.AddRange(value.From, value.To) } return agg, nil }, ValueGetter: func(ctx *Context, id string, aggs elastic.Aggregations) (interface{}, error) { rang, ok := aggs.Range(id) if !ok || rang == nil { return nil, fmt.Errorf("invalid %s aggregation type", col.FuncName) } var list []map[string]interface{} for _, bucket := range rang.Buckets { var percent float64 if ctx.Resp != nil && ctx.Resp.Hits != nil && ctx.Resp.Hits.TotalHits != 0 { percent = float64(bucket.DocCount) * 100 / float64(ctx.Resp.Hits.TotalHits) } list = append(list, map[string]interface{}{ "min": bucket.From, "max": bucket.To, "percent": percent, 
"count": bucket.DocCount, }) } return list, nil }, } }, "last": lastFunction, "value": lastFunction, "values": valuesFunction, "cols": func(col *Column) Function { return &esFunction{ Column: col, supportOrderBy: false, reduceSupport: false, AggGatter: func(c *Column, ctx *Context, flags ...Flag) (elastic.Aggregation, error) { if c.Property.IsScript() { return nil, fmt.Errorf("function 'cols' not support script") } return elastic.NewTopHitsAggregation().Size(1).Sort(ctx.Req.TimeKey, false). FetchSourceContext(elastic.NewFetchSourceContext(true).Include(strings.Split(c.Property.Key, ",")...)), nil }, ValueGetter: func(ctx *Context, id string, aggs elastic.Aggregations) (interface{}, error) { hits, ok := aggs.TopHits(id) if !ok { return nil, fmt.Errorf("invalid %s aggregation type", col.FuncName) } if hits == nil || hits.Hits == nil || len(hits.Hits.Hits) <= 0 || hits.Hits.Hits[0].Source == nil { return nil, nil } var out map[string]interface{} err := json.Unmarshal([]byte(*hits.Hits.Hits[0].Source), &out) if err != nil { return nil, fmt.Errorf("tail to Unmarshal TopHits source: %s", err) } var cols []interface{} for _, key := range strings.Split(col.Property.Key, ",") { cols = append(cols, getMapValue(key, out)) } return cols, nil }, } }, "cpm": func(col *Column) Function { return &esFunction{ Column: col, supportOrderBy: true, reduceSupport: false, AggGatter: func(c *Column, ctx *Context, flags ...Flag) (elastic.Aggregation, error) { if c.Property.IsScript() { return elastic.NewValueCountAggregation().Script(elastic.NewScript(c.Property.Script)), nil } return elastic.NewValueCountAggregation().Field(c.Property.Key), nil }, ValueGetter: func(ctx *Context, id string, aggs elastic.Aggregations) (interface{}, error) { count, _ := aggs.ValueCount(id) return countPerInterval(col, ctx, count, 60000000000) }, } }, "sumCpm": func(col *Column) Function { return &esFunction{ Column: col, supportOrderBy: true, reduceSupport: false, AggGatter: func(c *Column, ctx *Context, 
flags ...Flag) (elastic.Aggregation, error) { if c.Property.IsScript() { return elastic.NewSumAggregation().Script(elastic.NewScript(c.Property.Script)), nil } return elastic.NewSumAggregation().Field(c.Property.Key), nil }, ValueGetter: func(ctx *Context, id string, aggs elastic.Aggregations) (interface{}, error) { sum, _ := aggs.Sum(id) return countPerInterval(col, ctx, sum, 60000000000) }, } }, "cps": func(col *Column) Function { return &esFunction{ Column: col, supportOrderBy: true, reduceSupport: false, AggGatter: func(c *Column, ctx *Context, flags ...Flag) (elastic.Aggregation, error) { if c.Property.IsScript() { return elastic.NewValueCountAggregation().Script(elastic.NewScript(c.Property.Script)), nil } return elastic.NewValueCountAggregation().Field(c.Property.Key), nil }, ValueGetter: func(ctx *Context, id string, aggs elastic.Aggregations) (interface{}, error) { count, _ := aggs.ValueCount(id) return countPerInterval(col, ctx, count, 1000000000) }, } }, "sumCps": func(col *Column) Function { return &esFunction{ Column: col, supportOrderBy: true, reduceSupport: false, AggGatter: func(c *Column, ctx *Context, flags ...Flag) (elastic.Aggregation, error) { if c.Property.IsScript() { return elastic.NewSumAggregation().Script(elastic.NewScript(c.Property.Script)), nil } return elastic.NewSumAggregation().Field(c.Property.Key), nil }, ValueGetter: func(ctx *Context, id string, aggs elastic.Aggregations) (interface{}, error) { sum, _ := aggs.Sum(id) return countPerInterval(col, ctx, sum, 1000000000) }, } }, "diff": func(col *Column) Function { return &esFunction{ Column: col, supportOrderBy: false, reduceSupport: false, AggGatter: func(c *Column, ctx *Context, flags ...Flag) (elastic.Aggregation, error) { if c.Property.IsScript() { return elastic.NewMinAggregation().Script(elastic.NewScript(c.Property.Script)), nil } return elastic.NewMinAggregation().Field(c.Property.Key), nil }, ValueGetter: func(ctx *Context, id string, aggs elastic.Aggregations) 
(interface{}, error) { if next, ok := ctx.Attributes["next"]; ok { min, _ := aggs.Min(id) if min == nil { return nil, fmt.Errorf("invalid %s aggregation type", col.FuncName) } if min.Value == nil { return 0, nil } if next, ok := next.(elastic.Aggregations); ok { if next, ok := next.Min(id); ok && next != nil && next.Value != nil { return *next.Value - *min.Value, nil } } } return 0, nil }, } }, "diffps": func(col *Column) Function { return &esFunction{ Column: col, supportOrderBy: false, reduceSupport: false, AggGatter: func(c *Column, ctx *Context, flags ...Flag) (elastic.Aggregation, error) { if c.Property.IsScript() { return elastic.NewMinAggregation().Script(elastic.NewScript(c.Property.Script)), nil } return elastic.NewMinAggregation().Field(c.Property.Key), nil }, ValueGetter: func(ctx *Context, id string, aggs elastic.Aggregations) (interface{}, error) { if next, ok := ctx.Attributes["next"]; ok { min, _ := aggs.Min(id) if min == nil { return nil, fmt.Errorf("invalid %s aggregation type", col.FuncName) } if min.Value == nil { return 0, nil } if next, ok := next.(elastic.Aggregations); ok { if next, ok := next.Min(id); ok && next != nil && next.Value != nil { return (*next.Value - *min.Value) / (ctx.Req.Interval / 1000000000), nil } } } return 0, nil }, } }, "uprate": func(col *Column) Function { return &esFunction{ Column: col, supportOrderBy: false, reduceSupport: false, AggGatter: func(c *Column, ctx *Context, flags ...Flag) (elastic.Aggregation, error) { if c.Property.IsScript() { return elastic.NewMinAggregation().Script(elastic.NewScript(c.Property.Script)), nil } return elastic.NewMinAggregation().Field(c.Property.Key), nil }, ValueGetter: func(ctx *Context, id string, aggs elastic.Aggregations) (interface{}, error) { if next, ok := ctx.Attributes["next"]; ok { min, _ := aggs.Min(id) if min == nil { return nil, fmt.Errorf("invalid %s aggregation type", col.FuncName) } if min.Value == nil { return 0, nil } if next, ok := next.(elastic.Aggregations); ok 
{ if next, ok := next.Min(id); ok && next != nil && next.Value != nil { if *next.Value < *min.Value { return *next.Value / (ctx.Req.Interval / 1000000000), nil } return (*next.Value - *min.Value) / (ctx.Req.Interval / 1000000000), nil } } } return 0, nil }, } }, "apdex": func(col *Column) Function { return &esFunction{ Column: col, supportOrderBy: false, reduceSupport: false, AggGatter: func(c *Column, ctx *Context, flags ...Flag) (elastic.Aggregation, error) { agg := elastic.NewRangeAggregation() if c.Property.IsScript() { agg.Script(elastic.NewScript(c.Property.Script)) } else { agg.Field(c.Property.Key) } if len(c.Params) != 3 { return nil, fmt.Errorf("invalid apdex params type") } for _, item := range c.Params { value, ok := item.(*ValueRange) if !ok { return nil, fmt.Errorf("invalid apdex params type") } agg.AddRange(value.From, value.To) } return agg, nil }, ValueGetter: func(ctx *Context, id string, aggs elastic.Aggregations) (interface{}, error) { rang, ok := aggs.Range(id) if !ok || rang == nil { return nil, fmt.Errorf("invalid %s aggregation type", col.FuncName) } var total, satisfied, tolerating int64 for i, bucket := range rang.Buckets { total += bucket.DocCount if i == 0 { satisfied += bucket.DocCount } else if i == 1 { tolerating += bucket.DocCount } } return (float64(satisfied) + float64(tolerating)/2) / float64(total), nil }, } }, "maxFieldTimestamp": func(col *Column) Function { return &esFunction{ Column: col, supportOrderBy: false, reduceSupport: true, AggGatter: func(c *Column, ctx *Context, flags ...Flag) (elastic.Aggregation, error) { if c.Property.IsScript() { return nil, fmt.Errorf("function '%s' not support script", c.Function) } return elastic.NewTopHitsAggregation().Size(1).Sort(col.Property.Key, false). 
FetchSourceContext(elastic.NewFetchSourceContext(true).Include(ctx.Req.TimeKey)), nil }, ValueGetter: func(ctx *Context, id string, aggs elastic.Aggregations) (interface{}, error) { hits, ok := aggs.TopHits(id) if !ok { return nil, fmt.Errorf("invalid %s aggregation type", col.FuncName) } if hits == nil || hits.Hits == nil || len(hits.Hits.Hits) <= 0 || hits.Hits.Hits[0].Source == nil { return nil, nil } var out map[string]interface{} err := json.Unmarshal([]byte(*hits.Hits.Hits[0].Source), &out) if err != nil { return nil, fmt.Errorf("tail to Unmarshal TopHits source: %s", err) } val, ok := utils.ConvertInt64(getMapValue(ctx.Req.TimeKey, out)) if ok { return val / 1000000, nil } return val, nil }, } }, "latestTimestamp": func(col *Column) Function { return &esValueFunction{ esFunction: esFunction{ Column: col, supportOrderBy: true, reduceSupport: true, AggGatter: func(c *Column, ctx *Context, flags ...Flag) (elastic.Aggregation, error) { return elastic.NewMaxAggregation().Field(ctx.Req.TimeKey), nil }, }, ValueGetter: func(ctx *Context, id string, aggs elastic.Aggregations) (*elastic.AggregationValueMetric, bool) { return aggs.Max(id) }, } }, "oldestTimestamp": func(col *Column) Function { return &esValueFunction{ esFunction: esFunction{ Column: col, supportOrderBy: true, reduceSupport: true, AggGatter: func(c *Column, ctx *Context, flags ...Flag) (elastic.Aggregation, error) { return elastic.NewMinAggregation().Field(ctx.Req.TimeKey), nil }, }, ValueGetter: func(ctx *Context, id string, aggs elastic.Aggregations) (*elastic.AggregationValueMetric, bool) { return aggs.Min(id) }, } }, "pencentiles": func(col *Column) Function { return &esFunction{ Column: col, supportOrderBy: false, reduceSupport: true, AggGatter: func(c *Column, ctx *Context, flags ...Flag) (elastic.Aggregation, error) { if len(c.Params) <= 0 { return nil, fmt.Errorf("pencentiles params must not empty") } value, ok := utils.ConvertFloat64(c.Params[0]) if !ok || value > float64(100) { return nil, 
fmt.Errorf("pencentiles params must not empty") } return elastic.NewPercentilesAggregation().Percentiles(value).Field(col.Property.Key), nil }, ValueGetter: func(ctx *Context, id string, aggs elastic.Aggregations) (interface{}, error) { percents, ok := aggs.Percentiles(id) if !ok || percents == nil { return nil, fmt.Errorf("invalid %s aggregation type", col.FuncName) } for _, v := range percents.Values { return v, nil } return nil, nil }, } }, "p99": pencentilesFunction(99), "p95": pencentilesFunction(95), "p90": pencentilesFunction(90), "p85": pencentilesFunction(85), "p80": pencentilesFunction(80), "p75": pencentilesFunction(75), "p70": pencentilesFunction(70), "p65": pencentilesFunction(65), "p60": pencentilesFunction(60), "p55": pencentilesFunction(55), "p50": pencentilesFunction(50), "p45": pencentilesFunction(45), "p40": pencentilesFunction(40), "p35": pencentilesFunction(35), "p30": pencentilesFunction(30), "p25": pencentilesFunction(25), "p20": pencentilesFunction(20), "p15": pencentilesFunction(15), "p10": pencentilesFunction(10), "p5": pencentilesFunction(5), "group_reduce": func(col *Column) Function { return &groupReduceFunction{ Column: col, } }, }
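Functions maps a query function name (for example "max", "avg", "countPercent" or "p99") to a constructor that builds the Function for a given Column.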
View Source
// Parsers maps a name to a QLParser. It starts out empty.
var Parsers = make(map[string]QLParser)
Parsers maps a name to its QLParser. The map is empty at declaration.
Functions ¶
func MapToRawQuery ¶
MapToRawQuery .
func RegisterQueryParser ¶
RegisterQueryParser .
func RegisterResponseFormater ¶
RegisterResponseFormater .
Types ¶
type Aggregation ¶
type Aggregation struct { ID string Aggregation elastic.Aggregation }
Aggregation .
type Column ¶
type Column struct { ID string Property Property FuncName string // Params []interface{} // Function Function }
Column .
type Context ¶
type Context struct { Req *Request Source *elastic.SearchSource Resp *elastic.SearchResult Attributes map[string]interface{} ChartMeta *chartmeta.ChartMeta T i18n.Translator Lang i18n.LanguageCodes }
Context .
type Function ¶
type Function interface { Aggregations(ctx *Context, flags ...Flag) ([]*Aggregation, error) Handle(ctx *Context, aggs elastic.Aggregations) (interface{}, error) SupportOrderBy() bool SupportReduce() bool }
Function .
type Group ¶
type Group struct { ID string Property Property // Limit int Sort *Order Filters []*GroupFilter ColumnAggs map[string]bool }
Group .
type GroupFilter ¶
GroupFilter .
type Order ¶
type Order struct { ID string Property Property // FuncName string // Params []interface{} // Sort string // }
Order .
type Queryer ¶
// Queryer runs a query statement and returns the formatted response.
type Queryer interface {
// QueryWithFormatV1 runs statement and returns the resulting Response.
// NOTE(review): qlang and format presumably select entries registered through
// RegisterQueryParser and RegisterResponseFormater — confirm in the
// implementation.
QueryWithFormatV1(qlang, statement, format string, langCodes i18n.LanguageCodes) (*Response, error)
}
Queryer .
func New ¶
func New(index indexloader.Interface, charts *chartmeta.Manager, meta *metricmeta.Manager, t i18n.Translator) Queryer
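New returns a Queryer built from the given index loader, chart-meta manager, metric-meta manager and translator.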
type Request ¶
type Request struct { Name string Metrics []string Start, End int64 // ms TimeAlign TimeAlign Select []*Column Where []*query.Filter GroupBy []*Group OrderBy []*Order Limit []int Debug bool Aggregate *Column ExistKeys map[string]struct{} Columns map[string]*Column TimeKey string // Specify the time field. OriginalTimeUnit tsql.TimeUnit // The unit of the time field. EndOffset int64 Interval float64 Points float64 AlignEnd bool ClusterNames []string LegendMap map[string]*chartmeta.DataMeta // Legend name -> Legend display name ChartType string Trans bool TransGroup bool DefaultNullValue interface{} }
Request .
func (*Request) InitTimestamp ¶
InitTimestamp .
type Response ¶
// Response is the result of a query, returned by
// Queryer.QueryWithFormatV1.
type Response struct {
	// Total is serialized as "total".
	Total int64 `json:"total"`
	// Metrics is serialized as "metrics".
	Metrics []string `json:"metrics"`
	// Elapsed holds timing information; Search is serialized as
	// "elapsed.search".
	Elapsed struct {
		Search time.Duration `json:"search"`
	} `json:"elapsed"`
	// Data is the response payload.
	Data interface{} `json:"data"`
	// Interval is serialized as "interval".
	Interval float64 `json:"interval"`
	// contains filtered or unexported fields
}
Response .
Click to show internal directories.
Click to hide internal directories.