remote

package
v0.37.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2022 License: Apache-2.0 Imports: 49 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultChunkedReadLimit = 5e+7

DefaultChunkedReadLimit is the default value for the maximum size of the protobuf frame client allows. 50MB is the default. This is equivalent to ~100k full XOR chunks and average labelset.

Variables

View Source
var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version)

Functions

func DecodeReadRequest

func DecodeReadRequest(r *http.Request) (*prompb.ReadRequest, error)

DecodeReadRequest reads a remote.Request from a http.Request.

func DecodeWriteRequest

func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error)

DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling snappy decompression.

func EncodeReadResponse

func EncodeReadResponse(resp *prompb.ReadResponse, w http.ResponseWriter) error

EncodeReadResponse writes a remote.Response to a http.ResponseWriter.

func FromLabelMatchers

func FromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, error)

FromLabelMatchers parses protobuf label matchers to Prometheus label matchers.

func FromQueryResult

func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet

FromQueryResult unpacks and sorts a QueryResult proto.

func LabelProtosToMetric

func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric

LabelProtosToMetric unpack a []*prompb.Label to a model.Metric

func MergeLabels

func MergeLabels(primary, secondary []prompb.Label) []prompb.Label

MergeLabels merges two sets of sorted proto labels, preferring those in primary to those in secondary when there is an overlap.

func NegotiateResponseType

func NegotiateResponseType(accepted []prompb.ReadRequest_ResponseType) (prompb.ReadRequest_ResponseType, error)

NegotiateResponseType returns first accepted response type that this server supports. On the empty accepted list we assume that the SAMPLES response type was requested. This is to maintain backward compatibility.

func NewReadHandler

func NewReadHandler(logger log.Logger, r prometheus.Registerer, queryable storage.SampleAndChunkQueryable, config func() config.Config, remoteReadSampleLimit, remoteReadConcurrencyLimit, remoteReadMaxBytesInFrame int) http.Handler

NewReadHandler creates a http.Handler that accepts remote read requests and writes them to the provided queryable.

func NewSampleAndChunkQueryableClient

func NewSampleAndChunkQueryableClient(
	c ReadClient,
	externalLabels labels.Labels,
	requiredMatchers []*labels.Matcher,
	readRecent bool,
	callback startTimeCallback,
) storage.SampleAndChunkQueryable

NewSampleAndChunkQueryableClient returns a storage.SampleAndChunkQueryable which queries the given client to select series sets.

func NewWriteHandler

func NewWriteHandler(logger log.Logger, appendable storage.Appendable) http.Handler

NewWriteHandler creates a http.Handler that accepts remote write requests and writes them to the provided appendable.

func StreamChunkedReadResponses

func StreamChunkedReadResponses(
	stream io.Writer,
	queryIndex int64,
	ss storage.ChunkSeriesSet,
	sortedExternalLabels []prompb.Label,
	maxBytesInFrame int,
) (storage.Warnings, error)

StreamChunkedReadResponses iterates over series, builds chunks and streams those to the caller. It expects Series set with populated chunks.

func ToQuery

func ToQuery(from, to int64, matchers []*labels.Matcher, hints *storage.SelectHints) (*prompb.Query, error)

ToQuery builds a Query proto.

func ToQueryResult

func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, storage.Warnings, error)

ToQueryResult builds a QueryResult proto.

Types

type ChunkedReader

type ChunkedReader struct {
	// contains filtered or unexported fields
}

ChunkedReader is a buffered reader that expects uvarint delimiter and checksum before each message. It will allocate as much as the biggest frame defined by delimiter (on top of bufio.Reader allocations).

func NewChunkedReader

func NewChunkedReader(r io.Reader, sizeLimit uint64, data []byte) *ChunkedReader

NewChunkedReader constructs a ChunkedReader. It allows passing data slice for byte slice reuse, which will be increased to needed size if smaller.

func (*ChunkedReader) Next

func (r *ChunkedReader) Next() ([]byte, error)

Next returns the next length-delimited record from the input, or io.EOF if there are no more records available. Returns io.ErrUnexpectedEOF if a short record is found, with a length of n but fewer than n bytes of data. Next also verifies the given checksum with Castagnoli polynomial CRC-32 checksum.

NOTE: The slice returned is valid only until a subsequent call to Next. It's a caller's responsibility to copy the returned slice if needed.

func (*ChunkedReader) NextProto

func (r *ChunkedReader) NextProto(pb proto.Message) error

NextProto consumes the next available record by calling r.Next, and decodes it into the protobuf with proto.Unmarshal.

type ChunkedWriter

type ChunkedWriter struct {
	// contains filtered or unexported fields
}

ChunkedWriter is an io.Writer wrapper that allows streaming by adding uvarint delimiter before each write in a form of length of the corresponded byte array.

func NewChunkedWriter

func NewChunkedWriter(w io.Writer, f http.Flusher) *ChunkedWriter

NewChunkedWriter constructs a ChunkedWriter.

func (*ChunkedWriter) Write

func (w *ChunkedWriter) Write(b []byte) (int, error)

Write writes given bytes to the stream and flushes it. Each frame includes:

1. uvarint for the size of the data frame. 2. big-endian uint32 for the Castagnoli polynomial CRC-32 checksum of the data frame. 3. the bytes of the given data.

Write returns number of sent bytes for a given buffer. The number does not include delimiter and checksum bytes.

type Client

type Client struct {
	Client *http.Client
	// contains filtered or unexported fields
}

Client allows reading and writing from/to a remote HTTP endpoint.

func (Client) Endpoint

func (c Client) Endpoint() string

Endpoint is the remote read or write endpoint.

func (Client) Name

func (c Client) Name() string

Name uniquely identifies the client.

func (*Client) Read

func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error)

Read reads from a remote endpoint.

func (*Client) Store

func (c *Client) Store(ctx context.Context, req []byte) error

Store sends a batch of samples to the HTTP endpoint, the request is the proto marshalled and encoded bytes from codec.go.

type ClientConfig

type ClientConfig struct {
	URL              *config_util.URL
	Timeout          model.Duration
	HTTPClientConfig config_util.HTTPClientConfig
	SigV4Config      *sigv4.SigV4Config
	Headers          map[string]string
	RetryOnRateLimit bool
}

ClientConfig configures a client.

type HTTPError

type HTTPError struct {
	// contains filtered or unexported fields
}

func (HTTPError) Error

func (e HTTPError) Error() string

func (HTTPError) Status

func (e HTTPError) Status() int

type MetadataAppender

type MetadataAppender interface {
	AppendMetadata(context.Context, []scrape.MetricMetadata)
}

MetadataAppender is an interface used by the Metadata Watcher to send metadata, It is read from the scrape manager, on to somewhere else.

type MetadataWatcher

type MetadataWatcher struct {
	// contains filtered or unexported fields
}

MetadataWatcher watches the Scrape Manager for a given WriteMetadataTo.

func NewMetadataWatcher

func NewMetadataWatcher(l log.Logger, mg ReadyScrapeManager, name string, w MetadataAppender, interval model.Duration, deadline time.Duration) *MetadataWatcher

NewMetadataWatcher builds a new MetadataWatcher.

func (*MetadataWatcher) Start

func (mw *MetadataWatcher) Start()

Start the MetadataWatcher.

func (*MetadataWatcher) Stop

func (mw *MetadataWatcher) Stop()

Stop the MetadataWatcher.

type QueueManager

type QueueManager struct {
	// contains filtered or unexported fields
}

QueueManager manages a queue of samples to be sent to the Storage indicated by the provided WriteClient. Implements writeTo interface used by WAL Watcher.

func NewQueueManager

func NewQueueManager(
	metrics *queueManagerMetrics,
	watcherMetrics *wal.WatcherMetrics,
	readerMetrics *wal.LiveReaderMetrics,
	logger log.Logger,
	dir string,
	samplesIn *ewmaRate,
	cfg config.QueueConfig,
	mCfg config.MetadataConfig,
	externalLabels labels.Labels,
	relabelConfigs []*relabel.Config,
	client WriteClient,
	flushDeadline time.Duration,
	interner *pool,
	highestRecvTimestamp *maxTimestamp,
	sm ReadyScrapeManager,
	enableExemplarRemoteWrite bool,
) *QueueManager

NewQueueManager builds a new QueueManager and starts a new WAL watcher with queue manager as the WriteTo destination. The WAL watcher takes the dir parameter as the base directory for where the WAL shall be located. Note that the full path to the WAL directory will be constructed as <dir>/wal.

func (*QueueManager) Append

func (t *QueueManager) Append(samples []record.RefSample) bool

Append queues a sample to be sent to the remote storage. Blocks until all samples are enqueued on their shards or a shutdown signal is received.

func (*QueueManager) AppendExemplars

func (t *QueueManager) AppendExemplars(exemplars []record.RefExemplar) bool

func (*QueueManager) AppendMetadata

func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.MetricMetadata)

AppendMetadata sends metadata the remote storage. Metadata is sent in batches, but is not parallelized.

func (*QueueManager) SeriesReset

func (t *QueueManager) SeriesReset(index int)

SeriesReset is used when reading a checkpoint. WAL Watcher should have stored series records with the checkpoints index number, so we can now delete any ref ID's lower than that # from the two maps.

func (*QueueManager) SetClient

func (t *QueueManager) SetClient(c WriteClient)

SetClient updates the client used by a queue. Used when only client specific fields are updated to avoid restarting the queue.

func (*QueueManager) Start

func (t *QueueManager) Start()

Start the queue manager sending samples to the remote storage. Does not block.

func (*QueueManager) Stop

func (t *QueueManager) Stop()

Stop stops sending samples to the remote storage and waits for pending sends to complete.

func (*QueueManager) StoreSeries

func (t *QueueManager) StoreSeries(series []record.RefSeries, index int)

StoreSeries keeps track of which series we know about for lookups when sending samples to remote.

func (*QueueManager) UpdateSeriesSegment

func (t *QueueManager) UpdateSeriesSegment(series []record.RefSeries, index int)

UpdateSeriesSegment updates the segment number held against the series, so we can trim older ones in SeriesReset.

type ReadClient

type ReadClient interface {
	Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error)
}

ReadClient uses the SAMPLES method of remote read to read series samples from remote server. TODO(bwplotka): Add streamed chunked remote read method as well (https://github.com/prometheus/prometheus/issues/5926).

func NewReadClient

func NewReadClient(name string, conf *ClientConfig) (ReadClient, error)

NewReadClient creates a new client for remote read.

type ReadyScrapeManager

type ReadyScrapeManager interface {
	Get() (*scrape.Manager, error)
}

type RecoverableError

type RecoverableError struct {
	// contains filtered or unexported fields
}

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

Storage represents all the remote read and write endpoints. It implements storage.Storage.

func NewStorage

func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager) *Storage

NewStorage returns a remote.Storage.

func (*Storage) Appender

func (s *Storage) Appender(ctx context.Context) storage.Appender

Appender implements storage.Storage.

func (*Storage) ApplyConfig

func (s *Storage) ApplyConfig(conf *config.Config) error

ApplyConfig updates the state as the new config requires.

func (*Storage) ChunkQuerier

func (s *Storage) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error)

ChunkQuerier returns a storage.MergeQuerier combining the remote client queriers of each configured remote read endpoint.

func (*Storage) Close

func (s *Storage) Close() error

Close the background processing of the storage queues.

func (*Storage) LowestSentTimestamp

func (s *Storage) LowestSentTimestamp() int64

LowestSentTimestamp returns the lowest sent timestamp across all queues.

func (*Storage) Querier

func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error)

Querier returns a storage.MergeQuerier combining the remote client queriers of each configured remote read endpoint. Returned querier will never return error as all queryables are assumed best effort. Additionally all returned queriers ensure that its Select's SeriesSets have ready data after first `Next` invoke. This is because Prometheus (fanout and secondary queries) can't handle the stream failing half way through by design.

func (*Storage) StartTime

func (s *Storage) StartTime() (int64, error)

StartTime implements the Storage interface.

type Watchable

type Watchable interface {
	TargetsActive() map[string][]*scrape.Target
}

Watchable represents from where we fetch active targets for metadata.

type WriteClient

type WriteClient interface {
	// Store stores the given samples in the remote storage.
	Store(context.Context, []byte) error
	// Name uniquely identifies the remote storage.
	Name() string
	// Endpoint is the remote read or write endpoint for the storage client.
	Endpoint() string
}

WriteClient defines an interface for sending a batch of samples to an external timeseries database.

func NewWriteClient

func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error)

NewWriteClient creates a new client for remote write.

type WriteStorage

type WriteStorage struct {
	// contains filtered or unexported fields
}

WriteStorage represents all the remote write storage.

func NewWriteStorage

func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager) *WriteStorage

NewWriteStorage creates and runs a WriteStorage.

func (*WriteStorage) Appender

func (rws *WriteStorage) Appender(_ context.Context) storage.Appender

Appender implements storage.Storage.

func (*WriteStorage) ApplyConfig

func (rws *WriteStorage) ApplyConfig(conf *config.Config) error

ApplyConfig updates the state as the new config requires. Only stop & create queues which have changes.

func (*WriteStorage) Close

func (rws *WriteStorage) Close() error

Close closes the WriteStorage.

func (*WriteStorage) LowestSentTimestamp

func (rws *WriteStorage) LowestSentTimestamp() int64

LowestSentTimestamp returns the lowest sent timestamp across all queues.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL