sql

package
v0.307.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 29, 2024 | License: Apache-2.0 | Imports: 22 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Migrate

func Migrate(ctx context.Context, dsn string, logLevel log.Level) error

Migrate the database.

Types

type AcquireAsyncCallRow added in v0.191.0

type AcquireAsyncCallRow struct {
	AsyncCallID         int64
	LeaseIdempotencyKey uuid.UUID
	LeaseKey            leases.Key
	Origin              string
	Verb                schema.RefKey
	Request             []byte
	ScheduledAt         time.Time
	RemainingAttempts   int32
	Backoff             time.Duration
	MaxBackoff          time.Duration
}

type Artefact

type Artefact struct {
	ID        int64
	CreatedAt time.Time
	Digest    []byte
	Content   []byte
}

type AssociateArtefactWithDeploymentParams

type AssociateArtefactWithDeploymentParams struct {
	Key        model.DeploymentKey
	ArtefactID int64
	Executable bool
	Path       string
}

type AsyncCall added in v0.191.0

type AsyncCall struct {
	ID                int64
	CreatedAt         time.Time
	LeaseID           optional.Option[int64]
	Verb              schema.RefKey
	State             AsyncCallState
	Origin            string
	ScheduledAt       time.Time
	Request           []byte
	Response          []byte
	Error             optional.Option[string]
	RemainingAttempts int32
	Backoff           time.Duration
	MaxBackoff        time.Duration
}

type AsyncCallState added in v0.191.0

type AsyncCallState string
const (
	AsyncCallStatePending   AsyncCallState = "pending"
	AsyncCallStateExecuting AsyncCallState = "executing"
	AsyncCallStateSuccess   AsyncCallState = "success"
	AsyncCallStateError     AsyncCallState = "error"
)

func (*AsyncCallState) Scan added in v0.191.0

func (e *AsyncCallState) Scan(src interface{}) error

type ConnI added in v0.217.9

type ConnI interface {
	DBTX
	Begin(ctx context.Context) (pgx.Tx, error)
}

type Controller

type Controller struct {
	ID       int64
	Key      model.ControllerKey
	Created  time.Time
	LastSeen time.Time
	State    ControllerState
	Endpoint string
}

type ControllerState

type ControllerState string
const (
	ControllerStateLive ControllerState = "live"
	ControllerStateDead ControllerState = "dead"
)

func (*ControllerState) Scan

func (e *ControllerState) Scan(src interface{}) error

type CreateAsyncCallParams added in v0.229.0

type CreateAsyncCallParams struct {
	Verb              schema.RefKey
	Origin            string
	Request           []byte
	RemainingAttempts int32
	Backoff           time.Duration
	MaxBackoff        time.Duration
}

type CreateIngressRouteParams

type CreateIngressRouteParams struct {
	Key    model.DeploymentKey
	Module string
	Verb   string
	Method string
	Path   string
}

type CronJob added in v0.167.0

type CronJob struct {
	ID            int64
	Key           model.CronJobKey
	DeploymentID  int64
	Verb          string
	Schedule      string
	StartTime     time.Time
	NextExecution time.Time
	State         model.CronJobState
	ModuleName    string
}

type CronJobState added in v0.167.0

type CronJobState string
const (
	CronJobStateIdle      CronJobState = "idle"
	CronJobStateExecuting CronJobState = "executing"
)

func (*CronJobState) Scan added in v0.167.0

func (e *CronJobState) Scan(src interface{}) error

type DB

type DB struct {
	*Queries
	// contains filtered or unexported fields
}

func NewDB

func NewDB(conn ConnI) *DB

func (*DB) Begin

func (d *DB) Begin(ctx context.Context) (*Tx, error)

func (*DB) Conn

func (d *DB) Conn() ConnI

type DBI

type DBI interface {
	Querier
	Conn() ConnI
	Begin(ctx context.Context) (*Tx, error)
}

type DBTX

type DBTX interface {
	Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
	Query(context.Context, string, ...interface{}) (pgx.Rows, error)
	QueryRow(context.Context, string, ...interface{}) pgx.Row
}

type Deployment

type Deployment struct {
	ID          int64
	CreatedAt   time.Time
	ModuleID    int64
	Key         model.DeploymentKey
	Schema      *schema.Module
	Labels      []byte
	MinReplicas int32
}

type DeploymentArtefact

type DeploymentArtefact struct {
	ArtefactID   int64
	DeploymentID int64
	CreatedAt    time.Time
	Executable   bool
	Path         string
}

type Event

type Event struct {
	ID           int64
	TimeStamp    time.Time
	DeploymentID int64
	RequestID    optional.Option[int64]
	Type         EventType
	CustomKey1   optional.Option[string]
	CustomKey2   optional.Option[string]
	CustomKey3   optional.Option[string]
	CustomKey4   optional.Option[string]
	Payload      json.RawMessage
}

type EventType

type EventType string
const (
	EventTypeCall              EventType = "call"
	EventTypeLog               EventType = "log"
	EventTypeDeploymentCreated EventType = "deployment_created"
	EventTypeDeploymentUpdated EventType = "deployment_updated"
)

func (*EventType) Scan

func (e *EventType) Scan(src interface{}) error

type FailAsyncCallWithRetryParams added in v0.229.0

type FailAsyncCallWithRetryParams struct {
	RemainingAttempts int32
	Backoff           time.Duration
	MaxBackoff        time.Duration
	ScheduledAt       time.Time
	Error             string
	ID                int64
}

type FsmInstance added in v0.226.0

type FsmInstance struct {
	ID               int64
	CreatedAt        time.Time
	Fsm              schema.RefKey
	Key              string
	Status           FsmStatus
	CurrentState     optional.Option[schema.RefKey]
	DestinationState optional.Option[schema.RefKey]
	AsyncCallID      optional.Option[int64]
}

type FsmStatus added in v0.191.0

type FsmStatus string
const (
	FsmStatusRunning   FsmStatus = "running"
	FsmStatusCompleted FsmStatus = "completed"
	FsmStatusFailed    FsmStatus = "failed"
)

func (*FsmStatus) Scan added in v0.191.0

func (e *FsmStatus) Scan(src interface{}) error

type GetActiveDeploymentSchemasRow added in v0.79.3

type GetActiveDeploymentSchemasRow struct {
	Key    model.DeploymentKey
	Schema *schema.Module
}

type GetActiveDeploymentsRow added in v0.79.3

type GetActiveDeploymentsRow struct {
	Deployment Deployment
	ModuleName string
	Language   string
	Replicas   int64
}

type GetActiveIngressRoutesRow added in v0.177.2

type GetActiveIngressRoutesRow struct {
	DeploymentKey model.DeploymentKey
	Module        string
	Verb          string
	Method        string
	Path          string
}

type GetActiveRunnersRow

type GetActiveRunnersRow struct {
	RunnerKey     model.RunnerKey
	Endpoint      string
	State         RunnerState
	Labels        []byte
	LastSeen      time.Time
	ModuleName    optional.Option[string]
	DeploymentKey optional.Option[string]
}

type GetArtefactDigestsRow

type GetArtefactDigestsRow struct {
	ID     int64
	Digest []byte
}

type GetDeploymentArtefactsRow

type GetDeploymentArtefactsRow struct {
	CreatedAt    time.Time
	ID           int64
	Executable   bool
	Path         string
	Digest       []byte
	Executable_2 bool
}

type GetDeploymentRow

type GetDeploymentRow struct {
	Deployment  Deployment
	Language    string
	ModuleName  string
	MinReplicas int32
}

type GetDeploymentsNeedingReconciliationRow

type GetDeploymentsNeedingReconciliationRow struct {
	DeploymentKey        model.DeploymentKey
	ModuleName           string
	Language             string
	AssignedRunnersCount int64
	RequiredRunnersCount int64
}

type GetDeploymentsWithArtefactsRow

type GetDeploymentsWithArtefactsRow struct {
	ID            int64
	CreatedAt     time.Time
	DeploymentKey model.DeploymentKey
	Schema        *schema.Module
	ModuleName    string
}

type GetDeploymentsWithMinReplicasRow added in v0.163.10

type GetDeploymentsWithMinReplicasRow struct {
	Deployment Deployment
	ModuleName string
	Language   string
}

type GetExistingDeploymentForModuleRow

type GetExistingDeploymentForModuleRow struct {
	ID          int64
	CreatedAt   time.Time
	ModuleID    int64
	Key         model.DeploymentKey
	Schema      *schema.Module
	Labels      []byte
	MinReplicas int32
	ID_2        int64
	Language    string
	Name        string
}

type GetIngressRoutesRow

type GetIngressRoutesRow struct {
	RunnerKey     model.RunnerKey
	DeploymentKey model.DeploymentKey
	Endpoint      string
	Path          string
	Module        string
	Verb          string
}

type GetLeaseInfoRow added in v0.263.0

type GetLeaseInfoRow struct {
	ExpiresAt time.Time
	Metadata  []byte
}

type GetNextEventForSubscriptionRow added in v0.241.0

type GetNextEventForSubscriptionRow struct {
	Event     optional.Option[model.TopicEventKey]
	Payload   []byte
	CreatedAt optional.Option[time.Time]
	Ready     bool
}

type GetProcessListRow

type GetProcessListRow struct {
	MinReplicas      int32
	DeploymentKey    model.DeploymentKey
	DeploymentLabels []byte
	RunnerKey        optional.Option[model.RunnerKey]
	Endpoint         optional.Option[string]
	RunnerLabels     []byte
}

type GetRandomSubscriberRow added in v0.245.0

type GetRandomSubscriberRow struct {
	Sink          schema.RefKey
	RetryAttempts int32
	Backoff       time.Duration
	MaxBackoff    time.Duration
}

type GetRouteForRunnerRow

type GetRouteForRunnerRow struct {
	Endpoint      string
	RunnerKey     model.RunnerKey
	ModuleName    optional.Option[string]
	DeploymentKey optional.Option[model.DeploymentKey]
	State         RunnerState
}

type GetRoutingTableRow

type GetRoutingTableRow struct {
	Endpoint      string
	RunnerKey     model.RunnerKey
	ModuleName    optional.Option[string]
	DeploymentKey optional.Option[model.DeploymentKey]
}

type GetRunnerRow

type GetRunnerRow struct {
	RunnerKey     model.RunnerKey
	Endpoint      string
	State         RunnerState
	Labels        []byte
	LastSeen      time.Time
	ModuleName    optional.Option[string]
	DeploymentKey optional.Option[string]
}

type GetRunnersForDeploymentRow

type GetRunnersForDeploymentRow struct {
	ID                 int64
	Key                model.RunnerKey
	Created            time.Time
	LastSeen           time.Time
	ReservationTimeout optional.Option[time.Time]
	State              RunnerState
	Endpoint           string
	ModuleName         optional.Option[string]
	DeploymentID       optional.Option[int64]
	Labels             []byte
	ID_2               int64
	CreatedAt          time.Time
	ModuleID           int64
	Key_2              model.DeploymentKey
	Schema             *schema.Module
	Labels_2           []byte
	MinReplicas        int32
}

type GetSubscriptionsNeedingUpdateRow added in v0.241.0

type GetSubscriptionsNeedingUpdateRow struct {
	Key    model.SubscriptionKey
	Cursor optional.Option[model.TopicEventKey]
	Topic  model.TopicKey
	Name   string
}

type IngressRoute

type IngressRoute struct {
	Method       string
	Path         string
	DeploymentID int64
	Module       string
	Verb         string
}

type InsertCallEventParams

type InsertCallEventParams struct {
	DeploymentKey model.DeploymentKey
	RequestKey    optional.Option[string]
	TimeStamp     time.Time
	SourceModule  optional.Option[string]
	SourceVerb    optional.Option[string]
	DestModule    string
	DestVerb      string
	DurationMs    int64
	Request       []byte
	Response      []byte
	Error         optional.Option[string]
	Stack         optional.Option[string]
}

type InsertDeploymentCreatedEventParams

type InsertDeploymentCreatedEventParams struct {
	DeploymentKey model.DeploymentKey
	Language      string
	ModuleName    string
	MinReplicas   int32
	Replaced      optional.Option[model.DeploymentKey]
}

type InsertDeploymentUpdatedEventParams

type InsertDeploymentUpdatedEventParams struct {
	DeploymentKey   model.DeploymentKey
	Language        string
	ModuleName      string
	PrevMinReplicas int32
	MinReplicas     int32
}

type InsertEventParams

type InsertEventParams struct {
	DeploymentID int64
	RequestID    optional.Option[int64]
	Type         EventType
	CustomKey1   optional.Option[string]
	CustomKey2   optional.Option[string]
	CustomKey3   optional.Option[string]
	CustomKey4   optional.Option[string]
	Payload      json.RawMessage
}

type InsertLogEventParams

type InsertLogEventParams struct {
	DeploymentKey model.DeploymentKey
	RequestKey    optional.Option[string]
	TimeStamp     time.Time
	Level         int32
	Message       string
	Attributes    []byte
	Error         optional.Option[string]
	Stack         optional.Option[string]
}

type InsertSubscriberParams added in v0.233.0

type InsertSubscriberParams struct {
	Key              model.SubscriberKey
	Module           string
	SubscriptionName string
	Deployment       model.DeploymentKey
	Sink             schema.RefKey
	RetryAttempts    int32
	Backoff          time.Duration
	MaxBackoff       time.Duration
}

type Lease added in v0.186.0

type Lease struct {
	ID             int64
	IdempotencyKey uuid.UUID
	Key            leases.Key
	CreatedAt      time.Time
	ExpiresAt      time.Time
	Metadata       []byte
}

type Module

type Module struct {
	ID       int64
	Language string
	Name     string
}

type ModuleConfiguration added in v0.231.0

type ModuleConfiguration struct {
	ID        int64
	CreatedAt time.Time
	Module    optional.Option[string]
	Name      string
	Value     []byte
}

type ModuleSecret added in v0.276.5

type ModuleSecret struct {
	ID        int64
	CreatedAt time.Time
	Module    optional.Option[string]
	Name      string
	Url       string
}

type NullAsyncCallState added in v0.191.0

type NullAsyncCallState struct {
	AsyncCallState AsyncCallState
	Valid          bool // Valid is true if AsyncCallState is not NULL
}

func (*NullAsyncCallState) Scan added in v0.191.0

func (ns *NullAsyncCallState) Scan(value interface{}) error

Scan implements the Scanner interface.

func (NullAsyncCallState) Value added in v0.191.0

func (ns NullAsyncCallState) Value() (driver.Value, error)

Value implements the driver Valuer interface.

type NullControllerState

type NullControllerState struct {
	ControllerState ControllerState
	Valid           bool // Valid is true if ControllerState is not NULL
}

func (*NullControllerState) Scan

func (ns *NullControllerState) Scan(value interface{}) error

Scan implements the Scanner interface.

func (NullControllerState) Value

func (ns NullControllerState) Value() (driver.Value, error)

Value implements the driver Valuer interface.

type NullCronJobState added in v0.167.0

type NullCronJobState struct {
	CronJobState CronJobState
	Valid        bool // Valid is true if CronJobState is not NULL
}

func (*NullCronJobState) Scan added in v0.167.0

func (ns *NullCronJobState) Scan(value interface{}) error

Scan implements the Scanner interface.

func (NullCronJobState) Value added in v0.167.0

func (ns NullCronJobState) Value() (driver.Value, error)

Value implements the driver Valuer interface.

type NullEventType

type NullEventType struct {
	EventType EventType
	Valid     bool // Valid is true if EventType is not NULL
}

func (*NullEventType) Scan

func (ns *NullEventType) Scan(value interface{}) error

Scan implements the Scanner interface.

func (NullEventType) Value

func (ns NullEventType) Value() (driver.Value, error)

Value implements the driver Valuer interface.

type NullFsmStatus added in v0.191.0

type NullFsmStatus struct {
	FsmStatus FsmStatus
	Valid     bool // Valid is true if FsmStatus is not NULL
}

func (*NullFsmStatus) Scan added in v0.191.0

func (ns *NullFsmStatus) Scan(value interface{}) error

Scan implements the Scanner interface.

func (NullFsmStatus) Value added in v0.191.0

func (ns NullFsmStatus) Value() (driver.Value, error)

Value implements the driver Valuer interface.

type NullOrigin

type NullOrigin struct {
	Origin Origin
	Valid  bool // Valid is true if Origin is not NULL
}

func (*NullOrigin) Scan

func (ns *NullOrigin) Scan(value interface{}) error

Scan implements the Scanner interface.

func (NullOrigin) Value

func (ns NullOrigin) Value() (driver.Value, error)

Value implements the driver Valuer interface.

type NullRunnerState

type NullRunnerState struct {
	RunnerState RunnerState
	Valid       bool // Valid is true if RunnerState is not NULL
}

func (*NullRunnerState) Scan

func (ns *NullRunnerState) Scan(value interface{}) error

Scan implements the Scanner interface.

func (NullRunnerState) Value

func (ns NullRunnerState) Value() (driver.Value, error)

Value implements the driver Valuer interface.

type NullTopicSubscriptionState added in v0.241.0

type NullTopicSubscriptionState struct {
	TopicSubscriptionState TopicSubscriptionState
	Valid                  bool // Valid is true if TopicSubscriptionState is not NULL
}

func (*NullTopicSubscriptionState) Scan added in v0.241.0

func (ns *NullTopicSubscriptionState) Scan(value interface{}) error

Scan implements the Scanner interface.

func (NullTopicSubscriptionState) Value added in v0.241.0

func (ns NullTopicSubscriptionState) Value() (driver.Value, error)

Value implements the driver Valuer interface.

type Origin

type Origin string
const (
	OriginIngress Origin = "ingress"
	OriginCron    Origin = "cron"
	OriginPubsub  Origin = "pubsub"
)

func (*Origin) Scan

func (e *Origin) Scan(src interface{}) error

type PublishEventForTopicParams added in v0.239.0

type PublishEventForTopicParams struct {
	Key     model.TopicEventKey
	Module  string
	Topic   string
	Payload []byte
}

type Querier

type Querier interface {
	// Reserve a pending async call for execution, returning the associated lease
	// reservation key.
	AcquireAsyncCall(ctx context.Context, ttl time.Duration) (AcquireAsyncCallRow, error)
	AssociateArtefactWithDeployment(ctx context.Context, arg AssociateArtefactWithDeploymentParams) error
	BeginConsumingTopicEvent(ctx context.Context, subscription model.SubscriptionKey, event model.TopicEventKey) error
	CompleteEventForSubscription(ctx context.Context, name string, module string) error
	// Create a new artefact and return the artefact ID.
	CreateArtefact(ctx context.Context, digest []byte, content []byte) (int64, error)
	CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error)
	CreateDeployment(ctx context.Context, moduleName string, schema []byte, key model.DeploymentKey) error
	CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error
	CreateRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error
	DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) error
	DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) error
	DeregisterRunner(ctx context.Context, key model.RunnerKey) (int64, error)
	ExpireLeases(ctx context.Context) (int64, error)
	ExpireRunnerReservations(ctx context.Context) (int64, error)
	FailAsyncCall(ctx context.Context, error string, iD int64) (bool, error)
	FailAsyncCallWithRetry(ctx context.Context, arg FailAsyncCallWithRetryParams) (bool, error)
	FailFSMInstance(ctx context.Context, fsm schema.RefKey, key string) (bool, error)
	// Mark an FSM transition as completed, updating the current state and clearing the async call ID.
	FinishFSMTransition(ctx context.Context, fsm schema.RefKey, key string) (bool, error)
	GetActiveControllers(ctx context.Context) ([]Controller, error)
	GetActiveDeploymentSchemas(ctx context.Context) ([]GetActiveDeploymentSchemasRow, error)
	GetActiveDeployments(ctx context.Context) ([]GetActiveDeploymentsRow, error)
	GetActiveIngressRoutes(ctx context.Context) ([]GetActiveIngressRoutesRow, error)
	GetActiveRunners(ctx context.Context) ([]GetActiveRunnersRow, error)
	GetArtefactContentRange(ctx context.Context, start int32, count int32, iD int64) ([]byte, error)
	// Return the digests that exist in the database.
	GetArtefactDigests(ctx context.Context, digests [][]byte) ([]GetArtefactDigestsRow, error)
	GetDeployment(ctx context.Context, key model.DeploymentKey) (GetDeploymentRow, error)
	// Get all artefacts associated with the given deployment ID.
	GetDeploymentArtefacts(ctx context.Context, deploymentID int64) ([]GetDeploymentArtefactsRow, error)
	GetDeploymentsByID(ctx context.Context, ids []int64) ([]Deployment, error)
	// Get deployments that have a mismatch between the number of assigned and required replicas.
	GetDeploymentsNeedingReconciliation(ctx context.Context) ([]GetDeploymentsNeedingReconciliationRow, error)
	// Get all deployments that have artefacts matching the given digests.
	GetDeploymentsWithArtefacts(ctx context.Context, digests [][]byte, schema []byte, count int64) ([]GetDeploymentsWithArtefactsRow, error)
	GetDeploymentsWithMinReplicas(ctx context.Context) ([]GetDeploymentsWithMinReplicasRow, error)
	GetExistingDeploymentForModule(ctx context.Context, name string) (GetExistingDeploymentForModuleRow, error)
	GetFSMInstance(ctx context.Context, fsm schema.RefKey, key string) (FsmInstance, error)
	GetIdleRunners(ctx context.Context, labels []byte, limit int64) ([]Runner, error)
	// Get the runner endpoints corresponding to the given ingress route.
	GetIngressRoutes(ctx context.Context, method string) ([]GetIngressRoutesRow, error)
	GetLeaseInfo(ctx context.Context, key leases.Key) (GetLeaseInfoRow, error)
	GetModulesByID(ctx context.Context, ids []int64) ([]Module, error)
	GetNextEventForSubscription(ctx context.Context, consumptionDelay time.Duration, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error)
	GetProcessList(ctx context.Context) ([]GetProcessListRow, error)
	GetRandomSubscriber(ctx context.Context, key model.SubscriptionKey) (GetRandomSubscriberRow, error)
	// Retrieve routing information for a runner.
	GetRouteForRunner(ctx context.Context, key model.RunnerKey) (GetRouteForRunnerRow, error)
	GetRoutingTable(ctx context.Context, modules []string) ([]GetRoutingTableRow, error)
	GetRunner(ctx context.Context, key model.RunnerKey) (GetRunnerRow, error)
	GetRunnerState(ctx context.Context, key model.RunnerKey) (RunnerState, error)
	GetRunnersForDeployment(ctx context.Context, key model.DeploymentKey) ([]GetRunnersForDeploymentRow, error)
	GetSchemaForDeployment(ctx context.Context, key model.DeploymentKey) (*schema.Module, error)
	// Results may not be ready to be scheduled yet due to event consumption delay.
	// Sorting ensures that brand new events (that may not be ready for consumption)
	// don't prevent older events from being consumed.
	GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error)
	InsertCallEvent(ctx context.Context, arg InsertCallEventParams) error
	InsertDeploymentCreatedEvent(ctx context.Context, arg InsertDeploymentCreatedEventParams) error
	InsertDeploymentUpdatedEvent(ctx context.Context, arg InsertDeploymentUpdatedEventParams) error
	InsertEvent(ctx context.Context, arg InsertEventParams) error
	InsertLogEvent(ctx context.Context, arg InsertLogEventParams) error
	InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error
	// Mark any controller entries that haven't been updated recently as dead.
	KillStaleControllers(ctx context.Context, timeout time.Duration) (int64, error)
	KillStaleRunners(ctx context.Context, timeout time.Duration) (int64, error)
	LoadAsyncCall(ctx context.Context, id int64) (AsyncCall, error)
	NewLease(ctx context.Context, key leases.Key, ttl time.Duration, metadata []byte) (uuid.UUID, error)
	PublishEventForTopic(ctx context.Context, arg PublishEventForTopicParams) error
	ReleaseLease(ctx context.Context, idempotencyKey uuid.UUID, key leases.Key) (bool, error)
	RenewLease(ctx context.Context, ttl time.Duration, idempotencyKey uuid.UUID, key leases.Key) (bool, error)
	// Find an idle runner and reserve it for the given deployment.
	ReserveRunner(ctx context.Context, reservationTimeout time.Time, deploymentKey model.DeploymentKey, labels []byte) (Runner, error)
	SetDeploymentDesiredReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int32) error
	// Start a new FSM transition, populating the destination state and async call ID.
	//
	// "key" is the unique identifier for the FSM execution.
	StartFSMTransition(ctx context.Context, arg StartFSMTransitionParams) (FsmInstance, error)
	SucceedAsyncCall(ctx context.Context, response []byte, iD int64) (bool, error)
	SucceedFSMInstance(ctx context.Context, fsm schema.RefKey, key string) (bool, error)
	UpsertController(ctx context.Context, key model.ControllerKey, endpoint string) (int64, error)
	UpsertModule(ctx context.Context, language string, name string) (int64, error)
	// Upsert a runner and return the deployment ID that it is assigned to, if any.
	// If the deployment key is null, then deployment_rel.id will be null,
	// otherwise we try to retrieve the deployments.id using the key. If
	// there is no corresponding deployment, then the deployment ID is -1
	// and the parent statement will fail due to a foreign key constraint.
	UpsertRunner(ctx context.Context, arg UpsertRunnerParams) (optional.Option[int64], error)
	UpsertSubscription(ctx context.Context, arg UpsertSubscriptionParams) error
	UpsertTopic(ctx context.Context, arg UpsertTopicParams) error
}

type Queries

type Queries struct {
	// contains filtered or unexported fields
}

func New

func New(db DBTX) *Queries

func (*Queries) AcquireAsyncCall added in v0.191.0

func (q *Queries) AcquireAsyncCall(ctx context.Context, ttl time.Duration) (AcquireAsyncCallRow, error)

Reserve a pending async call for execution, returning the associated lease reservation key.

func (*Queries) AssociateArtefactWithDeployment

func (q *Queries) AssociateArtefactWithDeployment(ctx context.Context, arg AssociateArtefactWithDeploymentParams) error

func (*Queries) BeginConsumingTopicEvent added in v0.241.0

func (q *Queries) BeginConsumingTopicEvent(ctx context.Context, subscription model.SubscriptionKey, event model.TopicEventKey) error

func (*Queries) CompleteEventForSubscription added in v0.241.0

func (q *Queries) CompleteEventForSubscription(ctx context.Context, name string, module string) error

func (*Queries) CreateArtefact

func (q *Queries) CreateArtefact(ctx context.Context, digest []byte, content []byte) (int64, error)

Create a new artefact and return the artefact ID.

func (*Queries) CreateAsyncCall added in v0.226.0

func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error)

func (*Queries) CreateDeployment

func (q *Queries) CreateDeployment(ctx context.Context, moduleName string, schema []byte, key model.DeploymentKey) error

func (*Queries) CreateIngressRoute

func (q *Queries) CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error

func (*Queries) CreateRequest added in v0.167.0

func (q *Queries) CreateRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error

func (*Queries) DeleteSubscribers added in v0.259.1

func (q *Queries) DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) error

func (*Queries) DeleteSubscriptions added in v0.259.1

func (q *Queries) DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) error

func (*Queries) DeregisterRunner

func (q *Queries) DeregisterRunner(ctx context.Context, key model.RunnerKey) (int64, error)

func (*Queries) ExpireLeases added in v0.186.0

func (q *Queries) ExpireLeases(ctx context.Context) (int64, error)

func (*Queries) ExpireRunnerReservations

func (q *Queries) ExpireRunnerReservations(ctx context.Context) (int64, error)

func (*Queries) FailAsyncCall added in v0.226.0

func (q *Queries) FailAsyncCall(ctx context.Context, error string, iD int64) (bool, error)

func (*Queries) FailAsyncCallWithRetry added in v0.229.0

func (q *Queries) FailAsyncCallWithRetry(ctx context.Context, arg FailAsyncCallWithRetryParams) (bool, error)

func (*Queries) FailFSMInstance added in v0.226.0

func (q *Queries) FailFSMInstance(ctx context.Context, fsm schema.RefKey, key string) (bool, error)

func (*Queries) FinishFSMTransition added in v0.226.0

func (q *Queries) FinishFSMTransition(ctx context.Context, fsm schema.RefKey, key string) (bool, error)

Mark an FSM transition as completed, updating the current state and clearing the async call ID.

func (*Queries) GetActiveControllers added in v0.177.2

func (q *Queries) GetActiveControllers(ctx context.Context) ([]Controller, error)

func (*Queries) GetActiveDeploymentSchemas added in v0.79.3

func (q *Queries) GetActiveDeploymentSchemas(ctx context.Context) ([]GetActiveDeploymentSchemasRow, error)

func (*Queries) GetActiveDeployments added in v0.79.3

func (q *Queries) GetActiveDeployments(ctx context.Context) ([]GetActiveDeploymentsRow, error)

func (*Queries) GetActiveIngressRoutes added in v0.177.2

func (q *Queries) GetActiveIngressRoutes(ctx context.Context) ([]GetActiveIngressRoutesRow, error)

func (*Queries) GetActiveRunners

func (q *Queries) GetActiveRunners(ctx context.Context) ([]GetActiveRunnersRow, error)

func (*Queries) GetArtefactContentRange

func (q *Queries) GetArtefactContentRange(ctx context.Context, start int32, count int32, iD int64) ([]byte, error)

func (*Queries) GetArtefactDigests

func (q *Queries) GetArtefactDigests(ctx context.Context, digests [][]byte) ([]GetArtefactDigestsRow, error)

Return the digests that exist in the database.

func (*Queries) GetDeployment

func (q *Queries) GetDeployment(ctx context.Context, key model.DeploymentKey) (GetDeploymentRow, error)

func (*Queries) GetDeploymentArtefacts

func (q *Queries) GetDeploymentArtefacts(ctx context.Context, deploymentID int64) ([]GetDeploymentArtefactsRow, error)

Get all artefacts associated with the given deployment ID.

func (*Queries) GetDeploymentsByID

func (q *Queries) GetDeploymentsByID(ctx context.Context, ids []int64) ([]Deployment, error)

func (*Queries) GetDeploymentsNeedingReconciliation

func (q *Queries) GetDeploymentsNeedingReconciliation(ctx context.Context) ([]GetDeploymentsNeedingReconciliationRow, error)

Get deployments that have a mismatch between the number of assigned and required replicas.

func (*Queries) GetDeploymentsWithArtefacts

func (q *Queries) GetDeploymentsWithArtefacts(ctx context.Context, digests [][]byte, schema []byte, count int64) ([]GetDeploymentsWithArtefactsRow, error)

Get all deployments that have artefacts matching the given digests.

func (*Queries) GetDeploymentsWithMinReplicas added in v0.163.10

func (q *Queries) GetDeploymentsWithMinReplicas(ctx context.Context) ([]GetDeploymentsWithMinReplicasRow, error)

func (*Queries) GetExistingDeploymentForModule

func (q *Queries) GetExistingDeploymentForModule(ctx context.Context, name string) (GetExistingDeploymentForModuleRow, error)

func (*Queries) GetFSMInstance added in v0.226.0

func (q *Queries) GetFSMInstance(ctx context.Context, fsm schema.RefKey, key string) (FsmInstance, error)

func (*Queries) GetIdleRunners

func (q *Queries) GetIdleRunners(ctx context.Context, labels []byte, limit int64) ([]Runner, error)

func (*Queries) GetIngressRoutes

func (q *Queries) GetIngressRoutes(ctx context.Context, method string) ([]GetIngressRoutesRow, error)

Get the runner endpoints corresponding to the given ingress route.

func (*Queries) GetLeaseInfo added in v0.263.0

func (q *Queries) GetLeaseInfo(ctx context.Context, key leases.Key) (GetLeaseInfoRow, error)

func (*Queries) GetModulesByID

func (q *Queries) GetModulesByID(ctx context.Context, ids []int64) ([]Module, error)

func (*Queries) GetNextEventForSubscription added in v0.241.0

func (q *Queries) GetNextEventForSubscription(ctx context.Context, consumptionDelay time.Duration, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error)

func (*Queries) GetProcessList

func (q *Queries) GetProcessList(ctx context.Context) ([]GetProcessListRow, error)

func (*Queries) GetRandomSubscriber added in v0.245.0

func (q *Queries) GetRandomSubscriber(ctx context.Context, key model.SubscriptionKey) (GetRandomSubscriberRow, error)

func (*Queries) GetRouteForRunner

func (q *Queries) GetRouteForRunner(ctx context.Context, key model.RunnerKey) (GetRouteForRunnerRow, error)

Retrieve routing information for a runner.

func (*Queries) GetRoutingTable

func (q *Queries) GetRoutingTable(ctx context.Context, modules []string) ([]GetRoutingTableRow, error)

func (*Queries) GetRunner

func (q *Queries) GetRunner(ctx context.Context, key model.RunnerKey) (GetRunnerRow, error)

func (*Queries) GetRunnerState

func (q *Queries) GetRunnerState(ctx context.Context, key model.RunnerKey) (RunnerState, error)

func (*Queries) GetRunnersForDeployment

func (q *Queries) GetRunnersForDeployment(ctx context.Context, key model.DeploymentKey) ([]GetRunnersForDeploymentRow, error)

func (*Queries) GetSchemaForDeployment added in v0.259.1

func (q *Queries) GetSchemaForDeployment(ctx context.Context, key model.DeploymentKey) (*schema.Module, error)

func (*Queries) GetSubscriptionsNeedingUpdate added in v0.241.0

func (q *Queries) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error)

Results may not be ready to be scheduled yet due to event consumption delay. Sorting ensures that brand new events (that may not be ready for consumption) don't prevent older events from being consumed.

func (*Queries) InsertCallEvent

func (q *Queries) InsertCallEvent(ctx context.Context, arg InsertCallEventParams) error

func (*Queries) InsertDeploymentCreatedEvent

func (q *Queries) InsertDeploymentCreatedEvent(ctx context.Context, arg InsertDeploymentCreatedEventParams) error

func (*Queries) InsertDeploymentUpdatedEvent

func (q *Queries) InsertDeploymentUpdatedEvent(ctx context.Context, arg InsertDeploymentUpdatedEventParams) error

func (*Queries) InsertEvent

func (q *Queries) InsertEvent(ctx context.Context, arg InsertEventParams) error

func (*Queries) InsertLogEvent

func (q *Queries) InsertLogEvent(ctx context.Context, arg InsertLogEventParams) error

func (*Queries) InsertSubscriber added in v0.233.0

func (q *Queries) InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error

func (*Queries) KillStaleControllers

func (q *Queries) KillStaleControllers(ctx context.Context, timeout time.Duration) (int64, error)

Mark any controller entries that haven't been updated recently as dead.

func (*Queries) KillStaleRunners

func (q *Queries) KillStaleRunners(ctx context.Context, timeout time.Duration) (int64, error)

func (*Queries) LoadAsyncCall added in v0.194.0

func (q *Queries) LoadAsyncCall(ctx context.Context, id int64) (AsyncCall, error)

func (*Queries) NewLease added in v0.186.0

func (q *Queries) NewLease(ctx context.Context, key leases.Key, ttl time.Duration, metadata []byte) (uuid.UUID, error)

func (*Queries) PublishEventForTopic added in v0.239.0

func (q *Queries) PublishEventForTopic(ctx context.Context, arg PublishEventForTopicParams) error

func (*Queries) ReleaseLease added in v0.186.0

func (q *Queries) ReleaseLease(ctx context.Context, idempotencyKey uuid.UUID, key leases.Key) (bool, error)

func (*Queries) RenewLease added in v0.186.0

func (q *Queries) RenewLease(ctx context.Context, ttl time.Duration, idempotencyKey uuid.UUID, key leases.Key) (bool, error)

func (*Queries) ReserveRunner

func (q *Queries) ReserveRunner(ctx context.Context, reservationTimeout time.Time, deploymentKey model.DeploymentKey, labels []byte) (Runner, error)

Find an idle runner and reserve it for the given deployment.

func (*Queries) SetDeploymentDesiredReplicas

func (q *Queries) SetDeploymentDesiredReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int32) error

func (*Queries) StartFSMTransition added in v0.226.0

func (q *Queries) StartFSMTransition(ctx context.Context, arg StartFSMTransitionParams) (FsmInstance, error)

Start a new FSM transition, populating the destination state and async call ID.

"key" is the unique identifier for the FSM execution.

func (*Queries) SucceedAsyncCall added in v0.226.0

func (q *Queries) SucceedAsyncCall(ctx context.Context, response []byte, iD int64) (bool, error)

func (*Queries) SucceedFSMInstance added in v0.226.0

func (q *Queries) SucceedFSMInstance(ctx context.Context, fsm schema.RefKey, key string) (bool, error)

func (*Queries) UpsertController

func (q *Queries) UpsertController(ctx context.Context, key model.ControllerKey, endpoint string) (int64, error)

func (*Queries) UpsertModule

func (q *Queries) UpsertModule(ctx context.Context, language string, name string) (int64, error)

func (*Queries) UpsertRunner

func (q *Queries) UpsertRunner(ctx context.Context, arg UpsertRunnerParams) (optional.Option[int64], error)

Upsert a runner and return the deployment ID that it is assigned to, if any. If the deployment key is null, then deployment_rel.id will be null; otherwise we try to retrieve the deployments.id using the key. If there is no corresponding deployment, then the deployment ID is -1 and the parent statement will fail due to a foreign key constraint.

func (*Queries) UpsertSubscription added in v0.233.0

func (q *Queries) UpsertSubscription(ctx context.Context, arg UpsertSubscriptionParams) error

func (*Queries) UpsertTopic added in v0.233.0

func (q *Queries) UpsertTopic(ctx context.Context, arg UpsertTopicParams) error

func (*Queries) WithTx

func (q *Queries) WithTx(tx pgx.Tx) *Queries

type Request

type Request struct {
	ID         int64
	Origin     Origin
	Key        model.RequestKey
	SourceAddr string
}

type Runner

type Runner struct {
	ID                 int64
	Key                model.RunnerKey
	Created            time.Time
	LastSeen           time.Time
	ReservationTimeout optional.Option[time.Time]
	State              RunnerState
	Endpoint           string
	ModuleName         optional.Option[string]
	DeploymentID       optional.Option[int64]
	Labels             []byte
}

type RunnerState

type RunnerState string
const (
	RunnerStateIdle     RunnerState = "idle"
	RunnerStateReserved RunnerState = "reserved"
	RunnerStateAssigned RunnerState = "assigned"
	RunnerStateDead     RunnerState = "dead"
)

func (*RunnerState) Scan

func (e *RunnerState) Scan(src interface{}) error

type StartFSMTransitionParams added in v0.226.0

type StartFSMTransitionParams struct {
	Fsm              schema.RefKey
	Key              string
	DestinationState schema.RefKey
	AsyncCallID      int64
}

type Topic added in v0.159.1

type Topic struct {
	ID        int64
	Key       model.TopicKey
	CreatedAt time.Time
	ModuleID  int64
	Name      string
	Type      string
	Head      optional.Option[int64]
}

type TopicEvent added in v0.159.1

type TopicEvent struct {
	ID        int64
	CreatedAt time.Time
	Key       model.TopicEventKey
	TopicID   int64
	Payload   []byte
}

type TopicSubscriber added in v0.159.1

type TopicSubscriber struct {
	ID                   int64
	Key                  model.SubscriberKey
	CreatedAt            time.Time
	TopicSubscriptionsID int64
	DeploymentID         int64
	Sink                 schema.RefKey
	RetryAttempts        int32
	Backoff              time.Duration
	MaxBackoff           time.Duration
}

type TopicSubscription added in v0.159.1

type TopicSubscription struct {
	ID           int64
	Key          model.SubscriptionKey
	CreatedAt    time.Time
	TopicID      int64
	ModuleID     int64
	DeploymentID int64
	Name         string
	Cursor       optional.Option[int64]
	State        TopicSubscriptionState
}

type TopicSubscriptionState added in v0.241.0

type TopicSubscriptionState string
const (
	TopicSubscriptionStateIdle      TopicSubscriptionState = "idle"
	TopicSubscriptionStateExecuting TopicSubscriptionState = "executing"
)

func (*TopicSubscriptionState) Scan added in v0.241.0

func (e *TopicSubscriptionState) Scan(src interface{}) error

type Tx

type Tx struct {
	*Queries
	// contains filtered or unexported fields
}

func (*Tx) Begin added in v0.217.9

func (t *Tx) Begin(ctx context.Context) (*Tx, error)

func (*Tx) Commit

func (t *Tx) Commit(ctx context.Context) error

func (*Tx) CommitOrRollback

func (t *Tx) CommitOrRollback(ctx context.Context, err *error)

CommitOrRollback can be used in a defer statement to commit or roll back a transaction depending on whether the enclosing function returned an error.

func myFunc() (err error) {
  tx, err := db.Begin(ctx)
  if err != nil { return err }
  defer tx.CommitOrRollback(ctx, &err)
  ...
}

func (*Tx) Conn added in v0.217.9

func (t *Tx) Conn() ConnI

func (*Tx) Rollback

func (t *Tx) Rollback(ctx context.Context) error

func (*Tx) Tx added in v0.281.3

func (t *Tx) Tx() pgx.Tx

type Type added in v0.226.0

type Type struct {
	schema.Type
}

Type is a database adapter type for schema.Type.

It encodes to/from the protobuf representation of a Type.

func (*Type) Scan added in v0.226.0

func (t *Type) Scan(src interface{}) error

func (*Type) Value added in v0.226.0

func (t *Type) Value() (driver.Value, error)

type UpsertRunnerParams

type UpsertRunnerParams struct {
	Key           model.RunnerKey
	Endpoint      string
	State         RunnerState
	Labels        []byte
	DeploymentKey optional.Option[model.DeploymentKey]
}

type UpsertSubscriptionParams added in v0.233.0

type UpsertSubscriptionParams struct {
	Key         model.SubscriptionKey
	TopicModule string
	TopicName   string
	Module      string
	Deployment  model.DeploymentKey
	Name        string
}

type UpsertTopicParams added in v0.233.0

type UpsertTopicParams struct {
	Topic     model.TopicKey
	Module    string
	Name      string
	EventType string
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL