app

package
v1.1.2
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 25, 2024 License: AGPL-3.0 Imports: 40 Imported by: 0

Documentation

Index

Constants

View Source
// DefaultDNSTTL is the package's default DNS TTL value.
// NOTE(review): the unit is presumably seconds, as is conventional for DNS
// record TTLs — confirm at the use site.
const DefaultDNSTTL = 5

Variables

View Source
// Module is the fx module named "app". It registers the Mongo repositories,
// gRPC client adapters, the GraphQL server and its auth directives, the NATS
// JetStream producer and consumers, the console gRPC server, and the DNS
// server lifecycle for this package.
var Module = fx.Module("app",
	// One Mongo repository per entity type.
	// NOTE(review): the arguments look like (collection name, short id prefix,
	// index definitions) — confirm against repos.NewFxMongoRepo.
	repos.NewFxMongoRepo[*entities.Environment]("environments", "env", entities.EnvironmentIndexes),
	repos.NewFxMongoRepo[*entities.App]("apps", "app", entities.AppIndexes),
	repos.NewFxMongoRepo[*entities.ExternalApp]("ext_apps", "extapp", entities.ExternalAppIndexes),
	repos.NewFxMongoRepo[*entities.Config]("configs", "cfg", entities.ConfigIndexes),
	repos.NewFxMongoRepo[*entities.Secret]("secrets", "scrt", entities.SecretIndexes),
	repos.NewFxMongoRepo[*entities.ManagedResource]("managed_resources", "mres", entities.MresIndexes),
	repos.NewFxMongoRepo[*entities.ImportedManagedResource]("imported_managed_resources", "impmres", entities.ImportedManagedResourceIndexes),
	repos.NewFxMongoRepo[*entities.Router]("routers", "rt", entities.RouterIndexes),
	repos.NewFxMongoRepo[*entities.ImagePullSecret]("image_pull_secrets", "ips", entities.ImagePullSecretIndexes),
	repos.NewFxMongoRepo[*entities.ResourceMapping]("resource_mappings", "rmap", entities.ResourceMappingIndices),
	repos.NewFxMongoRepo[*entities.ServiceBinding]("service_bindings", "svcb", entities.ServiceBindingIndexes),
	repos.NewFxMongoRepo[*entities.ClusterManagedService]("cluster_managed_services", "cmsvc", entities.ClusterManagedServiceIndices),
	repos.NewFxMongoRepo[*entities.RegistryImage]("registry_images", "reg_img", entities.RegistryImageIndexes),

	// Builds an iam.IAMClient from the IAMGrpcClient connection.
	fx.Provide(
		func(conn IAMGrpcClient) iam.IAMClient {
			return iam.NewIAMClient(conn)
		},
	),

	// Builds a platform_edge.PlatformEdgeClient from the
	// MessageOfficeInternalClient connection.
	fx.Provide(func(conn MessageOfficeInternalClient) platform_edge.PlatformEdgeClient {
		return platform_edge.NewPlatformEdgeClient(conn)
	}),

	// Builds an accounts.AccountsClient from the AccountsClient connection.
	fx.Provide(func(conn AccountsClient) accounts.AccountsClient {
		return accounts.NewAccountsClient(conn)
	}),

	fx.Provide(adapters.NewAccountsSvc),

	// Exposes the infra gRPC client through the ports.InfraService interface.
	fx.Provide(func(cli infra.InfraClient) ports.InfraService {
		return infra_service.NewInfraService(cli)
	}),

	// Builds the GraphQL executable schema, installs the three auth directives
	// below, and mounts the schema on the HTTP server behind a middleware that
	// reads the session from sessionRepo.
	fx.Invoke(
		func(server httpServer.Server, d domain.Domain, sessionRepo kv.Repo[*common.AuthSession], ev *env.Env) {
			gqlConfig := generated.Config{Resolvers: &graph.Resolver{Domain: d, EnvVars: ev}}

			// IsLoggedIn: fails with fiber.ErrUnauthorized when the request has no
			// session; otherwise stores the session in the context under the key
			// "user-session" and continues to the next resolver.
			// NOTE(review): plain string context keys are flagged by staticcheck
			// (SA1029); changing them would require updating every reader of
			// "user-session" / "account-name", which is outside this view.
			gqlConfig.Directives.IsLoggedIn = func(ctx context.Context, obj interface{}, next graphql.Resolver) (res interface{}, err error) {
				sess := httpServer.GetSession[*common.AuthSession](ctx)
				if sess == nil {
					return nil, fiber.ErrUnauthorized
				}

				return next(context.WithValue(ctx, "user-session", sess))
			}

			// IsLoggedInAndVerified: same session check as IsLoggedIn, and
			// additionally returns a 403 fiber.Error when sess.UserVerified is false.
			gqlConfig.Directives.IsLoggedInAndVerified = func(ctx context.Context, obj interface{}, next graphql.Resolver) (res interface{}, err error) {
				sess := httpServer.GetSession[*common.AuthSession](ctx)
				if sess == nil {
					return nil, fiber.ErrUnauthorized
				}

				if !sess.UserVerified {
					return nil, &fiber.Error{
						Code:    fiber.StatusForbidden,
						Message: "user's email is not verified",
					}
				}

				return next(context.WithValue(ctx, "user-session", sess))
			}

			// HasAccount: requires a session and a non-empty cookie named
			// ev.AccountCookieName; stores the session under "user-session" and the
			// cookie value under "account-name" before continuing.
			gqlConfig.Directives.HasAccount = func(ctx context.Context, obj interface{}, next graphql.Resolver) (res interface{}, err error) {
				sess := httpServer.GetSession[*common.AuthSession](ctx)
				if sess == nil {
					return nil, fiber.ErrUnauthorized
				}
				m := httpServer.GetHttpCookies(ctx)
				klAccount := m[ev.AccountCookieName]
				if klAccount == "" {
					return nil, errors.Newf("no cookie named %q present in request", ev.AccountCookieName)
				}

				nctx := context.WithValue(ctx, "user-session", sess)
				nctx = context.WithValue(nctx, "account-name", klAccount)
				return next(nctx)
			}

			schema := generated.NewExecutableSchema(gqlConfig)
			server.SetupGraphqlServer(schema,
				httpServer.NewReadSessionMiddleware(sessionRepo, constants.CookieName, constants.CacheSessionPrefix),
			)
		},
	),

	// Provides the domain.MessageDispatcher as a JetStream producer.
	// NOTE(review): logger is requested from the container but not used here.
	fx.Provide(func(jc *nats.JetstreamClient, logger logging.Logger) domain.MessageDispatcher {
		return msg_nats.NewJetstreamProducer(jc)
	}),

	// Stops the message dispatcher when the fx application stops.
	fx.Invoke(func(lf fx.Lifecycle, producer domain.MessageDispatcher) {
		lf.Append(fx.Hook{
			OnStop: func(ctx context.Context) error {
				return producer.Stop(ctx)
			},
		})
	}),

	// Builds an infra.InfraClient from the InfraClient connection.
	fx.Provide(
		func(conn InfraClient) infra.InfraClient {
			return infra.NewInfraClient(conn)
		},
	),

	// Provides the domain.ResourceEventPublisher via NewResourceEventPublisher.
	fx.Provide(func(cli *nats.Client, logger logging.Logger) domain.ResourceEventPublisher {
		return NewResourceEventPublisher(cli, logger)
	}),

	domain.Module,

	// Provides the console gRPC service implementation.
	fx.Provide(func(d domain.Domain, kcli k8s.Client) console.ConsoleServer {
		return newConsoleGrpcServer(d, kcli)
	}),

	// Registers the console service implementation on the ConsoleGrpcServer.
	fx.Invoke(func(gserver ConsoleGrpcServer, srv console.ConsoleServer) {
		console.RegisterConsoleServer(gserver, srv)
	}),

	// Durable JetStream consumer "console:error-on-apply" on
	// ev.NatsReceiveFromAgentStream. Its filter subject is built with "*" for
	// both account and cluster name, common.ConsoleReceiver and
	// common.EventErrorOnApply.
	// NOTE(review): logger is requested but unused, and the consumer is created
	// with context.TODO().
	fx.Provide(func(jc *nats.JetstreamClient, ev *env.Env, logger logging.Logger) (resource_updates_receiver.ErrorOnApplyConsumer, error) {
		topic := common.ReceiveFromAgentSubjectName(common.ReceiveFromAgentArgs{AccountName: "*", ClusterName: "*"}, common.ConsoleReceiver, common.EventErrorOnApply)
		consumerName := "console:error-on-apply"
		return msg_nats.NewJetstreamConsumer(context.TODO(), jc, msg_nats.JetstreamConsumerArgs{
			Stream: ev.NatsReceiveFromAgentStream,
			ConsumerConfig: msg_nats.ConsumerConfig{
				Name:           consumerName,
				Durable:        consumerName,
				Description:    "this consumer reads message from a subject dedicated to errors, that occurred when the resource was applied at the agent",
				FilterSubjects: []string{topic},
			},
		})
	}),

	// Durable JetStream consumer "console:resource-updates" on the same stream;
	// identical construction, but the subject is built with
	// common.EventResourceUpdate.
	fx.Provide(func(jc *nats.JetstreamClient, ev *env.Env, logger logging.Logger) (resource_updates_receiver.ResourceUpdateConsumer, error) {
		topic := common.ReceiveFromAgentSubjectName(common.ReceiveFromAgentArgs{AccountName: "*", ClusterName: "*"}, common.ConsoleReceiver, common.EventResourceUpdate)

		consumerName := "console:resource-updates"
		return msg_nats.NewJetstreamConsumer(context.TODO(), jc, msg_nats.JetstreamConsumerArgs{
			Stream: ev.NatsReceiveFromAgentStream,
			ConsumerConfig: msg_nats.ConsumerConfig{
				Name:           consumerName,
				Durable:        consumerName,
				Description:    "this consumer reads message from a subject dedicated to console resource updates from tenant clusters",
				FilterSubjects: []string{topic},
			},
		})
	}),

	resource_updates_receiver.Module,

	// Durable JetStream consumer "console:webhook" on ev.EventsNatsStream,
	// filtered on common.ImageRegistryHookTopicName.
	fx.Provide(func(jc *nats.JetstreamClient, ev *env.Env, logger logging.Logger) (WebhookConsumer, error) {
		topic := string(common.ImageRegistryHookTopicName)
		consumerName := "console:webhook"
		return msg_nats.NewJetstreamConsumer(context.TODO(), jc, msg_nats.JetstreamConsumerArgs{
			Stream: ev.EventsNatsStream,
			ConsumerConfig: msg_nats.ConsumerConfig{
				Name:           consumerName,
				Durable:        consumerName,
				Description:    "this consumer reads message from a subject dedicated to console registry webhooks",
				FilterSubjects: []string{topic},
			},
		})
	}),

	// Runs processWebhooks in a background goroutine on start, so OnStart
	// returns immediately; an error returned by processWebhooks is only logged.
	// The consumer is stopped when the application stops.
	fx.Invoke(func(lf fx.Lifecycle, consumer WebhookConsumer, d domain.Domain, logger logging.Logger) {
		lf.Append(fx.Hook{
			OnStart: func(context.Context) error {
				go func() {
					err := processWebhooks(consumer, d, logger)
					if err != nil {
						logger.Errorf(err, "could not process webhooks")
					}
				}()
				return nil
			},
			OnStop: func(ctx context.Context) error {
				return consumer.Stop(ctx)
			},
		})
	}),

	// Provides the domain.ServiceBindingDomain backed by the ServiceBinding repo.
	fx.Provide(func(svcBindingRepo repos.DbRepo[*entities.ServiceBinding]) domain.ServiceBindingDomain {
		return domain.NewSvcBindingDomain(svcBindingRepo)
	}),

	// Provides the DNS handler, configured with the service binding domain and
	// ev.KloudliteDNSSuffix.
	fx.Provide(func(logger *slog.Logger, sbd domain.ServiceBindingDomain, ev *env.Env) *dnsHandler {
		return &dnsHandler{
			logger:               logger,
			serviceBindingDomain: sbd,
			kloudliteDNSSuffix:   ev.KloudliteDNSSuffix,
		}
	}),

	// Attaches the handler to the DNS server and runs ListenAndServe in a
	// background goroutine on start. A non-nil error from ListenAndServe is
	// logged and then panics inside that goroutine. The server is shut down
	// with the stop context when the application stops.
	fx.Invoke(func(server *DNSServer, handler *dnsHandler, lf fx.Lifecycle, logger *slog.Logger) {
		lf.Append(fx.Hook{
			OnStart: func(ctx context.Context) error {
				server.Handler = handler
				go func() {
					logger.Info("starting dns server", "at", server.Addr)
					err := server.ListenAndServe()
					if err != nil {
						logger.Error("failed to start dns server, got", "err", err)
						panic(err)
					}
				}()
				return nil
			},
			OnStop: func(ctx context.Context) error {
				return server.ShutdownContext(ctx)
			},
		})
	}),
)

Functions

func NewResourceEventPublisher

func NewResourceEventPublisher(cli *nats.Client, logger logging.Logger) domain.ResourceEventPublisher

Types

type AccountsClient

// AccountsClient is a named type over grpc.Client. Module takes a value of
// this type and passes it to accounts.NewAccountsClient to build the
// accounts.AccountsClient.
type AccountsClient grpc.Client

type ConsoleGrpcServer

// ConsoleGrpcServer is a named type over grpc.Server. Module passes a value of
// this type to console.RegisterConsoleServer as the server on which the
// console service is registered.
type ConsoleGrpcServer grpc.Server

type DNSServer

// DNSServer wraps an embedded *dns.Server, so the embedded server's fields and
// methods (Module uses Handler, Addr, ListenAndServe and ShutdownContext) are
// available directly on a DNSServer value.
type DNSServer struct{ *dns.Server }

type IAMGrpcClient

// IAMGrpcClient is a named type over grpc.Client. Module takes a value of this
// type and passes it to iam.NewIAMClient to build the iam.IAMClient.
type IAMGrpcClient grpc.Client

type InfraClient

// InfraClient is a named type over grpc.Client. Module takes a value of this
// type and passes it to infra.NewInfraClient to build the infra.InfraClient.
type InfraClient grpc.Client

type MessageOfficeInternalClient

// MessageOfficeInternalClient is a named type over grpc.Client. Module takes a
// value of this type and passes it to platform_edge.NewPlatformEdgeClient to
// build the platform_edge.PlatformEdgeClient.
type MessageOfficeInternalClient grpc.Client

type ObservabilityArgs

// ObservabilityArgs is a JSON-serializable set of arguments; the struct tags
// give the snake_case key used for each field.
// NOTE(review): which combinations of fields are required is decided by
// Validate, whose body is not visible here.
type ObservabilityArgs struct {
	// Account and cluster names.
	AccountName string `json:"account_name"`
	ClusterName string `json:"cluster_name"`

	// Resource name, namespace and type, plus workspace and project names.
	ResourceName      string `json:"resource_name"`
	ResourceNamespace string `json:"resource_namespace"`
	ResourceType      string `json:"resource_type"`
	WorkspaceName     string `json:"workspace_name"`
	ProjectName       string `json:"project_name"`

	// Job name and namespace.
	JobName      string `json:"job_name"`
	JobNamespace string `json:"job_namespace"`

	// Optional time bounds: a nil pointer is left out of the encoded JSON
	// because of omitempty.
	StartTime *time.Time `json:"start_time,omitempty"`
	EndTime   *time.Time `json:"end_time,omitempty"`
}

func (*ObservabilityArgs) Validate

func (args *ObservabilityArgs) Validate() (bool, error)

type ObservabilityLabel

// ObservabilityLabel is the string type of the "kl_"-prefixed label names
// declared below.
// NOTE(review): presumably these are label keys attached to metrics/logs —
// confirm at the use sites.
type ObservabilityLabel string

// Known ObservabilityLabel values, grouped by the entity they name.
const (
	AccountName = ObservabilityLabel("kl_account_name")
	ClusterName = ObservabilityLabel("kl_cluster_name")

	ResourceName      = ObservabilityLabel("kl_resource_name")
	ResourceType      = ObservabilityLabel("kl_resource_type")
	ResourceNamespace = ObservabilityLabel("kl_resource_namespace")
	ResourceComponent = ObservabilityLabel("kl_resource_component")

	ProjectName            = ObservabilityLabel("kl_project_name")
	ProjectTargetNamespace = ObservabilityLabel("kl_project_target_ns")

	WorkspaceName     = ObservabilityLabel("kl_workspace_name")
	WorkspaceTargetNs = ObservabilityLabel("kl_workspace_target_ns")
)

type PromMetricsType

// PromMetricsType is the string type of the metric kind names declared below.
// NOTE(review): the name suggests these select Prometheus metric queries —
// confirm at the use sites.
type PromMetricsType string

// Known PromMetricsType values.
const (
	Cpu                = PromMetricsType("cpu")
	Memory             = PromMetricsType("memory")
	DiskIO             = PromMetricsType("disk-io")
	NetworkReceived    = PromMetricsType("network-received")
	NetworkTransmitted = PromMetricsType("network-transmitted")
)

type ResourceEventPublisherImpl

// ResourceEventPublisherImpl is the receiver type of the PublishConsoleEvent,
// PublishResourceEvent, PublishEnvironmentResourceEvent and
// PublishClusterManagedServiceEvent methods.
// NOTE(review): presumably this is the concrete type returned by
// NewResourceEventPublisher as a domain.ResourceEventPublisher — confirm.
type ResourceEventPublisherImpl struct {
	// contains filtered or unexported fields
}

func (*ResourceEventPublisherImpl) PublishClusterManagedServiceEvent

func (r *ResourceEventPublisherImpl) PublishClusterManagedServiceEvent(ctx domain.ConsoleContext, msvcName string, resourceType entities.ResourceType, name string, update domain.PublishMsg)

func (*ResourceEventPublisherImpl) PublishConsoleEvent

func (r *ResourceEventPublisherImpl) PublishConsoleEvent(ctx domain.ConsoleContext, resourceType entities.ResourceType, name string, update domain.PublishMsg)

func (*ResourceEventPublisherImpl) PublishEnvironmentResourceEvent

func (r *ResourceEventPublisherImpl) PublishEnvironmentResourceEvent(ctx domain.ConsoleContext, envName string, resourceType entities.ResourceType, name string, update domain.PublishMsg)

func (*ResourceEventPublisherImpl) PublishResourceEvent

func (r *ResourceEventPublisherImpl) PublishResourceEvent(ctx domain.ResourceContext, resourceType entities.ResourceType, name string, update domain.PublishMsg)

type WebhookConsumer

// WebhookConsumer is a named type over messaging.Consumer. Module provides it
// as the "console:webhook" JetStream consumer, hands it to processWebhooks on
// start, and calls its Stop method on shutdown.
type WebhookConsumer messaging.Consumer

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL