Documentation ¶
Overview ¶
Package integration provides functionality that needs to be shared between all integration tests.
Integration tests are implemented through Go's test framework, as test functions that create and execute pipelines using the ptest package. Tests should be placed in smaller sub-packages for organizational purposes and parallelism (tests are only run in parallel across different packages). Integration tests should always begin with a call to CheckFilters to ensure test filters can be applied, and each package containing integration tests should call ptest.Main in a TestMain function if it uses ptest.
To run integration tests, call go test with any flags that the test pipelines require, such as --runner or --endpoint. Example:
go test -v ./sdks/go/test/integration/... --runner=portable --endpoint=localhost:8099
Alternatively, tests can be executed by running the run_validatesrunner_tests.sh script, which also performs much of the environment setup, or by calling gradle commands in :sdks:go:test.
Variables ¶
var (
	// BootstrapServers is the address of the bootstrap servers for a Kafka
	// cluster, used for Kafka IO tests.
	BootstrapServers = flag.String("bootstrap_servers", "",
		"URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")

	// KafkaJar is a filepath to a jar for starting a Kafka cluster, used for
	// Kafka IO tests.
	KafkaJar = flag.String("kafka_jar", "",
		"The filepath to a jar for starting up a Kafka cluster. Only used if bootstrap_servers is unspecified.")

	// KafkaJarTimeout attempts to apply an auto-shutdown timeout to the Kafka
	// cluster jar. Only used for Kafka IO tests.
	KafkaJarTimeout = flag.String("kafka_jar_timeout", "10m",
		"Sets an auto-shutdown timeout to the Kafka cluster. "+
			"Requires the timeout command to be present in Path, unless the value is set to \"\".")

	// BigQueryDataset is the name of the dataset to create tables in for
	// BigQuery integration tests.
	BigQueryDataset = flag.String("bq_dataset", "",
		"Name of the dataset to create tables in for BigQuery tests.")

	// BigtableInstance is the name of the Bigtable instance to create tables in
	// for Bigtable integration tests.
	BigtableInstance = flag.String("bt_instance", "",
		"Name of the Bigtable instance to create tables in for Bigtable tests.")

	// ExpansionJars contains elements in the form "label:jar" describing jar
	// filepaths for expansion services to use in integration tests, and the
	// corresponding labels. Once provided through this flag, those jars can
	// be used in tests via the ExpansionServices struct.
	ExpansionJars stringSlice

	// ExpansionAddrs contains elements in the form "label:address" describing
	// endpoints for expansion services to use in integration tests, and the
	// corresponding labels. Once provided through this flag, those addresses
	// can be used in tests via the ExpansionServices struct.
	ExpansionAddrs stringSlice

	// ExpansionTimeout attempts to apply an auto-shutdown timeout to any
	// expansion services started by integration tests.
	ExpansionTimeout = flag.Duration("expansion_timeout", 0,
		"Sets an auto-shutdown timeout to any started expansion services. "+
			"Requires the timeout command to be present in Path, unless the value is set to 0.")
)
The flags above are used in one or more integration tests, and may also be passed by scripts that execute "go test ./sdks/go/test/integration/...". Because any flag passed to that command is applied to every package, every integration test package must import these flags, even if it does not use them.
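ExpansionJars and ExpansionAddrs are declared with the unexported stringSlice type, whose definition is not shown on this page. As a rough illustration only, a repeatable flag of this kind could be built on the standard flag.Value interface. The sketch below assumes that each occurrence of the flag appends one element; the stringSlice defined here and the parseJars helper are hypothetical stand-ins, not the package's own code.

```go
package main

import (
	"flag"
	"fmt"
	"strings"
)

// stringSlice is a hypothetical stand-in for the unexported type of the same
// name. It collects every occurrence of a repeated flag.
type stringSlice []string

// String implements flag.Value by joining the collected elements.
func (s *stringSlice) String() string {
	if s == nil {
		return ""
	}
	return strings.Join(*s, ",")
}

// Set implements flag.Value by appending one element per flag occurrence.
func (s *stringSlice) Set(value string) error {
	*s = append(*s, value)
	return nil
}

// parseJars (hypothetical helper) parses args with a private FlagSet so the
// sketch does not touch the process-wide flag.CommandLine.
func parseJars(args []string) ([]string, error) {
	var jars stringSlice
	fs := flag.NewFlagSet("sketch", flag.ContinueOnError)
	fs.Var(&jars, "expansion_jar", "element in the form label:jar")
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	return jars, nil
}

func main() {
	jars, err := parseJars([]string{
		"--expansion_jar=io:/tmp/io.jar",
		"--expansion_jar=schema:/tmp/schema.jar",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(jars)
}
```

Registering the value with Var rather than String is what lets the same flag be given several times on one command line.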
Functions ¶
func CheckFilters ¶
CheckFilters checks if an integration test is filtered to be skipped, either because the intended runner does not support it, or because the test is sickbayed. This function should be called at the beginning of any integration test. If t.Run is used, CheckFilters should be called within the t.Run callback, so that sub-tests can be skipped individually.
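To see why the call belongs inside the t.Run callback, recall that Go reports a sub-test's name as "Parent/child", so a filter can target one sub-test without touching its siblings. The sketch below assumes, purely for illustration, that filters are regular expressions matched against the test name; matchesFilter and the filter list are hypothetical and do not reproduce how CheckFilters is implemented.

```go
package main

import (
	"fmt"
	"regexp"
)

// matchesFilter (hypothetical) reports whether testName matches any of the
// given regular expressions. The real filter lists are not shown on this page.
func matchesFilter(testName string, filters []string) (bool, error) {
	for _, f := range filters {
		ok, err := regexp.MatchString(f, testName)
		if err != nil {
			return false, fmt.Errorf("bad filter %q: %w", f, err)
		}
		if ok {
			return true, nil
		}
	}
	return false, nil
}

func main() {
	// An assumed filter that targets a single sub-test.
	filters := []string{`^TestIO/kafka$`}
	for _, name := range []string{"TestIO", "TestIO/kafka", "TestIO/jdbc"} {
		skip, err := matchesFilter(name, filters)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s skip=%v\n", name, skip)
	}
}
```

With a check made only at the top of the parent test, the name seen would be "TestIO", and the sub-test filter in this sketch would never match.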
func GetExpansionAddrs ¶
GetExpansionAddrs gets all the addresses given to --expansion_addr as a map of label to address.
func GetExpansionJars ¶
GetExpansionJars gets all the jars given to --expansion_jar as a map of label to jar location.
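Both getters return a map built from elements in the form "label:jar" or "label:address". A minimal sketch of that conversion follows, assuming the element is split at its first colon so that an address such as localhost:8097 keeps its port. The parseLabeled helper and its error handling are hypothetical; the package may treat malformed elements differently.

```go
package main

import (
	"fmt"
	"strings"
)

// parseLabeled (hypothetical) turns "label:value" elements into a map of
// label to value, splitting each element at its first colon only.
func parseLabeled(entries []string) (map[string]string, error) {
	out := make(map[string]string, len(entries))
	for _, e := range entries {
		label, value, ok := strings.Cut(e, ":")
		if !ok || label == "" || value == "" {
			return nil, fmt.Errorf("entry %q is not in the form label:value", e)
		}
		out[label] = value
	}
	return out, nil
}

func main() {
	addrs, err := parseLabeled([]string{"io:localhost:8097", "schema:localhost:8098"})
	if err != nil {
		panic(err)
	}
	fmt.Println(addrs["io"], addrs["schema"])
}
```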
Types ¶
type ExpansionServices ¶
type ExpansionServices struct {
// contains filtered or unexported fields
}
ExpansionServices is a struct used for getting addresses and starting expansion services, based on the --expansion_jar and --expansion_addr flags in this package. The main reason to use this instead of accessing the flags directly is to let it handle jar startup and shutdown.
Usage ¶
Create an ExpansionServices object in TestMain with NewExpansionServices. Then use GetAddr for every expansion service needed for the test. Call Shutdown on it before finishing TestMain (or simply defer a call to it).
ExpansionServices is not concurrency safe, and so a single instance should not be used within multiple individual tests, due to the possibility of those tests being run concurrently. It is recommended to only use ExpansionServices in TestMain to avoid this.
Example:
flag.Parse()
beam.Init()

services := integration.NewExpansionServices()
defer func() { services.Shutdown() }()
addr, err := services.GetAddr("example")
if err != nil {
	panic(err)
}
expansionAddr = addr // Save address to a package-level variable used by tests.

ptest.MainRet(m)
func NewExpansionServices ¶
func NewExpansionServices() *ExpansionServices
NewExpansionServices creates and initializes an ExpansionServices instance.
func (*ExpansionServices) GetAddr ¶
func (es *ExpansionServices) GetAddr(label string) (string, error)
GetAddr gets the address for the expansion service with the given label. The label corresponds to the labels used in the --expansion_jar and --expansion_addr flags. If an expansion service is provided as a jar, then that jar will be run to retrieve the address, and the jars are not guaranteed to be shut down unless Shutdown is called.
Note: If this function starts a jar, it waits a few seconds for it to initialize. Do not use this function if the possibility of a few seconds of latency is not acceptable.
func (*ExpansionServices) Shutdown ¶
func (es *ExpansionServices) Shutdown()
Shutdown shuts down any jars started by the ExpansionServices struct. It should be called whenever the struct has been used.
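The lifecycle described for GetAddr and Shutdown can be sketched with standard-library code alone. The version below is an illustration under stated assumptions, not the package's implementation: it assumes a known address takes precedence over a jar with the same label, that a started jar's address is cached, and that jars are launched through a hypothetical startFunc hook. The services type and its methods are invented names.

```go
package main

import "fmt"

// startFunc is a hypothetical hook that launches a jar and returns the
// address it serves on, plus a function that stops it.
type startFunc func(jar string) (addr string, stop func(), err error)

// services is a hypothetical stand-in for ExpansionServices.
type services struct {
	addrs map[string]string // label -> address
	jars  map[string]string // label -> jar path
	start startFunc
	stops []func() // stop functions for jars this instance started
}

// getAddr returns a known address if one exists, otherwise starts the jar
// registered under label and remembers how to stop it.
func (s *services) getAddr(label string) (string, error) {
	if addr, ok := s.addrs[label]; ok {
		return addr, nil
	}
	jar, ok := s.jars[label]
	if !ok {
		return "", fmt.Errorf("no address or jar for label %q", label)
	}
	addr, stop, err := s.start(jar)
	if err != nil {
		return "", fmt.Errorf("starting jar %q: %w", jar, err)
	}
	if s.addrs == nil {
		s.addrs = make(map[string]string)
	}
	s.addrs[label] = addr // Cache so the jar is started only once.
	s.stops = append(s.stops, stop)
	return addr, nil
}

// shutdown stops every jar started by getAddr.
func (s *services) shutdown() {
	for _, stop := range s.stops {
		stop()
	}
	s.stops = nil
}

func main() {
	s := &services{
		jars: map[string]string{"example": "/tmp/example.jar"},
		start: func(jar string) (string, func(), error) {
			// Fake launcher: a real one would run the jar as a process.
			return "localhost:8097", func() { fmt.Println("stopped", jar) }, nil
		},
	}
	defer s.shutdown()
	addr, err := s.getAddr("example")
	if err != nil {
		panic(err)
	}
	fmt.Println(addr)
}
```

Because the maps and the stop list are mutated without locking, this sketch has the same limitation the documentation states for ExpansionServices: it should be used from a single place such as TestMain.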
Directories ¶
Path | Synopsis |
---|---|
internal | |
internal/containers | Package containers contains utilities for running test containers in integration tests. |
internal/jars | Package jars contains functionality for running jars for integration tests. |
internal/ports | Package ports contains utilities for handling ports needed for integration tests. |
io | |
io/xlang/bigquery | Package bigquery contains integration tests for cross-language BigQuery IO transforms. |
io/xlang/bigtable | Package bigtable contains integration tests for cross-language Bigtable IO transforms. |
io/xlang/debezium | Package debezium contains integration tests for cross-language Debezium IO transforms. |
io/xlang/jdbc | Package jdbc contains integration tests for cross-language JDBC IO transforms. |
io/xlang/kafka | Package kafka contains integration tests for cross-language Kafka IO transforms. |
primitives | Package primitives contains integration tests for primitives in beam. |
synthetic | Package synthetic contains pipelines for testing synthetic steps and sources. |
transforms | |
wordcount | Package wordcount contains transforms for wordcount. |
xlang | Package xlang contains integration tests for cross-language transforms. |