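Documentation
Overview
Package syncutils provides synchronization utilities: a non-blocking channel send (UnboundedSend), a concurrent wrapper for handlers (ConcurrentHandler), and a composite wait group (MultiWaitGroup).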
Functions
func UnboundedSend
func UnboundedSend(channel interface{}, data interface{})
UnboundedSend sends the provided data to the provided channel without blocking, even if the channel is busy. This function works with any types, but the data must be assignable to the channel's element type. The order in which data is received may differ from the order in which it was sent.
Example
package main

import (
	"fmt"

	syncutils "github.com/thewizardplusplus/go-sync-utils"
)

func main() {
	numbers := make(chan int, 2)
	for number := 0; number < 10; number++ {
		syncutils.UnboundedSend(numbers, number)
	}

	for index := 0; index < 10; index++ {
		number := <-numbers
		fmt.Println(number)
	}
}

Output:
0
1
2
3
4
5
6
7
8
9
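The non-blocking behavior described above can be illustrated with a minimal sketch that uses only the standard library. It is not the package's implementation: the helper names trySendOrSpawn and sendAndCollect are hypothetical, and the strategy shown (try an immediate send via reflection, then fall back to a background goroutine) is an assumption about one way to obtain these semantics. The received values are sorted before printing because background sends may complete in any order.

```go
package main

import (
	"fmt"
	"reflect"
	"sort"
)

// trySendOrSpawn is a hypothetical stand-in for a non-blocking send: it first
// attempts an immediate send and, if the channel cannot accept the value right
// now, completes the send in a new goroutine.
func trySendOrSpawn(channel interface{}, data interface{}) {
	channelValue := reflect.ValueOf(channel)
	dataValue := reflect.ValueOf(data)
	if channelValue.TrySend(dataValue) {
		return
	}

	// the channel is full: finish the send in the background so the caller
	// is not blocked; background sends may complete in any order
	go channelValue.Send(dataValue)
}

// sendAndCollect sends the numbers 0..count-1 without blocking and returns
// the received values sorted, since the receiving order is not guaranteed.
func sendAndCollect(count int, capacity int) []int {
	numbers := make(chan int, capacity)
	for number := 0; number < count; number++ {
		trySendOrSpawn(numbers, number)
	}

	received := make([]int, 0, count)
	for index := 0; index < count; index++ {
		received = append(received, <-numbers)
	}
	sort.Ints(received)

	return received
}

func main() {
	fmt.Println(sendAndCollect(10, 2)) // prints [0 1 2 3 4 5 6 7 8 9]
}
```

The trade-off in this sketch is that every send that cannot complete immediately costs one goroutine until a receiver drains the channel, which is also why ordering is lost.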
Types
type ConcurrentHandler
type ConcurrentHandler struct {
	// contains filtered or unexported fields
}
ConcurrentHandler wraps an abstract handler and allows it to be called concurrently.
Example
package main

import (
	"context"
	"fmt"
	"runtime"
	"sync"

	syncutils "github.com/thewizardplusplus/go-sync-utils"
)

type Handler struct {
	waitGroup *sync.WaitGroup
	locker    sync.Mutex
	dataGroup []interface{}
}

func (handler *Handler) Handle(ctx context.Context, data interface{}) {
	defer handler.waitGroup.Done()

	handler.locker.Lock()
	defer handler.locker.Unlock()

	handler.dataGroup = append(handler.dataGroup, data)
}

func main() {
	// start the data handling
	waitGroup := new(sync.WaitGroup)
	innerHandler := &Handler{waitGroup: waitGroup}
	concurrentHandler := syncutils.NewConcurrentHandler(1000, innerHandler)
	go concurrentHandler.StartConcurrently(context.Background(), runtime.NumCPU())
	defer concurrentHandler.Stop()

	// handle the data
	for index := 0; index < 10; index++ {
		waitGroup.Add(1)

		data := fmt.Sprintf("data #%d", index)
		concurrentHandler.Handle(data)
	}
	waitGroup.Wait()

	// print the handled data
	for _, data := range innerHandler.dataGroup {
		fmt.Println(data)
	}
}

Output:
data #0
data #1
data #2
data #3
data #4
data #5
data #6
data #7
data #8
data #9
func NewConcurrentHandler
func NewConcurrentHandler(
	bufferSize int,
	innerHandler Handler,
) ConcurrentHandler
NewConcurrentHandler creates a concurrent wrapper for the passed abstract handler. The buffer size specifies the capacity of the inner channel used for passing data to the inner handler.
func (ConcurrentHandler) Handle
func (handler ConcurrentHandler) Handle(data interface{})
Handle sends the passed data to the inner handler via the inner channel. This method does not block the execution flow.
func (ConcurrentHandler) Start
func (handler ConcurrentHandler) Start(ctx context.Context)
Start processes data from the inner channel with the inner handler. This method performs the processing directly in the caller's goroutine and blocks the execution flow until the processing is stopped.
func (ConcurrentHandler) StartConcurrently
func (handler ConcurrentHandler) StartConcurrently(
	ctx context.Context,
	concurrencyFactor int,
)
StartConcurrently processes data from the inner channel with the inner handler. This method performs the processing in a goroutine pool (the concurrency factor specifies the number of goroutines in the pool). Even so, it blocks the execution flow until the processing is stopped.
func (ConcurrentHandler) Stop
func (handler ConcurrentHandler) Stop()
Stop interrupts the processing of data from the inner channel by the inner handler. This method can be called after either the Start() or the StartConcurrently() method. It blocks the execution flow until the interruption is complete.
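The start and stop semantics described above (a processing call that blocks until it is stopped, and a stop call that blocks until the interruption is complete) can be sketched with the standard library alone. This is a simplified illustration under stated assumptions, not the package's code: the pool type, its run and stopAndWait methods, and the sumConcurrently helper are all hypothetical names, and the sketch processes plain integers instead of arbitrary data.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a hypothetical, simplified illustration of a buffered queue
// served by a fixed number of goroutines.
type pool struct {
	queue  chan int
	handle func(int)
	stop   chan struct{} // closed by stopAndWait to request the interruption
	done   chan struct{} // closed by run once every worker has exited
}

func newPool(bufferSize int, handle func(int)) *pool {
	return &pool{
		queue:  make(chan int, bufferSize),
		handle: handle,
		stop:   make(chan struct{}),
		done:   make(chan struct{}),
	}
}

// run processes queued items in the given number of goroutines and blocks
// until a stop is requested or the context is cancelled.
func (p *pool) run(ctx context.Context, workers int) {
	defer close(p.done)

	var workerGroup sync.WaitGroup
	workerGroup.Add(workers)
	for index := 0; index < workers; index++ {
		go func() {
			defer workerGroup.Done()

			for {
				select {
				case item := <-p.queue:
					p.handle(item)
				case <-p.stop:
					return
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	workerGroup.Wait()
}

// stopAndWait requests the interruption and blocks until run has returned.
func (p *pool) stopAndWait() {
	close(p.stop)
	<-p.done
}

// sumConcurrently adds up the items using the pool and returns the total.
func sumConcurrently(items []int, workers int) int64 {
	var sum int64
	var handled sync.WaitGroup
	p := newPool(len(items), func(item int) {
		defer handled.Done()
		atomic.AddInt64(&sum, int64(item))
	})
	go p.run(context.Background(), workers) // run blocks, so start it separately

	handled.Add(len(items))
	for _, item := range items {
		p.queue <- item // the buffer holds len(items), so this does not block
	}
	handled.Wait()
	p.stopAndWait()

	return atomic.LoadInt64(&sum)
}

func main() {
	fmt.Println(sumConcurrently([]int{1, 2, 3, 4}, 2)) // prints 10
}
```

Because run blocks, the caller launches it with a go statement, which mirrors how the example for ConcurrentHandler starts StartConcurrently.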
type ContextCancellerInterface
type ContextCancellerInterface interface {
	CancelContext()
}
ContextCancellerInterface ...
It is used only for mock generating.
type MultiWaitGroup
type MultiWaitGroup []WaitGroup
MultiWaitGroup allows a set of WaitGroup interfaces to be operated on as a whole. It sequentially calls the corresponding method on each interface in the set, in the order in which the interfaces appear.
It can be useful for using a sync.WaitGroup object and its mock at the same time. Note that in this case the real object must go last to avoid data races.
Example
package main

import (
	"fmt"
	"math/rand"
	"strings"
	"sync"
	"time"

	syncutils "github.com/thewizardplusplus/go-sync-utils"
)

type Call struct {
	Method    string
	Arguments []interface{}
}

func (call Call) String() string {
	arguments := strings.Trim(fmt.Sprint(call.Arguments), "[]")
	return fmt.Sprintf("%s(%s)", call.Method, arguments)
}

type MockWaitGroup struct {
	sync.Mutex

	Calls []Call
}

func (mock *MockWaitGroup) Add(delta int) {
	mock.Lock()
	defer mock.Unlock()

	mock.Calls = append(mock.Calls, Call{"Add", []interface{}{delta}})
}

func (mock *MockWaitGroup) Done() {
	mock.Lock()
	defer mock.Unlock()

	mock.Calls = append(mock.Calls, Call{"Done", []interface{}{}})
}

func (mock *MockWaitGroup) Wait() {
	mock.Lock()
	defer mock.Unlock()

	mock.Calls = append(mock.Calls, Call{"Wait", []interface{}{}})
}

func main() {
	waitGroupMock := new(MockWaitGroup)
	waitGroups := syncutils.MultiWaitGroup{waitGroupMock, new(sync.WaitGroup)}
	for _, duration := range []time.Duration{
		time.Duration(rand.Intn(100)) * time.Millisecond,
		time.Duration(rand.Intn(100)) * time.Millisecond,
	} {
		waitGroups.Add(1)

		go func(duration time.Duration) {
			defer waitGroups.Done()
			time.Sleep(duration)
		}(duration)
	}
	waitGroups.Wait()

	for _, call := range waitGroupMock.Calls {
		fmt.Println(call)
	}
}

Output:
Add(1)
Add(1)
Done()
Done()
Wait()
func (MultiWaitGroup) Add
func (waitGroups MultiWaitGroup) Add(delta int)
Add sequentially calls the method of the same name on each interface in the set in the same order in which interfaces are presented.
func (MultiWaitGroup) Done
func (waitGroups MultiWaitGroup) Done()
Done sequentially calls the method of the same name on each interface in the set in the same order in which interfaces are presented.
func (MultiWaitGroup) Wait
func (waitGroups MultiWaitGroup) Wait()
Wait sequentially calls the method of the same name on each interface in the set in the same order in which interfaces are presented.
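The forwarding behavior shared by Add, Done, and Wait can be sketched as a slice type whose methods loop over its members in order. The sketch below relies only on the standard library and makes several assumptions: the waitGroup interface is a guess at the shape of the WaitGroup interface referenced above (the three methods of *sync.WaitGroup), and the multi, recorder, and runTasks names are hypothetical. The recorded calls are sorted before being returned because the order in which goroutines record them depends on scheduling.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// waitGroup is an assumed interface covering the methods of *sync.WaitGroup.
type waitGroup interface {
	Add(delta int)
	Done()
	Wait()
}

// multi forwards every call to each member in slice order.
type multi []waitGroup

func (groups multi) Add(delta int) {
	for _, group := range groups {
		group.Add(delta)
	}
}

func (groups multi) Done() {
	for _, group := range groups {
		group.Done()
	}
}

func (groups multi) Wait() {
	for _, group := range groups {
		group.Wait()
	}
}

// recorder is a hypothetical mock that only logs the calls it receives.
type recorder struct {
	locker sync.Mutex
	calls  []string
}

func (mock *recorder) record(call string) {
	mock.locker.Lock()
	defer mock.locker.Unlock()

	mock.calls = append(mock.calls, call)
}

func (mock *recorder) Add(delta int) { mock.record(fmt.Sprintf("Add(%d)", delta)) }
func (mock *recorder) Done()         { mock.record("Done()") }
func (mock *recorder) Wait()         { mock.record("Wait()") }

// runTasks starts the given number of goroutines and returns the calls
// recorded by the mock, sorted alphabetically.
func runTasks(count int) []string {
	mock := new(recorder)
	// the mock goes first and the real sync.WaitGroup goes last, so every
	// mock call is recorded before the real Wait can return
	groups := multi{mock, new(sync.WaitGroup)}
	for index := 0; index < count; index++ {
		groups.Add(1)

		go func() {
			defer groups.Done()
		}()
	}
	groups.Wait()

	calls := append([]string(nil), mock.calls...)
	sort.Strings(calls)

	return calls
}

func main() {
	fmt.Println(runTasks(2)) // prints [Add(1) Add(1) Done() Done() Wait()]
}
```

Placing the real wait group last matters here: each goroutine records its Done() call on the mock before releasing the real wait group, so reading the mock's calls after Wait returns does not race with the goroutines.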