mq

package
v1.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 18, 2022 License: MIT Imports: 2 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DEFAULT_DATA_KEY = "MQ-DATA-KEY"
)

Variables

View Source
var AdapterMap = make(map[Adapter]Instance)
View Source
var EXCHANGE_NAME_DIRECT = "REACT_MQ_DIRECT_EXCHANGE" //Driect模式默认交换器名称定义

var MAX_POOL_SIZE = 512 //连接池最大连接数 var DEFAULT_POOL_SIZE = int(5) //连接池默认连接数

View Source
var EXCHANGE_NAME_FANOUT = "REACT_MQ_FANOUT_EXCHANGE" //Fanout模式默认交换器名称定义
View Source
var EXCHANGE_NAME_TOPIC = "REACT_MQ_TOPIC_EXCHANGE" //Topic模式默认交换器名称定义
View Source
var UNSET = "<UNSET>"

Functions

func Register

func Register(adapter Adapter, ins Instance)

注册对象创建方法

func SetLogLevel added in v1.0.9

func SetLogLevel(strLevel string)

strLevel -> "debug"/"info"/"warn"/"error" default "info"

func SetLogPath added in v1.0.9

func SetLogPath(strLogPath string)

strLogPath log file path

Types

type Adapter

type Adapter int8
const (
	Adapter_RabbitMQ Adapter = 1
	Adapter_RedisMQ  Adapter = 2
	Adapter_KafkaMQ  Adapter = 3
	Adapter_RocketMQ Adapter = 4
	Adapter_MQTT     Adapter = 5
	Adapter_ETCD     Adapter = 6
)

func (Adapter) String

func (a Adapter) String() string

type IReactMQ

type IReactMQ interface {
	/*
	* @brief 	MQ服务器连接接口定义
	* @param 	strUrl 连接服务器URL(格式规范 [amqp|redis|rocket|kafka|mqtt]://user:password@host:port)
	* @return 	err 连接失败返回具体错误信息
	 */
	Connect(mode Mode, strURL string) (err error)

	/*
	* @brief 	MQ服务器重新连接接口定义
	* @param
	* @return 	err 连接失败返回具体错误信息
	* @remark   当Publish返回错误且IsClosed()方法亦返回true时调用此方法重连MQ服务器
	 */
	Reconnect() (err error)

	/*
	* @brief 	主动关闭
	 */
	Close()

	/*
	* @brief 	判定是否MQ服务器断开连接(异常宕机或重启)
	* @param
	* @return 	远程服务器连接断开返回true,否则返回false
	 */
	IsClosed() bool

	/*
	* @brief 	消息发布接口定义(仅支持字符串类型消息)
	* @param 	strBindingKey 	路由Key
	* @param	strQueueName 	队列名称(redis/mqtt协议非必填)
	* @param	key 		消息标识(kafka必填,其他MQ填DEFAULT_DATA_KEY)
	* @param	value 		消息数据
	* @return   err 发布失败返回具体错误信息
	 */
	Publish(strBindingKey, strQueueName, key string, value string) (err error)

	/*
	* @brief 	消息消费接口定义
	* @param 	strBindingKey 	队列绑定Key
	* @param	strQueueName 	队列名称(redis/mqtt协议非必填)
	* @param    consumer        ReactHandler方法实现对象
	* @return   err 成功返回nil,失败返回返回具体错误信息
	* @remark   服务器异常或重启时内部会自动重连服务器
	 */
	Consume(strBindingKey, strQueueName string, consumer ReactHandler) (err error)

	/*
	* @brief 	开启或关闭调式模式
	* @param 	enable 	true开启/false关闭
	 */
	Debug(enable bool)

	/*
	* @brief 	获取当前MQ类型
	* @param 	adapter  MQ类型
	 */
	GetAdapter() (adapter Adapter)
}

func NewMQ added in v1.1.0

func NewMQ(adapter Adapter) (IReactMQ, error)

适配器: 创建MQ对象

type Instance

type Instance func() IReactMQ

type Mode

type Mode int8
const (
	Mode_Direct Mode = 1 //根据routing-key完全匹配消息队列
	Mode_Topic  Mode = 2 //根据topic规则匹配消息队列
	Mode_Fanout Mode = 3 //广播模式
)

func (Mode) String

func (m Mode) String() string

type ReactHandler added in v1.0.9

type ReactHandler interface {
	OnConsume(adapter Adapter, strBindingKey, strQueueName, strKey string, strValue string)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL