Documentation ¶
Overview ¶
Package agent defines the Agent interface and related concepts. An agent is an entity that knows how to execute an Fn function.
The Agent Interface ¶
The Agent interface is the heart of this package. Agent exposes an api to create calls from various parameters and then execute those calls. An Agent has a few roles:
- manage the memory pool for a given server
- manage the container lifecycle for calls
- execute calls against containers
- invoke Start and End for each call appropriately
For more information about how an agent executes a call see the documentation on the Agent interface.
Variants ¶
There are two flavors of runner, the local Docker agent and a load-balancing agent. To create an agent that uses Docker containers to execute calls, use New().
To create an agent that can load-balance across a pool of sub-agents, use NewLBAgent().
Index ¶
- Constants
- Variables
- func DefaultStaticRunnerPool(runnerAddresses []string) pool.RunnerPool
- func GetCallLatencies(c *call) (time.Duration, time.Duration)
- func NewDockerDriver(cfg *Config) (drivers.Driver, error)
- func NewSlotQueue(key string) *slotQueue
- func NewSlotQueueMgr() *slotQueueMgr
- func NewStaticRunnerPool(runnerAddresses []string, tlsConf *tls.Config, dialOpts ...grpc.DialOption) pool.RunnerPool
- func NewgRPCRunner(addr string, tlsConf *tls.Config, dialOpts ...grpc.DialOption) (pool.Runner, error)
- func NewgRPCRunnerWithTimeout(addr string, tlsConf *tls.Config, timeout time.Duration, ...) (pool.Runner, error)
- func RegisterAgentViews(tagKeys []string, latencyDist []float64)
- func RegisterContainerViews(tagKeys []string, latencyDist []float64)
- func RegisterDockerViews(tagKeys []string, ...)
- func RegisterLBAgentViews(tagKeys []string, latencyDist []float64)
- func RegisterRunnerViews(tagKeys []string, latencyDist []float64)
- func TranslateGRPCStatusToRunnerStatus(status *pb.RunnerStatus) *pool.RunnerStatus
- type Agent
- func DefaultPureRunner(cancel context.CancelFunc, addr string, tlsCfg *tls.Config) (Agent, error)
- func New(options ...Option) Agent
- func NewLBAgent(rp pool.RunnerPool, p pool.Placer, options ...LBAgentOption) (Agent, error)
- func NewPureRunner(cancel context.CancelFunc, addr string, options ...PureRunnerOption) (Agent, error)
- type Call
- type CallOpt
- func FromHTTPFnRequest(app *models.App, fn *models.Fn, req *http.Request) CallOpt
- func FromModel(mCall *models.Call) CallOpt
- func FromModelAndInput(mCall *models.Call, in io.ReadCloser) CallOpt
- func InvokeDetached() CallOpt
- func WithContext(ctx context.Context) CallOpt
- func WithDockerAuth(auth docker.Auther) CallOpt
- func WithExtensions(extensions map[string]string) CallOpt
- func WithLogger(w io.ReadWriteCloser) CallOpt
- func WithTrigger(t *models.Trigger) CallOpt
- func WithWriter(w io.Writer) CallOpt
- type CallOverrider
- type Config
- type ContainerState
- type ContainerStateType
- type DataAccess
- type DetachedResponseWriter
- type EvictToken
- type Evictor
- type LBAgentOption
- type LogStreamer
- type MockAgent
- type Option
- type PureRunnerOption
- func PureRunnerWithAgent(a Agent) PureRunnerOption
- func PureRunnerWithConfigFunc(...) PureRunnerOption
- func PureRunnerWithCustomHealthCheckerFunc(customHealthCheckerFunc func(context.Context) (map[string]string, error)) PureRunnerOption
- func PureRunnerWithDetached() PureRunnerOption
- func PureRunnerWithGRPCServerOptions(options ...grpc.ServerOption) PureRunnerOption
- func PureRunnerWithKdumpsOnDisk(numKdumps uint64) PureRunnerOption
- func PureRunnerWithLogStreamer(logStreamer LogStreamer) PureRunnerOption
- func PureRunnerWithSSL(tlsCfg *tls.Config) PureRunnerOption
- func PureRunnerWithStatusImage(imgName string) PureRunnerOption
- func PureRunnerWithStatusNetworkEnabler(barrierPath string) PureRunnerOption
- type ReadDataAccess
- type RequestState
- type RequestStateType
- type ResourceToken
- type ResourceTracker
- type ResourceUtilization
- type Slot
Constants ¶
const ( // EnvContainerLabelTag is a classifier label tag that is used to distinguish fn managed containers EnvContainerLabelTag = "FN_CONTAINER_LABEL_TAG" // EnvImageCleanMaxSize enables image cleaner and sets the high water mark for image cache in bytes EnvImageCleanMaxSize = "FN_IMAGE_CLEAN_MAX_SIZE" // EnvImageCleanExemptTags list of image names separated by whitespace that are exempt from removal in image cleaner EnvImageCleanExemptTags = "FN_IMAGE_CLEAN_EXEMPT_TAGS" // EnvImageEnableVolume allows image to contain VOLUME definitions EnvImageEnableVolume = "FN_IMAGE_ENABLE_VOLUME" // EnvDockerNetworks is a comma separated list of networks to attach to each container started EnvDockerNetworks = "FN_DOCKER_NETWORKS" // EnvDockerLoadFile is a file location for a file that contains a tarball of a docker image to load on startup EnvDockerLoadFile = "FN_DOCKER_LOAD_FILE" // EnvDisableUnprivilegedContainers disables docker security features like user name, cap drop etc. EnvDisableUnprivilegedContainers = "FN_DISABLE_UNPRIVILEGED_CONTAINERS" // EnvFreezeIdle is the delay between a container being last used and being frozen EnvFreezeIdle = "FN_FREEZE_IDLE_MSECS" // EnvHotPoll is the interval to ping for a slot manager thread to check if a container should be // launched for a given function EnvHotPoll = "FN_HOT_POLL_MSECS" // EnvHotLauncherTimeout is the timeout for a hot container queue to persist if idle EnvHotLauncherTimeout = "FN_HOT_LAUNCHER_TIMEOUT_MSECS" // EnvHotPullTimeout is the timeout for a hot container to be created including docker-pull EnvHotPullTimeout = "FN_HOT_PULL_TIMEOUT_MSECS" // EnvHotStartTimeout is the timeout for a hot container to become available for use for requests after EnvHotStartTimeout EnvHotStartTimeout = "FN_HOT_START_TIMEOUT_MSECS" // EnvMaxResponseSize is the maximum number of bytes that a function may return from an invocation EnvMaxResponseSize = "FN_MAX_RESPONSE_SIZE" // EnvMaxHdrResponseSize is the maximum number of bytes that a function may return in an invocation header EnvMaxHdrResponseSize = "FN_MAX_HDR_RESPONSE_SIZE" // EnvMaxLogSize is the maximum size that a function's log may reach EnvMaxLogSize = "FN_MAX_LOG_SIZE_BYTES" // EnvMaxTotalCPU is the maximum CPU that will be reserved across all containers EnvMaxTotalCPU = "FN_MAX_TOTAL_CPU_MCPUS" // EnvMaxTotalMemory is the maximum memory that will be reserved across all containers EnvMaxTotalMemory = "FN_MAX_TOTAL_MEMORY_BYTES" // EnvMaxFsSize is the maximum filesystem size that a function may use EnvMaxFsSize = "FN_MAX_FS_SIZE_MB" // EnvMaxPIDs is the maximum number of PIDs that a function is allowed to create EnvMaxPIDs = "FN_MAX_PIDS" // EnvMaxOpenFiles is the maximum number open files handles the process in a // function is allowed to have EnvMaxOpenFiles = "FN_MAX_OPEN_FILES" // EnvMaxLockedMemory the maximum number of bytes of memory that may be // locked into RAM EnvMaxLockedMemory = "FN_MAX_LOCKED_MEMORY" // EnvMaxPendingSignals limit on the number of signals that may be queued EnvMaxPendingSignals = "FN_MAX_PENDING_SIGNALS" // EnvMaxMessageQueue limit on the number of bytes that can be allocated for // POSIX message queues EnvMaxMessageQueue = "FN_MAX_MESSAGE_QUEUE" // EnvPreForkPoolSize is the number of containers pooled to steal network from, this may reduce latency EnvPreForkPoolSize = "FN_EXPERIMENTAL_PREFORK_POOL_SIZE" // EnvPreForkImage is the image to use for the pre-fork pool EnvPreForkImage = "FN_EXPERIMENTAL_PREFORK_IMAGE" // EnvPreForkCmd is the command to run for images in the pre-fork pool, it should run for a long time EnvPreForkCmd = "FN_EXPERIMENTAL_PREFORK_CMD" // EnvPreForkUseOnce limits the number of times a pre-fork pool container may be used to one, they are otherwise recycled EnvPreForkUseOnce = "FN_EXPERIMENTAL_PREFORK_USE_ONCE" // EnvPreForkNetworks is the equivalent of EnvDockerNetworks but for pre-fork pool containers EnvPreForkNetworks = "FN_EXPERIMENTAL_PREFORK_NETWORKS" // EnvEnableNBResourceTracker makes every request to the resource tracker non-blocking, meaning the resources are either // available or it will return an error immediately EnvEnableNBResourceTracker = "FN_ENABLE_NB_RESOURCE_TRACKER" // EnvMaxTmpFsInodes is the maximum number of inodes for /tmp in a container EnvMaxTmpFsInodes = "FN_MAX_TMPFS_INODES" // EnvDisableReadOnlyRootFs makes the root fs for a container have rw permissions, by default it is read only EnvDisableReadOnlyRootFs = "FN_DISABLE_READONLY_ROOTFS" // EnvDisableDebugUserLogs disables user function logs being logged at level debug. wise to enable for production. EnvDisableDebugUserLogs = "FN_DISABLE_DEBUG_USER_LOGS" // EnvIOFSEnableTmpfs enables creating a per-container tmpfs mount for the IOFS EnvIOFSEnableTmpfs = "FN_IOFS_TMPFS" // EnvIOFSPath is the path within fn server container of a directory to configure for unix socket files for each container EnvIOFSPath = "FN_IOFS_PATH" // EnvIOFSDockerPath determines the relative location on the docker host where iofs mounts should be prefixed with EnvIOFSDockerPath = "FN_IOFS_DOCKER_PATH" // EnvIOFSOpts are the options to set when mounting the iofs directory for unix socket files EnvIOFSOpts = "FN_IOFS_OPTS" // EnvDetachedHeadroom is the extra room we want to give to a detached function to run. EnvDetachedHeadroom = "FN_EXECUTION_HEADROOM" // MaxMsDisabled is used to determine whether mr freeze is lying in wait. TODO remove this manuever MaxMsDisabled = time.Duration(math.MaxInt64) // DefaultHotPoll is the default value for EnvHotPoll DefaultHotPoll = 200 * time.Millisecond )
const ( // Here we give 5 seconds of timeout inside the container. We hardcode these numbers here to // ensure we control idle timeout & timeout as well as how long should cache be valid. // A cache duration of idleTimeout + 500 msecs allows us to reuse the cache, for about 1.5 secs, // and during this time, since we allow no queries to go through, the hot container times out. // // For now, status tests a single case: a new hot container is spawned when cache is expired // and when a query is allowed to run. // TODO: we might want to mix this up and perhaps allow that hot container to handle // more than one query to test both 'new hot container' and 'old hot container' cases. StatusCallTimeout = int32(5) StatusCallIdleTimeout = int32(1) StatusCallCacheDuration = time.Duration(500)*time.Millisecond + time.Duration(StatusCallIdleTimeout)*time.Second // Total context timeout (scheduler+execution.) We need to allocate plenty of time here. // 60 seconds should be enough to provoke disk I/O errors, docker timeouts. etc. StatusCtxTimeout = 60 * time.Second )
const ( Mem1MB = 1024 * 1024 Mem1GB = 1024 * 1024 * 1024 // Assume 2GB RAM on non-linux systems DefaultNonLinuxMemory = 2048 * Mem1MB )
const ( // max buffer size for grpc data messages, 10K MaxDataChunk = 10 * 1024 DefaultConnectTimeout = 100 * time.Millisecond )
const RegistryToken = "FN_REGISTRY_TOKEN"
RegistryToken is a reserved call extensions key to pass registry token
#nosec
Variables ¶
var ( ErrorExpectedTry = errors.New("Protocol failure: expected ClientMsg_Try") ErrorExpectedData = errors.New("Protocol failure: expected ClientMsg_Data") )
var ( ErrorRunnerClosed = errors.New("Runner is closed") ErrorPureRunnerNoEOF = errors.New("Purerunner missing EOF response") )
var ( // AppIDMetricKey is a tag for metrics AppIDMetricKey = common.MakeKey("app_id") // FnIDMetricKey is a tag for metrics FnIDMetricKey = common.MakeKey("fn_id") // ImageNameMetricKey is a tag for metrics ImageNameMetricKey = common.MakeKey("image_name") )
var CapacityFull = errors.New("max capacity reached")
Functions ¶
func DefaultStaticRunnerPool ¶
func DefaultStaticRunnerPool(runnerAddresses []string) pool.RunnerPool
func NewDockerDriver ¶
NewDockerDriver creates a default docker driver from agent config
func NewSlotQueue ¶
func NewSlotQueue(key string) *slotQueue
func NewSlotQueueMgr ¶
func NewSlotQueueMgr() *slotQueueMgr
func NewStaticRunnerPool ¶
func NewStaticRunnerPool(runnerAddresses []string, tlsConf *tls.Config, dialOpts ...grpc.DialOption) pool.RunnerPool
func NewgRPCRunner ¶
func RegisterAgentViews ¶
RegisterAgentViews creates and registers all agent views
func RegisterContainerViews ¶
RegisterContainerViews creates and register containers views with provided tag keys
func RegisterDockerViews ¶
func RegisterDockerViews(tagKeys []string, latencyDist, ioNetDist, ioDiskDist, memoryDist, cpuDist []float64)
RegisterDockerViews creates a and registers Docker views with provided tag keys
func RegisterLBAgentViews ¶
func RegisterRunnerViews ¶
RegisterRunnerViews creates and registers all runner views
func TranslateGRPCStatusToRunnerStatus ¶
func TranslateGRPCStatusToRunnerStatus(status *pb.RunnerStatus) *pool.RunnerStatus
TranslateGRPCStatusToRunnerStatus runner.RunnerStatus to runnerpool.RunnerStatus
Types ¶
type Agent ¶
type Agent interface { // GetCall will return a Call that is executable by the Agent, which // can be built via various CallOpt's provided to the method. GetCall(...CallOpt) (Call, error) // Submit will attempt to execute a call locally, a Call may store information // about itself in its Start and End methods, which will be called in Submit // immediately before and after the Call is executed, respectively. An error // will be returned if there is an issue executing the call or the error // may be from the call's execution itself (if, say, the container dies, // or the call times out). Submit(Call) error // Closer will wait for any outstanding calls to complete and then exit. // Closing the agent will invoke Close on the underlying DataAccess. // Close is not safe to be called from multiple threads. io.Closer AddCallListener(fnext.CallListener) }
Agent exposes an api to create calls from various parameters and then submit those calls, it also exposes a 'safe' shutdown mechanism via its Close method. Agent has a few roles:
- manage the memory pool for a given server
- manage the container lifecycle for calls
- execute calls against containers
- invoke Start and End for each call appropriately
Overview: Upon submission of a call, Agent will start the call's timeout timer immediately. If the call is hot, Agent will attempt to find an active hot container for that route, and if necessary launch another container. calls will be able to read/write directly from/to a socket in the container. If it's necessary to launch a container, first an attempt will be made to try to reserve the ram required while waiting for any hot 'slot' to become available [if applicable]. If there is an error launching the container, an error will be returned provided the call has not yet timed out or found another hot 'slot' to execute in [if applicable]. call.Start will be called immediately before sending any input to a container. call.End will be called regardless of the timeout timer's status if the call was executed, and that error returned may be returned from Submit.
func DefaultPureRunner ¶
func NewLBAgent ¶
func NewLBAgent(rp pool.RunnerPool, p pool.Placer, options ...LBAgentOption) (Agent, error)
NewLBAgent creates an Agent that knows how to load-balance function calls across a group of runner nodes.
func NewPureRunner ¶
func NewPureRunner(cancel context.CancelFunc, addr string, options ...PureRunnerOption) (Agent, error)
type Call ¶
type Call interface { // Model will return the underlying models.Call configuration for this call. Model() *models.Call // Start will be called before this call is executed, it may be used to // guarantee mutual exclusion, check docker permissions, update timestamps, // etc. // TODO Start and End can likely be unexported as they are only used in the agent, // and on a type which is constructed in a specific agent. meh. Start(ctx context.Context) error // End will be called immediately after attempting a call execution, // regardless of whether the execution failed or not. An error will be passed // to End, which if nil indicates a successful execution. Any error returned // from End will be returned as the error from Submit. End(ctx context.Context, err error) error }
Call is an agent specific instance of a call object, that is runnable.
type CallOpt ¶
type CallOpt func(c *call) error
CallOpt allows configuring a call before execution TODO(reed): consider the interface here, all options must be defined in agent and flexible enough for usage by extenders of fn, this straddling is painful. consider models.Call?
func FromHTTPFnRequest ¶
FromHTTPFnRequest Sets up a call from an http trigger request
func FromModel ¶
FromModel creates a call object from an existing stored call model object, reading the body from the stored call payload
func FromModelAndInput ¶
func FromModelAndInput(mCall *models.Call, in io.ReadCloser) CallOpt
FromModelAndInput creates a call object from an existing stored call model object , reading the body from a provided stream
func InvokeDetached ¶
func InvokeDetached() CallOpt
InvokeDetached mark a call to be a detached call
func WithContext ¶
WithContext overrides the context on the call
func WithDockerAuth ¶
WithDockerAuth configures a call to retrieve credentials for an image pull
func WithExtensions ¶
WithExtensions adds internal attributes to the call that can be interpreted by extensions in the agent Pure runner can use this to pass an extension to the call
func WithLogger ¶
func WithLogger(w io.ReadWriteCloser) CallOpt
WithLogger sets stderr to the provided one
func WithTrigger ¶
WithTrigger adds trigger specific bits to a call. TODO consider removal, this is from a shuffle
func WithWriter ¶
WithWriter sets the writer that the call uses to send its output message to TODO this should be required
type CallOverrider ¶
CallOverrider should die. Interceptor in GetCall
type Config ¶
type Config struct { MinDockerVersion string `json:"min_docker_version"` ContainerLabelTag string `json:"container_label_tag"` DockerNetworks string `json:"docker_networks"` DockerLoadFile string `json:"docker_load_file"` DisableUnprivilegedContainers bool `json:"disable_unprivileged_containers"` FreezeIdle time.Duration `json:"freeze_idle_msecs"` HotPoll time.Duration `json:"hot_poll_msecs"` HotLauncherTimeout time.Duration `json:"hot_launcher_timeout_msecs"` HotPullTimeout time.Duration `json:"hot_pull_timeout_msecs"` HotStartTimeout time.Duration `json:"hot_start_timeout_msecs"` DetachedHeadRoom time.Duration `json:"detached_head_room_msecs"` MaxResponseSize uint64 `json:"max_response_size_bytes"` MaxHdrResponseSize uint64 `json:"max_hdr_response_size_bytes"` MaxLogSize uint64 `json:"max_log_size_bytes"` MaxTotalCPU uint64 `json:"max_total_cpu_mcpus"` MaxTotalMemory uint64 `json:"max_total_memory_bytes"` MaxFsSize uint64 `json:"max_fs_size_mb"` MaxPIDs uint64 `json:"max_pids"` MaxOpenFiles *uint64 `json:"max_open_files"` MaxLockedMemory *uint64 `json:"max_locked_memory"` MaxPendingSignals *uint64 `json:"max_pending_signals"` MaxMessageQueue *uint64 `json:"max_message_queue"` PreForkPoolSize uint64 `json:"pre_fork_pool_size"` PreForkImage string `json:"pre_fork_image"` PreForkCmd string `json:"pre_fork_pool_cmd"` PreForkUseOnce uint64 `json:"pre_fork_use_once"` PreForkNetworks string `json:"pre_fork_networks"` EnableNBResourceTracker bool `json:"enable_nb_resource_tracker"` MaxTmpFsInodes uint64 `json:"max_tmpfs_inodes"` DisableReadOnlyRootFs bool `json:"disable_readonly_rootfs"` DisableDebugUserLogs bool `json:"disable_debug_user_logs"` IOFSEnableTmpfs bool `json:"iofs_enable_tmpfs"` EnableFDKDebugInfo bool `json:"enable_fdk_debug_info"` IOFSAgentPath string `json:"iofs_path"` IOFSMountRoot string `json:"iofs_mount_root"` IOFSOpts string `json:"iofs_opts"` ImageCleanMaxSize uint64 `json:"image_clean_max_size"` ImageCleanExemptTags string `json:"image_clean_exempt_tags"` ImageEnableVolume bool `json:"image_enable_volume"` }
Config specifies various settings for an agent
type ContainerState ¶
type ContainerState interface { UpdateState(ctx context.Context, newState ContainerStateType, call *call) GetState() string }
func NewContainerState ¶
func NewContainerState() ContainerState
type ContainerStateType ¶
type ContainerStateType int
const ( ContainerStateNone ContainerStateType = iota // uninitialized ContainerStateWait // resource (cpu + mem) waiting ContainerStateStart // launching ContainerStateIdle // running: idle but not paused ContainerStatePaused // running: idle but paused ContainerStateBusy // running: busy ContainerStateDone // exited/failed/done ContainerStateMax )
type DataAccess ¶
type DataAccess interface { ReadDataAccess }
XXX(reed): replace all uses of ReadDataAccess with DataAccess or vice versa, whatever is easier
type DetachedResponseWriter ¶
type DetachedResponseWriter struct { Headers http.Header // contains filtered or unexported fields }
func NewDetachedResponseWriter ¶
func NewDetachedResponseWriter(h http.Header, statusCode int) *DetachedResponseWriter
func (*DetachedResponseWriter) Header ¶
func (w *DetachedResponseWriter) Header() http.Header
func (*DetachedResponseWriter) Status ¶
func (w *DetachedResponseWriter) Status() int
func (*DetachedResponseWriter) Write ¶
func (w *DetachedResponseWriter) Write(data []byte) (int, error)
func (*DetachedResponseWriter) WriteHeader ¶
func (w *DetachedResponseWriter) WriteHeader(statusCode int)
type EvictToken ¶
type EvictToken struct { C chan struct{} DoneChan chan struct{} // contains filtered or unexported fields }
func (*EvictToken) SetEvictable ¶
func (token *EvictToken) SetEvictable(isEvictable bool)
type Evictor ¶
type Evictor interface { // CreateEvictToken creates an eviction token to be used in evictor tracking. Returns // an eviction token. CreateEvictToken(slotId string, mem, cpu uint64) *EvictToken // DeleteEvictToken deletes an eviction token from evictor system DeleteEvictToken(token *EvictToken) // PerformEviction performs evictions to satisfy cpu & mem arguments // and returns a slice of channels for evictions performed. The callers // can wait on these channel to ensure evictions are completed. PerformEviction(slotId string, mem, cpu uint64) []chan struct{} }
func NewEvictor ¶
func NewEvictor() Evictor
type LBAgentOption ¶
type LBAgentOption func(*lbAgent) error
func WithLBAgentConfig ¶
func WithLBAgentConfig(cfg *Config) LBAgentOption
WithLBAgentConfig sets the agent config to the provided Config
func WithLBCallOptions ¶
func WithLBCallOptions(opts ...CallOpt) LBAgentOption
WithLBCallOptions adds additional call options to each call created from GetCall, these options will be executed after any other options supplied to GetCall
func WithLBCallOverrider ¶
func WithLBCallOverrider(fn CallOverrider) LBAgentOption
WithLBCallOverrider is for LB agents to register a CallOverrider to modify a Call and extensions
type LogStreamer ¶
type LogStreamer interface {
StreamLogs(runner.RunnerProtocol_StreamLogsServer) error
}
Log Streamer to manage log gRPC interface
type MockAgent ¶
func (*MockAgent) AddCallListener ¶
func (m *MockAgent) AddCallListener(f fnext.CallListener)
type Option ¶
type Option func(*agent) error
Option configures an agent at startup
func WithCallOptions ¶
WithCallOptions adds additional call options to each call created from GetCall, these options will be executed after any other options supplied to GetCall
func WithCallOverrider ¶
func WithCallOverrider(fn CallOverrider) Option
WithCallOverrider registers register a CallOverrider to modify a Call and extensions on call construction
func WithConfig ¶
WithConfig sets the agent config to the provided config
func WithDockerDriver ¶
WithDockerDriver Provides a customer driver to agent
type PureRunnerOption ¶
type PureRunnerOption func(*pureRunner) error
func PureRunnerWithAgent ¶
func PureRunnerWithAgent(a Agent) PureRunnerOption
func PureRunnerWithConfigFunc ¶
func PureRunnerWithConfigFunc(configFunc func(context.Context, *runner.ConfigMsg) (*runner.ConfigStatus, error)) PureRunnerOption
func PureRunnerWithDetached ¶
func PureRunnerWithDetached() PureRunnerOption
func PureRunnerWithGRPCServerOptions ¶
func PureRunnerWithGRPCServerOptions(options ...grpc.ServerOption) PureRunnerOption
func PureRunnerWithKdumpsOnDisk ¶
func PureRunnerWithKdumpsOnDisk(numKdumps uint64) PureRunnerOption
PureRunnerWithKdumpsOnDisk returns a PureRunnerOption that indicates that kdumps have been found on disk. The argument numKdump is a counter that indicates how many dumps were on disk at the time the runner was created.
func PureRunnerWithLogStreamer ¶
func PureRunnerWithLogStreamer(logStreamer LogStreamer) PureRunnerOption
func PureRunnerWithSSL ¶
func PureRunnerWithSSL(tlsCfg *tls.Config) PureRunnerOption
func PureRunnerWithStatusImage ¶
func PureRunnerWithStatusImage(imgName string) PureRunnerOption
PureRunnerWithStatusImage returns a PureRunnerOption that annotates a PureRunner with a statusImageName attribute. This attribute names an image name to use for the status checks. Optionally, the status image can be pre-loaded into docker using FN_DOCKER_LOAD_FILE to avoid docker pull during status checks.
func PureRunnerWithStatusNetworkEnabler ¶
func PureRunnerWithStatusNetworkEnabler(barrierPath string) PureRunnerOption
type ReadDataAccess ¶
type ReadDataAccess interface { GetAppID(ctx context.Context, appName string) (string, error) // GetAppByID abstracts querying the datastore for an app. GetAppByID(ctx context.Context, appID string) (*models.App, error) GetTriggerBySource(ctx context.Context, appID string, triggerType, source string) (*models.Trigger, error) GetFnByID(ctx context.Context, fnID string) (*models.Fn, error) }
ReadDataAccess represents read operations required to operate a load balancer node
func NewCachedDataAccess ¶
func NewCachedDataAccess(da ReadDataAccess) ReadDataAccess
NewCachedDataAccess is a wrapper that caches entries temporarily
func NewMetricReadDataAccess ¶
func NewMetricReadDataAccess(rda ReadDataAccess) ReadDataAccess
NewMetricReadDataAccess adds metrics to a ReadDataAccess
type RequestState ¶
type RequestState interface {
UpdateState(ctx context.Context, newState RequestStateType, slots *slotQueue)
}
func NewRequestState ¶
func NewRequestState() RequestState
type RequestStateType ¶
type RequestStateType int
const ( RequestStateNone RequestStateType = iota // uninitialized RequestStateWait // request is waiting RequestStateExec // request is executing RequestStateDone // request is done RequestStateMax )
type ResourceToken ¶
type ResourceTracker ¶
type ResourceTracker interface { // GetResourceToken returns a resource token. // Memory is expected to be provided in MB units. GetResourceToken(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs) ResourceToken // GetResourceTokenNB is the non-blocking equivalent of GetResourceToken. The return value is the // resource token itself. If the request cannot be satisfied, a token with CapacityFull error set is // returned. // Memory is expected to be provided in MB units. GetResourceTokenNB(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs) ResourceToken // IsResourcePossible returns whether it's possible to fulfill the requested resources on this machine. // Memory is expected to be provided in MB units. IsResourcePossible(memory uint64, cpuQuota models.MilliCPUs) bool // Retrieve current stats/usage GetUtilization() ResourceUtilization }
A simple resource (memory, cpu, disk, etc.) tracker for scheduling. TODO: disk, network IO for future
func NewResourceTracker ¶
func NewResourceTracker(cfg *Config) ResourceTracker
type ResourceUtilization ¶
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package drivers is intended as a general purpose container abstraction library.
|
Package drivers is intended as a general purpose container abstraction library. |
docker
Package docker provides a Docker driver for Fn.
|
Package docker provides a Docker driver for Fn. |
mock
Package mock provides a fake Driver implementation that is only used for testing.
|
Package mock provides a fake Driver implementation that is only used for testing. |