dal

package
v0.409.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2024 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Overview

Package dal provides a data abstraction layer for the Controller

Index

Constants

This section is empty.

Variables

View Source
var ErrReplaceDeploymentAlreadyActive = errors.New("deployment already active")

Functions

This section is empty.

Types

type AsyncCall added in v0.194.0

type AsyncCall struct {
	ID               int64
	Origin           async.AsyncOrigin
	Verb             schema.RefKey
	CatchVerb        optional.Option[schema.RefKey]
	Request          json.RawMessage
	ScheduledAt      time.Time
	QueueDepth       int64
	ParentRequestKey optional.Option[string]
	TraceContext     []byte

	Error optional.Option[string]

	RemainingAttempts int32
	Backoff           time.Duration
	MaxBackoff        time.Duration
	Catching          bool
}

type DAL

type DAL struct {
	*libdal.Handle[DAL]

	// DeploymentChanges is a Topic that receives changes to the deployments table.
	DeploymentChanges *inprocesspubsub.Topic[DeploymentNotification]
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, conn libdal.Connection, pubsub *pubsub.Service, registry aregistry.Service) *DAL

func (*DAL) AcquireAsyncCall added in v0.191.0

func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, leaseCtx context.Context, err error)

AcquireAsyncCall acquires a pending async call to execute.

Returns ErrNotFound if there are no async calls to acquire.

func (*DAL) CompleteAsyncCall added in v0.194.0

func (d *DAL) CompleteAsyncCall(ctx context.Context,
	call *AsyncCall,
	result either.Either[[]byte, string],
	finalise func(tx *DAL, isFinalResult bool) error) (didScheduleAnotherCall bool, err error)

CompleteAsyncCall completes an async call. The call will use the existing transaction if d is a transaction. Otherwise it will create and commit a new transaction.

"result" is either a []byte representing the successful response, or a string representing a failure message.

func (*DAL) CreateDeployment

func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []dalmodel.DeploymentArtefact) (key model.DeploymentKey, err error)

CreateDeployment (possibly) creates a new deployment and associates previously created artefacts with it.

If an existing deployment with identical artefacts exists, it is returned.

func (*DAL) DeregisterRunner

func (d *DAL) DeregisterRunner(ctx context.Context, key model.RunnerKey) error

DeregisterRunner deregisters the given runner.

func (*DAL) GetActiveDeploymentSchemas added in v0.79.3

func (d *DAL) GetActiveDeploymentSchemas(ctx context.Context) ([]*schema.Module, error)

func (*DAL) GetActiveDeploymentSchemasByDeploymentKey added in v0.409.0

func (d *DAL) GetActiveDeploymentSchemasByDeploymentKey(ctx context.Context) (map[string]*schema.Module, error)

GetActiveDeploymentSchemasByDeploymentKey returns the schema for all active deployments by deployment key.

model.DeploymentKey is not used directly as a key as it's not a valid map key.

func (*DAL) GetActiveDeployments

func (d *DAL) GetActiveDeployments(ctx context.Context) ([]dalmodel.Deployment, error)

GetActiveDeployments returns all active deployments.

func (*DAL) GetActiveRunners

func (d *DAL) GetActiveRunners(ctx context.Context) ([]dalmodel.Runner, error)

func (*DAL) GetActiveSchema added in v0.276.3

func (d *DAL) GetActiveSchema(ctx context.Context) (*schema.Schema, error)

GetActiveSchema returns the schema for all active deployments.

func (*DAL) GetDeployment

func (d *DAL) GetDeployment(ctx context.Context, key model.DeploymentKey) (*model.Deployment, error)

func (*DAL) GetDeploymentsWithMinReplicas added in v0.163.10

func (d *DAL) GetDeploymentsWithMinReplicas(ctx context.Context) ([]dalmodel.Deployment, error)

func (*DAL) GetProcessList

func (d *DAL) GetProcessList(ctx context.Context) ([]Process, error)

GetProcessList returns a list of all "processes".

func (*DAL) GetRunner

func (d *DAL) GetRunner(ctx context.Context, runnerKey model.RunnerKey) (dalmodel.Runner, error)

func (*DAL) GetRunnersForDeployment

func (d *DAL) GetRunnersForDeployment(ctx context.Context, deployment model.DeploymentKey) ([]dalmodel.Runner, error)

func (*DAL) GetStatus

func (d *DAL) GetStatus(ctx context.Context, controller dalmodel.Controller) (dalmodel.Status, error)

func (*DAL) GetZombieAsyncCalls added in v0.360.0

func (d *DAL) GetZombieAsyncCalls(ctx context.Context, limit int) ([]*AsyncCall, error)

func (*DAL) KillStaleRunners

func (d *DAL) KillStaleRunners(ctx context.Context, age time.Duration) (int64, error)

KillStaleRunners deletes runners that have not had heartbeats for the given duration.

func (*DAL) LoadAsyncCall added in v0.194.0

func (d *DAL) LoadAsyncCall(ctx context.Context, id int64) (*AsyncCall, error)

func (*DAL) PollDeployments added in v0.296.3

func (d *DAL) PollDeployments(ctx context.Context)

func (*DAL) ReplaceDeployment

func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.DeploymentKey, minReplicas int) (err error)

ReplaceDeployment replaces an old deployment of a module with a new deployment.

returns ErrReplaceDeploymentAlreadyActive if the new deployment is already active.

func (*DAL) SetDeploymentReplicas

func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int) (err error)

SetDeploymentReplicas activates the given deployment.

func (*DAL) UpdateModuleSchema added in v0.409.0

func (d *DAL) UpdateModuleSchema(ctx context.Context, deployment model.DeploymentKey, module *schema.Module) error

UpdateModuleSchema updates the schema for a deployment in place.

Note that this is racey as the deployment can be updated by another process. This will go away once we ditch the DB.

func (*DAL) UpsertModule

func (d *DAL) UpsertModule(ctx context.Context, language, name string) (err error)

func (*DAL) UpsertRunner

func (d *DAL) UpsertRunner(ctx context.Context, runner dalmodel.Runner) error

UpsertRunner registers or updates a new runner.

ErrConflict will be returned if a runner with the same endpoint and a different key already exists.

type DeploymentNotification

DeploymentNotification is a notification from the database when a deployment changes.

type Process

type Process struct {
	Deployment  model.DeploymentKey
	MinReplicas int
	Labels      model.Labels
	Runner      optional.Option[ProcessRunner]
}

type ProcessRunner

type ProcessRunner struct {
	Key      model.RunnerKey
	Endpoint string
	Labels   model.Labels
}

type Reservation

type Reservation interface {
	Runner() dalmodel.Runner
	Commit(ctx context.Context) error
	Rollback(ctx context.Context) error
}

A Reservation of a Runner.

Directories

Path Synopsis
internal
sql

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL