workerpool

package
v0.1.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 3, 2023 | License: Apache-2.0, BSD-2-Clause | Imports: 17 | Imported by: 4

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BlockingQueuedWorkerPool

type BlockingQueuedWorkerPool struct {
	// contains filtered or unexported fields
}

BlockingQueuedWorkerPool represents a set of workers with a blocking queue of pending tasks.

func NewBlockingQueuedWorkerPool

func NewBlockingQueuedWorkerPool(optionalOptions ...Option) (result *BlockingQueuedWorkerPool)

NewBlockingQueuedWorkerPool returns a new BlockingQueuedWorkerPool in the stopped state.

func (*BlockingQueuedWorkerPool) CreateTask

func (b *BlockingQueuedWorkerPool) CreateTask(f func(), optionalStackTrace ...string) *WorkerPoolTask

CreateTask creates a new WorkerPoolTask with the given handler and an optional stack trace of the closure.

func (*BlockingQueuedWorkerPool) DecreasePendingTasksCounter

func (b *BlockingQueuedWorkerPool) DecreasePendingTasksCounter()

DecreasePendingTasksCounter decreases the pending task counter.

func (*BlockingQueuedWorkerPool) GetPendingQueueSize

func (b *BlockingQueuedWorkerPool) GetPendingQueueSize() int

GetPendingQueueSize returns the number of tasks pending to be processed.

func (*BlockingQueuedWorkerPool) GetWorkerCount

func (b *BlockingQueuedWorkerPool) GetWorkerCount() int

GetWorkerCount returns the worker count for the WorkerPool.

func (*BlockingQueuedWorkerPool) IncreasePendingTasksCounter

func (b *BlockingQueuedWorkerPool) IncreasePendingTasksCounter()

IncreasePendingTasksCounter increases the pending task counter.

func (*BlockingQueuedWorkerPool) IsRunning

func (b *BlockingQueuedWorkerPool) IsRunning() (isRunning bool)

IsRunning returns true if the WorkerPool is running.

func (*BlockingQueuedWorkerPool) Run

func (b *BlockingQueuedWorkerPool) Run()

Run starts the WorkerPool and waits for its shutdown.

func (*BlockingQueuedWorkerPool) Start

func (b *BlockingQueuedWorkerPool) Start()

Start starts the WorkerPool (non-blocking).

func (*BlockingQueuedWorkerPool) Stop

func (b *BlockingQueuedWorkerPool) Stop()

Stop stops the WorkerPool.

func (*BlockingQueuedWorkerPool) StopAndWait

func (b *BlockingQueuedWorkerPool) StopAndWait()

StopAndWait stops the WorkerPool and waits for its shutdown.

func (*BlockingQueuedWorkerPool) Submit

func (b *BlockingQueuedWorkerPool) Submit(handler func())

Submit submits a handler function to the queue and blocks if the queue is full.

func (*BlockingQueuedWorkerPool) SubmitTask

func (b *BlockingQueuedWorkerPool) SubmitTask(task *WorkerPoolTask)

SubmitTask submits a task to the queue and blocks if the queue is full (it should only be used instead of Submit if manually handling the task is necessary to create better debug outputs).

func (*BlockingQueuedWorkerPool) TrySubmit

func (b *BlockingQueuedWorkerPool) TrySubmit(f func()) (added bool)

TrySubmit tries to queue the execution of the handler function and ignores the handler if there is no capacity for it to be added.

func (*BlockingQueuedWorkerPool) TrySubmitTask

func (b *BlockingQueuedWorkerPool) TrySubmitTask(task *WorkerPoolTask) (added bool)

TrySubmitTask tries to queue the execution of the task and ignores the task if there is no capacity for it to be added (it should only be used instead of TrySubmit if manually handling the task is necessary to create better debug outputs).

func (*BlockingQueuedWorkerPool) WaitUntilAllTasksProcessed

func (b *BlockingQueuedWorkerPool) WaitUntilAllTasksProcessed()

WaitUntilAllTasksProcessed waits until all tasks are processed.

type Group

type Group struct {
	PendingChildrenCounter *syncutils.Counter
	// contains filtered or unexported fields
}

func NewGroup

func NewGroup(name string) (group *Group)

func (*Group) CreateGroup

func (g *Group) CreateGroup(name string) (group *Group)

func (*Group) CreatePool

func (g *Group) CreatePool(name string, optsWorkerCount ...int) (pool *UnboundedWorkerPool)

func (*Group) Group

func (g *Group) Group(name string) (pool *Group, exists bool)

func (*Group) Name

func (g *Group) Name() (name string)

func (*Group) Pool

func (g *Group) Pool(name string) (pool *UnboundedWorkerPool, exists bool)

func (*Group) Pools

func (g *Group) Pools() (pools map[string]*UnboundedWorkerPool)

func (*Group) Root

func (g *Group) Root() *Group

func (*Group) Shutdown

func (g *Group) Shutdown()

func (*Group) String

func (g *Group) String() (humanReadable string)

func (*Group) Wait

func (g *Group) Wait()

func (*Group) WaitAll

func (g *Group) WaitAll()

type NonBlockingQueuedWorkerPool

type NonBlockingQueuedWorkerPool struct {
	// contains filtered or unexported fields
}

NonBlockingQueuedWorkerPool implements a non-blocking goroutine pool backed by a queue.

func NewNonBlockingQueuedWorkerPool

func NewNonBlockingQueuedWorkerPool(workerFunc func(Task), optionalOptions ...Option) (result *NonBlockingQueuedWorkerPool)

NewNonBlockingQueuedWorkerPool creates and starts a new worker pool for the supplied function, with the supplied options.

func (*NonBlockingQueuedWorkerPool) GetPendingQueueSize

func (wp *NonBlockingQueuedWorkerPool) GetPendingQueueSize() int

GetPendingQueueSize gets the current number of pending tasks in the queue.

func (*NonBlockingQueuedWorkerPool) GetWorkerCount

func (wp *NonBlockingQueuedWorkerPool) GetWorkerCount() int

GetWorkerCount gets the configured worker count.

func (*NonBlockingQueuedWorkerPool) Stop

func (wp *NonBlockingQueuedWorkerPool) Stop()

Stop closes this pool. If FlushTasksAtShutdown was set, it allows currently running and pending tasks to complete.

func (*NonBlockingQueuedWorkerPool) StopAndWait

func (wp *NonBlockingQueuedWorkerPool) StopAndWait()

StopAndWait closes the pool and waits for tasks to complete.

func (*NonBlockingQueuedWorkerPool) Submit

func (wp *NonBlockingQueuedWorkerPool) Submit(params ...interface{}) (chan interface{}, bool)

Submit is an alias for TrySubmit.

func (*NonBlockingQueuedWorkerPool) TrySubmit

func (wp *NonBlockingQueuedWorkerPool) TrySubmit(params ...interface{}) (result chan interface{}, added bool)

TrySubmit submits a task to this pool (it drops the task if not enough workers are available and the queue is full). It returns a channel to obtain the task result, and a boolean indicating whether the task was successfully submitted to the queue.

type Option

type Option func(*Options)

func FlushTasksAtShutdown

func FlushTasksAtShutdown(flush bool) Option

func QueueSize

func QueueSize(queueSize int) Option

func WithAlias

func WithAlias(alias string) Option

func WorkerCount

func WorkerCount(workerCount int) Option

type Options

type Options struct {
	Alias                string
	WorkerCount          int
	QueueSize            int
	FlushTasksAtShutdown bool
}

func (Options) Override

func (options Options) Override(optionalOptions ...Option) *Options

type Task

type Task struct {
	// contains filtered or unexported fields
}

func (*Task) Param

func (task *Task) Param(index int) interface{}

func (*Task) Return

func (task *Task) Return(result interface{})

type UnboundedWorkerPool

type UnboundedWorkerPool struct {
	Name                string
	PendingTasksCounter *syncutils.Counter
	Queue               *syncutils.Stack[*WorkerPoolTask]
	ShutdownComplete    sync.WaitGroup
	// contains filtered or unexported fields
}

func NewUnboundedWorkerPool

func NewUnboundedWorkerPool(name string, optsWorkerCount ...int) (newUnboundedWorkerPool *UnboundedWorkerPool)

func (*UnboundedWorkerPool) IsRunning

func (u *UnboundedWorkerPool) IsRunning() (isRunning bool)

func (*UnboundedWorkerPool) Shutdown

func (u *UnboundedWorkerPool) Shutdown(cancelPendingTasks ...bool) (self *UnboundedWorkerPool)

func (*UnboundedWorkerPool) Start

func (u *UnboundedWorkerPool) Start() (self *UnboundedWorkerPool)

func (*UnboundedWorkerPool) Submit

func (u *UnboundedWorkerPool) Submit(task func(), optStackTrace ...string)

func (*UnboundedWorkerPool) WorkerCount

func (u *UnboundedWorkerPool) WorkerCount() (workerCount int)

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

func New

func New(workerFnc func(Task), optionalOptions ...Option) (result *WorkerPool)

func (*WorkerPool) GetPendingQueueSize

func (wp *WorkerPool) GetPendingQueueSize() int

func (*WorkerPool) GetWorkerCount

func (wp *WorkerPool) GetWorkerCount() int

func (*WorkerPool) Run

func (wp *WorkerPool) Run()

func (*WorkerPool) Start

func (wp *WorkerPool) Start()

func (*WorkerPool) Stop

func (wp *WorkerPool) Stop()

func (*WorkerPool) StopAndWait

func (wp *WorkerPool) StopAndWait()

func (*WorkerPool) Submit

func (wp *WorkerPool) Submit(params ...interface{}) (result chan interface{}, added bool)

func (*WorkerPool) TrySubmit

func (wp *WorkerPool) TrySubmit(params ...interface{}) (result chan interface{}, added bool)

type WorkerPoolTask

type WorkerPoolTask struct {
	// contains filtered or unexported fields
}

WorkerPoolTask is a task that is executed by a BlockingQueuedWorkerPool.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL