windlord
Windlord include two package,amzonriver and cetacean.
amazonriver
Amazonriver is rocketmq producer listening pg table changes,base on pg replication slot
Cetacean
Cetacean is apache rocket mq consumer lib for GO.
Link List
Usage
amazonriver
package main
import (
"log"
"os"
"os/signal"
"syscall"
"github.com/xiaojiaoyu100/windlord/amazonriver/conf"
"github.com/xiaojiaoyu100/windlord/amazonriver/river"
)
var (
sig = make(chan os.Signal)
)
func main() {
config := &conf.Conf{}
amazon := river.New(config)
if err := amazon.Start(); err != nil {
log.Panicf("start amazon river error: %v", err.Error())
return
}
signal.Notify(sig, syscall.SIGQUIT, syscall.SIGINT, syscall.SIGTERM)
for {
select {
case <-sig:
log.Print("windlord producer stop!")
amazon.Stop()
return
}
}
}
alirmq
alirmq 是对rocketMQ 进行的一个封装, 分为producer和consumer,使用示例如下:
一般一个应用实例只有一个producer实例来
func main() {
// 设置链接参数
cred := &alirmq.RocketMQCredentials{
NameServer: "",
AccessKey: "",
SecretKey: "",
NameSpace: "",
}
producer, err := alirmq.NewProducer(cred)
if err != nil {
fmt.Printf("new producer error = %v\n", err)
return
}
if err := producer.Start(); err != nil {
fmt.Printf("start producer error = %v\n", err)
return
}
// 使用同步的方式发布消息
for i := 0; i < 10; i++ {
body := []byte(fmt.Sprintf("tag - %v", i))
r := producer.Send(
"TagTestTopic",
body,
alirmq.WithTag("tag"), // 设置消息的tag,可选
alirmq.WithKey(fmt.Sprintf("key-%v", i))) // 设置消息的key,全局唯一,推荐设置
fmt.Printf("MessageId = %v, error = %v\n", r.MessageId(), r.Err)
}
// 使用异步发送的形式,需要注册回调函数
callback := func(ctx context.Context, result *alirmq.SendResult) {
fmt.Printf("MessageId = %v, error = %v\n", result.MessageId(), result.Err)
}
for i := 10; i < 20; i++ {
body := []byte(fmt.Sprintf("tag - %v", i))
err := producer.SendAsync(
"TagTestTopic",
body,
callback,
alirmq.WithTag("tag"),
alirmq.WithKey(fmt.Sprintf("key-%v", i)))
fmt.Printf("sendAsync error=%v\n", err)
}
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("shutdown server ...")
if err := producer.Stop(); err != nil {
fmt.Printf("error = %v", err)
}
}
同样的,推荐一个启动的应用实例只有一个consumer, 一个consumer可以订阅多个topic,但需要注意的是,如果同一个消费者组(相同groupName),要保持订阅关系一致,即订阅的topic和tag要保持一致。
func main() {
cred := &alirmq.RocketMQCredentials{
NameServer: "",
AccessKey: "",
SecretKey: "",
NameSpace: "",
GroupName: "",
}
consumer, err := alirmq.NewConsumer(
cred,
alirmq.WithMaxTopicCount(12),)
if err != nil {
fmt.Printf("new consumer error = %v", err)
return
}
fn := func(msg *alirmq.M) error {
fmt.Printf("Topic=%v, tag=%v, key=%v, body=%v", msg.Topic, msg.Tag, msg.Key, string(msg.Body))
return nil
}
// 如果处理函数返回的error不是nil, 会进行重试
retryFn := func(msg *alirmq.M) error {
fmt.Printf("Topic=%v, tag=%v, key=%v, body=%v", msg.Topic, msg.Tag, msg.Key, string(msg.Body))
return fmt.Errorf("error")
}
if err = consumer.Subscribe("StudentTopic", "CommonUserStudent", fn); err != nil {
fmt.Printf("consumer error = %v\n", err)
}
// 订阅多个tag 可以使用 || 符号进行组合
if err = consumer.Subscribe("TagTestTopic", "A||B||C", multiTagFn); err != nil {
fmt.Printf("consumer error = %v\n", err)
}
if err = consumer.Subscribe("RetryTopic", "", retryFn); err != nil {
fmt.Printf("consumer error = %v\n", err)
}
err = consumer.Start()
if err != nil {
fmt.Printf("start error = %v\n", err)
}
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("shutdown server ...")
if err := consumer.Stop(); err != nil {
log.Fatalf("server shutdown error = %v\n", err)
}
}