Documentation
¶
Index ¶
Examples ¶
Constants ¶
const AlertTopic = "hub.subscription.messageslost"
AlertTopic is used to notify when a nonblocking subscriber loose one message You can subscribe on this topic and log or send metrics.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Fields ¶
type Fields map[string]interface{}
Fields is a [key]value storage for Messages values.
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub is a component that provides publish and subscribe capabilities for messages. Every message has a Name used to route them to subscribers and this can be used like RabbitMQ topics exchanges. Where every word is separated by dots `.` and you can use `*` as a wildcard.
Example ¶
package main import ( "fmt" "sync" "github.com/leandro-lugaresi/hub" ) func main() { h := hub.New() var wg sync.WaitGroup // the cap param is used to create one buffered channel with cap = 10 // If you wan an unbuferred channel use the 0 cap sub := h.Subscribe(10, "account.login.*", "account.changepassword.*") wg.Add(1) go func(s hub.Subscription) { for msg := range s.Receiver { fmt.Printf("receive msg with topic %s and id %d\n", msg.Name, msg.Fields["id"]) } wg.Done() }(sub) h.Publish(hub.Message{ Name: "account.login.failed", Fields: hub.Fields{"id": 123}, }) h.Publish(hub.Message{ Name: "account.changepassword.failed", Fields: hub.Fields{"id": 456}, }) h.Publish(hub.Message{ Name: "account.login.success", Fields: hub.Fields{"id": 123}, }) // message not routed to this subscriber h.Publish(hub.Message{ Name: "account.foo.failed", Fields: hub.Fields{"id": 789}, }) // close all the subscribers h.Close() // wait until finish all the messages on buffer wg.Wait() }
Output: receive msg with topic account.login.failed and id 123 receive msg with topic account.changepassword.failed and id 456 receive msg with topic account.login.success and id 123
func (*Hub) Close ¶
func (h *Hub) Close()
Close will unsubscribe all the subscriptions and close them all.
func (*Hub) NonBlockingSubscribe ¶
func (h *Hub) NonBlockingSubscribe(cap int, topics ...string) Subscription
NonBlockingSubscribe create a nonblocking subscription to receive events for a given topic. This subscriber will loose messages if the buffer reaches the max capability.
func (*Hub) Subscribe ¶
func (h *Hub) Subscribe(cap int, topics ...string) Subscription
Subscribe create a blocking subscription to receive events for a given topic. The cap param is used inside the subscriber and in this case used to create a channel. cap(1) = unbuffered channel.
func (*Hub) Unsubscribe ¶
func (h *Hub) Unsubscribe(sub Subscription)
Unsubscribe remove and close the Subscription.
type Message ¶
Message represent some message/event passed into the hub It also contain some helper functions to convert the fields to primitive types.
type Subscription ¶
type Subscription struct { Topics []string Receiver <-chan Message // contains filtered or unexported fields }
Subscription represents a topic subscription.