Documentation
¶
Overview ¶
Package txn implements functions for examining and processing transaction oplog entries.
Index ¶
- Constants
- Variables
- func BuildUpdateDelteOplog(prefixField string, obj bson.D) (interface{}, error)
- func ConvertBsonD2M(input bson.D) (bson.M, map[string]struct{})
- func ConvertBsonD2MExcept(input bson.D, except map[string]struct{}) (bson.M, map[string]struct{})
- func ConvertBsonM2D(input bson.M) bson.D
- func DiffUpdateOplogToNormal(updateObj bson.D) (interface{}, error)
- func ExtraCommandName(o bson.D) (string, bool)
- func FindFiledPrefix(input bson.D, prefix string) bool
- func GetIdOrNSFromOplog(log *PartialLog) interface{}
- func GetKey(log bson.D, wanted string) interface{}
- func GetKeyWithIndex(log bson.D, wanted string) (interface{}, int)
- func Hash(hashObject interface{}) uint32
- func IsNeedFilterCommand(operation string) bool
- func IsRunOnAdminCommand(operation string) bool
- func IsSyncDataCommand(operation string) bool
- func LogEntryEncode(logs []*GenericOplog) [][]byte
- func RemoveFiled(input bson.D, key string) bson.D
- func SetFiled(input bson.D, key string, value interface{})
- func TimestampGreaterThan(lhs, rhs primitive.Timestamp) bool
- func TimestampLessThan(lhs, rhs primitive.Timestamp) bool
- func TxnOpTimeEquals(lhs TxnOpTime, rhs TxnOpTime) bool
- func TxnOpTimeGreaterThan(lhs TxnOpTime, rhs TxnOpTime) bool
- func TxnOpTimeIsEmpty(opTime TxnOpTime) bool
- func TxnOpTimeLessThan(lhs TxnOpTime, rhs TxnOpTime) bool
- type CommandOperation
- type Event
- type GenericOplog
- type Hasher
- type ParsedLog
- type PartialLog
- type PrimaryKeyHasher
- type TableHasher
- type TxnBuffer
- type TxnID
- type TxnMeta
- type TxnOpTime
- type WhiteListObjectIdHasher
Constants ¶
const ( // field in oplog OplogTsName = "ts" OplogOperationName = "op" OplogGidName = "g" // useless in change stream OplogNamespaceName = "ns" OplogObjectName = "o" OplogQueryName = "o2" OplogUniqueKeyName = "uk" // useless in change stream OplogLsidName = "lsid" OplogFromMigrateName = "fromMigrate" )
const ( ShardByID = "id" ShardByNamespace = "collection" ShardAutomatic = "auto" )
const (
DefaultHashValue = 0
)
const (
PrimaryKey = "_id"
)
Variables ¶
var ErrBufferClosed = errors.New("transaction buffer already closed")
var ErrNotTransaction = errors.New("oplog entry is not a transaction")
var ErrTxnAborted = errors.New("transaction aborted")
Functions ¶
func BuildUpdateDelteOplog ¶
func ConvertBsonD2M ¶
convert bson.D to bson.M
func ConvertBsonD2MExcept ¶
func DiffUpdateOplogToNormal ¶
"o" : { "$v" : 2, "diff" : { "d" : { "count" : false }, "u" : { "name" : "orange" }, "i" : { "c" : 11 } } }
func GetIdOrNSFromOplog ¶
func GetIdOrNSFromOplog(log *PartialLog) interface{}
func IsNeedFilterCommand ¶
func IsRunOnAdminCommand ¶
func IsSyncDataCommand ¶
func LogEntryEncode ¶
func LogEntryEncode(logs []*GenericOplog) [][]byte
func RemoveFiled ¶
pay attention: the input bson.D will be modified.
func TimestampGreaterThan ¶
TimestampGreaterThan returns true if lhs comes after rhs, false otherwise.
func TimestampLessThan ¶
TimestampLessThan returns true if lhs comes before rhs, false otherwise.
func TxnOpTimeEquals ¶
TxnOpTimeEquals returns true if lhs equals rhs, false otherwise. We first check for nil / not nil mismatches between the terms and between the hashes. Then we check for equality between the terms and between the hashes (if they exist) before checking the timestamps.
func TxnOpTimeGreaterThan ¶
TxnOpTimeGreaterThan returns true if lhs comes after rhs, false otherwise. We first check if both the terms exist. If they don't or they're equal, we compare just the timestamps.
func TxnOpTimeIsEmpty ¶
TxnOpTimeIsEmpty returns true if opTime is uninitialized, false otherwise.
func TxnOpTimeLessThan ¶
TxnOpTimeLessThan returns true if lhs comes before rhs, false otherwise. We first check if both the terms exist. If they don't or they're equal, we compare just the timestamps.
Types ¶
type CommandOperation ¶
type CommandOperation struct {
// contains filtered or unexported fields
}
type Event ¶
type Event struct { Id bson.M `bson:"_id" json:"_id"` OperationType string `bson:"operationType" json:"operationType"` FullDocument bson.D `bson:"fullDocument,omitempty" json:"fullDocument,omitempty"` // exists on "insert", "replace", "delete", "update" Ns bson.M `bson:"ns" json:"ns"` To bson.M `bson:"to,omitempty" json:"to,omitempty"` DocumentKey bson.D `bson:"documentKey,omitempty" json:"documentKey,omitempty"` // exists on "insert", "replace", "delete", "update" UpdateDescription bson.M `bson:"updateDescription,omitempty" json:"updateDescription,omitempty"` ClusterTime primitive.Timestamp `bson:"clusterTime,omitempty" json:"clusterTime,omitempty"` TxnNumber *int64 `bson:"txnNumber,omitempty" json:"txnNumber,omitempty"` LSID bson.Raw `bson:"lsid,omitempty" json:"lsid,omitempty"` }
- example: { _id : { // 存储元信息 "_data" : <BinData|hex string> // resumeToken }, "operationType" : "<operation>", // insert, delete, replace, update, drop, rename, dropDatabase, invalidate "fullDocument" : { <document> }, // 修改后的数据,出现在insert, replace, delete, update. 相当于原来的o字段 "ns" : { // 就是ns "db" : "<database>", "coll" : "<collection" }, "to" : { // 只在operationType==rename的时候有效,表示改名以后的ns "db" : "<database>", "coll" : "<collection" }, "documentKey" : { "_id" : <value> }, // 相当于o2字段。出现在insert, replace, delete, update。正常只包含_id,对于sharded collection,还包括shard key。 "updateDescription" : { // 只在operationType==update的时候出现,相当于是增量的修改,而replace是替换。 "updatedFields" : { <document> }, // 更新的field的值 "removedFields" : [ "<field>", ... ] // 删除的field列表 }, "FullDocument" : { //永不为 nil "fullDocument" : { <document> }, // 开启full_document之后,为updateLookup,不开启则为default } "clusterTime" : <Timestamp>, // 相当于ts字段 "txnNumber" : <NumberLong>, // 相当于oplog里面的txnNumber,只在事务里面出现。事务号在一个事务里面单调递增 "lsid" : { // 相当于lsid字段,只在事务里面出现。logic session id,请求所在的session的id。 "id" : <UUID>, "uid" : <BinData> } }
type GenericOplog ¶
type GenericOplog struct { Raw []byte Parsed *PartialLog }
func GatherApplyOps ¶
func GatherApplyOps(input []*PartialLog) (*GenericOplog, error)
type Hasher ¶
type Hasher interface {
DistributeOplogByMod(log *PartialLog, mod int) uint32
}
type ParsedLog ¶
type ParsedLog struct { Timestamp primitive.Timestamp `bson:"ts" json:"ts"` Term *int64 `bson:"t" json:"t"` Hash *int64 `bson:"h" json:"h"` Version int `bson:"v" json:"v"` Operation string `bson:"op" json:"op"` Gid string `bson:"g,omitempty" json:"g,omitempty"` Namespace string `bson:"ns" json:"ns"` Object bson.D `bson:"o" json:"o"` Query bson.D `bson:"o2" json:"o2"` // update condition UniqueIndexes bson.M `bson:"uk,omitempty" json:"uk,omitempty"` // LSID bson.Raw `bson:"lsid,omitempty" json:"lsid,omitempty"` // mark the session id, used in transaction FromMigrate bool `bson:"fromMigrate,omitempty" json:"fromMigrate,omitempty"` // move chunk TxnNumber *int64 `bson:"txnNumber,omitempty" json:"txnNumber,omitempty"` // transaction number in session DocumentKey bson.D `bson:"documentKey,omitempty" json:"documentKey,omitempty"` // exists when source collection is sharded, only including shard key and _id PrevOpTime bson.Raw `bson:"prevOpTime,omitempty"` UI *primitive.Binary `bson:"ui,omitempty" json:"ui,omitempty"` // do not enable currently }
func ExtractInnerOps ¶
ExtractInnerOps doc.applyOps[i].ts(Let ckpt use the last ts to judge complete)
applyOps[0 - n-1].ts = doc.ts - 1 applyOps[n-1].ts = doc.ts
type PartialLog ¶
type PartialLog struct { ParsedLog /* * Every field subsequent declared is NEVER persistent or * transfer on network connection. They only be parsed from * respective logic */ UniqueIndexesUpdates bson.M // generate by CollisionMatrix RawSize int // generate by Decorator SourceId int // generate by Validator }
func ConvertEvent2Oplog ¶
func ConvertEvent2Oplog(input []byte, fulldoc bool) (*PartialLog, error)
func LogParsed ¶
func LogParsed(logs []*GenericOplog) []*PartialLog
func NewPartialLog ¶
func NewPartialLog(data bson.M) *PartialLog
func (*PartialLog) Dump ¶
func (partialLog *PartialLog) Dump(keys map[string]struct{}, all bool) bson.D
dump according to the given keys, "all" == true means ignore keys
func (*PartialLog) String ¶
func (partialLog *PartialLog) String() string
type PrimaryKeyHasher ¶
type PrimaryKeyHasher struct {
Hasher
}
******************************************* PrimaryKeyHasher
func (*PrimaryKeyHasher) DistributeOplogByMod ¶
func (objectIdHasher *PrimaryKeyHasher) DistributeOplogByMod(log *PartialLog, mod int) uint32
we need to ensure that oplog entry will be sent to the same job[$hash] if they have the same ObjectID. thus we can consume the oplog entry sequentially
type TableHasher ¶
type TableHasher struct {
Hasher
}
******************************************* PrimaryKeyHasher
func (*TableHasher) DistributeOplogByMod ¶
func (collectionHasher *TableHasher) DistributeOplogByMod(log *PartialLog, mod int) uint32
type TxnBuffer ¶
TxnBuffer stores transaction oplog entries until they are needed to commit them to a desination. It includes a WaitGroup for tracking all goroutines across all transactions for use in global shutdown.
func (*TxnBuffer) AddOp ¶
AddOp sends a transaction oplog entry to a background goroutine (starting one for a new transaction TxnID) for asynchronous pre-processing and storage. If the oplog entry is not a transaction, an error will be returned. Any errors during processing can be discovered later via the error channel from `GetTxnStream`.
Must not be called concurrently with other transaction-related operations. Must not be called for a given transaction after starting to stream that transaction.
func (*TxnBuffer) GetTxnStream ¶
GetTxnStream returns a channel of Oplog entries in a transaction and a channel for errors. If the buffer has been stopped, the returned op channel will be closed and the error channel will have an error on it.
Must not be called concurrently with other transaction-related operations. For a given transaction, it must not be called until after a final oplog entry has been passed to AddOp and it must not be called more than once.
func (*TxnBuffer) OldestOpTime ¶
OldestOpTime returns the optime of the oldest buffered transaction, or an empty optime if no transactions are buffered. This will include committed transactions until they are purged.
func (*TxnBuffer) PurgeTxn ¶
PurgeTxn closes any transaction streams in progress and deletes all oplog entries associated with a transaction.
Must not be called concurrently with other transaction-related operations. For a given transaction, it must not be called until after a final oplog entry has been passed to AddOp and it must not be called more than once.
type TxnID ¶
type TxnID struct {
// contains filtered or unexported fields
}
TxnID wraps fields needed to uniquely identify a transaction for use as a map key. The 'lsid' is a string rather than bson.Raw or []byte so that this type is a valid map key.
type TxnMeta ¶
type TxnMeta struct {
// contains filtered or unexported fields
}
TxnMeta holds information extracted from an oplog entry for later routing logic. Zero value means 'not a transaction'. We store 'prevOpTime' as string so the struct is comparable.
func NewTxnMeta ¶
NewTxnMeta extracts transaction metadata from an oplog entry. A non-transaction will return a zero-value TxnMeta struct, not an error.
Currently there is no way for this to error, but that may change in the future if we change the db.Oplog.Object to bson.Raw, so the API is designed with failure as a possibility.
func (TxnMeta) IsCommit ¶
IsCommit is true if the oplog entry was an abort command or was the final entry of an unprepared transaction.
func (TxnMeta) IsCommitOp ¶
IsCommitOp is commitTransaction oplog
func (TxnMeta) IsFinal ¶
IsFinal is true if the oplog entry is the closing entry of a transaction, i.e. if IsAbort or IsCommit is true.
func (TxnMeta) IsMultiOp ¶
IsMultiOp is true if the oplog entry is part of a prepared and/or large transaction.
type TxnOpTime ¶
type TxnOpTime struct { Timestamp primitive.Timestamp `json:"timestamp"` Term *int64 `json:"term"` Hash *int64 `json:"hash"` }
TxnOpTime represents the values to uniquely identify an oplog entry. An TxnOpTime must always have a timestamp, but may or may not have a term. The hash is set uniquely up until (and including) version 4.0, but is set to zero in version 4.2+ with plans to remove it soon (see SERVER-36334).
func GetTxnOpTimeFromOplogEntry ¶
GetTxnOpTimeFromOplogEntry returns an TxnOpTime struct from the relevant fields in an ParsedLog struct.
type WhiteListObjectIdHasher ¶
type WhiteListObjectIdHasher struct { Hasher TableHasher PrimaryKeyHasher // contains filtered or unexported fields }
******************************************* WhiteListObjectIdHasher: hash by collection in general, when hit white list, hash by _id
func NewWhiteListObjectIdHasher ¶
func NewWhiteListObjectIdHasher(whiteList []string) *WhiteListObjectIdHasher
func (*WhiteListObjectIdHasher) DistributeOplogByMod ¶
func (wloi *WhiteListObjectIdHasher) DistributeOplogByMod(log *PartialLog, mod int) uint32