Kafka
The Kafka input component enables consuming messages from Kafka topics, with configurable settings such as partition fetch size, offset reset policy, and partition balancing strategy.
Config Definition
```pkl
class Kafka extends Input {
  fixed sourceName = "kafka"
  common: Common.Kafka
  autoCommitEnabled: Boolean = true
  consumerGroupID: String
  autoOffsetReset: AutoOffsetReset = "earliest"
  balancerStrategy: Listing<Strategy> = new Listing<Strategy> {
    "cooperative-sticky"
  }
  maxPartitionFetchBytes: DataSize(validateBuffersSizes) = 1.mib
  fetchMaxBytes: DataSize(validateBuffersSizes) = 50.mib
}
```
Common.Kafka Definition
The common attribute of the Kafka input component references the Common.Kafka class, which defines essential configurations for connecting to Kafka brokers and interacting with topics.
Common.Kafka Config
```pkl
class KafkaAuth {
  saslMechanism: SASLMechanism
  saslUsername: String
  saslPassword: String
}

class Kafka {
  saslAuth: KafkaAuth?
  brokers: Listing<String>
  version: String?
  topics: Listing<String>
}
```
Common.Kafka Attributes
| Attribute | Type | Description | Default Value |
| --- | --- | --- | --- |
| saslMechanism | SASLMechanism | SASL mechanism to use (SCRAM-SHA-512 or SCRAM-SHA-256). | null |
| saslUsername | String | SASL authentication username. | null |
| saslPassword | String | SASL authentication password. | null |
| brokers | Listing<String> | List of Kafka broker addresses. | Required |
| version | String? | Kafka protocol version (optional). | null |
| topics | Listing<String> | List of Kafka topics to subscribe to. | Required |
Validations
- SASL Mechanism Validation
- SASL Credentials Validation: if saslAuth is not null, both saslUsername and saslPassword must be provided. Otherwise, the following error is thrown:
  'saslUsername' and 'saslPassword' can not be empty string or null
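The credentials check could also be expressed directly as Pkl type constraints. The following is a purely illustrative sketch; the module may implement the validation differently:

```pkl
// Hypothetical alternative: reject empty credentials at the type level.
class KafkaAuth {
  saslMechanism: SASLMechanism
  saslUsername: String(!isEmpty)
  saslPassword: String(!isEmpty)
}
```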
Kafka Attributes

| Attribute | Type | Description | Default Value |
| --- | --- | --- | --- |
| common | Common.Kafka | Reusable Kafka connection settings (e.g., brokers, SASL). | Required |
| autoCommitEnabled | Boolean | Enables or disables auto-commit for consumer offsets. | true |
| consumerGroupID | String | Consumer group ID for managing Kafka consumers and partition ownership. | Required |
| autoOffsetReset | AutoOffsetReset | Behavior when there is no initial offset or the current offset is invalid (earliest or latest). | "earliest" |
| balancerStrategy | Listing<Strategy> | Strategy used for partition assignment during rebalancing. | ["cooperative-sticky"] |
| maxPartitionFetchBytes | DataSize | Maximum data fetched per partition per request. | 1.mib |
| fetchMaxBytes | DataSize | Maximum data fetched across all partitions per request. | 50.mib |
Validations
- Buffer Size Validation
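The definition of validateBuffersSizes is not shown in this document. As an illustrative stand-in only, a comparable inline constraint could require each buffer size to be positive (the module's actual rule may differ):

```pkl
// Hypothetical inline constraints; the real validateBuffersSizes may check more.
maxPartitionFetchBytes: DataSize(isPositive) = 1.mib
fetchMaxBytes: DataSize(isPositive) = 50.mib
```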
Pkl Configuration Example
```pkl
new Inputs.Kafka {
  common = new Common.Kafka {
    brokers = new Listing {
      "broker1:9092"
      "broker2:9092"
    }
    topics = new Listing {
      "example-topic"
    }
  }
  consumerGroupID = "example-consumer-group"
}
```
```pkl
new Inputs.Kafka {
  common = new Common.Kafka {
    saslAuth = new KafkaAuth {
      saslMechanism = "SCRAM-SHA-512"
      saslUsername = "example-user"
      saslPassword = "example-password"
    }
    brokers = new Listing {
      "broker1:9092"
      "broker2:9092"
    }
    topics = new Listing {
      "secure-topic"
    }
  }
  consumerGroupID = "example-secure-consumer"
}
```
Attributes in Detail
Common Kafka Settings (common)
- Defines reusable Kafka connection attributes (e.g., brokers, topics, SASL authentication).
Auto Commit Enabled (autoCommitEnabled)
- If true, offsets are automatically committed to Kafka.
- If false, offsets must be committed manually.
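A consumer that commits offsets manually can turn auto-commit off; in the sketch below, the broker, topic, and group names are placeholders:

```pkl
new Inputs.Kafka {
  common = new Common.Kafka {
    brokers = new Listing { "broker1:9092" }
    topics = new Listing { "manual-commit-topic" }
  }
  consumerGroupID = "manual-commit-group"
  autoCommitEnabled = false
}
```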
Consumer Group ID (consumerGroupID)
- Identifies the group of Kafka consumers that share load and maintain offset tracking.
Offset Reset Behavior (autoOffsetReset)
- earliest: start consuming from the earliest available message.
- latest: start consuming from the latest message.
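For a consumer that should only process messages produced after it starts, latest can be set explicitly; all names below are placeholders:

```pkl
new Inputs.Kafka {
  common = new Common.Kafka {
    brokers = new Listing { "broker1:9092" }
    topics = new Listing { "live-topic" }
  }
  consumerGroupID = "live-consumer-group"
  autoOffsetReset = "latest"
}
```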
Partition Balancing Strategy (balancerStrategy)
- Default: cooperative-sticky, which minimizes partition movement during rebalancing.
- Custom strategies can be added for advanced partitioning needs.
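Overriding the default could look like the following fragment; whether "roundrobin" is an accepted value depends on the Strategy typealias, which is not shown in this document:

```pkl
// Hypothetical: assumes "roundrobin" is a valid Strategy value.
balancerStrategy = new Listing { "roundrobin" }
```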
Use Cases
- Basic Kafka Consumption: use the common attribute to specify brokers and topics, along with a consumer group ID.
- Secure Kafka Consumption: set saslAuth with a saslMechanism, saslUsername, and saslPassword to secure the Kafka connection.
- Optimized Data Transfer: adjust maxPartitionFetchBytes and fetchMaxBytes to fine-tune data fetching and improve performance.
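For a high-volume topic, the fetch limits can be raised above their defaults; the values and names below are illustrative, not recommendations:

```pkl
new Inputs.Kafka {
  common = new Common.Kafka {
    brokers = new Listing { "broker1:9092" }
    topics = new Listing { "high-volume-topic" }
  }
  consumerGroupID = "bulk-consumer-group"
  maxPartitionFetchBytes = 4.mib
  fetchMaxBytes = 100.mib
}
```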
Notes
- Ensure Common.Kafka is configured with valid brokers and topics.
- Validate buffer sizes (fetchMaxBytes and maxPartitionFetchBytes) to avoid runtime errors.