Documentation ¶
Overview ¶
Package dal provides a data abstraction layer for the Controller.
Index ¶
- Constants
- Variables
- func WithReservation(ctx context.Context, reservation Reservation, fn func() error) error
- type AsyncCall
- type AsyncOrigin
- type AsyncOriginFSM
- type AsyncOriginPubSub
- type CallEvent
- type Controller
- type ControllerState
- type DAL
- func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error)
- func (d *DAL) AcquireFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) (*FSMInstance, error)
- func (d *DAL) AcquireLease(ctx context.Context, key leases.Key, ttl time.Duration, ...) (leases.Lease, context.Context, error)
- func (d *DAL) Begin(ctx context.Context) (*Tx, error)
- func (d *DAL) CompleteAsyncCall(ctx context.Context, call *AsyncCall, result either.Either[[]byte, string], ...) (didScheduleAnotherCall bool, err error)
- func (d *DAL) CompleteEventForSubscription(ctx context.Context, module, name string) error
- func (d *DAL) CreateArtefact(ctx context.Context, content []byte) (digest sha256.SHA256, err error)
- func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, ...) (key model.DeploymentKey, err error)
- func (d *DAL) CreateRequest(ctx context.Context, key model.RequestKey, addr string) error
- func (d *DAL) DeleteOldEvents(ctx context.Context, eventType EventType, age time.Duration) (int64, error)
- func (d *DAL) DeregisterRunner(ctx context.Context, key model.RunnerKey) error
- func (d *DAL) ExpireLeases(ctx context.Context) error
- func (d *DAL) ExpireRunnerClaims(ctx context.Context) (int64, error)
- func (d *DAL) FailFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) error
- func (d *DAL) FinishFSMTransition(ctx context.Context, fsm schema.RefKey, instanceKey string) error
- func (d *DAL) GetActiveControllers(ctx context.Context) ([]Controller, error)
- func (d *DAL) GetActiveDeploymentSchemas(ctx context.Context) ([]*schema.Module, error)
- func (d *DAL) GetActiveDeployments(ctx context.Context) ([]Deployment, error)
- func (d *DAL) GetActiveRunners(ctx context.Context) ([]Runner, error)
- func (d *DAL) GetActiveSchema(ctx context.Context) (*schema.Schema, error)
- func (d *DAL) GetDeployment(ctx context.Context, key model.DeploymentKey) (*model.Deployment, error)
- func (d *DAL) GetDeploymentsNeedingReconciliation(ctx context.Context) ([]Reconciliation, error)
- func (d *DAL) GetDeploymentsWithMinReplicas(ctx context.Context) ([]Deployment, error)
- func (d *DAL) GetIdleRunners(ctx context.Context, limit int, labels model.Labels) ([]Runner, error)
- func (d *DAL) GetIngressRoutes(ctx context.Context, method string) ([]IngressRoute, error)
- func (d *DAL) GetLeaseInfo(ctx context.Context, key leases.Key, metadata any) (expiry time.Time, err error)
- func (d *DAL) GetMissingArtefacts(ctx context.Context, digests []sha256.SHA256) ([]sha256.SHA256, error)
- func (d *DAL) GetProcessList(ctx context.Context) ([]Process, error)
- func (d *DAL) GetRoutingTable(ctx context.Context, modules []string) (map[string][]Route, error)
- func (d *DAL) GetRunner(ctx context.Context, runnerKey model.RunnerKey) (Runner, error)
- func (d *DAL) GetRunnerState(ctx context.Context, runnerKey model.RunnerKey) (RunnerState, error)
- func (d *DAL) GetRunnersForDeployment(ctx context.Context, deployment model.DeploymentKey) ([]Runner, error)
- func (d *DAL) GetStatus(ctx context.Context) (Status, error)
- func (d *DAL) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]model.Subscription, error)
- func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error
- func (d *DAL) InsertLogEvent(ctx context.Context, log *LogEvent) error
- func (d *DAL) KillStaleControllers(ctx context.Context, age time.Duration) (int64, error)
- func (d *DAL) KillStaleRunners(ctx context.Context, age time.Duration) (int64, error)
- func (d *DAL) LoadAsyncCall(ctx context.Context, id int64) (*AsyncCall, error)
- func (d *DAL) PollDeployments(ctx context.Context)
- func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay time.Duration) (count int, err error)
- func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic, caller string, payload []byte) error
- func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter) ([]Event, error)
- func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.DeploymentKey, minReplicas int) (err error)
- func (d *DAL) ReserveRunnerForDeployment(ctx context.Context, deployment model.DeploymentKey, ...) (Reservation, error)
- func (d *DAL) ResetSubscription(ctx context.Context, module, name string) (err error)
- func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int) error
- func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, executionKey string, ...) (err error)
- func (d *DAL) SucceedFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) error
- func (d *DAL) UpsertController(ctx context.Context, key model.ControllerKey, addr string) (int64, error)
- func (d *DAL) UpsertModule(ctx context.Context, language, name string) (err error)
- func (d *DAL) UpsertRunner(ctx context.Context, runner Runner) error
- type Deployment
- type DeploymentArtefact
- type DeploymentCreatedEvent
- type DeploymentNotification
- type DeploymentUpdatedEvent
- type Encryptors
- type Event
- type EventFilter
- func FilterCall(sourceModule optional.Option[string], destModule string, ...) EventFilter
- func FilterDeployments(deploymentKeys ...model.DeploymentKey) EventFilter
- func FilterDescending() EventFilter
- func FilterIDRange(higherThan, lowerThan int64) EventFilter
- func FilterLogLevel(level log.Level) EventFilter
- func FilterRequests(requestKeys ...model.RequestKey) EventFilter
- func FilterTimeRange(olderThan, newerThan time.Time) EventFilter
- func FilterTypes(types ...sql.EventType) EventFilter
- type EventType
- type FSMInstance
- type FSMStatus
- type IngressRoute
- type IngressRouteEntry
- type IngressRoutingEntry
- type Lease
- type LogEvent
- type Notification
- type NotificationPayload
- type Process
- type ProcessRunner
- type Reconciliation
- type RequestOrigin
- type Reservation
- type Route
- type Runner
- type RunnerState
- type Status
- type Tx
Constants ¶
const ( RunnerStateIdle = RunnerState(sql.RunnerStateIdle) RunnerStateReserved = RunnerState(sql.RunnerStateReserved) RunnerStateAssigned = RunnerState(sql.RunnerStateAssigned) RunnerStateDead = RunnerState(sql.RunnerStateDead) )
Runner states.
const ( ControllerStateLive = ControllerState(sql.ControllerStateLive) ControllerStateDead = ControllerState(sql.ControllerStateDead) )
Controller states.
const ( RequestOriginIngress = RequestOrigin(sql.OriginIngress) RequestOriginCron = RequestOrigin(sql.OriginCron) RequestOriginPubsub = RequestOrigin(sql.OriginPubsub) )
const ( EventTypeLog = sql.EventTypeLog EventTypeCall = sql.EventTypeCall EventTypeDeploymentCreated = sql.EventTypeDeploymentCreated EventTypeDeploymentUpdated = sql.EventTypeDeploymentUpdated )
Supported event types.
const ( FSMStatusRunning = sql.FsmStatusRunning FSMStatusCompleted = sql.FsmStatusCompleted FSMStatusFailed = sql.FsmStatusFailed )
Variables ¶
var ErrReplaceDeploymentAlreadyActive = errors.New("deployment already active")
Functions ¶
func WithReservation ¶
func WithReservation(ctx context.Context, reservation Reservation, fn func() error) error
Types ¶
type AsyncCall ¶ added in v0.194.0
type AsyncCall struct { *Lease // May be nil ID int64 Origin AsyncOrigin Verb schema.RefKey CatchVerb optional.Option[schema.RefKey] Request json.RawMessage ScheduledAt time.Time QueueDepth int64 Error optional.Option[string] RemainingAttempts int32 Backoff time.Duration MaxBackoff time.Duration Catching bool }
type AsyncOrigin ¶ added in v0.226.0
type AsyncOrigin interface { // Origin returns the origin type. Origin() string String() string // contains filtered or unexported methods }
AsyncOrigin is a sum type representing the originator of an async call.
This is used to determine how to handle the result of the async call.
func ParseAsyncOrigin ¶ added in v0.226.0
func ParseAsyncOrigin(origin string) (AsyncOrigin, error)
ParseAsyncOrigin parses an async origin key.
type AsyncOriginFSM ¶ added in v0.226.0
type AsyncOriginFSM struct { FSM schema.RefKey `parser:"'fsm' ':' @@"` Key string `parser:"':' @(~EOF)+"` }
AsyncOriginFSM represents the context for the originator of an FSM async call.
It is in the form fsm:<module>.<name>:<key>
func (AsyncOriginFSM) Origin ¶ added in v0.226.0
func (a AsyncOriginFSM) Origin() string
func (AsyncOriginFSM) String ¶ added in v0.226.0
func (a AsyncOriginFSM) String() string
type AsyncOriginPubSub ¶ added in v0.241.0
AsyncOriginPubSub represents the context for the originator of a PubSub async call.
It is in the form sub:<module>.<subscription_name>
func (AsyncOriginPubSub) Origin ¶ added in v0.241.0
func (a AsyncOriginPubSub) Origin() string
func (AsyncOriginPubSub) String ¶ added in v0.241.0
func (a AsyncOriginPubSub) String() string
type CallEvent ¶
type CallEvent struct { ID int64 DeploymentKey model.DeploymentKey RequestKey optional.Option[model.RequestKey] Time time.Time SourceVerb optional.Option[schema.Ref] DestVerb schema.Ref Duration time.Duration Request json.RawMessage Response json.RawMessage Error optional.Option[string] Stack optional.Option[string] }
type Controller ¶
type Controller struct { Key model.ControllerKey Endpoint string State ControllerState }
type ControllerState ¶
type ControllerState string
type DAL ¶
type DAL struct { // DeploymentChanges is a Topic that receives changes to the deployments table. DeploymentChanges *pubsub.Topic[DeploymentNotification] // contains filtered or unexported fields }
func (*DAL) AcquireAsyncCall ¶ added in v0.191.0
AcquireAsyncCall acquires a pending async call to execute.
Returns ErrNotFound if there are no async calls to acquire.
func (*DAL) AcquireFSMInstance ¶ added in v0.226.0
func (d *DAL) AcquireFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) (*FSMInstance, error)
AcquireFSMInstance returns an FSM instance, also acquiring a lease on it.
The lease must be released by the caller.
func (*DAL) AcquireLease ¶ added in v0.186.0
func (d *DAL) AcquireLease(ctx context.Context, key leases.Key, ttl time.Duration, metadata optional.Option[any]) (leases.Lease, context.Context, error)
AcquireLease acquires a lease for the given key.
Will return leases.ErrConflict (not dalerrs.ErrConflict) if the lease is already held by another controller.
The returned context will be cancelled when the lease fails to renew.
func (*DAL) CompleteAsyncCall ¶ added in v0.194.0
func (d *DAL) CompleteAsyncCall(ctx context.Context, call *AsyncCall, result either.Either[[]byte, string], finalise func(tx *Tx, isFinalResult bool) error) (didScheduleAnotherCall bool, err error)
CompleteAsyncCall completes an async call.
"result" is either a []byte representing the successful response, or a string representing a failure message.
func (*DAL) CompleteEventForSubscription ¶ added in v0.241.0
func (*DAL) CreateArtefact ¶
CreateArtefact inserts a new artefact into the database and returns its SHA256 digest.
func (*DAL) CreateDeployment ¶
func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []DeploymentArtefact, ingressRoutes []IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error)
CreateDeployment (possibly) creates a new deployment and associates previously created artefacts with it.
If an existing deployment with identical artefacts exists, it is returned.
func (*DAL) CreateRequest ¶ added in v0.167.0
func (*DAL) DeleteOldEvents ¶ added in v0.323.0
func (*DAL) DeregisterRunner ¶
DeregisterRunner deregisters the given runner.
func (*DAL) ExpireLeases ¶ added in v0.186.0
ExpireLeases expires (deletes) all leases that have expired.
func (*DAL) ExpireRunnerClaims ¶
func (*DAL) FailFSMInstance ¶ added in v0.226.0
func (*DAL) FinishFSMTransition ¶ added in v0.226.0
func (*DAL) GetActiveControllers ¶ added in v0.177.2
func (d *DAL) GetActiveControllers(ctx context.Context) ([]Controller, error)
func (*DAL) GetActiveDeploymentSchemas ¶ added in v0.79.3
func (*DAL) GetActiveDeployments ¶
func (d *DAL) GetActiveDeployments(ctx context.Context) ([]Deployment, error)
GetActiveDeployments returns all active deployments.
func (*DAL) GetActiveRunners ¶
func (*DAL) GetActiveSchema ¶ added in v0.276.3
GetActiveSchema returns the schema for all active deployments.
func (*DAL) GetDeployment ¶
func (d *DAL) GetDeployment(ctx context.Context, key model.DeploymentKey) (*model.Deployment, error)
func (*DAL) GetDeploymentsNeedingReconciliation ¶
func (d *DAL) GetDeploymentsNeedingReconciliation(ctx context.Context) ([]Reconciliation, error)
GetDeploymentsNeedingReconciliation returns deployments that have a mismatch between the number of assigned and required replicas.
func (*DAL) GetDeploymentsWithMinReplicas ¶ added in v0.163.10
func (d *DAL) GetDeploymentsWithMinReplicas(ctx context.Context) ([]Deployment, error)
func (*DAL) GetIdleRunners ¶
GetIdleRunners returns up to limit idle runners matching the given labels.
"labels" is a single-level map of key-value pairs. Values may be scalars or lists of scalars. If a value is a list, it will match the labels if all the values in the list match.
e.g. '{"languages": ["kotlin"], "arch": "arm64"}' will match a runner with the labels '{"languages": ["go", "kotlin"], "os": "linux", "arch": "arm64", "pid": 1234}'
If no runners are available, it will return an empty slice.
func (*DAL) GetIngressRoutes ¶
func (*DAL) GetLeaseInfo ¶ added in v0.263.0
func (d *DAL) GetLeaseInfo(ctx context.Context, key leases.Key, metadata any) (expiry time.Time, err error)
GetLeaseInfo returns the metadata and expiry time for the lease with the given key.
metadata should be a pointer to the type that metadata should be unmarshaled into.
func (*DAL) GetMissingArtefacts ¶
func (d *DAL) GetMissingArtefacts(ctx context.Context, digests []sha256.SHA256) ([]sha256.SHA256, error)
GetMissingArtefacts returns the digests of the artefacts that are missing from the database.
func (*DAL) GetProcessList ¶
GetProcessList returns a list of all "processes".
func (*DAL) GetRoutingTable ¶
GetRoutingTable returns the endpoints for all runners for the given modules, or all routes if modules is empty.
Returns a route map keyed by module.
func (*DAL) GetRunnerState ¶
func (*DAL) GetRunnersForDeployment ¶
func (*DAL) GetSubscriptionsNeedingUpdate ¶ added in v0.241.0
func (*DAL) InsertCallEvent ¶
func (*DAL) KillStaleControllers ¶
KillStaleControllers deletes controllers that have not had heartbeats for the given duration.
func (*DAL) KillStaleRunners ¶
KillStaleRunners deletes runners that have not had heartbeats for the given duration.
func (*DAL) LoadAsyncCall ¶ added in v0.194.0
func (*DAL) PollDeployments ¶ added in v0.296.3
func (*DAL) ProgressSubscriptions ¶ added in v0.244.0
func (*DAL) PublishEventForTopic ¶ added in v0.239.0
func (*DAL) QueryEvents ¶
func (*DAL) ReplaceDeployment ¶
func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.DeploymentKey, minReplicas int) (err error)
ReplaceDeployment replaces an old deployment of a module with a new deployment.
Returns ErrReplaceDeploymentAlreadyActive if the new deployment is already active.
func (*DAL) ReserveRunnerForDeployment ¶
func (d *DAL) ReserveRunnerForDeployment(ctx context.Context, deployment model.DeploymentKey, reservationTimeout time.Duration, labels model.Labels) (Reservation, error)
ReserveRunnerForDeployment reserves a runner for the given deployment.
It returns a Reservation that must be committed or rolled back.
func (*DAL) ResetSubscription ¶ added in v0.312.0
ResetSubscription resets the subscription cursor to the topic's head.
func (*DAL) SetDeploymentReplicas ¶
func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int) error
SetDeploymentReplicas activates the given deployment.
func (*DAL) StartFSMTransition ¶ added in v0.226.0
func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, executionKey string, destinationState schema.RefKey, request json.RawMessage, retryParams schema.RetryParams) (err error)
StartFSMTransition sends an event to an executing instance of an FSM.
If the instance doesn't exist a new one will be created.
[fsm] is the name of the state machine to execute, [executionKey] is the unique identifier for this execution of the FSM.
Returns ErrConflict if the state machine is already executing a transition.
Note: this does not actually call the FSM, it just enqueues an async call for future execution.
Note: no validation of the FSM is performed.
func (*DAL) SucceedFSMInstance ¶ added in v0.226.0
func (*DAL) UpsertController ¶
func (*DAL) UpsertModule ¶
type Deployment ¶
type Deployment struct { Key model.DeploymentKey Language string Module string MinReplicas int Replicas optional.Option[int] // Depending on the query this may or may not be populated. Schema *schema.Module CreatedAt time.Time Labels model.Labels }
func (Deployment) String ¶
func (d Deployment) String() string
type DeploymentArtefact ¶
func DeploymentArtefactFromProto ¶
func DeploymentArtefactFromProto(in *ftlv1.DeploymentArtefact) (DeploymentArtefact, error)
func (*DeploymentArtefact) ToProto ¶
func (d *DeploymentArtefact) ToProto() *ftlv1.DeploymentArtefact
type DeploymentCreatedEvent ¶
type DeploymentCreatedEvent struct { ID int64 DeploymentKey model.DeploymentKey Time time.Time Language string ModuleName string MinReplicas int ReplacedDeployment optional.Option[model.DeploymentKey] }
func (*DeploymentCreatedEvent) GetID ¶
func (e *DeploymentCreatedEvent) GetID() int64
type DeploymentNotification ¶
type DeploymentNotification = Notification[Deployment, model.DeploymentKey, *model.DeploymentKey]
DeploymentNotification is a notification from the database when a deployment changes.
type DeploymentUpdatedEvent ¶
type DeploymentUpdatedEvent struct { ID int64 DeploymentKey model.DeploymentKey Time time.Time MinReplicas int PrevMinReplicas int }
func (*DeploymentUpdatedEvent) GetID ¶
func (e *DeploymentUpdatedEvent) GetID() int64
type Encryptors ¶ added in v0.319.0
type Encryptors struct { Logs encryption.Encryptable Async encryption.Encryptable }
func NoOpEncryptors ¶ added in v0.319.0
func NoOpEncryptors() *Encryptors
NoOpEncryptors returns Encryptors that do not encrypt potentially sensitive data.
type Event ¶
type Event interface { GetID() int64 // contains filtered or unexported methods }
Event types.
type EventFilter ¶
type EventFilter func(query *eventFilter)
func FilterCall ¶
func FilterCall(sourceModule optional.Option[string], destModule string, destVerb optional.Option[string]) EventFilter
FilterCall filters call events between the given modules.
May be called multiple times.
func FilterDeployments ¶
func FilterDeployments(deploymentKeys ...model.DeploymentKey) EventFilter
func FilterDescending ¶
func FilterDescending() EventFilter
FilterDescending returns events in descending order.
func FilterIDRange ¶
func FilterIDRange(higherThan, lowerThan int64) EventFilter
FilterIDRange filters events between the given IDs, inclusive.
func FilterLogLevel ¶
func FilterLogLevel(level log.Level) EventFilter
func FilterRequests ¶
func FilterRequests(requestKeys ...model.RequestKey) EventFilter
func FilterTimeRange ¶
func FilterTimeRange(olderThan, newerThan time.Time) EventFilter
FilterTimeRange filters events between the given times, inclusive.
Either may be zero to indicate no upper or lower bound.
func FilterTypes ¶
func FilterTypes(types ...sql.EventType) EventFilter
type FSMInstance ¶ added in v0.226.0
type IngressRoute ¶
type IngressRouteEntry ¶
type IngressRoutingEntry ¶
type Lease ¶ added in v0.186.0
type Lease struct {
// contains filtered or unexported fields
}
Lease represents a lease that is held by a controller.
type LogEvent ¶
type Notification ¶
type Notification[T NotificationPayload, Key any, KeyP interface { *Key encoding.TextUnmarshaler }] struct { Deleted optional.Option[Key] // If present the object was deleted. Message optional.Option[T] }
A Notification from the database.
func (Notification[T, Key, KeyP]) String ¶
func (n Notification[T, Key, KeyP]) String() string
type NotificationPayload ¶
type NotificationPayload interface {
// contains filtered or unexported methods
}
NotificationPayload is a row from the database.
type Process ¶
type Process struct { Deployment model.DeploymentKey MinReplicas int Labels model.Labels Runner optional.Option[ProcessRunner] }
type ProcessRunner ¶
type Reconciliation ¶
type RequestOrigin ¶
type RequestOrigin string
type Reservation ¶
type Reservation interface { Runner() Runner Commit(ctx context.Context) error Rollback(ctx context.Context) error }
A Reservation of a Runner.
type Route ¶
type RunnerState ¶
type RunnerState string
func RunnerStateFromProto ¶
func RunnerStateFromProto(state ftlv1.RunnerState) RunnerState
func (RunnerState) ToProto ¶
func (s RunnerState) ToProto() ftlv1.RunnerState
type Status ¶
type Status struct { Controllers []Controller Runners []Runner Deployments []Deployment IngressRoutes []IngressRouteEntry Routes []Route }
type Tx ¶ added in v0.217.9
type Tx struct {
*DAL
}
Tx is DAL within a transaction.
func (*Tx) CommitOrRollback ¶ added in v0.217.9
CommitOrRollback can be used in a defer statement to commit or rollback a transaction depending on whether the enclosing function returned an error.
func myFunc() (err error) { tx, err := dal.Begin(ctx) if err != nil { return err } defer tx.CommitOrRollback(ctx, &err) ... }