natsclient

package
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 3, 2024 License: GPL-3.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
// Size multipliers. Each constant is 1024 times the previous one.
//
// NOTE(review): KiB evaluates to 8192 (8 << 10), not 1024. If these are meant
// to be byte counts the factor of 8 is unexpected; if they are bit counts the
// names are misleading. Confirm the intended unit with the callers.
const (
	KiB = 8 << 10   // 8192
	MiB = KiB << 10 // 8388608
	GiB = MiB << 10 // 8589934592
)
View Source
// StreamDefaultMaxAge is a duration of 48 hours (two days).
const StreamDefaultMaxAge = 48 * time.Hour

Variables

View Source
// ConsumerMetrics is a histogram vector created through promauto with
// namespace "nats", subsystem "consumer" and name "duration_consume".
// Observations are labelled by subject, action and error. Bucket upper bounds
// run from 0.005 to 30 (presumably seconds — confirm against the observers).
var ConsumerMetrics = promauto.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "nats",
		Subsystem: "consumer",
		Name:      "duration_consume",
		Help:      "Consume duration",
		Buckets: []float64{
			0.005, 0.01, 0.025, 0.05, 0.075,
			0.1, 0.15, 0.2, 0.25, 0.5,
			1, 2.5, 5, 10, 15, 30,
		},
	},
	[]string{"subject", "action", "error"},
)

// PublisherMetrics is a histogram vector created through promauto with
// namespace "nats", subsystem "publisher" and name "duration_publish".
// Observations are labelled by subject and error. It uses the same bucket
// bounds as ConsumerMetrics (0.005 to 30; unit presumably seconds — confirm).
var PublisherMetrics = promauto.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "nats",
		Subsystem: "publisher",
		Name:      "duration_publish",
		Help:      "Publish duration",
		Buckets: []float64{
			0.005, 0.01, 0.025, 0.05, 0.075,
			0.1, 0.15, 0.2, 0.25, 0.5,
			1, 2.5, 5, 10, 15, 30,
		},
	},
	[]string{"subject", "error"},
)
View Source
// ErrSubjectRequired is the sentinel error whose message is
// "subject is required".
var ErrSubjectRequired = errors.New("subject is required")

// ErrInvalidChars is the sentinel error whose message is
// "subject contains spaces".
//
// NOTE(review): the name suggests a broader character check than the message
// describes — confirm which one matches the validation actually performed.
var ErrInvalidChars = errors.New("subject contains spaces")
View Source
// ErrGroupRequired is the sentinel error whose message is "group is required".
var ErrGroupRequired error = errors.New("group is required")

Functions

func CollectConsumerMetric

func CollectConsumerMetric(subject, action string, err error, duration float64)

func CollectPublisherMetrics

func CollectPublisherMetrics(subject string, err error, duration float64)

Types

type Consumer

type Consumer[T any] struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer[T any](ctx context.Context, conn *nats.Conn, group, subject string, h events.Handler[T], opts ...ConsumerOpt) (*Consumer[T], error)

NewConsumer creates a NATS queue subscription (QueueSubscribe) with a custom handler. Group must be the name of the service or package (core, feed, etc.); it allows messages to be handled by several consumers.

func (*Consumer[T]) Close

func (c *Consumer[T]) Close() error

type ConsumerOpt

// ConsumerOpt is a function that takes a *nats.ConsumerConfig.
// NewConsumer accepts any number of them through its variadic opts parameter;
// presumably each one adjusts the config in place — confirm against the
// implementation.
type ConsumerOpt func(*nats.ConsumerConfig)

func WithAckPolicy

func WithAckPolicy(policy nats.AckPolicy) ConsumerOpt

func WithAckWait

func WithAckWait(wait time.Duration) ConsumerOpt

func WithDeliverPolicy

func WithDeliverPolicy(policy nats.DeliverPolicy) ConsumerOpt

func WithMaxAckPending

func WithMaxAckPending(count int) ConsumerOpt

func WithMaxDeliver added in v0.2.4

func WithMaxDeliver(count int) ConsumerOpt

func WithRateLimit

func WithRateLimit(limit uint64) ConsumerOpt

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(conn *nats.Conn, subject string) (*Producer, error)

NewProducer ... fixme: add description

func (*Producer) PublishData

func (p *Producer) PublishData(ctx context.Context, data []byte) error

func (*Producer) PublishJSON

func (p *Producer) PublishJSON(ctx context.Context, v any) error

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(nc *nats.Conn) (*Publisher, error)

func (*Publisher) PublishJSON

func (p *Publisher) PublishJSON(ctx context.Context, subject string, obj any) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL