Documentation ¶
Index ¶
- func CheckVersion(ctx context.Context, nc *nats.Conn) error
- func ContextLoggerWithWfState(ctx context.Context, state *model.WorkflowState) (context.Context, *slog.Logger)
- func CopyWorkflowState(state *model.WorkflowState) *model.WorkflowState
- func Delete(ctx context.Context, kv jetstream.KeyValue, key string) error
- func DeleteLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, ...) error
- func DropStateParams(state *model.WorkflowState)
- func ElementTable(process *model.Workflow) map[string]*model.Element
- func EnsureBucket(ctx context.Context, js jetstream.JetStream, storageType jetstream.StorageType, ...) error
- func EnsureBuckets(ctx context.Context, js jetstream.JetStream, storageType jetstream.StorageType, ...) error
- func ExtendLock(ctx context.Context, kv jetstream.KeyValue, lockID string) error
- func IndexProcessElements(elements []*model.Element, el map[string]*model.Element)
- func KSuidTo128bit(k string) [16]byte
- func KSuidTo64bit(k string) [8]byte
- func KeyPrefixSearch(ctx context.Context, js jetstream.JetStream, kv jetstream.KeyValue, ...) ([]string, error)
- func Load(ctx context.Context, wf jetstream.KeyValue, k string) ([]byte, error)
- func LoadLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, ...) ([]byte, error)
- func LoadLargeObj(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, ...) error
- func LoadObj(ctx context.Context, wf jetstream.KeyValue, k string, v proto.Message) error
- func Lock(ctx context.Context, kv jetstream.KeyValue, lockID string) (bool, error)
- func Log(ctx context.Context, js nats.JetStream, trackingID string, ...) error
- func NewOtelHandler() (slog.Handler, func() error)
- func NewSharHandler(opts HandlerOptions, logPublisher LogPublisher) slog.Handler
- func NewTextHandler(level slog.Level, addSource bool) slog.Handler
- func Process(ctx context.Context, js jetstream.JetStream, streamName string, ...) error
- func PublishObj(ctx context.Context, conn NatsConn, subject string, prot proto.Message, ...) error
- func PublishOnce(ctx context.Context, js jetstream.JetStream, lockingKV jetstream.KeyValue, ...) error
- func RemoveWhere[T comparable](slice []T, fn func(T) bool) []T
- func Save(ctx context.Context, wf jetstream.KeyValue, k string, v []byte) error
- func SaveLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, ...) error
- func SaveLargeObj(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, ...) error
- func SaveObj(ctx context.Context, wf jetstream.KeyValue, k string, v proto.Message) error
- func UnLock(ctx context.Context, kv jetstream.KeyValue, lockID string) error
- func UpdateLargeObj[T proto.Message](ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, ...) (T, error)
- func UpdateObj[T proto.Message](ctx context.Context, wf jetstream.KeyValue, k string, msg T, ...) error
- func UpdateObjIsNew[T proto.Message](ctx context.Context, wf jetstream.KeyValue, k string, msg T, ...) (bool, error)
- type BackoffFn
- type BackoffProcessOption
- type HandlerOptions
- type KeyPrefixResultOpts
- type LogPublisher
- type MultiHandler
- type NatsConn
- type NatsLogPublisher
- type NatsMsgWrapper
- func (w *NatsMsgWrapper) Ack() error
- func (w *NatsMsgWrapper) Data() []byte
- func (w *NatsMsgWrapper) DoubleAck(context.Context) error
- func (w *NatsMsgWrapper) Headers() nats.Header
- func (w *NatsMsgWrapper) InProgress() error
- func (w *NatsMsgWrapper) Metadata() (*jetstream.MsgMetadata, error)
- func (w *NatsMsgWrapper) Nak() error
- func (w *NatsMsgWrapper) NakWithDelay(delay time.Duration) error
- func (w *NatsMsgWrapper) Reply() string
- func (w *NatsMsgWrapper) SetData(b []byte)
- func (w *NatsMsgWrapper) Subject() string
- func (w *NatsMsgWrapper) Term() error
- type ProcessOption
- type ProcessOpts
- type SharHandler
- type TrackingID
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CheckVersion ¶ added in v1.1.927
CheckVersion checks the NATS server version against a minimum supported version
func ContextLoggerWithWfState ¶ added in v1.1.754
func ContextLoggerWithWfState(ctx context.Context, state *model.WorkflowState) (context.Context, *slog.Logger)
ContextLoggerWithWfState will populate a context with relevant fields from a WorkflowState model
func CopyWorkflowState ¶ added in v1.0.446
func CopyWorkflowState(state *model.WorkflowState) *model.WorkflowState
CopyWorkflowState - clones a proto model.WorkflowState for modification.
func DeleteLarge ¶ added in v1.0.498
func DeleteLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, k string) error
DeleteLarge deletes a large binary from the object store
func DropStateParams ¶ added in v1.0.451
func DropStateParams(state *model.WorkflowState)
DropStateParams removes any parameters unsafe to send across a state transition.
func ElementTable ¶ added in v0.1.134
ElementTable indexes an entire process for quick ID lookups
func EnsureBucket ¶ added in v1.0.477
func EnsureBucket(ctx context.Context, js jetstream.JetStream, storageType jetstream.StorageType, name string, ttl time.Duration) error
EnsureBucket creates a bucket if it does not exist
func EnsureBuckets ¶
func EnsureBuckets(ctx context.Context, js jetstream.JetStream, storageType jetstream.StorageType, names []string) error
EnsureBuckets ensures that a list of key value stores exist
func ExtendLock ¶ added in v1.0.477
ExtendLock extends the lock past its stale time.
func IndexProcessElements ¶ added in v0.1.134
IndexProcessElements is the recursive part of the index
func KSuidTo128bit ¶ added in v0.1.78
KSuidTo128bit returns a KSuid as bytes.
func KSuidTo64bit ¶ added in v0.1.78
KSuidTo64bit takes the most variable 64 bits of a KSuid and returns them as bytes.
func KeyPrefixSearch ¶ added in v1.1.927
func KeyPrefixSearch(ctx context.Context, js jetstream.JetStream, kv jetstream.KeyValue, prefix string, opts KeyPrefixResultOpts) ([]string, error)
KeyPrefixSearch searches for keys in a key-value store that have a specified prefix. It retrieves the keys by querying the JetStream stream associated with the key-value store. It returns a slice of strings containing the keys, and an error if any.
func LoadLarge ¶ added in v1.0.498
func LoadLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, key string, opt ...jetstream.GetObjectOpt) ([]byte, error)
LoadLarge loads a large binary from the object store
func LoadLargeObj ¶ added in v1.0.498
func LoadLargeObj(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, k string, v proto.Message, opt ...jetstream.GetObjectOpt) error
LoadLargeObj loads a protobuf message from an object store
func Lock ¶ added in v1.0.477
Lock ensures a lock on a given ID; it returns true if the lock was granted.
func Log ¶ added in v1.0.271
func Log(ctx context.Context, js nats.JetStream, trackingID string, source model.LogSource, severity messages.WorkflowLogLevel, code int32, message string, attrs map[string]string) error
Log is the generic method to output to SHAR telemetry.
func NewOtelHandler ¶ added in v1.1.754
NewOtelHandler constructs and initialises an otel handler for log exports
func NewSharHandler ¶ added in v1.1.754
func NewSharHandler(opts HandlerOptions, logPublisher LogPublisher) slog.Handler
NewSharHandler will return a new instance of a SharHandler
func NewTextHandler ¶ added in v1.1.754
NewTextHandler initialises a text handler writing to stdout for slog
func Process ¶ added in v0.1.78
func Process( ctx context.Context, js jetstream.JetStream, streamName string, traceName string, closer chan struct{}, subject string, durable string, concurrency int, middleware []middleware.Receive, fn func(ctx context.Context, log *slog.Logger, msg jetstream.Msg) (bool, error), opts ...ProcessOption, ) error
Process processes messages from a nats consumer and executes a function against each one.
func PublishObj ¶ added in v1.1.754
func PublishObj(ctx context.Context, conn NatsConn, subject string, prot proto.Message, middlewareFn func(*nats.Msg) error) error
PublishObj publishes a proto message to a subject.
func PublishOnce ¶ added in v1.1.732
func PublishOnce(ctx context.Context, js jetstream.JetStream, lockingKV jetstream.KeyValue, streamName string, consumerName string, msg *nats.Msg) error
PublishOnce sets up a single message to be used as a timer.
func RemoveWhere ¶ added in v1.0.484
func RemoveWhere[T comparable](slice []T, fn func(T) bool) []T
RemoveWhere removes an array element based upon a condition.
func SaveLarge ¶ added in v1.0.498
func SaveLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, key string, data []byte) error
SaveLarge saves a large binary to the object store
func SaveLargeObj ¶ added in v1.0.498
func SaveLargeObj(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, k string, v proto.Message, opt ...nats.ObjectOpt) error
SaveLargeObj saves a protobuf message to a document store
func UpdateLargeObj ¶ added in v1.0.498
func UpdateLargeObj[T proto.Message](ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, k string, msg T, updateFn func(v T) (T, error)) (T, error)
UpdateLargeObj saves a protobuf message to a document store after using updateFn to update the message.
func UpdateObj ¶
func UpdateObj[T proto.Message](ctx context.Context, wf jetstream.KeyValue, k string, msg T, updateFn func(v T) (T, error)) error
UpdateObj saves a protobuf message to a key value store after using updateFn to update the message.
func UpdateObjIsNew ¶ added in v1.0.446
func UpdateObjIsNew[T proto.Message](ctx context.Context, wf jetstream.KeyValue, k string, msg T, updateFn func(v T) (T, error)) (bool, error)
UpdateObjIsNew saves a protobuf message to a key value store after using updateFn to update the message, and returns true if this is a new value.
Types ¶
type BackoffFn ¶ added in v1.0.623
BackoffFn represents a function that completely handles the backoff for a message including ACK/NAK
type BackoffProcessOption ¶ added in v1.0.623
type BackoffProcessOption struct {
// contains filtered or unexported fields
}
BackoffProcessOption holds the backoff function. Don't use this directly. Use the convenience function WithBackoffFn
func WithBackoffFn ¶ added in v1.0.623
func WithBackoffFn(fn BackoffFn) BackoffProcessOption
WithBackoffFn adds a back-off function to message processing
func (BackoffProcessOption) Set ¶ added in v1.0.623
func (b BackoffProcessOption) Set(opts *ProcessOpts)
Set the backoff function in the process settings
type HandlerOptions ¶ added in v1.1.754
HandlerOptions provides an ability to configure a shar slog handler
type KeyPrefixResultOpts ¶ added in v1.1.927
type KeyPrefixResultOpts struct {
Sort bool // Sort the returned values
ExcludeDeleted bool // ExcludeDeleted filters deleted key-values from the result (cost penalty).
}
KeyPrefixResultOpts represents the options for KeyPrefixSearch function. Sort field indicates whether the returned values should be sorted. ExcludeDeleted field filters out deleted key-values from the result.
type LogPublisher ¶ added in v1.1.754
type LogPublisher interface {
Publish(ctx context.Context, lr *model.LogRequest) error
}
LogPublisher is an interface defining the ability to send a LogRequest to a destination
type MultiHandler ¶ added in v1.1.754
MultiHandler implements slog.Handler and wraps a collection of slog.Handlers that are delegated to
func NewMultiHandler ¶ added in v1.1.754
func NewMultiHandler(handlers []slog.Handler) *MultiHandler
NewMultiHandler creates a new instance of a multi handler
func (*MultiHandler) Enabled ¶ added in v1.1.754
Enabled always returns true as the actual decision as to whether a log record is emitted is delegated to the wrapped handlers in the Handle method
func (*MultiHandler) Handle ¶ added in v1.1.754
Handle will iterate over the slice of wrapped handlers and determine whether each handler is Enabled for the given record log level
type NatsConn ¶
type NatsConn interface { QueueSubscribe(subj string, queue string, cb nats.MsgHandler) (*nats.Subscription, error) Publish(subj string, bytes []byte) error PublishMsg(msg *nats.Msg) error }
NatsConn is the trimmed down NATS Connection interface that only encompasses the methods used by SHAR
type NatsLogPublisher ¶ added in v1.1.754
type NatsLogPublisher struct {
Conn *nats.Conn
}
NatsLogPublisher is an implementation of LogPublisher sending a LogRequest to a destination NATS subject
func (*NatsLogPublisher) Publish ¶ added in v1.1.754
func (nlp *NatsLogPublisher) Publish(ctx context.Context, lr *model.LogRequest) error
Publish writes a LogRequest to a Nats subject
type NatsMsgWrapper ¶ added in v1.1.927
NatsMsgWrapper is a wrapper type that combines the jetstream.Msg and nats.Msg types.
func NewNatsMsgWrapper ¶ added in v1.1.927
func NewNatsMsgWrapper(msg *nats.Msg) *NatsMsgWrapper
NewNatsMsgWrapper is a function that creates a new instance of NatsMsgWrapper, which is a wrapper type that combines the jetstream.Msg and nats.Msg types.
func (*NatsMsgWrapper) Ack ¶ added in v1.1.927
func (w *NatsMsgWrapper) Ack() error
Ack is a method that acknowledges the receipt of the NATS message by calling the underlying nats.Msg's Ack method. If an error occurs while acknowledging the message, it returns an error with a message indicating the failure. Returns nil if the acknowledgement is successful.
func (*NatsMsgWrapper) Data ¶ added in v1.1.927
func (w *NatsMsgWrapper) Data() []byte
Data is a method that retrieves the data from the underlying nats.Msg.
func (*NatsMsgWrapper) DoubleAck ¶ added in v1.1.927
func (w *NatsMsgWrapper) DoubleAck(context.Context) error
DoubleAck is a method that simulates a double acknowledgement of the NATS message. It returns an error with a message indicating that double ack is not allowed.
func (*NatsMsgWrapper) Headers ¶ added in v1.1.927
func (w *NatsMsgWrapper) Headers() nats.Header
Headers is a method that retrieves the headers from the underlying nats.Msg.
func (*NatsMsgWrapper) InProgress ¶ added in v1.1.927
func (w *NatsMsgWrapper) InProgress() error
InProgress is a method that indicates that the message is still in progress.
func (*NatsMsgWrapper) Metadata ¶ added in v1.1.927
func (w *NatsMsgWrapper) Metadata() (*jetstream.MsgMetadata, error)
Metadata is a method that retrieves the metadata from the underlying nats.Msg.
func (*NatsMsgWrapper) Nak ¶ added in v1.1.927
func (w *NatsMsgWrapper) Nak() error
Nak is a method that nak's the message.
func (*NatsMsgWrapper) NakWithDelay ¶ added in v1.1.927
func (w *NatsMsgWrapper) NakWithDelay(delay time.Duration) error
NakWithDelay is a method that nak's the message, and will not re-process before delay.
func (*NatsMsgWrapper) Reply ¶ added in v1.1.927
func (w *NatsMsgWrapper) Reply() string
Reply is a method that retrieves the reply from the underlying nats.Msg.
func (*NatsMsgWrapper) SetData ¶ added in v1.1.927
func (w *NatsMsgWrapper) SetData(b []byte)
SetData is a method that sets the data of the underlying nats.Msg.
func (*NatsMsgWrapper) Subject ¶ added in v1.1.927
func (w *NatsMsgWrapper) Subject() string
Subject is a method that retrieves the subject from the underlying nats.Msg.
func (*NatsMsgWrapper) Term ¶ added in v1.1.927
func (w *NatsMsgWrapper) Term() error
Term is a method that calls the `Term` method on the underlying `nats.Msg` instance.
type ProcessOption ¶ added in v1.0.623
type ProcessOption interface {
Set(opts *ProcessOpts)
}
ProcessOption represents an option function that can be passed to message processing.
type ProcessOpts ¶ added in v1.0.623
type ProcessOpts struct {
BackoffCalc BackoffFn
}
ProcessOpts holds the settings for message processing.
type SharHandler ¶ added in v1.1.754
type SharHandler struct {
// contains filtered or unexported fields
}
SharHandler is an implementation of a shar specific slog.Handler
func (*SharHandler) Enabled ¶ added in v1.1.754
Enabled determines whether or not a log message is written based on log level
func (*SharHandler) Handle ¶ added in v1.1.754
Handle will accept an slog.Record, transform it to a LogRequest and publish it to a NATS subject
type TrackingID ¶ added in v1.0.215
type TrackingID []string
TrackingID is an ID stack that maintains the callstack
func (TrackingID) Ancestor ¶ added in v1.0.215
func (t TrackingID) Ancestor(gen int) string
Ancestor provides the ID of the caller back <gen> generations.
func (TrackingID) ParentID ¶ added in v1.0.215
func (t TrackingID) ParentID() string
ParentID provides the ID of the caller.
func (TrackingID) Pop ¶ added in v1.0.215
func (t TrackingID) Pop() TrackingID
Pop removes the current ID from the callstack.
func (TrackingID) Push ¶ added in v1.0.215
func (t TrackingID) Push(id string) TrackingID
Push adds a new ID to the callstack.