Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AdminAuthOpts ¶
type AdminClient ¶
type AdminClient interface { CreateUser(username, password string) error DeleteUser(username string) error UserExists(username string) (bool, error) CreateTopic(topicName string, partitionCount int) error DeleteTopic(topicName string) error TopicExists(topicName string) (bool, error) AllowUserOnTopics(username string, allowedOperations string, topicNames ...string) error }
func NewAdminClient ¶
func NewAdminClient(adminEndpoint string, kafkaBrokers string, opts *AdminAuthOpts) AdminClient
type Consumer ¶
type Consumer interface { Ping(ctx context.Context) error Close() StartConsuming(reader ReaderFunc) }
func NewConsumer ¶
type ConsumerError ¶
type ConsumerOpts ¶
type ConsumerOpts struct { SASLAuth *KafkaSASLAuth Logger logging.Logger MaxRetries *int MaxPollRecords *int }
type KafkaMessage ¶
type KafkaSASLAuth ¶
type KafkaSASLAuth struct { SASLMechanism SASLMechanism User string Password string }
type Producer ¶
type Producer interface { Ping(ctx context.Context) error Close() Produce(ctx context.Context, topic, key string, value []byte) (*ProducerOutput, error) }
func NewProducer ¶
func NewProducer(brokerHosts string, producerOpts ProducerOpts) (Producer, error)
type ProducerOpts ¶
type ProducerOpts struct { SASLAuth *KafkaSASLAuth Logger logging.Logger }
type ProducerOutput ¶
type ReaderFunc ¶
type ReaderFunc func(msg KafkaMessage) error
type SASLMechanism ¶
type SASLMechanism string
const ( ScramSHA256 SASLMechanism = "SCRAM-SHA-256" ScramSHA512 SASLMechanism = "SCRAM-SHA-512" )
Click to show internal directories.
Click to hide internal directories.