jdbcio

package
v3.0.0-...-7ba4d6b
Warning: This package is not in the latest version of its module.
Published: Jul 17, 2024 License: Apache-2.0, BSD-3-Clause, MIT Imports: 7 Imported by: 0

Documentation

Overview

Package jdbcio contains cross-language functionality for reading data from and writing data to databases via JDBC. These transforms only work on runners that support cross-language transforms.

Setup

Transforms specified here are cross-language transforms implemented in a different SDK (listed below). During pipeline construction, the Go SDK will need to connect to an expansion service containing information on these transforms in their native SDK.

To use an expansion service, it must be run as a separate process accessible during pipeline construction. The address of that process must be passed to the transforms in this package.

The version of the expansion service should match the version of the Beam SDK being used. For numbered releases of Beam, these expansion services are released to the Maven repository as modules. For development versions of Beam, it is recommended to build and run the expansion service from source using Gradle.

Current supported SDKs, including expansion service modules and reference documentation:

Java:

  • Vendored Module: beam-sdks-java-extensions-schemaio-expansion-service
  • Run via Gradle: ./gradlew :sdks:java:extensions:schemaio-expansion-service:build, then java -jar <location_of_jar_file_generated_from_above> <port>
  • Reference Class: org.apache.beam.sdk.io.jdbc.JdbcIO

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConnectionInitSQLs

func ConnectionInitSQLs(initStatements []string) writeOption

ConnectionInitSQLs sets the connection initialization SQL statements, passed as a list of strings. It is required only for MySQL and MariaDB.

func ExpansionAddrRead

func ExpansionAddrRead(expansionAddr string) readOption

ExpansionAddrRead sets the expansion service for JDBC IO.

func ExpansionAddrWrite

func ExpansionAddrWrite(expansionAddr string) writeOption

ExpansionAddrWrite sets the expansion service for JDBC IO.

func FetchSize

func FetchSize(size int16) readOption

FetchSize specifies how many rows to fetch.
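
Since FetchSize takes an int16, values above 32767 cannot be passed directly. The sketch below shows one way to convert an int safely before calling FetchSize. The helper toFetchSize is hypothetical (not part of jdbcio), and rejecting values below 1 is an assumption rather than documented behavior.

```go
package main

import (
	"fmt"
	"math"
)

// toFetchSize is a hypothetical helper (not part of jdbcio). It converts n to
// int16 and rejects values that would overflow. Treating values below 1 as
// invalid is an assumption made for this sketch.
func toFetchSize(n int) (int16, error) {
	if n < 1 || n > math.MaxInt16 {
		return 0, fmt.Errorf("fetch size %d is outside the range 1..%d", n, math.MaxInt16)
	}
	return int16(n), nil
}

func main() {
	size, err := toFetchSize(5000)
	if err != nil {
		fmt.Println("invalid fetch size:", err)
		return
	}
	// The result could then be passed as jdbcio.FetchSize(size).
	fmt.Println("fetch size:", size)
}
```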

func OutputParallelization

func OutputParallelization(status bool) readOption

OutputParallelization specifies if output parallelization is on.

func Read

func Read(s beam.Scope, tableName, driverClassName, jdbcUrl, username, password string, outT reflect.Type, opts ...readOption) beam.PCollection

Read is a cross-language PTransform which reads Rows from the specified database via JDBC. tableName is a required parameter, and by default, the read query is generated from it. The generated read query can be overridden by passing in a ReadQuery option. If an expansion service address is not provided, an appropriate expansion service will be automatically started; however this is slower than having a persistent expansion service running.

If no additional classpaths are provided using jdbcio.ReadClasspaths(), the default classpath for that driver is used. Currently, default classpaths exist only for PostgreSQL and MySQL.

The default read query is "SELECT * FROM tableName;"

Read also accepts optional parameters as readOptions. All optional parameters are predefined in this package as functions that return readOption. To set an optional parameter, call the function within Read's function signature.

Example:

tableName := "roles"
driverClassName := "org.postgresql.Driver"
username := "root"
password := "root123"
jdbcUrl := "jdbc:postgresql://localhost:5432/dbname"
outT := reflect.TypeOf((*JdbcTestRow)(nil)).Elem()
jdbcio.Read(s, tableName, driverClassName, jdbcUrl, username, password, outT, jdbcio.ExpansionAddrRead("localhost:9000"))

With Classpath parameter:

jdbcio.Read(s, tableName, driverClassName, jdbcUrl, username, password, outT, jdbcio.ExpansionAddrRead("localhost:9000"), jdbcio.ReadClasspaths([]string{"org.postgresql:postgresql:42.3.3"}))

func ReadClasspaths

func ReadClasspaths(classpaths []string) readOption

func ReadConnectionInitSQLs

func ReadConnectionInitSQLs(initStatements []string) readOption

ReadConnectionInitSQLs sets the connection initialization SQL statements, passed as a list of strings. It is required only for MySQL and MariaDB.

func ReadConnectionProperties

func ReadConnectionProperties(properties string) readOption

ReadConnectionProperties specifies properties of the JDBC connection, passed as a string in the format [propertyName=property;]*.
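
To illustrate the [propertyName=property;]* format, here is a minimal sketch that renders a map as such a string. The helper formatConnectionProperties is hypothetical (not part of jdbcio), and the property names ssl and connectTimeout are placeholder examples; which properties are valid depends on the JDBC driver.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// formatConnectionProperties is a hypothetical helper (not part of jdbcio).
// It renders key/value pairs as "name=value;" segments, sorting the keys so
// the output is deterministic.
func formatConnectionProperties(props map[string]string) string {
	keys := make([]string, 0, len(props))
	for k := range props {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var b strings.Builder
	for _, k := range keys {
		b.WriteString(k)
		b.WriteByte('=')
		b.WriteString(props[k])
		b.WriteByte(';')
	}
	return b.String()
}

func main() {
	props := formatConnectionProperties(map[string]string{
		"ssl":            "true", // placeholder property
		"connectTimeout": "10",   // placeholder property
	})
	fmt.Println(props) // prints "connectTimeout=10;ssl=true;"
}
```

The resulting string could then be passed to ReadConnectionProperties or WriteConnectionProperties.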

func ReadFromPostgres

func ReadFromPostgres(s beam.Scope, tableName, jdbcUrl, username, password string, outT reflect.Type, opts ...readOption) beam.PCollection

ReadFromPostgres is a cross-language PTransform which reads Rows from a PostgreSQL database via JDBC. tableName is a required parameter, and by default, a read query is generated from it. The generated read query can be overridden by passing in a ReadQuery option. If an expansion service address is not provided, an appropriate expansion service will be automatically started; however this is slower than having a persistent expansion service running.

The default read query is "SELECT * FROM tableName;"

ReadFromPostgres also accepts optional parameters as readOptions. All optional parameters are predefined in this package as functions that return readOption. To set an optional parameter, call the function within ReadFromPostgres's function signature.

NOTE: This transform uses "org.postgresql.Driver" as the default driver. To use the read transform with a custom PostgreSQL driver, use the conventional jdbcio.Read() transform.

Example:

tableName := "roles"
username := "root"
password := "root123"
jdbcUrl := "jdbc:postgresql://localhost:5432/dbname"
outT := reflect.TypeOf((*JdbcTestRow)(nil)).Elem()
jdbcio.ReadFromPostgres(s, tableName, jdbcUrl, username, password, outT, jdbcio.ExpansionAddrRead("localhost:9000"))
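
Optional parameters expressed as functions that return an option value are often called the functional options pattern. The sketch below shows the general shape of that pattern with the standard library only. It is not the package's implementation: readConfig, option, withQuery, withFetchSize, and newReadConfig are all hypothetical names.

```go
package main

import "fmt"

// readConfig is a hypothetical stand-in for the settings a read transform
// might collect before it is expanded.
type readConfig struct {
	query     string
	fetchSize int16
}

// option mirrors the general shape of a readOption: a function that mutates
// the configuration.
type option func(*readConfig)

// withQuery overrides the default query, in the spirit of ReadQuery.
func withQuery(q string) option {
	return func(c *readConfig) { c.query = q }
}

// withFetchSize sets the fetch size, in the spirit of FetchSize.
func withFetchSize(n int16) option {
	return func(c *readConfig) { c.fetchSize = n }
}

// newReadConfig starts from the default read query for tableName and then
// applies each option in the order given.
func newReadConfig(tableName string, opts ...option) readConfig {
	cfg := readConfig{query: fmt.Sprintf("SELECT * FROM %s;", tableName)}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	// Without options, the default query is used.
	fmt.Printf("%+v\n", newReadConfig("roles"))
	// Options are passed as extra arguments, as with the read transforms.
	fmt.Printf("%+v\n", newReadConfig("roles", withQuery("SELECT role_id FROM roles;"), withFetchSize(100)))
}
```

One consequence of this design is that new optional parameters can be added without changing the signature of the transform.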

func ReadQuery

func ReadQuery(query string) readOption

ReadQuery overrides the default read query "SELECT * FROM tableName;"

func Write

func Write(s beam.Scope, tableName, driverClassName, jdbcUrl, username, password string, col beam.PCollection, opts ...writeOption)

Write is a cross-language PTransform which writes Rows to the specified database via JDBC. tableName is a required parameter, and by default, the write statement is generated from it. The generated write statement can be overridden by passing in a WriteStatement option. If an expansion service address is not provided, an appropriate expansion service will be automatically started; however this is slower than having a persistent expansion service running.

If no additional classpaths are provided using jdbcio.WriteClasspaths(), the default classpath for that driver is used. Currently, default classpaths exist only for PostgreSQL and MySQL.

The default write statement is: "INSERT INTO tableName(column1, ...) VALUES(value1, ...)"

Example:

tableName := "roles"
driverClassName := "org.postgresql.Driver"
username := "root"
password := "root123"
jdbcUrl := "jdbc:postgresql://localhost:5432/dbname"
// col is the beam.PCollection of rows to write.
jdbcio.Write(s, tableName, driverClassName, jdbcUrl, username, password, col, jdbcio.ExpansionAddrWrite("localhost:9000"))

With Classpath parameter:

jdbcio.Write(s, tableName, driverClassName, jdbcUrl, username, password, col, jdbcio.ExpansionAddrWrite("localhost:9000"), jdbcio.WriteClasspaths([]string{"org.postgresql:postgresql:42.3.3"}))

func WriteClasspaths

func WriteClasspaths(classpaths []string) writeOption

func WriteConnectionProperties

func WriteConnectionProperties(properties string) writeOption

WriteConnectionProperties specifies properties of the JDBC connection, passed as a string in the format [propertyName=property;]*.

func WriteStatement

func WriteStatement(statement string) writeOption

WriteStatement option overrides the default write statement of "INSERT INTO tableName(column1, ...) VALUES(value1, ...)".
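
A custom statement passed to WriteStatement might be assembled as in the sketch below. The helper insertStatement is hypothetical (not part of jdbcio), and the use of ? placeholders assumes the statement is executed as a JDBC prepared statement, which should be confirmed against the JdbcIO reference class. The column names are placeholders.

```go
package main

import (
	"fmt"
	"strings"
)

// insertStatement is a hypothetical helper (not part of jdbcio). It builds an
// INSERT statement with one "?" placeholder per column, assuming the statement
// is run as a JDBC prepared statement.
func insertStatement(table string, columns []string) string {
	placeholders := make([]string, len(columns))
	for i := range placeholders {
		placeholders[i] = "?"
	}
	return fmt.Sprintf("INSERT INTO %s(%s) VALUES(%s)",
		table, strings.Join(columns, ", "), strings.Join(placeholders, ", "))
}

func main() {
	stmt := insertStatement("roles", []string{"role_id", "role_name"})
	fmt.Println(stmt) // prints "INSERT INTO roles(role_id, role_name) VALUES(?, ?)"
}
```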

func WriteToPostgres

func WriteToPostgres(s beam.Scope, tableName, jdbcUrl, username, password string, col beam.PCollection, opts ...writeOption)

WriteToPostgres is a cross-language PTransform which writes Rows to a PostgreSQL database via JDBC. tableName is a required parameter, and by default, a write statement is generated from it. The generated write statement can be overridden by passing in a WriteStatement option. If an expansion service address is not provided, an appropriate expansion service will be automatically started; however this is slower than having a persistent expansion service running.

NOTE: This transform uses "org.postgresql.Driver" as the default driver. To use the write transform with a custom PostgreSQL driver, use the conventional jdbcio.Write() transform.

The default write statement is: "INSERT INTO tableName(column1, ...) VALUES(value1, ...)"

Example:

tableName := "roles"
username := "root"
password := "root123"
jdbcUrl := "jdbc:postgresql://localhost:5432/dbname"
// col is the beam.PCollection of rows to write.
jdbcio.WriteToPostgres(s, tableName, jdbcUrl, username, password, col, jdbcio.ExpansionAddrWrite("localhost:9000"))
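
The jdbcUrl values in these examples follow the PostgreSQL JDBC form jdbc:postgresql://host:port/database. As a small sketch, such a URL could be assembled from its parts as shown below. The helper postgresJDBCURL is hypothetical (not part of jdbcio), and it assumes the database name needs no URL escaping.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// postgresJDBCURL is a hypothetical helper (not part of jdbcio). It joins the
// host and port (bracketing IPv6 literals) and appends the database name.
// The database name is assumed to need no escaping.
func postgresJDBCURL(host string, port int, database string) string {
	hostPort := net.JoinHostPort(host, strconv.Itoa(port))
	return fmt.Sprintf("jdbc:postgresql://%s/%s", hostPort, database)
}

func main() {
	jdbcUrl := postgresJDBCURL("localhost", 5432, "dbname")
	fmt.Println(jdbcUrl) // prints "jdbc:postgresql://localhost:5432/dbname"
}
```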

Types

This section is empty.
