sql

package
v0.19.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 20, 2023 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Artefact

// Artefact holds an artefact's ID, creation time, digest and content bytes.
// Presumably it mirrors one row of an artefacts table — confirm against the
// schema.
type Artefact struct {
	ID        int64
	CreatedAt time.Time
	Digest    []byte
	Content   []byte
}

type AssociateArtefactWithDeploymentParams

// AssociateArtefactWithDeploymentParams is the argument struct accepted by
// Querier.AssociateArtefactWithDeployment. It pairs a deployment name with an
// artefact ID, plus an executable flag and a path for that artefact.
type AssociateArtefactWithDeploymentParams struct {
	Name       model.DeploymentName
	ArtefactID int64
	Executable bool
	Path       string
}

type Controller

// Controller is the element type returned by Querier.GetControllers. It
// carries the controller's ID and key, its creation and last-seen times, its
// ControllerState and its endpoint.
type Controller struct {
	ID       int64
	Key      model.ControllerKey
	Created  time.Time
	LastSeen time.Time
	State    ControllerState
	Endpoint string
}

type ControllerState

// ControllerState is a string-valued enumeration of controller states. The
// values declared here are "live" and "dead".
type ControllerState string
const (
	ControllerStateLive ControllerState = "live"
	// Presumably the state applied by Querier.KillStaleControllers, whose
	// comment says it marks stale controller entries as dead — confirm.
	ControllerStateDead ControllerState = "dead"
)

func (*ControllerState) Scan

func (e *ControllerState) Scan(src interface{}) error

type CreateIngressRouteParams

// CreateIngressRouteParams is the argument struct accepted by
// Querier.CreateIngressRoute. It names a deployment together with the module,
// verb, method and path of the route.
type CreateIngressRouteParams struct {
	Name   model.DeploymentName
	Module string
	Verb   string
	Method string
	Path   string
}

type DB

// DB embeds *Queries, so every Queries method is callable directly on a DB.
// It is constructed with NewDB from a DBI, exposes that connection through
// Conn, and starts a transaction through Begin, which returns a *Tx.
type DB struct {
	*Queries
	// contains filtered or unexported fields
}

func NewDB

func NewDB(conn DBI) *DB

func (*DB) Begin

func (d *DB) Begin(ctx context.Context) (*Tx, error)

func (*DB) Conn

func (d *DB) Conn() DBI

type DBI

// DBI is the connection interface accepted by NewDB. It extends DBTX with
// Begin, which returns a pgx.Tx.
type DBI interface {
	DBTX
	Begin(ctx context.Context) (pgx.Tx, error)
}

type DBTX

// DBTX is the minimal query-execution interface accepted by New: Exec, Query
// and QueryRow, each taking a context, a SQL string and variadic arguments.
// Presumably satisfied by both pgx connections/pools and pgx transactions —
// confirm against the pgx version in use.
type DBTX interface {
	Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
	Query(context.Context, string, ...interface{}) (pgx.Rows, error)
	QueryRow(context.Context, string, ...interface{}) pgx.Row
}

type Deployment

// Deployment is the element type returned by Querier.GetDeploymentsByID and
// is nested in GetDeploymentRow and GetDeploymentsRow. Schema and Labels are
// raw bytes; their encoding is not visible here.
type Deployment struct {
	ID          int64
	CreatedAt   time.Time
	ModuleID    int64
	Name        model.DeploymentName
	Schema      []byte
	Labels      []byte
	MinReplicas int32
}

type DeploymentArtefact

// DeploymentArtefact pairs an artefact ID with a deployment ID and records a
// creation time, an executable flag and a path. Presumably the association
// written by Querier.AssociateArtefactWithDeployment — confirm.
type DeploymentArtefact struct {
	ArtefactID   int64
	DeploymentID int64
	CreatedAt    time.Time
	Executable   bool
	Path         string
}

type Event

// Event describes a single event: its ID, timestamp, owning deployment ID,
// optional request ID, EventType, four optional custom string keys and a raw
// JSON payload. The meaning of the custom keys is not visible here.
type Event struct {
	ID           int64
	TimeStamp    time.Time
	DeploymentID int64
	RequestID    types.Option[int64]
	Type         EventType
	CustomKey1   types.Option[string]
	CustomKey2   types.Option[string]
	CustomKey3   types.Option[string]
	CustomKey4   types.Option[string]
	Payload      json.RawMessage
}

type EventType

// EventType is a string-valued enumeration of event kinds. The values
// declared here are "call", "log" and "deployment".
type EventType string
const (
	EventTypeCall       EventType = "call"
	EventTypeLog        EventType = "log"
	EventTypeDeployment EventType = "deployment"
)

func (*EventType) Scan

func (e *EventType) Scan(src interface{}) error

type GetActiveRunnersRow

// GetActiveRunnersRow is the element type returned by
// Querier.GetActiveRunners.
type GetActiveRunnersRow struct {
	RunnerKey      sqltypes.Key
	Endpoint       string
	State          RunnerState
	Labels         []byte
	LastSeen       time.Time
	ModuleName     types.Option[string]
	// NOTE(review): untyped here, whereas other row types in this package
	// declare DeploymentName as model.DeploymentName — callers must
	// type-assert. Confirm whether this is intentional.
	DeploymentName interface{}
}

type GetAllIngressRoutesRow

// GetAllIngressRoutesRow is the element type returned by
// Querier.GetAllIngressRoutes: a deployment name with the module, verb,
// method and path of one route.
type GetAllIngressRoutesRow struct {
	DeploymentName model.DeploymentName
	Module         string
	Verb           string
	Method         string
	Path           string
}

type GetArtefactDigestsRow

// GetArtefactDigestsRow is the element type returned by
// Querier.GetArtefactDigests: an artefact ID and its digest.
type GetArtefactDigestsRow struct {
	ID     int64
	Digest []byte
}

type GetDeploymentArtefactsRow

// GetDeploymentArtefactsRow is the element type returned by
// Querier.GetDeploymentArtefacts.
type GetDeploymentArtefactsRow struct {
	CreatedAt    time.Time
	ID           int64
	Executable   bool
	Path         string
	Digest       []byte
	// NOTE(review): the "_2" suffix suggests the underlying query selects a
	// second column also named "executable"; it is presumably redundant with
	// Executable — confirm against the query.
	Executable_2 bool
}

type GetDeploymentRow

// GetDeploymentRow is the result type of Querier.GetDeployment: a Deployment
// together with a language and a module name.
type GetDeploymentRow struct {
	Deployment Deployment
	Language   string
	ModuleName string
}

type GetDeploymentsNeedingReconciliationRow

// GetDeploymentsNeedingReconciliationRow is the element type returned by
// Querier.GetDeploymentsNeedingReconciliation. It carries the deployment and
// module names, the language, and the assigned and required runner counts.
type GetDeploymentsNeedingReconciliationRow struct {
	DeploymentName       model.DeploymentName
	ModuleName           string
	Language             string
	AssignedRunnersCount int64
	RequiredRunnersCount int64
}

type GetDeploymentsRow

// GetDeploymentsRow is the element type returned by Querier.GetDeployments:
// a Deployment together with a module name and a language.
type GetDeploymentsRow struct {
	Deployment Deployment
	ModuleName string
	Language   string
}

type GetDeploymentsWithArtefactsRow

// GetDeploymentsWithArtefactsRow is the element type returned by
// Querier.GetDeploymentsWithArtefacts: an ID, a creation time, and the
// deployment and module names.
type GetDeploymentsWithArtefactsRow struct {
	ID             int64
	CreatedAt      time.Time
	DeploymentName model.DeploymentName
	ModuleName     string
}

type GetExistingDeploymentForModuleRow

// GetExistingDeploymentForModuleRow is the result type of
// Querier.GetExistingDeploymentForModule. The first seven fields have the
// same names and types as Deployment; the last three match Module's fields,
// with "_2" suffixes where names collide — presumably deployment columns
// followed by module columns; confirm against the query.
type GetExistingDeploymentForModuleRow struct {
	ID          int64
	CreatedAt   time.Time
	ModuleID    int64
	Name        model.DeploymentName
	Schema      []byte
	Labels      []byte
	MinReplicas int32
	ID_2        int64
	Language    string
	Name_2      string
}

type GetIngressRoutesRow

// GetIngressRoutesRow is the element type returned by
// Querier.GetIngressRoutes: a runner key and endpoint with the module and
// verb of the route.
type GetIngressRoutesRow struct {
	RunnerKey sqltypes.Key
	Endpoint  string
	Module    string
	Verb      string
}

type GetProcessListRow added in v0.14.0

// GetProcessListRow is the element type returned by Querier.GetProcessList.
// RunnerKey and Endpoint are nullable/optional types, so a row presumably may
// describe a deployment with no runner attached — confirm against the query.
type GetProcessListRow struct {
	MinReplicas      int32
	DeploymentName   model.DeploymentName
	DeploymentLabels []byte
	RunnerKey        sqltypes.NullKey
	Endpoint         types.Option[string]
	RunnerLabels     []byte
}

type GetRouteForRunnerRow

// GetRouteForRunnerRow is the result type of Querier.GetRouteForRunner: the
// runner's endpoint, key and state with an optional module name and a
// deployment name.
type GetRouteForRunnerRow struct {
	Endpoint       string
	RunnerKey      sqltypes.Key
	ModuleName     types.Option[string]
	DeploymentName model.DeploymentName
	State          RunnerState
}

type GetRoutingTableRow

// GetRoutingTableRow is the element type returned by
// Querier.GetRoutingTable: a runner endpoint and key with an optional module
// name and a deployment name.
type GetRoutingTableRow struct {
	Endpoint       string
	RunnerKey      sqltypes.Key
	ModuleName     types.Option[string]
	DeploymentName model.DeploymentName
}

type GetRunnerRow

// GetRunnerRow is the result type of Querier.GetRunner. It has the same
// fields as GetActiveRunnersRow.
type GetRunnerRow struct {
	RunnerKey      sqltypes.Key
	Endpoint       string
	State          RunnerState
	Labels         []byte
	LastSeen       time.Time
	ModuleName     types.Option[string]
	// NOTE(review): untyped here, whereas other row types in this package
	// declare DeploymentName as model.DeploymentName — callers must
	// type-assert. Confirm whether this is intentional.
	DeploymentName interface{}
}

type GetRunnersForDeploymentRow

// GetRunnersForDeploymentRow is the element type returned by
// Querier.GetRunnersForDeployment. The first ten fields have the same names
// and types as Runner; the remaining seven match Deployment, with "_2"
// suffixes where names collide — presumably runner columns followed by
// deployment columns; confirm against the query.
type GetRunnersForDeploymentRow struct {
	ID                 int64
	Key                sqltypes.Key
	Created            time.Time
	LastSeen           time.Time
	ReservationTimeout sqltypes.NullTime
	State              RunnerState
	Endpoint           string
	ModuleName         types.Option[string]
	DeploymentID       types.Option[int64]
	Labels             []byte
	ID_2               int64
	CreatedAt          time.Time
	ModuleID           int64
	Name               model.DeploymentName
	Schema             []byte
	Labels_2           []byte
	MinReplicas        int32
}

type IngressRoute

// IngressRoute holds a method and path together with a deployment ID, module
// and verb. Presumably it maps an ingress method/path to a verb of a
// deployed module — confirm against the schema.
type IngressRoute struct {
	Method       string
	Path         string
	DeploymentID int64
	Module       string
	Verb         string
}

type InsertCallEventParams

// InsertCallEventParams is the argument struct accepted by
// Querier.InsertCallEvent. It describes one call: optional source
// module/verb, destination module/verb, duration in milliseconds (per the
// field name), raw request and response bytes, and an optional error.
type InsertCallEventParams struct {
	// NOTE(review): a plain string here, while InsertLogEventParams declares
	// DeploymentName as model.DeploymentName — confirm whether the
	// difference is intentional.
	DeploymentName string
	RequestName    types.Option[string]
	TimeStamp      time.Time
	SourceModule   types.Option[string]
	SourceVerb     types.Option[string]
	DestModule     string
	DestVerb       string
	DurationMs     int64
	Request        []byte
	Response       []byte
	Error          types.Option[string]
}

type InsertDeploymentEventParams

// InsertDeploymentEventParams is the argument struct accepted by
// Querier.InsertDeploymentEvent. Type is a plain string rather than
// EventType; its allowed values are not visible here. Replaced is optional —
// presumably the name of a deployment that this one replaced; confirm.
type InsertDeploymentEventParams struct {
	DeploymentName string
	Type           string
	Language       string
	ModuleName     string
	MinReplicas    int32
	Replaced       types.Option[string]
}

type InsertEventParams

// InsertEventParams is the argument struct accepted by Querier.InsertEvent.
//
// NOTE(review): Event declares CustomKey1 through CustomKey4, but this struct
// has no CustomKey2 — confirm whether the insert query omits it on purpose.
type InsertEventParams struct {
	DeploymentID int64
	RequestID    types.Option[int64]
	Type         EventType
	CustomKey1   types.Option[string]
	CustomKey3   types.Option[string]
	CustomKey4   types.Option[string]
	Payload      json.RawMessage
}

type InsertLogEventParams

// InsertLogEventParams is the argument struct accepted by
// Querier.InsertLogEvent. It describes one log entry: deployment name,
// optional request name, timestamp, numeric level, message, raw attribute
// bytes and an optional error. The level scale and the attribute encoding
// are not visible here.
type InsertLogEventParams struct {
	DeploymentName model.DeploymentName
	RequestName    types.Option[string]
	TimeStamp      time.Time
	Level          int32
	Message        string
	Attributes     []byte
	Error          types.Option[string]
}

type Module

// Module is the element type returned by Querier.GetModulesByID: a module's
// ID, language and name.
type Module struct {
	ID       int64
	Language string
	Name     string
}

type NullControllerState

// NullControllerState is a nullable ControllerState. Its Scan and Value
// methods are documented as implementing the Scanner and driver Valuer
// interfaces.
type NullControllerState struct {
	ControllerState ControllerState
	Valid           bool // Valid is true if ControllerState is not NULL
}

func (*NullControllerState) Scan

func (ns *NullControllerState) Scan(value interface{}) error

Scan implements the Scanner interface.

func (NullControllerState) Value

func (ns NullControllerState) Value() (driver.Value, error)

Value implements the driver Valuer interface.

type NullEventType

// NullEventType is a nullable EventType. Its Scan and Value methods are
// documented as implementing the Scanner and driver Valuer interfaces.
type NullEventType struct {
	EventType EventType
	Valid     bool // Valid is true if EventType is not NULL
}

func (*NullEventType) Scan

func (ns *NullEventType) Scan(value interface{}) error

Scan implements the Scanner interface.

func (NullEventType) Value

func (ns NullEventType) Value() (driver.Value, error)

Value implements the driver Valuer interface.

type NullOrigin added in v0.12.0

// NullOrigin is a nullable Origin. Its Scan and Value methods are documented
// as implementing the Scanner and driver Valuer interfaces.
type NullOrigin struct {
	Origin Origin
	Valid  bool // Valid is true if Origin is not NULL
}

func (*NullOrigin) Scan added in v0.12.0

func (ns *NullOrigin) Scan(value interface{}) error

Scan implements the Scanner interface.

func (NullOrigin) Value added in v0.12.0

func (ns NullOrigin) Value() (driver.Value, error)

Value implements the driver Valuer interface.

type NullRunnerState

// NullRunnerState is a nullable RunnerState. Its Scan and Value methods are
// documented as implementing the Scanner and driver Valuer interfaces.
type NullRunnerState struct {
	RunnerState RunnerState
	Valid       bool // Valid is true if RunnerState is not NULL
}

func (*NullRunnerState) Scan

func (ns *NullRunnerState) Scan(value interface{}) error

Scan implements the Scanner interface.

func (NullRunnerState) Value

func (ns NullRunnerState) Value() (driver.Value, error)

Value implements the driver Valuer interface.

type Origin added in v0.12.0

// Origin is a string-valued enumeration of request origins, used by
// Request.Origin and by Querier.CreateIngressRequest. The values declared
// here are "ingress", "cron" and "pubsub".
type Origin string
const (
	OriginIngress Origin = "ingress"
	OriginCron    Origin = "cron"
	OriginPubsub  Origin = "pubsub"
)

func (*Origin) Scan added in v0.12.0

func (e *Origin) Scan(src interface{}) error

type Querier

// Querier lists the query methods of this package. *Queries declares a
// method with the same signature for each entry; Queries.WithTx is not part
// of this interface. Every method takes a context.Context first.
type Querier interface {
	AssociateArtefactWithDeployment(ctx context.Context, arg AssociateArtefactWithDeploymentParams) error
	// Create a new artefact and return the artefact ID.
	CreateArtefact(ctx context.Context, digest []byte, content []byte) (int64, error)
	CreateDeployment(ctx context.Context, name model.DeploymentName, moduleName string, schema []byte) error
	CreateIngressRequest(ctx context.Context, origin Origin, name string, sourceAddr string) error
	CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error
	DeregisterRunner(ctx context.Context, key sqltypes.Key) (int64, error)
	ExpireRunnerReservations(ctx context.Context) (int64, error)
	GetActiveRunners(ctx context.Context, all bool) ([]GetActiveRunnersRow, error)
	GetAllIngressRoutes(ctx context.Context, all bool) ([]GetAllIngressRoutesRow, error)
	GetArtefactContentRange(ctx context.Context, start int32, count int32, iD int64) ([]byte, error)
	// Return the digests that exist in the database.
	GetArtefactDigests(ctx context.Context, digests [][]byte) ([]GetArtefactDigestsRow, error)
	GetControllers(ctx context.Context, all bool) ([]Controller, error)
	GetDeployment(ctx context.Context, name model.DeploymentName) (GetDeploymentRow, error)
	// Get the artefacts for the deployment with the given ID.
	// NOTE(review): this comment previously read "matching the given
	// digests", but the method takes a deployment ID and no digests — confirm.
	GetDeploymentArtefacts(ctx context.Context, deploymentID int64) ([]GetDeploymentArtefactsRow, error)
	GetDeployments(ctx context.Context, all bool) ([]GetDeploymentsRow, error)
	GetDeploymentsByID(ctx context.Context, ids []int64) ([]Deployment, error)
	// Get deployments that have a mismatch between the number of assigned and required replicas.
	GetDeploymentsNeedingReconciliation(ctx context.Context) ([]GetDeploymentsNeedingReconciliationRow, error)
	// Get all deployments that have artefacts matching the given digests.
	GetDeploymentsWithArtefacts(ctx context.Context, digests [][]byte, count interface{}) ([]GetDeploymentsWithArtefactsRow, error)
	GetExistingDeploymentForModule(ctx context.Context, name string) (GetExistingDeploymentForModuleRow, error)
	GetIdleRunners(ctx context.Context, labels []byte, limit int32) ([]Runner, error)
	// Get the runner endpoints corresponding to the given ingress route.
	GetIngressRoutes(ctx context.Context, method string, path string) ([]GetIngressRoutesRow, error)
	GetModulesByID(ctx context.Context, ids []int64) ([]Module, error)
	GetProcessList(ctx context.Context) ([]GetProcessListRow, error)
	// Retrieve routing information for a runner.
	GetRouteForRunner(ctx context.Context, key sqltypes.Key) (GetRouteForRunnerRow, error)
	GetRoutingTable(ctx context.Context, modules []string) ([]GetRoutingTableRow, error)
	GetRunner(ctx context.Context, key sqltypes.Key) (GetRunnerRow, error)
	GetRunnerState(ctx context.Context, key sqltypes.Key) (RunnerState, error)
	GetRunnersForDeployment(ctx context.Context, name model.DeploymentName) ([]GetRunnersForDeploymentRow, error)
	InsertCallEvent(ctx context.Context, arg InsertCallEventParams) error
	InsertDeploymentEvent(ctx context.Context, arg InsertDeploymentEventParams) error
	InsertEvent(ctx context.Context, arg InsertEventParams) error
	InsertLogEvent(ctx context.Context, arg InsertLogEventParams) error
	// Mark any controller entries that haven't been updated recently as dead.
	KillStaleControllers(ctx context.Context, timeout time.Duration) (int64, error)
	KillStaleRunners(ctx context.Context, timeout time.Duration) (int64, error)
	ReplaceDeployment(ctx context.Context, oldDeployment string, newDeployment string, minReplicas int32) (int64, error)
	// Find an idle runner and reserve it for the given deployment.
	ReserveRunner(ctx context.Context, reservationTimeout time.Time, deploymentName model.DeploymentName, labels []byte) (Runner, error)
	SetDeploymentDesiredReplicas(ctx context.Context, name model.DeploymentName, minReplicas int32) error
	UpsertController(ctx context.Context, key model.ControllerKey, endpoint string) (int64, error)
	UpsertModule(ctx context.Context, language string, name string) (int64, error)
	// Upsert a runner and return the deployment ID that it is assigned to, if any.
	// If the deployment name is null, then deployment_rel.id will be null,
	// otherwise we try to retrieve the deployments.id using the key. If
	// there is no corresponding deployment, then the deployment ID is -1
	// and the parent statement will fail due to a foreign key constraint.
	UpsertRunner(ctx context.Context, arg UpsertRunnerParams) (types.Option[int64], error)
}

type Queries

// Queries is the concrete type carrying the query methods listed in Querier.
// It is constructed with New from a DBTX, and WithTx returns a *Queries for a
// given pgx.Tx.
type Queries struct {
	// contains filtered or unexported fields
}

func New

func New(db DBTX) *Queries

func (*Queries) AssociateArtefactWithDeployment

func (q *Queries) AssociateArtefactWithDeployment(ctx context.Context, arg AssociateArtefactWithDeploymentParams) error

func (*Queries) CreateArtefact

func (q *Queries) CreateArtefact(ctx context.Context, digest []byte, content []byte) (int64, error)

Create a new artefact and return the artefact ID.

func (*Queries) CreateDeployment

func (q *Queries) CreateDeployment(ctx context.Context, name model.DeploymentName, moduleName string, schema []byte) error

func (*Queries) CreateIngressRequest

func (q *Queries) CreateIngressRequest(ctx context.Context, origin Origin, name string, sourceAddr string) error

func (*Queries) CreateIngressRoute

func (q *Queries) CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error

func (*Queries) DeregisterRunner

func (q *Queries) DeregisterRunner(ctx context.Context, key sqltypes.Key) (int64, error)

func (*Queries) ExpireRunnerReservations

func (q *Queries) ExpireRunnerReservations(ctx context.Context) (int64, error)

func (*Queries) GetActiveRunners

func (q *Queries) GetActiveRunners(ctx context.Context, all bool) ([]GetActiveRunnersRow, error)

func (*Queries) GetAllIngressRoutes

func (q *Queries) GetAllIngressRoutes(ctx context.Context, all bool) ([]GetAllIngressRoutesRow, error)

func (*Queries) GetArtefactContentRange

func (q *Queries) GetArtefactContentRange(ctx context.Context, start int32, count int32, iD int64) ([]byte, error)

func (*Queries) GetArtefactDigests

func (q *Queries) GetArtefactDigests(ctx context.Context, digests [][]byte) ([]GetArtefactDigestsRow, error)

Return the digests that exist in the database.

func (*Queries) GetControllers

func (q *Queries) GetControllers(ctx context.Context, all bool) ([]Controller, error)

func (*Queries) GetDeployment

func (q *Queries) GetDeployment(ctx context.Context, name model.DeploymentName) (GetDeploymentRow, error)

func (*Queries) GetDeploymentArtefacts

func (q *Queries) GetDeploymentArtefacts(ctx context.Context, deploymentID int64) ([]GetDeploymentArtefactsRow, error)

Get all artefacts associated with the deployment that has the given ID.

func (*Queries) GetDeployments

func (q *Queries) GetDeployments(ctx context.Context, all bool) ([]GetDeploymentsRow, error)

func (*Queries) GetDeploymentsByID

func (q *Queries) GetDeploymentsByID(ctx context.Context, ids []int64) ([]Deployment, error)

func (*Queries) GetDeploymentsNeedingReconciliation

func (q *Queries) GetDeploymentsNeedingReconciliation(ctx context.Context) ([]GetDeploymentsNeedingReconciliationRow, error)

Get deployments that have a mismatch between the number of assigned and required replicas.

func (*Queries) GetDeploymentsWithArtefacts

func (q *Queries) GetDeploymentsWithArtefacts(ctx context.Context, digests [][]byte, count interface{}) ([]GetDeploymentsWithArtefactsRow, error)

Get all deployments that have artefacts matching the given digests.

func (*Queries) GetExistingDeploymentForModule

func (q *Queries) GetExistingDeploymentForModule(ctx context.Context, name string) (GetExistingDeploymentForModuleRow, error)

func (*Queries) GetIdleRunners

func (q *Queries) GetIdleRunners(ctx context.Context, labels []byte, limit int32) ([]Runner, error)

func (*Queries) GetIngressRoutes

func (q *Queries) GetIngressRoutes(ctx context.Context, method string, path string) ([]GetIngressRoutesRow, error)

Get the runner endpoints corresponding to the given ingress route.

func (*Queries) GetModulesByID

func (q *Queries) GetModulesByID(ctx context.Context, ids []int64) ([]Module, error)

func (*Queries) GetProcessList added in v0.14.0

func (q *Queries) GetProcessList(ctx context.Context) ([]GetProcessListRow, error)

func (*Queries) GetRouteForRunner

func (q *Queries) GetRouteForRunner(ctx context.Context, key sqltypes.Key) (GetRouteForRunnerRow, error)

Retrieve routing information for a runner.

func (*Queries) GetRoutingTable

func (q *Queries) GetRoutingTable(ctx context.Context, modules []string) ([]GetRoutingTableRow, error)

func (*Queries) GetRunner

func (q *Queries) GetRunner(ctx context.Context, key sqltypes.Key) (GetRunnerRow, error)

func (*Queries) GetRunnerState

func (q *Queries) GetRunnerState(ctx context.Context, key sqltypes.Key) (RunnerState, error)

func (*Queries) GetRunnersForDeployment

func (q *Queries) GetRunnersForDeployment(ctx context.Context, name model.DeploymentName) ([]GetRunnersForDeploymentRow, error)

func (*Queries) InsertCallEvent

func (q *Queries) InsertCallEvent(ctx context.Context, arg InsertCallEventParams) error

func (*Queries) InsertDeploymentEvent

func (q *Queries) InsertDeploymentEvent(ctx context.Context, arg InsertDeploymentEventParams) error

func (*Queries) InsertEvent

func (q *Queries) InsertEvent(ctx context.Context, arg InsertEventParams) error

func (*Queries) InsertLogEvent

func (q *Queries) InsertLogEvent(ctx context.Context, arg InsertLogEventParams) error

func (*Queries) KillStaleControllers

func (q *Queries) KillStaleControllers(ctx context.Context, timeout time.Duration) (int64, error)

Mark any controller entries that haven't been updated recently as dead.

func (*Queries) KillStaleRunners

func (q *Queries) KillStaleRunners(ctx context.Context, timeout time.Duration) (int64, error)

func (*Queries) ReplaceDeployment

func (q *Queries) ReplaceDeployment(ctx context.Context, oldDeployment string, newDeployment string, minReplicas int32) (int64, error)

func (*Queries) ReserveRunner

func (q *Queries) ReserveRunner(ctx context.Context, reservationTimeout time.Time, deploymentName model.DeploymentName, labels []byte) (Runner, error)

Find an idle runner and reserve it for the given deployment.

func (*Queries) SetDeploymentDesiredReplicas

func (q *Queries) SetDeploymentDesiredReplicas(ctx context.Context, name model.DeploymentName, minReplicas int32) error

func (*Queries) UpsertController

func (q *Queries) UpsertController(ctx context.Context, key model.ControllerKey, endpoint string) (int64, error)

func (*Queries) UpsertModule

func (q *Queries) UpsertModule(ctx context.Context, language string, name string) (int64, error)

func (*Queries) UpsertRunner

func (q *Queries) UpsertRunner(ctx context.Context, arg UpsertRunnerParams) (types.Option[int64], error)

Upsert a runner and return the deployment ID that it is assigned to, if any. If the deployment name is null, then deployment_rel.id will be null, otherwise we try to retrieve the deployments.id using the key. If there is no corresponding deployment, then the deployment ID is -1 and the parent statement will fail due to a foreign key constraint.

func (*Queries) WithTx

func (q *Queries) WithTx(tx pgx.Tx) *Queries

type Request added in v0.12.0

// Request holds a request's ID, Origin, name and source address. Presumably
// the record created by Querier.CreateIngressRequest, which takes the same
// origin, name and source-address values — confirm.
type Request struct {
	ID         int64
	Origin     Origin
	Name       string
	SourceAddr string
}

type Runner

// Runner is the type returned by Querier.GetIdleRunners (as a slice) and
// Querier.ReserveRunner. ReservationTimeout, ModuleName and DeploymentID use
// nullable/optional types; Labels is raw bytes whose encoding is not visible
// here.
type Runner struct {
	ID                 int64
	Key                sqltypes.Key
	Created            time.Time
	LastSeen           time.Time
	ReservationTimeout sqltypes.NullTime
	State              RunnerState
	Endpoint           string
	ModuleName         types.Option[string]
	DeploymentID       types.Option[int64]
	Labels             []byte
}

type RunnerState

// RunnerState is a string-valued enumeration of runner states. The values
// declared here are "idle", "reserved", "assigned" and "dead".
type RunnerState string
const (
	RunnerStateIdle     RunnerState = "idle"
	RunnerStateReserved RunnerState = "reserved"
	RunnerStateAssigned RunnerState = "assigned"
	RunnerStateDead     RunnerState = "dead"
)

func (*RunnerState) Scan

func (e *RunnerState) Scan(src interface{}) error

type Tx

// Tx is the transaction handle returned by DB.Begin. It embeds *Queries, so
// every Queries method is callable directly on a Tx, and it is finished with
// Commit, Rollback or CommitOrRollback.
type Tx struct {
	*Queries
	// contains filtered or unexported fields
}

func (*Tx) Commit

func (t *Tx) Commit(ctx context.Context) error

func (*Tx) CommitOrRollback

func (t *Tx) CommitOrRollback(ctx context.Context, err *error)

CommitOrRollback can be used in a defer statement to commit or rollback a transaction depending on whether the enclosing function returned an error.

func myFunc() (err error) {
  tx, err := db.Begin(ctx)
  if err != nil { return err }
  defer tx.CommitOrRollback(ctx, &err)
  ...
}

func (*Tx) Rollback

func (t *Tx) Rollback(ctx context.Context) error

type UpsertRunnerParams

// UpsertRunnerParams is the argument struct accepted by Querier.UpsertRunner:
// the runner's key, endpoint, state and raw label bytes, plus an optional
// deployment name.
type UpsertRunnerParams struct {
	Key            sqltypes.Key
	Endpoint       string
	State          RunnerState
	Labels         []byte
	DeploymentName types.Option[string]
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL