Documentation ¶
Index ¶
- Variables
- func BatchConsume(ctx context.Context, r BatchReader, w BatchWriter, opts *BatchConsumeOptions) error
- type BatchConsumeOptions
- type BatchReader
- type BatchWriter
- type BufferedWriter
- type Comparer
- type ConsumeStatistics
- type Counter
- type Data
- type EmptyIterator
- func (it EmptyIterator) Close() error
- func (it EmptyIterator) Error() error
- func (it EmptyIterator) First() bool
- func (it EmptyIterator) Last() bool
- func (it EmptyIterator) Next() bool
- func (it EmptyIterator) Prev() bool
- func (it EmptyIterator) Total() (int64, error)
- func (it EmptyIterator) Value() interface{}
- type Flusher
- type InMemoryRateLimiter
- type Int64Comparer
- type Iterator
- type ListIterator
- func (it *ListIterator) Close() error
- func (it *ListIterator) Error() error
- func (it *ListIterator) First() bool
- func (it *ListIterator) Last() bool
- func (it *ListIterator) Next() bool
- func (it *ListIterator) Prev() bool
- func (it *ListIterator) Total() (int64, error)
- func (it *ListIterator) Value() Data
- type Matcher
- type MultiplierBackoff
- type NopWriter
- type RateLimitConfig
- type RateLimiter
- type StdoutWriter
- type TimerBackoff
- type Writer
Constants ¶
This section is empty.
Variables ¶
View Source
var (
	// ErrInvalidData .
	ErrInvalidData = errors.New("invalid data")
	// ErrIteratorClosed .
	ErrIteratorClosed = errors.New("iterator closed")
	// ErrWriterClosed .
	ErrWriterClosed = errors.New("writer closed")
	// ErrOpNotSupported .
	ErrOpNotSupported = errors.New("operation not supported")
)
View Source
var DefaultLatencyBuckets = []float64{1, 50, 100, 250, 500, 750, 1000, 5000, 10000}
DefaultLatencyBuckets holds the default latency bucket boundaries, expressed in milliseconds.
View Source
var DefaultNopWriter = NopWriter{}
DefaultNopWriter .
View Source
var DefaultStdoutWriter = StdoutWriter{
	Filter: func(val Data) bool { return true },
}
DefaultStdoutWriter .
View Source
var ErrExitConsume = errors.New("exit consume")
ErrExitConsume .
Functions ¶
func BatchConsume ¶
func BatchConsume(ctx context.Context, r BatchReader, w BatchWriter, opts *BatchConsumeOptions) error
BatchConsume .
Types ¶
type BatchConsumeOptions ¶
type BatchConsumeOptions struct {
	BufferSize  int
	ReadTimeout time.Duration
	// ReadErrorHandler returns err to exit, or nil to continue
	ReadErrorHandler func(err error) error
	// WriteErrorHandler returns err to retry the write, or nil to continue
	WriteErrorHandler func(list []Data, err error) error
	// ConfirmErrorHandler returns err to exit, or nil to continue
	ConfirmErrorHandler func(err error) error
	Backoff             TimerBackoff
	Statistics          ConsumeStatistics
}
BatchConsumeOptions .
type BatchReader ¶
type BatchReader interface {
	ReadN(buf []Data, timeout time.Duration) (int, error)
	Confirm() error
	Close() error
}
BatchReader .
func NewMockBatchReader ¶
func NewMockBatchReader(ctx context.Context, interval time.Duration, batchSize int, creator func() interface{}) BatchReader
NewMockBatchReader .
type BatchWriter ¶
BatchWriter .
type BufferedWriter ¶
type BufferedWriter struct {
// contains filtered or unexported fields
}
BufferedWriter .
func NewBufferedWriter ¶
func NewBufferedWriter(w BatchWriter, capacity int) *BufferedWriter
NewBufferedWriter .
type ConsumeStatistics ¶
type ConsumeStatistics interface {
	ReadError(err error)
	WriteError(data []Data, err error)
	ConfirmError(data []Data, err error)
	Success(data []Data)
	ObserveReadLatency(start time.Time)
	ObserveWriteLatency(start time.Time)
}
ConsumeStatistics .
var NopConsumeStatistics ConsumeStatistics = &nopConsumeStatistics{}
type InMemoryRateLimiter ¶
type InMemoryRateLimiter struct {
// contains filtered or unexported fields
}
func NewInMemoryRateLimiter ¶
func NewInMemoryRateLimiter(cfg RateLimitConfig) *InMemoryRateLimiter
type Iterator ¶
type Iterator interface {
	First() bool
	Last() bool
	Next() bool
	Prev() bool
	Value() Data
	Error() error
	Close() error
}
Iterator .
func MergedHeadOverlappedIterator ¶
MergedHeadOverlappedIterator .
type ListIterator ¶
type ListIterator struct {
// contains filtered or unexported fields
}
ListIterator .
type MultiplierBackoff ¶
type MultiplierBackoff struct {
	Base     time.Duration
	Max      time.Duration
	Duration time.Duration
	Factor   float64
}
MultiplierBackoff .
func (*MultiplierBackoff) Reset ¶
func (b *MultiplierBackoff) Reset()
func (*MultiplierBackoff) Wait ¶
func (b *MultiplierBackoff) Wait() <-chan time.Time
type RateLimitConfig ¶
type RateLimiter ¶
type StdoutWriter ¶
StdoutWriter .
func (StdoutWriter) Close ¶
func (w StdoutWriter) Close() error
func (StdoutWriter) Write ¶
func (w StdoutWriter) Write(val Data) error
Source Files ¶
Click to show internal directories.
Click to hide internal directories.