sharding

package module
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 25, 2023 License: MIT Imports: 12 Imported by: 14

README

Gorm Sharding

Go

Gorm Sharding is a plugin that uses an SQL parser and table-name replacement to split large tables into smaller ones and redirect queries to the sharded tables, giving you high-performance database access.

Gorm Sharding 是一个高性能的数据库分表中间件。

它基于 Conn 层做 SQL 拦截、AST 解析、分表路由、自增主键填充,带来的额外开销极小。对开发者友好、透明,使用上与普通 SQL、Gorm 查询无差别,只需要额外注意一下分表键条件。

Features

  • Non-intrusive design. Load the plugin, specify the config, and all done.
  • Lightning-fast. No network-based middleware; as fast as Go.
  • Multiple database (PostgreSQL, MySQL) support.
  • Integrated primary key generator (Snowflake, PostgreSQL Sequence, Custom, ...).

Install

go get -u gorm.io/sharding

Usage

Configure the sharding middleware and register the tables that you want to shard.

import (
  "fmt"

  "gorm.io/driver/postgres"
  "gorm.io/gorm"
  "gorm.io/sharding"
)

db, err := gorm.Open(postgres.New(postgres.Config{DSN: "postgres://localhost:5432/sharding-db?sslmode=disable"}))

db.Use(sharding.Register(sharding.Config{
    ShardingKey:         "user_id",
    NumberOfShards:      64,
    PrimaryKeyGenerator: sharding.PKSnowflake,
}, "orders", Notification{}, AuditLog{}))
// This example also registers the notifications and audit_logs tables so that they use the same sharding rule.

Use the db session as usual. Just note that a query must include the sharding key when it operates on sharded tables.

// Gorm create example, this will insert to orders_02
db.Create(&Order{UserID: 2})
// sql: INSERT INTO orders_02 ...

// Show how to use Raw SQL to insert; this will insert into orders_03
db.Exec("INSERT INTO orders(user_id) VALUES(?)", int64(3))

// This will return an ErrMissingShardingKey error, because no sharding key is present.
err = db.Create(&Order{Amount: 10, ProductID: 100}).Error
fmt.Println(err)

// Find, this will redirect query to orders_02
var orders []Order
db.Model(&Order{}).Where("user_id", int64(2)).Find(&orders)
fmt.Printf("%#v\n", orders)

// Raw SQL also supported
db.Raw("SELECT * FROM orders WHERE user_id = ?", int64(3)).Scan(&orders)
fmt.Printf("%#v\n", orders)

// This will return an ErrMissingShardingKey error, because the WHERE conditions do not include the sharding key
err = db.Model(&Order{}).Where("product_id", "1").Find(&orders).Error
fmt.Println(err)

// Update and Delete are similar to create and query
db.Exec("UPDATE orders SET product_id = ? WHERE user_id = ?", 2, int64(3))
err = db.Exec("DELETE FROM orders WHERE product_id = 3").Error
fmt.Println(err) // ErrMissingShardingKey

The full example is here.

🚨 NOTE: Gorm config PrepareStmt: true is not supported for now.

🚨 NOTE: The default Snowflake generator may produce conflicting primary keys when run on multiple nodes; use your own custom primary key generator, or regenerate the primary key when a conflict occurs.

Primary Key

When you shard tables, you need to consider how the primary key is generated.

Recommended options:

Use Snowflake

Built-in Snowflake primary key generator.

db.Use(sharding.Register(sharding.Config{
    ShardingKey:         "user_id",
    NumberOfShards:      64,
    PrimaryKeyGenerator: sharding.PKSnowflake,
}, "orders"))
Use PostgreSQL Sequence

Gorm Sharding has a built-in PostgreSQL sequence primary key implementation; just configure PrimaryKeyGenerator: sharding.PKPGSequence to use it.

You don't need to create the sequence manually; Gorm Sharding checks for the PostgreSQL sequence and creates it when it does not exist.

The sequence name follows the pattern gorm_sharding_${table_name}_id_seq; for example, for the orders table the sequence name is gorm_sharding_orders_id_seq.

db.Use(sharding.Register(sharding.Config{
    ShardingKey:         "user_id",
    NumberOfShards:      64,
    PrimaryKeyGenerator: sharding.PKPGSequence,
}, "orders"))
No primary key

If your table doesn't have a primary key, or its primary key isn't called id (in short, if you don't want the id field to be auto-filled), set PrimaryKeyGenerator to PKCustom and have PrimaryKeyGeneratorFn return 0.

Combining with dbresolver

🚨 NOTE: Use dbresolver first.

dsn := "host=localhost user=gorm password=gorm dbname=gorm port=5432 sslmode=disable"
dsnRead := "host=localhost user=gorm password=gorm dbname=gorm-slave port=5432 sslmode=disable"

conn := postgres.Open(dsn)
connRead := postgres.Open(dsnRead)

db, err := gorm.Open(conn, &gorm.Config{})
dbRead, err := gorm.Open(connRead, &gorm.Config{})

db.Use(dbresolver.Register(dbresolver.Config{
  Replicas: []gorm.Dialector{dbRead.Dialector},
}))

db.Use(sharding.Register(sharding.Config{
  ShardingKey:         "user_id",
  NumberOfShards:      64,
  PrimaryKeyGenerator: sharding.PKSnowflake,
}))

Sharding process

This graph shows how Gorm Sharding works.

graph TD
first("SELECT * FROM orders WHERE user_id = ? AND status = ?
args = [100, 1]")

first--->gorm(["Gorm Query"])

subgraph "Gorm"
  gorm--->gorm_query
  gorm--->gorm_exec
  gorm--->gorm_queryrow
  gorm_query["connPool.QueryContext(sql, args)"]
  gorm_exec[/"connPool.ExecContext"/]
  gorm_queryrow[/"connPool.QueryRowContext"/]
end

subgraph "database/sql"
  gorm_query-->conn(["Conn"])
  gorm_exec-->conn(["Conn"])
  gorm_queryrow-->conn(["Conn"])
  ExecContext[/"ExecContext"/]
  QueryContext[/"QueryContext"/]
  QueryRowContext[/"QueryRowContext"/]


  conn-->ExecContext
  conn-->QueryRowContext
  conn-->QueryContext
end

subgraph sharding ["Sharding"]
  QueryContext-->router-->| Format to get full SQL string |format_sql-->| Parser to AST |parse-->check_table
  router[["router(sql, args)<br>"]]
  format_sql>"sql = SELECT * FROM orders WHERE user_id = 100 AND status = 1"]

  check_table{"Check sharding rules<br>by table name"}
  check_table-->| Exist |process_ast
  check_table_1{{"Return Raw SQL"}}
  not_match_error[/"Return Error<br>SQL query must have a sharding key"\]

  parse[["ast = sqlparser.Parse(sql)"]]

  check_table-.->| Not exist |check_table_1
  process_ast(("Sharding rules"))
  get_new_table_name[["Use the value in WhereValue (100) to get the sharding table index<br>orders + (100 % 16)<br>Sharding Table = orders_4"]]
  new_sql{{"SELECT * FROM orders_4 WHERE user_id = 100 AND status = 1"}}

  process_ast-.->| Not match ShardingKey |not_match_error
  process_ast-->| Match ShardingKey |match_sharding_key-->| Get table name |get_new_table_name-->| Replace TableName to get new SQL |new_sql
end


subgraph database [Database]
  orders_other[("orders_0, orders_1 ... orders_3")]
  orders_4[(orders_4)]
  orders_last[("orders_5 ... orders_15")]
  other_tables[(Other non-sharding tables<br>users, stocks, topics ...)]

  new_sql-->| Sharding Query | orders_4
  check_table_1-.->| None sharding Query |other_tables
end

orders_4-->result
other_tables-.->result
result[/Query results\]

License

MIT license.

Original fork from Longbridge.

Documentation

Index

Constants

View Source
const (
	// PKSnowflake selects the built-in Snowflake primary key generator.
	PKSnowflake = iota
	// PKPGSequence selects the built-in PostgreSQL sequence primary key generator.
	PKPGSequence
	// PKCustom selects a custom primary key generator; when it is used,
	// Config.PrimaryKeyGeneratorFn should also be specified.
	PKCustom
)

Variables

View Source
// Sentinel errors reported by the sharding plugin.
var (
	// ErrMissingShardingKey reports that a statement on a sharded table carries
	// neither the sharding key nor an id compared with the = operator.
	ErrMissingShardingKey = errors.New("sharding key or id required, and use operator =")
	// ErrInvalidID reports an id whose format is invalid.
	// NOTE(review): the exact validation rule is not visible here — confirm.
	ErrInvalidID = errors.New("invalid id format")
	// ErrInsertDiffSuffix reports a single insert query whose rows would land
	// in tables with different suffixes. The message carries no trailing
	// whitespace so it composes cleanly when wrapped or logged.
	ErrInsertDiffSuffix = errors.New("can not insert different suffix table in one query")
)
View Source
var (
	// ShardingIgnoreStoreKey holds the key name "sharding_ignore".
	// NOTE(review): presumably the settings key used to mark a statement as
	// exempt from sharding — its use is not visible here, confirm.
	ShardingIgnoreStoreKey = "sharding_ignore"
)

Functions

This section is empty.

Types

type Config

// Config specifies the configuration for sharding.
type Config struct {
	// DoubleWrite, when enabled, writes data to both the main table and the
	// sharding table.
	DoubleWrite bool

	// ShardingKey specifies the table column used to shard the table rows.
	// For example, for a product order table, you may want to split the rows by `user_id`.
	ShardingKey string

	// NumberOfShards specifies how many tables the data is sharded into.
	NumberOfShards uint

	// ShardingAlgorithm specifies a function that generates the sharding
	// table's suffix from the column value.
	// For example, this function implements a mod sharding algorithm.
	//
	// 	func(value any) (suffix string, err error) {
	//		if uid, ok := value.(int64); ok {
	//			return fmt.Sprintf("_%02d", uid%64), nil
	//		}
	//		return "", errors.New("invalid user_id")
	// 	}
	ShardingAlgorithm func(columnValue any) (suffix string, err error)

	// ShardingSuffixs specifies a function that generates the suffixes of all
	// sharding tables.
	// Used to support the Migrator and primary key generation.
	// For example, this function returns every suffix of a mod sharding scheme.
	//
	// func () (suffixs []string) {
	// 	numberOfShards := 5
	// 	for i := 0; i < numberOfShards; i++ {
	// 		suffixs = append(suffixs, fmt.Sprintf("_%02d", i%numberOfShards))
	// 	}
	// 	return
	// }
	ShardingSuffixs func() (suffixs []string)

	// ShardingAlgorithmByPrimaryKey specifies a function that generates the
	// sharding table's suffix from the primary key. Used when no sharding key
	// is specified.
	// For example, this function uses the Snowflake library to generate the suffix.
	//
	// 	func(id int64) (suffix string) {
	//		return fmt.Sprintf("_%02d", snowflake.ParseInt64(id).Node())
	//	}
	ShardingAlgorithmByPrimaryKey func(id int64) (suffix string)

	// PrimaryKeyGenerator specifies the primary key generation algorithm.
	// Used only on insert, when the record does not contain an id field.
	// Options are PKSnowflake, PKPGSequence and PKCustom.
	// When using PKCustom, you should also specify PrimaryKeyGeneratorFn.
	PrimaryKeyGenerator int

	// PrimaryKeyGeneratorFn specifies a function that generates the primary key.
	// With an auto-increment-like generator, the tableIdx argument can be ignored.
	// For example, this function uses the Snowflake library to generate the primary key.
	// If you don't want to auto-fill the `id`, or use a primary key that isn't called `id`, just return 0.
	//
	// 	func(tableIdx int64) int64 {
	//		return nodes[tableIdx].Generate().Int64()
	//	}
	PrimaryKeyGeneratorFn func(tableIdx int64) int64
	// contains filtered or unexported fields
}

Config specifies the configuration for sharding.

type ConnPool

type ConnPool struct {
	gorm.ConnPool
	// contains filtered or unexported fields
}

ConnPool implements a ConnPool that replaces db.Statement.ConnPool in Gorm.

func (*ConnPool) BeginTx

func (pool *ConnPool) BeginTx(ctx context.Context, opt *sql.TxOptions) (gorm.ConnPool, error)

BeginTx implements ConnPoolBeginner.BeginTx.

func (*ConnPool) Commit

func (pool *ConnPool) Commit() error

Commit implements TxCommitter.Commit.

func (ConnPool) ExecContext

func (pool ConnPool) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)

func (*ConnPool) Ping

func (pool *ConnPool) Ping() error

func (ConnPool) PrepareContext

func (pool ConnPool) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)

func (ConnPool) QueryRowContext

func (pool ConnPool) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row

func (*ConnPool) Rollback

func (pool *ConnPool) Rollback() error

Rollback implements TxCommitter.Rollback.

func (*ConnPool) String

func (pool *ConnPool) String() string

type Sharding

type Sharding struct {
	*gorm.DB
	ConnPool *ConnPool
	// contains filtered or unexported fields
}

func Register

func Register(config Config, tables ...any) *Sharding

func (*Sharding) Initialize

func (s *Sharding) Initialize(db *gorm.DB) error

Initialize implements the Gorm plugin interface.

func (*Sharding) LastQuery

func (s *Sharding) LastQuery() string

LastQuery returns the last SQL query.

func (*Sharding) Name

func (s *Sharding) Name() string

Name returns the plugin name for the Gorm plugin interface.

type ShardingDialector added in v0.5.0

type ShardingDialector struct {
	gorm.Dialector
	// contains filtered or unexported fields
}

func NewShardingDialector added in v0.5.0

func NewShardingDialector(d gorm.Dialector, s *Sharding) ShardingDialector

func (ShardingDialector) Migrator added in v0.5.0

func (d ShardingDialector) Migrator(db *gorm.DB) gorm.Migrator

type ShardingMigrator added in v0.5.0

type ShardingMigrator struct {
	gorm.Migrator
	// contains filtered or unexported fields
}

func (ShardingMigrator) AutoMigrate added in v0.5.0

func (m ShardingMigrator) AutoMigrate(dst ...any) error

func (ShardingMigrator) DropTable added in v0.5.2

func (m ShardingMigrator) DropTable(dst ...any) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL