Documentation ¶
Overview ¶
Package kafka implements the iterator and publisher interfaces for Kafka.
Index ¶
- Variables
- type BatchConsumerSettings
- type BatchIterator
- type ConsumerConfig
- type ConsumerSettings
- type Iterator
- type KerberosConfig
- type PlainSASLConfig
- type ProducerConfig
- type ProducerSettings
- type PrometheusConfig
- type Publisher
- type ScramSASLConfig
- type Topic
- type Transaction
- func (t *Transaction) Abort(ctx context.Context) (bool, error)
- func (t *Transaction) Begin() error
- func (t *Transaction) Client() *kgo.Client
- func (t *Transaction) Close()
- func (t *Transaction) Commit(ctx context.Context) (bool, error)
- func (t *Transaction) NextBatch(ctx context.Context) ([]broker.Record, error)
- func (t *Transaction) Topic(topicID string) (broker.Topic, error)
- type TransactionalTopic
Constants ¶
This section is empty.
Variables ¶
var DefaultBatchConsumerSettings = BatchConsumerSettings{
    ConsumerSettings: DefaultConsumerSettings,
    MaxPollRecords:   100,
}
DefaultBatchConsumerSettings stores the default values for BatchConsumerSettings.
var DefaultConsumerSettings = ConsumerSettings{
    MinBytes:             100,
    MaxWait:              5 * time.Second,
    MaxBytes:             10 * 1024 * 1024,
    MaxConcurrentFetches: 3,
}
DefaultConsumerSettings stores the default values for ConsumerSettings.
var DefaultProducerSettings = ProducerSettings{
    BatchSize:  40,
    BatchBytes: 5 * 1024 * 1024,
    Linger:     10 * time.Millisecond,
    Acks:       kgo.AllISRAcks(),
}
DefaultProducerSettings stores the default values for ProducerSettings.
Functions ¶
This section is empty.
Types ¶
type BatchConsumerSettings ¶
type BatchConsumerSettings struct {
    ConsumerSettings

    // MaxPollRecords is the maximum number of records that a single call to poll()
    // will return. Use this to control the amount of data (but not the size of data) your
    // application will need to process in one iteration.
    //
    // Keep in mind that this is only the maximum number of records; there's no guarantee
    // the BatchIterator will return MaxPollRecords even if the state of the topic the iterator
    // consumes from allows it.
    MaxPollRecords int
}
BatchConsumerSettings holds the optional settings for a batched Kafka consumer.
These settings are fully optional and are preset to sane defaults.
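For illustration, a typical pattern is to start from the package defaults and override only the poll cap (the value 500 is arbitrary):

batchSettings := DefaultBatchConsumerSettings
batchSettings.MaxPollRecords = 500 // cap each poll at 500 records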
type BatchIterator ¶
type BatchIterator struct {
// contains filtered or unexported fields
}
BatchIterator models a Kafka consumer that iterates over batches of records.
func NewBatchIterator ¶
func NewBatchIterator(ctx context.Context, config ConsumerConfig, settings BatchConsumerSettings) (*BatchIterator, error)
NewBatchIterator returns a new instance of BatchIterator, configured from the provided ConsumerConfig and BatchConsumerSettings.
func (*BatchIterator) Close ¶
func (it *BatchIterator) Close()
Close closes the connection to the Kafka cluster.
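A construction-and-teardown sketch; the broker address, group ID and topic below are placeholders:

ctx := context.Background()

it, err := NewBatchIterator(ctx, ConsumerConfig{
    BrokerAddr: "localhost:9092", // placeholder broker address
    GroupID:    "example-group",  // placeholder consumer group
    Topic:      "example-topic",  // placeholder topic
}, DefaultBatchConsumerSettings)
if err != nil {
    log.Fatal(err)
}
defer it.Close() // closes the connection to the Kafka cluster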
type ConsumerConfig ¶
type ConsumerConfig struct {
    // BrokerAddr is a comma-separated list of at least one broker which is a member of the target cluster.
    //
    // After establishing the initial connection, the listed broker(s) will provide the
    // consumer with information about the remaining brokers of the cluster.
    //
    // It's recommended that this list contains as many members of the target Kafka cluster as possible,
    // to ensure the initial connection doesn't fail due to the listed broker(s) being temporarily unavailable.
    BrokerAddr string

    // GroupID is the name of the consumer group of this consumer.
    GroupID string

    // Topic is the target topic of this consumer.
    //
    // Multiple topics can be provided by separating the topic names with a comma.
    Topic string

    // TLS is the TLS configuration.
    //
    // If nil, the consumer will not use TLS.
    TLS *tls.Config

    // InsecureSkipVerify controls whether a client verifies the server's
    // certificate chain and host name. If InsecureSkipVerify is true, crypto/tls
    // accepts any certificate presented by the server and any host name in that
    // certificate. In this mode, TLS is susceptible to machine-in-the-middle
    // attacks unless custom verification is used. This should be used only for testing.
    InsecureSkipVerify bool

    // Kerberos is the SASL Kerberos configuration.
    //
    // If nil, the consumer will not use Kerberos.
    Kerberos *KerberosConfig

    // PlainSASL is the SASL/PLAIN configuration.
    //
    // If nil, the consumer will not use SASL/PLAIN.
    PlainSASL *PlainSASLConfig

    // ScramSASL is the SASL/SCRAM-SHA-512 configuration.
    //
    // If nil, the consumer will not use SASL/SCRAM-SHA-512.
    ScramSASL *ScramSASLConfig

    // Prometheus is the Prometheus configuration.
    //
    // If nil, the consumer will not use Prometheus metrics.
    Prometheus *PrometheusConfig
}
ConsumerConfig defines the configuration properties needed for initializing a Kafka consumer.
All fields of the struct need to be set in order to successfully initialize the consumer.
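For illustration, a plaintext configuration with the optional pointer fields left nil; all values are placeholders:

cfg := ConsumerConfig{
    BrokerAddr: "broker-1:9092,broker-2:9092", // placeholder comma-separated brokers
    GroupID:    "billing-consumers",           // placeholder consumer group
    Topic:      "invoices,receipts",           // placeholder comma-separated topics
    // TLS, Kerberos, PlainSASL, ScramSASL and Prometheus are left nil,
    // so the consumer connects without TLS, SASL or metrics.
}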
type ConsumerSettings ¶
type ConsumerSettings struct {
    // MinBytes is the minimum amount of data that a consumer wants to receive from the broker when fetching records.
    // If a broker receives a request for records from a consumer but the new records amount to fewer bytes than MinBytes,
    // the broker will wait until more messages are available before sending the records back to the consumer, reducing
    // the load on both the consumer and the broker, as they have to handle fewer back-and-forth messages
    // in cases where the topics don't have much new activity.
    //
    // Keep in mind that increasing this value can increase latency for low-throughput cases.
    //
    // This is equivalent to the fetch.min.bytes setting of the Java client.
    MinBytes int

    // MaxWait defines how long Kafka will wait for at least MinBytes-worth of new records to appear, before
    // sending them to the consumer.
    //
    // If you set MaxWait to 100 ms and MinBytes to 1 MB, Kafka will receive a fetch request from the consumer and
    // will respond with data either when it has 1 MB of data to return or after 100 ms, whichever happens first.
    //
    // This is equivalent to the fetch.max.wait.ms setting of the Java client.
    MaxWait time.Duration

    // MaxBytes is the maximum amount of bytes Kafka will return whenever the consumer polls a broker.
    // It is used to limit the size of memory that the consumer will use to store data that was returned from the server,
    // irrespective of how many partitions or messages were returned.
    //
    // This is equivalent to the fetch.max.bytes setting of the Java client.
    MaxBytes int

    // MaxConcurrentFetches sets the maximum number of fetch requests to allow in flight or buffered at once,
    // overriding the unbounded (i.e. number of brokers) default.
    //
    // This setting, paired with MaxBytes, can upper bound the maximum amount of memory that the client can use for consuming.
    // Requests are issued to brokers in a FIFO order: once the client is ready to issue a request to a broker,
    // it registers that request and issues it in order with other registrations.
    //
    // A value of 0 implies the allowed concurrency is unbounded and will be limited only by the number of brokers in the cluster.
    MaxConcurrentFetches int

    // Transactional determines if the consumer consumes messages that a transactional producer could be committing,
    // which enables an exactly-once delivery guarantee.
    Transactional bool
}
ConsumerSettings holds the optional settings for a Kafka consumer.
These settings are fully optional and are preset to sane defaults.
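As a sketch of the MinBytes/MaxWait trade-off described above, a latency-leaning override of the defaults (values are illustrative):

fetchSettings := DefaultConsumerSettings
fetchSettings.MinBytes = 1                     // respond as soon as any data is available...
fetchSettings.MaxWait = 100 * time.Millisecond // ...or after 100 ms, whichever comes first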
type Iterator ¶
type Iterator struct {
// contains filtered or unexported fields
}
Iterator models a simple Kafka consumer, which yields a single message at a time.
func NewIterator ¶
func NewIterator(ctx context.Context, config ConsumerConfig, settings ConsumerSettings) (*Iterator, error)
NewIterator returns a new instance of Iterator, configured from the provided ConsumerConfig and ConsumerSettings.
func (*Iterator) Batched ¶
func (it *Iterator) Batched(maxPollRecords int) *BatchIterator
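A minimal usage sketch, assuming Batched converts this Iterator into a BatchIterator capped at maxPollRecords records per poll (ctx and cfg are reused from the sketches above):

it, err := NewIterator(ctx, cfg, DefaultConsumerSettings)
if err != nil {
    log.Fatal(err)
}
batched := it.Batched(100) // presumably yields at most 100 records per poll
defer batched.Close()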
type KerberosConfig ¶
type KerberosConfig struct {
    // KeyTabPath is the path to the keytab file.
    //
    // A keytab is a file containing pairs of Kerberos principals and encrypted keys that are derived from
    // the Kerberos password. You can use this file to log on to Kerberos without being prompted for a password.
    // One of the ways to get the keytab and config files can be found here:
    // https://syntio.atlassian.net/wiki/spaces/SJ/pages/2105475207/Kerberized+Janitor#Getting-the-Kerberos-files
    KeyTabPath string

    // ConfigPath is the krb5.conf path.
    //
    // The krb5.conf file contains Kerberos configuration information, including the locations of KDCs and
    // administration daemons for the Kerberos realms of interest, defaults for the current realm and for Kerberos
    // applications, and mappings of host names onto Kerberos realms. This file must reside on all Kerberos clients.
    ConfigPath string

    // Realm is the domain over which a Kerberos authentication server has the authority to
    // authenticate a user, host or service. A realm name is often, but not always, the upper-case version
    // of the name of the DNS domain over which it presides.
    Realm string

    // Service is the service name we will get a ticket for.
    Service string

    // Username of the service principal.
    Username string
}
KerberosConfig defines the configuration properties needed for using SASL Kerberos authentication.
All fields of the struct need to be set in order to successfully initialize Kerberos authentication. Currently, only authentication via a keytab and a krb5.conf file is supported.
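A sketch of a complete KerberosConfig; every path and name below is a placeholder:

krb := &KerberosConfig{
    KeyTabPath: "/etc/security/kafka.keytab", // placeholder keytab path
    ConfigPath: "/etc/krb5.conf",             // placeholder krb5.conf path
    Realm:      "EXAMPLE.COM",                // often the upper-case DNS domain
    Service:    "kafka",                      // the service to get a ticket for
    Username:   "consumer-principal",         // placeholder service principal username
}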
type PlainSASLConfig ¶
type PlainSASLConfig struct {
    // Zid is an optional authorization ID to use in authenticating.
    Zid string

    // User is the username to use for authentication.
    User string

    // Pass is the password to use for authentication.
    Pass string
}
PlainSASLConfig defines the configuration properties needed for using SASL/PLAIN authentication.
The User and Pass fields need to be set in order to successfully initialize SASL/PLAIN authentication.
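The minimal required configuration, with the optional Zid left empty; the credentials are placeholders:

plain := &PlainSASLConfig{
    User: "svc-user",     // placeholder username
    Pass: "svc-password", // placeholder password
}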
type ProducerConfig ¶
type ProducerConfig struct {
    // BrokerAddr is a list of at least one broker which is a member of the target cluster.
    //
    // After establishing the initial connection, the listed broker(s) will provide the
    // producer with information about the remaining brokers of the cluster.
    //
    // It's recommended that this list contains as many members of the target Kafka cluster as possible,
    // to ensure the initial connection doesn't fail due to the listed broker(s) being temporarily unavailable.
    BrokerAddr string

    // TLS is the TLS configuration.
    //
    // If nil, the producer will not use TLS.
    TLS *tls.Config

    // InsecureSkipVerify controls whether a client verifies the server's
    // certificate chain and host name. If InsecureSkipVerify is true, crypto/tls
    // accepts any certificate presented by the server and any host name in that
    // certificate. In this mode, TLS is susceptible to machine-in-the-middle
    // attacks unless custom verification is used. This should be used only for testing.
    InsecureSkipVerify bool

    // Kerberos is the SASL Kerberos configuration.
    //
    // If nil, the producer will not use Kerberos.
    Kerberos *KerberosConfig

    // PlainSASL is the SASL/PLAIN configuration.
    //
    // If nil, the producer will not use SASL/PLAIN.
    PlainSASL *PlainSASLConfig

    // Prometheus is the Prometheus configuration.
    //
    // If nil, the producer will not use Prometheus metrics.
    Prometheus *PrometheusConfig

    // DisableCompression specifies whether message compression is disabled.
    //
    // If true, compression is disabled.
    DisableCompression bool

    // TransactionalID is used as the transactional ID if the producer is set to be a part of
    // a transaction.
    TransactionalID string
}
ProducerConfig defines the configuration properties needed for initializing a Kafka producer.
All fields of the struct need to be set in order to successfully initialize the producer.
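For illustration, a plaintext, non-transactional configuration; the addresses are placeholders:

prodCfg := ProducerConfig{
    BrokerAddr: "broker-1:9092,broker-2:9092", // placeholder brokers
    // TLS, Kerberos, PlainSASL and Prometheus are left nil, DisableCompression
    // defaults to false (compression stays enabled), and TransactionalID is left
    // empty because this producer is not part of a transaction.
}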
type ProducerSettings ¶
type ProducerSettings struct {
    // BatchSize sets the max amount of records the client will buffer,
    // blocking new produces until records are finished if this limit is reached.
    BatchSize int

    // BatchBytes controls batching memory. When multiple records are sent to the same partition,
    // the producer will batch them together; BatchBytes controls the amount of memory in bytes that will be used for each batch.
    //
    // BatchBytes does not mean that the producer will wait for the batch to
    // become full. The producer will send half-full batches and even batches with just a single message in them.
    // Therefore, setting the batch size too large will not cause delays in sending messages; it will just use more memory for the batches.
    //
    // This is equivalent to the batch.size setting of the Java client.
    BatchBytes int64

    // Linger controls the amount of time to wait for additional messages before sending the current batch.
    // The producer sends a batch of messages either when the current batch is full or when the Linger limit is reached,
    // whichever comes first.
    //
    // Linger is specific to a topic partition.
    // A high volume producer will likely be producing to many partitions;
    // it is both unnecessary to linger in this case and inefficient because the client will have many timers running
    // (and stopping and restarting) unnecessarily.
    //
    // This is equivalent to the linger.ms setting of the Java client.
    Linger time.Duration

    // Acks represents the number of acks a broker leader must have before
    // a produce request is considered complete.
    //
    // This controls the durability of written records and corresponds to "acks" in
    // Kafka's Producer Configuration documentation. Allowed values are NoAck, LeaderAck and AllISRAcks.
    // Setting Acks to a value other than AllISRAcks implicitly disables the producer's idempotency option.
    //
    // The default is LeaderAck.
    Acks kgo.Acks
}
ProducerSettings holds the optional settings for a Kafka producer.
These settings are fully optional and are preset to sane defaults.
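A sketch that starts from the defaults and tunes batching; the values are illustrative, and note that any Acks value other than kgo.AllISRAcks() would implicitly disable idempotency:

prodSettings := DefaultProducerSettings
prodSettings.Linger = 50 * time.Millisecond // wait longer for fuller batches (illustrative)
prodSettings.Acks = kgo.AllISRAcks()        // maximum durability; keeps idempotency enabled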
type PrometheusConfig ¶
type PrometheusConfig struct {
    // Namespace is a prefix relevant to the domain the metric belongs to.
    Namespace string

    // Registerer is the Prometheus interface for the part of a registry
    // in charge of registering and unregistering metrics.
    // Users of custom registries should use Registerer as the type for registration purposes,
    // rather than the Registry type directly.
    Registerer prometheus.Registerer

    // Gatherer is the Prometheus interface for the part of a registry in charge of
    // gathering the collected metrics.
    // As with Registerer, users should use Gatherer as the type for gathering purposes,
    // rather than the Registry type directly.
    Gatherer prometheus.Gatherer
}
PrometheusConfig defines the configuration properties needed for exposing Prometheus metrics.
All fields of the struct need to be set in order to successfully initialize metrics.
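A sketch using a dedicated registry; a *prometheus.Registry satisfies both the Registerer and Gatherer interfaces, and the namespace is a placeholder:

reg := prometheus.NewRegistry()
prom := &PrometheusConfig{
    Namespace:  "billing", // placeholder metric prefix
    Registerer: reg,
    Gatherer:   reg,
}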
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher models a Kafka producer.
func NewPublisher ¶
func NewPublisher(ctx context.Context, config ProducerConfig, settings ProducerSettings) (*Publisher, error)
NewPublisher returns a new instance of Publisher, configured from the provided ProducerConfig and ProducerSettings.
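A construction sketch, reusing ctx, prodCfg and prodSettings from the sketches above:

pub, err := NewPublisher(ctx, prodCfg, prodSettings)
if err != nil {
    log.Fatal(err)
}
_ = pub // ready to produce via a Topic handle (see the Topic type below)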
type ScramSASLConfig ¶ added in v1.2.0
type ScramSASLConfig struct {
    // Zid is an optional authorization ID to use in authenticating.
    Zid string

    // User is the username to use for authentication.
    User string

    // Pass is the password to use for authentication.
    Pass string
}
ScramSASLConfig defines the configuration properties needed for using SASL/SCRAM-SHA-512 authentication.
The User and Pass fields need to be set in order to successfully initialize SASL/SCRAM-SHA-512 authentication.
type Topic ¶
type Topic struct {
// contains filtered or unexported fields
}
Topic models a Kafka producer with a set, "hardcoded" destination topic.
func (*Topic) BatchPublish ¶
func (t *Topic) BatchPublish(ctx context.Context, messages ...broker.OutboundMessage) error
BatchPublish publishes a batch of Kafka records to this topic.
Blocks until the records are fully committed to the Kafka cluster. May fail if ProducerBatchMaxBytes is set to a value higher than the broker accepts.
type Transaction ¶
type Transaction struct {
// contains filtered or unexported fields
}
Transaction models a Kafka transaction session.
This model provides exactly-once delivery of the produced messages. More specifically, messages consumed over this Transaction struct, processed in any way by the client application, and published using the TransactionalTopic that contains this Transaction as its element will be delivered exactly once. Therefore, a potential consumer consuming those messages should be configured to be transactional (setting ConsumerSettings.Transactional to true).
To start producing in a transaction, the method Transaction.Begin should be called first. After producing, the transaction should be either committed or aborted (Transaction.Commit, Transaction.Abort). Multiple begin-commit/abort sessions can be held on a single Transaction, meaning there is no need to create a new instance for every desired transactional session. Before shutting down, Transaction.Close should be called.
func NewTransaction ¶
func NewTransaction(ctx context.Context, producerConfig ProducerConfig, producerSettings ProducerSettings, consumerConfig ConsumerConfig, consumerSettings BatchConsumerSettings) (*Transaction, error)
NewTransaction returns a new instance of Transaction, configured from the provided ProducerConfig, ProducerSettings, ConsumerConfig and BatchConsumerSettings.
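A hedged sketch of one consume-process-produce session, reusing cfg, prodCfg and prodSettings from the sketches above; "example-txn-id" and "output-topic" are placeholders, and error handling is trimmed for brevity:

prodCfg.TransactionalID = "example-txn-id" // a transactional ID is required for transactional producing

t, err := NewTransaction(ctx, prodCfg, prodSettings, cfg, DefaultBatchConsumerSettings)
if err != nil {
    log.Fatal(err)
}
defer t.Close() // leave the group before shutting down

if err := t.Begin(); err != nil { // start a transactional session
    log.Fatal(err)
}

records, err := t.NextBatch(ctx) // consume a batch within the transaction
if err != nil {
    log.Fatal(err)
}
_ = records // process the records, then publish the results via t.Topic("output-topic")

if committed, err := t.Commit(ctx); err != nil || !committed {
    log.Println("commit did not succeed; the session may be retried with another Begin")
}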
func (*Transaction) Abort ¶
func (t *Transaction) Abort(ctx context.Context) (bool, error)
Abort is a wrapper around github.com/twmb/franz-go's GroupTransactSession's End function, called with an argument requesting that the transaction be aborted.
func (*Transaction) Begin ¶
func (t *Transaction) Begin() error
Begin is a wrapper around github.com/twmb/franz-go's GroupTransactSession's Begin with the exact same semantics.
This function begins a transaction, and returns an error if the client has no transactional ID or is already in a transaction. It should be called before producing records in a transaction.
func (*Transaction) Client ¶
func (t *Transaction) Client() *kgo.Client
Client returns the underlying Kafka client this Transaction wraps.
func (*Transaction) Close ¶
func (t *Transaction) Close()
Close is a wrapper around github.com/twmb/franz-go's kgo.GroupTransactSession.Close, with the exact same semantics.
This function must be called to leave the group before shutting down.
func (*Transaction) Commit ¶
func (t *Transaction) Commit(ctx context.Context) (bool, error)
Commit is a wrapper around github.com/twmb/franz-go's GroupTransactSession's End function, called with an argument requesting that the transaction be committed.
type TransactionalTopic ¶
type TransactionalTopic struct {
// contains filtered or unexported fields
}
TransactionalTopic models a transactional Kafka producer with a set, "hardcoded" destination topic.
func (*TransactionalTopic) BatchPublish ¶
func (t *TransactionalTopic) BatchPublish(ctx context.Context, messages ...broker.OutboundMessage) error
BatchPublish publishes a batch of Kafka records to this topic.
Blocks until the records are fully committed to the Kafka cluster. Before calling this function, the transaction must be initialized by calling Transaction.Begin.
func (*TransactionalTopic) IntoKafkaRecord ¶
func (t *TransactionalTopic) IntoKafkaRecord(message broker.OutboundMessage) *kgo.Record
IntoKafkaRecord maps the given broker.OutboundMessage to an instance of a github.com/twmb/franz-go record (*kgo.Record).
func (*TransactionalTopic) Publish ¶
func (t *TransactionalTopic) Publish(ctx context.Context, message broker.OutboundMessage) error
Publish publishes a Kafka record to this topic.
Blocks until the record is fully committed to the Kafka cluster. Before calling this function, the transaction must be initialized by calling Transaction.Begin.