esinfluxql

package
v1.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2022 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	FuncFlagNone   = FuncFlag(0)
	FuncFlagSelect = FuncFlag(1 << (iota - 1))
	FuncFlagWhere
	FuncFlagOrderBy
)

FuncFlag .

Variables

View Source
var AggFunctions = map[string]*AggFuncDefine{
	"max": {
		Flag: FuncFlagSelect | FuncFlagOrderBy,
		New: newUnaryValueAggFunction(
			"max",
			func(ctx *Context, id, field string, script *elastic.Script, flags ...FuncFlag) (elastic.Aggregation, error) {
				if script != nil {
					return elastic.NewMaxAggregation().Script(script), nil
				}
				return elastic.NewMaxAggregation().Field(field), nil
			},
			func(ctx *Context, id string, aggs elastic.Aggregations) (*elastic.AggregationValueMetric, bool) {
				return aggs.Max(id)
			},
		),
	},
	"min": {
		Flag: FuncFlagSelect | FuncFlagOrderBy,
		New: newUnaryValueAggFunction(
			"min",
			func(ctx *Context, id, field string, script *elastic.Script, flags ...FuncFlag) (elastic.Aggregation, error) {
				if script != nil {
					return elastic.NewMinAggregation().Script(script), nil
				}
				return elastic.NewMinAggregation().Field(field), nil
			},
			func(ctx *Context, id string, aggs elastic.Aggregations) (*elastic.AggregationValueMetric, bool) {
				return aggs.Min(id)
			},
		),
	},
	"avg": {
		Flag: FuncFlagSelect | FuncFlagOrderBy,
		New: newUnaryValueAggFunction(
			"avg",
			func(ctx *Context, id, field string, script *elastic.Script, flags ...FuncFlag) (elastic.Aggregation, error) {
				if script != nil {
					return elastic.NewAvgAggregation().Script(script), nil
				}
				return elastic.NewAvgAggregation().Field(field), nil
			},
			func(ctx *Context, id string, aggs elastic.Aggregations) (*elastic.AggregationValueMetric, bool) {
				return aggs.Avg(id)
			},
		),
	},
	"mean": {
		Flag: FuncFlagSelect | FuncFlagOrderBy,
		New: newUnaryValueAggFunction(
			"mean",
			func(ctx *Context, id, field string, script *elastic.Script, flags ...FuncFlag) (elastic.Aggregation, error) {
				if script != nil {
					return elastic.NewAvgAggregation().Script(script), nil
				}
				return elastic.NewAvgAggregation().Field(field), nil
			},
			func(ctx *Context, id string, aggs elastic.Aggregations) (*elastic.AggregationValueMetric, bool) {
				return aggs.Avg(id)
			},
		),
	},
	"sum": {
		Flag: FuncFlagSelect | FuncFlagOrderBy,
		New: newUnaryValueAggFunction(
			"sum",
			func(ctx *Context, id, field string, script *elastic.Script, flags ...FuncFlag) (elastic.Aggregation, error) {
				if script != nil {
					return elastic.NewSumAggregation().Script(script), nil
				}
				return elastic.NewSumAggregation().Field(field), nil
			},
			func(ctx *Context, id string, aggs elastic.Aggregations) (*elastic.AggregationValueMetric, bool) {
				return aggs.Sum(id)
			},
		),
	},
	"count": {
		Flag: FuncFlagSelect | FuncFlagOrderBy,
		New: newUnaryValueAggFunction(
			"count",
			func(ctx *Context, id, field string, script *elastic.Script, flags ...FuncFlag) (elastic.Aggregation, error) {
				if script != nil {
					return elastic.NewValueCountAggregation().Script(script), nil
				}
				return elastic.NewValueCountAggregation().Field(field), nil
			},
			func(ctx *Context, id string, aggs elastic.Aggregations) (*elastic.AggregationValueMetric, bool) {
				return aggs.ValueCount(id)
			},
		),
	},
	"distinct": {
		Flag: FuncFlagSelect | FuncFlagOrderBy,
		New: newUnaryValueAggFunction(
			"distinct",
			func(ctx *Context, id, field string, script *elastic.Script, flags ...FuncFlag) (elastic.Aggregation, error) {
				if script != nil {
					return elastic.NewCardinalityAggregation().Script(script), nil
				}
				return elastic.NewCardinalityAggregation().Field(field), nil
			},
			func(ctx *Context, id string, aggs elastic.Aggregations) (*elastic.AggregationValueMetric, bool) {
				return aggs.Cardinality(id)
			},
		),
	},
	"median": {
		Flag: FuncFlagSelect,
		New: newUnaryAggFunction(
			"median",
			func(ctx *Context, id, field string, script *elastic.Script, flags ...FuncFlag) (elastic.Aggregation, error) {
				if script != nil {
					return elastic.NewPercentilesAggregation().Percentiles(50).Script(script), nil
				}
				return elastic.NewPercentilesAggregation().Percentiles(50).Field(field), nil
			},
			func(ctx *Context, id, field string, call *influxql.Call, aggs elastic.Aggregations) (interface{}, bool) {
				percents, ok := aggs.Percentiles(id)
				if !ok || percents == nil {
					return nil, false
				}
				for _, v := range percents.Values {
					return v, true
				}
				return nil, true
			},
		),
	},
	"percentiles": {
		Flag: FuncFlagSelect,
		New: newMultivariateAggFunction(
			"percentiles",
			func(ctx *Context, id, field string, params []influxql.Expr, script *elastic.Script, flags ...FuncFlag) (elastic.Aggregation, error) {
				if len(params) == 0 {
					return nil, fmt.Errorf("not percent data")
				}
				ref, ok, err := getLiteralValue(ctx, params[0])
				if !ok || err != nil {
					return nil, fmt.Errorf("invalid percent type error")
				}
				floatPercent, ok := ref.(float64)
				if !ok {
					return nil, fmt.Errorf("invalid percent type error")
				}
				if floatPercent < 0 || floatPercent > 100 {
					return nil, errors.New("percent was out of range")
				}
				if script != nil {
					return elastic.NewPercentilesAggregation().Percentiles(floatPercent).Script(script), nil
				}
				return elastic.NewPercentilesAggregation().Percentiles(floatPercent).Field(field), nil
			},
			func(ctx *Context, id, field string, params []influxql.Expr, call *influxql.Call, aggs elastic.Aggregations) (interface{}, bool) {
				percents, ok := aggs.Percentiles(id)
				if !ok || percents == nil {
					return nil, false
				}
				for _, v := range percents.Values {
					return v, true
				}
				return nil, true
			},
		),
	},

	"diff": {
		Flag: FuncFlagSelect,
		New: newUnaryAggFunction(
			"diff",
			func(ctx *Context, id, field string, script *elastic.Script, flags ...FuncFlag) (elastic.Aggregation, error) {
				if script != nil {
					return elastic.NewMinAggregation().Script(script), nil
				}
				return elastic.NewMinAggregation().Field(field), nil
			},
			func(ctx *Context, id, field string, call *influxql.Call, aggs elastic.Aggregations) (interface{}, bool) {
				if next, ok := ctx.attributesCache["next"]; ok {
					min, _ := aggs.Min(id)
					if min == nil {
						return nil, false
					}
					if min.Value == nil {
						return 0, true
					}
					if next, ok := next.(elastic.Aggregations); ok {
						if next, ok := next.Min(id); ok && next != nil && next.Value != nil {
							return *next.Value - *min.Value, true
						}
					}
				}

				return 0, true
			},
		),
	},

	"diffps": {
		Flag: FuncFlagSelect,
		New: newUnaryAggFunction(
			"diffps",
			func(ctx *Context, id, field string, script *elastic.Script, flags ...FuncFlag) (elastic.Aggregation, error) {
				if script != nil {
					return elastic.NewMinAggregation().Script(script), nil
				}
				return elastic.NewMinAggregation().Field(field), nil
			},
			func(ctx *Context, id, field string, call *influxql.Call, aggs elastic.Aggregations) (interface{}, bool) {
				if next, ok := ctx.attributesCache["next"]; ok {
					min, _ := aggs.Min(id)
					if min == nil {
						return nil, false
					}
					if min.Value == nil {
						return 0, true
					}
					if next, ok := next.(elastic.Aggregations); ok {
						if next, ok := next.Min(id); ok && next != nil && next.Value != nil {
							if ctx.targetTimeUnit == tsql.UnsetTimeUnit {
								ctx.targetTimeUnit = tsql.Nanosecond
							}
							seconds := float64(ctx.interval*int64(ctx.targetTimeUnit)) / float64(tsql.Second)
							return (*next.Value - *min.Value) / seconds, true
						}
					}
				}

				return 0, true
			},
		),
	},
	"rateps": {
		Flag: FuncFlagSelect,
		New: newUnaryAggFunction(
			"rateps",
			func(ctx *Context, id, field string, script *elastic.Script, flags ...FuncFlag) (elastic.Aggregation, error) {
				if script != nil {
					return elastic.NewSumAggregation().Script(script), nil
				}
				return elastic.NewSumAggregation().Field(field), nil
			},
			func(ctx *Context, id, field string, call *influxql.Call, aggs elastic.Aggregations) (interface{}, bool) {
				sum, _ := aggs.Sum(id)
				if sum == nil {
					return nil, false
				}
				if sum.Value == nil {
					return 0, true
				}
				if ctx.targetTimeUnit == tsql.UnsetTimeUnit {
					ctx.targetTimeUnit = tsql.Nanosecond
				}
				seconds := float64(ctx.interval*int64(ctx.targetTimeUnit)) / float64(tsql.Second)
				return *sum.Value / seconds, true
			},
		),
	},
	"first": newSourceFieldAggFunction("first", tsql.TimestampKey, true),
	"last":  newSourceFieldAggFunction("last", tsql.TimestampKey, false),
	"value": newSourceFieldAggFunction("value", tsql.TimestampKey, false),
}

AggFunctions .

View Source
var PainlessFunctions map[string]*PainlessFunction

PainlessFunctions .

Functions

func IsAggFunction

func IsAggFunction(name string) bool

IsAggFunction .

func IsFunction

func IsFunction(name string) bool

IsFunction .

func New

func New(start, end int64, stmt string) tsql.Parser

New start and end always nanosecond

func SortResultSet

func SortResultSet(rs *tsql.ResultSet, sorts influxql.SortFields)

SortResultSet .

Types

type AggFuncDefine

type AggFuncDefine struct {
	Flag FuncFlag
	New  func(ctx *Context, id string, call *influxql.Call) (AggHandler, error)
}

AggFuncDefine .

type AggHandler

type AggHandler interface {
	Aggregations(aggs map[string]elastic.Aggregation, flags ...FuncFlag) error
	Handle(aggs elastic.Aggregations) (interface{}, error)
}

AggHandler .

type Columns

type Columns []*tsql.Column

Columns .

func (Columns) Len

func (cs Columns) Len() int

func (Columns) Less

func (cs Columns) Less(i, j int) bool

func (Columns) Swap

func (cs Columns) Swap(i, j int)

type Context

type Context struct {
	// contains filtered or unexported fields
}

Context .

func (*Context) Aggregations

func (c *Context) Aggregations() elastic.Aggregations

Aggregations .

func (*Context) AttributesCache

func (c *Context) AttributesCache() map[string]interface{}

AttributeCache .

func (*Context) GetFuncID

func (c *Context) GetFuncID(call *influxql.Call, deftyp influxql.DataType) string

GetFuncID .

func (*Context) HandleScopeAgg

func (c *Context) HandleScopeAgg(scope string, aggs elastic.Aggregations, expr influxql.Expr) (interface{}, error)

HandleScopeAgg .

func (*Context) Interval

func (c *Context) Interval() int64

Interval .

func (*Context) Now

func (c *Context) Now() time.Time

Now .

func (*Context) OriginalTimeUnit

func (c *Context) OriginalTimeUnit() tsql.TimeUnit

OriginalTimeUnit .

func (*Context) Range

func (c *Context) Range(conv bool) (int64, int64)

Range .

func (*Context) RowNum

func (c *Context) RowNum() int64

RowNum .

func (*Context) TargetTimeUnit

func (c *Context) TargetTimeUnit() tsql.TimeUnit

TargetTimeUnit .

func (*Context) TimeKey

func (c *Context) TimeKey() string

TimeKey .

type FuncFlag

type FuncFlag int32

FuncFlag .

type PainlessFunction added in v1.4.0

type PainlessFunction struct {
	Name         string
	Objective    bool
	ObjectType   string
	DefaultValue string
	Convert      func(ctx *Context, call *influxql.Call, deftyp influxql.DataType, fields map[string]bool) (string, error)
}

PainlessFunction todo .

type Parser

type Parser struct {
	// contains filtered or unexported fields
}

Parser .

func (*Parser) ParseQuery

func (p *Parser) ParseQuery() ([]tsql.Query, error)

ParseQuery .

func (*Parser) ParseRawQuery

func (p *Parser) ParseRawQuery() ([]*tsql.Source, *elastic.BoolQuery, *elastic.SearchSource, error)

ParseRawQuery .

func (*Parser) SetFilter

func (p *Parser) SetFilter(filter *elastic.BoolQuery) tsql.Parser

SetFilter .

func (*Parser) SetMaxTimePoints

func (p *Parser) SetMaxTimePoints(points int64) tsql.Parser

SetMaxTimePoints .

func (*Parser) SetOriginalTimeUnit

func (p *Parser) SetOriginalTimeUnit(unit tsql.TimeUnit) tsql.Parser

SetOriginalTimeUnit .

func (*Parser) SetParams

func (p *Parser) SetParams(params map[string]interface{}) tsql.Parser

SetParams .

func (*Parser) SetTargetTimeUnit

func (p *Parser) SetTargetTimeUnit(unit tsql.TimeUnit) tsql.Parser

SetTargetTimeUnit .

func (*Parser) SetTimeKey

func (p *Parser) SetTimeKey(key string) tsql.Parser

SetTimeKey .

type Query

type Query struct {
	// contains filtered or unexported fields
}

Query .

func (*Query) BoolQuery

func (q *Query) BoolQuery() *elastic.BoolQuery

BoolQuery .

func (*Query) Context

func (q *Query) Context() tsql.Context

Context .

func (*Query) ParseResult

func (q *Query) ParseResult(resp *elastic.SearchResult) (*tsql.ResultSet, error)

ParseResult .

func (*Query) SearchSource

func (q *Query) SearchSource() *elastic.SearchSource

SearchSource .

func (*Query) SetAllColumnsCallback

func (q *Query) SetAllColumnsCallback(fn func(start, end int64, sources []*tsql.Source) ([]*tsql.Column, error))

SetAllColumnsCallback .

func (*Query) Sources

func (q *Query) Sources() []*tsql.Source

Sources .

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL