Documentation ¶
Index ¶
- Variables
- func CreateUUIDBoundaries(nRanges uint32) ([]uuid.UUID, error)
- func MakeUUIDWithTopBits(topBits uint32) (uuid.UUID, error)
- type Config
- type LiveCountObserver
- func (o *LiveCountObserver) Finish(ctx context.Context) (err error)
- func (o *LiveCountObserver) Fork(ctx context.Context) (Partial, error)
- func (o *LiveCountObserver) Join(ctx context.Context, partial Partial) error
- func (o *LiveCountObserver) Process(ctx context.Context, segments []segmentloop.Segment) error
- func (o *LiveCountObserver) Start(ctx context.Context, startTime time.Time) (err error)
- func (o *LiveCountObserver) Stats(cb func(key monkit.SeriesKey, field string, val float64))
- type MetabaseRangeSplitter
- type MetabaseSegmentProvider
- type Observer
- type ObserverDuration
- type Partial
- type RangeSplitter
- type SegmentProvider
- type Service
- type UUIDRange
Constants ¶
This section is empty.
Variables ¶
var (
	// Error is a standard error class for this component.
	Error = errs.Class("ranged loop")
)
Functions ¶
func CreateUUIDBoundaries ¶
func CreateUUIDBoundaries(nRanges uint32) ([]uuid.UUID, error)
CreateUUIDBoundaries splits up the entire 128-bit UUID range into equal parts.
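A minimal usage sketch, based only on the signature above; the import path and the example program itself are assumptions, not part of this documentation:

package main

import (
	"fmt"

	"storj.io/storj/satellite/metabase/rangedloop" // assumed import path for this package
)

func main() {
	// Split the 128-bit UUID space for 4 ranges.
	boundaries, err := rangedloop.CreateUUIDBoundaries(4)
	if err != nil {
		panic(err)
	}
	for i, boundary := range boundaries {
		fmt.Printf("boundary %d: %s\n", i, boundary)
	}
}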
Types ¶
type Config ¶ added in v1.68.1
type Config struct {
	Parallelism              int           `help:"how many chunks of segments to process in parallel" default:"2"`
	BatchSize                int           `help:"how many items to query in a batch" default:"2500"`
	AsOfSystemInterval       time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"`
	Interval                 time.Duration `help:"how often to run the loop" releaseDefault:"2h" devDefault:"10s" testDefault:"10s"`
	SuspiciousProcessedRatio float64       `help:"ratio where to consider processed count as suspicious" default:"0.03"`
}
Config contains configurable values for the shared loop.
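For illustration, a sketch that fills in Config by hand with the defaults declared in the struct tags above; the helper name and the import path are assumptions, and in a real process these values are normally bound from the tags by the configuration system:

package rangedloopexample

import (
	"time"

	"storj.io/storj/satellite/metabase/rangedloop" // assumed import path
)

// defaultConfig mirrors the defaults from the struct tags above.
func defaultConfig() rangedloop.Config {
	return rangedloop.Config{
		Parallelism:              2,
		BatchSize:                2500,
		AsOfSystemInterval:       -5 * time.Minute,
		Interval:                 2 * time.Hour,
		SuspiciousProcessedRatio: 0.03,
	}
}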
type LiveCountObserver ¶ added in v1.71.1
type LiveCountObserver struct {
// contains filtered or unexported fields
}
LiveCountObserver reports a count of segments during loop execution. This can be used to report the rate and progress of the loop. TODO: we may need a better name for this type.
func NewLiveCountObserver ¶ added in v1.71.1
func NewLiveCountObserver(metabase *metabase.DB, suspiciousProcessedRatio float64, asOfSystemInterval time.Duration) *LiveCountObserver
NewLiveCountObserver creates a LiveCountObserver. To avoid pollution, make sure to use only one instance of this observer, and add it only to instances of the loop which are actually doing something.
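A hedged sketch of building the observer list for a single loop instance; the function name buildObservers is invented for illustration, db and cfg are placeholders, and the import paths are assumptions:

package rangedloopexample

import (
	"storj.io/storj/satellite/metabase"            // assumed import path
	"storj.io/storj/satellite/metabase/rangedloop" // assumed import path
)

// buildObservers wires exactly one LiveCountObserver into the observer list,
// which avoids the metric pollution mentioned above; db is an already-opened
// *metabase.DB and cfg a populated Config.
func buildObservers(db *metabase.DB, cfg rangedloop.Config) []rangedloop.Observer {
	liveCount := rangedloop.NewLiveCountObserver(db, cfg.SuspiciousProcessedRatio, cfg.AsOfSystemInterval)
	return []rangedloop.Observer{liveCount}
}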
func (*LiveCountObserver) Finish ¶ added in v1.71.1
func (o *LiveCountObserver) Finish(ctx context.Context) (err error)
Finish gets the segment count after range execution and verifies it against the number of processed segments and the segment count taken from the table before loop execution.
func (*LiveCountObserver) Fork ¶ added in v1.71.1
func (o *LiveCountObserver) Fork(ctx context.Context) (Partial, error)
Fork returns a shared instance so we have a view of all loop ranges.
func (*LiveCountObserver) Join ¶ added in v1.71.1
func (o *LiveCountObserver) Join(ctx context.Context, partial Partial) error
Join does nothing because the instance is shared across ranges.
func (*LiveCountObserver) Process ¶ added in v1.71.1
func (o *LiveCountObserver) Process(ctx context.Context, segments []segmentloop.Segment) error
Process increments the counter.
func (*LiveCountObserver) Start ¶ added in v1.71.1
func (o *LiveCountObserver) Start(ctx context.Context, startTime time.Time) (err error)
Start resets the count at the start of the ranged segment loop and gets statistics about the segments table.
func (*LiveCountObserver) Stats ¶ added in v1.71.1
func (o *LiveCountObserver) Stats(cb func(key monkit.SeriesKey, field string, val float64))
Stats implements monkit.StatSource to report the number of segments.
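Since Stats follows the monkit.StatSource callback shape shown above, the current values can also be read directly, as in this sketch; the function name is invented, monkit normally collects these metrics itself, and the import paths are assumptions:

package rangedloopexample

import (
	"fmt"

	"github.com/spacemonkeygo/monkit/v3" // monkit, as referenced by the Stats signature above

	"storj.io/storj/satellite/metabase/rangedloop" // assumed import path
)

// dumpLiveCount prints the observer's current metric values through the
// documented Stats callback.
func dumpLiveCount(o *rangedloop.LiveCountObserver) {
	o.Stats(func(key monkit.SeriesKey, field string, val float64) {
		fmt.Println(key, field, val)
	})
}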
type MetabaseRangeSplitter ¶ added in v1.70.1
type MetabaseRangeSplitter struct {
// contains filtered or unexported fields
}
MetabaseRangeSplitter implements RangeSplitter.
func NewMetabaseRangeSplitter ¶ added in v1.70.1
func NewMetabaseRangeSplitter(db *metabase.DB, asOfSystemInterval time.Duration, batchSize int) *MetabaseRangeSplitter
NewMetabaseRangeSplitter creates a new MetabaseRangeSplitter.
func (*MetabaseRangeSplitter) CreateRanges ¶ added in v1.70.1
func (provider *MetabaseRangeSplitter) CreateRanges(nRanges int, batchSize int) ([]SegmentProvider, error)
CreateRanges splits the segment table into chunks.
type MetabaseSegmentProvider ¶ added in v1.70.1
type MetabaseSegmentProvider struct {
// contains filtered or unexported fields
}
MetabaseSegmentProvider implements SegmentProvider.
func (*MetabaseSegmentProvider) Iterate ¶ added in v1.70.1
func (provider *MetabaseSegmentProvider) Iterate(ctx context.Context, fn func([]segmentloop.Segment) error) error
Iterate loops over a part of the segment table.
func (*MetabaseSegmentProvider) Range ¶ added in v1.74.1
func (provider *MetabaseSegmentProvider) Range() UUIDRange
Range returns the range which is processed by this provider.
type Observer ¶ added in v1.68.1
type Observer interface {
	// Start is called at the beginning of each segment loop.
	Start(context.Context, time.Time) error

	// Fork creates a Partial to process a chunk of all the segments. It is
	// called after Start. It is not called concurrently.
	Fork(context.Context) (Partial, error)

	// Join is called for each partial returned by Fork. This gives the
	// opportunity to merge the output like in a reduce step. It will be called
	// before Finish. It is not called concurrently.
	Join(context.Context, Partial) error

	// Finish is called after all segments are processed by all observers.
	Finish(context.Context) error
}
Observer subscribes to the parallel segment loop. It is intended that a naïve implementation is threadsafe.
type ObserverDuration ¶ added in v1.70.1
type ObserverDuration struct {
	Observer Observer

	// Duration is set to -1 when the observer has errored out
	// so someone watching metrics can tell that something went wrong.
	Duration time.Duration
}
ObserverDuration reports back on how long it took the observer to process all the segments.
type Partial ¶ added in v1.68.1
type Partial interface {
	// Process is called repeatedly with batches of segments.
	// It is not called concurrently on the same instance.
	Process(context.Context, []segmentloop.Segment) error
}
Partial processes a part of the total range of segments.
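To make the Start/Fork/Process/Join/Finish flow concrete, here is a hedged sketch of an observer that counts segments across all ranges. The types countObserver and countPartial are invented for illustration, and the import paths are assumptions; only the Observer and Partial interfaces documented above are relied on:

package rangedloopexample

import (
	"context"
	"time"

	"storj.io/storj/satellite/metabase/rangedloop"  // assumed import path
	"storj.io/storj/satellite/metabase/segmentloop" // assumed import path for segmentloop.Segment
)

// countObserver counts segments seen during one ranged loop execution.
type countObserver struct {
	total int64
}

// countPartial accumulates a count for a single range.
type countPartial struct {
	count int64
}

// Start resets the total at the beginning of each loop execution.
func (o *countObserver) Start(ctx context.Context, startTime time.Time) error {
	o.total = 0
	return nil
}

// Fork returns a fresh partial for each range so ranges never share state.
func (o *countObserver) Fork(ctx context.Context) (rangedloop.Partial, error) {
	return &countPartial{}, nil
}

// Join folds a finished partial back into the observer (the reduce step).
// Only partials produced by Fork above are ever passed in.
func (o *countObserver) Join(ctx context.Context, partial rangedloop.Partial) error {
	o.total += partial.(*countPartial).count
	return nil
}

// Finish runs after all ranges are done; a real observer might persist or
// report o.total here.
func (o *countObserver) Finish(ctx context.Context) error {
	return nil
}

// Process is called with batches of segments from one range.
func (p *countPartial) Process(ctx context.Context, segments []segmentloop.Segment) error {
	p.count += int64(len(segments))
	return nil
}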
type RangeSplitter ¶ added in v1.70.1
type RangeSplitter interface {
CreateRanges(nRanges int, batchSize int) ([]SegmentProvider, error)
}
RangeSplitter splits a source of segments into ranges, so that multiple segments can be processed concurrently. It usually abstracts over a database. It is a subcomponent of the ranged segment loop.
type SegmentProvider ¶ added in v1.70.1
type SegmentProvider interface {
	Range() UUIDRange
	Iterate(ctx context.Context, fn func([]segmentloop.Segment) error) error
}
SegmentProvider iterates through a range of segments.
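A sketch of how RangeSplitter and SegmentProvider fit together, driving the ranges sequentially by hand; the Service normally does this with Parallelism workers. The function name drainRanges and the arguments 4 and 2500 are arbitrary illustrations, and the import paths are assumptions:

package rangedloopexample

import (
	"context"
	"fmt"

	"storj.io/storj/satellite/metabase/rangedloop"  // assumed import path
	"storj.io/storj/satellite/metabase/segmentloop" // assumed import path
)

// drainRanges walks every range produced by the splitter, one after another,
// using only the interfaces documented above.
func drainRanges(ctx context.Context, splitter rangedloop.RangeSplitter) error {
	providers, err := splitter.CreateRanges(4, 2500)
	if err != nil {
		return err
	}
	for _, provider := range providers {
		fmt.Printf("processing range %+v\n", provider.Range())

		err := provider.Iterate(ctx, func(segments []segmentloop.Segment) error {
			// handle one batch of segments from this range
			return nil
		})
		if err != nil {
			return err
		}
	}
	return nil
}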
type Service ¶ added in v1.68.1
Service iterates through all segments and calls the attached observers for every segment.
architecture: Service
func NewService ¶ added in v1.68.1
func NewService(log *zap.Logger, config Config, provider RangeSplitter, observers []Observer) *Service
NewService creates a new instance of the ranged loop service.
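A hedged wiring sketch that assembles the documented pieces: a MetabaseRangeSplitter as the RangeSplitter and a LiveCountObserver as the only observer. The function name newRangedLoop is invented, the import paths are assumptions, and the Service's run methods are not shown in this excerpt:

package rangedloopexample

import (
	"go.uber.org/zap"

	"storj.io/storj/satellite/metabase"            // assumed import path
	"storj.io/storj/satellite/metabase/rangedloop" // assumed import path
)

// newRangedLoop builds a ranged loop service from an opened *metabase.DB, a
// zap logger, and a populated Config.
func newRangedLoop(log *zap.Logger, db *metabase.DB, cfg rangedloop.Config) *rangedloop.Service {
	splitter := rangedloop.NewMetabaseRangeSplitter(db, cfg.AsOfSystemInterval, cfg.BatchSize)

	observers := []rangedloop.Observer{
		rangedloop.NewLiveCountObserver(db, cfg.SuspiciousProcessedRatio, cfg.AsOfSystemInterval),
	}

	// The returned *Service runs the observers over all ranges; consult the
	// Service's methods for starting the loop.
	return rangedloop.NewService(log, cfg, splitter, observers)
}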
type UUIDRange ¶
UUIDRange describes a range of UUID values. Start and End can be open-ended.
func CreateUUIDRanges ¶
CreateUUIDRanges splits up the entire 128-bit UUID range into equal parts.