gokafka

package
v1.2.163 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

README

kafka

// NOTE(review): these examples do not match the signatures documented for this
// version: IProducer.SendMessage/SendAsyncMessage take an IMessage, IProducer
// declares no WithPartition, and ConsumerHandler takes a leading
// *gocontext.Context argument. The snippets presumably predate that API —
// confirm before copying them.

// List topics.
gokafka.Client().Topics()

// Send a message without specifying a partition.
gokafka.Producer().SendMessage("test", []byte("hi"))

// Send a message to a specific partition.
gokafka.Producer().WithPartition(0).SendMessage("test", []byte("hi goio"))

// Send an asynchronous message without specifying a partition.
gokafka.Producer().SendAsyncMessage("test", []byte("hi goio"), func(msg *gokafka.ProducerMessage, err error) {
})

// Send an asynchronous message to a specific partition.
gokafka.Producer().WithPartition(0).SendAsyncMessage("test", []byte("hi goio"), func(msg *gokafka.ProducerMessage, err error) {
})

// Consume from a specific partition, starting at a specific offset.
gokafka.Consumer().WithPartition(0).WithOffset(100).Consume("test", func(msg *gokafka.ConsumerMessage, consumerErr *gokafka.ConsumerError) error {
    return nil
})

// Consume from a specific partition, starting at the newest offset.
gokafka.Consumer().WithPartition(0).WithOffsetNewest().Consume("test", func(msg *gokafka.ConsumerMessage, consumerErr *gokafka.ConsumerError) error {
    return nil
})

// Consume from a specific partition, starting at the oldest offset.
gokafka.Consumer().WithPartition(0).WithOffsetOldest().Consume("test", func(msg *gokafka.ConsumerMessage, consumerErr *gokafka.ConsumerError) error {
    return nil
})

// Consume as a consumer group; only one consumer in the group consumes each message.
gokafka.Consumer().ConsumeGroup("test-id", []string{"test"}, func(msg *gokafka.ConsumerMessage, consumerErr *gokafka.ConsumerError) error {
    return nil
})

Documentation

Index

Constants

View Source
// Option names.
// NOTE(review): presumably these are the Option.Name values produced by
// FocusOption and RedisOption respectively — confirm against the source.
const (
	FocusName = "focus"
	RedisName = "redis"
)

Variables

This section is empty.

Functions

func DelClient added in v1.2.108

func DelClient(names ...string)

func Init

func Init(conf Config, opts ...Option) (err error)

Types

type Config

// Config holds the settings passed to Init and New when creating a Kafka client.
//
// NOTE(review): the `optional` option in the json tags is not recognised by
// encoding/json; presumably it is consumed by a different config loader —
// confirm.
type Config struct {
	User              string   `json:"user,optional" yaml:"User"`
	Name              string   `json:"name,optional"  yaml:"Name"`
	Password          string   `json:"password,optional"  yaml:"Password"`
	Addrs             []string `json:"addrs,optional"  yaml:"Addrs"`
	// NOTE(review): the four fields below are plain ints; their time unit is
	// not visible here — TODO confirm.
	Timeout           int      `json:"timeout,optional"  yaml:"Timeout"`
	HeartbeatInterval int      `json:"heartbeatInterval,optional" yaml:"HeartbeatInterval"`
	SessionTimeout    int      `json:"sessionTimeout,optional" yaml:"SessionTimeout"`
	RebalanceTimeout  int      `json:"rebalanceTimeout,optional" yaml:"RebalanceTimeout"`
	OffsetNewest      bool     `json:"offsetNewest,optional" yaml:"OffsetNewest"`
	GroupId           string   `json:"groupId,optional" yaml:"GroupId"`

	// Version is a string; presumably the Kafka protocol version — TODO confirm.
	Version string `json:"version,optional" yaml:"Version"`

	// RedisConfig is the goredis configuration nested in this config.
	RedisConfig goredis.Config `json:"redisConfig,optional" yaml:"RedisConfig"`
}

type ConsumerError

// ConsumerError wraps a *sarama.ConsumerError by embedding it. It is the
// error type handed to a ConsumerHandler.
type ConsumerError struct {
	*sarama.ConsumerError
}

type ConsumerHandler

// ConsumerHandler is the callback type accepted by IConsumer.Consume and
// IConsumer.ConsumeGroup. It receives a context, a message and a consumer
// error, and returns an error.
// NOTE(review): presumably exactly one of msg and consumerErr is non-nil per
// call — confirm against the implementation.
type ConsumerHandler func(ctx *gocontext.Context, msg *ConsumerMessage, consumerErr *ConsumerError) error

type ConsumerMessage

// ConsumerMessage wraps a *sarama.ConsumerMessage by embedding it and also
// carries a consumer group session.
type ConsumerMessage struct {
	*sarama.ConsumerMessage
	// GroupSession is presumably set only for messages delivered through
	// ConsumeGroup — TODO confirm.
	GroupSession sarama.ConsumerGroupSession
}

func (ConsumerMessage) Commit added in v1.2.146

func (msg ConsumerMessage) Commit()

type GoKafka added in v1.2.142

// GoKafka is the Kafka client type returned by Client, GetClient and New.
// It embeds sarama.Client, so that interface's methods are promoted onto
// GoKafka.
type GoKafka struct {
	sarama.Client
	// contains filtered or unexported fields
}

func Client

func Client() *GoKafka

Client returns the default instance, or the only instance when exactly one Kafka instance exists.

func GetClient added in v1.2.30

func GetClient(names ...string) *GoKafka

func New added in v1.2.30

func New(conf Config, opts ...Option) (*GoKafka, error)

func (*GoKafka) Close added in v1.2.142

func (cli *GoKafka) Close()

func (*GoKafka) Consumer added in v1.2.142

func (cli *GoKafka) Consumer() IConsumer

消费者

func (*GoKafka) CreateTopicsRequest added in v1.2.142

func (cli *GoKafka) CreateTopicsRequest(topicName string, partitions int, replicationFactors int) error

func (*GoKafka) GetKey added in v1.2.146

func (c *GoKafka) GetKey(topic, msg string) string

func (*GoKafka) OffsetInfo added in v1.2.146

func (c *GoKafka) OffsetInfo(topic, groupId string) (data []map[string]int64)

Offset information for the given topic and consumer group.

func (*GoKafka) Partitions added in v1.2.146

func (c *GoKafka) Partitions(topic string) []int32

Partition IDs of the given topic.

func (*GoKafka) Producer added in v1.2.142

func (cli *GoKafka) Producer(opts ...Option) IProducer

生产者

func (*GoKafka) Redis added in v1.2.146

func (c *GoKafka) Redis() *goredis.GoRedis

func (*GoKafka) Topics added in v1.2.146

func (c *GoKafka) Topics() []string

主题列表

type IConsumer added in v1.2.146

// IConsumer is the consumer interface. The With* methods return an IConsumer
// so that calls can be chained before Consume or ConsumeGroup.
type IConsumer interface {
	// Client returns the sarama.Client.
	Client() sarama.Client

	// WithPartition consumes from the given partition.
	WithPartition(partition int32) IConsumer

	// WithOffset starts consuming at the given offset.
	WithOffset(offset int64) IConsumer

	// WithOffsetNewest starts consuming at the newest offset.
	WithOffsetNewest() IConsumer

	// WithOffsetOldest starts consuming from the beginning.
	WithOffsetOldest() IConsumer

	// Consume consumes topic, passing results to handler.
	Consume(topic string, handler ConsumerHandler)

	// ConsumeGroup consumes topics as the consumer group groupId, passing
	// results to handler.
	ConsumeGroup(groupId string, topics []string, handler ConsumerHandler)
}

消费者

func Consumer

func Consumer() IConsumer

type IMessage added in v1.2.146

// IMessage is the message interface accepted by IProducer.SendMessage and
// IProducer.SendAsyncMessage.
// NOTE(review): the per-method descriptions below are inferred from the
// method names and signatures only — confirm against the implementation.
type IMessage interface {
	// Topic presumably names the topic the message is sent to.
	Topic() string
	// Key presumably returns the message key.
	Key() string
	// Headers presumably returns the message headers.
	Headers() map[string]string
	// Serialize presumably encodes the message to bytes.
	Serialize() []byte
	// Deserialize presumably populates the message from b.
	Deserialize(b []byte)
}

消息

type IProducer added in v1.2.146

// IProducer is the producer interface.
type IProducer interface {
	// Client returns the sarama.Client.
	Client() sarama.Client

	// SendMessage sends msg synchronously and returns the partition, offset
	// and error.
	SendMessage(msg IMessage) (partition int32, offset int64, err error)

	// SendAsyncMessage sends msg asynchronously; cb is the MessageHandler
	// callback for the send.
	SendAsyncMessage(msg IMessage, cb MessageHandler) (err error)
}

生产者

func Producer

func Producer(opts ...Option) IProducer

type MessageHandler

// MessageHandler is the callback type accepted by IProducer.SendAsyncMessage.
// It receives the producer message and an error.
type MessageHandler func(msg *ProducerMessage, err error)

type Option added in v1.2.146

// Option is a named value accepted by Init, New and Producer.
// FocusOption and RedisOption construct Option values.
type Option struct {
	Name  string
	Value interface{}
}

func FocusOption added in v1.2.146

func FocusOption() Option

是否强制

func RedisOption added in v1.2.146

func RedisOption(cli *goredis.GoRedis) Option

redis 对象

type ProducerMessage

// ProducerMessage wraps a *sarama.ProducerMessage by embedding it. It is the
// message type handed to a MessageHandler.
type ProducerMessage struct {
	*sarama.ProducerMessage
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL