Documentation ¶
Index ¶
- Constants
- Variables
- func DecodeOTLPWriteRequest(r *http.Request) (pmetricotlp.ExportRequest, error)
- func DecodeReadRequest(r *http.Request) (*prompb.ReadRequest, error)
- func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error)
- func DecodeWriteV2Request(r io.Reader) (*writev2.Request, error)
- func EncodeReadResponse(resp *prompb.ReadResponse, w http.ResponseWriter) error
- func FromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, error)
- func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet
- func MergeLabels(primary, secondary []prompb.Label) []prompb.Label
- func NegotiateResponseType(accepted []prompb.ReadRequest_ResponseType) (prompb.ReadRequest_ResponseType, error)
- func NewChunkedSeriesSet(chunkedReader *ChunkedReader, respBody io.ReadCloser, mint, maxt int64, ...) storage.SeriesSet
- func NewOTLPWriteHandler(logger *slog.Logger, appendable storage.Appendable, ...) http.Handler
- func NewReadHandler(logger *slog.Logger, r prometheus.Registerer, ...) http.Handler
- func NewSampleAndChunkQueryableClient(c ReadClient, externalLabels labels.Labels, requiredMatchers []*labels.Matcher, ...) storage.SampleAndChunkQueryable
- func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.Appendable, ...) http.Handler
- func StreamChunkedReadResponses(stream io.Writer, queryIndex int64, ss storage.ChunkSeriesSet, ...) (annotations.Annotations, error)
- func ToLabelMatchers(matchers []*labels.Matcher) ([]*prompb.LabelMatcher, error)
- func ToQuery(from, to int64, matchers []*labels.Matcher, hints *storage.SelectHints) (*prompb.Query, error)
- func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, annotations.Annotations, error)
- type ChunkedReader
- type ChunkedWriter
- type Client
- type ClientConfig
- type Compression
- type HTTPError
- type MetadataAppender
- type MetadataWatcher
- type QueueManager
- func (t *QueueManager) Append(samples []record.RefSample) bool
- func (t *QueueManager) AppendExemplars(exemplars []record.RefExemplar) bool
- func (t *QueueManager) AppendFloatHistograms(floatHistograms []record.RefFloatHistogramSample) bool
- func (t *QueueManager) AppendHistograms(histograms []record.RefHistogramSample) bool
- func (t *QueueManager) AppendWatcherMetadata(ctx context.Context, metadata []scrape.MetricMetadata)
- func (t *QueueManager) SeriesReset(index int)
- func (t *QueueManager) SetClient(c WriteClient)
- func (t *QueueManager) Start()
- func (t *QueueManager) Stop()
- func (t *QueueManager) StoreMetadata(meta []record.RefMetadata)
- func (t *QueueManager) StoreSeries(series []record.RefSeries, index int)
- func (t *QueueManager) UpdateSeriesSegment(series []record.RefSeries, index int)
- type ReadClient
- type ReadyScrapeManager
- type RecoverableError
- type Storage
- func (s *Storage) Appender(ctx context.Context) storage.Appender
- func (s *Storage) ApplyConfig(conf *config.Config) error
- func (s *Storage) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error)
- func (s *Storage) Close() error
- func (s *Storage) LowestSentTimestamp() int64
- func (s *Storage) Notify()
- func (s *Storage) Querier(mint, maxt int64) (storage.Querier, error)
- func (s *Storage) StartTime() (int64, error)
- type Watchable
- type WriteClient
- type WriteResponseStats
- type WriteStorage
Constants ¶
const ( RemoteWriteVersionHeader = "X-Prometheus-Remote-Write-Version" RemoteWriteVersion1HeaderValue = "0.1.0" RemoteWriteVersion20HeaderValue = "2.0.0" )
Variables ¶
var ( // UserAgent represents Prometheus version to use for user agent header. UserAgent = fmt.Sprintf("Prometheus/%s", version.Version) AcceptedResponseTypes = []prompb.ReadRequest_ResponseType{ prompb.ReadRequest_STREAMED_XOR_CHUNKS, prompb.ReadRequest_SAMPLES, } )
Functions ¶
func DecodeOTLPWriteRequest ¶ added in v0.47.0
func DecodeOTLPWriteRequest(r *http.Request) (pmetricotlp.ExportRequest, error)
func DecodeReadRequest ¶
func DecodeReadRequest(r *http.Request) (*prompb.ReadRequest, error)
DecodeReadRequest reads a remote.Request from a http.Request.
func DecodeWriteRequest ¶
func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error)
DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling snappy decompression. Used also by documentation/examples/remote_storage.
func DecodeWriteV2Request ¶ added in v0.54.0
DecodeWriteV2Request from an io.Reader into a writev2.Request, handling snappy decompression. Used also by documentation/examples/remote_storage.
func EncodeReadResponse ¶
func EncodeReadResponse(resp *prompb.ReadResponse, w http.ResponseWriter) error
EncodeReadResponse writes a remote.Response to a http.ResponseWriter.
func FromLabelMatchers ¶
func FromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, error)
FromLabelMatchers converts protobuf label matchers to Prometheus label matchers.
func FromQueryResult ¶
func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet
FromQueryResult unpacks and sorts a QueryResult proto.
func MergeLabels ¶
MergeLabels merges two sets of sorted proto labels, preferring those in primary to those in secondary when there is an overlap.
func NegotiateResponseType ¶
func NegotiateResponseType(accepted []prompb.ReadRequest_ResponseType) (prompb.ReadRequest_ResponseType, error)
NegotiateResponseType returns first accepted response type that this server supports. On the empty accepted list we assume that the SAMPLES response type was requested. This is to maintain backward compatibility.
func NewChunkedSeriesSet ¶ added in v0.55.0
func NewChunkedSeriesSet(chunkedReader *ChunkedReader, respBody io.ReadCloser, mint, maxt int64, cancel func(error)) storage.SeriesSet
func NewOTLPWriteHandler ¶ added in v0.47.0
func NewOTLPWriteHandler(logger *slog.Logger, appendable storage.Appendable, configFunc func() config.Config) http.Handler
NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and writes them to the provided appendable.
func NewReadHandler ¶
func NewReadHandler(logger *slog.Logger, r prometheus.Registerer, queryable storage.SampleAndChunkQueryable, config func() config.Config, remoteReadSampleLimit, remoteReadConcurrencyLimit, remoteReadMaxBytesInFrame int) http.Handler
NewReadHandler creates a http.Handler that accepts remote read requests and writes them to the provided queryable.
func NewSampleAndChunkQueryableClient ¶
func NewSampleAndChunkQueryableClient( c ReadClient, externalLabels labels.Labels, requiredMatchers []*labels.Matcher, readRecent bool, callback startTimeCallback, ) storage.SampleAndChunkQueryable
NewSampleAndChunkQueryableClient returns a storage.SampleAndChunkQueryable which queries the given client to select series sets.
func NewWriteHandler ¶
func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.Appendable, acceptedProtoMsgs []config.RemoteWriteProtoMsg) http.Handler
NewWriteHandler creates a http.Handler that accepts remote write requests with the given message in acceptedProtoMsgs and writes them to the provided appendable.
NOTE(bwplotka): When accepting v2 proto and spec, partial writes are possible as per https://prometheus.io/docs/specs/remote_write_spec_2_0/#partial-write.
func StreamChunkedReadResponses ¶
func StreamChunkedReadResponses( stream io.Writer, queryIndex int64, ss storage.ChunkSeriesSet, sortedExternalLabels []prompb.Label, maxBytesInFrame int, marshalPool *sync.Pool, ) (annotations.Annotations, error)
StreamChunkedReadResponses iterates over series, builds chunks and streams those to the caller. It expects Series set with populated chunks.
func ToLabelMatchers ¶ added in v0.54.0
func ToLabelMatchers(matchers []*labels.Matcher) ([]*prompb.LabelMatcher, error)
ToLabelMatchers converts Prometheus label matchers to protobuf label matchers.
func ToQuery ¶
func ToQuery(from, to int64, matchers []*labels.Matcher, hints *storage.SelectHints) (*prompb.Query, error)
ToQuery builds a Query proto.
func ToQueryResult ¶
func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, annotations.Annotations, error)
ToQueryResult builds a QueryResult proto.
Types ¶
type ChunkedReader ¶
type ChunkedReader struct {
// contains filtered or unexported fields
}
ChunkedReader is a buffered reader that expects uvarint delimiter and checksum before each message. It will allocate as much as the biggest frame defined by delimiter (on top of bufio.Reader allocations).
func NewChunkedReader ¶
func NewChunkedReader(r io.Reader, sizeLimit uint64, data []byte) *ChunkedReader
NewChunkedReader constructs a ChunkedReader. It allows passing data slice for byte slice reuse, which will be increased to needed size if smaller.
func (*ChunkedReader) Next ¶
func (r *ChunkedReader) Next() ([]byte, error)
Next returns the next length-delimited record from the input, or io.EOF if there are no more records available. Returns io.ErrUnexpectedEOF if a short record is found, with a length of n but fewer than n bytes of data. Next also verifies the given checksum with Castagnoli polynomial CRC-32 checksum.
NOTE: The slice returned is valid only until a subsequent call to Next. It's a caller's responsibility to copy the returned slice if needed.
type ChunkedWriter ¶
type ChunkedWriter struct {
// contains filtered or unexported fields
}
ChunkedWriter is an io.Writer wrapper that allows streaming by adding uvarint delimiter before each write in a form of length of the corresponded byte array.
func NewChunkedWriter ¶
func NewChunkedWriter(w io.Writer, f http.Flusher) *ChunkedWriter
NewChunkedWriter constructs a ChunkedWriter.
func (*ChunkedWriter) Write ¶
func (w *ChunkedWriter) Write(b []byte) (int, error)
Write writes given bytes to the stream and flushes it. Each frame includes:
1. uvarint for the size of the data frame. 2. big-endian uint32 for the Castagnoli polynomial CRC-32 checksum of the data frame. 3. the bytes of the given data.
Write returns number of sent bytes for a given buffer. The number does not include delimiter and checksum bytes.
type Client ¶
Client allows reading and writing from/to a remote HTTP endpoint.
type ClientConfig ¶
type ClientConfig struct { URL *config_util.URL Timeout model.Duration HTTPClientConfig config_util.HTTPClientConfig SigV4Config *sigv4.SigV4Config AzureADConfig *azuread.AzureADConfig GoogleIAMConfig *googleiam.Config Headers map[string]string RetryOnRateLimit bool WriteProtoMsg config.RemoteWriteProtoMsg ChunkedReadLimit uint64 }
ClientConfig configures a client.
type Compression ¶ added in v0.54.0
type Compression string
Compression represents the encoding. Currently remote storage supports only one, but we experiment with more, thus leaving the compression scaffolding for now. NOTE(bwplotka): Keeping it public, as a non-stable help for importers to use.
const ( // SnappyBlockCompression represents https://github.com/google/snappy/blob/2c94e11145f0b7b184b831577c93e5a41c4c0346/format_description.txt SnappyBlockCompression Compression = "snappy" )
type MetadataAppender ¶
type MetadataAppender interface {
AppendWatcherMetadata(context.Context, []scrape.MetricMetadata)
}
MetadataAppender is an interface used by the Metadata Watcher to send metadata, It is read from the scrape manager, on to somewhere else.
type MetadataWatcher ¶
type MetadataWatcher struct {
// contains filtered or unexported fields
}
MetadataWatcher watches the Scrape Manager for a given WriteMetadataTo.
func NewMetadataWatcher ¶
func NewMetadataWatcher(l *slog.Logger, mg ReadyScrapeManager, name string, w MetadataAppender, interval model.Duration, deadline time.Duration) *MetadataWatcher
NewMetadataWatcher builds a new MetadataWatcher.
type QueueManager ¶
type QueueManager struct {
// contains filtered or unexported fields
}
QueueManager manages a queue of samples to be sent to the Storage indicated by the provided WriteClient. Implements writeTo interface used by WAL Watcher.
func NewQueueManager ¶
func NewQueueManager( metrics *queueManagerMetrics, watcherMetrics *wlog.WatcherMetrics, readerMetrics *wlog.LiveReaderMetrics, logger *slog.Logger, dir string, samplesIn *ewmaRate, cfg config.QueueConfig, mCfg config.MetadataConfig, externalLabels labels.Labels, relabelConfigs []*relabel.Config, client WriteClient, flushDeadline time.Duration, interner *pool, highestRecvTimestamp *maxTimestamp, sm ReadyScrapeManager, enableExemplarRemoteWrite bool, enableNativeHistogramRemoteWrite bool, protoMsg config.RemoteWriteProtoMsg, ) *QueueManager
NewQueueManager builds a new QueueManager and starts a new WAL watcher with queue manager as the WriteTo destination. The WAL watcher takes the dir parameter as the base directory for where the WAL shall be located. Note that the full path to the WAL directory will be constructed as <dir>/wal.
func (*QueueManager) Append ¶
func (t *QueueManager) Append(samples []record.RefSample) bool
Append queues a sample to be sent to the remote storage. Blocks until all samples are enqueued on their shards or a shutdown signal is received.
func (*QueueManager) AppendExemplars ¶
func (t *QueueManager) AppendExemplars(exemplars []record.RefExemplar) bool
func (*QueueManager) AppendFloatHistograms ¶ added in v0.42.0
func (t *QueueManager) AppendFloatHistograms(floatHistograms []record.RefFloatHistogramSample) bool
func (*QueueManager) AppendHistograms ¶ added in v0.40.0
func (t *QueueManager) AppendHistograms(histograms []record.RefHistogramSample) bool
func (*QueueManager) AppendWatcherMetadata ¶ added in v0.54.0
func (t *QueueManager) AppendWatcherMetadata(ctx context.Context, metadata []scrape.MetricMetadata)
AppendWatcherMetadata sends metadata to the remote storage. Metadata is sent in batches, but is not parallelized. This is only used for the metadata_config.send setting and 1.x Remote Write.
func (*QueueManager) SeriesReset ¶
func (t *QueueManager) SeriesReset(index int)
SeriesReset is used when reading a checkpoint. WAL Watcher should have stored series records with the checkpoints index number, so we can now delete any ref ID's lower than that # from the two maps.
func (*QueueManager) SetClient ¶
func (t *QueueManager) SetClient(c WriteClient)
SetClient updates the client used by a queue. Used when only client specific fields are updated to avoid restarting the queue.
func (*QueueManager) Start ¶
func (t *QueueManager) Start()
Start the queue manager sending samples to the remote storage. Does not block.
func (*QueueManager) Stop ¶
func (t *QueueManager) Stop()
Stop stops sending samples to the remote storage and waits for pending sends to complete.
func (*QueueManager) StoreMetadata ¶ added in v0.54.0
func (t *QueueManager) StoreMetadata(meta []record.RefMetadata)
StoreMetadata keeps track of known series' metadata for lookups when sending samples to remote.
func (*QueueManager) StoreSeries ¶
func (t *QueueManager) StoreSeries(series []record.RefSeries, index int)
StoreSeries keeps track of which series we know about for lookups when sending samples to remote.
func (*QueueManager) UpdateSeriesSegment ¶
func (t *QueueManager) UpdateSeriesSegment(series []record.RefSeries, index int)
UpdateSeriesSegment updates the segment number held against the series, so we can trim older ones in SeriesReset.
type ReadClient ¶
type ReadClient interface {
Read(ctx context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error)
}
ReadClient will request the STREAMED_XOR_CHUNKS method of remote read but can also fall back to the SAMPLES method if necessary.
func NewReadClient ¶
func NewReadClient(name string, conf *ClientConfig) (ReadClient, error)
NewReadClient creates a new client for remote read.
type ReadyScrapeManager ¶
type RecoverableError ¶
type RecoverableError struct {
// contains filtered or unexported fields
}
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
Storage represents all the remote read and write endpoints. It implements storage.Storage.
func NewStorage ¶
func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, metadataInWAL bool) *Storage
NewStorage returns a remote.Storage.
func (*Storage) ApplyConfig ¶
ApplyConfig updates the state as the new config requires.
func (*Storage) ChunkQuerier ¶
func (s *Storage) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error)
ChunkQuerier returns a storage.MergeQuerier combining the remote client queriers of each configured remote read endpoint.
func (*Storage) LowestSentTimestamp ¶
LowestSentTimestamp returns the lowest sent timestamp across all queues.
func (*Storage) Querier ¶
Querier returns a storage.MergeQuerier combining the remote client queriers of each configured remote read endpoint. Returned querier will never return error as all queryables are assumed best effort. Additionally all returned queriers ensure that its Select's SeriesSets have ready data after first `Next` invoke. This is because Prometheus (fanout and secondary queries) can't handle the stream failing half way through by design.
type WriteClient ¶
type WriteClient interface { // Store stores the given samples in the remote storage. Store(ctx context.Context, req []byte, retryAttempt int) (WriteResponseStats, error) // Name uniquely identifies the remote storage. Name() string // Endpoint is the remote read or write endpoint for the storage client. Endpoint() string }
WriteClient defines an interface for sending a batch of samples to an external timeseries database.
func NewWriteClient ¶
func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error)
NewWriteClient creates a new client for remote write.
type WriteResponseStats ¶ added in v0.54.0
type WriteResponseStats struct { // Samples represents X-Prometheus-Remote-Write-Written-Samples Samples int // Histograms represents X-Prometheus-Remote-Write-Written-Histograms Histograms int // Exemplars represents X-Prometheus-Remote-Write-Written-Exemplars Exemplars int // Confirmed means we can trust those statistics from the point of view // of the PRW 2.0 spec. When parsed from headers, it means we got at least one // response header from the Receiver to confirm those numbers, meaning it must // be a at least 2.0 Receiver. See ParseWriteResponseStats for details. Confirmed bool }
WriteResponseStats represents the response write statistics specified in https://github.com/prometheus/docs/pull/2486
func ParseWriteResponseStats ¶ added in v0.54.0
func ParseWriteResponseStats(r *http.Response) (s WriteResponseStats, err error)
ParseWriteResponseStats returns WriteResponseStats parsed from the response headers.
As per 2.0 spec, missing header means 0. However, abrupt HTTP errors, 1.0 Receivers or buggy 2.0 Receivers might result in no response headers specified and that might NOT necessarily mean nothing was written. To represent that we set s.Confirmed = true only when see at least on response header.
Error is returned when any of the header fails to parse as int64.
func (WriteResponseStats) Add ¶ added in v0.54.0
func (s WriteResponseStats) Add(rs WriteResponseStats) WriteResponseStats
Add returns the sum of this WriteResponseStats plus the given WriteResponseStats.
func (WriteResponseStats) AllSamples ¶ added in v0.54.0
func (s WriteResponseStats) AllSamples() int
AllSamples returns both float and histogram sample numbers.
func (WriteResponseStats) NoDataWritten ¶ added in v0.54.0
func (s WriteResponseStats) NoDataWritten() bool
NoDataWritten returns true if statistics indicate no data was written.
func (WriteResponseStats) SetHeaders ¶ added in v0.54.0
func (s WriteResponseStats) SetHeaders(w http.ResponseWriter)
SetHeaders sets response headers in a given response writer. Make sure to use it before http.ResponseWriter.WriteHeader and .Write.
type WriteStorage ¶
type WriteStorage struct {
// contains filtered or unexported fields
}
WriteStorage represents all the remote write storage.
func NewWriteStorage ¶
func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, metadataInWal bool) *WriteStorage
NewWriteStorage creates and runs a WriteStorage.
func (*WriteStorage) Appender ¶
func (rws *WriteStorage) Appender(_ context.Context) storage.Appender
Appender implements storage.Storage.
func (*WriteStorage) ApplyConfig ¶
func (rws *WriteStorage) ApplyConfig(conf *config.Config) error
ApplyConfig updates the state as the new config requires. Only stop & create queues which have changes.
func (*WriteStorage) LowestSentTimestamp ¶
func (rws *WriteStorage) LowestSentTimestamp() int64
LowestSentTimestamp returns the lowest sent timestamp across all queues.
func (*WriteStorage) Notify ¶ added in v0.45.5
func (rws *WriteStorage) Notify()
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package googleiam provides an http.RoundTripper that attaches an Google Cloud accessToken to remote write requests.
|
Package googleiam provides an http.RoundTripper that attaches an Google Cloud accessToken to remote write requests. |
otlptranslator
|
|