queues

package
v1.26.0-120.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 27, 2024 License: MIT Imports: 45 Imported by: 0

Documentation

Overview

Package queues is a generated GoMock package.

Package queues is a generated GoMock package.

Package queues is a generated GoMock package.

Package queues is a generated GoMock package.

Index

Constants

View Source
const (
	DefaultReaderId = common.DefaultQueueReaderID
)

Variables

View Source
var (
	ErrSendTaskToDLQ      = errors.New("failed to send task to DLQ")
	ErrCreateDLQ          = errors.New("failed to create DLQ")
	ErrGetClusterMetadata = errors.New("failed to get cluster metadata")
)
View Source
var (
	ErrTerminalTaskFailure = errors.New("original task failed and this task is now to send the original to the DLQ")
)
View Source
var (
	NoopReaderCompletionFn = func(_ int64) {}
)

Functions

func EstimateTaskMetricTag added in v1.23.0

func EstimateTaskMetricTag(
	e Executable,
	namespaceRegistry namespace.Registry,
	currentClusterName string,
) []metrics.Tag

func FromPersistenceAndPredicate added in v1.17.3

func FromPersistenceAndPredicate(
	attributes *persistencespb.AndPredicateAttributes,
) tasks.Predicate

func FromPersistenceDestinationPredicate added in v1.24.0

func FromPersistenceDestinationPredicate(
	attributes *persistencespb.DestinationPredicateAttributes,
) tasks.Predicate

func FromPersistenceEmptyPredicate added in v1.17.3

func FromPersistenceEmptyPredicate(
	_ *persistencespb.EmptyPredicateAttributes,
) tasks.Predicate

func FromPersistenceNamespaceIDPredicate added in v1.17.3

func FromPersistenceNamespaceIDPredicate(
	attributes *persistencespb.NamespaceIdPredicateAttributes,
) tasks.Predicate

func FromPersistenceNotPredicate added in v1.17.3

func FromPersistenceNotPredicate(
	attributes *persistencespb.NotPredicateAttributes,
) tasks.Predicate

func FromPersistenceOrPredicate added in v1.17.3

func FromPersistenceOrPredicate(
	attributes *persistencespb.OrPredicateAttributes,
) tasks.Predicate

func FromPersistenceOutboundTaskGroupPredicate added in v1.25.0

func FromPersistenceOutboundTaskGroupPredicate(
	attributes *persistencespb.OutboundTaskGroupPredicateAttributes,
) tasks.Predicate

func FromPersistenceOutboundTaskPredicate added in v1.25.0

func FromPersistenceOutboundTaskPredicate(
	attributes *persistencespb.OutboundTaskPredicateAttributes,
) tasks.Predicate

func FromPersistencePredicate added in v1.17.3

func FromPersistencePredicate(
	predicate *persistencespb.Predicate,
) tasks.Predicate

func FromPersistenceQueueState added in v1.17.3

func FromPersistenceQueueState(
	state *persistencespb.QueueState,
) *queueState

func FromPersistenceTaskKey added in v1.17.3

func FromPersistenceTaskKey(
	key *persistencespb.TaskKey,
) tasks.Key

func FromPersistenceTaskTypePredicate added in v1.17.3

func FromPersistenceTaskTypePredicate(
	attributes *persistencespb.TaskTypePredicateAttributes,
) tasks.Predicate

func FromPersistenceUniversalPredicate added in v1.17.3

func FromPersistenceUniversalPredicate(
	_ *persistencespb.UniversalPredicateAttributes,
) tasks.Predicate

func GetActiveTimerTaskTypeTagValue added in v1.17.0

func GetActiveTimerTaskTypeTagValue(
	executable Executable,
) string

func GetActiveTransferTaskTypeTagValue added in v1.17.0

func GetActiveTransferTaskTypeTagValue(
	task tasks.Task,
) string

func GetArchivalTaskTypeTagValue added in v1.19.0

func GetArchivalTaskTypeTagValue(
	task tasks.Task,
) string

func GetOutboundTaskTypeTagValue added in v1.25.0

func GetOutboundTaskTypeTagValue(task tasks.Task, isActive bool) string

func GetStandbyTimerTaskTypeTagValue added in v1.17.0

func GetStandbyTimerTaskTypeTagValue(
	task tasks.Task,
) string

func GetStandbyTransferTaskTypeTagValue added in v1.17.0

func GetStandbyTransferTaskTypeTagValue(
	task tasks.Task,
) string

func GetTimerStateMachineTaskTypeTagValue added in v1.25.0

func GetTimerStateMachineTaskTypeTagValue(taskType string, isActive bool) string

func GetVisibilityTaskTypeTagValue added in v1.17.0

func GetVisibilityTaskTypeTagValue(
	task tasks.Task,
) string

func IsTaskAcked added in v1.18.0

func IsTaskAcked(
	task tasks.Task,
	persistenceQueueState *persistencespb.QueueState,
) bool

func IsTimeExpired added in v1.18.0

func IsTimeExpired(
	referenceTime time.Time,
	testingTime time.Time,
) bool

IsTimeExpired checks if the testing time is equal or before the reference time. The precision of the comparison is millisecond.

func NewExecutableFactory added in v1.23.0

func NewExecutableFactory(
	executor Executor,
	scheduler Scheduler,
	rescheduler Rescheduler,
	priorityAssigner PriorityAssigner,
	timeSource clock.TimeSource,
	namespaceRegistry namespace.Registry,
	clusterMetadata cluster.Metadata,
	logger log.Logger,
	metricsHandler metrics.Handler,
	dlqWriter *DLQWriter,
	dlqEnabled dynamicconfig.BoolPropertyFn,
	attemptsBeforeSendingToDlq dynamicconfig.IntPropertyFn,
	dlqInternalErrors dynamicconfig.BoolPropertyFn,
	dlqErrorPattern dynamicconfig.StringPropertyFn,
) *executableFactoryImpl

func NewImmediateQueue added in v1.17.3

func NewImmediateQueue(
	shard hshard.Context,
	category tasks.Category,
	scheduler Scheduler,
	rescheduler Rescheduler,
	options *Options,
	hostRateLimiter quotas.RequestRateLimiter,
	grouper Grouper,
	logger log.Logger,
	metricsHandler metrics.Handler,
	factory ExecutableFactory,
) *immediateQueue

func NewRandomKey added in v1.17.0

func NewRandomKey() tasks.Key

func NewRandomKeyInRange added in v1.17.0

func NewRandomKeyInRange(
	r Range,
) tasks.Key

func NewReaderPriorityRateLimiter added in v1.18.0

func NewReaderPriorityRateLimiter(
	rateFn quotas.RateFn,
	maxReaders int64,
) quotas.RequestRateLimiter

func NewRescheduler added in v1.17.0

func NewRescheduler(
	scheduler Scheduler,
	timeSource clock.TimeSource,
	logger log.Logger,
	metricsHandler metrics.Handler,
) *reschedulerImpl

func NewScheduledQueue added in v1.17.3

func NewScheduledQueue(
	shard hshard.Context,
	category tasks.Category,
	scheduler Scheduler,
	rescheduler Rescheduler,
	executableFactory ExecutableFactory,
	options *Options,
	hostRateLimiter quotas.RequestRateLimiter,
	logger log.Logger,
	metricsHandler metrics.Handler,
) *scheduledQueue

func ToPersistenceAndPredicate added in v1.17.3

func ToPersistenceAndPredicate(
	andPredicate *predicates.AndImpl[tasks.Task],
) *persistencespb.Predicate

func ToPersistenceDestinationPredicate added in v1.24.0

func ToPersistenceDestinationPredicate(
	taskDestinationPredicate *tasks.DestinationPredicate,
) *persistencespb.Predicate

func ToPersistenceEmptyPredicate added in v1.17.3

func ToPersistenceEmptyPredicate(
	_ *predicates.EmptyImpl[tasks.Task],
) *persistencespb.Predicate

func ToPersistenceNamespaceIDPredicate added in v1.17.3

func ToPersistenceNamespaceIDPredicate(
	namespaceIDPredicate *tasks.NamespacePredicate,
) *persistencespb.Predicate

func ToPersistenceNotPredicate added in v1.17.3

func ToPersistenceNotPredicate(
	notPredicate *predicates.NotImpl[tasks.Task],
) *persistencespb.Predicate

func ToPersistenceOrPredicate added in v1.17.3

func ToPersistenceOrPredicate(
	orPredicate *predicates.OrImpl[tasks.Task],
) *persistencespb.Predicate

func ToPersistenceOutboundTaskGroupPredicate added in v1.25.0

func ToPersistenceOutboundTaskGroupPredicate(
	pred *tasks.OutboundTaskGroupPredicate,
) *persistencespb.Predicate

func ToPersistenceOutboundTaskPredicate added in v1.25.0

func ToPersistenceOutboundTaskPredicate(
	pred *tasks.OutboundTaskPredicate,
) *persistencespb.Predicate

func ToPersistencePredicate added in v1.17.3

func ToPersistencePredicate(
	predicate tasks.Predicate,
) *persistencespb.Predicate

func ToPersistenceQueueState added in v1.17.3

func ToPersistenceQueueState(
	queueState *queueState,
) *persistencespb.QueueState

func ToPersistenceRange added in v1.17.3

func ToPersistenceRange(
	r Range,
) *persistencespb.QueueSliceRange

func ToPersistenceScope added in v1.17.3

func ToPersistenceScope(
	scope Scope,
) *persistencespb.QueueSliceScope

func ToPersistenceTaskKey added in v1.17.3

func ToPersistenceTaskKey(
	key tasks.Key,
) *persistencespb.TaskKey

func ToPersistenceTaskTypePredicate added in v1.17.3

func ToPersistenceTaskTypePredicate(
	taskTypePredicate *tasks.TypePredicate,
) *persistencespb.Predicate

func ToPersistenceUniversalPredicate added in v1.17.3

func ToPersistenceUniversalPredicate(
	_ *predicates.UniversalImpl[tasks.Task],
) *persistencespb.Predicate

Types

type Action added in v1.18.0

type Action interface {
	Name() string
	Run(*ReaderGroup)
}

Action is a set of operations that can be run on a ReaderGroup. It is created and run by Mitigator upon receiving an Alert.

type Alert added in v1.18.0

type Alert struct {
	AlertType                            AlertType
	AlertAttributesQueuePendingTaskCount *AlertAttributesQueuePendingTaskCount
	AlertAttributesReaderStuck           *AlertAttributesReaderStuck
	AlertAttributesSliceCount            *AlertAttributesSlicesCount
}

Alert is created by a Monitor when some statistics of the Queue is abnormal

type AlertAttributesQueuePendingTaskCount added in v1.18.0

type AlertAttributesQueuePendingTaskCount struct {
	CurrentPendingTaskCount   int
	CiriticalPendingTaskCount int
}

type AlertAttributesReaderStuck added in v1.18.0

type AlertAttributesReaderStuck struct {
	ReaderID         int64
	CurrentWatermark tasks.Key
}

type AlertAttributesSlicesCount added in v1.18.0

type AlertAttributesSlicesCount struct {
	CurrentSliceCount  int
	CriticalSliceCount int
}

type AlertType added in v1.18.0

type AlertType int
const (
	AlertTypeUnspecified AlertType = iota
	AlertTypeQueuePendingTaskCount
	AlertTypeReaderStuck
	AlertTypeSliceCount
)

type ChannelWeightFn added in v1.18.0

type ChannelWeightFn = tasks.ChannelWeightFn[TaskChannelKey]

type CircuitBreakerExecutable added in v1.24.0

type CircuitBreakerExecutable struct {
	Executable
	// contains filtered or unexported fields
}

CircuitBreakerExecutable wraps Executable with a circuit breaker. If the executable returns DestinationDownError, it will signal the circuit breaker of failure, and return the inner error.

func NewCircuitBreakerExecutable added in v1.24.0

func NewCircuitBreakerExecutable(
	e Executable,
	cb circuitbreaker.TwoStepCircuitBreaker,
	metricsHandler metrics.Handler,
) *CircuitBreakerExecutable

func (*CircuitBreakerExecutable) Execute added in v1.24.0

func (e *CircuitBreakerExecutable) Execute() error

This is roughly the same implementation of the `gobreaker.CircuitBreaker.Execute` function, but checks if the error is `DestinationDownError` to report success, and unwrap it.

type CommonSchedulerWrapper added in v1.24.0

type CommonSchedulerWrapper struct {
	tasks.Scheduler[Executable]
	TaskKeyFn func(e Executable) TaskChannelKey
}

CommonSchedulerWrapper is an adapter that converts a common [task.Scheduler] to a Scheduler with an injectable TaskChannelKeyFn.

func (*CommonSchedulerWrapper) TaskChannelKeyFn added in v1.24.0

func (s *CommonSchedulerWrapper) TaskChannelKeyFn() TaskChannelKeyFn

type DLQWriter added in v1.23.0

type DLQWriter struct {
	// contains filtered or unexported fields
}

DLQWriter can be used to write tasks to the DLQ.

func NewDLQWriter added in v1.23.0

NewDLQWriter returns a DLQ which will write to the given QueueWriter.

func (*DLQWriter) WriteTaskToDLQ added in v1.23.0

func (q *DLQWriter) WriteTaskToDLQ(ctx context.Context, sourceCluster, targetCluster string, task tasks.Task) error

WriteTaskToDLQ writes a task to the DLQ, creating the underlying queue if it doesn't already exist.

type DestinationDownError added in v1.24.0

type DestinationDownError struct {
	Message string
	// contains filtered or unexported fields
}

DestinationDownError indicates the destination is down and wraps another error. It is a useful specific error that can be used, for example, in a circuit breaker to distinguish when a destination service is down and an internal error.

func NewDestinationDownError added in v1.24.0

func NewDestinationDownError(msg string, err error) *DestinationDownError

func (*DestinationDownError) Error added in v1.24.0

func (e *DestinationDownError) Error() string

func (*DestinationDownError) Unwrap added in v1.24.0

func (e *DestinationDownError) Unwrap() error

type Executable added in v1.17.0

type Executable interface {
	ctasks.Task
	tasks.Task

	Attempt() int
	GetTask() tasks.Task
	GetPriority() ctasks.Priority
	GetScheduledTime() time.Time
	SetScheduledTime(time.Time)
}

func NewExecutable added in v1.17.0

func NewExecutable(
	readerID int64,
	task tasks.Task,
	executor Executor,
	scheduler Scheduler,
	rescheduler Rescheduler,
	priorityAssigner PriorityAssigner,
	timeSource clock.TimeSource,
	namespaceRegistry namespace.Registry,
	clusterMetadata cluster.Metadata,
	logger log.Logger,
	metricsHandler metrics.Handler,
	opts ...ExecutableOption,
) Executable

type ExecutableFactory added in v1.23.0

type ExecutableFactory interface {
	NewExecutable(task tasks.Task, readerID int64) Executable
}

ExecutableFactory and the related interfaces here are needed to make it possible to extend the functionality of task processing without changing its core logic. This is similar to ExecutorWrapper.

type ExecutableFactoryFn added in v1.23.0

type ExecutableFactoryFn func(readerID int64, t tasks.Task) Executable

ExecutableFactoryFn is a convenience type to avoid having to create a struct that implements ExecutableFactory.

func (ExecutableFactoryFn) NewExecutable added in v1.23.0

func (f ExecutableFactoryFn) NewExecutable(task tasks.Task, readerID int64) Executable

type ExecutableOption added in v1.23.0

type ExecutableOption func(*ExecutableParams)

type ExecutableParams added in v1.23.0

type ExecutableParams struct {
	DLQEnabled                 dynamicconfig.BoolPropertyFn
	DLQWriter                  *DLQWriter
	MaxUnexpectedErrorAttempts dynamicconfig.IntPropertyFn
	DLQInternalErrors          dynamicconfig.BoolPropertyFn
	DLQErrorPattern            dynamicconfig.StringPropertyFn
}

type ExecuteResponse added in v1.23.0

type ExecuteResponse struct {
	// Following two fields are metadata of the execution
	// and should be populated by the executor even
	// when the actual task execution fails
	ExecutionMetricTags []metrics.Tag
	ExecutedAsActive    bool

	ExecutionErr error
}

type Executor added in v1.17.0

type Executor interface {
	Execute(context.Context, Executable) ExecuteResponse
}

func NewActiveStandbyExecutor added in v1.22.0

func NewActiveStandbyExecutor(
	currentClusterName string,
	registry namespace.Registry,
	activeExecutor Executor,
	standbyExecutor Executor,
	logger log.Logger,
) Executor

type ExecutorWrapper added in v1.22.0

type ExecutorWrapper interface {
	Wrap(delegate Executor) Executor
}

type Grouper added in v1.24.0

type Grouper interface {
	// Key returns the group key for a given task.
	Key(task tasks.Task) (key any)
	// Predicate constructs a prdicate from a slice of keys.
	Predicate(keys []any) tasks.Predicate
}

Grouper groups tasks and constructs predicates for those groups.

type GrouperNamespaceID added in v1.24.0

type GrouperNamespaceID struct {
}

func (GrouperNamespaceID) Key added in v1.24.0

func (GrouperNamespaceID) Key(task tasks.Task) (key any)

func (GrouperNamespaceID) Predicate added in v1.24.0

func (GrouperNamespaceID) Predicate(keys []any) tasks.Predicate

type GrouperStateMachineNamespaceIDAndDestination added in v1.24.0

type GrouperStateMachineNamespaceIDAndDestination struct {
}

func (GrouperStateMachineNamespaceIDAndDestination) Key added in v1.24.0

func (GrouperStateMachineNamespaceIDAndDestination) KeyTyped added in v1.24.0

func (GrouperStateMachineNamespaceIDAndDestination) Predicate added in v1.24.0

type Iterator added in v1.17.0

type Iterator interface {
	collection.Iterator[tasks.Task]

	Range() Range
	CanSplit(tasks.Key) bool
	Split(key tasks.Key) (left Iterator, right Iterator)
	CanMerge(Iterator) bool
	Merge(Iterator) Iterator
	Remaining() Iterator
}

type IteratorImpl added in v1.17.0

type IteratorImpl struct {
	// contains filtered or unexported fields
}

func NewIterator added in v1.17.0

func NewIterator(
	paginationFnProvider PaginationFnProvider,
	r Range,
) *IteratorImpl

func (*IteratorImpl) CanMerge added in v1.17.0

func (i *IteratorImpl) CanMerge(iter Iterator) bool

func (*IteratorImpl) CanSplit added in v1.17.0

func (i *IteratorImpl) CanSplit(key tasks.Key) bool

func (*IteratorImpl) HasNext added in v1.17.0

func (i *IteratorImpl) HasNext() bool

func (*IteratorImpl) Merge added in v1.17.0

func (i *IteratorImpl) Merge(iter Iterator) Iterator

func (*IteratorImpl) Next added in v1.17.0

func (i *IteratorImpl) Next() (tasks.Task, error)

func (*IteratorImpl) Range added in v1.17.0

func (i *IteratorImpl) Range() Range

func (*IteratorImpl) Remaining added in v1.17.0

func (i *IteratorImpl) Remaining() Iterator

func (*IteratorImpl) Split added in v1.17.0

func (i *IteratorImpl) Split(key tasks.Key) (left Iterator, right Iterator)

type MaybeTerminalTaskError added in v1.24.0

type MaybeTerminalTaskError interface {
	IsTerminalTaskError() bool
}

MaybeTerminalTaskError are errors which (if IsTerminalTaskError returns true) cannot be retried and should not be rescheduled. Tasks should be enqueued to the DLQ immediately if an error is marked as terminal.

type Mitigator added in v1.18.0

type Mitigator interface {
	Mitigate(Alert)
}

Mitigator generates and runs an Action for resolving the given Alert

type MockExecutable added in v1.17.0

type MockExecutable struct {
	// contains filtered or unexported fields
}

MockExecutable is a mock of Executable interface.

func NewMockExecutable added in v1.17.0

func NewMockExecutable(ctrl *gomock.Controller) *MockExecutable

NewMockExecutable creates a new mock instance.

func (*MockExecutable) Abort added in v1.21.0

func (m *MockExecutable) Abort()

Abort mocks base method.

func (*MockExecutable) Ack added in v1.17.0

func (m *MockExecutable) Ack()

Ack mocks base method.

func (*MockExecutable) Attempt added in v1.17.0

func (m *MockExecutable) Attempt() int

Attempt mocks base method.

func (*MockExecutable) Cancel added in v1.17.3

func (m *MockExecutable) Cancel()

Cancel mocks base method.

func (*MockExecutable) EXPECT added in v1.17.0

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockExecutable) Execute added in v1.17.0

func (m *MockExecutable) Execute() error

Execute mocks base method.

func (*MockExecutable) GetCategory added in v1.17.0

func (m *MockExecutable) GetCategory() tasks0.Category

GetCategory mocks base method.

func (*MockExecutable) GetKey added in v1.17.0

func (m *MockExecutable) GetKey() tasks0.Key

GetKey mocks base method.

func (*MockExecutable) GetNamespaceID added in v1.17.0

func (m *MockExecutable) GetNamespaceID() string

GetNamespaceID mocks base method.

func (*MockExecutable) GetPriority added in v1.17.0

func (m *MockExecutable) GetPriority() tasks.Priority

GetPriority mocks base method.

func (*MockExecutable) GetRunID added in v1.17.0

func (m *MockExecutable) GetRunID() string

GetRunID mocks base method.

func (*MockExecutable) GetScheduledTime added in v1.18.0

func (m *MockExecutable) GetScheduledTime() time.Time

GetScheduledTime mocks base method.

func (*MockExecutable) GetTask added in v1.17.0

func (m *MockExecutable) GetTask() tasks0.Task

GetTask mocks base method.

func (*MockExecutable) GetTaskID added in v1.17.0

func (m *MockExecutable) GetTaskID() int64

GetTaskID mocks base method.

func (*MockExecutable) GetType added in v1.17.0

func (m *MockExecutable) GetType() v1.TaskType

GetType mocks base method.

func (*MockExecutable) GetVisibilityTime added in v1.17.0

func (m *MockExecutable) GetVisibilityTime() time.Time

GetVisibilityTime mocks base method.

func (*MockExecutable) GetWorkflowID added in v1.17.0

func (m *MockExecutable) GetWorkflowID() string

GetWorkflowID mocks base method.

func (*MockExecutable) HandleErr added in v1.17.0

func (m *MockExecutable) HandleErr(err error) error

HandleErr mocks base method.

func (*MockExecutable) IsRetryableError added in v1.17.0

func (m *MockExecutable) IsRetryableError(err error) bool

IsRetryableError mocks base method.

func (*MockExecutable) Nack added in v1.17.0

func (m *MockExecutable) Nack(err error)

Nack mocks base method.

func (*MockExecutable) Reschedule added in v1.17.0

func (m *MockExecutable) Reschedule()

Reschedule mocks base method.

func (*MockExecutable) RetryPolicy added in v1.17.0

func (m *MockExecutable) RetryPolicy() backoff.RetryPolicy

RetryPolicy mocks base method.

func (*MockExecutable) SetScheduledTime added in v1.18.0

func (m *MockExecutable) SetScheduledTime(arg0 time.Time)

SetScheduledTime mocks base method.

func (*MockExecutable) SetTaskID added in v1.17.0

func (m *MockExecutable) SetTaskID(id int64)

SetTaskID mocks base method.

func (*MockExecutable) SetVisibilityTime added in v1.17.0

func (m *MockExecutable) SetVisibilityTime(timestamp time.Time)

SetVisibilityTime mocks base method.

func (*MockExecutable) State added in v1.17.0

func (m *MockExecutable) State() tasks.State

State mocks base method.

type MockExecutableMockRecorder added in v1.17.0

type MockExecutableMockRecorder struct {
	// contains filtered or unexported fields
}

MockExecutableMockRecorder is the mock recorder for MockExecutable.

func (*MockExecutableMockRecorder) Abort added in v1.21.0

func (mr *MockExecutableMockRecorder) Abort() *gomock.Call

Abort indicates an expected call of Abort.

func (*MockExecutableMockRecorder) Ack added in v1.17.0

Ack indicates an expected call of Ack.

func (*MockExecutableMockRecorder) Attempt added in v1.17.0

func (mr *MockExecutableMockRecorder) Attempt() *gomock.Call

Attempt indicates an expected call of Attempt.

func (*MockExecutableMockRecorder) Cancel added in v1.17.3

func (mr *MockExecutableMockRecorder) Cancel() *gomock.Call

Cancel indicates an expected call of Cancel.

func (*MockExecutableMockRecorder) Execute added in v1.17.0

func (mr *MockExecutableMockRecorder) Execute() *gomock.Call

Execute indicates an expected call of Execute.

func (*MockExecutableMockRecorder) GetCategory added in v1.17.0

func (mr *MockExecutableMockRecorder) GetCategory() *gomock.Call

GetCategory indicates an expected call of GetCategory.

func (*MockExecutableMockRecorder) GetKey added in v1.17.0

func (mr *MockExecutableMockRecorder) GetKey() *gomock.Call

GetKey indicates an expected call of GetKey.

func (*MockExecutableMockRecorder) GetNamespaceID added in v1.17.0

func (mr *MockExecutableMockRecorder) GetNamespaceID() *gomock.Call

GetNamespaceID indicates an expected call of GetNamespaceID.

func (*MockExecutableMockRecorder) GetPriority added in v1.17.0

func (mr *MockExecutableMockRecorder) GetPriority() *gomock.Call

GetPriority indicates an expected call of GetPriority.

func (*MockExecutableMockRecorder) GetRunID added in v1.17.0

func (mr *MockExecutableMockRecorder) GetRunID() *gomock.Call

GetRunID indicates an expected call of GetRunID.

func (*MockExecutableMockRecorder) GetScheduledTime added in v1.18.0

func (mr *MockExecutableMockRecorder) GetScheduledTime() *gomock.Call

GetScheduledTime indicates an expected call of GetScheduledTime.

func (*MockExecutableMockRecorder) GetTask added in v1.17.0

func (mr *MockExecutableMockRecorder) GetTask() *gomock.Call

GetTask indicates an expected call of GetTask.

func (*MockExecutableMockRecorder) GetTaskID added in v1.17.0

func (mr *MockExecutableMockRecorder) GetTaskID() *gomock.Call

GetTaskID indicates an expected call of GetTaskID.

func (*MockExecutableMockRecorder) GetType added in v1.17.0

func (mr *MockExecutableMockRecorder) GetType() *gomock.Call

GetType indicates an expected call of GetType.

func (*MockExecutableMockRecorder) GetVisibilityTime added in v1.17.0

func (mr *MockExecutableMockRecorder) GetVisibilityTime() *gomock.Call

GetVisibilityTime indicates an expected call of GetVisibilityTime.

func (*MockExecutableMockRecorder) GetWorkflowID added in v1.17.0

func (mr *MockExecutableMockRecorder) GetWorkflowID() *gomock.Call

GetWorkflowID indicates an expected call of GetWorkflowID.

func (*MockExecutableMockRecorder) HandleErr added in v1.17.0

func (mr *MockExecutableMockRecorder) HandleErr(err interface{}) *gomock.Call

HandleErr indicates an expected call of HandleErr.

func (*MockExecutableMockRecorder) IsRetryableError added in v1.17.0

func (mr *MockExecutableMockRecorder) IsRetryableError(err interface{}) *gomock.Call

IsRetryableError indicates an expected call of IsRetryableError.

func (*MockExecutableMockRecorder) Nack added in v1.17.0

func (mr *MockExecutableMockRecorder) Nack(err interface{}) *gomock.Call

Nack indicates an expected call of Nack.

func (*MockExecutableMockRecorder) Reschedule added in v1.17.0

func (mr *MockExecutableMockRecorder) Reschedule() *gomock.Call

Reschedule indicates an expected call of Reschedule.

func (*MockExecutableMockRecorder) RetryPolicy added in v1.17.0

func (mr *MockExecutableMockRecorder) RetryPolicy() *gomock.Call

RetryPolicy indicates an expected call of RetryPolicy.

func (*MockExecutableMockRecorder) SetScheduledTime added in v1.18.0

func (mr *MockExecutableMockRecorder) SetScheduledTime(arg0 interface{}) *gomock.Call

SetScheduledTime indicates an expected call of SetScheduledTime.

func (*MockExecutableMockRecorder) SetTaskID added in v1.17.0

func (mr *MockExecutableMockRecorder) SetTaskID(id interface{}) *gomock.Call

SetTaskID indicates an expected call of SetTaskID.

func (*MockExecutableMockRecorder) SetVisibilityTime added in v1.17.0

func (mr *MockExecutableMockRecorder) SetVisibilityTime(timestamp interface{}) *gomock.Call

SetVisibilityTime indicates an expected call of SetVisibilityTime.

func (*MockExecutableMockRecorder) State added in v1.17.0

func (mr *MockExecutableMockRecorder) State() *gomock.Call

State indicates an expected call of State.

type MockExecutor added in v1.17.0

type MockExecutor struct {
	// contains filtered or unexported fields
}

MockExecutor is a mock of Executor interface.

func NewMockExecutor added in v1.17.0

func NewMockExecutor(ctrl *gomock.Controller) *MockExecutor

NewMockExecutor creates a new mock instance.

func (*MockExecutor) EXPECT added in v1.17.0

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockExecutor) Execute added in v1.17.0

func (m *MockExecutor) Execute(arg0 context.Context, arg1 Executable) ExecuteResponse

Execute mocks base method.

type MockExecutorMockRecorder added in v1.17.0

type MockExecutorMockRecorder struct {
	// contains filtered or unexported fields
}

MockExecutorMockRecorder is the mock recorder for MockExecutor.

func (*MockExecutorMockRecorder) Execute added in v1.17.0

func (mr *MockExecutorMockRecorder) Execute(arg0, arg1 interface{}) *gomock.Call

Execute indicates an expected call of Execute.

type MockExecutorWrapper added in v1.22.0

type MockExecutorWrapper struct {
	// contains filtered or unexported fields
}

MockExecutorWrapper is a mock of ExecutorWrapper interface.

func NewMockExecutorWrapper added in v1.22.0

func NewMockExecutorWrapper(ctrl *gomock.Controller) *MockExecutorWrapper

NewMockExecutorWrapper creates a new mock instance.

func (*MockExecutorWrapper) EXPECT added in v1.22.0

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockExecutorWrapper) Wrap added in v1.22.0

func (m *MockExecutorWrapper) Wrap(delegate Executor) Executor

Wrap mocks base method.

type MockExecutorWrapperMockRecorder added in v1.22.0

type MockExecutorWrapperMockRecorder struct {
	// contains filtered or unexported fields
}

MockExecutorWrapperMockRecorder is the mock recorder for MockExecutorWrapper.

func (*MockExecutorWrapperMockRecorder) Wrap added in v1.22.0

func (mr *MockExecutorWrapperMockRecorder) Wrap(delegate interface{}) *gomock.Call

Wrap indicates an expected call of Wrap.

type MockMaybeTerminalTaskError added in v1.24.0

type MockMaybeTerminalTaskError struct {
	// contains filtered or unexported fields
}

MockMaybeTerminalTaskError is a mock of MaybeTerminalTaskError interface.

func NewMockMaybeTerminalTaskError added in v1.24.0

func NewMockMaybeTerminalTaskError(ctrl *gomock.Controller) *MockMaybeTerminalTaskError

NewMockMaybeTerminalTaskError creates a new mock instance.

func (*MockMaybeTerminalTaskError) EXPECT added in v1.24.0

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockMaybeTerminalTaskError) IsTerminalTaskError added in v1.24.0

func (m *MockMaybeTerminalTaskError) IsTerminalTaskError() bool

IsTerminalTaskError mocks base method.

type MockMaybeTerminalTaskErrorMockRecorder added in v1.24.0

type MockMaybeTerminalTaskErrorMockRecorder struct {
	// contains filtered or unexported fields
}

MockMaybeTerminalTaskErrorMockRecorder is the mock recorder for MockMaybeTerminalTaskError.

func (*MockMaybeTerminalTaskErrorMockRecorder) IsTerminalTaskError added in v1.24.0

func (mr *MockMaybeTerminalTaskErrorMockRecorder) IsTerminalTaskError() *gomock.Call

IsTerminalTaskError indicates an expected call of IsTerminalTaskError.

type MockQueue added in v1.17.3

type MockQueue struct {
	// contains filtered or unexported fields
}

MockQueue is a mock of Queue interface.

func NewMockQueue added in v1.17.3

func NewMockQueue(ctrl *gomock.Controller) *MockQueue

NewMockQueue creates a new mock instance.

func (*MockQueue) Category added in v1.17.3

func (m *MockQueue) Category() tasks.Category

Category mocks base method.

func (*MockQueue) EXPECT added in v1.17.3

func (m *MockQueue) EXPECT() *MockQueueMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockQueue) FailoverNamespace added in v1.17.3

func (m *MockQueue) FailoverNamespace(namespaceID string)

FailoverNamespace mocks base method.

func (*MockQueue) NotifyNewTasks added in v1.17.3

func (m *MockQueue) NotifyNewTasks(tasks []tasks.Task)

NotifyNewTasks mocks base method.

func (*MockQueue) Start added in v1.17.3

func (m *MockQueue) Start()

Start mocks base method.

func (*MockQueue) Stop added in v1.17.3

func (m *MockQueue) Stop()

Stop mocks base method.

type MockQueueMockRecorder added in v1.17.3

type MockQueueMockRecorder struct {
	// contains filtered or unexported fields
}

MockQueueMockRecorder is the mock recorder for MockQueue.

func (*MockQueueMockRecorder) Category added in v1.17.3

func (mr *MockQueueMockRecorder) Category() *gomock.Call

Category indicates an expected call of Category.

func (*MockQueueMockRecorder) FailoverNamespace added in v1.17.3

func (mr *MockQueueMockRecorder) FailoverNamespace(namespaceID interface{}) *gomock.Call

FailoverNamespace indicates an expected call of FailoverNamespace.

func (*MockQueueMockRecorder) NotifyNewTasks added in v1.17.3

func (mr *MockQueueMockRecorder) NotifyNewTasks(tasks interface{}) *gomock.Call

NotifyNewTasks indicates an expected call of NotifyNewTasks.

func (*MockQueueMockRecorder) Start added in v1.17.3

func (mr *MockQueueMockRecorder) Start() *gomock.Call

Start indicates an expected call of Start.

func (*MockQueueMockRecorder) Stop added in v1.17.3

func (mr *MockQueueMockRecorder) Stop() *gomock.Call

Stop indicates an expected call of Stop.

type MockRescheduler added in v1.17.0

type MockRescheduler struct {
	// contains filtered or unexported fields
}

MockRescheduler is a mock of Rescheduler interface.

func NewMockRescheduler added in v1.17.0

func NewMockRescheduler(ctrl *gomock.Controller) *MockRescheduler

NewMockRescheduler creates a new mock instance.

func (*MockRescheduler) Add added in v1.17.0

func (m *MockRescheduler) Add(task Executable, rescheduleTime time.Time)

Add mocks base method.

func (*MockRescheduler) EXPECT added in v1.17.0

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockRescheduler) Len added in v1.17.0

func (m *MockRescheduler) Len() int

Len mocks base method.

func (*MockRescheduler) Reschedule added in v1.17.0

func (m *MockRescheduler) Reschedule(namespaceID string)

Reschedule mocks base method.

func (*MockRescheduler) Start added in v1.17.3

func (m *MockRescheduler) Start()

Start mocks base method.

func (*MockRescheduler) Stop added in v1.17.3

func (m *MockRescheduler) Stop()

Stop mocks base method.

type MockReschedulerMockRecorder added in v1.17.0

type MockReschedulerMockRecorder struct {
	// contains filtered or unexported fields
}

MockReschedulerMockRecorder is the mock recorder for MockRescheduler.

func (*MockReschedulerMockRecorder) Add added in v1.17.0

func (mr *MockReschedulerMockRecorder) Add(task, rescheduleTime interface{}) *gomock.Call

Add indicates an expected call of Add.

func (*MockReschedulerMockRecorder) Len added in v1.17.0

Len indicates an expected call of Len.

func (*MockReschedulerMockRecorder) Reschedule added in v1.17.0

func (mr *MockReschedulerMockRecorder) Reschedule(namespaceID interface{}) *gomock.Call

Reschedule indicates an expected call of Reschedule.

func (*MockReschedulerMockRecorder) Start added in v1.17.3

Start indicates an expected call of Start.

func (*MockReschedulerMockRecorder) Stop added in v1.17.3

Stop indicates an expected call of Stop.

type MockScheduler added in v1.17.0

type MockScheduler struct {
	// contains filtered or unexported fields
}

MockScheduler is a mock of Scheduler interface.

func NewMockScheduler added in v1.17.0

func NewMockScheduler(ctrl *gomock.Controller) *MockScheduler

NewMockScheduler creates a new mock instance.

func (*MockScheduler) EXPECT added in v1.17.0

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockScheduler) Start added in v1.17.0

func (m *MockScheduler) Start()

Start mocks base method.

func (*MockScheduler) Stop added in v1.17.0

func (m *MockScheduler) Stop()

Stop mocks base method.

func (*MockScheduler) Submit added in v1.17.0

func (m *MockScheduler) Submit(arg0 Executable)

Submit mocks base method.

func (*MockScheduler) TaskChannelKeyFn added in v1.18.0

func (m *MockScheduler) TaskChannelKeyFn() TaskChannelKeyFn

TaskChannelKeyFn mocks base method.

func (*MockScheduler) TrySubmit added in v1.17.0

func (m *MockScheduler) TrySubmit(arg0 Executable) bool

TrySubmit mocks base method.

type MockSchedulerMockRecorder added in v1.17.0

type MockSchedulerMockRecorder struct {
	// contains filtered or unexported fields
}

MockSchedulerMockRecorder is the mock recorder for MockScheduler.

func (*MockSchedulerMockRecorder) Start added in v1.17.0

func (mr *MockSchedulerMockRecorder) Start() *gomock.Call

Start indicates an expected call of Start.

func (*MockSchedulerMockRecorder) Stop added in v1.17.0

Stop indicates an expected call of Stop.

func (*MockSchedulerMockRecorder) Submit added in v1.17.0

func (mr *MockSchedulerMockRecorder) Submit(arg0 interface{}) *gomock.Call

Submit indicates an expected call of Submit.

func (*MockSchedulerMockRecorder) TaskChannelKeyFn added in v1.18.0

func (mr *MockSchedulerMockRecorder) TaskChannelKeyFn() *gomock.Call

TaskChannelKeyFn indicates an expected call of TaskChannelKeyFn.

func (*MockSchedulerMockRecorder) TrySubmit added in v1.17.0

func (mr *MockSchedulerMockRecorder) TrySubmit(arg0 interface{}) *gomock.Call

TrySubmit indicates an expected call of TrySubmit.

type Monitor added in v1.18.0

type Monitor interface {
	GetTotalPendingTaskCount() int
	GetSlicePendingTaskCount(slice Slice) int
	SetSlicePendingTaskCount(slice Slice, count int)

	GetReaderWatermark(readerID int64) (tasks.Key, bool)
	SetReaderWatermark(readerID int64, watermark tasks.Key)

	GetTotalSliceCount() int
	GetSliceCount(readerID int64) int
	SetSliceCount(readerID int64, count int)

	RemoveSlice(slice Slice)
	RemoveReader(readerID int64)

	ResolveAlert(AlertType)
	SilenceAlert(AlertType)
	AlertCh() <-chan *Alert
	Close()
}

Monitor tracks Queue statistics and sends an Alert to the AlertCh if any statistics becomes abnormal

type MonitorOptions added in v1.18.0

type MonitorOptions struct {
	PendingTasksCriticalCount   dynamicconfig.IntPropertyFn
	ReaderStuckCriticalAttempts dynamicconfig.IntPropertyFn
	SliceCountCriticalThreshold dynamicconfig.IntPropertyFn
}

type NamespaceIDAndDestination added in v1.24.0

type NamespaceIDAndDestination struct {
	NamespaceID string
	Destination string
}

NamespaceIDAndDestination is the key for grouping tasks by namespace ID and destination.

type Options added in v1.17.3

type Options struct {
	ReaderOptions
	MonitorOptions

	MaxPollRPS                          dynamicconfig.IntPropertyFn
	MaxPollInterval                     dynamicconfig.DurationPropertyFn
	MaxPollIntervalJitterCoefficient    dynamicconfig.FloatPropertyFn
	CheckpointInterval                  dynamicconfig.DurationPropertyFn
	CheckpointIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
	MaxReaderCount                      dynamicconfig.IntPropertyFn
}

type PaginationFnProvider added in v1.17.3

type PaginationFnProvider func(Range) collection.PaginationFn[tasks.Task]

type PriorityAssigner added in v1.17.0

type PriorityAssigner interface {
	Assign(Executable) tasks.Priority
}

PriorityAssigner assigns priority to task executables

func NewNoopPriorityAssigner added in v1.17.0

func NewNoopPriorityAssigner() PriorityAssigner

func NewPriorityAssigner added in v1.17.0

func NewPriorityAssigner() PriorityAssigner

func NewStaticPriorityAssigner added in v1.23.0

func NewStaticPriorityAssigner(priority tasks.Priority) PriorityAssigner

type Queue added in v1.17.3

type Queue interface {
	Category() tasks.Category
	NotifyNewTasks(tasks []tasks.Task)
	FailoverNamespace(namespaceID string)
	Start()
	Stop()
}

type QueueWriter added in v1.23.0

type QueueWriter interface {
	CreateQueue(
		ctx context.Context,
		request *persistence.CreateQueueRequest,
	) (*persistence.CreateQueueResponse, error)
	EnqueueTask(
		ctx context.Context,
		request *persistence.EnqueueTaskRequest,
	) (*persistence.EnqueueTaskResponse, error)
}

QueueWriter is a subset of persistence.HistoryTaskQueueManager.

type Range added in v1.17.0

type Range struct {
	InclusiveMin tasks.Key
	ExclusiveMax tasks.Key
}

func FromPersistenceRange added in v1.17.3

func FromPersistenceRange(
	r *persistencespb.QueueSliceRange,
) Range

func NewRandomOrderedRangesInRange added in v1.17.3

func NewRandomOrderedRangesInRange(
	r Range,
	numRanges int,
) []Range

func NewRandomRange added in v1.17.0

func NewRandomRange() Range

func NewRange added in v1.17.0

func NewRange(
	inclusiveMin tasks.Key,
	exclusiveMax tasks.Key,
) Range

func (*Range) CanMerge added in v1.17.0

func (r *Range) CanMerge(
	input Range,
) bool

func (*Range) CanSplit added in v1.17.0

func (r *Range) CanSplit(
	key tasks.Key,
) bool

func (*Range) ContainsKey added in v1.17.0

func (r *Range) ContainsKey(
	key tasks.Key,
) bool

func (*Range) ContainsRange added in v1.17.0

func (r *Range) ContainsRange(
	input Range,
) bool

func (*Range) Equals added in v1.17.3

func (r *Range) Equals(
	input Range,
) bool

func (*Range) IsEmpty added in v1.17.0

func (r *Range) IsEmpty() bool

func (*Range) Merge added in v1.17.0

func (r *Range) Merge(
	input Range,
) Range

func (*Range) Split added in v1.17.0

func (r *Range) Split(
	key tasks.Key,
) (left Range, right Range)

type RateLimitedSchedulerOptions added in v1.23.0

type RateLimitedSchedulerOptions struct {
	EnableShadowMode dynamicconfig.BoolPropertyFn
	StartupDelay     dynamicconfig.DurationPropertyFn
}

type Reader added in v1.17.3

type Reader interface {
	Scopes() []Scope

	WalkSlices(SliceIterator)
	SplitSlices(SliceSplitter)
	MergeSlices(...Slice)
	AppendSlices(...Slice)
	ClearSlices(SlicePredicate)
	CompactSlices(SlicePredicate)
	ShrinkSlices() int

	Notify()
	Pause(time.Duration)
	Start()
	Stop()
}

type ReaderCompletionFn added in v1.21.0

type ReaderCompletionFn func(readerID int64)

type ReaderGroup added in v1.17.3

type ReaderGroup struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewReaderGroup added in v1.17.3

func NewReaderGroup(
	initializer ReaderInitializer,
) *ReaderGroup

func (*ReaderGroup) ForEach added in v1.21.0

func (g *ReaderGroup) ForEach(f func(int64, Reader))

func (*ReaderGroup) GetOrCreateReader added in v1.21.0

func (g *ReaderGroup) GetOrCreateReader(readerID int64) Reader

func (*ReaderGroup) NewReader added in v1.17.3

func (g *ReaderGroup) NewReader(readerID int64, slices ...Slice) Reader

func (*ReaderGroup) ReaderByID added in v1.17.3

func (g *ReaderGroup) ReaderByID(readerID int64) (Reader, bool)

func (*ReaderGroup) Readers added in v1.17.3

func (g *ReaderGroup) Readers() map[int64]Reader

func (*ReaderGroup) RemoveReader added in v1.18.0

func (g *ReaderGroup) RemoveReader(readerID int64)

func (*ReaderGroup) Start added in v1.17.3

func (g *ReaderGroup) Start()

func (*ReaderGroup) Stop added in v1.17.3

func (g *ReaderGroup) Stop()

type ReaderImpl added in v1.17.3

type ReaderImpl struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewReader added in v1.17.3

func NewReader(
	readerID int64,
	slices []Slice,
	options *ReaderOptions,
	scheduler Scheduler,
	rescheduler Rescheduler,
	timeSource clock.TimeSource,
	ratelimiter quotas.RequestRateLimiter,
	monitor Monitor,
	completionFn ReaderCompletionFn,
	logger log.Logger,
	metricsHandler metrics.Handler,
) *ReaderImpl

func (*ReaderImpl) AppendSlices added in v1.18.0

func (r *ReaderImpl) AppendSlices(incomingSlices ...Slice)

func (*ReaderImpl) ClearSlices added in v1.17.3

func (r *ReaderImpl) ClearSlices(predicate SlicePredicate)

func (*ReaderImpl) CompactSlices added in v1.18.0

func (r *ReaderImpl) CompactSlices(predicate SlicePredicate)

func (*ReaderImpl) MergeSlices added in v1.17.3

func (r *ReaderImpl) MergeSlices(incomingSlices ...Slice)

func (*ReaderImpl) Notify added in v1.21.0

func (r *ReaderImpl) Notify()

func (*ReaderImpl) Pause added in v1.17.3

func (r *ReaderImpl) Pause(duration time.Duration)

func (*ReaderImpl) Scopes added in v1.17.3

func (r *ReaderImpl) Scopes() []Scope

func (*ReaderImpl) ShrinkSlices added in v1.17.3

func (r *ReaderImpl) ShrinkSlices() int

Shrink all queue slices, returning the number of tasks removed (completed)

func (*ReaderImpl) SplitSlices added in v1.17.3

func (r *ReaderImpl) SplitSlices(splitter SliceSplitter)

func (*ReaderImpl) Start added in v1.17.3

func (r *ReaderImpl) Start()

func (*ReaderImpl) Stop added in v1.17.3

func (r *ReaderImpl) Stop()

func (*ReaderImpl) WalkSlices added in v1.17.3

func (r *ReaderImpl) WalkSlices(iterator SliceIterator)

type ReaderInitializer added in v1.17.3

type ReaderInitializer func(readerID int64, slices []Slice) Reader

type ReaderOptions added in v1.17.3

type ReaderOptions struct {
	BatchSize            dynamicconfig.IntPropertyFn
	MaxPendingTasksCount dynamicconfig.IntPropertyFn
	PollBackoffInterval  dynamicconfig.DurationPropertyFn
}

type Rescheduler added in v1.17.0

type Rescheduler interface {
	// Add task executable to the rescheduler.
	Add(task Executable, rescheduleTime time.Time)

	// Reschedule triggers an immediate reschedule for provided namespace
	// ignoring executable's reschedule time.
	// Used by namespace failover logic
	Reschedule(namespaceID string)

	// Len returns the total number of task executables waiting to be rescheduled.
	Len() int
	Start()
	Stop()
}

Rescheduler buffers task executables that are failed to process and resubmit them to the task scheduler when the Reschedule method is called.

type Scheduler added in v1.17.0

type Scheduler interface {
	Start()
	Stop()
	Submit(Executable)
	TrySubmit(Executable) bool

	TaskChannelKeyFn() TaskChannelKeyFn
}

Scheduler is the component for scheduling and processing task executables, it's based on the common/tasks.Scheduler interface and provide the additional information of how tasks are grouped during scheduling.

func NewRateLimitedScheduler added in v1.23.0

func NewRateLimitedScheduler(
	baseScheduler Scheduler,
	options RateLimitedSchedulerOptions,
	currentClusterName string,
	namespaceRegistry namespace.Registry,
	rateLimiter SchedulerRateLimiter,
	timeSource clock.TimeSource,
	logger log.Logger,
	metricsHandler metrics.Handler,
) Scheduler

func NewScheduler added in v1.17.0

func NewScheduler(
	currentClusterName string,
	options SchedulerOptions,
	namespaceRegistry namespace.Registry,
	logger log.Logger,
) Scheduler

type SchedulerOptions added in v1.17.0

type SchedulerOptions struct {
	WorkerCount             dynamicconfig.TypedSubscribable[int]
	ActiveNamespaceWeights  dynamicconfig.MapPropertyFnWithNamespaceFilter
	StandbyNamespaceWeights dynamicconfig.MapPropertyFnWithNamespaceFilter
}

type SchedulerRateLimiter added in v1.19.0

type SchedulerRateLimiter quotas.RequestRateLimiter

func NewPrioritySchedulerRateLimiter added in v1.23.0

func NewPrioritySchedulerRateLimiter(
	namespaceRateFn quotas.NamespaceRateFn,
	hostRateFn quotas.RateFn,
	persistenceNamespaceRateFn quotas.NamespaceRateFn,
	persistenceHostRateFn quotas.RateFn,
) (SchedulerRateLimiter, error)

type Scope added in v1.17.0

type Scope struct {
	Range     Range
	Predicate tasks.Predicate
}

func FromPersistenceScope added in v1.17.3

func FromPersistenceScope(
	scope *persistencespb.QueueSliceScope,
) Scope

func NewRandomScopes added in v1.17.3

func NewRandomScopes(
	numScopes int,
) []Scope

func NewScope added in v1.17.0

func NewScope(
	r Range,
	predicate tasks.Predicate,
) Scope

func (*Scope) CanMergeByPredicate added in v1.17.0

func (s *Scope) CanMergeByPredicate(
	incomingScope Scope,
) bool

func (*Scope) CanMergeByRange added in v1.17.0

func (s *Scope) CanMergeByRange(
	incomingScope Scope,
) bool

func (*Scope) CanSplitByRange added in v1.17.0

func (s *Scope) CanSplitByRange(
	key tasks.Key,
) bool

func (*Scope) Contains added in v1.17.0

func (s *Scope) Contains(task tasks.Task) bool

func (*Scope) Equals added in v1.17.3

func (s *Scope) Equals(scope Scope) bool

func (*Scope) IsEmpty added in v1.17.3

func (s *Scope) IsEmpty() bool

func (*Scope) MergeByPredicate added in v1.17.0

func (s *Scope) MergeByPredicate(
	incomingScope Scope,
) Scope

func (*Scope) MergeByRange added in v1.17.0

func (s *Scope) MergeByRange(
	incomingScope Scope,
) Scope

func (*Scope) SplitByPredicate added in v1.17.0

func (s *Scope) SplitByPredicate(
	predicate tasks.Predicate,
) (pass Scope, fail Scope)

func (*Scope) SplitByRange added in v1.17.0

func (s *Scope) SplitByRange(
	key tasks.Key,
) (left Scope, right Scope)

type Slice added in v1.17.3

type Slice interface {
	Scope() Scope
	CanSplitByRange(tasks.Key) bool
	SplitByRange(tasks.Key) (left Slice, right Slice)
	SplitByPredicate(tasks.Predicate) (pass Slice, fail Slice)
	CanMergeWithSlice(Slice) bool
	MergeWithSlice(Slice) []Slice
	CompactWithSlice(Slice) Slice
	ShrinkScope() int
	SelectTasks(readerID int64, batchSize int) ([]Executable, error)
	MoreTasks() bool
	TaskStats() TaskStats
	Clear()
}

Slice manages the loading and status tracking of all tasks within its Scope. It also provides methods for splitting or merging with another slice either by range or by predicate.

type SliceImpl added in v1.17.3

type SliceImpl struct {
	// contains filtered or unexported fields
}

func NewSlice added in v1.17.3

func NewSlice(
	paginationFnProvider PaginationFnProvider,
	executableFactory ExecutableFactory,
	monitor Monitor,
	scope Scope,
	grouper Grouper,
) *SliceImpl

func (*SliceImpl) CanMergeWithSlice added in v1.17.3

func (s *SliceImpl) CanMergeWithSlice(slice Slice) bool

func (*SliceImpl) CanSplitByRange added in v1.17.3

func (s *SliceImpl) CanSplitByRange(key tasks.Key) bool

func (*SliceImpl) Clear added in v1.17.3

func (s *SliceImpl) Clear()

func (*SliceImpl) CompactWithSlice added in v1.18.0

func (s *SliceImpl) CompactWithSlice(slice Slice) Slice

func (*SliceImpl) MergeWithSlice added in v1.17.3

func (s *SliceImpl) MergeWithSlice(slice Slice) []Slice

func (*SliceImpl) MoreTasks added in v1.17.3

func (s *SliceImpl) MoreTasks() bool

func (*SliceImpl) Scope added in v1.17.3

func (s *SliceImpl) Scope() Scope

func (*SliceImpl) SelectTasks added in v1.17.3

func (s *SliceImpl) SelectTasks(readerID int64, batchSize int) ([]Executable, error)

func (*SliceImpl) ShrinkScope added in v1.18.0

func (s *SliceImpl) ShrinkScope() int

func (*SliceImpl) SplitByPredicate added in v1.17.3

func (s *SliceImpl) SplitByPredicate(predicate tasks.Predicate) (pass Slice, fail Slice)

func (*SliceImpl) SplitByRange added in v1.17.3

func (s *SliceImpl) SplitByRange(key tasks.Key) (left Slice, right Slice)

func (*SliceImpl) TaskStats added in v1.18.0

func (s *SliceImpl) TaskStats() TaskStats

type SliceIterator added in v1.17.3

type SliceIterator func(s Slice)

type SlicePredicate added in v1.17.3

type SlicePredicate func(s Slice) bool

type SliceSplitter added in v1.17.3

type SliceSplitter func(s Slice) (remaining []Slice, split bool)

type SpeculativeWorkflowTaskTimeoutQueue added in v1.21.0

type SpeculativeWorkflowTaskTimeoutQueue struct {
	// contains filtered or unexported fields
}

func NewSpeculativeWorkflowTaskTimeoutQueue added in v1.21.0

func NewSpeculativeWorkflowTaskTimeoutQueue(
	scheduler ctasks.Scheduler[ctasks.Task],
	priorityAssigner PriorityAssigner,
	executor Executor,
	namespaceRegistry namespace.Registry,
	clusterMetadata cluster.Metadata,
	timeSource clock.TimeSource,
	metricsHandler metrics.Handler,
	logger log.SnTaggedLogger,

) *SpeculativeWorkflowTaskTimeoutQueue

func (SpeculativeWorkflowTaskTimeoutQueue) Category added in v1.21.0

func (SpeculativeWorkflowTaskTimeoutQueue) FailoverNamespace added in v1.21.0

func (q SpeculativeWorkflowTaskTimeoutQueue) FailoverNamespace(_ string)

func (SpeculativeWorkflowTaskTimeoutQueue) NotifyNewTasks added in v1.21.0

func (q SpeculativeWorkflowTaskTimeoutQueue) NotifyNewTasks(ts []tasks.Task)

func (SpeculativeWorkflowTaskTimeoutQueue) Start added in v1.21.0

func (SpeculativeWorkflowTaskTimeoutQueue) Stop added in v1.21.0

type TaskChannelKey added in v1.18.0

type TaskChannelKey struct {
	NamespaceID string
	Priority    tasks.Priority
}

type TaskChannelKeyFn added in v1.18.0

type TaskChannelKeyFn = tasks.TaskChannelKeyFn[Executable, TaskChannelKey]

type TaskStats added in v1.18.0

type TaskStats struct {
	PendingPerKey map[any]int
}

type UnprocessableTaskError added in v1.24.0

type UnprocessableTaskError struct {
	Message string
}

UnprocessableTaskError is an indicator that an executor does not know how to handle a task. Considered terminal.

func NewUnprocessableTaskError added in v1.24.0

func NewUnprocessableTaskError(message string) UnprocessableTaskError

NewUnprocessableTaskError returns a new UnprocessableTaskError from given message.

func (UnprocessableTaskError) Error added in v1.24.0

func (e UnprocessableTaskError) Error() string

func (UnprocessableTaskError) IsTerminalTaskError added in v1.24.0

func (UnprocessableTaskError) IsTerminalTaskError() bool

IsTerminalTaskError marks this error as terminal to be handled appropriately.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL