windlord

module
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 4, 2020 License: Apache-2.0

README

windlord

Windlord include two package,amzonriver and cetacean.

amazonriver

Amazonriver is rocketmq producer listening pg table changes,base on pg replication slot

Cetacean

Cetacean is apache rocket mq consumer lib for GO.

Usage

amazonriver
  • amazonriver
package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/xiaojiaoyu100/windlord/amazonriver/conf"
	"github.com/xiaojiaoyu100/windlord/amazonriver/river"
)

var (
	sig = make(chan os.Signal)
)

func main() {
	config := &conf.Conf{}
	amazon := river.New(config)
	if err := amazon.Start(); err != nil {
		log.Panicf("start amazon river error: %v", err.Error())
		return
	}
	signal.Notify(sig, syscall.SIGQUIT, syscall.SIGINT, syscall.SIGTERM)
	for {
		select {
		case <-sig:
			log.Print("windlord producer stop!")
			amazon.Stop()
			return
		}
	}
}
alirmq

alirmq 是对rocketMQ 进行的一个封装, 分为producer和consumer,使用示例如下:

  • producer

一般一个应用实例只有一个producer实例来

func main() {
    // 设置链接参数
	cred := &alirmq.RocketMQCredentials{
		NameServer: "",
		AccessKey:  "",
		SecretKey:  "",
		NameSpace:  "",
	}

	producer, err := alirmq.NewProducer(cred)
	if err != nil {
		fmt.Printf("new producer error = %v\n", err)
		return
	}

	if err := producer.Start(); err != nil {
		fmt.Printf("start producer error = %v\n", err)
		return
	}
    
    // 使用同步的方式发布消息
	for i := 0; i < 10; i++ {
		body := []byte(fmt.Sprintf("tag - %v", i))
		r := producer.Send(
			"TagTestTopic",
			body,
			alirmq.WithTag("tag"), // 设置消息的tag,可选
			alirmq.WithKey(fmt.Sprintf("key-%v", i))) // 设置消息的key,全局唯一,推荐设置
		fmt.Printf("MessageId = %v, error = %v\n", r.MessageId(), r.Err)
	}

    // 使用异步发送的形式,需要注册回调函数
	callback := func(ctx context.Context, result *alirmq.SendResult) {
		fmt.Printf("MessageId = %v, error = %v\n", result.MessageId(), result.Err)
	}
	for i := 10; i < 20; i++ {
		body := []byte(fmt.Sprintf("tag - %v", i))
		err := producer.SendAsync(
			"TagTestTopic",
			body,
			callback,
			alirmq.WithTag("tag"),
			alirmq.WithKey(fmt.Sprintf("key-%v", i)))
		fmt.Printf("sendAsync error=%v\n", err)
	}

	quit := make(chan os.Signal)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	log.Println("shutdown server ...")


	if err := producer.Stop(); err != nil {
		fmt.Printf("error = %v", err)
	}
}
  • consumer

同样的,推荐一个启动的应用实例只有一个consumer, 一个consumer可以订阅多个topic,但需要注意的是,如果同一个消费者组(相同groupName),要保持订阅关系一致,即订阅的topic和tag要保持一致。

func main() {
	cred := &alirmq.RocketMQCredentials{
		NameServer: "",
		AccessKey:  "",
		SecretKey:  "",
		NameSpace:  "",
		GroupName: "",
	}

	consumer, err := alirmq.NewConsumer(
		cred,
		alirmq.WithMaxTopicCount(12),)
	if err != nil {
		fmt.Printf("new consumer error = %v", err)
		return
	}

	fn := func(msg *alirmq.M) error {
		fmt.Printf("Topic=%v, tag=%v, key=%v, body=%v", msg.Topic, msg.Tag, msg.Key, string(msg.Body))
		return nil
	}

	// 如果处理函数返回的error不是nil, 会进行重试
	retryFn := func(msg *alirmq.M) error {
		fmt.Printf("Topic=%v, tag=%v, key=%v, body=%v", msg.Topic, msg.Tag, msg.Key, string(msg.Body))
		return fmt.Errorf("error")
	}

	if err = consumer.Subscribe("StudentTopic", "CommonUserStudent", fn); err != nil {
		fmt.Printf("consumer error = %v\n", err)
	}
	
	// 订阅多个tag 可以使用 || 符号进行组合
	if err = consumer.Subscribe("TagTestTopic", "A||B||C", multiTagFn); err != nil {
		fmt.Printf("consumer error = %v\n", err)
	}

	if err = consumer.Subscribe("RetryTopic", "", retryFn); err != nil {
		fmt.Printf("consumer error = %v\n", err)
	}

	err = consumer.Start()
	if err != nil {
		fmt.Printf("start error = %v\n", err)
	}

	quit := make(chan os.Signal)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	log.Println("shutdown server ...")

	if err := consumer.Stop(); err != nil {
		log.Fatalf("server shutdown error = %v\n", err)
	}
}

Directories

Path Synopsis
amazonriver
conf
Source from https://github.com/hellobike/amazonriver * Copyright 2018 Shanghai Junzheng Network Technology Co.,Ltd.
Source from https://github.com/hellobike/amazonriver * Copyright 2018 Shanghai Junzheng Network Technology Co.,Ltd.
log

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL