app

package
v1.1.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 25, 2024 License: AGPL-3.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Module is the fx module named "app". It registers a Mongo repository for
// *domain.EventLog, provides the Jetstream-backed EventLogConsumer, starts
// consuming messages through an invoke hook, and includes domain.Module.
var Module = fx.Module("app",
	repos.NewFxMongoRepo[*domain.EventLog]("events", "ev", domain.EventLogIndices),

	// Constructor for the EventLogConsumer: a Jetstream consumer on the stream
	// named by env.EventLogNatsStream, filtered to the audit event-log topic.
	// The logging.Logger dependency is requested but not used here.
	fx.Provide(func(jc *nats.JetstreamClient, ev *env.Env, _ logging.Logger) (EventLogConsumer, error) {
		// The same identifier is used for both the consumer name and the durable name.
		const durableName = "worker-audit-logging:event-log"

		consumerCfg := msg_nats.ConsumerConfig{
			Name:    durableName,
			Durable: durableName,
			// NOTE(review): this description talks about resource-apply errors,
			// while the consumer filters on the audit event-log topic — it looks
			// copied from another consumer; confirm before changing it.
			Description:    "this consumer reads message from a subject dedicated to errors, that occurred when the resource was applied at the agent",
			FilterSubjects: []string{string(common.AuditEventLogTopicName)},
		}

		return msg_nats.NewJetstreamConsumer(context.TODO(), jc, msg_nats.JetstreamConsumerArgs{
			Stream:         ev.EventLogNatsStream,
			ConsumerConfig: consumerCfg,
		})
	}),

	// Invoke hook: registers the message handler with the consumer. An error
	// returned by Consume is logged and then returned wrapped.
	fx.Invoke(func(consumer EventLogConsumer, logr logging.Logger, d domain.Domain) error {
		// handle decodes one message payload as a domain.EventLog and hands it
		// to the domain via PushEvent; any failure is returned to the consumer.
		handle := func(msg *types.ConsumeMsg) error {
			evLogger := logr.WithName("audit-events")
			evLogger.Infof("started processing")
			defer evLogger.Infof("finished processing")

			var entry domain.EventLog
			if err := json.Unmarshal(msg.Payload, &entry); err != nil {
				return errors.NewE(err)
			}

			pushed, err := d.PushEvent(context.TODO(), &entry)
			if err != nil {
				return errors.NewE(err)
			}

			evLogger.WithKV("event-id", pushed.Id).Infof("pushed event to mongo")
			return nil
		}

		err := consumer.Consume(handle, types.ConsumeOpts{})
		if err == nil {
			return nil
		}

		logr.Errorf(err, "error consuming messages")
		return errors.NewE(err)
	}),
	domain.Module,
)

Functions

This section is empty.

Types

type EventLogConsumer

// EventLogConsumer is a distinct named type whose underlying type is
// messaging.Consumer. Module provides a value of this type and its invoke
// hook depends on it — presumably so the dependency container can tell this
// consumer apart from other messaging.Consumer values; confirm against callers.
type EventLogConsumer messaging.Consumer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL