Overview ¶
Wordcount is an example using cross-language BigQuery transforms to read from and write to BigQuery. It runs a batch pipeline that reads from the public "shakespeare" sample table described at https://cloud.google.com/bigquery/public-data#sample_tables. The pipeline reads per-work word counts, aggregates them to find each word's total count across all works as well as the average number of times a word appears in the works that contain it, and then writes the results to a given output table.
This example is only expected to work on Dataflow, and requires a cross-language expansion service that can expand BigQuery read and write transforms. The address of a persistent expansion service can be provided as a flag; if none is specified, the SDK will attempt to automatically start an appropriate expansion service.
Running an Expansion Server ¶
If the automatic expansion service functionality is not available for your environment, or if you want improved performance, you will need to start a persistent expansion service. These instructions cover running the Java SchemaIO Expansion Service and therefore require a JDK installation in a version supported by Beam. Depending on whether you are running from a numbered Beam release or from a development environment, there are two sources you may use for the Expansion service.
Numbered release: The expansion service jar is published as the module org.apache.beam:beam-sdks-java-io-google-cloud-platform-expansion-service on Maven Central. This jar can be executed directly with the following command:
`java -jar <jar_name> <port_number>`
Development env: This requires that the JAVA_HOME environment variable points to your JDK installation. From the root `beam/` directory of the Apache Beam repository, the jar can be built (or built and run) with the following commands:
```
./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build
./gradlew :sdks:java:io:google-cloud-platform:expansion-service:runExpansionService -PconstructionService.port=<port_num>
```
Running the Example on GCP ¶
An example command for executing this pipeline on GCP is as follows:
```
export PROJECT="$(gcloud config get-value project)"
export TEMP_LOCATION="gs://MY-BUCKET/temp"
export STAGING_LOCATION="gs://MY-BUCKET/staging"
export REGION="us-central1"
export JOB_NAME="bigquery-wordcount-`date +%Y%m%d-%H%M%S`"
export EXPANSION_ADDR="localhost:1234"
export OUTPUT_TABLE="project_id:dataset_id.table_id"

go run ./sdks/go/examples/xlang/bigquery/wordcount.go \
    --runner=DataflowRunner \
    --temp_location=$TEMP_LOCATION \
    --staging_location=$STAGING_LOCATION \
    --project=$PROJECT \
    --region=$REGION \
    --job_name="${JOB_NAME}" \
    --expansion_addr=$EXPANSION_ADDR \
    --out_table=$OUTPUT_TABLE
```
Running the Example From a Git Clone ¶
When running from a development environment, you will likely need to provide a custom container for the cross-language SDK. This first requires building the SDK container and pushing it to a container repository, such as Docker Hub.
```
export DOCKER_ROOT="Your Docker Repository Root"
./gradlew :sdks:java:container:java8:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
docker push $DOCKER_ROOT/beam_java8_sdk:latest
```
For runners in local mode, building the container with the default values for docker-repository-root and docker-tag is enough to make it accessible locally.
Additionally, you must provide the location of your custom container to the pipeline with the --sdk_harness_container_image_override flag for Java, or --environment_config flag for Go. For example:
```
--sdk_harness_container_image_override=".*java.*,${DOCKER_ROOT}/beam_java8_sdk:latest" \
--environment_config=${DOCKER_ROOT}/beam_go_sdk:latest
```