Documentation ¶
Overview ¶
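Package bus provides the transport event bus: the EventBus itself, the channels and ChannelManager it routes messages over, message handlers, transactions, and the BusStore/StoreManager state stores.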
Example (UsingGalacticChannels) ¶
package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/vmware/transport-go/bridge"
	"github.com/vmware/transport-go/bus"
	"github.com/vmware/transport-go/model"
)

// main maps a local channel onto a broker-hosted stream, prints the first five
// messages received on it, then unmaps the channel and disconnects.
func main() {
	// get a pointer to the bus.
	b := bus.GetBus()

	// get a pointer to the channel manager
	cm := b.GetChannelManager()

	channel := "my-stream"
	cm.CreateChannel(channel)

	// done is closed exactly once, after the fifth message has been handled.
	// Closing (rather than sending) means the handler can never block on it.
	done := make(chan struct{})

	// listen to stream of messages coming in on channel.
	h, err := b.ListenStream(channel)
	if err != nil {
		// %v, not %e: %e is a floating-point verb and garbles error values.
		log.Panicf("unable to listen to channel stream, error: %v", err)
	}

	count := 0

	// listen for five messages and then exit, signalling completion via done.
	h.Handle(
		func(msg *model.Message) {
			// Messages that arrive after the fifth one, but before the channel
			// is unmapped below, are ignored.
			if count >= 5 {
				return
			}

			// unmarshal the payload into a Response object (used by fabric services)
			d, ok := msg.Payload.([]byte)
			if !ok {
				log.Panicf("unexpected payload type %T on channel %s", msg.Payload, channel)
			}
			r := &model.Response{}
			if err := json.Unmarshal(d, r); err != nil {
				log.Panicf("unable to decode response, error: %v", err)
			}

			// %v prints a string payload exactly as %s would, without
			// panicking if the payload turns out to be another type.
			fmt.Printf("Stream Ticked: %v\n", r.Payload)

			count++
			if count == 5 {
				close(done)
			}
		},
		func(err error) {
			log.Panicf("error received on channel %v", err)
		})

	// create a broker connector config, in this case, we will connect to the
	// application fabric demo endpoint.
	config := &bridge.BrokerConnectorConfig{
		Username:   "guest",
		Password:   "guest",
		ServerAddr: "appfabric.vmware.com",
		WebSocketConfig: &bridge.WebSocketConfig{
			WSPath: "/fabric",
		},
		UseWS: true}

	// connect to broker.
	c, err := b.ConnectBroker(config)
	if err != nil {
		log.Panicf("unable to connect to fabric, error: %v", err)
	}

	// mark our local channel as galactic and map it to our connection and the
	// /topic/simple-stream service running on appfabric.vmware.com
	err = cm.MarkChannelAsGalactic(channel, "/topic/simple-stream", c)
	if err != nil {
		log.Panicf("unable to map local channel to broker destination: %v", err)
	}

	// wait for done signal
	<-done

	// mark channel as local (unsubscribe from all mappings)
	err = cm.MarkChannelAsLocal(channel)
	if err != nil {
		log.Panicf("unable to unsubscribe, error: %v", err)
	}

	err = c.Disconnect()
	if err != nil {
		log.Panicf("unable to disconnect, error: %v", err)
	}
}
Output:
Index ¶
- Constants
- func EnableLogging(enable bool)
- type BusStore
- type BusTransaction
- type BusTransactionReadyFunction
- type Channel
- func (channel *Channel) ContainsHandlers() bool
- func (channel *Channel) IsGalactic() bool
- func (channel *Channel) IsPrivate() bool
- func (channel *Channel) Send(message *model.Message)
- func (channel *Channel) SetGalactic(mappedDestination string)
- func (channel *Channel) SetLocal()
- func (channel *Channel) SetPrivate(private bool)
- type ChannelManager
- type EndpointConfig
- type EventBus
- type FabricEndpoint
- type MessageErrorFunction
- type MessageHandler
- type MessageHandlerFunction
- type MonitorEvent
- type MonitorEventHandler
- type MonitorEventListenerId
- type MonitorEventType
- type MutationRequest
- type MutationRequestHandlerFunction
- type MutationStoreStream
- type StompSessionEvent
- type StoreChange
- type StoreChangeHandlerFunction
- type StoreManager
- type StoreStream
Examples ¶
Constants ¶
// Channel names built on the transport-internal prefix.
const (
	// TRANSPORT_INTERNAL_CHANNEL_PREFIX is the common leading segment of the
	// channel names declared below.
	// NOTE(review): presumably reserved for channels the library itself
	// creates - confirm against the channel manager implementation.
	TRANSPORT_INTERNAL_CHANNEL_PREFIX = "_transportInternal/"

	// STOMP_SESSION_NOTIFY_CHANNEL evaluates to
	// "_transportInternal/stomp-session-notify".
	STOMP_SESSION_NOTIFY_CHANNEL = TRANSPORT_INTERNAL_CHANNEL_PREFIX + "stomp-session-notify"
)
Variables ¶
This section is empty.
Functions ¶
func EnableLogging ¶
func EnableLogging(enable bool)
Types ¶
type BusStore ¶
type BusStore interface {
	// Get the name (the id) of the store.
	GetName() string
	// Add a new item or update an existing item in the store.
	Put(id string, value interface{}, state interface{})
	// Return an item from the store and a boolean flag
	// indicating whether the item exists.
	Get(id string) (interface{}, bool)
	// Shortened version of the Get() method; returns only the item value.
	GetValue(id string) interface{}
	// Remove an item from the store. Returns true if the remove operation was successful.
	Remove(id string, state interface{}) bool
	// Return a slice containing all store items.
	AllValues() []interface{}
	// Return a map with all items from the store.
	AllValuesAsMap() map[string]interface{}
	// Return a map with all items from the store together with the current store version.
	AllValuesAndVersion() (map[string]interface{}, int64)
	// Subscribe to state changes for a specific object.
	OnChange(id string, state ...interface{}) StoreStream
	// Subscribe to state changes for all objects.
	OnAllChanges(state ...interface{}) StoreStream
	// Notify when the store has been initialized (via Populate() or Initialize()).
	WhenReady(readyFunction func())
	// Populate the store with a map of items keyed by their IDs.
	Populate(items map[string]interface{}) error
	// Mark the store as initialized and notify all watchers.
	Initialize()
	// Subscribe to mutation requests made via the Mutate() method.
	OnMutationRequest(mutationType ...interface{}) MutationStoreStream
	// Send a mutation request to any subscribers handling mutations.
	Mutate(request interface{}, requestType interface{},
		successHandler func(interface{}), errorHandler func(interface{}))
	// Remove all items from the store and change its state to uninitialized.
	Reset()
	// Returns true if this is a galactic store.
	IsGalactic() bool
	// Get the item type, if one was specified when the store was created.
	GetItemType() reflect.Type
}
BusStore is a stateful in-memory cache for objects. Every state change (any time the cache is modified) broadcasts the updated object to the subscribers of the BusStore that are watching that specific object, or all objects, for the given type of state change.
type BusTransaction ¶
// BusTransaction groups channel requests and store-ready waits so that they
// are committed together and their responses are delivered to the registered
// completion handlers.
type BusTransaction interface {
	// Send a request to a channel as a part of this transaction.
	SendRequest(channel string, payload interface{}) error
	// Wait for a store to be initialized as a part of this transaction.
	WaitForStoreReady(storeName string) error
	// Register a new complete handler. Once all responses to requests have been received,
	// the transaction is complete.
	OnComplete(completeHandler BusTransactionReadyFunction) error
	// Register a new error handler. If an error is thrown by any of the responders, the transaction
	// is aborted and the error is sent to the registered error handlers.
	OnError(errorHandler MessageErrorFunction) error
	// Commit the transaction; all requests will be sent and will wait for responses.
	// Once all the responses are in, the OnComplete handlers will be called with the responses.
	Commit() error
}
type Channel ¶
type Channel struct {
	// Name is the channel name.
	// NOTE(review): the struct tag serializes this field under the JSON key
	// "string", not "name" - confirm whether that is intentional before
	// relying on the encoded form.
	Name string `json:"string"`
	// contains filtered or unexported fields
}
Channel represents the stream and the subscribed event handlers waiting for ticks on the stream
func NewChannel ¶
Create a new Channel with the supplied Channel name. Returns a pointer to that Channel.
func (*Channel) ContainsHandlers ¶
Check if the Channel has any registered subscribers
func (*Channel) IsGalactic ¶
Returns true if the Channel is marked as galactic.
func (*Channel) SetGalactic ¶
Mark the Channel as galactic
func (*Channel) SetPrivate ¶
Mark the Channel as private
type ChannelManager ¶
// ChannelManager is the interface through which channels on the bus are
// created, looked up, subscribed to and destroyed.
type ChannelManager interface {
	// Channel lifecycle and lookup, keyed by channel name.
	CreateChannel(channelName string) *Channel
	DestroyChannel(channelName string)
	CheckChannelExists(channelName string) bool
	GetChannel(channelName string) (*Channel, error)
	GetAllChannels() map[string]*Channel

	// Handler subscription.
	// NOTE(review): presumably the *uuid.UUID returned by
	// SubscribeChannelHandler is the id UnsubscribeChannelHandler expects -
	// confirm against the implementation.
	SubscribeChannelHandler(channelName string, fn MessageHandlerFunction, runOnce bool) (*uuid.UUID, error)
	UnsubscribeChannelHandler(channelName string, id *uuid.UUID) error
	WaitForChannel(channelName string) error

	// MarkChannelAsGalactic maps a local channel to a broker destination on
	// the given connection; MarkChannelAsLocal marks the channel as local
	// again (unsubscribing from all mappings).
	MarkChannelAsGalactic(channelName string, brokerDestination string, connection bridge.Connection) (err error)
	MarkChannelAsLocal(channelName string) (err error)
}
The ChannelManager interface controls all access to channels via the bus.
func NewBusChannelManager ¶
func NewBusChannelManager(bus EventBus) ChannelManager
type EndpointConfig ¶
// EndpointConfig is the configuration passed to EventBus.StartFabricEndpoint.
type EndpointConfig struct {
	// Prefix for public topics e.g. "/topic"
	TopicPrefix string
	// Prefix for user queues e.g. "/user/queue"
	UserQueuePrefix string
	// Prefix used for public application requests e.g. "/pub"
	AppRequestPrefix string
	// Prefix used for "private" application requests e.g. "/pub/queue"
	// Requests sent to destinations prefixed with the AppRequestQueuePrefix
	// should generate responses sent to a single client queue.
	// E.g. if a client sends a request to the "/pub/queue/sample-channel" destination
	// the application should send the response only to this client on the
	// "/user/queue/sample-channel" destination.
	// This behavior will mimic the Spring SimpleMessageBroker implementation.
	AppRequestQueuePrefix string
	// NOTE(review): the unit of Heartbeat is not stated here - confirm
	// against the endpoint implementation before setting it.
	Heartbeat int64
}
type EventBus ¶
// EventBus provides access to the ChannelManager, simple message sending, and
// simple API calls for handling messages and errors over channels on the bus.
type EventBus interface {
	GetId() *uuid.UUID
	GetChannelManager() ChannelManager

	// Sending messages on a named channel.
	SendRequestMessage(channelName string, payload interface{}, destinationId *uuid.UUID) error
	SendResponseMessage(channelName string, payload interface{}, destinationId *uuid.UUID) error
	SendBroadcastMessage(channelName string, payload interface{}) error
	SendErrorMessage(channelName string, err error, destinationId *uuid.UUID) error

	// Listening on a named channel. Each call returns a MessageHandler on
	// which the success and error callbacks are registered via Handle.
	ListenStream(channelName string) (MessageHandler, error)
	ListenStreamForDestination(channelName string, destinationId *uuid.UUID) (MessageHandler, error)
	ListenFirehose(channelName string) (MessageHandler, error)
	ListenRequestStream(channelName string) (MessageHandler, error)
	ListenRequestStreamForDestination(channelName string, destinationId *uuid.UUID) (MessageHandler, error)
	ListenRequestOnce(channelName string) (MessageHandler, error)
	ListenRequestOnceForDestination(channelName string, destinationId *uuid.UUID) (MessageHandler, error)
	ListenOnce(channelName string) (MessageHandler, error)
	ListenOnceForDestination(channelName string, destId *uuid.UUID) (MessageHandler, error)

	// Requests. The message is queued on the returned MessageHandler and is
	// sent when its Fire method is called.
	RequestOnce(channelName string, payload interface{}) (MessageHandler, error)
	RequestOnceForDestination(channelName string, payload interface{}, destId *uuid.UUID) (MessageHandler, error)
	RequestStream(channelName string, payload interface{}) (MessageHandler, error)
	RequestStreamForDestination(channelName string, payload interface{}, destId *uuid.UUID) (MessageHandler, error)

	// Broker connection and fabric endpoint control.
	ConnectBroker(config *bridge.BrokerConnectorConfig) (conn bridge.Connection, err error)
	StartFabricEndpoint(connectionListener stompserver.RawConnectionListener, config EndpointConfig) error
	StopFabricEndpoint() error

	// Stores and transactions.
	GetStoreManager() StoreManager
	CreateSyncTransaction() BusTransaction
	CreateAsyncTransaction() BusTransaction

	// Monitor events. AddMonitorEventListener returns the id that
	// RemoveMonitorEventListener takes as its argument type.
	AddMonitorEventListener(listener MonitorEventHandler, eventTypes ...MonitorEventType) MonitorEventListenerId
	RemoveMonitorEventListener(listenerId MonitorEventListenerId)
	SendMonitorEvent(evtType MonitorEventType, entityName string, data interface{})
}
EventBus provides access to ChannelManager, simple message sending and simple API calls for handling messaging and error handling over channels on the bus.
func NewEventBusInstance ¶
func NewEventBusInstance() EventBus
type FabricEndpoint ¶
// FabricEndpoint is something that can be started and stopped.
// NOTE(review): presumably the endpoint driven by
// EventBus.StartFabricEndpoint / StopFabricEndpoint - confirm.
type FabricEndpoint interface {
	Start()
	Stop()
}
type MessageErrorFunction ¶
// MessageErrorFunction is the error-callback signature; it is the type of the
// errorHandler parameter of MessageHandler.Handle and BusTransaction.OnError.
type MessageErrorFunction func(error)
Signature used for all functions used on bus stream APIs to Handle errors.
type MessageHandler ¶
// MessageHandler is the handle returned by the EventBus Listen* and Request*
// calls.
type MessageHandler interface {
	// GetId returns the ID the handler is listening for.
	GetId() *uuid.UUID
	GetDestinationId() *uuid.UUID
	// Handle registers a success function and an error function as handlers.
	Handle(successHandler MessageHandlerFunction, errorHandler MessageErrorFunction)
	// Fire sends the message that was queued when the handler was obtained
	// through RequestOnce or RequestStream.
	Fire() error
	Close()
}
MessageHandler provides access to the ID the handler is listening for across all messages. It also provides a Handle method that accepts a success function and an error function as handlers. The Fire method fires the message that was queued when using RequestOnce or RequestStream.
type MessageHandlerFunction ¶
Signature used for all functions used on bus stream APIs to Handle messages.
type MonitorEvent ¶
// MonitorEvent is the value delivered to a MonitorEventHandler.
type MonitorEvent struct {
	// Type of the event
	EventType MonitorEventType
	// The name of the channel or the store related to this event
	EntityName string
	// Optional event data
	Data interface{}
}
func NewMonitorEvent ¶
func NewMonitorEvent(evtType MonitorEventType, entityName string, data interface{}) *MonitorEvent
Create a new monitor event
type MonitorEventHandler ¶
// MonitorEventHandler is a callback that receives a *MonitorEvent; it is the
// listener type accepted by EventBus.AddMonitorEventListener.
type MonitorEventHandler func(event *MonitorEvent)
type MonitorEventListenerId ¶
// MonitorEventListenerId is the value returned by
// EventBus.AddMonitorEventListener and accepted by
// EventBus.RemoveMonitorEventListener.
type MonitorEventListenerId int
type MonitorEventType ¶
// MonitorEventType identifies the kind of a MonitorEvent.
type MonitorEventType int32

// Monitor event kinds. The values are assigned by iota starting at 0 for
// ChannelCreatedEvt, so the declaration order determines each numeric value.
const (
	ChannelCreatedEvt MonitorEventType = iota
	ChannelDestroyedEvt
	ChannelSubscriberJoinedEvt
	ChannelSubscriberLeftEvt
	StoreCreatedEvt
	StoreDestroyedEvt
	StoreInitializedEvt
	BrokerSubscribedEvt
	BrokerUnsubscribedEvt
	FabricEndpointSubscribeEvt
	FabricEndpointUnsubscribeEvt
)
type MutationRequest ¶
// MutationRequest is the value handed to a MutationRequestHandlerFunction.
// Its four fields have the same names and types as the parameters of
// BusStore.Mutate.
// NOTE(review): presumably populated directly from a Mutate call - confirm.
type MutationRequest struct {
	Request        interface{}
	RequestType    interface{}
	SuccessHandler func(interface{})
	ErrorHandler   func(interface{})
}
type MutationRequestHandlerFunction ¶
// MutationRequestHandlerFunction is the handler type accepted by
// MutationStoreStream.Subscribe.
type MutationRequestHandlerFunction func(mutationReq *MutationRequest)
type MutationStoreStream ¶
// MutationStoreStream is the stream returned by BusStore.OnMutationRequest.
type MutationStoreStream interface {
	// Subscribe to the mutation requests stream.
	Subscribe(handler MutationRequestHandlerFunction) error
	// Unsubscribe from the stream.
	Unsubscribe() error
}
Interface for subscribing to mutation requests.
type StompSessionEvent ¶ added in v1.3.0
// StompSessionEvent pairs an Id with a stompserver.StompSessionEventType.
// NOTE(review): presumably Id is the STOMP session id and these events are
// published on STOMP_SESSION_NOTIFY_CHANNEL - confirm against the publisher.
type StompSessionEvent struct {
	Id        string
	EventType stompserver.StompSessionEventType
}
type StoreChange ¶
// StoreChange describes a single store item change; it is the value passed to
// a StoreChangeHandlerFunction.
type StoreChange struct {
	Id             string      // the id of the updated item
	Value          interface{} // the updated value of the item
	State          interface{} // state associated with this change
	IsDeleteChange bool        // true if the item was removed from the store
	StoreVersion   int64       // the store's version when this change was made
}
Describes a single store item change
type StoreChangeHandlerFunction ¶
// StoreChangeHandlerFunction is the handler type accepted by
// StoreStream.Subscribe.
type StoreChangeHandlerFunction func(change *StoreChange)
type StoreManager ¶
// StoreManager is the interface through which BusStores are created, looked
// up, opened from a broker connection, and destroyed.
type StoreManager interface {
	// Create a new Store. If the store already exists, the existing store is returned.
	CreateStore(name string) BusStore
	// Create a new Store and use the itemType to deserialize item values when handling
	// incoming UpdateStoreRequest. If the store already exists, the method will return
	// the existing store instance.
	CreateStoreWithType(name string, itemType reflect.Type) BusStore
	// Get a reference to the existing store. Returns nil if the store doesn't exist.
	GetStore(name string) BusStore
	// Deletes a store.
	DestroyStore(name string) bool
	// Configure galactic store sync channel for a given connection.
	// Should be called before OpenGalacticStore() and OpenGalacticStoreWithItemType() APIs.
	ConfigureStoreSyncChannel(conn bridge.Connection, topicPrefix string, pubPrefix string) error
	// Open a new galactic store.
	OpenGalacticStore(name string, conn bridge.Connection) (BusStore, error)
	// Open a new galactic store and deserialize items from the server to itemType.
	OpenGalacticStoreWithItemType(name string, conn bridge.Connection, itemType reflect.Type) (BusStore, error)
}
StoreManager interface controls all access to BusStores
type StoreStream ¶
// StoreStream is the stream returned by BusStore.OnChange and
// BusStore.OnAllChanges.
type StoreStream interface {
	// Subscribe to the store changes stream.
	Subscribe(handler StoreChangeHandlerFunction) error
	// Unsubscribe from the stream.
	Unsubscribe() error
}
Interface for subscribing to store changes.