ali_mns

package module
v0.0.0-...-f749fd3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 11, 2019 License: Apache-2.0 Imports: 18 Imported by: 0

README

ali_mns

阿里云消息队列服务GO语言封装

Documentation

Index

Constants

View Source
// HTTP method names. Send takes a Method argument, and these constants are
// the values on offer for it.
//
// NOTE(review): only GET is declared with the Method type. PUT, POST and
// DELETE each have their own explicit value, so the type is not carried over
// and they are untyped string constants (still assignable to Method).
const (
	GET    Method = "GET"
	PUT           = "PUT"
	POST          = "POST"
	DELETE        = "DELETE"
)
View Source
// Header names used by this package. All are standard HTTP header names
// except MQ_VERSION ("x-mns-version"), which is specific to the MNS service.
// Presumably these are the keys of the headers map passed to Send and
// Signature — confirm against the implementation.
const (
	AUTHORIZATION = "Authorization"
	CONTENT_TYPE  = "Content-Type"
	CONTENT_MD5   = "Content-MD5"
	MQ_VERSION    = "x-mns-version"
	HOST          = "Host"
	DATE          = "Date"
	KEEP_ALIVE    = "Keep-Alive"
)
View Source
// Proxy-related names.
// NOTE(review): these look like environment variable names (a per-target
// prefix and a global setting) consulted when choosing a proxy — confirm
// against the code that reads them.
const (
	PROXY_PREFIX = "MNS_PROXY_"
	GLOBAL_PROXY = "MNS_GLOBAL_PROXY"
)
View Source
// ALI_MNS_ERR_NS is the namespace string passed as the first argument to
// errors.TN for every error template declared by this package.
const (
	ALI_MNS_ERR_NS = "MNS"
)
View Source
// DefaultTimeout is the default timeout value.
// NOTE(review): the unit is not visible here — presumably seconds, and
// presumably the initial value of AliMNSClient.Timeout; confirm.
const (
	DefaultTimeout int64 = 35
)

Variables

View Source
// Error templates for this package. Each is built with errors.TN from the
// ALI_MNS_ERR_NS namespace, a numeric code, and a message template; the
// {{.name}} placeholders are filled in when the error is instantiated.
var (
	// Codes 1-3: failures while preparing a request (signing, marshalling,
	// building the authorization header).
	ERR_SIGN_MESSAGE_FAILED        = errors.TN(ALI_MNS_ERR_NS, 1, "sign message failed, {{.err}}")
	ERR_MARSHAL_MESSAGE_FAILED     = errors.TN(ALI_MNS_ERR_NS, 2, "marshal message failed, {{.err}}")
	ERR_GENERAL_AUTH_HEADER_FAILED = errors.TN(ALI_MNS_ERR_NS, 3, "general auth header failed, {{.err}}")

	// Codes 4-6: failures while creating or sending the HTTP request, or
	// reading the response body.
	ERR_CREATE_NEW_REQUEST_FAILED = errors.TN(ALI_MNS_ERR_NS, 4, "create new request failed, {{.err}}")
	ERR_SEND_REQUEST_FAILED       = errors.TN(ALI_MNS_ERR_NS, 5, "send request failed, {{.err}}")
	ERR_READ_RESPONSE_BODY_FAILED = errors.TN(ALI_MNS_ERR_NS, 6, "read response body failed, {{.err}}")

	// Codes 7-10: failures while decoding a response.
	ERR_UNMARSHAL_ERROR_RESPONSE_FAILED = errors.TN(ALI_MNS_ERR_NS, 7, "unmarshal error response failed, {{.err}}")
	ERR_UNMARSHAL_RESPONSE_FAILED       = errors.TN(ALI_MNS_ERR_NS, 8, "unmarshal response failed, {{.err}}")
	ERR_DECODE_BODY_FAILED              = errors.TN(ALI_MNS_ERR_NS, 9, "decode body failed, {{.err}}, body: \"{{.body}}\"")
	ERR_GET_BODY_DECODE_ELEMENT_ERROR   = errors.TN(ALI_MNS_ERR_NS, 10, "get body decode element error, local: {{.local}}, error: {{.err}}")

	// Codes 100-125, 134 and 135: all share the message template
	// ali_MNS_ERR_TEMPSTR (declared elsewhere in the package). The names
	// appear to mirror error codes returned by the MNS service — presumably
	// ParseError maps a response code to one of these; confirm.
	ERR_MNS_ACCESS_DENIED                = errors.TN(ALI_MNS_ERR_NS, 100, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_INVALID_ACCESS_KEY_ID        = errors.TN(ALI_MNS_ERR_NS, 101, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_INTERNAL_ERROR               = errors.TN(ALI_MNS_ERR_NS, 102, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_INVALID_AUTHORIZATION_HEADER = errors.TN(ALI_MNS_ERR_NS, 103, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_INVALID_DATE_HEADER          = errors.TN(ALI_MNS_ERR_NS, 104, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_INVALID_ARGUMENT             = errors.TN(ALI_MNS_ERR_NS, 105, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_INVALID_DEGIST               = errors.TN(ALI_MNS_ERR_NS, 106, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_INVALID_REQUEST_URL          = errors.TN(ALI_MNS_ERR_NS, 107, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_INVALID_QUERY_STRING         = errors.TN(ALI_MNS_ERR_NS, 108, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_MALFORMED_XML                = errors.TN(ALI_MNS_ERR_NS, 109, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_MISSING_AUTHORIZATION_HEADER = errors.TN(ALI_MNS_ERR_NS, 110, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_MISSING_DATE_HEADER          = errors.TN(ALI_MNS_ERR_NS, 111, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_MISSING_VERSION_HEADER       = errors.TN(ALI_MNS_ERR_NS, 112, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_MISSING_RECEIPT_HANDLE       = errors.TN(ALI_MNS_ERR_NS, 113, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_MISSING_VISIBILITY_TIMEOUT   = errors.TN(ALI_MNS_ERR_NS, 114, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_MESSAGE_NOT_EXIST            = errors.TN(ALI_MNS_ERR_NS, 115, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_QUEUE_DELETED_RECENTLY       = errors.TN(ALI_MNS_ERR_NS, 117, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_INVALID_QUEUE_NAME           = errors.TN(ALI_MNS_ERR_NS, 118, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_INVALID_VERSION_HEADER       = errors.TN(ALI_MNS_ERR_NS, 119, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_INVALID_CONTENT_TYPE         = errors.TN(ALI_MNS_ERR_NS, 120, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_QUEUE_NAME_LENGTH_ERROR      = errors.TN(ALI_MNS_ERR_NS, 121, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_QUEUE_NOT_EXIST              = errors.TN(ALI_MNS_ERR_NS, 122, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_RECEIPT_HANDLE_ERROR         = errors.TN(ALI_MNS_ERR_NS, 123, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_SIGNATURE_DOES_NOT_MATCH     = errors.TN(ALI_MNS_ERR_NS, 124, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_TIME_EXPIRED                 = errors.TN(ALI_MNS_ERR_NS, 125, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_QPS_LIMIT_EXCEEDED           = errors.TN(ALI_MNS_ERR_NS, 134, ali_MNS_ERR_TEMPSTR)
	ERR_MNS_UNKNOWN_CODE                 = errors.TN(ALI_MNS_ERR_NS, 135, ali_MNS_ERR_TEMPSTR)

	// Codes 126-133 and 136: queue parameter validation and queue-creation
	// conflicts, with fixed messages.
	// NOTE(review): the ranges quoted in these messages (e.g. 0~60480,
	// 60~129600) should be checked against the validation code and the MNS
	// API limits; they are kept as-is here.
	// NOTE(review): REE_MNS_GET_QUEUE_RET_NUMBER_RANGE_ERROR breaks the ERR_
	// prefix convention; the exported name is kept for compatibility.
	ERR_MNS_QUEUE_NAME_IS_TOO_LONG                 = errors.TN(ALI_MNS_ERR_NS, 126, "queue name is too long, the max length is 256")
	ERR_MNS_DELAY_SECONDS_RANGE_ERROR              = errors.TN(ALI_MNS_ERR_NS, 127, "queue delay seconds is not in range of (0~60480)")
	ERR_MNS_MAX_MESSAGE_SIZE_RANGE_ERROR           = errors.TN(ALI_MNS_ERR_NS, 128, "max message size is not in range of (1024~65536)")
	ERR_MNS_MSG_RETENTION_PERIOD_RANGE_ERROR       = errors.TN(ALI_MNS_ERR_NS, 129, "message retention period is not in range of (60~129600)")
	ERR_MNS_MSG_VISIBILITY_TIMEOUT_RANGE_ERROR     = errors.TN(ALI_MNS_ERR_NS, 130, "message visibility timeout is not in range of (1~43200)")
	ERR_MNS_MSG_POOLLING_WAIT_SECONDS_RANGE_ERROR  = errors.TN(ALI_MNS_ERR_NS, 131, "message polling wait seconds is not in range of (0~30)")
	REE_MNS_GET_QUEUE_RET_NUMBER_RANGE_ERROR       = errors.TN(ALI_MNS_ERR_NS, 132, "get queue list param of ret number is not in range of (1~1000)")
	ERR_MNS_QUEUE_ALREADY_EXIST_AND_HAVE_SAME_ATTR = errors.TN(ALI_MNS_ERR_NS, 133, "mns queue already exist, and the attribute is the same, queue name: {{.name}}")
	ERR_MNS_QUEUE_ALREADY_EXIST                    = errors.TN(ALI_MNS_ERR_NS, 136, "mns queue already exist, and has different attribute, queue name: {{.name}}")
)
View Source
// Package-level defaults. They are variables, not constants, so callers can
// reassign them.
// NOTE(review): presumably DefaultNumOfMessages is the batch size used by the
// batch receive/peek calls and DefaultQPSLimit the limit applied when
// NewMNSQueue is called without a qps argument — confirm.
var (
	DefaultNumOfMessages int32 = 16
	DefaultQPSLimit      int32 = 2000
)
View Source
// TimeNowFunc holds the function used to obtain the current time; it is
// initialised to time.Now. Being a variable, it can be replaced (for example
// with a fixed clock in tests).
var (
	TimeNowFunc = time.Now
)

Functions

func ParseError

func ParseError(resp ErrorMessageResponse, resource string) (err error)

Types

type AliMNSClient

// AliMNSClient is the concrete client type. Its method set (Send, SetProxy)
// matches the MNSClient interface.
type AliMNSClient struct {
	// Timeout is the client's timeout value.
	// NOTE(review): unit not visible here — presumably seconds; confirm.
	Timeout int64
	// contains filtered or unexported fields
}

func (*AliMNSClient) Send

func (p *AliMNSClient) Send(method Method, headers map[string]string, message interface{}, resource string) (resp *http.Response, err error)

func (*AliMNSClient) SetProxy

func (p *AliMNSClient) SetProxy(url string)

type AliMNSCredential

// AliMNSCredential is the concrete credential type created by
// NewAliMNSCredential from an access key secret. Its method set (Signature,
// SetSecretKey) matches the Credential interface.
type AliMNSCredential struct {
	// contains filtered or unexported fields
}

func NewAliMNSCredential

func NewAliMNSCredential(accessKeySecret string) *AliMNSCredential

func (*AliMNSCredential) SetSecretKey

func (p *AliMNSCredential) SetSecretKey(accessKeySecret string)

func (*AliMNSCredential) Signature

func (p *AliMNSCredential) Signature(method Method, headers map[string]string, resource string) (signature string, err error)

type AliMNSDecoder

// AliMNSDecoder is a stateless (field-less) type whose Decode method matches
// the MNSDecoder interface.
type AliMNSDecoder struct {
}

func (*AliMNSDecoder) Decode

func (p *AliMNSDecoder) Decode(reader io.Reader, v interface{}) (err error)

type AliMNSQueue

// AliMNSQueue is the set of operations available on a single named queue.
//
// The receive and peek methods return nothing; they take a response channel
// and an error channel instead, so results are presumably delivered on those
// channels — confirm against the implementation.
type AliMNSQueue interface {
	// Name returns the queue's name.
	Name() string
	// SendMessage sends one message and returns the send response.
	SendMessage(message MessageSendRequest) (resp MessageSendResponse, err error)
	// BatchSendMessage sends several messages in one call.
	BatchSendMessage(messages ...MessageSendRequest) (resp BatchMessageSendResponse, err error)
	// ReceiveMessage receives a message; waitseconds is optional.
	ReceiveMessage(respChan chan MessageReceiveResponse, errChan chan error, waitseconds ...int64)
	// BatchReceiveMessage receives up to numOfMessages messages; waitseconds is optional.
	BatchReceiveMessage(respChan chan BatchMessageReceiveResponse, errChan chan error, numOfMessages int32, waitseconds ...int64)
	// PeekMessage peeks at a message; interval is optional.
	PeekMessage(respChan chan MessageReceiveResponse, errChan chan error, interval ...time.Duration)
	// BatchPeekMessage peeks at up to numOfMessages messages; interval is optional.
	BatchPeekMessage(respChan chan BatchMessageReceiveResponse, errChan chan error, numOfMessages int32, interval ...time.Duration)
	// DeleteMessage deletes the message identified by receiptHandle.
	DeleteMessage(receiptHandle string) (err error)
	// BatchDeleteMessage deletes the messages identified by receiptHandles.
	BatchDeleteMessage(receiptHandles ...string) (err error)
	// ChangeMessageVisibility sets a new visibility timeout for the message
	// identified by receiptHandle.
	ChangeMessageVisibility(receiptHandle string, visibilityTimeout int64) (resp MessageVisibilityChangeResponse, err error)
	// Stop stops the queue.
	// NOTE(review): presumably ends any running receive/peek loops — confirm.
	Stop()
}

func NewMNSQueue

func NewMNSQueue(name string, client MNSClient, qps ...int32) AliMNSQueue

type AliQueueManager

// AliQueueManager is the set of queue administration operations. Every method
// takes the service endpoint as its first argument.
type AliQueueManager interface {
	// CreateQueue creates queueName with the given attributes.
	CreateQueue(endpoint string, queueName string, delaySeconds int32, maxMessageSize int32, messageRetentionPeriod int32, visibilityTimeout int32, pollingWaitSeconds int32) (err error)
	// SetQueueAttributes updates the attributes of queueName.
	SetQueueAttributes(endpoint string, queueName string, delaySeconds int32, maxMessageSize int32, messageRetentionPeriod int32, visibilityTimeout int32, pollingWaitSeconds int32) (err error)
	// GetQueueAttributes returns the attributes of queueName.
	GetQueueAttributes(endpoint string, queueName string) (attr QueueAttribute, err error)
	// DeleteQueue deletes queueName.
	DeleteQueue(endpoint string, queueName string) (err error)
	// ListQueue lists queues, taking a paging marker, a result count and a
	// name prefix.
	ListQueue(endpoint string, nextMarker string, retNumber int32, prefix string) (queues Queues, err error)
}

func NewMNSQueueManager

func NewMNSQueueManager(accessKeyId, accessKeySecret string) AliQueueManager

type Base64Bytes

// Base64Bytes is a byte slice with custom XML marshalling (it defines
// MarshalXML and UnmarshalXML).
// NOTE(review): by its name the XML form is presumably base64 text — confirm.
type Base64Bytes []byte

func (Base64Bytes) MarshalXML

func (p Base64Bytes) MarshalXML(e *xml.Encoder, start xml.StartElement) error

func (*Base64Bytes) UnmarshalXML

func (p *Base64Bytes) UnmarshalXML(d *xml.Decoder, start xml.StartElement) (err error)

type BatchMessageReceiveResponse

// BatchMessageReceiveResponse is the result of a batch receive or peek:
// a <Messages> root element containing repeated <Message> children.
type BatchMessageReceiveResponse struct {
	// XMLName fixes the root element name; it is excluded from JSON.
	XMLName  xml.Name                 `xml:"Messages" json:"-"`
	// Messages holds one entry per <Message> element.
	Messages []MessageReceiveResponse `xml:"Message" json:"messages"`
}

type BatchMessageSendRequest

// BatchMessageSendRequest is the body of a batch send: a <Messages> root
// element containing one <Message> child per message.
type BatchMessageSendRequest struct {
	// XMLName fixes the root element name.
	XMLName  xml.Name             `xml:"Messages"`
	// Messages are the messages to send.
	Messages []MessageSendRequest `xml:"Message"`
}

type BatchMessageSendResponse

// BatchMessageSendResponse is the result of a batch send: a <Messages> root
// element containing repeated <Message> children.
type BatchMessageSendResponse struct {
	// XMLName fixes the root element name; it is excluded from JSON.
	XMLName  xml.Name              `xml:"Messages" json:"-"`
	// Messages holds one entry per <Message> element.
	Messages []MessageSendResponse `xml:"Message" json:"messages"`
}

type CreateQueueRequest

// CreateQueueRequest is the XML body used to create a queue (root element
// <Queue>). Every attribute is tagged omitempty, so zero-valued fields are
// left out of both the XML and the JSON encoding.
type CreateQueueRequest struct {
	// XMLName fixes the root element name; it is excluded from JSON.
	XMLName                xml.Name `xml:"Queue" json:"-"`
	// DelaySeconds is encoded as the <DelaySeconds> element, the name the MNS
	// API defines. The JSON key keeps its historical spelling so existing
	// JSON consumers are not broken.
	DelaySeconds           int32    `xml:"DelaySeconds,omitempty" json:"delay_senconds,omitempty"`
	// MaxMessageSize is encoded as <MaximumMessageSize>.
	MaxMessageSize         int32    `xml:"MaximumMessageSize,omitempty" json:"maximum_message_size,omitempty"`
	// MessageRetentionPeriod is encoded as <MessageRetentionPeriod>.
	MessageRetentionPeriod int32    `xml:"MessageRetentionPeriod,omitempty" json:"message_retention_period,omitempty"`
	// VisibilityTimeout is encoded as <VisibilityTimeout>.
	VisibilityTimeout      int32    `xml:"VisibilityTimeout,omitempty" json:"visibility_timeout,omitempty"`
	// PollingWaitSeconds is encoded as <PollingWaitSeconds>. The JSON key
	// keeps its historical spelling for compatibility.
	PollingWaitSeconds     int32    `xml:"PollingWaitSeconds,omitempty" json:"polling_wait_secods,omitempty"`
}

type Credential

// Credential produces request signatures from a secret key.
type Credential interface {
	// Signature computes a signature string from the request method, headers
	// and resource.
	Signature(method Method, headers map[string]string, resource string) (signature string, err error)
	// SetSecretKey replaces the access key secret.
	SetSecretKey(accessKeySecret string)
}

type ErrorMessageResponse

// ErrorMessageResponse is an error document with root element <Error>. It is
// the input type of ParseError. Empty fields are omitted when encoding.
type ErrorMessageResponse struct {
	// XMLName fixes the root element name; it is excluded from JSON.
	XMLName   xml.Name `xml:"Error" json:"-"`
	// Code is the <Code> element.
	Code      string   `xml:"Code,omitempty" json:"code,omitempty"`
	// Message is the <Message> element.
	Message   string   `xml:"Message,omitempty" json:"message,omitempty"`
	// RequestId is the <RequestId> element.
	RequestId string   `xml:"RequestId,omitempty" json:"request_id,omitempty"`
	// HostId is the <HostId> element.
	HostId    string   `xml:"HostId,omitempty" json:"host_id,omitempty"`
}

type MNSClient

// MNSClient is the transport abstraction handed to NewMNSQueue.
type MNSClient interface {
	// Send issues a request with the given method, headers, message payload
	// and resource, and returns the raw HTTP response.
	// NOTE(review): the caller presumably owns resp.Body and must close it —
	// confirm against the implementation.
	Send(method Method, headers map[string]string, message interface{}, resource string) (resp *http.Response, err error)
	// SetProxy sets the proxy URL.
	SetProxy(url string)
}

func NewAliMNSClient

func NewAliMNSClient(url, accessKeyId, accessKeySecret string) MNSClient

type MNSDecoder

// MNSDecoder decodes a response stream into a value.
type MNSDecoder interface {
	// Decode reads from reader and stores the decoded result in v.
	// NOTE(review): v is presumably a pointer to the target value — confirm.
	Decode(reader io.Reader, v interface{}) (err error)
}

func NewAliMNSDecoder

func NewAliMNSDecoder() MNSDecoder

type MNSQueue

// MNSQueue is the concrete queue type; its method set matches the AliMNSQueue
// interface.
type MNSQueue struct {
	// contains filtered or unexported fields
}

func (*MNSQueue) BatchDeleteMessage

func (p *MNSQueue) BatchDeleteMessage(receiptHandles ...string) (err error)

func (*MNSQueue) BatchPeekMessage

func (p *MNSQueue) BatchPeekMessage(respChan chan BatchMessageReceiveResponse, errChan chan error, numOfMessages int32, interval ...time.Duration)

func (*MNSQueue) BatchReceiveMessage

func (p *MNSQueue) BatchReceiveMessage(respChan chan BatchMessageReceiveResponse, errChan chan error, numOfMessages int32, waitseconds ...int64)

func (*MNSQueue) BatchSendMessage

func (p *MNSQueue) BatchSendMessage(messages ...MessageSendRequest) (resp BatchMessageSendResponse, err error)

func (*MNSQueue) ChangeMessageVisibility

func (p *MNSQueue) ChangeMessageVisibility(receiptHandle string, visibilityTimeout int64) (resp MessageVisibilityChangeResponse, err error)

func (*MNSQueue) DeleteMessage

func (p *MNSQueue) DeleteMessage(receiptHandle string) (err error)

func (*MNSQueue) Name

func (p *MNSQueue) Name() string

func (*MNSQueue) PeekMessage

func (p *MNSQueue) PeekMessage(respChan chan MessageReceiveResponse, errChan chan error, interval ...time.Duration)

func (*MNSQueue) ReceiveMessage

func (p *MNSQueue) ReceiveMessage(respChan chan MessageReceiveResponse, errChan chan error, waitseconds ...int64)

func (*MNSQueue) SendMessage

func (p *MNSQueue) SendMessage(message MessageSendRequest) (resp MessageSendResponse, err error)

func (*MNSQueue) Stop

func (p *MNSQueue) Stop()

type MNSQueueManager

// MNSQueueManager is the concrete queue manager type; its method set matches
// the AliQueueManager interface.
type MNSQueueManager struct {
	// contains filtered or unexported fields
}

func (*MNSQueueManager) CreateQueue

func (p *MNSQueueManager) CreateQueue(endpoint string, queueName string, delaySeconds int32, maxMessageSize int32, messageRetentionPeriod int32, visibilityTimeout int32, pollingWaitSeconds int32) (err error)

func (*MNSQueueManager) DeleteQueue

func (p *MNSQueueManager) DeleteQueue(endpoint string, queueName string) (err error)

func (*MNSQueueManager) GetQueueAttributes

func (p *MNSQueueManager) GetQueueAttributes(endpoint string, queueName string) (attr QueueAttribute, err error)

func (*MNSQueueManager) ListQueue

func (p *MNSQueueManager) ListQueue(endpoint string, nextMarker string, retNumber int32, prefix string) (queues Queues, err error)

func (*MNSQueueManager) SetQueueAttributes

func (p *MNSQueueManager) SetQueueAttributes(endpoint string, queueName string, delaySeconds int32, maxMessageSize int32, messageRetentionPeriod int32, visibilityTimeout int32, pollingWaitSeconds int32) (err error)

type MessageReceiveResponse

// MessageReceiveResponse is a single received or peeked message. It embeds
// MessageResponse, which supplies the <Message> root element and the
// Code/Message/RequestId/HostId fields.
// NOTE(review): the units of the time fields are not visible here — confirm
// against the MNS API before converting them.
type MessageReceiveResponse struct {
	MessageResponse
	// MessageId is the <MessageId> element.
	MessageId        string      `xml:"MessageId" json:"message_id"`
	// ReceiptHandle is the <ReceiptHandle> element; DeleteMessage and
	// ChangeMessageVisibility take a receipt handle as input.
	ReceiptHandle    string      `xml:"ReceiptHandle" json:"receipt_handle"`
	// MessageBodyMD5 is the <MessageBodyMD5> element.
	MessageBodyMD5   string      `xml:"MessageBodyMD5" json:"message_body_md5"`
	// MessageBody is the <MessageBody> element, decoded through Base64Bytes.
	MessageBody      Base64Bytes `xml:"MessageBody" json:"message_body"`
	// EnqueueTime is the <EnqueueTime> element.
	EnqueueTime      int64       `xml:"EnqueueTime" json:"enqueue_time"`
	// NextVisibleTime is the <NextVisibleTime> element.
	NextVisibleTime  int64       `xml:"NextVisibleTime" json:"next_visible_time"`
	// FirstDequeueTime is the <FirstDequeueTime> element.
	FirstDequeueTime int64       `xml:"FirstDequeueTime" json:"first_dequeue_time"`
	// DequeueCount is the <DequeueCount> element.
	DequeueCount     int64       `xml:"DequeueCount" json:"dequeue_count"`
	// Priority is the <Priority> element.
	Priority         int64       `xml:"Priority" json:"priority"`
}

type MessageResponse

// MessageResponse holds the fields common to message responses, with root
// element <Message>. It is embedded by MessageSendResponse and
// MessageReceiveResponse. Empty fields are omitted when encoding.
type MessageResponse struct {
	// XMLName fixes the root element name; it is excluded from JSON.
	XMLName   xml.Name `xml:"Message" json:"-"`
	// Code is the <Code> child element.
	Code      string   `xml:"Code,omitempty" json:"code,omitempty"`
	// Message is the <Message> child element (nested inside the <Message> root).
	Message   string   `xml:"Message,omitempty" json:"message,omitempty"`
	// RequestId is the <RequestId> child element.
	RequestId string   `xml:"RequestId,omitempty" json:"request_id,omitempty"`
	// HostId is the <HostId> child element.
	HostId    string   `xml:"HostId,omitempty" json:"host_id,omitempty"`
}

type MessageSendRequest

// MessageSendRequest is the body for sending one message (root element
// <Message>). None of the fields is omitempty, so DelaySeconds and Priority
// are always written, even when zero.
type MessageSendRequest struct {
	// XMLName fixes the root element name.
	XMLName      xml.Name    `xml:"Message"`
	// MessageBody is the payload, encoded through Base64Bytes.
	MessageBody  Base64Bytes `xml:"MessageBody"`
	// DelaySeconds is the <DelaySeconds> element.
	DelaySeconds int64       `xml:"DelaySeconds"`
	// Priority is the <Priority> element.
	Priority     int64       `xml:"Priority"`
}

type MessageSendResponse

// MessageSendResponse is the result of sending one message. It embeds
// MessageResponse, which supplies the <Message> root element and the
// Code/Message/RequestId/HostId fields.
type MessageSendResponse struct {
	MessageResponse
	// MessageId is the <MessageId> element.
	MessageId      string `xml:"MessageId" json:"message_id"`
	// MessageBodyMD5 is the <MessageBodyMD5> element.
	MessageBodyMD5 string `xml:"MessageBodyMD5" json:"message_body_md5"`
}

type MessageVisibilityChangeResponse

// MessageVisibilityChangeResponse is the result of ChangeMessageVisibility,
// with root element <ChangeVisibility>.
type MessageVisibilityChangeResponse struct {
	// XMLName fixes the root element name; it is excluded from JSON.
	XMLName         xml.Name `xml:"ChangeVisibility" json:"-"`
	// ReceiptHandle is the <ReceiptHandle> element.
	ReceiptHandle   string   `xml:"ReceiptHandle" json:"receipt_handle"`
	// NextVisibleTime is the <NextVisibleTime> element.
	// NOTE(review): unit not visible here — confirm against the MNS API.
	NextVisibleTime int64    `xml:"NextVisibleTime" json:"next_visible_time"`
}

type Method

// Method is the string type for a request method, as taken by Send and
// Signature. GET is the package constant declared with this type.
type Method string

type QPSMonitor

// QPSMonitor exposes Pulse and QPS methods and is created by NewQPSMonitor.
// NOTE(review): presumably Pulse records one event and QPS reports the recent
// events-per-second rate — confirm against the implementation.
type QPSMonitor struct {
	// contains filtered or unexported fields
}

func NewQPSMonitor

func NewQPSMonitor(delaySecond int32) *QPSMonitor

func (*QPSMonitor) Pulse

func (p *QPSMonitor) Pulse()

func (*QPSMonitor) QPS

func (p *QPSMonitor) QPS() int32

type Queue

// Queue is one entry of a queue listing; it carries only the queue's URL.
type Queue struct {
	// QueueURL is the <QueueURL> element (JSON key "url").
	QueueURL string `xml:"QueueURL" json:"url"`
}

type QueueAttribute

// QueueAttribute describes a queue's attributes (root element <Queue>), as
// returned by GetQueueAttributes. Every field is tagged omitempty, so
// zero-valued fields are left out when the value is encoded.
// NOTE(review): the units of the time and size fields are not visible here —
// confirm against the MNS API.
type QueueAttribute struct {
	// XMLName fixes the root element name; it is excluded from JSON.
	XMLName                xml.Name `xml:"Queue" json:"-"`
	// QueueName is the <QueueName> element.
	QueueName              string   `xml:"QueueName,omitempty" json:"queue_name,omitempty"`
	// DelaySeconds is the <DelaySeconds> element, the name the MNS API
	// defines; a misspelled tag would leave this field zero after decoding.
	// The JSON key keeps its historical spelling so existing JSON consumers
	// are not broken.
	DelaySeconds           int32    `xml:"DelaySeconds,omitempty" json:"delay_senconds,omitempty"`
	// MaxMessageSize is the <MaximumMessageSize> element.
	MaxMessageSize         int32    `xml:"MaximumMessageSize,omitempty" json:"maximum_message_size,omitempty"`
	// MessageRetentionPeriod is the <MessageRetentionPeriod> element.
	MessageRetentionPeriod int32    `xml:"MessageRetentionPeriod,omitempty" json:"message_retention_period,omitempty"`
	// VisibilityTimeout is the <VisibilityTimeout> element.
	VisibilityTimeout      int32    `xml:"VisibilityTimeout,omitempty" json:"visibility_timeout,omitempty"`
	// PollingWaitSeconds is the <PollingWaitSeconds> element. The JSON key
	// keeps its historical spelling for compatibility.
	PollingWaitSeconds     int32    `xml:"PollingWaitSeconds,omitempty" json:"polling_wait_secods,omitempty"`
	// ActiveMessages is the <ActiveMessages> element.
	ActiveMessages         int64    `xml:"ActiveMessages,omitempty" json:"active_messages,omitempty"`
	// InactiveMessages is the <InactiveMessages> element.
	InactiveMessages       int64    `xml:"InactiveMessages,omitempty" json:"inactive_messages,omitempty"`
	// DelayMessages is the <DelayMessages> element.
	DelayMessages          int64    `xml:"DelayMessages,omitempty" json:"delay_messages,omitempty"`
	// CreateTime is the <CreateTime> element.
	CreateTime             int64    `xml:"CreateTime,omitempty" json:"create_time,omitempty"`
	// LastModifyTime is the <LastModifyTime> element.
	LastModifyTime         int64    `xml:"LastModifyTime,omitempty" json:"last_modify_time,omitempty"`
}

type Queues

// Queues is the result of ListQueue: a <Queues> root element with repeated
// <Queue> children and a paging marker.
type Queues struct {
	// XMLName fixes the root element name; it is excluded from JSON.
	XMLName    xml.Name `xml:"Queues" json:"-"`
	// Queues holds one entry per <Queue> element.
	Queues     []Queue  `xml:"Queue" json:"queues"`
	// NextMarker is the <NextMarker> element. ListQueue accepts a nextMarker
	// argument, so this is presumably passed back to fetch the next page —
	// confirm.
	NextMarker string   `xml:"NextMarker" json:"next_marker"`
}

type ReceiptHandles

// ReceiptHandles is an XML document with root element <ReceiptHandles> and
// one <ReceiptHandle> child per handle.
// NOTE(review): presumably the request body of BatchDeleteMessage — confirm.
type ReceiptHandles struct {
	// XMLName fixes the root element name.
	XMLName        xml.Name `xml:"ReceiptHandles"`
	// ReceiptHandles are the handles, one per <ReceiptHandle> element.
	ReceiptHandles []string `xml:"ReceiptHandle"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL