Documentation ¶
Index ¶
- Variables
- type Config
- type LoopInfo
- type MetabaseDB
- type NullObserver
- type Observer
- type Segment
- type Service
- func (loop *Service) Close() (err error)
- func (loop *Service) Join(ctx context.Context, observer Observer) (err error)
- func (loop *Service) Monitor(ctx context.Context, observer Observer) (err error)
- func (loop *Service) Run(ctx context.Context) (err error)
- func (loop *Service) RunOnce(ctx context.Context) (err error)
- func (loop *Service) Wait()
Constants ¶
This section is empty.
Variables ¶
var (
	// Error is a standard error class for this component.
	Error = errs.Class("segments loop")
	// ErrClosed is a loop closed error.
	ErrClosed = Error.New("loop closed")
)
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	CoalesceDuration         time.Duration `help:"how long to wait for new observers before starting iteration" releaseDefault:"5s" devDefault:"5s" testDefault:"1s"`
	RateLimit                float64       `help:"rate limit (default is 0 which is unlimited segments per second)" default:"0"`
	ListLimit                int           `help:"how many items to query in a batch" default:"2500"`
	AsOfSystemInterval       time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"`
	SuspiciousProcessedRatio float64       `help:"ratio where to consider processed count as supicious" default:"0.03"`
}
Config contains configurable values for the segments loop.
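The defaults in the struct tags can be easier to read as plain Go values. The following is a minimal sketch, not the package's own code: `Config` here is a local stand-in that copies the documented field names so the example compiles on its own, and `releaseDefaults` is a hypothetical helper. The values are taken from the `releaseDefault` and `default` tags above.

```go
package main

import (
	"fmt"
	"time"
)

// Config is a local stand-in that mirrors the documented fields.
// It is not the real segmentloop Config type.
type Config struct {
	CoalesceDuration         time.Duration
	RateLimit                float64
	ListLimit                int
	AsOfSystemInterval       time.Duration
	SuspiciousProcessedRatio float64
}

// releaseDefaults (hypothetical helper) returns the values listed in the
// releaseDefault and default struct tags.
func releaseDefaults() Config {
	return Config{
		CoalesceDuration:         5 * time.Second,
		RateLimit:                0, // 0 means unlimited segments per second
		ListLimit:                2500,
		AsOfSystemInterval:       -5 * time.Minute,
		SuspiciousProcessedRatio: 0.03,
	}
}

func main() {
	cfg := releaseDefaults()
	fmt.Printf("%+v\n", cfg)
}
```

In the real package these values would normally come from the configuration system that reads the struct tags rather than from a hand-written literal.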
type MetabaseDB ¶
type MetabaseDB interface {
	// Now returns the time on the database.
	Now(ctx context.Context) (time.Time, error)
	// IterateLoopSegments iterates through all streams passed in as arguments.
	IterateLoopSegments(ctx context.Context, opts metabase.IterateLoopSegments, fn func(context.Context, metabase.LoopSegmentsIterator) error) (err error)
	// GetTableStats gathers statistics about the tables.
	GetTableStats(context.Context, metabase.GetTableStats) (metabase.TableStats, error)
}
MetabaseDB contains iterators for the metabase data.
type NullObserver ¶
type NullObserver struct{}
NullObserver is an observer that does nothing. This is useful for joining and ensuring the segments loop runs once before you use a real observer.
func (NullObserver) InlineSegment ¶
func (NullObserver) InlineSegment(context.Context, *Segment) error
InlineSegment implements the Observer interface.
func (NullObserver) LoopStarted ¶
func (NullObserver) LoopStarted(context.Context, LoopInfo) error
LoopStarted is called at each loop start.
func (NullObserver) RemoteSegment ¶
func (NullObserver) RemoteSegment(context.Context, *Segment) error
RemoteSegment implements the Observer interface.
type Observer ¶
type Observer interface {
	LoopStarted(context.Context, LoopInfo) error
	RemoteSegment(context.Context, *Segment) error
	InlineSegment(context.Context, *Segment) error
}
Observer is an interface defining an observer that can subscribe to the segments loop.
architecture: Observer
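As an illustration of what an Observer implementation might look like, here is a minimal sketch of an observer that counts segments by kind. `LoopInfo` and `Segment` are empty local stand-ins so the example compiles without the real package, and `countingObserver` is a hypothetical name. The sketch assumes the loop calls the observer's methods sequentially; the documentation above does not say whether that holds, so a real observer may need its own synchronization.

```go
package main

import (
	"context"
	"fmt"
)

// LoopInfo and Segment are empty local stand-ins for the package's types,
// used only so this sketch compiles on its own.
type LoopInfo struct{}
type Segment struct{}

// Observer repeats the interface documented above.
type Observer interface {
	LoopStarted(context.Context, LoopInfo) error
	RemoteSegment(context.Context, *Segment) error
	InlineSegment(context.Context, *Segment) error
}

// countingObserver (hypothetical) tallies remote and inline segments.
type countingObserver struct {
	remote, inline int
}

// LoopStarted resets the tallies so each iteration starts from zero.
func (o *countingObserver) LoopStarted(context.Context, LoopInfo) error {
	o.remote, o.inline = 0, 0
	return nil
}

// RemoteSegment counts one remote segment.
func (o *countingObserver) RemoteSegment(context.Context, *Segment) error {
	o.remote++
	return nil
}

// InlineSegment counts one inline segment.
func (o *countingObserver) InlineSegment(context.Context, *Segment) error {
	o.inline++
	return nil
}

// Compile-time check that countingObserver satisfies Observer.
var _ Observer = (*countingObserver)(nil)

func main() {
	ctx := context.Background()
	obs := &countingObserver{}
	// Drive the observer by hand in place of a real loop iteration.
	_ = obs.LoopStarted(ctx, LoopInfo{})
	_ = obs.RemoteSegment(ctx, &Segment{})
	_ = obs.RemoteSegment(ctx, &Segment{})
	_ = obs.InlineSegment(ctx, &Segment{})
	fmt.Println(obs.remote, obs.inline) // prints "2 1"
}
```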
type Segment ¶
type Segment metabase.LoopSegmentEntry
Segment contains information about segment metadata which will be received by observers.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service is a segments loop service.
architecture: Service
func New ¶
func New(log *zap.Logger, config Config, metabaseDB MetabaseDB) *Service
New creates a new segments loop service.
func (*Service) Join ¶
func (loop *Service) Join(ctx context.Context, observer Observer) (err error)
Join adds the observer to the loop for one full cycle and returns when that cycle completes. Joining triggers a new iteration after the coalesce duration. If ctx is canceled, Join returns before the observer has seen the whole iteration. It returns nil only after a complete iteration. Safe to call concurrently.
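Because Join returns nil only after a complete iteration, a caller would typically bound the wait with a context and discard the observer's results on any non-nil error. The sketch below shows that calling pattern under stated assumptions: `joiner`, `joinWithTimeout`, `fakeLoop`, and `demo` are hypothetical names, `Observer` is reduced to an empty stand-in, and the fake returns `ctx.Err()` on cancellation, which the real Service may or may not do.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Observer is an empty stand-in for the real interface.
type Observer interface{}

// joiner (hypothetical) covers the one method this sketch needs.
type joiner interface {
	Join(ctx context.Context, observer Observer) error
}

// joinWithTimeout joins the loop for one cycle and gives up after timeout.
// A nil error means the observer saw a full iteration; on any other error
// the observer's results may be partial and should be discarded.
func joinWithTimeout(ctx context.Context, loop joiner, obs Observer, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	return loop.Join(ctx, obs)
}

// fakeLoop (hypothetical) completes a "cycle" after delay, or returns the
// context error if ctx ends first. It only imitates the documented behavior.
type fakeLoop struct{ delay time.Duration }

func (f fakeLoop) Join(ctx context.Context, _ Observer) error {
	select {
	case <-time.After(f.delay):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo exercises both outcomes against the fake loop.
func demo() (completed, timedOut bool) {
	ctx := context.Background()

	// The cycle finishes well inside the timeout.
	err := joinWithTimeout(ctx, fakeLoop{delay: 10 * time.Millisecond}, nil, time.Second)
	completed = err == nil

	// The timeout fires before the cycle finishes.
	err = joinWithTimeout(ctx, fakeLoop{delay: time.Second}, nil, 10*time.Millisecond)
	timedOut = errors.Is(err, context.DeadlineExceeded)
	return completed, timedOut
}

func main() {
	completed, timedOut := demo()
	fmt.Println("full cycle:", completed)
	fmt.Println("timed out:", timedOut)
}
```

With the real package, the same pattern would pass a `*Service` and a concrete observer; joining with a `NullObserver` first is the documented way to make sure the loop has run once.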
func (*Service) Monitor ¶
func (loop *Service) Monitor(ctx context.Context, observer Observer) (err error)
Monitor adds the observer to the loop for one full cycle and returns when that cycle completes. Unlike Join, it does not trigger a new iteration after the coalesce duration. If ctx is canceled, Monitor returns before the observer has seen the whole iteration. It returns nil only after a complete iteration. Safe to call concurrently.
func (*Service) Run ¶
func (loop *Service) Run(ctx context.Context) (err error)
Run starts the looping service. It can be called only once; otherwise a panic will occur.