Documentation
¶
Index ¶
- Constants
- func ConvertBsonD2M(input bson.D) (bson.M, map[string]struct{})
- func ConvertBsonD2MExcept(input bson.D, except map[string]struct{}) (bson.M, map[string]struct{})
- func ConvertBsonM2D(input bson.M) bson.D
- func ExtraCommandName(o bson.D) (string, bool)
- func GetIdOrNSFromOplog(log *PartialLog) interface{}
- func GetKey(log bson.D, wanted string) interface{}
- func GetKeyWithIndex(log bson.D, wanted string) (interface{}, int)
- func Hash(hashObject interface{}) uint32
- func IsRunOnAdminCommand(operation string) bool
- func IsSyncDataCommand(operation string) bool
- func LogEntryEncode(logs []*GenericOplog) [][]byte
- func ParseTimestampFromBson(intput []byte) bson.MongoTimestamp
- func RemoveFiled(input bson.D, key string) bson.D
- func SetFiled(input bson.D, key string, value interface{})
- type CommandOperation
- type Event
- type GenericOplog
- type Hasher
- type ParsedLog
- type PartialLog
- type PrimaryKeyHasher
- type TableHasher
- type WhiteListObjectIdHasher
Constants ¶
View Source
const ( // field in oplog OplogTsName = "ts" OplogOperationName = "op" OplogGidName = "g" // useless in change stream OplogNamespaceName = "ns" OplogObjectName = "o" OplogQueryName = "o2" OplogUniqueKeyName = "uk" // useless in change stream OplogLsidName = "lsid" OplogFromMigrateName = "fromMigrate" )
View Source
const ( ShardByID = "id" ShardByNamespace = "collection" ShardAutomatic = "auto" )
View Source
const (
DefaultHashValue = 0
)
View Source
const (
PrimaryKey = "_id"
)
Variables ¶
This section is empty.
Functions ¶
func ConvertBsonD2M ¶
convert bson.D to bson.M
func ConvertBsonD2MExcept ¶
func GetIdOrNSFromOplog ¶
func GetIdOrNSFromOplog(log *PartialLog) interface{}
func IsRunOnAdminCommand ¶
func IsSyncDataCommand ¶
func LogEntryEncode ¶
func LogEntryEncode(logs []*GenericOplog) [][]byte
func ParseTimestampFromBson ¶
func ParseTimestampFromBson(intput []byte) bson.MongoTimestamp
func RemoveFiled ¶
pay attention: the input bson.D will be modified.
Types ¶
type CommandOperation ¶
type CommandOperation struct {
// contains filtered or unexported fields
}
type Event ¶
type Event struct { Id bson.M `bson:"_id" json:"_id"` OperationType string `bson:"operationType" json:"operationType"` FullDocument bson.D `bson:"fullDocument,omitempty" json:"fullDocument,omitempty"` // exists on "insert", "replace", "delete", "update" Ns bson.M `bson:"ns" json:"ns"` To bson.M `bson:"to,omitempty" json:"to,omitempty"` DocumentKey bson.M `bson:"documentKey,omitempty" json:"documentKey,omitempty"` // exists on "insert", "replace", "delete", "update" UpdateDescription bson.M `bson:"updateDescription,omitempty" json:"updateDescription,omitempty"` ClusterTime bson.MongoTimestamp `bson:"clusterTime,omitempty" json:"clusterTime,omitempty"` TxnNumber uint64 `bson:"txnNumber,omitempty" json:"txnNumber,omitempty"` Lsid bson.M `bson:"lsid,omitempty" json:"lsid,omitempty"` }
- example: { _id : { // 存储元信息 "_data" : <BinData|hex string> // resumeToken }, "operationType" : "<operation>", // insert, delete, replace, update, drop, rename, dropDatabase, invalidate "fullDocument" : { <document> }, // 修改后的数据,出现在insert, replace, delete, update. 相当于原来的o字段 "ns" : { // 就是ns "db" : "<database>", "coll" : "<collection" }, "to" : { // 只在operationType==rename的时候有效,表示改名以后的ns "db" : "<database>", "coll" : "<collection" }, "documentKey" : { "_id" : <value> }, // 相当于o2字段。出现在insert, replace, delete, update。正常只包含_id,对于sharded collection,还包括shard key。 "updateDescription" : { // 只在operationType==update的时候出现,相当于是增量的修改,而replace是替换。 "updatedFields" : { <document> }, // 更新的field的值 "removedFields" : [ "<field>", ... ] // 删除的field列表 }, "FullDocument" : { //永不为 nil "fullDocument" : { <document> }, // 开启full_document之后,为updateLookup,不开启则为default } "clusterTime" : <Timestamp>, // 相当于ts字段 "txnNumber" : <NumberLong>, // 相当于oplog里面的txnNumber,只在事务里面出现。事务号在一个事务里面单调递增 "lsid" : { // 相当于lsid字段,只在事务里面出现。logic session id,请求所在的session的id。 "id" : <UUID>, "uid" : <BinData> } }
type GenericOplog ¶
type GenericOplog struct { Raw []byte Parsed *PartialLog }
func GatherApplyOps ¶
func GatherApplyOps(input []*PartialLog) (*GenericOplog, error)
type Hasher ¶
type Hasher interface {
DistributeOplogByMod(log *PartialLog, mod int) uint32
}
type ParsedLog ¶
type ParsedLog struct { Timestamp bson.MongoTimestamp `bson:"ts" json:"ts"` HistoryId int64 `bson:"h,omitempty" json:"h,omitempty"` Version int `bson:"v,omitempty" json:"v,omitempty"` Operation string `bson:"op" json:"op"` Gid string `bson:"g,omitempty" json:"g,omitempty"` Namespace string `bson:"ns" json:"ns"` Object bson.D `bson:"o" json:"o"` Query bson.M `bson:"o2" json:"o2"` UniqueIndexes bson.M `bson:"uk,omitempty" json:"uk,omitempty"` Lsid bson.M `bson:"lsid,omitempty" json:"lsid,omitempty"` // mark the session id, used in transaction FromMigrate bool `bson:"fromMigrate,omitempty" json:"fromMigrate,omitempty"` // move chunk TxnNumber uint64 `bson:"txnNumber,omitempty" json:"txnNumber,omitempty"` // transaction number in session DocumentKey bson.M `bson:"documentKey,omitempty" json:"documentKey,omitempty"` // exists when source collection is sharded, only including shard key and _id }
type PartialLog ¶
type PartialLog struct { ParsedLog /* * Every field subsequent declared is NEVER persistent or * transfer on network connection. They only be parsed from * respective logic */ UniqueIndexesUpdates bson.M // generate by CollisionMatrix RawSize int // generate by Decorator SourceId int // generate by Validator }
func ConvertEvent2Oplog ¶
func ConvertEvent2Oplog(input []byte, fulldoc bool) (*PartialLog, error)
func LogParsed ¶
func LogParsed(logs []*GenericOplog) []*PartialLog
func NewPartialLog ¶
func NewPartialLog(data bson.M) *PartialLog
func (*PartialLog) Dump ¶
func (partialLog *PartialLog) Dump(keys map[string]struct{}, all bool) bson.D
dump according to the given keys, "all" == true means ignore keys
func (*PartialLog) String ¶
func (partialLog *PartialLog) String() string
type PrimaryKeyHasher ¶
type PrimaryKeyHasher struct {
Hasher
}
******************************************* PrimaryKeyHasher
func (*PrimaryKeyHasher) DistributeOplogByMod ¶
func (objectIdHasher *PrimaryKeyHasher) DistributeOplogByMod(log *PartialLog, mod int) uint32
we need to ensure that oplog entry will be sent to the same job[$hash] if they have the same ObjectID. thus we can consume the oplog entry sequentially
type TableHasher ¶
type TableHasher struct {
Hasher
}
******************************************* PrimaryKeyHasher
func (*TableHasher) DistributeOplogByMod ¶
func (collectionHasher *TableHasher) DistributeOplogByMod(log *PartialLog, mod int) uint32
type WhiteListObjectIdHasher ¶
type WhiteListObjectIdHasher struct { Hasher TableHasher PrimaryKeyHasher // contains filtered or unexported fields }
******************************************* WhiteListObjectIdHasher: hash by collection in general, when hit white list, hash by _id
func NewWhiteListObjectIdHasher ¶
func NewWhiteListObjectIdHasher(whiteList []string) *WhiteListObjectIdHasher
func (*WhiteListObjectIdHasher) DistributeOplogByMod ¶
func (wloi *WhiteListObjectIdHasher) DistributeOplogByMod(log *PartialLog, mod int) uint32
Click to show internal directories.
Click to hide internal directories.