receive

package
v0.34.0-rc.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2024 License: Apache-2.0 Imports: 73 Imported by: 3

Documentation

Index

Constants

View Source
const (
	// DefaultStatsLimit is the default value used for limiting tenant stats.
	DefaultStatsLimit = 10
	// DefaultReplicaHeader is the default header used to designate the replica count of a write request.
	DefaultReplicaHeader = "THANOS-REPLICA"
	// AllTenantsQueryParam is the query parameter for getting TSDB stats for all tenants.
	AllTenantsQueryParam = "all_tenants"
	// LimitStatsQueryParam is the query parameter for limiting the amount of returned TSDB stats.
	LimitStatsQueryParam = "limit"
)

Variables

View Source
var ErrNotReady = errors.New("TSDB not ready")

ErrNotReady is returned if the underlying storage is not ready yet.

Functions

func ConfigFromWatcher added in v0.32.0

func ConfigFromWatcher(ctx context.Context, updates chan<- []HashringConfig, cw *ConfigWatcher) error

func NewEmptyRequestLimitsConfig added in v0.29.0

func NewEmptyRequestLimitsConfig() *requestLimitsConfig

func NewHeadSeriesLimit added in v0.29.0

func NewHeadSeriesLimit(w WriteLimitsConfig, registerer prometheus.Registerer, logger log.Logger) *headSeriesLimit

func NewNopConfig added in v0.29.0

func NewNopConfig() nopConfigContent

NewNopConfig creates a no-op config content (no configuration).

func NewNopSeriesLimit added in v0.28.0

func NewNopSeriesLimit() *nopSeriesLimit

Types

type Appendable

type Appendable interface {
	Appender(ctx context.Context) (storage.Appender, error)
}

Appendable returns an Appender.

type ConfigWatcher

type ConfigWatcher struct {
	// contains filtered or unexported fields
}

ConfigWatcher is able to watch a file containing a hashring configuration for updates.

func NewConfigWatcher

func NewConfigWatcher(logger log.Logger, reg prometheus.Registerer, path string, interval model.Duration) (*ConfigWatcher, error)

NewConfigWatcher creates a new ConfigWatcher.

func (*ConfigWatcher) C

func (cw *ConfigWatcher) C() <-chan []HashringConfig

C returns a chan that gets hashring configuration updates.

func (*ConfigWatcher) Run

func (cw *ConfigWatcher) Run(ctx context.Context)

Run starts the ConfigWatcher until the given context is canceled.

func (*ConfigWatcher) Stop added in v0.15.0

func (cw *ConfigWatcher) Stop()

Stop shuts down the config watcher.

func (*ConfigWatcher) ValidateConfig added in v0.12.0

func (cw *ConfigWatcher) ValidateConfig() error

ValidateConfig returns an error if the configuration that's being watched is not valid.

type DefaultLimitsConfig added in v0.29.0

type DefaultLimitsConfig struct {
	// RequestLimits holds the different per-request limits.
	RequestLimits requestLimitsConfig `yaml:"request"`
	// HeadSeriesLimit specifies the maximum number of head series allowed for any tenant.
	HeadSeriesLimit uint64 `yaml:"head_series_limit"`
}

type Endpoint added in v0.32.0

type Endpoint struct {
	Address string `json:"address"`
	AZ      string `json:"az"`
}

func (*Endpoint) UnmarshalJSON added in v0.32.0

func (e *Endpoint) UnmarshalJSON(data []byte) error

type GlobalLimitsConfig added in v0.29.0

type GlobalLimitsConfig struct {
	// MaxConcurrency represents the maximum concurrency during write operations.
	MaxConcurrency int64 `yaml:"max_concurrency"`
	// MetaMonitoring options specify the query, URL and client for the Query API address used in head series limiting.
	MetaMonitoringURL        string                         `yaml:"meta_monitoring_url"`
	MetaMonitoringHTTPClient *clientconfig.HTTPClientConfig `yaml:"meta_monitoring_http_client"`
	MetaMonitoringLimitQuery string                         `yaml:"meta_monitoring_limit_query"`
	// contains filtered or unexported fields
}

type Handler

type Handler struct {
	Limiter *Limiter
	// contains filtered or unexported fields
}

Handler serves a Prometheus remote write receiving HTTP endpoint.

func NewHandler

func NewHandler(logger log.Logger, o *Options) *Handler

func (*Handler) Close

func (h *Handler) Close()

Close stops the Handler.

func (*Handler) Hashring

func (h *Handler) Hashring(hashring Hashring)

Hashring sets the hashring for the handler and marks the hashring as ready. The hashring must be set to a non-nil value in order for the handler to be ready and usable. If the hashring is nil, then the handler is marked as not ready.

func (*Handler) RemoteWrite added in v0.11.0

func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*storepb.WriteResponse, error)

RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore.

func (*Handler) Run

func (h *Handler) Run() error

Run serves the HTTP endpoints.

type Hashring

type Hashring interface {
	// Get returns the first node that should handle the given tenant and time series.
	Get(tenant string, timeSeries *prompb.TimeSeries) (string, error)
	// GetN returns the nth node that should handle the given tenant and time series.
	GetN(tenant string, timeSeries *prompb.TimeSeries, n uint64) (string, error)
	// Nodes returns a sorted slice of nodes that are in this hashring. Addresses could be duplicated
	// if, for example, the same address is used for multiple tenants in the multi-hashring.
	Nodes() []string
}

Hashring finds the correct node to handle a given time series for a specified tenant. It returns the node and any error encountered.

func NewMultiHashring added in v0.32.0

func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg []HashringConfig) (Hashring, error)

NewMultiHashring creates a multi-tenant hashring for a given slice of groups. Which hashring to use for a tenant is determined by the tenants field of the hashring configuration.

type HashringAlgorithm added in v0.27.0

type HashringAlgorithm string

HashringAlgorithm is the algorithm used to distribute series in the ring.

const (
	AlgorithmHashmod HashringAlgorithm = "hashmod"
	AlgorithmKetama  HashringAlgorithm = "ketama"

	// SectionsPerNode is the number of sections in the ring assigned to each node
	// in the ketama hashring. A higher number yields a better series distribution,
	// but also comes with a higher memory cost.
	SectionsPerNode = 1000
)

type HashringConfig

type HashringConfig struct {
	Hashring       string            `json:"hashring,omitempty"`
	Tenants        []string          `json:"tenants,omitempty"`
	Endpoints      []Endpoint        `json:"endpoints"`
	Algorithm      HashringAlgorithm `json:"algorithm,omitempty"`
	ExternalLabels map[string]string `json:"external_labels,omitempty"`
}

HashringConfig represents the configuration for a hashring a receive node knows about.

func ParseConfig added in v0.32.0

func ParseConfig(content []byte) ([]HashringConfig, error)

ParseConfig parses the raw configuration content and returns a slice of HashringConfig.

type Limiter added in v0.29.0

type Limiter struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Limiter is responsible for managing the configuration and initialization of different types that apply limits to the Receive instance.

func NewLimiter added in v0.29.0

func NewLimiter(configFile fileContent, reg prometheus.Registerer, r ReceiverMode, logger log.Logger, configReloadTimer time.Duration) (*Limiter, error)

NewLimiter creates a new *Limiter given a configuration and prometheus registerer.

func (*Limiter) CanReload added in v0.29.0

func (l *Limiter) CanReload() bool

func (*Limiter) HeadSeriesLimiter added in v0.29.0

func (l *Limiter) HeadSeriesLimiter() headSeriesLimiter

func (*Limiter) RequestLimiter added in v0.29.0

func (l *Limiter) RequestLimiter() requestLimiter

RequestLimiter is a safe getter for the request limiter.

func (*Limiter) StartConfigReloader added in v0.29.0

func (l *Limiter) StartConfigReloader(ctx context.Context) error

StartConfigReloader starts the automatic configuration reloader based on the file indicated by pathOrContent. It starts a goroutine in the given *run.Group.

func (*Limiter) WriteGate added in v0.29.0

func (l *Limiter) WriteGate() gate.Gate

WriteGate is a safe getter for the write gate.

type MultiTSDB added in v0.13.0

type MultiTSDB struct {
	// contains filtered or unexported fields
}

func NewMultiTSDB added in v0.13.0

func NewMultiTSDB(
	dataDir string,
	l log.Logger,
	reg prometheus.Registerer,
	tsdbOpts *tsdb.Options,
	labels labels.Labels,
	tenantLabelName string,
	bucket objstore.Bucket,
	allowOutOfOrderUpload bool,
	hashFunc metadata.HashFunc,
) *MultiTSDB

NewMultiTSDB creates a new MultiTSDB. NOTE: Passed labels must be sorted lexicographically (alphabetically).

func (*MultiTSDB) Close added in v0.14.0

func (t *MultiTSDB) Close() error

func (*MultiTSDB) Flush added in v0.13.0

func (t *MultiTSDB) Flush() error

func (*MultiTSDB) Open added in v0.13.0

func (t *MultiTSDB) Open() error

func (*MultiTSDB) Prune added in v0.28.0

func (t *MultiTSDB) Prune(ctx context.Context) error

Prune flushes and closes the TSDB for tenants that haven't received any new samples for longer than the TSDB retention period.

func (*MultiTSDB) RemoveLockFilesIfAny added in v0.15.0

func (t *MultiTSDB) RemoveLockFilesIfAny() error

func (*MultiTSDB) SetHashringConfig added in v0.32.0

func (t *MultiTSDB) SetHashringConfig(cfg []HashringConfig) error

func (*MultiTSDB) Sync added in v0.13.0

func (t *MultiTSDB) Sync(ctx context.Context) (int, error)

func (*MultiTSDB) TSDBExemplars added in v0.22.0

func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB

func (*MultiTSDB) TSDBLocalClients added in v0.29.0

func (t *MultiTSDB) TSDBLocalClients() []store.Client

func (*MultiTSDB) TenantAppendable added in v0.13.0

func (t *MultiTSDB) TenantAppendable(tenantID string) (Appendable, error)

func (*MultiTSDB) TenantStats added in v0.28.0

func (t *MultiTSDB) TenantStats(limit int, statsByLabelName string, tenantIDs ...string) []status.TenantStats

type Options

type Options struct {
	Writer            *Writer
	ListenAddress     string
	Registry          *prometheus.Registry
	TenantHeader      string
	TenantField       string
	DefaultTenantID   string
	ReplicaHeader     string
	Endpoint          string
	ReplicationFactor uint64
	ReceiverMode      ReceiverMode
	Tracer            opentracing.Tracer
	TLSConfig         *tls.Config
	DialOpts          []grpc.DialOption
	ForwardTimeout    time.Duration
	MaxBackoff        time.Duration
	RelabelConfigs    []*relabel.Config
	TSDBStats         TSDBStats
	Limiter           *Limiter
}

Options for the web Handler.

type ReadyStorage added in v0.13.0

type ReadyStorage struct {
	// contains filtered or unexported fields
}

ReadyStorage implements the Storage interface while allowing the actual storage to be set at a later point in time. TODO: Replace this with upstream Prometheus implementation when it is exposed.

func (*ReadyStorage) Appender added in v0.13.0

func (s *ReadyStorage) Appender(ctx context.Context) (storage.Appender, error)

Appender implements the Storage interface.

func (*ReadyStorage) Close added in v0.13.0

func (s *ReadyStorage) Close() error

Close implements the Storage interface.

func (*ReadyStorage) ExemplarQuerier added in v0.22.0

func (s *ReadyStorage) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error)

ExemplarQuerier implements the Storage interface.

func (*ReadyStorage) Get added in v0.13.0

func (s *ReadyStorage) Get() *tsdb.DB

Get the storage.

func (*ReadyStorage) Querier added in v0.13.0

func (s *ReadyStorage) Querier(mint, maxt int64) (storage.Querier, error)

Querier implements the Storage interface.

func (*ReadyStorage) Set added in v0.13.0

func (s *ReadyStorage) Set(db *tsdb.DB)

Set the storage.

func (*ReadyStorage) StartTime added in v0.13.0

func (s *ReadyStorage) StartTime() (int64, error)

StartTime implements the Storage interface.

type ReceiveAppender added in v0.32.0

type ReceiveAppender struct {
	storage.Appender
	// contains filtered or unexported fields
}

ReceiveAppender wraps storage.Appender to add validation and logging.

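func (*ReceiveAppender) Append added in v0.32.0

func (ra *ReceiveAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error)
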
type ReceiverMode added in v0.22.0

type ReceiverMode string

const (
	RouterOnly     ReceiverMode = "RouterOnly"
	IngestorOnly   ReceiverMode = "IngestorOnly"
	RouterIngestor ReceiverMode = "RouterIngestor"
)

type RootLimitsConfig added in v0.29.0

type RootLimitsConfig struct {
	// WriteLimits hold the limits for writing data.
	WriteLimits WriteLimitsConfig `yaml:"write"`
}

RootLimitsConfig is the root configuration for limits.

func ParseLimitConfigContent added in v0.29.0

func ParseLimitConfigContent(limitsConfig fileContent) (*RootLimitsConfig, error)

ParseLimitConfigContent parses the limit configuration from the path or content.

func ParseRootLimitConfig added in v0.29.0

func ParseRootLimitConfig(content []byte) (*RootLimitsConfig, error)

ParseRootLimitConfig parses the root limit configuration. Even though the result is a pointer, it will only be nil if an error is returned.

func (RootLimitsConfig) AreHeadSeriesLimitsConfigured added in v0.29.0

func (r RootLimitsConfig) AreHeadSeriesLimitsConfigured() bool

type SingleNodeHashring

type SingleNodeHashring string

SingleNodeHashring always returns the same node.

func (SingleNodeHashring) Get

func (s SingleNodeHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error)

Get implements the Hashring interface.

func (SingleNodeHashring) GetN

func (s SingleNodeHashring) GetN(_ string, _ *prompb.TimeSeries, n uint64) (string, error)

GetN implements the Hashring interface.

func (SingleNodeHashring) Nodes added in v0.34.0

func (s SingleNodeHashring) Nodes() []string

type TSDBStats added in v0.28.0

type TSDBStats interface {
	// TenantStats returns TSDB head stats for the given tenants.
	// If no tenantIDs are provided, stats for all tenants are returned.
	TenantStats(limit int, statsByLabelName string, tenantIDs ...string) []status.TenantStats
}

type TenantStorage added in v0.13.0

type TenantStorage interface {
	TenantAppendable(string) (Appendable, error)
}

type TenantsWriteLimitsConfig added in v0.29.0

type TenantsWriteLimitsConfig map[string]*WriteLimitConfig

TenantsWriteLimitsConfig is a map of tenant IDs to their *WriteLimitConfig.

type UnRegisterer added in v0.8.0

type UnRegisterer struct {
	// contains filtered or unexported fields
}

UnRegisterer is a Prometheus registerer that ensures that collectors can be registered by unregistering already-registered collectors. FlushableStorage uses this registerer in order to not lose metric values between DB flushes.

This type cannot embed the inner registerer, because Prometheus since v2.39.0 is wrapping the Registry with prometheus.WrapRegistererWithPrefix. This wrapper will call the Register function of the wrapped registerer. If UnRegisterer is the wrapped registerer, this would end up calling the inner registerer's Register, which doesn't implement the "unregister" logic that this type intends to use.

func NewUnRegisterer added in v0.29.0

func NewUnRegisterer(inner prometheus.Registerer) *UnRegisterer

func (*UnRegisterer) MustRegister added in v0.8.0

func (u *UnRegisterer) MustRegister(cs ...prometheus.Collector)

MustRegister registers the given collectors. It panics if an error happens. Note that if a collector is already registered it will be re-registered without panicking.

func (*UnRegisterer) Register added in v0.29.0

func (u *UnRegisterer) Register(c prometheus.Collector) error

Register registers the given collector. If it's already registered, it will be unregistered and then registered again.

func (*UnRegisterer) Unregister added in v0.29.0

func (u *UnRegisterer) Unregister(c prometheus.Collector) bool

Unregister unregisters the given collector.

type WriteLimitConfig added in v0.29.0

type WriteLimitConfig struct {
	// RequestLimits holds the different per-request limits.
	RequestLimits *requestLimitsConfig `yaml:"request"`
	// HeadSeriesLimit specifies the maximum number of head series allowed for a tenant.
	HeadSeriesLimit *uint64 `yaml:"head_series_limit"`
}

A tenant might not always have limits configured, so things here must use pointers.

func NewEmptyWriteLimitConfig added in v0.29.0

func NewEmptyWriteLimitConfig() *WriteLimitConfig

NewEmptyWriteLimitConfig is a utility for initializing a WriteLimitConfig.

func (*WriteLimitConfig) SetHeadSeriesLimit added in v0.29.0

func (w *WriteLimitConfig) SetHeadSeriesLimit(val uint64) *WriteLimitConfig

func (*WriteLimitConfig) SetRequestLimits added in v0.29.0

func (w *WriteLimitConfig) SetRequestLimits(rl *requestLimitsConfig) *WriteLimitConfig

type WriteLimitsConfig added in v0.29.0

type WriteLimitsConfig struct {
	// GlobalLimits are limits that are shared across all tenants.
	GlobalLimits GlobalLimitsConfig `yaml:"global"`
	// DefaultLimits are the default limits for tenants without specified limits.
	DefaultLimits DefaultLimitsConfig `yaml:"default"`
	// TenantsLimits are the limits per tenant.
	TenantsLimits TenantsWriteLimitsConfig `yaml:"tenants"`
}

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

func NewWriter

func NewWriter(logger log.Logger, multiTSDB TenantStorage, opts *WriterOptions) *Writer

func (*Writer) Write added in v0.8.0

func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteRequest) error

type WriterOptions added in v0.32.0

type WriterOptions struct {
	Intern                   bool
	TooFarInFutureTimeWindow int64 // Unit: nanoseconds
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL