Documentation ¶
Index ¶
- Constants
- Variables
- func CommitOrRollback(p *AliYunMQTransProducer, receiptHandle string, trans string) (err error)
- func ConstructPubMessage(pubMsgReq *PublishMessageRequest) (err error)
- func ConstructRecMessage(entries *[]ConsumeMessageEntry)
- func ContainsSpecialChar(input string) (result bool)
- func ParseError(resp ErrorResponse, resource string) (err error)
- type AckMessageErrorEntry
- type AckMessageErrorResponse
- type AliYunMQAckMsgDecoder
- type AliYunMQClient
- func (p *AliYunMQClient) GetConsumer(instanceID string, topicName string, consumer string, messageTag string) Consumer
- func (p *AliYunMQClient) GetCredential() Credential
- func (p *AliYunMQClient) GetEndpoint() string
- func (p *AliYunMQClient) GetProducer(instanceID string, topicName string) Producer
- func (p *AliYunMQClient) GetTransProducer(instanceID string, topicName string, groupID string) TransProducer
- func (p *AliYunMQClient) Send(decoder Decoder, method Method, headers map[string]string, message interface{}, ...) (statusCode int, err error)
- func (p *AliYunMQClient) Send0(method Method, headers map[string]string, message interface{}, resource string) (*fasthttp.Response, error)
- type AliYunMQConsumer
- func (p *AliYunMQConsumer) AckMessage(receiptHandles []string) (err error)
- func (p *AliYunMQConsumer) ConsumeMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, ...)
- func (p *AliYunMQConsumer) ConsumeMessageOrderly(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, ...)
- func (p *AliYunMQConsumer) Consumer() string
- func (p *AliYunMQConsumer) InstanceID() string
- func (p *AliYunMQConsumer) MessageTag() string
- func (p *AliYunMQConsumer) TopicName() string
- type AliYunMQDecoder
- type AliYunMQProducer
- type AliYunMQTransProducer
- func (p *AliYunMQTransProducer) Commit(receiptHandle string) (err error)
- func (p *AliYunMQTransProducer) ConsumeHalfMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, ...)
- func (p *AliYunMQTransProducer) GroupID() string
- func (p *AliYunMQTransProducer) InstanceID() string
- func (p *AliYunMQTransProducer) PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)
- func (p *AliYunMQTransProducer) Rollback(receiptHandle string) (err error)
- func (p *AliYunMQTransProducer) TopicName() string
- type Client
- type ConsumeMessageEntry
- type ConsumeMessageResponse
- type Consumer
- type Credential
- type CredentialI
- type Decoder
- type ErrAckItem
- type ErrorResponse
- type MessageResponder
- type MessageResponse
- type Method
- type Producer
- type PublishMessageRequest
- type PublishMessageResponse
- type ReceiptHandles
- type TransProducer
Constants ¶
const ( // ClientName . ClientName = "mq-go-sdk/1.0.3(fasthttp)" // ClientVersion . ClientVersion = "2015-06-06" // DefaultTimeout . DefaultTimeout int64 = 35 )
const ( // GET . GET Method = "GET" // POST . POST = "POST" // DELETE . DELETE = "DELETE" )
const ( // Authorization . Authorization = "Authorization" // ContentType . ContentType = "Content-Type" // ContentMd5 . ContentMd5 = "Content-MD5" // Version . Version = "x-mq-version" // Date . Date = "Date" // SecurityToken . SecurityToken = "security-token" )
const ( // ErrNs . ErrNs = "MQ" // ErrTempStr . ErrTempStr = "" /* 133-byte string literal not displayed */ )
const ( // StartDeliverTime . StartDeliverTime = "__STARTDELIVERTIME" // TransCheckImmunityTime . TransCheckImmunityTime = "__TransCheckT" // Keys . Keys = "KEYS" // SHARDING . SHARDING = "__SHARDINGKEY" )
Variables ¶
var ( // ErrSignMessageFailed . ErrSignMessageFailed = errors.TN(ErrNs, 1, "sign message failed, {{.err}}") // ErrMarshalMessageFailed . ErrMarshalMessageFailed = errors.TN(ErrNs, 2, "marshal message filed, {{.err}}") // ErrGeneralAuthHeaderFailed . ErrGeneralAuthHeaderFailed = errors.TN(ErrNs, 3, "general auth header failed, {{.err}}") // ErrMessageProperty . ErrMessageProperty = errors.TN(ErrNs, 4, "message property can not contains:\" ' < > & : |, {{.err}}") // ErrSendRequestFailed . ErrSendRequestFailed = errors.TN(ErrNs, 5, "send request failed, {{.err}}") // ErrUnmarshalErrorResponseFailed . ErrUnmarshalErrorResponseFailed = errors.TN(ErrNs, 7, "unmarshal error response failed, {{.err}}, ResponseBody: {{.resp}}") // ErrUnmarshalResponseFailed . ErrUnmarshalResponseFailed = errors.TN(ErrNs, 8, "unmarshal response failed, {{.err}}") // ErrMqServer . ErrMqServer = errors.TN(ErrNs, 101, ErrTempStr) // ErrAckMessage . ErrAckMessage = errors.TN(ErrNs, 102, "AliYun_mq ack message error,code: {{.Code}}, message: {{.Message}}, receiptHandle: {{.ReceiptHandle}}, requestId: {{.RequestID}}") )
var ( // DefaultNumOfMessages . DefaultNumOfMessages int32 = 16 )
Functions ¶
func CommitOrRollback ¶
func CommitOrRollback(p *AliYunMQTransProducer, receiptHandle string, trans string) (err error)
CommitOrRollback .
func ConstructPubMessage ¶
func ConstructPubMessage(pubMsgReq *PublishMessageRequest) (err error)
ConstructPubMessage .
func ConstructRecMessage ¶
func ConstructRecMessage(entries *[]ConsumeMessageEntry)
ConstructRecMessage .
func ContainsSpecialChar ¶
func ContainsSpecialChar(input string) (result bool)
ContainsSpecialChar .
Types ¶
type AckMessageErrorEntry ¶
type AckMessageErrorEntry struct { XMLName xml.Name `xml:"Error" json:"-"` // Ack消息出错的错误码 ErrorCode string `xml:"ErrorCode" json:"error_code"` // Ack消息出错的错误描述 ErrorMessage string `xml:"ErrorMessage" json:"error_messages"` // Ack消息出错的消息句柄 ReceiptHandle string `xml:"ReceiptHandle,omitempty" json:"receipt_handle"` }
AckMessageErrorEntry .
type AckMessageErrorResponse ¶
type AckMessageErrorResponse struct { XMLName xml.Name `xml:"Errors" json:"-"` FailedMessages []AckMessageErrorEntry `xml:"Error" json:"errors"` }
AckMessageErrorResponse .
type AliYunMQAckMsgDecoder ¶
type AliYunMQAckMsgDecoder struct {
// contains filtered or unexported fields
}
AliYunMQAckMsgDecoder .
func (*AliYunMQAckMsgDecoder) Decode ¶
func (p *AliYunMQAckMsgDecoder) Decode(reader io.Reader, v interface{}) (err error)
Decode .
func (*AliYunMQAckMsgDecoder) DecodeError ¶
func (p *AliYunMQAckMsgDecoder) DecodeError(bodyBytes []byte, resource string, requestID string) (decodedError error, err error)
DecodeError .
type AliYunMQClient ¶
type AliYunMQClient struct {
// contains filtered or unexported fields
}
AliYunMQClient .
func (*AliYunMQClient) GetConsumer ¶
func (p *AliYunMQClient) GetConsumer(instanceID string, topicName string, consumer string, messageTag string) Consumer
GetConsumer .
func (*AliYunMQClient) GetCredential ¶
func (p *AliYunMQClient) GetCredential() Credential
GetCredential .
func (*AliYunMQClient) GetProducer ¶
func (p *AliYunMQClient) GetProducer(instanceID string, topicName string) Producer
GetProducer .
func (*AliYunMQClient) GetTransProducer ¶
func (p *AliYunMQClient) GetTransProducer(instanceID string, topicName string, groupID string) TransProducer
GetTransProducer .
type AliYunMQConsumer ¶
type AliYunMQConsumer struct {
// contains filtered or unexported fields
}
AliYunMQConsumer .
func (*AliYunMQConsumer) AckMessage ¶
func (p *AliYunMQConsumer) AckMessage(receiptHandles []string) (err error)
AckMessage acknowledges that the messages identified by receiptHandles were consumed successfully.
func (*AliYunMQConsumer) ConsumeMessage ¶
func (p *AliYunMQConsumer) ConsumeMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)
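ConsumeMessage consumes messages. A message that is not acknowledged via AckMessage is delivered again at its NextConsumeTime.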
func (*AliYunMQConsumer) ConsumeMessageOrderly ¶
func (p *AliYunMQConsumer) ConsumeMessageOrderly(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)
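ConsumeMessageOrderly consumes messages in order. The returned messages may come from several partitions, but messages within one partition are always ordered. All messages of a partition must be acknowledged successfully before the next batch can be consumed; otherwise the same messages are consumed again after NextConsumeTime.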
type AliYunMQDecoder ¶
type AliYunMQDecoder struct { }
AliYunMQDecoder .
func (*AliYunMQDecoder) Decode ¶
func (p *AliYunMQDecoder) Decode(reader io.Reader, v interface{}) (err error)
Decode .
func (*AliYunMQDecoder) DecodeError ¶
func (p *AliYunMQDecoder) DecodeError(bodyBytes []byte, resource string, requestID string) (decodedError error, err error)
DecodeError .
type AliYunMQProducer ¶
type AliYunMQProducer struct {
// contains filtered or unexported fields
}
AliYunMQProducer .
func (*AliYunMQProducer) PublishMessage ¶
func (p *AliYunMQProducer) PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)
PublishMessage sends a message.
type AliYunMQTransProducer ¶
type AliYunMQTransProducer struct {
// contains filtered or unexported fields
}
AliYunMQTransProducer .
func (*AliYunMQTransProducer) Commit ¶
func (p *AliYunMQTransProducer) Commit(receiptHandle string) (err error)
Commit commits the transactional message.
func (*AliYunMQTransProducer) ConsumeHalfMessage ¶
func (p *AliYunMQTransProducer) ConsumeHalfMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitSeconds int64)
ConsumeHalfMessage consumes transactional half messages.
func (*AliYunMQTransProducer) InstanceID ¶
func (p *AliYunMQTransProducer) InstanceID() string
InstanceID returns the instance ID, which may be empty.
func (*AliYunMQTransProducer) PublishMessage ¶
func (p *AliYunMQTransProducer) PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)
PublishMessage sends a message.
func (*AliYunMQTransProducer) Rollback ¶
func (p *AliYunMQTransProducer) Rollback(receiptHandle string) (err error)
Rollback rolls back the transactional message.
func (*AliYunMQTransProducer) TopicName ¶
func (p *AliYunMQTransProducer) TopicName() string
TopicName returns the topic name.
type Client ¶
type Client interface { GetProducer(instanceID string, topicName string) Producer GetTransProducer(instanceID string, topicName string, groupID string) TransProducer GetConsumer(instanceID string, topicName string, consumer string, messageTag string) Consumer GetCredential() Credential GetEndpoint() string }
Client .
func NewAliYunMQClient ¶
NewAliYunMQClient .
type ConsumeMessageEntry ¶
type ConsumeMessageEntry struct { MessageResponse // 消息ID MessageID string `xml:"MessageId" json:"message_id"` // 消息句柄 ReceiptHandle string `xml:"ReceiptHandle" json:"receipt_handle"` // 消息体MD5 MessageBodyMD5 string `xml:"MessageBodyMD5" json:"message_body_md5"` // 消息体 MessageBody string `xml:"MessageBody" json:"message_body"` // 消息发送时间 PublishTime int64 `xml:"PublishTime" json:"publish_time"` // 下次消费消息的时间(如果这次消费的消息没有Ack) NextConsumeTime int64 `xml:"NextConsumeTime" json:"next_consume_time"` // 第一次消费的时间,此值对于顺序消费没有意义 FirstConsumeTime int64 `xml:"FirstConsumeTime" json:"first_consume_time"` // 消费次数 ConsumedTimes int64 `xml:"ConsumedTimes" json:"consumed_times"` // 消息标签 MessageTag string `xml:"MessageTag" json:"message_tag"` // 消息属性 Properties map[string]string `xml:"-" json:"-"` // 序列化属性请勿使用 PropInner string `xml:"Properties,omitempty" json:"properties,omitempty"` // 消息KEY MessageKey string `xml:"-" json:"-"` // 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递 StartDeliverTime int64 `xml:"-" json:"-"` // 顺序消息分区Key ShardingKey string `xml:"-" json:"-"` // 在消息属性中添加第一次消息回查的最快时间,单位秒,并且表征这是一条事务消息 TransCheckImmunityTime int `xml:"-" json:"-"` }
ConsumeMessageEntry .
type ConsumeMessageResponse ¶
type ConsumeMessageResponse struct { XMLName xml.Name `xml:"Messages" json:"-"` Messages []ConsumeMessageEntry `xml:"Message" json:"messages"` }
ConsumeMessageResponse .
func (*ConsumeMessageResponse) Responses ¶
func (r *ConsumeMessageResponse) Responses() []*MessageResponse
Responses .
type Consumer ¶
type Consumer interface { // TopicName 主题名字 TopicName() string // InstanceID 实例ID,可空 InstanceID() string // Consumer 消费者的名字 Consumer() string // MessageTag 消费消息过滤的标签 MessageTag() string // ConsumeMessage 消费消息,如果该条消息没有被 {AckMessage} 确认消费成功,即在NextConsumeTime时会再次消费到该条消息 ConsumeMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64) // ConsumeMessageOrderly . // 顺序消费消息,获取的消息可能是多个分区但是一个分区内消息一定是顺序的, // 对于一个分区的消息必须要全部ACK成功才能消费下一批消息否则在NextConsumeTime后会再次消费到相同的消息 ConsumeMessageOrderly(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64) // AckMessage 确认消息消费成功 AckMessage(receiptHandles []string) (err error) }
Consumer is the MQ message consumer.
type Credential ¶
type Credential struct {
// contains filtered or unexported fields
}
Credential .
func NewMQCredential ¶
func NewMQCredential(accessKeyID string, accessKeySecret string, securityToken string) Credential
NewMQCredential .
func (*Credential) AccessKeySecret ¶
func (p *Credential) AccessKeySecret() string
AccessKeySecret .
type CredentialI ¶
type CredentialI interface { Signature(method Method, headers map[string]string, resource string) (signature string, err error) AccessKeyID() string AccessKeySecret() string SecurityToken() string }
CredentialI .
type Decoder ¶
type Decoder interface { Decode(reader io.Reader, v interface{}) (err error) DecodeError(bodyBytes []byte, resource string, requestID string) (decodedError error, err error) }
Decoder .
type ErrAckItem ¶
ErrAckItem .
type ErrorResponse ¶
type ErrorResponse struct { XMLName xml.Name `xml:"Error" json:"-"` // 错误码 Code string `xml:"Code,omitempty" json:"code,omitempty"` // 错误描述 Message string `xml:"Message,omitempty" json:"message,omitempty"` // 请求ID RequestID string `xml:"RequestId,omitempty" json:"request_id,omitempty"` // 请求HOST HostID string `xml:"HostId,omitempty" json:"host_id,omitempty"` }
ErrorResponse .
type MessageResponder ¶
type MessageResponder interface {
Responses() []*MessageResponse
}
MessageResponder .
type MessageResponse ¶
type MessageResponse struct { XMLName xml.Name `xml:"Message" json:"-"` // 错误码 Code string `xml:"Code,omitempty" json:"code,omitempty"` // 错误描述 Message string `xml:"Message,omitempty" json:"message,omitempty"` // 请求ID RequestID string `xml:"RequestId,omitempty" json:"request_id,omitempty"` // 请求HOST HostID string `xml:"HostId,omitempty" json:"host_id,omitempty"` }
MessageResponse .
type Producer ¶
type Producer interface { // TopicName 主题名字 TopicName() string // InstanceID 实例ID,可空 InstanceID() string // PublishMessage 发送消息 PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error) }
Producer is the MQ message producer.
type PublishMessageRequest ¶
type PublishMessageRequest struct { XMLName xml.Name `xml:"Message" json:"-"` // 消息体 MessageBody string `xml:"MessageBody" json:"message_body"` // 消息标签 MessageTag string `xml:"MessageTag,omitempty" json:"message_tag,omitempty"` // 消息属性 Properties map[string]string `xml:"-" json:"-"` // 消息KEY MessageKey string `xml:"-" json:"-"` // 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递 StartDeliverTime int64 `xml:"-" json:"-"` // 在消息属性中添加第一次消息回查的最快时间,单位秒,并且表征这是一条事务消息, 10~300s TransCheckImmunityTime int `xml:"-" json:"-"` // 分区顺序消息中区分不同分区的关键字段,sharding key 与普通消息的 key 是完全不同的概念。全局顺序消息,该字段可以设置为任意非空字符串。 ShardingKey string `xml:"-" json:"-"` // contains filtered or unexported fields }
PublishMessageRequest .
type PublishMessageResponse ¶
type PublishMessageResponse struct { MessageResponse // 消息ID MessageID string `xml:"MessageId" json:"message_id"` // 消息体MD5 MessageBodyMD5 string `xml:"MessageBodyMD5" json:"message_body_md5"` // 消息句柄只有事务消息才有 ReceiptHandle string `xml:"ReceiptHandle" json:"receipt_handle"` }
PublishMessageResponse .
func (*PublishMessageResponse) Responses ¶
func (r *PublishMessageResponse) Responses() []*MessageResponse
Responses .
type ReceiptHandles ¶
type ReceiptHandles struct { XMLName xml.Name `xml:"ReceiptHandles"` // 消息句柄 ReceiptHandles []string `xml:"ReceiptHandle"` }
ReceiptHandles .
type TransProducer ¶
type TransProducer interface { // TopicName 主题名字 TopicName() string // InstanceID 实例ID,可空 InstanceID() string // GroupID GroupId,非空 GroupID() string // PublishMessage 发送消息 PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error) // ConsumeHalfMessage 消费事务半消息 ConsumeHalfMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64) // Commit 提交事务消息 Commit(receiptHandle string) (err error) // Rollback 回滚事务消息 Rollback(receiptHandle string) (err error) }
TransProducer is the MQ transactional message producer.