bridge

package
v0.4.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 13, 2024 License: BSD-2-Clause Imports: 18 Imported by: 0

Documentation

Overview

Package bridge contains everything related to bridging to, and connecting with, message brokers.

Example (ConnectUsingBrokerViaTCP)
package main

import (
	"fmt"
	"github.com/pb33f/ranch/bridge"
	"github.com/pb33f/ranch/bus"
)

// main connects to a STOMP broker over TCP, subscribes to /queue/sample,
// publishes five messages to that destination and prints each message as it
// is received. It exits early, after reporting the problem, if the broker
// cannot be reached or the subscription cannot be created.
func main() {
	// get a reference to the event bus.
	b := bus.GetBus()

	// create a broker connector configuration, using plain TCP (UseWS is not set).
	// Make sure you have a STOMP TCP server running like RabbitMQ
	config := &bridge.BrokerConnectorConfig{
		Username:   "guest",
		Password:   "guest",
		ServerAddr: ":61613",
		STOMPHeader: map[string]string{
			"access-token": "test",
		},
	}

	// connect to broker; without a connection nothing below can work, so stop here on failure.
	c, err := b.ConnectBroker(config)
	if err != nil {
		fmt.Printf("unable to connect, error: %v", err)
		return
	}
	defer c.Disconnect()

	// subscribe to the demo destination /queue/sample
	s, err := c.Subscribe("/queue/sample")
	if err != nil {
		fmt.Printf("unable to subscribe, error: %v\n", err)
		return
	}

	// number of messages to send and to wait for.
	const total = 5

	// create a control chan. It is buffered so that either goroutine can
	// signal completion without blocking once main has stopped waiting.
	done := make(chan bool, 2)

	// listen for messages
	consumer := func() {
		defer func() { done <- true }()

		// listen for 5 messages then stop.
		for n := 0; n < total; n++ {
			// listen for incoming messages from subscription.
			m, open := <-s.GetMsgChannel()
			if !open || m == nil {
				fmt.Println("subscription channel closed before all messages arrived")
				return
			}

			// get byte array; report (rather than panic on) an unexpected payload type.
			d, ok := m.Payload.([]byte)
			if !ok {
				fmt.Printf("unexpected payload type: %T\n", m.Payload)
				continue
			}

			fmt.Printf("Message Received: %s\n", string(d))
		}
	}

	// send messages
	producer := func() {
		for i := 0; i < total; i++ {
			err := c.SendMessage("/queue/sample", "text/plain", []byte(fmt.Sprintf("message: %d", i)))
			if err != nil {
				// the consumer would wait forever for a message that was never
				// sent, so release main as well.
				fmt.Printf("unable to send message %d, error: %v\n", i, err)
				done <- true
				return
			}
		}
	}

	// listen for incoming messages on subscription for destination /queue/sample
	go consumer()

	// send some messages to the broker on destination /queue/sample
	go producer()

	// wait for messages to be processed (or for a failure to be reported).
	<-done
}
Output:

Example (ConnectUsingBrokerViaWebSocket)
package main

import (
	"encoding/json"
	"fmt"
	"github.com/pb33f/ranch/bridge"
	"github.com/pb33f/ranch/bus"
	"github.com/pb33f/ranch/model"
)

// main connects to a broker over WebSocket, subscribes to
// /topic/simple-stream and prints the payload of the first five messages it
// receives. It exits early, after reporting the problem, if the broker cannot
// be reached or the subscription cannot be created.
func main() {
	// get a reference to the event bus.
	b := bus.GetBus()

	// create a broker connector configuration, using WebSockets.
	config := &bridge.BrokerConnectorConfig{
		Username:        "guest",
		Password:        "guest",
		ServerAddr:      "appfabric.vmware.com",
		WebSocketConfig: &bridge.WebSocketConfig{WSPath: "/fabric"},
		UseWS:           true,
		STOMPHeader: map[string]string{
			"access-token": "test",
		},
	}

	// connect to broker; without a connection nothing below can work, so stop here on failure.
	c, err := b.ConnectBroker(config)
	if err != nil {
		fmt.Printf("unable to connect, error: %v", err)
		return
	}
	// deferred so that the connection is also released on the early return below.
	defer c.Disconnect()

	// subscribe to our demo simple-stream
	s, err := c.Subscribe("/topic/simple-stream")
	if err != nil {
		fmt.Printf("unable to subscribe, error: %v\n", err)
		return
	}

	// number of messages to wait for.
	const total = 5

	// create a control chan
	done := make(chan bool)

	listener := func() {
		defer func() { done <- true }()

		// listen for 5 messages then stop.
		for n := 0; n < total; n++ {
			// listen for incoming messages from subscription.
			m, open := <-s.GetMsgChannel()
			if !open || m == nil {
				fmt.Println("subscription channel closed before all messages arrived")
				return
			}

			// get byte array; report (rather than panic on) an unexpected payload type.
			d, ok := m.Payload.([]byte)
			if !ok {
				fmt.Printf("unexpected payload type: %T\n", m.Payload)
				continue
			}

			// unmarshal message.
			var r model.Response
			if err := json.Unmarshal(d, &r); err != nil {
				fmt.Printf("unable to decode message, error: %v\n", err)
				continue
			}

			// the response payload is expected to be a string; fall back to
			// default formatting instead of panicking if it is not.
			if text, ok := r.Payload.(string); ok {
				fmt.Printf("Message Received: %s\n", text)
			} else {
				fmt.Printf("Message Received: %v\n", r.Payload)
			}
		}
	}

	// listen for incoming messages on subscription.
	go listener()

	// wait for the listener to finish.
	<-done
}
Output:

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BridgeClient

// BridgeClient encapsulates all subscriptions and io to and from brokers.
// It exposes both a WebSocket connection field and a STOMP TCP connection field.
// NOTE(review): presumably only one of the two is populated for a given client — confirm.
type BridgeClient struct {
	WSc           *websocket.Conn // WebSocket connection
	TCPc          *stomp.Conn     // STOMP TCP Connection
	// ConnectedChan is a boolean channel.
	// NOTE(review): presumably signals connection state changes — confirm against Connect.
	ConnectedChan chan bool

	// Subscriptions holds the client's subscriptions under a string key.
	// NOTE(review): the key is presumably the destination or subscription id — confirm.
	Subscriptions map[string]*BridgeClientSub
	// contains filtered or unexported fields
}

BridgeClient encapsulates all subscriptions and io to and from brokers.

func NewBridgeWsClient

func NewBridgeWsClient(enableLogging bool) *BridgeClient

NewBridgeWsClient creates a new WebSocket client.

func (*BridgeClient) Connect

func (ws *BridgeClient) Connect(url *url.URL, config *BrokerConnectorConfig) error

Connect to broker endpoint.

func (*BridgeClient) Disconnect

func (ws *BridgeClient) Disconnect() error

Disconnect from broker endpoint

func (*BridgeClient) Send

func (ws *BridgeClient) Send(destination, contentType string, payload []byte, opts ...func(fr *frame.Frame) error)

Send a payload to a destination

func (*BridgeClient) SendFrame

func (ws *BridgeClient) SendFrame(f *frame.Frame)

SendFrame fires a STOMP frame down the WebSocket.

func (*BridgeClient) Subscribe

func (ws *BridgeClient) Subscribe(destination string) *BridgeClientSub

Subscribe to destination

type BridgeClientSub

// BridgeClientSub is a client subscription that encapsulates message and
// error channels for a subscription.
type BridgeClientSub struct {
	C           chan *model.Message // MESSAGE payloads
	E           chan *model.Message // ERROR payloads.
	// Id is a UUID pointer.
	// NOTE(review): presumably uniquely identifies this subscription — confirm.
	Id          *uuid.UUID
	// Destination is the destination this subscription is for.
	Destination string
	// Client references a BridgeClient.
	// NOTE(review): presumably the client that created this subscription — confirm.
	Client      *BridgeClient
	// contains filtered or unexported fields
}

BridgeClientSub is a client subscription that encapsulates message and error channels for a subscription

func (*BridgeClientSub) Unsubscribe

func (cs *BridgeClientSub) Unsubscribe()

Unsubscribe sends an UNSUBSCRIBE frame for the subscription's destination.

type BrokerConnector

// BrokerConnector is used to connect to a message broker over TCP or WebSocket.
type BrokerConnector interface {
	// Connect takes a broker configuration and a logging flag and returns a
	// Connection, or an error.
	Connect(config *BrokerConnectorConfig, enableLogging bool) (Connection, error)
}

BrokerConnector is used to connect to a message broker over TCP or WebSocket.

func NewBrokerConnector

func NewBrokerConnector() BrokerConnector

NewBrokerConnector creates a new broker connector.

type BrokerConnectorConfig

// BrokerConnectorConfig is a configuration used when connecting to a message broker.
type BrokerConnectorConfig struct {
	// Username and Password are the credentials supplied for the connection.
	Username        string
	Password        string
	// ServerAddr is the broker address, e.g. ":61613" or a bare host name.
	ServerAddr      string
	UseWS           bool             // use WebSocket instead of TCP
	WebSocketConfig *WebSocketConfig // WebSocket configuration for when UseWS is true
	// NOTE(review): HostHeader presumably sets the STOMP "host" header — confirm.
	HostHeader      string
	HeartBeatOut    time.Duration     // outbound heartbeat interval (from client to server)
	HeartBeatIn     time.Duration     // inbound heartbeat interval (from server to client)
	STOMPHeader     map[string]string // additional STOMP headers for handshake
	HttpHeader      http.Header       // additional HTTP headers for WebSocket Upgrade
}

BrokerConnectorConfig is a configuration used when connecting to a message broker

type Connection

// Connection is the interface returned by BrokerConnector.Connect. It groups
// the operations for subscribing to destinations, sending messages and
// disconnecting.
type Connection interface {
	// GetId returns a UUID pointer.
	// NOTE(review): presumably the connection's unique id — confirm.
	GetId() *uuid.UUID
	// Subscribe subscribes to destination and returns the Subscription, or an error.
	Subscribe(destination string) (Subscription, error)
	// SubscribeReplyDestination takes a destination and returns a Subscription, or an error.
	// NOTE(review): presumably subscribes to a reply destination — confirm how it differs from Subscribe.
	SubscribeReplyDestination(destination string) (Subscription, error)
	// Disconnect returns an error on failure.
	Disconnect() (err error)
	// SendJSONMessage sends payload to destination; opts may modify the outgoing frame.
	// NOTE(review): presumably sets a JSON content type — confirm.
	SendJSONMessage(destination string, payload []byte, opts ...func(*frame.Frame) error) error
	// SendMessage sends payload with the given content type to destination;
	// opts may modify the outgoing frame.
	SendMessage(destination, contentType string, payload []byte, opts ...func(*frame.Frame) error) error
	// SendMessageWithReplyDestination is like SendMessage but additionally takes a reply destination.
	SendMessageWithReplyDestination(destination, replyDestination, contentType string, payload []byte, opts ...func(*frame.Frame) error) error
	// Conversation takes a destination and payload and returns a Subscription, or an error.
	// NOTE(review): presumably sends the payload and subscribes for replies — confirm.
	Conversation(destination string, payload []byte, opts ...func(*frame.Frame) error) (Subscription, error)
	// RequestResponse takes a context and payload and returns a single message, or an error.
	// NOTE(review): presumably blocks until a reply arrives or ctx is done — confirm.
	RequestResponse(ctx context.Context, payload []byte, opts ...func(*frame.Frame) error) (*model.Message, error)
}

type Subscription

// Subscription is the interface returned by Connection.Subscribe. It exposes
// the subscription's message channel and destination, and allows unsubscribing.
type Subscription interface {
	// GetId returns a UUID pointer.
	// NOTE(review): presumably the subscription's unique id — confirm.
	GetId() *uuid.UUID
	// GetMsgChannel returns the channel from which incoming messages are read.
	GetMsgChannel() chan *model.Message
	// GetDestination returns a destination string.
	GetDestination() string
	// Unsubscribe returns an error on failure.
	Unsubscribe() error
}

type WebSocketConfig

// WebSocketConfig holds the WebSocket-specific connection settings. It is
// referenced from BrokerConnectorConfig.WebSocketConfig and applies when
// UseWS is true.
type WebSocketConfig struct {
	WSPath    string      // if UseWS is true, set this to your websocket path (e.g. '/fabric')
	UseTLS    bool        // use TLS encryption with WebSocket connection
	TLSConfig *tls.Config // TLS config for WebSocket connection
	CertFile  string      // X509 certificate for TLS
	KeyFile   string      // matching key file for the X509 certificate
}

func (*WebSocketConfig) LoadX509KeyPairFromFiles

func (b *WebSocketConfig) LoadX509KeyPairFromFiles(certFile, keyFile string) error

LoadX509KeyPairFromFiles loads an X.509 certificate and its matching key from the given file paths and initializes the Certificates field of the TLS config instance with their contents, but only if Certificates is an empty slice and GetCertificate is nil.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL