gkafka

package module
v0.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 23, 2021 License: Apache-2.0 Imports: 16 Imported by: 0

README

go操作kafka

引入包
    import "github.com/layasugar/gkafka"
producer 生产者
protocol 可选值：plaintext、sasl_ssl、sasl_plaintext
    var cfg = &gkafka.Config{
        Brokers:  "...",       // json:"brokers"
        Topic:    "...",       // json:"topic"
        Group:    "...",       // json:"group"
        User:     "...",       // json:"user"
        Pwd:      "...",       // json:"pwd"
        Ca:       "...",       // json:"ca"
        Version:  "...",       // json:"version"
        Protocol: "plaintext", // json:"protocol" (plaintext, sasl_ssl, sasl_plaintext)
    }
    Producer = gkafka.InitProducer(cfg)
	
    err := Producer.SendMsg(topic, string(dataByte), partition)
consumer 消费者
	consumerData := make(chan *gkafka.ConsumerData)
	go gkafka.InitConsumer(cfg, consumerData, gkafka.SetClientId("gkafka"))

	for data := range consumerData {
		log.Printf("pool submit topic:%q partition:%d offset:%d", data.Topic, data.Partition, data.Offset)
	}

完整示例

Documentation

Index

Constants

This section is empty.

Variables

Functions

func InitConsumer

func InitConsumer(cfg *Config, dataChan chan *ConsumerData, f ...FuncCfg)

Types

type Config

type Config struct {
	Brokers  string `json:"brokers"`
	Topic    string `json:"topic"`
	Group    string `json:"group"`
	User     string `json:"user"`
	Pwd      string `json:"pwd"`
	Ca       string `json:"ca"`
	Version  string `json:"version"`
	Protocol string `json:"protocol"`
}

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents a Sarama consumer group consumer

func (*Consumer) Cleanup

func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*Consumer) ConsumeClaim

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*Consumer) Setup

func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim

type ConsumerData

type ConsumerData struct {
	Msg       []byte
	Topic     string
	Partition int32
	Offset    int64
}

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

func InitProducer

func InitProducer(cfg *Config, f ...FuncCfg) *Engine

func (*Engine) SendMsg

func (e *Engine) SendMsg(topic, message string, partition int32) (err error)

func (*Engine) SendMsgs

func (e *Engine) SendMsgs(msgs []*sarama.ProducerMessage) (err error)

type FuncCfg

type FuncCfg func(e *Engine) error

func SetClientId

func SetClientId(v string) FuncCfg

func SetOffsetsInitial

func SetOffsetsInitial(v int64) FuncCfg

func SetPartitioner

func SetPartitioner(v sarama.PartitionerConstructor) FuncCfg

func SetReBalanceStrategy

func SetReBalanceStrategy(v sarama.BalanceStrategy) FuncCfg

func SetRequiredAcks

func SetRequiredAcks(v sarama.RequiredAcks) FuncCfg

type XDGSCRAMClient

type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL