Documentation ¶
Index ¶
- Constants
- Variables
- func CollectionCreator() error
- func Compare(filename string) error
- func ConfigCopier() error
- func ConfigureMaskOption(include *Include) error
- func DataCopier() error
- func DataGen(coll *mongo.Collection, total int) (*mongo.InsertManyResult, error)
- func DataGenMulti(db *mongo.Database, total int, nColls int) error
- func DocGen(i int) bson.D
- func DoesDataExist() error
- func DoesFileExist(filename string) bool
- func GetAllReplicas(uri string) ([]string, error)
- func GetDateTime() string
- func GetHTMLTemplate() (*template.Template, error)
- func GetMongoClient(uri string) (*mongo.Client, error)
- func GetMongoClientWait(connstr string, duration ...time.Duration) (*mongo.Client, error)
- func GetQualifiedDBs(client *mongo.Client, metaDB string) ([]string, error)
- func GetQualifiedNamespaces(client *mongo.Client, includeCollection bool, metaDB string) ([]string, error)
- func GetTailableCursor(client *mongo.Client, ts *primitive.Timestamp) (*mongo.Cursor, error)
- func IndexCopier() error
- func IsBalancerEnabled(client *mongo.Client) (bool, error)
- func MaskFields(doc *bson.D, fields []string, method string)
- func Neutrino(version string) error
- func OplogStreamers() error
- func RedactedURI(uri string) string
- func Resume(filename string, extra ...bool) error
- func Simulate(filename string) error
- func SkipOplog(oplog Oplog) bool
- func Splitter(tasks []*Task) error
- func Start(filename string, extra ...bool) error
- func StartSimulation(sim Simulator) error
- func StartWebServer(port int) error
- func Stringify(doc interface{}) string
- func ToFloat64(s interface{}) float64
- func ToInt32(s interface{}) int32
- func ToInt64(s interface{}) int64
- func ValidateMigratorConfig(migrator *Migrator) error
- func Wait() error
- func Worker(id string) error
- type BSONReader
- type BulkWriteOplogsResult
- type Chart
- type ConfigChunk
- type ConfigCollection
- type ConfigDB
- type Include
- type Includes
- type Migrator
- func (inst *Migrator) AddOplogStreamer(streamer *OplogStreamer)
- func (inst *Migrator) CheckIfBalancersDisabled() error
- func (inst *Migrator) DropCollections() error
- func (inst *Migrator) GetToNamespace(ns string) string
- func (inst *Migrator) Included() map[string]*Include
- func (inst *Migrator) IsExit() bool
- func (inst *Migrator) LiveStreamingOplogs()
- func (inst *Migrator) NotifyWorkerExit()
- func (inst *Migrator) Replicas() map[string]string
- func (inst *Migrator) ResetIncludesTo(includes Includes)
- func (inst *Migrator) SkipNamespace(namespace string) bool
- func (inst *Migrator) SourceStats() *mdb.ClusterStats
- func (inst *Migrator) TargetStats() *mdb.ClusterStats
- func (inst *Migrator) Workspace() Workspace
- type Oplog
- type OplogStreamer
- type OplogWriteModel
- type Simulator
- type Task
- type TaskStatusCounts
- type Workspace
- func (ws *Workspace) CleanUpWorkspace() error
- func (ws *Workspace) CountAllStatus() (TaskStatusCounts, error)
- func (ws *Workspace) CreateTaskIndexes() error
- func (ws *Workspace) DropMetaDB() error
- func (ws *Workspace) FindAllParentTasks() ([]*Task, error)
- func (ws *Workspace) FindNextTaskAndUpdate(replset string, updatedBy string, rev int) (*Task, error)
- func (ws *Workspace) GetOplogTimestamp(setName string) *primitive.Timestamp
- func (ws *Workspace) InsertTasks(tasks []*Task) error
- func (ws *Workspace) Log(status string) error
- func (ws *Workspace) LogConfig() error
- func (ws *Workspace) Reset() error
- func (ws *Workspace) ResetLongRunningTasks(ago time.Duration) (int, error)
- func (ws *Workspace) ResetParentTask(task Task) error
- func (ws *Workspace) ResetProcessingTasks() error
- func (ws *Workspace) SaveOplogTimestamp(setName string, ts primitive.Timestamp) error
- func (ws *Workspace) UpdateTask(task *Task) error
Constants ¶
const ( // TaskAdded added TaskAdded = "added" // TaskCompleted completed TaskCompleted = "completed" // TaskFailed failed TaskFailed = "failed" // TaskProcessing processing TaskProcessing = "processing" // TaskSplitting splitting TaskSplitting = "splitting" )
const ( // MaskDefault uses default masking method MaskDefault = "default" // MaskHEX uses HEX masking method MaskHEX = "hex" // MaskPartial uses partial masking method MaskPartial = "partial" )
const ( // DefaultSpool defines default work space DefaultSpool = "./spool" // MaxBlockSize defines max batch size of a task MaxBlockSize = 10000 // MaxNumberWorkers defines max number of concurrent workers MaxNumberWorkers = 16 // NumberWorkers defines max number of concurrent workers NumberWorkers = 8 // Port defines port number to listen to Port = 3629 )
const ( // CommandAll copies all CommandAll = "all" // CommandConfig copies configurations CommandConfig = "config" // CommandData copies data and tail oplogs after completion CommandData = "data" // CommandDataOnly copies data only CommandDataOnly = "data-only" // CommandIndex copies indexes CommandIndex = "index" // CommandOplog tails oplogs CommandOplog = "oplog" )
const ( // BSONSizeLimit set to 10,000 BSONSizeLimit = 16 * mb // CacheDataSizeLimit set to 10,000 CacheDataSizeLimit = 4 * BSONSizeLimit // GZippedBSONFileExt is .bson.gz GZippedBSONFileExt = ".bson.gz" // OplogBatchSize set to 10,000 OplogBatchSize = 10000 )
const ( // DefaultDuration s.duration to simulate DefaultDuration = 5 * time.Minute // DefaultNumOplogs default number of oplogs per thread DefaultNumOplogs = 300 )
const ( // MaxBatchDataSize size of a insert batch MaxBatchDataSize = (64 * mb) // MaxBatchSize size of a insert batch MaxBatchSize = 1000 )
const ( // FavIcon favicon FavIcon = `` /* 940-byte string literal not displayed */ // LogoPNG hummingbird PNG LogoPNG = `` /* 5132-byte string literal not displayed */ )
const ( // MetaDBName defines default meta database name MetaDBName = "_neutrino" // MetaLogs defines default meta oplogs collection name MetaLogs = "logs" // MetaOplogs defines default meta oplogs collection name MetaOplogs = "oplogs" // MetaTasks defines default meta tasks collection name MetaTasks = "tasks" )
const HTMLTemplate = `` /* 4019-byte string literal not displayed */
HTMLTemplate stores contents
const (
// NumberSplitters number of splitters
NumberSplitters = 4
)
Variables ¶
var ( // Rainbow colors Rainbow = []string{"Red", "Orange", "Yellow", "Green", "Blue", "Indigo", "Violet"} )
Functions ¶
func CollectionCreator ¶
func CollectionCreator() error
CollectionCreator creates collections at target
func ConfigCopier ¶
func ConfigCopier() error
ConfigCopier copies configuration including indexes from source to target
func ConfigureMaskOption ¶
ConfigureMaskOption assigns mask option
func DataGen ¶
func DataGen(coll *mongo.Collection, total int) (*mongo.InsertManyResult, error)
DataGen populate data
func DataGenMulti ¶
DataGenMulti populate data into different collections
func DoesFileExist ¶
DoesFileExist returns true if file exists
func GetAllReplicas ¶
GetAllReplicas return all connections strings from an URI
func GetHTMLTemplate ¶
GetHTMLTemplate returns HTML template
func GetMongoClient ¶
GetMongoClient returns a mongo client by a connection string
func GetMongoClientWait ¶
GetMongoClientWait waits and returns mongo client
func GetQualifiedDBs ¶
GetQualifiedDBs returns a list of qualified database names
func GetQualifiedNamespaces ¶
func GetQualifiedNamespaces(client *mongo.Client, includeCollection bool, metaDB string) ([]string, error)
GetQualifiedNamespaces returns a list of qualified namespace names
func GetTailableCursor ¶
GetTailableCursor returns a tailable cursor
func IsBalancerEnabled ¶
IsBalancerEnabled checks if balancer is enabled
func MaskFields ¶
MaskFields mask all matched fields by traversing a doc
func OplogStreamers ¶
func OplogStreamers() error
OplogStreamers copies oplogs from source to target
func StartSimulation ¶
StartSimulation starts a simulation
func StartWebServer ¶
StartWebServer start an http server at port 3629
func ValidateMigratorConfig ¶
ValidateMigratorConfig validates configuration from a file
Types ¶
type BSONReader ¶
type BSONReader struct {
Stream io.ReadCloser
}
BSONReader stores bson reader info
func NewBSONReader ¶
func NewBSONReader(filename string) (*BSONReader, error)
NewBSONReader returns a bson reader
type BulkWriteOplogsResult ¶
type BulkWriteOplogsResult struct { DeletedCount int64 InsertedCount int64 ModifiedCount int64 UpsertedCount int64 TotalCount int64 }
BulkWriteOplogsResult stores results
func BulkWriteOplogs ¶
func BulkWriteOplogs(oplogs []Oplog) (*BulkWriteOplogsResult, error)
BulkWriteOplogs applies oplogs in bulk
type ConfigChunk ¶
type ConfigChunk struct { Namespace string `json:"ns" bson:"ns"` Max bson.D `json:"max" bson:"max"` Min bson.D `json:"min" bson:"min"` Shard string `json:"shard" bson:"shard"` }
ConfigChunk contains config.chunks
type ConfigCollection ¶
type ConfigCollection struct { ID string `bson:"_id"` DefaultCollation bson.D `bson:"defaultCollation"` Dropped bool `bson:"dropped"` Key bson.D `bson:"key"` Unique bool `bson:"unique"` }
ConfigCollection contains config.collections
type ConfigDB ¶
type ConfigDB struct { ID string `bson:"_id"` Partitioned bool `bson:"partitioned"` Primary string `bson:"primary"` }
ConfigDB contains config.databases
type Include ¶
type Include struct { Filter bson.D `bson:"filter,omitempty"` Limit int64 `bson:"limit,omitempty"` Masks []string `bson:"masks,omitempty"` Method string `bson:"method,omitempty"` Namespace string `bson:"namespace"` To string `bson:"to,omitempty"` }
Include stores namespace and query
type Migrator ¶
type Migrator struct { Block int `bson:"block,omitempty"` Command string `bson:"command"` Includes Includes `bson:"includes,omitempty"` IsDrop bool `bson:"drop,omitempty"` License string `bson:"license,omitempty"` Port int `bson:"port,omitempty"` Source string `bson:"source"` Spool string `bson:"spool,omitempty"` Target string `bson:"target"` Verbose bool `bson:"verbose,omitempty"` Workers int `bson:"workers,omitempty"` Yes bool `bson:"yes,omitempty"` // contains filtered or unexported fields }
Migrator stores migration configurations
func GetMigratorInstance ¶
func GetMigratorInstance() *Migrator
GetMigratorInstance returns Migratro migratorInstance
func NewMigratorInstance ¶
NewMigratorInstance sets and returns a migrator instance
func ReadMigratorConfig ¶
ReadMigratorConfig validates configuration from a file
func (*Migrator) AddOplogStreamer ¶
func (inst *Migrator) AddOplogStreamer(streamer *OplogStreamer)
AddOplogStreamer returns isExit
func (*Migrator) CheckIfBalancersDisabled ¶
CheckIfBalancersDisabled check if both source and target balancers are disabled
func (*Migrator) DropCollections ¶
DropCollections drops all qualified collections
func (*Migrator) GetToNamespace ¶
GetToNamespace returns target namespace
func (*Migrator) LiveStreamingOplogs ¶
func (inst *Migrator) LiveStreamingOplogs()
LiveStreamingOplogs set isExit to true
func (*Migrator) NotifyWorkerExit ¶
func (inst *Migrator) NotifyWorkerExit()
NotifyWorkerExit set isExit to true
func (*Migrator) ResetIncludesTo ¶
ResetIncludesTo is a convenient function for go tests
func (*Migrator) SkipNamespace ¶
SkipNamespace skips namespace
func (*Migrator) SourceStats ¶
func (inst *Migrator) SourceStats() *mdb.ClusterStats
SourceStats returns stats
func (*Migrator) TargetStats ¶
func (inst *Migrator) TargetStats() *mdb.ClusterStats
TargetStats returns stats
type Oplog ¶
type Oplog struct { Hash *int64 `bson:"h"` Namespace string `bson:"ns"` Object bson.D `bson:"o"` Operation string `bson:"op"` Query bson.D `bson:"o2,omitempty"` Term *int64 `bson:"t"` Timestamp primitive.Timestamp `bson:"ts"` Version int `bson:"v"` }
Oplog stores an oplog
type OplogStreamer ¶
type OplogStreamer struct { SetName string Spool string URI string // contains filtered or unexported fields }
OplogStreamer tails oplogs
func (*OplogStreamer) ApplyCachedOplogs ¶
func (p *OplogStreamer) ApplyCachedOplogs() (string, error)
ApplyCachedOplogs applies cached oplogs to target
func (*OplogStreamer) CacheOplogs ¶
func (p *OplogStreamer) CacheOplogs() error
CacheOplogs store oplogs in files
func (*OplogStreamer) LiveStream ¶
func (p *OplogStreamer) LiveStream()
LiveStream begin applying oplogs to target
func (*OplogStreamer) LiveStreamOplogs ¶
func (p *OplogStreamer) LiveStreamOplogs(ts *primitive.Timestamp) error
LiveStreamOplogs stream and apply oplogs
type OplogWriteModel ¶
type OplogWriteModel struct { Namespace string Operation string WriteModel mongo.WriteModel }
OplogWriteModel stores namespace and writeModel
func GetWriteModels ¶
func GetWriteModels(oplog Oplog) []OplogWriteModel
GetWriteModels returns WriteModel from an oplog
type Simulator ¶
type Simulator struct { Namespaces []string `bson:"namespaces"` Threads struct { Find int `bson:"find"` Insert int `bson:"insert"` Write int `bson:"write"` } `bson:"threads"` Seconds int `bson:"seconds_to_run"` NumOplogs int `bson:"oplogs_per_second"` URI string `bson:"uri"` Verbose bool `bson:"verbose"` // contains filtered or unexported fields }
Simulator stores simulation info
type Task ¶
type Task struct { BeginTime time.Time `bson:"begin_time"` EndTime time.Time `bson:"end_time"` ID primitive.ObjectID `bson:"_id"` IDs []interface{} `bson:"ids"` Include Include `bson:"include"` Inserted int `bson:"inserted"` Namespace string `bson:"ns"` ParentID *primitive.ObjectID `bson:"parent_id"` SetName string `bson:"replica_set"` SourceCounts int `bson:"source_counts"` Status string `bson:"status"` UpdatedBy string `bson:"updated_by"` }
Task holds migration task information
func (*Task) CopyData ¶
func (p *Task) CopyData(source *mongo.Collection, target *mongo.Collection) error
CopyData copies data
type TaskStatusCounts ¶
type TaskStatusCounts struct { Added int32 Completed int32 Failed int32 Processing int32 Splitting int32 }
TaskStatusCounts stores counts of all status
type Workspace ¶
type Workspace struct {
// contains filtered or unexported fields
}
Workspace stores meta database
func (*Workspace) CleanUpWorkspace ¶
CleanUpWorkspace removes all cached file
func (*Workspace) CountAllStatus ¶
func (ws *Workspace) CountAllStatus() (TaskStatusCounts, error)
CountAllStatus returns task
func (*Workspace) CreateTaskIndexes ¶
CreateTaskIndexes create indexes on tasks collection
func (*Workspace) DropMetaDB ¶
DropMetaDB drops meta database
func (*Workspace) FindAllParentTasks ¶
FindAllParentTasks returns task by replica a set name
func (*Workspace) FindNextTaskAndUpdate ¶
func (ws *Workspace) FindNextTaskAndUpdate(replset string, updatedBy string, rev int) (*Task, error)
FindNextTaskAndUpdate returns task by replica a set name
func (*Workspace) GetOplogTimestamp ¶
GetOplogTimestamp returns timestamp of a shard/replica
func (*Workspace) InsertTasks ¶
InsertTasks inserts tasks to database
func (*Workspace) ResetLongRunningTasks ¶
ResetLongRunningTasks resets long running processing to added
func (*Workspace) ResetParentTask ¶
ResetParentTask resets and deletes all child tasks
func (*Workspace) ResetProcessingTasks ¶
ResetProcessingTasks resets processing status to added
func (*Workspace) SaveOplogTimestamp ¶
SaveOplogTimestamp updates timestamp of a shard/replica
func (*Workspace) UpdateTask ¶
UpdateTask updates task