Documentation ¶
Index ¶
- Variables
- func DeleteReplica(conf *Conf, db DB, pk ReplicaPK) (err error)
- func DeleteWorker(conf *Conf, db DB, pk WorkerPK) (err error)
- func FmtPoolQueueName(conf *Conf, poolID string) string
- func FmtPoolQueueURL(conf *Conf, poolID string) string
- func FmtReplicaID(datasetID, workerID string) string
- func FmtWorkerQueueName(conf *Conf, poolID, workerID string) string
- func FmtWorkerQueueURL(conf *Conf, poolID, workerID string) string
- func HandleGateway(conf *Conf, svc *Services, ev json.RawMessage) (res interface{}, err error)
- func HandleRelease(conf *Conf, svc *Services, ev json.RawMessage) (res interface{}, err error)
- func HandleSchedule(conf *Conf, svc *Services, ev json.RawMessage) (res interface{}, err error)
- func Mux(conf *Conf, svc *Services) http.Handler
- func PutNewAlloc(conf *Conf, db DB, alloc *Alloc) (err error)
- func PutNewPool(conf *Conf, db DB, pool *Pool) (err error)
- func PutNewWorker(conf *Conf, db DB, worker *Worker) (err error)
- func PutReplica(conf *Conf, db DB, replica *Replica) (err error)
- func ReceiveEvals(conf *Conf, svc *Services, pool *Pool) (err error)
- func UpdateAllocTTL(conf *Conf, db DB, ttl int64, apk AllocPK) (err error)
- func UpdatePoolTTL(conf *Conf, db DB, ttl int64, pk PoolPK) (err error)
- func UpdateWorkerTTL(conf *Conf, db DB, ttl int64, pk WorkerPK) (err error)
- type Alloc
- type AllocPK
- type Conf
- type DB
- type Eval
- type GatewayRequest
- type GatewayResponse
- type Handler
- type Pool
- type PoolPK
- type Replica
- type ReplicaPK
- type Services
- type Worker
- type WorkerPK
Constants ¶
This section is empty.
Variables ¶
var ( //ErrAllocExists means an alloc exists while it was expected not to ErrAllocExists = errors.New("alloc already exists") //ErrAllocNotExists means an alloc was not found while expecting it to exist ErrAllocNotExists = errors.New("alloc doesn't exist") )
var ( //ErrPoolExists means a pool exists while it was expected not to ErrPoolExists = errors.New("pool already exists") //ErrPoolNotExists means a pool was not found while expecting it to exist ErrPoolNotExists = errors.New("pool doesn't exist") )
var ( //ErrWorkerExists means a worker exists while it was expected not to ErrWorkerExists = errors.New("worker already exists") //ErrWorkerNotExists means a worker was not found while expecting it to exist ErrWorkerNotExists = errors.New("worker doesn't exist") )
var Handlers = map[*regexp.Regexp]Handler{ regexp.MustCompile(`-schedule$`): HandleSchedule, regexp.MustCompile(`-release$`): HandleRelease, regexp.MustCompile(`-gateway$`): HandleGateway, }
Handlers maps ARN suffixes to the actual event handlers.
Functions ¶
func DeleteReplica ¶
DeleteReplica deletes a replica by pk
func DeleteWorker ¶
DeleteWorker deletes a worker by pk
func FmtPoolQueueName ¶
FmtPoolQueueName will format an SQS queue name consistently
func FmtPoolQueueURL ¶
FmtPoolQueueURL is able to "predict" an SQS queue URL from the configuration
func FmtReplicaID ¶
FmtReplicaID formats the combined dataset and worker ID of a replica
func FmtWorkerQueueName ¶
FmtWorkerQueueName will format an SQS queue name consistently
func FmtWorkerQueueURL ¶
FmtWorkerQueueURL is able to "predict" an SQS queue URL from the configuration
func HandleGateway ¶
func HandleGateway(conf *Conf, svc *Services, ev json.RawMessage) (res interface{}, err error)
HandleGateway takes invocations from the API Gateway and handles them as HTTP requests, returning HTTP responses based on RESTful principles.
func HandleRelease ¶
func HandleRelease(conf *Conf, svc *Services, ev json.RawMessage) (res interface{}, err error)
HandleRelease is a Lambda handler that periodically queries a pool's expired allocations, replicas and workers
func HandleSchedule ¶
func HandleSchedule(conf *Conf, svc *Services, ev json.RawMessage) (res interface{}, err error)
HandleSchedule is a Lambda handler that periodically reads from the scheduling queue and queries the workers table for available capacity. If the capacity can be claimed an allocation is created.
func PutNewAlloc ¶
PutNewAlloc will put an alloc with the condition the pk doesn't exist yet
func PutNewPool ¶
PutNewPool will put a pool with the condition the pk doesn't exist yet
func PutNewWorker ¶
PutNewWorker will put a worker with the condition the pk doesn't exist yet
func PutReplica ¶
PutReplica will put a replica with the condition the pk doesn't exist yet
func ReceiveEvals ¶
ReceiveEvals will long poll for scheduling messages on the scheduling queue of the pool
func UpdateAllocTTL ¶
UpdateAllocTTL under the condition that it exists
func UpdatePoolTTL ¶
UpdatePoolTTL under the condition that it exists
Types ¶
type Alloc ¶
type Alloc struct { AllocPK TTL int64 `dynamodbav:"ttl"` WorkerID string `dynamodbav:"wrk"` Eval *Eval `dynamodbav:"eval"` }
Alloc represents a planned execution
type Conf ¶
type Conf struct { Deployment string `envconfig:"DEPLOYMENT"` AWSAccountID string `envconfig:"AWS_ACCOUNT_ID"` AWSAccessKeyID string `envconfig:"AWS_ACCESS_KEY_ID"` AWSSecretAccessKey string `envconfig:"AWS_SECRET_ACCESS_KEY"` AWSRegion string `envconfig:"AWS_REGION"` StripBaseMappings int `envconfig:"STRIP_BASE_MAPPINGS"` PoolTTL int64 `envconfig:"POOL_TTL"` WorkerTTL int64 `envconfig:"WORKER_TTL"` ReplicaTTL int64 `envconfig:"REPLICA_TTL"` AllocTTL int64 `envconfig:"ALLOC_TTL"` MaxRetry int `envconfig:"MAX_RETRY"` ScheduleDLQueueURL string `envconfig:"SCHEDULE_DLQUEUE_URL"` PoolsTableName string `envconfig:"TABLE_NAME_POOLS"` ReplicasTableName string `envconfig:"TABLE_NAME_REPLICAS"` ReplicasTTLIdxName string `envconfig:"TABLE_IDX_REPLICAS_TTL"` WorkersTTLIdxName string `envconfig:"TABLE_IDX_WORKERS_TTL"` WorkersTableName string `envconfig:"TABLE_NAME_WORKERS"` WorkersCapIdxName string `envconfig:"TABLE_IDX_WORKERS_CAP"` AllocsTableName string `envconfig:"TABLE_NAME_ALLOCS"` AllocsTTLIdxName string `envconfig:"TABLE_IDX_ALLOCS_TTL"` }
Conf holds our configuration taken from the environment
type Eval ¶
type Eval struct { Dataset string `dynamodbav:"set"` //certain dataset must be available Size int `dynamodbav:"size"` //certain capacity must be available Retry int `dynamodbav:"try"` }
Eval is a scheduling evaluation
type GatewayRequest ¶
type GatewayRequest struct { HTTPMethod string Headers map[string]string Resource string PathParameters map[string]string Path string QueryStringParameters map[string]string Body string IsBase64Encoded bool StageVariables map[string]string }
GatewayRequest represents an Amazon API Gateway Proxy Event.
type GatewayResponse ¶
type GatewayResponse struct { StatusCode int `json:"statusCode"` Body string `json:"body"` Headers map[string]string `json:"headers"` }
GatewayResponse is returned to the API Gateway
type Handler ¶
type Handler func(conf *Conf, svc *Services, ev json.RawMessage) (interface{}, error)
Handler describes a Lambda handler that matches a specific suffix
type Pool ¶
Pool represents capacity provided by pools
func GetActivePool ¶
GetActivePool will get a pool by its pk but errors if it's disbanded
type PoolPK ¶
type PoolPK struct {
PoolID string `dynamodbav:"pool"`
}
PoolPK describes the pool's primary key in the base table
type Services ¶
type Services struct { SQS sqsiface.SQSAPI //message queues DB dynamodbiface.DynamoDBAPI //dynamodb nosql database Logs *zap.Logger //logging service }
Services holds our backend services