bus

package
v0.4.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 13, 2024 License: BSD-2-Clause Imports: 13 Imported by: 11

Documentation

Overview

Package bus contains all things bus.

Example (CreateBus)

Basic bus use.

package main

import ()

func main() {

}
Output:

Example (UsingGalacticChannels)
package main

import (
	"encoding/json"
	"fmt"
	"github.com/pb33f/ranch/bridge"
	"github.com/pb33f/ranch/bus"
	"github.com/pb33f/ranch/model"
	"log"
)

func main() {
	// get a pointer to the bus.
	b := bus.GetBus()

	// get a pointer to the channel manager
	cm := b.GetChannelManager()

	channel := "my-stream"
	cm.CreateChannel(channel)

	// create done signal
	var done = make(chan bool)

	// listen to stream of messages coming in on channel.
	h, err := b.ListenStream(channel)

	if err != nil {
		log.Panicf("unable to listen to channel stream, error: %e", err)
	}

	count := 0

	// listen for five messages and then exit, send a completed signal on channel.
	h.Handle(
		func(msg *model.Message) {

			// unmarshal the payload into a Response object (used by fabric services)
			r := &model.Response{}
			d := msg.Payload.([]byte)
			json.Unmarshal(d, &r)
			fmt.Printf("Stream Ticked: %s\n", r.Payload.(string))
			count++
			if count >= 5 {
				done <- true
			}
		},
		func(err error) {
			log.Panicf("error received on channel %e", err)
		})

	// create a broker connector config, in this case, we will connect to the application fabric demo endpoint.
	config := &bridge.BrokerConnectorConfig{
		Username:   "guest",
		Password:   "guest",
		ServerAddr: "appfabric.vmware.com",
		WebSocketConfig: &bridge.WebSocketConfig{
			WSPath: "/fabric",
		},
		UseWS: true}

	// connect to broker.
	c, err := b.ConnectBroker(config)
	if err != nil {
		log.Panicf("unable to connect to fabric, error: %e", err)
	}

	// mark our local channel as galactic and map it to our connection and the /topic/simple-stream service
	// running on appfabric.vmware.com
	err = cm.MarkChannelAsGalactic(channel, "/topic/simple-stream", c)
	if err != nil {
		log.Panicf("unable to map local channel to broker destination: %e", err)
	}

	// wait for done signal
	<-done

	// mark channel as local (unsubscribe from all mappings)
	err = cm.MarkChannelAsLocal(channel)
	if err != nil {
		log.Panicf("unable to unsubscribe, error: %e", err)
	}
	err = c.Disconnect()
	if err != nil {
		log.Panicf("unable to disconnect, error: %e", err)
	}
}
Output:

Index

Examples

Constants

View Source
const RANCH_INTERNAL_CHANNEL_PREFIX = "_ranchInternal/"
View Source
const (
	STOMP_SESSION_NOTIFY_CHANNEL = RANCH_INTERNAL_CHANNEL_PREFIX + "stomp-session-notify"
)

Variables

This section is empty.

Functions

func EnableLogging

func EnableLogging(enable bool)

Types

type BusStore

type BusStore interface {
	// Get the name (the id) of the store.
	GetName() string
	// Add new or updates existing item in the store.
	Put(id string, value interface{}, state interface{})
	// Returns an item from the store and a boolean flag
	// indicating whether the item exists
	Get(id string) (interface{}, bool)
	// Shorten version of the Get() method, returns only the item value.
	GetValue(id string) interface{}
	// Remove an item from the store. Returns true if the remove operation was successful.
	Remove(id string, state interface{}) bool
	// Return a slice containing all store items.
	AllValues() []interface{}
	// Return a map with all items from the store.
	AllValuesAsMap() map[string]interface{}
	// Return a map with all items from the store with the current store version.
	AllValuesAndVersion() (map[string]interface{}, int64)
	// Subscribe to state changes for a specific object.
	OnChange(id string, state ...interface{}) StoreStream
	// Subscribe to state changes for all objects
	OnAllChanges(state ...interface{}) StoreStream
	// Notify when the store has been initialize (via populate() or initialize()
	WhenReady(readyFunction func())
	// Populate the store with a map of items and their ID's.
	Populate(items map[string]interface{}) error
	// Mark the store as initialized and notify all watchers.
	Initialize()
	// Subscribe to mutation requests made via mutate() method.
	OnMutationRequest(mutationType ...interface{}) MutationStoreStream
	// Send a mutation request to any subscribers handling mutations.
	Mutate(request interface{}, requestType interface{},
		successHandler func(interface{}), errorHandler func(interface{}))
	// Removes all items from the store and change its state to uninitialized".
	Reset()
	// Returns true if this is galactic store.
	IsGalactic() bool
	// Get the item type if such is specified during the creation of the
	// store
	GetItemType() reflect.Type
}

BusStore is a stateful in memory cache for objects. All state changes (any time the cache is modified) will broadcast that updated object to any subscribers of the BusStore for those specific objects or all objects of a certain type and state changes.

type BusTransaction

type BusTransaction interface {
	// Sends a request to a channel as a part of this transaction.
	SendRequest(channel string, payload interface{}) error
	//  Wait for a store to be initialized as a part of this transaction.
	WaitForStoreReady(storeName string) error
	// Registers a new complete handler. Once all responses to requests have been received,
	// the transaction is complete.
	OnComplete(completeHandler BusTransactionReadyFunction) error
	// Register a new error handler. If an error is thrown by any of the responders, the transaction
	// is aborted and the error sent to the registered errorHandlers.
	OnError(errorHandler MessageErrorFunction) error
	// Commit the transaction, all requests will be sent and will wait for responses.
	// Once all the responses are in, onComplete handlers will be called with the responses.
	Commit() error
}

type BusTransactionReadyFunction

type BusTransactionReadyFunction func(responses []*model.Message)

type Channel

type Channel struct {
	Name string `json:"string"`
	// contains filtered or unexported fields
}

Channel represents the stream and the subscribed event handlers waiting for ticks on the stream

func NewChannel

func NewChannel(channelName string) *Channel

Create a new Channel with the supplied Channel name. Returns a pointer to that Channel.

func (*Channel) ContainsHandlers

func (channel *Channel) ContainsHandlers() bool

Check if the Channel has any registered subscribers

func (*Channel) IsGalactic

func (channel *Channel) IsGalactic() bool

Returns true is the Channel is marked as galactic

func (*Channel) IsPrivate

func (channel *Channel) IsPrivate() bool

Returns true if the Channel is marked as private

func (*Channel) Send

func (channel *Channel) Send(message *model.Message)

Send a new message on this Channel, to all event handlers.

func (*Channel) SetGalactic

func (channel *Channel) SetGalactic(mappedDestination string)

Mark the Channel as galactic

func (*Channel) SetLocal

func (channel *Channel) SetLocal()

Mark the Channel as local

func (*Channel) SetPrivate

func (channel *Channel) SetPrivate(private bool)

Mark the Channel as private

type ChannelManager

type ChannelManager interface {
	CreateChannel(channelName string) *Channel
	DestroyChannel(channelName string)
	CheckChannelExists(channelName string) bool
	GetChannel(channelName string) (*Channel, error)
	GetAllChannels() map[string]*Channel
	SubscribeChannelHandler(channelName string, fn MessageHandlerFunction, runOnce bool) (*uuid.UUID, error)
	UnsubscribeChannelHandler(channelName string, id *uuid.UUID) error
	WaitForChannel(channelName string) error
	MarkChannelAsGalactic(channelName string, brokerDestination string, connection bridge.Connection) (err error)
	MarkChannelAsLocal(channelName string) (err error)
}

ChannelManager interfaces controls all access to channels vis the bus.

func NewBusChannelManager

func NewBusChannelManager(bus EventBus) ChannelManager

type EndpointConfig

type EndpointConfig struct {
	// Prefix for public topics e.g. "/topic"
	TopicPrefix string
	// Prefix for user queues e.g. "/user/queue"
	UserQueuePrefix string
	// Prefix used for public application requests e.g. "/pub"
	AppRequestPrefix string
	// Prefix used for "private" application requests e.g. "/pub/queue"
	// Requests sent to destinations prefixed with the AppRequestQueuePrefix
	// should generate responses sent to single client queue.
	// E.g. if a client sends a request to the "/pub/queue/sample-channel" destination
	// the application should sent the response only to this client on the
	// "/user/queue/sample-channel" destination.
	// This behavior will mimic the Spring SimpleMessageBroker implementation.
	AppRequestQueuePrefix string
	Heartbeat             int64
}

type EventBus

type EventBus interface {
	GetId() *uuid.UUID
	GetChannelManager() ChannelManager
	SendRequestMessage(channelName string, payload interface{}, destinationId *uuid.UUID) error
	SendResponseMessage(channelName string, payload interface{}, destinationId *uuid.UUID) error
	SendBroadcastMessage(channelName string, payload interface{}) error
	SendErrorMessage(channelName string, err error, destinationId *uuid.UUID) error
	ListenStream(channelName string) (MessageHandler, error)
	ListenStreamForDestination(channelName string, destinationId *uuid.UUID) (MessageHandler, error)
	ListenFirehose(channelName string) (MessageHandler, error)
	ListenRequestStream(channelName string) (MessageHandler, error)
	ListenRequestStreamForDestination(channelName string, destinationId *uuid.UUID) (MessageHandler, error)
	ListenRequestOnce(channelName string) (MessageHandler, error)
	ListenRequestOnceForDestination(channelName string, destinationId *uuid.UUID) (MessageHandler, error)
	ListenOnce(channelName string) (MessageHandler, error)
	ListenOnceForDestination(channelName string, destId *uuid.UUID) (MessageHandler, error)
	RequestOnce(channelName string, payload interface{}) (MessageHandler, error)
	RequestOnceForDestination(channelName string, payload interface{}, destId *uuid.UUID) (MessageHandler, error)
	RequestStream(channelName string, payload interface{}) (MessageHandler, error)
	RequestStreamForDestination(channelName string, payload interface{}, destId *uuid.UUID) (MessageHandler, error)
	ConnectBroker(config *bridge.BrokerConnectorConfig) (conn bridge.Connection, err error)
	StartFabricEndpoint(connectionListener stompserver.RawConnectionListener, config EndpointConfig) error
	StopFabricEndpoint() error
	GetStoreManager() StoreManager
	CreateSyncTransaction() BusTransaction
	CreateAsyncTransaction() BusTransaction
	AddMonitorEventListener(listener MonitorEventHandler, eventTypes ...MonitorEventType) MonitorEventListenerId
	RemoveMonitorEventListener(listenerId MonitorEventListenerId)
	SendMonitorEvent(evtType MonitorEventType, entityName string, data interface{})
}

EventBus provides access to ChannelManager, simple message sending and simple API calls for handling messaging and error handling over channels on the bus.

func GetBus

func GetBus() EventBus

Get a reference to the EventBus.

func NewEventBusInstance

func NewEventBusInstance() EventBus

func ResetBus

func ResetBus() EventBus

ResetBus destroys existing bus instance and creates a new one

type FabricEndpoint

type FabricEndpoint interface {
	Start()
	Stop()
}

type MessageErrorFunction

type MessageErrorFunction func(error)

Signature used for all functions used on bus stream APIs to Handle errors.

type MessageHandler

type MessageHandler interface {
	GetId() *uuid.UUID
	GetDestinationId() *uuid.UUID
	Handle(successHandler MessageHandlerFunction, errorHandler MessageErrorFunction)
	Fire() error
	Close()
}

MessageHandler provides access to the ID the handler is listening for from all messages It also provides a Handle method that accepts a success and error function as handlers. The Fire method will fire the message queued when using RequestOnce or RequestStream

type MessageHandlerFunction

type MessageHandlerFunction func(*model.Message)

Signature used for all functions used on bus stream APIs to Handle messages.

type MonitorEvent

type MonitorEvent struct {
	// Type of the event
	EventType MonitorEventType
	// The name of the channel or the store related to this event
	EntityName string
	// Optional event data
	Data interface{}
}

func NewMonitorEvent

func NewMonitorEvent(evtType MonitorEventType, entityName string, data interface{}) *MonitorEvent

Create a new monitor event

type MonitorEventHandler

type MonitorEventHandler func(event *MonitorEvent)

type MonitorEventListenerId

type MonitorEventListenerId int

type MonitorEventType

type MonitorEventType int32
const (
	ChannelCreatedEvt MonitorEventType = iota
	ChannelDestroyedEvt
	ChannelSubscriberJoinedEvt
	ChannelSubscriberLeftEvt
	StoreCreatedEvt
	StoreDestroyedEvt
	StoreInitializedEvt
	BrokerSubscribedEvt
	BrokerUnsubscribedEvt
	FabricEndpointSubscribeEvt
	FabricEndpointUnsubscribeEvt
)

type MutationRequest

type MutationRequest struct {
	Request        interface{}
	RequestType    interface{}
	SuccessHandler func(interface{})
	ErrorHandler   func(interface{})
}

type MutationRequestHandlerFunction

type MutationRequestHandlerFunction func(mutationReq *MutationRequest)

type MutationStoreStream

type MutationStoreStream interface {
	// Subscribe to the mutation requests stream.
	Subscribe(handler MutationRequestHandlerFunction) error
	// Unsubscribe from the stream.
	Unsubscribe() error
}

Interface for subscribing for mutation requests

type StompSessionEvent

type StompSessionEvent struct {
	Id        string
	EventType stompserver.StompSessionEventType
}

type StoreChange

type StoreChange struct {
	Id             string      // the id of the updated item
	Value          interface{} // the updated value of the item
	State          interface{} // state associated with this change
	IsDeleteChange bool        // true if the item was removed from the store
	StoreVersion   int64       // the store's version when this change was made
}

Describes a single store item change

type StoreChangeHandlerFunction

type StoreChangeHandlerFunction func(change *StoreChange)

type StoreManager

type StoreManager interface {
	// Create a new Store, if the store already exists, then it will be returned.
	CreateStore(name string) BusStore
	// Create a new Store and use the itemType to deserialize item values when handling
	// incoming UpdateStoreRequest. If the store already exists, the method will return
	// the existing store instance.
	CreateStoreWithType(name string, itemType reflect.Type) BusStore
	// Get a reference to the existing store. Returns nil if the store doesn't exist.
	GetStore(name string) BusStore
	// Deletes a store.
	DestroyStore(name string) bool
	// Configure galactic store sync channel for a given connection.
	// Should be called before OpenGalacticStore() and OpenGalacticStoreWithItemType() APIs.
	ConfigureStoreSyncChannel(conn bridge.Connection, topicPrefix string, pubPrefix string) error
	// Open new galactic store
	OpenGalacticStore(name string, conn bridge.Connection) (BusStore, error)
	// Open new galactic store and deserialize items from server to itemType
	OpenGalacticStoreWithItemType(name string, conn bridge.Connection, itemType reflect.Type) (BusStore, error)
}

StoreManager interface controls all access to BusStores

type StoreStream

type StoreStream interface {
	// Subscribe to the store changes stream.
	Subscribe(handler StoreChangeHandlerFunction) error
	// Unsubscribe from the stream.
	Unsubscribe() error
}

Interface for subscribing for store changes

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL