Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GenerateTopic ¶
func GenerateTopic(msg interface{}) string
GenerateTopic Unmarshaled JSON and extracts the app name and hour to create a topic
func ParseActivityLog ¶
func ParseActivityLog(msg *[]byte) interface{}
ParseActivityLog takes a string of bytes and returns json with type interface{} This is designed to handle dynamic json structure
Types ¶
type Kafka ¶
type Kafka struct { Brokers []string ConsumerTopics []string ConsumerGroup string OffsetInitial string }
Kafka stores the necessary values to setup of Kafka consumer and/or producer
func (*Kafka) ProduceMessage ¶
func (k *Kafka) ProduceMessage(message *ProducerMessage) *sarama.ProducerMessage
ProduceMessage creates a message that can be sent to AsyncProducer Input
func (*Kafka) SetupConsumer ¶
func (k *Kafka) SetupConsumer() *KafkaConsumer
SetupConsumer initializes a new consumer
func (*Kafka) SetupProducer ¶
func (k *Kafka) SetupProducer() *KafkaProducer
SetupProducer initializes a new consumer
type KafkaConsumer ¶
type KafkaConsumer struct {
Consumer *cluster.Consumer
}
KafkaConsumer is a sarama-cluster Consumer
type KafkaProducer ¶
type KafkaProducer struct {
Producer sarama.AsyncProducer
}
KafkaProducer is a sarama AsyncProducer
type ProducerMessage ¶
ProducerMessage defines the fields for creating a message to be consumed by a producer
Click to show internal directories.
Click to hide internal directories.