Documentation ¶
Overview ¶
Example ¶
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"

	"github.com/nats-io/nats.go"
	"github.com/sl1pm4t/gongs"
	"github.com/sl1pm4t/gongs/test"
)

// init starts the server returned by test.RunBasicJetStreamServer and exports
// its client URL through the NATS_URL environment variable, which main reads
// when connecting.
func init() {
	s := test.RunBasicJetStreamServer()
	if err := os.Setenv("NATS_URL", s.ClientURL()); err != nil {
		panic(err)
	}
}

// ExampleMsgEventData is the JSON-encoded payload carried by an ExampleMsg.
type ExampleMsgEventData struct {
	Id          string
	Type        string
	Description string
}

// ExampleMsg wraps an ExampleMsgEventData payload and provides the GetId,
// DecodeEventData and EncodeEventData methods used with gongs.GenericStream.
type ExampleMsg struct {
	eventData *ExampleMsgEventData
}

// GetId returns the Id field of the wrapped event data.
func (e *ExampleMsg) GetId(ctx context.Context) string {
	return e.eventData.Id
}

// DecodeEventData unmarshals the JSON in b into a fresh ExampleMsgEventData
// and stores it on e. If b is not valid JSON for that type, the error is
// returned and e is left unchanged.
func (e *ExampleMsg) DecodeEventData(b []byte) error {
	d := &ExampleMsgEventData{}
	if err := json.Unmarshal(b, d); err != nil {
		return fmt.Errorf("decoding event data: %w", err)
	}
	e.eventData = d
	return nil
}

// EncodeEventData returns the wrapped event data marshalled as JSON.
func (e *ExampleMsg) EncodeEventData(ctx context.Context) []byte {
	// The payload contains only string fields, so json.Marshal cannot fail
	// here; the method signature has no error return to report one anyway.
	b, _ := json.Marshal(e.eventData)
	return b
}

// run connects to NATS, creates the EXAMPLE stream, publishes one event
// through a gongs.GenericStream and prints the last message read back.
// It returns the first error encountered.
func run() error {
	// Get NATS connection
	nc, err := nats.Connect(os.Getenv("NATS_URL"))
	if err != nil {
		return fmt.Errorf("connecting to NATS: %w", err)
	}
	defer nc.Close()

	// create Jetstream for Stream
	cfg := &nats.StreamConfig{
		Name:      "EXAMPLE",
		Subjects:  []string{"example.>"},
		Storage:   nats.MemoryStorage,
		Retention: nats.WorkQueuePolicy,
	}

	js, err := nc.JetStream()
	if err != nil {
		return fmt.Errorf("creating JetStream context: %w", err)
	}
	if _, err := js.AddStream(cfg); err != nil {
		return fmt.Errorf("adding stream %q: %w", cfg.Name, err)
	}

	ctx := context.Background()

	// create Generic Stream
	q := gongs.NewGenericStream[ExampleMsg](js, "example.events", cfg.Name)

	// Publish an event
	_, err = q.Publish(ctx, &ExampleMsg{
		eventData: &ExampleMsgEventData{
			Id:          "abc123",
			Type:        "start",
			Description: "An important task has started",
		},
	})
	if err != nil {
		return fmt.Errorf("publishing event: %w", err)
	}

	// Read event from NATS
	event, err := q.GetLastMsg("example")
	if err != nil {
		return fmt.Errorf("reading last message: %w", err)
	}

	fmt.Printf("Id: %s [%s] - %s",
		event.eventData.Id,
		event.eventData.Type,
		event.eventData.Description,
	)
	return nil
}

func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}
Output: Id: abc123 [start] - An important task has started
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type GenericStream ¶
func NewGenericStream ¶
func NewGenericStream[T any, I MsgEvent[T]]( js nats.JetStreamContext, sub string, stream string, ) *GenericStream[T, I]
func (*GenericStream[T, I]) GetLastMsg ¶
func (s *GenericStream[T, I]) GetLastMsg(name string) (*T, error)
func (*GenericStream[T, I]) Publish ¶
func (s *GenericStream[T, I]) Publish(ctx context.Context, evt I) (*nats.PubAck, error)
Publish publishes a message to NATS using the message ID returned by MsgEvent.GetId. The message ID is used for deduplication; see https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#message-deduplication
func (*GenericStream[T, I]) QueueSubscribe ¶
func (s *GenericStream[T, I]) QueueSubscribe(queue string, fn MsgHandlerFunc[T]) (*nats.Subscription, error)
type MsgHandlerFunc ¶
Click to show internal directories.
Click to hide internal directories.