Documentation ¶
Index ¶
- Constants
- Variables
- func BindRecvChan(subject string, recvCh chan interface{}) error
- func BindRecvQueueChan(subject, queue string, recvCh chan interface{}) error
- func BindSendChan(subject string, sendCh chan interface{}) error
- func Drain(conn nats.Conn) error
- func Flush(conn nats.Conn) error
- func FlushTimeout(conn nats.Conn, timeout time.Duration) error
- func NatsMqInit()
- func Publish(subject string, msg interface{}) error
- func QueueSubscribe(subject, queue string, handler nats.Handler) error
- func Request(subject string, v interface{}, replyPtr interface{}, timeout time.Duration) error
- func RequestWithContext(ctx context.Context, subject string, msg interface{}, respPtr interface{}, ...) error
- func Subscribe(subject string, handler nats.MsgHandler) error
- func SubscribeForEncodedMsg(subject string, handler EncodedMsgHandler) error
- func SubscribeForRequest(subject string, f RequestMsgHandler) error
- func Unsubscribe(subjects ...string) error
- type DialFunc
- type EncodedMsgHandler
- type NatsPool
- type RequestMsgHandler
Constants ¶
const (
DefaultConnSize = 20 // 默认初始的连接数
)
const NatsTestBindChanSubject = "bind.chan"
演示BindChan主题
const NatsTestQueueSubject = "queue.subject"
演示Queue的主题
const NatsTestRequestSubject = "request.subject"
演示Request的主题
const NatsTestSubject1 = "subject1"
nats示例的订阅主题
const NatsTestSubject2 = "subject2"
nats示例的订阅主题
const QueueGroup1 = "qg1"
const QueueGroup2 = "qg2"
Variables ¶
var DefaultDialFunc = func(natsServersUrl string, options ...nats.Option) (*nats.Conn, error) { ops := []nats.Option{ nats.MaxReconnects(5), nats.ReconnectWait(2 * time.Second), nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { logger.WarmLog("Nats server disconnected Reason:" + err.Error()) }), nats.ReconnectHandler(func(nc *nats.Conn) { logger.WarmLog("Nats server reconnected to " + nc.ConnectedUrl()) }), nats.ClosedHandler(func(nc *nats.Conn) { logger.WarmLog("Nats server connection closed. Reason: " + nc.LastError().Error()) }), } ops = append(ops, options...) return nats.Connect(natsServersUrl, ops...) }
默认的连接处理函数
Functions ¶
func BindRecvQueueChan ¶
基于队列的接收操作,绑定通道。
func Drain ¶
func Drain(conn nats.Conn) error
Drain将使连接处于排空状态。所有订阅都将立即进入耗尽状态。完成后,发布服务器将被耗尽,并且不能发布任何其他消息。 一旦排空发布服务器,连接将关闭。使用ClosedCB()选项可以知道连接何时已从排出状态移到关闭状态。
func NatsMqInit ¶
func NatsMqInit()
func QueueSubscribe ¶
基于队列组的主题订阅: 具有相同队列名称的所有订阅都将形成一个队列组。使用队列语义,每个消息将仅传递给每个队列组的一个订阅服务器。 您可以拥有任意数量的队列组。普通订阅服务器将继续按预期工作。
func Request ¶
Request()是一个简单方便的API,它提供了一个伪同步的方式,使用了超时timeout配置。它创建了一个收件箱reply(收件箱是一种主题(subject) 类型, 对请求者唯一),订阅主题(subject),然后发布你的请求消息(消息带reply地址),设置为收件箱的主题(subject),然后等待响应,或者超时取消。 @param subject string 请求的主题 @param v interface{} 发送的消息 @param replyPtr interface{} 伪同步收件箱,在超时时间内接收同步响应数据 @param timeout time.Duration 等待响应的超时时间
For example:
1.Requests 发布一个主题等待响应消息 var response string err = c.Request("help", "help me", &response, 10*time.Millisecond)
if err != nil { fmt.Printf("Request failed: %v\n", err) }
2.Replying 订阅一个request主题,并向收件箱reply发布响应信息
c.Subscribe("help", func(subj, reply string, msg string) { c.Publish(reply, "I can help!") })
func RequestWithContext ¶
func RequestWithContext(ctx context.Context, subject string, msg interface{}, respPtr interface{}, timeout time.Duration) error
RequestWithContext将创建一个收件箱,并使用提供的取消上下文和数据v的收件箱回复执行请求。响应将被解码为vPtrResponse。
func SubscribeForEncodedMsg ¶
func SubscribeForEncodedMsg(subject string, handler EncodedMsgHandler) error
func SubscribeForRequest ¶
func SubscribeForRequest(subject string, f RequestMsgHandler) error
func Unsubscribe ¶
取消订阅一个或多个主题 param subject/subjects string 已订阅的主题
Types ¶
type EncodedMsgHandler ¶
type EncodedMsgHandler func(subject string, goDataMsg interface{})
接收已编码消息的订阅,用于订阅发布go类型数据消息的主题 除接收处理函数不同,其他都一样,请自定义接收消息的数据类型,消息用json编码发送
type RequestMsgHandler ¶
type RequestMsgHandler func(subject, reply string, msg interface{})
针对Request 主题的响应处理函数,RequestMsgHandler函数需向请求收件箱的reply主题发布一个响应消息,可使用PublishRequest处理