app

package
v1.1.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 25, 2024 License: AGPL-3.0 Imports: 50 Imported by: 0

Documentation

Index

Constants

View Source
// GithubEventHeader is the header name used to identify GitHub webhook events.
const GithubEventHeader string = "X-Github-Event"

// GitlabEventHeader is the header name used to identify GitLab webhook events.
const GitlabEventHeader string = "X-Gitlab-Event"

Variables

View Source
// Module wires the container-registry "app" layer into the fx dependency
// graph: mongo repositories, NATS JetStream consumers and producers, gRPC
// clients, the GraphQL server, the registry authorizer HTTP routes, the
// domain module and the gRPC server.
var Module = fx.Module("app",
	// Mongo-backed repositories, one per entity.
	repos.NewFxMongoRepo[*entities.Repository]("repositories", "prj", entities.RepositoryIndexes),
	repos.NewFxMongoRepo[*entities.Credential]("credentials", "cred", entities.CredentialIndexes),
	repos.NewFxMongoRepo[*entities.Digest]("tags", "tag", entities.TagIndexes),
	repos.NewFxMongoRepo[*entities.Build]("builds", "build", entities.BuildIndexes),
	repos.NewFxMongoRepo[*entities.BuildCacheKey]("build-caches", "build-cache", entities.BuildCacheKeyIndexes),
	repos.NewFxMongoRepo[*entities.BuildRun]("build_runs", "build_run", entities.BuildRunIndices),

	// Durable JetStream consumer for the git webhook topic.
	fx.Provide(func(jc *nats.JetstreamClient, ev *env.Env, logger logging.Logger) (GitWebhookConsumer, error) {
		topic := string(common.GitWebhookTopicName)
		consumerName := "cr:git-webhook"
		return msg_nats.NewJetstreamConsumer(context.TODO(), jc, msg_nats.JetstreamConsumerArgs{
			Stream: ev.EventsNatsStream,
			ConsumerConfig: msg_nats.ConsumerConfig{
				Name:    consumerName,
				Durable: consumerName,
				// NOTE(review): this description talks about apply errors at the agent, but the
				// consumer filters on the git webhook topic; it looks copy-pasted. It is left
				// untouched here because it is part of the durable consumer's config — confirm
				// that updating it is safe before rewording.
				Description: "this consumer reads message from a subject dedicated to errors, that occurred when the resource was applied at the agent",
				FilterSubjects: []string{
					topic,
				},
			},
		})
	}),

	// Runs the git webhook processor for the lifetime of the application.
	fx.Invoke(func(lf fx.Lifecycle, consumer GitWebhookConsumer, d domain.Domain, logr logging.Logger) {
		// The context handed to OnStart only covers application start-up, so the
		// long-running processor gets its own context, which is cancelled in OnStop.
		runCtx, cancel := context.WithCancel(context.Background())

		lf.Append(fx.Hook{
			OnStart: func(context.Context) error {
				go func() {
					// an error returned after runCtx got cancelled is caused by our own shutdown, so it is not logged
					if err := processGitWebhooks(runCtx, d, consumer, logr); err != nil && runCtx.Err() == nil {
						logr.Errorf(err, "could not process git webhooks")
					}
				}()
				return nil
			},
			OnStop: func(ctx context.Context) error {
				cancel()
				// stop the consumer on shutdown, like the other consumers in this module
				return consumer.Stop(ctx)
			},
		})
	}),

	fx.Provide(func(jc *nats.JetstreamClient, ev *env.Env, logger logging.Logger) BuildRunProducer {
		return msg_nats.NewJetstreamProducer(jc)
	}),

	// Durable JetStream consumer for resource updates, for any account and any cluster.
	fx.Provide(func(jsc *nats.JetstreamClient, ev *env.Env) (ReceiveResourceUpdatesConsumer, error) {
		topic := common.ReceiveFromAgentSubjectName(common.ReceiveFromAgentArgs{AccountName: "*", ClusterName: "*"}, common.ContainerRegistryReceiver, common.EventResourceUpdate)

		consumerName := "cr:resource-updates"
		return msg_nats.NewJetstreamConsumer(context.TODO(), jsc, msg_nats.JetstreamConsumerArgs{
			Stream: ev.ResourceNatsStream,
			ConsumerConfig: msg_nats.ConsumerConfig{
				Name:           consumerName,
				Durable:        consumerName,
				Description:    "this consumer receives container registry resource updates, processes them, and keeps our Database updated about things happening in our client's cluster",
				FilterSubjects: []string{topic},
			},
		})
	}),

	fx.Invoke(func(lf fx.Lifecycle, consumer ReceiveResourceUpdatesConsumer, d domain.Domain, logger logging.Logger) {
		lf.Append(fx.Hook{
			OnStart: func(context.Context) error {
				go processResourceUpdates(consumer, d, logger)
				return nil
			},
			OnStop: func(ctx context.Context) error {
				return consumer.Stop(ctx)
			},
		})
	}),

	// Durable JetStream consumer for apply errors, for any account and any cluster.
	fx.Provide(func(jsc *nats.JetstreamClient, ev *env.Env) (ErrorOnApplyConsumer, error) {
		topic := common.ReceiveFromAgentSubjectName(common.ReceiveFromAgentArgs{AccountName: "*", ClusterName: "*"}, common.ContainerRegistryReceiver, common.EventErrorOnApply)

		consumerName := "cr:error-on-apply"

		return msg_nats.NewJetstreamConsumer(context.TODO(), jsc, msg_nats.JetstreamConsumerArgs{
			Stream: ev.ResourceNatsStream,
			ConsumerConfig: msg_nats.ConsumerConfig{
				Name:           consumerName,
				Durable:        consumerName,
				Description:    "this consumer receives container registry resource apply error on agent, processes them, and keeps our Database updated about why the resource apply failed at agent",
				FilterSubjects: []string{topic},
			},
		})
	}),

	fx.Invoke(func(lf fx.Lifecycle, consumer ErrorOnApplyConsumer, d domain.Domain, logger logging.Logger) {
		lf.Append(fx.Hook{
			OnStart: func(context.Context) error {
				go ProcessErrorOnApply(consumer, logger, d)
				return nil
			},
			OnStop: func(ctx context.Context) error {
				return consumer.Stop(ctx)
			},
		})
	}),

	fx.Provide(func(jsc *nats.JetstreamClient, logger logging.Logger) SendTargetClusterMessagesProducer {
		return msg_nats.NewJetstreamProducer(jsc)
	}),

	fx.Provide(func(targetMessageProducer SendTargetClusterMessagesProducer) domain.ResourceDispatcher {
		return NewResourceDispatcher(targetMessageProducer)
	}),

	fx.Provide(func(cli *nats.Client, logger logging.Logger) domain.ResourceEventPublisher {
		return NewResourceEventPublisher(cli, logger)
	}),

	// gRPC clients, built over their dedicated connections.
	fx.Provide(
		func(conn IAMGrpcClient) iam.IAMClient {
			return iam.NewIAMClient(conn)
		},
	),

	fx.Provide(
		func(conn AuthGrpcClient) auth.AuthClient {
			return auth.NewAuthClient(conn)
		},
	),

	fx.Provide(func(ev *env.Env) *venv {
		return &venv{ev}
	}),

	fxGithub[*venv](),
	fxGitlab[*venv](),

	// GraphQL server, with its session/account directives.
	fx.Invoke(
		func(server httpServer.Server, d domain.Domain, sessionRepo kv.Repo[*common.AuthSession], ev *env.Env) {
			gqlConfig := generated.Config{Resolvers: &graph.Resolver{Domain: d, Env: ev}}

			// IsLoggedInAndVerified rejects requests without a session (401), or with an
			// unverified user (403), and otherwise exposes the session on the context.
			gqlConfig.Directives.IsLoggedInAndVerified = func(ctx context.Context, _ any, next graphql.Resolver) (res any, err error) {
				sess := httpServer.GetSession[*common.AuthSession](ctx)
				if sess == nil {
					return nil, fiber.ErrUnauthorized
				}

				if !sess.UserVerified {
					return nil, &fiber.Error{
						Code:    fiber.StatusForbidden,
						Message: "user's email is not verified",
					}
				}

				// NOTE(review): plain string context keys are kept as-is; presumably they are
				// read back with these exact keys elsewhere — confirm before switching to a typed key.
				return next(context.WithValue(ctx, "user-session", sess))
			}

			// HasAccount requires a session and the account cookie, and exposes both on the context.
			gqlConfig.Directives.HasAccount = func(ctx context.Context, _ any, next graphql.Resolver) (res any, err error) {
				sess := httpServer.GetSession[*common.AuthSession](ctx)
				if sess == nil {
					return nil, fiber.ErrUnauthorized
				}
				m := httpServer.GetHttpCookies(ctx)
				klAccount := m[ev.AccountCookieName]
				if klAccount == "" {
					return nil, errors.Newf("no cookie named %q present in request", ev.AccountCookieName)
				}

				nctx := context.WithValue(ctx, "user-session", sess)
				nctx = context.WithValue(nctx, "account-name", klAccount)
				return next(nctx)
			}

			schema := generated.NewExecutableSchema(gqlConfig)
			server.SetupGraphqlServer(schema, httpServer.NewReadSessionMiddleware(sessionRepo, constants.CookieName, constants.CacheSessionPrefix))
		},
	),

	// Registry authorizer HTTP routes: POST /events and /auth.
	fx.Invoke(func(server AuthorizerHttpServer, envs *env.Env, d domain.Domain, logger logging.Logger) {
		authLogger := logger.WithKV("route", "/auth")

		a := server.Raw()
		a.Post("/events", func(c *fiber.Ctx) error {
			ctx := c.Context()

			var eventMessage entities.EventMessage
			if err := c.BodyParser(&eventMessage); err != nil {
				logger.Errorf(err, "could not parse registry event message")
				return c.SendStatus(fiber.StatusBadRequest)
			}

			if err := d.ProcessRegistryEvents(ctx, eventMessage.Events, logger); err != nil {
				logger.Errorf(err, "could not process registry events")
				return c.SendStatus(fiber.StatusBadRequest)
			}

			return c.SendStatus(fiber.StatusOK)
		})

		a.Use("/auth", func(c *fiber.Ctx) error {
			// the path and method to authorize are read from the query string of the auth request
			path := c.Query("path", "/")
			method := c.Query("method", "GET")

			// a dummy origin is prepended, only so that the path (with its query) can be parsed as a URL
			parsed, err := url.Parse("http://example.com" + path)
			if err != nil {
				return c.SendStatus(fiber.StatusBadRequest)
			}

			// NOTE(review): requests carrying a `_state` query param, and HEAD requests, skip
			// basic auth entirely — confirm that this is intended.
			if parsed.Query().Has("_state") {
				return c.Next()
			}

			if method == "HEAD" {
				return c.Next()
			}
			logger.Infof("path: %s, method: %s", path, method)

			// built per request, as the authorizer depends on this request's path and method
			basicAuth := basicauth.New(basicauth.Config{
				Realm: "Forbidden",
				Authorizer: func(u string, p string) bool {
					// only the admin user is allowed to delete
					if method == "DELETE" && u != domain.KL_ADMIN {
						return false
					}

					userName, accountName, _, err := registryAuth.ParseToken(p)
					if err != nil {
						authLogger.Errorf(err, "could not parse token")
						return false
					}

					// uses the request context, like the /events handler does
					s, err := d.GetTokenKey(c.Context(), userName, accountName)
					if err != nil {
						authLogger.Errorf(err, "could not get token key")
						return false
					}

					if err := registryAuth.Authorizer(u, p, path, method, envs.RegistrySecretKey+s); err != nil {
						authLogger.Errorf(err, "could not authorize")
						return false
					}
					return true
				},
			})

			return basicAuth(c)
		})

		// reached only when the /auth middleware above called c.Next()
		a.Get("/auth", func(c *fiber.Ctx) error {
			return c.SendStatus(fiber.StatusOK)
		})
	}),

	domain.Module,

	fx.Invoke(InitializeGrpcServer),
)

Functions

func InitializeGrpcServer

// InitializeGrpcServer is invoked by Module (through fx.Invoke) with the
// container registry gRPC server, the domain and the environment.
// NOTE(review): presumably it registers the gRPC services on server — body not visible here, confirm.
func InitializeGrpcServer(server ContainerRegistryGRPCServer, d domain.Domain, ev *env.Env)

func NewResourceEventPublisher

// NewResourceEventPublisher builds the domain.ResourceEventPublisher that
// Module provides to the fx graph, from a NATS client and a logger.
func NewResourceEventPublisher(cli *nats.Client, logger logging.Logger) domain.ResourceEventPublisher

func ProcessErrorOnApply

// ProcessErrorOnApply is started by Module in its own goroutine when the
// application starts; the consumer it is given is stopped when the application stops.
// NOTE(review): presumably it consumes apply-error messages and hands them to the domain — body not visible here, confirm.
func ProcessErrorOnApply(consumer ErrorOnApplyConsumer, logger logging.Logger, d domain.Domain)

Types

type AuthGrpcClient

// AuthGrpcClient is a distinct named type over grpc.Client, so that the
// connection used to build the auth.AuthClient can be injected by type,
// separately from other grpc.Client values (like IAMGrpcClient).
type AuthGrpcClient grpc.Client

type AuthorizerHttpServer

// AuthorizerHttpServer is the httpServer.Server on which Module registers
// the registry `/events` and `/auth` routes; the named type keeps it distinct
// from the httpServer.Server that serves GraphQL.
type AuthorizerHttpServer httpServer.Server

type BuildRunProducer

// BuildRunProducer is a distinct named type over messaging.Producer; Module
// provides it as a JetStream producer.
// NOTE(review): presumably used to publish build run messages — confirm against its consumers.
type BuildRunProducer messaging.Producer

type ContainerRegistryGRPCServer

// ContainerRegistryGRPCServer is a distinct named type over grpc.Server; it is
// the server that InitializeGrpcServer receives.
type ContainerRegistryGRPCServer grpc.Server

type ErrorOnApplyConsumer

// ErrorOnApplyConsumer is a distinct named type over messaging.Consumer; Module
// provides it as the durable "cr:error-on-apply" JetStream consumer, and hands
// it to ProcessErrorOnApply.
type ErrorOnApplyConsumer messaging.Consumer

type GitWebhookConsumer

// GitWebhookConsumer is a distinct named type over messaging.Consumer; Module
// provides it as the durable "cr:git-webhook" JetStream consumer, filtered on
// the git webhook topic.
type GitWebhookConsumer messaging.Consumer

type GithubOAuth

// GithubOAuth exposes the GitHub configuration values as plain strings.
// NOTE(review): presumably satisfied by the env wrapper passed to fxGithub — confirm.
type GithubOAuth interface {
	// GithubConfig returns, in order: the client id, the client secret, the
	// callback URL, the GitHub app id, and the GitHub app private key file.
	GithubConfig() (clientId, clientSecret, callbackUrl, githubAppId, githubAppPKFile string)
}

type IAMGrpcClient

// IAMGrpcClient is a distinct named type over grpc.Client, so that the
// connection used to build the iam.IAMClient can be injected by type,
// separately from other grpc.Client values (like AuthGrpcClient).
type IAMGrpcClient grpc.Client

type ReceiveResourceUpdatesConsumer

// ReceiveResourceUpdatesConsumer is a distinct named type over
// messaging.Consumer; Module provides it as the durable "cr:resource-updates"
// JetStream consumer.
type ReceiveResourceUpdatesConsumer messaging.Consumer

type ResourceEventPublisherImpl

// ResourceEventPublisherImpl publishes build cache and build run events.
// NOTE(review): presumably the domain.ResourceEventPublisher implementation
// returned by NewResourceEventPublisher — fields are not visible here, confirm.
type ResourceEventPublisherImpl struct {
	// contains filtered or unexported fields
}

func (*ResourceEventPublisherImpl) PublishBuildCacheEvent

// PublishBuildCacheEvent takes a build cache key and a domain.PublishMsg, and returns nothing.
// NOTE(review): presumably publishes msg as an event about buildCache — body not visible here, confirm.
func (r *ResourceEventPublisherImpl) PublishBuildCacheEvent(buildCache *entities.BuildCacheKey, msg domain.PublishMsg)

func (*ResourceEventPublisherImpl) PublishBuildRunEvent

// PublishBuildRunEvent takes a build run and a domain.PublishMsg, and returns nothing.
// NOTE(review): presumably publishes msg as an event about buildrun — body not visible here, confirm.
func (r *ResourceEventPublisherImpl) PublishBuildRunEvent(buildrun *entities.BuildRun, msg domain.PublishMsg)

type SendTargetClusterMessagesProducer

// SendTargetClusterMessagesProducer is a distinct named type over
// messaging.Producer; Module provides it as a JetStream producer, and builds
// the domain.ResourceDispatcher on top of it.
type SendTargetClusterMessagesProducer messaging.Producer

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL