kafkaio

Documentation

Overview

Package kafkaio contains cross-language functionality for using Apache Kafka (http://kafka.apache.org/). These transforms only work on runners that support cross-language transforms.

Setup

Transforms specified here are cross-language transforms implemented in a different SDK (listed below). During pipeline construction, the Go SDK will need to connect to an expansion service containing information on these transforms in their native SDK.

To use an expansion service, it must be run as a separate process accessible during pipeline construction. The address of that process must be passed to the transforms in this package.

The version of the expansion service should match the version of the Beam SDK being used. For numbered releases of Beam, these expansion services are released to the Maven repository as modules. For development versions of Beam, it is recommended to build and run the expansion service from source using Gradle.

Current supported SDKs, including expansion service modules and reference documentation:

Java:

  • Vendored Module: beam-sdks-java-io-expansion-service
  • Run via Gradle: ./gradlew :sdks:java:io:expansion-service:runExpansionService
  • Reference Class: org.apache.beam.sdk.io.kafka.KafkaIO
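
For example, once the expansion service is running (via the Gradle command above for development, or from the released Maven module for numbered releases), its address is passed directly to Read or Write during pipeline construction. A minimal sketch, with an illustrative port and topic, assuming s is the pipeline's beam.Scope:

expansionAddr := "localhost:8097" // Address where the expansion service is listening.
bootstrapServer := "bootstrap-server:9092"
pcol := kafkaio.Read(s, expansionAddr, bootstrapServer, []string{"my_topic"})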

Index

Constants

const (
	// ByteArrayDeserializer is the fully qualified class name of Kafka's
	// byte-array deserializer, used by default for keys and values in Read.
	ByteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer"

	// ByteArraySerializer is the fully qualified class name of Kafka's
	// byte-array serializer, used by default for keys and values in Write.
	ByteArraySerializer = "org.apache.kafka.common.serialization.ByteArraySerializer"

	// ProcessingTime is a timestamp policy that assigns processing time to
	// each record. Specifically, this is the timestamp when the record becomes
	// "current" in the reader. Further documentation can be found in Java's
	// KafkaIO documentation.
	ProcessingTime policy = "ProcessingTime"

	// CreateTime is a timestamp policy based on the CREATE_TIME timestamps of
	// Kafka records. Requires the records to have a timestamp type set to
	// org.apache.kafka.common.record.TimestampType.CREATE_TIME. Further
	// documentation can be found in Java's KafkaIO documentation.
	CreateTime policy = "CreateTime"

	// LogAppendTime is a timestamp policy that assigns Kafka's log append time
	// (server side ingestion time) to each record. Further documentation can
	// be found in Java's KafkaIO documentation.
	LogAppendTime policy = "LogAppendTime"
)

Variables

This section is empty.

Functions

func CommitOffsetInFinalize

func CommitOffsetInFinalize(enabled bool) readOption

CommitOffsetInFinalize is a Read option that specifies whether to commit offsets when finalizing.

Default: false

func ConsumerConfigs

func ConsumerConfigs(cfgs map[string]string) readOption

ConsumerConfigs is a Read option that adds consumer properties to the Consumer configuration of the transform. Each usage of this adds the given elements to the existing map without removing existing elements.

Note that the "bootstrap.servers" property is automatically set by kafkaio.Read and does not need to be specified via this option.
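
For example, to have the underlying consumer start from the earliest available offset when no committed offset exists (a standard Kafka consumer property, used here only as an illustration), assuming s and the other variables are set up as in the Read example below:

pcol := kafkaio.Read(s, expansionAddr, bootstrapServer, []string{topic},
    kafkaio.ConsumerConfigs(map[string]string{"auto.offset.reset": "earliest"}))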

func MaxNumRecords

func MaxNumRecords(num int64) readOption

MaxNumRecords is a Read option that specifies the maximum number of records to be read. Setting this will cause the Read to execute as a bounded transform. Useful for tests and demo applications.

func MaxReadSecs

func MaxReadSecs(secs int64) readOption

MaxReadSecs is a Read option that specifies the maximum amount of time, in seconds, that the transform executes. Setting this will cause the Read to execute as a bounded transform. Useful for tests and demo applications.
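
For example, a bounded read for a test might cap both the record count and the read duration (the limits here are illustrative), assuming the same variables as in the Read example below:

pcol := kafkaio.Read(s, expansionAddr, bootstrapServer, []string{topic},
    kafkaio.MaxNumRecords(1000), kafkaio.MaxReadSecs(30))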

func ProducerConfigs

func ProducerConfigs(cfgs map[string]string) writeOption

ProducerConfigs is a Write option that adds producer properties to the Producer configuration of the transform. Each usage of this adds the given elements to the existing map without removing existing elements.

Note that the "bootstrap.servers" property is automatically set by kafkaio.Write and does not need to be specified via this option.
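
For example, to enable gzip compression on the underlying producer (a standard Kafka producer property, shown only as an illustration), assuming col is the KV PCollection being written and the other variables are set up as in the Write example below:

kafkaio.Write(s, expansionAddr, bootstrapServer, topic, col,
    kafkaio.ProducerConfigs(map[string]string{"compression.type": "gzip"}))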

func Read

func Read(s beam.Scope, addr string, servers string, topics []string, opts ...readOption) beam.PCollection

Read is a cross-language PTransform which reads from Kafka and returns a KV pair for each item in the specified Kafka topics. By default, this runs as an unbounded transform and outputs keys and values as byte slices. These properties can be changed through optional parameters.

Read requires the address of an expansion service for Kafka Read transforms, a comma-separated list of bootstrap server addresses (see the Kafka property "bootstrap.servers" for details), and at least one topic to read from. If the expansion service address is provided as "", an appropriate expansion service will be started automatically; however, this is slower than having a persistent expansion service running.

Read also accepts optional parameters as readOptions. All optional parameters are predefined in this package as functions that return readOption. To set an optional parameter, call the corresponding function and pass its result as one of Read's variadic arguments.

Example of Read with required and optional parameters:

expansionAddr := "localhost:1234"
bootstrapServer := "bootstrap-server:1234"
topic := "topic_name"
pcol := kafkaio.Read(s, expansionAddr, bootstrapServer, []string{topic},
    kafkaio.MaxNumRecords(100), kafkaio.CommitOffsetInFinalize(true))
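
By default the returned PCollection holds byte-slice keys and values, so a downstream DoFn can accept them as two []byte parameters. A minimal sketch, where logRecord is an illustrative function using the standard log package (recent SDK versions may also require registering DoFns with beam.RegisterFunction or the register package):

func logRecord(key, value []byte) {
    log.Printf("key=%q value=%q", key, value)
}

// Elsewhere, after constructing pcol with kafkaio.Read:
beam.ParDo0(s, logRecord, pcol)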

func StartReadTimestamp

func StartReadTimestamp(ts int64) readOption

StartReadTimestamp is a Read option that specifies a start timestamp in epoch milliseconds; only records with timestamps at or after that point will be read.

This results in failures if one or more partitions don't contain messages with a timestamp larger than or equal to the one specified, or if the message format version in a partition is before 0.10.0, meaning messages do not have timestamps.
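
For example, to read only records produced on or after a fixed wall-clock time, the timestamp can be converted to epoch milliseconds with the standard time package (the date and other variables here are illustrative):

start := time.Date(2024, time.January, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
pcol := kafkaio.Read(s, expansionAddr, bootstrapServer, []string{topic},
    kafkaio.StartReadTimestamp(start))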

func TimestampPolicy

func TimestampPolicy(name policy) readOption

TimestampPolicy is a Read option that specifies the timestamp policy to use for extracting timestamps from the KafkaRecord. Must be one of the predefined constant timestamp policies in this package.

Default: kafkaio.ProcessingTime
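
For example, to stamp each record with Kafka's log append time instead of the default processing-time policy (variables as in the Read example below):

pcol := kafkaio.Read(s, expansionAddr, bootstrapServer, []string{topic},
    kafkaio.TimestampPolicy(kafkaio.LogAppendTime))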

func Write

func Write(s beam.Scope, addr, servers, topic string, col beam.PCollection, opts ...writeOption)

Write is a cross-language PTransform which writes KV data to a specified Kafka topic. By default, this assumes keys and values to be received as byte slices. This can be changed through optional parameters.

Write requires the address of an expansion service for Kafka Write transforms, a comma-separated list of bootstrap server addresses (see the Kafka property "bootstrap.servers" for details), and a topic to write to. If the expansion service address is provided as "", an appropriate expansion service will be started automatically; however, this is slower than having a persistent expansion service running.

Write also accepts optional parameters as writeOptions. All optional parameters are predefined in this package as functions that return writeOption. To set an optional parameter, call the corresponding function and pass its result as one of Write's variadic arguments.

Example of Write with required and optional parameters:

expansionAddr := "localhost:1234"
bootstrapServer := "bootstrap-server:1234"
topic := "topic_name"
// pcol is a PCollection of KV pairs to write.
kafkaio.Write(s, expansionAddr, bootstrapServer, topic, pcol,
    kafkaio.ValueSerializer("foo.BarSerializer"))
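
Because Write consumes a PCollection of KV pairs (byte-slice keys and values by default), the pcol above typically comes from a step that maps upstream data into that shape. A minimal sketch, where toKV and the input strings are illustrative:

func toKV(v string) ([]byte, []byte) {
    return []byte("key"), []byte(v)
}

// During pipeline construction:
lines := beam.Create(s, "hello", "world")
pcol := beam.ParDo(s, toKV, lines)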

Types

This section is empty.
