Documentation ¶
Index ¶
- Constants
- Variables
- func AddRestyMiddlewares(client *resty.Client)
- func AsError(x interface{}) error
- func CatchP(f func()) (rerr error)
- func DBExec(dbType string, db DB, sql string, values ...interface{}) (affected int64, rerr error)
- func DeferDo(rerr *error, success func() error, fail func() error)
- func E2P(err error)
- func Escape(input string) string
- func EscapeGet(qs url.Values, key string) string
- func GetCurrentDBType() string
- func GetDsn(conf DBConf) string
- func GetFuncName() string
- func GetRestyClient2(timeout time.Duration) *resty.Client
- func If(condition bool, trueObj interface{}, falseObj interface{}) interface{}
- func InsertBarrier(tx DB, transType string, gid string, branchID string, op string, ...) (int64, error)
- func MayReplaceLocalhost(host string) string
- func MustAtoi(s string) int
- func MustMarshal(v interface{}) []byte
- func MustMarshalString(v interface{}) string
- func MustRemarshal(from interface{}, to interface{})
- func MustUnmarshal(b []byte, obj interface{})
- func MustUnmarshalString(s string, obj interface{})
- func OrString(ss ...string) string
- func P2E(perr *error)
- func PanicIf(cond bool, err error)
- func PooledDB(conf DBConf) (*sql.DB, error)
- func RespAsErrorByJSONRPC(resp *resty.Response) error
- func SetCurrentDBType(dbType string)
- func StandaloneDB(conf DBConf) (*sql.DB, error)
- func TransCallDtm(tb *TransBase, operation string) error
- func TransCallDtmExt(tb *TransBase, body interface{}, operation string) (*resty.Response, error)
- func TransRegisterBranch(tb *TransBase, added map[string]string, operation string) error
- func TransRequestBranch(t *TransBase, method string, body interface{}, branchID string, op string, ...) (*resty.Response, error)
- func XaClose(db *sql.DB)
- func XaDB(conf DBConf) (*sql.DB, error)
- func XaHandleGlobalTrans(xa *TransBase, callDtm func(string) error, callBusi func() error) (rerr error)
- func XaHandleLocalTrans(xa *TransBase, dbConf DBConf, cb func(*sql.DB) error) (rerr error)
- func XaHandlePhase2(gid string, dbConf DBConf, branchID string, op string) error
- type BranchIDGen
- type DB
- type DBConf
- type DBSpecial
- type TransBase
- type TransOptions
Constants ¶
const ( // ResultFailure for result of a trans/trans branch // Same as HTTP status 409 and GRPC code 10 ResultFailure = "FAILURE" // ResultSuccess for result of a trans/trans branch // Same as HTTP status 200 and GRPC code 0 ResultSuccess = "SUCCESS" // ResultOngoing for result of a trans/trans branch // Same as HTTP status 425 and GRPC code 9 ResultOngoing = "ONGOING" // OpTry branch type for TCC OpTry = "try" // OpConfirm branch type for TCC OpConfirm = "confirm" // OpCancel branch type for TCC OpCancel = "cancel" // OpAction branch type for message, SAGA, XA OpAction = "action" // OpCompensate branch type for SAGA OpCompensate = "compensate" // OpCommit branch type for XA OpCommit = "commit" // OpRollback branch type for XA OpRollback = "rollback" // DBTypeMysql const for driver mysql DBTypeMysql = "mysql" // DBTypePostgres const for driver postgres DBTypePostgres = "postgres" // DBTypeRedis const for driver redis DBTypeRedis = "redis" // Jrpc const for json-rpc Jrpc = "json-rpc" // JrpcCodeFailure const for json-rpc failure JrpcCodeFailure = -32901 // JrpcCodeOngoing const for json-rpc ongoing JrpcCodeOngoing = -32902 // MsgDoBranch0 const for DoAndSubmit barrier branch MsgDoBranch0 = "00" // MsgDoBarrier1 const for DoAndSubmit barrier barrierID MsgDoBarrier1 = "01" // MsgDoOp const for DoAndSubmit barrier op MsgDoOp = "msg" //MsgTopicPrefix const for Add topic msg MsgTopicPrefix = "topic://" // XaBarrier1 const for xa barrier id XaBarrier1 = "01" // ProtocolGRPC const for protocol grpc ProtocolGRPC = "grpc" // ProtocolHTTP const for protocol http ProtocolHTTP = "http" )
Variables ¶
var BarrierTableName = "dtm_barrier.barrier"
BarrierTableName the table name of barrier table
var ErrDuplicated = errors.New("DUPLICATED")
ErrDuplicated is the DUPLICATED error, used only for msg transactions: if QueryPrepared was executed before the call, then DoAndSubmit returns this error.
var ErrFailure = errors.New("FAILURE")
ErrFailure error of FAILURE
var ErrOngoing = errors.New("ONGOING")
ErrOngoing error of ONGOING
var FatalIfError = logger.FatalIfError
FatalIfError fatal if error is not nil. Deprecated: use logger.FatalIfError
var LogIfFatalf = logger.FatalfIf
LogIfFatalf fatal if cond is true. Deprecated: use logger.FatalfIf
var LogRedf = logger.Errorf
LogRedf an alias of Errorf. Deprecated: use logger.Errorf
var Logf = logger.Infof
Logf an alias of Infof. Deprecated: use logger.Infof
var MapFailure = map[string]interface{}{"dtm_result": ResultFailure}
MapFailure HTTP result of FAILURE
var MapSuccess = map[string]interface{}{"dtm_result": ResultSuccess}
MapSuccess HTTP result of SUCCESS
Functions ¶
func AddRestyMiddlewares ¶
func AddRestyMiddlewares(client *resty.Client)
AddRestyMiddlewares will add the middlewares used by dtm
func GetRestyClient2 ¶
GetRestyClient2 will return a resty client with timeout set
func If ¶
func If(condition bool, trueObj interface{}, falseObj interface{}) interface{}
If ternary operator
func InsertBarrier ¶
func InsertBarrier(tx DB, transType string, gid string, branchID string, op string, barrierID string, reason string, dbType string, barrierTableName string) (int64, error)
InsertBarrier insert a record to barrier
func MayReplaceLocalhost ¶
MayReplaceLocalhost changes localhost to host.docker.internal when run in docker compose, for accessing the host network
func MustMarshalString ¶
func MustMarshalString(v interface{}) string
MustMarshalString string version of MustMarshal
func MustRemarshal ¶
func MustRemarshal(from interface{}, to interface{})
MustRemarshal marshal and unmarshal, and check error
func MustUnmarshal ¶
func MustUnmarshal(b []byte, obj interface{})
MustUnmarshal checked version for unmarshal
func MustUnmarshalString ¶
func MustUnmarshalString(s string, obj interface{})
MustUnmarshalString string version of MustUnmarshal
func RespAsErrorByJSONRPC ¶
func RespAsErrorByJSONRPC(resp *resty.Response) error
RespAsErrorByJSONRPC translate json rpc resty response to error
func StandaloneDB ¶
StandaloneDB get a standalone db instance
func TransCallDtm ¶
TransCallDtm is the short call for TransCallDtmExt
func TransCallDtmExt ¶
TransCallDtmExt TransBase call dtm
func TransRegisterBranch ¶
TransRegisterBranch TransBase register a branch to dtm
func TransRequestBranch ¶
func TransRequestBranch(t *TransBase, method string, body interface{}, branchID string, op string, url string) (*resty.Response, error)
TransRequestBranch TransBase request branch result
func XaHandleGlobalTrans ¶
func XaHandleGlobalTrans(xa *TransBase, callDtm func(string) error, callBusi func() error) (rerr error)
XaHandleGlobalTrans http/grpc GlobalTransaction shared func
func XaHandleLocalTrans ¶
XaHandleLocalTrans public handler of LocalTransaction via http/grpc
Types ¶
type BranchIDGen ¶
type BranchIDGen struct { BranchID string // contains filtered or unexported fields }
BranchIDGen used to generate a sub branch id
func (*BranchIDGen) CurrentSubBranchID ¶
func (g *BranchIDGen) CurrentSubBranchID() string
CurrentSubBranchID return current branchID
func (*BranchIDGen) NewSubBranchID ¶
func (g *BranchIDGen) NewSubBranchID() string
NewSubBranchID generate a sub branch id
type DB ¶
type DB interface { Exec(query string, args ...interface{}) (sql.Result, error) QueryRow(query string, args ...interface{}) *sql.Row }
DB interface of dtmcli db
type DBConf ¶
type DBConf struct { Driver string `yaml:"Driver"` Host string `yaml:"Host"` Port int64 `yaml:"Port"` User string `yaml:"User"` Password string `yaml:"Password"` Db string `yaml:"Db"` Schema string `yaml:"Schema"` }
DBConf defines db config
type DBSpecial ¶
type DBSpecial interface { GetPlaceHoldSQL(sql string) string GetInsertIgnoreTemplate(tableAndValues string, pgConstraint string) string GetXaSQL(command string, xid string) string }
DBSpecial db specific operations
func GetDBSpecial ¶
GetDBSpecial get DBSpecial for currentDBType
type TransBase ¶
type TransBase struct { Gid string `json:"gid"` // NOTE: unique in storage, can customize the generation rules instead of using server-side generation, it will help with the tracking TransType string `json:"trans_type"` Dtm string `json:"-"` CustomData string `json:"custom_data,omitempty"` // nosql data persistence TransOptions Context context.Context `json:"-" gorm:"-"` Steps []map[string]string `json:"steps,omitempty"` // use in MSG/SAGA Payloads []string `json:"payloads,omitempty"` // used in MSG/SAGA BinPayloads [][]byte `json:"-"` BranchIDGen `json:"-"` // used in XA/TCC Op string `json:"-"` // used in XA/TCC QueryPrepared string `json:"query_prepared,omitempty"` // used in MSG Protocol string `json:"protocol"` RollbackReason string `json:"rollback_reason,omitempty" gorm:"-"` }
TransBase base for all trans
func NewTransBase ¶
NewTransBase creates a new TransBase
func TransBaseFromQuery ¶
TransBaseFromQuery construct transaction info from request
func (*TransBase) WithGlobalTransRequestTimeout ¶
WithGlobalTransRequestTimeout defines global trans request timeout
func (*TransBase) WithRetryLimit ¶
WithRetryLimit defines global trans retry limit
type TransOptions ¶
type TransOptions struct { WaitResult bool `json:"wait_result,omitempty" gorm:"-"` TimeoutToFail int64 `json:"timeout_to_fail,omitempty" gorm:"-"` // for trans type: xa, tcc, unit: second RequestTimeout int64 `json:"request_timeout,omitempty" gorm:"-"` // for global trans resets request timeout, unit: second RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc, unit: second BranchHeaders map[string]string `json:"branch_headers,omitempty" gorm:"-"` // custom branch headers, dtm server => service api Concurrent bool `json:"concurrent" gorm:"-"` // for trans type: saga msg RetryLimit int64 `json:"retry_limit,omitempty" gorm:"-"` // for trans type: saga RetryCount int64 `json:"retry_count,omitempty" gorm:"-"` // for trans type: saga }
TransOptions transaction options