Documentation ¶
Index ¶
- Constants
- Variables
- func BuildTopic(bizId int64, category int32, event string) string
- func DefaultConfig() *sarama.Config
- func NewSASLConfig(app, user, password string) *sarama.Config
- func SetLogger(l ILogger)
- func WithAddr(addrs ...string) func(o *Options)
- func WithApp(app string) func(o *Options)
- func WithPwd(pwd string) func(o *Options)
- func WithUser(user string) func(o *Options)
- type ConsumeHandler
- type Consumer
- func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error
- func (c *Consumer) Close() error
- func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (c *Consumer) Go() error
- func (c *Consumer) ID() string
- func (c *Consumer) Setup(sarama.ConsumerGroupSession) error
- type ConsumerGroup
- type IConsumer
- type IConsumerGroup
- type ILogger
- type KafkaManager
- func (m *KafkaManager) ConsumerClose()
- func (m *KafkaManager) ConsumersGo()
- func (m *KafkaManager) GetASyncProducer() sarama.AsyncProducer
- func (m *KafkaManager) GetConsumer(groupid, id string) (IConsumer, error)
- func (m *KafkaManager) GetSyncProducer() sarama.SyncProducer
- func (m *KafkaManager) NewConsumer(id, groupid string, handler ConsumeHandler, topics ...string) error
- func (m *KafkaManager) Publish(ctx context.Context, topic, key string, msg []byte) error
- type Message
- type Options
- type OptionsFunc
- type ReceiptStatus
Constants ¶
const (
	ERR_CONSUMER_HASRUN  = "consumer has run"
	ERR_CONSUMER_NOTRUN  = "consumer no run"
	ERR_CONSUMER_EXIST   = "consumer exist"
	ERR_CONSUMER_NOFOUND = "consumer no found"
	ERR_PRODUCER_NOFOUND = "producer no found"
	ERR_GROUP_NOFOUND    = "group no found"
	ERR_CTX_NIL          = "ctx is nil"
)
Variables ¶
var (
	ErrConsumerHasRun  = errors.New(ERR_CONSUMER_HASRUN)
	ErrConsumerNotRun  = errors.New(ERR_CONSUMER_NOTRUN)
	ErrConsumerExist   = errors.New(ERR_CONSUMER_EXIST)
	ErrConsumerNoFound = errors.New(ERR_CONSUMER_NOFOUND)
	ErrProducerNoFound = errors.New(ERR_PRODUCER_NOFOUND)
	ErrGroupNoFound    = errors.New(ERR_GROUP_NOFOUND)
	ErrCtxNil          = errors.New(ERR_CTX_NIL)
)
Functions ¶
func DefaultConfig ¶ added in v0.3.1
DefaultConfig 返回一个默认的Sarama配置对象
func NewSASLConfig ¶ added in v0.3.1
NewSASLConfig 返回一个使用给定应用标识(app)、用户名和密码创建的SASL配置。
func WithAddr ¶ added in v0.3.25
WithAddr 用于为 Options 添加一个或多个服务地址。 返回一个函数,该函数接受一个 Options 指针,将传入的地址添加到 Options 的 Addrs 列表中。
func WithApp ¶ added in v0.3.25
WithApp 用于设置 Options 的 App 字段。 返回一个函数,该函数接受一个 Options 指针,将传入的字符串赋值给 Options 的 App 字段。
Types ¶
type ConsumeHandler ¶
type ConsumeHandler func(context.Context, *Message) (ReceiptStatus, error)
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is the consumer implementation; it provides the methods required by IConsumer.
func NewConsumer ¶
func NewConsumer(id string, group *ConsumerGroup, handler ConsumeHandler, topics ...string) *Consumer
NewConsumer creates a new Consumer with the given id, consumer group, message handler, and topics.
func (*Consumer) Cleanup ¶
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*Consumer) ConsumeClaim ¶
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the Handler must finish its processing loop and exit.
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
func NewConsumerGroup ¶
func NewConsumerGroup(groupid string, client sarama.Client) (*ConsumerGroup, error)
NewConsumerGroup 创建一个新的ConsumerGroup实例。 参数:
- groupid:消费者组的ID
- client:Sarama客户端实例
返回值:
- *ConsumerGroup:新创建的ConsumerGroup实例
- error:如果创建失败,返回错误信息
func (*ConsumerGroup) Close ¶
func (g *ConsumerGroup) Close()
func (*ConsumerGroup) ConsumerMap ¶
func (g *ConsumerGroup) ConsumerMap() map[string]IConsumer
func (*ConsumerGroup) CreateConsumer ¶
func (g *ConsumerGroup) CreateConsumer(id string, h ConsumeHandler, topics ...string) error
func (*ConsumerGroup) GetConsumer ¶
func (g *ConsumerGroup) GetConsumer(id string) IConsumer
func (*ConsumerGroup) ID ¶
func (g *ConsumerGroup) ID() string
type IConsumer ¶
type IConsumer interface {
	sarama.ConsumerGroupHandler
	ID() string
	Go() error
	Close() error
}
IConsumer is the consumer interface: a sarama.ConsumerGroupHandler that also exposes ID, Go, and Close.
type IConsumerGroup ¶
type ILogger ¶
type ILogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
	Debug(ctx context.Context, msg string, args ...interface{})
	Info(ctx context.Context, msg string, args ...interface{})
	Warn(ctx context.Context, msg string, args ...interface{})
	Error(ctx context.Context, msg string, args ...interface{})
}
type KafkaManager ¶
type KafkaManager struct {
	Options
	// contains filtered or unexported fields
}
KafkaManager is the Kafka message-queue manager.
func InitDefaultManager ¶ added in v0.3.1
func InitDefaultManager(opts ...OptionsFunc) (*KafkaManager, error)
InitDefaultManager 初始化默认的 KafkaManager 实例,并返回其指针。 参数:
- opts: 可变数量的 OptionsFunc 配置函数(例如 WithAddr、WithApp、WithUser、WithPwd),用于设置 Options 中的 Kafka broker 地址列表、应用标识、用户名和密码
返回值:
- *KafkaManager: KafkaManager 实例的指针
- error: 错误信息,如果没有错误发生则为 nil
func NewKafkaManager ¶
func NewKafkaManager(opts ...OptionsFunc) (*KafkaManager, error)
NewKafkaManager 创建一个新的 KafkaManager 实例。 参数 opts 是一组 OptionsFunc 配置函数(例如 WithAddr、WithApp、WithUser、WithPwd),用于设置 Options 中的 Kafka broker 地址列表、应用标识、用户名和密码。 返回 KafkaManager 实例和可能的错误。
func (*KafkaManager) ConsumerClose ¶
func (m *KafkaManager) ConsumerClose()
ConsumerClose closes all consumers in the KafkaManager. It loops through all groups and consumers, and closes each consumer.
func (*KafkaManager) ConsumersGo ¶
func (m *KafkaManager) ConsumersGo()
ConsumersGo starts all consumers in the KafkaManager. It loops through all groups and consumers, and starts each consumer's goroutine.
func (*KafkaManager) GetASyncProducer ¶ added in v0.3.2
func (m *KafkaManager) GetASyncProducer() sarama.AsyncProducer
GetASyncProducer returns an asynchronous Kafka producer.
func (*KafkaManager) GetConsumer ¶
func (m *KafkaManager) GetConsumer(groupid, id string) (IConsumer, error)
GetConsumer returns the consumer with the specified ID from the specified group ID. It returns an error if the group ID or consumer ID is not found.
func (*KafkaManager) GetSyncProducer ¶ added in v0.3.2
func (m *KafkaManager) GetSyncProducer() sarama.SyncProducer
GetSyncProducer returns a synchronous Kafka producer.
func (*KafkaManager) NewConsumer ¶
func (m *KafkaManager) NewConsumer(id, groupid string, handler ConsumeHandler, topics ...string) error
NewConsumer creates a new consumer for the specified group ID and handler. It returns an error if the group ID is not found or if there is an error creating the consumer group.
type Message ¶
type Message struct {
	Id         string `json:"id"`         // identify id
	Topic      string `json:"topic"`      // topic
	Partition  int32  `json:"partition"`  // partition
	ConsumerId string `json:"consumerId"` // consumerId
	Offset     int64  `json:"offset"`     // offset
	Headers    string `json:"headers"`    // headers
	Key        []byte `json:"key"`        // key
	Value      []byte `json:"value"`      // value
	Ts         int64  `json:"ts"`         // ts
	BlockTs    int64  `json:"blockts"`    // blockts
}
type Options ¶ added in v0.3.25
type Options struct {
	Addrs []string // 服务地址列表
	App   string   // 应用标识
	User  string   // 用户名
	Pwd   string   // 密码
}
Options 结构体包含了连接信息所需的配置参数。
type OptionsFunc ¶ added in v0.3.25
type OptionsFunc func(o *Options)
OptionsFunc 是对 Options 结构体进行配置的函数类型。
type ReceiptStatus ¶
type ReceiptStatus int32
const (
	ReceiptSuccess   ReceiptStatus = iota // 处理成功
	ReceiptAlreadyDo                      // 已经处理,重复消息
	ReceiptErrUnKnow                      // 未知错误
	ReceiptErrParse                       // 解析失败
	ReceiptRetryMax                       // 重试达到上限
)