kafka

package
v0.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 3, 2023 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MessageToProducerMessage

func MessageToProducerMessage(msg *Message) (producerMessage *sarama.ProducerMessage, err error)

Types

type BrokerInfo added in v0.1.0

type BrokerInfo struct {
	Id        int32  `json:"id"`
	Addr      string `json:"addr"`
	Rack      string `json:"rack"`
	Connected bool   `json:"connected"`
}

type Config

type Config struct {
	Address  string `json:"address"`
	Username string `json:"username,omitempty"`
	Password string `json:"password,omitempty"`
	CertPath string `json:"certPath,omitempty"`
}

Config kafka配置

type IService added in v0.0.5

type IService interface {
	Stop()
	Info() (res *Info, err error)
	GetTopics() (res []*TopicInfo, err error)
	Pull(groupId string, topics []string, PullSize int, PullTimeout int, keyType, valueType string) (msgList []*Message, err error)
	MarkOffset(groupId string, topic string, partition int32, offset int64) (err error)
	ResetOffset(groupId string, topic string, partition int32, offset int64) (err error)
	CreatePartitions(topic string, count int32) (err error)
	CreateTopic(topic string, numPartitions int32, replicationFactor int16) (err error)
	DeleteTopic(topic string) (err error)
	DeleteConsumerGroup(groupId string) (err error)
	DeleteRecords(topic string, partitionOffsets map[int32]int64) (err error)
	NewSyncProducer() (syncProducer sarama.SyncProducer, err error)
	Push(msg *Message) (err error)
}

func New

func New(config Config) (IService, error)

New 创建kafka服务

type Info added in v0.1.0

type Info struct {
	Brokers []*BrokerInfo `json:"brokers"`
}

type Message

type Message struct {
	KeyType   string          `json:"keyType,omitempty"`
	Key       string          `json:"key,omitempty"`
	ValueType string          `json:"valueType,omitempty"`
	Value     string          `json:"value,omitempty"`
	Topic     string          `json:"topic,omitempty"`
	Partition *int32          `json:"partition,omitempty"`
	Offset    *int64          `json:"offset,omitempty"`
	Headers   []MessageHeader `json:"headers,omitempty"`
	Timestamp *time.Time      `json:"timestamp,omitempty"`
}

func ConsumerMessageToMessage

func ConsumerMessageToMessage(keyType string, valueType string, consumerMessage *sarama.ConsumerMessage) (msg *Message, err error)

type MessageHeader

type MessageHeader struct {
	Key   string `json:"key,omitempty"`
	Value string `json:"value,omitempty"`
}

type ProducerMessage

type ProducerMessage struct {
	*sarama.ProducerMessage
}

type Service added in v0.0.5

type Service struct {
	Config
}

Service 注册处理器在线信息等

func (*Service) CreatePartitions added in v0.0.5

func (this_ *Service) CreatePartitions(topic string, count int32) (err error)

func (*Service) CreateTopic added in v0.0.5

func (this_ *Service) CreateTopic(topic string, numPartitions int32, replicationFactor int16) (err error)

func (*Service) DeleteConsumerGroup added in v0.0.5

func (this_ *Service) DeleteConsumerGroup(groupId string) (err error)

func (*Service) DeleteRecords added in v0.0.5

func (this_ *Service) DeleteRecords(topic string, partitionOffsets map[int32]int64) (err error)

func (*Service) DeleteTopic added in v0.0.5

func (this_ *Service) DeleteTopic(topic string) (err error)

func (*Service) GetServers added in v0.0.5

func (this_ *Service) GetServers() []string

func (*Service) GetTopics added in v0.0.5

func (this_ *Service) GetTopics() (res []*TopicInfo, err error)

func (*Service) Info added in v0.0.5

func (this_ *Service) Info() (res *Info, err error)

func (*Service) MarkOffset added in v0.0.5

func (this_ *Service) MarkOffset(groupId string, topic string, partition int32, offset int64) (err error)

func (*Service) NewSyncProducer added in v0.0.5

func (this_ *Service) NewSyncProducer() (syncProducer sarama.SyncProducer, err error)

NewSyncProducer 创建生产者

func (*Service) Pull added in v0.0.5

func (this_ *Service) Pull(groupId string, topics []string, PullSize int, PullTimeout int, keyType, valueType string) (msgList []*Message, err error)

func (*Service) Push added in v0.0.5

func (this_ *Service) Push(msg *Message) (err error)

Push 推送消息到kafka

func (*Service) ResetOffset added in v0.0.5

func (this_ *Service) ResetOffset(groupId string, topic string, partition int32, offset int64) (err error)

func (*Service) Stop added in v0.0.5

func (this_ *Service) Stop()

type TopicInfo

type TopicInfo struct {
	Topic string `json:"topic"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL