dal

package
v0.372.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2024 License: Apache-2.0 Imports: 33 Imported by: 0

Documentation

Overview

Package dal provides a data abstraction layer for the Controller

Index

Constants

View Source
const (
	FSMStatusRunning   = sql2.FsmStatusRunning
	FSMStatusCompleted = sql2.FsmStatusCompleted
	FSMStatusFailed    = sql2.FsmStatusFailed
)

Variables

View Source
var ErrReplaceDeploymentAlreadyActive = errors.New("deployment already active")

Functions

This section is empty.

Types

type AsyncCall added in v0.194.0

type AsyncCall struct {
	*leasedal.Lease  // May be nil
	ID               int64
	Origin           async.AsyncOrigin
	Verb             schema.RefKey
	CatchVerb        optional.Option[schema.RefKey]
	Request          []byte
	ScheduledAt      time.Time
	QueueDepth       int64
	ParentRequestKey optional.Option[string]
	TraceContext     []byte

	Error optional.Option[string]

	RemainingAttempts int32
	Backoff           time.Duration
	MaxBackoff        time.Duration
	Catching          bool
}

type DAL

type DAL struct {
	*libdal.Handle[DAL]

	// DeploymentChanges is a Topic that receives changes to the deployments table.
	DeploymentChanges *inprocesspubsub.Topic[DeploymentNotification]
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Service, pubsub *pubsub.Service) *DAL

func (*DAL) AcquireAsyncCall added in v0.191.0

func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, leaseCtx context.Context, err error)

AcquireAsyncCall acquires a pending async call to execute.

Returns ErrNotFound if there are no async calls to acquire.

func (*DAL) AcquireFSMInstance added in v0.226.0

func (d *DAL) AcquireFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) (*FSMInstance, error)

AcquireFSMInstance returns an FSM instance, also acquiring a lease on it.

The lease must be released by the caller.

func (*DAL) CompleteAsyncCall added in v0.194.0

func (d *DAL) CompleteAsyncCall(ctx context.Context,
	call *AsyncCall,
	result either.Either[[]byte, string],
	finalise func(tx *DAL, isFinalResult bool) error) (didScheduleAnotherCall bool, err error)

CompleteAsyncCall completes an async call. The call will use the existing transaction if d is a transaction. Otherwise it will create and commit a new transaction.

"result" is either a []byte representing the successful response, or a string representing a failure message.

func (*DAL) CreateArtefact

func (d *DAL) CreateArtefact(ctx context.Context, content []byte) (digest sha256.SHA256, err error)

CreateArtefact inserts a new artefact into the database and returns its ID.

func (*DAL) CreateDeployment

func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []dalmodel.DeploymentArtefact, ingressRoutes []IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error)

CreateDeployment (possibly) creates a new deployment and associates previously created artefacts with it.

If an existing deployment with identical artefacts exists, it is returned.

func (*DAL) CreateRequest added in v0.167.0

func (d *DAL) CreateRequest(ctx context.Context, key model.RequestKey, addr string) error

func (*DAL) DeregisterRunner

func (d *DAL) DeregisterRunner(ctx context.Context, key model.RunnerKey) error

DeregisterRunner deregisters the given runner.

func (*DAL) FailFSMInstance added in v0.226.0

func (d *DAL) FailFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) error

func (*DAL) FinishFSMTransition added in v0.226.0

func (d *DAL) FinishFSMTransition(ctx context.Context, instance *FSMInstance) (*FSMInstance, error)

FinishFSMTransition marks an FSM transition as completed

func (*DAL) GetActiveControllers added in v0.177.2

func (d *DAL) GetActiveControllers(ctx context.Context) ([]dalmodel.Controller, error)

func (*DAL) GetActiveDeploymentSchemas added in v0.79.3

func (d *DAL) GetActiveDeploymentSchemas(ctx context.Context) ([]*schema.Module, error)

func (*DAL) GetActiveDeployments

func (d *DAL) GetActiveDeployments(ctx context.Context) ([]dalmodel.Deployment, error)

GetActiveDeployments returns all active deployments.

func (*DAL) GetActiveRunners

func (d *DAL) GetActiveRunners(ctx context.Context) ([]dalmodel.Runner, error)

func (*DAL) GetActiveSchema added in v0.276.3

func (d *DAL) GetActiveSchema(ctx context.Context) (*schema.Schema, error)

GetActiveSchema returns the schema for all active deployments.

func (*DAL) GetDeployment

func (d *DAL) GetDeployment(ctx context.Context, key model.DeploymentKey) (*model.Deployment, error)

func (*DAL) GetDeploymentsWithMinReplicas added in v0.163.10

func (d *DAL) GetDeploymentsWithMinReplicas(ctx context.Context) ([]dalmodel.Deployment, error)

func (*DAL) GetFSMStates added in v0.337.0

func (d *DAL) GetFSMStates(ctx context.Context, fsm schema.RefKey, instanceKey string) (currentState, destinationState optional.Option[schema.RefKey], err error)

func (*DAL) GetIngressRoutes

func (d *DAL) GetIngressRoutes(ctx context.Context, method string) ([]dalmodel.IngressRoute, error)

func (*DAL) GetMissingArtefacts

func (d *DAL) GetMissingArtefacts(ctx context.Context, digests []sha256.SHA256) ([]sha256.SHA256, error)

GetMissingArtefacts returns the digests of the artefacts that are missing from the database.

func (*DAL) GetProcessList

func (d *DAL) GetProcessList(ctx context.Context) ([]Process, error)

GetProcessList returns a list of all "processes".

func (*DAL) GetRunner

func (d *DAL) GetRunner(ctx context.Context, runnerKey model.RunnerKey) (dalmodel.Runner, error)

func (*DAL) GetRunnersForDeployment

func (d *DAL) GetRunnersForDeployment(ctx context.Context, deployment model.DeploymentKey) ([]dalmodel.Runner, error)

func (*DAL) GetStatus

func (d *DAL) GetStatus(ctx context.Context) (dalmodel.Status, error)

func (*DAL) GetZombieAsyncCalls added in v0.360.0

func (d *DAL) GetZombieAsyncCalls(ctx context.Context, limit int) ([]*AsyncCall, error)

func (*DAL) KillStaleControllers

func (d *DAL) KillStaleControllers(ctx context.Context, age time.Duration) (int64, error)

KillStaleControllers deletes controllers that have not had heartbeats for the given duration.

func (*DAL) KillStaleRunners

func (d *DAL) KillStaleRunners(ctx context.Context, age time.Duration) (int64, error)

KillStaleRunners deletes runners that have not had heartbeats for the given duration.

func (*DAL) LoadAsyncCall added in v0.194.0

func (d *DAL) LoadAsyncCall(ctx context.Context, id int64) (*AsyncCall, error)

func (*DAL) PollDeployments added in v0.296.3

func (d *DAL) PollDeployments(ctx context.Context)

func (*DAL) PopNextFSMEvent added in v0.337.0

func (d *DAL) PopNextFSMEvent(ctx context.Context, fsm schema.RefKey, instanceKey string) (optional.Option[NextFSMEvent], error)

PopNextFSMEvent returns the next event for an FSM instance, if any, and deletes it.

func (*DAL) ReplaceDeployment

func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.DeploymentKey, minReplicas int) (err error)

ReplaceDeployment replaces an old deployment of a module with a new deployment.

returns ErrReplaceDeploymentAlreadyActive if the new deployment is already active.

func (*DAL) SetDeploymentReplicas

func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int) (err error)

SetDeploymentReplicas activates the given deployment.

func (*DAL) SetNextFSMEvent added in v0.337.0

func (d *DAL) SetNextFSMEvent(ctx context.Context, fsm schema.RefKey, instanceKey string, nextState schema.RefKey, request json.RawMessage, requestType schema.Type) error

func (*DAL) StartFSMTransition added in v0.226.0

func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, instanceKey string, destinationState schema.RefKey, request []byte, encrypted bool, retryParams schema.RetryParams) (err error)

StartFSMTransition sends an event to an executing instance of an FSM.

If the instance doesn't exist a new one will be created.

[name] is the name of the state machine to execute, [executionKey] is the unique identifier for this execution of the FSM.

Returns ErrConflict if the state machine is already executing a transition.

Note: this does not actually call the FSM, it just enqueues an async call for future execution.

Note: no validation of the FSM is performed.

func (*DAL) SucceedFSMInstance added in v0.226.0

func (d *DAL) SucceedFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) error

func (*DAL) UpsertController

func (d *DAL) UpsertController(ctx context.Context, key model.ControllerKey, addr string) (int64, error)

func (*DAL) UpsertModule

func (d *DAL) UpsertModule(ctx context.Context, language, name string) (err error)

func (*DAL) UpsertRunner

func (d *DAL) UpsertRunner(ctx context.Context, runner dalmodel.Runner) error

UpsertRunner registers or updates a new runner.

ErrConflict will be returned if a runner with the same endpoint and a different key already exists.

type DeploymentNotification

DeploymentNotification is a notification from the database when a deployment changes.

type FSMInstance added in v0.226.0

type FSMInstance struct {
	leases.Lease
	// The FSM that this instance is executing.
	FSM schema.RefKey
	// The unique key for this instance.
	Key              string
	Status           FSMStatus
	CurrentState     optional.Option[schema.RefKey]
	DestinationState optional.Option[schema.RefKey]
}

type FSMStatus added in v0.226.0

type FSMStatus = sql2.FsmStatus

type IngressRoutingEntry

type IngressRoutingEntry struct {
	Verb   string
	Method string
	Path   string
}

type NextFSMEvent added in v0.337.0

type NextFSMEvent struct {
	DestinationState schema.RefKey
	Request          json.RawMessage
	RequestType      schema.Type
}

type Process

type Process struct {
	Deployment  model.DeploymentKey
	MinReplicas int
	Labels      model.Labels
	Runner      optional.Option[ProcessRunner]
}

type ProcessRunner

type ProcessRunner struct {
	Key      model.RunnerKey
	Endpoint string
	Labels   model.Labels
}

type Reservation

type Reservation interface {
	Runner() dalmodel.Runner
	Commit(ctx context.Context) error
	Rollback(ctx context.Context) error
}

A Reservation of a Runner.

Directories

Path Synopsis
internal
sql

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL