mp_nats

package module
v0.0.0-...-8b26049 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 22, 2023 License: Apache-2.0 Imports: 6 Imported by: 0

README

nats

Description

A helper library that makes working with NATS (including JetStream streams and consumers) as easy as possible.

Installation

Run the following command to install the package:

go get github.com/minipkg/nats

Basic Usage

Publisher
import (
    "log"
    "sync"
    "time"
    
    "github.com/labstack/gommon/random"
    "github.com/minipkg/nats"
    "github.com/nats-io/nats.go"
)

const (
    streamName  = "tst"
    subjectName = "test.first"
)

// main is the publisher example: it creates the client with a zero-value
// Config, makes sure the stream streamName (capturing every "test.>" subject)
// exists, and then starts ten tickWriter publishers.
//
// The publishers started by tickWriter never finish on their own, so
// wg.Wait keeps the process alive until it is terminated externally.
func main() {
	n, err := mp_nats.New(&mp_nats.Config{})
	if err != nil {
		log.Fatal(err)
	}

	_, err = n.AddStreamIfNotExists(&nats.StreamConfig{
		Name:     streamName,
		Subjects: []string{"test.>"},
	})
	if err != nil {
		// log.Fatalf exits the process, so no return is needed afterwards.
		log.Fatalf("natsWriter error: %q", err.Error())
	}

	wg := &sync.WaitGroup{}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		tickWriter(wg, n.Js())
	}
	wg.Wait()
}

// tickWriter starts a goroutine that publishes one message per second to
// subjectName through js. Every message carries a random 8-character prefix
// chosen once per goroutine, followed by the tick timestamp.
//
// wg.Done is deferred inside the goroutine, but the ticker channel is never
// closed, so the goroutine (and therefore the WaitGroup) only ends when the
// process exits. Publish failures are logged and the loop keeps going.
func tickWriter(wg *sync.WaitGroup, js nats.JetStreamContext) {
	go func() {
		defer wg.Done()

		prefix := random.String(8)
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()

		for tick := range ticker.C {
			payload := []byte(prefix + ": hello - " + tick.String())
			// Report the failure instead of dropping it; the next tick retries
			// with a fresh message.
			if _, err := js.Publish(subjectName, payload); err != nil {
				log.Printf("publish to %q error: %q", subjectName, err.Error())
			}
		}
	}()
}
Push Consumer
import (
	"context"
	"fmt"
	"log"

	"github.com/minipkg/nats"
	"github.com/nats-io/nats.go"
)

const (
	queueGroupName = "groupExample"
	consumerName   = "consumerExample"
)

// main is the push-consumer example: it makes sure the stream exists,
// registers a durable push consumer filtered on subjectName whose messages
// are delivered to natsMsgHandler, and then waits for ctx to be cancelled.
//
// streamName and subjectName are not declared in this snippet; they are the
// constants shown in the publisher example.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	// cancel must be used, otherwise the program does not compile
	// ("declared and not used"); deferring it also releases the context.
	defer cancel()

	n, err := mp_nats.New(&mp_nats.Config{})
	if err != nil {
		log.Fatal(err)
	}

	_, err = n.AddStreamIfNotExists(&nats.StreamConfig{
		Name:     streamName,
		Subjects: []string{"test.>"},
	})
	if err != nil {
		log.Fatalf("AddStreamIfNotExists error: %q", err.Error())
	}

	_, _, delFunc, err := n.AddPushConsumerIfNotExists(streamName, &nats.ConsumerConfig{
		Name:    consumerName,
		Durable: consumerName,
		//DeliverGroup:  queueGroupName, // if you want queue group
		FilterSubject: subjectName,
	}, natsMsgHandler)
	if err != nil {
		log.Fatalf("AddPushConsumerIfNotExists error: %q", err.Error())
	}
	// Remove the consumer and its subscription when main returns.
	defer func() {
		if err := delFunc(); err != nil {
			log.Fatalf("delConsumerAndSubscription error: %q", err.Error())
		}
	}()

	// Nothing in this example calls cancel before main returns, so this blocks
	// until the process is terminated. Wire cancel to a shutdown signal in a
	// real program so the deferred cleanup above gets a chance to run.
	<-ctx.Done()
}

// natsMsgHandler is the push-consumer callback: it acknowledges msg and
// prints its payload to stdout. A failed acknowledgement is logged rather
// than ignored; the payload is printed either way.
func natsMsgHandler(msg *nats.Msg) {
	if err := msg.Ack(); err != nil {
		log.Printf("msg.Ack error: %q", err.Error())
	}
	fmt.Println(string(msg.Data))
}
Pull Consumer
import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/pkg/errors"

	"github.com/minipkg/nats"
	"github.com/nats-io/nats.go"
)

const (
	consumerName        = "consumerExample"
	defaultRequestBatch = 1000
	defaultMaxWait      = 3 * time.Second
	duration            = 2 * time.Second
)


func main() {
	ctx, cancel := context.WithCancel(context.Background())

	n, err := mp_nats.New(&mp_nats.Config{})
	if err != nil {
		log.Fatal(err)
	}

	_, err = n.AddStreamIfNotExists(&nats.StreamConfig{
		Name:     streamName,
		Subjects: []string{"test.>"},
	})
	if err != nil {
		log.Fatalf("natsWriter error: %q", err.Error())
	}

	_, subs, delFunc, err := n.AddPullConsumerIfNotExists(streamName, &nats.ConsumerConfig{
		Name:    consumerName,
		Durable: consumerName,
		FilterSubject: subjectName,
	})
	if err != nil {
		log.Fatalf("natsWriter error: %q", err.Error())
	}
	defer func() {
		if err = delFunc(); err != nil {
			log.Fatalf("delConsumerAndSubscription error: %q", err.Error())
		}
	}()

    err = listenNatsSubscription(ctx, subs, 0)
	if err != nil {
		log.Fatalf("listenNatsSubscription error: %q", err.Error()))
		return
	}
}

// listenNatsSubscription repeatedly fetches batches of up to requestBatch
// messages from the pull subscription subs, acknowledges each message and
// then passes it to natsMsgHandler.
//
// A requestBatch of 0 is replaced by defaultRequestBatch. Every fetch waits
// at most defaultMaxWait; when it fails with nats.ErrTimeout the loop pauses
// for duration before trying again.
//
// It returns nil once ctx is cancelled, and the first non-timeout fetch error
// or acknowledgement error otherwise.
func listenNatsSubscription(ctx context.Context, subs *nats.Subscription, requestBatch uint) error {
	if requestBatch == 0 {
		requestBatch = defaultRequestBatch
	}

	for {
		// Non-blocking check so a cancelled context stops the loop before the
		// next fetch is issued.
		select {
		case <-ctx.Done():
			return nil
		default:
		}

		bmsgs, err := subs.Fetch(int(requestBatch), nats.MaxWait(defaultMaxWait))
		if err != nil {
			if !errors.Is(err, nats.ErrTimeout) {
				return err
			}

			// The fetch timed out: back off for duration, but wake up early if
			// the context is cancelled in the meantime.
			backoff := time.NewTimer(duration)
			select {
			case <-ctx.Done():
				// Stop the pending timer so it does not linger after return.
				backoff.Stop()
				return nil
			case <-backoff.C:
			}
		}

		for _, msg := range bmsgs {
			// Messages are acknowledged before they are handed to the handler.
			if err = msg.Ack(); err != nil {
				return err
			}
			natsMsgHandler(msg)
		}
	}
}

// natsMsgHandler writes the payload of msg to stdout as a single line.
// It does not acknowledge the message; in this example the caller does that
// before invoking the handler.
func natsMsgHandler(msg *nats.Msg) {
	payload := string(msg.Data)
	fmt.Println(payload)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func PullSubscriptionProcessing

func PullSubscriptionProcessing(ctx context.Context, subs *gonats.Subscription, msgHandler gonats.MsgHandler, duration time.Duration, requestBatch uint) error

Types

type Config

// Config carries the connection settings passed to New.
type Config struct {
	// Servers is presumably the NATS server address list — TODO confirm the
	// expected format (single URL vs. comma-separated URLs).
	Servers string
	// MaxReconnects is presumably the reconnect-attempt limit — TODO confirm
	// how zero and negative values are interpreted.
	MaxReconnects int
	// ReconnectWaitInSecs is typed as time.Duration although the name suggests
	// a number of seconds — NOTE(review): confirm which unit New expects.
	ReconnectWaitInSecs time.Duration
	// Login and Password are presumably the credentials used when connecting —
	// TODO confirm whether empty values disable authentication.
	Login    string
	Password string
}

type Nats

// Nats is the client interface returned by New. It exposes the underlying
// connection and JetStream context plus helpers for streams and consumers.
type Nats interface {
	// Conn returns the underlying *gonats.Conn.
	Conn() *gonats.Conn
	// Js returns the gonats.JetStreamContext.
	Js() gonats.JetStreamContext
	// Close presumably closes the underlying connection — TODO confirm.
	Close()

	// IsStreamExists looks a stream up by name and returns a bool together
	// with a *gonats.StreamInfo and an error; presumably the bool reports
	// whether the stream exists — confirm against the implementation.
	IsStreamExists(name string) (bool, *gonats.StreamInfo, error)
	// AddStreamIfNotExists takes a stream configuration and returns a
	// *gonats.StreamInfo; judging by the name it creates the stream only when
	// it is missing — confirm what is returned for an existing stream.
	AddStreamIfNotExists(config *gonats.StreamConfig) (*gonats.StreamInfo, error)
	// AddPushConsumerIfNotExists takes a stream name, a consumer configuration
	// and a message handler. It returns the consumer info, the subscription,
	// and a delConsumerAndSubscription function for cleanup.
	AddPushConsumerIfNotExists(streamName string, config *gonats.ConsumerConfig, msgHandler gonats.MsgHandler) (consumerInfo *gonats.ConsumerInfo, sub *gonats.Subscription, delConsumerAndSubscription func() error, err error)
	// AddPullConsumerIfNotExists takes a stream name and a consumer
	// configuration. It returns the consumer info, the subscription, and a
	// delConsumerAndSubscription function for cleanup.
	AddPullConsumerIfNotExists(streamName string, config *gonats.ConsumerConfig) (consumerInfo *gonats.ConsumerInfo, sub *gonats.Subscription, delConsumerAndSubscription func() error, err error)
}

func New

func New(config *Config) (Nats, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL