Documentation ¶
Overview ¶
Creates and manages an in-process NATS server.
Currently this is used as part of the hops test suite. In future it will also let user-side developers run tests on Hiphops pipelines.
Index ¶
- Constants
- func DoubleAck(ctx context.Context, msg jetstream.Msg) error
- func ReplayFilterSubject(accountId string, sequenceId string) string
- func SequenceHopsKeyTokens(sequenceId string) []string
- func SourceEventSubject(accountId string, sequenceId string) string
- func WorkerRequestSubject(accountId string, appName string, handler string) string
- type Client
- func (c *Client) CheckConnection() bool
- func (c *Client) Close()
- func (c *Client) Consume(ctx context.Context, callback jetstream.MessageHandler) error
- func (c *Client) ConsumeSequences(ctx context.Context, handler SequenceHandler) error
- func (c *Client) FetchMessageBundle(ctx context.Context, newMsg *MsgMeta) (MessageBundle, error)
- func (c *Client) GetMsg(ctx context.Context, subjTokens ...string) (*jetstream.RawStreamMsg, error)
- func (c *Client) GetSysObject(key string) ([]byte, error)
- func (c *Client) Publish(ctx context.Context, data []byte, subjTokens ...string) (*jetstream.PubAck, bool, error)
- func (c *Client) PublishResult(ctx context.Context, startedAt time.Time, result interface{}, err error, ...) (error, bool)
- func (c *Client) PutSysObject(name string, data []byte) (*nats.ObjectInfo, error)
- type ClientOpt
- type HopsResultMeta
- type KeyFile
- type LocalServer
- type Logger
- type MessageBundle
- type MsgMeta
- type ResultMsg
- type SequenceHandler
Constants ¶
const (
	ChannelNotify  = "notify"
	ChannelRequest = "request"
)
const DoneMessageId = "done"
const HopsMessageId = "hops"
Variables ¶
This section is empty.
Functions ¶
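func ReplayFilterSubject ¶
func ReplayFilterSubject(accountId string, sequenceId string) string
func SequenceHopsKeyTokens ¶
func SequenceHopsKeyTokens(sequenceId string) []string
func SourceEventSubject ¶
func SourceEventSubject(accountId string, sequenceId string) string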
Types ¶
type Client ¶
type Client struct {
	NatsConn    *nats.Conn
	JetStream   jetstream.JetStream
	Consumer    jetstream.Consumer
	SysObjStore nats.ObjectStore
	// contains filtered or unexported fields
}
func NewClient ¶
func NewClient(natsUrl string, accountId string, logger Logger, clientOpts ...ClientOpt) (*Client, error)
NewClient returns a new Hiphops-specific NATS client.
By default it is configured as a runner consumer (listening for incoming source events). Passing *any* ClientOpts will override this default.
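The default-versus-override behaviour described above can be pictured with the functional-options pattern. The following is a minimal, self-contained sketch, not the package's implementation: the `client` struct, the `clientOpt` signature (including its error return), and the `runnerOpt` and `workerOpt` helpers are all hypothetical stand-ins, and the real NewClient may apply its defaults differently.

```go
package main

import "fmt"

// client is a hypothetical stand-in for the package's Client type.
type client struct {
	role string
}

// clientOpt mirrors the idea of ClientOpt: a function that configures a client.
// The exact signature of the real ClientOpt is an assumption here.
type clientOpt func(*client) error

// runnerOpt and workerOpt are illustrative options, not the package's own.
func runnerOpt() clientOpt {
	return func(c *client) error { c.role = "runner"; return nil }
}

func workerOpt(app string) clientOpt {
	return func(c *client) error { c.role = "worker:" + app; return nil }
}

// newClient applies the default options only when the caller passes none,
// so supplying any option replaces the default configuration entirely.
func newClient(opts ...clientOpt) (*client, error) {
	if len(opts) == 0 {
		opts = []clientOpt{runnerOpt()}
	}
	c := &client{}
	for _, opt := range opts {
		if err := opt(c); err != nil {
			return nil, err
		}
	}
	return c, nil
}

func main() {
	def, _ := newClient()
	custom, _ := newClient(workerOpt("demo"))
	fmt.Println(def.role, custom.role) // prints "runner worker:demo"
}
```

Under this reading, a caller who wants the runner behaviour plus something extra would need to pass the runner option explicitly alongside the others.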
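func (*Client) CheckConnection ¶
func (c *Client) CheckConnection() bool
func (*Client) Consume ¶
func (c *Client) Consume(ctx context.Context, callback jetstream.MessageHandler) error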
Consume consumes messages from Client.Consumer.
This blocks the calling goroutine until the context is cancelled, so it can be run as a long-lived service.
func (*Client) ConsumeSequences ¶
func (c *Client) ConsumeSequences(ctx context.Context, handler SequenceHandler) error
ConsumeSequences is a wrapper around Consume that presents the aggregate state of a sequence to the callback instead of individual messages.
func (*Client) FetchMessageBundle ¶
func (c *Client) FetchMessageBundle(ctx context.Context, newMsg *MsgMeta) (MessageBundle, error)
FetchMessageBundle pulls all historic messages for a sequenceId from the stream, converting them to a message bundle
The returned message bundle will contain all previous messages in addition to the newly received message
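The aggregation described here (historic messages plus the newly received one, keyed by message ID) can be sketched without a NATS connection. In the example below, `bundle`, `msg`, and `buildBundle` are hypothetical names, and the `[]byte` value type is an assumption, since this documentation does not show the definition of MessageBundle.

```go
package main

import "fmt"

// bundle is a hypothetical stand-in for MessageBundle: message ID -> payload.
// The real value type is not shown in this documentation.
type bundle map[string][]byte

// msg is an illustrative message with an ID and a payload.
type msg struct {
	id   string
	data []byte
}

// buildBundle merges previously stored messages with a newly received one,
// so the caller sees the aggregate state of the sequence.
func buildBundle(history []msg, newMsg msg) bundle {
	b := make(bundle, len(history)+1)
	for _, m := range history {
		b[m.id] = m.data
	}
	b[newMsg.id] = newMsg.data
	return b
}

func main() {
	history := []msg{{id: "hops", data: []byte(`{"event":"push"}`)}}
	b := buildBundle(history, msg{id: "deploy", data: []byte(`{"ok":true}`)})
	fmt.Println(len(b)) // prints 2
}
```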
type ClientOpt ¶
ClientOpt functions configure a nats.Client via NewClient()
func DefaultClientOpts ¶
func DefaultClientOpts() []ClientOpt
DefaultClientOpts configures the hiphops nats.Client as a RunnerClient
func ReplayClient ¶
ReplayClient initialises the client with a consumer for replaying a sequence
func RunnerClient ¶
func RunnerClient() ClientOpt
RunnerClient initialises the client with a consumer for running pipelines
func WorkerClient ¶
WorkerClient initialises the client with a consumer to receive call requests for a worker
type HopsResultMeta ¶
type HopsResultMeta struct {
	Error      error     `json:"error,omitempty"`
	FinishedAt time.Time `json:"finished_at"`
	StartedAt  time.Time `json:"started_at"`
}
HopsResultMeta is metadata included in the top level of a result message
type KeyFile ¶
type KeyFile struct {
	AccountId  string `json:"account_id"`
	Password   string `json:"password"`
	NatsDomain string `json:"nats_domain"`
}
func NewKeyFile ¶
type LocalServer ¶
LocalServer is an in-process hiphops.io style NATS server instance created from a NATS config file.
func NewLocalServer ¶
func NewLocalServer(natsConfigPath string, dataDir string, debug bool, logger server.Logger) (*LocalServer, error)
NewLocalServer starts an in-process nats server from a config file
LocalServer.Close() should be called when finished with the server
func (*LocalServer) Close ¶
func (l *LocalServer) Close()
Close shuts down the local nats server, waiting until shutdown is complete
func (*LocalServer) Connect ¶
func (l *LocalServer) Connect(accountName string) (*nats.Conn, error)
Connect establishes a client connection with the local nats server
type Logger ¶
type Logger interface {
	// Log a debug statement
	Debugf(format string, v ...interface{})
	// Log an error with exact error
	Errf(err error, format string, v ...interface{})
	// Log an error
	Errorf(format string, v ...interface{})
	// Log a fatal error
	Fatalf(format string, v ...interface{})
	// Log an info statement
	Infof(format string, v ...interface{})
	// Log a notice statement
	Noticef(format string, v ...interface{})
	// Log a trace statement
	Tracef(format string, v ...interface{})
	// Log a warning statement
	Warnf(format string, v ...interface{})
}
type MessageBundle ¶
MessageBundle is a map of messageIDs and the data that message contained
MessageBundle is designed to be passed to a runner to ensure it has the aggregate state of a hiphops sequence of messages.
type MsgMeta ¶
type MsgMeta struct {
	AccountId        string
	AppName          string
	Channel          string
	ConsumerSequence uint64
	Done             bool
	HandlerName      string
	MessageId        string
	SequenceId       string
	StreamSequence   uint64
	// contains filtered or unexported fields
}
func (*MsgMeta) ResponseSubject ¶
func (*MsgMeta) SequenceFilter ¶
type ResultMsg ¶
type ResultMsg struct {
	Body      string         `json:"body"`
	Completed bool           `json:"completed"`
	Done      bool           `json:"done"`
	Errored   bool           `json:"errored"`
	Hops      HopsResultMeta `json:"hops"`
	JSON      interface{}    `json:"json,omitempty"`
}
ResultMsg is the schema for handler call result messages
type SequenceHandler ¶
type SequenceHandler interface {
	SequenceCallback(context.Context, string, MessageBundle) error
}
SequenceHandler is an interface whose SequenceCallback method receives the sequenceId and message bundle for a sequence of messages