Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ClaimIngestion ¶
func ClaimIngestion(ch *ConsumerHandler, session sarama.ConsumerGroupSession, message *sarama.ConsumerMessage)
Types ¶
type ConsumerHandler ¶
type ConsumerHandler struct {
// contains filtered or unexported fields
}
func NewConsumerHandlerFromApp ¶
func NewConsumerHandlerFromApp(app *newrelic.Application, topic string, clientID string, saramaConfig *sarama.Config, messageHandler func(ctx context.Context, message *sarama.ConsumerMessage)) *ConsumerHandler
NewConsumerHandlerFromApp takes in a new relic application and creates a transaction using it
func NewConsumerHandlerFromTxn ¶
func NewConsumerHandlerFromTxn(txn *newrelic.Transaction, topic string, clientID string, saramaConfig *sarama.Config, messageHandler func(ctx context.Context, message *sarama.ConsumerMessage)) *ConsumerHandler
NewConsumerHandlerFromTxn takes in a new relic transaction. No application instance is required
func (*ConsumerHandler) Cleanup ¶
func (ch *ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error
Cleanup is ran at the end of a new session
func (*ConsumerHandler) ConsumeClaim ¶
func (ch *ConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*ConsumerHandler) Setup ¶
func (ch *ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error
Setup is ran at the beginning of a new session
type ConsumerWrapper ¶
type ConsumerWrapper struct {
// contains filtered or unexported fields
}
func (*ConsumerWrapper) Consume ¶
func (cw *ConsumerWrapper) Consume(ctx context.Context, handler *ConsumerHandler) error
type KafkaMessageCarrier ¶
func (KafkaMessageCarrier) Set ¶
func (carrier KafkaMessageCarrier) Set(key, val string)
type ProducerWrapper ¶
type ProducerWrapper struct {
// contains filtered or unexported fields
}
func NewProducerWrapper ¶
func NewProducerWrapper(producer sarama.SyncProducer, txn *newrelic.Transaction) *ProducerWrapper
func (*ProducerWrapper) SendMessage ¶
func (pw *ProducerWrapper) SendMessage(topic string, key []byte, value []byte) error
Click to show internal directories.
Click to hide internal directories.