Documentation ¶
Overview ¶
Copyright 2019 The Prometheus Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- func DecodeReadRequest(r *http.Request) (*prompb.ReadRequest, error)
- func EncodeReadResponse(resp *prompb.ReadResponse, w http.ResponseWriter) error
- func ExternalLabelsHandler(next storage.Queryable, externalLabels labels.Labels) storage.Queryable
- func FromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, error)
- func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet
- func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric
- func MergeLabels(primary, secondary []prompb.Label) []prompb.Label
- func NegotiateResponseType(accepted []prompb.ReadRequest_ResponseType) (prompb.ReadRequest_ResponseType, error)
- func PreferLocalStorageFilter(next storage.Queryable, cb startTimeCallback) storage.Queryable
- func QueryableClient(c *Client) storage.Queryable
- func RequiredMatchersFilter(next storage.Queryable, required []*labels.Matcher) storage.Queryable
- func StreamChunkedReadResponses(stream io.Writer, queryIndex int64, ss storage.SeriesSet, ...) error
- func ToQuery(from, to int64, matchers []*labels.Matcher, p *storage.SelectParams) (*prompb.Query, error)
- func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, error)
- type ChunkedReader
- type ChunkedWriter
- type Client
- type ClientConfig
- type HTTPError
- type QueueManager
- type Storage
- type StorageClient
- type WriteStorage
Constants ¶
const DefaultChunkedReadLimit = 5e+7
DefaultChunkedReadLimit is the default value for the maximum size of a protobuf frame that the client allows: 50MB, which is roughly equivalent to ~100k full XOR chunks with an average labelset.
Variables ¶
This section is empty.
Functions ¶
func DecodeReadRequest ¶
func DecodeReadRequest(r *http.Request) (*prompb.ReadRequest, error)
DecodeReadRequest reads a remote.Request from a http.Request.
func EncodeReadResponse ¶
func EncodeReadResponse(resp *prompb.ReadResponse, w http.ResponseWriter) error
EncodeReadResponse writes a remote.Response to a http.ResponseWriter.
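Taken together, these two helpers bracket a remote-read endpoint. Below is a minimal sketch of such a handler, assuming the Queries and Results fields of prompb.ReadRequest and prompb.ReadResponse; runQuery, the /read route, and port 9201 are hypothetical placeholders rather than anything defined by this package.

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/prometheus/prompb"
	"github.com/prometheus/prometheus/storage/remote"
)

// readHandler decodes a remote-read request, answers every query, and writes
// the encoded response back to the client.
func readHandler(w http.ResponseWriter, r *http.Request) {
	req, err := remote.DecodeReadRequest(r)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	resp := &prompb.ReadResponse{
		Results: make([]*prompb.QueryResult, 0, len(req.Queries)),
	}
	for _, q := range req.Queries {
		resp.Results = append(resp.Results, runQuery(q))
	}

	if err := remote.EncodeReadResponse(resp, w); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}

// runQuery is a hypothetical stand-in; a real endpoint would evaluate q
// against its own storage and fill in matching time series.
func runQuery(q *prompb.Query) *prompb.QueryResult {
	return &prompb.QueryResult{}
}

func main() {
	http.HandleFunc("/read", readHandler)
	log.Fatal(http.ListenAndServe(":9201", nil))
}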
func ExternalLabelsHandler ¶
func ExternalLabelsHandler(next storage.Queryable, externalLabels labels.Labels) storage.Queryable
ExternalLabelsHandler returns a storage.Queryable which creates an externalLabelsQuerier.
func FromLabelMatchers ¶
func FromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, error)
FromLabelMatchers parses protobuf label matchers to Prometheus label matchers.
func FromQueryResult ¶
func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet
FromQueryResult unpacks a QueryResult proto.
func LabelProtosToMetric ¶
func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric
LabelProtosToMetric unpacks a []*prompb.Label into a model.Metric.
func MergeLabels ¶
func MergeLabels(primary, secondary []prompb.Label) []prompb.Label
MergeLabels merges two sets of sorted proto labels, preferring those in primary to those in secondary when there is an overlap.
func NegotiateResponseType ¶
func NegotiateResponseType(accepted []prompb.ReadRequest_ResponseType) (prompb.ReadRequest_ResponseType, error)
NegotiateResponseType returns the first accepted response type that this server supports. If the accepted list is empty, the SAMPLES response type is assumed, to maintain backward compatibility.
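Continuing the handler sketch under EncodeReadResponse, negotiation could look like the following; the AcceptedResponseTypes field and the SAMPLES/STREAMED_XOR_CHUNKS constants are taken from the prompb package.

// wantsStreaming reports whether the client asked for the streamed chunked
// response format. An error means none of the accepted types is supported.
func wantsStreaming(req *prompb.ReadRequest) (bool, error) {
	respType, err := remote.NegotiateResponseType(req.AcceptedResponseTypes)
	if err != nil {
		return false, err
	}
	return respType == prompb.ReadRequest_STREAMED_XOR_CHUNKS, nil
}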
func PreferLocalStorageFilter ¶
func PreferLocalStorageFilter(next storage.Queryable, cb startTimeCallback) storage.Queryable
PreferLocalStorageFilter returns a QueryableFunc which creates a NoopQuerier if the requested time range can be answered completely by the local TSDB, and reduces maxt if the range can be answered partially by the local TSDB.
func QueryableClient ¶
func QueryableClient(c *Client) storage.Queryable
QueryableClient returns a storage.Queryable which queries the given Client to select series sets.
func RequiredMatchersFilter ¶
func RequiredMatchersFilter(next storage.Queryable, required []*labels.Matcher) storage.Queryable
RequiredMatchersFilter returns a storage.Queryable which creates a requiredMatchersQuerier.
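A sketch of stacking these queryable wrappers around a remote client: add the configured external labels, skip remote reads that local storage can answer, and require specific matchers. The startTimeCallback type is unexported; its signature is assumed here to be func() (int64, error), returning the local storage's start time.

import (
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/storage/remote"
)

func wrapRemoteQueryable(c *remote.Client, external labels.Labels, required []*labels.Matcher) storage.Queryable {
	// Assumed shape of the unexported startTimeCallback; a real setup would
	// pass the local TSDB's start-time function.
	localStart := func() (int64, error) { return 0, nil }

	q := remote.QueryableClient(c)
	q = remote.ExternalLabelsHandler(q, external)
	q = remote.PreferLocalStorageFilter(q, localStart)
	q = remote.RequiredMatchersFilter(q, required)
	return q
}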
func StreamChunkedReadResponses ¶
func StreamChunkedReadResponses(
	stream io.Writer,
	queryIndex int64,
	ss storage.SeriesSet,
	sortedExternalLabels []prompb.Label,
	maxBytesInFrame int,
) error
StreamChunkedReadResponses iterates over the series, builds chunks, and streams them to the caller. TODO(bwplotka): Encode only what's needed. Fetch the encoded series from blocks instead of re-encoding everything.
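A sketch of the streamed response path, pairing this function with the ChunkedWriter documented below. The frame size limit and query index are placeholder values, and the SeriesSet would normally come from a querier.

import (
	"errors"
	"net/http"

	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/storage/remote"
)

func streamSeriesSet(w http.ResponseWriter, ss storage.SeriesSet, queryIndex int64) error {
	f, ok := w.(http.Flusher)
	if !ok {
		return errors.New("streaming is not supported by this ResponseWriter")
	}
	cw := remote.NewChunkedWriter(w, f)
	// nil external labels and ~1MB frames are placeholder choices for this sketch.
	return remote.StreamChunkedReadResponses(cw, queryIndex, ss, nil, 1000000)
}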
func ToQuery ¶
func ToQuery(from, to int64, matchers []*labels.Matcher, p *storage.SelectParams) (*prompb.Query, error)
ToQuery builds a Query proto.
func ToQueryResult ¶
func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, error)
ToQueryResult builds a QueryResult proto.
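A client-side sketch tying these helpers together: build a prompb.Query for a matcher and a time range, then unpack a QueryResult assumed to have been returned by a remote endpoint (the transport itself is not shown). The pkg/labels import path matches this version of Prometheus.

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/prompb"
	"github.com/prometheus/prometheus/storage/remote"
)

func buildAndUnpack(fromMs, toMs int64, res *prompb.QueryResult) error {
	m, err := labels.NewMatcher(labels.MatchEqual, labels.MetricName, "up")
	if err != nil {
		return err
	}

	// A nil SelectParams means the query carries no read hints.
	query, err := remote.ToQuery(fromMs, toMs, []*labels.Matcher{m}, nil)
	if err != nil {
		return err
	}
	fmt.Printf("built a query with %d matchers\n", len(query.Matchers))

	// res is the QueryResult a remote read endpoint answered with.
	ss := remote.FromQueryResult(res)
	for ss.Next() {
		fmt.Println(ss.At().Labels())
	}
	return ss.Err()
}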
Types ¶
type ChunkedReader ¶
type ChunkedReader struct {
// contains filtered or unexported fields
}
ChunkedReader is a buffered reader that expects a uvarint delimiter and checksum before each message. It will allocate as much memory as the largest frame defined by the delimiter (on top of the bufio.Reader allocations).
func NewChunkedReader ¶
func NewChunkedReader(r io.Reader, sizeLimit uint64, data []byte) *ChunkedReader
NewChunkedReader constructs a ChunkedReader. It allows passing a data slice for byte-slice reuse; the slice is grown to the needed size if it is too small.
func (*ChunkedReader) Next ¶
func (r *ChunkedReader) Next() ([]byte, error)
Next returns the next length-delimited record from the input, or io.EOF if there are no more records available. It returns io.ErrUnexpectedEOF if a short record is found, with a length of n but fewer than n bytes of data. Next also verifies each frame against its CRC-32 checksum (Castagnoli polynomial).
NOTE: The returned slice is valid only until a subsequent call to Next. It is the caller's responsibility to copy it if needed.
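A sketch of consuming a chunked stream, for example the body of a streamed read response; each returned frame would normally be unmarshalled into a protobuf message, which is omitted here.

import (
	"fmt"
	"io"

	"github.com/prometheus/prometheus/storage/remote"
)

func readFrames(body io.Reader) error {
	cr := remote.NewChunkedReader(body, remote.DefaultChunkedReadLimit, nil)
	for {
		frame, err := cr.Next()
		if err == io.EOF {
			return nil // clean end of stream
		}
		if err != nil {
			return err
		}
		// The slice is only valid until the next call to Next; copy it if it
		// must outlive this iteration.
		fmt.Printf("received a %d byte frame\n", len(frame))
	}
}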
type ChunkedWriter ¶
type ChunkedWriter struct {
// contains filtered or unexported fields
}
ChunkedWriter is an io.Writer wrapper that allows streaming by prefixing each write with a uvarint delimiter encoding the length of the corresponding byte slice.
func NewChunkedWriter ¶
func NewChunkedWriter(w io.Writer, f http.Flusher) *ChunkedWriter
NewChunkedWriter constructs a ChunkedWriter.
func (*ChunkedWriter) Write ¶
func (w *ChunkedWriter) Write(b []byte) (int, error)
Write writes given bytes to the stream and flushes it. Each frame includes:
1. a uvarint for the size of the data frame,
2. a big-endian uint32 for the Castagnoli polynomial CRC-32 checksum of the data frame,
3. the bytes of the given data.
Write returns the number of bytes sent from the given buffer; the count does not include the delimiter and checksum bytes.
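A self-contained sketch of the frame format: write one message through a ChunkedWriter into a buffer and read it back with the ChunkedReader documented above. The no-op Flusher stands in for the http.Flusher an HTTP handler would normally pass.

import (
	"bytes"
	"fmt"

	"github.com/prometheus/prometheus/storage/remote"
)

// nopFlusher satisfies http.Flusher for writers that need no flushing.
type nopFlusher struct{}

func (nopFlusher) Flush() {}

func roundTrip() error {
	var buf bytes.Buffer
	cw := remote.NewChunkedWriter(&buf, nopFlusher{})
	// Each Write call produces one delimited, checksummed frame.
	if _, err := cw.Write([]byte("hello")); err != nil {
		return err
	}

	cr := remote.NewChunkedReader(&buf, remote.DefaultChunkedReadLimit, nil)
	frame, err := cr.Next()
	if err != nil {
		return err
	}
	fmt.Printf("read back %q\n", frame) // "hello"
	return nil
}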
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client allows reading and writing from/to a remote HTTP endpoint.
func NewClient ¶
func NewClient(index int, conf *ClientConfig) (*Client, error)
NewClient creates a new Client.
type ClientConfig ¶
type ClientConfig struct {
	URL              *config_util.URL
	Timeout          model.Duration
	HTTPClientConfig config_util.HTTPClientConfig
}
ClientConfig configures a Client.
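A sketch of constructing a Client and exposing it as a storage.Queryable via QueryableClient (documented above). The endpoint URL and timeout are placeholders, and config_util refers to github.com/prometheus/common/config.

import (
	"net/url"
	"time"

	config_util "github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/storage/remote"
)

func newRemoteQueryable() (storage.Queryable, error) {
	u, err := url.Parse("http://remote-read.example.org/api/v1/read") // placeholder endpoint
	if err != nil {
		return nil, err
	}

	client, err := remote.NewClient(0, &remote.ClientConfig{
		URL:     &config_util.URL{URL: u},
		Timeout: model.Duration(30 * time.Second),
		// HTTPClientConfig left at its zero value: plain HTTP, no auth.
	})
	if err != nil {
		return nil, err
	}
	return remote.QueryableClient(client), nil
}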
type QueueManager ¶
type QueueManager struct {
// contains filtered or unexported fields
}
QueueManager manages a queue of samples to be sent to the Storage indicated by the provided StorageClient. It implements the writeTo interface used by the WAL Watcher.
func NewQueueManager ¶
func NewQueueManager(reg prometheus.Registerer, logger log.Logger, walDir string, samplesIn *ewmaRate, cfg config.QueueConfig, externalLabels labels.Labels, relabelConfigs []*relabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager
NewQueueManager builds a new QueueManager.
func (*QueueManager) Append ¶
func (t *QueueManager) Append(samples []record.RefSample) bool
Append queues the given samples to be sent to the remote storage. It blocks until all samples are enqueued on their shards or a shutdown signal is received.
func (*QueueManager) SeriesReset ¶
func (t *QueueManager) SeriesReset(index int)
SeriesReset is used when reading a checkpoint. The WAL Watcher should have stored series records with the checkpoint's index number, so any ref IDs lower than that index can now be deleted from the two maps.
func (*QueueManager) Start ¶
func (t *QueueManager) Start()
Start the queue manager sending samples to the remote storage. Does not block.
func (*QueueManager) Stop ¶
func (t *QueueManager) Stop()
Stop stops sending samples to the remote storage and waits for pending sends to complete.
func (*QueueManager) StoreSeries ¶
func (t *QueueManager) StoreSeries(series []record.RefSeries, index int)
StoreSeries keeps track of which series we know about, for lookups when sending samples to remote storage.
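The constructor takes package-internal types, so a QueueManager is normally created by WriteStorage rather than by callers. Still, a sketch of its lifecycle as driven by the WAL Watcher may help; the tsdb/record import path is an assumption for this version.

import (
	"github.com/prometheus/prometheus/storage/remote"
	"github.com/prometheus/prometheus/tsdb/record"
)

func drain(qm *remote.QueueManager, series []record.RefSeries, samples []record.RefSample) {
	qm.Start()
	defer qm.Stop()

	// Register the label sets for the refs appearing in samples, then enqueue.
	qm.StoreSeries(series, 0)
	if !qm.Append(samples) {
		// Append returns false when the manager is shutting down and the
		// samples could not be enqueued.
		return
	}
}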
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
Storage represents all the remote read and write endpoints. It implements storage.Storage.
func NewStorage ¶
func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration) *Storage
NewStorage returns a remote.Storage.
func (*Storage) ApplyConfig ¶
ApplyConfig updates the state as the new config requires.
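A sketch of creating the top-level remote Storage and applying a loaded configuration. The go-kit nop logger, the startTimeCallback signature func() (int64, error) (normally the local TSDB's StartTime method), and ApplyConfig taking the top-level *config.Config are assumptions for this version.

import (
	"time"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/storage/remote"
)

func setupRemoteStorage(cfg *config.Config) (*remote.Storage, error) {
	// Stand-in for the local storage's start-time callback.
	localStartTime := func() (int64, error) { return 0, nil }

	s := remote.NewStorage(
		log.NewNopLogger(),
		prometheus.DefaultRegisterer,
		localStartTime,
		"data/wal",    // WAL directory read for remote-write samples
		1*time.Minute, // flush deadline on shutdown
	)
	if err := s.ApplyConfig(cfg); err != nil {
		return nil, err
	}
	return s, nil
}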
type StorageClient ¶
type StorageClient interface {
	// Store stores the given samples in the remote storage.
	Store(context.Context, []byte) error
	// Name identifies the remote storage implementation.
	Name() string
}
StorageClient defines an interface for sending a batch of samples to an external timeseries database.
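A minimal sketch of a custom StorageClient that only logs what it would send; the bytes handed to Store are the already-encoded remote-write payload. Such a client is the value passed to NewQueueManager, documented above.

import (
	"context"
	"log"
)

// debugClient satisfies StorageClient but never sends anything anywhere.
type debugClient struct{}

func (debugClient) Store(_ context.Context, payload []byte) error {
	log.Printf("would send %d bytes of encoded remote-write payload", len(payload))
	return nil
}

func (debugClient) Name() string { return "debug-client" }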
type WriteStorage ¶
type WriteStorage struct {
// contains filtered or unexported fields
}
WriteStorage represents all the remote write storage.
func NewWriteStorage ¶
NewWriteStorage creates and runs a WriteStorage.
func (*WriteStorage) Appender ¶
func (rws *WriteStorage) Appender() (storage.Appender, error)
Appender implements storage.Storage.
func (*WriteStorage) ApplyConfig ¶
func (rws *WriteStorage) ApplyConfig(conf *config.Config) error
ApplyConfig updates the state as the new config requires. Only queues whose configuration has changed are stopped and recreated.
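A sketch of pushing samples through a WriteStorage so they fan out to the configured remote-write queues. Obtaining rws (via NewWriteStorage) and cfg is assumed, and the Add/Commit calls follow the storage.Appender interface as it exists in this version, which is worth checking against the storage package docs.

import (
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage/remote"
)

func forwardSample(rws *remote.WriteStorage, cfg *config.Config, lset labels.Labels, ts int64, v float64) error {
	// (Re)create queues for whatever remote_write sections cfg contains.
	if err := rws.ApplyConfig(cfg); err != nil {
		return err
	}

	app, err := rws.Appender()
	if err != nil {
		return err
	}
	if _, err := app.Add(lset, ts, v); err != nil {
		return err
	}
	return app.Commit()
}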