Documentation ¶
Overview ¶
taxi is an example using a cross-language Kafka pipeline to write and read to Kafka. This example reads from the PubSub NYC Taxi stream described in https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a given Kafka topic and then reads back from the same Kafka topic, logging every element. This is done as a streaming pipeline and will not end unless the pipeline is stopped externally.
Running this example requires a Kafka cluster accessible to the runner, and a cross-language expansion service that can expand Kafka read and write transforms. An address to a persistent expansion service can be provided as a flag, or if none is specified then the SDK will attempt to automatically start an appropriate expansion service.
Setting Up a Kafka Cluster ¶
Setting up a Kafka cluster is more involved than can be covered in this example. In order for this example to work, all that is necessary is a Kafka cluster accessible through a bootstrap server address that is passed in as a flag. Some instructions for setting up a single node Kafka cluster in GCE can be found here: https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery
Running an Expansion Server ¶
If the automatic expansion service functionality is not available for your environment, or if you want improved performance, you will need to start a persistent expansion service. These instructions will cover running the Java IO Expansion Service, and therefore requires a JDK installation in a version supported by Beam. Depending on whether you are running this from a numbered Beam release, or a development environment, there are two sources you may use for the Expansion service.
Numbered release: The expansion service jar is vendored as module org.apache.beam:beam-sdks-java-io-expansion-service in Maven Repository. This jar can be executed directly with the following command:
`java -jar <jar_name> <port_number>`
Development env: This requires that the JAVA_HOME environment variable points to your JDK installation. From the root `beam/` directory of the Apache Beam repository, the jar can be built (or built and run) with the following commands:
Build: ./gradlew :sdks:java:io:expansion-service:build Build and Run: ./gradlew :sdks:java:io:expansion-service:runExpansionService -PconstructionService.port=<port_num>
Running the Example on GCP ¶
Running this pipeline requires providing an address for the Expansion Service and for the Kafka cluster's bootstrap servers as flags, in addition to the usual flags for pipelines.
An example command for executing this pipeline on GCP is as follows:
export PROJECT="$(gcloud config get-value project)" export TEMP_LOCATION="gs://MY-BUCKET/temp" export REGION="us-central1" export JOB_NAME="kafka-taxi-`date +%Y%m%d-%H%M%S`" export BOOTSTRAP_SERVERS="123.45.67.89:1234" export EXPANSION_ADDR="localhost:1234" cd ./sdks/go go run ./examples/kafka/taxi.go \ --runner=DataflowRunner \ --temp_location=$TEMP_LOCATION \ --staging_location=$STAGING_LOCATION \ --project=$PROJECT \ --region=$REGION \ --job_name="${JOB_NAME}" \ --bootstrap_servers=$BOOTSTRAP_SERVER \ --expansion_addr=$EXPANSION_ADDR
Running the Example From a Git Clone ¶
When running on a development environment, a custom container will likely need to be provided for the cross-language SDK. First this will require building and pushing the SDK container to container repository, such as Docker Hub.
export DOCKER_ROOT="Your Docker Repository Root" ./gradlew :sdks:java:container:java11:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest docker push $DOCKER_ROOT/beam_java11_sdk:latest
For runners in local mode, simply building the container using the default values for docker-repository-root and docker-tag will work to have it accessible locally.
Additionally, you must provide the location of your custom container to the pipeline with the --sdk_harness_container_image_override flag for Java, or --environment_config flag for Go. For example:
--sdk_harness_container_image_override=".*java.*,${DOCKER_ROOT}/beam_java11_sdk:latest" \ --environment_config=${DOCKER_ROOT}/beam_go_sdk:latest