pool

package module
v0.0.0-...-47aab7a Latest
Warning

This package is not in the latest version of its module.

Published: Dec 9, 2022 License: Apache-2.0 Imports: 3 Imported by: 3

Documentation

Overview

Package pool implements a pool of workers for simultaneous execution of different tasks.


Constants

This section is empty.

Variables

var (
	ErrResponseClosed = fmt.Errorf("response channel closed")
)

Functions

This section is empty.

Types

type JobRequest

type JobRequest[T any] struct {
	Request  chan NonBlockingRunner[T]
	Response chan JobResponse[T]
	// contains filtered or unexported fields
}

JobRequest holds the channels needed to request the execution of a task in a worker. Use it to send a task to the worker and to receive the response. Call Close on the JobRequest[T] after the task has executed (or if you decide to skip the task) to ensure that the worker becomes free again.

func NewJobRequest

func NewJobRequest[T any]() *JobRequest[T]

NewJobRequest creates a new JobRequest[T] struct which is used to interact with a worker.

func (*JobRequest[T]) Close

func (j *JobRequest[T]) Close()

Close closes the request and response channels of the JobRequest[T] struct. Use it when you want to finish an interaction with the worker and make the worker available for another task.

func (*JobRequest[T]) SendResponse

func (j *JobRequest[T]) SendResponse(resp JobResponse[T]) error

SendResponse sends a response to the JobRequest[T] consumer.

type JobResponse

type JobResponse[T any] struct {
	Value T
	Err   error
}

JobResponse keeps a response from a task sent to a worker.

type NonBlocking

type NonBlocking[T any] struct {
	// contains filtered or unexported fields
}

NonBlocking carries a worker tasks channel, a wait group, and other values.

func NewNonBlocking

func NewNonBlocking[T any](workersCnt int) *NonBlocking[T]

NewNonBlocking creates a new worker pool.

func (*NonBlocking[T]) RequestChan

func (p *NonBlocking[T]) RequestChan() chan *JobRequest[T]

RequestChan returns a request channel for executing a task in a worker. You will need to retrieve a JobRequest[T] struct from the channel for requesting the execution of a task in a worker.

Usage example:

	// Gets a channel to get a free worker.
	requests := workers.RequestChan()
	// Retrieves the first free worker for a task execution.
	req := <-requests
	// Sends a task to the worker; the worker then starts the task execution.
	req.Request <- task{param1, param2}
	// Waits until the task execution is finished and retrieves a response struct.
	resp := <-req.Response

After the task is finished, the worker is free and returns to the worker pool, waiting for another task.

func (*NonBlocking[T]) Run

func (p *NonBlocking[T]) Run(ctx context.Context)

Run starts workers in the pool.

func (*NonBlocking[T]) Stop

func (p *NonBlocking[T]) Stop()

Stop stops workers in the pool.

type NonBlockingRunner

type NonBlockingRunner[T any] interface {
	Job(ctx context.Context) JobResponse[T]
}

NonBlockingRunner is an interface for a task that can be executed in a non-blocking worker pool.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool carries a worker tasks channel, a wait group, and other values.

func New

func New(workersCnt int) *Pool

New creates a new worker pool.

func (*Pool) Execute

func (p *Pool) Execute(task Runner)

Execute adds a new task to the task queue of a worker pool.

func (*Pool) Run

func (p *Pool) Run(ctx context.Context)

Run starts workers in the pool.

func (*Pool) Stop

func (p *Pool) Stop()

Stop stops workers in the pool.

type Runner

type Runner interface {
	Job(ctx context.Context)
}

Runner is an interface for a task that can be executed in a worker pool.
