Documentation
¶
Overview ¶
Package msgbus implements a generic PubSub message bus that follows MQTT guidelines.
It uses https://github.com/eclipse/paho.mqtt.golang under the hood.
Spec ¶
The MQTT specification lives at http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html
Example ¶
b := msgbus.New() base := "homeassistant" var err error // Now all Publish() calls topics are based on "homeassistant/". if b, err = msgbus.RebasePub(b, base); err != nil { log.Fatal(err) } // Now all Subscribe() calls topics are based on "homeassistant/". if b, err = msgbus.RebaseSub(b, base); err != nil { log.Fatal(err) } if err := b.Close(); err != nil { log.Fatal(err) }
Output:
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Bus ¶
type Bus interface { io.Closer // Publish publishes a message to a topic. // // If msg.Payload is empty, the topic is deleted if it was retained. // // It is not guaranteed that messages are propagated in order, unless // qos ExactlyOnce is used. Publish(msg Message, qos QOS) error // Subscribe sends updates to this topic query through the provided channel. // // It blocks until the context is canceled. Returns an error if subscription // failed. // // Upon subscription, it sends an empty message that can be ignored, to // enable synchronizing with other systems. Subscribe(ctx context.Context, topicQuery string, qos QOS, c chan<- Message) error }
Bus is a publisher-subscriber bus.
The topics are expected to use the MQTT definition. "Mosquitto" has good documentation about this: https://mosquitto.org/man/mqtt-7.html
For more information about retained message behavior, see http://www.hivemq.com/blog/mqtt-essentials-part-8-retained-messages
Implementation of Bus are expected to implement fmt.Stringer.
func New ¶
func New() Bus
New returns a local thread safe memory backed Bus.
This Bus is thread safe. It is useful for unit tests or as a local broker.
Example ¶
ctx, cancel := context.WithCancel(context.Background()) defer cancel() b := msgbus.New() c := make(chan msgbus.Message) go func() { defer close(c) if err := b.Subscribe(ctx, "#", msgbus.BestEffort, c); err != nil { log.Fatal(err) } }() // Wait for subscription to be live. if msg := <-c; len(msg.Topic) != 0 || len(msg.Payload) != 0 || msg.Retained { log.Fatal(msg) } if err := b.Publish(msgbus.Message{Topic: "sensor", Payload: []byte("ON"), Retained: true}, msgbus.BestEffort); err != nil { log.Fatal(err) } msg := <-c fmt.Printf("%s: %s\n", msg.Topic, msg.Payload) if err := b.Close(); err != nil { log.Fatal(err) }
Output: sensor: ON
func NewMQTT ¶
NewMQTT returns an initialized active MQTT connection.
The connection timeouts are fine tuned for a LAN. It will likely fail on a slower connection or when used over the internet.
will is the message to send if the connection is not closed correctly; when Close() is not called.
order determines is messages are processed in order or not. Out of order processing means that a subscription will not be blocked by another one that fails to process its queue in time.
This main purpose of this library is to create a layer that is simpler, more usable and more Go-idiomatic than paho.mqtt.golang.
See https://godoc.org/github.com/eclipse/paho.mqtt.golang#ClientOptions.AddBroker for the accepted server format.
Example ¶
will := msgbus.Message{Topic: "alive", Payload: []byte("NO"), Retained: true} hostname, err := os.Hostname() if err != nil { log.Fatal(err) } b, err := msgbus.NewMQTT("tcp://localhost:1883", hostname, "user", "pass", will, false) if err != nil { log.Fatal(err) } msg := msgbus.Message{Topic: "alive", Payload: []byte("YES"), Retained: true} if err := b.Publish(msg, msgbus.BestEffort); err != nil { log.Fatal(err) } if err := b.Close(); err != nil { log.Fatal(err) }
Output:
type Message ¶
type Message struct { // Topic is the MQTT topic. It may have a prefix stripped by RebaseSub() or // inserted by RebasePub(). Topic string // Payload is the application specific data. // // Publishing a message with no Payload deletes a retained Topic, and has no // effect on non-retained topic. Payload []byte // Retained signifies that the message is permanent until explicitly changed. // Otherwise it is ephemeral. Retained bool }
Message represents a single message to a single topic.
type QOS ¶
type QOS int8
QOS defines the quality of service to use when publishing and subscribing to messages.
The normative definition is http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html#_Toc442180912
const ( // BestEffort means the broker/client will deliver the message at most once, // with no confirmation. BestEffort QOS = 0 // MinOnce means the broker/client will deliver the message at least once, // potentially duplicate. // // Do not use if message duplication is problematic. MinOnce QOS = 1 // ExactlyOnce means the broker/client will deliver the message exactly once // by using a four step handshake. ExactlyOnce QOS = 2 )