gorp

package
v0.0.0-...-6d3228f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 5, 2021 License: MIT Imports: 13 Imported by: 0

README

modules/gorp

This composite orm module combines Gorp with Squirrel to give you a complete solution

  • Gorp saves you time, minimizes the drudgery of getting data in and out of your database, and helps your code focus on algorithms, not infrastructure.
  • Squirrel helps you build SQL queries from composable parts

Activation

module.gorp = github.com/caeril/frevel/modules/orm/gorp

Drivers

  • sqlite3
  • postgres
  • mysql

Configuration file

# Database config
db.autoinit=true # default=true
db.driver=postgres # mysql, postgres, sqlite3

# The database connection properties individually or use the db.connection
db.host=localhost  # Use db.host /tmp/app.db is your driver is sqlite
db.user=dbuser
db.name=dbname
db.password=dbpassword

# Database connection string (host, user, dbname and other params)
db.connection=localhost port=8500 user=user dbname=mydb sslmode=disable password=ack
# If true then the database will be initialized on startup.
db.autoinit=true 

Decelerations

A global Db *DbGorp object is created in github.com/caeril/frevel/modules/gorp/app. The Db is initialized from the app.conf if db.autoinit=true.

// DB Gorp
type DbGorp struct {
   Gorp *gorp.DbMap
   // The Sql statement builder to use to build select statements
   SqlStatementBuilder sq.StatementBuilderType
}

var (
   // The database map to use to populate data
   Db = &DbGorp{}
)

Usage

If db.autoinit=true in app.conf then you can add your tables to Gorp on app start. Note that the tables are added as a function using gorp.Db.SetDbInit - this is for database thread pooling

import (
	"github.com/caeril/frevel/revel"
	"github.com/caeril/frevel/modules/gorp/app"
)
func init() {
	revel.OnAppStart(func(){
		// Register tables
		gorp.Db.SetDbInit(func(dbGorp *gorp.DbGorp) error {
			// Register tables
			gorp.Db.Map.AddTableWithName(model.MyTable{}, "my_table")
			return nil
		})		
	},5)
}
Controller

Controllers, with the gorpController.Controller embedded, have a gorp.DbGorp populated in the Controller.Db. This database is the global one that is created on startup

package controllers

import (
	"github.com/caeril/frevel/revel"
	"github.com/caeril/frevel/modules/orm/gorp/app/controllers"
)

type App struct {
	gorpController.Controller
}
type TableRow struct {
  Id int `db:"id,int64"`
}
func (c App) Index() revel.Result {
  sql,args,_ := c.Db.SqlStatementBuilder.Select("*").From("table").Limit(1).ToSql()
  row := &TableRow{}
  if err:= c.Db.Map.SelectOne(row,sql,args...); err!=nil {
    c.RenderError(err)
  }
  return c.Render(row)
}
Multiple databases

The gorp module can populate a DbGorp object for you from a gorp.DbInfo object. So if you don't want to use the global database (in the gorp module) you can initialize another anywhere in your project.

import (
   "github.com/caeril/frevel/revel"
   "github.com/caeril/frevel/modules/orm/gorp/app"
)
var (
SecondDb = &gorp.DbGorp{} 
)
func init() {
   revel.OnAppStart(func(){
   	// Create a DbInfo object with a minimum of a driver and other details
   	params := gorp.DbInfo{Driver:"postgres",DbUser:revel.Config.StringDefault("seconddb.user", "default")}
   	secondDb.Info = params
   	if err:=secondDb.InitDb(true); err!=nil {
   	  revel.Panicf("Second database failed to open %s", err.Error())
   	}
   },0)
}
Multi Channel Connections

This is not connection pooling - this is for distributing work across multiple channels to get a lot of stuff done fast. It creates a bunch workers and each worker has its own connection (On the start of the worker a status is sent in case you want to do some prework). Tasks are sent through the DbWorkContainer.InputChannel which distributes the task to whatever worker is available.

If you are using any tables that requires GORP to have initialized tables you must register the tables using gorp.Db.SetDbInit. This is the only way that this service can properly initialize the newly thread created GORP instances. Here is an example.

import (
	"github.com/caeril/frevel/revel"
	"github.com/caeril/frevel/modules/gorp/app"
)
func init() {
	revel.OnAppStart(func(){
		// Register tables
		gorp.Db.SetDbInit(func(dbGorp *gorp.DbGorp) error {
			// Register tables
			gorp.Db.Map.AddTableWithName(model.MyTable{}, "my_table")
			return nil
		})		
	},5)
}

In order to achieve this there is a gorp.DbWorkerContainer which is initialized by NewDbWorker(db *DbGorp, callBack DbCallback, numWorkers int) (container *DbWorkerContainer, err error) The DbCallback can be initialized by gorp.MakeCallback(status func(phase WorkerPhase, worker *DbWorker), work func(value interface{}, worker *DbWorker)) DbCallback ()the status function is optional) or implemented by your structure

Once the gorp.DbWorkerContainer is created tasks can be submitted to it by using the gorp.DbWorkerContainer.InputChannel<-task this will call the gorp.DbCallback.Work function passing in an instance of *DbWorker

  // Assume sourceDb is a *gorp.DbGorp instance
	workerPool := gorp.NewDbWorker(d.Db, gorp.MakeCallback(func(phase gorp.WorkerPhase, worker *gorp.DbWorker) {
	    // On start initialize some data to be used later
		if phase == gorp.Start {
			dataList, err := model.FetchData(worker.Db)
			worker.SharedData["dataList"] = dataList

		} else if phase == gorp.JobLongrunning {
			revel.AppLog.Error("Long running process detected", "worker", worker.Id)
		}
	}, func(work interface{}, worker *gorp.DbWorker) {
		dataList := worker.SharedData["dataList"].(model.DataList)
        // Whatever is sent into the workerPool.InputChannel<- will be the value
        value := work.(*SomeObject)
        
	}), 100) 
	// Set the timeout for watchdog notifications
	workerPool.LongWorkTimeout=300
	err := workerPool.Start() // Start a 100 worker threads
	if err!=nil {
	  return err
	}
	defer workerPool.Close(0) // Close, wait for channels to exit (non zero would exit after timeout)
	// var tasks[] a large list of work to be done
	tasksBlock := make([]*SomeObject,100)
	for i,task := range tasks {
	  if i>0 && i%100==0 {
        workerPool.InputChannel<-taskBlock
        tasksBlock = make([]*SomeObject)  
	  }
	  taskBlock = append(taskBlock, task)
	}
	workerPool.InputChannel<-taskBlock
	// Pool is closed on defer function, it will not return till pool closes
	return nil
Implementation notes

If your "work" is short but there is a lot of it then it is highly recommended you pass in lists of items to work on. Channels are great at providing an easy way to move data to and from threads but it is a process of synchronizing between two threads. If you pass in a single object at a time you pay that cost on every row you pass. If you do it once every 10,000 rows then the cost is minimal.

WorkParallel function

There is a handy function called WorkParallel(db *DbGorp, tasks []func(worker *DbWorker), returnResults bool, maxNumWorkers int, timeouts int) (err error) which makes it simple to do create a group of calls

  // Assume sourceDb is a *gorp.DbGorp instance
	task := func(query string) func(db *gorp.DbWorker) {
		return func(worker *gorp.DbWorker) {
			_, e := worker.Db.Map.Exec("ANALYZE " + query)
		}
	}
	gorp.WorkParallel(sourceDb, []func(worker *gorp.DbWorker){
		task("history"),
		task("summary"),
		task("daily"),
		task("monthly"),
	}, false, 0, 0)

Returning data from workers

gorp.DbWorker contains an OutputChannel which you can send the results back to, you must read from the output the same number of times that you wrote. The output size is the same size as the

  // Assume sourceDb is a *gorp.DbGorp instance
	task := func(query string) func(db *gorp.DbWorker) {
		return func(worker *gorp.DbWorker) {
			result, err := worker.Db.Map.Exec("ANALYZE " + query)
			worker.OutputChannel <- []interface{}{result,err}
		}
	}
	r,e:=gorp.WorkParallel(sourceDb, []func(worker *gorp.DbWorker){
		task("history"),
		task("daily"),
		task("monthly"),
	},true, 0, 0)

	println(r, e)
	for _,result := range r {

		key,err:= result.([]interface{})[0],result.([]interface{})[1]
		fmt.Println("Returned result",key,"error",err)
	}

Watchdog Timeouts

gorp.DbWorkerContainer contains a couple of timeout settings (in seconds) used to monitor the startup and running of the workers for the duration of the container (this is typically called a watchdog timeout).

  • StartWorkTimeout If greater then 0 this is the timeout in seconds that it takes to start a worker.
  • LongWorkTimeout If greater then 0 this is the timeout in seconds that it takes before a notification is sent to the DbCallbackImplied.StatusFn func(phase WorkerPhase, worker *DbWorker) if a worker runs past X seconds on a single task. Each worker will have their own watchdog channel and it will send a gorp.JobLongrunning and the gorp.DBWorker to the status function so you can log or investigate long running processes

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// The database map to use to populate data
	Db = &DbGorp{}
)

Functions

func InitDb

func InitDb(dbResult *DbGorp) error

Initialize the database from revel.Config

func WorkParallel

func WorkParallel(db *DbGorp, tasks []func(worker *DbWorker), returnResults bool, maxNumWorkers int, timeouts int) (results []interface{}, err error)

Creates a container to run the group of workers (up to a max of maxNumWorkers), does not return to all workers are completed) If returnResults is true then the task MUST write to the DbWorker.OutputChannel once for every task

Types

type DbCallbackImplied

type DbCallbackImplied struct {
	StatusFn func(phase WorkerPhase, worker *DbWorker)
	WorkFn   func(value interface{}, worker *DbWorker)
}

func (*DbCallbackImplied) Status

func (dbCallback *DbCallbackImplied) Status(phase WorkerPhase, worker *DbWorker)

Call the status function if available

func (*DbCallbackImplied) Work

func (dbCallback *DbCallbackImplied) Work(value interface{}, worker *DbWorker)

Calls the work function

type DbGeneric

type DbGeneric interface {
	Select(i interface{}, query string, args ...interface{}) ([]interface{}, error)
	Exec(query string, args ...interface{}) (sql.Result, error)
	Insert(list ...interface{}) error
	Update(list ...interface{}) (int64, error)
	Delete(i ...interface{}) (int64, error)
	SelectOne(holder interface{}, query string, args ...interface{}) error
}

type DbGorp

type DbGorp struct {
	Map *gorp.DbMap
	// The Sql statement builder to use to build select statements
	SqlStatementBuilder sq.StatementBuilderType
	// Database connection information
	Info *DbInfo
	// contains filtered or unexported fields
}

DB Gorp

func (*DbGorp) Begin

func (dbGorp *DbGorp) Begin() (txn *Transaction, err error)

Close the database connection

func (*DbGorp) Builder

func (dbGorp *DbGorp) Builder() sq.StatementBuilderType

func (*DbGorp) CloneDb

func (dbGorp *DbGorp) CloneDb(open bool) (newDb *DbGorp, err error)

Create a new database connection and open it from this one

func (*DbGorp) Close

func (dbGorp *DbGorp) Close() (err error)

Close the database connection

func (*DbGorp) Delete

func (dbGorp *DbGorp) Delete(i ...interface{}) (int64, error)

func (*DbGorp) ExecInsert

func (dbGorp *DbGorp) ExecInsert(builder sq.InsertBuilder) (r sql.Result, err error)

func (*DbGorp) ExecUpdate

func (dbGorp *DbGorp) ExecUpdate(builder sq.UpdateBuilder) (r sql.Result, err error)

func (*DbGorp) Get

func (dbGorp *DbGorp) Get(i interface{}, keys ...interface{}) (interface{}, error)

func (*DbGorp) GetMap

func (dbGorp *DbGorp) GetMap() DbGeneric

func (*DbGorp) InitDb

func (dbResult *DbGorp) InitDb(open bool) (err error)

func (*DbGorp) Insert

func (dbGorp *DbGorp) Insert(list ...interface{}) error

func (*DbGorp) OpenDb

func (dbGorp *DbGorp) OpenDb() (err error)

OpenDb database

func (*DbGorp) Schema

func (dbGorp *DbGorp) Schema() (result string)

func (*DbGorp) Select

func (dbGorp *DbGorp) Select(i interface{}, builder sq.SelectBuilder) (l []interface{}, err error)

func (*DbGorp) SelectInt

func (dbGorp *DbGorp) SelectInt(builder sq.SelectBuilder) (i int64, err error)

func (*DbGorp) SelectOne

func (dbGorp *DbGorp) SelectOne(i interface{}, builder sq.SelectBuilder) (err error)

func (*DbGorp) SetDbInit

func (dbGorp *DbGorp) SetDbInit(dbInitFn func(dbMap *DbGorp) error) (err error)

Used to specifiy the init function to call when database is initialized Calls the init function immediately

func (*DbGorp) TraceOff

func (dbGorp *DbGorp) TraceOff()

func (*DbGorp) TraceOn

func (dbGorp *DbGorp) TraceOn(log logger.MultiLogger)

func (*DbGorp) Update

func (dbGorp *DbGorp) Update(list ...interface{}) (int64, error)

type DbInfo

type DbInfo struct {
	DbDriver     string
	DbHost       string
	DbUser       string
	DbPassword   string
	DbName       string
	DbSchema     string
	DbConnection string
	Dialect      gorp.Dialect
}

type DbReadable

type DbReadable interface {
	Builder() sq.StatementBuilderType
	Get(i interface{}, keys ...interface{}) (interface{}, error)
	Select(i interface{}, builder sq.SelectBuilder) (l []interface{}, err error)
	SelectOne(i interface{}, builder sq.SelectBuilder) (err error)
	SelectInt(builder sq.SelectBuilder) (i int64, err error)
	GetMap() DbGeneric
	Schema() string
}

type DbWorkInfo

type DbWorkInfo interface {
	Status(phase WorkerPhase, worker *DbWorker)
	Work(value interface{}, worker *DbWorker)
}

func MakeCallback

func MakeCallback(status func(phase WorkerPhase, worker *DbWorker), work func(value interface{}, worker *DbWorker)) DbWorkInfo

A function to return an object that is a valid DbCallback

type DbWorker

type DbWorker struct {
	Id int
	Db *DbGorp
	SharedWorker
	WorkUnit       int
	SharedData     map[string]interface{}
	TimeInfo       *timeoutInfo
	TimeoutChannel chan *timeoutInfo
}

type DbWorkerContainer

type DbWorkerContainer struct {
	SharedWorker

	Workers          []*DbWorker
	NumWorkers       int
	LongWorkTimeout  int64
	StartWorkTimeout int64
	Db               *DbGorp
	// contains filtered or unexported fields
}

The worker container

func NewDbWorker

func NewDbWorker(db *DbGorp, workInfo DbWorkInfo, numWorkers int) (container *DbWorkerContainer)

This creates a DbWorkerContainer with the number of working threads already started. Each working thread has their own database instance running.

func (*DbWorkerContainer) Close

func (container *DbWorkerContainer) Close(timeouts int) (totalWork int, err error)

func (*DbWorkerContainer) Start

func (container *DbWorkerContainer) Start() (err error)

type DbWriteable

type DbWriteable interface {
	DbReadable
	Insert(list ...interface{}) error
	Update(list ...interface{}) (int64, error)
	Delete(i ...interface{}) (int64, error)
	ExecUpdate(builder sq.UpdateBuilder) (r sql.Result, err error)
	ExecInsert(builder sq.InsertBuilder) (r sql.Result, err error)
}

type SharedWorker

type SharedWorker struct {
	InputChannel   chan interface{}
	OutputChannel  chan interface{}
	ControlChannel chan func() (WorkerPhase, *DbWorker)
	// contains filtered or unexported fields
}

type Transaction

type Transaction struct {
	Map *gorpa.Transaction
	// contains filtered or unexported fields
}

This is a small wrapped around gorp.Transaction so you can make use of the builder statements as well

func (*Transaction) Builder

func (txn *Transaction) Builder() sq.StatementBuilderType

func (*Transaction) Commit

func (txn *Transaction) Commit() (err error)

func (*Transaction) Delete

func (txn *Transaction) Delete(i ...interface{}) (int64, error)

func (*Transaction) ExecInsert

func (txn *Transaction) ExecInsert(builder sq.InsertBuilder) (r sql.Result, err error)

func (*Transaction) ExecUpdate

func (txn *Transaction) ExecUpdate(builder sq.UpdateBuilder) (r sql.Result, err error)

func (*Transaction) Get

func (txn *Transaction) Get(i interface{}, keys ...interface{}) (interface{}, error)

func (*Transaction) GetMap

func (txn *Transaction) GetMap() DbGeneric

func (*Transaction) Insert

func (txn *Transaction) Insert(list ...interface{}) error

func (*Transaction) Rollback

func (txn *Transaction) Rollback() (err error)

func (*Transaction) Schema

func (txn *Transaction) Schema() (result string)

func (*Transaction) Select

func (txn *Transaction) Select(i interface{}, builder sq.SelectBuilder) (l []interface{}, err error)

func (*Transaction) SelectInt

func (txn *Transaction) SelectInt(builder sq.SelectBuilder) (i int64, err error)

func (*Transaction) SelectOne

func (txn *Transaction) SelectOne(i interface{}, builder sq.SelectBuilder) (err error)

func (*Transaction) Update

func (txn *Transaction) Update(list ...interface{}) (int64, error)

type WorkerPhase

type WorkerPhase int
const (
	Start WorkerPhase = iota
	Stop
	StartJob
	EndJob
	JobLongrunning
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL