gongs

package module
v0.0.0-...-06976a7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2023 License: MIT Imports: 2 Imported by: 2

README

GONGS - Go Nats Generic Streams

A thin wrapper around the NATS JetStream client that uses Go generics to provide strongly typed NATS streams.

Example Usage

Define the type that will be published to and retrieved from the NATS stream:

// ExampleMsgEventData is the payload carried by ExampleMsg: an identifier,
// an event type and a free-form description, all plain strings.
type ExampleMsgEventData struct {
	Id, Type, Description string
}

// ExampleMsg wraps an ExampleMsgEventData payload. Its methods implement
// the gongs.MsgEvent interface so it can be used with a GenericStream.
type ExampleMsg struct {
	// eventData is the wrapped payload; it is nil in a zero-value ExampleMsg
	// and is assigned by DecodeEventData.
	eventData *ExampleMsgEventData
}

// Mandatory - Implement the `gongs.MsgEvent` interface

// GetId returns the Id field of the wrapped event data.
// ctx is part of the gongs.MsgEvent signature and is not used here.
func (e *ExampleMsg) GetId(ctx context.Context) string {
	data := e.eventData
	return data.Id
}

// DecodeEventData unmarshals the JSON document b into a new
// ExampleMsgEventData and stores it on the message.
//
// It returns the wrapped json.Unmarshal error when b is not valid JSON for
// the payload type; in that case the message's existing event data is left
// untouched instead of being replaced by a partially populated value.
func (e *ExampleMsg) DecodeEventData(b []byte) error {
	d := &ExampleMsgEventData{}
	if err := json.Unmarshal(b, d); err != nil {
		return fmt.Errorf("decoding example event data: %w", err)
	}
	e.eventData = d
	return nil
}

// EncodeEventData returns the JSON encoding of the message's event data.
// ctx is part of the gongs.MsgEvent signature and is not used here.
func (e *ExampleMsg) EncodeEventData(ctx context.Context) []byte {
	// The marshal error is dropped because the signature has no error result.
	payload, _ := json.Marshal(e.eventData)
	return payload
}

Create a Generic Stream for the above type:

	// create Jetstream for Stream
	// (nc is an already-established NATS connection, e.g. from nats.Connect.)
	cfg := &nats.StreamConfig{
		Name:      "EXAMPLE",
		Subjects:  []string{"example.>"},
		Storage:   nats.MemoryStorage,
		Retention: nats.WorkQueuePolicy,
	}
	// NOTE: the errors from JetStream and AddStream are discarded in this
	// snippet; real code should check both.
	js, _ := nc.JetStream()
	js.AddStream(cfg)

	// create Generic Stream
	// Typed on ExampleMsg, using subject "example.events" and the stream
	// name from cfg.
	q := gongs.NewGenericStream[ExampleMsg](js, "example.events", cfg.Name)

Publish event

	ctx := context.Background()
	// Publish an event
	// NOTE: Publish returns (*nats.PubAck, error); both results are
	// discarded in this snippet.
	q.Publish(ctx, &ExampleMsg{
		eventData: &ExampleMsgEventData{
			Id:          "abc123",
			Type:        "start",
			Description: "An important task has started",
		},
	})

Read the last event off the queue.

	// Read event from NATS
	// NOTE: the error from GetLastMsg is discarded in this snippet. event is
	// dereferenced below without a check, so real code should test the error
	// first (presumably event is nil on failure — confirm).
	event, _ := q.GetLastMsg("example")

	// Print the decoded payload fields.
	fmt.Printf("Id: %s [%s] - %s",
		event.eventData.Id,
		event.eventData.Type,
		event.eventData.Description,
	)

Documentation

Overview

Example
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"os"

	"github.com/nats-io/nats.go"
	"github.com/sl1pm4t/gongs"
	"github.com/sl1pm4t/gongs/test"
)

// init calls test.RunBasicJetStreamServer and stores the returned server's
// client URL in the NATS_URL environment variable, which main reads to
// connect.
func init() {
	srv := test.RunBasicJetStreamServer()
	os.Setenv("NATS_URL", srv.ClientURL())
}

// ExampleMsgEventData is the payload carried by ExampleMsg: an identifier,
// an event type and a free-form description, all plain strings.
type ExampleMsgEventData struct {
	Id, Type, Description string
}
// ExampleMsg wraps an ExampleMsgEventData payload. Its methods implement
// the gongs.MsgEvent interface so it can be used with a GenericStream.
type ExampleMsg struct {
	// eventData is the wrapped payload; it is nil in a zero-value ExampleMsg
	// and is assigned by DecodeEventData.
	eventData *ExampleMsgEventData
}

// GetId returns the Id field of the wrapped event data.
// ctx is part of the gongs.MsgEvent signature and is not used here.
func (e *ExampleMsg) GetId(ctx context.Context) string {
	data := e.eventData
	return data.Id
}

// DecodeEventData unmarshals the JSON document b into a new
// ExampleMsgEventData and stores it on the message.
//
// It returns the wrapped json.Unmarshal error when b is not valid JSON for
// the payload type; in that case the message's existing event data is left
// untouched instead of being replaced by a partially populated value.
func (e *ExampleMsg) DecodeEventData(b []byte) error {
	d := &ExampleMsgEventData{}
	if err := json.Unmarshal(b, d); err != nil {
		return fmt.Errorf("decoding example event data: %w", err)
	}
	e.eventData = d
	return nil
}

// EncodeEventData returns the JSON encoding of the message's event data.
// ctx is part of the gongs.MsgEvent signature and is not used here.
func (e *ExampleMsg) EncodeEventData(ctx context.Context) []byte {
	// The marshal error is dropped because the signature has no error result.
	payload, _ := json.Marshal(e.eventData)
	return payload
}

// main runs the example and exits with status 1, printing the error to
// stderr, if any step fails.
func main() {
	if err := run(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}

// run connects to the server named by NATS_URL, creates the EXAMPLE stream,
// publishes one ExampleMsg through a typed GenericStream, reads the last
// message back and prints its fields. Every failing step is returned as a
// wrapped error instead of being ignored.
func run() error {
	// Get NATS connection
	nc, err := nats.Connect(os.Getenv("NATS_URL"))
	if err != nil {
		return fmt.Errorf("connecting to NATS: %w", err)
	}
	// Release the connection on every return path.
	defer nc.Close()

	// create Jetstream for Stream
	cfg := &nats.StreamConfig{
		Name:      "EXAMPLE",
		Subjects:  []string{"example.>"},
		Storage:   nats.MemoryStorage,
		Retention: nats.WorkQueuePolicy,
	}
	js, err := nc.JetStream()
	if err != nil {
		return fmt.Errorf("creating JetStream context: %w", err)
	}
	if _, err := js.AddStream(cfg); err != nil {
		return fmt.Errorf("adding stream %q: %w", cfg.Name, err)
	}
	ctx := context.Background()

	// create Generic Stream
	q := gongs.NewGenericStream[ExampleMsg](js, "example.events", cfg.Name)

	// Publish an event
	if _, err := q.Publish(ctx, &ExampleMsg{
		eventData: &ExampleMsgEventData{
			Id:          "abc123",
			Type:        "start",
			Description: "An important task has started",
		},
	}); err != nil {
		return fmt.Errorf("publishing event: %w", err)
	}

	// Read event from NATS
	event, err := q.GetLastMsg("example")
	if err != nil {
		return fmt.Errorf("reading last message: %w", err)
	}

	fmt.Printf("Id: %s [%s] - %s",
		event.eventData.Id,
		event.eventData.Type,
		event.eventData.Description,
	)
	return nil
}
Output:

Id: abc123 [start] - An important task has started

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type GenericStream

// GenericStream is the typed stream handle returned by NewGenericStream.
// T is the message type and I is the pointer type *T, which must implement
// MsgEvent[T].
type GenericStream[T any, I MsgEvent[T]] struct {
	// contains filtered or unexported fields
}

func NewGenericStream

func NewGenericStream[T any, I MsgEvent[T]](
	js nats.JetStreamContext,
	sub string,
	stream string,
) *GenericStream[T, I]

func (*GenericStream[T, I]) GetLastMsg

func (s *GenericStream[T, I]) GetLastMsg(name string) (*T, error)

func (*GenericStream[T, I]) Publish

func (s *GenericStream[T, I]) Publish(ctx context.Context, evt I) (*nats.PubAck, error)

Publish will publish a message to NATS using the message ID returned by MsgEvent.GetId. The message ID is used for deduplication; see https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#message-deduplication

func (*GenericStream[T, I]) QueueSubscribe

func (s *GenericStream[T, I]) QueueSubscribe(queue string, fn MsgHandlerFunc[T]) (*nats.Subscription, error)

type MsgEvent

// MsgEvent is the type constraint for messages handled by a GenericStream.
// It can only be satisfied by the pointer type *T, which must provide the
// three methods below.
type MsgEvent[T any] interface {
	// GetId returns the message id; Publish uses it as the NATS message id
	// for deduplication.
	GetId(ctx context.Context) string
	// DecodeEventData populates the receiver from the bytes b and reports
	// any decoding failure (presumably b is the raw NATS message payload —
	// confirm against the implementation).
	DecodeEventData(b []byte) error
	// EncodeEventData returns the byte encoding of the event (presumably
	// the payload Publish sends — confirm against the implementation).
	EncodeEventData(ctx context.Context) []byte
	// *T restricts the type set to the pointer type of T.
	*T
}

type MsgHandlerFunc

// MsgHandlerFunc is the callback type accepted by
// GenericStream.QueueSubscribe; it receives a *T and returns an error.
type MsgHandlerFunc[T any] func(*T) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL