clickhouse

package
v0.0.0-...-acdae1e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 2, 2025 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// SQL_GET_SAMPLE_ALERT_EVENT 按alarm_event的name分组,每组取发生事件最晚的记录,并在返回结果中记录同name的告警次数数量
	SQL_GET_SAMPLE_ALERT_EVENT = `` /* 473-byte string literal not displayed */

	SQL_GET_GROUP_COUNTS_ALERT_EVENT = `` /* 216-byte string literal not displayed */

	// SQL_GET_PAGED_ALERT_EVENT 分页取出所有满足条件的告警事件
	SQL_GET_PAGED_ALERT_EVENT = `` /* 275-byte string literal not displayed */

)
View Source
const (
	SQL_GET_APP_LOG_SOURCE = `SELECT LogAttributes['_source_'] as LogSource
		FROM ilogtail_logs
		%s %s`
	SQL_GET_APP_LOG = `SELECT toUnixTimestamp64Micro(Timestamp) as ts,Body
		FROM ilogtail_logs
		%s %s`
)
View Source
const (
	SQL_GET_PARENT_NODES = `` /* 409-byte string literal not displayed */

	SQL_GET_CHILD_NODES = `` /* 343-byte string literal not displayed */

	SQL_GET_DESCENDANT_TOPOLOGY = `` /* 869-byte string literal not displayed */

	SQL_GET_ENTRY_NODES = `
		SELECT entry_service, entry_url
		FROM service_relationship
		%s
		GROUP BY entry_service, entry_url
	`
)
View Source
const (
	TEMPLATE_COUNT_SPAN_TRACE = "SELECT count(1) as total FROM span_trace %s"
	TEMPLATE_QUERY_SPAN_TRACE = "SELECT %s FROM span_trace %s %s"

	SQL_GET_LABEL_FILTER_KEYS = `` /* 135-byte string literal not displayed */

	SQL_GET_FLAGS_FILTER_KEYS = `` /* 131-byte string literal not displayed */

	SQL_GET_FILTER_VALUES = `SELECT DISTINCT
	%s as label_value
	FROM span_trace st
	%s %s`
)
View Source
const (
	SQL_GET_INSTANCE_ERROR_PROPAGATION = `` /* 2325-byte string literal not displayed */

)
View Source
const (

	// SQL_GET_K8S_EVENTS 获取K8s事件告警
	SQL_GET_K8S_EVENTS = `` /* 339-byte string literal not displayed */

)

Variables

View Source
var (
	ALWAYS_TRUE = &whereSQL{
		Wheres: "TRUE",
	}

	ALWAYS_FALSE = &whereSQL{
		Wheres: "FALSE",
	}
)

Functions

func AppendToBuilder

func AppendToBuilder(builder *QueryBuilder, f *request.SpanTraceFilter) error

func Equals

func Equals(key string, value string) *whereSQL

func EqualsIfNotEmpty

func EqualsIfNotEmpty(key string, value string) *whereSQL

EqualsIfNotEmpty value长度为0时,返回always true

func GetAlertType

func GetAlertType(g string) string

func In

func In(key string, values clickhouse.ArraySet) *whereSQL

func InGroup

func InGroup(vgs ValueInGroups) *whereSQL

func MergeWheres

func MergeWheres(sep MergeSep, whereSQLs ...*whereSQL) *whereSQL

MergeWheres 合并多个条件

func NewQueryCondition

func NewQueryCondition(st, et int64, timeField, query string) string

func RnLimit

func RnLimit(p *request.PageParam) string

func ValidCheckAndAdjust

func ValidCheckAndAdjust(f *request.SpanTraceFilter) bool

Types

type AlertEventSample

type AlertEventSample struct {
	model.AlertEvent

	// 记录行号
	Rn         uint64 `ch:"rn" json:"-"`
	AlarmCount uint64 `ch:"alarm_count" json:"alarmCount"`

	AlertKey string `ch:"alert_key" json:"alertKey"`
}

type AlertGroup

type AlertGroup string
const (
	APP_GROUP       AlertGroup = "app"
	NETWORK_GROUP   AlertGroup = "network"
	CONTAINER_GROUP AlertGroup = "container"
	INFRA_GROUP     AlertGroup = "infra"
)

func (AlertGroup) GetAlertType

func (g AlertGroup) GetAlertType() string

type AvailableFilters

type AvailableFilters struct {
	Filters          []request.SpanTraceFilter
	FilterUpdateTime time.Time
}

type ByLimitBuilder

type ByLimitBuilder struct {
	// contains filtered or unexported fields
}

func NewByLimitBuilder

func NewByLimitBuilder() *ByLimitBuilder

func (*ByLimitBuilder) GroupBy

func (builder *ByLimitBuilder) GroupBy(keys ...string) *ByLimitBuilder

func (*ByLimitBuilder) Limit

func (builder *ByLimitBuilder) Limit(limit int) *ByLimitBuilder

func (*ByLimitBuilder) Offset

func (builder *ByLimitBuilder) Offset(offset int) *ByLimitBuilder

func (*ByLimitBuilder) OrderBy

func (builder *ByLimitBuilder) OrderBy(key string, asc bool) *ByLimitBuilder

func (*ByLimitBuilder) String

func (builder *ByLimitBuilder) String() string

返回GroupBy、OrderBy和Limit

type ChildNode

type ChildNode struct {
	Service     string `ch:"service"`
	Url         string `ch:"url"`
	IsTraced    bool   `ch:"traced"`
	ClientGroup string `ch:"clientGroup"`
	ClientType  string `ch:"clientType"`
	ClientPeer  string `ch:"clientPeer"`
	ClientKey   string `ch:"clientKey"`
}

type ChildRelation

type ChildRelation struct {
	ParentService string `ch:"parentService"`
	ParentUrl     string `ch:"parentUrl"`
	Service       string `ch:"service"`
	Url           string `ch:"url"`
	IsTraced      bool   `ch:"traced"`
	ClientGroup   string `ch:"clientGroup"`
	ClientType    string `ch:"clientType"`
	ClientPeer    string `ch:"clientPeer"`
	ClientKey     string `ch:"clientKey"`
}

type EntryNode

type EntryNode struct {
	Service  string `ch:"entry_service" json:"service"`
	Endpoint string `ch:"entry_url" json:"endpoint"`
}

type ErrorInstancePropagation

type ErrorInstancePropagation struct {
	Timestamp       time.Time `ch:"timestamp"`
	Service         string    `ch:"service"`
	InstanceId      string    `ch:"instance_id"`
	TraceId         string    `ch:"trace_id"`
	ErrorTypes      []string  `ch:"error_types"`
	ErrorMsgs       []string  `ch:"error_msgs"`
	ParentServices  []string  `ch:"parent_services"`
	ParentInstances []string  `ch:"parent_instances"`
	ParentTraced    []bool    `ch:"parent_traced"`
	ChildServices   []string  `ch:"child_services"`
	ChildInstances  []string  `ch:"child_instances"`
	ChildTraced     []bool    `ch:"child_traced"`
}

type FaultLogQuery

type FaultLogQuery struct {
	StartTime      int64
	EndTime        int64
	Service        string
	Instance       string
	NodeName       string
	ContainerId    string
	Pid            uint32
	EndPoint       string
	TraceId        string
	PageNum        int
	PageSize       int
	Type           int      // 0 - slow & error, 1 - error
	MultiServices  []string // 匹配多个service
	MultiNamespace []string // 匹配多个namespace
}

type FaultLogResult

type FaultLogResult struct {
	ServiceName string `ch:"service_name" json:"serviceName"`
	InstanceId  string `ch:"instance_id" json:"instanceId"`
	TraceId     string `ch:"trace_id" json:"traceId"`
	StartTime   uint64 `ch:"start_time_us" json:"startTime"`
	EndTime     uint64 `ch:"end_time_us" json:"endTime"`
	EndPoint    string `ch:"endpoint" json:"endpoint"`
	PodName     string `ch:"pod_name" json:"podName"`
	ContainerId string `ch:"container_id" json:"containerId"`
	NodeName    string `ch:"node_name" json:"nodeName"`
	Pid         uint32 `ch:"pid" json:"pid"`
}

type FieldBuilder

type FieldBuilder struct {
	// contains filtered or unexported fields
}

func NewFieldBuilder

func NewFieldBuilder() *FieldBuilder

func (*FieldBuilder) Alias

func (builder *FieldBuilder) Alias(key string, alias string) *FieldBuilder

func (*FieldBuilder) Fields

func (builder *FieldBuilder) Fields(keys ...string) *FieldBuilder

func (*FieldBuilder) String

func (builder *FieldBuilder) String() string

返回检索字段

type FlameGraphData

type FlameGraphData struct {
	StartTime   int64             `ch:"start_time" json:"startTime"`
	EndTime     int64             `ch:"end_time" json:"endTime"`
	PID         uint32            `ch:"pid" json:"pid"`
	TID         uint32            `ch:"tid" json:"tid"`
	SampleType  string            `ch:"sample_type" json:"sampleType"`
	SampleRate  uint32            `ch:"sample_rate" json:"sampleRate"`
	Labels      map[string]string `ch:"labels" json:"labels"`
	FlameBearer string            `ch:"flamebearer" json:"flameBearer"`
}

type K8sEvents

type K8sEvents struct {
	Timestamp          time.Time         `ch:"Timestamp" json:"timestamp"`
	SeverityText       string            `ch:"SeverityText" json:"SeverityText"`
	Body               string            `ch:"Body" json:"body"`
	ResourceAttributes map[string]string `ch:"ResourceAttributes" json:"resourceAttributes"`
	LogAttributes      map[string]string `ch:"LogAttributes" json:"logAttributes"`
}

func (*K8sEvents) GetObjKind

func (e *K8sEvents) GetObjKind() string

func (*K8sEvents) GetObjName

func (e *K8sEvents) GetObjName() string

func (*K8sEvents) GetReason

func (e *K8sEvents) GetReason() string

type K8sEventsCount

type K8sEventsCount struct {
	TimeRange string
	Reason    string
	Severity  string
	Count     uint64
}

type LogContent

type LogContent struct {
	Timestamp int64  `ch:"ts" json:"timestamp"`
	Body      string `ch:"Body" json:"body"`
}

type Logs

type Logs struct {
	Source   string       `json:"source"`
	Contents []LogContent `json:"contents"`
}

type MergeSep

type MergeSep string
const (
	AndSep MergeSep = " AND "
	OrSep  MergeSep = " OR "
)

type NetSegments

type NetSegments struct {
	StartTime        time.Time `ch:"start_time"`
	EndTime          time.Time `ch:"end_time"`
	ResponseDuration uint64    `ch:"response_duration"`
	TapSide          string    `ch:"tap_side"`
	SpanId           string    `ch:"span_id"`
	TraceId          string    `ch:"trace_id"`
}

type PagedAlertEvent

type PagedAlertEvent struct {
	model.AlertEvent

	// 记录行号
	Rn         uint64 `ch:"rn" json:"-"`
	TotalCount uint64 `ch:"total_count" json:"-"`
}

type ParentNode

type ParentNode struct {
	ParentService string `ch:"parentService"`
	ParentUrl     string `ch:"parentUrl"`
	ParentTraced  bool   `ch:"parentTraced"`
	ClientGroup   string `ch:"clientGroup"`
	ClientType    string `ch:"clientType"`
	ClientPeer    string `ch:"clientPeer"`
	ClientKey     string `ch:"clientKey"`
}

type ProfilingEvent

type ProfilingEvent struct {
	Timestamp       time.Time         `json:"timestamp" ch:"timestamp"`
	StartTime       uint64            `json:"startTime" ch:"startTime"`
	EndTime         uint64            `json:"endTime" ch:"endTime"`
	Offset          int64             `json:"offset" ch:"offset"`
	PID             uint32            `json:"pid" ch:"pid"`
	TID             uint32            `json:"tid" ch:"tid"`
	TransactionIDs  string            `json:"transactionIds" ch:"transactionIds"`
	CPUEvents       string            `json:"cpuEvents" ch:"cpuEvents"`
	InnerCalls      string            `json:"innerCalls" ch:"innerCalls"`
	JavaFutexEvents string            `json:"javaFutexEvents" ch:"javaFutexEvents"`
	Spans           string            `json:"spans" ch:"spans"`
	ThreadName      string            `json:"threadName" ch:"threadName"` // 线程名 表中在labels中
	Labels          map[string]string `json:"labels" ch:"labels"`
}

type QueryBuilder

type QueryBuilder struct {
	// contains filtered or unexported fields
}

func NewQueryBuilder

func NewQueryBuilder() *QueryBuilder

func (*QueryBuilder) And

func (builder *QueryBuilder) And(where *whereSQL) *QueryBuilder

And 将一系列条件whereSQL以And方式加入到QueryBuilder

func (*QueryBuilder) Between

func (builder *QueryBuilder) Between(key string, from interface{}, to interface{}) *QueryBuilder

func (*QueryBuilder) Contains

func (builder *QueryBuilder) Contains(key string, value any) *QueryBuilder

func (*QueryBuilder) Equals

func (builder *QueryBuilder) Equals(key string, value interface{}) *QueryBuilder

func (*QueryBuilder) EqualsNotEmpty

func (builder *QueryBuilder) EqualsNotEmpty(key string, value string) *QueryBuilder

func (*QueryBuilder) Exists

func (builder *QueryBuilder) Exists(key string) *QueryBuilder

func (*QueryBuilder) GreaterThan

func (builder *QueryBuilder) GreaterThan(key string, value any) *QueryBuilder

func (*QueryBuilder) In

func (builder *QueryBuilder) In(key string, values any) *QueryBuilder

组合生成SQL中的 key in (values) 语句, values内部为值数组

func (*QueryBuilder) InStrings

func (builder *QueryBuilder) InStrings(key string, values []string) *QueryBuilder

func (*QueryBuilder) LessThan

func (builder *QueryBuilder) LessThan(key string, value any) *QueryBuilder

func (*QueryBuilder) Like

func (builder *QueryBuilder) Like(key string, values any) *QueryBuilder

func (*QueryBuilder) NotContains

func (builder *QueryBuilder) NotContains(key string, value any) *QueryBuilder

func (*QueryBuilder) NotEquals

func (builder *QueryBuilder) NotEquals(key string, value interface{}) *QueryBuilder

func (*QueryBuilder) NotExists

func (builder *QueryBuilder) NotExists(key string) *QueryBuilder

func (*QueryBuilder) NotIn

func (builder *QueryBuilder) NotIn(key string, values any) *QueryBuilder

func (*QueryBuilder) NotLike

func (builder *QueryBuilder) NotLike(key string, values any) *QueryBuilder

func (*QueryBuilder) Statement

func (builder *QueryBuilder) Statement(where string) *QueryBuilder

func (*QueryBuilder) String

func (builder *QueryBuilder) String() string

返回查询条件

type QueryCount

type QueryCount struct {
	Total uint64 `ch:"total"`
}

type QueryTraceResult

type QueryTraceResult struct {
	Timestamp      int64   `ch:"ts" json:"timestamp"`
	Duration       uint64  `ch:"duration_us" json:"duration"`
	ServiceName    string  `ch:"service_name" json:"serviceName"`
	Pid            uint32  `ch:"pid" json:"pid"`
	Tid            uint32  `ch:"tid" json:"tid"`
	TraceId        string  `ch:"trace_id" json:"traceId"`
	EndPoint       string  `ch:"endpoint" json:"endpoint"`
	InstanceId     string  `ch:"instance_id" json:"instanceId"`
	SpanId         string  `ch:"span_id" json:"spanId"`
	ApmType        string  `ch:"apm_type" json:"apmType"`
	Reason         string  `ch:"reason" json:"reason"`
	IsError        bool    `ch:"is_error" json:"isError"`
	IsSlow         bool    `ch:"is_slow" json:"isSlow"`
	ThresholdValue float64 `ch:"threshold_value" json:"thresholdValue"`

	Labels  map[string]string `ch:"labels" json:"labels"`
	Flags   map[string]bool   `ch:"flags"  json:"flags"`
	Metrics map[string]uint64 `ch:"metrics" json:"metrics"`

	MutatedValue uint64 `ch:"mutated_value" json:"mutatedValue"`
	IsMutated    uint8  `ch:"is_mutated" json:"isMutated"` // 延时是否突变
}

type Repo

type Repo interface {
	// ========== service_relationship ==========
	// 查询上游节点列表
	ListParentNodes(req *request.GetServiceEndpointTopologyRequest) (*model.TopologyNodes, error)
	// 查询下游节点列表
	ListChildNodes(req *request.GetServiceEndpointTopologyRequest) (*model.TopologyNodes, error)
	// 查询所有子孙服务节点列表
	ListDescendantNodes(req *request.GetDescendantMetricsRequest) (*model.TopologyNodes, error)
	// 查询所有子孙节点的调用关系
	ListDescendantRelations(req *request.GetServiceEndpointTopologyRequest) ([]*model.ToplogyRelation, error)
	// 查询入口节点列表
	ListEntryEndpoints(req *request.GetServiceEntryEndpointsRequest) ([]EntryNode, error)

	// ========== error_propagation ==========
	// 查询实例相关的错误传播链
	ListErrorPropagation(req *request.GetErrorInstanceRequest) ([]ErrorInstancePropagation, error)

	// ========== span_trace ==========
	GetAvailableFilterKey(startTime, endTime time.Time, needUpdate bool) ([]request.SpanTraceFilter, error)
	UpdateFilterKey(startTime, endTime time.Time) ([]request.SpanTraceFilter, error)
	GetFieldValues(searchText string, filter *request.SpanTraceFilter, startTime, endTime time.Time) (*SpanTraceOptions, error)
	// 分页查询故障现场日志列表
	GetFaultLogPageList(query *FaultLogQuery) ([]FaultLogResult, int64, error)
	// 分页Trace列表
	GetTracePageList(req *request.GetTracePageListRequest) ([]QueryTraceResult, int64, error)

	CountK8sEvents(startTime int64, endTim int64, pods []string) ([]K8sEventsCount, error)

	// ========== application_logs ==========
	// 查询故障现场日志内容, sourceFrom 可为空, 将返回可查到的第一个来源中的日志
	QueryApplicationLogs(req *request.GetFaultLogContentRequest) (logContents *Logs, availableSource []string, err error)
	// 查询故障现场日志内容可用的来源
	QueryApplicationLogsAvailableSource(faultLog FaultLogResult) ([]string, error)

	CreateLogTable(req *request.LogTableRequest) ([]string, error)
	DropLogTable(req *request.LogTableRequest) ([]string, error)
	UpdateLogTable(req *request.LogTableRequest, old []request.Field) ([]string, error)

	QueryAllLogs(req *request.LogQueryRequest) ([]map[string]any, string, error)
	QueryLogContext(req *request.LogQueryContextRequest) ([]map[string]any, []map[string]any, error)
	GetLogChart(req *request.LogQueryRequest) ([]map[string]any, int64, error)
	GetLogIndex(req *request.LogIndexRequest) (map[string]uint64, uint64, error)

	OtherLogTable() ([]map[string]any, error)
	OtherLogTableInfo(req *request.OtherTableInfoRequest) ([]map[string]any, error)

	InsertBatchAlertEvents(ctx context.Context, events []*model.AlertEvent) error
	ReadAlertEvent(ctx context.Context, id uuid.UUID) (*model.AlertEvent, error)
	GetConn() driver.Conn

	//config
	ModifyTableTTL(ctx context.Context, mapResult []model.ModifyTableTTLMap) error
	GetTables(tables []model.Table) ([]model.TablesQuery, error)

	// ========== alert events ==========
	// 查询按group和级别采样的告警事件,sampleCount为每个group和级别采样的数量
	GetAlertEventCountGroupByInstance(startTime time.Time, endTime time.Time, filter request.AlertFilter, instances []*model.ServiceInstance) ([]model.AlertEventCount, error)
	// 查询按labels采样的告警事件,sampleCount为每个labels采样的数量
	GetAlertEventsSample(sampleCount int, startTime time.Time, endTime time.Time, filter request.AlertFilter, instances []*model.ServiceInstance) ([]AlertEventSample, error)
	// 查询按pageParam分页的告警事件
	GetAlertEvents(startTime time.Time, endTime time.Time, filter request.AlertFilter, instances []*model.ServiceInstance, pageParam *request.PageParam) ([]PagedAlertEvent, int, error)

	// ========== k8s events ============
	// SeverityNumber > 9 (warning)
	GetK8sAlertEventsSample(startTime time.Time, endTime time.Time, instances []*model.ServiceInstance) ([]K8sEvents, error)

	// profiling_event
	// GetOnOffCPU 获取span执行消耗
	GetOnOffCPU(pid uint32, nodeName string, startTime, endTime int64) (*[]ProfilingEvent, error)

	// ========== network (deepflow) ==========
	GetNetworkSpanSegments(traceId string, spanId string) ([]NetSegments, error)

	// ========== flame graph ===========
	GetFlameGraphData(startTime, endTime int64, nodeName string, pid, tid int64, sampleType, spanId, traceId string) (*[]FlameGraphData, error)
	// contains filtered or unexported methods
}

func New

func New(logger *zap.Logger, address []string, database string, username string, password string) (Repo, error)

type ServiceNode

type ServiceNode struct {
	Service  string `ch:"service" json:"service"`
	Endpoint string `ch:"url" json:"endpoint"`
	IsTraced bool   `ch:"traced" json:"isTraced"`
}

type Source

type Source struct {
	LogSource string `ch:"LogSource"`
}

type SpanTraceOptions

type SpanTraceOptions struct {
	request.SpanTraceFilter

	Options any `json:"options"`
}

type ValueInGroups

type ValueInGroups struct {
	Keys        []string
	ValueGroups []clickhouse.GroupSet
}

ValueInGroups 用于在OrInGroups中传入多组InGroups参数并做OR连接 每个ValueInGroups生成如下SQL,x是EqualIfNotEmpty中的每个值 (keys) IN (ValueGroups)

type WrappedConn

type WrappedConn struct {
	driver.Conn
	// contains filtered or unexported fields
}

func (*WrappedConn) Exec

func (c *WrappedConn) Exec(ctx context.Context, query string, args ...any) error

func (*WrappedConn) Ping

func (c *WrappedConn) Ping(ctx context.Context) error

func (*WrappedConn) Query

func (c *WrappedConn) Query(ctx context.Context, query string, args ...any) (driver.Rows, error)

func (*WrappedConn) QueryRow

func (c *WrappedConn) QueryRow(ctx context.Context, query string, args ...any) driver.Row

func (*WrappedConn) Select

func (c *WrappedConn) Select(ctx context.Context, dest any, query string, args ...any) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL