Documentation
¶
Overview ¶
The bloom gateway is a component that can be run as a standalone microservice target and provides capabilities for filtering ChunkRefs based on a given list of line filter expressions.
Index ¶
- Constants
- type AddressProvider
- type BlockResolver
- type BloomQuerier
- type CacheConfig
- type CacheLimits
- type Client
- type ClientCache
- type ClientConfig
- type ClientFactory
- type Config
- type GRPCPool
- type Gateway
- type GatewayClient
- type JumpHashClientPool
- type Limits
- type PoolConfig
- type QuerierConfig
- type Stats
- func (s *Stats) AddBlocksFetchTime(t time.Duration)
- func (s *Stats) AddPostProcessingTime(t time.Duration)
- func (s *Stats) AddProcessedBlocksTotal(delta int)
- func (s *Stats) AddProcessingTime(t time.Duration)
- func (s *Stats) AddQueueTime(t time.Duration)
- func (s *Stats) AddTotalProcessingTime(t time.Duration)
- func (s *Stats) Duration() (dur time.Duration)
- func (s *Stats) IncProcessedBlocks()
- func (s *Stats) KVArgs() []any
- type Task
Constants ¶
const (
Day = 24 * time.Hour
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AddressProvider ¶
type AddressProvider interface {
Addresses() []string
}
type BlockResolver ¶ added in v3.1.0
type BlockResolver interface {
Resolve(ctx context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) (blocks []blockWithSeries, skipped []*logproto.GroupedChunkRefs, err error)
}
func NewBlockResolver ¶ added in v3.1.0
func NewBlockResolver(store bloomshipper.StoreBase, logger log.Logger) BlockResolver
type BloomQuerier ¶
type BloomQuerier struct {
// contains filtered or unexported fields
}
BloomQuerier is a store-level abstraction on top of Client. It is used by the index gateway to filter ChunkRefs based on a given line filter expression.
func NewQuerier ¶
func NewQuerier(c Client, cfg QuerierConfig, limits Limits, resolver BlockResolver, r prometheus.Registerer, logger log.Logger) *BloomQuerier
type CacheConfig ¶
type CacheConfig struct {
resultscache.Config `yaml:",inline"`
}
func (*CacheConfig) RegisterFlags ¶
func (cfg *CacheConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags registers flags.
func (*CacheConfig) RegisterFlagsWithPrefix ¶
func (cfg *CacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
type CacheLimits ¶
type CacheLimits interface { resultscache.Limits BloomGatewayCacheKeyInterval(tenantID string) time.Duration }
type Client ¶
type Client interface {
FilterChunks(ctx context.Context, tenant string, interval bloomshipper.Interval, blocks []blockWithSeries, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error)
}
type ClientCache ¶
type ClientCache struct {
// contains filtered or unexported fields
}
func NewBloomGatewayClientCacheMiddleware ¶
func NewBloomGatewayClientCacheMiddleware( logger log.Logger, next logproto.BloomGatewayClient, c cache.Cache, limits CacheLimits, cacheGen resultscache.CacheGenNumberLoader, retentionEnabled bool, ) *ClientCache
func (*ClientCache) FilterChunkRefs ¶
func (c *ClientCache) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest, opts ...grpc.CallOption) (*logproto.FilterChunkRefResponse, error)
type ClientConfig ¶
type ClientConfig struct { // PoolConfig defines the behavior of the gRPC connection pool used to communicate // with the Bloom Gateway. PoolConfig PoolConfig `yaml:"pool_config,omitempty" doc:"description=Configures the behavior of the connection pool."` // GRPCClientConfig configures the gRPC connection between the Bloom Gateway client and the server. GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` // Cache configures the cache used to store the results of the Bloom Gateway server. Cache CacheConfig `yaml:"results_cache,omitempty"` CacheResults bool `yaml:"cache_results"` // Client sharding using DNS discovery and jumphash Addresses string `yaml:"addresses,omitempty"` }
ClientConfig configures the Bloom Gateway client used to communicate with the Bloom Gateway server.
func (*ClientConfig) RegisterFlags ¶
func (i *ClientConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags registers flags for the Bloom Gateway client configuration.
func (*ClientConfig) RegisterFlagsWithPrefix ¶
func (i *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
RegisterFlagsWithPrefix registers flags for the Bloom Gateway client configuration with a common prefix.
func (*ClientConfig) Validate ¶
func (i *ClientConfig) Validate() error
type ClientFactory ¶ added in v3.3.0
type ClientFactory func(addr string) (client.PoolClient, error)
func (ClientFactory) New ¶ added in v3.3.0
func (f ClientFactory) New(addr string) (client.PoolClient, error)
type Config ¶
type Config struct { // Enabled is the global switch that configures whether Bloom Gateways should be used to filter chunks. Enabled bool `yaml:"enabled"` // Client configures the Bloom Gateway client Client ClientConfig `yaml:"client,omitempty" doc:""` WorkerConcurrency int `yaml:"worker_concurrency"` BlockQueryConcurrency int `yaml:"block_query_concurrency"` MaxOutstandingPerTenant int `yaml:"max_outstanding_per_tenant"` NumMultiplexItems int `yaml:"num_multiplex_tasks"` }
Config configures the Bloom Gateway component.
func (*Config) RegisterFlags ¶
RegisterFlags registers flags for the Bloom Gateway configuration.
func (*Config) RegisterFlagsWithPrefix ¶
RegisterFlagsWithPrefix registers flags for the Bloom Gateway configuration with a common prefix.
type GRPCPool ¶
type GRPCPool struct { grpc_health_v1.HealthClient logproto.BloomGatewayClient io.Closer }
GRPCPool represents a pool of gRPC connections to different bloom gateway instances. Interfaces are inlined for simplicity to automatically satisfy interface functions.
func NewBloomGatewayGRPCPool ¶
func NewBloomGatewayGRPCPool(address string, opts []grpc.DialOption) (*GRPCPool, error)
NewBloomGatewayGRPCPool instantiates a new pool of GRPC connections for the Bloom Gateway. Internally, it also instantiates a protobuf bloom gateway client and a health client.
type Gateway ¶
func New ¶
func New(cfg Config, store bloomshipper.Store, logger log.Logger, reg prometheus.Registerer) (*Gateway, error)
New returns a new instance of the Bloom Gateway.
func (*Gateway) FilterChunkRefs ¶
func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest) (*logproto.FilterChunkRefResponse, error)
FilterChunkRefs implements BloomGatewayServer
type GatewayClient ¶
type GatewayClient struct {
// contains filtered or unexported fields
}
func NewClient ¶
func NewClient( cfg ClientConfig, limits Limits, registerer prometheus.Registerer, logger log.Logger, cacheGen resultscache.CacheGenNumberLoader, retentionEnabled bool, ) (*GatewayClient, error)
func (*GatewayClient) Close ¶
func (c *GatewayClient) Close()
func (*GatewayClient) FilterChunks ¶
func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval bloomshipper.Interval, blocks []blockWithSeries, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error)
FilterChunks implements Client
type JumpHashClientPool ¶
type JumpHashClientPool struct { services.Service *jumphash.Selector sync.RWMutex // contains filtered or unexported fields }
func NewJumpHashClientPool ¶
func NewJumpHashClientPool(clientFactory ClientFactory, dnsProvider AddressProvider, updateInterval time.Duration, logger log.Logger) (*JumpHashClientPool, error)
func (*JumpHashClientPool) Addr ¶ added in v3.1.0
func (p *JumpHashClientPool) Addr(key string) (string, error)
func (*JumpHashClientPool) GetClientFor ¶ added in v3.3.0
func (p *JumpHashClientPool) GetClientFor(addr string) (client.PoolClient, error)
GetClientFor implements clientPool.
func (*JumpHashClientPool) Stop ¶
func (p *JumpHashClientPool) Stop()
type Limits ¶
type Limits interface { CacheLimits BloomGatewayShardSize(tenantID string) int BloomGatewayEnabled(tenantID string) bool }
type PoolConfig ¶
PoolConfig is config for creating a Pool.
func (*PoolConfig) RegisterFlagsWithPrefix ¶
func (cfg *PoolConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
RegisterFlagsWithPrefix adds the flags required to configure this to the given FlagSet, using the given prefix.
func (*PoolConfig) Validate ¶
func (cfg *PoolConfig) Validate() error
type QuerierConfig ¶ added in v3.1.0
type QuerierConfig struct { // MinTableOffset is derived from the compactor's MinTableOffset MinTableOffset int }
type Stats ¶
type Stats struct { Status string NumTasks, NumMatchers int ChunksRequested, ChunksFiltered int SeriesRequested, SeriesFiltered int QueueTime *atomic.Duration BlocksFetchTime *atomic.Duration ProcessingTime, TotalProcessingTime *atomic.Duration PostProcessingTime *atomic.Duration ProcessedBlocks *atomic.Int32 // blocks processed for this specific request ProcessedBlocksTotal *atomic.Int32 // blocks processed for multiplexed request }
func ContextWithEmptyStats ¶
ContextWithEmptyStats returns a context with empty stats.
func FromContext ¶
FromContext gets the Stats out of the Context. Returns nil if stats have not been initialised in the context.
func (*Stats) AddBlocksFetchTime ¶
func (*Stats) AddPostProcessingTime ¶
func (*Stats) AddProcessedBlocksTotal ¶ added in v3.3.0
func (*Stats) AddProcessingTime ¶
func (*Stats) AddQueueTime ¶
func (*Stats) AddTotalProcessingTime ¶
func (*Stats) IncProcessedBlocks ¶
func (s *Stats) IncProcessedBlocks()
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
Task is the data structure that is enqueued to the internal queue and dequeued by query workers.
func (Task) Bounds ¶
Bounds implements Bounded; see pkg/storage/stores/shipper/indexshipper/tsdb.Bounded.