tdmq

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 25, 2023 License: Apache-2.0 Imports: 18 Imported by: 0

README

Tencent TDMQ-CMQ Go SDK

Manage API: https://cloud.tencent.com/document/product/1496/62819

Data Flow API: https://cloud.tencent.com/document/product/1496/61039

Only support these data flow actions:

  • Queue

    • QueryQueueRoute
    • SendMessage
    • BatchSendMessage
    • ReceiveMessage
    • BatchReceiveMessage
    • DeleteMessage
    • BatchDeleteMessage
  • Topic

    • QueryTopicRoute
    • PublishMessage
    • BatchPublishMessage

Example:

go get -u github.com/TencentCloud/tencentcloud-cmq-sdk-go@0.1.0
package main

import (
    "fmt"
    "time"

    tcmq "github.com/TencentCloud/tencentcloud-cmq-sdk-go"
)

func main() {
    // get your own secretId/secretKey: https://console.cloud.tencent.com/cam/capi
    client, err := tcmq.NewClient("https://cmq-gz.public.tencenttdmq.com", "AKIDxxxxx", "xxxxx", 5*time.Second)
    if err != nil {
        fmt.Println("new TDMQ-CMQ client", err)
        return
    }
    // client.AppId = 12345  // for privatization request without authentication
    // client.Method = `GET` // default: POST
    // client.Token = `your_token` // for temporary secretId/secretKey auth with token
    client.Debug = true // verbose print each request

    queue := &tcmq.Queue{
        Client:             client,
        Name:               `queue0`,
        DelaySeconds:       0,
        PollingWaitSeconds: 5,
    }
    resp1, err := queue.Send(`message test 0`)
    if err != nil {
        fmt.Println("send message:", err)
        return
    }
    fmt.Println("Status:", resp1.StatusCode())
    fmt.Println("Response:", resp1)

    respMsg, err := queue.Receive()
    if err != nil {
        fmt.Println("receive message:", err)
        return
    }
    fmt.Println("Response:", respMsg)

    resp2, err := queue.Delete(respMsg.Handle())
    if err != nil {
        fmt.Println("delete message:", err)
        return
    }
    fmt.Println("Response:", resp2)

    resp3, err := queue.BatchSend("a", "b", "c")
    if err != nil {
        fmt.Println("batch send message:", err)
        return
    }
    fmt.Println("Response:", resp3)

    resp4, err := queue.BatchReceive(5)
    if err != nil {
        fmt.Println("batch receive message:", err)
        return
    }
    fmt.Println("Response:", resp4)
    var handles []string
    for _, msg := range resp4.MsgInfos() {
        if len(msg.Handle()) > 0 {
            handles = append(handles, msg.Handle())
        }
    }

    if len(handles) > 0 {
        res, err := queue.BatchDelete(handles...)
        if err != nil {
            fmt.Println("batch delete message:", err)
            return
        }
        fmt.Println("batch delete result:", res)
    }

    topic := &tcmq.Topic{
        Client:     client,
        Name:       `topic0`,
        RoutingKey: ``,
        Tags:       nil,
    }
    resp5, err := topic.Publish(`message test 1`)
    if err != nil {
        fmt.Println("publish message:", err)
        return
    }
    fmt.Println("Response:", resp5)

    resp6, err := topic.BatchPublish("x", "y", "z")
    if err != nil {
        fmt.Println("publish message:", err)
        return
    }
    fmt.Println("Response:", resp6)
}

Documentation

Index

Constants

View Source
const (
	HmacSHA1   = `HmacSHA1`
	HmacSHA256 = `HmacSHA256`
)

Variables

View Source
var (
	MaxQueueNameSize  = 64
	MaxTopicNameSize  = 64
	MaxMessageSize    = 1 * 1024 * 1024 // default max message size: 1MB
	MaxMessageCount   = 16
	MaxDelaySeconds   = 70 * 24 * 60 * 60 // 70 day
	MaxWaitSeconds    = 30
	MaxHandleCount    = 16
	MaxHandleLength   = 256
	MaxRouteKeyLength = 64
	MaxRouteKeyDots   = 15
	MaxTagCount       = 5
	MaxTagLength      = 16
)
View Source
var (
	ErrInvalidParameter = errors.New("invalid parameter")
)
View Source
var (
	InsecureSkipVerify bool
)

Functions

This section is empty.

Types

type Client

type Client struct {
	Url        *url.URL // ex: http://gateway.tdmq.io
	Method     string   // GET, POST
	SignMethod string   // HmacSHA1, HmacSHA256
	SecretId   string   // AKIDxxxxx
	SecretKey  string
	Token      string
	AppId      uint64 // appId for privatization, need gateway server option enabled
	Header     map[string]string

	Debug      bool // weather print request message
	HttpClient *http.Client
	// contains filtered or unexported fields
}

func NewClient

func NewClient(uri, secretId, secretKey string, t time.Duration, keepalive ...bool) (c *Client, err error)

NewClient create TDMQ CMQ client

input: uri string request uri for TDMQ CMQ service
input: secretId string user secret id from tencent cloud account
input: secretKey string user secret key from tencent cloud account
input: t time.Duration http client request timeout
input: keepalive bool http client connection keep alive to server
return: *Client

func (*Client) BatchDeleteMessage

func (c *Client) BatchDeleteMessage(queue string, receiptHandles []string) (ResponseDMs, error)

BatchDeleteMessage

API: https://cloud.tencent.com/document/product/406/5841
input: queue string
input: receiptHandles []string
return: ResponseDMs
return: error

func (*Client) BatchPublishMessage

func (c *Client) BatchPublishMessage(topic, routingKey string, messages, tags []string) (ResponseSMs, error)

BatchPublishMessage

API: https://cloud.tencent.com/document/product/406/7412
input: topic string
input: routingKey string
input: messages []string
input: tags []string
return: ResponseSMs
return: error

func (*Client) BatchReceiveMessage

func (c *Client) BatchReceiveMessage(queue string, pollingWaitSeconds, numOfMsg int) (ResponseRMs, error)

BatchReceiveMessage

API: https://cloud.tencent.com/document/product/406/5924
input: queue string
input: pollingWaitSeconds int
input: numOfMsg int
return: *ResponseRMs
return: error

func (*Client) BatchSendMessage

func (c *Client) BatchSendMessage(queue string, messages []string, delaySeconds int) (ResponseSMs, error)

BatchSendMessage

API: https://cloud.tencent.com/document/product/406/5838
input: queue string
input: messages []string
input: delaySeconds int
return: ResponseSMs
return: error

func (*Client) DeleteMessage

func (c *Client) DeleteMessage(queue, receiptHandle string) (ResponseDM, error)

DeleteMessage

API: https://cloud.tencent.com/document/product/406/5840
input: queue string
input: receiptHandle string
return: ResponseDM
return: error

func (*Client) PublishMessage

func (c *Client) PublishMessage(topic, message, routingKey string, tags []string) (ResponseSM, error)

PublishMessage

API: https://cloud.tencent.com/document/product/406/7411
input: topic string
input: message string
input: routingKey string
input: tags []string
return: ResponseSM
return: error

func (*Client) QueryQueueRoute

func (c *Client) QueryQueueRoute(queue string) (Route, error)

QueryQueueRoute

input: queue string
return: *ResponseRoute
return: error

func (*Client) QueryTopicRoute

func (c *Client) QueryTopicRoute(topic string) (Route, error)

QueryTopicRoute

input: topic string
return: *ResponseRoute
return: error

func (*Client) ReceiveMessage

func (c *Client) ReceiveMessage(queue string, pollingWaitSeconds int) (ResponseRM, error)

ReceiveMessage

API: https://cloud.tencent.com/document/product/406/5839
input: queue string
input: pollingWaitSeconds int
return: ResponseRM
return: error

func (*Client) SendMessage

func (c *Client) SendMessage(queue, message string, delaySeconds int) (ResponseSM, error)

SendMessage

API: https://cloud.tencent.com/document/product/406/5837
input: queue string
input: message string
input: delaySeconds int
return: ResponseSM
return: error

type Message

type Message interface {
	Msg
	MsgBody() string         // 消费的消息正文
	Handle() string          // 每次消费返回唯一的消息句柄,用于删除消费。仅上一次消费该消息产生的句柄能用于删除消息。且有效期是 visibilityTimeout,即取出消息隐藏时长,超过该时间后该句柄失效。
	EnqueueTime() int64      // 消费被生产出来,进入队列的时间。返回 Unix 时间戳,精确到秒
	FirstDequeueTime() int64 // 保留字段
	NextVisibleTime() int64  // 消息的下次可见(可再次被消费)时间。返回 Unix 时间戳,精确到秒
	DequeueCount() int64     // 保留字段
}

Message information of response message

type Msg

type Msg interface {
	MsgId() string // 消费的消息唯一标识 ID
}

Msg message ID

type MsgError

type MsgError interface {
	Code() int       // 0:表示成功,others:错误
	Message() string // 错误提示信息
	Handle() string  // 每次消费返回唯一的消息句柄,用于删除消费。仅上一次消费该消息产生的句柄能用于删除消息。且有效期是 visibilityTimeout,即取出消息隐藏时长,超过该时间后该句柄失效。
}

MsgError errors in response

type Queue

type Queue struct {
	Client             *Client
	Name               string
	DelaySeconds       int // 消息延迟可见时间, 1 ~ 6048000 秒
	PollingWaitSeconds int // 消费消息长轮询等待时间, 0 ~ 30 秒
}

func (*Queue) BatchDelete

func (q *Queue) BatchDelete(handles ...string) (ResponseDMs, error)

BatchDelete message handle(s)

input: handles ...string
return: ResponseDMs
return: error

func (*Queue) BatchReceive

func (q *Queue) BatchReceive(numOfMsg int) (ResponseRMs, error)

BatchReceive message(s)

input: numOfMsg int
return: *ResponseRMs
return: error

func (*Queue) BatchSend

func (q *Queue) BatchSend(messages ...string) (ResponseSMs, error)

BatchSend message(s)

input: messages ...string
return: ResponseSMs
return: error

func (*Queue) Delete

func (q *Queue) Delete(handle string) (ResponseDM, error)

Delete message handle

input: handle string
return: ResponseDM
return: error

func (*Queue) Receive

func (q *Queue) Receive() (ResponseRM, error)

Receive message

return: ResponseRM
return: error

func (*Queue) Send

func (q *Queue) Send(message string) (ResponseSM, error)

Send message

input: message string
return: ResponseSM
return: error

type ResponseDM

type ResponseDM interface {
	Result
}

ResponseDM Response of delete message

type ResponseDMs

type ResponseDMs interface {
	Result
	Errors() []MsgError // 无法成功删除的错误列表。每个元素列出了消息无法成功被删除的错误及原因
}

ResponseDMs Response of delete messages

type ResponseRM

type ResponseRM interface {
	Result
	Message
}

ResponseRM Response of receive message

type ResponseRMs

type ResponseRMs interface {
	Result
	MsgInfos() []Message // messages 信息列表,每个元素是一条消息的具体信息
}

ResponseRMs Response of receive messages

type ResponseSM

type ResponseSM interface {
	Result
	Msg
}

ResponseSM Response of send message

type ResponseSMs

type ResponseSMs interface {
	Result
	MsgIDs() []Msg // 服务器生成消息的唯一标识 ID 列表,每个元素是一条消息的信息
}

ResponseSMs Response of send messages

type Result

type Result interface {
	StatusCode() int   // HTTP Response status code
	Code() int         // 0:表示成功,others:错误
	Message() string   // 错误提示信息
	RequestId() string // 服务器生成的请求ID
	ClientId() uint64  // 客户端发送ID
	fmt.Stringer
}

Result common result of request TDMQ-CMQ

type Route

type Route interface {
	Result
	Addr() []string // TDMQ gateway tcp 服务地址
}

Route Response of query route

type Topic

type Topic struct {
	Client     *Client
	Name       string
	RoutingKey string
	Tags       []string
}

func (*Topic) BatchPublish

func (t *Topic) BatchPublish(messages ...string) (ResponseSMs, error)

func (*Topic) Publish

func (t *Topic) Publish(message string) (ResponseSM, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL