kafkaex

package
v0.3.57 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 8, 2024 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
// Message texts for the package's sentinel errors. Each constant is passed to
// errors.New to build the matching Err* variable declared in this package.
const (
	// ERR_CONSUMER_HASRUN is the text of ErrConsumerHasRun.
	ERR_CONSUMER_HASRUN  = "consumer has run"
	// ERR_CONSUMER_NOTRUN is the text of ErrConsumerNotRun.
	ERR_CONSUMER_NOTRUN  = "consumer no run"
	// ERR_CONSUMER_EXIST is the text of ErrConsumerExist.
	ERR_CONSUMER_EXIST   = "consumer exist"
	// ERR_CONSUMER_NOFOUND is the text of ErrConsumerNoFound.
	ERR_CONSUMER_NOFOUND = "consumer no found"
	// ERR_PRODUCER_NOFOUND is the text of ErrProducerNoFound.
	ERR_PRODUCER_NOFOUND = "producer no found"
	// ERR_GROUP_NOFOUND is the text of ErrGroupNoFound.
	ERR_GROUP_NOFOUND    = "group no found"
	// ERR_CTX_NIL is the text of ErrCtxNil.
	ERR_CTX_NIL          = "ctx is nil"
)

Variables

View Source
// Sentinel errors of the package, each created with errors.New from the
// ERR_* message constant of the same name. Because they are package-level
// values, callers can match them with errors.Is.
// NOTE(review): which functions return which error is not visible in this
// listing — confirm against the implementation.
var (
	// ErrConsumerHasRun carries the message "consumer has run".
	ErrConsumerHasRun  = errors.New(ERR_CONSUMER_HASRUN)
	// ErrConsumerNotRun carries the message "consumer no run".
	ErrConsumerNotRun  = errors.New(ERR_CONSUMER_NOTRUN)
	// ErrConsumerExist carries the message "consumer exist".
	ErrConsumerExist   = errors.New(ERR_CONSUMER_EXIST)
	// ErrConsumerNoFound carries the message "consumer no found".
	ErrConsumerNoFound = errors.New(ERR_CONSUMER_NOFOUND)
	// ErrProducerNoFound carries the message "producer no found".
	ErrProducerNoFound = errors.New(ERR_PRODUCER_NOFOUND)
	// ErrGroupNoFound carries the message "group no found".
	ErrGroupNoFound    = errors.New(ERR_GROUP_NOFOUND)
	// ErrCtxNil carries the message "ctx is nil".
	ErrCtxNil          = errors.New(ERR_CTX_NIL)
)

Functions

func BuildTopic added in v0.3.4

func BuildTopic(bizId int64, category int32, event string) string

func DefaultConfig added in v0.3.1

func DefaultConfig() *sarama.Config

DefaultConfig 返回一个默认的Sarama配置对象

func NewSASLConfig added in v0.3.1

func NewSASLConfig(app, user, password string) *sarama.Config

NewSASLConfig 返回一个使用给定用户名和密码创建的SASL配置。

func SetLogger

func SetLogger(l ILogger)

func WithAddr added in v0.3.25

func WithAddr(addrs ...string) func(o *Options)

WithAddr 用于为 Options 添加一个或多个服务地址。 返回一个函数,该函数接受一个 Options 指针,将传入的地址添加到 Options 的 Addrs 列表中。

func WithApp added in v0.3.25

func WithApp(app string) func(o *Options)

WithApp 用于设置 Options 的 App 字段。 返回一个函数,该函数接受一个 Options 指针,将传入的字符串赋值给 Options 的 App 字段。

func WithPwd added in v0.3.25

func WithPwd(pwd string) func(o *Options)

WithPwd 用于设置 Options 的 Pwd 字段。 返回一个函数,该函数接受一个 Options 指针,将传入的字符串赋值给 Options 的 Pwd 字段。

func WithUser added in v0.3.25

func WithUser(user string) func(o *Options)

WithUser 用于设置 Options 的 User 字段。 返回一个函数,该函数接受一个 Options 指针,将传入的字符串赋值给 Options 的 User 字段。

Types

type ConsumeHandler

// ConsumeHandler is the callback type for handling one consumed message. It
// receives a context and the *Message, and returns a ReceiptStatus plus an
// error. It is the handler type accepted by NewConsumer and CreateConsumer.
// NOTE(review): how the returned ReceiptStatus and error are acted upon is not
// visible in this listing — confirm against the consumer implementation.
type ConsumeHandler func(context.Context, *Message) (ReceiptStatus, error)

type Consumer

// Consumer is the package's consumer type, constructed with NewConsumer.
// All of its fields are unexported and are not shown in this listing.
type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is the consumer implementation.

func NewConsumer

func NewConsumer(id string, group *ConsumerGroup, handler ConsumeHandler, topics ...string) *Consumer

NewConsumer creates a new Consumer.

func (*Consumer) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*Consumer) Close

func (c *Consumer) Close() error

Close stops the consume goroutine.

func (*Consumer) ConsumeClaim

func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the Handler must finish its processing loop and exit.

func (*Consumer) Go

func (c *Consumer) Go() error

Go consumes messages asynchronously.

func (*Consumer) ID

func (c *Consumer) ID() string

func (*Consumer) Setup

Setup is run at the beginning of a new session, before ConsumeClaim

type ConsumerGroup

// ConsumerGroup is the package's consumer-group type, created by
// NewConsumerGroup from a group id and a sarama.Client.
// All of its fields are unexported and are not shown in this listing.
type ConsumerGroup struct {
	// contains filtered or unexported fields
}

func NewConsumerGroup

func NewConsumerGroup(groupid string, client sarama.Client) (*ConsumerGroup, error)

NewConsumerGroup 创建一个新的ConsumerGroup实例。 参数:

  • groupid:消费者组的ID
  • client:Sarama客户端实例

返回值:

  • *ConsumerGroup:新创建的ConsumerGroup实例
  • error:如果创建失败,返回错误信息

func (*ConsumerGroup) Close

func (g *ConsumerGroup) Close()

func (*ConsumerGroup) ConsumerMap

func (g *ConsumerGroup) ConsumerMap() map[string]IConsumer

func (*ConsumerGroup) CreateConsumer

func (g *ConsumerGroup) CreateConsumer(id string, h ConsumeHandler, topics ...string) error

func (*ConsumerGroup) GetConsumer

func (g *ConsumerGroup) GetConsumer(id string) IConsumer

func (*ConsumerGroup) ID

func (g *ConsumerGroup) ID() string

type IConsumer

// IConsumer is the consumer interface. It embeds sarama.ConsumerGroupHandler
// and adds identification and lifecycle methods.
type IConsumer interface {
	// Embedded so that every IConsumer can be used as a sarama consumer-group handler.
	sarama.ConsumerGroupHandler
	// ID returns the consumer's identifier.
	ID() string
	// Go starts consuming; presumably asynchronous, as documented on
	// Consumer.Go — confirm for other implementations.
	Go() error
	// Close stops the consumer; presumably ends the consume goroutine, as
	// documented on Consumer.Close — confirm for other implementations.
	Close() error
}

IConsumer is the consumer interface.

type IConsumerGroup

// IConsumerGroup is the consumer-group interface. Its method set matches the
// ID, CreateConsumer, GetConsumer and ConsumerMap methods declared on
// *ConsumerGroup.
type IConsumerGroup interface {
	// ID returns the group's identifier.
	ID() string
	// CreateConsumer registers a consumer with the given id, handler and topics.
	// NOTE(review): the error conditions are not visible in this listing — confirm.
	CreateConsumer(id string, h ConsumeHandler, topics ...string) error
	// GetConsumer looks up a consumer by id.
	// NOTE(review): the result for an unknown id is not visible here — confirm.
	GetConsumer(id string) IConsumer
	// ConsumerMap returns the group's consumers as a map with string keys
	// (presumably consumer ids — confirm).
	ConsumerMap() map[string]IConsumer
}

type ILogger

// ILogger is the logger interface accepted by SetLogger. It combines
// Print-style methods with context-aware leveled methods.
type ILogger interface {
	// Print, Printf and Println are unleveled, fmt-style logging methods.
	// NOTE(review): presumably present so an ILogger can also be handed to
	// sarama as its standard logger — confirm.
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
	// Debug, Info, Warn and Error log msg at the named level with a context.
	// NOTE(review): whether args are format arguments or key/value pairs is
	// not visible in this listing — confirm against the implementation.
	Debug(ctx context.Context, msg string, args ...interface{})
	Info(ctx context.Context, msg string, args ...interface{})
	Warn(ctx context.Context, msg string, args ...interface{})
	Error(ctx context.Context, msg string, args ...interface{})
}

type KafkaManager

// KafkaManager is the Kafka MQ manager. It embeds Options, so the Addrs, App,
// User and Pwd fields are accessible directly on a KafkaManager value.
// Its remaining fields are unexported and are not shown in this listing.
type KafkaManager struct {
	// Connection settings, promoted onto KafkaManager through embedding.
	Options
	// contains filtered or unexported fields
}

KafkaManager is the Kafka MQ manager.

func GetKafkaManager

func GetKafkaManager() *KafkaManager

GetKafkaManager 返回一个指向 KafkaManager 的指针

func InitDefaultManager added in v0.3.1

func InitDefaultManager(opts ...OptionsFunc) (*KafkaManager, error)

InitDefaultManager 初始化默认的 KafkaManager 实例,并返回其指针。 参数:

  • opts: 配置函数列表(OptionsFunc),用于设置 Options 的 Addrs、App、User、Pwd 等字段

返回值:

  • *KafkaManager: KafkaManager 实例的指针
  • error: 错误信息,如果没有错误发生则为 nil

func NewKafkaManager

func NewKafkaManager(opts ...OptionsFunc) (*KafkaManager, error)

NewKafkaManager 创建一个新的 KafkaManager 实例。 参数 opts 是用于配置 Options(Addrs、App、User、Pwd)的函数列表。 返回 KafkaManager 实例和可能的错误。

func (*KafkaManager) ConsumerClose

func (m *KafkaManager) ConsumerClose()

ConsumerClose closes all consumers in the KafkaManager. It loops through all groups and consumers, and closes each consumer.

func (*KafkaManager) ConsumersGo

func (m *KafkaManager) ConsumersGo()

ConsumersGo starts all consumers in the KafkaManager. It loops through all groups and consumers, and starts each consumer's goroutine.

func (*KafkaManager) GetASyncProducer added in v0.3.2

func (m *KafkaManager) GetASyncProducer() sarama.AsyncProducer

GetASyncProducer returns an asynchronous Kafka producer.

func (*KafkaManager) GetConsumer

func (m *KafkaManager) GetConsumer(groupid, id string) (IConsumer, error)

GetConsumer returns the consumer with the specified ID from the specified group ID. It returns an error if the group ID or consumer ID is not found.

func (*KafkaManager) GetSyncProducer added in v0.3.2

func (m *KafkaManager) GetSyncProducer() sarama.SyncProducer

GetSyncProducer returns a synchronized Kafka producer.

func (*KafkaManager) NewConsumer

func (m *KafkaManager) NewConsumer(id, groupid string, handler ConsumeHandler, topics ...string) error

NewConsumer creates a new consumer for the specified group ID and handler. It returns an error if the group ID is not found or if there is an error creating the consumer group.

func (*KafkaManager) Publish

func (m *KafkaManager) Publish(ctx context.Context, topic, key string, msg []byte) error

Publish is a function that publishes a message to a Kafka topic. It takes a context, topic name, key, and message as input and returns an error.

type Message

// Message is the message value passed to a ConsumeHandler. Every field is
// exported and carries a JSON tag, so a Message can be marshalled with
// encoding/json.
type Message struct {
	Id         string `json:"id"`         // identifying id of the message (how it is generated is not shown here)
	Topic      string `json:"topic"`      // topic name
	Partition  int32  `json:"partition"`  // partition number
	ConsumerId string `json:"consumerId"` // id of the consumer (presumably the one that received the message — confirm)
	Offset     int64  `json:"offset"`     // offset of the message
	Headers    string `json:"headers"`    // headers carried as a single string; encoding not shown here — TODO confirm
	Key        []byte `json:"key"`        // raw message key
	Value      []byte `json:"value"`      // raw message value
	Ts         int64  `json:"ts"`         // timestamp; unit not shown here — TODO confirm
	BlockTs    int64  `json:"blockts"`    // NOTE(review): "block" timestamp; meaning and unit not visible here — confirm
}

type Options added in v0.3.25

// Options holds the configuration parameters needed for connection
// information. It is populated through OptionsFunc values such as those
// returned by WithAddr, WithApp, WithUser and WithPwd.
type Options struct {
	Addrs []string // list of service addresses
	App   string   // application identifier
	User  string   // username
	Pwd   string   // password
}

Options 结构体包含了连接信息所需的配置参数。

type OptionsFunc added in v0.3.25

// OptionsFunc is the function type used to configure an Options value in
// place. NewKafkaManager and InitDefaultManager accept a variadic list of
// them; WithAddr, WithApp, WithUser and WithPwd return functions of this shape.
type OptionsFunc func(o *Options)

OptionsFunc 是对 Options 结构体进行配置的函数类型。

type ReceiptStatus

// ReceiptStatus is the status code a ConsumeHandler returns for a message.
type ReceiptStatus int32
// ReceiptStatus values, numbered from 0 upward via iota.
const (
	ReceiptSuccess   ReceiptStatus = iota // processed successfully
	ReceiptAlreadyDo                      // already processed; duplicate message
	ReceiptErrUnKnow                      // unknown error
	ReceiptErrParse                       // parsing failed
	ReceiptRetryMax                       // retry limit reached
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL