syncs

package
v1.0.51 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 13, 2022 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package syncs 异步控制.

包括 RoutineGroup, LeakyBucket, Limit 频率限制, Counter 高性能计数器.

Index

Examples

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by package syncs. The comments restate each error's
// message; which functions return which error is not visible in this listing.
var (
	// ErrSizeLessZero carries the message "size less than 0".
	ErrSizeLessZero = errors.New("size less than 0")
	// ErrLimit carries the message "limit time".
	// NOTE(review): presumably returned when a rate limit rejects a call — confirm.
	ErrLimit        = errors.New("limit time")
	// ErrTimeOut carries the message "time out".
	// NOTE(review): presumably related to the LimitQueue time-out — confirm.
	ErrTimeOut      = errors.New("time out")
	// ErrFull carries the message "full".
	// NOTE(review): presumably returned when a queue has no free capacity — confirm.
	ErrFull         = errors.New("full")
)

Functions

func Consume added in v0.3.22

func Consume[E any](cacheSize, sliceMax, routine int, minTime time.Duration,
	pac ProducerAndConsumer[E],
) int64

Consume 消费. cacheSize 缓存数量. sliceMax 每次消费最大数量. routine 消费协程数量. minTime 预计的最小消费时间. pac 生产者和消费者 (Produc 生产, Consum 消费).

func ConsumeFunc added in v0.3.22

func ConsumeFunc[E any](cacheSize, sliceMax, routine int, minTime time.Duration,
	producFunc func(produceChan chan E),
	consumFunc func(num int, elems []E),
) int64

ConsumeFunc 消费. cacheSize 缓存数量. sliceMax 每次消费最大数量. routine 消费协程数量. minTime 预计的最小消费时间. producFunc 生产者函数. consumFunc 消费者函数.

Example

nolint: testableexamples

package main

import (
	"fmt"
	"time"

	"github.com/xuender/oils/syncs"
)

// main feeds the integers 0..99 to syncs.ConsumeFunc and prints every batch
// handed to the consumer callback together with the number passed alongside it.
func main() {
	// produce sends 100 consecutive integers on the channel it is given.
	produce := func(produceChan chan int) {
		for value := 0; value < 100; value++ {
			produceChan <- value
		}
	}

	// consume pauses for a microsecond and then prints the batch it received.
	consume := func(num int, elems []int) {
		time.Sleep(time.Microsecond)
		fmt.Println(num, elems)
	}

	syncs.ConsumeFunc(10, 3, 3, time.Millisecond, produce, consume)
}
Output:

func Merge added in v0.4.38

func Merge[T any](less func(T, T) bool, chans ...chan T) <-chan T

Merge 多个管道内容合并.

Types

type Counter added in v0.4.29

type Counter[T comparable] struct {
	// contains filtered or unexported fields
}

Counter 计数器.

Example
package main

import (
	"fmt"

	"github.com/xuender/oils/syncs"
)

// main walks through the basic Counter operations: Inc, Add, Dec, Get, Clean
// and Size, printing the counts so they can be compared with the Output block.
func main() {
	counter := syncs.NewCounter[string]()

	// "test1" is incremented twice and "test2" once, in the original call order.
	for _, key := range []string{"test1", "test2", "test1"} {
		counter.Inc(key)
	}

	// "test3" is raised by 5 and then decremented once.
	counter.Add("test3", 5)
	counter.Dec("test3")

	for _, key := range []string{"test1", "test2", "test3"} {
		fmt.Println(counter.Get(key))
	}

	// After Clean the expected output shows 0 for "test1" and a size of 1.
	counter.Clean()
	fmt.Println(counter.Get("test1"))
	fmt.Println(counter.Size())
}
Output:

2
1
4
0
1
Example (Concurrent)
package main

import (
	"fmt"
	"time"

	"github.com/xuender/oils/syncs"
)

func main() {
	counter := syncs.NewCounter[int]()
	work := func() {
		for i := 0; i < 1000; i++ {
			counter.Inc(i)
		}
	}

	for i := 0; i < 1000; i++ {
		go work()
	}

	time.Sleep(time.Second)

	fmt.Println(counter.Sum())

}
Output:

1000000

func NewCounter added in v0.4.29

func NewCounter[T comparable]() *Counter[T]

NewCounter 新建计数器.

func (*Counter[T]) Add added in v0.4.29

func (p *Counter[T]) Add(key T, num int64) int64

Add 增加key的值num.

func (*Counter[T]) Clean added in v0.4.29

func (p *Counter[T]) Clean()

Clean 清空.

func (*Counter[T]) Dec added in v0.4.29

func (p *Counter[T]) Dec(key T) int64

Dec 自减key.

func (*Counter[T]) Get added in v0.4.29

func (p *Counter[T]) Get(key T) int64

Get 获取key的计数.

func (*Counter[T]) Inc added in v0.4.29

func (p *Counter[T]) Inc(key T) int64

Inc 自增key.

func (*Counter[T]) Keys added in v0.5.42

func (p *Counter[T]) Keys() []T

Keys 键值.

func (*Counter[T]) Size added in v0.4.30

func (p *Counter[T]) Size() int

Size 键值数量.

func (*Counter[T]) Sum added in v0.4.30

func (p *Counter[T]) Sum() int64

Sum 计数总量.

type Limit added in v0.3.24

type Limit struct {
	// contains filtered or unexported fields
}

Limit 频率限制.

func NewLimit added in v0.3.24

func NewLimit(qps uint) *Limit

NewLimit 频率限制,qps每秒限制次数.

func (*Limit) QPS added in v0.5.41

func (p *Limit) QPS(qps uint)

QPS 设置qps.

func (*Limit) Try added in v0.3.24

func (p *Limit) Try() error

Try 尝试执行.

func (*Limit) Wait added in v0.4.25

func (p *Limit) Wait()

Wait 等待执行.

type LimitQueue added in v0.5.42

type LimitQueue[T any] struct {
	// contains filtered or unexported fields
}

LimitQueue 限频队列.

func NewLimitQueue added in v0.5.42

func NewLimitQueue[T any](qps uint, timeOut time.Duration, yield func(T)) *LimitQueue[T]

NewLimitQueue 新建限频队列.

func (*LimitQueue[T]) Add added in v0.5.42

func (p *LimitQueue[T]) Add(elem T) error

func (*LimitQueue[T]) Len added in v0.5.44

func (p *LimitQueue[T]) Len() int

func (*LimitQueue[T]) QPS added in v0.5.43

func (p *LimitQueue[T]) QPS() uint

QPS 返回当前的 QPS.

func (*LimitQueue[T]) SetQPS added in v0.5.43

func (p *LimitQueue[T]) SetQPS(qps uint64)

SetQPS 修改QPS.

func (*LimitQueue[T]) SetTimeOut added in v0.5.43

func (p *LimitQueue[T]) SetTimeOut(timeOut time.Duration)

func (*LimitQueue[T]) TimeOut added in v0.5.43

func (p *LimitQueue[T]) TimeOut() time.Duration

type Limiter added in v0.4.31

// Limiter is the interface made up of the Wait and Try methods; *Limit
// declares methods with these same signatures.
type Limiter interface {
	// Wait takes no arguments and returns nothing.
	// NOTE(review): presumably blocks until the caller may proceed — confirm against the implementation.
	Wait()
	// Try returns an error.
	// NOTE(review): presumably a non-blocking attempt that fails when the limit is exceeded — confirm.
	Try() error
}

Limiter 等待器.

type Pool added in v1.0.51

type Pool[I, O any] struct {
	// contains filtered or unexported fields
}

Pool Goroutine 池.

Example
package main

import (
	"fmt"
	"time"

	"github.com/samber/lo"
	"github.com/xuender/oils/syncs"
)

// main builds a Pool of size 10, posts the integers 0..99 to it and prints how
// many outputs came back.
func main() {
	// describe is the per-input job: it sleeps for a millisecond and formats
	// the input next to its doubled value, prefixed by the num argument.
	describe := func(value, num int) string {
		time.Sleep(time.Millisecond)

		doubled := value * 2

		return fmt.Sprintf("%d: %d*2=%d", num, value, doubled)
	}

	pool := syncs.NewPool(10, describe)
	inputs := lo.Range(100)
	outputs := pool.Post(inputs)

	fmt.Println(len(outputs))
}
Output:

100
Example (Context)
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/samber/lo"
	"github.com/xuender/oils/syncs"
)

func main() {
	pool := syncs.NewPool(10, func(input lo.Tuple2[context.Context, int], num int) int {
		time.Sleep(time.Millisecond)

		return input.B * input.B
	})

	inputs := lo.Map(lo.Range(100), func(num, _ int) lo.Tuple2[context.Context, int] {
		return lo.T2(context.Background(), num)
	})
	outputs := pool.Post(inputs)

	fmt.Println(len(outputs))

}
Output:

100
Example (Error)
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/samber/lo"
	"github.com/xuender/oils/syncs"
)

func main() {
	pool := syncs.NewPool(10, func(value, num int) lo.Tuple2[int, error] {
		time.Sleep(time.Millisecond)

		if value == 0 {
			// nolint
			return lo.T2(0, errors.New("divide by zero"))
		}

		return lo.T2[int, error](100/value, nil)
	})

	outputs := pool.Post(lo.Range(100))

	fmt.Println(len(outputs))

}
Output:

100

func NewPool added in v1.0.51

func NewPool[I, O any](size int, yield func(I, int) O) *Pool[I, O]

NewPool 新建 Goroutine 池.

func (*Pool[I, O]) Close added in v1.0.51

func (p *Pool[I, O]) Close()

func (*Pool[I, O]) Post added in v1.0.51

func (p *Pool[I, O]) Post(inputs []I) []O

type ProducerAndConsumer added in v0.3.22

// ProducerAndConsumer bundles the producer and the consumer callbacks that
// Consume accepts through its pac parameter.
type ProducerAndConsumer[E any] interface {
	// Produc is the producer side and receives the channel produceChan.
	// NOTE(review): presumably it sends the elements to be consumed on produceChan — confirm.
	Produc(produceChan chan E)
	// Consum is the consumer side and receives a number and a batch of elements.
	// NOTE(review): num looks like the index of the consuming goroutine — confirm.
	Consum(num int, elems []E)
}

type Queue added in v0.5.42

type Queue[T any] struct {
	// contains filtered or unexported fields
}

Queue 队列.

func NewQueue added in v0.5.42

func NewQueue[T any](size uint, yield func(T)) *Queue[T]

NewQueue 新建队列.

func (*Queue[T]) Add added in v0.5.42

func (p *Queue[T]) Add(elem T) error

Add 队列增加.

func (*Queue[T]) Consume added in v0.5.42

func (p *Queue[T]) Consume()

Consume 队列消费.

func (*Queue[T]) SetSize added in v0.5.42

func (p *Queue[T]) SetSize(size uint)

SetSize 修改队列尺寸.

func (*Queue[T]) Size added in v0.5.42

func (p *Queue[T]) Size() uint

Size 队列尺寸.

type RoutineGroup

type RoutineGroup struct {
	// contains filtered or unexported fields
}

RoutineGroup 协程组,是sync.WaitGroup的增强版本.

func NewRoutineGroup

func NewRoutineGroup(size uint) *RoutineGroup

NewRoutineGroup 协程组,控制协程总数量.

func (*RoutineGroup) Add

func (p *RoutineGroup) Add(delta uint)

Add 增加协程.

func (*RoutineGroup) Done

func (p *RoutineGroup) Done()

Done 协程完成.

func (*RoutineGroup) Inc added in v0.5.47

func (p *RoutineGroup) Inc()

Inc 加1.

func (*RoutineGroup) Wait

func (p *RoutineGroup) Wait()

Wait 等待全部完成.

Directories

Path Synopsis
Package maps 多协程并发安全的TreeMap.
Package maps 多协程并发安全的TreeMap.
rdb
Package rdb Redis工具.
Package rdb Redis工具.
mock
nolint
nolint

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL