pool

package
v0.0.0-...-6e2457b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 9, 2024 License: Apache-2.0 Imports: 9 Imported by: 0

README

代码解读

buf = buf[:runtime.Stack(buf, false)]

这行代码 buf = buf[:runtime.Stack(buf, false)] 涉及到 Go 语言的内建函数 runtime.Stack 和对切片的操作。让我们一步步解析这行代码:

1. runtime.Stack

runtime.Stack 是 Go 语言 runtime 包中的一个函数,它用于获取当前 goroutine 的堆栈跟踪信息。其函数签名如下:

func Stack(buf []byte, all bool) int
  • 参数

    • buf []byte:一个 byte 切片,用来存储堆栈信息。
    • all bool:一个布尔值,如果为 true,则获取所有 goroutine 的堆栈信息;如果为 false,则只获取当前 goroutine 的堆栈信息。
  • 返回值

    • 返回填充到 buf 中的字节数。
2. 切片的切片操作 buf[:runtime.Stack(buf, false)]

切片操作 buf[:runtime.Stack(buf, false)] 是对 buf 切片进行重新切片,以调整其长度。

  • buf:原始的 byte 切片。
  • runtime.Stack(buf, false):调用 runtime.Stack 函数,填充 buf 并返回填充的字节数。
  • buf[:runtime.Stack(buf, false)]:将 buf 切片截取到 runtime.Stack 返回的长度。这意味着只保留 buf 中已经填充的部分(即堆栈信息),去掉之后未被使用的部分。
为什么这样用?

这种用法有几个好处:

  1. 重用缓冲区:可以重用已有的缓冲区 buf,减少内存分配的次数。
  2. 获取准确的切片长度:通过 runtime.Stack 返回的长度来调整切片,确保切片长度正好等于堆栈信息的长度,避免切片中包含无效或未初始化的数据。
示例

假设 buf 最初被分配了 2048 字节的空间,但实际的堆栈信息只占用了 512 字节。使用 buf[:runtime.Stack(buf, false)] 后,buf 的长度将调整为 512,只包含有效的堆栈信息。

这种做法在处理错误日志、调试信息时非常有用,因为它可以有效地减少内存使用并提供清晰的堆栈跟踪信息。

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	NewErrTaskPoolIsNotRunning = errors.New("taskpool is not running")
	NewErrTaskPoolIsClosing    = errors.New("taskpool is closing")
	NewErrTaskPoolIsStopped    = errors.New("taskpool is stopped")
	NewErrTaskPoolIsStarted    = errors.New("taskpool is started")
	NewErrTaskRunningPanic     = errors.New("task running panic")
	NewErrInvalidTask          = errors.New("invalid task")

	NewErrInvalidArgument          = errors.New("invalid argument")
	NewErrInitGoInvalid            = fmt.Errorf("%w: initGo should be greater than 0", NewErrInvalidArgument)
	NewErrTaskQueueSizeInvalid     = fmt.Errorf("%w: taskQueueSize should be greater than or equal to 0", NewErrInvalidArgument)
	NewErrQueueBacklogRateInvalid  = fmt.Errorf("%w: queueBacklogRate should be within the range [0.0, 1.0]", NewErrInvalidArgument)
	NewErrGoroutineConditionNotMet = fmt.Errorf("%w: initGo, coreGo, and maxGo must satisfy the condition: initGo <= coreGo <= maxGo", NewErrInvalidArgument)
)

错误定义

Functions

func WithCoreGo

func WithCoreGo(coreGo int) option.Option[OnDemandBlockTaskPool]

WithCoreGo 设置任务池的核心协程数选项。

func WithMaxGo

func WithMaxGo(maxGo int) option.Option[OnDemandBlockTaskPool]

WithMaxGo 设置任务池的最大协程数选项。

func WithMaxIdleTime

func WithMaxIdleTime(maxIdleTime time.Duration) option.Option[OnDemandBlockTaskPool]

WithMaxIdleTime 设置任务池的最大空闲时间选项。

func WithQueueBacklogRate

func WithQueueBacklogRate(queueBacklogRate float64) option.Option[OnDemandBlockTaskPool]

WithQueueBacklogRate 设置任务池的队列积压率选项。

Types

type OnDemandBlockTaskPool

type OnDemandBlockTaskPool struct {
	// contains filtered or unexported fields
}

OnDemandBlockTaskPool 按需创建 Goroutine 的并发阻塞的任务池

func NewOnDemandBlockTaskPool

func NewOnDemandBlockTaskPool(initGo int, taskQueueSize int, opts ...option.Option[OnDemandBlockTaskPool]) (*OnDemandBlockTaskPool, error)

NewOnDemandBlockTaskPool 创建一个新的 OnDemandBlockTaskPool initGo 是初始协程数 taskQueueSize 是队列大小,即最多有多少个任务在等待调度 使用相应的Option选项可以动态扩展协程数

func (*OnDemandBlockTaskPool) Shutdown

func (Self *OnDemandBlockTaskPool) Shutdown() (<-chan struct{}, error)

Shutdown 用于关闭任务池,拒绝新的任务提交,但会完成所有已提交的任务。 当所有任务完成后,会往返回的 channel 发送信号(一个空结构体),并负责关闭该 channel。 注意,此方法不会中断正在执行的任务。

func (*OnDemandBlockTaskPool) ShutdownNow

func (Self *OnDemandBlockTaskPool) ShutdownNow() ([]Task, error)

ShutdownNow 立即关闭任务池,并且返回所有剩余未执行的任务(不包含正在执行的任务)。

func (*OnDemandBlockTaskPool) Start

func (Self *OnDemandBlockTaskPool) Start() error

Start 开始调度任务执行。 此方法将任务池状态设置为运行,并启动初始化数量的 goroutine 来处理任务。

func (*OnDemandBlockTaskPool) States

func (Self *OnDemandBlockTaskPool) States(ctx context.Context, interval time.Duration) (<-chan State, error)

States 返回一个通道,该通道定期发送任务池的状态。 interval 参数指定发送状态信息的时间间隔。 如果上下文 ctx 被取消或任务池的上下文 interruptCtx 被取消,通道将关闭并返回。

func (*OnDemandBlockTaskPool) Submit

func (Self *OnDemandBlockTaskPool) Submit(ctx context.Context, task Task) error

Submit 提交一个任务 如果此时队列已满,那么将会阻塞调用者。 如果因为 ctx 的原因返回,那么将会返回 ctx.Err() 在调用 Start 前后都可以调用 Submit

type State

type State struct {
	PoolState      int32 // 任务池的状态码
	CurrentGo      int32 // 当前任务池中 Goroutine 的数量
	QueueSize      int32 // 任务队列的大小
	WaitingTaskCnt int32 // 等待执行的任务数量
	RunningTaskCnt int32 // 当前正在执行的任务数量
	Timestamp      int64 // 状态记录的时间戳
}

State 定义了任务池的运行状态结构体

type Task

type Task interface {
	// Run 执行任务的方法。
	// ctx 提供了任务执行的上下文,可以实现超时控制。
	Run(ctx context.Context) error
}

Task 定义了一个任务的接口,包含 Run 方法用于执行任务。

type TaskPool

type TaskPool interface {
	// Submit 提交一个任务到任务池中执行。
	// ctx 提供了一个可以控制任务提交超时的上下文。
	// task 是要提交的任务。
	// 如果在 ctx 过期前任务没有被成功提交,或者任务池已经关闭,将返回错误。
	Submit(ctx context.Context, task Task) error

	// Start 启动任务池,开始调度任务执行。
	// 在调用此方法之前,任务池不会执行任何任务。
	// 某些实现可能允许在调用 Start 之后继续提交任务。
	Start() error

	// Shutdown 安全关闭任务池,停止接手新任务,等待已提交的任务执行完成。
	// 如果任务池尚未启动,则立即返回。
	// 返回一个 chan struct{},当所有任务执行完毕时,会向该通道发送一个信号(空结构体)作为通知。
	Shutdown() (<-chan struct{}, error)

	// ShutdownNow 立即关闭任务池,尝试中断正在执行的任务,并返回所有未执行的任务列表。
	// 能否中断任务取决于 TaskPool 和 Task 的具体实现。
	ShutdownNow() ([]Task, error)

	// States 提供一个通道,定期发送任务池的运行状态。
	// ctx 用于控制何时停止发送状态信息。
	// interval 指定发送状态信息的时间间隔。
	States(ctx context.Context, interval time.Duration) (<-chan State, error)
}

TaskPool 定义了一个任务池的接口,它能够提交任务、启动任务调度、关闭任务池,并提供任务池状态信息。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL