events

package
v0.9.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MaxRestartRetries         = 5
	InitialRestartBackoffTime = 150 * time.Millisecond
)
View Source
const DefaultDefaultAsyncPubAckInflight = 256

Default `defaultAsyncPubAckInflight` is `4000` (`nats.go`)

View Source
const DescriptionPrefix = "FiveNet: "

Variables

View Source
var Module = fx.Module("events",
	fx.Provide(
		New,
	),
	fx.Decorate(wrapLogger),
)

Functions

func GetJetstreamMsgContext

func GetJetstreamMsgContext(msg jetstream.Msg) (spanContext trace.SpanContext, err error)

func GetNatsMsgContext

func GetNatsMsgContext(msg *nats.Msg) (spanContext trace.SpanContext, err error)

func SanitizeKey added in v0.9.2

func SanitizeKey(in string) string

Types

type ConsumeErrRestartFn

type ConsumeErrRestartFn func(ctxTimeout context.Context, ctxCancel context.Context) error

type JSWrapper

type JSWrapper struct {
	jetstream.JetStream
	// contains filtered or unexported fields
}

Ensures certain NATS config options are applied

func NewJSWrapper

func NewJSWrapper(js jetstream.JetStream, cfg config.NATS, shutdowner fx.Shutdowner) *JSWrapper

func (*JSWrapper) ConsumeErrHandler

func (j *JSWrapper) ConsumeErrHandler(logger *zap.Logger) jetstream.PullConsumeOpt

func (*JSWrapper) ConsumeErrHandlerWithRestart

func (j *JSWrapper) ConsumeErrHandlerWithRestart(ctxCancel context.Context, logger *zap.Logger, restartFn ConsumeErrRestartFn) jetstream.PullConsumeOpt

func (*JSWrapper) CreateOrUpdateKeyValue

func (j *JSWrapper) CreateOrUpdateKeyValue(ctx context.Context, cfg jetstream.KeyValueConfig) (jetstream.KeyValue, error)

func (*JSWrapper) CreateOrUpdateStream

func (j *JSWrapper) CreateOrUpdateStream(ctx context.Context, cfg jetstream.StreamConfig) (jetstream.Stream, error)

func (*JSWrapper) Publish

func (j *JSWrapper) Publish(ctx context.Context, subject string, payload []byte, opts ...jetstream.PublishOpt) (*jetstream.PubAck, error)

func (*JSWrapper) PublishAsync

func (j *JSWrapper) PublishAsync(ctx context.Context, subject string, payload []byte, opts ...jetstream.PublishOpt) (jetstream.PubAckFuture, error)

func (*JSWrapper) PublishAsyncProto

func (j *JSWrapper) PublishAsyncProto(ctx context.Context, subject string, msg proto.Message, opts ...jetstream.PublishOpt) (jetstream.PubAckFuture, error)

func (*JSWrapper) PublishMsg

func (j *JSWrapper) PublishMsg(ctx context.Context, msg *nats.Msg, opts ...jetstream.PublishOpt) (*jetstream.PubAck, error)

func (*JSWrapper) PublishMsgAsync

func (j *JSWrapper) PublishMsgAsync(ctx context.Context, msg *nats.Msg, opts ...jetstream.PublishOpt) (jetstream.PubAckFuture, error)

func (*JSWrapper) PublishProto

func (j *JSWrapper) PublishProto(ctx context.Context, subject string, msg proto.Message, opts ...jetstream.PublishOpt) (*jetstream.PubAck, error)

type Params

type Params struct {
	fx.In

	LC         fx.Lifecycle
	Shutdowner fx.Shutdowner

	Logger *zap.Logger
	Config *config.Config
}

type Result

type Result struct {
	fx.Out

	JS *JSWrapper
}

func New

func New(p Params) (res Result, err error)

type Subject

type Subject string

type Topic

type Topic string

type Type

type Type string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL