gopool

package
v1.2.112 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 30, 2024 License: Apache-2.0 Imports: 5 Imported by: 0

README

GoPool for workers 并发请求 go routines 池

Examples

package main

import (
	"context"
	"fmt"
	gojob "github.com/gif-gif/go.io/go-job"
	golog "github.com/gif-gif/go.io/go-log"
	gopool "github.com/gif-gif/go.io/go-pool"
	"github.com/gif-gif/go.io/goio"
	"time"
)

func main() {
	goio.Init(goio.DEVELOPMENT)
	testGroupContext()
}

func testDynamicSize() {
	gp := gopool.NewDynamicSizePool(100, 10)
	defer gp.StopAndWait()

	cron, _ := gojob.New()
	defer cron.Stop()
	cron.Start()
	cron.SecondX(nil, 1, func() {
		gp.PrintPoolStats()
	})

	for i := 0; i < 1000; i++ {
		n := i
		gp.Submit(func() {
			fmt.Printf("Running task #%d\n", n)
			time.Sleep(1 * time.Second)
		})
	}

	golog.InfoF("end of Submit")
}

func testFixedSize() {
	gp := gopool.NewFixedSizePool(100, 10)
	defer gp.StopAndWait()

	cron, _ := gojob.New()
	defer cron.Stop()
	cron.Start()
	cron.SecondX(nil, 1, func() {
		gp.PrintPoolStats()
	})

	for i := 0; i < 1000; i++ {
		n := i
		gp.Submit(func() {
			fmt.Printf("Running task #%d\n", n)
			time.Sleep(1 * time.Second)
		})
	}

	golog.InfoF("end of Submit")
}

func testContext() {
	ctx, _ := context.WithTimeout(context.Background(), time.Second*10) //超时或者取消ctx 时池子会被关闭,未开始执行的任务会被取消执行

	gp := gopool.NewContextPool(10, 10, ctx)
	defer gp.StopAndWait()

	cron, _ := gojob.New()
	defer cron.Stop()
	cron.Start()
	cron.SecondX(nil, 1, func() {
		gp.PrintPoolStats()
	})

	for i := 0; i < 1000; i++ {
		n := i
		gp.Submit(func() {
			fmt.Printf("Task #%d started\n", n)
			time.Sleep(1 * time.Second)
			fmt.Printf("Task #%d finished\n", n)
		})
	}

	golog.InfoF("end of Submit")
}

func testTaskGroup() {
	gp := gopool.NewDynamicSizePool(100, 1000)
	defer gp.StopAndWait()

	cron, _ := gojob.New()
	defer cron.Stop()
	cron.Start()
	cron.SecondX(nil, 1, func() {
		gp.PrintPoolStats()
	})

	group := gp.NewTaskGroup()

	for i := 0; i < 1000; i++ {
		n := i
		group.Submit(func() {
			fmt.Printf("Task #%d started\n", n)
			time.Sleep(1 * time.Second)
			fmt.Printf("Task #%d finished\n", n)
		})
	}

	group.Wait() // wait for tasks to finish
	golog.InfoF("end of TaskGroup")
}

func testGroupContext() {
	gp := gopool.NewDynamicSizePool(10, 1000)
	defer gp.StopAndWait()

	cron, _ := gojob.New()
	defer cron.Stop()
	cron.Start()
	cron.SecondX(nil, 1, func() {
		gp.PrintPoolStats()
	})

	group, _ := gp.NewGroupContext() //可以用 ctx 和 group 组合使用

	for i := 0; i < 1000; i++ {
		n := i
		group.Submit(func() error {
			fmt.Printf("Task #%d started\n", n)
			time.Sleep(1 * time.Second)
			fmt.Printf("Task #%d finished\n", n)
			if n > 1 { //出错后其他未开始任务执行会被取消
				return fmt.Errorf("test group error")
			}
			return nil
		})
	}

	err := group.Wait() // wait for tasks to finish
	if err != nil {
		golog.InfoF("end of GroupContext", err)
	} else {
		golog.InfoF("end of GroupContext")
	}
}


Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type GoPool

type GoPool struct {
	// contains filtered or unexported fields
}

func New

func New(maxWorkers, maxCapacity int, options ...pond.Option) *GoPool

func NewContextPool

func NewContextPool(maxWorkers, maxCapacity int, ctx context.Context) *GoPool

Create a context that will be cancelled Tasks being processed will continue until they finish, but queued tasks are cancelled.

func NewDynamicSizePool

func NewDynamicSizePool(maxWorkers, maxCapacity int) *GoPool

Create a buffered (non-blocking) pool that can scale up to maxWorkers workers

and has a buffer capacity of maxCapacity tasks

创建一个缓冲(非阻塞)池,最多可扩展到maxWorkers个Worker,缓冲容量为maxCapacity个任务(大于这个会阻塞等待提交)

func NewFixedSizePool

func NewFixedSizePool(maxWorkers, minWorkers int) *GoPool

使用固定数量的Worker创建一个无缓冲(阻塞)池,提交任务等待

func (*GoPool) NewGroupContext

func (g *GoPool) NewGroupContext() (*pond.TaskGroupWithContext, context.Context)

group, ctx := pool.GroupContext(context.Background())

group.Submit(func() error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
		resp, err := http.DefaultClient.Do(req)
		if err == nil {
			resp.Body.Close()
		}
	return err
})

Wait for all fn to complete.

err := group.Wait()
if err != nil {
	fmt.Printf("Failed to Error: %v", err)
} else {
	fmt.Println("Successfully all")
}

Create a task group associated to a context

func (*GoPool) NewTaskGroup

func (g *GoPool) NewTaskGroup() *pond.TaskGroup
 Create a task group

		group := pool.Group()

		// Submit a group of related tasks
		for i := 0; i < 20; i++ {
			n := i
			group.Submit(func() {
				fmt.Printf("Running group task #%d\n", n)
			})
		}

		// Wait for all tasks in the group to complete
		group.Wait()
	}

func (*GoPool) PoolStats

func (g *GoPool) PoolStats() PoolStat

func (*GoPool) PrintPoolStats

func (g *GoPool) PrintPoolStats()

func (*GoPool) PrometheusHandler

func (g *GoPool) PrometheusHandler()

func (*GoPool) Stop added in v1.2.6

func (g *GoPool) Stop() context.Context

Stop会导致此池停止接受新任务,并向所有workers发出退出信号。 worker正在执行的任务将一直持续到完成(除非流程终止)。 队列中的任务将不会被执行。 此方法返回一个上下文对象,当池完全停止时,该对象将被取消。

func (*GoPool) StopAndWait

func (g *GoPool) StopAndWait()

func (*GoPool) StopAndWaitFor added in v1.2.6

func (g *GoPool) StopAndWaitFor(deadline time.Duration)

StopAndWaitFor停止此池并等待队列中的所有任务完成 或者达到给定的截止日期,以先到者为准。

func (*GoPool) Stopped added in v1.2.6

func (g *GoPool) Stopped() bool

如果池已停止并且不再接受任务,则Stopped返回true,否则返回false。

func (*GoPool) Submit

func (g *GoPool) Submit(fn func())

type PoolStat

type PoolStat struct {
	RunningWorkers  int
	IdleWorkers     int
	SubmittedTasks  uint64
	WaitingTasks    uint64
	SuccessfulTasks uint64
	FailedTasks     uint64
	CompletedTasks  uint64
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL