Documentation ¶
Index ¶
- Constants
- Variables
- func CommitOrRollback(p *AliyunMQTransProducer, receiptHandle string, trans string) (err error)
- func ConstructPubMessage(pubMsgReq *PublishMessageRequest) (err error)
- func ConstructRecMessage(entries *[]ConsumeMessageEntry)
- func ContainsSpecialChar(input string) (result bool)
- func ParseError(resp ErrorResponse, resource string) (err error)
- type AckMessageErrorEntry
- type AckMessageErrorResponse
- type AliyunMQAckMsgDecoder
- type AliyunMQClient
- func (p *AliyunMQClient) GetConsumer(instanceId string, topicName string, consumer string, messageTag string) MQConsumer
- func (p *AliyunMQClient) GetCredential() Credential
- func (p *AliyunMQClient) GetEndpoint() string
- func (p *AliyunMQClient) GetProducer(instanceId string, topicName string) MQProducer
- func (p *AliyunMQClient) GetTransProducer(instanceId string, topicName string, groupId string) MQTransProducer
- func (p *AliyunMQClient) Send(decoder MQDecoder, method Method, headers map[string]string, ...) (statusCode int, err error)
- func (p *AliyunMQClient) Send0(method Method, headers map[string]string, message interface{}, resource string) (*fasthttp.Response, error)
- type AliyunMQConsumer
- func (p *AliyunMQConsumer) AckMessage(receiptHandles []string) (err error)
- func (p *AliyunMQConsumer) ConsumeMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, ...)
- func (p *AliyunMQConsumer) ConsumeMessageOrderly(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, ...)
- func (p *AliyunMQConsumer) Consumer() string
- func (p *AliyunMQConsumer) InstanceId() string
- func (p *AliyunMQConsumer) MessageTag() string
- func (p *AliyunMQConsumer) TopicName() string
- type AliyunMQDecoder
- type AliyunMQProducer
- type AliyunMQTransProducer
- func (p *AliyunMQTransProducer) Commit(receiptHandle string) (err error)
- func (p *AliyunMQTransProducer) ConsumeHalfMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, ...)
- func (p *AliyunMQTransProducer) GroupId() string
- func (p *AliyunMQTransProducer) InstanceId() string
- func (p *AliyunMQTransProducer) PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)
- func (p *AliyunMQTransProducer) Rollback(receiptHandle string) (err error)
- func (p *AliyunMQTransProducer) TopicName() string
- type ConsumeMessageEntry
- type ConsumeMessageResponse
- type Credential
- type ErrAckItem
- type ErrorResponse
- type MQClient
- type MQConsumer
- type MQCredential
- type MQDecoder
- type MQProducer
- type MQTransProducer
- type MessageResponder
- type MessageResponse
- type Method
- type PublishMessageRequest
- type PublishMessageResponse
- type ReceiptHandles
Constants ¶
View Source
const ( ClientName = "mq-go-sdk/1.0.3(fasthttp)" ClientVersion = "2015-06-06" DefaultTimeout int64 = 35 )
View Source
const ( GET Method = "GET" POST = "POST" DELETE = "DELETE" )
View Source
const ( AUTHORIZATION = "Authorization" CONTENT_TYPE = "Content-Type" CONTENT_MD5 = "Content-MD5" MQ_VERSION = "x-mq-version" HOST = "Host" DATE = "Date" KEEP_ALIVE = "Keep-Alive" SECURITY_TOKEN = "security-token" )
View Source
const ( ALIYUN_MQ_ERR_NS = "MQ" ALIYUN_MQ_ERR_TEMPSTR = "" /* 133-byte string literal not displayed */ )
View Source
const ( StartDeliverTime = "__STARTDELIVERTIME" TransCheckImmunityTime = "__TransCheckT" Keys = "KEYS" SHARDING = "__SHARDINGKEY" )
Variables ¶
View Source
var ( ErrSignMessageFailed = errors.TN(ALIYUN_MQ_ERR_NS, 1, "sign message failed, {{.err}}") ErrMarshalMessageFailed = errors.TN(ALIYUN_MQ_ERR_NS, 2, "marshal message filed, {{.err}}") ErrGeneralAuthHeaderFailed = errors.TN(ALIYUN_MQ_ERR_NS, 3, "general auth header failed, {{.err}}") ErrMessageProperty = errors.TN(ALIYUN_MQ_ERR_NS, 4, "message property can not contains:\" ' < > & : |, {{.err}}") ErrSendRequestFailed = errors.TN(ALIYUN_MQ_ERR_NS, 5, "send request failed, {{.err}}") ErrUnmarshalErrorResponseFailed = errors.TN(ALIYUN_MQ_ERR_NS, 7, "unmarshal error response failed, {{.err}}, ResponseBody: {{.resp}}") ErrUnmarshalResponseFailed = errors.TN(ALIYUN_MQ_ERR_NS, 8, "unmarshal response failed, {{.err}}") ErrMqServer = errors.TN(ALIYUN_MQ_ERR_NS, 101, ALIYUN_MQ_ERR_TEMPSTR) ErrAckMessage = errors.TN(ALIYUN_MQ_ERR_NS, 102, "aliyun_mq ack message error,code: {{.Code}}, message: {{.Message}}, receiptHandle: {{.ReceiptHandle}}, requestId: {{.RequestId}}") )
View Source
var (
DefaultNumOfMessages int32 = 16
)
Functions ¶
func CommitOrRollback ¶
func CommitOrRollback(p *AliyunMQTransProducer, receiptHandle string, trans string) (err error)
func ConstructPubMessage ¶
func ConstructPubMessage(pubMsgReq *PublishMessageRequest) (err error)
func ConstructRecMessage ¶
func ConstructRecMessage(entries *[]ConsumeMessageEntry)
func ContainsSpecialChar ¶
func ParseError ¶
func ParseError(resp ErrorResponse, resource string) (err error)
Types ¶
type AckMessageErrorEntry ¶
type AckMessageErrorEntry struct { XMLName xml.Name `xml:"Error" json:"-"` // Ack消息出错的错误码 ErrorCode string `xml:"ErrorCode" json:"error_code"` // Ack消息出错的错误描述 ErrorMessage string `xml:"ErrorMessage" json:"error_messages"` // Ack消息出错的消息句柄 ReceiptHandle string `xml:"ReceiptHandle,omitempty" json:"receipt_handle"` }
type AckMessageErrorResponse ¶
type AckMessageErrorResponse struct { XMLName xml.Name `xml:"Errors" json:"-"` FailedMessages []AckMessageErrorEntry `xml:"Error" json:"errors"` }
type AliyunMQAckMsgDecoder ¶
type AliyunMQAckMsgDecoder struct {
// contains filtered or unexported fields
}
func (*AliyunMQAckMsgDecoder) Decode ¶
func (p *AliyunMQAckMsgDecoder) Decode(reader io.Reader, v interface{}) (err error)
func (*AliyunMQAckMsgDecoder) DecodeError ¶
type AliyunMQClient ¶
type AliyunMQClient struct {
// contains filtered or unexported fields
}
func (*AliyunMQClient) GetConsumer ¶
func (p *AliyunMQClient) GetConsumer(instanceId string, topicName string, consumer string, messageTag string) MQConsumer
func (*AliyunMQClient) GetCredential ¶
func (p *AliyunMQClient) GetCredential() Credential
func (*AliyunMQClient) GetEndpoint ¶
func (p *AliyunMQClient) GetEndpoint() string
func (*AliyunMQClient) GetProducer ¶
func (p *AliyunMQClient) GetProducer(instanceId string, topicName string) MQProducer
func (*AliyunMQClient) GetTransProducer ¶
func (p *AliyunMQClient) GetTransProducer(instanceId string, topicName string, groupId string) MQTransProducer
type AliyunMQConsumer ¶
type AliyunMQConsumer struct {
// contains filtered or unexported fields
}
func (*AliyunMQConsumer) AckMessage ¶
func (p *AliyunMQConsumer) AckMessage(receiptHandles []string) (err error)
func (*AliyunMQConsumer) ConsumeMessage ¶
func (p *AliyunMQConsumer) ConsumeMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)
func (*AliyunMQConsumer) ConsumeMessageOrderly ¶
func (p *AliyunMQConsumer) ConsumeMessageOrderly(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)
func (*AliyunMQConsumer) Consumer ¶
func (p *AliyunMQConsumer) Consumer() string
func (*AliyunMQConsumer) InstanceId ¶
func (p *AliyunMQConsumer) InstanceId() string
func (*AliyunMQConsumer) MessageTag ¶
func (p *AliyunMQConsumer) MessageTag() string
func (*AliyunMQConsumer) TopicName ¶
func (p *AliyunMQConsumer) TopicName() string
type AliyunMQDecoder ¶
type AliyunMQDecoder struct { }
func (*AliyunMQDecoder) Decode ¶
func (p *AliyunMQDecoder) Decode(reader io.Reader, v interface{}) (err error)
func (*AliyunMQDecoder) DecodeError ¶
type AliyunMQProducer ¶
type AliyunMQProducer struct {
// contains filtered or unexported fields
}
func (*AliyunMQProducer) InstanceId ¶
func (p *AliyunMQProducer) InstanceId() string
func (*AliyunMQProducer) PublishMessage ¶
func (p *AliyunMQProducer) PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)
func (*AliyunMQProducer) TopicName ¶
func (p *AliyunMQProducer) TopicName() string
type AliyunMQTransProducer ¶
type AliyunMQTransProducer struct {
// contains filtered or unexported fields
}
func (*AliyunMQTransProducer) Commit ¶
func (p *AliyunMQTransProducer) Commit(receiptHandle string) (err error)
func (*AliyunMQTransProducer) ConsumeHalfMessage ¶
func (p *AliyunMQTransProducer) ConsumeHalfMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)
func (*AliyunMQTransProducer) GroupId ¶
func (p *AliyunMQTransProducer) GroupId() string
func (*AliyunMQTransProducer) InstanceId ¶
func (p *AliyunMQTransProducer) InstanceId() string
func (*AliyunMQTransProducer) PublishMessage ¶
func (p *AliyunMQTransProducer) PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)
func (*AliyunMQTransProducer) Rollback ¶
func (p *AliyunMQTransProducer) Rollback(receiptHandle string) (err error)
func (*AliyunMQTransProducer) TopicName ¶
func (p *AliyunMQTransProducer) TopicName() string
type ConsumeMessageEntry ¶
type ConsumeMessageEntry struct { MessageResponse // 消息ID MessageId string `xml:"MessageId" json:"message_id"` // 消息句柄 ReceiptHandle string `xml:"ReceiptHandle" json:"receipt_handle"` // 消息体MD5 MessageBodyMD5 string `xml:"MessageBodyMD5" json:"message_body_md5"` // 消息体 MessageBody string `xml:"MessageBody" json:"message_body"` // 消息发送时间 PublishTime int64 `xml:"PublishTime" json:"publish_time"` // 下次消费消息的时间(如果这次消费的消息没有Ack) NextConsumeTime int64 `xml:"NextConsumeTime" json:"next_consume_time"` // 第一次消费的时间,此值对于顺序消费没有意义 FirstConsumeTime int64 `xml:"FirstConsumeTime" json:"first_consume_time"` // 消费次数 ConsumedTimes int64 `xml:"ConsumedTimes" json:"consumed_times"` // 消息标签 MessageTag string `xml:"MessageTag" json:"message_tag"` // 消息属性 Properties map[string]string `xml:"-" json:"-"` // 序列化属性请勿使用 PropInner string `xml:"Properties,omitempty" json:"properties,omitempty"` // 消息KEY MessageKey string `xml:"-" json:"-"` // 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递 StartDeliverTime int64 `xml:"-" json:"-"` // 顺序消息分区Key ShardingKey string `xml:"-" json:"-"` // 在消息属性中添加第一次消息回查的最快时间,单位秒,并且表征这是一条事务消息 TransCheckImmunityTime int `xml:"-" json:"-"` }
type ConsumeMessageResponse ¶
type ConsumeMessageResponse struct { XMLName xml.Name `xml:"Messages" json:"-"` Messages []ConsumeMessageEntry `xml:"Message" json:"messages"` }
func (*ConsumeMessageResponse) Responses ¶
func (r *ConsumeMessageResponse) Responses() []*MessageResponse
type Credential ¶
type ErrAckItem ¶
type ErrorResponse ¶
type ErrorResponse struct { XMLName xml.Name `xml:"Error" json:"-"` // 错误码 Code string `xml:"Code,omitempty" json:"code,omitempty"` // 错误描述 Message string `xml:"Message,omitempty" json:"message,omitempty"` // 请求ID RequestId string `xml:"RequestId,omitempty" json:"request_id,omitempty"` // 请求HOST HostId string `xml:"HostId,omitempty" json:"host_id,omitempty"` }
type MQClient ¶
type MQClient interface { GetProducer(instanceId string, topicName string) MQProducer GetTransProducer(instanceId string, topicName string, groupId string) MQTransProducer GetConsumer(instanceId string, topicName string, consumer string, messageTag string) MQConsumer GetCredential() Credential GetEndpoint() string }
func NewAliyunMQClient ¶
type MQConsumer ¶
type MQConsumer interface { // 主题名字 TopicName() string // 实例ID,可空 InstanceId() string // 消费者的名字 Consumer() string // 消费消息过滤的标签 MessageTag() string // 消费消息,如果该条消息没有被 {AckMessage} 确认消费成功,即在NextConsumeTime时会再次消费到该条消息 ConsumeMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64) // 顺序消费消息,获取的消息可能是多个分区但是一个分区内消息一定是顺序的,对于一个分区的消息必须要全部ACK成功才能消费下一批消息 // 否则在NextConsumeTime后会再次消费到相同的消息 ConsumeMessageOrderly(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64) // 确认消息消费成功 AckMessage(receiptHandles []string) (err error) }
MQ的消息消费者
type MQCredential ¶
type MQCredential struct {
// contains filtered or unexported fields
}
func NewMQCredential ¶
func NewMQCredential(accessKeyId string, accessKeySecret string, securityToken string) *MQCredential
func (*MQCredential) AccessKeyId ¶
func (p *MQCredential) AccessKeyId() string
func (*MQCredential) AccessKeySecret ¶
func (p *MQCredential) AccessKeySecret() string
func (*MQCredential) SecurityToken ¶
func (p *MQCredential) SecurityToken() string
type MQDecoder ¶
type MQDecoder interface { Decode(reader io.Reader, v interface{}) (err error) DecodeError(bodyBytes []byte, resource string, requestId string) (decodedError error, err error) }
func NewAliyunMQAckMsgDecoder ¶
func NewAliyunMQAckMsgDecoder() MQDecoder
func NewAliyunMQDecoder ¶
func NewAliyunMQDecoder() MQDecoder
type MQProducer ¶
type MQProducer interface { // 主题名字 TopicName() string // 实例ID,可空 InstanceId() string // 发送消息 PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error) }
MQ的消息生产者
type MQTransProducer ¶
type MQTransProducer interface { // 主题名字 TopicName() string // 实例ID,可空 InstanceId() string // GroupId,非空 GroupId() string // 发送消息 PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error) // 消费事务半消息 ConsumeHalfMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64) // 提交事务消息 Commit(receiptHandle string) (err error) // 回滚事务消息 Rollback(receiptHandle string) (err error) }
MQ的事务消息生产者
type MessageResponder ¶
type MessageResponder interface {
Responses() []*MessageResponse
}
type MessageResponse ¶
type MessageResponse struct { XMLName xml.Name `xml:"Message" json:"-"` // 错误码 Code string `xml:"Code,omitempty" json:"code,omitempty"` // 错误描述 Message string `xml:"Message,omitempty" json:"message,omitempty"` // 请求ID RequestId string `xml:"RequestId,omitempty" json:"request_id,omitempty"` // 请求HOST HostId string `xml:"HostId,omitempty" json:"host_id,omitempty"` }
type PublishMessageRequest ¶
type PublishMessageRequest struct { XMLName xml.Name `xml:"Message" json:"-"` // 消息体 MessageBody string `xml:"MessageBody" json:"message_body"` // 消息标签 MessageTag string `xml:"MessageTag,omitempty" json:"message_tag,omitempty"` // 消息属性 Properties map[string]string `xml:"-" json:"-"` // 消息KEY MessageKey string `xml:"-" json:"-"` // 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递 StartDeliverTime int64 `xml:"-" json:"-"` // 在消息属性中添加第一次消息回查的最快时间,单位秒,并且表征这是一条事务消息, 10~300s TransCheckImmunityTime int `xml:"-" json:"-"` // 分区顺序消息中区分不同分区的关键字段,sharding key 于普通消息的 key 是完全不同的概念。全局顺序消息,该字段可以设置为任意非空字符串。 ShardingKey string `xml:"-" json:"-"` // contains filtered or unexported fields }
type PublishMessageResponse ¶
type PublishMessageResponse struct { MessageResponse // 消息ID MessageId string `xml:"MessageId" json:"message_id"` // 消息体MD5 MessageBodyMD5 string `xml:"MessageBodyMD5" json:"message_body_md5"` // 消息句柄只有事务消息才有 ReceiptHandle string `xml:"ReceiptHandle" json:"receipt_handle"` }
func (*PublishMessageResponse) Responses ¶
func (r *PublishMessageResponse) Responses() []*MessageResponse
type ReceiptHandles ¶
Click to show internal directories.
Click to hide internal directories.