kafka

package · v1.0.0 · Published: Jun 8, 2024 · License: CC0-1.0

README

Kafka

Event Adapter for Kafka

Sender

Available config for the Kafka event sender adapter.

Config

| Config | Type | Required | Description |
|--------|------|----------|-------------|
| brokers | Array of string | Yes | List of Kafka brokers. |
| batch_size | int | No | Limit on how many messages are buffered before being sent to a partition. Default: 100 messages. |
| batch_timeout | string | No | Time limit on how often incomplete message batches are flushed to Kafka. Default: 1s. |
| key_file | string | No | Path to the key file (required to connect to Kafka brokers with TLS). |
| cert_file | string | No | Path to the certificate file (required to connect to Kafka brokers with TLS). |
| ca_cert | string | No | Path to the CA (certificate authority) file (required to connect to Kafka brokers with TLS). |
| auth_type | string | No | Authentication mechanism used to connect to the brokers with username/password. Valid values: plain, scram. Default: plain. |
| username | string | No | Username used to connect to the brokers (required for the plain and scram mechanisms). |
| password | string | No | Password used to connect to the brokers (required for the plain and scram mechanisms). |
| max_attempts | int | No | Limit on how many attempts are made to deliver a message. Default: 10. |
| balancer | string | No | Balancer used to distribute messages across partitions. See the Balancer section for the available balancers. Default: least_bytes. |
| balancer_config | map[string]interface{} | No | Additional config for the balancer. |
| print_log_level | string | No | Log level for all log messages other than error messages. Because the implementation uses the golib log library, this also follows golib's minimum log level: if you set the value higher than the minimum level, it will not be printed. See Log Level for the available levels. Default: debug. |
| error_log_level | string | No | Log level for error messages. Subject to the same golib minimum-log-level behavior as print_log_level. See Log Level for the available levels. Default: error. |
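A minimal sketch of constructing a sender from this config. The import path is a placeholder, and passing a map[string]interface{} to NewKafkaSender is an assumption based on the mapstructure tags on KafkaSender (see Types below):

```go
package main

import (
	"context"
	"log"

	kafka "example.com/yourapp/event/kafka" // placeholder import path
)

func main() {
	// Keys mirror the mapstructure tags in the table above.
	cfg := map[string]interface{}{
		"brokers":       []string{"localhost:9092"},
		"batch_size":    100,  // documented default
		"batch_timeout": "1s", // documented default
		"balancer":      "least_bytes",
	}

	sender, err := kafka.NewKafkaSender(context.Background(), cfg)
	if err != nil {
		log.Fatalf("create kafka sender: %v", err)
	}
	_ = sender // publish with sender.Send(ctx, msg), where msg is an *event.EventMessage
}
```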

Listener

Available config for the Kafka event listener adapter.

| Config | Type | Required | Description |
|--------|------|----------|-------------|
| brokers | Array of string | Yes | List of Kafka brokers. |
| key_file | string | No | Path to the key file (required to connect to Kafka brokers with TLS). |
| cert_file | string | No | Path to the certificate file (required to connect to Kafka brokers with TLS). |
| ca_cert | string | No | Path to the CA (certificate authority) file (required to connect to Kafka brokers with TLS). |
| auth_type | string | No | Authentication mechanism used to connect to the brokers with username/password. Valid values: plain, scram. Default: plain. |
| username | string | No | Username used to connect to the brokers (required for the plain and scram mechanisms). |
| password | string | No | Password used to connect to the brokers (required for the plain and scram mechanisms). |
| print_log_level | string | No | Log level for all log messages other than error messages. Because the implementation uses the golib log library, this also follows golib's minimum log level: if you set the value higher than the minimum level, it will not be printed. See Log Level for the available levels. Default: debug. |
| error_log_level | string | No | Log level for error messages. Subject to the same golib minimum-log-level behavior as print_log_level. See Log Level for the available levels. Default: error. |
| queue_capacity | int | No | Capacity of the internal message queue. Default: 100. |
| min_bytes | int | No | Indicates to the broker the minimum batch size the consumer will accept. Setting a high minimum when consuming from a low-volume topic may result in delayed delivery when the broker does not have enough data to satisfy it. Default: 1. |
| max_bytes | int | No | Indicates to the broker the maximum batch size the consumer will accept. The broker will truncate a message to satisfy this maximum, so choose a value high enough for your largest message size. Default: 1MB. |
| max_wait | string | No | Maximum amount of time to wait for new data when fetching batches of messages from Kafka. Default: 10s. |
| max_attempts | int | No | Limit on how many attempts are made before delivering the error. Default: 3. |
| read_lag_interval | string | No | Frequency at which the reader lag is updated. Setting this field to a negative value disables lag reporting. Default: 1m. |
| heartbeat_interval | string | No | Frequency at which the reader sends the consumer group heartbeat update. Default: 3s. |
| commit_interval | string | No | Interval at which offsets are committed to the broker. If 0, commits are handled synchronously. Default: 0. |
| partition_watch_interval | string | No | How often a reader checks for partition changes. If a reader sees a partition change (such as a partition add), it rebalances the group to pick up the new partitions. Default: 5s. |
| watch_partition_changes | bool | No | Informs kafka-go that a consumer group should poll the brokers and rebalance if any partition changes happen to the topic. |
| session_timeout | string | No | Length of time that may pass without a heartbeat before the coordinator considers the consumer dead and initiates a rebalance. Default: 30s. |
| rebalance_timeout | string | No | Length of time the coordinator will wait for members to join as part of a rebalance. For Kafka servers under higher load, it may be useful to set this value higher. Default: 30s. |
| join_group_backoff | string | No | Length of time to wait between attempts to re-join the consumer group after an error. Default: 5s. |
| retention_time | string | No | Length of time the consumer group will be saved by the broker. Default: 24h. |
| start_offset | int | No | Determines where the consumer group should begin consuming when it finds a partition without a committed offset. If non-zero, it must be set to either FirstOffset (-2) or LastOffset (-1). Default: FirstOffset (-2). |
| read_backoff_min | string | No | Smallest amount of time the reader will wait before polling for new messages. Default: 100ms. |
| read_backoff_max | string | No | Maximum amount of time the reader will wait before polling for new messages. Default: 1s. |
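The listener side follows the same pattern, under the same assumptions (placeholder import path, map config decoded via mapstructure). The topic and group names are illustrative, and draining messages depends on the event.Iterator interface from the event package:

```go
// Continuing the sketch above (same imports).
cfg := map[string]interface{}{
	"brokers":      []string{"localhost:9092"},
	"max_bytes":    1 << 20, // 1MB, the documented default
	"start_offset": -2,      // FirstOffset (-2): used when a partition has no committed offset
}

listener, err := kafka.NewKafkaListener(context.Background(), cfg)
if err != nil {
	log.Fatalf("create kafka listener: %v", err)
}

it, err := listener.Listen(context.Background(), "orders", "orders-group")
if err != nil {
	log.Fatalf("listen: %v", err)
}
defer it.Close()
// Drain messages with the iterator's Next method; see the event package for
// its exact signature and for committing offsets after processing.
```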

Additional Config

Balancer
| Type | Description | Additional Config |
|------|-------------|-------------------|
| hash | A Balancer that uses the provided hash function to determine which partition to route messages to, ensuring that messages with the same key are routed to the same partition. The partition is calculated as hasher.Sum32() % len(partitions). By default, hash uses the FNV-1a algorithm; this is the same algorithm used by the Sarama producer, ensuring that messages produced by kafka-go are delivered to the same partitions that the Sarama producer would deliver them to. | - |
| round_robin | A Balancer implementation that equally distributes messages across all available partitions. | - |
| crc32 | A Balancer that uses the CRC32 hash function to determine which partition to route messages to, ensuring that messages with the same key are routed to the same partition. This balancer is compatible with the built-in hash partitioners in librdkafka and the language bindings built on top of it, including the github.com/confluentinc/confluent-kafka-go Go package. With the consistent field false (default), this partitioner is equivalent to the "consistent_random" setting in librdkafka; when consistent is true, it is equivalent to the "consistent" setting, which hashes empty or nil keys into the same partition. Unless you are absolutely certain that all your messages will have keys, it is best to leave the consistent flag off; otherwise you run the risk of creating a very hot partition. | crc32 config |
| murmur2 | A Balancer that uses the Murmur2 hash function to determine which partition to route messages to, ensuring that messages with the same key are routed to the same partition. This balancer is compatible with the partitioner used by the Java library and by librdkafka's "murmur2" and "murmur2_random" partitioners. With the consistent field false (default), this partitioner is equivalent to the "murmur2_random" setting in librdkafka; when consistent is true, it is equivalent to the "murmur2" setting, which hashes nil keys into the same partition. Empty, non-nil keys are always hashed to the same partition regardless of configuration. Unless you are absolutely certain that all your messages will have keys, it is best to leave the consistent flag off; otherwise you run the risk of creating a very hot partition. Note that the librdkafka documentation states that "murmur2_random" is functionally equivalent to the default Java partitioner; that is because the Java partitioner uses a round-robin balancer instead of random on nil keys. librdkafka's implementation was chosen because it arguably has a larger install base. | murmur2 config |
| least_bytes | A Balancer implementation that routes messages to the partition that has received the least amount of data. Note that no coordination is done between multiple producers; good balancing relies on each producer using a least_bytes balancer producing well-balanced messages. | - |
crc32 Additional Config

| Config | Type | Required | Description |
|--------|------|----------|-------------|
| consistent | bool | No | When false (default), messages with an empty key are sent to a random partition; when true, messages with an empty key are hashed into the same partition. |

murmur2 Additional Config

| Config | Type | Required | Description |
|--------|------|----------|-------------|
| consistent | bool | No | When false (default), messages with an empty key are sent to a random partition; when true, messages with an empty key are hashed into the same partition. |
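For example, selecting the crc32 balancer and turning on its consistent flag through balancer_config might look like this (a sketch; the keys follow the tables above):

```go
cfg := map[string]interface{}{
	"brokers":  []string{"localhost:9092"},
	"balancer": "crc32",
	"balancer_config": map[string]interface{}{
		"consistent": true, // hash empty keys into a single partition
	},
}
```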
Log Level

| Level | Description |
|-------|-------------|
| discard | No log messages are printed. |
| debug | Only logs with level debug and above are printed. |
| info | Only logs with level info and above are printed. |
| warning | Only logs with level warning and above are printed. |
| error | Only logs with level error and above are printed. |
| fatal | Only logs with level fatal and above are printed. |
| panic | Works exactly like the fatal level. |
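For instance, to log the adapter's non-error output at info instead of the default debug while keeping error logs as-is, the two levels can be set independently (a sketch; keys follow the config tables above):

```go
cfg := map[string]interface{}{
	"brokers":         []string{"localhost:9092"},
	"print_log_level": "info",  // non-error messages logged at info instead of debug
	"error_log_level": "error", // the documented default for error logs
}
```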

Documentation

Index

Constants

const (
	KafkaPartitionKey     = attribute.Key("messaging.kafka.partition")
	KafkaConsumerGroupKey = attribute.Key("messaging.kafka.consumer_group")
)
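These constants are OpenTelemetry attribute keys. A minimal sketch of attaching them to a span with the standard go.opentelemetry.io/otel API (the tracer name, span name, and values are illustrative; the kafka import path is a placeholder, as above):

```go
import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"

	kafka "example.com/yourapp/event/kafka" // placeholder import path
)

func traceConsume(ctx context.Context) {
	_, span := otel.Tracer("kafka-adapter").Start(ctx, "kafka.consume",
		trace.WithAttributes(
			kafka.KafkaPartitionKey.Int(3),                     // partition the message was read from
			kafka.KafkaConsumerGroupKey.String("orders-group"), // consumer group performing the read
		))
	defer span.End()
	// ... handle the message ...
}
```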

Variables

This section is empty.

Functions

func NewKafkaListener

func NewKafkaListener(_ context.Context, config interface{}) (event.Listener, error)

func NewKafkaSender

func NewKafkaSender(_ context.Context, config interface{}) (event.Sender, error)

Types

type KafkaConsumeMessage

type KafkaConsumeMessage struct {
	*KafkaMessageCarrier
	// contains filtered or unexported fields
}

func (*KafkaConsumeMessage) Commit

func (k *KafkaConsumeMessage) Commit(ctx context.Context) error

func (*KafkaConsumeMessage) GetEventConsumeMessage

func (k *KafkaConsumeMessage) GetEventConsumeMessage(ctx context.Context) (*event.EventConsumeMessage, error)

type KafkaIterator

type KafkaIterator struct {
	// contains filtered or unexported fields
}

func (*KafkaIterator) Close

func (k *KafkaIterator) Close() error

func (*KafkaIterator) Next

type KafkaListener

type KafkaListener struct {
	Brokers       []string `json:"brokers" mapstructure:"brokers"`
	KeyFile       string   `json:"key_file" mapstructure:"key_file"`
	CertFile      string   `json:"cert_file" mapstructure:"cert_file"`
	CACertificate string   `json:"ca_cert" mapstructure:"ca_cert"`
	AuthType      string   `json:"auth_type" mapstructure:"auth_type"`
	Username      string   `json:"username" mapstructure:"username"`
	Password      string   `json:"password" mapstructure:"password"`

	// PrintLogLevel will set default log level for all log message other than error message
	// available level are: panic, fatal, error, warning, info, debug, discard
	// discard level will discard any of the log message received
	// default: debug
	PrintLogLevel string `json:"print_log_level" mapstructure:"print_log_level"`
	// ErrorLogLevel will set default log level for error message log
	// available level are: panic, fatal, error, warning, info, debug, discard
	// discard level will discard any of the log message received
	// default: error
	ErrorLogLevel string `json:"error_log_level" mapstructure:"error_log_level"`

	QueueCapacity          int    `json:"queue_capacity" mapstructure:"queue_capacity"`
	MinBytes               int    `json:"min_bytes" mapstructure:"min_bytes"`
	MaxBytes               int    `json:"max_bytes" mapstructure:"max_bytes"`
	MaxWait                string `json:"max_wait" mapstructure:"max_wait"`
	MaxAttempts            int    `json:"max_attempts" mapstructure:"max_attempts"`
	ReadlagInterval        string `json:"read_lag_interval" mapstructure:"read_lag_interval"`
	HeartbeatInterval      string `json:"heartbeat_interval" mapstructure:"heartbeat_interval"`
	CommitInterval         string `json:"commit_interval" mapstructure:"commit_interval"`
	PartitionWatchInterval string `json:"partition_watch_interval" mapstructure:"partition_watch_interval"`
	WatchPartitionChanges  bool   `json:"watch_partition_changes" mapstructure:"watch_partition_changes"`
	SessionTimeout         string `json:"session_timeout" mapstructure:"session_timeout"`
	RebalanceTimeout       string `json:"rebalance_timeout" mapstructure:"rebalance_timeout"`
	JoinGroupBackoff       string `json:"join_group_backoff" mapstructure:"join_group_backoff"`
	RetentionTime          string `json:"retention_time" mapstructure:"retention_time"`
	StartOffset            int64  `json:"start_offset" mapstructure:"start_offset"`
	ReadBackoffMin         string `json:"read_backoff_min" mapstructure:"read_backoff_min"`
	ReadBackoffMax         string `json:"read_backoff_max" mapstructure:"read_backoff_max"`
}

func (*KafkaListener) Listen

func (k *KafkaListener) Listen(ctx context.Context, topic, group string) (event.Iterator, error)

type KafkaMessageCarrier

type KafkaMessageCarrier struct {
	*kafka.Message
}

func (KafkaMessageCarrier) Get

func (k KafkaMessageCarrier) Get(key string) string

Get retrieves a single value for a given key.

func (KafkaMessageCarrier) Keys

func (k KafkaMessageCarrier) Keys() []string

Keys returns a slice of all key identifiers in the carrier.

func (KafkaMessageCarrier) Set

func (k KafkaMessageCarrier) Set(key, val string)

Set sets a header.

type KafkaSender

type KafkaSender struct {
	Brokers        []string               `json:"brokers" mapstructure:"brokers"`
	BatchSize      int                    `json:"batch_size" mapstructure:"batch_size"`
	BatchTimeout   string                 `json:"batch_timeout" mapstructure:"batch_timeout"`
	KeyFile        string                 `json:"key_file" mapstructure:"key_file"`
	CertFile       string                 `json:"cert_file" mapstructure:"cert_file"`
	CACertificate  string                 `json:"ca_cert" mapstructure:"ca_cert"`
	AuthType       string                 `json:"auth_type" mapstructure:"auth_type"`
	Username       string                 `json:"username" mapstructure:"username"`
	Password       string                 `json:"password" mapstructure:"password"`
	MaxAttempts    int                    `json:"max_attempts" mapstructure:"max_attempts"`
	Balancer       string                 `json:"balancer" mapstructure:"balancer"`
	BalancerConfig map[string]interface{} `json:"balancer_config" mapstructure:"balancer_config"`

	// PrintLogLevel will set default log level for all log message other than error message
	// available level are: panic, fatal, error, warning, info, debug, discard
	// discard level will discard any of the log message received
	// default: debug
	PrintLogLevel string `json:"print_log_level" mapstructure:"print_log_level"`
	// ErrorLogLevel will set default log level for error message log
	// available level are: panic, fatal, error, warning, info, debug, discard
	// discard level will discard any of the log message received
	// default: error
	ErrorLogLevel string `json:"error_log_level" mapstructure:"error_log_level"`
	// contains filtered or unexported fields
}

func (*KafkaSender) Send

func (k *KafkaSender) Send(ctx context.Context, message *event.EventMessage) error
