syncutils

package module
v0.0.0-...-6c27427 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2021 License: MIT Imports: 3 Imported by: 7

README

go-sync-utils

GoDoc Go Report Card Build Status codecov

A library that provides utility entities for synchronization.

Features

  • utility entities for the sync.WaitGroup type:
    • interface of the sync.WaitGroup type;
    • operating with a set of such interfaces as a whole;
  • sending to a channel without blocking even if the channel is busy;
  • wrapper for an abstract handler that allows calling it concurrently:
    • starting:
      • start in the caller goroutine;
      • start in a goroutine pool;
    • stopping:
      • can be called after both kinds of the starting;
      • blocks the execution flow until the stopping is completed.

Installation

Prepare the directory:

$ mkdir --parents "$(go env GOPATH)/src/github.com/thewizardplusplus/"
$ cd "$(go env GOPATH)/src/github.com/thewizardplusplus/"

Clone this repository:

$ git clone https://github.com/thewizardplusplus/go-sync-utils.git
$ cd go-sync-utils

Install dependencies with the dep tool:

$ dep ensure -vendor-only

Examples

syncutils.MultiWaitGroup:

package main

import (
	"fmt"
	"math/rand"
	"strings"
	"sync"
	"time"

	syncutils "github.com/thewizardplusplus/go-sync-utils"
)

// Call describes a single recorded method invocation: the method name and the
// arguments it was invoked with.
type Call struct {
	Method    string
	Arguments []interface{}
}

// String renders the call as "Method(arguments)", where the arguments are
// formatted by fmt.Sprint and therefore separated by spaces.
func (call Call) String() string {
	// fmt.Sprint wraps a slice in exactly one pair of square brackets; remove
	// only that outer pair so that brackets belonging to the arguments
	// themselves (e.g. a slice passed as an argument) are preserved
	formatted := fmt.Sprint(call.Arguments)
	arguments := strings.TrimSuffix(strings.TrimPrefix(formatted, "["), "]")

	return fmt.Sprintf("%s(%s)", call.Method, arguments)
}

// MockWaitGroup is a wait-group stand-in that only records which methods were
// called on it. The embedded mutex guards Calls against concurrent appends.
type MockWaitGroup struct {
	sync.Mutex

	Calls []Call
}

// Add records an "Add" call together with the passed delta.
func (mock *MockWaitGroup) Add(delta int) {
	mock.record(Call{Method: "Add", Arguments: []interface{}{delta}})
}

// Done records a "Done" call with an empty argument list.
func (mock *MockWaitGroup) Done() {
	mock.record(Call{Method: "Done", Arguments: []interface{}{}})
}

// Wait records a "Wait" call with an empty argument list; it does not wait
// for any counter.
func (mock *MockWaitGroup) Wait() {
	mock.record(Call{Method: "Wait", Arguments: []interface{}{}})
}

// record appends the call to Calls while holding the mutex.
func (mock *MockWaitGroup) record(call Call) {
	mock.Lock()
	defer mock.Unlock()

	mock.Calls = append(mock.Calls, call)
}

func main() {
	// combine the mock with a real wait group; the real one goes last
	mock := &MockWaitGroup{}
	group := syncutils.MultiWaitGroup{mock, &sync.WaitGroup{}}

	// both delays are picked before any goroutine is started
	delays := []time.Duration{
		time.Duration(rand.Intn(100)) * time.Millisecond,
		time.Duration(rand.Intn(100)) * time.Millisecond,
	}
	for index := range delays {
		delay := delays[index]

		group.Add(1)
		go func() {
			defer group.Done()

			time.Sleep(delay)
		}()
	}
	group.Wait()

	// print the calls recorded by the mock
	for index := range mock.Calls {
		fmt.Println(mock.Calls[index])
	}

	// Unordered output:
	// Add(1)
	// Add(1)
	// Done()
	// Done()
	// Wait()
}

syncutils.UnboundedSend:

package main

import (
	"fmt"

	syncutils "github.com/thewizardplusplus/go-sync-utils"
)

func main() {
	// the channel holds only two items, yet all ten sends below return
	// without blocking
	const count = 10

	numbers := make(chan int, 2)
	for number := 0; number < count; number++ {
		syncutils.UnboundedSend(numbers, number)
	}

	// receive exactly as many items as were sent
	for received := 0; received < count; received++ {
		fmt.Println(<-numbers)
	}

	// Unordered output:
	// 0
	// 1
	// 2
	// 3
	// 4
	// 5
	// 6
	// 7
	// 8
	// 9
}

syncutils.ConcurrentHandler:

package main

import (
	"context"
	"fmt"
	"runtime"
	"sync"

	syncutils "github.com/thewizardplusplus/go-sync-utils"
)

// Handler collects every handled item into dataGroup. The mutex serializes
// the appends, and the wait group is notified once per handled item.
type Handler struct {
	waitGroup *sync.WaitGroup
	locker    sync.Mutex
	dataGroup []interface{}
}

// Handle stores the data and then marks one unit of work as done. The context
// is not used.
func (handler *Handler) Handle(ctx context.Context, data interface{}) {
	handler.locker.Lock()
	handler.dataGroup = append(handler.dataGroup, data)
	handler.locker.Unlock()

	// signal completion only after the data has been stored and the mutex
	// released
	handler.waitGroup.Done()
}

func main() {
	// start the data handling
	pending := &sync.WaitGroup{}
	collector := &Handler{waitGroup: pending}

	wrapper := syncutils.NewConcurrentHandler(1000, collector)
	go wrapper.StartConcurrently(context.Background(), runtime.NumCPU())
	defer wrapper.Stop()

	// handle the data
	const total = 10
	for number := 0; number < total; number++ {
		pending.Add(1)
		wrapper.Handle(fmt.Sprintf("data #%d", number))
	}
	pending.Wait()

	// print the handled data
	for index := range collector.dataGroup {
		fmt.Println(collector.dataGroup[index])
	}

	// Unordered output:
	// data #0
	// data #1
	// data #2
	// data #3
	// data #4
	// data #5
	// data #6
	// data #7
	// data #8
	// data #9
}

License

The MIT License (MIT)

Copyright © 2020-2021 thewizardplusplus

Documentation

Overview

Package syncutils provides utility entities for syncing.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func UnboundedSend

func UnboundedSend(channel interface{}, data interface{})

UnboundedSend sends the provided data to the provided channel without blocking, even if the channel is busy. This function works with any types, but the type of the data must be assignable to the element type of the channel. The order in which the data is received may differ from the order in which it was sent.

Example
package main

import (
	"fmt"

	syncutils "github.com/thewizardplusplus/go-sync-utils"
)

func main() {
	// the channel holds only two items, yet all ten sends below return
	// without blocking
	const count = 10

	numbers := make(chan int, 2)
	for number := 0; number < count; number++ {
		syncutils.UnboundedSend(numbers, number)
	}

	// receive exactly as many items as were sent
	for received := 0; received < count; received++ {
		fmt.Println(<-numbers)
	}
}
Unordered output:

0
1
2
3
4
5
6
7
8
9

Types

type ConcurrentHandler

// ConcurrentHandler wraps an abstract handler so that it can be called
// concurrently. Its fields are unexported and are not shown in this listing.
type ConcurrentHandler struct {
	// contains filtered or unexported fields
}

ConcurrentHandler wraps an abstract handler and allows calling it concurrently.

Example
package main

import (
	"context"
	"fmt"
	"runtime"
	"sync"

	syncutils "github.com/thewizardplusplus/go-sync-utils"
)

// Handler collects every handled item into dataGroup. The mutex serializes
// the appends, and the wait group is notified once per handled item.
type Handler struct {
	waitGroup *sync.WaitGroup
	locker    sync.Mutex
	dataGroup []interface{}
}

// Handle stores the data and then marks one unit of work as done. The context
// is not used.
func (handler *Handler) Handle(ctx context.Context, data interface{}) {
	handler.locker.Lock()
	handler.dataGroup = append(handler.dataGroup, data)
	handler.locker.Unlock()

	// signal completion only after the data has been stored and the mutex
	// released
	handler.waitGroup.Done()
}

func main() {
	// start the data handling
	pending := &sync.WaitGroup{}
	collector := &Handler{waitGroup: pending}

	wrapper := syncutils.NewConcurrentHandler(1000, collector)
	go wrapper.StartConcurrently(context.Background(), runtime.NumCPU())
	defer wrapper.Stop()

	// handle the data
	const total = 10
	for number := 0; number < total; number++ {
		pending.Add(1)
		wrapper.Handle(fmt.Sprintf("data #%d", number))
	}
	pending.Wait()

	// print the handled data
	for index := range collector.dataGroup {
		fmt.Println(collector.dataGroup[index])
	}
}
Unordered output:

data #0
data #1
data #2
data #3
data #4
data #5
data #6
data #7
data #8
data #9

func NewConcurrentHandler

func NewConcurrentHandler(
	bufferSize int,
	innerHandler Handler,
) ConcurrentHandler

NewConcurrentHandler creates a concurrent wrapper for the passed abstract handler. The buffer size specifies the capacity of the inner channel used for passing data to the inner handler.

func (ConcurrentHandler) Handle

func (handler ConcurrentHandler) Handle(data interface{})

Handle sends the passed data to the inner handler via the inner channel. This method does not block the execution flow.

func (ConcurrentHandler) Start

func (handler ConcurrentHandler) Start(ctx context.Context)

Start processes data from the inner channel by the inner handler. This method performs the processing directly in the caller goroutine and blocks the execution flow until the processing is stopped.

func (ConcurrentHandler) StartConcurrently

func (handler ConcurrentHandler) StartConcurrently(
	ctx context.Context,
	concurrencyFactor int,
)

StartConcurrently processes data from the inner channel by the inner handler. This method performs the processing in a goroutine pool (the concurrency factor specifies the number of goroutines in the pool). Regardless, it blocks the execution flow until the processing is stopped.

func (ConcurrentHandler) Stop

func (handler ConcurrentHandler) Stop()

Stop interrupts the processing of data from the inner channel by the inner handler. This method can be called after both the Start() and StartConcurrently() methods. This method blocks the execution flow until the interruption is completed.

type ContextCancellerInterface

// ContextCancellerInterface declares a single CancelContext method that takes
// no arguments and returns nothing. According to the package documentation,
// it is used only for mock generating.
type ContextCancellerInterface interface {
	CancelContext()
}

ContextCancellerInterface ...

It is used only for generating mocks.

type Handler

// Handler is the interface of an abstract handler: anything that can accept
// a context together with a data item of an arbitrary type.
type Handler interface {
	// Handle receives the context and the data item to be handled.
	Handle(ctx context.Context, data interface{})
}

Handler represents the interface of an abstract handler.

type MultiWaitGroup

// MultiWaitGroup is a slice of WaitGroup interfaces that is operated on as
// a whole.
type MultiWaitGroup []WaitGroup

MultiWaitGroup allows operating with a set of WaitGroup interfaces as a whole. It sequentially calls corresponding methods on each interface in the set in the same order in which interfaces are presented.

It might be useful for the simultaneous use of the sync.WaitGroup object and its mock. Attention! In this case, the real object must go last to avoid data races.

Example
package main

import (
	"fmt"
	"math/rand"
	"strings"
	"sync"
	"time"

	syncutils "github.com/thewizardplusplus/go-sync-utils"
)

// Call describes a single recorded method invocation: the method name and the
// arguments it was invoked with.
type Call struct {
	Method    string
	Arguments []interface{}
}

// String renders the call as "Method(arguments)", where the arguments are
// formatted by fmt.Sprint and therefore separated by spaces.
func (call Call) String() string {
	// fmt.Sprint wraps a slice in exactly one pair of square brackets; remove
	// only that outer pair so that brackets belonging to the arguments
	// themselves (e.g. a slice passed as an argument) are preserved
	formatted := fmt.Sprint(call.Arguments)
	arguments := strings.TrimSuffix(strings.TrimPrefix(formatted, "["), "]")

	return fmt.Sprintf("%s(%s)", call.Method, arguments)
}

// MockWaitGroup is a wait-group stand-in that only records which methods were
// called on it. The embedded mutex guards Calls against concurrent appends.
type MockWaitGroup struct {
	sync.Mutex

	Calls []Call
}

// Add records an "Add" call together with the passed delta.
func (mock *MockWaitGroup) Add(delta int) {
	mock.record(Call{Method: "Add", Arguments: []interface{}{delta}})
}

// Done records a "Done" call with an empty argument list.
func (mock *MockWaitGroup) Done() {
	mock.record(Call{Method: "Done", Arguments: []interface{}{}})
}

// Wait records a "Wait" call with an empty argument list; it does not wait
// for any counter.
func (mock *MockWaitGroup) Wait() {
	mock.record(Call{Method: "Wait", Arguments: []interface{}{}})
}

// record appends the call to Calls while holding the mutex.
func (mock *MockWaitGroup) record(call Call) {
	mock.Lock()
	defer mock.Unlock()

	mock.Calls = append(mock.Calls, call)
}

func main() {
	// combine the mock with a real wait group; the real one goes last
	mock := &MockWaitGroup{}
	group := syncutils.MultiWaitGroup{mock, &sync.WaitGroup{}}

	// both delays are picked before any goroutine is started
	delays := []time.Duration{
		time.Duration(rand.Intn(100)) * time.Millisecond,
		time.Duration(rand.Intn(100)) * time.Millisecond,
	}
	for index := range delays {
		delay := delays[index]

		group.Add(1)
		go func() {
			defer group.Done()

			time.Sleep(delay)
		}()
	}
	group.Wait()

	// print the calls recorded by the mock
	for index := range mock.Calls {
		fmt.Println(mock.Calls[index])
	}
}
Unordered output:

Add(1)
Add(1)
Done()
Done()
Wait()

func (MultiWaitGroup) Add

func (waitGroups MultiWaitGroup) Add(delta int)

Add sequentially calls the method of the same name on each interface in the set in the same order in which interfaces are presented.

func (MultiWaitGroup) Done

func (waitGroups MultiWaitGroup) Done()

Done sequentially calls the method of the same name on each interface in the set in the same order in which interfaces are presented.

func (MultiWaitGroup) Wait

func (waitGroups MultiWaitGroup) Wait()

Wait sequentially calls the method of the same name on each interface in the set in the same order in which interfaces are presented.

type WaitGroup

// WaitGroup is the interface of the sync.WaitGroup type: it declares the Add,
// Done and Wait methods with the signatures listed below.
type WaitGroup interface {
	// Add takes an integer delta.
	Add(delta int)
	// Done takes no arguments and returns nothing.
	Done()
	// Wait takes no arguments and returns nothing.
	Wait()
}

WaitGroup represents the interface of the sync.WaitGroup type. It might be useful for supporting the ability to mock the latter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL