Documentation ¶
Index ¶
- Constants
- Variables
- func ApplyOpsFilter(key string) bool
- func BuildUpdateDelteOplog(prefixField string, obj bson.D) (interface{}, error)
- func DiffUpdateOplogToNormal(updateObj bson.D) (interface{}, error)
- func ExtraCommandName(o bson.D) (string, bool)
- func FilterApplyOps(ele primitive.E, keepSubOp func(primitive.D) bool, computedCmd primitive.D, ...) (primitive.D, int)
- func FindFiledPrefix(input bson.D, prefix string) bool
- func GetId(log bson.D) (primitive.ObjectID, error)
- func GetKey(log bson.D, wanted string) interface{}
- func GetKeyWithIndex(log bson.D, wanted string) (interface{}, int)
- func GetObjectId(log bson.D) (primitive.ObjectID, error)
- func IgnoreError(err error, op string, isFullSyncStage bool) bool
- func KeepOperation(command string) bool
- func KeepSubOp(doc bson.D) bool
- func RemoveField(input bson.D, key string) bson.D
- func RunCommand(database, command string, l *oplog.ChangeLog, client *mongo.Client) error
- func RunCommandApplyOps(database string, l *oplog.ChangeLog, client *mongo.Client) error
- func RunCommandCreateIndexes(database string, l *oplog.ChangeLog, client *mongo.Client) error
- func RunCommandDropIndexes(database string, l *oplog.ChangeLog, client *mongo.Client) error
- type CommandOperation
- type Incr
- type OplogReader
- type OplogWriterSingle
- func (w *OplogWriterSingle) Command(l *oplog.ChangeLog) error
- func (ow *OplogWriterSingle) Delete(l *oplog.ChangeLog) error
- func (w *OplogWriterSingle) Insert(l *oplog.ChangeLog) error
- func (w *OplogWriterSingle) RunWriter(ctx context.Context)
- func (w *OplogWriterSingle) StartWriter(ctx context.Context)
- func (w *OplogWriterSingle) StopWriter()
- func (w *OplogWriterSingle) Update(l *oplog.ChangeLog, upsert bool) error
- func (w *OplogWriterSingle) Upsert(l *oplog.ChangeLog, upsert bool) error
- type Reader
- type Writer
Constants ¶
const (
Uuid = "ui"
ApplyOps = "applyOps"
)
const (
StateUnknown = iota
StateRunning = 1
StatePaused = 2
StateSnapshot = 3
)
const (
VersionMark = "$v"
OplogVersionError = "unknown version of OPLog"
OperationError = "operation error"
InsertError = "insert error"
UpdateError = "update error"
DeleteError = "delete error"
UpsertError = "upsert error"
DuplicateError = "duplicate error"
MatchedCountError = "matched count error"
)
const (
CursorWaitTime = 5 * time.Second
)
Variables ¶
Functions ¶
func ApplyOpsFilter ¶ added in v0.0.14
func BuildUpdateDelteOplog ¶
func DiffUpdateOplogToNormal ¶
"o" : { "$v" : 2, "diff" : { "d" : { "count" : false }, "u" : { "name" : "orange" }, "i" : { "c" : 11 } } }
func FilterApplyOps ¶ added in v0.0.14
func FilterApplyOps(ele primitive.E, keepSubOp func(primitive.D) bool, computedCmd primitive.D, computedCmdSize int) (primitive.D, int)
Filters the sub-operations of an applyOps command.
func IgnoreError ¶
A return value of true means the error can be ignored. For the list of MongoDB error codes, see https://github.com/mongodb/mongo/blob/master/src/mongo/base/error_codes.yml
func KeepOperation ¶ added in v0.0.14
func KeepSubOp ¶ added in v0.0.14
Checks whether a sub-operation from an applyOps command should be kept. Only insert, update and delete operations are kept. A filter on the namespace is also applied.
func RemoveField ¶ added in v0.0.14
Note: the input bson.D will be modified.
func RunCommand ¶ added in v0.0.14
See this documentation for a full list of available commands: https://www.mongodb.com/docs/manual/reference/command/
func RunCommandApplyOps ¶ added in v0.0.14
func RunCommandCreateIndexes ¶ added in v0.0.14
Creates indexes on a collection using the createIndexes command.
Types ¶
type CommandOperation ¶ added in v0.0.14
type CommandOperation struct {
// contains filtered or unexported fields
}
type Incr ¶ added in v0.0.14
type Incr struct {
// contains filtered or unexported fields
}
func NewIncr ¶ added in v0.0.14
func NewIncr(ckptManager checkpoint.CheckpointManager, cmdc chan commands.Command) *Incr
func (*Incr) RunIncremental ¶ added in v0.0.14
type OplogReader ¶
type OplogReader struct {
// contains filtered or unexported fields
}
func NewOplogReader ¶
func NewOplogReader(ckpt checkpoint.CheckpointManager, latest primitive.Timestamp, cmdc <-chan commands.Command, queue chan *oplog.ChangeLog) *OplogReader
func (*OplogReader) RunReader ¶ added in v0.0.14
func (r *OplogReader) RunReader(ctx context.Context)
func (*OplogReader) StartReader ¶
func (r *OplogReader) StartReader(ctx context.Context)
func (*OplogReader) StopReader ¶
func (o *OplogReader) StopReader()
type OplogWriterSingle ¶
type OplogWriterSingle struct {
// contains filtered or unexported fields
}
func NewOplogWriter ¶
func NewOplogWriter(ckptManager checkpoint.CheckpointManager, fullFinishTs int64, queue chan *oplog.ChangeLog) *OplogWriterSingle
func (*OplogWriterSingle) Command ¶ added in v0.0.14
func (w *OplogWriterSingle) Command(l *oplog.ChangeLog) error
func (*OplogWriterSingle) RunWriter ¶ added in v0.0.14
func (w *OplogWriterSingle) RunWriter(ctx context.Context)
Start the writer
func (*OplogWriterSingle) StartWriter ¶
func (w *OplogWriterSingle) StartWriter(ctx context.Context)
Starts the writer in a dedicated goroutine.
func (*OplogWriterSingle) StopWriter ¶
func (w *OplogWriterSingle) StopWriter()