Documentation ¶
Index ¶
- Constants
- func NewMetricMiddleware(cfg *ConsumerConfig, app *fiber.App, consumerMetric *ConsumerMetric, ...) (func(ctx *fiber.Ctx) error, error)
- type API
- type APIConfiguration
- type BatchConfiguration
- type BatchConsumeFn
- type ConsumeFn
- type Consumer
- type ConsumerConfig
- type ConsumerMetric
- type DialConfig
- type Dialer
- type DistributedTracingConfiguration
- type Layer
- type LogLevel
- type LoggerInterface
- type Mechanism
- type Message
- type MetricConfiguration
- type OtelKafkaKonsumerWriter
- type Producer
- type ProducerConfig
- type Reader
- type ReaderConfig
- type RetryConfiguration
- type SASLConfig
- type TLSConfig
- type Transport
- type TransportConfig
- type Writer
- type WriterConfig
Constants ¶
const (
    MechanismScram = "scram"
    MechanismPlain = "plain"
)
const Name = "kafka_konsumer"
Variables ¶
This section is empty.
Functions ¶
func NewMetricMiddleware ¶ added in v1.3.3
func NewMetricMiddleware(
    cfg *ConsumerConfig,
    app *fiber.App,
    consumerMetric *ConsumerMetric,
    metricCollectors ...prometheus.Collector,
) (func(ctx *fiber.Ctx) error, error)
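The returned handler can be mounted on an existing fiber application. A minimal sketch, assuming the module path github.com/Trendyol/kafka-konsumer and fiber v2; cfg and consumerMetric are placeholders built elsewhere.

import (
    "github.com/gofiber/fiber/v2"

    kafka "github.com/Trendyol/kafka-konsumer" // assumed module path
)

func registerMetricMiddleware(app *fiber.App, cfg *kafka.ConsumerConfig, cm *kafka.ConsumerMetric) error {
    // The variadic prometheus collectors are omitted here; pass extra collectors if you have them.
    mw, err := kafka.NewMetricMiddleware(cfg, app, cm)
    if err != nil {
        return err
    }
    app.Use(mw) // func(*fiber.Ctx) error satisfies fiber's Handler type
    return nil
}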
Types ¶
type API ¶
type API interface {
    Start()
    Stop()
}
func NewAPI ¶
func NewAPI(cfg *ConsumerConfig, consumerMetric *ConsumerMetric, metricCollectors ...prometheus.Collector) API
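A short sketch of running the metrics API on its own. Whether Start blocks is not stated here, so the goroutine is a defensive assumption; cfg and consumerMetric are placeholders.

// kafka is the assumed import alias for this package (module path assumed as github.com/Trendyol/kafka-konsumer).
api := kafka.NewAPI(cfg, consumerMetric)
go api.Start() // run in a goroutine in case Start serves until it is stopped
defer api.Stop()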
type APIConfiguration ¶
type BatchConfiguration ¶
type BatchConfiguration struct {
    BatchConsumeFn BatchConsumeFn
    MessageGroupLimit int
    MessageGroupDuration time.Duration
}
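A hedged sketch of building a batch configuration. The BatchConsumeFn signature func([]Message) error is an assumption, since only the type name is shown in this listing, and the limits are illustrative.

import (
    "fmt"
    "time"

    kafka "github.com/Trendyol/kafka-konsumer" // assumed module path
)

func newBatchConfiguration() *kafka.BatchConfiguration {
    return &kafka.BatchConfiguration{
        MessageGroupLimit:    100,         // maximum messages collected into one group
        MessageGroupDuration: time.Second, // maximum time spent collecting a group
        // Assumed signature: the listing above only names the BatchConsumeFn type.
        BatchConsumeFn: func(messages []kafka.Message) error {
            fmt.Printf("processing %d messages\n", len(messages))
            return nil
        },
    }
}

The result is assigned to ConsumerConfig.BatchConfiguration (see ConsumerConfig below).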
type BatchConsumeFn ¶
type Consumer ¶
type Consumer interface {
    // Consume starts consuming
    Consume()

    // WithLogger for injecting custom log implementation
    WithLogger(logger LoggerInterface)

    // Stop for graceful shutdown. In order to avoid data loss, you have to call it!
    Stop() error
}
func NewConsumer ¶
func NewConsumer(cfg *ConsumerConfig) (Consumer, error)
type ConsumerConfig ¶
type ConsumerConfig struct {
    APIConfiguration APIConfiguration
    Logger LoggerInterface
    MetricConfiguration MetricConfiguration
    SASL *SASLConfig
    TLS *TLSConfig
    Dial *DialConfig
    BatchConfiguration *BatchConfiguration
    ConsumeFn ConsumeFn
    ClientID string
    Rack string
    LogLevel LogLevel
    Reader ReaderConfig
    RetryConfiguration RetryConfiguration
    CommitInterval time.Duration
    DistributedTracingEnabled bool
    DistributedTracingConfiguration DistributedTracingConfiguration
    Concurrency int
    RetryEnabled bool
    APIEnabled bool
}
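Putting NewConsumer and ConsumerConfig together, a minimal single-message consumer could look like the sketch below. The module path, broker address, topic, and group ID are placeholders, and the ConsumeFn signature func(Message) error is an assumption; whether Consume blocks is not spelled out above, so the sketch blocks separately until interrupted.

package main

import (
    "fmt"
    "os"
    "os/signal"

    kafka "github.com/Trendyol/kafka-konsumer" // assumed module path
)

func main() {
    cfg := &kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"}, // placeholder broker
            Topic:   "orders",                    // placeholder topic
            GroupID: "orders-consumer-group",     // placeholder consumer group
        },
        // Assumed signature: the listing above only names the ConsumeFn type.
        ConsumeFn: func(message kafka.Message) error {
            fmt.Printf("message received: %s\n", string(message.Value))
            return nil
        },
    }

    consumer, err := kafka.NewConsumer(cfg)
    if err != nil {
        panic(err)
    }
    defer consumer.Stop() // required for graceful shutdown; avoids data loss

    consumer.Consume()

    // Block until interrupted so the deferred Stop can run.
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, os.Interrupt)
    <-sigs
}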
type ConsumerMetric ¶ added in v1.3.3
type DialConfig ¶ added in v1.4.6
type DistributedTracingConfiguration ¶ added in v1.7.7
type DistributedTracingConfiguration struct {
    TracerProvider trace.TracerProvider
    Propagator propagation.TextMapPropagator
}
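Any OpenTelemetry TracerProvider and TextMapPropagator can be plugged in. A minimal sketch, assuming the globally registered provider and a W3C trace-context propagator; remember to also set the corresponding DistributedTracingEnabled flag.

import (
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/propagation"

    kafka "github.com/Trendyol/kafka-konsumer" // assumed module path
)

func tracingConfiguration() kafka.DistributedTracingConfiguration {
    return kafka.DistributedTracingConfiguration{
        TracerProvider: otel.GetTracerProvider(), // or a dedicated sdktrace.TracerProvider
        Propagator: propagation.NewCompositeTextMapPropagator(
            propagation.TraceContext{},
            propagation.Baggage{},
        ),
    }
}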
type LoggerInterface ¶
type LoggerInterface interface {
    // With returns a logger based off the root logger and decorates it with the given context and arguments.
    With(args ...interface{}) LoggerInterface

    // Debug uses fmt.Sprint to construct and log a message at DEBUG level
    Debug(args ...interface{})
    // Info uses fmt.Sprint to construct and log a message at INFO level
    Info(args ...interface{})
    // Warn uses fmt.Sprint to construct and log a message at WARN level
    Warn(args ...interface{})
    // Error uses fmt.Sprint to construct and log a message at ERROR level
    Error(args ...interface{})

    // Debugf uses fmt.Sprintf to construct and log a message at DEBUG level
    Debugf(format string, args ...interface{})
    // Infof uses fmt.Sprintf to construct and log a message at INFO level
    Infof(format string, args ...interface{})
    // Warnf uses fmt.Sprintf to construct and log a message at WARN level
    Warnf(format string, args ...interface{})
    // Errorf uses fmt.Sprintf to construct and log a message at ERROR level
    Errorf(format string, args ...interface{})

    Infow(msg string, keysAndValues ...interface{})
    Errorw(msg string, keysAndValues ...interface{})
    Warnw(msg string, keysAndValues ...interface{})
}
LoggerInterface is a logger that supports log levels, context and structured logging.
func NewZapLogger ¶
func NewZapLogger(level LogLevel) LoggerInterface
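A small sketch of injecting the bundled zap-backed logger into a consumer. The concrete LogLevel values are not listed in this documentation, so the "debug" conversion below is an assumption; any custom LoggerInterface implementation can be passed the same way.

import kafka "github.com/Trendyol/kafka-konsumer" // assumed module path

func attachLogger(consumer kafka.Consumer) {
    // "debug" is assumed to be a valid level name for the LogLevel type.
    logger := kafka.NewZapLogger(kafka.LogLevel("debug"))
    consumer.WithLogger(logger)
}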
type Message ¶
type Message struct {
    Topic string
    Partition int
    Offset int64
    HighWaterMark int64
    Key []byte
    Value []byte
    Headers []kafka.Header
    WriterData interface{}
    Time time.Time

    // Context To enable distributed tracing support
    Context context.Context
}
func (*Message) RemoveHeader ¶
func (m *Message) RemoveHeader(header kafka.Header)
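RemoveHeader takes a segmentio/kafka-go Header value. A minimal sketch that strips a header before a message is reprocessed; the header key is hypothetical, and the exact matching rule (key only or key and value) is not documented here.

import (
    segmentio "github.com/segmentio/kafka-go"

    kafka "github.com/Trendyol/kafka-konsumer" // assumed module path
)

func stripRetryHeader(m *kafka.Message) {
    // "x-retry-count" is an illustrative header key, not part of the library.
    m.RemoveHeader(segmentio.Header{Key: "x-retry-count", Value: []byte("1")})
}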
type MetricConfiguration ¶
type MetricConfiguration struct {
    // Path default is /metrics
    Path *string
}
type OtelKafkaKonsumerWriter ¶ added in v1.7.7
type Producer ¶
type Producer interface {
    Produce(ctx context.Context, message Message) error
    ProduceBatch(ctx context.Context, messages []Message) error
    Close() error
}
func NewProducer ¶
func NewProducer(cfg *ProducerConfig) (Producer, error)
type ProducerConfig ¶
type ProducerConfig struct {
    Transport *TransportConfig
    SASL *SASLConfig
    TLS *TLSConfig
    ClientID string
    Writer WriterConfig
    DistributedTracingEnabled bool
    DistributedTracingConfiguration DistributedTracingConfiguration
}
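A minimal producer sketch combining NewProducer, ProducerConfig, and the Producer interface; the module path, broker address, topic, and payloads are placeholders.

package main

import (
    "context"

    kafka "github.com/Trendyol/kafka-konsumer" // assumed module path
)

func main() {
    producer, err := kafka.NewProducer(&kafka.ProducerConfig{
        Writer: kafka.WriterConfig{
            Brokers: []string{"localhost:29092"}, // placeholder broker
            Topic:   "orders",                    // placeholder topic
        },
    })
    if err != nil {
        panic(err)
    }
    defer producer.Close()

    ctx := context.Background()

    // Single message.
    if err := producer.Produce(ctx, kafka.Message{Key: []byte("1"), Value: []byte(`{"id":1}`)}); err != nil {
        panic(err)
    }

    // Batch of messages.
    if err := producer.ProduceBatch(ctx, []kafka.Message{
        {Key: []byte("2"), Value: []byte(`{"id":2}`)},
        {Key: []byte("3"), Value: []byte(`{"id":3}`)},
    }); err != nil {
        panic(err)
    }
}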
type Reader ¶ added in v1.6.7
func NewOtelReaderWrapper ¶ added in v1.7.7
func NewOtelReaderWrapper(cfg *ConsumerConfig, reader *segmentio.Reader) (Reader, error)
func NewReaderWrapper ¶ added in v1.6.7
type ReaderConfig ¶
type ReaderConfig kafka.ReaderConfig
type RetryConfiguration ¶
type SASLConfig ¶
func (*SASLConfig) IsEmpty ¶
func (s *SASLConfig) IsEmpty() bool
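IsEmpty presumably reports whether any SASL settings were supplied, which makes it useful as a guard before doing authentication-specific setup. A minimal sketch under that assumption; the SASL field comes straight from ConsumerConfig or ProducerConfig.

import kafka "github.com/Trendyol/kafka-konsumer" // assumed module path

// usesSASL is a hypothetical helper: true only when SASL settings were actually provided.
func usesSASL(s *kafka.SASLConfig) bool {
    return s != nil && !s.IsEmpty()
}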
type TransportConfig ¶ added in v1.4.6
type Writer ¶ added in v1.6.7
func NewOtelProducer ¶ added in v1.7.7
func NewOtelProducer(cfg *ProducerConfig, writer *segmentio.Writer) (Writer, error)
type WriterConfig ¶
type WriterConfig struct {
    ErrorLogger kafka.Logger
    Logger kafka.Logger
    Balancer kafka.Balancer
    Completion func(messages []kafka.Message, err error)
    Topic string
    Brokers []string
    ReadTimeout time.Duration
    BatchTimeout time.Duration
    BatchBytes int64
    WriteTimeout time.Duration
    RequiredAcks kafka.RequiredAcks
    BatchSize int
    WriteBackoffMax time.Duration
    WriteBackoffMin time.Duration
    MaxAttempts int
    Async bool
    Compression kafka.Compression
    AllowAutoTopicCreation bool
}
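WriterConfig mirrors the segmentio/kafka-go writer settings. A hedged sketch of a producer config that tunes batching, acknowledgements, and compression; every value is illustrative rather than a recommendation.

import (
    "time"

    segmentio "github.com/segmentio/kafka-go"

    kafka "github.com/Trendyol/kafka-konsumer" // assumed module path
)

func tunedProducerConfig() *kafka.ProducerConfig {
    return &kafka.ProducerConfig{
        Writer: kafka.WriterConfig{
            Brokers:                []string{"localhost:29092"}, // placeholder broker
            Topic:                  "orders",                    // placeholder topic
            BatchSize:              100,                         // messages per batch
            BatchTimeout:           500 * time.Millisecond,      // flush interval for partial batches
            BatchBytes:             1 << 20,                     // at most 1 MiB per batch
            RequiredAcks:           segmentio.RequireAll,        // wait for all in-sync replicas
            Compression:            segmentio.Snappy,
            AllowAutoTopicCreation: true,
        },
    }
}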
Source Files ¶