kafka

package
v1.13.3
Published: Jan 15, 2025 License: MIT Imports: 11 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	InstanceID string

	BrokerServers     string
	Topics            []string
	ConsumerGroupName string
	Consumer          *kafka.Consumer
	CloseChannel      chan os.Signal

	RetryCount    int           // default to 5
	RetryDuration time.Duration // default to 24 hours

	ReplayMode             bool
	ReplayFrom             time.Duration //duration - defaults to 1h
	ReplayType             ReplayType
	ReplyCompletionChannel chan bool
	// contains filtered or unexported fields
}

Consumer holds the configuration for kafka consumers

func NewKafkaConsumer

func NewKafkaConsumer(brokerServers string, consumerGroupName string, topics []string, options ...ConsumerOption) *Consumer

NewKafkaConsumer initializes a Kafka Consumer for the provided configuration. It is initialized with the following defaults:

offsetCommitMessageInterval:     1000
lastOffsetCommitMessageInterval: 0
enableDL:                        false
broker.address.family:           v4
session.timeout.ms:              6000
enable.auto.commit:              false
auto.offset.reset:               earliest
ReplayMode:                      false
ReplayType:                      timestamp
ReplayFrom:                      1h
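The constructor takes a variadic list of ConsumerOption values that are applied on top of the defaults. The sketch below illustrates that pattern with local stand-in types; consumerConfig, option, withOffsetCommitMessageInterval, withDeadLettering and newConsumerConfig are hypothetical names, not part of this package, and the real constructor may order or validate its options differently.

```go
package main

import "fmt"

// consumerConfig is a hypothetical stand-in for the package's Consumer;
// only the fields needed for the illustration are modelled.
type consumerConfig struct {
	brokerServers               string
	consumerGroupName           string
	topics                      []string
	offsetCommitMessageInterval int
	enableDL                    bool
}

// option mirrors the shape of ConsumerOption: a function that mutates the config.
type option func(c *consumerConfig)

// withOffsetCommitMessageInterval falls back to 1000 for non-positive values,
// which is the behaviour documented for SetOffsetCommitMessageInterval.
func withOffsetCommitMessageInterval(n int) option {
	return func(c *consumerConfig) {
		if n <= 0 {
			c.offsetCommitMessageInterval = 1000
			return
		}
		c.offsetCommitMessageInterval = n
	}
}

// withDeadLettering switches dead-lettering on (it is off by default).
func withDeadLettering() option {
	return func(c *consumerConfig) { c.enableDL = true }
}

// newConsumerConfig sets two of the documented defaults, then applies the
// options in the order they were passed.
func newConsumerConfig(brokers, group string, topics []string, opts ...option) *consumerConfig {
	c := &consumerConfig{
		brokerServers:               brokers,
		consumerGroupName:           group,
		topics:                      topics,
		offsetCommitMessageInterval: 1000,
		enableDL:                    false,
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newConsumerConfig("localhost:9092", "example-group", []string{"example-topic"},
		withOffsetCommitMessageInterval(250), withDeadLettering())
	fmt.Println(c.offsetCommitMessageInterval, c.enableDL)
}
```

With the real package, the equivalent call would pass SetOffsetCommitMessageInterval and EnableDeadLettering to NewKafkaConsumer in the same way.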

func (*Consumer) ForceCommitOffset

func (kc *Consumer) ForceCommitOffset()

ForceCommitOffset calls the Kafka commit offset API.

func (*Consumer) Start

func (kc *Consumer) Start(processor IProcessor)

Start starts the consumer with the settings applied while creating the consumer

type ConsumerOption

type ConsumerOption func(l *Consumer)

ConsumerOption sets a parameter for the Kafka Consumer

func ConsumerLogger

func ConsumerLogger(customLogger *gologger.CustomLogger) ConsumerOption

ConsumerLogger sets the logger for the consumer. Defaults to the consul logger.

func EnableDeadLettering

func EnableDeadLettering() ConsumerOption

EnableDeadLettering enables dead-lettering for the consumer.

func EnableReplayMode

func EnableReplayMode(replayType ReplayType, replayFrom string, replyCompletionChannel chan bool) ConsumerOption

EnableReplayMode enables replay mode. When enabling replay mode you need to pass the following:

ReplayType: either timestamp or beginning.
ReplayFrom: the duration before the current time from which messages should be processed. This is only considered in timestamp mode.
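In timestamp mode the replay starts at the current time minus ReplayFrom. The following is a minimal sketch of that arithmetic, not the package's implementation. It assumes the replayFrom string uses Go duration syntax (suggested by the documented default of 1h) and is parsed with time.ParseDuration; replayStartMillis is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// replayStartMillis returns the Unix-millisecond timestamp from which a
// timestamp-mode replay would begin: nowMillis minus the replayFrom duration.
// Parsing replayFrom with time.ParseDuration is an assumption; the package
// may accept a different format.
func replayStartMillis(nowMillis int64, replayFrom string) (int64, error) {
	d, err := time.ParseDuration(replayFrom)
	if err != nil {
		return 0, fmt.Errorf("invalid replayFrom %q: %w", replayFrom, err)
	}
	if d < 0 {
		return 0, fmt.Errorf("replayFrom must not be negative, got %s", d)
	}
	return nowMillis - d.Milliseconds(), nil
}

func main() {
	start, err := replayStartMillis(time.Now().UnixMilli(), "1h")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// Print the computed start time in a readable form.
	fmt.Println(time.UnixMilli(start).UTC().Format(time.RFC3339))
}
```

In beginning mode no such computation is needed, since the replay starts from the beginning of the log regardless of ReplayFrom.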

func SetConsumerCustomConfig

func SetConsumerCustomConfig(customConfig map[string]interface{}) ConsumerOption

SetConsumerCustomConfig sets the custom config for kafka

func SetOffsetCommitMessageInterval

func SetOffsetCommitMessageInterval(msgInterval int) ConsumerOption

SetOffsetCommitMessageInterval sets the offset commit message interval. The interval should be positive; if it is not, it is set to the default of 1000.

type DLConsumer

type DLConsumer struct {
	InstanceID string

	BrokerServers     string
	Topics            []string
	ConsumerGroupName string
	Consumer          *kafka.Consumer
	CloseChannel      chan os.Signal
	RetryCount        int           // default to 5
	RetryDuration     time.Duration // default to 24 hours
	// contains filtered or unexported fields
}

DLConsumer holds the configuration for the DL consumer

func NewKafkaDLConsumer

func NewKafkaDLConsumer(brokerServers string, consumerGroupName string, customConfig map[string]interface{}, logger *gologger.CustomLogger) *DLConsumer

NewKafkaDLConsumer initializes a DLConsumer for the provided configuration

func (*DLConsumer) GetPartitionCount

func (kc *DLConsumer) GetPartitionCount(topic string) int

GetPartitionCount returns the partition count of the subscribed topic

func (*DLConsumer) GetPartitions

func (kc *DLConsumer) GetPartitions() []kafka.TopicPartition

GetPartitions sets and returns the partitions of the subscribed topic

func (*DLConsumer) ReadMessageFromPartitions

func (kc *DLConsumer) ReadMessageFromPartitions(timeoutMs int)

ReadMessageFromPartitions reads messages from the partitions, with a timeout in milliseconds

func (*DLConsumer) ReadPartition

func (kc *DLConsumer) ReadPartition(partition int, timeoutMs int64)

ReadPartition reads messages from the given partition until timeoutMs elapses or until it reaches a message that cannot be processed yet

func (*DLConsumer) Start

func (kc *DLConsumer) Start(processor IProcessor)

Start starts the DL consumer

func (*DLConsumer) SubscribeTopic

func (kc *DLConsumer) SubscribeTopic(topics []string)

SubscribeTopic subscribes to a list of topics

type IProcessor

type IProcessor interface {
	ProcessMessage(*Message) bool
}

IProcessor is the interface for consuming messages from the queue
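A processor only has to implement ProcessMessage. The sketch below shows one possible implementation against local stand-in copies of RawEvent, Message and IProcessor (the real Message also carries TopicPartition and Timestamp). orderEvent and orderProcessor are hypothetical, and treating the bool result as "handled successfully" is an assumption, since the documentation does not state what the return value means.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RawEvent and Message are local stand-ins for the package's types,
// reduced to the field this example needs.
type RawEvent []byte

type Message struct {
	Data RawEvent
}

// IProcessor repeats the documented interface.
type IProcessor interface {
	ProcessMessage(*Message) bool
}

// orderEvent is a hypothetical JSON payload used only for illustration.
type orderEvent struct {
	ID     string  `json:"id"`
	Amount float64 `json:"amount"`
}

// orderProcessor counts the messages it was able to decode.
type orderProcessor struct {
	processed int
}

// ProcessMessage decodes the payload and reports whether it was handled.
// Returning false for nil, undecodable, or ID-less messages is an
// assumption about how the bool result is meant to be used.
func (p *orderProcessor) ProcessMessage(m *Message) bool {
	if m == nil {
		return false
	}
	var ev orderEvent
	if err := json.Unmarshal(m.Data, &ev); err != nil {
		return false
	}
	if ev.ID == "" {
		return false
	}
	p.processed++
	return true
}

// Compile-time check that orderProcessor satisfies IProcessor.
var _ IProcessor = (*orderProcessor)(nil)

func main() {
	p := &orderProcessor{}
	ok := p.ProcessMessage(&Message{Data: RawEvent(`{"id":"o-1","amount":9.5}`)})
	fmt.Println(ok, p.processed)
}
```

A value like p would be handed to Consumer.Start or DLConsumer.Start, both of which accept an IProcessor.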

type KafkaTopic

type KafkaTopic struct {
	TopicName         string
	Partitions        int
	ReplicationFactor int
}

KafkaTopic is used to create topics in kafka.

type Message

type Message struct {
	Data           RawEvent
	TopicPartition kafka.TopicPartition
	Timestamp      time.Time
}

Message is the message that is published to Kafka

type Producer

type Producer struct {
	BrokerServers         string
	IsAutoEventLogEnabled bool

	EventsChannel chan kafka.Event

	CloseChannel chan os.Signal
	// contains filtered or unexported fields
}

Producer carries all the settings for the kafka producer

func NewKafkaProducer

func NewKafkaProducer(brokerServers string, options ...ProducerOption) *Producer

NewKafkaProducer creates a new producer. The following are the defaults for the Kafka configuration:

"go.batch.producer":                     true
"go.events.channel.size":                100000
"go.produce.channel.size":               100000
"max.in.flight.requests.per.connection": 1000000
"linger.ms":                             100
"queue.buffering.max.messages":          100000
"batch.num.messages":                    5000
"acks":                                  "1"

You can change the defaults by passing a map to the SetProducerCustomConfig option
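One plausible reading of this is that the custom map is overlaid on the defaults key by key. The sketch below shows that overlay with plain maps; producerDefaults and mergeConfig are hypothetical helpers, and the way the package actually combines the two maps is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// producerDefaults returns a fresh copy of the documented default settings.
func producerDefaults() map[string]interface{} {
	return map[string]interface{}{
		"go.batch.producer":                     true,
		"go.events.channel.size":                100000,
		"go.produce.channel.size":               100000,
		"max.in.flight.requests.per.connection": 1000000,
		"linger.ms":                             100,
		"queue.buffering.max.messages":          100000,
		"batch.num.messages":                    5000,
		"acks":                                  "1",
	}
}

// mergeConfig returns a new map holding the defaults overlaid with custom;
// keys present in custom win. Neither input map is modified.
func mergeConfig(defaults, custom map[string]interface{}) map[string]interface{} {
	merged := make(map[string]interface{}, len(defaults)+len(custom))
	for k, v := range defaults {
		merged[k] = v
	}
	for k, v := range custom {
		merged[k] = v
	}
	return merged
}

func main() {
	cfg := mergeConfig(producerDefaults(), map[string]interface{}{
		"acks":      "all",
		"linger.ms": 20,
	})
	// Sort the keys so the output order is stable.
	keys := make([]string, 0, len(cfg))
	for k := range cfg {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Printf("%s = %v\n", k, cfg[k])
	}
}
```

With the real package, the override map would be passed as SetProducerCustomConfig(map[string]interface{}{...}) in the call to NewKafkaProducer.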

func (*Producer) CreateTopics

func (kp *Producer) CreateTopics(topics ...KafkaTopic) error

CreateTopics creates new topics if they do not exist.

func (*Producer) PublishMessageToTopic

func (kp *Producer) PublishMessageToTopic(msg *[]byte, topic string)

PublishMessageToTopic publishes message to topic

func (*Producer) PublishMessageToTopicWithKey

func (kp *Producer) PublishMessageToTopicWithKey(msg *[]byte, topic string, key string)

PublishMessageToTopicWithKey publishes message to topic with key
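Both publish methods take the payload as a pointer to a byte slice. The sketch below shows how a caller might marshal an event and pass it along, using a recording fake so that it runs without a broker. The publisher interface, recordingPublisher, userCreated and publishUserCreated are hypothetical; only the two method signatures are taken from this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// publisher captures the two documented publish methods so calling code
// can be exercised without a broker. The interface itself is hypothetical.
type publisher interface {
	PublishMessageToTopic(msg *[]byte, topic string)
	PublishMessageToTopicWithKey(msg *[]byte, topic string, key string)
}

// record is one message captured by the fake publisher.
type record struct {
	topic   string
	key     string
	payload []byte
}

// recordingPublisher is a test fake that stores what it is asked to publish.
type recordingPublisher struct {
	records []record
}

func (r *recordingPublisher) PublishMessageToTopic(msg *[]byte, topic string) {
	r.PublishMessageToTopicWithKey(msg, topic, "")
}

func (r *recordingPublisher) PublishMessageToTopicWithKey(msg *[]byte, topic string, key string) {
	if msg == nil {
		return
	}
	// Copy the payload so later changes to the caller's slice are not observed.
	payload := append([]byte(nil), *msg...)
	r.records = append(r.records, record{topic: topic, key: key, payload: payload})
}

// userCreated is a hypothetical event type.
type userCreated struct {
	UserID string `json:"user_id"`
}

// publishUserCreated marshals the event and publishes it keyed by user ID.
func publishUserCreated(p publisher, topic string, ev userCreated) error {
	payload, err := json.Marshal(ev)
	if err != nil {
		return fmt.Errorf("marshal userCreated: %w", err)
	}
	p.PublishMessageToTopicWithKey(&payload, topic, ev.UserID)
	return nil
}

func main() {
	p := &recordingPublisher{}
	if err := publishUserCreated(p, "users", userCreated{UserID: "u-42"}); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%s %s %s\n", p.records[0].topic, p.records[0].key, p.records[0].payload)
}
```

Depending on an interface like this, rather than on *Producer directly, is a design choice that keeps publishing code testable; a *Producer from this package would satisfy it because it has both methods.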

type ProducerOption

type ProducerOption func(l *Producer)

ProducerOption sets a parameter for the KafkaProducer

func EnableEventLogging

func EnableEventLogging(enableEventLogging bool) ProducerOption

EnableEventLogging will enable event logging. By default it is disabled

func SetProducerCustomConfig

func SetProducerCustomConfig(customConfig map[string]interface{}) ProducerOption

SetProducerCustomConfig sets the custom config for kafka

func SetProducerLogger

func SetProducerLogger(customLogger *gologger.CustomLogger) ProducerOption

SetProducerLogger sets the logger for the producer. Defaults to the consul logger.

type RawEvent

type RawEvent []byte

RawEvent holds the message in byte form

type ReplayType

type ReplayType int

ReplayType is an enum which represents the two types of Kafka consumer replay: timestamp and beginning

const (
	// TIMESTAMP replays logs from a particular time
	TIMESTAMP ReplayType = iota
	// BEGINNING replays logs from the beginning
	BEGINNING
)

func (ReplayType) EnumIndex

func (rf ReplayType) EnumIndex() int

EnumIndex provides common enum behavior: it returns the ReplayType as an int

func (ReplayType) String

func (rf ReplayType) String() string

String provides common enum behavior: it returns the ReplayType as a string
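For readers unfamiliar with the iota enum pattern, the following self-contained sketch re-declares ReplayType locally with plausible method bodies. The constant values follow from the iota declaration shown on this page; the exact strings returned by String and the body of EnumIndex are assumptions, since this page only gives the signatures.

```go
package main

import "fmt"

// ReplayType is a local re-declaration for illustration only.
type ReplayType int

const (
	// TIMESTAMP replays logs from a particular time.
	TIMESTAMP ReplayType = iota
	// BEGINNING replays logs from the beginning.
	BEGINNING
)

// String returns a readable name. The lower-case names are assumed;
// the package may use different strings.
func (rt ReplayType) String() string {
	switch rt {
	case TIMESTAMP:
		return "timestamp"
	case BEGINNING:
		return "beginning"
	default:
		return fmt.Sprintf("ReplayType(%d)", int(rt))
	}
}

// EnumIndex returns the underlying integer value (assumed behaviour).
func (rt ReplayType) EnumIndex() int {
	return int(rt)
}

func main() {
	for _, rt := range []ReplayType{TIMESTAMP, BEGINNING} {
		fmt.Printf("%d %s\n", rt.EnumIndex(), rt)
	}
}
```

Because the type implements fmt.Stringer, the %s verb and fmt.Println print the name rather than the number.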
