rangedloop

package
v1.79.4
Published: May 19, 2023 License: AGPL-3.0 Imports: 15 Imported by: 1

Documentation

Constants

This section is empty.

Variables

var (

	// Error is a standard error class for this component.
	Error = errs.Class("ranged loop")
)

Functions

func CreateUUIDBoundaries

func CreateUUIDBoundaries(nRanges uint32) ([]uuid.UUID, error)

CreateUUIDBoundaries splits up the entire 128-bit UUID range into equal parts.
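The splitting idea can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: it assumes the cut points are placed on the top 32 bits and that n ranges need n-1 interior boundaries. The names `uuid16`, `withTopBits`, and `boundaries` are hypothetical.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// uuid16 is a hypothetical stand-in for uuid.UUID (a 16-byte array).
type uuid16 [16]byte

// withTopBits returns a zeroed 16-byte value whose first four bytes
// hold topBits in big-endian order.
func withTopBits(topBits uint32) uuid16 {
	var u uuid16
	binary.BigEndian.PutUint32(u[:4], topBits)
	return u
}

// boundaries returns the nRanges-1 interior cut points that divide the
// 32-bit prefix space into nRanges parts of (nearly) equal size.
// Assumption: the real function may validate or round differently.
func boundaries(nRanges uint32) ([]uuid16, error) {
	if nRanges == 0 {
		return nil, errors.New("nRanges must be positive")
	}
	step := uint32((uint64(1) << 32) / uint64(nRanges))
	out := make([]uuid16, 0, nRanges-1)
	for i := uint32(1); i < nRanges; i++ {
		out = append(out, withTopBits(i*step))
	}
	return out, nil
}

func main() {
	bs, err := boundaries(4)
	if err != nil {
		panic(err)
	}
	for _, b := range bs {
		fmt.Printf("%x\n", b[:4]) // prints 40000000, 80000000, c0000000
	}
}
```

Cutting only on the high bits keeps the arithmetic in a single machine word while still dividing the full 128-bit space evenly, because the remaining 96 bits are zero at each boundary.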

func MakeUUIDWithTopBits

func MakeUUIDWithTopBits(topBits uint32) (uuid.UUID, error)

MakeUUIDWithTopBits creates a zeroed UUID with the top 32 bits set from the input. Technically the result is not a UUID since it doesn't have the version and variant bits set.
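To illustrate why such a value is not strictly a UUID, the sketch below builds a 16-byte value from a 32-bit prefix and inspects the RFC 4122 version and variant fields. The helper names (`topBitsValue`, `version`, `variantBits`) are hypothetical and only approximate the behavior described above.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// topBitsValue builds a 16-byte value whose first four bytes come from
// topBits (big-endian) and whose remaining bytes are zero. It is a
// stand-in for illustration, not the package's function.
func topBitsValue(topBits uint32) [16]byte {
	var u [16]byte
	binary.BigEndian.PutUint32(u[:4], topBits)
	return u
}

// version returns the RFC 4122 version nibble (high four bits of byte 6).
func version(u [16]byte) byte { return u[6] >> 4 }

// variantBits returns the top two bits of byte 8, which are 0b10 for
// RFC 4122 UUIDs.
func variantBits(u [16]byte) byte { return u[8] >> 6 }

func main() {
	u := topBitsValue(0xdeadbeef)
	// Both fields are zero, so the value carries no valid version or variant.
	fmt.Printf("version=%d variant=%02b\n", version(u), variantBits(u))
}
```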

Types

type Config added in v1.68.1

type Config struct {
	Parallelism        int           `help:"how many chunks of segments to process in parallel" default:"2"`
	BatchSize          int           `help:"how many items to query in a batch" default:"2500"`
	AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"`
	Interval           time.Duration `help:"how often to run the loop" releaseDefault:"2h" devDefault:"10s" testDefault:"10s"`

	SuspiciousProcessedRatio float64 `help:"ratio where to consider processed count as suspicious" default:"0.03"`
}

Config contains configurable values for the shared loop.

type LiveCountObserver added in v1.71.1

type LiveCountObserver struct {
	// contains filtered or unexported fields
}

LiveCountObserver reports a count of segments during loop execution. This can be used to report the rate and progress of the loop. TODO: we may need a better name for this type.

func NewLiveCountObserver added in v1.71.1

func NewLiveCountObserver(metabase *metabase.DB, suspiciousProcessedRatio float64, asOfSystemInterval time.Duration) *LiveCountObserver

NewLiveCountObserver returns a new LiveCountObserver. To avoid pollution, use only one instance of this observer, and add it only to instances of the loop that are actually doing something.

func (*LiveCountObserver) Finish added in v1.71.1

func (o *LiveCountObserver) Finish(ctx context.Context) (err error)

Finish gets the segment count after range execution and verifies it against the number of processed segments and the number of segments in the table before loop execution.

func (*LiveCountObserver) Fork added in v1.71.1

func (o *LiveCountObserver) Fork(ctx context.Context) (Partial, error)

Fork returns a shared instance so we have a view of all loop ranges.

func (*LiveCountObserver) Join added in v1.71.1

func (o *LiveCountObserver) Join(ctx context.Context, partial Partial) error

Join does nothing because the instance is shared across ranges.

func (*LiveCountObserver) Process added in v1.71.1

func (o *LiveCountObserver) Process(ctx context.Context, segments []segmentloop.Segment) error

Process increments the counter.

func (*LiveCountObserver) Start added in v1.71.1

func (o *LiveCountObserver) Start(ctx context.Context, startTime time.Time) (err error)

Start resets the count at the start of the ranged segment loop and gets statistics about the segments table.

func (*LiveCountObserver) Stats added in v1.71.1

func (o *LiveCountObserver) Stats(cb func(key monkit.SeriesKey, field string, val float64))

Stats implements monkit.StatSource to report the number of segments.

type MetabaseRangeSplitter added in v1.70.1

type MetabaseRangeSplitter struct {
	// contains filtered or unexported fields
}

MetabaseRangeSplitter implements RangeSplitter.

func NewMetabaseRangeSplitter added in v1.70.1

func NewMetabaseRangeSplitter(db *metabase.DB, asOfSystemInterval time.Duration, batchSize int) *MetabaseRangeSplitter

NewMetabaseRangeSplitter creates the segment provider.

func (*MetabaseRangeSplitter) CreateRanges added in v1.70.1

func (provider *MetabaseRangeSplitter) CreateRanges(nRanges int, batchSize int) ([]SegmentProvider, error)

CreateRanges splits the segment table into chunks.

type MetabaseSegmentProvider added in v1.70.1

type MetabaseSegmentProvider struct {
	// contains filtered or unexported fields
}

MetabaseSegmentProvider implements SegmentProvider.

func (*MetabaseSegmentProvider) Iterate added in v1.70.1

func (provider *MetabaseSegmentProvider) Iterate(ctx context.Context, fn func([]segmentloop.Segment) error) error

Iterate loops over a part of the segment table.

func (*MetabaseSegmentProvider) Range added in v1.74.1

func (provider *MetabaseSegmentProvider) Range() UUIDRange

Range returns the range that is processed by this provider.

type Observer added in v1.68.1

type Observer interface {
	// Start is called at the beginning of each segment loop.
	Start(context.Context, time.Time) error

	// Fork creates a Partial to process a chunk of all the segments. It is
	// called after Start. It is not called concurrently.
	Fork(context.Context) (Partial, error)

	// Join is called for each partial returned by Fork. This gives the
	// opportunity to merge the output like in a reduce step. It will be called
	// before Finish. It is not called concurrently.
	Join(context.Context, Partial) error

	// Finish is called after all segments are processed by all observers.
	Finish(context.Context) error
}

Observer subscribes to the parallel segment loop. It is intended that a naïve implementation is threadsafe.

type ObserverDuration added in v1.70.1

type ObserverDuration struct {
	Observer Observer
	// Duration is set to -1 when the observer has errored out
	// so someone watching metrics can tell that something went wrong.
	Duration time.Duration
}

ObserverDuration reports back on how long it took the observer to process all the segments.

type Partial added in v1.68.1

type Partial interface {
	// Process is called repeatedly with batches of segments.
	// It is not called concurrently on the same instance.
	Process(context.Context, []segmentloop.Segment) error
}

Partial processes a part of the total range of segments.

type RangeSplitter added in v1.70.1

type RangeSplitter interface {
	CreateRanges(nRanges int, batchSize int) ([]SegmentProvider, error)
}

RangeSplitter splits a source of segments into ranges, so that multiple segments can be processed concurrently. It usually abstracts over a database. It is a subcomponent of the ranged segment loop.

type SegmentProvider added in v1.70.1

type SegmentProvider interface {
	Range() UUIDRange
	Iterate(ctx context.Context, fn func([]segmentloop.Segment) error) error
}

SegmentProvider iterates through a range of segments.

type Service added in v1.68.1

type Service struct {
	Loop *sync2.Cycle
	// contains filtered or unexported fields
}

Service iterates through all segments and calls the attached observers for every segment.

architecture: Service

func NewService added in v1.68.1

func NewService(log *zap.Logger, config Config, provider RangeSplitter, observers []Observer) *Service

NewService creates a new instance of the ranged loop service.

func (*Service) Close added in v1.73.4

func (service *Service) Close() error

Close stops the ranged loop.

func (*Service) Run added in v1.68.1

func (service *Service) Run(ctx context.Context) (err error)

Run starts the looping service.

func (*Service) RunOnce added in v1.68.1

func (service *Service) RunOnce(ctx context.Context) (observerDurations []ObserverDuration, err error)

RunOnce runs the loop a single time and sends information to the observers.

type UUIDRange

type UUIDRange struct {
	Start *uuid.UUID
	End   *uuid.UUID
}

UUIDRange describes a range of UUID values. Start and End can be open-ended.
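A membership check shows how open-ended bounds can be handled. This sketch uses a local type with the same shape as UUIDRange and assumes that Start is inclusive and End is exclusive; the documentation above does not state which convention the package uses. The names `id`, `idRange`, and `contains` are hypothetical.

```go
package main

import (
	"bytes"
	"fmt"
)

// id is a hypothetical stand-in for uuid.UUID.
type id [16]byte

// idRange mirrors the shape of UUIDRange: a nil bound means "unbounded".
type idRange struct {
	Start *id
	End   *id
}

// contains reports whether v lies in r, ASSUMING Start is inclusive and
// End is exclusive. A nil bound places no limit on that side.
func (r idRange) contains(v id) bool {
	if r.Start != nil && bytes.Compare(v[:], r.Start[:]) < 0 {
		return false
	}
	if r.End != nil && bytes.Compare(v[:], r.End[:]) >= 0 {
		return false
	}
	return true
}

func main() {
	mid := id{0x80}
	lower := idRange{End: &mid}   // everything below mid
	upper := idRange{Start: &mid} // mid and everything above
	fmt.Println(lower.contains(id{0x7f}), lower.contains(mid), upper.contains(mid))
	// prints: true false true
}
```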

func CreateUUIDRanges

func CreateUUIDRanges(nRanges uint32) ([]UUIDRange, error)

CreateUUIDRanges splits up the entire 128-bit UUID range into equal parts.
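One way to picture the relationship between boundaries and ranges: n-1 sorted cut points yield n consecutive ranges, with the first range open at the start and the last open at the end. The sketch below uses local stand-in types and a hypothetical `rangesFromBoundaries` helper; it is not the package's implementation.

```go
package main

import "fmt"

// id is a hypothetical stand-in for uuid.UUID.
type id [16]byte

// idRange mirrors the shape of UUIDRange: a nil bound means "unbounded".
type idRange struct{ Start, End *id }

// rangesFromBoundaries turns n-1 sorted cut points into n consecutive
// ranges. Each cut point ends one range and starts the next.
func rangesFromBoundaries(cuts []id) []idRange {
	out := make([]idRange, 0, len(cuts)+1)
	var prev *id
	for i := range cuts {
		cur := &cuts[i]
		out = append(out, idRange{Start: prev, End: cur})
		prev = cur
	}
	// The final range is open-ended on the right.
	return append(out, idRange{Start: prev})
}

func main() {
	rs := rangesFromBoundaries([]id{{0x40}, {0x80}, {0xc0}})
	for _, r := range rs {
		fmt.Println(r.Start != nil, r.End != nil)
	}
}
```

With no cut points at all, the helper returns a single range that is open on both sides and therefore covers the whole space.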
