metaloop

package
v1.33.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 27, 2021 License: AGPL-3.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (

	// Error is a standard error class for this component.
	Error = errs.Class("metainfo loop")
	// ErrClosed is a loop closed error.
	ErrClosed = Error.New("loop closed")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	CoalesceDuration time.Duration `help:"how long to wait for new observers before starting iteration" releaseDefault:"5s" devDefault:"5s" testDefault:"1s"`
	RateLimit        float64       `help:"rate limit (default is 0 which is unlimited segments per second)" default:"0"`
	ListLimit        int           `help:"how many items to query in a batch" default:"2500" testDefault:"10000"`

	AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"`

	SuspiciousProcessedRatio float64 `help:"ratio where to consider processed count as supicious" default:"0.03"`
}

Config contains configurable values for the metainfo loop.

type LoopInfo

type LoopInfo struct {
	Started time.Time
}

LoopInfo contains information about the current loop.

type MetabaseDB

type MetabaseDB interface {
	// Now returns the time on the database.
	Now(ctx context.Context) (time.Time, error)
	// IterateLoopObjects iterates through all objects in metabase for metainfo loop purpose.
	IterateLoopObjects(ctx context.Context, opts metabase.IterateLoopObjects, fn func(context.Context, metabase.LoopObjectsIterator) error) (err error)
	// IterateLoopStreams iterates through all streams passed in as arguments.
	IterateLoopStreams(ctx context.Context, opts metabase.IterateLoopStreams, handleStream func(ctx context.Context, streamID uuid.UUID, next metabase.SegmentIterator) error) (err error)

	// GetTableStats gathers statistics about the tables.
	GetTableStats(context.Context, metabase.GetTableStats) (metabase.TableStats, error)
}

MetabaseDB contains iterators for the metabase data.

type NullObserver

type NullObserver struct{}

NullObserver is an observer that does nothing. This is useful for joining and ensuring the metainfo loop runs once before you use a real observer.

func (NullObserver) InlineSegment

func (NullObserver) InlineSegment(context.Context, *Segment) error

InlineSegment implements the Observer interface.

func (NullObserver) LoopStarted

func (NullObserver) LoopStarted(context.Context, LoopInfo) error

LoopStarted is called at each loop start.

func (NullObserver) Object

Object implements the Observer interface.

func (NullObserver) RemoteSegment

func (NullObserver) RemoteSegment(context.Context, *Segment) error

RemoteSegment implements the Observer interface.

type Object

Object is the object info passed to Observer by metainfo loop.

func (*Object) Expired

func (object *Object) Expired(now time.Time) bool

Expired checks if object expired relative to now.

type Observer

type Observer interface {
	Object(context.Context, *Object) error
	RemoteSegment(context.Context, *Segment) error
	InlineSegment(context.Context, *Segment) error
	LoopStarted(context.Context, LoopInfo) error
}

Observer is an interface defining an observer that can subscribe to the metainfo loop.

architecture: Observer

type Segment

type Segment struct {
	Location       metabase.SegmentLocation // tally, repair, graceful exit, audit
	ExpirationDate time.Time                // tally, repair

	metabase.LoopSegmentEntry
}

Segment is the segment info passed to Observer by metainfo loop.

func (*Segment) Expired

func (segment *Segment) Expired(now time.Time) bool

Expired checks if segment is expired relative to now.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is a metainfo loop service.

architecture: Service

func New

func New(config Config, metabaseDB MetabaseDB) *Service

New creates a new metainfo loop service.

func (*Service) Close

func (loop *Service) Close() (err error)

Close closes the looping services.

func (*Service) Join

func (loop *Service) Join(ctx context.Context, observer Observer) (err error)

Join will join the looper for one full cycle until completion and then returns. Joining will trigger a new iteration after coalesce duration. On ctx cancel the observer will return without completely finishing. Only on full complete iteration it will return nil. Safe to be called concurrently.

func (*Service) Monitor added in v1.29.1

func (loop *Service) Monitor(ctx context.Context, observer Observer) (err error)

Monitor will join the looper for one full cycle until completion and then returns. Joining with monitoring won't trigger after coalesce duration. On ctx cancel the observer will return without completely finishing. Only on full complete iteration it will return nil. Safe to be called concurrently.

func (*Service) Run

func (loop *Service) Run(ctx context.Context) (err error)

Run starts the looping service. It can only be called once, otherwise a panic will occur.

func (*Service) RunOnce

func (loop *Service) RunOnce(ctx context.Context) (err error)

RunOnce goes through metainfo one time and sends information to observers.

It is not safe to call this concurrently with Run.

func (*Service) Wait

func (loop *Service) Wait()

Wait waits for run to be finished. Safe to be called concurrently.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL