scheduler

package
v0.21.231 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2024 License: Apache-2.0, MIT Imports: 13 Imported by: 0

Documentation

Overview

Package scheduler provides a registry to be used as a postprocessor for the routes that use a LIFO filter.

Index

Constants

View Source
const (
	// LIFOKey used during routing to pass lifo values from the filters to the proxy.
	LIFOKey = "lifo"
	// FIFOKey used during routing to pass fifo values from the filters to the proxy.
	FIFOKey = "fifo"
)

Variables

View Source
var (
	ErrQueueFull      = errors.New("queue full")
	ErrQueueTimeout   = errors.New("queue timeout")
	ErrClientCanceled = errors.New("client canceled")
)

Functions

This section is empty.

Types

type Config

type Config struct {

	// MaxConcurrency defines how many jobs are allowed to run concurrently.
	// Defaults to 1.
	MaxConcurrency int

	// MaxStackSize defines how many jobs may be waiting in the stack.
	// Defaults to infinite.
	MaxQueueSize int

	// Timeout defines how long a job can be waiting in the stack.
	// Defaults to infinite.
	Timeout time.Duration

	// CloseTimeout sets a maximum duration for how long the queue can wait
	// for the active and queued jobs to finish. Defaults to infinite.
	CloseTimeout time.Duration
}

Config can be used to provide configuration of the registry.

type FIFOFilter added in v0.13.247

type FIFOFilter interface {

	// SetQueue will be used by the registry to pass in the right queue to
	// the filter.
	SetQueue(*FifoQueue)

	// GetQueue is currently used only by tests.
	GetQueue() *FifoQueue

	// Config will be called by the registry once during processing the
	// routing to get the right queue settings from the filter.
	Config() Config
}

FIFOFilter is the interface that needs to be implemented by the filters that use a FIFO queue maintained by the registry.

type FifoQueue added in v0.13.247

type FifoQueue struct {
	// contains filtered or unexported fields
}

FifoQueue objects implement a FIFO queue for handling requests, with a maximum allowed concurrency and queue size. Currently, they can be used from the fifo filters in the filters/scheduler package only.

func (*FifoQueue) Config added in v0.13.247

func (fq *FifoQueue) Config() Config

Config returns the configuration that the queue was created with.

func (*FifoQueue) Reconfigure added in v0.13.252

func (fq *FifoQueue) Reconfigure(c Config)

Reconfigure updates the connfiguration of the FifoQueue. It will reset the current state.

func (*FifoQueue) Status added in v0.13.247

func (fq *FifoQueue) Status() QueueStatus

Status returns the current status of a queue.

func (*FifoQueue) Wait added in v0.13.247

func (fq *FifoQueue) Wait(ctx context.Context) (func(), error)

Wait blocks until a request can be processed or needs to be rejected. It returns done() and an error. When it can be processed, calling done indicates that it has finished. It is mandatory to call done() the request was processed. When the request needs to be rejected, an error will be returned and done will be nil.

type GroupedLIFOFilter added in v0.10.282

type GroupedLIFOFilter interface {
	LIFOFilter

	// Group returns the name of the group.
	Group() string

	// HasConfig indicates that the current filter provides the queue
	// queue settings for the group.
	HasConfig() bool
}

GroupedLIFOFilter is an extension of the LIFOFilter interface for filters that use a shared queue.

type LIFOFilter

type LIFOFilter interface {

	// SetQueue will be used by the registry to pass in the right queue to
	// the filter.
	SetQueue(*Queue)

	// GetQueue is currently used only by tests.
	GetQueue() *Queue

	// Config will be called by the registry once during processing the
	// routing to get the right queue settings from the filter.
	Config() Config
}

LIFOFilter is the interface that needs to be implemented by the filters that use a LIFO queue maintained by the registry.

type Options added in v0.10.282

type Options struct {

	// MetricsUpdateTimeout defines the frequency of how often the
	// FIFO and LIFO metrics are updated when they are enabled.
	// Defaults to 1s.
	MetricsUpdateTimeout time.Duration

	// EnableRouteLIFOMetrics enables collecting metrics about the LIFO queues.
	EnableRouteLIFOMetrics bool

	// EnableRouteFIFOMetrics enables collecting metrics about the FIFO queues.
	EnableRouteFIFOMetrics bool

	// Metrics must be provided to the registry in order to collect the FIFO and LIFO metrics.
	Metrics metrics.Metrics
}

Options provides options for the registry.

type Queue added in v0.10.282

type Queue struct {
	// contains filtered or unexported fields
}

Queue objects implement a LIFO queue for handling requests, with a maximum allowed concurrency and queue size. Currently, they can be used from the lifo and lifoGroup filters in the filters/scheduler package only.

func (*Queue) Close added in v0.16.2

func (q *Queue) Close()

func (*Queue) Config added in v0.10.282

func (q *Queue) Config() Config

Config returns the configuration that the queue was created with.

func (*Queue) Status added in v0.10.282

func (q *Queue) Status() QueueStatus

Status returns the current status of a queue.

func (*Queue) Wait added in v0.10.282

func (q *Queue) Wait() (done func(), err error)

Wait blocks until a request can be processed or needs to be rejected. When it can be processed, calling done indicates that it has finished. It is mandatory to call done() the request was processed. When the request needs to be rejected, an error will be returned.

type QueueStatus added in v0.10.282

type QueueStatus struct {

	// ActiveRequests represents the number of the requests currently being handled.
	ActiveRequests int

	// QueuedRequests represents the number of requests waiting to be handled.
	QueuedRequests int

	// Closed indicates that the queue was closed.
	Closed bool
}

QueueStatus reports the current status of a queue. It can be used for metrics.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry maintains a set of LIFO queues. It is used to preserve LIFO queue instances across multiple generations of the routing. It implements the routing.PostProcessor interface, it is enough to just pass in to routing.Routing when initializing it.

When the EnableRouteLIFOMetrics is set, then the registry starts a background goroutine for regularly take snapshots of the active lifo queues and update the corresponding metrics. This goroutine is started when the first lifo filter is detected and returns when the registry is closed. Individual metrics objects (keys) are used for each lifo filter, and one for each lifo group defined by the lifoGroup filter.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a registry with the default options.

func RegistryWith added in v0.10.282

func RegistryWith(o Options) *Registry

RegistryWith (Options) creates a registry with the provided options.

func (*Registry) Close

func (r *Registry) Close()

Close closes the registry, including graceful tearing down the stored queues.

func (*Registry) Do

func (r *Registry) Do(routes []*routing.Route) []*routing.Route

Do implements routing.PostProcessor and sets the queue for the scheduler filters.

It preserves the existing queue when available.

func (*Registry) PreProcessor added in v0.13.179

func (r *Registry) PreProcessor() routing.PreProcessor

Returns routing.PreProcessor that ensures single lifo filter instance per route

Registry can not implement routing.PreProcessor directly due to unfortunate method name clash with routing.PostProcessor

func (*Registry) UpdateMetrics added in v0.13.181

func (r *Registry) UpdateMetrics()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL