connmanager

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 6, 2024 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Examples

Constants

View Source
// MaxConnectionPoolSize is the upper bound on the size of a Pool.
// The limit is a restriction of the Netcool Omnibus Object Server software.
const MaxConnectionPoolSize = 1000

Variables

View Source
// ErrPoolClosed reports that the Pool has already been closed.
var ErrPoolClosed = app.Err(app.ErrCodeUnknown, "pool is closed already")

Functions

This section is empty.

Types

type Pool

// Pool creates DB connections and keeps them so they can be reused.
type Pool struct {
	// contains filtered or unexported fields
}

Pool creates and stores DB connections so that they can be reused.

func NewPool

func NewPool(connector db.DBConnector, seedList []db.Addr, options ...PoolOption) (*Pool, error)

NewPool builds a new, ready-to-use Pool.

Params:

  • connector: object that can open DB connections
  • seedList: list of DB instances

Options:

  • WithMaxSize - max connections
  • WithFailBack - failover strategy used for Aggregation layer of OMNIbus cluster
  • WithRandomFailOver - failover strategy used for Display layer of OMNIbus cluster
Example
package main

import (
	"context"
	"fmt"
	"time"

	cm "github.com/ncotds/nco-qoordinator/internal/connmanager"
	db "github.com/ncotds/nco-qoordinator/internal/dbconnector"

	qc "github.com/ncotds/nco-qoordinator/pkg/models"
)

// DemoConnector is a stub dbconnector.DBConnector that exists only for the
// examples; use your own implementation in real code.
type DemoConnector struct{}

// Connect opens nothing: it always yields a nil connection and a nil error.
func (dc *DemoConnector) Connect(
	ctx context.Context,
	addr db.Addr,
	credentials qc.Credentials,
) (conn db.ExecutorCloser, err error) {
	return nil, nil
}

func main() {
	pool, err := cm.NewPool(
		&DemoConnector{},
		[]db.Addr{"host1:4100", "host2:4100", "host3:4100"},
		// optional params
		cm.WithMaxSize(10),
		cm.WithFailBack(1*time.Minute), // or cm.WithRandomFailOver(),
	)

	fmt.Printf("%T, %v\n", pool, err)
}
Output:

*connmanager.Pool, <nil>

func (*Pool) Acquire

func (p *Pool) Acquire(ctx context.Context, credentials qc.Credentials) (conn *PoolSlot, err error)

Acquire returns a connection for the given credentials from the Pool.

If the connection does not exist yet, Acquire tries to take a free slot and establish a new one. If there are no free slots, it tries to find the oldest idle connection and close it.

Example
package main

import (
	"context"
	"fmt"

	cm "github.com/ncotds/nco-qoordinator/internal/connmanager"
	db "github.com/ncotds/nco-qoordinator/internal/dbconnector"

	qc "github.com/ncotds/nco-qoordinator/pkg/models"
)

// DemoConnector is a stub dbconnector.DBConnector that exists only for the
// examples; use your own implementation in real code.
type DemoConnector struct{}

// Connect opens nothing: it always yields a nil connection and a nil error.
func (dc *DemoConnector) Connect(
	ctx context.Context,
	addr db.Addr,
	credentials qc.Credentials,
) (conn db.ExecutorCloser, err error) {
	return nil, nil
}

func main() {
	pool, _ := cm.NewPool(
		&DemoConnector{},
		[]db.Addr{"host1:4100", "host2:4100", "host3:4100"},
		cm.WithMaxSize(2),
	)
	credentials := qc.Credentials{
		UserName: "someuser",
		Password: "superpass",
	}

	conn1, err1 := pool.Acquire(context.Background(), credentials)
	conn2, err2 := pool.Acquire(context.Background(), credentials)
	conn3, err3 := pool.Acquire(context.Background(), credentials)

	fmt.Printf("%T, %v\n", conn1, err1)
	fmt.Printf("%T, %v\n", conn2, err2)
	fmt.Printf("%T, %v\n", conn3, err3)
}
Output:

*connmanager.PoolSlot, <nil>
*connmanager.PoolSlot, <nil>
*connmanager.PoolSlot, ERR_UNAVAILABLE: connections limit exceed

func (*Pool) Close

func (p *Pool) Close() error

Close marks Pool as 'closed' to prevent acquiring connections.

Next, Close() waits until all connections are released to the Pool and tries to close them.

A 'closed' Pool cannot be 'opened' again; create a new one if needed.

Example
package main

import (
	"context"
	"fmt"

	cm "github.com/ncotds/nco-qoordinator/internal/connmanager"
	db "github.com/ncotds/nco-qoordinator/internal/dbconnector"

	qc "github.com/ncotds/nco-qoordinator/pkg/models"
)

// DemoConnector is a stub dbconnector.DBConnector that exists only for the
// examples; use your own implementation in real code.
type DemoConnector struct{}

// Connect opens nothing: it always yields a nil connection and a nil error.
func (dc *DemoConnector) Connect(
	ctx context.Context,
	addr db.Addr,
	credentials qc.Credentials,
) (conn db.ExecutorCloser, err error) {
	return nil, nil
}

func main() {
	pool, _ := cm.NewPool(
		&DemoConnector{},
		[]db.Addr{"host1:4100", "host2:4100", "host3:4100"},
		cm.WithMaxSize(2),
	)
	credentials := qc.Credentials{
		UserName: "someuser",
		Password: "superpass",
	}
	conn, _ := pool.Acquire(context.Background(), credentials)
	_ = pool.Release(conn)

	err1 := pool.Close()
	_, err2 := pool.Acquire(context.Background(), credentials)

	fmt.Println(err1)
	fmt.Println(err2)
}
Output:

<nil>
ERR_UNKNOWN: pool is closed already

func (*Pool) Drop

func (p *Pool) Drop(conn *PoolSlot) error

Drop returns a connection to the Pool so that it is closed and its slot is marked as 'free'.

Example
package main

import (
	"context"
	"fmt"

	cm "github.com/ncotds/nco-qoordinator/internal/connmanager"
	db "github.com/ncotds/nco-qoordinator/internal/dbconnector"

	qc "github.com/ncotds/nco-qoordinator/pkg/models"
)

// DemoConnector is a stub dbconnector.DBConnector that exists only for the
// examples; use your own implementation in real code.
type DemoConnector struct{}

// Connect opens nothing: it always yields a nil connection and a nil error.
func (dc *DemoConnector) Connect(
	ctx context.Context,
	addr db.Addr,
	credentials qc.Credentials,
) (conn db.ExecutorCloser, err error) {
	return nil, nil
}

func main() {
	pool, _ := cm.NewPool(
		&DemoConnector{},
		[]db.Addr{"host1:4100", "host2:4100", "host3:4100"},
		cm.WithMaxSize(2),
	)
	credentials := qc.Credentials{
		UserName: "someuser",
		Password: "superpass",
	}
	conn, _ := pool.Acquire(context.Background(), credentials)

	err1 := pool.Drop(conn)
	err2 := pool.Drop(conn)

	fmt.Println(err1)
	fmt.Println(err2)
}
Output:

<nil>
ERR_UNKNOWN: cannot release connection, not in use

func (*Pool) Release

func (p *Pool) Release(conn *PoolSlot) error

Release returns a connection to the Pool so that it can be reused later.

Example
package main

import (
	"context"
	"fmt"

	cm "github.com/ncotds/nco-qoordinator/internal/connmanager"
	db "github.com/ncotds/nco-qoordinator/internal/dbconnector"

	qc "github.com/ncotds/nco-qoordinator/pkg/models"
)

// DemoConnector is a stub dbconnector.DBConnector that exists only for the
// examples; use your own implementation in real code.
type DemoConnector struct{}

// Connect opens nothing: it always yields a nil connection and a nil error.
func (dc *DemoConnector) Connect(
	ctx context.Context,
	addr db.Addr,
	credentials qc.Credentials,
) (conn db.ExecutorCloser, err error) {
	return nil, nil
}

func main() {
	pool, _ := cm.NewPool(
		&DemoConnector{},
		[]db.Addr{"host1:4100", "host2:4100", "host3:4100"},
		cm.WithMaxSize(2),
	)
	credentials := qc.Credentials{
		UserName: "someuser",
		Password: "superpass",
	}
	conn, _ := pool.Acquire(context.Background(), credentials)

	err1 := pool.Release(conn)
	err2 := pool.Release(conn)

	fmt.Println(err1)
	fmt.Println(err2)
}
Output:

<nil>
ERR_UNKNOWN: cannot release connection, not in use

type PoolOption

// PoolOption is a function that receives a *Pool and returns an error.
// The With* constructors in this package return values of this type.
type PoolOption func(pool *Pool) error

PoolOption represents an optional parameter used to configure a new Pool.

func WithFailBack

func WithFailBack(delay time.Duration) PoolOption

WithFailBack option sets Pool's failover policy to 'FailBack'

This means that when the current connection is lost, the Pool first tries to connect:

  • to the first address in the seed list, if Delay has elapsed since the last reconnect attempt
  • to address from seed list, next to current one otherwise

If that address fails, the Pool continues with the next seed, making one attempt per address in the seed list until one succeeds or every address has failed.

func WithLogger

func WithLogger(log *app.Logger) PoolOption

WithLogger sets the logger for the Pool. By default, a no-op logger is used.

func WithMaxSize

func WithMaxSize(maxSize int) PoolOption

WithMaxSize option sets the maximum number of open connections that the Pool can store.

When you request a new connection from a full Pool, it tries to return an idle connection if one exists, or to close the oldest unused one.

maxSize value should be between 0 and MaxConnectionPoolSize

func WithRandomFailOver

func WithRandomFailOver() PoolOption

WithRandomFailOver option sets Pool's failover strategy to 'RandomFailOver'

This means that when the current connection is lost, the Pool first tries to connect to a random address from the seed list, excluding the current one. If that address fails, the Pool continues with the next seed, making one attempt per address in the seed list until one succeeds or every address has failed.

type PoolSlot

// PoolSlot is a container holding a concrete DB connection implementation.
type PoolSlot struct {
	// contains filtered or unexported fields
}

PoolSlot is a container that stores a concrete DB connection implementation.

func (*PoolSlot) Exec

func (s *PoolSlot) Exec(ctx context.Context, query models.Query) (rows models.RowSet, affectedRows int, err error)

Exec runs a DB query using the underlying DB connection implementation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL