orch

package
v0.4.0
Warning

This package is not in the latest version of its module.

Published: Feb 22, 2021 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Package orch provides a library for orchestrating sessions on Kubernetes. Users should construct a Controller and queue sessions with its Schedule method. The Controller configures the internal types needed to communicate with Kubernetes, monitor the health of test components, and limit the number of running sessions.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func FindPools

func FindPools(nl NodeLister) (map[string]Pool, error)

FindPools uses a NodeLister to find all nodes, determine their pools and produce Pool instances with their capacities. It returns a map where the string is the name of the pool and the value is the Pool instance. An error is returned if the List operation on the NodeLister errors.

Types

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

Controller serves as the coordinator for orchestrating sessions. It manages active and idle sessions, as well as interactions with Kubernetes, through a set of internal types.

func NewController

func NewController(clientset kubernetes.Interface, store store.Store, options *ControllerOptions) (*Controller, error)

NewController creates a controller using a Kubernetes clientset, store and an optional ControllerOptions instance.

The clientset allows the controller to interact with Kubernetes. If nil, an error will be returned instead of a controller.

The store is used to report orchestration events, so progress can be reported to a user.

The options value allows the controller to be customized. Specifying nil will configure the controller to sane defaults. These defaults are described in the ControllerOptions documentation.

func (*Controller) Schedule

func (c *Controller) Schedule(s *types.Session) error

Schedule adds a session to the list of sessions waiting to run. An error is returned if the session is nil, or the controller was not started.

func (*Controller) Start

func (c *Controller) Start() error

Start prepares a controller for session scheduling. It creates or configures a set of threads and types to queue sessions, assign sessions to threads, and communicate with Kubernetes.

An error is returned if there are problems preparing a queue or setting up a watcher to monitor Kubernetes events.

func (*Controller) Stop

func (c *Controller) Stop(ctx context.Context) error

Stop attempts to terminate all orchestration threads spawned by a call to Start. It waits for a graceful shutdown until the context is cancelled.

If the context is cancelled before a graceful shutdown, an error is returned. This improper shutdown may result in unpredictable states. It should be avoided if possible.

If Start was not called prior to Stop, there will be no adverse effects and nil will be returned.
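The Stop contract described above (wait for a graceful shutdown, return an error if the context is cancelled first, return nil if nothing was started) can be sketched with a bounded context. This is a minimal illustration, not the package's implementation: runner and stopWithin are hypothetical names, and runner is a local stand-in for the Controller so that the example runs without a cluster. The same context.WithTimeout pattern should apply when calling Stop on a real Controller.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runner is a hypothetical stand-in for a type with the documented
// Start/Stop contract. It is not the package's Controller.
type runner struct {
	mu      sync.Mutex
	started bool
	quit    chan struct{} // closed to ask the worker to exit
	done    chan struct{} // closed by the worker once it has exited
}

// Start launches one worker goroutine. cleanupMillis simulates how long the
// worker needs to finish in-flight work after it is asked to stop.
func (r *runner) Start(cleanupMillis int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.started {
		return
	}
	r.started = true
	r.quit = make(chan struct{})
	r.done = make(chan struct{})
	quit, done := r.quit, r.done
	go func() {
		defer close(done)
		<-quit
		time.Sleep(time.Duration(cleanupMillis) * time.Millisecond)
	}()
}

// Stop signals the worker and waits for it to exit or for ctx to be
// cancelled, whichever happens first. It returns nil if Start was not called.
func (r *runner) Stop(ctx context.Context) error {
	r.mu.Lock()
	if !r.started {
		r.mu.Unlock()
		return nil
	}
	r.started = false
	close(r.quit)
	done := r.done
	r.mu.Unlock()

	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return fmt.Errorf("shutdown did not finish: %w", ctx.Err())
	}
}

// stopWithin calls Stop with a context that expires after timeoutMillis.
func stopWithin(r *runner, timeoutMillis int) error {
	timeout := time.Duration(timeoutMillis) * time.Millisecond
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return r.Stop(ctx)
}

func main() {
	r := &runner{}
	fmt.Println("stop before start:", stopWithin(r, 100))
	r.Start(5)
	fmt.Println("graceful stop:", stopWithin(r, 1000))
	r.Start(500)
	fmt.Println("deadline too short:", stopWithin(r, 10))
}
```

Choosing a timeout comfortably longer than the expected cleanup time keeps the caller out of the improper-shutdown case.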

func (*Controller) Stopped

func (c *Controller) Stopped() bool

Stopped returns true if the controller is not running. This indicates that either Start has not been invoked or Stop has been invoked.

type ControllerOptions

type ControllerOptions struct {
	// ExecutorCount specifies the maximum number of sessions that should be
	// processed at a time. It defaults to 1, disabling concurrent sessions.
	ExecutorCount int

	// WatcherOptions overrides the defaults of the watcher. The watcher
	// listens for Kubernetes events and reports the health of components
	// to the session's executor.
	WatcherOptions *WatcherOptions

	// TestTimeout is the maximum duration to wait for component containers
	// to provision and terminate with a successful exit code. If this
	// timeout is reached before an exit, the session will error.
	//
	// The zero value provides unlimited time to the test, so it should be
	// avoided in production.
	TestTimeout time.Duration
}

ControllerOptions overrides the defaults of the controller, allowing it to be configured as needed.
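The documented defaults (an ExecutorCount of 1 and an EventBufferSize of 32) can be sketched as a small helper. This is an illustration under stated assumptions: the structs are local mirrors of the documented fields, withDefaults is a hypothetical name that is not part of the package, and treating zero or negative values as "unset" is a guess about how the defaults are applied.

```go
package main

import (
	"fmt"
	"time"
)

// WatcherOptions is a local mirror of the documented struct.
type WatcherOptions struct {
	EventBufferSize int
}

// ControllerOptions is a local mirror of the documented struct.
type ControllerOptions struct {
	ExecutorCount  int
	WatcherOptions *WatcherOptions
	TestTimeout    time.Duration
}

// withDefaults is a hypothetical helper that returns a copy of in with the
// documented defaults filled in. A nil argument yields all defaults.
// Assumption: values of zero or less mean "use the default".
func withDefaults(in *ControllerOptions) ControllerOptions {
	out := ControllerOptions{}
	if in != nil {
		out = *in
	}
	if out.ExecutorCount <= 0 {
		out.ExecutorCount = 1 // documented default: no concurrent sessions
	}

	// Copy the watcher options so the caller's value is never mutated.
	wo := WatcherOptions{}
	if out.WatcherOptions != nil {
		wo = *out.WatcherOptions
	}
	if wo.EventBufferSize <= 0 {
		wo.EventBufferSize = 32 // documented default buffer size
	}
	out.WatcherOptions = &wo

	// TestTimeout is left untouched: the zero value means unlimited time.
	return out
}

func main() {
	opts := withDefaults(&ControllerOptions{TestTimeout: 10 * time.Minute})
	fmt.Println(opts.ExecutorCount, opts.WatcherOptions.EventBufferSize, opts.TestTimeout)
}
```

Because the zero TestTimeout is documented as unlimited, production callers would likely want to set it explicitly, as the usage in main does.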

type Executor

type Executor interface {
	// Execute runs a test session. It accepts a context that can prevent
	// problematic sessions from running indefinitely.
	//
	// An error is returned if there is a problem regarding the test itself.
	// This does not include internal errors that are not specific to the
	// test.
	Execute(context.Context, *types.Session) error
}

Executor can run a test session by provisioning its components, monitoring its health and cleaning up after its termination.

type Health

type Health int32

Health indicates the availability or readiness of an object or a set of objects.

const (
	// Unknown indicates that the state of the object could not be mapped to a standard health
	// value. The system should wait until another health value is set.
	Unknown Health = iota

	// Ready indicates that an object is healthy and available.
	Ready

	// NotReady indicates that something is not yet correct with an object's state, but this
	// may be recoverable.
	NotReady

	// Succeeded indicates that an object has terminated successfully.
	Succeeded

	// Failed indicates that an object has terminated due to a failure.
	Failed
)

func (Health) String

func (h Health) String() string

String returns the string representation of a health constant.
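As a sketch of how the constants behave, the block below re-declares Health locally with the same iota ordering. The strings returned by String here are assumptions, since the documentation does not list the exact text, and terminated is a hypothetical helper that is not part of the package.

```go
package main

import "fmt"

// Health is a local mirror of the documented type and constants.
type Health int32

const (
	Unknown Health = iota
	Ready
	NotReady
	Succeeded
	Failed
)

// String returns a name for each constant. The exact strings used by the
// package are not documented, so these values are assumed.
func (h Health) String() string {
	switch h {
	case Ready:
		return "Ready"
	case NotReady:
		return "NotReady"
	case Succeeded:
		return "Succeeded"
	case Failed:
		return "Failed"
	default:
		return "Unknown"
	}
}

// terminated is a hypothetical helper that reports whether an object has
// finished, successfully or not, and will not change health again.
func terminated(h Health) bool {
	return h == Succeeded || h == Failed
}

func main() {
	for _, h := range []Health{Unknown, Ready, NotReady, Succeeded, Failed} {
		fmt.Printf("%d %s terminated=%t\n", h, h, terminated(h))
	}
}
```

Because Unknown is the zero value, an uninitialized Health reads as Unknown rather than as a healthy or failed state.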

type NodeLister

type NodeLister interface {
	// List returns a list of Kubernetes nodes or an error.
	List(opts metav1.ListOptions) (*v1.NodeList, error)
}

NodeLister is any type that can list the nodes in a Kubernetes cluster. Most likely, this will be the Kubernetes client type that implements v1.NodeInterface.

type PodWatchEvent

type PodWatchEvent struct {
	// SessionName is the name of the session to which the pod belongs.
	SessionName string

	// ComponentName is the name of the component this pod represents.
	ComponentName string

	// Pod is the Kubernetes object itself.
	Pod *corev1.Pod

	// PodIP is the pod's IP if available. Otherwise, it is an empty string.
	PodIP string

	// Health is the current health of the pod.
	Health Health

	// Error may provide the error details that led to the failing health.
	Error error
}

PodWatchEvent is sent to a subscriber on a Watcher whenever there is a change.

type Pool

type Pool struct {
	// Name is an identifier that uniquely distinguishes a pool instance.
	Name string

	// Available is the number of machines that are idle and able to be reserved.
	Available int

	// Capacity is the total number of machines in the pool.
	Capacity int
}

Pool describes a cluster of identical machines.

type PoolAdder

type PoolAdder interface {
	// AddPool adds a pool.
	AddPool(pool Pool)

	// RemovePool removes the pool.
	RemovePool(pool Pool)
}

PoolAdder is a type that can add and remove pools.

type PoolAvailabilityError

type PoolAvailabilityError struct{}

PoolAvailabilityError indicates Reserve was called with a session that required a number of machines that were not currently available. These machines may become available at another time.

func (PoolAvailabilityError) Error

func (pae PoolAvailabilityError) Error() string

Error returns a string representation of the availability error message.

type PoolCapacityError

type PoolCapacityError struct {
	// Name is the string identifier for the pool.
	Name string

	// Requested is the number of machines that the session requires.
	Requested int

	// Capacity is the maximum number of machines that the pool can accommodate.
	Capacity int
}

PoolCapacityError indicates that a session requires a number of machines which is greater than the number of machines in the pool. The session can never be scheduled.

func (PoolCapacityError) Error

func (pce PoolCapacityError) Error() string

Error returns a string representation of the capacity error message.

type PoolUnknownError

type PoolUnknownError struct {
	// Name is the string identifier for an un-added pool.
	Name string
}

PoolUnknownError indicates that a session component required a pool that was never added to the ReservationManager.

func (PoolUnknownError) Error

func (pue PoolUnknownError) Error() string

Error returns a string representation of the unknown error message.
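The three pool errors call for different reactions: an availability error may clear up later, while capacity and unknown-pool errors will not. A minimal sketch of that distinction using errors.As follows. The types are local mirrors of the documented structs, the message texts are invented for illustration, and action is a hypothetical helper. The sketch also assumes the errors are returned as values; if the package returns pointers, the errors.As targets would need to be pointer types.

```go
package main

import (
	"errors"
	"fmt"
)

// Local mirrors of the documented error types. Message texts are assumed.
type PoolAvailabilityError struct{}

func (PoolAvailabilityError) Error() string {
	return "not enough machines are currently available"
}

type PoolCapacityError struct {
	Name      string
	Requested int
	Capacity  int
}

func (e PoolCapacityError) Error() string {
	return fmt.Sprintf("pool %q has capacity %d, but %d machines were requested",
		e.Name, e.Capacity, e.Requested)
}

type PoolUnknownError struct {
	Name string
}

func (e PoolUnknownError) Error() string {
	return fmt.Sprintf("pool %q was never added", e.Name)
}

// action is a hypothetical helper that maps a reservation error to what a
// scheduler might do next. errors.As also matches wrapped errors.
func action(err error) string {
	var avail PoolAvailabilityError
	var capErr PoolCapacityError
	var unknown PoolUnknownError
	switch {
	case err == nil:
		return "run"
	case errors.As(err, &avail):
		return "retry later" // machines may free up
	case errors.As(err, &capErr), errors.As(err, &unknown):
		return "reject" // the session can never be scheduled as specified
	default:
		return "report"
	}
}

func main() {
	wrapped := fmt.Errorf("scheduling session: %w", PoolAvailabilityError{})
	fmt.Println(action(wrapped))
	fmt.Println(action(PoolCapacityError{Name: "workers", Requested: 8, Capacity: 4}))
	fmt.Println(action(PoolUnknownError{Name: "gpu"}))
}
```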

type ReservationManager

type ReservationManager struct {
	// contains filtered or unexported fields
}

ReservationManager contains a set of pools and manages the availability of their machines. It is designed to help limit the number of running sessions. It is not thread-safe.

Instances should be created using the NewReservationManager constructor, not a literal.

func NewReservationManager

func NewReservationManager() *ReservationManager

NewReservationManager creates a new instance.

func (*ReservationManager) AddPool

func (rm *ReservationManager) AddPool(pool Pool)

AddPool adds a pool to the list of pools.

func (*ReservationManager) RemovePool

func (rm *ReservationManager) RemovePool(pool Pool) error

RemovePool removes a pool from the list of pools.

If the pool was never added, it returns a PoolUnknownError.

func (*ReservationManager) Reserve

func (rm *ReservationManager) Reserve(session *types.Session) error

Reserve decreases the number of available machines by the number of machines a session requires. If there are not enough machines available, it returns a PoolAvailabilityError.

If the number of machines required exceeds the capacity, Reserve returns a PoolCapacityError. It returns a PoolUnknownError if the session requires an unknown pool.

func (*ReservationManager) Unreserve

func (rm *ReservationManager) Unreserve(session *types.Session) error

Unreserve increases the number of available machines by the number of machines a session required. Essentially, it reverses the actions of the Reserve function.

This method does not ensure that Reserve has been called on the session. If the caller does not invoke Reserve first, the number of available machines may exceed the true capacity.

It returns a PoolUnknownError if the session requires an unknown pool.
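The accounting described for Reserve and Unreserve can be sketched with a small in-memory tracker. This is not the package's implementation. Several pieces are assumptions: a session's requirements are represented by a hypothetical map from pool name to machine count (the real types.Session is not shown here), sentinel errors stand in for the three documented error types, and checking every pool before changing any count is a design choice of the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// Pool is a local mirror of the documented struct.
type Pool struct {
	Name      string
	Available int
	Capacity  int
}

// Sentinel errors standing in for PoolUnknownError, PoolCapacityError and
// PoolAvailabilityError.
var (
	errUnknown     = errors.New("unknown pool")
	errCapacity    = errors.New("request exceeds pool capacity")
	errUnavailable = errors.New("machines not currently available")
)

// manager is a hypothetical, non-thread-safe tracker.
type manager struct {
	pools map[string]*Pool
}

func newManager(pools ...Pool) *manager {
	m := &manager{pools: make(map[string]*Pool)}
	for _, p := range pools {
		p := p // copy so each map entry points at its own Pool
		m.pools[p.Name] = &p
	}
	return m
}

// reserve validates every requested pool first, so a failure leaves all
// counts unchanged, then subtracts the requested machines.
func (m *manager) reserve(need map[string]int) error {
	for name, n := range need {
		p, ok := m.pools[name]
		switch {
		case !ok:
			return errUnknown
		case n > p.Capacity:
			return errCapacity
		case n > p.Available:
			return errUnavailable
		}
	}
	for name, n := range need {
		m.pools[name].Available -= n
	}
	return nil
}

// unreserve adds the machines back. Like the documented method, it does not
// check that reserve was called first.
func (m *manager) unreserve(need map[string]int) error {
	for name := range need {
		if _, ok := m.pools[name]; !ok {
			return errUnknown
		}
	}
	for name, n := range need {
		m.pools[name].Available += n
	}
	return nil
}

func main() {
	m := newManager(Pool{Name: "workers", Available: 2, Capacity: 2})
	need := map[string]int{"workers": 2}
	fmt.Println(m.reserve(need), m.pools["workers"].Available)
	fmt.Println(m.reserve(need)) // nothing left to reserve
	fmt.Println(m.unreserve(need), m.pools["workers"].Available)
	fmt.Println(m.unreserve(need), m.pools["workers"].Available) // now above capacity
}
```

The last call in main reproduces the documented caveat: an Unreserve without a matching Reserve pushes the available count past the pool's capacity.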

type ReservationTracker

type ReservationTracker interface {
	// Reserve decreases the number of available machines by the number of machines a session
	// requires. If there are not enough machines available, it returns a PoolAvailabilityError.
	//
	// If the number of machines required exceeds the capacity, Reserve returns a
	// PoolCapacityError. It returns a PoolUnknownError if the session requires an unknown pool.
	Reserve(session *types.Session) error

	// Unreserve increases the number of available machines by the number of machines a session
	// required. Essentially, it reverses the actions of the Reserve function.
	//
	// This method does not ensure that Reserve has been called on the session. If the caller
	// does not invoke Reserve first, the number of available machines may exceed the true
	// capacity.
	//
	// It returns a PoolUnknownError if the session requires an unknown pool.
	Unreserve(session *types.Session) error
}

ReservationTracker limits the number of running sessions by considering the number of machines that are available.

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

Watcher listens for changes to pods in a cluster and forwards these events through channels.

The orch package creates pods for components in a session, labeling each with their session and component names. This allows the Watcher to send events related to one session's pods through one channel and another session through another channel. The channel that receives events for a session is known as the subscriber. It is created and closed through the Subscribe and Unsubscribe methods.

Create watcher instances with the NewWatcher constructor, not a literal.

func NewWatcher

func NewWatcher(pw podWatcher, options *WatcherOptions) *Watcher

NewWatcher creates and prepares a new watcher instance.

func (*Watcher) Start

func (w *Watcher) Start() error

Start creates a new thread that listens for Kubernetes events, forwarding them to subscribers. It returns an error if there is a problem with Kubernetes.

func (*Watcher) Stop

func (w *Watcher) Stop()

Stop prevents additional events from being forwarded to subscribers.

func (*Watcher) Subscribe

func (w *Watcher) Subscribe(sessionName string) (<-chan *PodWatchEvent, error)

Subscribe accepts the name of a session and returns a channel or an error. The channel will receive all events for pods labeled with this session. If there is already a subscriber for the session, an error is returned.

func (*Watcher) Unsubscribe

func (w *Watcher) Unsubscribe(sessionName string) error

Unsubscribe accepts the name of a session and prevents the subscriber channel from receiving additional events. If the session has no subscribers, it returns an error.
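The one-subscriber-per-session rule can be sketched as a registry of buffered channels keyed by session name. This is an illustration only, not the Watcher's implementation. The names registry, event and publish are hypothetical, event is a trimmed stand-in for PodWatchEvent, and dropping an event when the buffer is full is a simplification chosen for the sketch; the package's behaviour in that case is not documented here.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a trimmed, hypothetical stand-in for PodWatchEvent.
type event struct {
	SessionName   string
	ComponentName string
}

// registry routes events to at most one subscriber channel per session.
type registry struct {
	mu         sync.Mutex
	bufferSize int
	subs       map[string]chan *event
}

func newRegistry(bufferSize int) *registry {
	return &registry{bufferSize: bufferSize, subs: make(map[string]chan *event)}
}

// Subscribe creates the channel for a session, or returns an error if the
// session already has a subscriber.
func (r *registry) Subscribe(session string) (<-chan *event, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.subs[session]; ok {
		return nil, fmt.Errorf("session %q already has a subscriber", session)
	}
	ch := make(chan *event, r.bufferSize)
	r.subs[session] = ch
	return ch, nil
}

// Unsubscribe closes and removes the session's channel, or returns an error
// if the session has no subscriber.
func (r *registry) Unsubscribe(session string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	ch, ok := r.subs[session]
	if !ok {
		return fmt.Errorf("session %q has no subscriber", session)
	}
	delete(r.subs, session)
	close(ch)
	return nil
}

// publish delivers e to its session's subscriber and reports whether it was
// delivered. Sending under the lock guarantees the channel is not closed
// mid-send. Assumption: a full buffer drops the event instead of blocking.
func (r *registry) publish(e *event) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	ch, ok := r.subs[e.SessionName]
	if !ok {
		return false
	}
	select {
	case ch <- e:
		return true
	default:
		return false
	}
}

func main() {
	r := newRegistry(32)
	ch, _ := r.Subscribe("session-a")
	r.publish(&event{SessionName: "session-a", ComponentName: "driver"})
	r.publish(&event{SessionName: "session-b", ComponentName: "server"}) // no subscriber
	_ = r.Unsubscribe("session-a")
	for e := range ch { // drains buffered events, then ends on close
		fmt.Println(e.SessionName, e.ComponentName)
	}
}
```

Closing the channel in Unsubscribe lets a reader that ranges over it finish once the buffered events are drained.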

type WatcherOptions

type WatcherOptions struct {
	// EventBufferSize specifies the size of the buffered channel for each
	// session. It allows the watcher to write additional Kubernetes events
	// without blocking for reads. It defaults to 32 events.
	EventBufferSize int
}

WatcherOptions overrides the defaults of the watcher, allowing it to be configured as needed.
