rabbitmq

package
v2.0.30 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 15, 2024 License: MPL-2.0 Imports: 14 Imported by: 0

README

RabbitMQ配置

Capability

  • rabbitmq.Capability

配置

[sdk.rabbitmq]
    host = "127.0.0.1"          # 主机,必填
    username = ""               # RabbitMQ连接用户名, 不填默认为guest
    password = ""               # RabbitMQ连接密码,不填默认为guest
    port = 5672                 # RabbitMQ端口,不填默认为5672
    vhost = "/"                 # RabbitMQ VHOST,不填默认为"/"
    requeue_in_failure=true     # 消息消费失败是否重新加入队列, 不填默认为true
    channel_pool_size=10        # channel池大小,不填默认为10
    prefetch_count=2            # 预取数,不填默认为2

使用指南

发送端

    pub, err := hdsdk.Mq().NewPublisher()
    if err != nil {
        return err
    }
    
	// 发送即时消息, 不创建exchange
    err = pub.Publish(context.Background(), "test", [][]byte("test1", "test2"))
    if err != nil {
        return err
    }

    // 发送即时消息, 创建exchange:order, topic为:close
    err = pub.Publish(context.Background(), "order:close", [][]byte("test1", "test2"))
    if err != nil {
        return err
    }

    // 发送10秒的延时消息, 不创建exchange
    err = pub.Publish(context.Background(), "test@delay", [][]byte("test1", "test2"), 10)
    if err != nil {
        return err
    }

    // 发送10秒的延时消息, 创建exchange:order, topic为:cancel
    err = pub.Publish(context.Background(), "order:cancel@delay", [][]byte("test1", "test2"), 10)
    if err != nil {
        return err
    }

接收端

    sub, err := hdsdk.Mq().NewSubscriber()
    if err != nil {
        return err
    }
    
	// 订阅即时消息
	var messages <-chan *mq.Message
	go func() {
        messages, err = sub.Subscribe(context.Background(), "test")
        if err != nil {
            return err
        }
	}
    
    for {
      case msg := <-messages:
            // 处理消息
    }
    // 订阅延时消息
    var messages <-chan *mq.Message
	go func() {
        messages, err = sub.Subscribe(context.Background(), "test@delay")
        if err != nil {
            return err
        }
	}
    
    for {
      case msg := <-messages:
            // 处理消息
    }

Documentation

Index

Constants

This section is empty.

Variables

Functions

func New

func New(configProvider intf.ConfigProvider, logger intf.LoggerProvider) (intf.MessageQueueProvider, error)

Types

type ConnectionConfig

type ConnectionConfig struct {
	Host     string // required, RabbitMQ host
	Port     int    // required, RabbitMQ port
	Username string // required, RabbitMQ username
	Password string // required, RabbitMQ password
	Vhost    string // required,

}

type ExchangeKind

type ExchangeKind string
const (
	ExchangeKindFanout ExchangeKind = "fanout"
)

type RabbitMqConfig

type RabbitMqConfig struct {
	Host     string `mapstructure:"host"`
	Port     int    `mapstructure:"port"`
	Username string `mapstructure:"username"`
	Password string `mapstructure:"password"`
	Vhost    string `mapstructure:"vhost"`

	// Consumer: Whether or not to requeue when sending a negative acknowledgement in case of a failure.
	RequeueInFailure bool `mapstructure:"requeue_in_failure"`

	// Consumer: In order to defeat that we can set the prefetch count with the value of 1.
	// This tells RabbitMQ not to give more than one message to a worker at a time.
	// Or, in other words, don't dispatch a new message to a worker until it has
	// processed and acknowledged the previous one.
	// Instead, it will dispatch it to the next worker that is not still busy.
	PrefetchCount int `mapstructure:"prefetch_count"`

	// connection: ChannelPoolSize specifies the size of a channel pool. All channels in the pool are opened when the publisher is
	// created. When a Publish operation is performed then a channel is taken from the pool to perform the operation and
	// then returned to the pool once the operation has finished. If all channels are in use then the Publish operation
	// waits until a channel is returned to the pool.
	// If this value is set to 0 (default) then channels are not pooled and a new channel is opened/closed for every
	// Publish operation.
	ChannelPoolSize int `mapstructure:"channel_pool_size"`
}

type Topology

type Topology struct {
	Kind         TopologyKind
	ExchangeKind ExchangeKind
	ExchangeName string
	QueueName    string
	RoutingKey   string
	BindingKey   string
}

func (*Topology) BindQueue

func (t *Topology) BindQueue(amqpChannel *amqp.Channel) error

func (*Topology) DeclareExchange

func (t *Topology) DeclareExchange(amqpChannel *amqp.Channel) error

func (*Topology) DeclareQueue

func (t *Topology) DeclareQueue(amqpChannel *amqp.Channel) error

type TopologyKind added in v2.0.4

type TopologyKind int
const (
	TopologyKindDefault TopologyKind = iota // default topology
	TopologyKindDelay                       // delay topology kind
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL