dtmimp

package
v1.14.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 11, 2022 License: BSD-3-Clause Imports: 16 Imported by: 0

README

Notice

Please do not use this package; it should only be used internally by dtm. The interfaces are not stable, and the package name has the postfix "imp".

Documentation

Index

Constants

View Source
// Shared string and numeric constants: trans/branch results, branch
// operation names, database driver names, JSON-RPC values, barrier
// identifiers and protocol names.
const (
	// ResultFailure is the FAILURE result of a trans/trans branch.
	// Same as HTTP status 409 and gRPC code 10.
	ResultFailure = "FAILURE"
	// ResultSuccess is the SUCCESS result of a trans/trans branch.
	// Same as HTTP status 200 and gRPC code 0.
	ResultSuccess = "SUCCESS"
	// ResultOngoing is the ONGOING result of a trans/trans branch.
	// Same as HTTP status 425 and gRPC code 9.
	ResultOngoing = "ONGOING"

	// OpTry is the "try" branch type for TCC.
	OpTry = "try"
	// OpConfirm is the "confirm" branch type for TCC.
	OpConfirm = "confirm"
	// OpCancel is the "cancel" branch type for TCC.
	OpCancel = "cancel"
	// OpAction is the "action" branch type for message, SAGA and XA.
	OpAction = "action"
	// OpCompensate is the "compensate" branch type for SAGA.
	OpCompensate = "compensate"
	// OpCommit is the "commit" branch type for XA.
	OpCommit = "commit"
	// OpRollback is the "rollback" branch type for XA.
	OpRollback = "rollback"

	// DBTypeMysql is the name of the mysql driver.
	DBTypeMysql = "mysql"
	// DBTypePostgres is the name of the postgres driver.
	DBTypePostgres = "postgres"
	// DBTypeRedis is the name of the redis driver.
	DBTypeRedis = "redis"
	// Jrpc is the name used for json-rpc.
	Jrpc = "json-rpc"
	// JrpcCodeFailure is the json-rpc code for failure.
	JrpcCodeFailure = -32901

	// JrpcCodeOngoing is the json-rpc code for ongoing.
	JrpcCodeOngoing = -32902

	// MsgDoBranch0 is the barrier branch used by DoAndSubmit.
	MsgDoBranch0 = "00"
	// MsgDoBarrier1 is the barrier barrierID used by DoAndSubmit.
	MsgDoBarrier1 = "01"
	// MsgDoOp is the barrier op used by DoAndSubmit.
	MsgDoOp = "msg"

	// XaBarrier1 is the barrier id used for xa.
	XaBarrier1 = "01"

	// ProtocolGRPC is the name of the grpc protocol.
	ProtocolGRPC = "grpc"
	// ProtocolHTTP is the name of the http protocol.
	ProtocolHTTP = "http"
)

Variables

View Source
var BarrierTableName = "dtm_barrier.barrier"

BarrierTableName the table name of barrier table

View Source
var ErrDuplicated = errors.New("DUPLICATED")

ErrDuplicated is the DUPLICATED error, used only for msg: if QueryPrepared was executed before the call, DoAndSubmit returns this error.

View Source
var ErrFailure = errors.New("FAILURE")

ErrFailure error of FAILURE

View Source
var ErrOngoing = errors.New("ONGOING")

ErrOngoing error of ONGOING

View Source
var FatalIfError = logger.FatalIfError

FatalIfError fatal if error is not nil. Deprecated: use logger.FatalIfError.

View Source
var LogIfFatalf = logger.FatalfIf

LogIfFatalf fatal if cond is true. Deprecated: use logger.FatalfIf.

View Source
var LogRedf = logger.Errorf

LogRedf an alias of Errorf. Deprecated: use logger.Errorf.

View Source
var Logf = logger.Infof

Logf an alias of Infof. Deprecated: use logger.Infof.

View Source
var MapFailure = map[string]interface{}{"dtm_result": ResultFailure}

MapFailure HTTP result of FAILURE

View Source
var MapSuccess = map[string]interface{}{"dtm_result": ResultSuccess}

MapSuccess HTTP result of SUCCESS

View Source
var PassthroughHeaders = []string{}

PassthroughHeaders will be passed to every sub-trans call

View Source
var RestyClient = resty.New()

RestyClient the resty object

View Source
var XaSQLTimeoutMs = 15000

XaSQLTimeoutMs milliseconds for Xa sql to timeout

Functions

func AsError

func AsError(x interface{}) error

AsError wrap a panic value as an error

func CatchP

func CatchP(f func()) (rerr error)

CatchP catch panic to error

func DBExec

func DBExec(dbType string, db DB, sql string, values ...interface{}) (affected int64, rerr error)

DBExec use raw db to exec

func DeferDo

func DeferDo(rerr *error, success func() error, fail func() error)

DeferDo a common defer do used in dtmcli/dtmgrpc

func E2P

func E2P(err error)

E2P error to panic

func Escape

func Escape(input string) string

Escape solve CodeQL reported problem

func EscapeGet

func EscapeGet(qs url.Values, key string) string

EscapeGet escape get

func GetCurrentDBType

func GetCurrentDBType() string

GetCurrentDBType get currentDBType

func GetDsn

func GetDsn(conf DBConf) string

GetDsn get dsn from map config

func GetFuncName

func GetFuncName() string

GetFuncName get current call func name

func If

func If(condition bool, trueObj interface{}, falseObj interface{}) interface{}

If ternary operator

func InsertBarrier

func InsertBarrier(tx DB, transType string, gid string, branchID string, op string, barrierID string, reason string, dbType string, barrierTableName string) (int64, error)

InsertBarrier insert a record to barrier

func MayReplaceLocalhost

func MayReplaceLocalhost(host string) string

MayReplaceLocalhost when run in docker compose, change localhost to host.docker.internal for accessing host network

func MustAtoi

func MustAtoi(s string) int

MustAtoi is string to int

func MustMarshal

func MustMarshal(v interface{}) []byte

MustMarshal checked version for marshal

func MustMarshalString

func MustMarshalString(v interface{}) string

MustMarshalString string version of MustMarshal

func MustRemarshal

func MustRemarshal(from interface{}, to interface{})

MustRemarshal marshal and unmarshal, and check error

func MustUnmarshal

func MustUnmarshal(b []byte, obj interface{})

MustUnmarshal checked version for unmarshal

func MustUnmarshalString

func MustUnmarshalString(s string, obj interface{})

MustUnmarshalString string version of MustUnmarshal

func OrString

func OrString(ss ...string) string

OrString return the first not empty string

func P2E

func P2E(perr *error)

P2E panic to error

func PanicIf

func PanicIf(cond bool, err error)

PanicIf name is clear

func PooledDB

func PooledDB(conf DBConf) (*sql.DB, error)

PooledDB get pooled sql.DB

func RespAsErrorByT

func RespAsErrorByT(resp *resty.Response) error

RespAsErrorByT translate a Takumi response to error

func RespAsErrorCompatible

func RespAsErrorCompatible(resp *resty.Response) error

RespAsErrorCompatible translate a resty response to error compatible with version < v1.10

func SetCurrentDBType

func SetCurrentDBType(dbType string)

SetCurrentDBType set currentDBType

func StandaloneDB

func StandaloneDB(conf DBConf) (*sql.DB, error)

StandaloneDB get a standalone db instance

func TransCallDtm

func TransCallDtm(tb *TransBase, body interface{}, operation string) error

TransCallDtm TransBase call dtm

func TransRegisterBranch

func TransRegisterBranch(tb *TransBase, added map[string]string, operation string) error

TransRegisterBranch TransBase register a branch to dtm

func TransRequestBranch

func TransRequestBranch(t *TransBase, method string, body interface{}, branchID string, op string, url string) (*resty.Response, error)

TransRequestBranch TransBase request branch result

func XaDB

func XaDB(conf DBConf) (*sql.DB, error)

XaDB return a standalone db instance for xa

func XaHandleGlobalTrans

func XaHandleGlobalTrans(xa *TransBase, callDtm func(string) error, callBusi func() error) (rerr error)

XaHandleGlobalTrans http/grpc GlobalTransaction shared func

func XaHandleLocalTrans

func XaHandleLocalTrans(xa *TransBase, dbConf DBConf, cb func(*sql.DB) error) (rerr error)

XaHandleLocalTrans public handler of LocalTransaction via http/grpc

func XaHandlePhase2

func XaHandlePhase2(gid string, dbConf DBConf, branchID string, op string) error

XaHandlePhase2 Handle the callback of commit/rollback

Types

type BranchIDGen

// BranchIDGen is used to generate sub branch ids; see its methods
// NewSubBranchID and CurrentSubBranchID.
type BranchIDGen struct {
	// BranchID is the only exported field.
	// NOTE(review): presumably the parent branch id that generated sub
	// branch ids are derived from — confirm against the implementation.
	BranchID string
	// contains filtered or unexported fields
}

BranchIDGen used to generate a sub branch id

func (*BranchIDGen) CurrentSubBranchID

func (g *BranchIDGen) CurrentSubBranchID() string

CurrentSubBranchID return current branchID

func (*BranchIDGen) NewSubBranchID

func (g *BranchIDGen) NewSubBranchID() string

NewSubBranchID generate a sub branch id

type DB

// DB is the database interface used by dtmcli. It requires only Exec and
// QueryRow, whose signatures match the methods of the same names on
// *sql.DB and *sql.Tx from database/sql.
type DB interface {
	// Exec runs a query with the given args and returns its sql.Result.
	Exec(query string, args ...interface{}) (sql.Result, error)
	// QueryRow runs a query with the given args and returns a *sql.Row.
	QueryRow(query string, args ...interface{}) *sql.Row
}

DB interface of dtmcli db

type DBConf

// DBConf defines the db config. Every field carries a yaml tag whose key
// is identical to the field name.
// NOTE(review): Driver presumably takes one of the DBType* constant
// values — confirm against GetDsn.
type DBConf struct {
	Driver   string `yaml:"Driver"`
	Host     string `yaml:"Host"`
	Port     int64  `yaml:"Port"`
	User     string `yaml:"User"`
	Password string `yaml:"Password"`
	Db       string `yaml:"Db"`
}

DBConf defines db config

type DBSpecial

// DBSpecial groups the db specific operations. An implementation is
// obtained with GetDBSpecial(dbType).
type DBSpecial interface {
	// GetPlaceHoldSQL takes a sql string and returns a sql string.
	// NOTE(review): presumably rewrites placeholders into the database's
	// own placeholder syntax — confirm against the implementations.
	GetPlaceHoldSQL(sql string) string
	// GetInsertIgnoreTemplate returns a string built from tableAndValues
	// and pgConstraint.
	// NOTE(review): presumably an "insert, ignoring duplicates" statement
	// template, with pgConstraint used only for postgres — confirm.
	GetInsertIgnoreTemplate(tableAndValues string, pgConstraint string) string
	// GetXaSQL returns a string built from command and xid.
	// NOTE(review): presumably the XA statement for the given command and
	// transaction id — confirm.
	GetXaSQL(command string, xid string) string
}

DBSpecial db specific operations

func GetDBSpecial

func GetDBSpecial(dbType string) DBSpecial

GetDBSpecial get DBSpecial for the given dbType

type TResp

// TResp is a response payload with a numeric return code and a message,
// encoded in JSON as "retcode" and "message".
// NOTE(review): presumably the body decoded by RespAsErrorByT — confirm.
type TResp struct {
	RetCode int32  `json:"retcode"`
	Message string `json:"message"`
}

type TransBase

// TransBase is the base for all trans. It embeds TransOptions and
// BranchIDGen. Fields tagged `json:"-"` (Dtm, Context, BinPayloads,
// BranchIDGen, Op) are excluded from JSON encoding.
type TransBase struct {
	Gid        string `json:"gid"` //  NOTE: unique in storage; the generation rules can be customized instead of using server-side generation, which helps with tracking
	TransType  string `json:"trans_type"`
	Dtm        string `json:"-"`
	CustomData string `json:"custom_data,omitempty"` // nosql data persistence
	TransOptions
	Context context.Context `json:"-" gorm:"-"`

	Steps       []map[string]string `json:"steps,omitempty"`    // used in MSG/SAGA
	Payloads    []string            `json:"payloads,omitempty"` // used in MSG/SAGA
	BinPayloads [][]byte            `json:"-"`
	BranchIDGen `json:"-"`          // used in XA/TCC
	Op          string              `json:"-"` // used in XA/TCC

	QueryPrepared string `json:"query_prepared,omitempty"` // used in MSG
	Protocol      string `json:"protocol"`
}

TransBase base for all trans

func NewTransBase

func NewTransBase(gid string, transType string, dtm string, branchID string) *TransBase

NewTransBase new a TransBase

func TransBaseFromQuery

func TransBaseFromQuery(qs url.Values) *TransBase

TransBaseFromQuery construct transaction info from request

func (*TransBase) WithGlobalTransRequestTimeout

func (t *TransBase) WithGlobalTransRequestTimeout(timeout int64)

WithGlobalTransRequestTimeout defines global trans request timeout

type TransOptions

// TransOptions holds the transaction options. Every field is tagged
// `gorm:"-"`; all fields except Concurrent use `omitempty` in their JSON
// tag.
type TransOptions struct {
	WaitResult         bool              `json:"wait_result,omitempty" gorm:"-"`
	TimeoutToFail      int64             `json:"timeout_to_fail,omitempty" gorm:"-"`     // for trans types xa and tcc; unit: second
	RequestTimeout     int64             `json:"request_timeout,omitempty" gorm:"-"`     // resets the request timeout for the global trans; unit: second
	RetryInterval      int64             `json:"retry_interval,omitempty" gorm:"-"`      // for trans types msg, saga, xa and tcc; unit: second
	PassthroughHeaders []string          `json:"passthrough_headers,omitempty" gorm:"-"` // for inheriting the specified gin context headers
	BranchHeaders      map[string]string `json:"branch_headers,omitempty" gorm:"-"`      // custom branch headers, dtm server => service api
	Concurrent         bool              `json:"concurrent" gorm:"-"`                    // for trans types saga and msg
}

TransOptions transaction options

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL