Documentation ¶
Index ¶
- Constants
- Variables
- func GetGenre(msg interface{}) string
- func NewMessageListener(log *log.Slf4g, node Node, processor func(genre string) Processor) *defaultMessageListener
- func NewMsgId() *msgId
- func Register(nm map[int]string)
- func Signature(mpl *MsgPayload) string
- type Client
- type DuplexMessage
- type Message
- type MessageListener
- type MsgBody
- type MsgPayload
- func DuplexPayload(message *DuplexMessage) *MsgPayload
- func HandleAck(msg *MsgPayload, listener MessageListener) (*MsgPayload, error)
- func HandleNew(msg *MsgPayload, listener MessageListener) (*MsgPayload, error)
- func NewPayload(msg *MsgPayload, phase _MessagePhase) *MsgPayload
- func NoticePayload(message *NoticeMessage) *MsgPayload
- func SimplexPayload(message *SimplexMessage) *MsgPayload
- func (mpl *MsgPayload) ConvertToDuplex() (*DuplexMessage, error)
- func (mpl *MsgPayload) ConvertToNotice() (*NoticeMessage, error)
- func (mpl *MsgPayload) ConvertToSimplex() (*SimplexMessage, error)
- func (mpl *MsgPayload) SendQueueName() (string, error)
- func (mpl *MsgPayload) SetBody(body *MsgBody)
- func (mpl *MsgPayload) SetSign(sign string)
- func (mpl *MsgPayload) String() string
- type Node
- type NoticeMessage
- type Option
- type Processor
- type Provider
- type SimplexMessage
Constants ¶
const ( // NOTICE 通知消息 NOTICE _MessageCategory = "1" // SIMPLEX 单向事务消息 SIMPLEX _MessageCategory = "2" // DUPLEX 双向事务消息 DUPLEX _MessageCategory = "3" )
const ( // SenderReq 消息已发出,发送方已发出消息,所有新消息的初始状态 SenderReq _MessagePhase = "1" // ReceiverAck 接收方已应答,事务消息中的应答阶段 ReceiverAck _MessagePhase = "2" // SenderAck 发送方已应答,对应双向事务消息中的发送方应答 SenderAck _MessagePhase = "3" )
Variables ¶
var (
BIZ = add(0)
)
Functions ¶
func NewMessageListener ¶
func Signature ¶
func Signature(mpl *MsgPayload) string
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client 提供给业务系统使用和ESB进行交互的接口,允许业务系统发送消息到ESB和处理从ESB中收到的消息。每个ESB客户端都需要指定一个唯一的标示以及初始化ESB客户端所需要的配置参数. 需要特别注意的是,每个系统实例化一个`Client`后,该实例会唯一的只监听使用该系统ID标示的一个队列,而这个队列的名称格式为:<b>sys_esb_{systemId}_{node}</b>,其中{systemId}即为当前系统的唯一四位数数字标示,而{node}则标明该客户端连接到的ESB节点。
温馨提示:获取目标系统的队列名称可使用方法`BuildQueueName`来获取
func (*Client) AddProcessor ¶
AddProcessor 为当前客户端添加一个或多个`processors`消息处理器,需要确保该方法在`start`方法之前调用,否则系统会抛出异常。
func (*Client) BuildQueueName ¶
BuildQueueName 使用当前客户端构建一个ESB消息的目标队列名称,目标队列名称满足格式:sys_esb_{systemId}_{node},其中`systemId`为目标系统的四位数数字ID,`node`为目标系统监听的ESB节点标示}。
type DuplexMessage ¶
type DuplexMessage struct { Message // 发送方应答队列 Source string // 接受方请求队列 DestinationNew string // 接收方应答队列 DestinationAck string }
func NewDuplexMessage ¶
func NewDuplexMessage(msgId string) *DuplexMessage
type Message ¶
type Message struct { Body *MsgBody // contains filtered or unexported fields }
Message 所有ESB消息的基类
func NewMessage ¶
func NewMessage(mid *msgId) *Message
type MessageListener ¶
type MessageListener interface { // OnReceived 接收消息`msg`并进行处理,如果处理失败则抛出异常。如果需要返回消息(如事务消息需要返回)请返回消息体内容,否则请返回nil。 OnReceived(msg interface{}) (*MsgBody, error) // OnRecipientAckReceived 处理接收方发出的ACK消息,由消息发送方处理。在双向事务消息中消息发送方还应该在处理完成后返回一个应答消息通知接收方处理结果`rsp`。如果返回结果为nil则不发送。 OnRecipientAckReceived(genre, msgId string, rsp *MsgBody) (*MsgBody, error) // OnSenderAckReceived 处理发送方发出的ACK消息,由接收方处理,该方法仅在双向事务消息中有效,通知接收方最终发送方的处理结果`rsp`。 OnSenderAckReceived(genre, msgId string, rsp *MsgBody) error }
MessageListener ESB消息的监听器接口定义。
type MsgPayload ¶
type MsgPayload struct { Category _MessageCategory `json:"category"` // 消息分类 Genre string `json:"type"` // 消息类型 MsgId string `json:"msgId"` // 消息的唯一ID,发送时自动生成 SrcAckQueue string `json:"srcAckQueue"` // 消息发送方的应答队列名称(对事务消息有效) DstNewQueue string `json:"dstNewQueue"` // 消息接收方的新消息队列名称 DstAckQueue string `json:"dstAckQueue"` // 消息接收方的应答消息队列(对双向事务消息有效) Body *MsgBody `json:"body"` // 业务数据 SendTime int64 `json:"sendTime"` // 发送时间 Phase _MessagePhase `json:"phase"` // 消息所处的阶段 Sign string `json:"sign"` // 签名信息 }
MsgPayload 发送到ESB中去的消息的封装,对通知消息和事务消息进行统一封装。
func DuplexPayload ¶
func DuplexPayload(message *DuplexMessage) *MsgPayload
func HandleAck ¶
func HandleAck(msg *MsgPayload, listener MessageListener) (*MsgPayload, error)
HandleAck 处理收到的单向/双向事务消息的应答消息。
func HandleNew ¶
func HandleNew(msg *MsgPayload, listener MessageListener) (*MsgPayload, error)
HandleNew 处理收到的新消息(包括通知消息和事务消息)
func NewPayload ¶
func NewPayload(msg *MsgPayload, phase _MessagePhase) *MsgPayload
func NoticePayload ¶
func NoticePayload(message *NoticeMessage) *MsgPayload
func SimplexPayload ¶
func SimplexPayload(message *SimplexMessage) *MsgPayload
func (*MsgPayload) ConvertToDuplex ¶
func (mpl *MsgPayload) ConvertToDuplex() (*DuplexMessage, error)
func (*MsgPayload) ConvertToNotice ¶
func (mpl *MsgPayload) ConvertToNotice() (*NoticeMessage, error)
func (*MsgPayload) ConvertToSimplex ¶
func (mpl *MsgPayload) ConvertToSimplex() (*SimplexMessage, error)
func (*MsgPayload) SendQueueName ¶
func (mpl *MsgPayload) SendQueueName() (string, error)
SendQueueName 根据当前消息的类型和所处的阶段来决定发送的队列名称。
func (*MsgPayload) SetBody ¶
func (mpl *MsgPayload) SetBody(body *MsgBody)
func (*MsgPayload) SetSign ¶
func (mpl *MsgPayload) SetSign(sign string)
func (*MsgPayload) String ¶
func (mpl *MsgPayload) String() string
type NoticeMessage ¶
NoticeMessage 发送到ESB的通知类消息,通知类消息只保证被正确投递到ESB队列中,不保证接收方是否处理成功。
func NewNoticeMessage ¶
func NewNoticeMessage(msgId string) *NoticeMessage
type Processor ¶
type Processor interface { // GetType 获取要处理的消息类型,对应{Message}中的type字段。 GetType() string // OnReceived 接收新消息并进行处理,如果新消息为事务消息并且需要返回处理结果则请返回处理结果,否则返回Null即可。 OnReceived(msg interface{}) (*MsgBody, error) // OnRecipientAckReceived 单向/双向事务消息中的接收方应答消息的接收处理,对于单向事务消息处理完成后返回Null即可,对于双向事务消息则还需要返回系统的处理结果给到接收方进行后续处理。 OnRecipientAckReceived(msgId string, rsp *MsgBody) (*MsgBody, error) // OnSenderAckReceived 双向事务消息中的发送方的确认应答消息(msgId:事务消息的唯一ID),由接收方进行处理发送方返回的应答消息`rsp`。 OnSenderAckReceived(msgId string, rsp *MsgBody) error }
Processor ESB消息的处理器接口定义,业务系统实现该接口后需要手动注册到{Client}中去方可生效。
type Provider ¶
type Provider interface { // Listen 监听指定名称`name`的消息队列,当有新消息或事务应答消息送达时会调用监听器`listener`接口并返回关闭动作`closer`,可同时监听多个不同的队列。 // 特别注意:同一个消息队列只能有一个监听器! Listen(name string, listener MessageListener) (closer func(), err error) // Cancel 取消对指定队列名称`name`的监听。 Cancel(name string) // Send 发送ESB消息 Send(message interface{}) error // Close 关闭到消息中间件的连接,清除资源。 Close() }
Provider 企业服务总线(ESB)的客户端接口,通过ESB实现系统解耦并实现系统之间的交互需求,目前支持的系统交互有如下几类: 消息通知:发送方单纯的发出通知给接收方,不对接收方的处理结果进行响应; 单向事务:发送方发出请求给接收方,等待接收方反馈处理结果(成功或失败)后进行处理; 双向事务:发送方发出请求给接收方,等待接收方处理结果后,再次通知接收方我方的处理结果;
基于上述交互需求,所选择的ESB中间件要满足如下条件: 消息的顺序性:发送方发出的消息顺序和接收方接收到的消息顺序要一致; 唯一消费保证:同一个消息队列中的消息只能被一个消费者消费; 消息通知保证:中间件要保证消息正确无误的送达给消费者并且被成功消费;
type SimplexMessage ¶
func NewSimplexMessage ¶
func NewSimplexMessage(msgId string) *SimplexMessage