Documentation ¶
Overview ¶
Package mb - queue with message batching feature
Example ¶
var ctx = context.Background() // bufSize - whole capacity of batcher var bufSize = 100 // create the new batcher batcher := mb.New[Item](bufSize) // start goroutine that will wait items // it can be a lot of the wait goroutines var done = make(chan struct{}) go func() { defer close(done) for { // wait items items, err := batcher.Wait(context.Background()) if err != nil { fmt.Printf("waiter received error: %v; stop goroutine\n", err) return } // insert batch to db // while this func works, the batcher collects new item BatchInsert(items) } }() // add two items to batcher batcher.Add(ctx, Item{Id: 1}, Item{Id: 2}) time.Sleep(time.Millisecond) // add more items to batcher for i := 0; i < 10; i++ { // it's safe to call Add from other goroutines batcher.Add(ctx, Item{Id: i + 3}) } // close batcher batcher.Close() // and wait until inserter exits <-done
Output: inserted 2 items inserted 10 items waiter received error: mb: MB closed; stop goroutine
Example (WithTimeLimit) ¶
var ctx = context.Background() // bufSize - whole capacity of batcher var bufSize = 100 // create the new batcher batcher := mb.New[Item](bufSize) // start goroutine that will wait items // it can be a lot of the wait goroutines var done = make(chan struct{}) go func() { defer close(done) ctxWithTimeLimit := mb.CtxWithTimeLimit(ctx, time.Millisecond*200) cond := batcher.NewCond().WithMin(10).WithMax(15) for { // get at least 10 items or after 200 ms get at least 1 item items, err := cond.Wait(ctxWithTimeLimit) if err != nil { fmt.Printf("waiter received error: %v; stop goroutine\n", err) return } // insert batch to db // while this func works, the batcher collects new item BatchInsert(items) } }() // add two items to batcher batcher.Add(ctx, Item{Id: 1}, Item{Id: 2}) time.Sleep(time.Millisecond * 300) // add more items to batcher for i := 0; i < 20; i++ { // it's safe to call Add from other goroutines batcher.Add(ctx, Item{Id: i + 3}) } time.Sleep(time.Second) // close batcher batcher.Close() // and wait until inserter exits <-done
Output: inserted 2 items inserted 15 items inserted 5 items waiter received error: mb: MB closed; stop goroutine
Index ¶
- Variables
- func CtxWithTimeLimit(ctx context.Context, timeLimit time.Duration) context.Context
- type MB
- func (mb *MB[T]) Add(ctx context.Context, msgs ...T) (err error)
- func (mb *MB[T]) Close() (err error)
- func (mb *MB[T]) GetAll() (msgs []T)
- func (mb *MB[T]) Len() (l int)
- func (mb *MB[T]) NewCond() WaitCond[T]
- func (mb *MB[T]) Pause()
- func (mb *MB[T]) Resume()
- func (mb *MB[T]) Stats() (addCount, addMsgsCount, getCount, getMsgsCount int64)
- func (mb *MB[T]) TryAdd(msgs ...T) (err error)
- func (mb *MB[T]) Wait(ctx context.Context) (msgs []T, err error)
- func (mb *MB[T]) WaitCond(ctx context.Context, cond WaitCond[T]) (msgs []T, err error)
- func (mb *MB[T]) WaitOne(ctx context.Context) (msg T, err error)
- type WaitCond
- func (wc WaitCond[T]) Wait(ctx context.Context) (msgs []T, err error)
- func (wc WaitCond[T]) WaitOne(ctx context.Context) (msg T, err error)
- func (wc WaitCond[T]) WithFilter(f func(v T) bool) WaitCond[T]
- func (wc WaitCond[T]) WithMax(max int) WaitCond[T]
- func (wc WaitCond[T]) WithMin(min int) WaitCond[T]
- func (wc WaitCond[T]) WithPriority(priority float64) WaitCond[T]
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrClosed = errors.New("mb: MB closed")
ErrClosed is returned when you add message to closed queue
var ErrOverflowed = errors.New("mb: overflowed")
ErrOverflowed means new messages can't be added until there is free space in the queue
var ErrTooManyMessages = errors.New("mb: too many messages")
ErrTooManyMessages means that adding more messages (at one call) than the limit
Functions ¶
func CtxWithTimeLimit ¶
CtxWithTimeLimit makes child context with given timeLimit duration This context can be passed to all Wait methods. When a given min param can't be achieved within a time limit then a min param will be reseted
Types ¶
type MB ¶
type MB[T any] struct { // contains filtered or unexported fields }
MB - message batching object
func (*MB[T]) Add ¶
Add - adds new messages to queue. When queue is closed - returning ErrClosed When count messages bigger then queue size - returning ErrTooManyMessages When the queue is full - wait until will free place
func (*MB[T]) Close ¶
Close closes the queue All added messages will be available for active Wait When queue is paused, messages do not be released for Wait (use GetAll for fetching them)
func (*MB[T]) GetAll ¶
func (mb *MB[T]) GetAll() (msgs []T)
GetAll return all messages and flush queue Works on closed queue
func (*MB[T]) Stats ¶
Stats returning current statistic of queue usage addCount - count of calls Add addMsgsCount - count of added messages getCount - count of calls Wait getMsgsCount - count of issued messages
func (*MB[T]) TryAdd ¶
TryAdd - adds new messages to queue. When queue is closed - returning ErrClosed When count messages bigger then queue size - returning ErrTooManyMessages When the queue is full - returning ErrOverflowed
type WaitCond ¶
type WaitCond[T any] struct { Priority float64 Min int Max int Filter func(v T) bool // contains filtered or unexported fields }
WaitCond describes condition for messages
func (WaitCond[T]) WithFilter ¶
WithFilter adds filter to conditions filter function should return true for acceptable message and false for unacceptable
func (WaitCond[T]) WithPriority ¶
WithPriority adds priority to conditions