mq_http_sdk

package module
v0.0.0-...-5b9c381 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 24, 2019 License: MIT Imports: 17 Imported by: 0

README

MQ GO HTTP SDK

Alyun MQ Documents: http://www.aliyun.com/product/ons

Aliyun MQ Console: https://ons.console.aliyun.com

Use

  1. import github.com/haoqipei/mq-http-go-sdk
  2. setup GOPATH:
    • MAC/LINUX: export GOPATH={dir}, export GOBIN=$GOPATH/bin
    • WINDOWS: set GOPATH={dir}
  3. go get -x -v

Sample

V1.0.0 Samples

Publish Message

Consume Message

V1.0.1 Samples

Publish Message

Consume Message

Transaction Message

Note for 1.0.1: Http consumer only support timer msg(less than 3 days), no matter the msg is produced from http or tcp protocal.

V1.0.3 Samples

Publish Message

Consume Message

Transaction Message

Publish Order Message

Consume Order Message

Not for 1.0.3: Order is only supported at special server cluster.

Documentation

Index

Constants

View Source
const (
	ClientName           = "mq-go-sdk/1.0.3(fasthttp)"
	ClientVersion        = "2015-06-06"
	DefaultTimeout int64 = 35
)
View Source
const (
	GET    Method = "GET"
	POST          = "POST"
	DELETE        = "DELETE"
)
View Source
const (
	AUTHORIZATION  = "Authorization"
	CONTENT_TYPE   = "Content-Type"
	CONTENT_MD5    = "Content-MD5"
	MQ_VERSION     = "x-mq-version"
	HOST           = "Host"
	DATE           = "Date"
	KEEP_ALIVE     = "Keep-Alive"
	SECURITY_TOKEN = "security-token"
)
View Source
const (
	ALIYUN_MQ_ERR_NS = "MQ"

	ALIYUN_MQ_ERR_TEMPSTR = "" /* 133-byte string literal not displayed */
)
View Source
const (
	StartDeliverTime       = "__STARTDELIVERTIME"
	TransCheckImmunityTime = "__TransCheckT"
	Keys                   = "KEYS"
	SHARDING               = "__SHARDINGKEY"
)

Variables

View Source
var (
	ErrSignMessageFailed       = errors.TN(ALIYUN_MQ_ERR_NS, 1, "sign message failed, {{.err}}")
	ErrMarshalMessageFailed    = errors.TN(ALIYUN_MQ_ERR_NS, 2, "marshal message filed, {{.err}}")
	ErrGeneralAuthHeaderFailed = errors.TN(ALIYUN_MQ_ERR_NS, 3, "general auth header failed, {{.err}}")
	ErrMessageProperty         = errors.TN(ALIYUN_MQ_ERR_NS, 4, "message property can not contains:\" ' < > & : |, {{.err}}")

	ErrSendRequestFailed = errors.TN(ALIYUN_MQ_ERR_NS, 5, "send request failed, {{.err}}")

	ErrUnmarshalErrorResponseFailed = errors.TN(ALIYUN_MQ_ERR_NS, 7, "unmarshal error response failed, {{.err}}, ResponseBody: {{.resp}}")
	ErrUnmarshalResponseFailed      = errors.TN(ALIYUN_MQ_ERR_NS, 8, "unmarshal response failed, {{.err}}")

	ErrMqServer = errors.TN(ALIYUN_MQ_ERR_NS, 101, ALIYUN_MQ_ERR_TEMPSTR)

	ErrAckMessage = errors.TN(ALIYUN_MQ_ERR_NS, 102, "aliyun_mq ack message error,code: {{.Code}}, message: {{.Message}}, receiptHandle: {{.ReceiptHandle}}, requestId: {{.RequestId}}")
)
View Source
var (
	DefaultNumOfMessages int32 = 16
)

Functions

func CommitOrRollback

func CommitOrRollback(p *AliyunMQTransProducer, receiptHandle string, trans string) (err error)

func ConstructPubMessage

func ConstructPubMessage(pubMsgReq *PublishMessageRequest) (err error)

func ConstructRecMessage

func ConstructRecMessage(entries *[]ConsumeMessageEntry)

func ContainsSpecialChar

func ContainsSpecialChar(input string) (result bool)

func ParseError

func ParseError(resp ErrorResponse, resource string) (err error)

Types

type AckMessageErrorEntry

type AckMessageErrorEntry struct {
	XMLName xml.Name `xml:"Error" json:"-"`
	// Ack消息出错的错误码
	ErrorCode string `xml:"ErrorCode" json:"error_code"`
	// Ack消息出错的错误描述
	ErrorMessage string `xml:"ErrorMessage" json:"error_messages"`
	// Ack消息出错的消息句柄
	ReceiptHandle string `xml:"ReceiptHandle,omitempty" json:"receipt_handle"`
}

type AckMessageErrorResponse

type AckMessageErrorResponse struct {
	XMLName        xml.Name               `xml:"Errors" json:"-"`
	FailedMessages []AckMessageErrorEntry `xml:"Error" json:"errors"`
}

type AliyunMQAckMsgDecoder

type AliyunMQAckMsgDecoder struct {
	// contains filtered or unexported fields
}

func (*AliyunMQAckMsgDecoder) Decode

func (p *AliyunMQAckMsgDecoder) Decode(reader io.Reader, v interface{}) (err error)

func (*AliyunMQAckMsgDecoder) DecodeError

func (p *AliyunMQAckMsgDecoder) DecodeError(bodyBytes []byte, resource string, requestId string) (decodedError error, err error)

type AliyunMQClient

type AliyunMQClient struct {
	// contains filtered or unexported fields
}

func (*AliyunMQClient) GetConsumer

func (p *AliyunMQClient) GetConsumer(instanceId string, topicName string, consumer string, messageTag string) MQConsumer

func (*AliyunMQClient) GetCredential

func (p *AliyunMQClient) GetCredential() Credential

func (*AliyunMQClient) GetEndpoint

func (p *AliyunMQClient) GetEndpoint() string

func (*AliyunMQClient) GetProducer

func (p *AliyunMQClient) GetProducer(instanceId string, topicName string) MQProducer

func (*AliyunMQClient) GetTransProducer

func (p *AliyunMQClient) GetTransProducer(instanceId string, topicName string, groupId string) MQTransProducer

func (*AliyunMQClient) Send

func (p *AliyunMQClient) Send(decoder MQDecoder, method Method, headers map[string]string, message interface{}, resource string, v interface{}) (statusCode int, err error)

func (*AliyunMQClient) Send0

func (p *AliyunMQClient) Send0(method Method, headers map[string]string, message interface{}, resource string) (*fasthttp.Response, error)

type AliyunMQConsumer

type AliyunMQConsumer struct {
	// contains filtered or unexported fields
}

func (*AliyunMQConsumer) AckMessage

func (p *AliyunMQConsumer) AckMessage(receiptHandles []string) (err error)

func (*AliyunMQConsumer) ConsumeMessage

func (p *AliyunMQConsumer) ConsumeMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)

func (*AliyunMQConsumer) ConsumeMessageOrderly

func (p *AliyunMQConsumer) ConsumeMessageOrderly(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)

func (*AliyunMQConsumer) Consumer

func (p *AliyunMQConsumer) Consumer() string

func (*AliyunMQConsumer) InstanceId

func (p *AliyunMQConsumer) InstanceId() string

func (*AliyunMQConsumer) MessageTag

func (p *AliyunMQConsumer) MessageTag() string

func (*AliyunMQConsumer) TopicName

func (p *AliyunMQConsumer) TopicName() string

type AliyunMQDecoder

type AliyunMQDecoder struct {
}

func (*AliyunMQDecoder) Decode

func (p *AliyunMQDecoder) Decode(reader io.Reader, v interface{}) (err error)

func (*AliyunMQDecoder) DecodeError

func (p *AliyunMQDecoder) DecodeError(bodyBytes []byte, resource string, requestId string) (decodedError error, err error)

type AliyunMQProducer

type AliyunMQProducer struct {
	// contains filtered or unexported fields
}

func (*AliyunMQProducer) InstanceId

func (p *AliyunMQProducer) InstanceId() string

func (*AliyunMQProducer) PublishMessage

func (p *AliyunMQProducer) PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)

func (*AliyunMQProducer) TopicName

func (p *AliyunMQProducer) TopicName() string

type AliyunMQTransProducer

type AliyunMQTransProducer struct {
	// contains filtered or unexported fields
}

func (*AliyunMQTransProducer) Commit

func (p *AliyunMQTransProducer) Commit(receiptHandle string) (err error)

func (*AliyunMQTransProducer) ConsumeHalfMessage

func (p *AliyunMQTransProducer) ConsumeHalfMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)

func (*AliyunMQTransProducer) GroupId

func (p *AliyunMQTransProducer) GroupId() string

func (*AliyunMQTransProducer) InstanceId

func (p *AliyunMQTransProducer) InstanceId() string

func (*AliyunMQTransProducer) PublishMessage

func (p *AliyunMQTransProducer) PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)

func (*AliyunMQTransProducer) Rollback

func (p *AliyunMQTransProducer) Rollback(receiptHandle string) (err error)

func (*AliyunMQTransProducer) TopicName

func (p *AliyunMQTransProducer) TopicName() string

type ConsumeMessageEntry

type ConsumeMessageEntry struct {
	MessageResponse
	// 消息ID
	MessageId string `xml:"MessageId" json:"message_id"`
	// 消息句柄
	ReceiptHandle string `xml:"ReceiptHandle" json:"receipt_handle"`
	// 消息体MD5
	MessageBodyMD5 string `xml:"MessageBodyMD5" json:"message_body_md5"`
	// 消息体
	MessageBody string `xml:"MessageBody" json:"message_body"`
	// 消息发送时间
	PublishTime int64 `xml:"PublishTime" json:"publish_time"`
	// 下次消费消息的时间(如果这次消费的消息没有Ack)
	NextConsumeTime int64 `xml:"NextConsumeTime" json:"next_consume_time"`
	// 第一次消费的时间,此值对于顺序消费没有意义
	FirstConsumeTime int64 `xml:"FirstConsumeTime" json:"first_consume_time"`
	// 消费次数
	ConsumedTimes int64 `xml:"ConsumedTimes" json:"consumed_times"`
	// 消息标签
	MessageTag string `xml:"MessageTag" json:"message_tag"`
	// 消息属性
	Properties map[string]string `xml:"-" json:"-"`
	// 序列化属性请勿使用
	PropInner string `xml:"Properties,omitempty" json:"properties,omitempty"`
	// 消息KEY
	MessageKey string `xml:"-" json:"-"`
	// 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递
	StartDeliverTime int64 `xml:"-" json:"-"`
	// 顺序消息分区Key
	ShardingKey string `xml:"-" json:"-"`
	// 在消息属性中添加第一次消息回查的最快时间,单位秒,并且表征这是一条事务消息
	TransCheckImmunityTime int `xml:"-" json:"-"`
}

type ConsumeMessageResponse

type ConsumeMessageResponse struct {
	XMLName  xml.Name              `xml:"Messages" json:"-"`
	Messages []ConsumeMessageEntry `xml:"Message" json:"messages"`
}

type Credential

type Credential interface {
	Signature(method Method, headers map[string]string, resource string) (signature string, err error)
	AccessKeyId() string
	AccessKeySecret() string
	SecurityToken() string
}

type ErrAckItem

type ErrAckItem struct {
	ErrorHandle string
	ErrorMsg    string
	ErrorCode   string
}

type ErrorResponse

type ErrorResponse struct {
	XMLName xml.Name `xml:"Error" json:"-"`
	// 错误码
	Code string `xml:"Code,omitempty" json:"code,omitempty"`
	// 错误描述
	Message string `xml:"Message,omitempty" json:"message,omitempty"`
	// 请求ID
	RequestId string `xml:"RequestId,omitempty" json:"request_id,omitempty"`
	// 请求HOST
	HostId string `xml:"HostId,omitempty" json:"host_id,omitempty"`
}

type MQClient

type MQClient interface {
	GetProducer(instanceId string, topicName string) MQProducer
	GetTransProducer(instanceId string, topicName string, groupId string) MQTransProducer
	GetConsumer(instanceId string, topicName string, consumer string, messageTag string) MQConsumer
	GetCredential() Credential
	GetEndpoint() string
}

func NewAliyunMQClient

func NewAliyunMQClient(endpoint, accessKeyId, accessKeySecret, securityToken string) MQClient

type MQConsumer

type MQConsumer interface {
	// 主题名字
	TopicName() string
	// 实例ID,可空
	InstanceId() string
	// 消费者的名字
	Consumer() string
	// 消费消息过滤的标签
	MessageTag() string
	// 消费消息,如果该条消息没有被 {AckMessage} 确认消费成功,即在NextConsumeTime时会再次消费到该条消息
	ConsumeMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)
	// 顺序消费消息,获取的消息可能是多个分区但是一个分区内消息一定是顺序的,对于一个分区的消息必须要全部ACK成功才能消费下一批消息
	// 否则在NextConsumeTime后会再次消费到相同的消息
	ConsumeMessageOrderly(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)
	// 确认消息消费成功
	AckMessage(receiptHandles []string) (err error)
}

MQ的消息消费者

type MQCredential

type MQCredential struct {
	// contains filtered or unexported fields
}

func NewMQCredential

func NewMQCredential(accessKeyId string, accessKeySecret string, securityToken string) *MQCredential

func (*MQCredential) AccessKeyId

func (p *MQCredential) AccessKeyId() string

func (*MQCredential) AccessKeySecret

func (p *MQCredential) AccessKeySecret() string

func (*MQCredential) SecurityToken

func (p *MQCredential) SecurityToken() string

func (*MQCredential) Signature

func (p *MQCredential) Signature(method Method, headers map[string]string, resource string) (signature string, err error)

type MQDecoder

type MQDecoder interface {
	Decode(reader io.Reader, v interface{}) (err error)
	DecodeError(bodyBytes []byte, resource string, requestId string) (decodedError error, err error)
}

func NewAliyunMQAckMsgDecoder

func NewAliyunMQAckMsgDecoder() MQDecoder

func NewAliyunMQDecoder

func NewAliyunMQDecoder() MQDecoder

type MQProducer

type MQProducer interface {
	// 主题名字
	TopicName() string
	// 实例ID,可空
	InstanceId() string
	// 发送消息
	PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)
}

MQ的消息生产者

type MQTransProducer

type MQTransProducer interface {
	// 主题名字
	TopicName() string
	// 实例ID,可空
	InstanceId() string
	// GroupId,非空
	GroupId() string
	// 发送消息
	PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)
	// 消费事务半消息
	ConsumeHalfMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)
	// 提交事务消息
	Commit(receiptHandle string) (err error)
	// 回滚事务消息
	Rollback(receiptHandle string) (err error)
}

MQ的事务消息生产者

type MessageResponse

type MessageResponse struct {
	XMLName xml.Name `xml:"Message" json:"-"`
	// 错误码
	Code string `xml:"Code,omitempty" json:"code,omitempty"`
	// 错误描述
	Message string `xml:"Message,omitempty" json:"message,omitempty"`
	// 请求ID
	RequestId string `xml:"RequestId,omitempty" json:"request_id,omitempty"`
	// 请求HOST
	HostId string `xml:"HostId,omitempty" json:"host_id,omitempty"`
}

type Method

type Method string

type PublishMessageRequest

type PublishMessageRequest struct {
	XMLName xml.Name `xml:"Message" json:"-"`
	// 消息体
	MessageBody string `xml:"MessageBody" json:"message_body"`
	// 消息标签
	MessageTag string `xml:"MessageTag,omitempty" json:"message_tag,omitempty"`
	// 消息属性
	Properties map[string]string `xml:"-" json:"-"`
	// 消息KEY
	MessageKey string `xml:"-" json:"-"`
	// 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递
	StartDeliverTime int64 `xml:"-" json:"-"`
	// 在消息属性中添加第一次消息回查的最快时间,单位秒,并且表征这是一条事务消息, 10~300s
	TransCheckImmunityTime int `xml:"-" json:"-"`
	// 分区顺序消息中区分不同分区的关键字段,sharding key 于普通消息的 key 是完全不同的概念。全局顺序消息,该字段可以设置为任意非空字符串。
	ShardingKey string `xml:"-" json:"-"`
	// contains filtered or unexported fields
}

type PublishMessageResponse

type PublishMessageResponse struct {
	MessageResponse
	// 消息ID
	MessageId string `xml:"MessageId" json:"message_id"`
	// 消息体MD5
	MessageBodyMD5 string `xml:"MessageBodyMD5" json:"message_body_md5"`
	// 消息句柄只有事务消息才有
	ReceiptHandle string `xml:"ReceiptHandle" json:"receipt_handle"`
}

type ReceiptHandles

type ReceiptHandles struct {
	XMLName xml.Name `xml:"ReceiptHandles"`
	// 消息句柄
	ReceiptHandles []string `xml:"ReceiptHandle"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL