Documentation ¶
Index ¶
- Constants
- Variables
- func InitializeGrpcServer(server ContainerRegistryGRPCServer, d domain.Domain, ev *env.Env)
- func NewResourceDispatcher(producer SendTargetClusterMessagesProducer) domain.ResourceDispatcher
- func NewResourceEventPublisher(cli *nats.Client, logger logging.Logger) domain.ResourceEventPublisher
- func ProcessErrorOnApply(consumer ErrorOnApplyConsumer, logger logging.Logger, d domain.Domain)
- type AuthGrpcClient
- type AuthorizerHttpServer
- type BuildRunProducer
- type ContainerRegistryGRPCServer
- type ErrorOnApplyConsumer
- type GitWebhookConsumer
- type GithubOAuth
- type IAMGrpcClient
- type ReceiveResourceUpdatesConsumer
- type ResourceEventPublisherImpl
- type SendTargetClusterMessagesProducer
Constants ¶
View Source
// Header names for git-provider webhook events.
//
// NOTE(review): presumably these are the HTTP request headers inspected to
// tell GitHub and GitLab webhook deliveries apart; the code that reads them is
// not visible here — confirm against the webhook handler.
const (
	// GithubEventHeader is the header name "X-Github-Event".
	GithubEventHeader string = "X-Github-Event"

	// GitlabEventHeader is the header name "X-Gitlab-Event".
	GitlabEventHeader string = "X-Gitlab-Event"
)
Variables ¶
View Source
var Module = fx.Module("app", repos.NewFxMongoRepo[*entities.Repository]("repositories", "prj", entities.RepositoryIndexes), repos.NewFxMongoRepo[*entities.Credential]("credentials", "cred", entities.CredentialIndexes), repos.NewFxMongoRepo[*entities.Digest]("tags", "tag", entities.TagIndexes), repos.NewFxMongoRepo[*entities.Build]("builds", "build", entities.BuildIndexes), repos.NewFxMongoRepo[*entities.BuildCacheKey]("build-caches", "build-cache", entities.BuildCacheKeyIndexes), repos.NewFxMongoRepo[*entities.BuildRun]("build_runs", "build_run", entities.BuildRunIndices), fx.Provide(func(jc *nats.JetstreamClient, ev *env.Env, logger logging.Logger) (GitWebhookConsumer, error) { topic := string(common.GitWebhookTopicName) consumerName := "cr:git-webhook" return msg_nats.NewJetstreamConsumer(context.TODO(), jc, msg_nats.JetstreamConsumerArgs{ Stream: ev.EventsNatsStream, ConsumerConfig: msg_nats.ConsumerConfig{ Name: consumerName, Durable: consumerName, Description: "this consumer reads message from a subject dedicated to errors, that occurred when the resource was applied at the agent", FilterSubjects: []string{ topic, }, }, }) }), fx.Invoke(func(lf fx.Lifecycle, consumer GitWebhookConsumer, d domain.Domain, logr logging.Logger) { lf.Append(fx.Hook{ OnStart: func(ctx context.Context) error { go func() { err := processGitWebhooks(ctx, d, consumer, logr) if err != nil { logr.Errorf(err, "could not process git webhooks") } }() return nil }, OnStop: func(ctx context.Context) error { return nil }, }) }), fx.Provide(func(jc *nats.JetstreamClient, ev *env.Env, logger logging.Logger) BuildRunProducer { return msg_nats.NewJetstreamProducer(jc) }), fx.Provide(func(jsc *nats.JetstreamClient, ev *env.Env) (ReceiveResourceUpdatesConsumer, error) { topic := common.ReceiveFromAgentSubjectName(common.ReceiveFromAgentArgs{AccountName: "*", ClusterName: "*"}, common.ContainerRegistryReceiver, common.EventResourceUpdate) consumerName := "cr:resource-updates" return 
msg_nats.NewJetstreamConsumer(context.TODO(), jsc, msg_nats.JetstreamConsumerArgs{ Stream: ev.ResourceNatsStream, ConsumerConfig: msg_nats.ConsumerConfig{ Name: consumerName, Durable: consumerName, Description: "this consumer receives container registry resource updates, processes them, and keeps our Database updated about things happening in our client's cluster", FilterSubjects: []string{topic}, }, }) }), fx.Invoke(func(lf fx.Lifecycle, consumer ReceiveResourceUpdatesConsumer, d domain.Domain, logger logging.Logger) { lf.Append(fx.Hook{ OnStart: func(context.Context) error { go processResourceUpdates(consumer, d, logger) return nil }, OnStop: func(ctx context.Context) error { return consumer.Stop(ctx) }, }) }), fx.Provide(func(jsc *nats.JetstreamClient, ev *env.Env) (ErrorOnApplyConsumer, error) { topic := common.ReceiveFromAgentSubjectName(common.ReceiveFromAgentArgs{AccountName: "*", ClusterName: "*"}, common.ContainerRegistryReceiver, common.EventErrorOnApply) consumerName := "cr:error-on-apply" return msg_nats.NewJetstreamConsumer(context.TODO(), jsc, msg_nats.JetstreamConsumerArgs{ Stream: ev.ResourceNatsStream, ConsumerConfig: msg_nats.ConsumerConfig{ Name: consumerName, Durable: consumerName, Description: "this consumer receives container registry resource apply error on agent, processes them, and keeps our Database updated about why the resource apply failed at agent", FilterSubjects: []string{topic}, }, }) }), fx.Invoke(func(lf fx.Lifecycle, consumer ErrorOnApplyConsumer, d domain.Domain, logger logging.Logger) { lf.Append(fx.Hook{ OnStart: func(context.Context) error { go ProcessErrorOnApply(consumer, logger, d) return nil }, OnStop: func(ctx context.Context) error { return consumer.Stop(ctx) }, }) }), fx.Provide(func(jsc *nats.JetstreamClient, logger logging.Logger) SendTargetClusterMessagesProducer { return msg_nats.NewJetstreamProducer(jsc) }), fx.Provide(func(targetMessageProducer SendTargetClusterMessagesProducer) domain.ResourceDispatcher { return 
NewResourceDispatcher(targetMessageProducer) }), fx.Provide(func(cli *nats.Client, logger logging.Logger) domain.ResourceEventPublisher { return NewResourceEventPublisher(cli, logger) }), fx.Provide( func(conn IAMGrpcClient) iam.IAMClient { return iam.NewIAMClient(conn) }, ), fx.Provide( func(conn AuthGrpcClient) auth.AuthClient { return auth.NewAuthClient(conn) }, ), fx.Provide(func(ev *env.Env) *venv { return &venv{ev} }), fxGithub[*venv](), fxGitlab[*venv](), fx.Invoke( func(server httpServer.Server, d domain.Domain, sessionRepo kv.Repo[*common.AuthSession], ev *env.Env) { gqlConfig := generated.Config{Resolvers: &graph.Resolver{Domain: d, Env: ev}} gqlConfig.Directives.IsLoggedInAndVerified = func(ctx context.Context, _ interface{}, next graphql.Resolver) (res interface{}, err error) { sess := httpServer.GetSession[*common.AuthSession](ctx) if sess == nil { return nil, fiber.ErrUnauthorized } if !sess.UserVerified { return nil, &fiber.Error{ Code: fiber.StatusForbidden, Message: "user's email is not verified", } } return next(context.WithValue(ctx, "user-session", sess)) } gqlConfig.Directives.HasAccount = func(ctx context.Context, _ interface{}, next graphql.Resolver) (res interface{}, err error) { sess := httpServer.GetSession[*common.AuthSession](ctx) if sess == nil { return nil, fiber.ErrUnauthorized } m := httpServer.GetHttpCookies(ctx) klAccount := m[ev.AccountCookieName] if klAccount == "" { return nil, errors.Newf("no cookie named %q present in request", ev.AccountCookieName) } nctx := context.WithValue(ctx, "user-session", sess) nctx = context.WithValue(nctx, "account-name", klAccount) return next(nctx) } schema := generated.NewExecutableSchema(gqlConfig) server.SetupGraphqlServer(schema, httpServer.NewReadSessionMiddleware(sessionRepo, constants.CookieName, constants.CacheSessionPrefix)) }, ), fx.Invoke(func(server AuthorizerHttpServer, envs *env.Env, d domain.Domain, logger logging.Logger) { authLogger := logger.WithKV("route", "/auth") a := 
server.Raw() a.Post("/events", func(c *fiber.Ctx) error { ctx := c.Context() var eventMessage entities.EventMessage if err := c.BodyParser(&eventMessage); err != nil { return c.SendStatus(400) } if err := d.ProcessRegistryEvents(ctx, eventMessage.Events, logger); err != nil { return c.SendStatus(400) } return c.SendStatus(200) }) a.Use("/auth", func(c *fiber.Ctx) error { path := c.Query("path", "/") method := c.Query("method", "GET") u, err := url.Parse("http://example.com" + path) if err != nil { return c.SendStatus(400) } if u.Query().Has("_state") { return c.Next() } if method == "HEAD" { return c.Next() } logger.Infof("path: %s, method: %s", path, method) b_auth := basicauth.New(basicauth.Config{ Realm: "Forbidden", Authorizer: func(u string, p string) bool { if method == "DELETE" && u != domain.KL_ADMIN { return false } userName, accountName, _, err := registryAuth.ParseToken(p) if err != nil { authLogger.Errorf(err, "could not parse token") return false } s, err := d.GetTokenKey(context.TODO(), userName, accountName) if err != nil { authLogger.Errorf(err, "could not get token key") return false } if err := registryAuth.Authorizer(u, p, path, method, envs.RegistrySecretKey+s); err != nil { authLogger.Errorf(err, "could not authorize") return false } return true }, }) r := b_auth(c) return r }) a.Get("/auth", func(c *fiber.Ctx) error { return c.SendStatus(200) }) }), domain.Module, fx.Invoke(InitializeGrpcServer), )
Functions ¶
func InitializeGrpcServer ¶
func InitializeGrpcServer(server ContainerRegistryGRPCServer, d domain.Domain, ev *env.Env)
func NewResourceDispatcher ¶
func NewResourceDispatcher(producer SendTargetClusterMessagesProducer) domain.ResourceDispatcher
func ProcessErrorOnApply ¶
func ProcessErrorOnApply(consumer ErrorOnApplyConsumer, logger logging.Logger, d domain.Domain)
Types ¶
type AuthGrpcClient ¶
type AuthorizerHttpServer ¶
// AuthorizerHttpServer is a distinct named type over httpServer.Server. Module
// injects it separately from the plain httpServer.Server used for GraphQL and
// registers the "/events" and "/auth" routes on it.
type AuthorizerHttpServer httpServer.Server
type BuildRunProducer ¶
type ErrorOnApplyConsumer ¶
type GitWebhookConsumer ¶
type GithubOAuth ¶
// GithubOAuth is implemented by anything that can supply the GitHub
// configuration values as plain strings.
type GithubOAuth interface {
	// GithubConfig returns five string values, in this order: the client id,
	// the client secret, the callback URL, the GitHub App id and
	// githubAppPKFile.
	//
	// NOTE(review): githubAppPKFile is presumably the path to the GitHub App
	// private-key file — confirm against the implementation.
	GithubConfig() (
		clientId string,
		clientSecret string,
		callbackUrl string,
		githubAppId string,
		githubAppPKFile string,
	)
}
type IAMGrpcClient ¶
type ResourceEventPublisherImpl ¶
// ResourceEventPublisherImpl carries the PublishBuildCacheEvent and
// PublishBuildRunEvent methods listed below it.
//
// NOTE(review): presumably this is the concrete type behind the
// domain.ResourceEventPublisher returned by NewResourceEventPublisher — the
// constructor body and the struct fields are not visible here; confirm.
type ResourceEventPublisherImpl struct {
// contains filtered or unexported fields
}
func (*ResourceEventPublisherImpl) PublishBuildCacheEvent ¶
func (r *ResourceEventPublisherImpl) PublishBuildCacheEvent(buildCache *entities.BuildCacheKey, msg domain.PublishMsg)
func (*ResourceEventPublisherImpl) PublishBuildRunEvent ¶
func (r *ResourceEventPublisherImpl) PublishBuildRunEvent(buildrun *entities.BuildRun, msg domain.PublishMsg)
Source Files ¶
Click to show internal directories.
Click to hide internal directories.