Documentation ¶
Overview ¶
Package kafka provides methods to interact with Apache Kafka, offering functionality for both producing messages to and consuming messages from Kafka topics.
Index ¶
- Constants
- Variables
- func NewKafkaProducer(config *Config) (sarama.SyncProducer, error)
- func NewKafkaWithAvro(config *AvroWithKafkaConfig, logger log.Logger) (pubsub.PublisherSubscriber, error)
- type AvroWithKafkaConfig
- type Config
- type Consumer
- type ConsumerHandler
- type Kafka
- func (k *Kafka) Bind(message []byte, target interface{}) error
- func (k *Kafka) CommitOffset(offsets pubsub.TopicPartition)
- func (k *Kafka) HealthCheck() types.Health
- func (k *Kafka) IsSet() bool
- func (k *Kafka) Pause() error
- func (k *Kafka) Ping() error
- func (k *Kafka) PublishEvent(key string, value interface{}, headers map[string]string) (err error)
- func (k *Kafka) PublishEventWithOptions(key string, value interface{}, headers map[string]string, ...) (err error)
- func (k *Kafka) Resume() error
- func (k *Kafka) Subscribe() (*pubsub.Message, error)
- func (k *Kafka) SubscribeWithCommit(f pubsub.CommitFunc) (*pubsub.Message, error)
- type SASLConfig
- type XDGSCRAMClient
Constants ¶
const (
    SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
    PLAIN               = "PLAIN"
)
const (
    // OffsetNewest stands for the log head offset, i.e. the offset that will be
    // assigned to the next message that will be produced to the partition.
    OffsetNewest int64 = sarama.OffsetNewest

    // OffsetOldest stands for the oldest offset available on the broker for a
    // partition. You can send this to a client's GetOffset method to get this
    // offset, or when calling ConsumePartition to start consuming from the
    // oldest offset that is still available on the broker.
    OffsetOldest int64 = sarama.OffsetOldest
)
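These constants mirror sarama's sentinel offsets and can be assigned to Config.InitialOffsets to control where a new consumer group starts reading. A minimal sketch, assuming the package is imported as kafka and placeholder broker/topic values:

cfg := &kafka.Config{
    Brokers: "localhost:9092",
    Topics:  []string{"orders"},
    GroupID: "order-processor",
    // Start from the oldest retained message instead of the log head.
    InitialOffsets: kafka.OffsetOldest,
}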
Variables ¶
var (
SHA512 scram.HashGeneratorFcn = sha512.New
)
Functions ¶
func NewKafkaProducer ¶
func NewKafkaProducer(config *Config) (sarama.SyncProducer, error)
NewKafkaProducer returns a Kafka producer created using the provided config. It returns an error if the config is invalid.
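A usage sketch; the broker address, topic, and payload are placeholders, and the returned sarama.SyncProducer is used directly for sending:

producer, err := kafka.NewKafkaProducer(&kafka.Config{
    Brokers: "localhost:9092",
    Topics:  []string{"orders"},
})
if err != nil {
    log.Fatal(err)
}
partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
    Topic: "orders",
    Value: sarama.StringEncoder(`{"id":1}`),
})
if err != nil {
    log.Fatal(err)
}
log.Printf("delivered to partition %d at offset %d", partition, offset)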
func NewKafkaWithAvro ¶
func NewKafkaWithAvro(config *AvroWithKafkaConfig, logger log.Logger) (pubsub.PublisherSubscriber, error)
NewKafkaWithAvro initializes Kafka with Avro when the Kafka and Avro configs provided in AvroWithKafkaConfig are valid.
Types ¶
type AvroWithKafkaConfig ¶
AvroWithKafkaConfig represents a configuration for using Avro with Kafka
type Config ¶
type Config struct {
    // Brokers is a comma separated list of kafka brokers
    Brokers string
    // SASL provides configs for authentication
    SASL SASLConfig
    // MaxRetry is the number of times to retry sending a failing message
    MaxRetry int
    // RetryFrequency is the backoff time in milliseconds before retrying
    RetryFrequency int
    // Topics lists the topics to subscribe messages from;
    // the first topic will be used for publishing messages
    Topics []string
    // GroupID is the consumer group id
    GroupID string

    Config *sarama.Config

    // ConnRetryDuration specifies the connection retry duration
    ConnRetryDuration int
    // Offsets is a slice of TopicPartition in which "Topic", "Partition" and "Offset"
    // are the fields that need to be set to start consuming from a specific offset
    Offsets []pubsub.TopicPartition

    InitialOffsets int64

    // DisableAutoCommit allows the application to disable kafka consumer auto commit
    DisableAutoCommit bool
}
Config provides values for the Kafka producer and consumer.
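A representative Config, sketched with placeholder broker addresses and topic names:

cfg := &kafka.Config{
    Brokers:           "broker1:9092,broker2:9092",
    Topics:            []string{"orders", "orders-retry"}, // the first topic is used for publishing
    GroupID:           "order-processor",
    MaxRetry:          5,
    RetryFrequency:    500, // milliseconds
    ConnRetryDuration: 30,
    InitialOffsets:    kafka.OffsetNewest,
    DisableAutoCommit: false,
}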
type Consumer ¶
type Consumer struct {
    ConsumerGroup        sarama.ConsumerGroup
    ConsumerGroupHandler *ConsumerHandler
    // contains filtered or unexported fields
}
Consumer is a wrapper around sarama's ConsumerGroup.
func NewKafkaConsumer ¶
NewKafkaConsumer returns a Kafka consumer created using the provided config. It returns an error if the config is invalid.
type ConsumerHandler ¶
type ConsumerHandler struct {
// contains filtered or unexported fields
}
ConsumerHandler represents a Sarama consumer group message handler. It is responsible for handling the messages in a specific topic/partition. The handler implements methods called during the lifecycle of a consumer group session.
func (*ConsumerHandler) Cleanup ¶
func (consumer *ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*ConsumerHandler) ConsumeClaim ¶
func (consumer *ConsumerHandler) ConsumeClaim( session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
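For reference, a sarama ConsumeClaim implementation typically ranges over the claim's messages and marks each one on the session. This is a generic sarama sketch on a hypothetical handler, not this package's internal implementation:

func (h *exampleHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        // Process msg.Value here, then mark the message as consumed;
        // the commit itself follows the configured auto-commit settings.
        session.MarkMessage(msg, "")
    }
    return nil
}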
func (*ConsumerHandler) Setup ¶
func (consumer *ConsumerHandler) Setup(s sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
type Kafka ¶
type Kafka struct {
    Producer sarama.SyncProducer
    Consumer *Consumer
    // contains filtered or unexported fields
}
Kafka is a client for interacting with Apache Kafka.
func NewKafkaFromEnv ¶
NewKafkaFromEnv fetches the config from environment variables and tries to connect to Kafka.

Deprecated: Use pubsub.New instead.
func (*Kafka) Bind ¶
Bind parses the encoded data and stores the result in the value pointed to by target
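Bind is typically used to unmarshal a consumed message into a struct. A sketch, assuming the consumed Message exposes its raw payload via a Value field:

msg, err := k.Subscribe()
if err != nil {
    return err
}
var order struct {
    ID int `json:"id"`
}
if err := k.Bind([]byte(msg.Value), &order); err != nil {
    return err
}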
func (*Kafka) CommitOffset ¶
func (k *Kafka) CommitOffset(offsets pubsub.TopicPartition)
CommitOffset marks a particular offset on a specific partition as read. The commits are performed asynchronously at the interval specified in the sarama Consumer.Offsets.AutoCommit config.
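With DisableAutoCommit set, offsets can be marked manually after processing. A sketch, assuming the consumed Message exposes Topic, Partition, and Offset fields:

msg, err := k.Subscribe()
if err != nil {
    return err
}
// process msg ...
k.CommitOffset(pubsub.TopicPartition{
    Topic:     msg.Topic,
    Partition: msg.Partition,
    Offset:    msg.Offset + 1, // by Kafka convention, commit the next offset to read
})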
func (*Kafka) HealthCheck ¶
HealthCheck reports whether the consumer and producer are initialized and the connection is stable.
func (*Kafka) PublishEvent ¶
PublishEvent publishes the event to Kafka.
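A minimal publishing sketch; the key, payload, and header values are placeholders:

err := k.PublishEvent("order-123",
    map[string]interface{}{"id": 123, "status": "created"},
    map[string]string{"source": "checkout-service"},
)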
func (*Kafka) PublishEventWithOptions ¶
func (k *Kafka) PublishEventWithOptions(key string, value interface{}, headers map[string]string, options *pubsub.PublishOptions) (err error)
PublishEventWithOptions publishes a message to Kafka, with the ability to provide additional options as described in the PublishOptions struct.
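PublishOptions is defined in the pubsub package; the sketch below assumes it carries fields such as Topic and Timestamp, so verify the actual fields in that package before use:

err := k.PublishEventWithOptions("order-123", payload, nil, &pubsub.PublishOptions{
    Topic:     "orders-retry", // assumed field: override the default publish topic
    Timestamp: time.Now(),     // assumed field: set an explicit message timestamp
})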
func (*Kafka) Subscribe ¶
Subscribe consumes a single message from the sarama consumer group. The first time Subscribe is called, it initiates a consumer group session rebalance, which handles partition assignment across the consumers in the group.
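A typical polling loop, sketched; the error handling and the Topic field on the returned Message are illustrative:

for {
    msg, err := k.Subscribe()
    if err != nil {
        log.Println("subscribe error:", err)
        continue
    }
    log.Printf("received message on topic %s", msg.Topic)
}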
func (*Kafka) SubscribeWithCommit ¶
SubscribeWithCommit calls the given CommitFunc after consuming a message from Kafka and, based on its return values, decides whether to commit the message and whether to consume another one.
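A sketch, assuming CommitFunc receives the consumed message and returns two booleans, (commit, consumeMore), matching the behavior described above; process is a hypothetical handler:

msg, err := k.SubscribeWithCommit(func(m *pubsub.Message) (bool, bool) {
    ok := process(m)
    // First value: commit this message; second: continue consuming.
    return ok, false
})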
type SASLConfig ¶
type SASLConfig struct {
    // User is the username to connect to a protected kafka instance
    User string
    // Password is the password to connect to a protected kafka instance
    Password string
    // Mechanism is the SASL mechanism used for authentication
    Mechanism string
    // SecurityProtocol is either SSL or PLAINTEXT
    SecurityProtocol string
    // SSLVerify should be set to true if certificate verification is required
    SSLVerify bool
}
SASLConfig holds SASL authentication configurations for Kafka.
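A sketch of a Config for a SASL/SCRAM-protected cluster; the credentials and addresses are placeholders:

cfg := &kafka.Config{
    Brokers: "broker1:9093",
    Topics:  []string{"orders"},
    SASL: kafka.SASLConfig{
        User:             "svc-user",
        Password:         "secret",
        Mechanism:        kafka.SASLTypeSCRAMSHA512,
        SecurityProtocol: "SSL",
        SSLVerify:        true,
    },
}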
type XDGSCRAMClient ¶
type XDGSCRAMClient struct {
    *scram.ClientConversation
    scram.HashGeneratorFcn
}
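XDGSCRAMClient follows the standard pattern for plugging the xdg-go/scram library into sarama's SASL support. A generic sarama wiring sketch, not necessarily how this package wires it internally:

saramaCfg := sarama.NewConfig()
saramaCfg.Net.SASL.Enable = true
saramaCfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
saramaCfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
    return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}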
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
Begin prepares the client for the SCRAM exchange with the server, using a user name and a password.
func (*XDGSCRAMClient) Done ¶
func (x *XDGSCRAMClient) Done() bool
Done should return true when the SCRAM conversation is over.