Documentation ¶
Index ¶
- Constants
- Variables
- func ListPods(ctx context.Context, kcli k8s.Client, labels map[string]string) ([]corev1.Pod, error)
- func StreamLogs(ctx context.Context, kcli k8s.Client, podsList []corev1.Pod, ...) error
- type IAMGrpcClient
- type InfraClient
- type ObservabilityArgs
- type ObservabilityLabel
- type PromMetricsType
- type PromOperator
- type PromValue
- type SessionStore
Constants ¶
View Source
const ( PromOperatorEqual = PromOperator("=") PromOperatorNotEqual = PromOperator("!=") PromOperatorMatchRegex = PromOperator("=~") PromOperatorNotMatchRegex = PromOperator("!~") )
Variables ¶
View Source
var Module = fx.Module( "app", fx.Provide( func(conn IAMGrpcClient) iam.IAMClient { return iam.NewIAMClient(conn) }, ), fx.Provide(func(conn InfraClient) infra.InfraClient { return infra.NewInfraClient(conn) }), fx.Invoke(func(infraCli infra.InfraClient, kcfg *rest.Config, iamCli iam.IAMClient, mux *http.ServeMux, sessStore SessionStore, ev *env.Env, logger logging.Logger) { sessionMiddleware := httpServer.NewReadSessionMiddlewareHandler(sessStore, constants.CookieName, constants.CacheSessionPrefix) loggingMiddleware := httpServer.NewLoggingMiddleware(logger) mux.HandleFunc("/observability/metrics/", loggingMiddleware(sessionMiddleware(func(w http.ResponseWriter, r *http.Request) { metricsType := strings.TrimPrefix(r.URL.Path, "/observability/metrics/") sess := httpServer.GetHttpSession[*common.AuthSession](r.Context()) if sess == nil { http.Error(w, "not logged in", http.StatusUnauthorized) return } m, ok := r.Context().Value("http-cookies").(map[string]string) if !ok { m = map[string]string{} } accountName := m[ev.AccountCookieName] if accountName == "" { http.Error(w, fmt.Sprintf("no cookie named '%s' present in request", ev.AccountCookieName), http.StatusBadRequest) return } clusterName := r.URL.Query().Get("cluster_name") if clusterName == "" { http.Error(w, "query param (cluster_name) must be provided", http.StatusBadRequest) return } trackingId := r.URL.Query().Get("tracking_id") if trackingId == "" { http.Error(w, "query param (tracking_id) must be provided", http.StatusBadRequest) } can, err := iamCli.Can(r.Context(), &iam.CanIn{ UserId: string(sess.UserId), ResourceRefs: []string{ iamT.NewResourceRef(accountName, iamT.ResourceAccount, accountName), }, Action: string(iamT.ReadMetrics), }) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } if !can.Status { http.Error(w, errors.NewEf(err, "unauthorized to view metrics for resources belonging to account (%s)", accountName).Error(), http.StatusUnauthorized) return } st := r.URL.Query().Get("start_time") if st == "" { st = fmt.Sprintf("%d", time.Now().Add(-3*time.Hour).Unix()) } et := r.URL.Query().Get("end_time") if et == "" { et = fmt.Sprintf("%d", time.Now().Unix()) } step := r.URL.Query().Get("step") if step == "" { step = "15s" } k8sCli, err := func() (k8s.Client, error) { if strings.HasPrefix(trackingId, "clus-") { return k8s.NewClient(kcfg, nil) } return k8s.NewClient(&rest.Config{ Host: fmt.Sprintf("http://device-%s-pl.kl-%s.svc.cluster.local:8080/clusters/%s", "default", accountName, clusterName), WrapTransport: func(rt http.RoundTripper) http.RoundTripper { return httpServer.NewRoundTripperWithHeaders(rt, map[string][]string{ "X-Kloudlite-Authz": {fmt.Sprintf("Bearer %s", ev.GlobalVPNAuthzSecret)}, }) }, }, nil) }() if err != nil { logger.Errorf(err, "failed to create k8s client") http.Error(w, fmt.Sprintf("failed to create k8s client: %v", err), http.StatusInternalServerError) return } pods, err := ListPods(r.Context(), k8sCli, map[string]string{constants.ObservabilityTrackingKey: trackingId}) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } podNames := make([]string, 0, len(pods)) for _, pod := range pods { podNames = append(podNames, pod.Name) } if err := queryProm(ev.PromHttpAddr, PromMetricsType(metricsType), map[string]PromValue{ "kl_account_name": {Operator: PromOperatorEqual, Value: accountName}, "kl_cluster_name": {Operator: PromOperatorEqual, Value: clusterName}, "kl_tracking_id": {Operator: PromOperatorEqual, Value: trackingId}, "pod_name": {Operator: PromOperatorMatchRegex, Value: fmt.Sprintf("^(%s)$", strings.Join(podNames, ","))}, }, st, et, step, w); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } }))) mux.HandleFunc("/observability/logs", loggingMiddleware(sessionMiddleware(func(w http.ResponseWriter, r *http.Request) { sess := httpServer.GetHttpSession[*common.AuthSession](r.Context()) if sess == nil { http.Error(w, "not logged in", http.StatusUnauthorized) return } m, ok := r.Context().Value("http-cookies").(map[string]string) if !ok { m = map[string]string{} } accountName := m[ev.AccountCookieName] if accountName == "" { http.Error(w, fmt.Sprintf("no cookie named '%s' present in request", ev.AccountCookieName), http.StatusBadRequest) return } clusterName := r.URL.Query().Get("cluster_name") trackingId := r.URL.Query().Get("tracking_id") k8sCli, err := func() (k8s.Client, error) { if strings.HasPrefix(trackingId, "clus-") { return k8s.NewClient(kcfg, nil) } return k8s.NewClient(&rest.Config{ Host: fmt.Sprintf("http://device-%s-pl.kl-%s.svc.cluster.local:8080/clusters/%s", "default", accountName, clusterName), WrapTransport: func(rt http.RoundTripper) http.RoundTripper { return httpServer.NewRoundTripperWithHeaders(rt, map[string][]string{ "X-Kloudlite-Authz": {fmt.Sprintf("Bearer %s", ev.GlobalVPNAuthzSecret)}, }) }, }, nil) }() if err != nil { http.Error(w, fmt.Sprintf("failed to create k8s client: %v", err), http.StatusInternalServerError) return } pods, err := ListPods(r.Context(), k8sCli, map[string]string{constants.ObservabilityTrackingKey: trackingId}) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } if len(pods) == 0 { logger.Infof("no pods found") http.Error(w, "no pods found", http.StatusTooEarly) return } closed := false go func() { for { if err := r.Context().Err(); err != nil { closed = true return } <-time.After(100 * time.Millisecond) } }() pr, pw := io.Pipe() go func() { b := bufio.NewReader(pr) for !closed { msg, err := b.ReadBytes('\n') if err != nil { if !errors.Is(err, io.EOF) { if !closed { http.Error(w, err.Error(), 500) } } return } fmt.Fprintf(w, "%s", msg) w.(http.Flusher).Flush() } }() if err := StreamLogs(r.Context(), k8sCli, pods, pw, logger); err != nil { http.Error(w, err.Error(), 500) } }))) }), )
Functions ¶
Types ¶
type IAMGrpcClient ¶
type InfraClient ¶
type ObservabilityArgs ¶
type ObservabilityArgs struct { AccountName string `json:"account_name"` ClusterName string `json:"cluster_name"` ResourceName string `json:"resource_name"` ResourceNamespace string `json:"resource_namespace"` ResourceType string `json:"resource_type"` WorkspaceName string `json:"workspace_name"` ProjectName string `json:"project_name"` JobName string `json:"job_name"` JobNamespace string `json:"job_namespace"` StartTime *time.Time `json:"start_time,omitempty"` EndTime *time.Time `json:"end_time,omitempty"` }
func (*ObservabilityArgs) Validate ¶
func (args *ObservabilityArgs) Validate() (bool, error)
type ObservabilityLabel ¶
type ObservabilityLabel string
const ( AccountName ObservabilityLabel = "kl_account_name" ClusterName ObservabilityLabel = "kl_cluster_name" ResourceName ObservabilityLabel = "kl_resource_name" ResourceType ObservabilityLabel = "kl_resource_type" ResourceNamespace ObservabilityLabel = "kl_resource_namespace" ResourceComponent ObservabilityLabel = "kl_resource_component" ProjectName ObservabilityLabel = "kl_project_name" ProjectTargetNamespace ObservabilityLabel = "kl_project_target_ns" WorkspaceName ObservabilityLabel = "kl_workspace_name" WorkspaceTargetNs ObservabilityLabel = "kl_workspace_target_ns" )
type PromMetricsType ¶
type PromMetricsType string
const ( Cpu PromMetricsType = "cpu" Memory PromMetricsType = "memory" NetworkRead PromMetricsType = "network-read" NetworkWrite PromMetricsType = "network-write" )
type PromOperator ¶
type PromOperator string
type PromValue ¶
type PromValue struct { Operator PromOperator // Value must be a VALID prometheus value suitable for the specified PromOperator Value any }
type SessionStore ¶
type SessionStore kv.Repo[*common.AuthSession]
Click to show internal directories.
Click to hide internal directories.