Ch

package
v1.4202.2154 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 2, 2024 License: MIT Imports: 16 Imported by: 0

README

Clickhouse Adapter and ORM Generator

This package provides automatic migration (only for adding new columns at the last position; reordering columns or changing the data type of an order key is not supported). This package can also be used to generate an ORM.

Dependencies

go install github.com/fatih/gomodifytags@latest
go install github.com/kokizzu/replacer@latest

Generated ORM example

image

How to create a connection

import (
	"database/sql"
	"fmt"

	_ "github.com/ClickHouse/clickhouse-go/v2"
)

// ConnectClickhouse builds a DSN from CLICKHOUSE_HOST and CLICKHOUSE_PORT (with
// debug=true) and opens it through database/sql using the `clickhouse` driver
// name. Any error from sql.Open is handed to L.PanicIf together with the DSN.
func ConnectClickhouse() *sql.DB {
	dsn := fmt.Sprintf("tcp://%s:%s?debug=true", CLICKHOUSE_HOST, CLICKHOUSE_PORT)
	db, err := sql.Open(`clickhouse`, dsn)
	L.PanicIf(err, `sql.Open `+dsn)
	return db
}

// then use it like this (DB gets the opened handle, Reconnect gets the
// connect function itself, not its result):
ch := &Ch.Adapter{DB: conf.ConnectClickhouse(), Reconnect: conf.ConnectClickhouse}

Usage

  1. create a model/ directory inside your project
  2. create a model/m[Domain] directory, for example if the business domain is authentication, you might want to create mAuth
  3. create a [domain]_tables.go something like this:
package mAuth

import "github.com/kokizzu/gotro/D/Ch"

// TableUserLogs is the name of the userLogs table.
const TableUserLogs Ch.TableName = `userLogs`

// Column names of the userLogs table.
const (
	CreatedAt = `createdAt`
	RequestId = `requestId`
	Error     = `error`
	ActorId   = `actorId`
	IpAddr4   = `ipAddr4`
	IpAddr6   = `ipAddr6`
	UserAgent = `userAgent`
)

// ClickhouseTables describes every ClickHouse table of this domain: its
// columns (in order) and its ORDER BY keys. It is the input for both
// Ch.GenerateOrm and Adapter.MigrateTables.
var ClickhouseTables = map[Ch.TableName]*Ch.TableProp{
	TableUserLogs: {
		// Keyed fields keep the literals valid if Ch.Field ever gains or
		// reorders fields (and satisfy `go vet`'s composites check).
		Fields: []Ch.Field{
			{Name: CreatedAt, Type: Ch.DateTime},
			{Name: RequestId, Type: Ch.UInt64},
			{Name: ActorId, Type: Ch.UInt64},
			{Name: Error, Type: Ch.String},
			{Name: IpAddr4, Type: Ch.IPv4},
			{Name: IpAddr6, Type: Ch.IPv6},
			{Name: UserAgent, Type: Ch.String},
			// add more columns here (append only: automatic migration
			// supports new columns at the last position only)
		},
		Orders: []string{ActorId, RequestId},
	},
}

// GenerateORM runs the ORM generator for every table declared in
// ClickhouseTables.
func GenerateORM() {
	Ch.GenerateOrm(ClickhouseTables) 
}
  4. create a [domain]_generator_test.go something like this:
package mAuth

import (
	"testing"
)

//go:generate go test -run=XXX -bench=Benchmark_GenerateOrm

// Benchmark_GenerateOrm is not a real benchmark: it only serves as an entry
// point (see the go:generate directive above) that calls GenerateORM and then
// skips, so no timing loop is run.
func Benchmark_GenerateOrm(b *testing.B) {
	GenerateORM()
	b.SkipNow()
}
  5. run the test to generate the new ORM; this generates the sa[Domain]/sa[Domain]__ORM.GEN.go file. You might want to create a helper script for that:
#!/usr/bin/env bash
# Runs every //go:generate directive found in ./model, regenerates the ORM of
# each ./model/m* directory, then runs the //go:generate directives of the
# sub-directories below it. The generators' stdout is collected in /tmp/1.log.

# Abort rather than run the generators from the wrong directory.
cd ./model || exit 1
cat *.go | grep '//go:generate ' | cut -d ' ' -f 2- | bash -x > /tmp/1.log

for i in ./m*; do
  if [[ ! -d "$i" ]] ; then continue ; fi
  echo "$i"
  pushd .
  # If the directory cannot be entered, restore the stack and skip it.
  cd "$i" || { popd ; continue ; }

  # generate ORM
  go test -bench=.

  for j in ./*; do
    echo "$j"
    if [[ ! -d "$j" ]] ; then continue ; fi

    pushd .
    cd "$j" || { popd ; continue ; }
    pwd
    cat *.go | grep '//go:generate ' | cut -d ' ' -f 2- | bash -x >> /tmp/1.log
    popd

  done

  popd

done
  6. in your web server engine/domain logic (the one that initializes dependencies), create methods to help initialize the buffer, something like this:
// Domain bundles the dependencies used by the business-logic methods.
type Domain struct {
	// Click is the ClickHouse adapter; its DB is passed to every timed buffer.
	// chBuffers holds one timed insert buffer per table (see InitClickhouseBuffer).
	// waitGroup counts the buffers that handleTermSignal must wait for on shutdown.
	Click     *Ch.Adapter
	chBuffers map[Ch.TableName]*chBuffer.TimedBuffer
	waitGroup *sync.WaitGroup
	// add more dependency initialization here
}

// InitClickhouseBuffer creates one timed insert buffer per entry of
// preparators and registers it under its table name.
//
// The buffer map and the wait group are created on demand, so the method is
// also safe on a Domain that was not built by NewDomain. A table that already
// has a buffer is left untouched: registering it again would replace a buffer
// that is never closed and would raise the wait-group counter above the number
// of buffers handleTermSignal marks as done, blocking shutdown forever.
func (d *Domain) InitClickhouseBuffer(preparators map[Ch.TableName]chBuffer.Preparator) {
	if d.chBuffers == nil {
		d.chBuffers = make(map[Ch.TableName]*chBuffer.TimedBuffer, len(preparators))
	}
	if d.waitGroup == nil {
		d.waitGroup = &sync.WaitGroup{}
	}
	for tableName, preparator := range preparators {
		if _, exists := d.chBuffers[tableName]; exists {
			continue
		}
		// NOTE(review): 30000 and 1*time.Second are presumably the buffer
		// capacity and the flush interval — confirm against chBuffer.NewTimedBuffer.
		chb := chBuffer.NewTimedBuffer(d.Click.DB, 30000, 1*time.Second, preparator)
		// NOTE(review): presumably disabled because handleTermSignal closes
		// the buffers itself — confirm against chBuffer.
		chb.IgnoreInterrupt = true
		d.chBuffers[tableName] = chb
		d.waitGroup.Add(1) // matched by one Done per buffer in handleTermSignal
	}
}

// WaitInterrupt blocks until the process receives SIGINT, SIGTERM, SIGHUP or
// SIGQUIT, then logs which signal was caught.
func (d *Domain) WaitInterrupt() {
	// Buffer size 1: signal.Notify does not block when sending, so with an
	// unbuffered channel a signal arriving before the receive below would be lost.
	interrupt := make(chan os.Signal, 1)
	// One registration covers all signals (os.Interrupt is SIGINT).
	// SIGKILL is not listed because it cannot be caught.
	signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)

	// Log the received signal, not the channel value.
	sig := <-interrupt
	L.Print(`caught signal`, sig)
}

// handleTermSignal blocks in WaitInterrupt; once it returns, every registered
// buffer is closed in its own goroutine, each goroutine waits for that
// buffer's final flush and marks the wait group done. When all buffers are
// flushed the process exits with status 0.
func (d *Domain) handleTermSignal() {
	d.WaitInterrupt()
	for name, buf := range d.chBuffers {
		go func(name Ch.TableName, buf *chBuffer.TimedBuffer) {
			buf.Close()
			<-buf.WaitFinalFlush
			L.Print(`done waiting: ` + name)
			d.waitGroup.Done()
		}(name, buf)
	}
	d.waitGroup.Wait()
	os.Exit(0)
}

// Statistics queues row for insertion into the buffer registered for the
// row's table. It panics when no buffer was registered for that table via
// InitClickhouseBuffer.
func (d *Domain) Statistics(row AnalyticsRow) {
	tableName := row.TableName()
	buf := d.chBuffers[tableName]
	if buf == nil {
		panic(`did you forgot to register InitClickhouseBuffer preparators for ` + string(tableName))
	}
	buf.Insert(row.SqlInsertParam())
}


// NewDomain connects to ClickHouse, registers the insert buffers of every
// known table and starts the goroutine that flushes them when a termination
// signal arrives.
func NewDomain() *Domain {
	d := &Domain{
		// Keyed fields, consistent with the other Ch.Adapter literals, so the
		// literal does not depend on the field order of Ch.Adapter.
		Click: &Ch.Adapter{
			DB:        conf.ConnectClickhouse(),
			Reconnect: conf.ConnectClickhouse,
		},
		chBuffers: map[Ch.TableName]*chBuffer.TimedBuffer{},
		waitGroup: &sync.WaitGroup{},
	}
	d.InitClickhouseBuffer(saAuth.Preparators)
	// add more preparators if there's new clickhouse tables on model

	// Started last so every buffer is registered before a signal can arrive.
	go d.handleTermSignal()
	return d
}
  7. the last step is to call the Domain.Statistics method to insert a new log, something like this:

func (d *Domain) BusinessLogic1(in *BusinessLogic1_In) (out BusinessLogic1_Out) {
	
	// do something else
	
	d.Statistics(saAuth.UserLogs{
		CreatedAt: in.Now(),
		RequestId: ctx.RequestId,
		Error ctx.Error,
		ActorId: session.UserId,
		IpAddr4: ctx.RemoteAddr4,
		IpAddr6: ctx.RemoteAddr6,
		UserAgent: session.UserAgent,
	})
	
}
  8. if you need to create an extension method for the ORM, just add a new file at sa[Domain]/anything.go with a new method on the generated ORM struct, something like this:
package saAuth

import (
	"github.com/kokizzu/gotro/I"
	"github.com/kokizzu/gotro/L"
	"github.com/kokizzu/gotro/X"
)

// TopUser is one row of the FindTop1k result.
type TopUser struct {
	UserId uint64 // scanned from the actor id column
	Count  int64  // scanned from COUNT(1)
	Rank   int64  // 1-based position of the row within the returned page
}

// FindTop1k returns up to 1000 actors ordered by their number of log rows
// within the last daySpan days, skipping the first offset actors.
//
// Errors are reported through L.IsError; on a query error the result is nil,
// on a scan or iteration error the rows read so far are returned.
// NOTE(review): Rank always starts at 1, even when offset > 0 — confirm that a
// page-relative rank (rather than offset+1) is intended.
func (m *UserLogs) FindTop1k(daySpan int, offset int64) (res []TopUser) {
	query := `
SELECT ` + m.SqlActorId() + `
	, COUNT(1) 
FROM ` + m.SqlTableName() + ` 
WHERE ` + m.SqlCreatedAt() + ` >= subtractDays(now(),` + I.ToStr(daySpan) + `) 
GROUP BY ` + m.SqlActorId() + ` 
ORDER BY COUNT(1) DESC
	,  MAX(` + m.SqlCreatedAt() + `)
LIMIT 1000
OFFSET ` + X.ToS(offset) + `
` // note: for string, use S.Z or S.XSS to prevent SQL injection
	rows, err := m.Adapter.Query(query)
	if L.IsError(err, `failed query: `+query) {
		return
	}
	defer rows.Close()
	rankNo := int64(1)
	for rows.Next() {
		row := TopUser{Rank: rankNo}
		if err := rows.Scan(&row.UserId, &row.Count); L.IsError(err, `rows.Scan`) {
			return
		}
		rankNo++
		res = append(res, row)
	}
	// rows.Next returns false both at end of data and on failure; only
	// rows.Err tells the two apart, so report it instead of silently
	// returning a truncated list.
	if L.IsError(rows.Err(), `rows.Err`) {
		return
	}
	return
}

  9. to initialize automatic migration, just create model/run_migration.go:
// RunMigration opens a ClickHouse adapter and migrates the tables declared in
// mAuth.ClickhouseTables.
func RunMigration() {
	L.Print(`run migration..`)
	adapter := &Ch.Adapter{
		DB:        ConnectClickhouse(),
		Reconnect: ConnectClickhouse,
	}
	adapter.MigrateTables(mAuth.ClickhouseTables)
	// add other clickhouse tables to be migrated here
}

Then call it from main:

// main runs the ClickHouse migration.
func main() {
	model.RunMigration()
}

Documentation

Index

Constants

View Source
// Engine name constants: each value is spelled exactly like its identifier.
// NOTE(review): presumably intended for TableProp.Engine / MVProp.Engine — confirm.
const AggregatingMergeTree = `AggregatingMergeTree`
View Source
const Buffer = `Buffer`
View Source
// CodecLz4hc is a codec expression, not an engine name.
// NOTE(review): presumably intended for TableProp.DefaultCodec — confirm.
const CodecLz4hc = `CODEC(LZ4HC)`
View Source
const CollapsingMergeTree = `CollapsingMergeTree`
View Source
const Dictionary = `Dictionary`
View Source
const Distributed = `Distributed`
View Source
const EmbeddedRocksDB = `EmbeddedRocksDB`
View Source
const File = `File`
View Source
const GraphiteMergeTree = `GraphiteMergeTree`
View Source
const HDFS = `HDFS`
View Source
const JDBC = `JDBC`
View Source
const Join = `Join`
View Source
const Kafka = `Kafka`
View Source
const Log = `Log`
View Source
const MaterializedView = `MaterializedView`
View Source
const Memory = `Memory`
View Source
const Merge = `Merge`
View Source
const MergeTree = `MergeTree`
View Source
const MongoDB = `MongoDB`
View Source
const MySQL = `MySQL`
View Source
const Null = `Null`
View Source
const ODBC = `ODBC`
View Source
const PostgreSQL = `PostgreSQL`
View Source
const RabbitMQ = `RabbitMQ`
View Source
const ReplacingMergeTree = `ReplacingMergeTree`
View Source
const S3 = `S3`
View Source
const Set = `Set`
View Source
const StripeLog = `StripeLog`
View Source
const SummingMergeTree = `SummingMergeTree`
View Source
const TinyLog = `TinyLog`
View Source
const URL = `URL`
View Source
const VersionedCollapsingMergeTree = `VersionedCollapsingMergeTree`
View Source
const View = `View`

Variables

View Source
// DEBUG is a package-level flag that defaults to true.
// NOTE(review): presumably toggles debug output of the adapter — confirm in the package source.
var DEBUG = true
View Source
// TypeToConst maps every DataType to the Go expression that names its
// constant (the constant's identifier prefixed with `Ch.`).
var TypeToConst = map[DataType]string{
	DateTime:    `Ch.DateTime`,
	DateTime64:  `Ch.DateTime64`,
	Decimal:     `Ch.Decimal`,
	FixedString: `Ch.FixedString`,
	Float32:     `Ch.Float32`,
	Float64:     `Ch.Float64`,
	IPv4:        `Ch.IPv4`,
	IPv6:        `Ch.IPv6`,
	Int16:       `Ch.Int16`,
	Int32:       `Ch.Int32`,
	Int64:       `Ch.Int64`,
	Int8:        `Ch.Int8`,
	String:      `Ch.String`,
	UInt16:      `Ch.UInt16`,
	UInt32:      `Ch.UInt32`,
	UInt64:      `Ch.UInt64`,
	// UInt8 is a declared DataType and needs an entry like every other type;
	// without it a lookup yields the empty string.
	UInt8: `Ch.UInt8`,
}

Functions

func CheckClickhouseTables

func CheckClickhouseTables(cConn *Adapter, tables map[TableName]*TableProp)

func Connect1

func Connect1(user, pass, host, port, dbName string, debug bool) *sql.DB

func ConnectLocal

func ConnectLocal(host, port string) *sql.DB

func Descr

func Descr(args ...any)

func GenerateOrm

func GenerateOrm(tables map[TableName]*TableProp)

Types

type Adapter

// Adapter wraps a database/sql handle; the migration and table-creation
// methods are defined on it.
type Adapter struct {
	// Embedded handle: the methods of *sql.DB are available directly on Adapter.
	*sql.DB
	// Reconnect returns a *sql.DB.
	// NOTE(review): presumably called to obtain a fresh handle after the connection is lost — confirm in the package source.
	Reconnect func() *sql.DB
}

func (*Adapter) AlterMissingColumns

func (a *Adapter) AlterMissingColumns(tableName TableName, props *TableProp) bool

func (*Adapter) CreateMaterializedViews added in v1.3817.2143

func (a *Adapter) CreateMaterializedViews(mvName MaterializedViewName, props *MVProp) bool

func (*Adapter) CreateTable

func (a *Adapter) CreateTable(tableName TableName, props *TableProp) bool

func (*Adapter) MigrateTables

func (a *Adapter) MigrateTables(tables map[TableName]*TableProp)

func (*Adapter) UpsertTable

func (a *Adapter) UpsertTable(tableName TableName, props *TableProp) bool

type ChDockerTest

// ChDockerTest holds the settings for a ClickHouse container driven through
// dockertest; its ImageLatest and ImageVersion methods return
// *dockertest.RunOptions and ConnectCheck returns a *sql.DB.
// NOTE(review): the fields are presumably the container credentials, database
// name, image name and port — confirm against SetDefaults.
type ChDockerTest struct {
	User     string
	Password string
	Database string
	Image    string
	Port     string
	// contains filtered or unexported fields
}

func (*ChDockerTest) ConnectCheck

func (in *ChDockerTest) ConnectCheck(res *dockertest.Resource) (conn *sql.DB, err error)

func (*ChDockerTest) ImageLatest

func (in *ChDockerTest) ImageLatest(pool *D.DockerTest) *dockertest.RunOptions

func (*ChDockerTest) ImageVersion

func (in *ChDockerTest) ImageVersion(pool *D.DockerTest, version string) *dockertest.RunOptions

ImageVersion https://hub.docker.com/r/clickhouse/clickhouse-server

func (*ChDockerTest) SetDefaults

func (in *ChDockerTest) SetDefaults(img string)

type DataType

// DataType is the string type of Field.Type and the key type of TypeToConst.
type DataType string

SELECT DISTINCT alias_to FROM system.data_type_families ORDER BY alias_to ASC

// Known DataType values; each value is spelled exactly like its identifier.
const (
	DateTime    DataType = `DateTime`
	DateTime64  DataType = `DateTime64`
	Decimal     DataType = `Decimal`
	FixedString DataType = `FixedString`
	Float32     DataType = `Float32`
	Float64     DataType = `Float64`
	IPv4        DataType = `IPv4`
	IPv6        DataType = `IPv6`
	Int16       DataType = `Int16`
	Int32       DataType = `Int32`
	Int64       DataType = `Int64`
	Int8        DataType = `Int8`
	String      DataType = `String`
	UInt16      DataType = `UInt16`
	UInt32      DataType = `UInt32`
	UInt64      DataType = `UInt64`
	UInt8       DataType = `UInt8`
)

type Field

// Field describes one table column: its name and its DataType.
type Field struct {
	Name string
	Type DataType
}

type MVProp added in v1.3817.2143

// MVProp is the definition passed to Adapter.CreateMaterializedViews.
// NOTE(review): the fields presumably name the source table and columns the
// view selects from, plus the view's engine, partition keys and order keys —
// confirm in the package source.
type MVProp struct {
	SourceTable   string
	SourceColumns []string
	Engine        string
	Partitions    []string
	Orders        []string
}

type MaterializedViewName added in v1.3817.2143

// MaterializedViewName is the string type of the view name passed to
// Adapter.CreateMaterializedViews.
type MaterializedViewName string

type TableName

// TableName is the string type used as the key of the table maps passed to
// GenerateOrm, CheckClickhouseTables and Adapter.MigrateTables.
type TableName string

type TableProp

// TableProp is the definition of one table: its columns plus table-level
// settings.
// NOTE(review): Engine presumably takes one of the engine name constants,
// Partitions/Orders the partition and order key column names, and
// DefaultCodec a codec expression such as CodecLz4hc; the behavior when
// Engine or DefaultCodec is empty is not visible here — confirm.
type TableProp struct {
	Fields     []Field
	Engine     string
	Partitions []string
	Orders     []string

	DefaultCodec string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL