rabbitmq

package
v0.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 17, 2021 License: MIT Imports: 9 Imported by: 0

README

异步消息

安装

go get github.com/hanguangbaihuo/sparrow_cloud_go/

发送异步消息

注意
必须配置环境变量:
SC_TASK_PROXY
SC_TASK_PROXY_API
发送消息示例
import (
    "github.com/hanguangbaihuo/sparrow_cloud_go/rabbitmq"
)

func main() {
    // 1.发送异步消息
    // message code是test
    // 位置参数为"hola",关键字参数为hi="hello world"
    res, err := rabbitmq.SendTask("test", []interface{}{"hola"}, map[string]interface{}{"hi": "hello world"})
    if err != nil {
        // handle error
    }
    id, ok := res.(string)
    if !ok {
        // 
    }

    // 2. 发送异步延时消息
    // message code是test
    // 位置参数为空,关键字参数为info="this is a delay 测试 message"
    // 延时时间是3600s
    res, err = rabbitmq.SendTask("test", []interface{}{}, map[string]interface{}{"info": "this is a delay 测试 message"}, 3600)
    if err != nil {
       // handle error
    }
    num, ok := res.(float64)
    if !ok {
        // 
    }

}
发送消息函数参数说明
SendTask(msgCode string, args []interface{}, kwargs map[string]interface{}, delayTime ...int) (interface{}, error) 

参数:
msgCode: message_code,消息码
args: 位置参数,发送的异步消息数据
kwargs: 关键字参数,发送的异步消息数据
delayTime: 可选参数,延迟时间,当发送延时消息时,需要传递该参数

返回:
第一个返回数据类型是接口类型,如果需要返回的task_id,需要先进行断言。
非延时消息返回类型是string
延时消息返回类型是float64

消费异步消息

注意
必须配置的环境变量
SC_CONSUMER_RETRY_TIMES 
SC_CONSUMER_INTERVAL_TIME
SC_BROKER_SERVICE_HOST
SC_BROKER_SERVICE_PORT
SC_BROKER_USERNAME
SC_BROKER_PASSWORD
SC_BROKER_VIRTUAL_HOST
SC_BACKEND_SERVICE_SVC
SC_BACKEND_SERVICE_API
消费异步消息示例
import (
    rq "github.com/hanguangbaihuo/sparrow_cloud_go/rabbitmq"
)

// 1. 消费者函数
func testConsumerFunc(args []interface{}, kwargs map[string]interface{}) error {
    // args是接收到的位置参数,kwargs是接收到的关键字参数
    log.Printf("args %v\n", args)
    log.Printf("kwargs %v\n", kwargs)
    // return nil
    return errors.New("test failure situation")
}

// 2. messageCode对应的消费者函数
var funcMap = map[string]rq.Func{
    "test7": testConsumerFunc,
}

func main() {
    // 3. 初始化消费者, 第一个参数是消费者队列名queue, 第二个参数是第2步中的对应关系变量
    w := rq.New("test9", funcMap)
    // 4. 开始消费
    w.Run()
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func SendTask

func SendTask(msgCode string, args []interface{}, kwargs map[string]interface{}, delayTime ...int) (interface{}, error)

Types

type Func added in v0.9.0

type Func func(args []interface{}, kwargs map[string]interface{}) error

type InputData

type InputData struct {
	Code         string                 `json:"code"`
	Args         []interface{}          `json:"args"`
	Kwargs       map[string]interface{} `json:"kwargs"`
	DeliveryMode string                 `json:"delivery_mode"`
	DelayTime    int                    `json:"delay_time"`
	ParentOpt    ParentOptions          `json:"parent_options"`
}

type MessageBoby added in v0.9.0

type MessageBoby struct {
	Name   string                 `json:"name"`
	Args   []interface{}          `json:"args"`
	Kwargs map[string]interface{} `json:"kwargs"`
}

type ParentOptions

type ParentOptions struct {
	ID   string `json:"id"`
	Code string `json:"code"`
}

type Result added in v0.9.0

type Result struct {
	Status    string
	TraceBack string
}

type Worker added in v0.9.0

type Worker struct {
	BrokerHost        string
	BrokerPort        string
	BrokerUserName    string
	BrokerPassword    string
	BrokerVirtualHost string
	BackendSvc        string
	BackendApi        string
	RetryTimes        int64
	IntervalTime      int64
	QueueName         string
	FuncMap           map[string]Func
}

func New added in v0.9.0

func New(queueName string, funcMap map[string]Func) *Worker

func (*Worker) Run added in v0.9.0

func (w *Worker) Run()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL