oplog

package
v0.0.0-...-d253ebd Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2021 License: GPL-3.0 Imports: 6 Imported by: 0

Documentation

Index

Constants

View Source
// Keys of the top-level fields of an oplog entry.
const (
	// OplogTsName is the key of the entry timestamp.
	OplogTsName          = "ts"
	// OplogOperationName is the key of the operation field.
	OplogOperationName   = "op"
	// OplogGidName is the key of the gid field.
	OplogGidName         = "g" // useless in change stream
	// OplogNamespaceName is the key of the namespace field.
	OplogNamespaceName   = "ns"
	// OplogObjectName is the key of the object field.
	OplogObjectName      = "o"
	// OplogQueryName is the key of the query field.
	OplogQueryName       = "o2"
	// OplogUniqueKeyName is the key of the unique-indexes field.
	OplogUniqueKeyName   = "uk" // useless in change stream
	// OplogLsidName is the key of the session id field.
	OplogLsidName        = "lsid"
	// OplogFromMigrateName is the key of the fromMigrate flag.
	OplogFromMigrateName = "fromMigrate"
)
View Source
// String values naming a sharding mode.
// NOTE(review): presumably these select how oplog entries are distributed
// (by _id, by collection, or automatically) — confirm against the code that
// reads them.
const (
	ShardByID        = "id"
	ShardByNamespace = "collection"
	ShardAutomatic   = "auto"
)
View Source
// DefaultHashValue is the untyped constant 0.
// NOTE(review): presumably the hash used when no key can be hashed — confirm
// against the Hash implementation.
const DefaultHashValue = 0
View Source
// PrimaryKey is the name of the document primary-key field, "_id".
const PrimaryKey = "_id"

Variables

This section is empty.

Functions

func ConvertBsonD2M

func ConvertBsonD2M(input bson.D) (bson.M, map[string]struct{})

ConvertBsonD2M converts a bson.D to a bson.M.

func ConvertBsonD2MExcept

func ConvertBsonD2MExcept(input bson.D, except map[string]struct{}) (bson.M, map[string]struct{})

func ConvertBsonM2D

func ConvertBsonM2D(input bson.M) bson.D

func ExtraCommandName

func ExtraCommandName(o bson.D) (string, bool)

func GetIdOrNSFromOplog

func GetIdOrNSFromOplog(log *PartialLog) interface{}

func GetKey

func GetKey(log bson.D, wanted string) interface{}

func GetKeyWithIndex

func GetKeyWithIndex(log bson.D, wanted string) (interface{}, int)

func Hash

func Hash(hashObject interface{}) uint32

func IsRunOnAdminCommand

func IsRunOnAdminCommand(operation string) bool

func IsSyncDataCommand

func IsSyncDataCommand(operation string) bool

func LogEntryEncode

func LogEntryEncode(logs []*GenericOplog) [][]byte

func ParseTimestampFromBson

func ParseTimestampFromBson(intput []byte) bson.MongoTimestamp

func RemoveFiled

func RemoveFiled(input bson.D, key string) bson.D

Note: the input bson.D will be modified.

func SetFiled

func SetFiled(input bson.D, key string, value interface{})

Types

type CommandOperation

type CommandOperation struct {
	// contains filtered or unexported fields
}

type Event

// Event is the decoded form of a change stream event document. The bson/json
// tags give the document key each field is read from and written to.
type Event struct {
	// Id is the event's "_id" document (metadata; carries the resume token).
	Id                bson.M              `bson:"_id" json:"_id"`
	// OperationType is the event's "operationType" string, e.g. insert,
	// delete, replace, update, drop, rename, dropDatabase, invalidate.
	OperationType     string              `bson:"operationType" json:"operationType"`
	FullDocument      bson.D              `bson:"fullDocument,omitempty" json:"fullDocument,omitempty"` // exists on "insert", "replace", "delete", "update"
	// Ns is the namespace document ("db" and "coll").
	Ns                bson.M              `bson:"ns" json:"ns"`
	// To is the namespace after a rename; only meaningful when
	// OperationType is rename.
	To                bson.M              `bson:"to,omitempty" json:"to,omitempty"`
	DocumentKey       bson.M              `bson:"documentKey,omitempty" json:"documentKey,omitempty"` // exists on "insert", "replace", "delete", "update"
	// UpdateDescription holds "updatedFields" and "removedFields"; only
	// present when OperationType is update.
	UpdateDescription bson.M              `bson:"updateDescription,omitempty" json:"updateDescription,omitempty"`
	// ClusterTime corresponds to the oplog "ts" field.
	ClusterTime       bson.MongoTimestamp `bson:"clusterTime,omitempty" json:"clusterTime,omitempty"`
	// TxnNumber corresponds to the oplog txnNumber; only present in transactions.
	TxnNumber         uint64              `bson:"txnNumber,omitempty" json:"txnNumber,omitempty"`
	// Lsid is the logical session id; only present in transactions.
	Lsid              bson.M              `bson:"lsid,omitempty" json:"lsid,omitempty"`
}
  • example:
    {
      _id : { // stores metadata
        "_data" : <BinData|hex string> // resumeToken
      },
      "operationType" : "<operation>", // insert, delete, replace, update, drop, rename, dropDatabase, invalidate
      "fullDocument" : { <document> }, // the data after modification; present on insert, replace, delete, update. Equivalent to the original o field
      "ns" : { // same as the oplog ns
        "db" : "<database>",
        "coll" : "<collection>"
      },
      "to" : { // only valid when operationType == rename; the ns after renaming
        "db" : "<database>",
        "coll" : "<collection>"
      },
      "documentKey" : { "_id" : <value> }, // equivalent to the o2 field. Present on insert, replace, delete, update. Normally contains only _id; for a sharded collection it also includes the shard key.
      "updateDescription" : { // only present when operationType == update; it is an incremental modification, whereas replace is a replacement.
        "updatedFields" : { <document> }, // values of the updated fields
        "removedFields" : [ "<field>", ... ] // list of removed fields
      },
      "FullDocument" : { // never nil
        "fullDocument" : { <document> }, // updateLookup when full_document is enabled, otherwise default
      }
      "clusterTime" : <Timestamp>, // equivalent to the ts field
      "txnNumber" : <NumberLong>, // equivalent to txnNumber in the oplog; only present in transactions. The transaction number increases monotonically within a transaction
      "lsid" : { // equivalent to the lsid field; only present in transactions. Logical session id: the id of the session the request belongs to.
        "id" : <UUID>,
        "uid" : <BinData>
      }
    }

func (*Event) String

func (e *Event) String() string

type GenericOplog

// GenericOplog carries one oplog entry as a byte slice together with a
// pointer to a PartialLog.
// NOTE(review): presumably Raw is the encoded form of the same entry that
// Parsed describes — confirm against the code that fills both fields.
type GenericOplog struct {
	Raw    []byte
	Parsed *PartialLog
}

func GatherApplyOps

func GatherApplyOps(input []*PartialLog) (*GenericOplog, error)

type Hasher

// Hasher maps an oplog entry and a modulus to a uint32.
type Hasher interface {
	// DistributeOplogByMod returns the uint32 chosen for log given mod.
	// NOTE(review): presumably the result is a job index below mod — confirm
	// against the implementations.
	DistributeOplogByMod(log *PartialLog, mod int) uint32
}

type ParsedLog

// ParsedLog holds the fields of one oplog entry. The bson/json tags give the
// document key each field is read from and written to.
type ParsedLog struct {
	Timestamp     bson.MongoTimestamp `bson:"ts" json:"ts"`
	HistoryId     int64               `bson:"h,omitempty" json:"h,omitempty"`
	Version       int                 `bson:"v,omitempty" json:"v,omitempty"`
	Operation     string              `bson:"op" json:"op"`
	Gid           string              `bson:"g,omitempty" json:"g,omitempty"`
	Namespace     string              `bson:"ns" json:"ns"`
	Object        bson.D              `bson:"o" json:"o"`
	Query         bson.M              `bson:"o2" json:"o2"`
	UniqueIndexes bson.M              `bson:"uk,omitempty" json:"uk,omitempty"`
	Lsid          bson.M              `bson:"lsid,omitempty" json:"lsid,omitempty"`               // marks the session id, used in transactions
	FromMigrate   bool                `bson:"fromMigrate,omitempty" json:"fromMigrate,omitempty"` // move chunk
	TxnNumber     uint64              `bson:"txnNumber,omitempty" json:"txnNumber,omitempty"`     // transaction number in session
	DocumentKey   bson.M              `bson:"documentKey,omitempty" json:"documentKey,omitempty"` // exists when the source collection is sharded; includes only the shard key and _id

}

type PartialLog

// PartialLog embeds ParsedLog and adds fields that are derived locally and
// carry no bson/json tags.
type PartialLog struct {
	ParsedLog

	/*
	 * Every field declared below is NEVER persisted or transferred
	 * over a network connection. Each one is only filled in by its
	 * respective logic.
	 */
	UniqueIndexesUpdates bson.M // generated by CollisionMatrix
	RawSize              int    // generated by Decorator
	SourceId             int    // generated by Validator
}

func ConvertEvent2Oplog

func ConvertEvent2Oplog(input []byte, fulldoc bool) (*PartialLog, error)

func LogParsed

func LogParsed(logs []*GenericOplog) []*PartialLog

func NewPartialLog

func NewPartialLog(data bson.M) *PartialLog

func (*PartialLog) Dump

func (partialLog *PartialLog) Dump(keys map[string]struct{}, all bool) bson.D

Dump dumps the log according to the given keys; when "all" is true, the keys are ignored.

func (*PartialLog) String

func (partialLog *PartialLog) String() string

type PrimaryKeyHasher

// PrimaryKeyHasher embeds the Hasher interface and defines its own
// DistributeOplogByMod method, which is meant to send oplog entries with the
// same ObjectID to the same job.
type PrimaryKeyHasher struct {
	Hasher
}

******************************************* PrimaryKeyHasher

func (*PrimaryKeyHasher) DistributeOplogByMod

func (objectIdHasher *PrimaryKeyHasher) DistributeOplogByMod(log *PartialLog, mod int) uint32

We need to ensure that oplog entries with the same ObjectID are sent to the same job[$hash], so that they can be consumed sequentially.

type TableHasher

// TableHasher embeds the Hasher interface and defines its own
// DistributeOplogByMod method.
// NOTE(review): presumably it hashes by collection, so entries of one
// collection go to the same job — confirm against the method body.
type TableHasher struct {
	Hasher
}

******************************************* TableHasher

func (*TableHasher) DistributeOplogByMod

func (collectionHasher *TableHasher) DistributeOplogByMod(log *PartialLog, mod int) uint32

type WhiteListObjectIdHasher

type WhiteListObjectIdHasher struct {
	Hasher

	TableHasher
	PrimaryKeyHasher
	// contains filtered or unexported fields
}

******************************************* WhiteListObjectIdHasher: hashes by collection in general; when an entry hits the white list, hashes by _id

func NewWhiteListObjectIdHasher

func NewWhiteListObjectIdHasher(whiteList []string) *WhiteListObjectIdHasher

func (*WhiteListObjectIdHasher) DistributeOplogByMod

func (wloi *WhiteListObjectIdHasher) DistributeOplogByMod(log *PartialLog, mod int) uint32

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL