Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer interface { // Puller 返回一个用户获取数据的 Channel, 在有数据可消费时该 Channel 会返回一条类型为 Message 的消息 // 使用者自行根据消息进行f处理 (反序列化等) Puller(option ...PullOption) <-chan *Message // Pull 返回一条接收到的数据,在没有消息可消费时该函数会 Block, // 与 Producer 对应,拉取的数据是序列化后的 Json,因此消费者需要通过 Json.Unmarshal // 将数据反序列化为所需的目标类型 Pull(context context.Context, option ...PullOption) (*Message, error) }
func NewRabbitConsumer ¶
func NewRabbitConsumer(conf *config.LocalConfigure, logger log.Logger, conn *connProxy, topic *TopicConfig) (Consumer, error)
type ExchangePushOption ¶
type ExchangePushOption struct {
ExchangeName string
}
func WithExchange ¶
func WithExchange(exchange string) ExchangePushOption
WithExchange 使用指定的交换机替换 Producer 配置中的交换机
func (*ExchangePushOption) Do ¶
func (p *ExchangePushOption) Do(opt *defaultOpt)
type Message ¶
type Message struct { MessageId string // 消息的标识符,生产者在发送时设置,如果无设置,则会生成随机字符串 Queue string // 消息来自哪个 Queue Exchange string // 消息通过哪个交换机发送到当前的 Queue RoutingKey string // 消息通过什么 RoutingKey 对应的规则路由到当前 Queue Priority int // 消息的优先级,由生产者设置,范围为 0-9,数值约大优先级越高 Payload []byte // 消息内容 Timestamp time.Time // 消息发送时的时间戳 App string // 消息来自于哪个应用 ContentType string // MIME content type, 默认情况下都为 application/json ContentEncoding string // MIME content encoding, 默认情况下都为 utf8 }
func (*Message) UnmarshalPayload ¶
UnmarshalPayload 将 Body 中的数据反序列化到 obj 对应的对象中
type MsgIdPushOption ¶
type MsgIdPushOption struct {
MessageId string
}
func WithMessageId ¶
func WithMessageId(msgId string) MsgIdPushOption
func (*MsgIdPushOption) Do ¶
func (p *MsgIdPushOption) Do(opt *defaultOpt)
type PriorityPushOption ¶
type PriorityPushOption struct {
Priority uint8
}
func WithPriority ¶
func WithPriority(priority uint8) PriorityPushOption
WithPriority 为 Push 操作的消息增加优先级信息, 优先级可以是 0-9 的数值,数值越大优先级越高
func (*PriorityPushOption) Do ¶
func (p *PriorityPushOption) Do(opt *defaultOpt)
type Producer ¶
type Producer interface { // Push 往指定的 Topic 推送 data 消息, 所有推送的消息都会通过 Json.Marshal 序列化为 JSON 后进行传输 Push(context context.Context, data interface{}, option ...PushOption) error }
Producer 为已与指定 Topic 绑定的生产者, Push 会根据 Topic 的配置来触发不同的行为, 如发送的信息根据配置可能是广播/延迟信息等
func NewRabbitProducer ¶
func NewRabbitProducer(conf *config.LocalConfigure, logger log.Logger, conn *connProxy, topic *TopicConfig) (Producer, error)
type PullOption ¶
type PullOption interface {
Do(opt *defaultOpt)
}
type PushOption ¶
type PushOption interface {
Do(opt *defaultOpt)
}
type QueueFactory ¶
type QueueFactory interface { // Producer 创建一个绑定了 topic 的生产者 Producer(topic *TopicConfig) (Producer, error) // Consumer 创建一个绑定了 topic 的消费者 Consumer(topic *TopicConfig) (Consumer, error) }
QueueFactory 提供 Producer 及 Consumer 的创建能力, 当前可根据 Topic 获取对应的消费者及生产者,其具体的行为由配置信息来决定,后续有复杂需求时再开放灵活度更高的消息队列接口
func NewQueueFactory ¶
func NewQueueFactory(logger log.Logger, conf *def.Configuration, local *config.LocalConfigure) (QueueFactory, error)
NewQueueFactory 创建一个消息队列的管理器,用于创建消费者及生产者, logger 提供日志记录能力, conf 为配置中心
type RoutingKeyPushOption ¶
type RoutingKeyPushOption struct {
RoutingKey string
}
func WithRoutingKey ¶
func WithRoutingKey(key string) RoutingKeyPushOption
WithRoutingKey 为消息发送增加路由信息,允许消息根据路由提交到指定的消费者
func (*RoutingKeyPushOption) Do ¶
func (p *RoutingKeyPushOption) Do(opt *defaultOpt)
type TopicConfig ¶
type TopicConfig struct { Exchange *def.ExchangeConfig Queue *def.QueueConfig Consume *def.ConsumeConfig }
func NewConsumerConfig ¶
func NewConsumerConfig(qe *def.QueueConfig, con *def.ConsumeConfig) *TopicConfig
NewConsumerConfig 创建一个新的消费者配置
func NewProducerConfig ¶
func NewProducerConfig(ex *def.ExchangeConfig, qe *def.QueueConfig) *TopicConfig
NewProducerConfig 创建一个新的生产者配置
Source Files ¶
Click to show internal directories.
Click to hide internal directories.