bucket

package
v2.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2025 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// BucketAlias bucket alias in SQL queries.
	BucketAlias = "__bucket__"
	// BucketPrefix prefix for bucket table names.
	BucketPrefix = "bucket_"
)

Variables

This section is empty.

Functions

func BucketCount

func BucketCount(buckets []*BucketInfo) int

BucketCount returns the number of buckets.

func BucketToContext

func BucketToContext(ctx context.Context, bucketID BucketID) context.Context

BucketToContext puts BucketID into context.

func GroupByShard

func GroupByShard[KEYT any, OBJT any](
	db *DB[KEYT],
	objs []OBJT,
	f func(v OBJT) BucketID,
) (map[shard.ShardID][]OBJT, error)

GroupByShard groups objects by cluster shard, using a function that returns the BucketID for each object.

func GroupByShardFunc

func GroupByShardFunc[KEYT any, OBJT any](
	ctx context.Context,
	db *DB[KEYT],
	objs []OBJT,
	fGroup func(v OBJT) BucketID,
	fCall func(ctx context.Context, shardID shard.ShardID, objs []OBJT) error,
) error

GroupByShardFunc groups objects by cluster shard, using the function fGroup, which returns the BucketID for each object. The function fCall is then called for each group.

func PrepareBucketSQL

func PrepareBucketSQL(sql string, bucketID BucketID) string

PrepareBucketSQL replaces bucket aliases in the query with their table names.

func ShardBatchQueryAll

func ShardBatchQueryAll[TKEY any, TRES any](ctx context.Context, batch *ShardBatch[TKEY], dst *[]TRES) error

ShardBatchQueryAll executes queries, reads all rows from all query results and passes them to dst, then closes the batch.

func ShardBatchQueryAllFunc

func ShardBatchQueryAllFunc[TKEY any, TRES any](ctx context.Context, batch *ShardBatch[TKEY],
	f func(ctx context.Context, rows []TRES) error,
) error

ShardBatchQueryAllFunc executes queries, reads all rows from all query results and passes them to f, then closes the batch.

func UniformBucketFn

func UniformBucketFn(n int) func(shardKey string) BucketID

UniformBucketFn returns a function that uniformly distributes keys across n buckets. The returned function takes a string as the key.

Types

type BucketBatch

type BucketBatch struct {
	// contains filtered or unexported fields
}

BucketBatch is an analog of pgx.Batch, but with bucket support. To work correctly with buckets, create a BucketBatch instance using the NewBucketBatch function rather than using pgx.Batch directly. Then add queries to the BucketBatch using BucketBatch.Queue, after which you can pass BucketBatch.PgxBatch() to SendBatch.

func NewBucketBatch

func NewBucketBatch(bucketID BucketID) *BucketBatch

NewBucketBatch creates a new BucketBatch for the given bucketID.

func (*BucketBatch) BucketID

func (b *BucketBatch) BucketID() BucketID

BucketID returns the bucket identifier.

func (*BucketBatch) Len

func (b *BucketBatch) Len() int

Len returns the number of queries in Batch.

func (*BucketBatch) PgxBatch

func (b *BucketBatch) PgxBatch() *pgx.Batch

PgxBatch returns the original pgx.Batch.

func (*BucketBatch) Queue

func (b *BucketBatch) Queue(query string, args ...any)

Queue adds a query to Batch, applying transformation of bucket aliases to table names.

func (*BucketBatch) QueueBucket

func (b *BucketBatch) QueueBucket(bucketID BucketID, query string, args ...any)

QueueBucket adds a query to Batch. If bucketID doesn't match the current bucket, the operation is not performed.

type BucketID

type BucketID uint

BucketID is a bucket identifier.

func BucketFromContext

func BucketFromContext(ctx context.Context) (BucketID, bool)

BucketFromContext extracts BucketID from context.

func (BucketID) Schema

func (b BucketID) Schema() string

Schema returns the database schema name for the bucket.

func (BucketID) String

func (b BucketID) String() string

String converts BucketID to string.

type BucketInfo

type BucketInfo struct {
	ShardID     shard.ShardID
	BucketRange *BucketRange
}

BucketInfo holds information about buckets: a shard ID and a range of bucket IDs.

type BucketRange

type BucketRange struct {
	FromID BucketID
	ToID   BucketID
}

BucketRange is a range of buckets.

func NewBucketRange

func NewBucketRange(fromID, toID BucketID) *BucketRange

NewBucketRange creates a range of buckets.

func (BucketRange) Contains

func (b BucketRange) Contains(bucketID BucketID) bool

Contains checks if the bucket id is within the range.

func (BucketRange) Count

func (b BucketRange) Count() int

Count returns the number of buckets in the range.

type DB

type DB[T any] struct {
	// contains filtered or unexported fields
}

DB is a wrapper around shard.DB that manages the distribution of data between buckets; the buckets, in turn, are distributed between shards. To work with a sharded database, use bucket.DB rather than shard.DB.

func New

func New[T any](shardDB *shard.DB, buckets []*BucketInfo,
	shardKeyToBucketIDFunc ShardKeyToBucketIDFunc[T], opts ...Option[T],
) *DB[T]

New creates a sharded DB with bucket support.

func NewBucketCluster

func NewBucketCluster(shardInfo []*shard.ShardInfo, bucketInfo []*BucketInfo,
	bucketOpts ...Option[string],
) *DB[string]

NewBucketCluster creates connections with shards immediately and wraps everything in bucket.DB. Helper to simplify bucket.DB creation.

func NewBucketClusterFromDSN

func NewBucketClusterFromDSN(dsn []shard.DSNInfo, bucketInfo []*BucketInfo,
	shardOpts []shard.Option,
	bucketOpts []Option[string],
) *DB[string]

NewBucketClusterFromDSN creates connections with shards immediately and wraps everything in bucket.DB. Connections to shards are created using DSN. Helper to simplify bucket.DB creation.

func (*DB[T]) Connection

func (b *DB[T]) Connection(ctx context.Context, shardKey T, opt ...conn.ConnectionOption) conn.IConnection

Connection returns IConnection interface implementation for the specified sharding key.

func (*DB[T]) CopyFrom

func (b *DB[T]) CopyFrom(ctx context.Context, shardKey T, tableName pgx.Identifier,
	columnNames []string, rowSrc pgx.CopyFromSource,
) (n int64, err error)

CopyFrom implements bulk data insertion into a table.

func (*DB[T]) Exec

func (b *DB[T]) Exec(ctx context.Context, shardKey T, sql string, arguments ...any) (pgconn.CommandTag, error)

Exec executes a query without returning data.

func (*DB[T]) GetBucketByKey

func (b *DB[T]) GetBucketByKey(key T) (shard.ShardID, BucketID, error)

GetBucketByKey returns ShardID, BucketID for the specified key.

func (*DB[T]) GetFunc

func (b *DB[T]) GetFunc() ShardKeyToBucketIDFunc[T]

GetFunc returns a function to get bucket id by sharding key.

func (*DB[T]) GetShardID

func (b *DB[T]) GetShardID(bucketID BucketID) (shard.ShardID, error)

GetShardID returns shardID for the specified bucketID.

func (*DB[T]) Info

func (b *DB[T]) Info() bootstrap.Info

Info returns service information.

func (*DB[T]) InitCluster

func (b *DB[T]) InitCluster(ctx context.Context, sql string) (err error)

InitCluster initializes a PostgreSQL cluster for working with buckets. It creates the necessary schemas (buckets) and applies the given SQL query. bucket.DB must already have been successfully started using Start. sql contains SQL in which table names are specified as __bucket__.tableName; the alias is replaced with bucket_<ID>.tableName. For example: CREATE TABLE IF NOT EXISTS __bucket__.tableName (id SERIAL PRIMARY KEY, name TEXT NOT NULL);

func (*DB[T]) LargeObjects

func (b *DB[T]) LargeObjects(ctx context.Context, shardKey T) pgx.LargeObjects

LargeObjects supports working with large objects and is only available within a transaction (this is a PostgreSQL limitation). Outside of a transaction, it will panic.

func (*DB[T]) NewBatch

func (b *DB[T]) NewBatch(bucketID BucketID) *BucketBatch

NewBatch creates a new BucketBatch for the specified bucketID.

func (*DB[T]) Query

func (b *DB[T]) Query(ctx context.Context, shardKey T, sql string, args ...any) (pgx.Rows, error)

Query executes a query and returns the result.

func (*DB[T]) QueryRow

func (b *DB[T]) QueryRow(ctx context.Context, shardKey T, sql string, args ...any) pgx.Row

QueryRow gets a connection and executes a query that should return no more than one row. Errors are deferred until pgx.Row.Scan method is called. If the query doesn't select a row, pgx.Row.Scan will return pgx.ErrNoRows. Otherwise, pgx.Row.Scan scans the first selected row and discards the rest.

func (*DB[T]) RunBucketFunc

func (b *DB[T]) RunBucketFunc(ctx context.Context,
	f func(ctx context.Context, shardID shard.ShardID, bucketID BucketID, con conn.IConnection) error,
) error

RunBucketFunc executes a function for all buckets in the cluster. The order of buckets is not defined.

func (*DB[T]) RunShardFunc

func (b *DB[T]) RunShardFunc(ctx context.Context, f func(ctx context.Context,
	shardID shard.ShardID, con conn.IConnection) error,
) error

RunShardFunc executes a function for all shards in the cluster. The order of shards is not defined.

func (*DB[T]) SendBatch

func (b *DB[T]) SendBatch(ctx context.Context, shardKey T, batch *pgx.Batch) pgx.BatchResults

SendBatch sends a set of queries for execution, combining all queries into one batch.

func (*DB[T]) ShardConnection

func (b *DB[T]) ShardConnection(ctx context.Context, shardID shard.ShardID,
	opt ...conn.ConnectionOption,
) conn.IConnection

ShardConnection returns IConnection interface implementation for the specified shardID.

func (*DB[T]) Start

func (b *DB[T]) Start(ctx context.Context) (err error)

Start launches the service.

func (*DB[T]) Stop

func (b *DB[T]) Stop(ctx context.Context) error

Stop stops the service.

type Option

type Option[T any] func(*DB[T])

Option is a configuration option for bucket.DB.

func WithAfterStartFunc

func WithAfterStartFunc[T any](f func(context.Context, *DB[T]) error) Option[T]

WithAfterStartFunc sets a function that will be called after successful service startup.

func WithLogger

func WithLogger[T any](logger ctxlog.ILogger) Option[T]

WithLogger sets the logger.

func WithName

func WithName[T any](name string) Option[T]

WithName sets the name of the sharded DB.

func WithRunLimit

func WithRunLimit[T any](limit int) Option[T]

WithRunLimit sets the limit of parallel executions in RunBucketFunc method.

type ShardBatch

type ShardBatch[TKEY any] struct {
	// contains filtered or unexported fields
}

ShardBatch is an analog of pgx.Batch, with distribution of queries across shards.

func NewShardBatch

func NewShardBatch[TKEY any](db *DB[TKEY]) *ShardBatch[TKEY]

NewShardBatch creates a new ShardBatch.

func (*ShardBatch[TKEY]) Close

func (b *ShardBatch[TKEY]) Close() error

Close closes the Batch.

func (*ShardBatch[TKEY]) Exec

func (b *ShardBatch[TKEY]) Exec() (pgconn.CommandTag, error)

Exec executes batch sequentially for each shard. The order of shard selection is not defined.

func (*ShardBatch[TKEY]) ExecAll

func (b *ShardBatch[TKEY]) ExecAll(ctx context.Context) error

ExecAll executes all queries and then closes the batch.

func (*ShardBatch[TKEY]) Len

func (b *ShardBatch[TKEY]) Len() (n int)

Len returns the number of queries in ShardBatch.

func (*ShardBatch[TKEY]) Query

func (b *ShardBatch[TKEY]) Query() (pgx.Rows, error)

Query executes batch sequentially for each shard. The order of shard selection is not defined.

func (*ShardBatch[TKEY]) QueryRow

func (b *ShardBatch[TKEY]) QueryRow() pgx.Row

QueryRow executes batch sequentially for each shard. The order of shard selection is not defined.

func (*ShardBatch[TKEY]) Queue

func (b *ShardBatch[TKEY]) Queue(key TKEY, sql string, args ...any) error

Queue adds a query to ShardBatch.

func (*ShardBatch[TKEY]) Send

func (b *ShardBatch[TKEY]) Send(ctx context.Context) error

Send sends the batch for execution for each shard.

type ShardKeyToBucketIDFunc

type ShardKeyToBucketIDFunc[T any] func(shardKey T) BucketID

ShardKeyToBucketIDFunc is a function that returns the bucket ID for a sharding key.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL