Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var ErrShuttingDownError = errors.New("not processed, execpool service is shutting down")
ErrShuttingDownError is the error returned when a job is not processed because the service is shutting down
Functions ¶
This section is empty.
Types ¶
type BacklogPool ¶
type BacklogPool interface {
	ExecutionPool
	EnqueueBacklog(enqueueCtx context.Context, t ExecFunc, arg interface{}, out chan interface{}) error
	BufferSize() (length, capacity int)
}
BacklogPool supports all the ExecutionPool functions, plus a few more for enqueueing to and inspecting the backlog of pending tasks.
func MakeBacklog ¶
func MakeBacklog(execPool ExecutionPool, backlogSize int, priority Priority, owner interface{}, profLabels ...string) BacklogPool
MakeBacklog creates a BacklogPool on top of execPool, with a backlog sized by backlogSize.
type BatchProcessor ¶
type BatchProcessor interface {
	// ProcessBatch processes a batch packed from the stream in the execpool
	ProcessBatch(jobs []InputJob)
	// GetErredUnprocessed receives a job that was left unprocessed because of err
	GetErredUnprocessed(ue InputJob, err error)
	// Cleanup is called on the unprocessed jobs when the service shuts down
	Cleanup(ue []InputJob, err error)
}
BatchProcessor is the interface of the functions needed to prepare a batch from the stream, process it, and return the results.
type ExecFunc ¶
type ExecFunc func(interface{}) interface{}
An ExecFunc is a unit of work to be executed by a Pool goroutine.
Note that an ExecFunc occupies a Pool goroutine while it runs, so do not schedule tasks that spend an excessive amount of time waiting.
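The following sketch shows a task with the ExecFunc shape: a short, non-blocking computation that takes one argument and returns one result. The type is redeclared locally so the example is self-contained. The runOnWorker helper is hypothetical, and delivering the return value on the out channel is an assumption about how a pool might hand results back, not a documented guarantee.

```go
package main

import "fmt"

// ExecFunc mirrors the documented type so the sketch is self-contained.
type ExecFunc func(interface{}) interface{}

// square is a short, CPU-bound task: it never waits, so it releases its
// worker goroutine quickly.
func square(arg interface{}) interface{} {
	n, ok := arg.(int)
	if !ok {
		return fmt.Errorf("square: want int, got %T", arg)
	}
	return n * n
}

// runOnWorker is a hypothetical stand-in for a pool worker: it runs the task
// on its own goroutine and delivers the result on out (an assumption).
func runOnWorker(t ExecFunc, arg interface{}, out chan interface{}) {
	go func() {
		out <- t(arg)
	}()
}

func main() {
	out := make(chan interface{}, 1)
	runOnWorker(square, 12, out)
	fmt.Println(<-out) // 144
}
```

Because both the argument and the result are interface{} values, the task has to type-assert its input, as square does here.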
type ExecutionPool ¶
type ExecutionPool interface {
	Enqueue(enqueueCtx context.Context, t ExecFunc, arg interface{}, i Priority, out chan interface{}) error
	GetOwner() interface{}
	Shutdown()
	GetParallelism() int
}
ExecutionPool interface exposes the core functionality of the execution pool.
func MakePool ¶
func MakePool(owner interface{}, profLabels ...string) ExecutionPool
MakePool creates a pool.
type Priority ¶
type Priority uint8
A Priority specifies a hint to the Pool to execute a Task at some priority.
Tasks with higher Priority values will tend to finish more quickly.
If there are tasks with different priorities, a worker will pick the highest-priority task to execute next.
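The selection rule described above can be sketched as a scan over the pending tasks. This is an illustration only: the low and high levels, the task type, and pickNext are hypothetical, and the tie-breaking rule (earliest queued wins) is an assumption of the sketch.

```go
package main

import "fmt"

// Priority mirrors the documented type.
type Priority uint8

// Hypothetical priority levels, used only for this illustration.
const (
	low Priority = iota
	high
)

type task struct {
	name string
	prio Priority
}

// pickNext removes and returns the pending task with the highest Priority.
// Among equal priorities, the earliest-queued task wins.
func pickNext(pending []task) (next task, rest []task, ok bool) {
	if len(pending) == 0 {
		return task{}, pending, false
	}
	best := 0
	for i, t := range pending {
		if t.prio > pending[best].prio {
			best = i
		}
	}
	next = pending[best]
	// Copy into a fresh slice so the caller's input is left untouched.
	rest = append(append([]task{}, pending[:best]...), pending[best+1:]...)
	return next, rest, true
}

func main() {
	pending := []task{{"a", low}, {"b", high}, {"c", low}}
	for {
		next, rest, ok := pickNext(pending)
		if !ok {
			break
		}
		fmt.Print(next.name, " ") // prints: b a c
		pending = rest
	}
	fmt.Println()
}
```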
type StreamToBatch ¶
type StreamToBatch struct {
// contains filtered or unexported fields
}
StreamToBatch makes batches from an incoming stream of jobs and submits the batches to the exec pool.
func MakeStreamToBatch ¶
func MakeStreamToBatch(inputChan <-chan InputJob, execPool BacklogPool, batchProcessor BatchProcessor) *StreamToBatch
MakeStreamToBatch creates a new stream to batch converter
func (*StreamToBatch) Start ¶
func (sv *StreamToBatch) Start(ctx context.Context)
Start is called when the StreamToBatch is created and whenever it needs to restart after the ctx is canceled
func (*StreamToBatch) WaitForStop ¶
func (sv *StreamToBatch) WaitForStop()
WaitForStop waits until the batching loop terminates after the ctx is canceled