amqp

package
v2.9.0+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 10, 2017 License: MIT Imports: 10 Imported by: 15

README

API Reference

TTN makes use of an AMQP Topic exchange. See the RabbitMQ documentation on topic exchanges for details.

AMQP is not yet available in the public network.

  • Host: The address of the handler on which your application is registered.
  • Port: 5672
  • Exchange: ttn.handler

Routing key: <AppID>.devices.<DevID>.up

Wildcards are allowed. For example <AppID>.devices.*.up to get uplink messages for all devices.

Message:

{
  "app_id": "my-app-id",                 // Same as in the topic
  "dev_id": "my-dev-id",                 // Same as in the topic
  "hardware_serial": "0102030405060708", // In case of LoRaWAN: the DevEUI
  "port": 1,                             // LoRaWAN FPort
  "counter": 2,                          // LoRaWAN frame counter
  "is_retry": false,                     // Is set to true if this message is a retry (you could also detect this from the counter)
  "confirmed": false,                    // Is set to true if this message was a confirmed message
  "payload_raw": "AQIDBA==",             // Base64 encoded payload: [0x01, 0x02, 0x03, 0x04]
  "payload_fields": {},                  // Object containing the results from the payload functions - left out when empty
  "metadata": {
    "time": "1970-01-01T00:00:00Z",      // Time when the server received the message
    "frequency": 868.1,                  // Frequency at which the message was sent
    "modulation": "LORA",                // Modulation that was used - LORA or FSK
    "data_rate": "SF7BW125",             // Data rate that was used - if LORA modulation
    "bit_rate": 50000,                   // Bit rate that was used - if FSK modulation
    "coding_rate": "4/5",                // Coding rate that was used
    "gateways": [
      {
        "gtw_id": "ttn-herengracht-ams", // EUI of the gateway
        "timestamp": 12345,              // Timestamp when the gateway received the message
        "time": "1970-01-01T00:00:00Z",  // Time when the gateway received the message - left out when gateway does not have synchronized time
        "channel": 0,                    // Channel where the gateway received the message
        "rssi": -25,                     // Signal strength of the received message
        "snr": 5,                        // Signal to noise ratio of the received message
        "rf_chain": 0,                   // RF chain where the gateway received the message
        "latitude": 52.1234,             // Latitude of the gateway reported in its status updates
        "longitude": 6.1234,             // Longitude of the gateway
        "altitude": 6                    // Altitude of the gateway
      },
      //...more if received by more gateways...
    ],
    "latitude": 52.2345,                 // Latitude of the device
    "longitude": 6.2345,                 // Longitude of the device
    "altitude": 2                        // Altitude of the device
  }
}

Note: Some values may be omitted if they are null, false, "" or 0.

Usage (Go client):

package main

import (
	"github.com/TheThingsNetwork/go-utils/log/apex"
	"github.com/TheThingsNetwork/ttn/amqp"
	"github.com/TheThingsNetwork/ttn/core/types"
)

// main connects to a local AMQP broker and logs every uplink message received
// from device "my-dev-id" of application "my-app-id".
//
// Every step that can fail (Connect, Open, SubscribeDeviceUplink) is checked;
// on failure the error is logged and the program stops instead of continuing
// with an unusable connection or channel.
func main() {
	ctx := apex.Stdout().WithField("Example", "Go AMQP client")

	c := amqp.NewClient(ctx, "guest", "guest", "localhost:5672")
	if err := c.Connect(); err != nil {
		ctx.WithError(err).Error("Could not connect")
		return
	}
	defer c.Disconnect()

	// Arguments: exchange, name, durable, autoDelete.
	s := c.NewSubscriber("ttn.handler", "", false, true)
	if err := s.Open(); err != nil {
		ctx.WithError(err).Error("Could not open subscription channel")
		return
	}
	defer s.Close()

	err := s.SubscribeDeviceUplink("my-app-id", "my-dev-id",
		func(_ amqp.Subscriber, appID string, devID string, req types.UplinkMessage) {
			ctx.Info("Uplink received")
			//...
		})
	if err != nil {
		ctx.WithError(err).Error("Could not subscribe to device uplink")
		return
	}
	//...
}

Routing key: <AppID>.devices.<DevID>.down

Message:

{
  "port": 1,                 // LoRaWAN FPort
  "confirmed": false,        // Whether the downlink should be confirmed by the device
  "payload_raw": "AQIDBA==", // Base64 encoded payload: [0x01, 0x02, 0x03, 0x04]
}

Usage (RabbitMQ): rabbitmqadmin publish exchange='ttn.handler' routing_key='my-app-id.devices.my-dev-id.down' payload='{"port":1,"payload_raw":"AQIDBA=="}'

Usage (Go client):

package main

import (
	"github.com/TheThingsNetwork/go-utils/log/apex"
	"github.com/TheThingsNetwork/ttn/amqp"
	"github.com/TheThingsNetwork/ttn/core/types"
)

// main connects to a local AMQP broker and publishes a single downlink
// message for device "my-dev-id" of application "my-app-id".
//
// Every step that can fail (Connect, Open, PublishDownlink) is checked; on
// failure the error is logged and the program stops, so a downlink is never
// published on a channel that failed to open.
func main() {
	ctx := apex.Stdout().WithField("test", "test")

	c := amqp.NewClient(ctx, "guest", "guest", "localhost:5672")
	if err := c.Connect(); err != nil {
		ctx.WithError(err).Error("Could not connect")
		return
	}
	defer c.Disconnect()

	p := c.NewPublisher("ttn.handler")
	if err := p.Open(); err != nil {
		ctx.WithError(err).Error("Could not open publishing channel")
		return
	}
	defer p.Close()

	d := types.DownlinkMessage{
		AppID:      "my-app-id",
		DevID:      "my-dev-id",
		FPort:      1,
		PayloadRaw: []byte{0x01, 0x02, 0x03, 0x04},
	}
	if err := p.PublishDownlink(d); err != nil {
		ctx.WithError(err).Error("Could not publish downlink")
		return
	}
	//...
}

Device Events

Routing key:

  • <AppID>.devices.<DevID>.events.<event>
  • 0102030405060708.devices.abcdabcd12345678.events.activations
  • *.devices.*.events.*

Message:

{
  "payload": "Base64 encoded LoRaWAN packet",
  "gateway_id": "some-gateway",
  "config": {
    "modulation": "LORA",
    "data_rate": "SF7BW125",
    "counter": 123,
    "frequency": 868300000,
    "power": 14
  }
}

Usage (Go client):

package main

import (
	"github.com/TheThingsNetwork/go-utils/log/apex"
	"github.com/TheThingsNetwork/ttn/amqp"
	"github.com/TheThingsNetwork/ttn/core/types"
)

// main connects to a local AMQP broker and subscribes to "some-event" events
// of application "my-app-id".
//
// Every step that can fail (Connect, Open, SubscribeAppEvents) is checked; on
// failure the error is logged and the program stops.
func main() {
	ctx := apex.Stdout().WithField("test", "test")

	c := amqp.NewClient(ctx, "guest", "guest", "localhost:5672")
	if err := c.Connect(); err != nil {
		ctx.WithError(err).Error("Could not connect")
		return
	}
	defer c.Disconnect()

	// Arguments: exchange, name, durable, autoDelete.
	s := c.NewSubscriber("ttn.handler", "", true, false)
	if err := s.Open(); err != nil {
		ctx.WithError(err).Error("Could not open subscription channel")
		return
	}
	defer s.Close()

	// The handler type is declared in package amqp, so it must be referenced
	// as amqp.Subscriber from package main.
	// For events of a single device, use s.SubscribeDeviceEvents(appID, devID,
	// eventType, handler) with an amqp.DeviceEventHandler instead.
	err := s.SubscribeAppEvents("my-app-id", "some-event",
		func(_ amqp.Subscriber, appID string, eventType types.EventType, payload []byte) {
			// Do your stuff
		})
	if err != nil {
		ctx.WithError(err).Error("Could not subscribe to application events")
		return
	}
	//...
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Connection retry settings. DefaultClient.Connect retries ConnectRetries
// times with a delay of ConnectRetryDelay between retries.
var (
	// ConnectRetries is the number of times the client retries a failed
	// connection (10 by default).
	ConnectRetries = 10
	// ConnectRetryDelay is how long the client waits between connection
	// retries (one second by default).
	ConnectRetryDelay = time.Second
)
View Source
// Prefetch settings for messages delivered by the AMQP server.
var (
	// PrefetchCount represents the number of messages to prefetch before the
	// AMQP server requires acknowledgment (3 by default).
	PrefetchCount = 3
	// PrefetchSize represents the number of bytes to prefetch before the AMQP
	// server requires acknowledgment.
	// NOTE(review): in AMQP a prefetch size of 0 conventionally means "no
	// byte limit" — confirm how this value is passed to the channel QoS call.
	PrefetchSize = 0
)

Functions

This section is empty.

Types

type AppEventHandler

// AppEventHandler is the callback type accepted by SubscribeAppEvents. It is
// called with the Subscriber, the application ID, the event type and the
// event payload as bytes.
type AppEventHandler func(sub Subscriber, appID string, eventType types.EventType, payload []byte)

AppEventHandler is called for application events

type ApplicationKey

// ApplicationKey holds the components of an AMQP routing key for
// applications. ParseApplicationKey builds one from a routing key string.
type ApplicationKey struct {
	// AppID is the application ID.
	AppID string
	// Type is the kind of application key; AppEvents ("events") is the only
	// declared value.
	Type  ApplicationKeyType
	// NOTE(review): presumably the trailing segment of the key (for example
	// the event name) — confirm against ParseApplicationKey.
	Field string
}

ApplicationKey represents an AMQP routing key for applications

func ParseApplicationKey

func ParseApplicationKey(key string) (*ApplicationKey, error)

ParseApplicationKey parses an AMQP application routing key string to an ApplicationKey struct

func (ApplicationKey) String

func (t ApplicationKey) String() string

String implements the Stringer interface

type ApplicationKeyType

// ApplicationKeyType is the string type used for the Type field of an
// ApplicationKey. AppEvents is its only declared value.
type ApplicationKeyType string

ApplicationKeyType represents the type of an AMQP application routing key

// Topic types for Applications
const (
	// AppEvents is the key type for application events.
	AppEvents ApplicationKeyType = "events"
)

Topic types for Applications

type ChannelClient

// ChannelClient represents an AMQP channel client. Both Publisher and
// Subscriber embed it.
type ChannelClient interface {
	// Open opens the channel. The default implementation
	// (DefaultChannelClient) opens a new channel and declares the exchange.
	Open() error
	// Close (from io.Closer) closes the channel.
	io.Closer
}

ChannelClient represents an AMQP channel client

type Client

// Client connects to an AMQP server and creates publishers and subscribers
// on its exchanges. NewClient returns the default implementation.
type Client interface {
	// Connect connects to the AMQP server.
	Connect() error
	// Disconnect disconnects from the AMQP server.
	Disconnect()
	// IsConnected returns true if there is a connection to the AMQP server.
	IsConnected() bool

	// NewPublisher returns a new topic publisher on the specified exchange.
	NewPublisher(exchange string) Publisher
	// NewSubscriber returns a new topic subscriber on the specified exchange.
	// NOTE(review): name, durable and autoDelete presumably configure the
	// subscriber's queue — confirm against QueueDeclare.
	NewSubscriber(exchange, name string, durable, autoDelete bool) Subscriber
}

Client connects to an AMQP server

func NewClient

func NewClient(ctx log.Interface, username, password, host string) Client

NewClient creates a new DefaultClient

type DefaultChannelClient

// DefaultChannelClient represents the default client of an AMQP channel.
// Its Open method opens a new channel and declares the exchange; its Close
// method closes the channel.
type DefaultChannelClient struct {
	// contains filtered or unexported fields
}

DefaultChannelClient represents the default client of an AMQP channel

func (*DefaultChannelClient) Close

func (p *DefaultChannelClient) Close() error

Close closes the channel

func (*DefaultChannelClient) Open

func (p *DefaultChannelClient) Open() error

Open opens a new channel and declares the exchange

type DefaultClient

// DefaultClient is the default AMQP client for The Things Network. NewClient
// creates one and returns it as a Client.
type DefaultClient struct {
	// contains filtered or unexported fields
}

DefaultClient is the default AMQP client for The Things Network

func (*DefaultClient) Connect

func (c *DefaultClient) Connect() error

Connect to the AMQP server. It will retry for ConnectRetries times with a delay of ConnectRetryDelay between retries

func (*DefaultClient) Disconnect

func (c *DefaultClient) Disconnect()

Disconnect from the AMQP server

func (*DefaultClient) GetChannel

func (c *DefaultClient) GetChannel() (*AMQP.Channel, error)

GetChannel gets a new AMQP channel

func (*DefaultClient) IsConnected

func (c *DefaultClient) IsConnected() bool

IsConnected returns true if there is a connection to the AMQP server.

func (*DefaultClient) NewPublisher

func (c *DefaultClient) NewPublisher(exchange string) Publisher

NewPublisher returns a new topic publisher on the specified exchange

func (*DefaultClient) NewSubscriber

func (c *DefaultClient) NewSubscriber(exchange, name string, durable, autoDelete bool) Subscriber

NewSubscriber returns a new topic subscriber on the specified exchange

type DefaultPublisher

// DefaultPublisher represents the default AMQP publisher.
type DefaultPublisher struct {
	// Embedded channel client; provides Open and Close.
	DefaultChannelClient
}

DefaultPublisher represents the default AMQP publisher

func (*DefaultPublisher) PublishAppEvent

func (c *DefaultPublisher) PublishAppEvent(appID string, eventType types.EventType, payload interface{}) error

PublishAppEvent publishes an event to the topic for application events of the given type. The payload is marshaled to JSON.

func (*DefaultPublisher) PublishDeviceEvent

func (c *DefaultPublisher) PublishDeviceEvent(appID string, devID string, eventType types.EventType, payload interface{}) error

PublishDeviceEvent publishes an event to the topic for device events of the given type. The payload is marshaled to JSON.

func (c *DefaultPublisher) PublishDownlink(dataDown types.DownlinkMessage) error

PublishDownlink publishes a downlink message to the AMQP broker

func (c *DefaultPublisher) PublishUplink(dataUp types.UplinkMessage) error

PublishUplink publishes an uplink message to the AMQP broker

type DefaultSubscriber

// DefaultSubscriber represents the default AMQP subscriber.
type DefaultSubscriber struct {
	// Embedded channel client; provides Open and Close.
	DefaultChannelClient
	// contains filtered or unexported fields
}

DefaultSubscriber represents the default AMQP subscriber

func (s *DefaultSubscriber) ConsumeUplink(queue string, handler UplinkHandler) error

ConsumeUplink consumes uplink messages in a specific queue

func (*DefaultSubscriber) QueueBind

func (s *DefaultSubscriber) QueueBind(name, key string) error

QueueBind binds the routing key to the specified queue

func (*DefaultSubscriber) QueueDeclare

func (s *DefaultSubscriber) QueueDeclare() (string, error)

QueueDeclare declares the queue on the AMQP broker

func (*DefaultSubscriber) QueueUnbind

func (s *DefaultSubscriber) QueueUnbind(name, key string) error

QueueUnbind unbinds the routing key from the specified queue

func (s *DefaultSubscriber) SubscribeAppDownlink(appID string, handler DownlinkHandler) error

SubscribeAppDownlink subscribes to all downlink messages for the given application

func (*DefaultSubscriber) SubscribeAppEvents

func (s *DefaultSubscriber) SubscribeAppEvents(appID string, eventType types.EventType, handler AppEventHandler) error

SubscribeAppEvents subscribes to events of the given type for the given application. In order to subscribe to application events from all applications the user has access to, pass an empty string as appID.

func (s *DefaultSubscriber) SubscribeAppUplink(appID string, handler UplinkHandler) error

SubscribeAppUplink subscribes to all uplink messages for the given application

func (s *DefaultSubscriber) SubscribeDeviceDownlink(appID, devID string, handler DownlinkHandler) error

SubscribeDeviceDownlink subscribes to all downlink messages for the given application and device

func (*DefaultSubscriber) SubscribeDeviceEvents

func (s *DefaultSubscriber) SubscribeDeviceEvents(appID string, devID string, eventType types.EventType, handler DeviceEventHandler) error

SubscribeDeviceEvents subscribes to events of the given type for the given device. In order to subscribe to events from all devices within an application, pass an empty string as devID. In order to subscribe to all events from all devices in all applications the user has access to, pass an empty string as appID.

func (s *DefaultSubscriber) SubscribeDeviceUplink(appID, devID string, handler UplinkHandler) error

SubscribeDeviceUplink subscribes to all uplink messages for the given application and device

func (s *DefaultSubscriber) SubscribeDownlink(handler DownlinkHandler) error

SubscribeDownlink subscribes to all downlink messages that the current user has access to

func (s *DefaultSubscriber) SubscribeUplink(handler UplinkHandler) error

SubscribeUplink subscribes to all uplink messages that the current user has access to

type DeviceEventHandler

// DeviceEventHandler is the callback type accepted by SubscribeDeviceEvents.
// It is called with the Subscriber, the application ID, the device ID, the
// event type and the event payload as bytes.
type DeviceEventHandler func(sub Subscriber, appID string, devID string, eventType types.EventType, payload []byte)

DeviceEventHandler is called for device events

type DeviceKey

// DeviceKey holds the components of an AMQP routing key for devices, such as
// <AppID>.devices.<DevID>.up or <AppID>.devices.<DevID>.events.<event>.
// ParseDeviceKey builds one from a routing key string.
type DeviceKey struct {
	// AppID is the application ID.
	AppID string
	// DevID is the device ID.
	DevID string
	// Type is the kind of device key: DeviceEvents ("events"),
	// DeviceUplink ("up") or DeviceDownlink ("down").
	Type  DeviceKeyType
	// NOTE(review): presumably the trailing <event> segment of an events
	// key — confirm against ParseDeviceKey.
	Field string
}

DeviceKey represents an AMQP routing key for devices

func ParseDeviceKey

func ParseDeviceKey(key string) (*DeviceKey, error)

ParseDeviceKey parses an AMQP device routing key string to a DeviceKey struct

func (DeviceKey) String

func (t DeviceKey) String() string

String implements the Stringer interface

type DeviceKeyType

// DeviceKeyType represents the type of a device topic. Its declared values
// are DeviceEvents, DeviceUplink and DeviceDownlink.
type DeviceKeyType string

DeviceKeyType represents the type of a device topic

// Topic types for Devices
const (
	// DeviceEvents is the key type for device events
	// (<AppID>.devices.<DevID>.events.<event>).
	DeviceEvents   DeviceKeyType = "events"
	// DeviceUplink is the key type for uplink messages
	// (<AppID>.devices.<DevID>.up).
	DeviceUplink   DeviceKeyType = "up"
	// DeviceDownlink is the key type for downlink messages
	// (<AppID>.devices.<DevID>.down).
	DeviceDownlink DeviceKeyType = "down"
)

Topic types for Devices

type DownlinkHandler

// DownlinkHandler is the callback type accepted by the downlink Subscribe
// methods. It is called with the Subscriber, the application ID, the device
// ID and the downlink message.
type DownlinkHandler func(subscriber Subscriber, appID string, devID string, req types.DownlinkMessage)

DownlinkHandler is called for downlink messages

type Publisher

// Publisher is a ChannelClient that publishes uplink messages, downlink
// messages, device events and application events.
type Publisher interface {
	ChannelClient

	// PublishUplink publishes an uplink message to the AMQP broker.
	PublishUplink(dataUp types.UplinkMessage) error
	// PublishDownlink publishes a downlink message to the AMQP broker.
	PublishDownlink(dataDown types.DownlinkMessage) error
	// PublishDeviceEvent publishes an event to the topic for device events
	// of the given type. The default implementation marshals the payload to
	// JSON.
	PublishDeviceEvent(appID string, devID string, eventType types.EventType, payload interface{}) error
	// PublishAppEvent publishes an event to the topic for application events
	// of the given type. The default implementation marshals the payload to
	// JSON.
	PublishAppEvent(appID string, eventType types.EventType, payload interface{}) error
}

Publisher represents a publisher for uplink messages, downlink messages, device events and application events

type Subscriber

// Subscriber is a ChannelClient that manages queue bindings and subscribes
// handlers to uplink messages, downlink messages, device events and
// application events.
type Subscriber interface {
	ChannelClient

	// QueueDeclare declares the queue on the AMQP broker.
	// NOTE(review): the returned string is presumably the queue name — confirm.
	QueueDeclare() (string, error)
	// QueueBind binds the routing key to the specified queue.
	QueueBind(name, key string) error
	// QueueUnbind unbinds the routing key from the specified queue.
	QueueUnbind(name, key string) error

	// SubscribeDeviceUplink subscribes to all uplink messages for the given
	// application and device.
	SubscribeDeviceUplink(appID, devID string, handler UplinkHandler) error
	// SubscribeAppUplink subscribes to all uplink messages for the given
	// application.
	SubscribeAppUplink(appID string, handler UplinkHandler) error
	// SubscribeUplink subscribes to all uplink messages that the current
	// user has access to.
	SubscribeUplink(handler UplinkHandler) error
	// ConsumeUplink consumes uplink messages in a specific queue.
	ConsumeUplink(queue string, handler UplinkHandler) error

	// SubscribeDeviceDownlink subscribes to all downlink messages for the
	// given application and device.
	SubscribeDeviceDownlink(appID, devID string, handler DownlinkHandler) error
	// SubscribeAppDownlink subscribes to all downlink messages for the given
	// application.
	SubscribeAppDownlink(appID string, handler DownlinkHandler) error
	// SubscribeDownlink subscribes to all downlink messages that the current
	// user has access to.
	SubscribeDownlink(handler DownlinkHandler) error

	// SubscribeDeviceEvents subscribes to events of the given type for the
	// given device. Pass an empty devID for all devices within an
	// application, or an empty appID for all devices in all applications
	// the user has access to.
	SubscribeDeviceEvents(appID string, devID string, eventType types.EventType, handler DeviceEventHandler) error
	// SubscribeAppEvents subscribes to events of the given type for the
	// given application. Pass an empty appID for all applications the user
	// has access to.
	SubscribeAppEvents(appID string, eventType types.EventType, handler AppEventHandler) error
}

Subscriber represents a subscriber for uplink messages, downlink messages, device events and application events

type UplinkHandler

// UplinkHandler is the callback type accepted by the uplink Subscribe and
// Consume methods. It is called with the Subscriber, the application ID, the
// device ID and the uplink message.
type UplinkHandler func(subscriber Subscriber, appID string, devID string, req types.UplinkMessage)

UplinkHandler is called for uplink messages

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL