Documentation ¶
Overview ¶
Package dal provides a data abstraction layer for the Controller.
Index ¶
- Constants
- Variables
- type AsyncCall
- type DAL
- func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, leaseCtx context.Context, err error)
- func (d *DAL) AcquireFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) (*FSMInstance, error)
- func (d *DAL) CompleteAsyncCall(ctx context.Context, call *AsyncCall, result either.Either[[]byte, string], ...) (didScheduleAnotherCall bool, err error)
- func (d *DAL) CreateArtefact(ctx context.Context, content []byte) (digest sha256.SHA256, err error)
- func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, ...) (key model.DeploymentKey, err error)
- func (d *DAL) CreateRequest(ctx context.Context, key model.RequestKey, addr string) error
- func (d *DAL) DeregisterRunner(ctx context.Context, key model.RunnerKey) error
- func (d *DAL) FailFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) error
- func (d *DAL) FinishFSMTransition(ctx context.Context, instance *FSMInstance) (*FSMInstance, error)
- func (d *DAL) GetActiveControllers(ctx context.Context) ([]dalmodel.Controller, error)
- func (d *DAL) GetActiveDeploymentSchemas(ctx context.Context) ([]*schema.Module, error)
- func (d *DAL) GetActiveDeployments(ctx context.Context) ([]dalmodel.Deployment, error)
- func (d *DAL) GetActiveRunners(ctx context.Context) ([]dalmodel.Runner, error)
- func (d *DAL) GetActiveSchema(ctx context.Context) (*schema.Schema, error)
- func (d *DAL) GetDeployment(ctx context.Context, key model.DeploymentKey) (*model.Deployment, error)
- func (d *DAL) GetDeploymentsWithMinReplicas(ctx context.Context) ([]dalmodel.Deployment, error)
- func (d *DAL) GetFSMStates(ctx context.Context, fsm schema.RefKey, instanceKey string) (currentState, destinationState optional.Option[schema.RefKey], err error)
- func (d *DAL) GetIngressRoutes(ctx context.Context) (map[string][]dalmodel.IngressRoute, error)
- func (d *DAL) GetMissingArtefacts(ctx context.Context, digests []sha256.SHA256) ([]sha256.SHA256, error)
- func (d *DAL) GetProcessList(ctx context.Context) ([]Process, error)
- func (d *DAL) GetRunner(ctx context.Context, runnerKey model.RunnerKey) (dalmodel.Runner, error)
- func (d *DAL) GetRunnersForDeployment(ctx context.Context, deployment model.DeploymentKey) ([]dalmodel.Runner, error)
- func (d *DAL) GetStatus(ctx context.Context) (dalmodel.Status, error)
- func (d *DAL) GetZombieAsyncCalls(ctx context.Context, limit int) ([]*AsyncCall, error)
- func (d *DAL) KillStaleControllers(ctx context.Context, age time.Duration) (int64, error)
- func (d *DAL) KillStaleRunners(ctx context.Context, age time.Duration) (int64, error)
- func (d *DAL) LoadAsyncCall(ctx context.Context, id int64) (*AsyncCall, error)
- func (d *DAL) PollDeployments(ctx context.Context)
- func (d *DAL) PopNextFSMEvent(ctx context.Context, fsm schema.RefKey, instanceKey string) (optional.Option[NextFSMEvent], error)
- func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.DeploymentKey, minReplicas int) (err error)
- func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int) (err error)
- func (d *DAL) SetNextFSMEvent(ctx context.Context, fsm schema.RefKey, instanceKey string, ...) error
- func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, instanceKey string, ...) (err error)
- func (d *DAL) SucceedFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) error
- func (d *DAL) UpsertController(ctx context.Context, key model.ControllerKey, addr string) (int64, error)
- func (d *DAL) UpsertModule(ctx context.Context, language, name string) (err error)
- func (d *DAL) UpsertRunner(ctx context.Context, runner dalmodel.Runner) error
- type DeploymentNotification
- type FSMInstance
- type FSMStatus
- type IngressRoutingEntry
- type NextFSMEvent
- type Process
- type ProcessRunner
- type Reservation
Constants ¶
const ( FSMStatusRunning = sql2.FsmStatusRunning FSMStatusCompleted = sql2.FsmStatusCompleted FSMStatusFailed = sql2.FsmStatusFailed )
Variables ¶
var ErrReplaceDeploymentAlreadyActive = errors.New("deployment already active")
Functions ¶
This section is empty.
Types ¶
type AsyncCall ¶ added in v0.194.0
type AsyncCall struct { *leasedal.Lease // May be nil ID int64 Origin async.AsyncOrigin Verb schema.RefKey CatchVerb optional.Option[schema.RefKey] Request []byte ScheduledAt time.Time QueueDepth int64 ParentRequestKey optional.Option[string] TraceContext []byte Error optional.Option[string] RemainingAttempts int32 Backoff time.Duration MaxBackoff time.Duration Catching bool }
type DAL ¶
type DAL struct { *libdal.Handle[DAL] // DeploymentChanges is a Topic that receives changes to the deployments table. DeploymentChanges *inprocesspubsub.Topic[DeploymentNotification] // contains filtered or unexported fields }
func New ¶
func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Service, pubsub *pubsub.Service) *DAL
func (*DAL) AcquireAsyncCall ¶ added in v0.191.0
func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, leaseCtx context.Context, err error)
AcquireAsyncCall acquires a pending async call to execute.
Returns ErrNotFound if there are no async calls to acquire.
func (*DAL) AcquireFSMInstance ¶ added in v0.226.0
func (d *DAL) AcquireFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) (*FSMInstance, error)
AcquireFSMInstance returns an FSM instance, also acquiring a lease on it.
The lease must be released by the caller.
func (*DAL) CompleteAsyncCall ¶ added in v0.194.0
func (d *DAL) CompleteAsyncCall(ctx context.Context, call *AsyncCall, result either.Either[[]byte, string], finalise func(tx *DAL, isFinalResult bool) error) (didScheduleAnotherCall bool, err error)
CompleteAsyncCall completes an async call. The call will use the existing transaction if d is a transaction. Otherwise it will create and commit a new transaction.
"result" is either a []byte representing the successful response, or a string representing a failure message.
func (*DAL) CreateArtefact ¶
CreateArtefact inserts a new artefact into the database and returns its SHA-256 digest.
func (*DAL) CreateDeployment ¶
func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []dalmodel.DeploymentArtefact, ingressRoutes []IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error)
CreateDeployment (possibly) creates a new deployment and associates previously created artefacts with it.
If a deployment with identical artefacts already exists, its key is returned.
func (*DAL) CreateRequest ¶ added in v0.167.0
func (*DAL) DeregisterRunner ¶
DeregisterRunner deregisters the given runner.
func (*DAL) FailFSMInstance ¶ added in v0.226.0
func (*DAL) FinishFSMTransition ¶ added in v0.226.0
func (d *DAL) FinishFSMTransition(ctx context.Context, instance *FSMInstance) (*FSMInstance, error)
FinishFSMTransition marks an FSM transition as completed.
func (*DAL) GetActiveControllers ¶ added in v0.177.2
func (*DAL) GetActiveDeploymentSchemas ¶ added in v0.79.3
func (*DAL) GetActiveDeployments ¶
GetActiveDeployments returns all active deployments.
func (*DAL) GetActiveRunners ¶
func (*DAL) GetActiveSchema ¶ added in v0.276.3
GetActiveSchema returns the schema for all active deployments.
func (*DAL) GetDeployment ¶
func (d *DAL) GetDeployment(ctx context.Context, key model.DeploymentKey) (*model.Deployment, error)
func (*DAL) GetDeploymentsWithMinReplicas ¶ added in v0.163.10
func (*DAL) GetFSMStates ¶ added in v0.337.0
func (*DAL) GetIngressRoutes ¶
func (*DAL) GetMissingArtefacts ¶
func (d *DAL) GetMissingArtefacts(ctx context.Context, digests []sha256.SHA256) ([]sha256.SHA256, error)
GetMissingArtefacts returns the digests of the artefacts that are missing from the database.
func (*DAL) GetProcessList ¶
GetProcessList returns a list of all "processes".
func (*DAL) GetRunnersForDeployment ¶
func (*DAL) GetZombieAsyncCalls ¶ added in v0.360.0
func (*DAL) KillStaleControllers ¶
KillStaleControllers deletes controllers that have not had heartbeats for the given duration.
func (*DAL) KillStaleRunners ¶
KillStaleRunners deletes runners that have not had heartbeats for the given duration.
func (*DAL) LoadAsyncCall ¶ added in v0.194.0
func (*DAL) PollDeployments ¶ added in v0.296.3
func (*DAL) PopNextFSMEvent ¶ added in v0.337.0
func (d *DAL) PopNextFSMEvent(ctx context.Context, fsm schema.RefKey, instanceKey string) (optional.Option[NextFSMEvent], error)
PopNextFSMEvent returns the next event for an FSM instance, if any, and deletes it.
func (*DAL) ReplaceDeployment ¶
func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.DeploymentKey, minReplicas int) (err error)
ReplaceDeployment replaces an old deployment of a module with a new deployment.
Returns ErrReplaceDeploymentAlreadyActive if the new deployment is already active.
func (*DAL) SetDeploymentReplicas ¶
func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int) (err error)
SetDeploymentReplicas activates the given deployment by setting its minimum number of replicas to minReplicas.
func (*DAL) SetNextFSMEvent ¶ added in v0.337.0
func (*DAL) StartFSMTransition ¶ added in v0.226.0
func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, instanceKey string, destinationState schema.RefKey, request []byte, encrypted bool, retryParams schema.RetryParams) (err error)
StartFSMTransition sends an event to an executing instance of an FSM.
If the instance doesn't exist a new one will be created.
[fsm] is the name of the state machine to execute; [instanceKey] is the unique identifier for this execution of the FSM.
Returns ErrConflict if the state machine is already executing a transition.
Note: this does not actually call the FSM, it just enqueues an async call for future execution.
Note: no validation of the FSM is performed.
func (*DAL) SucceedFSMInstance ¶ added in v0.226.0
func (*DAL) UpsertController ¶
func (*DAL) UpsertModule ¶
type DeploymentNotification ¶
type DeploymentNotification = dalmodel.Notification[dalmodel.Deployment, model.DeploymentKey, *model.DeploymentKey]
DeploymentNotification is a notification from the database when a deployment changes.
type FSMInstance ¶ added in v0.226.0
type IngressRoutingEntry ¶
type NextFSMEvent ¶ added in v0.337.0
type Process ¶
type Process struct { Deployment model.DeploymentKey MinReplicas int Labels model.Labels Runner optional.Option[ProcessRunner] }