Documentation ¶
Index ¶
- func Get(ctx context.Context) (db.Provider, error)
- func GetEnity(ctx context.Context, enityName string) (interface{}, error)
- func Registrate(ctx context.Context) (context.Context, error)
- func RegistrateEnity(ctx context.Context, enityName string, options interface{}) (context.Context, error)
- type Config
- type Enity
- func (c *Enity) AppendToQueue(queueItem *QueueItem)
- func (c *Enity) GetMetrics(prefix string) stats.MapMetricsOptions
- func (c *Enity) GetReadyHandlers(prefix string) stats.MapCheckFunc
- func (c *Enity) Migrate()
- func (c *Enity) NewMutex(checkInterval time.Duration) (*Mutex, error)
- func (c *Enity) NewMutexByID(lockID int64, checkInterval time.Duration) (*Mutex, error)
- func (c *Enity) RegisterMigration(migration *MigrationInCode)
- func (c *Enity) SetConnPoolLifetime(connMaxLifetime time.Duration)
- func (c *Enity) SetConnPoolLimits(maxIdleConnections, maxOpenedConnections int)
- func (c *Enity) SetPoolLimits(maxIdleConnections, maxOpenedConnections int, connMaxLifetime time.Duration)
- func (c *Enity) SetSchema(schema string)
- func (c *Enity) Shutdown() error
- func (c *Enity) Start() error
- func (c *Enity) WaitForEstablishing()
- type MigrateConfig
- type MigrationInCode
- type MigrationsType
- type Mutex
- type Provider
- func (p *Provider) AppendToQueue(connectionName string, item interface{}) error
- func (p *Provider) CreateEnity(enityName string, options interface{}) error
- func (p *Provider) GetEnity(connectionName string) (interface{}, error)
- func (p *Provider) NewMutex(connectionName string, checkInterval time.Duration) (*Mutex, error)
- func (p *Provider) NewMutexByID(connectionName string, lockID int64, checkInterval time.Duration) (*Mutex, error)
- func (p *Provider) RegisterMigration(connectionName string, migration interface{}) error
- func (p *Provider) WaitForFlush(connectionName string) error
- type QueueItem
- type QueueItemParam
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Config ¶
type Config struct { // DSN is a connection string in form of DSN. Example: // postgres://user:password@host:port/databaseName. // Default: "postgres://db:db@localhost:5432/db" DSN string `envconfig:"optional"` // MaxConnectionLifetime specifies maximum connection lifetime // for reuse. Default: 10 seconds. MaxConnectionLifetime time.Duration `envconfig:"optional"` // MaxIdleConnections specifies maximum connections to database that // can stay in idle state. Default: 10 connections. MaxIdleConnections int `envconfig:"optional"` // MaxOpenedConnections specifies upper limit for opened connections // count. Default: 30 connections. MaxOpenedConnections int `envconfig:"optional"` // Options is a string with additional options that will be passed // to connection. Default: "connect_timeout=10&sslmode=disable". Options string `envconfig:"optional"` // QueueWorkerTimeout is a timeout in seconds which will be used by // queue worker for queue processing. Defaults to 1. If it is // set to 0, it will be reset to 1. QueueWorkerTimeout time.Duration `envconfig:"optional"` // StartQueueWorker indicates to connection controller that it should // also start asynchronous queue worker. This worker can be used for // bulking (executing many insert/update/delete requests without // big performance penalties). StartQueueWorker bool `envconfig:"optional"` // StartWatcher indicates to connection controller that it should // also start asynchronous connection watcher. StartWatcher bool `envconfig:"optional"` // Timeout is a timeout in seconds for connection checking. Every // this count of seconds database connection will be checked for // aliveness and, if it dies, attempt to reestablish connection // will be made. Default timeout is 10 seconds. Timeout time.Duration `envconfig:"optional"` // Migrate struct contains options for migrate Migrate *MigrateConfig }
Config represents the configuration structure for every connection.
func (*Config) ComposeDSN ¶ added in v0.3.0
ComposeDSN composes the DSN.
func (*Config) SetDefault ¶ added in v0.3.0
SetDefault checks the connection config. If a required field is empty, it will be filled with a default value. Returns a copy of the config.
type Enity ¶
type Enity struct { // Metrics stats.Service // DB connection Conn *sqlx.DB // contains filtered or unexported fields }
Enity is a connection controlling structure. It controls the connection, the asynchronous queue and everything that is related to the specified connection.
func GetEnityTypeCast ¶
func (*Enity) AppendToQueue ¶
func (*Enity) GetMetrics ¶
func (c *Enity) GetMetrics(prefix string) stats.MapMetricsOptions
GetMetrics returns a map of the metrics from the database connection.
func (*Enity) GetReadyHandlers ¶
func (c *Enity) GetReadyHandlers(prefix string) stats.MapCheckFunc
GetReadyHandlers returns a map of the ready handlers from the database connection.
func (*Enity) NewMutexByID ¶
NewMutexByID creates a new database mutex with the selected ID.
func (*Enity) RegisterMigration ¶
func (c *Enity) RegisterMigration(migration *MigrationInCode)
func (*Enity) SetConnPoolLifetime ¶
SetConnPoolLifetime sets connection lifetime.
func (*Enity) SetConnPoolLimits ¶
SetConnPoolLimits sets pool limits for connections counts.
func (*Enity) SetPoolLimits ¶
func (c *Enity) SetPoolLimits(maxIdleConnections, maxOpenedConnections int, connMaxLifetime time.Duration)
SetPoolLimits sets connection pool limits.
func (*Enity) Shutdown ¶
Shutdown shuts down the queue worker and the connection watcher. Later it will also close the connection to the database. This is a blocking call.
func (*Enity) WaitForEstablishing ¶
func (c *Enity) WaitForEstablishing()
WaitForEstablishing blocks execution until the connection is successfully established and database migrations are applied (or rolled back).
type MigrateConfig ¶
type MigrateConfig struct { // Action for migration, may be: nothing, up, down Action string `envconfig:"optional"` // Count of applied/rolled back migrations Count int64 `envconfig:"optional"` // Directory is a path to migrate scripts Directory string `envconfig:"optional"` // Only migration, exit from service after migration Only bool `envconfig:"optional"` // MigrationsType instructs database migration package to use one // or another migration type. MigrationsType MigrationsType `envconfig:"optional"` // Name of schema in database Schema string `envconfig:"optional"` }
func (*MigrateConfig) SetDefault ¶ added in v0.3.0
func (c *MigrateConfig) SetDefault() *MigrateConfig
SetDefault checks the migration options. If a required field is empty, it will be filled with a default value.
type MigrationInCode ¶
MigrationInCode represents an informational struct for a database migration that was written as Go code. When using such migrations you should not use SQL migrations, as such a mix might break everything. This might be changed in the future.
type MigrationsType ¶
type MigrationsType int
MigrationsType represents an enumeration of the acceptable migration types.
const ( MigrationTypeSQLFiles MigrationsType = iota MigrationTypeGoCode )
func (MigrationsType) String ¶
func (mt MigrationsType) String() string
String returns stringified representation of migrations type.
type Mutex ¶
type Mutex struct {
// contains filtered or unexported fields
}
Mutex provides a distributed mutex across multiple instances via a PostgreSQL database.
func NewMutexByID ¶
NewMutexByID creates a new distributed PostgreSQL mutex by ID.
func (*Mutex) GenerateLockID ¶
GenerateLockID generates a LockID from the database name and other artifacts.
func (*Mutex) Lock ¶
Lock sets a lock item for database (PostgreSQL) locking. It is a blocking call which will wait until the database lock key is deleted, pretty much like a simple mutex.
func (*Mutex) RWLock ¶
RWLock sets an rwlock item for database (PostgreSQL) locking. It is a blocking call which will wait until the database lock key is deleted, pretty much like a simple mutex.
type Provider ¶
type Provider struct {
*providerwithmetrics.Provider
}
Provider provides a PostgreSQL database worker. This provider supports asynchronous database actions (like bulk inserting). Every connection will have its own goroutine for queue processing.
func NewProvider ¶
NewProvider initializes the provider. If asynchronous mode is supported by the provider (e.g. for batch inserting using transactions), the queue processor should also be started here.
func (*Provider) AppendToQueue ¶
AppendToQueue adds the passed item to the processing queue.
func (*Provider) CreateEnity ¶
CreateEnity should create an enity using the passed parameters.
func (*Provider) NewMutexByID ¶
func (p *Provider) NewMutexByID(connectionName string, lockID int64, checkInterval time.Duration) (*Mutex, error)
NewMutexByID creates a new distributed PostgreSQL mutex by ID.
func (*Provider) RegisterMigration ¶
RegisterMigration registers a migration for the specified connection. It is up to the provider to provide instructions about working with migrations and how to put them into the migration interface. It is recommended to use a separate structure.
func (*Provider) WaitForFlush ¶
WaitForFlush blocks execution until the queue is empty.
type QueueItem ¶
type QueueItem struct { // Query is a SQL query with placeholders for NamedExec(). See sqlx's // documentation about NamedExec(). Query string // Param should be a structure that describes parameters. Fields // should have proper "db" tag to be properly put in database if // field name and table columns differ. Param QueueItemParam // Queue item flags. // IsWaitForFlush signals that this item blocks execution flow // and true should be sent via WaitForFlush chan. // When this flag is set, query from this item won't be processed. // Also all queue items following this item will be re-added // in the beginning of queue for later processing. IsWaitForFlush bool // WaitForFlush is a channel which will receive "true" once item // will be processed. WaitForFlush chan bool }
QueueItem is a queue element. It will be used in conjunction with sqlx's NamedExec() function.
type QueueItemParam ¶
type QueueItemParam interface { // IsUnique should return false if item is not unique (and therefore // should not be processed) and true if item is unique and should // be processed. When uniqueness isn't necessary you may return // true here. IsUnique(conn *sqlx.DB) bool // Prepare should prepare items if needed. For example it may parse // timestamps from JSON-only fields into database-only ones. It should // return true if item is ready to be processed and false if error // occurred and item should not be processed. If preparation isn't // necessary you may return true here. Prepare() bool }
QueueItemParam is an interface for wrapping passed structure.