queue

package
v1.87.2
Published: Sep 6, 2023 License: AGPL-3.0 Imports: 6 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrEmpty = errs.Class("empty queue")

ErrEmpty is returned when attempting to Dequeue from an empty queue.

var Error = errs.Class("repair queue")

Error is a standard error class for this package.

Functions

This section is empty.

Types

type InjuredSegment added in v1.34.1

type InjuredSegment struct {
	StreamID uuid.UUID
	Position metabase.SegmentPosition

	SegmentHealth float64
	AttemptedAt   *time.Time
	UpdatedAt     time.Time
	InsertedAt    time.Time
}

InjuredSegment contains information about a segment that should be repaired.

type InsertBuffer added in v1.55.1

type InsertBuffer struct {
	// contains filtered or unexported fields
}

InsertBuffer exposes a synchronous API to buffer a batch of segments and insert them at once. Not threadsafe. Call Flush() before discarding.

func NewInsertBuffer added in v1.55.1

func NewInsertBuffer(
	queue RepairQueue,
	batchSize int,
) *InsertBuffer

NewInsertBuffer wraps a RepairQueue with buffer logic.

func (*InsertBuffer) Flush added in v1.55.1

func (r *InsertBuffer) Flush(ctx context.Context) (err error)

Flush inserts the remaining segments into the database.

func (*InsertBuffer) Insert added in v1.55.1

func (r *InsertBuffer) Insert(
	ctx context.Context,
	segment *InjuredSegment,
	newInsertCallback func(),
) (err error)

Insert adds a segment to the batch for the next insert, and performs a synchronous database insert when the batch size is reached. When the segment is determined to be newly queued, newInsertCallback is called, for the purpose of metrics.

type MockRepairQueue added in v1.86.2

type MockRepairQueue struct {
	Segments []*InjuredSegment
}

MockRepairQueue helps testing RepairQueue.

func (*MockRepairQueue) Clean added in v1.86.2

func (m *MockRepairQueue) Clean(ctx context.Context, before time.Time) (deleted int64, err error)

Clean implements RepairQueue.

func (*MockRepairQueue) Count added in v1.86.2

func (m *MockRepairQueue) Count(ctx context.Context) (count int, err error)

Count implements RepairQueue.

func (*MockRepairQueue) Delete added in v1.86.2

func (m *MockRepairQueue) Delete(ctx context.Context, s *InjuredSegment) error

Delete implements RepairQueue.

func (*MockRepairQueue) Insert added in v1.86.2

func (m *MockRepairQueue) Insert(ctx context.Context, s *InjuredSegment) (alreadyInserted bool, err error)

Insert implements RepairQueue.

func (*MockRepairQueue) InsertBatch added in v1.86.2

func (m *MockRepairQueue) InsertBatch(ctx context.Context, segments []*InjuredSegment) (newlyInsertedSegments []*InjuredSegment, err error)

InsertBatch implements RepairQueue.

func (*MockRepairQueue) Select added in v1.86.2

func (m *MockRepairQueue) Select(ctx context.Context) (*InjuredSegment, error)

Select implements RepairQueue.

func (*MockRepairQueue) SelectN added in v1.86.2

func (m *MockRepairQueue) SelectN(ctx context.Context, limit int) ([]InjuredSegment, error)

SelectN implements RepairQueue.

func (*MockRepairQueue) TestingSetAttemptedTime added in v1.86.2

func (m *MockRepairQueue) TestingSetAttemptedTime(ctx context.Context, streamID uuid.UUID, position metabase.SegmentPosition, t time.Time) (rowsAffected int64, err error)

TestingSetAttemptedTime implements RepairQueue.

type RepairQueue

type RepairQueue interface {
	// Insert adds an injured segment.
	Insert(ctx context.Context, s *InjuredSegment) (alreadyInserted bool, err error)
	// InsertBatch adds multiple injured segments
	InsertBatch(ctx context.Context, segments []*InjuredSegment) (newlyInsertedSegments []*InjuredSegment, err error)
	// Select gets an injured segment.
	Select(ctx context.Context) (*InjuredSegment, error)
	// Delete removes an injured segment.
	Delete(ctx context.Context, s *InjuredSegment) error
	// Clean removes all segments last updated before a certain time
	Clean(ctx context.Context, before time.Time) (deleted int64, err error)
	// SelectN lists up to limit injured segments.
	SelectN(ctx context.Context, limit int) ([]InjuredSegment, error)
	// Count counts the number of segments in the repair queue.
	Count(ctx context.Context) (count int, err error)

	// TestingSetAttemptedTime sets attempted time for a segment.
	TestingSetAttemptedTime(ctx context.Context, streamID uuid.UUID, position metabase.SegmentPosition, t time.Time) (rowsAffected int64, err error)
}

RepairQueue implements queueing for segments that need repairing. Implementation can be found at satellite/satellitedb/repairqueue.go.

architecture: Database
