dal

package
v0.287.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 11, 2024 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Overview

Package dal provides a data abstraction layer for the Controller

Index

Constants

View Source
const (
	RunnerStateIdle     = RunnerState(sql.RunnerStateIdle)
	RunnerStateReserved = RunnerState(sql.RunnerStateReserved)
	RunnerStateAssigned = RunnerState(sql.RunnerStateAssigned)
	RunnerStateDead     = RunnerState(sql.RunnerStateDead)
)

Runner states.

View Source
const (
	ControllerStateLive = ControllerState(sql.ControllerStateLive)
	ControllerStateDead = ControllerState(sql.ControllerStateDead)
)

Controller states.

View Source
const (
	RequestOriginIngress = RequestOrigin(sql.OriginIngress)
	RequestOriginCron    = RequestOrigin(sql.OriginCron)
	RequestOriginPubsub  = RequestOrigin(sql.OriginPubsub)
)
View Source
const (
	EventTypeLog               = sql.EventTypeLog
	EventTypeCall              = sql.EventTypeCall
	EventTypeDeploymentCreated = sql.EventTypeDeploymentCreated
	EventTypeDeploymentUpdated = sql.EventTypeDeploymentUpdated
)

Supported event types.

View Source
const (
	FSMStatusRunning   = sql.FsmStatusRunning
	FSMStatusCompleted = sql.FsmStatusCompleted
	FSMStatusFailed    = sql.FsmStatusFailed
)

Variables

This section is empty.

Functions

func NewQTx added in v0.281.3

func NewQTx(pool sql.ConnI, tx pgx.Tx) *sql.Queries

func WithReservation

func WithReservation(ctx context.Context, reservation Reservation, fn func() error) error

Types

type AsyncCall added in v0.194.0

type AsyncCall struct {
	*Lease      // May be nil
	ID          int64
	Origin      AsyncOrigin
	Verb        schema.RefKey
	Request     json.RawMessage
	ScheduledAt time.Time

	RemainingAttempts int32
	Backoff           time.Duration
	MaxBackoff        time.Duration
}

type AsyncOrigin added in v0.226.0

type AsyncOrigin interface {

	// Origin returns the origin type.
	Origin() string
	String() string
	// contains filtered or unexported methods
}

AsyncOrigin is a sum type representing the originator of an async call.

This is used to determine how to handle the result of the async call.

func ParseAsyncOrigin added in v0.226.0

func ParseAsyncOrigin(origin string) (AsyncOrigin, error)

ParseAsyncOrigin parses an async origin key.

type AsyncOriginFSM added in v0.226.0

type AsyncOriginFSM struct {
	FSM schema.RefKey `parser:"'fsm' ':' @@"`
	Key string        `parser:"':' @(~EOF)+"`
}

AsyncOriginFSM represents the context for the originator of an FSM async call.

It is in the form fsm:<module>.<name>:<key>

func (AsyncOriginFSM) Origin added in v0.226.0

func (a AsyncOriginFSM) Origin() string

func (AsyncOriginFSM) String added in v0.226.0

func (a AsyncOriginFSM) String() string

type AsyncOriginPubSub added in v0.241.0

type AsyncOriginPubSub struct {
	Subscription schema.RefKey `parser:"'sub' ':' @@"`
}

AsyncOriginPubSub represents the context for the originator of a PubSub async call.

It is in the form sub:<module>.<subscription_name>

func (AsyncOriginPubSub) Origin added in v0.241.0

func (a AsyncOriginPubSub) Origin() string

func (AsyncOriginPubSub) String added in v0.241.0

func (a AsyncOriginPubSub) String() string

type CallEvent

type CallEvent struct {
	ID            int64
	DeploymentKey model.DeploymentKey
	RequestKey    optional.Option[model.RequestKey]
	Time          time.Time
	SourceVerb    optional.Option[schema.Ref]
	DestVerb      schema.Ref
	Duration      time.Duration
	Request       []byte
	Response      []byte
	Error         optional.Option[string]
	Stack         optional.Option[string]
}

func (*CallEvent) GetID

func (e *CallEvent) GetID() int64

type Controller

type Controller struct {
	Key      model.ControllerKey
	Endpoint string
	State    ControllerState
}

type ControllerState

type ControllerState string

type DAL

type DAL struct {

	// DeploymentChanges is a Topic that receives changes to the deployments table.
	DeploymentChanges *pubsub.Topic[DeploymentNotification]
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, pool *pgxpool.Pool) (*DAL, error)

func (*DAL) AcquireAsyncCall added in v0.191.0

func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error)

AcquireAsyncCall acquires a pending async call to execute.

Returns ErrNotFound if there are no async calls to acquire.

func (*DAL) AcquireFSMInstance added in v0.226.0

func (d *DAL) AcquireFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) (*FSMInstance, error)

AcquireFSMInstance returns an FSM instance, also acquiring a lease on it.

The lease must be released by the caller.

func (*DAL) AcquireLease added in v0.186.0

func (d *DAL) AcquireLease(ctx context.Context, key leases.Key, ttl time.Duration, metadata optional.Option[any]) (leases.Lease, context.Context, error)

AcquireLease acquires a lease for the given key.

Will return leases.ErrConflict (not dalerrs.ErrConflict) if the lease is already held by another controller.

The returned context will be cancelled when the lease fails to renew.

func (*DAL) Begin added in v0.217.9

func (d *DAL) Begin(ctx context.Context) (*Tx, error)

func (*DAL) CompleteAsyncCall added in v0.194.0

func (d *DAL) CompleteAsyncCall(ctx context.Context, call *AsyncCall, result either.Either[[]byte, string], finalise func(tx *Tx) error) (err error)

CompleteAsyncCall completes an async call.

"result" is either a []byte representing the successful response, or a string representing a failure message.

func (*DAL) CompleteEventForSubscription added in v0.241.0

func (d *DAL) CompleteEventForSubscription(ctx context.Context, module, name string) error

func (*DAL) CreateArtefact

func (d *DAL) CreateArtefact(ctx context.Context, content []byte) (digest sha256.SHA256, err error)

CreateArtefact inserts a new artefact into the database and returns its digest.

func (*DAL) CreateDeployment

func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []DeploymentArtefact, ingressRoutes []IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error)

CreateDeployment (possibly) creates a new deployment and associates previously created artefacts with it.

If an existing deployment with identical artefacts exists, it is returned.

func (*DAL) CreateRequest added in v0.167.0

func (d *DAL) CreateRequest(ctx context.Context, key model.RequestKey, addr string) error

func (*DAL) DeregisterRunner

func (d *DAL) DeregisterRunner(ctx context.Context, key model.RunnerKey) error

DeregisterRunner deregisters the given runner.

func (*DAL) ExpireLeases added in v0.186.0

func (d *DAL) ExpireLeases(ctx context.Context) error

ExpireLeases expires (deletes) all leases that have expired.

func (*DAL) ExpireRunnerClaims

func (d *DAL) ExpireRunnerClaims(ctx context.Context) (int64, error)

func (*DAL) FailFSMInstance added in v0.226.0

func (d *DAL) FailFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) error

func (*DAL) FinishFSMTransition added in v0.226.0

func (d *DAL) FinishFSMTransition(ctx context.Context, fsm schema.RefKey, instanceKey string) error

func (*DAL) GetActiveControllers added in v0.177.2

func (d *DAL) GetActiveControllers(ctx context.Context) ([]Controller, error)

func (*DAL) GetActiveDeploymentSchemas added in v0.79.3

func (d *DAL) GetActiveDeploymentSchemas(ctx context.Context) ([]*schema.Module, error)

func (*DAL) GetActiveDeployments

func (d *DAL) GetActiveDeployments(ctx context.Context) ([]Deployment, error)

GetActiveDeployments returns all active deployments.

func (*DAL) GetActiveRunners

func (d *DAL) GetActiveRunners(ctx context.Context) ([]Runner, error)

func (*DAL) GetDeployment

func (d *DAL) GetDeployment(ctx context.Context, key model.DeploymentKey) (*model.Deployment, error)

func (*DAL) GetDeploymentsNeedingReconciliation

func (d *DAL) GetDeploymentsNeedingReconciliation(ctx context.Context) ([]Reconciliation, error)

GetDeploymentsNeedingReconciliation returns deployments that have a mismatch between the number of assigned and required replicas.

func (*DAL) GetDeploymentsWithMinReplicas added in v0.163.10

func (d *DAL) GetDeploymentsWithMinReplicas(ctx context.Context) ([]Deployment, error)

func (*DAL) GetIdleRunners

func (d *DAL) GetIdleRunners(ctx context.Context, limit int, labels model.Labels) ([]Runner, error)

GetIdleRunners returns up to limit idle runners matching the given labels.

"labels" is a single level map of key-value pairs. Values may be scalar or lists of scalars. If a value is a list, it will match the labels if all the values in the list match.

e.g. '{"languages": ["kotlin"], "arch": "arm64"}' will match a runner with the labels '{"languages": ["go", "kotlin"], "os": "linux", "arch": "arm64", "pid": 1234}'

If no runners are available, it will return an empty slice.

func (*DAL) GetIngressRoutes

func (d *DAL) GetIngressRoutes(ctx context.Context, method string) ([]IngressRoute, error)

func (*DAL) GetLeaseInfo added in v0.263.0

func (d *DAL) GetLeaseInfo(ctx context.Context, key leases.Key, metadata any) (expiry time.Time, err error)

GetLeaseInfo returns the metadata and expiry time for the lease with the given key.

metadata should be a pointer to the type that metadata should be unmarshaled into.

func (*DAL) GetMissingArtefacts

func (d *DAL) GetMissingArtefacts(ctx context.Context, digests []sha256.SHA256) ([]sha256.SHA256, error)

GetMissingArtefacts returns the digests of the artefacts that are missing from the database.

func (*DAL) GetProcessList

func (d *DAL) GetProcessList(ctx context.Context) ([]Process, error)

GetProcessList returns a list of all "processes".

func (*DAL) GetRoutingTable

func (d *DAL) GetRoutingTable(ctx context.Context, modules []string) (map[string][]Route, error)

GetRoutingTable returns the endpoints for all runners for the given modules, or all routes if modules is empty.

Returns route map keyed by module.

func (*DAL) GetRunner

func (d *DAL) GetRunner(ctx context.Context, runnerKey model.RunnerKey) (Runner, error)

func (*DAL) GetRunnerState

func (d *DAL) GetRunnerState(ctx context.Context, runnerKey model.RunnerKey) (RunnerState, error)

func (*DAL) GetRunnersForDeployment

func (d *DAL) GetRunnersForDeployment(ctx context.Context, deployment model.DeploymentKey) ([]Runner, error)

func (*DAL) GetStatus

func (d *DAL) GetStatus(ctx context.Context) (Status, error)

func (*DAL) GetSubscriptionsNeedingUpdate added in v0.241.0

func (d *DAL) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]model.Subscription, error)

func (*DAL) InsertCallEvent

func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error

func (*DAL) InsertLogEvent

func (d *DAL) InsertLogEvent(ctx context.Context, log *LogEvent) error

func (*DAL) KillStaleControllers

func (d *DAL) KillStaleControllers(ctx context.Context, age time.Duration) (int64, error)

KillStaleControllers deletes controllers that have not had heartbeats for the given duration.

func (*DAL) KillStaleRunners

func (d *DAL) KillStaleRunners(ctx context.Context, age time.Duration) (int64, error)

KillStaleRunners deletes runners that have not had heartbeats for the given duration.

func (*DAL) LoadAsyncCall added in v0.194.0

func (d *DAL) LoadAsyncCall(ctx context.Context, id int64) (*AsyncCall, error)

func (*DAL) ProgressSubscriptions added in v0.244.0

func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay time.Duration) (count int, err error)

func (*DAL) PublishEventForTopic added in v0.239.0

func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic string, payload []byte) error

func (*DAL) QueryEvents

func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter) ([]Event, error)

func (*DAL) ReplaceDeployment

func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.DeploymentKey, minReplicas int) (err error)

ReplaceDeployment replaces an old deployment of a module with a new deployment.

func (*DAL) ReserveRunnerForDeployment

func (d *DAL) ReserveRunnerForDeployment(ctx context.Context, deployment model.DeploymentKey, reservationTimeout time.Duration, labels model.Labels) (Reservation, error)

ReserveRunnerForDeployment reserves a runner for the given deployment.

It returns a Reservation that must be committed or rolled back.

func (*DAL) SetDeploymentReplicas

func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int) error

SetDeploymentReplicas activates the given deployment.

func (*DAL) StartFSMTransition added in v0.226.0

func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, executionKey string, destinationState schema.RefKey, request json.RawMessage, retryParams schema.RetryParams) (err error)

StartFSMTransition sends an event to an executing instance of an FSM.

If the instance doesn't exist a new one will be created.

[fsm] is the name of the state machine to execute, [executionKey] is the unique identifier for this execution of the FSM.

Returns ErrConflict if the state machine is already executing a transition.

Note: this does not actually call the FSM, it just enqueues an async call for future execution.

Note: no validation of the FSM is performed.

func (*DAL) SucceedFSMInstance added in v0.226.0

func (d *DAL) SucceedFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) error

func (*DAL) UpsertController

func (d *DAL) UpsertController(ctx context.Context, key model.ControllerKey, addr string) (int64, error)

func (*DAL) UpsertModule

func (d *DAL) UpsertModule(ctx context.Context, language, name string) (err error)

func (*DAL) UpsertRunner

func (d *DAL) UpsertRunner(ctx context.Context, runner Runner) error

UpsertRunner registers a new runner or updates an existing one.

ErrConflict will be returned if a runner with the same endpoint and a different key already exists.

type Deployment

type Deployment struct {
	Key         model.DeploymentKey
	Language    string
	Module      string
	MinReplicas int
	Replicas    optional.Option[int] // Depending on the query this may or may not be populated.
	Schema      *schema.Module
	CreatedAt   time.Time
	Labels      model.Labels
}

func (Deployment) String

func (d Deployment) String() string

type DeploymentArtefact

type DeploymentArtefact struct {
	Digest     sha256.SHA256
	Executable bool
	Path       string
}

func DeploymentArtefactFromProto

func DeploymentArtefactFromProto(in *ftlv1.DeploymentArtefact) (DeploymentArtefact, error)

func (*DeploymentArtefact) ToProto

type DeploymentCreatedEvent

type DeploymentCreatedEvent struct {
	ID                 int64
	DeploymentKey      model.DeploymentKey
	Time               time.Time
	Language           string
	ModuleName         string
	MinReplicas        int
	ReplacedDeployment optional.Option[model.DeploymentKey]
}

func (*DeploymentCreatedEvent) GetID

func (e *DeploymentCreatedEvent) GetID() int64

type DeploymentNotification

type DeploymentNotification = Notification[Deployment, model.DeploymentKey, *model.DeploymentKey]

DeploymentNotification is a notification from the database when a deployment changes.

type DeploymentUpdatedEvent

type DeploymentUpdatedEvent struct {
	ID              int64
	DeploymentKey   model.DeploymentKey
	Time            time.Time
	MinReplicas     int
	PrevMinReplicas int
}

func (*DeploymentUpdatedEvent) GetID

func (e *DeploymentUpdatedEvent) GetID() int64

type Event

type Event interface {
	GetID() int64
	// contains filtered or unexported methods
}

Event types.

type EventFilter

type EventFilter func(query *eventFilter)

func FilterCall

func FilterCall(sourceModule optional.Option[string], destModule string, destVerb optional.Option[string]) EventFilter

FilterCall filters call events between the given modules.

May be called multiple times.

func FilterDeployments

func FilterDeployments(deploymentKeys ...model.DeploymentKey) EventFilter

func FilterDescending

func FilterDescending() EventFilter

FilterDescending returns events in descending order.

func FilterIDRange

func FilterIDRange(higherThan, lowerThan int64) EventFilter

FilterIDRange filters events between the given IDs, inclusive.

func FilterLogLevel

func FilterLogLevel(level log.Level) EventFilter

func FilterRequests

func FilterRequests(requestKeys ...model.RequestKey) EventFilter

func FilterTimeRange

func FilterTimeRange(olderThan, newerThan time.Time) EventFilter

FilterTimeRange filters events between the given times, inclusive.

Either may be zero to indicate no upper or lower bound.

func FilterTypes

func FilterTypes(types ...sql.EventType) EventFilter

type EventType

type EventType = sql.EventType

type FSMInstance added in v0.226.0

type FSMInstance struct {
	leases.Lease
	// The FSM that this instance is executing.
	FSM schema.RefKey
	// The unique key for this instance.
	Key              string
	Status           FSMStatus
	CurrentState     optional.Option[schema.RefKey]
	DestinationState optional.Option[schema.RefKey]
}

type FSMStatus added in v0.226.0

type FSMStatus = sql.FsmStatus

type IngressRoute

type IngressRoute struct {
	Runner     model.RunnerKey
	Deployment model.DeploymentKey
	Endpoint   string
	Path       string
	Module     string
	Verb       string
}

type IngressRouteEntry

type IngressRouteEntry struct {
	Deployment model.DeploymentKey
	Module     string
	Verb       string
	Method     string
	Path       string
}

type IngressRoutingEntry

type IngressRoutingEntry struct {
	Verb   string
	Method string
	Path   string
}

type Lease added in v0.186.0

type Lease struct {
	// contains filtered or unexported fields
}

Lease represents a lease that is held by a controller.

func (*Lease) Release added in v0.186.0

func (l *Lease) Release() error

func (*Lease) String added in v0.186.0

func (l *Lease) String() string

type LogEvent

type LogEvent struct {
	ID            int64
	DeploymentKey model.DeploymentKey
	RequestKey    optional.Option[model.RequestKey]
	Time          time.Time
	Level         int32
	Attributes    map[string]string
	Message       string
	Error         optional.Option[string]
	Stack         optional.Option[string]
}

func (*LogEvent) GetID

func (e *LogEvent) GetID() int64

type Notification

type Notification[T NotificationPayload, Key any, KeyP interface {
	*Key
	encoding.TextUnmarshaler
}] struct {
	Deleted optional.Option[Key] // If present the object was deleted.
	Message optional.Option[T]
}

A Notification from the database.

func (Notification[T, Key, KeyP]) String

func (n Notification[T, Key, KeyP]) String() string

type NotificationPayload

type NotificationPayload interface {
	// contains filtered or unexported methods
}

NotificationPayload is a row from the database.

type Process

type Process struct {
	Deployment  model.DeploymentKey
	MinReplicas int
	Labels      model.Labels
	Runner      optional.Option[ProcessRunner]
}

type ProcessRunner

type ProcessRunner struct {
	Key      model.RunnerKey
	Endpoint string
	Labels   model.Labels
}

type Reconciliation

type Reconciliation struct {
	Deployment model.DeploymentKey
	Module     string
	Language   string

	AssignedReplicas int
	RequiredReplicas int
}

type RequestOrigin

type RequestOrigin string

type Reservation

type Reservation interface {
	Runner() Runner
	Commit(ctx context.Context) error
	Rollback(ctx context.Context) error
}

A Reservation of a Runner.

type Route

type Route struct {
	Module     string
	Runner     model.RunnerKey
	Deployment model.DeploymentKey
	Endpoint   string
}

func (Route) String

func (r Route) String() string

type Runner

type Runner struct {
	Key                model.RunnerKey
	Endpoint           string
	State              RunnerState
	ReservationTimeout optional.Option[time.Duration]
	Module             optional.Option[string]
	// Assigned deployment key, if any.
	Deployment optional.Option[model.DeploymentKey]
	Labels     model.Labels
}

type RunnerState

type RunnerState string

func RunnerStateFromProto

func RunnerStateFromProto(state ftlv1.RunnerState) RunnerState

func (RunnerState) ToProto

func (s RunnerState) ToProto() ftlv1.RunnerState

type Status

type Status struct {
	Controllers   []Controller
	Runners       []Runner
	Deployments   []Deployment
	IngressRoutes []IngressRouteEntry
	Routes        []Route
}

type Tx added in v0.217.9

type Tx struct {
	*DAL
}

Tx is DAL within a transaction.

func (*Tx) Commit added in v0.217.9

func (t *Tx) Commit(ctx context.Context) error

func (*Tx) CommitOrRollback added in v0.217.9

func (t *Tx) CommitOrRollback(ctx context.Context, err *error)

CommitOrRollback can be used in a defer statement to commit or rollback a transaction depending on whether the enclosing function returned an error.

func myFunc() (err error) {
  tx, err := dal.Begin(ctx)
  if err != nil { return err }
  defer tx.CommitOrRollback(ctx, &err)
  ...
}

func (*Tx) Rollback added in v0.217.9

func (t *Tx) Rollback(ctx context.Context) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL