natsMq

package
v0.0.0-...-3208bda Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2020 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultConnSize = 20 // 默认初始的连接数
)
View Source
const NatsTestBindChanSubject = "bind.chan"

演示BindChan主题

View Source
const NatsTestQueueSubject = "queue.subject"

演示Queue的主题

View Source
const NatsTestRequestSubject = "request.subject"

演示Request的主题

View Source
const NatsTestSubject1 = "subject1"

nats示例的订阅主题

View Source
const NatsTestSubject2 = "subject2"

nats示例的订阅主题

View Source
const QueueGroup1 = "qg1"
View Source
const QueueGroup2 = "qg2"

Variables

View Source
var DefaultDialFunc = func(natsServersUrl string, options ...nats.Option) (*nats.Conn, error) {
	ops := []nats.Option{
		nats.MaxReconnects(5),
		nats.ReconnectWait(2 * time.Second),
		nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {

			logger.WarmLog("Nats server disconnected Reason:" + err.Error())
		}),
		nats.ReconnectHandler(func(nc *nats.Conn) {

			logger.WarmLog("Nats server reconnected to " + nc.ConnectedUrl())
		}),
		nats.ClosedHandler(func(nc *nats.Conn) {

			logger.WarmLog("Nats server connection closed. Reason: " + nc.LastError().Error())
		}),
	}

	ops = append(ops, options...)

	return nats.Connect(natsServersUrl, ops...)
}

默认的连接处理函数

Functions

func BindRecvChan

func BindRecvChan(subject string, recvCh chan interface{}) error

接收主题消息,绑定管道

func BindRecvQueueChan

func BindRecvQueueChan(subject, queue string, recvCh chan interface{}) error

基于队列的接收操作,绑定通道。

func BindSendChan

func BindSendChan(subject string, sendCh chan interface{}) error

发送消息到一个主题,绑定管道

func Drain

func Drain(conn nats.Conn) error

Drain将使连接处于排空状态。所有订阅都将立即进入耗尽状态。完成后,发布服务器将被耗尽,并且不能发布任何其他消息。 一旦排空发布服务器,连接将关闭。使用ClosedCB()选项可以知道连接何时已从排出状态移到关闭状态。

func Flush

func Flush(conn nats.Conn) error

Flush 当执行完整个服务并接收到所有内部reply时返回

func FlushTimeout

func FlushTimeout(conn nats.Conn, timeout time.Duration) error

Flush的超时限制的实现

func NatsMqInit

func NatsMqInit()

func Publish

func Publish(subject string, msg interface{}) error

发布消息到一个主题 @param subject string 发布主题 @param msg interface{} 发布的消息

func QueueSubscribe

func QueueSubscribe(subject, queue string, handler nats.Handler) error

基于队列组的主题订阅: 具有相同队列名称的所有订阅都将形成一个队列组。使用队列语义,每个消息将仅传递给每个队列组的一个订阅服务器。 您可以拥有任意数量的队列组。普通订阅服务器将继续按预期工作。

func Request

func Request(subject string, v interface{}, replyPtr interface{}, timeout time.Duration) error

Request()是一个简单方便的API,它提供了一个伪同步的方式,使用了超时timeout配置。它创建了一个收件箱reply(收件箱是一种主题(subject) 类型, 对请求者唯一),订阅主题(subject),然后发布你的请求消息(消息带reply地址),设置为收件箱的主题(subject),然后等待响应,或者超时取消。 @param subject string 请求的主题 @param v interface{} 发送的消息 @param replyPtr interface{} 伪同步收件箱,在超时时间内接收同步响应数据 @param timeout time.Duration 等待响应的超时时间

For example:

1.Requests 发布一个主题等待响应消息 var response string err = c.Request("help", "help me", &response, 10*time.Millisecond)

if err != nil {
    fmt.Printf("Request failed: %v\n", err)
}

2.Replying 订阅一个request主题,并向收件箱reply发布响应信息

c.Subscribe("help", func(subj, reply string, msg string) {
    c.Publish(reply, "I can help!")
})

func RequestWithContext

func RequestWithContext(ctx context.Context, subject string, msg interface{}, respPtr interface{}, timeout time.Duration) error

RequestWithContext将创建一个收件箱,并使用提供的取消上下文和数据v的收件箱回复执行请求。响应将被解码为vPtrResponse。

func Subscribe

func Subscribe(subject string, handler nats.MsgHandler) error

func SubscribeForEncodedMsg

func SubscribeForEncodedMsg(subject string, handler EncodedMsgHandler) error

func SubscribeForRequest

func SubscribeForRequest(subject string, f RequestMsgHandler) error

func Unsubscribe

func Unsubscribe(subjects ...string) error

取消订阅一个或多个主题 param subject/subjects string 已订阅的主题

Types

type DialFunc

type DialFunc func(natsServersUrl string, options ...nats.Option) (*nats.Conn, error)

连接处理函数

type EncodedMsgHandler

type EncodedMsgHandler func(subject string, goDataMsg interface{})

接收已编码消息的订阅,用于订阅发布go类型数据消息的主题 除接收处理函数不同,其他都一样,请自定义接收消息的数据类型,消息用json编码发送

type NatsPool

type NatsPool struct {
	Network string
	Addr    string
	// contains filtered or unexported fields
}

nats连接池值类型

var NatsMqConnPool *NatsPool

func NewDefaultPool

func NewDefaultPool(addr string) (*NatsPool, error)

默认连接池的工厂方法

func NewNatsConnectPool

func NewNatsConnectPool(addr string, connSize int, dialFunc DialFunc) (*NatsPool, error)

创建连接池的工厂方法

func (*NatsPool) Avail

func (p *NatsPool) Avail() int

有效的连接数

func (*NatsPool) Empty

func (p *NatsPool) Empty()

情况连接池

func (*NatsPool) Get

func (p *NatsPool) Get() (*nats.Conn, error)

从连接池获取连接

func (*NatsPool) Put

func (p *NatsPool) Put(conn *nats.Conn)

将连接放回连接池

type RequestMsgHandler

type RequestMsgHandler func(subject, reply string, msg interface{})

针对Request 主题的响应处理函数,RequestMsgHandler函数需向请求收件箱的reply主题发布一个响应消息,可使用PublishRequest处理

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL