Documentation ¶
Index ¶
- Variables
- func IsIgnored(path string, ignorePathsConfig []string) bool
- func NewPermissionDeniedError(msg string) error
- func RecordDownloadMetrics(ctx context.Context, m *DownloadMetrics)
- func Register(name string, driver Driver)
- func RegisterAsConnector(name string, driver Driver)
- type AIService
- type AdminService
- type AlertMetadata
- type AlertStatus
- type CatalogStore
- type CompletionMessage
- type Dialect
- func (d Dialect) ConvertToDateTruncSpecifier(specifier runtimev1.TimeGrain) string
- func (d Dialect) DimensionSelect(db, dbSchema, table string, dim *runtimev1.MetricsViewSpec_DimensionV2) (dimSelect, unnestClause string)
- func (d Dialect) EscapeIdentifier(ident string) string
- func (d Dialect) EscapeTable(db, schema, table string) string
- func (d Dialect) MetricsViewDimensionExpression(dimension *runtimev1.MetricsViewSpec_DimensionV2) string
- func (d Dialect) String() string
- type DirEntry
- type DownloadMetrics
- type Driver
- type FileIterator
- type FileStore
- type Handle
- type IncrementalStrategy
- type InformationSchema
- type IngestionSummary
- type Instance
- type InstanceConfig
- type ModelEnv
- type ModelExecutor
- type ModelExecutorOptions
- type ModelManager
- type ModelResult
- type NoOpProgress
- type Notifier
- type OLAPStore
- type ObjectStore
- type ObjectType
- type PermissionDeniedError
- type Progress
- type ProgressUnit
- type PropertySpec
- type PropertyType
- type QueryOption
- type RegistryStore
- type RepoObjectStat
- type RepoStore
- type ReportMetadata
- type Resource
- type Result
- type RowIterator
- type SQLStore
- type ScheduledReport
- type Spec
- type Statement
- type Table
- type TransferOptions
- type Transporter
- type WatchCallback
- type WatchEvent
- type WithConnectionFunc
Constants ¶
This section is empty.
Variables ¶
var Connectors = make(map[string]Driver)
Connectors tracks all registered connector drivers.
var Drivers = make(map[string]Driver)
Drivers is a registry of drivers.
var ErrFileAlreadyExists = errors.New("file already exists")
var ErrInconsistentControllerVersion = errors.New("controller: inconsistent version")
ErrInconsistentControllerVersion is returned from Controller when an unexpected controller version is observed in the DB. An unexpected controller version will only be observed if multiple controllers are running simultanesouly (split brain).
var ErrIteratorDone = errors.New("empty iterator")
var ErrNoRows = errors.New("no rows found for the query")
var ErrNotFound = errors.New("driver: not found")
ErrNotFound indicates the resource wasn't found.
var ErrNotImplemented = errors.New("driver: not implemented")
ErrNotImplemented indicates the driver doesn't support the requested operation.
var ErrNotNotifier = errors.New("driver: not a notifier")
ErrNotNotifier indicates the driver cannot be used as a Notifier.
var ErrResourceAlreadyExists = errors.New("controller: resource already exists")
ErrResourceAlreadyExists is returned from catalog functions when attempting to create a resource that already exists.
var ErrResourceNotFound = errors.New("controller: resource not found")
ErrResourceNotFound is returned from catalog functions when a referenced resource does not exist.
var ErrStorageLimitExceeded = fmt.Errorf("connectors: exceeds storage limit")
ErrStorageLimitExceeded indicates the driver's storage limit was exceeded.
var ErrUnsupportedConnector = errors.New("drivers: connector not supported")
ErrUnsupportedConnector is returned from Ingest for unsupported connectors.
Functions ¶
func IsIgnored ¶ added in v0.44.0
IsIgnored returns true if the path (and any files in nested directories) should be ignored.
func NewPermissionDeniedError ¶ added in v0.30.0
func RecordDownloadMetrics ¶ added in v0.30.0
func RecordDownloadMetrics(ctx context.Context, m *DownloadMetrics)
func RegisterAsConnector ¶ added in v0.30.0
RegisterAsConnector tracks a connector driver.
Types ¶
type AIService ¶ added in v0.41.0
type AIService interface {
Complete(ctx context.Context, msgs []*CompletionMessage) (*CompletionMessage, error)
}
type AdminService ¶ added in v0.37.0
type AdminService interface { GetReportMetadata(ctx context.Context, reportName string, annotations map[string]string, executionTime time.Time) (*ReportMetadata, error) GetAlertMetadata(ctx context.Context, alertName string, annotations map[string]string, queryForUserID, queryForUserEmail string) (*AlertMetadata, error) }
type AlertMetadata ¶ added in v0.41.0
type AlertStatus ¶ added in v0.43.0
type CatalogStore ¶
type CatalogStore interface { NextControllerVersion(ctx context.Context) (int64, error) CheckControllerVersion(ctx context.Context, v int64) error FindResources(ctx context.Context) ([]Resource, error) CreateResource(ctx context.Context, v int64, r Resource) error UpdateResource(ctx context.Context, v int64, r Resource) error DeleteResource(ctx context.Context, v int64, k, n string) error DeleteResources(ctx context.Context) error }
CatalogStore is implemented by drivers capable of storing catalog info for a specific instance. Implementations should treat resource kinds as case sensitive, but resource names as case insensitive.
type CompletionMessage ¶ added in v0.41.0
type Dialect ¶ added in v0.16.0
type Dialect int
Dialect enumerates OLAP query languages.
func (Dialect) ConvertToDateTruncSpecifier ¶ added in v0.43.0
func (Dialect) DimensionSelect ¶ added in v0.44.0
func (d Dialect) DimensionSelect(db, dbSchema, table string, dim *runtimev1.MetricsViewSpec_DimensionV2) (dimSelect, unnestClause string)
func (Dialect) EscapeIdentifier ¶ added in v0.42.0
EscapeIdentifier returns an escaped SQL identifier in the dialect.
func (Dialect) EscapeTable ¶ added in v0.43.0
EscapeTable returns an esacped fully qualified table name
func (Dialect) MetricsViewDimensionExpression ¶ added in v0.44.0
func (d Dialect) MetricsViewDimensionExpression(dimension *runtimev1.MetricsViewSpec_DimensionV2) string
type DownloadMetrics ¶ added in v0.30.0
type Driver ¶
type Driver interface { // Spec returns metadata about the driver, such as which configuration properties it supports. Spec() Spec // Open opens a new handle. // If instanceID is empty, the connection is considered shared and its As...() functions may be invoked with different instance IDs. Open(instanceID string, config map[string]any, client *activity.Client, logger *zap.Logger) (Handle, error) // HasAnonymousSourceAccess returns true if the driver can access the data identified by srcProps without any additional configuration. HasAnonymousSourceAccess(ctx context.Context, srcProps map[string]any, logger *zap.Logger) (bool, error) // TertiarySourceConnectors returns a list of drivers required to access the data identified by srcProps, excluding the driver itself. TertiarySourceConnectors(ctx context.Context, srcProps map[string]any, logger *zap.Logger) ([]string, error) }
Driver represents an external service that Rill can connect to.
type FileIterator ¶ added in v0.30.0
type FileIterator interface { // Close do cleanup and release resources Close() error // Next returns a list of file downloaded from external sources // and cleanups file created in previous batch Next() ([]string, error) // Size returns size of data downloaded in unit. // Returns 0,false if not able to compute size in given unit Size(unit ProgressUnit) (int64, bool) // Format returns general file format (json, csv, parquet, etc) // Returns an empty string if there is no general format Format() string }
FileIterator provides ways to iteratively download files from external sources Clients should call close once they are done with iterator to release any resources
type Handle ¶ added in v0.32.0
type Handle interface { // Driver name used to open the handle. Driver() string // Config used to open the handle. Config() map[string]any // Migrate prepares the handle for use. It will always be called before any of the As...() functions. Migrate(ctx context.Context) error // MigrationStatus returns the handle's current and desired migration version (if applicable). MigrationStatus(ctx context.Context) (current int, desired int, err error) // Close closes the handle. Close() error // AsRegistry returns a RegistryStore if the handle can serve as such, otherwise returns false. // A registry is responsible for tracking runtime metadata, namely instances and their configuration. AsRegistry() (RegistryStore, bool) // AsCatalogStore returns a CatalogStore if the handle can serve as such, otherwise returns false. // A catalog stores the state of an instance's resources (such as sources, models, metrics views, alerts, etc.) AsCatalogStore(instanceID string) (CatalogStore, bool) // AsRepoStore returns a RepoStore if the handle can serve as such, otherwise returns false. // A repo stores an instance's file artifacts (mostly YAML and SQL files). AsRepoStore(instanceID string) (RepoStore, bool) // AsAdmin returns an AdminService if the handle can serve as such, otherwise returns false. // An admin service enables the runtime to interact with the control plane that deployed it. AsAdmin(instanceID string) (AdminService, bool) // AsAI returns an AIService if the driver can serve as such, otherwise returns false. // An AI service enables an instance to request prompt-based text inference. AsAI(instanceID string) (AIService, bool) // AsSQLStore returns a SQLStore if the driver can serve as such, otherwise returns false. // A SQL store represents a service that can execute SQL statements and return the resulting rows. AsSQLStore() (SQLStore, bool) // AsOLAP returns an OLAPStore if the driver can serve as such, otherwise returns false. // An OLAP store is used to serve interactive, low-latency, analytical queries. // NOTE: We should consider merging the OLAPStore and SQLStore interfaces. AsOLAP(instanceID string) (OLAPStore, bool) // AsObjectStore returns an ObjectStore if the driver can serve as such, otherwise returns false. // An object store can store, list and retrieve files on a remote server. AsObjectStore() (ObjectStore, bool) // AsFileStore returns a FileStore if the driver can serve as such, otherwise returns false. // A file store can store, list and retrieve local files. // NOTE: The file store can probably be merged with the ObjectStore interface. AsFileStore() (FileStore, bool) // AsModelExecutor returns a ModelExecutor capable of building a model. // Since models may move data between connectors, the model executor can be seem as a "meta driver" that uses handles on other connectors. // The provided options provides both an input connector and an output connector. One or both of these will be the receiver itself. // It should return false if the handle is not capable of executing a model between the provided input and output connectors. AsModelExecutor(instanceID string, opts *ModelExecutorOptions) (ModelExecutor, bool) // AsModelManager returns a ModelManager for managing model results produced by a ModelExecutor. // This is different from the ModelExecutor since sometimes, the model's input connector executes and writes the model result to the output connector. // But managing the result lifecycle is easier to do directly using the output connector. AsModelManager(instanceID string) (ModelManager, bool) // AsTransporter returns a Transporter for moving data between two other handles. One of the input handles may be the Handle itself. // Examples: duckdb.AsTransporter(gcs, duckdb), beam.AsTransporter(gcs, s3). AsTransporter(from Handle, to Handle) (Transporter, bool) // AsNotifier returns a Notifier (if the driver can serve as such) to send notifications: alerts, reports, etc. // Examples: email notifier, slack notifier. AsNotifier(properties map[string]any) (Notifier, error) }
Handle represents a connection to an external service, such as a database, object store, etc. It should implement one or more of the As...() functions.
func Open ¶
func Open(driver, instanceID string, config map[string]any, client *activity.Client, logger *zap.Logger) (Handle, error)
Open opens a new connection. If instanceID is empty, the connection is considered shared and its As...() functions may be invoked with different instance IDs. If instanceID is not empty, the connection is considered instance-specific and its As...() functions will only be invoked with the same instance ID.
type IncrementalStrategy ¶ added in v0.45.0
type IncrementalStrategy string
IncrementalStrategy is a strategy to use for incrementally inserting data into a SQL table.
const ( IncrementalStrategyUnspecified IncrementalStrategy = "" IncrementalStrategyAppend IncrementalStrategy = "append" IncrementalStrategyMerge IncrementalStrategy = "merge" )
type InformationSchema ¶
type InformationSchema interface { All(ctx context.Context) ([]*Table, error) Lookup(ctx context.Context, db, schema, name string) (*Table, error) }
InformationSchema contains information about existing tables in an OLAP driver. Table lookups should be case insensitive.
type IngestionSummary ¶ added in v0.24.0
type IngestionSummary struct {
BytesIngested int64
}
IngestionSummary is details about ingestion
type Instance ¶
type Instance struct { // Identifier ID string // Environment is the environment that the instance represents Environment string // Driver name to connect to for OLAP OLAPConnector string // ProjectOLAPConnector is an override of OLAPConnector that may be set in rill.yaml. // NOTE: Hopefully we can merge this with OLAPConnector if/when we remove the ability to set OLAPConnector using flags. ProjectOLAPConnector string // Driver name for reading/editing code artifacts RepoConnector string // Driver name for the admin service managing the deployment (optional) AdminConnector string // Driver name for the AI service (optional) AIConnector string // Driver name for catalog CatalogConnector string // CreatedOn is when the instance was created CreatedOn time.Time `db:"created_on"` // UpdatedOn is when the instance was last updated in the registry UpdatedOn time.Time `db:"updated_on"` // Instance specific connectors Connectors []*runtimev1.Connector `db:"connectors"` // ProjectVariables contains default connectors from rill.yaml ProjectConnectors []*runtimev1.Connector `db:"project_connectors"` // Variables contains user-provided variables Variables map[string]string `db:"variables"` // ProjectVariables contains default variables from rill.yaml // (NOTE: This can always be reproduced from rill.yaml, so it's really just a handy cache of the values.) ProjectVariables map[string]string `db:"project_variables"` // FeatureFlags contains feature flags configured in rill.yaml FeatureFlags map[string]bool `db:"feature_flags"` // Annotations to enrich activity events (like usage tracking) Annotations map[string]string // EmbedCatalog tells the runtime to store the instance's catalog in its OLAP store instead // of in the runtime's metadata store. Currently only supported for the duckdb driver. EmbedCatalog bool `db:"embed_catalog"` // WatchRepo indicates whether to watch the repo for file changes and reconcile them automatically. WatchRepo bool `db:"watch_repo"` // IgnoreInitialInvalidProjectError indicates whether to ignore an invalid project error when the instance is initially created. IgnoreInitialInvalidProjectError bool `db:"-"` }
Instance represents a single data project, meaning one OLAP connection, one repo connection, and one catalog connection.
func (*Instance) Config ¶ added in v0.43.0
func (i *Instance) Config() (InstanceConfig, error)
Config resolves the current dynamic config properties for the instance. See InstanceConfig for details.
func (*Instance) ResolveOLAPConnector ¶ added in v0.42.0
ResolveOLAPConnector resolves the OLAP connector to default to for the instance.
func (*Instance) ResolveVariables ¶ added in v0.23.0
ResolveVariables returns the final resolved variables
type InstanceConfig ¶ added in v0.43.0
type InstanceConfig struct { // DownloadRowLimit is the row limit for interactive data exports. If set to 0, there is no limit. DownloadRowLimit int64 `mapstructure:"rill.download_row_limit"` // PivotCellLimit is the maximum number of cells allowed in a single pivot query. // Note that it does not limit the UI's pivot table because it paginates the requests. PivotCellLimit int64 `mapstructure:"rill.pivot_cell_limit"` // InteractiveSQLRowLimit is the row limit for interactive SQL queries. It does not apply to exports of SQL queries. If set to 0, there is no limit. InteractiveSQLRowLimit int64 `mapstructure:"rill.interactive_sql_row_limit"` // StageChanges indicates whether to keep previously ingested tables for sources/models, and only override them if ingestion of a new table is successful. StageChanges bool `mapstructure:"rill.stage_changes"` // ModelDefaultMaterialize indicates whether to materialize models by default. ModelDefaultMaterialize bool `mapstructure:"rill.models.default_materialize"` // ModelMaterializeDelaySeconds adds a delay before materializing models. ModelMaterializeDelaySeconds uint32 `mapstructure:"rill.models.materialize_delay_seconds"` // AlertStreamingRefDefaultRefreshCron sets a default cron expression for refreshing alerts with streaming refs. // Namely, this is used to check alerts against external tables (e.g. in Druid) where new data may be added at any time (i.e. is considered "streaming"). AlertsDefaultStreamingRefreshCron string `mapstructure:"rill.alerts.default_streaming_refresh_cron"` }
InstanceConfig contains dynamic configuration for an instance. It is configured by parsing instance variables prefixed with "rill.". For example, a variable "rill.stage_changes=true" would set the StageChanges field to true. InstanceConfig should only be used for config that the user is allowed to change dynamically at runtime.
type ModelExecutor ¶ added in v0.45.0
type ModelExecutor interface {
Execute(ctx context.Context) (*ModelResult, error)
}
type ModelExecutorOptions ¶ added in v0.45.0
type ModelManager ¶ added in v0.45.0
type ModelManager interface { Rename(ctx context.Context, res *ModelResult, newName string, env *ModelEnv) (*ModelResult, error) Exists(ctx context.Context, res *ModelResult) (bool, error) Delete(ctx context.Context, res *ModelResult) error }
type ModelResult ¶ added in v0.45.0
type NoOpProgress ¶ added in v0.30.0
type NoOpProgress struct{}
func (NoOpProgress) Observe ¶ added in v0.30.0
func (n NoOpProgress) Observe(val int64, unit ProgressUnit)
func (NoOpProgress) Target ¶ added in v0.30.0
func (n NoOpProgress) Target(val int64, unit ProgressUnit)
type Notifier ¶ added in v0.43.0
type Notifier interface { SendAlertStatus(s *AlertStatus) error SendScheduledReport(s *ScheduledReport) error }
Notifier sends notifications.
type OLAPStore ¶
type OLAPStore interface { Dialect() Dialect WithConnection(ctx context.Context, priority int, longRunning, tx bool, fn WithConnectionFunc) error Exec(ctx context.Context, stmt *Statement) error Execute(ctx context.Context, stmt *Statement) (*Result, error) InformationSchema() InformationSchema EstimateSize() (int64, bool) CreateTableAsSelect(ctx context.Context, name string, view bool, sql string) error InsertTableAsSelect(ctx context.Context, name, sql string, byName, inPlace bool, strategy IncrementalStrategy, uniqueKey []string) error DropTable(ctx context.Context, name string, view bool) error RenameTable(ctx context.Context, name, newName string, view bool) error AddTableColumn(ctx context.Context, tableName, columnName string, typ string) error AlterTableColumn(ctx context.Context, tableName, columnName string, newType string) error }
OLAPStore is implemented by drivers that are capable of storing, transforming and serving analytical queries. NOTE crud APIs are not safe to be called with `WithConnection`
type ObjectStore ¶ added in v0.30.0
type ObjectType ¶ added in v0.16.0
type ObjectType int
Constants representing the kinds of catalog objects.
const ( ObjectTypeUnspecified ObjectType = 0 ObjectTypeTable ObjectType = 1 ObjectTypeSource ObjectType = 2 ObjectTypeModel ObjectType = 3 ObjectTypeMetricsView ObjectType = 4 )
type PermissionDeniedError ¶ added in v0.30.0
type PermissionDeniedError struct {
// contains filtered or unexported fields
}
PermissionDeniedError is returned when a driver cannot access some data due to insufficient permissions.
func (*PermissionDeniedError) Error ¶ added in v0.30.0
func (e *PermissionDeniedError) Error() string
type Progress ¶ added in v0.30.0
type Progress interface { Target(val int64, unit ProgressUnit) // Observe is used by caller to provide incremental updates Observe(val int64, unit ProgressUnit) }
Progress is an interface for communicating progress info
type ProgressUnit ¶ added in v0.30.0
type ProgressUnit int
const ( ProgressUnitByte ProgressUnit = iota ProgressUnitFile ProgressUnitRecord )
type PropertySpec ¶ added in v0.43.0
type PropertySpec struct { Key string Type PropertyType Required bool DisplayName string Description string DocsURL string Hint string Default string Placeholder string Secret bool }
PropertySpec provides metadata about a single connector property.
type PropertyType ¶ added in v0.43.0
type PropertyType int
PropertyType is an enum of types supported for connector properties.
const ( UnspecifiedPropertyType PropertyType = iota NumberPropertyType BooleanPropertyType StringPropertyType FilePropertyType InformationalPropertyType )
type QueryOption ¶ added in v0.33.2
type QueryOption struct { // TotalLimitInBytes rerpresent the max limit on the bytes that should be downloaded in a file TotalLimitInBytes int64 }
type RegistryStore ¶
type RegistryStore interface { FindInstances(ctx context.Context) ([]*Instance, error) FindInstance(ctx context.Context, id string) (*Instance, error) CreateInstance(ctx context.Context, instance *Instance) error DeleteInstance(ctx context.Context, id string) error EditInstance(ctx context.Context, instance *Instance) error }
RegistryStore is implemented by drivers capable of storing and looking up instances and repos.
type RepoObjectStat ¶ added in v0.15.0
type RepoStore ¶
type RepoStore interface { Driver() string // Root returns directory where artifacts are stored. Root() string CommitHash(ctx context.Context) (string, error) ListRecursive(ctx context.Context, glob string, skipDirs bool) ([]DirEntry, error) Get(ctx context.Context, path string) (string, error) Stat(ctx context.Context, path string) (*RepoObjectStat, error) Put(ctx context.Context, path string, reader io.Reader) error MakeDir(ctx context.Context, path string) error Rename(ctx context.Context, fromPath string, toPath string) error Delete(ctx context.Context, path string, force bool) error Sync(ctx context.Context) error Watch(ctx context.Context, cb WatchCallback) error }
RepoStore is implemented by drivers capable of storing code artifacts. It mirrors a file system, but may be virtualized by a database for non-local deployments.
type ReportMetadata ¶ added in v0.37.0
type Result ¶ added in v0.15.0
type Result struct { *sqlx.Rows Schema *runtimev1.StructType // contains filtered or unexported fields }
Result wraps the results of query.
func (*Result) Close ¶ added in v0.18.0
Close wraps rows.Close and calls the Result's cleanup function (if it is set). Close should be idempotent.
func (*Result) SetCleanupFunc ¶ added in v0.18.0
SetCleanupFunc sets a function, which will be called when the Result is closed.
type RowIterator ¶ added in v0.31.0
type RowIterator interface { // Schema of the underlying data Schema(ctx context.Context) (*runtimev1.StructType, error) // Next fetches next row Next(ctx context.Context) ([]driver.Value, error) // Close closes the iterator and frees resources Close() error // Size returns total size of data downloaded in unit. // Returns 0,false if not able to compute size in given unit Size(unit ProgressUnit) (uint64, bool) }
RowIterator returns an iterator to iterate over result of a sql query
type SQLStore ¶ added in v0.31.0
type SQLStore interface { // Query returns driver.RowIterator to iterate over results row by row Query(ctx context.Context, props map[string]any) (RowIterator, error) // QueryAsFiles downloads results into files and returns an iterator to iterate over them QueryAsFiles(ctx context.Context, props map[string]any, opt *QueryOption, p Progress) (FileIterator, error) }
SQLStore is implemented by drivers capable of running sql queries and generating an iterator to consume results. In future the results can be produced in other formats like arrow as well. May be call it DataWarehouse to differentiate from OLAP or postgres?
type ScheduledReport ¶ added in v0.43.0
type Spec ¶ added in v0.30.0
type Spec struct { DisplayName string Description string DocsURL string ConfigProperties []*PropertySpec SourceProperties []*PropertySpec ImplementsRegistry bool ImplementsCatalog bool ImplementsRepo bool ImplementsAdmin bool ImplementsAI bool ImplementsSQLStore bool ImplementsOLAP bool ImplementsObjectStore bool ImplementsFileStore bool ImplementsNotifier bool }
Spec provides metadata about a connector and the properties it supports.
type Statement ¶
type Statement struct { Query string Args []any DryRun bool Priority int LongRunning bool ExecutionTimeout time.Duration }
Statement wraps a query to execute against an OLAP driver.
type Table ¶
type Table struct { Database string DatabaseSchema string IsDefaultDatabase bool IsDefaultDatabaseSchema bool Name string View bool Schema *runtimev1.StructType UnsupportedCols map[string]string }
Table represents a table in an information schema.
type TransferOptions ¶ added in v0.34.1
type TransferOptions struct { AllowHostAccess bool RepoRoot string Progress Progress AcquireConnector func(string) (Handle, func(), error) }
TransferOptions provide execution context for Transporter.Transfer
type Transporter ¶ added in v0.30.0
type Transporter interface {
Transfer(ctx context.Context, srcProps, sinkProps map[string]any, opts *TransferOptions) error
}
Transporter implements logic for moving data between two connectors (the actual connector objects are provided in AsTransporter)
type WatchCallback ¶ added in v0.30.0
type WatchCallback func(event []WatchEvent)
type WatchEvent ¶ added in v0.30.0
type WithConnectionFunc ¶ added in v0.18.0
type WithConnectionFunc func(wrappedCtx context.Context, ensuredCtx context.Context, conn *sql.Conn) error
WithConnectionFunc is a callback function that provides a context to be used in further OLAP store calls to enforce affinity to a single connection. It also provides pointers to the actual database/sql and database/sql/driver connections. It's called with two contexts: wrappedCtx wraps the input context (including cancellation), and ensuredCtx wraps a background context (ensuring it can never be cancelled).