Documentation
¶
Overview ¶
Package sqlpersistence is an SQL-based persistence provider with drivers for several popular SQL database systems.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( // DefaultMaxIdleConns is the default maximum number of idle connections // allowed in the database pool. DefaultMaxIdleConns = runtime.GOMAXPROCS(0) // DefaultMaxOpenConns is the default maximum number of open connections // allowed in the database pool. DefaultMaxOpenConns = DefaultMaxIdleConns * 10 // DefaultMaxConnLifetime is the default maximum lifetime of database // connections. DefaultMaxConnLifetime = 10 * time.Minute // DefaultLockTTL is the default interval at which providers renew the locks // on a specific application's data. DefaultLockTTL = 10 * time.Second )
Functions ¶
func CreateSchema ¶
CreateSchema creates the schema elements necessary to use the given database.
It does not return an error if the schema already exists.
Types ¶
type AggregateDriver ¶
type AggregateDriver interface { // InsertAggregateMetaData inserts meta-data for an aggregate instance. // // It returns false if the row already exists. InsertAggregateMetaData( ctx context.Context, tx *sql.Tx, ak string, md persistence.AggregateMetaData, ) (bool, error) // UpdateAggregateMetaData updates meta-data for an aggregate instance. // // It returns false if the row does not exist or md.Revision is not current. UpdateAggregateMetaData( ctx context.Context, tx *sql.Tx, ak string, md persistence.AggregateMetaData, ) (bool, error) // SelectAggregateMetaData selects an aggregate instance's meta-data. SelectAggregateMetaData( ctx context.Context, db *sql.DB, ak, hk, id string, ) (persistence.AggregateMetaData, error) }
AggregateDriver is the subset of the Driver interface that is concerned with aggregates.
type DSNProvider ¶
type DSNProvider struct { // DriverName is the driver name to be passed to sql.Open(). DriverName string // DSN is the data-source name to be passed to sql.Open(). DSN string // Driver is the Verity SQL driver to use with this database. If it is nil, // it is chosen automatically from one of the built-in drivers. Driver Driver // LockTTL is the interval at which the provider renews its exclusive lock // on the application's data. If it is non-positive, DefaultLockTTL is used. LockTTL time.Duration // MaxIdleConnections is the maximum number of idle connections allowed in // the database pool. // // If it is zero, DefaultMaxIdleConns is used. MaxIdleConns int // MaxOpenConnections is the maximum number of open connections allowed in // the database pool. // // If it is zero, DefaultMaxOpenConns is used. MaxOpenConns int // maxConnLifetime is the maximum lifetime of database connections. // If it is zero, DefaultMaxConnLifetime is used. MaxConnLifetime time.Duration // contains filtered or unexported fields }
DSNProvider is an implementation of provider.Provider for SQL that opens a a database pool using a DSN.
func (*DSNProvider) Open ¶
func (p *DSNProvider) Open(ctx context.Context, k string) (persistence.DataStore, error)
Open returns a data-store for a specific application.
k is the identity key of the application.
Data stores are opened for exclusive use. If another engine instance has already opened this application's data-store, ErrDataStoreLocked is returned.
type Driver ¶
type Driver interface { LockDriver AggregateDriver EventDriver OffsetDriver ProcessDriver QueueDriver // IsCompatibleWith returns nil if this driver can be used with db. IsCompatibleWith(ctx context.Context, db *sql.DB) error // Begin starts a transaction for use in a peristence.Transaction. Begin(ctx context.Context, db *sql.DB) (*sql.Tx, error) // CreateSchema creates any SQL schema elements required by the driver. CreateSchema(ctx context.Context, db *sql.DB) error // DropSchema removes any SQL schema elements created by CreateSchema(). DropSchema(ctx context.Context, db *sql.DB) error }
Driver is used to interface with the underlying SQL database.
type EventDriver ¶
type EventDriver interface { // UpdateNextOffset increments the next offset by one and returns the new // value. UpdateNextOffset( ctx context.Context, tx *sql.Tx, ak string, ) (uint64, error) // InsertEvent saves an event at a specific offset. InsertEvent( ctx context.Context, tx *sql.Tx, o uint64, env *envelopespec.Envelope, ) error // InsertEventFilter inserts a filter that limits selected events to those // with a portable name in the given set. // // It returns the filter's ID. InsertEventFilter( ctx context.Context, db *sql.DB, ak string, f map[string]struct{}, ) (int64, error) // DeleteEventFilter deletes an event filter. // // f is the filter ID, as returned by InsertEventFilter(). DeleteEventFilter( ctx context.Context, db *sql.DB, f int64, ) error // PurgeEventFilters deletes all event filters for the given application. PurgeEventFilters( ctx context.Context, db *sql.DB, ak string, ) error // SelectNextEventOffset selects the next "unused" offset. SelectNextEventOffset( ctx context.Context, db *sql.DB, ak string, ) (uint64, error) // SelectEventsByType selects events that match the given type filter. // // f is a filter ID, as returned by InsertEventFilter(). o is the minimum // offset to include in the results. SelectEventsByType( ctx context.Context, db *sql.DB, ak string, f int64, o uint64, ) (*sql.Rows, error) // SelectEventsBySource selects events that were produced by a specific // handler. SelectEventsBySource( ctx context.Context, db *sql.DB, ak, hk, id string, o uint64, ) (*sql.Rows, error) // SelectOffsetByMessageID selects the offset of the message with the given // ID. It returns false as a second return value if the message cannot be // found. SelectOffsetByMessageID( ctx context.Context, db *sql.DB, id string, ) (uint64, bool, error) // ScanEvent scans the next event from a row-set returned by // SelectEventsByType() and SelectEventsBySource(). ScanEvent( rows *sql.Rows, ev *persistence.Event, ) error }
EventDriver is the subset of the Driver interface that is concerned with events.
type LockDriver ¶
type LockDriver interface { // AcquireLock acquires an exclusive lock on an application's data. // // It returns the lock ID, which can be used in subsequent calls to // RenewLock() and ReleaseLock(). // // It returns false if the lock can not be acquired. AcquireLock( ctx context.Context, db *sql.DB, ak string, ttl time.Duration, ) (int64, bool, error) // RenewLock updates the expiry timestamp on a lock that has already been // acquired. // // It returns false if the lock has not been acquired. RenewLock( ctx context.Context, db *sql.DB, id int64, ttl time.Duration, ) (bool, error) // ReleaseLock releases a lock that was previously acquired. ReleaseLock( ctx context.Context, db *sql.DB, id int64, ) error }
LockDriver is the subset of the Driver interface that is concerned with application locking.
type OffsetDriver ¶
type OffsetDriver interface { // LoadOffset loads the last offset associated with the given source // application key sk. ak is the 'owner' application key. // // If there is no offset associated with the given source application key, // the offset is returned as zero and error as nil. LoadOffset( ctx context.Context, db *sql.DB, ak, sk string, ) (uint64, error) // InsertOffset inserts a new offset associated with the given source // application key sk. ak is the 'owner' application key. // // It returns false if the row already exists. InsertOffset( ctx context.Context, tx *sql.Tx, ak, sk string, n uint64, ) (bool, error) // UpdateOffset updates the offset associated with the given source // application key sk. ak is the 'owner' application key. // // It returns false if the row does not exist or c is not the current offset // associated with the given application key. UpdateOffset( ctx context.Context, tx *sql.Tx, ak, sk string, c, n uint64, ) (bool, error) }
OffsetDriver is the subset of the Driver interface that is concerned with persisting event stream offsets.
type ProcessDriver ¶
type ProcessDriver interface { // InsertProcessInstance inserts a process instance. // // It returns false if the row already exists. InsertProcessInstance( ctx context.Context, tx *sql.Tx, ak string, inst persistence.ProcessInstance, ) (bool, error) // UpdateProcessInstance updates a process instance. // // It returns false if the row does not exist or inst.Revision is not // current. UpdateProcessInstance( ctx context.Context, tx *sql.Tx, ak string, inst persistence.ProcessInstance, ) (bool, error) // DeleteProcessInstance deletes a process instance. // // It returns false if the row does not exist or inst.Revision is not // current. DeleteProcessInstance( ctx context.Context, tx *sql.Tx, ak string, inst persistence.ProcessInstance, ) (bool, error) // SelectProcessInstance selects a process instance's data. SelectProcessInstance( ctx context.Context, db *sql.DB, ak, hk, id string, ) (persistence.ProcessInstance, error) }
ProcessDriver is the subset of the Driver interface that is concerned with processess.
type Provider ¶
type Provider struct { // DB is the SQL database to use. DB *sql.DB // Driver is the Verity SQL driver to use with this database. If it is nil, // it is chosen automatically from one of the built-in drivers. Driver Driver // LockTTL is the interval at which the provider renews its exclusive lock // on the application's data. If it is non-positive, DefaultLockTTL is used. LockTTL time.Duration // contains filtered or unexported fields }
Provider is an implementation of provider.Provider for SQL that uses an existing open database pool.
type QueueDriver ¶
type QueueDriver interface { // InsertQueueMessage inserts a message in the queue. // // It returns false if the row already exists. InsertQueueMessage( ctx context.Context, tx *sql.Tx, ak string, m persistence.QueueMessage, ) (bool, error) // UpdateQueueMessage updates meta-data about a message that is already on // the queue. // // It returns false if the row does not exist or m.Revision is not current. UpdateQueueMessage( ctx context.Context, tx *sql.Tx, ak string, m persistence.QueueMessage, ) (bool, error) // DeleteQueueMessage deletes a message from the queue. // // It returns false if the row does not exist or m.Revision is not current. DeleteQueueMessage( ctx context.Context, tx *sql.Tx, ak string, m persistence.QueueMessage, ) (bool, error) // DeleteQueueTimeoutMessagesByProcessInstance deletes timeout messages that // were produced by a specific process instance. DeleteQueueTimeoutMessagesByProcessInstance( ctx context.Context, tx *sql.Tx, ak string, inst persistence.ProcessInstance, ) error // SelectQueueMessages selects up to n messages from the queue. SelectQueueMessages( ctx context.Context, db *sql.DB, ak string, n int, ) (*sql.Rows, error) // ScanQueueMessage scans the next message from a row-set returned by // SelectQueueMessages(). ScanQueueMessage( rows *sql.Rows, m *persistence.QueueMessage, ) error }
QueueDriver is the subset of the Driver interface that is concerned with the message queue subsystem.
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
Package mysql is a MySQL driver for the SQL persistence provider.
|
Package mysql is a MySQL driver for the SQL persistence provider. |
Package postgres is a PostgreSQL driver for the SQL persistence provider.
|
Package postgres is a PostgreSQL driver for the SQL persistence provider. |
Package sqlite is an SQlite v3 driver for the SQL persistence provider.
|
Package sqlite is an SQlite v3 driver for the SQL persistence provider. |