query

package
v0.0.0-...-a2dbdad Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 18, 2024 License: AGPL-3.0 Imports: 42 Imported by: 6

Documentation

Index

Constants

View Source
const CANCEL_QUERY_AFTER_SECONDS = 5 * 60 // If 0, the query will never timeout
View Source
const MAX_GRP_BUCKS = 3000
View Source
const QUERY_INFO_REFRESH_LOOP_SECS = 300
View Source
const SECONDS_REFRESH_GLOBAL_METADATA = 30
View Source
const SECONDS_REREAD_META = 5
View Source
const SECONDS_REREAD_META_SSR = 60
View Source
const SEGMETA_FILENAME = "/segmeta.json"

Variables

View Source
var ExtractKibanaRequestsFn func([]string, uint64) map[string]*structs.SegmentSearchRequest

Functions

func ApplyFilterOperator

func ApplyFilterOperator(node *structs.ASTNode, timeRange *dtu.TimeRange, aggs *structs.QueryAggregators,
	qid uint64, qc *structs.QueryContext) *structs.NodeResult

func ApplyMetricsQuery

func ApplyMetricsQuery(mQuery *structs.MetricsQuery, timeRange *dtu.MetricsTimeRange, qid uint64, querySummary *summary.QuerySummary) *mresults.MetricsResult

func ApplyVectorArithmetic

func ApplyVectorArithmetic(aggs *structs.QueryAggregators, qid uint64) *structs.NodeResult

func CancelQuery

func CancelQuery(qid uint64)

func ConvertASTNodeToSearchNode

func ConvertASTNodeToSearchNode(node *structs.ASTNode, qid uint64) *structs.SearchNode

func DeleteQuery

func DeleteQuery(qid uint64)

Removes the reference to qid. If qid does not exist, this is a no-op.

func ExtractSSRFromSearchNode

func ExtractSSRFromSearchNode(node *structs.SearchNode, filesToSearch map[string]map[string]*structs.BlockTracker, timeRange *dtu.TimeRange,
	indexNames []string, querySummary *summary.QuerySummary, qid uint64, isQueryPersistent bool, pqid string) map[string]*structs.SegmentSearchRequest

put this in segwriter -> raw search unrotated

func GetAllMetricNamesOverTheTimeRange

func GetAllMetricNamesOverTheTimeRange(timeRange *dtu.MetricsTimeRange, orgid uint64) ([]string, error)

func GetAllTagsTreesWithinTimeRange

func GetAllTagsTreesWithinTimeRange(timeRange *dtu.MetricsTimeRange, myid uint64, querySummary *summary.QuerySummary) ([]*tagstree.AllTagTreeReaders, error)

func GetBucketsForQid

func GetBucketsForQid(qid uint64) (map[string]*structs.AggregationResult, error)

func GetEncodedSegStatsForRPC

func GetEncodedSegStatsForRPC(qid uint64, segKeyEnc uint16) ([]byte, bool, error)

Gets the JSON encoding of segstats for RPC. Returns the encoded segstats for the given segKeyEnc and qid, a bool indicating whether the query is a segstats query, and an error.

func GetExternalAggsInfoFiles

func GetExternalAggsInfoFiles() ([]string, error)

func GetFinalColsOrder

func GetFinalColsOrder(columnsOrder map[string]int) []string

The colIndex within this map may be larger than the length of the map

func GetMeasureResultsForQid

func GetMeasureResultsForQid(qid uint64, pullGrpBucks bool, skenc uint16, limit int) ([]*structs.BucketHolder, []string, []string, []string, int)

Gets the measure results for the running query. If the query is a segment stats query, it will delete the input segment key encoding (skenc).

func GetNodeResultsForRRCCmd

func GetNodeResultsForRRCCmd(queryInfo *QueryInformation, sTime time.Time, allSegFileResults *segresults.SearchResults,
	querySummary *summary.QuerySummary, getUnrotated bool, getRotated bool, orgid uint64) *structs.NodeResult

func GetNodeResultsForSegmentStatsCmd

func GetNodeResultsForSegmentStatsCmd(queryInfo *QueryInformation, sTime time.Time, allSegFileResults *segresults.SearchResults,
	qsrs []*QuerySegmentRequest, querySummary *summary.QuerySummary, getUnrotated bool, getRotated bool, orgid uint64) *structs.NodeResult

func GetNodeResultsFromQSRS

func GetNodeResultsFromQSRS(sortedQSRSlice []*QuerySegmentRequest, queryInfo *QueryInformation, sTime time.Time,
	allSegFileResults *segresults.SearchResults, querySummary *summary.QuerySummary) *structs.NodeResult

Base function to apply operators on query segment requests

func GetNodeTypeFromCondition

func GetNodeTypeFromCondition(searchCond *structs.SearchCondition) structs.SearchNodeType

func GetNodeTypeFromNode

func GetNodeTypeFromNode(node *structs.SearchNode) structs.SearchNodeType

TODO: better and more generic node types. Right now, we just assume that if it's not ColumnValue, then it has to be TimeRangeQuery.

func GetNodeTypeFromQuery

func GetNodeTypeFromQuery(query *structs.SearchQuery) structs.SearchNodeType

func GetNumMatchedRRCs

func GetNumMatchedRRCs(qid uint64) (uint64, error)

Returns the number of rrcs that exist in *search.SearchResults. This will be used to determine if more scrolling can be done.

func GetQueryCountInfoForQid

func GetQueryCountInfoForQid(qid uint64) *structs.QueryCount

returns the query counts for the qid. If qid does not exist, this will return a QueryCount set to 0

func GetQueryInfoForQid

func GetQueryInfoForQid(qid uint64) (*structs.QueryCount, uint64, error)

returns the query counts and searched count for the qid. If qid does not exist, this will return a QueryCount set to 0

func GetQueryResponseForRPC

func GetQueryResponseForRPC(scroll int, qid uint64) ([]*utils.RecordResultContainer, *blockresults.TimeBuckets,
	*blockresults.GroupByBuckets, map[uint16]string, error)

Returns the rrcs, raw time buckets, raw group-by buckets, map of segkey encodings, and an error.

func GetQueryType

func GetQueryType(qid uint64) structs.QueryType

func GetRawRecordInfoForQid

func GetRawRecordInfoForQid(scroll int, qid uint64) ([]*utils.RecordResultContainer, uint64, map[uint16]string, error)

returns the rrcs, query counts, map of segkey encoding, and errors

func GetRemoteRawLogInfo

func GetRemoteRawLogInfo(remoteID string, inrrcs []*utils.RecordResultContainer, qid uint64) ([]map[string]interface{}, []string, error)

Get remote raw logs and columns based on the remoteID and all RRCs

func GetTotalSegmentsToSearch

func GetTotalSegmentsToSearch(qid uint64) (uint64, error)

func GetTotalsRecsSearchedForQid

func GetTotalsRecsSearchedForQid(qid uint64) (uint64, error)

func GetUniqueSearchErrors

func GetUniqueSearchErrors(qid uint64) (string, error)

func IncrementNumFinishedSegments

func IncrementNumFinishedSegments(incr int, qid uint64, recsSearched uint64,
	skEnc uint16, remoteId string, doBuckPull bool, sstMap map[string]*structs.SegStats)

Increments the number of finished segments. If incr is 0, then the current query is finished and a histogram will be flushed.

func InitQueryInfoRefresh

func InitQueryInfoRefresh(getMyIds func() []uint64)

Ingest-only nodes should call this to fetch remote pqs information.

func InitQueryMetrics

func InitQueryMetrics()

func InitQueryNode

func InitQueryNode(getMyIds func() []uint64, extractKibanaRequestsFn func([]string, uint64) map[string]*structs.SegmentSearchRequest) error

Inits metadata layer and search limiter

func MicroIndexCheck

func MicroIndexCheck(currQuery *SearchQuery, filesToSearch map[string]map[string]*BlockTracker, timeRange *dtu.TimeRange,
	indexNames []string, querySummary *summary.QuerySummary, qid uint64, isQueryPersistent bool, pqid string) (map[string]*SegmentSearchRequest, error)

Top-level micro index checking function. For a filter, input segkeys, timeRange, and indexNames, it will do the following: 1. blockbloom/blockrange filtering 2. search request generation

Assumes that filesToSearch has been time filtered. Returns a map[string]*SegmentSearchRequest mapping a segment key to the corresponding search request, and any errors.

func PopulateMetricsMetadataForTheFile_TestOnly

func PopulateMetricsMetadataForTheFile_TestOnly(mFileName string) error

func SetFinalStatsForQid

func SetFinalStatsForQid(qid uint64, nodeResult *structs.NodeResult) error

Types

type DistributedQueryService

type DistributedQueryService struct {
	// contains filtered or unexported fields
}

func InitDistQueryService

func InitDistQueryService(querySummary *summary.QuerySummary, allSegFileResults *segresults.SearchResults) *DistributedQueryService

func (*DistributedQueryService) DistributeRotatedRequests

func (d *DistributedQueryService) DistributeRotatedRequests(qI *QueryInformation, qsrs []*QuerySegmentRequest) ([]*QuerySegmentRequest, uint64, error)

func (*DistributedQueryService) DistributeUnrotatedQuery

func (d *DistributedQueryService) DistributeUnrotatedQuery(qI *QueryInformation) (uint64, error)

func (*DistributedQueryService) Wait

func (d *DistributedQueryService) Wait(qid uint64, querySummary *summary.QuerySummary) error

type DistributedQueryServiceInterface

type DistributedQueryServiceInterface interface {
	Wait(qid uint64, querySummary *summary.QuerySummary) error
	DistributeRotatedRequests(qI *QueryInformation, qsrs []*QuerySegmentRequest) ([]*QuerySegmentRequest, uint64, error)
	DistributeUnrotatedQuery(qI *QueryInformation) (uint64, error)
}

type QueryInformation

type QueryInformation struct {
	// contains filtered or unexported fields
}

Holder struct for all query information

func InitQueryInformation

func InitQueryInformation(s *structs.SearchNode, aggs *structs.QueryAggregators, queryRange *dtu.TimeRange,
	indexInfo *structs.TableInfo, sizeLimit uint64, parallelismPerFile int64, qid uint64,
	dqs DistributedQueryServiceInterface, orgid uint64) (*QueryInformation, error)

Returns a holder struct with query information

This contains DistributedQueryServiceInterface, which will be used to send grpcs to other nodes as needed

The caller is responsible for calling qs.Wait() to wait for all grpcs to finish

func (*QueryInformation) GetAggregators

func (qi *QueryInformation) GetAggregators() *structs.QueryAggregators

func (*QueryInformation) GetIndexInfo

func (qi *QueryInformation) GetIndexInfo() *structs.TableInfo

func (*QueryInformation) GetOrgId

func (qi *QueryInformation) GetOrgId() uint64

func (*QueryInformation) GetPqid

func (qi *QueryInformation) GetPqid() string

func (*QueryInformation) GetQid

func (qi *QueryInformation) GetQid() uint64

func (*QueryInformation) GetQueryRangeEndMs

func (qi *QueryInformation) GetQueryRangeEndMs() uint64

func (*QueryInformation) GetQueryRangeStartMs

func (qi *QueryInformation) GetQueryRangeStartMs() uint64

func (*QueryInformation) GetQueryType

func (qi *QueryInformation) GetQueryType() structs.QueryType

func (*QueryInformation) GetSearchNode

func (qi *QueryInformation) GetSearchNode() *structs.SearchNode

func (*QueryInformation) GetSizeLimit

func (qi *QueryInformation) GetSizeLimit() uint64

func (*QueryInformation) IsAlreadyDistributed

func (qi *QueryInformation) IsAlreadyDistributed() bool

func (*QueryInformation) SetAlreadyDistributed

func (qi *QueryInformation) SetAlreadyDistributed()

func (*QueryInformation) Wait

func (qi *QueryInformation) Wait(querySummary *summary.QuerySummary) error

Waits for and closes the distributed query service.

type QuerySegmentRequest

type QuerySegmentRequest struct {
	QueryInformation

	HasMatchedRrc bool
	// contains filtered or unexported fields
}

func ConvertSegKeysToQueryRequests

func ConvertSegKeysToQueryRequests(qInfo *QueryInformation, allPossibleKeys map[string]map[string]*dtu.TimeRange) []*QuerySegmentRequest

func FilterAggSegKeysToQueryResults

func FilterAggSegKeysToQueryResults(qInfo *QueryInformation, allPossibleKeys map[string]map[string]*dtu.TimeRange,
	aggs *structs.QueryAggregators, segType structs.SegType) ([]*QuerySegmentRequest, uint64)

Returns sorted order of query segment requests, count of keys to raw search

func FilterSegKeysToQueryResults

func FilterSegKeysToQueryResults(qInfo *QueryInformation, qsegs []*QuerySegmentRequest) ([]*QuerySegmentRequest, uint64, uint64)

Returns sorted order of query segment requests, count of keys to raw search, count of keys in PQS

func (*QuerySegmentRequest) GetEntireFileMicroIndexFilter

func (qsr *QuerySegmentRequest) GetEntireFileMicroIndexFilter() map[string]map[string]*structs.BlockTracker

returns map[table] -> map[segKey] -> entire file block tracker to pass into MicroIndexCheck and ExtractSSRFromSearchNode

func (*QuerySegmentRequest) GetMicroIndexFilter

func (qsr *QuerySegmentRequest) GetMicroIndexFilter() (map[string]map[string]*structs.BlockTracker, error)

Returns map[table] -> map[segKey] -> blkTracker to pass into MicroIndexCheck and ExtractSSRFromSearchNode. Returns an error if qsr.blkTracker is nil.

func (*QuerySegmentRequest) GetSegKey

func (qsr *QuerySegmentRequest) GetSegKey() string

func (*QuerySegmentRequest) GetTableName

func (qsr *QuerySegmentRequest) GetTableName() string

type QueryState

type QueryState int
const (
	RUNNING      QueryState = iota + 1
	QUERY_UPDATE            // flush segment counts & aggs & records (if matched)
	COMPLETE
	TIMEOUT
	ERROR
)

func (QueryState) String

func (qs QueryState) String() string

type QueryStateChanData

type QueryStateChanData struct {
	StateName       QueryState
	QueryUpdate     *QueryUpdate
	PercentComplete float64
}

type QueryUpdate

type QueryUpdate struct {
	QUpdate   QueryUpdateType
	SegKeyEnc uint16
	RemoteID  string
}

type QueryUpdateType

type QueryUpdateType int
const (
	QUERY_UPDATE_LOCAL QueryUpdateType = iota + 1
	QUERY_UPDATE_REMOTE
)

type ResultSegmentSearchRequestMap

type ResultSegmentSearchRequestMap struct {
	Result map[string]*SegmentSearchRequest
	Err    error
}

type RunningQueryState

type RunningQueryState struct {
	StateChan chan *QueryStateChanData // channel to send state changes of query

	QType structs.QueryType
	// contains filtered or unexported fields
}

func StartQuery

func StartQuery(qid uint64, async bool) (*RunningQueryState, error)

Starts tracking the query state. If async is true, RunningQueryState.StateChan will be defined and will be sent updates for any update to RunningQueryState. The caller is responsible for calling DeleteQuery.

func (*RunningQueryState) IsAsync

func (rQuery *RunningQueryState) IsAsync() bool

func (*RunningQueryState) SendQueryStateComplete

func (rQuery *RunningQueryState) SendQueryStateComplete()

Directories

Path Synopsis
pqs

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL