Documentation ¶
Index ¶
- Variables
- func NewAccountsSvc(accountsClient accounts.AccountsClient) domain.AccountsSvc
- func NewResourceDispatcher(producer SendTargetClusterMessagesProducer) domain.ResourceDispatcher
- func NewResourceEventPublisher(cli *nats.Client, logger logging.Logger) domain.ResourceEventPublisher
- func ProcessErrorOnApply(consumer ErrorOnApplyConsumer, logger *slog.Logger, d domain.Domain)
- type AccountGrpcClient
- type AuthCacheClient
- type ConsoleGrpcClient
- type ErrorOnApplyConsumer
- type IAMGrpcClient
- type InfraGrpcServer
- type ReceiveResourceUpdatesConsumer
- type ResourceEventPublisherImpl
- type SendTargetClusterMessagesProducer
Constants ¶
This section is empty.
Variables ¶
View Source
var Module = fx.Module( "app", repos.NewFxMongoRepo[*entities.Cluster]("clusters", "clus", entities.ClusterIndices), repos.NewFxMongoRepo[*entities.GlobalVPNConnection]("global_vpn_connections", "gvpn-conn", entities.GlobalVPNConnectionIndices), repos.NewFxMongoRepo[*entities.GlobalVPN]("global_vpn", "gvpn", entities.GlobalVPNIndices), repos.NewFxMongoRepo[*entities.GlobalVPNDevice]("global_vpn_device", "gvpn-dev", entities.GlobalVPNDeviceIndices), repos.NewFxMongoRepo[*entities.ClaimDeviceIP]("claim_device_ip", "claim-ip", entities.ClaimDeviceIPIndices), repos.NewFxMongoRepo[*entities.FreeDeviceIP]("free_device_ip", "free-ip", entities.FreeDeviceIPIndices), repos.NewFxMongoRepo[*entities.FreeClusterSvcCIDR]("free_cluster_svc_cidr", "free-clus-cidr", entities.FreeClusterSvcCIDRIndices), repos.NewFxMongoRepo[*entities.ClaimClusterSvcCIDR]("claim_cluster_svc_cidr", "claim-clus-cidr", entities.ClaimClusterSvcCIDRIndices), repos.NewFxMongoRepo[*entities.BYOKCluster]("byok_cluster", "byok", entities.BYOKClusterIndices), repos.NewFxMongoRepo[*entities.DomainEntry]("domain_entries", "de", entities.DomainEntryIndices), repos.NewFxMongoRepo[*entities.NodePool]("node_pools", "npool", entities.NodePoolIndices), repos.NewFxMongoRepo[*entities.Node]("node", "node", entities.NodePoolIndices), repos.NewFxMongoRepo[*entities.CloudProviderSecret]("cloud_provider_secrets", "cps", entities.CloudProviderSecretIndices), repos.NewFxMongoRepo[*entities.HelmRelease]("helm_releases", "hr", entities.HelmReleaseIndices), repos.NewFxMongoRepo[*entities.PersistentVolumeClaim]("pvcs", "pvc", entities.PersistentVolumeClaimIndices), repos.NewFxMongoRepo[*entities.Namespace]("namespaces", "ns", entities.NamespaceIndices), repos.NewFxMongoRepo[*entities.PersistentVolume]("pv", "pv", entities.PersistentVolumeIndices), repos.NewFxMongoRepo[*entities.VolumeAttachment]("volume_attachments", "volatt", entities.VolumeAttachmentIndices), fx.Provide( func(conn IAMGrpcClient) iam.IAMClient { return 
iam.NewIAMClient(conn) }, ), fx.Provide(func(conn AccountGrpcClient) (domain.AccountsSvc, error) { ac := accounts.NewAccountsClient(conn) return NewAccountsSvc(ac), nil }), adapters.FxNewMessageOfficeService(), fx.Provide( func(conn ConsoleGrpcClient) console.ConsoleClient { return console.NewConsoleClient(conn) }, ), fx.Provide(func(jsc *nats.JetstreamClient, logger logging.Logger) SendTargetClusterMessagesProducer { return msg_nats.NewJetstreamProducer(jsc) }), fx.Provide(func(p SendTargetClusterMessagesProducer) domain.ResourceDispatcher { return NewResourceDispatcher(p) }), fx.Invoke(func(lf fx.Lifecycle, producer SendTargetClusterMessagesProducer) { lf.Append(fx.Hook{ OnStop: func(ctx context.Context) error { return producer.Stop(ctx) }, }) }), fx.Provide(func(cli *nats.Client, logger logging.Logger) domain.ResourceEventPublisher { return NewResourceEventPublisher(cli, logger) }), domain.Module, fx.Provide(func(d domain.Domain, kcli k8s.Client, logger *slog.Logger) infra.InfraServer { return newGrpcServer(d, kcli, logger) }), fx.Invoke(func(gserver InfraGrpcServer, srv infra.InfraServer) { infra.RegisterInfraServer(gserver, srv) }), fx.Provide(func(jsc *nats.JetstreamClient, ev *env.Env) (ReceiveResourceUpdatesConsumer, error) { topic := common.ReceiveFromAgentSubjectName(common.ReceiveFromAgentArgs{AccountName: "*", ClusterName: "*"}, common.InfraReceiver, common.EventResourceUpdate) consumerName := "infra:resource-updates" return msg_nats.NewJetstreamConsumer(context.TODO(), jsc, msg_nats.JetstreamConsumerArgs{ Stream: ev.NatsReceiveFromAgentStream, ConsumerConfig: msg_nats.ConsumerConfig{ Name: consumerName, Durable: consumerName, Description: "this consumer receives infra resource updates, processes them, and keeps our Database updated about things happening in the cluster", FilterSubjects: []string{topic}, }, }) }), fx.Invoke(func(lf fx.Lifecycle, consumer ReceiveResourceUpdatesConsumer, d domain.Domain, logger *slog.Logger) { lf.Append(fx.Hook{ OnStart: 
func(context.Context) error { go processResourceUpdates(consumer, d, logger) return nil }, OnStop: func(ctx context.Context) error { return consumer.Stop(ctx) }, }) }), fx.Provide(func(jsc *nats.JetstreamClient, ev *env.Env) (ErrorOnApplyConsumer, error) { topic := common.ReceiveFromAgentSubjectName(common.ReceiveFromAgentArgs{AccountName: "*", ClusterName: "*"}, common.InfraReceiver, common.EventErrorOnApply) consumerName := "infra:error-on-apply" return msg_nats.NewJetstreamConsumer(context.TODO(), jsc, msg_nats.JetstreamConsumerArgs{ Stream: ev.NatsReceiveFromAgentStream, ConsumerConfig: msg_nats.ConsumerConfig{ Name: consumerName, Durable: consumerName, Description: "this consumer receives infra resource apply error on agent, processes them, and keeps our Database updated about why the resource apply failed at agent", FilterSubjects: []string{topic}, }, }) }), fx.Invoke(func(lf fx.Lifecycle, consumer ErrorOnApplyConsumer, d domain.Domain, logger *slog.Logger) { lf.Append(fx.Hook{ OnStart: func(context.Context) error { go ProcessErrorOnApply(consumer, logger, d) return nil }, OnStop: func(ctx context.Context) error { return consumer.Stop(ctx) }, }) }), fx.Invoke( func(server httpServer.Server, d domain.Domain, sessionRepo kv.Repo[*common.AuthSession], env *env.Env) { config := generated.Config{Resolvers: &graph.Resolver{Domain: d}} config.Directives.IsLoggedIn = func(ctx context.Context, _ interface{}, next graphql.Resolver) (res interface{}, err error) { sess := httpServer.GetSession[*common.AuthSession](ctx) if sess == nil { return nil, fiber.ErrUnauthorized } return next(ctx) } config.Directives.IsLoggedInAndVerified = func(ctx context.Context, _ interface{}, next graphql.Resolver) (res interface{}, err error) { sess := httpServer.GetSession[*common.AuthSession](ctx) if sess == nil { return nil, fiber.ErrUnauthorized } if sess.UserVerified { return next(ctx) } return nil, &fiber.Error{ Code: fiber.ErrUnauthorized.Code, Message: "user's email is not verified, 
yet", } } config.Directives.HasAccount = func(ctx context.Context, _ interface{}, next graphql.Resolver) (res interface{}, err error) { sess := httpServer.GetSession[*common.AuthSession](ctx) if sess == nil { return nil, fiber.ErrUnauthorized } m := httpServer.GetHttpCookies(ctx) klAccount := m[env.AccountCookieName] if klAccount == "" { return nil, errors.Newf("no cookie named '%s' present in request", env.AccountCookieName) } cc := domain.InfraContext{ Context: ctx, AccountName: klAccount, UserId: sess.UserId, UserName: sess.UserName, UserEmail: sess.UserEmail, } return next(context.WithValue(ctx, "infra-ctx", cc)) } schema := generated.NewExecutableSchema(config) server.SetupGraphqlServer(schema, httpServer.NewReadSessionMiddleware(sessionRepo, constants.CookieName, constants.CacheSessionPrefix), ) }, ), fx.Invoke( func(server httpServer.Server, d domain.Domain, env *env.Env) { server.Raw().Get("/render/helm/kloudlite-agent/:accountName/:clusterName", func(c *fiber.Ctx) error { s := c.GetReqHeaders()["Authorization"] if len(s) != 1 { return fiber.ErrForbidden } b, err := d.RenderHelmKloudliteAgent(c.Context(), c.Params("accountName"), c.Params("clusterName"), s[0]) if err != nil { if err.Error() == "UnAuthorized" { return fiber.ErrUnauthorized } return err } _, err = c.Write(b) return err }) }, ), )
Functions ¶
func NewAccountsSvc ¶
func NewAccountsSvc(accountsClient accounts.AccountsClient) domain.AccountsSvc
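func NewResourceDispatcher ¶
func NewResourceDispatcher(producer SendTargetClusterMessagesProducer) domain.ResourceDispatcher
func NewResourceEventPublisher ¶
func NewResourceEventPublisher(cli *nats.Client, logger logging.Logger) domain.ResourceEventPublisher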
func ProcessErrorOnApply ¶
func ProcessErrorOnApply(consumer ErrorOnApplyConsumer, logger *slog.Logger, d domain.Domain)
Types ¶
type AccountGrpcClient ¶
type AuthCacheClient ¶
type ConsoleGrpcClient ¶
type ErrorOnApplyConsumer ¶
type IAMGrpcClient ¶
type InfraGrpcServer ¶
type ReceiveResourceUpdatesConsumer ¶
type ResourceEventPublisherImpl ¶
type ResourceEventPublisherImpl struct {
// contains filtered or unexported fields
}
func (*ResourceEventPublisherImpl) PublishInfraEvent ¶
func (r *ResourceEventPublisherImpl) PublishInfraEvent(ctx domain.InfraContext, resourceType domain.ResourceType, resName string, update domain.PublishMsg)
func (*ResourceEventPublisherImpl) PublishResourceEvent ¶
func (r *ResourceEventPublisherImpl) PublishResourceEvent(ctx domain.InfraContext, clusterName string, resourceType domain.ResourceType, resName string, update domain.PublishMsg)
type SendTargetClusterMessagesProducer ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.