Documentation ¶
Overview ¶
Package tasks is a generated GoMock package.
Package tasks is a generated GoMock package.
Index ¶
- Constants
- Variables
- type ChannelWeightFn
- type DynamicWorkerPoolLimiter
- type DynamicWorkerPoolScheduler
- type FIFOScheduler
- type FIFOSchedulerOptions
- type GroupByScheduler
- type GroupBySchedulerOptions
- type InterleavedWeightedRoundRobinScheduler
- type InterleavedWeightedRoundRobinSchedulerOptions
- type MetricTagsFn
- type MockRunnable
- type MockRunnableMockRecorder
- type MockScheduler
- type MockSchedulerMockRecorder
- type MockTask
- func (m *MockTask) Abort()
- func (m *MockTask) Ack()
- func (m *MockTask) Cancel()
- func (m *MockTask) EXPECT() *MockTaskMockRecorder
- func (m *MockTask) Execute() error
- func (m *MockTask) HandleErr(err error) error
- func (m *MockTask) IsRetryableError(err error) bool
- func (m *MockTask) Nack(err error)
- func (m *MockTask) Reschedule()
- func (m *MockTask) RetryPolicy() backoff.RetryPolicy
- func (m *MockTask) State() State
- type MockTaskMockRecorder
- func (mr *MockTaskMockRecorder) Abort() *gomock.Call
- func (mr *MockTaskMockRecorder) Ack() *gomock.Call
- func (mr *MockTaskMockRecorder) Cancel() *gomock.Call
- func (mr *MockTaskMockRecorder) Execute() *gomock.Call
- func (mr *MockTaskMockRecorder) HandleErr(err interface{}) *gomock.Call
- func (mr *MockTaskMockRecorder) IsRetryableError(err interface{}) *gomock.Call
- func (mr *MockTaskMockRecorder) Nack(err interface{}) *gomock.Call
- func (mr *MockTaskMockRecorder) Reschedule() *gomock.Call
- func (mr *MockTaskMockRecorder) RetryPolicy() *gomock.Call
- func (mr *MockTaskMockRecorder) State() *gomock.Call
- type Priority
- type QuotaRequestFn
- type RateLimitedScheduler
- type RateLimitedSchedulerOptions
- type RateLimitedTaskRunnable
- type Runnable
- type RunnableScheduler
- type RunnableTask
- type Scheduler
- type SequentialScheduler
- type SequentialSchedulerOptions
- type SequentialTaskQueue
- type SequentialTaskQueueFactory
- type State
- type Task
- type TaskChannelKeyFn
- type WeightedChannel
- type WeightedChannels
Constants ¶
const (
WeightedChannelDefaultSize = 1000
)
Variables ¶
var ( PriorityHigh = getPriority(highPriorityClass, mediumPrioritySubclass) PriorityLow = getPriority(lowPriorityClass, mediumPrioritySubclass) )
var ( PriorityName = map[Priority]string{ PriorityHigh: "high", PriorityLow: "low", } PriorityValue = map[string]Priority{ "high": PriorityHigh, "low": PriorityLow, } )
Functions ¶
This section is empty.
Types ¶
type ChannelWeightFn ¶ added in v1.18.0
type ChannelWeightFn[K comparable] func(K) int
ChannelWeightFn is the function for mapping a task channel (key) to its weight
type DynamicWorkerPoolLimiter ¶ added in v1.24.0
type DynamicWorkerPoolLimiter interface { // Dynamic concurrency limiter. Evaluated at submit time. Concurrency() int // Dynamic buffer size limiter. Evaluated at submit time. BufferSize() int }
DynamicWorkerPoolLimiter provides dynamic limiters for DynamicWorkerPoolScheduler.
type DynamicWorkerPoolScheduler ¶ added in v1.24.0
type DynamicWorkerPoolScheduler struct {
// contains filtered or unexported fields
}
DynamicWorkerPoolScheduler manages a pool of worker goroutines to execute Runnable instances. It limits the number of concurrently running workers and buffers tasks when that limit is reached. It limits the buffer size and rejects tasks when that limit is reached. New workers are created on-demand. Workers check for more tasks in the buffer after completing a task. If no tasks are available, the worker stops. The pool can be stopped, which aborts all buffered tasks.
func NewDynamicWorkerPoolScheduler ¶ added in v1.24.0
func NewDynamicWorkerPoolScheduler( limiter DynamicWorkerPoolLimiter, metricsHandler metrics.Handler, ) *DynamicWorkerPoolScheduler
NewDynamicWorkerPoolScheduler creates a DynamicWorkerPoolScheduler with the given limiter.
func (*DynamicWorkerPoolScheduler) InitiateShutdown ¶ added in v1.24.0
func (pool *DynamicWorkerPoolScheduler) InitiateShutdown()
InitiateShutdown aborts all buffered tasks and empties the buffer.
func (*DynamicWorkerPoolScheduler) TrySubmit ¶ added in v1.24.0
func (pool *DynamicWorkerPoolScheduler) TrySubmit(task Runnable) bool
func (*DynamicWorkerPoolScheduler) WaitShutdown ¶ added in v1.24.0
func (pool *DynamicWorkerPoolScheduler) WaitShutdown()
WaitShutdown waits for all worker goroutines to complete.
type FIFOScheduler ¶ added in v1.18.0
type FIFOScheduler[T Task] struct { // contains filtered or unexported fields }
func NewFIFOScheduler ¶ added in v1.18.0
func NewFIFOScheduler[T Task]( options *FIFOSchedulerOptions, logger log.Logger, ) *FIFOScheduler[T]
NewFIFOScheduler creates a new FIFOScheduler
func (*FIFOScheduler[T]) Start ¶ added in v1.18.0
func (f *FIFOScheduler[T]) Start()
func (*FIFOScheduler[T]) Stop ¶ added in v1.18.0
func (f *FIFOScheduler[T]) Stop()
func (*FIFOScheduler[T]) Submit ¶ added in v1.18.0
func (f *FIFOScheduler[T]) Submit(task T)
func (*FIFOScheduler[T]) TrySubmit ¶ added in v1.18.0
func (f *FIFOScheduler[T]) TrySubmit(task T) bool
type FIFOSchedulerOptions ¶ added in v1.18.0
type FIFOSchedulerOptions struct { QueueSize int WorkerCount dynamicconfig.IntPropertyFn }
FIFOSchedulerOptions is the configs for FIFOScheduler
type GroupByScheduler ¶ added in v1.24.0
type GroupByScheduler[K comparable, T Task] struct { // contains filtered or unexported fields }
GroupByScheduler groups tasks based on a provided key function and submits that task for processing on a dedicated scheduler for that group.
func NewGroupByScheduler ¶ added in v1.24.0
func NewGroupByScheduler[K comparable, T Task](options GroupBySchedulerOptions[K, T]) *GroupByScheduler[K, T]
NewGroupByScheduler creates a new GroupByScheduler from given options.
func (*GroupByScheduler[K, T]) Start ¶ added in v1.24.0
func (*GroupByScheduler[K, T]) Start()
func (*GroupByScheduler[K, T]) Stop ¶ added in v1.24.0
func (s *GroupByScheduler[K, T]) Stop()
Stop signals running tasks to stop, aborts any pending tasks and waits up to a minute for all running tasks to complete.
func (*GroupByScheduler[K, T]) Submit ¶ added in v1.24.0
func (s *GroupByScheduler[K, T]) Submit(task T)
func (*GroupByScheduler[K, T]) TrySubmit ¶ added in v1.24.0
func (s *GroupByScheduler[K, T]) TrySubmit(task T) bool
TrySubmit submits a task for processing. If called after the scheduler is shut down, the task will be accepted and aborted.
type GroupBySchedulerOptions ¶ added in v1.24.0
type GroupBySchedulerOptions[K comparable, T Task] struct { Logger log.Logger // A function to determine the group of a task. KeyFn func(T) K // Factory for creating a runnable from a task. RunnableFactory func(T) Runnable // When a new group is encountered, use this function to create a scheduler for that group. SchedulerFactory func(K) RunnableScheduler }
GroupBySchedulerOptions are options for creating a GroupByScheduler.
type InterleavedWeightedRoundRobinScheduler ¶ added in v1.14.0
type InterleavedWeightedRoundRobinScheduler[T Task, K comparable] struct { sync.RWMutex // contains filtered or unexported fields }
InterleavedWeightedRoundRobinScheduler is a round robin scheduler implementation ref: https://en.wikipedia.org/wiki/Weighted_round_robin#Interleaved_WRR
func NewInterleavedWeightedRoundRobinScheduler ¶ added in v1.14.0
func NewInterleavedWeightedRoundRobinScheduler[T Task, K comparable]( options InterleavedWeightedRoundRobinSchedulerOptions[T, K], fifoScheduler Scheduler[T], logger log.Logger, ) *InterleavedWeightedRoundRobinScheduler[T, K]
func (*InterleavedWeightedRoundRobinScheduler[T, K]) Start ¶ added in v1.14.0
func (s *InterleavedWeightedRoundRobinScheduler[T, K]) Start()
func (*InterleavedWeightedRoundRobinScheduler[T, K]) Stop ¶ added in v1.14.0
func (s *InterleavedWeightedRoundRobinScheduler[T, K]) Stop()
type InterleavedWeightedRoundRobinSchedulerOptions ¶ added in v1.14.0
type InterleavedWeightedRoundRobinSchedulerOptions[T Task, K comparable] struct { // Required for mapping a task to it's corresponding task channel TaskChannelKeyFn TaskChannelKeyFn[T, K] // Required for getting the weight for a task channel ChannelWeightFn ChannelWeightFn[K] // Optional, if specified, re-evaluate task channel weight when channel is not empty ChannelWeightUpdateCh chan struct{} }
InterleavedWeightedRoundRobinSchedulerOptions is the config for interleaved weighted round robin scheduler
type MetricTagsFn ¶ added in v1.23.0
type MockRunnable ¶ added in v1.24.0
type MockRunnable struct {
// contains filtered or unexported fields
}
MockRunnable is a mock of Runnable interface.
func NewMockRunnable ¶ added in v1.24.0
func NewMockRunnable(ctrl *gomock.Controller) *MockRunnable
NewMockRunnable creates a new mock instance.
func (*MockRunnable) Abort ¶ added in v1.24.0
func (m *MockRunnable) Abort()
Abort mocks base method.
func (*MockRunnable) EXPECT ¶ added in v1.24.0
func (m *MockRunnable) EXPECT() *MockRunnableMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockRunnable) Run ¶ added in v1.24.0
func (m *MockRunnable) Run(arg0 context.Context)
Run mocks base method.
type MockRunnableMockRecorder ¶ added in v1.24.0
type MockRunnableMockRecorder struct {
// contains filtered or unexported fields
}
MockRunnableMockRecorder is the mock recorder for MockRunnable.
func (*MockRunnableMockRecorder) Abort ¶ added in v1.24.0
func (mr *MockRunnableMockRecorder) Abort() *gomock.Call
Abort indicates an expected call of Abort.
func (*MockRunnableMockRecorder) Run ¶ added in v1.24.0
func (mr *MockRunnableMockRecorder) Run(arg0 interface{}) *gomock.Call
Run indicates an expected call of Run.
type MockScheduler ¶ added in v1.14.0
type MockScheduler[T Task] struct { // contains filtered or unexported fields }
MockScheduler is a mock of Scheduler interface.
func NewMockScheduler ¶ added in v1.14.0
func NewMockScheduler[T Task](ctrl *gomock.Controller) *MockScheduler[T]
NewMockScheduler creates a new mock instance.
func (*MockScheduler[T]) EXPECT ¶ added in v1.14.0
func (m *MockScheduler[T]) EXPECT() *MockSchedulerMockRecorder[T]
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockScheduler[T]) Start ¶ added in v1.14.0
func (m *MockScheduler[T]) Start()
Start mocks base method.
func (*MockScheduler[T]) Stop ¶ added in v1.14.0
func (m *MockScheduler[T]) Stop()
Stop mocks base method.
func (*MockScheduler[T]) Submit ¶ added in v1.14.0
func (m *MockScheduler[T]) Submit(task T)
Submit mocks base method.
func (*MockScheduler[T]) TrySubmit ¶ added in v1.17.0
func (m *MockScheduler[T]) TrySubmit(task T) bool
TrySubmit mocks base method.
type MockSchedulerMockRecorder ¶ added in v1.14.0
type MockSchedulerMockRecorder[T Task] struct { // contains filtered or unexported fields }
MockSchedulerMockRecorder is the mock recorder for MockScheduler.
func (*MockSchedulerMockRecorder[T]) Start ¶ added in v1.14.0
func (mr *MockSchedulerMockRecorder[T]) Start() *gomock.Call
Start indicates an expected call of Start.
func (*MockSchedulerMockRecorder[T]) Stop ¶ added in v1.14.0
func (mr *MockSchedulerMockRecorder[T]) Stop() *gomock.Call
Stop indicates an expected call of Stop.
func (*MockSchedulerMockRecorder[T]) Submit ¶ added in v1.14.0
func (mr *MockSchedulerMockRecorder[T]) Submit(task interface{}) *gomock.Call
Submit indicates an expected call of Submit.
func (*MockSchedulerMockRecorder[T]) TrySubmit ¶ added in v1.17.0
func (mr *MockSchedulerMockRecorder[T]) TrySubmit(task interface{}) *gomock.Call
TrySubmit indicates an expected call of TrySubmit.
type MockTask ¶ added in v1.14.0
type MockTask struct {
// contains filtered or unexported fields
}
MockTask is a mock of Task interface.
func NewMockTask ¶ added in v1.14.0
func NewMockTask(ctrl *gomock.Controller) *MockTask
NewMockTask creates a new mock instance.
func (*MockTask) EXPECT ¶ added in v1.14.0
func (m *MockTask) EXPECT() *MockTaskMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockTask) IsRetryableError ¶ added in v1.14.0
IsRetryableError mocks base method.
func (*MockTask) Reschedule ¶ added in v1.14.0
func (m *MockTask) Reschedule()
Reschedule mocks base method.
func (*MockTask) RetryPolicy ¶ added in v1.14.0
func (m *MockTask) RetryPolicy() backoff.RetryPolicy
RetryPolicy mocks base method.
type MockTaskMockRecorder ¶ added in v1.14.0
type MockTaskMockRecorder struct {
// contains filtered or unexported fields
}
MockTaskMockRecorder is the mock recorder for MockTask.
func (*MockTaskMockRecorder) Abort ¶ added in v1.21.0
func (mr *MockTaskMockRecorder) Abort() *gomock.Call
Abort indicates an expected call of Abort.
func (*MockTaskMockRecorder) Ack ¶ added in v1.14.0
func (mr *MockTaskMockRecorder) Ack() *gomock.Call
Ack indicates an expected call of Ack.
func (*MockTaskMockRecorder) Cancel ¶ added in v1.17.3
func (mr *MockTaskMockRecorder) Cancel() *gomock.Call
Cancel indicates an expected call of Cancel.
func (*MockTaskMockRecorder) Execute ¶ added in v1.14.0
func (mr *MockTaskMockRecorder) Execute() *gomock.Call
Execute indicates an expected call of Execute.
func (*MockTaskMockRecorder) HandleErr ¶ added in v1.14.0
func (mr *MockTaskMockRecorder) HandleErr(err interface{}) *gomock.Call
HandleErr indicates an expected call of HandleErr.
func (*MockTaskMockRecorder) IsRetryableError ¶ added in v1.14.0
func (mr *MockTaskMockRecorder) IsRetryableError(err interface{}) *gomock.Call
IsRetryableError indicates an expected call of IsRetryableError.
func (*MockTaskMockRecorder) Nack ¶ added in v1.14.0
func (mr *MockTaskMockRecorder) Nack(err interface{}) *gomock.Call
Nack indicates an expected call of Nack.
func (*MockTaskMockRecorder) Reschedule ¶ added in v1.14.0
func (mr *MockTaskMockRecorder) Reschedule() *gomock.Call
Reschedule indicates an expected call of Reschedule.
func (*MockTaskMockRecorder) RetryPolicy ¶ added in v1.14.0
func (mr *MockTaskMockRecorder) RetryPolicy() *gomock.Call
RetryPolicy indicates an expected call of RetryPolicy.
func (*MockTaskMockRecorder) State ¶ added in v1.14.0
func (mr *MockTaskMockRecorder) State() *gomock.Call
State indicates an expected call of State.
type QuotaRequestFn ¶ added in v1.23.0
type RateLimitedScheduler ¶ added in v1.23.0
type RateLimitedScheduler[T Task] struct { // contains filtered or unexported fields }
func NewRateLimitedScheduler ¶ added in v1.23.0
func NewRateLimitedScheduler[T Task]( scheduler Scheduler[T], rateLimiter quotas.RequestRateLimiter, timeSource clock.TimeSource, quotaRequestFn QuotaRequestFn[T], metricTagsFn MetricTagsFn[T], options RateLimitedSchedulerOptions, logger log.Logger, metricsHandler metrics.Handler, ) *RateLimitedScheduler[T]
type RateLimitedSchedulerOptions ¶ added in v1.23.0
type RateLimitedSchedulerOptions struct {
EnableShadowMode bool
}
type RateLimitedTaskRunnable ¶ added in v1.24.0
type RateLimitedTaskRunnable struct { Runnable Limiter quotas.RateLimiter // contains filtered or unexported fields }
RateLimitedTaskRunnable wraps a Runnable with a rate limiter.
func NewRateLimitedTaskRunnableFromTask ¶ added in v1.24.0
func NewRateLimitedTaskRunnableFromTask( task Task, limiter quotas.RateLimiter, metricsHandler metrics.Handler, ) RateLimitedTaskRunnable
NewRateLimitedTaskRunnableFromTask creates a [NewRateLimitedTaskRunnable] from a Task and a [rate.Limiter].
type RunnableScheduler ¶ added in v1.24.0
type RunnableScheduler interface { // InitiateShutdown signals the scheduler to stop without waiting for shutdown to complete. InitiateShutdown() // WaitShutdown waits for the scheduler to complete shutdown. Must be called after InitiateShutdown(). WaitShutdown() // Submit a Runnable for scheduling, if the scheduler is already stopped, the runnable will be aborted. // Returns a boolean indicating whether the task was accepted. TrySubmit(Runnable) bool }
RunnableScheduler is scheduler for Runnable tasks.
type RunnableTask ¶ added in v1.24.0
type RunnableTask struct {
Task
}
RunnableTask turns a Task into a Runnable. Does **not** retry tasks.
func (RunnableTask) Run ¶ added in v1.24.0
func (a RunnableTask) Run(ctx context.Context)
Run the embedded task, handling errors and aborting on context errors.
type Scheduler ¶ added in v1.14.0
Scheduler is the generic interface for scheduling & processing tasks
type SequentialScheduler ¶ added in v1.21.0
type SequentialScheduler[T Task] struct { // contains filtered or unexported fields }
func NewSequentialScheduler ¶ added in v1.21.0
func NewSequentialScheduler[T Task]( options *SequentialSchedulerOptions, taskQueueHashFn collection.HashFunc, taskQueueFactory SequentialTaskQueueFactory[T], logger log.Logger, ) *SequentialScheduler[T]
func (*SequentialScheduler[T]) Start ¶ added in v1.21.0
func (s *SequentialScheduler[T]) Start()
func (*SequentialScheduler[T]) Stop ¶ added in v1.21.0
func (s *SequentialScheduler[T]) Stop()
func (*SequentialScheduler[T]) Submit ¶ added in v1.21.0
func (s *SequentialScheduler[T]) Submit(task T)
func (*SequentialScheduler[T]) TrySubmit ¶ added in v1.21.0
func (s *SequentialScheduler[T]) TrySubmit(task T) bool
type SequentialSchedulerOptions ¶ added in v1.21.0
type SequentialSchedulerOptions struct { QueueSize int WorkerCount dynamicconfig.IntPropertyFn }
type SequentialTaskQueue ¶ added in v1.21.0
type SequentialTaskQueue[T Task] interface { // ID return the ID of the queue, as well as the tasks inside (same) ID() interface{} // Add push a task to the task set Add(T) // Remove pop a task from the task set Remove() T // IsEmpty indicate if the task set is empty IsEmpty() bool // Len return the size of the queue Len() int }
type SequentialTaskQueueFactory ¶ added in v1.21.0
type SequentialTaskQueueFactory[T Task] func(task T) SequentialTaskQueue[T]
type State ¶ added in v1.14.0
type State int
State represents the current state of a task
const ( // TaskStatePending is the state for a task when it's waiting to be processed or currently being processed TaskStatePending State = iota + 1 // TaskStateAborted is the state for a task when its executor shuts down TaskStateAborted // TaskStateCancelled is the state for a task when its execution has request to be cancelled TaskStateCancelled // TaskStateAcked is the state for a task if it has been successfully completed TaskStateAcked // TaskStateNacked is the state for a task if it can not be processed TaskStateNacked )
type Task ¶
type Task interface { // Execute process this task Execute() error // HandleErr handle the error returned by Execute HandleErr(err error) error // IsRetryableError check whether to retry after HandleErr(Execute()) IsRetryableError(err error) bool // RetryPolicy returns the retry policy for task processing RetryPolicy() backoff.RetryPolicy // Abort marks the task as aborted, usually means task executor shutdown Abort() // Cancel marks the task as cancelled, usually by the task submitter Cancel() // Ack marks the task as successful completed Ack() // Nack marks the task as unsuccessful completed Nack(err error) // Reschedule marks the task for retry Reschedule() // State returns the current task state State() State }
Task is the interface for tasks which should be executed sequentially
type TaskChannelKeyFn ¶ added in v1.18.0
type TaskChannelKeyFn[T Task, K comparable] func(T) K
TaskChannelKeyFn is the function for mapping a task to its task channel (key)
type WeightedChannel ¶ added in v1.14.0
type WeightedChannel[T Task] struct { // contains filtered or unexported fields }
func NewWeightedChannel ¶ added in v1.14.0
type WeightedChannels ¶ added in v1.14.0
type WeightedChannels[T Task] []*WeightedChannel[T]
func (WeightedChannels[T]) Len ¶ added in v1.14.0
func (c WeightedChannels[T]) Len() int
func (WeightedChannels[T]) Less ¶ added in v1.14.0
func (c WeightedChannels[T]) Less(i, j int) bool
func (WeightedChannels[T]) Swap ¶ added in v1.14.0
func (c WeightedChannels[T]) Swap(i, j int)