incr

package
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 26, 2024 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
// String constants used by this package.
//
// VersionMark is the "$v" key; the sample update entry shown for
// DiffUpdateOplogToNormal carries it inside the "o" document, e.g.
// { "$v" : 2, "diff" : ... }.
//
// The remaining values are plain strings, not error values.
// NOTE(review): they look like labels/messages used when reporting failed
// oplog operations; the call sites are not visible here — confirm.
const (
	VersionMark       = "$v"
	OplogVersionError = "unknown version of OPLog"
	OperationError    = "operation error"
	InsertError       = "insert error"
	UpdateError       = "update error"
	DeleteError       = "delete error"
	UpsertError       = "upsert error"
	DuplicateError    = "duplicate error"
	MatchedCountError = "matched count error"
)
View Source
const (
	// CursorWaitTime is a fixed five-second duration.
	// NOTE(review): presumably how long the oplog cursor waits for new
	// entries; its use is not visible here — confirm.
	CursorWaitTime = 5 * time.Second
)

Variables

View Source
var (
	// FilteredOperations maps short operation codes to a boolean flag.
	// NOTE(review): presumably the keys are oplog "op" values and true means
	// the entry is skipped during replication; the consumer is not visible
	// here — confirm. The map is an exported, mutable package variable.
	FilteredOperations = map[string]bool{
		// Codes flagged true.
		"n":  true,
		"c":  true,
		"db": true,

		// Codes flagged false.
		// NOTE(review): presumably update/delete/insert, matching the
		// Update, Delete and Insert methods on OplogWriterSingle — confirm.
		"u": false,
		"d": false,
		"i": false,
	}
)

Functions

func BuildUpdateDelteOplog

func BuildUpdateDelteOplog(prefixField string, obj bson.D) (interface{}, error)

func DiffUpdateOplogToNormal

func DiffUpdateOplogToNormal(updateObj bson.D) (interface{}, error)

"o" : { "$v" : 2, "diff" : { "d" : { "count" : false }, "u" : { "name" : "orange" }, "i" : { "c" : 11 } } }

func FindFiledPrefix

func FindFiledPrefix(input bson.D, prefix string) bool

func GetDbAndCollection

func GetDbAndCollection(namespace string) (string, string)

GetDbAndCollection splits the namespace at the first dot and returns the database name and the collection name. For example, namespace = "database.collection.bla" yields parts = ["database", "collection.bla"].

func GetKey

func GetKey(log bson.D, wanted string) interface{}

func GetKeyWithIndex

func GetKeyWithIndex(log bson.D, wanted string) (interface{}, int)

func GetObjectId

func GetObjectId(log bson.D) (primitive.ObjectID, error)

func IgnoreError

func IgnoreError(err error, op string, isFullSyncStage bool) bool

IgnoreError returns true when the error can be ignored. For the MongoDB error codes, see https://github.com/mongodb/mongo/blob/master/src/mongo/base/error_codes.yml

func RemoveFiled

func RemoveFiled(input bson.D, key string) bson.D

Caution: RemoveFiled modifies the input bson.D.

func StartIncrementalReplication

func StartIncrementalReplication(ctx context.Context, checkpointManager checkpoint.CheckpointManager)

Types

type ChangeLog

// ChangeLog is a ParsedLog together with a database name and a collection
// name.
type ChangeLog struct {
	ParsedLog

	// Db and Collection hold the database and collection names.
	// NOTE(review): presumably split out of ParsedLog.Namespace (see
	// GetDbAndCollection); the code that fills them is not visible — confirm.
	Db         string
	Collection string
}

type GenericObject

// GenericObject is a minimal document shape that exposes only the _id field.
type GenericObject struct {
	// Id is the document's object id, stored under "_id" in both BSON and
	// JSON. The json tag separates the name from the omitempty option with a
	// comma; written with a space, encoding/json would treat the whole string
	// "_id omitempty" as the key.
	Id primitive.ObjectID `bson:"_id" json:"_id,omitempty"`
}

type GenericOplog

// GenericOplog carries one entry in two forms: Raw is a byte slice and Parsed
// points to the ChangeLog form.
// NOTE(review): Raw is presumably the undecoded bytes of the same entry that
// Parsed was decoded from — confirm against the reader.
type GenericOplog struct {
	Raw    []byte
	Parsed *ChangeLog
}

type OplogReader

type OplogReader struct {
	// contains filtered or unexported fields
}

func NewOplogReader

func NewOplogReader(ckptManager checkpoint.CheckpointManager) *OplogReader

func (*OplogReader) StartReader

func (o *OplogReader) StartReader(ctx context.Context)

func (*OplogReader) StopReader

func (o *OplogReader) StopReader()

type OplogWriterSingle

type OplogWriterSingle struct {
	// contains filtered or unexported fields
}

func NewOplogWriter

func NewOplogWriter(fullFinishTs int64, queuedLogs chan *ChangeLog, ckptManager checkpoint.CheckpointManager) *OplogWriterSingle

func (*OplogWriterSingle) Delete

func (ow *OplogWriterSingle) Delete(l *ChangeLog) error

func (*OplogWriterSingle) Insert

func (ow *OplogWriterSingle) Insert(l *ChangeLog) error

func (*OplogWriterSingle) StartWriter

func (ow *OplogWriterSingle) StartWriter(ctx context.Context)

Start the writer

func (*OplogWriterSingle) StopWriter

func (ow *OplogWriterSingle) StopWriter()

func (*OplogWriterSingle) Update

func (ow *OplogWriterSingle) Update(l *ChangeLog, upsert bool) error

Update the document

func (*OplogWriterSingle) Upsert

func (sw *OplogWriterSingle) Upsert(l *ChangeLog, upsert bool) error

Upsert the document

type ParsedLog

// ParsedLog is the decoded form of a single oplog entry. The bson tags give
// the entry's field names (ts, t, h, v, op, g, ns, o, o2, ...), and the json
// tags mirror them.
//
// Object ("o") holds the operation document; for the update format handled
// by DiffUpdateOplogToNormal it looks like { "$v" : 2, "diff" : { ... } }.
//
// NOTE(review): PrevOpTime is the only field without a json tag, so
// encoding/json uses the Go field name "PrevOpTime" for it — confirm whether
// that is intended.
type ParsedLog struct {
	Timestamp     primitive.Timestamp `bson:"ts" json:"ts"`
	Term          *int64              `bson:"t" json:"t"`
	Hash          *int64              `bson:"h" json:"h"`
	Version       int                 `bson:"v" json:"v"`
	Operation     string              `bson:"op" json:"op"`
	Gid           string              `bson:"g,omitempty" json:"g,omitempty"`
	Namespace     string              `bson:"ns" json:"ns"`
	Object        bson.D              `bson:"o" json:"o"`
	Query         bson.D              `bson:"o2" json:"o2"`                                       // update condition
	UniqueIndexes bson.M              `bson:"uk,omitempty" json:"uk,omitempty"`                   // NOTE(review): undocumented; the "uk" tag suggests unique keys — confirm
	LSID          bson.Raw            `bson:"lsid,omitempty" json:"lsid,omitempty"`               // mark the session id, used in transaction
	FromMigrate   bool                `bson:"fromMigrate,omitempty" json:"fromMigrate,omitempty"` // move chunk
	TxnNumber     *int64              `bson:"txnNumber,omitempty" json:"txnNumber,omitempty"`     // transaction number in session
	DocumentKey   bson.D              `bson:"documentKey,omitempty" json:"documentKey,omitempty"` // exists when source collection is sharded, only including shard key and _id
	PrevOpTime    bson.Raw            `bson:"prevOpTime,omitempty"`
	UI            *primitive.Binary   `bson:"ui,omitempty" json:"ui,omitempty"` // do not enable currently
}

type Reader

// Reader is implemented by types that can be started and stopped, each with
// a context.
//
// NOTE(review): the StopReader method listed for *OplogReader in this package
// takes no arguments, so *OplogReader does not match this interface's
// StopReader(context.Context) as declared — confirm which signature is
// intended.
type Reader interface {
	StartReader(context.Context)
	StopReader(context.Context)
}

type Writer

// Writer is implemented by types that can be started and stopped, each with
// a context.
//
// NOTE(review): the StopWriter method listed for *OplogWriterSingle in this
// package takes no arguments, so *OplogWriterSingle does not match this
// interface's StopWriter(context.Context) as declared — confirm which
// signature is intended.
type Writer interface {
	StartWriter(context.Context)
	StopWriter(context.Context)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL