Documentation ¶
Index ¶
- Constants
- Variables
- func AdjustDBRef(input bson.M, dbRef bool) bson.M
- func ApplyOpsFilter(key string) bool
- func BlockMongoUrlPassword(url, replace string) string
- func DEBUG_LOG(arg0 interface{}, args ...interface{})
- func DelayFor(ms int64)
- func ExtractMongoTimestamp(ts interface{}) int64
- func ExtractMongoTimestampCounter(ts interface{}) int64
- func ExtractTimestampForLog(ts interface{}) string
- func FindFirstErrorIndexAndMessage(error string) (int, string, bool)
- func FullSyncInitHttpApi(port int)
- func GetAllTimestamp(sources []*MongoSource) (map[string]TimestampNode, bson.MongoTimestamp, bson.MongoTimestamp, ...)
- func GetAllTimestampInUT() (map[string]TimestampNode, bson.MongoTimestamp, bson.MongoTimestamp, ...)
- func GetAndCompareVersion(session *mgo.Session, threshold string, compare string) (bool, error)
- func GetDBVersion(session *mgo.Session) (string, error)
- func GetMetricWithSize(input interface{}) string
- func GetNewestTimestampBySession(session *mgo.Session) (bson.MongoTimestamp, error)
- func GetNewestTimestampByUrl(url string, fromMongoS bool) (bson.MongoTimestamp, error)
- func GetOldestTimestampBySession(session *mgo.Session) (bson.MongoTimestamp, error)
- func GetOldestTimestampByUrl(url string, fromMongoS bool) (bson.MongoTimestamp, error)
- func Goodbye()
- func HasDBRef(object bson.M) bool
- func HasDuplicated(slice []string) bool
- func HasUniqueIndex(index []mgo.Index) bool
- func IncrSyncInitHttpApi(port int)
- func InitialLogger(logDir, logFile, level string, logFlush bool, verbose bool) error
- func Int64ToString(v int64) string
- func IsCollectionCappedError(err error) bool
- func IsFromMongos(url string) (bool, error)
- func IsNotFound(err error) bool
- func JournalFileName(identifier string) string
- func LogFetchStage(stage int32) string
- func MarshalStruct(input interface{}) string
- func MayBeRandom(port int) int
- func Mkdirs(dirs ...string) error
- func ParseIntFromInterface(input interface{}) (int64, error)
- func RunStatusMessage(status uint64) string
- func SortDBRef(input bson.M) bson.D
- func TimestampToInt64(ts bson.MongoTimestamp) int64
- func TimestampToString(ts int64) string
- func Welcome()
- func WritePid(id string) (err error)
- func WritePidById(dir, id string) bool
- func YieldInMs(n int64)
- type ChangeStreamConn
- type Checkpoint
- type Configuration
- type ElapsedTask
- type Fcv
- type Int64Slice
- type Journal
- type MetricDelta
- type MongoCommunityConn
- type MongoConn
- func (conn *MongoConn) AcquireReplicaSetName() string
- func (conn *MongoConn) Close()
- func (conn *MongoConn) CurrentDate() bson.MongoTimestamp
- func (conn *MongoConn) HasOplogNs() bool
- func (conn *MongoConn) HasUniqueIndex() bool
- func (conn *MongoConn) IsGood() bool
- func (conn *MongoConn) IsMongos() bool
- type MongoSource
- type NS
- type OpsCounter
- type Pair
- type Qos
- type ReplicationMetric
- func (metric *ReplicationMetric) AddApply(incr uint64)
- func (metric *ReplicationMetric) AddCheckpoint(number uint64)
- func (metric *ReplicationMetric) AddConsume(incr uint64)
- func (metric *ReplicationMetric) AddFailed(incr uint64)
- func (metric *ReplicationMetric) AddFilter(incr uint64)
- func (metric *ReplicationMetric) AddGet(incr uint64)
- func (metric *ReplicationMetric) AddRetransmission(number uint64)
- func (metric *ReplicationMetric) AddSuccess(incr uint64)
- func (metric *ReplicationMetric) AddTableOps(table string, n uint64)
- func (metric *ReplicationMetric) AddTunnelTraffic(number uint64)
- func (metric *ReplicationMetric) AddWriteFailed(incr uint64)
- func (metric *ReplicationMetric) Apply() uint64
- func (metric *ReplicationMetric) Close()
- func (metric *ReplicationMetric) Get() uint64
- func (metric *ReplicationMetric) SetLSN(lsn int64)
- func (metric *ReplicationMetric) SetLSNACK(ack int64)
- func (metric *ReplicationMetric) SetLSNCheckpoint(ckpt int64)
- func (metric *ReplicationMetric) SetOplogAvg(size int64)
- func (metric *ReplicationMetric) SetOplogMax(max int64)
- func (metric *ReplicationMetric) String() string
- func (metric *ReplicationMetric) Success() uint64
- func (metric *ReplicationMetric) TableOps() map[string]uint64
- func (metric *ReplicationMetric) Tps() uint64
- type ReplicationStatus
- type Sentinel
- type TableOps
- type TimestampNode
Constants ¶
const ( GlobalDiagnosticPath = "diagnostic" // This is the time when golang was born to the world GolangSecurityTime = "2006-01-02T15:04:05Z" WorkGood uint64 = 0 GetReady uint64 = 1 FetchBad uint64 = 2 TunnelSendBad uint64 = 4 TunnelSyncBad uint64 = 8 ReplicaExecBad uint64 = 16 MajorityWriteConcern = "majority" Int32max = (int64(1) << 32) - 1 )
const ( DBRefRef = "$ref" DBRefId = "$id" DBRefDb = "$db" CollectionCapped = "CollectionScan died due to position in capped" // bigger than 3.0 CollectionCappedLowVersion = "UnknownError" // <= 3.0 version )
const ( // log VarLogLevelDebug = "debug" VarLogLevelInfo = "info" VarLogLevelWarning = "warning" VarLogLevelError = "error" // sync mode VarSyncModeAll = "all" VarSyncModeIncr = "incr" VarSyncModeFull = "full" // mongo connect mode VarMongoConnectModePrimary = "primary" VarMongoConnectModeSecondaryPreferred = "secondaryPreferred" VarMongoConnectModeSecondary = "secondary" VarMongoConnectModeNearset = "nearest" VarMongoConnectModeStandalone = "standalone" // full_sync.create_index VarFullSyncCreateIndexNone = "none" VarFullSyncCreateIndexForeground = "foreground" VarFullSyncCreateIndexBackground = "background" // incr_sync.mongo_fetch_method VarIncrSyncMongoFetchMethodOplog = "oplog" VarIncrSyncMongoFetchMethodChangeStream = "change_stream" // incr_sync.shard_key VarIncrSyncShardKeyAuto = "auto" VarIncrSyncShardKeyId = "id" VarIncrSyncShardKeyCollection = "collection" // incr_sync.worker.oplog_compressor VarIncrSyncWorkerOplogCompressorNone = "none" VarIncrSyncWorkerOplogCompressorGzip = "gzip" VarIncrSyncWorkerOplogCompressorZlib = "zlib" VarIncrSyncWorkerOplogCompressorDeflate = "deflate" VarIncrSyncWorkerOplogCompressorSnappy = "snappy" // incr_sync.tunnel VarTunnelDirect = "direct" VarTunnelRpc = "rpc" VarTunnelFile = "file" VarTunnelTcp = "tcp" VarTunnelKafka = "kafka" VarTunnelMock = "mock" // incr_sync.tunnel.message VarTunnelMessageRaw = "raw" VarTunnelMessageJson = "json" VarTunnelMessageBson = "bson" // incr_sync.conflict_write_to VarIncrSyncConflictWriteToNone = "none" VarIncrSyncConflictWriteToDb = "db" VarIncrSyncConflictWriteToSdk = "sdk" // checkpoint.storage.db VarCheckpointStorageDbReplicaDefault = "mongoshake" VarCheckpointStorageDbShardingDefault = "admin" VarCheckpointStorageCollectionDefault = "ckpt_default" // inner variable: checkpoint.storage VarCheckpointStorageApi = "api" VarCheckpointStorageDatabase = "database" // innder variable: incr_sync.reader_debug VarIncrSyncReaderDebugNone = "" VarIncrSyncReaderDebugDiscard = "discard" // throw 
all VarIncrSyncReaderDebugPrint = "print" // print )
const ( FetchStageStoreUnknown int32 = 0 FetchStageStoreDiskNoApply int32 = 1 FetchStageStoreDiskApply int32 = 2 FetchStageStoreMemoryApply int32 = 3 )
const ( JournalNothingOnDefault = iota JournalSampling JournalAll )
const ( FrequentInSeconds = 5 TimeFormat string = "2006-01-02 15:04:05" )
const ( KB = 1024 MB = 1024 * KB GB = 1024 * MB TB = 1024 * GB PB = 1024 * TB )
const ( METRIC_NONE = 0x0000000000000000 METRIC_CKPT_TIMES = 0x0000000000000001 METRIC_TUNNEL_TRAFFIC = 0x0000000000000010 METRIC_LSN = 0x0000000000000100 METRIC_RETRANSIMISSION = 0x0000000000001000 METRIC_TPS = 0x0000000000010000 METRIC_SUCCESS = 0x0000000000100000 METRIC_WORKER = 0x0000000001000000 // worker metric METRIC_FULLSYNC_WRITE = 0x0000000010000000 // full sync writer METRIC_FILTER = 0x0000000100000000 )
const ( TypeFull = "full" TypeIncr = "incr" )
const ( OplogNS = "oplog.rs" ReadWriteConcernDefault = "" ReadWriteConcernLocal = "local" ReadWriteConcernAvailable = "available" // for >= 3.6 ReadWriteConcernMajority = "majority" ReadWriteConcernLinearizable = "linearizable" )
const (
BufferCapacity = 4 * 1024 * 1024
)
const (
OpsMax = 'z' - 'A'
)
const (
SampleFrequency = 1000
)
Variables ¶
var ( AppDatabase = VarCheckpointStorageDbReplicaDefault APPConflictDatabase = VarCheckpointStorageDbReplicaDefault + "_conflict" )
var ( FcvCheckpoint = Checkpoint{ CurrentVersion: 2, FeatureCompatibleVersion: 1, } FcvConfiguration = Configuration{ CurrentVersion: 7, FeatureCompatibleVersion: 3, } LowestCheckpointVersion = map[int]string{ 0: "1.0.0", 1: "2.4.0", 2: "2.4.6", } LowestConfigurationVersion = map[int]string{ 0: "1.0.0", 1: "2.4.0", 2: "2.4.1", 3: "2.4.3", 4: "2.4.6", 5: "2.4.7", 6: "2.4.12", 7: "2.4.17", } )
var ( FullSyncHttpApi *nimo.HttpRestProvider IncrSyncHttpApi *nimo.HttpRestProvider )
var BRANCH = "$"
Build info
var FullSentinelOptions struct { TPS int64 }
only used in full sync.
var (
GetAllTimestampInUTInput map[string]Pair // replicaSet/MongoS name => <oldest timestamp, newest timestamp>
)
For unit tests (UT) only.
var IncrSentinelOptions struct { OplogDump int64 DuplicatedDump bool Pause bool TPS int64 TargetDelay int64 ExitPoint int64 // 32 bits timestamp Shutdown bool // close shake }
IncrSentinelOptions: each option's value type should be String, Bool, or Int64. Only used in incremental sync.
var JournalFilePattern = GlobalDiagnosticPath + string(filepath.Separator) + "%s.journal"
var (
QueryTs = "ts"
)
var SIGNALPROFILE = "$"
var SIGNALSTACK = "$"
Functions ¶
func AdjustDBRef ¶
Deprecated. Adjusts the dbRef field order: $ref, $id, $db, then others.
func ApplyOpsFilter ¶
func BlockMongoUrlPassword ¶
Block the password in mongo_urls. Two kinds of mongo_urls are handled:
1. mongodb://username:password@address
2. username:password@address
func ExtractMongoTimestamp ¶
func ExtractMongoTimestamp(ts interface{}) int64
func ExtractMongoTimestampCounter ¶
func ExtractMongoTimestampCounter(ts interface{}) int64
func ExtractTimestampForLog ¶
func ExtractTimestampForLog(ts interface{}) string
func FindFirstErrorIndexAndMessage ¶
Used to handle the error returned by a bulk operation.
func FullSyncInitHttpApi ¶
func FullSyncInitHttpApi(port int)
func GetAllTimestamp ¶
func GetAllTimestamp(sources []*MongoSource) (map[string]TimestampNode, bson.MongoTimestamp, bson.MongoTimestamp, bson.MongoTimestamp, bson.MongoTimestamp, error)
Gets all the newest timestamps. Returns:
- map: the whole timestamp map; key: replset name, value: a struct that includes the newest and oldest timestamp
- bson.MongoTimestamp: the biggest of the newest timestamps
- bson.MongoTimestamp: the smallest of the newest timestamps
- error: error
func GetAllTimestampInUT ¶
func GetAllTimestampInUT() (map[string]TimestampNode, bson.MongoTimestamp, bson.MongoTimestamp, bson.MongoTimestamp, bson.MongoTimestamp, error)
Only used in unit tests.
func GetAndCompareVersion ¶
Gets the current db version and compares it to the threshold. Returns whether the result is bigger than or equal to the input threshold.
func GetDBVersion ¶
Gets the db version and returns it as a string formatted like "3.0.1".
func GetMetricWithSize ¶
func GetMetricWithSize(input interface{}) string
func GetNewestTimestampBySession ¶
func GetNewestTimestampBySession(session *mgo.Session) (bson.MongoTimestamp, error)
Gets the newest oplog timestamp.
func GetNewestTimestampByUrl ¶
func GetNewestTimestampByUrl(url string, fromMongoS bool) (bson.MongoTimestamp, error)
func GetOldestTimestampBySession ¶
func GetOldestTimestampBySession(session *mgo.Session) (bson.MongoTimestamp, error)
Gets the oldest oplog timestamp.
func GetOldestTimestampByUrl ¶
func GetOldestTimestampByUrl(url string, fromMongoS bool) (bson.MongoTimestamp, error)
func HasDuplicated ¶
func HasUniqueIndex ¶
func IncrSyncInitHttpApi ¶
func IncrSyncInitHttpApi(port int)
func InitialLogger ¶
func Int64ToString ¶
func IsCollectionCappedError ¶
func IsFromMongos ¶
func IsNotFound ¶
func JournalFileName ¶
func LogFetchStage ¶
func MayBeRandom ¶
func ParseIntFromInterface ¶
func RunStatusMessage ¶
func TimestampToInt64 ¶
func TimestampToInt64(ts bson.MongoTimestamp) int64
func TimestampToString ¶
func WritePidById ¶
Types ¶
type ChangeStreamConn ¶
type ChangeStreamConn struct { Client *mongo.Client CsHandler *mongo.ChangeStream // contains filtered or unexported fields }
func NewChangeStreamConn ¶
func (*ChangeStreamConn) Close ¶
func (csc *ChangeStreamConn) Close()
func (*ChangeStreamConn) GetNext ¶
func (csc *ChangeStreamConn) GetNext() (bool, []byte)
func (*ChangeStreamConn) IsNotNil ¶
func (csc *ChangeStreamConn) IsNotNil() bool
type Checkpoint ¶
type Checkpoint struct { /* * version: 0 (or not set), MongoShake < 2.4, fcv == 0 * version: 1, MongoShake == 2.4, 0 < fcv <= 1 */ CurrentVersion int FeatureCompatibleVersion int }
for checkpoint
func (Checkpoint) IsCompatible ¶
func (c Checkpoint) IsCompatible(v int) bool
type Configuration ¶
type Configuration struct { /* * version: 0 (or not set), MongoShake < 2.4.0, fcv == 0 * version: 1, MongoShake == 2.4.0, 0 <= fcv <= 1 */ CurrentVersion int FeatureCompatibleVersion int }
for configuration
func (Configuration) IsCompatible ¶
func (c Configuration) IsCompatible(v int) bool
type ElapsedTask ¶
type ElapsedTask struct { // timer trigger TimeLimit int64 // batch trigger BatchLimit int64 // contains filtered or unexported fields }
func NewThresholder ¶
func NewThresholder(timeLimit, batchLimit int64) *ElapsedTask
func (*ElapsedTask) Reset ¶
func (thresholder *ElapsedTask) Reset()
func (*ElapsedTask) Triiger ¶
func (thresholder *ElapsedTask) Triiger() bool
type Int64Slice ¶
type Int64Slice []int64
func (Int64Slice) Len ¶
func (p Int64Slice) Len() int
func (Int64Slice) Less ¶
func (p Int64Slice) Less(i, j int) bool
func (Int64Slice) Swap ¶
func (p Int64Slice) Swap(i, j int)
type Journal ¶
type Journal struct {
// contains filtered or unexported fields
}
func NewJournal ¶
func (*Journal) WriteRecord ¶
func (j *Journal) WriteRecord(oplog *oplog.PartialLog)
type MetricDelta ¶
Struct used to mark the metric delta.
Value: the current value.
Delta: the difference between the current value and the previous value.
previous: stores the previous value.
func (*MetricDelta) Update ¶
func (o *MetricDelta) Update()
type MongoCommunityConn ¶
type MongoCommunityConn struct { Client *mongo.Client URL string // contains filtered or unexported fields }
func NewMongoCommunityConn ¶
type MongoConn ¶
func NewMongoConn ¶
func (*MongoConn) AcquireReplicaSetName ¶
func (*MongoConn) CurrentDate ¶
func (conn *MongoConn) CurrentDate() bson.MongoTimestamp
func (*MongoConn) HasOplogNs ¶
func (*MongoConn) HasUniqueIndex ¶
type MongoSource ¶
func (*MongoSource) String ¶
func (ms *MongoSource) String() string
type OpsCounter ¶
type OpsCounter struct {
// contains filtered or unexported fields
}
One writer and multiple readers.
func (*OpsCounter) Add ¶
func (opsCounter *OpsCounter) Add(char byte, v uint64)
func (*OpsCounter) Map ¶
func (opsCounter *OpsCounter) Map() map[string]uint64
type Qos ¶
type Qos struct { Limit int64 // qps, <= 0 means disable limit Ticket int64 // one tick size, default is 1 // contains filtered or unexported fields }
func (*Qos) FetchBucket ¶
func (q *Qos) FetchBucket()
type ReplicationMetric ¶
type ReplicationMetric struct { NAME string STAGE string SUBSCRIBE uint64 OplogFilter MetricDelta OplogGet MetricDelta OplogConsume MetricDelta OplogApply MetricDelta OplogSuccess MetricDelta OplogFail MetricDelta OplogWriteFail MetricDelta // full: write failed. currently, only used in full sync stage. CheckpointTimes uint64 Retransmission uint64 TunnelTraffic uint64 LSN int64 LSNAck int64 LSNCheckpoint int64 OplogMaxSize int64 OplogAvgSize int64 TableOperations *TableOps // replication status ReplStatus ReplicationStatus // contains filtered or unexported fields }
func NewMetric ¶
func NewMetric(name, stage string, subscribe uint64) *ReplicationMetric
func (*ReplicationMetric) AddApply ¶
func (metric *ReplicationMetric) AddApply(incr uint64)
func (*ReplicationMetric) AddCheckpoint ¶
func (metric *ReplicationMetric) AddCheckpoint(number uint64)
func (*ReplicationMetric) AddConsume ¶
func (metric *ReplicationMetric) AddConsume(incr uint64)
func (*ReplicationMetric) AddFailed ¶
func (metric *ReplicationMetric) AddFailed(incr uint64)
func (*ReplicationMetric) AddFilter ¶
func (metric *ReplicationMetric) AddFilter(incr uint64)
func (*ReplicationMetric) AddGet ¶
func (metric *ReplicationMetric) AddGet(incr uint64)
func (*ReplicationMetric) AddRetransmission ¶
func (metric *ReplicationMetric) AddRetransmission(number uint64)
func (*ReplicationMetric) AddSuccess ¶
func (metric *ReplicationMetric) AddSuccess(incr uint64)
func (*ReplicationMetric) AddTableOps ¶
func (metric *ReplicationMetric) AddTableOps(table string, n uint64)
func (*ReplicationMetric) AddTunnelTraffic ¶
func (metric *ReplicationMetric) AddTunnelTraffic(number uint64)
func (*ReplicationMetric) AddWriteFailed ¶
func (metric *ReplicationMetric) AddWriteFailed(incr uint64)
func (*ReplicationMetric) Apply ¶
func (metric *ReplicationMetric) Apply() uint64
func (*ReplicationMetric) Close ¶
func (metric *ReplicationMetric) Close()
func (*ReplicationMetric) Get ¶
func (metric *ReplicationMetric) Get() uint64
func (*ReplicationMetric) SetLSN ¶
func (metric *ReplicationMetric) SetLSN(lsn int64)
func (*ReplicationMetric) SetLSNACK ¶
func (metric *ReplicationMetric) SetLSNACK(ack int64)
func (*ReplicationMetric) SetLSNCheckpoint ¶
func (metric *ReplicationMetric) SetLSNCheckpoint(ckpt int64)
func (*ReplicationMetric) SetOplogAvg ¶
func (metric *ReplicationMetric) SetOplogAvg(size int64)
func (*ReplicationMetric) SetOplogMax ¶
func (metric *ReplicationMetric) SetOplogMax(max int64)
func (*ReplicationMetric) String ¶
func (metric *ReplicationMetric) String() string
func (*ReplicationMetric) Success ¶
func (metric *ReplicationMetric) Success() uint64
func (*ReplicationMetric) TableOps ¶
func (metric *ReplicationMetric) TableOps() map[string]uint64
func (*ReplicationMetric) Tps ¶
func (metric *ReplicationMetric) Tps() uint64
type ReplicationStatus ¶
type ReplicationStatus uint64
func (*ReplicationStatus) Clear ¶
func (status *ReplicationStatus) Clear(s uint64)
func (*ReplicationStatus) GetStatusString ¶
func (status *ReplicationStatus) GetStatusString() string
func (*ReplicationStatus) IsGood ¶
func (status *ReplicationStatus) IsGood() bool
func (*ReplicationStatus) Update ¶
func (status *ReplicationStatus) Update(s uint64)
type Sentinel ¶
type Sentinel struct {
// contains filtered or unexported fields
}
func NewSentinel ¶
type TableOps ¶
TableOps counts collection operations.
func NewTableOps ¶
func NewTableOps() *TableOps
type TimestampNode ¶
type TimestampNode struct { Oldest bson.MongoTimestamp Newest bson.MongoTimestamp }
Records the oldest and newest timestamps of each mongod.