pq

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 11, 2021 License: MIT Imports: 18 Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Get

func Get(ctx context.Context) (db.Provider, error)

func GetEnity

func GetEnity(ctx context.Context, enityName string) (interface{}, error)

func Registrate

func Registrate(ctx context.Context) (context.Context, error)

func RegistrateEnity

func RegistrateEnity(ctx context.Context, enityName string, options interface{}) (context.Context, error)

Types

type Config

// Config holds the configuration for a single database connection.
type Config struct {
	// DSN is the connection string in DSN form. Example:
	// postgres://user:password@host:port/databaseName.
	// Default: "postgres://db:db@localhost:5432/db"
	DSN string `envconfig:"optional"`
	// MaxConnectionLifetime specifies the maximum time a connection
	// may be reused. Default: 10 seconds.
	MaxConnectionLifetime time.Duration `envconfig:"optional"`
	// MaxIdleConnections specifies the maximum number of connections to
	// the database that can stay in the idle state. Default: 10 connections.
	MaxIdleConnections int `envconfig:"optional"`
	// MaxOpenedConnections specifies the upper limit for the number of
	// opened connections. Default: 30 connections.
	MaxOpenedConnections int `envconfig:"optional"`
	// Options is a string with additional options that will be passed
	// to the connection. Default: "connect_timeout=10&sslmode=disable".
	Options string `envconfig:"optional"`
	// QueueWorkerTimeout is the timeout, in seconds, used by the queue
	// worker for queue processing. Defaults to 1. If it is set to 0,
	// it is reset to 1.
	// NOTE(review): the field type is time.Duration while this text
	// speaks of seconds - confirm how the value is interpreted.
	QueueWorkerTimeout time.Duration `envconfig:"optional"`
	// StartQueueWorker tells the connection controller that it should
	// also start the asynchronous queue worker. This worker can be used
	// for bulking (executing many insert/update/delete requests without
	// big performance penalties).
	StartQueueWorker bool `envconfig:"optional"`
	// StartWatcher tells the connection controller that it should
	// also start the asynchronous connection watcher.
	StartWatcher bool `envconfig:"optional"`
	// Timeout is the interval, in seconds, between connection checks.
	// At every interval the database connection is checked for
	// aliveness and, if it has died, an attempt to re-establish the
	// connection is made. Default timeout is 10 seconds.
	// NOTE(review): the field type is time.Duration while this text
	// speaks of seconds - confirm how the value is interpreted.
	Timeout time.Duration `envconfig:"optional"`
	// Migrate contains the migration options.
	Migrate *MigrateConfig
}

Config represents configuration structure for every connection.

func (*Config) ComposeDSN added in v0.3.0

func (c *Config) ComposeDSN() string

ComposeDSN composes the DSN.

func (*Config) GetDBName

func (c *Config) GetDBName() string

GetDBName returns the database name from the DSN.

func (*Config) SetDefault added in v0.3.0

func (c *Config) SetDefault() *Config

SetDefault checks the connection config. If a required field is empty, it is filled with a default value. Returns a copy of the config.

type Enity

// Enity is a connection-controlling structure.
type Enity struct {
	// Service is embedded for metrics.
	stats.Service
	// Conn is the DB connection.
	Conn *sqlx.DB
	// contains filtered or unexported fields
}

Enity is a connection-controlling structure. It controls the connection, the asynchronous queue and everything else related to the specified connection.

func GetEnityTypeCast

func GetEnityTypeCast(ctx context.Context, enityName string) (*Enity, error)

func NewEnity

func NewEnity(ctx context.Context, name string, cfg interface{}) (*Enity, error)

NewEnity creates a new enity.

func (*Enity) AppendToQueue

func (c *Enity) AppendToQueue(queueItem *QueueItem)

func (*Enity) GetMetrics

func (c *Enity) GetMetrics(prefix string) stats.MapMetricsOptions

GetMetrics returns a map of the metrics from the database connection.

func (*Enity) GetReadyHandlers

func (c *Enity) GetReadyHandlers(prefix string) stats.MapCheckFunc

GetReadyHandlers returns the ready handlers (a stats.MapCheckFunc) from the database connection.

func (*Enity) Migrate

func (c *Enity) Migrate()

Migrate migrates the database.

func (*Enity) NewMutex

func (c *Enity) NewMutex(checkInterval time.Duration) (*Mutex, error)

NewMutex creates a new database mutex.

func (*Enity) NewMutexByID

func (c *Enity) NewMutexByID(lockID int64, checkInterval time.Duration) (*Mutex, error)

NewMutexByID creates a new database mutex with the selected ID.

func (*Enity) RegisterMigration

func (c *Enity) RegisterMigration(migration *MigrationInCode)

func (*Enity) SetConnPoolLifetime

func (c *Enity) SetConnPoolLifetime(connMaxLifetime time.Duration)

SetConnPoolLifetime sets connection lifetime.

func (*Enity) SetConnPoolLimits

func (c *Enity) SetConnPoolLimits(maxIdleConnections, maxOpenedConnections int)

SetConnPoolLimits sets pool limits for connections counts.

func (*Enity) SetPoolLimits

func (c *Enity) SetPoolLimits(maxIdleConnections, maxOpenedConnections int, connMaxLifetime time.Duration)

SetPoolLimits sets connection pool limits.

func (*Enity) SetSchema added in v0.3.0

func (c *Enity) SetSchema(schema string)

SetSchema sets schema for migrations.

func (*Enity) Shutdown

func (c *Enity) Shutdown() error

Shutdown shuts down the queue worker and the connection watcher. Later it will also close the connection to the database. This is a blocking call.

func (*Enity) Start

func (c *Enity) Start() error

Start starts connection workers and connection procedure itself.

func (*Enity) WaitForEstablishing

func (c *Enity) WaitForEstablishing()

WaitForEstablishing blocks execution until the connection is successfully established and the database migrations are applied (or rolled back).

type MigrateConfig

// MigrateConfig holds the migration options for a connection.
type MigrateConfig struct {
	// Action is the migration action; may be: nothing, up, down.
	Action string `envconfig:"optional"`
	// Count is the number of migrations applied or rolled back.
	Count int64 `envconfig:"optional"`
	// Directory is the path to the migration scripts.
	Directory string `envconfig:"optional"`
	// Only enables migrate-only mode: the service exits after the
	// migration.
	Only bool `envconfig:"optional"`
	// MigrationsType instructs the database migration package which
	// type of migrations to use.
	MigrationsType MigrationsType `envconfig:"optional"`
	// Schema is the name of the schema in the database.
	Schema string `envconfig:"optional"`
}

func (*MigrateConfig) SetDefault added in v0.3.0

func (c *MigrateConfig) SetDefault() *MigrateConfig

SetDefault checks the migration options. If a required field is empty, it is filled with a default value.

type MigrationInCode

// MigrationInCode describes a database migration written as Go code.
type MigrationInCode struct {
	// Name is the migration's name.
	Name string
	// Down receives a transaction and returns an error; presumably it
	// rolls the migration back - confirm against the migration runner.
	Down func(tx *sql.Tx) error
	// Up receives a transaction and returns an error; presumably it
	// applies the migration - confirm against the migration runner.
	Up   func(tx *sql.Tx) error
}

MigrationInCode represents an informational struct for a database migration that was written as Go code. When using such migrations you should not also use SQL migrations, as mixing the two might break everything. This might change in the future.

type MigrationsType

// MigrationsType is an enumerator of the acceptable migration types.
type MigrationsType int

MigrationsType represents enumerator for acceptable migration types.

// Acceptable MigrationsType values, numbered from 0 via iota.
const (
	// MigrationTypeSQLFiles is the zero value of MigrationsType;
	// presumably selects migrations stored as SQL files - confirm.
	MigrationTypeSQLFiles MigrationsType = iota
	// MigrationTypeGoCode has value 1; presumably selects migrations
	// written as Go code (see MigrationInCode) - confirm.
	MigrationTypeGoCode
)

func (MigrationsType) String

func (mt MigrationsType) String() string

String returns stringified representation of migrations type.

type Mutex

// Mutex provides a distributed mutex across multiple instances via a
// PostgreSQL database.
type Mutex struct {
	// contains filtered or unexported fields
}

Mutex provides a distributed mutex across multiple instances via PostgreSQL database

func NewMutex

func NewMutex(conn *sqlx.DB, checkInterval time.Duration) (*Mutex, error)

NewMutex creates new distributed postgresql mutex

func NewMutexByID

func NewMutexByID(conn *sqlx.DB, lockID int64, checkInterval time.Duration) (*Mutex, error)

NewMutexByID creates new distributed postgresql mutex by ID

func (*Mutex) GenerateLockID

func (m *Mutex) GenerateLockID(databaseName string, additionalNames ...string)

GenerateLockID generates the LockID from the database name and other artifacts.

func (*Mutex) IsLocked added in v0.4.0

func (m *Mutex) IsLocked() bool

IsLocked reports whether the mutex is locked.

func (*Mutex) Lock

func (m *Mutex) Lock() (err error)

Lock sets a lock item for database (PostgreSQL) locking. It is a blocking call that waits until the database lock key is deleted, much like a simple mutex.

func (*Mutex) RWLock

func (m *Mutex) RWLock() (err error)

RWLock sets an rwlock item for database (PostgreSQL) locking. It is a blocking call that waits until the database lock key is deleted, much like a simple mutex.

func (*Mutex) RWUnlock

func (m *Mutex) RWUnlock() (err error)

RWUnlock deletes rwlock item for database (PostgreSQL) locking.

func (*Mutex) Unlock

func (m *Mutex) Unlock() (err error)

Unlock deletes lock item for database (PostgreSQL) locking.

type Provider

// Provider provides a PostgreSQL database worker.
type Provider struct {
	// Provider is the embedded providerwithmetrics provider.
	*providerwithmetrics.Provider
}

Provider provides a PostgreSQL database worker. This provider supports asynchronous database actions (like bulk inserting). Every connection will have its own goroutine for queue processing.

func NewProvider

func NewProvider(ctx context.Context) *Provider

NewProvider creates and initializes the provider. If asynchronous mode is supported by the provider (e.g. for batch inserting using transactions), the queue processor should also be started here.

func (*Provider) AppendToQueue

func (p *Provider) AppendToQueue(connectionName string, item interface{}) error

AppendToQueue adds passed item into processing queue.

func (*Provider) CreateEnity

func (p *Provider) CreateEnity(enityName string, options interface{}) error

CreateEnity should create enity using passed parameters.

func (*Provider) GetEnity

func (p *Provider) GetEnity(connectionName string) (interface{}, error)

GetEnity should return pointer to connection structure to caller.

func (*Provider) NewMutex

func (p *Provider) NewMutex(connectionName string, checkInterval time.Duration) (*Mutex, error)

NewMutex creates new distributed mutex

func (*Provider) NewMutexByID

func (p *Provider) NewMutexByID(connectionName string, lockID int64, checkInterval time.Duration) (*Mutex, error)

NewMutexByID creates new distributed postgresql mutex by ID

func (*Provider) RegisterMigration

func (p *Provider) RegisterMigration(connectionName string, migration interface{}) error

RegisterMigration registers migration for specified connection. It is up to provider to provide instructions about working with migrations and how to put them into migration interface. It is recommended to use separate structure.

func (*Provider) WaitForFlush

func (p *Provider) WaitForFlush(connectionName string) error

WaitForFlush blocks execution until the queue is empty.

type QueueItem

// QueueItem is a queue element used in conjunction with sqlx's
// NamedExec() function.
type QueueItem struct {
	// Query is a SQL query with placeholders for NamedExec(). See sqlx's
	// documentation about NamedExec().
	Query string
	// Param should be a structure that describes the parameters. Its
	// fields should have a proper "db" tag so they are stored correctly
	// when the field name and the table column differ.
	Param QueueItemParam

	// Queue item flags.
	// IsWaitForFlush signals that this item blocks the execution flow
	// and that true should be sent via the WaitForFlush chan.
	// When this flag is set, the query from this item won't be processed.
	// Also, all queue items following this item will be re-added
	// at the beginning of the queue for later processing.
	IsWaitForFlush bool
	// WaitForFlush is a channel which will receive "true" once the item
	// has been processed.
	WaitForFlush chan bool
}

QueueItem is a queue element. It will be used in conjunction with sqlx's NamedExec() function.

type QueueItemParam

// QueueItemParam is an interface for wrapping the structure passed as
// a queue item's parameters.
type QueueItemParam interface {
	// IsUnique should return false if the item is not unique (and
	// therefore should not be processed) and true if the item is unique
	// and should be processed. When uniqueness isn't necessary you may
	// simply return true here.
	IsUnique(conn *sqlx.DB) bool
	// Prepare should prepare the item if needed. For example, it may
	// parse timestamps from JSON-only fields into database-only ones.
	// It should return true if the item is ready to be processed and
	// false if an error occurred and the item should not be processed.
	// If preparation isn't necessary you may simply return true here.
	Prepare() bool
}

QueueItemParam is an interface for wrapping passed structure.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL