nats

package
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 16, 2023 License: Apache-2.0 Imports: 14 Imported by: 0

README

Hops/NATS

This package contains NATS utils for interacting with NATS in the context of a Hiphops server/worker/etc.

Documentation

Overview

Creates/manages an in-process NATS server

Currently this is used as part of the hops test suite, but in future it will be leveraged to enable user-side developers to run tests on Hiphops pipelines.

Index

Constants

View Source
const (
	ChannelNotify  = "notify"
	ChannelRequest = "request"
)
View Source
const HopsMessageId = "hops"

Variables

This section is empty.

Functions

func DoubleAck

func DoubleAck(ctx context.Context, msg jetstream.Msg) error

DoubleAck is a convenience wrapper around NATS acking with a timeout

func ReplayFilterSubject

func ReplayFilterSubject(accountId string, sequenceId string) string

func SequenceHopsKeyTokens

func SequenceHopsKeyTokens(sequenceId string) []string

func SourceEventSubject

func SourceEventSubject(accountId string, sequenceId string) string

func WorkerRequestSubject

func WorkerRequestSubject(accountId string, appName string, handler string) string

Types

type Client

type Client struct {
	NatsConn    *nats.Conn
	JetStream   jetstream.JetStream
	Consumer    jetstream.Consumer
	SysObjStore nats.ObjectStore
	// contains filtered or unexported fields
}

func NewClient

func NewClient(natsUrl string, accountId string, logger Logger, clientOpts ...ClientOpt) (*Client, error)

NewClient returns a new hiphops specific NATS client

By default it is configured as a runner consumer (listening for incoming source events). Passing *any* ClientOpts will override this default.

func (*Client) CheckConnection

func (c *Client) CheckConnection() bool

func (*Client) Close

func (c *Client) Close()

func (*Client) Consume

func (c *Client) Consume(ctx context.Context, callback jetstream.MessageHandler) error

Consume consumes messages from the Client.Consumer

This will block the calling goroutine until the context is cancelled, so it can be run as a long-lived service

func (*Client) ConsumeSequences

func (c *Client) ConsumeSequences(ctx context.Context, handler SequenceHandler) error

ConsumeSequences is a wrapper around Consume that presents the aggregate state of a sequence to the handler instead of individual messages.

func (*Client) FetchMessageBundle

func (c *Client) FetchMessageBundle(ctx context.Context, newMsg *MsgMeta) (MessageBundle, error)

FetchMessageBundle pulls all historic messages for a sequenceId from the stream, converting them to a message bundle

The returned message bundle will contain all previous messages in addition to the newly received message

func (*Client) GetMsg

func (c *Client) GetMsg(ctx context.Context, subjTokens ...string) (*jetstream.RawStreamMsg, error)

func (*Client) GetSysObject

func (c *Client) GetSysObject(key string) ([]byte, error)

func (*Client) Publish

func (c *Client) Publish(ctx context.Context, data []byte, subjTokens ...string) (*jetstream.PubAck, bool, error)

func (*Client) PublishResult

func (c *Client) PublishResult(ctx context.Context, startedAt time.Time, result interface{}, err error, subjTokens ...string) (error, bool)

PublishResult is a convenience wrapper that json encodes a ResultMsg and publishes it

func (*Client) PutSysObject

func (c *Client) PutSysObject(name string, data []byte) (*nats.ObjectInfo, error)

type ClientOpt

type ClientOpt func(*Client) error

ClientOpt functions configure a nats.Client via NewClient()

func DefaultClientOpts

func DefaultClientOpts() []ClientOpt

DefaultClientOpts configures the hiphops nats.Client as a RunnerClient

func ReplayClient

func ReplayClient(sequenceId string) ClientOpt

ReplayClient initialises the client with a consumer for replaying a sequence

func RunnerClient

func RunnerClient() ClientOpt

RunnerClient initialises the client with a consumer for running pipelines

func WorkerClient

func WorkerClient(appName string) ClientOpt

WorkerClient initialises the client with a consumer to receive call requests for a worker

type HopsResultMeta

type HopsResultMeta struct {
	Error      error     `json:"error,omitempty"`
	FinishedAt time.Time `json:"finished_at"`
	StartedAt  time.Time `json:"started_at"`
}

HopsResultMeta is metadata included in the top level of a result message

type KeyFile

type KeyFile struct {
	AccountId  string `json:"account_id"`
	Password   string `json:"password"`
	NatsDomain string `json:"nats_domain"`
}

func NewKeyFile

func NewKeyFile(keyfilePath string) (KeyFile, error)

func (*KeyFile) NatsUrl

func (k *KeyFile) NatsUrl() string

type LocalServer

type LocalServer struct {
	NatsServer *server.Server
	ServerOpts *server.Options
}

LocalServer is an in-process hiphops.io style NATS server instance created from a NATS config file.

func NewLocalServer

func NewLocalServer(natsConfigPath string, dataDir string, debug bool, logger server.Logger) (*LocalServer, error)

NewLocalServer starts an in-process nats server from a config file

LocalServer.Close() should be called when finished with the server

func (*LocalServer) AuthUrl

func (l *LocalServer) AuthUrl(accountName string) (string, error)

func (*LocalServer) Close

func (l *LocalServer) Close()

Close shuts down the local nats server, waiting until shutdown is complete

func (*LocalServer) Connect

func (l *LocalServer) Connect(accountName string) (*nats.Conn, error)

Connect establishes a client connection with the local nats server

func (*LocalServer) User

func (l *LocalServer) User(accountName string) (*server.User, error)

type Logger

type Logger interface {
	// Log a debug statement
	Debugf(format string, v ...interface{})

	// Log an error with exact error
	Errf(err error, format string, v ...interface{})

	// Log an error
	Errorf(format string, v ...interface{})

	// Log a fatal error
	Fatalf(format string, v ...interface{})

	// Log an info statement
	Infof(format string, v ...interface{})

	// Log a notice statement
	Noticef(format string, v ...interface{})

	// Log a trace statement
	Tracef(format string, v ...interface{})

	// Log a warning statement
	Warnf(format string, v ...interface{})
}

type MessageBundle

type MessageBundle map[string][]byte

MessageBundle is a map of messageIDs and the data that message contained

MessageBundle is designed to be passed to a runner to ensure it has the aggregate state of a hiphops sequence of messages.

type MsgMeta

type MsgMeta struct {
	AccountId        string
	AppName          string
	Channel          string
	ConsumerSequence uint64
	HandlerName      string
	MessageId        string
	SequenceId       string
	StreamSequence   uint64
	// contains filtered or unexported fields
}

func Parse

func Parse(msg jetstream.Msg) (*MsgMeta, error)

func (*MsgMeta) ResponseSubject

func (m *MsgMeta) ResponseSubject() string

func (*MsgMeta) SequenceFilter

func (m *MsgMeta) SequenceFilter() string

type ResultMsg

type ResultMsg struct {
	Body      string         `json:"body"`
	Completed bool           `json:"completed"`
	Done      bool           `json:"done"`
	Errored   bool           `json:"errored"`
	Hops      HopsResultMeta `json:"hops"`
	JSON      interface{}    `json:"json,omitempty"`
}

ResultMsg is the schema for handler call result messages

func NewResultMsg

func NewResultMsg(startedAt time.Time, result interface{}, err error) ResultMsg

type SequenceHandler

type SequenceHandler interface {
	SequenceCallback(context.Context, string, MessageBundle) error
}

SequenceHandler is an interface whose SequenceCallback method receives the sequenceId and message bundle for a sequence of messages

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL