Documentation
¶
Index ¶
- Constants
- Variables
- func Add2EmptySeriesResult(t int64, val float64, vector *promql.Vector)
- func ErrorResponse(msg string, errCode string) []byte
- func GetAggFields(sql *influxql.Query) ([]string, bool)
- func GetAnalysisResults(resp *Response, sql *influxql.Query) [][]string
- func GetGzipReader(r io.Reader) (*gzip.Reader, error)
- func GetMSByScrollID(id string) (int64, error)
- func GetRangeTimeForEmptySeriesResult(promComand *promql2influxql.PromCommand) (int64, int64, int64)
- func GetTimeForEmptySeriesResult(promComand *promql2influxql.PromCommand) int64
- func IncQuerySkippingError(err error) bool
- func Interface2str(i interface{}) string
- func IsErrWithEmptyResp(err error) bool
- func NewResponseLogger(w http.ResponseWriter) http.ResponseWriter
- func ParseCredentials(r *http.Request) (*credentials, error)
- func Points2Rows(points []models.Point) ([]influx.Row, error)
- func PutGzipReader(zr *gzip.Reader)
- func QuerySkippingError(err string) bool
- func ReadAll(b []byte, r io.Reader) ([]byte, error)
- func ReadRequestToInfluxQuery(req *prompb.ReadRequest, mst string) (string, error)
- func TagsConverterRemoveInfluxSystemTag(tags map[string]string) models.Tags
- func TransYaccSyntaxErr(errorInfo string) string
- func ValidateLogStream(streamName string) error
- func ValidateRepoAndLogStream(repoName, streamName string) error
- func ValidateRepository(repoName string) error
- func WritePromEmptyResp(rw ResponseWriter, expr parser.Expr, cmd *promql2influxql.PromCommand)
- type AuthenticationMethod
- type FailLogType
- type Fragment
- type GzipWriterPool
- type Handler
- func (h *Handler) AddRoutes(routes ...Route)
- func (h *Handler) Close()
- func (h *Handler) IsWriteNode() bool
- func (h *Handler) Open()
- func (h *Handler) ResponseAggLogQuery(w http.ResponseWriter, para *QueryParam, now time.Time, hist []Histograms, ...) (b []byte, finished bool)
- func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (h *Handler) ValidateAndCheckLogStreamExists(repoName, streamName string) error
- type HighlightFragment
- type Histograms
- type JsonHighlightFragment
- type JsonMapping
- type LogDataType
- type LogResponse
- type LogWriteRequest
- type OpenGeminiCollector
- type ParseField
- type PrintFailLog
- type PromResponse
- type PromTimeValuer
- type QueryAggRequest
- type QueryLogAggResponse
- type QueryLogAggResponseEntry
- func (e *QueryLogAggResponseEntry) GetKey() string
- func (e *QueryLogAggResponseEntry) GetTime() time.Time
- func (e *QueryLogAggResponseEntry) GetValue() interface{}
- func (e *QueryLogAggResponseEntry) SetTime(time time.Time)
- func (e *QueryLogAggResponseEntry) SetValue(value interface{})
- func (e *QueryLogAggResponseEntry) Size() int64
- type QueryLogAnalyticsResponse
- type QueryLogRequest
- type QueryLogResponse
- type QueryParam
- type Response
- type ResponseWriter
- type Route
- type Service
- type StreamTaskConfig
- type StreamTaskConfigResp
- type SubscriberManager
- type Throttler
- type TimeRange
Constants ¶
const ( // DefaultChunkSize specifies the maximum number of points that will // be read before sending results back to the engine. // // This has no relation to the number of bytes that are returned. DefaultChunkSize = 10000 MaxChunkSize = DefaultChunkSize * 50 DefaultInnerChunkSize = 1024 MaxInnerChunkSize = 4096 DefaultDebugRequestsInterval = 10 * time.Second MaxDebugRequestsInterval = 6 * time.Hour )
const ( MaxTtl int64 = 3000 PermanentSaveTtl int64 = 3650 ScannerBufferSize int = 10 * 1024 * 1024 MaxContentLen int = 10 * 1024 * 1024 MaxRequestBodyLength int64 = 100 * 1024 * 1024 MaxLogTagsLen = 1024 * 1024 MaxUnixTimestampNs int64 = 4102416000000000000 MinUnixTimestampNs int64 = 1e16 NewlineLen int64 = 1 MaxRowLen = 3500 DefaultAggLogQueryTimeout = 500 IncAggLogQueryRetryCount int = 3 DefaultMaxLogStoreAnalyzeResponseNum = 100 MaxSplitCharLen int = 128 UTCPrefix int = len("UTC") MarshalFieldPunctuation int = len(`,"":`) )
const ( JSON LogDataType = 0 JSONArray LogDataType = 1 Tags = "tags" Tag = "tag" Content = "content" Time = "time" FailTag = "__fail_tag__" FailLog = "__fail_log__" RetryTag = "__retry_tag__" FailLogTag = "failLog" ExpiredLogTag = "expiredLog" BigLogTag = "bigLog" MstSuffix = "_0000" )
const ( EmptyValue = "" Reverse = "reverse" TimeoutMs = "timeout_ms" Explain = "explain" IsTruncate = "is_truncate" From = "from" To = "to" Scroll = "scroll" ScrollId = "scroll_id" Limit = "limit" Highlight = "highlight" JsonHighlight = "json" Sql = "sql" Select = "select" Query = "query" Https = "https" Http = "Http" )
Query parameter
const ( ErrSyntax = "syntax error" ErrParsingQuery = "error parsing query" ErrShardGroupNotFound = "log shard group not found" ErrShardNotFound = "shard not found" )
Error message substrings.
const ( Timestamp = "timestamp" Cursor = "cursor" IsOverflow = "is_overflow" )
Query result fields
const ( MaxConsumeChanSize = 24 BlockSize = 4 )
const ( DefaultLogLimit = 10 MaxLogLimit = 1000 MinLogLimit = 0 MaxQueryLen = 2048 DefaultLogQueryTimeout = 10000 MaxTimeoutMs = 60000 MinTimeoutMs = 1000 MaxToValue = 9223372036854775807 MinFromValue = 0 MaxScrollIdLen = 400 MinScrollIdLen = 10 )
const ( Repository = "repository" LogStream = "logStream" Complete = "Complete" InComplete = "InComplete" XContentLength = "X-Content-Length" LogProxy = "Log-Proxy" Null = "null" )
URL query parameter
const ( IncIterNumCacheSize int64 = 1 * 1024 * 1024 QueryMetaCacheTTL = 10 * time.Minute QueryLogAggResponseEntrySize = 343 )
const ( EmptyPromMst string = "" MetricStore string = "metric_store" )
const ( StatusSuccess status = "success" StatusError status = "error" // Non-standard status code (originally introduced by nginx) for the case when a client closes // the connection while the server is still processing the request. StatusClientClosedConnection = 499 )
const (
// LogReqErr default error
LogReqErr = "CSSOP.00050001"
)
const MaxPointsForSeries = 11000
Variables ¶
var ( ErrLogRepoEmpty = errors.New("repository name should not be none") ErrLogStreamEmpty = errors.New("logstream name should not be none") ErrLogStreamDeleted = errors.New("logstream being deleted") ErrLogStreamInvalid = errors.New("logstream invalid in retentionPolicy") ErrInvalidRepoName = errors.New("invalid repository name") ErrInvalidLogStreamName = errors.New("invalid logstream name") ErrInvalidWriteNode = errors.New("this data node is not used for writing") )
Errors reported for bad requests.
var ( CompressType = map[string]bool{ "": true, "gzip": true, } )
var ( // ErrBearerAuthDisabled is returned when client specifies bearer auth in // a request but bearer auth is disabled. ErrBearerAuthDisabled = errors.New("bearer auth disabld") )
var (
LogMax = 1000
)
var QueryAggResultCache = cache.NewCache(IncIterNumCacheSize, QueryMetaCacheTTL)
Functions ¶
func Add2EmptySeriesResult ¶ added in v1.3.0
func ErrorResponse ¶
func GetAnalysisResults ¶ added in v1.3.0
func GetMSByScrollID ¶
func GetRangeTimeForEmptySeriesResult ¶ added in v1.3.0
func GetRangeTimeForEmptySeriesResult(promComand *promql2influxql.PromCommand) (int64, int64, int64)
func GetTimeForEmptySeriesResult ¶ added in v1.3.0
func GetTimeForEmptySeriesResult(promComand *promql2influxql.PromCommand) int64
func IncQuerySkippingError ¶
func Interface2str ¶ added in v1.3.0
func Interface2str(i interface{}) string
func IsErrWithEmptyResp ¶ added in v1.3.0
func NewResponseLogger ¶
func NewResponseLogger(w http.ResponseWriter) http.ResponseWriter
func ParseCredentials ¶
ParseCredentials parses a request and returns the authentication credentials. The credentials may be present as URL query params, or as a Basic Authentication header. As params: http://127.0.0.1/query?u=username&p=password; as basic auth: http://username:password@127.0.0.1; as a Bearer token in the Authorization header: Bearer <JWT_TOKEN_BLOB>; as a Token in the Authorization header: Token <username:password>.
func PutGzipReader ¶
PutGzipReader gives back a gzip reader that was obtained via GetGzipReader.
func QuerySkippingError ¶
func ReadRequestToInfluxQuery ¶
func ReadRequestToInfluxQuery(req *prompb.ReadRequest, mst string) (string, error)
func TransYaccSyntaxErr ¶
func ValidateLogStream ¶
func ValidateRepository ¶
func WritePromEmptyResp ¶ added in v1.3.0
func WritePromEmptyResp(rw ResponseWriter, expr parser.Expr, cmd *promql2influxql.PromCommand)
Types ¶
type AuthenticationMethod ¶
type AuthenticationMethod int
AuthenticationMethod defines the type of authentication used.
const ( // Authenticate using basic authentication. UserAuthentication AuthenticationMethod = iota // Authenticate with jwt. BearerAuthentication )
Supported authentication methods.
type FailLogType ¶ added in v1.3.0
type FailLogType uint8
const ( ParseError FailLogType = iota TimestampError ObjectError ContentFieldError NoContentError )
type GzipWriterPool ¶
type GzipWriterPool struct {
// contains filtered or unexported fields
}
func NewGzipWriterPool ¶
func NewGzipWriterPool() *GzipWriterPool
func (*GzipWriterPool) Get ¶
func (p *GzipWriterPool) Get() *gzip.Writer
func (*GzipWriterPool) Put ¶
func (p *GzipWriterPool) Put(gz *gzip.Writer)
type Handler ¶
type Handler struct { Version string BuildType string MetaClient interface { Database(name string) (*meta2.DatabaseInfo, error) Measurement(database string, rpName string, mstName string) (*meta2.MeasurementInfo, error) Authenticate(username, password string) (ui meta2.User, err error) User(username string) (meta2.User, error) AdminUserExists() bool ShowShards(db string, rp string, mst string) models.Rows TagArrayEnabled(db string) bool DataNode(id uint64) (*meta2.DataNode, error) DataNodes() ([]meta2.DataNode, error) CreateDatabase(name string, enableTagArray bool, replicaN uint32, options *obs.ObsOptions) (*meta2.DatabaseInfo, error) Databases() map[string]*meta2.DatabaseInfo MarkDatabaseDelete(name string) error Measurements(database string, ms influxql.Measurements) ([]string, error) CreateStreamPolicy(info *meta2.StreamInfo) error CreateStreamMeasurement(info *meta2.StreamInfo, src, dest *influxql.Measurement, stmt *influxql.SelectStatement) error DropStream(name string) error CreateRetentionPolicy(database string, spec *meta2.RetentionPolicySpec, makeDefault bool) (*meta2.RetentionPolicyInfo, error) RetentionPolicy(database, name string) (rpi *meta2.RetentionPolicyInfo, err error) DBPtView(database string) (meta2.DBPtInfos, error) MarkRetentionPolicyDelete(database, name string) error CreateMeasurement(database, retentionPolicy, mst string, shardKey *meta2.ShardKeyInfo, numOfShards int32, indexR *influxql.IndexRelation, engineType config2.EngineType, colStoreInfo *meta2.ColStoreInfo, schemaInfo []*proto2.FieldSchema, options *meta2.Options) (*meta2.MeasurementInfo, error) UpdateMeasurement(db, rp, mst string, options *meta2.Options) error GetShardGroupByTimeRange(repoName, streamName string, min, max time.Time) ([]*meta2.ShardGroupInfo, error) RevertRetentionPolicyDelete(database, name string) error } QueryAuthorizer interface { AuthorizeQuery(u meta2.User, query *influxql.Query, database string) error } WriteAuthorizer interface { AuthorizeWrite(username, 
database string) error } ExtSysCtrl interface { SendSysCtrlOnNode(nodID uint64, req netstorage.SysCtrlRequest) (map[string]string, error) } QueryExecutor *query.Executor Monitor interface { } PointsWriter interface { RetryWritePointRows(database, retentionPolicy string, points []influx.Row) error } RecordWriter interface { RetryWriteLogRecord(rec *record.BulkRecords) error } SubscriberManager Config *config.Config Logger *logger.Logger CLFLogger *zap.Logger StatisticsPusher *statisticsPusher.StatisticsPusher SQLConfig *config2.TSSql // contains filtered or unexported fields }
Handler represents an HTTP handler for the InfluxDB server.
func NewHandler ¶
NewHandler returns a new instance of handler with routes.
func (*Handler) IsWriteNode ¶
func (*Handler) ResponseAggLogQuery ¶
func (h *Handler) ResponseAggLogQuery(w http.ResponseWriter, para *QueryParam, now time.Time, hist []Histograms, count int64) (b []byte, finished bool)
func (*Handler) ServeHTTP ¶
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP responds to HTTP request to the handler.
func (*Handler) ValidateAndCheckLogStreamExists ¶
type HighlightFragment ¶
type Histograms ¶
func GenZeroHistogram ¶
func GenZeroHistogram(opt query.ProcessorOptions, start, end int64, ascending bool) []Histograms
type JsonHighlightFragment ¶ added in v1.3.0
type JsonHighlightFragment struct { Key []HighlightFragment `json:"key"` Value []HighlightFragment `json:"value"` InnerJson map[string]interface{} `json:"innerJson"` }
type JsonMapping ¶
type JsonMapping struct {
// contains filtered or unexported fields
}
type LogDataType ¶
type LogDataType uint8
type LogResponse ¶
type LogWriteRequest ¶
type LogWriteRequest struct {
// contains filtered or unexported fields
}
type OpenGeminiCollector ¶ added in v1.3.0
type OpenGeminiCollector struct {
// contains filtered or unexported fields
}
func NewOpenGeminiCollector ¶ added in v1.3.0
func NewOpenGeminiCollector() *OpenGeminiCollector
func (*OpenGeminiCollector) Collect ¶ added in v1.3.0
func (c *OpenGeminiCollector) Collect(ch chan<- prometheus.Metric)
func (*OpenGeminiCollector) Describe ¶ added in v1.3.0
func (c *OpenGeminiCollector) Describe(chan<- *prometheus.Desc)
type ParseField ¶ added in v1.3.0
type ParseField struct {
// contains filtered or unexported fields
}
type PrintFailLog ¶ added in v1.3.0
type PrintFailLog struct {
// contains filtered or unexported fields
}
type PromResponse ¶ added in v1.3.0
type PromResponse struct { Status status `json:"status"` Data interface{} `json:"data,omitempty"` ErrorType errorType `json:"errorType,omitempty"` Error string `json:"error,omitempty"` }
PromResponse represents a list of statement results.
func (PromResponse) MarshalJSON ¶ added in v1.3.0
func (r PromResponse) MarshalJSON() ([]byte, error)
MarshalJSON encodes a PromResponse struct into JSON.
func (*PromResponse) UnmarshalJSON ¶ added in v1.3.0
func (r *PromResponse) UnmarshalJSON(b []byte) error
UnmarshalJSON decodes the data into the PromResponse struct.
type PromTimeValuer ¶ added in v1.3.0
type PromTimeValuer struct {
// contains filtered or unexported fields
}
func NewPromTimeValuer ¶ added in v1.3.0
func NewPromTimeValuer() *PromTimeValuer
func (*PromTimeValuer) Call ¶ added in v1.3.0
func (t *PromTimeValuer) Call(name string, args []interface{}) (interface{}, bool)
func (*PromTimeValuer) SetValuer ¶ added in v1.3.0
func (t *PromTimeValuer) SetValuer(_ influxql.Valuer, _ int)
func (*PromTimeValuer) Value ¶ added in v1.3.0
func (t *PromTimeValuer) Value(key string) (interface{}, bool)
Value returns the value for a key in the PromTimeValuer.
type QueryAggRequest ¶
type QueryAggRequest struct { Query string `json:"query,omitempty"` Timeout int `json:"timeout_ms,omitempty"` From int64 `json:"from,omitempty"` To int64 `json:"to,omitempty"` Scroll string `json:"scroll,omitempty"` Scroll_id string `json:"scroll_id,omitempty"` Sql bool `json:"sql,omitempty"` IncQuery bool Explain bool `json:"explain,omitempty"` }
type QueryLogAggResponse ¶
type QueryLogAggResponse struct { Success bool `json:"success,omitempty"` Code string `json:"code,omitempty"` Message string `json:"message,omitempty"` Request_id string `json:"request_id,omitempty"` Count int64 `json:"total_size"` Progress string `json:"progress,omitempty"` Histograms []Histograms `json:"histograms,omitempty"` Took_ms int64 `json:"took_ms,omitempty"` Scroll_id string `json:"scroll_id,omitempty"` Explain string `json:"explain,omitempty"` }
type QueryLogAggResponseEntry ¶
type QueryLogAggResponseEntry struct {
// contains filtered or unexported fields
}
func NewQueryLogAggResponse ¶
func NewQueryLogAggResponse(query string) *QueryLogAggResponseEntry
func (*QueryLogAggResponseEntry) GetKey ¶
func (e *QueryLogAggResponseEntry) GetKey() string
func (*QueryLogAggResponseEntry) GetTime ¶
func (e *QueryLogAggResponseEntry) GetTime() time.Time
func (*QueryLogAggResponseEntry) GetValue ¶
func (e *QueryLogAggResponseEntry) GetValue() interface{}
func (*QueryLogAggResponseEntry) SetTime ¶
func (e *QueryLogAggResponseEntry) SetTime(time time.Time)
func (*QueryLogAggResponseEntry) SetValue ¶
func (e *QueryLogAggResponseEntry) SetValue(value interface{})
func (*QueryLogAggResponseEntry) Size ¶
func (e *QueryLogAggResponseEntry) Size() int64
type QueryLogAnalyticsResponse ¶
type QueryLogAnalyticsResponse struct { Success bool `json:"success,omitempty"` Code string `json:"code,omitempty"` Message string `json:"message,omitempty"` Request_id string `json:"request_id,omitempty"` Count int64 `json:"total_size"` Progress string `json:"progress,omitempty"` Took_ms int64 `json:"took_ms,omitempty"` Scroll_id string `json:"scroll_id,omitempty"` GroupInfo []string `json:"groupInfo,omitempty"` Dataset [][]string `json:"dataset,omitempty"` }
type QueryLogRequest ¶
type QueryLogRequest struct { Explain bool `json:"explain,omitempty"` Query string `json:"query,omitempty"` Reverse bool `json:"reverse,omitempty"` Timeout int `json:"timeout_ms,omitempty"` From int64 `json:"from,omitempty"` To int64 `json:"to,omitempty"` Scroll string `json:"scroll,omitempty"` Scroll_id string `json:"scroll_id,omitempty"` Limit int `json:"limit,omitempty"` Highlight bool `json:"highlight,omitempty"` Sql bool `json:"sql,omitempty"` IsTruncate bool `json:"is_truncate,omitempty"` Pretty bool `json:"pretty,omitempty"` }
type QueryLogResponse ¶
type QueryLogResponse struct { Success bool `json:"success,omitempty"` Code string `json:"code,omitempty"` Message string `json:"message,omitempty"` Request_id string `json:"request_id,omitempty"` Count int64 `json:"count,omitempty"` Progress string `json:"progress,omitempty"` Logs []map[string]interface{} `json:"logs,omitempty"` Keys []string `json:"keys,omitempty"` Took_ms int64 `json:"took_ms,omitempty"` Cursor_time int64 `json:"cursor_time,omitempty"` Complete_progress float64 `json:"complete_progress,omitempty"` Scroll_id string `json:"scroll_id,omitempty"` Explain string `json:"explain,omitempty"` }
type QueryParam ¶
type QueryParam struct { Ascending bool Explain bool Highlight bool IncQuery bool Truncate bool Pretty bool IterID int32 Timeout int Limit int SeqID int64 Process float64 Scroll string Query string Scroll_id string // QueryID-IterID QueryID string TimeRange TimeRange GroupBytInterval time.Duration // contains filtered or unexported fields }
func NewQueryPara ¶
func NewQueryPara(queryPara interface{}) *QueryParam
type Response ¶
Response represents a list of statement results.
func (*Response) Error ¶
Error returns the first error from any statement. Returns nil if no errors occurred on any statements.
func (Response) MarshalJSON ¶
MarshalJSON encodes a Response struct into JSON.
func (*Response) UnmarshalJSON ¶
UnmarshalJSON decodes the data into the Response struct.
type ResponseWriter ¶
type ResponseWriter interface { // WriteResponse writes a response. WriteResponse(resp Response) (int, error) WritePromResponse(resp PromResponse) (int, error) http.ResponseWriter }
ResponseWriter is an interface for writing a response.
func NewResponseWriter ¶
func NewResponseWriter(w http.ResponseWriter, r *http.Request) ResponseWriter
NewResponseWriter creates a new ResponseWriter based on the Accept header in the request that wraps the ResponseWriter.
type Route ¶
type Route struct { Name string Method string Pattern string Gzipped bool LoggingEnabled bool HandlerFunc interface{} }
Route specifies how to handle a HTTP verb for a given endpoint.
type Service ¶
type Service struct { Ln []net.Listener Handler *Handler Logger *zap.Logger // contains filtered or unexported fields }
Service manages the listener and handler for an HTTP endpoint.
func NewService ¶
NewService returns a new instance of Service.
func (*Service) Addr ¶
Addr returns the listener's address. Returns nil if listener is closed. This is a helper for tests, so it returns the address of the listener at index 0.
func (*Service) BoundHTTPAddr ¶
BoundHTTPAddr returns the string version of the address that the HTTP server is listening on. This is useful if you start an ephemeral server in test with bind address localhost:0. This is a helper for tests, so it returns the address of the listener at index 0.
func (*Service) Openlistener ¶
type StreamTaskConfig ¶ added in v1.3.0
type StreamTaskConfigResp ¶ added in v1.3.0
type SubscriberManager ¶
type Throttler ¶
type Throttler struct { // Maximum amount of time requests can wait in queue. // Must be set before adding middleware. EnqueueTimeout time.Duration Logger *zap.Logger // contains filtered or unexported fields }
Throttler represents an HTTP throttler that limits the number of concurrent requests being processed as well as the number of enqueued requests.
func NewThrottler ¶
NewThrottler returns a new instance of Throttler that limits to concurrentN requests processed at a time and maxEnqueueN requests waiting to be processed.