Documentation ¶
Index ¶
Constants ¶
View Source
const ( TypeKafkaGo = "kafka-go" TypeKafkaSarama = "sarama" TypePulsar = "pulsar" )
Variables ¶
View Source
var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
View Source
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
Functions ¶
func GetSaramaConfig ¶ added in v1.5.2
func GetSaramaConfig(kfkCfg *config.KafkaConfig) (sarCfg *sarama.Config, err error)
Types ¶
type Inputer ¶ added in v1.5.2
type Inputer interface { Init(cfg *config.Config, taskCfg *config.TaskConfig, putFn func(msg model.InputMessage), cleanupFn func()) error Run() Stop() error CommitMessages(message *model.InputMessage) error }
func NewInputer ¶ added in v1.5.2
type KafkaGo ¶ added in v1.5.2
type KafkaGo struct {
// contains filtered or unexported fields
}
KafkaGo implements input.Inputer
func NewKafkaGo ¶ added in v1.5.2
func NewKafkaGo() *KafkaGo
NewKafkaGo get instance of kafka reader
func (*KafkaGo) CommitMessages ¶ added in v1.5.2
func (k *KafkaGo) CommitMessages(msg *model.InputMessage) (err error)
func (*KafkaGo) Description ¶ added in v1.5.2
Description of this kafka consumer, which topic it reads from
func (*KafkaGo) Init ¶ added in v1.5.2
func (k *KafkaGo) Init(cfg *config.Config, taskCfg *config.TaskConfig, putFn func(msg model.InputMessage), cleanupFn func()) (err error)
Init Initialise the kafka instance with configuration
type KafkaSarama ¶ added in v1.5.2
type KafkaSarama struct {
// contains filtered or unexported fields
}
KafkaSarama implements input.Inputer
func NewKafkaSarama ¶ added in v1.5.2
func NewKafkaSarama() *KafkaSarama
NewKafkaSarama get instance of kafka reader
func (*KafkaSarama) CommitMessages ¶ added in v1.5.2
func (k *KafkaSarama) CommitMessages(msg *model.InputMessage) error
func (*KafkaSarama) Description ¶ added in v1.5.2
func (k *KafkaSarama) Description() string
Description of this kafka consumer, which topic it reads from
func (*KafkaSarama) Init ¶ added in v1.5.2
func (k *KafkaSarama) Init(cfg *config.Config, taskCfg *config.TaskConfig, putFn func(msg model.InputMessage), cleanupFn func()) (err error)
Init Initialise the kafka instance with configuration
func (*KafkaSarama) Stop ¶ added in v1.5.2
func (k *KafkaSarama) Stop() error
Stop kafka consumer and close all connections
type MyConsumerGroupHandler ¶ added in v1.5.2
type MyConsumerGroupHandler struct {
// contains filtered or unexported fields
}
func (MyConsumerGroupHandler) Cleanup ¶ added in v1.5.2
func (h MyConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error
func (MyConsumerGroupHandler) ConsumeClaim ¶ added in v1.5.2
func (h MyConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (MyConsumerGroupHandler) Setup ¶ added in v1.5.2
func (h MyConsumerGroupHandler) Setup(sess sarama.ConsumerGroupSession) error
type XDGSCRAMClient ¶ added in v1.5.2
type XDGSCRAMClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn }
func (*XDGSCRAMClient) Begin ¶ added in v1.5.2
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶ added in v1.5.2
func (x *XDGSCRAMClient) Done() bool
Click to show internal directories.
Click to hide internal directories.