vnats

v0.12.0
Published: Dec 12, 2024 License: Apache-2.0 Imports: 8 Imported by: 0

README

vnats library

This library acts as a facade in front of NATS and will be used for the internal Middleware.

Usage

To use vnats, you need to:

  1. Establish a connection to NATS
  2. Create a Publisher or Subscriber
  3. Profit!

Publisher

The publisher sends a slice of bytes ([]byte) to a subject. To send a struct or any other type, the caller must marshal the payload before publishing, and the receiver must unmarshal it.

Example
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/fond-of-vertigo/vnats"
)

type Product struct {
	Name        string
	Price       string
	LastUpdated time.Time
}

// Define the NATS server/cluster
var server = []string{"nats://ruser:T0pS3cr3t@localhost:4222"}

func main() {
	// Establish connection to NATS server
	conn, err := vnats.Connect(server, vnats.WithLogger(log.Printf))
	if err != nil {
		log.Fatal(err.Error())
	}
	// Close NATS connection deferred
	defer func(conn *vnats.Connection) {
		if err := conn.Close(); err != nil {
			log.Fatalf("NATS connection could not be closed: %v", err)
		}
	}(conn)

	// Create publisher bound to stream `PRODUCTS`
	pub, err := conn.NewPublisher(vnats.PublisherArgs{StreamName: "PRODUCTS"})
	if err != nil {
		log.Fatalf("Could not create publisher: %v", err)
	}

	p := Product{
		Name:        "Example Product",
		Price:       "12,34",
		LastUpdated: time.Now(),
	}

	// Since vnats needs a slice of bytes, the product is converted via the JSON marshaller
	productToBytes, err := json.Marshal(p)
	if err != nil {
		panic(err)
	}

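	// Publish the message to the subject `PRODUCTS.PRICES` with a context-bound, unique message ID.
	// msgID is used for deduplication. The timestamp is formatted explicitly because the default
	// string form of time.Now() also contains a process-specific monotonic clock reading.
	msgID := fmt.Sprintf("%s-%s", p.Name, p.LastUpdated.Format(time.RFC3339Nano))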
	msg := vnats.NewMsg("PRODUCTS.PRICES", msgID, productToBytes)
	if err := pub.Publish(msg); err != nil {
		log.Fatalf("Could not publish %v: %v", p, err)
	}
}

Subscriber

By default, vnats uses a pull-based subscriber, which scales horizontally. The subscriber is asynchronous and continuously pulls new messages. A message handler is required to process each message; the message payload is passed as a slice of bytes ([]byte).

Important: The MsgHandler MUST finish its task in under 30 seconds. Longer tasks must only be triggered by the handler and then executed asynchronously.
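
One way to stay within the 30-second limit is to let the handler only enqueue the work and return, while a separate worker goroutine does the slow part. The following is a minimal sketch under assumptions: Msg is a local stand-in for vnats.Msg so that the code compiles on its own, and newAsyncHandler, the queue size, and the process callback are hypothetical names that are not part of vnats.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Msg is a local stand-in for vnats.Msg (assumption: only these fields are needed here).
type Msg struct {
	Subject string
	Data    []byte
}

// errQueueFull is returned when the worker cannot keep up with incoming messages.
var errQueueFull = errors.New("work queue is full")

// newAsyncHandler is a hypothetical helper. It returns a handler that only
// enqueues the message, and a wait function that closes the queue and blocks
// until the worker has drained it. The handler must not be called after wait.
func newAsyncHandler(queueSize int, process func(Msg)) (handler func(Msg) error, wait func()) {
	queue := make(chan Msg, queueSize)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for m := range queue {
			process(m) // the long-running work happens here, outside the handler
		}
	}()

	handler = func(m Msg) error {
		select {
		case queue <- m:
			return nil // returns immediately; the task has only been triggered
		default:
			return errQueueFull // signal failure instead of blocking the handler
		}
	}
	wait = func() {
		close(queue)
		wg.Wait()
	}
	return handler, wait
}

func main() {
	handler, wait := newAsyncHandler(8, func(m Msg) {
		fmt.Printf("processing %s: %s\n", m.Subject, m.Data)
	})
	if err := handler(Msg{Subject: "PRODUCTS.PRICES", Data: []byte("payload")}); err != nil {
		fmt.Println("handler error:", err)
	}
	wait()
}
```

Note the trade-off in this sketch: once the handler returns nil, the message counts as handled, so a failure inside the worker is no longer reported back through the handler's return value and has to be dealt with by the application itself.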

Example
package main

import (
	"encoding/json"
	"log"
	"os"
	"os/signal"
	"time"

	"github.com/fond-of-vertigo/vnats"
)

type Product struct {
	Name        string
	Price       string
	LastUpdated time.Time
}

// Define the NATS server/cluster
var server = []string{"nats://ruser:T0pS3cr3t@localhost:4222"}

func main() {
	// Establish connection to NATS server
	conn, err := vnats.Connect(server, vnats.WithLogger(log.Printf))
	if err != nil {
		log.Fatal(err.Error())
	}
	
	// Unsubscribe from all open subscriptions and close the NATS connection when main returns
	defer func(conn *vnats.Connection) {
		if err := conn.Close(); err != nil {
			log.Fatalf("NATS connection could not be closed: %v", err)
		}
	}(conn)

	// Create a pull subscriber bound to the consumer `EXAMPLE_CONSUMER`
	// and the subject `PRODUCTS.PRICES`
	sub, err := conn.NewSubscriber(vnats.SubscriberArgs{
		ConsumerName: "EXAMPLE_CONSUMER",
		Subject:      "PRODUCTS.PRICES",
	})
	if err != nil {
		log.Fatalf("Could not create subscriber: %v", err)
	}

	// Start subscribing with the specified message handler
	if err := sub.Start(msgHandler); err != nil {
		log.Fatal(err)
	}

	// Wait for stop signal (e.g. ctrl-C)
	waitForStopSignal()
}

// msgHandler receives the data as a slice of bytes inside the Msg struct.
func msgHandler(msg vnats.Msg) error {
	var p Product
	if err := json.Unmarshal(msg.Data, &p); err != nil {
		return err
	}
	log.Printf("Received product: %v", p)
	return nil
}

func waitForStopSignal() {
	// Setting up signal capturing
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt)

	// Waiting for SIGINT (pkill -2)
	<-stop
}

Documentation

Variables

var NoLogging = func(_ string, _ ...interface{}) {}

NoLogging is the default LogFunc. It logs nothing.

Types

type Config added in v0.12.0

type Config struct {
	Password string
	Username string
	Hosts    string
	Port     int
}

Config is a struct to hold the configuration of a NATS connection.

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection is the main entry point for the library. It is used to create Publishers and Subscribers, and to close the connection to the NATS server/cluster.

func Connect

func Connect(servers []string, options ...Option) (*Connection, error)

Connect returns a Connection to a NATS server/cluster and enables Publisher and Subscriber creation.

func MustConnectToNATS added in v0.12.0

func MustConnectToNATS(config *Config, logger *slog.Logger) *Connection

MustConnectToNATS connects to a NATS server. This function panics if the connection could not be established.

config: the connection settings (see Config); NATS servers are addressed in the form "nats://<user:password>@<host>:<port>"
logger: an optional slog.Logger instance
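
To illustrate how the Config fields relate to the server URL form mentioned above, here is a hedged sketch. It is not the library's implementation: serverURLs is a hypothetical helper, Config is redeclared locally so the code is self-contained, and the idea that Hosts may hold several comma-separated host names is an assumption that this page does not confirm.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// Config mirrors the fields of vnats.Config; it is redeclared here so the
// sketch compiles without the library.
type Config struct {
	Password string
	Username string
	Hosts    string
	Port     int
}

// serverURLs is a hypothetical helper. It assumes Hosts contains one or more
// comma-separated host names and builds one "nats://" URL per host.
func serverURLs(c Config) []string {
	var urls []string
	for _, host := range strings.Split(c.Hosts, ",") {
		host = strings.TrimSpace(host)
		if host == "" {
			continue // skip empty entries such as a trailing comma
		}
		u := url.URL{
			Scheme: "nats",
			User:   url.UserPassword(c.Username, c.Password),
			Host:   fmt.Sprintf("%s:%d", host, c.Port),
		}
		urls = append(urls, u.String())
	}
	return urls
}

func main() {
	cfg := Config{Username: "ruser", Password: "T0pS3cr3t", Hosts: "localhost", Port: 4222}
	for _, u := range serverURLs(cfg) {
		fmt.Println(u)
	}
}
```

A slice built this way has the same shape as the servers argument of Connect in the README examples.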

func (*Connection) Close added in v0.10.0

func (c *Connection) Close() error

Close closes the NATS Connection and drains all subscriptions.

func (*Connection) NewPublisher

func (c *Connection) NewPublisher(args PublisherArgs) (*Publisher, error)

NewPublisher creates a new Publisher that publishes to a NATS stream.

func (*Connection) NewSubscriber

func (c *Connection) NewSubscriber(args SubscriberArgs) (*Subscriber, error)

NewSubscriber creates a new Subscriber that subscribes to a NATS stream.

type Header

type Header map[string][]string

A Header represents the key-value pairs of a message header.

type LogFunc added in v0.10.0

type LogFunc func(format string, a ...interface{})

LogFunc is a generic logging function to incorporate the logging of the library into the application. It can be set via the Option of a Connection using WithLogger(l LogFunc).

type Msg added in v0.10.0

type Msg struct {
	// Subject represents the destination subject name, like "PRODUCTS.new"
	Subject string

	// Reply represents an optional subject name where a reply message should be sent to.
	// This value is just distributed, whether the response is sent to the specified subject depends on the Subscriber.
	Reply string

	// MsgID represents a unique value for the message, like a hash value of Data.
	// Semantically equal messages must lead to the same MsgID at any time.
	// E.g. two messages with the same Data must have the same MsgID.
	//
	// The MsgID is used for deduplication.
	MsgID string

	// Data represents the raw byte data to send. The data is sent as-is.
	Data []byte

	// Header represents the optional Header for the message.
	Header Header
}

Msg contains the arguments for publishing a new message. By using a struct we are open to adding new arguments in the future and the caller can omit arguments where the default value is OK.
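
The MsgID comment above suggests something like a hash value of Data, so that semantically equal messages always get the same ID. A minimal sketch of that idea follows; msgIDFor is a hypothetical helper, and including the subject in the hash is an assumption made here, not something the library prescribes.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// msgIDFor is a hypothetical helper that derives a deterministic message ID
// from the subject and the payload using SHA-256.
func msgIDFor(subject string, data []byte) string {
	h := sha256.New()
	h.Write([]byte(subject))
	h.Write([]byte{0}) // separator, so ("a", "bc") and ("ab", "c") hash differently
	h.Write(data)
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	data := []byte(`{"Name":"Example Product","Price":"12,34"}`)
	fmt.Println(msgIDFor("PRODUCTS.PRICES", data))
}
```

The resulting string could then be passed as the id argument of NewMsg. The ID is only stable if the payload bytes are stable, so fields such as a freshly generated timestamp in the marshalled data will change it.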

func NewMsg added in v0.10.0

func NewMsg(subject, id string, data []byte) *Msg

NewMsg constructs a new Msg with the given data.

type MsgHandler

type MsgHandler func(msg Msg) error

MsgHandler is the type of function the Subscriber has to implement to process an incoming message.

type Option added in v0.10.0

type Option func(*Connection)

Option is an optional configuration argument for the Connect() function.

func WithLogger added in v0.10.0

func WithLogger(log LogFunc) Option

WithLogger sets the logger using the generic LogFunc function. This option can be passed to the Connect function. Without this option, the default LogFunc is a no-op function.
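
Option and WithLogger follow the functional options pattern. The sketch below shows the general mechanism with stand-in types; connection, withLogger, and newConnection are hypothetical names, and the real Connection has unexported fields that are not documented here.

```go
package main

import "log"

// LogFunc has the same signature as vnats.LogFunc.
type LogFunc func(format string, a ...interface{})

// connection is a stand-in for the library's Connection type.
type connection struct {
	log LogFunc
}

// Option configures a connection, mirroring the shape of vnats.Option.
type Option func(*connection)

// withLogger returns an Option that replaces the default logger.
func withLogger(l LogFunc) Option {
	return func(c *connection) { c.log = l }
}

// newConnection starts with a no-op logger and then applies each option in order.
func newConnection(options ...Option) *connection {
	c := &connection{log: func(string, ...interface{}) {}}
	for _, opt := range options {
		opt(c)
	}
	return c
}

func main() {
	quiet := newConnection()
	quiet.log("this message is dropped by the no-op default")

	verbose := newConnection(withLogger(log.Printf))
	verbose.log("logging through %s", "log.Printf")
}
```

The benefit of this design is that new options can be added later without changing the signature of the constructor, and callers that are happy with the defaults pass nothing.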

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher is a NATS publisher that publishes to a NATS stream.

func (*Publisher) Publish

func (p *Publisher) Publish(msg *Msg) error

Publish publishes the message (data) to the given subject.

type PublisherArgs added in v0.10.0

type PublisherArgs struct {
	// StreamName is the name of the stream like "PRODUCTS" or "ORDERS".
	// If it does not exist, the stream will be created.
	StreamName string
}

PublisherArgs contains the arguments for creating a new Publisher. By using a struct we are open to adding new arguments in the future and the caller can omit arguments where the default value is OK.

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber subscribes to a NATS consumer and pulls messages to be handled by a MsgHandler.

func (*Subscriber) Start added in v0.10.0

func (s *Subscriber) Start(handler MsgHandler) (err error)

Start subscribes to the NATS consumer and starts a goroutine that handles pulled messages.

func (*Subscriber) Stop added in v0.10.0

func (s *Subscriber) Stop() error

Stop unsubscribes the consumer from the NATS stream.

type SubscriberArgs added in v0.10.0

type SubscriberArgs struct {
	// ConsumerName contains the name of the consumer. By default, this should be the
	// name of the service.
	ConsumerName string

	// Subject defines which subjects of the stream should be subscribed to.
	// Examples:
	//  "ORDERS.new" -> subscribe to subject "new" of stream "ORDERS".
	//  "ORDERS.>"   -> subscribe to all subjects at any level of stream "ORDERS".
	//  "ORDERS.*"   -> subscribe to all direct subjects of stream "ORDERS", like "ORDERS.new", "ORDERS.processed",
	//                  but not "ORDERS.new.error".
	Subject string

	// Mode defines the constraints of the subscription. Default is MultipleSubscribersAllowed.
	// See SubscriptionMode for details.
	Mode SubscriptionMode
}

SubscriberArgs contains the arguments for creating a new Subscriber. By using a struct we are open to adding new arguments in the future and the caller can omit arguments where the default value is OK.
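
The wildcard examples in the Subject comment follow the usual NATS subject rules: "*" matches exactly one token and ">" matches one or more trailing tokens. The following sketch makes those rules concrete. matchSubject is a hypothetical helper written for illustration; the actual matching is done by the NATS server, not by this code.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject reports whether subject matches pattern, where "*" matches
// exactly one token and ">" (only valid as the last token) matches one or
// more remaining tokens. Tokens are separated by ".".
func matchSubject(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the final pattern token and needs at least one subject token left.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false // pattern is longer than the subject
		}
		if tok != "*" && tok != s[i] {
			return false // literal token mismatch
		}
	}
	return len(p) == len(s) // without ">", the token counts must be equal
}

func main() {
	for _, pattern := range []string{"ORDERS.new", "ORDERS.*", "ORDERS.>"} {
		for _, subject := range []string{"ORDERS.new", "ORDERS.processed", "ORDERS.new.error"} {
			fmt.Printf("%-10s matches %-16s: %v\n", pattern, subject, matchSubject(pattern, subject))
		}
	}
}
```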

type SubscriptionMode

type SubscriptionMode int

SubscriptionMode defines how the consumer and its Subscriber are configured. The mode must be chosen according to the use case: if messages must be processed in strict order, use SingleSubscriberStrictMessageOrder; if message order is not important but horizontal scaling is, use MultipleSubscribersAllowed.

const (
	// MultipleSubscribersAllowed mode (default) enables multiple Subscribers of one consumer for horizontal scaling.
	// The message order cannot be guaranteed when messages get NAKed, i.e. when the MsgHandler returns an error for a message.
	MultipleSubscribersAllowed SubscriptionMode = iota

	// SingleSubscriberStrictMessageOrder mode enables strict order of messages. If a message gets NAKed, i.e. the
	// MsgHandler returns an error for it, the Subscriber of the consumer retries the failed message until it is resolved.
	// This blocks the entire consumer, so horizontal scaling is not effectively possible.
	SingleSubscriberStrictMessageOrder
)
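
The difference between the two modes can be illustrated with a small in-memory model. This is a deliberately simplified sketch and not how vnats or NATS work internally: deliver and failOnce are hypothetical helpers, there is no network or real redelivery, and a handler that never succeeds would make deliver loop forever.

```go
package main

import (
	"errors"
	"fmt"
)

// SubscriptionMode and its constants are redeclared to keep the sketch self-contained.
type SubscriptionMode int

const (
	MultipleSubscribersAllowed SubscriptionMode = iota
	SingleSubscriberStrictMessageOrder
)

// deliver models message delivery and returns the order in which messages
// were handled successfully.
func deliver(mode SubscriptionMode, msgs []string, handle func(string) error) []string {
	pending := append([]string(nil), msgs...) // copy, so the input is not modified
	var processed []string
	for len(pending) > 0 {
		m := pending[0]
		pending = pending[1:]
		if err := handle(m); err != nil {
			if mode == SingleSubscriberStrictMessageOrder {
				// Retry the failed message first; everything else waits behind it.
				pending = append([]string{m}, pending...)
			} else {
				// The failed message comes back later; other messages overtake it.
				pending = append(pending, m)
			}
			continue
		}
		processed = append(processed, m)
	}
	return processed
}

// failOnce returns a handler that fails the first time it sees target.
func failOnce(target string) func(string) error {
	failed := false
	return func(m string) error {
		if m == target && !failed {
			failed = true
			return errors.New("temporary failure")
		}
		return nil
	}
}

func main() {
	msgs := []string{"a", "b", "c"}
	fmt.Println("strict:  ", deliver(SingleSubscriberStrictMessageOrder, msgs, failOnce("b")))
	fmt.Println("multiple:", deliver(MultipleSubscribersAllowed, msgs, failOnce("b")))
}
```

In this model the strict mode keeps the original order at the cost of blocking on the failing message, while the default mode lets later messages pass the failed one.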
