message

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 31, 2024 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer interface {
	// Puller 返回一个用户获取数据的 Channel, 在有数据可消费时该 Channel 会返回一条类型为 Message 的消息
	// 使用者自行根据消息进行f处理 (反序列化等)
	Puller(option ...PullOption) <-chan *Message

	// Pull 返回一条接收到的数据,在没有消息可消费时该函数会 Block,
	// 与 Producer 对应,拉取的数据是序列化后的 Json,因此消费者需要通过 Json.Unmarshal
	// 将数据反序列化为所需的目标类型
	Pull(context context.Context, option ...PullOption) (*Message, error)
}

func NewRabbitConsumer

func NewRabbitConsumer(conf *config.LocalConfigure, logger log.Logger, conn *connProxy, topic *TopicConfig) (Consumer, error)

type ExchangePushOption

type ExchangePushOption struct {
	ExchangeName string
}

func WithExchange

func WithExchange(exchange string) ExchangePushOption

WithExchange 使用指定的交换机替换 Producer 配置中的交换机

func (*ExchangePushOption) Do

func (p *ExchangePushOption) Do(opt *defaultOpt)

type Message

type Message struct {
	MessageId       string    // 消息的标识符,生产者在发送时设置,如果无设置,则会生成随机字符串
	Queue           string    // 消息来自哪个 Queue
	Exchange        string    // 消息通过哪个交换机发送到当前的 Queue
	RoutingKey      string    // 消息通过什么 RoutingKey 对应的规则路由到当前 Queue
	Priority        int       // 消息的优先级,由生产者设置,范围为 0-9,数值约大优先级越高
	Payload         []byte    // 消息内容
	Timestamp       time.Time // 消息发送时的时间戳
	App             string    // 消息来自于哪个应用
	ContentType     string    // MIME content type, 默认情况下都为 application/json
	ContentEncoding string    // MIME content encoding, 默认情况下都为 utf8
}

func (*Message) Ack

func (m *Message) Ack()

func (*Message) UnmarshalPayload

func (m *Message) UnmarshalPayload(obj interface{}) error

UnmarshalPayload 将 Body 中的数据反序列化到 obj 对应的对象中

type MsgIdPushOption

type MsgIdPushOption struct {
	MessageId string
}

func WithMessageId

func WithMessageId(msgId string) MsgIdPushOption

func (*MsgIdPushOption) Do

func (p *MsgIdPushOption) Do(opt *defaultOpt)

type PriorityPushOption

type PriorityPushOption struct {
	Priority uint8
}

func WithPriority

func WithPriority(priority uint8) PriorityPushOption

WithPriority 为 Push 操作的消息增加优先级信息, 优先级可以是 0-9 的数值,数值越大优先级越高

func (*PriorityPushOption) Do

func (p *PriorityPushOption) Do(opt *defaultOpt)

type Producer

type Producer interface {
	// Push 往指定的 Topic 推送 data 消息, 所有推送的消息都会通过 Json.Marshal 序列化为 JSON 后进行传输
	Push(context context.Context, data interface{}, option ...PushOption) error
}

Producer 为已与指定 Topic 绑定的生产者, Push 会根据 Topic 的配置来触发不同的行为, 如发送的信息根据配置可能是广播/延迟信息等

func NewRabbitProducer

func NewRabbitProducer(conf *config.LocalConfigure, logger log.Logger, conn *connProxy, topic *TopicConfig) (Producer, error)

type PullOption

type PullOption interface {
	Do(opt *defaultOpt)
}

type PushOption

type PushOption interface {
	Do(opt *defaultOpt)
}

type QueueFactory

type QueueFactory interface {
	// Producer 创建一个绑定了 topic 的生产者
	Producer(topic *TopicConfig) (Producer, error)
	// Consumer 创建一个绑定了 topic 的消费者
	Consumer(topic *TopicConfig) (Consumer, error)
}

QueueFactory 提供 Producer 及 Consumer 的创建能力, 当前可根据 Topic 获取对应的消费者及生产者,其具体的行为由配置信息来决定,后续有复杂需求时再开放灵活度更高的消息队列接口

func NewQueueFactory

func NewQueueFactory(logger log.Logger, conf *def.Configuration, local *config.LocalConfigure) (QueueFactory, error)

NewQueueFactory 创建一个消息队列的管理器,用于创建消费者及生产者, logger 提供日志记录能力, conf 为配置中心

type RoutingKeyPushOption

type RoutingKeyPushOption struct {
	RoutingKey string
}

func WithRoutingKey

func WithRoutingKey(key string) RoutingKeyPushOption

WithRoutingKey 为消息发送增加路由信息,允许消息根据路由提交到指定的消费者

func (*RoutingKeyPushOption) Do

func (p *RoutingKeyPushOption) Do(opt *defaultOpt)

type TopicConfig

type TopicConfig struct {
	Exchange *def.ExchangeConfig
	Queue    *def.QueueConfig
	Consume  *def.ConsumeConfig
}

func NewConsumerConfig

func NewConsumerConfig(qe *def.QueueConfig, con *def.ConsumeConfig) *TopicConfig

NewConsumerConfig 创建一个新的消费者配置

func NewProducerConfig

func NewProducerConfig(ex *def.ExchangeConfig, qe *def.QueueConfig) *TopicConfig

NewProducerConfig 创建一个新的生产者配置

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL