teckafka

package
v0.0.0-...-f77605d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 28, 2024 License: MulanPSL-2.0 Imports: 5 Imported by: 0

README

  1. kafka类库
  2. 消费者使用方式
    1. 继承结构体
      type SelfStruct struct {
          BaseHandlerConsumer       
      }
      
    2. 重写结构体方法
      //处理业务前
      func (b *SelfStruct) PreCallBack(msg []byte) error {
          return nil
      }
      //处理业务
      func (b *SelfStruct) CallBack(msg []byte) error {
          return nil
      }
      //处理业务后
      func (b *SelfStruct) PostCallBack(msg []byte) error {
          return nil
      }
      //错误处理
      func (b *SelfStruct) DealErrCallBack(errMsg string) {
      
      }
      
    3. 启动消费服务
      StartConsumer(SelfStruct{}, gkafka.V1_0_0_0,  []string{"ip1","ip2"}, "groupName", "topicName")
      
  3. kafka生产者调用方式
    1. 自定义两个方法、分别是处理正确回调和错误回调函数、如果不自定义则调用两个默认处理函数

          /**
          *	自定义错误回调处理方式
           */
          func errFn(err *sarama.ProducerError) {
              logger.Error(fmt.Sprintf("err:++++%+v++++", err))
          }
         /**
         *	自定义成功回调处理方式
          */
         func successFn(msg *sarama.ProducerMessage) {
          logger.Info(fmt.Sprintf("success:----%+v------", msg))
         }
      
    2. 发送消息

      1. 使用自定义函数
        参数: 实例  topic  消息体 key 成功回调函数 失败回调函数
        msg, err := Publish(0, "test", "test", nil, successFn, errFn)
        if err != nil {
           fmt.Println(err)
           return
        }
        
      2. 使用默认函数
        参数: 实例  topic  消息体 key 成功回调函数 失败回调函数
        _, err := Publish(0, "test", "test", nil, DefaultSuccessFn, DefaultErrFn)
        

Documentation

Index

Constants

View Source
const (
	DEFAULTINSTANCE = iota //默认kafka实例

)

Variables

This section is empty.

Functions

func DefaultErrFn

func DefaultErrFn(err *sarama.ProducerError)

* * 默认回调错误处理方式

func DefaultSuccessFn

func DefaultSuccessFn(msg *sarama.ProducerMessage)

* * 默认回调成功处理方式

func Publish

func Publish(host []string, topic, msg string, key sarama.Encoder, s SuccessFn, e ErrFn) (sourceMsg string, err error)

* * 发送消息 * @param:topic string 所属topic * @param:msg string 消息 * @param:key sarama.Encoder

func StartConsumer

func StartConsumer(consumerGroupHandler ConsumerGroupHandler, version sarama.KafkaVersion, kafkaHosts []string, saslEnable bool, userName, password, group, topic string)

* * @param consumerGroupHandler 结构体ConsumerGroupHandler的子类 * @param client kafka集群 * @param group 消费者组 * @param topic 消费者topic

Types

type BaseHandlerConsumer

type BaseHandlerConsumer struct {
}

func (*BaseHandlerConsumer) CallBack

func (b *BaseHandlerConsumer) CallBack(msg *sarama.ConsumerMessage) error

func (*BaseHandlerConsumer) DealErrCallBack

func (b *BaseHandlerConsumer) DealErrCallBack(errMsg string)

func (*BaseHandlerConsumer) PostCallBack

func (b *BaseHandlerConsumer) PostCallBack(msg []byte) error

func (*BaseHandlerConsumer) PreCallBack

func (b *BaseHandlerConsumer) PreCallBack(msg []byte) error

* * 调用前接口

type ConsumerGroupHandler

type ConsumerGroupHandler struct {
	Fn IHandlerConsumer
}

func (ConsumerGroupHandler) Cleanup

func (ConsumerGroupHandler) ConsumeClaim

func (ConsumerGroupHandler) Setup

type ErrFn

type ErrFn func(err *sarama.ProducerError)

type IHandlerConsumer

type IHandlerConsumer interface {
	PreCallBack(msg []byte) error
	CallBack(msg *sarama.ConsumerMessage) error
	PostCallBack(msg []byte) error
	DealErrCallBack(errMsg string)
}

type SuccessFn

type SuccessFn func(msg *sarama.ProducerMessage)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL