execpool

package
v0.0.0-...-f633857 Latest
Warning

This package is not in the latest version of its module.

Published: May 30, 2024 License: AGPL-3.0 Imports: 6 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrShuttingDownError = errors.New("not processed, execpool service is shutting down")

ErrShuttingDownError is the error returned when a job is not processed because the service is shutting down.
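
A caller that receives this error may want to tell it apart from an ordinary failure. The sketch below is one way to do that with errors.Is. It uses a local stand-in variable (errShuttingDown) carrying the same message instead of importing the package, and classify is a hypothetical helper, not part of execpool.

```go
package main

import (
	"errors"
	"fmt"
)

// errShuttingDown is a local stand-in for execpool.ErrShuttingDownError.
var errShuttingDown = errors.New("not processed, execpool service is shutting down")

// classify is a hypothetical helper: it separates jobs that were skipped
// because of a shutdown from jobs that failed for another reason.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errShuttingDown):
		return "shutdown"
	default:
		return "failed"
	}
}

func main() {
	// errors.Is also matches when the sentinel has been wrapped with %w.
	wrapped := fmt.Errorf("job 7: %w", errShuttingDown)
	fmt.Println(classify(nil), classify(wrapped), classify(errors.New("boom")))
	// prints "ok shutdown failed"
}
```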

Functions

This section is empty.

Types

type BacklogPool

type BacklogPool interface {
	ExecutionPool
	EnqueueBacklog(enqueueCtx context.Context, t ExecFunc, arg interface{}, out chan interface{}) error
	BufferSize() (length, capacity int)
}

BacklogPool supports all the ExecutionPool functions plus a few more that test the pending tasks.

func MakeBacklog

func MakeBacklog(execPool ExecutionPool, backlogSize int, priority Priority, owner interface{}) BacklogPool

MakeBacklog creates a BacklogPool backed by execPool, using the given backlog size, priority, and owner.
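
The (length, capacity) pair returned by BufferSize reads naturally as the len and cap of a buffered channel. The following is a minimal sketch under that assumption; the backlog and task types are hypothetical stand-ins, and the package's real implementation is not shown in this documentation.

```go
package main

import (
	"context"
	"fmt"
)

// task is a hypothetical pending-work item.
type task func()

// backlog is an illustrative stand-in, not the package's implementation.
type backlog struct {
	buffer chan task
}

// enqueueBacklog blocks until there is room in the buffer or ctx is done.
func (b *backlog) enqueueBacklog(ctx context.Context, t task) error {
	select {
	case b.buffer <- t:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// bufferSize reports how many tasks are pending and how many fit in total.
func (b *backlog) bufferSize() (length, capacity int) {
	return len(b.buffer), cap(b.buffer)
}

// demo fills a one-slot backlog, then shows that a second enqueue gives up
// once its context is canceled.
func demo() (length, capacity int, err error) {
	b := &backlog{buffer: make(chan task, 1)}
	if err = b.enqueueBacklog(context.Background(), func() {}); err != nil {
		return 0, 0, err
	}
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // the buffer is full, so only the ctx.Done() case can fire
	err = b.enqueueBacklog(ctx, func() {})
	length, capacity = b.bufferSize()
	return length, capacity, err
}

func main() {
	fmt.Println(demo()) // prints "1 1 context canceled"
}
```

Passing a context to the enqueue call lets a producer stop waiting when the backlog stays full.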

type BatchProcessor

type BatchProcessor interface {
	// ProcessBatch processes a batch packed from the stream in the execpool
	ProcessBatch(jobs []InputJob)
	// GetErredUnprocessed returns a job left unprocessed because of an error
	GetErredUnprocessed(ue InputJob, err error)
	// Cleanup is called on the unprocessed jobs when the service shuts down
	Cleanup(ue []InputJob, err error)
}

BatchProcessor is the interface for the functions needed to prepare a batch from the stream, process it, and return the results.
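
As an illustration of the three callbacks, here is a sketch of a BatchProcessor that only counts what it is handed. The interfaces are mirrored locally so the example compiles on its own; sigJob and countingProcessor are hypothetical names, and the calls in tally are made by hand rather than by the package.

```go
package main

import (
	"errors"
	"fmt"
)

// InputJob and BatchProcessor mirror the documented interfaces.
type InputJob interface {
	GetNumberOfBatchableItems() (count uint64, err error)
}

type BatchProcessor interface {
	ProcessBatch(jobs []InputJob)
	GetErredUnprocessed(ue InputJob, err error)
	Cleanup(ue []InputJob, err error)
}

// sigJob is a hypothetical job that carries n batchable items.
type sigJob struct{ n uint64 }

func (j sigJob) GetNumberOfBatchableItems() (uint64, error) { return j.n, nil }

// countingProcessor is a hypothetical BatchProcessor that tallies its inputs.
type countingProcessor struct {
	processed, erred, cleaned int
}

func (p *countingProcessor) ProcessBatch(jobs []InputJob)               { p.processed += len(jobs) }
func (p *countingProcessor) GetErredUnprocessed(ue InputJob, err error) { p.erred++ }
func (p *countingProcessor) Cleanup(ue []InputJob, err error)           { p.cleaned += len(ue) }

// Compile-time check that countingProcessor satisfies the interface.
var _ BatchProcessor = (*countingProcessor)(nil)

// tally drives each callback once, by hand, and returns the counters.
func tally() (processed, erred, cleaned int) {
	p := &countingProcessor{}
	p.ProcessBatch([]InputJob{sigJob{1}, sigJob{2}})
	p.GetErredUnprocessed(sigJob{3}, errors.New("bad job"))
	p.Cleanup([]InputJob{sigJob{4}}, errors.New("shutting down"))
	return p.processed, p.erred, p.cleaned
}

func main() {
	fmt.Println(tally()) // prints "2 1 1"
}
```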

type ExecFunc

type ExecFunc func(interface{}) interface{}

An ExecFunc is a unit of work to be executed by a Pool goroutine.

Note that an ExecFunc will occupy a Pool goroutine, so do not schedule tasks that spend an excessive amount of time waiting.
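
A short, CPU-bound function is the kind of task this note has in mind. The sketch below mirrors the ExecFunc type locally; square and runOnWorker are hypothetical, and the way runOnWorker hands the result to a channel is an assumption about how a worker might deliver output, not a description of the package's internals.

```go
package main

import "fmt"

// ExecFunc mirrors the documented type.
type ExecFunc func(interface{}) interface{}

// square is a hypothetical task: short, CPU-bound, with no blocking waits.
func square(arg interface{}) interface{} {
	n, ok := arg.(int)
	if !ok {
		// Errors travel back through the same interface{} return value.
		return fmt.Errorf("square: want int, got %T", arg)
	}
	return n * n
}

// runOnWorker imitates a single worker: it runs the task on its own
// goroutine and passes the result back over a channel.
func runOnWorker(t ExecFunc, arg interface{}) interface{} {
	out := make(chan interface{}, 1)
	go func() { out <- t(arg) }()
	return <-out
}

func main() {
	fmt.Println(runOnWorker(square, 7)) // prints "49"
}
```

Because both the argument and the result are interface{}, the caller is responsible for the type assertions on each side.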

type ExecutionPool

type ExecutionPool interface {
	Enqueue(enqueueCtx context.Context, t ExecFunc, arg interface{}, i Priority, out chan interface{}) error
	GetOwner() interface{}
	Shutdown()
	GetParallelism() int
}

The ExecutionPool interface exposes the core functionality of the execution pool.

func MakePool

func MakePool(owner interface{}) ExecutionPool

MakePool creates a pool.
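
To show how the Enqueue signature might be used, the sketch below fans a few tasks out and collects their results from the out channel. It mirrors the documented interface locally; toyPool is a hypothetical stand-in that starts one goroutine per task and ignores the priority, so it says nothing about how the pool returned by MakePool schedules work.

```go
package main

import (
	"context"
	"fmt"
	"runtime"
	"sync"
)

// These declarations mirror the documented API.
type ExecFunc func(interface{}) interface{}
type Priority uint8

const (
	LowPriority Priority = iota
	HighPriority
)

type ExecutionPool interface {
	Enqueue(enqueueCtx context.Context, t ExecFunc, arg interface{}, i Priority, out chan interface{}) error
	GetOwner() interface{}
	Shutdown()
	GetParallelism() int
}

// toyPool is a hypothetical stand-in: one goroutine per task, priority ignored.
type toyPool struct {
	owner interface{}
	wg    sync.WaitGroup
}

func (p *toyPool) Enqueue(ctx context.Context, t ExecFunc, arg interface{}, _ Priority, out chan interface{}) error {
	if err := ctx.Err(); err != nil {
		return err // refuse new work once the caller's context is done
	}
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		res := t(arg)
		if out != nil {
			out <- res
		}
	}()
	return nil
}

func (p *toyPool) GetOwner() interface{} { return p.owner }
func (p *toyPool) Shutdown()             { p.wg.Wait() }
func (p *toyPool) GetParallelism() int   { return runtime.NumCPU() }

// sumSquares enqueues one task per number and adds up the results.
func sumSquares(pool ExecutionPool, nums []int) (int, error) {
	out := make(chan interface{}, len(nums)) // buffered so workers never block
	for _, n := range nums {
		err := pool.Enqueue(context.Background(), func(a interface{}) interface{} {
			v := a.(int)
			return v * v
		}, n, HighPriority, out)
		if err != nil {
			return 0, err
		}
	}
	total := 0
	for range nums {
		total += (<-out).(int)
	}
	return total, nil
}

func main() {
	pool := &toyPool{owner: "example"}
	defer pool.Shutdown()
	total, err := sumSquares(pool, []int{1, 2, 3})
	fmt.Println(total, err) // prints "14 <nil>"
}
```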

type InputJob

type InputJob interface {
	GetNumberOfBatchableItems() (count uint64, err error)
}

InputJob is the interface that incoming jobs need to implement.

type Priority

type Priority uint8

A Priority specifies a hint to the Pool to execute a Task at some priority.

Tasks with higher Priority values will tend to finish more quickly.

If there are tasks with different priorities, a worker will pick the highest-priority task to execute next.

const (
	LowPriority Priority = iota
	HighPriority
)

The list of all valid priority values. When adding new ones, add them before numPrios (i.e., there should be no gaps, and the first priority needs to be zero).
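
The "highest priority first" rule and the role of a gap-free, zero-based list can be sketched with one queue per priority. In this illustration the position of numPrios after HighPriority is an assumption based on the comment above, and the queues type with its next and drain helpers is hypothetical.

```go
package main

import "fmt"

type Priority uint8

const (
	LowPriority Priority = iota
	HighPriority
	numPrios // assumed sentinel: counts the priorities declared above it
)

// queues holds one FIFO of task names per priority (hypothetical layout).
// Indexing by Priority works only if values start at zero with no gaps.
type queues [numPrios][]string

// next pops a task from the highest non-empty priority.
func (q *queues) next() (string, bool) {
	for p := int(numPrios) - 1; p >= 0; p-- {
		if len(q[p]) > 0 {
			task := q[p][0]
			q[p] = q[p][1:]
			return task, true
		}
	}
	return "", false
}

// drain returns the order in which a single worker would pick the tasks.
func drain(q *queues) []string {
	var order []string
	for {
		t, ok := q.next()
		if !ok {
			return order
		}
		order = append(order, t)
	}
}

func main() {
	var q queues
	q[LowPriority] = []string{"low-1", "low-2"}
	q[HighPriority] = []string{"high-1"}
	fmt.Println(drain(&q)) // prints "[high-1 low-1 low-2]"
}
```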

type StreamToBatch

type StreamToBatch struct {
	// contains filtered or unexported fields
}

StreamToBatch makes batches from an incoming stream of jobs and submits the batches to the exec pool.

func MakeStreamToBatch

func MakeStreamToBatch(inputChan <-chan InputJob, execPool BacklogPool,
	batchProcessor BatchProcessor) *StreamToBatch

MakeStreamToBatch creates a new stream to batch converter

func (*StreamToBatch) Start

func (sv *StreamToBatch) Start(ctx context.Context)

Start is called when the StreamToBatch is created and whenever it needs to restart after the ctx is canceled

func (*StreamToBatch) WaitForStop

func (sv *StreamToBatch) WaitForStop()

WaitForStop waits until the batching loop terminates after the ctx is canceled.
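
The Start and WaitForStop pair can be pictured as a goroutine guarded by a WaitGroup. The sketch below is a simplified, hypothetical batcher: the limit field, the process callback, and the choice to flush the remaining jobs on exit are all assumptions made for the example, and it does not use the exec pool or a BatchProcessor.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// InputJob mirrors the documented interface.
type InputJob interface {
	GetNumberOfBatchableItems() (count uint64, err error)
}

// job is a hypothetical InputJob whose value is its batchable item count.
type job uint64

func (j job) GetNumberOfBatchableItems() (uint64, error) { return uint64(j), nil }

// batcher is a simplified stand-in for StreamToBatch.
type batcher struct {
	in      <-chan InputJob
	limit   uint64           // assumed cap on batchable items per batch
	process func([]InputJob) // assumed sink for completed batches
	wg      sync.WaitGroup
}

// Start launches the batching loop; it exits when ctx is canceled or the
// input channel is closed.
func (b *batcher) Start(ctx context.Context) {
	b.wg.Add(1)
	go func() {
		defer b.wg.Done()
		var batch []InputJob
		var items uint64
		flush := func() {
			if len(batch) > 0 {
				b.process(batch)
				batch, items = nil, 0
			}
		}
		defer flush() // hand over whatever is left when the loop ends
		for {
			select {
			case <-ctx.Done():
				return
			case j, ok := <-b.in:
				if !ok {
					return
				}
				n, err := j.GetNumberOfBatchableItems()
				if err != nil {
					continue // this sketch simply skips jobs that fail to report a count
				}
				if items+n > b.limit {
					flush()
				}
				batch = append(batch, j)
				items += n
			}
		}
	}()
}

// WaitForStop blocks until the batching loop has returned.
func (b *batcher) WaitForStop() { b.wg.Wait() }

// batchSizes feeds the given item counts through a batcher and reports the
// number of jobs in each batch.
func batchSizes(counts []uint64, limit uint64) []int {
	in := make(chan InputJob, len(counts))
	for _, c := range counts {
		in <- job(c)
	}
	close(in)
	var sizes []int
	b := &batcher{in: in, limit: limit, process: func(batch []InputJob) {
		sizes = append(sizes, len(batch))
	}}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	b.Start(ctx)
	b.WaitForStop()
	return sizes
}

func main() {
	fmt.Println(batchSizes([]uint64{1, 1, 1, 1, 1}, 2)) // prints "[2 2 1]"
}
```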
