Documentation
¶
Overview ¶
Package scheduler is a generated GoMock package.
Index ¶
- Constants
- Variables
- func RegisterGeneratorExecutors(registry *hsm.Registry, options GeneratorTaskExecutorOptions) error
- func RegisterStateMachines(r *hsm.Registry) error
- type BufferTask
- type Config
- type EventBuffer
- type EventExecute
- type EventWait
- type ExecuteTask
- type Executor
- type Generator
- type GeneratorMachineState
- type GeneratorTaskExecutorOptions
- type MockSpecProcessor
- type MockSpecProcessorMockRecorder
- type ProcessedTimeRange
- type Scheduler
- type SchedulerMachineState
- type SpecProcessor
- type SpecProcessorImpl
- type Tweakables
Constants ¶
const (
// Unique identifier for the Executor sub state machine.
ExecutorMachineType = "scheduler.Executor"
)
const (
TaskTypeBuffer = "scheduler.generator.Buffer"
)
const (
TaskTypeExecute = "scheduler.executor.Execute"
)
Variables ¶
var ( CurrentTweakables = dynamicconfig.NewNamespaceTypedSetting( "component.scheduler.tweakables", DefaultTweakables, "A set of tweakable parameters for the state machine scheduler.") RetryPolicyInitialInterval = dynamicconfig.NewGlobalDurationSetting( "component.scheduler.retryPolicy.initialInterval", time.Second, `The initial backoff interval when retrying a failed workflow start.`, ) RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting( "component.scheduler.retryPolicy.maxInterval", time.Minute, `The maximum backoff interval when retrying a failed workflow start.`, ) ServiceCallTimeout = dynamicconfig.NewGlobalDurationSetting( "component.scheduler.serviceCallTimeout", 5*time.Second, `The upper bound on how long a service call can take before being timed out.`, ) DefaultTweakables = Tweakables{ DefaultCatchupWindow: 365 * 24 * time.Hour, MinCatchupWindow: 10 * time.Second, MaxBufferSize: 1000, CanceledTerminatedCountAsFailures: false, RecentActionCount: 10, } )
var ( // Each sub state machine is a singleton of the top-level Scheduler, accessed with // a fixed key. ExecutorMachineKey = hsm.Key{Type: ExecutorMachineType, ID: ""} )
var ( // Each sub state machine is a singleton of the top-level Scheduler, accessed with // a fixed key GeneratorMachineKey = hsm.Key{Type: GeneratorMachineType, ID: ""} )
var TransitionExecute = hsm.NewTransition( []enumsspb.SchedulerExecutorState{ enumsspb.SCHEDULER_EXECUTOR_STATE_UNSPECIFIED, enumsspb.SCHEDULER_EXECUTOR_STATE_WAITING, enumsspb.SCHEDULER_EXECUTOR_STATE_EXECUTING, }, enumsspb.SCHEDULER_EXECUTOR_STATE_EXECUTING, func(e Executor, event EventExecute) (hsm.TransitionOutput, error) { e.NextInvocationTime = nil e.BufferedStarts = append(e.BufferedStarts, event.BufferedStarts...) return e.output() }, )
Transition to executing state to continue executing pending buffered actions, writing additional pending actions into the Executor's persistent state.
var TransitionWait = hsm.NewTransition( []enumsspb.SchedulerExecutorState{ enumsspb.SCHEDULER_EXECUTOR_STATE_UNSPECIFIED, enumsspb.SCHEDULER_EXECUTOR_STATE_EXECUTING, }, enumsspb.SCHEDULER_EXECUTOR_STATE_WAITING, func(e Executor, event EventWait) (hsm.TransitionOutput, error) { e.NextInvocationTime = nil return e.output() }, )
Transition to waiting state. No new tasks will be created until the Executor is transitioned back to Executing state.
Functions ¶
func RegisterGeneratorExecutors ¶ added in v1.27.0
func RegisterGeneratorExecutors(registry *hsm.Registry, options GeneratorTaskExecutorOptions) error
func RegisterStateMachines ¶ added in v1.27.0
RegisterStateMachine registers state machine definitions with the HSM registry. Should be called during dependency injection.
Types ¶
type BufferTask ¶ added in v1.27.0
type BufferTask struct {
// contains filtered or unexported fields
}
func (BufferTask) Deadline ¶ added in v1.27.0
func (b BufferTask) Deadline() time.Time
func (BufferTask) Destination ¶ added in v1.27.0
func (BufferTask) Destination() string
func (BufferTask) Type ¶ added in v1.27.0
func (BufferTask) Type() string
func (BufferTask) Validate ¶ added in v1.27.0
func (BufferTask) Validate(_ *persistencespb.StateMachineRef, _ *hsm.Node) error
type Config ¶
type Config struct { Tweakables dynamicconfig.TypedPropertyFnWithNamespaceFilter[Tweakables] ServiceCallTimeout dynamicconfig.DurationPropertyFn RetryPolicy func() backoff.RetryPolicy }
State Machine Scheduler dynamic config, shared among all sub state machines.
func ConfigProvider ¶
func ConfigProvider(dc *dynamicconfig.Collection) *Config
type EventBuffer ¶ added in v1.27.0
Fired when the Generator should buffer actions. After buffering, another buffer task is usually created with a later deadline. The Generator will alternate between sleeping and buffering without an explicit state transition.
type EventExecute ¶ added in v1.27.0
type EventExecute struct { Node *hsm.Node Deadline time.Time BufferedStarts []*schedulespb.BufferedStart }
EventExecute is fired when the executor should wake up and begin processing pending buffered starts, including any that are set on BufferedStarts. BufferedStarts are immediately appended to the Executor's queue as the transition is applied.
Execution can be delayed (such as in the event of backing off) by setting Deadline to something other than env.Now().
type EventWait ¶ added in v1.27.0
EventWait is fired when the executor should return to a waiting state until more actions are buffered.
type ExecuteTask ¶ added in v1.27.0
type ExecuteTask struct {
// contains filtered or unexported fields
}
func (ExecuteTask) Deadline ¶ added in v1.27.0
func (e ExecuteTask) Deadline() time.Time
func (ExecuteTask) Destination ¶ added in v1.27.0
func (ExecuteTask) Destination() string
func (ExecuteTask) Type ¶ added in v1.27.0
func (ExecuteTask) Type() string
func (ExecuteTask) Validate ¶ added in v1.27.0
func (ExecuteTask) Validate(_ *persistencespb.StateMachineRef, node *hsm.Node) error
type Executor ¶ added in v1.27.0
type Executor struct {
*schedulespb.ExecutorInternal
}
The Executor sub state machine is responsible for executing buffered actions.
func NewExecutor ¶ added in v1.27.0
func NewExecutor() *Executor
NewExecutor returns an intialized Executor sub state machine, which should be parented under a Scheduler root node.
func (Executor) RegenerateTasks ¶ added in v1.27.0
func (Executor) SetState ¶ added in v1.27.0
func (e Executor) SetState(state enumsspb.SchedulerExecutorState)
func (Executor) State ¶ added in v1.27.0
func (e Executor) State() enumsspb.SchedulerExecutorState
type Generator ¶ added in v1.27.0
type Generator struct {
*schedulespb.GeneratorInternal
}
The Generator sub state machine is responsible for buffering actions according to the schedule's specification. Manually requested actions (from an immediate request or backfill) are separately handled in the Backfiller sub state machine.
func NewGenerator ¶ added in v1.27.0
func NewGenerator() *Generator
NewGenerator returns an intialized Generator sub state machine, which should be parented under a Scheduler root node.
func (Generator) RegenerateTasks ¶ added in v1.27.0
func (Generator) SetState ¶ added in v1.27.0
func (g Generator) SetState(_ GeneratorMachineState)
func (Generator) State ¶ added in v1.27.0
func (g Generator) State() GeneratorMachineState
type GeneratorMachineState ¶ added in v1.27.0
type GeneratorMachineState int
const ( // Unique identifier for the Generator sub state machine. GeneratorMachineType = "scheduler.Generator" // The Generator has only a single running state. GeneratorMachineStateRunning GeneratorMachineState = 0 )
type GeneratorTaskExecutorOptions ¶ added in v1.27.0
type MockSpecProcessor ¶ added in v1.27.0
type MockSpecProcessor struct {
// contains filtered or unexported fields
}
MockSpecProcessor is a mock of SpecProcessor interface.
func NewMockSpecProcessor ¶ added in v1.27.0
func NewMockSpecProcessor(ctrl *gomock.Controller) *MockSpecProcessor
NewMockSpecProcessor creates a new mock instance.
func (*MockSpecProcessor) EXPECT ¶ added in v1.27.0
func (m *MockSpecProcessor) EXPECT() *MockSpecProcessorMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockSpecProcessor) ProcessTimeRange ¶ added in v1.27.0
func (m *MockSpecProcessor) ProcessTimeRange(scheduler Scheduler, start, end time.Time, manual bool, limit *int) (*ProcessedTimeRange, error)
ProcessTimeRange mocks base method.
type MockSpecProcessorMockRecorder ¶ added in v1.27.0
type MockSpecProcessorMockRecorder struct {
// contains filtered or unexported fields
}
MockSpecProcessorMockRecorder is the mock recorder for MockSpecProcessor.
func (*MockSpecProcessorMockRecorder) ProcessTimeRange ¶ added in v1.27.0
func (mr *MockSpecProcessorMockRecorder) ProcessTimeRange(scheduler, start, end, manual, limit any) *gomock.Call
ProcessTimeRange indicates an expected call of ProcessTimeRange.
type ProcessedTimeRange ¶ added in v1.27.0
type ProcessedTimeRange struct { NextWakeupTime time.Time LastActionTime time.Time BufferedStarts []*schedulespb.BufferedStart }
type Scheduler ¶
type Scheduler struct { *schedulespb.SchedulerInternal // contains filtered or unexported fields }
Scheduler is a top-level state machine compromised of 3 sub state machines: - Generator: buffers actions according to the schedule specification - Executor: executes buffered actions - Backfiller: buffers actions according to requested backfills
A running Scheduler will always have exactly one of each of the above sub state machines mounted as nodes within the HSM tree. The top-level machine itself remains in a singular running state for its lifetime (all work is done within the sub state machines). The Scheduler state machine is only responsible for creating the singleton sub state machines.
func NewScheduler ¶
func NewScheduler( namespace, namespaceID, scheduleID string, sched *schedulepb.Schedule, patch *schedulepb.SchedulePatch, ) *Scheduler
NewScheduler returns an initialized Scheduler state machine (without any sub state machines).
func (Scheduler) RegenerateTasks ¶
func (Scheduler) SetState ¶
func (s Scheduler) SetState(_ SchedulerMachineState)
func (Scheduler) State ¶
func (s Scheduler) State() SchedulerMachineState
type SchedulerMachineState ¶ added in v1.27.0
type SchedulerMachineState int
const ( // Unique identifier for top-level scheduler state machine. SchedulerMachineType = "scheduler.Scheduler" // The top-level scheduler only has a single, constant state. SchedulerMachineStateRunning SchedulerMachineState = 0 )
type SpecProcessor ¶ added in v1.27.0
type SpecProcessor interface { // ProcessTimeRange generates buffered actions according to the schedule spec for // the given time range. // // The parameter manual is propagated to the returned BufferedStarts. When the limit // is set to a non-nil pointer, it will be decremented for each buffered start, and // the function will return early should limit reach 0. ProcessTimeRange( scheduler Scheduler, start, end time.Time, manual bool, limit *int, ) (*ProcessedTimeRange, error) }
SpecProcessor is used by the Generator and Backfiller to generate buffered actions according to the schedule spec.
type SpecProcessorImpl ¶ added in v1.27.0
type SpecProcessorImpl struct { fx.In Config *Config MetricsHandler metrics.Handler Logger log.Logger SpecBuilder *scheduler1.SpecBuilder }
func (SpecProcessorImpl) ProcessTimeRange ¶ added in v1.27.0
func (s SpecProcessorImpl) ProcessTimeRange( scheduler Scheduler, start, end time.Time, manual bool, limit *int, ) (*ProcessedTimeRange, error)
type Tweakables ¶
type Tweakables struct { DefaultCatchupWindow time.Duration // Default for catchup window MinCatchupWindow time.Duration // Minimum for catchup window MaxBufferSize int // MaxBufferSize limits the number of buffered actions pending execution in total CanceledTerminatedCountAsFailures bool // Whether cancelled+terminated count for pause-on-failure RecentActionCount int // How many recent actions are recorded in SchedulerInfo. }