Documentation ¶
Index ¶
- Variables
- type InjuredSegment
- type InsertBuffer
- type MockRepairQueue
- func (m *MockRepairQueue) Clean(ctx context.Context, before time.Time) (deleted int64, err error)
- func (m *MockRepairQueue) Count(ctx context.Context) (count int, err error)
- func (m *MockRepairQueue) Delete(ctx context.Context, s *InjuredSegment) error
- func (m *MockRepairQueue) Insert(ctx context.Context, s *InjuredSegment) (alreadyInserted bool, err error)
- func (m *MockRepairQueue) InsertBatch(ctx context.Context, segments []*InjuredSegment) (newlyInsertedSegments []*InjuredSegment, err error)
- func (m *MockRepairQueue) Select(ctx context.Context) (*InjuredSegment, error)
- func (m *MockRepairQueue) SelectN(ctx context.Context, limit int) ([]InjuredSegment, error)
- func (m *MockRepairQueue) TestingSetAttemptedTime(ctx context.Context, streamID uuid.UUID, position metabase.SegmentPosition, ...) (rowsAffected int64, err error)
- type RepairQueue
Constants ¶
This section is empty.
Variables ¶
var ErrEmpty = errs.Class("empty queue")
ErrEmpty is returned when attempting to select (dequeue) a segment from an empty queue.
var Error = errs.Class("repair queue")
Error is a standard error class for this package.
Functions ¶
This section is empty.
Types ¶
type InjuredSegment ¶ added in v1.34.1
type InjuredSegment struct {
	StreamID uuid.UUID
	Position metabase.SegmentPosition
	SegmentHealth float64
	AttemptedAt *time.Time
	UpdatedAt time.Time
	InsertedAt time.Time
}
InjuredSegment contains information about a segment that should be repaired.
type InsertBuffer ¶ added in v1.55.1
type InsertBuffer struct {
// contains filtered or unexported fields
}
InsertBuffer exposes a synchronous API to buffer a batch of segments and insert them at once. It is not thread-safe. Call Flush() before discarding the buffer.
func NewInsertBuffer ¶ added in v1.55.1
func NewInsertBuffer(
	queue RepairQueue,
	batchSize int,
) *InsertBuffer
NewInsertBuffer wraps a RepairQueue with buffer logic.
func (*InsertBuffer) Flush ¶ added in v1.55.1
func (r *InsertBuffer) Flush(ctx context.Context) (err error)
Flush inserts the remaining segments into the database.
func (*InsertBuffer) Insert ¶ added in v1.55.1
func (r *InsertBuffer) Insert(
	ctx context.Context,
	segment *InjuredSegment,
	newInsertCallback func(),
) (err error)
Insert adds a segment to the batch for the next insert and performs a synchronous database insert when the batch size is reached. When the segment is determined to be newly queued, newInsertCallback is called, for the purpose of metrics.
type MockRepairQueue ¶ added in v1.86.2
type MockRepairQueue struct {
Segments []*InjuredSegment
}
MockRepairQueue helps with testing RepairQueue.
func (*MockRepairQueue) Count ¶ added in v1.86.2
func (m *MockRepairQueue) Count(ctx context.Context) (count int, err error)
Count implements RepairQueue.
func (*MockRepairQueue) Delete ¶ added in v1.86.2
func (m *MockRepairQueue) Delete(ctx context.Context, s *InjuredSegment) error
Delete implements RepairQueue.
func (*MockRepairQueue) Insert ¶ added in v1.86.2
func (m *MockRepairQueue) Insert(ctx context.Context, s *InjuredSegment) (alreadyInserted bool, err error)
Insert implements RepairQueue.
func (*MockRepairQueue) InsertBatch ¶ added in v1.86.2
func (m *MockRepairQueue) InsertBatch(ctx context.Context, segments []*InjuredSegment) (newlyInsertedSegments []*InjuredSegment, err error)
InsertBatch implements RepairQueue.
func (*MockRepairQueue) Select ¶ added in v1.86.2
func (m *MockRepairQueue) Select(ctx context.Context) (*InjuredSegment, error)
Select implements RepairQueue.
func (*MockRepairQueue) SelectN ¶ added in v1.86.2
func (m *MockRepairQueue) SelectN(ctx context.Context, limit int) ([]InjuredSegment, error)
SelectN implements RepairQueue.
func (*MockRepairQueue) TestingSetAttemptedTime ¶ added in v1.86.2
func (m *MockRepairQueue) TestingSetAttemptedTime(ctx context.Context, streamID uuid.UUID, position metabase.SegmentPosition, t time.Time) (rowsAffected int64, err error)
TestingSetAttemptedTime implements RepairQueue.
type RepairQueue ¶
type RepairQueue interface {
	// Insert adds an injured segment.
	Insert(ctx context.Context, s *InjuredSegment) (alreadyInserted bool, err error)
	// InsertBatch adds multiple injured segments
	InsertBatch(ctx context.Context, segments []*InjuredSegment) (newlyInsertedSegments []*InjuredSegment, err error)
	// Select gets an injured segment.
	Select(ctx context.Context) (*InjuredSegment, error)
	// Delete removes an injured segment.
	Delete(ctx context.Context, s *InjuredSegment) error
	// Clean removes all segments last updated before a certain time
	Clean(ctx context.Context, before time.Time) (deleted int64, err error)
	// SelectN lists limit amount of injured segments.
	SelectN(ctx context.Context, limit int) ([]InjuredSegment, error)
	// Count counts the number of segments in the repair queue.
	Count(ctx context.Context) (count int, err error)
	// TestingSetAttemptedTime sets attempted time for a segment.
	TestingSetAttemptedTime(ctx context.Context, streamID uuid.UUID, position metabase.SegmentPosition, t time.Time) (rowsAffected int64, err error)
}
RepairQueue implements queueing for segments that need repairing. Implementation can be found at satellite/satellitedb/repairqueue.go.
architecture: Database