Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var Module = fx.Module("app", repos.NewFxMongoRepo[*domain.EventLog]("events", "ev", domain.EventLogIndices), fx.Provide(func(jc *nats.JetstreamClient, ev *env.Env, logger logging.Logger) (EventLogConsumer, error) { topic := common.AuditEventLogTopicName consumerName := "worker-audit-logging:event-log" return msg_nats.NewJetstreamConsumer(context.TODO(), jc, msg_nats.JetstreamConsumerArgs{ Stream: ev.EventLogNatsStream, ConsumerConfig: msg_nats.ConsumerConfig{ Name: consumerName, Durable: consumerName, Description: "this consumer reads message from a subject dedicated to errors, that occurred when the resource was applied at the agent", FilterSubjects: []string{string(topic)}, }, }) }), fx.Invoke(func(consumer EventLogConsumer, logr logging.Logger, d domain.Domain) error { if err := consumer.Consume(func(msg *types.ConsumeMsg) error { logger := logr.WithName("audit-events") logger.Infof("started processing") defer func() { logger.Infof("finished processing") }() var el domain.EventLog if err := json.Unmarshal(msg.Payload, &el); err != nil { return errors.NewE(err) } event, err := d.PushEvent(context.TODO(), &el) if err != nil { return errors.NewE(err) } logger.WithKV("event-id", event.Id).Infof("pushed event to mongo") return nil }, types.ConsumeOpts{}); err != nil { logr.Errorf(err, "error consuming messages") return errors.NewE(err) } return nil }), domain.Module, )
Functions ¶
This section is empty.
Types ¶
type EventLogConsumer ¶
Click to show internal directories.
Click to hide internal directories.