Versions in this module

v0
  v0.1.1  Oct 8, 2024
  v0.1.0  Oct 8, 2024

Changes in this version

+ func NewCollectors(config *config.Config, newRecord func() interface{}, ...) ([]*Service, []Collector, error)
+ type Accumulator struct
    + FastMap *fmap.FastMap[any]
    + Map map[interface{}]interface{}
    + UseFastMap bool
  + func NewAccumulator(fastMap *FMapPool) *Accumulator
  + func (a *Accumulator) GetOrCreate(key interface{}, get func() interface{}) (interface{}, bool)
  + func (a *Accumulator) Len() int
  + func (a *Accumulator) Put(key, value interface{}) interface{}
+ type BackupError struct
    + Failed int
    + Total int
  + func (b *BackupError) Error() string
+ type Batch struct
    + Accumulator *Accumulator
    + ID string
    + PendingURL string
    + Started time.Time
  + func NewBatch(stream *tconfig.Stream, disabled bool, fs afs.Service, options ...Option) (*Batch, error)
  + func (b *Batch) HasPendingTransaction() bool
  + func (b *Batch) IsActive(batch *config.Batch) bool
  + func (b *Batch) IsReadyForFlush(ts *time.Time) bool
  + func (b *Batch) ScheduleAt(ts *time.Time)
+ type Collector interface
    + Collect func(record interface{}) error
    + CollectAll func(record ...interface{}) error
    + ID func() string
+ type FMapPool struct
  + func NewFMapPool(expectedSize int, allocSize int) *FMapPool
  + func (c *FMapPool) Get() *fmap.FastMap[any]
  + func (c *FMapPool) Put(m *fmap.FastMap[any])
+ type Option func(o *Options)
  + func WithFastMapPool(pool *FMapPool) Option
  + func WithInstanceId(instanceId string) Option
  + func WithStreamURLSymLinkTrg(streamURLSymLinkTrg string) Option
+ type Options struct
  + func NewOptions(options ...Option) *Options
  + func (o *Options) Apply(opts ...Option)
  + func (o *Options) GetInstanceId() string
  + func (o *Options) GetStreamURLSymLinkTrg() string
+ type Service struct
  + func New(config *config.Config, newRecord func() interface{}, ...) (*Service, error)
  + func (s *Service) Close() error
  + func (s *Service) Collect(record interface{}) error
  + func (s *Service) CollectAll(records ...interface{}) error
  + func (s *Service) Flush(batch *Batch) error
  + func (s *Service) ID() string
  + func (s *Service) IsUp() bool
  + func (s *Service) NotifyWatcher()
  + func (s *Service) RetryFailed(ctx context.Context, onStartUp bool) error