core

package
v0.0.0-...-67cce4b
Warning

This package is not in the latest version of its module.

Published: Feb 9, 2023 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Constants

const (
	RootPath    = "/"
	VarzPath    = "/varz"
	HealthzPath = "/healthz"
)

HTTP endpoints
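As a rough illustration of how these three paths could be served, the sketch below wires them onto a standard `http.ServeMux`. The helper names `newMonitoringMux` and `statusFor` are hypothetical, and the handler bodies are placeholders rather than the package's HandleRoot, HandleVarz and HandleHealthz implementations.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Path constants copied from the package documentation.
const (
	RootPath    = "/"
	VarzPath    = "/varz"
	HealthzPath = "/healthz"
)

// newMonitoringMux is a hypothetical helper, not part of the package.
// The handlers are placeholders standing in for the real ones.
func newMonitoringMux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc(RootPath, func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "see %s and %s\n", VarzPath, HealthzPath)
	})
	mux.HandleFunc(VarzPath, func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, `{}`)
	})
	mux.HandleFunc(HealthzPath, func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	return mux
}

// statusFor issues an in-memory GET request and returns the status code.
func statusFor(mux *http.ServeMux, path string) int {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	mux := newMonitoringMux()
	for _, p := range []string{RootPath, VarzPath, HealthzPath} {
		fmt.Println(p, statusFor(mux, p))
	}
}
```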

Variables

EmptyHandle is used when there is no message handle to pass in

Functions

func ConnectToQueueManager

func ConnectToQueueManager(mqconfig conf.MQConfig) (*ibmmq.MQQueueManager, error)

ConnectToQueueManager is a utility that connects to a queue manager using the supplied configuration

Types

type Bin

type Bin struct {
	Value float64 `json:"v"`
	Count float64 `json:"c"`
}

Bin holds a float64 value and count

type BridgeConnector

type BridgeConnector struct {
	sync.Mutex
	// contains filtered or unexported fields
}

BridgeConnector is the base type used for connectors so that they can share code

func (*BridgeConnector) CheckConnections

func (mq *BridgeConnector) CheckConnections() error

CheckConnections is a no-op, designed for overriding. It is called when NATS or STAN goes down; the connector should return an error if it has to be shut down.

func (*BridgeConnector) ID

func (mq *BridgeConnector) ID() string

ID returns the id from the stats

func (*BridgeConnector) Shutdown

func (mq *BridgeConnector) Shutdown() error

Shutdown is a no-op, designed for overriding

func (*BridgeConnector) Start

func (mq *BridgeConnector) Start() error

Start is a no-op, designed for overriding

func (*BridgeConnector) Stats

func (mq *BridgeConnector) Stats() ConnectorStats

Stats returns a copy of the current stats for this connector

func (*BridgeConnector) String

func (mq *BridgeConnector) String() string

String returns the name passed into init

type BridgeServer

type BridgeServer struct {
	// contains filtered or unexported fields
}

BridgeServer represents the bridge server

func NewBridgeServer

func NewBridgeServer() *BridgeServer

NewBridgeServer creates a new bridge server with a default logger

func (*BridgeServer) CheckNATS

func (bridge *BridgeServer) CheckNATS() bool

CheckNATS returns true if the bridge is connected to nats

func (*BridgeServer) CheckStan

func (bridge *BridgeServer) CheckStan() bool

CheckStan returns true if the bridge is connected to stan

func (*BridgeServer) ConnectorError

func (bridge *BridgeServer) ConnectorError(connector Connector, err error)

ConnectorError is called by a connector if it has a failure that requires a reconnect

func (*BridgeServer) GetMonitoringRootURL

func (bridge *BridgeServer) GetMonitoringRootURL() string

GetMonitoringRootURL returns the protocol://host:port for the monitoring server, useful for testing

func (*BridgeServer) HandleHealthz

func (bridge *BridgeServer) HandleHealthz(w http.ResponseWriter, r *http.Request)

HandleHealthz returns status 200.

func (*BridgeServer) HandleRoot

func (bridge *BridgeServer) HandleRoot(w http.ResponseWriter, r *http.Request)

HandleRoot shows basic info and links to the other handlers.

func (*BridgeServer) HandleVarz

func (bridge *BridgeServer) HandleVarz(w http.ResponseWriter, r *http.Request)

HandleVarz returns statistics about the server.

func (*BridgeServer) LoadConfig

func (bridge *BridgeServer) LoadConfig(config conf.BridgeConfig) error

LoadConfig initializes the server's configuration from an existing config object, which is useful for tests. It does not initialize the config object itself; use DefaultBridgeConfig() to create a default config.

func (*BridgeServer) LoadConfigFile

func (bridge *BridgeServer) LoadConfigFile(configFile string) error

LoadConfigFile initializes the server's configuration from a file

func (*BridgeServer) Logger

func (bridge *BridgeServer) Logger() logging.Logger

Logger hosts a shared logger

func (*BridgeServer) MQToNATSMessage

func (bridge *BridgeServer) MQToNATSMessage(mqmd *ibmmq.MQMD, handle ibmmq.MQMessageHandle, data []byte, length int, qmgr *ibmmq.MQQueueManager) ([]byte, string, error)

MQToNATSMessage converts an incoming MQ message to a set of NATS bytes and a reply subject. If the qmgr is nil, the return value is just the message body; if the qmgr is not nil, the message is encoded as a BridgeMessage. The data array is always just bytes from MQ, and is not an encoded BridgeMessage. Header fields that are byte arrays are trimmed ("\x00" removed) on conversion to BridgeMessage.Header.
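The trimming of byte-array header fields can be pictured with a small sketch. It assumes the "\x00" bytes in question are trailing padding on a fixed-width field; `trimNulls` is a hypothetical helper, not a function exported by this package.

```go
package main

import (
	"bytes"
	"fmt"
)

// trimNulls is a hypothetical helper that drops trailing NUL bytes
// from a fixed-width header field. Trailing-only trimming is an
// assumption; the package documentation only says "\x00" is removed.
func trimNulls(field []byte) []byte {
	return bytes.TrimRight(field, "\x00")
}

func main() {
	correlID := []byte{'a', 'b', 'c', 0, 0, 0, 0, 0}
	fmt.Printf("%q\n", trimNulls(correlID)) // prints "abc"
}
```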

func (*BridgeServer) NATS

func (bridge *BridgeServer) NATS() *nats.Conn

NATS hosts a shared nats connection for the connectors

func (*BridgeServer) NATSToMQMessage

func (bridge *BridgeServer) NATSToMQMessage(data []byte, replyTo string, qmgr *ibmmq.MQQueueManager) (*ibmmq.MQMD, ibmmq.MQMessageHandle, []byte, error)

NATSToMQMessage decodes an incoming NATS message to an MQ message. If the qmgr is nil, data is considered to be just a message body; if the qmgr is not nil, the message is treated as an encoded BridgeMessage. The returned byte array is just bytes for MQ, and is not an encoded BridgeMessage. Header fields that are byte arrays are padded ("\x00" added) on conversion from BridgeMessage.Header.
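The reverse direction, padding a header value back out to a fixed width, might look like the following sketch. `padNulls` is a hypothetical helper, and the width of 24 is used purely for illustration.

```go
package main

import "fmt"

// padNulls is a hypothetical helper that copies field into a
// zero-filled slice of the given width. Go zero-initializes the new
// slice, so unused positions are already "\x00". Input longer than
// width is truncated by copy.
func padNulls(field []byte, width int) []byte {
	out := make([]byte, width)
	copy(out, field)
	return out
}

func main() {
	padded := padNulls([]byte("abc"), 24) // 24 is an illustrative width
	fmt.Println(len(padded))
	fmt.Printf("%q\n", padded[:5])
}
```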

func (*BridgeServer) RegisterReplyInfo

func (bridge *BridgeServer) RegisterReplyInfo(desc string, config conf.ConnectorConfig)

RegisterReplyInfo tracks incoming descriptions so that reply to values can be mapped correctly

func (*BridgeServer) SafeStats

func (bridge *BridgeServer) SafeStats() BridgeStats

SafeStats grabs the lock then calls stats(), useful for tests

func (*BridgeServer) Stan

func (bridge *BridgeServer) Stan() stan.Conn

Stan hosts a shared streaming connection for the connectors

func (*BridgeServer) Start

func (bridge *BridgeServer) Start() error

Start starts the server. It locks the server and assumes the config is already loaded.

func (*BridgeServer) Stop

func (bridge *BridgeServer) Stop()

Stop the bridge server

func (*BridgeServer) StopMonitoring

func (bridge *BridgeServer) StopMonitoring() error

StopMonitoring shuts down the HTTP server used for monitoring. It expects the lock to be held.

type BridgeStats

type BridgeStats struct {
	StartTime    int64            `json:"start_time"`
	ServerTime   int64            `json:"current_time"`
	UpTime       string           `json:"uptime"`
	Connections  []ConnectorStats `json:"connectors"`
	HTTPRequests map[string]int64 `json:"http_requests"`
}

BridgeStats wraps the current status of the bridge and all of its connectors
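Since the struct carries JSON tags, a monitoring client can decode such a payload into its own types. The sketch below mirrors the documented tags in local stand-in structs (`bridgeStats`, `connectorStats`) and decodes a made-up sample; the sample values are invented for illustration and are not real bridge output.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// connectorStats mirrors a subset of the documented ConnectorStats tags.
type connectorStats struct {
	Name        string `json:"name"`
	ID          string `json:"id"`
	Connected   bool   `json:"connected"`
	MessagesIn  int64  `json:"msg_in"`
	MessagesOut int64  `json:"msg_out"`
}

// bridgeStats mirrors the documented BridgeStats tags.
type bridgeStats struct {
	StartTime    int64            `json:"start_time"`
	ServerTime   int64            `json:"current_time"`
	UpTime       string           `json:"uptime"`
	Connections  []connectorStats `json:"connectors"`
	HTTPRequests map[string]int64 `json:"http_requests"`
}

// sample is an invented payload used only to exercise the decoder.
const sample = `{
  "start_time": 100,
  "current_time": 160,
  "uptime": "1m0s",
  "connectors": [
    {"name": "demo", "id": "abc", "connected": true, "msg_in": 3, "msg_out": 3}
  ],
  "http_requests": {"/varz": 2}
}`

// decodeStats parses a stats payload into the local mirror type.
func decodeStats(payload []byte) (bridgeStats, error) {
	var s bridgeStats
	err := json.Unmarshal(payload, &s)
	return s, err
}

func main() {
	s, err := decodeStats([]byte(sample))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	for _, c := range s.Connections {
		fmt.Printf("%s connected=%v in=%d out=%d\n", c.Name, c.Connected, c.MessagesIn, c.MessagesOut)
	}
}
```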

type Connector

type Connector interface {
	Start() error
	Shutdown() error

	CheckConnections() error

	String() string
	ID() string

	Stats() ConnectorStats
}

Connector is the abstraction for all of the bridge connector types
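The relationship between a shared base type with no-op methods and concrete connectors that override them can be sketched with struct embedding. Everything below is a local stand-in: `connector`, `connStats`, `baseConnector` and `demoConnector` are hypothetical names that only imitate the shape of Connector, ConnectorStats and BridgeConnector.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// connStats is a simplified stand-in for ConnectorStats.
type connStats struct {
	Name, ID  string
	Connected bool
}

// connector mirrors the method set of the documented Connector interface.
type connector interface {
	Start() error
	Shutdown() error
	CheckConnections() error
	String() string
	ID() string
	Stats() connStats
}

// baseConnector supplies no-op defaults, in the spirit of BridgeConnector.
type baseConnector struct {
	sync.Mutex
	stats connStats
}

func (b *baseConnector) Start() error            { return nil }
func (b *baseConnector) Shutdown() error         { return nil }
func (b *baseConnector) CheckConnections() error { return nil }
func (b *baseConnector) String() string          { return b.stats.Name }
func (b *baseConnector) ID() string              { return b.stats.ID }

// Stats returns a value copy taken while holding the lock.
func (b *baseConnector) Stats() connStats {
	b.Lock()
	defer b.Unlock()
	return b.stats
}

// demoConnector overrides only the methods it needs.
type demoConnector struct {
	baseConnector
	upstreamUp bool
}

func newDemoConnector(name, id string) *demoConnector {
	d := &demoConnector{}
	d.stats = connStats{Name: name, ID: id}
	return d
}

func (d *demoConnector) Start() error {
	d.Lock()
	defer d.Unlock()
	d.stats.Connected = true
	return nil
}

func (d *demoConnector) CheckConnections() error {
	d.Lock()
	defer d.Unlock()
	if !d.upstreamUp {
		return errors.New("upstream connection is down")
	}
	return nil
}

func main() {
	var c connector = newDemoConnector("demo", "42")
	_ = c.Start()
	fmt.Println(c.String(), c.ID(), c.Stats().Connected)
	fmt.Println("check:", c.CheckConnections())
}
```

Shutdown is inherited unchanged from the base type here, which is the point of the pattern: a concrete connector only replaces the behaviour it cares about.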

func CreateConnector

func CreateConnector(config conf.ConnectorConfig, bridge *BridgeServer) (Connector, error)

CreateConnector builds a connector from the supplied configuration

func NewNATS2QueueConnector

func NewNATS2QueueConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector

NewNATS2QueueConnector creates a new NATS to MQ queue connector

func NewNATS2TopicConnector

func NewNATS2TopicConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector

NewNATS2TopicConnector creates a new NATS to MQ topic connector

func NewQueue2NATSConnector

func NewQueue2NATSConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector

NewQueue2NATSConnector creates a new MQ queue to NATS connector

func NewQueue2STANConnector

func NewQueue2STANConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector

NewQueue2STANConnector creates a new MQ queue to STAN connector

func NewStan2QueueConnector

func NewStan2QueueConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector

NewStan2QueueConnector creates a new STAN to MQ queue connector

func NewStan2TopicConnector

func NewStan2TopicConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector

NewStan2TopicConnector creates a new STAN to MQ topic connector

func NewTopic2NATSConnector

func NewTopic2NATSConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector

NewTopic2NATSConnector creates a new MQ topic to NATS connector

func NewTopic2StanConnector

func NewTopic2StanConnector(bridge *BridgeServer, config conf.ConnectorConfig) Connector

NewTopic2StanConnector creates a new MQ topic to STAN connector

type ConnectorStats

type ConnectorStats struct {
	Name          string  `json:"name"`
	ID            string  `json:"id"`
	Connected     bool    `json:"connected"`
	Connects      int64   `json:"connects"`
	Disconnects   int64   `json:"disconnects"`
	BytesIn       int64   `json:"bytes_in"`
	BytesOut      int64   `json:"bytes_out"`
	MessagesIn    int64   `json:"msg_in"`
	MessagesOut   int64   `json:"msg_out"`
	RequestCount  int64   `json:"count"`
	MovingAverage float64 `json:"rma"`
	Quintile50    float64 `json:"q50"`
	Quintile75    float64 `json:"q75"`
	Quintile90    float64 `json:"q90"`
	Quintile95    float64 `json:"q95"`
	// contains filtered or unexported fields
}

ConnectorStats captures the statistics for a single connector

func NewConnectorStats

func NewConnectorStats() ConnectorStats

NewConnectorStats creates an empty stats, and initializes the request time histogram

func (*ConnectorStats) AddConnect

func (stats *ConnectorStats) AddConnect()

AddConnect updates the connects field

func (*ConnectorStats) AddDisconnect

func (stats *ConnectorStats) AddDisconnect()

AddDisconnect updates the disconnects field

func (*ConnectorStats) AddMessageIn

func (stats *ConnectorStats) AddMessageIn(bytes int64)

AddMessageIn updates the messages in and bytes in fields

func (*ConnectorStats) AddMessageOut

func (stats *ConnectorStats) AddMessageOut(bytes int64)

AddMessageOut updates the messages out and bytes out fields

func (*ConnectorStats) AddRequestTime

func (stats *ConnectorStats) AddRequestTime(reqTime time.Duration)

AddRequestTime registers a request time, updating the request count, the running moving average (RMA) and the histogram. For information on the running moving average, see https://en.wikipedia.org/wiki/Moving_average

func (*ConnectorStats) UpdateQuintiles

func (stats *ConnectorStats) UpdateQuintiles()

UpdateQuintiles updates the quantile fields. These are not updated on each request, to reduce the cost of tracking statistics.

type Histogram

type Histogram struct {
	Bins    []Bin  `json:"bins"`
	MaxBins int    `json:"max"`
	Total   uint64 `json:"total"`
}

Histogram stores N bins using the streaming approximate histogram approach. The histogram is not thread safe.

func NewHistogram

func NewHistogram(n int) *Histogram

NewHistogram returns a new Histogram with a maximum of n bins.

There is no "optimal" bin count, but somewhere between 20 and 80 bins should be sufficient.
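For readers unfamiliar with streaming approximate histograms, the general technique is to insert each value as its own bin and, once the bin limit is exceeded, merge the two closest bins into a count-weighted centroid. The sketch below illustrates that idea with hypothetical lowercase types; it is not the package's implementation.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

type bin struct{ value, count float64 }

type histogram struct {
	bins    []bin // kept sorted by value
	maxBins int
	total   uint64
}

func newHistogram(n int) *histogram {
	if n < 1 {
		n = 1 // guard so that a merge always has two bins to work with
	}
	return &histogram{bins: make([]bin, 0, n+1), maxBins: n}
}

// add inserts v in sorted position, then merges if over the limit.
func (h *histogram) add(v float64) {
	h.total++
	i := sort.Search(len(h.bins), func(i int) bool { return h.bins[i].value >= v })
	if i < len(h.bins) && h.bins[i].value == v {
		h.bins[i].count++
		return
	}
	h.bins = append(h.bins, bin{})
	copy(h.bins[i+1:], h.bins[i:])
	h.bins[i] = bin{value: v, count: 1}
	if len(h.bins) > h.maxBins {
		h.mergeClosest()
	}
}

// mergeClosest replaces the two nearest neighbours with one bin whose
// value is their count-weighted mean.
func (h *histogram) mergeClosest() {
	best, minGap := 0, math.Inf(1)
	for i := 0; i+1 < len(h.bins); i++ {
		if gap := h.bins[i+1].value - h.bins[i].value; gap < minGap {
			minGap, best = gap, i
		}
	}
	a, b := h.bins[best], h.bins[best+1]
	count := a.count + b.count
	h.bins[best] = bin{value: (a.value*a.count + b.value*b.count) / count, count: count}
	h.bins = append(h.bins[:best+1], h.bins[best+2:]...)
}

func main() {
	h := newHistogram(3)
	for _, v := range []float64{1, 2, 3, 10, 11} {
		h.add(v)
	}
	fmt.Println(h.bins, h.total)
}
```

With a limit of three bins, the five inputs collapse so that 1 and 2 share one bin and 10 and 11 share another, while the total count stays at five.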

func (*Histogram) Add

func (h *Histogram) Add(n float64)

Add a value to the histogram, creating a bucket if necessary

func (*Histogram) Count

func (h *Histogram) Count() float64

Count returns the total number of entries in the histogram

func (*Histogram) Mean

func (h *Histogram) Mean() float64

Mean returns the sample mean of the distribution

func (*Histogram) Quantile

func (h *Histogram) Quantile(q float64) float64

Quantile returns the value for the bin at the provided quantile. This is "approximate" in the sense that the bin may straddle the quantile value.
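A bin-based quantile lookup can be sketched as a walk over the sorted bins that stops at the first bin whose cumulative count reaches q times the total. The `quantile` function and `bin` type below are hypothetical stand-ins, and the exact tie-breaking used by the package is not stated in the documentation.

```go
package main

import "fmt"

type bin struct{ value, count float64 }

// quantile returns the value of the first bin at which the cumulative
// count reaches q * total. bins must be sorted by value.
func quantile(bins []bin, q float64) float64 {
	if len(bins) == 0 {
		return 0
	}
	var total float64
	for _, b := range bins {
		total += b.count
	}
	target := q * total
	var seen float64
	for _, b := range bins {
		seen += b.count
		if seen >= target {
			return b.value
		}
	}
	return bins[len(bins)-1].value
}

func main() {
	bins := []bin{{1.5, 2}, {3, 1}, {10.5, 2}}
	fmt.Println(quantile(bins, 0.5), quantile(bins, 0.9))
}
```

The result is always one of the bin values, which is why a bin that straddles the true quantile yields only an approximation.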

func (*Histogram) Scale

func (h *Histogram) Scale(s float64)

Scale scales the buckets by s. This is useful for requests or other values that may be large numbers, e.g. nanoseconds.
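As an illustration of why scaling helps, the sketch below converts bin values recorded in nanoseconds to milliseconds. It assumes that scaling multiplies each bin's value by s and leaves the counts alone; `scale` and `bin` are hypothetical stand-ins.

```go
package main

import "fmt"

type bin struct{ value, count float64 }

// scale multiplies every bin value by s and leaves counts untouched.
// This is an assumed reading of "scale the buckets by s".
func scale(bins []bin, s float64) {
	for i := range bins {
		bins[i].value *= s
	}
}

func main() {
	bins := []bin{{value: 2_000_000, count: 4}, {value: 15_000_000, count: 1}}
	scale(bins, 1e-6) // nanoseconds to milliseconds
	for _, b := range bins {
		fmt.Printf("%.1f ms x%.0f\n", b.value, b.count)
	}
}
```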

type MQTestServer

type MQTestServer struct {
	QueueManager string
	CID          string
	AppHostPort  string
	WebHostPort  string
	AppPort      int
}

MQTestServer wraps an MQ server running in Docker. It is based on https://ericchiang.github.io/post/testing-dbs-with-docker/

func StartMQTestServer

func StartMQTestServer(waitForStart time.Duration, useTLS bool, mqPort int) (*MQTestServer, *ibmmq.MQQueueManager, error)

StartMQTestServer starts a test MQ server in Docker

func (*MQTestServer) Close

func (mq *MQTestServer) Close() error

Close shuts down the test MQ server

type NATS2QueueConnector

type NATS2QueueConnector struct {
	BridgeConnector
	// contains filtered or unexported fields
}

NATS2QueueConnector connects a NATS subject to an MQ queue

func (*NATS2QueueConnector) CheckConnections

func (mq *NATS2QueueConnector) CheckConnections() error

CheckConnections checks the NATS/STAN connection and reports an error if it is down

func (*NATS2QueueConnector) Shutdown

func (mq *NATS2QueueConnector) Shutdown() error

Shutdown the connector

func (*NATS2QueueConnector) Start

func (mq *NATS2QueueConnector) Start() error

Start the connector

type NATS2TopicConnector

type NATS2TopicConnector struct {
	BridgeConnector
	// contains filtered or unexported fields
}

NATS2TopicConnector connects a NATS subject to an MQ topic

func (*NATS2TopicConnector) CheckConnections

func (mq *NATS2TopicConnector) CheckConnections() error

CheckConnections checks the NATS/STAN connection and reports an error if it is down

func (*NATS2TopicConnector) Shutdown

func (mq *NATS2TopicConnector) Shutdown() error

Shutdown the connector

func (*NATS2TopicConnector) Start

func (mq *NATS2TopicConnector) Start() error

Start the connector

type NATSCallback

type NATSCallback func(natsMsg []byte, replyTo string) error

NATSCallback is used by MQ-to-NATS connectors inside an MQ library callback. The lock will be held by the caller!

type Queue2NATSConnector

type Queue2NATSConnector struct {
	BridgeConnector
	// contains filtered or unexported fields
}

Queue2NATSConnector connects an MQ queue to a NATS subject

func (*Queue2NATSConnector) CheckConnections

func (mq *Queue2NATSConnector) CheckConnections() error

CheckConnections checks the NATS/STAN connection and reports an error if it is down

func (*Queue2NATSConnector) Shutdown

func (mq *Queue2NATSConnector) Shutdown() error

Shutdown the connector

func (*Queue2NATSConnector) Start

func (mq *Queue2NATSConnector) Start() error

Start the connector

type Queue2STANConnector

type Queue2STANConnector struct {
	BridgeConnector
	// contains filtered or unexported fields
}

Queue2STANConnector connects an MQ queue to a STAN channel

func (*Queue2STANConnector) CheckConnections

func (mq *Queue2STANConnector) CheckConnections() error

CheckConnections checks the NATS/STAN connection and reports an error if it is down

func (*Queue2STANConnector) Shutdown

func (mq *Queue2STANConnector) Shutdown() error

Shutdown the connector

func (*Queue2STANConnector) Start

func (mq *Queue2STANConnector) Start() error

Start the connector

type ShutdownCallback

type ShutdownCallback func() error

ShutdownCallback is returned when setting up a callback or polling so the connector can shut it down

type Stan2QueueConnector

type Stan2QueueConnector struct {
	BridgeConnector
	// contains filtered or unexported fields
}

Stan2QueueConnector connects a STAN channel to an MQ Queue

func (*Stan2QueueConnector) CheckConnections

func (mq *Stan2QueueConnector) CheckConnections() error

CheckConnections checks the NATS/STAN connection and reports an error if it is down

func (*Stan2QueueConnector) Shutdown

func (mq *Stan2QueueConnector) Shutdown() error

Shutdown the connector

func (*Stan2QueueConnector) Start

func (mq *Stan2QueueConnector) Start() error

Start the connector

type Stan2TopicConnector

type Stan2TopicConnector struct {
	BridgeConnector
	// contains filtered or unexported fields
}

Stan2TopicConnector connects a STAN channel to an MQ Topic

func (*Stan2TopicConnector) CheckConnections

func (mq *Stan2TopicConnector) CheckConnections() error

CheckConnections checks the NATS/STAN connection and reports an error if it is down

func (*Stan2TopicConnector) Shutdown

func (mq *Stan2TopicConnector) Shutdown() error

Shutdown the connector

func (*Stan2TopicConnector) Start

func (mq *Stan2TopicConnector) Start() error

Start the connector

type TestEnv

type TestEnv struct {
	MQServer *MQTestServer
	QMgr     *ibmmq.MQQueueManager // For bypassing the bridge connection

	GNATSD *ns.Server
	Stan   *nss.StanServer

	NC *nats.Conn // for bypassing the bridge
	SC stan.Conn  // for bypassing the bridge

	Bridge *BridgeServer
	Config *conf.BridgeConfig
	// contains filtered or unexported fields
}

TestEnv encapsulates a bridge test environment

func StartTLSTestEnvironment

func StartTLSTestEnvironment(connections []conf.ConnectorConfig) (*TestEnv, error)

StartTLSTestEnvironment calls StartTestEnvironmentInfrastructure followed by StartBridge, with TLS enabled

func StartTestEnvironment

func StartTestEnvironment(connections []conf.ConnectorConfig) (*TestEnv, error)

StartTestEnvironment calls StartTestEnvironmentInfrastructure followed by StartBridge

func StartTestEnvironmentInfrastructure

func StartTestEnvironmentInfrastructure(useTLS bool) (*TestEnv, error)

StartTestEnvironmentInfrastructure creates the MQ queue manager, NATS server and streaming server but does not start a bridge. Use StartBridge to start a bridge afterward.

func (*TestEnv) Close

func (tbs *TestEnv) Close()

Close the bridge server and clean up the test environment

func (*TestEnv) GetMessageFromQueue

func (tbs *TestEnv) GetMessageFromQueue(qName string, waitMillis int32) (*ibmmq.MQMD, *ibmmq.MQGMO, []byte, error)

GetMessageFromQueue uses the test environment's extra connection to talk to the queue, bypassing the bridge's connection

func (*TestEnv) GetQueueManagerName

func (tbs *TestEnv) GetQueueManagerName() string

GetQueueManagerName returns the queue manager name for the test MQ server

func (*TestEnv) PutMessageOnQueue

func (tbs *TestEnv) PutMessageOnQueue(qName string, mqmd *ibmmq.MQMD, msgData []byte) error

PutMessageOnQueue uses the test environment's extra connection to talk to the queue, bypassing the bridge's connection

func (*TestEnv) PutMessageOnTopic

func (tbs *TestEnv) PutMessageOnTopic(topicName string, mqmd *ibmmq.MQMD, msgData []byte) error

PutMessageOnTopic uses the test environment's extra connection to talk to the topic, bypassing the bridge's connection

func (*TestEnv) RestartMQ

func (tbs *TestEnv) RestartMQ(useTLS bool) error

RestartMQ shuts down the MQ server and then starts it again

func (*TestEnv) RestartNATS

func (tbs *TestEnv) RestartNATS(useTLS bool) error

RestartNATS shuts down the NATS and STAN servers and then starts them again

func (*TestEnv) StartBridge

func (tbs *TestEnv) StartBridge(connections []conf.ConnectorConfig, useTLS bool) error

StartBridge is the second half of StartTestEnvironment. It is provided separately so that the environment can be created before the bridge runs.

func (*TestEnv) StartNATSandStan

func (tbs *TestEnv) StartNATSandStan(useTLS bool, port int, clusterID string, clientID string, bridgeClientID string) error

StartNATSandStan starts up the nats and stan servers

func (*TestEnv) StopBridge

func (tbs *TestEnv) StopBridge()

StopBridge stops the bridge

func (*TestEnv) StopNATS

func (tbs *TestEnv) StopNATS() error

StopNATS shuts down the NATS and Stan servers

type Topic2NATSConnector

type Topic2NATSConnector struct {
	BridgeConnector
	// contains filtered or unexported fields
}

Topic2NATSConnector connects an MQ topic to a NATS subject

func (*Topic2NATSConnector) CheckConnections

func (mq *Topic2NATSConnector) CheckConnections() error

CheckConnections checks the NATS/STAN connection and reports an error if it is down

func (*Topic2NATSConnector) Shutdown

func (mq *Topic2NATSConnector) Shutdown() error

Shutdown the connector

func (*Topic2NATSConnector) Start

func (mq *Topic2NATSConnector) Start() error

Start the connector

type Topic2StanConnector

type Topic2StanConnector struct {
	BridgeConnector
	// contains filtered or unexported fields
}

Topic2StanConnector connects an MQ topic to a STAN channel

func (*Topic2StanConnector) CheckConnections

func (mq *Topic2StanConnector) CheckConnections() error

CheckConnections checks the NATS/STAN connection and reports an error if it is down

func (*Topic2StanConnector) Shutdown

func (mq *Topic2StanConnector) Shutdown() error

Shutdown the connector

func (*Topic2StanConnector) Start

func (mq *Topic2StanConnector) Start() error

Start the connector
