Documentation ¶
Index ¶
- func MessageToProducerMessage(msg *Message) (producerMessage *sarama.ProducerMessage, err error)
- type BrokerInfo
- type Config
- type IService
- type Info
- type Message
- type MessageHeader
- type ProducerMessage
- type Service
- func (this_ *Service) CreatePartitions(topic string, count int32) (err error)
- func (this_ *Service) CreateTopic(topic string, numPartitions int32, replicationFactor int16) (err error)
- func (this_ *Service) DeleteConsumerGroup(groupId string) (err error)
- func (this_ *Service) DeleteRecords(topic string, partitionOffsets map[int32]int64) (err error)
- func (this_ *Service) DeleteTopic(topic string) (err error)
- func (this_ *Service) GetServers() []string
- func (this_ *Service) GetTopics() (res []*TopicInfo, err error)
- func (this_ *Service) Info() (res *Info, err error)
- func (this_ *Service) MarkOffset(groupId string, topic string, partition int32, offset int64) (err error)
- func (this_ *Service) NewSyncProducer() (syncProducer sarama.SyncProducer, err error)
- func (this_ *Service) Pull(groupId string, topics []string, PullSize int, PullTimeout int, ...) (msgList []*Message, err error)
- func (this_ *Service) Push(msg *Message) (err error)
- func (this_ *Service) ResetOffset(groupId string, topic string, partition int32, offset int64) (err error)
- func (this_ *Service) Stop()
- type TopicInfo
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MessageToProducerMessage ¶
func MessageToProducerMessage(msg *Message) (producerMessage *sarama.ProducerMessage, err error)
Types ¶
type BrokerInfo ¶ added in v0.1.0
type Config ¶
type Config struct { Address string `json:"address"` Username string `json:"username,omitempty"` Password string `json:"password,omitempty"` CertPath string `json:"certPath,omitempty"` }
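Config holds the Kafka connection configuration: the broker address, optional username and password, and an optional certificate path.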
type IService ¶ added in v0.0.5
type IService interface { Stop() Info() (res *Info, err error) GetTopics() (res []*TopicInfo, err error) Pull(groupId string, topics []string, PullSize int, PullTimeout int, keyType, valueType string) (msgList []*Message, err error) MarkOffset(groupId string, topic string, partition int32, offset int64) (err error) ResetOffset(groupId string, topic string, partition int32, offset int64) (err error) CreatePartitions(topic string, count int32) (err error) CreateTopic(topic string, numPartitions int32, replicationFactor int16) (err error) DeleteTopic(topic string) (err error) DeleteConsumerGroup(groupId string) (err error) DeleteRecords(topic string, partitionOffsets map[int32]int64) (err error) NewSyncProducer() (syncProducer sarama.SyncProducer, err error) Push(msg *Message) (err error) }
type Info ¶ added in v0.1.0
type Info struct {
Brokers []*BrokerInfo `json:"brokers"`
}
type Message ¶
type Message struct { KeyType string `json:"keyType,omitempty"` Key string `json:"key,omitempty"` ValueType string `json:"valueType,omitempty"` Value string `json:"value,omitempty"` Topic string `json:"topic,omitempty"` Partition *int32 `json:"partition,omitempty"` Offset *int64 `json:"offset,omitempty"` Headers []MessageHeader `json:"headers,omitempty"` Timestamp *time.Time `json:"timestamp,omitempty"` }
type MessageHeader ¶
type ProducerMessage ¶
type ProducerMessage struct {
*sarama.ProducerMessage
}
type Service ¶ added in v0.0.5
type Service struct {
Config
}
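Service is the Kafka service built on the embedded Config; its methods cover the IService interface (topic, partition and consumer-group administration, offset management, and pulling and pushing messages).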
func (*Service) CreatePartitions ¶ added in v0.0.5
func (*Service) CreateTopic ¶ added in v0.0.5
func (*Service) DeleteConsumerGroup ¶ added in v0.0.5
func (*Service) DeleteRecords ¶ added in v0.0.5
func (*Service) DeleteTopic ¶ added in v0.0.5
func (*Service) GetServers ¶ added in v0.0.5
func (*Service) MarkOffset ¶ added in v0.0.5
func (*Service) NewSyncProducer ¶ added in v0.0.5
func (this_ *Service) NewSyncProducer() (syncProducer sarama.SyncProducer, err error)
NewSyncProducer creates a new sarama.SyncProducer, returning an error if the producer cannot be created.
func (*Service) ResetOffset ¶ added in v0.0.5
Click to show internal directories.
Click to hide internal directories.