Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type KafkaConfig ¶
type KafkaConfig struct { Server string `json:"server"` Topics []KafkaTopic `json:"topics"` }
type KafkaConsumer ¶
type KafkaConsumer struct { Config KafkaConfig Queue *queue.MessageQueue Consumer sarama.Consumer DocDBconn *documents.DocDB DB *sql.DB SignalRClient signalr.Client AppServer string ApiKey string // contains filtered or unexported fields }
func NewKafkaConsumer ¶
func NewKafkaConsumer(config KafkaConfig) *KafkaConsumer
func NewKafkaConsumerExternal ¶
func NewKafkaConsumerExternal(config KafkaConfig, docDBconn *documents.DocDB, db *sql.DB, signalRClient signalr.Client) *KafkaConsumer
func (*KafkaConsumer) BuildKafkaConsumer ¶
func (KafkaConsumer *KafkaConsumer) BuildKafkaConsumer()
func (*KafkaConsumer) CallWebService ¶
func (KafkaConsumer *KafkaConsumer) CallWebService(msg *sarama.ConsumerMessage, topic string, handler string)
func (*KafkaConsumer) PartitionTopics ¶
func (KafkaConsumer *KafkaConsumer) PartitionTopics()
func (KafkaConsumer *KafkaConsumer) ClusterGroup() {
config := sarama.NewConfig() config.Consumer.Return.Errors = true config.Consumer.Offsets.AutoCommit.Enable = true config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second group := uuid.New().String() topics := []string{} for _, data := range KafkaConsumer.Config.Topics { topics.append(data.Topic) } consumerGroup, err := cluster.NewConsumer( []string{KafkaConsumer.Config.Server}, group, topics, config, ) KafkaConsumer.Consumer = consumerGroup if err != nil { KafkaConsumer.iLog.Error(fmt.Sprintf("Failed to create consumer group: %v", err)) } defer consumerGroup.Close() signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT) q := KafkaConsumer.Queue go func() { for message := range consumerGroup.Messages() { KafkaConsumer.iLog.Debug(fmt.Sprintf("Received message: %s", message.Value)) for _, data := range KafkaConsumer.Config.Topics { if message.Topic == data.Topic { handler = data.Handler if Handler != "" { ID := uuid.New().String() msg := queue.Message{ Id: ID, UUID: ID, Retry: 3, Execute: 0, Topic: message.Topic, PayLoad: []byte(message.Value), Handler: handler, CreatedOn: time.Now(), } iLog.Debug(fmt.Sprintf("Push message %s to queue: %s", msg, q.QueueID)) q.Push(msg) } break } } consumerGroup.MarkMessage(message, "") // Mark the message as processed } }() KafkaConsumer.waitForTerminationSignal() }
type KafkaTopic ¶
type KafkasConfig ¶
type KafkasConfig struct { Kafkas []KafkaConfig `json:"kafkas"` ApiKey string `json:"apikey"` }
Click to show internal directories.
Click to hide internal directories.