Documentation ¶
Index ¶
- func BuildUserID(id int) string
- func SchemaPeriodForTable(cfg config.SchemaConfig, tableName string) (config.PeriodConfig, bool)
- func SetupTable(t *testing.T, path string, commonDBsConfig IndexesConfig, ...)
- func SortTablesByRange(tables []string)
- type CompactedIndex
- type Compactor
- func (c *Compactor) CompactTable(ctx context.Context, tableName string, applyRetention bool) error
- func (c *Compactor) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc)
- func (c *Compactor) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, ...) (ring.InstanceState, ring.Tokens)
- func (c *Compactor) OnRingInstanceStopping(_ *ring.BasicLifecycler)
- func (c *Compactor) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens)
- func (c *Compactor) RegisterIndexCompactor(indexType string, indexCompactor IndexCompactor)
- func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) (err error)
- func (c *Compactor) ServeHTTP(w http.ResponseWriter, req *http.Request)
- type Config
- type IndexCompactor
- type IndexFileConfig
- type IndexRecords
- type IndexSet
- type IndexesConfig
- type Limits
- type MakeEmptyUserIndexSetFunc
- type PerUserIndexesConfig
- type TableCompactor
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BuildUserID ¶
func SchemaPeriodForTable ¶
func SchemaPeriodForTable(cfg config.SchemaConfig, tableName string) (config.PeriodConfig, bool)
func SetupTable ¶
func SetupTable(t *testing.T, path string, commonDBsConfig IndexesConfig, perUserDBsConfig PerUserIndexesConfig)
func SortTablesByRange ¶
func SortTablesByRange(tables []string)
Types ¶
type CompactedIndex ¶
type CompactedIndex interface { // IndexProcessor is used for applying custom retention and processing delete requests. retention.IndexProcessor // Cleanup should clean up all the state built during compaction. // It is typically called at the end or in case of an error. Cleanup() // ToIndexFile is used to convert the CompactedIndex to an IndexFile for uploading to the object store. // Once the IndexFile is uploaded using Index.Reader, the file is closed using Index.Close and removed from disk using Index.Path. ToIndexFile() (index.Index, error) }
CompactedIndex is built by TableCompactor for IndexSet after compaction. It would be used for: 1. applying custom retention, processing delete requests using IndexProcessor 2. uploading the compacted index to storage by converting it to index.Index using ToIndexFile After all the operations are successfully done or in case of failure, Cleanup would be called to cleanup the state.
type Compactor ¶
type Compactor struct { services.Service DeleteRequestsHandler *deletion.DeleteRequestHandler DeleteRequestsGRPCHandler *deletion.GRPCRequestHandler // contains filtered or unexported fields }
func NewCompactor ¶
func NewCompactor(cfg Config, objectStoreClients map[config.DayTime]client.ObjectClient, deleteStoreClient client.ObjectClient, schemaConfig config.SchemaConfig, limits Limits, r prometheus.Registerer, metricsNamespace string) (*Compactor, error)
func (*Compactor) CompactTable ¶
func (*Compactor) OnRingInstanceHeartbeat ¶
func (c *Compactor) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc)
func (*Compactor) OnRingInstanceRegister ¶
func (c *Compactor) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens)
func (*Compactor) OnRingInstanceStopping ¶
func (c *Compactor) OnRingInstanceStopping(_ *ring.BasicLifecycler)
func (*Compactor) OnRingInstanceTokens ¶
func (c *Compactor) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens)
func (*Compactor) RegisterIndexCompactor ¶
func (c *Compactor) RegisterIndexCompactor(indexType string, indexCompactor IndexCompactor)
func (*Compactor) RunCompaction ¶
type Config ¶
type Config struct { WorkingDirectory string `yaml:"working_directory"` CompactionInterval time.Duration `yaml:"compaction_interval"` ApplyRetentionInterval time.Duration `yaml:"apply_retention_interval"` RetentionEnabled bool `yaml:"retention_enabled"` RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"` RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"` RetentionTableTimeout time.Duration `yaml:"retention_table_timeout"` DeleteRequestStore string `yaml:"delete_request_store"` DeleteRequestStoreKeyPrefix string `yaml:"delete_request_store_key_prefix"` DeleteBatchSize int `yaml:"delete_batch_size"` DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"` DeleteMaxInterval time.Duration `yaml:"delete_max_interval"` MaxCompactionParallelism int `yaml:"max_compaction_parallelism"` UploadParallelism int `yaml:"upload_parallelism"` CompactorRing lokiring.RingConfig `` /* 210-byte string literal not displayed */ RunOnce bool `yaml:"_" doc:"hidden"` TablesToCompact int `yaml:"tables_to_compact"` SkipLatestNTables int `yaml:"skip_latest_n_tables"` }
func (*Config) RegisterFlags ¶
RegisterFlags registers flags.
type IndexCompactor ¶
type IndexCompactor interface { // NewTableCompactor returns a new TableCompactor for compacting a table. // commonIndexSet refers to common index files or in other words multi-tenant index. // existingUserIndexSet refers to existing user specific index files in the storage. // makeEmptyUserIndexSetFunc can be used for creating an empty indexSet for a user // who does not have an index for it in existingUserIndexSet. // periodConfig holds the PeriodConfig for the table. NewTableCompactor( ctx context.Context, commonIndexSet IndexSet, existingUserIndexSet map[string]IndexSet, makeEmptyUserIndexSetFunc MakeEmptyUserIndexSetFunc, periodConfig config.PeriodConfig, ) TableCompactor // OpenCompactedIndexFile opens a compressed index file at given path. OpenCompactedIndexFile( ctx context.Context, path, tableName, userID, workingDir string, periodConfig config.PeriodConfig, logger log.Logger, ) ( CompactedIndex, error, ) }
type IndexFileConfig ¶
type IndexFileConfig struct {
CompressFile bool
}
type IndexRecords ¶
type IndexRecords struct {
Start, NumRecords int
}
type IndexSet ¶
type IndexSet interface { GetTableName() string ListSourceFiles() []storage.IndexFile GetSourceFile(indexFile storage.IndexFile) (string, error) GetLogger() log.Logger GetWorkingDir() string // SetCompactedIndex sets the CompactedIndex for upload/applying retention and making the compactor remove the source files. // CompactedIndex can be nil only in case of all the source files in common index set being compacted away to per tenant index. // It would return an error if the CompactedIndex is nil and removeSourceFiles is true in case of user index set since // compaction should either create new files or can be a noop if there is nothing to compact. // There is no need to call SetCompactedIndex if no changes were made to the index for this IndexSet. SetCompactedIndex(compactedIndex CompactedIndex, removeSourceFiles bool) error }
type IndexesConfig ¶
type IndexesConfig struct {
NumUnCompactedFiles, NumCompactedFiles int
}
func (IndexesConfig) String ¶
func (c IndexesConfig) String() string
type PerUserIndexesConfig ¶
type PerUserIndexesConfig struct { IndexesConfig NumUsers int }
func (PerUserIndexesConfig) String ¶
func (c PerUserIndexesConfig) String() string
type TableCompactor ¶
type TableCompactor interface { // CompactTable compacts the table. // After compaction is done successfully, it should set the new/updated CompactedIndex for relevant IndexSets. CompactTable() (err error) }
Source Files ¶
Click to show internal directories.
Click to hide internal directories.