Documentation ¶
Index ¶
- Variables
- func CheckVersion(ctx context.Context, nc *nats.Conn) error
- func ContextLoggerWithWfState(ctx context.Context, state *model.WorkflowState) (context.Context, *slog.Logger)
- func CopyWorkflowState(state *model.WorkflowState) *model.WorkflowState
- func Delete(ctx context.Context, kv jetstream.KeyValue, key string) error
- func DeleteLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, ...) error
- func DropStateParams(state *model.WorkflowState)
- func ElementTable(process *model.Workflow) map[string]*model.Element
- func EnsureBucket(ctx context.Context, js jetstream.JetStream, storageType jetstream.StorageType, ...) error
- func EnsureBuckets(ctx context.Context, js jetstream.JetStream, storageType jetstream.StorageType, ...) error
- func ExtendLock(ctx context.Context, kv jetstream.KeyValue, lockID string) error
- func GetCallerInfo(skip int) (info string)
- func IndexProcessElements(elements []*model.Element, el map[string]*model.Element)
- func KSuidTo128bit(k string) [16]byte
- func KSuidTo64bit(k string) [8]byte
- func KeyPrefixSearch(ctx context.Context, js jetstream.JetStream, kv jetstream.KeyValue, ...) ([]string, error)
- func KeyPrefixSearchMap(ctx context.Context, js jetstream.JetStream, kv jetstream.KeyValue, ...) (map[string]struct{}, error)
- func Load(ctx context.Context, wf jetstream.KeyValue, k string) ([]byte, error)
- func LoadLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, ...) ([]byte, error)
- func LoadLargeObj(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, ...) error
- func LoadObj(ctx context.Context, wf jetstream.KeyValue, k string, v proto.Message) error
- func Lock(ctx context.Context, kv jetstream.KeyValue, lockID string) (bool, error)
- func Log(ctx context.Context, js nats.JetStream, trackingID string, ...) error
- func NewOtelHandler() (slog.Handler, func() error)
- func NewSharHandler(opts HandlerOptions, logPublisher LogPublisher) slog.Handler
- func NewTextHandler(level slog.Level, addSource bool) slog.Handler
- func Process(ctx context.Context, js jetstream.JetStream, streamName string, ...) error
- func PublishObj(ctx context.Context, conn NatsConn, subject string, prot proto.Message, ...) error
- func PublishOnce(ctx context.Context, js jetstream.JetStream, lockingKV jetstream.KeyValue, ...) error
- func SafeDelete(ctx context.Context, kv jetstream.KeyValue, key string) error
- func Save(ctx context.Context, wf jetstream.KeyValue, k string, v []byte) error
- func SaveLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, ...) error
- func SaveLargeObj(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, ...) error
- func SaveObj(ctx context.Context, wf jetstream.KeyValue, k string, v proto.Message) error
- func SeekElement(process *model.Workflow, id string) *model.Element
- func StreamingReplyClient(ctx context.Context, nc *nats.Conn, msg *nats.Msg, ...) error
- func StreamingReplyServer(nc streamNatsReplyconnection, subject string, ...) (*nats.Subscription, error)
- func UnLock(ctx context.Context, kv jetstream.KeyValue, lockID string) error
- func UpdateLargeObj[T proto.Message](ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, ...) (T, error)
- func UpdateObj[T proto.Message](ctx context.Context, wf jetstream.KeyValue, k string, msg T, ...) error
- func UpdateObjIsNew[T proto.Message](ctx context.Context, wf jetstream.KeyValue, k string, msg T, ...) (bool, error)
- type BackoffFn
- type BackoffProcessOption
- type HandlerOptions
- type KeyPrefixResultOpts
- type LogPublisher
- type MultiHandler
- type NatsConn
- type NatsLogPublisher
- type NatsMsgWrapper
- func (w *NatsMsgWrapper) Ack() error
- func (w *NatsMsgWrapper) Data() []byte
- func (w *NatsMsgWrapper) DoubleAck(context.Context) error
- func (w *NatsMsgWrapper) Headers() nats.Header
- func (w *NatsMsgWrapper) InProgress() error
- func (w *NatsMsgWrapper) Metadata() (*jetstream.MsgMetadata, error)
- func (w *NatsMsgWrapper) Nak() error
- func (w *NatsMsgWrapper) NakWithDelay(delay time.Duration) error
- func (w *NatsMsgWrapper) Reply() string
- func (w *NatsMsgWrapper) SetData(b []byte)
- func (w *NatsMsgWrapper) Subject() string
- func (w *NatsMsgWrapper) Term() error
- type ProcessOption
- type ProcessOpts
- type SharHandler
- type TrackingID
Constants ¶
This section is empty.
Variables ¶
var ErrStreamCancel = errors.New("stream cancelled")
ErrStreamCancel is an error variable that represents a stream cancellation.
Functions ¶
func CheckVersion ¶ added in v1.1.927
CheckVersion checks the NATS server version against a minimum supported version
func ContextLoggerWithWfState ¶ added in v1.1.754
func ContextLoggerWithWfState(ctx context.Context, state *model.WorkflowState) (context.Context, *slog.Logger)
ContextLoggerWithWfState will populate a context with relevant fields from a WorkflowState model
func CopyWorkflowState ¶ added in v1.0.446
func CopyWorkflowState(state *model.WorkflowState) *model.WorkflowState
CopyWorkflowState - clones a proto model.WorkflowState for modification.
func DeleteLarge ¶ added in v1.0.498
func DeleteLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, k string) error
DeleteLarge deletes a large binary from the object store
func DropStateParams ¶ added in v1.0.451
func DropStateParams(state *model.WorkflowState)
DropStateParams removes any parameters unsafe to send across a state transition.
func ElementTable ¶ added in v0.1.134
ElementTable indexes an entire process for quick ID lookups
func EnsureBucket ¶ added in v1.0.477
func EnsureBucket(ctx context.Context, js jetstream.JetStream, storageType jetstream.StorageType, name string, ttl time.Duration) error
EnsureBucket creates a bucket if it does not exist
func EnsureBuckets ¶
func EnsureBuckets(ctx context.Context, js jetstream.JetStream, storageType jetstream.StorageType, names []string) error
EnsureBuckets ensures that a list of key value stores exist
func ExtendLock ¶ added in v1.0.477
ExtendLock extends the lock past its stale time.
func GetCallerInfo ¶ added in v1.1.1056
GetCallerInfo returns caller function info. Pass a skip of 1 for the current function, or 2 for its caller.
func IndexProcessElements ¶ added in v0.1.134
IndexProcessElements is the recursive part of the index
func KSuidTo128bit ¶ added in v0.1.78
KSuidTo128bit returns a KSuid as bytes.
func KSuidTo64bit ¶ added in v0.1.78
KSuidTo64bit takes the most variable 64 bits of a KSuid and returns them as bytes.
func KeyPrefixSearch ¶ added in v1.1.927
func KeyPrefixSearch(ctx context.Context, js jetstream.JetStream, kv jetstream.KeyValue, prefix string, opts KeyPrefixResultOpts) ([]string, error)
KeyPrefixSearch searches for keys in a key-value store that have a specified prefix. It retrieves the keys by querying the JetStream stream associated with the key-value store. It returns a slice of strings containing the keys, and an error if any.
func KeyPrefixSearchMap ¶ added in v1.1.1349
func KeyPrefixSearchMap(ctx context.Context, js jetstream.JetStream, kv jetstream.KeyValue, prefix string, opts KeyPrefixResultOpts) (map[string]struct{}, error)
KeyPrefixSearchMap searches for keys in a key-value store that have a specified prefix. It retrieves the keys by querying the JetStream stream associated with the key-value store. It returns a map of strings containing the keys, and an error if any.
func LoadLarge ¶ added in v1.0.498
func LoadLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, key string, opt ...jetstream.GetObjectOpt) ([]byte, error)
LoadLarge loads a large binary from the object store
func LoadLargeObj ¶ added in v1.0.498
func LoadLargeObj(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, k string, v proto.Message, opt ...jetstream.GetObjectOpt) error
LoadLargeObj loads a protobuf message from an object store
func Lock ¶ added in v1.0.477
Lock ensures a lock on a given ID, it returns true if a lock was granted.
func Log ¶ added in v1.0.271
func Log(ctx context.Context, js nats.JetStream, trackingID string, source model.LogSource, severity messages.WorkflowLogLevel, code int32, message string, attrs map[string]string) error
Log is the generic method to output to SHAR telemetry.
func NewOtelHandler ¶ added in v1.1.754
NewOtelHandler constructs and initialises an otel handler for log exports
func NewSharHandler ¶ added in v1.1.754
func NewSharHandler(opts HandlerOptions, logPublisher LogPublisher) slog.Handler
NewSharHandler will return a new instance of a SharHandler
func NewTextHandler ¶ added in v1.1.754
NewTextHandler initialises a text handler writing to stdout for slog
func Process ¶ added in v0.1.78
func Process(ctx context.Context, js jetstream.JetStream, streamName string, traceName string, closer chan struct{}, subject string, durable string, concurrency int, middleware []middleware.Receive, fn func(ctx context.Context, log *slog.Logger, msg jetstream.Msg) (bool, error), signalFatalErrFn func(ctx context.Context, state *model.WorkflowState, log *slog.Logger), opts ...ProcessOption) error
Process processes messages from a nats consumer and executes a function against each one.
func PublishObj ¶ added in v1.1.754
func PublishObj(ctx context.Context, conn NatsConn, subject string, prot proto.Message, middlewareFn func(*nats.Msg) error) error
PublishObj publishes a proto message to a subject.
func PublishOnce ¶ added in v1.1.732
func PublishOnce(ctx context.Context, js jetstream.JetStream, lockingKV jetstream.KeyValue, streamName string, consumerName string, msg *nats.Msg) error
PublishOnce sets up a single message to be used as a timer.
func SafeDelete ¶ added in v1.1.1441
SafeDelete deletes a key from a JetStream KV ensuring the value is present, and the operation is idempotent.
func SaveLarge ¶ added in v1.0.498
func SaveLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, key string, data []byte) error
SaveLarge saves a large binary to the object store
func SaveLargeObj ¶ added in v1.0.498
func SaveLargeObj(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, k string, v proto.Message, opt ...nats.ObjectOpt) error
SaveLargeObj saves a protobuf message to a document store
func SeekElement ¶ added in v1.1.1307
SeekElement locates an element from a workflow by ID
func StreamingReplyClient ¶ added in v1.1.975
func StreamingReplyClient(ctx context.Context, nc *nats.Conn, msg *nats.Msg, fn func(msg *nats.Msg) error) error
StreamingReplyClient establishes a streaming reply client. It creates a subscription for replies and invokes a callback function for each received message.
func StreamingReplyServer ¶ added in v1.1.975
func StreamingReplyServer(nc streamNatsReplyconnection, subject string, fn func(req *nats.Msg, ret chan *nats.Msg, errs chan error)) (*nats.Subscription, error)
StreamingReplyServer is a function that sets up a NATS subscription to handle streaming reply messages. When a message is received, it begins streaming by creating channels for return messages and error messages. It then executes the provided function to process the request and send the response messages. The function runs in a separate goroutine. It continuously listens for return messages and error messages, and publishes them to the reply inbox. It exits when an error or cancellation occurs. The function returns the NATS subscription and any error that occurred during setup.
func UpdateLargeObj ¶ added in v1.0.498
func UpdateLargeObj[T proto.Message](ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, k string, msg T, updateFn func(v T) (T, error)) (T, error)
UpdateLargeObj saves a protobuf message to a document store after using updateFn to update the message.
func UpdateObj ¶
func UpdateObj[T proto.Message](ctx context.Context, wf jetstream.KeyValue, k string, msg T, updateFn func(v T) (T, error)) error
UpdateObj saves a protobuf message to a key value store after using updateFn to update the message.
func UpdateObjIsNew ¶ added in v1.0.446
func UpdateObjIsNew[T proto.Message](ctx context.Context, wf jetstream.KeyValue, k string, msg T, updateFn func(v T) (T, error)) (bool, error)
UpdateObjIsNew saves a protobuf message to a key value store after using updateFn to update the message, and returns true if this is a new value.
Types ¶
type BackoffFn ¶ added in v1.0.623
BackoffFn represents a function that completely handles the backoff for a message including ACK/NAK
type BackoffProcessOption ¶ added in v1.0.623
type BackoffProcessOption struct {
// contains filtered or unexported fields
}
BackoffProcessOption holds the backoff function. Don't use this directly. Use the convenience function WithBackoffFn
func WithBackoffFn ¶ added in v1.0.623
func WithBackoffFn(fn BackoffFn) BackoffProcessOption
WithBackoffFn adds a back-off function to message processing
func (BackoffProcessOption) Set ¶ added in v1.0.623
func (b BackoffProcessOption) Set(opts *ProcessOpts)
Set the backoff function in the process settings
type HandlerOptions ¶ added in v1.1.754
HandlerOptions provides an ability to configure a shar slog handler
type KeyPrefixResultOpts ¶ added in v1.1.927
type KeyPrefixResultOpts struct {
	Sort           bool // Sort the returned values
	ExcludeDeleted bool // ExcludeDeleted filters deleted key-values from the result (cost penalty).
}
KeyPrefixResultOpts represents the options for KeyPrefixSearch function. Sort field indicates whether the returned values should be sorted. ExcludeDeleted field filters out deleted key-values from the result.
type LogPublisher ¶ added in v1.1.754
type LogPublisher interface {
Publish(ctx context.Context, lr *model.LogRequest) error
}
LogPublisher is an interface defining the ability to send a LogRequest to a destination
type MultiHandler ¶ added in v1.1.754
MultiHandler implements slog.Handler and wraps a collection of slog.Handlers that are delegated to
func NewMultiHandler ¶ added in v1.1.754
func NewMultiHandler(handlers []slog.Handler) *MultiHandler
NewMultiHandler creates a new instance of a multi handler
func (*MultiHandler) Enabled ¶ added in v1.1.754
Enabled always returns true as the actual decision as to whether a log record is emitted is delegated to the wrapped handlers in the Handle method
func (*MultiHandler) Handle ¶ added in v1.1.754
Handle will iterate over the slice of wrapped handlers and determine whether each handler is Enabled for the given record log level
type NatsConn ¶
type NatsConn interface {
	QueueSubscribe(subj string, queue string, cb nats.MsgHandler) (*nats.Subscription, error)
	Publish(subj string, bytes []byte) error
	PublishMsg(msg *nats.Msg) error
	Subscribe(subj string, cb nats.MsgHandler) (*nats.Subscription, error)
}
NatsConn is the trimmed down NATS Connection interface that only encompasses the methods used by SHAR
type NatsLogPublisher ¶ added in v1.1.754
type NatsLogPublisher struct {
Conn *nats.Conn
}
NatsLogPublisher is an implementation of LogPublisher sending a LogRequest to a destination NATS subject
func (*NatsLogPublisher) Publish ¶ added in v1.1.754
func (nlp *NatsLogPublisher) Publish(ctx context.Context, lr *model.LogRequest) error
Publish writes a LogRequest to a Nats subject
type NatsMsgWrapper ¶ added in v1.1.927
NatsMsgWrapper is a wrapper type that combines the jetstream.Msg and nats.Msg types.
func NewNatsMsgWrapper ¶ added in v1.1.927
func NewNatsMsgWrapper(msg *nats.Msg) *NatsMsgWrapper
NewNatsMsgWrapper is a function that creates a new instance of NatsMsgWrapper, which is a wrapper type that combines the jetstream.Msg and nats.Msg types.
func (*NatsMsgWrapper) Ack ¶ added in v1.1.927
func (w *NatsMsgWrapper) Ack() error
Ack is a method that acknowledges the receipt of the NATS message by calling the underlying nats.Msg's Ack method. If an error occurs while acknowledging the message, it returns an error with a message indicating the failure. Returns nil if the acknowledgement is successful.
func (*NatsMsgWrapper) Data ¶ added in v1.1.927
func (w *NatsMsgWrapper) Data() []byte
Data is a method that retrieves the data from the underlying nats.Msg.
func (*NatsMsgWrapper) DoubleAck ¶ added in v1.1.927
func (w *NatsMsgWrapper) DoubleAck(context.Context) error
DoubleAck is a method that simulates a double acknowledgement of the NATS message. It returns an error with a message indicating that double ack is not allowed.
func (*NatsMsgWrapper) Headers ¶ added in v1.1.927
func (w *NatsMsgWrapper) Headers() nats.Header
Headers is a method that retrieves the headers from the underlying nats.Msg.
func (*NatsMsgWrapper) InProgress ¶ added in v1.1.927
func (w *NatsMsgWrapper) InProgress() error
InProgress is a method that indicates that the message is still in progress.
func (*NatsMsgWrapper) Metadata ¶ added in v1.1.927
func (w *NatsMsgWrapper) Metadata() (*jetstream.MsgMetadata, error)
Metadata is a method that retrieves the metadata from the underlying nats.Msg.
func (*NatsMsgWrapper) Nak ¶ added in v1.1.927
func (w *NatsMsgWrapper) Nak() error
Nak is a method that negatively acknowledges (NAKs) the message.
func (*NatsMsgWrapper) NakWithDelay ¶ added in v1.1.927
func (w *NatsMsgWrapper) NakWithDelay(delay time.Duration) error
NakWithDelay is a method that nak's the message, and will not re-process before delay.
func (*NatsMsgWrapper) Reply ¶ added in v1.1.927
func (w *NatsMsgWrapper) Reply() string
Reply is a method that retrieves the reply from the underlying nats.Msg.
func (*NatsMsgWrapper) SetData ¶ added in v1.1.927
func (w *NatsMsgWrapper) SetData(b []byte)
SetData is a method that sets the data of the underlying nats.Msg.
func (*NatsMsgWrapper) Subject ¶ added in v1.1.927
func (w *NatsMsgWrapper) Subject() string
Subject is a method that retrieves the subject from the underlying nats.Msg.
func (*NatsMsgWrapper) Term ¶ added in v1.1.927
func (w *NatsMsgWrapper) Term() error
Term is a method that calls the `Term` method on the underlying nats.Msg.
type ProcessOption ¶ added in v1.0.623
type ProcessOption interface {
Set(opts *ProcessOpts)
}
ProcessOption represents an option function that can be passed to message processing.
type ProcessOpts ¶ added in v1.0.623
type ProcessOpts struct {
BackoffCalc BackoffFn
}
ProcessOpts holds the settings for message processing.
type SharHandler ¶ added in v1.1.754
type SharHandler struct {
// contains filtered or unexported fields
}
SharHandler is an implementation of a shar specific slog.Handler
func (*SharHandler) Enabled ¶ added in v1.1.754
Enabled determines whether or not a log message is written based on log level
func (*SharHandler) Handle ¶ added in v1.1.754
Handle will accept an slog.Record, transform it to a LogRequest and publish it to a NATS subject
type TrackingID ¶ added in v1.0.215
type TrackingID []string
TrackingID is an ID stack that maintains the callstack
func (TrackingID) Ancestor ¶ added in v1.0.215
func (t TrackingID) Ancestor(gen int) string
Ancestor provides the ID of the caller back <gen> generations.
func (TrackingID) NewID ¶ added in v1.1.1099
func (t TrackingID) NewID() TrackingID
NewID adds a new random ID to the trackingID
func (TrackingID) ParentID ¶ added in v1.0.215
func (t TrackingID) ParentID() string
ParentID provides the ID of the caller.
func (TrackingID) Pop ¶ added in v1.0.215
func (t TrackingID) Pop() TrackingID
Pop removes the current ID from the callstack.
func (TrackingID) Push ¶ added in v1.0.215
func (t TrackingID) Push(id string) TrackingID
Push adds a new ID to the callstack.