sql

package
v0.409.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2024 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CreateAsyncCallParams

type CreateAsyncCallParams struct {
	ScheduledAt       time.Time
	Verb              schema.RefKey
	Origin            string
	Request           json.RawMessage
	RemainingAttempts int32
	Backoff           sqltypes.Duration
	MaxBackoff        sqltypes.Duration
	CatchVerb         optional.Option[schema.RefKey]
	ParentRequestKey  optional.Option[string]
	TraceContext      json.RawMessage
}

type DBTX

type DBTX interface {
	ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
	PrepareContext(context.Context, string) (*sql.Stmt, error)
	QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
	QueryRowContext(context.Context, string, ...interface{}) *sql.Row
}

type GetNextEventForSubscriptionRow

type GetNextEventForSubscriptionRow struct {
	Event        optional.Option[model.TopicEventKey]
	Payload      pqtype.NullRawMessage
	CreatedAt    sqltypes.OptionalTime
	Caller       optional.Option[string]
	RequestKey   optional.Option[string]
	TraceContext pqtype.NullRawMessage
	Ready        bool
}

type GetRandomSubscriberRow

type GetRandomSubscriberRow struct {
	Sink          schema.RefKey
	RetryAttempts int32
	Backoff       sqltypes.Duration
	MaxBackoff    sqltypes.Duration
	CatchVerb     optional.Option[schema.RefKey]
	DeploymentKey model.DeploymentKey
}

type GetSubscriptionsNeedingUpdateRow

type GetSubscriptionsNeedingUpdateRow struct {
	Key           model.SubscriptionKey
	Cursor        optional.Option[model.TopicEventKey]
	Topic         model.TopicKey
	Name          string
	DeploymentKey model.DeploymentKey
	RequestKey    optional.Option[string]
}

type InsertSubscriberParams

type InsertSubscriberParams struct {
	Key              model.SubscriberKey
	Module           string
	SubscriptionName string
	Deployment       model.DeploymentKey
	Sink             schema.RefKey
	RetryAttempts    int32
	Backoff          sqltypes.Duration
	MaxBackoff       sqltypes.Duration
	CatchVerb        optional.Option[schema.RefKey]
}

type NullTopicSubscriptionState

type NullTopicSubscriptionState struct {
	TopicSubscriptionState TopicSubscriptionState
	Valid                  bool // Valid is true if TopicSubscriptionState is not NULL
}

func (*NullTopicSubscriptionState) Scan

func (ns *NullTopicSubscriptionState) Scan(value interface{}) error

Scan implements the Scanner interface.

func (NullTopicSubscriptionState) Value

Value implements the driver Valuer interface.

type PublishEventForTopicParams

type PublishEventForTopicParams struct {
	Key          model.TopicEventKey
	Module       string
	Topic        string
	Caller       string
	Payload      json.RawMessage
	RequestKey   string
	TraceContext json.RawMessage
}

type Querier

type Querier interface {
	AsyncCallQueueDepth(ctx context.Context) (int64, error)
	BeginConsumingTopicEvent(ctx context.Context, subscription model.SubscriptionKey, event model.TopicEventKey) error
	CompleteEventForSubscription(ctx context.Context, name string, module string) error
	CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error)
	DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error)
	DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error)
	GetNextEventForSubscription(ctx context.Context, consumptionDelay sqltypes.Duration, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error)
	GetRandomSubscriber(ctx context.Context, key model.SubscriptionKey) (GetRandomSubscriberRow, error)
	GetSubscription(ctx context.Context, column1 string, column2 string) (TopicSubscription, error)
	// Results may not be ready to be scheduled yet due to event consumption delay
	// Sorting ensures that brand new events (that may not be ready for consumption)
	// don't prevent older events from being consumed
	// We also make sure that the subscription belongs to a deployment that has at least one runner
	GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error)
	GetTopic(ctx context.Context, dollar_1 int64) (Topic, error)
	GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent, error)
	InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error
	PublishEventForTopic(ctx context.Context, arg PublishEventForTopicParams) error
	SetSubscriptionCursor(ctx context.Context, column1 model.SubscriptionKey, column2 model.TopicEventKey) error
	UpsertSubscription(ctx context.Context, arg UpsertSubscriptionParams) (UpsertSubscriptionRow, error)
	UpsertTopic(ctx context.Context, arg UpsertTopicParams) error
}

type Queries

type Queries struct {
	// contains filtered or unexported fields
}

func New

func New(db DBTX) *Queries

func (*Queries) AsyncCallQueueDepth

func (q *Queries) AsyncCallQueueDepth(ctx context.Context) (int64, error)

func (*Queries) BeginConsumingTopicEvent

func (q *Queries) BeginConsumingTopicEvent(ctx context.Context, subscription model.SubscriptionKey, event model.TopicEventKey) error

func (*Queries) CompleteEventForSubscription

func (q *Queries) CompleteEventForSubscription(ctx context.Context, name string, module string) error

func (*Queries) CreateAsyncCall

func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error)

func (*Queries) DeleteSubscribers

func (q *Queries) DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error)

func (*Queries) DeleteSubscriptions

func (q *Queries) DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error)

func (*Queries) GetNextEventForSubscription

func (q *Queries) GetNextEventForSubscription(ctx context.Context, consumptionDelay sqltypes.Duration, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error)

func (*Queries) GetRandomSubscriber

func (q *Queries) GetRandomSubscriber(ctx context.Context, key model.SubscriptionKey) (GetRandomSubscriberRow, error)

func (*Queries) GetSubscription

func (q *Queries) GetSubscription(ctx context.Context, column1 string, column2 string) (TopicSubscription, error)

func (*Queries) GetSubscriptionsNeedingUpdate

func (q *Queries) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error)

Results may not be ready to be scheduled yet due to event consumption delay Sorting ensures that brand new events (that may not be ready for consumption) don't prevent older events from being consumed We also make sure that the subscription belongs to a deployment that has at least one runner

func (*Queries) GetTopic

func (q *Queries) GetTopic(ctx context.Context, dollar_1 int64) (Topic, error)

func (*Queries) GetTopicEvent

func (q *Queries) GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent, error)

func (*Queries) InsertSubscriber

func (q *Queries) InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error

func (*Queries) PublishEventForTopic

func (q *Queries) PublishEventForTopic(ctx context.Context, arg PublishEventForTopicParams) error

func (*Queries) SetSubscriptionCursor

func (q *Queries) SetSubscriptionCursor(ctx context.Context, column1 model.SubscriptionKey, column2 model.TopicEventKey) error

func (*Queries) UpsertSubscription

func (q *Queries) UpsertSubscription(ctx context.Context, arg UpsertSubscriptionParams) (UpsertSubscriptionRow, error)

func (*Queries) UpsertTopic

func (q *Queries) UpsertTopic(ctx context.Context, arg UpsertTopicParams) error

func (*Queries) WithTx

func (q *Queries) WithTx(tx *sql.Tx) *Queries

type Topic

type Topic struct {
	ID        int64
	Key       model.TopicKey
	CreatedAt time.Time
	ModuleID  int64
	Name      string
	Type      string
	Head      optional.Option[int64]
}

type TopicEvent

type TopicEvent struct {
	ID           int64
	CreatedAt    time.Time
	Key          model.TopicEventKey
	TopicID      int64
	Caller       optional.Option[string]
	RequestKey   optional.Option[string]
	TraceContext pqtype.NullRawMessage
	Payload      json.RawMessage
}

type TopicSubscription

type TopicSubscription struct {
	ID           int64
	Key          model.SubscriptionKey
	CreatedAt    time.Time
	TopicID      int64
	ModuleID     int64
	DeploymentID int64
	Name         string
	Cursor       optional.Option[int64]
	State        TopicSubscriptionState
}

type TopicSubscriptionState

type TopicSubscriptionState string
const (
	TopicSubscriptionStateIdle      TopicSubscriptionState = "idle"
	TopicSubscriptionStateExecuting TopicSubscriptionState = "executing"
)

func (*TopicSubscriptionState) Scan

func (e *TopicSubscriptionState) Scan(src interface{}) error

type UpsertSubscriptionParams

type UpsertSubscriptionParams struct {
	Key         model.SubscriptionKey
	TopicModule string
	TopicName   string
	Module      string
	Deployment  model.DeploymentKey
	Name        string
}

type UpsertSubscriptionRow

type UpsertSubscriptionRow struct {
	ID       int64
	Inserted bool
}

type UpsertTopicParams

type UpsertTopicParams struct {
	Topic     model.TopicKey
	Module    string
	Name      string
	EventType string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL