common

package
v1.1.680 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 25, 2023 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CopyWorkflowState added in v1.0.446

func CopyWorkflowState(state *model.WorkflowState) *model.WorkflowState

CopyWorkflowState - clones a proto model.WorkflowState for modification.

func Delete added in v0.1.192

func Delete(kv nats.KeyValue, key string) error

Delete deletes an item from a key value store.

func DeleteLarge added in v1.0.498

func DeleteLarge(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, k string) error

DeleteLarge deletes a large binary from the object store

func DropStateParams added in v1.0.451

func DropStateParams(state *model.WorkflowState)

DropStateParams removes any parameters unsafe to send across a state transition.

func ElementTable added in v0.1.134

func ElementTable(process *model.Workflow) map[string]*model.Element

ElementTable indexes an entire process for quick ID lookups

func EnsureBucket added in v1.0.477

func EnsureBucket(js nats.JetStreamContext, storageType nats.StorageType, name string, ttl time.Duration) error

EnsureBucket creates a bucket if it does not exist

func EnsureBuckets

func EnsureBuckets(js nats.JetStreamContext, storageType nats.StorageType, names []string) error

EnsureBuckets ensures that a list of key value stores exist

func ExtendLock added in v1.0.477

func ExtendLock(kv nats.KeyValue, lockID string) error

ExtendLock extends the lock past its stale time.

func IndexProcessElements added in v0.1.134

func IndexProcessElements(elements []*model.Element, el map[string]*model.Element)

IndexProcessElements is the recursive part of the index

func KSuidTo128bit added in v0.1.78

func KSuidTo128bit(k string) [16]byte

KSuidTo128bit returns a KSuid as bytes.

func KSuidTo64bit added in v0.1.78

func KSuidTo64bit(k string) [8]byte

KSuidTo64bit takes the most variable 64 bits of a KSuid and returns them as bytes.

func Load

func Load(ctx context.Context, wf nats.KeyValue, k string) ([]byte, error)

Load loads a value from a key value store

func LoadLarge added in v1.0.498

func LoadLarge(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, key string, opt ...nats.GetObjectOpt) ([]byte, error)

LoadLarge loads a large binary from the object store.

func LoadLargeObj added in v1.0.498

func LoadLargeObj(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, k string, v proto.Message, opt ...nats.GetObjectOpt) error

LoadLargeObj loads a protobuf message from a document store.

func LoadObj

func LoadObj(ctx context.Context, wf nats.KeyValue, k string, v proto.Message) error

LoadObj loads a protobuf message from a key value store

func Lock added in v1.0.477

func Lock(kv nats.KeyValue, lockID string) (bool, error)

Lock ensures a lock on a given ID; it returns true if a lock was granted.

func Log added in v1.0.271

func Log(ctx context.Context, js nats.JetStream, trackingID string, source model.LogSource, severity messages.WorkflowLogLevel, code int32, message string, attrs map[string]string) error

Log is the generic method to output to SHAR telemetry.

func Process added in v0.1.78

func Process(ctx context.Context, js nats.JetStreamContext, traceName string, closer chan struct{}, subject string, durable string, concurrency int, fn func(ctx context.Context, log *slog.Logger, msg *nats.Msg) (bool, error), opts ...ProcessOption) error

Process processes messages from a nats consumer and executes a function against each one.

func RemoveWhere added in v1.0.484

func RemoveWhere[T comparable](slice []T, fn func(T) bool) []T

RemoveWhere removes an array element based upon a condition.

func Save

func Save(ctx context.Context, wf nats.KeyValue, k string, v []byte) error

Save saves a value to a key value store

func SaveLarge added in v1.0.498

func SaveLarge(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, key string, data []byte, opt ...nats.ObjectOpt) error

SaveLarge saves a large binary to the object store.

func SaveLargeObj added in v1.0.498

func SaveLargeObj(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, k string, v proto.Message, opt ...nats.ObjectOpt) error

SaveLargeObj saves a protobuf message to a document store.

func SaveObj

func SaveObj(ctx context.Context, wf nats.KeyValue, k string, v proto.Message) error

SaveObj saves a protobuf message to a key value store.

func UnLock added in v1.0.477

func UnLock(kv nats.KeyValue, lockID string) error

UnLock closes an existing lock.

func UpdateLargeObj added in v1.0.498

func UpdateLargeObj[T proto.Message](ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, k string, msg T, updateFn func(v T) (T, error)) (T, error)

UpdateLargeObj saves a protobuf message to a document store after using updateFn to update the message.

func UpdateObj

func UpdateObj[T proto.Message](ctx context.Context, wf nats.KeyValue, k string, msg T, updateFn func(v T) (T, error)) error

UpdateObj saves a protobuf message to a key value store after using updateFn to update the message.

func UpdateObjIsNew added in v1.0.446

func UpdateObjIsNew[T proto.Message](ctx context.Context, wf nats.KeyValue, k string, msg T, updateFn func(v T) (T, error)) (bool, error)

UpdateObjIsNew saves a protobuf message to a key value store after using updateFn to update the message, and returns true if this is a new value.

Types

type BackoffFn added in v1.0.623

type BackoffFn func(ctx context.Context, msg *nats.Msg) error

BackoffFn represents a function that completely handles the backoff for a message including ACK/NAK

type BackoffProcessOption added in v1.0.623

type BackoffProcessOption struct {
	// contains filtered or unexported fields
}

BackoffProcessOption holds the backoff function. Don't use this directly. Use the convenience function WithBackoffFn

func WithBackoffFn added in v1.0.623

func WithBackoffFn(fn BackoffFn) BackoffProcessOption

WithBackoffFn adds a back-off function to message processing

func (BackoffProcessOption) Set added in v1.0.623

func (b BackoffProcessOption) Set(opts *ProcessOpts)

Set the backoff function in the process settings

type NatsConn

type NatsConn interface {
	JetStream(opts ...nats.JSOpt) (nats.JetStreamContext, error)
	QueueSubscribe(subj string, queue string, cb nats.MsgHandler) (*nats.Subscription, error)
	Publish(subj string, bytes []byte) error
}

NatsConn is the trimmed-down NATS connection interface that only encompasses the methods used by SHAR.

type ProcessOption added in v1.0.623

type ProcessOption interface {
	Set(opts *ProcessOpts)
}

ProcessOption represents an option function that can be passed to message processing.

type ProcessOpts added in v1.0.623

type ProcessOpts struct {
	BackoffCalc BackoffFn
}

ProcessOpts holds the settings for message processing.

type TrackingID added in v1.0.215

type TrackingID []string

TrackingID is an ID stack that maintains the callstack

func (TrackingID) Ancestor added in v1.0.215

func (t TrackingID) Ancestor(gen int) string

Ancestor provides the ID of the caller back <gen> generations.

func (TrackingID) ID added in v1.0.215

func (t TrackingID) ID() string

ID provides the current ID

func (TrackingID) ParentID added in v1.0.215

func (t TrackingID) ParentID() string

ParentID provides the ID of the caller.

func (TrackingID) Pop added in v1.0.215

func (t TrackingID) Pop() TrackingID

Pop removes the current ID from the callstack.

func (TrackingID) Push added in v1.0.215

func (t TrackingID) Push(id string) TrackingID

Push adds a new ID to the callstack.

Directories

Path Synopsis
Package base62 implements base62 encoding.
Package base62 implements base62 encoding.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL