Documentation ¶
Index ¶
- Constants
- Variables
- func GetFranzConfig(kfkCfg *config.KafkaConfig) (opts []kgo.Opt, err error)
- func GetSaramaConfig(kfkCfg *config.KafkaConfig) (sarCfg *sarama.Config, err error)
- type Inputer
- type KafkaFranz
- type KafkaGo
- type KafkaGoLogger
- type KafkaSarama
- type MyConsumerGroupHandler
- type XDGSCRAMClient
Constants ¶
View Source
const ( TypeKafkaGo = "kafka-go" TypeKafkaSarama = "sarama" TypeKafkaFranz = "franz" )
View Source
const ( Krb5KeytabAuth = 2 CommitRetries = 6 RetryBackoff = 5 * time.Second )
Variables ¶
View Source
var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
View Source
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
Functions ¶
func GetFranzConfig ¶
func GetFranzConfig(kfkCfg *config.KafkaConfig) (opts []kgo.Opt, err error)
func GetSaramaConfig ¶
func GetSaramaConfig(kfkCfg *config.KafkaConfig) (sarCfg *sarama.Config, err error)
Types ¶
type Inputer ¶
type Inputer interface { Init(cfg *config.Config, taskCfg *config.TaskConfig, putFn func(msg *model.InputMessage), cleanupFn func()) error Run() Stop() error CommitMessages(message *model.InputMessage) error }
func NewInputer ¶
type KafkaFranz ¶
type KafkaFranz struct {
// contains filtered or unexported fields
}
KafkaFranz implements input.Inputer refers to examples/group_consuming/main.go
func (*KafkaFranz) CommitMessages ¶
func (k *KafkaFranz) CommitMessages(msg *model.InputMessage) error
func (*KafkaFranz) Description ¶
func (k *KafkaFranz) Description() string
Description of this kafka consumer, which topic it reads from
func (*KafkaFranz) Init ¶
func (k *KafkaFranz) Init(cfg *config.Config, taskCfg *config.TaskConfig, putFn func(msg *model.InputMessage), cleanupFn func()) (err error)
Init Initialise the kafka instance with configuration
func (*KafkaFranz) Stop ¶
func (k *KafkaFranz) Stop() error
Stop kafka consumer and close all connections
type KafkaGo ¶
type KafkaGo struct {
// contains filtered or unexported fields
}
KafkaGo implements input.Inputer
func (*KafkaGo) CommitMessages ¶
func (k *KafkaGo) CommitMessages(msg *model.InputMessage) (err error)
func (*KafkaGo) Description ¶
Description of this kafka consumer, which topic it reads from
func (*KafkaGo) Init ¶
func (k *KafkaGo) Init(cfg *config.Config, taskCfg *config.TaskConfig, putFn func(msg *model.InputMessage), cleanupFn func()) (err error)
Init Initialise the kafka instance with configuration
type KafkaGoLogger ¶
type KafkaGoLogger struct {
// contains filtered or unexported fields
}
func (*KafkaGoLogger) Printf ¶
func (kgl *KafkaGoLogger) Printf(template string, args ...interface{})
type KafkaSarama ¶
type KafkaSarama struct {
// contains filtered or unexported fields
}
KafkaSarama implements input.Inputer
func NewKafkaSarama ¶
func NewKafkaSarama() *KafkaSarama
NewKafkaSarama get instance of kafka reader
func (*KafkaSarama) CommitMessages ¶
func (k *KafkaSarama) CommitMessages(msg *model.InputMessage) error
func (*KafkaSarama) Description ¶
func (k *KafkaSarama) Description() string
Description of this kafka consumer, which topic it reads from
func (*KafkaSarama) Init ¶
func (k *KafkaSarama) Init(cfg *config.Config, taskCfg *config.TaskConfig, putFn func(msg *model.InputMessage), cleanupFn func()) (err error)
Init Initialise the kafka instance with configuration
func (*KafkaSarama) Stop ¶
func (k *KafkaSarama) Stop() error
Stop kafka consumer and close all connections
type MyConsumerGroupHandler ¶
type MyConsumerGroupHandler struct {
// contains filtered or unexported fields
}
func (MyConsumerGroupHandler) Cleanup ¶
func (h MyConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error
func (MyConsumerGroupHandler) ConsumeClaim ¶
func (h MyConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (MyConsumerGroupHandler) Setup ¶
func (h MyConsumerGroupHandler) Setup(sess sarama.ConsumerGroupSession) error
type XDGSCRAMClient ¶
type XDGSCRAMClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn }
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶
func (x *XDGSCRAMClient) Done() bool
Click to show internal directories.
Click to hide internal directories.