mq

package
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 9, 2021 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// ClientName is the client identifier string for this SDK build.
	// NOTE(review): presumably sent as the User-Agent of outgoing requests — confirm against the request code.
	ClientName = "mq-go-sdk/1.0.3(fasthttp)"
	// ClientVersion is a date-formatted version string.
	// NOTE(review): presumably the value sent under the Version ("x-mq-version") header — confirm.
	ClientVersion = "2015-06-06"
	// DefaultTimeout is the default timeout value, typed int64.
	// NOTE(review): the unit is not visible here; presumably seconds — confirm.
	DefaultTimeout int64 = 35
)
View Source
const (
	// GET is the "GET" request method, declared with the explicit type Method.
	GET Method = "GET"
	// POST is the "POST" request method.
	// NOTE(review): unlike GET, this constant has no explicit type, so it is an
	// untyped string constant rather than a Method. It is still assignable to
	// Method parameters, but it can also be used as a plain string.
	POST = "POST"
	// DELETE is the "DELETE" request method. Like POST, it is an untyped string
	// constant rather than a Method.
	DELETE = "DELETE"
)
View Source
// Header names.
// NOTE(review): presumably used as keys of the headers map passed to Send and
// Credential.Signature — confirm against the request-building code.
const (
	// Authorization is the "Authorization" header name.
	Authorization = "Authorization"
	// ContentType is the "Content-Type" header name.
	ContentType = "Content-Type"
	// ContentMd5 is the "Content-MD5" header name.
	ContentMd5 = "Content-MD5"
	// Version is the "x-mq-version" header name.
	Version = "x-mq-version"
	// Date is the "Date" header name.
	Date = "Date"
	// SecurityToken is the "security-token" header name.
	SecurityToken = "security-token"
)
View Source
const (
	// ErrNs is the namespace string passed as the first argument to errors.TN
	// for the package-level error values.
	ErrNs = "MQ"

	// ErrTempStr is the message template used by ErrMqServer. Its literal is
	// elided in this listing.
	ErrTempStr = "" /* 133-byte string literal not displayed */
)
View Source
// Reserved key names.
// NOTE(review): presumably the keys under which the corresponding
// PublishMessageRequest / ConsumeMessageEntry fields are stored in the message
// properties — confirm against ConstructPubMessage and ConstructRecMessage.
const (
	// StartDeliverTime is the reserved key "__STARTDELIVERTIME".
	StartDeliverTime = "__STARTDELIVERTIME"
	// TransCheckImmunityTime is the reserved key "__TransCheckT".
	TransCheckImmunityTime = "__TransCheckT"
	// Keys is the reserved key "KEYS".
	Keys = "KEYS"
	// SHARDING is the reserved key "__SHARDINGKEY".
	SHARDING = "__SHARDINGKEY"
)

Variables

View Source
// Package-level error values. Each one is built with errors.TN from the ErrNs
// namespace, a numeric code, and a message template with {{.name}} placeholders.
// NOTE(review): errors.TN is defined outside this file; presumably it returns a
// templated error whose placeholders are filled in when the error is raised —
// confirm against the errors package in use. Code 6 is not used in this block.
var (
	// ErrSignMessageFailed (code 1) reports that signing a message failed.
	ErrSignMessageFailed = errors.TN(ErrNs, 1, "sign message failed, {{.err}}")
	// ErrMarshalMessageFailed (code 2) reports that marshalling a message failed.
	// The message text previously read "filed"; that typo is corrected here.
	ErrMarshalMessageFailed = errors.TN(ErrNs, 2, "marshal message failed, {{.err}}")
	// ErrGeneralAuthHeaderFailed (code 3) reports that building the auth header failed.
	ErrGeneralAuthHeaderFailed = errors.TN(ErrNs, 3, "general auth header failed, {{.err}}")
	// ErrMessageProperty (code 4) reports a message property containing one of
	// the characters listed in the message: " ' < > & : |
	ErrMessageProperty = errors.TN(ErrNs, 4, "message property can not contains:\" ' < > & : |, {{.err}}")
	// ErrSendRequestFailed (code 5) reports that sending the request failed.
	ErrSendRequestFailed = errors.TN(ErrNs, 5, "send request failed, {{.err}}")
	// ErrUnmarshalErrorResponseFailed (code 7) reports that an error response
	// could not be unmarshalled; the template also carries the response body.
	ErrUnmarshalErrorResponseFailed = errors.TN(ErrNs, 7, "unmarshal error response failed, {{.err}}, ResponseBody: {{.resp}}")
	// ErrUnmarshalResponseFailed (code 8) reports that a response could not be unmarshalled.
	ErrUnmarshalResponseFailed = errors.TN(ErrNs, 8, "unmarshal response failed, {{.err}}")
	// ErrMqServer (code 101) uses the ErrTempStr template.
	ErrMqServer = errors.TN(ErrNs, 101, ErrTempStr)
	// ErrAckMessage (code 102) reports a failed message ack; the template carries
	// the code, message, receipt handle and request ID.
	ErrAckMessage = errors.TN(ErrNs, 102, "AliYun_mq ack message error,code: {{.Code}}, message: {{.Message}}, receiptHandle: {{.ReceiptHandle}}, requestId: {{.RequestID}}")
)
View Source
var (
	// DefaultNumOfMessages is a package-level default message count (16).
	// NOTE(review): presumably used when a consume call is given no usable
	// numOfMessages — confirm. It is a variable, so callers can reassign it.
	DefaultNumOfMessages int32 = 16
)

Functions

func CommitOrRollback

func CommitOrRollback(p *AliYunMQTransProducer, receiptHandle string, trans string) (err error)

CommitOrRollback .

func ConstructPubMessage

func ConstructPubMessage(pubMsgReq *PublishMessageRequest) (err error)

ConstructPubMessage .

func ConstructRecMessage

func ConstructRecMessage(entries *[]ConsumeMessageEntry)

ConstructRecMessage .

func ContainsSpecialChar

func ContainsSpecialChar(input string) (result bool)

ContainsSpecialChar .

func ParseError

func ParseError(resp ErrorResponse, resource string) (err error)

ParseError .

Types

type AckMessageErrorEntry

// AckMessageErrorEntry describes one message whose ack failed. It maps to an
// <Error> XML element.
type AckMessageErrorEntry struct {
	XMLName xml.Name `xml:"Error" json:"-"`
	// Error code for the failed message ack.
	ErrorCode string `xml:"ErrorCode" json:"error_code"`
	// Error description for the failed message ack.
	// NOTE(review): the JSON key is plural ("error_messages") while the XML
	// element is singular — confirm this is intended.
	ErrorMessage string `xml:"ErrorMessage" json:"error_messages"`
	// Receipt handle of the message whose ack failed.
	ReceiptHandle string `xml:"ReceiptHandle,omitempty" json:"receipt_handle"`
}

AckMessageErrorEntry .

type AckMessageErrorResponse

// AckMessageErrorResponse maps to an <Errors> XML element. FailedMessages
// collects its <Error> children, one AckMessageErrorEntry each.
type AckMessageErrorResponse struct {
	XMLName        xml.Name               `xml:"Errors" json:"-"`
	FailedMessages []AckMessageErrorEntry `xml:"Error" json:"errors"`
}

AckMessageErrorResponse .

type AliYunMQAckMsgDecoder

// AliYunMQAckMsgDecoder has Decode and DecodeError methods with the same
// signatures as the Decoder interface.
// NOTE(review): presumably the concrete type returned by
// NewAliYunMQAckMsgDecoder — confirm.
type AliYunMQAckMsgDecoder struct {
	// contains filtered or unexported fields
}

AliYunMQAckMsgDecoder .

func (*AliYunMQAckMsgDecoder) Decode

func (p *AliYunMQAckMsgDecoder) Decode(reader io.Reader, v interface{}) (err error)

Decode .

func (*AliYunMQAckMsgDecoder) DecodeError

func (p *AliYunMQAckMsgDecoder) DecodeError(bodyBytes []byte, resource string, requestID string) (decodedError error, err error)

DecodeError .

type AliYunMQClient

// AliYunMQClient provides the Client methods (GetProducer, GetTransProducer,
// GetConsumer, GetCredential, GetEndpoint) plus the request helpers Send and
// Send0.
// NOTE(review): presumably the concrete type behind NewAliYunMQClient and
// NewAliYunMQClientWithTimeout — confirm.
type AliYunMQClient struct {
	// contains filtered or unexported fields
}

AliYunMQClient .

func (*AliYunMQClient) GetConsumer

func (p *AliYunMQClient) GetConsumer(instanceID string, topicName string, consumer string, messageTag string) Consumer

GetConsumer .

func (*AliYunMQClient) GetCredential

func (p *AliYunMQClient) GetCredential() Credential

GetCredential .

func (*AliYunMQClient) GetEndpoint

func (p *AliYunMQClient) GetEndpoint() string

GetEndpoint .

func (*AliYunMQClient) GetProducer

func (p *AliYunMQClient) GetProducer(instanceID string, topicName string) Producer

GetProducer .

func (*AliYunMQClient) GetTransProducer

func (p *AliYunMQClient) GetTransProducer(instanceID string, topicName string, groupID string) TransProducer

GetTransProducer .

func (*AliYunMQClient) Send

func (p *AliYunMQClient) Send(decoder Decoder, method Method, headers map[string]string, message interface{}, resource string, v interface{}) (statusCode int, err error)

Send .

func (*AliYunMQClient) Send0

func (p *AliYunMQClient) Send0(method Method, headers map[string]string, message interface{}, resource string) (*fasthttp.Response, error)

Send0 .

type AliYunMQConsumer

// AliYunMQConsumer provides the methods of the Consumer interface: TopicName,
// InstanceID, Consumer, MessageTag, ConsumeMessage, ConsumeMessageOrderly and
// AckMessage.
// NOTE(review): presumably the concrete type returned by
// AliYunMQClient.GetConsumer — confirm.
type AliYunMQConsumer struct {
	// contains filtered or unexported fields
}

AliYunMQConsumer .

func (*AliYunMQConsumer) AckMessage

func (p *AliYunMQConsumer) AckMessage(receiptHandles []string) (err error)

AckMessage .

func (*AliYunMQConsumer) ConsumeMessage

func (p *AliYunMQConsumer) ConsumeMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)

ConsumeMessage .

func (*AliYunMQConsumer) ConsumeMessageOrderly

func (p *AliYunMQConsumer) ConsumeMessageOrderly(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)

ConsumeMessageOrderly .

func (*AliYunMQConsumer) Consumer

func (p *AliYunMQConsumer) Consumer() string

Consumer .

func (*AliYunMQConsumer) InstanceID

func (p *AliYunMQConsumer) InstanceID() string

InstanceID .

func (*AliYunMQConsumer) MessageTag

func (p *AliYunMQConsumer) MessageTag() string

MessageTag .

func (*AliYunMQConsumer) TopicName

func (p *AliYunMQConsumer) TopicName() string

TopicName .

type AliYunMQDecoder

// AliYunMQDecoder is a field-less type with Decode and DecodeError methods
// matching the Decoder interface.
// NOTE(review): presumably the concrete type returned by NewAliYunMQDecoder —
// confirm.
type AliYunMQDecoder struct {
}

AliYunMQDecoder .

func (*AliYunMQDecoder) Decode

func (p *AliYunMQDecoder) Decode(reader io.Reader, v interface{}) (err error)

Decode .

func (*AliYunMQDecoder) DecodeError

func (p *AliYunMQDecoder) DecodeError(bodyBytes []byte, resource string, requestID string) (decodedError error, err error)

DecodeError .

type AliYunMQProducer

// AliYunMQProducer provides the methods of the Producer interface: TopicName,
// InstanceID and PublishMessage.
// NOTE(review): presumably the concrete type returned by
// AliYunMQClient.GetProducer — confirm.
type AliYunMQProducer struct {
	// contains filtered or unexported fields
}

AliYunMQProducer .

func (*AliYunMQProducer) InstanceID

func (p *AliYunMQProducer) InstanceID() string

InstanceID .

func (*AliYunMQProducer) PublishMessage

func (p *AliYunMQProducer) PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)

PublishMessage .

func (*AliYunMQProducer) TopicName

func (p *AliYunMQProducer) TopicName() string

TopicName .

type AliYunMQTransProducer

// AliYunMQTransProducer provides the methods of the TransProducer interface:
// TopicName, InstanceID, GroupID, PublishMessage, ConsumeHalfMessage, Commit
// and Rollback. It is also the producer type accepted by CommitOrRollback.
// NOTE(review): presumably the concrete type returned by
// AliYunMQClient.GetTransProducer — confirm.
type AliYunMQTransProducer struct {
	// contains filtered or unexported fields
}

AliYunMQTransProducer .

func (*AliYunMQTransProducer) Commit

func (p *AliYunMQTransProducer) Commit(receiptHandle string) (err error)

Commit .

func (*AliYunMQTransProducer) ConsumeHalfMessage

func (p *AliYunMQTransProducer) ConsumeHalfMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitSeconds int64)

ConsumeHalfMessage .

func (*AliYunMQTransProducer) GroupID

func (p *AliYunMQTransProducer) GroupID() string

GroupID .

func (*AliYunMQTransProducer) InstanceID

func (p *AliYunMQTransProducer) InstanceID() string

InstanceID .

func (*AliYunMQTransProducer) PublishMessage

func (p *AliYunMQTransProducer) PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)

PublishMessage .

func (*AliYunMQTransProducer) Rollback

func (p *AliYunMQTransProducer) Rollback(receiptHandle string) (err error)

Rollback .

func (*AliYunMQTransProducer) TopicName

func (p *AliYunMQTransProducer) TopicName() string

TopicName .

type Client

// Client is the entry point for obtaining producers and consumers and for
// reading back the client's credential and endpoint.
type Client interface {
	// GetProducer returns a Producer for the given instance ID and topic name.
	GetProducer(instanceID string, topicName string) Producer
	// GetTransProducer returns a TransProducer for the given instance ID, topic
	// name and group ID.
	GetTransProducer(instanceID string, topicName string, groupID string) TransProducer
	// GetConsumer returns a Consumer for the given instance ID, topic name,
	// consumer name and message tag.
	GetConsumer(instanceID string, topicName string, consumer string, messageTag string) Consumer
	// GetCredential returns the client's Credential.
	GetCredential() Credential
	// GetEndpoint returns the client's endpoint string.
	GetEndpoint() string
}

Client .

func NewAliYunMQClient

func NewAliYunMQClient(endpoint, accessKeyID, accessKeySecret, securityToken string) Client

NewAliYunMQClient .

func NewAliYunMQClientWithTimeout

func NewAliYunMQClientWithTimeout(endpoint, accessKeyID, accessKeySecret, securityToken string, timeout time.Duration) Client

NewAliYunMQClientWithTimeout .

type ConsumeMessageEntry

// ConsumeMessageEntry is a single consumed message. It embeds MessageResponse.
// The fields tagged `xml:"-" json:"-"` are not read from the wire directly.
// NOTE(review): those fields are presumably filled from PropInner by
// ConstructRecMessage — confirm.
type ConsumeMessageEntry struct {
	MessageResponse
	// Message ID.
	MessageID string `xml:"MessageId" json:"message_id"`
	// Receipt handle of the message.
	ReceiptHandle string `xml:"ReceiptHandle" json:"receipt_handle"`
	// MD5 of the message body.
	MessageBodyMD5 string `xml:"MessageBodyMD5" json:"message_body_md5"`
	// Message body.
	MessageBody string `xml:"MessageBody" json:"message_body"`
	// Time the message was published.
	PublishTime int64 `xml:"PublishTime" json:"publish_time"`
	// Time at which the message will be consumed again (if this consumption is not acked).
	NextConsumeTime int64 `xml:"NextConsumeTime" json:"next_consume_time"`
	// Time of the first consumption; this value is meaningless for ordered consumption.
	FirstConsumeTime int64 `xml:"FirstConsumeTime" json:"first_consume_time"`
	// Number of times the message has been consumed.
	ConsumedTimes int64 `xml:"ConsumedTimes" json:"consumed_times"`
	// Message tag.
	MessageTag string `xml:"MessageTag" json:"message_tag"`
	// Message properties.
	Properties map[string]string `xml:"-" json:"-"`
	// Serialized form of the properties; do not use directly.
	PropInner string `xml:"Properties,omitempty" json:"properties,omitempty"`
	// Message key.
	MessageKey string `xml:"-" json:"-"`
	// Scheduled message: delivery timestamp in milliseconds (ms); must be later than the current time.
	StartDeliverTime int64 `xml:"-" json:"-"`
	// Sharding (partition) key for ordered messages.
	ShardingKey string `xml:"-" json:"-"`
	// Earliest time, in seconds, of the first transaction check-back, carried in the
	// message properties; it also marks the message as a transactional message.
	TransCheckImmunityTime int `xml:"-" json:"-"`
}

ConsumeMessageEntry .

type ConsumeMessageResponse

// ConsumeMessageResponse maps to a <Messages> XML element. Messages collects
// its <Message> children, one ConsumeMessageEntry each. It is the value type
// delivered on the respChan of the consume methods.
type ConsumeMessageResponse struct {
	XMLName  xml.Name              `xml:"Messages" json:"-"`
	Messages []ConsumeMessageEntry `xml:"Message" json:"messages"`
}

ConsumeMessageResponse .

func (*ConsumeMessageResponse) Responses

func (r *ConsumeMessageResponse) Responses() []*MessageResponse

Responses .

type Consumer

// Consumer is an MQ message consumer.
type Consumer interface {
	// TopicName returns the topic name.
	TopicName() string
	// InstanceID returns the instance ID; it may be empty.
	InstanceID() string
	// Consumer returns the consumer's name.
	Consumer() string
	// MessageTag returns the tag used to filter consumed messages.
	MessageTag() string
	// ConsumeMessage consumes messages. A message that is not confirmed as
	// successfully consumed via AckMessage will be consumed again at its
	// NextConsumeTime.
	ConsumeMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)
	// ConsumeMessageOrderly consumes messages in order. The returned messages may
	// come from several partitions, but messages within one partition are always
	// in order. All messages of a partition must be acked successfully before the
	// next batch can be consumed; otherwise the same messages are consumed again
	// after NextConsumeTime.
	ConsumeMessageOrderly(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)
	// AckMessage confirms that the messages with the given receipt handles were
	// consumed successfully.
	AckMessage(receiptHandles []string) (err error)
}

Consumer is an MQ message consumer.

type Credential

// Credential is created by NewMQCredential from an access key ID, an access
// key secret and a security token. It exposes them through the AccessKeyID,
// AccessKeySecret and SecurityToken methods, and provides Signature for
// signing a request.
type Credential struct {
	// contains filtered or unexported fields
}

Credential .

func NewMQCredential

func NewMQCredential(accessKeyID string, accessKeySecret string, securityToken string) Credential

NewMQCredential .

func (*Credential) AccessKeyID

func (p *Credential) AccessKeyID() string

AccessKeyID .

func (*Credential) AccessKeySecret

func (p *Credential) AccessKeySecret() string

AccessKeySecret .

func (*Credential) SecurityToken

func (p *Credential) SecurityToken() string

SecurityToken .

func (*Credential) Signature

func (p *Credential) Signature(method Method, headers map[string]string, resource string) (signature string, err error)

Signature .

type CredentialI

// CredentialI is the interface form of a credential. Its method set matches
// the methods declared on *Credential.
type CredentialI interface {
	// Signature returns the signature string for a request described by its
	// method, headers and resource, or an error.
	Signature(method Method, headers map[string]string, resource string) (signature string, err error)
	// AccessKeyID returns the access key ID.
	AccessKeyID() string
	// AccessKeySecret returns the access key secret.
	AccessKeySecret() string
	// SecurityToken returns the security token.
	SecurityToken() string
}

CredentialI .

type Decoder

// Decoder decodes response bodies. AliYunMQClient.Send takes one as its first
// argument; NewAliYunMQDecoder and NewAliYunMQAckMsgDecoder return
// implementations.
type Decoder interface {
	// Decode reads from reader and decodes into v.
	Decode(reader io.Reader, v interface{}) (err error)
	// DecodeError decodes an error response body. It returns two errors:
	// decodedError and err.
	// NOTE(review): presumably decodedError is the server-side error carried by
	// the body and err reports a failure to decode it — confirm.
	DecodeError(bodyBytes []byte, resource string, requestID string) (decodedError error, err error)
}

Decoder .

func NewAliYunMQAckMsgDecoder

func NewAliYunMQAckMsgDecoder() Decoder

NewAliYunMQAckMsgDecoder .

func NewAliYunMQDecoder

func NewAliYunMQDecoder() Decoder

NewAliYunMQDecoder .

type ErrAckItem

// ErrAckItem holds three strings for one ack error: a handle, a message and a
// code.
// NOTE(review): presumably a flattened view of AckMessageErrorEntry — confirm
// where it is populated.
type ErrAckItem struct {
	ErrorHandle string
	ErrorMsg    string
	ErrorCode   string
}

ErrAckItem .

type ErrorResponse

// ErrorResponse maps to an <Error> XML element. It is the input type of
// ParseError.
type ErrorResponse struct {
	XMLName xml.Name `xml:"Error" json:"-"`
	// Error code.
	Code string `xml:"Code,omitempty" json:"code,omitempty"`
	// Error description.
	Message string `xml:"Message,omitempty" json:"message,omitempty"`
	// Request ID.
	RequestID string `xml:"RequestId,omitempty" json:"request_id,omitempty"`
	// Request host.
	HostID string `xml:"HostId,omitempty" json:"host_id,omitempty"`
}

ErrorResponse .

type MessageResponder

// MessageResponder is implemented by response types that can expose their
// MessageResponse values; *ConsumeMessageResponse and *PublishMessageResponse
// both declare a matching Responses method.
type MessageResponder interface {
	// Responses returns pointers to the MessageResponse values of the response.
	Responses() []*MessageResponse
}

MessageResponder .

type MessageResponse

// MessageResponse maps to a <Message> XML element and carries the common
// code / message / request ID / host fields. It is embedded in
// ConsumeMessageEntry and PublishMessageResponse.
type MessageResponse struct {
	XMLName xml.Name `xml:"Message" json:"-"`
	// Error code.
	Code string `xml:"Code,omitempty" json:"code,omitempty"`
	// Error description.
	Message string `xml:"Message,omitempty" json:"message,omitempty"`
	// Request ID.
	RequestID string `xml:"RequestId,omitempty" json:"request_id,omitempty"`
	// Request host.
	HostID string `xml:"HostId,omitempty" json:"host_id,omitempty"`
}

MessageResponse .

type Method

// Method is the string type of a request method. GET is declared with this
// type; Send, Send0 and Signature take a Method parameter.
type Method string

Method .

type Producer

// Producer is an MQ message producer.
type Producer interface {
	// TopicName returns the topic name.
	TopicName() string
	// InstanceID returns the instance ID; it may be empty.
	InstanceID() string
	// PublishMessage sends a message.
	PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)
}

Producer is an MQ message producer.

type PublishMessageRequest

// PublishMessageRequest is the message passed to PublishMessage. It maps to a
// <Message> XML element. The fields tagged `xml:"-" json:"-"` are not
// serialized directly.
// NOTE(review): those fields are presumably folded into the serialized
// properties by ConstructPubMessage — confirm.
type PublishMessageRequest struct {
	XMLName xml.Name `xml:"Message" json:"-"`
	// Message body.
	MessageBody string `xml:"MessageBody" json:"message_body"`
	// Message tag.
	MessageTag string `xml:"MessageTag,omitempty" json:"message_tag,omitempty"`
	// Message properties.
	Properties map[string]string `xml:"-" json:"-"`
	// Message key.
	MessageKey string `xml:"-" json:"-"`
	// Scheduled message: delivery timestamp in milliseconds (ms); must be later than the current time.
	StartDeliverTime int64 `xml:"-" json:"-"`
	// Earliest time, in seconds, of the first transaction check-back, added to the
	// message properties; it also marks the message as a transactional message. Range: 10~300s.
	TransCheckImmunityTime int `xml:"-" json:"-"`
	// Key that distinguishes partitions for partitioned ordered messages. The sharding key is a
	// completely different concept from an ordinary message key. For globally ordered messages
	// this field may be set to any non-empty string.
	ShardingKey string `xml:"-" json:"-"`
	// contains filtered or unexported fields
}

PublishMessageRequest .

type PublishMessageResponse

// PublishMessageResponse is the result of PublishMessage. It embeds
// MessageResponse.
type PublishMessageResponse struct {
	MessageResponse
	// Message ID.
	MessageID string `xml:"MessageId" json:"message_id"`
	// MD5 of the message body.
	MessageBodyMD5 string `xml:"MessageBodyMD5" json:"message_body_md5"`
	// Receipt handle; present only for transactional messages.
	ReceiptHandle string `xml:"ReceiptHandle" json:"receipt_handle"`
}

PublishMessageResponse .

func (*PublishMessageResponse) Responses

func (r *PublishMessageResponse) Responses() []*MessageResponse

Responses .

type ReceiptHandles

// ReceiptHandles maps to a <ReceiptHandles> XML element containing one
// <ReceiptHandle> child per handle.
type ReceiptHandles struct {
	XMLName xml.Name `xml:"ReceiptHandles"`
	// Message receipt handles.
	ReceiptHandles []string `xml:"ReceiptHandle"`
}

ReceiptHandles .

type TransProducer

// TransProducer is an MQ transactional message producer.
type TransProducer interface {
	// TopicName returns the topic name.
	TopicName() string
	// InstanceID returns the instance ID; it may be empty.
	InstanceID() string
	// GroupID returns the group ID; it must not be empty.
	GroupID() string
	// PublishMessage sends a message.
	PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)
	// ConsumeHalfMessage consumes transactional half messages.
	ConsumeHalfMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)
	// Commit commits a transactional message.
	Commit(receiptHandle string) (err error)
	// Rollback rolls back a transactional message.
	Rollback(receiptHandle string) (err error)
}

TransProducer is an MQ transactional message producer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL