Documentation ¶
Index ¶
- Constants
- Variables
- func BuildUpdateDelteOplog(prefixField string, obj bson.D) (interface{}, error)
- func DiffUpdateOplogToNormal(updateObj bson.D) (interface{}, error)
- func FindFiledPrefix(input bson.D, prefix string) bool
- func GetDbAndCollection(namespace string) (string, string)
- func GetKey(log bson.D, wanted string) interface{}
- func GetKeyWithIndex(log bson.D, wanted string) (interface{}, int)
- func GetObjectId(log bson.D) (primitive.ObjectID, error)
- func IgnoreError(err error, op string, isFullSyncStage bool) bool
- func RemoveFiled(input bson.D, key string) bson.D
- func StartIncrementalReplication(ctx context.Context, checkpointManager checkpoint.CheckpointManager)
- type ChangeLog
- type GenericObject
- type GenericOplog
- type OplogReader
- type OplogWriterSingle
- func (ow *OplogWriterSingle) Delete(l *ChangeLog) error
- func (ow *OplogWriterSingle) Insert(l *ChangeLog) error
- func (ow *OplogWriterSingle) StartWriter(ctx context.Context)
- func (ow *OplogWriterSingle) StopWriter()
- func (ow *OplogWriterSingle) Update(l *ChangeLog, upsert bool) error
- func (sw *OplogWriterSingle) Upsert(l *ChangeLog, upsert bool) error
- type ParsedLog
- type Reader
- type Writer
Constants ¶
View Source
const (
	VersionMark       = "$v"
	OplogVersionError = "unknown version of OPLog"
	OperationError    = "operation error"
	InsertError       = "insert error"
	UpdateError       = "update error"
	DeleteError       = "delete error"
	UpsertError       = "upsert error"
	DuplicateError    = "duplicate error"
	MatchedCountError = "matched count error"
)
View Source
const (
CursorWaitTime = 5 * time.Second
)
Variables ¶
Functions ¶
func BuildUpdateDelteOplog ¶
func DiffUpdateOplogToNormal ¶
Example of a diff-format ("$v" : 2) update oplog object: "o" : { "$v" : 2, "diff" : { "d" : { "count" : false }, "u" : { "name" : "orange" }, "i" : { "c" : 11 } } }
func GetDbAndCollection ¶
Splits the namespace into the database name and the collection name. For example, namespace = "database.collection.bla" gives parts = ["database", "collection.bla"].
func IgnoreError ¶
Returns true if the error can be ignored. For the list of error codes, see https://github.com/mongodb/mongo/blob/master/src/mongo/base/error_codes.yml
func RemoveFiled ¶
Note: the input bson.D will be modified.
func StartIncrementalReplication ¶
func StartIncrementalReplication(ctx context.Context, checkpointManager checkpoint.CheckpointManager)
Types ¶
type GenericObject ¶
type GenericOplog ¶
type OplogReader ¶
type OplogReader struct {
// contains filtered or unexported fields
}
func NewOplogReader ¶
func NewOplogReader(ckptManager checkpoint.CheckpointManager) *OplogReader
func (*OplogReader) StartReader ¶
func (o *OplogReader) StartReader(ctx context.Context)
func (*OplogReader) StopReader ¶
func (o *OplogReader) StopReader()
type OplogWriterSingle ¶
type OplogWriterSingle struct {
// contains filtered or unexported fields
}
func NewOplogWriter ¶
func NewOplogWriter(fullFinishTs int64, queuedLogs chan *ChangeLog, ckptManager checkpoint.CheckpointManager) *OplogWriterSingle
func (*OplogWriterSingle) Delete ¶
func (ow *OplogWriterSingle) Delete(l *ChangeLog) error
func (*OplogWriterSingle) Insert ¶
func (ow *OplogWriterSingle) Insert(l *ChangeLog) error
func (*OplogWriterSingle) StartWriter ¶
func (ow *OplogWriterSingle) StartWriter(ctx context.Context)
StartWriter starts the writer.
func (*OplogWriterSingle) StopWriter ¶
func (ow *OplogWriterSingle) StopWriter()
type ParsedLog ¶
type ParsedLog struct { Timestamp primitive.Timestamp `bson:"ts" json:"ts"` Term *int64 `bson:"t" json:"t"` Hash *int64 `bson:"h" json:"h"` Version int `bson:"v" json:"v"` Operation string `bson:"op" json:"op"` Gid string `bson:"g,omitempty" json:"g,omitempty"` Namespace string `bson:"ns" json:"ns"` Object bson.D `bson:"o" json:"o"` Query bson.D `bson:"o2" json:"o2"` // update condition UniqueIndexes bson.M `bson:"uk,omitempty" json:"uk,omitempty"` // LSID bson.Raw `bson:"lsid,omitempty" json:"lsid,omitempty"` // mark the session id, used in transaction FromMigrate bool `bson:"fromMigrate,omitempty" json:"fromMigrate,omitempty"` // move chunk TxnNumber *int64 `bson:"txnNumber,omitempty" json:"txnNumber,omitempty"` // transaction number in session DocumentKey bson.D `bson:"documentKey,omitempty" json:"documentKey,omitempty"` // exists when source collection is sharded, only including shard key and _id PrevOpTime bson.Raw `bson:"prevOpTime,omitempty"` UI *primitive.Binary `bson:"ui,omitempty" json:"ui,omitempty"` // do not enable currently }
Click to show internal directories.
Click to hide internal directories.