Documentation ¶
Index ¶
- func NewAsyncProducer(log logger.Logger, brokers []string) *producer
- func NewAsyncProducerWithCallback(log logger.Logger, brokers []string, cb AsyncWriterCallback) *producer
- func NewAsyncWriter(brokers []string, errLogger kafka.Logger, log logger.Logger) *kafka.Writer
- func NewAsyncWriterWithCallback(brokers []string, errLogger kafka.Logger, log logger.Logger, ...) *kafka.Writer
- func NewConsumerGroup(brokers []string, groupID string, log logger.Logger) *consumerGroup
- func NewKafkaConn(ctx context.Context, kafkaCfg *Config) (*kafka.Conn, error)
- func NewKafkaReader(kafkaURL []string, topic, groupID string, errLogger kafka.Logger) *kafka.Reader
- func NewProducer(log logger.Logger, brokers []string) *producer
- func NewRequireNoneProducer(log logger.Logger, brokers []string) *producer
- func NewRequireNoneWriter(brokers []string, errLogger kafka.Logger, log logger.Logger) *kafka.Writer
- func NewWriter(brokers []string, errLogger kafka.Logger) *kafka.Writer
- type AsyncWriterCallback
- type Config
- type ConsumerGroup
- type MessageProcessor
- type Producer
- type Worker
- type WorkerErrGroup
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewAsyncProducer ¶
NewAsyncProducer creates a new Kafka producer.
func NewAsyncProducerWithCallback ¶
func NewAsyncProducerWithCallback(log logger.Logger, brokers []string, cb AsyncWriterCallback) *producer
NewAsyncProducerWithCallback creates a new Kafka producer with a callback for deleting invalid projections.
func NewAsyncWriter ¶
NewAsyncWriter creates a new configured async Kafka writer.
func NewAsyncWriterWithCallback ¶
func NewAsyncWriterWithCallback(brokers []string, errLogger kafka.Logger, log logger.Logger, cb AsyncWriterCallback) *kafka.Writer
NewAsyncWriterWithCallback creates a new configured async Kafka writer.
func NewConsumerGroup ¶
NewConsumerGroup is the Kafka consumer group constructor.
func NewKafkaConn ¶
NewKafkaConn creates a new Kafka connection.
func NewKafkaReader ¶
NewKafkaReader creates a new configured Kafka reader.
func NewProducer ¶
NewProducer creates a new Kafka producer.
func NewRequireNoneProducer ¶
NewRequireNoneProducer creates a new fire-and-forget Kafka producer.
func NewRequireNoneWriter ¶
func NewRequireNoneWriter(brokers []string, errLogger kafka.Logger, log logger.Logger) *kafka.Writer
NewRequireNoneWriter creates a new configured Kafka writer.
Types ¶
type AsyncWriterCallback ¶
type AsyncWriterCallback func(messages []kafka.Message) error
type Config ¶
type Config struct { Brokers []string `mapstructure:"brokers" validate:"required"` GroupID string `mapstructure:"groupID" validate:"required,gte=0"` InitTopics bool `mapstructure:"initTopics"` }
Config is the Kafka configuration.
type ConsumerGroup ¶
type ConsumerGroup interface { ConsumeTopic(ctx context.Context, groupTopics []string, poolSize int, worker Worker) ConsumeTopicWithErrGroup(ctx context.Context, groupTopics []string, poolSize int, worker WorkerErrGroup) error GetNewKafkaReader(kafkaURL []string, groupTopics []string, groupID string) *kafka.Reader GetNewKafkaWriter() *kafka.Writer }
type MessageProcessor ¶
type MessageProcessor interface { ProcessMessages(ctx context.Context, r *kafka.Reader, wg *sync.WaitGroup, workerID int) ProcessMessagesWithErrGroup(ctx context.Context, r *kafka.Reader, workerID int) }
MessageProcessor: processor methods must implement the kafka.Worker func method interface.