Documentation ¶
Overview ¶
Package queue is a generated GoMock package.
Index ¶
- type Action
- type ActionResult
- type ActionType
- type DomainFilter
- type GetStateActionAttributes
- type GetStateActionResult
- type LocalTimerGate
- type LocalTimerGateImpl
- type MockProcessingQueue
- func (m *MockProcessingQueue) AddTasks(arg0 map[task.Key]task.Task, arg1 task.Key)
- func (m *MockProcessingQueue) EXPECT() *MockProcessingQueueMockRecorder
- func (m *MockProcessingQueue) Merge(arg0 ProcessingQueue) []ProcessingQueue
- func (m *MockProcessingQueue) Split(arg0 ProcessingQueueSplitPolicy) []ProcessingQueue
- func (m *MockProcessingQueue) State() ProcessingQueueState
- func (m *MockProcessingQueue) UpdateAckLevel() (task.Key, int)
- type MockProcessingQueueCollection
- func (m *MockProcessingQueueCollection) ActiveQueue() ProcessingQueue
- func (m *MockProcessingQueueCollection) AddTasks(arg0 map[task.Key]task.Task, arg1 task.Key)
- func (m *MockProcessingQueueCollection) EXPECT() *MockProcessingQueueCollectionMockRecorder
- func (m *MockProcessingQueueCollection) Level() int
- func (m *MockProcessingQueueCollection) Merge(arg0 []ProcessingQueue)
- func (m *MockProcessingQueueCollection) Queues() []ProcessingQueue
- func (m *MockProcessingQueueCollection) Split(arg0 ProcessingQueueSplitPolicy) []ProcessingQueue
- func (m *MockProcessingQueueCollection) UpdateAckLevels() (task.Key, int)
- type MockProcessingQueueCollectionMockRecorder
- func (mr *MockProcessingQueueCollectionMockRecorder) ActiveQueue() *gomock.Call
- func (mr *MockProcessingQueueCollectionMockRecorder) AddTasks(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockProcessingQueueCollectionMockRecorder) Level() *gomock.Call
- func (mr *MockProcessingQueueCollectionMockRecorder) Merge(arg0 interface{}) *gomock.Call
- func (mr *MockProcessingQueueCollectionMockRecorder) Queues() *gomock.Call
- func (mr *MockProcessingQueueCollectionMockRecorder) Split(arg0 interface{}) *gomock.Call
- func (mr *MockProcessingQueueCollectionMockRecorder) UpdateAckLevels() *gomock.Call
- type MockProcessingQueueMockRecorder
- func (mr *MockProcessingQueueMockRecorder) AddTasks(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockProcessingQueueMockRecorder) Merge(arg0 interface{}) *gomock.Call
- func (mr *MockProcessingQueueMockRecorder) Split(arg0 interface{}) *gomock.Call
- func (mr *MockProcessingQueueMockRecorder) State() *gomock.Call
- func (mr *MockProcessingQueueMockRecorder) UpdateAckLevel() *gomock.Call
- type MockProcessingQueueSplitPolicy
- type MockProcessingQueueSplitPolicyMockRecorder
- type MockProcessingQueueState
- func (m *MockProcessingQueueState) AckLevel() task.Key
- func (m *MockProcessingQueueState) DomainFilter() DomainFilter
- func (m *MockProcessingQueueState) EXPECT() *MockProcessingQueueStateMockRecorder
- func (m *MockProcessingQueueState) Level() int
- func (m *MockProcessingQueueState) MaxLevel() task.Key
- func (m *MockProcessingQueueState) ReadLevel() task.Key
- type MockProcessingQueueStateMockRecorder
- func (mr *MockProcessingQueueStateMockRecorder) AckLevel() *gomock.Call
- func (mr *MockProcessingQueueStateMockRecorder) DomainFilter() *gomock.Call
- func (mr *MockProcessingQueueStateMockRecorder) Level() *gomock.Call
- func (mr *MockProcessingQueueStateMockRecorder) MaxLevel() *gomock.Call
- func (mr *MockProcessingQueueStateMockRecorder) ReadLevel() *gomock.Call
- type MockProcessor
- func (m *MockProcessor) EXPECT() *MockProcessorMockRecorder
- func (m *MockProcessor) FailoverDomain(domainIDs map[string]struct{})
- func (m *MockProcessor) HandleAction(clusterName string, action *Action) (*ActionResult, error)
- func (m *MockProcessor) LockTaskProcessing()
- func (m *MockProcessor) NotifyNewTask(clusterName string, executionInfo *persistence.WorkflowExecutionInfo, ...)
- func (m *MockProcessor) Start()
- func (m *MockProcessor) Stop()
- func (m *MockProcessor) UnlockTaskProcessing()
- type MockProcessorMockRecorder
- func (mr *MockProcessorMockRecorder) FailoverDomain(domainIDs interface{}) *gomock.Call
- func (mr *MockProcessorMockRecorder) HandleAction(clusterName, action interface{}) *gomock.Call
- func (mr *MockProcessorMockRecorder) LockTaskProcessing() *gomock.Call
- func (mr *MockProcessorMockRecorder) NotifyNewTask(clusterName, executionInfo, tasks interface{}) *gomock.Call
- func (mr *MockProcessorMockRecorder) Start() *gomock.Call
- func (mr *MockProcessorMockRecorder) Stop() *gomock.Call
- func (mr *MockProcessorMockRecorder) UnlockTaskProcessing() *gomock.Call
- type ProcessingQueue
- type ProcessingQueueCollection
- type ProcessingQueueSplitPolicy
- func NewAggregatedSplitPolicy(policies ...ProcessingQueueSplitPolicy) ProcessingQueueSplitPolicy
- func NewPendingTaskSplitPolicy(pendingTaskThreshold map[int]int, ...) ProcessingQueueSplitPolicy
- func NewRandomSplitPolicy(splitProbability float64, ...) ProcessingQueueSplitPolicy
- func NewSelectedDomainSplitPolicy(domainIDs map[string]struct{}, newQueueLevel int, logger log.Logger, ...) ProcessingQueueSplitPolicy
- func NewStuckTaskSplitPolicy(attemptThreshold map[int]int, ...) ProcessingQueueSplitPolicy
- type ProcessingQueueState
- type Processor
- type RemoteTimerGate
- type RemoteTimerGateImpl
- func (timerGate *RemoteTimerGateImpl) Close()
- func (timerGate *RemoteTimerGateImpl) FireAfter(now time.Time) bool
- func (timerGate *RemoteTimerGateImpl) FireChan() <-chan struct{}
- func (timerGate *RemoteTimerGateImpl) SetCurrentTime(currentTime time.Time) bool
- func (timerGate *RemoteTimerGateImpl) Update(nextTime time.Time) bool
- type ResetActionAttributes
- type ResetActionResult
- type TaskAllocator
- type TimerGate
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Action ¶ added in v0.15.0
type Action struct { ActionType ActionType ResetActionAttributes *ResetActionAttributes GetStateActionAttributes *GetStateActionAttributes }
Action specifies the Action should be performed
func NewGetStateAction ¶ added in v0.15.0
func NewGetStateAction() *Action
NewGetStateAction reads all processing queue states in the processor
func NewResetAction ¶ added in v0.15.0
func NewResetAction() *Action
NewResetAction creates a new action for reseting processing queue states
type ActionResult ¶ added in v0.15.0
type ActionResult struct { ActionType ActionType ResetActionResult *ResetActionResult GetStateActionResult *GetStateActionResult }
ActionResult is the result for performing an Action
type ActionType ¶ added in v0.15.0
type ActionType int
ActionType specifies the type of the Action
const ( // ActionTypeReset is the ActionType for reseting processing queue states ActionTypeReset ActionType = iota + 1 // ActionTypeGetState is the ActionType for reading processing queue states ActionTypeGetState )
type DomainFilter ¶
type DomainFilter struct { DomainIDs map[string]struct{} // by default, a DomainFilter matches domains listed in the Domains field // if reverseMatch is true then the DomainFilter matches domains that are // not in the Domains field. ReverseMatch bool }
DomainFilter filters domain
func NewDomainFilter ¶
func NewDomainFilter( domainIDs map[string]struct{}, reverseMatch bool, ) DomainFilter
NewDomainFilter creates a new domain filter
func (DomainFilter) Exclude ¶
func (f DomainFilter) Exclude(domainIDs map[string]struct{}) DomainFilter
Exclude removes domainIDs from the domainID set specified by the filter
func (DomainFilter) Filter ¶
func (f DomainFilter) Filter(domainID string) bool
Filter returns true if domainID is in the domainID set specified by the filter
func (DomainFilter) Include ¶
func (f DomainFilter) Include(domainIDs map[string]struct{}) DomainFilter
Include adds more domainIDs to the domainID set specified by the filter
func (DomainFilter) Merge ¶
func (f DomainFilter) Merge(f2 DomainFilter) DomainFilter
Merge merges the domainID sets specified by two domain filters
type GetStateActionAttributes ¶ added in v0.15.0
type GetStateActionAttributes struct{}
GetStateActionAttributes contains the parameter for performing GetState Action
type GetStateActionResult ¶ added in v0.15.0
type GetStateActionResult struct {
States []ProcessingQueueState
}
GetStateActionResult is the result for performing GetState Action
type LocalTimerGate ¶ added in v0.14.0
type LocalTimerGate interface { TimerGate }
LocalTimerGate interface
func NewLocalTimerGate ¶ added in v0.14.0
func NewLocalTimerGate(timeSource clock.TimeSource) LocalTimerGate
NewLocalTimerGate create a new timer gate instance
type LocalTimerGateImpl ¶ added in v0.14.0
type LocalTimerGateImpl struct {
// contains filtered or unexported fields
}
LocalTimerGateImpl is an timer implementation, which basically is an wrapper of golang's timer and additional feature
func (*LocalTimerGateImpl) Close ¶ added in v0.14.0
func (timerGate *LocalTimerGateImpl) Close()
Close shutdown the timer
func (*LocalTimerGateImpl) FireAfter ¶ added in v0.14.0
func (timerGate *LocalTimerGateImpl) FireAfter(now time.Time) bool
FireAfter check will the timer get fired after a certain time
func (*LocalTimerGateImpl) FireChan ¶ added in v0.14.0
func (timerGate *LocalTimerGateImpl) FireChan() <-chan struct{}
FireChan return the channel which will be fired when time is up
type MockProcessingQueue ¶
type MockProcessingQueue struct {
// contains filtered or unexported fields
}
MockProcessingQueue is a mock of ProcessingQueue interface
func NewMockProcessingQueue ¶
func NewMockProcessingQueue(ctrl *gomock.Controller) *MockProcessingQueue
NewMockProcessingQueue creates a new mock instance
func (*MockProcessingQueue) EXPECT ¶
func (m *MockProcessingQueue) EXPECT() *MockProcessingQueueMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
func (*MockProcessingQueue) Merge ¶
func (m *MockProcessingQueue) Merge(arg0 ProcessingQueue) []ProcessingQueue
Merge mocks base method
func (*MockProcessingQueue) Split ¶
func (m *MockProcessingQueue) Split(arg0 ProcessingQueueSplitPolicy) []ProcessingQueue
Split mocks base method
func (*MockProcessingQueue) State ¶
func (m *MockProcessingQueue) State() ProcessingQueueState
State mocks base method
func (*MockProcessingQueue) UpdateAckLevel ¶
func (m *MockProcessingQueue) UpdateAckLevel() (task.Key, int)
UpdateAckLevel mocks base method
type MockProcessingQueueCollection ¶
type MockProcessingQueueCollection struct {
// contains filtered or unexported fields
}
MockProcessingQueueCollection is a mock of ProcessingQueueCollection interface
func NewMockProcessingQueueCollection ¶
func NewMockProcessingQueueCollection(ctrl *gomock.Controller) *MockProcessingQueueCollection
NewMockProcessingQueueCollection creates a new mock instance
func (*MockProcessingQueueCollection) ActiveQueue ¶
func (m *MockProcessingQueueCollection) ActiveQueue() ProcessingQueue
ActiveQueue mocks base method
func (*MockProcessingQueueCollection) EXPECT ¶
func (m *MockProcessingQueueCollection) EXPECT() *MockProcessingQueueCollectionMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
func (*MockProcessingQueueCollection) Level ¶
func (m *MockProcessingQueueCollection) Level() int
Level mocks base method
func (*MockProcessingQueueCollection) Merge ¶
func (m *MockProcessingQueueCollection) Merge(arg0 []ProcessingQueue)
Merge mocks base method
func (*MockProcessingQueueCollection) Queues ¶
func (m *MockProcessingQueueCollection) Queues() []ProcessingQueue
Queues mocks base method
func (*MockProcessingQueueCollection) Split ¶
func (m *MockProcessingQueueCollection) Split(arg0 ProcessingQueueSplitPolicy) []ProcessingQueue
Split mocks base method
func (*MockProcessingQueueCollection) UpdateAckLevels ¶ added in v0.14.0
func (m *MockProcessingQueueCollection) UpdateAckLevels() (task.Key, int)
UpdateAckLevels mocks base method
type MockProcessingQueueCollectionMockRecorder ¶
type MockProcessingQueueCollectionMockRecorder struct {
// contains filtered or unexported fields
}
MockProcessingQueueCollectionMockRecorder is the mock recorder for MockProcessingQueueCollection
func (*MockProcessingQueueCollectionMockRecorder) ActiveQueue ¶
func (mr *MockProcessingQueueCollectionMockRecorder) ActiveQueue() *gomock.Call
ActiveQueue indicates an expected call of ActiveQueue
func (*MockProcessingQueueCollectionMockRecorder) AddTasks ¶
func (mr *MockProcessingQueueCollectionMockRecorder) AddTasks(arg0, arg1 interface{}) *gomock.Call
AddTasks indicates an expected call of AddTasks
func (*MockProcessingQueueCollectionMockRecorder) Level ¶
func (mr *MockProcessingQueueCollectionMockRecorder) Level() *gomock.Call
Level indicates an expected call of Level
func (*MockProcessingQueueCollectionMockRecorder) Merge ¶
func (mr *MockProcessingQueueCollectionMockRecorder) Merge(arg0 interface{}) *gomock.Call
Merge indicates an expected call of Merge
func (*MockProcessingQueueCollectionMockRecorder) Queues ¶
func (mr *MockProcessingQueueCollectionMockRecorder) Queues() *gomock.Call
Queues indicates an expected call of Queues
func (*MockProcessingQueueCollectionMockRecorder) Split ¶
func (mr *MockProcessingQueueCollectionMockRecorder) Split(arg0 interface{}) *gomock.Call
Split indicates an expected call of Split
func (*MockProcessingQueueCollectionMockRecorder) UpdateAckLevels ¶ added in v0.14.0
func (mr *MockProcessingQueueCollectionMockRecorder) UpdateAckLevels() *gomock.Call
UpdateAckLevels indicates an expected call of UpdateAckLevels
type MockProcessingQueueMockRecorder ¶
type MockProcessingQueueMockRecorder struct {
// contains filtered or unexported fields
}
MockProcessingQueueMockRecorder is the mock recorder for MockProcessingQueue
func (*MockProcessingQueueMockRecorder) AddTasks ¶
func (mr *MockProcessingQueueMockRecorder) AddTasks(arg0, arg1 interface{}) *gomock.Call
AddTasks indicates an expected call of AddTasks
func (*MockProcessingQueueMockRecorder) Merge ¶
func (mr *MockProcessingQueueMockRecorder) Merge(arg0 interface{}) *gomock.Call
Merge indicates an expected call of Merge
func (*MockProcessingQueueMockRecorder) Split ¶
func (mr *MockProcessingQueueMockRecorder) Split(arg0 interface{}) *gomock.Call
Split indicates an expected call of Split
func (*MockProcessingQueueMockRecorder) State ¶
func (mr *MockProcessingQueueMockRecorder) State() *gomock.Call
State indicates an expected call of State
func (*MockProcessingQueueMockRecorder) UpdateAckLevel ¶
func (mr *MockProcessingQueueMockRecorder) UpdateAckLevel() *gomock.Call
UpdateAckLevel indicates an expected call of UpdateAckLevel
type MockProcessingQueueSplitPolicy ¶
type MockProcessingQueueSplitPolicy struct {
// contains filtered or unexported fields
}
MockProcessingQueueSplitPolicy is a mock of ProcessingQueueSplitPolicy interface
func NewMockProcessingQueueSplitPolicy ¶
func NewMockProcessingQueueSplitPolicy(ctrl *gomock.Controller) *MockProcessingQueueSplitPolicy
NewMockProcessingQueueSplitPolicy creates a new mock instance
func (*MockProcessingQueueSplitPolicy) EXPECT ¶
func (m *MockProcessingQueueSplitPolicy) EXPECT() *MockProcessingQueueSplitPolicyMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
func (*MockProcessingQueueSplitPolicy) Evaluate ¶
func (m *MockProcessingQueueSplitPolicy) Evaluate(arg0 ProcessingQueue) []ProcessingQueueState
Evaluate mocks base method
type MockProcessingQueueSplitPolicyMockRecorder ¶
type MockProcessingQueueSplitPolicyMockRecorder struct {
// contains filtered or unexported fields
}
MockProcessingQueueSplitPolicyMockRecorder is the mock recorder for MockProcessingQueueSplitPolicy
func (*MockProcessingQueueSplitPolicyMockRecorder) Evaluate ¶
func (mr *MockProcessingQueueSplitPolicyMockRecorder) Evaluate(arg0 interface{}) *gomock.Call
Evaluate indicates an expected call of Evaluate
type MockProcessingQueueState ¶
type MockProcessingQueueState struct {
// contains filtered or unexported fields
}
MockProcessingQueueState is a mock of ProcessingQueueState interface
func NewMockProcessingQueueState ¶
func NewMockProcessingQueueState(ctrl *gomock.Controller) *MockProcessingQueueState
NewMockProcessingQueueState creates a new mock instance
func (*MockProcessingQueueState) AckLevel ¶
func (m *MockProcessingQueueState) AckLevel() task.Key
AckLevel mocks base method
func (*MockProcessingQueueState) DomainFilter ¶
func (m *MockProcessingQueueState) DomainFilter() DomainFilter
DomainFilter mocks base method
func (*MockProcessingQueueState) EXPECT ¶
func (m *MockProcessingQueueState) EXPECT() *MockProcessingQueueStateMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
func (*MockProcessingQueueState) Level ¶
func (m *MockProcessingQueueState) Level() int
Level mocks base method
func (*MockProcessingQueueState) MaxLevel ¶
func (m *MockProcessingQueueState) MaxLevel() task.Key
MaxLevel mocks base method
func (*MockProcessingQueueState) ReadLevel ¶
func (m *MockProcessingQueueState) ReadLevel() task.Key
ReadLevel mocks base method
type MockProcessingQueueStateMockRecorder ¶
type MockProcessingQueueStateMockRecorder struct {
// contains filtered or unexported fields
}
MockProcessingQueueStateMockRecorder is the mock recorder for MockProcessingQueueState
func (*MockProcessingQueueStateMockRecorder) AckLevel ¶
func (mr *MockProcessingQueueStateMockRecorder) AckLevel() *gomock.Call
AckLevel indicates an expected call of AckLevel
func (*MockProcessingQueueStateMockRecorder) DomainFilter ¶
func (mr *MockProcessingQueueStateMockRecorder) DomainFilter() *gomock.Call
DomainFilter indicates an expected call of DomainFilter
func (*MockProcessingQueueStateMockRecorder) Level ¶
func (mr *MockProcessingQueueStateMockRecorder) Level() *gomock.Call
Level indicates an expected call of Level
func (*MockProcessingQueueStateMockRecorder) MaxLevel ¶
func (mr *MockProcessingQueueStateMockRecorder) MaxLevel() *gomock.Call
MaxLevel indicates an expected call of MaxLevel
func (*MockProcessingQueueStateMockRecorder) ReadLevel ¶
func (mr *MockProcessingQueueStateMockRecorder) ReadLevel() *gomock.Call
ReadLevel indicates an expected call of ReadLevel
type MockProcessor ¶ added in v0.14.0
type MockProcessor struct {
// contains filtered or unexported fields
}
MockProcessor is a mock of Processor interface
func NewMockProcessor ¶ added in v0.14.0
func NewMockProcessor(ctrl *gomock.Controller) *MockProcessor
NewMockProcessor creates a new mock instance
func (*MockProcessor) EXPECT ¶ added in v0.14.0
func (m *MockProcessor) EXPECT() *MockProcessorMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
func (*MockProcessor) FailoverDomain ¶ added in v0.14.0
func (m *MockProcessor) FailoverDomain(domainIDs map[string]struct{})
FailoverDomain mocks base method
func (*MockProcessor) HandleAction ¶ added in v0.15.0
func (m *MockProcessor) HandleAction(clusterName string, action *Action) (*ActionResult, error)
HandleAction mocks base method
func (*MockProcessor) LockTaskProcessing ¶ added in v0.14.0
func (m *MockProcessor) LockTaskProcessing()
LockTaskProcessing mocks base method
func (*MockProcessor) NotifyNewTask ¶ added in v0.14.0
func (m *MockProcessor) NotifyNewTask(clusterName string, executionInfo *persistence.WorkflowExecutionInfo, tasks []persistence.Task)
NotifyNewTask mocks base method
func (*MockProcessor) Start ¶ added in v0.14.0
func (m *MockProcessor) Start()
Start mocks base method
func (*MockProcessor) UnlockTaskProcessing ¶ added in v0.14.0
func (m *MockProcessor) UnlockTaskProcessing()
UnlockTaskProcessing mocks base method
type MockProcessorMockRecorder ¶ added in v0.14.0
type MockProcessorMockRecorder struct {
// contains filtered or unexported fields
}
MockProcessorMockRecorder is the mock recorder for MockProcessor
func (*MockProcessorMockRecorder) FailoverDomain ¶ added in v0.14.0
func (mr *MockProcessorMockRecorder) FailoverDomain(domainIDs interface{}) *gomock.Call
FailoverDomain indicates an expected call of FailoverDomain
func (*MockProcessorMockRecorder) HandleAction ¶ added in v0.15.0
func (mr *MockProcessorMockRecorder) HandleAction(clusterName, action interface{}) *gomock.Call
HandleAction indicates an expected call of HandleAction
func (*MockProcessorMockRecorder) LockTaskProcessing ¶ added in v0.14.0
func (mr *MockProcessorMockRecorder) LockTaskProcessing() *gomock.Call
LockTaskProcessing indicates an expected call of LockTaskProcessing
func (*MockProcessorMockRecorder) NotifyNewTask ¶ added in v0.14.0
func (mr *MockProcessorMockRecorder) NotifyNewTask(clusterName, executionInfo, tasks interface{}) *gomock.Call
NotifyNewTask indicates an expected call of NotifyNewTask
func (*MockProcessorMockRecorder) Start ¶ added in v0.14.0
func (mr *MockProcessorMockRecorder) Start() *gomock.Call
Start indicates an expected call of Start
func (*MockProcessorMockRecorder) Stop ¶ added in v0.14.0
func (mr *MockProcessorMockRecorder) Stop() *gomock.Call
Stop indicates an expected call of Stop
func (*MockProcessorMockRecorder) UnlockTaskProcessing ¶ added in v0.14.0
func (mr *MockProcessorMockRecorder) UnlockTaskProcessing() *gomock.Call
UnlockTaskProcessing indicates an expected call of UnlockTaskProcessing
type ProcessingQueue ¶
type ProcessingQueue interface { State() ProcessingQueueState Split(ProcessingQueueSplitPolicy) []ProcessingQueue Merge(ProcessingQueue) []ProcessingQueue AddTasks(map[task.Key]task.Task, task.Key) UpdateAckLevel() (task.Key, int) // return new ack level and number of pending tasks }
ProcessingQueue is responsible for keeping track of the state of tasks within the scope defined by its state; it can also be split into multiple ProcessingQueues with non-overlapping scope or be merged with another ProcessingQueue
func NewProcessingQueue ¶
func NewProcessingQueue( state ProcessingQueueState, logger log.Logger, metricsClient metrics.Client, ) ProcessingQueue
NewProcessingQueue creates a new processing queue based on its state
type ProcessingQueueCollection ¶
type ProcessingQueueCollection interface { Level() int Queues() []ProcessingQueue ActiveQueue() ProcessingQueue AddTasks(map[task.Key]task.Task, task.Key) UpdateAckLevels() (task.Key, int) // return min of all new ack levels and number of total pending tasks Split(ProcessingQueueSplitPolicy) []ProcessingQueue Merge([]ProcessingQueue) }
ProcessingQueueCollection manages a list of non-overlapping ProcessingQueues and keep track of the current active ProcessingQueue
func NewProcessingQueueCollection ¶
func NewProcessingQueueCollection( level int, queues []ProcessingQueue, ) ProcessingQueueCollection
NewProcessingQueueCollection creates a new collection for non-overlapping queues
type ProcessingQueueSplitPolicy ¶
type ProcessingQueueSplitPolicy interface {
Evaluate(ProcessingQueue) []ProcessingQueueState
}
ProcessingQueueSplitPolicy determines if one ProcessingQueue should be split into multiple ProcessingQueues
func NewAggregatedSplitPolicy ¶
func NewAggregatedSplitPolicy( policies ...ProcessingQueueSplitPolicy, ) ProcessingQueueSplitPolicy
NewAggregatedSplitPolicy creates a new processing queue split policy that which combines other policies. Policies are evaluated in the order they passed in, and if one policy returns an non-empty result, that result will be returned as is and policies after that one will not be evaluated
func NewPendingTaskSplitPolicy ¶
func NewPendingTaskSplitPolicy( pendingTaskThreshold map[int]int, enabledByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter, lookAheadFunc lookAheadFunc, maxNewQueueLevel int, logger log.Logger, metricsScope metrics.Scope, ) ProcessingQueueSplitPolicy
NewPendingTaskSplitPolicy creates a new processing queue split policy based on the number of pending tasks
func NewRandomSplitPolicy ¶ added in v0.14.0
func NewRandomSplitPolicy( splitProbability float64, enabledByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter, maxNewQueueLevel int, lookAheadFunc lookAheadFunc, logger log.Logger, metricsScope metrics.Scope, ) ProcessingQueueSplitPolicy
NewRandomSplitPolicy creates a split policy that will randomly split one or more domains into a new processing queue
func NewSelectedDomainSplitPolicy ¶
func NewSelectedDomainSplitPolicy( domainIDs map[string]struct{}, newQueueLevel int, logger log.Logger, metricsScope metrics.Scope, ) ProcessingQueueSplitPolicy
NewSelectedDomainSplitPolicy creates a new processing queue split policy that splits out specific domainIDs
func NewStuckTaskSplitPolicy ¶
func NewStuckTaskSplitPolicy( attemptThreshold map[int]int, enabledByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter, maxNewQueueLevel int, logger log.Logger, metricsScope metrics.Scope, ) ProcessingQueueSplitPolicy
NewStuckTaskSplitPolicy creates a new processing queue split policy based on the number of task attempts tasks
type ProcessingQueueState ¶
type ProcessingQueueState interface { Level() int AckLevel() task.Key ReadLevel() task.Key MaxLevel() task.Key DomainFilter() DomainFilter }
ProcessingQueueState indicates the scope of a task processing queue and its current progress
func NewProcessingQueueState ¶
func NewProcessingQueueState( level int, ackLevel task.Key, maxLevel task.Key, domainFilter DomainFilter, ) ProcessingQueueState
NewProcessingQueueState creates a new state instance for processing queue readLevel will be set to the same value as ackLevel
type Processor ¶ added in v0.14.0
type Processor interface { common.Daemon FailoverDomain(domainIDs map[string]struct{}) NotifyNewTask(clusterName string, executionInfo *persistence.WorkflowExecutionInfo, tasks []persistence.Task) HandleAction(clusterName string, action *Action) (*ActionResult, error) // TODO: enforce context timeout for Actions LockTaskProcessing() UnlockTaskProcessing() }
Processor is the interface for task queue processor
func NewTimerQueueProcessor ¶ added in v0.14.0
func NewTimerQueueProcessor( shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, executionCache *execution.Cache, archivalClient archiver.Client, executionCheck invariant.Invariant, ) Processor
NewTimerQueueProcessor creates a new timer QueueProcessor
func NewTransferQueueProcessor ¶ added in v0.14.0
func NewTransferQueueProcessor( shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, executionCache *execution.Cache, workflowResetter reset.WorkflowResetter, archivalClient archiver.Client, executionCheck invariant.Invariant, ) Processor
NewTransferQueueProcessor creates a new transfer QueueProcessor
type RemoteTimerGate ¶ added in v0.14.0
type RemoteTimerGate interface { TimerGate // SetCurrentTime set the current time, and additionally fire the fire chan // if new "current" time is after the next wake up time, return true if // "current" is actually updated SetCurrentTime(nextTime time.Time) bool }
RemoteTimerGate interface
func NewRemoteTimerGate ¶ added in v0.14.0
func NewRemoteTimerGate() RemoteTimerGate
NewRemoteTimerGate create a new timer gate instance
type RemoteTimerGateImpl ¶ added in v0.14.0
type RemoteTimerGateImpl struct { // lock for timer and next wake up time sync.Mutex // contains filtered or unexported fields }
RemoteTimerGateImpl is an timer implementation, which basically is an wrapper of golang's timer and additional feature
func (*RemoteTimerGateImpl) Close ¶ added in v0.14.0
func (timerGate *RemoteTimerGateImpl) Close()
Close shutdown the timer
func (*RemoteTimerGateImpl) FireAfter ¶ added in v0.14.0
func (timerGate *RemoteTimerGateImpl) FireAfter(now time.Time) bool
FireAfter check will the timer get fired after a certain time
func (*RemoteTimerGateImpl) FireChan ¶ added in v0.14.0
func (timerGate *RemoteTimerGateImpl) FireChan() <-chan struct{}
FireChan return the channel which will be fired when time is up
func (*RemoteTimerGateImpl) SetCurrentTime ¶ added in v0.14.0
func (timerGate *RemoteTimerGateImpl) SetCurrentTime(currentTime time.Time) bool
SetCurrentTime set the current time, and additionally fire the fire chan if new "current" time is after the next wake up time, return true if "current" is actually updated
type ResetActionAttributes ¶ added in v0.15.0
type ResetActionAttributes struct{}
ResetActionAttributes contains the parameter for performing Reset Action
type ResetActionResult ¶ added in v0.15.0
type ResetActionResult struct{}
ResetActionResult is the result for performing Reset Action
type TaskAllocator ¶ added in v0.14.0
type TaskAllocator interface { VerifyActiveTask(taskDomainID string, task interface{}) (bool, error) VerifyFailoverActiveTask(targetDomainIDs map[string]struct{}, taskDomainID string, task interface{}) (bool, error) VerifyStandbyTask(standbyCluster string, taskDomainID string, task interface{}) (bool, error) Lock() Unlock() }
TaskAllocator verifies if a task should be processed or not
func NewTaskAllocator ¶ added in v0.14.0
func NewTaskAllocator(shard shard.Context) TaskAllocator
NewTaskAllocator create a new task allocator
type TimerGate ¶ added in v0.14.0
type TimerGate interface { // FireChan return the channel which will be fired when time is up FireChan() <-chan struct{} // FireAfter check will the timer get fired after a certain time FireAfter(now time.Time) bool // Update update the timer gate, return true if update is a success // success means timer is idle or timer is set with a sooner time to fire Update(nextTime time.Time) bool // Close shutdown the timer Close() }
TimerGate interface
Source Files ¶
- action.go
- domain_filter.go
- interface.go
- interface_mock.go
- processing_queue.go
- processing_queue_collection.go
- processor_base.go
- queue_processor_util.go
- split_policy.go
- task_allocator.go
- timer_gate.go
- timer_queue_processor.go
- timer_queue_processor_base.go
- transfer_queue_processor.go
- transfer_queue_processor_base.go
- transfer_queue_validator.go