Documentation
¶
Index ¶
- Constants
- Variables
- func Alert(msg string)
- func AllMsgPayloads(prefix string, payloadSizes []string, numOfMsg int) ([][]byte, int)
- func BrokerTopicsQuery(brokerBaseURL, token string) ([]string, error)
- func BuildTenantsUsageThread()
- func ClearIncident(component string)
- func CloseOpsGenieAlert(component, alertID string, genieKey string) error
- func ConnectBrokerHealthcheckTopic(brokerURL, clusterName, pulsarURL string, tokenSupplier func() (string, error), ...)
- func CreateIncident(component, alias, msg, desc, priority string)
- func CreateOpsGenieAlert(msg Incident, genieKey string) error
- func CreatePDIncident(component, alias, msg, pdIntegrationKey string) error
- func EvaluateBrokers(urlPrefix, clusterName, pulsarURL string, tokenSupplier func() (string, error), ...) (int, error)
- func EvaluateClusterHealth(client *k8s.Client) error
- func FuncLatencyGaugeOpt() prometheus.GaugeOpts
- func GenPayload(prefix, size string) ([]byte, int)
- func GetBrokers(restBaseURL, clusterName string, tokenSupplier func() (string, error)) ([]string, error)
- func GetGaugeType(nameType string) prometheus.GaugeOpts
- func GetMessageID(prefix, str string) int
- func GetOfflinePodsCounter(subsystem string) prometheus.GaugeOpts
- func GetPulsarClient(globalConfiguration *Configuration, pulsarURL string, ...) (pulsar.Client, error)
- func HeartBeatToOpsGenie(genieURL, genieKey string) error
- func HeartbeatCounterOpt() prometheus.CounterOpts
- func MonitorK8sPulsarCluster() error
- func MonitorSites()
- func MsgLatencyGaugeOpt(typeName, desc string) prometheus.GaugeOpts
- func NumOfBytes(size string) int
- func OfflinePodGaugeOpt(subsystem, desc string) prometheus.GaugeOpts
- func PdV2Event(action, dedupKey, routingKey string, payload *pd.V2Payload) (*pd.V2EventResponse, error)
- func PromCounter(opt prometheus.CounterOpts, cluster string)
- func PromGauge(opt prometheus.GaugeOpts, cluster string, num float64)
- func PromGaugeInt(opt prometheus.GaugeOpts, cluster string, num int)
- func PromLatencySum(opt prometheus.GaugeOpts, cluster string, remoteCluster string, ...)
- func PubSubDowntimeGaugeOpt() prometheus.GaugeOpts
- func PubSubLatency(globalConfiguration *Configuration, clusterName string, topicConfig TopicCfg, ...) (map[string]PubSubResult, error)
- func PulsarAdminTenant(clusterURL string, tokenSupplier func() (string, error)) (int, error)
- func PulsarTenants()
- func PushToPrometheusProxy(proxyURL, authKey string) error
- func PushToPrometheusProxyThread()
- func ReadConfigFile(configFile string)
- func Remail(globalConfig *Configuration, topic TopicCfg)
- func RemailerThread()
- func RemoveIncident(component string)
- func ReportIncident(component, alias, msg, desc string, eval *AlertPolicyCfg) bool
- func ResolvePDIncident(component, alias, pdIntegrationKey string) error
- func RunInterval(fn monitorFunc, interval time.Duration)
- func SendSlackNotification(webhookURL string, msg SlackMessage) error
- func SiteLatencyGaugeOpt() prometheus.GaugeOpts
- func StartHeartBeat()
- func TenantsGaugeOpt() prometheus.GaugeOpts
- func TestBrokers(topicCfg TopicCfg) error
- func TestTopicLatency(topicCfg TopicCfg)
- func TestWsLatency(config WsConfig)
- func TopicLatencyTestThread()
- func UptimeHeartBeat()
- func VerboseAlert(component, message string, silenceWindow time.Duration)
- func WebSocketTopicLatencyTestThread()
- type AckMessage
- type AlertPolicyCfg
- type AlertVerbosity
- type AnalyticsCfg
- type BrokersCfg
- type ClusterHealth
- type Configuration
- type Incident
- type IncidentAlertPolicy
- type K8sClusterCfg
- type MsgResult
- type OpsClusterCfg
- type OpsGenieAlertCloseRequest
- type OpsGenieAlertCreateResponse
- type OpsGenieAlertGetResponse
- type OpsGenieCfg
- type PagerDutyCfg
- type Payload
- type PrometheusCfg
- type PubSubResult
- type PulsarAdminRESTCfg
- type PulsarMessage
- type ReceivingMessage
- type SiteCfg
- type SitesCfg
- type SlackCfg
- type SlackMessage
- type TenantUsageCfg
- type TopicCfg
- type WsConfig
Constants ¶
const ( // LogOnly only logs with no alert notification LogOnly = -1 * time.Second )
const (
// PrefixDelimiter for message prefix
PrefixDelimiter = "-"
)
Variables ¶
var ( // AllowedPriorities a list of allowed priorities AllowedPriorities = []string{"P1", "P2", "P3", "P4", "P5"} )
Functions ¶
func AllMsgPayloads ¶
AllMsgPayloads generates a series of payloads based on specified payload sizes or the number of messages
func BrokerTopicsQuery ¶
BrokerTopicsQuery returns a list of broker and topic full names, or the error of this operation
func BuildTenantsUsageThread ¶
func BuildTenantsUsageThread()
BuildTenantsUsageThread is the daemon thread that builds the last 30s of tenants usage and exposes it as Prometheus metrics
func CloseOpsGenieAlert ¶
CloseOpsGenieAlert deletes an OpsGenie alert
func ConnectBrokerHealthcheckTopic ¶
func ConnectBrokerHealthcheckTopic(brokerURL, clusterName, pulsarURL string, tokenSupplier func() (string, error), completeChan chan error)
ConnectBrokerHealthcheckTopic reads the latest messages off broker's healthcheck topic
func CreateIncident ¶
func CreateIncident(component, alias, msg, desc, priority string)
CreateIncident creates incident
func CreateOpsGenieAlert ¶
CreateOpsGenieAlert creates an OpsGenie alert
func CreatePDIncident ¶
CreatePDIncident creates PagerDuty incident
func EvaluateBrokers ¶
func EvaluateBrokers(urlPrefix, clusterName, pulsarURL string, tokenSupplier func() (string, error), duration time.Duration) (int, error)
EvaluateBrokers evaluates all brokers' health
func EvaluateClusterHealth ¶
EvaluateClusterHealth evaluates and reports the k8s cluster health
func FuncLatencyGaugeOpt ¶
func FuncLatencyGaugeOpt() prometheus.GaugeOpts
FuncLatencyGaugeOpt is the description of Pulsar Function latency gauge
func GenPayload ¶
GenPayload generates an array of bytes with prefix string and payload size. If the specified payload size is less than the prefix size, the payload will just be the prefix.
func GetBrokers ¶
func GetBrokers(restBaseURL, clusterName string, tokenSupplier func() (string, error)) ([]string, error)
GetBrokers gets a list of brokers and ports
func GetGaugeType ¶
func GetGaugeType(nameType string) prometheus.GaugeOpts
GetGaugeType gets the Prometheus Gauge Option based on type/subsystem
func GetMessageID ¶
GetMessageID returns the message index by parsing the template payload string with a prefix.
func GetOfflinePodsCounter ¶
func GetOfflinePodsCounter(subsystem string) prometheus.GaugeOpts
GetOfflinePodsCounter returns prometheus GaugeOpts for kubernetes cluster pod offline counter
func GetPulsarClient ¶
func GetPulsarClient(globalConfiguration *Configuration, pulsarURL string, tokenSupplier func() (string, error)) (pulsar.Client, error)
GetPulsarClient gets the pulsar client object. Note: the caller has to Close() the client object.
func HeartBeatToOpsGenie ¶
HeartBeatToOpsGenie sends a heartbeat to OpsGenie
func HeartbeatCounterOpt ¶
func HeartbeatCounterOpt() prometheus.CounterOpts
HeartbeatCounterOpt is the description for heart beat counter
func MonitorK8sPulsarCluster ¶
func MonitorK8sPulsarCluster() error
MonitorK8sPulsarCluster starts the K8sPulsarClusterMonitor thread
func MsgLatencyGaugeOpt ¶
func MsgLatencyGaugeOpt(typeName, desc string) prometheus.GaugeOpts
MsgLatencyGaugeOpt is the description for Pulsar message latency gauge
func NumOfBytes ¶
NumOfBytes returns a number of bytes with specified size in MB or KB
func OfflinePodGaugeOpt ¶
func OfflinePodGaugeOpt(subsystem, desc string) prometheus.GaugeOpts
OfflinePodGaugeOpt is offline pods counter
func PdV2Event ¶
func PdV2Event(action, dedupKey, routingKey string, payload *pd.V2Payload) (*pd.V2EventResponse, error)
PdV2Event is pd client
func PromCounter ¶
func PromCounter(opt prometheus.CounterOpts, cluster string)
PromCounter registers a counter and increments it
func PromGauge ¶
func PromGauge(opt prometheus.GaugeOpts, cluster string, num float64)
PromGauge registers gauge reading
func PromGaugeInt ¶
func PromGaugeInt(opt prometheus.GaugeOpts, cluster string, num int)
PromGaugeInt registers gauge reading in integer
func PromLatencySum ¶
func PromLatencySum(opt prometheus.GaugeOpts, cluster string, remoteCluster string, latency time.Duration)
PromLatencySum exposes monitoring metrics to Prometheus
func PubSubDowntimeGaugeOpt ¶
func PubSubDowntimeGaugeOpt() prometheus.GaugeOpts
PubSubDowntimeGaugeOpt is the description for downtime summary
func PubSubLatency ¶
func PubSubLatency(globalConfiguration *Configuration, clusterName string, topicConfig TopicCfg, tokenSupplier func() (string, error), msgPrefix string, payloads [][]byte, maxPayloadSize int) (map[string]PubSubResult, error)
PubSubLatency measures the latency including successful produce and consume of a message
func PulsarAdminTenant ¶
PulsarAdminTenant probes the tenant endpoint to get a list of tenants and returns the number of tenants on the cluster
func PushToPrometheusProxy ¶
PushToPrometheusProxy pushes exp data to PrometheusProxy
func PushToPrometheusProxyThread ¶
func PushToPrometheusProxyThread()
PushToPrometheusProxyThread is the daemon thread that scrapes and pushes metrics to prometheus proxy
func ReadConfigFile ¶
func ReadConfigFile(configFile string)
ReadConfigFile reads configuration file.
func Remail ¶
func Remail(globalConfig *Configuration, topic TopicCfg)
Remail consumes all messages and resends them, adding the cluster name in the "ping-remailer" field of the message. If the message already contains this field or we are the sender, the message is simply discarded. To survive the loss of connection, the state of the client and consumer is checked on each iteration and if it was closed it is recreated.
func RemoveIncident ¶
func RemoveIncident(component string)
RemoveIncident removes an existing incident
func ReportIncident ¶
func ReportIncident(component, alias, msg, desc string, eval *AlertPolicyCfg) bool
ReportIncident reports an incident. The returned bool indicates whether an incident is created or not.
func ResolvePDIncident ¶
ResolvePDIncident resolves PagerDuty incident
func RunInterval ¶
RunInterval runs interval
func SendSlackNotification ¶
func SendSlackNotification(webhookURL string, msg SlackMessage) error
SendSlackNotification will post to an 'Incoming Webhook' URL set up in Slack Apps. It accepts some text and the slack channel is saved within Slack.
func SiteLatencyGaugeOpt ¶
func SiteLatencyGaugeOpt() prometheus.GaugeOpts
SiteLatencyGaugeOpt is the description for hosting site latency gauge
func StartHeartBeat ¶
func StartHeartBeat()
StartHeartBeat starts heartbeat monitoring the program by OpsGenie
func TenantsGaugeOpt ¶
func TenantsGaugeOpt() prometheus.GaugeOpts
TenantsGaugeOpt is the description for rest api tenant counts
func TestBrokers ¶
TestBrokers evaluates and reports all brokers health
func TestTopicLatency ¶
func TestTopicLatency(topicCfg TopicCfg)
TestTopicLatency tests generic message delivery in topics and the latency
func TestWsLatency ¶
func TestWsLatency(config WsConfig)
TestWsLatency tests all clusters' websocket pub sub latency
func TopicLatencyTestThread ¶
func TopicLatencyTestThread()
TopicLatencyTestThread tests a message delivery in topic and measures the latency.
func VerboseAlert ¶
VerboseAlert is able to reduce the verbosity to Slack channel
func WebSocketTopicLatencyTestThread ¶
func WebSocketTopicLatencyTestThread()
WebSocketTopicLatencyTestThread tests a message websocket delivery in topic and measures the latency.
Types ¶
type AckMessage ¶
type AckMessage struct {
MessageID string `json:"messageId"`
}
AckMessage is the message struct to acknowledge a message
type AlertPolicyCfg ¶
type AlertPolicyCfg struct { // first evaluation to count continuous failure Ceiling int `json:"ceiling"` // Second evaluation for moving window MovingWindowSeconds int `json:"movingWindowSeconds"` CeilingInMovingWindow int `json:"ceilingInMovingWindow"` }
AlertPolicyCfg is a set of criteria to evaluate triggers for incident alerts
type AlertVerbosity ¶
type AlertVerbosity struct {
// contains filtered or unexported fields
}
AlertVerbosity contains attributes required to calculate whether verbose alert is required or not
func (*AlertVerbosity) MustAlert ¶
func (av *AlertVerbosity) MustAlert() bool
MustAlert returns whether the silence window has expired since the last alert.
type AnalyticsCfg ¶
type AnalyticsCfg struct { APIKey string `json:"apiKey"` IngestionURL string `json:"ingestionUrl"` InsightsWriteKey string `json:"insightsWriteKey"` InsightsAccountID string `json:"insightsAccountId"` }
AnalyticsCfg is analytics usage and statistics tracking configuration
type BrokersCfg ¶
type BrokersCfg struct { BrokerTestRequired bool `json:"brokerTestRequired"` InClusterRESTURL string `json:"inclusterRestURL"` IntervalSeconds int `json:"intervalSeconds"` AlertPolicy AlertPolicyCfg `json:"AlertPolicy"` }
BrokersCfg monitors all brokers in the cluster
type ClusterHealth ¶
type ClusterHealth struct { sync.RWMutex Status k8s.ClusterStatusCode MissingBrokers int }
ClusterHealth is a cluster health struct
func (*ClusterHealth) Get ¶
func (h *ClusterHealth) Get() (k8s.ClusterStatusCode, int)
Get gets the cluster health status
func (*ClusterHealth) Set ¶
func (h *ClusterHealth) Set(status k8s.ClusterStatusCode, offlineBrokers int)
Set sets the cluster health status
type Configuration ¶
type Configuration struct { // Name is the Pulsar cluster name, it is mandatory Name string `json:"name"` // ClusterName is the Pulsar cluster name if the Name cannot be used as the Pulsar cluster name, optional ClusterName string `json:"clusterName"` TokenOAuthConfig *clientcredentials.Config `json:"tokenOAuthConfig"` // TokenFilePath is the file path to Pulsar JWT. It takes precedence of the token attribute. TokenFilePath string `json:"tokenFilePath"` // Token is a Pulsar JWT can be used for both client client or http admin client Token string `json:"token"` BrokersConfig BrokersCfg `json:"brokersConfig"` TrustStore string `json:"trustStore"` K8sConfig K8sClusterCfg `json:"k8sConfig"` AnalyticsConfig AnalyticsCfg `json:"analyticsConfig"` PrometheusConfig PrometheusCfg `json:"prometheusConfig"` SlackConfig SlackCfg `json:"slackConfig"` OpsGenieConfig OpsGenieCfg `json:"opsGenieConfig"` PagerDutyConfig PagerDutyCfg `json:"pagerDutyConfig"` PulsarAdminConfig PulsarAdminRESTCfg `json:"pulsarAdminRestConfig"` PulsarTopicConfig []TopicCfg `json:"pulsarTopicConfig"` SitesConfig SitesCfg `json:"sitesConfig"` WebSocketConfig []WsConfig `json:"webSocketConfig"` TenantUsageConfig TenantUsageCfg `json:"tenantUsageConfig"` // contains filtered or unexported fields }
Configuration - this server's configuration
var Config Configuration
Config - this server's configuration instance
func (*Configuration) Init ¶
func (c *Configuration) Init()
func (*Configuration) TokenSupplier ¶
func (c *Configuration) TokenSupplier() func() (string, error)
type Incident ¶
type Incident struct { Message string `json:"message"` Description string `json:"description"` Priority string `json:"priority"` Entity string `json:"entity"` Alias string `json:"alias"` Tags []string `json:"tags"` Timestamp time.Time `json:"timestamp"` }
Incident is the struct for incident reporting
func NewIncident ¶
NewIncident creates an Incident object
type IncidentAlertPolicy ¶
type IncidentAlertPolicy struct { Entity string Counters int EvalWindowSeconds time.Duration Alerts map[time.Time]bool LimitInWindow int Limit int LastUpdatedAt time.Time }
IncidentAlertPolicy tracks and reports incident when threshold is reached
type K8sClusterCfg ¶
type K8sClusterCfg struct { Enabled bool `json:"enabled"` PulsarNamespace string `json:"pulsarNamespace"` KubeConfigDir string `json:"kubeConfigDir"` AlertPolicy AlertPolicyCfg `json:"AlertPolicy"` }
K8sClusterCfg is the configuration to monitor a Kubernetes cluster; only to be enabled for in-cluster monitoring
type OpsClusterCfg ¶
type OpsClusterCfg struct { Name string `json:"name"` URL string `json:"url"` AlertPolicy AlertPolicyCfg `json:"alertPolicy"` }
OpsClusterCfg is each cluster's configuration
type OpsGenieAlertCloseRequest ¶
type OpsGenieAlertCloseRequest struct { User string `json:"user"` Source string `json:"source"` Note string `json:"note"` }
OpsGenieAlertCloseRequest is the POST request payload json
type OpsGenieAlertCreateResponse ¶
type OpsGenieAlertCreateResponse struct { Result string `json:"result"` Took float64 `json:"took"` RequestID string `json:"requestId"` }
OpsGenieAlertCreateResponse is the response struct returned by OpsGenie https://docs.opsgenie.com/docs/alert-api#section-create-alert
type OpsGenieAlertGetResponse ¶
type OpsGenieAlertGetResponse struct { Data alertGetData `json:"data"` Took float64 `json:"took"` RequestID string `json:"requestId"` }
OpsGenieAlertGetResponse is the response struct returned by OpsGenie https://docs.opsgenie.com/docs/alert-api#section-create-alert
type OpsGenieCfg ¶
type OpsGenieCfg struct { HeartBeatURL string `json:"heartbeatUrl"` HeartbeatKey string `json:"heartbeatKey"` AlertKey string `json:"alertKey"` IntervalSeconds int `json:"intervalSeconds"` }
OpsGenieCfg is opsGenie configuration
type PagerDutyCfg ¶
type PagerDutyCfg struct {
IntegrationKey string `json:"integrationKey"`
}
PagerDutyCfg is PagerDuty configuration
type Payload ¶
type Payload struct { Ceiling int Floor int DefaultPayload []byte // to save time for large payload size generation }
Payload defines the payload size
func NewPayload ¶
NewPayload returns a new Payload object with a fixed payload size
func (*Payload) GenDefaultPayload ¶
GenDefaultPayload generates default payload size
func (Payload) PrefixDefaultPayload ¶
PrefixDefaultPayload creates string prefix in the payload
func (Payload) PrefixPayload ¶
PrefixPayload creates string prefix in the payload
type PrometheusCfg ¶
type PrometheusCfg struct { Port string `json:"port"` ExposeMetrics bool `json:"exposeMetrics"` PrometheusProxyURL string `json:"prometheusProxyURL"` PrometheusProxyAPIKey string `json:"prometheusProxyAPIKey"` }
PrometheusCfg configures Prometheus set up
type PubSubResult ¶
type PubSubResult struct { Latency time.Duration InOrderDelivery bool RemainingMessages int Success bool //TODO usefull ? }
PubSubResult give stats for one.
type PulsarAdminRESTCfg ¶
type PulsarAdminRESTCfg struct { Token string `json:"Token"` Clusters []OpsClusterCfg `json:"clusters"` IntervalSeconds int `json:"intervalSeconds"` }
PulsarAdminRESTCfg is for monitoring a list of Pulsar clusters
type PulsarMessage ¶
type PulsarMessage struct { Payload string `json:"payload"` Properties map[string]interface{} `json:"properties"` Context string `json:"context,omitempty"` }
PulsarMessage is the required message format for Pulsar Websocket message
type ReceivingMessage ¶
type ReceivingMessage struct { Payload string `json:"payload"` MessageID string `json:"messageId"` Properties map[string]interface{} `json:"properties"` Context string `json:"context,omitempty"` }
ReceivingMessage is the Pulsar message for socket consumer
type SiteCfg ¶
type SiteCfg struct { Headers map[string]string `json:"headers"` URL string `json:"url"` Name string `json:"name"` IntervalSeconds int `json:"intervalSeconds"` ResponseSeconds int `json:"responseSeconds"` StatusCode int `json:"statusCode"` StatusCodeExpr string `json:"statusCodeExpr"` Retries int `json:"retries"` AlertPolicy AlertPolicyCfg `json:"alertPolicy"` }
SiteCfg configures general website
type SitesCfg ¶
type SitesCfg struct {
Sites []SiteCfg `json:"sites"`
}
SitesCfg configures a list of websites
type SlackMessage ¶
type SlackMessage struct { Channel string `json:"channel"` Text string `json:"text"` Username string `json:"username"` IconEmogi string `json:"icon_emogi"` }
SlackMessage is the message struct to be posted for Slack
type TenantUsageCfg ¶
type TenantUsageCfg struct { OutBytesLimit uint64 `json:"outBytesLimit"` AlertIntervalMinutes int `json:"alertIntervalMinutes"` }
TenantUsageCfg tenant usage reporting and monitoring
type TopicCfg ¶
type TopicCfg struct { Name string `json:"name"` ClusterName string `json:"clusterName"` // used for broker monitoring if specified Token string `json:"token"` TrustStore string `json:"trustStore"` NumberOfPartitions int `json:"numberOfPartitions"` LatencyBudgetMs int `json:"latencyBudgetMs"` PulsarURL string `json:"pulsarUrl"` AdminURL string `json:"adminUrl"` TopicName string `json:"topicName"` OutputTopic string `json:"outputTopic"` IntervalSeconds int `json:"intervalSeconds"` ExpectedMsg string `json:"expectedMsg"` PayloadSizes []string `json:"payloadSizes"` NumberOfMessages int `json:"numberOfMessages"` AlertPolicy AlertPolicyCfg `json:"AlertPolicy"` DowntimeTrackerDisabled bool `json:"downtimeTrackerDisabled"` RemoteClusters []string `json:"RemoteClusters"` }
TopicCfg is topic configuration
type WsConfig ¶
type WsConfig struct { Name string `json:"name"` Token string `json:"token"` Cluster string `json:"cluster"` // can be used for alert de-dupe LatencyBudgetMs int `json:"latencyBudgetMs"` ProducerURL string `json:"producerUrl"` ConsumerURL string `json:"consumerUrl"` TopicName string `json:"topicName"` IntervalSeconds int `json:"intervalSeconds"` Scheme string `json:"scheme"` Port string `json:"port"` Subscription string `json:"subscription"` URLQueryParams string `json:"urlQueryParams"` AlertPolicy AlertPolicyCfg `json:"AlertPolicy"` }
WsConfig is configuration to monitor WebSocket pub sub latency