agent

package
v0.3.697 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 24, 2019 | License: Apache-2.0 | Imports: 56 | Imported by: 13

Documentation

Overview

Package agent defines the Agent interface and related concepts. An agent is an entity that knows how to execute an Fn function.

The Agent Interface

The Agent interface is the heart of this package. Agent exposes an API to create calls from various parameters and then execute those calls. An Agent has a few roles:

  • manage the memory pool for a given server
  • manage the container lifecycle for calls
  • execute calls against containers
  • invoke Start and End for each call appropriately

For more information about how an agent executes a call see the documentation on the Agent interface.

Variants

There are two flavors of agent: the local Docker agent and a load-balancing agent. To create an agent that uses Docker containers to execute calls, use New().

To create an agent that can load-balance across a pool of sub-agents, use NewLBAgent().


Constants

const (
	// EnvContainerLabelTag is a classifier label tag that is used to distinguish fn managed containers
	EnvContainerLabelTag = "FN_CONTAINER_LABEL_TAG"
	// EnvImageCleanMaxSize enables image cleaner and sets the high water mark for image cache in bytes
	EnvImageCleanMaxSize = "FN_IMAGE_CLEAN_MAX_SIZE"
	// EnvImageCleanExemptTags is a list of image names separated by whitespace that are exempt from removal in image cleaner
	EnvImageCleanExemptTags = "FN_IMAGE_CLEAN_EXEMPT_TAGS"
	// EnvImageEnableVolume allows image to contain VOLUME definitions
	EnvImageEnableVolume = "FN_IMAGE_ENABLE_VOLUME"
	// EnvDockerNetworks is a comma separated list of networks to attach to each container started
	EnvDockerNetworks = "FN_DOCKER_NETWORKS"
	// EnvDockerLoadFile is a file location for a file that contains a tarball of a docker image to load on startup
	EnvDockerLoadFile = "FN_DOCKER_LOAD_FILE"
	// EnvFreezeIdle is the delay between a container being last used and being frozen
	EnvFreezeIdle = "FN_FREEZE_IDLE_MSECS"
	// EnvHotPoll is the interval to ping for a slot manager thread to check if a container should be
	// launched for a given function
	EnvHotPoll = "FN_HOT_POLL_MSECS"
	// EnvHotLauncherTimeout is the timeout for a hot container queue to persist if idle
	EnvHotLauncherTimeout = "FN_HOT_LAUNCHER_TIMEOUT_MSECS"
	// EnvHotPullTimeout is the timeout for a hot container to be created including docker-pull
	EnvHotPullTimeout = "FN_HOT_PULL_TIMEOUT_MSECS"
	// EnvHotStartTimeout is the timeout for a hot container to become available for use for requests after EnvHotPullTimeout
	EnvHotStartTimeout = "FN_HOT_START_TIMEOUT_MSECS"
	// EnvMaxResponseSize is the maximum number of bytes that a function may return from an invocation
	EnvMaxResponseSize = "FN_MAX_RESPONSE_SIZE"
	// EnvMaxHdrResponseSize is the maximum number of bytes that a function may return in an invocation header
	EnvMaxHdrResponseSize = "FN_MAX_HDR_RESPONSE_SIZE"
	// EnvMaxLogSize is the maximum size that a function's log may reach
	EnvMaxLogSize = "FN_MAX_LOG_SIZE_BYTES"
	// EnvMaxTotalCPU is the maximum CPU that will be reserved across all containers
	EnvMaxTotalCPU = "FN_MAX_TOTAL_CPU_MCPUS"
	// EnvMaxTotalMemory is the maximum memory that will be reserved across all containers
	EnvMaxTotalMemory = "FN_MAX_TOTAL_MEMORY_BYTES"
	// EnvMaxFsSize is the maximum filesystem size that a function may use
	EnvMaxFsSize = "FN_MAX_FS_SIZE_MB"
	// EnvMaxPIDs is the maximum number of PIDs that a function is allowed to create
	EnvMaxPIDs = "FN_MAX_PIDS"
	// EnvPreForkPoolSize is the number of containers pooled to steal network from, this may reduce latency
	EnvPreForkPoolSize = "FN_EXPERIMENTAL_PREFORK_POOL_SIZE"
	// EnvPreForkImage is the image to use for the pre-fork pool
	EnvPreForkImage = "FN_EXPERIMENTAL_PREFORK_IMAGE"
	// EnvPreForkCmd is the command to run for images in the pre-fork pool, it should run for a long time
	EnvPreForkCmd = "FN_EXPERIMENTAL_PREFORK_CMD"
	// EnvPreForkUseOnce limits the number of times a pre-fork pool container may be used to one, they are otherwise recycled
	EnvPreForkUseOnce = "FN_EXPERIMENTAL_PREFORK_USE_ONCE"
	// EnvPreForkNetworks is the equivalent of EnvDockerNetworks but for pre-fork pool containers
	EnvPreForkNetworks = "FN_EXPERIMENTAL_PREFORK_NETWORKS"
	// EnvEnableNBResourceTracker makes every request to the resource tracker non-blocking, meaning the resources are either
	// available or it will return an error immediately
	EnvEnableNBResourceTracker = "FN_ENABLE_NB_RESOURCE_TRACKER"
	// EnvMaxTmpFsInodes is the maximum number of inodes for /tmp in a container
	EnvMaxTmpFsInodes = "FN_MAX_TMPFS_INODES"
	// EnvDisableReadOnlyRootFs makes the root fs for a container have rw permissions, by default it is read only
	EnvDisableReadOnlyRootFs = "FN_DISABLE_READONLY_ROOTFS"
	// EnvDisableDebugUserLogs disables user function logs being logged at level debug. wise to enable for production.
	EnvDisableDebugUserLogs = "FN_DISABLE_DEBUG_USER_LOGS"

	// EnvIOFSEnableTmpfs enables creating a per-container tmpfs mount for the IOFS
	EnvIOFSEnableTmpfs = "FN_IOFS_TMPFS"
	// EnvIOFSPath is the path within fn server container of a directory to configure for unix socket files for each container
	EnvIOFSPath = "FN_IOFS_PATH"
	// EnvIOFSDockerPath determines the relative location on the docker host where iofs mounts should be prefixed with
	EnvIOFSDockerPath = "FN_IOFS_DOCKER_PATH"
	// EnvIOFSOpts are the options to set when mounting the iofs directory for unix socket files
	EnvIOFSOpts = "FN_IOFS_OPTS"

	// EnvDetachedHeadroom is the extra room we want to give to a detached function to run.
	EnvDetachedHeadroom = "FN_EXECUTION_HEADROOM"

	// MaxMsDisabled is used to determine whether mr freeze is lying in wait. TODO remove this maneuver
	MaxMsDisabled = time.Duration(math.MaxInt64)

	// DefaultHotPoll is the default value for EnvHotPoll
	DefaultHotPoll = 200 * time.Millisecond
)
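
The *_MSECS variables above carry durations as integer milliseconds. How NewConfig parses them is not shown in this documentation, so the following is only a minimal sketch of one way to read such a variable with a fallback to DefaultHotPoll. The helper msecsFromEnv is hypothetical and not part of this package.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// defaultHotPoll mirrors DefaultHotPoll from the const block above.
const defaultHotPoll = 200 * time.Millisecond

// msecsFromEnv is a hypothetical helper (not part of package agent). It reads
// an integer number of milliseconds from the named variable and falls back to
// def when the variable is unset or empty.
func msecsFromEnv(name string, def time.Duration) (time.Duration, error) {
	raw, ok := os.LookupEnv(name)
	if !ok || raw == "" {
		return def, nil
	}
	n, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("%s: %q is not an integer: %w", name, raw, err)
	}
	return time.Duration(n) * time.Millisecond, nil
}

func main() {
	os.Setenv("FN_HOT_POLL_MSECS", "350")
	d, err := msecsFromEnv("FN_HOT_POLL_MSECS", defaultHotPoll)
	fmt.Println(d, err) // 350ms <nil>
}
```

Returning an error for a malformed value, rather than silently using the default, is a design choice of this sketch.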
const (
	// Here we give 5 seconds of timeout inside the container. We hardcode these numbers here to
	// ensure we control idle timeout & timeout as well as how long should cache be valid.
	// A cache duration of idleTimeout + 500 msecs allows us to reuse the cache, for about 1.5 secs,
	// and during this time, since we allow no queries to go through, the hot container times out.
	//
	// For now, status tests a single case: a new hot container is spawned when cache is expired
	// and when a query is allowed to run.
	// TODO: we might want to mix this up and perhaps allow that hot container to handle
	// more than one query to test both 'new hot container' and 'old hot container' cases.
	StatusCallTimeout       = int32(5)
	StatusCallIdleTimeout   = int32(1)
	StatusCallCacheDuration = time.Duration(500)*time.Millisecond + time.Duration(StatusCallIdleTimeout)*time.Second

	// Total context timeout (scheduler+execution.) We need to allocate plenty of time here.
	// 60 seconds should be enough to provoke disk I/O errors, docker timeouts. etc.
	StatusCtxTimeout = time.Duration(60 * time.Second)
)
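
The cache duration in the comment above ("about 1.5 secs") follows directly from the constant expression. The short program below copies the two relevant constants verbatim and prints the result, as a quick check of the arithmetic.

```go
package main

import (
	"fmt"
	"time"
)

// Values copied from the const block above.
const (
	StatusCallIdleTimeout   = int32(1)
	StatusCallCacheDuration = time.Duration(500)*time.Millisecond + time.Duration(StatusCallIdleTimeout)*time.Second
)

func main() {
	// 500ms + 1s
	fmt.Println(StatusCallCacheDuration) // 1.5s
}
```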
const (
	Mem1MB = 1024 * 1024
	Mem1GB = 1024 * 1024 * 1024

	// Assume 2GB RAM on non-linux systems
	DefaultNonLinuxMemory = 2048 * Mem1MB
)
const (
	// max buffer size for grpc data messages, 10K
	MaxDataChunk = 10 * 1024
)
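
MaxDataChunk caps the size of a single gRPC data message. This documentation does not show how the package splits payloads, so the following is only a sketch of the general idea. The helper splitChunks is hypothetical.

```go
package main

import "fmt"

// MaxDataChunk is copied from the const block above.
const MaxDataChunk = 10 * 1024

// splitChunks is a hypothetical helper (not part of package agent). It cuts
// data into consecutive sub-slices of at most size bytes each, without copying.
func splitChunks(data []byte, size int) [][]byte {
	if size <= 0 {
		return nil
	}
	var chunks [][]byte
	for len(data) > size {
		chunks = append(chunks, data[:size])
		data = data[size:]
	}
	if len(data) > 0 {
		chunks = append(chunks, data)
	}
	return chunks
}

func main() {
	payload := make([]byte, 25*1024)
	for i, c := range splitChunks(payload, MaxDataChunk) {
		fmt.Printf("chunk %d: %d bytes\n", i, len(c))
	}
}
```

A 25 KiB payload yields two full 10240-byte chunks and one 5120-byte remainder.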
const RegistryToken = "FN_REGISTRY_TOKEN"

RegistryToken is a reserved call extensions key to pass registry token


Variables

var (
	ErrorExpectedTry  = errors.New("Protocol failure: expected ClientMsg_Try")
	ErrorExpectedData = errors.New("Protocol failure: expected ClientMsg_Data")
)
var (
	ErrorRunnerClosed    = errors.New("Runner is closed")
	ErrorPureRunnerNoEOF = errors.New("Purerunner missing EOF response")
)
var (

	// AppIDMetricKey is a tag for metrics
	AppIDMetricKey = common.MakeKey("app_id")
	// FnIDMetricKey is a tag for metrics
	FnIDMetricKey = common.MakeKey("fn_id")
	// ImageNameMetricKey is a tag for metrics
	ImageNameMetricKey = common.MakeKey("image_name")
)
var CapacityFull = errors.New("max capacity reached")

Functions

func DefaultStaticRunnerPool

func DefaultStaticRunnerPool(runnerAddresses []string) pool.RunnerPool

func GetCallLatencies added in v0.3.646

func GetCallLatencies(c *call) (time.Duration, time.Duration)

func NewCallHandle

func NewCallHandle(engagement runner.RunnerProtocol_EngageServer) *callHandle

func NewDockerDriver

func NewDockerDriver(cfg *Config) (drivers.Driver, error)

NewDockerDriver creates a default docker driver from agent config

func NewSlotQueue

func NewSlotQueue(key string) *slotQueue

func NewSlotQueueMgr

func NewSlotQueueMgr() *slotQueueMgr

func NewStaticRunnerPool

func NewStaticRunnerPool(runnerAddresses []string, tlsConf *tls.Config, dialOpts ...grpc.DialOption) pool.RunnerPool

func NewgRPCRunner added in v0.3.637

func NewgRPCRunner(addr string, tlsConf *tls.Config, dialOpts ...grpc.DialOption) (pool.Runner, error)

func RegisterAgentViews

func RegisterAgentViews(tagKeys []string, latencyDist []float64)

RegisterAgentViews creates and registers all agent views

func RegisterContainerViews

func RegisterContainerViews(tagKeys []string, latencyDist []float64)

RegisterContainerViews creates and registers container views with provided tag keys

func RegisterDockerViews

func RegisterDockerViews(tagKeys []string, latencyDist, ioNetDist, ioDiskDist, memoryDist, cpuDist []float64)

RegisterDockerViews creates and registers Docker views with provided tag keys

func RegisterLBAgentViews

func RegisterLBAgentViews(tagKeys []string, latencyDist []float64)

func RegisterRunnerViews added in v0.3.639

func RegisterRunnerViews(tagKeys []string, latencyDist []float64)

RegisterRunnerViews creates and registers all runner views

func TranslateGRPCStatusToRunnerStatus

func TranslateGRPCStatusToRunnerStatus(status *pb.RunnerStatus) *pool.RunnerStatus

TranslateGRPCStatusToRunnerStatus translates a runner.RunnerStatus to a runnerpool.RunnerStatus

Types

type Agent

type Agent interface {
	// GetCall will return a Call that is executable by the Agent, which
	// can be built via various CallOpt's provided to the method.
	GetCall(...CallOpt) (Call, error)

	// Submit will attempt to execute a call locally, a Call may store information
	// about itself in its Start and End methods, which will be called in Submit
	// immediately before and after the Call is executed, respectively. An error
	// will be returned if there is an issue executing the call or the error
	// may be from the call's execution itself (if, say, the container dies,
	// or the call times out).
	Submit(Call) error

	// Close will wait for any outstanding calls to complete and then exit.
	// Closing the agent will invoke Close on the underlying DataAccess.
	// Close is not safe to be called from multiple threads.
	io.Closer

	AddCallListener(fnext.CallListener)
}

Agent exposes an API to create calls from various parameters and then submit those calls. It also exposes a 'safe' shutdown mechanism via its Close method. Agent has a few roles:

  • manage the memory pool for a given server
  • manage the container lifecycle for calls
  • execute calls against containers
  • invoke Start and End for each call appropriately

Overview: Upon submission of a call, Agent starts the call's timeout timer immediately. If the call is hot, Agent attempts to find an active hot container for that route and, if necessary, launches another container. Calls can read from and write to a socket in the container directly. If a container must be launched, Agent first tries to reserve the required RAM while waiting for any hot 'slot' to become available [if applicable]. If there is an error launching the container, an error is returned provided the call has not yet timed out or found another hot 'slot' to execute in [if applicable]. call.Start is called immediately before any input is sent to a container. call.End is called regardless of the timeout timer's status if the call was executed, and the error it returns may be returned from Submit.
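
The ordering of Start, execution, and End described above can be modelled with a small self-contained sketch. Everything here is a local stand-in: lifecycle is a trimmed copy of the Call interface, submit is a hypothetical simplification of Agent.Submit, and exec stands in for running the call in a container. Skipping End when Start fails is an assumption of the sketch, since the documentation does not state what happens in that case.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// lifecycle is a trimmed-down stand-in for the Call interface: only the
// Start and End hooks are modelled.
type lifecycle interface {
	Start(ctx context.Context) error
	End(ctx context.Context, err error) error
}

// submit is a hypothetical, simplified model of the documented ordering.
func submit(ctx context.Context, c lifecycle, exec func(context.Context) error) error {
	if err := c.Start(ctx); err != nil {
		return err // assumption: nothing was executed, so End is skipped
	}
	execErr := exec(ctx)
	// End sees the execution error (nil on success), and whatever End
	// returns becomes the result of submit.
	return c.End(ctx, execErr)
}

// recorder logs the order in which its hooks run and passes errors through.
type recorder struct{ events []string }

func (r *recorder) Start(ctx context.Context) error {
	r.events = append(r.events, "start")
	return nil
}

func (r *recorder) End(ctx context.Context, err error) error {
	r.events = append(r.events, "end")
	return err
}

// demo runs one failing execution and reports the hook order and final error.
func demo() ([]string, error) {
	r := &recorder{}
	err := submit(context.Background(), r, func(context.Context) error {
		r.events = append(r.events, "exec")
		return errors.New("container died")
	})
	return r.events, err
}

func main() {
	fmt.Println(demo()) // [start exec end] container died
}
```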

func DefaultPureRunner

func DefaultPureRunner(cancel context.CancelFunc, addr string, tlsCfg *tls.Config) (Agent, error)

func New

func New(options ...Option) Agent

New creates an Agent that executes functions locally as Docker containers.

func NewLBAgent

func NewLBAgent(rp pool.RunnerPool, p pool.Placer, options ...LBAgentOption) (Agent, error)

NewLBAgent creates an Agent that knows how to load-balance function calls across a group of runner nodes.

func NewPureRunner

func NewPureRunner(cancel context.CancelFunc, addr string, options ...PureRunnerOption) (Agent, error)

type Call

type Call interface {
	// Model will return the underlying models.Call configuration for this call.
	Model() *models.Call

	// Start will be called before this call is executed, it may be used to
	// guarantee mutual exclusion, check docker permissions, update timestamps,
	// etc.
	// TODO Start and End can likely be unexported as they are only used in the agent,
	// and on a type which is constructed in a specific agent. meh.
	Start(ctx context.Context) error

	// End will be called immediately after attempting a call execution,
	// regardless of whether the execution failed or not. An error will be passed
	// to End, which if nil indicates a successful execution. Any error returned
	// from End will be returned as the error from Submit.
	End(ctx context.Context, err error) error
}

Call is an agent specific instance of a call object, that is runnable.

type CallOpt

type CallOpt func(c *call) error

CallOpt allows configuring a call before execution. TODO(reed): consider the interface here, all options must be defined in agent and flexible enough for usage by extenders of fn, this straddling is painful. consider models.Call?
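
CallOpt follows the functional-options pattern: each option is a function that mutates the call and may return an error. Because the real call type is unexported, the sketch below uses local stand-ins. The names call, callOpt, markDetached, withExtension, and buildCall are all hypothetical, and stopping at the first failing option is an assumption of the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// call is a local stand-in for the package's unexported call type.
type call struct {
	detached   bool
	extensions map[string]string
}

// callOpt has the same shape as CallOpt: it mutates a call and may reject
// the configuration by returning an error.
type callOpt func(c *call) error

// markDetached is a hypothetical option, loosely modelled on InvokeDetached.
func markDetached() callOpt {
	return func(c *call) error {
		c.detached = true
		return nil
	}
}

// withExtension is a hypothetical option that sets a single extension key.
func withExtension(key, value string) callOpt {
	return func(c *call) error {
		if key == "" {
			return errors.New("extension key must not be empty")
		}
		if c.extensions == nil {
			c.extensions = make(map[string]string)
		}
		c.extensions[key] = value
		return nil
	}
}

// buildCall applies options in order and stops at the first error
// (an assumption of this sketch).
func buildCall(opts ...callOpt) (*call, error) {
	c := &call{}
	for _, opt := range opts {
		if err := opt(c); err != nil {
			return nil, err
		}
	}
	return c, nil
}

func main() {
	c, err := buildCall(markDetached(), withExtension("tenant", "a"), withExtension("tenant", "b"))
	fmt.Println(c.detached, c.extensions["tenant"], err) // true b <nil>
}
```

Since options run in order, a later option can override what an earlier one set, which is why the order in which options are applied matters.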

func FromHTTPFnRequest

func FromHTTPFnRequest(app *models.App, fn *models.Fn, req *http.Request) CallOpt

FromHTTPFnRequest sets up a call from an http trigger request

func FromModel

func FromModel(mCall *models.Call) CallOpt

FromModel creates a call object from an existing stored call model object, reading the body from the stored call payload

func FromModelAndInput

func FromModelAndInput(mCall *models.Call, in io.ReadCloser) CallOpt

FromModelAndInput creates a call object from an existing stored call model object, reading the body from a provided stream

func InvokeDetached

func InvokeDetached() CallOpt

InvokeDetached marks a call as a detached call

func WithContext

func WithContext(ctx context.Context) CallOpt

WithContext overrides the context on the call

func WithDockerAuth added in v0.3.662

func WithDockerAuth(auth docker.Auther) CallOpt

WithDockerAuth configures a call to retrieve credentials for an image pull

func WithExtensions

func WithExtensions(extensions map[string]string) CallOpt

WithExtensions adds internal attributes to the call that can be interpreted by extensions in the agent. Pure runner can use this to pass an extension to the call

func WithLogger added in v0.3.618

func WithLogger(w io.ReadWriteCloser) CallOpt

WithLogger sets stderr to the provided one

func WithTrigger

func WithTrigger(t *models.Trigger) CallOpt

WithTrigger adds trigger specific bits to a call. TODO consider removal, this is from a shuffle

func WithWriter

func WithWriter(w io.Writer) CallOpt

WithWriter sets the writer that the call uses to send its output message to. TODO this should be required

type CallOverrider

type CallOverrider func(*http.Request, *models.Call, map[string]string) (map[string]string, error)

CallOverrider should die. Interceptor in GetCall

type Config

type Config struct {
	MinDockerVersion        string        `json:"min_docker_version"`
	ContainerLabelTag       string        `json:"container_label_tag"`
	DockerNetworks          string        `json:"docker_networks"`
	DockerLoadFile          string        `json:"docker_load_file"`
	FreezeIdle              time.Duration `json:"freeze_idle_msecs"`
	HotPoll                 time.Duration `json:"hot_poll_msecs"`
	HotLauncherTimeout      time.Duration `json:"hot_launcher_timeout_msecs"`
	HotPullTimeout          time.Duration `json:"hot_pull_timeout_msecs"`
	HotStartTimeout         time.Duration `json:"hot_start_timeout_msecs"`
	DetachedHeadRoom        time.Duration `json:"detached_head_room_msecs"`
	MaxResponseSize         uint64        `json:"max_response_size_bytes"`
	MaxHdrResponseSize      uint64        `json:"max_hdr_response_size_bytes"`
	MaxLogSize              uint64        `json:"max_log_size_bytes"`
	MaxTotalCPU             uint64        `json:"max_total_cpu_mcpus"`
	MaxTotalMemory          uint64        `json:"max_total_memory_bytes"`
	MaxFsSize               uint64        `json:"max_fs_size_mb"`
	MaxPIDs                 uint64        `json:"max_pids"`
	PreForkPoolSize         uint64        `json:"pre_fork_pool_size"`
	PreForkImage            string        `json:"pre_fork_image"`
	PreForkCmd              string        `json:"pre_fork_pool_cmd"`
	PreForkUseOnce          uint64        `json:"pre_fork_use_once"`
	PreForkNetworks         string        `json:"pre_fork_networks"`
	EnableNBResourceTracker bool          `json:"enable_nb_resource_tracker"`
	MaxTmpFsInodes          uint64        `json:"max_tmpfs_inodes"`
	DisableReadOnlyRootFs   bool          `json:"disable_readonly_rootfs"`
	DisableDebugUserLogs    bool          `json:"disable_debug_user_logs"`
	IOFSEnableTmpfs         bool          `json:"iofs_enable_tmpfs"`
	IOFSAgentPath           string        `json:"iofs_path"`
	IOFSMountRoot           string        `json:"iofs_mount_root"`
	IOFSOpts                string        `json:"iofs_opts"`
	ImageCleanMaxSize       uint64        `json:"image_clean_max_size"`
	ImageCleanExemptTags    string        `json:"image_clean_exempt_tags"`
	ImageEnableVolume       bool          `json:"image_enable_volume"`
}

Config specifies various settings for an agent

func NewConfig

func NewConfig() (*Config, error)

NewConfig returns a config set from env vars, plus defaults

type ContainerState

type ContainerState interface {
	UpdateState(ctx context.Context, newState ContainerStateType, call *call)
	GetState() string
}

func NewContainerState

func NewContainerState() ContainerState

type ContainerStateType

type ContainerStateType int
const (
	ContainerStateNone   ContainerStateType = iota // uninitialized
	ContainerStateWait                             // resource (cpu + mem) waiting
	ContainerStateStart                            // launching
	ContainerStateIdle                             // running: idle but not paused
	ContainerStatePaused                           // running: idle but paused
	ContainerStateBusy                             // running: busy
	ContainerStateDone                             // exited/failed/done
	ContainerStateMax
)
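
ContainerStateMax is a sentinel marking the end of the enumeration, which makes it convenient for sizing lookup tables. The sketch below copies the constants and adds a String method. The state names are illustrative assumptions and are not necessarily the strings returned by ContainerState.GetState.

```go
package main

import "fmt"

type ContainerStateType int

// Constants copied from the declaration above.
const (
	ContainerStateNone   ContainerStateType = iota // uninitialized
	ContainerStateWait                             // resource (cpu + mem) waiting
	ContainerStateStart                            // launching
	ContainerStateIdle                             // running: idle but not paused
	ContainerStatePaused                           // running: idle but paused
	ContainerStateBusy                             // running: busy
	ContainerStateDone                             // exited/failed/done
	ContainerStateMax
)

// stateNames is a hypothetical table sized by the sentinel, so adding a state
// without naming it leaves an empty string that is easy to catch in a test.
var stateNames = [ContainerStateMax]string{
	"none", "wait", "start", "idle", "paused", "busy", "done",
}

// String returns the illustrative name, or "unknown" for out-of-range values.
func (s ContainerStateType) String() string {
	if s < 0 || s >= ContainerStateMax {
		return "unknown"
	}
	return stateNames[s]
}

func main() {
	fmt.Println(ContainerStateBusy, ContainerStateMax) // busy unknown
}
```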

type DataAccess

type DataAccess interface {
	ReadDataAccess
}

XXX(reed): replace all uses of ReadDataAccess with DataAccess or vice versa, whatever is easier

type DetachedResponseWriter

type DetachedResponseWriter struct {
	Headers http.Header
	// contains filtered or unexported fields
}

func NewDetachedResponseWriter

func NewDetachedResponseWriter(h http.Header, statusCode int) *DetachedResponseWriter

func (*DetachedResponseWriter) Header

func (w *DetachedResponseWriter) Header() http.Header

func (*DetachedResponseWriter) Status

func (w *DetachedResponseWriter) Status() int

func (*DetachedResponseWriter) Write

func (w *DetachedResponseWriter) Write(data []byte) (int, error)

func (*DetachedResponseWriter) WriteHeader

func (w *DetachedResponseWriter) WriteHeader(statusCode int)

type EvictToken

type EvictToken struct {
	C        chan struct{}
	DoneChan chan struct{}
	// contains filtered or unexported fields
}

func (*EvictToken) SetEvictable

func (token *EvictToken) SetEvictable(isEvictable bool)

type Evictor

type Evictor interface {
	// CreateEvictToken creates an eviction token to be used in evictor tracking. Returns
	// an eviction token.
	CreateEvictToken(slotId string, mem, cpu uint64) *EvictToken

	// DeleteEvictToken deletes an eviction token from evictor system
	DeleteEvictToken(token *EvictToken)

	// PerformEviction performs evictions to satisfy cpu & mem arguments
	// and returns a slice of channels for evictions performed. The callers
	// can wait on these channels to ensure evictions are completed.
	PerformEviction(slotId string, mem, cpu uint64) []chan struct{}
}
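
PerformEviction hands back one channel per eviction, and callers wait on them. The documentation does not say whether a channel is closed or receives a value on completion. A plain receive handles both cases, so the sketch below relies only on that. The helper waitEvictions and its timeout are hypothetical additions, not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// waitEvictions is a hypothetical helper. It blocks until every channel has
// been closed or has delivered a value, and reports false if the timeout
// elapses first.
func waitEvictions(chans []chan struct{}, timeout time.Duration) bool {
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()
	for _, ch := range chans {
		select {
		case <-ch:
			// this eviction has completed
		case <-deadline.C:
			return false
		}
	}
	return true
}

// demo simulates two evictions that finish quickly and one that never does.
func demo() (completed bool, timedOut bool) {
	done := []chan struct{}{make(chan struct{}), make(chan struct{})}
	for _, ch := range done {
		go func(c chan struct{}) {
			time.Sleep(10 * time.Millisecond)
			close(c)
		}(ch)
	}
	completed = waitEvictions(done, time.Second)

	stuck := []chan struct{}{make(chan struct{})}
	timedOut = !waitEvictions(stuck, 20*time.Millisecond)
	return completed, timedOut
}

func main() {
	fmt.Println(demo()) // true true
}
```

A single shared timer bounds the total wait across all channels rather than the wait per channel.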

func NewEvictor

func NewEvictor() Evictor

type LBAgentOption

type LBAgentOption func(*lbAgent) error

func WithLBAgentConfig

func WithLBAgentConfig(cfg *Config) LBAgentOption

WithLBAgentConfig sets the agent config to the provided Config

func WithLBCallOptions added in v0.3.668

func WithLBCallOptions(opts ...CallOpt) LBAgentOption

WithLBCallOptions adds additional call options to each call created from GetCall. These options are executed after any other options supplied to GetCall

func WithLBCallOverrider

func WithLBCallOverrider(fn CallOverrider) LBAgentOption

WithLBCallOverrider is for LB agents to register a CallOverrider to modify a Call and extensions

type LogStreamer added in v0.3.676

type LogStreamer interface {
	StreamLogs(runner.RunnerProtocol_StreamLogsServer) error
}

LogStreamer manages the log gRPC interface

type Option

type Option func(*agent) error

Option configures an agent at startup

func WithCallOptions added in v0.3.662

func WithCallOptions(opts ...CallOpt) Option

WithCallOptions adds additional call options to each call created from GetCall. These options are executed after any other options supplied to GetCall

func WithCallOverrider

func WithCallOverrider(fn CallOverrider) Option

WithCallOverrider registers a CallOverrider to modify a Call and extensions on call construction

func WithConfig

func WithConfig(cfg *Config) Option

WithConfig sets the agent config to the provided config

func WithDockerDriver

func WithDockerDriver(drv drivers.Driver) Option

WithDockerDriver provides a custom driver to the agent

type PureRunnerOption

type PureRunnerOption func(*pureRunner) error

func PureRunnerWithAgent

func PureRunnerWithAgent(a Agent) PureRunnerOption

func PureRunnerWithConfigFunc added in v0.3.685

func PureRunnerWithConfigFunc(configFunc func(context.Context, *runner.ConfigMsg) (*runner.ConfigStatus, error)) PureRunnerOption

func PureRunnerWithCustomHealthCheckerFunc added in v0.3.687

func PureRunnerWithCustomHealthCheckerFunc(customHealthCheckerFunc func(context.Context) (map[string]string, error)) PureRunnerOption

func PureRunnerWithDetached

func PureRunnerWithDetached() PureRunnerOption

func PureRunnerWithGRPCServerOptions added in v0.3.637

func PureRunnerWithGRPCServerOptions(options ...grpc.ServerOption) PureRunnerOption

func PureRunnerWithKdumpsOnDisk added in v0.3.624

func PureRunnerWithKdumpsOnDisk(numKdumps uint64) PureRunnerOption

PureRunnerWithKdumpsOnDisk returns a PureRunnerOption that indicates that kdumps have been found on disk. The argument numKdumps is a counter that indicates how many dumps were on disk at the time the runner was created.

func PureRunnerWithLogStreamer added in v0.3.676

func PureRunnerWithLogStreamer(logStreamer LogStreamer) PureRunnerOption

func PureRunnerWithSSL

func PureRunnerWithSSL(tlsCfg *tls.Config) PureRunnerOption

func PureRunnerWithStatusImage

func PureRunnerWithStatusImage(imgName string) PureRunnerOption

PureRunnerWithStatusImage returns a PureRunnerOption that annotates a PureRunner with a statusImageName attribute. This attribute names the image to use for the status checks. Optionally, the status image can be pre-loaded into docker using FN_DOCKER_LOAD_FILE to avoid docker pull during status checks.

func PureRunnerWithStatusNetworkEnabler added in v0.3.659

func PureRunnerWithStatusNetworkEnabler(barrierPath string) PureRunnerOption

type ReadDataAccess

type ReadDataAccess interface {
	GetAppID(ctx context.Context, appName string) (string, error)
	// GetAppByID abstracts querying the datastore for an app.
	GetAppByID(ctx context.Context, appID string) (*models.App, error)
	GetTriggerBySource(ctx context.Context, appID string, triggerType, source string) (*models.Trigger, error)
	GetFnByID(ctx context.Context, fnID string) (*models.Fn, error)
}

ReadDataAccess represents read operations required to operate a load balancer node

func NewCachedDataAccess

func NewCachedDataAccess(da ReadDataAccess) ReadDataAccess

NewCachedDataAccess is a wrapper that caches entries temporarily

func NewMetricReadDataAccess added in v0.3.695

func NewMetricReadDataAccess(rda ReadDataAccess) ReadDataAccess

NewMetricReadDataAccess adds metrics to a ReadDataAccess

type RequestState

type RequestState interface {
	UpdateState(ctx context.Context, newState RequestStateType, slots *slotQueue)
}

func NewRequestState

func NewRequestState() RequestState

type RequestStateType

type RequestStateType int
const (
	RequestStateNone RequestStateType = iota // uninitialized
	RequestStateWait                         // request is waiting
	RequestStateExec                         // request is executing
	RequestStateDone                         // request is done
	RequestStateMax
)

type ResourceToken

type ResourceToken interface {
	// Close must be called by any thread that receives a token.
	io.Closer
	Error() error
	NeededCapacity() (uint64, models.MilliCPUs)
}

type ResourceTracker

type ResourceTracker interface {
	// GetResourceToken returns a channel to wait for a resource token on. If the provided context is canceled,
	// the channel will never receive anything. If it is not possible to fulfill this resource, the channel
	// will never receive anything (use IsResourcePossible). If a resource token is available for the provided
	// resource parameters, it will otherwise be sent once on the returned channel. The channel is never closed.
	// If isNB is set, the resource check is done and an error token is returned without blocking.
	// Memory is expected to be provided in MB units.
	GetResourceToken(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs, isNB bool) <-chan ResourceToken

	// IsResourcePossible returns whether it's possible to fulfill the requested resources on this
	// machine. It must be called before GetResourceToken or GetResourceToken may hang.
	// Memory is expected to be provided in MB units.
	IsResourcePossible(memory uint64, cpuQuota models.MilliCPUs) bool

	// Retrieve current stats/usage
	GetUtilization() ResourceUtilization
}

A simple resource (memory, cpu, disk, etc.) tracker for scheduling. TODO: disk, network IO for future
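
The interface comments imply a calling pattern: check IsResourcePossible first, then wait on the token channel while also watching the context, because the channel never delivers once the context is canceled. The sketch below models that pattern with local stand-ins. The token and tracker interfaces are simplified (CPU is a plain uint64 instead of models.MilliCPUs), and fakeTracker and acquire are hypothetical. The fake is not safe for concurrent use.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// token and tracker are simplified stand-ins for ResourceToken and ResourceTracker.
type token interface {
	Close() error
	Error() error
}

type tracker interface {
	GetResourceToken(ctx context.Context, memory, cpu uint64, isNB bool) <-chan token
	IsResourcePossible(memory, cpu uint64) bool
}

// fakeTracker tracks memory only, in MB as the interface comments require.
type fakeTracker struct{ totalMem, freeMem uint64 }

type fakeToken struct{ release func() }

func (t *fakeToken) Close() error { t.release(); return nil }
func (t *fakeToken) Error() error { return nil }

func (f *fakeTracker) IsResourcePossible(mem, cpu uint64) bool { return mem <= f.totalMem }

// GetResourceToken delivers a token at once when memory is free; otherwise
// the returned channel stays empty, like an unfulfillable blocking request.
func (f *fakeTracker) GetResourceToken(ctx context.Context, mem, cpu uint64, isNB bool) <-chan token {
	ch := make(chan token, 1)
	if mem <= f.freeMem {
		f.freeMem -= mem
		ch <- &fakeToken{release: func() { f.freeMem += mem }}
	}
	return ch
}

// acquire is a hypothetical helper showing the check-then-wait pattern.
func acquire(ctx context.Context, rt tracker, mem, cpu uint64) (token, error) {
	if !rt.IsResourcePossible(mem, cpu) {
		return nil, errors.New("request can never be satisfied on this machine")
	}
	select {
	case tok := <-rt.GetResourceToken(ctx, mem, cpu, false):
		if err := tok.Error(); err != nil {
			tok.Close()
			return nil, err
		}
		return tok, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// demo tries a request that fits, one that is impossible, and one that is
// possible but must wait until the context expires.
func demo() (fits, impossible, busy error) {
	rt := &fakeTracker{totalMem: 256, freeMem: 256}
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	var tok token
	tok, fits = acquire(ctx, rt, 128, 100)
	_, impossible = acquire(ctx, rt, 512, 100)
	_, busy = acquire(ctx, rt, 200, 100)
	if tok != nil {
		tok.Close() // every received token must be closed
	}
	return fits, impossible, busy
}

func main() {
	fmt.Println(demo())
}
```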

func NewResourceTracker

func NewResourceTracker(cfg *Config) ResourceTracker

type ResourceUtilization

type ResourceUtilization struct {
	// CPU in use
	CpuUsed models.MilliCPUs
	// CPU available
	CpuAvail models.MilliCPUs
	// Memory in use in bytes
	MemUsed uint64
	// Memory available in bytes
	MemAvail uint64
}

type Slot

type Slot interface {
	Close() error
	Error() error
	// contains filtered or unexported methods
}

Directories

Path      Synopsis
drivers   Package drivers is intended as a general purpose container abstraction library.
docker    Package docker provides a Docker driver for Fn.
mock      Package mock provides a fake Driver implementation that is only used for testing.
