common

package
v1.1.799 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 7, 2024 License: MIT Imports: 28 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ContextLoggerWithWfState added in v1.1.754

func ContextLoggerWithWfState(ctx context.Context, state *model.WorkflowState) (context.Context, *slog.Logger)

ContextLoggerWithWfState will populate a context with relevant fields from a WorkflowState model

func CopyWorkflowState added in v1.0.446

func CopyWorkflowState(state *model.WorkflowState) *model.WorkflowState

CopyWorkflowState - clones a proto model.WorkflowState for modification.

func Delete added in v0.1.192

func Delete(kv nats.KeyValue, key string) error

Delete deletes an item from a key value store.

func DeleteLarge added in v1.0.498

func DeleteLarge(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, k string) error

DeleteLarge deletes a large binary from the object store

func DropStateParams added in v1.0.451

func DropStateParams(state *model.WorkflowState)

DropStateParams removes any parameters unsafe to send across a state transition.

func ElementTable added in v0.1.134

func ElementTable(process *model.Workflow) map[string]*model.Element

ElementTable indexes an entire process for quick ID lookups

func EnsureBucket added in v1.0.477

func EnsureBucket(js nats.JetStreamContext, storageType nats.StorageType, name string, ttl time.Duration) error

EnsureBucket creates a bucket if it does not exist

func EnsureBuckets

func EnsureBuckets(js nats.JetStreamContext, storageType nats.StorageType, names []string) error

EnsureBuckets ensures that a list of key value stores exist

func ExtendLock added in v1.0.477

func ExtendLock(kv nats.KeyValue, lockID string) error

ExtendLock extends the lock past its stale time.

func IndexProcessElements added in v0.1.134

func IndexProcessElements(elements []*model.Element, el map[string]*model.Element)

IndexProcessElements is the recursive part of the index

func KSuidTo128bit added in v0.1.78

func KSuidTo128bit(k string) [16]byte

KSuidTo128bit returns a KSuid as bytes.

func KSuidTo64bit added in v0.1.78

func KSuidTo64bit(k string) [8]byte

KSuidTo64bit takes the most variable 64 bits of a KSuid and returns them as bytes.

func Load

func Load(ctx context.Context, wf nats.KeyValue, k string) ([]byte, error)

Load loads a value from a key value store

func LoadLarge added in v1.0.498

func LoadLarge(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, key string, opt ...nats.GetObjectOpt) ([]byte, error)

LoadLarge load a large binary from the object store

func LoadLargeObj added in v1.0.498

func LoadLargeObj(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, k string, v proto.Message, opt ...nats.GetObjectOpt) error

LoadLargeObj loads a protobuf message from a key value store

func LoadObj

func LoadObj(ctx context.Context, wf nats.KeyValue, k string, v proto.Message) error

LoadObj loads a protobuf message from a key value store

func Lock added in v1.0.477

func Lock(kv nats.KeyValue, lockID string) (bool, error)

Lock ensures a lock on a given ID, it returns true if a lock was granted.

func Log added in v1.0.271

func Log(ctx context.Context, js nats.JetStream, trackingID string, source model.LogSource, severity messages.WorkflowLogLevel, code int32, message string, attrs map[string]string) error

Log is the generic metod to output to SHAR telemetry.

func NewOtelHandler added in v1.1.754

func NewOtelHandler() (slog.Handler, func() error)

NewOtelHandler constructs and initialises an otel handler for log exports

func NewSharHandler added in v1.1.754

func NewSharHandler(opts HandlerOptions, logPublisher LogPublisher) slog.Handler

NewSharHandler will return a new instance of a SharHandler

func NewTextHandler added in v1.1.754

func NewTextHandler(level slog.Level, addSource bool) slog.Handler

NewTextHandler initialises a text handler writing to stdout for slog

func Process added in v0.1.78

func Process(ctx context.Context, js nats.JetStreamContext, streamName string, traceName string, closer chan struct{}, subject string, durable string, concurrency int, fn func(ctx context.Context, log *slog.Logger, msg *nats.Msg) (bool, error), opts ...ProcessOption) error

Process processes messages from a nats consumer and executes a function against each one.

func PublishObj added in v1.1.754

func PublishObj(ctx context.Context, conn NatsConn, subject string, prot proto.Message, middlewareFn func(*nats.Msg) error) error

PublishObj publishes a proto message to a subject.

func PublishOnce added in v1.1.732

func PublishOnce(js nats.JetStreamContext, lockingKV nats.KeyValue, streamName string, consumerName string, msg *nats.Msg) error

PublishOnce sets up a single message to be used as a timer.

func RemoveWhere added in v1.0.484

func RemoveWhere[T comparable](slice []T, fn func(T) bool) []T

RemoveWhere removes an array element based upon a condition.

func Save

func Save(ctx context.Context, wf nats.KeyValue, k string, v []byte) error

Save saves a value to a key value store

func SaveLarge added in v1.0.498

func SaveLarge(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, key string, data []byte, opt ...nats.ObjectOpt) error

SaveLarge saves a large binary from the object store

func SaveLargeObj added in v1.0.498

func SaveLargeObj(ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, k string, v proto.Message, opt ...nats.ObjectOpt) error

SaveLargeObj save an protobuf message to a document store

func SaveObj

func SaveObj(ctx context.Context, wf nats.KeyValue, k string, v proto.Message) error

SaveObj save an protobuf message to a key value store

func UnLock added in v1.0.477

func UnLock(kv nats.KeyValue, lockID string) error

UnLock closes an existing lock.

func UpdateLargeObj added in v1.0.498

func UpdateLargeObj[T proto.Message](ctx context.Context, ds nats.ObjectStore, mutex nats.KeyValue, k string, msg T, updateFn func(v T) (T, error)) (T, error)

UpdateLargeObj saves an protobuf message to a document store after using updateFN to update the message.

func UpdateObj

func UpdateObj[T proto.Message](ctx context.Context, wf nats.KeyValue, k string, msg T, updateFn func(v T) (T, error)) error

UpdateObj saves an protobuf message to a key value store after using updateFN to update the message.

func UpdateObjIsNew added in v1.0.446

func UpdateObjIsNew[T proto.Message](ctx context.Context, wf nats.KeyValue, k string, msg T, updateFn func(v T) (T, error)) (bool, error)

UpdateObjIsNew saves an protobuf message to a key value store after using updateFN to update the message, and returns true if this is a new value.

Types

type BackoffFn added in v1.0.623

type BackoffFn func(ctx context.Context, msg *nats.Msg) error

BackoffFn represents a function that completely handles the backoff for a message including ACK/NAK

type BackoffProcessOption added in v1.0.623

type BackoffProcessOption struct {
	// contains filtered or unexported fields
}

BackoffProcessOption holds the backoff function. Don't use this directly. Use the convenience function WithBackoffFn

func WithBackoffFn added in v1.0.623

func WithBackoffFn(fn BackoffFn) BackoffProcessOption

WithBackoffFn adds a back-off function to message processing

func (BackoffProcessOption) Set added in v1.0.623

func (b BackoffProcessOption) Set(opts *ProcessOpts)

Set the backoff function in the process settings

type HandlerOptions added in v1.1.754

type HandlerOptions struct {
	Level slog.Leveler
}

HandlerOptions provides an ability to configure a shar slog handler

type LogPublisher added in v1.1.754

type LogPublisher interface {
	Publish(ctx context.Context, lr *model.LogRequest) error
}

LogPublisher is an interface defining the ability to send a LogRequest to a destination

type MultiHandler added in v1.1.754

type MultiHandler struct {
	Handlers []slog.Handler
}

MultiHandler implements slog.Handler and wraps a collection of slog.Handlers that are delegated to

func NewMultiHandler added in v1.1.754

func NewMultiHandler(handlers []slog.Handler) *MultiHandler

NewMultiHandler creates a new instance of a multi handler

func (*MultiHandler) Enabled added in v1.1.754

func (mh *MultiHandler) Enabled(_ context.Context, _ slog.Level) bool

Enabled always returns true as the actual decision as to whether a log record is emitted is delegated to the wrapped handlers in the Handle method

func (*MultiHandler) Handle added in v1.1.754

func (mh *MultiHandler) Handle(ctx context.Context, r slog.Record) error

Handle will iterate over the slice of wrapped handlers and determine whether that handler is Enabled for the given record log level

func (*MultiHandler) WithAttrs added in v1.1.754

func (mh *MultiHandler) WithAttrs(attrs []slog.Attr) slog.Handler

WithAttrs adds the supplied attrs slice to the delegated handler attrs and returns new instances of them

func (*MultiHandler) WithGroup added in v1.1.754

func (mh *MultiHandler) WithGroup(name string) slog.Handler

WithGroup adds the supplied group name to the delegated handler name and returns new instances of them

type NatsConn

type NatsConn interface {
	JetStream(opts ...nats.JSOpt) (nats.JetStreamContext, error)
	QueueSubscribe(subj string, queue string, cb nats.MsgHandler) (*nats.Subscription, error)
	Publish(subj string, bytes []byte) error
	PublishMsg(msg *nats.Msg) error
}

NatsConn is the trimmad down NATS Connection interface that only emcompasses the methods used by SHAR

type NatsLogPublisher added in v1.1.754

type NatsLogPublisher struct {
	Conn *nats.Conn
}

NatsLogPublisher is an impl of LogPublisher sending a LogRequest to a destination nats subject

func (*NatsLogPublisher) Publish added in v1.1.754

func (nlp *NatsLogPublisher) Publish(ctx context.Context, lr *model.LogRequest) error

Publish writes a LogRequest to a Nats subject

type ProcessOption added in v1.0.623

type ProcessOption interface {
	Set(opts *ProcessOpts)
}

ProcessOption represents an option function that can be passed to message processing.

type ProcessOpts added in v1.0.623

type ProcessOpts struct {
	BackoffCalc BackoffFn
}

ProcessOpts holds the settings for message processing.

type SharHandler added in v1.1.754

type SharHandler struct {
	// contains filtered or unexported fields
}

SharHandler is an implementation of a shar specific slog.Handler

func (*SharHandler) Enabled added in v1.1.754

func (sh *SharHandler) Enabled(_ context.Context, level slog.Level) bool

Enabled determine whether or not a log message is written based on log level

func (*SharHandler) Handle added in v1.1.754

func (sh *SharHandler) Handle(ctx context.Context, r slog.Record) error

Handle will accept an slog.Record, transform to a LogRequest and publish it to nats subject

func (*SharHandler) WithAttrs added in v1.1.754

func (sh *SharHandler) WithAttrs(attrs []slog.Attr) slog.Handler

WithAttrs will append the given attrs slice to any existing attrs stored in the handler and return a new handler instance

func (*SharHandler) WithGroup added in v1.1.754

func (sh *SharHandler) WithGroup(name string) slog.Handler

WithGroup will append the given groupt name to any existing group names stored in the handler and return a new handler instance

type TrackingID added in v1.0.215

type TrackingID []string

TrackingID is an ID stack that maintains the callstack

func (TrackingID) Ancestor added in v1.0.215

func (t TrackingID) Ancestor(gen int) string

Ancestor provides the ID of the caller back <gen> generations.

func (TrackingID) ID added in v1.0.215

func (t TrackingID) ID() string

ID provides the current ID

func (TrackingID) ParentID added in v1.0.215

func (t TrackingID) ParentID() string

ParentID provides the ID of the caller.

func (TrackingID) Pop added in v1.0.215

func (t TrackingID) Pop() TrackingID

Pop removes the current ID from the callstack.

func (TrackingID) Push added in v1.0.215

func (t TrackingID) Push(id string) TrackingID

Push adds a new ID to the callstack.

Directories

Path Synopsis
Package base62 implements base62 encoding.
Package base62 implements base62 encoding.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL