pgxdb

package
v1.0.3
Published: Oct 28, 2024 License: MIT Imports: 10 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func BulkInsertEntitiesWithTags

func BulkInsertEntitiesWithTags[T dbx.RowConvertibleEntity](mgr dbx.InstanceManager, ctx context.Context, distLockId int64, tableName string, entities []T) (int64, error)

BulkInsertEntitiesWithTags inserts a large number of structs into a PostgreSQL table using pgx.CopyFrom.

This function takes a slice of structs that implement the `RowConvertibleEntity` interface, derives the column names from the `db` struct tags of the first struct, and inserts the data into the database. Each struct must implement the `ToRow()` method, which converts the struct to a row of values corresponding to the derived column names.

The `db` tags in the struct define how the fields are mapped to the corresponding columns in the database. Fields with `db:"-"` or without a `db` tag will be ignored during the insertion process.

Arguments:

  • mgr: The instance manager responsible for managing the database connection.
  • ctx: The context for the query execution, which can be used to control cancellation and deadlines.
  • distLockId: The ID used to retrieve a specific connection associated with a distributed lock. If 0, it gets a connection from the general pool.
  • tableName: The name of the table into which data will be inserted (CASE SENSITIVE).
  • entities: A slice of structs implementing the `RowConvertibleEntity` interface, representing the rows to be inserted.

Returns:

  • int64: The number of rows successfully inserted.
  • error: Any error encountered during the bulk insert.

Notes:

  • The function derives column names from the first entity's `db` tags using the `DeriveColumnNamesFromTags` function.
  • Each struct passed to the function must implement the `RowConvertibleEntity` interface, specifically the `ToRow()` method, which returns the corresponding row values as a slice of `interface{}`.
  • The `db` tag in the struct is used to map fields to PostgreSQL columns. Fields without a `db` tag or with `db:"-"` are skipped.
  • pgx's `CopyFrom`, which uses the PostgreSQL COPY protocol, is used for efficient bulk insertion.
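To make the tag rules above concrete, here is a minimal, database-free sketch of how column names might be collected from `db` tags and paired with a `ToRow()` result. The `User` type and the `columnsFromTags` helper are hypothetical and written for illustration only; `columnsFromTags` is not the library's `DeriveColumnNamesFromTags`, whose exact behavior is not shown in this documentation.

```go
package main

import (
	"fmt"
	"reflect"
)

// User is a hypothetical entity. Only fields with a usable `db` tag are persisted.
type User struct {
	ID       int64  `db:"id"`
	Name     string `db:"name"`
	Password string `db:"-"` // explicitly skipped
	Comment  string // no db tag: skipped
}

// ToRow returns the values in the same order as the tagged fields.
func (u User) ToRow() []interface{} {
	return []interface{}{u.ID, u.Name}
}

// columnsFromTags is an illustrative stand-in (an assumption, not the library
// function): it collects every non-empty `db` tag other than "-".
func columnsFromTags(entity interface{}) []string {
	t := reflect.TypeOf(entity)
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t.Kind() != reflect.Struct {
		return nil
	}
	var cols []string
	for i := 0; i < t.NumField(); i++ {
		tag := t.Field(i).Tag.Get("db")
		if tag == "" || tag == "-" {
			continue
		}
		cols = append(cols, tag)
	}
	return cols
}

func main() {
	u := User{ID: 1, Name: "Ada"}
	fmt.Println(columnsFromTags(u), u.ToRow()) // prints: [id name] [1 Ada]
}
```

The point of the sketch is that the slice returned by `ToRow()` has to line up, position by position, with the tagged fields; a mismatch would put values in the wrong columns.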

func ExecTransactionalTask

func ExecTransactionalTask(mgr dbx.InstanceManager, ctx context.Context, distLockId int64, task func(ctx context.Context, tx dbx.Transaction) error, args ...interface{}) error

ExecTransactionalTask performs a task inside a transaction.

This function facilitates the execution of a series of database operations within a transaction. It ensures that all operations either succeed or fail together, maintaining the integrity of the data. If any error occurs during the task, the transaction is rolled back. Otherwise, the transaction is committed.

Arguments:

  • mgr: The instance manager responsible for managing the database connection.
  • ctx: The context for the transaction execution, which can manage timeouts and cancellation.
  • distLockId: The ID used to retrieve a specific connection associated with a distributed lock. If 0, it gets a connection from the general pool.
  • task: A function that encapsulates the operations to be performed within the transaction. This function receives the context and the transaction object, allowing it to execute queries within the transaction.
  • args: Any additional arguments needed for the transaction execution.

Returns:

  • error: Any error encountered during transaction initiation, execution, or commit.

func NewEmptyBatch

func NewEmptyBatch() dbx.Batch

NewEmptyBatch creates a new, empty batch for queuing SQL statements.

This function returns an implementation of the dbx.Batch interface, which can be used to queue multiple SQL statements that will be executed in a single batch operation within a transaction.

Returns:

  • dbx.Batch: A new, empty batch ready to have SQL statements queued.

func QueryAndMap

func QueryAndMap[T any](mgr dbx.InstanceManager, ctx context.Context, distLockId int64, query string, args ...interface{}) ([]T, error)

QueryAndMap uses pgx's struct scanning to map rows directly to a slice of structs.

This function leverages pgx's built-in support for struct scanning to execute a query and automatically map each row in the result set to a struct of type T. The function eliminates the need for a custom scan function, making it easier to work with structured data.

Arguments:

  • mgr: The instance manager responsible for managing the database connection.
  • ctx: The context for the query execution, which can manage cancellation and deadlines.
  • distLockId: The ID used to retrieve a specific connection associated with a distributed lock. If 0, it gets a connection from the general pool.
  • query: The SQL query to be executed.
  • args: The variadic arguments for the SQL query, if any.

Returns:

  • []T: A slice of the struct type T, representing the mapped results from the query.
  • error: Any error encountered during query execution or row mapping.

func QueryAndScan

func QueryAndScan[T any](mgr dbx.InstanceManager, ctx context.Context, distLockId int64, scanFunc func(rows pgx.Rows) (T, error), query string, args ...interface{}) ([]T, error)

QueryAndScan executes a query and maps the result to structs using the provided scanFunc.

This function simplifies the process of executing a SQL query and converting each row in the result set to a specific struct type using a custom scan function. It handles the query execution, iteration over the rows, and error management.

Arguments:

  • mgr: The instance manager responsible for managing the database connection.
  • ctx: The context for the query execution, which can be used to control cancellation and deadlines.
  • distLockId: The ID used to retrieve a specific connection associated with a distributed lock. If 0, it gets a connection from the general pool.
  • scanFunc: A function that maps each row (pgx.Rows) to the desired struct type (T).
  • query: The SQL query to be executed.
  • args: The variadic arguments for the SQL query, if any.

Returns:

  • []T: A slice of the struct type T, representing the mapped results from the query.
  • error: Any error encountered during query execution or row scanning.
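As a rough illustration of the iterate, scan, and collect loop described above, the following sketch runs against an in-memory cursor instead of a database. `rowIter` is a hypothetical subset of the methods a rows cursor such as pgx.Rows provides, and `sliceRows`, `collect`, and `user` are names invented for this example; the library's internal loop may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// rowIter is a hypothetical subset of a rows cursor, used so the sketch
// runs without a database.
type rowIter interface {
	Next() bool
	Scan(dest ...any) error
	Err() error
	Close()
}

// sliceRows serves one string column from memory.
type sliceRows struct {
	data []string
	pos  int
}

func (r *sliceRows) Next() bool { r.pos++; return r.pos <= len(r.data) }

// Scan copies the current value into a single *string destination.
// It must only be called after Next has returned true.
func (r *sliceRows) Scan(dest ...any) error {
	if len(dest) != 1 {
		return errors.New("expected exactly one destination")
	}
	p, ok := dest[0].(*string)
	if !ok {
		return errors.New("destination must be *string")
	}
	*p = r.data[r.pos-1]
	return nil
}

func (r *sliceRows) Err() error { return nil }
func (r *sliceRows) Close()     {}

// collect applies scan to every row and stops at the first scan error.
func collect[T any](rows rowIter, scan func(rowIter) (T, error)) ([]T, error) {
	defer rows.Close()
	var out []T
	for rows.Next() {
		item, err := scan(rows)
		if err != nil {
			return nil, err
		}
		out = append(out, item)
	}
	return out, rows.Err()
}

type user struct{ Name string }

func main() {
	rows := &sliceRows{data: []string{"ada", "linus"}}
	users, err := collect(rows, func(r rowIter) (user, error) {
		var u user
		err := r.Scan(&u.Name)
		return u, err
	})
	fmt.Println(users, err)
}
```

Checking `rows.Err()` after the loop matters because an iteration that ends early on a driver error looks the same as one that simply ran out of rows.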

func QueryMapAndProcess

func QueryMapAndProcess[T any](mgr dbx.InstanceManager, ctx context.Context, distLockId int64, query string, processCallbackFunc func(item T) error, args ...interface{}) error

QueryMapAndProcess uses pgx's struct scanning to map rows directly to a struct and then process each struct.

This function combines pgx's struct scanning feature with a processing callback. It executes a query, maps each resulting row to a struct of type T using pgx's built-in capabilities, and then passes each struct to a callback function for further processing.

Arguments:

  • mgr: The instance manager responsible for managing the database connection.
  • ctx: The context for the query execution, which can manage cancellation and deadlines.
  • distLockId: The ID used to retrieve a specific connection associated with a distributed lock. If 0, it gets a connection from the general pool.
  • query: The SQL query to be executed.
  • processCallbackFunc: A callback function that processes each mapped struct (T).
  • args: The variadic arguments for the SQL query, if any.

Returns:

  • error: Any error encountered during query execution, row mapping, or processing.

func QueryScanAndProcess

func QueryScanAndProcess[T any](mgr dbx.InstanceManager, ctx context.Context, distLockId int64, query string, scanFunc func(rows pgx.Rows) (T, error), processCallbackFunc func(item T) error, args ...interface{}) error

QueryScanAndProcess executes a query and processes each row with a callback that receives a struct.

This function is designed to execute a SQL query and process each resulting row as a struct of type T. The rows are mapped to the struct using a custom scan function provided by the caller. Each struct is then passed to a callback function for processing, allowing for side effects or further actions to be performed on each individual result.

Arguments:

  • mgr: The instance manager responsible for managing the database connection.
  • ctx: The context for the query execution, allowing for cancellation and timeout management.
  • distLockId: The ID used to retrieve a specific connection associated with a distributed lock. If 0, it gets a connection from the general pool.
  • query: The SQL query to be executed.
  • scanFunc: A function that maps each row (pgx.Rows) to the desired struct type (T).
  • processCallbackFunc: A callback function that processes each mapped struct (T).
  • args: The variadic arguments for the SQL query, if any.

Returns:

  • error: Any error encountered during query execution, row scanning, or processing.

func SetupPostgresDbManager

func SetupPostgresDbManager(ctx context.Context, dbConf dbx.ConnConfig, preparesStatements ...dbx.PreparedStatement) dbx.InstanceManager

SetupPostgresDbManager - sets up the Postgres DB connection and returns a dbx.InstanceManager.

func TxExecBatch

func TxExecBatch(tx dbx.Transaction, ctx context.Context, batch dbx.Batch) (int64, error)

TxExecBatch processes the results of a batch of SQL statements within a single transaction and returns the total number of rows affected.

This function sends a batch of SQL statements for execution within the context of an existing transaction. After the batch is sent, it processes the results of each query in the batch sequentially. If any query in the batch fails, the function returns an error and stops further processing.

Arguments:

  • tx: The transaction within which the batch will be processed. This must implement the dbx.Transaction interface.
  • ctx: The context for managing the batch processing, allowing for cancellation and timeouts.
  • batch: The batch of SQL statements to be processed. This must implement the dbx.Batch interface.

Returns:

  • int64: The total number of rows affected by all the SQL statements in the batch.
  • error: An error if any query in the batch fails, otherwise nil.

Behavior:

  • The function first converts the custom dbx.Batch interface to the underlying pgx.Batch.
  • It then sends the batch for execution within the transaction by calling SendBatch on the underlying pgx transaction.
  • The function iterates over each query result in the batch, reading the outcome and accumulating the total number of rows affected.
  • If reading the result of any query fails, the function captures the error, stops further processing, and returns the error along with the number of rows affected up to that point.

Example Usage:

batch := pgxdb.NewEmptyBatch()
batch.Queue("INSERT INTO users (name, email) VALUES ($1, $2)", "John Doe", "john@example.com")
batch.Queue("UPDATE users SET last_login = now() WHERE id = $1", userID)

rowsAffected, err := pgxdb.TxExecBatch(tx, ctx, batch)
if err != nil {
    log.Fatalf("Failed to execute batch: %v", err)
}
log.Printf("Batch executed successfully, total rows affected: %d", rowsAffected)

func TxQueryAndMap

func TxQueryAndMap[T any](tx dbx.Transaction, ctx context.Context, query string, args ...interface{}) ([]T, error)

TxQueryAndMap uses pgx's struct scanning to map rows directly to a slice of structs within a transaction.

This function leverages pgx's built-in support for struct scanning to execute a query within a transaction and automatically map each row in the result set to a struct of type T. The function eliminates the need for a custom scan function, making it easier to work with structured data.

Arguments:

  • tx: The transaction object within which the query is executed.
  • ctx: The context for the query execution, which can manage cancellation and deadlines.
  • query: The SQL query to be executed.
  • args: The variadic arguments for the SQL query, if any.

Returns:

  • []T: A slice of the struct type T, representing the mapped results from the query.
  • error: Any error encountered during query execution or row mapping.

func TxQueryAndScan

func TxQueryAndScan[T any](tx dbx.Transaction, ctx context.Context, scanFunc func(rows pgx.Rows) (T, error), query string, args ...interface{}) ([]T, error)

TxQueryAndScan executes a query within a transaction and maps the result to structs using the provided scanFunc.

This function simplifies the process of executing a SQL query within a transaction and converting each row in the result set to a specific struct type using a custom scan function. It handles the query execution, iteration over the rows, and error management.

Arguments:

  • tx: The transaction object within which the query is executed.
  • ctx: The context for the query execution, which can be used to control cancellation and deadlines.
  • scanFunc: A function that maps each row (pgx.Rows) to the desired struct type (T).
  • query: The SQL query to be executed.
  • args: The variadic arguments for the SQL query, if any.

Returns:

  • []T: A slice of the struct type T, representing the mapped results from the query.
  • error: Any error encountered during query execution or row scanning.

func TxQueryMapAndProcess

func TxQueryMapAndProcess[T any](tx dbx.Transaction, ctx context.Context, query string, processCallbackFunc func(item T) error, args ...interface{}) error

TxQueryMapAndProcess uses pgx's struct scanning to map rows directly to a struct and then process each struct within a transaction.

This function combines pgx's struct scanning feature with a processing callback. It executes a query within a transaction, maps each resulting row to a struct of type T using pgx's built-in capabilities, and then passes each struct to a callback function for further processing.

Arguments:

  • tx: The transaction object within which the query is executed.
  • ctx: The context for the query execution, which can manage cancellation and deadlines.
  • query: The SQL query to be executed.
  • processCallbackFunc: A callback function that processes each mapped struct (T).
  • args: The variadic arguments for the SQL query, if any.

Returns:

  • error: Any error encountered during query execution, row mapping, or processing.

func TxQueryScanAndProcess

func TxQueryScanAndProcess[T any](tx dbx.Transaction, ctx context.Context, query string, scanFunc func(rows pgx.Rows) (T, error), processCallbackFunc func(item T) error, args ...interface{}) error

TxQueryScanAndProcess executes a query within a transaction and processes each row with a callback that receives a struct.

This function is designed to execute a SQL query within a transaction and process each resulting row as a struct of type T. The rows are mapped to the struct using a custom scan function provided by the caller. Each struct is then passed to a callback function for processing, allowing for side effects or further actions to be performed on each individual result.

Arguments:

  • tx: The transaction object within which the query is executed.
  • ctx: The context for the query execution, allowing for cancellation and timeout management.
  • query: The SQL query to be executed.
  • scanFunc: A function that maps each row (pgx.Rows) to the desired struct type (T).
  • processCallbackFunc: A callback function that processes each mapped struct (T).
  • args: The variadic arguments for the SQL query, if any.

Returns:

  • error: Any error encountered during query execution, row scanning, or processing.

Types

type PostgresDB

type PostgresDB struct {
	// contains filtered or unexported fields
}

PostgresDB - dbx manager. It implements dbx.InstanceManager.

func (*PostgresDB) CloseDbConnPool

func (dbm *PostgresDB) CloseDbConnPool()

CloseDbConnPool - close dbx connection pool.

func (*PostgresDB) Exec

func (dbm *PostgresDB) Exec(ctx context.Context, distLockId int64, execQuery string, args ...any) (int64, error)

Exec executes a SQL query that does not return rows, such as INSERT, UPDATE, or DELETE, and returns the number of rows affected.

This method is typically used for executing SQL commands where the result is not a set of rows but a count of rows affected. The method acquires a database connection from the connection pool, executes the provided query, and then releases the connection back to the pool.

Arguments:

  • ctx: The context for managing the query execution, which allows for cancellation and timeouts.
  • distLockId: An ID used to retrieve a specific connection associated with a distributed lock. This can be used for ensuring that specific operations are executed on a particular connection if necessary. In this implementation, this parameter is not used, but it's included to match the signature of other methods.
  • execQuery: The SQL query to be executed. This query should be a command that modifies data, such as an INSERT, UPDATE, DELETE, or similar operation.
  • args: The variadic arguments that will be passed to the SQL query. These arguments will be substituted into the query at the appropriate placeholders.

Returns:

  • int64: The number of rows affected by the query.
  • error: Any error encountered during the query execution or connection management.

Behavior:

  • The method first acquires a connection from the connection pool.
  • It then executes the provided SQL query with the given arguments.
  • If the execution is successful, it returns the number of rows affected by the query.
  • If there is an error during execution, the method logs the error and returns it wrapped in a custom error type.
  • The database connection is released back to the pool at the end of the method, regardless of success or failure.

Example Usage:

rowsAffected, err := dbm.Exec(ctx, 0, "UPDATE users SET last_login = now() WHERE id = $1", userID)
if err != nil {
    log.Fatalf("Failed to update last login time: %v", err)
}
log.Printf("%d rows were updated", rowsAffected)

func (*PostgresDB) GetConnFromPool

func (dbm *PostgresDB) GetConnFromPool(ctx context.Context) (any, error)

GetConnFromPool - get a connection from the pool.

func (*PostgresDB) GetConnectionConfig

func (dbm *PostgresDB) GetConnectionConfig() dbx.ConnConfig

GetConnectionConfig - get the DB connection config.

func (*PostgresDB) GetDbConnPool

func (dbm *PostgresDB) GetDbConnPool() (any, error)

GetDbConnPool - get the connection pool.

func (*PostgresDB) Query

func (dbm *PostgresDB) Query(ctx context.Context, distLockId int64, query string, args ...interface{}) (conn any, rows any, err error)

Query executes a SQL query and returns both the resulting rows and the database connection.

This method is designed to execute a query and return the resulting rows along with the database connection used to execute the query. This allows for explicit control over the connection lifecycle, ensuring that the connection can be properly released back to the pool after the rows have been processed.

Arguments:

  • ctx: The context for the query execution, allowing for cancellation and deadline management.
  • distLockId: The ID used to retrieve a specific connection associated with a distributed lock. If 0, it gets a connection from the general pool.
  • query: The SQL query to be executed.
  • args: The variadic arguments for the SQL query, if any.

Returns:

  • conn: The database connection used for the query, returned as `any`. This must be type-asserted to the appropriate connection type (e.g., `*pgxpool.Conn`) and explicitly released after use.
  • rows: The result set from the query, returned as `any`. This must be type-asserted to `pgx.Rows` and closed after processing to release associated resources.
  • err: Any error encountered during connection acquisition, query execution, or row retrieval.

Usage:

After calling this method, you should ensure that both `rows` and `conn` are properly closed and released
to avoid connection leaks. Typically, you would use `defer` to ensure that resources are released:

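conn, rows, err := dbm.Query(ctx, 0, "SELECT * FROM my_table WHERE id = $1", 123)
if err != nil {
    // Handle error; rows and conn must not be used on this path
    return err
}
// Deferred calls run last-in, first-out, so rows are closed before the connection is released
defer conn.(*pgxpool.Conn).Release()
defer rows.(pgx.Rows).Close()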

func (*PostgresDB) TxBegin

func (dbm *PostgresDB) TxBegin(ctx context.Context) (pgxTx dbx.Transaction, err error)

TxBegin starts a new database transaction and returns a Transaction interface that can be used to commit or roll back the transaction.

This method is used to initiate a new transaction within the database. It acquires a connection from the connection pool, begins a transaction on that connection, and returns a `PostgresTx` struct that implements the `dbx.Transaction` interface. The transaction can then be used to execute multiple SQL statements as part of a single atomic operation.

Arguments:

  • ctx: The context for managing the transaction initiation, which allows for cancellation and timeouts.

Returns:

  • pgxTx: A `dbx.Transaction` interface that represents the active transaction. This can be used to execute queries within the transaction and to commit or roll back the transaction.
  • err: Any error encountered during the transaction initiation.

Behavior:

  • The method first acquires a connection from the connection pool.
  • It then begins a transaction on the acquired connection.
  • If the transaction is successfully started, a `PostgresTx` struct is returned, which includes the transaction object, the connection, and a randomly generated transaction ID.
  • If there is an error during the connection acquisition or transaction initiation, the error is wrapped and returned.

Example Usage:

tx, err := dbm.TxBegin(ctx)
if err != nil {
    log.Fatalf("Failed to start transaction: %v", err)
}
defer tx.TxRollback(ctx)

// Execute queries within the transaction
rowsAffected, err := tx.TxExec(ctx, "UPDATE users SET last_login = now() WHERE id = $1", userID)
if err != nil {
    log.Fatalf("Failed to update last login time: %v", err)
}
log.Printf("%d rows were updated", rowsAffected)

// Commit the transaction
if err := tx.TxCommit(ctx); err != nil {
    log.Fatalf("Failed to commit transaction: %v", err)
}

type PostgresTx

type PostgresTx struct {
	// contains filtered or unexported fields
}

PostgresTx - Postgres Transaction manager. Implements dbx.Transaction, providing methods to manage a PostgreSQL transaction.

func (*PostgresTx) GetTx

func (tx *PostgresTx) GetTx() any

GetTx - Returns the underlying pgx transaction.

This method exposes the underlying pgx.Tx object, allowing for direct interaction with the transaction if needed. It is primarily used internally to execute queries or commands within the transaction's context.

Returns:

  • any: The underlying pgx.Tx object.

func (*PostgresTx) TxCommit

func (tx *PostgresTx) TxCommit(ctx context.Context) error

TxCommit - Commits a transaction and releases the connection to the pool.

This method finalizes the transaction by committing all changes made during the transaction. After the commit, the database connection is released back to the connection pool for reuse.

Arguments:

  • ctx: The context for managing the transaction commit, which allows for cancellation and timeouts.

Returns:

  • error: Any error encountered during the commit process. If the commit fails, the error is wrapped in a custom error type.

Example Usage:

err := tx.TxCommit(ctx)
if err != nil {
    log.Fatalf("Failed to commit transaction: %v", err)
}

func (*PostgresTx) TxExec

func (tx *PostgresTx) TxExec(ctx context.Context, execQuery string, args ...any) (int64, error)

TxExec - Executes a command query under a transaction and returns the number of rows affected.

This method is used to execute SQL commands that modify the database, such as INSERT, UPDATE, or DELETE, within the context of the active transaction. It returns the number of rows affected by the query.

Arguments:

  • ctx: The context for managing the query execution, which allows for cancellation and timeouts.
  • execQuery: The SQL query to be executed.
  • args: The arguments for the SQL query.

Returns:

  • int64: The number of rows affected by the query.
  • error: Any error encountered during the query execution. If an error occurs, it is wrapped in a custom error type.

Example Usage:

rowsAffected, err := tx.TxExec(ctx, "UPDATE users SET last_login = now() WHERE id = $1", userID)
if err != nil {
    log.Fatalf("Failed to execute update: %v", err)
}
log.Printf("%d rows were updated", rowsAffected)

func (*PostgresTx) TxQuery

func (tx *PostgresTx) TxQuery(ctx context.Context, query string, args ...interface{}) (any, error)

TxQuery executes a query within the transaction and returns pgx.Rows.

This method is used to execute a SQL query that returns rows, such as a SELECT statement, within the context of the active transaction. It returns a pgx.Rows object that can be used to iterate over the results.

Arguments:

  • ctx: The context for managing the query execution, which allows for cancellation and timeouts.
  • query: The SQL query to be executed.
  • args: The arguments for the SQL query.

Returns:

  • any: The resulting rows from the query, typically of type pgx.Rows.
  • error: Any error encountered during the query execution.

Example Usage:

res, err := tx.TxQuery(ctx, "SELECT * FROM users WHERE id = $1", userID)
if err != nil {
    log.Fatalf("Failed to execute query: %v", err)
}
rows := res.(pgx.Rows) // TxQuery returns any; assert to pgx.Rows before use
defer rows.Close()

func (*PostgresTx) TxRollback

func (tx *PostgresTx) TxRollback(ctx context.Context)

TxRollback - Rolls back a transaction and releases the connection to the pool.

This method aborts the transaction, discarding all changes made during the transaction. After the rollback, the database connection is released back to the connection pool.

Arguments:

  • ctx: The context for managing the transaction rollback, which allows for cancellation and timeouts.

Example Usage:

tx.TxRollback(ctx) // No error returned, typically used in a deferred call
