Documentation ¶
Index ¶
- Constants
- Variables
- func ApplyOpsFilter(key string) bool
- func BlockMongoUrlPassword(url, replace string) string
- func DEBUG_LOG(arg0 interface{}, args ...interface{})
- func DelayFor(ms int64)
- func DuplicateKey(err error) bool
- func ExtractMongoTimestamp(ts interface{}) int64
- func ExtractMongoTimestampCounter(ts interface{}) int64
- func ExtractTimestampForLog(ts interface{}) string
- func FindFirstErrorIndexAndMessageN(err error) (int, string, bool)
- func FullSyncInitHttpApi(port int)
- func GetAllNamespace(sources []*MongoSource, filterFunc func(name string) bool, sslRootFile string) (map[NS]struct{}, map[string][]string, error)
- func GetAllTimestamp(sources []*MongoSource, sslRootFile string) (map[string]TimestampNode, int64, int64, int64, int64, error)
- func GetAllTimestampInUT() (map[string]TimestampNode, int64, int64, int64, int64, error)
- func GetAndCompareVersion(conn *MongoCommunityConn, threshold string, compare string) (bool, error)
- func GetDBVersion(conn *MongoCommunityConn) (string, error)
- func GetKey(log bson.D, wanted string) interface{}
- func GetKeyWithIndex(log bson.D, wanted string) (interface{}, int)
- func GetListCollectionQueryCondition(conn *MongoCommunityConn) bson.M
- func GetMetricWithSize(input interface{}) string
- func GetNewestTimestampByConn(conn *MongoCommunityConn) (int64, error)
- func GetNewestTimestampByUrl(url string, fromMongoS bool, sslRootFile string) (int64, error)
- func GetOldestTimestampByConn(conn *MongoCommunityConn) (int64, error)
- func GetOldestTimestampByUrl(url string, fromMongoS bool, sslRootFile string) (int64, error)
- func Goodbye()
- func HasDuplicated(slice []string) bool
- func HaveIdIndexKey(obj bson.D) bool
- func IncrSyncInitHttpApi(port int)
- func InitialLogger(logDir, logFile, level string, logFlush bool, verbose int) error
- func Int64ToString(v int64) string
- func Int64ToTimestamp(t int64) primitive.Timestamp
- func IsCollectionCappedError(err error) bool
- func JournalFileName(identifier string) string
- func LogFetchStage(stage int32) string
- func MarshalStruct(input interface{}) string
- func MayBeRandom(port int) int
- func Mkdirs(dirs ...string) error
- func ParseIntFromInterface(input interface{}) (int64, error)
- func RunStatusMessage(status uint64) string
- func SetFiled(input bson.D, key string, value interface{}, upsert bool)
- func TimeStampToInt64(ts primitive.Timestamp) int64
- func TimeToTimestamp(t int64) primitive.Timestamp
- func TimestampToString(ts int64) string
- func Welcome()
- func WritePid(id string) (err error)
- func WritePidById(dir, id string) bool
- func YieldInMs(n int64)
- type ChangeStreamConn
- type Checkpoint
- type Configuration
- type ElapsedTask
- type Fcv
- type Int64Slice
- type Journal
- type MetricDelta
- type MongoCommunityConn
- func (conn *MongoCommunityConn) AcquireReplicaSetName() string
- func (conn *MongoCommunityConn) Close()
- func (conn *MongoCommunityConn) CurrentDate() primitive.Timestamp
- func (conn *MongoCommunityConn) HasOplogNs(queryConditon bson.M) bool
- func (conn *MongoCommunityConn) HasUniqueIndex(queryConditon bson.M) bool
- func (conn *MongoCommunityConn) IsGood() bool
- func (conn *MongoCommunityConn) IsTimeSeriesCollection(dbName string, collName string) bool
- type MongoSource
- type NS
- type OpsCounter
- type Pair
- type Qos
- type ReplicationMetric
- func (metric *ReplicationMetric) AddApply(incr uint64)
- func (metric *ReplicationMetric) AddCheckpoint(number uint64)
- func (metric *ReplicationMetric) AddConsume(incr uint64)
- func (metric *ReplicationMetric) AddFailed(incr uint64)
- func (metric *ReplicationMetric) AddFilter(incr uint64)
- func (metric *ReplicationMetric) AddGet(incr uint64)
- func (metric *ReplicationMetric) AddRetransmission(number uint64)
- func (metric *ReplicationMetric) AddSuccess(incr uint64)
- func (metric *ReplicationMetric) AddTableOps(table string, n uint64)
- func (metric *ReplicationMetric) AddTunnelTraffic(number uint64)
- func (metric *ReplicationMetric) AddWriteFailed(incr uint64)
- func (metric *ReplicationMetric) Apply() uint64
- func (metric *ReplicationMetric) Close()
- func (metric *ReplicationMetric) Get() uint64
- func (metric *ReplicationMetric) SetLSN(lsn int64)
- func (metric *ReplicationMetric) SetLSNACK(ack int64)
- func (metric *ReplicationMetric) SetLSNCheckpoint(ckpt int64)
- func (metric *ReplicationMetric) SetOplogAvg(size int64)
- func (metric *ReplicationMetric) SetOplogMax(max int64)
- func (metric *ReplicationMetric) String() string
- func (metric *ReplicationMetric) Success() uint64
- func (metric *ReplicationMetric) TableOps() map[string]uint64
- func (metric *ReplicationMetric) Tps() uint64
- type ReplicationStatus
- type Sentinel
- type TableOps
- type TimestampNode
Constants ¶
const ( GlobalDiagnosticPath = "diagnostic" // This is the time of golang was born to the world GolangSecurityTime = "2006-01-02T15:04:05Z" WorkGood uint64 = 0 GetReady uint64 = 1 FetchBad uint64 = 2 TunnelSendBad uint64 = 4 TunnelSyncBad uint64 = 8 ReplicaExecBad uint64 = 16 MajorityWriteConcern = "majority" Int32max = (int64(1) << 32) - 1 )
const ( DBRefRef = "$ref" DBRefId = "$id" DBRefDb = "$db" CollectionCapped = "CollectionScan died due to position in capped" // bigger than 3.0 CollectionCappedLowVersion = "UnknownError" // <= 3.0 version )
const ( // log VarLogLevelDebug = "debug" VarLogLevelInfo = "info" VarLogLevelWarning = "warning" VarLogLevelError = "error" // sync mode VarSyncModeAll = "all" VarSyncModeIncr = "incr" VarSyncModeFull = "full" // mongo connect mode VarMongoConnectModePrimary = "primary" VarMongoConnectModeSecondaryPreferred = "secondaryPreferred" VarMongoConnectModeSecondary = "secondary" VarMongoConnectModeNearset = "nearest" VarMongoConnectModeStandalone = "standalone" // full_sync.create_index VarFullSyncCreateIndexNone = "none" VarFullSyncCreateIndexForeground = "foreground" VarFullSyncCreateIndexBackground = "background" // incr_sync.mongo_fetch_method VarIncrSyncMongoFetchMethodOplog = "oplog" VarIncrSyncMongoFetchMethodChangeStream = "change_stream" // incr_sync.shard_key VarIncrSyncShardKeyAuto = "auto" VarIncrSyncShardKeyId = "id" VarIncrSyncShardKeyCollection = "collection" // incr_sync.worker.oplog_compressor VarIncrSyncWorkerOplogCompressorNone = "none" VarIncrSyncWorkerOplogCompressorGzip = "gzip" VarIncrSyncWorkerOplogCompressorZlib = "zlib" VarIncrSyncWorkerOplogCompressorDeflate = "deflate" VarIncrSyncWorkerOplogCompressorSnappy = "snappy" // incr_sync.tunnel VarTunnelDirect = "direct" VarTunnelRpc = "rpc" VarTunnelFile = "file" VarTunnelTcp = "tcp" VarTunnelKafka = "kafka" VarTunnelMock = "mock" // incr_sync.tunnel.message VarTunnelMessageRaw = "raw" VarTunnelMessageJson = "json" VarTunnelMessageBson = "bson" // incr_sync.conflict_write_to VarIncrSyncConflictWriteToNone = "none" VarIncrSyncConflictWriteToDb = "db" VarIncrSyncConflictWriteToSdk = "sdk" // checkpoint.storage.db VarCheckpointStorageDbReplicaDefault = "mongoshake" VarCheckpointStorageDbShardingDefault = "admin" VarCheckpointStorageCollectionDefault = "ckpt_default" // inner variable: checkpoint.storage VarCheckpointStorageApi = "api" VarCheckpointStorageDatabase = "database" // innder variable: incr_sync.reader_debug VarIncrSyncReaderDebugNone = "" VarIncrSyncReaderDebugDiscard = "discard" // throw all VarIncrSyncReaderDebugPrint = "print" // print // special VarSpecialSourceDBFlagAliyunServerless = "aliyun_serverless" )
const ( FetchStageStoreUnknown int32 = 0 FetchStageStoreDiskNoApply int32 = 1 FetchStageStoreDiskApply int32 = 2 FetchStageStoreMemoryApply int32 = 3 )
const ( JournalNothingOnDefault = iota JournalSampling JournalAll )
const ( FrequentInSeconds = 5 TimeFormat string = "2006-01-02 15:04:05" )
const ( KB = 1024 MB = 1024 * KB GB = 1024 * MB TB = 1024 * GB PB = 1024 * TB )
const ( METRIC_NONE = 0x0000000000000000 METRIC_CKPT_TIMES = 0x0000000000000001 METRIC_TUNNEL_TRAFFIC = 0x0000000000000010 METRIC_LSN = 0x0000000000000100 METRIC_RETRANSIMISSION = 0x0000000000001000 METRIC_TPS = 0x0000000000010000 METRIC_SUCCESS = 0x0000000000100000 METRIC_WORKER = 0x0000000001000000 // worker metric METRIC_FULLSYNC_WRITE = 0x0000000010000000 // full sync writer METRIC_FILTER = 0x0000000100000000 )
const ( TypeFull = "full" TypeIncr = "incr" )
const ( OplogNS = "oplog.rs" ReadWriteConcernDefault = "" ReadWriteConcernLocal = "local" ReadWriteConcernAvailable = "available" // for >= 3.6 ReadWriteConcernMajority = "majority" ReadWriteConcernLinearizable = "linearizable" )
const (
BufferCapacity = 4 * 1024 * 1024
)
const (
OpsMax = 'z' - 'A'
)
const (
SampleFrequency = 1000
)
Variables ¶
var ( AppDatabase = VarCheckpointStorageDbReplicaDefault APPConflictDatabase = VarCheckpointStorageDbReplicaDefault + "_conflict" )
var ( FcvCheckpoint = Checkpoint{ CurrentVersion: 2, FeatureCompatibleVersion: 1, } FcvConfiguration = Configuration{ CurrentVersion: 10, FeatureCompatibleVersion: 10, } LowestCheckpointVersion = map[int]string{ 0: "1.0.0", 1: "2.4.0", 2: "2.4.6", } LowestConfigurationVersion = map[int]string{ 0: "1.0.0", 1: "2.4.0", 2: "2.4.1", 3: "2.4.3", 4: "2.4.6", 5: "2.4.7", 6: "2.4.12", 7: "2.4.17", 8: "2.4.20", 9: "2.4.21", 10: "2.6.4", } )
var ( FullSyncHttpApi *nimo.HttpRestProvider IncrSyncHttpApi *nimo.HttpRestProvider )
var BRANCH = "$"
Build info
var FullSentinelOptions struct { TPS int64 }
only used in full sync.
var (
GetAllTimestampInUTInput map[string]Pair // replicaSet/MongoS name => <oldest timestamp, newest timestamp>
)
for UT only
var IncrSentinelOptions struct { OplogDump int64 DuplicatedDump bool Pause bool TPS int64 TargetDelay int64 ExitPoint int64 // store 64 bit full timestamp Shutdown bool // close shake }
IncrSentinelOptions. option's value type should be String or Bool or Int64 only used in incremental sync.
var JournalFilePattern = GlobalDiagnosticPath + string(filepath.Separator) + "%s.journal"
var (
QueryTs = "ts"
)
var SIGNALPROFILE = "$"
var SIGNALSTACK = "$"
Functions ¶
func ApplyOpsFilter ¶
func BlockMongoUrlPassword ¶
*
- block password in mongo_urls:
- two kind mongo_urls:
- 1. mongodb://username:password@address
- 2. username:password@address
func DuplicateKey ¶
func ExtractMongoTimestamp ¶
func ExtractMongoTimestamp(ts interface{}) int64
func ExtractMongoTimestampCounter ¶
func ExtractMongoTimestampCounter(ts interface{}) int64
func ExtractTimestampForLog ¶
func ExtractTimestampForLog(ts interface{}) string
func FullSyncInitHttpApi ¶
func FullSyncInitHttpApi(port int)
func GetAllNamespace ¶
func GetAllNamespace(sources []*MongoSource, filterFunc func(name string) bool, sslRootFile string) (map[NS]struct{}, map[string][]string, error)
*
- return all namespace. return:
- @map[NS]struct{}: namespace set where key is the namespace while value is useless, e.g., "a.b"->nil, "a.c"->nil
- @map[string][]string: db->collection map. e.g., "a"->[]string{"b", "c"}
- @error: error info
func GetAllTimestamp ¶
func GetAllTimestamp(sources []*MongoSource, sslRootFile string) (map[string]TimestampNode, int64, int64, int64, int64, error)
* get all newest timestamp * return: * map: whole timestamp map, key: replset name, value: struct that includes the newest and oldest timestamp * primitive.Timestamp: the biggest of the newest timestamp * primitive.Timestamp: the smallest of the newest timestamp * error: error
func GetAllTimestampInUT ¶
only used in unit test
func GetAndCompareVersion ¶
func GetAndCompareVersion(conn *MongoCommunityConn, threshold string, compare string) (bool, error)
get current db version and compare to threshold. Return whether the result is bigger or equal to the input threshold.
func GetDBVersion ¶
func GetDBVersion(conn *MongoCommunityConn) (string, error)
get db version, return string with format like "3.0.1"
func GetListCollectionQueryCondition ¶
func GetListCollectionQueryCondition(conn *MongoCommunityConn) bson.M
func GetMetricWithSize ¶
func GetMetricWithSize(input interface{}) string
func GetNewestTimestampByConn ¶
func GetNewestTimestampByConn(conn *MongoCommunityConn) (int64, error)
get newest oplog
func GetNewestTimestampByUrl ¶
func GetOldestTimestampByConn ¶
func GetOldestTimestampByConn(conn *MongoCommunityConn) (int64, error)
get oldest oplog
func GetOldestTimestampByUrl ¶
func HasDuplicated ¶
func HaveIdIndexKey ¶
Return true only Indexe only have key _id
func IncrSyncInitHttpApi ¶
func IncrSyncInitHttpApi(port int)
func InitialLogger ¶
InitialLogger initialize logger
verbose: where log goes to: 0 - file,1 - file+stdout,2 - stdout
func Int64ToString ¶
func Int64ToTimestamp ¶
func IsCollectionCappedError ¶
func JournalFileName ¶
func LogFetchStage ¶
func MayBeRandom ¶
func ParseIntFromInterface ¶
func RunStatusMessage ¶
func TimeStampToInt64 ¶
func TimestampToString ¶
func WritePidById ¶
Types ¶
type ChangeStreamConn ¶
type ChangeStreamConn struct { Client *mongo.Client CsHandler *mongo.ChangeStream Ops *options.ChangeStreamOptions // contains filtered or unexported fields }
func NewChangeStreamConn ¶
func (*ChangeStreamConn) Close ¶
func (csc *ChangeStreamConn) Close()
func (*ChangeStreamConn) GetNext ¶
func (csc *ChangeStreamConn) GetNext() (bool, []byte)
func (*ChangeStreamConn) IsNotNil ¶
func (csc *ChangeStreamConn) IsNotNil() bool
func (*ChangeStreamConn) ResumeToken ¶
func (csc *ChangeStreamConn) ResumeToken() interface{}
func (*ChangeStreamConn) TryNext ¶
func (csc *ChangeStreamConn) TryNext() (bool, []byte)
type Checkpoint ¶
type Checkpoint struct { /* * version: 0(or set not), MongoShake < 2.4, fcv == 0 * version: 1, MongoShake == 2.4, 0 < fcv <= 1 */ CurrentVersion int FeatureCompatibleVersion int }
for checkpoint
func (Checkpoint) IsCompatible ¶
func (c Checkpoint) IsCompatible(v int) bool
type Configuration ¶
type Configuration struct { /* * version: 0(or set not), MongoShake < 2.4.0, fcv == 0 * version: 1, MongoShake == 2.4.0, 0 <= fcv <= 1 */ CurrentVersion int FeatureCompatibleVersion int }
for configuration
func (Configuration) IsCompatible ¶
func (c Configuration) IsCompatible(v int) bool
type ElapsedTask ¶
type ElapsedTask struct { // timer trigger TimeLimit int64 // batch trigger BatchLimit int64 // contains filtered or unexported fields }
func NewThresholder ¶
func NewThresholder(timeLimit, batchLimit int64) *ElapsedTask
func (*ElapsedTask) Reset ¶
func (thresholder *ElapsedTask) Reset()
func (*ElapsedTask) Triiger ¶
func (thresholder *ElapsedTask) Triiger() bool
type Int64Slice ¶
type Int64Slice []int64
func (Int64Slice) Len ¶
func (p Int64Slice) Len() int
func (Int64Slice) Less ¶
func (p Int64Slice) Less(i, j int) bool
func (Int64Slice) Swap ¶
func (p Int64Slice) Swap(i, j int)
type Journal ¶
type Journal struct {
// contains filtered or unexported fields
}
func NewJournal ¶
func (*Journal) WriteRecord ¶
func (j *Journal) WriteRecord(oplog *oplog.PartialLog)
type MetricDelta ¶
struct used to mark the metric delta. Value: current value Delta: the difference between current value and previous value previous: store the previous value
func (*MetricDelta) Update ¶
func (o *MetricDelta) Update()
type MongoCommunityConn ¶
type MongoCommunityConn struct { Client *mongo.Client URL string // contains filtered or unexported fields }
func NewMongoCommunityConn ¶
func (*MongoCommunityConn) AcquireReplicaSetName ¶
func (conn *MongoCommunityConn) AcquireReplicaSetName() string
func (*MongoCommunityConn) Close ¶
func (conn *MongoCommunityConn) Close()
func (*MongoCommunityConn) CurrentDate ¶
func (conn *MongoCommunityConn) CurrentDate() primitive.Timestamp
func (*MongoCommunityConn) HasOplogNs ¶
func (conn *MongoCommunityConn) HasOplogNs(queryConditon bson.M) bool
func (*MongoCommunityConn) HasUniqueIndex ¶
func (conn *MongoCommunityConn) HasUniqueIndex(queryConditon bson.M) bool
func (*MongoCommunityConn) IsGood ¶
func (conn *MongoCommunityConn) IsGood() bool
func (*MongoCommunityConn) IsTimeSeriesCollection ¶
func (conn *MongoCommunityConn) IsTimeSeriesCollection(dbName string, collName string) bool
type MongoSource ¶
func (*MongoSource) String ¶
func (ms *MongoSource) String() string
type NS ¶
func GetDbNamespace ¶
func GetDbNamespace(url string, filterFunc func(name string) bool, sslRootFile string) ([]NS, map[string][]string, error)
*
- return db namespace. return:
- @[]NS: namespace list, e.g., []{"a.b", "a.c"}
- @map[string][]string: db->collection map. e.g., "a"->[]string{"b", "c"}
- @error: error info
type OpsCounter ¶
type OpsCounter struct {
// contains filtered or unexported fields
}
one writer and multi readers
func (*OpsCounter) Add ¶
func (opsCounter *OpsCounter) Add(char byte, v uint64)
func (*OpsCounter) Map ¶
func (opsCounter *OpsCounter) Map() map[string]uint64
type Qos ¶
type Qos struct { Limit int64 // qps, <= 0 means disable limit Ticket int64 // one tick size, default is 1 // contains filtered or unexported fields }
func (*Qos) FetchBucket ¶
func (q *Qos) FetchBucket()
type ReplicationMetric ¶
type ReplicationMetric struct { NAME string STAGE string SUBSCRIBE uint64 OplogFilter MetricDelta OplogGet MetricDelta OplogConsume MetricDelta OplogApply MetricDelta OplogSuccess MetricDelta OplogFail MetricDelta OplogWriteFail MetricDelta // full: write failed. currently, only used in full sync stage. CheckpointTimes uint64 Retransmission uint64 TunnelTraffic uint64 LSN int64 LSNAck int64 LSNCheckpoint int64 OplogMaxSize int64 OplogAvgSize int64 TableOperations *TableOps // replication status ReplStatus ReplicationStatus // contains filtered or unexported fields }
func NewMetric ¶
func NewMetric(name, stage string, subscribe uint64) *ReplicationMetric
func (*ReplicationMetric) AddApply ¶
func (metric *ReplicationMetric) AddApply(incr uint64)
func (*ReplicationMetric) AddCheckpoint ¶
func (metric *ReplicationMetric) AddCheckpoint(number uint64)
func (*ReplicationMetric) AddConsume ¶
func (metric *ReplicationMetric) AddConsume(incr uint64)
func (*ReplicationMetric) AddFailed ¶
func (metric *ReplicationMetric) AddFailed(incr uint64)
func (*ReplicationMetric) AddFilter ¶
func (metric *ReplicationMetric) AddFilter(incr uint64)
func (*ReplicationMetric) AddGet ¶
func (metric *ReplicationMetric) AddGet(incr uint64)
func (*ReplicationMetric) AddRetransmission ¶
func (metric *ReplicationMetric) AddRetransmission(number uint64)
func (*ReplicationMetric) AddSuccess ¶
func (metric *ReplicationMetric) AddSuccess(incr uint64)
func (*ReplicationMetric) AddTableOps ¶
func (metric *ReplicationMetric) AddTableOps(table string, n uint64)
func (*ReplicationMetric) AddTunnelTraffic ¶
func (metric *ReplicationMetric) AddTunnelTraffic(number uint64)
func (*ReplicationMetric) AddWriteFailed ¶
func (metric *ReplicationMetric) AddWriteFailed(incr uint64)
func (*ReplicationMetric) Apply ¶
func (metric *ReplicationMetric) Apply() uint64
func (*ReplicationMetric) Close ¶
func (metric *ReplicationMetric) Close()
func (*ReplicationMetric) Get ¶
func (metric *ReplicationMetric) Get() uint64
func (*ReplicationMetric) SetLSN ¶
func (metric *ReplicationMetric) SetLSN(lsn int64)
func (*ReplicationMetric) SetLSNACK ¶
func (metric *ReplicationMetric) SetLSNACK(ack int64)
func (*ReplicationMetric) SetLSNCheckpoint ¶
func (metric *ReplicationMetric) SetLSNCheckpoint(ckpt int64)
func (*ReplicationMetric) SetOplogAvg ¶
func (metric *ReplicationMetric) SetOplogAvg(size int64)
func (*ReplicationMetric) SetOplogMax ¶
func (metric *ReplicationMetric) SetOplogMax(max int64)
func (*ReplicationMetric) String ¶
func (metric *ReplicationMetric) String() string
func (*ReplicationMetric) Success ¶
func (metric *ReplicationMetric) Success() uint64
func (*ReplicationMetric) TableOps ¶
func (metric *ReplicationMetric) TableOps() map[string]uint64
func (*ReplicationMetric) Tps ¶
func (metric *ReplicationMetric) Tps() uint64
type ReplicationStatus ¶
type ReplicationStatus uint64
func (*ReplicationStatus) Clear ¶
func (status *ReplicationStatus) Clear(s uint64)
func (*ReplicationStatus) GetStatusString ¶
func (status *ReplicationStatus) GetStatusString() string
func (*ReplicationStatus) IsGood ¶
func (status *ReplicationStatus) IsGood() bool
func (*ReplicationStatus) Update ¶
func (status *ReplicationStatus) Update(s uint64)
type Sentinel ¶
type Sentinel struct {
// contains filtered or unexported fields
}
func NewSentinel ¶
type TableOps ¶
TableOps, count collection operations
func NewTableOps ¶
func NewTableOps() *TableOps
type TimestampNode ¶
record the oldest and newest timestamp of each mongod