scheduler

package
v1.25.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 10, 2024 License: MIT Imports: 35 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TaskTypeSchedulerWait    = "scheduler.SchedulerWait"
	TaskTypeSchedulerExecute = "scheduler.SchedulerExecute"
)
View Source
const (
	RecentActionCount = 10
)
View Source
const StateMachineType = "scheduler.Scheduler"

Unique type identifier for this state machine.

Variables

View Source
var CurrentTweakables = dynamicconfig.NewNamespaceTypedSetting[Tweakables](
	"component.scheduler.tweakables",
	DefaultTweakables,
	"A set of tweakable parameters for the schedulers")
View Source
var DefaultTweakables = Tweakables{
	DefaultCatchupWindow:              365 * 24 * time.Hour,
	MinCatchupWindow:                  10 * time.Second,
	MaxBufferSize:                     1000,
	BackfillsPerIteration:             10,
	CanceledTerminatedCountAsFailures: false,
}
View Source
var ErrSchedulerConflict = errors.New("concurrent scheduler task execution detected, unable to update scheduler state")
View Source
var ExecutionTimeout = dynamicconfig.NewNamespaceDurationSetting(
	"component.scheduler.executionTimeout",
	time.Second*10,
	`ExecutionTimeout is the timeout for executing a single scheduler task.`,
)
View Source
var TransitionSchedulerActivate = hsm.NewTransition(
	[]enumsspb.SchedulerState{enumsspb.SCHEDULER_STATE_WAITING},
	enumsspb.SCHEDULER_STATE_EXECUTING,
	func(scheduler *Scheduler, event EventSchedulerActivate) (hsm.TransitionOutput, error) {
		tasks, err := scheduler.RegenerateTasks(nil)
		return hsm.TransitionOutput{Tasks: tasks}, err
	})
View Source
var TransitionSchedulerWait = hsm.NewTransition(
	[]enumsspb.SchedulerState{enumsspb.SCHEDULER_STATE_EXECUTING},
	enumsspb.SCHEDULER_STATE_WAITING,
	func(scheduler *Scheduler, event EventSchedulerWait) (hsm.TransitionOutput, error) {
		tasks, err := scheduler.RegenerateTasks(nil)
		return hsm.TransitionOutput{Tasks: tasks}, err
	})
View Source
var UseExperimentalHsmScheduler = dynamicconfig.NewNamespaceBoolSetting(
	"scheduler.use-experimental-hsm-scheduler",
	false,
	"When true, use the experimental scheduler implemented using the HSM framework instead of workflows")

Functions

func RegisterExecutor

func RegisterExecutor(
	registry *hsm.Registry,
	executorOptions TaskExecutorOptions,
) error

func RegisterStateMachine

func RegisterStateMachine(r *hsm.Registry) error

func RegisterTaskSerializers

func RegisterTaskSerializers(reg *hsm.Registry) error

Types

type EventSchedulerActivate

type EventSchedulerActivate struct{}

EventSchedulerActivate is triggered when the scheduler state machine should wake up and perform work.

type EventSchedulerWait

type EventSchedulerWait struct{}

EventSchedulerWait is triggered when the scheduler state machine is done working and goes back to waiting.

type ProcessWorkflowCompletionEvent

type ProcessWorkflowCompletionEvent struct{}

func (ProcessWorkflowCompletionEvent) DeserializeInput

func (p ProcessWorkflowCompletionEvent) DeserializeInput(data []byte) (any, error)

func (ProcessWorkflowCompletionEvent) Name

func (ProcessWorkflowCompletionEvent) SerializeOutput

func (p ProcessWorkflowCompletionEvent) SerializeOutput(_ any) ([]byte, error)

type ScheduleExecuteTaskSerializer

type ScheduleExecuteTaskSerializer struct{}

func (ScheduleExecuteTaskSerializer) Deserialize

func (ScheduleExecuteTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (ScheduleExecuteTaskSerializer) Serialize

type ScheduleWaitTaskSerializer

type ScheduleWaitTaskSerializer struct{}

func (ScheduleWaitTaskSerializer) Deserialize

func (ScheduleWaitTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (ScheduleWaitTaskSerializer) Serialize

func (ScheduleWaitTaskSerializer) Serialize(hsm.Task) ([]byte, error)

type Scheduler

type Scheduler struct {
	*schedspb.HsmSchedulerState
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(args *schedspb.StartScheduleArgs, tweakables *Tweakables) *Scheduler

NewScheduler creates a new scheduler in the WAITING state from given params.

func (*Scheduler) RegenerateTasks

func (s *Scheduler) RegenerateTasks(*hsm.Node) ([]hsm.Task, error)

func (*Scheduler) SetState

func (s *Scheduler) SetState(state enumsspb.SchedulerState)

func (*Scheduler) State

func (s *Scheduler) State() enumsspb.SchedulerState

type SchedulerActivateTask

type SchedulerActivateTask struct {
	Destination string
}

func (SchedulerActivateTask) Concurrent

func (SchedulerActivateTask) Concurrent() bool

func (SchedulerActivateTask) Kind

func (SchedulerActivateTask) Type

type SchedulerWaitTask

type SchedulerWaitTask struct {
	Deadline time.Time
}

func (SchedulerWaitTask) Concurrent

func (SchedulerWaitTask) Concurrent() bool

func (SchedulerWaitTask) Kind

func (t SchedulerWaitTask) Kind() hsm.TaskKind

func (SchedulerWaitTask) Type

func (SchedulerWaitTask) Type() string

type TaskExecutorOptions

type TaskExecutorOptions struct {
	fx.In
	MetricsHandler    metrics.Handler
	Logger            log.Logger
	SpecBuilder       *scheduler.SpecBuilder
	FrontendClient    workflowservice.WorkflowServiceClient
	HistoryClient     resource.HistoryClient
	NamespaceRegistry namespace.Registry
	Config            *Config
}

type Tweakables

type Tweakables struct {
	DefaultCatchupWindow              time.Duration // Default for catchup window
	MinCatchupWindow                  time.Duration // Minimum for catchup window
	MaxBufferSize                     int           // MaxBufferSize limits the number of buffered starts and backfills
	BackfillsPerIteration             int           // How many backfilled actions to take per iteration (implies rate limit since min sleep is 1s)
	CanceledTerminatedCountAsFailures bool          // Whether cancelled+terminated count for pause-on-failure

}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL