Documentation ¶
Overview ¶
Package bridge contains all things bridge / broker connector
Example (ConnectUsingBrokerViaTCP) ¶
package main import ( "fmt" "github.com/pb33f/ranch/bridge" "github.com/pb33f/ranch/bus" ) func main() { // get a reference to the event bus. b := bus.GetBus() // create a broker connector configuration, using WebSockets. // Make sure you have a STOMP TCP server running like RabbitMQ config := &bridge.BrokerConnectorConfig{ Username: "guest", Password: "guest", ServerAddr: ":61613", STOMPHeader: map[string]string{ "access-token": "test", }, } // connect to broker. c, err := b.ConnectBroker(config) if err != nil { fmt.Printf("unable to connect, error: %e", err) } defer c.Disconnect() // subscribe to our demo simple-stream s, _ := c.Subscribe("/queue/sample") // set a counter n := 0 // create a control chan done := make(chan bool) // listen for messages var consumer = func() { for { // listen for incoming messages from subscription. m := <-s.GetMsgChannel() n++ // get byte array. d := m.Payload.([]byte) fmt.Printf("Message Received: %s\n", string(d)) // listen for 5 messages then stop. if n >= 5 { break } } done <- true } // send messages var producer = func() { for i := 0; i < 5; i++ { c.SendMessage("/queue/sample", "text/plain", []byte(fmt.Sprintf("message: %d", i))) } } // listen for incoming messages on subscription for destination /queue/sample go consumer() // send some messages to the broker on destination /queue/sample go producer() // wait for messages to be processed. <-done }
Output:
Example (ConnectUsingBrokerViaWebSocket) ¶
package main import ( "encoding/json" "fmt" "github.com/pb33f/ranch/bridge" "github.com/pb33f/ranch/bus" "github.com/pb33f/ranch/model" ) func main() { // get a reference to the event bus. b := bus.GetBus() // create a broker connector configuration, using WebSockets. config := &bridge.BrokerConnectorConfig{ Username: "guest", Password: "guest", ServerAddr: "appfabric.vmware.com", WebSocketConfig: &bridge.WebSocketConfig{WSPath: "/fabric"}, UseWS: true, STOMPHeader: map[string]string{ "access-token": "test", }, } // connect to broker. c, err := b.ConnectBroker(config) if err != nil { fmt.Printf("unable to connect, error: %e", err) } // subscribe to our demo simple-stream s, _ := c.Subscribe("/topic/simple-stream") // set a counter n := 0 // create a control chan done := make(chan bool) var listener = func() { for { // listen for incoming messages from subscription. m := <-s.GetMsgChannel() // unmarshal message. r := &model.Response{} d := m.Payload.([]byte) json.Unmarshal(d, &r) fmt.Printf("Message Received: %s\n", r.Payload.(string)) n++ // listen for 5 messages then stop. if n >= 5 { break } } done <- true } // listen for incoming messages on subscription. go listener() <-done c.Disconnect() }
Output:
Index ¶
- type BridgeClient
- func (ws *BridgeClient) Connect(url *url.URL, config *BrokerConnectorConfig) error
- func (ws *BridgeClient) Disconnect() error
- func (ws *BridgeClient) Send(destination, contentType string, payload []byte, ...)
- func (ws *BridgeClient) SendFrame(f *frame.Frame)
- func (ws *BridgeClient) Subscribe(destination string) *BridgeClientSub
- type BridgeClientSub
- type BrokerConnector
- type BrokerConnectorConfig
- type Connection
- type Subscription
- type WebSocketConfig
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BridgeClient ¶
type BridgeClient struct { WSc *websocket.Conn // WebSocket connection TCPc *stomp.Conn // STOMP TCP Connection ConnectedChan chan bool Subscriptions map[string]*BridgeClientSub // contains filtered or unexported fields }
BridgeClient encapsulates all subscriptions and io to and from brokers.
func NewBridgeWsClient ¶
func NewBridgeWsClient(enableLogging bool) *BridgeClient
NewBridgeWsClient Create a new WebSocket client.
func (*BridgeClient) Connect ¶
func (ws *BridgeClient) Connect(url *url.URL, config *BrokerConnectorConfig) error
Connect to broker endpoint.
func (*BridgeClient) Disconnect ¶
func (ws *BridgeClient) Disconnect() error
Disconnect from broker endpoint
func (*BridgeClient) Send ¶
func (ws *BridgeClient) Send(destination, contentType string, payload []byte, opts ...func(fr *frame.Frame) error)
Send a payload to a destination
func (*BridgeClient) SendFrame ¶
func (ws *BridgeClient) SendFrame(f *frame.Frame)
SendFrame fire a STOMP frame down the WebSocket
func (*BridgeClient) Subscribe ¶
func (ws *BridgeClient) Subscribe(destination string) *BridgeClientSub
Subscribe to destination
type BridgeClientSub ¶
type BridgeClientSub struct { C chan *model.Message // MESSAGE payloads E chan *model.Message // ERROR payloads. Id *uuid.UUID Destination string Client *BridgeClient // contains filtered or unexported fields }
BridgeClientSub is a client subscription that encapsulates message and error channels for a subscription
func (*BridgeClientSub) Unsubscribe ¶
func (cs *BridgeClientSub) Unsubscribe()
Send an UNSUBSCRIBE frame for subscription destination.
type BrokerConnector ¶
type BrokerConnector interface {
Connect(config *BrokerConnectorConfig, enableLogging bool) (Connection, error)
}
BrokerConnector is used to connect to a message broker over TCP or WebSocket.
type BrokerConnectorConfig ¶
type BrokerConnectorConfig struct { Username string Password string ServerAddr string UseWS bool // use WebSocket instead of TCP WebSocketConfig *WebSocketConfig // WebSocket configuration for when UseWS is true HostHeader string HeartBeatOut time.Duration // outbound heartbeat interval (from client to server) HeartBeatIn time.Duration // inbound heartbeat interval (from server to client) STOMPHeader map[string]string // additional STOMP headers for handshake HttpHeader http.Header // additional HTTP headers for WebSocket Upgrade }
BrokerConnectorConfig is a configuration used when connecting to a message broker
type Connection ¶
type Connection interface { GetId() *uuid.UUID Subscribe(destination string) (Subscription, error) SubscribeReplyDestination(destination string) (Subscription, error) Disconnect() (err error) SendJSONMessage(destination string, payload []byte, opts ...func(*frame.Frame) error) error SendMessage(destination, contentType string, payload []byte, opts ...func(*frame.Frame) error) error SendMessageWithReplyDestination(destination, replyDestination, contentType string, payload []byte, opts ...func(*frame.Frame) error) error Conversation(destination string, payload []byte, opts ...func(*frame.Frame) error) (Subscription, error) RequestResponse(ctx context.Context, payload []byte, opts ...func(*frame.Frame) error) (*model.Message, error) }
type Subscription ¶
type WebSocketConfig ¶
type WebSocketConfig struct { WSPath string // if UseWS is true, set this to your websocket path (e.g. '/fabric') UseTLS bool // use TLS encryption with WebSocket connection TLSConfig *tls.Config // TLS config for WebSocket connection CertFile string // X509 certificate for TLS KeyFile string // matching key file for the X509 certificate }
func (*WebSocketConfig) LoadX509KeyPairFromFiles ¶
func (b *WebSocketConfig) LoadX509KeyPairFromFiles(certFile, keyFile string) error
LoadX509KeyPairFromFiles loads from paths to x509 cert and its matching key files and initializes the Certificates field of the TLS config instance with their contents, only if both Certificates is an empty slice and GetCertificate is nil