pgbalancer

package module
v0.0.0-...-f84b213 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 23, 2019 License: MIT Imports: 10 Imported by: 0

README

pgbalancer

Requires golang version >= 1.9.

Warning: This balancer is the client-side only, and it assumed that all nodes have been replicated by database-side.

Package pgbalancer is a balancer of the postgres databases with RPS limiter.

This package implements only four methods to select information from a cluster (Query, QueryContext, QueryRow, QueryRowContext). Also pgbalancer support few methods of sql.DB (Close, SetMaxOpenConns, SetMaxIdleConns and so on).

Pgbalancer contents a logger to write all queries. Log pattern is node: %s; q: %s; t: %s where node is node's name, q is query and t - time of running a query. You may use any logger which implements log.Logger from std lib.

How it works.

After initialization of the cluster it starts a goroutine with infinite loop. When a node is available (by RPS limits) it puts this selected note to the channel with the buffer size 1. And that infinite loop blocks till the selected node will be read by any query to the cluster.

Examples.
import "github.com/GolubAlexander/pgbalancer"
...
// Creates a new cluster with a limit 10 requests per seconds for a single node.
cl := pgbalancer.NewCluster(10)
// You may use any favorite logger.
l := zerolog.New(os.Stderr)
l = l.With().Timestamp().Logger()
// and set logger to cluster
cl.SetLogger(l)
// Adds nodes to the cluster. If name is already exists method will return ErrDuplicateNodeName.
dsn := "postgres://postgres:@localhost:5432/postgres?sslmode=disable"
if err := cl.AddNode("alpha", dsn); err != nil {
	panic(err)
}
// Makes a query to the cluster (Query, QueryContext, QueryRow, QueryRowContext). 
// Returns `*sql.Rows` and error.
r, err := cl.Query("SELECT 1")
if err != nil {
	panic(err)
}
_ = r.Close()
// Removes the node from the cluster by its name.
cl.RemoveNode("alpha")
// Sets connection's options to all current nodes and to new ones. 
cl.SetMaxOpenConns(10)
cl.SetMaxIdleConns(10)
// Closes all connections in the cluster.
if err := cl.Close(); err != nil {
	panic(err)
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrDuplicateNodeName = fmt.Errorf("node name is already exist")
)

Functions

This section is empty.

Types

type Cluster

type Cluster struct {
	// contains filtered or unexported fields
}

func NewCluster

func NewCluster(limitRPS int) *Cluster

NewCluster initializes a new cluster of databases. The parameter limitRPS sets a limit of RPS to every databases. If limitRPS equals to 0 then function sets no limits to requests. Also function sets default output as os.Stdout. See: `SetLogger(w io.Writer)` to change a logger to your own.

func (*Cluster) AddNode

func (c *Cluster) AddNode(name string, dsn string) error

AddNode adds a node into the cluster with it's name and DSN string to a database. It may return an error, if the name of Node is already uses in the cluster. You may add a new node into the cluster at runtime.

func (*Cluster) Close

func (c *Cluster) Close() error

Close closes all connections to the databases in the cluster. It returns all occurred errors.

func (*Cluster) Len

func (c *Cluster) Len() int

Len returns the amounts of cluster's databases.

func (*Cluster) Node

func (c *Cluster) Node() *Node

Node fetches idle node for a sql query.

func (*Cluster) Query

func (c *Cluster) Query(query string, args ...interface{}) (*sql.Rows, error)

Query executes a query that returns rows, typically a SELECT. The args are for any placeholder parameters in the query. Query uses a free node as the physical db.

func (*Cluster) QueryContext

func (c *Cluster) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)

QueryContext executes a query that returns rows, typically a SELECT. The args are for any placeholder parameters in the query. Query uses a free node as the physical db.

func (*Cluster) QueryRow

func (c *Cluster) QueryRow(query string, args ...interface{}) *sql.Row

QueryRow executes a query that is expected to return at most one row. QueryRow always return a non-nil value. Errors are deferred until Row's Scan method is called. Query uses a free node as the physical db.

func (*Cluster) QueryRowContext

func (c *Cluster) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row

QueryRowContext executes a query that is expected to return at most one row. QueryRowContext always return a non-nil value. Errors are deferred until Row's Scan method is called. Query uses a free node as the physical db.

func (*Cluster) RemoveNode

func (c *Cluster) RemoveNode(name string)

RemoveNode removes a node from the cluster by it's name. You may remove nodes at runtime.

func (*Cluster) SetLimitRPS

func (c *Cluster) SetLimitRPS(limitRPS int)

SetLimitRPS sets RPS limits to every database in the cluster. If limitRPS equals to 0 then function sets no limits to requests.

func (*Cluster) SetLogger

func (c *Cluster) SetLogger(w io.Writer)

SetLogger sets the output of logger.

func (*Cluster) SetMaxIdleConns

func (c *Cluster) SetMaxIdleConns(n int)

SetMaxIdleConns sets the maximum number of connections in the idle connection pool to all databases in the cluster. All new node will be set the value.

If MaxOpenConns is greater than 0 but less than the new MaxIdleConns, then the new MaxIdleConns will be reduced to match the MaxOpenConns limit.

If n <= 0, no idle connections are retained.

func (*Cluster) SetMaxOpenConns

func (c *Cluster) SetMaxOpenConns(n int)

SetMaxOpenConns sets the maximum number of open connections to the database's cluster. All new node will be set the value.

If MaxIdleConns is greater than 0 and the new MaxOpenConns is less than MaxIdleConns, then MaxIdleConns will be reduced to match the new MaxOpenConns limit.

If n <= 0, then there is no limit on the number of open connections. The default is 0 (unlimited).

type Node

type Node struct {
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL