Documentation
¶
Index ¶
- Variables
- func GetVersion() sarama.KafkaVersion
- func SetVersion(v sarama.KafkaVersion)
- type Config
- type Consumer
- func (c *Consumer) Close() error
- func (c *Consumer) GetTopic() string
- func (c *Consumer) Handle()
- func (c *Consumer) HandleError(e HandleErrorFunc)
- func (c *Consumer) HandleEvent(eventType string, consumerFunc HandleConsumerFunc)
- func (c *Consumer) HandleMsg(handler HandleConsumerMsgFunc)
- func (c *Consumer) Run()
- func (c *Consumer) Worker()
- type ConsumerConfig
- type Event
- type HandleConsumerFunc
- type HandleConsumerMsgFunc
- type HandleErrorFunc
- type HandleSucceedFunc
- type Kafka
- func (k *Kafka) Close() error
- func (k *Kafka) Consume(eventType string, handler HandleConsumerFunc) error
- func (k *Kafka) ConsumeTopic(topic string, handler HandleConsumerMsgFunc) error
- func (k *Kafka) ConsumeTopicEvent(topic string, eventType string, handler HandleConsumerFunc) error
- func (k *Kafka) Consumer(topic string) *Consumer
- func (k *Kafka) Consumers() map[string]*Consumer
- func (k *Kafka) Producer() *Producer
- func (k *Kafka) Push(topic, eventType string, event interface{})
- func (k *Kafka) Start()
- type Producer
Constants ¶
This section is empty.
Variables ¶
View Source
var (
ErrConsumerNotFound = errors.New("consumer not found")
)
Functions ¶
func GetVersion ¶
func GetVersion() sarama.KafkaVersion
func SetVersion ¶
func SetVersion(v sarama.KafkaVersion)
Types ¶
type Config ¶
type Config struct { // broker的集群地址 Brokers []string `toml:"brokers"` Consumers []*ConsumerConfig `toml:"consumers"` }
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func NewConsumer(cfg *ConsumerConfig) (*Consumer, error)
func NewConsumerWithConfig ¶
func NewConsumerWithConfig(cfg1 *ConsumerConfig, cfg2 *sarama.Config) (*Consumer, error)
func NewConsumerWithInterceptor ¶
func NewConsumerWithInterceptor(cfg1 *ConsumerConfig, interceptor sarama.ConsumerInterceptor) (*Consumer, error)
func (*Consumer) HandleError ¶
func (c *Consumer) HandleError(e HandleErrorFunc)
func (*Consumer) HandleEvent ¶
func (c *Consumer) HandleEvent(eventType string, consumerFunc HandleConsumerFunc)
func (*Consumer) HandleMsg ¶
func (c *Consumer) HandleMsg(handler HandleConsumerMsgFunc)
type ConsumerConfig ¶
type ConsumerConfig struct { // broker的集群地址 Brokers []string `toml:"brokers"` // topic 的名称 Topic string `toml:"topic"` // 消费组名称 Group string `toml:"group"` SASL struct { Enable bool `toml:"enable"` User string `toml:"user"` Password string `toml:"password"` } `toml:"sasl"` // 多少个协程 Workers int `toml:"workers"` // 是否从最老的开始消费 Oldest bool `toml:"oldest"` }
func (*ConsumerConfig) Check ¶
func (c *ConsumerConfig) Check() bool
type HandleConsumerMsgFunc ¶
type HandleConsumerMsgFunc func(message *sarama.ConsumerMessage)
type HandleErrorFunc ¶
type HandleErrorFunc func(error)
type HandleSucceedFunc ¶
type HandleSucceedFunc func(*sarama.ProducerMessage)
type Kafka ¶
type Kafka struct {
// contains filtered or unexported fields
}
func NewPlat ¶
func NewPlat(consumer *ConsumerConfig) (*Kafka, error)
func (*Kafka) Consume ¶
func (k *Kafka) Consume(eventType string, handler HandleConsumerFunc) error
如果在消费组中只有一个consumer 可以默认不传topic
func (*Kafka) ConsumeTopic ¶
func (k *Kafka) ConsumeTopic(topic string, handler HandleConsumerMsgFunc) error
func (*Kafka) ConsumeTopicEvent ¶
func (k *Kafka) ConsumeTopicEvent(topic string, eventType string, handler HandleConsumerFunc) error
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer ¶
func NewProducerWithCfg ¶
func NewProducerWithInterceptor ¶
func NewProducerWithInterceptor(brokers []string, interceptor sarama.ProducerInterceptor) (*Producer, error)
func (*Producer) HandleError ¶
func (p *Producer) HandleError(e HandleErrorFunc)
func (*Producer) HandleSucceed ¶
func (p *Producer) HandleSucceed(s HandleSucceedFunc)
Click to show internal directories.
Click to hide internal directories.