storekit

package
v1.0.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 15, 2024 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrInvalidData .
	ErrInvalidData = errors.New("invalid data")
	// ErrIteratorClosed .
	ErrIteratorClosed = errors.New("iterator closed")
	// ErrWriterClosed .
	ErrWriterClosed = errors.New("writer closed")
	// ErrOpNotSupported .
	ErrOpNotSupported = errors.New("operation not supported")
)
View Source
var DefaultLatencyBuckets = []float64{1, 50, 100, 250, 500, 750, 1000, 5000, 10000}

DefaultLatencyBuckets holds the default latency bucket values, in milliseconds.

View Source
var DefaultNopWriter = NopWriter{}

DefaultNopWriter .

View Source
var DefaultStdoutWriter = StdoutWriter{
	Filter: func(val Data) bool { return true },
}

DefaultStdoutWriter .

View Source
var ErrExitConsume = errors.New("exit consume")

ErrExitConsume .

Functions

func BatchConsume

func BatchConsume(ctx context.Context, r BatchReader, w BatchWriter, opts *BatchConsumeOptions) error

BatchConsume .

Types

type BatchConsumeOptions

type BatchConsumeOptions struct {
	BufferSize  int
	ReadTimeout time.Duration
	// ReadErrorHandler return err to exit, return nil to continue
	ReadErrorHandler func(err error) error
	// WriteErrorHandler return err to retry write, return nil to continue
	WriteErrorHandler func(list []Data, err error) error
	// ConfirmErrorHandler return err to exit, return nil to continue
	ConfirmErrorHandler func(err error) error
	Backoff             TimerBackoff
	Statistics          ConsumeStatistics
}

BatchConsumeOptions .

type BatchReader

type BatchReader interface {
	ReadN(buf []Data, timeout time.Duration) (int, error)
	Confirm() error
	Close() error
}

BatchReader .

func NewMockBatchReader

func NewMockBatchReader(ctx context.Context, interval time.Duration, batchSize int, creator func() interface{}) BatchReader

NewMockBatchReader .

type BatchWriter

type BatchWriter interface {
	WriteN(vals ...Data) (int, error)
	Close() error
}

BatchWriter .

type BufferedWriter

type BufferedWriter struct {
	// contains filtered or unexported fields
}

BufferedWriter .

func NewBufferedWriter

func NewBufferedWriter(w BatchWriter, capacity int) *BufferedWriter

NewBufferedWriter .

func (*BufferedWriter) Close

func (b *BufferedWriter) Close() error

Close .

func (*BufferedWriter) Data

func (b *BufferedWriter) Data() []Data

Data .

func (*BufferedWriter) Flush

func (b *BufferedWriter) Flush() error

Flush .

func (*BufferedWriter) Size

func (b *BufferedWriter) Size() int

Size .

func (*BufferedWriter) Write

func (b *BufferedWriter) Write(data Data) error

Write .

func (*BufferedWriter) WriteN

func (b *BufferedWriter) WriteN(data ...Data) (int, error)

WriteN returns the number of data items written to the buffer. If a Flush error occurs, that error is returned.

type Comparer

type Comparer interface {
	Compare(a, b Data) int
}

Comparer .

type ConsumeStatistics

type ConsumeStatistics interface {
	ReadError(err error)
	WriteError(data []Data, err error)
	ConfirmError(data []Data, err error)
	Success(data []Data)
	ObserveReadLatency(start time.Time)
	ObserveWriteLatency(start time.Time)
}

ConsumeStatistics .

var NopConsumeStatistics ConsumeStatistics = &nopConsumeStatistics{}

type Counter

type Counter interface {
	Total() (int64, error)
}

type Data

type Data = interface{}

Data .

type EmptyIterator

type EmptyIterator struct{}

EmptyIterator .

func (EmptyIterator) Close

func (it EmptyIterator) Close() error

Close .

func (EmptyIterator) Error

func (it EmptyIterator) Error() error

Error .

func (EmptyIterator) First

func (it EmptyIterator) First() bool

First .

func (EmptyIterator) Last

func (it EmptyIterator) Last() bool

Last .

func (EmptyIterator) Next

func (it EmptyIterator) Next() bool

Next .

func (EmptyIterator) Prev

func (it EmptyIterator) Prev() bool

Prev .

func (EmptyIterator) Total

func (it EmptyIterator) Total() (int64, error)

Total .

func (EmptyIterator) Value

func (it EmptyIterator) Value() interface{}

Value .

type Flusher

type Flusher interface {
	Flush() error
}

Flusher is the interface that wraps the Flush method.

type InMemoryRateLimiter

type InMemoryRateLimiter struct {
	// contains filtered or unexported fields
}

func NewInMemoryRateLimiter

func NewInMemoryRateLimiter(cfg RateLimitConfig) *InMemoryRateLimiter

func (*InMemoryRateLimiter) ReserveN

func (im *InMemoryRateLimiter) ReserveN(n int) time.Duration

type Int64Comparer

type Int64Comparer struct{}

Int64Comparer .

func (Int64Comparer) Compare

func (c Int64Comparer) Compare(a, b Data) int

Compare .

type Iterator

type Iterator interface {
	First() bool
	Last() bool
	Next() bool
	Prev() bool
	Value() Data
	Error() error
	Close() error
}

Iterator .

func MergedHeadOverlappedIterator

func MergedHeadOverlappedIterator(cmp Comparer, its ...Iterator) Iterator

MergedHeadOverlappedIterator .

func NewListIterator

func NewListIterator(list ...Data) Iterator

NewListIterator .

func OrderedIterator

func OrderedIterator(its ...Iterator) Iterator

OrderedIterator .

type ListIterator

type ListIterator struct {
	// contains filtered or unexported fields
}

ListIterator .

func (*ListIterator) Close

func (it *ListIterator) Close() error

Close .

func (*ListIterator) Error

func (it *ListIterator) Error() error

Error .

func (*ListIterator) First

func (it *ListIterator) First() bool

First .

func (*ListIterator) Last

func (it *ListIterator) Last() bool

Last .

func (*ListIterator) Next

func (it *ListIterator) Next() bool

Next .

func (*ListIterator) Prev

func (it *ListIterator) Prev() bool

Prev .

func (*ListIterator) Total

func (it *ListIterator) Total() (int64, error)

Total .

func (*ListIterator) Value

func (it *ListIterator) Value() Data

Value .

type Matcher

type Matcher interface {
	Match(val Data) bool
}

Matcher .

type MultiplierBackoff

type MultiplierBackoff struct {
	Base     time.Duration
	Max      time.Duration
	Duration time.Duration
	Factor   float64
}

MultiplierBackoff .

func NewMultiplierBackoff

func NewMultiplierBackoff() *MultiplierBackoff

NewMultiplierBackoff returns a new *MultiplierBackoff.

func (*MultiplierBackoff) Reset

func (b *MultiplierBackoff) Reset()

func (*MultiplierBackoff) Wait

func (b *MultiplierBackoff) Wait() <-chan time.Time

type NopWriter

type NopWriter struct{}

NopWriter .

func (NopWriter) Close

func (w NopWriter) Close() error

func (NopWriter) Write

func (w NopWriter) Write(val Data) error

func (NopWriter) WriteN

func (w NopWriter) WriteN(vals ...Data) (int, error)

type RateLimitConfig

type RateLimitConfig struct {
	Duration time.Duration `file:"duration"`
	Limit    int           `file:"limit"`
}

type RateLimiter

type RateLimiter interface {
	// ReserveN passes n tokens to the limiter and returns the duration the caller must wait.
	ReserveN(n int) time.Duration
}

type StdoutWriter

type StdoutWriter struct {
	Filter func(val Data) bool
}

StdoutWriter .

func (StdoutWriter) Close

func (w StdoutWriter) Close() error

func (StdoutWriter) Write

func (w StdoutWriter) Write(val Data) error

func (StdoutWriter) WriteN

func (w StdoutWriter) WriteN(vals ...Data) (int, error)

type TimerBackoff

type TimerBackoff interface {
	Reset()
	Wait() <-chan time.Time
}

TimerBackoff .

type Writer

type Writer interface {
	Write(val Data) error
	Close() error
}

Writer .

func WrapBatchWriter

func WrapBatchWriter(
	bw BatchWriter,
	bufferSize uint64,
	timeout time.Duration,
	errorh func(error) error,
) Writer

WrapBatchWriter .

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL