collector

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 8, 2024 License: Apache-2.0 Imports: 34 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewCollectors

func NewCollectors(config *config.Config,
	newRecord func() interface{},
	key func(record interface{}) interface{},
	reducer func(key, source interface{}),
	mapper func(accumulator *Accumulator) interface{},
	loader loader2.Loader, metrics *gmetric.Service, count int) ([]*Service, []Collector, error)

Types

type Accumulator

type Accumulator struct {
	Map        map[interface{}]interface{}
	UseFastMap bool
	FastMap    *fmap.FastMap[any]

	sync.RWMutex
	// contains filtered or unexported fields
}

func NewAccumulator

func NewAccumulator(fastMap *FMapPool) *Accumulator

func (*Accumulator) GetOrCreate

func (a *Accumulator) GetOrCreate(key interface{}, get func() interface{}) (interface{}, bool)

func (*Accumulator) Len

func (a *Accumulator) Len() int

func (*Accumulator) Put

func (a *Accumulator) Put(key, value interface{}) interface{}

type BackupError

type BackupError struct {
	Total  int
	Failed int
	// contains filtered or unexported fields
}

func (*BackupError) Error

func (b *BackupError) Error() string

type Batch

type Batch struct {
	ID string
	*tconfig.Stream
	Accumulator *Accumulator
	Started     time.Time

	PendingURL string

	sync.Mutex
	// contains filtered or unexported fields
}

func NewBatch

func NewBatch(stream *tconfig.Stream, disabled bool, fs afs.Service, options ...Option) (*Batch, error)

func (*Batch) HasPendingTransaction

func (b *Batch) HasPendingTransaction() bool

func (*Batch) IsActive

func (b *Batch) IsActive(batch *config.Batch) bool

func (*Batch) IsReadyForFlush

func (b *Batch) IsReadyForFlush(ts *time.Time) bool

func (*Batch) ScheduleAt

func (b *Batch) ScheduleAt(ts *time.Time)

type Collector

type Collector interface {
	Collect(record interface{}) error
	CollectAll(record ...interface{}) error
	ID() string
}

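Collector is the interface for submitting records to a collector: Collect takes a single record, CollectAll takes any number of records, and ID returns the collector's string identifier.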

type FMapPool

type FMapPool struct {
	// contains filtered or unexported fields
}

func NewFMapPool

func NewFMapPool(expectedSize int, allocSize int) *FMapPool

NewFMapPool creates a new FMapPool configured with the given expectedSize and allocSize.

func (*FMapPool) Get

func (c *FMapPool) Get() *fmap.FastMap[any]

func (*FMapPool) Put

func (c *FMapPool) Put(m *fmap.FastMap[any])

type Option

type Option func(o *Options)

func WithFastMapPool

func WithFastMapPool(pool *FMapPool) Option

func WithInstanceId

func WithInstanceId(instanceId string) Option

func WithStreamURLSymLinkTrg

func WithStreamURLSymLinkTrg(streamURLSymLinkTrg string) Option

type Options

type Options struct {
	// contains filtered or unexported fields
}

func NewOptions

func NewOptions(options ...Option) *Options

func (*Options) Apply

func (o *Options) Apply(opts ...Option)

func (*Options) GetInstanceId

func (o *Options) GetInstanceId() string

func (*Options) GetStreamURLSymLinkTrg

func (o *Options) GetStreamURLSymLinkTrg() string

type Service

type Service struct {
	*msg.Provider
	// contains filtered or unexported fields
}

func New

func New(config *config.Config,
	newRecord func() interface{},
	key func(record interface{}) interface{},
	reducer func(key, source interface{}),
	mapper func(accumulator *Accumulator) interface{},
	loader loader2.Loader, metrics *gmetric.Service, options ...Option) (*Service, error)

func (*Service) Close

func (s *Service) Close() error

func (*Service) Collect

func (s *Service) Collect(record interface{}) error

func (*Service) CollectAll

func (s *Service) CollectAll(records ...interface{}) error

func (*Service) Flush

func (s *Service) Flush(batch *Batch) error

func (*Service) ID

func (s *Service) ID() string

func (*Service) IsUp

func (s *Service) IsUp() bool

func (*Service) NotifyWatcher

func (s *Service) NotifyWatcher()

func (*Service) RetryFailed

func (s *Service) RetryFailed(ctx context.Context, onStartUp bool) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL