gokafka

package
v1.2.49 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 16, 2024 License: Apache-2.0 Imports: 6 Imported by: 0

README

kafka

// Obtain the client via Client() and call Topics() on it.
gokafka.Client().Topics()

// Send a message without specifying a partition.
gokafka.Producer().SendMessage("test", []byte("hi hnatao"))

// Send a message to a specific partition.
gokafka.Producer().WithPartition(0).SendMessage("test", []byte("hi hnatao"))

// Send a message asynchronously without specifying a partition.
// The callback has the MessageHandler signature.
gokafka.Producer().SendAsyncMessage("test", []byte("hi hnatao"), func(msg *gokafka.ProducerMessage, err error) {
})

// Send a message asynchronously to a specific partition.
gokafka.Producer().WithPartition(0).SendAsyncMessage("test", []byte("hi hnatao"), func(msg *gokafka.ProducerMessage, err error) {
})

// Consume messages from a specific partition, starting at a given offset.
// The callback has the ConsumerHandler signature.
gokafka.Consumer().WithPartition(0).WithOffset(100).Consume("test", func(msg *gokafka.ConsumerMessage, consumerErr *gokafka.ConsumerError) error {
    return nil
})

// Consume messages from a specific partition, starting at the newest offset.
gokafka.Consumer().WithPartition(0).WithOffsetNewest().Consume("test", func(msg *gokafka.ConsumerMessage, consumerErr *gokafka.ConsumerError) error {
    return nil
})

// Consume messages from a specific partition, starting at the oldest offset.
gokafka.Consumer().WithPartition(0).WithOffsetOldest().Consume("test", func(msg *gokafka.ConsumerMessage, consumerErr *gokafka.ConsumerError) error {
    return nil
})

// Consume messages as a consumer group; within the group, only one consumer
// consumes a given message.
gokafka.Consumer().ConsumeGroup("test-id", []string{"test"}, func(msg *gokafka.ConsumerMessage, consumerErr *gokafka.ConsumerError) error {
    return nil
})

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Client

func Client() *client

func Consumer

func Consumer() iConsumer

func GetClient added in v1.2.30

func GetClient(names ...string) *client

func Init

func Init(configs ...Config) (err error)

func New added in v1.2.30

func New(conf Config) (*client, error)

func Producer

func Producer() iProducer

Types

type Config

// Config is the configuration value accepted by New and Init.
//
// Every field carries json and yaml struct tags that include an "optional"
// option. NOTE(review): the field meanings below are inferred from their names
// only; confirm against the implementation of New/Init.
type Config struct {
	// User is presumably the username used to authenticate — TODO confirm.
	User     string   `json:"user,optional" yaml:"User,optional"`
	// Name presumably identifies this client (compare GetClient(names ...string)) — TODO confirm.
	Name     string   `json:"name,optional" yaml:"Name,optional"`
	// Password is presumably the password paired with User — TODO confirm.
	Password string   `json:"password,optional" yaml:"Password,optional"`
	// Addrs is presumably the list of broker addresses — TODO confirm.
	Addrs    []string `json:"addrs,optional" yaml:"Addrs,optional"`
	// Timeout is an integer timeout; its unit is not visible here — TODO confirm.
	Timeout  int      `json:"timeout,optional" yaml:"Timeout,optional"`
}

type ConsumerError

// ConsumerError wraps sarama's consumer error by embedding a
// *sarama.ConsumerError, so the embedded type's fields and methods are
// promoted onto this type. It is the error type passed to a ConsumerHandler.
type ConsumerError struct {
	// Embedded pointer to the underlying sarama error.
	*sarama.ConsumerError
}

type ConsumerHandler

// ConsumerHandler is the callback signature used when consuming: it receives a
// *ConsumerMessage and a *ConsumerError and returns an error.
// NOTE(review): which of msg/consumerErr may be nil on a given call, and how
// the returned error is treated, is not visible here — confirm in the consumer.
type ConsumerHandler func(msg *ConsumerMessage, consumerErr *ConsumerError) error

type ConsumerMessage

// ConsumerMessage wraps a consumed message by embedding a
// *sarama.ConsumerMessage, so the embedded type's fields and methods are
// promoted onto this type. It is the message type passed to a ConsumerHandler.
type ConsumerMessage struct {
	// Embedded pointer to the underlying sarama message.
	*sarama.ConsumerMessage
}

type MessageHandler

// MessageHandler is the callback signature for producer results: it receives
// a *ProducerMessage and an error and returns nothing.
// NOTE(review): presumably err is nil on successful delivery — confirm in the
// producer implementation.
type MessageHandler func(msg *ProducerMessage, err error)

type ProducerMessage

// ProducerMessage wraps a produced message by embedding a
// *sarama.ProducerMessage, so the embedded type's fields and methods are
// promoted onto this type. It is the message type passed to a MessageHandler.
type ProducerMessage struct {
	// Embedded pointer to the underlying sarama message.
	*sarama.ProducerMessage
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL