Documentation ¶
Overview ¶
Package mongodbio contains transforms for reading from and writing to MongoDB.
Functions ¶
func Read ¶
func Read(
	s beam.Scope,
	uri string,
	database string,
	collection string,
	t reflect.Type,
	opts ...ReadOptionFn,
) beam.PCollection
Read reads a MongoDB collection and returns a PCollection<T> for a given type T. T must be a struct type whose exported fields should each carry a "bson" tag. By default, the transform uses MongoDB's internal splitVector command to split the collection into bundles. To read from MongoDB Atlas, where the splitVector command is not allowed, configure the transform to use the $bucketAuto aggregation stage instead by passing the ReadOptionFn WithReadBucketAuto(true).
The Read transform has the following required parameters:
- s: the scope of the pipeline
- uri: the MongoDB connection string
- database: the MongoDB database to read from
- collection: the MongoDB collection to read from
- t: the type of the elements in the collection
The Read transform also accepts any number of ReadOptionFn values, which set the following ReadOption fields:
- BucketAuto: whether to use the $bucketAuto aggregation stage to split the collection into bundles. Defaults to false
- Filter: a bson.M map that is used to filter the documents in the collection. Defaults to nil, which means no filter is applied
- BundleSize: the size in bytes to bundle the documents into when reading. Defaults to 64 * 1024 * 1024 (64 MB)
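To give a feel for how BundleSize affects parallelism, the sketch below estimates how many bundles a collection of a given size would be split into. It assumes the bundle count is simply the collection size divided by the bundle size, rounded up; the helper estimateBundles is hypothetical and not part of mongodbio, and the transform's real splitting logic may differ.

```go
package main

import "fmt"

// defaultBundleSize mirrors the documented BundleSize default of 64 MB.
const defaultBundleSize int64 = 64 * 1024 * 1024

// estimateBundles is a hypothetical helper, not part of mongodbio. It assumes
// the number of bundles is the collection size divided by the bundle size,
// rounded up, with a minimum of one bundle.
func estimateBundles(collectionBytes, bundleSize int64) int64 {
	if bundleSize <= 0 {
		bundleSize = defaultBundleSize
	}
	if collectionBytes <= 0 {
		return 1
	}
	return (collectionBytes + bundleSize - 1) / bundleSize
}

func main() {
	const gib = int64(1024 * 1024 * 1024)
	fmt.Println(estimateBundles(gib, defaultBundleSize)) // 16
	fmt.Println(estimateBundles(gib, 32*1024*1024))      // 32
}
```

Under this assumption, halving the bundle size doubles the number of bundles a runner can distribute across workers.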
Example (Default) ¶
package main

import (
	"context"
	"log"
	"reflect"

	"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam"
	"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam/io/mongodbio"
	"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam/x/beamx"
	"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam/x/debug"
	"go.mongodb.org/mongo-driver/bson/primitive"
)

func main() {
	type Event struct {
		ID        primitive.ObjectID `bson:"_id"`
		Timestamp int64              `bson:"timestamp"`
		EventType int32              `bson:"event_type"`
	}

	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	col := mongodbio.Read(
		s,
		"mongodb://localhost:27017",
		"demo",
		"events",
		reflect.TypeOf(Event{}),
	)
	debug.Print(s, col)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
Example (Options) ¶
package main

import (
	"context"
	"log"
	"reflect"

	"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam"
	"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam/io/mongodbio"
	"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam/x/beamx"
	"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam/x/debug"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
)

func main() {
	type Event struct {
		ID        primitive.ObjectID `bson:"_id"`
		Timestamp int64              `bson:"timestamp"`
		EventType int32              `bson:"event_type"`
	}

	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	col := mongodbio.Read(
		s,
		"mongodb://localhost:27017",
		"demo",
		"events",
		reflect.TypeOf(Event{}),
		mongodbio.WithReadBucketAuto(true),
		mongodbio.WithReadBundleSize(32*1024*1024),
		mongodbio.WithReadFilter(bson.M{"timestamp": bson.M{"$gt": 1640995200000}}),
	)
	debug.Print(s, col)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
func Write ¶
func Write(
	s beam.Scope,
	uri string,
	database string,
	collection string,
	col beam.PCollection,
	opts ...WriteOptionFn,
) beam.PCollection
Write writes a PCollection<T> of a type T to MongoDB. T must be a struct type whose exported fields should each carry a "bson" tag. If the struct has a field with the bson tag "_id", the value of that field is used as the id of the document. Otherwise, a new id of type primitive.ObjectID is generated for each document. Write returns a PCollection<K> containing the inserted id values, where K is the type of the id.
The Write transform has the following required parameters:
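The "_id" rule above can be illustrated with a small reflection check. This is only a sketch of how such a decision could be made using the standard library; hasIDField and the two struct types are hypothetical names, and the transform's actual detection logic is not shown in this documentation.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// hasIDField reports whether v is a struct with an exported field whose bson
// tag name is "_id". Hypothetical helper for illustration; not part of mongodbio.
func hasIDField(v any) bool {
	t := reflect.TypeOf(v)
	if t == nil || t.Kind() != reflect.Struct {
		return false
	}
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if !f.IsExported() {
			continue
		}
		// A bson tag may carry options after a comma, e.g. `bson:"_id,omitempty"`.
		name, _, _ := strings.Cut(f.Tag.Get("bson"), ",")
		if name == "_id" {
			return true
		}
	}
	return false
}

// withID carries its own document id; a string stands in for primitive.ObjectID.
type withID struct {
	ID        string `bson:"_id"`
	Timestamp int64  `bson:"timestamp"`
}

// withoutID has no "_id" field, so an id would be generated on write.
type withoutID struct {
	Timestamp int64 `bson:"timestamp"`
	EventType int32 `bson:"event_type"`
}

func main() {
	fmt.Println(hasIDField(withID{}))    // true
	fmt.Println(hasIDField(withoutID{})) // false
}
```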
- s: the scope of the pipeline
- uri: the MongoDB connection string
- database: the MongoDB database to write to
- collection: the MongoDB collection to write to
- col: the PCollection to write to MongoDB
The Write transform also accepts any number of WriteOptionFn values, which set the following WriteOption fields:
- BatchSize: the number of documents to write in a single batch. Defaults to 1000
- Ordered: whether to execute the writes within a batch in order. Defaults to true
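As a rough illustration of what BatchSize means, the sketch below splits a slice of documents into consecutive batches of at most a given size. The chunk helper is hypothetical; the transform presumably buffers elements as they arrive rather than slicing a complete list, so treat this only as a picture of the resulting batch sizes.

```go
package main

import "fmt"

// chunk splits items into consecutive batches of at most size elements.
// Hypothetical helper for illustration; not part of mongodbio.
func chunk[T any](items []T, size int) [][]T {
	if size < 1 {
		size = 1
	}
	var batches [][]T
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		batches = append(batches, items[start:end])
	}
	return batches
}

func main() {
	docs := make([]int, 2500) // stand-in for 2500 documents
	for _, b := range chunk(docs, 1000) {
		fmt.Println(len(b)) // prints 1000, 1000, 500 on separate lines
	}
}
```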
Example (Default) ¶
package main

import (
	"context"
	"log"
	"time"

	"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam"
	"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam/io/mongodbio"
	"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam/x/beamx"
	"go.mongodb.org/mongo-driver/bson/primitive"
)

func main() {
	type Event struct {
		ID        primitive.ObjectID `bson:"_id"`
		Timestamp int64              `bson:"timestamp"`
		EventType int32              `bson:"event_type"`
	}

	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	input := []Event{
		{
			ID:        primitive.NewObjectIDFromTimestamp(time.UnixMilli(1640995200001)),
			Timestamp: 1640995200001,
			EventType: 1,
		},
		{
			ID:        primitive.NewObjectIDFromTimestamp(time.UnixMilli(1640995200002)),
			Timestamp: 1640995200002,
			EventType: 2,
		},
	}

	col := beam.CreateList(s, input)
	mongodbio.Write(s, "mongodb://localhost:27017", "demo", "events", col)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
Example (GenerateID) ¶
package main

import (
	"context"
	"log"

	"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam"
	"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam/io/mongodbio"
	"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam/x/beamx"
	"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam/x/debug"
)

func main() {
	type Event struct {
		Timestamp int64 `bson:"timestamp"`
		EventType int32 `bson:"event_type"`
	}

	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	input := []Event{
		{
			Timestamp: 1640995200001,
			EventType: 1,
		},
		{
			Timestamp: 1640995200002,
			EventType: 1,
		},
	}

	col := beam.CreateList(s, input)
	ids := mongodbio.Write(s, "mongodb://localhost:27017", "demo", "events", col)
	debug.Print(s, ids)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
Example (Options) ¶
package main

import (
	"context"
	"log"
	"time"

	"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam"
	"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam/io/mongodbio"
	"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam/x/beamx"
	"go.mongodb.org/mongo-driver/bson/primitive"
)

func main() {
	type Event struct {
		ID        primitive.ObjectID `bson:"_id"`
		Timestamp int64              `bson:"timestamp"`
		EventType int32              `bson:"event_type"`
	}

	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	input := []Event{
		{
			ID:        primitive.NewObjectIDFromTimestamp(time.UnixMilli(1640995200001)),
			Timestamp: 1640995200001,
			EventType: 1,
		},
		{
			ID:        primitive.NewObjectIDFromTimestamp(time.UnixMilli(1640995200002)),
			Timestamp: 1640995200002,
			EventType: 2,
		},
	}

	col := beam.CreateList(s, input)
	mongodbio.Write(
		s,
		"mongodb://localhost:27017",
		"demo",
		"events",
		col,
		mongodbio.WithWriteBatchSize(500),
		mongodbio.WithWriteOrdered(false),
	)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
Types ¶
type ReadOption ¶
ReadOption represents options for reading from MongoDB.
type ReadOptionFn ¶
type ReadOptionFn func(option *ReadOption) error
ReadOptionFn is a function that configures a ReadOption.
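ReadOptionFn follows the functional options pattern: each option is a function that mutates an options struct and may return an error. The sketch below shows the pattern with local stand-in types so that it runs without external modules. The names readOption, readOptionFn, withBucketAuto, withBundleSize, and newReadOption are hypothetical, the struct fields are taken from the list under Read, and the validation of non-positive bundle sizes is an assumption rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// readOption is a local stand-in for mongodbio.ReadOption, using the fields
// documented for Read. Filter uses a plain map instead of bson.M so the
// sketch needs no external modules.
type readOption struct {
	BucketAuto bool
	Filter     map[string]any
	BundleSize int64
}

// readOptionFn mirrors the shape of ReadOptionFn.
type readOptionFn func(*readOption) error

func withBucketAuto(v bool) readOptionFn {
	return func(o *readOption) error {
		o.BucketAuto = v
		return nil
	}
}

// withBundleSize rejects non-positive sizes; this validation is an assumption.
func withBundleSize(n int64) readOptionFn {
	return func(o *readOption) error {
		if n < 1 {
			return errors.New("bundle size must be greater than 0")
		}
		o.BundleSize = n
		return nil
	}
}

// newReadOption starts from the documented defaults and applies each option
// in order, stopping at the first error.
func newReadOption(opts ...readOptionFn) (*readOption, error) {
	o := &readOption{BundleSize: 64 * 1024 * 1024}
	for _, fn := range opts {
		if err := fn(o); err != nil {
			return nil, err
		}
	}
	return o, nil
}

func main() {
	o, err := newReadOption(withBucketAuto(true), withBundleSize(32*1024*1024))
	fmt.Println(o.BucketAuto, o.BundleSize, err) // true 33554432 <nil>

	_, err = newReadOption(withBundleSize(0))
	fmt.Println(err) // bundle size must be greater than 0
}
```

Returning an error from each option lets invalid settings surface when the options are applied instead of later during pipeline execution.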
func WithReadBucketAuto ¶
func WithReadBucketAuto(bucketAuto bool) ReadOptionFn
WithReadBucketAuto sets whether the ReadOption uses the $bucketAuto aggregation stage to split the collection.
func WithReadBundleSize ¶
func WithReadBundleSize(bundleSize int64) ReadOptionFn
WithReadBundleSize configures the ReadOption to use the provided bundle size in bytes.
func WithReadFilter ¶
func WithReadFilter(filter bson.M) ReadOptionFn
WithReadFilter configures the ReadOption to use the provided filter.
type WriteOption ¶
WriteOption represents options for writing to MongoDB.
type WriteOptionFn ¶
type WriteOptionFn func(option *WriteOption) error
WriteOptionFn is a function that configures a WriteOption.
func WithWriteBatchSize ¶
func WithWriteBatchSize(batchSize int64) WriteOptionFn
WithWriteBatchSize configures the WriteOption to use the provided batch size when writing documents.
func WithWriteOrdered ¶
func WithWriteOrdered(ordered bool) WriteOptionFn
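WithWriteOrdered sets whether the WriteOption applies an ordered bulk write. In MongoDB, an ordered bulk write stops at the first failed operation, while an unordered one continues with the remaining operations.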