rangedloop

package
v1.71.1
This package is not in the latest version of its module.
Published: Jan 23, 2023 License: AGPL-3.0 Imports: 14 Imported by: 1

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateUUIDBoundaries

func CreateUUIDBoundaries(nRanges uint32) ([]uuid.UUID, error)

CreateUUIDBoundaries splits up the entire 128-bit UUID range into equal parts.
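The splitting described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: the local `UUID` type stands in for `uuid.UUID`, the helper name `boundaries` is hypothetical, and it assumes that n ranges are described by n-1 interior boundaries obtained by evenly spacing the top 32 bits. The parts are exactly equal only when n divides 2^32.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// UUID is a local stand-in for a 16-byte UUID type (assumption).
type UUID [16]byte

// boundaries returns the n-1 interior boundaries that cut the UUID space
// into n parts by evenly spacing the top 32 bits (hypothetical helper).
func boundaries(n uint32) ([]UUID, error) {
	if n == 0 {
		return nil, errors.New("n must be greater than zero")
	}
	// Width of one part, measured in the top 32 bits.
	step := (uint64(1) << 32) / uint64(n)
	out := make([]UUID, 0, n-1)
	for i := uint64(1); i < uint64(n); i++ {
		var u UUID
		// i*step is always below 2^32 because i < n.
		binary.BigEndian.PutUint32(u[:4], uint32(i*step))
		out = append(out, u)
	}
	return out, nil
}

func main() {
	bs, err := boundaries(4)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, b := range bs {
		fmt.Printf("%x\n", b[:])
	}
}
```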

func MakeUUIDWithTopBits

func MakeUUIDWithTopBits(topBits uint32) (uuid.UUID, error)

MakeUUIDWithTopBits creates a zeroed UUID with the top 32 bits set from the input. Technically the result is not a UUID since it doesn't have the version and variant bits set.
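As a rough illustration of the idea, the sketch below writes a 32-bit value into the first four bytes of a zeroed 16-byte array. The `UUID` type and the helper `makeWithTopBits` are hypothetical stand-ins, and big-endian byte order is an assumption chosen so that byte-wise ordering of the results follows the numeric ordering of `topBits`.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// UUID is a local stand-in for a 16-byte UUID type (assumption).
type UUID [16]byte

// makeWithTopBits returns a zeroed UUID whose first four bytes hold topBits
// in big-endian order. No version or variant bits are set, so the result is
// only a boundary marker, not a valid UUID.
func makeWithTopBits(topBits uint32) UUID {
	var u UUID
	binary.BigEndian.PutUint32(u[:4], topBits)
	return u
}

func main() {
	u := makeWithTopBits(0x80000000)
	fmt.Printf("%x\n", u[:]) // prints 80000000000000000000000000000000
}
```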

Types

type Config added in v1.68.1

type Config struct {
	Parallelism        int           `help:"how many chunks of segments to process in parallel" default:"2"`
	BatchSize          int           `help:"how many items to query in a batch" default:"2500"`
	AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"`
	Interval           time.Duration `help:"how often to run the loop" releaseDefault:"2h" devDefault:"10s" testDefault:"10s"`
}

Config contains configurable values for the shared loop.
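For orientation, the release defaults from the struct tags above can be written out as a plain struct literal. The `loopConfig` type below is a local copy of the fields without tags, and `releaseDefaults` is a hypothetical helper; how the real service loads these values from flags or config files is not shown here.

```go
package main

import (
	"fmt"
	"time"
)

// loopConfig is a local copy of the Config fields, without struct tags
// (stand-in for illustration only).
type loopConfig struct {
	Parallelism        int
	BatchSize          int
	AsOfSystemInterval time.Duration
	Interval           time.Duration
}

// releaseDefaults returns the values listed in the releaseDefault and
// default tags of the Config struct.
func releaseDefaults() loopConfig {
	return loopConfig{
		Parallelism:        2,
		BatchSize:          2500,
		AsOfSystemInterval: -5 * time.Minute, // negative: a point in the past
		Interval:           2 * time.Hour,
	}
}

func main() {
	fmt.Printf("%+v\n", releaseDefaults())
}
```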

type LiveCountObserver added in v1.71.1

type LiveCountObserver struct {
	// contains filtered or unexported fields
}

LiveCountObserver reports a count of segments during loop execution. This can be used to report the rate and progress of the loop.

func NewLiveCountObserver added in v1.71.1

func NewLiveCountObserver() *LiveCountObserver

NewLiveCountObserver returns a new LiveCountObserver. To avoid pollution, use only one instance of this observer, and add it only to loop instances that are actually doing work.

func (*LiveCountObserver) Finish added in v1.71.1

func (o *LiveCountObserver) Finish(ctx context.Context) error

Finish does nothing at loop end.

func (*LiveCountObserver) Fork added in v1.71.1

func (o *LiveCountObserver) Fork(ctx context.Context) (Partial, error)

Fork returns a shared instance so we have a view of all loop ranges.

func (*LiveCountObserver) Join added in v1.71.1

func (o *LiveCountObserver) Join(ctx context.Context, partial Partial) error

Join does nothing because the instance is shared across ranges.

func (*LiveCountObserver) Process added in v1.71.1

func (o *LiveCountObserver) Process(ctx context.Context, segments []segmentloop.Segment) error

Process increments the counter.

func (*LiveCountObserver) Start added in v1.71.1

func (o *LiveCountObserver) Start(context.Context, time.Time) error

Start resets the count at the start of the ranged segment loop.

func (*LiveCountObserver) Stats added in v1.71.1

func (o *LiveCountObserver) Stats(cb func(key monkit.SeriesKey, field string, val float64))

Stats implements monkit.StatSource to report the number of segments.
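The shared-instance pattern described by the LiveCountObserver methods can be sketched as follows. This is an assumption-laden illustration rather than the package's code: `Segment` is a placeholder for `segmentloop.Segment`, `liveCounter` and `countConcurrently` are hypothetical names, and the counter is assumed to grow by the number of segments in each batch. Because Fork hands out the same instance to every range, the counter must be updated atomically.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Segment is a placeholder for segmentloop.Segment (assumption).
type Segment struct{}

// liveCounter is a hypothetical observer that shares one atomic counter
// across all ranges.
type liveCounter struct {
	n int64
}

// Start resets the counter at the beginning of a loop run.
func (c *liveCounter) Start(context.Context, time.Time) error {
	atomic.StoreInt64(&c.n, 0)
	return nil
}

// Fork returns the receiver itself, so every range updates the same counter.
func (c *liveCounter) Fork(context.Context) (*liveCounter, error) {
	return c, nil
}

// Process adds the batch length to the shared counter.
func (c *liveCounter) Process(_ context.Context, segs []Segment) error {
	atomic.AddInt64(&c.n, int64(len(segs)))
	return nil
}

// Count reads the current value; it is safe to call while ranges are running.
func (c *liveCounter) Count() int64 {
	return atomic.LoadInt64(&c.n)
}

// countConcurrently feeds batches from several goroutines into one counter.
func countConcurrently(ranges, batches, batchSize int) int64 {
	ctx := context.Background()
	c := &liveCounter{}
	_ = c.Start(ctx, time.Now())
	var wg sync.WaitGroup
	for r := 0; r < ranges; r++ {
		p, _ := c.Fork(ctx)
		wg.Add(1)
		go func() {
			defer wg.Done()
			for b := 0; b < batches; b++ {
				_ = p.Process(ctx, make([]Segment, batchSize))
			}
		}()
	}
	wg.Wait()
	return c.Count()
}

func main() {
	fmt.Println(countConcurrently(4, 10, 25)) // prints 1000
}
```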

type MetabaseRangeSplitter added in v1.70.1

type MetabaseRangeSplitter struct {
	// contains filtered or unexported fields
}

MetabaseRangeSplitter implements RangeSplitter.

func NewMetabaseRangeSplitter added in v1.70.1

func NewMetabaseRangeSplitter(db *metabase.DB, asOfSystemInterval time.Duration, batchSize int) MetabaseRangeSplitter

NewMetabaseRangeSplitter creates a MetabaseRangeSplitter that reads segments from the given metabase database.

func (*MetabaseRangeSplitter) CreateRanges added in v1.70.1

func (provider *MetabaseRangeSplitter) CreateRanges(nRanges int, batchSize int) ([]SegmentProvider, error)

CreateRanges splits the segment table into chunks.

type MetabaseSegmentProvider added in v1.70.1

type MetabaseSegmentProvider struct {
	// contains filtered or unexported fields
}

MetabaseSegmentProvider implements SegmentProvider.

func (*MetabaseSegmentProvider) Iterate added in v1.70.1

func (provider *MetabaseSegmentProvider) Iterate(ctx context.Context, fn func([]segmentloop.Segment) error) error

Iterate loops over a part of the segment table.

type Observer added in v1.68.1

type Observer interface {
	// Start is called at the beginning of each segment loop.
	Start(context.Context, time.Time) error

	// Fork creates a Partial to process a chunk of all the segments. It is
	// called after Start. It is not called concurrently.
	Fork(context.Context) (Partial, error)

	// Join is called for each partial returned by Fork. This gives the
	// opportunity to merge the output like in a reduce step. It will be called
	// before Finish. It is not called concurrently.
	Join(context.Context, Partial) error

	// Finish is called after all segments are processed by all observers.
	Finish(context.Context) error
}

Observer subscribes to the parallel segment loop. The interface is designed so that even a naïve implementation is thread-safe.
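The Start, Fork, Join, Finish lifecycle resembles a map-reduce: each Partial accumulates private state, and Join merges it into the observer. A minimal sketch, assuming a hypothetical `Segment` type with a `Size` field and driving the observer sequentially for clarity (the real loop processes partials concurrently):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Segment is a placeholder for segmentloop.Segment; Size is hypothetical.
type Segment struct{ Size int64 }

// Partial mirrors the interface shape documented for this package.
type Partial interface {
	Process(context.Context, []Segment) error
}

// sizeObserver sums segment sizes across all partials (hypothetical).
type sizeObserver struct{ total int64 }

// sizePartial holds the private sum for one range, so it needs no locking.
type sizePartial struct{ sum int64 }

func (o *sizeObserver) Start(context.Context, time.Time) error {
	o.total = 0
	return nil
}

func (o *sizeObserver) Fork(context.Context) (Partial, error) {
	return &sizePartial{}, nil
}

// Join merges one partial's sum into the observer's total.
func (o *sizeObserver) Join(_ context.Context, p Partial) error {
	sp, ok := p.(*sizePartial)
	if !ok {
		return fmt.Errorf("unexpected partial type %T", p)
	}
	o.total += sp.sum
	return nil
}

func (o *sizeObserver) Finish(context.Context) error { return nil }

func (p *sizePartial) Process(_ context.Context, segs []Segment) error {
	for _, s := range segs {
		p.sum += s.Size
	}
	return nil
}

// sumSizes walks the lifecycle once, one partial per chunk.
func sumSizes(chunks [][]Segment) (int64, error) {
	ctx := context.Background()
	o := &sizeObserver{}
	if err := o.Start(ctx, time.Now()); err != nil {
		return 0, err
	}
	for _, chunk := range chunks {
		p, err := o.Fork(ctx)
		if err != nil {
			return 0, err
		}
		if err := p.Process(ctx, chunk); err != nil {
			return 0, err
		}
		if err := o.Join(ctx, p); err != nil {
			return 0, err
		}
	}
	return o.total, o.Finish(ctx)
}

func main() {
	total, err := sumSizes([][]Segment{{{Size: 1}, {Size: 2}}, {{Size: 3}}})
	fmt.Println(total, err) // prints 6 <nil>
}
```

Since Fork and Join are not called concurrently and each partial is processed by one goroutine at a time, neither type in this sketch needs a mutex.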

type ObserverDuration added in v1.70.1

type ObserverDuration struct {
	Observer Observer
	// Duration is set to -1 when the observer has errored out
	// so someone watching metrics can tell that something went wrong.
	Duration time.Duration
}

ObserverDuration reports back on how long it took the observer to process all the segments.
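The convention of reporting -1 for a failed observer can be sketched like this. The `observerDuration` struct (with a `Name` field in place of the `Observer` field), the `timed` helper, and `errBoom` are hypothetical; only the "-1 on error" rule comes from the comment on the Duration field above.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// observerDuration is a simplified stand-in for ObserverDuration.
type observerDuration struct {
	Name     string
	Duration time.Duration
}

// errBoom is a sample error used to demonstrate the failure path.
var errBoom = errors.New("boom")

// timed runs fn and records how long it took, or -1 if fn returned an error.
func timed(name string, fn func() error) observerDuration {
	start := time.Now()
	if err := fn(); err != nil {
		return observerDuration{Name: name, Duration: -1}
	}
	return observerDuration{Name: name, Duration: time.Since(start)}
}

func main() {
	ok := timed("ok", func() error { return nil })
	bad := timed("bad", func() error { return errBoom })
	fmt.Println(ok.Name, ok.Duration >= 0) // prints ok true
	fmt.Println(bad.Name, bad.Duration)    // prints bad -1ns
}
```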

type Partial added in v1.68.1

type Partial interface {
	// Process is called repeatedly with batches of segments.
	// It is not called concurrently on the same instance.
	Process(context.Context, []segmentloop.Segment) error
}

Partial processes a part of the total range of segments.

type RangeSplitter added in v1.70.1

type RangeSplitter interface {
	CreateRanges(nRanges int, batchSize int) ([]SegmentProvider, error)
}

RangeSplitter splits a source of segments into ranges, so that multiple segments can be processed concurrently. It usually abstracts over a database. It is a subcomponent of the ranged segment loop.
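A splitter does not have to sit on top of a database. As an illustration, the sketch below divides an in-memory slice into contiguous ranges; `sliceSplitter`, `sliceProvider`, and `rangeSizes` are hypothetical names, and `Segment` is a placeholder for `segmentloop.Segment`. A splitter like this could be handy in tests.

```go
package main

import (
	"errors"
	"fmt"
)

// Segment is a placeholder for segmentloop.Segment (assumption).
type Segment struct{ ID int }

// sliceProvider serves one contiguous part of the slice.
type sliceProvider struct {
	segments  []Segment
	batchSize int
}

// sliceSplitter is a hypothetical in-memory range splitter.
type sliceSplitter struct {
	segments []Segment
}

// CreateRanges cuts the slice into nRanges contiguous parts whose sizes
// differ by at most one.
func (s sliceSplitter) CreateRanges(nRanges, batchSize int) ([]sliceProvider, error) {
	if nRanges <= 0 || batchSize <= 0 {
		return nil, errors.New("nRanges and batchSize must be positive")
	}
	out := make([]sliceProvider, 0, nRanges)
	for i := 0; i < nRanges; i++ {
		lo := i * len(s.segments) / nRanges
		hi := (i + 1) * len(s.segments) / nRanges
		out = append(out, sliceProvider{segments: s.segments[lo:hi], batchSize: batchSize})
	}
	return out, nil
}

// rangeSizes reports how many segments land in each range.
func rangeSizes(total, nRanges int) ([]int, error) {
	ranges, err := sliceSplitter{segments: make([]Segment, total)}.CreateRanges(nRanges, 100)
	if err != nil {
		return nil, err
	}
	sizes := make([]int, len(ranges))
	for i, r := range ranges {
		sizes[i] = len(r.segments)
	}
	return sizes, nil
}

func main() {
	sizes, err := rangeSizes(10, 3)
	fmt.Println(sizes, err) // prints [3 3 4] <nil>
}
```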

type SegmentProvider added in v1.70.1

type SegmentProvider interface {
	Iterate(ctx context.Context, fn func([]segmentloop.Segment) error) error
}

SegmentProvider iterates through a range of segments.
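The callback-based Iterate contract can be illustrated with an in-memory provider that hands out fixed-size batches. Everything here is a sketch under assumptions: `Segment` is a placeholder for `segmentloop.Segment`, `sliceProvider` and `batchLengths` are hypothetical, and stopping on the first callback error or on context cancellation is a design choice of this example rather than documented behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Segment is a placeholder for segmentloop.Segment (assumption).
type Segment struct{ ID int }

// sliceProvider is a hypothetical provider backed by a slice.
type sliceProvider struct {
	segments  []Segment
	batchSize int
}

// Iterate calls fn with consecutive batches of at most batchSize segments.
// It stops early if the context is cancelled or fn returns an error.
func (p *sliceProvider) Iterate(ctx context.Context, fn func([]Segment) error) error {
	if p.batchSize <= 0 {
		return errors.New("batchSize must be positive")
	}
	for lo := 0; lo < len(p.segments); lo += p.batchSize {
		if err := ctx.Err(); err != nil {
			return err
		}
		hi := lo + p.batchSize
		if hi > len(p.segments) {
			hi = len(p.segments)
		}
		if err := fn(p.segments[lo:hi]); err != nil {
			return err
		}
	}
	return nil
}

// batchLengths records the size of every batch passed to the callback.
func batchLengths(total, batchSize int) ([]int, error) {
	p := &sliceProvider{segments: make([]Segment, total), batchSize: batchSize}
	var lengths []int
	err := p.Iterate(context.Background(), func(batch []Segment) error {
		lengths = append(lengths, len(batch))
		return nil
	})
	return lengths, err
}

func main() {
	lengths, err := batchLengths(7, 3)
	fmt.Println(lengths, err) // prints [3 3 1] <nil>
}
```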

type Service added in v1.68.1

type Service struct {
	Loop *sync2.Cycle
	// contains filtered or unexported fields
}

Service iterates through all segments and calls the attached observers for every segment.

architecture: Service

func NewService added in v1.68.1

func NewService(log *zap.Logger, config Config, provider RangeSplitter, observers []Observer) *Service

NewService creates a new instance of the ranged loop service.

func (*Service) Run added in v1.68.1

func (service *Service) Run(ctx context.Context) (err error)

Run starts the looping service.

func (*Service) RunOnce added in v1.68.1

func (service *Service) RunOnce(ctx context.Context) (observerDurations []ObserverDuration, err error)

RunOnce runs a single pass over the segments, passing them to the observers, and returns how long each observer took.
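To make the flow of one pass concrete, here is a simplified, hypothetical driver for a single observer. It assumes the ordering implied by the Observer comments (Start, then sequential Fork calls, then concurrent Process calls, then sequential Join calls, then Finish). It ignores batching, the Parallelism limit, and duration reporting, and all names except the interface method names are invented for this sketch.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Segment is a placeholder for segmentloop.Segment (assumption).
type Segment struct{ ID int }

type Partial interface {
	Process(context.Context, []Segment) error
}

type Observer interface {
	Start(context.Context, time.Time) error
	Fork(context.Context) (Partial, error)
	Join(context.Context, Partial) error
	Finish(context.Context) error
}

// runOnce is a hypothetical driver: one partial per range, one goroutine
// per partial, with Fork and Join kept on the calling goroutine.
func runOnce(ctx context.Context, ranges [][]Segment, obs Observer) error {
	if err := obs.Start(ctx, time.Now()); err != nil {
		return err
	}
	partials := make([]Partial, len(ranges))
	for i := range ranges {
		p, err := obs.Fork(ctx)
		if err != nil {
			return err
		}
		partials[i] = p
	}
	errs := make([]error, len(ranges)) // each goroutine writes its own slot
	var wg sync.WaitGroup
	for i := range ranges {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			errs[i] = partials[i].Process(ctx, ranges[i])
		}(i)
	}
	wg.Wait()
	for i, p := range partials {
		if errs[i] != nil {
			return errs[i]
		}
		if err := obs.Join(ctx, p); err != nil {
			return err
		}
	}
	return obs.Finish(ctx)
}

// counter and countPartial form a small observer used to exercise runOnce.
type counter struct{ total int }
type countPartial struct{ n int }

func (c *counter) Start(context.Context, time.Time) error { c.total = 0; return nil }
func (c *counter) Fork(context.Context) (Partial, error) { return &countPartial{}, nil }
func (c *counter) Finish(context.Context) error          { return nil }
func (c *counter) Join(_ context.Context, p Partial) error {
	cp, ok := p.(*countPartial)
	if !ok {
		return fmt.Errorf("unexpected partial type %T", p)
	}
	c.total += cp.n
	return nil
}
func (p *countPartial) Process(_ context.Context, segs []Segment) error {
	p.n += len(segs)
	return nil
}

// countAll runs one pass and returns the number of segments seen.
func countAll(ranges [][]Segment) (int, error) {
	c := &counter{}
	err := runOnce(context.Background(), ranges, c)
	return c.total, err
}

func main() {
	n, err := countAll([][]Segment{make([]Segment, 3), make([]Segment, 5)})
	fmt.Println(n, err) // prints 8 <nil>
}
```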

type UUIDRange

type UUIDRange struct {
	Start *uuid.UUID
	End   *uuid.UUID
}

UUIDRange describes a range of UUID values. Start and End can be open-ended.

func CreateUUIDRanges

func CreateUUIDRanges(nRanges uint32) ([]UUIDRange, error)

CreateUUIDRanges splits up the entire 128-bit UUID range into equal parts.
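One way to picture open-ended ranges is to leave the first Start and the last End as nil and let neighbouring ranges share a boundary. The sketch below does that with evenly spaced top 32 bits. It is an assumption-based illustration: `UUID`, `Range`, and `ranges` are local stand-ins, and whether a shared boundary is inclusive or exclusive on each side is not stated in the documentation above.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// UUID is a local stand-in for a 16-byte UUID type (assumption).
type UUID [16]byte

// Range mirrors the shape of UUIDRange; a nil Start or End is treated as
// open-ended (assumption).
type Range struct {
	Start *UUID
	End   *UUID
}

// ranges returns n ranges covering the whole UUID space. The first range
// has no Start, the last has no End, and neighbours share a boundary.
func ranges(n uint32) []Range {
	if n == 0 {
		return nil
	}
	step := (uint64(1) << 32) / uint64(n)
	out := make([]Range, n)
	for i := uint64(1); i < uint64(n); i++ {
		b := new(UUID)
		binary.BigEndian.PutUint32(b[:4], uint32(i*step))
		out[i-1].End = b
		out[i].Start = b
	}
	return out
}

func main() {
	for i, r := range ranges(3) {
		fmt.Printf("range %d: open start=%v, open end=%v\n", i, r.Start == nil, r.End == nil)
	}
}
```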
