Documentation ¶
Index ¶
- Constants
- Variables
- func ExtractShardRequestMatchersAndAST(query string) (chunk.Predicate, error)
- func GetShuffleShardingSubring(ring ring.ReadRing, tenantID string, limits Limits) ring.ReadRing
- type BloomQuerier
- type ClientConfig
- type ClientPool
- type Config
- type Gateway
- func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequest) (result *logproto.GetChunkRefResponse, err error)
- func (g *Gateway) GetSeries(ctx context.Context, req *logproto.GetSeriesRequest) (*logproto.GetSeriesResponse, error)
- func (g *Gateway) GetShards(request *logproto.ShardsRequest, server logproto.IndexGateway_GetShardsServer) error
- func (g *Gateway) GetStats(ctx context.Context, req *logproto.IndexStatsRequest) (*logproto.IndexStatsResponse, error)
- func (g *Gateway) GetVolume(ctx context.Context, req *logproto.VolumeRequest) (*logproto.VolumeResponse, error)
- func (g *Gateway) LabelNamesForMetricName(ctx context.Context, req *logproto.LabelNamesForMetricNameRequest) (*logproto.LabelResponse, error)
- func (g *Gateway) LabelValuesForMetricName(ctx context.Context, req *logproto.LabelValuesForMetricNameRequest) (*logproto.LabelResponse, error)
- func (g *Gateway) QueryIndex(request *logproto.QueryIndexRequest, ...) error
- type GatewayClient
- func (s *GatewayClient) BatchWrite(_ context.Context, _ index.WriteBatch) error
- func (s *GatewayClient) GetChunkRef(ctx context.Context, in *logproto.GetChunkRefRequest) (*logproto.GetChunkRefResponse, error)
- func (s *GatewayClient) GetSeries(ctx context.Context, in *logproto.GetSeriesRequest) (*logproto.GetSeriesResponse, error)
- func (s *GatewayClient) GetShards(ctx context.Context, in *logproto.ShardsRequest) (res *logproto.ShardsResponse, err error)
- func (s *GatewayClient) GetStats(ctx context.Context, in *logproto.IndexStatsRequest) (*logproto.IndexStatsResponse, error)
- func (s *GatewayClient) GetVolume(ctx context.Context, in *logproto.VolumeRequest) (*logproto.VolumeResponse, error)
- func (s *GatewayClient) LabelNamesForMetricName(ctx context.Context, in *logproto.LabelNamesForMetricNameRequest) (*logproto.LabelResponse, error)
- func (s *GatewayClient) LabelValuesForMetricName(ctx context.Context, in *logproto.LabelValuesForMetricNameRequest) (*logproto.LabelResponse, error)
- func (s *GatewayClient) NewWriteBatch() index.WriteBatch
- func (s *GatewayClient) QueryIndex(_ context.Context, _ *logproto.QueryIndexRequest, _ ...grpc.CallOption) (logproto.IndexGateway_QueryIndexClient, error)
- func (s *GatewayClient) QueryPages(ctx context.Context, queries []index.Query, callback index.QueryPagesCallback) error
- func (s *GatewayClient) Stop()
- type IndexClient
- type IndexClientWithRange
- type IndexQuerier
- type Limits
- type Metrics
- type Mode
- type NoopStrategy
- type ServerInterceptors
- type ShardingStrategy
- type ShuffleShardingStrategy
Constants ¶
const ( NumTokens = 128 ReplicationFactor = 3 )
Variables ¶
var ( // IndexesSync is the operation used to check the authoritative owners of an index // (replicas included). IndexesSync = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil) // IndexesRead is the operation run by the querier/query frontent to query // indexes via the index gateway. IndexesRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) )
Functions ¶
func ExtractShardRequestMatchersAndAST ¶
ExtractShardRequestMatchersAndAST extracts the matchers and AST from a query string. It errors if there is more than one matcher group in the AST as this is supposed to be split out during query planning before reaching this point.
func GetShuffleShardingSubring ¶
GetShuffleShardingSubring returns the subring to be used for a given user. This function should be used both by index gateway servers and clients in order to guarantee the same logic is used.
Types ¶
type BloomQuerier ¶
type ClientConfig ¶
type ClientConfig struct { // Mode sets in which mode the client will operate. It is actually defined at the // index_gateway YAML section and reused here. Mode Mode `yaml:"-"` // PoolConfig defines the behavior of the gRPC connection pool used to communicate // with the Index Gateway. // // Only relevant for the ring mode. // It is defined at the distributors YAML section and reused here. PoolConfig clientpool.PoolConfig `yaml:"-"` // Ring is the Index Gateway ring used to find the appropriate Index Gateway instance // this client should talk to. // // Only relevant for the ring mode. Ring ring.ReadRing `yaml:"-"` // GRPCClientConfig configures the gRPC connection between the Index Gateway client and the server. // // Used by both, ring and simple mode. GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` // Address of the Index Gateway instance responsible for retaining the index for all tenants. // // Only relevant for the simple mode. Address string `yaml:"server_address,omitempty"` // Forcefully disable the use of the index gateway client for the storage. // This is mainly useful for the index-gateway component which should always use the storage. Disabled bool `yaml:"-"` // LogGatewayRequests configures if requests sent to the gateway should be logged or not. // The log messages are of type debug and contain the address of the gateway and the relevant tenant. LogGatewayRequests bool `yaml:"log_gateway_requests"` GRPCUnaryClientInterceptors []grpc.UnaryClientInterceptor `yaml:"-"` GRCPStreamClientInterceptors []grpc.StreamClientInterceptor `yaml:"-"` }
ClientConfig configures the Index Gateway client used to communicate with the Index Gateway server.
func (*ClientConfig) RegisterFlags ¶
func (i *ClientConfig) RegisterFlags(f *flag.FlagSet)
func (*ClientConfig) RegisterFlagsWithPrefix ¶
func (i *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
RegisterFlagsWithPrefix register client-specific flags with the given prefix.
Flags that are used by both, client and server, are defined in the indexgateway package.
type ClientPool ¶
type ClientPool struct { grpc_health_v1.HealthClient logproto.IndexGatewayClient io.Closer }
ClientPool represents a pool of gRPC connections to different index gateway instances.
Only used when Index Gateway is configured to run in ring mode.
func NewClientPool ¶
func NewClientPool(address string, opts []grpc.DialOption) (*ClientPool, error)
NewClientPool instantiates a new pool of IndexGateway GRPC connections.
Internally, it also instantiates a protobuf index gateway client and a health client.
type Config ¶
type Config struct { // Mode configures in which mode the client will be running when querying and communicating with an Index Gateway instance. Mode Mode `yaml:"mode"` // Ring configures the ring key-value store used to save and retrieve the different Index Gateway instances. // // In case it isn't explicitly set, it follows the same behavior of the other rings (ex: using the common configuration // section and the ingester configuration by default). Ring ring.RingConfig `` /* 272-byte string literal not displayed */ }
Config configures an Index Gateway server.
func (*Config) RegisterFlags ¶
RegisterFlags register all IndexGatewayClientConfig flags and all the flags of its subconfigs but with a prefix (ex: shipper).
type Gateway ¶
func NewIndexGateway ¶
func NewIndexGateway(cfg Config, limits Limits, log log.Logger, r prometheus.Registerer, indexQuerier IndexQuerier, indexClients []IndexClientWithRange, bloomQuerier BloomQuerier) (*Gateway, error)
NewIndexGateway instantiates a new Index Gateway and start its services.
In case it is configured to be in ring mode, a Basic Service wrapping the ring client is started. Otherwise, it starts an Idle Service that doesn't have lifecycle hooks.
func (*Gateway) GetChunkRef ¶
func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequest) (result *logproto.GetChunkRefResponse, err error)
func (*Gateway) GetSeries ¶
func (g *Gateway) GetSeries(ctx context.Context, req *logproto.GetSeriesRequest) (*logproto.GetSeriesResponse, error)
func (*Gateway) GetShards ¶
func (g *Gateway) GetShards(request *logproto.ShardsRequest, server logproto.IndexGateway_GetShardsServer) error
func (*Gateway) GetStats ¶
func (g *Gateway) GetStats(ctx context.Context, req *logproto.IndexStatsRequest) (*logproto.IndexStatsResponse, error)
func (*Gateway) GetVolume ¶
func (g *Gateway) GetVolume(ctx context.Context, req *logproto.VolumeRequest) (*logproto.VolumeResponse, error)
func (*Gateway) LabelNamesForMetricName ¶
func (g *Gateway) LabelNamesForMetricName(ctx context.Context, req *logproto.LabelNamesForMetricNameRequest) (*logproto.LabelResponse, error)
func (*Gateway) LabelValuesForMetricName ¶
func (g *Gateway) LabelValuesForMetricName(ctx context.Context, req *logproto.LabelValuesForMetricNameRequest) (*logproto.LabelResponse, error)
func (*Gateway) QueryIndex ¶
func (g *Gateway) QueryIndex(request *logproto.QueryIndexRequest, server logproto.IndexGateway_QueryIndexServer) error
type GatewayClient ¶
type GatewayClient struct {
// contains filtered or unexported fields
}
func NewGatewayClient ¶
func NewGatewayClient(cfg ClientConfig, r prometheus.Registerer, limits Limits, logger log.Logger, metricsNamespace string) (*GatewayClient, error)
NewGatewayClient instantiates a new client used to communicate with an Index Gateway instance.
If it is configured to be in ring mode, a pool of GRPC connections to all Index Gateway instances is created using a ring. Otherwise, it creates a GRPC connection pool to as many addresses as can be resolved from the given address.
func (*GatewayClient) BatchWrite ¶
func (s *GatewayClient) BatchWrite(_ context.Context, _ index.WriteBatch) error
func (*GatewayClient) GetChunkRef ¶
func (s *GatewayClient) GetChunkRef(ctx context.Context, in *logproto.GetChunkRefRequest) (*logproto.GetChunkRefResponse, error)
func (*GatewayClient) GetSeries ¶
func (s *GatewayClient) GetSeries(ctx context.Context, in *logproto.GetSeriesRequest) (*logproto.GetSeriesResponse, error)
func (*GatewayClient) GetShards ¶
func (s *GatewayClient) GetShards( ctx context.Context, in *logproto.ShardsRequest, ) (res *logproto.ShardsResponse, err error)
func (*GatewayClient) GetStats ¶
func (s *GatewayClient) GetStats(ctx context.Context, in *logproto.IndexStatsRequest) (*logproto.IndexStatsResponse, error)
func (*GatewayClient) GetVolume ¶
func (s *GatewayClient) GetVolume(ctx context.Context, in *logproto.VolumeRequest) (*logproto.VolumeResponse, error)
func (*GatewayClient) LabelNamesForMetricName ¶
func (s *GatewayClient) LabelNamesForMetricName(ctx context.Context, in *logproto.LabelNamesForMetricNameRequest) (*logproto.LabelResponse, error)
func (*GatewayClient) LabelValuesForMetricName ¶
func (s *GatewayClient) LabelValuesForMetricName(ctx context.Context, in *logproto.LabelValuesForMetricNameRequest) (*logproto.LabelResponse, error)
func (*GatewayClient) NewWriteBatch ¶
func (s *GatewayClient) NewWriteBatch() index.WriteBatch
func (*GatewayClient) QueryIndex ¶
func (s *GatewayClient) QueryIndex(_ context.Context, _ *logproto.QueryIndexRequest, _ ...grpc.CallOption) (logproto.IndexGateway_QueryIndexClient, error)
func (*GatewayClient) QueryPages ¶
func (s *GatewayClient) QueryPages(ctx context.Context, queries []index.Query, callback index.QueryPagesCallback) error
func (*GatewayClient) Stop ¶
func (s *GatewayClient) Stop()
Stop stops the execution of this gateway client.
type IndexClient ¶
type IndexClient interface { seriesindex.ReadClient Stop() }
type IndexClientWithRange ¶
type IndexClientWithRange struct { IndexClient TableRange config.TableRange }
type IndexQuerier ¶
type IndexQuerier interface { stores.ChunkFetcher index.BaseReader index.StatsReader Stop() }
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
func NewMetrics ¶
func NewMetrics(r prometheus.Registerer) *Metrics
type Mode ¶
type Mode string
Mode represents in which mode an Index Gateway instance is running.
Right now, two modes are supported: simple mode (default) and ring mode.
const ( // SimpleMode is a mode where an Index Gateway instance solely handle all the work. SimpleMode Mode = "simple" // RingMode is a mode where different Index Gateway instances are assigned to handle different tenants. // // It is more horizontally scalable than the simple mode, but requires running a key-value store ring. RingMode Mode = "ring" )
type NoopStrategy ¶
type NoopStrategy struct{}
NoopStrategy is an implementation of the ShardingStrategy that does not filter anything. This is used when the index gateway runs in simple mode or when the index gateway runs in ring mode, but the ring manager runs in client mode.
func NewNoopStrategy ¶
func NewNoopStrategy() *NoopStrategy
func (*NoopStrategy) FilterTenants ¶
func (s *NoopStrategy) FilterTenants(tenantIDs []string) ([]string, error)
FilterTenants implements ShardingStrategy.
type ServerInterceptors ¶
type ServerInterceptors struct { PerTenantRequestCount grpc.UnaryServerInterceptor // contains filtered or unexported fields }
func NewServerInterceptors ¶
func NewServerInterceptors(r prometheus.Registerer) *ServerInterceptors
type ShardingStrategy ¶
type ShardingStrategy interface { // FilterTenants whose indexes should be loaded by the index gateway. // Returns the list of user IDs that should be synced by the index gateway. FilterTenants(tenantID []string) ([]string, error) }
func GetShardingStrategy ¶
func GetShardingStrategy(cfg Config, indexGatewayRingManager *lokiring.RingManager, o Limits) ShardingStrategy
GetShardingStrategy returns the correct ShardingStrategy implementation based on provided configuration.
type ShuffleShardingStrategy ¶
type ShuffleShardingStrategy struct {
// contains filtered or unexported fields
}
func NewShuffleShardingStrategy ¶
func NewShuffleShardingStrategy(r ring.ReadRing, l Limits, instanceAddr, instanceID string) *ShuffleShardingStrategy
func (*ShuffleShardingStrategy) FilterTenants ¶
func (s *ShuffleShardingStrategy) FilterTenants(tenantIDs []string) ([]string, error)
FilterTenants implements ShardingStrategy.