Documentation ¶
Overview ¶
Package queue is a generated GoMock package.
Package queue is a generated GoMock package.
Index ¶
- type Action
- type ActionResult
- type ActionType
- type DomainFilter
- type GetStateActionAttributes
- type GetStateActionResult
- type GetTasksAttributes
- type GetTasksResult
- type LocalTimerGate
- type LocalTimerGateImpl
- type MockProcessingQueue
- func (m *MockProcessingQueue) AddTasks(arg0 map[task.Key]task.Task, arg1 task.Key)
- func (m *MockProcessingQueue) EXPECT() *MockProcessingQueueMockRecorder
- func (m *MockProcessingQueue) GetTask(arg0 task.Key) (task.Task, error)
- func (m *MockProcessingQueue) GetTasks() []task.Task
- func (m *MockProcessingQueue) Merge(arg0 ProcessingQueue) []ProcessingQueue
- func (m *MockProcessingQueue) Split(arg0 ProcessingQueueSplitPolicy) []ProcessingQueue
- func (m *MockProcessingQueue) State() ProcessingQueueState
- func (m *MockProcessingQueue) UpdateAckLevel() (task.Key, int)
- type MockProcessingQueueCollection
- func (m *MockProcessingQueueCollection) ActiveQueue() ProcessingQueue
- func (m *MockProcessingQueueCollection) AddTasks(arg0 map[task.Key]task.Task, arg1 task.Key)
- func (m *MockProcessingQueueCollection) EXPECT() *MockProcessingQueueCollectionMockRecorder
- func (m *MockProcessingQueueCollection) GetTask(arg0 task.Key) (task.Task, error)
- func (m *MockProcessingQueueCollection) GetTasks() []task.Task
- func (m *MockProcessingQueueCollection) Level() int
- func (m *MockProcessingQueueCollection) Merge(arg0 []ProcessingQueue)
- func (m *MockProcessingQueueCollection) Queues() []ProcessingQueue
- func (m *MockProcessingQueueCollection) Split(arg0 ProcessingQueueSplitPolicy) []ProcessingQueue
- func (m *MockProcessingQueueCollection) UpdateAckLevels() (task.Key, int)
- type MockProcessingQueueCollectionMockRecorder
- func (mr *MockProcessingQueueCollectionMockRecorder) ActiveQueue() *gomock.Call
- func (mr *MockProcessingQueueCollectionMockRecorder) AddTasks(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockProcessingQueueCollectionMockRecorder) GetTask(arg0 interface{}) *gomock.Call
- func (mr *MockProcessingQueueCollectionMockRecorder) GetTasks() *gomock.Call
- func (mr *MockProcessingQueueCollectionMockRecorder) Level() *gomock.Call
- func (mr *MockProcessingQueueCollectionMockRecorder) Merge(arg0 interface{}) *gomock.Call
- func (mr *MockProcessingQueueCollectionMockRecorder) Queues() *gomock.Call
- func (mr *MockProcessingQueueCollectionMockRecorder) Split(arg0 interface{}) *gomock.Call
- func (mr *MockProcessingQueueCollectionMockRecorder) UpdateAckLevels() *gomock.Call
- type MockProcessingQueueMockRecorder
- func (mr *MockProcessingQueueMockRecorder) AddTasks(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockProcessingQueueMockRecorder) GetTask(arg0 interface{}) *gomock.Call
- func (mr *MockProcessingQueueMockRecorder) GetTasks() *gomock.Call
- func (mr *MockProcessingQueueMockRecorder) Merge(arg0 interface{}) *gomock.Call
- func (mr *MockProcessingQueueMockRecorder) Split(arg0 interface{}) *gomock.Call
- func (mr *MockProcessingQueueMockRecorder) State() *gomock.Call
- func (mr *MockProcessingQueueMockRecorder) UpdateAckLevel() *gomock.Call
- type MockProcessingQueueSplitPolicy
- type MockProcessingQueueSplitPolicyMockRecorder
- type MockProcessingQueueState
- func (m *MockProcessingQueueState) AckLevel() task.Key
- func (m *MockProcessingQueueState) DomainFilter() DomainFilter
- func (m *MockProcessingQueueState) EXPECT() *MockProcessingQueueStateMockRecorder
- func (m *MockProcessingQueueState) Level() int
- func (m *MockProcessingQueueState) MaxLevel() task.Key
- func (m *MockProcessingQueueState) ReadLevel() task.Key
- type MockProcessingQueueStateMockRecorder
- func (mr *MockProcessingQueueStateMockRecorder) AckLevel() *gomock.Call
- func (mr *MockProcessingQueueStateMockRecorder) DomainFilter() *gomock.Call
- func (mr *MockProcessingQueueStateMockRecorder) Level() *gomock.Call
- func (mr *MockProcessingQueueStateMockRecorder) MaxLevel() *gomock.Call
- func (mr *MockProcessingQueueStateMockRecorder) ReadLevel() *gomock.Call
- type MockProcessor
- func (m *MockProcessor) EXPECT() *MockProcessorMockRecorder
- func (m *MockProcessor) FailoverDomain(domainIDs map[string]struct{})
- func (m *MockProcessor) HandleAction(ctx context.Context, clusterName string, action *Action) (*ActionResult, error)
- func (m *MockProcessor) LockTaskProcessing()
- func (m *MockProcessor) NotifyNewTask(clusterName string, info *common.NotifyTaskInfo)
- func (m *MockProcessor) Start()
- func (m *MockProcessor) Stop()
- func (m *MockProcessor) UnlockTaskProcessing()
- type MockProcessorFactory
- func (m *MockProcessorFactory) EXPECT() *MockProcessorFactoryMockRecorder
- func (m *MockProcessorFactory) NewCrossClusterQueueProcessor(shard shard.Context, historyEngine engine.Engine, ...) Processor
- func (m *MockProcessorFactory) NewTimerQueueProcessor(shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, ...) Processor
- func (m *MockProcessorFactory) NewTransferQueueProcessor(shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, ...) Processor
- type MockProcessorFactoryMockRecorder
- func (mr *MockProcessorFactoryMockRecorder) NewCrossClusterQueueProcessor(shard, historyEngine, executionCache, taskProcessor interface{}) *gomock.Call
- func (mr *MockProcessorFactoryMockRecorder) NewTimerQueueProcessor(...) *gomock.Call
- func (mr *MockProcessorFactoryMockRecorder) NewTransferQueueProcessor(...) *gomock.Call
- type MockProcessorMockRecorder
- func (mr *MockProcessorMockRecorder) FailoverDomain(domainIDs interface{}) *gomock.Call
- func (mr *MockProcessorMockRecorder) HandleAction(ctx, clusterName, action interface{}) *gomock.Call
- func (mr *MockProcessorMockRecorder) LockTaskProcessing() *gomock.Call
- func (mr *MockProcessorMockRecorder) NotifyNewTask(clusterName, info interface{}) *gomock.Call
- func (mr *MockProcessorMockRecorder) Start() *gomock.Call
- func (mr *MockProcessorMockRecorder) Stop() *gomock.Call
- func (mr *MockProcessorMockRecorder) UnlockTaskProcessing() *gomock.Call
- type ProcessingQueue
- type ProcessingQueueCollection
- type ProcessingQueueSplitPolicy
- func NewAggregatedSplitPolicy(policies ...ProcessingQueueSplitPolicy) ProcessingQueueSplitPolicy
- func NewPendingTaskSplitPolicy(pendingTaskThreshold map[int]int, ...) ProcessingQueueSplitPolicy
- func NewRandomSplitPolicy(splitProbability float64, ...) ProcessingQueueSplitPolicy
- func NewSelectedDomainSplitPolicy(domainIDs map[string]struct{}, newQueueLevel int, logger log.Logger, ...) ProcessingQueueSplitPolicy
- func NewStuckTaskSplitPolicy(attemptThreshold map[int]int, ...) ProcessingQueueSplitPolicy
- type ProcessingQueueState
- type Processor
- func NewCrossClusterQueueProcessor(shard shard.Context, historyEngine engine.Engine, ...) Processor
- func NewTimerQueueProcessor(shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, ...) Processor
- func NewTransferQueueProcessor(shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, ...) Processor
- type ProcessorFactory
- type RemoteTimerGate
- type RemoteTimerGateImpl
- func (timerGate *RemoteTimerGateImpl) Close()
- func (timerGate *RemoteTimerGateImpl) FireAfter(now time.Time) bool
- func (timerGate *RemoteTimerGateImpl) FireChan() <-chan struct{}
- func (timerGate *RemoteTimerGateImpl) SetCurrentTime(currentTime time.Time) bool
- func (timerGate *RemoteTimerGateImpl) Update(nextTime time.Time) bool
- type ResetActionAttributes
- type ResetActionResult
- type TaskAllocator
- type TimerGate
- type UpdateTasksAttributes
- type UpdateTasksResult
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Action ¶ added in v0.15.0
type Action struct { ActionType ActionType ResetActionAttributes *ResetActionAttributes GetStateActionAttributes *GetStateActionAttributes GetTasksAttributes *GetTasksAttributes UpdateTaskAttributes *UpdateTasksAttributes }
Action specifies the Action should be performed
func NewGetStateAction ¶ added in v0.15.0
func NewGetStateAction() *Action
NewGetStateAction reads all processing queue states in the processor
func NewGetTasksAction ¶ added in v0.23.1
func NewGetTasksAction() *Action
NewGetTasksAction creates a queue action for fetching cross cluster tasks
func NewResetAction ¶ added in v0.15.0
func NewResetAction() *Action
NewResetAction creates a new action for reseting processing queue states
func NewUpdateTasksAction ¶ added in v0.23.1
func NewUpdateTasksAction( taskResponses []*types.CrossClusterTaskResponse, ) *Action
NewUpdateTasksAction creates a queue action for responding cross cluster task processing results
type ActionResult ¶ added in v0.15.0
type ActionResult struct { ActionType ActionType ResetActionResult *ResetActionResult GetStateActionResult *GetStateActionResult GetTasksResult *GetTasksResult UpdateTaskResult *UpdateTasksResult }
ActionResult is the result for performing an Action
type ActionType ¶ added in v0.15.0
type ActionType int
ActionType specifies the type of the Action
const ( // ActionTypeReset is the ActionType for reseting processing queue states ActionTypeReset ActionType = iota + 1 // ActionTypeGetState is the ActionType for reading processing queue states ActionTypeGetState // ActionTypeGetTasks is the ActionType for get cross cluster tasks ActionTypeGetTasks // ActionTypeUpdateTask is the ActionType to update outstanding task ActionTypeUpdateTask )
type DomainFilter ¶
type DomainFilter struct { DomainIDs map[string]struct{} // by default, a DomainFilter matches domains listed in the Domains field // if reverseMatch is true then the DomainFilter matches domains that are // not in the Domains field. ReverseMatch bool }
DomainFilter filters domain
func NewDomainFilter ¶
func NewDomainFilter( domainIDs map[string]struct{}, reverseMatch bool, ) DomainFilter
NewDomainFilter creates a new domain filter
func (DomainFilter) Exclude ¶
func (f DomainFilter) Exclude(domainIDs map[string]struct{}) DomainFilter
Exclude removes domainIDs from the domainID set specified by the filter
func (DomainFilter) Filter ¶
func (f DomainFilter) Filter(domainID string) bool
Filter returns true if domainID is in the domainID set specified by the filter
func (DomainFilter) Include ¶
func (f DomainFilter) Include(domainIDs map[string]struct{}) DomainFilter
Include adds more domainIDs to the domainID set specified by the filter
func (DomainFilter) Merge ¶
func (f DomainFilter) Merge(f2 DomainFilter) DomainFilter
Merge merges the domainID sets specified by two domain filters
type GetStateActionAttributes ¶ added in v0.15.0
type GetStateActionAttributes struct{}
GetStateActionAttributes contains the parameter for performing GetState Action
type GetStateActionResult ¶ added in v0.15.0
type GetStateActionResult struct {
States []ProcessingQueueState
}
GetStateActionResult is the result for performing GetState Action
type GetTasksAttributes ¶ added in v0.23.1
type GetTasksAttributes struct{}
GetTasksAttributes contains the parameter to get tasks
type GetTasksResult ¶ added in v0.23.1
type GetTasksResult struct {
TaskRequests []*types.CrossClusterTaskRequest
}
GetTasksResult is the result for performing GetTasks Action
type LocalTimerGate ¶ added in v0.14.0
type LocalTimerGate interface { TimerGate }
LocalTimerGate interface
func NewLocalTimerGate ¶ added in v0.14.0
func NewLocalTimerGate(timeSource clock.TimeSource) LocalTimerGate
NewLocalTimerGate create a new timer gate instance
type LocalTimerGateImpl ¶ added in v0.14.0
type LocalTimerGateImpl struct {
// contains filtered or unexported fields
}
LocalTimerGateImpl is an timer implementation, which basically is an wrapper of golang's timer and additional feature
func (*LocalTimerGateImpl) Close ¶ added in v0.14.0
func (timerGate *LocalTimerGateImpl) Close()
Close shutdown the timer
func (*LocalTimerGateImpl) FireAfter ¶ added in v0.14.0
func (timerGate *LocalTimerGateImpl) FireAfter(now time.Time) bool
FireAfter check will the timer get fired after a certain time
func (*LocalTimerGateImpl) FireChan ¶ added in v0.14.0
func (timerGate *LocalTimerGateImpl) FireChan() <-chan struct{}
FireChan return the channel which will be fired when time is up
type MockProcessingQueue ¶
type MockProcessingQueue struct {
// contains filtered or unexported fields
}
MockProcessingQueue is a mock of ProcessingQueue interface.
func NewMockProcessingQueue ¶
func NewMockProcessingQueue(ctrl *gomock.Controller) *MockProcessingQueue
NewMockProcessingQueue creates a new mock instance.
func (*MockProcessingQueue) EXPECT ¶
func (m *MockProcessingQueue) EXPECT() *MockProcessingQueueMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockProcessingQueue) GetTasks ¶ added in v0.23.1
func (m *MockProcessingQueue) GetTasks() []task.Task
GetTasks mocks base method.
func (*MockProcessingQueue) Merge ¶
func (m *MockProcessingQueue) Merge(arg0 ProcessingQueue) []ProcessingQueue
Merge mocks base method.
func (*MockProcessingQueue) Split ¶
func (m *MockProcessingQueue) Split(arg0 ProcessingQueueSplitPolicy) []ProcessingQueue
Split mocks base method.
func (*MockProcessingQueue) State ¶
func (m *MockProcessingQueue) State() ProcessingQueueState
State mocks base method.
func (*MockProcessingQueue) UpdateAckLevel ¶
func (m *MockProcessingQueue) UpdateAckLevel() (task.Key, int)
UpdateAckLevel mocks base method.
type MockProcessingQueueCollection ¶
type MockProcessingQueueCollection struct {
// contains filtered or unexported fields
}
MockProcessingQueueCollection is a mock of ProcessingQueueCollection interface.
func NewMockProcessingQueueCollection ¶
func NewMockProcessingQueueCollection(ctrl *gomock.Controller) *MockProcessingQueueCollection
NewMockProcessingQueueCollection creates a new mock instance.
func (*MockProcessingQueueCollection) ActiveQueue ¶
func (m *MockProcessingQueueCollection) ActiveQueue() ProcessingQueue
ActiveQueue mocks base method.
func (*MockProcessingQueueCollection) EXPECT ¶
func (m *MockProcessingQueueCollection) EXPECT() *MockProcessingQueueCollectionMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockProcessingQueueCollection) GetTasks ¶ added in v0.23.1
func (m *MockProcessingQueueCollection) GetTasks() []task.Task
GetTasks mocks base method.
func (*MockProcessingQueueCollection) Level ¶
func (m *MockProcessingQueueCollection) Level() int
Level mocks base method.
func (*MockProcessingQueueCollection) Merge ¶
func (m *MockProcessingQueueCollection) Merge(arg0 []ProcessingQueue)
Merge mocks base method.
func (*MockProcessingQueueCollection) Queues ¶
func (m *MockProcessingQueueCollection) Queues() []ProcessingQueue
Queues mocks base method.
func (*MockProcessingQueueCollection) Split ¶
func (m *MockProcessingQueueCollection) Split(arg0 ProcessingQueueSplitPolicy) []ProcessingQueue
Split mocks base method.
func (*MockProcessingQueueCollection) UpdateAckLevels ¶ added in v0.14.0
func (m *MockProcessingQueueCollection) UpdateAckLevels() (task.Key, int)
UpdateAckLevels mocks base method.
type MockProcessingQueueCollectionMockRecorder ¶
type MockProcessingQueueCollectionMockRecorder struct {
// contains filtered or unexported fields
}
MockProcessingQueueCollectionMockRecorder is the mock recorder for MockProcessingQueueCollection.
func (*MockProcessingQueueCollectionMockRecorder) ActiveQueue ¶
func (mr *MockProcessingQueueCollectionMockRecorder) ActiveQueue() *gomock.Call
ActiveQueue indicates an expected call of ActiveQueue.
func (*MockProcessingQueueCollectionMockRecorder) AddTasks ¶
func (mr *MockProcessingQueueCollectionMockRecorder) AddTasks(arg0, arg1 interface{}) *gomock.Call
AddTasks indicates an expected call of AddTasks.
func (*MockProcessingQueueCollectionMockRecorder) GetTask ¶ added in v0.23.1
func (mr *MockProcessingQueueCollectionMockRecorder) GetTask(arg0 interface{}) *gomock.Call
GetTask indicates an expected call of GetTask.
func (*MockProcessingQueueCollectionMockRecorder) GetTasks ¶ added in v0.23.1
func (mr *MockProcessingQueueCollectionMockRecorder) GetTasks() *gomock.Call
GetTasks indicates an expected call of GetTasks.
func (*MockProcessingQueueCollectionMockRecorder) Level ¶
func (mr *MockProcessingQueueCollectionMockRecorder) Level() *gomock.Call
Level indicates an expected call of Level.
func (*MockProcessingQueueCollectionMockRecorder) Merge ¶
func (mr *MockProcessingQueueCollectionMockRecorder) Merge(arg0 interface{}) *gomock.Call
Merge indicates an expected call of Merge.
func (*MockProcessingQueueCollectionMockRecorder) Queues ¶
func (mr *MockProcessingQueueCollectionMockRecorder) Queues() *gomock.Call
Queues indicates an expected call of Queues.
func (*MockProcessingQueueCollectionMockRecorder) Split ¶
func (mr *MockProcessingQueueCollectionMockRecorder) Split(arg0 interface{}) *gomock.Call
Split indicates an expected call of Split.
func (*MockProcessingQueueCollectionMockRecorder) UpdateAckLevels ¶ added in v0.14.0
func (mr *MockProcessingQueueCollectionMockRecorder) UpdateAckLevels() *gomock.Call
UpdateAckLevels indicates an expected call of UpdateAckLevels.
type MockProcessingQueueMockRecorder ¶
type MockProcessingQueueMockRecorder struct {
// contains filtered or unexported fields
}
MockProcessingQueueMockRecorder is the mock recorder for MockProcessingQueue.
func (*MockProcessingQueueMockRecorder) AddTasks ¶
func (mr *MockProcessingQueueMockRecorder) AddTasks(arg0, arg1 interface{}) *gomock.Call
AddTasks indicates an expected call of AddTasks.
func (*MockProcessingQueueMockRecorder) GetTask ¶ added in v0.23.1
func (mr *MockProcessingQueueMockRecorder) GetTask(arg0 interface{}) *gomock.Call
GetTask indicates an expected call of GetTask.
func (*MockProcessingQueueMockRecorder) GetTasks ¶ added in v0.23.1
func (mr *MockProcessingQueueMockRecorder) GetTasks() *gomock.Call
GetTasks indicates an expected call of GetTasks.
func (*MockProcessingQueueMockRecorder) Merge ¶
func (mr *MockProcessingQueueMockRecorder) Merge(arg0 interface{}) *gomock.Call
Merge indicates an expected call of Merge.
func (*MockProcessingQueueMockRecorder) Split ¶
func (mr *MockProcessingQueueMockRecorder) Split(arg0 interface{}) *gomock.Call
Split indicates an expected call of Split.
func (*MockProcessingQueueMockRecorder) State ¶
func (mr *MockProcessingQueueMockRecorder) State() *gomock.Call
State indicates an expected call of State.
func (*MockProcessingQueueMockRecorder) UpdateAckLevel ¶
func (mr *MockProcessingQueueMockRecorder) UpdateAckLevel() *gomock.Call
UpdateAckLevel indicates an expected call of UpdateAckLevel.
type MockProcessingQueueSplitPolicy ¶
type MockProcessingQueueSplitPolicy struct {
// contains filtered or unexported fields
}
MockProcessingQueueSplitPolicy is a mock of ProcessingQueueSplitPolicy interface.
func NewMockProcessingQueueSplitPolicy ¶
func NewMockProcessingQueueSplitPolicy(ctrl *gomock.Controller) *MockProcessingQueueSplitPolicy
NewMockProcessingQueueSplitPolicy creates a new mock instance.
func (*MockProcessingQueueSplitPolicy) EXPECT ¶
func (m *MockProcessingQueueSplitPolicy) EXPECT() *MockProcessingQueueSplitPolicyMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockProcessingQueueSplitPolicy) Evaluate ¶
func (m *MockProcessingQueueSplitPolicy) Evaluate(arg0 ProcessingQueue) []ProcessingQueueState
Evaluate mocks base method.
type MockProcessingQueueSplitPolicyMockRecorder ¶
type MockProcessingQueueSplitPolicyMockRecorder struct {
// contains filtered or unexported fields
}
MockProcessingQueueSplitPolicyMockRecorder is the mock recorder for MockProcessingQueueSplitPolicy.
func (*MockProcessingQueueSplitPolicyMockRecorder) Evaluate ¶
func (mr *MockProcessingQueueSplitPolicyMockRecorder) Evaluate(arg0 interface{}) *gomock.Call
Evaluate indicates an expected call of Evaluate.
type MockProcessingQueueState ¶
type MockProcessingQueueState struct {
// contains filtered or unexported fields
}
MockProcessingQueueState is a mock of ProcessingQueueState interface.
func NewMockProcessingQueueState ¶
func NewMockProcessingQueueState(ctrl *gomock.Controller) *MockProcessingQueueState
NewMockProcessingQueueState creates a new mock instance.
func (*MockProcessingQueueState) AckLevel ¶
func (m *MockProcessingQueueState) AckLevel() task.Key
AckLevel mocks base method.
func (*MockProcessingQueueState) DomainFilter ¶
func (m *MockProcessingQueueState) DomainFilter() DomainFilter
DomainFilter mocks base method.
func (*MockProcessingQueueState) EXPECT ¶
func (m *MockProcessingQueueState) EXPECT() *MockProcessingQueueStateMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockProcessingQueueState) Level ¶
func (m *MockProcessingQueueState) Level() int
Level mocks base method.
func (*MockProcessingQueueState) MaxLevel ¶
func (m *MockProcessingQueueState) MaxLevel() task.Key
MaxLevel mocks base method.
func (*MockProcessingQueueState) ReadLevel ¶
func (m *MockProcessingQueueState) ReadLevel() task.Key
ReadLevel mocks base method.
type MockProcessingQueueStateMockRecorder ¶
type MockProcessingQueueStateMockRecorder struct {
// contains filtered or unexported fields
}
MockProcessingQueueStateMockRecorder is the mock recorder for MockProcessingQueueState.
func (*MockProcessingQueueStateMockRecorder) AckLevel ¶
func (mr *MockProcessingQueueStateMockRecorder) AckLevel() *gomock.Call
AckLevel indicates an expected call of AckLevel.
func (*MockProcessingQueueStateMockRecorder) DomainFilter ¶
func (mr *MockProcessingQueueStateMockRecorder) DomainFilter() *gomock.Call
DomainFilter indicates an expected call of DomainFilter.
func (*MockProcessingQueueStateMockRecorder) Level ¶
func (mr *MockProcessingQueueStateMockRecorder) Level() *gomock.Call
Level indicates an expected call of Level.
func (*MockProcessingQueueStateMockRecorder) MaxLevel ¶
func (mr *MockProcessingQueueStateMockRecorder) MaxLevel() *gomock.Call
MaxLevel indicates an expected call of MaxLevel.
func (*MockProcessingQueueStateMockRecorder) ReadLevel ¶
func (mr *MockProcessingQueueStateMockRecorder) ReadLevel() *gomock.Call
ReadLevel indicates an expected call of ReadLevel.
type MockProcessor ¶ added in v0.14.0
type MockProcessor struct {
// contains filtered or unexported fields
}
MockProcessor is a mock of Processor interface.
func NewMockProcessor ¶ added in v0.14.0
func NewMockProcessor(ctrl *gomock.Controller) *MockProcessor
NewMockProcessor creates a new mock instance.
func (*MockProcessor) EXPECT ¶ added in v0.14.0
func (m *MockProcessor) EXPECT() *MockProcessorMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockProcessor) FailoverDomain ¶ added in v0.14.0
func (m *MockProcessor) FailoverDomain(domainIDs map[string]struct{})
FailoverDomain mocks base method.
func (*MockProcessor) HandleAction ¶ added in v0.15.0
func (m *MockProcessor) HandleAction(ctx context.Context, clusterName string, action *Action) (*ActionResult, error)
HandleAction mocks base method.
func (*MockProcessor) LockTaskProcessing ¶ added in v0.14.0
func (m *MockProcessor) LockTaskProcessing()
LockTaskProcessing mocks base method.
func (*MockProcessor) NotifyNewTask ¶ added in v0.14.0
func (m *MockProcessor) NotifyNewTask(clusterName string, info *common.NotifyTaskInfo)
NotifyNewTask mocks base method.
func (*MockProcessor) Start ¶ added in v0.14.0
func (m *MockProcessor) Start()
Start mocks base method.
func (*MockProcessor) Stop ¶ added in v0.14.0
func (m *MockProcessor) Stop()
Stop mocks base method.
func (*MockProcessor) UnlockTaskProcessing ¶ added in v0.14.0
func (m *MockProcessor) UnlockTaskProcessing()
UnlockTaskProcessing mocks base method.
type MockProcessorFactory ¶ added in v1.2.10
type MockProcessorFactory struct {
// contains filtered or unexported fields
}
MockProcessorFactory is a mock of ProcessorFactory interface.
func NewMockProcessorFactory ¶ added in v1.2.10
func NewMockProcessorFactory(ctrl *gomock.Controller) *MockProcessorFactory
NewMockProcessorFactory creates a new mock instance.
func (*MockProcessorFactory) EXPECT ¶ added in v1.2.10
func (m *MockProcessorFactory) EXPECT() *MockProcessorFactoryMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockProcessorFactory) NewCrossClusterQueueProcessor ¶ added in v1.2.10
func (m *MockProcessorFactory) NewCrossClusterQueueProcessor(shard shard.Context, historyEngine engine.Engine, executionCache *execution.Cache, taskProcessor task.Processor) Processor
NewCrossClusterQueueProcessor mocks base method.
func (*MockProcessorFactory) NewTimerQueueProcessor ¶ added in v1.2.10
func (m *MockProcessorFactory) NewTimerQueueProcessor(shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, executionCache *execution.Cache, archivalClient archiver.Client, executionCheck invariant.Invariant) Processor
NewTimerQueueProcessor mocks base method.
func (*MockProcessorFactory) NewTransferQueueProcessor ¶ added in v1.2.10
func (m *MockProcessorFactory) NewTransferQueueProcessor(shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, executionCache *execution.Cache, workflowResetter reset.WorkflowResetter, archivalClient archiver.Client, executionCheck invariant.Invariant, wfIDCache workflowcache.WFCache, ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter) Processor
NewTransferQueueProcessor mocks base method.
type MockProcessorFactoryMockRecorder ¶ added in v1.2.10
type MockProcessorFactoryMockRecorder struct {
// contains filtered or unexported fields
}
MockProcessorFactoryMockRecorder is the mock recorder for MockProcessorFactory.
func (*MockProcessorFactoryMockRecorder) NewCrossClusterQueueProcessor ¶ added in v1.2.10
func (mr *MockProcessorFactoryMockRecorder) NewCrossClusterQueueProcessor(shard, historyEngine, executionCache, taskProcessor interface{}) *gomock.Call
NewCrossClusterQueueProcessor indicates an expected call of NewCrossClusterQueueProcessor.
func (*MockProcessorFactoryMockRecorder) NewTimerQueueProcessor ¶ added in v1.2.10
func (mr *MockProcessorFactoryMockRecorder) NewTimerQueueProcessor(shard, historyEngine, taskProcessor, executionCache, archivalClient, executionCheck interface{}) *gomock.Call
NewTimerQueueProcessor indicates an expected call of NewTimerQueueProcessor.
func (*MockProcessorFactoryMockRecorder) NewTransferQueueProcessor ¶ added in v1.2.10
func (mr *MockProcessorFactoryMockRecorder) NewTransferQueueProcessor(shard, historyEngine, taskProcessor, executionCache, workflowResetter, archivalClient, executionCheck, wfIDCache, ratelimitInternalPerWorkflowID interface{}) *gomock.Call
NewTransferQueueProcessor indicates an expected call of NewTransferQueueProcessor.
type MockProcessorMockRecorder ¶ added in v0.14.0
type MockProcessorMockRecorder struct {
// contains filtered or unexported fields
}
MockProcessorMockRecorder is the mock recorder for MockProcessor.
func (*MockProcessorMockRecorder) FailoverDomain ¶ added in v0.14.0
func (mr *MockProcessorMockRecorder) FailoverDomain(domainIDs interface{}) *gomock.Call
FailoverDomain indicates an expected call of FailoverDomain.
func (*MockProcessorMockRecorder) HandleAction ¶ added in v0.15.0
func (mr *MockProcessorMockRecorder) HandleAction(ctx, clusterName, action interface{}) *gomock.Call
HandleAction indicates an expected call of HandleAction.
func (*MockProcessorMockRecorder) LockTaskProcessing ¶ added in v0.14.0
func (mr *MockProcessorMockRecorder) LockTaskProcessing() *gomock.Call
LockTaskProcessing indicates an expected call of LockTaskProcessing.
func (*MockProcessorMockRecorder) NotifyNewTask ¶ added in v0.14.0
func (mr *MockProcessorMockRecorder) NotifyNewTask(clusterName, info interface{}) *gomock.Call
NotifyNewTask indicates an expected call of NotifyNewTask.
func (*MockProcessorMockRecorder) Start ¶ added in v0.14.0
func (mr *MockProcessorMockRecorder) Start() *gomock.Call
Start indicates an expected call of Start.
func (*MockProcessorMockRecorder) Stop ¶ added in v0.14.0
func (mr *MockProcessorMockRecorder) Stop() *gomock.Call
Stop indicates an expected call of Stop.
func (*MockProcessorMockRecorder) UnlockTaskProcessing ¶ added in v0.14.0
func (mr *MockProcessorMockRecorder) UnlockTaskProcessing() *gomock.Call
UnlockTaskProcessing indicates an expected call of UnlockTaskProcessing.
type ProcessingQueue ¶
type ProcessingQueue interface { State() ProcessingQueueState Split(ProcessingQueueSplitPolicy) []ProcessingQueue Merge(ProcessingQueue) []ProcessingQueue AddTasks(map[task.Key]task.Task, task.Key) GetTask(task.Key) (task.Task, error) GetTasks() []task.Task UpdateAckLevel() (task.Key, int) // return new ack level and number of pending tasks }
ProcessingQueue is responsible for keeping track of the state of tasks within the scope defined by its state; it can also be split into multiple ProcessingQueues with non-overlapping scope or be merged with another ProcessingQueue
func NewProcessingQueue ¶
func NewProcessingQueue( state ProcessingQueueState, logger log.Logger, metricsClient metrics.Client, ) ProcessingQueue
NewProcessingQueue creates a new processing queue based on its state
type ProcessingQueueCollection ¶
type ProcessingQueueCollection interface { Level() int Queues() []ProcessingQueue ActiveQueue() ProcessingQueue AddTasks(map[task.Key]task.Task, task.Key) GetTask(task.Key) (task.Task, error) GetTasks() []task.Task UpdateAckLevels() (task.Key, int) // return min of all new ack levels and number of total pending tasks Split(ProcessingQueueSplitPolicy) []ProcessingQueue Merge([]ProcessingQueue) }
ProcessingQueueCollection manages a list of non-overlapping ProcessingQueues and keep track of the current active ProcessingQueue
func NewProcessingQueueCollection ¶
func NewProcessingQueueCollection(level int, queues []ProcessingQueue) ProcessingQueueCollection
NewProcessingQueueCollection creates a new collection for non-overlapping queues
type ProcessingQueueSplitPolicy ¶
type ProcessingQueueSplitPolicy interface {
Evaluate(ProcessingQueue) []ProcessingQueueState
}
ProcessingQueueSplitPolicy determines if one ProcessingQueue should be split into multiple ProcessingQueues
func NewAggregatedSplitPolicy ¶
func NewAggregatedSplitPolicy(policies ...ProcessingQueueSplitPolicy) ProcessingQueueSplitPolicy
NewAggregatedSplitPolicy creates a new processing queue split policy that which combines other policies. Policies are evaluated in the order they passed in, and if one policy returns an non-empty result, that result will be returned as is and policies after that one will not be evaluated
func NewPendingTaskSplitPolicy ¶
func NewPendingTaskSplitPolicy( pendingTaskThreshold map[int]int, enabledByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter, lookAheadFunc lookAheadFunc, maxNewQueueLevel int, logger log.Logger, metricsScope metrics.Scope, ) ProcessingQueueSplitPolicy
NewPendingTaskSplitPolicy creates a new processing queue split policy based on the number of pending tasks
func NewRandomSplitPolicy ¶ added in v0.14.0
func NewRandomSplitPolicy( splitProbability float64, enabledByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter, maxNewQueueLevel int, lookAheadFunc lookAheadFunc, logger log.Logger, metricsScope metrics.Scope, ) ProcessingQueueSplitPolicy
NewRandomSplitPolicy creates a split policy that will randomly split one or more domains into a new processing queue
func NewSelectedDomainSplitPolicy ¶
func NewSelectedDomainSplitPolicy( domainIDs map[string]struct{}, newQueueLevel int, logger log.Logger, metricsScope metrics.Scope, ) ProcessingQueueSplitPolicy
NewSelectedDomainSplitPolicy creates a new processing queue split policy that splits out specific domainIDs
func NewStuckTaskSplitPolicy ¶
func NewStuckTaskSplitPolicy( attemptThreshold map[int]int, enabledByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter, maxNewQueueLevel int, logger log.Logger, metricsScope metrics.Scope, ) ProcessingQueueSplitPolicy
NewStuckTaskSplitPolicy creates a new processing queue split policy based on the number of task attempts tasks
type ProcessingQueueState ¶
type ProcessingQueueState interface { Level() int AckLevel() task.Key ReadLevel() task.Key MaxLevel() task.Key DomainFilter() DomainFilter }
ProcessingQueueState indicates the scope of a task processing queue and its current progress
func NewProcessingQueueState ¶
func NewProcessingQueueState( level int, ackLevel task.Key, maxLevel task.Key, domainFilter DomainFilter, ) ProcessingQueueState
NewProcessingQueueState creates a new state instance for processing queue readLevel will be set to the same value as ackLevel
type Processor ¶ added in v0.14.0
type Processor interface { common.Daemon FailoverDomain(domainIDs map[string]struct{}) NotifyNewTask(clusterName string, info *hcommon.NotifyTaskInfo) HandleAction(ctx context.Context, clusterName string, action *Action) (*ActionResult, error) LockTaskProcessing() UnlockTaskProcessing() }
Processor is the interface for task queue processor
func NewCrossClusterQueueProcessor ¶ added in v0.23.1
func NewCrossClusterQueueProcessor( shard shard.Context, historyEngine engine.Engine, executionCache *execution.Cache, taskProcessor task.Processor, ) Processor
NewCrossClusterQueueProcessor creates a new cross cluster QueueProcessor
func NewTimerQueueProcessor ¶ added in v0.14.0
func NewTimerQueueProcessor( shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, executionCache *execution.Cache, archivalClient archiver.Client, executionCheck invariant.Invariant, ) Processor
NewTimerQueueProcessor creates a new timer QueueProcessor
func NewTransferQueueProcessor ¶ added in v0.14.0
func NewTransferQueueProcessor( shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, executionCache *execution.Cache, workflowResetter reset.WorkflowResetter, archivalClient archiver.Client, executionCheck invariant.Invariant, wfIDCache workflowcache.WFCache, ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter, ) Processor
NewTransferQueueProcessor creates a new transfer QueueProcessor
type ProcessorFactory ¶ added in v1.2.10
type ProcessorFactory interface { NewTransferQueueProcessor( shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, executionCache *execution.Cache, workflowResetter reset.WorkflowResetter, archivalClient archiver.Client, executionCheck invariant.Invariant, wfIDCache workflowcache.WFCache, ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter, ) Processor NewTimerQueueProcessor( shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, executionCache *execution.Cache, archivalClient archiver.Client, executionCheck invariant.Invariant, ) Processor NewCrossClusterQueueProcessor( shard shard.Context, historyEngine engine.Engine, executionCache *execution.Cache, taskProcessor task.Processor, ) Processor }
func NewProcessorFactory ¶ added in v1.2.10
func NewProcessorFactory() ProcessorFactory
type RemoteTimerGate ¶ added in v0.14.0
type RemoteTimerGate interface { TimerGate // SetCurrentTime set the current time, and additionally fire the fire chan // if new "current" time is after the next wake up time, return true if // "current" is actually updated SetCurrentTime(nextTime time.Time) bool }
RemoteTimerGate interface
func NewRemoteTimerGate ¶ added in v0.14.0
func NewRemoteTimerGate() RemoteTimerGate
NewRemoteTimerGate create a new timer gate instance
type RemoteTimerGateImpl ¶ added in v0.14.0
type RemoteTimerGateImpl struct { // lock for timer and next wake up time sync.Mutex // contains filtered or unexported fields }
RemoteTimerGateImpl is an timer implementation, which basically is an wrapper of golang's timer and additional feature
func (*RemoteTimerGateImpl) Close ¶ added in v0.14.0
func (timerGate *RemoteTimerGateImpl) Close()
Close shutdown the timer
func (*RemoteTimerGateImpl) FireAfter ¶ added in v0.14.0
func (timerGate *RemoteTimerGateImpl) FireAfter(now time.Time) bool
FireAfter check will the timer get fired after a certain time
func (*RemoteTimerGateImpl) FireChan ¶ added in v0.14.0
func (timerGate *RemoteTimerGateImpl) FireChan() <-chan struct{}
FireChan return the channel which will be fired when time is up
func (*RemoteTimerGateImpl) SetCurrentTime ¶ added in v0.14.0
func (timerGate *RemoteTimerGateImpl) SetCurrentTime(currentTime time.Time) bool
SetCurrentTime set the current time, and additionally fire the fire chan if new "current" time is after the next wake up time, return true if "current" is actually updated
type ResetActionAttributes ¶ added in v0.15.0
type ResetActionAttributes struct{}
ResetActionAttributes contains the parameter for performing Reset Action
type ResetActionResult ¶ added in v0.15.0
type ResetActionResult struct{}
ResetActionResult is the result for performing Reset Action
type TaskAllocator ¶ added in v0.14.0
type TaskAllocator interface { VerifyActiveTask(taskDomainID string, task interface{}) (bool, error) VerifyFailoverActiveTask(targetDomainIDs map[string]struct{}, taskDomainID string, task interface{}) (bool, error) VerifyStandbyTask(standbyCluster string, taskDomainID string, task interface{}) (bool, error) Lock() Unlock() }
TaskAllocator verifies if a task should be processed or not
func NewTaskAllocator ¶ added in v0.14.0
func NewTaskAllocator(shard shard.Context) TaskAllocator
NewTaskAllocator create a new task allocator
type TimerGate ¶ added in v0.14.0
type TimerGate interface { // FireChan return the channel which will be fired when time is up FireChan() <-chan struct{} // FireAfter check will the timer get fired after a certain time FireAfter(now time.Time) bool // Update update the timer gate, return true if update is a success // success means timer is idle or timer is set with a sooner time to fire Update(nextTime time.Time) bool // Close shutdown the timer Close() }
TimerGate interface
type UpdateTasksAttributes ¶ added in v0.23.1
type UpdateTasksAttributes struct {
TaskResponses []*types.CrossClusterTaskResponse
}
UpdateTasksAttributes contains the parameter to update task
type UpdateTasksResult ¶ added in v0.23.1
type UpdateTasksResult struct { }
UpdateTasksResult is the result for performing UpdateTask Action
Source Files ¶
- action.go
- constants.go
- cross_cluster_queue_processor.go
- cross_cluster_queue_processor_base.go
- domain_filter.go
- factory.go
- factory_mock.go
- interface.go
- interface_mock.go
- processing_queue.go
- processing_queue_collection.go
- processing_queue_state.go
- processor_base.go
- processor_options.go
- queue_processor_util.go
- split_policy.go
- task_allocator.go
- timer_gate.go
- timer_queue_active_processor.go
- timer_queue_failover_processor.go
- timer_queue_processor.go
- timer_queue_processor_base.go
- timer_queue_standby_processor.go
- transfer_queue_processor.go
- transfer_queue_processor_base.go
- transfer_queue_validator.go