Documentation ¶
Index ¶
- Constants
- func CantPerformQuery(err error, q string) error
- func IsRetryable(err error) bool
- func IsStruct(subject interface{}) bool
- func IsUnixAddr(host string) bool
- func JoinHostPort(host string, port int) string
- func RegisterDrivers(logger logr.Logger)
- func TableName(t interface{}) string
- func Zero[T any]() T
- type CleanupStmt
- type Database
- func (db *Database) BatchSizeByPlaceholders(n int) int
- func (db *Database) BuildDeleteStmt(from interface{}) string
- func (db *Database) BuildSelectStmt(table interface{}, columns interface{}) string
- func (db *Database) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int)
- func (db *Database) BulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, ...) error
- func (db *Database) CleanupOlderThan(ctx context.Context, stmt CleanupStmt, count uint64, olderThan time.Time, ...) (uint64, error)
- func (db *Database) Connect() bool
- func (db *Database) DeleteStreamed(ctx context.Context, from interface{}, ids <-chan interface{}, ...) error
- func (db *Database) GetSemaphoreForTable(table string) *semaphore.Weighted
- func (db *Database) NamedBulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, ...) error
- func (db *Database) PeriodicCleanup(ctx context.Context, stmt CleanupStmt) error
- func (db *Database) UpsertStreamed(ctx context.Context, entities <-chan interface{}, features ...Feature) error
- func (db *Database) YieldAll(ctx context.Context, factoryFunc func() (interface{}, error), query string, ...) (<-chan interface{}, <-chan error)
- type Driver
- type Feature
- type Features
- type HasRelations
- type OnSuccess
- type PgSQLDriver
- type Quoter
- type Relation
- type RelationOption
- type RetryConnector
- type TableNamer
- type UUID
- type Upserter
Constants ¶
const MySQL = "icinga-mysql"
const PostgreSQL = "icinga-pgsql"
Variables ¶
This section is empty.
Functions ¶
func CantPerformQuery ¶
CantPerformQuery wraps the given error with the specified query that cannot be executed.
func IsRetryable ¶
IsRetryable checks whether the given error is retryable.
func IsUnixAddr ¶
func JoinHostPort ¶
JoinHostPort is like its equivalent in net., but handles UNIX sockets as well.
func RegisterDrivers ¶
RegisterDrivers makes our database Driver(s) available under the name "icinga-*sql".
Types ¶
type CleanupStmt ¶
CleanupStmt defines information needed to compose cleanup statements.
type Database ¶
type Database struct { *sqlx.DB Options database.Options // contains filtered or unexported fields }
Database is a wrapper around sqlx.DB with bulk execution, statement building, streaming and logging capabilities.
func NewFromConfig ¶
NewFromConfig returns a new Database connection from the given Config.
func (*Database) BatchSizeByPlaceholders ¶
BatchSizeByPlaceholders returns how often the specified number of placeholders fits into Options.MaxPlaceholdersPerStatement, but at least 1.
func (*Database) BuildDeleteStmt ¶
BuildDeleteStmt returns a DELETE statement for the given struct.
func (*Database) BuildSelectStmt ¶
BuildSelectStmt returns a SELECT query that creates the FROM part from the given table struct and the column list from the specified columns struct.
func (*Database) BuildUpsertStmt ¶
BuildUpsertStmt returns an upsert statement for the given struct.
func (*Database) BulkExec ¶
func (db *Database) BulkExec( ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}, features ...Feature, ) error
BulkExec bulk executes queries with a single slice placeholder in the form of `IN (?)`. Takes in up to the number of arguments specified in count from the arg stream, derives and expands a query and executes it with this set of arguments until the arg stream has been processed. The derived queries are executed in a separate goroutine with a weighting of 1 and can be executed concurrently to the extent allowed by the semaphore passed in sem. Arguments for which the query ran successfully will be passed to onSuccess.
func (*Database) CleanupOlderThan ¶
func (db *Database) CleanupOlderThan( ctx context.Context, stmt CleanupStmt, count uint64, olderThan time.Time, onSuccess ...OnSuccess[struct{}], ) (uint64, error)
CleanupOlderThan deletes all rows with the specified statement that are older than the given time. Deletes a maximum of as many rows per round as defined in count. Actually deleted rows will be passed to onSuccess. Returns the total number of rows deleted.
func (*Database) DeleteStreamed ¶
func (db *Database) DeleteStreamed( ctx context.Context, from interface{}, ids <-chan interface{}, features ...Feature, ) error
DeleteStreamed bulk deletes the specified ids via BulkExec. The delete statement is created using BuildDeleteStmt with the passed entityType. Bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency is controlled via Options.MaxConnectionsPerTable. IDs for which the query ran successfully will be passed to onSuccess.
func (*Database) GetSemaphoreForTable ¶
func (*Database) NamedBulkExec ¶
func (db *Database) NamedBulkExec( ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}, splitPolicyFactory com.BulkChunkSplitPolicyFactory[interface{}], features ...Feature, ) error
NamedBulkExec bulk executes queries with named placeholders in a VALUES clause most likely in the format INSERT ... VALUES. Takes in up to the number of entities specified in count from the arg stream, derives and executes a new query with the VALUES clause expanded to this set of arguments, until the arg stream has been processed. The queries are executed in a separate goroutine with a weighting of 1 and can be executed concurrently to the extent allowed by the semaphore passed in sem. Entities for which the query ran successfully will be passed to onSuccess.
func (*Database) PeriodicCleanup ¶ added in v0.2.0
func (db *Database) PeriodicCleanup(ctx context.Context, stmt CleanupStmt) error
func (*Database) UpsertStreamed ¶
func (db *Database) UpsertStreamed( ctx context.Context, entities <-chan interface{}, features ...Feature, ) error
UpsertStreamed bulk upserts the specified entities via NamedBulkExec. The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream. Bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency is controlled via Options.MaxConnectionsPerTable.
func (*Database) YieldAll ¶
func (db *Database) YieldAll(ctx context.Context, factoryFunc func() (interface{}, error), query string, scope ...interface{}) (<-chan interface{}, <-chan error)
YieldAll executes the query with the supplied scope, scans each resulting row into an entity returned by the factory function, and streams them into a returned channel.
type Driver ¶
Driver wraps a driver.Driver that also must implement driver.DriverContext with logging capabilities and provides our RetryConnector.
type Feature ¶
type Feature func(*Features)
func WithBlocking ¶
func WithBlocking() Feature
func WithCascading ¶
func WithCascading() Feature
func WithOnSuccess ¶
func WithOnSuccess(fn com.ProcessBulk[any]) Feature
type Features ¶
type Features struct {
// contains filtered or unexported fields
}
func NewFeatures ¶
type HasRelations ¶
type HasRelations interface {
Relations() []Relation
}
type PgSQLDriver ¶
PgSQLDriver extends pq.Driver with driver.DriverContext compliance.
func (PgSQLDriver) OpenConnector ¶
func (PgSQLDriver) OpenConnector(name string) (driver.Connector, error)
OpenConnector implements the driver.DriverContext interface.
type Quoter ¶
type Quoter struct {
// contains filtered or unexported fields
}
func (*Quoter) QuoteColumns ¶
func (*Quoter) QuoteIdentifier ¶
type Relation ¶
type Relation interface { ForeignKey() string SetForeignKey(fk string) CascadeDelete() bool WithoutCascadeDelete() StreamInto(context.Context, chan interface{}) error TableName() string }
func HasMany ¶
func HasMany[T comparable](entities []T, options ...RelationOption) Relation
func HasOne ¶
func HasOne[T comparable](entity T, options ...RelationOption) Relation
type RelationOption ¶
type RelationOption func(r Relation)
func WithForeignKey ¶
func WithForeignKey(fk string) RelationOption
func WithoutCascadeDelete ¶
func WithoutCascadeDelete() RelationOption
type RetryConnector ¶
RetryConnector wraps driver.Connector with retry logic.
func (RetryConnector) Driver ¶
func (c RetryConnector) Driver() driver.Driver
Driver implements part of the driver.Connector interface.
type TableNamer ¶
type TableNamer interface {
TableName() string // TableName tells the table.
}
TableNamer implements the TableName method, which returns the table of the object.