executors

package
v0.0.0-...-115e584 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 6, 2021 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

处理大块任务的执行

将任务切割为多块来执行

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BulkExecutor

type BulkExecutor struct {
	// contains filtered or unexported fields
}

BulkExecutor 在满足以下任一要求时触发执行: 1. 达到给定的任务规模时 2. flush间隔时间已过

func NewBulkExecutor

func NewBulkExecutor(execute Execute, opts ...BulkOption) *BulkExecutor

NewBulkExecutor 返回一个BulkExecutor实例

func (*BulkExecutor) Add

func (be *BulkExecutor) Add(task interface{}) error

Add 添加task

func (*BulkExecutor) Flush

func (be *BulkExecutor) Flush()

Flush 强制刷新并执行任务

func (*BulkExecutor) Wait

func (be *BulkExecutor) Wait()

Wait 等待task执行完成

type BulkOption

type BulkOption func(options *bulkOptions)

func WithBulkInterval

func WithBulkInterval(duration time.Duration) BulkOption

WithBulkInterval 自定义BulkExecutor的flushInterval

func WithBulkTasks

func WithBulkTasks(tasks int) BulkOption

WithBulkTasks 自定义BulkExecutor的cachedTasks

type ChunkExecutor

type ChunkExecutor struct {
	// contains filtered or unexported fields
}

ChunkExecutor 在满足以下任一要求时触发执行: 1. 达到给定的任务规模时 2. flush间隔时间已过

func NewChunkExecutor

func NewChunkExecutor(execute Execute, opts ...ChunkOption) *ChunkExecutor

NewChunkExecutor 返回一个ChunkExecutor实例

func (*ChunkExecutor) Add

func (ce *ChunkExecutor) Add(task interface{}, size int) error

Add 添加使用给定task和给定size的thunk到ce

func (*ChunkExecutor) Flush

func (ce *ChunkExecutor) Flush()

Flush 强制flush并执行tasks

func (*ChunkExecutor) Wait

func (ce *ChunkExecutor) Wait()

Wait 等待执行的完成

type ChunkOption

type ChunkOption func(options *chunkOptions)

func WithChunkBytes

func WithChunkBytes(size int) ChunkOption

WithChunkBytes 自定义ChunkExecutor的chunk大小

func WithFlushInterval

func WithFlushInterval(duration time.Duration) ChunkOption

WithFlushInterval 自定义ChunkExecutor的flush时间间隔

type DelayExecutor

type DelayExecutor struct {
	// contains filtered or unexported fields
}

DelayExecutor 使用给定的延迟时间延迟一个任务

func NewDelayExecutor

func NewDelayExecutor(fn func(), delay time.Duration) *DelayExecutor

NewDelayExecutor 返回一个使用给定fn和delay的DelayExecutor实例

func (*DelayExecutor) Trigger

func (de *DelayExecutor) Trigger()

Trigger 在给定的延迟后触发任务的执行,多次触发是安全的

type Execute

type Execute func(tasks []interface{})

Execute 定义如何执行tasks的方法

type LessExecutor

type LessExecutor struct {
	// contains filtered or unexported fields
}

A LessExecutor 在给定的时间间隔内限制执行一次

func NewLessExecutor

func NewLessExecutor(threshold time.Duration) *LessExecutor

NewLessExecutor 返回一个以threshold为时间间隔的LessExecutor对象

func (*LessExecutor) DoOrDiscard

func (le *LessExecutor) DoOrDiscard(execute func()) bool

DoOrDiscard 执行或丢弃任务取决于是否在时间间隔内执行了另一个任务

type PeriodicalExecutor

type PeriodicalExecutor struct {
	// contains filtered or unexported fields
}

PeriodicalExecutor 是一个定期执行task的executor

func NewPeriodicalExecutor

func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *PeriodicalExecutor

NewPeriodicalExecutor 返回一个使用给定internal和container的PeriodicalExecutor实例

func (*PeriodicalExecutor) Add

func (pe *PeriodicalExecutor) Add(task interface{})

Add 添加一个task到PeriodicalExecutor中

func (*PeriodicalExecutor) Flush

func (pe *PeriodicalExecutor) Flush() bool

Flush 强制PeriodicalExecutor执行task

func (*PeriodicalExecutor) Sync

func (pe *PeriodicalExecutor) Sync(fn func())

Sync 让调用者使用pe线程安全地运行fn

func (*PeriodicalExecutor) Wait

func (pe *PeriodicalExecutor) Wait()

Wait 等待execution的完成

type TaskContainer

type TaskContainer interface {
	// AddTask 添加task到容器中,如果添加后容器需要刷缓存,就返回true
	AddTask(task interface{}) bool
	// Execute 在刷缓存时执行容器中收集的task
	Execute(tasks interface{})
	// RemoveAll 移除所有的task,并返回它们
	RemoveAll() interface{}
}

TaskContainer 该接口作为定期执行的底层容器类型 由具体的NewPeriodicalExecutor调用者实现

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL