Documentation ¶
Index ¶
- type ChannelWithResult
- type ConcurrentExecutor
- func (e *ConcurrentExecutor[ResultType, ResultChannelType, TaskType]) Execute(concurrency int, payload []TaskType, ...) ([]ResultType, error)
- func (e *ConcurrentExecutor[ResultType, ResultChannelType, TaskType]) ExecuteSimple(concurrency int, repeatTimes int, ...) ([]ResultType, error)
- func (e *ConcurrentExecutor[ResultType, ResultChannelType, TaskType]) GetErrors() []error
- type ConcurrentExecutorOpt
- func WithContext[ResultType any, ResultChannelType ChannelWithResult[ResultType], TaskType any](context context.Context) ConcurrentExecutorOpt[ResultType, ResultChannelType, TaskType]
- func WithoutFailFast[ResultType any, ResultChannelType ChannelWithResult[ResultType], TaskType any]() ConcurrentExecutorOpt[ResultType, ResultChannelType, TaskType]
- type NoTaskType
- type SimpleTaskProcessorFn
- type TaskProcessorFn
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ChannelWithResult ¶
type ChannelWithResult[ResultType any] interface { GetResult() ResultType }
ChannelWithResult is an interface that should be implemented by the result channel
type ConcurrentExecutor ¶
type ConcurrentExecutor[ResultType any, ResultChannelType ChannelWithResult[ResultType], TaskType any] struct { // contains filtered or unexported fields }
ConcurrentExecutor is a utility to execute tasks concurrently
func NewConcurrentExecutor ¶
func NewConcurrentExecutor[ResultType any, ResultChannelType ChannelWithResult[ResultType], TaskType any](logger zerolog.Logger, opts ...ConcurrentExecutorOpt[ResultType, ResultChannelType, TaskType]) *ConcurrentExecutor[ResultType, ResultChannelType, TaskType]
NewConcurrentExecutor creates a new ConcurrentExecutor
func (*ConcurrentExecutor[ResultType, ResultChannelType, TaskType]) Execute ¶
func (e *ConcurrentExecutor[ResultType, ResultChannelType, TaskType]) Execute(concurrency int, payload []TaskType, processorFn TaskProcessorFn[ResultChannelType, TaskType]) ([]ResultType, error)
Execute executes a task that requires a payload. It is executed with given concurrency. The processorFn is the function that processes the task. Executor will attempt to distribute the tasks evenly among the executors.
func (*ConcurrentExecutor[ResultType, ResultChannelType, TaskType]) ExecuteSimple ¶
func (e *ConcurrentExecutor[ResultType, ResultChannelType, TaskType]) ExecuteSimple(concurrency int, repeatTimes int, simpleProcessorFn SimpleTaskProcessorFn[ResultChannelType]) ([]ResultType, error)
ExecuteSimple executes a task that doesn't require a payload. It is executed repeatTimes times with given concurrency. The simpleProcessorFn is the function that processes the task. Executor will attempt to distribute the tasks evenly among the executors.
func (*ConcurrentExecutor[ResultType, ResultChannelType, TaskType]) GetErrors ¶
func (e *ConcurrentExecutor[ResultType, ResultChannelType, TaskType]) GetErrors() []error
GetErrors returns all errors that occurred during processing
type ConcurrentExecutorOpt ¶
type ConcurrentExecutorOpt[ResultType any, ResultChannelType ChannelWithResult[ResultType], TaskType any] func(c *ConcurrentExecutor[ResultType, ResultChannelType, TaskType])
func WithContext ¶
func WithContext[ResultType any, ResultChannelType ChannelWithResult[ResultType], TaskType any](context context.Context) ConcurrentExecutorOpt[ResultType, ResultChannelType, TaskType]
/ WithContext sets the context for the executor, if not set it defaults to context.Background()
func WithoutFailFast ¶
func WithoutFailFast[ResultType any, ResultChannelType ChannelWithResult[ResultType], TaskType any]() ConcurrentExecutorOpt[ResultType, ResultChannelType, TaskType]
WithoutFailFast disables fail fast. Executor will wait for all tasks to finish even if some of them fail.
type NoTaskType ¶
type NoTaskType struct{}
NoTaskType is a dummy type to be used when no task type is needed
type SimpleTaskProcessorFn ¶
type SimpleTaskProcessorFn[ResultChannelType any] func(resultCh chan ResultChannelType, errorCh chan error, executorNum int)
SimpleTaskProcessorFn is a function that processes a task that doesn't require a payload. It should send the result to the resultCh and any error to the errorCh. It should never send to both channels. The executorNum is the index of the executor that is processing the task.
type TaskProcessorFn ¶
type TaskProcessorFn[ResultChannelType, TaskType any] func(resultCh chan ResultChannelType, errorCh chan error, executorNum int, payload TaskType)
TaskProcessorFn is a function that processes a task that requires a payload. It should send the result to the resultCh and any error to the errorCh. It should never send to both channels. The executorNum is the index of the executor that is processing the task. The payload is the task's payload. If task doesn't require one use SimpleTaskProcessorFn instead.