Documentation ¶
Index ¶
- Constants
- Variables
- func ConnectToQueueManager(mqconfig conf.MQConfig) (*ibmmq.MQQueueManager, error)
- type Bin
- type BridgeConnector
- type BridgeServer
- func (bridge *BridgeServer) CheckNATS() bool
- func (bridge *BridgeServer) CheckStan() bool
- func (bridge *BridgeServer) ConnectorError(connector Connector, err error)
- func (bridge *BridgeServer) GetMonitoringRootURL() string
- func (bridge *BridgeServer) HandleHealthz(w http.ResponseWriter, r *http.Request)
- func (bridge *BridgeServer) HandleRoot(w http.ResponseWriter, r *http.Request)
- func (bridge *BridgeServer) HandleVarz(w http.ResponseWriter, r *http.Request)
- func (bridge *BridgeServer) LoadConfig(config conf.BridgeConfig) error
- func (bridge *BridgeServer) LoadConfigFile(configFile string) error
- func (bridge *BridgeServer) Logger() logging.Logger
- func (bridge *BridgeServer) MQToNATSMessage(mqmd *ibmmq.MQMD, handle ibmmq.MQMessageHandle, data []byte, length int, ...) ([]byte, string, error)
- func (bridge *BridgeServer) NATS() *nats.Conn
- func (bridge *BridgeServer) NATSToMQMessage(data []byte, replyTo string, qmgr *ibmmq.MQQueueManager) (*ibmmq.MQMD, ibmmq.MQMessageHandle, []byte, error)
- func (bridge *BridgeServer) RegisterReplyInfo(desc string, config conf.ConnectorConfig)
- func (bridge *BridgeServer) SafeStats() BridgeStats
- func (bridge *BridgeServer) Stan() stan.Conn
- func (bridge *BridgeServer) Start() error
- func (bridge *BridgeServer) Stop()
- func (bridge *BridgeServer) StopMonitoring() error
- type BridgeStats
- type Connector
- func CreateConnector(config conf.ConnectorConfig, bridge *BridgeServer) (Connector, error)
- func NewNATS2QueueConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector
- func NewNATS2TopicConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector
- func NewQueue2NATSConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector
- func NewQueue2STANConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector
- func NewStan2QueueConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector
- func NewStan2TopicConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector
- func NewTopic2NATSConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector
- func NewTopic2StanConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector
- type ConnectorStats
- func (stats *ConnectorStats) AddConnect()
- func (stats *ConnectorStats) AddDisconnect()
- func (stats *ConnectorStats) AddMessageIn(bytes int64)
- func (stats *ConnectorStats) AddMessageOut(bytes int64)
- func (stats *ConnectorStats) AddRequestTime(reqTime time.Duration)
- func (stats *ConnectorStats) UpdateQuintiles()
- type Histogram
- type MQTestServer
- type NATS2QueueConnector
- type NATS2TopicConnector
- type NATSCallback
- type Queue2NATSConnector
- type Queue2STANConnector
- type ShutdownCallback
- type Stan2QueueConnector
- type Stan2TopicConnector
- type TestEnv
- func (tbs *TestEnv) Close()
- func (tbs *TestEnv) GetMessageFromQueue(qName string, waitMillis int32) (*ibmmq.MQMD, *ibmmq.MQGMO, []byte, error)
- func (tbs *TestEnv) GetQueueManagerName() string
- func (tbs *TestEnv) PutMessageOnQueue(qName string, mqmd *ibmmq.MQMD, msgData []byte) error
- func (tbs *TestEnv) PutMessageOnTopic(topicName string, mqmd *ibmmq.MQMD, msgData []byte) error
- func (tbs *TestEnv) RestartMQ(useTLS bool) error
- func (tbs *TestEnv) RestartNATS(useTLS bool) error
- func (tbs *TestEnv) StartBridge(connections []conf.ConnectorConfig, useTLS bool) error
- func (tbs *TestEnv) StartNATSandStan(useTLS bool, port int, clusterID string, clientID string, ...) error
- func (tbs *TestEnv) StopBridge()
- func (tbs *TestEnv) StopNATS() error
- type Topic2NATSConnector
- type Topic2StanConnector
Constants ¶
const ( RootPath = "/" VarzPath = "/varz" HealthzPath = "/healthz" )
HTTP endpoints
Variables ¶
var EmptyHandle ibmmq.MQMessageHandle = ibmmq.MQMessageHandle{}
EmptyHandle is used when there is no message handle to pass in
Functions ¶
func ConnectToQueueManager ¶
func ConnectToQueueManager(mqconfig conf.MQConfig) (*ibmmq.MQQueueManager, error)
ConnectToQueueManager utility to connect to a queue manager from a configuration
Types ¶
type BridgeConnector ¶
BridgeConnector is the base type used for connectors so that they can share code
func (*BridgeConnector) CheckConnections ¶
func (mq *BridgeConnector) CheckConnections() error
CheckConnections is a no-op, designed for overriding This is called when nats or stan goes down the connector should return an error if it has to be shut down
func (*BridgeConnector) ID ¶
func (mq *BridgeConnector) ID() string
ID returns the id from the stats
func (*BridgeConnector) Shutdown ¶
func (mq *BridgeConnector) Shutdown() error
Shutdown is a no-op, designed for overriding
func (*BridgeConnector) Start ¶
func (mq *BridgeConnector) Start() error
Start is a no-op, designed for overriding
func (*BridgeConnector) Stats ¶
func (mq *BridgeConnector) Stats() ConnectorStats
Stats returns a copy of the current stats for this connector
func (*BridgeConnector) String ¶
func (mq *BridgeConnector) String() string
String returns the name passed into init
type BridgeServer ¶
type BridgeServer struct {
// contains filtered or unexported fields
}
BridgeServer represents the bridge server
func NewBridgeServer ¶
func NewBridgeServer() *BridgeServer
NewBridgeServer creates a new bridge server with a default logger
func (*BridgeServer) CheckNATS ¶
func (bridge *BridgeServer) CheckNATS() bool
CheckNATS returns true if the bridge is connected to nats
func (*BridgeServer) CheckStan ¶
func (bridge *BridgeServer) CheckStan() bool
CheckStan returns true if the bridge is connected to stan
func (*BridgeServer) ConnectorError ¶
func (bridge *BridgeServer) ConnectorError(connector Connector, err error)
ConnectorError is called by a connector if it has a failure that requires a reconnect
func (*BridgeServer) GetMonitoringRootURL ¶
func (bridge *BridgeServer) GetMonitoringRootURL() string
GetMonitoringRootURL returns the protocol://host:port for the monitoring server, useful for testing
func (*BridgeServer) HandleHealthz ¶
func (bridge *BridgeServer) HandleHealthz(w http.ResponseWriter, r *http.Request)
HandleHealthz returns status 200.
func (*BridgeServer) HandleRoot ¶
func (bridge *BridgeServer) HandleRoot(w http.ResponseWriter, r *http.Request)
HandleRoot will show basic info and links to others handlers.
func (*BridgeServer) HandleVarz ¶
func (bridge *BridgeServer) HandleVarz(w http.ResponseWriter, r *http.Request)
HandleVarz returns statistics about the server.
func (*BridgeServer) LoadConfig ¶
func (bridge *BridgeServer) LoadConfig(config conf.BridgeConfig) error
LoadConfig initialize the server's configuration to an existing config object, useful for tests Does not initialize the config at all, use DefaultBridgeConfig() to create a default config
func (*BridgeServer) LoadConfigFile ¶
func (bridge *BridgeServer) LoadConfigFile(configFile string) error
LoadConfigFile initialize the server's configuration from a file
func (*BridgeServer) Logger ¶
func (bridge *BridgeServer) Logger() logging.Logger
Logger hosts a shared logger
func (*BridgeServer) MQToNATSMessage ¶
func (bridge *BridgeServer) MQToNATSMessage(mqmd *ibmmq.MQMD, handle ibmmq.MQMessageHandle, data []byte, length int, qmgr *ibmmq.MQQueueManager) ([]byte, string, error)
MQToNATSMessage convert an incoming MQ message to a set of NATS bytes and a reply subject if the qmgr is nil, the return value is just the message body if the qmgr is not nil the message is encoded as a BridgeMessage The data array is always just bytes from MQ, and is not an encoded BridgeMessage Header fields that are byte arrays are trimmed, "\x00" removed, on conversion to BridgeMessage.Header
func (*BridgeServer) NATS ¶
func (bridge *BridgeServer) NATS() *nats.Conn
NATS hosts a shared nats connection for the connectors
func (*BridgeServer) NATSToMQMessage ¶
func (bridge *BridgeServer) NATSToMQMessage(data []byte, replyTo string, qmgr *ibmmq.MQQueueManager) (*ibmmq.MQMD, ibmmq.MQMessageHandle, []byte, error)
NATSToMQMessage decode an incoming nats message to an MQ message if the qmgr is nil, data is considered to just be a message body if the qmgr is not nil the message is treated as an encoded BridgeMessage The returned byte array just bytes from MQ, and is not an encoded BridgeMessage Header fields that are byte arrays are padded, "\x00" added, on conversion from BridgeMessage.Header
func (*BridgeServer) RegisterReplyInfo ¶
func (bridge *BridgeServer) RegisterReplyInfo(desc string, config conf.ConnectorConfig)
RegisterReplyInfo tracks incoming descriptions so that reply to values can be mapped correctly
func (*BridgeServer) SafeStats ¶
func (bridge *BridgeServer) SafeStats() BridgeStats
SafeStats grabs the lock then calls stats(), useful for tests
func (*BridgeServer) Stan ¶
func (bridge *BridgeServer) Stan() stan.Conn
Stan hosts a shared streaming connection for the connectors
func (*BridgeServer) Start ¶
func (bridge *BridgeServer) Start() error
Start the server, will lock the server, assumes the config is loaded
func (*BridgeServer) StopMonitoring ¶
func (bridge *BridgeServer) StopMonitoring() error
StopMonitoring shuts down the http server used for monitoring expects the lock to be held
type BridgeStats ¶
type BridgeStats struct { StartTime int64 `json:"start_time"` ServerTime int64 `json:"current_time"` UpTime string `json:"uptime"` Connections []ConnectorStats `json:"connectors"` HTTPRequests map[string]int64 `json:"http_requests"` }
BridgeStats wraps the current status of the bridge and all of its connectors
type Connector ¶
type Connector interface { Start() error Shutdown() error CheckConnections() error String() string ID() string Stats() ConnectorStats }
Connector is the abstraction for all of the bridge connector types
func CreateConnector ¶
func CreateConnector(config conf.ConnectorConfig, bridge *BridgeServer) (Connector, error)
CreateConnector builds a connector from the supplied configuration
func NewNATS2QueueConnector ¶
func NewNATS2QueueConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector
NewNATS2QueueConnector create a nats to MQ connector
func NewNATS2TopicConnector ¶
func NewNATS2TopicConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector
NewNATS2TopicConnector create a nats to MQ connector
func NewQueue2NATSConnector ¶
func NewQueue2NATSConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector
NewQueue2NATSConnector create a new MQ to Stan connector
func NewQueue2STANConnector ¶
func NewQueue2STANConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector
NewQueue2STANConnector create a new MQ to Stan connector
func NewStan2QueueConnector ¶
func NewStan2QueueConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector
NewStan2QueueConnector create a new Stan to MQ connector
func NewStan2TopicConnector ¶
func NewStan2TopicConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector
NewStan2TopicConnector create a new Stan to MQ connector
func NewTopic2NATSConnector ¶
func NewTopic2NATSConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector
NewTopic2NATSConnector create a new MQ to Stan connector
func NewTopic2StanConnector ¶
func NewTopic2StanConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector
NewTopic2StanConnector create a new MQ to Stan connector
type ConnectorStats ¶
type ConnectorStats struct { Name string `json:"name"` ID string `json:"id"` Connected bool `json:"connected"` Connects int64 `json:"connects"` Disconnects int64 `json:"disconnects"` BytesIn int64 `json:"bytes_in"` BytesOut int64 `json:"bytes_out"` MessagesIn int64 `json:"msg_in"` MessagesOut int64 `json:"msg_out"` RequestCount int64 `json:"count"` MovingAverage float64 `json:"rma"` Quintile50 float64 `json:"q50"` Quintile75 float64 `json:"q75"` Quintile90 float64 `json:"q90"` Quintile95 float64 `json:"q95"` // contains filtered or unexported fields }
ConnectorStats captures the statistics for a single connector
func NewConnectorStats ¶
func NewConnectorStats() ConnectorStats
NewConnectorStats creates an empty stats, and initializes the request time histogram
func (*ConnectorStats) AddConnect ¶
func (stats *ConnectorStats) AddConnect()
AddConnect updates the reconnects field
func (*ConnectorStats) AddDisconnect ¶
func (stats *ConnectorStats) AddDisconnect()
AddDisconnect updates the disconnects field
func (*ConnectorStats) AddMessageIn ¶
func (stats *ConnectorStats) AddMessageIn(bytes int64)
AddMessageIn updates the messages in and bytes in fields
func (*ConnectorStats) AddMessageOut ¶
func (stats *ConnectorStats) AddMessageOut(bytes int64)
AddMessageOut updates the messages out and bytes out fields
func (*ConnectorStats) AddRequestTime ¶
func (stats *ConnectorStats) AddRequestTime(reqTime time.Duration)
AddRequestTime register a time, updating the request count, RMA and histogram For information on the running moving average, see https://en.wikipedia.org/wiki/Moving_average
func (*ConnectorStats) UpdateQuintiles ¶
func (stats *ConnectorStats) UpdateQuintiles()
UpdateQuintiles updates the quantile fields, these are not updated on each request to reduce the cost of tracking statistics
type Histogram ¶
type Histogram struct { Bins []Bin `json:"bins"` MaxBins int `json:"max"` Total uint64 `json:"total"` }
Histogram stores N bins using the streaming approximate histogram approach The histogram is not thread safe
func NewHistogram ¶
NewHistogram returns a new Histogram with a maximum of n bins.
There is no "optimal" bin count, but somewhere between 20 and 80 bins should be sufficient.
type MQTestServer ¶
type MQTestServer struct { QueueManager string CID string AppHostPort string WebHostPort string AppPort int }
MQTestServer is based on - https://ericchiang.github.io/post/testing-dbs-with-docker/ MQTestServer wraps an MQ server running in docker
func StartMQTestServer ¶
func StartMQTestServer(waitForStart time.Duration, useTLS bool, mqPort int) (*MQTestServer, *ibmmq.MQQueueManager, error)
StartMQTestServer creates a test db in docker
type NATS2QueueConnector ¶
type NATS2QueueConnector struct { BridgeConnector // contains filtered or unexported fields }
NATS2QueueConnector connects a NATS subject to an MQ queue
func (*NATS2QueueConnector) CheckConnections ¶
func (mq *NATS2QueueConnector) CheckConnections() error
CheckConnections ensures the nats/stan connection and report an error if it is down
func (*NATS2QueueConnector) Shutdown ¶
func (mq *NATS2QueueConnector) Shutdown() error
Shutdown the connector
func (*NATS2QueueConnector) Start ¶
func (mq *NATS2QueueConnector) Start() error
Start the connector
type NATS2TopicConnector ¶
type NATS2TopicConnector struct { BridgeConnector // contains filtered or unexported fields }
NATS2TopicConnector connects a NATS subject to an MQ topic
func (*NATS2TopicConnector) CheckConnections ¶
func (mq *NATS2TopicConnector) CheckConnections() error
CheckConnections ensures the nats/stan connection and report an error if it is down
func (*NATS2TopicConnector) Shutdown ¶
func (mq *NATS2TopicConnector) Shutdown() error
Shutdown the connector
func (*NATS2TopicConnector) Start ¶
func (mq *NATS2TopicConnector) Start() error
Start the connector
type NATSCallback ¶
NATSCallback used by mq-nats connectors in an MQ library callback The lock will be held by the caller!
type Queue2NATSConnector ¶
type Queue2NATSConnector struct { BridgeConnector // contains filtered or unexported fields }
Queue2NATSConnector connects an MQ queue to a NATS subject
func (*Queue2NATSConnector) CheckConnections ¶
func (mq *Queue2NATSConnector) CheckConnections() error
CheckConnections ensures the nats/stan connection and report an error if it is down
func (*Queue2NATSConnector) Shutdown ¶
func (mq *Queue2NATSConnector) Shutdown() error
Shutdown the connector
func (*Queue2NATSConnector) Start ¶
func (mq *Queue2NATSConnector) Start() error
Start the connector
type Queue2STANConnector ¶
type Queue2STANConnector struct { BridgeConnector // contains filtered or unexported fields }
Queue2STANConnector connects an MQ queue to a NATS subject
func (*Queue2STANConnector) CheckConnections ¶
func (mq *Queue2STANConnector) CheckConnections() error
CheckConnections ensures the nats/stan connection and report an error if it is down
func (*Queue2STANConnector) Shutdown ¶
func (mq *Queue2STANConnector) Shutdown() error
Shutdown the connector
func (*Queue2STANConnector) Start ¶
func (mq *Queue2STANConnector) Start() error
Start the connector
type ShutdownCallback ¶
type ShutdownCallback func() error
ShutdownCallback is returned when setting up a callback or polling so the connector can shut it down
type Stan2QueueConnector ¶
type Stan2QueueConnector struct { BridgeConnector // contains filtered or unexported fields }
Stan2QueueConnector connects a STAN channel to an MQ Queue
func (*Stan2QueueConnector) CheckConnections ¶
func (mq *Stan2QueueConnector) CheckConnections() error
CheckConnections ensures the nats/stan connection and report an error if it is down
func (*Stan2QueueConnector) Shutdown ¶
func (mq *Stan2QueueConnector) Shutdown() error
Shutdown the connector
func (*Stan2QueueConnector) Start ¶
func (mq *Stan2QueueConnector) Start() error
Start the connector
type Stan2TopicConnector ¶
type Stan2TopicConnector struct { BridgeConnector // contains filtered or unexported fields }
Stan2TopicConnector connects a STAN channel to an MQ Topic
func (*Stan2TopicConnector) CheckConnections ¶
func (mq *Stan2TopicConnector) CheckConnections() error
CheckConnections ensures the nats/stan connection and report an error if it is down
func (*Stan2TopicConnector) Shutdown ¶
func (mq *Stan2TopicConnector) Shutdown() error
Shutdown the connector
func (*Stan2TopicConnector) Start ¶
func (mq *Stan2TopicConnector) Start() error
Start the connector
type TestEnv ¶
type TestEnv struct { MQServer *MQTestServer QMgr *ibmmq.MQQueueManager // For bypassing the bridge connection GNATSD *ns.Server Stan *nss.StanServer NC *nats.Conn // for bypassing the bridge SC stan.Conn // for bypassing the bridge Bridge *BridgeServer Config *conf.BridgeConfig // contains filtered or unexported fields }
TestEnv encapsulate a bridge test environment
func StartTLSTestEnvironment ¶
func StartTLSTestEnvironment(connections []conf.ConnectorConfig) (*TestEnv, error)
StartTLSTestEnvironment calls StartTestEnvironmentInfrastructure followed by StartBridge, with TLS enabled
func StartTestEnvironment ¶
func StartTestEnvironment(connections []conf.ConnectorConfig) (*TestEnv, error)
StartTestEnvironment calls StartTestEnvironmentInfrastructure followed by StartBridge
func StartTestEnvironmentInfrastructure ¶
StartTestEnvironmentInfrastructure creates the MQMgr, Nats and streaming but does not start a bridge, you can use StartBridge to start a bridge afterward
func (*TestEnv) Close ¶
func (tbs *TestEnv) Close()
Close the bridge server and clean up the test environment
func (*TestEnv) GetMessageFromQueue ¶
func (tbs *TestEnv) GetMessageFromQueue(qName string, waitMillis int32) (*ibmmq.MQMD, *ibmmq.MQGMO, []byte, error)
GetMessageFromQueue uses the test environments extra connection to talk to the queue, bypassing the bridge's connection
func (*TestEnv) GetQueueManagerName ¶
GetQueueManagerName get the queue manager name for the test MQ server
func (*TestEnv) PutMessageOnQueue ¶
PutMessageOnQueue uses the test environments extra connection to talk to the queue, bypassing the bridge's connection
func (*TestEnv) PutMessageOnTopic ¶
PutMessageOnTopic uses the test environments extra connection to talk to the topic, bypassing the bridge's connection
func (*TestEnv) RestartNATS ¶
RestartNATS shuts down the NATS and stan server and then starts it again
func (*TestEnv) StartBridge ¶
func (tbs *TestEnv) StartBridge(connections []conf.ConnectorConfig, useTLS bool) error
StartBridge is the second half of StartTestEnvironment it is provided separately so that environment can be created before the bridge runs
type Topic2NATSConnector ¶
type Topic2NATSConnector struct { BridgeConnector // contains filtered or unexported fields }
Topic2NATSConnector connects an MQ queue to a NATS subject
func (*Topic2NATSConnector) CheckConnections ¶
func (mq *Topic2NATSConnector) CheckConnections() error
CheckConnections ensures the nats/stan connection and report an error if it is down
func (*Topic2NATSConnector) Shutdown ¶
func (mq *Topic2NATSConnector) Shutdown() error
Shutdown the connector
func (*Topic2NATSConnector) Start ¶
func (mq *Topic2NATSConnector) Start() error
Start the connector
type Topic2StanConnector ¶
type Topic2StanConnector struct { BridgeConnector // contains filtered or unexported fields }
Topic2StanConnector connects an MQ queue to a NATS channel
func (*Topic2StanConnector) CheckConnections ¶
func (mq *Topic2StanConnector) CheckConnections() error
CheckConnections ensures the nats/stan connection and report an error if it is down
func (*Topic2StanConnector) Shutdown ¶
func (mq *Topic2StanConnector) Shutdown() error
Shutdown the connector
func (*Topic2StanConnector) Start ¶
func (mq *Topic2StanConnector) Start() error
Start the connector