Documentation ¶
Overview ¶
Package agent defines the Agent interface and related concepts. An agent is an entity that knows how to execute an Fn function.
The Agent Interface ¶
The Agent interface is the heart of this package. Agent exposes an API to create calls from various parameters and then execute those calls. An Agent has a few roles:
- manage the memory pool for a given server
- manage the container lifecycle for calls
- execute calls against containers
- invoke Start and End for each call appropriately
- check the mq for any async calls, and submit them
For more information about how an agent executes a call see the documentation on the Agent interface.
Variants ¶
There are two flavors of agent: the local Docker agent and a load-balancing agent. To create an agent that uses Docker containers to execute calls, use New().
To create an agent that can load-balance across a pool of sub-agents, use NewLBAgent().
Index ¶
- Constants
- Variables
- func DefaultStaticRunnerPool(runnerAddresses []string) pool.RunnerPool
- func GetCallLatencies(c *call) (time.Duration, time.Duration)
- func NewCallHandle(engagement runner.RunnerProtocol_EngageServer) *callHandle
- func NewDockerDriver(cfg *Config) (drivers.Driver, error)
- func NewSlotQueue(key string) *slotQueue
- func NewSlotQueueMgr() *slotQueueMgr
- func NewStaticRunnerPool(runnerAddresses []string, tlsConf *tls.Config, dialOpts ...grpc.DialOption) pool.RunnerPool
- func NewgRPCRunner(addr string, tlsConf *tls.Config, dialOpts ...grpc.DialOption) (pool.Runner, error)
- func RegisterAgentViews(tagKeys []string, latencyDist []float64)
- func RegisterContainerViews(tagKeys []string, latencyDist []float64)
- func RegisterDockerViews(tagKeys []string, ...)
- func RegisterLBAgentViews(tagKeys []string, latencyDist []float64)
- func RegisterRunnerViews(tagKeys []string, latencyDist []float64)
- func TranslateGRPCStatusToRunnerStatus(status *pb.RunnerStatus) *pool.RunnerStatus
- type Agent
- func DefaultPureRunner(cancel context.CancelFunc, addr string, da CallHandler, tlsCfg *tls.Config) (Agent, error)
- func New(da CallHandler, options ...Option) Agent
- func NewLBAgent(da CallHandler, rp pool.RunnerPool, p pool.Placer, options ...LBAgentOption) (Agent, error)
- func NewPureRunner(cancel context.CancelFunc, addr string, options ...PureRunnerOption) (Agent, error)
- type Call
- type CallHandler
- type CallOpt
- func FromHTTPFnRequest(app *models.App, fn *models.Fn, req *http.Request) CallOpt
- func FromModel(mCall *models.Call) CallOpt
- func FromModelAndInput(mCall *models.Call, in io.ReadCloser) CallOpt
- func InvokeDetached() CallOpt
- func WithContext(ctx context.Context) CallOpt
- func WithExtensions(extensions map[string]string) CallOpt
- func WithLogger(w io.ReadWriteCloser) CallOpt
- func WithTrigger(t *models.Trigger) CallOpt
- func WithWriter(w io.Writer) CallOpt
- type CallOverrider
- type Config
- type ContainerState
- type ContainerStateType
- type DataAccess
- type DequeueDataAccess
- type DetachedResponseWriter
- type EnqueueDataAccess
- type EvictToken
- type Evictor
- type LBAgentOption
- type Option
- type PureRunnerOption
- func PureRunnerWithAgent(a Agent) PureRunnerOption
- func PureRunnerWithDetached() PureRunnerOption
- func PureRunnerWithGRPCServerOptions(options ...grpc.ServerOption) PureRunnerOption
- func PureRunnerWithKdumpsOnDisk(numKdumps uint64) PureRunnerOption
- func PureRunnerWithSSL(tlsCfg *tls.Config) PureRunnerOption
- func PureRunnerWithStatusImage(imgName string) PureRunnerOption
- type ReadDataAccess
- type RequestState
- type RequestStateType
- type ResourceToken
- type ResourceTracker
- type ResourceUtilization
- type Slot
Constants ¶
const (
	// EnvContainerLabelTag is a classifier label tag that is used to distinguish fn managed containers
	EnvContainerLabelTag = "FN_CONTAINER_LABEL_TAG"
	// EnvImageCleanMaxSize enables image cleaner and sets the high water mark for image cache in bytes
	EnvImageCleanMaxSize = "FN_IMAGE_CLEAN_MAX_SIZE"
	// EnvImageCleanExemptTags list of image names separated by whitespace that are exempt from removal in image cleaner
	EnvImageCleanExemptTags = "FN_IMAGE_CLEAN_EXEMPT_TAGS"
	// EnvDockerNetworks is a comma separated list of networks to attach to each container started
	EnvDockerNetworks = "FN_DOCKER_NETWORKS"
	// EnvDockerLoadFile is a file location for a file that contains a tarball of a docker image to load on startup
	EnvDockerLoadFile = "FN_DOCKER_LOAD_FILE"
	// EnvFreezeIdle is the delay between a container being last used and being frozen
	EnvFreezeIdle = "FN_FREEZE_IDLE_MSECS"
	// EnvHotPoll is the interval to ping for a slot manager thread to check if a container should be
	// launched for a given function
	EnvHotPoll = "FN_HOT_POLL_MSECS"
	// EnvHotLauncherTimeout is the timeout for a hot container queue to persist if idle
	EnvHotLauncherTimeout = "FN_HOT_LAUNCHER_TIMEOUT_MSECS"
	// EnvHotPullTimeout is the timeout for a hot container to be created including docker-pull
	EnvHotPullTimeout = "FN_HOT_PULL_TIMEOUT_MSECS"
	// EnvHotStartTimeout is the timeout for a hot container to become available for use for requests after EnvHotStartTimeout
	EnvHotStartTimeout = "FN_HOT_START_TIMEOUT_MSECS"
	// EnvAsyncChewPoll is the interval to poll the queue that contains async function invocations
	EnvAsyncChewPoll = "FN_ASYNC_CHEW_POLL_MSECS"
	// EnvMaxResponseSize is the maximum number of bytes that a function may return from an invocation
	EnvMaxResponseSize = "FN_MAX_RESPONSE_SIZE"
	// EnvMaxLogSize is the maximum size that a function's log may reach
	EnvMaxLogSize = "FN_MAX_LOG_SIZE_BYTES"
	// EnvMaxTotalCPU is the maximum CPU that will be reserved across all containers
	EnvMaxTotalCPU = "FN_MAX_TOTAL_CPU_MCPUS"
	// EnvMaxTotalMemory is the maximum memory that will be reserved across all containers
	EnvMaxTotalMemory = "FN_MAX_TOTAL_MEMORY_BYTES"
	// EnvMaxFsSize is the maximum filesystem size that a function may use
	EnvMaxFsSize = "FN_MAX_FS_SIZE_MB"
	// EnvPreForkPoolSize is the number of containers pooled to steal network from, this may reduce latency
	EnvPreForkPoolSize = "FN_EXPERIMENTAL_PREFORK_POOL_SIZE"
	// EnvPreForkImage is the image to use for the pre-fork pool
	EnvPreForkImage = "FN_EXPERIMENTAL_PREFORK_IMAGE"
	// EnvPreForkCmd is the command to run for images in the pre-fork pool, it should run for a long time
	EnvPreForkCmd = "FN_EXPERIMENTAL_PREFORK_CMD"
	// EnvPreForkUseOnce limits the number of times a pre-fork pool container may be used to one, they are otherwise recycled
	EnvPreForkUseOnce = "FN_EXPERIMENTAL_PREFORK_USE_ONCE"
	// EnvPreForkNetworks is the equivalent of EnvDockerNetworks but for pre-fork pool containers
	EnvPreForkNetworks = "FN_EXPERIMENTAL_PREFORK_NETWORKS"
	// EnvEnableNBResourceTracker makes every request to the resource tracker non-blocking, meaning the resources are either
	// available or it will return an error immediately
	EnvEnableNBResourceTracker = "FN_ENABLE_NB_RESOURCE_TRACKER"
	// EnvMaxTmpFsInodes is the maximum number of inodes for /tmp in a container
	EnvMaxTmpFsInodes = "FN_MAX_TMPFS_INODES"
	// EnvDisableReadOnlyRootFs makes the root fs for a container have rw permissions, by default it is read only
	EnvDisableReadOnlyRootFs = "FN_DISABLE_READONLY_ROOTFS"
	// EnvDisableDebugUserLogs disables user function logs being logged at level debug. wise to enable for production.
	EnvDisableDebugUserLogs = "FN_DISABLE_DEBUG_USER_LOGS"
	// EnvIOFSEnableTmpfs enables creating a per-container tmpfs mount for the IOFS
	EnvIOFSEnableTmpfs = "FN_IOFS_TMPFS"
	// EnvIOFSPath is the path within fn server container of a directory to configure for unix socket files for each container
	EnvIOFSPath = "FN_IOFS_PATH"
	// EnvIOFSDockerPath determines the relative location on the docker host where iofs mounts should be prefixed with
	EnvIOFSDockerPath = "FN_IOFS_DOCKER_PATH"
	// EnvIOFSOpts are the options to set when mounting the iofs directory for unix socket files
	EnvIOFSOpts = "FN_IOFS_OPTS"
	// EnvDetachedHeadroom is the extra room we want to give to a detached function to run.
	EnvDetachedHeadroom = "FN_EXECUTION_HEADROOM"

	// MaxMsDisabled is used to determine whether mr freeze is lying in wait. TODO remove this maneuver
	MaxMsDisabled = time.Duration(math.MaxInt64)

	// DefaultHotPoll is the default value for EnvHotPoll
	DefaultHotPoll = 200 * time.Millisecond
)
const (
	// Here we give 5 seconds of timeout inside the container. We hardcode these numbers here to
	// ensure we control idle timeout & timeout as well as how long should cache be valid.
	// A cache duration of idleTimeout + 500 msecs allows us to reuse the cache, for about 1.5 secs,
	// and during this time, since we allow no queries to go through, the hot container times out.
	//
	// For now, status tests a single case: a new hot container is spawned when cache is expired
	// and when a query is allowed to run.
	// TODO: we might want to mix this up and perhaps allow that hot container to handle
	// more than one query to test both 'new hot container' and 'old hot container' cases.
	StatusCallTimeout       = int32(5)
	StatusCallIdleTimeout   = int32(1)
	StatusCallCacheDuration = time.Duration(500)*time.Millisecond + time.Duration(StatusCallIdleTimeout)*time.Second

	// Total context timeout (scheduler+execution.) We need to allocate plenty of time here.
	// 60 seconds should be enough to provoke disk I/O errors, docker timeouts. etc.
	StatusCtxTimeout = time.Duration(60 * time.Second)
)
const (
	Mem1MB = 1024 * 1024
	Mem1GB = 1024 * 1024 * 1024

	// Assume 2GB RAM on non-linux systems
	DefaultNonLinuxMemory = 2048 * Mem1MB
)
const (
// max buffer size for grpc data messages, 10K
MaxDataChunk = 10 * 1024
)
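MaxDataChunk caps the size of a single gRPC data message at 10K, which implies that larger bodies have to be cut into several messages. The package's own chunking code is not shown here; the sketch below only illustrates the idea with a hypothetical helper, splitChunks, that is not part of this package.

```go
package main

import "fmt"

// MaxDataChunk mirrors the constant above: 10K per gRPC data message.
const MaxDataChunk = 10 * 1024

// splitChunks is a hypothetical helper that cuts data into consecutive
// slices of at most size bytes. The returned slices share memory with data.
// A non-positive size yields no chunks.
func splitChunks(data []byte, size int) [][]byte {
	if size <= 0 {
		return nil
	}
	var chunks [][]byte
	for len(data) > size {
		chunks = append(chunks, data[:size])
		data = data[size:]
	}
	if len(data) > 0 {
		chunks = append(chunks, data)
	}
	return chunks
}

func main() {
	payload := make([]byte, 25*1024) // a 25K body
	for i, c := range splitChunks(payload, MaxDataChunk) {
		fmt.Printf("chunk %d: %d bytes\n", i, len(c))
	}
}
```

For a 25K payload this yields two full 10K chunks followed by a 5K remainder.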
const RegistryToken = "FN_REGISTRY_TOKEN"
RegistryToken is a reserved call extensions key used to pass a registry token.
Variables ¶
var (
	ErrorExpectedTry  = errors.New("Protocol failure: expected ClientMsg_Try")
	ErrorExpectedData = errors.New("Protocol failure: expected ClientMsg_Data")
)
var (
	ErrorRunnerClosed    = errors.New("Runner is closed")
	ErrorPureRunnerNoEOF = errors.New("Purerunner missing EOF response")
)
var CapacityFull = errors.New("max capacity reached")
Functions ¶
func DefaultStaticRunnerPool ¶
func DefaultStaticRunnerPool(runnerAddresses []string) pool.RunnerPool
func GetCallLatencies ¶ added in v0.3.646
func GetCallLatencies(c *call) (time.Duration, time.Duration)
func NewCallHandle ¶
func NewCallHandle(engagement runner.RunnerProtocol_EngageServer) *callHandle
func NewDockerDriver ¶
func NewDockerDriver(cfg *Config) (drivers.Driver, error)
NewDockerDriver creates a default docker driver from agent config
func NewSlotQueue ¶
func NewSlotQueue(key string) *slotQueue
func NewSlotQueueMgr ¶
func NewSlotQueueMgr() *slotQueueMgr
func NewStaticRunnerPool ¶
func NewStaticRunnerPool(runnerAddresses []string, tlsConf *tls.Config, dialOpts ...grpc.DialOption) pool.RunnerPool
func NewgRPCRunner ¶ added in v0.3.637
func NewgRPCRunner(addr string, tlsConf *tls.Config, dialOpts ...grpc.DialOption) (pool.Runner, error)
func RegisterAgentViews ¶
func RegisterAgentViews(tagKeys []string, latencyDist []float64)
RegisterAgentViews creates and registers all agent views
func RegisterContainerViews ¶
func RegisterContainerViews(tagKeys []string, latencyDist []float64)
RegisterContainerViews creates and registers container views with the provided tag keys
func RegisterDockerViews ¶
func RegisterDockerViews(tagKeys []string, latencyDist, ioNetDist, ioDiskDist, memoryDist, cpuDist []float64)
RegisterDockerViews creates and registers Docker views with the provided tag keys
func RegisterLBAgentViews ¶
func RegisterLBAgentViews(tagKeys []string, latencyDist []float64)
func RegisterRunnerViews ¶ added in v0.3.639
func RegisterRunnerViews(tagKeys []string, latencyDist []float64)
RegisterRunnerViews creates and registers all runner views
func TranslateGRPCStatusToRunnerStatus ¶
func TranslateGRPCStatusToRunnerStatus(status *pb.RunnerStatus) *pool.RunnerStatus
TranslateGRPCStatusToRunnerStatus translates a gRPC runner.RunnerStatus into a runnerpool.RunnerStatus
Types ¶
type Agent ¶
type Agent interface {
	// GetCall will return a Call that is executable by the Agent, which
	// can be built via various CallOpt's provided to the method.
	GetCall(...CallOpt) (Call, error)

	// Submit will attempt to execute a call locally, a Call may store information
	// about itself in its Start and End methods, which will be called in Submit
	// immediately before and after the Call is executed, respectively. An error
	// will be returned if there is an issue executing the call or the error
	// may be from the call's execution itself (if, say, the container dies,
	// or the call times out).
	Submit(Call) error

	// Close will wait for any outstanding calls to complete and then exit.
	// Closing the agent will invoke Close on the underlying DataAccess.
	// Close is not safe to be called from multiple threads.
	io.Closer

	AddCallListener(fnext.CallListener)
}
Agent exposes an API to create calls from various parameters and then submit those calls. It also exposes a 'safe' shutdown mechanism via its Close method. Agent has a few roles:
- manage the memory pool for a given server
- manage the container lifecycle for calls
- execute calls against containers
- invoke Start and End for each call appropriately
- check the mq for any async calls, and submit them
Overview: Upon submission of a call, Agent will start the call's timeout timer immediately. If the call is hot, Agent will attempt to find an active hot container for that route and, if necessary, launch another container. Calls will be able to read/write directly from/to a socket in the container. If it's necessary to launch a container, an attempt will first be made to reserve the RAM required while waiting for any hot 'slot' to become available [if applicable]. If there is an error launching the container, an error will be returned provided the call has not yet timed out or found another hot 'slot' to execute in [if applicable]. call.Start will be called immediately before sending any input to a container. call.End will be called regardless of the timeout timer's status if the call was executed, and any error it returns may be returned from Submit.
func DefaultPureRunner ¶
func DefaultPureRunner(cancel context.CancelFunc, addr string, da CallHandler, tlsCfg *tls.Config) (Agent, error)
func New ¶
func New(da CallHandler, options ...Option) Agent
New creates an Agent that executes functions locally as Docker containers.
func NewLBAgent ¶
func NewLBAgent(da CallHandler, rp pool.RunnerPool, p pool.Placer, options ...LBAgentOption) (Agent, error)
NewLBAgent creates an Agent that knows how to load-balance function calls across a group of runner nodes.
func NewPureRunner ¶
func NewPureRunner(cancel context.CancelFunc, addr string, options ...PureRunnerOption) (Agent, error)
type Call ¶
type Call interface {
	// Model will return the underlying models.Call configuration for this call.
	// TODO we could respond to async correctly from agent but layering, this
	// is only because the front end has different responses based on call type.
	// try to discourage use elsewhere until this gets pushed down more...
	Model() *models.Call

	// Start will be called before this call is executed, it may be used to
	// guarantee mutual exclusion, check docker permissions, update timestamps,
	// etc.
	// TODO Start and End can likely be unexported as they are only used in the agent,
	// and on a type which is constructed in a specific agent. meh.
	Start(ctx context.Context) error

	// End will be called immediately after attempting a call execution,
	// regardless of whether the execution failed or not. An error will be passed
	// to End, which if nil indicates a successful execution. Any error returned
	// from End will be returned as the error from Submit.
	End(ctx context.Context, err error) error
}
type CallHandler ¶
type CallHandler interface {
	// Start will attempt to start the provided Call within an appropriate
	// context.
	Start(ctx context.Context, mCall *models.Call) error

	// Finish will notify the system that the Call has been processed, and
	// fulfill the reservation in the queue if the call came from a queue.
	Finish(ctx context.Context, mCall *models.Call, stderr io.Reader, async bool) error
}
CallHandler consumes the start and finish events for a call. This is effectively a callback that is allowed to read the logs. TODO: deprecate this; it could be a CallListener, except that it also consumes logs.
func NewDirectCallDataAccess ¶
func NewDirectCallDataAccess(ls models.LogStore, mq models.MessageQueue) CallHandler
type CallOpt ¶
type CallOpt func(c *call) error
TODO build w/o closures... lazy
func FromHTTPFnRequest ¶
func FromHTTPFnRequest(app *models.App, fn *models.Fn, req *http.Request) CallOpt
FromHTTPFnRequest sets up a call from an HTTP trigger request
func FromModel ¶
func FromModel(mCall *models.Call) CallOpt
FromModel creates a call object from an existing stored call model object, reading the body from the stored call payload
func FromModelAndInput ¶
func FromModelAndInput(mCall *models.Call, in io.ReadCloser) CallOpt
FromModelAndInput creates a call object from an existing stored call model object, reading the body from a provided stream
func InvokeDetached ¶
func InvokeDetached() CallOpt
InvokeDetached marks a call as a detached call
func WithContext ¶
func WithContext(ctx context.Context) CallOpt
WithContext overrides the context on the call
func WithExtensions ¶
func WithExtensions(extensions map[string]string) CallOpt
WithExtensions adds internal attributes to the call that can be interpreted by extensions in the agent. A pure runner can use this to pass an extension to the call.
func WithLogger ¶ added in v0.3.618
func WithLogger(w io.ReadWriteCloser) CallOpt
WithLogger sets stderr to the provided one
func WithTrigger ¶
func WithTrigger(t *models.Trigger) CallOpt
WithTrigger adds trigger specific bits to a call. TODO consider removal, this is from a shuffle
func WithWriter ¶
func WithWriter(w io.Writer) CallOpt
WithWriter sets the writer that the call uses to send its output message to. TODO: this should be required.
type CallOverrider ¶
Interceptor in GetCall
type Config ¶
type Config struct {
	MinDockerVersion string `json:"min_docker_version"`
	ContainerLabelTag string `json:"container_label_tag"`
	DockerNetworks string `json:"docker_networks"`
	DockerLoadFile string `json:"docker_load_file"`
	FreezeIdle time.Duration `json:"freeze_idle_msecs"`
	HotPoll time.Duration `json:"hot_poll_msecs"`
	HotLauncherTimeout time.Duration `json:"hot_launcher_timeout_msecs"`
	HotPullTimeout time.Duration `json:"hot_pull_timeout_msecs"`
	HotStartTimeout time.Duration `json:"hot_start_timeout_msecs"`
	AsyncChewPoll time.Duration `json:"async_chew_poll_msecs"`
	DetachedHeadRoom time.Duration `json:"detached_head_room_msecs"`
	MaxResponseSize uint64 `json:"max_response_size_bytes"`
	MaxLogSize uint64 `json:"max_log_size_bytes"`
	MaxTotalCPU uint64 `json:"max_total_cpu_mcpus"`
	MaxTotalMemory uint64 `json:"max_total_memory_bytes"`
	MaxFsSize uint64 `json:"max_fs_size_mb"`
	PreForkPoolSize uint64 `json:"pre_fork_pool_size"`
	PreForkImage string `json:"pre_fork_image"`
	PreForkCmd string `json:"pre_fork_pool_cmd"`
	PreForkUseOnce uint64 `json:"pre_fork_use_once"`
	PreForkNetworks string `json:"pre_fork_networks"`
	EnableNBResourceTracker bool `json:"enable_nb_resource_tracker"`
	MaxTmpFsInodes uint64 `json:"max_tmpfs_inodes"`
	DisableReadOnlyRootFs bool `json:"disable_readonly_rootfs"`
	DisableDebugUserLogs bool `json:"disable_debug_user_logs"`
	IOFSEnableTmpfs bool `json:"iofs_enable_tmpfs"`
	IOFSAgentPath string `json:"iofs_path"`
	IOFSMountRoot string `json:"iofs_mount_root"`
	IOFSOpts string `json:"iofs_opts"`
	MaxDockerRetries uint64 `json:"max_docker_retries"`
	ImageCleanMaxSize uint64 `json:"image_clean_max_size"`
	ImageCleanExemptTags string `json:"image_clean_exempt_tags"`
}
Config specifies various settings for an agent
type ContainerState ¶
type ContainerState interface {
	UpdateState(ctx context.Context, newState ContainerStateType, slots *slotQueue)
	GetState() string
}
func NewContainerState ¶
func NewContainerState() ContainerState
type ContainerStateType ¶
type ContainerStateType int
const (
	ContainerStateNone   ContainerStateType = iota // uninitialized
	ContainerStateWait                             // resource (cpu + mem) waiting
	ContainerStateStart                            // launching
	ContainerStateIdle                             // running: idle but not paused
	ContainerStatePaused                           // running: idle but paused
	ContainerStateBusy                             // running: busy
	ContainerStateDone                             // exited/failed/done
	ContainerStateMax
)
type DataAccess ¶
type DataAccess interface {
	ReadDataAccess
	DequeueDataAccess
	CallHandler
}
DataAccess is currently
type DequeueDataAccess ¶
type DequeueDataAccess interface {
	// Dequeue will query the queue for the next available Call that can be run
	// by this Agent, and reserve it (ultimately forwards to mq.Reserve).
	Dequeue(ctx context.Context) (*models.Call, error)
}
DequeueDataAccess abstracts an underlying dequeue for async runners
func NewDirectDequeueAccess ¶
func NewDirectDequeueAccess(mq models.MessageQueue) DequeueDataAccess
type DetachedResponseWriter ¶
type DetachedResponseWriter struct {
	Headers http.Header
	// contains filtered or unexported fields
}
func NewDetachedResponseWriter ¶
func NewDetachedResponseWriter(h http.Header, statusCode int) *DetachedResponseWriter
func (*DetachedResponseWriter) Header ¶
func (w *DetachedResponseWriter) Header() http.Header
func (*DetachedResponseWriter) Status ¶
func (w *DetachedResponseWriter) Status() int
func (*DetachedResponseWriter) Write ¶
func (w *DetachedResponseWriter) Write(data []byte) (int, error)
func (*DetachedResponseWriter) WriteHeader ¶
func (w *DetachedResponseWriter) WriteHeader(statusCode int)
type EnqueueDataAccess ¶
type EnqueueDataAccess interface {
	// Enqueue will add a Call to the queue (ultimately forwards to mq.Push).
	Enqueue(ctx context.Context, mCall *models.Call) error
}
EnqueueDataAccess abstracts an underlying enqueue for async queueing
func NewDirectEnqueueAccess ¶
func NewDirectEnqueueAccess(mq models.MessageQueue) EnqueueDataAccess
func NewUnsupportedAsyncEnqueueAccess ¶
func NewUnsupportedAsyncEnqueueAccess() EnqueueDataAccess
NewUnsupportedAsyncEnqueueAccess is a backstop that errors when you try to enqueue an async operation on a server that doesn't support async
type EvictToken ¶
type EvictToken struct {
	C        chan struct{}
	DoneChan chan struct{}
	// contains filtered or unexported fields
}
func (*EvictToken) SetEvictable ¶
func (token *EvictToken) SetEvictable(isEvictable bool)
type Evictor ¶
type Evictor interface {
	// CreateEvictToken creates an eviction token to be used in evictor tracking. Returns
	// an eviction token.
	CreateEvictToken(slotId string, mem, cpu uint64) *EvictToken

	// DeleteEvictToken deletes an eviction token from evictor system
	DeleteEvictToken(token *EvictToken)

	// PerformEviction performs evictions to satisfy cpu & mem arguments
	// and returns a slice of channels for evictions performed. The callers
	// can wait on these channel to ensure evictions are completed.
	PerformEviction(slotId string, mem, cpu uint64) []chan struct{}
}
func NewEvictor ¶
func NewEvictor() Evictor
type LBAgentOption ¶
type LBAgentOption func(*lbAgent) error
func WithLBAgentConfig ¶
func WithLBAgentConfig(cfg *Config) LBAgentOption
func WithLBCallOverrider ¶
func WithLBCallOverrider(fn CallOverrider) LBAgentOption
LB agents can use this to register a CallOverrider to modify a Call and extensions
type Option ¶
type Option func(*agent) error
Option configures an agent at startup
func WithAsync ¶
func WithAsync(dqda DequeueDataAccess) Option
WithAsync enables async operations on the agent
func WithCallOverrider ¶
func WithCallOverrider(fn CallOverrider) Option
WithCallOverrider registers a CallOverrider to modify a Call and extensions on call construction
func WithConfig ¶
WithConfig sets the agent config to the provided config
func WithDockerDriver ¶
WithDockerDriver provides a custom driver to the agent
type PureRunnerOption ¶
type PureRunnerOption func(*pureRunner) error
func PureRunnerWithAgent ¶
func PureRunnerWithAgent(a Agent) PureRunnerOption
func PureRunnerWithDetached ¶
func PureRunnerWithDetached() PureRunnerOption
func PureRunnerWithGRPCServerOptions ¶ added in v0.3.637
func PureRunnerWithGRPCServerOptions(options ...grpc.ServerOption) PureRunnerOption
func PureRunnerWithKdumpsOnDisk ¶ added in v0.3.624
func PureRunnerWithKdumpsOnDisk(numKdumps uint64) PureRunnerOption
PureRunnerWithKdumpsOnDisk returns a PureRunnerOption that indicates that kdumps have been found on disk. The argument numKdumps is a counter that indicates how many dumps were on disk at the time the runner was created.
func PureRunnerWithSSL ¶
func PureRunnerWithSSL(tlsCfg *tls.Config) PureRunnerOption
func PureRunnerWithStatusImage ¶
func PureRunnerWithStatusImage(imgName string) PureRunnerOption
PureRunnerWithStatusImage returns a PureRunnerOption that annotates a PureRunner with a statusImageName attribute. This attribute names the image to use for the status checks. Optionally, the status image can be pre-loaded into docker using FN_DOCKER_LOAD_FILE to avoid docker pull during status checks.
type ReadDataAccess ¶
type ReadDataAccess interface {
	GetAppID(ctx context.Context, appName string) (string, error)
	// GetAppByID abstracts querying the datastore for an app.
	GetAppByID(ctx context.Context, appID string) (*models.App, error)
	GetTriggerBySource(ctx context.Context, appId string, triggerType, source string) (*models.Trigger, error)
	GetFnByID(ctx context.Context, fnId string) (*models.Fn, error)
}
ReadDataAccess represents read operations required to operate a load balancer node
func NewCachedDataAccess ¶
func NewCachedDataAccess(da ReadDataAccess) ReadDataAccess
type RequestState ¶
type RequestState interface {
UpdateState(ctx context.Context, newState RequestStateType, slots *slotQueue)
}
func NewRequestState ¶
func NewRequestState() RequestState
type RequestStateType ¶
type RequestStateType int
const (
	RequestStateNone RequestStateType = iota // uninitialized
	RequestStateWait                         // request is waiting
	RequestStateExec                         // request is executing
	RequestStateDone                         // request is done
	RequestStateMax
)
type ResourceToken ¶
type ResourceTracker ¶
type ResourceTracker interface {
	// WaitAsyncResource returns a channel that will send once when there seem to be sufficient
	// resource levels to run an async task, it is up to the implementer to create policy here.
	WaitAsyncResource(ctx context.Context) chan struct{}

	// GetResourceToken returns a channel to wait for a resource token on. If the provided context is canceled,
	// the channel will never receive anything. If it is not possible to fulfill this resource, the channel
	// will never receive anything (use IsResourcePossible). If a resource token is available for the provided
	// resource parameters, it will otherwise be sent once on the returned channel. The channel is never closed.
	// if isNB is set, resource check is done and error token is returned without blocking.
	// Memory is expected to be provided in MB units.
	GetResourceToken(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs, isNB bool) <-chan ResourceToken

	// IsResourcePossible returns whether it's possible to fulfill the requested resources on this
	// machine. It must be called before GetResourceToken or GetResourceToken may hang.
	// Memory is expected to be provided in MB units.
	IsResourcePossible(memory uint64, cpuQuota models.MilliCPUs) bool

	// Retrieve current stats/usage
	GetUtilization() ResourceUtilization
}
A simple resource (memory, cpu, disk, etc.) tracker for scheduling. TODO: disk, network IO for future
func NewResourceTracker ¶
func NewResourceTracker(cfg *Config) ResourceTracker
type ResourceUtilization ¶
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
drivers | Package drivers is intended as a general purpose container abstraction library. |
docker | Package docker provides a Docker driver for Fn. |
mock | Package mock provides a fake Driver implementation that is only used for testing. |