tasks

package
v1.27.0-126.2
Warning

This package is not in the latest version of its module.

Published: Jan 11, 2025 License: MIT Imports: 18 Imported by: 0

Documentation

Overview

Package tasks is a generated GoMock package.

Index

Constants

const (
	WeightedChannelDefaultSize = 1000
)

Variables

var (
	PriorityHigh = getPriority(highPriorityClass, mediumPrioritySubclass)
	PriorityLow  = getPriority(lowPriorityClass, mediumPrioritySubclass)
)
var (
	PriorityName = map[Priority]string{
		PriorityHigh: "high",
		PriorityLow:  "low",
	}

	PriorityValue = map[string]Priority{
		"high": PriorityHigh,
		"low":  PriorityLow,
	}
)
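
The two maps are inverses of each other, so a priority can be turned into its name and parsed back. A minimal sketch of that round trip follows; the numeric values and the ParsePriority helper are assumptions made for illustration, since the real values come from the unexported getPriority.

```go
package main

import "fmt"

// Priority mirrors the package's `type Priority int`.
type Priority int

// Hypothetical values: the real ones are computed by the unexported getPriority.
const (
	PriorityHigh Priority = 1
	PriorityLow  Priority = 2
)

var PriorityName = map[Priority]string{
	PriorityHigh: "high",
	PriorityLow:  "low",
}

var PriorityValue = map[string]Priority{
	"high": PriorityHigh,
	"low":  PriorityLow,
}

// ParsePriority is a hypothetical helper that looks a name up in PriorityValue
// and reports whether the name was known.
func ParsePriority(name string) (Priority, bool) {
	p, ok := PriorityValue[name]
	return p, ok
}

func main() {
	p, ok := ParsePriority("high")
	fmt.Println(PriorityName[p], ok) // high true

	_, ok = ParsePriority("urgent")
	fmt.Println(ok) // false
}
```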

Functions

This section is empty.

Types

type ChannelWeightFn added in v1.18.0

type ChannelWeightFn[K comparable] func(K) int

ChannelWeightFn is the function for mapping a task channel (key) to its weight

type DynamicWorkerPoolLimiter added in v1.24.0

type DynamicWorkerPoolLimiter interface {
	// Dynamic concurrency limiter. Evaluated at submit time.
	Concurrency() int
	// Dynamic buffer size limiter. Evaluated at submit time.
	BufferSize() int
}

DynamicWorkerPoolLimiter provides dynamic limiters for DynamicWorkerPoolScheduler.
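
Because both limits are evaluated at submit time, an implementation can change them while the pool is running. The sketch below is one possible implementation backed by atomic values; the atomicLimiter type and its constructor are hypothetical, and the interface is redeclared locally so the example stands alone.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// limiter mirrors the two methods of DynamicWorkerPoolLimiter.
type limiter interface {
	Concurrency() int
	BufferSize() int
}

// atomicLimiter is a hypothetical implementation whose limits can be updated
// at runtime; each call returns the most recently stored value.
type atomicLimiter struct {
	concurrency atomic.Int64
	bufferSize  atomic.Int64
}

func (l *atomicLimiter) Concurrency() int { return int(l.concurrency.Load()) }
func (l *atomicLimiter) BufferSize() int  { return int(l.bufferSize.Load()) }

func newAtomicLimiter(concurrency, bufferSize int) *atomicLimiter {
	l := &atomicLimiter{}
	l.concurrency.Store(int64(concurrency))
	l.bufferSize.Store(int64(bufferSize))
	return l
}

func main() {
	l := newAtomicLimiter(4, 100)
	var _ limiter = l // compile-time check that the interface is satisfied

	fmt.Println(l.Concurrency(), l.BufferSize()) // 4 100
	l.concurrency.Store(8)                       // takes effect on the next call
	fmt.Println(l.Concurrency(), l.BufferSize()) // 8 100
}
```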

type DynamicWorkerPoolScheduler added in v1.24.0

type DynamicWorkerPoolScheduler struct {
	// contains filtered or unexported fields
}

DynamicWorkerPoolScheduler manages a pool of worker goroutines to execute Runnable instances. It limits the number of concurrently running workers and buffers tasks when that limit is reached. It limits the buffer size and rejects tasks when that limit is reached. New workers are created on-demand. Workers check for more tasks in the buffer after completing a task. If no tasks are available, the worker stops. The pool can be stopped, which aborts all buffered tasks.
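
The submit and worker behaviour described above can be sketched as follows. This is a simplified stand-in, not the package's implementation: the pool type, its fields, and the use of plain func() values in place of Runnable are assumptions, and shutdown handling is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a simplified illustration of an on-demand worker pool.
type pool struct {
	mu          sync.Mutex
	wg          sync.WaitGroup
	running     int
	buffer      []func()
	concurrency func() int // evaluated at submit time
	bufferSize  func() int // evaluated at submit time
}

// TrySubmit starts a worker if the concurrency limit allows it, otherwise
// buffers the task, and rejects it when the buffer is full.
func (p *pool) TrySubmit(task func()) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.running < p.concurrency() {
		p.running++
		p.wg.Add(1)
		go p.work(task)
		return true
	}
	if len(p.buffer) < p.bufferSize() {
		p.buffer = append(p.buffer, task)
		return true
	}
	return false
}

// work runs the given task, then keeps draining the buffer and stops once
// the buffer is empty.
func (p *pool) work(task func()) {
	defer p.wg.Done()
	for {
		task()
		p.mu.Lock()
		if len(p.buffer) == 0 {
			p.running--
			p.mu.Unlock()
			return
		}
		task = p.buffer[0]
		p.buffer = p.buffer[1:]
		p.mu.Unlock()
	}
}

// demo submits three blocking tasks to a pool with one worker and a buffer
// of one: the first runs, the second is buffered, the third is rejected.
func demo() (accepted [3]bool, executed int) {
	p := &pool{
		concurrency: func() int { return 1 },
		bufferSize:  func() int { return 1 },
	}
	release := make(chan struct{})
	var mu sync.Mutex
	task := func() {
		<-release
		mu.Lock()
		executed++
		mu.Unlock()
	}
	for i := range accepted {
		accepted[i] = p.TrySubmit(task)
	}
	close(release)
	p.wg.Wait()
	return accepted, executed
}

func main() {
	accepted, executed := demo()
	fmt.Println(accepted, executed) // [true true false] 2
}
```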

func NewDynamicWorkerPoolScheduler added in v1.24.0

func NewDynamicWorkerPoolScheduler(
	limiter DynamicWorkerPoolLimiter,
	metricsHandler metrics.Handler,
) *DynamicWorkerPoolScheduler

NewDynamicWorkerPoolScheduler creates a DynamicWorkerPoolScheduler with the given limiter.

func (*DynamicWorkerPoolScheduler) InitiateShutdown added in v1.24.0

func (pool *DynamicWorkerPoolScheduler) InitiateShutdown()

InitiateShutdown aborts all buffered tasks and empties the buffer.

func (*DynamicWorkerPoolScheduler) TrySubmit added in v1.24.0

func (pool *DynamicWorkerPoolScheduler) TrySubmit(task Runnable) bool

func (*DynamicWorkerPoolScheduler) WaitShutdown added in v1.24.0

func (pool *DynamicWorkerPoolScheduler) WaitShutdown()

WaitShutdown waits for all worker goroutines to complete.

type FIFOScheduler added in v1.18.0

type FIFOScheduler[T Task] struct {
	// contains filtered or unexported fields
}

func NewFIFOScheduler added in v1.18.0

func NewFIFOScheduler[T Task](
	options *FIFOSchedulerOptions,
	logger log.Logger,
) *FIFOScheduler[T]

NewFIFOScheduler creates a new FIFOScheduler

func (*FIFOScheduler[T]) Start added in v1.18.0

func (f *FIFOScheduler[T]) Start()

func (*FIFOScheduler[T]) Stop added in v1.18.0

func (f *FIFOScheduler[T]) Stop()

func (*FIFOScheduler[T]) Submit added in v1.18.0

func (f *FIFOScheduler[T]) Submit(task T)

func (*FIFOScheduler[T]) TrySubmit added in v1.18.0

func (f *FIFOScheduler[T]) TrySubmit(task T) bool

type FIFOSchedulerOptions added in v1.18.0

type FIFOSchedulerOptions struct {
	QueueSize   int
	WorkerCount dynamicconfig.TypedSubscribable[int]
}

FIFOSchedulerOptions is the configuration for FIFOScheduler.

type GroupByScheduler added in v1.24.0

type GroupByScheduler[K comparable, T Task] struct {
	// contains filtered or unexported fields
}

GroupByScheduler groups tasks based on a provided key function and submits that task for processing on a dedicated scheduler for that group.
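
A rough sketch of the grouping idea: compute a key per task, lazily create one scheduler per key, and hand the task's runnable to that scheduler. All names below are local stand-ins rather than the package's code; in particular, runnables are plain func() values and the per-group scheduler runs them inline.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// runnableScheduler is a local stand-in for a per-group scheduler.
type runnableScheduler interface {
	TrySubmit(run func()) bool
}

// inlineScheduler runs every runnable synchronously.
type inlineScheduler struct{}

func (inlineScheduler) TrySubmit(run func()) bool {
	run()
	return true
}

// groupBy routes each task to the scheduler that owns the task's key.
type groupBy[K comparable, T any] struct {
	mu               sync.Mutex
	keyFn            func(T) K
	runnableFactory  func(T) func()
	schedulerFactory func(K) runnableScheduler
	schedulers       map[K]runnableScheduler
}

func (g *groupBy[K, T]) TrySubmit(task T) bool {
	key := g.keyFn(task)
	g.mu.Lock()
	s, ok := g.schedulers[key]
	if !ok {
		// First task for this group: create its dedicated scheduler.
		s = g.schedulerFactory(key)
		g.schedulers[key] = s
	}
	g.mu.Unlock()
	return s.TrySubmit(g.runnableFactory(task))
}

// demo submits three tasks spread over two groups and reports how many
// schedulers were created and the order in which tasks ran.
func demo() (groups int, ran []string) {
	g := &groupBy[string, string]{
		keyFn:            func(t string) string { return strings.SplitN(t, ":", 2)[0] },
		runnableFactory:  func(t string) func() { return func() { ran = append(ran, t) } },
		schedulerFactory: func(string) runnableScheduler { return inlineScheduler{} },
		schedulers:       map[string]runnableScheduler{},
	}
	for _, t := range []string{"a:1", "b:1", "a:2"} {
		g.TrySubmit(t)
	}
	return len(g.schedulers), ran
}

func main() {
	groups, ran := demo()
	fmt.Println(groups, ran) // 2 [a:1 b:1 a:2]
}
```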

func NewGroupByScheduler added in v1.24.0

func NewGroupByScheduler[K comparable, T Task](options GroupBySchedulerOptions[K, T]) *GroupByScheduler[K, T]

NewGroupByScheduler creates a new GroupByScheduler from given options.

func (*GroupByScheduler[K, T]) Start added in v1.24.0

func (*GroupByScheduler[K, T]) Start()

func (*GroupByScheduler[K, T]) Stop added in v1.24.0

func (s *GroupByScheduler[K, T]) Stop()

Stop signals running tasks to stop, aborts any pending tasks and waits up to a minute for all running tasks to complete.

func (*GroupByScheduler[K, T]) Submit added in v1.24.0

func (s *GroupByScheduler[K, T]) Submit(task T)

func (*GroupByScheduler[K, T]) TrySubmit added in v1.24.0

func (s *GroupByScheduler[K, T]) TrySubmit(task T) bool

TrySubmit submits a task for processing. If called after the scheduler is shut down, the task will be accepted and aborted.

type GroupBySchedulerOptions added in v1.24.0

type GroupBySchedulerOptions[K comparable, T Task] struct {
	Logger log.Logger
	// A function to determine the group of a task.
	KeyFn func(T) K
	// Factory for creating a runnable from a task.
	RunnableFactory func(T) Runnable
	// When a new group is encountered, use this function to create a scheduler for that group.
	SchedulerFactory func(K) RunnableScheduler
}

GroupBySchedulerOptions are options for creating a GroupByScheduler.

type InterleavedWeightedRoundRobinScheduler added in v1.14.0

type InterleavedWeightedRoundRobinScheduler[T Task, K comparable] struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

InterleavedWeightedRoundRobinScheduler is an interleaved weighted round-robin scheduler implementation. Reference: https://en.wikipedia.org/wiki/Weighted_round_robin#Interleaved_WRR
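
To illustrate the interleaving, the sketch below computes one cycle of the textbook algorithm from the referenced article: a cycle has as many rounds as the largest weight, and a channel is served in round r only if r does not exceed its weight. The function and channel names are hypothetical, and the package's scheduler may visit channels in a different order.

```go
package main

import "fmt"

// iwrrCycle returns the order in which channels are served during one cycle
// of interleaved weighted round robin.
func iwrrCycle(names []string, weights []int) []string {
	maxWeight := 0
	for _, w := range weights {
		if w > maxWeight {
			maxWeight = w
		}
	}
	var order []string
	for round := 1; round <= maxWeight; round++ {
		for i, w := range weights {
			// A channel takes part in the first w rounds of the cycle.
			if round <= w {
				order = append(order, names[i])
			}
		}
	}
	return order
}

func main() {
	order := iwrrCycle([]string{"A", "B", "C"}, []int{3, 1, 2})
	fmt.Println(order) // [A B C A C A]
}
```

Each channel is served as many times per cycle as its weight, but the turns of a heavy channel are spread across rounds instead of being served back to back.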

func NewInterleavedWeightedRoundRobinScheduler added in v1.14.0

func NewInterleavedWeightedRoundRobinScheduler[T Task, K comparable](
	options InterleavedWeightedRoundRobinSchedulerOptions[T, K],
	fifoScheduler Scheduler[T],
	logger log.Logger,
) *InterleavedWeightedRoundRobinScheduler[T, K]

func (*InterleavedWeightedRoundRobinScheduler[T, K]) Start added in v1.14.0

func (s *InterleavedWeightedRoundRobinScheduler[T, K]) Start()

func (*InterleavedWeightedRoundRobinScheduler[T, K]) Stop added in v1.14.0

func (s *InterleavedWeightedRoundRobinScheduler[T, K]) Stop()

func (*InterleavedWeightedRoundRobinScheduler[T, K]) Submit added in v1.14.0

func (s *InterleavedWeightedRoundRobinScheduler[T, K]) Submit(
	task T,
)

func (*InterleavedWeightedRoundRobinScheduler[T, K]) TrySubmit added in v1.17.0

func (s *InterleavedWeightedRoundRobinScheduler[T, K]) TrySubmit(
	task T,
) bool

type InterleavedWeightedRoundRobinSchedulerOptions added in v1.14.0

type InterleavedWeightedRoundRobinSchedulerOptions[T Task, K comparable] struct {
	// Required for mapping a task to its corresponding task channel
	TaskChannelKeyFn TaskChannelKeyFn[T, K]
	// Required for getting the weight for a task channel
	ChannelWeightFn ChannelWeightFn[K]
	// Optional, if specified, re-evaluate task channel weight when channel is not empty
	ChannelWeightUpdateCh chan struct{}
	// Optional, if specified, delete inactive channels after this duration
	InactiveChannelDeletionDelay dynamicconfig.DurationPropertyFn
}

InterleavedWeightedRoundRobinSchedulerOptions is the config for interleaved weighted round robin scheduler

type MetricTagsFn added in v1.23.0

type MetricTagsFn[T Task] func(T) []metrics.Tag

type MockRunnable added in v1.24.0

type MockRunnable struct {
	// contains filtered or unexported fields
}

MockRunnable is a mock of Runnable interface.

func NewMockRunnable added in v1.24.0

func NewMockRunnable(ctrl *gomock.Controller) *MockRunnable

NewMockRunnable creates a new mock instance.

func (*MockRunnable) Abort added in v1.24.0

func (m *MockRunnable) Abort()

Abort mocks base method.

func (*MockRunnable) EXPECT added in v1.24.0

func (m *MockRunnable) EXPECT() *MockRunnableMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockRunnable) Run added in v1.24.0

func (m *MockRunnable) Run(arg0 context.Context)

Run mocks base method.

type MockRunnableMockRecorder added in v1.24.0

type MockRunnableMockRecorder struct {
	// contains filtered or unexported fields
}

MockRunnableMockRecorder is the mock recorder for MockRunnable.

func (*MockRunnableMockRecorder) Abort added in v1.24.0

func (mr *MockRunnableMockRecorder) Abort() *gomock.Call

Abort indicates an expected call of Abort.

func (*MockRunnableMockRecorder) Run added in v1.24.0

func (mr *MockRunnableMockRecorder) Run(arg0 any) *gomock.Call

Run indicates an expected call of Run.

type MockScheduler added in v1.14.0

type MockScheduler[T Task] struct {
	// contains filtered or unexported fields
}

MockScheduler is a mock of Scheduler interface.

func NewMockScheduler added in v1.14.0

func NewMockScheduler[T Task](ctrl *gomock.Controller) *MockScheduler[T]

NewMockScheduler creates a new mock instance.

func (*MockScheduler[T]) EXPECT added in v1.14.0

func (m *MockScheduler[T]) EXPECT() *MockSchedulerMockRecorder[T]

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockScheduler[T]) Start added in v1.14.0

func (m *MockScheduler[T]) Start()

Start mocks base method.

func (*MockScheduler[T]) Stop added in v1.14.0

func (m *MockScheduler[T]) Stop()

Stop mocks base method.

func (*MockScheduler[T]) Submit added in v1.14.0

func (m *MockScheduler[T]) Submit(task T)

Submit mocks base method.

func (*MockScheduler[T]) TrySubmit added in v1.17.0

func (m *MockScheduler[T]) TrySubmit(task T) bool

TrySubmit mocks base method.

type MockSchedulerMockRecorder added in v1.14.0

type MockSchedulerMockRecorder[T Task] struct {
	// contains filtered or unexported fields
}

MockSchedulerMockRecorder is the mock recorder for MockScheduler.

func (*MockSchedulerMockRecorder[T]) Start added in v1.14.0

func (mr *MockSchedulerMockRecorder[T]) Start() *gomock.Call

Start indicates an expected call of Start.

func (*MockSchedulerMockRecorder[T]) Stop added in v1.14.0

func (mr *MockSchedulerMockRecorder[T]) Stop() *gomock.Call

Stop indicates an expected call of Stop.

func (*MockSchedulerMockRecorder[T]) Submit added in v1.14.0

func (mr *MockSchedulerMockRecorder[T]) Submit(task any) *gomock.Call

Submit indicates an expected call of Submit.

func (*MockSchedulerMockRecorder[T]) TrySubmit added in v1.17.0

func (mr *MockSchedulerMockRecorder[T]) TrySubmit(task any) *gomock.Call

TrySubmit indicates an expected call of TrySubmit.

type MockTask added in v1.14.0

type MockTask struct {
	// contains filtered or unexported fields
}

MockTask is a mock of Task interface.

func NewMockTask added in v1.14.0

func NewMockTask(ctrl *gomock.Controller) *MockTask

NewMockTask creates a new mock instance.

func (*MockTask) Abort added in v1.21.0

func (m *MockTask) Abort()

Abort mocks base method.

func (*MockTask) Ack added in v1.14.0

func (m *MockTask) Ack()

Ack mocks base method.

func (*MockTask) Cancel added in v1.17.3

func (m *MockTask) Cancel()

Cancel mocks base method.

func (*MockTask) EXPECT added in v1.14.0

func (m *MockTask) EXPECT() *MockTaskMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockTask) Execute added in v1.14.0

func (m *MockTask) Execute() error

Execute mocks base method.

func (*MockTask) HandleErr added in v1.14.0

func (m *MockTask) HandleErr(err error) error

HandleErr mocks base method.

func (*MockTask) IsRetryableError added in v1.14.0

func (m *MockTask) IsRetryableError(err error) bool

IsRetryableError mocks base method.

func (*MockTask) Nack added in v1.14.0

func (m *MockTask) Nack(err error)

Nack mocks base method.

func (*MockTask) Reschedule added in v1.14.0

func (m *MockTask) Reschedule()

Reschedule mocks base method.

func (*MockTask) RetryPolicy added in v1.14.0

func (m *MockTask) RetryPolicy() backoff.RetryPolicy

RetryPolicy mocks base method.

func (*MockTask) State added in v1.14.0

func (m *MockTask) State() State

State mocks base method.

type MockTaskMockRecorder added in v1.14.0

type MockTaskMockRecorder struct {
	// contains filtered or unexported fields
}

MockTaskMockRecorder is the mock recorder for MockTask.

func (*MockTaskMockRecorder) Abort added in v1.21.0

func (mr *MockTaskMockRecorder) Abort() *gomock.Call

Abort indicates an expected call of Abort.

func (*MockTaskMockRecorder) Ack added in v1.14.0

func (mr *MockTaskMockRecorder) Ack() *gomock.Call

Ack indicates an expected call of Ack.

func (*MockTaskMockRecorder) Cancel added in v1.17.3

func (mr *MockTaskMockRecorder) Cancel() *gomock.Call

Cancel indicates an expected call of Cancel.

func (*MockTaskMockRecorder) Execute added in v1.14.0

func (mr *MockTaskMockRecorder) Execute() *gomock.Call

Execute indicates an expected call of Execute.

func (*MockTaskMockRecorder) HandleErr added in v1.14.0

func (mr *MockTaskMockRecorder) HandleErr(err any) *gomock.Call

HandleErr indicates an expected call of HandleErr.

func (*MockTaskMockRecorder) IsRetryableError added in v1.14.0

func (mr *MockTaskMockRecorder) IsRetryableError(err any) *gomock.Call

IsRetryableError indicates an expected call of IsRetryableError.

func (*MockTaskMockRecorder) Nack added in v1.14.0

func (mr *MockTaskMockRecorder) Nack(err any) *gomock.Call

Nack indicates an expected call of Nack.

func (*MockTaskMockRecorder) Reschedule added in v1.14.0

func (mr *MockTaskMockRecorder) Reschedule() *gomock.Call

Reschedule indicates an expected call of Reschedule.

func (*MockTaskMockRecorder) RetryPolicy added in v1.14.0

func (mr *MockTaskMockRecorder) RetryPolicy() *gomock.Call

RetryPolicy indicates an expected call of RetryPolicy.

func (*MockTaskMockRecorder) State added in v1.14.0

func (mr *MockTaskMockRecorder) State() *gomock.Call

State indicates an expected call of State.

type Priority added in v1.17.0

type Priority int

func (Priority) String added in v1.17.0

func (p Priority) String() string

type QuotaRequestFn added in v1.23.0

type QuotaRequestFn[T Task] func(T) quotas.Request

type RateLimitedScheduler added in v1.23.0

type RateLimitedScheduler[T Task] struct {
	// contains filtered or unexported fields
}

func NewRateLimitedScheduler added in v1.23.0

func NewRateLimitedScheduler[T Task](
	scheduler Scheduler[T],
	rateLimiter quotas.RequestRateLimiter,
	timeSource clock.TimeSource,
	quotaRequestFn QuotaRequestFn[T],
	metricTagsFn MetricTagsFn[T],
	options RateLimitedSchedulerOptions,
	logger log.Logger,
	metricsHandler metrics.Handler,
) *RateLimitedScheduler[T]

func (*RateLimitedScheduler[T]) Start added in v1.23.0

func (s *RateLimitedScheduler[T]) Start()

func (*RateLimitedScheduler[T]) Stop added in v1.23.0

func (s *RateLimitedScheduler[T]) Stop()

func (*RateLimitedScheduler[T]) Submit added in v1.23.0

func (s *RateLimitedScheduler[T]) Submit(task T)

func (*RateLimitedScheduler[T]) TrySubmit added in v1.23.0

func (s *RateLimitedScheduler[T]) TrySubmit(task T) bool

type RateLimitedSchedulerOptions added in v1.23.0

type RateLimitedSchedulerOptions struct {
	EnableShadowMode bool
}

type RateLimitedTaskRunnable added in v1.24.0

type RateLimitedTaskRunnable struct {
	Runnable
	Limiter quotas.RateLimiter
	// contains filtered or unexported fields
}

RateLimitedTaskRunnable wraps a Runnable with a rate limiter.

func NewRateLimitedTaskRunnableFromTask added in v1.24.0

func NewRateLimitedTaskRunnableFromTask(
	task Task,
	limiter quotas.RateLimiter,
	metricsHandler metrics.Handler,
) RateLimitedTaskRunnable

NewRateLimitedTaskRunnableFromTask creates a RateLimitedTaskRunnable from a Task and a quotas.RateLimiter.

func (RateLimitedTaskRunnable) Run added in v1.24.0

Run the embedded Runnable, applying the rate limiter.
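
One plausible reading of this wrapper is sketched below: wait on the limiter first, abort the runnable if the wait fails, and otherwise run it. The waiter interface (a single Wait method), the rateLimited type, and the test doubles are assumptions for illustration and are not taken from the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// runnable mirrors the package's Runnable interface.
type runnable interface {
	Run(context.Context)
	Abort()
}

// waiter is an assumed minimal view of a rate limiter.
type waiter interface {
	Wait(ctx context.Context) error
}

// rateLimited wraps a runnable and gates Run on the limiter.
type rateLimited struct {
	runnable
	limiter waiter
}

func (r rateLimited) Run(ctx context.Context) {
	if err := r.limiter.Wait(ctx); err != nil {
		// The wait failed (for example, the context ended): give up.
		r.Abort()
		return
	}
	r.runnable.Run(ctx)
}

// recorder notes which method was called.
type recorder struct{ events []string }

func (r *recorder) Run(context.Context) { r.events = append(r.events, "run") }
func (r *recorder) Abort()              { r.events = append(r.events, "abort") }

// fixedLimiter returns the same result from every Wait call.
type fixedLimiter struct{ err error }

func (l fixedLimiter) Wait(context.Context) error { return l.err }

// demo runs once with a permissive limiter and once with a failing one.
func demo() []string {
	rec := &recorder{}
	allowed := rateLimited{runnable: rec, limiter: fixedLimiter{}}
	denied := rateLimited{runnable: rec, limiter: fixedLimiter{err: errors.New("denied")}}
	allowed.Run(context.Background())
	denied.Run(context.Background())
	return rec.events
}

func main() {
	fmt.Println(demo()) // [run abort]
}
```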

type Runnable added in v1.24.0

type Runnable interface {
	// Run and handle errors, abort on context error.
	Run(context.Context)
	// Abort marks the task as aborted, usually because the task scheduler is shutting down.
	Abort()
}

type RunnableScheduler added in v1.24.0

type RunnableScheduler interface {
	// InitiateShutdown signals the scheduler to stop without waiting for shutdown to complete.
	InitiateShutdown()
	// WaitShutdown waits for the scheduler to complete shutdown. Must be called after InitiateShutdown().
	WaitShutdown()
	// TrySubmit submits a Runnable for scheduling. If the scheduler is already stopped, the runnable will be aborted.
	// Returns a boolean indicating whether the task was accepted.
	TrySubmit(Runnable) bool
}

RunnableScheduler is a scheduler for Runnable tasks.

type RunnableTask added in v1.24.0

type RunnableTask struct {
	Task
}

RunnableTask turns a Task into a Runnable. Does **not** retry tasks.

func (RunnableTask) Run added in v1.24.0

func (a RunnableTask) Run(ctx context.Context)

Run the embedded task, handling errors and aborting on context errors.
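
A minimal sketch of a single, non-retrying run, assuming the outcome is mapped as follows: no error leads to Ack, an error while the context is done leads to Abort, and any other error leads to Nack. The runOnce function, the reduced task interface, and fakeTask are hypothetical, and the package's actual error handling may differ.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// task lists only the Task methods this sketch needs.
type task interface {
	Execute() error
	HandleErr(err error) error
	Ack()
	Nack(err error)
	Abort()
}

// runOnce executes the task exactly once and records the outcome on it.
func runOnce(ctx context.Context, t task) {
	err := t.HandleErr(t.Execute())
	switch {
	case err == nil:
		t.Ack()
	case ctx.Err() != nil:
		t.Abort()
	default:
		t.Nack(err)
	}
}

// fakeTask returns a fixed result and remembers how it was completed.
type fakeTask struct {
	result error
	state  string
}

func (f *fakeTask) Execute() error            { return f.result }
func (f *fakeTask) HandleErr(err error) error { return err }
func (f *fakeTask) Ack()                      { f.state = "acked" }
func (f *fakeTask) Nack(error)                { f.state = "nacked" }
func (f *fakeTask) Abort()                    { f.state = "aborted" }

// demoOutcomes covers success, failure, and failure after cancellation.
func demoOutcomes() [3]string {
	live := context.Background()
	cancelled, cancel := context.WithCancel(live)
	cancel()
	boom := errors.New("boom")

	run := func(ctx context.Context, result error) string {
		t := &fakeTask{result: result}
		runOnce(ctx, t)
		return t.state
	}
	return [3]string{run(live, nil), run(live, boom), run(cancelled, boom)}
}

func main() {
	fmt.Println(demoOutcomes()) // [acked nacked aborted]
}
```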

type Scheduler added in v1.14.0

type Scheduler[T Task] interface {
	Submit(task T)
	TrySubmit(task T) bool
	Start()
	Stop()
}

Scheduler is the generic interface for scheduling & processing tasks

type SequentialScheduler added in v1.21.0

type SequentialScheduler[T Task] struct {
	// contains filtered or unexported fields
}

func NewSequentialScheduler added in v1.21.0

func NewSequentialScheduler[T Task](
	options *SequentialSchedulerOptions,
	taskQueueHashFn collection.HashFunc,
	taskQueueFactory SequentialTaskQueueFactory[T],
	logger log.Logger,
) *SequentialScheduler[T]

func (*SequentialScheduler[T]) Start added in v1.21.0

func (s *SequentialScheduler[T]) Start()

func (*SequentialScheduler[T]) Stop added in v1.21.0

func (s *SequentialScheduler[T]) Stop()

func (*SequentialScheduler[T]) Submit added in v1.21.0

func (s *SequentialScheduler[T]) Submit(task T)

func (*SequentialScheduler[T]) TrySubmit added in v1.21.0

func (s *SequentialScheduler[T]) TrySubmit(task T) bool

type SequentialSchedulerOptions added in v1.21.0

type SequentialSchedulerOptions struct {
	QueueSize   int
	WorkerCount dynamicconfig.TypedSubscribable[int]
}

type SequentialTaskQueue added in v1.21.0

type SequentialTaskQueue[T Task] interface {
	// ID returns the ID of the queue, which is also the ID shared by the tasks inside it
	ID() interface{}
	// Add pushes a task to the task set
	Add(T)
	// Remove pops a task from the task set
	Remove() T
	// IsEmpty indicates whether the task set is empty
	IsEmpty() bool
	// Len returns the size of the queue
	Len() int
}
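
As an illustration, a slice-backed FIFO queue satisfies this method set. The sketch redeclares the interface locally without the Task constraint so it stands alone; sliceQueue and drain are hypothetical names, and returning the zero value from Remove on an empty queue is a choice made here, not documented behaviour.

```go
package main

import "fmt"

// sequentialTaskQueue mirrors the documented method set.
type sequentialTaskQueue[T any] interface {
	ID() interface{}
	Add(T)
	Remove() T
	IsEmpty() bool
	Len() int
}

// sliceQueue is a hypothetical FIFO implementation backed by a slice.
type sliceQueue[T any] struct {
	id    string
	items []T
}

func (q *sliceQueue[T]) ID() interface{} { return q.id }
func (q *sliceQueue[T]) Add(t T)         { q.items = append(q.items, t) }
func (q *sliceQueue[T]) IsEmpty() bool   { return len(q.items) == 0 }
func (q *sliceQueue[T]) Len() int        { return len(q.items) }

// Remove pops the oldest task, or returns the zero value if the queue is empty.
func (q *sliceQueue[T]) Remove() T {
	var zero T
	if len(q.items) == 0 {
		return zero
	}
	t := q.items[0]
	q.items[0] = zero // drop the reference so it can be collected
	q.items = q.items[1:]
	return t
}

// drain removes tasks one at a time until the queue is empty.
func drain[T any](q sequentialTaskQueue[T]) []T {
	var out []T
	for !q.IsEmpty() {
		out = append(out, q.Remove())
	}
	return out
}

func demo() []string {
	q := &sliceQueue[string]{id: "queue-1"}
	q.Add("first")
	q.Add("second")
	return drain[string](q)
}

func main() {
	fmt.Println(demo()) // [first second]
}
```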

type SequentialTaskQueueFactory added in v1.21.0

type SequentialTaskQueueFactory[T Task] func(task T) SequentialTaskQueue[T]

type State added in v1.14.0

type State int

State represents the current state of a task

const (
	// TaskStatePending is the state for a task when it's waiting to be processed or currently being processed
	TaskStatePending State = iota + 1
	// TaskStateAborted is the state for a task when its executor shuts down
	TaskStateAborted
	// TaskStateCancelled is the state for a task when its execution has been requested to be cancelled
	TaskStateCancelled
	// TaskStateAcked is the state for a task if it has been successfully completed
	TaskStateAcked
	// TaskStateNacked is the state for a task if it cannot be processed
	TaskStateNacked
)
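
Because the constants start at iota + 1, the zero value of State does not correspond to any named state. The sketch below prints the resulting numeric values; stateName is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// State and its constants are copied from the declaration above.
type State int

const (
	TaskStatePending State = iota + 1
	TaskStateAborted
	TaskStateCancelled
	TaskStateAcked
	TaskStateNacked
)

// stateName is a hypothetical helper that names a state for logging.
func stateName(s State) string {
	switch s {
	case TaskStatePending:
		return "pending"
	case TaskStateAborted:
		return "aborted"
	case TaskStateCancelled:
		return "cancelled"
	case TaskStateAcked:
		return "acked"
	case TaskStateNacked:
		return "nacked"
	default:
		return "unknown"
	}
}

func main() {
	var zero State
	fmt.Println(int(TaskStatePending), int(TaskStateNacked)) // 1 5
	fmt.Println(stateName(zero))                             // unknown
}
```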

type Task

type Task interface {
	// Execute processes this task
	Execute() error
	// HandleErr handles the error returned by Execute
	HandleErr(err error) error
	// IsRetryableError checks whether to retry after HandleErr(Execute())
	IsRetryableError(err error) bool
	// RetryPolicy returns the retry policy for task processing
	RetryPolicy() backoff.RetryPolicy
	// Abort marks the task as aborted, usually because the task executor is shutting down
	Abort()
	// Cancel marks the task as cancelled, usually by the task submitter
	Cancel()
	// Ack marks the task as successfully completed
	Ack()
	// Nack marks the task as unsuccessfully completed
	Nack(err error)
	// Reschedule marks the task for retry
	Reschedule()
	// State returns the current task state
	State() State
}

Task is the interface for tasks which should be executed sequentially

type TaskChannelKeyFn added in v1.18.0

type TaskChannelKeyFn[T Task, K comparable] func(T) K

TaskChannelKeyFn is the function for mapping a task to its task channel (key)
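
A short sketch of how a key function and a weight function compose: the key function maps a task to its channel key, and the weight function maps that key to a weight. The function types are redeclared locally without the Task constraint, and demoTask, the priority strings, and the weights are made up for the example.

```go
package main

import "fmt"

// Local mirrors of the two function types.
type taskChannelKeyFn[T any, K comparable] func(T) K
type channelWeightFn[K comparable] func(K) int

// demoTask is a hypothetical task carrying a priority label.
type demoTask struct {
	id       int
	priority string
}

// weightOf resolves a task to its channel key and then to the channel weight.
func weightOf[T any, K comparable](t T, keyFn taskChannelKeyFn[T, K], weightFn channelWeightFn[K]) int {
	return weightFn(keyFn(t))
}

// keyByPriority uses the priority label as the channel key.
var keyByPriority taskChannelKeyFn[demoTask, string] = func(t demoTask) string {
	return t.priority
}

// weightByKey gives the "high" channel more turns; unknown keys get weight 1.
var weightByKey channelWeightFn[string] = func(key string) int {
	if key == "high" {
		return 5
	}
	return 1
}

func main() {
	fmt.Println(weightOf(demoTask{id: 1, priority: "high"}, keyByPriority, weightByKey)) // 5
	fmt.Println(weightOf(demoTask{id: 2, priority: "low"}, keyByPriority, weightByKey))  // 1
}
```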

type WeightedChannel added in v1.14.0

type WeightedChannel[T Task] struct {
	// contains filtered or unexported fields
}

func NewWeightedChannel added in v1.14.0

func NewWeightedChannel[T Task](
	weight int,
	size int,
	now time.Time,
) *WeightedChannel[T]

func (*WeightedChannel[T]) Cap added in v1.14.0

func (c *WeightedChannel[T]) Cap() int

func (*WeightedChannel[T]) Chan added in v1.14.0

func (c *WeightedChannel[T]) Chan() chan T

func (*WeightedChannel[T]) DecrementRefCount added in v1.26.2

func (c *WeightedChannel[T]) DecrementRefCount()

func (*WeightedChannel[T]) IncrementRefCount added in v1.26.2

func (c *WeightedChannel[T]) IncrementRefCount()

func (*WeightedChannel[T]) LastActiveTime added in v1.26.2

func (c *WeightedChannel[T]) LastActiveTime() time.Time

func (*WeightedChannel[T]) Len added in v1.14.0

func (c *WeightedChannel[T]) Len() int

func (*WeightedChannel[T]) RefCount added in v1.26.2

func (c *WeightedChannel[T]) RefCount() int32

func (*WeightedChannel[T]) SetWeight added in v1.18.0

func (c *WeightedChannel[T]) SetWeight(newWeight int)

func (*WeightedChannel[T]) UpdateLastActiveTime added in v1.26.2

func (c *WeightedChannel[T]) UpdateLastActiveTime(now time.Time)

func (*WeightedChannel[T]) Weight added in v1.14.0

func (c *WeightedChannel[T]) Weight() int

type WeightedChannels added in v1.14.0

type WeightedChannels[T Task] []*WeightedChannel[T]

func (WeightedChannels[T]) Len added in v1.14.0

func (c WeightedChannels[T]) Len() int

func (WeightedChannels[T]) Less added in v1.14.0

func (c WeightedChannels[T]) Less(i, j int) bool

func (WeightedChannels[T]) Swap added in v1.14.0

func (c WeightedChannels[T]) Swap(i, j int)
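
Len, Less, and Swap together form the method set of sort.Interface, so a WeightedChannels value can be passed to sort.Sort. The sketch below uses local stand-in types and assumes Less orders by ascending weight; the page does not state the actual ordering.

```go
package main

import (
	"fmt"
	"sort"
)

// weightedChannel is a local stand-in holding only a name and a weight.
type weightedChannel struct {
	name   string
	weight int
}

type weightedChannels []*weightedChannel

func (c weightedChannels) Len() int      { return len(c) }
func (c weightedChannels) Swap(i, j int) { c[i], c[j] = c[j], c[i] }

// Less orders by ascending weight (an assumption for this sketch).
func (c weightedChannels) Less(i, j int) bool { return c[i].weight < c[j].weight }

// sortedNames sorts the channels in place and returns their names in order.
func sortedNames(c weightedChannels) []string {
	sort.Sort(c)
	names := make([]string, 0, len(c))
	for _, ch := range c {
		names = append(names, ch.name)
	}
	return names
}

func main() {
	channels := weightedChannels{
		{name: "high", weight: 3},
		{name: "low", weight: 1},
		{name: "mid", weight: 2},
	}
	fmt.Println(sortedNames(channels)) // [low mid high]
}
```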
