Documentation
¶
Index ¶
- Constants
- func BucketCount(buckets []*BucketInfo) int
- func BucketToContext(ctx context.Context, bucketID BucketID) context.Context
- func GroupByShard[KEYT any, OBJT any](db *DB[KEYT], objs []OBJT, f func(v OBJT) BucketID) (map[shard.ShardID][]OBJT, error)
- func GroupByShardFunc[KEYT any, OBJT any](ctx context.Context, db *DB[KEYT], objs []OBJT, fGroup func(v OBJT) BucketID, ...) error
- func PrepareBucketSQL(sql string, bucketID BucketID) string
- func ShardBatchQueryAll[TKEY any, TRES any](ctx context.Context, batch *ShardBatch[TKEY], dst *[]TRES) error
- func ShardBatchQueryAllFunc[TKEY any, TRES any](ctx context.Context, batch *ShardBatch[TKEY], ...) error
- func UniformBucketFn(n int) func(shardKey string) BucketID
- type BucketBatch
- type BucketID
- type BucketInfo
- type BucketRange
- type DB
- func (b *DB[T]) Connection(ctx context.Context, shardKey T, opt ...conn.ConnectionOption) conn.IConnection
- func (b *DB[T]) CopyFrom(ctx context.Context, shardKey T, tableName pgx.Identifier, ...) (n int64, err error)
- func (b *DB[T]) Exec(ctx context.Context, shardKey T, sql string, arguments ...any) (pgconn.CommandTag, error)
- func (b *DB[T]) GetBucketByKey(key T) (shard.ShardID, BucketID, error)
- func (b *DB[T]) GetFunc() ShardKeyToBucketIDFunc[T]
- func (b *DB[T]) GetShardID(bucketID BucketID) (shard.ShardID, error)
- func (b *DB[T]) Info() bootstrap.Info
- func (b *DB[T]) InitCluster(ctx context.Context, sql string) (err error)
- func (b *DB[T]) LargeObjects(ctx context.Context, shardKey T) pgx.LargeObjects
- func (b *DB[T]) NewBatch(bucketID BucketID) *BucketBatch
- func (b *DB[T]) Query(ctx context.Context, shardKey T, sql string, args ...any) (pgx.Rows, error)
- func (b *DB[T]) QueryRow(ctx context.Context, shardKey T, sql string, args ...any) pgx.Row
- func (b *DB[T]) RunBucketFunc(ctx context.Context, ...) error
- func (b *DB[T]) RunShardFunc(ctx context.Context, ...) error
- func (b *DB[T]) SendBatch(ctx context.Context, shardKey T, batch *pgx.Batch) pgx.BatchResults
- func (b *DB[T]) ShardConnection(ctx context.Context, shardID shard.ShardID, opt ...conn.ConnectionOption) conn.IConnection
- func (b *DB[T]) Start(ctx context.Context) (err error)
- func (b *DB[T]) Stop(ctx context.Context) error
- type Option
- type ShardBatch
- func (b *ShardBatch[TKEY]) Close() error
- func (b *ShardBatch[TKEY]) Exec() (pgconn.CommandTag, error)
- func (b *ShardBatch[TKEY]) ExecAll(ctx context.Context) error
- func (b *ShardBatch[TKEY]) Len() (n int)
- func (b *ShardBatch[TKEY]) Query() (pgx.Rows, error)
- func (b *ShardBatch[TKEY]) QueryRow() pgx.Row
- func (b *ShardBatch[TKEY]) Queue(key TKEY, sql string, args ...any) error
- func (b *ShardBatch[TKEY]) Send(ctx context.Context) error
- type ShardKeyToBucketIDFunc
Constants ¶
const ( // BucketAlias bucket alias in SQL queries. BucketAlias = "__bucket__" // BucketPrefix prefix for bucket table names. BucketPrefix = "bucket_" )
Variables ¶
This section is empty.
Functions ¶
func BucketToContext ¶
BucketToContext puts BucketID into context.
func GroupByShard ¶
func GroupByShard[KEYT any, OBJT any]( db *DB[KEYT], objs []OBJT, f func(v OBJT) BucketID, ) (map[shard.ShardID][]OBJT, error)
GroupByShard groups objects by cluster shard, using function f to obtain the bucket ID for each object.
func GroupByShardFunc ¶
func GroupByShardFunc[KEYT any, OBJT any]( ctx context.Context, db *DB[KEYT], objs []OBJT, fGroup func(v OBJT) BucketID, fCall func(ctx context.Context, shardID shard.ShardID, objs []OBJT) error, ) error
GroupByShardFunc groups objects by cluster shard, using function fGroup to obtain the bucket ID for each object. Function fCall is then called once for each group.
func PrepareBucketSQL ¶
PrepareBucketSQL replaces bucket aliases in the query with their table names.
func ShardBatchQueryAll ¶
func ShardBatchQueryAll[TKEY any, TRES any](ctx context.Context, batch *ShardBatch[TKEY], dst *[]TRES) error
ShardBatchQueryAll executes queries, reads all rows from all query results and passes them to dst, then closes the batch.
func ShardBatchQueryAllFunc ¶
func ShardBatchQueryAllFunc[TKEY any, TRES any](ctx context.Context, batch *ShardBatch[TKEY], f func(ctx context.Context, rows []TRES) error, ) error
ShardBatchQueryAllFunc executes queries, reads all rows from all query results and passes them to f, then closes the batch.
func UniformBucketFn ¶
UniformBucketFn returns a function that uniformly distributes keys across n buckets. The returned function takes a string as the sharding key.
Types ¶
type BucketBatch ¶
type BucketBatch struct {
// contains filtered or unexported fields
}
BucketBatch is an analog of pgx.Batch, but with bucket support. To work with buckets correctly, create a BucketBatch instance using the NewBucketBatch function rather than using pgx.Batch directly. Then add queries to the BucketBatch using BucketBatch.Queue, after which you can pass BucketBatch.PgxBatch() to SendBatch.
func NewBucketBatch ¶
func NewBucketBatch(bucketID BucketID) *BucketBatch
NewBucketBatch creates a new BucketBatch for the given bucketID.
func (*BucketBatch) BucketID ¶
func (b *BucketBatch) BucketID() BucketID
BucketID returns the bucket identifier.
func (*BucketBatch) Len ¶
func (b *BucketBatch) Len() int
Len returns the number of queries in Batch.
func (*BucketBatch) PgxBatch ¶
func (b *BucketBatch) PgxBatch() *pgx.Batch
PgxBatch returns the original pgx.Batch.
func (*BucketBatch) Queue ¶
func (b *BucketBatch) Queue(query string, args ...any)
Queue adds a query to Batch, applying transformation of bucket aliases to table names.
func (*BucketBatch) QueueBucket ¶
func (b *BucketBatch) QueueBucket(bucketID BucketID, query string, args ...any)
QueueBucket adds a query to Batch. If bucketID doesn't match the current bucket, the operation is not performed.
type BucketID ¶
type BucketID uint
BucketID is a bucket identifier.
func BucketFromContext ¶
BucketFromContext extracts BucketID from context.
type BucketInfo ¶
type BucketInfo struct { ShardID shard.ShardID BucketRange *BucketRange }
BucketInfo holds information about a bucket.
type BucketRange ¶
BucketRange is a range of buckets.
func NewBucketRange ¶
func NewBucketRange(fromID, toID BucketID) *BucketRange
NewBucketRange creates a range of buckets.
func (BucketRange) Contains ¶
func (b BucketRange) Contains(bucketID BucketID) bool
Contains checks if the bucket id is within the range.
type DB ¶
type DB[T any] struct { // contains filtered or unexported fields }
DB is a wrapper around shard.DB that manages the distribution of data between buckets; the buckets, in turn, distribute data between shards. To work with a sharded database, use bucket.DB, not shard.DB.
func New ¶
func New[T any](shardDB *shard.DB, buckets []*BucketInfo, shardKeyToBucketIDFunc ShardKeyToBucketIDFunc[T], opts ...Option[T], ) *DB[T]
New creates a sharded DB with bucket support.
func NewBucketCluster ¶
func NewBucketCluster(shardInfo []*shard.ShardInfo, bucketInfo []*BucketInfo, bucketOpts ...Option[string], ) *DB[string]
NewBucketCluster creates connections with shards immediately and wraps everything in bucket.DB. Helper to simplify bucket.DB creation.
func NewBucketClusterFromDSN ¶
func NewBucketClusterFromDSN(dsn []shard.DSNInfo, bucketInfo []*BucketInfo, shardOpts []shard.Option, bucketOpts []Option[string], ) *DB[string]
NewBucketClusterFromDSN creates connections with shards immediately and wraps everything in bucket.DB. Connections to shards are created using DSN. Helper to simplify bucket.DB creation.
func (*DB[T]) Connection ¶
func (b *DB[T]) Connection(ctx context.Context, shardKey T, opt ...conn.ConnectionOption) conn.IConnection
Connection returns IConnection interface implementation for the specified sharding key.
func (*DB[T]) CopyFrom ¶
func (b *DB[T]) CopyFrom(ctx context.Context, shardKey T, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource, ) (n int64, err error)
CopyFrom implements bulk data insertion into a table.
func (*DB[T]) Exec ¶
func (b *DB[T]) Exec(ctx context.Context, shardKey T, sql string, arguments ...any) (pgconn.CommandTag, error)
Exec executes a query without returning data.
func (*DB[T]) GetBucketByKey ¶
GetBucketByKey returns ShardID, BucketID for the specified key.
func (*DB[T]) GetFunc ¶
func (b *DB[T]) GetFunc() ShardKeyToBucketIDFunc[T]
GetFunc returns a function to get bucket id by sharding key.
func (*DB[T]) GetShardID ¶
GetShardID returns shardID for the specified bucketID.
func (*DB[T]) InitCluster ¶
InitCluster initializes a PostgreSQL cluster for working with buckets. It creates the necessary schemas (buckets) and applies the given SQL query. bucket.DB must have been successfully started using Start. sql contains SQL in which the table name is specified as __bucket__.tableName; the alias will be replaced with bucket_<ID>.tableName. For example: CREATE TABLE IF NOT EXISTS __bucket__.tableName (id SERIAL PRIMARY KEY, name TEXT NOT NULL);.
func (*DB[T]) LargeObjects ¶
LargeObjects supports working with large objects and is only available within a transaction (this is a PostgreSQL limitation). Outside of a transaction, it will panic.
func (*DB[T]) NewBatch ¶
func (b *DB[T]) NewBatch(bucketID BucketID) *BucketBatch
NewBatch creates a new BucketBatch for the specified bucketID.
func (*DB[T]) QueryRow ¶
QueryRow gets a connection and executes a query that should return no more than one row. Errors are deferred until pgx.Row.Scan method is called. If the query doesn't select a row, pgx.Row.Scan will return pgx.ErrNoRows. Otherwise, pgx.Row.Scan scans the first selected row and discards the rest.
func (*DB[T]) RunBucketFunc ¶
func (b *DB[T]) RunBucketFunc(ctx context.Context, f func(ctx context.Context, shardID shard.ShardID, bucketID BucketID, con conn.IConnection) error, ) error
RunBucketFunc executes a function for all buckets in the cluster. The order of buckets is not defined.
func (*DB[T]) RunShardFunc ¶
func (b *DB[T]) RunShardFunc(ctx context.Context, f func(ctx context.Context, shardID shard.ShardID, con conn.IConnection) error, ) error
RunShardFunc executes a function for all shards in the cluster. The order of shards is not defined.
func (*DB[T]) SendBatch ¶
SendBatch sends a set of queries for execution, combining all queries into a single batch.
func (*DB[T]) ShardConnection ¶
func (b *DB[T]) ShardConnection(ctx context.Context, shardID shard.ShardID, opt ...conn.ConnectionOption, ) conn.IConnection
ShardConnection returns IConnection interface implementation for the specified shardID.
type Option ¶
Option is a configuration option for bucket.DB.
func WithAfterStartFunc ¶
WithAfterStartFunc sets a function that will be called after successful service startup.
func WithLogger ¶
WithLogger sets the logger.
func WithRunLimit ¶
WithRunLimit sets the limit of parallel executions in RunBucketFunc method.
type ShardBatch ¶
type ShardBatch[TKEY any] struct { // contains filtered or unexported fields }
ShardBatch is an analog of pgx.Batch that distributes queries across shards.
func NewShardBatch ¶
func NewShardBatch[TKEY any](db *DB[TKEY]) *ShardBatch[TKEY]
NewShardBatch creates a new ShardBatch.
func (*ShardBatch[TKEY]) Exec ¶
func (b *ShardBatch[TKEY]) Exec() (pgconn.CommandTag, error)
Exec executes batch sequentially for each shard. The order of shard selection is not defined.
func (*ShardBatch[TKEY]) ExecAll ¶
func (b *ShardBatch[TKEY]) ExecAll(ctx context.Context) error
ExecAll executes all queries and then closes the batch.
func (*ShardBatch[TKEY]) Len ¶
func (b *ShardBatch[TKEY]) Len() (n int)
Len returns the number of queries in ShardBatch.
func (*ShardBatch[TKEY]) Query ¶
func (b *ShardBatch[TKEY]) Query() (pgx.Rows, error)
Query executes batch sequentially for each shard. The order of shard selection is not defined.
func (*ShardBatch[TKEY]) QueryRow ¶
func (b *ShardBatch[TKEY]) QueryRow() pgx.Row
QueryRow executes batch sequentially for each shard. The order of shard selection is not defined.
type ShardKeyToBucketIDFunc ¶
ShardKeyToBucketIDFunc is a function that returns the bucket ID for a sharding key.