Documentation ¶
Index ¶
- func ContextLoggerWithWfState(ctx context.Context, state *model.WorkflowState) (context.Context, *slog.Logger)
- func CopyWorkflowState(state *model.WorkflowState) *model.WorkflowState
- func Delete(kv nats.KeyValue, key string) error
- func DeleteLarge(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, k string) error
- func DropStateParams(state *model.WorkflowState)
- func ElementTable(process *model.Workflow) map[string]*model.Element
- func EnsureBucket(js nats.JetStreamContext, storageType nats.StorageType, name string, ...) error
- func EnsureBuckets(js nats.JetStreamContext, storageType nats.StorageType, names []string) error
- func ExtendLock(kv nats.KeyValue, lockID string) error
- func IndexProcessElements(elements []*model.Element, el map[string]*model.Element)
- func KSuidTo128bit(k string) [16]byte
- func KSuidTo64bit(k string) [8]byte
- func Load(ctx context.Context, wf nats.KeyValue, k string) ([]byte, error)
- func LoadLarge(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, key string, ...) ([]byte, error)
- func LoadLargeObj(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, k string, ...) error
- func LoadObj(ctx context.Context, wf nats.KeyValue, k string, v proto.Message) error
- func Lock(kv nats.KeyValue, lockID string) (bool, error)
- func Log(ctx context.Context, js nats.JetStream, trackingID string, ...) error
- func NewOtelHandler() (slog.Handler, func() error)
- func NewSharHandler(opts HandlerOptions, logPublisher LogPublisher) slog.Handler
- func NewTextHandler(level slog.Level, addSource bool) slog.Handler
- func Process(ctx context.Context, js nats.JetStreamContext, streamName string, ...) error
- func PublishObj(ctx context.Context, conn NatsConn, subject string, prot proto.Message, ...) error
- func PublishOnce(js nats.JetStreamContext, lockingKV nats.KeyValue, streamName string, ...) error
- func RemoveWhere[T comparable](slice []T, fn func(T) bool) []T
- func Save(ctx context.Context, wf nats.KeyValue, k string, v []byte) error
- func SaveLarge(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, key string, ...) error
- func SaveLargeObj(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, k string, ...) error
- func SaveObj(ctx context.Context, wf nats.KeyValue, k string, v proto.Message) error
- func UnLock(kv nats.KeyValue, lockID string) error
- func UpdateLargeObj[T proto.Message](ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, k string, msg T, ...) (T, error)
- func UpdateObj[T proto.Message](ctx context.Context, wf nats.KeyValue, k string, msg T, ...) error
- func UpdateObjIsNew[T proto.Message](ctx context.Context, wf nats.KeyValue, k string, msg T, ...) (bool, error)
- type BackoffFn
- type BackoffProcessOption
- type HandlerOptions
- type LogPublisher
- type MultiHandler
- type NatsConn
- type NatsLogPublisher
- type ProcessOption
- type ProcessOpts
- type SharHandler
- type TrackingID
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ContextLoggerWithWfState ¶ added in v1.1.754
func ContextLoggerWithWfState(ctx context.Context, state *model.WorkflowState) (context.Context, *slog.Logger)
ContextLoggerWithWfState will populate a context with relevant fields from a WorkflowState model
func CopyWorkflowState ¶ added in v1.0.446
func CopyWorkflowState(state *model.WorkflowState) *model.WorkflowState
CopyWorkflowState - clones a proto model.WorkflowState for modification.
func DeleteLarge ¶ added in v1.0.498
DeleteLarge deletes a large binary from the object store
func DropStateParams ¶ added in v1.0.451
func DropStateParams(state *model.WorkflowState)
DropStateParams removes any parameters unsafe to send across a state transition.
func ElementTable ¶ added in v0.1.134
ElementTable indexes an entire process for quick ID lookups
func EnsureBucket ¶ added in v1.0.477
func EnsureBucket(js nats.JetStreamContext, storageType nats.StorageType, name string, ttl time.Duration) error
EnsureBucket creates a bucket if it does not exist
func EnsureBuckets ¶
EnsureBuckets ensures that a list of key value stores exist
func ExtendLock ¶ added in v1.0.477
ExtendLock extends the lock past its stale time.
func IndexProcessElements ¶ added in v0.1.134
IndexProcessElements is the recursive part of the index
func KSuidTo128bit ¶ added in v0.1.78
KSuidTo128bit returns a KSuid as bytes.
func KSuidTo64bit ¶ added in v0.1.78
KSuidTo64bit takes the most variable 64 bits of a KSuid and returns them as bytes.
func LoadLarge ¶ added in v1.0.498
func LoadLarge(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, key string, opt ...nats.GetObjectOpt) ([]byte, error)
LoadLarge loads a large binary from the object store
func LoadLargeObj ¶ added in v1.0.498
func LoadLargeObj(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, k string, v proto.Message, opt ...nats.GetObjectOpt) error
LoadLargeObj loads a protobuf message from an object store
func Lock ¶ added in v1.0.477
Lock ensures a lock on a given ID; it returns true if a lock was granted.
func Log ¶ added in v1.0.271
func Log(ctx context.Context, js nats.JetStream, trackingID string, source model.LogSource, severity messages.WorkflowLogLevel, code int32, message string, attrs map[string]string) error
Log is the generic method to output to SHAR telemetry.
func NewOtelHandler ¶ added in v1.1.754
NewOtelHandler constructs and initialises an otel handler for log exports
func NewSharHandler ¶ added in v1.1.754
func NewSharHandler(opts HandlerOptions, logPublisher LogPublisher) slog.Handler
NewSharHandler will return a new instance of a SharHandler
func NewTextHandler ¶ added in v1.1.754
NewTextHandler initialises a text handler writing to stdout for slog
func Process ¶ added in v0.1.78
func Process(ctx context.Context, js nats.JetStreamContext, streamName string, traceName string, closer chan struct{}, subject string, durable string, concurrency int, fn func(ctx context.Context, log *slog.Logger, msg *nats.Msg) (bool, error), opts ...ProcessOption) error
Process processes messages from a nats consumer and executes a function against each one.
func PublishObj ¶ added in v1.1.754
func PublishObj(ctx context.Context, conn NatsConn, subject string, prot proto.Message, middlewareFn func(*nats.Msg) error) error
PublishObj publishes a proto message to a subject.
func PublishOnce ¶ added in v1.1.732
func PublishOnce(js nats.JetStreamContext, lockingKV nats.KeyValue, streamName string, consumerName string, msg *nats.Msg) error
PublishOnce sets up a single message to be used as a timer.
func RemoveWhere ¶ added in v1.0.484
func RemoveWhere[T comparable](slice []T, fn func(T) bool) []T
RemoveWhere removes an array element based upon a condition.
func SaveLarge ¶ added in v1.0.498
func SaveLarge(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, key string, data []byte, opt ...nats.ObjectOpt) error
SaveLarge saves a large binary to the object store
func SaveLargeObj ¶ added in v1.0.498
func SaveLargeObj(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, k string, v proto.Message, opt ...nats.ObjectOpt) error
SaveLargeObj saves a protobuf message to a document store
func UpdateLargeObj ¶ added in v1.0.498
func UpdateLargeObj[T proto.Message](ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, k string, msg T, updateFn func(v T) (T, error)) (T, error)
UpdateLargeObj saves a protobuf message to a document store after using updateFn to update the message.
func UpdateObj ¶
func UpdateObj[T proto.Message](ctx context.Context, wf nats.KeyValue, k string, msg T, updateFn func(v T) (T, error)) error
UpdateObj saves a protobuf message to a key value store after using updateFn to update the message.
func UpdateObjIsNew ¶ added in v1.0.446
func UpdateObjIsNew[T proto.Message](ctx context.Context, wf nats.KeyValue, k string, msg T, updateFn func(v T) (T, error)) (bool, error)
UpdateObjIsNew saves a protobuf message to a key value store after using updateFn to update the message, and returns true if this is a new value.
Types ¶
type BackoffFn ¶ added in v1.0.623
BackoffFn represents a function that completely handles the backoff for a message including ACK/NAK
type BackoffProcessOption ¶ added in v1.0.623
type BackoffProcessOption struct {
// contains filtered or unexported fields
}
BackoffProcessOption holds the backoff function. Don't use this directly. Use the convenience function WithBackoffFn
func WithBackoffFn ¶ added in v1.0.623
func WithBackoffFn(fn BackoffFn) BackoffProcessOption
WithBackoffFn adds a back-off function to message processing
func (BackoffProcessOption) Set ¶ added in v1.0.623
func (b BackoffProcessOption) Set(opts *ProcessOpts)
Set the backoff function in the process settings
type HandlerOptions ¶ added in v1.1.754
HandlerOptions provides an ability to configure a shar slog handler
type LogPublisher ¶ added in v1.1.754
type LogPublisher interface {
Publish(ctx context.Context, lr *model.LogRequest) error
}
LogPublisher is an interface defining the ability to send a LogRequest to a destination
type MultiHandler ¶ added in v1.1.754
MultiHandler implements slog.Handler and wraps a collection of slog.Handlers that are delegated to
func NewMultiHandler ¶ added in v1.1.754
func NewMultiHandler(handlers []slog.Handler) *MultiHandler
NewMultiHandler creates a new instance of a multi handler
func (*MultiHandler) Enabled ¶ added in v1.1.754
Enabled always returns true as the actual decision as to whether a log record is emitted is delegated to the wrapped handlers in the Handle method
func (*MultiHandler) Handle ¶ added in v1.1.754
Handle will iterate over the slice of wrapped handlers and determine whether each handler is Enabled for the given record log level
type NatsConn ¶
type NatsConn interface {
JetStream(opts ...nats.JSOpt) (nats.JetStreamContext, error)
QueueSubscribe(subj string, queue string, cb nats.MsgHandler) (*nats.Subscription, error)
Publish(subj string, bytes []byte) error
PublishMsg(msg *nats.Msg) error
}
NatsConn is the trimmed-down NATS Connection interface that only encompasses the methods used by SHAR
type NatsLogPublisher ¶ added in v1.1.754
type NatsLogPublisher struct {
Conn *nats.Conn
}
NatsLogPublisher is an impl of LogPublisher sending a LogRequest to a destination nats subject
func (*NatsLogPublisher) Publish ¶ added in v1.1.754
func (nlp *NatsLogPublisher) Publish(ctx context.Context, lr *model.LogRequest) error
Publish writes a LogRequest to a Nats subject
type ProcessOption ¶ added in v1.0.623
type ProcessOption interface {
Set(opts *ProcessOpts)
}
ProcessOption represents an option function that can be passed to message processing.
type ProcessOpts ¶ added in v1.0.623
type ProcessOpts struct {
BackoffCalc BackoffFn
}
ProcessOpts holds the settings for message processing.
type SharHandler ¶ added in v1.1.754
type SharHandler struct {
// contains filtered or unexported fields
}
SharHandler is an implementation of a shar specific slog.Handler
func (*SharHandler) Enabled ¶ added in v1.1.754
Enabled determines whether or not a log message is written based on log level
func (*SharHandler) Handle ¶ added in v1.1.754
Handle will accept an slog.Record, transform it to a LogRequest and publish it to a NATS subject
type TrackingID ¶ added in v1.0.215
type TrackingID []string
TrackingID is an ID stack that maintains the callstack
func (TrackingID) Ancestor ¶ added in v1.0.215
func (t TrackingID) Ancestor(gen int) string
Ancestor provides the ID of the caller back <gen> generations.
func (TrackingID) ParentID ¶ added in v1.0.215
func (t TrackingID) ParentID() string
ParentID provides the ID of the caller.
func (TrackingID) Pop ¶ added in v1.0.215
func (t TrackingID) Pop() TrackingID
Pop removes the current ID from the callstack.
func (TrackingID) Push ¶ added in v1.0.215
func (t TrackingID) Push(id string) TrackingID
Push adds a new ID to the callstack.