scheduler

package
v1.26.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 23, 2024 License: MIT Imports: 35 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TaskTypeSchedulerWait    = "scheduler.SchedulerWait"
	TaskTypeSchedulerExecute = "scheduler.SchedulerExecute"
)
View Source
const (
	RecentActionCount = 10
)
View Source
const StateMachineType = "scheduler.Scheduler"

Unique type identifier for this state machine.

Variables

View Source
var CurrentTweakables = dynamicconfig.NewNamespaceTypedSetting[Tweakables](
	"component.scheduler.tweakables",
	DefaultTweakables,
	"A set of tweakable parameters for the schedulers")
View Source
var DefaultTweakables = Tweakables{
	DefaultCatchupWindow:              365 * 24 * time.Hour,
	MinCatchupWindow:                  10 * time.Second,
	MaxBufferSize:                     1000,
	BackfillsPerIteration:             10,
	CanceledTerminatedCountAsFailures: false,
}
View Source
var ErrSchedulerConflict = errors.New("concurrent scheduler task execution detected, unable to update scheduler state")
View Source
var ExecutionTimeout = dynamicconfig.NewNamespaceDurationSetting(
	"component.scheduler.executionTimeout",
	time.Second*10,
	`ExecutionTimeout is the timeout for executing a single scheduler task.`,
)
View Source
var TransitionSchedulerActivate = hsm.NewTransition(
	[]enumsspb.SchedulerState{enumsspb.SCHEDULER_STATE_WAITING},
	enumsspb.SCHEDULER_STATE_EXECUTING,
	func(scheduler *Scheduler, event EventSchedulerActivate) (hsm.TransitionOutput, error) {
		tasks, err := scheduler.RegenerateTasks(nil)
		return hsm.TransitionOutput{Tasks: tasks}, err
	})
View Source
var TransitionSchedulerWait = hsm.NewTransition(
	[]enumsspb.SchedulerState{enumsspb.SCHEDULER_STATE_EXECUTING},
	enumsspb.SCHEDULER_STATE_WAITING,
	func(scheduler *Scheduler, event EventSchedulerWait) (hsm.TransitionOutput, error) {
		tasks, err := scheduler.RegenerateTasks(nil)
		return hsm.TransitionOutput{Tasks: tasks}, err
	})
View Source
var UseExperimentalHsmScheduler = dynamicconfig.NewNamespaceBoolSetting(
	"scheduler.use-experimental-hsm-scheduler",
	false,
	"When true, use the experimental scheduler implemented using the HSM framework instead of workflows")

Functions

func RegisterExecutor

func RegisterExecutor(
	registry *hsm.Registry,
	executorOptions TaskExecutorOptions,
) error

func RegisterStateMachine

func RegisterStateMachine(r *hsm.Registry) error

func RegisterTaskSerializers

func RegisterTaskSerializers(reg *hsm.Registry) error

Types

type EventSchedulerActivate

type EventSchedulerActivate struct{}

EventSchedulerActivate is triggered when the scheduler state machine should wake up and perform work.

type EventSchedulerWait

type EventSchedulerWait struct{}

EventSchedulerWait is triggered when the scheduler state machine is done working and goes back to waiting.

type ProcessWorkflowCompletionEvent

type ProcessWorkflowCompletionEvent struct{}

func (ProcessWorkflowCompletionEvent) DeserializeInput

func (p ProcessWorkflowCompletionEvent) DeserializeInput(data []byte) (any, error)

func (ProcessWorkflowCompletionEvent) Name

func (ProcessWorkflowCompletionEvent) SerializeOutput

func (p ProcessWorkflowCompletionEvent) SerializeOutput(_ any) ([]byte, error)

type ScheduleExecuteTaskSerializer

type ScheduleExecuteTaskSerializer struct{}

func (ScheduleExecuteTaskSerializer) Deserialize

func (ScheduleExecuteTaskSerializer) Deserialize(data []byte, kind hsm.TaskAttributes) (hsm.Task, error)

func (ScheduleExecuteTaskSerializer) Serialize

type ScheduleWaitTaskSerializer

type ScheduleWaitTaskSerializer struct{}

func (ScheduleWaitTaskSerializer) Deserialize

func (ScheduleWaitTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)

func (ScheduleWaitTaskSerializer) Serialize

func (ScheduleWaitTaskSerializer) Serialize(hsm.Task) ([]byte, error)

type Scheduler

type Scheduler struct {
	*schedspb.HsmSchedulerState
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(args *schedspb.StartScheduleArgs, tweakables *Tweakables) *Scheduler

NewScheduler creates a new scheduler in the WAITING state from given params.

func (*Scheduler) RegenerateTasks

func (s *Scheduler) RegenerateTasks(*hsm.Node) ([]hsm.Task, error)

func (*Scheduler) SetState

func (s *Scheduler) SetState(state enumsspb.SchedulerState)

func (*Scheduler) State

func (s *Scheduler) State() enumsspb.SchedulerState

type SchedulerActivateTask

type SchedulerActivateTask struct{}

func (SchedulerActivateTask) Deadline added in v1.26.2

func (t SchedulerActivateTask) Deadline() time.Time

func (SchedulerActivateTask) Destination

func (t SchedulerActivateTask) Destination() string

func (SchedulerActivateTask) Type

func (SchedulerActivateTask) Validate added in v1.26.2

type SchedulerWaitTask

type SchedulerWaitTask struct {
	// contains filtered or unexported fields
}

func (SchedulerWaitTask) Deadline

func (t SchedulerWaitTask) Deadline() time.Time

func (SchedulerWaitTask) Destination added in v1.26.2

func (SchedulerWaitTask) Destination() string

func (SchedulerWaitTask) Type

func (SchedulerWaitTask) Type() string

func (SchedulerWaitTask) Validate added in v1.26.2

type TaskExecutorOptions

type TaskExecutorOptions struct {
	fx.In
	MetricsHandler    metrics.Handler
	Logger            log.Logger
	SpecBuilder       *scheduler.SpecBuilder
	FrontendClient    workflowservice.WorkflowServiceClient
	HistoryClient     resource.HistoryClient
	NamespaceRegistry namespace.Registry
	Config            *Config
}

type Tweakables

type Tweakables struct {
	DefaultCatchupWindow              time.Duration // Default for catchup window
	MinCatchupWindow                  time.Duration // Minimum for catchup window
	MaxBufferSize                     int           // MaxBufferSize limits the number of buffered starts and backfills
	BackfillsPerIteration             int           // How many backfilled actions to take per iteration (implies rate limit since min sleep is 1s)
	CanceledTerminatedCountAsFailures bool          // Whether cancelled+terminated count for pause-on-failure

}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL