mq_http_go_sdk

package module
v0.0.0-...-6c1e228 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2023 License: MIT Imports: 17 Imported by: 0

README

MQ GO HTTP SDK

Alyun MQ Documents: http://www.aliyun.com/product/ons

Aliyun MQ Console: https://ons.console.aliyun.com

Use

  1. import github.com/aliyunmq/mq-http-go-sdk
  2. setup GOPATH:
    • MAC/LINUX: export GOPATH={dir}, export GOBIN=$GOPATH/bin
    • WINDOWS: set GOPATH={dir}
  3. go get -x -v

Note

  1. Http consumer only support timer msg (less than 3 days), no matter the msg is produced from http or tcp protocol.
  2. Order is only supported at special server cluster.

Sample (github)

Publish Message

Consume Message

Transaction Message

Publish Order Message

Consume Order Message

Sample (code.aliyun.com)

Publish Message

Consume Message

Transaction Message

Publish Order Message

Consume Order Message

Documentation

Index

Constants

View Source
const (
	ClientName           = "mq-go-sdk/1.0.3(fasthttp)"
	ClientVersion        = "2015-06-06"
	DefaultTimeout int64 = 35
)
View Source
const (
	GET    Method = "GET"
	POST          = "POST"
	DELETE        = "DELETE"
)
View Source
const (
	AUTHORIZATION  = "Authorization"
	CONTENT_TYPE   = "Content-Type"
	CONTENT_MD5    = "Content-MD5"
	MQ_VERSION     = "x-mq-version"
	HOST           = "Host"
	DATE           = "Date"
	KEEP_ALIVE     = "Keep-Alive"
	SECURITY_TOKEN = "security-token"
)
View Source
const (
	ALIYUN_MQ_ERR_NS = "MQ"

	ALIYUN_MQ_ERR_TEMPSTR = "" /* 133-byte string literal not displayed */
)
View Source
const (
	StartDeliverTime       = "__STARTDELIVERTIME"
	TransCheckImmunityTime = "__TransCheckT"
	Keys                   = "KEYS"
	SHARDING               = "__SHARDINGKEY"
)

Variables

View Source
var (
	ErrSignMessageFailed       = errors.TN(ALIYUN_MQ_ERR_NS, 1, "sign message failed, {{.err}}")
	ErrMarshalMessageFailed    = errors.TN(ALIYUN_MQ_ERR_NS, 2, "marshal message filed, {{.err}}")
	ErrGeneralAuthHeaderFailed = errors.TN(ALIYUN_MQ_ERR_NS, 3, "general auth header failed, {{.err}}")
	ErrMessageProperty         = errors.TN(ALIYUN_MQ_ERR_NS, 4, "message property can not contains:\" ' < > & : |, {{.err}}")

	ErrSendRequestFailed = errors.TN(ALIYUN_MQ_ERR_NS, 5, "send request failed, {{.err}}")

	ErrUnmarshalErrorResponseFailed = errors.TN(ALIYUN_MQ_ERR_NS, 7, "unmarshal error response failed, {{.err}}, ResponseBody: {{.resp}}")
	ErrUnmarshalResponseFailed      = errors.TN(ALIYUN_MQ_ERR_NS, 8, "unmarshal response failed, {{.err}}")

	ErrMqServer = errors.TN(ALIYUN_MQ_ERR_NS, 101, ALIYUN_MQ_ERR_TEMPSTR)

	ErrAckMessage = errors.TN(ALIYUN_MQ_ERR_NS, 102, "aliyun_mq ack message error,code: {{.Code}}, message: {{.Message}}, receiptHandle: {{.ReceiptHandle}}, requestId: {{.RequestId}}")
)
View Source
var (
	DefaultNumOfMessages int32 = 16
)

Functions

func CommitOrRollback

func CommitOrRollback(p *AliyunMQTransProducer, receiptHandle string, trans string) (err error)

func ConstructPubMessage

func ConstructPubMessage(pubMsgReq *PublishMessageRequest) (err error)

func ConstructRecMessage

func ConstructRecMessage(entries *[]ConsumeMessageEntry)

func ContainsSpecialChar

func ContainsSpecialChar(input string) (result bool)

func ParseError

func ParseError(resp ErrorResponse, resource string) (err error)

Types

type AckMessageErrorEntry

type AckMessageErrorEntry struct {
	XMLName xml.Name `xml:"Error" json:"-"`
	// Ack消息出错的错误码
	ErrorCode string `xml:"ErrorCode" json:"error_code"`
	// Ack消息出错的错误描述
	ErrorMessage string `xml:"ErrorMessage" json:"error_messages"`
	// Ack消息出错的消息句柄
	ReceiptHandle string `xml:"ReceiptHandle,omitempty" json:"receipt_handle"`
}

type AckMessageErrorResponse

type AckMessageErrorResponse struct {
	XMLName        xml.Name               `xml:"Errors" json:"-"`
	FailedMessages []AckMessageErrorEntry `xml:"Error" json:"errors"`
}

type AliyunMQAckMsgDecoder

type AliyunMQAckMsgDecoder struct {
	// contains filtered or unexported fields
}

func (*AliyunMQAckMsgDecoder) Decode

func (p *AliyunMQAckMsgDecoder) Decode(reader io.Reader, v interface{}) (err error)

func (*AliyunMQAckMsgDecoder) DecodeError

func (p *AliyunMQAckMsgDecoder) DecodeError(bodyBytes []byte, resource string, requestId string) (decodedError error, err error)

type AliyunMQClient

type AliyunMQClient struct {
	Client *fasthttp.Client
	// contains filtered or unexported fields
}

func (*AliyunMQClient) GetConsumer

func (p *AliyunMQClient) GetConsumer(instanceId string, topicName string, consumer string, messageTag string) MQConsumer

func (*AliyunMQClient) GetCredential

func (p *AliyunMQClient) GetCredential() Credential

func (*AliyunMQClient) GetEndpoint

func (p *AliyunMQClient) GetEndpoint() string

func (*AliyunMQClient) GetProducer

func (p *AliyunMQClient) GetProducer(instanceId string, topicName string) MQProducer

func (*AliyunMQClient) GetTransProducer

func (p *AliyunMQClient) GetTransProducer(instanceId string, topicName string, groupId string) MQTransProducer

func (*AliyunMQClient) Send

func (p *AliyunMQClient) Send(decoder MQDecoder, method Method, headers map[string]string, message interface{}, resource string, v interface{}) (statusCode int, err error)

func (*AliyunMQClient) Send0

func (p *AliyunMQClient) Send0(method Method, headers map[string]string, message interface{}, resource string) (*fasthttp.Response, error)

type AliyunMQConsumer

type AliyunMQConsumer struct {
	// contains filtered or unexported fields
}

func (*AliyunMQConsumer) AckMessage

func (p *AliyunMQConsumer) AckMessage(receiptHandles []string) (err error)

func (*AliyunMQConsumer) ConsumeMessage

func (p *AliyunMQConsumer) ConsumeMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)

func (*AliyunMQConsumer) ConsumeMessageOrderly

func (p *AliyunMQConsumer) ConsumeMessageOrderly(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)

func (*AliyunMQConsumer) Consumer

func (p *AliyunMQConsumer) Consumer() string

func (*AliyunMQConsumer) InstanceId

func (p *AliyunMQConsumer) InstanceId() string

func (*AliyunMQConsumer) MessageTag

func (p *AliyunMQConsumer) MessageTag() string

func (*AliyunMQConsumer) TopicName

func (p *AliyunMQConsumer) TopicName() string

type AliyunMQDecoder

type AliyunMQDecoder struct {
}

func (*AliyunMQDecoder) Decode

func (p *AliyunMQDecoder) Decode(reader io.Reader, v interface{}) (err error)

func (*AliyunMQDecoder) DecodeError

func (p *AliyunMQDecoder) DecodeError(bodyBytes []byte, resource string, requestId string) (decodedError error, err error)

type AliyunMQProducer

type AliyunMQProducer struct {
	// contains filtered or unexported fields
}

func (*AliyunMQProducer) InstanceId

func (p *AliyunMQProducer) InstanceId() string

func (*AliyunMQProducer) PublishMessage

func (p *AliyunMQProducer) PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)

func (*AliyunMQProducer) TopicName

func (p *AliyunMQProducer) TopicName() string

type AliyunMQTransProducer

type AliyunMQTransProducer struct {
	// contains filtered or unexported fields
}

func (*AliyunMQTransProducer) Commit

func (p *AliyunMQTransProducer) Commit(receiptHandle string) (err error)

func (*AliyunMQTransProducer) ConsumeHalfMessage

func (p *AliyunMQTransProducer) ConsumeHalfMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)

func (*AliyunMQTransProducer) GroupId

func (p *AliyunMQTransProducer) GroupId() string

func (*AliyunMQTransProducer) InstanceId

func (p *AliyunMQTransProducer) InstanceId() string

func (*AliyunMQTransProducer) PublishMessage

func (p *AliyunMQTransProducer) PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)

func (*AliyunMQTransProducer) Rollback

func (p *AliyunMQTransProducer) Rollback(receiptHandle string) (err error)

func (*AliyunMQTransProducer) TopicName

func (p *AliyunMQTransProducer) TopicName() string

type ConsumeMessageEntry

type ConsumeMessageEntry struct {
	MessageResponse
	// 消息ID
	MessageId string `xml:"MessageId" json:"message_id"`
	// 消息句柄
	ReceiptHandle string `xml:"ReceiptHandle" json:"receipt_handle"`
	// 消息体MD5
	MessageBodyMD5 string `xml:"MessageBodyMD5" json:"message_body_md5"`
	// 消息体
	MessageBody string `xml:"MessageBody" json:"message_body"`
	// 消息发送时间
	PublishTime int64 `xml:"PublishTime" json:"publish_time"`
	// 下次消费消息的时间(如果这次消费的消息没有Ack)
	NextConsumeTime int64 `xml:"NextConsumeTime" json:"next_consume_time"`
	// 第一次消费的时间,此值对于顺序消费没有意义
	FirstConsumeTime int64 `xml:"FirstConsumeTime" json:"first_consume_time"`
	// 消费次数
	ConsumedTimes int64 `xml:"ConsumedTimes" json:"consumed_times"`
	// 消息标签
	MessageTag string `xml:"MessageTag" json:"message_tag"`
	// 消息属性
	Properties map[string]string `xml:"-" json:"-"`
	// 序列化属性请勿使用
	PropInner string `xml:"Properties,omitempty" json:"properties,omitempty"`
	// 消息KEY
	MessageKey string `xml:"-" json:"-"`
	// 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递
	StartDeliverTime int64 `xml:"-" json:"-"`
	// 顺序消息分区Key
	ShardingKey string `xml:"-" json:"-"`
	// 在消息属性中添加第一次消息回查的最快时间,单位秒,并且表征这是一条事务消息
	TransCheckImmunityTime int `xml:"-" json:"-"`
}

type ConsumeMessageResponse

type ConsumeMessageResponse struct {
	XMLName  xml.Name              `xml:"Messages" json:"-"`
	Messages []ConsumeMessageEntry `xml:"Message" json:"messages"`
}

func (*ConsumeMessageResponse) Responses

func (r *ConsumeMessageResponse) Responses() []*MessageResponse

type Credential

type Credential interface {
	Signature(method Method, headers map[string]string, resource string) (signature string, err error)
	AccessKeyId() string
	AccessKeySecret() string
	SecurityToken() string
}

type ErrAckItem

type ErrAckItem struct {
	ErrorHandle string
	ErrorMsg    string
	ErrorCode   string
}

type ErrorResponse

type ErrorResponse struct {
	XMLName xml.Name `xml:"Error" json:"-"`
	// 错误码
	Code string `xml:"Code,omitempty" json:"code,omitempty"`
	// 错误描述
	Message string `xml:"Message,omitempty" json:"message,omitempty"`
	// 请求ID
	RequestId string `xml:"RequestId,omitempty" json:"request_id,omitempty"`
	// 请求HOST
	HostId string `xml:"HostId,omitempty" json:"host_id,omitempty"`
}

type MQClient

type MQClient interface {
	GetProducer(instanceId string, topicName string) MQProducer
	GetTransProducer(instanceId string, topicName string, groupId string) MQTransProducer
	GetConsumer(instanceId string, topicName string, consumer string, messageTag string) MQConsumer
	GetCredential() Credential
	GetEndpoint() string
}

func NewAliyunMQClient

func NewAliyunMQClient(endpoint, accessKeyId, accessKeySecret, securityToken string) MQClient

func NewAliyunMQClientWithTimeout

func NewAliyunMQClientWithTimeout(endpoint, accessKeyId, accessKeySecret, securityToken string, timeout time.Duration, maxConnsPerHost int) MQClient

type MQConsumer

type MQConsumer interface {
	// 主题名字
	TopicName() string
	// 实例ID,可空
	InstanceId() string
	// 消费者的名字
	Consumer() string
	// 消费消息过滤的标签
	MessageTag() string
	// 消费消息,如果该条消息没有被 {AckMessage} 确认消费成功,即在NextConsumeTime时会再次消费到该条消息
	ConsumeMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)
	// 顺序消费消息,获取的消息可能是多个分区但是一个分区内消息一定是顺序的,对于一个分区的消息必须要全部ACK成功才能消费下一批消息
	// 否则在NextConsumeTime后会再次消费到相同的消息
	ConsumeMessageOrderly(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)
	// 确认消息消费成功
	AckMessage(receiptHandles []string) (err error)
}

MQ的消息消费者

type MQCredential

type MQCredential struct {
	// contains filtered or unexported fields
}

func NewMQCredential

func NewMQCredential(accessKeyId string, accessKeySecret string, securityToken string) *MQCredential

func (*MQCredential) AccessKeyId

func (p *MQCredential) AccessKeyId() string

func (*MQCredential) AccessKeySecret

func (p *MQCredential) AccessKeySecret() string

func (*MQCredential) SecurityToken

func (p *MQCredential) SecurityToken() string

func (*MQCredential) Signature

func (p *MQCredential) Signature(method Method, headers map[string]string, resource string) (signature string, err error)

type MQDecoder

type MQDecoder interface {
	Decode(reader io.Reader, v interface{}) (err error)
	DecodeError(bodyBytes []byte, resource string, requestId string) (decodedError error, err error)
}

func NewAliyunMQAckMsgDecoder

func NewAliyunMQAckMsgDecoder() MQDecoder

func NewAliyunMQDecoder

func NewAliyunMQDecoder() MQDecoder

type MQProducer

type MQProducer interface {
	// 主题名字
	TopicName() string
	// 实例ID,可空
	InstanceId() string
	// 发送消息
	PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)
}

MQ的消息生产者

type MQTransProducer

type MQTransProducer interface {
	// 主题名字
	TopicName() string
	// 实例ID,可空
	InstanceId() string
	// GroupId,非空
	GroupId() string
	// 发送消息
	PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)
	// 消费事务半消息
	ConsumeHalfMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)
	// 提交事务消息
	Commit(receiptHandle string) (err error)
	// 回滚事务消息
	Rollback(receiptHandle string) (err error)
}

MQ的事务消息生产者

type MessageResponder

type MessageResponder interface {
	Responses() []*MessageResponse
}

type MessageResponse

type MessageResponse struct {
	XMLName xml.Name `xml:"Message" json:"-"`
	// 错误码
	Code string `xml:"Code,omitempty" json:"code,omitempty"`
	// 错误描述
	Message string `xml:"Message,omitempty" json:"message,omitempty"`
	// 请求ID
	RequestId string `xml:"RequestId,omitempty" json:"request_id,omitempty"`
	// 请求HOST
	HostId string `xml:"HostId,omitempty" json:"host_id,omitempty"`
}

type Method

type Method string

type PublishMessageRequest

type PublishMessageRequest struct {
	XMLName xml.Name `xml:"Message" json:"-"`
	// 消息体
	MessageBody string `xml:"MessageBody" json:"message_body"`
	// 消息标签
	MessageTag string `xml:"MessageTag,omitempty" json:"message_tag,omitempty"`
	// 消息属性
	Properties map[string]string `xml:"-" json:"-"`
	// 消息KEY
	MessageKey string `xml:"-" json:"-"`
	// 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递
	StartDeliverTime int64 `xml:"-" json:"-"`
	// 在消息属性中添加第一次消息回查的最快时间,单位秒,并且表征这是一条事务消息, 10~300s
	TransCheckImmunityTime int `xml:"-" json:"-"`
	// 分区顺序消息中区分不同分区的关键字段,sharding key 于普通消息的 key 是完全不同的概念。全局顺序消息,该字段可以设置为任意非空字符串。
	ShardingKey string `xml:"-" json:"-"`
	// contains filtered or unexported fields
}

type PublishMessageResponse

type PublishMessageResponse struct {
	MessageResponse
	// 消息ID
	MessageId string `xml:"MessageId" json:"message_id"`
	// 消息体MD5
	MessageBodyMD5 string `xml:"MessageBodyMD5" json:"message_body_md5"`
	// 消息句柄只有事务消息才有
	ReceiptHandle string `xml:"ReceiptHandle" json:"receipt_handle"`
}

func (*PublishMessageResponse) Responses

func (r *PublishMessageResponse) Responses() []*MessageResponse

type ReceiptHandles

type ReceiptHandles struct {
	XMLName xml.Name `xml:"ReceiptHandles"`
	// 消息句柄
	ReceiptHandles []string `xml:"ReceiptHandle"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL