Documentation ¶
Index ¶
- Constants
- Variables
- type Bin
- type BridgeConnector
- type BridgeStats
- type Connector
- func CreateConnector(config conf.ConnectorConfig, bridge *NATSKafkaBridge) (Connector, error)
- func NewJetStream2KafkaConnector(bridge *NATSKafkaBridge, config conf.ConnectorConfig) Connector
- func NewKafka2JetStreamConnector(bridge *NATSKafkaBridge, config conf.ConnectorConfig) Connector
- func NewKafka2NATSConnector(bridge *NATSKafkaBridge, config conf.ConnectorConfig) Connector
- func NewKafka2StanConnector(bridge *NATSKafkaBridge, config conf.ConnectorConfig) Connector
- func NewNATS2KafkaConnector(bridge *NATSKafkaBridge, config conf.ConnectorConfig) Connector
- func NewStan2KafkaConnector(bridge *NATSKafkaBridge, config conf.ConnectorConfig) Connector
- type ConnectorStats
- type ConnectorStatsHolder
- func (stats *ConnectorStatsHolder) AddConnect()
- func (stats *ConnectorStatsHolder) AddDisconnect()
- func (stats *ConnectorStatsHolder) AddMessageIn(bytes int64)
- func (stats *ConnectorStatsHolder) AddMessageOut(bytes int64)
- func (stats *ConnectorStatsHolder) AddRequest(bytesIn int64, bytesOut int64, reqTime time.Duration)
- func (stats *ConnectorStatsHolder) AddRequestTime(reqTime time.Duration)
- func (stats *ConnectorStatsHolder) ID() string
- func (stats *ConnectorStatsHolder) Name() string
- func (stats *ConnectorStatsHolder) Stats() ConnectorStats
- type Flags
- type Histogram
- type JetStream2KafkaConnector
- type Kafka2JetStreamConnector
- type Kafka2NATSConnector
- type Kafka2StanConnector
- type NATS2KafkaConnector
- type NATSCallback
- type NATSKafkaBridge
- func (server *NATSKafkaBridge) ApplyConfigFile(configFile string) error
- func (server *NATSKafkaBridge) CheckJetStream() bool
- func (server *NATSKafkaBridge) CheckNATS() bool
- func (server *NATSKafkaBridge) CheckStan() bool
- func (server *NATSKafkaBridge) ConnectorError(connector Connector, err error)
- func (server *NATSKafkaBridge) FatalError(format string, args ...interface{})
- func (server *NATSKafkaBridge) GetMonitoringRootURL() string
- func (server *NATSKafkaBridge) HandleHealthz(w http.ResponseWriter, r *http.Request)
- func (server *NATSKafkaBridge) HandleRoot(w http.ResponseWriter, r *http.Request)
- func (server *NATSKafkaBridge) HandleVarz(w http.ResponseWriter, r *http.Request)
- func (server *NATSKafkaBridge) InitializeFromConfig(config conf.NATSKafkaBridgeConfig) error
- func (server *NATSKafkaBridge) InitializeFromFlags(flags Flags) error
- func (server *NATSKafkaBridge) JetStream() nats.JetStreamContext
- func (server *NATSKafkaBridge) Logger() logging.Logger
- func (server *NATSKafkaBridge) NATS() *nats.Conn
- func (server *NATSKafkaBridge) SafeStats() BridgeStats
- func (server *NATSKafkaBridge) Stan() stan.Conn
- func (server *NATSKafkaBridge) Start() error
- func (server *NATSKafkaBridge) Stop()
- func (server *NATSKafkaBridge) StopMonitoring() error
- type ShutdownCallback
- type Stan2KafkaConnector
Constants ¶
const ( RootPath = "/" VarzPath = "/varz" HealthzPath = "/healthz" )
HTTP endpoints
Variables ¶
var Version = "0.0-dev"
Version specifies the command version. This should be set at compile time.
Functions ¶
This section is empty.
Types ¶
type BridgeConnector ¶
BridgeConnector is the base type used for connectors so that they can share code. The config, bridge and stats are all fixed at creation, so no lock is required on the connector at this level. The stats do keep a lock to protect their data. The connector has a lock for use by composing types to protect themselves during start/shutdown.
func (*BridgeConnector) CheckConnections ¶
func (conn *BridgeConnector) CheckConnections() error
CheckConnections is a no-op, designed for overriding. This is called when nats or stan goes down; the connector should return an error if it has to be shut down.
func (*BridgeConnector) ID ¶
func (conn *BridgeConnector) ID() string
ID returns the id from the stats
func (*BridgeConnector) Shutdown ¶
func (conn *BridgeConnector) Shutdown() error
Shutdown is a no-op, designed for overriding
func (*BridgeConnector) Start ¶
func (conn *BridgeConnector) Start() error
Start is a no-op, designed for overriding
func (*BridgeConnector) Stats ¶
func (conn *BridgeConnector) Stats() ConnectorStats
Stats returns a copy of the current stats for this connector
func (*BridgeConnector) String ¶
func (conn *BridgeConnector) String() string
String returns the name passed into init
type BridgeStats ¶
type BridgeStats struct { StartTime int64 `json:"start_time"` ServerTime int64 `json:"current_time"` UpTime string `json:"uptime"` RequestCount int64 `json:"request_count"` Connections []ConnectorStats `json:"connectors"` HTTPRequests map[string]int64 `json:"http_requests"` }
BridgeStats wraps the current status of the bridge and all of its connectors
type Connector ¶
type Connector interface { Start() error Shutdown() error CheckConnections() error String() string ID() string Stats() ConnectorStats }
Connector is the abstraction for all of the bridge connector types
func CreateConnector ¶
func CreateConnector(config conf.ConnectorConfig, bridge *NATSKafkaBridge) (Connector, error)
CreateConnector builds a connector from the supplied configuration
func NewJetStream2KafkaConnector ¶ added in v1.0.0
func NewJetStream2KafkaConnector(bridge *NATSKafkaBridge, config conf.ConnectorConfig) Connector
NewJetStream2KafkaConnector creates a new JetStream to Kafka connector
func NewKafka2JetStreamConnector ¶ added in v1.0.0
func NewKafka2JetStreamConnector(bridge *NATSKafkaBridge, config conf.ConnectorConfig) Connector
NewKafka2JetStreamConnector creates a new Kafka to JetStream connector
func NewKafka2NATSConnector ¶
func NewKafka2NATSConnector(bridge *NATSKafkaBridge, config conf.ConnectorConfig) Connector
NewKafka2NATSConnector creates a new Kafka to NATS connector
func NewKafka2StanConnector ¶
func NewKafka2StanConnector(bridge *NATSKafkaBridge, config conf.ConnectorConfig) Connector
NewKafka2StanConnector creates a new Kafka to STAN connector
func NewNATS2KafkaConnector ¶
func NewNATS2KafkaConnector(bridge *NATSKafkaBridge, config conf.ConnectorConfig) Connector
NewNATS2KafkaConnector creates a new NATS to Kafka connector
func NewStan2KafkaConnector ¶
func NewStan2KafkaConnector(bridge *NATSKafkaBridge, config conf.ConnectorConfig) Connector
NewStan2KafkaConnector creates a new STAN to Kafka connector
type ConnectorStats ¶
type ConnectorStats struct { Name string `json:"name"` ID string `json:"id"` Connected bool `json:"connected"` Connects int64 `json:"connects"` Disconnects int64 `json:"disconnects"` BytesIn int64 `json:"bytes_in"` BytesOut int64 `json:"bytes_out"` MessagesIn int64 `json:"msg_in"` MessagesOut int64 `json:"msg_out"` RequestCount int64 `json:"count"` MovingAverage float64 `json:"rma"` Quintile50 float64 `json:"q50"` Quintile75 float64 `json:"q75"` Quintile90 float64 `json:"q90"` Quintile95 float64 `json:"q95"` }
ConnectorStats captures the statistics for a single connector. Times are in nanoseconds. Use a holder to get the protection of a lock and to fill in the quantiles.
type ConnectorStatsHolder ¶
ConnectorStatsHolder provides a lock and histogram for a connector to update its stats. The holder's Stats() method should be used to get the current values.
func NewConnectorStatsHolder ¶
func NewConnectorStatsHolder(name string, id string) *ConnectorStatsHolder
NewConnectorStatsHolder creates an empty stats holder, and initializes the request time histogram
func (*ConnectorStatsHolder) AddConnect ¶
func (stats *ConnectorStatsHolder) AddConnect()
AddConnect updates the connects field. Locks/unlocks the stats.
func (*ConnectorStatsHolder) AddDisconnect ¶
func (stats *ConnectorStatsHolder) AddDisconnect()
AddDisconnect updates the disconnects field. Locks/unlocks the stats.
func (*ConnectorStatsHolder) AddMessageIn ¶
func (stats *ConnectorStatsHolder) AddMessageIn(bytes int64)
AddMessageIn updates the messages in and bytes in fields. Locks/unlocks the stats.
func (*ConnectorStatsHolder) AddMessageOut ¶
func (stats *ConnectorStatsHolder) AddMessageOut(bytes int64)
AddMessageOut updates the messages out and bytes out fields. Locks/unlocks the stats.
func (*ConnectorStatsHolder) AddRequest ¶
func (stats *ConnectorStatsHolder) AddRequest(bytesIn int64, bytesOut int64, reqTime time.Duration)
AddRequest groups AddMessageIn, AddMessageOut and AddRequestTime into a single call to reduce locking requirements. Locks/unlocks the stats.
func (*ConnectorStatsHolder) AddRequestTime ¶
func (stats *ConnectorStatsHolder) AddRequestTime(reqTime time.Duration)
AddRequestTime registers a time, updating the request count, RMA and histogram. For information on the running moving average, see https://en.wikipedia.org/wiki/Moving_average. Locks/unlocks the stats.
func (*ConnectorStatsHolder) ID ¶
func (stats *ConnectorStatsHolder) ID() string
ID returns the ID the holder was created with
func (*ConnectorStatsHolder) Name ¶
func (stats *ConnectorStatsHolder) Name() string
Name returns the name the holder was created with
func (*ConnectorStatsHolder) Stats ¶
func (stats *ConnectorStatsHolder) Stats() ConnectorStats
Stats updates the quantiles and returns a copy of the stats. Locks/unlocks the stats.
type Flags ¶
Flags defines the various flags you can call the bridge with. These are used in main and passed down to the server code to process.
type Histogram ¶
type Histogram struct { Bins []Bin `json:"bins"` MaxBins int `json:"max"` Total uint64 `json:"total"` }
Histogram stores N bins using the streaming approximate histogram approach. The histogram is not thread safe.
func NewHistogram ¶
NewHistogram returns a new Histogram with a maximum of n bins.
There is no "optimal" bin count, but somewhere between 20 and 80 bins should be sufficient.
type JetStream2KafkaConnector ¶ added in v1.0.0
type JetStream2KafkaConnector struct { BridgeConnector // contains filtered or unexported fields }
JetStream2KafkaConnector connects a JetStream stream to Kafka
func (*JetStream2KafkaConnector) CheckConnections ¶ added in v1.0.0
func (conn *JetStream2KafkaConnector) CheckConnections() error
CheckConnections checks the nats/stan connection and reports an error if it is down
func (*JetStream2KafkaConnector) Shutdown ¶ added in v1.0.0
func (conn *JetStream2KafkaConnector) Shutdown() error
Shutdown the connector
func (*JetStream2KafkaConnector) Start ¶ added in v1.0.0
func (conn *JetStream2KafkaConnector) Start() error
Start the connector
type Kafka2JetStreamConnector ¶ added in v1.0.0
type Kafka2JetStreamConnector struct { BridgeConnector // contains filtered or unexported fields }
Kafka2JetStreamConnector connects Kafka topic to JetStream
func (*Kafka2JetStreamConnector) CheckConnections ¶ added in v1.0.0
func (conn *Kafka2JetStreamConnector) CheckConnections() error
CheckConnections checks the nats/stan connection and reports an error if it is down
func (*Kafka2JetStreamConnector) Shutdown ¶ added in v1.0.0
func (conn *Kafka2JetStreamConnector) Shutdown() error
Shutdown the connector
func (*Kafka2JetStreamConnector) Start ¶ added in v1.0.0
func (conn *Kafka2JetStreamConnector) Start() error
Start the connector
type Kafka2NATSConnector ¶
type Kafka2NATSConnector struct { BridgeConnector // contains filtered or unexported fields }
Kafka2NATSConnector connects Kafka topic to a NATS subject
func (*Kafka2NATSConnector) CheckConnections ¶
func (conn *Kafka2NATSConnector) CheckConnections() error
CheckConnections checks the nats/stan connection and reports an error if it is down
func (*Kafka2NATSConnector) Shutdown ¶
func (conn *Kafka2NATSConnector) Shutdown() error
Shutdown the connector
func (*Kafka2NATSConnector) Start ¶
func (conn *Kafka2NATSConnector) Start() error
Start the connector
type Kafka2StanConnector ¶
type Kafka2StanConnector struct { BridgeConnector // contains filtered or unexported fields }
Kafka2StanConnector connects Kafka topic to a nats streaming channel
func (*Kafka2StanConnector) CheckConnections ¶
func (conn *Kafka2StanConnector) CheckConnections() error
CheckConnections checks the nats/stan connection and reports an error if it is down
func (*Kafka2StanConnector) Shutdown ¶
func (conn *Kafka2StanConnector) Shutdown() error
Shutdown the connector
func (*Kafka2StanConnector) Start ¶
func (conn *Kafka2StanConnector) Start() error
Start the connector
type NATS2KafkaConnector ¶
type NATS2KafkaConnector struct { BridgeConnector // contains filtered or unexported fields }
NATS2KafkaConnector connects a NATS subject to a Kafka topic
func (*NATS2KafkaConnector) CheckConnections ¶
func (conn *NATS2KafkaConnector) CheckConnections() error
CheckConnections checks the nats/stan connection and reports an error if it is down
func (*NATS2KafkaConnector) Shutdown ¶
func (conn *NATS2KafkaConnector) Shutdown() error
Shutdown the connector
func (*NATS2KafkaConnector) Start ¶
func (conn *NATS2KafkaConnector) Start() error
Start the connector
type NATSCallback ¶
NATSCallback is used by conn-nats connectors in a conn library callback. The lock will be held by the caller!
type NATSKafkaBridge ¶
NATSKafkaBridge is the core structure for the server.
func NewNATSKafkaBridge ¶
func NewNATSKafkaBridge() *NATSKafkaBridge
NewNATSKafkaBridge creates a new bridge server with a default logger
func (*NATSKafkaBridge) ApplyConfigFile ¶
func (server *NATSKafkaBridge) ApplyConfigFile(configFile string) error
ApplyConfigFile applies the config file to the server's config
func (*NATSKafkaBridge) CheckJetStream ¶ added in v1.0.0
func (server *NATSKafkaBridge) CheckJetStream() bool
CheckJetStream returns true if the bridge is connected to JetStream
func (*NATSKafkaBridge) CheckNATS ¶
func (server *NATSKafkaBridge) CheckNATS() bool
CheckNATS returns true if the bridge is connected to nats
func (*NATSKafkaBridge) CheckStan ¶
func (server *NATSKafkaBridge) CheckStan() bool
CheckStan returns true if the bridge is connected to stan
func (*NATSKafkaBridge) ConnectorError ¶
func (server *NATSKafkaBridge) ConnectorError(connector Connector, err error)
ConnectorError is called by a connector if it has a failure that requires a reconnect
func (*NATSKafkaBridge) FatalError ¶
func (server *NATSKafkaBridge) FatalError(format string, args ...interface{})
FatalError stops the server, prints the messages and exits
func (*NATSKafkaBridge) GetMonitoringRootURL ¶
func (server *NATSKafkaBridge) GetMonitoringRootURL() string
GetMonitoringRootURL returns the protocol://host:port for the monitoring server, useful for testing
func (*NATSKafkaBridge) HandleHealthz ¶
func (server *NATSKafkaBridge) HandleHealthz(w http.ResponseWriter, r *http.Request)
HandleHealthz returns status 200.
func (*NATSKafkaBridge) HandleRoot ¶
func (server *NATSKafkaBridge) HandleRoot(w http.ResponseWriter, r *http.Request)
HandleRoot will show basic info and links to other handlers.
func (*NATSKafkaBridge) HandleVarz ¶
func (server *NATSKafkaBridge) HandleVarz(w http.ResponseWriter, r *http.Request)
HandleVarz returns statistics about the server.
func (*NATSKafkaBridge) InitializeFromConfig ¶
func (server *NATSKafkaBridge) InitializeFromConfig(config conf.NATSKafkaBridgeConfig) error
InitializeFromConfig initializes the server's configuration to an existing config object, useful for tests. Does not change the config at all; use DefaultServerConfig() to create a default config.
func (*NATSKafkaBridge) InitializeFromFlags ¶
func (server *NATSKafkaBridge) InitializeFromFlags(flags Flags) error
InitializeFromFlags is called from main to configure the server, the server will decide what needs to happen based on the flags. On reload the same flags are passed
func (*NATSKafkaBridge) JetStream ¶ added in v1.0.0
func (server *NATSKafkaBridge) JetStream() nats.JetStreamContext
JetStream hosts a shared JetStream connection for the connectors
func (*NATSKafkaBridge) Logger ¶
func (server *NATSKafkaBridge) Logger() logging.Logger
Logger hosts a shared logger
func (*NATSKafkaBridge) NATS ¶
func (server *NATSKafkaBridge) NATS() *nats.Conn
NATS hosts a shared nats connection for the connectors
func (*NATSKafkaBridge) SafeStats ¶
func (server *NATSKafkaBridge) SafeStats() BridgeStats
SafeStats grabs the lock then calls stats(), useful for tests
func (*NATSKafkaBridge) Stan ¶
func (server *NATSKafkaBridge) Stan() stan.Conn
Stan hosts a shared streaming connection for the connectors
func (*NATSKafkaBridge) Start ¶
func (server *NATSKafkaBridge) Start() error
Start the server, will lock the server, assumes the config is loaded
func (*NATSKafkaBridge) StopMonitoring ¶
func (server *NATSKafkaBridge) StopMonitoring() error
StopMonitoring shuts down the http server used for monitoring. Expects the lock to be held.
type ShutdownCallback ¶
type ShutdownCallback func() error
ShutdownCallback is returned when setting up a callback or polling so the connector can shut it down
type Stan2KafkaConnector ¶
type Stan2KafkaConnector struct { BridgeConnector // contains filtered or unexported fields }
Stan2KafkaConnector connects a STAN channel to Kafka
func (*Stan2KafkaConnector) CheckConnections ¶
func (conn *Stan2KafkaConnector) CheckConnections() error
CheckConnections checks the nats/stan connection and reports an error if it is down
func (*Stan2KafkaConnector) Shutdown ¶
func (conn *Stan2KafkaConnector) Shutdown() error
Shutdown the connector
func (*Stan2KafkaConnector) Start ¶
func (conn *Stan2KafkaConnector) Start() error
Start the connector