Documentation
¶
Index ¶
- Constants
- Variables
- func SetLogger(l ILogger)
- type ConsumeHandler
- type Consumer
- func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error
- func (c *Consumer) Close() error
- func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (c *Consumer) Go() error
- func (c *Consumer) ID() string
- func (c *Consumer) Setup(sarama.ConsumerGroupSession) error
- type ConsumerGroup
- type IConsumer
- type IConsumerGroup
- type ILogger
- type KafkaManager
- func (m *KafkaManager) ConsumerClose()
- func (m *KafkaManager) ConsumersGo()
- func (m *KafkaManager) GetConsumer(groupid, id string) (IConsumer, error)
- func (m *KafkaManager) Init() error
- func (m *KafkaManager) NewConsumer(groupid string, handler ConsumeHandler, topics ...string) error
- func (m *KafkaManager) NewProducer() error
- func (m *KafkaManager) Publish(ctx context.Context, topic, key string, msg []byte) error
- type Message
- type ReceiptStatus
Constants ¶
View Source
const ( ERR_CONSUMER_HASRUN = "consumer has run" ERR_CONSUMER_NOTRUN = "consumer no run" ERR_CONSUMER_EXIST = "consumer exist" ERR_CONSUMER_NOFOUND = "consumer no found" ERR_GROUP_NOFOUND = "group no found" )
Variables ¶
View Source
var ( ErrConsumerHasRun = errors.New(ERR_CONSUMER_HASRUN) ErrConsumerNotRun = errors.New(ERR_CONSUMER_NOTRUN) ErrConsumerExist = errors.New(ERR_CONSUMER_EXIST) ErrConsumerNoFound = errors.New(ERR_CONSUMER_NOFOUND) ErrGroupNoFound = errors.New(ERR_GROUP_NOFOUND) )
Functions ¶
Types ¶
type ConsumeHandler ¶
type ConsumeHandler func(context.Context, *Message) (ReceiptStatus, error)
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is the consumer implementation.
func NewConsumer ¶
func NewConsumer(group *ConsumerGroup, handler ConsumeHandler, topics ...string) *Consumer
NewConsumer creates a new consumer.
func (*Consumer) Cleanup ¶
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*Consumer) ConsumeClaim ¶
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the Handler must finish its processing loop and exit.
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
func NewConsumerGroup ¶
func NewConsumerGroup(groupid string, client sarama.Client) (*ConsumerGroup, error)
func (*ConsumerGroup) Close ¶
func (g *ConsumerGroup) Close()
func (*ConsumerGroup) ConsumerMap ¶
func (g *ConsumerGroup) ConsumerMap() map[string]IConsumer
func (*ConsumerGroup) CreateConsumer ¶
func (g *ConsumerGroup) CreateConsumer(h ConsumeHandler, topics ...string) error
func (*ConsumerGroup) GetConsumer ¶
func (g *ConsumerGroup) GetConsumer(id string) IConsumer
func (*ConsumerGroup) ID ¶
func (g *ConsumerGroup) ID() string
type IConsumer ¶
type IConsumer interface { sarama.ConsumerGroupHandler ID() string Go() error Close() error }
IConsumer is the consumer interface.
type IConsumerGroup ¶
type ILogger ¶
type ILogger interface { Print(v ...interface{}) Printf(format string, v ...interface{}) Println(v ...interface{}) Debug(ctx context.Context, msg string, args ...interface{}) Info(ctx context.Context, msg string, args ...interface{}) Warn(ctx context.Context, msg string, args ...interface{}) Error(ctx context.Context, msg string, args ...interface{}) }
type KafkaManager ¶
type KafkaManager struct {
// contains filtered or unexported fields
}
KafkaManager is the Kafka MQ manager.
func GetKafkaManager ¶
func GetKafkaManager() *KafkaManager
func InitManager ¶
func InitManager(brokers ...string) *KafkaManager
func NewKafkaManager ¶
func NewKafkaManager(brokers ...string) *KafkaManager
NewKafkaManager creates a new Kafka MQ manager.
func (*KafkaManager) ConsumerClose ¶
func (m *KafkaManager) ConsumerClose()
func (*KafkaManager) ConsumersGo ¶
func (m *KafkaManager) ConsumersGo()
func (*KafkaManager) GetConsumer ¶
func (m *KafkaManager) GetConsumer(groupid, id string) (IConsumer, error)
func (*KafkaManager) NewConsumer ¶
func (m *KafkaManager) NewConsumer(groupid string, handler ConsumeHandler, topics ...string) error
func (*KafkaManager) NewProducer ¶
func (m *KafkaManager) NewProducer() error
type Message ¶
type Message struct {
	Id         string `json:"id"`         // identify id
	Topic      string `json:"topic"`      // topic
	Partition  int32  `json:"partition"`  // partition
	ConsumerId string `json:"consumerId"` // consumerId
	Offset     int64  `json:"offset"`     // offset
	Headers    string `json:"headers"`    // headers
	Key        []byte `json:"key"`        // key
	Value      []byte `json:"value"`      // value
	Ts         int64  `json:"ts"`         // ts
	BlockTs    int64  `json:"blockts"`    // blockts
}
type ReceiptStatus ¶
type ReceiptStatus int32
const (
	ReceiptSuccess   ReceiptStatus = iota // processed successfully
	ReceiptAlreadyDo                      // already processed (duplicate message)
	ReceiptErrUnKnow                      // unknown error
	ReceiptErrParse                       // parsing failed
	ReceiptRetryMax                       // retry limit reached
)
Click to show internal directories.
Click to hide internal directories.