esb

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 3, 2021 License: MIT Imports: 14 Imported by: 0

README

ESB定义

ESB引擎定义,允许各系统之间通过ESB进行消息异步交互,同时,业务系统对每个ESB节点需要注册一个或多个Processor消息处理器,否则收到的消息由于无法找到对应的消息处理器而会被忽略,从而对业务造成影响,业务系统需要实现上述接口并在启动Client之前手动注册消息处理器到对应的客户端中。

Client定义

提供给业务系统使用和ESB进行交互的接口,允许业务系统发送消息到ESB和处理从ESB中收到的消息。每个ESB客户端都需要指定一个唯一的标示以及初始化ESB客户端所需要的配置参数需要特别注意的是,每个系统实例化一个Client 后,该实例会唯一的只监听使用该系统ID标示的一个队列,而这个队列的名称格式为:sys_esb_{systemId}_{node},其中{systemId}即为当前系统的唯一四位数数字标示,而node则标明该客户端连接到的ESB节点。

 client.Start()

温馨提示:

获取目标系统的队列名称可使用方法 client.BuildQueueName来获取

Documentation

Index

Constants

View Source
const (
	// NOTICE 通知消息
	NOTICE _MessageCategory = "1"
	// SIMPLEX 单向事务消息
	SIMPLEX _MessageCategory = "2"
	// DUPLEX 双向事务消息
	DUPLEX _MessageCategory = "3"
)
View Source
const (
	// SenderReq 消息已发出,发送方已发出消息,所有新消息的初始状态
	SenderReq _MessagePhase = "1"
	// ReceiverAck 接收方已应答,事务消息中的应答阶段
	ReceiverAck _MessagePhase = "2"
	// SenderAck 发送方已应答,对应双向事务消息中的发送方应答
	SenderAck _MessagePhase = "3"
)

Variables

View Source
var (
	BIZ = add(0)
)

Functions

func GetGenre

func GetGenre(msg interface{}) string

func NewMessageListener

func NewMessageListener(log *log.Slf4g, node Node, processor func(genre string) Processor) *defaultMessageListener

func NewMsgId

func NewMsgId() *msgId

NewMsgId 创建一个新的MsgId实例并返回。

func Register

func Register(nm map[int]string)

func Signature

func Signature(mpl *MsgPayload) string

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client 提供给业务系统使用和ESB进行交互的接口,允许业务系统发送消息到ESB和处理从ESB中收到的消息。每个ESB客户端都需要指定一个唯一的标示以及初始化ESB客户端所需要的配置参数. 需要特别注意的是,每个系统实例化一个`Client`后,该实例会唯一的只监听使用该系统ID标示的一个队列,而这个队列的名称格式为:<b>sys_esb_{systemId}_{node}</b>,其中{systemId}即为当前系统的唯一四位数数字标示,而{node}则标明该客户端连接到的ESB节点。

温馨提示:获取目标系统的队列名称可使用方法`BuildQueueName`来获取

func New

func New(provider Provider, opts ...Option) *Client

New 给定ESB节点的唯一标示和连接到ESB的配置来构造一个ESB客户端,如果初始化失败则抛出异常。该初始化方法会 同步初始化对应节点的所有消息处理器并启动监听。

func (*Client) AddProcessor

func (c *Client) AddProcessor(processors ...Processor)

AddProcessor 为当前客户端添加一个或多个`processors`消息处理器,需要确保该方法在`start`方法之前调用,否则系统会抛出异常。

func (*Client) BuildQueueName

func (c *Client) BuildQueueName(systemId string) string

BuildQueueName 使用当前客户端构建一个ESB消息的目标队列名称,目标队列名称满足格式:sys_esb_{systemId}_{node},其中`systemId`为目标系统的四位数数字ID,`node`为目标系统监听的ESB节点标示}。

func (*Client) Close

func (c *Client) Close()

Close 关闭所有的资源,该方法不会抛出任何异常。

func (*Client) Send

func (c *Client) Send(msg interface{}) error

Send 发送新消息到ESB中,这里是所有新消息的发送入口,如果发送失败则会抛出异常。请注意,消息的目标队列名称请使用方法`buildQueueName`来构建并设置,不满足格式的目标队列名称会导致消息发送失败。

func (*Client) Start

func (c *Client) Start() (closer func(), err error)

Start 在当前节点上启动进行本地系统的队列监听,该方法请务必在`AddProcessor(...Processor)`方法之后调用,否则可能会导致部分消息由于没有对应的消息处理器而丢失。 特别注意:如果收到的消息类型没有对应的消息处理器,系统只会简单的丢弃并打印告警信息;同时注意,该方法只能被调用一次,如果多次调用则后续的调用会抛出异常。

type DuplexMessage

type DuplexMessage struct {
	Message
	// 发送方应答队列
	Source string
	// 接受方请求队列
	DestinationNew string
	// 接收方应答队列
	DestinationAck string
}

func NewDuplexMessage

func NewDuplexMessage(msgId string) *DuplexMessage

type Message

type Message struct {
	Body *MsgBody
	// contains filtered or unexported fields
}

Message 所有ESB消息的基类

func NewMessage

func NewMessage(mid *msgId) *Message

func (*Message) SetBody

func (m *Message) SetBody(body *MsgBody)

SetBody 设置消息的消息体

func (*Message) SetType

func (m *Message) SetType(genre string)

type MessageListener

type MessageListener interface {
	// OnReceived 接收消息`msg`并进行处理,如果处理失败则抛出异常。如果需要返回消息(如事务消息需要返回)请返回消息体内容,否则请返回nil。
	OnReceived(msg interface{}) (*MsgBody, error)

	// OnRecipientAckReceived 处理接收方发出的ACK消息,由消息发送方处理。在双向事务消息中消息发送方还应该在处理完成后返回一个应答消息通知接收方处理结果`rsp`。如果返回结果为nil则不发送。
	OnRecipientAckReceived(genre, msgId string, rsp *MsgBody) (*MsgBody, error)

	// OnSenderAckReceived 处理发送方发出的ACK消息,由接收方处理,该方法仅在双向事务消息中有效,通知接收方最终发送方的处理结果`rsp`。
	OnSenderAckReceived(genre, msgId string, rsp *MsgBody) error
}

MessageListener ESB消息的监听器接口定义。

type MsgBody

type MsgBody struct {
	Body map[string]string `json:"body"`
}

MsgBody ESB消息体封装,提供流式操作。

func NewMessageBody

func NewMessageBody() *MsgBody

func (*MsgBody) Add

func (mb *MsgBody) Add(key string, value interface{}) *MsgBody

func (*MsgBody) Get

func (mb *MsgBody) Get(key string) string

func (*MsgBody) GetFloat

func (mb *MsgBody) GetFloat(key string) float64

func (*MsgBody) GetInt

func (mb *MsgBody) GetInt(key string) int

func (*MsgBody) HasKey

func (mb *MsgBody) HasKey(key string) bool

func (*MsgBody) ToString

func (mb *MsgBody) ToString() string

ToString 输出ESB消息体内容

type MsgPayload

type MsgPayload struct {
	Category    _MessageCategory `json:"category"`    // 消息分类
	Genre       string           `json:"type"`        // 消息类型
	MsgId       string           `json:"msgId"`       // 消息的唯一ID,发送时自动生成
	SrcAckQueue string           `json:"srcAckQueue"` // 消息发送方的应答队列名称(对事务消息有效)
	DstNewQueue string           `json:"dstNewQueue"` // 消息接收方的新消息队列名称
	DstAckQueue string           `json:"dstAckQueue"` // 消息接收方的应答消息队列(对双向事务消息有效)
	Body        *MsgBody         `json:"body"`        // 业务数据
	SendTime    int64            `json:"sendTime"`    // 发送时间
	Phase       _MessagePhase    `json:"phase"`       // 消息所处的阶段
	Sign        string           `json:"sign"`        // 签名信息
}

MsgPayload 发送到ESB中去的消息的封装,对通知消息和事务消息进行统一封装。

func DuplexPayload

func DuplexPayload(message *DuplexMessage) *MsgPayload

func HandleAck

func HandleAck(msg *MsgPayload, listener MessageListener) (*MsgPayload, error)

HandleAck 处理收到的单向/双向事务消息的应答消息。

func HandleNew

func HandleNew(msg *MsgPayload, listener MessageListener) (*MsgPayload, error)

HandleNew 处理收到的新消息(包括通知消息和事务消息)

func NewPayload

func NewPayload(msg *MsgPayload, phase _MessagePhase) *MsgPayload

func NoticePayload

func NoticePayload(message *NoticeMessage) *MsgPayload

func SimplexPayload

func SimplexPayload(message *SimplexMessage) *MsgPayload

func (*MsgPayload) ConvertToDuplex

func (mpl *MsgPayload) ConvertToDuplex() (*DuplexMessage, error)

func (*MsgPayload) ConvertToNotice

func (mpl *MsgPayload) ConvertToNotice() (*NoticeMessage, error)

func (*MsgPayload) ConvertToSimplex

func (mpl *MsgPayload) ConvertToSimplex() (*SimplexMessage, error)

func (*MsgPayload) SendQueueName

func (mpl *MsgPayload) SendQueueName() (string, error)

SendQueueName 根据当前消息的类型和所处的阶段来决定发送的队列名称。

func (*MsgPayload) SetBody

func (mpl *MsgPayload) SetBody(body *MsgBody)

func (*MsgPayload) SetSign

func (mpl *MsgPayload) SetSign(sign string)

func (*MsgPayload) String

func (mpl *MsgPayload) String() string

type Node

type Node int64

func NewNode

func NewNode(c int) Node

func Str2Node

func Str2Node(n string) Node

func (Node) IsValid

func (n Node) IsValid() error

func (Node) String

func (n Node) String() string

type NoticeMessage

type NoticeMessage struct {
	Message
	Destination string
}

NoticeMessage 发送到ESB的通知类消息,通知类消息只保证被正确投递到ESB队列中,不保证接收方是否处理成功。

func NewNoticeMessage

func NewNoticeMessage(msgId string) *NoticeMessage

type Option

type Option func(*options)

Option 是ESB选项.

func WithLogger

func WithLogger(logger log.Logger) Option

func WithNode

func WithNode(node Node) Option

func WithSystemId

func WithSystemId(systemId string) Option

type Processor

type Processor interface {
	// GetType 获取要处理的消息类型,对应{Message}中的type字段。
	GetType() string

	// OnReceived 接收新消息并进行处理,如果新消息为事务消息并且需要返回处理结果则请返回处理结果,否则返回Null即可。
	OnReceived(msg interface{}) (*MsgBody, error)

	// OnRecipientAckReceived 单向/双向事务消息中的接收方应答消息的接收处理,对于单向事务消息处理完成后返回Null即可,对于双向事务消息则还需要返回系统的处理结果给到接收方进行后续处理。
	OnRecipientAckReceived(msgId string, rsp *MsgBody) (*MsgBody, error)

	// OnSenderAckReceived 双向事务消息中的发送方的确认应答消息(msgId:事务消息的唯一ID),由接收方进行处理发送方返回的应答消息`rsp`。
	OnSenderAckReceived(msgId string, rsp *MsgBody) error
}

Processor ESB消息的处理器接口定义,业务系统实现该接口后需要手动注册到{Client}中去方可生效。

type Provider

type Provider interface {
	// Listen 监听指定名称`name`的消息队列,当有新消息或事务应答消息送达时会调用监听器`listener`接口并返回关闭动作`closer`,可同时监听多个不同的队列。
	// 特别注意:同一个消息队列只能有一个监听器!
	Listen(name string, listener MessageListener) (closer func(), err error)

	// Cancel 取消对指定队列名称`name`的监听。
	Cancel(name string)

	// Send 发送ESB消息
	Send(message interface{}) error

	// Close 关闭到消息中间件的连接,清除资源。
	Close()
}

Provider 企业服务总线(ESB)的客户端接口,通过ESB实现系统解耦并实现系统之间的交互需求,目前支持的系统交互有如下几类: 消息通知:发送方单纯的发出通知给接收方,不对接收方的处理结果进行响应; 单向事务:发送方发出请求给接收方,等待接收方反馈处理结果(成功或失败)后进行处理; 双向事务:发送方发出请求给接收方,等待接收方处理结果后,再次通知接收方我方的处理结果;

基于上述交互需求,所选择的ESB中间件要满足如下条件: 消息的顺序性:发送方发出的消息顺序和接收方接收到的消息顺序要一致; 唯一消费保证:同一个消息队列中的消息只能被一个消费者消费; 消息通知保证:中间件要保证消息正确无误的送达给消费者并且被成功消费;

type SimplexMessage

type SimplexMessage struct {
	Message
	Source      string
	Destination string
}

func NewSimplexMessage

func NewSimplexMessage(msgId string) *SimplexMessage

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL