dal

package
v0.364.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 17, 2024 License: Apache-2.0 Imports: 35 Imported by: 0

Documentation

Overview

Package dal provides a data abstraction layer for the Controller

Index

Constants

View Source
const (
	ControllerStateLive = ControllerState(dalsql.ControllerStateLive)
	ControllerStateDead = ControllerState(dalsql.ControllerStateDead)
)

Controller states.

View Source
const (
	RequestOriginIngress = RequestOrigin(dalsql.OriginIngress)
	RequestOriginCron    = RequestOrigin(dalsql.OriginCron)
	RequestOriginPubsub  = RequestOrigin(dalsql.OriginPubsub)
)
View Source
const (
	FSMStatusRunning   = sql2.FsmStatusRunning
	FSMStatusCompleted = sql2.FsmStatusCompleted
	FSMStatusFailed    = sql2.FsmStatusFailed
)

Variables

View Source
var ErrReplaceDeploymentAlreadyActive = errors.New("deployment already active")

Functions

This section is empty.

Types

type AsyncCall added in v0.194.0

type AsyncCall struct {
	*leasedal.Lease  // May be nil
	ID               int64
	Origin           AsyncOrigin
	Verb             schema.RefKey
	CatchVerb        optional.Option[schema.RefKey]
	Request          []byte
	ScheduledAt      time.Time
	QueueDepth       int64
	ParentRequestKey optional.Option[string]
	TraceContext     []byte

	Error optional.Option[string]

	RemainingAttempts int32
	Backoff           time.Duration
	MaxBackoff        time.Duration
	Catching          bool
}

type AsyncOrigin added in v0.226.0

type AsyncOrigin interface {

	// Origin returns the origin type.
	Origin() string
	String() string
	// contains filtered or unexported methods
}

AsyncOrigin is a sum type representing the originator of an async call.

This is used to determine how to handle the result of the async call.

func ParseAsyncOrigin added in v0.226.0

func ParseAsyncOrigin(origin string) (AsyncOrigin, error)

ParseAsyncOrigin parses an async origin key.

type AsyncOriginCron added in v0.359.0

type AsyncOriginCron struct {
	CronJobKey model.CronJobKey `parser:"'cron' ':' @(~EOF)+"`
}

AsyncOriginCron represents the context for the originator of a cron async call.

It is in the form cron:<module>.<verb>

func (AsyncOriginCron) Origin added in v0.359.0

func (a AsyncOriginCron) Origin() string

func (AsyncOriginCron) String added in v0.359.0

func (a AsyncOriginCron) String() string

type AsyncOriginFSM added in v0.226.0

type AsyncOriginFSM struct {
	FSM schema.RefKey `parser:"'fsm' ':' @@"`
	Key string        `parser:"':' @(~EOF)+"`
}

AsyncOriginFSM represents the context for the originator of an FSM async call.

It is in the form fsm:<module>.<name>:<key>

func (AsyncOriginFSM) Origin added in v0.226.0

func (a AsyncOriginFSM) Origin() string

func (AsyncOriginFSM) String added in v0.226.0

func (a AsyncOriginFSM) String() string

type AsyncOriginPubSub added in v0.241.0

type AsyncOriginPubSub struct {
	Subscription schema.RefKey `parser:"'sub' ':' @@"`
}

AsyncOriginPubSub represents the context for the originator of an PubSub async call.

It is in the form fsm:<module>.<subscription_name>

func (AsyncOriginPubSub) Origin added in v0.241.0

func (a AsyncOriginPubSub) Origin() string

func (AsyncOriginPubSub) String added in v0.241.0

func (a AsyncOriginPubSub) String() string

type Controller

type Controller struct {
	Key      model.ControllerKey
	Endpoint string
	State    ControllerState
}

type ControllerState

type ControllerState string

type DAL

type DAL struct {
	*libdal.Handle[DAL]

	// DeploymentChanges is a Topic that receives changes to the deployments table.
	DeploymentChanges *pubsub.Topic[DeploymentNotification]
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Service) *DAL

func (*DAL) AcquireAsyncCall added in v0.191.0

func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, leaseCtx context.Context, err error)

AcquireAsyncCall acquires a pending async call to execute.

Returns ErrNotFound if there are no async calls to acquire.

func (*DAL) AcquireFSMInstance added in v0.226.0

func (d *DAL) AcquireFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) (*FSMInstance, error)

AcquireFSMInstance returns an FSM instance, also acquiring a lease on it.

The lease must be released by the caller.

func (*DAL) CompleteAsyncCall added in v0.194.0

func (d *DAL) CompleteAsyncCall(ctx context.Context,
	call *AsyncCall,
	result either.Either[[]byte, string],
	finalise func(tx *DAL, isFinalResult bool) error) (didScheduleAnotherCall bool, err error)

CompleteAsyncCall completes an async call. The call will use the existing transaction if d is a transaction. Otherwise it will create and commit a new transaction.

"result" is either a []byte representing the successful response, or a string representing a failure message.

func (*DAL) CompleteEventForSubscription added in v0.241.0

func (d *DAL) CompleteEventForSubscription(ctx context.Context, module, name string) error

func (*DAL) CreateArtefact

func (d *DAL) CreateArtefact(ctx context.Context, content []byte) (digest sha256.SHA256, err error)

CreateArtefact inserts a new artefact into the database and returns its ID.

func (*DAL) CreateDeployment

func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []DeploymentArtefact, ingressRoutes []IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error)

CreateDeployment (possibly) creates a new deployment and associates previously created artefacts with it.

If an existing deployment with identical artefacts exists, it is returned.

func (*DAL) CreateRequest added in v0.167.0

func (d *DAL) CreateRequest(ctx context.Context, key model.RequestKey, addr string) error

func (*DAL) DeregisterRunner

func (d *DAL) DeregisterRunner(ctx context.Context, key model.RunnerKey) error

DeregisterRunner deregisters the given runner.

func (*DAL) FailFSMInstance added in v0.226.0

func (d *DAL) FailFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) error

func (*DAL) FinishFSMTransition added in v0.226.0

func (d *DAL) FinishFSMTransition(ctx context.Context, instance *FSMInstance) (*FSMInstance, error)

FinishFSMTransition marks an FSM transition as completed

func (*DAL) GetActiveControllers added in v0.177.2

func (d *DAL) GetActiveControllers(ctx context.Context) ([]Controller, error)

func (*DAL) GetActiveDeploymentSchemas added in v0.79.3

func (d *DAL) GetActiveDeploymentSchemas(ctx context.Context) ([]*schema.Module, error)

func (*DAL) GetActiveDeployments

func (d *DAL) GetActiveDeployments(ctx context.Context) ([]Deployment, error)

GetActiveDeployments returns all active deployments.

func (*DAL) GetActiveRunners

func (d *DAL) GetActiveRunners(ctx context.Context) ([]Runner, error)

func (*DAL) GetActiveSchema added in v0.276.3

func (d *DAL) GetActiveSchema(ctx context.Context) (*schema.Schema, error)

GetActiveSchema returns the schema for all active deployments.

func (*DAL) GetDeployment

func (d *DAL) GetDeployment(ctx context.Context, key model.DeploymentKey) (*model.Deployment, error)

func (*DAL) GetDeploymentsWithMinReplicas added in v0.163.10

func (d *DAL) GetDeploymentsWithMinReplicas(ctx context.Context) ([]Deployment, error)

func (*DAL) GetFSMStates added in v0.337.0

func (d *DAL) GetFSMStates(ctx context.Context, fsm schema.RefKey, instanceKey string) (currentState, destinationState optional.Option[schema.RefKey], err error)

func (*DAL) GetIngressRoutes

func (d *DAL) GetIngressRoutes(ctx context.Context, method string) ([]IngressRoute, error)

func (*DAL) GetMissingArtefacts

func (d *DAL) GetMissingArtefacts(ctx context.Context, digests []sha256.SHA256) ([]sha256.SHA256, error)

GetMissingArtefacts returns the digests of the artefacts that are missing from the database.

func (*DAL) GetProcessList

func (d *DAL) GetProcessList(ctx context.Context) ([]Process, error)

GetProcessList returns a list of all "processes".

func (*DAL) GetRoutingTable

func (d *DAL) GetRoutingTable(ctx context.Context, modules []string) (map[string][]Route, error)

GetRoutingTable returns the endpoints for all runners for the given modules, or all routes if modules is empty.

Returns route map keyed by module.

func (*DAL) GetRunner

func (d *DAL) GetRunner(ctx context.Context, runnerKey model.RunnerKey) (Runner, error)

func (*DAL) GetRunnersForDeployment

func (d *DAL) GetRunnersForDeployment(ctx context.Context, deployment model.DeploymentKey) ([]Runner, error)

func (*DAL) GetStatus

func (d *DAL) GetStatus(ctx context.Context) (Status, error)

func (*DAL) GetSubscriptionsNeedingUpdate added in v0.241.0

func (d *DAL) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]model.Subscription, error)

func (*DAL) GetZombieAsyncCalls added in v0.360.0

func (d *DAL) GetZombieAsyncCalls(ctx context.Context, limit int) ([]*AsyncCall, error)

func (*DAL) KillStaleControllers

func (d *DAL) KillStaleControllers(ctx context.Context, age time.Duration) (int64, error)

KillStaleControllers deletes controllers that have not had heartbeats for the given duration.

func (*DAL) KillStaleRunners

func (d *DAL) KillStaleRunners(ctx context.Context, age time.Duration) (int64, error)

KillStaleRunners deletes runners that have not had heartbeats for the given duration.

func (*DAL) LoadAsyncCall added in v0.194.0

func (d *DAL) LoadAsyncCall(ctx context.Context, id int64) (*AsyncCall, error)

func (*DAL) PollDeployments added in v0.296.3

func (d *DAL) PollDeployments(ctx context.Context)

func (*DAL) PopNextFSMEvent added in v0.337.0

func (d *DAL) PopNextFSMEvent(ctx context.Context, fsm schema.RefKey, instanceKey string) (optional.Option[NextFSMEvent], error)

PopNextFSMEvent returns the next event for an FSM instance, if any, and deletes it.

func (*DAL) ProgressSubscriptions added in v0.244.0

func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay time.Duration) (count int, err error)

func (*DAL) PublishEventForTopic added in v0.239.0

func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic, caller string, payload []byte) error

func (*DAL) ReplaceDeployment

func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.DeploymentKey, minReplicas int) (err error)

ReplaceDeployment replaces an old deployment of a module with a new deployment.

returns ErrReplaceDeploymentAlreadyActive if the new deployment is already active.

func (*DAL) ResetSubscription added in v0.312.0

func (d *DAL) ResetSubscription(ctx context.Context, module, name string) (err error)

ResetSubscription resets the subscription cursor to the topic's head.

func (*DAL) SetDeploymentReplicas

func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int) (err error)

SetDeploymentReplicas activates the given deployment.

func (*DAL) SetNextFSMEvent added in v0.337.0

func (d *DAL) SetNextFSMEvent(ctx context.Context, fsm schema.RefKey, instanceKey string, nextState schema.RefKey, request json.RawMessage, requestType schema.Type) error

func (*DAL) StartFSMTransition added in v0.226.0

func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, instanceKey string, destinationState schema.RefKey, request []byte, encrypted bool, retryParams schema.RetryParams) (err error)

StartFSMTransition sends an event to an executing instance of an FSM.

If the instance doesn't exist a new one will be created.

[name] is the name of the state machine to execute, [executionKey] is the unique identifier for this execution of the FSM.

Returns ErrConflict if the state machine is already executing a transition.

Note: this does not actually call the FSM, it just enqueues an async call for future execution.

Note: no validation of the FSM is performed.

func (*DAL) SucceedFSMInstance added in v0.226.0

func (d *DAL) SucceedFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) error

func (*DAL) UpsertController

func (d *DAL) UpsertController(ctx context.Context, key model.ControllerKey, addr string) (int64, error)

func (*DAL) UpsertModule

func (d *DAL) UpsertModule(ctx context.Context, language, name string) (err error)

func (*DAL) UpsertRunner

func (d *DAL) UpsertRunner(ctx context.Context, runner Runner) error

UpsertRunner registers or updates a new runner.

ErrConflict will be returned if a runner with the same endpoint and a different key already exists.

type Deployment

type Deployment struct {
	Key         model.DeploymentKey
	Language    string
	Module      string
	MinReplicas int
	Replicas    optional.Option[int] // Depending on the query this may or may not be populated.
	Schema      *schema.Module
	CreatedAt   time.Time
	Labels      model.Labels
}

func (Deployment) String

func (d Deployment) String() string

type DeploymentArtefact

type DeploymentArtefact struct {
	Digest     sha256.SHA256
	Executable bool
	Path       string
}

func DeploymentArtefactFromProto

func DeploymentArtefactFromProto(in *ftlv1.DeploymentArtefact) (DeploymentArtefact, error)

func (*DeploymentArtefact) ToProto

type DeploymentNotification

type DeploymentNotification = Notification[Deployment, model.DeploymentKey, *model.DeploymentKey]

DeploymentNotification is a notification from the database when a deployment changes.

type FSMInstance added in v0.226.0

type FSMInstance struct {
	leases.Lease
	// The FSM that this instance is executing.
	FSM schema.RefKey
	// The unique key for this instance.
	Key              string
	Status           FSMStatus
	CurrentState     optional.Option[schema.RefKey]
	DestinationState optional.Option[schema.RefKey]
}

type FSMStatus added in v0.226.0

type FSMStatus = sql2.FsmStatus

type IngressRoute

type IngressRoute struct {
	Runner     model.RunnerKey
	Deployment model.DeploymentKey
	Endpoint   string
	Path       string
	Module     string
	Verb       string
}

type IngressRouteEntry

type IngressRouteEntry struct {
	Deployment model.DeploymentKey
	Module     string
	Verb       string
	Method     string
	Path       string
}

type IngressRoutingEntry

type IngressRoutingEntry struct {
	Verb   string
	Method string
	Path   string
}

type NextFSMEvent added in v0.337.0

type NextFSMEvent struct {
	DestinationState schema.RefKey
	Request          json.RawMessage
	RequestType      schema.Type
}

type Notification

type Notification[T NotificationPayload, Key any, KeyP interface {
	*Key
	encoding.TextUnmarshaler
}] struct {
	Deleted optional.Option[Key] // If present the object was deleted.
	Message optional.Option[T]
}

A Notification from the database.

func (Notification[T, Key, KeyP]) String

func (n Notification[T, Key, KeyP]) String() string

type NotificationPayload

type NotificationPayload interface {
	// contains filtered or unexported methods
}

NotificationPayload is a row from the database.

type Process

type Process struct {
	Deployment  model.DeploymentKey
	MinReplicas int
	Labels      model.Labels
	Runner      optional.Option[ProcessRunner]
}

type ProcessRunner

type ProcessRunner struct {
	Key      model.RunnerKey
	Endpoint string
	Labels   model.Labels
}

type Reconciliation

type Reconciliation struct {
	Deployment model.DeploymentKey
	Module     string
	Language   string

	AssignedReplicas int
	RequiredReplicas int
}

type RequestOrigin

type RequestOrigin string

type Reservation

type Reservation interface {
	Runner() Runner
	Commit(ctx context.Context) error
	Rollback(ctx context.Context) error
}

A Reservation of a Runner.

type Route

type Route struct {
	Module     string
	Runner     model.RunnerKey
	Deployment model.DeploymentKey
	Endpoint   string
}

func (Route) String

func (r Route) String() string

type Runner

type Runner struct {
	Key                model.RunnerKey
	Endpoint           string
	ReservationTimeout optional.Option[time.Duration]
	Module             optional.Option[string]
	Deployment         model.DeploymentKey
	Labels             model.Labels
}

type Status

type Status struct {
	Controllers   []Controller
	Runners       []Runner
	Deployments   []Deployment
	IngressRoutes []IngressRouteEntry
	Routes        []Route
}

Directories

Path Synopsis
internal
sql

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL