natsq

package
v0.0.0-...-5d40105 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 11, 2024 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
const (
	NatDefaultMode = iota
	NatJetMode
)

Variables

This section is empty.

Functions

func MustNewConsumerManager

func MustNewConsumerManager(cfg *NatsConfig, cq []*ConsumerQueue, mode uint) queue.MessageQueue

MustNewConsumerManager creates a new ConsumerManager instance. It connects to NATS server, registers the provided consumer queues, and returns the ConsumerManager. If any error occurs during the process, it logs the error and continues.

Types

type ConsumeHandle

type ConsumeHandle func(m *Msg) error

type ConsumeHandler

type ConsumeHandler interface {
	HandleMessage(m *Msg) error
}

ConsumeHandler Consumer interface, used to define the methods required by the consumer

type ConsumerManager

type ConsumerManager struct {
	// contains filtered or unexported fields
}

ConsumerManager Consumer manager for managing multiple consumer queues

func (*ConsumerManager) Start

func (cm *ConsumerManager) Start()

Start starts consuming messages from all the registered consumer queues. It launches a goroutine for each consumer queue to subscribe and process messages. The method blocks until the doneChan is closed.

func (*ConsumerManager) Stop

func (cm *ConsumerManager) Stop()

Stop closes the NATS connection and stops the ConsumerManager.

type ConsumerQueue

type ConsumerQueue struct {
	StreamName string                     // stream name
	QueueName  string                     // queue name
	Subjects   []string                   // Subscribe subject
	Consumer   ConsumeHandler             // consumer object
	JetOption  []jetstream.PullConsumeOpt // Jetstream configuration
}

ConsumerQueue Consumer queue, used to maintain the relationship between a consumer queue

type DefaultProducer

type DefaultProducer struct {
	// contains filtered or unexported fields
}

func NewDefaultProducer

func NewDefaultProducer(c *NatsConfig) (*DefaultProducer, error)

NewDefaultProducer creates a new default NATS producer. It takes a NatsConfig as input and returns a pointer to a DefaultProducer and an error. It connects to the NATS server using the provided configuration.

func (*DefaultProducer) Close

func (p *DefaultProducer) Close()

Close closes the NATS connection of the default producer.

func (*DefaultProducer) Publish

func (p *DefaultProducer) Publish(subject string, data []byte) error

Publish publishes a message with the specified subject and data using the default NATS producer. It takes a subject string and data byte slice as input and returns an error if the publish fails.

type JetProducer

type JetProducer struct {
	// contains filtered or unexported fields
}

func NewJetProducer

func NewJetProducer(c *NatsConfig) (*JetProducer, error)

NewJetProducer creates a new JetStream producer. It takes a NatsConfig as input and returns a pointer to a JetProducer and an error. It connects to the NATS server using the provided configuration and creates a new JetStream context.

func (*JetProducer) Close

func (j *JetProducer) Close()

Close closes the NATS connection of the JetStream producer.

func (*JetProducer) CreateOrUpdateStream

func (j *JetProducer) CreateOrUpdateStream(config jetstream.StreamConfig) error

CreateOrUpdateStream creates or updates a JetStream stream with the specified configuration. It takes a jetstream.StreamConfig as input and returns an error if the operation fails.

func (*JetProducer) Publish

func (j *JetProducer) Publish(subject string, data []byte) error

Publish publishes a message with the specified subject and data using the JetStream producer. It takes a subject string and data byte slice as input and returns an error if the publish fails.

type Msg

type Msg struct {
	Subject string
	Data    []byte
}

type NatsConfig

type NatsConfig struct {
	ServerUri  string
	ClientName string
	Options    []nats.Option
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL