Transport. An event bus for Go
Transport is a full stack, simple, fast, expandable application event bus for your applications. It can transport anything you want around your application,
as well as extend different channels to brokers and destinations. Transport makes it easy to move your bits around local or distributed apps, without worrying
about all the wiring.
What does that mean?
Transport is an event bus that allows application developers to build components that can talk to one another easily.
It provides a standardized and simple API, implemented in multiple languages, so that the individual components inside your
applications can talk to one another.
It really comes to life when you use it to send messages, requests, responses and events around your backend and front-end.
Your backend can stream messages to your UI, or to other services or agents, as if they were sitting right next to each other.
You can build one to one, many to one and many to many topologies with ease.
Channels can be extended to major brokers like Kafka or RabbitMQ, so Transport becomes an 'on/off-ramp' for your main sources of truth.
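To make the topologies above concrete, here is a toy in-process publish/subscribe sketch that uses only the Go standard library. It is not Transport's implementation, and every name in it (`toyBus`, `subscribe`, `publish`) is hypothetical. It only shows the idea that several publishers and several subscribers can share one named channel.

```go
package main

import (
	"fmt"
	"sync"
)

// toyBus is a hypothetical in-process bus: channel name -> subscriber callbacks.
type toyBus struct {
	mu   sync.RWMutex
	subs map[string][]func(string)
}

func newToyBus() *toyBus {
	return &toyBus{subs: make(map[string][]func(string))}
}

// subscribe registers fn to receive every message published on channel.
func (b *toyBus) subscribe(channel string, fn func(string)) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs[channel] = append(b.subs[channel], fn)
}

// publish delivers msg synchronously to every subscriber of channel
// and reports how many subscribers received it.
func (b *toyBus) publish(channel, msg string) int {
	b.mu.RLock()
	handlers := append([]func(string){}, b.subs[channel]...)
	b.mu.RUnlock()
	for _, fn := range handlers {
		fn(msg)
	}
	return len(handlers)
}

func main() {
	b := newToyBus()
	var got []string

	// two subscribers on the same channel
	b.subscribe("news", func(m string) { got = append(got, "ui:"+m) })
	b.subscribe("news", func(m string) { got = append(got, "agent:"+m) })

	// two publishers on the same channel: many to many
	b.publish("news", "from-service-a")
	b.publish("news", "from-service-b")

	fmt.Println(len(got), got)
}
```

With one subscriber the same sketch is one to one or many to one. The real bus adds much more on top of this idea, such as request/response handling and broker bridging.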
Watch this quick video for an overview
Transport for Go also comes with plank
plank is a micro platform for building just about anything you can think of. It exposes RESTful, AsyncAPI and Pub/Sub endpoints
for services that can do just about anything. Read more about plank
Getting Started
Other versions of Transport
Transport Site
https://transport-bus.io
Quick Start
Install transport
go get -u github.com/vmware/transport-go
To create an instance of the bus
import "github.com/vmware/transport-go/bus"
var transport = bus.GetBus()
Transport is a singleton: there should only ever be a single instance of the bus in your application.
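For readers unfamiliar with the pattern, the sketch below shows one common way a lazily created singleton is built in Go, using `sync.Once`. It is an illustration only and makes no claim about how transport-go implements `bus.GetBus()`. The names `eventBus`, `getBus` and `created` are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// eventBus is a hypothetical stand-in for a real bus type.
type eventBus struct {
	id int
}

var (
	once     sync.Once
	instance *eventBus
	created  int // counts how many times the constructor ran
)

// getBus creates the bus on the first call and returns the same pointer afterwards.
// sync.Once makes this safe when several goroutines call getBus at the same time.
func getBus() *eventBus {
	once.Do(func() {
		created++
		instance = &eventBus{id: created}
	})
	return instance
}

func main() {
	a := getBus()
	b := getBus()
	fmt.Println(a == b, created) // prints "true 1"
}
```

The practical consequence for application code is that any package can ask for the bus and will see the same channels as every other package.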
Managing / Creating Channels
The ChannelManager interface on the EventBus interface facilitates all Channel operations.
channelManager := transport.GetChannelManager()
Creating Channels
The CreateChannel method will create a new channel with the name "some-channel". It will return a pointer to a Channel object. You don't need to hold on to that pointer if you don't want to. The channel will still exist.
channel := channelManager.CreateChannel("some-channel")
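The reason the returned pointer can be discarded is that the manager keeps its own reference to each channel, keyed by name. The sketch below illustrates that idea with the standard library only. It is not the transport-go ChannelManager, and `channelRegistry`, `create` and `exists` are hypothetical names. Returning the existing channel when the name is already registered is a choice made for this sketch, not a documented behaviour of the library.

```go
package main

import (
	"fmt"
	"sync"
)

// channel is a hypothetical stand-in for a bus channel.
type channel struct {
	name string
}

// channelRegistry owns every channel it creates, keyed by name.
type channelRegistry struct {
	mu       sync.Mutex
	channels map[string]*channel
}

func newChannelRegistry() *channelRegistry {
	return &channelRegistry{channels: make(map[string]*channel)}
}

// create returns the channel registered under name, registering a new one if needed.
func (r *channelRegistry) create(name string) *channel {
	r.mu.Lock()
	defer r.mu.Unlock()
	if ch, ok := r.channels[name]; ok {
		return ch
	}
	ch := &channel{name: name}
	r.channels[name] = ch
	return ch
}

// exists reports whether a channel with the given name has been created.
func (r *channelRegistry) exists(name string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	_, ok := r.channels[name]
	return ok
}

func main() {
	reg := newChannelRegistry()
	reg.create("some-channel") // return value deliberately discarded

	fmt.Println(reg.exists("some-channel"))  // prints "true"
	fmt.Println(reg.exists("other-channel")) // prints "false"
}
```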
Simple Example
A simple ping pong looks a little like this.
// get a reference to the bus and create 'some-channel'
ts := bus.GetBus()
channel := "some-channel"
ts.GetChannelManager().CreateChannel(channel)

// listen for requests on 'some-channel'
requestHandler, _ := ts.ListenRequestStream(channel)
requestHandler.Handle(
	func(msg *model.Message) {
		pingContent := msg.Payload.(string)
		fmt.Printf("\nPing: %s\n", pingContent)

		// send a response back.
		ts.SendResponseMessage(channel, pingContent, msg.DestinationId)
	},
	func(err error) {
		// something went wrong...
	})

// send a request to 'some-channel' and handle a single response
responseHandler, _ := ts.RequestOnce(channel, "Woo!")
responseHandler.Handle(
	func(msg *model.Message) {
		fmt.Printf("Pong: %s\n", msg.Payload.(string))
	},
	func(err error) {
		// something went wrong...
	})

// fire the request.
responseHandler.Fire()
This will output:
🌈 Transport booted with id [e495e5d5-2b72-46dd-8013-d49049bd4800]
Ping: Woo!
Pong: Woo!
Connecting to a message broker and using galactic channels
You can see this all working live in some of our interactive demos for Transport TypeScript. They show Transport acting as both client and server: we use plank to run services, and the UI subscribes to those services and talks to them.
We have a live and running instance of plank operating at transport-bus.io. You can try the example code below to use the sample simple stream service and see how simple it is for yourself.
Simple Stream Example
// get a pointer to the bus.
b := bus.GetBus()

// get a pointer to the channel manager
cm := b.GetChannelManager()

// create a broker connector config and connect to
// transport-bus.io over WebSocket using TLS.
config := &bridge.BrokerConnectorConfig{
	Username:   "guest",            // not required for demo, but our API requires it.
	Password:   "guest",            // ^^ same.
	ServerAddr: "transport-bus.io", // our live broker running plank and demo services.
	UseWS:      true,               // connect over websockets
	WebSocketConfig: &bridge.WebSocketConfig{ // configure websocket
		WSPath: "/ws", // websocket endpoint
		UseTLS: true,
		// use TLS/HTTPS. When using TLS, you can supply your own TLSConfig value, or we can
		// generate a basic one for you if you leave TLSConfig empty. In most cases,
		// you won't need to supply one.
	}}

// connect to transport-bus.io demo broker
c, err := b.ConnectBroker(config)
if err != nil {
	utils.Log.Fatalf("unable to connect to transport-bus.io, error: %v", err)
}

// create a local channel on the bus.
myLocalChan := "my-stream"
cm.CreateChannel(myLocalChan)

// listen to the stream of messages coming in on the channel. A handler is returned
// that allows you to add in lambdas that handle your success messages, and your errors.
handler, _ := b.ListenStream(myLocalChan)

// mark our local 'my-stream' channel as galactic and map it to our connection and
// the /topic/simple-stream service
err = cm.MarkChannelAsGalactic(myLocalChan, "/topic/simple-stream", c)
if err != nil {
	utils.Log.Fatalf("unable to map local channel to broker destination: %v", err)
}

// collect the streamed values in a slice
var streamedValues []string

// create a wait group that will wait 10 times before completing.
var wg sync.WaitGroup
wg.Add(10)

// keep listening
handler.Handle(
	func(msg *model.Message) {
		// unmarshal the message payload into a model.Response object
		// this is a wrapper transport uses when being used as a server,
		// it encapsulates a rich set of data about the message,
		// but you only really care about the payload (body)
		r := &model.Response{}
		d := msg.Payload.([]byte)
		err := json.Unmarshal(d, r)
		if err != nil {
			utils.Log.Errorf("error unmarshalling response: %v", err)
			return
		}

		// the value we want is in the payload of our model.Response
		value := r.Payload.(string)

		// log it and save it to our streamedValues
		utils.Log.Infof("stream ticked: %s", value)
		streamedValues = append(streamedValues, value)
		wg.Done()
	},
	func(err error) {
		utils.Log.Errorf("error received on channel: %v", err)
	})

// wait for 10 ticks of the stream, then we're done.
wg.Wait()

// close our handler, we're done.
handler.Close()

// mark channel as local (unsubscribe from all mappings)
err = cm.MarkChannelAsLocal(myLocalChan)
if err != nil {
	utils.Log.Fatalf("unable to unsubscribe, error: %v", err)
}

// disconnect
err = c.Disconnect()
if err != nil {
	utils.Log.Fatalf("unable to disconnect, error: %v", err)
}

// return what we got from the stream.
return streamedValues
You can see this simple example here
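The stream handler above pulls the value out of a wrapper with two unchecked type assertions, which panic in Go if the payload has a different type than expected. The sketch below shows the same unwrap step written defensively with comma-ok assertions. It uses only the standard library: `envelope` is a hypothetical stand-in for the response wrapper, and the `payload` JSON key is an assumption made for the sketch, not taken from the transport-go model package.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// envelope is a hypothetical stand-in for a response wrapper.
// Only the payload field is modelled; the JSON key is an assumption.
type envelope struct {
	Payload interface{} `json:"payload"`
}

// extractString expects raw to be JSON bytes holding an envelope
// whose payload is a string, and returns that string.
func extractString(raw interface{}) (string, error) {
	data, ok := raw.([]byte)
	if !ok {
		return "", errors.New("message payload is not a byte slice")
	}
	var env envelope
	if err := json.Unmarshal(data, &env); err != nil {
		return "", fmt.Errorf("decoding envelope: %w", err)
	}
	value, ok := env.Payload.(string)
	if !ok {
		return "", errors.New("envelope payload is not a string")
	}
	return value, nil
}

func main() {
	v, err := extractString([]byte(`{"payload":"tick"}`))
	fmt.Println(v, err)

	// a numeric payload is reported as an error instead of panicking
	_, err = extractString([]byte(`{"payload":42}`))
	fmt.Println(err)
}
```

Inside a real handler, a returned error would typically be logged and the message skipped, in the same way the example above handles a failed `json.Unmarshal`.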
plank is 'just enough' of a platform for building just about anything you want. Run RESTful and AsyncAPIs with the
same code, and build simple or complex services that can be exposed to any client in any manner. Talk over WebSockets, pub/sub
and streaming, or call the same APIs via REST mappings. plank can do it all. It's tiny, super-fast and runs on any platform. It runs in
just a few megabytes of memory, and can be compiled down to the same. It can be used for micro-services, daemons, agents,
local UI helper applications... anything!
plank is only available in transport-go
Take a look at plank
Contributing
The transport-go project team welcomes contributions from the community. Before you start working with transport-go, please
read our Developer Certificate of Origin. All contributions to this repository must be
signed as described on that page. Your signature certifies that you wrote the patch or have the right to pass it on
as an open-source patch. For more detailed information, refer to CONTRIBUTING.md.
License
BSD-2-Clause
Copyright (c) 2016-2021, VMware, Inc.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
- Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.