Documentation
¶
Index ¶
- Variables
- func WithAutoOffsetReset(strategy string) optparams.Option[Options]
- func WithComsumerSetting(key string, value any) optparams.Option[Options]
- func WithGroupID(groupID string) optparams.Option[Options]
- func WithIsolationLevel(strategy string) optparams.Option[Options]
- func WithParallelCallback() optparams.Option[Options]
- func WithUUID4GroupID(namespace ...string) optparams.Option[Options]
- type Callback
- type ConsumerProxy
- func (proxy *ConsumerProxy) Init(endpoints string, opts ...optparams.Option[Options]) error
- func (proxy *ConsumerProxy) IsOk() bool
- func (proxy *ConsumerProxy) OnError(cb OnErrorCallback) error
- func (proxy *ConsumerProxy) OnMessage(cb OnMsgCallback) error
- func (proxy *ConsumerProxy) Regist(cb ...Callback) error
- func (proxy *ConsumerProxy) SetConnect(cli *kafka.Consumer) error
- func (proxy *ConsumerProxy) Watch() func()
- type OnErrorCallback
- type OnMsgCallback
- type Options
Constants ¶
This section is empty.
Variables ¶
var Default = New()
Default 默认的kafka Consumer代理对象
var DefaultOptions = Options{ ConfigMap: kafka.ConfigMap{}, }
var ErrProxyAllreadySettedCallback = errors.New("cannot reset callback")
ErrProxyAllreadySettedCallback 代理已经设置过回调函数
var ErrProxyAllreadySettedClient = errors.New("cannot reset consumer")
ErrProxyAllreadySettedClient 代理已经设置过kafka消费者客户端
var Logger *log.Log
Functions ¶
func WithAutoOffsetReset ¶
WithAutoOffsetReset 设置监听时自动重置offset的策略 @params strategy string 自动重置offset的策略,可选的有earliest,latest,err3种
func WithComsumerSetting ¶
WithComsumerSetting 设置监听时的其他设置,具体设置可以看<https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md> @params key string 设置项 @params value any 设置项的值
func WithGroupID ¶
WithGroupID 设置监听时使用的groupid @params groupID string 指定的groupid
func WithIsolationLevel ¶
WithIsolationLevel 设置监听时读取消息的策略 @params strategy string 读取消息的策略,可选的有read_uncommitted, read_committed2种
func WithParallelCallback ¶
WithParallelCallback 设置callback并行执行
Types ¶
type ConsumerProxy ¶
ConsumerProxy redis客户端的代理
func (*ConsumerProxy) Init ¶
Init 从配置条件初始化代理对象 @params endpoints string 设置etcd连接的地址端点,以`,`分隔 @params opts ...optparams.Option[Options]
func (*ConsumerProxy) OnError ¶
func (proxy *ConsumerProxy) OnError(cb OnErrorCallback) error
OnMessage 注册消息处理函数 @params cb OnMsgCallback 消息处理的回调
func (*ConsumerProxy) OnMessage ¶
func (proxy *ConsumerProxy) OnMessage(cb OnMsgCallback) error
OnMessage 注册消息处理函数 @params cb OnMsgCallback 消息处理的回调
func (*ConsumerProxy) Regist ¶
func (proxy *ConsumerProxy) Regist(cb ...Callback) error
Regist 注册回调函数,在init执行后执行回调函数 如果对象已经设置了被代理客户端则无法再注册回调函数 @params cb ...Callback 回调函数
func (*ConsumerProxy) SetConnect ¶
func (proxy *ConsumerProxy) SetConnect(cli *kafka.Consumer) error
SetConnect 设置连接的客户端 @params cli UniversalClient 满足redis.UniversalClient接口的对象的指针