Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var Cmd = &cobra.Command{ Use: "mongoload <graph>", Short: "Directly load data into mongodb", Long: ``, Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { if vertexFile == "" && edgeFile == "" && dirPath == "" { return fmt.Errorf("no edge or vertex files were provided") } graph = args[0] log.Infof("Loading data into graph: %s", graph) client, err := mgo.NewClient(options.Client().ApplyURI(mongoHost)) if err != nil { return err } err = client.Connect(context.TODO()) if err != nil { return err } if createGraph { err = mongo.AddMongoGraph(client, database, graph) if err != nil { return err } } vertexCol := client.Database(database).Collection(fmt.Sprintf("%s_vertices", graph)) edgeCol := client.Database(database).Collection(fmt.Sprintf("%s_edges", graph)) if vertexFile != "" { log.Infof("Loading vertex file: %s", vertexFile) vertInserter := db.NewUnorderedBufferedBulkInserter(vertexCol, bulkBufferSize). SetBypassDocumentValidation(true). SetOrdered(false). SetUpsert(true) vertChan, err := util.StreamVerticesFromFile(vertexFile, workerCount) if err != nil { return err } dataChan := vertexSerialize(vertChan, workerCount) count := 0 for d := range dataChan { vertInserter.InsertRaw(d) if count%logRate == 0 { log.Infof("Loaded %d vertices", count) } count++ } vertInserter.Flush() } if edgeFile != "" { log.Infof("Loading edge file: %s", edgeFile) edgeInserter := db.NewUnorderedBufferedBulkInserter(edgeCol, bulkBufferSize). SetBypassDocumentValidation(true). SetOrdered(false). 
SetUpsert(true) edgeChan, err := util.StreamEdgesFromFile(edgeFile, workerCount) if err != nil { return err } dataChan := edgeSerialize(edgeChan, workerCount) count := 0 for d := range dataChan { edgeInserter.InsertRaw(d) if count%logRate == 0 { log.Infof("Loaded %d edges", count) } count++ } edgeInserter.Flush() } if dirPath != "" { if glob, err := util.DirScan(dirPath, "*.vertex.json.gz"); err == nil { vertexCount := 0 vertInserter := db.NewUnorderedBufferedBulkInserter(vertexCol, bulkBufferSize). SetBypassDocumentValidation(true). SetOrdered(false). SetUpsert(true) for _, vertexFile := range glob { log.Infof("Loading vertex file: %s", vertexFile) vertChan, err := util.StreamVerticesFromFile(vertexFile, workerCount) if err != nil { return err } dataChan := vertexSerialize(vertChan, workerCount) for d := range dataChan { vertInserter.InsertRaw(d) if vertexCount%logRate == 0 { log.Infof("Loaded %d vertices", vertexCount) } vertexCount++ } } vertInserter.Flush() } if glob, err := util.DirScan(dirPath, "*.edge.json.gz"); err == nil { edgeCount := 0 edgeInserter := db.NewUnorderedBufferedBulkInserter(edgeCol, bulkBufferSize). SetBypassDocumentValidation(true). SetOrdered(false). SetUpsert(true) for _, edgeFile := range glob { log.Infof("Loading edge file: %s", edgeFile) edgeChan, err := util.StreamEdgesFromFile(edgeFile, workerCount) if err != nil { return err } dataChan := edgeSerialize(edgeChan, workerCount) for d := range dataChan { edgeInserter.InsertRaw(d) if edgeCount%logRate == 0 { log.Infof("Loaded %d edges", edgeCount) } edgeCount++ } } edgeInserter.Flush() } } return nil }, }
Cmd is the declaration of the mongoload command-line command.
Functions ¶
This section is empty.
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.