workerpool

package
v1.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 10, 2021 License: Apache-2.0 Imports: 5 Imported by: 5

README

worker-pool

High performance, thread-safe, extendable worker pool.

Usage

package main

import (
   "log"
   "runtime"
   
   workerpool "github.com/linxGnu/gumble/worker-pool"
)

// task closure
func moduloTask(ctx context.Context, a, b, N uint) *workerpool.Task {
	return workerpool.NewTask(ctx, func(ctx context.Context) (interface{}, error) {
		return modulo(a, b, N), nil
	})
}

// calculate a^b MODULO N
func modulo(a, b uint, N uint) uint {
	switch b {
	case 0:
		return 1 % N
	case 1:
		return a % N
	default:
		if b&1 == 0 {
			t := modulo(a, b>>1, N)
			return uint(uint64(t) * uint64(t) % uint64(N))
		} else {
			t := modulo(a, b>>1, N)
			t = uint(uint64(t) * uint64(t) % uint64(N))
			return uint(uint64(a) * uint64(t) % uint64(N))
		}
	}
}

func main() {
    pool := workerpool.NewPool(nil, workerpool.Option{NumberWorker: runtime.NumCPU()})
	pool.Start()

	// Calculate (1^1 + 2^2 + 3^3 + ... + 1000000^1000000) modulo 1234567
	tasks := make([]*workerpool.Task, 0, 1000000)
	for i := 1; i <= 1000000; i++ {
		task := moduloTask(context.Background(), uint(i), uint(i), 1234567)
		pool.Do(task)
		tasks = append(tasks, task)
	}

	// collect task results
	var s1, s2 uint
	for i := range tasks {
		if result := <-tasks[i].Result(); result.Err != nil {
			log.Fatal(result.Err)
		} else {
			s1 = uint((uint64(s1) + uint64(result.Result.(uint))) % 1234567)
		}
	}

	// sequential computation
	for i := 1; i <= 1000000; i++ {
		s2 = uint((uint64(s2) + uint64(modulo(uint(i), uint(i), 1234567))) % 1234567)
	}
	if s1 != s2 {
		log.Fatal(s1, s2)
    }
    
	pool.Stop()    
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Option

type Option struct {
	// NumberWorker number of workers.
	// Default: runtime.NumCPU()
	NumberWorker int `yaml:"number_worker" json:"number_worker"`
	// ExpandableLimit limits number of workers to be expanded on demand.
	// Default: 0 (no expandable)
	ExpandableLimit int32 `yaml:"expandable_limit" json:"expandable_limit"`
	// ExpandedLifetime represents lifetime of expanded worker (in nanoseconds)/
	// Default: 1 minute.
	ExpandedLifetime time.Duration `yaml:"expanded_lifetime" json:"expanded_lifetime"`
}

Option represents pool option.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a lightweight worker pool with capable of auto-expand on demand.

func NewPool

func NewPool(ctx context.Context, opt Option) (p *Pool)

NewPool create new worker pool

func (*Pool) Do

func (p *Pool) Do(t *Task)

Do a task.

func (*Pool) Execute

func (p *Pool) Execute(exec func(context.Context) (interface{}, error)) (t *Task)

Execute a task.

func (*Pool) ExecuteWithCtx

func (p *Pool) ExecuteWithCtx(ctx context.Context, exec func(context.Context) (interface{}, error)) (t *Task)

ExecuteWithCtx a task with custom context.

func (*Pool) Start

func (p *Pool) Start()

Start workers.

func (*Pool) Stop

func (p *Pool) Stop()

Stop worker. Wait all task done.

func (*Pool) TryDo

func (p *Pool) TryDo(t *Task) (addedToQueue bool)

TryDo try to execute a task. If task queue is full, returns immediately and addedToQueue is false.

func (*Pool) TryExecute

func (p *Pool) TryExecute(exec func(context.Context) (interface{}, error)) (t *Task, addedToQueue bool)

TryExecute try to execute a task. If task queue is full, returns immediately and addedToQueue is false.

func (*Pool) TryExecuteWithCtx

func (p *Pool) TryExecuteWithCtx(ctx context.Context, exec func(context.Context) (interface{}, error)) (t *Task, addedToQueue bool)

TryExecuteWithCtx try to execute a task with custom context. If task queue is full, returns immediately and addedToQueue is false.

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task represent a task.

func NewTask

func NewTask(ctx context.Context, executor func(context.Context) (interface{}, error)) *Task

NewTask create new task.

func (*Task) Execute

func (t *Task) Execute()

Execute task.

func (*Task) Result

func (t *Task) Result() <-chan *TaskResult

Result pushed via channel

type TaskResult

type TaskResult struct {
	Result interface{}
	Err    error
}

TaskResult represent result of task.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL