pulsar

package module
v0.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 15, 2022 License: MIT Imports: 15 Imported by: 0

README

pulsar-client-go

English | 中文版

Introduction

Tuya pulsar client SDK for Golang

Preparation

  1. AccessID: Provided by the Tuya platform.
  2. AccessKey: Provided by the Tuya platform.
  3. Pulsar address: Select the Pulsar address that matches your business region. You can find the addresses in the documentation.

Example

package main

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"time"

	pulsar "github.com/fogcloud-io/tuya-pulsar-sdk-go"
	"github.com/fogcloud-io/tuya-pulsar-sdk-go/pkg/tylog"
	"github.com/fogcloud-io/tuya-pulsar-sdk-go/pkg/tyutils"
	"github.com/sirupsen/logrus"
)

// main demonstrates the minimal consumer flow: set up logging, create a client
// for the CN Pulsar address, create a consumer on the topic derived from the
// access ID, and register helloHandler to process received payloads.
func main() {
	pulsar.SetInternalLogLevel(logrus.DebugLevel)
	tylog.SetGlobalLog("sdk", false)
	accessID := "accessID"
	accessKey := "accessKey"
	topic := pulsar.TopicForAccessID(accessID)

	// The AES secret is bytes 8..24 of the access key. Slicing a shorter key
	// would panic with an out-of-range error, so reject it up front.
	if len(accessKey) < 24 {
		tylog.Error("accessKey is too short to derive the AES secret", tylog.String("accessID", accessID))
		return
	}
	aesSecret := accessKey[8:24]

	// create client
	cfg := pulsar.ClientConfig{
		PulsarAddr: pulsar.PulsarAddrCN,
	}
	c := pulsar.NewClient(cfg)

	// create consumer
	csmCfg := pulsar.ConsumerConfig{
		Topic: topic,
		Auth:  pulsar.NewAuthProvider(accessID, accessKey),
	}
	csm, err := c.NewConsumer(csmCfg)
	if err != nil {
		// Without a consumer there is nothing to receive from; calling a
		// method on the nil interface would panic.
		tylog.Error("create consumer failed", tylog.ErrorField(err))
		return
	}

	// handle message
	csm.ReceiveAndHandle(context.Background(), &helloHandler{AesSecret: aesSecret})

	// Wait 10 seconds before exiting.
	time.Sleep(10 * time.Second)
}

// helloHandler is the example payload handler; its HandlePayload method
// decodes and decrypts each received payload.
type helloHandler struct {
	// AesSecret is the key passed to tyutils.EcbDecrypt; main sets it to
	// accessKey[8:24].
	AesSecret string
}

// HandlePayload logs the raw payload and then decodes it: the payload is
// parsed as a JSON object, its "data" field is base64-decoded, and the result
// is decrypted with tyutils.EcbDecrypt using h.AesSecret.
//
// Every decoding failure is logged and nil is still returned, so a malformed
// payload never surfaces as an error to the caller.
func (h *helloHandler) HandlePayload(ctx context.Context, msg *pulsar.Message, payload []byte) error {
	tylog.Info("payload preview", tylog.String("payload", string(payload)))

	// let's decode the payload with AES
	m := map[string]interface{}{}
	if err := json.Unmarshal(payload, &m); err != nil {
		tylog.Error("json unmarshal failed", tylog.ErrorField(err))
		return nil
	}

	// Use the comma-ok form: a bare type assertion panics when "data" is
	// missing or is not a JSON string.
	bs, ok := m["data"].(string)
	if !ok {
		tylog.Error("payload has no string data field", tylog.String("payload", string(payload)))
		return nil
	}

	de, err := base64.StdEncoding.DecodeString(bs)
	if err != nil {
		tylog.Error("base64 decode failed", tylog.ErrorField(err))
		return nil
	}
	decode := tyutils.EcbDecrypt(de, []byte(h.AesSecret))
	tylog.Info("aes decode", tylog.ByteString("decode payload", decode))

	return nil
}


Precautions

  1. Make sure that the accessID and accessKey are correct.
  2. Make sure that the Pulsar address is correct. Use the pulsar:// scheme (for example, pulsar://mqe.tuyaus.com:7285), not pulsar+ssl://mqe.tuyaus.com:7285.
  3. Make sure that the SDK code version you use is the latest.

About debug

With the following code, all communication with the Pulsar service is printed to the terminal.

// main sets the SDK's internal log level to debug.
func main() {
	pulsar.SetInternalLogLevel(logrus.DebugLevel)
	// other code
}

With the following code, the log output of tuya_pulsar_go_sdk is shown in the terminal and is also saved to the logs/sdk.log file.

// main configures the SDK's global log under the name "sdk"; with the second
// argument set to false, the README describes the log as visible in the
// terminal and also saved to logs/sdk.log.
func main() {
	tylog.SetGlobalLog("sdk", false)
}

In a production environment, you may not want the SDK logs to be printed to the terminal. In that case, use the following code to write the logs to a file instead.

// main configures the SDK's global log under the name "sdk"; with the second
// argument set to true, the README describes the log as written to a file
// rather than the terminal.
func main() {
	tylog.SetGlobalLog("sdk", true)
}

Support

You can get support from Tuya with the following methods:

Documentation

Index

Constants

View Source
const (
	// NOTE(review): judging by the names, these look like the default
	// flow-control period (in seconds) and permit count, matching the
	// ConsumerList.FlowPeriodSecond and FlowPermit fields — confirm against
	// the consumer implementation.
	DefaultFlowPeriodSecond = 30
	DefaultFlowPermit       = 10

	// Pulsar service addresses for the tuyacn.com, tuyaeu.com and tuyaus.com
	// hosts; use one of them as ClientConfig.PulsarAddr.
	PulsarAddrCN = "pulsar://mqe.tuyacn.com:7285"
	PulsarAddrEU = "pulsar://mqe.tuyaeu.com:7285"
	PulsarAddrUS = "pulsar://mqe.tuyaus.com:7285"
)

Variables

This section is empty.

Functions

func NewAuthProvider

func NewAuthProvider(accessID, accessKey string) *authProvider

func SetInternalLogLevel

func SetInternalLogLevel(level logrus.Level)

func TopicForAccessID

func TopicForAccessID(accessID string) string

Types

type AuthProvider

// AuthProvider is the type of ConsumerConfig.Auth.
// NOTE(review): presumably AuthMethod names the authentication scheme and
// AuthData returns the credential bytes sent with it — confirm against the
// implementation returned by NewAuthProvider.
type AuthProvider interface {
	AuthMethod() string
	AuthData() []byte
}

type Client

// Client creates consumers; obtain one with NewClient.
type Client interface {
	// NewConsumer builds a Consumer from config, or returns an error.
	NewConsumer(config ConsumerConfig) (Consumer, error)
	// NOTE(review): presumably shuts the client down — confirm.
	Stop()
}

func NewClient

func NewClient(cfg ClientConfig) Client

type ClientConfig

// ClientConfig holds the settings passed to NewClient.
type ClientConfig struct {
	// PulsarAddr is the Pulsar service address, e.g. PulsarAddrCN.
	PulsarAddr string
}

type Consumer

// Consumer is returned by Client.NewConsumer.
type Consumer interface {
	// ReceiveAndHandle takes a context and the PayloadHandler that processes
	// payloads.
	// NOTE(review): whether this call blocks is not visible here — confirm.
	ReceiveAndHandle(ctx context.Context, handler PayloadHandler)
	// NOTE(review): presumably stops receiving — confirm.
	Stop()
}

type ConsumerConfig

// ConsumerConfig holds the settings passed to Client.NewConsumer.
type ConsumerConfig struct {
	// Topic is the topic to consume, e.g. the result of TopicForAccessID.
	Topic string
	// Auth supplies the credentials, e.g. the result of NewAuthProvider.
	Auth AuthProvider
}

type ConsumerList

// ConsumerList has the ReceiveAndHandle and Stop methods declared by Consumer,
// plus CronFlow.
// NOTE(review): the field meanings below are inferred from their names only —
// confirm against the implementation.
type ConsumerList struct {
	FlowPeriodSecond int    // presumably the flow period in seconds (cf. DefaultFlowPeriodSecond)
	FlowPermit       uint32 // presumably the permit count per flow request (cf. DefaultFlowPermit)
	Topic            string
	Stopped          chan struct{}
	// contains filtered or unexported fields
}

func (*ConsumerList) CronFlow

func (l *ConsumerList) CronFlow()

func (*ConsumerList) ReceiveAndHandle

func (l *ConsumerList) ReceiveAndHandle(ctx context.Context, handler PayloadHandler)

func (*ConsumerList) Stop

func (l *ConsumerList) Stop()

type Message

// Message is an alias for msg.Message; PayloadHandler.HandlePayload receives a
// pointer to one.
type Message = msg.Message

type PayloadHandler

// PayloadHandler is the handler type accepted by Consumer.ReceiveAndHandle.
type PayloadHandler interface {
	// HandlePayload receives a context, the message, and its payload bytes,
	// and reports failure through the returned error.
	HandlePayload(ctx context.Context, msg *Message, payload []byte) error
}

Directories

Path Synopsis
pkg

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL