goo_kafka

package
v1.1.222 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 3, 2024 License: Apache-2.0 Imports: 9 Imported by: 0

README

kafka

goo_kafka.Client().Topics()

// 发送消息,不指定分区
goo_kafka.Producer().SendMessage("test", []byte("hi hnatao"))

// 发送消息,指定分区
goo_kafka.Producer().WithPartition(0).SendMessage("test", []byte("hi hnatao"))

// 发送异步消息,不指定分区
goo_kafka.Producer().SendAsyncMessage("test", []byte("hi hnatao"), func(msg *goo_kafka.ProducerMessage, err error) {
})

// 发送异步消息,指定分区
goo_kafka.Producer().WithPartition(0).SendAsyncMessage("test", []byte("hi hnatao"), func(msg *goo_kafka.ProducerMessage, err error) {
})

// 消费消息,指定分区,指定起始位置
goo_kafka.Consumer().WithPartition(0).WithOffset(100).Consume("test", func(msg *goo_kafka.ConsumerMessage, consumerErr *goo_kafka.ConsumerError) error {
    return nil
})

// 消费消息,指定分区,从最新位置开始
goo_kafka.Consumer().WithPartition(0).WithOffsetNewest().Consume("test", func(msg *goo_kafka.ConsumerMessage, consumerErr *goo_kafka.ConsumerError) error {
    return nil
})

// 消费消息,指定分区,从最头开始
goo_kafka.Consumer().WithPartition(0).WithOffsetOldest().Consume("test", func(msg *goo_kafka.ConsumerMessage, consumerErr *goo_kafka.ConsumerError) error {
    return nil
})

// 消费消息,分组消息,分组里面只要1个消费者消费
goo_kafka.Consumer().ConsumeGroup("test-id", []string{"test"}, func(msg *goo_kafka.ConsumerMessage, consumerErr *goo_kafka.ConsumerError) error {
    return nil
})

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Client

func Client() *client

客户端

func Consumer

func Consumer() iConsumer

消费者

func Init

func Init(conf Config) error

初始化

func OffsetInfo added in v1.1.153

func OffsetInfo(topic, groupId string) (data []map[string]int64)

分区数量

func Partitions added in v1.1.153

func Partitions(topic string) []int32

分区数量

func Producer

func Producer() iProducer

生产者

func Topics added in v1.1.155

func Topics() []string

主题列表

Types

type Config added in v1.1.2

type Config struct {
	User              string   `json:"user" yaml:"user"`
	Password          string   `json:"password" yaml:"password"`
	Addrs             []string `json:"addrs" yaml:"addrs"`
	Timeout           int      `json:"timeout" yaml:"timeout"`
	HeartbeatInterval int      `json:"heartbeat_interval" yaml:"heartbeat_interval"`
	SessionTimeout    int      `json:"session_timeout" yaml:"session_timeout"`
	RebalanceTimeout  int      `json:"rebalance_timeout" yaml:"rebalance_timeout"`
}

type ConsumerError

type ConsumerError struct {
	*sarama.ConsumerError
}

type ConsumerHandler

type ConsumerHandler func(msg *ConsumerMessage, consumerErr *ConsumerError) error

type ConsumerMessage

type ConsumerMessage struct {
	*sarama.ConsumerMessage
	GroupSession sarama.ConsumerGroupSession
}

func (ConsumerMessage) Commit added in v1.1.218

func (msg ConsumerMessage) Commit()

type MessageHandler added in v1.1.66

type MessageHandler func(msg *ProducerMessage, err error)

type ProducerMessage added in v1.1.66

type ProducerMessage struct {
	*sarama.ProducerMessage
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL