sched

package
v1.0.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 12, 2024 License: MIT, GPL-3.0 Imports: 2 Imported by: 1

README

并发任务调度库

见: sched

或: https://github.com/fufuok/sched

简洁, 高效, 并发限制, 复用 goroutine

package sched // import "github.com/fufuok/utils/sched"

type Option func(w *Pool)
    func Queues(limit int) Option
    func Workers(limit int) Option
type Pool struct{ ... }
    func New(opts ...Option) *Pool

使用示例

package main

import (
	"fmt"
	"time"

	"github.com/fufuok/utils/sched"
	"github.com/fufuok/utils/xsync"
)

// main walks through the sched worker pool: a batch of tasks that take
// arguments, reuse of the same pool for a further task, and a second pool
// created with explicit worker and queue limits.
func main() {
	total := xsync.NewCounter()
	pool := sched.New() // default concurrency: runtime.NumCPU()

	// Each task adds its single int argument to the shared counter.
	accumulate := func(args ...interface{}) {
		v := args[0].(int)
		total.Add(int64(v))
	}
	for i := 0; i < 30; i++ {
		pool.Add(1)
		pool.RunWithArgs(accumulate, i)
	}
	pool.Wait()
	fmt.Println("count:", total.Value()) // count: 435

	// Continue with the next batch of tasks on the same pool.
	report := func() {
		fmt.Println("is running:", pool.IsRunning(), pool.Running()) // is running: true 1
	}
	pool.Add(1)
	pool.Run(report)
	pool.Wait()
	pool.Release()

	// Specify the number of workers and the queue buffer size.
	const batch = 5
	pool = sched.New(sched.Workers(2), sched.Queues(1))
	pool.Add(batch)
	stamp := func() {
		fmt.Println(time.Now())
		time.Sleep(time.Second)
	}
	for i := 0; i < batch; i++ {
		pool.Run(stamp)
	}
	pool.WaitAndRelease()
	fmt.Println("is running:", pool.IsRunning()) // is running: false
}

ff

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Option

type Option func(w *Pool)

Option is a scheduler option.

func Queues added in v0.6.1

func Queues(limit int) Option

Queues sets the buffer capacity of the tasks channel.

func Workers

func Workers(limit int) Option

Workers sets the number of workers that can execute tasks concurrently.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a worker pool.

func New

func New(opts ...Option) *Pool

New creates a new task scheduler and returns a pool of workers.

func (*Pool) Add

func (p *Pool) Add(numTasks int) int

func (*Pool) IsRunning

func (p *Pool) IsRunning() bool

func (*Pool) Release

func (p *Pool) Release()

func (*Pool) Run

func (p *Pool) Run(f ...func())

Run runs f in the current pool.

func (*Pool) RunWithArgs

func (p *Pool) RunWithArgs(f func(args ...interface{}), args ...interface{})

func (*Pool) Running

func (p *Pool) Running() uint64

func (*Pool) Wait

func (p *Pool) Wait()

func (*Pool) WaitAndRelease

func (p *Pool) WaitAndRelease()

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL