bloomgateway

package
v3.3.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2025 License: AGPL-3.0 Imports: 43 Imported by: 0

Documentation

Overview

The bloom gateway is a component that can be run as a standalone microserivce target and provides capabilities for filtering ChunkRefs based on a given list of line filter expressions.

Index

Constants

View Source
const (
	Day = 24 * time.Hour
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AddressProvider

type AddressProvider interface {
	Addresses() []string
}

type BlockResolver added in v3.1.0

type BlockResolver interface {
	Resolve(ctx context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) (blocks []blockWithSeries, skipped []*logproto.GroupedChunkRefs, err error)
}

func NewBlockResolver added in v3.1.0

func NewBlockResolver(store bloomshipper.StoreBase, logger log.Logger) BlockResolver

type BloomQuerier

type BloomQuerier struct {
	// contains filtered or unexported fields
}

BloomQuerier is a store-level abstraction on top of Client It is used by the index gateway to filter ChunkRefs based on given line fiter expression.

func NewQuerier

func NewQuerier(c Client, cfg QuerierConfig, limits Limits, resolver BlockResolver, r prometheus.Registerer, logger log.Logger) *BloomQuerier

func (*BloomQuerier) FilterChunkRefs

func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, series map[uint64]labels.Labels, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, bool, error)

type CacheConfig

type CacheConfig struct {
	resultscache.Config `yaml:",inline"`
}

func (*CacheConfig) RegisterFlags

func (cfg *CacheConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags registers flags.

func (*CacheConfig) RegisterFlagsWithPrefix

func (cfg *CacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

type CacheLimits

type CacheLimits interface {
	resultscache.Limits
	BloomGatewayCacheKeyInterval(tenantID string) time.Duration
}

type Client

type Client interface {
	FilterChunks(ctx context.Context, tenant string, interval bloomshipper.Interval, blocks []blockWithSeries, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error)
}

type ClientCache

type ClientCache struct {
	// contains filtered or unexported fields
}

func NewBloomGatewayClientCacheMiddleware

func NewBloomGatewayClientCacheMiddleware(
	logger log.Logger,
	next logproto.BloomGatewayClient,
	c cache.Cache,
	limits CacheLimits,
	cacheGen resultscache.CacheGenNumberLoader,
	retentionEnabled bool,
) *ClientCache

func (*ClientCache) FilterChunkRefs

type ClientConfig

type ClientConfig struct {
	// PoolConfig defines the behavior of the gRPC connection pool used to communicate
	// with the Bloom Gateway.
	PoolConfig PoolConfig `yaml:"pool_config,omitempty" doc:"description=Configures the behavior of the connection pool."`

	// GRPCClientConfig configures the gRPC connection between the Bloom Gateway client and the server.
	GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`

	// Cache configures the cache used to store the results of the Bloom Gateway server.
	Cache        CacheConfig `yaml:"results_cache,omitempty"`
	CacheResults bool        `yaml:"cache_results"`

	// Client sharding using DNS disvovery and jumphash
	Addresses string `yaml:"addresses,omitempty"`
}

IndexGatewayClientConfig configures the Index Gateway client used to communicate with the Index Gateway server.

func (*ClientConfig) RegisterFlags

func (i *ClientConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags registers flags for the Bloom Gateway client configuration.

func (*ClientConfig) RegisterFlagsWithPrefix

func (i *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

RegisterFlagsWithPrefix registers flags for the Bloom Gateway client configuration with a common prefix.

func (*ClientConfig) Validate

func (i *ClientConfig) Validate() error

type ClientFactory added in v3.3.0

type ClientFactory func(addr string) (client.PoolClient, error)

func (ClientFactory) New added in v3.3.0

func (f ClientFactory) New(addr string) (client.PoolClient, error)

type Config

type Config struct {
	// Enabled is the global switch to configures whether Bloom Gateways should be used to filter chunks.
	Enabled bool `yaml:"enabled"`
	// Client configures the Bloom Gateway client
	Client ClientConfig `yaml:"client,omitempty" doc:""`

	WorkerConcurrency       int `yaml:"worker_concurrency"`
	BlockQueryConcurrency   int `yaml:"block_query_concurrency"`
	MaxOutstandingPerTenant int `yaml:"max_outstanding_per_tenant"`
	NumMultiplexItems       int `yaml:"num_multiplex_tasks"`
}

Config configures the Bloom Gateway component.

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

RegisterFlags registers flags for the Bloom Gateway configuration.

func (*Config) RegisterFlagsWithPrefix

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

RegisterFlagsWithPrefix registers flags for the Bloom Gateway configuration with a common prefix.

func (*Config) Validate

func (cfg *Config) Validate() error

type GRPCPool

GRPCPool represents a pool of gRPC connections to different bloom gateway instances. Interfaces are inlined for simplicity to automatically satisfy interface functions.

func NewBloomGatewayGRPCPool

func NewBloomGatewayGRPCPool(address string, opts []grpc.DialOption) (*GRPCPool, error)

NewBloomGatewayGRPCPool instantiates a new pool of GRPC connections for the Bloom Gateway Internally, it also instantiates a protobuf bloom gateway client and a health client.

type Gateway

type Gateway struct {
	services.Service
	// contains filtered or unexported fields
}

func New

func New(cfg Config, store bloomshipper.Store, logger log.Logger, reg prometheus.Registerer) (*Gateway, error)

New returns a new instance of the Bloom Gateway.

func (*Gateway) FilterChunkRefs

FilterChunkRefs implements BloomGatewayServer

type GatewayClient

type GatewayClient struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(
	cfg ClientConfig,
	limits Limits,
	registerer prometheus.Registerer,
	logger log.Logger,
	cacheGen resultscache.CacheGenNumberLoader,
	retentionEnabled bool,
) (*GatewayClient, error)

func (*GatewayClient) Close

func (c *GatewayClient) Close()

func (*GatewayClient) FilterChunks

func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval bloomshipper.Interval, blocks []blockWithSeries, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error)

FilterChunks implements Client

type JumpHashClientPool

type JumpHashClientPool struct {
	services.Service
	*jumphash.Selector
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewJumpHashClientPool

func NewJumpHashClientPool(clientFactory ClientFactory, dnsProvider AddressProvider, updateInterval time.Duration, logger log.Logger) (*JumpHashClientPool, error)

func (*JumpHashClientPool) Addr added in v3.1.0

func (p *JumpHashClientPool) Addr(key string) (string, error)

func (*JumpHashClientPool) GetClientFor added in v3.3.0

func (p *JumpHashClientPool) GetClientFor(addr string) (client.PoolClient, error)

GetClientFor implements clientPool.

func (*JumpHashClientPool) Stop

func (p *JumpHashClientPool) Stop()

type Limits

type Limits interface {
	CacheLimits
	BloomGatewayShardSize(tenantID string) int
	BloomGatewayEnabled(tenantID string) bool
}

type PoolConfig

type PoolConfig struct {
	CheckInterval time.Duration `yaml:"check_interval"`
}

PoolConfig is config for creating a Pool.

func (*PoolConfig) RegisterFlagsWithPrefix

func (cfg *PoolConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet.

func (*PoolConfig) Validate

func (cfg *PoolConfig) Validate() error

type QuerierConfig added in v3.1.0

type QuerierConfig struct {
	// MinTableOffset is derived from the compactor's MinTableOffset
	MinTableOffset int
}

type Stats

type Stats struct {
	Status                              string
	NumTasks, NumMatchers               int
	ChunksRequested, ChunksFiltered     int
	SeriesRequested, SeriesFiltered     int
	QueueTime                           *atomic.Duration
	BlocksFetchTime                     *atomic.Duration
	ProcessingTime, TotalProcessingTime *atomic.Duration
	PostProcessingTime                  *atomic.Duration
	ProcessedBlocks                     *atomic.Int32 // blocks processed for this specific request
	ProcessedBlocksTotal                *atomic.Int32 // blocks processed for multiplexed request
}

func ContextWithEmptyStats

func ContextWithEmptyStats(ctx context.Context) (*Stats, context.Context)

ContextWithEmptyStats returns a context with empty stats.

func FromContext

func FromContext(ctx context.Context) *Stats

FromContext gets the Stats out of the Context. Returns nil if stats have not been initialised in the context.

func (*Stats) AddBlocksFetchTime

func (s *Stats) AddBlocksFetchTime(t time.Duration)

func (*Stats) AddPostProcessingTime

func (s *Stats) AddPostProcessingTime(t time.Duration)

func (*Stats) AddProcessedBlocksTotal added in v3.3.0

func (s *Stats) AddProcessedBlocksTotal(delta int)

func (*Stats) AddProcessingTime

func (s *Stats) AddProcessingTime(t time.Duration)

func (*Stats) AddQueueTime

func (s *Stats) AddQueueTime(t time.Duration)

func (*Stats) AddTotalProcessingTime

func (s *Stats) AddTotalProcessingTime(t time.Duration)

func (*Stats) Duration

func (s *Stats) Duration() (dur time.Duration)

aggregates the total duration

func (*Stats) IncProcessedBlocks

func (s *Stats) IncProcessedBlocks()

func (*Stats) KVArgs

func (s *Stats) KVArgs() []any

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task is the data structure that is enqueued to the internal queue and dequeued by query workers

func (Task) Bounds

func (t Task) Bounds() (model.Time, model.Time)

Bounds implements Bounded see pkg/storage/stores/shipper/indexshipper/tsdb.Bounded

func (Task) Close

func (t Task) Close()

func (Task) CloseWithError

func (t Task) CloseWithError(err error)

func (Task) Copy

func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task

Copy returns a copy of the existing task but with a new slice of grouped chunk refs

func (Task) Done

func (t Task) Done() <-chan struct{}

func (Task) Err

func (t Task) Err() error

func (Task) RequestIter

func (t Task) RequestIter() iter.Iterator[v1.Request]

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL