kafkaex

package
v0.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 7, 2024 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	APHMQIGP_DEF   = "aphmqigp_def"   // APHMQIGP_DEF 是一个默认标识符
	APHMQIGP_INNER = "aphmqigp_inner" // APHMQIGP_INNER 是一个内置,用于处理重试与死信的主消费配置
)

内置Group

View Source
const (
	APHMQITP_DEAD  = "aphmqitp_dead"  // APHMQITP_DEAD 表示消息处理失败,处于死信状态
	APHMQITP_RETRY = "aphmqitp_retry" // APHMQITP_RETRY 表示消息需要重试
)

内置队列

View Source
const (
	APHMQH_PARTITION_KEY = "_aphmqh_partition"  // APHMQH_PARTITION_KEY 用于标识消息所在的分区
	APHMQH_TRACE_ID      = "_aphmqh_traceid"    // APHMQH_TRACE_ID 用于标识消息的跟踪ID
	APHMQH_MSG_ID        = "_aphmqh_msgid"      // APHMQH_MSG_ID 用于唯一标识消息的ID
	APHMQH_MSG_GROUP     = "_aphmqh_msggp"      // APHMQH_MSG_GROUP 用于标识消息所属的组
	APHMQH_MSG_TOPIC     = "_aphmqh_msgtopic"   // APHMQH_MSG_TOPIC 用于标识消息的主题
	APHMQH_RETRIES       = "_aphmqh_retries"    // APHMQH_RETRIES 表示消息的重试次数
	APHMQH_RETRIES_MAX   = "_aphmqh_retriesmax" // APHMQH_RETRIES_MAX 表示消息的最大重试次数
	APHMQH_EXECTYPE      = "_aphmqh_exectype"   // APHMQH_EXECTYPE 用于标识执行消息的方式
	APHMQH_EXECER        = "_aphmqh_execer"     // APHMQH_EXECER 用于标识执行消息的实体
	APHMQH_EXECAT        = "_aphmqh_execat"     // APHMQH_EXECAT 用于标识消息执行的时间
	APHMQH_EXECERR       = "_aphmqh_execerr"    // APHMQH_EXECERR 用于记录消息执行失败的原因
	APHMQH_EXEC_TIMEOUT  = "_aphmqh_timeout"    // APHMQH_EXEC_TIMEOUT 用于标识消息执行的超时时间
)

消息头metadata

Variables

View Source
var (
	ErrNoFoundManager    = errors.New("没有配置管理器")  // 表示没有找到配置的理器
	ErrNoFoundPublisher  = errors.New("没有配置发布者")  // 表示没有找到配置的发布者
	ErrNoFoundSubscriber = errors.New("没有配置订阅者")  // 表示没有找到配置的订阅者
	ErrNoFoundTopic      = errors.New("没有配置主题")   // 表示没有找到配置的主题
	ErrNoFoundGroup      = errors.New("没有配置组名")   // 表示没有找到配置的组名
	ErrNoFoundHandle     = errors.New("没有配置执行函数") // 表示没有找到配置的执行函数
)

定义了一系列的错误类型,用于表示消息发布与订阅过程中可能出现的配置错误。 这些错误类型主要用于标识配置信息缺失的情况,包括: - 没有配置发布者 - 没有配置订阅者 - 没有配置主题 - 没有配置组名 - 没有配置执行函数

Functions

func ErrExec added in v0.0.5

func ErrExec(topic, executer string, box *BoxMessage, err error, publishFunc func(string, *BoxMessage) error) error

ErrExec 处理错误执行逻辑,并根据错误情况发布到不同的主题。

参数: topic - 消息的主题。 executer - 执行者的标识。 box - 包含执行结果的消息盒。 err - 执行过程中遇到的错误。 publishFunc - 用于发布消息的函数,接受主题和消息盒作为参数,返回错误。

返回值: 返回调用publishFunc函数时的错误,如果publishFunc执行失败。

func Field2ZapField

func Field2ZapField(fs watermill.LogFields) []zapcore.Field

Field2ZapField 将 watermill.LogFields 转换为 zapcore.Field 切片,以供日志记录使用。

func NewPublisher

func NewPublisher(topic string, ow func(*sarama.Config) *sarama.Config) (*kafka.Publisher, error)

NewPublisher 创建并返回一个新的Kafka发布者实例。 参数: - topic: 发布者将要发布消息的主题名称。 返回值: - *kafka.Publisher: 创建的Kafka发布者实例。 - error: 如果在创建发布者过程中遇到错误,则返回错误信息;否则返回nil。

func NewSubscriber

func NewSubscriber(group string, ow func(*sarama.Config) *sarama.Config) (*kafka.Subscriber, error)

NewSubscriber 创建并返回一个新的Kafka订阅者实例。

func SetGetKafkaBrokersFunc

func SetGetKafkaBrokersFunc(f func() []string)

SetGetKafkaBrokersFunc 设置获取Kafka Broker列表的函数。

func SetGetKafkaPwdFunc

func SetGetKafkaPwdFunc(f func() string)

SetGetKafkaPwdFunc 设置获取Kafka密码的函数。

func SetGetKafkaUserFunc

func SetGetKafkaUserFunc(f func() string)

SetGetKafkaUserFunc 设置获取Kafka用户名的函数。

func SetName added in v0.0.5

func SetName(n string)

SetName 设置节点名。

func SetRetryDelay

func SetRetryDelay(delay time.Duration)

SetRetryDelay 设置重试延迟时间。

func SetTimeout

func SetTimeout(timeout time.Duration)

SetTimeout 设置执行超时时间。

func WithTrace

func WithTrace(ctx context.Context) []zapcore.Field

WithTrace 从上下文中提取跟踪ID、会话ID和业务信息,并作为日志字段返回。

Types

type BoxMessage

type BoxMessage struct {
	Options
	MsgId   string `json:"msgid" form:"msgid"`     // 消息ID
	Execer  string `json:"execer" form:"execer"`   // 执行者
	ExecAt  int64  `json:"execat" form:"execat"`   // 执行时间戳
	ExecErr string `json:"execerr" form:"execerr"` // 执行错误信息
	Value   []byte `json:"val" form:"val"`         // 消息值
}

BoxMessage 定义了一个消息结构体,包括基础的消息信息和执行相关的信息。

func NewBoxMessage

func NewBoxMessage() *BoxMessage

NewBoxMessage 创建并返回一个新的BoxMessage实例,其中Options的HandleTimeout字段被初始化为getExecTimeout()的返回值。

func (*BoxMessage) Blocked added in v0.0.6

func (m *BoxMessage) Blocked() bool

Blocked 消费进入阻塞状态。

func (*BoxMessage) Dead

func (m *BoxMessage) Dead() bool

Dead 判断消息是否进入死信状态。当RetryMax为0或RetryIndex大于等于RetryMax时,返回true。

func (*BoxMessage) ExecResult

func (m *BoxMessage) ExecResult(execer string, err error)

ExecResult 更新消息的执行结果,并根据是否重试或消息是否进入死信状态,发布消息到相应的主题。

func (*BoxMessage) NewRawMessage

func (m *BoxMessage) NewRawMessage() *message.Message

NewRawMessage 根据BoxMessage的内容创建一个新的message.Message实例,并返回该实例。它会使用BoxMessage中的字段设置消息的元数据。

func (*BoxMessage) WithHeadersOption

func (m *BoxMessage) WithHeadersOption(headers map[string]string) *BoxMessage

WithHeadersOption 使用headers中的信息更新BoxMessage实例,并返回修改后的实例。它从headers中读取配置项并应用到BoxMessage上。

func (*BoxMessage) WithOption

func (m *BoxMessage) WithOption(opts ...Option) *BoxMessage

WithOption 为BoxMessage应用一个或多个选项,并返回修改后的BoxMessage实例。

func (*BoxMessage) WithRawMessage

func (m *BoxMessage) WithRawMessage(msg *message.Message) *BoxMessage

WithRawMessage 根据message.Message更新BoxMessage实例,并返回修改后的实例。它会设置MsgId和Value,并应用消息的元数据作为选项。

type Handler

type Handler func(ctx context.Context, box *BoxMessage) error

Handler 是一个处理BoxMessage的函数类型,它接收一个context.Context和一个*BoxMessage作为参数,返回一个error。

type ILogger

type ILogger interface {
	InfoCtx(ctx context.Context, msg string, args ...any)
	ErrorCtx(ctx context.Context, msg string, args ...any)
}

定义 ILogger 接口,提供日志的 Info 和 Error 方法。

type IManager

type IManager interface {
	Publish(topic string, boxmsg *BoxMessage) error
	RawPublish(topic string, boxM *BoxMessage, ow func(*sarama.Config) *sarama.Config) error
	RegisterSubscriber(ctx context.Context, topic string, opts ...Option) error
	RegisterRetry(ctx context.Context, h Handler) error
	RegisterDead(ctx context.Context, h Handler) error
}

IManager 接口定义了消息管理器的基本功能, 包括发布消息、注册订阅者、注册重试处理和注册死信处理。

func GetManager

func GetManager() IManager

GetManager 是用于获取默认消息管理器的函数。 该函数确保仅初始化一次默认的消息管理器和日志记录器。 返回值是初始化后的默认消息管理器实例,实现了IManager接口。

func NewWaterMillManager

func NewWaterMillManager() IManager

NewWaterMillManager 是用于创建一个新的WaterMillManager实例的函数。 返回值是一个初始化的WaterMillManager实例,包含了空的订阅者和发布者映射。

type Option

type Option func(*Options)

Option 类型为函数,用于修改Options实例

func WithExecType added in v0.0.6

func WithExecType(execType int32) Option

WithExecType 0:普通消费,1-阻塞消费

func WithGroup

func WithGroup(group string) Option

WithGroup 设置自定义的消息组

func WithHandle

func WithHandle(handle Handler) Option

WithHandle 设置消息处理函数

func WithHandleTimeout

func WithHandleTimeout(timeout time.Duration) Option

WithHandleTimeout 设置消息处理超时时间

func WithKey

func WithKey(key string) Option

WithKey 设置消息键

func WithOverwrite added in v0.0.5

func WithOverwrite(overwrite func(*sarama.Config) *sarama.Config) Option

WithOverwrite 设置重写

func WithRetryIndex

func WithRetryIndex(retryIndex int64) Option

WithRetryIndex 设置当前重试索引

func WithRetryMax

func WithRetryMax(retryMax int64) Option

WithRetryMax 设置最大重试次数

func WithSharedGroup

func WithSharedGroup() Option

WithSharedGroup 设置Group为共享组

func WithTopic

func WithTopic(topic string) Option

WithTopic 设置消息主题

func WithTraceID

func WithTraceID(traceId string) Option

WithTraceID 设置跟踪ID

type Options

type Options struct {
	Group         string                              `json:"group" form:"group"`           // 消息组别
	Topic         string                              `json:"topic" form:"topic"`           // 消息主题
	Key           string                              `json:"key" form:"key"`               // 消息键
	TraceId       string                              `json:"traceid" form:"traceid"`       // 跟踪ID
	RetryMax      int64                               `json:"retrymax" form:"retrymax"`     // 最大重试次数
	RetryIndex    int64                               `json:"retryindex" form:"retryindex"` // 当前重试索引
	Overwrite     func(*sarama.Config) *sarama.Config `json:"-" form:"-"`                   // 重写config
	ExecType      int32                               `json:"exectype" form:"exectype"`     // 0:普通消费,1-阻塞消费
	Handle        Handler                             `json:"-" form:"-"`                   // 消息处理函数
	HandleTimeout time.Duration                       `json:"timeout" form:"timeout"`       // 处理超时时间
}

Options 定义了消息队列的选项配置

func NewOptions

func NewOptions(opts ...Option) *Options

NewOptions 创建并初始化一个新的Options实例

func (*Options) Fmt

func (o *Options) Fmt() *Options

Fmt 检查并设置Options的默认值

func (*Options) Verify

func (o *Options) Verify() error

Verify 验证Options配置的有效性

type WaterMillLogger

type WaterMillLogger struct {
	// contains filtered or unexported fields
}

WaterMillLogger 实现了 ILogger 接口和 watermill.LoggerAdapter 接口,用于记录日志。

func NewWaterMillLogger

func NewWaterMillLogger() *WaterMillLogger

NewWaterMillLogger 创建并返回一个新的 WaterMillLogger 实例,该实例的日志级别为开发级别,并跳过调用者。

func (*WaterMillLogger) Debug

func (l *WaterMillLogger) Debug(msg string, fields watermill.LogFields)

Debug 记录一个调试日志消息,包括额外的日志字段。

func (*WaterMillLogger) Error

func (l *WaterMillLogger) Error(msg string, err error, fields watermill.LogFields)

Error 记录一个错误日志消息,包括错误信息和额外的日志字段。

func (*WaterMillLogger) ErrorCtx

func (l *WaterMillLogger) ErrorCtx(ctx context.Context, msg string, args ...any)

ErrorCtx 使用上下文信息记录一个错误日志消息,包括格式化的消息和额外的参数。

func (*WaterMillLogger) Info

func (l *WaterMillLogger) Info(msg string, fields watermill.LogFields)

Info 记录一个信息日志消息,包括额外的日志字段。

func (*WaterMillLogger) InfoCtx

func (l *WaterMillLogger) InfoCtx(ctx context.Context, msg string, args ...any)

InfoCtx 使用上下文信息记录一个信息日志消息,包括格式化的消息和额外的参数。

func (*WaterMillLogger) Trace

func (l *WaterMillLogger) Trace(msg string, fields watermill.LogFields)

Trace 记录一个跟踪日志消息,包括额外的日志字段。实则为信息级别日志。

func (*WaterMillLogger) With

With 返回一个包含额外日志字段的日志适配器实例。

type WaterMillManager

type WaterMillManager struct {
	Subs structure.ItemMap[kafka.Subscriber] // 存储订阅者
	Pubs structure.ItemMap[kafka.Publisher]  // 存储发布者
}

WaterMillManager 是具体的消息管理器实现,负责管理订阅者和发布者。

func (*WaterMillManager) Publish

func (m *WaterMillManager) Publish(topic string, boxM *BoxMessage) error

Publish 将消息发布到指定的主题。

func (*WaterMillManager) RawPublish added in v0.0.5

func (m *WaterMillManager) RawPublish(topic string, boxM *BoxMessage, ow func(*sarama.Config) *sarama.Config) error

RawPublish 将消息发布到指定的主题。

func (*WaterMillManager) RegisterDead

func (m *WaterMillManager) RegisterDead(ctx context.Context, h Handler) error

RegisterDead 注册一个死信消息的订阅者。如果提供的处理程序为nil,则使用默认的死信处理程序。 ctx: 上下文,用于控制函数的生命周期。 h: 自定义的消息处理程序,如果为nil,则使用默认处理程序。 返回值: 执行过程中遇到的任何错误。

func (*WaterMillManager) RegisterRetry

func (m *WaterMillManager) RegisterRetry(ctx context.Context, h Handler) error

RegisterRetry 注册一个重试消息的订阅者。如果提供的处理程序为nil,则使用默认的重试发布处理程序。 ctx: 上下文,用于控制函数的生命周期。 h: 自定义的消息处理程序,如果为nil,则使用默认处理程序。 返回值: 执行过程中遇到的任何错误。

func (*WaterMillManager) RegisterSubscriber

func (m *WaterMillManager) RegisterSubscriber(ctx context.Context, topic string, opts ...Option) error

RegisterSubscriber 注册一个自定义主题的订阅者。 ctx: 上下文,用于控制函数的生命周期。 topic: 要订阅的主题。 opts: 一系列选项,用于配置订阅者。 返回值: 执行过程中遇到的任何错误。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL