dtmimp

package
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 7, 2024 License: BSD-3-Clause Imports: 17 Imported by: 0

README

Notice

Please donot use this package, and this package should only be used in dtm internally. The interfaces are not stable, and package name has postfix "imp"

Documentation

Index

Constants

View Source
const (
	// ResultFailure for result of a trans/trans branch
	// Same as HTTP status 409 and GRPC code 10
	ResultFailure = "FAILURE"
	// ResultSuccess for result of a trans/trans branch
	// Same as HTTP status 200 and GRPC code 0
	ResultSuccess = "SUCCESS"
	// ResultOngoing for result of a trans/trans branch
	// Same as HTTP status 425 and GRPC code 9
	ResultOngoing = "ONGOING"

	// OpTry branch type for TCC
	OpTry = "try"
	// OpConfirm branch type for TCC
	OpConfirm = "confirm"
	// OpCancel branch type for TCC
	OpCancel = "cancel"
	// OpAction branch type for message, SAGA, XA
	OpAction = "action"
	// OpCompensate branch type for SAGA
	OpCompensate = "compensate"
	// OpCommit branch type for XA
	OpCommit = "commit"
	// OpRollback branch type for XA
	OpRollback = "rollback"

	// DBTypeMysql const for driver mysql
	DBTypeMysql = "mysql"
	// DBTypePostgres const for driver postgres
	DBTypePostgres = "postgres"
	// DBTypeRedis const for driver redis
	DBTypeRedis = "redis"
	// Jrpc const for json-rpc
	Jrpc = "json-rpc"
	// JrpcCodeFailure const for json-rpc failure
	JrpcCodeFailure = -32901

	// JrpcCodeOngoing const for json-rpc ongoing
	JrpcCodeOngoing = -32902

	// MsgDoBranch0 const for DoAndSubmit barrier branch
	MsgDoBranch0 = "00"
	// MsgDoBarrier1 const for DoAndSubmit barrier barrierID
	MsgDoBarrier1 = "01"
	// MsgDoOp const for DoAndSubmit barrier op
	MsgDoOp = "msg"
	//MsgTopicPrefix const for Add topic msg
	MsgTopicPrefix = "topic://"

	// XaBarrier1 const for xa barrier id
	XaBarrier1 = "01"

	// ProtocolGRPC const for protocol grpc
	ProtocolGRPC = "grpc"
	// ProtocolHTTP const for protocol http
	ProtocolHTTP = "http"
)

Variables

View Source
var BarrierTableName = "dtm_barrier.barrier"

BarrierTableName the table name of barrier table

View Source
var ErrDuplicated = errors.New("DUPLICATED")

ErrDuplicated error of DUPLICATED for only msg if QueryPrepared executed before call. then DoAndSubmit return this error

View Source
var ErrFailure = errors.New("FAILURE")

ErrFailure error of FAILURE

View Source
var ErrOngoing = errors.New("ONGOING")

ErrOngoing error of ONGOING

View Source
var FatalIfError = logger.FatalIfError

FatalIfError fatal if error is not nil Deprecated: use logger.FatalIfError

View Source
var LogIfFatalf = logger.FatalfIf

LogIfFatalf fatal if cond is true Deprecated: use logger.FatalfIf

View Source
var LogRedf = logger.Errorf

LogRedf an alias of Errorf Deprecated: use logger.Errorf

View Source
var Logf = logger.Infof

Logf an alias of Infof Deprecated: use logger.Errorf

View Source
var MapFailure = map[string]interface{}{"dtm_result": ResultFailure}

MapFailure HTTP result of FAILURE

View Source
var MapSuccess = map[string]interface{}{"dtm_result": ResultSuccess}

MapSuccess HTTP result of SUCCESS

Functions

func AddRestyMiddlewares

func AddRestyMiddlewares(client *resty.Client)

AddRestyMiddlewares will add the middlewares used by dtm

func AsError

func AsError(x interface{}) error

AsError wrap a panic value as an error

func CatchP

func CatchP(f func()) (rerr error)

CatchP catch panic to error

func DBExec

func DBExec(dbType string, db DB, sql string, values ...interface{}) (affected int64, rerr error)

DBExec use raw db to exec

func DBExecForGoPg added in v0.0.4

func DBExecForGoPg(dbType string, db *pg.Tx, sql string, values ...interface{}) (affected int64, rerr error)

func DeferDo

func DeferDo(rerr *error, success func() error, fail func() error)

DeferDo a common defer do used in dtmcli/dtmgrpc

func E2P

func E2P(err error)

E2P error to panic

func Escape

func Escape(input string) string

Escape solve CodeQL reported problem

func EscapeGet

func EscapeGet(qs url.Values, key string) string

EscapeGet escape get

func GetCurrentDBType

func GetCurrentDBType() string

GetCurrentDBType get currentDBType

func GetDsn

func GetDsn(conf DBConf) string

GetDsn get dsn from map config

func GetFuncName

func GetFuncName() string

GetFuncName get current call func name

func GetRestyClient2

func GetRestyClient2(timeout time.Duration) *resty.Client

GetRestyClient2 will return a resty client with timeout set

func If

func If(condition bool, trueObj interface{}, falseObj interface{}) interface{}

If ternary operator

func InsertBarrier

func InsertBarrier(tx DB, transType string, gid string, branchID string, op string, barrierID string, reason string, dbType string, barrierTableName string) (int64, error)

InsertBarrier insert a record to barrier

func InsertBarrierForGoPg added in v0.0.4

func InsertBarrierForGoPg(tx *pg.Tx, transType string, gid string, branchID string, op string, barrierID string, reason string, dbType string, barrierTableName string) (int64, error)

func MayReplaceLocalhost

func MayReplaceLocalhost(host string) string

MayReplaceLocalhost when run in docker compose, change localhost to host.docker.internal for accessing host network

func MustAtoi

func MustAtoi(s string) int

MustAtoi is string to int

func MustMarshal

func MustMarshal(v interface{}) []byte

MustMarshal checked version for marshal

func MustMarshalString

func MustMarshalString(v interface{}) string

MustMarshalString string version of MustMarshal

func MustRemarshal

func MustRemarshal(from interface{}, to interface{})

MustRemarshal marshal and unmarshal, and check error

func MustUnmarshal

func MustUnmarshal(b []byte, obj interface{})

MustUnmarshal checked version for unmarshal

func MustUnmarshalString

func MustUnmarshalString(s string, obj interface{})

MustUnmarshalString string version of MustUnmarshal

func OrString

func OrString(ss ...string) string

OrString return the first not empty string

func P2E

func P2E(perr *error)

P2E panic to error

func PanicIf

func PanicIf(cond bool, err error)

PanicIf name is clear

func PooledDB

func PooledDB(conf DBConf) (*sql.DB, error)

PooledDB get pooled sql.DB

func RespAsErrorByJSONRPC

func RespAsErrorByJSONRPC(resp *resty.Response) error

RespAsErrorByJSONRPC translate json rpc resty response to error

func SetCurrentDBType

func SetCurrentDBType(dbType string)

SetCurrentDBType set currentDBType

func StandaloneDB

func StandaloneDB(conf DBConf) (*sql.DB, error)

StandaloneDB get a standalone db instance

func TransCallDtm

func TransCallDtm(tb *TransBase, operation string) error

TransCallDtm is the short call for TransCallDtmExt

func TransCallDtmExt

func TransCallDtmExt(tb *TransBase, body interface{}, operation string) (*resty.Response, error)

TransCallDtmExt TransBase call dtm

func TransRegisterBranch

func TransRegisterBranch(tb *TransBase, added map[string]string, operation string) error

TransRegisterBranch TransBase register a branch to dtm

func TransRequestBranch

func TransRequestBranch(t *TransBase, method string, body interface{}, branchID string, op string, url string) (*resty.Response, error)

TransRequestBranch TransBase request branch result

func XaClose

func XaClose(db *sql.DB)

XaClose will log and close the db

func XaDB

func XaDB(conf DBConf) (*sql.DB, error)

XaDB return a standalone db instance for xa

func XaHandleGlobalTrans

func XaHandleGlobalTrans(xa *TransBase, callDtm func(string) error, callBusi func() error) (rerr error)

XaHandleGlobalTrans http/grpc GlobalTransaction shared func

func XaHandleLocalTrans

func XaHandleLocalTrans(xa *TransBase, dbConf DBConf, cb func(*sql.DB) error) (rerr error)

XaHandleLocalTrans public handler of LocalTransaction via http/grpc

func XaHandlePhase2

func XaHandlePhase2(gid string, dbConf DBConf, branchID string, op string) error

XaHandlePhase2 Handle the callback of commit/rollback

Types

type BranchIDGen

type BranchIDGen struct {
	BranchID string
	// contains filtered or unexported fields
}

BranchIDGen used to generate a sub branch id

func (*BranchIDGen) CurrentSubBranchID

func (g *BranchIDGen) CurrentSubBranchID() string

CurrentSubBranchID return current branchID

func (*BranchIDGen) NewSubBranchID

func (g *BranchIDGen) NewSubBranchID() string

NewSubBranchID generate a sub branch id

type DB

type DB interface {
	Exec(query string, args ...interface{}) (sql.Result, error)
	QueryRow(query string, args ...interface{}) *sql.Row
}

DB inteface of dtmcli db

type DBConf

type DBConf struct {
	Driver   string `yaml:"Driver"`
	Host     string `yaml:"Host"`
	Port     int64  `yaml:"Port"`
	User     string `yaml:"User"`
	Password string `yaml:"Password"`
	Db       string `yaml:"Db"`
	Schema   string `yaml:"Schema"`
}

DBConf defines db config

type DBSpecial

type DBSpecial interface {
	GetPlaceHoldSQL(sql string) string
	GetInsertIgnoreTemplate(tableAndValues string, pgConstraint string) string
	GetXaSQL(command string, xid string) string
}

DBSpecial db specific operations

func GetDBSpecial

func GetDBSpecial(dbType string) DBSpecial

GetDBSpecial get DBSpecial for currentDBType

type TransBase

type TransBase struct {
	Gid        string `json:"gid"` //  NOTE: unique in storage, can customize the generation rules instead of using server-side generation, it will help with the tracking
	TransType  string `json:"trans_type"`
	Dtm        string `json:"-"`
	CustomData string `json:"custom_data,omitempty"` // nosql data persistence
	TransOptions
	Context context.Context `json:"-" gorm:"-"`

	Steps       []map[string]string `json:"steps,omitempty"`    // use in MSG/SAGA
	Payloads    []string            `json:"payloads,omitempty"` // used in MSG/SAGA
	BinPayloads [][]byte            `json:"-"`
	BranchIDGen `json:"-"`          // used in XA/TCC
	Op          string              `json:"-"` // used in XA/TCC

	QueryPrepared  string `json:"query_prepared,omitempty"` // used in MSG
	Protocol       string `json:"protocol"`
	RollbackReason string `json:"rollback_reason,omitempty" gorm:"-"`
}

TransBase base for all trans

func NewTransBase

func NewTransBase(gid string, transType string, dtm string, branchID string) *TransBase

NewTransBase new a TransBase

func TransBaseFromQuery

func TransBaseFromQuery(qs url.Values) *TransBase

TransBaseFromQuery construct transaction info from request

func (*TransBase) WithGlobalTransRequestTimeout

func (t *TransBase) WithGlobalTransRequestTimeout(timeout int64)

WithGlobalTransRequestTimeout defines global trans request timeout

func (*TransBase) WithRetryLimit

func (t *TransBase) WithRetryLimit(retryLimit int64)

WithRetryLimit defines global trans retry limit

type TransOptions

type TransOptions struct {
	WaitResult     bool              `json:"wait_result,omitempty" gorm:"-"`
	TimeoutToFail  int64             `json:"timeout_to_fail,omitempty" gorm:"-"` // for trans type: xa, tcc, unit: second
	RequestTimeout int64             `json:"request_timeout,omitempty" gorm:"-"` // for global trans resets request timeout, unit: second
	RetryInterval  int64             `json:"retry_interval,omitempty" gorm:"-"`  // for trans type: msg saga xa tcc, unit: second
	BranchHeaders  map[string]string `json:"branch_headers,omitempty" gorm:"-"`  // custom branch headers,  dtm server => service api
	Concurrent     bool              `json:"concurrent" gorm:"-"`                // for trans type: saga msg
	RetryLimit     int64             `json:"retry_limit,omitempty" gorm:"-"`     // for trans type: saga
	RetryCount     int64             `json:"retry_count,omitempty" gorm:"-"`     // for trans type: saga
}

TransOptions transaction options

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL