natsv2

package
v1.1.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 5, 2024 License: Apache-2.0 Imports: 9 Imported by: 0

README

nats 消息队列处理支持分布式使用

## 该库下的消费者按照适配管理后台设计:即开即用,适时回收资源
    如果要常驻消费者需要单独再处理

## RequestReplayConsumer
    普通 请求-回应模式 订阅主题
    有消息处理时重置等待时间,无消息处理后到期关闭消费者
    如果多开消费者,会都收到消息

普通模式下的消息发送和处理案例 [同步发送并接收返回]

    nt := nats.NewNats()

    // 订阅主题
    err := nt.RequestReplayConsumer("mugame_serverlink_test.>", func(subject string, body []byte) []byte {
        return []byte("我收到你的消息了:" + string(body))
    })
    if err != nil {
        return nil, err
    }
    
    //发送消息
    subject := fmt.Sprintf("mugame_serverlink_%s.group.group_%d.ProcessSignal", appEnv, regionId)
    data := []byte(fmt.Sprintf("%d|%d|%d|0|0", lineId, signal, data_1))

    ret, err := nt.RequestReplayRequest(subject, data)

    //关闭连接 - 跟消费者一起启动时,不要调用关闭
    nc.Close()

流处理使用案例

func main() {
    nt := nats.NewNats()

    for i := 1; i < 3; i++ {
        go func(i int) {
            err := nt.StreamConsumer("greetStream", "greet.>", "greetStreamConsumer", func(subject string, body []byte) {
                fmt.Println("处理器", i, subject, string(body))
            })
            if err != nil {
                panic(err)
            }
        }(i)
    }

    go func() {
        for i := 0; i < 10; i++ {
            go func(i int) {
                time.Sleep(time.Second * time.Duration(cFunc.RandRangeInt(1, 10)))
                err := nt.StreamPublish("greet.testSend", []byte("消息:"+strconv.Itoa(i)+" "+cFunc.Date("Y-m-d H:i:s", 0)))
                if err != nil {
                    fmt.Println("消息发送失败", err)
                }
            }(i)
        }
    }()

    select {}
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AnswerConsumer

func AnswerConsumer(subject string, fn func(subject string, body []byte) []byte) error

func RequestReplayRequest

func RequestReplayRequest(subject string, body []byte) ([]byte, error)

func StreamPublish

func StreamPublish(subject string, body []byte) error

Types

type Config

type Config struct {
	Host string `yaml:"host"`
	Port string `yaml:"port"`
}

Config nats配置文件结构

var Conf *Config

type Nats

type Nats struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(host, port string) (*Nats, error)

func (*Nats) AnswerConsumer

func (n *Nats) AnswerConsumer(subject string, fn func(subject string, body []byte) []byte) error

AnswerConsumer 普通 请求-回应模式 同步消费者 注意: 多开所有消费者都会收到消息,注意事务处理

func (*Nats) Close

func (n *Nats) Close()

func (*Nats) RequestReplayRequest

func (n *Nats) RequestReplayRequest(subject string, body []byte) ([]byte, error)

RequestReplayRequest 普通 请求-回应模式 发送请求

func (*Nats) StreamPublish

func (n *Nats) StreamPublish(subject string, body []byte) error

StreamPublish 流模式 发送消息

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL