Documentation
¶
Index ¶
- Constants
- Variables
- func ApplyFilterOperator(node *structs.ASTNode, timeRange *dtu.TimeRange, ...) *structs.NodeResult
- func ApplyMetricsQuery(mQuery *structs.MetricsQuery, timeRange *dtu.MetricsTimeRange, qid uint64, ...) *mresults.MetricsResult
- func ApplyVectorArithmetic(aggs *structs.QueryAggregators, qid uint64) *structs.NodeResult
- func AssociateSearchInfoWithQid(qid uint64, result *segresults.SearchResults, aggs *structs.QueryAggregators, ...) error
- func CancelQuery(qid uint64)
- func ConvertASTNodeToSearchNode(node *structs.ASTNode, qid uint64) *structs.SearchNode
- func DeleteQuery(qid uint64)
- func ExtractSSRFromSearchNode(node *structs.SearchNode, ...) map[string]*structs.SegmentSearchRequest
- func GetAllMetricNamesOverTheTimeRange(timeRange *dtu.MetricsTimeRange, orgid uint64) ([]string, error)
- func GetAllTagsTreesWithinTimeRange(timeRange *dtu.MetricsTimeRange, myid uint64, ...) ([]*tagstree.AllTagTreeReaders, error)
- func GetBucketsForQid(qid uint64) (map[string]*structs.AggregationResult, error)
- func GetEncodedSegStatsForRPC(qid uint64, segKeyEnc uint16) ([]byte, bool, error)
- func GetExternalAggsInfoFiles() ([]string, error)
- func GetFinalColsOrder(columnsOrder map[string]int) []string
- func GetMeasureResultsForQid(qid uint64, pullGrpBucks bool, skenc uint16, limit int) ([]*structs.BucketHolder, []string, []string, []string, int)
- func GetNodeAndQueryTypes(sNode *structs.SearchNode, aggs *structs.QueryAggregators) (structs.SearchNodeType, structs.QueryType)
- func GetNodeResultsForRRCCmd(queryInfo *QueryInformation, sTime time.Time, ...) *structs.NodeResult
- func GetNodeResultsForSegmentStatsCmd(queryInfo *QueryInformation, sTime time.Time, ...) *structs.NodeResult
- func GetNodeResultsFromQSRS(sortedQSRSlice []*QuerySegmentRequest, queryInfo *QueryInformation, ...) *structs.NodeResult
- func GetNodeTypeFromCondition(searchCond *structs.SearchCondition) structs.SearchNodeType
- func GetNodeTypeFromNode(node *structs.SearchNode) structs.SearchNodeType
- func GetNodeTypeFromQuery(query *structs.SearchQuery) structs.SearchNodeType
- func GetNumMatchedRRCs(qid uint64) (uint64, error)
- func GetQueryCountInfoForQid(qid uint64) *structs.QueryCount
- func GetQueryInfoForQid(qid uint64) (*structs.QueryCount, uint64, error)
- func GetQueryResponseForRPC(scroll int, qid uint64) ([]*utils.RecordResultContainer, *blockresults.TimeBuckets, ...)
- func GetQueryType(qid uint64) structs.QueryType
- func GetRawRecordInfoForQid(scroll int, qid uint64) ([]*utils.RecordResultContainer, uint64, map[uint16]string, error)
- func GetRemoteRawLogInfo(remoteID string, inrrcs []*utils.RecordResultContainer, qid uint64) ([]map[string]interface{}, []string, error)
- func GetTotalSegmentsToSearch(qid uint64) (uint64, error)
- func GetTotalsRecsSearchedForQid(qid uint64) (uint64, error)
- func GetUniqueSearchErrors(qid uint64) (string, error)
- func IncrementNumFinishedSegments(incr int, qid uint64, recsSearched uint64, skEnc uint16, remoteId string, ...)
- func InitQueryInfoRefresh(getMyIds func() []uint64)
- func InitQueryMetrics()
- func InitQueryNode(getMyIds func() []uint64, ...) error
- func MicroIndexCheck(currQuery *SearchQuery, filesToSearch map[string]map[string]*BlockTracker, ...) (map[string]*SegmentSearchRequest, error)
- func PopulateMetricsMetadataForTheFile_TestOnly(mFileName string) error
- func SetFinalStatsForQid(qid uint64, nodeResult *structs.NodeResult) error
- type DistributedQueryService
- func (d *DistributedQueryService) DistributeRotatedRequests(qI *QueryInformation, qsrs []*QuerySegmentRequest) ([]*QuerySegmentRequest, uint64, error)
- func (d *DistributedQueryService) DistributeUnrotatedQuery(qI *QueryInformation) (uint64, error)
- func (d *DistributedQueryService) Wait(qid uint64, querySummary *summary.QuerySummary) error
- type DistributedQueryServiceInterface
- type QueryInformation
- func (qi *QueryInformation) GetAggregators() *structs.QueryAggregators
- func (qi *QueryInformation) GetIndexInfo() *structs.TableInfo
- func (qi *QueryInformation) GetOrgId() uint64
- func (qi *QueryInformation) GetPqid() string
- func (qi *QueryInformation) GetQid() uint64
- func (qi *QueryInformation) GetQueryRangeEndMs() uint64
- func (qi *QueryInformation) GetQueryRangeStartMs() uint64
- func (qi *QueryInformation) GetQueryType() structs.QueryType
- func (qi *QueryInformation) GetSearchNode() *structs.SearchNode
- func (qi *QueryInformation) GetSizeLimit() uint64
- func (qi *QueryInformation) IsAlreadyDistributed() bool
- func (qi *QueryInformation) SetAlreadyDistributed()
- func (qi *QueryInformation) Wait(querySummary *summary.QuerySummary) error
- type QuerySegmentRequest
- func ConvertSegKeysToQueryRequests(qInfo *QueryInformation, allPossibleKeys map[string]map[string]*dtu.TimeRange) []*QuerySegmentRequest
- func FilterAggSegKeysToQueryResults(qInfo *QueryInformation, allPossibleKeys map[string]map[string]*dtu.TimeRange, ...) ([]*QuerySegmentRequest, uint64)
- func FilterSegKeysToQueryResults(qInfo *QueryInformation, qsegs []*QuerySegmentRequest) ([]*QuerySegmentRequest, uint64, uint64)
- func (qsr *QuerySegmentRequest) GetEntireFileMicroIndexFilter() map[string]map[string]*structs.BlockTracker
- func (qsr *QuerySegmentRequest) GetMicroIndexFilter() (map[string]map[string]*structs.BlockTracker, error)
- func (qsr *QuerySegmentRequest) GetSegKey() string
- func (qsr *QuerySegmentRequest) GetTableName() string
- type QueryState
- type QueryStateChanData
- type QueryUpdate
- type QueryUpdateType
- type ResultSegmentSearchRequestMap
- type RunningQueryState
Constants ¶
const CANCEL_QUERY_AFTER_SECONDS = 5 * 60 // If 0, the query will never timeout
const MAX_GRP_BUCKS = 3000
const QUERY_INFO_REFRESH_LOOP_SECS = 300
const SECONDS_REFRESH_GLOBAL_METADATA = 30
const SECONDS_REREAD_META = 5
const SECONDS_REREAD_META_SSR = 60
const SEGMETA_FILENAME = "/segmeta.json"
Variables ¶
var ExtractKibanaRequestsFn func([]string, uint64) map[string]*structs.SegmentSearchRequest
Functions ¶
func ApplyFilterOperator ¶
func ApplyFilterOperator(node *structs.ASTNode, timeRange *dtu.TimeRange, aggs *structs.QueryAggregators, qid uint64, qc *structs.QueryContext) *structs.NodeResult
func ApplyMetricsQuery ¶
func ApplyMetricsQuery(mQuery *structs.MetricsQuery, timeRange *dtu.MetricsTimeRange, qid uint64, querySummary *summary.QuerySummary) *mresults.MetricsResult
func ApplyVectorArithmetic ¶
func ApplyVectorArithmetic(aggs *structs.QueryAggregators, qid uint64) *structs.NodeResult
func AssociateSearchInfoWithQid ¶
func AssociateSearchInfoWithQid(qid uint64, result *segresults.SearchResults, aggs *structs.QueryAggregators, dqs DistributedQueryServiceInterface, qType structs.QueryType) error
func CancelQuery ¶
func CancelQuery(qid uint64)
func ConvertASTNodeToSearchNode ¶
func ConvertASTNodeToSearchNode(node *structs.ASTNode, qid uint64) *structs.SearchNode
func DeleteQuery ¶
func DeleteQuery(qid uint64)
Removes reference to qid. If qid does not exist this is a noop
func ExtractSSRFromSearchNode ¶
func ExtractSSRFromSearchNode(node *structs.SearchNode, filesToSearch map[string]map[string]*structs.BlockTracker, timeRange *dtu.TimeRange, indexNames []string, querySummary *summary.QuerySummary, qid uint64, isQueryPersistent bool, pqid string) map[string]*structs.SegmentSearchRequest
put this in segwriter -> raw search unrotated
func GetAllMetricNamesOverTheTimeRange ¶
func GetAllMetricNamesOverTheTimeRange(timeRange *dtu.MetricsTimeRange, orgid uint64) ([]string, error)
func GetAllTagsTreesWithinTimeRange ¶
func GetAllTagsTreesWithinTimeRange(timeRange *dtu.MetricsTimeRange, myid uint64, querySummary *summary.QuerySummary) ([]*tagstree.AllTagTreeReaders, error)
func GetBucketsForQid ¶
func GetBucketsForQid(qid uint64) (map[string]*structs.AggregationResult, error)
func GetEncodedSegStatsForRPC ¶
Gets the json encoding of segstats for RPC. Returns encoded segstats for the given segkeyEnc and qid, bool if the query is segstats or not, and error
func GetFinalColsOrder ¶
The colIndex within this map may be larger than the length of the map
func GetMeasureResultsForQid ¶
func GetMeasureResultsForQid(qid uint64, pullGrpBucks bool, skenc uint16, limit int) ([]*structs.BucketHolder, []string, []string, []string, int)
gets the measure results for the running query. if the query is segment stats, it will delete the input segkeyenc
func GetNodeAndQueryTypes ¶
func GetNodeAndQueryTypes(sNode *structs.SearchNode, aggs *structs.QueryAggregators) (structs.SearchNodeType, structs.QueryType)
func GetNodeResultsForRRCCmd ¶
func GetNodeResultsForRRCCmd(queryInfo *QueryInformation, sTime time.Time, allSegFileResults *segresults.SearchResults, querySummary *summary.QuerySummary, getUnrotated bool, getRotated bool, orgid uint64) *structs.NodeResult
func GetNodeResultsForSegmentStatsCmd ¶
func GetNodeResultsForSegmentStatsCmd(queryInfo *QueryInformation, sTime time.Time, allSegFileResults *segresults.SearchResults, qsrs []*QuerySegmentRequest, querySummary *summary.QuerySummary, getUnrotated bool, getRotated bool, orgid uint64) *structs.NodeResult
func GetNodeResultsFromQSRS ¶
func GetNodeResultsFromQSRS(sortedQSRSlice []*QuerySegmentRequest, queryInfo *QueryInformation, sTime time.Time, allSegFileResults *segresults.SearchResults, querySummary *summary.QuerySummary) *structs.NodeResult
Base function to apply operators on query segment requests
func GetNodeTypeFromCondition ¶
func GetNodeTypeFromCondition(searchCond *structs.SearchCondition) structs.SearchNodeType
func GetNodeTypeFromNode ¶
func GetNodeTypeFromNode(node *structs.SearchNode) structs.SearchNodeType
todo: better and more generic node types. Right now, we just assume if its not ColumnValue, then it has to be TimeRangeQuery
func GetNodeTypeFromQuery ¶
func GetNodeTypeFromQuery(query *structs.SearchQuery) structs.SearchNodeType
func GetNumMatchedRRCs ¶
returns the length of rrcs that exist in *search.SearchResults this will be used to determine if more scrolling can be done
func GetQueryCountInfoForQid ¶
func GetQueryCountInfoForQid(qid uint64) *structs.QueryCount
returns the query counts for the qid. If qid does not exist, this will return a QueryCount set to 0
func GetQueryInfoForQid ¶
func GetQueryInfoForQid(qid uint64) (*structs.QueryCount, uint64, error)
returns the query counts and searched count for the qid. If qid does not exist, this will return a QueryCount set to 0
func GetQueryResponseForRPC ¶
func GetQueryResponseForRPC(scroll int, qid uint64) ([]*utils.RecordResultContainer, *blockresults.TimeBuckets, *blockresults.GroupByBuckets, map[uint16]string, error)
returns rrcs, raw time buckets, raw groupby buckets, querycounts, map of segkey encoding, and errors
func GetQueryType ¶
func GetRawRecordInfoForQid ¶
func GetRawRecordInfoForQid(scroll int, qid uint64) ([]*utils.RecordResultContainer, uint64, map[uint16]string, error)
returns the rrcs, query counts, map of segkey encoding, and errors
func GetRemoteRawLogInfo ¶
func GetRemoteRawLogInfo(remoteID string, inrrcs []*utils.RecordResultContainer, qid uint64) ([]map[string]interface{}, []string, error)
Get remote raw logs and columns based on the remoteID and all RRCs
func GetUniqueSearchErrors ¶
func IncrementNumFinishedSegments ¶
func IncrementNumFinishedSegments(incr int, qid uint64, recsSearched uint64, skEnc uint16, remoteId string, doBuckPull bool, sstMap map[string]*structs.SegStats)
increments the finished segments. If incr is 0, then the current query is finished and a histogram will be flushed
func InitQueryInfoRefresh ¶
func InitQueryInfoRefresh(getMyIds func() []uint64)
ingest only nodes should call this to be fetching remote pqs information
func InitQueryMetrics ¶
func InitQueryMetrics()
func InitQueryNode ¶
func InitQueryNode(getMyIds func() []uint64, extractKibanaRequestsFn func([]string, uint64) map[string]*structs.SegmentSearchRequest) error
Inits metadata layer and search limiter
func MicroIndexCheck ¶
func MicroIndexCheck(currQuery *SearchQuery, filesToSearch map[string]map[string]*BlockTracker, timeRange *dtu.TimeRange, indexNames []string, querySummary *summary.QuerySummary, qid uint64, isQueryPersistent bool, pqid string) (map[string]*SegmentSearchRequest, error)
Top level micro index checking function. For a filter, input segkeys, timeRange, indexNames will do the following: 1. blockbloom/ blockrange filtering 2. search request generation
Assumes that filesToSearch has been time filtered Returns a map[string]*SegmentSearchRequest mapping a segment key to the corresponding search request and any errors
func SetFinalStatsForQid ¶
func SetFinalStatsForQid(qid uint64, nodeResult *structs.NodeResult) error
Types ¶
type DistributedQueryService ¶
type DistributedQueryService struct {
// contains filtered or unexported fields
}
func InitDistQueryService ¶
func InitDistQueryService(querySummary *summary.QuerySummary, allSegFileResults *segresults.SearchResults) *DistributedQueryService
func (*DistributedQueryService) DistributeRotatedRequests ¶
func (d *DistributedQueryService) DistributeRotatedRequests(qI *QueryInformation, qsrs []*QuerySegmentRequest) ([]*QuerySegmentRequest, uint64, error)
func (*DistributedQueryService) DistributeUnrotatedQuery ¶
func (d *DistributedQueryService) DistributeUnrotatedQuery(qI *QueryInformation) (uint64, error)
func (*DistributedQueryService) Wait ¶
func (d *DistributedQueryService) Wait(qid uint64, querySummary *summary.QuerySummary) error
type DistributedQueryServiceInterface ¶
type DistributedQueryServiceInterface interface { Wait(qid uint64, querySummary *summary.QuerySummary) error DistributeRotatedRequests(qI *QueryInformation, qsrs []*QuerySegmentRequest) ([]*QuerySegmentRequest, uint64, error) DistributeUnrotatedQuery(qI *QueryInformation) (uint64, error) }
type QueryInformation ¶
type QueryInformation struct {
// contains filtered or unexported fields
}
Holder struct for all query information
func InitQueryInformation ¶
func InitQueryInformation(s *structs.SearchNode, aggs *structs.QueryAggregators, queryRange *dtu.TimeRange, indexInfo *structs.TableInfo, sizeLimit uint64, parallelismPerFile int64, qid uint64, dqs DistributedQueryServiceInterface, orgid uint64) (*QueryInformation, error)
Returns a holder struct with query information
This contains DistributedQueryServiceInterface, which will be used to send grpcs to other nodes as needed ¶
The caller is responsible for calling qs.Wait() to wait for all grpcs to finish
func (*QueryInformation) GetAggregators ¶
func (qi *QueryInformation) GetAggregators() *structs.QueryAggregators
func (*QueryInformation) GetIndexInfo ¶
func (qi *QueryInformation) GetIndexInfo() *structs.TableInfo
func (*QueryInformation) GetOrgId ¶
func (qi *QueryInformation) GetOrgId() uint64
func (*QueryInformation) GetPqid ¶
func (qi *QueryInformation) GetPqid() string
func (*QueryInformation) GetQid ¶
func (qi *QueryInformation) GetQid() uint64
func (*QueryInformation) GetQueryRangeEndMs ¶
func (qi *QueryInformation) GetQueryRangeEndMs() uint64
func (*QueryInformation) GetQueryRangeStartMs ¶
func (qi *QueryInformation) GetQueryRangeStartMs() uint64
func (*QueryInformation) GetQueryType ¶
func (qi *QueryInformation) GetQueryType() structs.QueryType
func (*QueryInformation) GetSearchNode ¶
func (qi *QueryInformation) GetSearchNode() *structs.SearchNode
func (*QueryInformation) GetSizeLimit ¶
func (qi *QueryInformation) GetSizeLimit() uint64
func (*QueryInformation) IsAlreadyDistributed ¶
func (qi *QueryInformation) IsAlreadyDistributed() bool
func (*QueryInformation) SetAlreadyDistributed ¶
func (qi *QueryInformation) SetAlreadyDistributed()
func (*QueryInformation) Wait ¶
func (qi *QueryInformation) Wait(querySummary *summary.QuerySummary) error
waits and closes the distributed query service
type QuerySegmentRequest ¶
type QuerySegmentRequest struct { QueryInformation HasMatchedRrc bool // contains filtered or unexported fields }
func ConvertSegKeysToQueryRequests ¶
func ConvertSegKeysToQueryRequests(qInfo *QueryInformation, allPossibleKeys map[string]map[string]*dtu.TimeRange) []*QuerySegmentRequest
func FilterAggSegKeysToQueryResults ¶
func FilterAggSegKeysToQueryResults(qInfo *QueryInformation, allPossibleKeys map[string]map[string]*dtu.TimeRange, aggs *structs.QueryAggregators, segType structs.SegType) ([]*QuerySegmentRequest, uint64)
Returns sorted order of query segment requests, count of keys to raw search
func FilterSegKeysToQueryResults ¶
func FilterSegKeysToQueryResults(qInfo *QueryInformation, qsegs []*QuerySegmentRequest) ([]*QuerySegmentRequest, uint64, uint64)
Returns sorted order of query segment requests, count of keys to raw search, count of keys in PQS
func (*QuerySegmentRequest) GetEntireFileMicroIndexFilter ¶
func (qsr *QuerySegmentRequest) GetEntireFileMicroIndexFilter() map[string]map[string]*structs.BlockTracker
returns map[table] -> map[segKey] -> entire file block tracker to pass into MicroIndexCheck and ExtractSSRFromSearchNode
func (*QuerySegmentRequest) GetMicroIndexFilter ¶
func (qsr *QuerySegmentRequest) GetMicroIndexFilter() (map[string]map[string]*structs.BlockTracker, error)
returns map[table] -> map[segKey] -> blkTracker to pass into MicroIndexCheck and ExtractSSRFromSearchNode Returns error if qsr.blkTracker is nil
func (*QuerySegmentRequest) GetSegKey ¶
func (qsr *QuerySegmentRequest) GetSegKey() string
func (*QuerySegmentRequest) GetTableName ¶
func (qsr *QuerySegmentRequest) GetTableName() string
type QueryState ¶
type QueryState int
const ( RUNNING QueryState = iota + 1 QUERY_UPDATE // flush segment counts & aggs & records (if matched) COMPLETE TIMEOUT ERROR )
func (QueryState) String ¶
func (qs QueryState) String() string
type QueryStateChanData ¶
type QueryStateChanData struct { StateName QueryState QueryUpdate *QueryUpdate PercentComplete float64 }
type QueryUpdate ¶
type QueryUpdate struct { QUpdate QueryUpdateType SegKeyEnc uint16 RemoteID string }
type QueryUpdateType ¶
type QueryUpdateType int
const ( QUERY_UPDATE_LOCAL QueryUpdateType = iota + 1 QUERY_UPDATE_REMOTE )
type RunningQueryState ¶
type RunningQueryState struct { StateChan chan *QueryStateChanData // channel to send state changes of query QType structs.QueryType // contains filtered or unexported fields }
func StartQuery ¶
func StartQuery(qid uint64, async bool) (*RunningQueryState, error)
Starts tracking the query state. If async is true, the RunningQueryState.StateChan will be defined & will be sent updates If async, updates will be sent for any update to RunningQueryState. Caller is responsible to call DeleteQuery
func (*RunningQueryState) IsAsync ¶
func (rQuery *RunningQueryState) IsAsync() bool
func (*RunningQueryState) SendQueryStateComplete ¶
func (rQuery *RunningQueryState) SendQueryStateComplete()