windlord
Windlord include two package,amzonriver and cetacean.
amazonriver
Amazonriver is rocketmq producer listening pg table changes,base on pg replication slot
Cetacean
Cetacean is apache rocket mq consumer lib for GO.
Link List
Usage
package main
import (
"log"
"os"
"os/signal"
"syscall"
"github.com/xiaojiaoyu100/windlord/amazonriver/conf"
"github.com/xiaojiaoyu100/windlord/amazonriver/river"
)
var (
sig = make(chan os.Signal)
)
func main() {
config := &conf.Conf{}
amazon := river.New(config)
if err := amazon.Start(); err != nil {
log.Panicf("start amazon river error: %v", err.Error())
return
}
signal.Notify(sig, syscall.SIGQUIT, syscall.SIGINT, syscall.SIGTERM)
for {
select {
case <-sig:
log.Print("windlord producer stop!")
amazon.Stop()
return
}
}
}
package main
import (
"fmt"
"testing"
"time"
"github.com/xiaojiaoyu100/windlord/cetacean"
)
func init() {
rocketmqConf := &cetacean.RocketMQCredentials{
Endpoint: "",
AccessKey: "",
SecretKey: "",
InstanceID: "",
}
cetacean.InitProducerGroup(rocketmqConf)
}
func main() {
msg := &cetacean.MQProducerMessage{
Topic: "BenchmarkTest",
GroupID: "GID_P_Benchmark",
Body: "for test",
}
for i := 0; i < 5; i++ {
err := cetacean.SendSyncMessage(msg)
if err != nil {
fmt.Printf("publish err: %v", err.Error())
return
}
}
}
package main
import (
"log"
"github.com/xiaojiaoyu100/windlord/cetacean"
)
func main() {
consumerGroup := &cetacean.ConsumerGroup{
GroupID: "GID_C-CommonUserStudent",
Topic: "CommonUserStudent",
}
credentials := &cetacean.RocketMQCredentials{
Endpoint: "endpoint",
AccessKey: "accesskey",
SecretKey: "secretkey",
InstanceID: "instanceid",
}
err := cetacean.NewConsumer(
cetacean.WithConsumeGroup(consumerGroup),
cetacean.WithCredentials(credentials),
cetacean.WithHandler(func(msg string) error {
log.Printf("receive msg: %v",msg)
return nil
}),
)
if err != nil {
log.Printf("new consumer error: %v", err.Error())
}
select {}
}