scheduler

package
v1.27.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 26, 2025 License: MIT Imports: 23 Imported by: 0

Documentation

Overview

Package scheduler is a generated GoMock package.

Index

Constants

View Source
const (
	// Unique identifier for the Executor sub state machine.
	ExecutorMachineType = "scheduler.Executor"
)
View Source
const (
	TaskTypeBuffer = "scheduler.generator.Buffer"
)
View Source
const (
	TaskTypeExecute = "scheduler.executor.Execute"
)

Variables

View Source
var (
	CurrentTweakables = dynamicconfig.NewNamespaceTypedSetting(
		"component.scheduler.tweakables",
		DefaultTweakables,
		"A set of tweakable parameters for the state machine scheduler.")

	RetryPolicyInitialInterval = dynamicconfig.NewGlobalDurationSetting(
		"component.scheduler.retryPolicy.initialInterval",
		time.Second,
		`The initial backoff interval when retrying a failed workflow start.`,
	)

	RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting(
		"component.scheduler.retryPolicy.maxInterval",
		time.Minute,
		`The maximum backoff interval when retrying a failed workflow start.`,
	)

	ServiceCallTimeout = dynamicconfig.NewGlobalDurationSetting(
		"component.scheduler.serviceCallTimeout",
		5*time.Second,
		`The upper bound on how long a service call can take before being timed out.`,
	)

	DefaultTweakables = Tweakables{
		DefaultCatchupWindow:              365 * 24 * time.Hour,
		MinCatchupWindow:                  10 * time.Second,
		MaxBufferSize:                     1000,
		CanceledTerminatedCountAsFailures: false,
		RecentActionCount:                 10,
	}
)
View Source
var (

	// Each sub state machine is a singleton of the top-level Scheduler, accessed with
	// a fixed key.
	ExecutorMachineKey = hsm.Key{Type: ExecutorMachineType, ID: ""}
)
View Source
var (

	// Each sub state machine is a singleton of the top-level Scheduler, accessed with
	// a fixed key
	GeneratorMachineKey = hsm.Key{Type: GeneratorMachineType, ID: ""}
)

Transition to executing state to continue executing pending buffered actions, writing additional pending actions into the Executor's persistent state.

Transition to waiting state. No new tasks will be created until the Executor is transitioned back to Executing state.

Functions

func RegisterGeneratorExecutors added in v1.27.0

func RegisterGeneratorExecutors(registry *hsm.Registry, options GeneratorTaskExecutorOptions) error

func RegisterStateMachines added in v1.27.0

func RegisterStateMachines(r *hsm.Registry) error

RegisterStateMachine registers state machine definitions with the HSM registry. Should be called during dependency injection.

Types

type BufferTask added in v1.27.0

type BufferTask struct {
	// contains filtered or unexported fields
}

func (BufferTask) Deadline added in v1.27.0

func (b BufferTask) Deadline() time.Time

func (BufferTask) Destination added in v1.27.0

func (BufferTask) Destination() string

func (BufferTask) Type added in v1.27.0

func (BufferTask) Type() string

func (BufferTask) Validate added in v1.27.0

type Config

type Config struct {
	Tweakables         dynamicconfig.TypedPropertyFnWithNamespaceFilter[Tweakables]
	ServiceCallTimeout dynamicconfig.DurationPropertyFn
	RetryPolicy        func() backoff.RetryPolicy
}

State Machine Scheduler dynamic config, shared among all sub state machines.

func ConfigProvider

func ConfigProvider(dc *dynamicconfig.Collection) *Config

type EventBuffer added in v1.27.0

type EventBuffer struct {
	Node *hsm.Node

	Deadline time.Time
}

Fired when the Generator should buffer actions. After buffering, another buffer task is usually created with a later deadline. The Generator will alternate between sleeping and buffering without an explicit state transition.

type EventExecute added in v1.27.0

type EventExecute struct {
	Node *hsm.Node

	Deadline       time.Time
	BufferedStarts []*schedulespb.BufferedStart
}

EventExecute is fired when the executor should wake up and begin processing pending buffered starts, including any that are set on BufferedStarts. BufferedStarts are immediately appended to the Executor's queue as the transition is applied.

Execution can be delayed (such as in the event of backing off) by setting Deadline to something other than env.Now().

type EventWait added in v1.27.0

type EventWait struct {
	Node *hsm.Node
}

EventWait is fired when the executor should return to a waiting state until more actions are buffered.

type ExecuteTask added in v1.27.0

type ExecuteTask struct {
	// contains filtered or unexported fields
}

func (ExecuteTask) Deadline added in v1.27.0

func (e ExecuteTask) Deadline() time.Time

func (ExecuteTask) Destination added in v1.27.0

func (ExecuteTask) Destination() string

func (ExecuteTask) Type added in v1.27.0

func (ExecuteTask) Type() string

func (ExecuteTask) Validate added in v1.27.0

type Executor added in v1.27.0

type Executor struct {
	*schedulespb.ExecutorInternal
}

The Executor sub state machine is responsible for executing buffered actions.

func NewExecutor added in v1.27.0

func NewExecutor() *Executor

NewExecutor returns an intialized Executor sub state machine, which should be parented under a Scheduler root node.

func (Executor) RegenerateTasks added in v1.27.0

func (e Executor) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error)

func (Executor) SetState added in v1.27.0

func (e Executor) SetState(state enumsspb.SchedulerExecutorState)

func (Executor) State added in v1.27.0

type Generator added in v1.27.0

type Generator struct {
	*schedulespb.GeneratorInternal
}

The Generator sub state machine is responsible for buffering actions according to the schedule's specification. Manually requested actions (from an immediate request or backfill) are separately handled in the Backfiller sub state machine.

func NewGenerator added in v1.27.0

func NewGenerator() *Generator

NewGenerator returns an intialized Generator sub state machine, which should be parented under a Scheduler root node.

func (Generator) RegenerateTasks added in v1.27.0

func (g Generator) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error)

func (Generator) SetState added in v1.27.0

func (g Generator) SetState(_ GeneratorMachineState)

func (Generator) State added in v1.27.0

func (g Generator) State() GeneratorMachineState

type GeneratorMachineState added in v1.27.0

type GeneratorMachineState int
const (
	// Unique identifier for the Generator sub state machine.
	GeneratorMachineType = "scheduler.Generator"

	// The Generator has only a single running state.
	GeneratorMachineStateRunning GeneratorMachineState = 0
)

type GeneratorTaskExecutorOptions added in v1.27.0

type GeneratorTaskExecutorOptions struct {
	fx.In

	Config         *Config
	MetricsHandler metrics.Handler
	BaseLogger     log.Logger
	SpecProcessor  SpecProcessor
}

type MockSpecProcessor added in v1.27.0

type MockSpecProcessor struct {
	// contains filtered or unexported fields
}

MockSpecProcessor is a mock of SpecProcessor interface.

func NewMockSpecProcessor added in v1.27.0

func NewMockSpecProcessor(ctrl *gomock.Controller) *MockSpecProcessor

NewMockSpecProcessor creates a new mock instance.

func (*MockSpecProcessor) EXPECT added in v1.27.0

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockSpecProcessor) ProcessTimeRange added in v1.27.0

func (m *MockSpecProcessor) ProcessTimeRange(scheduler Scheduler, start, end time.Time, manual bool, limit *int) (*ProcessedTimeRange, error)

ProcessTimeRange mocks base method.

type MockSpecProcessorMockRecorder added in v1.27.0

type MockSpecProcessorMockRecorder struct {
	// contains filtered or unexported fields
}

MockSpecProcessorMockRecorder is the mock recorder for MockSpecProcessor.

func (*MockSpecProcessorMockRecorder) ProcessTimeRange added in v1.27.0

func (mr *MockSpecProcessorMockRecorder) ProcessTimeRange(scheduler, start, end, manual, limit any) *gomock.Call

ProcessTimeRange indicates an expected call of ProcessTimeRange.

type ProcessedTimeRange added in v1.27.0

type ProcessedTimeRange struct {
	NextWakeupTime time.Time
	LastActionTime time.Time
	BufferedStarts []*schedulespb.BufferedStart
}

type Scheduler

type Scheduler struct {
	*schedulespb.SchedulerInternal
	// contains filtered or unexported fields
}

Scheduler is a top-level state machine compromised of 3 sub state machines: - Generator: buffers actions according to the schedule specification - Executor: executes buffered actions - Backfiller: buffers actions according to requested backfills

A running Scheduler will always have exactly one of each of the above sub state machines mounted as nodes within the HSM tree. The top-level machine itself remains in a singular running state for its lifetime (all work is done within the sub state machines). The Scheduler state machine is only responsible for creating the singleton sub state machines.

func NewScheduler

func NewScheduler(
	namespace, namespaceID, scheduleID string,
	sched *schedulepb.Schedule,
	patch *schedulepb.SchedulePatch,
) *Scheduler

NewScheduler returns an initialized Scheduler state machine (without any sub state machines).

func (Scheduler) RegenerateTasks

func (s Scheduler) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error)

func (Scheduler) SetState

func (s Scheduler) SetState(_ SchedulerMachineState)

func (Scheduler) State

func (s Scheduler) State() SchedulerMachineState

type SchedulerMachineState added in v1.27.0

type SchedulerMachineState int
const (
	// Unique identifier for top-level scheduler state machine.
	SchedulerMachineType = "scheduler.Scheduler"

	// The top-level scheduler only has a single, constant state.
	SchedulerMachineStateRunning SchedulerMachineState = 0
)

type SpecProcessor added in v1.27.0

type SpecProcessor interface {
	// ProcessTimeRange generates buffered actions according to the schedule spec for
	// the given time range.
	//
	// The parameter manual is propagated to the returned BufferedStarts. When the limit
	// is set to a non-nil pointer, it will be decremented for each buffered start, and
	// the function will return early should limit reach 0.
	ProcessTimeRange(
		scheduler Scheduler,
		start, end time.Time,
		manual bool,
		limit *int,
	) (*ProcessedTimeRange, error)
}

SpecProcessor is used by the Generator and Backfiller to generate buffered actions according to the schedule spec.

type SpecProcessorImpl added in v1.27.0

type SpecProcessorImpl struct {
	fx.In

	Config         *Config
	MetricsHandler metrics.Handler
	Logger         log.Logger
	SpecBuilder    *scheduler1.SpecBuilder
}

func (SpecProcessorImpl) ProcessTimeRange added in v1.27.0

func (s SpecProcessorImpl) ProcessTimeRange(
	scheduler Scheduler,
	start, end time.Time,
	manual bool,
	limit *int,
) (*ProcessedTimeRange, error)

type Tweakables

type Tweakables struct {
	DefaultCatchupWindow              time.Duration // Default for catchup window
	MinCatchupWindow                  time.Duration // Minimum for catchup window
	MaxBufferSize                     int           // MaxBufferSize limits the number of buffered actions pending execution in total
	CanceledTerminatedCountAsFailures bool          // Whether cancelled+terminated count for pause-on-failure
	RecentActionCount                 int           // How many recent actions are recorded in SchedulerInfo.

}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL