Documentation ¶
Index ¶
- Constants
- Variables
- func RegisterExecutor(registry *hsm.Registry, executorOptions TaskExecutorOptions) error
- func RegisterStateMachine(r *hsm.Registry) error
- func RegisterTaskSerializers(reg *hsm.Registry) error
- type Config
- type EventSchedulerActivate
- type EventSchedulerWait
- type ProcessWorkflowCompletionEvent
- type ScheduleExecuteTaskSerializer
- type ScheduleWaitTaskSerializer
- type Scheduler
- type SchedulerActivateTask
- type SchedulerWaitTask
- type TaskExecutorOptions
- type Tweakables
Constants ¶
View Source
// Task type name strings declared by this package.
const (
	// TaskTypeSchedulerWait is the type name string for the scheduler wait task.
	// NOTE(review): presumably what SchedulerWaitTask.Type returns — confirm.
	TaskTypeSchedulerWait = "scheduler.SchedulerWait"
	// TaskTypeSchedulerExecute is the type name string for the scheduler execute task.
	// NOTE(review): presumably what SchedulerActivateTask.Type returns — confirm.
	TaskTypeSchedulerExecute = "scheduler.SchedulerExecute"
)
View Source
// RecentActionCount is the fixed count of 10 exported by this package.
// NOTE(review): presumably the number of recent actions a scheduler keeps — confirm.
const RecentActionCount = 10
View Source
const (
	// StateMachineType names the scheduler state machine type.
	StateMachineType = "scheduler.Scheduler"
)
Unique type identifier for this state machine.
Variables ¶
View Source
// CurrentTweakables is the namespace-scoped dynamic config setting holding a
// Tweakables value. It is registered under the key
// "component.scheduler.tweakables" and defaults to DefaultTweakables.
var CurrentTweakables = dynamicconfig.NewNamespaceTypedSetting[Tweakables](
	"component.scheduler.tweakables",
	DefaultTweakables,
	"A set of tweakable parameters for the schedulers",
)
View Source
// DefaultTweakables holds the Tweakables values used as the default for the
// CurrentTweakables dynamic config setting.
var DefaultTweakables = Tweakables{
	DefaultCatchupWindow:              365 * 24 * time.Hour,
	MinCatchupWindow:                  10 * time.Second,
	MaxBufferSize:                     1000,
	BackfillsPerIteration:             10,
	CanceledTerminatedCountAsFailures: false,
}
View Source
// ErrSchedulerConflict is the sentinel error reporting that a concurrent
// scheduler task execution was detected and the scheduler state could not be
// updated. Compare against it with errors.Is.
var ErrSchedulerConflict = errors.New(
	"concurrent scheduler task execution detected, unable to update scheduler state",
)
View Source
// ExecutionTimeout is the namespace-scoped duration setting registered under
// "component.scheduler.executionTimeout". It bounds the execution of a single
// scheduler task and defaults to ten seconds.
var ExecutionTimeout = dynamicconfig.NewNamespaceDurationSetting(
	"component.scheduler.executionTimeout",
	10*time.Second,
	`ExecutionTimeout is the timeout for executing a single scheduler task.`,
)
View Source
// Module is the fx module named "component.scheduler". It provides
// ConfigProvider and scheduler.NewSpecBuilder, then invokes
// RegisterTaskSerializers, RegisterStateMachine and RegisterExecutor, in that
// order.
var Module = fx.Module(
	"component.scheduler",
	// Constructors made available to the dependency graph.
	fx.Provide(ConfigProvider),
	fx.Provide(scheduler.NewSpecBuilder),
	// Registration hooks run when the module is wired up.
	fx.Invoke(RegisterTaskSerializers),
	fx.Invoke(RegisterStateMachine),
	fx.Invoke(RegisterExecutor),
)
View Source
var TransitionSchedulerActivate = hsm.NewTransition( []enumsspb.SchedulerState{enumsspb.SCHEDULER_STATE_WAITING}, enumsspb.SCHEDULER_STATE_EXECUTING, func(scheduler *Scheduler, event EventSchedulerActivate) (hsm.TransitionOutput, error) { tasks, err := scheduler.RegenerateTasks(nil) return hsm.TransitionOutput{Tasks: tasks}, err })
View Source
var TransitionSchedulerWait = hsm.NewTransition( []enumsspb.SchedulerState{enumsspb.SCHEDULER_STATE_EXECUTING}, enumsspb.SCHEDULER_STATE_WAITING, func(scheduler *Scheduler, event EventSchedulerWait) (hsm.TransitionOutput, error) { tasks, err := scheduler.RegenerateTasks(nil) return hsm.TransitionOutput{Tasks: tasks}, err })
View Source
// UseExperimentalHsmScheduler is the namespace-scoped boolean setting
// registered under "scheduler.use-experimental-hsm-scheduler". It defaults to
// false; when true, the HSM-based scheduler is used instead of workflows.
var UseExperimentalHsmScheduler = dynamicconfig.NewNamespaceBoolSetting(
	"scheduler.use-experimental-hsm-scheduler",
	false,
	"When true, use the experimental scheduler implemented using the HSM framework instead of workflows",
)
Functions ¶
func RegisterExecutor ¶
func RegisterExecutor( registry *hsm.Registry, executorOptions TaskExecutorOptions, ) error
func RegisterStateMachine ¶
func RegisterTaskSerializers ¶
Types ¶
type Config ¶
type Config struct { Tweakables dynamicconfig.TypedPropertyFnWithNamespaceFilter[Tweakables] ExecutionTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter }
func ConfigProvider ¶
func ConfigProvider(dc *dynamicconfig.Collection) *Config
type EventSchedulerActivate ¶
// EventSchedulerActivate has no fields; it is the event type accepted by
// TransitionSchedulerActivate.
type EventSchedulerActivate struct{}
EventSchedulerActivate is triggered when the scheduler state machine should wake up and perform work.
type EventSchedulerWait ¶
// EventSchedulerWait has no fields; it is the event type accepted by
// TransitionSchedulerWait.
type EventSchedulerWait struct{}
EventSchedulerWait is triggered when the scheduler state machine is done working and goes back to waiting.
type ProcessWorkflowCompletionEvent ¶
// ProcessWorkflowCompletionEvent is a field-less type that declares the
// methods Name, DeserializeInput and SerializeOutput.
type ProcessWorkflowCompletionEvent struct{}
func (ProcessWorkflowCompletionEvent) DeserializeInput ¶
func (p ProcessWorkflowCompletionEvent) DeserializeInput(data []byte) (any, error)
func (ProcessWorkflowCompletionEvent) Name ¶
func (p ProcessWorkflowCompletionEvent) Name() string
func (ProcessWorkflowCompletionEvent) SerializeOutput ¶
func (p ProcessWorkflowCompletionEvent) SerializeOutput(_ any) ([]byte, error)
type ScheduleExecuteTaskSerializer ¶
// ScheduleExecuteTaskSerializer is a field-less type whose Deserialize method
// turns raw bytes plus hsm.TaskAttributes into an hsm.Task.
type ScheduleExecuteTaskSerializer struct{}
func (ScheduleExecuteTaskSerializer) Deserialize ¶
func (ScheduleExecuteTaskSerializer) Deserialize(data []byte, kind hsm.TaskAttributes) (hsm.Task, error)
type ScheduleWaitTaskSerializer ¶
// ScheduleWaitTaskSerializer is a field-less type whose Deserialize method
// turns raw bytes plus hsm.TaskAttributes into an hsm.Task.
type ScheduleWaitTaskSerializer struct{}
func (ScheduleWaitTaskSerializer) Deserialize ¶
func (ScheduleWaitTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)
type Scheduler ¶
type Scheduler struct { *schedspb.HsmSchedulerState // contains filtered or unexported fields }
func NewScheduler ¶
func NewScheduler(args *schedspb.StartScheduleArgs, tweakables *Tweakables) *Scheduler
NewScheduler creates a new scheduler in the WAITING state from the given start arguments and tweakables.
func (*Scheduler) RegenerateTasks ¶
func (*Scheduler) SetState ¶
func (s *Scheduler) SetState(state enumsspb.SchedulerState)
func (*Scheduler) State ¶
func (s *Scheduler) State() enumsspb.SchedulerState
type SchedulerActivateTask ¶
// SchedulerActivateTask is a field-less task type that declares the methods
// Deadline, Destination, Type and Validate.
type SchedulerActivateTask struct{}
func (SchedulerActivateTask) Deadline ¶ added in v1.26.2
func (t SchedulerActivateTask) Deadline() time.Time
func (SchedulerActivateTask) Destination ¶
func (t SchedulerActivateTask) Destination() string
func (SchedulerActivateTask) Type ¶
func (SchedulerActivateTask) Type() string
func (SchedulerActivateTask) Validate ¶ added in v1.26.2
func (SchedulerActivateTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error
type SchedulerWaitTask ¶
type SchedulerWaitTask struct {
// contains filtered or unexported fields
}
func (SchedulerWaitTask) Deadline ¶
func (t SchedulerWaitTask) Deadline() time.Time
func (SchedulerWaitTask) Destination ¶ added in v1.26.2
func (SchedulerWaitTask) Destination() string
func (SchedulerWaitTask) Type ¶
func (SchedulerWaitTask) Type() string
func (SchedulerWaitTask) Validate ¶ added in v1.26.2
func (SchedulerWaitTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error
type TaskExecutorOptions ¶
type Tweakables ¶
// Tweakables is the set of tunable parameters for schedulers. Its defaults are
// given by DefaultTweakables.
type Tweakables struct {
	// DefaultCatchupWindow is the default for the catchup window.
	DefaultCatchupWindow time.Duration
	// MinCatchupWindow is the minimum for the catchup window.
	MinCatchupWindow time.Duration
	// MaxBufferSize limits the number of buffered starts and backfills.
	MaxBufferSize int
	// BackfillsPerIteration is how many backfilled actions to take per
	// iteration (implies a rate limit, since the minimum sleep is 1s).
	BackfillsPerIteration int
	// CanceledTerminatedCountAsFailures reports whether canceled and
	// terminated runs count toward pause-on-failure.
	CanceledTerminatedCountAsFailures bool
}
Click to show internal directories.
Click to hide internal directories.