Documentation ¶
Index ¶
- func CopyWorkflowState(state *model.WorkflowState) *model.WorkflowState
- func Delete(kv nats.KeyValue, key string) error
- func DeleteLarge(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, k string) error
- func DropStateParams(state *model.WorkflowState)
- func ElementTable(process *model.Workflow) map[string]*model.Element
- func EnsureBucket(js nats.JetStreamContext, storageType nats.StorageType, name string, ...) error
- func EnsureBuckets(js nats.JetStreamContext, storageType nats.StorageType, names []string) error
- func ExtendLock(kv nats.KeyValue, lockID string) error
- func IndexProcessElements(elements []*model.Element, el map[string]*model.Element)
- func KSuidTo128bit(k string) [16]byte
- func KSuidTo64bit(k string) [8]byte
- func Load(ctx context.Context, wf nats.KeyValue, k string) ([]byte, error)
- func LoadLarge(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, key string, ...) ([]byte, error)
- func LoadLargeObj(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, k string, ...) error
- func LoadObj(ctx context.Context, wf nats.KeyValue, k string, v proto.Message) error
- func Lock(kv nats.KeyValue, lockID string) (bool, error)
- func Log(ctx context.Context, js nats.JetStream, trackingID string, ...) error
- func Process(ctx context.Context, js nats.JetStreamContext, traceName string, ...) error
- func RemoveWhere[T comparable](slice []T, fn func(T) bool) []T
- func Save(ctx context.Context, wf nats.KeyValue, k string, v []byte) error
- func SaveLarge(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, key string, ...) error
- func SaveLargeObj(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, k string, ...) error
- func SaveObj(ctx context.Context, wf nats.KeyValue, k string, v proto.Message) error
- func UnLock(kv nats.KeyValue, lockID string) error
- func UpdateLargeObj[T proto.Message](ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, k string, msg T, ...) (T, error)
- func UpdateObj[T proto.Message](ctx context.Context, wf nats.KeyValue, k string, msg T, ...) error
- func UpdateObjIsNew[T proto.Message](ctx context.Context, wf nats.KeyValue, k string, msg T, ...) (bool, error)
- type NatsConn
- type TrackingID
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CopyWorkflowState ¶ added in v1.0.446
func CopyWorkflowState(state *model.WorkflowState) *model.WorkflowState
CopyWorkflowState - clones a proto model.WorkflowState for modification.
func DeleteLarge ¶ added in v1.0.498
DeleteLarge deletes a large binary from the object store
func DropStateParams ¶ added in v1.0.451
func DropStateParams(state *model.WorkflowState)
DropStateParams removes any parameters unsafe to send across a state transition.
func ElementTable ¶ added in v0.1.134
ElementTable indexes an entire process for quick ID lookups
func EnsureBucket ¶ added in v1.0.477
func EnsureBucket(js nats.JetStreamContext, storageType nats.StorageType, name string, ttl time.Duration) error
EnsureBucket creates a bucket if it does not exist
func EnsureBuckets ¶
EnsureBuckets ensures that a list of key value stores exist
func ExtendLock ¶ added in v1.0.477
ExtendLock extends the lock past its stale time.
func IndexProcessElements ¶ added in v0.1.134
IndexProcessElements is the recursive part of the index
func KSuidTo128bit ¶ added in v0.1.78
KSuidTo128bit returns a KSuid as bytes.
func KSuidTo64bit ¶ added in v0.1.78
KSuidTo64bit takes the most variable 64 bits of a KSuid and returns them as bytes.
func LoadLarge ¶ added in v1.0.498
func LoadLarge(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, key string, opt ...nats.GetObjectOpt) ([]byte, error)
LoadLarge loads a large binary from the object store.
func LoadLargeObj ¶ added in v1.0.498
func LoadLargeObj(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, k string, v proto.Message, opt ...nats.GetObjectOpt) error
LoadLargeObj loads a protobuf message from an object store.
func Lock ¶ added in v1.0.477
Lock ensures a lock on a given ID; it returns true if a lock was granted.
func Log ¶ added in v1.0.271
func Log(ctx context.Context, js nats.JetStream, trackingID string, source model.LogSource, severity messages.WorkflowLogLevel, code int32, message string, attrs map[string]string) error
Log is the generic method to output to SHAR telemetry.
func Process ¶ added in v0.1.78
func Process(ctx context.Context, js nats.JetStreamContext, traceName string, closer chan struct{}, subject string, durable string, concurrency int, fn func(ctx context.Context, log *slog.Logger, msg *nats.Msg) (bool, error)) error
Process processes messages from a nats consumer and executes a function against each one.
func RemoveWhere ¶ added in v1.0.484
func RemoveWhere[T comparable](slice []T, fn func(T) bool) []T
RemoveWhere removes an array element based upon a condition.
func SaveLarge ¶ added in v1.0.498
func SaveLarge(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, key string, data []byte, opt ...nats.ObjectOpt) error
SaveLarge saves a large binary to the object store.
func SaveLargeObj ¶ added in v1.0.498
func SaveLargeObj(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, k string, v proto.Message, opt ...nats.ObjectOpt) error
SaveLargeObj saves a protobuf message to a document store.
func UpdateLargeObj ¶ added in v1.0.498
func UpdateLargeObj[T proto.Message](ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, k string, msg T, updateFn func(v T) (T, error)) (T, error)
UpdateLargeObj saves a protobuf message to a document store after using updateFn to update the message.
func UpdateObj ¶
func UpdateObj[T proto.Message](ctx context.Context, wf nats.KeyValue, k string, msg T, updateFn func(v T) (T, error)) error
UpdateObj saves a protobuf message to a key value store after using updateFn to update the message.
func UpdateObjIsNew ¶ added in v1.0.446
func UpdateObjIsNew[T proto.Message](ctx context.Context, wf nats.KeyValue, k string, msg T, updateFn func(v T) (T, error)) (bool, error)
UpdateObjIsNew saves a protobuf message to a key value store after using updateFn to update the message, and returns true if this is a new value.
Types ¶
type NatsConn ¶
type NatsConn interface { JetStream(opts ...nats.JSOpt) (nats.JetStreamContext, error) QueueSubscribe(subj string, queue string, cb nats.MsgHandler) (*nats.Subscription, error) }
NatsConn is the trimmed-down NATS connection interface that only encompasses the methods used by SHAR.
type TrackingID ¶ added in v1.0.215
type TrackingID []string
TrackingID is an ID stack that maintains the callstack
func (TrackingID) Ancestor ¶ added in v1.0.215
func (t TrackingID) Ancestor(gen int) string
Ancestor provides the ID of the caller back <gen> generations.
func (TrackingID) ParentID ¶ added in v1.0.215
func (t TrackingID) ParentID() string
ParentID provides the ID of the caller.
func (TrackingID) Pop ¶ added in v1.0.215
func (t TrackingID) Pop() TrackingID
Pop removes the current ID from the callstack.
func (TrackingID) Push ¶ added in v1.0.215
func (t TrackingID) Push(id string) TrackingID
Push adds a new ID to the callstack.