kafka

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 3, 2021 License: MIT Imports: 8 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DestroyQueueName

func DestroyQueueName(name string) (string, string)

DestroyQueueName 使用当前客户端分解一个ESB消息的队列名称,{name}名称满足格式:sys_esb_{systemId}_{node} 其中{systemId}为目标系统的四位数数字ID,{node}为目标系统监听的ESB节点标示(参考{@link ESBNode}。

@param name ESB消息的队列名称 @return "sys_esb","{systemId}_{node}"

func New

func New(opts ...Option) kafka

func NewProvider

func NewProvider(node esb.Node, logger log.Logger, client *kafka) *kafkaProvider

Types

type Option

type Option func(*options)

Option 是ESB选项.

func WithBackends

func WithBackends(backends []string) Option

func WithLogger

func WithLogger(logger log.Logger) Option

func WithSystemId

func WithSystemId(systemId string) Option

type Reader

type Reader interface {
	// OnRead 读取到数据后的回调处理方法,由业务系统实现处理。
	//
	// @param topic     数据所属的队列
	// @param partition 数据所在的分区编号,从0开始
	// @param offset    数据所在的offset
	// @param key       数据的key,发送时指定的,可能为Null
	// @param value     数据的内容
	OnRead(topic string, partition int32, offset int64, key, value []byte) error

	// GetOffset 获取从指定主题(topic)中指定分区(partition)读取数据的起始偏移位置,如果不指定(返回Null)
	// 则表明依据系统记录的最后offset来获取数据,如果返回小于0的值(如-1)则表明从初始位置开始读取。
	// 特别注意,如果返回的值超过了该分区的最大偏移值则会被认为是从初始位置开始读取数据。
	//
	// @param topic 数据所属的队列
	GetOffset(topic string) (offset int64, partition int32)
}

Reader kafka的数据读取接口定义,该接口的实现总会被认为是从最后记录的偏移位置开始读取数据(如果偏移值有保存的话),如果想要从指定偏移位置开始读取数据请实现方法`GetOffset`.

type ReaderRef

type ReaderRef struct {
	// contains filtered or unexported fields
}

ReaderRef 启动消费者获得的消费者引用,可以通过`close`来关闭指定的消费者。

type Sender

type Sender interface {

	// Send 将给定的数据发送到kafka特定的topic上
	// @param topic   队列名称
	// @param content 写入的内容
	// @see #send(String, String, String)
	Send(topic, content string)

	// SendPartition 将给定的数据发送到kafka特定的topic上,如果指定了key则根据key的hash结果来选择topic的分区,如果没有指定key则随机选择一个分区。
	//
	// @param topic   队列名称
	// @param key     内容的key
	// @param content 内容数据
	SendPartition(topic, key, content string)

	// Close 关闭处理,程序终止时的资源释放。
	Close()
}

Sender kafka的数据写入接口,所有数据都采用string形式写入。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL