Documentation ¶
Index ¶
Constants ¶
const ( // AckNone means the broker does not send any response/ack to client. High // throughput, low latency. No durability guarantee. The producer does not // wait for acknowledgment from the server. AckNone Ack = 0 // AckLeader means only the leader broker will need to ack the message. Medium // throughput, medium latency. Leader writes the record to its local log, and // responds without awaiting full acknowledgment from all followers. AckLeader = 1 // AckAll means the broker will block until message is committed by all in // sync replicas (ISRs). Low throughput, high latency. Leader waits for the // full set of in-sync replicas (ISRs) to acknowledge the record. This // guarantees that the record is not lost as long as at least one IRS is // active. AckAll = -1 )
Variables ¶
var ConsumerDefaults = Consumer{ CommitInterval: 5 * time.Second, Debug: Debug{}, HeartbeatInterval: 1 * time.Second, IgnoreErrors: []kafka.ErrorCode{ kafka.ErrTransport, kafka.ErrAllBrokersDown, kafka.ErrDestroy, kafka.ErrFail, kafka.ErrResolve, kafka.ErrLeaderNotAvailable, kafka.ErrNotLeaderForPartition, kafka.ErrRequestTimedOut, kafka.ErrBrokerNotAvailable, kafka.ErrReplicaNotAvailable, kafka.ErrNetworkException, kafka.ErrNotEnoughReplicas, kafka.ErrNotEnoughReplicasAfterAppend, kafka.ErrUnknownMemberID, kafka.ErrMaxPollExceeded, }, MaxInFlightRequests: 1000000, OffsetInitial: OffsetBeginning, SecurityProtocol: ProtocolPlaintext, SessionTimeout: 30 * time.Second, SSL: SSL{}, StatisticsInterval: 15 * time.Minute, }
ConsumerDefaults holds the default values for Consumer.
var ProducerDefaults = Producer{ BatchMessageSize: 10000, CompressionCodec: CompressionLZ4, Debug: Debug{}, HeartbeatInterval: 1 * time.Second, IgnoreErrors: []kafka.ErrorCode{ kafka.ErrTransport, kafka.ErrAllBrokersDown, kafka.ErrDestroy, kafka.ErrFail, kafka.ErrResolve, kafka.ErrLeaderNotAvailable, kafka.ErrNotLeaderForPartition, kafka.ErrRequestTimedOut, kafka.ErrBrokerNotAvailable, kafka.ErrReplicaNotAvailable, kafka.ErrNetworkException, kafka.ErrNotEnoughReplicas, kafka.ErrNotEnoughReplicasAfterAppend, kafka.ErrUnknownMemberID, }, MaxDeliveryRetries: 5, MaxInFlightRequests: 1000000, MaxQueueBufferDuration: 500 * time.Millisecond, MaxQueueSizeKBytes: 2097151, MaxQueueSizeMessages: 1000000, RequiredAcks: AckAll, RetryBackoff: 15 * time.Second, SecurityProtocol: ProtocolPlaintext, SessionTimeout: 30 * time.Second, SSL: SSL{}, StatisticsInterval: 15 * time.Minute, }
ProducerDefaults holds the default values for Producer.
var TestBrokerAddress = "127.0.0.1:9092"
TestBrokerAddress is the address used to connect to the testing broker. This defaults to 127.0.0.1:9092, but can be overwritten if desired.
Functions ¶
This section is empty.
Types ¶
type Ack ¶
type Ack int
Ack is the configuration that dictates the acknowledgment behavior of the Kafka broker to this producer.
func (Ack) ConfigValue ¶
func (a Ack) ConfigValue() kafka.ConfigValue
ConfigValue returns the kafka.ConfigValue value for the method receiver.
type Compression ¶
type Compression string
Compression is the compression codec used to compress message sets before delivering them to the Kafka brokers.
const ( // CompressionNone sets no message compression. Less CPU usage, more data // volume to transmit, slower throughput. CompressionNone Compression = "none" // CompressionGZIP sets message compression to GZIP. CompressionGZIP Compression = "gzip" // CompressionSnappy sets message compression to Snappy. CompressionSnappy Compression = "snappy" // CompressionLZ4 sets message compression to LZ4. CompressionLZ4 Compression = "lz4" )
func (Compression) ConfigValue ¶
func (c Compression) ConfigValue() kafka.ConfigValue
ConfigValue returns the kafka.ConfigValue value for the method receiver.
func (*Compression) Set ¶
func (c *Compression) Set(value string) error
Set is used by the `envconfig` package to set the right value based off of the provided environment variables.
type Consumer ¶
type Consumer struct { // Brokers is a list of host/port pairs to use for establishing the initial // connection to the Kafka cluster. The client will make use of all servers // irrespective of which servers are specified here for bootstrapping — this // list only impacts the initial hosts used to discover the full set of // servers. Since these servers are just used for the initial connection to // discover the full cluster membership (which may change dynamically), this // list need not contain the full set of servers (you may want more than one, // though, in case a server is down). Brokers []string `kafka:"metadata.broker.list,omitempty"` // CommitInterval represents the frequency in milliseconds that the // consumer offsets are auto-committed to Kafka. CommitInterval time.Duration `kafka:"auto.commit.interval.ms,omitempty" split_words:"true"` // Debug allows tweaking of the default debug values. Debug Debug `kafka:"debug,omitempty"` // GroupID is a unique string that identifies the consumer group this consumer // belongs to. This property is required if the consumer uses either the group // management functionality by using subscribe(topic) or the Kafka-based // offset management strategy. GroupID string `kafka:"group.id,omitempty" split_words:"true"` // HeartbeatInterval represents The expected time between heartbeats to the // consumer coordinator when using Kafka's group management facilities. // Heartbeats are used to ensure that the consumer's session stays active and // to facilitate rebalancing when new consumers join or leave the group. The // value must be set lower than `SessionTimeout`, but typically should be set // no higher than 1/3 of that value. It can be adjusted even lower to control // the expected time for normal rebalances. HeartbeatInterval time.Duration `kafka:"heartbeat.interval.ms,omitempty" split_words:"true"` // ID is an id string to pass to the server when making requests. The purpose // of this is to be able to track the source of requests beyond just IP/port // by allowing a logical application name to be included in server-side // request logging. ID string `kafka:"client.id,omitempty" envconfig:"client_id"` // IgnoreErrors determines what Kafka related errors are considered "safe to // ignore". These errors will not be transmitted over the Consumer's errors // channel, as the underlying library will handle those errors gracefully. IgnoreErrors []kafka.ErrorCode // MaxInFlightRequests dictates the maximum number of in-flight requests per // broker connection. This is a generic property applied to all broker // communication, however it is primarily relevant to produce requests. In // particular, note that other mechanisms limit the number of outstanding // consumer fetch request per broker to one. MaxInFlightRequests int `kafka:"max.in.flight.requests.per.connection,omitempty" split_words:"true"` // nolint: lll // MaxPollInterval determines the maximum allowed time between calls to // consume messages. If this interval is exceeded the consumer is // considered failed and the group will rebalance in order to reassign the // partitions to another consumer group member. // // Warning: Offset commits may be not possible at this point. // // Note: It is recommended to set `EnableAutoOffsetStore` to `false` for // long-time processing applications and then explicitly store offsets // (using offsets_store()) after message processing, to make sure offsets // are not auto-committed prior to processing has finished. // // The interval is checked two times per second. MaxPollInterval time.Duration `kafka:"max.poll.interval.ms,omitempty" split_words:"true"` // OffsetDefault sets an offset starting point from which to consume messages. // If this value is set to zero (0), the value is ignored, and the // `OffsetInitial` is used instead (see its description for more details). If // the value is a positive integer, the offset will be set as expected, // meaning that a value of `5` will set the starting offset to 5. If we // provide a negative integer, we parse the value as a "tail" offset, meaning // that an integer of `-2` will be considered as "get the second message from // the last known offset of the topic". // // NOTE that this value ONLY works when used in a consumer group that has no // offsets committed to Kafka yet. As soon as the first offset is committed // (per partition), this value is ignored for that consumer group, and the // consumer will instead continue reading from the last known offset. // // If you want to make sure that the provided offset is _always_ used as a // starting point, you can use this value in conjunction with // `streamconfig.GroupIDRandom()`. OffsetDefault *int64 `kafka:"-" split_words:"true"` // OffsetInitial dictates what to do when there is no initial offset in Kafka // or if the current offset does not exist any more on the server (e.g. // because that data has been deleted): // // * OffsetBeginning: automatically reset the offset to the earliest offset // * OffsetEnd: automatically reset the offset to the latest offset // * none: throw exception to the consumer if no previous offset is found for // the consumer's group OffsetInitial Offset `kafka:"{topic}.auto.offset.reset,omitempty" split_words:"true"` // SecurityProtocol is the protocol used to communicate with brokers. SecurityProtocol Protocol `kafka:"security.protocol,omitempty" split_words:"true"` // SessionTimeout represents the timeout used to detect consumer failures when // using Kafka's group management facility. The consumer sends periodic // heartbeats to indicate its liveness to the broker. If no heartbeats are // received by the broker before the expiration of this session timeout, then // the broker will remove this consumer from the group and initiate a // rebalance. Note that the value must be in the allowable range as configured // in the broker configuration by `group.min.session.timeout.ms` and // `group.max.session.timeout.ms`. SessionTimeout time.Duration `kafka:"session.timeout.ms,omitempty" split_words:"true"` // SSL contains all configuration values for Kafka SSL connections. Defaults // to an empty struct, meaning no SSL configuration is required to connect to // the brokers. SSL SSL `kafka:"ssl,omitempty"` // StatisticsInterval is the time interval between statistics shared by Kafka // on how the client and cluster is performing. // // See: https://github.com/edenhill/librdkafka/wiki/Statistics // // If set to 0, no statistics will be produced. Defaults to 15 minutes. StatisticsInterval time.Duration `kafka:"statistics.interval.ms" split_words:"true"` // Topics is a list of topics to which to subscribe for this consumer. Topics []string `kafka:"-"` }
Consumer is a value-object, containing all user-configurable configuration values that dictate how the Kafka client's consumer will behave.
func TestConsumer ¶
TestConsumer returns a kafkaconfig.Consumer struct with its options tweaked for testing purposes.
type Debug ¶
type Debug struct { All, Broker, CGRP, Consumer, Feature, Fetch, Generic, Interceptor, Metadata, Msg, Plugin, Protocol, Queue, Security, Topic bool }
Debug contains all available debug configuration values. Each value defaults to `false`, but can be set to `true` accordingly.
Detailed Producer debugging: broker,topic,msg. Consumer: consumer,cgrp,topic,fetch.
func (Debug) ConfigValue ¶
func (d Debug) ConfigValue() kafka.ConfigValue
ConfigValue returns the kafka.ConfigValue value for the method receiver.
type Offset ¶
type Offset string
Offset is the configuration that dictates whether the consumer should start reading from the beginning, or the end of a group.
const ( // OffsetBeginning instructs the consumer to start reading from the first // message. OffsetBeginning Offset = "beginning" // OffsetEnd instructs the consumer to start reading from the last message. OffsetEnd = "end" )
func (Offset) ConfigValue ¶
func (o Offset) ConfigValue() kafka.ConfigValue
ConfigValue returns the kafka.ConfigValue value for the method receiver.
type Producer ¶
type Producer struct { // BatchMessageSize sets the maximum number of messages batched in one // MessageSet. The total MessageSet size is also limited by message.max.bytes. BatchMessageSize int `kafka:"batch.num.messages,omitempty" split_words:"true"` // Brokers is a list of host/port pairs to use for establishing the initial // connection to the Kafka cluster. The client will make use of all servers // irrespective of which servers are specified here for bootstrapping — this // list only impacts the initial hosts used to discover the full set of // servers. Since these servers are just used for the initial connection to // discover the full cluster membership (which may change dynamically), this // list need not contain the full set of servers (you may want more than one, // though, in case a server is down). Brokers []string `kafka:"metadata.broker.list,omitempty" split_words:"true"` // CompressionCodec sets the compression codec to use for compressing message // sets. This is the default value for all topics, may be overridden by the // topic configuration property compression.codec. Set to `LZ4` by default. CompressionCodec Compression `kafka:"compression.codec,omitempty" split_words:"true"` // Debug allows tweaking of the default debug values. Debug Debug `kafka:"debug,omitempty"` // HeartbeatInterval represents The expected time between heartbeats to the // consumer coordinator when using Kafka's group management facilities. // Heartbeats are used to ensure that the consumer's session stays active and // to facilitate rebalancing when new consumers join or leave the group. The // value must be set lower than `SessionTimeout`, but typically should be set // no higher than 1/3 of that value. It can be adjusted even lower to control // the expected time for normal rebalances. HeartbeatInterval time.Duration `kafka:"heartbeat.interval.ms,omitempty" split_words:"true"` // ID is an id string to pass to the server when making requests. The purpose // of this is to be able to track the source of requests beyond just IP/port // by allowing a logical application name to be included in server-side // request logging. ID string `kafka:"client.id,omitempty" envconfig:"client_id"` // IgnoreErrors determines what Kafka related errors are considered "safe to // ignore". These errors will not be transmitted over the Producer's errors // channel, as the underlying library will handle those errors gracefully. IgnoreErrors []kafka.ErrorCode // MaxDeliveryRetries dictates how many times to retry sending a failing // MessageSet. Defaults to 5 retries. MaxDeliveryRetries int `kafka:"message.send.max.retries" split_words:"true"` // MaxInFlightRequests dictates the maximum number of in-flight requests per // broker connection. This is a generic property applied to all broker // communication, however it is primarily relevant to produce requests. In // particular, note that other mechanisms limit the number of outstanding // consumer fetch request per broker to one. // // Note: having more than one in flight request may cause reordering. Use // `streamconfig.KafkaOrderedDelivery()` to guarantee order delivery. MaxInFlightRequests int `kafka:"max.in.flight.requests.per.connection,omitempty" split_words:"true"` // nolint: lll // MaxQueueBufferDuration is the delay to wait for messages in the producer // queue to accumulate before constructing message batches (MessageSets) to // transmit to brokers. A higher value allows larger and more effective (less // overhead, improved compression) batches of messages to accumulate at the // expense of increased message delivery latency. Defaults to 500 ms to // strike a balance between latency and throughput. MaxQueueBufferDuration time.Duration `kafka:"queue.buffering.max.ms,omitempty" split_words:"true"` // MaxQueueSizeKBytes is the maximum total message size sum allowed on the // producer queue. This property has higher priority than // MaxQueueSizeMessages. MaxQueueSizeKBytes int `kafka:"queue.buffering.max.kbytes,omitempty" envconfig:"max_queue_size_kbytes"` // nolint: lll // MaxQueueSizeMessages dictates the maximum number of messages allowed on the // producer queue. MaxQueueSizeMessages int `kafka:"queue.buffering.max.messages,omitempty" split_words:"true"` // RequiredAcks indicates how many acknowledgments the leader broker must // receive from ISR brokers before responding to the request: // // AckNone: Broker does not send any response/ack to client // AckLeader: Only the leader broker will need to ack the message, // AckAll: broker will block until message is committed by all in sync // replicas (ISRs). // // Defaults to `AckAll`. RequiredAcks Ack `kafka:"{topic}.request.required.acks" split_words:"true"` // RetryBackoff sets the backoff time before retrying a protocol request. RetryBackoff time.Duration `kafka:"retry.backoff.ms" split_words:"true"` // SecurityProtocol is the protocol used to communicate with brokers. SecurityProtocol Protocol `kafka:"security.protocol,omitempty" split_words:"true"` // SessionTimeout represents the timeout used to detect consumer failures when // using Kafka's group management facility. The consumer sends periodic // heartbeats to indicate its liveness to the broker. If no heartbeats are // received by the broker before the expiration of this session timeout, then // the broker will remove this consumer from the group and initiate a // rebalance. Note that the value must be in the allowable range as configured // in the broker configuration by `group.min.session.timeout.ms` and // `group.max.session.timeout.ms`. SessionTimeout time.Duration `kafka:"session.timeout.ms,omitempty" split_words:"true"` // SSL contains all configuration values for Kafka SSL connections. Defaults // to an empty struct, meaning no SSL configuration is required to connect to // the brokers. SSL SSL `kafka:"ssl,omitempty"` // StatisticsInterval is the time interval between statistics shared by Kafka // on how the client and cluster is performing. // // See: https://github.com/edenhill/librdkafka/wiki/Statistics // // If set to 0, no statistics will be produced. Defaults to 15 minutes. StatisticsInterval time.Duration `kafka:"statistics.interval.ms" split_words:"true"` // Topic is the topic used to deliver messages to. This value is used as a // default value, if the provided message does not define a topic of its own. Topic string `kafka:"-"` }
Producer is a value-object, containing all user-configurable configuration values that dictate how the Kafka client's producer will behave.
type Protocol ¶
type Protocol string
Protocol is the protocol used to communicate with brokers.
const ( // ProtocolPlaintext is the default unencrypted protocol. ProtocolPlaintext Protocol = "plaintext" // ProtocolSSL is the SSL-based protocol. ProtocolSSL Protocol = "ssl" // ProtocolSASLPlaintext is the Simple Authentication and Security Layer // protocol with plain text. ProtocolSASLPlaintext Protocol = "sasl_plaintext" // ProtocolSASLSSL is the Simple Authentication and Security Layer protocol // with SSL. ProtocolSASLSSL Protocol = "sasl_ssl" )
func (Protocol) ConfigValue ¶
func (p Protocol) ConfigValue() kafka.ConfigValue
ConfigValue returns the kafka.ConfigValue value for the method receiver.
type SSL ¶
type SSL struct { // CAPath is the file or directory path to CA certificate(s) for verifying the // broker's key. CAPath string `kafka:"ca.location,omitempty" envconfig:"ca_path"` // CertPath is the path to client's public key (PEM) used for authentication. CertPath string `kafka:"certificate.location,omitempty" envconfig:"cert_path"` // CRLPath is the path to CRL for verifying broker's certificate validity. CRLPath string `kafka:"crl.location,omitempty" envconfig:"crl_path"` // KeyPassword is the password of the private key in the key store file. This // is optional for client. KeyPassword string `kafka:"key.password,omitempty" split_words:"true"` // KeyPath is the path to client's private key (PEM) used for authentication. KeyPath string `kafka:"key.location,omitempty" split_words:"true"` // KeystorePassword is the store password for the key store file. This is // optional for client and only needed if `KeystorePath` is configured. KeystorePassword string `kafka:"keystore.password,omitempty" split_words:"true"` // KeystorePath is the location of the key store file. This is optional for // client and can be used for two-way authentication for client. KeystorePath string `kafka:"keystore.location,omitempty" split_words:"true"` }
SSL contains all configuration values for Kafka SSL connections.