Documentation ¶
Index ¶
- func Ping(connUri string) (bool, error)
- func ReadFiles(filePrefix string, folder string) ([]any, error)
- type ConnectionHandler
- func (m *ConnectionHandler) CollectionExists(collection string) bool
- func (m *ConnectionHandler) ConcurrentBatchInsert(filePrefix, folder string, numWorkers int32, ...) error
- func (m *ConnectionHandler) Connect(connUri string, appName string) func()
- func (m *ConnectionHandler) ExtractResults(mapping string, filePrefix string, fileLocation string, ...) error
- func (m *ConnectionHandler) GetCollection(collectionName string) *mongo.Collection
- func (m *ConnectionHandler) InsertFromFiles(filePrefix, folder string, ...) error
- func (m *ConnectionHandler) StreamingResults(mapping, filePrefix, fileLocation string, batchSize, numWorkers int32, ...) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type ConnectionHandler ¶
type ConnectionHandler struct {
// contains filtered or unexported fields
}
ConnectionHandler holds a client and a database instance.
func NewConnectionHandler ¶
func NewConnectionHandler(connUri string, dbName string, appName string) (*ConnectionHandler, func())
func (*ConnectionHandler) CollectionExists ¶
func (m *ConnectionHandler) CollectionExists(collection string) bool
func (*ConnectionHandler) ConcurrentBatchInsert ¶
func (m *ConnectionHandler) ConcurrentBatchInsert( filePrefix, folder string, numWorkers int32, insert func(ctx context.Context, files <-chan string, wg *sync.WaitGroup, coll *mongo.Collection), coll *mongo.Collection) error
ConcurrentBatchInsert reads files concurrently and sends them to a pool of numWorkers workers, which insert the values into the collection concurrently.
func (*ConnectionHandler) Connect ¶
func (m *ConnectionHandler) Connect(connUri string, appName string) func()
func (*ConnectionHandler) ExtractResults ¶
func (m *ConnectionHandler) ExtractResults(mapping string, filePrefix string, fileLocation string, process func([]*bson.M, string, string, string) error, coll *mongo.Collection, filter interface{}, opts ...*options.FindOptions) error
func (*ConnectionHandler) GetCollection ¶
func (m *ConnectionHandler) GetCollection(collectionName string) *mongo.Collection
GetCollection returns the collection with the given name.
func (*ConnectionHandler) InsertFromFiles ¶
func (m *ConnectionHandler) InsertFromFiles(filePrefix, folder string, walk func(filePrefix string, folder string) ([]any, error), coll *mongo.Collection, opts ...*options.InsertManyOptions) error
InsertFromFiles reads the files, collects their values into a single slice, and inserts that slice into the collection. Because all values are held in memory at once, this may require a very large amount of memory.
func (*ConnectionHandler) StreamingResults ¶
func (m *ConnectionHandler) StreamingResults(mapping, filePrefix, fileLocation string, batchSize, numWorkers int32, process func(ctx context.Context, batchData <-chan []*bson.M, mapping string, wg *sync.WaitGroup, filePrefix string, folder string), coll *mongo.Collection, filter interface{}, opts ...*options.FindOptions) error
StreamingResults streams query results in batches into a pool of workers. The process function should loop over the batchData channel; once the channel is closed and the worker has finished iterating over it, the worker should signal completion by calling wg.Done().
Click to show internal directories.
Click to hide internal directories.