sql

package
v0.387.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 28, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AcquireAsyncCallRow

// AcquireAsyncCallRow is the result row returned by Querier.AcquireAsyncCall.
// It carries the identifier of the reserved async call together with lease
// fields (LeaseIdempotencyKey, LeaseKey) and the call's scheduling and retry
// fields.
type AcquireAsyncCallRow struct {
	AsyncCallID         int64
	LeaseIdempotencyKey uuid.UUID
	LeaseKey            leases.Key
	QueueDepth          int64
	Origin              string
	Verb                schema.RefKey
	CatchVerb           optional.Option[schema.RefKey]
	Request             api.EncryptedAsyncColumn
	ScheduledAt         time.Time
	RemainingAttempts   int32
	Error               optional.Option[string]
	Backoff             sqltypes.Duration
	MaxBackoff          sqltypes.Duration
	ParentRequestKey    optional.Option[string]
	TraceContext        pqtype.NullRawMessage
	Catching            bool
}

type AsyncCall

// AsyncCall is the record type returned by Querier.LoadAsyncCall and
// Querier.GetZombieAsyncCalls. State holds one of the AsyncCallState values;
// optional fields (LeaseID, Error, CatchVerb, ParentRequestKey) use
// optional.Option.
type AsyncCall struct {
	ID                int64
	CreatedAt         time.Time
	LeaseID           optional.Option[int64]
	Verb              schema.RefKey
	State             AsyncCallState
	Origin            string
	ScheduledAt       time.Time
	Request           api.EncryptedAsyncColumn
	Response          api.OptionalEncryptedAsyncColumn
	Error             optional.Option[string]
	RemainingAttempts int32
	Backoff           sqltypes.Duration
	MaxBackoff        sqltypes.Duration
	CatchVerb         optional.Option[schema.RefKey]
	Catching          bool
	ParentRequestKey  optional.Option[string]
	TraceContext      pqtype.NullRawMessage
}

type AsyncCallState

// AsyncCallState is a string-backed enumeration; it is the type of
// AsyncCall.State.
type AsyncCallState string
// The AsyncCallState values declared by this package.
const (
	AsyncCallStatePending   AsyncCallState = "pending"
	AsyncCallStateExecuting AsyncCallState = "executing"
	AsyncCallStateSuccess   AsyncCallState = "success"
	AsyncCallStateError     AsyncCallState = "error"
)

func (*AsyncCallState) Scan

func (e *AsyncCallState) Scan(src interface{}) error

type Controller

// Controller is the record type returned by Querier.GetActiveControllers.
// State holds one of the ControllerState values.
type Controller struct {
	ID       int64
	Key      model.ControllerKey
	Created  time.Time
	LastSeen time.Time
	State    ControllerState
	Endpoint string
}

type ControllerState

// ControllerState is a string-backed enumeration; it is the type of
// Controller.State.
type ControllerState string
// The ControllerState values declared by this package.
const (
	ControllerStateLive ControllerState = "live"
	ControllerStateDead ControllerState = "dead"
)

func (*ControllerState) Scan

func (e *ControllerState) Scan(src interface{}) error

type CreateAsyncCallParams

// CreateAsyncCallParams is the argument struct for Querier.CreateAsyncCall.
// Its fields are a subset of the AsyncCall fields; note that TraceContext is
// a plain json.RawMessage here rather than a pqtype.NullRawMessage.
type CreateAsyncCallParams struct {
	ScheduledAt       time.Time
	Verb              schema.RefKey
	Origin            string
	Request           api.EncryptedAsyncColumn
	RemainingAttempts int32
	Backoff           sqltypes.Duration
	MaxBackoff        sqltypes.Duration
	CatchVerb         optional.Option[schema.RefKey]
	ParentRequestKey  optional.Option[string]
	TraceContext      json.RawMessage
}

type CreateCronJobParams

// CreateCronJobParams is the argument struct for Querier.CreateCronJob.
// The deployment is identified by key (DeploymentKey), whereas the CronJob
// record type refers to it by numeric DeploymentID.
type CreateCronJobParams struct {
	Key           model.CronJobKey
	DeploymentKey model.DeploymentKey
	ModuleName    string
	Verb          string
	Schedule      string
	StartTime     time.Time
	NextExecution time.Time
}

type CreateIngressRouteParams

// CreateIngressRouteParams is the argument struct for
// Querier.CreateIngressRoute. Key is a deployment key.
type CreateIngressRouteParams struct {
	Key    model.DeploymentKey
	Module string
	Verb   string
	Method string
	Path   string
}

type CronJob

// CronJob is the cron job record type embedded in GetCronJobByKeyRow and
// GetUnscheduledCronJobsRow. LastExecution and LastAsyncCallID are optional.
type CronJob struct {
	ID              int64
	Key             model.CronJobKey
	DeploymentID    int64
	Verb            string
	Schedule        string
	StartTime       time.Time
	NextExecution   time.Time
	ModuleName      string
	LastExecution   sqltypes.OptionalTime
	LastAsyncCallID optional.Option[int64]
}

type DBTX

// DBTX is the database handle accepted by New. It names the four
// context-aware methods the queries need: executing a statement, preparing a
// statement, and querying for many rows or a single row. The standard
// library's *sql.DB and *sql.Tx both provide this method set.
type DBTX interface {
	// ExecContext runs a statement that returns no rows.
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	// PrepareContext creates a prepared statement for later use.
	PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
	// QueryContext runs a query that may return multiple rows.
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	// QueryRowContext runs a query expected to return at most one row.
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}

type Deployment

// Deployment is the deployment record type. It is returned directly by
// Querier.GetDeploymentsByID and embedded in several *Row result types
// (for example GetActiveDeploymentsRow and GetDeploymentRow).
type Deployment struct {
	ID              int64
	CreatedAt       time.Time
	ModuleID        int64
	Key             model.DeploymentKey
	Schema          *schema.Module
	Labels          json.RawMessage
	MinReplicas     int32
	LastActivatedAt time.Time
}

type FailAsyncCallWithRetryParams

// FailAsyncCallWithRetryParams is the argument struct for
// Querier.FailAsyncCallWithRetry. ID is an int64 identifier, presumably that
// of the async call being failed — confirm against the query.
type FailAsyncCallWithRetryParams struct {
	RemainingAttempts int32
	Backoff           sqltypes.Duration
	MaxBackoff        sqltypes.Duration
	ScheduledAt       time.Time
	Catching          bool
	OriginalError     optional.Option[string]
	Error             string
	ID                int64
}

type GetActiveDeploymentSchemasRow

// GetActiveDeploymentSchemasRow is one result row of
// Querier.GetActiveDeploymentSchemas: a deployment key and its schema.
type GetActiveDeploymentSchemasRow struct {
	Key    model.DeploymentKey
	Schema *schema.Module
}

type GetActiveDeploymentsRow

// GetActiveDeploymentsRow is one result row of Querier.GetActiveDeployments:
// a Deployment plus its module name, language and a replica count.
type GetActiveDeploymentsRow struct {
	Deployment Deployment
	ModuleName string
	Language   string
	Replicas   int64
}

type GetActiveIngressRoutesRow

// GetActiveIngressRoutesRow is one result row of
// Querier.GetActiveIngressRoutes. It has the same route fields as
// CreateIngressRouteParams, with the deployment key named DeploymentKey.
type GetActiveIngressRoutesRow struct {
	DeploymentKey model.DeploymentKey
	Module        string
	Verb          string
	Method        string
	Path          string
}

type GetActiveRunnersRow

// GetActiveRunnersRow is one result row of Querier.GetActiveRunners. Its
// fields are identical to those of GetRunnerRow. ModuleName is optional.
type GetActiveRunnersRow struct {
	RunnerKey     model.RunnerKey
	Endpoint      string
	Labels        json.RawMessage
	LastSeen      time.Time
	ModuleName    optional.Option[string]
	DeploymentKey model.DeploymentKey
}

type GetCronJobByKeyRow

// GetCronJobByKeyRow is the result row of Querier.GetCronJobByKey: a CronJob
// paired with a Deployment.
type GetCronJobByKeyRow struct {
	CronJob    CronJob
	Deployment Deployment
}

type GetDeploymentRow

// GetDeploymentRow is the result row of Querier.GetDeployment. MinReplicas
// is a separate top-level field even though the embedded Deployment also has
// a MinReplicas field.
type GetDeploymentRow struct {
	Deployment  Deployment
	Language    string
	ModuleName  string
	MinReplicas int32
}

type GetDeploymentsWithArtefactsRow

// GetDeploymentsWithArtefactsRow is one result row of
// Querier.GetDeploymentsWithArtefacts.
type GetDeploymentsWithArtefactsRow struct {
	ID            int64
	CreatedAt     time.Time
	DeploymentKey model.DeploymentKey
	Schema        *schema.Module
	ModuleName    string
}

type GetDeploymentsWithMinReplicasRow

// GetDeploymentsWithMinReplicasRow is one result row of
// Querier.GetDeploymentsWithMinReplicas: a Deployment plus its module name
// and language.
type GetDeploymentsWithMinReplicasRow struct {
	Deployment Deployment
	ModuleName string
	Language   string
}

type GetExistingDeploymentForModuleRow

// GetExistingDeploymentForModuleRow is the result row of
// Querier.GetExistingDeploymentForModule. The first eight fields match the
// Deployment struct field for field; the trailing ID_2, Language and Name
// match the Module struct.
// NOTE(review): the "_2" suffix presumably disambiguates a second "id" column
// from a joined module table — confirm against the query.
type GetExistingDeploymentForModuleRow struct {
	ID              int64
	CreatedAt       time.Time
	ModuleID        int64
	Key             model.DeploymentKey
	Schema          *schema.Module
	Labels          json.RawMessage
	MinReplicas     int32
	LastActivatedAt time.Time
	ID_2            int64
	Language        string
	Name            string
}

type GetIngressRoutesRow

// GetIngressRoutesRow is one result row of Querier.GetIngressRoutes. Besides
// the route fields (Path, Module, Verb, Method) it carries a runner key, a
// deployment key and an endpoint.
type GetIngressRoutesRow struct {
	RunnerKey     model.RunnerKey
	DeploymentKey model.DeploymentKey
	Endpoint      string
	Path          string
	Module        string
	Verb          string
	Method        string
}

type GetNextEventForSubscriptionRow

// GetNextEventForSubscriptionRow is the result row of
// Querier.GetNextEventForSubscription. Every field except Ready uses an
// optional/nullable type.
// NOTE(review): Ready presumably reflects the consumptionDelay argument of
// the query — confirm against the SQL.
type GetNextEventForSubscriptionRow struct {
	Event        optional.Option[model.TopicEventKey]
	Payload      api.OptionalEncryptedAsyncColumn
	CreatedAt    sqltypes.OptionalTime
	Caller       optional.Option[string]
	RequestKey   optional.Option[string]
	TraceContext pqtype.NullRawMessage
	Ready        bool
}

type GetProcessListRow

// GetProcessListRow is one result row of Querier.GetProcessList. The
// deployment fields are always present, while the runner fields (RunnerKey,
// Endpoint, RunnerLabels) use optional/nullable types.
type GetProcessListRow struct {
	MinReplicas      int32
	DeploymentKey    model.DeploymentKey
	DeploymentLabels json.RawMessage
	RunnerKey        optional.Option[model.RunnerKey]
	Endpoint         optional.Option[string]
	RunnerLabels     pqtype.NullRawMessage
}

type GetRandomSubscriberRow

// GetRandomSubscriberRow is the result row of Querier.GetRandomSubscriber.
// Its fields match the Sink, RetryAttempts, Backoff, MaxBackoff and CatchVerb
// fields of InsertSubscriberParams.
type GetRandomSubscriberRow struct {
	Sink          schema.RefKey
	RetryAttempts int32
	Backoff       sqltypes.Duration
	MaxBackoff    sqltypes.Duration
	CatchVerb     optional.Option[schema.RefKey]
}

type GetRunnerRow

// GetRunnerRow is the result row of Querier.GetRunner. Its fields are
// identical to those of GetActiveRunnersRow. ModuleName is optional.
type GetRunnerRow struct {
	RunnerKey     model.RunnerKey
	Endpoint      string
	Labels        json.RawMessage
	LastSeen      time.Time
	ModuleName    optional.Option[string]
	DeploymentKey model.DeploymentKey
}

type GetRunnersForDeploymentRow

// GetRunnersForDeploymentRow is one result row of
// Querier.GetRunnersForDeployment. The first eight fields describe a runner;
// the fields from ID_2 onward match the Deployment struct (ID, CreatedAt,
// ModuleID, Key, Schema, Labels, MinReplicas, LastActivatedAt).
// NOTE(review): the "_2" suffixes presumably disambiguate columns that share
// a name between the runner and deployment tables — confirm against the query.
type GetRunnersForDeploymentRow struct {
	ID              int64
	Key             model.RunnerKey
	Created         time.Time
	LastSeen        time.Time
	Endpoint        string
	ModuleName      optional.Option[string]
	DeploymentID    int64
	Labels          json.RawMessage
	ID_2            int64
	CreatedAt       time.Time
	ModuleID        int64
	Key_2           model.DeploymentKey
	Schema          *schema.Module
	Labels_2        json.RawMessage
	MinReplicas     int32
	LastActivatedAt time.Time
}

type GetSubscriptionsNeedingUpdateRow

// GetSubscriptionsNeedingUpdateRow is one result row of
// Querier.GetSubscriptionsNeedingUpdate. Cursor is optional.
type GetSubscriptionsNeedingUpdateRow struct {
	Key    model.SubscriptionKey
	Cursor optional.Option[model.TopicEventKey]
	Topic  model.TopicKey
	Name   string
}

type GetUnscheduledCronJobsRow

// GetUnscheduledCronJobsRow is one result row of
// Querier.GetUnscheduledCronJobs: a CronJob paired with a Deployment.
type GetUnscheduledCronJobsRow struct {
	CronJob    CronJob
	Deployment Deployment
}

type InsertSubscriberParams

// InsertSubscriberParams is the argument struct for Querier.InsertSubscriber.
// CatchVerb is optional.
type InsertSubscriberParams struct {
	Key              model.SubscriberKey
	Module           string
	SubscriptionName string
	Deployment       model.DeploymentKey
	Sink             schema.RefKey
	RetryAttempts    int32
	Backoff          sqltypes.Duration
	MaxBackoff       sqltypes.Duration
	CatchVerb        optional.Option[schema.RefKey]
}

type InsertTimelineDeploymentCreatedEventParams

// InsertTimelineDeploymentCreatedEventParams is the argument struct for
// Querier.InsertTimelineDeploymentCreatedEvent. It has the same fields as
// InsertTimelineDeploymentUpdatedEventParams.
type InsertTimelineDeploymentCreatedEventParams struct {
	DeploymentKey model.DeploymentKey
	Language      string
	ModuleName    string
	Payload       api.EncryptedTimelineColumn
}

type InsertTimelineDeploymentUpdatedEventParams

// InsertTimelineDeploymentUpdatedEventParams is the argument struct for
// Querier.InsertTimelineDeploymentUpdatedEvent. It has the same fields as
// InsertTimelineDeploymentCreatedEventParams.
type InsertTimelineDeploymentUpdatedEventParams struct {
	DeploymentKey model.DeploymentKey
	Language      string
	ModuleName    string
	Payload       api.EncryptedTimelineColumn
}

type Module

// Module is the record type returned by Querier.GetModulesByID.
type Module struct {
	ID       int64
	Language string
	Name     string
}

type NullAsyncCallState

// NullAsyncCallState is the nullable counterpart of AsyncCallState: the
// value plus a Valid flag. Its Scan and Value methods implement the Scanner
// and driver Valuer interfaces.
type NullAsyncCallState struct {
	AsyncCallState AsyncCallState
	Valid          bool // Valid is true if AsyncCallState is not NULL
}

func (*NullAsyncCallState) Scan

func (ns *NullAsyncCallState) Scan(value interface{}) error

Scan implements the Scanner interface.

func (NullAsyncCallState) Value

func (ns NullAsyncCallState) Value() (driver.Value, error)

Value implements the driver Valuer interface.

type NullControllerState

// NullControllerState is the nullable counterpart of ControllerState: the
// value plus a Valid flag. Its Scan and Value methods implement the Scanner
// and driver Valuer interfaces.
type NullControllerState struct {
	ControllerState ControllerState
	Valid           bool // Valid is true if ControllerState is not NULL
}

func (*NullControllerState) Scan

func (ns *NullControllerState) Scan(value interface{}) error

Scan implements the Scanner interface.

func (NullControllerState) Value

func (ns NullControllerState) Value() (driver.Value, error)

Value implements the driver Valuer interface.

type NullOrigin

// NullOrigin is the nullable counterpart of Origin: the value plus a Valid
// flag. Its Scan and Value methods implement the Scanner and driver Valuer
// interfaces.
type NullOrigin struct {
	Origin Origin
	Valid  bool // Valid is true if Origin is not NULL
}

func (*NullOrigin) Scan

func (ns *NullOrigin) Scan(value interface{}) error

Scan implements the Scanner interface.

func (NullOrigin) Value

func (ns NullOrigin) Value() (driver.Value, error)

Value implements the driver Valuer interface.

type NullTopicSubscriptionState

// NullTopicSubscriptionState is the nullable counterpart of
// TopicSubscriptionState: the value plus a Valid flag. Its Scan and Value
// methods implement the Scanner and driver Valuer interfaces.
type NullTopicSubscriptionState struct {
	TopicSubscriptionState TopicSubscriptionState
	Valid                  bool // Valid is true if TopicSubscriptionState is not NULL
}

func (*NullTopicSubscriptionState) Scan

func (ns *NullTopicSubscriptionState) Scan(value interface{}) error

Scan implements the Scanner interface.

func (NullTopicSubscriptionState) Value

func (ns NullTopicSubscriptionState) Value() (driver.Value, error)

Value implements the driver Valuer interface.

type Origin

// Origin is a string-backed enumeration; it is the type of the origin
// argument of Querier.CreateRequest.
type Origin string
// The Origin values declared by this package.
const (
	OriginIngress Origin = "ingress"
	OriginCron    Origin = "cron"
	OriginPubsub  Origin = "pubsub"
)

func (*Origin) Scan

func (e *Origin) Scan(src interface{}) error

type PublishEventForTopicParams

// PublishEventForTopicParams is the argument struct for
// Querier.PublishEventForTopic. Unlike the TopicEvent record type, Caller and
// RequestKey are plain strings here and TraceContext is a json.RawMessage.
type PublishEventForTopicParams struct {
	Key          model.TopicEventKey
	Module       string
	Topic        string
	Caller       string
	Payload      api.EncryptedAsyncColumn
	RequestKey   string
	TraceContext json.RawMessage
}

type Querier

// Querier lists every query method of this package as an interface. *Queries
// declares a method with the same signature for each entry, so code can
// depend on Querier instead of the concrete type.
type Querier interface {
	// Reserve a pending async call for execution, returning the associated lease
	// reservation key and accompanying metadata.
	AcquireAsyncCall(ctx context.Context, ttl sqltypes.Duration) (AcquireAsyncCallRow, error)
	AsyncCallQueueDepth(ctx context.Context) (int64, error)
	BeginConsumingTopicEvent(ctx context.Context, subscription model.SubscriptionKey, event model.TopicEventKey) error
	CompleteEventForSubscription(ctx context.Context, name string, module string) error
	CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error)
	CreateCronJob(ctx context.Context, arg CreateCronJobParams) error
	CreateDeployment(ctx context.Context, moduleName string, schema []byte, key model.DeploymentKey) error
	CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error
	CreateRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error
	DeleteCronAsyncCallsForDeployment(ctx context.Context, deploymentKey model.DeploymentKey) error
	DeleteCronJobsForDeployment(ctx context.Context, deploymentKey model.DeploymentKey) error
	DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error)
	DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error)
	DeregisterRunner(ctx context.Context, key model.RunnerKey) (int64, error)
	FailAsyncCall(ctx context.Context, error string, iD int64) (bool, error)
	FailAsyncCallWithRetry(ctx context.Context, arg FailAsyncCallWithRetryParams) (bool, error)
	GetActiveControllers(ctx context.Context) ([]Controller, error)
	GetActiveDeploymentSchemas(ctx context.Context) ([]GetActiveDeploymentSchemasRow, error)
	GetActiveDeployments(ctx context.Context) ([]GetActiveDeploymentsRow, error)
	GetActiveIngressRoutes(ctx context.Context) ([]GetActiveIngressRoutesRow, error)
	GetActiveRunners(ctx context.Context) ([]GetActiveRunnersRow, error)
	GetCronJobByKey(ctx context.Context, key model.CronJobKey) (GetCronJobByKeyRow, error)
	GetDeployment(ctx context.Context, key model.DeploymentKey) (GetDeploymentRow, error)
	GetDeploymentsByID(ctx context.Context, ids []int64) ([]Deployment, error)
	// Get all deployments that have artefacts matching the given digests.
	GetDeploymentsWithArtefacts(ctx context.Context, digests [][]byte, schema []byte, count int64) ([]GetDeploymentsWithArtefactsRow, error)
	GetDeploymentsWithMinReplicas(ctx context.Context) ([]GetDeploymentsWithMinReplicasRow, error)
	GetExistingDeploymentForModule(ctx context.Context, name string) (GetExistingDeploymentForModuleRow, error)
	// Get the runner endpoints corresponding to the given ingress route.
	// NOTE(review): the method takes no ingress route argument, so "the given
	// ingress route" looks stale — confirm against the query.
	GetIngressRoutes(ctx context.Context) ([]GetIngressRoutesRow, error)
	GetModulesByID(ctx context.Context, ids []int64) ([]Module, error)
	GetNextEventForSubscription(ctx context.Context, consumptionDelay sqltypes.Duration, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error)
	GetProcessList(ctx context.Context) ([]GetProcessListRow, error)
	GetRandomSubscriber(ctx context.Context, key model.SubscriptionKey) (GetRandomSubscriberRow, error)
	GetRunner(ctx context.Context, key model.RunnerKey) (GetRunnerRow, error)
	GetRunnersForDeployment(ctx context.Context, key model.DeploymentKey) ([]GetRunnersForDeploymentRow, error)
	GetSchemaForDeployment(ctx context.Context, key model.DeploymentKey) (*schema.Module, error)
	// NOTE(review): column1 and column2 are placeholder parameter names; what
	// each string selects is not visible here — check the query.
	GetSubscription(ctx context.Context, column1 string, column2 string) (TopicSubscription, error)
	// Results may not be ready to be scheduled yet due to event consumption delay.
	// Sorting ensures that brand new events (that may not be ready for consumption)
	// don't prevent older events from being consumed.
	// We also make sure that the subscription belongs to a deployment that has at least one runner.
	GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error)
	// NOTE(review): dollar_1 is a placeholder parameter name, presumably the
	// topic's numeric ID — confirm against the query.
	GetTopic(ctx context.Context, dollar_1 int64) (Topic, error)
	// NOTE(review): dollar_1 is a placeholder parameter name, presumably the
	// topic event's numeric ID — confirm against the query.
	GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent, error)
	GetUnscheduledCronJobs(ctx context.Context, startTime time.Time) ([]GetUnscheduledCronJobsRow, error)
	GetZombieAsyncCalls(ctx context.Context, limit int32) ([]AsyncCall, error)
	InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error
	InsertTimelineDeploymentCreatedEvent(ctx context.Context, arg InsertTimelineDeploymentCreatedEventParams) error
	InsertTimelineDeploymentUpdatedEvent(ctx context.Context, arg InsertTimelineDeploymentUpdatedEventParams) error
	IsCronJobPending(ctx context.Context, key model.CronJobKey, startTime time.Time) (bool, error)
	// Mark any controller entries that haven't been updated recently as dead.
	KillStaleControllers(ctx context.Context, timeout sqltypes.Duration) (int64, error)
	KillStaleRunners(ctx context.Context, timeout sqltypes.Duration) (int64, error)
	LoadAsyncCall(ctx context.Context, id int64) (AsyncCall, error)
	PublishEventForTopic(ctx context.Context, arg PublishEventForTopicParams) error
	SetDeploymentDesiredReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int32) error
	// NOTE(review): column1 and column2 are placeholder parameter names; by
	// their types they are a subscription key and a topic event key.
	SetSubscriptionCursor(ctx context.Context, column1 model.SubscriptionKey, column2 model.TopicEventKey) error
	SucceedAsyncCall(ctx context.Context, response api.OptionalEncryptedAsyncColumn, iD int64) (bool, error)
	UpdateCronJobExecution(ctx context.Context, arg UpdateCronJobExecutionParams) error
	UpsertController(ctx context.Context, key model.ControllerKey, endpoint string) (int64, error)
	UpsertModule(ctx context.Context, language string, name string) (int64, error)
	// Upsert a runner and return the deployment ID that it is assigned to, if any.
	UpsertRunner(ctx context.Context, arg UpsertRunnerParams) (int64, error)
	UpsertSubscription(ctx context.Context, arg UpsertSubscriptionParams) (UpsertSubscriptionRow, error)
	UpsertTopic(ctx context.Context, arg UpsertTopicParams) error
}

type Queries

// Queries is the concrete type carrying the package's query methods. It is
// created with New from a DBTX, and WithTx returns a *Queries for a *sql.Tx.
// Its methods have the same signatures as those listed in Querier.
type Queries struct {
	// contains filtered or unexported fields
}

func New

func New(db DBTX) *Queries

func (*Queries) AcquireAsyncCall

func (q *Queries) AcquireAsyncCall(ctx context.Context, ttl sqltypes.Duration) (AcquireAsyncCallRow, error)

Reserve a pending async call for execution, returning the associated lease reservation key and accompanying metadata.

func (*Queries) AsyncCallQueueDepth

func (q *Queries) AsyncCallQueueDepth(ctx context.Context) (int64, error)

func (*Queries) BeginConsumingTopicEvent

func (q *Queries) BeginConsumingTopicEvent(ctx context.Context, subscription model.SubscriptionKey, event model.TopicEventKey) error

func (*Queries) CompleteEventForSubscription

func (q *Queries) CompleteEventForSubscription(ctx context.Context, name string, module string) error

func (*Queries) CreateAsyncCall

func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error)

func (*Queries) CreateCronJob

func (q *Queries) CreateCronJob(ctx context.Context, arg CreateCronJobParams) error

func (*Queries) CreateDeployment

func (q *Queries) CreateDeployment(ctx context.Context, moduleName string, schema []byte, key model.DeploymentKey) error

func (*Queries) CreateIngressRoute

func (q *Queries) CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error

func (*Queries) CreateRequest

func (q *Queries) CreateRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error

func (*Queries) DeleteCronAsyncCallsForDeployment added in v0.384.0

func (q *Queries) DeleteCronAsyncCallsForDeployment(ctx context.Context, deploymentKey model.DeploymentKey) error

func (*Queries) DeleteCronJobsForDeployment added in v0.383.1

func (q *Queries) DeleteCronJobsForDeployment(ctx context.Context, deploymentKey model.DeploymentKey) error

func (*Queries) DeleteSubscribers

func (q *Queries) DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error)

func (*Queries) DeleteSubscriptions

func (q *Queries) DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error)

func (*Queries) DeregisterRunner

func (q *Queries) DeregisterRunner(ctx context.Context, key model.RunnerKey) (int64, error)

func (*Queries) FailAsyncCall

func (q *Queries) FailAsyncCall(ctx context.Context, error string, iD int64) (bool, error)

func (*Queries) FailAsyncCallWithRetry

func (q *Queries) FailAsyncCallWithRetry(ctx context.Context, arg FailAsyncCallWithRetryParams) (bool, error)

func (*Queries) GetActiveControllers

func (q *Queries) GetActiveControllers(ctx context.Context) ([]Controller, error)

func (*Queries) GetActiveDeploymentSchemas

func (q *Queries) GetActiveDeploymentSchemas(ctx context.Context) ([]GetActiveDeploymentSchemasRow, error)

func (*Queries) GetActiveDeployments

func (q *Queries) GetActiveDeployments(ctx context.Context) ([]GetActiveDeploymentsRow, error)

func (*Queries) GetActiveIngressRoutes

func (q *Queries) GetActiveIngressRoutes(ctx context.Context) ([]GetActiveIngressRoutesRow, error)

func (*Queries) GetActiveRunners

func (q *Queries) GetActiveRunners(ctx context.Context) ([]GetActiveRunnersRow, error)

func (*Queries) GetCronJobByKey

func (q *Queries) GetCronJobByKey(ctx context.Context, key model.CronJobKey) (GetCronJobByKeyRow, error)

func (*Queries) GetDeployment

func (q *Queries) GetDeployment(ctx context.Context, key model.DeploymentKey) (GetDeploymentRow, error)

func (*Queries) GetDeploymentsByID

func (q *Queries) GetDeploymentsByID(ctx context.Context, ids []int64) ([]Deployment, error)

func (*Queries) GetDeploymentsWithArtefacts

func (q *Queries) GetDeploymentsWithArtefacts(ctx context.Context, digests [][]byte, schema []byte, count int64) ([]GetDeploymentsWithArtefactsRow, error)

Get all deployments that have artefacts matching the given digests.

func (*Queries) GetDeploymentsWithMinReplicas

func (q *Queries) GetDeploymentsWithMinReplicas(ctx context.Context) ([]GetDeploymentsWithMinReplicasRow, error)

func (*Queries) GetExistingDeploymentForModule

func (q *Queries) GetExistingDeploymentForModule(ctx context.Context, name string) (GetExistingDeploymentForModuleRow, error)

func (*Queries) GetIngressRoutes

func (q *Queries) GetIngressRoutes(ctx context.Context) ([]GetIngressRoutesRow, error)

Get the runner endpoints corresponding to the given ingress route.

func (*Queries) GetModulesByID

func (q *Queries) GetModulesByID(ctx context.Context, ids []int64) ([]Module, error)

func (*Queries) GetNextEventForSubscription

func (q *Queries) GetNextEventForSubscription(ctx context.Context, consumptionDelay sqltypes.Duration, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error)

func (*Queries) GetProcessList

func (q *Queries) GetProcessList(ctx context.Context) ([]GetProcessListRow, error)

func (*Queries) GetRandomSubscriber

func (q *Queries) GetRandomSubscriber(ctx context.Context, key model.SubscriptionKey) (GetRandomSubscriberRow, error)

func (*Queries) GetRunner

func (q *Queries) GetRunner(ctx context.Context, key model.RunnerKey) (GetRunnerRow, error)

func (*Queries) GetRunnersForDeployment

func (q *Queries) GetRunnersForDeployment(ctx context.Context, key model.DeploymentKey) ([]GetRunnersForDeploymentRow, error)

func (*Queries) GetSchemaForDeployment

func (q *Queries) GetSchemaForDeployment(ctx context.Context, key model.DeploymentKey) (*schema.Module, error)

func (*Queries) GetSubscription

func (q *Queries) GetSubscription(ctx context.Context, column1 string, column2 string) (TopicSubscription, error)

func (*Queries) GetSubscriptionsNeedingUpdate

func (q *Queries) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error)

Results may not be ready to be scheduled yet due to event consumption delay. Sorting ensures that brand new events (that may not be ready for consumption) don't prevent older events from being consumed. We also make sure that the subscription belongs to a deployment that has at least one runner.

func (*Queries) GetTopic

func (q *Queries) GetTopic(ctx context.Context, dollar_1 int64) (Topic, error)

func (*Queries) GetTopicEvent

func (q *Queries) GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent, error)

func (*Queries) GetUnscheduledCronJobs

func (q *Queries) GetUnscheduledCronJobs(ctx context.Context, startTime time.Time) ([]GetUnscheduledCronJobsRow, error)

func (*Queries) GetZombieAsyncCalls

func (q *Queries) GetZombieAsyncCalls(ctx context.Context, limit int32) ([]AsyncCall, error)

func (*Queries) InsertSubscriber

func (q *Queries) InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error

func (*Queries) InsertTimelineDeploymentCreatedEvent

func (q *Queries) InsertTimelineDeploymentCreatedEvent(ctx context.Context, arg InsertTimelineDeploymentCreatedEventParams) error

func (*Queries) InsertTimelineDeploymentUpdatedEvent

func (q *Queries) InsertTimelineDeploymentUpdatedEvent(ctx context.Context, arg InsertTimelineDeploymentUpdatedEventParams) error

func (*Queries) IsCronJobPending

func (q *Queries) IsCronJobPending(ctx context.Context, key model.CronJobKey, startTime time.Time) (bool, error)

func (*Queries) KillStaleControllers

func (q *Queries) KillStaleControllers(ctx context.Context, timeout sqltypes.Duration) (int64, error)

Mark any controller entries that haven't been updated recently as dead.

func (*Queries) KillStaleRunners

func (q *Queries) KillStaleRunners(ctx context.Context, timeout sqltypes.Duration) (int64, error)

func (*Queries) LoadAsyncCall

func (q *Queries) LoadAsyncCall(ctx context.Context, id int64) (AsyncCall, error)

func (*Queries) PublishEventForTopic

func (q *Queries) PublishEventForTopic(ctx context.Context, arg PublishEventForTopicParams) error

func (*Queries) SetDeploymentDesiredReplicas

func (q *Queries) SetDeploymentDesiredReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int32) error

func (*Queries) SetSubscriptionCursor

func (q *Queries) SetSubscriptionCursor(ctx context.Context, column1 model.SubscriptionKey, column2 model.TopicEventKey) error

func (*Queries) SucceedAsyncCall

func (q *Queries) SucceedAsyncCall(ctx context.Context, response api.OptionalEncryptedAsyncColumn, iD int64) (bool, error)

func (*Queries) UpdateCronJobExecution

func (q *Queries) UpdateCronJobExecution(ctx context.Context, arg UpdateCronJobExecutionParams) error

func (*Queries) UpsertController

func (q *Queries) UpsertController(ctx context.Context, key model.ControllerKey, endpoint string) (int64, error)

func (*Queries) UpsertModule

func (q *Queries) UpsertModule(ctx context.Context, language string, name string) (int64, error)

func (*Queries) UpsertRunner

func (q *Queries) UpsertRunner(ctx context.Context, arg UpsertRunnerParams) (int64, error)

Upsert a runner and return the deployment ID that it is assigned to, if any.

func (*Queries) UpsertSubscription

func (q *Queries) UpsertSubscription(ctx context.Context, arg UpsertSubscriptionParams) (UpsertSubscriptionRow, error)

func (*Queries) UpsertTopic

func (q *Queries) UpsertTopic(ctx context.Context, arg UpsertTopicParams) error

func (*Queries) WithTx

func (q *Queries) WithTx(tx *sql.Tx) *Queries

type Topic

// Topic is the record type returned by Querier.GetTopic. Head is an optional
// int64.
// NOTE(review): Head presumably references the ID of a TopicEvent — confirm
// against the schema.
type Topic struct {
	ID        int64
	Key       model.TopicKey
	CreatedAt time.Time
	ModuleID  int64
	Name      string
	Type      string
	Head      optional.Option[int64]
}

type TopicEvent

// TopicEvent is the record type returned by Querier.GetTopicEvent. Caller,
// RequestKey and TraceContext use optional/nullable types.
type TopicEvent struct {
	ID           int64
	CreatedAt    time.Time
	Key          model.TopicEventKey
	TopicID      int64
	Payload      api.EncryptedAsyncColumn
	Caller       optional.Option[string]
	RequestKey   optional.Option[string]
	TraceContext pqtype.NullRawMessage
}

type TopicSubscription

// TopicSubscription is the record type returned by Querier.GetSubscription.
// Cursor is an optional int64 and State holds one of the
// TopicSubscriptionState values.
type TopicSubscription struct {
	ID           int64
	Key          model.SubscriptionKey
	CreatedAt    time.Time
	TopicID      int64
	ModuleID     int64
	DeploymentID int64
	Name         string
	Cursor       optional.Option[int64]
	State        TopicSubscriptionState
}

type TopicSubscriptionState

// TopicSubscriptionState is a string-backed enumeration; it is the type of
// TopicSubscription.State.
type TopicSubscriptionState string
// The TopicSubscriptionState values declared by this package.
const (
	TopicSubscriptionStateIdle      TopicSubscriptionState = "idle"
	TopicSubscriptionStateExecuting TopicSubscriptionState = "executing"
)

func (*TopicSubscriptionState) Scan

func (e *TopicSubscriptionState) Scan(src interface{}) error

type UpdateCronJobExecutionParams

// UpdateCronJobExecutionParams is the argument struct for
// Querier.UpdateCronJobExecution. LastAsyncCallID and LastExecution are
// required values here, whereas the matching CronJob fields are optional.
type UpdateCronJobExecutionParams struct {
	LastAsyncCallID int64
	LastExecution   time.Time
	NextExecution   time.Time
	Key             model.CronJobKey
}

type UpsertRunnerParams

// UpsertRunnerParams is the argument struct for Querier.UpsertRunner.
type UpsertRunnerParams struct {
	Key           model.RunnerKey
	Endpoint      string
	Labels        json.RawMessage
	DeploymentKey model.DeploymentKey
}

type UpsertSubscriptionParams

// UpsertSubscriptionParams is the argument struct for
// Querier.UpsertSubscription. The topic is identified by TopicModule and
// TopicName, separately from the Module and Name fields.
type UpsertSubscriptionParams struct {
	Key         model.SubscriptionKey
	TopicModule string
	TopicName   string
	Module      string
	Deployment  model.DeploymentKey
	Name        string
}

type UpsertSubscriptionRow

// UpsertSubscriptionRow is the result row of Querier.UpsertSubscription.
// NOTE(review): Inserted presumably distinguishes a newly inserted row from
// an updated one — confirm against the query.
type UpsertSubscriptionRow struct {
	ID       int64
	Inserted bool
}

type UpsertTopicParams

// UpsertTopicParams is the argument struct for Querier.UpsertTopic.
type UpsertTopicParams struct {
	Topic     model.TopicKey
	Module    string
	Name      string
	EventType string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL