consumerproxy

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 25, 2022 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var Default = New()

Default 默认的kafka Consumer代理对象

View Source
var DefaultOptions = Options{
	ConfigMap: kafka.ConfigMap{},
}
View Source
var ErrProxyAllreadySettedCallback = errors.New("cannot reset callback")

ErrProxyAllreadySettedCallback 代理已经设置过回调函数

View Source
var ErrProxyAllreadySettedClient = errors.New("cannot reset consumer")

ErrProxyAllreadySettedClient 代理已经设置过kafka消费者客户端

View Source
var Logger *log.Log

Functions

func WithAutoOffsetReset

func WithAutoOffsetReset(strategy string) optparams.Option[Options]

WithAutoOffsetReset 设置监听时自动重置offset的策略 @params strategy string 自动重置offset的策略,可选的有earliest,latest,err3种

func WithComsumerSetting

func WithComsumerSetting(key string, value any) optparams.Option[Options]

WithComsumerSetting 设置监听时的其他设置,具体设置可以看<https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md> @params key string 设置项 @params value any 设置项的值

func WithGroupID

func WithGroupID(groupID string) optparams.Option[Options]

WithGroupID 设置监听时使用的groupid @params groupID string 指定的groupid

func WithIsolationLevel

func WithIsolationLevel(strategy string) optparams.Option[Options]

WithIsolationLevel 设置监听时读取消息的策略 @params strategy string 读取消息的策略,可选的有read_uncommitted, read_committed2种

func WithParallelCallback

func WithParallelCallback() optparams.Option[Options]

WithParallelCallback 设置callback并行执行

func WithUUID4GroupID

func WithUUID4GroupID(namespace ...string) optparams.Option[Options]

WithGroupID 设置监听时使用uuid4作为groupid @params namespace ...string 指定的groupid的命名空间,命名空间会按顺序使用`-`连接,然后作为前缀与uuid使用`__`相连

Types

type Callback

type Callback func(cli *kafka.Consumer) error

Callback redis操作的回调函数

type ConsumerProxy

type ConsumerProxy struct {
	*kafka.Consumer
	Opt Options
	// contains filtered or unexported fields
}

ConsumerProxy redis客户端的代理

func New

func New() *ConsumerProxy

New 创建一个新的数据库客户端代理

func (*ConsumerProxy) Init

func (proxy *ConsumerProxy) Init(endpoints string, opts ...optparams.Option[Options]) error

Init 从配置条件初始化代理对象 @params endpoints string 设置etcd连接的地址端点,以`,`分隔 @params opts ...optparams.Option[Options]

func (*ConsumerProxy) IsOk

func (proxy *ConsumerProxy) IsOk() bool

IsOk 检查代理是否已经可用

func (*ConsumerProxy) OnError

func (proxy *ConsumerProxy) OnError(cb OnErrorCallback) error

OnMessage 注册消息处理函数 @params cb OnMsgCallback 消息处理的回调

func (*ConsumerProxy) OnMessage

func (proxy *ConsumerProxy) OnMessage(cb OnMsgCallback) error

OnMessage 注册消息处理函数 @params cb OnMsgCallback 消息处理的回调

func (*ConsumerProxy) Regist

func (proxy *ConsumerProxy) Regist(cb ...Callback) error

Regist 注册回调函数,在init执行后执行回调函数 如果对象已经设置了被代理客户端则无法再注册回调函数 @params cb ...Callback 回调函数

func (*ConsumerProxy) SetConnect

func (proxy *ConsumerProxy) SetConnect(cli *kafka.Consumer) error

SetConnect 设置连接的客户端 @params cli UniversalClient 满足redis.UniversalClient接口的对象的指针

func (*ConsumerProxy) Watch

func (proxy *ConsumerProxy) Watch() func()

Watch 开始监听kafka

type OnErrorCallback

type OnErrorCallback func(err kafka.Error)

type OnMsgCallback

type OnMsgCallback func(evt *kafka.Message)

type Options

type Options struct {
	kafka.ConfigMap
	ParallelCallback bool
}

Options 设置代理对象初始化方法的可选参数

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL