Documentation ¶
Overview ¶
A common util to export data (trace and metric for now) as batch
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BaseBatchPipe ¶
func NewBaseBatchPipe ¶
func NewBaseBatchPipe[T HasName, B any](impl PipeImpl[T, B], opts ...BaseBatchPipeOpt) *BaseBatchPipe[T, B]
func (*BaseBatchPipe[T, B]) SendItem ¶
func (bc *BaseBatchPipe[T, B]) SendItem(ctx context.Context, items ...T) error
SendItem returns error when pipe is closed
func (*BaseBatchPipe[T, B]) Start ¶
func (bc *BaseBatchPipe[T, B]) Start(ctx context.Context) bool
Start kicks off the merge workers and batch workers, return false if the pipe is workinghas been called
func (*BaseBatchPipe[T, B]) Stop ¶
func (bc *BaseBatchPipe[T, B]) Stop(graceful bool) (<-chan struct{}, bool)
Stop terminates all workers. If graceful asked, wait until all items are processed, otherwise, quit immediately. Caller can use the returned channel to wait Stop finish
type BaseBatchPipeOpt ¶
type BaseBatchPipeOpt interface {
ApplyTo(*baseBatchPipeOpts)
}
type HasName ¶
type HasName interface {
GetName() string
}
HasName decides which table the owner will go to
type ItemBuffer ¶
type ItemBuffer[T any, B any] interface { Reminder Add(item T) Reset() IsEmpty() bool ShouldFlush() bool // GetBatch use bytes.Buffer to mitigate mem allocation and the returned bytes should own its data GetBatch(ctx context.Context, buf *bytes.Buffer) B }
ItemBuffer Stash items and construct a batch can be stored. for instance, an sql inserting all items into a table
type PipeImpl ¶
type PipeImpl[T any, B any] interface { // NewItemBuffer create a new buffer for one kind of Item NewItemBuffer(name string) ItemBuffer[T, B] // NewItemBatchHandler handle the StoreBatch from an ItemBuffer, for example, execute an insert sql. // this handle may be running on multiple goroutine NewItemBatchHandler(ctx context.Context) func(batch B) }
type PipeWithBatchWorkerNum ¶
type PipeWithBatchWorkerNum int
func (PipeWithBatchWorkerNum) ApplyTo ¶
func (x PipeWithBatchWorkerNum) ApplyTo(o *baseBatchPipeOpts)
type PipeWithBufferWorkerNum ¶
type PipeWithBufferWorkerNum int
func (PipeWithBufferWorkerNum) ApplyTo ¶
func (x PipeWithBufferWorkerNum) ApplyTo(o *baseBatchPipeOpts)
type PipeWithItemNameFormatter ¶
func (PipeWithItemNameFormatter) ApplyTo ¶
func (x PipeWithItemNameFormatter) ApplyTo(o *baseBatchPipeOpts)
type Reminder ¶
type Reminder interface { RemindNextAfter() time.Duration RemindBackOff() RemindBackOffCnt() int RemindReset() }
Reminder to force flush ItemBuffer
func NewBackOffClock ¶
func NewConstantClock ¶
Click to show internal directories.
Click to hide internal directories.