Documentation
¶
Index ¶
- Constants
- func NewSlotQueue(key string) *slotQueue
- func NewSlotQueueMgr() *slotQueueMgr
- func StatsComplete(ctx context.Context)
- func StatsDequeue(ctx context.Context)
- func StatsDequeueAndFail(ctx context.Context)
- func StatsDequeueAndStart(ctx context.Context)
- func StatsEnqueue(ctx context.Context)
- func StatsFailed(ctx context.Context)
- func StatsIncrementErrors(ctx context.Context)
- func StatsIncrementTimedout(ctx context.Context)
- func StatsIncrementTooBusy(ctx context.Context)
- type Agent
- type CachedDataAccess
- type Call
- type CallOpt
- type ContainerState
- type ContainerStateType
- type DataAccess
- type Param
- type Params
- type RequestState
- type RequestStateType
- type ResourceToken
- type ResourceTracker
- type Slot
Constants ¶
const ( Mem1MB = 1024 * 1024 Mem1GB = 1024 * 1024 * 1024 )
Variables ¶
This section is empty.
Functions ¶
func NewSlotQueue ¶
func NewSlotQueue(key string) *slotQueue
func NewSlotQueueMgr ¶
func NewSlotQueueMgr() *slotQueueMgr
func StatsComplete ¶
func StatsDequeue ¶
Call when a function has been queued but cannot be started because of an error
func StatsDequeueAndFail ¶
func StatsDequeueAndStart ¶
func StatsEnqueue ¶
func StatsFailed ¶
func StatsIncrementErrors ¶
func StatsIncrementTimedout ¶
func StatsIncrementTooBusy ¶
Types ¶
type Agent ¶
type Agent interface { // GetCall will return a Call that is executable by the Agent, which // can be built via various CallOpt's provided to the method. GetCall(...CallOpt) (Call, error) // Submit will attempt to execute a call locally, a Call may store information // about itself in its Start and End methods, which will be called in Submit // immediately before and after the Call is executed, respectively. An error // will be returned if there is an issue executing the call or the error // may be from the call's execution itself (if, say, the container dies, // or the call times out). Submit(Call) error // Close will wait for any outstanding calls to complete and then exit. // Close is not safe to be called from multiple threads. io.Closer // Return the http.Handler used to handle Prometheus metric requests PromHandler() http.Handler AddCallListener(fnext.CallListener) // Enqueue is to use the agent's sweet sweet client bindings to remotely // queue async tasks and should be removed from Agent interface ASAP. Enqueue(context.Context, *models.Call) error }
func New ¶
func New(da DataAccess) Agent
type CachedDataAccess ¶
type CachedDataAccess struct { DataAccess // contains filtered or unexported fields }
CachedDataAccess wraps a DataAccess and caches the results of GetApp and GetRoute.
type Call ¶
type Call interface { // Model will return the underlying models.Call configuration for this call. // TODO we could respond to async correctly from agent but layering, this // is only because the front end has different responses based on call type. // try to discourage use elsewhere until this gets pushed down more... Model() *models.Call // Start will be called before this call is executed, it may be used to // guarantee mutual exclusion, check docker permissions, update timestamps, // etc. // TODO Start and End can likely be unexported as they are only used in the agent, // and on a type which is constructed in a specific agent. meh. Start(ctx context.Context) error // End will be called immediately after attempting a call execution, // regardless of whether the execution failed or not. An error will be passed // to End, which if nil indicates a successful execution. Any error returned // from End will be returned as the error from Submit. End(ctx context.Context, err error) error }
type CallOpt ¶
type CallOpt func(a *agent, c *call) error
TODO build w/o closures... lazy
func FromModel ¶
TODO this currently relies on FromRequest having happened before to create the model here, to be a fully qualified model. We probably should double check but having a way to bypass will likely be what's used anyway unless forced.
func WithContext ¶
type ContainerState ¶
type ContainerState interface {
UpdateState(ctx context.Context, newState ContainerStateType, slots *slotQueue)
}
func NewContainerState ¶
func NewContainerState() ContainerState
type ContainerStateType ¶
type ContainerStateType int
const ( ContainerStateNone ContainerStateType = iota // uninitialized ContainerStateWait // resource (cpu + mem) waiting ContainerStateStart // launching ContainerStateIdle // running idle ContainerStateBusy // running busy ContainerStateDone // exited/failed/done ContainerStateMax )
type DataAccess ¶
type DataAccess interface { // GetApp abstracts querying the datastore for an app. GetApp(ctx context.Context, appName string) (*models.App, error) // GetRoute abstracts querying the datastore for a route within an app. GetRoute(ctx context.Context, appName string, routePath string) (*models.Route, error) // Enqueue will add a Call to the queue (ultimately forwards to mq.Push). Enqueue(ctx context.Context, mCall *models.Call) error // Dequeue will query the queue for the next available Call that can be run // by this Agent, and reserve it (ultimately forwards to mq.Reserve). Dequeue(ctx context.Context) (*models.Call, error) // Start will attempt to start the provided Call within an appropriate // context. Start(ctx context.Context, mCall *models.Call) error // Finish will notify the system that the Call has been processed, and // fulfill the reservation in the queue if the call came from a queue. Finish(ctx context.Context, mCall *models.Call, stderr io.Reader, async bool) error }
DataAccess abstracts the datastore and message queue operations done by the agent, so that API nodes and runner nodes can work with the same interface but actually operate on the data in different ways (by direct access or by mediation through an API node).
func NewCachedDataAccess ¶
func NewCachedDataAccess(da DataAccess) DataAccess
func NewDirectDataAccess ¶
func NewDirectDataAccess(ds models.Datastore, ls models.LogStore, mq models.MessageQueue) DataAccess
type RequestState ¶
type RequestState interface {
UpdateState(ctx context.Context, newState RequestStateType, slots *slotQueue)
}
func NewRequestState ¶
func NewRequestState() RequestState
type RequestStateType ¶
type RequestStateType int
const ( RequestStateNone RequestStateType = iota // uninitialized RequestStateWait // request is waiting RequestStateExec // request is executing RequestStateDone // request is done RequestStateMax )
type ResourceToken ¶
type ResourceTracker ¶
type ResourceTracker interface { // WaitAsyncResource returns a channel that will send once when there seem to be sufficient // resource levels to run an async task, it is up to the implementer to create policy here. WaitAsyncResource(ctx context.Context) chan struct{} // GetResourceToken returns a channel to wait for a resource token on. If the provided context is canceled, // the channel will never receive anything. If it is not possible to fulfill this resource, the channel // will never receive anything (use IsResourcePossible). If a resource token is available for the provided // resource parameters, it will otherwise be sent once on the returned channel. The channel is never closed. // Memory is expected to be provided in MB units. GetResourceToken(ctx context.Context, memory, cpuQuota uint64, isAsync bool) <-chan ResourceToken // IsResourcePossible returns whether it's possible to fulfill the requested resources on this // machine. It must be called before GetResourceToken or GetResourceToken may hang. // Memory is expected to be provided in MB units. IsResourcePossible(memory, cpuQuota uint64, isAsync bool) bool // returns number of waiters waiting for a resource token blocked on condition variable GetResourceTokenWaiterCount() uint64 }
A simple resource (memory, cpu, disk, etc.) tracker for scheduling. TODO: add cpu, disk, network IO for future
func NewResourceTracker ¶
func NewResourceTracker() ResourceTracker