agent

package
v0.3.666 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2019 License: Apache-2.0 Imports: 53 Imported by: 13

Documentation

Overview

Package agent defines the Agent interface and related concepts. An agent is an entity that knows how to execute an Fn function.

The Agent Interface

The Agent interface is the heart of this package. Agent exposes an api to create calls from various parameters and then execute those calls. An Agent has a few roles:

  • manage the memory pool for a given server
  • manage the container lifecycle for calls
  • execute calls against containers
  • invoke Start and End for each call appropriately
  • check the mq for any async calls, and submit them

For more information about how an agent executes a call see the documentation on the Agent interface.

Variants

There are two flavors of runner, the local Docker agent and a load-balancing agent. To create an agent that uses Docker containers to execute calls, use New().

To create an agent that can load-balance across a pool of sub-agents, use NewLBAgent().

Index

Constants

View Source
const (
	// EnvContainerLabelTag is a classifier label tag that is used to distinguish fn managed containers
	EnvContainerLabelTag = "FN_CONTAINER_LABEL_TAG"
	// EnvImageCleanMaxSize enables image cleaner and sets the high water mark for image cache in bytes
	EnvImageCleanMaxSize = "FN_IMAGE_CLEAN_MAX_SIZE"
	// EnvImageCleanExemptTags list of image names separated by whitespace that are exempt from removal in image cleaner
	EnvImageCleanExemptTags = "FN_IMAGE_CLEAN_EXEMPT_TAGS"
	// EnvImageEnableVolume allows image to contain VOLUME definitions
	EnvImageEnableVolume = "FN_IMAGE_ENABLE_VOLUME"
	// EnvDockerNetworks is a comma separated list of networks to attach to each container started
	EnvDockerNetworks = "FN_DOCKER_NETWORKS"
	// EnvDockerLoadFile is a file location for a file that contains a tarball of a docker image to load on startup
	EnvDockerLoadFile = "FN_DOCKER_LOAD_FILE"
	// EnvFreezeIdle is the delay between a container being last used and being frozen
	EnvFreezeIdle = "FN_FREEZE_IDLE_MSECS"
	// EnvHotPoll is the interval to ping for a slot manager thread to check if a container should be
	// launched for a given function
	EnvHotPoll = "FN_HOT_POLL_MSECS"
	// EnvHotLauncherTimeout is the timeout for a hot container queue to persist if idle
	EnvHotLauncherTimeout = "FN_HOT_LAUNCHER_TIMEOUT_MSECS"
	// EnvHotStartTimeout is the timeout for a hot container to be created including docker-pull
	EnvHotPullTimeout = "FN_HOT_PULL_TIMEOUT_MSECS"
	// EnvHotStartTimeout is the timeout for a hot container to become available for use for requests after EnvHotStartTimeout
	EnvHotStartTimeout = "FN_HOT_START_TIMEOUT_MSECS"
	// EnvAsyncChewPoll is the interval to poll the queue that contains async function invocations
	EnvAsyncChewPoll = "FN_ASYNC_CHEW_POLL_MSECS"
	// EnvMaxResponseSize is the maximum number of bytes that a function may return from an invocation
	EnvMaxResponseSize = "FN_MAX_RESPONSE_SIZE"
	// EnvMaxLogSize is the maximum size that a function's log may reach
	EnvMaxLogSize = "FN_MAX_LOG_SIZE_BYTES"
	// EnvMaxTotalCPU is the maximum CPU that will be reserved across all containers
	EnvMaxTotalCPU = "FN_MAX_TOTAL_CPU_MCPUS"
	// EnvMaxTotalMemory is the maximum memory that will be reserved across all containers
	EnvMaxTotalMemory = "FN_MAX_TOTAL_MEMORY_BYTES"
	// EnvMaxFsSize is the maximum filesystem size that a function may use
	EnvMaxFsSize = "FN_MAX_FS_SIZE_MB"
	// EnvPreForkPoolSize is the number of containers pooled to steal network from, this may reduce latency
	EnvPreForkPoolSize = "FN_EXPERIMENTAL_PREFORK_POOL_SIZE"
	// EnvPreForkImage is the image to use for the pre-fork pool
	EnvPreForkImage = "FN_EXPERIMENTAL_PREFORK_IMAGE"
	// EnvPreForkCmd is the command to run for images in the pre-fork pool, it should run for a long time
	EnvPreForkCmd = "FN_EXPERIMENTAL_PREFORK_CMD"
	// EnvPreForkUseOnce limits the number of times a pre-fork pool container may be used to one, they are otherwise recycled
	EnvPreForkUseOnce = "FN_EXPERIMENTAL_PREFORK_USE_ONCE"
	// EnvPreForkNetworks is the equivalent of EnvDockerNetworks but for pre-fork pool containers
	EnvPreForkNetworks = "FN_EXPERIMENTAL_PREFORK_NETWORKS"
	// EnvEnableNBResourceTracker makes every request to the resource tracker non-blocking, meaning the resources are either
	// available or it will return an error immediately
	EnvEnableNBResourceTracker = "FN_ENABLE_NB_RESOURCE_TRACKER"
	// EnvMaxTmpFsInodes is the maximum number of inodes for /tmp in a container
	EnvMaxTmpFsInodes = "FN_MAX_TMPFS_INODES"
	// EnvDisableReadOnlyRootFs makes the root fs for a container have rw permissions, by default it is read only
	EnvDisableReadOnlyRootFs = "FN_DISABLE_READONLY_ROOTFS"
	// EnvDisableDebugUserLogs disables user function logs being logged at level debug. wise to enable for production.
	EnvDisableDebugUserLogs = "FN_DISABLE_DEBUG_USER_LOGS"

	// EnvIOFSEnableTmpfs enables creating a per-container tmpfs mount for the IOFS
	EnvIOFSEnableTmpfs = "FN_IOFS_TMPFS"
	// EnvIOFSPath is the path within fn server container of a directory to configure for unix socket files for each container
	EnvIOFSPath = "FN_IOFS_PATH"
	// EnvIOFSDockerPath determines the relative location on the docker host where iofs mounts should be prefixed with
	EnvIOFSDockerPath = "FN_IOFS_DOCKER_PATH"
	// EnvIOFSOpts are the options to set when mounting the iofs directory for unix socket files
	EnvIOFSOpts = "FN_IOFS_OPTS"

	// EnvDetachedHeadroom is the extra room we want to give to a detached function to run.
	EnvDetachedHeadroom = "FN_EXECUTION_HEADROOM"

	// MaxMsDisabled is used to determine whether mr freeze is lying in wait. TODO remove this manuever
	MaxMsDisabled = time.Duration(math.MaxInt64)

	// DefaultHotPoll is the default value for EnvHotPoll
	DefaultHotPoll = 200 * time.Millisecond
)
View Source
const (
	// Here we give 5 seconds of timeout inside the container. We hardcode these numbers here to
	// ensure we control idle timeout & timeout as well as how long should cache be valid.
	// A cache duration of idleTimeout + 500 msecs allows us to reuse the cache, for about 1.5 secs,
	// and during this time, since we allow no queries to go through, the hot container times out.
	//
	// For now, status tests a single case: a new hot container is spawned when cache is expired
	// and when a query is allowed to run.
	// TODO: we might want to mix this up and perhaps allow that hot container to handle
	// more than one query to test both 'new hot container' and 'old hot container' cases.
	StatusCallTimeout       = int32(5)
	StatusCallIdleTimeout   = int32(1)
	StatusCallCacheDuration = time.Duration(500)*time.Millisecond + time.Duration(StatusCallIdleTimeout)*time.Second

	// Total context timeout (scheduler+execution.) We need to allocate plenty of time here.
	// 60 seconds should be enough to provoke disk I/O errors, docker timeouts. etc.
	StatusCtxTimeout = time.Duration(60 * time.Second)
)
View Source
const (
	Mem1MB = 1024 * 1024
	Mem1GB = 1024 * 1024 * 1024

	// Assume 2GB RAM on non-linux systems
	DefaultNonLinuxMemory = 2048 * Mem1MB
)
View Source
const (
	// max buffer size for grpc data messages, 10K
	MaxDataChunk = 10 * 1024
)
View Source
const RegistryToken = "FN_REGISTRY_TOKEN"

RegistryToken is a reserved call extensions key to pass registry token

#nosec

Variables

View Source
var (
	ErrorExpectedTry  = errors.New("Protocol failure: expected ClientMsg_Try")
	ErrorExpectedData = errors.New("Protocol failure: expected ClientMsg_Data")
)
View Source
var (
	ErrorRunnerClosed    = errors.New("Runner is closed")
	ErrorPureRunnerNoEOF = errors.New("Purerunner missing EOF response")
)
View Source
var CapacityFull = errors.New("max capacity reached")

Functions

func DefaultStaticRunnerPool

func DefaultStaticRunnerPool(runnerAddresses []string) pool.RunnerPool

func GetCallLatencies added in v0.3.646

func GetCallLatencies(c *call) (time.Duration, time.Duration)

func NewCallHandle

func NewCallHandle(engagement runner.RunnerProtocol_EngageServer) *callHandle

func NewDockerDriver

func NewDockerDriver(cfg *Config) (drivers.Driver, error)

NewDockerDriver creates a default docker driver from agent config

func NewSlotQueue

func NewSlotQueue(key string) *slotQueue

func NewSlotQueueMgr

func NewSlotQueueMgr() *slotQueueMgr

func NewStaticRunnerPool

func NewStaticRunnerPool(runnerAddresses []string, tlsConf *tls.Config, dialOpts ...grpc.DialOption) pool.RunnerPool

func NewgRPCRunner added in v0.3.637

func NewgRPCRunner(addr string, tlsConf *tls.Config, dialOpts ...grpc.DialOption) (pool.Runner, error)

func RegisterAgentViews

func RegisterAgentViews(tagKeys []string, latencyDist []float64)

RegisterAgentViews creates and registers all agent views

func RegisterContainerViews

func RegisterContainerViews(tagKeys []string, latencyDist []float64)

RegisterContainerViews creates and register containers views with provided tag keys

func RegisterDockerViews

func RegisterDockerViews(tagKeys []string, latencyDist, ioNetDist, ioDiskDist, memoryDist, cpuDist []float64)

RegisterDockerViews creates a and registers Docker views with provided tag keys

func RegisterLBAgentViews

func RegisterLBAgentViews(tagKeys []string, latencyDist []float64)

func RegisterRunnerViews added in v0.3.639

func RegisterRunnerViews(tagKeys []string, latencyDist []float64)

RegisterRunnerViews creates and registers all runner views

func TranslateGRPCStatusToRunnerStatus

func TranslateGRPCStatusToRunnerStatus(status *pb.RunnerStatus) *pool.RunnerStatus

Translate runner.RunnerStatus to runnerpool.RunnerStatus

Types

type Agent

type Agent interface {
	// GetCall will return a Call that is executable by the Agent, which
	// can be built via various CallOpt's provided to the method.
	GetCall(...CallOpt) (Call, error)

	// Submit will attempt to execute a call locally, a Call may store information
	// about itself in its Start and End methods, which will be called in Submit
	// immediately before and after the Call is executed, respectively. An error
	// will be returned if there is an issue executing the call or the error
	// may be from the call's execution itself (if, say, the container dies,
	// or the call times out).
	Submit(Call) error

	// Close will wait for any outstanding calls to complete and then exit.
	// Closing the agent will invoke Close on the underlying DataAccess.
	// Close is not safe to be called from multiple threads.
	io.Closer

	AddCallListener(fnext.CallListener)
}

Agent exposes an api to create calls from various parameters and then submit those calls, it also exposes a 'safe' shutdown mechanism via its Close method. Agent has a few roles:

  • manage the memory pool for a given server
  • manage the container lifecycle for calls
  • execute calls against containers
  • invoke Start and End for each call appropriately
  • check the mq for any async calls, and submit them

Overview: Upon submission of a call, Agent will start the call's timeout timer immediately. If the call is hot, Agent will attempt to find an active hot container for that route, and if necessary launch another container. calls will be able to read/write directly from/to a socket in the container. If it's necessary to launch a container, first an attempt will be made to try to reserve the ram required while waiting for any hot 'slot' to become available [if applicable]. If there is an error launching the container, an error will be returned provided the call has not yet timed out or found another hot 'slot' to execute in [if applicable]. call.Start will be called immediately before sending any input to a container. call.End will be called regardless of the timeout timer's status if the call was executed, and that error returned may be returned from Submit.

func DefaultPureRunner

func DefaultPureRunner(cancel context.CancelFunc, addr string, da CallHandler, tlsCfg *tls.Config) (Agent, error)

func New

func New(da CallHandler, options ...Option) Agent

New creates an Agent that executes functions locally as Docker containers.

func NewLBAgent

func NewLBAgent(da CallHandler, rp pool.RunnerPool, p pool.Placer, options ...LBAgentOption) (Agent, error)

NewLBAgent creates an Agent that knows how to load-balance function calls across a group of runner nodes.

func NewPureRunner

func NewPureRunner(cancel context.CancelFunc, addr string, options ...PureRunnerOption) (Agent, error)

type Call

type Call interface {
	// Model will return the underlying models.Call configuration for this call.
	// TODO we could respond to async correctly from agent but layering, this
	// is only because the front end has different responses based on call type.
	// try to discourage use elsewhere until this gets pushed down more...
	Model() *models.Call

	// Start will be called before this call is executed, it may be used to
	// guarantee mutual exclusion, check docker permissions, update timestamps,
	// etc.
	// TODO Start and End can likely be unexported as they are only used in the agent,
	// and on a type which is constructed in a specific agent. meh.
	Start(ctx context.Context) error

	// End will be called immediately after attempting a call execution,
	// regardless of whether the execution failed or not. An error will be passed
	// to End, which if nil indicates a successful execution. Any error returned
	// from End will be returned as the error from Submit.
	End(ctx context.Context, err error) error
}

Call is an agent specific instance of a call object, that is runnable.

type CallHandler

type CallHandler interface {
	// Start will attempt to start the provided Call within an appropriate
	// context.
	Start(ctx context.Context, mCall *models.Call) error

	// Finish will notify the system that the Call has been processed, and
	// fulfill the reservation in the queue if the call came from a queue.
	Finish(ctx context.Context, mCall *models.Call, stderr io.Reader, async bool) error
}

CallHandler consumes the start and finish events for a call This is effectively a callback that is allowed to read the logs - TODO Deprecate this - this could be a CallListener except it also consumes logs

func NewDirectCallDataAccess

func NewDirectCallDataAccess(ls models.LogStore, mq models.MessageQueue) CallHandler

type CallOpt

type CallOpt func(c *call) error

CallOpt allows configuring a call before execution TODO(reed): consider the interface here, all options must be defined in agent and flexible enough for usage by extenders of fn, this straddling is painful. consider models.Call?

func FromHTTPFnRequest

func FromHTTPFnRequest(app *models.App, fn *models.Fn, req *http.Request) CallOpt

FromHTTPFnRequest Sets up a call from an http trigger request

func FromModel

func FromModel(mCall *models.Call) CallOpt

FromModel creates a call object from an existing stored call model object, reading the body from the stored call payload

func FromModelAndInput

func FromModelAndInput(mCall *models.Call, in io.ReadCloser) CallOpt

FromModelAndInput creates a call object from an existing stored call model object , reading the body from a provided stream

func InvokeDetached

func InvokeDetached() CallOpt

InvokeDetached mark a call to be a detached call

func WithContext

func WithContext(ctx context.Context) CallOpt

WithContext overrides the context on the call

func WithDockerAuth added in v0.3.662

func WithDockerAuth(auth docker.Auther) CallOpt

WithDockerAuth configures a call to retrieve credentials for an image pull

func WithExtensions

func WithExtensions(extensions map[string]string) CallOpt

WithExtensions adds internal attributes to the call that can be interpreted by extensions in the agent Pure runner can use this to pass an extension to the call

func WithLogger added in v0.3.618

func WithLogger(w io.ReadWriteCloser) CallOpt

WithLogger sets stderr to the provided one

func WithTrigger

func WithTrigger(t *models.Trigger) CallOpt

WithTrigger adds trigger specific bits to a call. TODO consider removal, this is from a shuffle

func WithWriter

func WithWriter(w io.Writer) CallOpt

WithWriter sets the writer that the call uses to send its output message to TODO this should be required

type CallOverrider

type CallOverrider func(*models.Call, map[string]string) (map[string]string, error)

CallOverrider should die. Interceptor in GetCall

type Config

type Config struct {
	MinDockerVersion        string        `json:"min_docker_version"`
	ContainerLabelTag       string        `json:"container_label_tag"`
	DockerNetworks          string        `json:"docker_networks"`
	DockerLoadFile          string        `json:"docker_load_file"`
	FreezeIdle              time.Duration `json:"freeze_idle_msecs"`
	HotPoll                 time.Duration `json:"hot_poll_msecs"`
	HotLauncherTimeout      time.Duration `json:"hot_launcher_timeout_msecs"`
	HotPullTimeout          time.Duration `json:"hot_pull_timeout_msecs"`
	HotStartTimeout         time.Duration `json:"hot_start_timeout_msecs"`
	AsyncChewPoll           time.Duration `json:"async_chew_poll_msecs"`
	DetachedHeadRoom        time.Duration `json:"detached_head_room_msecs"`
	MaxResponseSize         uint64        `json:"max_response_size_bytes"`
	MaxLogSize              uint64        `json:"max_log_size_bytes"`
	MaxTotalCPU             uint64        `json:"max_total_cpu_mcpus"`
	MaxTotalMemory          uint64        `json:"max_total_memory_bytes"`
	MaxFsSize               uint64        `json:"max_fs_size_mb"`
	PreForkPoolSize         uint64        `json:"pre_fork_pool_size"`
	PreForkImage            string        `json:"pre_fork_image"`
	PreForkCmd              string        `json:"pre_fork_pool_cmd"`
	PreForkUseOnce          uint64        `json:"pre_fork_use_once"`
	PreForkNetworks         string        `json:"pre_fork_networks"`
	EnableNBResourceTracker bool          `json:"enable_nb_resource_tracker"`
	MaxTmpFsInodes          uint64        `json:"max_tmpfs_inodes"`
	DisableReadOnlyRootFs   bool          `json:"disable_readonly_rootfs"`
	DisableDebugUserLogs    bool          `json:"disable_debug_user_logs"`
	IOFSEnableTmpfs         bool          `json:"iofs_enable_tmpfs"`
	IOFSAgentPath           string        `json:"iofs_path"`
	IOFSMountRoot           string        `json:"iofs_mount_root"`
	IOFSOpts                string        `json:"iofs_opts"`
	ImageCleanMaxSize       uint64        `json:"image_clean_max_size"`
	ImageCleanExemptTags    string        `json:"image_clean_exempt_tags"`
	ImageEnableVolume       bool          `json:"image_enable_volume"`
}

Config specifies various settings for an agent

func NewConfig

func NewConfig() (*Config, error)

NewConfig returns a config set from env vars, plus defaults

type ContainerState

type ContainerState interface {
	UpdateState(ctx context.Context, newState ContainerStateType, slots *slotQueue)
	GetState() string
}

func NewContainerState

func NewContainerState() ContainerState

type ContainerStateType

type ContainerStateType int
const (
	ContainerStateNone   ContainerStateType = iota // uninitialized
	ContainerStateWait                             // resource (cpu + mem) waiting
	ContainerStateStart                            // launching
	ContainerStateIdle                             // running: idle but not paused
	ContainerStatePaused                           // running: idle but paused
	ContainerStateBusy                             // running: busy
	ContainerStateDone                             // exited/failed/done
	ContainerStateMax
)

type DataAccess

type DataAccess interface {
	ReadDataAccess
	DequeueDataAccess
	CallHandler
}

DataAccess is currently

type DequeueDataAccess

type DequeueDataAccess interface {
	// Dequeue will query the queue for the next available Call that can be run
	// by this Agent, and reserve it (ultimately forwards to mq.Reserve).
	Dequeue(ctx context.Context) (*models.Call, error)
}

DequeueDataAccess abstracts an underlying dequeue for async runners

func NewDirectDequeueAccess

func NewDirectDequeueAccess(mq models.MessageQueue) DequeueDataAccess

type DetachedResponseWriter

type DetachedResponseWriter struct {
	Headers http.Header
	// contains filtered or unexported fields
}

func NewDetachedResponseWriter

func NewDetachedResponseWriter(h http.Header, statusCode int) *DetachedResponseWriter

func (*DetachedResponseWriter) Header

func (w *DetachedResponseWriter) Header() http.Header

func (*DetachedResponseWriter) Status

func (w *DetachedResponseWriter) Status() int

func (*DetachedResponseWriter) Write

func (w *DetachedResponseWriter) Write(data []byte) (int, error)

func (*DetachedResponseWriter) WriteHeader

func (w *DetachedResponseWriter) WriteHeader(statusCode int)

type EnqueueDataAccess

type EnqueueDataAccess interface {
	// Enqueue will add a Call to the queue (ultimately forwards to mq.Push).
	Enqueue(ctx context.Context, mCall *models.Call) error
}

EnqueueDataAccess abstracts an underying enqueue for async queueing

func NewDirectEnqueueAccess

func NewDirectEnqueueAccess(mq models.MessageQueue) EnqueueDataAccess

func NewUnsupportedAsyncEnqueueAccess

func NewUnsupportedAsyncEnqueueAccess() EnqueueDataAccess

NewUnsupportedEnqueueAccess is a backstop that errors when you try to enqueue an async operation on a server that doesn't support async

type EvictToken

type EvictToken struct {
	C        chan struct{}
	DoneChan chan struct{}
	// contains filtered or unexported fields
}

func (*EvictToken) SetEvictable

func (token *EvictToken) SetEvictable(isEvictable bool)

type Evictor

type Evictor interface {
	// CreateEvictToken creates an eviction token to be used in evictor tracking. Returns
	// an eviction token.
	CreateEvictToken(slotId string, mem, cpu uint64) *EvictToken

	// DeleteEvictToken deletes an eviction token from evictor system
	DeleteEvictToken(token *EvictToken)

	// PerformEviction performs evictions to satisfy cpu & mem arguments
	// and returns a slice of channels for evictions performed. The callers
	// can wait on these channel to ensure evictions are completed.
	PerformEviction(slotId string, mem, cpu uint64) []chan struct{}
}

func NewEvictor

func NewEvictor() Evictor

type LBAgentOption

type LBAgentOption func(*lbAgent) error

func WithLBAgentConfig

func WithLBAgentConfig(cfg *Config) LBAgentOption

func WithLBCallOverrider

func WithLBCallOverrider(fn CallOverrider) LBAgentOption

LB agents can use this to register a CallOverrider to modify a Call and extensions

type Option

type Option func(*agent) error

Option configures an agent at startup

func WithAsync

func WithAsync(dqda DequeueDataAccess) Option

WithAsync Enables Async operations on the agent

func WithCallOptions added in v0.3.662

func WithCallOptions(opts ...CallOpt) Option

WithCallOptions adds additional call options to each call created from GetCall, these options will be executed after any other options supplied to GetCall

func WithCallOverrider

func WithCallOverrider(fn CallOverrider) Option

WithCallOverrider registers register a CallOverrider to modify a Call and extensions on call construction

func WithConfig

func WithConfig(cfg *Config) Option

WithConfig sets the agent config to the provided config

func WithDockerDriver

func WithDockerDriver(drv drivers.Driver) Option

WithDockerDriver Provides a customer driver to agent

type PureRunnerOption

type PureRunnerOption func(*pureRunner) error

func PureRunnerWithAgent

func PureRunnerWithAgent(a Agent) PureRunnerOption

func PureRunnerWithDetached

func PureRunnerWithDetached() PureRunnerOption

func PureRunnerWithGRPCServerOptions added in v0.3.637

func PureRunnerWithGRPCServerOptions(options ...grpc.ServerOption) PureRunnerOption

func PureRunnerWithKdumpsOnDisk added in v0.3.624

func PureRunnerWithKdumpsOnDisk(numKdumps uint64) PureRunnerOption

PureRunnerWithKdumpsOnDisk returns a PureRunnerOption that indicates that kdumps have been found on disk. The argument numKdump is a counter that indicates how many dumps were on disk at the time the runner was created.

func PureRunnerWithSSL

func PureRunnerWithSSL(tlsCfg *tls.Config) PureRunnerOption

func PureRunnerWithStatusImage

func PureRunnerWithStatusImage(imgName string) PureRunnerOption

PureRunnerWithStatusImage returns a PureRunnerOption that annotates a PureRunner with a statusImageName attribute. This attribute names an image name to use for the status checks. Optionally, the status image can be pre-loaded into docker using FN_DOCKER_LOAD_FILE to avoid docker pull during status checks.

func PureRunnerWithStatusNetworkEnabler added in v0.3.659

func PureRunnerWithStatusNetworkEnabler(barrierPath string) PureRunnerOption

type ReadDataAccess

type ReadDataAccess interface {
	GetAppID(ctx context.Context, appName string) (string, error)
	// GetAppByID abstracts querying the datastore for an app.
	GetAppByID(ctx context.Context, appID string) (*models.App, error)
	GetTriggerBySource(ctx context.Context, appId string, triggerType, source string) (*models.Trigger, error)
	GetFnByID(ctx context.Context, fnId string) (*models.Fn, error)
}

ReadDataAccess represents read operations required to operate a load balancer node

func NewCachedDataAccess

func NewCachedDataAccess(da ReadDataAccess) ReadDataAccess

type RequestState

type RequestState interface {
	UpdateState(ctx context.Context, newState RequestStateType, slots *slotQueue)
}

func NewRequestState

func NewRequestState() RequestState

type RequestStateType

type RequestStateType int
const (
	RequestStateNone RequestStateType = iota // uninitialized
	RequestStateWait                         // request is waiting
	RequestStateExec                         // request is executing
	RequestStateDone                         // request is done
	RequestStateMax
)

type ResourceToken

type ResourceToken interface {
	// Close must be called by any thread that receives a token.
	io.Closer
	Error() error
	NeededCapacity() (uint64, models.MilliCPUs)
}

type ResourceTracker

type ResourceTracker interface {
	// WaitAsyncResource returns a channel that will send once when there seem to be sufficient
	// resource levels to run an async task, it is up to the implementer to create policy here.
	WaitAsyncResource(ctx context.Context) chan struct{}

	// GetResourceToken returns a channel to wait for a resource token on. If the provided context is canceled,
	// the channel will never receive anything. If it is not possible to fulfill this resource, the channel
	// will never receive anything (use IsResourcePossible). If a resource token is available for the provided
	// resource parameters, it will otherwise be sent once on the returned channel. The channel is never closed.
	// if isNB is set, resource check is done and error token is returned without blocking.
	// Memory is expected to be provided in MB units.
	GetResourceToken(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs, isNB bool) <-chan ResourceToken

	// IsResourcePossible returns whether it's possible to fulfill the requested resources on this
	// machine. It must be called before GetResourceToken or GetResourceToken may hang.
	// Memory is expected to be provided in MB units.
	IsResourcePossible(memory uint64, cpuQuota models.MilliCPUs) bool

	// Retrieve current stats/usage
	GetUtilization() ResourceUtilization
}

A simple resource (memory, cpu, disk, etc.) tracker for scheduling. TODO: disk, network IO for future

func NewResourceTracker

func NewResourceTracker(cfg *Config) ResourceTracker

type ResourceUtilization

type ResourceUtilization struct {
	// CPU in use
	CpuUsed models.MilliCPUs
	// CPU available
	CpuAvail models.MilliCPUs
	// Memory in use in bytes
	MemUsed uint64
	// Memory available in bytes
	MemAvail uint64
}

type Slot

type Slot interface {
	Close() error
	Error() error
	// contains filtered or unexported methods
}

Directories

Path Synopsis
Package drivers is intended as a general purpose container abstraction library.
Package drivers is intended as a general purpose container abstraction library.
docker
Package docker provides a Docker driver for Fn.
Package docker provides a Docker driver for Fn.
mock
Package mock provides a fake Driver implementation that is only used for testing.
Package mock provides a fake Driver implementation that is only used for testing.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL