compactor

package
v1.18.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 15, 2024 License: Apache-2.0 Imports: 49 Imported by: 1

Documentation

Index

Constants

View Source
// File name and format version of the block visit marker.
const (
	// BlockVisitMarkerFile is the well-known JSON file name used to record the
	// most recent compactor visit.
	BlockVisitMarkerFile = "visit-mark.json"
	// VisitMarkerVersion1 is the currently supported version of the visit-mark file.
	VisitMarkerVersion1 = 1
)
View Source
// File name and format version of the cleaner visit marker.
const (
	// CleanerVisitMarkerName is the name of the cleaner visit marker file.
	CleanerVisitMarkerName = "cleaner-visit-marker.json"
	// CleanerVisitMarkerVersion1 is the currently supported version of the
	// cleaner visit marker file.
	CleanerVisitMarkerVersion1 = 1
)

Variables

View Source
// Sentinel errors related to block visit markers. Going by their messages they
// cover, in order: the marker was not found, the marker's JSON could not be
// unmarshalled, and the inspected file is not a block visit marker.
//
// NOTE(review): the code returning these errors is not visible here —
// presumably ReadBlockVisitMarker; confirm. The names use an "Error" prefix
// rather than the conventional "Err"; they are exported, so renaming would
// break callers.
var (
	ErrorBlockVisitMarkerNotFound  = errors.New("block visit marker not found")
	ErrorUnmarshalBlockVisitMarker = errors.New("unmarshal block visit marker JSON")
	ErrorNotBlockVisitMarker       = errors.New("file is not block visit marker")
)
View Source
// Package-level ring operation and the stock grouper/compactor factories.
// The factory values match the BlocksGrouperFactory and BlocksCompactorFactory
// signatures declared in this package.
var (
	// RingOp is the ring operation built from the single instance state
	// ring.ACTIVE, with nil as the second argument to ring.NewOp.
	RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)

	// DefaultBlocksGrouperFactory returns the grouper created by
	// compact.NewDefaultGrouperWithMetrics. The visit-marker counters, ring,
	// lifecycler, limits, user ID and no-compaction mark filter are discarded
	// (blank parameters), and ctx is not used.
	DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, _ prometheus.Counter, _ prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string, _ *compact.GatherNoCompactionMarkFilter) compact.Grouper {
		return compact.NewDefaultGrouperWithMetrics(
			logger,
			bkt,
			cfg.AcceptMalformedIndex,
			// NOTE(review): presumably the "enable vertical compaction" flag,
			// as in NewShuffleShardingGrouper — confirm against the callee.
			true,
			compactorMetrics.compactions,
			compactorMetrics.compactionRunsStarted,
			compactorMetrics.compactionRunsCompleted,
			compactorMetrics.compactionFailures,
			compactorMetrics.verticalCompactions,
			syncerMetrics.BlocksMarkedForDeletion,
			syncerMetrics.GarbageCollectedBlocks,
			blocksMarkedForNoCompaction,
			metadata.NoneFunc,
			cfg.BlockFilesConcurrency,
			cfg.BlocksFetchConcurrency)
	}

	// ShuffleShardingGrouperFactory returns the grouper created by
	// NewShuffleShardingGrouper. ringLifecycle must be non-nil because its
	// Addr and ID fields are read here. The literal true is passed as
	// enableVerticalCompaction and metadata.NoneFunc as hashFunc.
	ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Grouper {
		return NewShuffleShardingGrouper(
			ctx,
			logger,
			bkt,
			cfg.AcceptMalformedIndex,
			true,
			blocksMarkedForNoCompaction,
			metadata.NoneFunc,
			syncerMetrics,
			compactorMetrics,
			cfg,
			ring,
			ringLifecycle.Addr,
			ringLifecycle.ID,
			limits,
			userID,
			cfg.BlockFilesConcurrency,
			cfg.BlocksFetchConcurrency,
			cfg.CompactionConcurrency,
			cfg.BlockVisitMarkerTimeout,
			blockVisitMarkerReadFailed,
			blockVisitMarkerWriteFailed,
			noCompactionMarkFilter.NoCompactMarkedBlocks)
	}

	// DefaultBlocksCompactorFactory creates a compactor with
	// tsdb.NewLeveledCompactor, using cfg.BlockRanges converted to
	// milliseconds, and pairs it with a PlannerFactory that builds a
	// compact.NewPlanner. Any error from NewLeveledCompactor is returned
	// unchanged with nil compactor and nil planner factory.
	DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {
		compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil)
		if err != nil {
			return nil, nil, err
		}

		// The planner uses only logger, cfg and noCompactionMarkFilter; every
		// other argument is ignored.
		plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ string, _ prometheus.Counter, _ prometheus.Counter, _ *compactorMetrics) compact.Planner {
			return compact.NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter)
		}

		return compactor, plannerFactory, nil
	}

	// ShuffleShardingBlocksCompactorFactory creates the same leveled compactor
	// as DefaultBlocksCompactorFactory but pairs it with a PlannerFactory that
	// builds a NewShuffleShardingPlanner.
	ShuffleShardingBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {
		compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil)
		if err != nil {
			return nil, nil, err
		}

		// ringLifecycle must be non-nil because its ID field is read. The
		// userID and compactorMetrics arguments are not used by this planner
		// factory.
		plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, compactorMetrics *compactorMetrics) compact.Planner {

			return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.BlockVisitMarkerTimeout, cfg.BlockVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed)
		}
		return compactor, plannerFactory, nil
	}
)

Functions

func GetCleanerVisitMarkerFilePath added in v1.18.0

func GetCleanerVisitMarkerFilePath() string

func IsBlockVisitMarker added in v1.16.0

func IsBlockVisitMarker(path string) bool

func IsNotBlockVisitMarkerError added in v1.16.0

func IsNotBlockVisitMarkerError(err error) bool

func UpdateBlockVisitMarker added in v1.14.0

func UpdateBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, reader io.Reader, blockVisitMarkerWriteFailed prometheus.Counter) error

Types

type BlockVisitMarker added in v1.14.0

// BlockVisitMarker is the JSON representation of a block visit marker, as
// returned by ReadBlockVisitMarker.
//
// NOTE(review): presumably this is the content of the BlockVisitMarkerFile
// ("visit-mark.json") object — confirm against the reader/writer code.
type BlockVisitMarker struct {
	// CompactorID identifies a compactor; presumably the one that performed
	// the visit — confirm against the writer.
	CompactorID string `json:"compactorID"`
	// VisitTime is a unix timestamp of when the block was visited (mark updated).
	VisitTime int64 `json:"visitTime"`
	// Version is the version of the marker file format.
	Version int `json:"version"`
}

func ReadBlockVisitMarker added in v1.14.0

func ReadBlockVisitMarker(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, blockID string, blockVisitMarkerReadFailed prometheus.Counter) (*BlockVisitMarker, error)

type BlocksCleaner added in v1.2.0

type BlocksCleaner struct {
	services.Service
	// contains filtered or unexported fields
}

func NewBlocksCleaner added in v1.2.0

func NewBlocksCleaner(
	cfg BlocksCleanerConfig,
	bucketClient objstore.InstrumentedBucket,
	usersScanner *cortex_tsdb.UsersScanner,
	cfgProvider ConfigProvider,
	logger log.Logger,
	ringLifecyclerID string,
	reg prometheus.Registerer,
	cleanerVisitMarkerTimeout time.Duration,
	cleanerVisitMarkerFileUpdateInterval time.Duration,
	blocksMarkedForDeletion *prometheus.CounterVec,
) *BlocksCleaner

type BlocksCleanerConfig added in v1.2.0

// BlocksCleanerConfig holds the settings passed to NewBlocksCleaner as cfg.
//
// NOTE(review): the TODO on BlockDeletionMarksMigrationEnabled refers to
// Cortex 1.7.0/1.8.0 while this page documents v1.18.1, so it looks stale —
// confirm whether the field is still needed.
type BlocksCleanerConfig struct {
	DeletionDelay                      time.Duration
	CleanupInterval                    time.Duration
	CleanupConcurrency                 int
	BlockDeletionMarksMigrationEnabled bool          // TODO Discuss whether we should remove it in Cortex 1.8.0 and document that upgrading to 1.7.0 before 1.8.0 is required.
	TenantCleanupDelay                 time.Duration // Delay before removing tenant deletion mark and "debug".
}

type BlocksCompactorFactory added in v1.8.0

type BlocksCompactorFactory func(
	ctx context.Context,
	cfg Config,
	logger log.Logger,
	reg prometheus.Registerer,
) (compact.Compactor, PlannerFactory, error)

BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks.

type BlocksGrouperFactory added in v1.8.0

type BlocksGrouperFactory func(
	ctx context.Context,
	cfg Config,
	bkt objstore.InstrumentedBucket,
	logger log.Logger,
	blocksMarkedForNoCompact prometheus.Counter,
	blockVisitMarkerReadFailed prometheus.Counter,
	blockVisitMarkerWriteFailed prometheus.Counter,
	syncerMetrics *compact.SyncerMetrics,
	compactorMetrics *compactorMetrics,
	ring *ring.Ring,
	ringLifecycler *ring.Lifecycler,
	limit Limits,
	userID string,
	noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter,
) compact.Grouper

BlocksGrouperFactory builds and returns the grouper to use to compact a tenant's blocks.

type CleanerVisitMarker added in v1.18.0

// CleanerVisitMarker is the JSON representation of a cleaner visit marker.
// Its method set (GetVisitMarkerFilePath, UpdateStatus, GetStatus, IsExpired,
// String) matches the VisitMarker interface.
//
// CompactorID identifies a compactor and Status holds one of the VisitStatus
// values; presumably both describe the most recent visit — confirm against
// UpdateStatus.
type CleanerVisitMarker struct {
	CompactorID string      `json:"compactorID"`
	Status      VisitStatus `json:"status"`
	// VisitTime is a unix timestamp of when the partition was visited (mark updated).
	// NOTE(review): "partition" may be a leftover from another marker type —
	// confirm what a cleaner visit actually covers.
	VisitTime int64 `json:"visitTime"`
	// Version is the version of the marker file format.
	Version int `json:"version"`
}

func NewCleanerVisitMarker added in v1.18.0

func NewCleanerVisitMarker(compactorID string) *CleanerVisitMarker

func (*CleanerVisitMarker) GetStatus added in v1.18.0

func (b *CleanerVisitMarker) GetStatus() VisitStatus

func (*CleanerVisitMarker) GetVisitMarkerFilePath added in v1.18.0

func (b *CleanerVisitMarker) GetVisitMarkerFilePath() string

func (*CleanerVisitMarker) IsExpired added in v1.18.0

func (b *CleanerVisitMarker) IsExpired(cleanerVisitMarkerTimeout time.Duration) bool

func (*CleanerVisitMarker) IsVisited added in v1.18.0

func (b *CleanerVisitMarker) IsVisited(cleanerVisitMarkerTimeout time.Duration) bool

func (*CleanerVisitMarker) String added in v1.18.0

func (b *CleanerVisitMarker) String() string

func (*CleanerVisitMarker) UpdateStatus added in v1.18.0

func (b *CleanerVisitMarker) UpdateStatus(ownerIdentifier string, status VisitStatus)

type Compactor

type Compactor struct {
	services.Service

	// Metrics.
	CompactorStartDurationSeconds  prometheus.Gauge
	CompactionRunsStarted          prometheus.Counter
	CompactionRunsInterrupted      prometheus.Counter
	CompactionRunsCompleted        prometheus.Counter
	CompactionRunsFailed           prometheus.Counter
	CompactionRunsLastSuccess      prometheus.Gauge
	CompactionRunDiscoveredTenants prometheus.Gauge
	CompactionRunSkippedTenants    prometheus.Gauge
	CompactionRunSucceededTenants  prometheus.Gauge
	CompactionRunFailedTenants     prometheus.Gauge
	CompactionRunInterval          prometheus.Gauge
	BlocksMarkedForNoCompaction    prometheus.Counter
	// contains filtered or unexported fields
}

Compactor is a multi-tenant TSDB blocks compactor based on Thanos.

func NewCompactor

func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Compactor, error)

NewCompactor makes a new Compactor.

func (*Compactor) RingHandler added in v0.7.0

func (c *Compactor) RingHandler(w http.ResponseWriter, req *http.Request)

type Config

type Config struct {
	BlockRanges                           cortex_tsdb.DurationList `yaml:"block_ranges"`
	BlockSyncConcurrency                  int                      `yaml:"block_sync_concurrency"`
	MetaSyncConcurrency                   int                      `yaml:"meta_sync_concurrency"`
	ConsistencyDelay                      time.Duration            `yaml:"consistency_delay"`
	DataDir                               string                   `yaml:"data_dir"`
	CompactionInterval                    time.Duration            `yaml:"compaction_interval"`
	CompactionRetries                     int                      `yaml:"compaction_retries"`
	CompactionConcurrency                 int                      `yaml:"compaction_concurrency"`
	CleanupInterval                       time.Duration            `yaml:"cleanup_interval"`
	CleanupConcurrency                    int                      `yaml:"cleanup_concurrency"`
	DeletionDelay                         time.Duration            `yaml:"deletion_delay"`
	TenantCleanupDelay                    time.Duration            `yaml:"tenant_cleanup_delay"`
	SkipBlocksWithOutOfOrderChunksEnabled bool                     `yaml:"skip_blocks_with_out_of_order_chunks_enabled"`
	BlockFilesConcurrency                 int                      `yaml:"block_files_concurrency"`
	BlocksFetchConcurrency                int                      `yaml:"blocks_fetch_concurrency"`

	// Whether the migration of block deletion marks to the global markers location is enabled.
	BlockDeletionMarksMigrationEnabled bool `yaml:"block_deletion_marks_migration_enabled"`

	EnabledTenants  flagext.StringSliceCSV `yaml:"enabled_tenants"`
	DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`

	// Compactors sharding.
	ShardingEnabled  bool       `yaml:"sharding_enabled"`
	ShardingStrategy string     `yaml:"sharding_strategy"`
	ShardingRing     RingConfig `yaml:"sharding_ring"`

	// Allow downstream projects to customise the blocks compactor.
	BlocksGrouperFactory   BlocksGrouperFactory   `yaml:"-"`
	BlocksCompactorFactory BlocksCompactorFactory `yaml:"-"`

	// Block visit marker file config
	BlockVisitMarkerTimeout            time.Duration `yaml:"block_visit_marker_timeout"`
	BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"`

	// Cleaner visit marker file config
	CleanerVisitMarkerTimeout            time.Duration `yaml:"cleaner_visit_marker_timeout"`
	CleanerVisitMarkerFileUpdateInterval time.Duration `yaml:"cleaner_visit_marker_file_update_interval"`

	AcceptMalformedIndex bool `yaml:"accept_malformed_index"`
	CachingBucketEnabled bool `yaml:"caching_bucket_enabled"`
	// contains filtered or unexported fields
}

Config holds the Compactor config.

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

RegisterFlags registers the Compactor flags.

func (*Config) Validate added in v1.6.0

func (cfg *Config) Validate(limits validation.Limits) error

type ConfigProvider added in v1.8.0

type ConfigProvider interface {
	bucket.TenantConfigProvider
	CompactorBlocksRetentionPeriod(user string) time.Duration
}

ConfigProvider defines the per-tenant config provider for the Compactor.

type LabelRemoverFilter added in v1.2.0

type LabelRemoverFilter struct {
	// contains filtered or unexported fields
}

func NewLabelRemoverFilter added in v1.2.0

func NewLabelRemoverFilter(labels []string) *LabelRemoverFilter

NewLabelRemoverFilter creates a LabelRemoverFilter.

func (*LabelRemoverFilter) Filter added in v1.2.0

Filter modifies external labels of existing blocks, removing given labels from the metadata of blocks that have it.

type Limits added in v1.13.0

type Limits interface {
	CompactorTenantShardSize(userID string) int
}

Limits defines limits used by the Compactor.

type PlannerFactory added in v1.13.0

// PlannerFactory builds and returns a compact.Planner. A PlannerFactory is
// returned by BlocksCompactorFactory alongside the compactor.
//
// Implementations may ignore arguments they do not need: the default planner
// factory in this package uses only logger, cfg and noCompactionMarkFilter.
type PlannerFactory func(
	ctx context.Context,
	bkt objstore.InstrumentedBucket,
	logger log.Logger,
	cfg Config,
	noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter,
	ringLifecycle *ring.Lifecycler,
	userID string,
	blockVisitMarkerReadFailed prometheus.Counter,
	blockVisitMarkerWriteFailed prometheus.Counter,
	compactorMetrics *compactorMetrics,
) compact.Planner

type RingConfig added in v0.7.0

type RingConfig struct {
	KVStore          kv.Config     `yaml:"kvstore"`
	HeartbeatPeriod  time.Duration `yaml:"heartbeat_period"`
	HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`

	// Wait ring stability.
	WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration"`
	WaitStabilityMaxDuration time.Duration `yaml:"wait_stability_max_duration"`

	// Instance details
	InstanceID             string   `yaml:"instance_id" doc:"hidden"`
	InstanceInterfaceNames []string `yaml:"instance_interface_names"`
	InstancePort           int      `yaml:"instance_port" doc:"hidden"`
	InstanceAddr           string   `yaml:"instance_addr" doc:"hidden"`
	TokensFilePath         string   `yaml:"tokens_file_path"`
	UnregisterOnShutdown   bool     `yaml:"unregister_on_shutdown"`

	// Injected internally
	ListenPort int `yaml:"-"`

	WaitActiveInstanceTimeout time.Duration `yaml:"wait_active_instance_timeout"`

	ObservePeriod time.Duration `yaml:"-"`
}

RingConfig masks the ring lifecycler config, which contains many options not really required by the compactors ring. This config is used to strip down the config to the minimum and avoid confusing the user.

func (*RingConfig) RegisterFlags added in v0.7.0

func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure this to the given FlagSet.

func (*RingConfig) ToLifecyclerConfig added in v0.7.0

func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig

ToLifecyclerConfig returns a LifecyclerConfig based on the compactor ring config.

type ShuffleShardingGrouper added in v1.13.0

type ShuffleShardingGrouper struct {
	// contains filtered or unexported fields
}

func NewShuffleShardingGrouper added in v1.13.0

func NewShuffleShardingGrouper(
	ctx context.Context,
	logger log.Logger,
	bkt objstore.InstrumentedBucket,
	acceptMalformedIndex bool,
	enableVerticalCompaction bool,
	blocksMarkedForNoCompact prometheus.Counter,
	hashFunc metadata.HashFunc,
	syncerMetrics *compact.SyncerMetrics,
	compactorMetrics *compactorMetrics,
	compactorCfg Config,
	ring ring.ReadRing,
	ringLifecyclerAddr string,
	ringLifecyclerID string,
	limits Limits,
	userID string,
	blockFilesConcurrency int,
	blocksFetchConcurrency int,
	compactionConcurrency int,
	blockVisitMarkerTimeout time.Duration,
	blockVisitMarkerReadFailed prometheus.Counter,
	blockVisitMarkerWriteFailed prometheus.Counter,
	noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark,
) *ShuffleShardingGrouper

func (*ShuffleShardingGrouper) Groups added in v1.13.0

func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*compact.Group, err error)

Groups function modified from https://github.com/cortexproject/cortex/pull/2616

type ShuffleShardingPlanner added in v1.13.0

type ShuffleShardingPlanner struct {
	// contains filtered or unexported fields
}

func NewShuffleShardingPlanner added in v1.13.0

func NewShuffleShardingPlanner(
	ctx context.Context,
	bkt objstore.InstrumentedBucket,
	logger log.Logger,
	ranges []int64,
	noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark,
	ringLifecyclerID string,
	blockVisitMarkerTimeout time.Duration,
	blockVisitMarkerFileUpdateInterval time.Duration,
	blockVisitMarkerReadFailed prometheus.Counter,
	blockVisitMarkerWriteFailed prometheus.Counter,
) *ShuffleShardingPlanner

func (*ShuffleShardingPlanner) Plan added in v1.13.0

func (p *ShuffleShardingPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error)

type VisitMarker added in v1.18.0

// VisitMarker is the interface accepted by NewVisitMarkerManager. The method
// set of *CleanerVisitMarker matches it.
type VisitMarker interface {
	// GetVisitMarkerFilePath returns the path of the marker file.
	// NOTE(review): presumably a path within the bucket — confirm.
	GetVisitMarkerFilePath() string
	// UpdateStatus records the given owner identifier and status on the marker.
	// NOTE(review): implementations presumably also refresh the visit time — confirm.
	UpdateStatus(ownerIdentifier string, status VisitStatus)
	// GetStatus returns the marker's VisitStatus.
	GetStatus() VisitStatus
	// IsExpired reports whether the marker is expired with respect to
	// visitMarkerTimeout.
	IsExpired(visitMarkerTimeout time.Duration) bool
	// String returns a textual representation of the marker.
	String() string
}

type VisitMarkerManager added in v1.18.0

type VisitMarkerManager struct {
	// contains filtered or unexported fields
}

func NewVisitMarkerManager added in v1.18.0

func NewVisitMarkerManager(
	bkt objstore.InstrumentedBucket,
	logger log.Logger,
	ownerIdentifier string,
	visitMarker VisitMarker,
) *VisitMarkerManager

func (*VisitMarkerManager) DeleteVisitMarker added in v1.18.0

func (v *VisitMarkerManager) DeleteVisitMarker(ctx context.Context)

func (*VisitMarkerManager) HeartBeat added in v1.18.0

func (v *VisitMarkerManager) HeartBeat(ctx context.Context, errChan <-chan error, visitMarkerFileUpdateInterval time.Duration, deleteOnExit bool)

func (*VisitMarkerManager) MarkWithStatus added in v1.18.0

func (v *VisitMarkerManager) MarkWithStatus(ctx context.Context, status VisitStatus)

func (*VisitMarkerManager) ReadVisitMarker added in v1.18.0

func (v *VisitMarkerManager) ReadVisitMarker(ctx context.Context, visitMarker any) error

type VisitStatus added in v1.18.0

// VisitStatus is the string-typed status carried by a visit marker (for
// example the Status field of CleanerVisitMarker, serialized as "status").
type VisitStatus string

// Known VisitStatus values. The string on the right is the literal value of
// each constant.
const (
	Pending    VisitStatus = "pending"
	InProgress VisitStatus = "inProgress"
	Completed  VisitStatus = "completed"
	Failed     VisitStatus = "failed"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL