ntc-gnats

command module
v0.0.0-...-8c316eb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 18, 2023 License: Apache-2.0 Imports: 14 Imported by: 0

README

ntc-gnats

ntc-gnats is a module NATS golang client.

Install

go get -u github.com/congnghia0609/ntc-gnats

1. Publish-Subscribe

Publisher
////// Publish
//// Case 1: PubSub.
//// Cach 1.1.
name := "notify"
subj := "msg.test"
for i:=0; i<10; i++ {
    msg := "hello " + strconv.Itoa(i)
    npub.Publish(name, subj, msg)
    log.Printf("Published PubSub[%s] : '%s'\n", subj, msg)
}

//// Cach 1.2.
name := "notify"
subj := "msg.test"
np := npub.GetInstance(name)
for i := 0; i < 10; i++ {
    msg := "hello "+strconv.Itoa(i)
    np.Publish(subj, msg)
    log.Printf("Published PubSub[%s] : '%s'\n", subj, msg)
}
Subscriber
func StartSimpleSubscriber() {
	name := "chat"
	ns := nsub.NewNSubscriber(name)
	processChan := make(chan *nats.Msg)
	ns.Start(processChan)
	// go-routine process message.
	go func() {
		for {
			select {
			case msg := <-processChan:
				// Process message in here.
				log.Printf("SimpleSubscriber[#%s] Received on PubSub [%s]: '%s'", ns.Name, ns.Subject, string(msg.Data))
			}
		}
	}()
	fmt.Printf("SimpleSubscriber[#%s] start...\n", ns.Name)
}

2. Queue Groups

Queue Worker
func StartSimpleWorker() {
	name := "email"
	nw := nworker.NewNWorker(name)
	processChan := make(chan *nats.Msg)
	nw.Start(processChan)
	// go-routine process message.
	go func() {
		for {
			select {
			case msg := <-processChan:
				// Process message in here.
				log.Printf("SimpleNWorker[%s][#%s] Received on QueueWorker[%s]: '%s'", nw.Group, nw.Name, nw.Subject, string(msg.Data))
			}
		}
	}()
	fmt.Printf("SimpleNWorker[#%s] start...\n", nw.Name)
}

3. Request-Reply

Request
////// Request
//// Cach 1.
name := "dbreq"
subj := "reqres"
for i:=0; i<10; i++ {
    payload := "this is request " + strconv.Itoa(i)
    msg, err := nreq.Request(name, subj, payload)
    if err != nil {
        log.Fatalf("%v for request", err)
    }
    log.Printf("NReq Published [%s] : '%s'", subj, payload)
    log.Printf("NReq Received  [%v] : '%s'", msg.Subject, string(msg.Data))
}

//// Cach 2.
name := "dbreq"
subj := "reqres"
nr := nreq.GetInstance(name)
for i := 0; i < 10; i++ {
    payload := "this is request "+strconv.Itoa(i)
    msg, err := nr.Request(subj, payload)
    if err != nil {
        log.Fatalf("%v for request", err)
    }
    log.Printf("NReq[%s] Published [%s] : '%s'", nr.Name, subj, payload)
    log.Printf("NReq[%s] Received  [%v] : '%s'", nr.Name, msg.Subject, string(msg.Data))
}
Reply
func StartSimpleResponse() {
	name := "dbres"
	nrs := nres.NewNResponse(name)
	processChan := make(chan *nats.Msg)
	nrs.Start(processChan)
	// go-routine process message.
	go func() {
		reply := "this is response ==> "
		for {
			select {
			case msg := <-processChan:
				// Process message in here.
				log.Printf("NRes[%s][#%s] Received on QueueNRes[%s]: '%s'", nrs.Group, nrs.Name, nrs.Subject, string(msg.Data))
				datares := reply + string(msg.Data)
				msg.Respond([]byte(datares))
				log.Printf("NRes[%s][#%s] Reply on QueueNRes[%s]: '%s'", nrs.Group, nrs.Name, nrs.Subject, datares)
			}
		}
	}()
	fmt.Printf("SimpleResponse[#%s] start...\n", nrs.Name)
}
File main.go
func InitNConf() {
	_, b, _, _ := runtime.Caller(0)
	wdir := filepath.Dir(b)
	fmt.Println("wdir:", wdir)
	nconf.Init(wdir)
}

/** https://github.com/nats-io/nats.go */
/**
 * cd ~/go-projects/src/ntc-gnats
 * go run main.go
 */
func main() {
	// Init NConf
	InitNConf()

	//// Start Simple Subscriber
	for i := 0; i < 2; i++ {
		StartSimpleSubscriber()
	}

	// Start Simple Worker
	for i := 0; i < 2; i++ {
		StartSimpleWorker()
	}

	// Start Simple Response
	for i := 0; i < 2; i++ {
		StartSimpleResponse()
	}

	////// Publish
	//// Case 1: PubSub.
	////// Cach 1.1.
	//name := "notify"
	//subj := "msg.test"
	//for i:=0; i<10; i++ {
	//	msg := "hello " + strconv.Itoa(i)
	//	npub.Publish(name, subj, msg)
	//	log.Printf("Published PubSub[%s] : '%s'\n", subj, msg)
	//}
	//// Cach 1.2.
	name := "notify"
	subj := "msg.test"
	np := npub.GetInstance(name)
	for i := 0; i < 10; i++ {
		msg := "hello " + strconv.Itoa(i)
		np.Publish(subj, msg)
		log.Printf("Published PubSub[%s] : '%s'\n", subj, msg)
	}

	//// Case 2: Queue Group.
	namew := "notify"
	subjw := "worker.email"
	npw := npub.GetInstance(namew)
	for i := 0; i < 10; i++ {
		msg := "hello " + strconv.Itoa(i)
		npw.Publish(subjw, msg)
		log.Printf("Published QueueWorker[%s] : '%s'\n", subjw, msg)
	}

	////// Request
	////// Cach 1.
	//name := "dbreq"
	//subj := "reqres"
	//for i:=0; i<10; i++ {
	//	payload := "this is request " + strconv.Itoa(i)
	//	msg, err := nreq.Request(name, subj, payload)
	//	if err != nil {
	//		log.Fatalf("%v for request", err)
	//	}
	//	log.Printf("NReq Published [%s] : '%s'", subj, payload)
	//	log.Printf("NReq Received  [%v] : '%s'", msg.Subject, string(msg.Data))
	//}
	//// Cach 2.
	namer := "dbreq"
	subjr := "reqres"
	nr := nreq.GetInstance(namer)
	for i := 0; i < 10; i++ {
		payload := "this is request " + strconv.Itoa(i)
		msg, err := nr.Request(subjr, payload)
		if err != nil {
			log.Fatalf("%v for request", err)
		}
		log.Printf("NReq[%s] Published [%s] : '%s'", nr.Name, subjr, payload)
		log.Printf("NReq[%s] Received  [%v] : '%s'", nr.Name, msg.Subject, string(msg.Data))
	}

	// Hang thread Main.
	s := make(chan os.Signal, 1)
	// We'll accept graceful shutdowns when quit via SIGINT (Ctrl+C) SIGKILL, SIGQUIT or SIGTERM (Ctrl+/) will not be caught.
	signal.Notify(s, os.Interrupt)
	// Block until we receive our signal.
	<-s
	log.Println("################# End Main #################")
}

License

This code is under the Apache License v2.

Documentation

Overview

*

*
* @author nghiatc
* @since Dec 6, 2019

*

*
* @author nghiatc
* @since Dec 6, 2019

*

*
* @author nghiatc
* @since Dec 6, 2019

*

*
* @author nghiatc
* @since Dec 6, 2019

*

*
* @author nghiatc
* @since Dec 6, 2019

Directories

Path Synopsis
*
*
*
*
*
*
*
*
*

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL