Package kafkaconfig

Version: v3.1.0
Published: Apr 9, 2021 · License: ISC · Imports: 7 · Imported by: 0

Documentation

Index

Constants

View Source
const (
	// AckNone means the broker does not send any response/ack to client. High
	// throughput, low latency. No durability guarantee. The producer does not
	// wait for acknowledgment from the server.
	AckNone Ack = 0

	// AckLeader means only the leader broker will need to ack the message. Medium
	// throughput, medium latency. Leader writes the record to its local log, and
	// responds without awaiting full acknowledgment from all followers.
	AckLeader = 1

	// AckAll means the broker will block until the message is committed by all
	// in-sync replicas (ISRs). Low throughput, high latency. The leader waits
	// for the full set of in-sync replicas (ISRs) to acknowledge the record.
	// This guarantees that the record is not lost as long as at least one ISR
	// is active.
	AckAll = -1
)
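
For illustration, here is a minimal sketch of choosing between these acknowledgment levels. The kafkaconfig import path and the helper name are assumptions; adjust them to wherever this package actually lives in your module:

package config

import (
	// Assumed import path for this package; adjust as needed.
	"example.com/kafkaconfig"
)

// ackFor is a hypothetical helper mapping delivery requirements to an
// acknowledgment level.
func ackFor(durable, lowLatency bool) kafkaconfig.Ack {
	switch {
	case durable:
		return kafkaconfig.AckAll // wait for all in-sync replicas (most durable)
	case lowLatency:
		return kafkaconfig.AckNone // fire-and-forget, no broker response
	default:
		return kafkaconfig.AckLeader // leader-only acknowledgment
	}
}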

Variables

ConsumerDefaults holds the default values for Consumer.

View Source
var ProducerDefaults = Producer{
	BatchMessageSize:  10000,
	CompressionCodec:  CompressionLZ4,
	Debug:             Debug{},
	HeartbeatInterval: 1 * time.Second,
	IgnoreErrors: []kafka.ErrorCode{
		kafka.ErrTransport,
		kafka.ErrAllBrokersDown,
		kafka.ErrDestroy,
		kafka.ErrFail,
		kafka.ErrResolve,
		kafka.ErrLeaderNotAvailable,
		kafka.ErrNotLeaderForPartition,
		kafka.ErrRequestTimedOut,
		kafka.ErrBrokerNotAvailable,
		kafka.ErrReplicaNotAvailable,
		kafka.ErrNetworkException,
		kafka.ErrNotEnoughReplicas,
		kafka.ErrNotEnoughReplicasAfterAppend,
		kafka.ErrUnknownMemberID,
	},
	MaxDeliveryRetries:     5,
	MaxInFlightRequests:    1000000,
	MaxQueueBufferDuration: 500 * time.Millisecond,
	MaxQueueSizeKBytes:     2097151,
	MaxQueueSizeMessages:   1000000,
	RequiredAcks:           AckAll,
	RetryBackoff:           15 * time.Second,
	SecurityProtocol:       ProtocolPlaintext,
	SessionTimeout:         30 * time.Second,
	SSL:                    SSL{},
	StatisticsInterval:     15 * time.Minute,
}

ProducerDefaults holds the default values for Producer.
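
A common pattern (sketched below with an assumed import path; the client ID and buffer duration are illustrative) is to copy ProducerDefaults and override only the fields that differ for your service:

package config

import (
	"time"

	// Assumed import path for this package; adjust as needed.
	"example.com/kafkaconfig"
)

// newProducerConfig copies the package defaults and overrides only what
// differs for this (hypothetical) service.
func newProducerConfig(brokers []string) kafkaconfig.Producer {
	cfg := kafkaconfig.ProducerDefaults // value copy of the defaults above
	cfg.Brokers = brokers
	cfg.ID = "billing-service"               // hypothetical client.id
	cfg.MaxQueueBufferDuration = time.Second // favor larger batches over latency
	return cfg
}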

View Source
var TestBrokerAddress = "127.0.0.1:9092"

TestBrokerAddress is the address used to connect to the testing broker. This defaults to 127.0.0.1:9092, but can be overwritten if desired.
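
For example, a test suite could point the test helpers at a non-default broker before running. TestMain is the standard hook for this; the alternate address below is purely illustrative:

package kafkaconfig_test

import (
	"os"
	"testing"

	// Assumed import path for this package; adjust as needed.
	"example.com/kafkaconfig"
)

func TestMain(m *testing.M) {
	// Point the test helpers at a different broker, e.g. one started on a
	// non-default port by the CI environment (address is illustrative).
	kafkaconfig.TestBrokerAddress = "127.0.0.1:9093"
	os.Exit(m.Run())
}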

Functions

This section is empty.

Types

type Ack

type Ack int

Ack is the configuration that dictates the acknowledgment behavior of the Kafka broker to this producer.

func (Ack) ConfigValue

func (a Ack) ConfigValue() kafka.ConfigValue

ConfigValue returns the kafka.ConfigValue value for the method receiver.

type Compression

type Compression string

Compression is the compression codec used to compress message sets before delivering them to the Kafka brokers.

const (
	// CompressionNone sets no message compression. Less CPU usage, more data
	// volume to transmit, slower throughput.
	CompressionNone Compression = "none"

	// CompressionGZIP sets message compression to GZIP.
	CompressionGZIP Compression = "gzip"

	// CompressionSnappy sets message compression to Snappy.
	CompressionSnappy Compression = "snappy"

	// CompressionLZ4 sets message compression to LZ4.
	CompressionLZ4 Compression = "lz4"
)

func (Compression) ConfigValue

func (c Compression) ConfigValue() kafka.ConfigValue

ConfigValue returns the kafka.ConfigValue value for the method receiver.

func (*Compression) Set

func (c *Compression) Set(value string) error

Set is used by the `envconfig` package to set the right value based on the provided environment variables.
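
Because Compression implements Set, packages such as envconfig can populate it directly from an environment variable. The sketch below (assumed import path, illustrative helper name) calls Set by hand to show the behavior:

package config

import (
	"log"

	// Assumed import path for this package; adjust as needed.
	"example.com/kafkaconfig"
)

// compressionFromEnv shows the Set behavior by calling it directly; in
// practice envconfig calls it for you when processing environment variables.
func compressionFromEnv(value string) kafkaconfig.Compression {
	var c kafkaconfig.Compression
	if err := c.Set(value); err != nil { // e.g. value = "gzip"
		log.Fatalf("invalid compression codec %q: %v", value, err)
	}
	return c
}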

type Consumer

type Consumer struct {
	// Brokers is a list of host/port pairs to use for establishing the initial
	// connection to the Kafka cluster. The client will make use of all servers
	// irrespective of which servers are specified here for bootstrapping — this
	// list only impacts the initial hosts used to discover the full set of
	// servers. Since these servers are just used for the initial connection to
	// discover the full cluster membership (which may change dynamically), this
	// list need not contain the full set of servers (you may want more than one,
	// though, in case a server is down).
	Brokers []string `kafka:"metadata.broker.list,omitempty"`

	// CommitInterval represents the frequency at which consumer offsets are
	// auto-committed to Kafka.
	CommitInterval time.Duration `kafka:"auto.commit.interval.ms,omitempty" split_words:"true"`

	// Debug allows tweaking of the default debug values.
	Debug Debug `kafka:"debug,omitempty"`

	// GroupID is a unique string that identifies the consumer group this consumer
	// belongs to. This property is required if the consumer uses either the group
	// management functionality by using subscribe(topic) or the Kafka-based
	// offset management strategy.
	GroupID string `kafka:"group.id,omitempty" split_words:"true"`

	// HeartbeatInterval represents the expected time between heartbeats to the
	// consumer coordinator when using Kafka's group management facilities.
	// Heartbeats are used to ensure that the consumer's session stays active and
	// to facilitate rebalancing when new consumers join or leave the group. The
	// value must be set lower than `SessionTimeout`, but typically should be set
	// no higher than 1/3 of that value. It can be adjusted even lower to control
	// the expected time for normal rebalances.
	HeartbeatInterval time.Duration `kafka:"heartbeat.interval.ms,omitempty" split_words:"true"`

	// ID is an id string to pass to the server when making requests. The purpose
	// of this is to be able to track the source of requests beyond just IP/port
	// by allowing a logical application name to be included in server-side
	// request logging.
	ID string `kafka:"client.id,omitempty" envconfig:"client_id"`

	// IgnoreErrors determines what Kafka related errors are considered "safe to
	// ignore". These errors will not be transmitted over the Consumer's errors
	// channel, as the underlying library will handle those errors gracefully.
	IgnoreErrors []kafka.ErrorCode

	// MaxInFlightRequests dictates the maximum number of in-flight requests per
	// broker connection. This is a generic property applied to all broker
	// communication, however it is primarily relevant to produce requests. In
	// particular, note that other mechanisms limit the number of outstanding
	// consumer fetch requests per broker to one.
	MaxInFlightRequests int `kafka:"max.in.flight.requests.per.connection,omitempty" split_words:"true"` // nolint: lll

	// MaxPollInterval determines the maximum allowed time between calls to
	// consume messages. If this interval is exceeded the consumer is
	// considered failed and the group will rebalance in order to reassign the
	// partitions to another consumer group member.
	//
	// Warning: Offset commits may not be possible at this point.
	//
	// Note: It is recommended to set `EnableAutoOffsetStore` to `false` for
	// long-running processing applications and then explicitly store offsets
	// (using offsets_store()) after message processing, to make sure offsets
	// are not auto-committed before processing has finished.
	//
	// The interval is checked two times per second.
	MaxPollInterval time.Duration `kafka:"max.poll.interval.ms,omitempty" split_words:"true"`

	// OffsetDefault sets an offset starting point from which to consume messages.
	// If this value is set to zero (0), the value is ignored, and the
	// `OffsetInitial` is used instead (see its description for more details). If
	// the value is a positive integer, the offset will be set as expected,
	// meaning that a value of `5` will set the starting offset to 5. If the
	// value is a negative integer, it is parsed as a "tail" offset, meaning
	// that a value of `-2` is interpreted as "start at the second message
	// counting back from the last known offset of the topic".
	//
	// NOTE that this value ONLY works when used in a consumer group that has no
	// offsets committed to Kafka yet. As soon as the first offset is committed
	// (per partition), this value is ignored for that consumer group, and the
	// consumer will instead continue reading from the last known offset.
	//
	// If you want to make sure that the provided offset is _always_ used as a
	// starting point, you can use this value in conjunction with
	// `streamconfig.GroupIDRandom()`.
	OffsetDefault *int64 `kafka:"-" split_words:"true"`

	// OffsetInitial dictates what to do when there is no initial offset in Kafka
	// or if the current offset does not exist any more on the server (e.g.
	// because that data has been deleted):
	//
	// * OffsetBeginning: automatically reset the offset to the earliest offset
	// * OffsetEnd: automatically reset the offset to the latest offset
	// * none: throw an exception to the consumer if no previous offset is found
	//   the consumer's group
	OffsetInitial Offset `kafka:"{topic}.auto.offset.reset,omitempty" split_words:"true"`

	// SecurityProtocol is the protocol used to communicate with brokers.
	SecurityProtocol Protocol `kafka:"security.protocol,omitempty" split_words:"true"`

	// SessionTimeout represents the timeout used to detect consumer failures when
	// using Kafka's group management facility. The consumer sends periodic
	// heartbeats to indicate its liveness to the broker. If no heartbeats are
	// received by the broker before the expiration of this session timeout, then
	// the broker will remove this consumer from the group and initiate a
	// rebalance. Note that the value must be in the allowable range as configured
	// in the broker configuration by `group.min.session.timeout.ms` and
	// `group.max.session.timeout.ms`.
	SessionTimeout time.Duration `kafka:"session.timeout.ms,omitempty" split_words:"true"`

	// SSL contains all configuration values for Kafka SSL connections. Defaults
	// to an empty struct, meaning no SSL configuration is required to connect to
	// the brokers.
	SSL SSL `kafka:"ssl,omitempty"`

	// StatisticsInterval is the time interval between statistics shared by Kafka
	// on how the client and cluster are performing.
	//
	// See: https://github.com/edenhill/librdkafka/wiki/Statistics
	//
	// If set to 0, no statistics will be produced. Defaults to 15 minutes.
	StatisticsInterval time.Duration `kafka:"statistics.interval.ms" split_words:"true"`

	// Topics is a list of topics to which to subscribe for this consumer.
	Topics []string `kafka:"-"`
}

Consumer is a value-object, containing all user-configurable configuration values that dictate how the Kafka client's consumer will behave.
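
A minimal sketch of filling in a Consumer, starting from ConsumerDefaults (the import path, group ID, and topic name are assumptions for illustration):

package config

import (
	// Assumed import path for this package; adjust as needed.
	"example.com/kafkaconfig"
)

// newConsumerConfig starts from ConsumerDefaults and fills in the fields that
// are specific to this (hypothetical) service.
func newConsumerConfig(brokers []string) kafkaconfig.Consumer {
	cfg := kafkaconfig.ConsumerDefaults // value copy of the package defaults
	cfg.Brokers = brokers
	cfg.GroupID = "billing-consumers" // hypothetical consumer group
	cfg.Topics = []string{"invoices"} // hypothetical topic
	cfg.OffsetInitial = kafkaconfig.OffsetBeginning
	return cfg
}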

func TestConsumer

func TestConsumer(tb testing.TB) Consumer

TestConsumer returns a kafkaconfig.Consumer struct with its options tweaked for testing purposes.
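
In a test this might look like the sketch below (assumed import path; the test name and topic are illustrative):

package kafkaconfig_test

import (
	"testing"

	// Assumed import path for this package; adjust as needed.
	"example.com/kafkaconfig"
)

func TestConsumeFromBroker(t *testing.T) {
	cfg := kafkaconfig.TestConsumer(t)  // configuration tweaked for tests
	cfg.Topics = []string{"test-topic"} // hypothetical test topic

	// ... build a consumer from cfg and assert against the test broker.
}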

func (*Consumer) ConfigMap

func (c *Consumer) ConfigMap() (*kafka.ConfigMap, error)

ConfigMap converts the current configuration into a format known to the rdkafka library.

type Debug

type Debug struct {
	All,

	Broker,
	CGRP,
	Consumer,
	Feature,
	Fetch,
	Generic,
	Interceptor,
	Metadata,
	Msg,
	Plugin,
	Protocol,
	Queue,
	Security,
	Topic bool
}

Debug contains all available debug configuration values. Each value defaults to `false`, but can be set to `true` as needed.

For detailed producer debugging, enable broker, topic, and msg; for consumer debugging, enable consumer, cgrp, topic, and fetch.
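
For example, enabling the contexts suggested above for detailed producer debugging might look like this sketch (assumed import path):

package config

import (
	// Assumed import path for this package; adjust as needed.
	"example.com/kafkaconfig"
)

// verboseProducerDebug enables the contexts suggested above for detailed
// producer debugging: broker, topic and msg.
func verboseProducerDebug() kafkaconfig.Debug {
	return kafkaconfig.Debug{
		Broker: true,
		Topic:  true,
		Msg:    true,
	}
}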

func (Debug) ConfigValue

func (d Debug) ConfigValue() kafka.ConfigValue

ConfigValue returns the kafka.ConfigValue value for the method receiver.

func (*Debug) Set

func (d *Debug) Set(value string) error

Set is used by the `envconfig` package to allow setting the debug configuration through environment variables.

type Offset

type Offset string

Offset is the configuration that dictates whether the consumer should start reading from the beginning or the end of a group.

const (
	// OffsetBeginning instructs the consumer to start reading from the first
	// message.
	OffsetBeginning Offset = "beginning"

	// OffsetEnd instructs the consumer to start reading from the last message.
	OffsetEnd = "end"
)
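
These constants combine with the Consumer's OffsetDefault field described above. As a sketch (assumed import path; the tail distance is illustrative), a consumer group with no committed offsets can be started a fixed number of messages back from the end:

package config

import (
	// Assumed import path for this package; adjust as needed.
	"example.com/kafkaconfig"
)

// tailConsumer sketches a consumer group without committed offsets that
// starts roughly 100 messages back from the last known offset, using the
// tail-offset semantics of OffsetDefault described above.
func tailConsumer() kafkaconfig.Consumer {
	cfg := kafkaconfig.ConsumerDefaults
	tail := int64(-100) // negative value = "tail" offset
	cfg.OffsetDefault = &tail
	return cfg
}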

func (Offset) ConfigValue

func (o Offset) ConfigValue() kafka.ConfigValue

ConfigValue returns the kafka.ConfigValue value for the method receiver.

func (*Offset) Set

func (o *Offset) Set(value string) error

Set is used by the `envconfig` package to set the right value based on the provided environment variables.

type Producer

type Producer struct {
	// BatchMessageSize sets the maximum number of messages batched in one
	// MessageSet. The total MessageSet size is also limited by message.max.bytes.
	BatchMessageSize int `kafka:"batch.num.messages,omitempty" split_words:"true"`

	// Brokers is a list of host/port pairs to use for establishing the initial
	// connection to the Kafka cluster. The client will make use of all servers
	// irrespective of which servers are specified here for bootstrapping — this
	// list only impacts the initial hosts used to discover the full set of
	// servers. Since these servers are just used for the initial connection to
	// discover the full cluster membership (which may change dynamically), this
	// list need not contain the full set of servers (you may want more than one,
	// though, in case a server is down).
	Brokers []string `kafka:"metadata.broker.list,omitempty" split_words:"true"`

	// CompressionCodec sets the compression codec to use for compressing message
	// sets. This is the default value for all topics, may be overridden by the
	// topic configuration property compression.codec. Set to `LZ4` by default.
	CompressionCodec Compression `kafka:"compression.codec,omitempty" split_words:"true"`

	// Debug allows tweaking of the default debug values.
	Debug Debug `kafka:"debug,omitempty"`

	// HeartbeatInterval represents the expected time between heartbeats to the
	// consumer coordinator when using Kafka's group management facilities.
	// Heartbeats are used to ensure that the consumer's session stays active and
	// to facilitate rebalancing when new consumers join or leave the group. The
	// value must be set lower than `SessionTimeout`, but typically should be set
	// no higher than 1/3 of that value. It can be adjusted even lower to control
	// the expected time for normal rebalances.
	HeartbeatInterval time.Duration `kafka:"heartbeat.interval.ms,omitempty" split_words:"true"`

	// ID is an id string to pass to the server when making requests. The purpose
	// of this is to be able to track the source of requests beyond just IP/port
	// by allowing a logical application name to be included in server-side
	// request logging.
	ID string `kafka:"client.id,omitempty" envconfig:"client_id"`

	// IgnoreErrors determines what Kafka related errors are considered "safe to
	// ignore". These errors will not be transmitted over the Producer's errors
	// channel, as the underlying library will handle those errors gracefully.
	IgnoreErrors []kafka.ErrorCode

	// MaxDeliveryRetries dictates how many times to retry sending a failing
	// MessageSet. Defaults to 5 retries.
	MaxDeliveryRetries int `kafka:"message.send.max.retries" split_words:"true"`

	// MaxInFlightRequests dictates the maximum number of in-flight requests per
	// broker connection. This is a generic property applied to all broker
	// communication, however it is primarily relevant to produce requests. In
	// particular, note that other mechanisms limit the number of outstanding
	// consumer fetch requests per broker to one.
	//
	// Note: having more than one in flight request may cause reordering. Use
	// `streamconfig.KafkaOrderedDelivery()` to guarantee ordered delivery.
	MaxInFlightRequests int `kafka:"max.in.flight.requests.per.connection,omitempty" split_words:"true"` // nolint: lll

	// MaxQueueBufferDuration is the delay to wait for messages in the producer
	// queue to accumulate before constructing message batches (MessageSets) to
	// transmit to brokers. A higher value allows larger and more effective (less
	// overhead, improved compression) batches of messages to accumulate at the
	// expense of increased message delivery latency. Defaults to 500 ms to
	// strike a balance between latency and throughput.
	MaxQueueBufferDuration time.Duration `kafka:"queue.buffering.max.ms,omitempty" split_words:"true"`

	// MaxQueueSizeKBytes is the maximum total message size sum allowed on the
	// producer queue. This property has higher priority than
	// MaxQueueSizeMessages.
	MaxQueueSizeKBytes int `kafka:"queue.buffering.max.kbytes,omitempty" envconfig:"max_queue_size_kbytes"` // nolint: lll

	// MaxQueueSizeMessages dictates the maximum number of messages allowed on the
	// producer queue.
	MaxQueueSizeMessages int `kafka:"queue.buffering.max.messages,omitempty" split_words:"true"`

	// RequiredAcks indicates how many acknowledgments the leader broker must
	// receive from ISR brokers before responding to the request:
	//
	// AckNone: the broker does not send any response/ack to the client.
	// AckLeader: only the leader broker needs to ack the message.
	// AckAll: the broker blocks until the message is committed by all in-sync
	// replicas (ISRs).
	//
	// Defaults to `AckAll`.
	RequiredAcks Ack `kafka:"{topic}.request.required.acks" split_words:"true"`

	// RetryBackoff sets the backoff time before retrying a protocol request.
	RetryBackoff time.Duration `kafka:"retry.backoff.ms" split_words:"true"`

	// SecurityProtocol is the protocol used to communicate with brokers.
	SecurityProtocol Protocol `kafka:"security.protocol,omitempty" split_words:"true"`

	// SessionTimeout represents the timeout used to detect consumer failures when
	// using Kafka's group management facility. The consumer sends periodic
	// heartbeats to indicate its liveness to the broker. If no heartbeats are
	// received by the broker before the expiration of this session timeout, then
	// the broker will remove this consumer from the group and initiate a
	// rebalance. Note that the value must be in the allowable range as configured
	// in the broker configuration by `group.min.session.timeout.ms` and
	// `group.max.session.timeout.ms`.
	SessionTimeout time.Duration `kafka:"session.timeout.ms,omitempty" split_words:"true"`

	// SSL contains all configuration values for Kafka SSL connections. Defaults
	// to an empty struct, meaning no SSL configuration is required to connect to
	// the brokers.
	SSL SSL `kafka:"ssl,omitempty"`

	// StatisticsInterval is the time interval between statistics shared by Kafka
	// on how the client and cluster are performing.
	//
	// See: https://github.com/edenhill/librdkafka/wiki/Statistics
	//
	// If set to 0, no statistics will be produced. Defaults to 15 minutes.
	StatisticsInterval time.Duration `kafka:"statistics.interval.ms" split_words:"true"`

	// Topic is the topic to which messages are delivered. This value is used as
	// a default if the provided message does not define a topic of its own.
	Topic string `kafka:"-"`
}

Producer is a value-object, containing all user-configurable configuration values that dictate how the Kafka client's producer will behave.

func (*Producer) ConfigMap

func (p *Producer) ConfigMap() (*kafka.ConfigMap, error)

ConfigMap converts the current configuration into a format known to the rdkafka library.
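
A sketch of wiring this into the underlying client: the kafka import below assumes the common confluent-kafka-go location for the rdkafka bindings, and the kafkaconfig import path, broker address, and topic are illustrative:

package main

import (
	"log"

	// rdkafka bindings; the import path may differ in this module.
	"github.com/confluentinc/confluent-kafka-go/kafka"

	// Assumed import path for this package; adjust as needed.
	"example.com/kafkaconfig"
)

func main() {
	cfg := kafkaconfig.ProducerDefaults
	cfg.Brokers = []string{"127.0.0.1:9092"}
	cfg.Topic = "invoices" // hypothetical default topic

	configMap, err := cfg.ConfigMap() // convert to the rdkafka representation
	if err != nil {
		log.Fatal(err)
	}

	producer, err := kafka.NewProducer(configMap)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()
}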

type Protocol

type Protocol string

Protocol is the protocol used to communicate with brokers.

const (
	// ProtocolPlaintext is the default unencrypted protocol.
	ProtocolPlaintext Protocol = "plaintext"

	// ProtocolSSL is the SSL-based protocol.
	ProtocolSSL Protocol = "ssl"

	// ProtocolSASLPlaintext is the Simple Authentication and Security Layer
	// protocol with plain text.
	ProtocolSASLPlaintext Protocol = "sasl_plaintext"

	// ProtocolSASLSSL is the Simple Authentication and Security Layer protocol
	// with SSL.
	ProtocolSASLSSL Protocol = "sasl_ssl"
)

func (Protocol) ConfigValue

func (p Protocol) ConfigValue() kafka.ConfigValue

ConfigValue returns the kafka.ConfigValue value for the method receiver.

func (*Protocol) Set

func (p *Protocol) Set(value string) error

Set is used by the `envconfig` package to set the right value based on the provided environment variables.

type SSL

type SSL struct {
	// CAPath is the file or directory path to CA certificate(s) for verifying the
	// broker's key.
	CAPath string `kafka:"ca.location,omitempty" envconfig:"ca_path"`

	// CertPath is the path to client's public key (PEM) used for authentication.
	CertPath string `kafka:"certificate.location,omitempty" envconfig:"cert_path"`

	// CRLPath is the path to CRL for verifying broker's certificate validity.
	CRLPath string `kafka:"crl.location,omitempty" envconfig:"crl_path"`

	// KeyPassword is the password of the private key in the key store file. This
	// is optional for client.
	KeyPassword string `kafka:"key.password,omitempty" split_words:"true"`

	// KeyPath is the path to client's private key (PEM) used for authentication.
	KeyPath string `kafka:"key.location,omitempty" split_words:"true"`

	// KeystorePassword is the store password for the key store file. This is
	// optional for client and only needed if `KeystorePath` is configured.
	KeystorePassword string `kafka:"keystore.password,omitempty" split_words:"true"`

	// KeystorePath is the location of the key store file. This is optional for
	// client and can be used for two-way authentication for client.
	KeystorePath string `kafka:"keystore.location,omitempty" split_words:"true"`
}

SSL contains all configuration values for Kafka SSL connections.
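
A sketch of an SSL-enabled consumer configuration; the import path, broker address, and certificate paths are all illustrative:

package config

import (
	// Assumed import path for this package; adjust as needed.
	"example.com/kafkaconfig"
)

// sslConsumer sketches an SSL-protected consumer; broker address and
// certificate paths are illustrative.
func sslConsumer() kafkaconfig.Consumer {
	cfg := kafkaconfig.ConsumerDefaults
	cfg.Brokers = []string{"broker-1.example.com:9093"}
	cfg.SecurityProtocol = kafkaconfig.ProtocolSSL // switch from the plaintext default
	cfg.SSL = kafkaconfig.SSL{
		CAPath:   "/etc/kafka/ca.pem",
		CertPath: "/etc/kafka/client.pem",
		KeyPath:  "/etc/kafka/client.key",
	}
	return cfg
}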
