Documentation ¶
Index ¶
- Constants
- Variables
- func ParseError(resp ErrorMessageResponse, resource string) (err error)
- type AliMNSClient
- type AliMNSCredential
- type AliMNSDecoder
- type AliMNSQueue
- type AliQueueManager
- type Base64Bytes
- type BatchMessageReceiveResponse
- type BatchMessageSendRequest
- type BatchMessageSendResponse
- type CreateQueueRequest
- type Credential
- type ErrorMessageResponse
- type MNSClient
- type MNSDecoder
- type MNSQueue
- func (p *MNSQueue) BatchDeleteMessage(receiptHandles ...string) (err error)
- func (p *MNSQueue) BatchPeekMessage(respChan chan BatchMessageReceiveResponse, errChan chan error, ...)
- func (p *MNSQueue) BatchReceiveMessage(respChan chan BatchMessageReceiveResponse, errChan chan error, ...)
- func (p *MNSQueue) BatchSendMessage(messages ...MessageSendRequest) (resp BatchMessageSendResponse, err error)
- func (p *MNSQueue) ChangeMessageVisibility(receiptHandle string, visibilityTimeout int64) (resp MessageVisibilityChangeResponse, err error)
- func (p *MNSQueue) DeleteMessage(receiptHandle string) (err error)
- func (p *MNSQueue) Name() string
- func (p *MNSQueue) PeekMessage(respChan chan MessageReceiveResponse, errChan chan error, ...)
- func (p *MNSQueue) ReceiveMessage(respChan chan MessageReceiveResponse, errChan chan error, waitseconds ...int64)
- func (p *MNSQueue) SendMessage(message MessageSendRequest) (resp MessageSendResponse, err error)
- func (p *MNSQueue) Stop()
- type MNSQueueManager
- func (p *MNSQueueManager) CreateQueue(endpoint string, queueName string, delaySeconds int32, maxMessageSize int32, ...) (err error)
- func (p *MNSQueueManager) DeleteQueue(endpoint string, queueName string) (err error)
- func (p *MNSQueueManager) GetQueueAttributes(endpoint string, queueName string) (attr QueueAttribute, err error)
- func (p *MNSQueueManager) ListQueue(endpoint string, nextMarker string, retNumber int32, prefix string) (queues Queues, err error)
- func (p *MNSQueueManager) SetQueueAttributes(endpoint string, queueName string, delaySeconds int32, maxMessageSize int32, ...) (err error)
- type MessageReceiveResponse
- type MessageResponse
- type MessageSendRequest
- type MessageSendResponse
- type MessageVisibilityChangeResponse
- type Method
- type QPSMonitor
- type Queue
- type QueueAttribute
- type Queues
- type ReceiptHandles
Constants ¶
View Source
const ( GET Method = "GET" PUT = "PUT" POST = "POST" DELETE = "DELETE" )
View Source
const ( AUTHORIZATION = "Authorization" CONTENT_TYPE = "Content-Type" CONTENT_MD5 = "Content-MD5" MQ_VERSION = "x-mns-version" HOST = "Host" DATE = "Date" KEEP_ALIVE = "Keep-Alive" )
View Source
const ( PROXY_PREFIX = "MNS_PROXY_" GLOBAL_PROXY = "MNS_GLOBAL_PROXY" )
View Source
const (
ALI_MNS_ERR_NS = "MNS"
)
View Source
const (
DefaultTimeout int64 = 35
)
Variables ¶
View Source
var ( ERR_SIGN_MESSAGE_FAILED = errors.TN(ALI_MNS_ERR_NS, 1, "sign message failed, {{.err}}") ERR_MARSHAL_MESSAGE_FAILED = errors.TN(ALI_MNS_ERR_NS, 2, "marshal message filed, {{.err}}") ERR_GENERAL_AUTH_HEADER_FAILED = errors.TN(ALI_MNS_ERR_NS, 3, "general auth header failed, {{.err}}") ERR_CREATE_NEW_REQUEST_FAILED = errors.TN(ALI_MNS_ERR_NS, 4, "create new request failed, {{.err}}") ERR_SEND_REQUEST_FAILED = errors.TN(ALI_MNS_ERR_NS, 5, "send request failed, {{.err}}") ERR_READ_RESPONSE_BODY_FAILED = errors.TN(ALI_MNS_ERR_NS, 6, "read response body failed, {{.err}}") ERR_UNMARSHAL_ERROR_RESPONSE_FAILED = errors.TN(ALI_MNS_ERR_NS, 7, "unmarshal error response failed, {{.err}}") ERR_UNMARSHAL_RESPONSE_FAILED = errors.TN(ALI_MNS_ERR_NS, 8, "unmarshal response failed, {{.err}}") ERR_DECODE_BODY_FAILED = errors.TN(ALI_MNS_ERR_NS, 9, "decode body failed, {{.err}}, body: \"{{.body}}\"") ERR_GET_BODY_DECODE_ELEMENT_ERROR = errors.TN(ALI_MNS_ERR_NS, 10, "get body decode element error, local: {{.local}}, error: {{.err}}") ERR_MNS_ACCESS_DENIED = errors.TN(ALI_MNS_ERR_NS, 100, ali_MNS_ERR_TEMPSTR) ERR_MNS_INVALID_ACCESS_KEY_ID = errors.TN(ALI_MNS_ERR_NS, 101, ali_MNS_ERR_TEMPSTR) ERR_MNS_INTERNAL_ERROR = errors.TN(ALI_MNS_ERR_NS, 102, ali_MNS_ERR_TEMPSTR) ERR_MNS_INVALID_AUTHORIZATION_HEADER = errors.TN(ALI_MNS_ERR_NS, 103, ali_MNS_ERR_TEMPSTR) ERR_MNS_INVALID_DATE_HEADER = errors.TN(ALI_MNS_ERR_NS, 104, ali_MNS_ERR_TEMPSTR) ERR_MNS_INVALID_ARGUMENT = errors.TN(ALI_MNS_ERR_NS, 105, ali_MNS_ERR_TEMPSTR) ERR_MNS_INVALID_DEGIST = errors.TN(ALI_MNS_ERR_NS, 106, ali_MNS_ERR_TEMPSTR) ERR_MNS_INVALID_REQUEST_URL = errors.TN(ALI_MNS_ERR_NS, 107, ali_MNS_ERR_TEMPSTR) ERR_MNS_INVALID_QUERY_STRING = errors.TN(ALI_MNS_ERR_NS, 108, ali_MNS_ERR_TEMPSTR) ERR_MNS_MALFORMED_XML = errors.TN(ALI_MNS_ERR_NS, 109, ali_MNS_ERR_TEMPSTR) ERR_MNS_MISSING_AUTHORIZATION_HEADER = errors.TN(ALI_MNS_ERR_NS, 110, ali_MNS_ERR_TEMPSTR) ERR_MNS_MISSING_DATE_HEADER = errors.TN(ALI_MNS_ERR_NS, 111, ali_MNS_ERR_TEMPSTR) ERR_MNS_MISSING_VERSION_HEADER = errors.TN(ALI_MNS_ERR_NS, 112, ali_MNS_ERR_TEMPSTR) ERR_MNS_MISSING_RECEIPT_HANDLE = errors.TN(ALI_MNS_ERR_NS, 113, ali_MNS_ERR_TEMPSTR) ERR_MNS_MISSING_VISIBILITY_TIMEOUT = errors.TN(ALI_MNS_ERR_NS, 114, ali_MNS_ERR_TEMPSTR) ERR_MNS_MESSAGE_NOT_EXIST = errors.TN(ALI_MNS_ERR_NS, 115, ali_MNS_ERR_TEMPSTR) ERR_MNS_QUEUE_DELETED_RECENTLY = errors.TN(ALI_MNS_ERR_NS, 117, ali_MNS_ERR_TEMPSTR) ERR_MNS_INVALID_QUEUE_NAME = errors.TN(ALI_MNS_ERR_NS, 118, ali_MNS_ERR_TEMPSTR) ERR_MNS_INVALID_VERSION_HEADER = errors.TN(ALI_MNS_ERR_NS, 119, ali_MNS_ERR_TEMPSTR) ERR_MNS_INVALID_CONTENT_TYPE = errors.TN(ALI_MNS_ERR_NS, 120, ali_MNS_ERR_TEMPSTR) ERR_MNS_QUEUE_NAME_LENGTH_ERROR = errors.TN(ALI_MNS_ERR_NS, 121, ali_MNS_ERR_TEMPSTR) ERR_MNS_QUEUE_NOT_EXIST = errors.TN(ALI_MNS_ERR_NS, 122, ali_MNS_ERR_TEMPSTR) ERR_MNS_RECEIPT_HANDLE_ERROR = errors.TN(ALI_MNS_ERR_NS, 123, ali_MNS_ERR_TEMPSTR) ERR_MNS_SIGNATURE_DOES_NOT_MATCH = errors.TN(ALI_MNS_ERR_NS, 124, ali_MNS_ERR_TEMPSTR) ERR_MNS_TIME_EXPIRED = errors.TN(ALI_MNS_ERR_NS, 125, ali_MNS_ERR_TEMPSTR) ERR_MNS_QPS_LIMIT_EXCEEDED = errors.TN(ALI_MNS_ERR_NS, 134, ali_MNS_ERR_TEMPSTR) ERR_MNS_UNKNOWN_CODE = errors.TN(ALI_MNS_ERR_NS, 135, ali_MNS_ERR_TEMPSTR) ERR_MNS_QUEUE_NAME_IS_TOO_LONG = errors.TN(ALI_MNS_ERR_NS, 126, "queue name is too long, the max length is 256") ERR_MNS_DELAY_SECONDS_RANGE_ERROR = errors.TN(ALI_MNS_ERR_NS, 127, "queue delay seconds is not in range of (0~60480)") ERR_MNS_MAX_MESSAGE_SIZE_RANGE_ERROR = errors.TN(ALI_MNS_ERR_NS, 128, "max message size is not in range of (1024~65536)") ERR_MNS_MSG_RETENTION_PERIOD_RANGE_ERROR = errors.TN(ALI_MNS_ERR_NS, 129, "message retention period is not in range of (60~129600)") ERR_MNS_MSG_VISIBILITY_TIMEOUT_RANGE_ERROR = errors.TN(ALI_MNS_ERR_NS, 130, "message visibility timeout is not in range of (1~43200)") ERR_MNS_MSG_POOLLING_WAIT_SECONDS_RANGE_ERROR = errors.TN(ALI_MNS_ERR_NS, 131, "message poolling wait seconds is not in range of (0~30)") REE_MNS_GET_QUEUE_RET_NUMBER_RANGE_ERROR = errors.TN(ALI_MNS_ERR_NS, 132, "get queue list param of ret number is not in range of (1~1000)") ERR_MNS_QUEUE_ALREADY_EXIST_AND_HAVE_SAME_ATTR = errors.TN(ALI_MNS_ERR_NS, 133, "mns queue already exist, and the attribute is the same, queue name: {{.name}}") ERR_MNS_QUEUE_ALREADY_EXIST = errors.TN(ALI_MNS_ERR_NS, 136, "mns queue already exist, and has different attribute, queue name: {{.name}}") )
View Source
var ( DefaultNumOfMessages int32 = 16 DefaultQPSLimit int32 = 2000 )
View Source
var (
TimeNowFunc = time.Now
)
Functions ¶
func ParseError ¶
func ParseError(resp ErrorMessageResponse, resource string) (err error)
Types ¶
type AliMNSClient ¶
type AliMNSClient struct { Timeout int64 // contains filtered or unexported fields }
func (*AliMNSClient) SetProxy ¶
func (p *AliMNSClient) SetProxy(url string)
type AliMNSCredential ¶
type AliMNSCredential struct {
// contains filtered or unexported fields
}
func NewAliMNSCredential ¶
func NewAliMNSCredential(accessKeySecret string) *AliMNSCredential
func (*AliMNSCredential) SetSecretKey ¶
func (p *AliMNSCredential) SetSecretKey(accessKeySecret string)
type AliMNSDecoder ¶
type AliMNSDecoder struct { }
type AliMNSQueue ¶
type AliMNSQueue interface { Name() string SendMessage(message MessageSendRequest) (resp MessageSendResponse, err error) BatchSendMessage(messages ...MessageSendRequest) (resp BatchMessageSendResponse, err error) ReceiveMessage(respChan chan MessageReceiveResponse, errChan chan error, waitseconds ...int64) BatchReceiveMessage(respChan chan BatchMessageReceiveResponse, errChan chan error, numOfMessages int32, waitseconds ...int64) PeekMessage(respChan chan MessageReceiveResponse, errChan chan error, interval ...time.Duration) BatchPeekMessage(respChan chan BatchMessageReceiveResponse, errChan chan error, numOfMessages int32, interval ...time.Duration) DeleteMessage(receiptHandle string) (err error) BatchDeleteMessage(receiptHandles ...string) (err error) ChangeMessageVisibility(receiptHandle string, visibilityTimeout int64) (resp MessageVisibilityChangeResponse, err error) Stop() }
func NewMNSQueue ¶
func NewMNSQueue(name string, client MNSClient, qps ...int32) AliMNSQueue
type AliQueueManager ¶
type AliQueueManager interface { CreateQueue(endpoint string, queueName string, delaySeconds int32, maxMessageSize int32, messageRetentionPeriod int32, visibilityTimeout int32, pollingWaitSeconds int32) (err error) SetQueueAttributes(endpoint string, queueName string, delaySeconds int32, maxMessageSize int32, messageRetentionPeriod int32, visibilityTimeout int32, pollingWaitSeconds int32) (err error) GetQueueAttributes(endpoint string, queueName string) (attr QueueAttribute, err error) DeleteQueue(endpoint string, queueName string) (err error) ListQueue(endpoint string, nextMarker string, retNumber int32, prefix string) (queues Queues, err error) }
func NewMNSQueueManager ¶
func NewMNSQueueManager(accessKeyId, accessKeySecret string) AliQueueManager
type Base64Bytes ¶
type Base64Bytes []byte
func (Base64Bytes) MarshalXML ¶
func (p Base64Bytes) MarshalXML(e *xml.Encoder, start xml.StartElement) error
func (*Base64Bytes) UnmarshalXML ¶
func (p *Base64Bytes) UnmarshalXML(d *xml.Decoder, start xml.StartElement) (err error)
type BatchMessageReceiveResponse ¶
type BatchMessageReceiveResponse struct { XMLName xml.Name `xml:"Messages" json:"-"` Messages []MessageReceiveResponse `xml:"Message" json:"messages"` }
type BatchMessageSendRequest ¶
type BatchMessageSendRequest struct { XMLName xml.Name `xml:"Messages"` Messages []MessageSendRequest `xml:"Message"` }
type BatchMessageSendResponse ¶
type BatchMessageSendResponse struct { XMLName xml.Name `xml:"Messages" json:"-"` Messages []MessageSendResponse `xml:"Message" json:"messages"` }
type CreateQueueRequest ¶
type CreateQueueRequest struct { XMLName xml.Name `xml:"Queue" json:"-"` DelaySeconds int32 `xml:"DelaySenconds,omitempty" json:"delay_senconds,omitempty"` MaxMessageSize int32 `xml:"MaximumMessageSize,omitempty" json:"maximum_message_size,omitempty"` MessageRetentionPeriod int32 `xml:"MessageRetentionPeriod,omitempty" json:"message_retention_period,omitempty"` VisibilityTimeout int32 `xml:"VisibilityTimeout,omitempty" json:"visibility_timeout,omitempty"` PollingWaitSeconds int32 `xml:"PollingWaitSeconds,omitempty" json:"polling_wait_secods,omitempty"` }
type Credential ¶
type ErrorMessageResponse ¶
type ErrorMessageResponse struct { XMLName xml.Name `xml:"Error" json:"-"` Code string `xml:"Code,omitempty" json:"code,omitempty"` Message string `xml:"Message,omitempty" json:"message,omitempty"` RequestId string `xml:"RequestId,omitempty" json:"request_id,omitempty"` HostId string `xml:"HostId,omitempty" json:"host_id,omitempty"` }
type MNSClient ¶
type MNSClient interface { Send(method Method, headers map[string]string, message interface{}, resource string) (resp *http.Response, err error) SetProxy(url string) }
func NewAliMNSClient ¶
type MNSDecoder ¶
func NewAliMNSDecoder ¶
func NewAliMNSDecoder() MNSDecoder
type MNSQueue ¶
type MNSQueue struct {
// contains filtered or unexported fields
}
func (*MNSQueue) BatchDeleteMessage ¶
func (*MNSQueue) BatchPeekMessage ¶
func (*MNSQueue) BatchReceiveMessage ¶
func (p *MNSQueue) BatchReceiveMessage(respChan chan BatchMessageReceiveResponse, errChan chan error, numOfMessages int32, waitseconds ...int64)
func (*MNSQueue) BatchSendMessage ¶
func (p *MNSQueue) BatchSendMessage(messages ...MessageSendRequest) (resp BatchMessageSendResponse, err error)
func (*MNSQueue) ChangeMessageVisibility ¶
func (p *MNSQueue) ChangeMessageVisibility(receiptHandle string, visibilityTimeout int64) (resp MessageVisibilityChangeResponse, err error)
func (*MNSQueue) DeleteMessage ¶
func (*MNSQueue) PeekMessage ¶
func (p *MNSQueue) PeekMessage(respChan chan MessageReceiveResponse, errChan chan error, interval ...time.Duration)
func (*MNSQueue) ReceiveMessage ¶
func (p *MNSQueue) ReceiveMessage(respChan chan MessageReceiveResponse, errChan chan error, waitseconds ...int64)
func (*MNSQueue) SendMessage ¶
func (p *MNSQueue) SendMessage(message MessageSendRequest) (resp MessageSendResponse, err error)
type MNSQueueManager ¶
type MNSQueueManager struct {
// contains filtered or unexported fields
}
func (*MNSQueueManager) CreateQueue ¶
func (*MNSQueueManager) DeleteQueue ¶
func (p *MNSQueueManager) DeleteQueue(endpoint string, queueName string) (err error)
func (*MNSQueueManager) GetQueueAttributes ¶
func (p *MNSQueueManager) GetQueueAttributes(endpoint string, queueName string) (attr QueueAttribute, err error)
type MessageReceiveResponse ¶
type MessageReceiveResponse struct { MessageResponse MessageId string `xml:"MessageId" json:"message_id"` ReceiptHandle string `xml:"ReceiptHandle" json:"receipt_handle"` MessageBodyMD5 string `xml:"MessageBodyMD5" json:"message_body_md5"` MessageBody Base64Bytes `xml:"MessageBody" json:"message_body"` EnqueueTime int64 `xml:"EnqueueTime" json:"enqueue_time"` NextVisibleTime int64 `xml:"NextVisibleTime" json:"next_visible_time"` FirstDequeueTime int64 `xml:"FirstDequeueTime" json:"first_dequeue_time"` DequeueCount int64 `xml:"DequeueCount" json:"dequeue_count"` Priority int64 `xml:"Priority" json:"priority"` }
type MessageResponse ¶
type MessageResponse struct { XMLName xml.Name `xml:"Message" json:"-"` Code string `xml:"Code,omitempty" json:"code,omitempty"` Message string `xml:"Message,omitempty" json:"message,omitempty"` RequestId string `xml:"RequestId,omitempty" json:"request_id,omitempty"` HostId string `xml:"HostId,omitempty" json:"host_id,omitempty"` }
type MessageSendRequest ¶
type MessageSendRequest struct { XMLName xml.Name `xml:"Message"` MessageBody Base64Bytes `xml:"MessageBody"` DelaySeconds int64 `xml:"DelaySeconds"` Priority int64 `xml:"Priority"` }
type MessageSendResponse ¶
type MessageSendResponse struct { MessageResponse MessageId string `xml:"MessageId" json:"message_id"` MessageBodyMD5 string `xml:"MessageBodyMD5" json:"message_body_md5"` }
type QPSMonitor ¶
type QPSMonitor struct {
// contains filtered or unexported fields
}
func NewQPSMonitor ¶
func NewQPSMonitor(delaySecond int32) *QPSMonitor
func (*QPSMonitor) Pulse ¶
func (p *QPSMonitor) Pulse()
func (*QPSMonitor) QPS ¶
func (p *QPSMonitor) QPS() int32
type QueueAttribute ¶
type QueueAttribute struct { XMLName xml.Name `xml:"Queue" json:"-"` QueueName string `xml:"QueueName,omitempty" json:"queue_name,omitempty"` DelaySeconds int32 `xml:"DelaySenconds,omitempty" json:"delay_senconds,omitempty"` MaxMessageSize int32 `xml:"MaximumMessageSize,omitempty" json:"maximum_message_size,omitempty"` MessageRetentionPeriod int32 `xml:"MessageRetentionPeriod,omitempty" json:"message_retention_period,omitempty"` VisibilityTimeout int32 `xml:"VisibilityTimeout,omitempty" json:"visibility_timeout,omitempty"` PollingWaitSeconds int32 `xml:"PollingWaitSeconds,omitempty" json:"polling_wait_secods,omitempty"` ActiveMessages int64 `xml:"ActiveMessages,omitempty" json:"active_messages,omitempty"` InactiveMessages int64 `xml:"InactiveMessages,omitempty" json:"inactive_messages,omitempty"` DelayMessages int64 `xml:"DelayMessages,omitempty" json:"delay_messages,omitempty"` CreateTime int64 `xml:"CreateTime,omitempty" json:"create_time,omitempty"` LastModifyTime int64 `xml:"LastModifyTime,omitempty" json:"last_modify_time,omitempty"` }
type ReceiptHandles ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.