kafka

package v1.0.10

Published: Feb 3, 2025 License: Apache-2.0 Imports: 14 Imported by: 0

README

Kafka

The Kafka input component enables consuming messages from Kafka topics, with configurable settings such as partition fetch sizes, offset reset policy, and partition balancing strategy.


Config Definition

class Kafka extends Input {
  fixed sourceName = "kafka"
  common: Common.Kafka
  autoCommitEnabled: Boolean = true
  consumerGroupID: String
  autoOffsetReset: AutoOffsetReset = "earliest"
  balancerStrategy: Listing<Strategy> = new Listing<Strategy> {
    "cooperative-sticky"
  }
  maxPartitionFetchBytes: DataSize(validateBuffersSizes) = 1.mib
  fetchMaxBytes: DataSize(validateBuffersSizes) = 50.mib
}

Common.Kafka Definition

The common attribute of the Kafka input component references the Common.Kafka class, which defines essential configurations for connecting to Kafka brokers and interacting with topics.

Common.Kafka Config
class KafkaAuth {
   saslMechanism: SASLMechanism
   saslUsername: String
   saslPassword: String
}

class Kafka {
   saslAuth: KafkaAuth?
   brokers: Listing<String>
   version: String?
   topics: Listing<String>
}
Common.Kafka Attributes
| Attribute     | Type            | Description                                              | Default Value |
|---------------|-----------------|----------------------------------------------------------|---------------|
| saslMechanism | SASLMechanism   | SASL mechanism to use (SCRAM-SHA-512 or SCRAM-SHA-256).  | null          |
| saslUsername  | String          | SASL authentication username.                            | null          |
| saslPassword  | String          | SASL authentication password.                            | null          |
| brokers       | Listing<String> | List of Kafka broker addresses.                          | Required      |
| version       | String          | Kafka protocol version (optional).                       | null          |
| topics        | Listing<String> | List of Kafka topics to subscribe to.                    | Required      |

Note that saslMechanism, saslUsername, and saslPassword are set on the nested saslAuth (KafkaAuth) object, as shown in the class definition above.
Validations

  1. SASL Mechanism Validation

     • If saslAuth is not null and saslMechanism is not set, the following error is thrown:

       'saslMechanism' can not be null

  2. SASL Credentials Validation

     • If saslAuth is not null, both saslUsername and saslPassword must be provided. Otherwise, the following error is thrown:

       'saslUsername' and 'saslPassword' can not be empty string or null

Kafka Input Attributes

| Attribute              | Type              | Description                                                                                  | Default Value          |
|------------------------|-------------------|----------------------------------------------------------------------------------------------|------------------------|
| common                 | Common.Kafka      | Reusable Kafka connection settings (e.g., brokers, SASL).                                     | Required               |
| autoCommitEnabled      | Boolean           | Enables or disables auto-commit for consumer offsets.                                         | true                   |
| consumerGroupID        | String            | Consumer group ID for managing Kafka consumers and partition ownership.                       | Required               |
| autoOffsetReset        | AutoOffsetReset   | Behavior when there is no initial offset or when the offset is invalid (earliest or latest).  | "earliest"             |
| balancerStrategy       | Listing<Strategy> | Strategy used for partition assignment during rebalancing.                                    | ["cooperative-sticky"] |
| maxPartitionFetchBytes | DataSize          | Maximum data fetched per partition per request.                                               | 1.mib                  |
| fetchMaxBytes          | DataSize          | Maximum data fetched across all partitions per request.                                       | 50.mib                 |

Validations

Buffer Size Validation
  • Ensures fetchMaxBytes is greater than or equal to maxPartitionFetchBytes.
  • If the validation fails, an exception is thrown:
    'fetchMaxBytes' should be more than 'maxPartitionFetchBytes'
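
Expressed as a plain Go check, the invariant behind validateBuffersSizes looks roughly like this. A sketch assuming both sizes compare as byte counts; the helper name is hypothetical, not part of this package:

import "errors"

// checkBufferSizes mirrors the Pkl constraint: the total fetch budget must be
// at least as large as the per-partition budget.
func checkBufferSizes(fetchMaxBytes, maxPartitionFetchBytes int64) error {
    if fetchMaxBytes < maxPartitionFetchBytes {
        return errors.New("'fetchMaxBytes' should be more than 'maxPartitionFetchBytes'")
    }
    return nil
}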
    

Pkl Configuration Example

Basic Kafka Input
new Inputs.Kafka {
  common = new Common.Kafka {
    brokers = new Listing {
      "broker1:9092"
      "broker2:9092"
    }
    topics = new Listing {
      "example-topic"
    }
  }
  consumerGroupID = "example-consumer-group"
}
Kafka Input with SASL Authentication
new Inputs.Kafka {
  common = new Common.Kafka {
    saslAuth = new KafkaAuth {
      saslMechanism = "SCRAM-SHA-512"
      saslUsername = "example-user"
      saslPassword = "example-password"
    }
    brokers = new Listing {
      "broker1:9092"
      "broker2:9092"
    }
    topics = new Listing {
      "secure-topic"
    }
  }
  consumerGroupID = "example-secure-consumer"
}

Attributes in Detail

Common Kafka Settings (common)
  • Defines reusable Kafka connection attributes (e.g., brokers, topics, SASL authentication).
Auto Commit Enabled (autoCommitEnabled)
  • If true, offsets are automatically committed to Kafka.
  • If false, manual offset commit is required.
Consumer Group ID (consumerGroupID)
  • Identifies the group of Kafka consumers that share load and maintain offset tracking.
Offset Reset Behavior (autoOffsetReset)
  • earliest: Start consuming from the earliest available message.
  • latest: Start consuming from the latest message.
Partition Balancing Strategy (balancerStrategy)
  • Default: cooperative-sticky, which minimizes partition movement during rebalancing.
  • Custom strategies can be added for advanced partitioning needs; see the sketch below for how these attributes plausibly map onto client options.
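
The Consume signature further down takes *kgo.Record, so the package is built on franz-go's kgo client. Here is a minimal sketch of how the attributes above could translate into kgo consumer options, using the documented defaults; the option set is illustrative, not the package's verified wiring:

import "github.com/twmb/franz-go/pkg/kgo"

// newClient shows one plausible translation of the README attributes into
// kgo options. Values mirror the defaults documented above.
func newClient() (*kgo.Client, error) {
    return kgo.NewClient(
        kgo.SeedBrokers("broker1:9092", "broker2:9092"),   // brokers
        kgo.ConsumeTopics("example-topic"),                // topics
        kgo.ConsumerGroup("example-consumer-group"),       // consumerGroupID
        kgo.Balancers(kgo.CooperativeStickyBalancer()),    // balancerStrategy
        kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), // autoOffsetReset = "earliest"
        kgo.FetchMaxPartitionBytes(1<<20),                 // maxPartitionFetchBytes = 1.mib
        kgo.FetchMaxBytes(50<<20),                         // fetchMaxBytes = 50.mib
        // kgo.DisableAutoCommit(),                        // when autoCommitEnabled = false
    )
}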

Use Cases

  1. Basic Kafka Consumption

    • Use the common attribute to specify brokers and topics, along with a consumer group ID for basic use cases.
  2. Secure Kafka Consumption

    • Use saslAuth (saslMechanism, saslUsername, and saslPassword) to secure the Kafka connection.
  3. Optimized Data Transfer

    • Adjust maxPartitionFetchBytes and fetchMaxBytes to fine-tune data fetching and improve performance.

Notes

  • Ensure Common.Kafka is configured with valid brokers and topics.
  • Validate buffer sizes (fetchMaxBytes and maxPartitionFetchBytes) to avoid runtime errors.

Documentation


Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(
	cfg input.Kafka,
	logger *zerolog.Logger,
) (*Consumer, error)

func (*Consumer) Consume

func (c *Consumer) Consume(
	ctx context.Context,
	iterator func(record *kgo.Record) error,
) error
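
A minimal usage sketch for NewConsumer and Consume. The import paths for the input and kafka packages are placeholders (the module path is not shown on this page), and construction of cfg is elided:

package main

import (
    "context"
    "fmt"
    "os"

    "github.com/rs/zerolog"
    "github.com/twmb/franz-go/pkg/kgo"

    // Placeholder import paths; substitute the real module path.
    input "example.com/pipeline/config/input"
    kafka "example.com/pipeline/input/kafka"
)

func main() {
    logger := zerolog.New(os.Stdout)

    var cfg input.Kafka // in practice, decoded from the Pkl configuration above

    consumer, err := kafka.NewConsumer(cfg, &logger)
    if err != nil {
        logger.Fatal().Err(err).Msg("creating consumer")
    }

    // Consume blocks, invoking the iterator for each fetched record;
    // returning a non-nil error from the iterator presumably aborts consumption.
    err = consumer.Consume(context.Background(), func(record *kgo.Record) error {
        fmt.Printf("topic=%s partition=%d offset=%d\n", record.Topic, record.Partition, record.Offset)
        return nil
    })
    if err != nil {
        logger.Fatal().Err(err).Msg("consume loop ended")
    }
}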

type Kafka

type Kafka struct {
	components.Logger
	// contains filtered or unexported fields
}

func (*Kafka) Generate

func (c *Kafka) Generate(ctx context.Context, input chan<- any)

func (*Kafka) Init

func (c *Kafka) Init(cfg input.Input) error
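
Continuing the previous sketch (same placeholder imports), a host pipeline might drive the component through Init and Generate like this; zero-value construction of Kafka is an assumption:

// run sketches the component lifecycle: Init with the decoded configuration,
// then Generate pushing records into a channel for downstream stages.
func run(ctx context.Context, cfg input.Input) {
    k := &kafka.Kafka{} // assumes zero-value construction is valid
    if err := k.Init(cfg); err != nil {
        panic(err)
    }

    records := make(chan any)
    go k.Generate(ctx, records) // presumably runs until ctx is cancelled

    // Blocks for the life of the pipeline unless the channel is closed.
    for rec := range records {
        _ = rec // hand each record off to the next pipeline stage
    }
}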
