gokafka

package
v1.2.145 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 27, 2024 License: Apache-2.0 Imports: 10 Imported by: 0

README

kafka

gokafka.Client().Topics()

// 发送消息,不指定分区
gokafka.Producer().SendMessage("test", []byte("hi"))

// 发送消息,指定分区
gokafka.Producer().WithPartition(0).SendMessage("test", []byte("hi goio"))

// 发送异步消息,不指定分区
gokafka.Producer().SendAsyncMessage("test", []byte("hi goio"), func(msg *gokafka.ProducerMessage, err error) {
})

// 发送异步消息,指定分区
gokafka.Producer().WithPartition(0).SendAsyncMessage("test", []byte("hi goio"), func(msg *gokafka.ProducerMessage, err error) {
})

// 消费消息,指定分区,指定起始位置
gokafka.Consumer().WithPartition(0).WithOffset(100).Consume("test", func(msg *gokafka.ConsumerMessage, consumerErr *gokafka.ConsumerError) error {
    return nil
})

// 消费消息,指定分区,从最新位置开始
gokafka.Consumer().WithPartition(0).WithOffsetNewest().Consume("test", func(msg *gokafka.ConsumerMessage, consumerErr *gokafka.ConsumerError) error {
    return nil
})

// 消费消息,指定分区,从最头开始
gokafka.Consumer().WithPartition(0).WithOffsetOldest().Consume("test", func(msg *gokafka.ConsumerMessage, consumerErr *gokafka.ConsumerError) error {
    return nil
})

// 消费消息,分组消息,分组里面只要1个消费者消费
gokafka.Consumer().ConsumeGroup("test-id", []string{"test"}, func(msg *gokafka.ConsumerMessage, consumerErr *gokafka.ConsumerError) error {
    return nil
})

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Consumer

func Consumer() iConsumer

func DelClient added in v1.2.108

func DelClient(names ...string)

func Init

func Init(configs ...Config) (err error)

func Producer

func Producer() iProducer

Types

type Config

type Config struct {
	User              string   `json:"user,optional" yaml:"User"`
	Name              string   `json:"name,optional"  yaml:"Name"`
	Password          string   `json:"password,optional"  yaml:"Password"`
	Addrs             []string `json:"addrs,optional"  yaml:"Addrs"`
	Timeout           int      `json:"timeout,optional"  yaml:"Timeout"`
	HeartbeatInterval int      `json:"heartbeatInterval" yaml:"HeartbeatInterval"`
	SessionTimeout    int      `json:"sessionTimeout" yaml:"SessionTimeout"`
	RebalanceTimeout  int      `json:"rebalanceTimeout" yaml:"RebalanceTimeout"`
	OffsetNewest      bool     `json:"offsetNewest" yaml:"OffsetNewest"`
}

type ConsumerError

type ConsumerError struct {
	*sarama.ConsumerError
}

type ConsumerHandler

type ConsumerHandler func(msg *ConsumerMessage, consumerErr *ConsumerError) error

type ConsumerMessage

type ConsumerMessage struct {
	*sarama.ConsumerMessage
}

type GoKafka added in v1.2.142

type GoKafka struct {
	sarama.Client
	// contains filtered or unexported fields
}

func Client

func Client() *GoKafka

default or 只有一个kafka实例直接返回

func GetClient added in v1.2.30

func GetClient(names ...string) *GoKafka

func New added in v1.2.30

func New(conf Config) (*GoKafka, error)

func (*GoKafka) Close added in v1.2.142

func (cli *GoKafka) Close()

func (*GoKafka) Consumer added in v1.2.142

func (cli *GoKafka) Consumer() iConsumer

func (*GoKafka) CreateTopicsRequest added in v1.2.142

func (cli *GoKafka) CreateTopicsRequest(topicName string, partitions int, replicationFactors int) error

func (*GoKafka) Producer added in v1.2.142

func (cli *GoKafka) Producer() iProducer

type MessageHandler

type MessageHandler func(msg *ProducerMessage, err error)

type ProducerMessage

type ProducerMessage struct {
	*sarama.ProducerMessage
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL