Documentation ¶
Constants ¶
const (
	NatDefaultMode = iota
	NatJetMode
)
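Because the block uses iota, NatDefaultMode is 0 and NatJetMode is 1. The sketch below mirrors the constants locally to show those values and how a `mode uint` argument might be interpreted. The `modeName` helper and the strings it returns are hypothetical illustrations, not part of the package.

```go
package main

import "fmt"

// Local mirror of the package constants. iota starts at 0 and increases by
// one per line, so these are the untyped integer constants 0 and 1.
const (
	NatDefaultMode = iota
	NatJetMode
)

// modeName is a hypothetical helper (not part of the package) that maps a
// mode value to a label. The labels are assumptions based on the names.
func modeName(mode uint) string {
	switch mode {
	case NatDefaultMode:
		return "default"
	case NatJetMode:
		return "jetstream"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(NatDefaultMode, modeName(NatDefaultMode))
	fmt.Println(NatJetMode, modeName(NatJetMode))
}
```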
Variables ¶
This section is empty.
Functions ¶
func MustNewConsumerManager ¶
func MustNewConsumerManager(cfg *NatsConfig, cq []*ConsumerQueue, mode uint) queue.MessageQueue
MustNewConsumerManager creates a new ConsumerManager instance. It connects to the NATS server, registers the provided consumer queues, and returns the ConsumerManager. If an error occurs along the way, it logs the error and continues.
Types ¶
type ConsumeHandle ¶
type ConsumeHandler ¶
ConsumeHandler is the consumer interface. It defines the methods a consumer must implement.
type ConsumerManager ¶
type ConsumerManager struct {
// contains filtered or unexported fields
}
ConsumerManager manages multiple consumer queues.
func (*ConsumerManager) Start ¶
func (cm *ConsumerManager) Start()
Start starts consuming messages from all the registered consumer queues. It launches a goroutine for each consumer queue to subscribe and process messages. The method blocks until the doneChan is closed.
func (*ConsumerManager) Stop ¶
func (cm *ConsumerManager) Stop()
Stop closes the NATS connection and stops the ConsumerManager.
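The Start and Stop behavior described above (one goroutine per queue, with Start blocking until a done channel is closed) can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the package's implementation: the `manager` type, its fields, and `newManager` are hypothetical stand-ins, and the real Stop also closes the NATS connection.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// manager is a hypothetical stand-in for ConsumerManager.
type manager struct {
	queues   []string      // queue names; the real type holds ConsumerQueue values
	doneChan chan struct{} // closed by Stop to release Start and the workers
	stopOnce sync.Once
	wg       sync.WaitGroup
}

func newManager(queues ...string) *manager {
	return &manager{queues: queues, doneChan: make(chan struct{})}
}

// Start launches one goroutine per queue, then blocks until doneChan is
// closed and every worker has returned.
func (m *manager) Start() {
	for _, q := range m.queues {
		m.wg.Add(1)
		go func(name string) {
			defer m.wg.Done()
			// A real worker would subscribe to the queue named by `name`
			// and process messages here.
			<-m.doneChan
		}(q)
	}
	<-m.doneChan
	m.wg.Wait()
}

// Stop closes doneChan. sync.Once makes repeated calls safe.
func (m *manager) Stop() {
	m.stopOnce.Do(func() { close(m.doneChan) })
}

func main() {
	m := newManager("orders", "payments")
	returned := make(chan struct{})
	go func() {
		m.Start() // blocks until Stop is called
		close(returned)
	}()
	time.Sleep(10 * time.Millisecond)
	m.Stop()
	<-returned
	fmt.Println("Start returned after Stop")
}
```

Since Start blocks, callers typically run it in its own goroutine, or call it last in `main` and trigger Stop from a signal handler.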
type ConsumerQueue ¶
type ConsumerQueue struct {
	StreamName string                     // stream name
	QueueName  string                     // queue name
	Subjects   []string                   // subjects to subscribe to
	Consumer   ConsumeHandler             // consumer object
	JetOption  []jetstream.PullConsumeOpt // JetStream configuration
}
ConsumerQueue describes a consumer queue. It ties a stream name, queue name, and set of subjects to the ConsumeHandler that consumes from them.
type DefaultProducer ¶
type DefaultProducer struct {
// contains filtered or unexported fields
}
func NewDefaultProducer ¶
func NewDefaultProducer(c *NatsConfig) (*DefaultProducer, error)
NewDefaultProducer creates a new default NATS producer. It takes a NatsConfig as input and returns a pointer to a DefaultProducer and an error. It connects to the NATS server using the provided configuration.
func (*DefaultProducer) Close ¶
func (p *DefaultProducer) Close()
Close closes the NATS connection of the default producer.
type JetProducer ¶
type JetProducer struct {
// contains filtered or unexported fields
}
func NewJetProducer ¶
func NewJetProducer(c *NatsConfig) (*JetProducer, error)
NewJetProducer creates a new JetStream producer. It takes a NatsConfig as input and returns a pointer to a JetProducer and an error. It connects to the NATS server using the provided configuration and creates a new JetStream context.
func (*JetProducer) Close ¶
func (j *JetProducer) Close()
Close closes the NATS connection of the JetStream producer.
func (*JetProducer) CreateOrUpdateStream ¶
func (j *JetProducer) CreateOrUpdateStream(config jetstream.StreamConfig) error
CreateOrUpdateStream creates or updates a JetStream stream with the specified configuration. It takes a jetstream.StreamConfig as input and returns an error if the operation fails.