kafka

package
v1.0.37 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 12, 2023 License: MPL-2.0 Imports: 8 Imported by: 0

README

Kafka配置

  • default: 缺省的Kafka客户端
  • items: 额外的Kafka客户端
[sdk.log]
        level = "debug"
         filename = "demo.log"
         [sdk.log.rotate]
             max_age = 168
             rotation_time=24
            
[sdk.kafka]
    [sdk.kafka.default]
        brokers = ["192.168.0.123:9092"]            <--- kafka连接的brokers
        [[sdk.kafka.default.consumers]]             <--- kafka消费端配置
            name = "consumer1"                      <--- 消费端必须用name来区分
            topic ="testtopic1"
            group_id=""                             <--- 如果要在同一个消费组,可以设置group_id
            user=""
            password=""
        [[sdk.kafka.default.producers]]             <--- kafka生产端配置
            name = "producer1"                      <--- 生产端必须用name来区分
            topics  = ["testtopic1", "testtopic2"]  <--- 支持同时往多个topic发送
            balance = ""                            <--- 支持balance的策略:roundrobin,leastbytes,hash,crc32,murmur2     

    [[sdk.kafka.items]]
            name = "other_kafka"
            brokers = ["192.168.0.123:9092"]        <--- kafka连接的brokers
            [[sdk.kafka.items.consumers]]
                name = "consumer2"
                topic ="testtopic2"
                group_id=""
  1. 除了default的kafka,还需要使用其他Kafka服务,可以配置在[[sdk.kafka.items]]中,必须使用name来区分不同kafka服务
  2. consumer/producer配置中必须指定name来区分不同配置, 在创建consumer或者producer的时候使用name来获取交换配置信息
  3. 注意在退出程序的时候需要调用consumer/producer的Close()方法来保证消息不丢失

Kafka使用指南

获取Kafka客户端

  • 获取缺省Kafka客户端: sdk.Kafka.My()
  • 获取指定名字的Kafka客户端: sdk.Kafka.By(name)

Kafka创建producer并发送消息

  • 首先指定client的name通过Producer()获取producer实例
  • 调用producer实例的Publish()方法发送消息
p, err := sdk.Kafka.My().CreateProducer("producer1")
if err != nil {
	sdk.Logger.Error("kafka create producer", "err", err)
    return err
}

err = p.Publish([]byte("test"))
if err != nil {
	sdk.Logger.Fatal("publish", "err", err)
}

Kafka创建consumer并接收处理消息

  1. 首先指定consumer的name通过CreateConsumer()创建consumer实例
  2. 定义消息内容的处理函数, 消息函数为func(data []byte) types.MqMsgAction格式
  3. 调用consumer实例的Consume()方法接收并处理消息
func msgProcess(data []byte) error {
	fmt.Println(utils.BytesToString(data))
	return nil
}

func handle() {
    c, err := sdk.Kafka.My().CreateConsumer("consumer1", msgProcess)
	if err != nil {
		sdk.Logger.Fatal("kafka create consumer", "err", err)
	}

    c.Consume()
}

自定义发送或者接收的选项

  1. 首先调用GetDefaultOptions()函数获取到系统所有默认选项, 默认选项有QueueOption, ExchangeOption, PublishOption, ConsumeOption四种
  2. 修改指定选项中的默认值
  3. 在创建producer或者consumer的时候将修改后的Options作为参数传入
...
mq := Kafka.My()
options := mq.GetDefaultOptions()
consumeOption := options[types.MqOptionQueue].(*Kafka.ConsumeOption)
consumeOption.MinBytes = 2
options[types.MqOptionConsume] = consumeOption
p, err := mq.CreateProducer("producer1", msgProcess, options)
... 

Documentation

Overview

Package kafka

Package kafka @Title log capability of zerolog @Description zerolog implementation of log capability @Author Ryan Fan 2021-06-09 @Update Ryan Fan 2021-06-09

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewMq

func NewMq(providerType string, config *MqConfig, logger types.LogProvider) (types.Mq, error)

Types

type BaseClient

type BaseClient struct {
	// contains filtered or unexported fields
}

BaseClient 消息队列客户端维护connection和channel

type ConsumeOption

type ConsumeOption struct {
	InitialOffset int64 // 如果之前没有offset提交,选择offset的策略
	// broker等待Consumer.Fetch.Min的最长时间,
	// 如果没取到足够Consumer.Fetch.Min, 等待MaxWaitTime后也会返回
	// MaxWaitTime     time.Duration
	// 是否在消费时有任何错误都会返回到Errors通道
	ReturnErrors bool
	// 是否自动提交
	AutoCommit bool
}

func (ConsumeOption) GetType

func (q ConsumeOption) GetType() types.MqOptionType

type ConsumerClient

type ConsumerClient struct {
	*BaseClient
	Option    *ConsumeOption
	Parameter *ConsumerParameter
	// contains filtered or unexported fields
}

type ConsumerConfig

type ConsumerConfig struct {
	Name      string `mapstructure:"name"`
	Topic     string `mapstructure:"topic"`
	Partition int    `mapstructure:"partition"`
	GroupId   string `mapstructure:"group_id"`
	User      string `mapstructure:"user"`
	Password  string `mapstructure:"password"`
}

ConsumerConfig 客户端配置

type ConsumerGroupHandler

type ConsumerGroupHandler struct {
	Logger types.LogProvider

	Process types.MqMsgProcessFunc
	// contains filtered or unexported fields
}

func (ConsumerGroupHandler) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (ConsumerGroupHandler) ConsumeClaim

ConsumeClaim 在ConsumeClaim()中必须执行不断取消息的循环:ConsumerGroupClaim.Messages()

func (ConsumerGroupHandler) Setup

Setup 消费组在执行Consume时会初始化consumer,并依次调用Setup()->ConsumeClaim(), 关闭之前会调用Cleanup() 消息的具体消费发生在ConsumeClaim()中,你可以在Setup()中初始化一些东西

type ConsumerParameter

type ConsumerParameter struct {
	Name     string `mapstructure:"name"`
	GroupId  string `mapstructure:"groupId"`
	Topic    string `mapstructure:"topic"`
	User     string `mapstructure:"user"`
	Password string `mapstructure:"password"`
}

type Kafka

type Kafka struct {
	Logger types.LogProvider
	Config *MqConfig
}

func (*Kafka) CreateConsumer

func (k *Kafka) CreateConsumer(processFunc types.MqMsgProcessFunc, parameters map[string]interface{}, args ...types.MqOptioner) (types.MqConsumer, error)

func (*Kafka) CreateProducer

func (k *Kafka) CreateProducer(parameters map[string]interface{}, args ...types.MqOptioner) (types.MqProducer, error)

CreateProducer 创造一个生产者

func (*Kafka) GetDefaultOptions

func (k *Kafka) GetDefaultOptions() map[types.MqOptionType]types.MqOptioner

type KafkaConsumer

type KafkaConsumer struct {
	Logger types.LogProvider
	Client *ConsumerClient
	// contains filtered or unexported fields
}

func (*KafkaConsumer) Close

func (kc *KafkaConsumer) Close()

func (*KafkaConsumer) Consume

func (kc *KafkaConsumer) Consume()

Consume 消费消息

type KafkaProvider

type KafkaProvider struct {
	mq.BaseMqProvider
}

func (*KafkaProvider) Init

func (kp *KafkaProvider) Init(rootConfiger types.Configer, logger types.LogProvider, args ...interface{}) error

Init implements types.Provider interface, used to initialize the capability @author Ryan Fan (2021-06-09) @param baseconf.Configer root config interface to extract config info @return error

type MqConfig

type MqConfig struct {
	Name      string            `mapstructure:"name"`
	Brokers   []string          `mapstructure:"brokers"`
	Consumers []*ConsumerConfig `mapstructure:"consumers"`
	Producers []*ProducerConfig `mapstructure:"producers"`
}

MqConfig amqp://user:pass@host:10000/vhost

type MqProviderConfig

type MqProviderConfig struct {
	Default *MqConfig   `mapstructure:"default"`
	Items   []*MqConfig `mapstructure:"items"`
}

type Producer

type Producer struct {
	Logger types.LogProvider
	Option *PublishOption
	Client *ProducerClient
}

func (*Producer) Close

func (p *Producer) Close()

func (*Producer) GetLastConfirmedId

func (p *Producer) GetLastConfirmedId() uint64

func (Producer) Publish

func (p Producer) Publish(data []byte, args ...interface{}) error

func (Producer) PublishDelay

func (p Producer) PublishDelay(data []byte, ttl int64, args ...interface{}) error

type ProducerClient

type ProducerClient struct {
	*BaseClient
	Parameter *ProducerParameter
	Option    *PublishOption
	// contains filtered or unexported fields
}

type ProducerConfig

type ProducerConfig struct {
	Name        string   `mapstructure:"name"`
	Topics      []string `mapstructure:"topics"`
	Balance     string   `mapstructure:"balance"`
	Compression string   `mapstructure:"compression"`
}

ProducerConfig 发送端配置

type ProducerParameter

type ProducerParameter struct {
	Topics []string `mapstructure:"topics"`
}

type PublishOption

type PublishOption struct {
	// NoResponse:0,  doesn't send any response, the TCP ACK is all you get
	// WaitForLocal:1,  waits for only the local commit to succeed before responding
	// WaitForAll:-1,  waits for all in-sync replicas to commit before responding
	RequiredAcks sarama.RequiredAcks
	// The total number of times to retry sending a message (default 3)
	RetryMax int
	// If enabled, successfully delivered messages will be returned on the successes channel
	ReturnSuccess bool
	// If enabled, successfully delivered messages will be returned on the successes channel
	ReturnError bool
}

func GetPublishOption

func GetPublishOption(options map[types.MqOptionType]types.MqOptioner) *PublishOption

func (PublishOption) GetType

func (q PublishOption) GetType() types.MqOptionType

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL