Documentation ¶
Overview ¶
Package syncs 异步控制.
包括 RoutineGroup, LeakyBucket, Limit 频率限制, Counter 高性能计数器.
Index ¶
- Variables
- func Consume[E any](cacheSize, sliceMax, routine int, minTime time.Duration, ...) int64
- func ConsumeFunc[E any](cacheSize, sliceMax, routine int, minTime time.Duration, ...) int64
- func Merge[T any](less func(T, T) bool, chans ...chan T) <-chan T
- type Counter
- type Limit
- type LimitQueue
- type Limiter
- type Pool
- type ProducerAndConsumer
- type Queue
- type RoutineGroup
Examples ¶
Constants ¶
This section is empty.
Variables ¶
Functions ¶
func Consume ¶ added in v0.3.22
func Consume[E any](cacheSize, sliceMax, routine int, minTime time.Duration, pac ProducerAndConsumer[E], ) int64
Consume 消费. cacheSize 缓存数量. sliceMax 每次消费最大数量. routine 消费协程数量. minTime 预计的最小消费时间. producer 生产者. consumer 消费者.
func ConsumeFunc ¶ added in v0.3.22
func ConsumeFunc[E any](cacheSize, sliceMax, routine int, minTime time.Duration, producFunc func(produceChan chan E), consumFunc func(num int, elems []E), ) int64
ConsumeFunc 消费. cacheSize 缓存数量. sliceMax 每次消费最大数量. routine 消费协程数量. minTime 预计的最小消费时间. producer 生产者. consumer 消费者. nolint
Example ¶
nolint: testableexamples
package main import ( "fmt" "time" "github.com/xuender/oils/syncs" ) func main() { syncs.ConsumeFunc(10, 3, 3, time.Millisecond, func(produceChan chan int) { for i := 0; i < 100; i++ { produceChan <- i } }, func(num int, elems []int) { time.Sleep(time.Microsecond) fmt.Println(num, elems) }) }
Output:
Types ¶
type Counter ¶ added in v0.4.29
type Counter[T comparable] struct { // contains filtered or unexported fields }
Counter 计数器.
Example ¶
package main import ( "fmt" "github.com/xuender/oils/syncs" ) func main() { counter := syncs.NewCounter[string]() counter.Inc("test1") counter.Inc("test2") counter.Inc("test1") counter.Add("test3", 5) counter.Dec("test3") fmt.Println(counter.Get("test1")) fmt.Println(counter.Get("test2")) fmt.Println(counter.Get("test3")) counter.Clean() fmt.Println(counter.Get("test1")) fmt.Println(counter.Size()) }
Output: 2 1 4 0 1
Example (Concurrent) ¶
package main import ( "fmt" "time" "github.com/xuender/oils/syncs" ) func main() { counter := syncs.NewCounter[int]() work := func() { for i := 0; i < 1000; i++ { counter.Inc(i) } } for i := 0; i < 1000; i++ { go work() } time.Sleep(time.Second) fmt.Println(counter.Sum()) }
Output: 1000000
type Limit ¶ added in v0.3.24
type Limit struct {
// contains filtered or unexported fields
}
Limit 频率限制.
type LimitQueue ¶ added in v0.5.42
type LimitQueue[T any] struct { // contains filtered or unexported fields }
LimitQueue 限频队列.
func NewLimitQueue ¶ added in v0.5.42
func NewLimitQueue[T any](qps uint, timeOut time.Duration, yield func(T)) *LimitQueue[T]
NewLimitQueue 新建限频队列.
func (*LimitQueue[T]) Add ¶ added in v0.5.42
func (p *LimitQueue[T]) Add(elem T) error
func (*LimitQueue[T]) Len ¶ added in v0.5.44
func (p *LimitQueue[T]) Len() int
func (*LimitQueue[T]) SetQPS ¶ added in v0.5.43
func (p *LimitQueue[T]) SetQPS(qps uint64)
SetQPS 修改QPS.
func (*LimitQueue[T]) SetTimeOut ¶ added in v0.5.43
func (p *LimitQueue[T]) SetTimeOut(timeOut time.Duration)
func (*LimitQueue[T]) TimeOut ¶ added in v0.5.43
func (p *LimitQueue[T]) TimeOut() time.Duration
type Pool ¶ added in v1.0.51
type Pool[I, O any] struct { // contains filtered or unexported fields }
Pool Goroutine 池.
Example ¶
package main import ( "fmt" "time" "github.com/samber/lo" "github.com/xuender/oils/syncs" ) func main() { pool := syncs.NewPool(10, func(value, num int) string { time.Sleep(time.Millisecond) return fmt.Sprintf("%d: %d*2=%d", num, value, value*2) }) outputs := pool.Post(lo.Range(100)) fmt.Println(len(outputs)) }
Output: 100
Example (Context) ¶
package main import ( "context" "fmt" "time" "github.com/samber/lo" "github.com/xuender/oils/syncs" ) func main() { pool := syncs.NewPool(10, func(input lo.Tuple2[context.Context, int], num int) int { time.Sleep(time.Millisecond) return input.B * input.B }) inputs := lo.Map(lo.Range(100), func(num, _ int) lo.Tuple2[context.Context, int] { return lo.T2(context.Background(), num) }) outputs := pool.Post(inputs) fmt.Println(len(outputs)) }
Output: 100
Example (Error) ¶
package main import ( "errors" "fmt" "time" "github.com/samber/lo" "github.com/xuender/oils/syncs" ) func main() { pool := syncs.NewPool(10, func(value, num int) lo.Tuple2[int, error] { time.Sleep(time.Millisecond) if value == 0 { // nolint return lo.T2(0, errors.New("divide by zero")) } return lo.T2[int, error](100/value, nil) }) outputs := pool.Post(lo.Range(100)) fmt.Println(len(outputs)) }
Output: 100
type ProducerAndConsumer ¶ added in v0.3.22
type Queue ¶ added in v0.5.42
type Queue[T any] struct { // contains filtered or unexported fields }
Queue 队列.
type RoutineGroup ¶
type RoutineGroup struct {
// contains filtered or unexported fields
}
RoutineGroup 协程组,是sync.WaitGroup的增强版本.
Source Files ¶
Click to show internal directories.
Click to hide internal directories.