Documentation
¶
Overview ¶
Package scheduler is a generated GoMock package.
Index ¶
- Constants
- Variables
- func RegisterGeneratorExecutors(registry *hsm.Registry, options GeneratorTaskExecutorOptions) error
- func RegisterStateMachines(r *hsm.Registry) error
- type BufferTask
- type Config
- type EventBuffer
- type EventExecute
- type EventWait
- type ExecuteTask
- type Executor
- type Generator
- type GeneratorMachineState
- type GeneratorTaskExecutorOptions
- type MockSpecProcessor
- type MockSpecProcessorMockRecorder
- type ProcessedTimeRange
- type Scheduler
- type SchedulerMachineState
- type SpecProcessor
- type SpecProcessorImpl
- type Tweakables
Constants ¶
const (
// Unique identifier for the Executor sub state machine.
ExecutorMachineType = "scheduler.Executor"
)
const (
TaskTypeBuffer = "scheduler.generator.Buffer"
)
const (
TaskTypeExecute = "scheduler.executor.Execute"
)
Variables ¶
var ( CurrentTweakables = dynamicconfig.NewNamespaceTypedSetting( "component.scheduler.tweakables", DefaultTweakables, "A set of tweakable parameters for the state machine scheduler.") RetryPolicyInitialInterval = dynamicconfig.NewGlobalDurationSetting( "component.scheduler.retryPolicy.initialInterval", time.Second, `The initial backoff interval when retrying a failed workflow start.`, ) RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting( "component.scheduler.retryPolicy.maxInterval", time.Minute, `The maximum backoff interval when retrying a failed workflow start.`, ) ServiceCallTimeout = dynamicconfig.NewGlobalDurationSetting( "component.scheduler.serviceCallTimeout", 5*time.Second, `The upper bound on how long a service call can take before being timed out.`, ) DefaultTweakables = Tweakables{ DefaultCatchupWindow: 365 * 24 * time.Hour, MinCatchupWindow: 10 * time.Second, MaxBufferSize: 1000, CanceledTerminatedCountAsFailures: false, RecentActionCount: 10, } )
var ( // Each sub state machine is a singleton of the top-level Scheduler, accessed with // a fixed key. ExecutorMachineKey = hsm.Key{Type: ExecutorMachineType, ID: ""} )
var ( // Each sub state machine is a singleton of the top-level Scheduler, accessed with // a fixed key GeneratorMachineKey = hsm.Key{Type: GeneratorMachineType, ID: ""} )
var TransitionExecute = hsm.NewTransition( []enumsspb.SchedulerExecutorState{ enumsspb.SCHEDULER_EXECUTOR_STATE_UNSPECIFIED, enumsspb.SCHEDULER_EXECUTOR_STATE_WAITING, enumsspb.SCHEDULER_EXECUTOR_STATE_EXECUTING, }, enumsspb.SCHEDULER_EXECUTOR_STATE_EXECUTING, func(e Executor, event EventExecute) (hsm.TransitionOutput, error) { e.NextInvocationTime = nil e.BufferedStarts = append(e.BufferedStarts, event.BufferedStarts...) return e.output() }, )
Transition to executing state to continue executing pending buffered actions, writing additional pending actions into the Executor's persistent state.
var TransitionWait = hsm.NewTransition( []enumsspb.SchedulerExecutorState{ enumsspb.SCHEDULER_EXECUTOR_STATE_UNSPECIFIED, enumsspb.SCHEDULER_EXECUTOR_STATE_EXECUTING, }, enumsspb.SCHEDULER_EXECUTOR_STATE_WAITING, func(e Executor, event EventWait) (hsm.TransitionOutput, error) { e.NextInvocationTime = nil return e.output() }, )
Transition to waiting state. No new tasks will be created until the Executor is transitioned back to Executing state.
Functions ¶
func RegisterGeneratorExecutors ¶
func RegisterGeneratorExecutors(registry *hsm.Registry, options GeneratorTaskExecutorOptions) error
func RegisterStateMachines ¶
RegisterStateMachines registers state machine definitions with the HSM registry. Should be called during dependency injection.
Types ¶
type BufferTask ¶
type BufferTask struct {
// contains filtered or unexported fields
}
func (BufferTask) Deadline ¶
func (b BufferTask) Deadline() time.Time
func (BufferTask) Destination ¶
func (BufferTask) Destination() string
func (BufferTask) Type ¶
func (BufferTask) Type() string
func (BufferTask) Validate ¶
func (BufferTask) Validate(_ *persistencespb.StateMachineRef, _ *hsm.Node) error
type Config ¶
type Config struct { Tweakables dynamicconfig.TypedPropertyFnWithNamespaceFilter[Tweakables] ServiceCallTimeout dynamicconfig.DurationPropertyFn RetryPolicy func() backoff.RetryPolicy }
State Machine Scheduler dynamic config, shared among all sub state machines.
func ConfigProvider ¶
func ConfigProvider(dc *dynamicconfig.Collection) *Config
type EventBuffer ¶
Fired when the Generator should buffer actions. After buffering, another buffer task is usually created with a later deadline. The Generator will alternate between sleeping and buffering without an explicit state transition.
type EventExecute ¶
type EventExecute struct { Node *hsm.Node Deadline time.Time BufferedStarts []*schedulespb.BufferedStart }
EventExecute is fired when the executor should wake up and begin processing pending buffered starts, including any that are set on BufferedStarts. BufferedStarts are immediately appended to the Executor's queue as the transition is applied.
Execution can be delayed (such as in the event of backing off) by setting Deadline to something other than env.Now().
type EventWait ¶
EventWait is fired when the executor should return to a waiting state until more actions are buffered.
type ExecuteTask ¶
type ExecuteTask struct {
// contains filtered or unexported fields
}
func (ExecuteTask) Deadline ¶
func (e ExecuteTask) Deadline() time.Time
func (ExecuteTask) Destination ¶
func (ExecuteTask) Destination() string
func (ExecuteTask) Type ¶
func (ExecuteTask) Type() string
func (ExecuteTask) Validate ¶
func (ExecuteTask) Validate(_ *persistencespb.StateMachineRef, node *hsm.Node) error
type Executor ¶
type Executor struct {
*schedulespb.ExecutorInternal
}
The Executor sub state machine is responsible for executing buffered actions.
func NewExecutor ¶
func NewExecutor() *Executor
NewExecutor returns an initialized Executor sub state machine, which should be parented under a Scheduler root node.
func (Executor) RegenerateTasks ¶
func (Executor) SetState ¶
func (e Executor) SetState(state enumsspb.SchedulerExecutorState)
func (Executor) State ¶
func (e Executor) State() enumsspb.SchedulerExecutorState
type Generator ¶
type Generator struct {
*schedulespb.GeneratorInternal
}
The Generator sub state machine is responsible for buffering actions according to the schedule's specification. Manually requested actions (from an immediate request or backfill) are separately handled in the Backfiller sub state machine.
func NewGenerator ¶
func NewGenerator() *Generator
NewGenerator returns an initialized Generator sub state machine, which should be parented under a Scheduler root node.
func (Generator) RegenerateTasks ¶
func (Generator) SetState ¶
func (g Generator) SetState(_ GeneratorMachineState)
func (Generator) State ¶
func (g Generator) State() GeneratorMachineState
type GeneratorMachineState ¶
type GeneratorMachineState int
const ( // Unique identifier for the Generator sub state machine. GeneratorMachineType = "scheduler.Generator" // The Generator has only a single running state. GeneratorMachineStateRunning GeneratorMachineState = 0 )
type MockSpecProcessor ¶
type MockSpecProcessor struct {
// contains filtered or unexported fields
}
MockSpecProcessor is a mock of SpecProcessor interface.
func NewMockSpecProcessor ¶
func NewMockSpecProcessor(ctrl *gomock.Controller) *MockSpecProcessor
NewMockSpecProcessor creates a new mock instance.
func (*MockSpecProcessor) EXPECT ¶
func (m *MockSpecProcessor) EXPECT() *MockSpecProcessorMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockSpecProcessor) ProcessTimeRange ¶
func (m *MockSpecProcessor) ProcessTimeRange(scheduler Scheduler, start, end time.Time, manual bool, limit *int) (*ProcessedTimeRange, error)
ProcessTimeRange mocks base method.
type MockSpecProcessorMockRecorder ¶
type MockSpecProcessorMockRecorder struct {
// contains filtered or unexported fields
}
MockSpecProcessorMockRecorder is the mock recorder for MockSpecProcessor.
func (*MockSpecProcessorMockRecorder) ProcessTimeRange ¶
func (mr *MockSpecProcessorMockRecorder) ProcessTimeRange(scheduler, start, end, manual, limit any) *gomock.Call
ProcessTimeRange indicates an expected call of ProcessTimeRange.
type ProcessedTimeRange ¶
type ProcessedTimeRange struct { NextWakeupTime time.Time LastActionTime time.Time BufferedStarts []*schedulespb.BufferedStart }
type Scheduler ¶
type Scheduler struct { *schedulespb.SchedulerInternal // contains filtered or unexported fields }
Scheduler is a top-level state machine composed of 3 sub state machines: - Generator: buffers actions according to the schedule specification - Executor: executes buffered actions - Backfiller: buffers actions according to requested backfills
A running Scheduler will always have exactly one of each of the above sub state machines mounted as nodes within the HSM tree. The top-level machine itself remains in a singular running state for its lifetime (all work is done within the sub state machines). The Scheduler state machine is only responsible for creating the singleton sub state machines.
func NewScheduler ¶
func NewScheduler( namespace, namespaceID, scheduleID string, sched *schedulepb.Schedule, patch *schedulepb.SchedulePatch, ) *Scheduler
NewScheduler returns an initialized Scheduler state machine (without any sub state machines).
func (Scheduler) RegenerateTasks ¶
func (Scheduler) SetState ¶
func (s Scheduler) SetState(_ SchedulerMachineState)
func (Scheduler) State ¶
func (s Scheduler) State() SchedulerMachineState
type SchedulerMachineState ¶
type SchedulerMachineState int
const ( // Unique identifier for top-level scheduler state machine. SchedulerMachineType = "scheduler.Scheduler" // The top-level scheduler only has a single, constant state. SchedulerMachineStateRunning SchedulerMachineState = 0 )
type SpecProcessor ¶
type SpecProcessor interface { // ProcessTimeRange generates buffered actions according to the schedule spec for // the given time range. // // The parameter manual is propagated to the returned BufferedStarts. When the limit // is set to a non-nil pointer, it will be decremented for each buffered start, and // the function will return early should limit reach 0. ProcessTimeRange( scheduler Scheduler, start, end time.Time, manual bool, limit *int, ) (*ProcessedTimeRange, error) }
SpecProcessor is used by the Generator and Backfiller to generate buffered actions according to the schedule spec.
type SpecProcessorImpl ¶
type SpecProcessorImpl struct { fx.In Config *Config MetricsHandler metrics.Handler Logger log.Logger SpecBuilder *scheduler1.SpecBuilder }
func (SpecProcessorImpl) ProcessTimeRange ¶
func (s SpecProcessorImpl) ProcessTimeRange( scheduler Scheduler, start, end time.Time, manual bool, limit *int, ) (*ProcessedTimeRange, error)
type Tweakables ¶
type Tweakables struct { DefaultCatchupWindow time.Duration // Default for catchup window MinCatchupWindow time.Duration // Minimum for catchup window MaxBufferSize int // MaxBufferSize limits the number of buffered actions pending execution in total CanceledTerminatedCountAsFailures bool // Whether cancelled+terminated count for pause-on-failure RecentActionCount int // How many recent actions are recorded in SchedulerInfo. }