Documentation ¶
Index ¶
- func NewConsumerGroup(brokers []string, groupID string, log logger.Logger) *consumerGroup
- func NewKafkaConn(ctx context.Context, kafkaCfg *Config) (*kafka.Conn, error)
- func NewKafkaReader(kafkaURL []string, topic, groupID string, errLogger kafka.Logger) *kafka.Reader
- func NewProducer(log logger.Logger, brokers []string) *producer
- func NewWriter(brokers []string, errLogger kafka.Logger) *kafka.Writer
- type Config
- type ConsumerGroup
- type MessageProcessor
- type Producer
- type TopicConfig
- type Worker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewConsumerGroup ¶
NewConsumerGroup kafka consumer group constructor
func NewKafkaConn ¶
NewKafkaConn create new kafka connection
func NewKafkaReader ¶
NewKafkaReader create new configured kafka reader
func NewProducer ¶
NewProducer create new kafka producer
Types ¶
type Config ¶
type Config struct { Brokers []string `mapstructure:"brokers"` GroupID string `mapstructure:"groupID"` InitTopics bool `mapstructure:"initTopics"` }
Config kafka config
type ConsumerGroup ¶
type MessageProcessor ¶
type MessageProcessor interface {
ProcessMessages(ctx context.Context, r *kafka.Reader, wg *sync.WaitGroup, workerID int)
}
MessageProcessor processor methods must implement kafka.Worker func method interface
type TopicConfig ¶
type TopicConfig struct { TopicName string `mapstructure:"topicName"` Partitions int `mapstructure:"partitions"` ReplicationFactor int `mapstructure:"replicationFactor"` }
TopicConfig kafka topic config
Click to show internal directories.
Click to hide internal directories.