compactor

package
v1.6.2-0...-8a36637 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 27, 2024 License: AGPL-3.0 Imports: 42 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildUserID

func BuildUserID(id int) string

func SchemaPeriodForTable

func SchemaPeriodForTable(cfg config.SchemaConfig, tableName string) (config.PeriodConfig, bool)

func SetupTable

func SetupTable(t *testing.T, path string, commonDBsConfig IndexesConfig, perUserDBsConfig PerUserIndexesConfig)

func SortTablesByRange

func SortTablesByRange(tables []string)

Types

type CompactedIndex

type CompactedIndex interface {
	// IndexProcessor is used for applying custom retention and processing delete requests.
	retention.IndexProcessor
	// Cleanup should clean up all the state built during compaction.
	// It is typically called at the end or in case of an error.
	Cleanup()
	// ToIndexFile is used to convert the CompactedIndex to an IndexFile for uploading to the object store.
	// Once the IndexFile is uploaded using Index.Reader, the file is closed using Index.Close and removed from disk using Index.Path.
	ToIndexFile() (index.Index, error)
}

CompactedIndex is built by TableCompactor for IndexSet after compaction. It would be used for: 1. applying custom retention, processing delete requests using IndexProcessor 2. uploading the compacted index to storage by converting it to index.Index using ToIndexFile After all the operations are successfully done or in case of failure, Cleanup would be called to cleanup the state.

type Compactor

type Compactor struct {
	services.Service

	DeleteRequestsHandler     *deletion.DeleteRequestHandler
	DeleteRequestsGRPCHandler *deletion.GRPCRequestHandler
	// contains filtered or unexported fields
}

func NewCompactor

func NewCompactor(cfg Config, objectStoreClients map[config.DayTime]client.ObjectClient, deleteStoreClient client.ObjectClient, schemaConfig config.SchemaConfig, limits Limits, r prometheus.Registerer, metricsNamespace string) (*Compactor, error)

func (*Compactor) CompactTable

func (c *Compactor) CompactTable(ctx context.Context, tableName string, applyRetention bool) error

func (*Compactor) OnRingInstanceHeartbeat

func (c *Compactor) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc)

func (*Compactor) OnRingInstanceRegister

func (c *Compactor) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens)

func (*Compactor) OnRingInstanceStopping

func (c *Compactor) OnRingInstanceStopping(_ *ring.BasicLifecycler)

func (*Compactor) OnRingInstanceTokens

func (c *Compactor) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens)

func (*Compactor) RegisterIndexCompactor

func (c *Compactor) RegisterIndexCompactor(indexType string, indexCompactor IndexCompactor)

func (*Compactor) RunCompaction

func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) (err error)

func (*Compactor) ServeHTTP

func (c *Compactor) ServeHTTP(w http.ResponseWriter, req *http.Request)

type Config

type Config struct {
	WorkingDirectory            string              `yaml:"working_directory"`
	CompactionInterval          time.Duration       `yaml:"compaction_interval"`
	ApplyRetentionInterval      time.Duration       `yaml:"apply_retention_interval"`
	RetentionEnabled            bool                `yaml:"retention_enabled"`
	RetentionDeleteDelay        time.Duration       `yaml:"retention_delete_delay"`
	RetentionDeleteWorkCount    int                 `yaml:"retention_delete_worker_count"`
	RetentionTableTimeout       time.Duration       `yaml:"retention_table_timeout"`
	DeleteRequestStore          string              `yaml:"delete_request_store"`
	DeleteRequestStoreKeyPrefix string              `yaml:"delete_request_store_key_prefix"`
	DeleteBatchSize             int                 `yaml:"delete_batch_size"`
	DeleteRequestCancelPeriod   time.Duration       `yaml:"delete_request_cancel_period"`
	DeleteMaxInterval           time.Duration       `yaml:"delete_max_interval"`
	MaxCompactionParallelism    int                 `yaml:"max_compaction_parallelism"`
	UploadParallelism           int                 `yaml:"upload_parallelism"`
	CompactorRing               lokiring.RingConfig `` /* 210-byte string literal not displayed */
	RunOnce                     bool                `yaml:"_" doc:"hidden"`
	TablesToCompact             int                 `yaml:"tables_to_compact"`
	SkipLatestNTables           int                 `yaml:"skip_latest_n_tables"`
}

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

RegisterFlags registers flags.

func (*Config) Validate

func (cfg *Config) Validate() error

Validate verifies the config does not contain inappropriate values

type IndexCompactor

type IndexCompactor interface {
	// NewTableCompactor returns a new TableCompactor for compacting a table.
	// commonIndexSet refers to common index files or in other words multi-tenant index.
	// existingUserIndexSet refers to existing user specific index files in the storage.
	// makeEmptyUserIndexSetFunc can be used for creating an empty indexSet for a user
	// who does not have an index for it in existingUserIndexSet.
	// periodConfig holds the PeriodConfig for the table.
	NewTableCompactor(
		ctx context.Context,
		commonIndexSet IndexSet,
		existingUserIndexSet map[string]IndexSet,
		makeEmptyUserIndexSetFunc MakeEmptyUserIndexSetFunc,
		periodConfig config.PeriodConfig,
	) TableCompactor

	// OpenCompactedIndexFile opens a compressed index file at given path.
	OpenCompactedIndexFile(
		ctx context.Context,
		path,
		tableName,
		userID,
		workingDir string,
		periodConfig config.PeriodConfig,
		logger log.Logger,
	) (
		CompactedIndex,
		error,
	)
}

type IndexFileConfig

type IndexFileConfig struct {
	CompressFile bool
}

type IndexRecords

type IndexRecords struct {
	Start, NumRecords int
}

type IndexSet

type IndexSet interface {
	GetTableName() string
	ListSourceFiles() []storage.IndexFile
	GetSourceFile(indexFile storage.IndexFile) (string, error)
	GetLogger() log.Logger
	GetWorkingDir() string
	// SetCompactedIndex sets the CompactedIndex for upload/applying retention and making the compactor remove the source files.
	// CompactedIndex can be nil only in case of all the source files in common index set being compacted away to per tenant index.
	// It would return an error if the CompactedIndex is nil and removeSourceFiles is true in case of user index set since
	// compaction should either create new files or can be a noop if there is nothing to compact.
	// There is no need to call SetCompactedIndex if no changes were made to the index for this IndexSet.
	SetCompactedIndex(compactedIndex CompactedIndex, removeSourceFiles bool) error
}

type IndexesConfig

type IndexesConfig struct {
	NumUnCompactedFiles, NumCompactedFiles int
}

func (IndexesConfig) String

func (c IndexesConfig) String() string

type Limits

type Limits interface {
	deletion.Limits
	retention.Limits
	DefaultLimits() *validation.Limits
}

type MakeEmptyUserIndexSetFunc

type MakeEmptyUserIndexSetFunc func(userID string) (IndexSet, error)

type PerUserIndexesConfig

type PerUserIndexesConfig struct {
	IndexesConfig
	NumUsers int
}

func (PerUserIndexesConfig) String

func (c PerUserIndexesConfig) String() string

type TableCompactor

type TableCompactor interface {
	// CompactTable compacts the table.
	// After compaction is done successfully, it should set the new/updated CompactedIndex for relevant IndexSets.
	CompactTable() (err error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL