scheduler

package
v1.27.0-128.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2025 License: MIT Imports: 23 Imported by: 0

Documentation

Overview

Package scheduler is a generated GoMock package.

Index

Constants

View Source
const (
	// Unique identifier for the Executor sub state machine.
	ExecutorMachineType = "scheduler.Executor"
)
View Source
const (
	TaskTypeBuffer = "scheduler.generator.Buffer"
)
View Source
const (
	TaskTypeExecute = "scheduler.executor.Execute"
)

Variables

View Source
var (
	CurrentTweakables = dynamicconfig.NewNamespaceTypedSetting(
		"component.scheduler.tweakables",
		DefaultTweakables,
		"A set of tweakable parameters for the state machine scheduler.")

	RetryPolicyInitialInterval = dynamicconfig.NewGlobalDurationSetting(
		"component.scheduler.retryPolicy.initialInterval",
		time.Second,
		`The initial backoff interval when retrying a failed workflow start.`,
	)

	RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting(
		"component.scheduler.retryPolicy.maxInterval",
		time.Minute,
		`The maximum backoff interval when retrying a failed workflow start.`,
	)

	ServiceCallTimeout = dynamicconfig.NewGlobalDurationSetting(
		"component.scheduler.serviceCallTimeout",
		5*time.Second,
		`The upper bound on how long a service call can take before being timed out.`,
	)

	DefaultTweakables = Tweakables{
		DefaultCatchupWindow:              365 * 24 * time.Hour,
		MinCatchupWindow:                  10 * time.Second,
		MaxBufferSize:                     1000,
		CanceledTerminatedCountAsFailures: false,
		RecentActionCount:                 10,
	}
)
View Source
var (

	// Each sub state machine is a singleton of the top-level Scheduler, accessed with
	// a fixed key.
	ExecutorMachineKey = hsm.Key{Type: ExecutorMachineType, ID: ""}
)
View Source
var (

	// Each sub state machine is a singleton of the top-level Scheduler, accessed with
	// a fixed key
	GeneratorMachineKey = hsm.Key{Type: GeneratorMachineType, ID: ""}
)

Transition to executing state to continue executing pending buffered actions, writing additional pending actions into the Executor's persistent state.

Transition to waiting state. No new tasks will be created until the Executor is transitioned back to Executing state.

Functions

func RegisterGeneratorExecutors

func RegisterGeneratorExecutors(registry *hsm.Registry, options GeneratorTaskExecutorOptions) error

func RegisterStateMachines

func RegisterStateMachines(r *hsm.Registry) error

RegisterStateMachine registers state machine definitions with the HSM registry. Should be called during dependency injection.

Types

type BufferTask

type BufferTask struct {
	// contains filtered or unexported fields
}

func (BufferTask) Deadline

func (b BufferTask) Deadline() time.Time

func (BufferTask) Destination

func (BufferTask) Destination() string

func (BufferTask) Type

func (BufferTask) Type() string

func (BufferTask) Validate

type Config

type Config struct {
	Tweakables         dynamicconfig.TypedPropertyFnWithNamespaceFilter[Tweakables]
	ServiceCallTimeout dynamicconfig.DurationPropertyFn
	RetryPolicy        func() backoff.RetryPolicy
}

State Machine Scheduler dynamic config, shared among all sub state machines.

func ConfigProvider

func ConfigProvider(dc *dynamicconfig.Collection) *Config

type EventBuffer

type EventBuffer struct {
	Node *hsm.Node

	Deadline time.Time
}

Fired when the Generator should buffer actions. After buffering, another buffer task is usually created with a later deadline. The Generator will alternate between sleeping and buffering without an explicit state transition.

type EventExecute

type EventExecute struct {
	Node *hsm.Node

	Deadline       time.Time
	BufferedStarts []*schedulespb.BufferedStart
}

EventExecute is fired when the executor should wake up and begin processing pending buffered starts, including any that are set on BufferedStarts. BufferedStarts are immediately appended to the Executor's queue as the transition is applied.

Execution can be delayed (such as in the event of backing off) by setting Deadline to something other than env.Now().

type EventWait

type EventWait struct {
	Node *hsm.Node
}

EventWait is fired when the executor should return to a waiting state until more actions are buffered.

type ExecuteTask

type ExecuteTask struct {
	// contains filtered or unexported fields
}

func (ExecuteTask) Deadline

func (e ExecuteTask) Deadline() time.Time

func (ExecuteTask) Destination

func (ExecuteTask) Destination() string

func (ExecuteTask) Type

func (ExecuteTask) Type() string

func (ExecuteTask) Validate

type Executor

type Executor struct {
	*schedulespb.ExecutorInternal
}

The Executor sub state machine is responsible for executing buffered actions.

func NewExecutor

func NewExecutor() *Executor

NewExecutor returns an intialized Executor sub state machine, which should be parented under a Scheduler root node.

func (Executor) RegenerateTasks

func (e Executor) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error)

func (Executor) SetState

func (e Executor) SetState(state enumsspb.SchedulerExecutorState)

func (Executor) State

type Generator

type Generator struct {
	*schedulespb.GeneratorInternal
}

The Generator sub state machine is responsible for buffering actions according to the schedule's specification. Manually requested actions (from an immediate request or backfill) are separately handled in the Backfiller sub state machine.

func NewGenerator

func NewGenerator() *Generator

NewGenerator returns an intialized Generator sub state machine, which should be parented under a Scheduler root node.

func (Generator) RegenerateTasks

func (g Generator) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error)

func (Generator) SetState

func (g Generator) SetState(_ GeneratorMachineState)

func (Generator) State

func (g Generator) State() GeneratorMachineState

type GeneratorMachineState

type GeneratorMachineState int
const (
	// Unique identifier for the Generator sub state machine.
	GeneratorMachineType = "scheduler.Generator"

	// The Generator has only a single running state.
	GeneratorMachineStateRunning GeneratorMachineState = 0
)

type GeneratorTaskExecutorOptions

type GeneratorTaskExecutorOptions struct {
	fx.In

	Config         *Config
	MetricsHandler metrics.Handler
	BaseLogger     log.Logger
	SpecProcessor  SpecProcessor
}

type MockSpecProcessor

type MockSpecProcessor struct {
	// contains filtered or unexported fields
}

MockSpecProcessor is a mock of SpecProcessor interface.

func NewMockSpecProcessor

func NewMockSpecProcessor(ctrl *gomock.Controller) *MockSpecProcessor

NewMockSpecProcessor creates a new mock instance.

func (*MockSpecProcessor) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockSpecProcessor) ProcessTimeRange

func (m *MockSpecProcessor) ProcessTimeRange(scheduler Scheduler, start, end time.Time, manual bool, limit *int) (*ProcessedTimeRange, error)

ProcessTimeRange mocks base method.

type MockSpecProcessorMockRecorder

type MockSpecProcessorMockRecorder struct {
	// contains filtered or unexported fields
}

MockSpecProcessorMockRecorder is the mock recorder for MockSpecProcessor.

func (*MockSpecProcessorMockRecorder) ProcessTimeRange

func (mr *MockSpecProcessorMockRecorder) ProcessTimeRange(scheduler, start, end, manual, limit any) *gomock.Call

ProcessTimeRange indicates an expected call of ProcessTimeRange.

type ProcessedTimeRange

type ProcessedTimeRange struct {
	NextWakeupTime time.Time
	LastActionTime time.Time
	BufferedStarts []*schedulespb.BufferedStart
}

type Scheduler

type Scheduler struct {
	*schedulespb.SchedulerInternal
	// contains filtered or unexported fields
}

Scheduler is a top-level state machine compromised of 3 sub state machines: - Generator: buffers actions according to the schedule specification - Executor: executes buffered actions - Backfiller: buffers actions according to requested backfills

A running Scheduler will always have exactly one of each of the above sub state machines mounted as nodes within the HSM tree. The top-level machine itself remains in a singular running state for its lifetime (all work is done within the sub state machines). The Scheduler state machine is only responsible for creating the singleton sub state machines.

func NewScheduler

func NewScheduler(
	namespace, namespaceID, scheduleID string,
	sched *schedulepb.Schedule,
	patch *schedulepb.SchedulePatch,
) *Scheduler

NewScheduler returns an initialized Scheduler state machine (without any sub state machines).

func (Scheduler) RegenerateTasks

func (s Scheduler) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error)

func (Scheduler) SetState

func (s Scheduler) SetState(_ SchedulerMachineState)

func (Scheduler) State

func (s Scheduler) State() SchedulerMachineState

type SchedulerMachineState

type SchedulerMachineState int
const (
	// Unique identifier for top-level scheduler state machine.
	SchedulerMachineType = "scheduler.Scheduler"

	// The top-level scheduler only has a single, constant state.
	SchedulerMachineStateRunning SchedulerMachineState = 0
)

type SpecProcessor

type SpecProcessor interface {
	// ProcessTimeRange generates buffered actions according to the schedule spec for
	// the given time range.
	//
	// The parameter manual is propagated to the returned BufferedStarts. When the limit
	// is set to a non-nil pointer, it will be decremented for each buffered start, and
	// the function will return early should limit reach 0.
	ProcessTimeRange(
		scheduler Scheduler,
		start, end time.Time,
		manual bool,
		limit *int,
	) (*ProcessedTimeRange, error)
}

SpecProcessor is used by the Generator and Backfiller to generate buffered actions according to the schedule spec.

type SpecProcessorImpl

type SpecProcessorImpl struct {
	fx.In

	Config         *Config
	MetricsHandler metrics.Handler
	Logger         log.Logger
	SpecBuilder    *scheduler1.SpecBuilder
}

func (SpecProcessorImpl) ProcessTimeRange

func (s SpecProcessorImpl) ProcessTimeRange(
	scheduler Scheduler,
	start, end time.Time,
	manual bool,
	limit *int,
) (*ProcessedTimeRange, error)

type Tweakables

type Tweakables struct {
	DefaultCatchupWindow              time.Duration // Default for catchup window
	MinCatchupWindow                  time.Duration // Minimum for catchup window
	MaxBufferSize                     int           // MaxBufferSize limits the number of buffered actions pending execution in total
	CanceledTerminatedCountAsFailures bool          // Whether cancelled+terminated count for pause-on-failure
	RecentActionCount                 int           // How many recent actions are recorded in SchedulerInfo.

}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL