Documentation ¶
Index ¶
- Constants
- Variables
- func NewResourceEventPublisher(cli *nats.Client, logger logging.Logger) domain.ResourceEventPublisher
- type AccountsClient
- type ConsoleGrpcServer
- type DNSServer
- type IAMGrpcClient
- type InfraClient
- type MessageOfficeInternalClient
- type ObservabilityArgs
- type ObservabilityLabel
- type PromMetricsType
- type ResourceEventPublisherImpl
- func (r *ResourceEventPublisherImpl) PublishClusterManagedServiceEvent(ctx domain.ConsoleContext, msvcName string, resourceType entities.ResourceType, ...)
- func (r *ResourceEventPublisherImpl) PublishConsoleEvent(ctx domain.ConsoleContext, resourceType entities.ResourceType, name string, ...)
- func (r *ResourceEventPublisherImpl) PublishEnvironmentResourceEvent(ctx domain.ConsoleContext, envName string, resourceType entities.ResourceType, ...)
- func (r *ResourceEventPublisherImpl) PublishResourceEvent(ctx domain.ResourceContext, resourceType entities.ResourceType, name string, ...)
- type WebhookConsumer
Constants ¶
View Source
const (
DefaultDNSTTL = 5
)
Variables ¶
View Source
var Module = fx.Module("app", repos.NewFxMongoRepo[*entities.Environment]("environments", "env", entities.EnvironmentIndexes), repos.NewFxMongoRepo[*entities.App]("apps", "app", entities.AppIndexes), repos.NewFxMongoRepo[*entities.ExternalApp]("ext_apps", "extapp", entities.ExternalAppIndexes), repos.NewFxMongoRepo[*entities.Config]("configs", "cfg", entities.ConfigIndexes), repos.NewFxMongoRepo[*entities.Secret]("secrets", "scrt", entities.SecretIndexes), repos.NewFxMongoRepo[*entities.ManagedResource]("managed_resources", "mres", entities.MresIndexes), repos.NewFxMongoRepo[*entities.ImportedManagedResource]("imported_managed_resources", "impmres", entities.ImportedManagedResourceIndexes), repos.NewFxMongoRepo[*entities.Router]("routers", "rt", entities.RouterIndexes), repos.NewFxMongoRepo[*entities.ImagePullSecret]("image_pull_secrets", "ips", entities.ImagePullSecretIndexes), repos.NewFxMongoRepo[*entities.ResourceMapping]("resource_mappings", "rmap", entities.ResourceMappingIndices), repos.NewFxMongoRepo[*entities.ServiceBinding]("service_bindings", "svcb", entities.ServiceBindingIndexes), repos.NewFxMongoRepo[*entities.ClusterManagedService]("cluster_managed_services", "cmsvc", entities.ClusterManagedServiceIndices), repos.NewFxMongoRepo[*entities.RegistryImage]("registry_images", "reg_img", entities.RegistryImageIndexes), fx.Provide( func(conn IAMGrpcClient) iam.IAMClient { return iam.NewIAMClient(conn) }, ), fx.Provide(func(conn MessageOfficeInternalClient) platform_edge.PlatformEdgeClient { return platform_edge.NewPlatformEdgeClient(conn) }), fx.Provide(func(conn AccountsClient) accounts.AccountsClient { return accounts.NewAccountsClient(conn) }), fx.Provide(adapters.NewAccountsSvc), fx.Provide(func(cli infra.InfraClient) ports.InfraService { return infra_service.NewInfraService(cli) }), fx.Invoke( func(server httpServer.Server, d domain.Domain, sessionRepo kv.Repo[*common.AuthSession], ev *env.Env) { gqlConfig := generated.Config{Resolvers: &graph.Resolver{Domain: d, EnvVars: ev}} gqlConfig.Directives.IsLoggedIn = func(ctx context.Context, obj interface{}, next graphql.Resolver) (res interface{}, err error) { sess := httpServer.GetSession[*common.AuthSession](ctx) if sess == nil { return nil, fiber.ErrUnauthorized } return next(context.WithValue(ctx, "user-session", sess)) } gqlConfig.Directives.IsLoggedInAndVerified = func(ctx context.Context, obj interface{}, next graphql.Resolver) (res interface{}, err error) { sess := httpServer.GetSession[*common.AuthSession](ctx) if sess == nil { return nil, fiber.ErrUnauthorized } if !sess.UserVerified { return nil, &fiber.Error{ Code: fiber.StatusForbidden, Message: "user's email is not verified", } } return next(context.WithValue(ctx, "user-session", sess)) } gqlConfig.Directives.HasAccount = func(ctx context.Context, obj interface{}, next graphql.Resolver) (res interface{}, err error) { sess := httpServer.GetSession[*common.AuthSession](ctx) if sess == nil { return nil, fiber.ErrUnauthorized } m := httpServer.GetHttpCookies(ctx) klAccount := m[ev.AccountCookieName] if klAccount == "" { return nil, errors.Newf("no cookie named %q present in request", ev.AccountCookieName) } nctx := context.WithValue(ctx, "user-session", sess) nctx = context.WithValue(nctx, "account-name", klAccount) return next(nctx) } schema := generated.NewExecutableSchema(gqlConfig) server.SetupGraphqlServer(schema, httpServer.NewReadSessionMiddleware(sessionRepo, constants.CookieName, constants.CacheSessionPrefix), ) }, ), fx.Provide(func(jc *nats.JetstreamClient, logger logging.Logger) domain.MessageDispatcher { return msg_nats.NewJetstreamProducer(jc) }), fx.Invoke(func(lf fx.Lifecycle, producer domain.MessageDispatcher) { lf.Append(fx.Hook{ OnStop: func(ctx context.Context) error { return producer.Stop(ctx) }, }) }), fx.Provide( func(conn InfraClient) infra.InfraClient { return infra.NewInfraClient(conn) }, ), fx.Provide(func(cli *nats.Client, logger logging.Logger) domain.ResourceEventPublisher { return NewResourceEventPublisher(cli, logger) }), domain.Module, fx.Provide(func(d domain.Domain, kcli k8s.Client) console.ConsoleServer { return newConsoleGrpcServer(d, kcli) }), fx.Invoke(func(gserver ConsoleGrpcServer, srv console.ConsoleServer) { console.RegisterConsoleServer(gserver, srv) }), fx.Provide(func(jc *nats.JetstreamClient, ev *env.Env, logger logging.Logger) (resource_updates_receiver.ErrorOnApplyConsumer, error) { topic := common.ReceiveFromAgentSubjectName(common.ReceiveFromAgentArgs{AccountName: "*", ClusterName: "*"}, common.ConsoleReceiver, common.EventErrorOnApply) consumerName := "console:error-on-apply" return msg_nats.NewJetstreamConsumer(context.TODO(), jc, msg_nats.JetstreamConsumerArgs{ Stream: ev.NatsReceiveFromAgentStream, ConsumerConfig: msg_nats.ConsumerConfig{ Name: consumerName, Durable: consumerName, Description: "this consumer reads message from a subject dedicated to errors, that occurred when the resource was applied at the agent", FilterSubjects: []string{topic}, }, }) }), fx.Provide(func(jc *nats.JetstreamClient, ev *env.Env, logger logging.Logger) (resource_updates_receiver.ResourceUpdateConsumer, error) { topic := common.ReceiveFromAgentSubjectName(common.ReceiveFromAgentArgs{AccountName: "*", ClusterName: "*"}, common.ConsoleReceiver, common.EventResourceUpdate) consumerName := "console:resource-updates" return msg_nats.NewJetstreamConsumer(context.TODO(), jc, msg_nats.JetstreamConsumerArgs{ Stream: ev.NatsReceiveFromAgentStream, ConsumerConfig: msg_nats.ConsumerConfig{ Name: consumerName, Durable: consumerName, Description: "this consumer reads message from a subject dedicated to console resource updates from tenant clusters", FilterSubjects: []string{topic}, }, }) }), resource_updates_receiver.Module, fx.Provide(func(jc *nats.JetstreamClient, ev *env.Env, logger logging.Logger) (WebhookConsumer, error) { topic := string(common.ImageRegistryHookTopicName) consumerName := "console:webhook" return msg_nats.NewJetstreamConsumer(context.TODO(), jc, msg_nats.JetstreamConsumerArgs{ Stream: ev.EventsNatsStream, ConsumerConfig: msg_nats.ConsumerConfig{ Name: consumerName, Durable: consumerName, Description: "this consumer reads message from a subject dedicated to console registry webhooks", FilterSubjects: []string{topic}, }, }) }), fx.Invoke(func(lf fx.Lifecycle, consumer WebhookConsumer, d domain.Domain, logger logging.Logger) { lf.Append(fx.Hook{ OnStart: func(context.Context) error { go func() { err := processWebhooks(consumer, d, logger) if err != nil { logger.Errorf(err, "could not process webhooks") } }() return nil }, OnStop: func(ctx context.Context) error { return consumer.Stop(ctx) }, }) }), fx.Provide(func(svcBindingRepo repos.DbRepo[*entities.ServiceBinding]) domain.ServiceBindingDomain { return domain.NewSvcBindingDomain(svcBindingRepo) }), fx.Provide(func(logger *slog.Logger, sbd domain.ServiceBindingDomain, ev *env.Env) *dnsHandler { return &dnsHandler{ logger: logger, serviceBindingDomain: sbd, kloudliteDNSSuffix: ev.KloudliteDNSSuffix, } }), fx.Invoke(func(server *DNSServer, handler *dnsHandler, lf fx.Lifecycle, logger *slog.Logger) { lf.Append(fx.Hook{ OnStart: func(ctx context.Context) error { server.Handler = handler go func() { logger.Info("starting dns server", "at", server.Addr) err := server.ListenAndServe() if err != nil { logger.Error("failed to start dns server, got", "err", err) panic(err) } }() return nil }, OnStop: func(ctx context.Context) error { return server.ShutdownContext(ctx) }, }) }), )
Functions ¶
Types ¶
type AccountsClient ¶
type ConsoleGrpcServer ¶
type IAMGrpcClient ¶
type InfraClient ¶
type ObservabilityArgs ¶
type ObservabilityArgs struct { AccountName string `json:"account_name"` ClusterName string `json:"cluster_name"` ResourceName string `json:"resource_name"` ResourceNamespace string `json:"resource_namespace"` ResourceType string `json:"resource_type"` WorkspaceName string `json:"workspace_name"` ProjectName string `json:"project_name"` JobName string `json:"job_name"` JobNamespace string `json:"job_namespace"` StartTime *time.Time `json:"start_time,omitempty"` EndTime *time.Time `json:"end_time,omitempty"` }
func (*ObservabilityArgs) Validate ¶
func (args *ObservabilityArgs) Validate() (bool, error)
type ObservabilityLabel ¶
type ObservabilityLabel string
const ( AccountName ObservabilityLabel = "kl_account_name" ClusterName ObservabilityLabel = "kl_cluster_name" ResourceName ObservabilityLabel = "kl_resource_name" ResourceType ObservabilityLabel = "kl_resource_type" ResourceNamespace ObservabilityLabel = "kl_resource_namespace" ResourceComponent ObservabilityLabel = "kl_resource_component" ProjectName ObservabilityLabel = "kl_project_name" ProjectTargetNamespace ObservabilityLabel = "kl_project_target_ns" WorkspaceName ObservabilityLabel = "kl_workspace_name" WorkspaceTargetNs ObservabilityLabel = "kl_workspace_target_ns" )
type PromMetricsType ¶
type PromMetricsType string
const ( Cpu PromMetricsType = "cpu" Memory PromMetricsType = "memory" DiskIO PromMetricsType = "disk-io" NetworkReceived PromMetricsType = "network-received" NetworkTransmitted PromMetricsType = "network-transmitted" )
type ResourceEventPublisherImpl ¶
type ResourceEventPublisherImpl struct {
// contains filtered or unexported fields
}
func (*ResourceEventPublisherImpl) PublishClusterManagedServiceEvent ¶
func (r *ResourceEventPublisherImpl) PublishClusterManagedServiceEvent(ctx domain.ConsoleContext, msvcName string, resourceType entities.ResourceType, name string, update domain.PublishMsg)
func (*ResourceEventPublisherImpl) PublishConsoleEvent ¶
func (r *ResourceEventPublisherImpl) PublishConsoleEvent(ctx domain.ConsoleContext, resourceType entities.ResourceType, name string, update domain.PublishMsg)
func (*ResourceEventPublisherImpl) PublishEnvironmentResourceEvent ¶
func (r *ResourceEventPublisherImpl) PublishEnvironmentResourceEvent(ctx domain.ConsoleContext, envName string, resourceType entities.ResourceType, name string, update domain.PublishMsg)
func (*ResourceEventPublisherImpl) PublishResourceEvent ¶
func (r *ResourceEventPublisherImpl) PublishResourceEvent(ctx domain.ResourceContext, resourceType entities.ResourceType, name string, update domain.PublishMsg)
type WebhookConsumer ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.