Documentation ¶
Overview ¶
Package jdbcio contains cross-language functionality for reading and writing data to JDBC. These transforms only work on runners that support cross-language transforms.
Setup ¶
Transforms specified here are cross-language transforms implemented in a different SDK (listed below). During pipeline construction, the Go SDK will need to connect to an expansion service containing information on these transforms in their native SDK.
To use an expansion service, it must be run as a separate process accessible during pipeline construction. The address of that process must be passed to the transforms in this package.
The version of the expansion service should match the version of the Beam SDK being used. For numbered releases of Beam, these expansion services are released to the Maven repository as modules. For development versions of Beam, it is recommended to build and run the expansion service from source using Gradle.
Currently supported SDKs, including expansion service modules and reference documentation:
Java:
- Vendored Module: beam-sdks-java-extensions-schemaio-expansion-service
- Run via Gradle:
    ./gradlew :sdks:java:extensions:schemaio-expansion-service:build
    java -jar <location_of_jar_file_generated_from_above> <port>
- Reference Class: org.apache.beam.sdk.io.jdbc.JdbcIO
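Because the expansion service address is passed to the transforms as a plain string, a malformed value tends to surface only at pipeline construction time. The following is a minimal sketch of an early sanity check, assuming the address is a plain host:port pair such as the "localhost:9000" used in the examples on this page. validateExpansionAddr is a hypothetical helper and is not part of jdbcio; it only checks the shape of the string, not whether a service is listening.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// validateExpansionAddr is a hypothetical helper (not part of jdbcio).
// It checks that addr has the form host:port with a port in 1..65535.
// Assumption: the expansion service is addressed as a plain host:port pair.
func validateExpansionAddr(addr string) error {
	host, port, err := net.SplitHostPort(addr)
	if err != nil {
		return fmt.Errorf("expansion address %q: %w", addr, err)
	}
	if host == "" {
		return fmt.Errorf("expansion address %q: missing host", addr)
	}
	n, err := strconv.Atoi(port)
	if err != nil || n < 1 || n > 65535 {
		return fmt.Errorf("expansion address %q: invalid port %q", addr, port)
	}
	return nil
}

func main() {
	for _, addr := range []string{"localhost:9000", "localhost", "localhost:http"} {
		if err := validateExpansionAddr(addr); err != nil {
			fmt.Println("invalid:", err)
			continue
		}
		fmt.Println("ok:", addr)
	}
}
```

A check like this could run before the address is handed to ExpansionAddrRead or ExpansionAddrWrite.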
Index ¶
- func ConnectionInitSQLs(initStatements []string) writeOption
- func ExpansionAddrRead(expansionAddr string) readOption
- func ExpansionAddrWrite(expansionAddr string) writeOption
- func FetchSize(size int16) readOption
- func OutputParallelization(status bool) readOption
- func Read(s beam.Scope, tableName, driverClassName, jdbcUrl, username, password string, ...) beam.PCollection
- func ReadClasspaths(classpaths []string) readOption
- func ReadConnectionInitSQLs(initStatements []string) readOption
- func ReadConnectionProperties(properties string) readOption
- func ReadFromPostgres(s beam.Scope, tableName, jdbcUrl, username, password string, outT reflect.Type, ...) beam.PCollection
- func ReadQuery(query string) readOption
- func Write(s beam.Scope, tableName, driverClassName, jdbcUrl, username, password string, ...)
- func WriteClasspaths(classpaths []string) writeOption
- func WriteConnectionProperties(properties string) writeOption
- func WriteStatement(statement string) writeOption
- func WriteToPostgres(s beam.Scope, tableName, jdbcUrl, username, password string, ...)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConnectionInitSQLs ¶
func ConnectionInitSQLs(initStatements []string) writeOption
ConnectionInitSQLs sets the connection initialization SQL statements, passed as a list of strings. It is required only for MySQL and MariaDB.
func ExpansionAddrRead ¶
func ExpansionAddrRead(expansionAddr string) readOption
ExpansionAddrRead sets the expansion service for JDBC IO.
func ExpansionAddrWrite ¶
func ExpansionAddrWrite(expansionAddr string) writeOption
ExpansionAddrWrite sets the expansion service for JDBC IO.
func OutputParallelization ¶
func OutputParallelization(status bool) readOption
OutputParallelization specifies if output parallelization is on.
func Read ¶
func Read(s beam.Scope, tableName, driverClassName, jdbcUrl, username, password string, outT reflect.Type, opts ...readOption) beam.PCollection
Read is a cross-language PTransform which reads rows from the specified database via JDBC. tableName is a required parameter, and by default, the read query is generated from it. The generated read query can be overridden by passing in a ReadQuery option. If an expansion service address is not provided, an appropriate expansion service will be automatically started; however, this is slower than having a persistent expansion service running.
If no additional classpaths are provided using jdbcio.ReadClasspaths(), the default classpath for that driver is used. Currently, default classpaths are present only for PostgreSQL and MySQL.
The default read query is "SELECT * FROM tableName;"
Read also accepts optional parameters as readOptions. All optional parameters are predefined in this package as functions that return readOption. To set an optional parameter, call the corresponding function in Read's argument list.
Example:
    tableName := "roles"
    driverClassName := "org.postgresql.Driver"
    username := "root"
    password := "root123"
    jdbcUrl := "jdbc:postgresql://localhost:5432/dbname"
    outT := reflect.TypeOf((*JdbcTestRow)(nil)).Elem()
    jdbcio.Read(s, tableName, driverClassName, jdbcUrl, username, password, outT, jdbcio.ExpansionAddrRead("localhost:9000"))
With Classpath parameter:
    jdbcio.Read(s, tableName, driverClassName, jdbcUrl, username, password, outT,
        jdbcio.ExpansionAddrRead("localhost:9000"),
        jdbcio.ReadClasspaths([]string{"org.postgresql:postgresql:42.3.3"}))
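The optional parameters described above follow what is commonly called the functional options pattern. The sketch below illustrates that pattern in isolation, using only the standard library. All names in it (readConfig, option, withQuery, withExpansionAddr, newReadConfig) are hypothetical stand-ins; it is not jdbcio's actual implementation, only an approximation of how a default derived from tableName can be overridden by options applied in order.

```go
package main

import "fmt"

// readConfig is a hypothetical stand-in for the settings a read
// transform might collect; it is not a jdbcio type.
type readConfig struct {
	query         string
	expansionAddr string
}

// option plays the role that readOption plays in this package:
// a function that adjusts the configuration.
type option func(*readConfig)

// withQuery overrides the default query (compare ReadQuery).
func withQuery(q string) option {
	return func(c *readConfig) { c.query = q }
}

// withExpansionAddr records an expansion service address
// (compare ExpansionAddrRead).
func withExpansionAddr(addr string) option {
	return func(c *readConfig) { c.expansionAddr = addr }
}

// newReadConfig starts from the default query generated from tableName
// and then applies each option in the order given.
func newReadConfig(tableName string, opts ...option) readConfig {
	c := readConfig{query: "SELECT * FROM " + tableName + ";"}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	def := newReadConfig("roles")
	custom := newReadConfig("roles",
		withQuery("SELECT id FROM roles;"),
		withExpansionAddr("localhost:9000"))
	fmt.Printf("%+v\n%+v\n", def, custom)
}
```

The appeal of this design is that required parameters stay positional while optional ones can be added over time without changing the function signature.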
func ReadClasspaths ¶
func ReadClasspaths(classpaths []string) readOption
func ReadConnectionInitSQLs ¶
func ReadConnectionInitSQLs(initStatements []string) readOption
ReadConnectionInitSQLs sets the connection initialization SQL statements, passed as a list of strings. It is required only for MySQL and MariaDB.
func ReadConnectionProperties ¶
func ReadConnectionProperties(properties string) readOption
ReadConnectionProperties specifies properties of the JDBC connection, passed as a string with the format [propertyName=property;]*
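The [propertyName=property;]* format is easy to get wrong by hand (for example, by omitting the trailing semicolon). The sketch below shows one way to assemble such a string from a map. formatConnectionProperties is a hypothetical helper, not part of jdbcio, and the property names in main are illustrative only; which properties are valid depends on the JDBC driver in use.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// formatConnectionProperties is a hypothetical helper (not part of jdbcio).
// It renders props as a sequence of "name=value;" pairs. Names are sorted
// so that the result does not depend on Go's random map iteration order.
func formatConnectionProperties(props map[string]string) string {
	names := make([]string, 0, len(props))
	for name := range props {
		names = append(names, name)
	}
	sort.Strings(names)

	var b strings.Builder
	for _, name := range names {
		b.WriteString(name)
		b.WriteByte('=')
		b.WriteString(props[name])
		b.WriteByte(';')
	}
	return b.String()
}

func main() {
	// Illustrative property names; consult the driver's documentation.
	props := map[string]string{"ssl": "true", "connectTimeout": "10"}
	fmt.Println(formatConnectionProperties(props))
}
```

The resulting string could then be passed to ReadConnectionProperties or WriteConnectionProperties. Note that this sketch does not escape '=' or ';' characters inside names or values.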
func ReadFromPostgres ¶
func ReadFromPostgres(s beam.Scope, tableName, jdbcUrl, username, password string, outT reflect.Type, opts ...readOption) beam.PCollection
ReadFromPostgres is a cross-language PTransform which reads rows from a PostgreSQL database via JDBC. tableName is a required parameter, and by default, a read query is generated from it. The generated read query can be overridden by passing in a ReadQuery option. If an expansion service address is not provided, an appropriate expansion service will be automatically started; however, this is slower than having a persistent expansion service running.
The default read query is "SELECT * FROM tableName;"
ReadFromPostgres also accepts optional parameters as readOptions. All optional parameters are predefined in this package as functions that return readOption. To set an optional parameter, call the corresponding function in ReadFromPostgres's argument list.
NOTE: This transform uses "org.postgresql.Driver" as the default driver. To use the read transform with a custom PostgreSQL driver, use the conventional jdbcio.Read() transform instead.
Example:
    tableName := "roles"
    username := "root"
    password := "root123"
    jdbcUrl := "jdbc:postgresql://localhost:5432/dbname"
    outT := reflect.TypeOf((*JdbcTestRow)(nil)).Elem()
    jdbcio.ReadFromPostgres(s, tableName, jdbcUrl, username, password, outT, jdbcio.ExpansionAddrRead("localhost:9000"))
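The examples above obtain outT with reflect.TypeOf((*JdbcTestRow)(nil)).Elem() but do not define JdbcTestRow. The sketch below shows what that idiom evaluates to, using only the standard library. The struct fields are hypothetical and assume a "roles" table with an integer id and a text name; how field names are mapped to database columns is not covered here.

```go
package main

import (
	"fmt"
	"reflect"
)

// JdbcTestRow is named in the examples on this page but not defined there.
// These fields are hypothetical, chosen only for illustration.
type JdbcTestRow struct {
	ID   int32
	Name string
}

// rowType returns the reflect.Type describing JdbcTestRow.
func rowType() reflect.Type {
	// (*JdbcTestRow)(nil) is a typed nil pointer. TypeOf yields the pointer
	// type, and Elem() yields the struct type it points to, so no
	// JdbcTestRow value needs to be constructed.
	return reflect.TypeOf((*JdbcTestRow)(nil)).Elem()
}

func main() {
	t := rowType()
	fmt.Println(t.Name(), t.Kind())
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		fmt.Println(f.Name, f.Type)
	}
}
```

For a struct type, reflect.TypeOf(JdbcTestRow{}) produces the same reflect.Type; the pointer form is a common convention that also works for interface types.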
func ReadQuery ¶
func ReadQuery(query string) readOption
ReadQuery overrides the default read query "SELECT * FROM tableName;"
func Write ¶
func Write(s beam.Scope, tableName, driverClassName, jdbcUrl, username, password string, col beam.PCollection, opts ...writeOption)
Write is a cross-language PTransform which writes Rows to the specified database via JDBC. tableName is a required parameter, and by default, the write statement is generated from it. The generated write statement can be overridden by passing in a WriteStatement option. If an expansion service address is not provided, an appropriate expansion service will be automatically started; however this is slower than having a persistent expansion service running.
If no additional classpaths are provided using jdbcio.WriteClasspaths(), the default classpath for that driver is used. Currently, default classpaths are present only for PostgreSQL and MySQL.
The default write statement is: "INSERT INTO tableName(column1, ...) VALUES(value1, ...)"
Example:
    tableName := "roles"
    driverClassName := "org.postgresql.Driver"
    username := "root"
    password := "root123"
    jdbcUrl := "jdbc:postgresql://localhost:5432/dbname"
    // col is the beam.PCollection of rows to write.
    jdbcio.Write(s, tableName, driverClassName, jdbcUrl, username, password, col, jdbcio.ExpansionAddrWrite("localhost:9000"))
With Classpath parameter:
    jdbcio.Write(s, tableName, driverClassName, jdbcUrl, username, password, col,
        jdbcio.ExpansionAddrWrite("localhost:9000"),
        jdbcio.WriteClasspaths([]string{"org.postgresql:postgresql:42.3.3"}))
func WriteClasspaths ¶
func WriteClasspaths(classpaths []string) writeOption
func WriteConnectionProperties ¶
func WriteConnectionProperties(properties string) writeOption
WriteConnectionProperties specifies properties of the JDBC connection, passed as a string with the format [propertyName=property;]*
func WriteStatement ¶
func WriteStatement(statement string) writeOption
WriteStatement overrides the default write statement "INSERT INTO tableName(column1, ...) VALUES(value1, ...)".
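To make the shape of the default write statement concrete, the sketch below builds an INSERT statement of that form from a table name and a column list. defaultWriteStatement is a hypothetical helper, not part of jdbcio, and it is an assumption that values are bound through "?" placeholders (as is usual for JDBC prepared statements) and that the statement uses standard INSERT ... VALUES syntax. The statement the transform actually generates may differ in detail.

```go
package main

import (
	"fmt"
	"strings"
)

// defaultWriteStatement is a hypothetical helper (not part of jdbcio).
// It builds "INSERT INTO table(col1, col2) VALUES(?, ?)" with one "?"
// placeholder per column. Assumption: values are bound as prepared
// statement parameters rather than inlined.
func defaultWriteStatement(tableName string, columns []string) string {
	placeholders := make([]string, len(columns))
	for i := range placeholders {
		placeholders[i] = "?"
	}
	return fmt.Sprintf("INSERT INTO %s(%s) VALUES(%s)",
		tableName,
		strings.Join(columns, ", "),
		strings.Join(placeholders, ", "))
}

func main() {
	fmt.Println(defaultWriteStatement("roles", []string{"id", "name"}))
}
```

A custom statement, for instance one that writes only a subset of columns, would be supplied through the WriteStatement option instead.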
func WriteToPostgres ¶
func WriteToPostgres(s beam.Scope, tableName, jdbcUrl, username, password string, col beam.PCollection, opts ...writeOption)
WriteToPostgres is a cross-language PTransform which writes rows to a PostgreSQL database via JDBC. tableName is a required parameter, and by default, a write statement is generated from it. The generated write statement can be overridden by passing in a WriteStatement option. If an expansion service address is not provided, an appropriate expansion service will be automatically started; however, this is slower than having a persistent expansion service running.
NOTE: This transform uses "org.postgresql.Driver" as the default driver. To use the write transform with a custom PostgreSQL driver, use the conventional jdbcio.Write() transform instead.
The default write statement is: "INSERT INTO tableName(column1, ...) VALUES(value1, ...)"
Example:
    tableName := "roles"
    username := "root"
    password := "root123"
    jdbcUrl := "jdbc:postgresql://localhost:5432/dbname"
    // col is the beam.PCollection of rows to write.
    jdbcio.WriteToPostgres(s, tableName, jdbcUrl, username, password, col, jdbcio.ExpansionAddrWrite("localhost:9000"))
Types ¶
This section is empty.