Documentation ¶
Index ¶
- type Consumer
- type ConsumerOption
- func ConsumerLogger(customLogger *gologger.CustomLogger) ConsumerOption
- func EnableDeadLettering() ConsumerOption
- func EnableReplayMode(replayType ReplayType, replayFrom string, replyCompletionChannel chan bool) ConsumerOption
- func SetConsumerCustomConfig(customConfig map[string]interface{}) ConsumerOption
- func SetOffsetCommitMessageInterval(msgInterval int) ConsumerOption
- type DLConsumer
- func (kc *DLConsumer) GetPartitionCount(topic string) int
- func (kc *DLConsumer) GetPartitions() []kafka.TopicPartition
- func (kc *DLConsumer) ReadMessageFromPartitions(timeoutMs int)
- func (kc *DLConsumer) ReadPartition(partition int, timeoutMs int64)
- func (kc *DLConsumer) Start(processor IProcessor)
- func (kc *DLConsumer) SubscribeTopic(topics []string)
- type IProcessor
- type KafkaTopic
- type Message
- type Producer
- type ProducerOption
- type RawEvent
- type ReplayType
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer struct {
    InstanceID             string
    BrokerServers          string
    Topics                 []string
    ConsumerGroupName      string
    Consumer               *kafka.Consumer
    CloseChannel           chan os.Signal
    RetryCount             int           // default to 5
    RetryDuration          time.Duration // default to 24 hours
    ReplayMode             bool
    ReplayFrom             time.Duration // duration, defaults to 1h
    ReplayType             ReplayType
    ReplyCompletionChannel chan bool
    // contains filtered or unexported fields
}
Consumer holds the configuration for Kafka consumers.
func NewKafkaConsumer ¶
func NewKafkaConsumer(brokerServers string, consumerGroupName string, topics []string, options ...ConsumerOption) *Consumer
NewKafkaConsumer initializes a Kafka consumer for the provided configuration. It is initialized with the following defaults:
- offsetCommitMessageInterval: 1000
- lastOffsetCommitMessageInterval: 0
- enableDL: false
- broker.address.family: v4
- session.timeout.ms: 6000
- enable.auto.commit: false
- auto.offset.reset: earliest
- ReplayMode: false
- ReplayType: timestamp
- ReplayFrom: 1h
func (*Consumer) ForceCommitOffset ¶
func (kc *Consumer) ForceCommitOffset()
ForceCommitOffset calls the Kafka commit offset API directly.
func (*Consumer) Start ¶
func (kc *Consumer) Start(processor IProcessor)
Start starts the consumer with the settings applied while creating the consumer
type ConsumerOption ¶
type ConsumerOption func(l *Consumer)
ConsumerOption sets a parameter for the Kafka consumer.
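The signature of ConsumerOption follows the functional options pattern. The following is a minimal, self-contained sketch of how such options are typically applied on top of defaults. The names consumerSketch, option, withDeadLettering, and newConsumerSketch are hypothetical stand-ins and are not part of this package; only the default values are taken from the documentation above.

```go
package main

import "fmt"

// consumerSketch is a hypothetical stand-in for Consumer; it models only
// the two settings needed for this illustration.
type consumerSketch struct {
	offsetCommitMessageInterval int
	enableDL                    bool
}

// option has the same shape as ConsumerOption: a function that mutates the consumer.
type option func(c *consumerSketch)

// withDeadLettering is a hypothetical analogue of EnableDeadLettering.
func withDeadLettering() option {
	return func(c *consumerSketch) { c.enableDL = true }
}

// newConsumerSketch sets the documented defaults first, then applies
// each option in the order it was passed.
func newConsumerSketch(opts ...option) *consumerSketch {
	c := &consumerSketch{offsetCommitMessageInterval: 1000, enableDL: false}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newConsumerSketch(withDeadLettering())
	fmt.Println(c.offsetCommitMessageInterval, c.enableDL)
}
```

Because options run after the defaults are set, a caller only names the settings that differ from the defaults.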
func ConsumerLogger ¶
func ConsumerLogger(customLogger *gologger.CustomLogger) ConsumerOption
ConsumerLogger sets the logger for the consumer. Defaults to the consul logger.
func EnableDeadLettering ¶
func EnableDeadLettering() ConsumerOption
EnableDeadLettering enables dead-lettering.
func EnableReplayMode ¶
func EnableReplayMode(replayType ReplayType, replayFrom string, replyCompletionChannel chan bool) ConsumerOption
EnableReplayMode enables replay mode. When enabling replay mode you need to pass the following:
- ReplayType: this can be timestamp or beginning
- ReplayFrom: the duration before the current time from which messages should be processed; this is only considered in timestamp mode
func SetConsumerCustomConfig ¶
func SetConsumerCustomConfig(customConfig map[string]interface{}) ConsumerOption
SetConsumerCustomConfig sets the custom config for the Kafka consumer.
func SetOffsetCommitMessageInterval ¶
func SetOffsetCommitMessageInterval(msgInterval int) ConsumerOption
SetOffsetCommitMessageInterval sets the offset commit message interval. The interval should be positive; if it is not, it is set to the default of 1000.
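The fallback rule can be sketched as follows. The helpers normalizeInterval and shouldCommit are hypothetical and not part of this package. In particular, shouldCommit assumes that the interval means "commit once every N processed messages", which the name suggests but the documentation does not spell out.

```go
package main

import "fmt"

// defaultCommitInterval is the documented default.
const defaultCommitInterval = 1000

// normalizeInterval returns msgInterval when it is positive and the
// default of 1000 otherwise, as the documentation describes.
func normalizeInterval(msgInterval int) int {
	if msgInterval <= 0 {
		return defaultCommitInterval
	}
	return msgInterval
}

// shouldCommit reports whether a commit is due after `processed` messages.
// Assumption: offsets are committed once every `interval` messages.
func shouldCommit(processed, interval int) bool {
	interval = normalizeInterval(interval)
	return processed > 0 && processed%interval == 0
}

func main() {
	fmt.Println(normalizeInterval(-5), normalizeInterval(250))
	fmt.Println(shouldCommit(500, 250), shouldCommit(499, 250))
}
```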
type DLConsumer ¶
type DLConsumer struct {
    InstanceID        string
    BrokerServers     string
    Topics            []string
    ConsumerGroupName string
    Consumer          *kafka.Consumer
    CloseChannel      chan os.Signal
    RetryCount        int           // default to 5
    RetryDuration     time.Duration // default to 24 hours
    // contains filtered or unexported fields
}
DLConsumer holds the configuration for the DL consumer.
func NewKafkaDLConsumer ¶
func NewKafkaDLConsumer(brokerServers string, consumerGroupName string, customConfig map[string]interface{}, logger *gologger.CustomLogger) *DLConsumer
NewKafkaDLConsumer initializes a DLConsumer for the provided configuration.
func (*DLConsumer) GetPartitionCount ¶
func (kc *DLConsumer) GetPartitionCount(topic string) int
GetPartitionCount returns the partition count of the subscribed topic.
func (*DLConsumer) GetPartitions ¶
func (kc *DLConsumer) GetPartitions() []kafka.TopicPartition
GetPartitions sets and returns the partitions of the subscribed topic.
func (*DLConsumer) ReadMessageFromPartitions ¶
func (kc *DLConsumer) ReadMessageFromPartitions(timeoutMs int)
ReadMessageFromPartitions reads messages from the partitions with a timeout in milliseconds.
func (*DLConsumer) ReadPartition ¶
func (kc *DLConsumer) ReadPartition(partition int, timeoutMs int64)
ReadPartition reads messages from the partition until timeoutMs elapses or until a message in the partition cannot currently be processed.
func (*DLConsumer) Start ¶
func (kc *DLConsumer) Start(processor IProcessor)
Start starts the DL consumer.
func (*DLConsumer) SubscribeTopic ¶
func (kc *DLConsumer) SubscribeTopic(topics []string)
SubscribeTopic subscribes to a list of topics.
type IProcessor ¶
IProcessor is the interface for consuming messages from the queue.
type KafkaTopic ¶
KafkaTopic is used to create topics in kafka.
type Message ¶
type Message struct {
    Data           RawEvent
    TopicPartition kafka.TopicPartition
    Timestamp      time.Time
}
Message is the message that is published to Kafka.
type Producer ¶
type Producer struct {
    BrokerServers         string
    IsAutoEventLogEnabled bool
    EventsChannel         chan kafka.Event
    CloseChannel          chan os.Signal
    // contains filtered or unexported fields
}
Producer carries all the settings for the Kafka producer.
func NewKafkaProducer ¶
func NewKafkaProducer(brokerServers string, options ...ProducerOption) *Producer
NewKafkaProducer creates a new producer. The following are the defaults for the Kafka configuration:
- "go.batch.producer": true
- "go.events.channel.size": 100000
- "go.produce.channel.size": 100000
- "max.in.flight.requests.per.connection": 1000000
- "linger.ms": 100
- "queue.buffering.max.messages": 100000
- "batch.num.messages": 5000
- "acks": "1"
You can change the defaults by passing a map to the SetProducerCustomConfig option.
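How a custom map overrides the defaults can be pictured with the sketch below. It is an assumption that the package overlays the custom map key by key; the helpers producerDefaults and mergeConfig are hypothetical and only the default values come from the list above.

```go
package main

import "fmt"

// producerDefaults returns a copy of the defaults listed in the documentation.
func producerDefaults() map[string]interface{} {
	return map[string]interface{}{
		"go.batch.producer":                     true,
		"go.events.channel.size":                100000,
		"go.produce.channel.size":               100000,
		"max.in.flight.requests.per.connection": 1000000,
		"linger.ms":                             100,
		"queue.buffering.max.messages":          100000,
		"batch.num.messages":                    5000,
		"acks":                                  "1",
	}
}

// mergeConfig overlays custom on top of defaults and returns a new map.
// Neither input map is modified. Assumption: overrides apply per key.
func mergeConfig(defaults, custom map[string]interface{}) map[string]interface{} {
	merged := make(map[string]interface{}, len(defaults)+len(custom))
	for k, v := range defaults {
		merged[k] = v
	}
	for k, v := range custom {
		merged[k] = v
	}
	return merged
}

func main() {
	custom := map[string]interface{}{"linger.ms": 10, "acks": "all"}
	cfg := mergeConfig(producerDefaults(), custom)
	fmt.Println(cfg["linger.ms"], cfg["acks"], cfg["batch.num.messages"])
}
```

Under this reading, keys that are absent from the custom map keep their default values.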
func (*Producer) CreateTopics ¶
func (kp *Producer) CreateTopics(topics ...KafkaTopic) error
CreateTopics creates new topics if they do not exist.
func (*Producer) PublishMessageToTopic ¶
PublishMessageToTopic publishes a message to a topic.
type ProducerOption ¶
type ProducerOption func(l *Producer)
ProducerOption sets a parameter for the KafkaProducer
func EnableEventLogging ¶
func EnableEventLogging(enableEventLogging bool) ProducerOption
EnableEventLogging will enable event logging. By default it is disabled
func SetProducerCustomConfig ¶
func SetProducerCustomConfig(customConfig map[string]interface{}) ProducerOption
SetProducerCustomConfig sets the custom config for kafka
func SetProducerLogger ¶
func SetProducerLogger(customLogger *gologger.CustomLogger) ProducerOption
SetProducerLogger sets the logger for the producer. Defaults to the consul logger.
type ReplayType ¶
type ReplayType int
ReplayType is an enum which represents two types of Kafka consumer replays: timestamp and beginning.
const (
    // TIMESTAMP replays logs from a particular time
    TIMESTAMP ReplayType = iota
    // BEGINNING replays logs from the beginning
    BEGINNING
)
func (ReplayType) EnumIndex ¶
func (rf ReplayType) EnumIndex() int
EnumIndex returns the integer index of the ReplayType.
func (ReplayType) String ¶
func (rf ReplayType) String() string
String returns the string representation of the ReplayType.
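For readers unfamiliar with iota-based enums, the following self-contained sketch shows one common way to implement a type with this shape. The type and constant declarations match the documentation; the method bodies are assumptions, in particular the exact strings returned by String and the "unknown" fallback for out-of-range values.

```go
package main

import "fmt"

// ReplayType mirrors the documented enum.
type ReplayType int

const (
	// TIMESTAMP replays logs from a particular time
	TIMESTAMP ReplayType = iota
	// BEGINNING replays logs from the beginning
	BEGINNING
)

// String returns a name for the value. The names and the "unknown"
// fallback are assumptions, not confirmed by the documentation.
func (rf ReplayType) String() string {
	names := [...]string{"timestamp", "beginning"}
	if rf < 0 || int(rf) >= len(names) {
		return "unknown"
	}
	return names[rf]
}

// EnumIndex returns the underlying integer value.
func (rf ReplayType) EnumIndex() int {
	return int(rf)
}

func main() {
	fmt.Println(TIMESTAMP, TIMESTAMP.EnumIndex())
	fmt.Println(BEGINNING, BEGINNING.EnumIndex())
}
```

Because the type implements fmt.Stringer, passing a value to fmt.Println prints its name rather than its number.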