monresql

Version: v1.0.7
Published: Jun 16, 2024 License: MIT Imports: 22 Imported by: 0

README

Monresql: MongoDB to PostgreSQL Data Pipeline Tool Library

A Go Library for Building Data Pipelines from MongoDB (including MongoDB 6) to PostgreSQL

Overview

Monresql is a specialized library designed to facilitate efficient data replication, transfer, and synchronization from MongoDB to PostgreSQL databases. Inspired by similar tools like Moresql, Monresql focuses on unidirectional data movement, ensuring seamless integration and synchronization between MongoDB documents and PostgreSQL tables.

Description:

Monresql is a Go library for building data pipeline applications that transfer data from MongoDB (including MongoDB 6) to PostgreSQL. Inspired by Moresql, it targets data migration and synchronization tasks.

Key Features

  • Data Replication: Efficiently replicate data from MongoDB collections to corresponding PostgreSQL tables.
  • Incremental Updates: Support for incremental updates to keep PostgreSQL data up-to-date without full data reloads.
  • Performance Optimization: Optimizes data transfer processes for minimal latency and optimal resource utilization.

API Reference

LoadFieldsMap()

Loads a mapping, passed as a JSON string, that defines how MongoDB documents should be mapped to PostgreSQL tables.
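The mapping format used in the example further down mixes two field forms: a short form ("age": "INTEGER") and a long form with separate "Postgres" and "Mongo" objects. The sketch below shows one way such a mapping could be decoded with encoding/json. The types fieldSpec and collection and the function parseMapping are hypothetical names for this illustration; they are not monresql's own types, and the library's internal parsing may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// fieldSpec is a hypothetical type for this sketch, not part of monresql.
type fieldSpec struct {
	PgName, PgType, MongoName string
}

// UnmarshalJSON accepts the short form ("age": "INTEGER") and the long form
// with separate "Postgres" and "Mongo" objects.
func (f *fieldSpec) UnmarshalJSON(b []byte) error {
	var short string
	if err := json.Unmarshal(b, &short); err == nil {
		f.PgType = short
		return nil
	}
	var long struct {
		Postgres struct{ Name, Type string }
		Mongo    struct{ Name, Type string }
	}
	if err := json.Unmarshal(b, &long); err != nil {
		return err
	}
	f.PgName, f.PgType, f.MongoName = long.Postgres.Name, long.Postgres.Type, long.Mongo.Name
	return nil
}

// collection is a hypothetical type mirroring one entry under "collections".
type collection struct {
	Name    string               `json:"name"`
	PgTable string               `json:"pg_table"`
	Fields  map[string]fieldSpec `json:"fields"`
}

// parseMapping decodes the mapping and fills in names the short form omits.
func parseMapping(s string) (map[string]collection, error) {
	var doc map[string]struct {
		Collections map[string]collection `json:"collections"`
	}
	if err := json.Unmarshal([]byte(s), &doc); err != nil {
		return nil, err
	}
	out := map[string]collection{}
	for _, db := range doc {
		for key, c := range db.Collections {
			for name, f := range c.Fields {
				if f.PgName == "" {
					f.PgName = name
				}
				if f.MongoName == "" {
					f.MongoName = name
				}
				c.Fields[name] = f
			}
			out[key] = c
		}
	}
	return out, nil
}

func main() {
	m, err := parseMapping(`{"monresql":{"collections":{"students":{
		"name":"students","pg_table":"students",
		"fields":{"age":"INTEGER","rollNumber":{
			"Postgres":{"Name":"rollnumber","Type":"INTEGER"},
			"Mongo":{"Name":"rollNumber","Type":"INTEGER"}}}}}}}`)
	if err != nil {
		fmt.Println(err)
		return
	}
	f := m["students"].Fields["rollNumber"]
	fmt.Println(f.MongoName, "->", f.PgName, f.PgType)
}
```

In this sketch a short-form field reuses its key as both the MongoDB and the PostgreSQL name, which is an assumption about how the short form is meant to be read.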

ValidateOrCreatePostgresTable()

Validates a PostgreSQL table's fields against the loaded mapping, and creates the table if it does not exist, so that it is ready for data replication.

Replicate()

Initiates the data replication process from MongoDB to PostgreSQL based on the loaded mapping.

Sync()

Starts the synchronization process, so that changes in MongoDB are reflected in PostgreSQL in real time. It also saves a marker so that, if the service stops, the sync can resume from the last saved mark.

NewSyncOptions()

NewSyncOptions returns a pointer to a syncOptions struct with the following default values:

&syncOptions{checkpoint: true, checkPointPeriod: time.Minute * 1, lastEpoch: 0, reportPeriod: time.Minute * 1}

You can then change the values with the set methods.

Getting Started

Installation

To install Monresql, choose one of the following:

  1. Clone the repository, if you want to add to or edit the core logic:

    git clone https://github.com/yourusername/monresql.git
    
    
  2. Get it as a Go library:

    go get github.com/Havenganesh/monresql
    

Credits:

Monresql draws inspiration from the Moresql CLI application and acknowledges its contribution to the field of data migration and integration. The development of Monresql has been guided by the core principles and best practices established by Moresql.

Example

The example code below is also available in the GitHub repository.

    package main

    import (
    	"context"
    	"fmt"
    	"log"
    	"os"
    	"time"

    	"github.com/jmoiron/sqlx"
    	"go.mongodb.org/mongo-driver/mongo"
    	"go.mongodb.org/mongo-driver/mongo/options"

    	"github.com/Havenganesh/monresql"
    	_ "github.com/lib/pq"
    )

    func main() {
    	// Parse the JSON mapping that describes the collection-to-table layout.
    	dMap, err := monresql.LoadFieldsMap(jsonF)
    	if err != nil {
    		fmt.Println(err)
    		os.Exit(1)
    	}
    	pq := getPostgresConn("postgres://postgres:Success97@localhost:5432/monresql")
    	if pq == nil {
    		os.Exit(1)
    	}
    	defer pq.Close()
    	// Validate the Postgres table against the mapping before replicating.
    	_, err = monresql.ValidateOrCreatePostgresTable(dMap, pq)
    	if err != nil {
    		fmt.Println(err)
    		os.Exit(1)
    	}
    	fmt.Println("Validation Success, Table looks good")
    	client, err := getMongoConn("mongodb://localhost:27017")
    	if err != nil {
    		log.Fatal(err)
    	}
    	// Replicate the collection, then start syncing ongoing changes.
    	monresql.Replicate(dMap, pq, client, "students")
    	option := monresql.NewSyncOptions()
    	option.SetCheckPointPeriod(time.Second * 5)
    	stop := monresql.Sync(dMap, pq, client, "students", option)
    	fmt.Println("sync called after")
    	time.Sleep(time.Minute * 1)
    	fmt.Println("waited end")
    	// Stop the sync; the process then stays alive for ten more minutes.
    	stop()
    	time.Sleep(time.Minute * 10)
    }

    func getPostgresConn(url string) *sqlx.DB {
    	db, err := sqlx.Connect("postgres", url)
    	if err != nil {
    		log.Println(err)
    		os.Exit(1)
    	}

    	// Ping the database to verify the connection.
    	err = db.Ping()
    	if err != nil {
    		log.Println(err)
    		os.Exit(1)
    	}
    	return db
    }

    func getMongoConn(url string) (*mongo.Client, error) {
    	client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(url))
    	if err != nil {
    		log.Println(err)
    		return nil, err
    	}
    	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    	defer cancel()
    	err = client.Ping(ctx, nil)
    	if err != nil {
    		log.Println(err)
    		return nil, err
    	} else {
    		log.Println("mongo connected")
    	}
    	return client, nil
    }

    // jsonF is the mapping passed to LoadFieldsMap.
    var jsonF string = `{
    	"monresql": {
    		"collections": {
    			"students": {
    				"name": "students",
    				"pg_table": "students",
    				"fields": {
    					"_id": "TEXT",
    					"name": "TEXT",
    					"age": "INTEGER",
    					"rollNumber": {
    						"Postgres": {"Name": "rollnumber", "Type": "INTEGER"},
    						"Mongo": {"Name": "rollNumber", "Type": "INTEGER"}
    					},
    					"dob": "TIMESTAMP",
    					"subjects": "JSONB",
    					"class": "TEXT",
    					"markList": {
    						"Postgres": {"Name": "marklist", "Type": "JSONB"},
    						"Mongo": {"Name": "markList", "Type": "object"}
    					}
    				}
    			}
    		}
    	}
    }`


Documentation

Index

Constants

const COMPLEX string = `` /* 127-byte string literal not displayed */

Variables

This section is empty.

Functions

func LoadFieldsMap

func LoadFieldsMap(jsonString string) (fieldsMap, error)

LoadFieldsMap receives the mapping file contents as a JSON string and returns a fieldsMap. For the file structure, refer to the Moresql config file structure.

func NewSyncOptions

func NewSyncOptions() *syncOptions

NewSyncOptions returns a pointer to syncOptions with the default values {checkpoint: true, checkPointPeriod: time.Minute * 1, lastEpoch: 0, reportPeriod: time.Minute * 1}. It provides methods to set a few options:

  • SetCheckPoint(): if checkpoint is true, the marker is saved in monresql_metada as an epoch time.
  • SetCheckPointPeriod(): the interval at which the marker is saved.
  • SetLastEpoch(): use this method and restart the service if you want to run the sync from a known epoch time.
  • SetReportPeriod(): the interval at which the read and write counts are logged to the console.

func Replicate

func Replicate(config fieldsMap, pg *sqlx.DB, mongo *mongo.Client, replicaName string) string

Once the table is validated, you can start the replication with this method. See the sample code in the example.
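One step any MongoDB-to-PostgreSQL replication has to perform is turning a document into an ordered list of column values. The sketch below illustrates that step under stated assumptions: the document is already decoded into a map, nested objects and arrays are serialized to JSON text (as would suit a JSONB column), and missing fields become nil. The function rowValues is a hypothetical name and does not describe how Replicate works internally.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rowValues returns the document's values in the order given by fields.
// Nested objects and arrays are encoded as JSON text; a missing field
// yields nil, which a SQL driver would send as NULL.
func rowValues(doc map[string]any, fields []string) ([]any, error) {
	row := make([]any, 0, len(fields))
	for _, name := range fields {
		switch v := doc[name].(type) {
		case map[string]any, []any:
			b, err := json.Marshal(v)
			if err != nil {
				return nil, err
			}
			row = append(row, string(b))
		default:
			row = append(row, v)
		}
	}
	return row, nil
}

func main() {
	doc := map[string]any{
		"name":     "Asha",
		"subjects": []any{"math", "physics"},
	}
	row, err := rowValues(doc, []string{"name", "subjects", "age"})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(row...)
}
```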

func Sync

func Sync(fieldMap fieldsMap, pg *sqlx.DB, client *mongo.Client, syncName string, syncOption *syncOptions) syncStop

Once the table is validated, you can start the sync with this method. It returns a stop function that you can call to stop the sync at any time. See the sample code in the example.

func ValidateOrCreatePostgresTable

func ValidateOrCreatePostgresTable(fieldMap fieldsMap, pg *sqlx.DB) (string, error)

ValidateOrCreatePostgresTable validates the Postgres table fields against the JSON file. If the table does not exist and the file is valid, it creates the Postgres table, but you must create the database yourself. Always use lowercase letters in Postgres field names; refer to the complex structure for an example.
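The advice to use lowercase field names follows from PostgreSQL folding unquoted identifiers to lowercase, so a column created as rollNumber is stored as rollnumber. The sketch below shows how a CREATE TABLE statement could be derived from a field-to-type map with lowercased column names. The function createTableSQL is a hypothetical illustration, and the SQL that monresql actually issues may differ.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// createTableSQL builds a CREATE TABLE statement from a column-to-type map.
// Column names are lowercased to match how PostgreSQL stores unquoted
// identifiers, and sorted so the output is deterministic.
func createTableSQL(table string, cols map[string]string) string {
	names := make([]string, 0, len(cols))
	for n := range cols {
		names = append(names, n)
	}
	sort.Strings(names)
	defs := make([]string, len(names))
	for i, n := range names {
		defs[i] = fmt.Sprintf("%s %s", strings.ToLower(n), cols[n])
	}
	return fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", table, strings.Join(defs, ", "))
}

func main() {
	fmt.Println(createTableSQL("students", map[string]string{
		"_id":        "TEXT",
		"rollNumber": "INTEGER",
	}))
	// prints: CREATE TABLE IF NOT EXISTS students (_id TEXT, rollnumber INTEGER)
}
```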

Types

This section is empty.
