README
¶
审计中心
基础: 消息队列
package kafka_test
import (
"context"
"fmt"
"log"
"net"
"strconv"
"testing"
"github.com/segmentio/kafka-go"
)
func TestCreateTopic(t *testing.T) {
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
panic(err.Error())
}
defer conn.Close()
controller, err := conn.Controller()
if err != nil {
panic(err.Error())
}
var controllerConn *kafka.Conn
controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
}
defer controllerConn.Close()
err = controllerConn.CreateTopics(kafka.TopicConfig{Topic: "topic-A", NumPartitions: 6, ReplicationFactor: 1})
if err != nil {
t.Fatal(err)
}
}
func TestPublishMessage(t *testing.T) {
// make a writer that produces to topic-A, using the least-bytes distribution
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
// NOTE: When Topic is not defined here, each Message must define it instead.
Topic: "topic-A",
Balancer: &kafka.LeastBytes{},
// The topic will be created if it is missing.
AllowAutoTopicCreation: false,
// 支持消息压缩
// Compression: kafka.Snappy,
// 支持TLS
// Transport: &kafka.Transport{
// TLS: &tls.Config{},
// }
}
err := w.WriteMessages(context.Background(),
kafka.Message{
// 支持 Writing to multiple topics
// NOTE: Each Message has Topic defined, otherwise an error is returned.
// Topic: "topic-A",
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
},
kafka.Message{
Key: []byte("Key-B"),
Value: []byte("One!"),
},
kafka.Message{
Key: []byte("Key-C"),
Value: []byte("Two!"),
},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}
if err := w.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
}
func TestConsumer(t *testing.T) {
// make a new reader that consumes from topic-A
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
// Consumer Groups, 不指定就是普通的一个Consumer
GroupID: "consumer-group-id",
// 可以指定Partition消费消息
// Partition: 0,
Topic: "topic-A",
MaxBytes: 10e6, // 10MB
})
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
// 处理完消息后需要提交该消息已经消费完成, 消费者挂掉后保存消息消费的状态
// FetchMessage() / CommitMessages(ctx, m) 分段提交
// if err := r.CommitMessages(ctx, m); err != nil {
// log.Fatal("failed to commit messages:", err)
// }
}
if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
}
}
如何在项目中使用与封装
Controller如何读取自定义配置
type impl struct {
col *mongo.Collection
log *zerolog.Logger
kr *go_kafka.Reader
event.UnimplementedServiceServer
ioc.ObjectImpl
// 事件存储在哪些topic上面 需要配置
// 因此下面是这个控制器实现的具体配置
// 这些配置什么适合加载: 通过ioc, 如果我们通过toml来配置, 配key叫什么: Name()
// # 关于event 模块的配置
// [event]
// group_id="maudit.group.id"
// topics=["maudit.event"]
GroupId string `toml:"group_id" json:"group_id" yaml:"group_id" env:"EVNET_GROUP_ID"`
Topics []string `toml:"topics" json:"topics" yaml:"topics" env:"EVNET_TOPICS" envSeparator:","`
}
kafka配置: ioc 管理这kafka配置
type Kafka struct {
Brokers []string `toml:"brokers" json:"brokers" yaml:"brokers" env:"KAFKA_BROKERS"`
ScramAlgorithm ScramAlgorithm `toml:"scram_algorithm" json:"scram_algorithm" yaml:"scram_algorithm" env:"KAFKA_SCRAM_ALGORITHM"`
UserName string `toml:"username" json:"username" yaml:"username" env:"KAFKA_USERNAME"`
Password string `toml:"password" json:"password" yaml:"password" env:"KAFKA_PASSWORD"`
mechanism sasl.Mechanism
ioc.ObjectImpl
}
# kafka没有开启认证
[kafka]
username=""
# 关于event 模块的配置
[event]
group_id="maudit.group.id"
topics=["maudit.event"]
在Init时 运行
// 在Init方法执行之前, 会自动加载配置
func (i *impl) Init() error {
i.log = logger.Sub(i.Name())
...
i.log.Debug().Msgf("group_id: %s, topics: %s", i.GroupId, i.Topics)
// 使用ioc提供的kafka的配置信息 初始化一个reader
i.kr = kafka.ConsumerGroup(i.GroupId, i.Topics)
// 没有启动消息消费
go i.ConsumerEvent()
return nil
}
问题处理
# clone git checkout v1.9.29
mcube: v1.9.29
把mcube库放到 workspace
go work use ./mcube
如何基于kafak封装 maudit中间件
编写中间件
func NewAuditor() restful.FilterFunction {
topic := os.Getenv("MAUDIT_EVENT_TOPIC")
if topic == "" {
topic = "maudit.event"
}
return (&auditor{
// 引入中间件后,通过配置环境变量接入
w: ioc_kafka.Producer(topic),
l: logger.Sub("auditor"),
}).GoRestfulAuthFunc
}
// 用于接入审计中心的中间件
type auditor struct {
// 集成kafka writer
w *kafka.Writer
l *zerolog.Logger
}
func (a *auditor) GoRestfulAuthFunc(
req *restful.Request,
resp *restful.Response,
next *restful.FilterChain) {
// 从路由当做获取
// 请求拦截, 权限检查
entry := endpoint.NewEntryFromRestRequest(req)
// 由于审计有性能开销
// 开启认证
// Metadata(label.Auth, label.Enable).
// 开启审计
// Metadata(label.Audit, label.Enable).
if entry.AuthEnable && entry.AuditLog {
// 从认证中间件后, 取消认证后的上下文
obj := req.Attribute(token.TOKEN_ATTRIBUTE_NAME)
if obj == nil {
return
}
tk := obj.(*token.Token)
event := &event.OperateEventData{
UserName: tk.Username,
ServiceName: application.App().AppName,
ResourceType: entry.Resource,
Action: entry.Labels["action"],
FeaturePath: entry.Path,
Request: pretty.ToJSON(req.Request),
Response: "{}",
}
err := a.w.WriteMessages(context.Background(),
kafka.Message{
Value: []byte(pretty.ToJSON(event)),
},
)
if err != nil {
a.l.Error().Msgf("write audit log error, %s", err)
}
}
}
测试中间件
使用审计中间件 让cmdb服务接入审计中心
- 应用添加中间件
// 启动之前配置钩子
// app 在启动的时候 能不能加载一种中间件逻辑, 比如认证中间件
application.App().HTTP.RouterBuildConfig.BeforeLoad = func(h http.Handler) {
// 断言 router
r := h.(*restful.Container)
// 添加中间件,接入到用户中心
r.Filter(middleware.RestfulServerInterceptor())
// 接入 审计中间
r.Filter(middlewares.NewAuditor())
}
- 开启接口审计
// 开启审计
Metadata(label.Audit, label.Enable).
- 启动服务测试接口
- mcenter
- maudit
- cmdb
请求cmdb的secrets 接口
GET /cmdb/api/v1/secret HTTP/1.1
Host: 127.0.0.1:8020
Authorization: Bearer mJRdh9W2DDNs9T7oDpz4OOC4
maudit就会收到应用的审计日志
2023-12-16T10:30:36+08:00 DEBUG event/impl/consumer.go:17 > message at topic/partition/offset maudit.event/0/17: = {
"session": "",
"account": "",
"user_name": "admin",
"user_type": "",
"user_domain": "",
"service_name": "cmdb",
"feature_path": "/cmdb/api/v1/secret/",
"resource_type": "secrets",
"action": "list",
"cost": 0,
"request": "\u0026{Method:GET URL:/cmdb/api/v1/secret Proto:HTTP/1.1 ProtoMajor:1 ProtoMinor:1 Header:map[Accept:[*/*] Accept-Encoding:[gzip, deflate,
br] Authorization:[Bearer mJRdh9W2DDNs9T7oDpz4OOC4] Cache-Control:[no-cache] Connection:[keep-alive] Content-Length:[81] Content-Type:[application/json]
Postman-Token:[2923558d-9931-48db-ad41-6d364a828eee] User-Agent:[PostmanRuntime/7.35.0]] Body:0xc000614300 GetBody:\u003cnil\u003e ContentLength:81 TransferEncoding:[] Close:false Host:127.0.0.1:8020 Form:map[] PostForm:map[] MultipartForm:\u003cnil\u003e Trailer:map[] RemoteAddr:127.0.0.1:50972 RequestURI:/cmdb/api/v1/secret TLS:\u003cnil\u003e Cancel:\u003cnil\u003e Response:\u003cnil\u003e ctx:0xc0003c6460}",
"response": "{}"
}
使用Exporter实现应用自定义监控
gorestful 框架开启 prom metrics接口
// 启动之前配置钩子
// app 在启动的时候 能不能加载一种中间件逻辑, 比如认证中间件
application.App().HTTP.RouterBuildConfig.BeforeLoad = func(h http.Handler) {
// 断言 router
r := h.(*restful.Container)
// 添加中间件,接入到用户中心
r.Filter(middleware.RestfulServerInterceptor())
// 使用gorestful 框架注册一个之定义 handler
ws := new(restful.WebService)
ws.Route(ws.GET("/metrics").To(func(r *restful.Request, w *restful.Response) {
// 基于标准库 包装了一层
promhttp.Handler().ServeHTTP(w, r.Request)
}))
r.Add(ws)
}
补充自定义采集器
记录 kafka 消息的输入 入库报错指标(当前有多少事件由于报错没有入库)
package impl
import "github.com/prometheus/client_golang/prometheus"
func NewEventCollect() *EventCollect {
return &EventCollect{
errCountDesc: prometheus.NewDesc(
"save_event_error_count",
"事件入库失败个数统计",
[]string{},
prometheus.Labels{"service": "maudit"},
),
}
}
// 收集事件指标的采集器
type EventCollect struct {
errCountDesc *prometheus.Desc
// 需要自己根据实践情况来维护这个变量
errCount int
}
func (c *EventCollect) Inc() {
c.errCount++
}
// 指标元数据注册
func (c *EventCollect) Describe(ch chan<- *prometheus.Desc) {
ch <- c.errCountDesc
}
// 指标的值的采集
func (c *EventCollect) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(c.errCountDesc, prometheus.GaugeValue, float64(c.errCount))
}
初始化时注册到默认注册表
// 将采集器注册到默认注册表
i.colector = NewEventCollect()
prometheus.MustRegister(i.colector)
采集器统计指标数据
for {
m, err := i.kr.FetchMessage(context.Background())
if err != nil {
i.log.Error().Msgf("consume event error, %s", err)
break
}
i.log.Debug().Msgf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
err = i.kr.CommitMessages(context.Background(), m)
if err != nil {
i.log.Error().Msgf("commit error, %s", err)
}
// 存储事件 i.SaveEvent(), 模拟存储失败报错
err = errors.New("save event error")
if err != nil {
i.colector.Inc()
}
}
接口数据
# HELP save_event_error_count 事件入库失败个数统计
# TYPE save_event_error_count gauge
save_event_error_count{service="maudit"} 1
Documentation
¶
There is no documentation for this package.
Click to show internal directories.
Click to hide internal directories.