nats

package module
v0.0.0-...-347df5d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2023 License: MIT Imports: 6 Imported by: 0

README

Openware Nats

openware common nats package.

first we need to initialize nats connection

connectionString = "localhost:4222"
nc, err := nats.InitNats(connectionString)

we need nc for initializing handlers and subscribers

Publisher

we have publisher for publishing event using either nats or Jetstream

to intialize nats and publish an event:

pub, err := nats.NewNatsEventPublisher(nc)
// handle error
pub.Publish("foo.bar", []byte("baz"))

to initialize jetstream and publish an event:

js, err := nats.NewJsEventPublisher(nc)
// handle error
js.Publish("foo.bar", []byte("baz"))

keep in mind that before publishing an event you'll also need to have a stream of the topic. to create a stream:

err := js.CreateNewEventStream("foo", []string{"foo.baz", "foo.bar"})
// handle error
Subscriber

With subscribers we can subscribe to different topics. If we subscribe using queue, subscribers with the same group name will receive an event once.

to intiialize nats and subscribe to a topic. we can pass <-chan os.Signal for subscribing to shutdown event. so at the end it can cleanup

handler := nats.NewNatsHandler(nc, terminationChannel, nats.NewHandlerDefaultConfig())
msgChan := make(chan *nats.Msg)
handler.SubscribeToQueueUsingChannel("foo.baz", "bar", msgChannel)

Documentation

Overview

Package nats is used for handling nats (or nats jetstream) pub/sub

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InitEmbededNats

func InitEmbededNats() (*nats.Conn, error)

InitEmbededNats initialize nats in memory

func InitNats

func InitNats(connectionString string) (*nats.Conn, error)

InitNats initialize nats using connectionSting

func NewBucketManager

func NewBucketManager(nc *nats.Conn) (*bucketManager, error)

func NewHandlerDefaultConfig

func NewHandlerDefaultConfig() *handlerConfig

NewHandlerDefaultConfig initialize default config for event handlers

func NewJsEventPublisher

func NewJsEventPublisher(nc *nats.Conn) (*jsEventPublisher, error)

NewJsEventPublisher initialize new jetstream event publisher

func NewNatsEventPublisher

func NewNatsEventPublisher(nc *nats.Conn) *natsEventPublisher

NewNatsEventPublisher initialize new nats event publisher

Types

type Bucket

type Bucket interface {
	GetValue(key string) ([]byte, error)
	GetKeyCreateDate(key string) (time.Time, error)
	GetAllTheKeys() ([]string, error)
	GetHistoryOfTheKey(key string) ([]KeyVal, error)
	AddPair(key string, val []byte) error
	Delete(key string) error
}

type BucketManager

type BucketManager interface {
	CreateBucketOrFindExisting(name string) (*bucket, error)
	BucketExists(name string) (bool, error)
	DeleteBucket(name string) error
}

type EventHandler

type EventHandler interface {
	SubscribeToQueueUsingChannel(subject string, group string, msgChannel chan *nats.Msg) error
	SubscribeToQueue(subject string, group string, cb nats.MsgHandler) error
	Subscribe(subject string, cb nats.MsgHandler) error
}

EventHandler handles subscribing and queue subscribing on nats and jetstream

type EventPublisher

type EventPublisher interface {
	// contains filtered or unexported methods
}

EventPublisher nats event publisher interface

type JsEventHandler

type JsEventHandler struct {
	// contains filtered or unexported fields
}

JsEventHandler jetstream event handler structure which implements EventHandler interface

func NewJsHandler

func NewJsHandler(nc *nats.Conn, termination <-chan os.Signal, config *handlerConfig) (*JsEventHandler, error)

NewJsHandler initializes new jetstream handler.

func (*JsEventHandler) GetSubscriptions

func (h *JsEventHandler) GetSubscriptions() []*nats.Subscription

GetSubscriptions get list of subscriptions

func (*JsEventHandler) Subscribe

func (j *JsEventHandler) Subscribe(subject string, cb nats.MsgHandler) error

Subscribe subscribe using a callback.

func (*JsEventHandler) SubscribeToQueue

func (j *JsEventHandler) SubscribeToQueue(subject string, group string, cb nats.MsgHandler) error

SubscribeToQueue subscribe to queue using a callback.

func (*JsEventHandler) SubscribeToQueueUsingChannel

func (j *JsEventHandler) SubscribeToQueueUsingChannel(subject string, group string, msgChannel chan *nats.Msg) error

SubscribeToQueueUsingChannel subscribe to queue with channel. you'll receive all the new events into the channel

type JsEventPublisher

type JsEventPublisher interface {
	CreateNewEventStream(string, []string) error
	DeleteEventStream(streamName string) error
	// contains filtered or unexported methods
}

JsEventPublisher jetstream event publisher and event stream manager

type KeyVal

type KeyVal struct {
	Key        string
	Value      []byte
	CreateDate time.Time
}

type NatsEventHandler

type NatsEventHandler struct {
	// contains filtered or unexported fields
}

NatsEventHandler nats event handler structure which implements EventHandler interface

func NewNatsHandler

func NewNatsHandler(nc *nats.Conn, termination <-chan os.Signal, config *handlerConfig) *NatsEventHandler

NewNatsHandler initializes new nats handler.

func (*NatsEventHandler) GetSubscriptions

func (h *NatsEventHandler) GetSubscriptions() []*nats.Subscription

GetSubscriptions get list of subscriptions

func (*NatsEventHandler) Subscribe

func (n *NatsEventHandler) Subscribe(subject string, cb nats.MsgHandler) error

Subscribe subscribe using a callback.

func (*NatsEventHandler) SubscribeToQueue

func (n *NatsEventHandler) SubscribeToQueue(subject string, group string, cb nats.MsgHandler) error

SubscribeToQueue subscribe to queue using a callback.

func (*NatsEventHandler) SubscribeToQueueUsingChannel

func (n *NatsEventHandler) SubscribeToQueueUsingChannel(subject string, group string, msgChannel chan *nats.Msg) error

SubscribeToQueueUsingChannel subscribe to queue with channel. you'll receive all the new events into the channel

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL