kafka

package
v0.0.0-...-edbed3e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 31, 2024 License: MIT Imports: 14 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type KafkaConfig

// KafkaConfig describes a single Kafka server and the topics to consume
// from it.
//
// Server is the broker address; ClusterGroup passes it as the only entry of
// the broker list when it creates the consumer. Topics lists the topics to
// subscribe to together with their per-topic settings. The fields are
// decoded from the JSON keys "server" and "topics".
type KafkaConfig struct {
	Server string       `json:"server"`
	Topics []KafkaTopic `json:"topics"`
}

type KafkaConsumer

// KafkaConsumer bundles the Kafka configuration with the resources used
// while consuming messages.
//
// Config holds the server address and topic list. Queue is the message
// queue that ClusterGroup pushes received messages onto. Consumer is set
// by ClusterGroup to the consumer it creates. DocDBconn, DB and
// SignalRClient correspond to the arguments accepted by
// NewKafkaConsumerExternal. AppServer and ApiKey are not used in the
// visible code — presumably they are consumed by CallWebService; TODO
// confirm.
type KafkaConsumer struct {
	Config KafkaConfig
	Queue  *queue.MessageQueue

	Consumer      sarama.Consumer
	DocDBconn     *documents.DocDB
	DB            *sql.DB
	SignalRClient signalr.Client
	AppServer     string
	ApiKey        string
	// contains filtered or unexported fields
}

func NewKafkaConsumer

func NewKafkaConsumer(config KafkaConfig) *KafkaConsumer

func NewKafkaConsumerExternal

func NewKafkaConsumerExternal(config KafkaConfig, docDBconn *documents.DocDB, db *sql.DB, signalRClient signalr.Client) *KafkaConsumer

func (*KafkaConsumer) BuildKafkaConsumer

func (KafkaConsumer *KafkaConsumer) BuildKafkaConsumer()

func (*KafkaConsumer) CallWebService

func (KafkaConsumer *KafkaConsumer) CallWebService(msg *sarama.ConsumerMessage, topic string, handler string)

func (*KafkaConsumer) PartitionTopics

func (KafkaConsumer *KafkaConsumer) PartitionTopics()

// ClusterGroup creates a consumer group on the configured server, subscribes
// it to every topic in Config.Topics, and forwards each received message
// whose topic has a non-empty handler to the message queue.
//
// The group ID is a new random UUID on every call. If the consumer cannot be
// created the error is logged and the method returns without consuming.
// Otherwise it starts the consuming goroutines and calls
// waitForTerminationSignal; the consumer is closed when that call returns.
//
// NOTE(review): the cluster package is not visible from here. Confirm that
// cluster.NewConsumer accepts a *sarama.Config and that its result is
// assignable to the Consumer field.
func (KafkaConsumer *KafkaConsumer) ClusterGroup() {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Offsets.AutoCommit.Enable = true
	config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second

	group := uuid.New().String()
	topics := make([]string, 0, len(KafkaConsumer.Config.Topics))
	for _, data := range KafkaConsumer.Config.Topics {
		topics = append(topics, data.Topic)
	}

	consumerGroup, err := cluster.NewConsumer(
		[]string{KafkaConsumer.Config.Server},
		group,
		topics,
		config,
	)
	if err != nil {
		// Bail out: every later step would dereference a nil consumer.
		KafkaConsumer.iLog.Error(fmt.Sprintf("Failed to create consumer group: %v", err))
		return
	}
	defer consumerGroup.Close()

	KafkaConsumer.Consumer = consumerGroup

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT)

	// Consumer.Return.Errors is enabled above, so the errors channel has to
	// be drained or the consumer can stall once the channel fills up.
	go func() {
		for consumeErr := range consumerGroup.Errors() {
			KafkaConsumer.iLog.Error(fmt.Sprintf("Consumer group error: %v", consumeErr))
		}
	}()

	q := KafkaConsumer.Queue
	go func() {
		for message := range consumerGroup.Messages() {
			KafkaConsumer.iLog.Debug(fmt.Sprintf("Received message: %s", message.Value))
			for _, data := range KafkaConsumer.Config.Topics {
				if message.Topic != data.Topic {
					continue
				}
				// Topics without a handler are consumed but never queued.
				if handler := data.Handler; handler != "" {
					ID := uuid.New().String()
					msg := queue.Message{
						Id:        ID,
						UUID:      ID,
						Retry:     3,
						Execute:   0,
						Topic:     message.Topic,
						PayLoad:   message.Value,
						Handler:   handler,
						CreatedOn: time.Now(),
					}
					KafkaConsumer.iLog.Debug(fmt.Sprintf("Push message %s to queue: %s", msg, q.QueueID))
					q.Push(msg)
				}
				// Only the first matching topic entry is considered.
				break
			}

			consumerGroup.MarkMessage(message, "") // Mark the message as processed
		}
	}()
	KafkaConsumer.waitForTerminationSignal()
}

type KafkaTopic

// KafkaTopic configures how one Kafka topic is consumed.
//
// Topic is the topic name; ClusterGroup subscribes to it and compares it
// with the topic of each incoming message. Handler is the handler name that
// ClusterGroup copies into the queued message for this topic. Mode and Type
// are not used in the visible code — their meaning should be confirmed
// against the other consumer methods. The fields are decoded from the JSON
// keys "topic", "handler", "mode" and "type".
type KafkaTopic struct {
	Topic   string `json:"topic"`
	Handler string `json:"handler"`
	Mode    string `json:"mode"`
	Type    string `json:"type"`
}

type KafkasConfig

// KafkasConfig is the top-level configuration wrapper: a list of Kafka
// server configurations under the JSON key "kafkas" plus an API key under
// "apikey". How ApiKey is used is not shown here — presumably it
// authenticates the web-service calls made for consumed messages; TODO
// confirm.
type KafkasConfig struct {
	Kafkas []KafkaConfig `json:"kafkas"`
	ApiKey string        `json:"apikey"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL