natsutils

package
v1.1.45 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 12, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ToPayload

func ToPayload(obj any) []byte

ToPayload converts an object to a JSON byte array if its type is not string or []byte

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) Fetch

func (consumer *Consumer) Fetch(batch int, maxWait time.Duration) (<-chan jetstream.Msg, error)

Fetch returns a channel of messages immediately; the channel is closed once maxWait has elapsed or batch messages have been received

type ConsumerConfig

type ConsumerConfig struct {
	StreamName        string        // stream from which to consume
	ConsumerName      string        // consumer name
	Description       string        // consumer description
	FilterSubjects    []string      // subjects received by the consumer
	Durable           bool          // is a durable consumer
	InactiveThreshold time.Duration // used only if Durable is true
}

type HookConfig

type HookConfig struct {
	NatsUtils        *NatsUtils
	Logger           *logrus.Logger
	AdditionalFields logrus.Fields
	Service          string
	Customer         string
}

type JetstreamConsumer

type JetstreamConsumer struct {
	// contains filtered or unexported fields
}

func NewJetstreamConsumer

func NewJetstreamConsumer(ctx context.Context, nc *nats.Conn, config JetstreamConsumerConfig) (*JetstreamConsumer, error)

func (*JetstreamConsumer) Fetch

func (jetstreamConsumer *JetstreamConsumer) Fetch(batch int, maxWait time.Duration) (jetstream.MessageBatch, error)

Fetch returns a jetstream.MessageBatch from which the fetched messages can be read

type JetstreamConsumerConfig

type JetstreamConsumerConfig struct {
	StreamName     string   // stream from which to consume
	ConsumerName   string   // consumer name
	Description    string   // consumer description
	Durable        bool     // is a durable consumer
	FilterSubjects []string // subjects received by the consumer
}

type NATSHook

type NATSHook struct {
	// contains filtered or unexported fields
}

func NewNatsHook

func NewNatsHook(config HookConfig) (*NATSHook, error)

func (*NATSHook) Fire

func (hook *NATSHook) Fire(entry *logrus.Entry) error

func (*NATSHook) Levels

func (hook *NATSHook) Levels() []logrus.Level

type NatsUtils

type NatsUtils struct {
	NatsConfig *cli.NatsConfig
	// contains filtered or unexported fields
}

NatsUtils provides utilities for nats/jetstream operations

func NewFromConfig

func NewFromConfig(config *cli.NatsConfig) (*NatsUtils, error)

NewFromConfig creates a NatsUtils from a cli.NatsConfig and connects to nats

func NewFromConnection

func NewFromConnection(nc *nats.Conn) (*NatsUtils, error)

NewFromConnection creates a NatsUtils from an existing nats connection

func (*NatsUtils) Close

func (natsUtils *NatsUtils) Close()

Close closes the nats connection if established

func (*NatsUtils) CreateConsumer

func (natsUtils *NatsUtils) CreateConsumer(ctx context.Context, config ConsumerConfig) (*Consumer, error)

func (*NatsUtils) CreateKeyValue

func (natsUtils *NatsUtils) CreateKeyValue(ctx context.Context, config jetstream.KeyValueConfig) (jetstream.KeyValue, error)

CreateKeyValue creates a key value store

func (*NatsUtils) Jetstream

func (natsUtils *NatsUtils) Jetstream() jetstream.JetStream

Jetstream returns the jetstream context

func (*NatsUtils) Nats

func (natsUtils *NatsUtils) Nats() *nats.Conn

Nats returns the nats connection

func (*NatsUtils) PublishOnStream

func (natsUtils *NatsUtils) PublishOnStream(ctx context.Context, stream, subject string, data []byte) (*jetstream.PubAck, error)

PublishOnStream publishes a message on a stream

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL