Documentation ¶
Overview ¶
Package agent defines the Agent interface and related concepts. An agent is an entity that knows how to execute an Fn function.
The Agent Interface ¶
The Agent interface is the heart of this package. Agent exposes an API to create calls from various parameters and then execute those calls. An Agent has a few roles:
- manage the memory pool for a given server
- manage the container lifecycle for calls (hot+cold)
- execute calls against containers
- invoke Start and End for each call appropriately
- check the mq for any async calls, and submit them
For more information about how an agent executes a call see the documentation on the Agent interface.
Variants ¶
There are two flavors of agent: the local Docker agent and a load-balancing agent. To create an agent that uses Docker containers to execute calls, use New().
To create an agent that can load-balance across a pool of sub-agents, use NewLBAgent().
Index ¶
- Constants
- Variables
- func DefaultStaticRunnerPool(runnerAddresses []string) pool.RunnerPool
- func GetGroupID(call *models.Call) string
- func NewCallHandle(engagement runner.RunnerProtocol_EngageServer) *callHandle
- func NewHotContainer(call *call, cfg *AgentConfig) (*container, func())
- func NewSlotQueue(key string) *slotQueue
- func NewSlotQueueMgr() *slotQueueMgr
- func NewStaticRunnerPool(runnerAddresses []string, pki *pool.PKIData, runnerCN string, ...) pool.RunnerPool
- func SecureGRPCRunnerFactory(addr, runnerCertCN string, pki *pool.PKIData) (pool.Runner, error)
- type Agent
- func DefaultPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert string, key string, ...) (Agent, error)
- func New(da DataAccess, options ...AgentOption) Agent
- func NewLBAgent(da DataAccess, rp pool.RunnerPool, p pool.Placer) (Agent, error)
- func NewPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert string, key string, ...) (Agent, error)
- func UnsecuredPureRunner(cancel context.CancelFunc, addr string, da DataAccess) (Agent, error)
- type AgentConfig
- type AgentOption
- type CachedDataAccess
- type Call
- type CallOpt
- type CapacityGate
- type ContainerState
- type ContainerStateType
- type DataAccess
- type Param
- type Params
- type RequestState
- type RequestStateType
- type ResourceToken
- type ResourceTracker
- type Slot
Constants ¶
const (
	EnvDockerNetworks = "FN_DOCKER_NETWORKS"
	EnvFreezeIdle = "FN_FREEZE_IDLE_MSECS"
	EnvEjectIdle = "FN_EJECT_IDLE_MSECS"
	EnvHotPoll = "FN_HOT_POLL_MSECS"
	EnvHotLauncherTimeout = "FN_HOT_LAUNCHER_TIMEOUT_MSECS"
	EnvAsyncChewPoll = "FN_ASYNC_CHEW_POLL_MSECS"
	EnvCallEndTimeout = "FN_CALL_END_TIMEOUT_MSECS"
	EnvMaxCallEndStacking = "FN_MAX_CALL_END_STACKING"
	EnvMaxResponseSize = "FN_MAX_RESPONSE_SIZE"
	EnvMaxLogSize = "FN_MAX_LOG_SIZE_BYTES"
	EnvMaxTotalCPU = "FN_MAX_TOTAL_CPU_MCPUS"
	EnvMaxTotalMemory = "FN_MAX_TOTAL_MEMORY_BYTES"
	EnvMaxFsSize = "FN_MAX_FS_SIZE_MB"
	EnvPreForkPoolSize = "FN_EXPERIMENTAL_PREFORK_POOL_SIZE"
	EnvPreForkImage = "FN_EXPERIMENTAL_PREFORK_IMAGE"
	EnvPreForkCmd = "FN_EXPERIMENTAL_PREFORK_CMD"
	EnvPreForkUseOnce = "FN_EXPERIMENTAL_PREFORK_USE_ONCE"
	EnvPreForkNetworks = "FN_EXPERIMENTAL_PREFORK_NETWORKS"
	EnvEnableNBResourceTracker = "FN_ENABLE_NB_RESOURCE_TRACKER"
	MaxDisabledMsecs = time.Duration(math.MaxInt64)
)
const (
	Mem1MB = 1024 * 1024
	Mem1GB = 1024 * 1024 * 1024
)
Variables ¶
var (
	ErrorExpectedTry = errors.New("Protocol failure: expected ClientMsg_Try")
	ErrorExpectedData = errors.New("Protocol failure: expected ClientMsg_Data")
)
var (
	ErrorPoolClosed = errors.New("Runner pool closed")
	ErrorPoolRunnerExists = errors.New("Runner already exists")
)
var CapacityFull = errors.New("max capacity reached")
var (
ErrorRunnerClosed = errors.New("Runner is closed")
)
Functions ¶
func DefaultStaticRunnerPool ¶
func DefaultStaticRunnerPool(runnerAddresses []string) pool.RunnerPool
func GetGroupID ¶
func GetGroupID(call *models.Call) string
func NewCallHandle ¶
func NewCallHandle(engagement runner.RunnerProtocol_EngageServer) *callHandle
func NewHotContainer ¶
func NewHotContainer(call *call, cfg *AgentConfig) (*container, func())
func NewSlotQueue ¶
func NewSlotQueue(key string) *slotQueue
func NewSlotQueueMgr ¶
func NewSlotQueueMgr() *slotQueueMgr
func NewStaticRunnerPool ¶
func NewStaticRunnerPool(runnerAddresses []string, pki *pool.PKIData, runnerCN string, runnerFactory pool.MTLSRunnerFactory) pool.RunnerPool
Types ¶
type Agent ¶
type Agent interface {
	// GetCall will return a Call that is executable by the Agent, which
	// can be built via various CallOpt's provided to the method.
	GetCall(...CallOpt) (Call, error)
	// Submit will attempt to execute a call locally, a Call may store information
	// about itself in its Start and End methods, which will be called in Submit
	// immediately before and after the Call is executed, respectively. An error
	// will be returned if there is an issue executing the call or the error
	// may be from the call's execution itself (if, say, the container dies,
	// or the call times out).
	Submit(Call) error
	// Close will wait for any outstanding calls to complete and then exit.
	// Close is not safe to be called from multiple threads.
	io.Closer
	AddCallListener(fnext.CallListener)
	// Enqueue is to use the agent's sweet sweet client bindings to remotely
	// queue async tasks and should be removed from Agent interface ASAP.
	Enqueue(context.Context, *models.Call) error
	// GetAppID is to get the match of an app name to its ID
	GetAppID(ctx context.Context, appName string) (string, error)
	// GetAppByID is to get the app by ID
	GetAppByID(ctx context.Context, appID string) (*models.App, error)
	// GetRoute is to get the route by appId and path
	GetRoute(ctx context.Context, appID string, path string) (*models.Route, error)
}
Agent exposes an API to create calls from various parameters and then submit those calls. It also exposes a 'safe' shutdown mechanism via its Close method. Agent has a few roles:
- manage the memory pool for a given server
- manage the container lifecycle for calls (hot+cold)
- execute calls against containers
- invoke Start and End for each call appropriately
- check the mq for any async calls, and submit them
Overview: Upon submission of a call, Agent starts the call's timeout timer immediately. If the call is hot, Agent attempts to find an active hot container for that route and, if necessary, launches another container. Cold calls launch one container each. Cold calls get container input and output directly, whereas hot calls read from and write to a pipe in a container via Dispatch.
If a container must be launched, Agent first attempts to reserve the required RAM while waiting for any hot 'slot' to become available [if applicable]. If launching the container fails, an error is returned, provided the call has not yet timed out or found another hot 'slot' to execute in [if applicable].
call.Start is called immediately before starting a container if cold (i.e. after pulling), or immediately before sending any input if hot. call.End is called regardless of the timeout timer's status if the call was executed, and the error it returns may be returned from Submit.
func DefaultPureRunner ¶
func DefaultPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert string, key string, ca string) (Agent, error)
func New ¶
func New(da DataAccess, options ...AgentOption) Agent
New creates an Agent that executes functions locally as Docker containers.
func NewLBAgent ¶
func NewLBAgent(da DataAccess, rp pool.RunnerPool, p pool.Placer) (Agent, error)
NewLBAgent creates an Agent that knows how to load-balance function calls across a group of runner nodes.
func NewPureRunner ¶
func NewPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert string, key string, ca string, gate CapacityGate) (Agent, error)
func UnsecuredPureRunner ¶
func UnsecuredPureRunner(cancel context.CancelFunc, addr string, da DataAccess) (Agent, error)
type AgentConfig ¶
type AgentConfig struct {
	MinDockerVersion string `json:"min_docker_version"`
	DockerNetworks string `json:"docker_networks"`
	FreezeIdle time.Duration `json:"freeze_idle_msecs"`
	EjectIdle time.Duration `json:"eject_idle_msecs"`
	HotPoll time.Duration `json:"hot_poll_msecs"`
	HotLauncherTimeout time.Duration `json:"hot_launcher_timeout_msecs"`
	AsyncChewPoll time.Duration `json:"async_chew_poll_msecs"`
	CallEndTimeout time.Duration `json:"call_end_timeout"`
	MaxCallEndStacking uint64 `json:"max_call_end_stacking"`
	MaxResponseSize uint64 `json:"max_response_size_bytes"`
	MaxLogSize uint64 `json:"max_log_size_bytes"`
	MaxTotalCPU uint64 `json:"max_total_cpu_mcpus"`
	MaxTotalMemory uint64 `json:"max_total_memory_bytes"`
	MaxFsSize uint64 `json:"max_fs_size_mb"`
	PreForkPoolSize uint64 `json:"pre_fork_pool_size"`
	PreForkImage string `json:"pre_fork_image"`
	PreForkCmd string `json:"pre_fork_pool_cmd"`
	PreForkUseOnce uint64 `json:"pre_fork_use_once"`
	PreForkNetworks string `json:"pre_fork_networks"`
	EnableNBResourceTracker bool `json:"enable_nb_resource_tracker"`
}
func NewAgentConfig ¶
func NewAgentConfig() (*AgentConfig, error)
type AgentOption ¶
type AgentOption func(*agent) error
func WithConfig ¶
func WithConfig(cfg *AgentConfig) AgentOption
type CachedDataAccess ¶
type CachedDataAccess struct {
	DataAccess
	// contains filtered or unexported fields
}
CachedDataAccess wraps a DataAccess and caches the results of GetApp and GetRoute.
func (*CachedDataAccess) GetAppByID ¶
type Call ¶
type Call interface {
	// Model will return the underlying models.Call configuration for this call.
	// TODO we could respond to async correctly from agent but layering, this
	// is only because the front end has different responses based on call type.
	// try to discourage use elsewhere until this gets pushed down more...
	Model() *models.Call
	// Start will be called before this call is executed, it may be used to
	// guarantee mutual exclusion, check docker permissions, update timestamps,
	// etc.
	// TODO Start and End can likely be unexported as they are only used in the agent,
	// and on a type which is constructed in a specific agent. meh.
	Start(ctx context.Context) error
	// End will be called immediately after attempting a call execution,
	// regardless of whether the execution failed or not. An error will be passed
	// to End, which if nil indicates a successful execution. Any error returned
	// from End will be returned as the error from Submit.
	End(ctx context.Context, err error) error
}
type CallOpt ¶
type CallOpt func(c *call) error
TODO build w/o closures... lazy
func FromModel ¶
TODO this currently relies on FromRequest having happened before to create the model here, to be a fully qualified model. We probably should double check but having a way to bypass will likely be what's used anyway unless forced.
func FromModelAndInput ¶
func FromModelAndInput(mCall *models.Call, in io.ReadCloser) CallOpt
func FromRequest ¶
func WithContext ¶
type CapacityGate ¶
type CapacityGate interface {
	// CheckAndReserveCapacity must perform an atomic check plus reservation. If an error is returned, then it is
	// guaranteed that no capacity has been committed. If nil is returned, then it is guaranteed that the provided units
	// of capacity have been committed.
	CheckAndReserveCapacity(units uint64) error
	// ReleaseCapacity must perform an atomic release of capacity. The units provided must not bring the capacity under
	// zero; implementations are free to panic in that case.
	ReleaseCapacity(units uint64)
}
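One way to satisfy the CapacityGate contract is a counter guarded by a mutex, so that the check and the reservation happen under a single lock. The simpleGate type below is a hypothetical sketch with a fixed total. It is not an implementation shipped with the package, and its error value is local to the example.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errCapacityFull = errors.New("max capacity reached")

// simpleGate is a hypothetical mutex-based gate with a fixed total.
type simpleGate struct {
	mu    sync.Mutex
	total uint64
	used  uint64
}

// CheckAndReserveCapacity checks and commits under one lock, so either
// all requested units are reserved or none are.
func (g *simpleGate) CheckAndReserveCapacity(units uint64) error {
	g.mu.Lock()
	defer g.mu.Unlock()
	if units > g.total-g.used {
		return errCapacityFull
	}
	g.used += units
	return nil
}

// ReleaseCapacity returns units to the gate and panics on over-release,
// which the interface contract explicitly permits.
func (g *simpleGate) ReleaseCapacity(units uint64) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if units > g.used {
		panic("released more capacity than was reserved")
	}
	g.used -= units
}

func main() {
	g := &simpleGate{total: 4}
	fmt.Println(g.CheckAndReserveCapacity(3))
	fmt.Println(g.CheckAndReserveCapacity(2))
	g.ReleaseCapacity(3)
	fmt.Println(g.CheckAndReserveCapacity(2))
}
```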
type ContainerState ¶
type ContainerState interface {
UpdateState(ctx context.Context, newState ContainerStateType, slots *slotQueue)
}
func NewContainerState ¶
func NewContainerState() ContainerState
type ContainerStateType ¶
type ContainerStateType int
const (
	ContainerStateNone ContainerStateType = iota // uninitialized
	ContainerStateWait // resource (cpu + mem) waiting
	ContainerStateStart // launching
	ContainerStateIdle // running idle
	ContainerStateBusy // running busy
	ContainerStateDone // exited/failed/done
	ContainerStateMax
)
type DataAccess ¶
type DataAccess interface {
	GetAppID(ctx context.Context, appName string) (string, error)
	// GetAppByID abstracts querying the datastore for an app.
	GetAppByID(ctx context.Context, appID string) (*models.App, error)
	// GetRoute abstracts querying the datastore for a route within an app.
	GetRoute(ctx context.Context, appID string, routePath string) (*models.Route, error)
	// Enqueue will add a Call to the queue (ultimately forwards to mq.Push).
	Enqueue(ctx context.Context, mCall *models.Call) error
	// Dequeue will query the queue for the next available Call that can be run
	// by this Agent, and reserve it (ultimately forwards to mq.Reserve).
	Dequeue(ctx context.Context) (*models.Call, error)
	// Start will attempt to start the provided Call within an appropriate
	// context.
	Start(ctx context.Context, mCall *models.Call) error
	// Finish will notify the system that the Call has been processed, and
	// fulfill the reservation in the queue if the call came from a queue.
	Finish(ctx context.Context, mCall *models.Call, stderr io.Reader, async bool) error
}
DataAccess abstracts the datastore and message queue operations done by the agent, so that API nodes and runner nodes can work with the same interface but actually operate on the data in different ways (by direct access or by mediation through an API node).
func NewCachedDataAccess ¶
func NewCachedDataAccess(da DataAccess) DataAccess
func NewDirectDataAccess ¶
func NewDirectDataAccess(ds models.Datastore, ls models.LogStore, mq models.MessageQueue) DataAccess
type RequestState ¶
type RequestState interface {
UpdateState(ctx context.Context, newState RequestStateType, slots *slotQueue)
}
func NewRequestState ¶
func NewRequestState() RequestState
type RequestStateType ¶
type RequestStateType int
const (
	RequestStateNone RequestStateType = iota // uninitialized
	RequestStateWait // request is waiting
	RequestStateExec // request is executing
	RequestStateDone // request is done
	RequestStateMax
)
type ResourceToken ¶
type ResourceTracker ¶
type ResourceTracker interface {
	// WaitAsyncResource returns a channel that will send once when there seem to be sufficient
	// resource levels to run an async task, it is up to the implementer to create policy here.
	WaitAsyncResource(ctx context.Context) chan struct{}
	// GetResourceToken returns a channel to wait for a resource token on. If the provided context is canceled,
	// the channel will never receive anything. If it is not possible to fulfill this resource, the channel
	// will never receive anything (use IsResourcePossible). If a resource token is available for the provided
	// resource parameters, it will otherwise be sent once on the returned channel. The channel is never closed.
	// if isNB is set, resource check is done and error token is returned without blocking.
	// if isAsync is set, resource allocation specific for async requests is considered. (eg. always allow
	// a sync only reserve area) Memory is expected to be provided in MB units.
	GetResourceToken(ctx context.Context, memory, cpuQuota uint64, isAsync, isNB bool) <-chan ResourceToken
	// IsResourcePossible returns whether it's possible to fulfill the requested resources on this
	// machine. It must be called before GetResourceToken or GetResourceToken may hang.
	// Memory is expected to be provided in MB units.
	IsResourcePossible(memory, cpuQuota uint64, isAsync bool) bool
	// returns number of waiters waiting for a resource token blocked on condition variable
	GetResourceTokenWaiterCount() uint64
}
A simple resource (memory, cpu, disk, etc.) tracker for scheduling. TODO: add cpu, disk, network IO for future
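The non-blocking mode of GetResourceToken (isNB set) can be illustrated with a memory-only tracker. The sketch below covers only that mode. The memTracker and token types, the tokenNB method, and the release callback are hypothetical, since the shape of ResourceToken is not shown on this page. Blocking waits, CPU accounting, context cancellation, and the async reserve area are all left out.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errCapacityFull = errors.New("max capacity reached")

// token is a hypothetical stand-in for ResourceToken.
type token struct {
	err     error
	release func()
}

// memTracker tracks memory only, in MB.
type memTracker struct {
	mu      sync.Mutex
	totalMB uint64
	usedMB  uint64
}

// isPossible reports whether the request could ever fit on this machine.
func (t *memTracker) isPossible(memMB uint64) bool { return memMB <= t.totalMB }

// tokenNB never blocks: it sends either a usable token or an error
// token. An impossible request yields a channel that never receives.
func (t *memTracker) tokenNB(memMB uint64) <-chan token {
	ch := make(chan token, 1)
	if !t.isPossible(memMB) {
		return ch // nothing is ever sent; callers should check isPossible
	}
	t.mu.Lock()
	defer t.mu.Unlock()
	if memMB > t.totalMB-t.usedMB {
		ch <- token{err: errCapacityFull, release: func() {}}
		return ch
	}
	t.usedMB += memMB
	var once sync.Once
	ch <- token{release: func() {
		// Release at most once, even if called repeatedly.
		once.Do(func() {
			t.mu.Lock()
			t.usedMB -= memMB
			t.mu.Unlock()
		})
	}}
	return ch
}

func main() {
	tracker := &memTracker{totalMB: 256}
	first := <-tracker.tokenNB(200)
	second := <-tracker.tokenNB(100)
	fmt.Println(first.err, second.err)
	first.release()
	third := <-tracker.tokenNB(100)
	fmt.Println(third.err)
}
```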
func NewResourceTracker ¶
func NewResourceTracker(cfg *AgentConfig) ResourceTracker
Directories ¶
Path | Synopsis |
---|---|
 | Package drivers is intended as a general purpose container abstraction library. |
docker | Package docker provides a Docker driver for Fn. |
mock | Package mock provides a fake Driver implementation that is only used for testing. |
 | Package runner is a generated protocol buffer package. |
 | Package protocol defines the protocol between the Fn Agent and the code running inside of a container. |