segmentloop

package
v1.61.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 8, 2022 License: AGPL-3.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (

	// Error is a standard error class for this component.
	Error = errs.Class("segments loop")
	// ErrClosed is a loop closed error.
	ErrClosed = Error.New("loop closed")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	CoalesceDuration time.Duration `help:"how long to wait for new observers before starting iteration" releaseDefault:"5s" devDefault:"5s" testDefault:"1s"`
	RateLimit        float64       `help:"rate limit (default is 0 which is unlimited segments per second)" default:"0"`
	ListLimit        int           `help:"how many items to query in a batch" default:"2500"`

	AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"`

	SuspiciousProcessedRatio float64 `help:"ratio where to consider processed count as supicious" default:"0.03"`
}

Config contains configurable values for the segments loop.

type LoopInfo

type LoopInfo struct {
	Started time.Time
}

LoopInfo contains information about the current loop.

type MetabaseDB

type MetabaseDB interface {
	// Now returns the time on the database.
	Now(ctx context.Context) (time.Time, error)
	// IterateLoopStreams iterates through all streams passed in as arguments.
	IterateLoopSegments(ctx context.Context, opts metabase.IterateLoopSegments, fn func(context.Context, metabase.LoopSegmentsIterator) error) (err error)

	// GetTableStats gathers statistics about the tables.
	GetTableStats(context.Context, metabase.GetTableStats) (metabase.TableStats, error)
}

MetabaseDB contains iterators for the metabase data.

type NullObserver

type NullObserver struct{}

NullObserver is an observer that does nothing. This is useful for joining and ensuring the segments loop runs once before you use a real observer.

func (NullObserver) InlineSegment

func (NullObserver) InlineSegment(context.Context, *Segment) error

InlineSegment implements the Observer interface.

func (NullObserver) LoopStarted

func (NullObserver) LoopStarted(context.Context, LoopInfo) error

LoopStarted is called at each loop start.

func (NullObserver) RemoteSegment

func (NullObserver) RemoteSegment(context.Context, *Segment) error

RemoteSegment implements the Observer interface.

type Observer

type Observer interface {
	LoopStarted(context.Context, LoopInfo) error
	RemoteSegment(context.Context, *Segment) error
	InlineSegment(context.Context, *Segment) error
}

Observer is an interface defining an observer that can subscribe to the segments loop.

architecture: Observer

type Segment

type Segment metabase.LoopSegmentEntry

Segment contains information about segment metadata which will be received by observers.

func (*Segment) Expired added in v1.32.2

func (s *Segment) Expired(now time.Time) bool

Expired checks if segment expired relative to now.

func (Segment) Inline

func (s Segment) Inline() bool

Inline returns true if segment is inline.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is a segments loop service.

architecture: Service

func New

func New(log *zap.Logger, config Config, metabaseDB MetabaseDB) *Service

New creates a new segments loop service.

func (*Service) Close

func (loop *Service) Close() (err error)

Close closes the looping services.

func (*Service) Join

func (loop *Service) Join(ctx context.Context, observer Observer) (err error)

Join will join the looper for one full cycle until completion and then returns. Joining will trigger a new iteration after coalesce duration. On ctx cancel the observer will return without completely finishing. Only on full complete iteration it will return nil. Safe to be called concurrently.

func (*Service) Monitor

func (loop *Service) Monitor(ctx context.Context, observer Observer) (err error)

Monitor will join the looper for one full cycle until completion and then returns. Joining with monitoring won't trigger after coalesce duration. On ctx cancel the observer will return without completely finishing. Only on full complete iteration it will return nil. Safe to be called concurrently.

func (*Service) Run

func (loop *Service) Run(ctx context.Context) (err error)

Run starts the looping service. It can only be called once, otherwise a panic will occur.

func (*Service) RunOnce

func (loop *Service) RunOnce(ctx context.Context) (err error)

RunOnce goes through segments one time and sends information to observers.

It is not safe to call this concurrently with Run.

func (*Service) Wait

func (loop *Service) Wait()

Wait waits for run to be finished. Safe to be called concurrently.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL