Documentation ¶
Overview ¶
Package server is the webserver which sends simulation requests to the simulator.
Index ¶
- Variables
- func GetEnvInt(key string, defaultValue int) int
- func LogConfig(log *zap.SugaredLogger)
- func LoggingMiddleware(log *zap.SugaredLogger, next http.Handler) http.Handler
- type Node
- type NodePool
- type NodeURIPayload
- type PrioQueue
- type RedisState
- type Server
- type ServerOpts
- type SimRequest
- type SimResponse
- type Webserver
Constants ¶
This section is empty.
Variables ¶
var (
	JobChannelBuffer = GetEnvInt("JOB_CHAN_BUFFER", 2)          // buffer for JobC in backends (for transporting jobs from server -> backend node)
	RequestMaxTries  = GetEnvInt("RETRIES_MAX", 3)              // 3 tries means a request is retried up to 2 additional times; the third error is final
	PayloadMaxBytes  = GetEnvInt("PAYLOAD_MAX_KB", 8192) * 1024 // Max payload size in bytes. If a payload sent to the webserver is larger, the server returns "400 Bad Request".

	MaxQueueItemsFastTrack = GetEnvInt("ITEMS_FASTTRACK_MAX", 0) // Max number of items in the fast-track queue. 0 means no limit.
	MaxQueueItemsHighPrio  = GetEnvInt("ITEMS_HIGHPRIO_MAX", 0)  // Max number of items in the high-prio queue. 0 means no limit.
	MaxQueueItemsLowPrio   = GetEnvInt("ITEMS_LOWPRIO_MAX", 0)   // Max number of items in the low-prio queue. 0 means no limit.

	// How often fast-track queue items are popped before a high-priority item is popped
	FastTrackPerHighPrio = GetEnvInt("ITEMS_FASTTRACK_PER_HIGHPRIO", 2)
	FastTrackDrainFirst  = os.Getenv("FASTTRACK_DRAIN_FIRST") == "1" // whether to fully drain the fast-track queue first

	RequestTimeout       = time.Duration(GetEnvInt("REQUEST_TIMEOUT", 5)) * time.Second       // Time between creation and receipt by the node worker, after which a SimRequest is no longer processed
	ServerJobSendTimeout = time.Duration(GetEnvInt("JOB_SEND_TIMEOUT", 2)) * time.Second      // How long the server tries to send a job into the nodepool for processing
	ProxyRequestTimeout  = time.Duration(GetEnvInt("REQUEST_PROXY_TIMEOUT", 3)) * time.Second // HTTP request timeout for proxy requests to the backend node
)
var (
	ErrRequestTimeout   = errors.New("request timeout hit before processing")
	ErrNodeTimeout      = errors.New("node timeout")
	ErrNoNodesAvailable = errors.New("no nodes available")
)
var (
	RedisPrefix   = "prio-load-balancer:"
	RedisKeyNodes = RedisPrefix + "prio-load-balancer:nodes"
)
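All of the settings above are resolved from environment variables when the package is initialized, so they must be set before the process starts. Below is a minimal sketch of reading them, assuming the import path github.com/flashbots/prio-load-balancer/server:

package main

import (
	"fmt"

	"github.com/flashbots/prio-load-balancer/server" // assumed import path
)

func main() {
	// Setting PAYLOAD_MAX_KB=4096 in the environment before starting the
	// process makes PayloadMaxBytes 4 MiB instead of the 8 MiB default.
	fmt.Println("max payload bytes:", server.PayloadMaxBytes)

	// GetEnvInt returns the provided default when the variable is unset.
	fmt.Println(server.GetEnvInt("HYPOTHETICAL_UNSET_VAR", 42)) // 42
}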
Functions ¶
func LogConfig ¶
func LogConfig(log *zap.SugaredLogger)
func LoggingMiddleware ¶
LoggingMiddleware logs the incoming HTTP request & its duration.
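A minimal sketch of wrapping a handler with LoggingMiddleware; the route and listen address are placeholders, and the package import path is assumed:

package main

import (
	"net/http"

	"github.com/flashbots/prio-load-balancer/server" // assumed import path
	"go.uber.org/zap"
)

func main() {
	log := zap.NewExample().Sugar()

	mux := http.NewServeMux()
	mux.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("pong"))
	})

	// Each incoming request and its duration are logged by the middleware.
	handler := server.LoggingMiddleware(log, mux)
	log.Fatal(http.ListenAndServe(":8080", handler))
}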
Types ¶
type Node ¶
func NewNode ¶
func NewNode(log *zap.SugaredLogger, uri string, jobC chan *SimRequest, numWorkers int32) (*Node, error)
func (*Node) HealthCheck ¶
func (*Node) ProxyRequest ¶ added in v0.5.0
func (*Node) StartWorkers ¶
func (n *Node) StartWorkers()
StartWorkers spawns the proxy workers in goroutines. Workers that are already running will be cancelled.
func (*Node) StopWorkers ¶
func (n *Node) StopWorkers()
func (*Node) StopWorkersAndWait ¶
func (n *Node) StopWorkersAndWait()
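A sketch of the Node lifecycle based on the signatures above; the backend URI and worker count are placeholders, and the import path is assumed:

package main

import (
	"log"

	"github.com/flashbots/prio-load-balancer/server" // assumed import path
	"go.uber.org/zap"
)

func main() {
	sugar := zap.NewExample().Sugar()
	jobC := make(chan *server.SimRequest, server.JobChannelBuffer)

	node, err := server.NewNode(sugar, "http://127.0.0.1:8545", jobC, 4) // placeholder backend URI
	if err != nil {
		log.Fatal(err)
	}

	node.StartWorkers() // spawn proxy workers that read SimRequests from jobC
	// ... SimRequests are normally fed into jobC by the NodePool ...
	node.StopWorkersAndWait() // stop the workers and wait until they have finished
}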
type NodePool ¶
type NodePool struct {
	JobC chan *SimRequest
	// contains filtered or unexported fields
}
func NewNodePool ¶
func NewNodePool(log *zap.SugaredLogger, redisState *RedisState, numWorkersPerNode int32) *NodePool
func (*NodePool) AddNode ¶
AddNode adds a node to the pool and starts the workers. If a new node is added, the list of nodes is saved to redis.
func (*NodePool) LoadNodesFromRedis ¶
type NodeURIPayload ¶
type NodeURIPayload struct {
URI string `json:"uri"`
}
type PrioQueue ¶
type PrioQueue struct {
// contains filtered or unexported fields
}
PrioQueue has three queues: fastTrack, highPrio and lowPrio. Items are popped 1:1 from fastTrack and highPrio until both are empty; only then are items taken from the lowPrio queue.
Maybe every n-th item should be taken from the low-prio queue instead?
func NewPrioQueue ¶
func (*PrioQueue) Close ¶
func (q *PrioQueue) Close()
Close disallows adding new items via Push(), and lets readers blocked in Pop() return nil once the queue is empty.
func (*PrioQueue) CloseAndWait ¶
func (q *PrioQueue) CloseAndWait()
CloseAndWait closes the queue and waits until the queue is empty
func (*PrioQueue) NumRequests ¶
func (*PrioQueue) Pop ¶
func (q *PrioQueue) Pop() (nextReq *SimRequest)
Pop returns the next SimRequest. If the queue is empty, it blocks until an item is available. Fast-track and high-prio items are drained before low-prio ones. Pop returns nil only after Close() has been called and the queue is empty.
func (*PrioQueue) Push ¶
func (q *PrioQueue) Push(r *SimRequest) bool
Push adds a new item to the end of the queue. Returns true if added, false if queue is closed or at max capacity
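A sketch of the producer/consumer pattern around Push and Pop. The NewPrioQueue constructor arguments are not listed in this document, so the functions below assume an existing *PrioQueue; the import path is assumed as well:

package queueexample

import "github.com/flashbots/prio-load-balancer/server" // assumed import path

// drain pops items until Close()/CloseAndWait() has been called and the
// queue is empty, at which point Pop returns nil.
func drain(q *server.PrioQueue, handle func(*server.SimRequest)) {
	for {
		req := q.Pop() // blocks while the queue is empty
		if req == nil {
			return // queue closed and fully drained
		}
		handle(req) // e.g. forward the request to a backend node
	}
}

// enqueue pushes a high-prio request and reports whether it was accepted.
func enqueue(q *server.PrioQueue, payload []byte) bool {
	req := server.NewSimRequest(payload, true, false) // high-prio, not fast-track
	return q.Push(req)                                // false if the queue is closed or at max capacity
}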
type RedisState ¶
type RedisState struct {
RedisClient *redis.Client
}
func NewRedisState ¶
func NewRedisState(redisURI string) (*RedisState, error)
func (*RedisState) GetNodes ¶
func (s *RedisState) GetNodes() (nodeUris []string, err error)
func (*RedisState) SaveNodes ¶
func (s *RedisState) SaveNodes(nodeUris []string) error
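A sketch of persisting and reloading the node list via RedisState; the Redis URI is a placeholder and the import path is assumed:

package main

import (
	"fmt"
	"log"

	"github.com/flashbots/prio-load-balancer/server" // assumed import path
)

func main() {
	state, err := server.NewRedisState("localhost:6379") // placeholder Redis URI
	if err != nil {
		log.Fatal(err)
	}

	if err := state.SaveNodes([]string{"http://127.0.0.1:8545"}); err != nil {
		log.Fatal(err)
	}

	nodes, err := state.GetNodes()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("nodes in Redis:", nodes)
}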
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the overall load balancer server
func NewServer ¶
func NewServer(opts ServerOpts) (*Server, error)
NewServer creates a new Server instance, loads the nodes from Redis and starts the node workers
func (*Server) AddNode ¶
AddNode adds a new execution node to the pool and starts the workers. If a new node is added, the list of nodes is saved to redis.
func (*Server) NumNodeWorkersAlive ¶
NumNodeWorkersAlive returns the number of currently active node workers
type ServerOpts ¶
type ServerOpts struct {
	Log            *zap.SugaredLogger
	HTTPAddrPtr    string // listen address for the webserver
	RedisURI       string // (optional) URI for the Redis instance. If empty, Redis is not used.
	WorkersPerNode int32  // Number of concurrent workers per execution node
}
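A sketch of building a Server from these options; all values are placeholders, the import path is assumed, and the call that actually starts serving is not covered in this document:

package main

import (
	"log"

	"github.com/flashbots/prio-load-balancer/server" // assumed import path
	"go.uber.org/zap"
)

func main() {
	opts := server.ServerOpts{
		Log:            zap.NewExample().Sugar(),
		HTTPAddrPtr:    "127.0.0.1:8080", // placeholder listen address
		RedisURI:       "",               // empty: run without Redis
		WorkersPerNode: 8,
	}

	// NewServer loads nodes from Redis (if configured) and starts the node workers.
	srv, err := server.NewServer(opts)
	if err != nil {
		log.Fatal(err)
	}
	_ = srv // the method that starts listening is not shown in this documentation
}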
type SimRequest ¶
type SimRequest struct {
	// can be none of, or one of high-prio / fast-track
	IsHighPrio  bool
	IsFastTrack bool

	Payload   []byte
	ResponseC chan SimResponse
	Cancelled bool
	CreatedAt time.Time
	Tries     int
}
func NewSimRequest ¶
func NewSimRequest(payload []byte, isHighPrio, IsFastTrack bool) *SimRequest
func (*SimRequest) SendResponse ¶
func (r *SimRequest) SendResponse(resp SimResponse) (wasSent bool)
SendResponse sends the response to ResponseC. If no one is listening on the channel, the response is dropped.
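A sketch of the request/response round trip: a producer creates the SimRequest, a worker answers it with SendResponse, and the producer receives on ResponseC (bounded here by RequestTimeout). The SimResponse fields are not listed in this document, so a zero value is sent; the import path is assumed:

package main

import (
	"fmt"
	"time"

	"github.com/flashbots/prio-load-balancer/server" // assumed import path
)

func main() {
	req := server.NewSimRequest([]byte(`{"jsonrpc":"2.0"}`), false, false)

	// A node worker would normally call SendResponse after proxying the payload.
	go func() {
		time.Sleep(10 * time.Millisecond) // stand-in for the proxy round trip
		sent := req.SendResponse(server.SimResponse{})
		fmt.Println("response delivered:", sent) // false if nobody was listening
	}()

	select {
	case resp := <-req.ResponseC:
		fmt.Printf("got response: %+v\n", resp)
	case <-time.After(server.RequestTimeout):
		fmt.Println("timed out waiting for the response")
	}
}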
type SimResponse ¶
type Webserver ¶
type Webserver struct {
// contains filtered or unexported fields
}
func NewWebserver ¶
func (*Webserver) HandleNodesRequest ¶
func (s *Webserver) HandleNodesRequest(w http.ResponseWriter, req *http.Request)
func (*Webserver) HandleQueueRequest ¶
func (s *Webserver) HandleQueueRequest(w http.ResponseWriter, req *http.Request)
func (*Webserver) HandleRootRequest ¶
func (s *Webserver) HandleRootRequest(w http.ResponseWriter, req *http.Request)
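The handlers above have the standard http.HandlerFunc shape, so they can be mounted on any mux. The paths below are purely illustrative; the routes actually registered by NewWebserver are not listed in this document, and the import path is assumed:

package webexample

import (
	"net/http"

	"github.com/flashbots/prio-load-balancer/server" // assumed import path
	"go.uber.org/zap"
)

// mountHandlers wires the exported handlers onto a mux behind LoggingMiddleware.
// The paths are placeholders, not the routes used by the package itself.
func mountHandlers(ws *server.Webserver, log *zap.SugaredLogger) http.Handler {
	mux := http.NewServeMux()
	mux.HandleFunc("/", ws.HandleRootRequest)
	mux.HandleFunc("/nodes", ws.HandleNodesRequest)
	mux.HandleFunc("/queue", ws.HandleQueueRequest)
	return server.LoggingMiddleware(log, mux)
}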