Documentation
¶
Index ¶
- Variables
- func WithCoreGo(coreGo int) option.Option[OnDemandBlockTaskPool]
- func WithMaxGo(maxGo int) option.Option[OnDemandBlockTaskPool]
- func WithMaxIdleTime(maxIdleTime time.Duration) option.Option[OnDemandBlockTaskPool]
- func WithQueueBacklogRate(queueBacklogRate float64) option.Option[OnDemandBlockTaskPool]
- type OnDemandBlockTaskPool
- func (Self *OnDemandBlockTaskPool) Shutdown() (<-chan struct{}, error)
- func (Self *OnDemandBlockTaskPool) ShutdownNow() ([]Task, error)
- func (Self *OnDemandBlockTaskPool) Start() error
- func (Self *OnDemandBlockTaskPool) States(ctx context.Context, interval time.Duration) (<-chan State, error)
- func (Self *OnDemandBlockTaskPool) Submit(ctx context.Context, task Task) error
- type State
- type Task
- type TaskPool
Constants ¶
This section is empty.
Variables ¶
View Source
var ( NewErrTaskPoolIsNotRunning = errors.New("taskpool is not running") NewErrTaskPoolIsClosing = errors.New("taskpool is closing") NewErrTaskPoolIsStopped = errors.New("taskpool is stopped") NewErrTaskPoolIsStarted = errors.New("taskpool is started") NewErrTaskRunningPanic = errors.New("task running panic") NewErrInvalidTask = errors.New("invalid task") NewErrInvalidArgument = errors.New("invalid argument") NewErrInitGoInvalid = fmt.Errorf("%w: initGo should be greater than 0", NewErrInvalidArgument) NewErrTaskQueueSizeInvalid = fmt.Errorf("%w: taskQueueSize should be greater than or equal to 0", NewErrInvalidArgument) NewErrQueueBacklogRateInvalid = fmt.Errorf("%w: queueBacklogRate should be within the range [0.0, 1.0]", NewErrInvalidArgument) NewErrGoroutineConditionNotMet = fmt.Errorf("%w: initGo, coreGo, and maxGo must satisfy the condition: initGo <= coreGo <= maxGo", NewErrInvalidArgument) )
错误定义
Functions ¶
func WithCoreGo ¶
func WithCoreGo(coreGo int) option.Option[OnDemandBlockTaskPool]
WithCoreGo 设置任务池的核心协程数选项。
func WithMaxGo ¶
func WithMaxGo(maxGo int) option.Option[OnDemandBlockTaskPool]
WithMaxGo 设置任务池的最大协程数选项。
func WithMaxIdleTime ¶
func WithMaxIdleTime(maxIdleTime time.Duration) option.Option[OnDemandBlockTaskPool]
WithMaxIdleTime 设置任务池的最大空闲时间选项。
func WithQueueBacklogRate ¶
func WithQueueBacklogRate(queueBacklogRate float64) option.Option[OnDemandBlockTaskPool]
WithQueueBacklogRate 设置任务池的队列积压率选项。
Types ¶
type OnDemandBlockTaskPool ¶
type OnDemandBlockTaskPool struct {
// contains filtered or unexported fields
}
OnDemandBlockTaskPool 按需创建 Goroutine 的并发阻塞的任务池
func NewOnDemandBlockTaskPool ¶
func NewOnDemandBlockTaskPool(initGo int, taskQueueSize int, opts ...option.Option[OnDemandBlockTaskPool]) (*OnDemandBlockTaskPool, error)
NewOnDemandBlockTaskPool 创建一个新的 OnDemandBlockTaskPool initGo 是初始协程数 taskQueueSize 是队列大小,即最多有多少个任务在等待调度 使用相应的Option选项可以动态扩展协程数
func (*OnDemandBlockTaskPool) Shutdown ¶
func (Self *OnDemandBlockTaskPool) Shutdown() (<-chan struct{}, error)
Shutdown 用于关闭任务池,拒绝新的任务提交,但会完成所有已提交的任务。 当所有任务完成后,会往返回的 channel 发送信号(一个空结构体),并负责关闭该 channel。 注意,此方法不会中断正在执行的任务。
func (*OnDemandBlockTaskPool) ShutdownNow ¶
func (Self *OnDemandBlockTaskPool) ShutdownNow() ([]Task, error)
ShutdownNow 立即关闭任务池,并且返回所有剩余未执行的任务(不包含正在执行的任务)。
func (*OnDemandBlockTaskPool) Start ¶
func (Self *OnDemandBlockTaskPool) Start() error
Start 开始调度任务执行。 此方法将任务池状态设置为运行,并启动初始化数量的 goroutine 来处理任务。
type State ¶
type State struct { PoolState int32 // 任务池的状态码 CurrentGo int32 // 当前任务池中 Goroutine 的数量 QueueSize int32 // 任务队列的大小 WaitingTaskCnt int32 // 等待执行的任务数量 RunningTaskCnt int32 // 当前正在执行的任务数量 Timestamp int64 // 状态记录的时间戳 }
State 定义了任务池的运行状态结构体
type TaskPool ¶
type TaskPool interface { // Submit 提交一个任务到任务池中执行。 // ctx 提供了一个可以控制任务提交超时的上下文。 // task 是要提交的任务。 // 如果在 ctx 过期前任务没有被成功提交,或者任务池已经关闭,将返回错误。 Submit(ctx context.Context, task Task) error // Start 启动任务池,开始调度任务执行。 // 在调用此方法之前,任务池不会执行任何任务。 // 某些实现可能允许在调用 Start 之后继续提交任务。 Start() error // Shutdown 安全关闭任务池,停止接手新任务,等待已提交的任务执行完成。 // 如果任务池尚未启动,则立即返回。 // 返回一个 chan struct{},当所有任务执行完毕时,会向该通道发送一个信号(空结构体)作为通知。 Shutdown() (<-chan struct{}, error) // ShutdownNow 立即关闭任务池,尝试中断正在执行的任务,并返回所有未执行的任务列表。 // 能否中断任务取决于 TaskPool 和 Task 的具体实现。 ShutdownNow() ([]Task, error) // States 提供一个通道,定期发送任务池的运行状态。 // ctx 用于控制何时停止发送状态信息。 // interval 指定发送状态信息的时间间隔。 States(ctx context.Context, interval time.Duration) (<-chan State, error) }
TaskPool 定义了一个任务池的接口,它能够提交任务、启动任务调度、关闭任务池,并提供任务池状态信息。
Click to show internal directories.
Click to hide internal directories.