Documentation ¶
Overview ¶
Package dal provides a data abstraction layer for the Controller
Index ¶
- Constants
- Variables
- func WithReservation(ctx context.Context, reservation Reservation, fn func() error) error
- type AsyncCall
- type AsyncOrigin
- type AsyncOriginFSM
- type AsyncOriginPubSub
- type AttemptedCronJob
- type CallEvent
- type Controller
- type ControllerState
- type DAL
- func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error)
- func (d *DAL) AcquireFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) (*FSMInstance, error)
- func (d *DAL) AcquireLease(ctx context.Context, key leases.Key, ttl time.Duration) (leases.Lease, error)
- func (d *DAL) Begin(ctx context.Context) (*Tx, error)
- func (d *DAL) CompleteAsyncCall(ctx context.Context, call *AsyncCall, result either.Either[[]byte, string], ...) (err error)
- func (d *DAL) CompleteEventForSubscription(ctx context.Context, module, name string) error
- func (d *DAL) CreateArtefact(ctx context.Context, content []byte) (digest sha256.SHA256, err error)
- func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, ...) (key model.DeploymentKey, err error)
- func (d *DAL) CreateRequest(ctx context.Context, key model.RequestKey, addr string) error
- func (d *DAL) DeregisterRunner(ctx context.Context, key model.RunnerKey) error
- func (d *DAL) EndCronJob(ctx context.Context, job model.CronJob, next time.Time) (model.CronJob, error)
- func (d *DAL) ExpireLeases(ctx context.Context) error
- func (d *DAL) ExpireRunnerClaims(ctx context.Context) (int64, error)
- func (d *DAL) FailFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) error
- func (d *DAL) FinishFSMTransition(ctx context.Context, fsm schema.RefKey, instanceKey string) error
- func (d *DAL) GetActiveControllers(ctx context.Context) ([]Controller, error)
- func (d *DAL) GetActiveDeploymentSchemas(ctx context.Context) ([]*schema.Module, error)
- func (d *DAL) GetActiveDeployments(ctx context.Context) ([]Deployment, error)
- func (d *DAL) GetActiveRunners(ctx context.Context) ([]Runner, error)
- func (d *DAL) GetCronJobs(ctx context.Context) ([]model.CronJob, error)
- func (d *DAL) GetDeployment(ctx context.Context, key model.DeploymentKey) (*model.Deployment, error)
- func (d *DAL) GetDeploymentsNeedingReconciliation(ctx context.Context) ([]Reconciliation, error)
- func (d *DAL) GetDeploymentsWithMinReplicas(ctx context.Context) ([]Deployment, error)
- func (d *DAL) GetIdleRunners(ctx context.Context, limit int, labels model.Labels) ([]Runner, error)
- func (d *DAL) GetIngressRoutes(ctx context.Context, method string) ([]IngressRoute, error)
- func (d *DAL) GetMissingArtefacts(ctx context.Context, digests []sha256.SHA256) ([]sha256.SHA256, error)
- func (d *DAL) GetModuleConfiguration(ctx context.Context, module optional.Option[string], name string) ([]byte, error)
- func (d *DAL) GetProcessList(ctx context.Context) ([]Process, error)
- func (d *DAL) GetRoutingTable(ctx context.Context, modules []string) (map[string][]Route, error)
- func (d *DAL) GetRunner(ctx context.Context, runnerKey model.RunnerKey) (Runner, error)
- func (d *DAL) GetRunnerState(ctx context.Context, runnerKey model.RunnerKey) (RunnerState, error)
- func (d *DAL) GetRunnersForDeployment(ctx context.Context, deployment model.DeploymentKey) ([]Runner, error)
- func (d *DAL) GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]model.CronJob, error)
- func (d *DAL) GetStatus(ctx context.Context) (Status, error)
- func (d *DAL) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]model.Subscription, error)
- func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error
- func (d *DAL) InsertLogEvent(ctx context.Context, log *LogEvent) error
- func (d *DAL) KillStaleControllers(ctx context.Context, age time.Duration) (int64, error)
- func (d *DAL) KillStaleRunners(ctx context.Context, age time.Duration) (int64, error)
- func (d *DAL) ListModuleConfiguration(ctx context.Context) ([]sql.ModuleConfiguration, error)
- func (d *DAL) LoadAsyncCall(ctx context.Context, id int64) (*AsyncCall, error)
- func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay time.Duration) (count int, err error)
- func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic string, payload []byte) error
- func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter) ([]Event, error)
- func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.DeploymentKey, minReplicas int) (err error)
- func (d *DAL) ReserveRunnerForDeployment(ctx context.Context, deployment model.DeploymentKey, ...) (Reservation, error)
- func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int) error
- func (d *DAL) SetModuleConfiguration(ctx context.Context, module optional.Option[string], name string, value []byte) error
- func (d *DAL) StartCronJobs(ctx context.Context, jobs []model.CronJob) (attemptedJobs []AttemptedCronJob, err error)
- func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, executionKey string, ...) (err error)
- func (d *DAL) SucceedFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) error
- func (d *DAL) UnsetModuleConfiguration(ctx context.Context, module optional.Option[string], name string) error
- func (d *DAL) UpsertController(ctx context.Context, key model.ControllerKey, addr string) (int64, error)
- func (d *DAL) UpsertModule(ctx context.Context, language, name string) (err error)
- func (d *DAL) UpsertRunner(ctx context.Context, runner Runner) error
- type Deployment
- type DeploymentArtefact
- type DeploymentCreatedEvent
- type DeploymentNotification
- type DeploymentUpdatedEvent
- type Event
- type EventFilter
- func FilterCall(sourceModule optional.Option[string], destModule string, ...) EventFilter
- func FilterDeployments(deploymentKeys ...model.DeploymentKey) EventFilter
- func FilterDescending() EventFilter
- func FilterIDRange(higherThan, lowerThan int64) EventFilter
- func FilterLogLevel(level log.Level) EventFilter
- func FilterRequests(requestKeys ...model.RequestKey) EventFilter
- func FilterTimeRange(olderThan, newerThan time.Time) EventFilter
- func FilterTypes(types ...sql.EventType) EventFilter
- type EventType
- type FSMInstance
- type FSMStatus
- type IngressRoute
- type IngressRouteEntry
- type IngressRoutingEntry
- type Lease
- type LogEvent
- type Notification
- type NotificationPayload
- type Process
- type ProcessRunner
- type Reconciliation
- type RequestOrigin
- type Reservation
- type Route
- type Runner
- type RunnerState
- type Status
- type Tx
Constants ¶
const ( RunnerStateIdle = RunnerState(sql.RunnerStateIdle) RunnerStateReserved = RunnerState(sql.RunnerStateReserved) RunnerStateAssigned = RunnerState(sql.RunnerStateAssigned) RunnerStateDead = RunnerState(sql.RunnerStateDead) )
Runner states.
const ( ControllerStateLive = ControllerState(sql.ControllerStateLive) ControllerStateDead = ControllerState(sql.ControllerStateDead) )
Controller states.
const ( RequestOriginIngress = RequestOrigin(sql.OriginIngress) RequestOriginCron = RequestOrigin(sql.OriginCron) RequestOriginPubsub = RequestOrigin(sql.OriginPubsub) )
const ( EventTypeLog = sql.EventTypeLog EventTypeCall = sql.EventTypeCall EventTypeDeploymentCreated = sql.EventTypeDeploymentCreated EventTypeDeploymentUpdated = sql.EventTypeDeploymentUpdated )
Supported event types.
const ( FSMStatusRunning = sql.FsmStatusRunning FSMStatusCompleted = sql.FsmStatusCompleted FSMStatusFailed = sql.FsmStatusFailed )
Variables ¶
var ( // ErrConflict is returned by select methods in the DAL when a resource already exists. // // Its use will be documented in the corresponding methods. ErrConflict = errors.New("conflict") // ErrNotFound is returned by select methods in the DAL when no results are found. ErrNotFound = errors.New("not found") // ErrConstraint is returned by select methods in the DAL when a constraint is violated. ErrConstraint = errors.New("constraint violation") )
Functions ¶
func WithReservation ¶
func WithReservation(ctx context.Context, reservation Reservation, fn func() error) error
Types ¶
type AsyncOrigin ¶ added in v0.226.0
type AsyncOrigin interface { // Origin returns the origin type. Origin() string String() string // contains filtered or unexported methods }
AsyncOrigin is a sum type representing the originator of an async call.
This is used to determine how to handle the result of the async call.
func ParseAsyncOrigin ¶ added in v0.226.0
func ParseAsyncOrigin(origin string) (AsyncOrigin, error)
ParseAsyncOrigin parses an async origin key.
type AsyncOriginFSM ¶ added in v0.226.0
type AsyncOriginFSM struct { FSM schema.RefKey `parser:"'fsm' ':' @@"` Key string `parser:"':' @(~EOF)+"` }
AsyncOriginFSM represents the context for the originator of an FSM async call.
It is in the form fsm:<module>.<name>:<key>
func (AsyncOriginFSM) Origin ¶ added in v0.226.0
func (a AsyncOriginFSM) Origin() string
func (AsyncOriginFSM) String ¶ added in v0.226.0
func (a AsyncOriginFSM) String() string
type AsyncOriginPubSub ¶ added in v0.241.0
AsyncOriginPubSub represents the context for the originator of an PubSub async call.
It is in the form fsm:<module>.<subscription_name>
func (AsyncOriginPubSub) Origin ¶ added in v0.241.0
func (a AsyncOriginPubSub) Origin() string
func (AsyncOriginPubSub) String ¶ added in v0.241.0
func (a AsyncOriginPubSub) String() string
type AttemptedCronJob ¶ added in v0.167.0
type CallEvent ¶
type CallEvent struct { ID int64 DeploymentKey model.DeploymentKey RequestKey optional.Option[model.RequestKey] Time time.Time SourceVerb optional.Option[schema.Ref] DestVerb schema.Ref Duration time.Duration Request []byte Response []byte Error optional.Option[string] Stack optional.Option[string] }
type Controller ¶
type Controller struct { Key model.ControllerKey Endpoint string State ControllerState }
type ControllerState ¶
type ControllerState string
type DAL ¶
type DAL struct { // DeploymentChanges is a Topic that receives changes to the deployments table. DeploymentChanges *pubsub.Topic[DeploymentNotification] // contains filtered or unexported fields }
func (*DAL) AcquireAsyncCall ¶ added in v0.191.0
AcquireAsyncCall acquires a pending async call to execute.
Returns ErrNotFound if there are no async calls to acquire.
func (*DAL) AcquireFSMInstance ¶ added in v0.226.0
func (d *DAL) AcquireFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) (*FSMInstance, error)
AcquireFSMInstance returns an FSM instance, also acquiring a lease on it.
The lease must be released by the caller.
func (*DAL) AcquireLease ¶ added in v0.186.0
func (d *DAL) AcquireLease(ctx context.Context, key leases.Key, ttl time.Duration) (leases.Lease, error)
AcquireLease acquires a lease for the given key.
Will return ErrConflict if the lease is already held by another controller.
func (*DAL) CompleteAsyncCall ¶ added in v0.194.0
func (d *DAL) CompleteAsyncCall(ctx context.Context, call *AsyncCall, result either.Either[[]byte, string], finalise func(tx *Tx) error) (err error)
CompleteAsyncCall completes an async call.
"result" is either a []byte representing the successful response, or a string representing a failure message.
func (*DAL) CompleteEventForSubscription ¶ added in v0.241.0
func (*DAL) CreateArtefact ¶
CreateArtefact inserts a new artefact into the database and returns its ID.
func (*DAL) CreateDeployment ¶
func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []DeploymentArtefact, ingressRoutes []IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error)
CreateDeployment (possibly) creates a new deployment and associates previously created artefacts with it.
If an existing deployment with identical artefacts exists, it is returned.
func (*DAL) CreateRequest ¶ added in v0.167.0
func (*DAL) DeregisterRunner ¶
DeregisterRunner deregisters the given runner.
func (*DAL) EndCronJob ¶ added in v0.167.0
func (d *DAL) EndCronJob(ctx context.Context, job model.CronJob, next time.Time) (model.CronJob, error)
EndCronJob sets the status from executing to idle and updates the next execution time Can be called on the successful completion of a job, or if the job failed to execute (error or timeout)
func (*DAL) ExpireLeases ¶ added in v0.186.0
ExpireLeases expires (deletes) all leases that have expired.
func (*DAL) ExpireRunnerClaims ¶
func (*DAL) FailFSMInstance ¶ added in v0.226.0
func (*DAL) FinishFSMTransition ¶ added in v0.226.0
func (*DAL) GetActiveControllers ¶ added in v0.177.2
func (d *DAL) GetActiveControllers(ctx context.Context) ([]Controller, error)
func (*DAL) GetActiveDeploymentSchemas ¶ added in v0.79.3
func (*DAL) GetActiveDeployments ¶
func (d *DAL) GetActiveDeployments(ctx context.Context) ([]Deployment, error)
GetActiveDeployments returns all active deployments.
func (*DAL) GetActiveRunners ¶
func (*DAL) GetCronJobs ¶ added in v0.167.0
GetCronJobs returns all cron jobs for deployments with min replicas > 0
func (*DAL) GetDeployment ¶
func (d *DAL) GetDeployment(ctx context.Context, key model.DeploymentKey) (*model.Deployment, error)
func (*DAL) GetDeploymentsNeedingReconciliation ¶
func (d *DAL) GetDeploymentsNeedingReconciliation(ctx context.Context) ([]Reconciliation, error)
GetDeploymentsNeedingReconciliation returns deployments that have a mismatch between the number of assigned and required replicas.
func (*DAL) GetDeploymentsWithMinReplicas ¶ added in v0.163.10
func (d *DAL) GetDeploymentsWithMinReplicas(ctx context.Context) ([]Deployment, error)
func (*DAL) GetIdleRunners ¶
GetIdleRunners returns up to limit idle runners matching the given labels.
"labels" is a single level map of key-value pairs. Values may be scalar or lists of scalars. If a value is a list, it will match the labels if all the values in the list match.
e.g. {"languages": ["kotlin"], "arch": "arm64"}' will match a runner with the labels '{"languages": ["go", "kotlin"], "os": "linux", "arch": "amd64", "pid": 1234}'
If no runners are available, it will return an empty slice.
func (*DAL) GetIngressRoutes ¶
func (*DAL) GetMissingArtefacts ¶
func (d *DAL) GetMissingArtefacts(ctx context.Context, digests []sha256.SHA256) ([]sha256.SHA256, error)
GetMissingArtefacts returns the digests of the artefacts that are missing from the database.
func (*DAL) GetModuleConfiguration ¶ added in v0.231.1
func (*DAL) GetProcessList ¶
GetProcessList returns a list of all "processes".
func (*DAL) GetRoutingTable ¶
GetRoutingTable returns the endpoints for all runners for the given modules, or all routes if modules is empty.
Returns route map keyed by module.
func (*DAL) GetRunnerState ¶
func (*DAL) GetRunnersForDeployment ¶
func (*DAL) GetStaleCronJobs ¶ added in v0.167.0
func (d *DAL) GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]model.CronJob, error)
GetStaleCronJobs returns a list of cron jobs that have been executing longer than the duration
func (*DAL) GetSubscriptionsNeedingUpdate ¶ added in v0.241.0
func (*DAL) InsertCallEvent ¶
func (*DAL) KillStaleControllers ¶
KillStaleControllers deletes controllers that have not had heartbeats for the given duration.
func (*DAL) KillStaleRunners ¶
KillStaleRunners deletes runners that have not had heartbeats for the given duration.
func (*DAL) ListModuleConfiguration ¶ added in v0.231.1
func (*DAL) LoadAsyncCall ¶ added in v0.194.0
func (*DAL) ProgressSubscriptions ¶ added in v0.244.0
func (*DAL) PublishEventForTopic ¶ added in v0.239.0
func (*DAL) QueryEvents ¶
func (*DAL) ReplaceDeployment ¶
func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.DeploymentKey, minReplicas int) (err error)
ReplaceDeployment replaces an old deployment of a module with a new deployment.
func (*DAL) ReserveRunnerForDeployment ¶
func (d *DAL) ReserveRunnerForDeployment(ctx context.Context, deployment model.DeploymentKey, reservationTimeout time.Duration, labels model.Labels) (Reservation, error)
ReserveRunnerForDeployment reserves a runner for the given deployment.
It returns a Reservation that must be committed or rolled back.
func (*DAL) SetDeploymentReplicas ¶
func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int) error
SetDeploymentReplicas activates the given deployment.
func (*DAL) SetModuleConfiguration ¶ added in v0.231.1
func (*DAL) StartCronJobs ¶ added in v0.167.0
func (d *DAL) StartCronJobs(ctx context.Context, jobs []model.CronJob) (attemptedJobs []AttemptedCronJob, err error)
StartCronJobs returns a full list of results so that the caller can update their list of jobs whether or not they successfully updated the row
func (*DAL) StartFSMTransition ¶ added in v0.226.0
func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, executionKey string, destinationState schema.RefKey, request json.RawMessage, retryParams schema.RetryParams) (err error)
StartFSMTransition sends an event to an executing instance of an FSM.
If the instance doesn't exist a new one will be created.
[name] is the name of the state machine to execute, [executionKey] is the unique identifier for this execution of the FSM.
Returns ErrConflict if the state machine is already executing a transition.
Note: this does not actually call the FSM, it just enqueues an async call for future execution.
Note: no validation of the FSM is performed.
func (*DAL) SucceedFSMInstance ¶ added in v0.226.0
func (*DAL) UnsetModuleConfiguration ¶ added in v0.231.1
func (*DAL) UpsertController ¶
func (*DAL) UpsertModule ¶
type Deployment ¶
type Deployment struct { Key model.DeploymentKey Language string Module string MinReplicas int Replicas optional.Option[int] // Depending on the query this may or may not be populated. Schema *schema.Module CreatedAt time.Time Labels model.Labels }
func (Deployment) String ¶
func (d Deployment) String() string
type DeploymentArtefact ¶
func DeploymentArtefactFromProto ¶
func DeploymentArtefactFromProto(in *ftlv1.DeploymentArtefact) (DeploymentArtefact, error)
func (*DeploymentArtefact) ToProto ¶
func (d *DeploymentArtefact) ToProto() *ftlv1.DeploymentArtefact
type DeploymentCreatedEvent ¶
type DeploymentCreatedEvent struct { ID int64 DeploymentKey model.DeploymentKey Time time.Time Language string ModuleName string MinReplicas int ReplacedDeployment optional.Option[model.DeploymentKey] }
func (*DeploymentCreatedEvent) GetID ¶
func (e *DeploymentCreatedEvent) GetID() int64
type DeploymentNotification ¶
type DeploymentNotification = Notification[Deployment, model.DeploymentKey, *model.DeploymentKey]
DeploymentNotification is a notification from the database when a deployment changes.
type DeploymentUpdatedEvent ¶
type DeploymentUpdatedEvent struct { ID int64 DeploymentKey model.DeploymentKey Time time.Time MinReplicas int PrevMinReplicas int }
func (*DeploymentUpdatedEvent) GetID ¶
func (e *DeploymentUpdatedEvent) GetID() int64
type Event ¶
type Event interface { GetID() int64 // contains filtered or unexported methods }
Event types.
type EventFilter ¶
type EventFilter func(query *eventFilter)
func FilterCall ¶
func FilterCall(sourceModule optional.Option[string], destModule string, destVerb optional.Option[string]) EventFilter
FilterCall filters call events between the given modules.
May be called multiple times.
func FilterDeployments ¶
func FilterDeployments(deploymentKeys ...model.DeploymentKey) EventFilter
func FilterDescending ¶
func FilterDescending() EventFilter
FilterDescending returns events in descending order.
func FilterIDRange ¶
func FilterIDRange(higherThan, lowerThan int64) EventFilter
FilterIDRange filters events between the given IDs, inclusive.
func FilterLogLevel ¶
func FilterLogLevel(level log.Level) EventFilter
func FilterRequests ¶
func FilterRequests(requestKeys ...model.RequestKey) EventFilter
func FilterTimeRange ¶
func FilterTimeRange(olderThan, newerThan time.Time) EventFilter
FilterTimeRange filters events between the given times, inclusive.
Either maybe be zero to indicate no upper or lower bound.
func FilterTypes ¶
func FilterTypes(types ...sql.EventType) EventFilter
type FSMInstance ¶ added in v0.226.0
type IngressRoute ¶
type IngressRouteEntry ¶
type IngressRoutingEntry ¶
type Lease ¶ added in v0.186.0
type Lease struct {
// contains filtered or unexported fields
}
Lease represents a lease that is held by a controller.
type LogEvent ¶
type Notification ¶
type Notification[T NotificationPayload, Key any, KeyP interface { *Key encoding.TextUnmarshaler }] struct { Deleted optional.Option[Key] // If present the object was deleted. Message optional.Option[T] }
A Notification from the database.
func (Notification[T, Key, KeyP]) String ¶
func (n Notification[T, Key, KeyP]) String() string
type NotificationPayload ¶
type NotificationPayload interface {
// contains filtered or unexported methods
}
NotificationPayload is a row from the database.
type Process ¶
type Process struct { Deployment model.DeploymentKey MinReplicas int Labels model.Labels Runner optional.Option[ProcessRunner] }
type ProcessRunner ¶
type Reconciliation ¶
type RequestOrigin ¶
type RequestOrigin string
type Reservation ¶
type Reservation interface { Runner() Runner Commit(ctx context.Context) error Rollback(ctx context.Context) error }
A Reservation of a Runner.
type Route ¶
type RunnerState ¶
type RunnerState string
func RunnerStateFromProto ¶
func RunnerStateFromProto(state ftlv1.RunnerState) RunnerState
func (RunnerState) ToProto ¶
func (s RunnerState) ToProto() ftlv1.RunnerState
type Status ¶
type Status struct { Controllers []Controller Runners []Runner Deployments []Deployment IngressRoutes []IngressRouteEntry Routes []Route }
type Tx ¶ added in v0.217.9
type Tx struct {
*DAL
}
Tx is DAL within a transaction.
func (*Tx) CommitOrRollback ¶ added in v0.217.9
CommitOrRollback can be used in a defer statement to commit or rollback a transaction depending on whether the enclosing function returned an error.
func myFunc() (err error) { tx, err := dal.Begin(ctx) if err != nil { return err } defer tx.CommitOrRollback(ctx, &err) ... }