Documentation ¶
Index ¶
- Constants
- Variables
- func ErrExec(topic, executer string, box *BoxMessage, err error, ...) error
- func Field2ZapField(fs watermill.LogFields) []zapcore.Field
- func NewPublisher(topic string, ow func(*sarama.Config) *sarama.Config) (*kafka.Publisher, error)
- func NewSubscriber(group string, ow func(*sarama.Config) *sarama.Config) (*kafka.Subscriber, error)
- func SetGetKafkaBrokersFunc(f func() []string)
- func SetGetKafkaPwdFunc(f func() string)
- func SetGetKafkaUserFunc(f func() string)
- func SetName(n string)
- func SetRetryDelay(delay time.Duration)
- func SetTimeout(timeout time.Duration)
- func WithTrace(ctx context.Context) []zapcore.Field
- type BoxMessage
- func (m *BoxMessage) Blocked() bool
- func (m *BoxMessage) Dead() bool
- func (m *BoxMessage) ExecResult(execer string, err error)
- func (m *BoxMessage) NewRawMessage() *message.Message
- func (m *BoxMessage) WithHeadersOption(headers map[string]string) *BoxMessage
- func (m *BoxMessage) WithOption(opts ...Option) *BoxMessage
- func (m *BoxMessage) WithRawMessage(msg *message.Message) *BoxMessage
- type Handler
- type ILogger
- type IManager
- type Option
- func WithExecType(execType int32) Option
- func WithGroup(group string) Option
- func WithHandle(handle Handler) Option
- func WithHandleTimeout(timeout time.Duration) Option
- func WithKey(key string) Option
- func WithOverwrite(overwrite func(*sarama.Config) *sarama.Config) Option
- func WithRetryIndex(retryIndex int64) Option
- func WithRetryMax(retryMax int64) Option
- func WithSharedGroup() Option
- func WithTopic(topic string) Option
- func WithTraceID(traceId string) Option
- type Options
- type WaterMillLogger
- func (l *WaterMillLogger) Debug(msg string, fields watermill.LogFields)
- func (l *WaterMillLogger) Error(msg string, err error, fields watermill.LogFields)
- func (l *WaterMillLogger) ErrorCtx(ctx context.Context, msg string, args ...any)
- func (l *WaterMillLogger) Info(msg string, fields watermill.LogFields)
- func (l *WaterMillLogger) InfoCtx(ctx context.Context, msg string, args ...any)
- func (l *WaterMillLogger) Trace(msg string, fields watermill.LogFields)
- func (l *WaterMillLogger) With(fields watermill.LogFields) watermill.LoggerAdapter
- type WaterMillManager
- func (m *WaterMillManager) Publish(topic string, boxM *BoxMessage) error
- func (m *WaterMillManager) RawPublish(topic string, boxM *BoxMessage, ow func(*sarama.Config) *sarama.Config) error
- func (m *WaterMillManager) RegisterDead(ctx context.Context, h Handler) error
- func (m *WaterMillManager) RegisterRetry(ctx context.Context, h Handler) error
- func (m *WaterMillManager) RegisterSubscriber(ctx context.Context, topic string, opts ...Option) error
Constants ¶
const ( APHMQIGP_DEF = "aphmqigp_def" // APHMQIGP_DEF 是一个默认标识符 APHMQIGP_INNER = "aphmqigp_inner" // APHMQIGP_INNER 是一个内置,用于处理重试与死信的主消费配置 )
内置Group
const ( APHMQITP_DEAD = "aphmqitp_dead" // APHMQITP_DEAD 表示消息处理失败,处于死信状态 APHMQITP_RETRY = "aphmqitp_retry" // APHMQITP_RETRY 表示消息需要重试 )
内置队列
const ( APHMQH_PARTITION_KEY = "_aphmqh_partition" // APHMQH_PARTITION_KEY 用于标识消息所在的分区 APHMQH_TRACE_ID = "_aphmqh_traceid" // APHMQH_TRACE_ID 用于标识消息的跟踪ID APHMQH_MSG_ID = "_aphmqh_msgid" // APHMQH_MSG_ID 用于唯一标识消息的ID APHMQH_MSG_GROUP = "_aphmqh_msggp" // APHMQH_MSG_GROUP 用于标识消息所属的组 APHMQH_MSG_TOPIC = "_aphmqh_msgtopic" // APHMQH_MSG_TOPIC 用于标识消息的主题 APHMQH_RETRIES = "_aphmqh_retries" // APHMQH_RETRIES 表示消息的重试次数 APHMQH_RETRIES_MAX = "_aphmqh_retriesmax" // APHMQH_RETRIES_MAX 表示消息的最大重试次数 APHMQH_EXECTYPE = "_aphmqh_exectype" // APHMQH_EXECTYPE 用于标识执行消息的方式 APHMQH_EXECER = "_aphmqh_execer" // APHMQH_EXECER 用于标识执行消息的实体 APHMQH_EXECAT = "_aphmqh_execat" // APHMQH_EXECAT 用于标识消息执行的时间 APHMQH_EXECERR = "_aphmqh_execerr" // APHMQH_EXECERR 用于记录消息执行失败的原因 APHMQH_EXEC_TIMEOUT = "_aphmqh_timeout" // APHMQH_EXEC_TIMEOUT 用于标识消息执行的超时时间 )
消息头metadata
Variables ¶
var ( ErrNoFoundManager = errors.New("没有配置管理器") // 表示没有找到配置的理器 ErrNoFoundPublisher = errors.New("没有配置发布者") // 表示没有找到配置的发布者 ErrNoFoundSubscriber = errors.New("没有配置订阅者") // 表示没有找到配置的订阅者 ErrNoFoundTopic = errors.New("没有配置主题") // 表示没有找到配置的主题 ErrNoFoundGroup = errors.New("没有配置组名") // 表示没有找到配置的组名 ErrNoFoundHandle = errors.New("没有配置执行函数") // 表示没有找到配置的执行函数 )
定义了一系列的错误类型,用于表示消息发布与订阅过程中可能出现的配置错误。 这些错误类型主要用于标识配置信息缺失的情况,包括: - 没有配置发布者 - 没有配置订阅者 - 没有配置主题 - 没有配置组名 - 没有配置执行函数
Functions ¶
func ErrExec ¶ added in v0.0.5
func ErrExec(topic, executer string, box *BoxMessage, err error, publishFunc func(string, *BoxMessage) error) error
ErrExec 处理错误执行逻辑,并根据错误情况发布到不同的主题。
参数: topic - 消息的主题。 executer - 执行者的标识。 box - 包含执行结果的消息盒。 err - 执行过程中遇到的错误。 publishFunc - 用于发布消息的函数,接受主题和消息盒作为参数,返回错误。
返回值: 返回调用publishFunc函数时的错误,如果publishFunc执行失败。
func Field2ZapField ¶
Field2ZapField 将 watermill.LogFields 转换为 zapcore.Field 切片,以供日志记录使用。
func NewPublisher ¶
NewPublisher 创建并返回一个新的Kafka发布者实例。 参数: - topic: 发布者将要发布消息的主题名称。 返回值: - *kafka.Publisher: 创建的Kafka发布者实例。 - error: 如果在创建发布者过程中遇到错误,则返回错误信息;否则返回nil。
func NewSubscriber ¶
NewSubscriber 创建并返回一个新的Kafka订阅者实例。
func SetGetKafkaBrokersFunc ¶
func SetGetKafkaBrokersFunc(f func() []string)
SetGetKafkaBrokersFunc 设置获取Kafka Broker列表的函数。
func SetGetKafkaPwdFunc ¶
func SetGetKafkaPwdFunc(f func() string)
SetGetKafkaPwdFunc 设置获取Kafka密码的函数。
func SetGetKafkaUserFunc ¶
func SetGetKafkaUserFunc(f func() string)
SetGetKafkaUserFunc 设置获取Kafka用户名的函数。
Types ¶
type BoxMessage ¶
type BoxMessage struct { Options MsgId string `json:"msgid" form:"msgid"` // 消息ID Execer string `json:"execer" form:"execer"` // 执行者 ExecAt int64 `json:"execat" form:"execat"` // 执行时间戳 ExecErr string `json:"execerr" form:"execerr"` // 执行错误信息 Value []byte `json:"val" form:"val"` // 消息值 }
BoxMessage 定义了一个消息结构体,包括基础的消息信息和执行相关的信息。
func NewBoxMessage ¶
func NewBoxMessage() *BoxMessage
NewBoxMessage 创建并返回一个新的BoxMessage实例,其中Options的HandleTimeout字段被初始化为getExecTimeout()的返回值。
func (*BoxMessage) Dead ¶
func (m *BoxMessage) Dead() bool
Dead 判断消息是否进入死信状态。当RetryMax为0或RetryIndex大于等于RetryMax时,返回true。
func (*BoxMessage) ExecResult ¶
func (m *BoxMessage) ExecResult(execer string, err error)
ExecResult 更新消息的执行结果,并根据是否重试或消息是否进入死信状态,发布消息到相应的主题。
func (*BoxMessage) NewRawMessage ¶
func (m *BoxMessage) NewRawMessage() *message.Message
NewRawMessage 根据BoxMessage的内容创建一个新的message.Message实例,并返回该实例。它会使用BoxMessage中的字段设置消息的元数据。
func (*BoxMessage) WithHeadersOption ¶
func (m *BoxMessage) WithHeadersOption(headers map[string]string) *BoxMessage
WithHeadersOption 使用headers中的信息更新BoxMessage实例,并返回修改后的实例。它从headers中读取配置项并应用到BoxMessage上。
func (*BoxMessage) WithOption ¶
func (m *BoxMessage) WithOption(opts ...Option) *BoxMessage
WithOption 为BoxMessage应用一个或多个选项,并返回修改后的BoxMessage实例。
func (*BoxMessage) WithRawMessage ¶
func (m *BoxMessage) WithRawMessage(msg *message.Message) *BoxMessage
WithRawMessage 根据message.Message更新BoxMessage实例,并返回修改后的实例。它会设置MsgId和Value,并应用消息的元数据作为选项。
type Handler ¶
type Handler func(ctx context.Context, box *BoxMessage) error
Handler 是一个处理BoxMessage的函数类型,它接收一个context.Context和一个*BoxMessage作为参数,返回一个error。
type ILogger ¶
type ILogger interface { InfoCtx(ctx context.Context, msg string, args ...any) ErrorCtx(ctx context.Context, msg string, args ...any) }
定义 ILogger 接口,提供日志的 Info 和 Error 方法。
type IManager ¶
type IManager interface { Publish(topic string, boxmsg *BoxMessage) error RawPublish(topic string, boxM *BoxMessage, ow func(*sarama.Config) *sarama.Config) error RegisterSubscriber(ctx context.Context, topic string, opts ...Option) error RegisterRetry(ctx context.Context, h Handler) error RegisterDead(ctx context.Context, h Handler) error }
IManager 接口定义了消息管理器的基本功能, 包括发布消息、注册订阅者、注册重试处理和注册死信处理。
func GetManager ¶
func GetManager() IManager
GetManager 是用于获取默认消息管理器的函数。 该函数确保仅初始化一次默认的消息管理器和日志记录器。 返回值是初始化后的默认消息管理器实例,实现了IManager接口。
func NewWaterMillManager ¶
func NewWaterMillManager() IManager
NewWaterMillManager 是用于创建一个新的WaterMillManager实例的函数。 返回值是一个初始化的WaterMillManager实例,包含了空的订阅者和发布者映射。
type Option ¶
type Option func(*Options)
Option 类型为函数,用于修改Options实例
func WithExecType ¶ added in v0.0.6
WithExecType 0:普通消费,1-阻塞消费
func WithHandleTimeout ¶
WithHandleTimeout 设置消息处理超时时间
func WithOverwrite ¶ added in v0.0.5
WithOverwrite 设置重写
type Options ¶
type Options struct { Group string `json:"group" form:"group"` // 消息组别 Topic string `json:"topic" form:"topic"` // 消息主题 Key string `json:"key" form:"key"` // 消息键 TraceId string `json:"traceid" form:"traceid"` // 跟踪ID RetryMax int64 `json:"retrymax" form:"retrymax"` // 最大重试次数 RetryIndex int64 `json:"retryindex" form:"retryindex"` // 当前重试索引 Overwrite func(*sarama.Config) *sarama.Config `json:"-" form:"-"` // 重写config ExecType int32 `json:"exectype" form:"exectype"` // 0:普通消费,1-阻塞消费 Handle Handler `json:"-" form:"-"` // 消息处理函数 HandleTimeout time.Duration `json:"timeout" form:"timeout"` // 处理超时时间 }
Options 定义了消息队列的选项配置
type WaterMillLogger ¶
type WaterMillLogger struct {
// contains filtered or unexported fields
}
WaterMillLogger 实现了 ILogger 接口和 watermill.LoggerAdapter 接口,用于记录日志。
func NewWaterMillLogger ¶
func NewWaterMillLogger() *WaterMillLogger
NewWaterMillLogger 创建并返回一个新的 WaterMillLogger 实例,该实例的日志级别为开发级别,并跳过调用者。
func (*WaterMillLogger) Debug ¶
func (l *WaterMillLogger) Debug(msg string, fields watermill.LogFields)
Debug 记录一个调试日志消息,包括额外的日志字段。
func (*WaterMillLogger) Error ¶
func (l *WaterMillLogger) Error(msg string, err error, fields watermill.LogFields)
Error 记录一个错误日志消息,包括错误信息和额外的日志字段。
func (*WaterMillLogger) ErrorCtx ¶
func (l *WaterMillLogger) ErrorCtx(ctx context.Context, msg string, args ...any)
ErrorCtx 使用上下文信息记录一个错误日志消息,包括格式化的消息和额外的参数。
func (*WaterMillLogger) Info ¶
func (l *WaterMillLogger) Info(msg string, fields watermill.LogFields)
Info 记录一个信息日志消息,包括额外的日志字段。
func (*WaterMillLogger) InfoCtx ¶
func (l *WaterMillLogger) InfoCtx(ctx context.Context, msg string, args ...any)
InfoCtx 使用上下文信息记录一个信息日志消息,包括格式化的消息和额外的参数。
func (*WaterMillLogger) Trace ¶
func (l *WaterMillLogger) Trace(msg string, fields watermill.LogFields)
Trace 记录一个跟踪日志消息,包括额外的日志字段。实则为信息级别日志。
func (*WaterMillLogger) With ¶
func (l *WaterMillLogger) With(fields watermill.LogFields) watermill.LoggerAdapter
With 返回一个包含额外日志字段的日志适配器实例。
type WaterMillManager ¶
type WaterMillManager struct { Subs structure.ItemMap[kafka.Subscriber] // 存储订阅者 Pubs structure.ItemMap[kafka.Publisher] // 存储发布者 }
WaterMillManager 是具体的消息管理器实现,负责管理订阅者和发布者。
func (*WaterMillManager) Publish ¶
func (m *WaterMillManager) Publish(topic string, boxM *BoxMessage) error
Publish 将消息发布到指定的主题。
func (*WaterMillManager) RawPublish ¶ added in v0.0.5
func (m *WaterMillManager) RawPublish(topic string, boxM *BoxMessage, ow func(*sarama.Config) *sarama.Config) error
RawPublish 将消息发布到指定的主题。
func (*WaterMillManager) RegisterDead ¶
func (m *WaterMillManager) RegisterDead(ctx context.Context, h Handler) error
RegisterDead 注册一个死信消息的订阅者。如果提供的处理程序为nil,则使用默认的死信处理程序。 ctx: 上下文,用于控制函数的生命周期。 h: 自定义的消息处理程序,如果为nil,则使用默认处理程序。 返回值: 执行过程中遇到的任何错误。
func (*WaterMillManager) RegisterRetry ¶
func (m *WaterMillManager) RegisterRetry(ctx context.Context, h Handler) error
RegisterRetry 注册一个重试消息的订阅者。如果提供的处理程序为nil,则使用默认的重试发布处理程序。 ctx: 上下文,用于控制函数的生命周期。 h: 自定义的消息处理程序,如果为nil,则使用默认处理程序。 返回值: 执行过程中遇到的任何错误。
func (*WaterMillManager) RegisterSubscriber ¶
func (m *WaterMillManager) RegisterSubscriber(ctx context.Context, topic string, opts ...Option) error
RegisterSubscriber 注册一个自定义主题的订阅者。 ctx: 上下文,用于控制函数的生命周期。 topic: 要订阅的主题。 opts: 一系列选项,用于配置订阅者。 返回值: 执行过程中遇到的任何错误。