Documentation
¶
Overview ¶
Creates/manages an in-process NATS server
Currently this is used as part of the hops test suite, but in future it will be leveraged to enable user-side developers to run tests on Hiphops pipelines.
Index ¶
- Constants
- Variables
- func CreateSourceEvent(rawEvent map[string]any, source string, event string, action string, ...) ([]byte, string, error)
- func DoubleAck(ctx context.Context, msg jetstream.Msg) error
- func EventLogFilterSubject(accountId string, interestTopic string, eventFilter string) string
- func HealthChecker(natsClient *Client) func(w http.ResponseWriter, r *http.Request)
- func NotifyFilterSubject(accountId string, interestTopic string) string
- func ReplayFilterSubject(accountId string, interestTopic string, sequenceId string) string
- func RequestFilterSubject(accountId string, interestTopic string) string
- func SequenceHopsKeyTokens(sequenceId string) []string
- func SourceEventSubject(accountId string, interestTopic string, sequenceId string) string
- func WorkerRequestFilterSubject(accountId string, interestTopic string, appName string, handler string) string
- type Client
- func (c *Client) CheckConnection() bool
- func (c *Client) Close()
- func (c *Client) Consume(ctx context.Context, fromConsumer string, callback jetstream.MessageHandler) error
- func (c *Client) ConsumeSequences(ctx context.Context, fromConsumer string, handler SequenceHandler) error
- func (c *Client) DeleteMsgSequence(ctx context.Context, msgMeta *MsgMeta) error
- func (c *Client) FetchMessageBundle(ctx context.Context, incomingMsg *MsgMeta) (MessageBundle, error)
- func (c *Client) GetEventHistory(ctx context.Context, start time.Time, sourceOnly bool) ([]*MsgMeta, error)
- func (c *Client) GetMsg(ctx context.Context, subjTokens ...string) (*jetstream.RawStreamMsg, error)
- func (c *Client) GetSysObject(key string) ([]byte, error)
- func (c *Client) Publish(ctx context.Context, data []byte, subjTokens ...string) (*jetstream.PubAck, bool, error)
- func (c *Client) PublishResult(ctx context.Context, startedAt time.Time, result interface{}, err error, ...) (error, bool)deprecated
- func (c *Client) PublishResultWithAck(ctx context.Context, msg jetstream.Msg, startedAt time.Time, ...) (bool, error)
- func (c *Client) PutSysObject(name string, data []byte) (*nats.ObjectInfo, error)
- type ClientOpt
- type HopsResultMeta
- type KeyFile
- type LocalServer
- type Logger
- type MessageBundle
- type MsgMeta
- type ResultMsg
- type SequenceHandler
- type SourceMeta
Constants ¶
const ( ChannelNotify = "notify" ChannelRequest = "request" DefaultConsumerName = "runner" // How far back to look for events by default DefaultEventLookback = -time.Hour // Interest topic which is used by default DefaultInterestTopic = "default" // Number of events returned max GetEventHistoryEventLimit = 100 )
const AllEventId = ">"
const DoneMessageId = "done"
const HopsMessageId = "hops"
const SourceEventId = "event"
Variables ¶
var (
ErrIncompleteMsgBundle = errors.New("Unable to fetch complete sequence history")
)
Functions ¶
func CreateSourceEvent ¶ added in v0.16.1
func EventLogFilterSubject ¶ added in v0.14.0
EventLogFilterSubject returns the subject used to get events for display to the user in the UI.
accountId: The account id to filter on eventFilter: either AllEventId or SourceEventId
func HealthChecker ¶ added in v0.15.1
func HealthChecker(natsClient *Client) func(w http.ResponseWriter, r *http.Request)
func NotifyFilterSubject ¶ added in v0.14.0
NotifyFilterSubject returns the filter subject to get notify messages for the account
func ReplayFilterSubject ¶
func RequestFilterSubject ¶ added in v0.14.0
RequestFilterSubject returns the filter subject to get request messages for the account
func SequenceHopsKeyTokens ¶
func SourceEventSubject ¶
Types ¶
type Client ¶
type Client struct { Consumers map[string]jetstream.Consumer JetStream jetstream.JetStream NatsConn *nats.Conn SysObjStore nats.ObjectStore // contains filtered or unexported fields }
func NewClient ¶
func NewClient(natsUrl string, accountId string, interestTopic string, logger Logger, clientOpts ...ClientOpt) (*Client, error)
NewClient returns a new hiphops specific NATS client
By default it is configured as a runner consumer (listening for incoming source events) Passing *any* ClientOpts will override this default.
func (*Client) CheckConnection ¶
func (*Client) Consume ¶
func (c *Client) Consume(ctx context.Context, fromConsumer string, callback jetstream.MessageHandler) error
Consume consumes messages from the HopsNats.Consumers[fromConsumer]
This will block the calling goroutine until the context is cancelled and can be ran as a long-lived service
func (*Client) ConsumeSequences ¶
func (c *Client) ConsumeSequences(ctx context.Context, fromConsumer string, handler SequenceHandler) error
ConsumeSequences is a wrapper around consume that presents the aggregate state of a sequence to the callback instead of individual messages.
func (*Client) DeleteMsgSequence ¶ added in v0.16.3
DeleteMsgSequence deletes a given message and the entire sequence it is part of
Main use case is preventing build up of source events that do not relate to any configured automation
func (*Client) FetchMessageBundle ¶
func (c *Client) FetchMessageBundle(ctx context.Context, incomingMsg *MsgMeta) (MessageBundle, error)
FetchMessageBundle pulls all historic messages for a sequenceId from the stream, converting them to a message bundle
The returned message bundle will contain all previous messages in addition to the newly received message
func (*Client) GetEventHistory ¶ added in v0.7.0
func (c *Client) GetEventHistory(ctx context.Context, start time.Time, sourceOnly bool) ([]*MsgMeta, error)
GetEventHistory pulls historic events, most recent first, from now back to start time.
Times out if events take longer than a second to be received. Only returns the first 100 events. (const GetEventHistoryEventLimit) If sourceOnly is true, only returns source events (i.e. not pipeline events)
func (*Client) PublishResult
deprecated
func (c *Client) PublishResult(ctx context.Context, startedAt time.Time, result interface{}, err error, subjTokens ...string) (error, bool)
Deprecated: PublishResult is a convenience wrapper that json encodes a ResultMsg and publishes it
In most cases you should use PublishResultWithAck instead, deferring acking of the original messaging until after we've sent a result. This method will be removed in future.
func (*Client) PublishResultWithAck ¶ added in v0.15.1
type ClientOpt ¶
ClientOpt functions configure a nats.Client via NewClient()
func DefaultClientOpts ¶
func DefaultClientOpts() []ClientOpt
DefaultClientOpts configures the hiphops nats.Client as a RunnerClient
func WithLocalRunner ¶ added in v0.16.0
WithLocalRunner initialises a runner with a randomised interest topic and ephemeral consumer
func WithReplay ¶ added in v0.9.1
WithReplay initialises the client with a consumer for replaying a sequence
func WithRunner ¶ added in v0.9.0
WithRunner initialises the client with a consumer for running pipelines
func WithStreamName ¶ added in v0.9.1
WithStreamName overrides the stream name to be used (which defaults to accountId otherwise)
Should be given before any ClientOpts that use the stream, as otherwise they will be initialised with the default stream name
func WithWorker ¶ added in v0.9.0
WithWorker initialises the client with a consumer to receive call requests for a worker
type HopsResultMeta ¶
type HopsResultMeta struct { Error string `json:"error,omitempty"` FinishedAt time.Time `json:"finished_at"` StartedAt time.Time `json:"started_at"` }
HopsResultMeta is metadata included in the top level of a result message
type KeyFile ¶
type KeyFile struct { AccountId string `json:"account_id"` Password string `json:"password"` NatsDomain string `json:"nats_domain"` }
func NewKeyFile ¶
type LocalServer ¶
LocalServer is an in-process hiphops.io style NATS server instance created from a NATS config file.
func NewLocalServer ¶
func NewLocalServer(natsConfigPath string, dataDir string, debug bool, logger server.Logger) (*LocalServer, error)
NewLocalServer starts an in-process nats server from a config file
LocalServer.Close() should be called when finished with the server
func (*LocalServer) Close ¶
func (l *LocalServer) Close()
Close shuts down the local nats server, waiting until shutdown is complete
func (*LocalServer) Connect ¶
func (l *LocalServer) Connect(accountName string) (*nats.Conn, error)
Connect establishes a client connection with the local nats server
type Logger ¶
type Logger interface { // Log a debug statement Debugf(format string, v ...interface{}) // Log an error with exact error Errf(err error, format string, v ...interface{}) // Log an error Errorf(format string, v ...interface{}) // Log a fatal error Fatalf(format string, v ...interface{}) // Log an info statement Infof(format string, v ...interface{}) // Log a notice statement Noticef(format string, v ...interface{}) // Log a trace statement Tracef(format string, v ...interface{}) // Log a warning statement Warnf(format string, v ...interface{}) }
type MessageBundle ¶
MessageBundle is a map of messageIDs and the data that message contained
MessageBundle is designed to be passed to a runner to ensure it has the aggregate state of a hiphops sequence of messages.
type MsgMeta ¶
type MsgMeta struct { AccountId string AppName string Channel string ConsumerSequence uint64 Done bool // Message is a pipeline 'done' message HandlerName string InterestTopic string MessageId string NumPending uint64 SequenceId string StreamSequence uint64 Timestamp time.Time // contains filtered or unexported fields }
func (*MsgMeta) ResponseSubject ¶
func (*MsgMeta) SequenceFilter ¶
type ResultMsg ¶
type ResultMsg struct { Body string `json:"body"` Completed bool `json:"completed"` Done bool `json:"done"` Errored bool `json:"errored"` Headers map[string]string `json:"headers,omitempty"` Hops HopsResultMeta `json:"hops"` JSON interface{} `json:"json,omitempty"` StatusCode int `json:"status_code,omitempty"` URL string `json:"url,omitempty"` }
ResultMsg is the schema for handler call result messages
type SequenceHandler ¶
type SequenceHandler interface {
SequenceCallback(context.Context, string, MessageBundle) (bool, error)
}
SequenceHandler is a function that receives the sequenceId and message bundle for a sequence of messages