mb

Version: v2.0.1
Published: Oct 3, 2022 | License: BSD-2-Clause | Imports: 2 | Imported by: 0

README

Message batching queue

This package is useful for organizing messages into batches.
For example, it can help you build batch inserts to a database. It is thread-safe and well tested.

// create new queue object
batch := mb.New((any)(nil), 0)

// add new message to the queue
batch.Add(msg)

// wait until someone adds one or more messages;
// returns a slice of all queued messages ([]T)
messages := batch.Wait()

// wait until at least 10 messages are queued;
// if more than 100 are queued, only 100 are returned
messages = batch.WaitMinMax(10, 100)

// zero messages returned means the queue is closed
if len(messages) == 0 {
	return
}

// close queue
// if the queue still holds messages, the receivers get the remaining data
batch.Close()
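
To illustrate the batch-insert use case mentioned above, here is a minimal sketch of turning one batch into a single multi-row INSERT statement. It uses only the standard library. The function name buildBatchInsert, the table "events", the column "payload", and the PostgreSQL-style $N placeholders are all assumptions made for this illustration and are not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// buildBatchInsert turns one batch of messages into a single multi-row
// INSERT statement with positional placeholders.
// The table "events" and the column "payload" are hypothetical.
func buildBatchInsert(msgs []string) (string, []any) {
	if len(msgs) == 0 {
		return "", nil
	}
	placeholders := make([]string, len(msgs))
	args := make([]any, len(msgs))
	for i, m := range msgs {
		// One "($N)" group per message, numbered from 1.
		placeholders[i] = fmt.Sprintf("($%d)", i+1)
		args[i] = m
	}
	query := "INSERT INTO events (payload) VALUES " + strings.Join(placeholders, ", ")
	return query, args
}

func main() {
	// In a real worker this slice would come from a call such as batch.Wait().
	msgs := []string{"first - 0", "second - 0", "first - 1"}
	query, args := buildBatchInsert(msgs)
	fmt.Println(query)
	fmt.Println(len(args), "arguments")
}
```

The query and argument slice could then be handed to a database driver in one call, so each batch costs one round trip instead of one per message.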
Docs

https://godoc.org/github.com/cheggaaa/mb/v2

Installation

go get -u github.com/cheggaaa/mb/v2

Example
package main

import (
	"fmt"
	"time"

	"github.com/cheggaaa/mb/v2"
)

func main() {
	// create a queue with a capacity of 10 items
	q := mb.New("", 10)

	// create a channel that signals when each worker has finished
	done := make(chan bool)

	// start two workers
	go worker("first", q, done)
	go worker("second", q, done)

	// start two publishers
	go publisher("first", q)
	go publisher("second", q)

	// give time to work
	time.Sleep(time.Second)

	// close the queue
	q.Close()

	// and wait until both workers have processed the remaining messages
	for i := 0; i < 2; i++ {
		<-done
	}
}

func publisher(name string, q *mb.MB[string]) {
	fmt.Printf("Publisher %s: started\n", name)
	var i int
	for {
		// send the publisher name and a counter
		msg := fmt.Sprintf("%s - %d", name, i)
		// add
		if err := q.Add(msg); err != nil {
			// a non-nil err means the queue is closed
			break
		}
		// 10 messages per second
		time.Sleep(time.Second / 10)
		i++
	}
	fmt.Printf("Publisher %s: closed\n", name)
}

func worker(name string, q *mb.MB[string], done chan bool) {
	fmt.Printf("Worker %s: started\n", name)
	for {
		// get the accumulated messages
		msgs := q.Wait()

		if len(msgs) == 0 {
			// zero messages means the queue is closed
			break
		}

		msgsForPrint := ""
		for _, msg := range msgs {
			msgsForPrint += fmt.Sprintf("\t%s\n", msg)
		}
		fmt.Printf("Worker %s: %d messages received\n%s", name, len(msgs), msgsForPrint)

		// do the work, for example, send the messages to a remote server
		time.Sleep(time.Second / 3)
	}
	fmt.Printf("Worker %s: closed\n", name)
	done <- true
}

Documentation

Overview

Package mb provides a queue with a message batching feature.

Variables

var ErrClosed = errors.New("mb: MB closed")

ErrClosed is returned when you add a message to a closed queue.

var ErrOverflowed = errors.New("mb: overflowed")

ErrOverflowed means that new messages can't be added until there is free space in the queue.

var ErrTooManyMessages = errors.New("mb: too many messages")

ErrTooManyMessages means that a single call tried to add more messages than the queue size limit allows.

Types

type MB

type MB[T any] struct {
	// contains filtered or unexported fields
}

MB is a message batching object. It implements a queue and is based on condition variables.
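
To make the condition-variable idea concrete, here is a minimal sketch of a batching queue built on sync.Cond from the standard library. It is not the package's actual implementation. The type batchQueue and its constructor are hypothetical, and the sketch leaves out size limits, pausing, and statistics.

```go
package main

import (
	"fmt"
	"sync"
)

// batchQueue is a hypothetical, simplified stand-in for the idea behind MB:
// a slice guarded by a mutex, plus a condition variable that wakes readers.
type batchQueue[T any] struct {
	mu     sync.Mutex
	cond   *sync.Cond
	msgs   []T
	closed bool
}

func newBatchQueue[T any]() *batchQueue[T] {
	q := &batchQueue[T]{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Add appends messages and wakes every waiting reader.
// It reports false if the queue is already closed.
func (q *batchQueue[T]) Add(msgs ...T) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return false
	}
	q.msgs = append(q.msgs, msgs...)
	q.cond.Broadcast()
	return true
}

// Wait blocks until at least one message is queued or the queue is closed,
// then returns everything accumulated so far and empties the queue.
func (q *batchQueue[T]) Wait() []T {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.msgs) == 0 && !q.closed {
		q.cond.Wait() // releases the mutex while sleeping
	}
	out := q.msgs
	q.msgs = nil
	return out
}

// Close marks the queue closed and wakes all readers.
func (q *batchQueue[T]) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	q.cond.Broadcast()
}

func main() {
	q := newBatchQueue[string]()
	q.Add("a", "b")
	q.Add("c")
	fmt.Println(q.Wait()) // all three messages come back as one batch
	q.Close()
	fmt.Println(len(q.Wait())) // a zero-length batch signals a closed, empty queue
}
```

The loop around cond.Wait matters: a woken reader rechecks the condition, because another reader may have taken the messages first.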

func New

func New[T any](elType T, size int) *MB[T]

New returns a new MB with the given queue size. A size <= 0 means the queue is unlimited.

func (*MB[T]) Add

func (mb *MB[T]) Add(msgs ...T) (err error)

Add adds new messages to the queue. If the queue is closed, it returns ErrClosed. If the number of messages is larger than the queue size, it returns ErrTooManyMessages. If the queue is full, it waits until space becomes free.

func (*MB[T]) Close

func (mb *MB[T]) Close() (err error)

Close closes the queue. All messages already added remain available to Wait. If the queue is paused, the messages are not released to Wait (use GetAll to fetch them).

func (*MB[T]) GetAll

func (mb *MB[T]) GetAll() (msgs []T)

GetAll returns all messages and flushes the queue. It also works on a closed queue.

func (*MB[T]) Len

func (mb *MB[T]) Len() (l int)

Len returns the current size of the queue.

func (*MB[T]) Pause

func (mb *MB[T]) Pause()

Pause blocks all Wait calls until Resume is called.

func (*MB[T]) Resume

func (mb *MB[T]) Resume()

Resume releases all blocked Wait calls.
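
The pause and resume behaviour can be pictured as a gate that readers must pass. The following is a minimal sketch under that assumption, using sync.Cond. The type gate and its methods are hypothetical and only model the blocking behaviour, not the queue itself or the package's internals.

```go
package main

import (
	"fmt"
	"sync"
)

// gate is a hypothetical helper: callers of pass block while it is paused.
type gate struct {
	mu     sync.Mutex
	cond   *sync.Cond
	paused bool
}

func newGate() *gate {
	g := &gate{}
	g.cond = sync.NewCond(&g.mu)
	return g
}

// Pause makes subsequent pass calls block.
func (g *gate) Pause() {
	g.mu.Lock()
	g.paused = true
	g.mu.Unlock()
}

// Resume reopens the gate and wakes every blocked caller.
func (g *gate) Resume() {
	g.mu.Lock()
	g.paused = false
	g.mu.Unlock()
	g.cond.Broadcast()
}

// pass returns immediately when the gate is open and blocks while paused.
func (g *gate) pass() {
	g.mu.Lock()
	for g.paused {
		g.cond.Wait()
	}
	g.mu.Unlock()
}

func main() {
	g := newGate()
	g.Pause()

	done := make(chan struct{})
	go func() {
		g.pass() // cannot return before Resume is called
		close(done)
	}()

	g.Resume()
	<-done
	fmt.Println("reader released")
}
```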

func (*MB[T]) Stats

func (mb *MB[T]) Stats() (addCount, addMsgsCount, getCount, getMsgsCount int64)

Stats returns the current usage statistics of the queue:
addCount - number of calls to Add
addMsgsCount - number of added messages
getCount - number of calls to Wait
getMsgsCount - number of issued messages

func (*MB[T]) TryAdd

func (mb *MB[T]) TryAdd(msgs ...T) (err error)

TryAdd adds new messages to the queue. If the queue is closed, it returns ErrClosed. If the number of messages is larger than the queue size, it returns ErrTooManyMessages. If the queue is full, it returns ErrOverflowed.
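
The difference between the three error cases is easier to see in a toy model. The sketch below is a single-goroutine illustration with no locking. The type boundedQueue and the lowercase error values are hypothetical stand-ins, and the order in which the checks run is an assumption rather than something the documentation states.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel errors mirroring the three documented outcomes.
var (
	errClosed          = errors.New("sketch: closed")
	errOverflowed      = errors.New("sketch: overflowed")
	errTooManyMessages = errors.New("sketch: too many messages")
)

// boundedQueue is a toy, non-thread-safe model of a size-limited queue.
type boundedQueue struct {
	msgs   []string
	size   int // size <= 0 means unlimited
	closed bool
}

// tryAdd never blocks: it either stores the whole batch or returns an error.
func (q *boundedQueue) tryAdd(msgs ...string) error {
	switch {
	case q.closed:
		return errClosed
	case q.size > 0 && len(msgs) > q.size:
		// The batch could never fit, even into an empty queue.
		return errTooManyMessages
	case q.size > 0 && len(q.msgs)+len(msgs) > q.size:
		// The batch would fit later, once readers free some space.
		return errOverflowed
	}
	q.msgs = append(q.msgs, msgs...)
	return nil
}

func main() {
	q := &boundedQueue{size: 2}
	fmt.Println(q.tryAdd("a"))           // fits
	fmt.Println(q.tryAdd("b", "c", "d")) // larger than the whole queue
	fmt.Println(q.tryAdd("b", "c"))      // no free space right now
	q.closed = true
	fmt.Println(errors.Is(q.tryAdd("e"), errClosed))
}
```

A caller would typically treat the overflow case as retryable, while the too-many-messages case calls for splitting the batch.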

func (*MB[T]) Wait

func (mb *MB[T]) Wait() (msgs []T)

Wait blocks until someone adds a message, then returns a slice of the accumulated messages. When the queue is closed, the length of the returned slice is 0.

func (*MB[T]) WaitMax

func (mb *MB[T]) WaitMax(max int) (msgs []T)

WaitMax is Wait with a limit on the maximum size of the returned slice.

func (*MB[T]) WaitMin

func (mb *MB[T]) WaitMin(min int) (msgs []T)

WaitMin is Wait with a limit on the minimum size of the returned slice.

func (*MB[T]) WaitMinMax

func (mb *MB[T]) WaitMinMax(min, max int) (msgs []T)

WaitMinMax is Wait with limits on the minimum and maximum size of the returned slice. A value < 0 means no limit.
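
The effect of the two limits can be sketched as a pure function over the queued messages. This is only an illustration. The function takeBatch is hypothetical, and treating a value of 0 the same as a negative value is an assumption, since the documentation only covers values < 0.

```go
package main

import "fmt"

// takeBatch models the documented limits: it reports ok == false while fewer
// than min messages are queued (the caller would keep waiting); otherwise it
// returns at most max messages and leaves the rest in the queue.
// Assumption: min <= 0 means "at least one message", max <= 0 means no cap.
func takeBatch[T any](queued []T, min, max int) (batch, rest []T, ok bool) {
	if min <= 0 {
		min = 1
	}
	if len(queued) < min {
		return nil, queued, false
	}
	n := len(queued)
	if max > 0 && n > max {
		n = max
	}
	return queued[:n], queued[n:], true
}

func main() {
	queued := []int{1, 2, 3, 4, 5}

	batch, rest, ok := takeBatch(queued, 2, 3)
	fmt.Println(batch, rest, ok) // capped at three, two stay queued

	_, _, ok = takeBatch(queued[:1], 2, 3)
	fmt.Println(ok) // below the minimum, so the caller keeps waiting

	batch, _, _ = takeBatch(queued, -1, -1)
	fmt.Println(len(batch)) // no limits, the whole queue is returned
}
```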
